Key: crypt_key instead of enc_key + enc_hmac_key, fixes #6611

This commit is contained in:
Thomas Waldmann 2022-08-01 18:07:37 +02:00
parent 3794a3a9dc
commit 3ee69bc7ba
5 changed files with 35 additions and 53 deletions

View File

@ -61,16 +61,7 @@ class KeysMixIn:
print("Change not needed or not supported.")
return EXIT_WARNING
for name in (
"repository_id",
"enc_key",
"enc_hmac_key",
"id_key",
"chunk_seed",
"tam_required",
"sessionid",
"cipher",
):
for name in ("repository_id", "crypt_key", "id_key", "chunk_seed", "tam_required", "sessionid", "cipher"):
value = getattr(key, name)
setattr(key_new, name, value)

View File

@ -216,7 +216,7 @@ class KeyBase:
def _tam_key(self, salt, context):
return hkdf_hmac_sha512(
ikm=self.id_key + self.enc_key + self.enc_hmac_key,
ikm=self.id_key + self.crypt_key,
salt=salt,
info=b"borg-metadata-authentication-" + context,
output_length=64,
@ -345,7 +345,9 @@ class ID_BLAKE2b_256:
def init_from_random_data(self):
super().init_from_random_data()
self.enc_hmac_key = random_blake2b_256_key()
enc_key = os.urandom(32)
enc_hmac_key = random_blake2b_256_key()
self.crypt_key = enc_key + enc_hmac_key
self.id_key = random_blake2b_256_key()
@ -396,13 +398,11 @@ class AESKeyBase(KeyBase):
self.assert_id(id, data)
return data
def init_from_given_data(self, *, enc_key, enc_hmac_key, id_key, chunk_seed):
assert len(enc_key) >= 32
assert len(enc_hmac_key) >= 32
assert len(id_key) >= 32
def init_from_given_data(self, *, crypt_key, id_key, chunk_seed):
assert len(crypt_key) in (32 + 32, 32 + 128)
assert len(id_key) in (32, 128)
assert isinstance(chunk_seed, int)
self.enc_key = enc_key
self.enc_hmac_key = enc_hmac_key
self.crypt_key = crypt_key
self.id_key = id_key
self.chunk_seed = chunk_seed
@ -412,12 +412,11 @@ class AESKeyBase(KeyBase):
# Convert to signed int32
if chunk_seed & 0x80000000:
chunk_seed = chunk_seed - 0xFFFFFFFF - 1
self.init_from_given_data(
enc_key=data[0:32], enc_hmac_key=data[32:64], id_key=data[64:96], chunk_seed=chunk_seed
)
self.init_from_given_data(crypt_key=data[0:64], id_key=data[64:96], chunk_seed=chunk_seed)
def init_ciphers(self, manifest_data=None):
self.cipher = self.CIPHERSUITE(mac_key=self.enc_hmac_key, enc_key=self.enc_key, header_len=1, aad_offset=1)
enc_key, enc_hmac_key = self.crypt_key[0:32], self.crypt_key[32:]
self.cipher = self.CIPHERSUITE(mac_key=enc_hmac_key, enc_key=enc_key, header_len=1, aad_offset=1)
if manifest_data is None:
nonce = 0
else:
@ -465,8 +464,7 @@ class FlexiKey:
if key.version != 1:
raise IntegrityError("Invalid key file header")
self.repository_id = key.repository_id
self.enc_key = key.enc_key
self.enc_hmac_key = key.enc_hmac_key
self.crypt_key = key.crypt_key
self.id_key = key.id_key
self.chunk_seed = key.chunk_seed
self.tam_required = key.get("tam_required", tam_required(self.repository))
@ -579,8 +577,7 @@ class FlexiKey:
key = Key(
version=1,
repository_id=self.repository_id,
enc_key=self.enc_key,
enc_hmac_key=self.enc_hmac_key,
crypt_key=self.crypt_key,
id_key=self.id_key,
chunk_seed=self.chunk_seed,
tam_required=self.tam_required,
@ -608,16 +605,11 @@ class FlexiKey:
raise Error("You must keep the same ID hash (HMAC-SHA256 or BLAKE2b) or deduplication will break.")
if other_key.copy_ae_key:
# give the user the option to use the same authenticated encryption (AE) key
enc_key = other_key.enc_key
enc_hmac_key = other_key.enc_hmac_key
crypt_key = other_key.crypt_key
else:
# borg transfer re-encrypts all data anyway, thus we can default to a new, random AE key
data = os.urandom(64)
enc_key = data[0:32]
enc_hmac_key = data[32:64]
key.init_from_given_data(
enc_key=enc_key, enc_hmac_key=enc_hmac_key, id_key=other_key.id_key, chunk_seed=other_key.chunk_seed
)
crypt_key = os.urandom(64)
key.init_from_given_data(crypt_key=crypt_key, id_key=other_key.id_key, chunk_seed=other_key.chunk_seed)
passphrase = other_key._passphrase
else:
key.init_from_random_data()
@ -901,13 +893,11 @@ class AEADKeyBase(KeyBase):
# decrypting only succeeds if we got the ciphertext we wrote **for that chunk id**.
return data
def init_from_given_data(self, *, enc_key, enc_hmac_key, id_key, chunk_seed):
assert len(enc_key) >= 32
assert len(enc_hmac_key) >= 32
assert len(id_key) >= 32
def init_from_given_data(self, *, crypt_key, id_key, chunk_seed):
assert len(crypt_key) in (32 + 32, 32 + 128)
assert len(id_key) in (32, 128)
assert isinstance(chunk_seed, int)
self.enc_key = enc_key
self.enc_hmac_key = enc_hmac_key
self.crypt_key = crypt_key
self.id_key = id_key
self.chunk_seed = chunk_seed
@ -917,14 +907,12 @@ class AEADKeyBase(KeyBase):
# Convert to signed int32
if chunk_seed & 0x80000000:
chunk_seed = chunk_seed - 0xFFFFFFFF - 1
self.init_from_given_data(
enc_key=data[0:32], enc_hmac_key=data[32:64], id_key=data[64:96], chunk_seed=chunk_seed
)
self.init_from_given_data(crypt_key=data[0:64], id_key=data[64:96], chunk_seed=chunk_seed)
def _get_session_key(self, sessionid):
assert len(sessionid) == 24 # 192bit
key = hkdf_hmac_sha512(
ikm=self.enc_key + self.enc_hmac_key,
ikm=self.crypt_key,
salt=sessionid,
info=b"borg-session-key-" + self.CIPHERSUITE.__name__.encode(),
output_length=32,

View File

@ -439,14 +439,13 @@ class Key(PropDict):
If a Key shall be serialized, give as_dict() method output to msgpack packer.
"""
VALID_KEYS = {'version', 'repository_id', 'enc_key', 'enc_hmac_key', 'id_key', 'chunk_seed', 'tam_required'} # str-typed keys
VALID_KEYS = {'version', 'repository_id', 'crypt_key', 'id_key', 'chunk_seed', 'tam_required'} # str-typed keys
__slots__ = ("_dict", ) # avoid setting attributes not supported by properties
version = PropDict._make_property('version', int)
repository_id = PropDict._make_property('repository_id', bytes)
enc_key = PropDict._make_property('enc_key', bytes)
enc_hmac_key = PropDict._make_property('enc_hmac_key', bytes)
crypt_key = PropDict._make_property('crypt_key', bytes)
id_key = PropDict._make_property('id_key', bytes)
chunk_seed = PropDict._make_property('chunk_seed', int)
tam_required = PropDict._make_property('tam_required', bool)
@ -457,10 +456,14 @@ class Key(PropDict):
k = fix_key(d, k)
if k == 'version':
assert isinstance(v, int)
if k in ('repository_id', 'enc_key', 'enc_hmac_key', 'id_key'):
if k in ('repository_id', 'crypt_key', 'id_key'):
v = fix_bytes_value(d, k)
self._dict[k] = v
if 'crypt_key' not in self._dict: # legacy, we're loading an old key
k = fix_bytes_value(d, 'enc_key') + fix_bytes_value(d, 'enc_hmac_key')
assert isinstance(k, bytes), "k == %r" % k
assert len(k) in (32 + 32, 32 + 128) # 256+256 or 256+1024 bits
self._dict['crypt_key'] = k
class ArchiveItem(PropDict):
"""

View File

@ -3347,7 +3347,7 @@ class ArchiverTestCase(ArchiverTestCaseBase):
backup_key = AESOCBKeyfileKey(key.TestKey.MockRepository())
backup_key.load(export_file, Passphrase.env_passphrase())
assert repo_key.enc_key == backup_key.enc_key
assert repo_key.crypt_key == backup_key.crypt_key
with Repository(self.repository_path) as repository:
repository.save_key(b"")
@ -3358,7 +3358,7 @@ class ArchiverTestCase(ArchiverTestCaseBase):
repo_key2 = AESOCBRepoKey(repository)
repo_key2.load(None, Passphrase.env_passphrase())
assert repo_key2.enc_key == repo_key2.enc_key
assert repo_key2.crypt_key == repo_key2.crypt_key
def test_key_export_qr(self):
export_file = self.output_path + "/exported.html"

View File

@ -157,7 +157,7 @@ class TestKey:
key2 = KeyfileKey.detect(self.MockRepository(), manifest)
assert key2.cipher.next_iv() >= iv + key2.cipher.block_count(len(manifest) - KeyfileKey.PAYLOAD_OVERHEAD)
# Key data sanity check
assert len({key2.id_key, key2.enc_key, key2.enc_hmac_key}) == 3
assert len({key2.id_key, key2.crypt_key}) == 2
assert key2.chunk_seed != 0
chunk = b"foo"
id = key.id_hash(chunk)
@ -414,7 +414,7 @@ def test_decrypt_key_file_v2_is_unsupported():
def test_key_file_roundtrip(monkeypatch):
def to_dict(key):
extract = "repository_id", "enc_key", "enc_hmac_key", "id_key", "chunk_seed"
extract = "repository_id", "crypt_key", "id_key", "chunk_seed"
return {a: getattr(key, a) for a in extract}
repository = MagicMock(id=b"repository_id")