From 8f1678e2ba90f9a5d1356b0744d708367dd85edd Mon Sep 17 00:00:00 2001 From: Thomas Waldmann Date: Thu, 8 Sep 2016 05:13:23 +0200 Subject: [PATCH] set_iv / next iv with integers --- src/borg/crypto/key.py | 9 ++++--- src/borg/crypto/low_level.pyx | 40 +++++++++++++------------------ src/borg/testsuite/crypto.py | 45 +++++++++++++---------------------- src/borg/testsuite/key.py | 8 +++---- 4 files changed, 40 insertions(+), 62 deletions(-) diff --git a/src/borg/crypto/key.py b/src/borg/crypto/key.py index 9c608882c..5bef44a6c 100644 --- a/src/borg/crypto/key.py +++ b/src/borg/crypto/key.py @@ -358,10 +358,9 @@ class AESKeyBase(KeyBase): def encrypt(self, chunk): data = self.compressor.compress(chunk) - next_nonce = int.from_bytes(self.cipher.next_iv(), byteorder='big') - next_nonce = self.nonce_manager.ensure_reservation(next_nonce, self.cipher.block_count(len(data))) - iv = next_nonce.to_bytes(self.cipher.iv_len, byteorder='big') - return self.cipher.encrypt(data, header=self.TYPE_STR, iv=iv) + next_iv = self.nonce_manager.ensure_reservation(self.cipher.next_iv(), + self.cipher.block_count(len(data))) + return self.cipher.encrypt(data, header=self.TYPE_STR, iv=next_iv) def decrypt(self, id, data, decompress=True): if not (data[0] == self.TYPE or @@ -402,7 +401,7 @@ class AESKeyBase(KeyBase): # be a bit too high, but that does not matter. manifest_blocks = num_cipher_blocks(len(manifest_data)) nonce = self.cipher.extract_iv(manifest_data) + manifest_blocks - self.cipher.set_iv(nonce.to_bytes(16, byteorder='big')) + self.cipher.set_iv(nonce) self.nonce_manager = NonceManager(self.repository, nonce) diff --git a/src/borg/crypto/low_level.pyx b/src/borg/crypto/low_level.pyx index b2bf81107..019051f59 100644 --- a/src/borg/crypto/low_level.pyx +++ b/src/borg/crypto/low_level.pyx @@ -153,21 +153,6 @@ def int_to_bytes16(i): return _2long.pack(h, l) -def increment_iv(iv, amount=1): - """ - Increment the IV by the given amount (default 1). - - :param iv: input IV, 16 bytes (128 bit) - :param amount: increment value - :return: input_IV + amount, 16 bytes (128 bit) - """ - assert len(iv) == 16 - iv = bytes16_to_int(iv) - iv += amount - iv = int_to_bytes16(iv) - return iv - - def num_cipher_blocks(length, blocksize=16): """Return the number of cipher blocks required to encrypt/decrypt bytes of data. @@ -393,14 +378,17 @@ cdef class AES256_CTR_HMAC_SHA256: def set_iv(self, iv): # set_iv needs to be called before each encrypt() call + if isinstance(iv, int): + iv = iv.to_bytes(self.iv_len, byteorder='big') assert isinstance(iv, bytes) and len(iv) == self.iv_len self.blocks = 0 # how many AES blocks got encrypted with this IV? for i in range(self.iv_len): self.iv[i] = iv[i] def next_iv(self): - # call this after encrypt() to get the next iv for the next encrypt() call - return increment_iv(self.iv[:self.iv_len], self.blocks) + # call this after encrypt() to get the next iv (int) for the next encrypt() call + iv = int.from_bytes(self.iv[:self.iv_len], byteorder='big') + return iv + self.blocks cdef fetch_iv(self, unsigned char * iv_in): # fetch lower self.iv_len_short bytes of iv and add upper zero bytes @@ -575,21 +563,21 @@ cdef class _AEAD_BASE: def set_iv(self, iv): # set_iv needs to be called before each encrypt() call, # because encrypt does a full initialisation of the cipher context. + if isinstance(iv, int): + iv = iv.to_bytes(self.iv_len, byteorder='big') assert isinstance(iv, bytes) and len(iv) == self.iv_len self.blocks = 0 # number of cipher blocks encrypted with this IV for i in range(self.iv_len): self.iv[i] = iv[i] def next_iv(self): - # call this after encrypt() to get the next iv for the next encrypt() call + # call this after encrypt() to get the next iv (int) for the next encrypt() call # AES-GCM, AES-OCB, CHACHA20 ciphers all add a internal 32bit counter to the 96bit # (12 byte) IV we provide, thus we only need to increment the IV by 1 (and we must # not encrypt more than 2^32 cipher blocks with same IV): assert self.blocks < 2**32 - # we need 16 bytes for increment_iv: - last_iv = b'\0' * (16 - self.iv_len) + self.iv[:self.iv_len] - next_iv = increment_iv(last_iv, 1) - return next_iv[-self.iv_len:] + iv = int.from_bytes(self.iv[:self.iv_len], byteorder='big') + return iv + 1 cdef fetch_iv(self, unsigned char * iv_in): return iv_in[0:self.iv_len] @@ -741,14 +729,18 @@ cdef class AES: def set_iv(self, iv): # set_iv needs to be called before each encrypt() call, # because encrypt does a full initialisation of the cipher context. + if isinstance(iv, int): + iv = iv.to_bytes(self.iv_len, byteorder='big') assert isinstance(iv, bytes) and len(iv) == self.iv_len self.blocks = 0 # number of cipher blocks encrypted with this IV for i in range(self.iv_len): self.iv[i] = iv[i] def next_iv(self): - # call this after encrypt() to get the next iv for the next encrypt() call - return increment_iv(self.iv[:self.iv_len], self.blocks) + # call this after encrypt() to get the next iv (int) for the next encrypt() call + iv = int.from_bytes(self.iv[:self.iv_len], byteorder='big') + return iv + self.blocks + def hmac_sha256(key, data): diff --git a/src/borg/testsuite/crypto.py b/src/borg/testsuite/crypto.py index a69f938dc..a60ab2fb3 100644 --- a/src/borg/testsuite/crypto.py +++ b/src/borg/testsuite/crypto.py @@ -1,8 +1,8 @@ from binascii import hexlify, unhexlify from ..crypto.low_level import AES256_CTR_HMAC_SHA256, AES256_GCM, AES256_OCB, CHACHA20_POLY1305, UNENCRYPTED, \ - IntegrityError, hmac_sha256, blake2b_256, openssl10 -from ..crypto.low_level import bytes_to_long, bytes_to_int, long_to_bytes, bytes16_to_int, int_to_bytes16, increment_iv + IntegrityError, blake2b_256, hmac_sha256, openssl10 +from ..crypto.low_level import bytes_to_long, bytes_to_int, long_to_bytes, bytes16_to_int, int_to_bytes16 from ..crypto.low_level import hkdf_hmac_sha512 from . import BaseTestCase @@ -26,21 +26,6 @@ class CryptoTestCase(BaseTestCase): self.assert_equal(bytes16_to_int(b'\0\0\0\0\0\0\0\1\0\0\0\0\0\0\0\0'), 2 ** 64) self.assert_equal(int_to_bytes16(2 ** 64), b'\0\0\0\0\0\0\0\1\0\0\0\0\0\0\0\0') - def test_increment_iv(self): - iv0 = b'\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0' - iv1 = b'\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\1' - iv2 = b'\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\2' - self.assert_equal(increment_iv(iv0, 0), iv0) - self.assert_equal(increment_iv(iv0, 1), iv1) - self.assert_equal(increment_iv(iv0, 2), iv2) - iva = b'\0\0\0\0\0\0\0\0\xff\xff\xff\xff\xff\xff\xff\xff' - ivb = b'\0\0\0\0\0\0\0\1\x00\x00\x00\x00\x00\x00\x00\x00' - ivc = b'\0\0\0\0\0\0\0\1\x00\x00\x00\x00\x00\x00\x00\x01' - self.assert_equal(increment_iv(iva, 0), iva) - self.assert_equal(increment_iv(iva, 1), ivb) - self.assert_equal(increment_iv(iva, 2), ivc) - self.assert_equal(increment_iv(iv0, 2**64), ivb) - def test_UNENCRYPTED(self): iv = b'' # any IV is ok, it just must be set and not None data = b'data' @@ -55,7 +40,7 @@ class CryptoTestCase(BaseTestCase): # this tests the layout as in attic / borg < 1.2 (1 type byte, no aad) mac_key = b'Y' * 32 enc_key = b'X' * 32 - iv = b'\0' * 16 + iv = 0 data = b'foo' * 10 header = b'\x42' # encrypt-then-mac @@ -69,12 +54,12 @@ class CryptoTestCase(BaseTestCase): self.assert_equal(hexlify(mac), b'af90b488b0cc4a8f768fe2d6814fa65aec66b148135e54f7d4d29a27f22f57a8') self.assert_equal(hexlify(iv), b'0000000000000000') self.assert_equal(hexlify(cdata), b'c6efb702de12498f34a2c2bbc8149e759996d08bf6dc5c610aefc0c3a466') - self.assert_equal(hexlify(cs.next_iv()), b'00000000000000000000000000000002') + self.assert_equal(cs.next_iv(), 2) # auth-then-decrypt cs = AES256_CTR_HMAC_SHA256(mac_key, enc_key, header_len=len(header), aad_offset=1) pdata = cs.decrypt(hdr_mac_iv_cdata) self.assert_equal(data, pdata) - self.assert_equal(hexlify(cs.next_iv()), b'00000000000000000000000000000002') + self.assert_equal(cs.next_iv(), 2) # auth-failure due to corruption (corrupted data) cs = AES256_CTR_HMAC_SHA256(mac_key, enc_key, header_len=len(header), aad_offset=1) hdr_mac_iv_cdata_corrupted = hdr_mac_iv_cdata[:41] + b'\0' + hdr_mac_iv_cdata[42:] @@ -84,7 +69,7 @@ class CryptoTestCase(BaseTestCase): def test_AES256_CTR_HMAC_SHA256_aad(self): mac_key = b'Y' * 32 enc_key = b'X' * 32 - iv = b'\0' * 16 + iv = 0 data = b'foo' * 10 header = b'\x12\x34\x56' # encrypt-then-mac @@ -98,12 +83,12 @@ class CryptoTestCase(BaseTestCase): self.assert_equal(hexlify(mac), b'7659a915d9927072ef130258052351a17ef882692893c3850dd798c03d2dd138') self.assert_equal(hexlify(iv), b'0000000000000000') self.assert_equal(hexlify(cdata), b'c6efb702de12498f34a2c2bbc8149e759996d08bf6dc5c610aefc0c3a466') - self.assert_equal(hexlify(cs.next_iv()), b'00000000000000000000000000000002') + self.assert_equal(cs.next_iv(), 2) # auth-then-decrypt cs = AES256_CTR_HMAC_SHA256(mac_key, enc_key, header_len=len(header), aad_offset=1) pdata = cs.decrypt(hdr_mac_iv_cdata) self.assert_equal(data, pdata) - self.assert_equal(hexlify(cs.next_iv()), b'00000000000000000000000000000002') + self.assert_equal(cs.next_iv(), 2) # auth-failure due to corruption (corrupted aad) cs = AES256_CTR_HMAC_SHA256(mac_key, enc_key, header_len=len(header), aad_offset=1) hdr_mac_iv_cdata_corrupted = hdr_mac_iv_cdata[:1] + b'\0' + hdr_mac_iv_cdata[2:] @@ -114,7 +99,7 @@ class CryptoTestCase(BaseTestCase): # used in legacy-like layout (1 type byte, no aad) mac_key = None enc_key = b'X' * 32 - iv = b'\0' * 12 + iv = 0 data = b'foo' * 10 header = b'\x23' tests = [ @@ -133,6 +118,7 @@ class CryptoTestCase(BaseTestCase): b'a093e4b0387526f085d3c40cca84a35230a5c0dd766453b77ba38bcff775', ) ] for cs_cls, exp_mac, exp_cdata in tests: + # print(repr(cs_cls)) # encrypt/mac cs = cs_cls(mac_key, enc_key, iv, header_len=1, aad_offset=1) hdr_mac_iv_cdata = cs.encrypt(data, header=header) @@ -144,12 +130,12 @@ class CryptoTestCase(BaseTestCase): self.assert_equal(hexlify(mac), exp_mac) self.assert_equal(hexlify(iv), b'000000000000000000000000') self.assert_equal(hexlify(cdata), exp_cdata) - self.assert_equal(hexlify(cs.next_iv()), b'000000000000000000000001') + self.assert_equal(cs.next_iv(), 1) # auth/decrypt cs = cs_cls(mac_key, enc_key, header_len=len(header), aad_offset=1) pdata = cs.decrypt(hdr_mac_iv_cdata) self.assert_equal(data, pdata) - self.assert_equal(hexlify(cs.next_iv()), b'000000000000000000000001') + self.assert_equal(cs.next_iv(), 1) # auth-failure due to corruption (corrupted data) cs = cs_cls(mac_key, enc_key, header_len=len(header), aad_offset=1) hdr_mac_iv_cdata_corrupted = hdr_mac_iv_cdata[:29] + b'\0' + hdr_mac_iv_cdata[30:] @@ -160,7 +146,7 @@ class CryptoTestCase(BaseTestCase): # test with aad mac_key = None enc_key = b'X' * 32 - iv = b'\0' * 12 + iv = 0 data = b'foo' * 10 header = b'\x12\x34\x56' tests = [ @@ -179,6 +165,7 @@ class CryptoTestCase(BaseTestCase): b'a093e4b0387526f085d3c40cca84a35230a5c0dd766453b77ba38bcff775', ) ] for cs_cls, exp_mac, exp_cdata in tests: + # print(repr(cs_cls)) # encrypt/mac cs = cs_cls(mac_key, enc_key, iv, header_len=3, aad_offset=1) hdr_mac_iv_cdata = cs.encrypt(data, header=header) @@ -190,12 +177,12 @@ class CryptoTestCase(BaseTestCase): self.assert_equal(hexlify(mac), exp_mac) self.assert_equal(hexlify(iv), b'000000000000000000000000') self.assert_equal(hexlify(cdata), exp_cdata) - self.assert_equal(hexlify(cs.next_iv()), b'000000000000000000000001') + self.assert_equal(cs.next_iv(), 1) # auth/decrypt cs = cs_cls(mac_key, enc_key, header_len=len(header), aad_offset=1) pdata = cs.decrypt(hdr_mac_iv_cdata) self.assert_equal(data, pdata) - self.assert_equal(hexlify(cs.next_iv()), b'000000000000000000000001') + self.assert_equal(cs.next_iv(), 1) # auth-failure due to corruption (corrupted aad) cs = cs_cls(mac_key, enc_key, header_len=len(header), aad_offset=1) hdr_mac_iv_cdata_corrupted = hdr_mac_iv_cdata[:1] + b'\0' + hdr_mac_iv_cdata[2:] diff --git a/src/borg/testsuite/key.py b/src/borg/testsuite/key.py index 670af0d2c..b9266328d 100644 --- a/src/borg/testsuite/key.py +++ b/src/borg/testsuite/key.py @@ -117,7 +117,7 @@ class TestKey: def test_keyfile(self, monkeypatch, keys_dir): monkeypatch.setenv('BORG_PASSPHRASE', 'test') key = KeyfileKey.create(self.MockRepository(), self.MockArgs()) - assert bytes_to_long(key.cipher.next_iv(), 8) == 0 + assert key.cipher.next_iv() == 0 manifest = key.encrypt(b'ABC') assert key.cipher.extract_iv(manifest) == 0 manifest2 = key.encrypt(b'ABC') @@ -126,7 +126,7 @@ class TestKey: assert key.cipher.extract_iv(manifest2) == 1 iv = key.cipher.extract_iv(manifest) key2 = KeyfileKey.detect(self.MockRepository(), manifest) - assert bytes_to_long(key2.cipher.next_iv(), 8) >= iv + key2.cipher.block_count(len(manifest) - KeyfileKey.PAYLOAD_OVERHEAD) + assert key2.cipher.next_iv() >= iv + key2.cipher.block_count(len(manifest) - KeyfileKey.PAYLOAD_OVERHEAD) # Key data sanity check assert len({key2.id_key, key2.enc_key, key2.enc_hmac_key}) == 3 assert key2.chunk_seed != 0 @@ -186,7 +186,7 @@ class TestKey: def test_passphrase(self, keys_dir, monkeypatch): monkeypatch.setenv('BORG_PASSPHRASE', 'test') key = PassphraseKey.create(self.MockRepository(), None) - assert bytes_to_long(key.cipher.next_iv(), 8) == 0 + assert key.cipher.next_iv() == 0 assert hexlify(key.id_key) == b'793b0717f9d8fb01c751a487e9b827897ceea62409870600013fbc6b4d8d7ca6' assert hexlify(key.enc_hmac_key) == b'b885a05d329a086627412a6142aaeb9f6c54ab7950f996dd65587251f6bc0901' assert hexlify(key.enc_key) == b'2ff3654c6daf7381dbbe718d2b20b4f1ea1e34caa6cc65f6bb3ac376b93fed2a' @@ -199,7 +199,7 @@ class TestKey: assert key.cipher.extract_iv(manifest2) == 1 iv = key.cipher.extract_iv(manifest) key2 = PassphraseKey.detect(self.MockRepository(), manifest) - assert bytes_to_long(key2.cipher.next_iv(), 8) == iv + key2.cipher.block_count(len(manifest)) + assert key2.cipher.next_iv() == iv + key2.cipher.block_count(len(manifest)) assert key.id_key == key2.id_key assert key.enc_hmac_key == key2.enc_hmac_key assert key.enc_key == key2.enc_key