dedup code: assert_type

This commit is contained in:
Thomas Waldmann 2022-03-11 22:01:20 +01:00
parent b3e7e90c29
commit 0b3b78e139
1 changed files with 11 additions and 13 deletions

View File

@ -169,6 +169,11 @@ class KeyBase:
if not hmac.compare_digest(id_computed, id):
raise IntegrityError('Chunk %s: id verification failed' % bin_to_hex(id))
def assert_type(self, type_byte, id=None):
if type_byte not in self.TYPES_ACCEPTABLE:
id_str = bin_to_hex(id) if id is not None else '(unknown)'
raise IntegrityError(f'Chunk {id_str}: Invalid encryption envelope')
def _tam_key(self, salt, context):
return hkdf_hmac_sha512(
ikm=self.id_key + self.enc_key + self.enc_hmac_key,
@ -263,9 +268,7 @@ class PlaintextKey(KeyBase):
return b''.join([self.TYPE_STR, data])
def decrypt(self, id, data, decompress=True):
if data[0] not in self.TYPES_ACCEPTABLE:
id_str = bin_to_hex(id) if id is not None else '(unknown)'
raise IntegrityError('Chunk %s: Invalid encryption envelope' % id_str)
self.assert_type(data[0], id)
payload = memoryview(data)[1:]
if not decompress:
return payload
@ -343,9 +346,7 @@ class AESKeyBase(KeyBase):
return self.cipher.encrypt(data, header=self.TYPE_STR, iv=next_iv)
def decrypt(self, id, data, decompress=True):
if data[0] not in self.TYPES_ACCEPTABLE:
id_str = bin_to_hex(id) if id is not None else '(unknown)'
raise IntegrityError('Chunk %s: Invalid encryption envelope' % id_str)
self.assert_type(data[0], id)
try:
payload = self.cipher.decrypt(data)
except IntegrityError as e:
@ -371,8 +372,7 @@ class AESKeyBase(KeyBase):
if manifest_data is None:
nonce = 0
else:
if manifest_data[0] not in self.TYPES_ACCEPTABLE:
raise IntegrityError('Manifest: Invalid encryption envelope')
self.assert_type(manifest_data[0])
# manifest_blocks is a safe upper bound on the amount of cipher blocks needed
# to encrypt the manifest. depending on the ciphersuite and overhead, it might
# be a bit too high, but that does not matter.
@ -673,17 +673,15 @@ class AuthenticatedKeyBase(FlexiKey):
self.logically_encrypted = False
def init_ciphers(self, manifest_data=None):
if manifest_data is not None and manifest_data[0] not in self.TYPES_ACCEPTABLE:
raise IntegrityError('Manifest: Invalid encryption envelope')
if manifest_data is not None:
self.assert_type(manifest_data[0])
def encrypt(self, chunk):
data = self.compressor.compress(chunk)
return b''.join([self.TYPE_STR, data])
def decrypt(self, id, data, decompress=True):
if data[0] not in self.TYPES_ACCEPTABLE:
id_str = bin_to_hex(id) if id is not None else '(unknown)'
raise IntegrityError('Chunk %s: Invalid envelope' % id_str)
self.assert_type(data[0], id)
payload = memoryview(data)[1:]
if not decompress:
return payload