From 766d976f46ea139dd76e9e8cfba51d43babeba9a Mon Sep 17 00:00:00 2001 From: Thomas Waldmann Date: Sun, 6 Mar 2022 21:25:43 +0100 Subject: [PATCH] move passphrase related stuff to borg.helpers.passphrase --- src/borg/crypto/key.py | 134 +------------------------------ src/borg/helpers/passphrase.py | 141 +++++++++++++++++++++++++++++++++ src/borg/testsuite/helpers.py | 43 ++++++++++ src/borg/testsuite/key.py | 43 +--------- 4 files changed, 187 insertions(+), 174 deletions(-) create mode 100644 src/borg/helpers/passphrase.py diff --git a/src/borg/crypto/key.py b/src/borg/crypto/key.py index 46c81dc62..6511f6463 100644 --- a/src/borg/crypto/key.py +++ b/src/borg/crypto/key.py @@ -1,13 +1,9 @@ import configparser -import getpass import hmac import os -import shlex -import sys import textwrap -import subprocess from binascii import a2b_base64, b2a_base64, hexlify -from hashlib import sha256, sha512, pbkdf2_hmac +from hashlib import sha256 from ..logger import create_logger @@ -17,11 +13,10 @@ from ..compress import Compressor from ..helpers import StableDict from ..helpers import Error, IntegrityError -from ..helpers import yes from ..helpers import get_keys_dir, get_security_dir from ..helpers import get_limited_unpacker from ..helpers import bin_to_hex -from ..helpers import prepare_subprocess_env +from ..helpers.passphrase import Passphrase, PasswordRetriesExceeded, PassphraseWrong from ..helpers import msgpack from ..item import Key, EncryptedKey from ..platform import SaveFile @@ -31,22 +26,6 @@ from .low_level import AES256_CTR_HMAC_SHA256, AES256_CTR_BLAKE2b -class NoPassphraseFailure(Error): - """can not acquire a passphrase: {}""" - - -class PassphraseWrong(Error): - """passphrase supplied in BORG_PASSPHRASE, by BORG_PASSCOMMAND or via BORG_PASSPHRASE_FD is incorrect.""" - - -class PasscommandFailure(Error): - """passcommand supplied in BORG_PASSCOMMAND failed: {}""" - - -class PasswordRetriesExceeded(Error): - """exceeded the maximum password retries""" - - class UnsupportedPayloadError(Error): """Unsupported payload type {}. A newer version is required to access this repository.""" @@ -420,115 +399,6 @@ def init_ciphers(self, manifest_data=None): self.nonce_manager = NonceManager(self.repository, nonce) -class Passphrase(str): - @classmethod - def _env_passphrase(cls, env_var, default=None): - passphrase = os.environ.get(env_var, default) - if passphrase is not None: - return cls(passphrase) - - @classmethod - def env_passphrase(cls, default=None): - passphrase = cls._env_passphrase('BORG_PASSPHRASE', default) - if passphrase is not None: - return passphrase - passphrase = cls.env_passcommand() - if passphrase is not None: - return passphrase - passphrase = cls.fd_passphrase() - if passphrase is not None: - return passphrase - - @classmethod - def env_passcommand(cls, default=None): - passcommand = os.environ.get('BORG_PASSCOMMAND', None) - if passcommand is not None: - # passcommand is a system command (not inside pyinstaller env) - env = prepare_subprocess_env(system=True) - try: - passphrase = subprocess.check_output(shlex.split(passcommand), universal_newlines=True, env=env) - except (subprocess.CalledProcessError, FileNotFoundError) as e: - raise PasscommandFailure(e) - return cls(passphrase.rstrip('\n')) - - @classmethod - def fd_passphrase(cls): - try: - fd = int(os.environ.get('BORG_PASSPHRASE_FD')) - except (ValueError, TypeError): - return None - with os.fdopen(fd, mode='r') as f: - passphrase = f.read() - return cls(passphrase.rstrip('\n')) - - @classmethod - def env_new_passphrase(cls, default=None): - return cls._env_passphrase('BORG_NEW_PASSPHRASE', default) - - @classmethod - def getpass(cls, prompt): - try: - pw = getpass.getpass(prompt) - except EOFError: - if prompt: - print() # avoid err msg appearing right of prompt - msg = [] - for env_var in 'BORG_PASSPHRASE', 'BORG_PASSCOMMAND': - env_var_set = os.environ.get(env_var) is not None - msg.append('{} is {}.'.format(env_var, 'set' if env_var_set else 'not set')) - msg.append('Interactive password query failed.') - raise NoPassphraseFailure(' '.join(msg)) from None - else: - return cls(pw) - - @classmethod - def verification(cls, passphrase): - msg = 'Do you want your passphrase to be displayed for verification? [yN]: ' - if yes(msg, retry_msg=msg, invalid_msg='Invalid answer, try again.', - retry=True, env_var_override='BORG_DISPLAY_PASSPHRASE'): - print('Your passphrase (between double-quotes): "%s"' % passphrase, - file=sys.stderr) - print('Make sure the passphrase displayed above is exactly what you wanted.', - file=sys.stderr) - try: - passphrase.encode('ascii') - except UnicodeEncodeError: - print('Your passphrase (UTF-8 encoding in hex): %s' % - bin_to_hex(passphrase.encode('utf-8')), - file=sys.stderr) - print('As you have a non-ASCII passphrase, it is recommended to keep the UTF-8 encoding in hex together with the passphrase at a safe place.', - file=sys.stderr) - - @classmethod - def new(cls, allow_empty=False): - passphrase = cls.env_new_passphrase() - if passphrase is not None: - return passphrase - passphrase = cls.env_passphrase() - if passphrase is not None: - return passphrase - for retry in range(1, 11): - passphrase = cls.getpass('Enter new passphrase: ') - if allow_empty or passphrase: - passphrase2 = cls.getpass('Enter same passphrase again: ') - if passphrase == passphrase2: - cls.verification(passphrase) - logger.info('Remember your passphrase. Your data will be inaccessible without it.') - return passphrase - else: - print('Passphrases do not match', file=sys.stderr) - else: - print('Passphrase must not be blank', file=sys.stderr) - else: - raise PasswordRetriesExceeded - - def __repr__(self): - return '' - - def kdf(self, salt, iterations, length): - return pbkdf2_hmac('sha256', self.encode('utf-8'), salt, iterations, length) - - class FlexiKeyBase(AESKeyBase): @classmethod def detect(cls, repository, manifest_data): diff --git a/src/borg/helpers/passphrase.py b/src/borg/helpers/passphrase.py new file mode 100644 index 000000000..e415a0b76 --- /dev/null +++ b/src/borg/helpers/passphrase.py @@ -0,0 +1,141 @@ +import getpass +import os +import shlex +import subprocess +import sys +from hashlib import pbkdf2_hmac + +from . import bin_to_hex +from . import Error +from . import yes +from . import prepare_subprocess_env + +from ..logger import create_logger + +logger = create_logger() + + +class NoPassphraseFailure(Error): + """can not acquire a passphrase: {}""" + + +class PassphraseWrong(Error): + """passphrase supplied in BORG_PASSPHRASE, by BORG_PASSCOMMAND or via BORG_PASSPHRASE_FD is incorrect.""" + + +class PasscommandFailure(Error): + """passcommand supplied in BORG_PASSCOMMAND failed: {}""" + + +class PasswordRetriesExceeded(Error): + """exceeded the maximum password retries""" + + +class Passphrase(str): + @classmethod + def _env_passphrase(cls, env_var, default=None): + passphrase = os.environ.get(env_var, default) + if passphrase is not None: + return cls(passphrase) + + @classmethod + def env_passphrase(cls, default=None): + passphrase = cls._env_passphrase('BORG_PASSPHRASE', default) + if passphrase is not None: + return passphrase + passphrase = cls.env_passcommand() + if passphrase is not None: + return passphrase + passphrase = cls.fd_passphrase() + if passphrase is not None: + return passphrase + + @classmethod + def env_passcommand(cls, default=None): + passcommand = os.environ.get('BORG_PASSCOMMAND', None) + if passcommand is not None: + # passcommand is a system command (not inside pyinstaller env) + env = prepare_subprocess_env(system=True) + try: + passphrase = subprocess.check_output(shlex.split(passcommand), universal_newlines=True, env=env) + except (subprocess.CalledProcessError, FileNotFoundError) as e: + raise PasscommandFailure(e) + return cls(passphrase.rstrip('\n')) + + @classmethod + def fd_passphrase(cls): + try: + fd = int(os.environ.get('BORG_PASSPHRASE_FD')) + except (ValueError, TypeError): + return None + with os.fdopen(fd, mode='r') as f: + passphrase = f.read() + return cls(passphrase.rstrip('\n')) + + @classmethod + def env_new_passphrase(cls, default=None): + return cls._env_passphrase('BORG_NEW_PASSPHRASE', default) + + @classmethod + def getpass(cls, prompt): + try: + pw = getpass.getpass(prompt) + except EOFError: + if prompt: + print() # avoid err msg appearing right of prompt + msg = [] + for env_var in 'BORG_PASSPHRASE', 'BORG_PASSCOMMAND': + env_var_set = os.environ.get(env_var) is not None + msg.append('{} is {}.'.format(env_var, 'set' if env_var_set else 'not set')) + msg.append('Interactive password query failed.') + raise NoPassphraseFailure(' '.join(msg)) from None + else: + return cls(pw) + + @classmethod + def verification(cls, passphrase): + msg = 'Do you want your passphrase to be displayed for verification? [yN]: ' + if yes(msg, retry_msg=msg, invalid_msg='Invalid answer, try again.', + retry=True, env_var_override='BORG_DISPLAY_PASSPHRASE'): + print('Your passphrase (between double-quotes): "%s"' % passphrase, + file=sys.stderr) + print('Make sure the passphrase displayed above is exactly what you wanted.', + file=sys.stderr) + try: + passphrase.encode('ascii') + except UnicodeEncodeError: + print('Your passphrase (UTF-8 encoding in hex): %s' % + bin_to_hex(passphrase.encode('utf-8')), + file=sys.stderr) + print('As you have a non-ASCII passphrase, it is recommended to keep the ' + 'UTF-8 encoding in hex together with the passphrase at a safe place.', + file=sys.stderr) + + @classmethod + def new(cls, allow_empty=False): + passphrase = cls.env_new_passphrase() + if passphrase is not None: + return passphrase + passphrase = cls.env_passphrase() + if passphrase is not None: + return passphrase + for retry in range(1, 11): + passphrase = cls.getpass('Enter new passphrase: ') + if allow_empty or passphrase: + passphrase2 = cls.getpass('Enter same passphrase again: ') + if passphrase == passphrase2: + cls.verification(passphrase) + logger.info('Remember your passphrase. Your data will be inaccessible without it.') + return passphrase + else: + print('Passphrases do not match', file=sys.stderr) + else: + print('Passphrase must not be blank', file=sys.stderr) + else: + raise PasswordRetriesExceeded + + def __repr__(self): + return '' + + def kdf(self, salt, iterations, length): + return pbkdf2_hmac('sha256', self.encode('utf-8'), salt, iterations, length) diff --git a/src/borg/testsuite/helpers.py b/src/borg/testsuite/helpers.py index 5a5df302e..1dd66d02e 100644 --- a/src/borg/testsuite/helpers.py +++ b/src/borg/testsuite/helpers.py @@ -1,4 +1,5 @@ import errno +import getpass import hashlib import os import shutil @@ -32,6 +33,7 @@ from ..helpers import iter_separated from ..helpers import eval_escapes from ..helpers import safe_unlink +from ..helpers.passphrase import Passphrase, PasswordRetriesExceeded from . import BaseTestCase, FakeInputs @@ -1164,3 +1166,44 @@ def os_unlink(_): safe_unlink(hard_link) assert victim.read_binary() == contents + + +class TestPassphrase: + def test_passphrase_new_verification(self, capsys, monkeypatch): + monkeypatch.setattr(getpass, 'getpass', lambda prompt: "12aöäü") + monkeypatch.setenv('BORG_DISPLAY_PASSPHRASE', 'no') + Passphrase.new() + out, err = capsys.readouterr() + assert "12" not in out + assert "12" not in err + + monkeypatch.setenv('BORG_DISPLAY_PASSPHRASE', 'yes') + passphrase = Passphrase.new() + out, err = capsys.readouterr() + assert "313261c3b6c3a4c3bc" not in out + assert "313261c3b6c3a4c3bc" in err + assert passphrase == "12aöäü" + + monkeypatch.setattr(getpass, 'getpass', lambda prompt: "1234/@=") + Passphrase.new() + out, err = capsys.readouterr() + assert "1234/@=" not in out + assert "1234/@=" in err + + def test_passphrase_new_empty(self, capsys, monkeypatch): + monkeypatch.delenv('BORG_PASSPHRASE', False) + monkeypatch.setattr(getpass, 'getpass', lambda prompt: "") + with pytest.raises(PasswordRetriesExceeded): + Passphrase.new(allow_empty=False) + out, err = capsys.readouterr() + assert "must not be blank" in err + + def test_passphrase_new_retries(self, monkeypatch): + monkeypatch.delenv('BORG_PASSPHRASE', False) + ascending_numbers = iter(range(20)) + monkeypatch.setattr(getpass, 'getpass', lambda prompt: str(next(ascending_numbers))) + with pytest.raises(PasswordRetriesExceeded): + Passphrase.new() + + def test_passphrase_repr(self): + assert "secret" not in repr(Passphrase("secret")) diff --git a/src/borg/testsuite/key.py b/src/borg/testsuite/key.py index 6254cc6e3..6d2207a9e 100644 --- a/src/borg/testsuite/key.py +++ b/src/borg/testsuite/key.py @@ -6,7 +6,7 @@ import pytest -from ..crypto.key import Passphrase, PasswordRetriesExceeded, bin_to_hex +from ..crypto.key import bin_to_hex from ..crypto.key import PlaintextKey, AuthenticatedKey, RepoKey, KeyfileKey, \ Blake2KeyfileKey, Blake2RepoKey, Blake2AuthenticatedKey from ..crypto.key import ID_HMAC_SHA_256, ID_BLAKE2b_256 @@ -255,47 +255,6 @@ def test_blake2_authenticated_encrypt(self, monkeypatch): assert authenticated == b'\x06\x00\x00' + plaintext -class TestPassphrase: - def test_passphrase_new_verification(self, capsys, monkeypatch): - monkeypatch.setattr(getpass, 'getpass', lambda prompt: "12aöäü") - monkeypatch.setenv('BORG_DISPLAY_PASSPHRASE', 'no') - Passphrase.new() - out, err = capsys.readouterr() - assert "12" not in out - assert "12" not in err - - monkeypatch.setenv('BORG_DISPLAY_PASSPHRASE', 'yes') - passphrase = Passphrase.new() - out, err = capsys.readouterr() - assert "313261c3b6c3a4c3bc" not in out - assert "313261c3b6c3a4c3bc" in err - assert passphrase == "12aöäü" - - monkeypatch.setattr(getpass, 'getpass', lambda prompt: "1234/@=") - Passphrase.new() - out, err = capsys.readouterr() - assert "1234/@=" not in out - assert "1234/@=" in err - - def test_passphrase_new_empty(self, capsys, monkeypatch): - monkeypatch.delenv('BORG_PASSPHRASE', False) - monkeypatch.setattr(getpass, 'getpass', lambda prompt: "") - with pytest.raises(PasswordRetriesExceeded): - Passphrase.new(allow_empty=False) - out, err = capsys.readouterr() - assert "must not be blank" in err - - def test_passphrase_new_retries(self, monkeypatch): - monkeypatch.delenv('BORG_PASSPHRASE', False) - ascending_numbers = iter(range(20)) - monkeypatch.setattr(getpass, 'getpass', lambda prompt: str(next(ascending_numbers))) - with pytest.raises(PasswordRetriesExceeded): - Passphrase.new() - - def test_passphrase_repr(self): - assert "secret" not in repr(Passphrase("secret")) - - class TestTAM: @pytest.fixture def key(self, monkeypatch):