always implicitly require manifest TAMs

remove a lot of complexity from the code that was just there to
support legacy borg versions < 1.0.9 which did not TAM authenticate
the manifest.

since then, borg writes TAM authentication to the manifest,
even if the repo is unencrypted.
if the repo is unencrypted, it did not check the somehow pointless
authentication that was generated without any secret, but
if we add that fake TAM, we can also verify the fake TAM.

if somebody explicitly switches off all crypto, they can not
expect authentication.

for everybody else, borg now always generates the TAM and also
verifies it.
This commit is contained in:
Thomas Waldmann 2023-06-12 23:49:53 +02:00
parent 4ded3620c5
commit 21d4407170
No known key found for this signature in database
GPG Key ID: 243ACFA951F78E01
6 changed files with 19 additions and 95 deletions

View File

@ -63,7 +63,7 @@ class KeysMixIn:
print("Change not needed or not supported.")
return EXIT_WARNING
for name in ("repository_id", "crypt_key", "id_key", "chunk_seed", "tam_required", "sessionid", "cipher"):
for name in ("repository_id", "crypt_key", "id_key", "chunk_seed", "sessionid", "cipher"):
value = getattr(key, name)
setattr(key_new, name, value)

View File

@ -3,7 +3,7 @@ import argparse
from ._common import with_repository, with_other_repository, Highlander
from ..cache import Cache
from ..constants import * # NOQA
from ..crypto.key import key_creator, key_argument_names, tam_required_file
from ..crypto.key import key_creator, key_argument_names
from ..helpers import EXIT_WARNING
from ..helpers import location_validator, Location
from ..helpers import parse_storage_quota
@ -35,10 +35,6 @@ class RCreateMixIn:
repository.commit(compact=False)
with Cache(repository, manifest, warn_if_unencrypted=False):
pass
if key.tam_required:
tam_file = tam_required_file(repository)
open(tam_file, "w").close()
if key.NAME != "plaintext":
logger.warning(
"\n"

View File

@ -136,16 +136,6 @@ def key_factory(repository, manifest_chunk, *, ro_cls=RepoObj):
return identify_key(manifest_data).detect(repository, manifest_data)
def tam_required_file(repository):
security_dir = get_security_dir(bin_to_hex(repository.id), legacy=(repository.version == 1))
return os.path.join(security_dir, "tam_required")
def tam_required(repository):
file = tam_required_file(repository)
return os.path.isfile(file)
def uses_same_chunker_secret(other_key, key):
"""is the chunker secret the same?"""
# avoid breaking the deduplication by a different chunker secret
@ -211,7 +201,6 @@ class KeyBase:
self.TYPE_STR = bytes([self.TYPE])
self.repository = repository
self.target = None # key location file path / repo obj
self.tam_required = True
self.copy_crypt_key = False
def id_hash(self, data):
@ -253,39 +242,25 @@ class KeyBase:
tam["hmac"] = hmac.digest(tam_key, packed, "sha512")
return msgpack.packb(metadata_dict)
def unpack_and_verify_manifest(self, data, force_tam_not_required=False):
"""Unpack msgpacked *data* and return (object, did_verify)."""
def unpack_and_verify_manifest(self, data):
"""Unpack msgpacked *data* and return manifest."""
if data.startswith(b"\xc1" * 4):
# This is a manifest from the future, we can't read it.
raise UnsupportedManifestError()
tam_required = self.tam_required
if force_tam_not_required and tam_required:
logger.warning("Manifest authentication DISABLED.")
tam_required = False
data = bytearray(data)
unpacker = get_limited_unpacker("manifest")
unpacker.feed(data)
unpacked = unpacker.unpack()
if AUTHENTICATED_NO_KEY:
return unpacked, True # True is a lie.
return unpacked
if "tam" not in unpacked:
if tam_required:
raise TAMRequiredError(self.repository._location.canonical_path())
else:
logger.debug("Manifest TAM not found and not required")
return unpacked, False
raise TAMRequiredError(self.repository._location.canonical_path())
tam = unpacked.pop("tam", None)
if not isinstance(tam, dict):
raise TAMInvalid()
tam_type = tam.get("type", "<none>")
if tam_type != "HKDF_HMAC_SHA512":
if tam_required:
raise TAMUnsupportedSuiteError(repr(tam_type))
else:
logger.debug(
"Ignoring manifest TAM made with unsupported suite, since TAM is not required: %r", tam_type
)
return unpacked, False
raise TAMUnsupportedSuiteError(repr(tam_type))
tam_hmac = tam.get("hmac")
tam_salt = tam.get("salt")
if not isinstance(tam_salt, (bytes, str)) or not isinstance(tam_hmac, (bytes, str)):
@ -299,7 +274,7 @@ class KeyBase:
if not hmac.compare_digest(calculated_hmac, tam_hmac):
raise TAMInvalid()
logger.debug("TAM-verified manifest")
return unpacked, True
return unpacked
def unpack_and_verify_archive(self, data, force_tam_not_required=False):
"""Unpack msgpacked *data* and return (object, did_verify)."""
@ -357,10 +332,6 @@ class PlaintextKey(KeyBase):
chunk_seed = 0
logically_encrypted = False
def __init__(self, repository):
super().__init__(repository)
self.tam_required = False
@classmethod
def create(cls, repository, args, **kw):
logger.info('Encryption NOT enabled.\nUse the "--encryption=repokey|keyfile" to enable encryption.')
@ -526,7 +497,6 @@ class FlexiKey:
self.crypt_key = key.crypt_key
self.id_key = key.id_key
self.chunk_seed = key.chunk_seed
self.tam_required = key.get("tam_required", tam_required(self.repository))
return True
return False
@ -639,7 +609,6 @@ class FlexiKey:
crypt_key=self.crypt_key,
id_key=self.id_key,
chunk_seed=self.chunk_seed,
tam_required=self.tam_required,
)
data = self.encrypt_key_file(msgpack.packb(key.as_dict()), passphrase, algorithm)
key_data = "\n".join(textwrap.wrap(b2a_base64(data).decode("ascii")))

