1
0
Fork 0
mirror of https://github.com/borgbackup/borg.git synced 2024-12-26 17:57:59 +00:00

add aad parameter to borg.crypto.low_level api

added it for all classes there, so the caller just give it.

for the legacy AES-CTR based classes, the given aad is completely ignored.
this is to stay compatible with repo data of borg < 1.3.

for the new AEAD based classes:
encrypt: the aad is fed into the auth tag computation
decrypt: same. decrypt will fail on auth tag mismatch.
This commit is contained in:
Thomas Waldmann 2022-03-21 14:30:26 +01:00
parent d3b78a6cf5
commit 8bd9477b96
3 changed files with 48 additions and 10 deletions

View file

@ -152,7 +152,7 @@ class UNENCRYPTED:
self.header_len = header_len self.header_len = header_len
self.set_iv(iv) self.set_iv(iv)
def encrypt(self, data, header=b'', iv=None): def encrypt(self, data, header=b'', iv=None, aad=None):
""" """
IMPORTANT: it is called encrypt to satisfy the crypto api naming convention, IMPORTANT: it is called encrypt to satisfy the crypto api naming convention,
but this does NOT encrypt and it does NOT compute and store a MAC either. but this does NOT encrypt and it does NOT compute and store a MAC either.
@ -162,7 +162,7 @@ class UNENCRYPTED:
assert self.iv is not None, 'iv needs to be set before encrypt is called' assert self.iv is not None, 'iv needs to be set before encrypt is called'
return header + data return header + data
def decrypt(self, envelope): def decrypt(self, envelope, aad=None):
""" """
IMPORTANT: it is called decrypt to satisfy the crypto api naming convention, IMPORTANT: it is called decrypt to satisfy the crypto api naming convention,
but this does NOT decrypt and it does NOT verify a MAC either, because data but this does NOT decrypt and it does NOT verify a MAC either, because data
@ -235,7 +235,7 @@ cdef class AES256_CTR_BASE:
""" """
raise NotImplementedError raise NotImplementedError
def encrypt(self, data, header=b'', iv=None): def encrypt(self, data, header=b'', iv=None, aad=None):
""" """
encrypt data, compute mac over aad + iv + cdata, prepend header. encrypt data, compute mac over aad + iv + cdata, prepend header.
aad_offset is the offset into the header where aad starts. aad_offset is the offset into the header where aad starts.
@ -285,7 +285,7 @@ cdef class AES256_CTR_BASE:
PyBuffer_Release(&hdata) PyBuffer_Release(&hdata)
PyBuffer_Release(&idata) PyBuffer_Release(&idata)
def decrypt(self, envelope): def decrypt(self, envelope, aad=None):
""" """
authenticate aad + iv + cdata, decrypt cdata, ignore header bytes up to aad_offset. authenticate aad + iv + cdata, decrypt cdata, ignore header bytes up to aad_offset.
""" """
@ -468,10 +468,13 @@ cdef class _AEAD_BASE:
def __dealloc__(self): def __dealloc__(self):
EVP_CIPHER_CTX_free(self.ctx) EVP_CIPHER_CTX_free(self.ctx)
def encrypt(self, data, header=b'', iv=None): def encrypt(self, data, header=b'', iv=None, aad=b''):
""" """
encrypt data, compute mac over aad + cdata, prepend header. encrypt data, compute auth tag over aad + header + cdata.
aad_offset is the offset into the header where aad starts. return header + auth tag + cdata.
aad_offset is the offset into the header where the authenticated header part starts.
aad is additional authenticated data, which won't be included in the returned data,
but only used for the auth tag computation.
""" """
if iv is not None: if iv is not None:
self.set_iv(iv) self.set_iv(iv)
@ -486,6 +489,7 @@ cdef class _AEAD_BASE:
assert hlen == self.header_len_expected assert hlen == self.header_len_expected
cdef int aoffset = self.aad_offset cdef int aoffset = self.aad_offset
cdef int alen = hlen - aoffset cdef int alen = hlen - aoffset
cdef int aadlen = len(aad)
cdef unsigned char *odata = <unsigned char *>PyMem_Malloc(hlen + self.mac_len + cdef unsigned char *odata = <unsigned char *>PyMem_Malloc(hlen + self.mac_len +
ilen + self.cipher_blk_len) ilen + self.cipher_blk_len)
if not odata: if not odata:
@ -494,6 +498,7 @@ cdef class _AEAD_BASE:
cdef int offset cdef int offset
cdef Py_buffer idata = ro_buffer(data) cdef Py_buffer idata = ro_buffer(data)
cdef Py_buffer hdata = ro_buffer(header) cdef Py_buffer hdata = ro_buffer(header)
cdef Py_buffer aadata = ro_buffer(aad)
try: try:
offset = 0 offset = 0
for i in range(hlen): for i in range(hlen):
@ -508,6 +513,9 @@ cdef class _AEAD_BASE:
rc = EVP_EncryptInit_ex(self.ctx, NULL, NULL, self.key, self.iv) rc = EVP_EncryptInit_ex(self.ctx, NULL, NULL, self.key, self.iv)
if not rc: if not rc:
raise CryptoError('EVP_EncryptInit_ex failed') raise CryptoError('EVP_EncryptInit_ex failed')
rc = EVP_EncryptUpdate(self.ctx, NULL, &olen, <const unsigned char*> aadata.buf, aadlen)
if not rc:
raise CryptoError('EVP_EncryptUpdate failed')
rc = EVP_EncryptUpdate(self.ctx, NULL, &olen, <const unsigned char*> hdata.buf+aoffset, alen) rc = EVP_EncryptUpdate(self.ctx, NULL, &olen, <const unsigned char*> hdata.buf+aoffset, alen)
if not rc: if not rc:
raise CryptoError('EVP_EncryptUpdate failed') raise CryptoError('EVP_EncryptUpdate failed')
@ -527,10 +535,12 @@ cdef class _AEAD_BASE:
PyMem_Free(odata) PyMem_Free(odata)
PyBuffer_Release(&hdata) PyBuffer_Release(&hdata)
PyBuffer_Release(&idata) PyBuffer_Release(&idata)
PyBuffer_Release(&aadata)
def decrypt(self, envelope): def decrypt(self, envelope, aad=b''):
""" """
authenticate aad + cdata, decrypt cdata, ignore header bytes up to aad_offset. authenticate aad + header + cdata (from envelope), ignore header bytes up to aad_offset.,
return decrypted cdata.
""" """
# AES-OCB, CHACHA20 ciphers all add a internal 32bit counter to the 96bit (12Byte) # AES-OCB, CHACHA20 ciphers all add a internal 32bit counter to the 96bit (12Byte)
# IV we provide, thus we must not decrypt more than 2^32 cipher blocks with same IV): # IV we provide, thus we must not decrypt more than 2^32 cipher blocks with same IV):
@ -541,12 +551,14 @@ cdef class _AEAD_BASE:
cdef int hlen = self.header_len_expected cdef int hlen = self.header_len_expected
cdef int aoffset = self.aad_offset cdef int aoffset = self.aad_offset
cdef int alen = hlen - aoffset cdef int alen = hlen - aoffset
cdef int aadlen = len(aad)
cdef unsigned char *odata = <unsigned char *>PyMem_Malloc(ilen + self.cipher_blk_len) cdef unsigned char *odata = <unsigned char *>PyMem_Malloc(ilen + self.cipher_blk_len)
if not odata: if not odata:
raise MemoryError raise MemoryError
cdef int olen cdef int olen
cdef int offset cdef int offset
cdef Py_buffer idata = ro_buffer(envelope) cdef Py_buffer idata = ro_buffer(envelope)
cdef Py_buffer aadata = ro_buffer(aad)
try: try:
if not EVP_DecryptInit_ex(self.ctx, self.cipher(), NULL, NULL, NULL): if not EVP_DecryptInit_ex(self.ctx, self.cipher(), NULL, NULL, NULL):
raise CryptoError('EVP_DecryptInit_ex failed') raise CryptoError('EVP_DecryptInit_ex failed')
@ -554,6 +566,9 @@ cdef class _AEAD_BASE:
raise CryptoError('EVP_CIPHER_CTX_ctrl SET IVLEN failed') raise CryptoError('EVP_CIPHER_CTX_ctrl SET IVLEN failed')
if not EVP_DecryptInit_ex(self.ctx, NULL, NULL, self.key, self.iv): if not EVP_DecryptInit_ex(self.ctx, NULL, NULL, self.key, self.iv):
raise CryptoError('EVP_DecryptInit_ex failed') raise CryptoError('EVP_DecryptInit_ex failed')
rc = EVP_DecryptUpdate(self.ctx, NULL, &olen, <const unsigned char*> aadata.buf, aadlen)
if not rc:
raise CryptoError('EVP_DecryptUpdate failed')
rc = EVP_DecryptUpdate(self.ctx, NULL, &olen, <const unsigned char*> idata.buf+aoffset, alen) rc = EVP_DecryptUpdate(self.ctx, NULL, &olen, <const unsigned char*> idata.buf+aoffset, alen)
if not rc: if not rc:
raise CryptoError('EVP_DecryptUpdate failed') raise CryptoError('EVP_DecryptUpdate failed')
@ -576,6 +591,7 @@ cdef class _AEAD_BASE:
finally: finally:
PyMem_Free(odata) PyMem_Free(odata)
PyBuffer_Release(&idata) PyBuffer_Release(&idata)
PyBuffer_Release(&aadata)
def block_count(self, length): def block_count(self, length):
return num_cipher_blocks(length, self.cipher_blk_len) return num_cipher_blocks(length, self.cipher_blk_len)

