diff --git a/attic/crypto.py b/attic/crypto.py index 8d7b32040..862ca616b 100644 --- a/attic/crypto.py +++ b/attic/crypto.py @@ -22,6 +22,12 @@ bytes_to_long = lambda x, offset=0: _long.unpack_from(x, offset)[0] long_to_bytes = lambda x: _long.pack(x) +def num_aes_blocks(length): + """Return the number of AES blocks required to encrypt/decrypt *length* bytes of data + """ + return (length + 15) // 16 + + def pbkdf2_sha256(password, salt, iterations, size): key = create_string_buffer(size) rv = libcrypto.PKCS5_PBKDF2_HMAC(password, len(password), salt, len(salt), iterations, libcrypto.EVP_sha256(), size, key) diff --git a/attic/testsuite/archiver.py b/attic/testsuite/archiver.py index 3dd898cab..048aaba39 100644 --- a/attic/testsuite/archiver.py +++ b/attic/testsuite/archiver.py @@ -7,10 +7,12 @@ import shutil import tempfile import time import unittest +from hashlib import sha256 from attic import xattr from attic.archiver import Archiver from attic.repository import Repository from attic.testsuite import AtticTestCase +from attic.crypto import bytes_to_long, num_aes_blocks try: import llfuse @@ -89,7 +91,6 @@ class ArchiverTestCase(AtticTestCase): sys.stdout, sys.stderr = stdout, stderr def create_src_archive(self, name): - self.attic('init', self.repository_location) self.attic('create', self.repository_location + '::' + name, src_dir) def create_regual_file(self, name, size=0): @@ -201,6 +202,7 @@ class ArchiverTestCase(AtticTestCase): self.assert_equal(repository._len(), 1) def test_corrupted_repository(self): + self.attic('init', self.repository_location) self.create_src_archive('test') self.attic('verify', self.repository_location + '::test') name = sorted(os.listdir(os.path.join(self.tmpdir, 'repository', 'data', '0')), reverse=True)[0] @@ -211,6 +213,7 @@ class ArchiverTestCase(AtticTestCase): self.attic('verify', self.repository_location + '::test', exit_code=1) def test_readonly_repository(self): + self.attic('init', self.repository_location) self.create_src_archive('test') os.system('chmod -R ugo-w ' + self.repository_path) try: @@ -252,6 +255,41 @@ class ArchiverTestCase(AtticTestCase): # Give the daemon some time to exit time.sleep(.2) + def verify_aes_counter_uniqueness(self, method): + seen = set() # Chunks already seen + used = set() # counter values already used + + def verify_uniqueness(): + repository = Repository(self.repository_path) + for key, _ in repository.index.iteritems(): + data = repository.get(key) + hash = sha256(data).digest() + if not hash in seen: + seen.add(hash) + num_blocks = num_aes_blocks(len(data) - 41) + start = bytes_to_long(data[33:41]) + for counter in range(start, start + num_blocks): + self.assert_not_in(counter, used) + used.add(counter) + + self.create_test_files() + os.environ['ATTIC_PASSPHRASE'] = 'passphrase' + self.attic('init', '--encryption=' + method, self.repository_location) + verify_uniqueness() + self.attic('create', self.repository_location + '::test', 'input') + verify_uniqueness() + self.attic('create', self.repository_location + '::test.2', 'input') + verify_uniqueness() + self.attic('delete', self.repository_location + '::test.2') + verify_uniqueness() + + def test_aes_counter_uniqueness_keyfile(self): + self.verify_aes_counter_uniqueness('keyfile') + + def test_aes_counter_uniqueness_passphrase(self): + self.verify_aes_counter_uniqueness('passphrase') + + class RemoteArchiverTestCase(ArchiverTestCase): prefix = '__testsuite__:'