1
0
Fork 0
mirror of https://github.com/borgbackup/borg.git synced 2024-12-26 01:37:20 +00:00

set_iv / next iv with integers

This commit is contained in:
Thomas Waldmann 2016-09-08 05:13:23 +02:00
parent 58c2dafbe0
commit 8f1678e2ba
4 changed files with 40 additions and 62 deletions

View file

@ -358,10 +358,9 @@ class AESKeyBase(KeyBase):
def encrypt(self, chunk):
data = self.compressor.compress(chunk)
next_nonce = int.from_bytes(self.cipher.next_iv(), byteorder='big')
next_nonce = self.nonce_manager.ensure_reservation(next_nonce, self.cipher.block_count(len(data)))
iv = next_nonce.to_bytes(self.cipher.iv_len, byteorder='big')
return self.cipher.encrypt(data, header=self.TYPE_STR, iv=iv)
next_iv = self.nonce_manager.ensure_reservation(self.cipher.next_iv(),
self.cipher.block_count(len(data)))
return self.cipher.encrypt(data, header=self.TYPE_STR, iv=next_iv)
def decrypt(self, id, data, decompress=True):
if not (data[0] == self.TYPE or
@ -402,7 +401,7 @@ def init_ciphers(self, manifest_data=None):
# be a bit too high, but that does not matter.
manifest_blocks = num_cipher_blocks(len(manifest_data))
nonce = self.cipher.extract_iv(manifest_data) + manifest_blocks
self.cipher.set_iv(nonce.to_bytes(16, byteorder='big'))
self.cipher.set_iv(nonce)
self.nonce_manager = NonceManager(self.repository, nonce)

View file

@ -153,21 +153,6 @@ def int_to_bytes16(i):
return _2long.pack(h, l)
def increment_iv(iv, amount=1):
"""
Increment the IV by the given amount (default 1).
:param iv: input IV, 16 bytes (128 bit)
:param amount: increment value
:return: input_IV + amount, 16 bytes (128 bit)
"""
assert len(iv) == 16
iv = bytes16_to_int(iv)
iv += amount
iv = int_to_bytes16(iv)
return iv
def num_cipher_blocks(length, blocksize=16):
"""Return the number of cipher blocks required to encrypt/decrypt <length> bytes of data.
@ -393,14 +378,17 @@ cdef class AES256_CTR_HMAC_SHA256:
def set_iv(self, iv):
# set_iv needs to be called before each encrypt() call
if isinstance(iv, int):
iv = iv.to_bytes(self.iv_len, byteorder='big')
assert isinstance(iv, bytes) and len(iv) == self.iv_len
self.blocks = 0 # how many AES blocks got encrypted with this IV?
for i in range(self.iv_len):
self.iv[i] = iv[i]
def next_iv(self):
# call this after encrypt() to get the next iv for the next encrypt() call
return increment_iv(self.iv[:self.iv_len], self.blocks)
# call this after encrypt() to get the next iv (int) for the next encrypt() call
iv = int.from_bytes(self.iv[:self.iv_len], byteorder='big')
return iv + self.blocks
cdef fetch_iv(self, unsigned char * iv_in):
# fetch lower self.iv_len_short bytes of iv and add upper zero bytes
@ -575,21 +563,21 @@ cdef class _AEAD_BASE:
def set_iv(self, iv):
# set_iv needs to be called before each encrypt() call,
# because encrypt does a full initialisation of the cipher context.
if isinstance(iv, int):
iv = iv.to_bytes(self.iv_len, byteorder='big')
assert isinstance(iv, bytes) and len(iv) == self.iv_len
self.blocks = 0 # number of cipher blocks encrypted with this IV
for i in range(self.iv_len):
self.iv[i] = iv[i]
def next_iv(self):
# call this after encrypt() to get the next iv for the next encrypt() call
# call this after encrypt() to get the next iv (int) for the next encrypt() call
# AES-GCM, AES-OCB, CHACHA20 ciphers all add a internal 32bit counter to the 96bit
# (12 byte) IV we provide, thus we only need to increment the IV by 1 (and we must
# not encrypt more than 2^32 cipher blocks with same IV):
assert self.blocks < 2**32
# we need 16 bytes for increment_iv:
last_iv = b'\0' * (16 - self.iv_len) + self.iv[:self.iv_len]
next_iv = increment_iv(last_iv, 1)
return next_iv[-self.iv_len:]
iv = int.from_bytes(self.iv[:self.iv_len], byteorder='big')
return iv + 1
cdef fetch_iv(self, unsigned char * iv_in):
return iv_in[0:self.iv_len]
@ -741,14 +729,18 @@ cdef class AES:
def set_iv(self, iv):
# set_iv needs to be called before each encrypt() call,
# because encrypt does a full initialisation of the cipher context.
if isinstance(iv, int):
iv = iv.to_bytes(self.iv_len, byteorder='big')
assert isinstance(iv, bytes) and len(iv) == self.iv_len
self.blocks = 0 # number of cipher blocks encrypted with this IV
for i in range(self.iv_len):
self.iv[i] = iv[i]
def next_iv(self):
# call this after encrypt() to get the next iv for the next encrypt() call
return increment_iv(self.iv[:self.iv_len], self.blocks)
# call this after encrypt() to get the next iv (int) for the next encrypt() call
iv = int.from_bytes(self.iv[:self.iv_len], byteorder='big')
return iv + self.blocks
def hmac_sha256(key, data):

View file

@ -1,8 +1,8 @@
from binascii import hexlify, unhexlify
from ..crypto.low_level import AES256_CTR_HMAC_SHA256, AES256_GCM, AES256_OCB, CHACHA20_POLY1305, UNENCRYPTED, \
IntegrityError, hmac_sha256, blake2b_256, openssl10
from ..crypto.low_level import bytes_to_long, bytes_to_int, long_to_bytes, bytes16_to_int, int_to_bytes16, increment_iv
IntegrityError, blake2b_256, hmac_sha256, openssl10
from ..crypto.low_level import bytes_to_long, bytes_to_int, long_to_bytes, bytes16_to_int, int_to_bytes16
from ..crypto.low_level import hkdf_hmac_sha512
from . import BaseTestCase
@ -26,21 +26,6 @@ def test_bytes16_to_int(self):
self.assert_equal(bytes16_to_int(b'\0\0\0\0\0\0\0\1\0\0\0\0\0\0\0\0'), 2 ** 64)
self.assert_equal(int_to_bytes16(2 ** 64), b'\0\0\0\0\0\0\0\1\0\0\0\0\0\0\0\0')
def test_increment_iv(self):
iv0 = b'\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0'
iv1 = b'\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\1'
iv2 = b'\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\2'
self.assert_equal(increment_iv(iv0, 0), iv0)
self.assert_equal(increment_iv(iv0, 1), iv1)
self.assert_equal(increment_iv(iv0, 2), iv2)
iva = b'\0\0\0\0\0\0\0\0\xff\xff\xff\xff\xff\xff\xff\xff'
ivb = b'\0\0\0\0\0\0\0\1\x00\x00\x00\x00\x00\x00\x00\x00'
ivc = b'\0\0\0\0\0\0\0\1\x00\x00\x00\x00\x00\x00\x00\x01'
self.assert_equal(increment_iv(iva, 0), iva)
self.assert_equal(increment_iv(iva, 1), ivb)
self.assert_equal(increment_iv(iva, 2), ivc)
self.assert_equal(increment_iv(iv0, 2**64), ivb)
def test_UNENCRYPTED(self):
iv = b'' # any IV is ok, it just must be set and not None
data = b'data'
@ -55,7 +40,7 @@ def test_AES256_CTR_HMAC_SHA256(self):
# this tests the layout as in attic / borg < 1.2 (1 type byte, no aad)
mac_key = b'Y' * 32
enc_key = b'X' * 32
iv = b'\0' * 16
iv = 0
data = b'foo' * 10
header = b'\x42'
# encrypt-then-mac
@ -69,12 +54,12 @@ def test_AES256_CTR_HMAC_SHA256(self):
self.assert_equal(hexlify(mac), b'af90b488b0cc4a8f768fe2d6814fa65aec66b148135e54f7d4d29a27f22f57a8')
self.assert_equal(hexlify(iv), b'0000000000000000')
self.assert_equal(hexlify(cdata), b'c6efb702de12498f34a2c2bbc8149e759996d08bf6dc5c610aefc0c3a466')
self.assert_equal(hexlify(cs.next_iv()), b'00000000000000000000000000000002')
self.assert_equal(cs.next_iv(), 2)
# auth-then-decrypt
cs = AES256_CTR_HMAC_SHA256(mac_key, enc_key, header_len=len(header), aad_offset=1)
pdata = cs.decrypt(hdr_mac_iv_cdata)
self.assert_equal(data, pdata)
self.assert_equal(hexlify(cs.next_iv()), b'00000000000000000000000000000002')
self.assert_equal(cs.next_iv(), 2)
# auth-failure due to corruption (corrupted data)
cs = AES256_CTR_HMAC_SHA256(mac_key, enc_key, header_len=len(header), aad_offset=1)
hdr_mac_iv_cdata_corrupted = hdr_mac_iv_cdata[:41] + b'\0' + hdr_mac_iv_cdata[42:]
@ -84,7 +69,7 @@ def test_AES256_CTR_HMAC_SHA256(self):
def test_AES256_CTR_HMAC_SHA256_aad(self):
mac_key = b'Y' * 32
enc_key = b'X' * 32
iv = b'\0' * 16
iv = 0
data = b'foo' * 10
header = b'\x12\x34\x56'
# encrypt-then-mac
@ -98,12 +83,12 @@ def test_AES256_CTR_HMAC_SHA256_aad(self):
self.assert_equal(hexlify(mac), b'7659a915d9927072ef130258052351a17ef882692893c3850dd798c03d2dd138')
self.assert_equal(hexlify(iv), b'0000000000000000')
self.assert_equal(hexlify(cdata), b'c6efb702de12498f34a2c2bbc8149e759996d08bf6dc5c610aefc0c3a466')
self.assert_equal(hexlify(cs.next_iv()), b'00000000000000000000000000000002')
self.assert_equal(cs.next_iv(), 2)
# auth-then-decrypt
cs = AES256_CTR_HMAC_SHA256(mac_key, enc_key, header_len=len(header), aad_offset=1)
pdata = cs.decrypt(hdr_mac_iv_cdata)
self.assert_equal(data, pdata)
self.assert_equal(hexlify(cs.next_iv()), b'00000000000000000000000000000002')
self.assert_equal(cs.next_iv(), 2)
# auth-failure due to corruption (corrupted aad)
cs = AES256_CTR_HMAC_SHA256(mac_key, enc_key, header_len=len(header), aad_offset=1)
hdr_mac_iv_cdata_corrupted = hdr_mac_iv_cdata[:1] + b'\0' + hdr_mac_iv_cdata[2:]
@ -114,7 +99,7 @@ def test_AE(self):
# used in legacy-like layout (1 type byte, no aad)
mac_key = None
enc_key = b'X' * 32
iv = b'\0' * 12
iv = 0
data = b'foo' * 10
header = b'\x23'
tests = [
@ -133,6 +118,7 @@ def test_AE(self):
b'a093e4b0387526f085d3c40cca84a35230a5c0dd766453b77ba38bcff775', )
]
for cs_cls, exp_mac, exp_cdata in tests:
# print(repr(cs_cls))
# encrypt/mac
cs = cs_cls(mac_key, enc_key, iv, header_len=1, aad_offset=1)
hdr_mac_iv_cdata = cs.encrypt(data, header=header)
@ -144,12 +130,12 @@ def test_AE(self):
self.assert_equal(hexlify(mac), exp_mac)
self.assert_equal(hexlify(iv), b'000000000000000000000000')
self.assert_equal(hexlify(cdata), exp_cdata)
self.assert_equal(hexlify(cs.next_iv()), b'000000000000000000000001')
self.assert_equal(cs.next_iv(), 1)
# auth/decrypt
cs = cs_cls(mac_key, enc_key, header_len=len(header), aad_offset=1)
pdata = cs.decrypt(hdr_mac_iv_cdata)
self.assert_equal(data, pdata)
self.assert_equal(hexlify(cs.next_iv()), b'000000000000000000000001')
self.assert_equal(cs.next_iv(), 1)
# auth-failure due to corruption (corrupted data)
cs = cs_cls(mac_key, enc_key, header_len=len(header), aad_offset=1)
hdr_mac_iv_cdata_corrupted = hdr_mac_iv_cdata[:29] + b'\0' + hdr_mac_iv_cdata[30:]
@ -160,7 +146,7 @@ def test_AEAD(self):
# test with aad
mac_key = None
enc_key = b'X' * 32
iv = b'\0' * 12
iv = 0
data = b'foo' * 10
header = b'\x12\x34\x56'
tests = [
@ -179,6 +165,7 @@ def test_AEAD(self):
b'a093e4b0387526f085d3c40cca84a35230a5c0dd766453b77ba38bcff775', )
]
for cs_cls, exp_mac, exp_cdata in tests:
# print(repr(cs_cls))
# encrypt/mac
cs = cs_cls(mac_key, enc_key, iv, header_len=3, aad_offset=1)
hdr_mac_iv_cdata = cs.encrypt(data, header=header)
@ -190,12 +177,12 @@ def test_AEAD(self):
self.assert_equal(hexlify(mac), exp_mac)
self.assert_equal(hexlify(iv), b'000000000000000000000000')
self.assert_equal(hexlify(cdata), exp_cdata)
self.assert_equal(hexlify(cs.next_iv()), b'000000000000000000000001')
self.assert_equal(cs.next_iv(), 1)
# auth/decrypt
cs = cs_cls(mac_key, enc_key, header_len=len(header), aad_offset=1)
pdata = cs.decrypt(hdr_mac_iv_cdata)
self.assert_equal(data, pdata)
self.assert_equal(hexlify(cs.next_iv()), b'000000000000000000000001')
self.assert_equal(cs.next_iv(), 1)
# auth-failure due to corruption (corrupted aad)
cs = cs_cls(mac_key, enc_key, header_len=len(header), aad_offset=1)
hdr_mac_iv_cdata_corrupted = hdr_mac_iv_cdata[:1] + b'\0' + hdr_mac_iv_cdata[2:]

View file

@ -117,7 +117,7 @@ def test_plaintext(self):
def test_keyfile(self, monkeypatch, keys_dir):
monkeypatch.setenv('BORG_PASSPHRASE', 'test')
key = KeyfileKey.create(self.MockRepository(), self.MockArgs())
assert bytes_to_long(key.cipher.next_iv(), 8) == 0
assert key.cipher.next_iv() == 0
manifest = key.encrypt(b'ABC')
assert key.cipher.extract_iv(manifest) == 0
manifest2 = key.encrypt(b'ABC')
@ -126,7 +126,7 @@ def test_keyfile(self, monkeypatch, keys_dir):
assert key.cipher.extract_iv(manifest2) == 1
iv = key.cipher.extract_iv(manifest)
key2 = KeyfileKey.detect(self.MockRepository(), manifest)
assert bytes_to_long(key2.cipher.next_iv(), 8) >= iv + key2.cipher.block_count(len(manifest) - KeyfileKey.PAYLOAD_OVERHEAD)
assert key2.cipher.next_iv() >= iv + key2.cipher.block_count(len(manifest) - KeyfileKey.PAYLOAD_OVERHEAD)
# Key data sanity check
assert len({key2.id_key, key2.enc_key, key2.enc_hmac_key}) == 3
assert key2.chunk_seed != 0
@ -186,7 +186,7 @@ def test_keyfile_blake2(self, monkeypatch, keys_dir):
def test_passphrase(self, keys_dir, monkeypatch):
monkeypatch.setenv('BORG_PASSPHRASE', 'test')
key = PassphraseKey.create(self.MockRepository(), None)
assert bytes_to_long(key.cipher.next_iv(), 8) == 0
assert key.cipher.next_iv() == 0
assert hexlify(key.id_key) == b'793b0717f9d8fb01c751a487e9b827897ceea62409870600013fbc6b4d8d7ca6'
assert hexlify(key.enc_hmac_key) == b'b885a05d329a086627412a6142aaeb9f6c54ab7950f996dd65587251f6bc0901'
assert hexlify(key.enc_key) == b'2ff3654c6daf7381dbbe718d2b20b4f1ea1e34caa6cc65f6bb3ac376b93fed2a'
@ -199,7 +199,7 @@ def test_passphrase(self, keys_dir, monkeypatch):
assert key.cipher.extract_iv(manifest2) == 1
iv = key.cipher.extract_iv(manifest)
key2 = PassphraseKey.detect(self.MockRepository(), manifest)
assert bytes_to_long(key2.cipher.next_iv(), 8) == iv + key2.cipher.block_count(len(manifest))
assert key2.cipher.next_iv() == iv + key2.cipher.block_count(len(manifest))
assert key.id_key == key2.id_key
assert key.enc_hmac_key == key2.enc_hmac_key
assert key.enc_key == key2.enc_key