1
0
Fork 0
mirror of https://github.com/borgbackup/borg.git synced 2025-02-23 22:51:35 +00:00

Replace pycrypto with ctype wrapped libcrypto

This commit is contained in:
Jonas Borgström 2013-06-16 23:28:09 +02:00
parent ad08664b65
commit 76e3cd6dfe
3 changed files with 144 additions and 42 deletions

101
darc/crypto.py Normal file
View file

@ -0,0 +1,101 @@
from binascii import hexlify
from ctypes import cdll, c_char_p, c_int, c_uint, c_void_p, byref, POINTER, create_string_buffer
from ctypes.util import find_library
import struct
import unittest
libcrypto = cdll.LoadLibrary(find_library('crypto'))
libcrypto.PKCS5_PBKDF2_HMAC.argtypes = (c_char_p, c_int, c_char_p, c_int, c_int, c_void_p, c_int, c_char_p)
libcrypto.EVP_sha256.restype = c_void_p
libcrypto.AES_set_encrypt_key.argtypes = (c_char_p, c_int, c_char_p)
libcrypto.AES_ctr128_encrypt.argtypes = (c_char_p, c_char_p, c_int, c_char_p, c_char_p, c_char_p, POINTER(c_uint))
_int = struct.Struct('>I')
_long = struct.Struct('>Q')
bytes_to_int = lambda x, offset=0: _int.unpack_from(x, offset)[0]
bytes_to_long = lambda x, offset=0: _long.unpack_from(x, offset)[0]
long_to_bytes = lambda x: _long.pack(x)
def pbkdf2_sha256(password, salt, iterations, size):
key = create_string_buffer(size)
rv = libcrypto.PKCS5_PBKDF2_HMAC(password, len(password), salt, len(salt), iterations, libcrypto.EVP_sha256(), size, key)
if not rv:
raise Exception('PKCS5_PBKDF2_HMAC failed')
return key.raw
def get_random_bytes(n):
"""Return n cryptographically strong pseudo-random bytes
"""
buf = create_string_buffer(n)
if not libcrypto.RAND_bytes(buf, n):
raise Exception('RAND_bytes failed')
return buf.raw
class AES:
def __init__(self, key, iv=None):
self.key = create_string_buffer(2000)
self.iv = create_string_buffer(16)
self.buf = create_string_buffer(16)
self.num = c_uint()
self.reset(key, iv)
def reset(self, key=None, iv=None):
if key:
libcrypto.AES_set_encrypt_key(key, len(key) * 8, self.key)
if iv:
self.iv.raw = iv
self.num.value = 0
def encrypt(self, data):
out = create_string_buffer(len(data))
libcrypto.AES_ctr128_encrypt(data, out, len(data), self.key, self.iv, self.buf, self.num)
return out.raw
decrypt = encrypt
class CryptoTestCase(unittest.TestCase):
def test_bytes_to_int(self):
self.assertEqual(bytes_to_int(b'\0\0\0\1'), 1)
def test_bytes_to_long(self):
self.assertEqual(bytes_to_long(b'\0\0\0\0\0\0\0\1'), 1)
self.assertEqual(long_to_bytes(1), b'\0\0\0\0\0\0\0\1')
def test_pbkdf2_sha256(self):
self.assertEqual(hexlify(pbkdf2_sha256(b'password', b'salt', 1, 32)),
b'120fb6cffcf8b32c43e7225256c4f837a86548c92ccc35480805987cb70be17b')
self.assertEqual(hexlify(pbkdf2_sha256(b'password', b'salt', 2, 32)),
b'ae4d0c95af6b46d32d0adff928f06dd02a303f8ef3c251dfd6e2d85a95474c43')
self.assertEqual(hexlify(pbkdf2_sha256(b'password', b'salt', 4096, 32)),
b'c5e478d59288c841aa530db6845c4c8d962893a001ce4e11a4963873aa98134a')
def test_get_random_bytes(self):
bytes = get_random_bytes(10)
bytes2 = get_random_bytes(10)
self.assertEqual(len(bytes), 10)
self.assertEqual(len(bytes2), 10)
self.assertNotEqual(bytes, bytes2)
def test_aes(self):
key = b'X' * 32
data = b'foo' * 10
aes = AES(key)
self.assertEqual(bytes_to_long(aes.iv.raw, 8), 0)
cdata = aes.encrypt(data)
self.assertEqual(hexlify(cdata), b'c6efb702de12498f34a2c2bbc8149e759996d08bf6dc5c610aefc0c3a466')
self.assertEqual(bytes_to_long(aes.iv.raw, 8), 2)
self.assertNotEqual(data, aes.decrypt(cdata))
aes.reset(iv=b'\0' * 16)
self.assertEqual(data, aes.decrypt(cdata))
def suite():
return unittest.TestLoader().loadTestsFromTestCase(CryptoTestCase)
if __name__ == '__main__':
unittest.main()