View file

@ -29,7 +29,7 @@
ChunkerTestCase, ChunkerTestCase,
] ]
SELFTEST_COUNT = 35 SELFTEST_COUNT = 36
class SelfTestResult(TestResult): class SelfTestResult(TestResult):

View file

@ -175,6 +175,28 @@ def test_AEAD(self):
self.assert_raises(IntegrityError, self.assert_raises(IntegrityError,
lambda: cs.decrypt(hdr_mac_iv_cdata_corrupted)) lambda: cs.decrypt(hdr_mac_iv_cdata_corrupted))
def test_AEAD_with_more_AAD(self):
# test giving extra aad to the .encrypt() and .decrypt() calls
key = b'X' * 32
iv_int = 0
data = b'foo' * 10
header = b'\x12\x34'
tests = []
if not is_libressl:
tests += [AES256_OCB, CHACHA20_POLY1305]
for cs_cls in tests:
# encrypt/mac
cs = cs_cls(key, iv_int, header_len=len(header), aad_offset=0)
hdr_mac_iv_cdata = cs.encrypt(data, header=header, aad=b'correct_chunkid')
# successful auth/decrypt (correct aad)
cs = cs_cls(key, iv_int, header_len=len(header), aad_offset=0)
pdata = cs.decrypt(hdr_mac_iv_cdata, aad=b'correct_chunkid')
self.assert_equal(data, pdata)
# unsuccessful auth (incorrect aad)
cs = cs_cls(key, iv_int, header_len=len(header), aad_offset=0)
self.assert_raises(IntegrityError,
lambda: cs.decrypt(hdr_mac_iv_cdata, aad=b'incorrect_chunkid'))
# These test vectors come from https://www.kullo.net/blog/hkdf-sha-512-test-vectors/ # These test vectors come from https://www.kullo.net/blog/hkdf-sha-512-test-vectors/
# who claims to have verified these against independent Python and C++ implementations. # who claims to have verified these against independent Python and C++ implementations.