Added more robust test for AES counter uniqueness

This commit is contained in:
Jonas Borgström 2013-08-12 13:39:46 +02:00
parent 16d9e55f84
commit ee77ce8b98
2 changed files with 45 additions and 1 deletions

View File

@ -22,6 +22,12 @@ bytes_to_long = lambda x, offset=0: _long.unpack_from(x, offset)[0]
long_to_bytes = lambda x: _long.pack(x)
def num_aes_blocks(length):
"""Return the number of AES blocks required to encrypt/decrypt *length* bytes of data
"""
return (length + 15) // 16
def pbkdf2_sha256(password, salt, iterations, size):
key = create_string_buffer(size)
rv = libcrypto.PKCS5_PBKDF2_HMAC(password, len(password), salt, len(salt), iterations, libcrypto.EVP_sha256(), size, key)

View File

@ -7,10 +7,12 @@ import shutil
import tempfile
import time
import unittest
from hashlib import sha256
from attic import xattr
from attic.archiver import Archiver
from attic.repository import Repository
from attic.testsuite import AtticTestCase
from attic.crypto import bytes_to_long, num_aes_blocks
try:
import llfuse
@ -89,7 +91,6 @@ class ArchiverTestCase(AtticTestCase):
sys.stdout, sys.stderr = stdout, stderr
def create_src_archive(self, name):
self.attic('init', self.repository_location)
self.attic('create', self.repository_location + '::' + name, src_dir)
def create_regual_file(self, name, size=0):
@ -201,6 +202,7 @@ class ArchiverTestCase(AtticTestCase):
self.assert_equal(repository._len(), 1)
def test_corrupted_repository(self):
self.attic('init', self.repository_location)
self.create_src_archive('test')
self.attic('verify', self.repository_location + '::test')
name = sorted(os.listdir(os.path.join(self.tmpdir, 'repository', 'data', '0')), reverse=True)[0]
@ -211,6 +213,7 @@ class ArchiverTestCase(AtticTestCase):
self.attic('verify', self.repository_location + '::test', exit_code=1)
def test_readonly_repository(self):
self.attic('init', self.repository_location)
self.create_src_archive('test')
os.system('chmod -R ugo-w ' + self.repository_path)
try:
@ -252,6 +255,41 @@ class ArchiverTestCase(AtticTestCase):
# Give the daemon some time to exit
time.sleep(.2)
def verify_aes_counter_uniqueness(self, method):
seen = set() # Chunks already seen
used = set() # counter values already used
def verify_uniqueness():
repository = Repository(self.repository_path)
for key, _ in repository.index.iteritems():
data = repository.get(key)
hash = sha256(data).digest()
if not hash in seen:
seen.add(hash)
num_blocks = num_aes_blocks(len(data) - 41)
start = bytes_to_long(data[33:41])
for counter in range(start, start + num_blocks):
self.assert_not_in(counter, used)
used.add(counter)
self.create_test_files()
os.environ['ATTIC_PASSPHRASE'] = 'passphrase'
self.attic('init', '--encryption=' + method, self.repository_location)
verify_uniqueness()
self.attic('create', self.repository_location + '::test', 'input')
verify_uniqueness()
self.attic('create', self.repository_location + '::test.2', 'input')
verify_uniqueness()
self.attic('delete', self.repository_location + '::test.2')
verify_uniqueness()
def test_aes_counter_uniqueness_keyfile(self):
self.verify_aes_counter_uniqueness('keyfile')
def test_aes_counter_uniqueness_passphrase(self):
self.verify_aes_counter_uniqueness('passphrase')
class RemoteArchiverTestCase(ArchiverTestCase):
prefix = '__testsuite__:'