key_cmds converted

This commit is contained in:
bigtedde 2023-07-07 15:53:49 -04:00
parent 725149cb8b
commit 3cccf19671
1 changed files with 249 additions and 217 deletions

View File

@ -1,5 +1,4 @@
import os
import unittest
from binascii import unhexlify, b2a_base64, a2b_base64
import pytest
@ -13,69 +12,84 @@ from ...helpers import msgpack
from ...repository import Repository
from .. import environment_variable
from .. import key
from . import (
ArchiverTestCaseBase,
ArchiverTestCaseBinaryBase,
RemoteArchiverTestCaseBase,
RK_ENCRYPTION,
KF_ENCRYPTION,
BORG_EXES,
)
from . import RK_ENCRYPTION, KF_ENCRYPTION, cmd, _extract_repository_id, _set_repository_id
class ArchiverTestCase(ArchiverTestCaseBase):
def test_change_passphrase(self):
self.cmd(f"--repo={self.repository_location}", "rcreate", RK_ENCRYPTION)
def pytest_generate_tests(metafunc):
# Generate tests for different scenarios: local repository, remote repository, and using the borg binary.
if "archivers" in metafunc.fixturenames:
metafunc.parametrize("archivers", ["archiver", "remote_archiver", "binary_archiver"])
def test_change_passphrase(archivers, request):
archiver = request.getfixturevalue(archivers)
repo_location = archiver.repository_location
cmd(archiver, f"--repo={repo_location}", "rcreate", RK_ENCRYPTION)
os.environ["BORG_NEW_PASSPHRASE"] = "newpassphrase"
# here we have both BORG_PASSPHRASE and BORG_NEW_PASSPHRASE set:
self.cmd(f"--repo={self.repository_location}", "key", "change-passphrase")
cmd(archiver, f"--repo={repo_location}", "key", "change-passphrase")
os.environ["BORG_PASSPHRASE"] = "newpassphrase"
self.cmd(f"--repo={self.repository_location}", "rlist")
cmd(archiver, f"--repo={repo_location}", "rlist")
def test_change_location_to_keyfile(self):
self.cmd(f"--repo={self.repository_location}", "rcreate", RK_ENCRYPTION)
log = self.cmd(f"--repo={self.repository_location}", "rinfo")
def test_change_location_to_keyfile(archivers, request):
archiver = request.getfixturevalue(archivers)
repo_location = archiver.repository_location
cmd(archiver, f"--repo={repo_location}", "rcreate", RK_ENCRYPTION)
log = cmd(archiver, f"--repo={repo_location}", "rinfo")
assert "(repokey" in log
self.cmd(f"--repo={self.repository_location}", "key", "change-location", "keyfile")
log = self.cmd(f"--repo={self.repository_location}", "rinfo")
cmd(archiver, f"--repo={repo_location}", "key", "change-location", "keyfile")
log = cmd(archiver, f"--repo={repo_location}", "rinfo")
assert "(key file" in log
def test_change_location_to_b2keyfile(self):
self.cmd(f"--repo={self.repository_location}", "rcreate", "--encryption=repokey-blake2-aes-ocb")
log = self.cmd(f"--repo={self.repository_location}", "rinfo")
def test_change_location_to_b2keyfile(archivers, request):
archiver = request.getfixturevalue(archivers)
repo_location = archiver.repository_location
cmd(archiver, f"--repo={repo_location}", "rcreate", "--encryption=repokey-blake2-aes-ocb")
log = cmd(archiver, f"--repo={repo_location}", "rinfo")
assert "(repokey BLAKE2b" in log
self.cmd(f"--repo={self.repository_location}", "key", "change-location", "keyfile")
log = self.cmd(f"--repo={self.repository_location}", "rinfo")
cmd(archiver, f"--repo={repo_location}", "key", "change-location", "keyfile")
log = cmd(archiver, f"--repo={repo_location}", "rinfo")
assert "(key file BLAKE2b" in log
def test_change_location_to_repokey(self):
self.cmd(f"--repo={self.repository_location}", "rcreate", KF_ENCRYPTION)
log = self.cmd(f"--repo={self.repository_location}", "rinfo")
def test_change_location_to_repokey(archivers, request):
archiver = request.getfixturevalue(archivers)
repo_location = archiver.repository_location
cmd(archiver, f"--repo={repo_location}", "rcreate", KF_ENCRYPTION)
log = cmd(archiver, f"--repo={repo_location}", "rinfo")
assert "(key file" in log
self.cmd(f"--repo={self.repository_location}", "key", "change-location", "repokey")
log = self.cmd(f"--repo={self.repository_location}", "rinfo")
cmd(archiver, f"--repo={repo_location}", "key", "change-location", "repokey")
log = cmd(archiver, f"--repo={repo_location}", "rinfo")
assert "(repokey" in log
def test_change_location_to_b2repokey(self):
self.cmd(f"--repo={self.repository_location}", "rcreate", "--encryption=keyfile-blake2-aes-ocb")
log = self.cmd(f"--repo={self.repository_location}", "rinfo")
def test_change_location_to_b2repokey(archivers, request):
archiver = request.getfixturevalue(archivers)
repo_location = archiver.repository_location
cmd(archiver, f"--repo={repo_location}", "rcreate", "--encryption=keyfile-blake2-aes-ocb")
log = cmd(archiver, f"--repo={repo_location}", "rinfo")
assert "(key file BLAKE2b" in log
self.cmd(f"--repo={self.repository_location}", "key", "change-location", "repokey")
log = self.cmd(f"--repo={self.repository_location}", "rinfo")
cmd(archiver, f"--repo={repo_location}", "key", "change-location", "repokey")
log = cmd(archiver, f"--repo={repo_location}", "rinfo")
assert "(repokey BLAKE2b" in log
def test_key_export_keyfile(self):
export_file = self.output_path + "/exported"
self.cmd(f"--repo={self.repository_location}", "rcreate", KF_ENCRYPTION)
repo_id = self._extract_repository_id(self.repository_path)
self.cmd(f"--repo={self.repository_location}", "key", "export", export_file)
def test_key_export_keyfile(archivers, request):
archiver = request.getfixturevalue(archivers)
repo_location, repo_path, keys_path = archiver.repository_location, archiver.repository_path, archiver.keys_path
export_file = archiver.output_path + "/exported"
cmd(archiver, f"--repo={repo_location}", "rcreate", KF_ENCRYPTION)
repo_id = _extract_repository_id(repo_path)
cmd(archiver, f"--repo={repo_location}", "key", "export", export_file)
with open(export_file) as fd:
export_contents = fd.read()
assert export_contents.startswith("BORG_KEY " + bin_to_hex(repo_id) + "\n")
key_file = self.keys_path + "/" + os.listdir(self.keys_path)[0]
key_file = keys_path + "/" + os.listdir(keys_path)[0]
with open(key_file) as fd:
key_contents = fd.read()
@ -84,45 +98,51 @@ class ArchiverTestCase(ArchiverTestCaseBase):
os.unlink(key_file)
self.cmd(f"--repo={self.repository_location}", "key", "import", export_file)
cmd(archiver, f"--repo={repo_location}", "key", "import", export_file)
with open(key_file) as fd:
key_contents2 = fd.read()
assert key_contents2 == key_contents
def test_key_import_keyfile_with_borg_key_file(self):
self.cmd(f"--repo={self.repository_location}", "rcreate", KF_ENCRYPTION)
exported_key_file = os.path.join(self.output_path, "exported")
self.cmd(f"--repo={self.repository_location}", "key", "export", exported_key_file)
def test_key_import_keyfile_with_borg_key_file(archivers, request):
archiver = request.getfixturevalue(archivers)
repo_location, keys_path, output_path = archiver.repository_location, archiver.keys_path, archiver.output_path
cmd(archiver, f"--repo={repo_location}", "rcreate", KF_ENCRYPTION)
key_file = os.path.join(self.keys_path, os.listdir(self.keys_path)[0])
exported_key_file = os.path.join(output_path, "exported")
cmd(archiver, f"--repo={repo_location}", "key", "export", exported_key_file)
key_file = os.path.join(keys_path, os.listdir(keys_path)[0])
with open(key_file) as fd:
key_contents = fd.read()
os.unlink(key_file)
imported_key_file = os.path.join(self.output_path, "imported")
imported_key_file = os.path.join(output_path, "imported")
with environment_variable(BORG_KEY_FILE=imported_key_file):
self.cmd(f"--repo={self.repository_location}", "key", "import", exported_key_file)
cmd(archiver, f"--repo={repo_location}", "key", "import", exported_key_file)
assert not os.path.isfile(key_file), '"borg key import" should respect BORG_KEY_FILE'
with open(imported_key_file) as fd:
imported_key_contents = fd.read()
assert imported_key_contents == key_contents
def test_key_export_repokey(self):
export_file = self.output_path + "/exported"
self.cmd(f"--repo={self.repository_location}", "rcreate", RK_ENCRYPTION)
repo_id = self._extract_repository_id(self.repository_path)
self.cmd(f"--repo={self.repository_location}", "key", "export", export_file)
def test_key_export_repokey(archivers, request):
archiver = request.getfixturevalue(archivers)
repo_location, repo_path, output_path = archiver.repository_location, archiver.repository_path, archiver.output_path
export_file = output_path + "/exported"
cmd(archiver, f"--repo={repo_location}", "rcreate", RK_ENCRYPTION)
repo_id = _extract_repository_id(repo_path)
cmd(archiver, f"--repo={repo_location}", "key", "export", export_file)
with open(export_file) as fd:
export_contents = fd.read()
assert export_contents.startswith("BORG_KEY " + bin_to_hex(repo_id) + "\n")
with Repository(self.repository_path) as repository:
with Repository(repo_path) as repository:
repo_key = AESOCBRepoKey(repository)
repo_key.load(None, Passphrase.env_passphrase())
@ -131,22 +151,25 @@ class ArchiverTestCase(ArchiverTestCaseBase):
assert repo_key.crypt_key == backup_key.crypt_key
with Repository(self.repository_path) as repository:
with Repository(repo_path) as repository:
repository.save_key(b"")
self.cmd(f"--repo={self.repository_location}", "key", "import", export_file)
cmd(archiver, f"--repo={repo_location}", "key", "import", export_file)
with Repository(self.repository_path) as repository:
with Repository(repo_path) as repository:
repo_key2 = AESOCBRepoKey(repository)
repo_key2.load(None, Passphrase.env_passphrase())
assert repo_key2.crypt_key == repo_key2.crypt_key
def test_key_export_qr(self):
export_file = self.output_path + "/exported.html"
self.cmd(f"--repo={self.repository_location}", "rcreate", RK_ENCRYPTION)
repo_id = self._extract_repository_id(self.repository_path)
self.cmd(f"--repo={self.repository_location}", "key", "export", "--qr-html", export_file)
def test_key_export_qr(archivers, request):
archiver = request.getfixturevalue(archivers)
repo_location, repo_path, output_path = archiver.repository_location, archiver.repository_path, archiver.output_path
export_file = output_path + "/exported.html"
cmd(archiver, f"--repo={repo_location}", "rcreate", RK_ENCRYPTION)
repo_id = _extract_repository_id(repo_path)
cmd(archiver, f"--repo={repo_location}", "key", "export", "--qr-html", export_file)
with open(export_file, encoding="utf-8") as fd:
export_contents = fd.read()
@ -155,62 +178,68 @@ class ArchiverTestCase(ArchiverTestCaseBase):
assert export_contents.startswith("<!doctype html>")
assert export_contents.endswith("</html>\n")
def test_key_export_directory(self):
export_directory = self.output_path + "/exported"
def test_key_export_directory(archivers, request):
archiver = request.getfixturevalue(archivers)
repo_location, output_path = archiver.repository_location, archiver.output_path
export_directory = output_path + "/exported"
os.mkdir(export_directory)
cmd(archiver, f"--repo={repo_location}", "rcreate", RK_ENCRYPTION)
cmd(archiver, f"--repo={repo_location}", "key", "export", export_directory, exit_code=EXIT_ERROR)
self.cmd(f"--repo={self.repository_location}", "rcreate", RK_ENCRYPTION)
self.cmd(f"--repo={self.repository_location}", "key", "export", export_directory, exit_code=EXIT_ERROR)
def test_key_export_qr_directory(self):
export_directory = self.output_path + "/exported"
def test_key_export_qr_directory(archivers, request):
archiver = request.getfixturevalue(archivers)
repo_location, output_path = archiver.repository_location, archiver.output_path
export_directory = output_path + "/exported"
os.mkdir(export_directory)
cmd(archiver, f"--repo={repo_location}", "rcreate", RK_ENCRYPTION)
cmd(archiver, f"--repo={repo_location}", "key", "export", "--qr-html", export_directory, exit_code=EXIT_ERROR)
self.cmd(f"--repo={self.repository_location}", "rcreate", RK_ENCRYPTION)
self.cmd(
f"--repo={self.repository_location}", "key", "export", "--qr-html", export_directory, exit_code=EXIT_ERROR
)
def test_key_import_errors(archivers, request):
archiver = request.getfixturevalue(archivers)
repo_location, output_path = archiver.repository_location, archiver.output_path
export_file = output_path + "/exported"
cmd(archiver, f"--repo={repo_location}", "rcreate", KF_ENCRYPTION)
def test_key_import_errors(self):
export_file = self.output_path + "/exported"
self.cmd(f"--repo={self.repository_location}", "rcreate", KF_ENCRYPTION)
self.cmd(f"--repo={self.repository_location}", "key", "import", export_file, exit_code=EXIT_ERROR)
cmd(archiver, f"--repo={repo_location}", "key", "import", export_file, exit_code=EXIT_ERROR)
with open(export_file, "w") as fd:
fd.write("something not a key\n")
if self.FORK_DEFAULT:
self.cmd(f"--repo={self.repository_location}", "key", "import", export_file, exit_code=2)
if archiver.FORK_DEFAULT:
cmd(archiver, f"--repo={repo_location}", "key", "import", export_file, exit_code=2)
else:
with pytest.raises(NotABorgKeyFile):
self.cmd(f"--repo={self.repository_location}", "key", "import", export_file)
cmd(archiver, f"--repo={repo_location}", "key", "import", export_file)
with open(export_file, "w") as fd:
fd.write("BORG_KEY a0a0a0\n")
if self.FORK_DEFAULT:
self.cmd(f"--repo={self.repository_location}", "key", "import", export_file, exit_code=2)
if archiver.FORK_DEFAULT:
cmd(archiver, f"--repo={repo_location}", "key", "import", export_file, exit_code=2)
else:
with pytest.raises(RepoIdMismatch):
self.cmd(f"--repo={self.repository_location}", "key", "import", export_file)
cmd(archiver, f"--repo={repo_location}", "key", "import", export_file)
def test_key_export_paperkey(self):
def test_key_export_paperkey(archivers, request):
archiver = request.getfixturevalue(archivers)
repo_location, repo_path, output_path = archiver.repository_location, archiver.repository_path, archiver.output_path
repo_id = "e294423506da4e1ea76e8dcdf1a3919624ae3ae496fddf905610c351d3f09239"
export_file = self.output_path + "/exported"
self.cmd(f"--repo={self.repository_location}", "rcreate", KF_ENCRYPTION)
self._set_repository_id(self.repository_path, unhexlify(repo_id))
export_file = output_path + "/exported"
cmd(archiver, f"--repo={repo_location}", "rcreate", KF_ENCRYPTION)
_set_repository_id(repo_path, unhexlify(repo_id))
key_file = self.keys_path + "/" + os.listdir(self.keys_path)[0]
key_file = archiver.keys_path + "/" + os.listdir(archiver.keys_path)[0]
with open(key_file, "w") as fd:
fd.write(CHPOKeyfileKey.FILE_ID + " " + repo_id + "\n")
fd.write(b2a_base64(b"abcdefghijklmnopqrstu").decode())
self.cmd(f"--repo={self.repository_location}", "key", "export", "--paper", export_file)
cmd(archiver, f"--repo={repo_location}", "key", "export", "--paper", export_file)
with open(export_file) as fd:
export_contents = fd.read()
@ -226,12 +255,15 @@ id: 2 / e29442 3506da 4e1ea7 / 25f62a 5a3d41 - 02
"""
)
def test_key_import_paperkey(self):
repo_id = "e294423506da4e1ea76e8dcdf1a3919624ae3ae496fddf905610c351d3f09239"
self.cmd(f"--repo={self.repository_location}", "rcreate", KF_ENCRYPTION)
self._set_repository_id(self.repository_path, unhexlify(repo_id))
key_file = self.keys_path + "/" + os.listdir(self.keys_path)[0]
def test_key_import_paperkey(archivers, request):
archiver = request.getfixturevalue(archivers)
repo_location, repo_path, keys_path = archiver.repository_location, archiver.repository_path, archiver.keys_path
repo_id = "e294423506da4e1ea76e8dcdf1a3919624ae3ae496fddf905610c351d3f09239"
cmd(archiver, f"--repo={repo_location}", "rcreate", KF_ENCRYPTION)
_set_repository_id(repo_path, unhexlify(repo_id))
key_file = keys_path + "/" + os.listdir(keys_path)[0]
with open(key_file, "w") as fd:
fd.write(AESOCBKeyfileKey.FILE_ID + " " + repo_id + "\n")
fd.write(b2a_base64(b"abcdefghijklmnopqrstu").decode())
@ -262,45 +294,45 @@ id: 2 / e29442 3506da 4e1ea7 / 25f62a 5a3d41 - 02
# print(i.to_bytes(2, 'big'))
# break
self.cmd(f"--repo={self.repository_location}", "key", "import", "--paper", input=typed_input)
cmd(archiver, f"--repo={repo_location}", "key", "import", "--paper", input=typed_input)
# Test abort paths
typed_input = b"\ny\n"
self.cmd(f"--repo={self.repository_location}", "key", "import", "--paper", input=typed_input)
cmd(archiver, f"--repo={repo_location}", "key", "import", "--paper", input=typed_input)
typed_input = b"2 / e29442 3506da 4e1ea7 / 25f62a 5a3d41 - 02\n\ny\n"
self.cmd(f"--repo={self.repository_location}", "key", "import", "--paper", input=typed_input)
cmd(archiver, f"--repo={repo_location}", "key", "import", "--paper", input=typed_input)
def test_init_defaults_to_argon2(self):
def test_init_defaults_to_argon2(archivers, request):
"""https://github.com/borgbackup/borg/issues/747#issuecomment-1076160401"""
self.cmd(f"--repo={self.repository_location}", "rcreate", RK_ENCRYPTION)
with Repository(self.repository_path) as repository:
archiver = request.getfixturevalue(archivers)
repo_location, repo_path = archiver.repository_location, archiver.repository_path
cmd(archiver, f"--repo={repo_location}", "rcreate", RK_ENCRYPTION)
with Repository(repo_path) as repository:
key = msgpack.unpackb(a2b_base64(repository.load_key()))
assert key["algorithm"] == "argon2 chacha20-poly1305"
def test_change_passphrase_does_not_change_algorithm_argon2(self):
self.cmd(f"--repo={self.repository_location}", "rcreate", RK_ENCRYPTION)
def test_change_passphrase_does_not_change_algorithm_argon2(archivers, request):
archiver = request.getfixturevalue(archivers)
repo_location, repo_path = archiver.repository_location, archiver.repository_path
cmd(archiver, f"--repo={repo_location}", "rcreate", RK_ENCRYPTION)
os.environ["BORG_NEW_PASSPHRASE"] = "newpassphrase"
cmd(archiver, f"--repo={repo_location}", "key", "change-passphrase")
self.cmd(f"--repo={self.repository_location}", "key", "change-passphrase")
with Repository(self.repository_path) as repository:
key = msgpack.unpackb(a2b_base64(repository.load_key()))
assert key["algorithm"] == "argon2 chacha20-poly1305"
def test_change_location_does_not_change_algorithm_argon2(self):
self.cmd(f"--repo={self.repository_location}", "rcreate", KF_ENCRYPTION)
self.cmd(f"--repo={self.repository_location}", "key", "change-location", "repokey")
with Repository(self.repository_path) as repository:
with Repository(repo_path) as repository:
key = msgpack.unpackb(a2b_base64(repository.load_key()))
assert key["algorithm"] == "argon2 chacha20-poly1305"
class RemoteArchiverTestCase(RemoteArchiverTestCaseBase, ArchiverTestCase):
"""run the same tests, but with a remote repository"""
def test_change_location_does_not_change_algorithm_argon2(archivers, request):
archiver = request.getfixturevalue(archivers)
repo_location, repo_path = archiver.repository_location, archiver.repository_path
cmd(archiver, f"--repo={repo_location}", "rcreate", KF_ENCRYPTION)
cmd(archiver, f"--repo={repo_location}", "key", "change-location", "repokey")
@unittest.skipUnless("binary" in BORG_EXES, "no borg.exe available")
class ArchiverTestCaseBinary(ArchiverTestCaseBinaryBase, ArchiverTestCase):
"""runs the same tests, but via the borg binary"""
with Repository(repo_path) as repository:
key = msgpack.unpackb(a2b_base64(repository.load_key()))
assert key["algorithm"] == "argon2 chacha20-poly1305"