From 76e3cd6dfe21750dad1f40cbec7e3192d9222162 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Jonas=20Borgstr=C3=B6m?= Date: Sun, 16 Jun 2013 23:28:09 +0200 Subject: [PATCH] Replace pycrypto with ctype wrapped libcrypto --- darc/crypto.py | 101 +++++++++++++++++++++++++++++++++++++++++++++++++ darc/key.py | 82 +++++++++++++++++++-------------------- darc/test.py | 3 +- 3 files changed, 144 insertions(+), 42 deletions(-) create mode 100644 darc/crypto.py diff --git a/darc/crypto.py b/darc/crypto.py new file mode 100644 index 000000000..47d2985ee --- /dev/null +++ b/darc/crypto.py @@ -0,0 +1,101 @@ +from binascii import hexlify +from ctypes import cdll, c_char_p, c_int, c_uint, c_void_p, byref, POINTER, create_string_buffer +from ctypes.util import find_library +import struct +import unittest + +libcrypto = cdll.LoadLibrary(find_library('crypto')) +libcrypto.PKCS5_PBKDF2_HMAC.argtypes = (c_char_p, c_int, c_char_p, c_int, c_int, c_void_p, c_int, c_char_p) +libcrypto.EVP_sha256.restype = c_void_p +libcrypto.AES_set_encrypt_key.argtypes = (c_char_p, c_int, c_char_p) +libcrypto.AES_ctr128_encrypt.argtypes = (c_char_p, c_char_p, c_int, c_char_p, c_char_p, c_char_p, POINTER(c_uint)) + +_int = struct.Struct('>I') +_long = struct.Struct('>Q') + +bytes_to_int = lambda x, offset=0: _int.unpack_from(x, offset)[0] +bytes_to_long = lambda x, offset=0: _long.unpack_from(x, offset)[0] +long_to_bytes = lambda x: _long.pack(x) + + +def pbkdf2_sha256(password, salt, iterations, size): + key = create_string_buffer(size) + rv = libcrypto.PKCS5_PBKDF2_HMAC(password, len(password), salt, len(salt), iterations, libcrypto.EVP_sha256(), size, key) + if not rv: + raise Exception('PKCS5_PBKDF2_HMAC failed') + return key.raw + + +def get_random_bytes(n): + """Return n cryptographically strong pseudo-random bytes + """ + buf = create_string_buffer(n) + if not libcrypto.RAND_bytes(buf, n): + raise Exception('RAND_bytes failed') + return buf.raw + + +class AES: + def __init__(self, key, iv=None): + self.key = create_string_buffer(2000) + self.iv = create_string_buffer(16) + self.buf = create_string_buffer(16) + self.num = c_uint() + self.reset(key, iv) + + def reset(self, key=None, iv=None): + if key: + libcrypto.AES_set_encrypt_key(key, len(key) * 8, self.key) + if iv: + self.iv.raw = iv + self.num.value = 0 + + def encrypt(self, data): + out = create_string_buffer(len(data)) + libcrypto.AES_ctr128_encrypt(data, out, len(data), self.key, self.iv, self.buf, self.num) + return out.raw + decrypt = encrypt + + +class CryptoTestCase(unittest.TestCase): + + def test_bytes_to_int(self): + self.assertEqual(bytes_to_int(b'\0\0\0\1'), 1) + + def test_bytes_to_long(self): + self.assertEqual(bytes_to_long(b'\0\0\0\0\0\0\0\1'), 1) + self.assertEqual(long_to_bytes(1), b'\0\0\0\0\0\0\0\1') + + def test_pbkdf2_sha256(self): + self.assertEqual(hexlify(pbkdf2_sha256(b'password', b'salt', 1, 32)), + b'120fb6cffcf8b32c43e7225256c4f837a86548c92ccc35480805987cb70be17b') + self.assertEqual(hexlify(pbkdf2_sha256(b'password', b'salt', 2, 32)), + b'ae4d0c95af6b46d32d0adff928f06dd02a303f8ef3c251dfd6e2d85a95474c43') + self.assertEqual(hexlify(pbkdf2_sha256(b'password', b'salt', 4096, 32)), + b'c5e478d59288c841aa530db6845c4c8d962893a001ce4e11a4963873aa98134a') + + def test_get_random_bytes(self): + bytes = get_random_bytes(10) + bytes2 = get_random_bytes(10) + self.assertEqual(len(bytes), 10) + self.assertEqual(len(bytes2), 10) + self.assertNotEqual(bytes, bytes2) + + def test_aes(self): + key = b'X' * 32 + data = b'foo' * 10 + aes = AES(key) + self.assertEqual(bytes_to_long(aes.iv.raw, 8), 0) + cdata = aes.encrypt(data) + self.assertEqual(hexlify(cdata), b'c6efb702de12498f34a2c2bbc8149e759996d08bf6dc5c610aefc0c3a466') + self.assertEqual(bytes_to_long(aes.iv.raw, 8), 2) + self.assertNotEqual(data, aes.decrypt(cdata)) + aes.reset(iv=b'\0' * 16) + self.assertEqual(data, aes.decrypt(cdata)) + + +def suite(): + return unittest.TestLoader().loadTestsFromTestCase(CryptoTestCase) + +if __name__ == '__main__': + unittest.main() \ No newline at end of file diff --git a/darc/key.py b/darc/key.py index b99ab57f5..ffc9c303e 100644 --- a/darc/key.py +++ b/darc/key.py @@ -5,15 +5,11 @@ import shutil import tempfile import unittest +import hmac +from hashlib import sha256 import zlib -from Crypto.Cipher import AES -from Crypto.Hash import SHA256, HMAC -from Crypto.Util import Counter -from Crypto.Util.number import bytes_to_long, long_to_bytes -from Crypto.Random import get_random_bytes -from Crypto.Protocol.KDF import PBKDF2 - +from .crypto import pbkdf2_sha256, get_random_bytes, AES, bytes_to_long, long_to_bytes, bytes_to_int from .helpers import IntegrityError, get_keys_dir, Location PREFIX = b'\0' * 8 @@ -22,6 +18,11 @@ PASSPHRASE = b'\1' PLAINTEXT = b'\2' +class HMAC(hmac.HMAC): + + def update(self, msg): + self.inner.update(msg) + def key_creator(store, args): if args.keyfile: @@ -43,10 +44,6 @@ def key_factory(store, manifest_data): raise Exception('Unkown Key type %d' % ord(manifest_data[0])) -def SHA256_PDF(p, s): - return HMAC.new(p, s, SHA256).digest() - - class KeyBase(object): def id_hash(self, data): @@ -75,7 +72,7 @@ def detect(cls, store, manifest_data): return cls() def id_hash(self, data): - return SHA256.new(data).digest() + return sha256(data).digest() def encrypt(self, data): return b''.join([self.TYPE, zlib.compress(data)]) @@ -84,7 +81,7 @@ def decrypt(self, id, data): if data[:1] != self.TYPE: raise IntegrityError('Invalid encryption envelope') data = zlib.decompress(memoryview(data)[1:]) - if id and SHA256.new(data).digest() != id: + if id and sha256(data).digest() != id: raise IntegrityError('Chunk id verification failed') return data @@ -94,26 +91,24 @@ class AESKeyBase(KeyBase): def id_hash(self, data): """Return HMAC hash using the "id" HMAC key """ - return HMAC.new(self.id_key, data, SHA256).digest() + return HMAC(self.id_key, data, sha256).digest() def encrypt(self, data): data = zlib.compress(data) - nonce = long_to_bytes(self.counter.next_value(), 8) - data = b''.join((nonce, AES.new(self.enc_key, AES.MODE_CTR, b'', - counter=self.counter).encrypt(data))) - hash = HMAC.new(self.enc_hmac_key, data, SHA256).digest() + self.enc_cipher.reset() + data = b''.join((self.enc_cipher.iv[8:], self.enc_cipher.encrypt(data))) + hash = HMAC(self.enc_hmac_key, data, sha256).digest() return b''.join((self.TYPE, hash, data)) def decrypt(self, id, data): if data[:1] != self.TYPE: raise IntegrityError('Invalid encryption envelope') hash = memoryview(data)[1:33] - if memoryview(HMAC.new(self.enc_hmac_key, memoryview(data)[33:], SHA256).digest()) != hash: + if memoryview(HMAC(self.enc_hmac_key, memoryview(data)[33:], sha256).digest()) != hash: raise IntegrityError('Encryption envelope checksum mismatch') - nonce = bytes_to_long(memoryview(data)[33:41]) - counter = Counter.new(64, initial_value=nonce, prefix=PREFIX) - data = zlib.decompress(AES.new(self.enc_key, AES.MODE_CTR, counter=counter).decrypt(memoryview(data)[41:])) - if id and HMAC.new(self.id_key, data, SHA256).digest() != id: + self.dec_cipher.reset(iv=PREFIX + data[33:41]) + data = zlib.decompress(self.dec_cipher.decrypt(data[41:])) # should use memoryview + if id and HMAC(self.id_key, data, sha256).digest() != id: raise IntegrityError('Chunk id verification failed') return data @@ -127,11 +122,14 @@ def init_from_random_data(self, data): self.enc_key = data[0:32] self.enc_hmac_key = data[32:64] self.id_key = data[64:96] - self.chunk_seed = bytes_to_long(data[96:100]) + self.chunk_seed = bytes_to_int(data[96:100]) # Convert to signed int32 if self.chunk_seed & 0x80000000: self.chunk_seed = self.chunk_seed - 0xffffffff - 1 - self.counter = Counter.new(64, initial_value=1, prefix=PREFIX) + + def init_ciphers(self, enc_iv=b''): + self.enc_cipher = AES(self.enc_key, enc_iv) + self.dec_cipher = AES(self.enc_key) class PassphraseKey(AESKeyBase): @@ -170,20 +168,22 @@ def detect(cls, store, manifest_data): key.init(store, passphrase) try: key.decrypt(None, manifest_data) - iv = key.extract_iv(manifest_data) - key.counter = Counter.new(64, initial_value=iv + 1000, prefix=PREFIX) + key.init_ciphers(PREFIX + long_to_bytes(key.extract_iv(manifest_data) + 1000)) return key except IntegrityError: passphrase = getpass(prompt) def init(self, store, passphrase): - self.init_from_random_data(PBKDF2(passphrase, store.id, 100, self.iterations, SHA256_PDF)) + self.init_from_random_data(pbkdf2_sha256(passphrase.encode('utf-8'), store.id, self.iterations, 100)) + self.init_ciphers() class KeyfileKey(AESKeyBase): FILE_ID = 'DARC KEY' TYPE = KEYFILE + IV = PREFIX + long_to_bytes(1) + @classmethod def detect(cls, store, manifest_data): key = cls() @@ -192,8 +192,7 @@ def detect(cls, store, manifest_data): passphrase = os.environ.get('DARC_PASSPHRASE', '') while not key.load(path, passphrase): passphrase = getpass(prompt) - iv = key.extract_iv(manifest_data) - key.counter = Counter.new(64, initial_value=iv + 1000, prefix=PREFIX) + key.init_ciphers(PREFIX + long_to_bytes(key.extract_iv(manifest_data) + 1000)) return key @classmethod @@ -221,7 +220,6 @@ def load(self, filename, passphrase): self.enc_hmac_key = key[b'enc_hmac_key'] self.id_key = key[b'id_key'] self.chunk_seed = key[b'chunk_seed'] - self.counter = Counter.new(64, initial_value=1, prefix=PREFIX) self.path = filename return True @@ -229,18 +227,19 @@ def decrypt_key_file(self, data, passphrase): d = msgpack.unpackb(data) assert d[b'version'] == 1 assert d[b'algorithm'] == b'SHA256' - key = PBKDF2(passphrase, d[b'salt'], 32, d[b'iterations'], SHA256_PDF) - data = AES.new(key, AES.MODE_CTR, counter=Counter.new(128)).decrypt(d[b'data']) - if HMAC.new(key, data, SHA256).digest() != d[b'hash']: + key = pbkdf2_sha256(passphrase.encode('utf-8'), d[b'salt'], d[b'iterations'], 32) + data = AES(key, self.IV).decrypt(d[b'data']) + if HMAC(key, data, sha256).digest() != d[b'hash']: return None return data def encrypt_key_file(self, data, passphrase): salt = get_random_bytes(32) iterations = 10000 - key = PBKDF2(passphrase, salt, 32, iterations, SHA256_PDF) - hash = HMAC.new(key, data, SHA256).digest() - cdata = AES.new(key, AES.MODE_CTR, counter=Counter.new(128)).encrypt(data) + key = pbkdf2_sha256(passphrase.encode('utf-8'), salt, iterations, 32) + hash = HMAC(key, data, sha256).digest() + cdata = AES(key, self.IV).encrypt(data) +# cdata = AES.new(key, AES.MODE_CTR, counter=Counter.new(128)).encrypt(data) d = { 'version': 1, 'salt': salt, @@ -297,6 +296,7 @@ def create(cls, store, args): key = cls() key.store_id = store.id key.init_from_random_data(get_random_bytes(100)) + key.init_ciphers() key.save(path, passphrase) print('Key file "%s" created.' % key.path) print('Keep this file safe. Your data will be inaccessible without it.') @@ -337,11 +337,11 @@ class MockArgs(object): store = Location(tempfile.mkstemp()[1]) os.environ['DARC_PASSPHRASE'] = 'test' key = KeyfileKey.create(self.MockStore(), MockArgs()) - self.assertEqual(bytes_to_long(key.counter()), 1) + self.assertEqual(bytes_to_long(key.enc_cipher.iv, 8), 0) manifest = key.encrypt(b'') iv = key.extract_iv(manifest) key2 = KeyfileKey.detect(self.MockStore(), manifest) - self.assertEqual(bytes_to_long(key2.counter()), iv + 1000) + self.assertEqual(bytes_to_long(key2.enc_cipher.iv, 8), iv + 1000) # Key data sanity check self.assertEqual(len(set([key2.id_key, key2.enc_key, key2.enc_hmac_key])), 3) self.assertEqual(key2.chunk_seed == 0, False) @@ -351,7 +351,7 @@ class MockArgs(object): def test_passphrase(self): os.environ['DARC_PASSPHRASE'] = 'test' key = PassphraseKey.create(self.MockStore(), None) - self.assertEqual(bytes_to_long(key.counter()), 1) + self.assertEqual(bytes_to_long(key.enc_cipher.iv, 8), 0) self.assertEqual(hexlify(key.id_key), b'f28e915da78a972786da47fee6c4bd2960a421b9bdbdb35a7942eb82552e9a72') self.assertEqual(hexlify(key.enc_hmac_key), b'169c6082f209e524ea97e2c75318936f6e93c101b9345942a95491e9ae1738ca') self.assertEqual(hexlify(key.enc_key), b'c05dd423843d4dd32a52e4dc07bb11acabe215917fc5cf3a3df6c92b47af79ba') @@ -359,7 +359,7 @@ def test_passphrase(self): manifest = key.encrypt(b'') iv = key.extract_iv(manifest) key2 = PassphraseKey.detect(self.MockStore(), manifest) - self.assertEqual(bytes_to_long(key2.counter()), iv + 1000) + self.assertEqual(bytes_to_long(key2.enc_cipher.iv, 8), iv + 1000) self.assertEqual(key.id_key, key2.id_key) self.assertEqual(key.enc_hmac_key, key2.enc_hmac_key) self.assertEqual(key.enc_key, key2.enc_key) diff --git a/darc/test.py b/darc/test.py index 6be147fcd..e03496f80 100644 --- a/darc/test.py +++ b/darc/test.py @@ -9,7 +9,7 @@ import unittest import xattr -from . import helpers, lrucache +from . import helpers, lrucache, crypto from .chunker import chunkify, buzhash, buzhash_update from .archiver import Archiver from .key import suite as KeySuite @@ -228,6 +228,7 @@ def suite(): suite.addTest(RemoteStoreSuite()) suite.addTest(doctest.DocTestSuite(helpers)) suite.addTest(lrucache.suite()) + suite.addTest(crypto.suite()) return suite if __name__ == '__main__':