tests: use less RepoKey/KeyfileKey

This commit is contained in:
Thomas Waldmann 2022-06-30 00:15:54 +02:00
parent dc2f2f47a8
commit ef24dafb15
4 changed files with 32 additions and 42 deletions

View File

@ -7,7 +7,7 @@ from hashlib import sha256
from ..helpers import Manifest, NoManifestError, Error, yes, bin_to_hex, dash_open
from ..repository import Repository
from .key import KeyfileKey, KeyfileNotFoundError, RepoKeyNotFoundError, KeyBlobStorage, identify_key
from .key import CHPOKeyfileKey, KeyfileNotFoundError, RepoKeyNotFoundError, KeyBlobStorage, identify_key
class UnencryptedRepo(Error):
@ -50,7 +50,7 @@ class KeyManager:
def load_keyblob(self):
if self.keyblob_storage == KeyBlobStorage.KEYFILE:
k = KeyfileKey(self.repository)
k = CHPOKeyfileKey(self.repository)
target = k.find_key()
with open(target) as fd:
self.keyblob = ''.join(fd.readlines()[1:])
@ -65,7 +65,7 @@ class KeyManager:
def store_keyblob(self, args):
if self.keyblob_storage == KeyBlobStorage.KEYFILE:
k = KeyfileKey(self.repository)
k = CHPOKeyfileKey(self.repository)
target = k.get_existing_or_new_target(args)
self.store_keyfile(target)
@ -73,7 +73,7 @@ class KeyManager:
self.repository.save_key(self.keyblob.encode('utf-8'))
def get_keyfile_data(self):
data = f'{KeyfileKey.FILE_ID} {bin_to_hex(self.repository.id)}\n'
data = f'{CHPOKeyfileKey.FILE_ID} {bin_to_hex(self.repository.id)}\n'
data += self.keyblob
if not self.keyblob.endswith('\n'):
data += '\n'
@ -136,7 +136,7 @@ class KeyManager:
fd.write(export)
def import_keyfile(self, args):
file_id = KeyfileKey.FILE_ID
file_id = CHPOKeyfileKey.FILE_ID
first_line = file_id + ' ' + bin_to_hex(self.repository.id) + '\n'
with dash_open(args.path, 'r') as fd:
file_first_line = fd.read(len(first_line))

View File

@ -10,7 +10,7 @@ from .key import TestKey
from ..archive import Statistics
from ..cache import AdHocCache
from ..compress import CompressionSpec
from ..crypto.key import RepoKey
from ..crypto.key import AESOCBRepoKey
from ..hashindex import ChunkIndex, CacheSynchronizer
from ..helpers import Manifest
from ..repository import Repository
@ -218,7 +218,7 @@ class TestAdHocCache:
@pytest.fixture
def key(self, repository, monkeypatch):
monkeypatch.setenv('BORG_PASSPHRASE', 'test')
key = RepoKey.create(repository, TestKey.MockArgs())
key = AESOCBRepoKey.create(repository, TestKey.MockArgs())
key.compressor = CompressionSpec('none').compressor
return key

View File

@ -9,7 +9,7 @@ from ..crypto.low_level import AES256_CTR_HMAC_SHA256, AES256_OCB, CHACHA20_POLY
from ..crypto.low_level import bytes_to_long, bytes_to_int, long_to_bytes
from ..crypto.low_level import hkdf_hmac_sha512
from ..crypto.low_level import AES, hmac_sha256
from ..crypto.key import KeyfileKey, RepoKey, FlexiKey
from ..crypto.key import CHPOKeyfileKey, AESOCBRepoKey, FlexiKey
from ..helpers import msgpack
from . import BaseTestCase
@ -264,7 +264,7 @@ def test_decrypt_key_file_argon2_chacha20_poly1305():
'algorithm': 'argon2 chacha20-poly1305',
'data': envelope,
})
key = KeyfileKey(None)
key = CHPOKeyfileKey(None)
decrypted = key.decrypt_key_file(encrypted, "hello, pass phrase")
@ -286,7 +286,7 @@ def test_decrypt_key_file_pbkdf2_sha256_aes256_ctr_hmac_sha256():
'data': data,
'hash': hash,
})
key = KeyfileKey(None)
key = CHPOKeyfileKey(None)
decrypted = key.decrypt_key_file(encrypted, passphrase)
@ -325,7 +325,7 @@ def test_repo_key_detect_does_not_raise_integrity_error(getpass, monkeypatch):
repository = MagicMock(id=b'repository_id')
getpass.return_value = "hello, pass phrase"
monkeypatch.setenv('BORG_DISPLAY_PASSPHRASE', 'no')
RepoKey.create(repository, args=MagicMock(key_algorithm='argon2'))
AESOCBRepoKey.create(repository, args=MagicMock(key_algorithm='argon2'))
repository.load_key.return_value = repository.save_key.call_args.args[0]
RepoKey.detect(repository, manifest_data=None)
AESOCBRepoKey.detect(repository, manifest_data=None)

View File