View File

@ -467,7 +467,7 @@ cdef class Key(PropDict):
crypt_key = PropDictProperty(bytes)
id_key = PropDictProperty(bytes)
chunk_seed = PropDictProperty(int)
tam_required = PropDictProperty(bool)
tam_required = PropDictProperty(bool) # legacy. borg now implicitly always requires TAM.
def update_internal(self, d):
# legacy support for migration (data from old msgpacks comes in as bytes always, but sometimes we want str)
@ -650,7 +650,6 @@ class ItemDiff:
self._can_compare_chunk_ids = can_compare_chunk_ids
self._chunk_1 = chunk_1
self._chunk_2 = chunk_2
self._changes = {}
if self._item1.is_link() or self._item2.is_link():

View File

@ -1,6 +1,4 @@
import enum
import os
import os.path
import re
from collections import abc, namedtuple
from datetime import datetime, timedelta, timezone
@ -229,7 +227,6 @@ class Manifest:
self.repo_objs = ro_cls(key)
self.repository = repository
self.item_keys = frozenset(item_keys) if item_keys is not None else ITEM_KEYS
self.tam_verified = False
self.timestamp = None
@property
@ -241,9 +238,9 @@ class Manifest:
return parse_timestamp(self.timestamp)
@classmethod
def load(cls, repository, operations, key=None, force_tam_not_required=False, *, ro_cls=RepoObj):
def load(cls, repository, operations, key=None, *, ro_cls=RepoObj):
from .item import ManifestItem
from .crypto.key import key_factory, tam_required_file, tam_required
from .crypto.key import key_factory
from .repository import Repository
try:
@ -254,9 +251,7 @@ class Manifest:
key = key_factory(repository, cdata, ro_cls=ro_cls)
manifest = cls(key, repository, ro_cls=ro_cls)
_, data = manifest.repo_objs.parse(cls.MANIFEST_ID, cdata)
manifest_dict, manifest.tam_verified = key.unpack_and_verify_manifest(
data, force_tam_not_required=force_tam_not_required
)
manifest_dict = key.unpack_and_verify_manifest(data)
m = ManifestItem(internal_dict=manifest_dict)
manifest.id = manifest.repo_objs.id_hash(data)
if m.get("version") not in (1, 2):
@ -268,17 +263,6 @@ class Manifest:
manifest.item_keys = ITEM_KEYS
manifest.item_keys |= frozenset(m.config.get("item_keys", [])) # new location of item_keys since borg2
manifest.item_keys |= frozenset(m.get("item_keys", [])) # legacy: borg 1.x: item_keys not in config yet
if manifest.tam_verified:
manifest_required = manifest.config.get("tam_required", False)
security_required = tam_required(repository)
if manifest_required and not security_required:
logger.debug("Manifest is TAM verified and says TAM is required, updating security database...")
file = tam_required_file(repository)
open(file, "w").close()
if not manifest_required and security_required:
logger.debug("Manifest is TAM verified and says TAM is *not* required, updating security database...")
os.unlink(tam_required_file(repository))
manifest.check_repository_compatibility(operations)
return manifest
@ -310,8 +294,6 @@ class Manifest:
def write(self):
from .item import ManifestItem
if self.key.tam_required:
self.config["tam_required"] = True
# self.timestamp needs to be strictly monotonically increasing. Clocks often are not set correctly
if self.timestamp is None:
self.timestamp = datetime.now(tz=timezone.utc).isoformat(timespec="microseconds")
@ -331,7 +313,6 @@ class Manifest:
timestamp=self.timestamp,
config=StableDict(self.config),
)
self.tam_verified = True
data = self.key.pack_and_authenticate_metadata(manifest.as_dict())
self.id = self.repo_objs.id_hash(data)
self.repository.put(self.MANIFEST_ID, self.repo_objs.format(self.MANIFEST_ID, {}, data))