View file

@ -5,15 +5,11 @@
import shutil
import tempfile
import unittest
import hmac
from hashlib import sha256
import zlib
from Crypto.Cipher import AES
from Crypto.Hash import SHA256, HMAC
from Crypto.Util import Counter
from Crypto.Util.number import bytes_to_long, long_to_bytes
from Crypto.Random import get_random_bytes
from Crypto.Protocol.KDF import PBKDF2
from .crypto import pbkdf2_sha256, get_random_bytes, AES, bytes_to_long, long_to_bytes, bytes_to_int
from .helpers import IntegrityError, get_keys_dir, Location
PREFIX = b'\0' * 8
@ -22,6 +18,11 @@
PASSPHRASE = b'\1'
PLAINTEXT = b'\2'
class HMAC(hmac.HMAC):
def update(self, msg):
self.inner.update(msg)
def key_creator(store, args):
if args.keyfile:
@ -43,10 +44,6 @@ def key_factory(store, manifest_data):
raise Exception('Unkown Key type %d' % ord(manifest_data[0]))
def SHA256_PDF(p, s):
return HMAC.new(p, s, SHA256).digest()
class KeyBase(object):
def id_hash(self, data):
@ -75,7 +72,7 @@ def detect(cls, store, manifest_data):
return cls()
def id_hash(self, data):
return SHA256.new(data).digest()
return sha256(data).digest()
def encrypt(self, data):
return b''.join([self.TYPE, zlib.compress(data)])
@ -84,7 +81,7 @@ def decrypt(self, id, data):
if data[:1] != self.TYPE:
raise IntegrityError('Invalid encryption envelope')
data = zlib.decompress(memoryview(data)[1:])
if id and SHA256.new(data).digest() != id:
if id and sha256(data).digest() != id:
raise IntegrityError('Chunk id verification failed')
return data
@ -94,26 +91,24 @@ class AESKeyBase(KeyBase):
def id_hash(self, data):
"""Return HMAC hash using the "id" HMAC key
"""
return HMAC.new(self.id_key, data, SHA256).digest()
return HMAC(self.id_key, data, sha256).digest()
def encrypt(self, data):
data = zlib.compress(data)
nonce = long_to_bytes(self.counter.next_value(), 8)
data = b''.join((nonce, AES.new(self.enc_key, AES.MODE_CTR, b'',
counter=self.counter).encrypt(data)))
hash = HMAC.new(self.enc_hmac_key, data, SHA256).digest()
self.enc_cipher.reset()
data = b''.join((self.enc_cipher.iv[8:], self.enc_cipher.encrypt(data)))
hash = HMAC(self.enc_hmac_key, data, sha256).digest()
return b''.join((self.TYPE, hash, data))
def decrypt(self, id, data):
if data[:1] != self.TYPE:
raise IntegrityError('Invalid encryption envelope')
hash = memoryview(data)[1:33]
if memoryview(HMAC.new(self.enc_hmac_key, memoryview(data)[33:], SHA256).digest()) != hash:
if memoryview(HMAC(self.enc_hmac_key, memoryview(data)[33:], sha256).digest()) != hash:
raise IntegrityError('Encryption envelope checksum mismatch')
nonce = bytes_to_long(memoryview(data)[33:41])
counter = Counter.new(64, initial_value=nonce, prefix=PREFIX)
data = zlib.decompress(AES.new(self.enc_key, AES.MODE_CTR, counter=counter).decrypt(memoryview(data)[41:]))
if id and HMAC.new(self.id_key, data, SHA256).digest() != id:
self.dec_cipher.reset(iv=PREFIX + data[33:41])
data = zlib.decompress(self.dec_cipher.decrypt(data[41:])) # should use memoryview
if id and HMAC(self.id_key, data, sha256).digest() != id:
raise IntegrityError('Chunk id verification failed')
return data
@ -127,11 +122,14 @@ def init_from_random_data(self, data):
self.enc_key = data[0:32]
self.enc_hmac_key = data[32:64]
self.id_key = data[64:96]
self.chunk_seed = bytes_to_long(data[96:100])
self.chunk_seed = bytes_to_int(data[96:100])
# Convert to signed int32
if self.chunk_seed & 0x80000000:
self.chunk_seed = self.chunk_seed - 0xffffffff - 1
self.counter = Counter.new(64, initial_value=1, prefix=PREFIX)
def init_ciphers(self, enc_iv=b''):
self.enc_cipher = AES(self.enc_key, enc_iv)
self.dec_cipher = AES(self.enc_key)
class PassphraseKey(AESKeyBase):
@ -170,20 +168,22 @@ def detect(cls, store, manifest_data):
key.init(store, passphrase)
try:
key.decrypt(None, manifest_data)
iv = key.extract_iv(manifest_data)
key.counter = Counter.new(64, initial_value=iv + 1000, prefix=PREFIX)
key.init_ciphers(PREFIX + long_to_bytes(key.extract_iv(manifest_data) + 1000))
return key
except IntegrityError:
passphrase = getpass(prompt)
def init(self, store, passphrase):
self.init_from_random_data(PBKDF2(passphrase, store.id, 100, self.iterations, SHA256_PDF))
self.init_from_random_data(pbkdf2_sha256(passphrase.encode('utf-8'), store.id, self.iterations, 100))
self.init_ciphers()
class KeyfileKey(AESKeyBase):
FILE_ID = 'DARC KEY'
TYPE = KEYFILE
IV = PREFIX + long_to_bytes(1)
@classmethod
def detect(cls, store, manifest_data):
key = cls()
@ -192,8 +192,7 @@ def detect(cls, store, manifest_data):
passphrase = os.environ.get('DARC_PASSPHRASE', '')
while not key.load(path, passphrase):
passphrase = getpass(prompt)
iv = key.extract_iv(manifest_data)
key.counter = Counter.new(64, initial_value=iv + 1000, prefix=PREFIX)
key.init_ciphers(PREFIX + long_to_bytes(key.extract_iv(manifest_data) + 1000))
return key
@classmethod
@ -221,7 +220,6 @@ def load(self, filename, passphrase):
self.enc_hmac_key = key[b'enc_hmac_key']
self.id_key = key[b'id_key']
self.chunk_seed = key[b'chunk_seed']
self.counter = Counter.new(64, initial_value=1, prefix=PREFIX)
self.path = filename
return True
@ -229,18 +227,19 @@ def decrypt_key_file(self, data, passphrase):
d = msgpack.unpackb(data)
assert d[b'version'] == 1
assert d[b'algorithm'] == b'SHA256'
key = PBKDF2(passphrase, d[b'salt'], 32, d[b'iterations'], SHA256_PDF)
data = AES.new(key, AES.MODE_CTR, counter=Counter.new(128)).decrypt(d[b'data'])
if HMAC.new(key, data, SHA256).digest() != d[b'hash']:
key = pbkdf2_sha256(passphrase.encode('utf-8'), d[b'salt'], d[b'iterations'], 32)
data = AES(key, self.IV).decrypt(d[b'data'])
if HMAC(key, data, sha256).digest() != d[b'hash']:
return None
return data
def encrypt_key_file(self, data, passphrase):
salt = get_random_bytes(32)
iterations = 10000
key = PBKDF2(passphrase, salt, 32, iterations, SHA256_PDF)
hash = HMAC.new(key, data, SHA256).digest()
cdata = AES.new(key, AES.MODE_CTR, counter=Counter.new(128)).encrypt(data)
key = pbkdf2_sha256(passphrase.encode('utf-8'), salt, iterations, 32)
hash = HMAC(key, data, sha256).digest()
cdata = AES(key, self.IV).encrypt(data)
# cdata = AES.new(key, AES.MODE_CTR, counter=Counter.new(128)).encrypt(data)
d = {
'version': 1,
'salt': salt,
@ -297,6 +296,7 @@ def create(cls, store, args):
key = cls()
key.store_id = store.id
key.init_from_random_data(get_random_bytes(100))
key.init_ciphers()
key.save(path, passphrase)
print('Key file "%s" created.' % key.path)
print('Keep this file safe. Your data will be inaccessible without it.')
@ -337,11 +337,11 @@ class MockArgs(object):
store = Location(tempfile.mkstemp()[1])
os.environ['DARC_PASSPHRASE'] = 'test'
key = KeyfileKey.create(self.MockStore(), MockArgs())
self.assertEqual(bytes_to_long(key.counter()), 1)
self.assertEqual(bytes_to_long(key.enc_cipher.iv, 8), 0)
manifest = key.encrypt(b'')
iv = key.extract_iv(manifest)
key2 = KeyfileKey.detect(self.MockStore(), manifest)
self.assertEqual(bytes_to_long(key2.counter()), iv + 1000)
self.assertEqual(bytes_to_long(key2.enc_cipher.iv, 8), iv + 1000)
# Key data sanity check
self.assertEqual(len(set([key2.id_key, key2.enc_key, key2.enc_hmac_key])), 3)
self.assertEqual(key2.chunk_seed == 0, False)
@ -351,7 +351,7 @@ class MockArgs(object):
def test_passphrase(self):
os.environ['DARC_PASSPHRASE'] = 'test'
key = PassphraseKey.create(self.MockStore(), None)
self.assertEqual(bytes_to_long(key.counter()), 1)
self.assertEqual(bytes_to_long(key.enc_cipher.iv, 8), 0)
self.assertEqual(hexlify(key.id_key), b'f28e915da78a972786da47fee6c4bd2960a421b9bdbdb35a7942eb82552e9a72')
self.assertEqual(hexlify(key.enc_hmac_key), b'169c6082f209e524ea97e2c75318936f6e93c101b9345942a95491e9ae1738ca')
self.assertEqual(hexlify(key.enc_key), b'c05dd423843d4dd32a52e4dc07bb11acabe215917fc5cf3a3df6c92b47af79ba')
@ -359,7 +359,7 @@ def test_passphrase(self):
manifest = key.encrypt(b'')
iv = key.extract_iv(manifest)
key2 = PassphraseKey.detect(self.MockStore(), manifest)
self.assertEqual(bytes_to_long(key2.counter()), iv + 1000)
self.assertEqual(bytes_to_long(key2.enc_cipher.iv, 8), iv + 1000)
self.assertEqual(key.id_key, key2.id_key)
self.assertEqual(key.enc_hmac_key, key2.enc_hmac_key)
self.assertEqual(key.enc_key, key2.enc_key)

View file

@ -9,7 +9,7 @@
import unittest
import xattr
from . import helpers, lrucache
from . import helpers, lrucache, crypto
from .chunker import chunkify, buzhash, buzhash_update
from .archiver import Archiver
from .key import suite as KeySuite
@ -228,6 +228,7 @@ def suite():
suite.addTest(RemoteStoreSuite())
suite.addTest(doctest.DocTestSuite(helpers))
suite.addTest(lrucache.suite())
suite.addTest(crypto.suite())
return suite
if __name__ == '__main__':