remove manifest TAMs

This commit is contained in:
Thomas Waldmann 2023-09-16 02:02:44 +02:00
parent 6a68ad5cd6
commit 1cf62d8fc7
No known key found for this signature in database
GPG Key ID: 243ACFA951F78E01
4 changed files with 15 additions and 192 deletions

View File

@ -21,7 +21,7 @@ from ..helpers import bin_to_hex
from ..helpers.passphrase import Passphrase, PasswordRetriesExceeded, PassphraseWrong
from ..helpers import msgpack
from ..helpers import workarounds
from ..item import Key, EncryptedKey, want_bytes
from ..item import Key, EncryptedKey
from ..manifest import Manifest
from ..platform import SaveFile
from ..repoobj import RepoObj
@ -63,30 +63,6 @@ class UnsupportedKeyFormatError(Error):
"""Your borg key is stored in an unsupported format. Try using a newer version of borg."""
class TAMRequiredError(IntegrityError):
__doc__ = textwrap.dedent(
"""
Manifest is unauthenticated, but it is required for this repository. Is somebody attacking you?
"""
).strip()
traceback = False
class TAMInvalid(IntegrityError):
__doc__ = IntegrityError.__doc__
traceback = False
def __init__(self):
# Error message becomes: "Data integrity error: Manifest authentication did not verify"
super().__init__("Manifest authentication did not verify")
class TAMUnsupportedSuiteError(IntegrityError):
"""Could not verify manifest: Unsupported suite {!r}; a newer version is needed."""
traceback = False
def key_creator(repository, args, *, other_key=None):
for key in AVAILABLE_KEY_TYPES:
if key.ARG_NAME == args.encryption:
@ -214,21 +190,15 @@ class KeyBase:
output_length=64,
)
def pack_and_authenticate_metadata(self, metadata_dict, context=b"manifest", salt=None):
if salt is None:
salt = os.urandom(64)
metadata_dict = StableDict(metadata_dict)
tam = metadata_dict["tam"] = StableDict({"type": "HKDF_HMAC_SHA512", "hmac": bytes(64), "salt": salt})
packed = msgpack.packb(metadata_dict)
tam_key = self._tam_key(salt, context)
tam["hmac"] = hmac.digest(tam_key, packed, "sha512")
return msgpack.packb(metadata_dict)
def pack_metadata(self, metadata_dict):
metadata_dict = StableDict(metadata_dict)
return msgpack.packb(metadata_dict)
def unpack_and_verify_manifest(self, data):
def pack_and_authenticate_metadata(self, metadata_dict, context): # TODO: remove
metadata_dict = StableDict(metadata_dict)
return msgpack.packb(metadata_dict)
def unpack_manifest(self, data):
"""Unpack msgpacked *data* and return manifest."""
if data.startswith(b"\xc1" * 4):
# This is a manifest from the future, we can't read it.
@ -237,29 +207,7 @@ class KeyBase:
unpacker = get_limited_unpacker("manifest")
unpacker.feed(data)
unpacked = unpacker.unpack()
if AUTHENTICATED_NO_KEY:
return unpacked
if "tam" not in unpacked:
raise TAMRequiredError(self.repository._location.canonical_path())
tam = unpacked.pop("tam", None)
if not isinstance(tam, dict):
raise TAMInvalid()
tam_type = tam.get("type", "<none>")
if tam_type != "HKDF_HMAC_SHA512":
raise TAMUnsupportedSuiteError(repr(tam_type))
tam_hmac = tam.get("hmac")
tam_salt = tam.get("salt")
if not isinstance(tam_salt, (bytes, str)) or not isinstance(tam_hmac, (bytes, str)):
raise TAMInvalid()
tam_hmac = want_bytes(tam_hmac) # legacy
tam_salt = want_bytes(tam_salt) # legacy
offset = data.index(tam_hmac)
data[offset : offset + 64] = bytes(64)
tam_key = self._tam_key(tam_salt, context=b"manifest")
calculated_hmac = hmac.digest(tam_key, data, "sha512")
if not hmac.compare_digest(calculated_hmac, tam_hmac):
raise TAMInvalid()
logger.debug("TAM-verified manifest")
unpacked.pop("tam", None) # legacy
return unpacked
def unpack_archive(self, data):

View File

