move key loading/saving code to FlexiKey

This commit is contained in:
Thomas Waldmann 2022-03-06 19:44:35 +01:00
parent ad405892ca
commit 8011fade91
2 changed files with 111 additions and 94 deletions

View File

@ -103,6 +103,17 @@ class KeyBlobStorage:
REPO = 'repository'
class KeyType:
KEYFILE = 0x00
PASSPHRASE = 0x01 # legacy, attic and borg < 1.0
PLAINTEXT = 0x02
REPO = 0x03
BLAKE2KEYFILE = 0x04
BLAKE2REPO = 0x05
BLAKE2AUTHENTICATED = 0x06
AUTHENTICATED = 0x07
def key_creator(repository, args):
for key in AVAILABLE_KEY_TYPES:
if key.ARG_NAME == args.encryption:
@ -260,7 +271,7 @@ class KeyBase:
class PlaintextKey(KeyBase):
TYPE = 0x02
TYPE = KeyType.PLAINTEXT
TYPES_ACCEPTABLE = {TYPE}
NAME = 'plaintext'
ARG_NAME = 'none'
@ -518,15 +529,6 @@ class Passphrase(str):
return pbkdf2_hmac('sha256', self.encode('utf-8'), salt, iterations, length)
class PassphraseKey:
# this is only a stub, repos with this mode could not be created any more since borg 1.0, see #97.
# in borg 1.3 all of its code and also the "borg key migrate-to-repokey" command was removed.
# if you still need to, you can use "borg key migrate-to-repokey" with borg 1.0, 1.1 and 1.2.
# Nowadays, we just dispatch this to RepoKey and assume the passphrase was migrated to a repokey.
TYPE = 0x01
NAME = 'passphrase'
class FlexiKeyBase(AESKeyBase):
@classmethod
def detect(cls, repository, manifest_data):
@ -640,12 +642,8 @@ class FlexiKeyBase(AESKeyBase):
raise NotImplementedError
class KeyfileKey(ID_HMAC_SHA_256, FlexiKeyBase):
TYPE = 0x00
TYPES_ACCEPTABLE = {TYPE}
NAME = 'key file'
ARG_NAME = 'keyfile'
STORAGE = KeyBlobStorage.KEYFILE
class FlexiKey(ID_HMAC_SHA_256, FlexiKeyBase):
TYPES_ACCEPTABLE = {KeyType.KEYFILE, KeyType.REPO, KeyType.PASSPHRASE}
FILE_ID = 'BORG_KEY'
@ -662,13 +660,22 @@ class KeyfileKey(ID_HMAC_SHA_256, FlexiKeyBase):
return filename
def find_key(self):
keyfile = self._find_key_file_from_environment()
if keyfile is not None:
return self.sanity_check(keyfile, self.repository.id)
keyfile = self._find_key_in_keys_dir()
if keyfile is not None:
return keyfile
raise KeyfileNotFoundError(self.repository._location.canonical_path(), get_keys_dir())
if self.STORAGE == KeyBlobStorage.KEYFILE:
keyfile = self._find_key_file_from_environment()
if keyfile is not None:
return self.sanity_check(keyfile, self.repository.id)
keyfile = self._find_key_in_keys_dir()
if keyfile is not None:
return keyfile
raise KeyfileNotFoundError(self.repository._location.canonical_path(), get_keys_dir())
if self.STORAGE == KeyBlobStorage.REPO:
loc = self.repository._location.canonical_path()
try:
self.repository.load_key()
return loc
except configparser.NoOptionError:
raise RepoKeyNotFoundError(loc) from None
def get_existing_or_new_target(self, args):
keyfile = self._find_key_file_from_environment()
@ -690,10 +697,14 @@ class KeyfileKey(ID_HMAC_SHA_256, FlexiKeyBase):
pass
def get_new_target(self, args):
keyfile = self._find_key_file_from_environment()
if keyfile is not None:
return keyfile
return self._get_new_target_in_keys_dir(args)
if self.STORAGE == KeyBlobStorage.KEYFILE:
keyfile = self._find_key_file_from_environment()
if keyfile is not None:
return keyfile
return self._get_new_target_in_keys_dir(args)
if self.STORAGE == KeyBlobStorage.REPO:
return self.repository
def _find_key_file_from_environment(self):
keyfile = os.environ.get('BORG_KEY_FILE')
@ -710,89 +721,93 @@ class KeyfileKey(ID_HMAC_SHA_256, FlexiKeyBase):
return path
def load(self, target, passphrase):
with open(target) as fd:
key_data = ''.join(fd.readlines()[1:])
success = self._load(key_data, passphrase)
if success:
self.target = target
return success
if self.STORAGE == KeyBlobStorage.KEYFILE:
with open(target) as fd:
key_data = ''.join(fd.readlines()[1:])
success = self._load(key_data, passphrase)
if success:
self.target = target
return success
if self.STORAGE == KeyBlobStorage.REPO:
# While the repository is encrypted, we consider a repokey repository with a blank
# passphrase an unencrypted repository.
self.logically_encrypted = passphrase != ''
# what we get in target is just a repo location, but we already have the repo obj:
target = self.repository
key_data = target.load_key()
key_data = key_data.decode('utf-8') # remote repo: msgpack issue #99, getting bytes
success = self._load(key_data, passphrase)
if success:
self.target = target
return success
def save(self, target, passphrase, create=False):
if create and os.path.isfile(target):
# if a new keyfile key repository is created, ensure that an existing keyfile of another
# keyfile key repo is not accidentally overwritten by careless use of the BORG_KEY_FILE env var.
# see issue #6036
raise Error('Aborting because key in "%s" already exists.' % target)
key_data = self._save(passphrase)
with SaveFile(target) as fd:
fd.write(f'{self.FILE_ID} {bin_to_hex(self.repository_id)}\n')
fd.write(key_data)
fd.write('\n')
self.target = target
if self.STORAGE == KeyBlobStorage.KEYFILE:
if create and os.path.isfile(target):
# if a new keyfile key repository is created, ensure that an existing keyfile of another
# keyfile key repo is not accidentally overwritten by careless use of the BORG_KEY_FILE env var.
# see issue #6036
raise Error('Aborting because key in "%s" already exists.' % target)
key_data = self._save(passphrase)
with SaveFile(target) as fd:
fd.write(f'{self.FILE_ID} {bin_to_hex(self.repository_id)}\n')
fd.write(key_data)
fd.write('\n')
self.target = target
if self.STORAGE == KeyBlobStorage.REPO:
self.logically_encrypted = passphrase != ''
key_data = self._save(passphrase)
key_data = key_data.encode('utf-8') # remote repo: msgpack issue #99, giving bytes
target.save_key(key_data)
self.target = target
class RepoKey(ID_HMAC_SHA_256, FlexiKeyBase):
TYPE = 0x03
TYPES_ACCEPTABLE = {TYPE, PassphraseKey.TYPE}
class PassphraseKey:
# this is only a stub, repos with this mode could not be created any more since borg 1.0, see #97.
# in borg 1.3 all of its code and also the "borg key migrate-to-repokey" command was removed.
# if you still need to, you can use "borg key migrate-to-repokey" with borg 1.0, 1.1 and 1.2.
# Nowadays, we just dispatch this to RepoKey and assume the passphrase was migrated to a repokey.
TYPE = KeyType.PASSPHRASE
NAME = 'passphrase'
class KeyfileKey(FlexiKey):
TYPE = KeyType.KEYFILE
NAME = 'key file'
ARG_NAME = 'keyfile'
STORAGE = KeyBlobStorage.KEYFILE
class RepoKey(FlexiKey):
TYPE = KeyType.REPO
NAME = 'repokey'
ARG_NAME = 'repokey'
STORAGE = KeyBlobStorage.REPO
def find_key(self):
loc = self.repository._location.canonical_path()
try:
self.repository.load_key()
return loc
except configparser.NoOptionError:
raise RepoKeyNotFoundError(loc) from None
def get_new_target(self, args):
return self.repository
def load(self, target, passphrase):
# While the repository is encrypted, we consider a repokey repository with a blank
# passphrase an unencrypted repository.
self.logically_encrypted = passphrase != ''
# what we get in target is just a repo location, but we already have the repo obj:
target = self.repository
key_data = target.load_key()
key_data = key_data.decode('utf-8') # remote repo: msgpack issue #99, getting bytes
success = self._load(key_data, passphrase)
if success:
self.target = target
return success
def save(self, target, passphrase, create=False):
self.logically_encrypted = passphrase != ''
key_data = self._save(passphrase)
key_data = key_data.encode('utf-8') # remote repo: msgpack issue #99, giving bytes
target.save_key(key_data)
self.target = target
class Blake2FlexiKey(ID_BLAKE2b_256, FlexiKey):
TYPES_ACCEPTABLE = {KeyType.BLAKE2KEYFILE, KeyType.BLAKE2REPO}
CIPHERSUITE = AES256_CTR_BLAKE2b
class Blake2KeyfileKey(ID_BLAKE2b_256, KeyfileKey):
TYPE = 0x04
TYPES_ACCEPTABLE = {0x04}
class Blake2KeyfileKey(Blake2FlexiKey):
TYPE = KeyType.BLAKE2KEYFILE
NAME = 'key file BLAKE2b'
ARG_NAME = 'keyfile-blake2'
STORAGE = KeyBlobStorage.KEYFILE
FILE_ID = 'BORG_KEY'
CIPHERSUITE = AES256_CTR_BLAKE2b
class Blake2RepoKey(ID_BLAKE2b_256, RepoKey):
TYPE = 0x05
TYPES_ACCEPTABLE = {TYPE}
class Blake2RepoKey(Blake2FlexiKey):
TYPE = KeyType.BLAKE2REPO
NAME = 'repokey BLAKE2b'
ARG_NAME = 'repokey-blake2'
STORAGE = KeyBlobStorage.REPO
CIPHERSUITE = AES256_CTR_BLAKE2b
class AuthenticatedKeyBase(RepoKey):
class AuthenticatedKeyBase(FlexiKey):
STORAGE = KeyBlobStorage.REPO
# It's only authenticated, not encrypted.
@ -808,7 +823,7 @@ class AuthenticatedKeyBase(RepoKey):
self.logically_encrypted = False
def init_ciphers(self, manifest_data=None):
if manifest_data is not None and manifest_data[0] != self.TYPE:
if manifest_data is not None and manifest_data[0] not in self.TYPES_ACCEPTABLE:
raise IntegrityError('Manifest: Invalid encryption envelope')
def encrypt(self, chunk):
@ -816,7 +831,7 @@ class AuthenticatedKeyBase(RepoKey):
return b''.join([self.TYPE_STR, data])
def decrypt(self, id, data, decompress=True):
if data[0] != self.TYPE:
if data[0] not in self.TYPES_ACCEPTABLE:
id_str = bin_to_hex(id) if id is not None else '(unknown)'
raise IntegrityError('Chunk %s: Invalid envelope' % id_str)
payload = memoryview(data)[1:]
@ -828,14 +843,14 @@ class AuthenticatedKeyBase(RepoKey):
class AuthenticatedKey(AuthenticatedKeyBase):
TYPE = 0x07
TYPE = KeyType.AUTHENTICATED
TYPES_ACCEPTABLE = {TYPE}
NAME = 'authenticated'
ARG_NAME = 'authenticated'
class Blake2AuthenticatedKey(ID_BLAKE2b_256, AuthenticatedKeyBase):
TYPE = 0x06
TYPE = KeyType.BLAKE2AUTHENTICATED
TYPES_ACCEPTABLE = {TYPE}
NAME = 'authenticated BLAKE2b'
ARG_NAME = 'authenticated-blake2'

View File

@ -184,7 +184,9 @@ class TestKey:
def _corrupt_byte(self, key, data, offset):
data = bytearray(data)
data[offset] ^= 1
# note: we corrupt in a way so that even corruption of the unauthenticated encryption type byte
# will trigger an IntegrityError (does not happen while we stay within TYPES_ACCEPTABLE).
data[offset] ^= 64
with pytest.raises(IntegrityErrorBase):
key.decrypt(b'', data)