mirror of https://github.com/borgbackup/borg.git
Add Key.assert_id function
parent
a80b371d09
commit
c2c90645ad
|
@ -108,6 +108,12 @@ class KeyBase:
|
||||||
def decrypt(self, id, data, decompress=True):
|
def decrypt(self, id, data, decompress=True):
|
||||||
pass
|
pass
|
||||||
|
|
||||||
|
def assert_id(self, id, data):
|
||||||
|
if id:
|
||||||
|
id_computed = self.id_hash(data)
|
||||||
|
if not compare_digest(id_computed, id):
|
||||||
|
raise IntegrityError('Chunk id verification failed')
|
||||||
|
|
||||||
|
|
||||||
class PlaintextKey(KeyBase):
|
class PlaintextKey(KeyBase):
|
||||||
TYPE = 0x02
|
TYPE = 0x02
|
||||||
|
@ -137,8 +143,7 @@ class PlaintextKey(KeyBase):
|
||||||
if not decompress:
|
if not decompress:
|
||||||
return Chunk(payload)
|
return Chunk(payload)
|
||||||
data = self.compressor.decompress(payload)
|
data = self.compressor.decompress(payload)
|
||||||
if id and sha256(data).digest() != id:
|
self.assert_id(id, data)
|
||||||
raise IntegrityError('Chunk id verification failed')
|
|
||||||
return Chunk(data)
|
return Chunk(data)
|
||||||
|
|
||||||
|
|
||||||
|
@ -183,11 +188,7 @@ class AESKeyBase(KeyBase):
|
||||||
if not decompress:
|
if not decompress:
|
||||||
return Chunk(payload)
|
return Chunk(payload)
|
||||||
data = self.compressor.decompress(payload)
|
data = self.compressor.decompress(payload)
|
||||||
if id:
|
self.assert_id(id, data)
|
||||||
hmac_given = id
|
|
||||||
hmac_computed = hmac_sha256(self.id_key, data)
|
|
||||||
if not compare_digest(hmac_computed, hmac_given):
|
|
||||||
raise IntegrityError('Chunk id verification failed')
|
|
||||||
return Chunk(data)
|
return Chunk(data)
|
||||||
|
|
||||||
def extract_nonce(self, payload):
|
def extract_nonce(self, payload):
|
||||||
|
|
|
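Review bot: `assert_id` returns without checking anything when `id` is falsy (`if id:` in the first hunk), so a caller that passes `None` or an empty `b''` id gets no integrity verification from a method whose name promises one. This matches the old inline `if id` guards in both `decrypt` methods, but a reusable helper should make the skip explicit with a docstring such as `"""Raise IntegrityError if id is given and differs from id_hash(data); a falsy id skips the check."""`. Consider narrowing the guard to `if id is not None:` so an empty id fails instead of passing, provided no caller relies on that. `PlaintextKey.decrypt` now verifies through `self.id_hash` rather than `sha256(data).digest()`, and `id_hash` is defined outside these hunks, so confirm it is still plain SHA-256 for that class. The `KeyBase`, `PlaintextKey` and `AESKeyBase` bodies continue outside the hunks.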
@ -169,6 +169,18 @@ class TestKey:
|
||||||
assert key.decrypt(None, encrypted, decompress=False) != plaintext
|
assert key.decrypt(None, encrypted, decompress=False) != plaintext
|
||||||
assert key.decrypt(None, encrypted) == plaintext
|
assert key.decrypt(None, encrypted) == plaintext
|
||||||
|
|
||||||
|
def test_assert_id(self, key):
|
||||||
|
plaintext = b'123456789'
|
||||||
|
id = key.id_hash(plaintext)
|
||||||
|
key.assert_id(id, plaintext)
|
||||||
|
id_changed = bytearray(id)
|
||||||
|
id_changed[0] += 1
|
||||||
|
with pytest.raises(IntegrityError):
|
||||||
|
key.assert_id(id_changed, plaintext)
|
||||||
|
plaintext_changed = plaintext + b'1'
|
||||||
|
with pytest.raises(IntegrityError):
|
||||||
|
key.assert_id(id, plaintext_changed)
|
||||||
|
|
||||||
|
|
||||||
class TestPassphrase:
|
class TestPassphrase:
|
||||||
def test_passphrase_new_verification(self, capsys, monkeypatch):
|
def test_passphrase_new_verification(self, capsys, monkeypatch):
|
||||||
|
|
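Review bot: `id_changed[0] += 1` in `test_assert_id` raises `ValueError` when the first byte of the id is 0xff, because a bytearray element must stay in range(0, 256). If the `key` fixture yields keys with random id-key material (not visible here), the test would fail intermittently for a reason unrelated to `assert_id`. Flipping a bit avoids this: `id_changed[0] ^= 1`. The test also leaves the falsy-id path uncovered; a line such as `key.assert_id(None, plaintext)` would pin that a missing id skips verification rather than raising.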