@ -251,7 +251,7 @@ class Manifest:
key = key_factory(repository, cdata, ro_cls=ro_cls)
manifest = cls(key, repository, ro_cls=ro_cls)
_, data = manifest.repo_objs.parse(cls.MANIFEST_ID, cdata, ro_type=ROBJ_MANIFEST)
manifest_dict = key.unpack_and_verify_manifest(data)
manifest_dict = key.unpack_manifest(data)
m = ManifestItem(internal_dict=manifest_dict)
manifest.id = manifest.repo_objs.id_hash(data)
if m.get("version") not in (1, 2):
@ -313,6 +313,6 @@ class Manifest:
timestamp=self.timestamp,
config=StableDict(self.config),
)
data = self.key.pack_and_authenticate_metadata(manifest.as_dict())
data = self.key.pack_metadata(manifest.as_dict())
self.id = self.repo_objs.id_hash(data)
self.repository.put(self.MANIFEST_ID, self.repo_objs.format(self.MANIFEST_ID, {}, data, ro_type=ROBJ_MANIFEST))

View File

@ -1,22 +1,19 @@
import os
import shutil
from datetime import datetime, timezone, timedelta
from unittest.mock import patch
import pytest
from ...cache import Cache, LocalCache
from ...constants import * # NOQA
from ...crypto.key import TAMRequiredError
from ...helpers import Location, get_security_dir, bin_to_hex
from ...helpers import EXIT_ERROR
from ...helpers import msgpack
from ...manifest import Manifest, MandatoryFeatureUnsupported
from ...remote import RemoteRepository, PathNotAllowed
from ...repository import Repository
from .. import llfuse
from .. import changedir
from . import cmd, _extract_repository_id, open_repository, check_cache, create_test_files, create_src_archive
from . import cmd, _extract_repository_id, open_repository, check_cache, create_test_files
from . import _set_repository_id, create_regular_file, assert_creates_file, generate_archiver_tests, RK_ENCRYPTION
pytest_generate_tests = lambda metafunc: generate_archiver_tests(metafunc, kinds="local,remote") # NOQA
@ -322,66 +319,6 @@ def test_check_cache(archivers, request):
check_cache(archiver)
# Begin manifest TAM tests
def spoof_manifest(repository):
with repository:
manifest = Manifest.load(repository, Manifest.NO_OPERATION_CHECK)
cdata = manifest.repo_objs.format(
Manifest.MANIFEST_ID,
{},
msgpack.packb(
{
"version": 1,
"archives": {},
"config": {},
"timestamp": (datetime.now(tz=timezone.utc) + timedelta(days=1)).isoformat(timespec="microseconds"),
}
),
ro_type=ROBJ_MANIFEST,
)
repository.put(Manifest.MANIFEST_ID, cdata)
repository.commit(compact=False)
def test_fresh_init_tam_required(archiver):
cmd(archiver, "rcreate", RK_ENCRYPTION)
repository = Repository(archiver.repository_path, exclusive=True)
with repository:
manifest = Manifest.load(repository, Manifest.NO_OPERATION_CHECK)
cdata = manifest.repo_objs.format(
Manifest.MANIFEST_ID,
{},
msgpack.packb(
{
"version": 1,
"archives": {},
"timestamp": (datetime.now(tz=timezone.utc) + timedelta(days=1)).isoformat(timespec="microseconds"),
}
),
ro_type=ROBJ_MANIFEST,
)
repository.put(Manifest.MANIFEST_ID, cdata)
repository.commit(compact=False)
with pytest.raises(TAMRequiredError):
cmd(archiver, "rlist")
def test_not_required(archiver):
cmd(archiver, "rcreate", RK_ENCRYPTION)
create_src_archive(archiver, "archive1234")
repository = Repository(archiver.repository_path, exclusive=True)
# Manifest must be authenticated now
output = cmd(archiver, "rlist", "--debug")
assert "archive1234" in output
assert "TAM-verified manifest" in output
# Try to spoof / modify pre-1.0.9
spoof_manifest(repository)
# Fails
with pytest.raises(TAMRequiredError):
cmd(archiver, "rlist")
# Begin Remote Tests
def test_remote_repo_restrict_to_path(remote_archiver):
original_location, repo_path = remote_archiver.repository_location, remote_archiver.repository_path

View File

