refactor: use less binascii

our own hex_to_bin / bin_to_hex is more comfortable to use.

also: optimize remaining binascii usage / imports.
This commit is contained in:
Thomas Waldmann 2024-01-17 14:24:22 +01:00
parent d9132a34d4
commit 742bd0c097
No known key found for this signature in database
GPG Key ID: 243ACFA951F78E01
10 changed files with 51 additions and 68 deletions

View File

@ -23,7 +23,6 @@ try:
import tarfile
import textwrap
import time
from binascii import unhexlify, hexlify
from contextlib import contextmanager
from datetime import datetime, timedelta
from io import TextIOWrapper
@ -53,7 +52,7 @@ try:
from .helpers import PrefixSpec, GlobSpec, CommentSpec, PathSpec, SortBySpec, FilesCacheMode
from .helpers import BaseFormatter, ItemFormatter, ArchiveFormatter
from .helpers import format_timedelta, format_file_size, parse_file_size, format_archive
from .helpers import safe_encode, remove_surrogates, bin_to_hex, prepare_dump_dict, eval_escapes
from .helpers import safe_encode, remove_surrogates, bin_to_hex, hex_to_bin, prepare_dump_dict, eval_escapes
from .helpers import interval, prune_within, prune_split, PRUNING_PATTERNS
from .helpers import timestamp, utcnow
from .helpers import get_cache_dir, os_stat
@ -1856,12 +1855,7 @@ class Archiver:
raise ValueError('Invalid value')
elif name in ['id', ]:
if check_value:
try:
bin_id = unhexlify(value)
except:
raise ValueError('Invalid value, must be 64 hex digits') from None
if len(bin_id) != 32:
raise ValueError('Invalid value, must be 64 hex digits')
hex_to_bin(value, length=32)
else:
raise ValueError('Invalid name')
@ -2098,7 +2092,7 @@ class Archiver:
wanted = args.wanted
try:
if wanted.startswith('hex:'):
wanted = unhexlify(wanted[4:])
wanted = hex_to_bin(wanted[4:])
elif wanted.startswith('str:'):
wanted = wanted[4:].encode()
else:
@ -2154,9 +2148,7 @@ class Archiver:
"""get object contents from the repository and write it into file"""
hex_id = args.id
try:
id = unhexlify(hex_id)
if len(id) != 32: # 256bit
raise ValueError("id must be 256bits or 64 hex digits")
id = hex_to_bin(hex_id, length=32)
except ValueError as err:
raise CommandError(f"object id {hex_id} is invalid [{str(err)}].")
try:
@ -2182,9 +2174,7 @@ class Archiver:
data = f.read()
hex_id = args.id
try:
id = unhexlify(hex_id)
if len(id) != 32: # 256bit
raise ValueError("id must be 256bits or 64 hex digits")
id = hex_to_bin(hex_id, length=32)
except ValueError as err:
raise CommandError(f"object id {hex_id} is invalid [{str(err)}].")
repository.put(id, data)
@ -2197,7 +2187,7 @@ class Archiver:
modified = False
for hex_id in args.ids:
try:
id = unhexlify(hex_id)
id = hex_to_bin(hex_id, length=32)
except ValueError:
print("object id %s is invalid." % hex_id)
else:
@ -2216,7 +2206,7 @@ class Archiver:
"""display refcounts for the objects with the given IDs"""
for hex_id in args.ids:
try:
id = unhexlify(hex_id)
id = hex_to_bin(hex_id, length=32)
except ValueError:
print("object id %s is invalid." % hex_id)
else:
@ -2236,7 +2226,7 @@ class Archiver:
segments=repository.segments,
compact=repository.compact,
storage_quota_use=repository.storage_quota_use,
shadow_index={hexlify(k).decode(): v for k, v in repository.shadow_index.items()}
shadow_index={bin_to_hex(k): v for k, v in repository.shadow_index.items()}
)
with dash_open(args.path, 'w') as fd:
json.dump(hints, fd, indent=4)

View File

