From dc2f2f47a893dd4a4ac465a3ed5fdceab2e000a0 Mon Sep 17 00:00:00 2001 From: Thomas Waldmann Date: Wed, 29 Jun 2022 22:51:23 +0200 Subject: [PATCH] rcreate: remove legacy encryption modes for new repos, fixes #6490 These are legacy crypto modes based on AES-CTR mode: (repokey|keyfile)[-blake2] New crypto modes with session keys and AEAD ciphers: (repokey|keyfile)[-blake2]-(aes-ocb|chacha20-poly1305) Tests needed some changes: - most used repokey/keyfile, changed to new modes - some nonce tests removed, the new crypto code does not generate the repo side nonces any more (were only used for AES-CTR) --- src/borg/archiver.py | 40 +-- src/borg/crypto/key.py | 21 +- src/borg/testsuite/archiver.py | 461 ++++++++++++++------------------- 3 files changed, 228 insertions(+), 294 deletions(-) diff --git a/src/borg/archiver.py b/src/borg/archiver.py index 8f7c4e449..1bb4c1487 100644 --- a/src/borg/archiver.py +++ b/src/borg/archiver.py @@ -48,6 +48,8 @@ try: from .compress import CompressionSpec, ZLIB, ZLIB_legacy, ObfuscateSize from .crypto.key import key_creator, key_argument_names, tam_required_file, tam_required from .crypto.key import RepoKey, KeyfileKey, Blake2RepoKey, Blake2KeyfileKey, FlexiKey + from .crypto.key import AESOCBRepoKey, CHPORepoKey, Blake2AESOCBRepoKey, Blake2CHPORepoKey + from .crypto.key import AESOCBKeyfileKey, CHPOKeyfileKey, Blake2AESOCBKeyfileKey, Blake2CHPOKeyfileKey from .crypto.keymanager import KeyManager from .helpers import EXIT_SUCCESS, EXIT_WARNING, EXIT_ERROR, EXIT_SIGNAL_BASE from .helpers import Error, NoManifestError, set_ec @@ -503,28 +505,32 @@ class Archiver: return EXIT_ERROR if args.key_mode == 'keyfile': - if isinstance(key, RepoKey): - key_new = KeyfileKey(repository) - elif isinstance(key, Blake2RepoKey): - key_new = Blake2KeyfileKey(repository) - elif isinstance(key, (KeyfileKey, Blake2KeyfileKey)): - print(f"Location already is {args.key_mode}") - return EXIT_SUCCESS + if isinstance(key, AESOCBRepoKey): + key_new = AESOCBKeyfileKey(repository) + elif isinstance(key, CHPORepoKey): + key_new = CHPOKeyfileKey(repository) + elif isinstance(key, Blake2AESOCBRepoKey): + key_new = Blake2AESOCBKeyfileKey(repository) + elif isinstance(key, Blake2CHPORepoKey): + key_new = Blake2CHPOKeyfileKey(repository) else: - raise Error("Unsupported key type") + print("Change not needed or not supported.") + return EXIT_WARNING if args.key_mode == 'repokey': - if isinstance(key, KeyfileKey): - key_new = RepoKey(repository) - elif isinstance(key, Blake2KeyfileKey): - key_new = Blake2RepoKey(repository) - elif isinstance(key, (RepoKey, Blake2RepoKey)): - print(f"Location already is {args.key_mode}") - return EXIT_SUCCESS + if isinstance(key, AESOCBKeyfileKey): + key_new = AESOCBRepoKey(repository) + elif isinstance(key, CHPOKeyfileKey): + key_new = CHPORepoKey(repository) + elif isinstance(key, Blake2AESOCBKeyfileKey): + key_new = Blake2AESOCBRepoKey(repository) + elif isinstance(key, Blake2CHPOKeyfileKey): + key_new = Blake2CHPORepoKey(repository) else: - raise Error("Unsupported key type") + print("Change not needed or not supported.") + return EXIT_WARNING for name in ('repository_id', 'enc_key', 'enc_hmac_key', 'id_key', 'chunk_seed', - 'tam_required', 'nonce_manager', 'cipher'): + 'tam_required', 'sessionid', 'cipher'): value = getattr(key, name) setattr(key_new, name, value) diff --git a/src/borg/crypto/key.py b/src/borg/crypto/key.py index 7dfda5851..e1c58c47a 100644 --- a/src/borg/crypto/key.py +++ b/src/borg/crypto/key.py @@ -98,7 +98,7 @@ def identify_key(manifest_data): if key_type == KeyType.PASSPHRASE: # legacy, see comment in KeyType class. return RepoKey - for key in AVAILABLE_KEY_TYPES: + for key in LEGACY_KEY_TYPES + AVAILABLE_KEY_TYPES: if key.TYPE == key_type: return key else: @@ -977,7 +977,7 @@ class CHPORepoKey(ID_HMAC_SHA_256, AEADKeyBase, FlexiKey): class Blake2AESOCBKeyfileKey(ID_BLAKE2b_256, AEADKeyBase, FlexiKey): TYPES_ACCEPTABLE = {KeyType.BLAKE2AESOCBKEYFILE, KeyType.BLAKE2AESOCBREPO} TYPE = KeyType.BLAKE2AESOCBKEYFILE - NAME = 'key file Blake2b AES-OCB' + NAME = 'key file BLAKE2b AES-OCB' ARG_NAME = 'keyfile-blake2-aes-ocb' STORAGE = KeyBlobStorage.KEYFILE CIPHERSUITE = AES256_OCB @@ -986,7 +986,7 @@ class Blake2AESOCBKeyfileKey(ID_BLAKE2b_256, AEADKeyBase, FlexiKey): class Blake2AESOCBRepoKey(ID_BLAKE2b_256, AEADKeyBase, FlexiKey): TYPES_ACCEPTABLE = {KeyType.BLAKE2AESOCBKEYFILE, KeyType.BLAKE2AESOCBREPO} TYPE = KeyType.BLAKE2AESOCBREPO - NAME = 'repokey Blake2b AES-OCB' + NAME = 'repokey BLAKE2b AES-OCB' ARG_NAME = 'repokey-blake2-aes-ocb' STORAGE = KeyBlobStorage.REPO CIPHERSUITE = AES256_OCB @@ -995,7 +995,7 @@ class Blake2AESOCBRepoKey(ID_BLAKE2b_256, AEADKeyBase, FlexiKey): class Blake2CHPOKeyfileKey(ID_BLAKE2b_256, AEADKeyBase, FlexiKey): TYPES_ACCEPTABLE = {KeyType.BLAKE2CHPOKEYFILE, KeyType.BLAKE2CHPOREPO} TYPE = KeyType.BLAKE2CHPOKEYFILE - NAME = 'key file Blake2b ChaCha20-Poly1305' + NAME = 'key file BLAKE2b ChaCha20-Poly1305' ARG_NAME = 'keyfile-blake2-chacha20-poly1305' STORAGE = KeyBlobStorage.KEYFILE CIPHERSUITE = CHACHA20_POLY1305 @@ -1004,16 +1004,23 @@ class Blake2CHPOKeyfileKey(ID_BLAKE2b_256, AEADKeyBase, FlexiKey): class Blake2CHPORepoKey(ID_BLAKE2b_256, AEADKeyBase, FlexiKey): TYPES_ACCEPTABLE = {KeyType.BLAKE2CHPOKEYFILE, KeyType.BLAKE2CHPOREPO} TYPE = KeyType.BLAKE2CHPOREPO - NAME = 'repokey Blake2b ChaCha20-Poly1305' + NAME = 'repokey BLAKE2b ChaCha20-Poly1305' ARG_NAME = 'repokey-blake2-chacha20-poly1305' STORAGE = KeyBlobStorage.REPO CIPHERSUITE = CHACHA20_POLY1305 +LEGACY_KEY_TYPES = ( + # legacy (AES-CTR based) crypto + KeyfileKey, RepoKey, + Blake2KeyfileKey, Blake2RepoKey, +) + AVAILABLE_KEY_TYPES = ( + # these are available encryption modes for new repositories + # not encrypted modes PlaintextKey, - KeyfileKey, RepoKey, AuthenticatedKey, - Blake2KeyfileKey, Blake2RepoKey, Blake2AuthenticatedKey, + AuthenticatedKey, Blake2AuthenticatedKey, # new crypto AESOCBKeyfileKey, AESOCBRepoKey, CHPOKeyfileKey, CHPORepoKey, diff --git a/src/borg/testsuite/archiver.py b/src/borg/testsuite/archiver.py index f7b11206a..7c34da238 100644 --- a/src/borg/testsuite/archiver.py +++ b/src/borg/testsuite/archiver.py @@ -36,7 +36,7 @@ from ..cache import Cache, LocalCache from ..chunker import has_seek_hole from ..constants import * # NOQA from ..crypto.low_level import bytes_to_long, num_cipher_blocks -from ..crypto.key import FlexiKey, RepoKey, KeyfileKey, Passphrase, TAMRequiredError +from ..crypto.key import FlexiKey, AESOCBRepoKey, AESOCBKeyfileKey, CHPOKeyfileKey, Passphrase, TAMRequiredError from ..crypto.keymanager import RepoIdMismatch, NotABorgKeyFile from ..crypto.file_integrity import FileIntegrityError from ..helpers import Location, get_security_dir @@ -59,6 +59,8 @@ from . import are_symlinks_supported, are_hardlinks_supported, are_fifos_support from .platform import fakeroot_detected, is_darwin from . import key +RK_ENCRYPTION = '--encryption=repokey-aes-ocb' +KF_ENCRYPTION = '--encryption=keyfile-chacha20-poly1305' src_dir = os.path.abspath(os.path.join(os.path.dirname(__file__), '..')) @@ -391,7 +393,7 @@ class ArchiverTestCase(ArchiverTestCaseBase): def test_basic_functionality(self): have_root = self.create_test_files() # fork required to test show-rc output - output = self.cmd(f'--repo={self.repository_location}', 'rcreate', '--encryption=repokey', '--show-version', '--show-rc', fork=True) + output = self.cmd(f'--repo={self.repository_location}', 'rcreate', RK_ENCRYPTION, '--show-version', '--show-rc', fork=True) self.assert_in('borgbackup version', output) self.assert_in('terminating with success status, rc 0', output) self.cmd(f'--repo={self.repository_location}', 'create', '--exclude-nodump', 'test', 'input') @@ -481,7 +483,7 @@ class ArchiverTestCase(ArchiverTestCaseBase): assert os.path.exists(parent_path) def test_unix_socket(self): - self.cmd(f'--repo={self.repository_location}', 'rcreate', '--encryption=repokey') + self.cmd(f'--repo={self.repository_location}', 'rcreate', RK_ENCRYPTION) try: sock = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM) sock.bind(os.path.join(self.input_path, 'unix-socket')) @@ -499,7 +501,7 @@ class ArchiverTestCase(ArchiverTestCaseBase): @pytest.mark.skipif(not are_symlinks_supported(), reason='symlinks not supported') def test_symlink_extract(self): self.create_test_files() - self.cmd(f'--repo={self.repository_location}', 'rcreate', '--encryption=repokey') + self.cmd(f'--repo={self.repository_location}', 'rcreate', RK_ENCRYPTION) self.cmd(f'--repo={self.repository_location}', 'create', 'test', 'input') with changedir('output'): self.cmd(f'--repo={self.repository_location}', 'extract', 'test') @@ -512,7 +514,7 @@ class ArchiverTestCase(ArchiverTestCaseBase): with changedir('input'): os.symlink('target', 'symlink1') os.link('symlink1', 'symlink2', follow_symlinks=False) - self.cmd(f'--repo={self.repository_location}', 'rcreate', '--encryption=repokey') + self.cmd(f'--repo={self.repository_location}', 'rcreate', RK_ENCRYPTION) self.cmd(f'--repo={self.repository_location}', 'create', 'test', 'input') with changedir('output'): output = self.cmd(f'--repo={self.repository_location}', 'extract', 'test') @@ -546,7 +548,7 @@ class ArchiverTestCase(ArchiverTestCaseBase): atime, mtime = 123456780, 234567890 have_noatime = has_noatime('input/file1') os.utime('input/file1', (atime, mtime)) - self.cmd(f'--repo={self.repository_location}', 'rcreate', '--encryption=repokey') + self.cmd(f'--repo={self.repository_location}', 'rcreate', RK_ENCRYPTION) self.cmd(f'--repo={self.repository_location}', 'create', '--atime', 'test', 'input') with changedir('output'): self.cmd(f'--repo={self.repository_location}', 'extract', 'test') @@ -566,7 +568,7 @@ class ArchiverTestCase(ArchiverTestCaseBase): birthtime, mtime, atime = 946598400, 946684800, 946771200 os.utime('input/file1', (atime, birthtime)) os.utime('input/file1', (atime, mtime)) - self.cmd(f'--repo={self.repository_location}', 'rcreate', '--encryption=repokey') + self.cmd(f'--repo={self.repository_location}', 'rcreate', RK_ENCRYPTION) self.cmd(f'--repo={self.repository_location}', 'create', 'test', 'input') with changedir('output'): self.cmd(f'--repo={self.repository_location}', 'extract', 'test') @@ -582,7 +584,7 @@ class ArchiverTestCase(ArchiverTestCaseBase): birthtime, mtime, atime = 946598400, 946684800, 946771200 os.utime('input/file1', (atime, birthtime)) os.utime('input/file1', (atime, mtime)) - self.cmd(f'--repo={self.repository_location}', 'rcreate', '--encryption=repokey') + self.cmd(f'--repo={self.repository_location}', 'rcreate', RK_ENCRYPTION) self.cmd(f'--repo={self.repository_location}', 'create', 'test', 'input', '--nobirthtime') with changedir('output'): self.cmd(f'--repo={self.repository_location}', 'extract', 'test') @@ -643,7 +645,7 @@ class ArchiverTestCase(ArchiverTestCaseBase): if sparse_support: # we could create a sparse input file, so creating a backup of it and # extracting it again (as sparse) should also work: - self.cmd(f'--repo={self.repository_location}', 'rcreate', '--encryption=repokey') + self.cmd(f'--repo={self.repository_location}', 'rcreate', RK_ENCRYPTION) self.cmd(f'--repo={self.repository_location}', 'create', 'test', 'input') with changedir(self.output_path): self.cmd(f'--repo={self.repository_location}', 'extract', 'test', '--sparse') @@ -662,7 +664,7 @@ class ArchiverTestCase(ArchiverTestCaseBase): filename = os.path.join(self.input_path, filename) with open(filename, 'wb'): pass - self.cmd(f'--repo={self.repository_location}', 'rcreate', '--encryption=repokey') + self.cmd(f'--repo={self.repository_location}', 'rcreate', RK_ENCRYPTION) self.cmd(f'--repo={self.repository_location}', 'create', 'test', 'input') for filename in filenames: with changedir('output'): @@ -672,7 +674,7 @@ class ArchiverTestCase(ArchiverTestCaseBase): def test_repository_swap_detection(self): self.create_test_files() os.environ['BORG_PASSPHRASE'] = 'passphrase' - self.cmd(f'--repo={self.repository_location}', 'rcreate', '--encryption=repokey') + self.cmd(f'--repo={self.repository_location}', 'rcreate', RK_ENCRYPTION) repository_id = self._extract_repository_id(self.repository_path) self.cmd(f'--repo={self.repository_location}', 'create', 'test', 'input') shutil.rmtree(self.repository_path) @@ -689,7 +691,7 @@ class ArchiverTestCase(ArchiverTestCaseBase): self.create_test_files() self.cmd(f'--repo={self.repository_location}_unencrypted', 'rcreate', '--encryption=none') os.environ['BORG_PASSPHRASE'] = 'passphrase' - self.cmd(f'--repo={self.repository_location}_encrypted', 'rcreate', '--encryption=repokey') + self.cmd(f'--repo={self.repository_location}_encrypted', 'rcreate', RK_ENCRYPTION) self.cmd(f'--repo={self.repository_location}_encrypted', 'create', 'test', 'input') shutil.rmtree(self.repository_path + '_encrypted') os.rename(self.repository_path + '_unencrypted', self.repository_path + '_encrypted') @@ -702,7 +704,7 @@ class ArchiverTestCase(ArchiverTestCaseBase): def test_repository_swap_detection_no_cache(self): self.create_test_files() os.environ['BORG_PASSPHRASE'] = 'passphrase' - self.cmd(f'--repo={self.repository_location}', 'rcreate', '--encryption=repokey') + self.cmd(f'--repo={self.repository_location}', 'rcreate', RK_ENCRYPTION) repository_id = self._extract_repository_id(self.repository_path) self.cmd(f'--repo={self.repository_location}', 'create', 'test', 'input') shutil.rmtree(self.repository_path) @@ -720,7 +722,7 @@ class ArchiverTestCase(ArchiverTestCaseBase): self.create_test_files() self.cmd(f'--repo={self.repository_location}_unencrypted', 'rcreate', '--encryption=none') os.environ['BORG_PASSPHRASE'] = 'passphrase' - self.cmd(f'--repo={self.repository_location}_encrypted', 'rcreate', '--encryption=repokey') + self.cmd(f'--repo={self.repository_location}_encrypted', 'rcreate', RK_ENCRYPTION) self.cmd(f'--repo={self.repository_location}_encrypted', 'create', 'test', 'input') self.cmd(f'--repo={self.repository_location}_unencrypted', 'rdelete', '--cache-only') self.cmd(f'--repo={self.repository_location}_encrypted', 'rdelete', '--cache-only') @@ -736,12 +738,12 @@ class ArchiverTestCase(ArchiverTestCaseBase): # Check that a repokey repo with a blank passphrase is considered like a plaintext repo. self.create_test_files() # User initializes her repository with her passphrase - self.cmd(f'--repo={self.repository_location}', 'rcreate', '--encryption=repokey') + self.cmd(f'--repo={self.repository_location}', 'rcreate', RK_ENCRYPTION) self.cmd(f'--repo={self.repository_location}', 'create', 'test', 'input') # Attacker replaces it with her own repository, which is encrypted but has no passphrase set shutil.rmtree(self.repository_path) with environment_variable(BORG_PASSPHRASE=''): - self.cmd(f'--repo={self.repository_location}', 'rcreate', '--encryption=repokey') + self.cmd(f'--repo={self.repository_location}', 'rcreate', RK_ENCRYPTION) # Delete cache & security database, AKA switch to user perspective self.cmd(f'--repo={self.repository_location}', 'rdelete', '--cache-only') shutil.rmtree(self.get_security_dir()) @@ -756,7 +758,7 @@ class ArchiverTestCase(ArchiverTestCaseBase): self.cmd(f'--repo={self.repository_location}', 'create', 'test.2', 'input') def test_repository_move(self): - self.cmd(f'--repo={self.repository_location}', 'rcreate', '--encryption=repokey') + self.cmd(f'--repo={self.repository_location}', 'rcreate', RK_ENCRYPTION) security_dir = self.get_security_dir() os.rename(self.repository_path, self.repository_path + '_new') with environment_variable(BORG_RELOCATED_REPO_ACCESS_IS_OK='yes'): @@ -774,7 +776,7 @@ class ArchiverTestCase(ArchiverTestCaseBase): assert os.path.exists(os.path.join(security_dir, file)) def test_security_dir_compat(self): - self.cmd(f'--repo={self.repository_location}', 'rcreate', '--encryption=repokey') + self.cmd(f'--repo={self.repository_location}', 'rcreate', RK_ENCRYPTION) with open(os.path.join(self.get_security_dir(), 'location'), 'w') as fd: fd.write('something outdated') # This is fine, because the cache still has the correct information. security_dir and cache can disagree @@ -802,7 +804,7 @@ class ArchiverTestCase(ArchiverTestCaseBase): self.cmd(f'--repo={self.repository_location}', 'rinfo') def test_strip_components(self): - self.cmd(f'--repo={self.repository_location}', 'rcreate', '--encryption=repokey') + self.cmd(f'--repo={self.repository_location}', 'rcreate', RK_ENCRYPTION) self.create_regular_file('dir/file') self.cmd(f'--repo={self.repository_location}', 'create', 'test', 'input') with changedir('output'): @@ -831,7 +833,7 @@ class ArchiverTestCase(ArchiverTestCaseBase): os.link(os.path.join(self.input_path, 'dir1/source2'), os.path.join(self.input_path, 'dir1/aaaa')) - self.cmd(f'--repo={self.repository_location}', 'rcreate', '--encryption=repokey') + self.cmd(f'--repo={self.repository_location}', 'rcreate', RK_ENCRYPTION) self.cmd(f'--repo={self.repository_location}', 'create', 'test', 'input') @requires_hardlinks @@ -921,7 +923,7 @@ class ArchiverTestCase(ArchiverTestCaseBase): assert os.stat('input/b/hardlink').st_nlink == 2 def test_extract_include_exclude(self): - self.cmd(f'--repo={self.repository_location}', 'rcreate', '--encryption=repokey') + self.cmd(f'--repo={self.repository_location}', 'rcreate', RK_ENCRYPTION) self.create_regular_file('file1', size=1024 * 80) self.create_regular_file('file2', size=1024 * 80) self.create_regular_file('file3', size=1024 * 80) @@ -938,7 +940,7 @@ class ArchiverTestCase(ArchiverTestCaseBase): self.assert_equal(sorted(os.listdir('output/input')), ['file1', 'file3']) def test_extract_include_exclude_regex(self): - self.cmd(f'--repo={self.repository_location}', 'rcreate', '--encryption=repokey') + self.cmd(f'--repo={self.repository_location}', 'rcreate', RK_ENCRYPTION) self.create_regular_file('file1', size=1024 * 80) self.create_regular_file('file2', size=1024 * 80) self.create_regular_file('file3', size=1024 * 80) @@ -971,7 +973,7 @@ class ArchiverTestCase(ArchiverTestCaseBase): self.assert_equal(sorted(os.listdir('output/input')), ['file3']) def test_extract_include_exclude_regex_from_file(self): - self.cmd(f'--repo={self.repository_location}', 'rcreate', '--encryption=repokey') + self.cmd(f'--repo={self.repository_location}', 'rcreate', RK_ENCRYPTION) self.create_regular_file('file1', size=1024 * 80) self.create_regular_file('file2', size=1024 * 80) self.create_regular_file('file3', size=1024 * 80) @@ -1011,7 +1013,7 @@ class ArchiverTestCase(ArchiverTestCaseBase): self.assert_equal(sorted(os.listdir('output/input')), ['file3']) def test_extract_with_pattern(self): - self.cmd(f'--repo={self.repository_location}', 'rcreate', '--encryption=repokey') + self.cmd(f'--repo={self.repository_location}', 'rcreate', RK_ENCRYPTION) self.create_regular_file("file1", size=1024 * 80) self.create_regular_file("file2", size=1024 * 80) self.create_regular_file("file3", size=1024 * 80) @@ -1044,7 +1046,7 @@ class ArchiverTestCase(ArchiverTestCaseBase): self.assert_equal(sorted(os.listdir("output/input")), ["file1", "file2", "file333"]) def test_extract_list_output(self): - self.cmd(f'--repo={self.repository_location}', 'rcreate', '--encryption=repokey') + self.cmd(f'--repo={self.repository_location}', 'rcreate', RK_ENCRYPTION) self.create_regular_file('file', size=1024 * 80) self.cmd(f'--repo={self.repository_location}', 'create', 'test', 'input') @@ -1069,7 +1071,7 @@ class ArchiverTestCase(ArchiverTestCaseBase): self.assert_in("input/file", output) def test_extract_progress(self): - self.cmd(f'--repo={self.repository_location}', 'rcreate', '--encryption=repokey') + self.cmd(f'--repo={self.repository_location}', 'rcreate', RK_ENCRYPTION) self.create_regular_file('file', size=1024 * 80) self.cmd(f'--repo={self.repository_location}', 'create', 'test', 'input') @@ -1078,7 +1080,7 @@ class ArchiverTestCase(ArchiverTestCaseBase): assert 'Extracting:' in output def _create_test_caches(self): - self.cmd(f'--repo={self.repository_location}', 'rcreate', '--encryption=repokey') + self.cmd(f'--repo={self.repository_location}', 'rcreate', RK_ENCRYPTION) self.create_regular_file('file1', size=1024 * 80) self.create_regular_file('cache1/%s' % CACHE_TAG_NAME, contents=CACHE_TAG_CONTENTS + b' extra stuff') @@ -1092,7 +1094,7 @@ class ArchiverTestCase(ArchiverTestCaseBase): contents=CACHE_TAG_CONTENTS + b' extra stuff') def test_create_stdin(self): - self.cmd(f'--repo={self.repository_location}', 'rcreate', '--encryption=repokey') + self.cmd(f'--repo={self.repository_location}', 'rcreate', RK_ENCRYPTION) input_data = b'\x00foo\n\nbar\n \n' self.cmd(f'--repo={self.repository_location}', 'create', 'test', '-', input=input_data) item = json.loads(self.cmd(f'--repo={self.repository_location}', 'list', 'test', '--json-lines')) @@ -1104,7 +1106,7 @@ class ArchiverTestCase(ArchiverTestCaseBase): assert extracted_data == input_data def test_create_content_from_command(self): - self.cmd(f'--repo={self.repository_location}', 'rcreate', '--encryption=repokey') + self.cmd(f'--repo={self.repository_location}', 'rcreate', RK_ENCRYPTION) input_data = 'some test content' name = 'a/b/c' self.cmd(f'--repo={self.repository_location}', 'create', '--stdin-name', name, '--content-from-command', @@ -1118,7 +1120,7 @@ class ArchiverTestCase(ArchiverTestCaseBase): assert extracted_data == input_data + '\n' def test_create_content_from_command_with_failed_command(self): - self.cmd(f'--repo={self.repository_location}', 'rcreate', '--encryption=repokey') + self.cmd(f'--repo={self.repository_location}', 'rcreate', RK_ENCRYPTION) output = self.cmd(f'--repo={self.repository_location}', 'create', '--content-from-command', 'test', '--', 'sh', '-c', 'exit 73;', exit_code=2) assert output.endswith("Command 'sh' exited with status 73\n") @@ -1126,12 +1128,12 @@ class ArchiverTestCase(ArchiverTestCaseBase): assert archive_list['archives'] == [] def test_create_content_from_command_missing_command(self): - self.cmd(f'--repo={self.repository_location}', 'rcreate', '--encryption=repokey') + self.cmd(f'--repo={self.repository_location}', 'rcreate', RK_ENCRYPTION) output = self.cmd(f'--repo={self.repository_location}', 'create', 'test', '--content-from-command', exit_code=2) assert output.endswith('No command given.\n') def test_create_paths_from_stdin(self): - self.cmd(f'--repo={self.repository_location}', 'rcreate', '--encryption=repokey') + self.cmd(f'--repo={self.repository_location}', 'rcreate', RK_ENCRYPTION) self.create_regular_file("file1", size=1024 * 80) self.create_regular_file("dir1/file2", size=1024 * 80) self.create_regular_file("dir1/file3", size=1024 * 80) @@ -1145,7 +1147,7 @@ class ArchiverTestCase(ArchiverTestCaseBase): assert paths == ['input/file1', 'input/dir1', 'input/file4'] def test_create_paths_from_command(self): - self.cmd(f'--repo={self.repository_location}', 'rcreate', '--encryption=repokey') + self.cmd(f'--repo={self.repository_location}', 'rcreate', RK_ENCRYPTION) self.create_regular_file("file1", size=1024 * 80) self.create_regular_file("file2", size=1024 * 80) self.create_regular_file("file3", size=1024 * 80) @@ -1159,7 +1161,7 @@ class ArchiverTestCase(ArchiverTestCaseBase): assert paths == ['input/file1', 'input/file2', 'input/file3'] def test_create_paths_from_command_with_failed_command(self): - self.cmd(f'--repo={self.repository_location}', 'rcreate', '--encryption=repokey') + self.cmd(f'--repo={self.repository_location}', 'rcreate', RK_ENCRYPTION) output = self.cmd(f'--repo={self.repository_location}', 'create', '--paths-from-command', 'test', '--', 'sh', '-c', 'exit 73;', exit_code=2) assert output.endswith("Command 'sh' exited with status 73\n") @@ -1167,18 +1169,18 @@ class ArchiverTestCase(ArchiverTestCaseBase): assert archive_list['archives'] == [] def test_create_paths_from_command_missing_command(self): - self.cmd(f'--repo={self.repository_location}', 'rcreate', '--encryption=repokey') + self.cmd(f'--repo={self.repository_location}', 'rcreate', RK_ENCRYPTION) output = self.cmd(f'--repo={self.repository_location}', 'create', 'test', '--paths-from-command', exit_code=2) assert output.endswith('No command given.\n') def test_create_without_root(self): """test create without a root""" - self.cmd(f'--repo={self.repository_location}', 'rcreate', '--encryption=repokey') + self.cmd(f'--repo={self.repository_location}', 'rcreate', RK_ENCRYPTION) self.cmd(f'--repo={self.repository_location}', 'create', 'test', exit_code=2) def test_create_pattern_root(self): """test create with only a root pattern""" - self.cmd(f'--repo={self.repository_location}', 'rcreate', '--encryption=repokey') + self.cmd(f'--repo={self.repository_location}', 'rcreate', RK_ENCRYPTION) self.create_regular_file('file1', size=1024 * 80) self.create_regular_file('file2', size=1024 * 80) output = self.cmd(f'--repo={self.repository_location}', 'create', 'test', '-v', '--list', '--pattern=R input') @@ -1187,7 +1189,7 @@ class ArchiverTestCase(ArchiverTestCaseBase): def test_create_pattern(self): """test file patterns during create""" - self.cmd(f'--repo={self.repository_location}', 'rcreate', '--encryption=repokey') + self.cmd(f'--repo={self.repository_location}', 'rcreate', RK_ENCRYPTION) self.create_regular_file('file1', size=1024 * 80) self.create_regular_file('file2', size=1024 * 80) self.create_regular_file('file_important', size=1024 * 80) @@ -1200,7 +1202,7 @@ class ArchiverTestCase(ArchiverTestCaseBase): def test_create_pattern_file(self): """test file patterns during create""" - self.cmd(f'--repo={self.repository_location}', 'rcreate', '--encryption=repokey') + self.cmd(f'--repo={self.repository_location}', 'rcreate', RK_ENCRYPTION) self.create_regular_file('file1', size=1024 * 80) self.create_regular_file('file2', size=1024 * 80) self.create_regular_file('otherfile', size=1024 * 80) @@ -1219,7 +1221,7 @@ class ArchiverTestCase(ArchiverTestCaseBase): with open(self.patterns_file_path2, 'wb') as fd: fd.write(b'+ input/x/b\n- input/x*\n') - self.cmd(f'--repo={self.repository_location}', 'rcreate', '--encryption=repokey') + self.cmd(f'--repo={self.repository_location}', 'rcreate', RK_ENCRYPTION) self.create_regular_file('x/a/foo_a', size=1024 * 80) self.create_regular_file('x/b/foo_b', size=1024 * 80) self.create_regular_file('y/foo_y', size=1024 * 80) @@ -1236,7 +1238,7 @@ class ArchiverTestCase(ArchiverTestCaseBase): with open(self.patterns_file_path2, 'wb') as fd: fd.write(b'+ input/x/b\n! input/x*\n') - self.cmd(f'--repo={self.repository_location}', 'rcreate', '--encryption=repokey') + self.cmd(f'--repo={self.repository_location}', 'rcreate', RK_ENCRYPTION) self.create_regular_file('x/a/foo_a', size=1024 * 80) self.create_regular_file('x/b/foo_b', size=1024 * 80) self.create_regular_file('y/foo_y', size=1024 * 80) @@ -1253,7 +1255,7 @@ class ArchiverTestCase(ArchiverTestCaseBase): with open(self.patterns_file_path2, 'wb') as fd: fd.write(b'+ input/x/a\n+ input/x/b\n- input/x*\n') - self.cmd(f'--repo={self.repository_location}', 'rcreate', '--encryption=repokey') + self.cmd(f'--repo={self.repository_location}', 'rcreate', RK_ENCRYPTION) self.create_regular_file('x/a/foo_a', size=1024 * 80) self.create_regular_file('x/b/foo_b', size=1024 * 80) @@ -1274,7 +1276,7 @@ class ArchiverTestCase(ArchiverTestCaseBase): def test_create_no_cache_sync(self): self.create_test_files() - self.cmd(f'--repo={self.repository_location}', 'rcreate', '--encryption=repokey') + self.cmd(f'--repo={self.repository_location}', 'rcreate', RK_ENCRYPTION) self.cmd(f'--repo={self.repository_location}', 'rdelete', '--cache-only') create_json = json.loads(self.cmd(f'--repo={self.repository_location}', 'create', '--no-cache-sync', '--json', '--error', @@ -1289,7 +1291,7 @@ class ArchiverTestCase(ArchiverTestCaseBase): self.cmd(f'--repo={self.repository_location}', 'check') def test_extract_pattern_opt(self): - self.cmd(f'--repo={self.repository_location}', 'rcreate', '--encryption=repokey') + self.cmd(f'--repo={self.repository_location}', 'rcreate', RK_ENCRYPTION) self.create_regular_file('file1', size=1024 * 80) self.create_regular_file('file2', size=1024 * 80) self.create_regular_file('file_important', size=1024 * 80) @@ -1317,7 +1319,7 @@ class ArchiverTestCase(ArchiverTestCaseBase): self._assert_test_caches() def _create_test_tagged(self): - self.cmd(f'--repo={self.repository_location}', 'rcreate', '--encryption=repokey') + self.cmd(f'--repo={self.repository_location}', 'rcreate', RK_ENCRYPTION) self.create_regular_file('file1', size=1024 * 80) self.create_regular_file('tagged1/.NOBACKUP') self.create_regular_file('tagged2/00-NOBACKUP') @@ -1342,7 +1344,7 @@ class ArchiverTestCase(ArchiverTestCaseBase): self._assert_test_tagged() def _create_test_keep_tagged(self): - self.cmd(f'--repo={self.repository_location}', 'rcreate', '--encryption=repokey') + self.cmd(f'--repo={self.repository_location}', 'rcreate', RK_ENCRYPTION) self.create_regular_file('file0', size=1024) self.create_regular_file('tagged1/.NOBACKUP1') self.create_regular_file('tagged1/file1', size=1024) @@ -1408,7 +1410,7 @@ class ArchiverTestCase(ArchiverTestCaseBase): capabilities = b'\x01\x00\x00\x02\x00 \x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00' self.create_regular_file('file') xattr.setxattr(b'input/file', b'security.capability', capabilities) - self.cmd(f'--repo={self.repository_location}', 'rcreate', '--encryption=repokey') + self.cmd(f'--repo={self.repository_location}', 'rcreate', RK_ENCRYPTION) self.cmd(f'--repo={self.repository_location}', 'create', 'test', 'input') with changedir('output'): with patch.object(os, 'fchown', patched_fchown): @@ -1447,7 +1449,7 @@ class ArchiverTestCase(ArchiverTestCaseBase): assert os.path.isfile(input_abspath) def test_path_normalization(self): - self.cmd(f'--repo={self.repository_location}', 'rcreate', '--encryption=repokey') + self.cmd(f'--repo={self.repository_location}', 'rcreate', RK_ENCRYPTION) self.create_regular_file('dir1/dir2/file', size=1024 * 80) with changedir('input/dir1/dir2'): self.cmd(f'--repo={self.repository_location}', 'create', 'test', @@ -1457,7 +1459,7 @@ class ArchiverTestCase(ArchiverTestCaseBase): self.assert_in(' input/dir1/dir2/file', output) def test_exclude_normalization(self): - self.cmd(f'--repo={self.repository_location}', 'rcreate', '--encryption=repokey') + self.cmd(f'--repo={self.repository_location}', 'rcreate', RK_ENCRYPTION) self.create_regular_file('file1', size=1024 * 80) self.create_regular_file('file2', size=1024 * 80) with changedir('input'): @@ -1477,13 +1479,13 @@ class ArchiverTestCase(ArchiverTestCaseBase): def test_repeated_files(self): self.create_regular_file('file1', size=1024 * 80) - self.cmd(f'--repo={self.repository_location}', 'rcreate', '--encryption=repokey') + self.cmd(f'--repo={self.repository_location}', 'rcreate', RK_ENCRYPTION) self.cmd(f'--repo={self.repository_location}', 'create', 'test', 'input', 'input') def test_overwrite(self): self.create_regular_file('file1', size=1024 * 80) self.create_regular_file('dir2/file2', size=1024 * 80) - self.cmd(f'--repo={self.repository_location}', 'rcreate', '--encryption=repokey') + self.cmd(f'--repo={self.repository_location}', 'rcreate', RK_ENCRYPTION) self.cmd(f'--repo={self.repository_location}', 'create', 'test', 'input') # Overwriting regular files and directories should be supported os.mkdir('output/input') @@ -1502,7 +1504,7 @@ class ArchiverTestCase(ArchiverTestCaseBase): def test_rename(self): self.create_regular_file('file1', size=1024 * 80) self.create_regular_file('dir2/file2', size=1024 * 80) - self.cmd(f'--repo={self.repository_location}', 'rcreate', '--encryption=repokey') + self.cmd(f'--repo={self.repository_location}', 'rcreate', RK_ENCRYPTION) self.cmd(f'--repo={self.repository_location}', 'create', 'test', 'input') self.cmd(f'--repo={self.repository_location}', 'create', 'test.2', 'input') self.cmd(f'--repo={self.repository_location}', 'extract', 'test', '--dry-run') @@ -1521,7 +1523,7 @@ class ArchiverTestCase(ArchiverTestCaseBase): def test_info(self): self.create_regular_file('file1', size=1024 * 80) - self.cmd(f'--repo={self.repository_location}', 'rcreate', '--encryption=repokey') + self.cmd(f'--repo={self.repository_location}', 'rcreate', RK_ENCRYPTION) self.cmd(f'--repo={self.repository_location}', 'create', 'test', 'input') info_repo = self.cmd(f'--repo={self.repository_location}', 'rinfo') assert 'Original size:' in info_repo @@ -1532,14 +1534,14 @@ class ArchiverTestCase(ArchiverTestCaseBase): def test_info_json(self): self.create_regular_file('file1', size=1024 * 80) - self.cmd(f'--repo={self.repository_location}', 'rcreate', '--encryption=repokey') + self.cmd(f'--repo={self.repository_location}', 'rcreate', RK_ENCRYPTION) self.cmd(f'--repo={self.repository_location}', 'create', 'test', 'input') info_repo = json.loads(self.cmd(f'--repo={self.repository_location}', 'rinfo', '--json')) repository = info_repo['repository'] assert len(repository['id']) == 64 assert 'last_modified' in repository assert datetime.strptime(repository['last_modified'], ISO_FORMAT) # must not raise - assert info_repo['encryption']['mode'] == 'repokey' + assert info_repo['encryption']['mode'] == RK_ENCRYPTION[13:] assert 'keyfile' not in info_repo['encryption'] cache = info_repo['cache'] stats = cache['stats'] @@ -1562,7 +1564,7 @@ class ArchiverTestCase(ArchiverTestCaseBase): def test_info_json_of_empty_archive(self): """See https://github.com/borgbackup/borg/issues/6120""" - self.cmd(f'--repo={self.repository_location}', 'rcreate', '--encryption=repokey') + self.cmd(f'--repo={self.repository_location}', 'rcreate', RK_ENCRYPTION) info_repo = json.loads(self.cmd(f'--repo={self.repository_location}', 'info', '--json', '--first=1')) assert info_repo["archives"] == [] info_repo = json.loads(self.cmd(f'--repo={self.repository_location}', 'info', '--json', '--last=1')) @@ -1570,7 +1572,7 @@ class ArchiverTestCase(ArchiverTestCaseBase): def test_comment(self): self.create_regular_file('file1', size=1024 * 80) - self.cmd(f'--repo={self.repository_location}', 'rcreate', '--encryption=repokey') + self.cmd(f'--repo={self.repository_location}', 'rcreate', RK_ENCRYPTION) self.cmd(f'--repo={self.repository_location}', 'create', 'test1', 'input') self.cmd(f'--repo={self.repository_location}', 'create', 'test2', 'input', '--comment', 'this is the comment') self.cmd(f'--repo={self.repository_location}', 'create', 'test3', 'input', '--comment', '"deleted" comment') @@ -1590,7 +1592,7 @@ class ArchiverTestCase(ArchiverTestCaseBase): def test_delete(self): self.create_regular_file('file1', size=1024 * 80) self.create_regular_file('dir2/file2', size=1024 * 80) - self.cmd(f'--repo={self.repository_location}', 'rcreate', '--encryption=repokey') + self.cmd(f'--repo={self.repository_location}', 'rcreate', RK_ENCRYPTION) self.cmd(f'--repo={self.repository_location}', 'create', 'test', 'input') self.cmd(f'--repo={self.repository_location}', 'create', 'test.2', 'input') self.cmd(f'--repo={self.repository_location}', 'create', 'test.3', 'input') @@ -1610,7 +1612,7 @@ class ArchiverTestCase(ArchiverTestCaseBase): def test_delete_multiple(self): self.create_regular_file('file1', size=1024 * 80) - self.cmd(f'--repo={self.repository_location}', 'rcreate', '--encryption=repokey') + self.cmd(f'--repo={self.repository_location}', 'rcreate', RK_ENCRYPTION) self.cmd(f'--repo={self.repository_location}', 'create', 'test1', 'input') self.cmd(f'--repo={self.repository_location}', 'create', 'test2', 'input') self.cmd(f'--repo={self.repository_location}', 'create', 'test3', 'input') @@ -1623,7 +1625,7 @@ class ArchiverTestCase(ArchiverTestCaseBase): def test_delete_repo(self): self.create_regular_file('file1', size=1024 * 80) self.create_regular_file('dir2/file2', size=1024 * 80) - self.cmd(f'--repo={self.repository_location}', 'rcreate', '--encryption=repokey') + self.cmd(f'--repo={self.repository_location}', 'rcreate', RK_ENCRYPTION) self.cmd(f'--repo={self.repository_location}', 'create', 'test', 'input') self.cmd(f'--repo={self.repository_location}', 'create', 'test.2', 'input') os.environ['BORG_DELETE_I_KNOW_WHAT_I_AM_DOING'] = 'no' @@ -1668,7 +1670,7 @@ class ArchiverTestCase(ArchiverTestCaseBase): self.assert_not_in('test', output) def test_corrupted_repository(self): - self.cmd(f'--repo={self.repository_location}', 'rcreate', '--encryption=repokey') + self.cmd(f'--repo={self.repository_location}', 'rcreate', RK_ENCRYPTION) self.create_src_archive('test') self.cmd(f'--repo={self.repository_location}', 'extract', 'test', '--dry-run') output = self.cmd(f'--repo={self.repository_location}', 'check', '--show-version') @@ -1683,7 +1685,7 @@ class ArchiverTestCase(ArchiverTestCaseBase): self.assert_in('Starting repository check', output) # --info given for root logger def test_readonly_check(self): - self.cmd(f'--repo={self.repository_location}', 'rcreate', '--encryption=repokey') + self.cmd(f'--repo={self.repository_location}', 'rcreate', RK_ENCRYPTION) self.create_src_archive('test') with self.read_only(self.repository_path): # verify that command normally doesn't work with read-only repo @@ -1698,7 +1700,7 @@ class ArchiverTestCase(ArchiverTestCaseBase): self.cmd(f'--repo={self.repository_location}', 'check', '--verify-data', '--bypass-lock') def test_readonly_diff(self): - self.cmd(f'--repo={self.repository_location}', 'rcreate', '--encryption=repokey') + self.cmd(f'--repo={self.repository_location}', 'rcreate', RK_ENCRYPTION) self.create_src_archive('a') self.create_src_archive('b') with self.read_only(self.repository_path): @@ -1714,7 +1716,7 @@ class ArchiverTestCase(ArchiverTestCaseBase): self.cmd(f'--repo={self.repository_location}', 'diff', 'a', 'b', '--bypass-lock') def test_readonly_export_tar(self): - self.cmd(f'--repo={self.repository_location}', 'rcreate', '--encryption=repokey') + self.cmd(f'--repo={self.repository_location}', 'rcreate', RK_ENCRYPTION) self.create_src_archive('test') with self.read_only(self.repository_path): # verify that command normally doesn't work with read-only repo @@ -1729,7 +1731,7 @@ class ArchiverTestCase(ArchiverTestCaseBase): self.cmd(f'--repo={self.repository_location}', 'export-tar', 'test', 'test.tar', '--bypass-lock') def test_readonly_extract(self): - self.cmd(f'--repo={self.repository_location}', 'rcreate', '--encryption=repokey') + self.cmd(f'--repo={self.repository_location}', 'rcreate', RK_ENCRYPTION) self.create_src_archive('test') with self.read_only(self.repository_path): # verify that command normally doesn't work with read-only repo @@ -1744,7 +1746,7 @@ class ArchiverTestCase(ArchiverTestCaseBase): self.cmd(f'--repo={self.repository_location}', 'extract', 'test', '--bypass-lock') def test_readonly_info(self): - self.cmd(f'--repo={self.repository_location}', 'rcreate', '--encryption=repokey') + self.cmd(f'--repo={self.repository_location}', 'rcreate', RK_ENCRYPTION) self.create_src_archive('test') with self.read_only(self.repository_path): # verify that command normally doesn't work with read-only repo @@ -1759,7 +1761,7 @@ class ArchiverTestCase(ArchiverTestCaseBase): self.cmd(f'--repo={self.repository_location}', 'rinfo', '--bypass-lock') def test_readonly_list(self): - self.cmd(f'--repo={self.repository_location}', 'rcreate', '--encryption=repokey') + self.cmd(f'--repo={self.repository_location}', 'rcreate', RK_ENCRYPTION) self.create_src_archive('test') with self.read_only(self.repository_path): # verify that command normally doesn't work with read-only repo @@ -1775,7 +1777,7 @@ class ArchiverTestCase(ArchiverTestCaseBase): @unittest.skipUnless(llfuse, 'llfuse not installed') def test_readonly_mount(self): - self.cmd(f'--repo={self.repository_location}', 'rcreate', '--encryption=repokey') + self.cmd(f'--repo={self.repository_location}', 'rcreate', RK_ENCRYPTION) self.create_src_archive('test') with self.read_only(self.repository_path): # verify that command normally doesn't work with read-only repo @@ -1796,13 +1798,13 @@ class ArchiverTestCase(ArchiverTestCaseBase): @pytest.mark.skipif('BORG_TESTS_IGNORE_MODES' in os.environ, reason='modes unreliable') def test_umask(self): self.create_regular_file('file1', size=1024 * 80) - self.cmd(f'--repo={self.repository_location}', 'rcreate', '--encryption=repokey') + self.cmd(f'--repo={self.repository_location}', 'rcreate', RK_ENCRYPTION) self.cmd(f'--repo={self.repository_location}', 'create', 'test', 'input') mode = os.stat(self.repository_path).st_mode self.assertEqual(stat.S_IMODE(mode), 0o700) def test_create_dry_run(self): - self.cmd(f'--repo={self.repository_location}', 'rcreate', '--encryption=repokey') + self.cmd(f'--repo={self.repository_location}', 'rcreate', RK_ENCRYPTION) self.cmd(f'--repo={self.repository_location}', 'create', '--dry-run', 'test', 'input') # Make sure no archive has been created with Repository(self.repository_path) as repository: @@ -1825,23 +1827,23 @@ class ArchiverTestCase(ArchiverTestCaseBase): assert excinfo.value.args == (['unknown-feature'],) def test_unknown_feature_on_create(self): - print(self.cmd(f'--repo={self.repository_location}', 'rcreate', '--encryption=repokey')) + print(self.cmd(f'--repo={self.repository_location}', 'rcreate', RK_ENCRYPTION)) self.add_unknown_feature(Manifest.Operation.WRITE) self.cmd_raises_unknown_feature([f'--repo={self.repository_location}', 'create', 'test', 'input']) def test_unknown_feature_on_cache_sync(self): - self.cmd(f'--repo={self.repository_location}', 'rcreate', '--encryption=repokey') + self.cmd(f'--repo={self.repository_location}', 'rcreate', RK_ENCRYPTION) self.cmd(f'--repo={self.repository_location}', 'rdelete', '--cache-only') self.add_unknown_feature(Manifest.Operation.READ) self.cmd_raises_unknown_feature([f'--repo={self.repository_location}', 'create', 'test', 'input']) def test_unknown_feature_on_change_passphrase(self): - print(self.cmd(f'--repo={self.repository_location}', 'rcreate', '--encryption=repokey')) + print(self.cmd(f'--repo={self.repository_location}', 'rcreate', RK_ENCRYPTION)) self.add_unknown_feature(Manifest.Operation.CHECK) self.cmd_raises_unknown_feature([f'--repo={self.repository_location}', 'key', 'change-passphrase']) def test_unknown_feature_on_read(self): - print(self.cmd(f'--repo={self.repository_location}', 'rcreate', '--encryption=repokey')) + print(self.cmd(f'--repo={self.repository_location}', 'rcreate', RK_ENCRYPTION)) self.cmd(f'--repo={self.repository_location}', 'create', 'test', 'input') self.add_unknown_feature(Manifest.Operation.READ) with changedir('output'): @@ -1851,13 +1853,13 @@ class ArchiverTestCase(ArchiverTestCaseBase): self.cmd_raises_unknown_feature([f'--repo={self.repository_location}', 'info', '-a', 'test']) def test_unknown_feature_on_rename(self): - print(self.cmd(f'--repo={self.repository_location}', 'rcreate', '--encryption=repokey')) + print(self.cmd(f'--repo={self.repository_location}', 'rcreate', RK_ENCRYPTION)) self.cmd(f'--repo={self.repository_location}', 'create', 'test', 'input') self.add_unknown_feature(Manifest.Operation.CHECK) self.cmd_raises_unknown_feature([f'--repo={self.repository_location}', 'rename', 'test', 'other']) def test_unknown_feature_on_delete(self): - print(self.cmd(f'--repo={self.repository_location}', 'rcreate', '--encryption=repokey')) + print(self.cmd(f'--repo={self.repository_location}', 'rcreate', RK_ENCRYPTION)) self.cmd(f'--repo={self.repository_location}', 'create', 'test', 'input') self.add_unknown_feature(Manifest.Operation.DELETE) # delete of an archive raises @@ -1868,7 +1870,7 @@ class ArchiverTestCase(ArchiverTestCaseBase): @unittest.skipUnless(llfuse, 'llfuse not installed') def test_unknown_feature_on_mount(self): - self.cmd(f'--repo={self.repository_location}', 'rcreate', '--encryption=repokey') + self.cmd(f'--repo={self.repository_location}', 'rcreate', RK_ENCRYPTION) self.cmd(f'--repo={self.repository_location}', 'create', 'test', 'input') self.add_unknown_feature(Manifest.Operation.READ) mountpoint = os.path.join(self.tmpdir, 'mountpoint') @@ -1883,7 +1885,7 @@ class ArchiverTestCase(ArchiverTestCaseBase): else: path_prefix = '' - print(self.cmd(f'--repo={self.repository_location}', 'rcreate', '--encryption=repokey')) + print(self.cmd(f'--repo={self.repository_location}', 'rcreate', RK_ENCRYPTION)) with Repository(self.repository_path, exclusive=True) as repository: if path_prefix: @@ -1919,13 +1921,13 @@ class ArchiverTestCase(ArchiverTestCaseBase): def test_progress_on(self): self.create_regular_file('file1', size=1024 * 80) - self.cmd(f'--repo={self.repository_location}', 'rcreate', '--encryption=repokey') + self.cmd(f'--repo={self.repository_location}', 'rcreate', RK_ENCRYPTION) output = self.cmd(f'--repo={self.repository_location}', 'create', 'test4', 'input', '--progress') self.assert_in("\r", output) def test_progress_off(self): self.create_regular_file('file1', size=1024 * 80) - self.cmd(f'--repo={self.repository_location}', 'rcreate', '--encryption=repokey') + self.cmd(f'--repo={self.repository_location}', 'rcreate', RK_ENCRYPTION) output = self.cmd(f'--repo={self.repository_location}', 'create', 'test5', 'input') self.assert_not_in("\r", output) @@ -1936,7 +1938,7 @@ class ArchiverTestCase(ArchiverTestCaseBase): self.create_regular_file('file1', size=1024 * 80) time.sleep(1) # file2 must have newer timestamps than file1 self.create_regular_file('file2', size=1024 * 80) - self.cmd(f'--repo={self.repository_location}', 'rcreate', '--encryption=repokey') + self.cmd(f'--repo={self.repository_location}', 'rcreate', RK_ENCRYPTION) output = self.cmd(f'--repo={self.repository_location}', 'create', '--list', 'test', 'input') self.assert_in("A input/file1", output) self.assert_in("A input/file2", output) @@ -1952,7 +1954,7 @@ class ArchiverTestCase(ArchiverTestCaseBase): self.create_regular_file('file1', contents=b'123') time.sleep(1) # file2 must have newer timestamps than file1 self.create_regular_file('file2', size=10) - self.cmd(f'--repo={self.repository_location}', 'rcreate', '--encryption=repokey') + self.cmd(f'--repo={self.repository_location}', 'rcreate', RK_ENCRYPTION) output = self.cmd(f'--repo={self.repository_location}', 'create', 'test1', 'input', '--list', '--files-cache=ctime,size') # modify file1, but cheat with the mtime (and atime) and also keep same size: @@ -1969,7 +1971,7 @@ class ArchiverTestCase(ArchiverTestCaseBase): self.create_regular_file('file1', size=10) time.sleep(1) # file2 must have newer timestamps than file1 self.create_regular_file('file2', size=10) - self.cmd(f'--repo={self.repository_location}', 'rcreate', '--encryption=repokey') + self.cmd(f'--repo={self.repository_location}', 'rcreate', RK_ENCRYPTION) output = self.cmd(f'--repo={self.repository_location}', 'create', '--list', '--files-cache=mtime,size', 'test1', 'input') # change mode of file1, no content change: @@ -1985,7 +1987,7 @@ class ArchiverTestCase(ArchiverTestCaseBase): self.create_regular_file('file1', size=10) time.sleep(1) # file2 must have newer timestamps than file1 self.create_regular_file('file2', size=10) - self.cmd(f'--repo={self.repository_location}', 'rcreate', '--encryption=repokey') + self.cmd(f'--repo={self.repository_location}', 'rcreate', RK_ENCRYPTION) output = self.cmd(f'--repo={self.repository_location}', 'create', '--list', '--files-cache=rechunk,ctime', 'test1', 'input') # no changes here, but this mode rechunks unconditionally @@ -2002,7 +2004,7 @@ class ArchiverTestCase(ArchiverTestCaseBase): if has_lchflags: self.create_regular_file('file3', size=1024 * 80) platform.set_flags(os.path.join(self.input_path, 'file3'), stat.UF_NODUMP) - self.cmd(f'--repo={self.repository_location}', 'rcreate', '--encryption=repokey') + self.cmd(f'--repo={self.repository_location}', 'rcreate', RK_ENCRYPTION) output = self.cmd(f'--repo={self.repository_location}', 'create', '--list', '--exclude-nodump', 'test', 'input') self.assert_in("A input/file1", output) self.assert_in("A input/file2", output) @@ -2018,7 +2020,7 @@ class ArchiverTestCase(ArchiverTestCaseBase): def test_create_json(self): self.create_regular_file('file1', size=1024 * 80) - self.cmd(f'--repo={self.repository_location}', 'rcreate', '--encryption=repokey') + self.cmd(f'--repo={self.repository_location}', 'rcreate', RK_ENCRYPTION) create_info = json.loads(self.cmd(f'--repo={self.repository_location}', 'create', '--json', 'test', 'input')) # The usual keys @@ -2038,7 +2040,7 @@ class ArchiverTestCase(ArchiverTestCaseBase): self.create_regular_file('file1', size=1024 * 80) time.sleep(1) # file2 must have newer timestamps than file1 self.create_regular_file('file2', size=1024 * 80) - self.cmd(f'--repo={self.repository_location}', 'rcreate', '--encryption=repokey') + self.cmd(f'--repo={self.repository_location}', 'rcreate', RK_ENCRYPTION) # no listing by default output = self.cmd(f'--repo={self.repository_location}', 'create', 'test', 'input') self.assert_not_in('file1', output) @@ -2068,7 +2070,7 @@ class ArchiverTestCase(ArchiverTestCaseBase): finally: os.close(fd) - self.cmd(f'--repo={self.repository_location}', 'rcreate', '--encryption=repokey') + self.cmd(f'--repo={self.repository_location}', 'rcreate', RK_ENCRYPTION) data = b'foobar' * 1000 fifo_fn = os.path.join(self.input_path, 'fifo') @@ -2091,20 +2093,20 @@ class ArchiverTestCase(ArchiverTestCaseBase): def test_create_read_special_broken_symlink(self): os.symlink('somewhere does not exist', os.path.join(self.input_path, 'link')) - self.cmd(f'--repo={self.repository_location}', 'rcreate', '--encryption=repokey') + self.cmd(f'--repo={self.repository_location}', 'rcreate', RK_ENCRYPTION) self.cmd(f'--repo={self.repository_location}', 'create', '--read-special', 'test', 'input') output = self.cmd(f'--repo={self.repository_location}', 'list', 'test') assert 'input/link -> somewhere does not exist' in output # def test_cmdline_compatibility(self): # self.create_regular_file('file1', size=1024 * 80) - # self.cmd(f'--repo={self.repository_location}', 'rcreate', '--encryption=repokey') + # self.cmd(f'--repo={self.repository_location}', 'rcreate', RK_ENCRYPTION) # self.cmd(f'--repo={self.repository_location}', 'create', 'test', 'input') # output = self.cmd('foo', self.repository_location, '--old') # self.assert_in('"--old" has been deprecated. Use "--new" instead', output) def test_prune_repository(self): - self.cmd(f'--repo={self.repository_location}', 'rcreate', '--encryption=repokey') + self.cmd(f'--repo={self.repository_location}', 'rcreate', RK_ENCRYPTION) self.cmd(f'--repo={self.repository_location}', 'create', 'test1', src_dir) self.cmd(f'--repo={self.repository_location}', 'create', 'test2', src_dir) # these are not really a checkpoints, but they look like some: @@ -2153,7 +2155,7 @@ class ArchiverTestCase(ArchiverTestCaseBase): # This test must match docs/misc/prune-example.txt def test_prune_repository_example(self): - self.cmd(f'--repo={self.repository_location}', 'rcreate', '--encryption=repokey') + self.cmd(f'--repo={self.repository_location}', 'rcreate', RK_ENCRYPTION) # Archives that will be kept, per the example # Oldest archive self._create_archive_ts('test01', 2015, 1, 1) @@ -2214,7 +2216,7 @@ class ArchiverTestCase(ArchiverTestCaseBase): # With an initial and daily backup, prune daily until oldest is replaced by a monthly backup def test_prune_retain_and_expire_oldest(self): - self.cmd(f'--repo={self.repository_location}', 'rcreate', '--encryption=repokey') + self.cmd(f'--repo={self.repository_location}', 'rcreate', RK_ENCRYPTION) # Initial backup self._create_archive_ts('original_archive', 2020, 9, 1, 11, 15) # Archive and prune daily for 30 days @@ -2237,7 +2239,7 @@ class ArchiverTestCase(ArchiverTestCaseBase): self.assert_not_in('original_archive', output) def test_prune_repository_save_space(self): - self.cmd(f'--repo={self.repository_location}', 'rcreate', '--encryption=repokey') + self.cmd(f'--repo={self.repository_location}', 'rcreate', RK_ENCRYPTION) self.cmd(f'--repo={self.repository_location}', 'create', 'test1', src_dir) self.cmd(f'--repo={self.repository_location}', 'create', 'test2', src_dir) output = self.cmd(f'--repo={self.repository_location}', 'prune', '--list', '--dry-run', '--keep-daily=1') @@ -2252,7 +2254,7 @@ class ArchiverTestCase(ArchiverTestCaseBase): self.assert_in('test2', output) def test_prune_repository_prefix(self): - self.cmd(f'--repo={self.repository_location}', 'rcreate', '--encryption=repokey') + self.cmd(f'--repo={self.repository_location}', 'rcreate', RK_ENCRYPTION) self.cmd(f'--repo={self.repository_location}', 'create', 'foo-2015-08-12-10:00', src_dir) self.cmd(f'--repo={self.repository_location}', 'create', 'foo-2015-08-12-20:00', src_dir) self.cmd(f'--repo={self.repository_location}', 'create', 'bar-2015-08-12-10:00', src_dir) @@ -2273,7 +2275,7 @@ class ArchiverTestCase(ArchiverTestCaseBase): self.assert_in('bar-2015-08-12-20:00', output) def test_prune_repository_glob(self): - self.cmd(f'--repo={self.repository_location}', 'rcreate', '--encryption=repokey') + self.cmd(f'--repo={self.repository_location}', 'rcreate', RK_ENCRYPTION) self.cmd(f'--repo={self.repository_location}', 'create', '2015-08-12-10:00-foo', src_dir) self.cmd(f'--repo={self.repository_location}', 'create', '2015-08-12-20:00-foo', src_dir) self.cmd(f'--repo={self.repository_location}', 'create', '2015-08-12-10:00-bar', src_dir) @@ -2294,7 +2296,7 @@ class ArchiverTestCase(ArchiverTestCaseBase): self.assert_in('2015-08-12-20:00-bar', output) def test_list_prefix(self): - self.cmd(f'--repo={self.repository_location}', 'rcreate', '--encryption=repokey') + self.cmd(f'--repo={self.repository_location}', 'rcreate', RK_ENCRYPTION) self.cmd(f'--repo={self.repository_location}', 'create', 'test-1', src_dir) self.cmd(f'--repo={self.repository_location}', 'create', 'something-else-than-test-1', src_dir) self.cmd(f'--repo={self.repository_location}', 'create', 'test-2', src_dir) @@ -2304,7 +2306,7 @@ class ArchiverTestCase(ArchiverTestCaseBase): self.assert_not_in('something-else', output) def test_list_format(self): - self.cmd(f'--repo={self.repository_location}', 'rcreate', '--encryption=repokey') + self.cmd(f'--repo={self.repository_location}', 'rcreate', RK_ENCRYPTION) self.cmd(f'--repo={self.repository_location}', 'create', 'test', src_dir) output_1 = self.cmd(f'--repo={self.repository_location}', 'list', 'test') output_2 = self.cmd(f'--repo={self.repository_location}', 'list', 'test', '--format', '{mode} {user:6} {group:6} {size:8d} {mtime} {path}{extra}{NEWLINE}') @@ -2313,7 +2315,7 @@ class ArchiverTestCase(ArchiverTestCaseBase): self.assertNotEqual(output_1, output_3) def test_archives_format(self): - self.cmd(f'--repo={self.repository_location}', 'rcreate', '--encryption=repokey') + self.cmd(f'--repo={self.repository_location}', 'rcreate', RK_ENCRYPTION) self.cmd(f'--repo={self.repository_location}', 'create', '--comment', 'comment 1', 'test-1', src_dir) self.cmd(f'--repo={self.repository_location}', 'create', '--comment', 'comment 2', 'test-2', src_dir) output_1 = self.cmd(f'--repo={self.repository_location}', 'rlist') @@ -2330,14 +2332,14 @@ class ArchiverTestCase(ArchiverTestCaseBase): def test_list_hash(self): self.create_regular_file('empty_file', size=0) self.create_regular_file('amb', contents=b'a' * 1000000) - self.cmd(f'--repo={self.repository_location}', 'rcreate', '--encryption=repokey') + self.cmd(f'--repo={self.repository_location}', 'rcreate', RK_ENCRYPTION) self.cmd(f'--repo={self.repository_location}', 'create', 'test', 'input') output = self.cmd(f'--repo={self.repository_location}', 'list', 'test', '--format', '{sha256} {path}{NL}') assert "cdc76e5c9914fb9281a1c7e284d73e67f1809a48a497200e046d39ccc7112cd0 input/amb" in output assert "e3b0c44298fc1c149afbf4c8996fb92427ae41e4649b934ca495991b7852b855 input/empty_file" in output def test_list_consider_checkpoints(self): - self.cmd(f'--repo={self.repository_location}', 'rcreate', '--encryption=repokey') + self.cmd(f'--repo={self.repository_location}', 'rcreate', RK_ENCRYPTION) self.cmd(f'--repo={self.repository_location}', 'create', 'test1', src_dir) # these are not really a checkpoints, but they look like some: self.cmd(f'--repo={self.repository_location}', 'create', 'test2.checkpoint', src_dir) @@ -2357,7 +2359,7 @@ class ArchiverTestCase(ArchiverTestCaseBase): with open(os.path.join(self.input_path, 'two_chunks'), 'wb') as fd: fd.write(b'abba' * 2000000) fd.write(b'baab' * 2000000) - self.cmd(f'--repo={self.repository_location}', 'rcreate', '--encryption=repokey') + self.cmd(f'--repo={self.repository_location}', 'rcreate', RK_ENCRYPTION) self.cmd(f'--repo={self.repository_location}', 'create', 'test', 'input') output = self.cmd(f'--repo={self.repository_location}', 'list', 'test', '--format', '{num_chunks} {unique_chunks} {path}{NL}') assert "0 0 input/empty_file" in output @@ -2365,7 +2367,7 @@ class ArchiverTestCase(ArchiverTestCaseBase): def test_list_size(self): self.create_regular_file('compressible_file', size=10000) - self.cmd(f'--repo={self.repository_location}', 'rcreate', '--encryption=repokey') + self.cmd(f'--repo={self.repository_location}', 'rcreate', RK_ENCRYPTION) self.cmd(f'--repo={self.repository_location}', 'create', '-C', 'lz4', 'test', 'input') output = self.cmd(f'--repo={self.repository_location}', 'list', 'test', '--format', '{size} {path}{NL}') size, path = output.split("\n")[1].split(" ") @@ -2373,13 +2375,13 @@ class ArchiverTestCase(ArchiverTestCaseBase): def test_list_json(self): self.create_regular_file('file1', size=1024 * 80) - self.cmd(f'--repo={self.repository_location}', 'rcreate', '--encryption=repokey') + self.cmd(f'--repo={self.repository_location}', 'rcreate', RK_ENCRYPTION) self.cmd(f'--repo={self.repository_location}', 'create', 'test', 'input') list_repo = json.loads(self.cmd(f'--repo={self.repository_location}', 'rlist', '--json')) repository = list_repo['repository'] assert len(repository['id']) == 64 assert datetime.strptime(repository['last_modified'], ISO_FORMAT) # must not raise - assert list_repo['encryption']['mode'] == 'repokey' + assert list_repo['encryption']['mode'] == RK_ENCRYPTION[13:] assert 'keyfile' not in list_repo['encryption'] archive0 = list_repo['archives'][0] assert datetime.strptime(archive0['time'], ISO_FORMAT) # must not raise @@ -2401,7 +2403,7 @@ class ArchiverTestCase(ArchiverTestCaseBase): def test_log_json(self): self.create_test_files() - self.cmd(f'--repo={self.repository_location}', 'rcreate', '--encryption=repokey') + self.cmd(f'--repo={self.repository_location}', 'rcreate', RK_ENCRYPTION) log = self.cmd(f'--repo={self.repository_location}', 'create', 'test', 'input', '--log-json', '--list', '--debug') messages = {} # type -> message, one of each kind for line in log.splitlines(): @@ -2419,7 +2421,7 @@ class ArchiverTestCase(ArchiverTestCaseBase): def test_debug_profile(self): self.create_test_files() - self.cmd(f'--repo={self.repository_location}', 'rcreate', '--encryption=repokey') + self.cmd(f'--repo={self.repository_location}', 'rcreate', RK_ENCRYPTION) self.cmd(f'--repo={self.repository_location}', 'create', 'test', 'input', '--debug-profile=create.prof') self.cmd('debug', 'convert-profile', 'create.prof', 'create.pyprof') stats = pstats.Stats('create.pyprof') @@ -2433,12 +2435,12 @@ class ArchiverTestCase(ArchiverTestCaseBase): def test_common_options(self): self.create_test_files() - self.cmd(f'--repo={self.repository_location}', 'rcreate', '--encryption=repokey') + self.cmd(f'--repo={self.repository_location}', 'rcreate', RK_ENCRYPTION) log = self.cmd(f'--repo={self.repository_location}', '--debug', 'create', 'test', 'input') assert 'security: read previous location' in log def test_change_passphrase(self): - self.cmd(f'--repo={self.repository_location}', 'rcreate', '--encryption=repokey') + self.cmd(f'--repo={self.repository_location}', 'rcreate', RK_ENCRYPTION) os.environ['BORG_NEW_PASSPHRASE'] = 'newpassphrase' # here we have both BORG_PASSPHRASE and BORG_NEW_PASSPHRASE set: self.cmd(f'--repo={self.repository_location}', 'key', 'change-passphrase') @@ -2446,39 +2448,39 @@ class ArchiverTestCase(ArchiverTestCaseBase): self.cmd(f'--repo={self.repository_location}', 'rlist') def test_change_location_to_keyfile(self): - self.cmd(f'--repo={self.repository_location}', 'rcreate', '--encryption=repokey') + self.cmd(f'--repo={self.repository_location}', 'rcreate', RK_ENCRYPTION) log = self.cmd(f'--repo={self.repository_location}', 'rinfo') - assert '(repokey)' in log + assert '(repokey' in log self.cmd(f'--repo={self.repository_location}', 'key', 'change-location', 'keyfile') log = self.cmd(f'--repo={self.repository_location}', 'rinfo') - assert '(key file)' in log + assert '(key file' in log def test_change_location_to_b2keyfile(self): - self.cmd(f'--repo={self.repository_location}', 'rcreate', '--encryption=repokey-blake2') + self.cmd(f'--repo={self.repository_location}', 'rcreate', '--encryption=repokey-blake2-aes-ocb') log = self.cmd(f'--repo={self.repository_location}', 'rinfo') - assert '(repokey BLAKE2b)' in log + assert '(repokey BLAKE2b' in log self.cmd(f'--repo={self.repository_location}', 'key', 'change-location', 'keyfile') log = self.cmd(f'--repo={self.repository_location}', 'rinfo') - assert '(key file BLAKE2b)' in log + assert '(key file BLAKE2b' in log def test_change_location_to_repokey(self): - self.cmd(f'--repo={self.repository_location}', 'rcreate', '--encryption=keyfile') + self.cmd(f'--repo={self.repository_location}', 'rcreate', KF_ENCRYPTION) log = self.cmd(f'--repo={self.repository_location}', 'rinfo') - assert '(key file)' in log + assert '(key file' in log self.cmd(f'--repo={self.repository_location}', 'key', 'change-location', 'repokey') log = self.cmd(f'--repo={self.repository_location}', 'rinfo') - assert '(repokey)' in log + assert '(repokey' in log def test_change_location_to_b2repokey(self): - self.cmd(f'--repo={self.repository_location}', 'rcreate', '--encryption=keyfile-blake2') + self.cmd(f'--repo={self.repository_location}', 'rcreate', '--encryption=keyfile-blake2-aes-ocb') log = self.cmd(f'--repo={self.repository_location}', 'rinfo') - assert '(key file BLAKE2b)' in log + assert '(key file BLAKE2b' in log self.cmd(f'--repo={self.repository_location}', 'key', 'change-location', 'repokey') log = self.cmd(f'--repo={self.repository_location}', 'rinfo') - assert '(repokey BLAKE2b)' in log + assert '(repokey BLAKE2b' in log def test_break_lock(self): - self.cmd(f'--repo={self.repository_location}', 'rcreate', '--encryption=repokey') + self.cmd(f'--repo={self.repository_location}', 'rcreate', RK_ENCRYPTION) self.cmd(f'--repo={self.repository_location}', 'break-lock') def test_usage(self): @@ -2505,7 +2507,7 @@ class ArchiverTestCase(ArchiverTestCaseBase): noatime_used = flags_noatime != flags_normal return noatime_used and atime_before == atime_after - self.cmd(f'--repo={self.repository_location}', 'rcreate', '--encryption=repokey') + self.cmd(f'--repo={self.repository_location}', 'rcreate', RK_ENCRYPTION) self.create_test_files() have_noatime = has_noatime('input/file1') self.cmd(f'--repo={self.repository_location}', 'create', '--exclude-nodump', '--atime', 'archive', 'input') @@ -2594,7 +2596,7 @@ class ArchiverTestCase(ArchiverTestCaseBase): @unittest.skipUnless(llfuse, 'llfuse not installed') def test_fuse_versions_view(self): - self.cmd(f'--repo={self.repository_location}', 'rcreate', '--encryption=repokey') + self.cmd(f'--repo={self.repository_location}', 'rcreate', RK_ENCRYPTION) self.create_regular_file('test', contents=b'first') if are_hardlinks_supported(): self.create_regular_file('hardlink1', contents=b'123456') @@ -2626,7 +2628,7 @@ class ArchiverTestCase(ArchiverTestCaseBase): @unittest.skipUnless(llfuse, 'llfuse not installed') def test_fuse_allow_damaged_files(self): - self.cmd(f'--repo={self.repository_location}', 'rcreate', '--encryption=repokey') + self.cmd(f'--repo={self.repository_location}', 'rcreate', RK_ENCRYPTION) self.create_src_archive('archive') # Get rid of a chunk and repair it archive, repository = self.open_archive('archive') @@ -2651,7 +2653,7 @@ class ArchiverTestCase(ArchiverTestCaseBase): @unittest.skipUnless(llfuse, 'llfuse not installed') def test_fuse_mount_options(self): - self.cmd(f'--repo={self.repository_location}', 'rcreate', '--encryption=repokey') + self.cmd(f'--repo={self.repository_location}', 'rcreate', RK_ENCRYPTION) self.create_src_archive('arch11') self.create_src_archive('arch12') self.create_src_archive('arch21') @@ -2759,43 +2761,9 @@ class ArchiverTestCase(ArchiverTestCaseBase): # Undecorate borg.locking.Lock.migrate_lock = borg.locking.Lock.migrate_lock.__wrapped__ - def verify_aes_counter_uniqueness(self, method): - seen = set() # Chunks already seen - used = set() # counter values already used - - def verify_uniqueness(): - with Repository(self.repository_path) as repository: - for id, _ in repository.open_index(repository.get_transaction_id()).iteritems(): - data = repository.get(id) - hash = sha256(data).digest() - if hash not in seen: - seen.add(hash) - num_blocks = num_cipher_blocks(len(data) - 41) - nonce = bytes_to_long(data[33:41]) - for counter in range(nonce, nonce + num_blocks): - self.assert_not_in(counter, used) - used.add(counter) - - self.create_test_files() - os.environ['BORG_PASSPHRASE'] = 'passphrase' - self.cmd(f'--repo={self.repository_location}', 'rcreate', '--encryption=' + method) - verify_uniqueness() - self.cmd(f'--repo={self.repository_location}', 'create', 'test', 'input') - verify_uniqueness() - self.cmd(f'--repo={self.repository_location}', 'create', 'test.2', 'input') - verify_uniqueness() - self.cmd(f'--repo={self.repository_location}', 'delete', '-a', 'test.2') - verify_uniqueness() - - def test_aes_counter_uniqueness_keyfile(self): - self.verify_aes_counter_uniqueness('keyfile') - - def test_aes_counter_uniqueness_passphrase(self): - self.verify_aes_counter_uniqueness('repokey') - def test_debug_dump_archive_items(self): self.create_test_files() - self.cmd(f'--repo={self.repository_location}', 'rcreate', '--encryption=repokey') + self.cmd(f'--repo={self.repository_location}', 'rcreate', RK_ENCRYPTION) self.cmd(f'--repo={self.repository_location}', 'create', 'test', 'input') with changedir('output'): output = self.cmd(f'--repo={self.repository_location}', 'debug', 'dump-archive-items', 'test') @@ -2805,7 +2773,7 @@ class ArchiverTestCase(ArchiverTestCaseBase): def test_debug_dump_repo_objs(self): self.create_test_files() - self.cmd(f'--repo={self.repository_location}', 'rcreate', '--encryption=repokey') + self.cmd(f'--repo={self.repository_location}', 'rcreate', RK_ENCRYPTION) self.cmd(f'--repo={self.repository_location}', 'create', 'test', 'input') with changedir('output'): output = self.cmd(f'--repo={self.repository_location}', 'debug', 'dump-repo-objs') @@ -2814,7 +2782,7 @@ class ArchiverTestCase(ArchiverTestCaseBase): assert 'Done.' in output def test_debug_put_get_delete_obj(self): - self.cmd(f'--repo={self.repository_location}', 'rcreate', '--encryption=repokey') + self.cmd(f'--repo={self.repository_location}', 'rcreate', RK_ENCRYPTION) data = b'some data' hexkey = sha256(data).hexdigest() self.create_regular_file('file', contents=data) @@ -2837,19 +2805,19 @@ class ArchiverTestCase(ArchiverTestCaseBase): raise EOFError with patch.object(FlexiKey, 'create', raise_eof): - self.cmd(f'--repo={self.repository_location}', 'rcreate', '--encryption=repokey', exit_code=1) + self.cmd(f'--repo={self.repository_location}', 'rcreate', RK_ENCRYPTION, exit_code=1) assert not os.path.exists(self.repository_location) def test_init_requires_encryption_option(self): self.cmd(f'--repo={self.repository_location}', 'rcreate', exit_code=2) def test_init_nested_repositories(self): - self.cmd(f'--repo={self.repository_location}', 'rcreate', '--encryption=repokey') + self.cmd(f'--repo={self.repository_location}', 'rcreate', RK_ENCRYPTION) if self.FORK_DEFAULT: - self.cmd(f'--repo={self.repository_location}/nested', 'rcreate', '--encryption=repokey', exit_code=2) + self.cmd(f'--repo={self.repository_location}/nested', 'rcreate', RK_ENCRYPTION, exit_code=2) else: with pytest.raises(Repository.AlreadyExists): - self.cmd(f'--repo={self.repository_location}/nested', 'rcreate', '--encryption=repokey') + self.cmd(f'--repo={self.repository_location}/nested', 'rcreate', RK_ENCRYPTION) def test_init_refuse_to_overwrite_keyfile(self): """BORG_KEY_FILE=something borg init should quit if "something" already exists. @@ -2857,10 +2825,10 @@ class ArchiverTestCase(ArchiverTestCaseBase): See https://github.com/borgbackup/borg/pull/6046""" keyfile = os.path.join(self.tmpdir, 'keyfile') with environment_variable(BORG_KEY_FILE=keyfile): - self.cmd(f'--repo={self.repository_location}0', 'rcreate', '--encryption=keyfile') + self.cmd(f'--repo={self.repository_location}0', 'rcreate', KF_ENCRYPTION) with open(keyfile) as file: before = file.read() - arg = (f'--repo={self.repository_location}1', 'rcreate', '--encryption=keyfile') + arg = (f'--repo={self.repository_location}1', 'rcreate', KF_ENCRYPTION) if self.FORK_DEFAULT: self.cmd(*arg, exit_code=2) else: @@ -2892,7 +2860,7 @@ class ArchiverTestCase(ArchiverTestCaseBase): assert id in seen def test_check_cache(self): - self.cmd(f'--repo={self.repository_location}', 'rcreate', '--encryption=repokey') + self.cmd(f'--repo={self.repository_location}', 'rcreate', RK_ENCRYPTION) self.cmd(f'--repo={self.repository_location}', 'create', 'test', 'input') with self.open_repository() as repository: manifest, key = Manifest.load(repository, Manifest.NO_OPERATION_CHECK) @@ -2904,13 +2872,13 @@ class ArchiverTestCase(ArchiverTestCaseBase): self.check_cache() def test_recreate_target_rc(self): - self.cmd(f'--repo={self.repository_location}', 'rcreate', '--encryption=repokey') + self.cmd(f'--repo={self.repository_location}', 'rcreate', RK_ENCRYPTION) output = self.cmd(f'--repo={self.repository_location}', 'recreate', '--target=asdf', exit_code=2) assert 'Need to specify single archive' in output def test_recreate_target(self): self.create_test_files() - self.cmd(f'--repo={self.repository_location}', 'rcreate', '--encryption=repokey') + self.cmd(f'--repo={self.repository_location}', 'rcreate', RK_ENCRYPTION) self.check_cache() self.cmd(f'--repo={self.repository_location}', 'create', 'test0', 'input') self.check_cache() @@ -2930,7 +2898,7 @@ class ArchiverTestCase(ArchiverTestCaseBase): def test_recreate_basic(self): self.create_test_files() self.create_regular_file('dir2/file3', size=1024 * 80) - self.cmd(f'--repo={self.repository_location}', 'rcreate', '--encryption=repokey') + self.cmd(f'--repo={self.repository_location}', 'rcreate', RK_ENCRYPTION) self.cmd(f'--repo={self.repository_location}', 'create', 'test0', 'input') self.cmd(f'--repo={self.repository_location}', 'recreate', 'test0', 'input/dir2', '-e', 'input/dir2/file3') self.check_cache() @@ -2960,7 +2928,7 @@ class ArchiverTestCase(ArchiverTestCaseBase): with open(os.path.join(self.input_path, 'large_file'), 'wb') as fd: fd.write(b'a' * 280) fd.write(b'b' * 280) - self.cmd(f'--repo={self.repository_location}', 'rcreate', '--encryption=repokey') + self.cmd(f'--repo={self.repository_location}', 'rcreate', RK_ENCRYPTION) self.cmd(f'--repo={self.repository_location}', 'create', 'test1', 'input', '--chunker-params', '7,9,8,128') self.cmd(f'--repo={self.repository_location}', 'create', 'test2', 'input', '--files-cache=disabled') list = self.cmd(f'--repo={self.repository_location}', 'list', 'test1', 'input/large_file', @@ -2977,7 +2945,7 @@ class ArchiverTestCase(ArchiverTestCaseBase): def test_recreate_recompress(self): self.create_regular_file('compressible', size=10000) - self.cmd(f'--repo={self.repository_location}', 'rcreate', '--encryption=repokey') + self.cmd(f'--repo={self.repository_location}', 'rcreate', RK_ENCRYPTION) self.cmd(f'--repo={self.repository_location}', 'create', 'test', 'input', '-C', 'none') file_list = self.cmd(f'--repo={self.repository_location}', 'list', 'test', 'input/compressible', '--format', '{size} {sha256}') @@ -2992,7 +2960,7 @@ class ArchiverTestCase(ArchiverTestCaseBase): def test_recreate_timestamp(self): local_timezone = datetime.now(timezone(timedelta(0))).astimezone().tzinfo self.create_test_files() - self.cmd(f'--repo={self.repository_location}', 'rcreate', '--encryption=repokey') + self.cmd(f'--repo={self.repository_location}', 'rcreate', RK_ENCRYPTION) self.cmd(f'--repo={self.repository_location}', 'create', 'test0', 'input') self.cmd(f'--repo={self.repository_location}', 'recreate', 'test0', '--timestamp', "1970-01-02T00:00:00", '--comment', 'test') @@ -3004,7 +2972,7 @@ class ArchiverTestCase(ArchiverTestCaseBase): def test_recreate_dry_run(self): self.create_regular_file('compressible', size=10000) - self.cmd(f'--repo={self.repository_location}', 'rcreate', '--encryption=repokey') + self.cmd(f'--repo={self.repository_location}', 'rcreate', RK_ENCRYPTION) self.cmd(f'--repo={self.repository_location}', 'create', 'test', 'input') archives_before = self.cmd(f'--repo={self.repository_location}', 'list', 'test') self.cmd(f'--repo={self.repository_location}', 'recreate', '-n', '-e', 'input/compressible') @@ -3014,7 +2982,7 @@ class ArchiverTestCase(ArchiverTestCaseBase): def test_recreate_skips_nothing_to_do(self): self.create_regular_file('file1', size=1024 * 80) - self.cmd(f'--repo={self.repository_location}', 'rcreate', '--encryption=repokey') + self.cmd(f'--repo={self.repository_location}', 'rcreate', RK_ENCRYPTION) self.cmd(f'--repo={self.repository_location}', 'create', 'test', 'input') info_before = self.cmd(f'--repo={self.repository_location}', 'info', '-a', 'test') self.cmd(f'--repo={self.repository_location}', 'recreate', '--chunker-params', 'default') @@ -3023,13 +2991,13 @@ class ArchiverTestCase(ArchiverTestCaseBase): assert info_before == info_after # includes archive ID def test_with_lock(self): - self.cmd(f'--repo={self.repository_location}', 'rcreate', '--encryption=repokey') + self.cmd(f'--repo={self.repository_location}', 'rcreate', RK_ENCRYPTION) lock_path = os.path.join(self.repository_path, 'lock.exclusive') cmd = 'python3', '-c', 'import os, sys; sys.exit(42 if os.path.exists("%s") else 23)' % lock_path self.cmd(f'--repo={self.repository_location}', 'with-lock', *cmd, fork=True, exit_code=42) def test_recreate_list_output(self): - self.cmd(f'--repo={self.repository_location}', 'rcreate', '--encryption=repokey') + self.cmd(f'--repo={self.repository_location}', 'rcreate', RK_ENCRYPTION) self.create_regular_file('file1', size=0) self.create_regular_file('file2', size=0) self.create_regular_file('file3', size=0) @@ -3059,13 +3027,13 @@ class ArchiverTestCase(ArchiverTestCaseBase): self.assert_not_in("x input/file5", output) def test_bad_filters(self): - self.cmd(f'--repo={self.repository_location}', 'rcreate', '--encryption=repokey') + self.cmd(f'--repo={self.repository_location}', 'rcreate', RK_ENCRYPTION) self.cmd(f'--repo={self.repository_location}', 'create', 'test', 'input') self.cmd(f'--repo={self.repository_location}', 'delete', '--first', '1', '--last', '1', fork=True, exit_code=2) def test_key_export_keyfile(self): export_file = self.output_path + '/exported' - self.cmd(f'--repo={self.repository_location}', 'rcreate', '--encryption', 'keyfile') + self.cmd(f'--repo={self.repository_location}', 'rcreate', KF_ENCRYPTION) repo_id = self._extract_repository_id(self.repository_path) self.cmd(f'--repo={self.repository_location}', 'key', 'export', export_file) @@ -3091,7 +3059,7 @@ class ArchiverTestCase(ArchiverTestCaseBase): assert key_contents2 == key_contents def test_key_import_keyfile_with_borg_key_file(self): - self.cmd(f'--repo={self.repository_location}', 'rcreate', '--encryption', 'keyfile') + self.cmd(f'--repo={self.repository_location}', 'rcreate', KF_ENCRYPTION) exported_key_file = os.path.join(self.output_path, 'exported') self.cmd(f'--repo={self.repository_location}', 'key', 'export', exported_key_file) @@ -3112,7 +3080,7 @@ class ArchiverTestCase(ArchiverTestCaseBase): def test_key_export_repokey(self): export_file = self.output_path + '/exported' - self.cmd(f'--repo={self.repository_location}', 'rcreate', '--encryption', 'repokey') + self.cmd(f'--repo={self.repository_location}', 'rcreate', RK_ENCRYPTION) repo_id = self._extract_repository_id(self.repository_path) self.cmd(f'--repo={self.repository_location}', 'key', 'export', export_file) @@ -3122,10 +3090,10 @@ class ArchiverTestCase(ArchiverTestCaseBase): assert export_contents.startswith('BORG_KEY ' + bin_to_hex(repo_id) + '\n') with Repository(self.repository_path) as repository: - repo_key = RepoKey(repository) + repo_key = AESOCBRepoKey(repository) repo_key.load(None, Passphrase.env_passphrase()) - backup_key = KeyfileKey(key.TestKey.MockRepository()) + backup_key = AESOCBKeyfileKey(key.TestKey.MockRepository()) backup_key.load(export_file, Passphrase.env_passphrase()) assert repo_key.enc_key == backup_key.enc_key @@ -3136,14 +3104,14 @@ class ArchiverTestCase(ArchiverTestCaseBase): self.cmd(f'--repo={self.repository_location}', 'key', 'import', export_file) with Repository(self.repository_path) as repository: - repo_key2 = RepoKey(repository) + repo_key2 = AESOCBRepoKey(repository) repo_key2.load(None, Passphrase.env_passphrase()) assert repo_key2.enc_key == repo_key2.enc_key def test_key_export_qr(self): export_file = self.output_path + '/exported.html' - self.cmd(f'--repo={self.repository_location}', 'rcreate', '--encryption', 'repokey') + self.cmd(f'--repo={self.repository_location}', 'rcreate', RK_ENCRYPTION) repo_id = self._extract_repository_id(self.repository_path) self.cmd(f'--repo={self.repository_location}', 'key', 'export', '--qr-html', export_file) @@ -3158,13 +3126,13 @@ class ArchiverTestCase(ArchiverTestCaseBase): export_directory = self.output_path + '/exported' os.mkdir(export_directory) - self.cmd(f'--repo={self.repository_location}', 'rcreate', '--encryption', 'repokey') + self.cmd(f'--repo={self.repository_location}', 'rcreate', RK_ENCRYPTION) self.cmd(f'--repo={self.repository_location}', 'key', 'export', export_directory, exit_code=EXIT_ERROR) def test_key_import_errors(self): export_file = self.output_path + '/exported' - self.cmd(f'--repo={self.repository_location}', 'rcreate', '--encryption', 'keyfile') + self.cmd(f'--repo={self.repository_location}', 'rcreate', KF_ENCRYPTION) self.cmd(f'--repo={self.repository_location}', 'key', 'import', export_file, exit_code=EXIT_ERROR) @@ -3190,13 +3158,13 @@ class ArchiverTestCase(ArchiverTestCaseBase): repo_id = 'e294423506da4e1ea76e8dcdf1a3919624ae3ae496fddf905610c351d3f09239' export_file = self.output_path + '/exported' - self.cmd(f'--repo={self.repository_location}', 'rcreate', '--encryption', 'keyfile') + self.cmd(f'--repo={self.repository_location}', 'rcreate', KF_ENCRYPTION) self._set_repository_id(self.repository_path, unhexlify(repo_id)) key_file = self.keys_path + '/' + os.listdir(self.keys_path)[0] with open(key_file, 'w') as fd: - fd.write(KeyfileKey.FILE_ID + ' ' + repo_id + '\n') + fd.write(CHPOKeyfileKey.FILE_ID + ' ' + repo_id + '\n') fd.write(b2a_base64(b'abcdefghijklmnopqrstu').decode()) self.cmd(f'--repo={self.repository_location}', 'key', 'export', '--paper', export_file) @@ -3214,12 +3182,12 @@ id: 2 / e29442 3506da 4e1ea7 / 25f62a 5a3d41 - 02 def test_key_import_paperkey(self): repo_id = 'e294423506da4e1ea76e8dcdf1a3919624ae3ae496fddf905610c351d3f09239' - self.cmd(f'--repo={self.repository_location}', 'rcreate', '--encryption', 'keyfile') + self.cmd(f'--repo={self.repository_location}', 'rcreate', KF_ENCRYPTION) self._set_repository_id(self.repository_path, unhexlify(repo_id)) key_file = self.keys_path + '/' + os.listdir(self.keys_path)[0] with open(key_file, 'w') as fd: - fd.write(KeyfileKey.FILE_ID + ' ' + repo_id + '\n') + fd.write(AESOCBKeyfileKey.FILE_ID + ' ' + repo_id + '\n') fd.write(b2a_base64(b'abcdefghijklmnopqrstu').decode()) typed_input = ( @@ -3259,7 +3227,7 @@ id: 2 / e29442 3506da 4e1ea7 / 25f62a 5a3d41 - 02 def test_debug_dump_manifest(self): self.create_regular_file('file1', size=1024 * 80) - self.cmd(f'--repo={self.repository_location}', 'rcreate', '--encryption=repokey') + self.cmd(f'--repo={self.repository_location}', 'rcreate', RK_ENCRYPTION) self.cmd(f'--repo={self.repository_location}', 'create', 'test', 'input') dump_file = self.output_path + '/dump' output = self.cmd(f'--repo={self.repository_location}', 'debug', 'dump-manifest', dump_file) @@ -3274,7 +3242,7 @@ id: 2 / e29442 3506da 4e1ea7 / 25f62a 5a3d41 - 02 def test_debug_dump_archive(self): self.create_regular_file('file1', size=1024 * 80) - self.cmd(f'--repo={self.repository_location}', 'rcreate', '--encryption=repokey') + self.cmd(f'--repo={self.repository_location}', 'rcreate', RK_ENCRYPTION) self.cmd(f'--repo={self.repository_location}', 'create', 'test', 'input') dump_file = self.output_path + '/dump' output = self.cmd(f'--repo={self.repository_location}', 'debug', 'dump-archive', 'test', dump_file) @@ -3287,7 +3255,7 @@ id: 2 / e29442 3506da 4e1ea7 / 25f62a 5a3d41 - 02 assert '_items' in result def test_debug_refcount_obj(self): - self.cmd(f'--repo={self.repository_location}', 'rcreate', '--encryption=repokey') + self.cmd(f'--repo={self.repository_location}', 'rcreate', RK_ENCRYPTION) output = self.cmd(f'--repo={self.repository_location}', 'debug', 'refcount-obj', '0' * 64).strip() assert output == 'object 0000000000000000000000000000000000000000000000000000000000000000 not found [info from chunks cache].' @@ -3306,14 +3274,14 @@ id: 2 / e29442 3506da 4e1ea7 / 25f62a 5a3d41 - 02 assert 'Python' in output def test_benchmark_crud(self): - self.cmd(f'--repo={self.repository_location}', 'rcreate', '--encryption=repokey') + self.cmd(f'--repo={self.repository_location}', 'rcreate', RK_ENCRYPTION) with environment_variable(_BORG_BENCHMARK_CRUD_TEST='YES'): self.cmd(f'--repo={self.repository_location}', 'benchmark', 'crud', self.input_path) def test_config(self): self.create_test_files() os.unlink('input/flagfile') - self.cmd(f'--repo={self.repository_location}', 'rcreate', '--encryption=repokey') + self.cmd(f'--repo={self.repository_location}', 'rcreate', RK_ENCRYPTION) output = self.cmd(f'--repo={self.repository_location}', 'config', '--list') self.assert_in('[repository]', output) self.assert_in('version', output) @@ -3356,7 +3324,7 @@ id: 2 / e29442 3506da 4e1ea7 / 25f62a 5a3d41 - 02 def test_export_tar(self): self.create_test_files() os.unlink('input/flagfile') - self.cmd(f'--repo={self.repository_location}', 'rcreate', '--encryption=repokey') + self.cmd(f'--repo={self.repository_location}', 'rcreate', RK_ENCRYPTION) self.cmd(f'--repo={self.repository_location}', 'create', 'test', 'input') self.cmd(f'--repo={self.repository_location}', 'export-tar', 'test', 'simple.tar', '--progress', '--tar-format=GNU') with changedir('output'): @@ -3371,7 +3339,7 @@ id: 2 / e29442 3506da 4e1ea7 / 25f62a 5a3d41 - 02 pytest.skip('gzip is not installed') self.create_test_files() os.unlink('input/flagfile') - self.cmd(f'--repo={self.repository_location}', 'rcreate', '--encryption=repokey') + self.cmd(f'--repo={self.repository_location}', 'rcreate', RK_ENCRYPTION) self.cmd(f'--repo={self.repository_location}', 'create', 'test', 'input') list = self.cmd(f'--repo={self.repository_location}', 'export-tar', 'test', 'simple.tar.gz', '--list', '--tar-format=GNU') @@ -3387,7 +3355,7 @@ id: 2 / e29442 3506da 4e1ea7 / 25f62a 5a3d41 - 02 pytest.skip('gzip is not installed') self.create_test_files() os.unlink('input/flagfile') - self.cmd(f'--repo={self.repository_location}', 'rcreate', '--encryption=repokey') + self.cmd(f'--repo={self.repository_location}', 'rcreate', RK_ENCRYPTION) self.cmd(f'--repo={self.repository_location}', 'create', 'test', 'input') list = self.cmd(f'--repo={self.repository_location}', 'export-tar', 'test', 'simple.tar', '--strip-components=1', '--list', '--tar-format=GNU') @@ -3497,69 +3465,22 @@ id: 2 / e29442 3506da 4e1ea7 / 25f62a 5a3d41 - 02 self.assert_in('this-repository-does-not-exist', output) self.assert_not_in('this-repository-does-not-exist::test', output) - def test_can_read_repo_even_if_nonce_is_deleted(self): - """Nonce is only used for encrypting new data. - - It should be possible to retrieve the data from an archive even if - both the client and the server forget the nonce""" - self.create_regular_file('file1', contents=b'Hello, borg') - self.cmd(f'--repo={self.repository_location}', 'rcreate', '--encryption=repokey') - self.cmd(f'--repo={self.repository_location}', 'create', 'test', 'input') - # Oops! We have removed the repo-side memory of the nonce! - # See https://github.com/borgbackup/borg/issues/5858 - os.remove(os.path.join(self.repository_path, 'nonce')) - # Oops! The client has lost the nonce too! - os.remove(os.path.join(self.get_security_dir(), 'nonce')) - - # The repo should still be readable - repo_info = self.cmd(f'--repo={self.repository_location}', 'rinfo') - assert 'Original size:' in repo_info - repo_list = self.cmd(f'--repo={self.repository_location}', 'rlist') - assert 'test' in repo_list - # The archive should still be readable - archive_info = self.cmd(f'--repo={self.repository_location}', 'info', '-a', 'test') - assert 'Archive name: test\n' in archive_info - archive_list = self.cmd(f'--repo={self.repository_location}', 'list', 'test') - assert 'file1' in archive_list - # Extracting the archive should work - with changedir('output'): - self.cmd(f'--repo={self.repository_location}', 'extract', 'test') - self.assert_dirs_equal('input', 'output/input') - - def test_recovery_from_deleted_repo_nonce(self): - """We should be able to recover if path/to/repo/nonce is deleted. - - The nonce is stored in two places: in the repo and in $HOME. - The nonce in the repo is only needed when multiple clients use the same - repo. Otherwise we can just use our own copy of the nonce. - """ - self.create_regular_file('file1', contents=b'Hello, borg') - self.cmd(f'--repo={self.repository_location}', 'rcreate', '--encryption=repokey') - self.cmd(f'--repo={self.repository_location}', 'create', 'test', 'input') - # Oops! We have removed the repo-side memory of the nonce! - # See https://github.com/borgbackup/borg/issues/5858 - nonce = os.path.join(self.repository_path, 'nonce') - os.remove(nonce) - - self.cmd(f'--repo={self.repository_location}', 'create', 'test2', 'input') - assert os.path.exists(nonce) - def test_init_defaults_to_argon2(self): """https://github.com/borgbackup/borg/issues/747#issuecomment-1076160401""" - self.cmd(f'--repo={self.repository_location}', 'rcreate', '--encryption=repokey') + self.cmd(f'--repo={self.repository_location}', 'rcreate', RK_ENCRYPTION) with Repository(self.repository_path) as repository: key = msgpack.unpackb(a2b_base64(repository.load_key())) assert key['algorithm'] == 'argon2 chacha20-poly1305' def test_init_with_explicit_key_algorithm(self): """https://github.com/borgbackup/borg/issues/747#issuecomment-1076160401""" - self.cmd(f'--repo={self.repository_location}', 'rcreate', '--encryption=repokey', '--key-algorithm=pbkdf2') + self.cmd(f'--repo={self.repository_location}', 'rcreate', RK_ENCRYPTION, '--key-algorithm=pbkdf2') with Repository(self.repository_path) as repository: key = msgpack.unpackb(a2b_base64(repository.load_key())) assert key['algorithm'] == 'sha256' def verify_change_passphrase_does_not_change_algorithm(self, given_algorithm, expected_algorithm): - self.cmd(f'--repo={self.repository_location}', 'rcreate', '--encryption=repokey', '--key-algorithm', given_algorithm) + self.cmd(f'--repo={self.repository_location}', 'rcreate', RK_ENCRYPTION, '--key-algorithm', given_algorithm) os.environ['BORG_NEW_PASSPHRASE'] = 'newpassphrase' self.cmd(f'--repo={self.repository_location}', 'key', 'change-passphrase') @@ -3575,7 +3496,7 @@ id: 2 / e29442 3506da 4e1ea7 / 25f62a 5a3d41 - 02 self.verify_change_passphrase_does_not_change_algorithm('pbkdf2', 'sha256') def verify_change_location_does_not_change_algorithm(self, given_algorithm, expected_algorithm): - self.cmd(f'--repo={self.repository_location}', 'rcreate', '--encryption=keyfile', '--key-algorithm', given_algorithm) + self.cmd(f'--repo={self.repository_location}', 'rcreate', KF_ENCRYPTION, '--key-algorithm', given_algorithm) self.cmd(f'--repo={self.repository_location}', 'key', 'change-location', 'repokey') @@ -3590,7 +3511,7 @@ id: 2 / e29442 3506da 4e1ea7 / 25f62a 5a3d41 - 02 self.verify_change_location_does_not_change_algorithm('pbkdf2', 'sha256') def test_key_change_algorithm(self): - self.cmd(f'--repo={self.repository_location}', 'rcreate', '--encryption=repokey', '--key-algorithm=pbkdf2') + self.cmd(f'--repo={self.repository_location}', 'rcreate', RK_ENCRYPTION, '--key-algorithm=pbkdf2') self.cmd(f'--repo={self.repository_location}', 'key', 'change-algorithm', 'argon2') @@ -3649,7 +3570,7 @@ class ArchiverCheckTestCase(ArchiverTestCaseBase): def setUp(self): super().setUp() with patch.object(ChunkBuffer, 'BUFFER_SIZE', 10): - self.cmd(f'--repo={self.repository_location}', 'rcreate', '--encryption=repokey') + self.cmd(f'--repo={self.repository_location}', 'rcreate', RK_ENCRYPTION) self.create_src_archive('archive1') self.create_src_archive('archive2') @@ -3844,7 +3765,7 @@ class ArchiverCheckTestCase(ArchiverTestCaseBase): assert 'testsuite/archiver.py: New missing file chunk detected' in output def test_verify_data(self): - self._test_verify_data('--encryption', 'repokey') + self._test_verify_data(RK_ENCRYPTION) def test_verify_data_unencrypted(self): self._test_verify_data('--encryption', 'none') @@ -3870,7 +3791,7 @@ class ManifestAuthenticationTest(ArchiverTestCaseBase): repository.commit(compact=False) def test_fresh_init_tam_required(self): - self.cmd(f'--repo={self.repository_location}', 'rcreate', '--encryption=repokey') + self.cmd(f'--repo={self.repository_location}', 'rcreate', RK_ENCRYPTION) repository = Repository(self.repository_path, exclusive=True) with repository: manifest, key = Manifest.load(repository, Manifest.NO_OPERATION_CHECK) @@ -3885,7 +3806,7 @@ class ManifestAuthenticationTest(ArchiverTestCaseBase): self.cmd(f'--repo={self.repository_location}', 'rlist') def test_not_required(self): - self.cmd(f'--repo={self.repository_location}', 'rcreate', '--encryption=repokey') + self.cmd(f'--repo={self.repository_location}', 'rcreate', RK_ENCRYPTION) self.create_src_archive('archive1234') repository = Repository(self.repository_path, exclusive=True) # Manifest must be authenticated now @@ -3908,32 +3829,32 @@ class RemoteArchiverTestCase(ArchiverTestCase): def test_remote_repo_restrict_to_path(self): # restricted to repo directory itself: with patch.object(RemoteRepository, 'extra_test_args', ['--restrict-to-path', self.repository_path]): - self.cmd(f'--repo={self.repository_location}', 'rcreate', '--encryption=repokey') + self.cmd(f'--repo={self.repository_location}', 'rcreate', RK_ENCRYPTION) # restricted to repo directory itself, fail for other directories with same prefix: with patch.object(RemoteRepository, 'extra_test_args', ['--restrict-to-path', self.repository_path]): with pytest.raises(PathNotAllowed): - self.cmd(f'--repo={self.repository_location}_0', 'rcreate', '--encryption=repokey') + self.cmd(f'--repo={self.repository_location}_0', 'rcreate', RK_ENCRYPTION) # restricted to a completely different path: with patch.object(RemoteRepository, 'extra_test_args', ['--restrict-to-path', '/foo']): with pytest.raises(PathNotAllowed): - self.cmd(f'--repo={self.repository_location}_1', 'rcreate', '--encryption=repokey') + self.cmd(f'--repo={self.repository_location}_1', 'rcreate', RK_ENCRYPTION) path_prefix = os.path.dirname(self.repository_path) # restrict to repo directory's parent directory: with patch.object(RemoteRepository, 'extra_test_args', ['--restrict-to-path', path_prefix]): - self.cmd(f'--repo={self.repository_location}_2', 'rcreate', '--encryption=repokey') + self.cmd(f'--repo={self.repository_location}_2', 'rcreate', RK_ENCRYPTION) # restrict to repo directory's parent directory and another directory: with patch.object(RemoteRepository, 'extra_test_args', ['--restrict-to-path', '/foo', '--restrict-to-path', path_prefix]): - self.cmd(f'--repo={self.repository_location}_3', 'rcreate', '--encryption=repokey') + self.cmd(f'--repo={self.repository_location}_3', 'rcreate', RK_ENCRYPTION) def test_remote_repo_restrict_to_repository(self): # restricted to repo directory itself: with patch.object(RemoteRepository, 'extra_test_args', ['--restrict-to-repository', self.repository_path]): - self.cmd(f'--repo={self.repository_location}', 'rcreate', '--encryption=repokey') + self.cmd(f'--repo={self.repository_location}', 'rcreate', RK_ENCRYPTION) parent_path = os.path.join(self.repository_path, '..') with patch.object(RemoteRepository, 'extra_test_args', ['--restrict-to-repository', parent_path]): with pytest.raises(PathNotAllowed): - self.cmd(f'--repo={self.repository_location}', 'rcreate', '--encryption=repokey') + self.cmd(f'--repo={self.repository_location}', 'rcreate', RK_ENCRYPTION) @unittest.skip('only works locally') def test_debug_put_get_delete_obj(self): @@ -3948,7 +3869,7 @@ class RemoteArchiverTestCase(ArchiverTestCase): pass def test_remote_repo_strip_components_doesnt_leak(self): - self.cmd(f'--repo={self.repository_location}', 'rcreate', '--encryption=repokey') + self.cmd(f'--repo={self.repository_location}', 'rcreate', RK_ENCRYPTION) self.create_regular_file('dir/file', contents=b"test file contents 1") self.create_regular_file('dir/file2', contents=b"test file contents 2") self.create_regular_file('skipped-file1', contents=b"test file contents 3") @@ -3974,7 +3895,7 @@ class ArchiverCorruptionTestCase(ArchiverTestCaseBase): def setUp(self): super().setUp() self.create_test_files() - self.cmd(f'--repo={self.repository_location}', 'rcreate', '--encryption=repokey') + self.cmd(f'--repo={self.repository_location}', 'rcreate', RK_ENCRYPTION) self.cache_path = json.loads(self.cmd(f'--repo={self.repository_location}', 'rinfo', '--json'))['cache']['path'] def corrupt(self, file, amount=1): @@ -4068,7 +3989,7 @@ class DiffArchiverTestCase(ArchiverTestCaseBase): os.link('input/file_removed', 'input/hardlink_removed') os.link('input/file_removed2', 'input/hardlink_target_removed') - self.cmd(f'--repo={self.repository_location}', 'rcreate', '--encryption=repokey') + self.cmd(f'--repo={self.repository_location}', 'rcreate', RK_ENCRYPTION) # Create the first snapshot self.cmd(f'--repo={self.repository_location}', 'create', 'test0', 'input') @@ -4254,7 +4175,7 @@ class DiffArchiverTestCase(ArchiverTestCaseBase): do_json_asserts(self.cmd(f'--repo={self.repository_location}', 'diff', 'test0', 'test1a', '--json-lines'), True) def test_sort_option(self): - self.cmd(f'--repo={self.repository_location}', 'rcreate', '--encryption=repokey') + self.cmd(f'--repo={self.repository_location}', 'rcreate', RK_ENCRYPTION) self.create_regular_file('a_file_removed', size=8) self.create_regular_file('f_file_removed', size=16) @@ -4315,7 +4236,7 @@ def test_get_args(): assert args.restrict_to_repositories == ['/r1', '/r2'] # trying to cheat - try to execute different subcommand args = archiver.get_args(['borg', 'serve', '--restrict-to-path=/p1', '--restrict-to-path=/p2', ], - 'borg --repo=/ rcreate --encryption=repokey') + f'borg --repo=/ rcreate {RK_ENCRYPTION}') assert args.func == archiver.do_serve # Check that environment variables in the forced command don't cause issues. If the command