diff --git a/src/borg/crypto/_crypto_helpers.c b/src/borg/crypto/_crypto_helpers.c index f3d766536..18fbe5220 100644 --- a/src/borg/crypto/_crypto_helpers.c +++ b/src/borg/crypto/_crypto_helpers.c @@ -23,14 +23,3 @@ void HMAC_CTX_free(HMAC_CTX *ctx) } } #endif - - -#if OPENSSL_VERSION_NUMBER < 0x10100000L || defined(LIBRESSL_VERSION_NUMBER) -const EVP_CIPHER *EVP_aes_256_ocb(void){ /* dummy, so that code compiles */ - return NULL; -} - -const EVP_CIPHER *EVP_chacha20_poly1305(void){ /* dummy, so that code compiles */ - return NULL; -} -#endif diff --git a/src/borg/crypto/_crypto_helpers.h b/src/borg/crypto/_crypto_helpers.h index 2d6b50dee..0a3021af6 100644 --- a/src/borg/crypto/_crypto_helpers.h +++ b/src/borg/crypto/_crypto_helpers.h @@ -10,12 +10,6 @@ void HMAC_CTX_free(HMAC_CTX *ctx); #endif -#if OPENSSL_VERSION_NUMBER < 0x10100000L || defined(LIBRESSL_VERSION_NUMBER) -const EVP_CIPHER *EVP_aes_256_ocb(void); /* dummy, so that code compiles */ -const EVP_CIPHER *EVP_chacha20_poly1305(void); /* dummy, so that code compiles */ -#endif - - #if !defined(LIBRESSL_VERSION_NUMBER) #define LIBRESSL_VERSION_NUMBER 0 #endif diff --git a/src/borg/crypto/low_level.pyx b/src/borg/crypto/low_level.pyx index c38ff2755..c40df8a2d 100644 --- a/src/borg/crypto/low_level.pyx +++ b/src/borg/crypto/low_level.pyx @@ -59,8 +59,6 @@ cdef extern from "openssl/evp.h": pass const EVP_CIPHER *EVP_aes_256_ctr() - const EVP_CIPHER *EVP_aes_256_ocb() - const EVP_CIPHER *EVP_chacha20_poly1305() void EVP_CIPHER_CTX_init(EVP_CIPHER_CTX *a) void EVP_CIPHER_CTX_cleanup(EVP_CIPHER_CTX *a) @@ -77,9 +75,6 @@ cdef extern from "openssl/evp.h": int EVP_DecryptFinal_ex(EVP_CIPHER_CTX *ctx, unsigned char *out, int *outl) int EVP_CIPHER_CTX_ctrl(EVP_CIPHER_CTX *ctx, int type, int arg, void *ptr) - int EVP_CTRL_GCM_GET_TAG - int EVP_CTRL_GCM_SET_TAG - int EVP_CTRL_GCM_SET_IVLEN const EVP_MD *EVP_sha256() nogil @@ -112,9 +107,6 @@ cdef extern from "_crypto_helpers.h": HMAC_CTX *HMAC_CTX_new() void HMAC_CTX_free(HMAC_CTX *a) - const EVP_CIPHER *EVP_aes_256_ocb() # dummy - const EVP_CIPHER *EVP_chacha20_poly1305() # dummy - openssl10 = OPENSSL_VERSION_NUMBER < 0x10100000 or LIBRESSL_VERSION_NUMBER @@ -132,7 +124,7 @@ long_to_bytes = lambda x: _long.pack(x) def num_cipher_blocks(length, blocksize=16): """Return the number of cipher blocks required to encrypt/decrypt bytes of data. - For a precise computation, must be the used cipher's block size (AES: 16, CHACHA20: 64). + For a precise computation, must be the used cipher's block size (AES: 16). For a safe-upper-boundary computation, must be the MINIMUM of the block sizes (in bytes) of ALL supported ciphers. This can be used to adjust a counter if the used cipher is not @@ -444,234 +436,6 @@ cdef class AES256_CTR_BLAKE2b(AES256_CTR_BASE): ctypedef const EVP_CIPHER * (* CIPHER)() -cdef class _AEAD_BASE: - # Layout: HEADER + MAC 16 + IV 12 + CT - - cdef CIPHER cipher - cdef EVP_CIPHER_CTX *ctx - cdef unsigned char *enc_key - cdef int cipher_blk_len - cdef int iv_len - cdef int aad_offset - cdef int header_len - cdef int mac_len - cdef unsigned char iv[12] - cdef long long blocks - - @classmethod - def requirements_check(cls): - """check whether library requirements for this ciphersuite are satisfied""" - raise NotImplemented # override / implement in child class - - def __init__(self, mac_key, enc_key, iv=None, header_len=1, aad_offset=1): - assert mac_key is None - assert isinstance(enc_key, bytes) and len(enc_key) == 32 - self.iv_len = sizeof(self.iv) - self.header_len = 1 - assert aad_offset <= header_len - self.aad_offset = aad_offset - self.header_len = header_len - self.mac_len = 16 - self.enc_key = enc_key - if iv is not None: - self.set_iv(iv) - else: - self.blocks = -1 # make sure set_iv is called before encrypt - - def __cinit__(self, mac_key, enc_key, iv=None, header_len=1, aad_offset=1): - self.ctx = EVP_CIPHER_CTX_new() - - def __dealloc__(self): - EVP_CIPHER_CTX_free(self.ctx) - - def encrypt(self, data, header=b'', iv=None): - """ - encrypt data, compute mac over aad + iv + cdata, prepend header. - aad_offset is the offset into the header where aad starts. - """ - if iv is not None: - self.set_iv(iv) - assert self.blocks == 0, 'iv needs to be set before encrypt is called' - # AES-GCM, AES-OCB, CHACHA20 ciphers all add a internal 32bit counter to the 96bit (12Byte) - # IV we provide, thus we must not encrypt more than 2^32 cipher blocks with same IV). - block_count = self.block_count(len(data)) - if block_count > 2**32: - raise ValueError('too much data, would overflow internal 32bit counter') - cdef int ilen = len(data) - cdef int hlen = len(header) - assert hlen == self.header_len - cdef int aoffset = self.aad_offset - cdef int alen = hlen - aoffset - cdef unsigned char *odata = PyMem_Malloc(hlen + self.mac_len + self.iv_len + - ilen + self.cipher_blk_len) - if not odata: - raise MemoryError - cdef int olen - cdef int offset - cdef Py_buffer idata = ro_buffer(data) - cdef Py_buffer hdata = ro_buffer(header) - try: - offset = 0 - for i in range(hlen): - odata[offset+i] = header[i] - offset += hlen - offset += self.mac_len - self.store_iv(odata+offset, self.iv) - rc = EVP_EncryptInit_ex(self.ctx, self.cipher(), NULL, NULL, NULL) - if not rc: - raise CryptoError('EVP_EncryptInit_ex failed') - if not EVP_CIPHER_CTX_ctrl(self.ctx, EVP_CTRL_GCM_SET_IVLEN, self.iv_len, NULL): - raise CryptoError('EVP_CIPHER_CTX_ctrl SET IVLEN failed') - rc = EVP_EncryptInit_ex(self.ctx, NULL, NULL, self.enc_key, self.iv) - if not rc: - raise CryptoError('EVP_EncryptInit_ex failed') - rc = EVP_EncryptUpdate(self.ctx, NULL, &olen, hdata.buf+aoffset, alen) - if not rc: - raise CryptoError('EVP_EncryptUpdate failed') - if not EVP_EncryptUpdate(self.ctx, NULL, &olen, odata+offset, self.iv_len): - raise CryptoError('EVP_EncryptUpdate failed') - offset += self.iv_len - rc = EVP_EncryptUpdate(self.ctx, odata+offset, &olen, idata.buf, ilen) - if not rc: - raise CryptoError('EVP_EncryptUpdate failed') - offset += olen - rc = EVP_EncryptFinal_ex(self.ctx, odata+offset, &olen) - if not rc: - raise CryptoError('EVP_EncryptFinal_ex failed') - offset += olen - if not EVP_CIPHER_CTX_ctrl(self.ctx, EVP_CTRL_GCM_GET_TAG, self.mac_len, odata+hlen): - raise CryptoError('EVP_CIPHER_CTX_ctrl GET TAG failed') - self.blocks = block_count - return odata[:offset] - finally: - PyMem_Free(odata) - PyBuffer_Release(&hdata) - PyBuffer_Release(&idata) - - def decrypt(self, envelope): - """ - authenticate aad + iv + cdata, decrypt cdata, ignore header bytes up to aad_offset. - """ - # AES-GCM, AES-OCB, CHACHA20 ciphers all add a internal 32bit counter to the 96bit (12Byte) - # IV we provide, thus we must not decrypt more than 2^32 cipher blocks with same IV): - approx_block_count = self.block_count(len(envelope)) # sloppy, but good enough for borg - if approx_block_count > 2**32: - raise ValueError('too much data, would overflow internal 32bit counter') - cdef int ilen = len(envelope) - cdef int hlen = self.header_len - assert hlen == self.header_len - cdef int aoffset = self.aad_offset - cdef int alen = hlen - aoffset - cdef unsigned char *odata = PyMem_Malloc(ilen + self.cipher_blk_len) - if not odata: - raise MemoryError - cdef int olen - cdef int offset - cdef Py_buffer idata = ro_buffer(envelope) - try: - if not EVP_DecryptInit_ex(self.ctx, self.cipher(), NULL, NULL, NULL): - raise CryptoError('EVP_DecryptInit_ex failed') - iv = self.fetch_iv( idata.buf+hlen+self.mac_len) - self.set_iv(iv) - if not EVP_CIPHER_CTX_ctrl(self.ctx, EVP_CTRL_GCM_SET_IVLEN, self.iv_len, NULL): - raise CryptoError('EVP_CIPHER_CTX_ctrl SET IVLEN failed') - if not EVP_DecryptInit_ex(self.ctx, NULL, NULL, self.enc_key, iv): - raise CryptoError('EVP_DecryptInit_ex failed') - if not EVP_CIPHER_CTX_ctrl(self.ctx, EVP_CTRL_GCM_SET_TAG, self.mac_len, idata.buf+hlen): - raise CryptoError('EVP_CIPHER_CTX_ctrl SET TAG failed') - rc = EVP_DecryptUpdate(self.ctx, NULL, &olen, idata.buf+aoffset, alen) - if not rc: - raise CryptoError('EVP_DecryptUpdate failed') - if not EVP_DecryptUpdate(self.ctx, NULL, &olen, - idata.buf+hlen+self.mac_len, self.iv_len): - raise CryptoError('EVP_DecryptUpdate failed') - offset = 0 - rc = EVP_DecryptUpdate(self.ctx, odata+offset, &olen, - idata.buf+hlen+self.mac_len+self.iv_len, - ilen-hlen-self.mac_len-self.iv_len) - if not rc: - raise CryptoError('EVP_DecryptUpdate failed') - offset += olen - rc = EVP_DecryptFinal_ex(self.ctx, odata+offset, &olen) - if rc <= 0: - # a failure here means corrupted or tampered tag (mac) or data. - raise IntegrityError('Authentication / EVP_DecryptFinal_ex failed') - offset += olen - self.blocks = self.block_count(offset) - return odata[:offset] - finally: - PyMem_Free(odata) - PyBuffer_Release(&idata) - - def block_count(self, length): - return num_cipher_blocks(length, self.cipher_blk_len) - - def set_iv(self, iv): - # set_iv needs to be called before each encrypt() call, - # because encrypt does a full initialisation of the cipher context. - if isinstance(iv, int): - iv = iv.to_bytes(self.iv_len, byteorder='big') - assert isinstance(iv, bytes) and len(iv) == self.iv_len - for i in range(self.iv_len): - self.iv[i] = iv[i] - self.blocks = 0 # number of cipher blocks encrypted with this IV - - def next_iv(self): - # call this after encrypt() to get the next iv (int) for the next encrypt() call - # AES-GCM, AES-OCB, CHACHA20 ciphers all add a internal 32bit counter to the 96bit - # (12 byte) IV we provide, thus we only need to increment the IV by 1. - iv = int.from_bytes(self.iv[:self.iv_len], byteorder='big') - return iv + 1 - - cdef fetch_iv(self, unsigned char * iv_in): - return iv_in[0:self.iv_len] - - cdef store_iv(self, unsigned char * iv_out, unsigned char * iv): - cdef int i - for i in range(self.iv_len): - iv_out[i] = iv[i] - - def extract_iv(self, envelope): - offset = self.header_len + self.mac_len - return bytes_to_long(envelope[offset:offset+self.iv_len]) - - -cdef class _AES_BASE(_AEAD_BASE): - def __init__(self, *args, **kwargs): - self.cipher_blk_len = 16 - super().__init__(*args, **kwargs) - - -cdef class _CHACHA_BASE(_AEAD_BASE): - def __init__(self, *args, **kwargs): - self.cipher_blk_len = 64 - super().__init__(*args, **kwargs) - - -cdef class AES256_OCB(_AES_BASE): - @classmethod - def requirements_check(cls): - if openssl10: - raise ValueError('AES OCB requires OpenSSL >= 1.1.0. Detected: OpenSSL %08x' % OPENSSL_VERSION_NUMBER) - - def __init__(self, mac_key, enc_key, iv=None, header_len=1, aad_offset=1): - self.requirements_check() - self.cipher = EVP_aes_256_ocb - super().__init__(mac_key, enc_key, iv=iv, header_len=header_len, aad_offset=aad_offset) - - -cdef class CHACHA20_POLY1305(_CHACHA_BASE): - @classmethod - def requirements_check(cls): - if openssl10: - raise ValueError('CHACHA20-POLY1305 requires OpenSSL >= 1.1.0. Detected: OpenSSL %08x' % OPENSSL_VERSION_NUMBER) - - def __init__(self, mac_key, enc_key, iv=None, header_len=1, aad_offset=1): - self.requirements_check() - self.cipher = EVP_chacha20_poly1305 - super().__init__(mac_key, enc_key, iv=iv, header_len=header_len, aad_offset=aad_offset) - - cdef class AES: """A thin wrapper around the OpenSSL EVP cipher API - for legacy code, like key file encryption""" cdef CIPHER cipher diff --git a/src/borg/selftest.py b/src/borg/selftest.py index dfe7abb54..1768ef2ed 100644 --- a/src/borg/selftest.py +++ b/src/borg/selftest.py @@ -29,7 +29,7 @@ SELFTEST_CASES = [ ChunkerTestCase, ] -SELFTEST_COUNT = 35 +SELFTEST_COUNT = 33 class SelfTestResult(TestResult): diff --git a/src/borg/testsuite/crypto.py b/src/borg/testsuite/crypto.py index 919199a2f..16be9f7e1 100644 --- a/src/borg/testsuite/crypto.py +++ b/src/borg/testsuite/crypto.py @@ -1,7 +1,6 @@ from binascii import hexlify -from ..crypto.low_level import AES256_CTR_HMAC_SHA256, AES256_OCB, CHACHA20_POLY1305, UNENCRYPTED, \ - IntegrityError, openssl10 +from ..crypto.low_level import AES256_CTR_HMAC_SHA256, UNENCRYPTED, IntegrityError from ..crypto.low_level import bytes_to_long, bytes_to_int, long_to_bytes from ..crypto.low_level import hkdf_hmac_sha512 @@ -89,94 +88,6 @@ class CryptoTestCase(BaseTestCase): self.assert_raises(IntegrityError, lambda: cs.decrypt(hdr_mac_iv_cdata_corrupted)) - def test_AE(self): - # used in legacy-like layout (1 type byte, no aad) - mac_key = None - enc_key = b'X' * 32 - iv = 0 - data = b'foo' * 10 - header = b'\x23' - tests = [ - # (ciphersuite class, exp_mac, exp_cdata) - ] - if not openssl10: - tests += [ - (AES256_OCB, - b'b6909c23c9aaebd9abbe1ff42097652d', - b'877ce46d2f62dee54699cebc3ba41d9ab613f7c486778c1b3636664b1493', ), - (CHACHA20_POLY1305, - b'fd08594796e0706cde1e8b461e3e0555', - b'a093e4b0387526f085d3c40cca84a35230a5c0dd766453b77ba38bcff775', ) - ] - for cs_cls, exp_mac, exp_cdata in tests: - # print(repr(cs_cls)) - # encrypt/mac - cs = cs_cls(mac_key, enc_key, iv, header_len=1, aad_offset=1) - hdr_mac_iv_cdata = cs.encrypt(data, header=header) - hdr = hdr_mac_iv_cdata[0:1] - mac = hdr_mac_iv_cdata[1:17] - iv = hdr_mac_iv_cdata[17:29] - cdata = hdr_mac_iv_cdata[29:] - self.assert_equal(hexlify(hdr), b'23') - self.assert_equal(hexlify(mac), exp_mac) - self.assert_equal(hexlify(iv), b'000000000000000000000000') - self.assert_equal(hexlify(cdata), exp_cdata) - self.assert_equal(cs.next_iv(), 1) - # auth/decrypt - cs = cs_cls(mac_key, enc_key, header_len=len(header), aad_offset=1) - pdata = cs.decrypt(hdr_mac_iv_cdata) - self.assert_equal(data, pdata) - self.assert_equal(cs.next_iv(), 1) - # auth-failure due to corruption (corrupted data) - cs = cs_cls(mac_key, enc_key, header_len=len(header), aad_offset=1) - hdr_mac_iv_cdata_corrupted = hdr_mac_iv_cdata[:29] + b'\0' + hdr_mac_iv_cdata[30:] - self.assert_raises(IntegrityError, - lambda: cs.decrypt(hdr_mac_iv_cdata_corrupted)) - - def test_AEAD(self): - # test with aad - mac_key = None - enc_key = b'X' * 32 - iv = 0 - data = b'foo' * 10 - header = b'\x12\x34\x56' - tests = [ - # (ciphersuite class, exp_mac, exp_cdata) - ] - if not openssl10: - tests += [ - (AES256_OCB, - b'f2748c412af1c7ead81863a18c2c1893', - b'877ce46d2f62dee54699cebc3ba41d9ab613f7c486778c1b3636664b1493', ), - (CHACHA20_POLY1305, - b'b7e7c9a79f2404e14f9aad156bf091dd', - b'a093e4b0387526f085d3c40cca84a35230a5c0dd766453b77ba38bcff775', ) - ] - for cs_cls, exp_mac, exp_cdata in tests: - # print(repr(cs_cls)) - # encrypt/mac - cs = cs_cls(mac_key, enc_key, iv, header_len=3, aad_offset=1) - hdr_mac_iv_cdata = cs.encrypt(data, header=header) - hdr = hdr_mac_iv_cdata[0:3] - mac = hdr_mac_iv_cdata[3:19] - iv = hdr_mac_iv_cdata[19:31] - cdata = hdr_mac_iv_cdata[31:] - self.assert_equal(hexlify(hdr), b'123456') - self.assert_equal(hexlify(mac), exp_mac) - self.assert_equal(hexlify(iv), b'000000000000000000000000') - self.assert_equal(hexlify(cdata), exp_cdata) - self.assert_equal(cs.next_iv(), 1) - # auth/decrypt - cs = cs_cls(mac_key, enc_key, header_len=len(header), aad_offset=1) - pdata = cs.decrypt(hdr_mac_iv_cdata) - self.assert_equal(data, pdata) - self.assert_equal(cs.next_iv(), 1) - # auth-failure due to corruption (corrupted aad) - cs = cs_cls(mac_key, enc_key, header_len=len(header), aad_offset=1) - hdr_mac_iv_cdata_corrupted = hdr_mac_iv_cdata[:1] + b'\0' + hdr_mac_iv_cdata[2:] - self.assert_raises(IntegrityError, - lambda: cs.decrypt(hdr_mac_iv_cdata_corrupted)) - # These test vectors come from https://www.kullo.net/blog/hkdf-sha-512-test-vectors/ # who claims to have verified these against independent Python and C++ implementations.