@ -3,7 +3,6 @@ import json
import os
import shutil
import stat
from binascii import unhexlify
from collections import namedtuple
from time import perf_counter
@ -19,7 +18,7 @@ from .helpers import Location
from .helpers import Error
from .helpers import Manifest
from .helpers import get_cache_dir, get_security_dir
from .helpers import int_to_bigint, bigint_to_int, bin_to_hex, parse_stringified_list
from .helpers import int_to_bigint, bigint_to_int, bin_to_hex, hex_to_bin, parse_stringified_list
from .helpers import format_file_size
from .helpers import safe_ns
from .helpers import yes
@ -278,7 +277,7 @@ class CacheConfig:
self._config.read_file(fd)
self._check_upgrade(self.config_path)
self.id = self._config.get('cache', 'repository')
self.manifest_id = unhexlify(self._config.get('cache', 'manifest'))
self.manifest_id = hex_to_bin(self._config.get('cache', 'manifest'))
self.timestamp = self._config.get('cache', 'timestamp', fallback=None)
self.key_type = self._config.get('cache', 'key_type', fallback=None)
self.ignored_features = set(parse_stringified_list(self._config.get('cache', 'ignored_features', fallback='')))
@ -696,8 +695,8 @@ class LocalCache(CacheStatsMixin):
fns = os.listdir(archive_path)
# filenames with 64 hex digits == 256bit,
# or compact indices which are 64 hex digits + ".compact"
return {unhexlify(fn) for fn in fns if len(fn) == 64} | \
{unhexlify(fn[:64]) for fn in fns if len(fn) == 72 and fn.endswith('.compact')}
return {hex_to_bin(fn) for fn in fns if len(fn) == 64} | \
{hex_to_bin(fn[:64]) for fn in fns if len(fn) == 72 and fn.endswith('.compact')}
else:
return set()

View File

@ -7,7 +7,6 @@ import shlex
import sys
import textwrap
import subprocess
from binascii import a2b_base64, b2a_base64, hexlify
from hashlib import sha256, sha512, pbkdf2_hmac
from ..logger import create_logger
@ -690,7 +689,7 @@ class KeyfileKeyBase(AESKeyBase):
raise NotImplementedError
def _load(self, key_data, passphrase):
cdata = a2b_base64(key_data)
cdata = binascii.a2b_base64(key_data)
data = self.decrypt_key_file(cdata, passphrase)
if data:
data = msgpack.unpackb(data)
@ -747,7 +746,7 @@ class KeyfileKeyBase(AESKeyBase):
tam_required=self.tam_required,
)
data = self.encrypt_key_file(msgpack.packb(key.as_dict()), passphrase)
key_data = '\n'.join(textwrap.wrap(b2a_base64(data).decode('ascii')))
key_data = '\n'.join(textwrap.wrap(binascii.b2a_base64(data).decode('ascii')))
return key_data
def change_passphrase(self, passphrase=None):
@ -785,7 +784,7 @@ class KeyfileKey(ID_HMAC_SHA_256, KeyfileKeyBase):
def sanity_check(self, filename, id):
file_id = self.FILE_ID.encode() + b' '
repo_id = hexlify(id)
repo_id = bin_to_hex(id).encode('ascii')
with open(filename, 'rb') as fd:
# we do the magic / id check in binary mode to avoid stumbling over
# decoding errors if somebody has binary files in the keys dir for some reason.
@ -805,7 +804,7 @@ class KeyfileKey(ID_HMAC_SHA_256, KeyfileKeyBase):
raise KeyfileInvalidError(self.repository._location.canonical_path(), filename)
key_b64 = ''.join(lines[1:])
try:
key = a2b_base64(key_b64)
key = binascii.a2b_base64(key_b64)
except binascii.Error:
logger.warning(f"borg key sanity check: key line 2+ does not look like base64. [{filename}]")
raise KeyfileInvalidError(self.repository._location.canonical_path(), filename)

View File

@ -1,10 +1,9 @@
import binascii
import pkgutil
import textwrap
from binascii import unhexlify, a2b_base64, b2a_base64
from hashlib import sha256
from ..helpers import Manifest, NoManifestError, Error, yes, bin_to_hex, dash_open
from ..helpers import Manifest, NoManifestError, Error, yes, bin_to_hex, hex_to_bin, dash_open
from ..repository import Repository
from .key import KeyfileKey, KeyfileNotFoundError, RepoKeyNotFoundError, KeyBlobStorage, identify_key
@ -119,7 +118,7 @@ class KeyManager:
export = 'To restore key use borg key import --paper /path/to/repo\n\n'
binary = a2b_base64(self.keyblob)
binary = binascii.a2b_base64(self.keyblob)
export += 'BORG PAPER KEY v1\n'
lines = (len(binary) + 17) // 18
repoid = bin_to_hex(self.repository.id)[:18]
@ -208,9 +207,9 @@ class KeyManager:
print("each line must contain exactly one '-', try again")
continue
try:
part = unhexlify(data)
except binascii.Error:
print("only characters 0-9 and a-f and '-' are valid, try again")
part = hex_to_bin(data)
except ValueError as e:
print(f"only characters 0-9 and a-f and '-' are valid, try again [{e}]")
continue
if sha256_truncated(idx.to_bytes(2, byteorder='big') + part, 2) != checksum:
print(f'line checksum did not match, try line {idx} again')
@ -224,7 +223,7 @@ class KeyManager:
print('The overall checksum did not match, retry or enter a blank line to abort.')
continue
self.keyblob = '\n'.join(textwrap.wrap(b2a_base64(result).decode('ascii'))) + '\n'
self.keyblob = '\n'.join(textwrap.wrap(binascii.b2a_base64(result).decode('ascii'))) + '\n'
self.store_keyblob(args)
break