@ -8,9 +8,10 @@ from unittest.mock import MagicMock
import pytest
from ..crypto.key import bin_to_hex
from ..crypto.key import PlaintextKey, AuthenticatedKey, RepoKey, KeyfileKey, \
Blake2KeyfileKey, Blake2RepoKey, Blake2AuthenticatedKey, \
AESOCBKeyfileKey, AESOCBRepoKey, CHPOKeyfileKey, CHPORepoKey
from ..crypto.key import PlaintextKey, AuthenticatedKey, Blake2AuthenticatedKey
from ..crypto.key import RepoKey, KeyfileKey, Blake2RepoKey, Blake2KeyfileKey
from ..crypto.key import AESOCBRepoKey, AESOCBKeyfileKey, CHPORepoKey, CHPOKeyfileKey
from ..crypto.key import Blake2AESOCBRepoKey, Blake2AESOCBKeyfileKey, Blake2CHPORepoKey, Blake2CHPOKeyfileKey
from ..crypto.key import ID_HMAC_SHA_256, ID_BLAKE2b_256
from ..crypto.key import TAMRequiredError, TAMInvalid, TAMUnsupportedSuiteError, UnsupportedManifestError, UnsupportedKeyFormatError
from ..crypto.key import identify_key
@ -76,16 +77,17 @@ class TestKey:
return tmpdir
@pytest.fixture(params=(
# not encrypted
PlaintextKey,
AuthenticatedKey,
KeyfileKey,
RepoKey,
AuthenticatedKey,
Blake2KeyfileKey,
Blake2RepoKey,
Blake2AuthenticatedKey,
AuthenticatedKey, Blake2AuthenticatedKey,
# legacy crypto
KeyfileKey, Blake2KeyfileKey,
RepoKey, Blake2RepoKey,
# new crypto
AESOCBKeyfileKey, AESOCBRepoKey,
Blake2AESOCBKeyfileKey, Blake2AESOCBRepoKey,
CHPOKeyfileKey, CHPORepoKey,
Blake2CHPOKeyfileKey, Blake2CHPORepoKey,
))
def key(self, request, monkeypatch):
monkeypatch.setenv('BORG_PASSPHRASE', 'test')
@ -143,33 +145,21 @@ class TestKey:
id = key.id_hash(chunk)
assert chunk == key2.decrypt(id, key.encrypt(id, chunk))
def test_keyfile_nonce_rollback_protection(self, monkeypatch, keys_dir):
monkeypatch.setenv('BORG_PASSPHRASE', 'test')
repository = self.MockRepository()
with open(os.path.join(get_security_dir(repository.id_str), 'nonce'), "w") as fd:
fd.write("0000000000002000")
key = KeyfileKey.create(repository, self.MockArgs())
chunk = b'ABC'
id = key.id_hash(chunk)
data = key.encrypt(id, chunk)
assert key.cipher.extract_iv(data) == 0x2000
assert key.decrypt(id, data) == chunk
def test_keyfile_kfenv(self, tmpdir, monkeypatch):
keyfile = tmpdir.join('keyfile')
monkeypatch.setenv('BORG_KEY_FILE', str(keyfile))
monkeypatch.setenv('BORG_PASSPHRASE', 'testkf')
assert not keyfile.exists()
key = KeyfileKey.create(self.MockRepository(), self.MockArgs())
key = CHPOKeyfileKey.create(self.MockRepository(), self.MockArgs())
assert keyfile.exists()
chunk = b'ABC'
chunk_id = key.id_hash(chunk)
chunk_cdata = key.encrypt(chunk_id, chunk)
key = KeyfileKey.detect(self.MockRepository(), chunk_cdata)
key = CHPOKeyfileKey.detect(self.MockRepository(), chunk_cdata)
assert chunk == key.decrypt(chunk_id, chunk_cdata)
keyfile.remove()
with pytest.raises(FileNotFoundError):
KeyfileKey.detect(self.MockRepository(), chunk_cdata)
CHPOKeyfileKey.detect(self.MockRepository(), chunk_cdata)
def test_keyfile2(self, monkeypatch, keys_dir):
with keys_dir.join('keyfile').open('w') as fd:
@ -275,7 +265,7 @@ class TestTAM:
@pytest.fixture
def key(self, monkeypatch):
monkeypatch.setenv('BORG_PASSPHRASE', 'test')
return KeyfileKey.create(TestKey.MockRepository(), TestKey.MockArgs())
return CHPOKeyfileKey.create(TestKey.MockRepository(), TestKey.MockArgs())
def test_unpack_future(self, key):
blob = b'\xc1\xc1\xc1\xc1foobar'
@ -385,7 +375,7 @@ class TestTAM:
def test_decrypt_key_file_unsupported_algorithm():
"""We will add more algorithms in the future. We should raise a helpful error."""
key = KeyfileKey(None)
key = CHPOKeyfileKey(None)
encrypted = msgpack.packb({
'algorithm': 'THIS ALGORITHM IS NOT SUPPORTED',
'version': 1,
@ -397,7 +387,7 @@ def test_decrypt_key_file_unsupported_algorithm():
def test_decrypt_key_file_v2_is_unsupported():
"""There may eventually be a version 2 of the format. For now we should raise a helpful error."""
key = KeyfileKey(None)
key = CHPOKeyfileKey(None)
encrypted = msgpack.packb({
'version': 2,
})
@ -415,10 +405,10 @@ def test_key_file_roundtrip(monkeypatch, cli_argument, expected_algorithm):
repository = MagicMock(id=b'repository_id')
monkeypatch.setenv('BORG_PASSPHRASE', "hello, pass phrase")
save_me = RepoKey.create(repository, args=MagicMock(key_algorithm=cli_argument))
save_me = AESOCBRepoKey.create(repository, args=MagicMock(key_algorithm=cli_argument))
saved = repository.save_key.call_args.args[0]
repository.load_key.return_value = saved
load_me = RepoKey.detect(repository, manifest_data=None)
load_me = AESOCBRepoKey.detect(repository, manifest_data=None)
assert to_dict(load_me) == to_dict(save_me)
assert msgpack.unpackb(a2b_base64(saved))['algorithm'] == expected_algorithm