@ -11,13 +11,11 @@ from ..crypto.key import AEADKeyBase
from ..crypto.key import AESOCBRepoKey, AESOCBKeyfileKey, CHPORepoKey, CHPOKeyfileKey
from ..crypto.key import Blake2AESOCBRepoKey, Blake2AESOCBKeyfileKey, Blake2CHPORepoKey, Blake2CHPOKeyfileKey
from ..crypto.key import ID_HMAC_SHA_256, ID_BLAKE2b_256
from ..crypto.key import TAMRequiredError, TAMInvalid, TAMUnsupportedSuiteError
from ..crypto.key import UnsupportedManifestError, UnsupportedKeyFormatError
from ..crypto.key import identify_key
from ..crypto.low_level import IntegrityError as IntegrityErrorBase
from ..helpers import IntegrityError
from ..helpers import Location
from ..helpers import StableDict
from ..helpers import msgpack
from ..constants import KEY_ALGORITHMS
@ -266,63 +264,18 @@ class TestTAM:
def test_unpack_future(self, key):
blob = b"\xc1\xc1\xc1\xc1foobar"
with pytest.raises(UnsupportedManifestError):
key.unpack_and_verify_manifest(blob)
key.unpack_manifest(blob)
blob = b"\xc1\xc1\xc1"
with pytest.raises(msgpack.UnpackException):
key.unpack_and_verify_manifest(blob)
def test_missing(self, key):
blob = msgpack.packb({})
with pytest.raises(TAMRequiredError):
key.unpack_and_verify_manifest(blob)
def test_unknown_type(self, key):
blob = msgpack.packb({"tam": {"type": "HMAC_VOLLBIT"}})
with pytest.raises(TAMUnsupportedSuiteError):
key.unpack_and_verify_manifest(blob)
@pytest.mark.parametrize(
"tam, exc",
(
({}, TAMUnsupportedSuiteError),
({"type": b"\xff"}, TAMUnsupportedSuiteError),
(None, TAMInvalid),
(1234, TAMInvalid),
),
)
def test_invalid_manifest(self, key, tam, exc):
blob = msgpack.packb({"tam": tam})
with pytest.raises(exc):
key.unpack_and_verify_manifest(blob)
@pytest.mark.parametrize(
"hmac, salt",
(({}, bytes(64)), (bytes(64), {}), (None, bytes(64)), (bytes(64), None)),
ids=["ed-b64", "b64-ed", "n-b64", "b64-n"],
)
def test_wrong_types(self, key, hmac, salt):
data = {"tam": {"type": "HKDF_HMAC_SHA512", "hmac": hmac, "salt": salt}}
tam = data["tam"]
if hmac is None:
del tam["hmac"]
if salt is None:
del tam["salt"]
blob = msgpack.packb(data)
with pytest.raises(TAMInvalid):
key.unpack_and_verify_manifest(blob)
key.unpack_manifest(blob)
def test_round_trip_manifest(self, key):
data = {"foo": "bar"}
blob = key.pack_and_authenticate_metadata(data, context=b"manifest")
assert blob.startswith(b"\x82")
unpacked = msgpack.unpackb(blob)
assert unpacked["tam"]["type"] == "HKDF_HMAC_SHA512"
unpacked = key.unpack_and_verify_manifest(blob)
blob = key.pack_metadata(data)
unpacked = key.unpack_manifest(blob)
assert unpacked["foo"] == "bar"
assert "tam" not in unpacked
assert "tam" not in unpacked # legacy
def test_round_trip_archive(self, key):
data = {"foo": "bar"}
@ -331,21 +284,6 @@ class TestTAM:
assert unpacked["foo"] == "bar"
assert "tam" not in unpacked # legacy
@pytest.mark.parametrize("which", ("hmac", "salt"))
def test_tampered_manifest(self, key, which):
data = {"foo": "bar"}
blob = key.pack_and_authenticate_metadata(data, context=b"manifest")
assert blob.startswith(b"\x82")
unpacked = msgpack.unpackb(blob, object_hook=StableDict)
assert len(unpacked["tam"][which]) == 64
unpacked["tam"][which] = unpacked["tam"][which][0:32] + bytes(32)
assert len(unpacked["tam"][which]) == 64
blob = msgpack.packb(unpacked)
with pytest.raises(TAMInvalid):
key.unpack_and_verify_manifest(blob)
def test_decrypt_key_file_unsupported_algorithm():
"""We will add more algorithms in the future. We should raise a helpful error."""