import base64 import errno import getpass import hashlib import os import shutil import sys from argparse import ArgumentTypeError from datetime import datetime, timezone, timedelta from io import StringIO, BytesIO import pytest from ..archiver.prune_cmd import prune_within, prune_split from .. import platform from ..constants import * # NOQA from ..helpers import Location from ..helpers import Buffer from ..helpers import ( partial_format, format_file_size, parse_file_size, format_timedelta, format_line, PlaceholderError, replace_placeholders, ) from ..helpers import remove_dotdot_prefixes, make_path_safe, clean_lines from ..helpers import interval from ..helpers import get_base_dir, get_cache_dir, get_keys_dir, get_security_dir, get_config_dir, get_runtime_dir from ..helpers import is_slow_msgpack from ..helpers import msgpack from ..helpers import yes, TRUISH, FALSISH, DEFAULTISH from ..helpers import StableDict, bin_to_hex from ..helpers import parse_timestamp, ChunkIteratorFileWrapper, ChunkerParams from ..helpers import archivename_validator, text_validator from ..helpers import ProgressIndicatorPercent from ..helpers import swidth_slice from ..helpers import chunkit from ..helpers import safe_ns, safe_s, SUPPORT_32BIT_PLATFORMS from ..helpers import popen_with_error_handling from ..helpers import dash_open from ..helpers import iter_separated from ..helpers import eval_escapes from ..helpers import safe_unlink from ..helpers import text_to_json, binary_to_json from ..helpers import classify_ec, max_ec from ..helpers.passphrase import Passphrase, PasswordRetriesExceeded from ..platform import is_cygwin, is_win32, is_darwin from . import FakeInputs, are_hardlinks_supported from . import rejected_dotdot_paths def test_bin_to_hex(): assert bin_to_hex(b"") == "" assert bin_to_hex(b"\x00\x01\xff") == "0001ff" @pytest.mark.parametrize( "key,value", [("key", b"\x00\x01\x02\x03"), ("key", b"\x00\x01\x02"), ("key", b"\x00\x01"), ("key", b"\x00"), ("key", b"")], ) def test_binary_to_json(key, value): key_b64 = key + "_b64" d = binary_to_json(key, value) assert key_b64 in d assert base64.b64decode(d[key_b64]) == value @pytest.mark.parametrize( "key,value,strict", [ ("key", "abc", True), ("key", "äöü", True), ("key", "", True), ("key", b"\x00\xff".decode("utf-8", errors="surrogateescape"), False), ("key", "äöü".encode("latin1").decode("utf-8", errors="surrogateescape"), False), ], ) def test_text_to_json(key, value, strict): key_b64 = key + "_b64" d = text_to_json(key, value) value_b = value.encode("utf-8", errors="surrogateescape") if strict: # no surrogate-escapes, just unicode text assert key in d assert d[key] == value_b.decode("utf-8", errors="strict") assert d[key].encode("utf-8", errors="strict") == value_b assert key_b64 not in d # not needed. pure valid unicode. else: # requiring surrogate-escapes. text has replacement chars, base64 representation is present. assert key in d assert d[key] == value.encode("utf-8", errors="replace").decode("utf-8", errors="strict") assert d[key].encode("utf-8", errors="strict") == value.encode("utf-8", errors="replace") assert key_b64 in d assert base64.b64decode(d[key_b64]) == value_b class TestLocationWithoutEnv: @pytest.fixture def keys_dir(self, tmpdir, monkeypatch): tmpdir = str(tmpdir) monkeypatch.setenv("BORG_KEYS_DIR", tmpdir) if not tmpdir.endswith(os.path.sep): tmpdir += os.path.sep return tmpdir def test_ssh(self, monkeypatch, keys_dir): monkeypatch.delenv("BORG_REPO", raising=False) assert ( repr(Location("ssh://user@host:1234/some/path")) == "Location(proto='ssh', user='user', host='host', port=1234, path='/some/path')" ) assert Location("ssh://user@host:1234/some/path").to_key_filename() == keys_dir + "host__some_path" assert ( repr(Location("ssh://user@host:1234/some/path")) == "Location(proto='ssh', user='user', host='host', port=1234, path='/some/path')" ) assert ( repr(Location("ssh://user@host/some/path")) == "Location(proto='ssh', user='user', host='host', port=None, path='/some/path')" ) assert ( repr(Location("ssh://user@[::]:1234/some/path")) == "Location(proto='ssh', user='user', host='::', port=1234, path='/some/path')" ) assert ( repr(Location("ssh://user@[::]:1234/some/path")) == "Location(proto='ssh', user='user', host='::', port=1234, path='/some/path')" ) assert Location("ssh://user@[::]:1234/some/path").to_key_filename() == keys_dir + "____some_path" assert ( repr(Location("ssh://user@[::]/some/path")) == "Location(proto='ssh', user='user', host='::', port=None, path='/some/path')" ) assert ( repr(Location("ssh://user@[2001:db8::]:1234/some/path")) == "Location(proto='ssh', user='user', host='2001:db8::', port=1234, path='/some/path')" ) assert ( repr(Location("ssh://user@[2001:db8::]:1234/some/path")) == "Location(proto='ssh', user='user', host='2001:db8::', port=1234, path='/some/path')" ) assert ( Location("ssh://user@[2001:db8::]:1234/some/path").to_key_filename() == keys_dir + "2001_db8____some_path" ) assert ( repr(Location("ssh://user@[2001:db8::]/some/path")) == "Location(proto='ssh', user='user', host='2001:db8::', port=None, path='/some/path')" ) assert ( repr(Location("ssh://user@[2001:db8::c0:ffee]:1234/some/path")) == "Location(proto='ssh', user='user', host='2001:db8::c0:ffee', port=1234, path='/some/path')" ) assert ( repr(Location("ssh://user@[2001:db8::c0:ffee]:1234/some/path")) == "Location(proto='ssh', user='user', host='2001:db8::c0:ffee', port=1234, path='/some/path')" ) assert ( repr(Location("ssh://user@[2001:db8::c0:ffee]/some/path")) == "Location(proto='ssh', user='user', host='2001:db8::c0:ffee', port=None, path='/some/path')" ) assert ( repr(Location("ssh://user@[2001:db8::192.0.2.1]:1234/some/path")) == "Location(proto='ssh', user='user', host='2001:db8::192.0.2.1', port=1234, path='/some/path')" ) assert ( repr(Location("ssh://user@[2001:db8::192.0.2.1]:1234/some/path")) == "Location(proto='ssh', user='user', host='2001:db8::192.0.2.1', port=1234, path='/some/path')" ) assert ( repr(Location("ssh://user@[2001:db8::192.0.2.1]/some/path")) == "Location(proto='ssh', user='user', host='2001:db8::192.0.2.1', port=None, path='/some/path')" ) assert ( Location("ssh://user@[2001:db8::192.0.2.1]/some/path").to_key_filename() == keys_dir + "2001_db8__192_0_2_1__some_path" ) assert ( repr(Location("ssh://user@[2a02:0001:0002:0003:0004:0005:0006:0007]/some/path")) == "Location(proto='ssh', user='user', " "host='2a02:0001:0002:0003:0004:0005:0006:0007', port=None, path='/some/path')" ) assert ( repr(Location("ssh://user@[2a02:0001:0002:0003:0004:0005:0006:0007]:1234/some/path")) == "Location(proto='ssh', user='user', " "host='2a02:0001:0002:0003:0004:0005:0006:0007', port=1234, path='/some/path')" ) def test_socket(self, monkeypatch, keys_dir): monkeypatch.delenv("BORG_REPO", raising=False) assert ( repr(Location("socket:///repo/path")) == "Location(proto='socket', user=None, host=None, port=None, path='/repo/path')" ) assert Location("socket:///some/path").to_key_filename() == keys_dir + "some_path" def test_file(self, monkeypatch, keys_dir): monkeypatch.delenv("BORG_REPO", raising=False) assert ( repr(Location("file:///some/path")) == "Location(proto='file', user=None, host=None, port=None, path='/some/path')" ) assert ( repr(Location("file:///some/path")) == "Location(proto='file', user=None, host=None, port=None, path='/some/path')" ) assert Location("file:///some/path").to_key_filename() == keys_dir + "some_path" def test_smb(self, monkeypatch, keys_dir): monkeypatch.delenv("BORG_REPO", raising=False) assert ( repr(Location("file:////server/share/path")) == "Location(proto='file', user=None, host=None, port=None, path='//server/share/path')" ) assert Location("file:////server/share/path").to_key_filename() == keys_dir + "server_share_path" def test_folder(self, monkeypatch, keys_dir): monkeypatch.delenv("BORG_REPO", raising=False) assert repr(Location("path")) == "Location(proto='file', user=None, host=None, port=None, path='path')" assert Location("path").to_key_filename() == keys_dir + "path" def test_long_path(self, monkeypatch, keys_dir): monkeypatch.delenv("BORG_REPO", raising=False) assert Location(os.path.join(*(40 * ["path"]))).to_key_filename() == keys_dir + "_".join(20 * ["path"]) + "_" def test_abspath(self, monkeypatch, keys_dir): monkeypatch.delenv("BORG_REPO", raising=False) assert ( repr(Location("/some/absolute/path")) == "Location(proto='file', user=None, host=None, port=None, path='/some/absolute/path')" ) assert ( repr(Location("/some/absolute/path")) == "Location(proto='file', user=None, host=None, port=None, path='/some/absolute/path')" ) assert Location("/some/absolute/path").to_key_filename() == keys_dir + "some_absolute_path" assert ( repr(Location("ssh://user@host/some/path")) == "Location(proto='ssh', user='user', host='host', port=None, path='/some/path')" ) assert Location("ssh://user@host/some/path").to_key_filename() == keys_dir + "host__some_path" def test_relpath(self, monkeypatch, keys_dir): monkeypatch.delenv("BORG_REPO", raising=False) assert ( repr(Location("some/relative/path")) == "Location(proto='file', user=None, host=None, port=None, path='some/relative/path')" ) assert ( repr(Location("some/relative/path")) == "Location(proto='file', user=None, host=None, port=None, path='some/relative/path')" ) assert Location("some/relative/path").to_key_filename() == keys_dir + "some_relative_path" assert ( repr(Location("ssh://user@host/./some/path")) == "Location(proto='ssh', user='user', host='host', port=None, path='/./some/path')" ) assert Location("ssh://user@host/./some/path").to_key_filename() == keys_dir + "host__some_path" assert ( repr(Location("ssh://user@host/~/some/path")) == "Location(proto='ssh', user='user', host='host', port=None, path='/~/some/path')" ) assert Location("ssh://user@host/~/some/path").to_key_filename() == keys_dir + "host__some_path" def test_with_colons(self, monkeypatch, keys_dir): monkeypatch.delenv("BORG_REPO", raising=False) assert ( repr(Location("/abs/path:w:cols")) == "Location(proto='file', user=None, host=None, port=None, path='/abs/path:w:cols')" ) assert ( repr(Location("/abs/path:with:colons")) == "Location(proto='file', user=None, host=None, port=None, path='/abs/path:with:colons')" ) assert ( repr(Location("/abs/path:with:colons")) == "Location(proto='file', user=None, host=None, port=None, path='/abs/path:with:colons')" ) assert Location("/abs/path:with:colons").to_key_filename() == keys_dir + "abs_path_with_colons" def test_canonical_path(self, monkeypatch): monkeypatch.delenv("BORG_REPO", raising=False) locations = [ "some/path", "file://some/path", "host:some/path", "host:~user/some/path", "socket:///some/path", "ssh://host/some/path", "ssh://user@host:1234/some/path", ] for location in locations: assert ( Location(location).canonical_path() == Location(Location(location).canonical_path()).canonical_path() ), ("failed: %s" % location) def test_bad_syntax(self): with pytest.raises(ValueError): # this is invalid due to the 2nd colon, correct: 'ssh://user@host/path' Location("ssh://user@host:/path") @pytest.mark.parametrize( "name", [ "foobar", # placeholders "foobar-{now}", ], ) def test_archivename_ok(name): archivename_validator(name) # must not raise an exception @pytest.mark.parametrize( "name", [ "", # too short "x" * 201, # too long # invalid chars: "foo/bar", "foo\\bar", ">foo", "= 100 b3 = buffer.get() assert b3 is b2 # still same buffer (200) buffer.resize(100, init=True) assert len(buffer) == 100 # except on init b4 = buffer.get() assert b4 is not b3 # new, smaller buffer def test_limit(self): buffer = Buffer(bytearray, size=100, limit=200) buffer.resize(200) assert len(buffer) == 200 with pytest.raises(Buffer.MemoryLimitExceeded): buffer.resize(201) assert len(buffer) == 200 def test_get(self): buffer = Buffer(bytearray, size=100, limit=200) b1 = buffer.get(50) assert len(b1) >= 50 # == 100 b2 = buffer.get(100) assert len(b2) >= 100 # == 100 assert b2 is b1 # did not need resizing yet b3 = buffer.get(200) assert len(b3) == 200 assert b3 is not b2 # new, resized buffer with pytest.raises(Buffer.MemoryLimitExceeded): buffer.get(201) # beyond limit assert len(buffer) == 200 def test_yes_input(): inputs = list(TRUISH) input = FakeInputs(inputs) for i in inputs: assert yes(input=input) inputs = list(FALSISH) input = FakeInputs(inputs) for i in inputs: assert not yes(input=input) def test_yes_input_defaults(): inputs = list(DEFAULTISH) input = FakeInputs(inputs) for i in inputs: assert yes(default=True, input=input) input = FakeInputs(inputs) for i in inputs: assert not yes(default=False, input=input) def test_yes_input_custom(): input = FakeInputs(["YES", "SURE", "NOPE"]) assert yes(truish=("YES",), input=input) assert yes(truish=("SURE",), input=input) assert not yes(falsish=("NOPE",), input=input) def test_yes_env(monkeypatch): for value in TRUISH: monkeypatch.setenv("OVERRIDE_THIS", value) assert yes(env_var_override="OVERRIDE_THIS") for value in FALSISH: monkeypatch.setenv("OVERRIDE_THIS", value) assert not yes(env_var_override="OVERRIDE_THIS") def test_yes_env_default(monkeypatch): for value in DEFAULTISH: monkeypatch.setenv("OVERRIDE_THIS", value) assert yes(env_var_override="OVERRIDE_THIS", default=True) assert not yes(env_var_override="OVERRIDE_THIS", default=False) def test_yes_defaults(): input = FakeInputs(["invalid", "", " "]) assert not yes(input=input) # default=False assert not yes(input=input) assert not yes(input=input) input = FakeInputs(["invalid", "", " "]) assert yes(default=True, input=input) assert yes(default=True, input=input) assert yes(default=True, input=input) input = FakeInputs([]) assert yes(default=True, input=input) assert not yes(default=False, input=input) with pytest.raises(ValueError): yes(default=None) def test_yes_retry(): input = FakeInputs(["foo", "bar", TRUISH[0]]) assert yes(retry_msg="Retry: ", input=input) input = FakeInputs(["foo", "bar", FALSISH[0]]) assert not yes(retry_msg="Retry: ", input=input) def test_yes_no_retry(): input = FakeInputs(["foo", "bar", TRUISH[0]]) assert not yes(retry=False, default=False, input=input) input = FakeInputs(["foo", "bar", FALSISH[0]]) assert yes(retry=False, default=True, input=input) def test_yes_output(capfd): input = FakeInputs(["invalid", "y", "n"]) assert yes(msg="intro-msg", false_msg="false-msg", true_msg="true-msg", retry_msg="retry-msg", input=input) out, err = capfd.readouterr() assert out == "" assert "intro-msg" in err assert "retry-msg" in err assert "true-msg" in err assert not yes(msg="intro-msg", false_msg="false-msg", true_msg="true-msg", retry_msg="retry-msg", input=input) out, err = capfd.readouterr() assert out == "" assert "intro-msg" in err assert "retry-msg" not in err assert "false-msg" in err def test_yes_env_output(capfd, monkeypatch): env_var = "OVERRIDE_SOMETHING" monkeypatch.setenv(env_var, "yes") assert yes(env_var_override=env_var) out, err = capfd.readouterr() assert out == "" assert env_var in err assert "yes" in err def test_progress_percentage(capfd): pi = ProgressIndicatorPercent(1000, step=5, start=0, msg="%3.0f%%") pi.logger.setLevel("INFO") pi.show(0) out, err = capfd.readouterr() assert err == " 0%\n" pi.show(420) pi.show(680) out, err = capfd.readouterr() assert err == " 42%\n 68%\n" pi.show(1000) out, err = capfd.readouterr() assert err == "100%\n" pi.finish() out, err = capfd.readouterr() assert err == "\n" def test_progress_percentage_step(capfd): pi = ProgressIndicatorPercent(100, step=2, start=0, msg="%3.0f%%") pi.logger.setLevel("INFO") pi.show() out, err = capfd.readouterr() assert err == " 0%\n" pi.show() out, err = capfd.readouterr() assert err == "" # no output at 1% as we have step == 2 pi.show() out, err = capfd.readouterr() assert err == " 2%\n" def test_progress_percentage_quiet(capfd): pi = ProgressIndicatorPercent(1000, step=5, start=0, msg="%3.0f%%") pi.logger.setLevel("WARN") pi.show(0) out, err = capfd.readouterr() assert err == "" pi.show(1000) out, err = capfd.readouterr() assert err == "" pi.finish() out, err = capfd.readouterr() assert err == "" @pytest.mark.parametrize( "fmt, items_map, expected_result", [ ("{space:10}", {"space": " "}, " " * 10), ("{foobar}", {"bar": "wrong", "foobar": "correct"}, "correct"), ("{unknown_key}", {}, "{unknown_key}"), ("{key}{{escaped_key}}", {}, "{key}{{escaped_key}}"), ("{{escaped_key}}", {"escaped_key": 1234}, "{{escaped_key}}"), ], ) def test_partial_format(fmt, items_map, expected_result): assert partial_format(fmt, items_map) == expected_result def test_chunk_file_wrapper(): cfw = ChunkIteratorFileWrapper(iter([b"abc", b"def"])) assert cfw.read(2) == b"ab" assert cfw.read(50) == b"cdef" assert cfw.exhausted cfw = ChunkIteratorFileWrapper(iter([])) assert cfw.read(2) == b"" assert cfw.exhausted def test_chunkit(): it = chunkit("abcdefg", 3) assert next(it) == ["a", "b", "c"] assert next(it) == ["d", "e", "f"] assert next(it) == ["g"] with pytest.raises(StopIteration): next(it) with pytest.raises(StopIteration): next(it) it = chunkit("ab", 3) assert list(it) == [["a", "b"]] it = chunkit("", 3) assert list(it) == [] def test_clean_lines(): conf = """\ #comment data1 #data1 data2 data3 """.splitlines( keepends=True ) assert list(clean_lines(conf)) == ["data1 #data1", "data2", "data3"] assert list(clean_lines(conf, lstrip=False)) == ["data1 #data1", "data2", " data3"] assert list(clean_lines(conf, rstrip=False)) == ["data1 #data1\n", "data2\n", "data3\n"] assert list(clean_lines(conf, remove_empty=False)) == ["data1 #data1", "data2", "", "data3"] assert list(clean_lines(conf, remove_comments=False)) == ["#comment", "data1 #data1", "data2", "data3"] def test_format_line(): data = dict(foo="bar baz") assert format_line("", data) == "" assert format_line("{foo}", data) == "bar baz" assert format_line("foo{foo}foo", data) == "foobar bazfoo" def test_format_line_erroneous(): data = dict() with pytest.raises(PlaceholderError): assert format_line("{invalid}", data) with pytest.raises(PlaceholderError): assert format_line("{}", data) with pytest.raises(PlaceholderError): assert format_line("{now!r}", data) with pytest.raises(PlaceholderError): assert format_line("{now.__class__.__module__.__builtins__}", data) def test_replace_placeholders(): replace_placeholders.reset() # avoid overrides are spoiled by previous tests now = datetime.now() assert " " not in replace_placeholders("{now}") assert int(replace_placeholders("{now:%Y}")) == now.year def test_override_placeholders(): assert replace_placeholders("{uuid4}", overrides={"uuid4": "overridden"}) == "overridden" def working_swidth(): return platform.swidth("선") == 2 @pytest.mark.skipif(not working_swidth(), reason="swidth() is not supported / active") def test_swidth_slice(): string = "나윤선나윤선나윤선나윤선나윤선" assert swidth_slice(string, 1) == "" assert swidth_slice(string, -1) == "" assert swidth_slice(string, 4) == "나윤" assert swidth_slice(string, -4) == "윤선" @pytest.mark.skipif(not working_swidth(), reason="swidth() is not supported / active") def test_swidth_slice_mixed_characters(): string = "나윤a선나윤선나윤선나윤선나윤선" assert swidth_slice(string, 5) == "나윤a" assert swidth_slice(string, 6) == "나윤a" def utcfromtimestamp(timestamp): """Returns a naive datetime instance representing the timestamp in the UTC timezone""" return datetime.fromtimestamp(timestamp, timezone.utc).replace(tzinfo=None) def test_safe_timestamps(): if SUPPORT_32BIT_PLATFORMS: # ns fit into int64 assert safe_ns(2**64) <= 2**63 - 1 assert safe_ns(-1) == 0 # s fit into int32 assert safe_s(2**64) <= 2**31 - 1 assert safe_s(-1) == 0 # datetime won't fall over its y10k problem beyond_y10k = 2**100 with pytest.raises(OverflowError): utcfromtimestamp(beyond_y10k) assert utcfromtimestamp(safe_s(beyond_y10k)) > datetime(2038, 1, 1) assert utcfromtimestamp(safe_ns(beyond_y10k) / 1000000000) > datetime(2038, 1, 1) else: # ns fit into int64 assert safe_ns(2**64) <= 2**63 - 1 assert safe_ns(-1) == 0 # s are so that their ns conversion fits into int64 assert safe_s(2**64) * 1000000000 <= 2**63 - 1 assert safe_s(-1) == 0 # datetime won't fall over its y10k problem beyond_y10k = 2**100 with pytest.raises(OverflowError): utcfromtimestamp(beyond_y10k) assert utcfromtimestamp(safe_s(beyond_y10k)) > datetime(2262, 1, 1) assert utcfromtimestamp(safe_ns(beyond_y10k) / 1000000000) > datetime(2262, 1, 1) class TestPopenWithErrorHandling: @pytest.mark.skipif(not shutil.which("test"), reason='"test" binary is needed') def test_simple(self): proc = popen_with_error_handling("test 1") assert proc.wait() == 0 @pytest.mark.skipif( shutil.which("borg-foobar-test-notexist"), reason='"borg-foobar-test-notexist" binary exists (somehow?)' ) def test_not_found(self): proc = popen_with_error_handling("borg-foobar-test-notexist 1234") assert proc is None @pytest.mark.parametrize("cmd", ('mismatched "quote', 'foo --bar="baz', "")) def test_bad_syntax(self, cmd): proc = popen_with_error_handling(cmd) assert proc is None def test_shell(self): with pytest.raises(AssertionError): popen_with_error_handling("", shell=True) def test_dash_open(): assert dash_open("-", "r") is sys.stdin assert dash_open("-", "w") is sys.stdout assert dash_open("-", "rb") is sys.stdin.buffer assert dash_open("-", "wb") is sys.stdout.buffer def test_iter_separated(): # newline and utf-8 sep, items = "\n", ["foo", "bar/baz", "αáčő"] fd = StringIO(sep.join(items)) assert list(iter_separated(fd)) == items # null and bogus ending sep, items = "\0", ["foo/bar", "baz", "spam"] fd = StringIO(sep.join(items) + "\0") assert list(iter_separated(fd, sep=sep)) == ["foo/bar", "baz", "spam"] # multichar sep, items = "SEP", ["foo/bar", "baz", "spam"] fd = StringIO(sep.join(items)) assert list(iter_separated(fd, sep=sep)) == items # bytes sep, items = b"\n", [b"foo", b"blop\t", b"gr\xe4ezi"] fd = BytesIO(sep.join(items)) assert list(iter_separated(fd)) == items def test_eval_escapes(): assert eval_escapes("\\n\\0\\x23") == "\n\0#" assert eval_escapes("äç\\n") == "äç\n" @pytest.mark.skipif(not are_hardlinks_supported(), reason="hardlinks not supported") def test_safe_unlink_is_safe(tmpdir): contents = b"Hello, world\n" victim = tmpdir / "victim" victim.write_binary(contents) hard_link = tmpdir / "hardlink" os.link(str(victim), str(hard_link)) # hard_link.mklinkto is not implemented on win32 safe_unlink(hard_link) assert victim.read_binary() == contents @pytest.mark.skipif(not are_hardlinks_supported(), reason="hardlinks not supported") def test_safe_unlink_is_safe_ENOSPC(tmpdir, monkeypatch): contents = b"Hello, world\n" victim = tmpdir / "victim" victim.write_binary(contents) hard_link = tmpdir / "hardlink" os.link(str(victim), str(hard_link)) # hard_link.mklinkto is not implemented on win32 def os_unlink(_): raise OSError(errno.ENOSPC, "Pretend that we ran out of space") monkeypatch.setattr(os, "unlink", os_unlink) with pytest.raises(OSError): safe_unlink(hard_link) assert victim.read_binary() == contents class TestPassphrase: def test_passphrase_new_verification(self, capsys, monkeypatch): monkeypatch.setattr(getpass, "getpass", lambda prompt: "1234aöäü") monkeypatch.setenv("BORG_DISPLAY_PASSPHRASE", "no") Passphrase.new() out, err = capsys.readouterr() assert "1234" not in out assert "1234" not in err monkeypatch.setenv("BORG_DISPLAY_PASSPHRASE", "yes") passphrase = Passphrase.new() out, err = capsys.readouterr() assert "3132333461c3b6c3a4c3bc" not in out assert "3132333461c3b6c3a4c3bc" in err assert passphrase == "1234aöäü" monkeypatch.setattr(getpass, "getpass", lambda prompt: "1234/@=") Passphrase.new() out, err = capsys.readouterr() assert "1234/@=" not in out assert "1234/@=" in err def test_passphrase_new_empty(self, capsys, monkeypatch): monkeypatch.delenv("BORG_PASSPHRASE", False) monkeypatch.setattr(getpass, "getpass", lambda prompt: "") with pytest.raises(PasswordRetriesExceeded): Passphrase.new(allow_empty=False) out, err = capsys.readouterr() assert "must not be blank" in err def test_passphrase_new_retries(self, monkeypatch): monkeypatch.delenv("BORG_PASSPHRASE", False) ascending_numbers = iter(range(20)) monkeypatch.setattr(getpass, "getpass", lambda prompt: str(next(ascending_numbers))) with pytest.raises(PasswordRetriesExceeded): Passphrase.new() def test_passphrase_repr(self): assert "secret" not in repr(Passphrase("secret")) @pytest.mark.parametrize( "ec_range,ec_class", ( # inclusive range start, exclusive range end ((0, 1), "success"), ((1, 2), "warning"), ((2, 3), "error"), ((EXIT_ERROR_BASE, EXIT_WARNING_BASE), "error"), ((EXIT_WARNING_BASE, EXIT_SIGNAL_BASE), "warning"), ((EXIT_SIGNAL_BASE, 256), "signal"), ), ) def test_classify_ec(ec_range, ec_class): for ec in range(*ec_range): classify_ec(ec) == ec_class def test_ec_invalid(): with pytest.raises(ValueError): classify_ec(666) with pytest.raises(ValueError): classify_ec(-1) with pytest.raises(TypeError): classify_ec(None) @pytest.mark.parametrize( "ec1,ec2,ec_max", ( # same for modern / legacy (EXIT_SUCCESS, EXIT_SUCCESS, EXIT_SUCCESS), (EXIT_SUCCESS, EXIT_SIGNAL_BASE, EXIT_SIGNAL_BASE), # legacy exit codes (EXIT_SUCCESS, EXIT_WARNING, EXIT_WARNING), (EXIT_SUCCESS, EXIT_ERROR, EXIT_ERROR), (EXIT_WARNING, EXIT_SUCCESS, EXIT_WARNING), (EXIT_WARNING, EXIT_WARNING, EXIT_WARNING), (EXIT_WARNING, EXIT_ERROR, EXIT_ERROR), (EXIT_WARNING, EXIT_SIGNAL_BASE, EXIT_SIGNAL_BASE), (EXIT_ERROR, EXIT_SUCCESS, EXIT_ERROR), (EXIT_ERROR, EXIT_WARNING, EXIT_ERROR), (EXIT_ERROR, EXIT_ERROR, EXIT_ERROR), (EXIT_ERROR, EXIT_SIGNAL_BASE, EXIT_SIGNAL_BASE), # some modern codes (EXIT_SUCCESS, EXIT_WARNING_BASE, EXIT_WARNING_BASE), (EXIT_SUCCESS, EXIT_ERROR_BASE, EXIT_ERROR_BASE), (EXIT_WARNING_BASE, EXIT_SUCCESS, EXIT_WARNING_BASE), (EXIT_WARNING_BASE + 1, EXIT_WARNING_BASE + 2, EXIT_WARNING_BASE + 1), (EXIT_WARNING_BASE, EXIT_ERROR_BASE, EXIT_ERROR_BASE), (EXIT_WARNING_BASE, EXIT_SIGNAL_BASE, EXIT_SIGNAL_BASE), (EXIT_ERROR_BASE, EXIT_SUCCESS, EXIT_ERROR_BASE), (EXIT_ERROR_BASE, EXIT_WARNING_BASE, EXIT_ERROR_BASE), (EXIT_ERROR_BASE + 1, EXIT_ERROR_BASE + 2, EXIT_ERROR_BASE + 1), (EXIT_ERROR_BASE, EXIT_SIGNAL_BASE, EXIT_SIGNAL_BASE), ), ) def test_max_ec(ec1, ec2, ec_max): assert max_ec(ec1, ec2) == ec_max