add tests for archive TAMs

This commit is contained in:
Thomas Waldmann 2023-07-28 15:58:03 +02:00
parent 5cd2060345
commit 56da398711
No known key found for this signature in database
GPG Key ID: 243ACFA951F78E01
2 changed files with 120 additions and 15 deletions

View File

@ -8,7 +8,7 @@ import pytest
from ...cache import Cache, LocalCache from ...cache import Cache, LocalCache
from ...constants import * # NOQA from ...constants import * # NOQA
from ...crypto.key import TAMRequiredError from ...crypto.key import TAMRequiredError
from ...helpers import Location, get_security_dir, bin_to_hex from ...helpers import Location, get_security_dir, bin_to_hex, archive_ts_now
from ...helpers import EXIT_ERROR from ...helpers import EXIT_ERROR
from ...helpers import msgpack from ...helpers import msgpack
from ...manifest import Manifest, MandatoryFeatureUnsupported from ...manifest import Manifest, MandatoryFeatureUnsupported
@ -322,7 +322,7 @@ def test_check_cache(archivers, request):
check_cache(archiver) check_cache(archiver)
# Begin manifest tests # Begin manifest TAM tests
def spoof_manifest(repository): def spoof_manifest(repository):
with repository: with repository:
manifest = Manifest.load(repository, Manifest.NO_OPERATION_CHECK) manifest = Manifest.load(repository, Manifest.NO_OPERATION_CHECK)
@ -380,6 +380,62 @@ def test_not_required(archiver):
cmd(archiver, "rlist") cmd(archiver, "rlist")
# Begin archive TAM tests
def write_archive_without_tam(repository, archive_name):
manifest = Manifest.load(repository, Manifest.NO_OPERATION_CHECK)
archive_data = msgpack.packb(
{
"version": 2,
"name": archive_name,
"item_ptrs": [],
"command_line": "",
"hostname": "",
"username": "",
"time": archive_ts_now().isoformat(timespec="microseconds"),
"size": 0,
"nfiles": 0,
}
)
archive_id = manifest.repo_objs.id_hash(archive_data)
cdata = manifest.repo_objs.format(archive_id, {}, archive_data)
repository.put(archive_id, cdata)
manifest.archives[archive_name] = (archive_id, datetime.now())
manifest.write()
repository.commit(compact=False)
def test_check_rebuild_manifest(archiver):
cmd(archiver, "rcreate", RK_ENCRYPTION)
create_src_archive(archiver, "archive_tam")
repository = Repository(archiver.repository_path, exclusive=True)
with repository:
write_archive_without_tam(repository, "archive_no_tam")
repository.delete(Manifest.MANIFEST_ID) # kill manifest, so check has to rebuild it
repository.commit(compact=False)
cmd(archiver, "check", "--repair")
output = cmd(archiver, "rlist", "--format='{name} tam:{tam}{NL}'")
assert "archive_tam tam:verified" in output # TAM-verified archive is in rebuilt manifest
assert "archive_no_tam" not in output # check got rid of untrusted not TAM-verified archive
def test_check_rebuild_refcounts(archiver):
cmd(archiver, "rcreate", RK_ENCRYPTION)
create_src_archive(archiver, "archive_tam")
archive_id_pre_check = cmd(archiver, "rlist", "--format='{name} {id}{NL}'")
repository = Repository(archiver.repository_path, exclusive=True)
with repository:
write_archive_without_tam(repository, "archive_no_tam")
output = cmd(archiver, "rlist", "--format='{name} tam:{tam}{NL}'")
assert "archive_tam tam:verified" in output # good
assert "archive_no_tam tam:none" in output # could be borg < 1.0.9 archive or fake
cmd(archiver, "check", "--repair")
output = cmd(archiver, "rlist", "--format='{name} tam:{tam}{NL}'")
assert "archive_tam tam:verified" in output # TAM-verified archive still there
assert "archive_no_tam" not in output # check got rid of untrusted not TAM-verified archive
archive_id_post_check = cmd(archiver, "rlist", "--format='{name} {id}{NL}'")
assert archive_id_post_check == archive_id_pre_check # rebuild_refcounts didn't change archive_tam archive id
# Begin Remote Tests # Begin Remote Tests
def test_remote_repo_restrict_to_path(remote_archiver): def test_remote_repo_restrict_to_path(remote_archiver):
original_location, repo_path = remote_archiver.repository_location, remote_archiver.repository_path original_location, repo_path = remote_archiver.repository_location, remote_archiver.repository_path

