1
0
Fork 0
mirror of https://github.com/borgbackup/borg.git synced 2025-02-22 14:11:27 +00:00

move passphrase related stuff to borg.helpers.passphrase

This commit is contained in:
Thomas Waldmann 2022-03-06 21:25:43 +01:00
parent 8011fade91
commit 766d976f46
4 changed files with 187 additions and 174 deletions

View file

@ -1,13 +1,9 @@
import configparser import configparser
import getpass
import hmac import hmac
import os import os
import shlex
import sys
import textwrap import textwrap
import subprocess
from binascii import a2b_base64, b2a_base64, hexlify from binascii import a2b_base64, b2a_base64, hexlify
from hashlib import sha256, sha512, pbkdf2_hmac from hashlib import sha256
from ..logger import create_logger from ..logger import create_logger
@ -17,11 +13,10 @@
from ..compress import Compressor from ..compress import Compressor
from ..helpers import StableDict from ..helpers import StableDict
from ..helpers import Error, IntegrityError from ..helpers import Error, IntegrityError
from ..helpers import yes
from ..helpers import get_keys_dir, get_security_dir from ..helpers import get_keys_dir, get_security_dir
from ..helpers import get_limited_unpacker from ..helpers import get_limited_unpacker
from ..helpers import bin_to_hex from ..helpers import bin_to_hex
from ..helpers import prepare_subprocess_env from ..helpers.passphrase import Passphrase, PasswordRetriesExceeded, PassphraseWrong
from ..helpers import msgpack from ..helpers import msgpack
from ..item import Key, EncryptedKey from ..item import Key, EncryptedKey
from ..platform import SaveFile from ..platform import SaveFile
@ -31,22 +26,6 @@
from .low_level import AES256_CTR_HMAC_SHA256, AES256_CTR_BLAKE2b from .low_level import AES256_CTR_HMAC_SHA256, AES256_CTR_BLAKE2b
class NoPassphraseFailure(Error):
"""can not acquire a passphrase: {}"""
class PassphraseWrong(Error):
"""passphrase supplied in BORG_PASSPHRASE, by BORG_PASSCOMMAND or via BORG_PASSPHRASE_FD is incorrect."""
class PasscommandFailure(Error):
"""passcommand supplied in BORG_PASSCOMMAND failed: {}"""
class PasswordRetriesExceeded(Error):
"""exceeded the maximum password retries"""
class UnsupportedPayloadError(Error): class UnsupportedPayloadError(Error):
"""Unsupported payload type {}. A newer version is required to access this repository.""" """Unsupported payload type {}. A newer version is required to access this repository."""
@ -420,115 +399,6 @@ def init_ciphers(self, manifest_data=None):
self.nonce_manager = NonceManager(self.repository, nonce) self.nonce_manager = NonceManager(self.repository, nonce)
class Passphrase(str):
@classmethod
def _env_passphrase(cls, env_var, default=None):
passphrase = os.environ.get(env_var, default)
if passphrase is not None:
return cls(passphrase)
@classmethod
def env_passphrase(cls, default=None):
passphrase = cls._env_passphrase('BORG_PASSPHRASE', default)
if passphrase is not None:
return passphrase
passphrase = cls.env_passcommand()
if passphrase is not None:
return passphrase
passphrase = cls.fd_passphrase()
if passphrase is not None:
return passphrase
@classmethod
def env_passcommand(cls, default=None):
passcommand = os.environ.get('BORG_PASSCOMMAND', None)
if passcommand is not None:
# passcommand is a system command (not inside pyinstaller env)
env = prepare_subprocess_env(system=True)
try:
passphrase = subprocess.check_output(shlex.split(passcommand), universal_newlines=True, env=env)
except (subprocess.CalledProcessError, FileNotFoundError) as e:
raise PasscommandFailure(e)
return cls(passphrase.rstrip('\n'))
@classmethod
def fd_passphrase(cls):
try:
fd = int(os.environ.get('BORG_PASSPHRASE_FD'))
except (ValueError, TypeError):
return None
with os.fdopen(fd, mode='r') as f:
passphrase = f.read()
return cls(passphrase.rstrip('\n'))
@classmethod
def env_new_passphrase(cls, default=None):
return cls._env_passphrase('BORG_NEW_PASSPHRASE', default)
@classmethod
def getpass(cls, prompt):
try:
pw = getpass.getpass(prompt)
except EOFError:
if prompt:
print() # avoid err msg appearing right of prompt
msg = []
for env_var in 'BORG_PASSPHRASE', 'BORG_PASSCOMMAND':
env_var_set = os.environ.get(env_var) is not None
msg.append('{} is {}.'.format(env_var, 'set' if env_var_set else 'not set'))
msg.append('Interactive password query failed.')
raise NoPassphraseFailure(' '.join(msg)) from None
else:
return cls(pw)
@classmethod
def verification(cls, passphrase):
msg = 'Do you want your passphrase to be displayed for verification? [yN]: '
if yes(msg, retry_msg=msg, invalid_msg='Invalid answer, try again.',
retry=True, env_var_override='BORG_DISPLAY_PASSPHRASE'):
print('Your passphrase (between double-quotes): "%s"' % passphrase,
file=sys.stderr)
print('Make sure the passphrase displayed above is exactly what you wanted.',
file=sys.stderr)
try:
passphrase.encode('ascii')
except UnicodeEncodeError:
print('Your passphrase (UTF-8 encoding in hex): %s' %
bin_to_hex(passphrase.encode('utf-8')),
file=sys.stderr)
print('As you have a non-ASCII passphrase, it is recommended to keep the UTF-8 encoding in hex together with the passphrase at a safe place.',
file=sys.stderr)
@classmethod
def new(cls, allow_empty=False):
passphrase = cls.env_new_passphrase()
if passphrase is not None:
return passphrase
passphrase = cls.env_passphrase()
if passphrase is not None:
return passphrase
for retry in range(1, 11):
passphrase = cls.getpass('Enter new passphrase: ')
if allow_empty or passphrase:
passphrase2 = cls.getpass('Enter same passphrase again: ')
if passphrase == passphrase2:
cls.verification(passphrase)
logger.info('Remember your passphrase. Your data will be inaccessible without it.')
return passphrase
else:
print('Passphrases do not match', file=sys.stderr)
else:
print('Passphrase must not be blank', file=sys.stderr)
else:
raise PasswordRetriesExceeded
def __repr__(self):
return '<Passphrase "***hidden***">'
def kdf(self, salt, iterations, length):
return pbkdf2_hmac('sha256', self.encode('utf-8'), salt, iterations, length)
class FlexiKeyBase(AESKeyBase): class FlexiKeyBase(AESKeyBase):
@classmethod @classmethod
def detect(cls, repository, manifest_data): def detect(cls, repository, manifest_data):