View File

@ -272,39 +272,19 @@ class TestTAM:
with pytest.raises(msgpack.UnpackException):
key.unpack_and_verify_manifest(blob)
def test_missing_when_required(self, key):
blob = msgpack.packb({})
with pytest.raises(TAMRequiredError):
key.unpack_and_verify_manifest(blob)
with pytest.raises(TAMRequiredError):
key.unpack_and_verify_archive(blob)
def test_missing(self, key):
blob = msgpack.packb({})
key.tam_required = False
unpacked, verified = key.unpack_and_verify_manifest(blob)
assert unpacked == {}
assert not verified
unpacked, verified, _ = key.unpack_and_verify_archive(blob)
assert unpacked == {}
assert not verified
def test_unknown_type_when_required(self, key):
blob = msgpack.packb({"tam": {"type": "HMAC_VOLLBIT"}})
with pytest.raises(TAMUnsupportedSuiteError):
with pytest.raises(TAMRequiredError):
key.unpack_and_verify_manifest(blob)
with pytest.raises(TAMUnsupportedSuiteError):
with pytest.raises(TAMRequiredError):
key.unpack_and_verify_archive(blob)
def test_unknown_type(self, key):
blob = msgpack.packb({"tam": {"type": "HMAC_VOLLBIT"}})
key.tam_required = False
unpacked, verified = key.unpack_and_verify_manifest(blob)
assert unpacked == {}
assert not verified
unpacked, verified, _ = key.unpack_and_verify_archive(blob)
assert unpacked == {}
assert not verified
with pytest.raises(TAMUnsupportedSuiteError):
key.unpack_and_verify_manifest(blob)
with pytest.raises(TAMUnsupportedSuiteError):
key.unpack_and_verify_archive(blob)
@pytest.mark.parametrize(
"tam, exc",
@ -360,8 +340,7 @@ class TestTAM:
unpacked = msgpack.unpackb(blob)
assert unpacked["tam"]["type"] == "HKDF_HMAC_SHA512"
unpacked, verified = key.unpack_and_verify_manifest(blob)
assert verified
unpacked = key.unpack_and_verify_manifest(blob)
assert unpacked["foo"] == "bar"
assert "tam" not in unpacked