View File

@ -11,13 +11,8 @@ from ..crypto.key import AEADKeyBase
from ..crypto.key import AESOCBRepoKey, AESOCBKeyfileKey, CHPORepoKey, CHPOKeyfileKey from ..crypto.key import AESOCBRepoKey, AESOCBKeyfileKey, CHPORepoKey, CHPOKeyfileKey
from ..crypto.key import Blake2AESOCBRepoKey, Blake2AESOCBKeyfileKey, Blake2CHPORepoKey, Blake2CHPOKeyfileKey from ..crypto.key import Blake2AESOCBRepoKey, Blake2AESOCBKeyfileKey, Blake2CHPORepoKey, Blake2CHPOKeyfileKey
from ..crypto.key import ID_HMAC_SHA_256, ID_BLAKE2b_256 from ..crypto.key import ID_HMAC_SHA_256, ID_BLAKE2b_256
from ..crypto.key import ( from ..crypto.key import TAMRequiredError, TAMInvalid, TAMUnsupportedSuiteError, ArchiveTAMInvalid
TAMRequiredError, from ..crypto.key import UnsupportedManifestError, UnsupportedKeyFormatError
TAMInvalid,
TAMUnsupportedSuiteError,
UnsupportedManifestError,
UnsupportedKeyFormatError,
)
from ..crypto.key import identify_key from ..crypto.key import identify_key
from ..crypto.low_level import IntegrityError as IntegrityErrorBase from ..crypto.low_level import IntegrityError as IntegrityErrorBase
from ..helpers import IntegrityError from ..helpers import IntegrityError
@ -281,6 +276,8 @@ class TestTAM:
blob = msgpack.packb({}) blob = msgpack.packb({})
with pytest.raises(TAMRequiredError): with pytest.raises(TAMRequiredError):
key.unpack_and_verify_manifest(blob) key.unpack_and_verify_manifest(blob)
with pytest.raises(TAMRequiredError):
key.unpack_and_verify_archive(blob)
def test_missing(self, key): def test_missing(self, key):
blob = msgpack.packb({}) blob = msgpack.packb({})
@ -288,11 +285,16 @@ class TestTAM:
unpacked, verified = key.unpack_and_verify_manifest(blob) unpacked, verified = key.unpack_and_verify_manifest(blob)
assert unpacked == {} assert unpacked == {}
assert not verified assert not verified
unpacked, verified, _ = key.unpack_and_verify_archive(blob)
assert unpacked == {}
assert not verified
def test_unknown_type_when_required(self, key): def test_unknown_type_when_required(self, key):
blob = msgpack.packb({"tam": {"type": "HMAC_VOLLBIT"}}) blob = msgpack.packb({"tam": {"type": "HMAC_VOLLBIT"}})
with pytest.raises(TAMUnsupportedSuiteError): with pytest.raises(TAMUnsupportedSuiteError):
key.unpack_and_verify_manifest(blob) key.unpack_and_verify_manifest(blob)
with pytest.raises(TAMUnsupportedSuiteError):
key.unpack_and_verify_archive(blob)
def test_unknown_type(self, key): def test_unknown_type(self, key):
blob = msgpack.packb({"tam": {"type": "HMAC_VOLLBIT"}}) blob = msgpack.packb({"tam": {"type": "HMAC_VOLLBIT"}})
@ -300,6 +302,9 @@ class TestTAM:
unpacked, verified = key.unpack_and_verify_manifest(blob) unpacked, verified = key.unpack_and_verify_manifest(blob)
assert unpacked == {} assert unpacked == {}
assert not verified assert not verified
unpacked, verified, _ = key.unpack_and_verify_archive(blob)
assert unpacked == {}
assert not verified
@pytest.mark.parametrize( @pytest.mark.parametrize(
"tam, exc", "tam, exc",
@ -310,11 +315,25 @@ class TestTAM:
(1234, TAMInvalid), (1234, TAMInvalid),
), ),
) )
def test_invalid(self, key, tam, exc): def test_invalid_manifest(self, key, tam, exc):
blob = msgpack.packb({"tam": tam}) blob = msgpack.packb({"tam": tam})
with pytest.raises(exc): with pytest.raises(exc):
key.unpack_and_verify_manifest(blob) key.unpack_and_verify_manifest(blob)
@pytest.mark.parametrize(
"tam, exc",
(
({}, TAMUnsupportedSuiteError),
({"type": b"\xff"}, TAMUnsupportedSuiteError),
(None, ArchiveTAMInvalid),
(1234, ArchiveTAMInvalid),
),
)
def test_invalid_archive(self, key, tam, exc):
blob = msgpack.packb({"tam": tam})
with pytest.raises(exc):
key.unpack_and_verify_archive(blob)
@pytest.mark.parametrize( @pytest.mark.parametrize(
"hmac, salt", "hmac, salt",
(({}, bytes(64)), (bytes(64), {}), (None, bytes(64)), (bytes(64), None)), (({}, bytes(64)), (bytes(64), {}), (None, bytes(64)), (bytes(64), None)),
@ -330,10 +349,12 @@ class TestTAM:
blob = msgpack.packb(data) blob = msgpack.packb(data)
with pytest.raises(TAMInvalid): with pytest.raises(TAMInvalid):
key.unpack_and_verify_manifest(blob) key.unpack_and_verify_manifest(blob)
with pytest.raises(ArchiveTAMInvalid):
key.unpack_and_verify_archive(blob)
def test_round_trip(self, key): def test_round_trip_manifest(self, key):
data = {"foo": "bar"} data = {"foo": "bar"}
blob = key.pack_and_authenticate_metadata(data) blob = key.pack_and_authenticate_metadata(data, context=b"manifest")
assert blob.startswith(b"\x82") assert blob.startswith(b"\x82")
unpacked = msgpack.unpackb(blob) unpacked = msgpack.unpackb(blob)
@ -344,10 +365,23 @@ class TestTAM:
assert unpacked["foo"] == "bar" assert unpacked["foo"] == "bar"
assert "tam" not in unpacked assert "tam" not in unpacked
@pytest.mark.parametrize("which", ("hmac", "salt")) def test_round_trip_archive(self, key):
def test_tampered(self, key, which):
data = {"foo": "bar"} data = {"foo": "bar"}
blob = key.pack_and_authenticate_metadata(data) blob = key.pack_and_authenticate_metadata(data, context=b"archive")
assert blob.startswith(b"\x82")
unpacked = msgpack.unpackb(blob)
assert unpacked["tam"]["type"] == "HKDF_HMAC_SHA512"
unpacked, verified, _ = key.unpack_and_verify_archive(blob)
assert verified
assert unpacked["foo"] == "bar"
assert "tam" not in unpacked
@pytest.mark.parametrize("which", ("hmac", "salt"))
def test_tampered_manifest(self, key, which):
data = {"foo": "bar"}
blob = key.pack_and_authenticate_metadata(data, context=b"manifest")
assert blob.startswith(b"\x82") assert blob.startswith(b"\x82")
unpacked = msgpack.unpackb(blob, object_hook=StableDict) unpacked = msgpack.unpackb(blob, object_hook=StableDict)
@ -359,6 +393,21 @@ class TestTAM:
with pytest.raises(TAMInvalid): with pytest.raises(TAMInvalid):
key.unpack_and_verify_manifest(blob) key.unpack_and_verify_manifest(blob)
@pytest.mark.parametrize("which", ("hmac", "salt"))
def test_tampered_archive(self, key, which):
data = {"foo": "bar"}
blob = key.pack_and_authenticate_metadata(data, context=b"archive")
assert blob.startswith(b"\x82")
unpacked = msgpack.unpackb(blob, object_hook=StableDict)
assert len(unpacked["tam"][which]) == 64
unpacked["tam"][which] = unpacked["tam"][which][0:32] + bytes(32)
assert len(unpacked["tam"][which]) == 64
blob = msgpack.packb(unpacked)
with pytest.raises(ArchiveTAMInvalid):
key.unpack_and_verify_archive(blob)
def test_decrypt_key_file_unsupported_algorithm(): def test_decrypt_key_file_unsupported_algorithm():
"""We will add more algorithms in the future. We should raise a helpful error.""" """We will add more algorithms in the future. We should raise a helpful error."""