View file

@ -0,0 +1,141 @@
import getpass
import os
import shlex
import subprocess
import sys
from hashlib import pbkdf2_hmac
from . import bin_to_hex
from . import Error
from . import yes
from . import prepare_subprocess_env
from ..logger import create_logger
logger = create_logger()
class NoPassphraseFailure(Error):
"""can not acquire a passphrase: {}"""
class PassphraseWrong(Error):
"""passphrase supplied in BORG_PASSPHRASE, by BORG_PASSCOMMAND or via BORG_PASSPHRASE_FD is incorrect."""
class PasscommandFailure(Error):
"""passcommand supplied in BORG_PASSCOMMAND failed: {}"""
class PasswordRetriesExceeded(Error):
"""exceeded the maximum password retries"""
class Passphrase(str):
@classmethod
def _env_passphrase(cls, env_var, default=None):
passphrase = os.environ.get(env_var, default)
if passphrase is not None:
return cls(passphrase)
@classmethod
def env_passphrase(cls, default=None):
passphrase = cls._env_passphrase('BORG_PASSPHRASE', default)
if passphrase is not None:
return passphrase
passphrase = cls.env_passcommand()
if passphrase is not None:
return passphrase
passphrase = cls.fd_passphrase()
if passphrase is not None:
return passphrase
@classmethod
def env_passcommand(cls, default=None):
passcommand = os.environ.get('BORG_PASSCOMMAND', None)
if passcommand is not None:
# passcommand is a system command (not inside pyinstaller env)
env = prepare_subprocess_env(system=True)
try:
passphrase = subprocess.check_output(shlex.split(passcommand), universal_newlines=True, env=env)
except (subprocess.CalledProcessError, FileNotFoundError) as e:
raise PasscommandFailure(e)
return cls(passphrase.rstrip('\n'))
@classmethod
def fd_passphrase(cls):
try:
fd = int(os.environ.get('BORG_PASSPHRASE_FD'))
except (ValueError, TypeError):
return None
with os.fdopen(fd, mode='r') as f:
passphrase = f.read()
return cls(passphrase.rstrip('\n'))
@classmethod
def env_new_passphrase(cls, default=None):
return cls._env_passphrase('BORG_NEW_PASSPHRASE', default)
@classmethod
def getpass(cls, prompt):
try:
pw = getpass.getpass(prompt)
except EOFError:
if prompt:
print() # avoid err msg appearing right of prompt
msg = []
for env_var in 'BORG_PASSPHRASE', 'BORG_PASSCOMMAND':
env_var_set = os.environ.get(env_var) is not None
msg.append('{} is {}.'.format(env_var, 'set' if env_var_set else 'not set'))
msg.append('Interactive password query failed.')
raise NoPassphraseFailure(' '.join(msg)) from None
else:
return cls(pw)
@classmethod
def verification(cls, passphrase):
msg = 'Do you want your passphrase to be displayed for verification? [yN]: '
if yes(msg, retry_msg=msg, invalid_msg='Invalid answer, try again.',
retry=True, env_var_override='BORG_DISPLAY_PASSPHRASE'):
print('Your passphrase (between double-quotes): "%s"' % passphrase,
file=sys.stderr)
print('Make sure the passphrase displayed above is exactly what you wanted.',
file=sys.stderr)
try:
passphrase.encode('ascii')
except UnicodeEncodeError:
print('Your passphrase (UTF-8 encoding in hex): %s' %
bin_to_hex(passphrase.encode('utf-8')),
file=sys.stderr)
print('As you have a non-ASCII passphrase, it is recommended to keep the '
'UTF-8 encoding in hex together with the passphrase at a safe place.',
file=sys.stderr)
@classmethod
def new(cls, allow_empty=False):
passphrase = cls.env_new_passphrase()
if passphrase is not None:
return passphrase
passphrase = cls.env_passphrase()
if passphrase is not None:
return passphrase
for retry in range(1, 11):
passphrase = cls.getpass('Enter new passphrase: ')
if allow_empty or passphrase:
passphrase2 = cls.getpass('Enter same passphrase again: ')
if passphrase == passphrase2:
cls.verification(passphrase)
logger.info('Remember your passphrase. Your data will be inaccessible without it.')
return passphrase
else:
print('Passphrases do not match', file=sys.stderr)
else:
print('Passphrase must not be blank', file=sys.stderr)
else:
raise PasswordRetriesExceeded
def __repr__(self):
return '<Passphrase "***hidden***">'
def kdf(self, salt, iterations, length):
return pbkdf2_hmac('sha256', self.encode('utf-8'), salt, iterations, length)

