borg/darc/key.py

354 lines
12 KiB
Python
Raw Normal View History

from __future__ import with_statement
from getpass import getpass
import os
import msgpack
import shutil
import tempfile
import unittest
import zlib
from Crypto.Cipher import AES
from Crypto.Hash import SHA256, HMAC
from Crypto.Util import Counter
from Crypto.Util.number import bytes_to_long, long_to_bytes
from Crypto.Random import get_random_bytes
2012-02-29 22:31:08 +00:00
from Crypto.Protocol.KDF import PBKDF2
from .helpers import IntegrityError, get_keys_dir, Location
PREFIX = '\0' * 8
KEYFILE = '\0'
PASSPHRASE = '\1'
PLAINTEXT = '\2'
def key_creator(store, args):
if args.keyfile:
return KeyfileKey.create(store, args)
elif args.passphrase:
return PassphraseKey.create(store, args)
else:
return PlaintextKey.create(store, args)
def key_factory(store, manifest_data):
if manifest_data[0] == KEYFILE:
return KeyfileKey.detect(store, manifest_data)
elif manifest_data[0] == PASSPHRASE:
return PassphraseKey.detect(store, manifest_data)
elif manifest_data[0] == PLAINTEXT:
return PlaintextKey.detect(store, manifest_data)
else:
raise Exception('Unkown Key type %d' % ord(manifest_data[0]))
2011-10-29 15:01:07 +00:00
2012-02-29 22:31:08 +00:00
def SHA256_PDF(p, s):
return HMAC.new(p, s, SHA256).digest()
class KeyBase(object):
def id_hash(self, data):
"""Return HMAC hash using the "id" HMAC key
"""
def encrypt(self, data):
pass
def decrypt(self, id, data):
pass
def post_manifest_load(self, config):
pass
def pre_manifest_write(self, manifest):
pass
class PlaintextKey(KeyBase):
TYPE = PLAINTEXT
chunk_seed = 0
@classmethod
def create(cls, store, args):
print 'Encryption NOT enabled.\nUse the --key-file or --passphrase options to enable encryption.'
return cls()
@classmethod
def detect(cls, store, manifest_data):
return cls()
def id_hash(self, data):
return SHA256.new(data).digest()
def encrypt(self, data):
return ''.join([self.TYPE, zlib.compress(data)])
def decrypt(self, id, data):
if data[0] != self.TYPE:
raise IntegrityError('Invalid encryption envelope')
data = zlib.decompress(data[1:])
if id and SHA256.new(data).digest() != id:
raise IntegrityError('Chunk id verification failed')
return data
class AESKeyBase(KeyBase):
def post_manifest_load(self, config):
iv = bytes_to_long(config['aes_counter']) + 100
self.counter = Counter.new(64, initial_value=iv, prefix=PREFIX)
def pre_manifest_write(self, manifest):
manifest.config['aes_counter'] = long_to_bytes(self.counter.next_value(), 8)
def id_hash(self, data):
"""Return HMAC hash using the "id" HMAC key
"""
return HMAC.new(self.id_key, data, SHA256).digest()
def encrypt(self, data):
data = zlib.compress(data)
nonce = long_to_bytes(self.counter.next_value(), 8)
data = ''.join((nonce, AES.new(self.enc_key, AES.MODE_CTR, '',
counter=self.counter).encrypt(data)))
hash = HMAC.new(self.enc_hmac_key, data, SHA256).digest()
return ''.join((self.TYPE, hash, data))
def decrypt(self, id, data):
if data[0] != self.TYPE:
raise IntegrityError('Invalid encryption envelope')
hash = data[1:33]
if HMAC.new(self.enc_hmac_key, data[33:], SHA256).digest() != hash:
raise IntegrityError('Encryption envelope checksum mismatch')
nonce = bytes_to_long(data[33:41])
counter = Counter.new(64, initial_value=nonce, prefix=PREFIX)
data = zlib.decompress(AES.new(self.enc_key, AES.MODE_CTR, counter=counter).decrypt(data[41:]))
if id and HMAC.new(self.id_key, data, SHA256).digest() != id:
raise IntegrityError('Chunk id verification failed')
return data
def init_from_random_data(self, data):
self.enc_key = data[0:32]
self.enc_hmac_key = data[32:64]
self.id_key = data[64:96]
self.chunk_seed = bytes_to_long(data[96:100])
# Convert to signed int32
if self.chunk_seed & 0x80000000:
self.chunk_seed = self.chunk_seed - 0xffffffff - 1
self.counter = Counter.new(64, initial_value=1, prefix=PREFIX)
class PassphraseKey(AESKeyBase):
TYPE = PASSPHRASE
iterations = 10000
@classmethod
def create(cls, store, args):
key = cls()
passphrase = os.environ.get('DARC_PASSPHRASE')
if passphrase is not None:
passphrase2 = passphrase
else:
passphrase, passphrase2 = 1, 2
while passphrase != passphrase2:
passphrase = getpass('Enter passphrase: ')
if not passphrase:
print 'Passphrase must not be blank'
continue
passphrase2 = getpass('Enter same passphrase again: ')
if passphrase != passphrase2:
print 'Passphrases do not match'
key.init(store, passphrase)
if passphrase:
print 'Remember your passphrase. Your data will be inaccessible without it.'
return key
@classmethod
def detect(cls, store, manifest_data):
prompt = 'Enter passphrase for %s: ' % store._location.orig
key = cls()
passphrase = os.environ.get('DARC_PASSPHRASE')
if passphrase is None:
passphrase = getpass(prompt)
while True:
key.init(store, passphrase)
try:
key.decrypt(None, manifest_data)
return key
except IntegrityError:
passphrase = getpass(prompt)
def init(self, store, passphrase):
self.init_from_random_data(PBKDF2(passphrase, store.id, 100, self.iterations, SHA256_PDF))
class KeyfileKey(AESKeyBase):
FILE_ID = 'DARC KEY'
TYPE = KEYFILE
@classmethod
def detect(cls, store, manifest_data):
key = cls()
path = cls.find_key_file(store)
prompt = 'Enter passphrase for key file %s: ' % path
passphrase = os.environ.get('DARC_PASSPHRASE', '')
while not key.load(path, passphrase):
passphrase = getpass(prompt)
return key
@classmethod
def find_key_file(cls, store):
id = store.id.encode('hex')
2011-08-06 11:01:58 +00:00
keys_dir = get_keys_dir()
2011-08-04 13:27:52 +00:00
for name in os.listdir(keys_dir):
filename = os.path.join(keys_dir, name)
with open(filename, 'rb') as fd:
line = fd.readline().strip()
if line and line.startswith(cls.FILE_ID) and line[9:] == id:
2011-08-04 13:27:52 +00:00
return filename
raise Exception('Key file for store with ID %s not found' % id)
def load(self, filename, passphrase):
2011-08-04 13:27:52 +00:00
with open(filename, 'rb') as fd:
cdata = (''.join(fd.readlines()[1:])).decode('base64')
data = self.decrypt_key_file(cdata, passphrase)
if data:
key = msgpack.unpackb(data)
if key['version'] != 1:
raise IntegrityError('Invalid key file header')
self.store_id = key['store_id']
self.enc_key = key['enc_key']
self.enc_hmac_key = key['enc_hmac_key']
self.id_key = key['id_key']
self.chunk_seed = key['chunk_seed']
self.counter = Counter.new(64, initial_value=1, prefix=PREFIX)
self.path = filename
return True
def decrypt_key_file(self, data, passphrase):
d = msgpack.unpackb(data)
assert d['version'] == 1
assert d['algorithm'] == 'SHA256'
key = PBKDF2(passphrase, d['salt'], 32, d['iterations'], SHA256_PDF)
data = AES.new(key, AES.MODE_CTR, counter=Counter.new(128)).decrypt(d['data'])
if HMAC.new(key, data, SHA256).digest() != d['hash']:
return None
return data
def encrypt_key_file(self, data, passphrase):
salt = get_random_bytes(32)
2011-10-27 20:17:47 +00:00
iterations = 10000
key = PBKDF2(passphrase, salt, 32, iterations, SHA256_PDF)
hash = HMAC.new(key, data, SHA256).digest()
cdata = AES.new(key, AES.MODE_CTR, counter=Counter.new(128)).encrypt(data)
d = {
'version': 1,
'salt': salt,
'iterations': iterations,
'algorithm': 'SHA256',
'hash': hash,
'data': cdata,
}
return msgpack.packb(d)
def save(self, path, passphrase):
key = {
'version': 1,
'store_id': self.store_id,
'enc_key': self.enc_key,
'enc_hmac_key': self.enc_hmac_key,
'id_key': self.enc_key,
'chunk_seed': self.chunk_seed,
}
data = self.encrypt_key_file(msgpack.packb(key), passphrase)
with open(path, 'wb') as fd:
fd.write('%s %s\n' % (self.FILE_ID, self.store_id.encode('hex')))
fd.write(data.encode('base64'))
2011-10-27 20:17:47 +00:00
self.path = path
def change_passphrase(self):
passphrase, passphrase2 = 1, 2
while passphrase != passphrase2:
passphrase = getpass('New passphrase: ')
passphrase2 = getpass('Enter same passphrase again: ')
if passphrase != passphrase2:
print 'Passphrases do not match'
self.save(self.path, passphrase)
print 'Key file "%s" updated' % self.path
@classmethod
def create(cls, store, args):
filename = args.store.to_key_filename()
2011-08-04 13:27:52 +00:00
path = filename
i = 1
2011-08-04 13:27:52 +00:00
while os.path.exists(path):
i += 1
path = filename + '.%d' % i
passphrase = os.environ.get('DARC_PASSPHRASE')
if passphrase is not None:
passphrase2 = passphrase
2011-08-06 11:01:58 +00:00
else:
passphrase, passphrase2 = 1, 2
while passphrase != passphrase2:
passphrase = getpass('Enter passphrase (empty for no passphrase):')
passphrase2 = getpass('Enter same passphrase again: ')
if passphrase != passphrase2:
print 'Passphrases do not match'
key = cls()
key.store_id = store.id
key.init_from_random_data(get_random_bytes(100))
key.save(path, passphrase)
print 'Key file "%s" created.' % key.path
print 'Keep this file safe. Your data will be inaccessible without it.'
return key
class KeyTestCase(unittest.TestCase):
class MockStore(object):
id = '\0' * 32
def setUp(self):
self.tmpdir = tempfile.mkdtemp()
self.keys_path = os.path.join(self.tmpdir, 'keys')
os.mkdir(self.keys_path)
os.environ['DARC_KEYS_DIR'] = self.keys_path
def tearDown(self):
shutil.rmtree(self.tmpdir)
def test_plaintext(self):
key = PlaintextKey.create(None, None)
data = 'foo'
self.assertEqual(key.id_hash(data).encode('hex'), '2c26b46b68ffc68ff99b453c1d30413413422d706483bfa0f98a5e886266e7ae')
self.assertEqual(data, key.decrypt(key.id_hash(data), key.encrypt(data)))
def test_keyfile(self):
class MockArgs(object):
store = Location(tempfile.mkstemp()[1])
os.environ['DARC_PASSPHRASE'] = 'test'
key = KeyfileKey.create(self.MockStore(), MockArgs())
key = KeyfileKey.detect(self.MockStore(), None)
data = 'foo'
self.assertEqual(data, key.decrypt(key.id_hash(data), key.encrypt(data)))
def test_passphrase(self):
os.environ['DARC_PASSPHRASE'] = 'test'
key = PassphraseKey.create(self.MockStore(), None)
self.assertEqual(key.id_key.encode('hex'), 'f28e915da78a972786da47fee6c4bd2960a421b9bdbdb35a7942eb82552e9a72')
self.assertEqual(key.enc_hmac_key.encode('hex'), '169c6082f209e524ea97e2c75318936f6e93c101b9345942a95491e9ae1738ca')
self.assertEqual(key.enc_key.encode('hex'), 'c05dd423843d4dd32a52e4dc07bb11acabe215917fc5cf3a3df6c92b47af79ba')
self.assertEqual(key.chunk_seed, -324662077)
data = 'foo'
self.assertEqual(key.id_hash(data).encode('hex'), '016c27cd40dc8e84f196f3b43a9424e8472897e09f6935d0d3a82fb41664bad7')
self.assertEqual(data, key.decrypt(key.id_hash(data), key.encrypt(data)))
def suite():
return unittest.TestLoader().loadTestsFromTestCase(KeyTestCase)
if __name__ == '__main__':
unittest.main()