add tests for archive TAMs, upgrade

This commit is contained in:
Thomas Waldmann 2023-07-28 15:58:03 +02:00
parent d78ed697ae
commit 5e0632a3d0
No known key found for this signature in database
GPG Key ID: 243ACFA951F78E01
2 changed files with 126 additions and 8 deletions

View File

@ -36,11 +36,11 @@ from ..cache import Cache, LocalCache
from ..chunker import has_seek_hole
from ..constants import * # NOQA
from ..crypto.low_level import bytes_to_long, num_cipher_blocks
from ..crypto.key import KeyfileKeyBase, RepoKey, KeyfileKey, Passphrase, TAMRequiredError
from ..crypto.key import KeyfileKeyBase, RepoKey, KeyfileKey, Passphrase, TAMRequiredError, ArchiveTAMRequiredError
from ..crypto.keymanager import RepoIdMismatch, NotABorgKeyFile
from ..crypto.file_integrity import FileIntegrityError
from ..helpers import Location, get_security_dir
from ..helpers import Manifest, MandatoryFeatureUnsupported
from ..helpers import Manifest, MandatoryFeatureUnsupported, ArchiveInfo
from ..helpers import EXIT_SUCCESS, EXIT_WARNING, EXIT_ERROR
from ..helpers import bin_to_hex
from ..helpers import MAX_S
@ -4128,6 +4128,70 @@ class ManifestAuthenticationTest(ArchiverTestCaseBase):
assert not self.cmd('list', self.repository_location)
class ArchiveAuthenticationTest(ArchiverTestCaseBase):
def write_archive_without_tam(self, repository, archive_name):
manifest, key = Manifest.load(repository, Manifest.NO_OPERATION_CHECK)
archive_data = msgpack.packb({
'version': 1,
'name': archive_name,
'items': [],
'cmdline': '',
'hostname': '',
'username': '',
'time': utcnow().strftime(ISO_FORMAT),
})
archive_id = key.id_hash(archive_data)
repository.put(archive_id, key.encrypt(archive_data))
manifest.archives[archive_name] = (archive_id, datetime.now())
manifest.write()
repository.commit(compact=False)
def test_upgrade_archives_tam(self):
self.cmd('init', '--encryption=repokey', self.repository_location)
self.create_src_archive('archive_tam')
repository = Repository(self.repository_path, exclusive=True)
with repository:
self.write_archive_without_tam(repository, "archive_no_tam")
output = self.cmd('list', '--format="{name} tam:{tam}{NL}"', self.repository_location)
assert 'archive_tam tam:verified' in output # good
assert 'archive_no_tam tam:none' in output # could be borg < 1.0.9 archive or fake
self.cmd('upgrade', '--archives-tam', self.repository_location)
output = self.cmd('list', '--format="{name} tam:{tam}{NL}"', self.repository_location)
assert 'archive_tam tam:verified' in output # still good
assert 'archive_no_tam tam:verified' in output # previously TAM-less archives got a TAM now
def test_check_rebuild_manifest(self):
self.cmd('init', '--encryption=repokey', self.repository_location)
self.create_src_archive('archive_tam')
repository = Repository(self.repository_path, exclusive=True)
with repository:
self.write_archive_without_tam(repository, "archive_no_tam")
repository.delete(Manifest.MANIFEST_ID) # kill manifest, so check has to rebuild it
repository.commit(compact=False)
self.cmd('check', '--repair', self.repository_location)
output = self.cmd('list', '--format="{name} tam:{tam}{NL}"', self.repository_location)
assert 'archive_tam tam:verified' in output # TAM-verified archive is in rebuilt manifest
assert 'archive_no_tam' not in output # check got rid of untrusted not TAM-verified archive
def test_check_rebuild_refcounts(self):
self.cmd('init', '--encryption=repokey', self.repository_location)
self.create_src_archive('archive_tam')
archive_id_pre_check = self.cmd('list', '--format="{name} {id}{NL}"', self.repository_location)
repository = Repository(self.repository_path, exclusive=True)
with repository:
self.write_archive_without_tam(repository, "archive_no_tam")
output = self.cmd('list', '--format="{name} tam:{tam}{NL}"', self.repository_location)
assert 'archive_tam tam:verified' in output # good
assert 'archive_no_tam tam:none' in output # could be borg < 1.0.9 archive or fake
self.cmd('check', '--repair', self.repository_location)
output = self.cmd('list', '--format="{name} tam:{tam}{NL}"', self.repository_location)
assert 'archive_tam tam:verified' in output # TAM-verified archive still there
assert 'archive_no_tam' not in output # check got rid of untrusted not TAM-verified archive
archive_id_post_check = self.cmd('list', '--format="{name} {id}{NL}"', self.repository_location)
assert archive_id_post_check == archive_id_pre_check # rebuild_refcounts didn't change archive_tam archive id
class RemoteArchiverTestCase(ArchiverTestCase):
prefix = '__testsuite__:'

View File

