From 0b3b78e139796286e0180ae3e48883cb31f8bbbb Mon Sep 17 00:00:00 2001 From: Thomas Waldmann Date: Fri, 11 Mar 2022 22:01:20 +0100 Subject: [PATCH] dedup code: assert_type --- src/borg/crypto/key.py | 24 +++++++++++------------- 1 file changed, 11 insertions(+), 13 deletions(-) diff --git a/src/borg/crypto/key.py b/src/borg/crypto/key.py index 938c9d106..374799571 100644 --- a/src/borg/crypto/key.py +++ b/src/borg/crypto/key.py @@ -169,6 +169,11 @@ def assert_id(self, id, data): if not hmac.compare_digest(id_computed, id): raise IntegrityError('Chunk %s: id verification failed' % bin_to_hex(id)) + def assert_type(self, type_byte, id=None): + if type_byte not in self.TYPES_ACCEPTABLE: + id_str = bin_to_hex(id) if id is not None else '(unknown)' + raise IntegrityError(f'Chunk {id_str}: Invalid encryption envelope') + def _tam_key(self, salt, context): return hkdf_hmac_sha512( ikm=self.id_key + self.enc_key + self.enc_hmac_key, @@ -263,9 +268,7 @@ def encrypt(self, chunk): return b''.join([self.TYPE_STR, data]) def decrypt(self, id, data, decompress=True): - if data[0] not in self.TYPES_ACCEPTABLE: - id_str = bin_to_hex(id) if id is not None else '(unknown)' - raise IntegrityError('Chunk %s: Invalid encryption envelope' % id_str) + self.assert_type(data[0], id) payload = memoryview(data)[1:] if not decompress: return payload @@ -343,9 +346,7 @@ def encrypt(self, chunk): return self.cipher.encrypt(data, header=self.TYPE_STR, iv=next_iv) def decrypt(self, id, data, decompress=True): - if data[0] not in self.TYPES_ACCEPTABLE: - id_str = bin_to_hex(id) if id is not None else '(unknown)' - raise IntegrityError('Chunk %s: Invalid encryption envelope' % id_str) + self.assert_type(data[0], id) try: payload = self.cipher.decrypt(data) except IntegrityError as e: @@ -371,8 +372,7 @@ def init_ciphers(self, manifest_data=None): if manifest_data is None: nonce = 0 else: - if manifest_data[0] not in self.TYPES_ACCEPTABLE: - raise IntegrityError('Manifest: Invalid encryption envelope') + self.assert_type(manifest_data[0]) # manifest_blocks is a safe upper bound on the amount of cipher blocks needed # to encrypt the manifest. depending on the ciphersuite and overhead, it might # be a bit too high, but that does not matter. @@ -673,17 +673,15 @@ def save(self, target, passphrase, create=False): self.logically_encrypted = False def init_ciphers(self, manifest_data=None): - if manifest_data is not None and manifest_data[0] not in self.TYPES_ACCEPTABLE: - raise IntegrityError('Manifest: Invalid encryption envelope') + if manifest_data is not None: + self.assert_type(manifest_data[0]) def encrypt(self, chunk): data = self.compressor.compress(chunk) return b''.join([self.TYPE_STR, data]) def decrypt(self, id, data, decompress=True): - if data[0] not in self.TYPES_ACCEPTABLE: - id_str = bin_to_hex(id) if id is not None else '(unknown)' - raise IntegrityError('Chunk %s: Invalid envelope' % id_str) + self.assert_type(data[0], id) payload = memoryview(data)[1:] if not decompress: return payload