View file

@ -1,4 +1,5 @@
import errno import errno
import getpass
import hashlib import hashlib
import os import os
import shutil import shutil
@ -32,6 +33,7 @@
from ..helpers import iter_separated from ..helpers import iter_separated
from ..helpers import eval_escapes from ..helpers import eval_escapes
from ..helpers import safe_unlink from ..helpers import safe_unlink
from ..helpers.passphrase import Passphrase, PasswordRetriesExceeded
from . import BaseTestCase, FakeInputs from . import BaseTestCase, FakeInputs
@ -1164,3 +1166,44 @@ def os_unlink(_):
safe_unlink(hard_link) safe_unlink(hard_link)
assert victim.read_binary() == contents assert victim.read_binary() == contents
class TestPassphrase:
def test_passphrase_new_verification(self, capsys, monkeypatch):
monkeypatch.setattr(getpass, 'getpass', lambda prompt: "12aöäü")
monkeypatch.setenv('BORG_DISPLAY_PASSPHRASE', 'no')
Passphrase.new()
out, err = capsys.readouterr()
assert "12" not in out
assert "12" not in err
monkeypatch.setenv('BORG_DISPLAY_PASSPHRASE', 'yes')
passphrase = Passphrase.new()
out, err = capsys.readouterr()
assert "313261c3b6c3a4c3bc" not in out
assert "313261c3b6c3a4c3bc" in err
assert passphrase == "12aöäü"
monkeypatch.setattr(getpass, 'getpass', lambda prompt: "1234/@=")
Passphrase.new()
out, err = capsys.readouterr()
assert "1234/@=" not in out
assert "1234/@=" in err
def test_passphrase_new_empty(self, capsys, monkeypatch):
monkeypatch.delenv('BORG_PASSPHRASE', False)
monkeypatch.setattr(getpass, 'getpass', lambda prompt: "")
with pytest.raises(PasswordRetriesExceeded):
Passphrase.new(allow_empty=False)
out, err = capsys.readouterr()
assert "must not be blank" in err
def test_passphrase_new_retries(self, monkeypatch):
monkeypatch.delenv('BORG_PASSPHRASE', False)
ascending_numbers = iter(range(20))
monkeypatch.setattr(getpass, 'getpass', lambda prompt: str(next(ascending_numbers)))
with pytest.raises(PasswordRetriesExceeded):
Passphrase.new()
def test_passphrase_repr(self):
assert "secret" not in repr(Passphrase("secret"))