@ -11,6 +11,7 @@ from ..crypto.key import PlaintextKey, PassphraseKey, AuthenticatedKey, RepoKey,
Blake2KeyfileKey, Blake2RepoKey, Blake2AuthenticatedKey
from ..crypto.key import ID_HMAC_SHA_256, ID_BLAKE2b_256
from ..crypto.key import TAMRequiredError, TAMInvalid, TAMUnsupportedSuiteError, UnsupportedManifestError
from ..crypto.key import ArchiveTAMInvalid
from ..crypto.key import identify_key
from ..crypto.low_level import bytes_to_long
from ..crypto.low_level import IntegrityError as IntegrityErrorBase
@ -338,6 +339,8 @@ class TestTAM:
blob = msgpack.packb({})
with pytest.raises(TAMRequiredError):
key.unpack_and_verify_manifest(blob)
with pytest.raises(TAMRequiredError):
key.unpack_and_verify_archive(blob)
def test_missing(self, key):
blob = msgpack.packb({})
@ -345,6 +348,9 @@ class TestTAM:
unpacked, verified = key.unpack_and_verify_manifest(blob)
assert unpacked == {}
assert not verified
unpacked, verified, _ = key.unpack_and_verify_archive(blob)
assert unpacked == {}
assert not verified
def test_unknown_type_when_required(self, key):
blob = msgpack.packb({
@ -354,6 +360,8 @@ class TestTAM:
})
with pytest.raises(TAMUnsupportedSuiteError):
key.unpack_and_verify_manifest(blob)
with pytest.raises(TAMUnsupportedSuiteError):
key.unpack_and_verify_archive(blob)
def test_unknown_type(self, key):
blob = msgpack.packb({
@ -365,6 +373,9 @@ class TestTAM:
unpacked, verified = key.unpack_and_verify_manifest(blob)
assert unpacked == {}
assert not verified
unpacked, verified, _ = key.unpack_and_verify_archive(blob)
assert unpacked == {}
assert not verified
@pytest.mark.parametrize('tam, exc', (
({}, TAMUnsupportedSuiteError),
@ -372,13 +383,26 @@ class TestTAM:
(None, TAMInvalid),
(1234, TAMInvalid),
))
def test_invalid(self, key, tam, exc):
def test_invalid_manifest(self, key, tam, exc):
blob = msgpack.packb({
'tam': tam,
})
with pytest.raises(exc):
key.unpack_and_verify_manifest(blob)
@pytest.mark.parametrize('tam, exc', (
({}, TAMUnsupportedSuiteError),
({'type': b'\xff'}, TAMUnsupportedSuiteError),
(None, ArchiveTAMInvalid),
(1234, ArchiveTAMInvalid),
))
def test_invalid_archive(self, key, tam, exc):
blob = msgpack.packb({
'tam': tam,
})
with pytest.raises(exc):
key.unpack_and_verify_archive(blob)
@pytest.mark.parametrize('hmac, salt', (
({}, bytes(64)),
(bytes(64), {}),
@ -401,10 +425,12 @@ class TestTAM:
blob = msgpack.packb(data)
with pytest.raises(TAMInvalid):
key.unpack_and_verify_manifest(blob)
with pytest.raises(ArchiveTAMInvalid):
key.unpack_and_verify_archive(blob)
def test_round_trip(self, key):
def test_round_trip_manifest(self, key):
data = {'foo': 'bar'}
blob = key.pack_and_authenticate_metadata(data)
blob = key.pack_and_authenticate_metadata(data, context=b"manifest")
assert blob.startswith(b'\x82')
unpacked = msgpack.unpackb(blob)
@ -415,10 +441,23 @@ class TestTAM:
assert unpacked[b'foo'] == b'bar'
assert b'tam' not in unpacked
@pytest.mark.parametrize('which', (b'hmac', b'salt'))
def test_tampered(self, key, which):
def test_round_trip_archive(self, key):
data = {'foo': 'bar'}
blob = key.pack_and_authenticate_metadata(data)
blob = key.pack_and_authenticate_metadata(data, context=b"archive")
assert blob.startswith(b'\x82')
unpacked = msgpack.unpackb(blob)
assert unpacked[b'tam'][b'type'] == b'HKDF_HMAC_SHA512'
unpacked, verified, _ = key.unpack_and_verify_archive(blob)
assert verified
assert unpacked[b'foo'] == b'bar'
assert b'tam' not in unpacked
@pytest.mark.parametrize('which', (b'hmac', b'salt'))
def test_tampered_manifest(self, key, which):
data = {'foo': 'bar'}
blob = key.pack_and_authenticate_metadata(data, context=b"manifest")
assert blob.startswith(b'\x82')
unpacked = msgpack.unpackb(blob, object_hook=StableDict)
@ -429,3 +468,18 @@ class TestTAM:
with pytest.raises(TAMInvalid):
key.unpack_and_verify_manifest(blob)
@pytest.mark.parametrize('which', (b'hmac', b'salt'))
def test_tampered_archive(self, key, which):
data = {'foo': 'bar'}
blob = key.pack_and_authenticate_metadata(data, context=b"archive")
assert blob.startswith(b'\x82')
unpacked = msgpack.unpackb(blob, object_hook=StableDict)
assert len(unpacked[b'tam'][which]) == 64
unpacked[b'tam'][which] = unpacked[b'tam'][which][0:32] + bytes(32)
assert len(unpacked[b'tam'][which]) == 64
blob = msgpack.packb(unpacked)
with pytest.raises(ArchiveTAMInvalid):
key.unpack_and_verify_archive(blob)