View File

@ -5,7 +5,6 @@ import shutil
import stat
import struct
import time
from binascii import hexlify, unhexlify
from collections import defaultdict
from configparser import ConfigParser
from functools import partial
@ -481,7 +480,7 @@ class Repository:
if self.storage_quota is None:
# self.storage_quota is None => no explicit storage_quota was specified, use repository setting.
self.storage_quota = parse_file_size(self.config.get('repository', 'storage_quota', fallback=0))
self.id = unhexlify(self.config.get('repository', 'id').strip())
self.id = hex_to_bin(self.config.get('repository', 'id').strip(), length=32)
self.io = LoggedIO(self.path, self.max_segment_size, self.segments_per_dir)
if self.check_segment_magic:
# read a segment and check whether we are dealing with a non-upgraded Attic repository

View File

@ -1,4 +1,5 @@
import argparse
import binascii
import errno
import io
import json
@ -15,7 +16,6 @@ import sys
import tempfile
import time
import unittest
from binascii import unhexlify, b2a_base64
from configparser import ConfigParser
from datetime import datetime
from datetime import timezone
@ -42,7 +42,7 @@ from ..helpers import Location, get_security_dir
from ..helpers import Manifest, MandatoryFeatureUnsupported, ArchiveInfo
from ..helpers import init_ec_warnings
from ..helpers import EXIT_SUCCESS, EXIT_WARNING, EXIT_ERROR, Error, CancelledByUser, RTError, CommandError
from ..helpers import bin_to_hex
from ..helpers import bin_to_hex, hex_to_bin
from ..helpers import MAX_S
from ..helpers import msgpack
from ..helpers import flags_noatime, flags_normal
@ -3391,13 +3391,13 @@ class ArchiverTestCase(ArchiverTestCaseBase):
export_file = self.output_path + '/exported'
self.cmd('init', self.repository_location, '--encryption', 'keyfile')
self._set_repository_id(self.repository_path, unhexlify(repo_id))
self._set_repository_id(self.repository_path, hex_to_bin(repo_id))
key_file = self.keys_path + '/' + os.listdir(self.keys_path)[0]
with open(key_file, 'w') as fd:
fd.write(KeyfileKey.FILE_ID + ' ' + repo_id + '\n')
fd.write(b2a_base64(b'abcdefghijklmnopqrstu').decode())
fd.write(binascii.b2a_base64(b'abcdefghijklmnopqrstu').decode())
self.cmd('key', 'export', '--paper', self.repository_location, export_file)
@ -3415,12 +3415,12 @@ id: 2 / e29442 3506da 4e1ea7 / 25f62a 5a3d41 - 02
def test_key_import_paperkey(self):
repo_id = 'e294423506da4e1ea76e8dcdf1a3919624ae3ae496fddf905610c351d3f09239'
self.cmd('init', self.repository_location, '--encryption', 'keyfile')
self._set_repository_id(self.repository_path, unhexlify(repo_id))
self._set_repository_id(self.repository_path, hex_to_bin(repo_id))
key_file = self.keys_path + '/' + os.listdir(self.keys_path)[0]
with open(key_file, 'w') as fd:
fd.write(KeyfileKey.FILE_ID + ' ' + repo_id + '\n')
fd.write(b2a_base64(b'abcdefghijklmnopqrstu').decode())
fd.write(binascii.b2a_base64(b'abcdefghijklmnopqrstu').decode())
typed_input = (
b'2 / e29442 3506da 4e1ea7 / 25f62a 5a3d41 02\n' # Forgot to type "-"

View File

@ -1,11 +1,10 @@
import os
import zlib
from binascii import unhexlify
import pytest
from ..algorithms import checksums
from ..helpers import bin_to_hex
from ..helpers import bin_to_hex, hex_to_bin
crc32_implementations = [checksums.crc32_slice_by_8]
if checksums.have_clmul:
@ -31,7 +30,7 @@ def test_crc32(implementation):
def test_xxh64():
assert bin_to_hex(checksums.xxh64(b'test', 123)) == '2b81b9401bef86cf'
assert bin_to_hex(checksums.xxh64(b'test')) == '4fdcca5ddb678139'
assert bin_to_hex(checksums.xxh64(unhexlify(
assert bin_to_hex(checksums.xxh64(hex_to_bin(
'6f663f01c118abdea553373d5eae44e7dac3b6829b46b9bbeff202b6c592c22d724'
'fb3d25a347cca6c5b8f20d567e4bb04b9cfa85d17f691590f9a9d32e8ccc9102e9d'
'cf8a7e6716280cd642ce48d03fdf114c9f57c20d9472bb0f81c147645e6fa3d331'))) == '35d5d2f545d9511a'

View File

@ -1,10 +1,10 @@
from io import BytesIO
from binascii import unhexlify
from .chunker import cf
from ..chunker import Chunker
from ..crypto.low_level import blake2b_256
from ..constants import * # NOQA
from ..helpers import hex_to_bin
from . import BaseTestCase
@ -37,4 +37,4 @@ class ChunkerRegressionTestCase(BaseTestCase):
# The "correct" hash below matches the existing chunker behavior.
# Future chunker optimisations must not change this, or existing repos will bloat.
overall_hash = blake2b_256(b'', b''.join(runs))
self.assert_equal(overall_hash, unhexlify("b559b0ac8df8daaa221201d018815114241ea5c6609d98913cd2246a702af4e3"))
self.assert_equal(overall_hash, hex_to_bin("b559b0ac8df8daaa221201d018815114241ea5c6609d98913cd2246a702af4e3"))

View File

@ -1,11 +1,10 @@
# Note: these tests are part of the self test, do not use or import pytest functionality here.
# See borg.selftest for details. If you add/remove test methods, update SELFTEST_COUNT
from binascii import hexlify
from ..crypto.low_level import AES256_CTR_HMAC_SHA256, UNENCRYPTED, IntegrityError
from ..crypto.low_level import bytes_to_long, bytes_to_int, long_to_bytes
from ..crypto.low_level import hkdf_hmac_sha512
from ..helpers import bin_to_hex
from . import BaseTestCase
@ -43,10 +42,10 @@ class CryptoTestCase(BaseTestCase):
mac = hdr_mac_iv_cdata[1:33]
iv = hdr_mac_iv_cdata[33:41]
cdata = hdr_mac_iv_cdata[41:]
self.assert_equal(hexlify(hdr), b'42')
self.assert_equal(hexlify(mac), b'af90b488b0cc4a8f768fe2d6814fa65aec66b148135e54f7d4d29a27f22f57a8')
self.assert_equal(hexlify(iv), b'0000000000000000')
self.assert_equal(hexlify(cdata), b'c6efb702de12498f34a2c2bbc8149e759996d08bf6dc5c610aefc0c3a466')
self.assert_equal(bin_to_hex(hdr), '42')
self.assert_equal(bin_to_hex(mac), 'af90b488b0cc4a8f768fe2d6814fa65aec66b148135e54f7d4d29a27f22f57a8')
self.assert_equal(bin_to_hex(iv), '0000000000000000')
self.assert_equal(bin_to_hex(cdata), 'c6efb702de12498f34a2c2bbc8149e759996d08bf6dc5c610aefc0c3a466')
self.assert_equal(cs.next_iv(), 2)
# auth-then-decrypt
cs = AES256_CTR_HMAC_SHA256(mac_key, enc_key, header_len=len(header), aad_offset=1)
@ -72,10 +71,10 @@ class CryptoTestCase(BaseTestCase):
mac = hdr_mac_iv_cdata[3:35]
iv = hdr_mac_iv_cdata[35:43]
cdata = hdr_mac_iv_cdata[43:]
self.assert_equal(hexlify(hdr), b'123456')
self.assert_equal(hexlify(mac), b'7659a915d9927072ef130258052351a17ef882692893c3850dd798c03d2dd138')
self.assert_equal(hexlify(iv), b'0000000000000000')
self.assert_equal(hexlify(cdata), b'c6efb702de12498f34a2c2bbc8149e759996d08bf6dc5c610aefc0c3a466')
self.assert_equal(bin_to_hex(hdr), '123456')
self.assert_equal(bin_to_hex(mac), '7659a915d9927072ef130258052351a17ef882692893c3850dd798c03d2dd138')
self.assert_equal(bin_to_hex(iv), '0000000000000000')
self.assert_equal(bin_to_hex(cdata), 'c6efb702de12498f34a2c2bbc8149e759996d08bf6dc5c610aefc0c3a466')
self.assert_equal(cs.next_iv(), 2)
# auth-then-decrypt
cs = AES256_CTR_HMAC_SHA256(mac_key, enc_key, header_len=len(header), aad_offset=1)

View File

@ -2,11 +2,10 @@ import getpass
import os.path
import re
import tempfile
from binascii import hexlify, unhexlify
import pytest
from ..crypto.key import Passphrase, PasswordRetriesExceeded, bin_to_hex
from ..crypto.key import Passphrase, PasswordRetriesExceeded
from ..crypto.key import PlaintextKey, PassphraseKey, AuthenticatedKey, RepoKey, KeyfileKey, \
Blake2KeyfileKey, Blake2RepoKey, Blake2AuthenticatedKey
from ..crypto.key import ID_HMAC_SHA_256, ID_BLAKE2b_256
@ -20,7 +19,7 @@ from ..helpers import Location
from ..helpers import StableDict
from ..helpers import get_security_dir
from ..helpers import msgpack
from ..helpers import hex_to_bin, bin_to_hex
class TestKey:
class MockArgs:
@ -36,11 +35,11 @@ class TestKey:
/cXJq7jrqmrJ1phd6dg4SHAM/i+hubadZoS6m25OQzYAW09wZD/phG8OVa698Z5ed3HTaT
SmrtgJL3EoOKgUI9d6BLE4dJdBqntifo""".strip()
keyfile2_cdata = unhexlify(re.sub(r'\W', '', """
keyfile2_cdata = hex_to_bin(re.sub(r'\W', '', """
0055f161493fcfc16276e8c31493c4641e1eb19a79d0326fad0291e5a9c98e5933
00000000000003e8d21eaf9b86c297a8cd56432e1915bb
"""))
keyfile2_id = unhexlify('c3fbf14bc001ebcc3cd86e696c13482ed071740927cd7cbe1b01b4bfcee49314')
keyfile2_id = hex_to_bin('c3fbf14bc001ebcc3cd86e696c13482ed071740927cd7cbe1b01b4bfcee49314')
keyfile_blake2_key_file = """
BORG_KEY 0000000000000000000000000000000000000000000000000000000000000000
@ -112,7 +111,7 @@ class TestKey:
def test_plaintext(self):
key = PlaintextKey.create(None, None)
chunk = b'foo'
assert hexlify(key.id_hash(chunk)) == b'2c26b46b68ffc68ff99b453c1d30413413422d706483bfa0f98a5e886266e7ae'
assert bin_to_hex(key.id_hash(chunk)) == '2c26b46b68ffc68ff99b453c1d30413413422d706483bfa0f98a5e886266e7ae'
assert chunk == key.decrypt(key.id_hash(chunk), key.encrypt(chunk))
def test_keyfile(self, monkeypatch, keys_dir):
@ -187,9 +186,9 @@ class TestKey:
monkeypatch.setenv('BORG_PASSPHRASE', 'test')
key = PassphraseKey.create(self.MockRepository(), None)
assert key.cipher.next_iv() == 0
assert hexlify(key.id_key) == b'793b0717f9d8fb01c751a487e9b827897ceea62409870600013fbc6b4d8d7ca6'
assert hexlify(key.enc_hmac_key) == b'b885a05d329a086627412a6142aaeb9f6c54ab7950f996dd65587251f6bc0901'
assert hexlify(key.enc_key) == b'2ff3654c6daf7381dbbe718d2b20b4f1ea1e34caa6cc65f6bb3ac376b93fed2a'
assert bin_to_hex(key.id_key) == '793b0717f9d8fb01c751a487e9b827897ceea62409870600013fbc6b4d8d7ca6'
assert bin_to_hex(key.enc_hmac_key) == 'b885a05d329a086627412a6142aaeb9f6c54ab7950f996dd65587251f6bc0901'
assert bin_to_hex(key.enc_key) == '2ff3654c6daf7381dbbe718d2b20b4f1ea1e34caa6cc65f6bb3ac376b93fed2a'
assert key.chunk_seed == -775740477
manifest = key.encrypt(b'ABC')
assert key.cipher.extract_iv(manifest) == 0
@ -205,7 +204,7 @@ class TestKey:
assert key.enc_key == key2.enc_key
assert key.chunk_seed == key2.chunk_seed
chunk = b'foo'
assert hexlify(key.id_hash(chunk)) == b'818217cf07d37efad3860766dcdf1d21e401650fed2d76ed1d797d3aae925990'
assert bin_to_hex(key.id_hash(chunk)) == '818217cf07d37efad3860766dcdf1d21e401650fed2d76ed1d797d3aae925990'
assert chunk == key2.decrypt(key2.id_hash(chunk), key.encrypt(chunk))
def _corrupt_byte(self, key, data, offset):