View file

@ -6,7 +6,7 @@
import pytest import pytest
from ..crypto.key import Passphrase, PasswordRetriesExceeded, bin_to_hex from ..crypto.key import bin_to_hex
from ..crypto.key import PlaintextKey, AuthenticatedKey, RepoKey, KeyfileKey, \ from ..crypto.key import PlaintextKey, AuthenticatedKey, RepoKey, KeyfileKey, \
Blake2KeyfileKey, Blake2RepoKey, Blake2AuthenticatedKey Blake2KeyfileKey, Blake2RepoKey, Blake2AuthenticatedKey
from ..crypto.key import ID_HMAC_SHA_256, ID_BLAKE2b_256 from ..crypto.key import ID_HMAC_SHA_256, ID_BLAKE2b_256
@ -255,47 +255,6 @@ def test_blake2_authenticated_encrypt(self, monkeypatch):
assert authenticated == b'\x06\x00\x00' + plaintext assert authenticated == b'\x06\x00\x00' + plaintext
class TestPassphrase:
def test_passphrase_new_verification(self, capsys, monkeypatch):
monkeypatch.setattr(getpass, 'getpass', lambda prompt: "12aöäü")
monkeypatch.setenv('BORG_DISPLAY_PASSPHRASE', 'no')
Passphrase.new()
out, err = capsys.readouterr()
assert "12" not in out
assert "12" not in err
monkeypatch.setenv('BORG_DISPLAY_PASSPHRASE', 'yes')
passphrase = Passphrase.new()
out, err = capsys.readouterr()
assert "313261c3b6c3a4c3bc" not in out
assert "313261c3b6c3a4c3bc" in err
assert passphrase == "12aöäü"
monkeypatch.setattr(getpass, 'getpass', lambda prompt: "1234/@=")
Passphrase.new()
out, err = capsys.readouterr()
assert "1234/@=" not in out
assert "1234/@=" in err
def test_passphrase_new_empty(self, capsys, monkeypatch):
monkeypatch.delenv('BORG_PASSPHRASE', False)
monkeypatch.setattr(getpass, 'getpass', lambda prompt: "")
with pytest.raises(PasswordRetriesExceeded):
Passphrase.new(allow_empty=False)
out, err = capsys.readouterr()
assert "must not be blank" in err
def test_passphrase_new_retries(self, monkeypatch):
monkeypatch.delenv('BORG_PASSPHRASE', False)
ascending_numbers = iter(range(20))
monkeypatch.setattr(getpass, 'getpass', lambda prompt: str(next(ascending_numbers)))
with pytest.raises(PasswordRetriesExceeded):
Passphrase.new()
def test_passphrase_repr(self):
assert "secret" not in repr(Passphrase("secret"))
class TestTAM: class TestTAM:
@pytest.fixture @pytest.fixture
def key(self, monkeypatch): def key(self, monkeypatch):