mirror of
https://github.com/borgbackup/borg.git
synced 2025-02-28 16:56:33 +00:00
Merge pull request #8104 from ThomasWaldmann/error-msg-bad-nonce-file2-master
refactor: use less binascii
This commit is contained in:
commit
2d31b027c7
12 changed files with 74 additions and 104 deletions
|
@ -1,13 +1,12 @@
|
|||
import argparse
|
||||
import configparser
|
||||
from binascii import unhexlify
|
||||
|
||||
from ._common import with_repository
|
||||
from ..cache import Cache, assert_secure
|
||||
from ..constants import * # NOQA
|
||||
from ..helpers import Error, CommandError
|
||||
from ..helpers import Location
|
||||
from ..helpers import parse_file_size
|
||||
from ..helpers import parse_file_size, hex_to_bin
|
||||
from ..manifest import Manifest
|
||||
|
||||
from ..logger import create_logger
|
||||
|
@ -46,12 +45,7 @@ def repo_validate(section, name, value=None, check_value=True):
|
|||
raise ValueError("Invalid value")
|
||||
elif name in ["id"]:
|
||||
if check_value:
|
||||
try:
|
||||
bin_id = unhexlify(value)
|
||||
except: # noqa
|
||||
raise ValueError("Invalid value, must be 64 hex digits") from None
|
||||
if len(bin_id) != 32:
|
||||
raise ValueError("Invalid value, must be 64 hex digits")
|
||||
hex_to_bin(value, length=32)
|
||||
else:
|
||||
raise ValueError("Invalid name")
|
||||
|
||||
|
|
|
@ -1,5 +1,4 @@
|
|||
import argparse
|
||||
from binascii import unhexlify, hexlify
|
||||
import functools
|
||||
import json
|
||||
import textwrap
|
||||
|
@ -9,7 +8,7 @@
|
|||
from ..constants import * # NOQA
|
||||
from ..helpers import msgpack
|
||||
from ..helpers import sysinfo
|
||||
from ..helpers import bin_to_hex, prepare_dump_dict
|
||||
from ..helpers import bin_to_hex, hex_to_bin, prepare_dump_dict
|
||||
from ..helpers import dash_open
|
||||
from ..helpers import StableDict
|
||||
from ..helpers import positive_int_validator, archivename_validator
|
||||
|
@ -179,7 +178,7 @@ def print_finding(info, wanted, data, offset):
|
|||
wanted = args.wanted
|
||||
try:
|
||||
if wanted.startswith("hex:"):
|
||||
wanted = unhexlify(wanted[4:])
|
||||
wanted = hex_to_bin(wanted[4:])
|
||||
elif wanted.startswith("str:"):
|
||||
wanted = wanted[4:].encode()
|
||||
else:
|
||||
|
@ -235,9 +234,7 @@ def do_debug_get_obj(self, args, repository):
|
|||
"""get object contents from the repository and write it into file"""
|
||||
hex_id = args.id
|
||||
try:
|
||||
id = unhexlify(hex_id)
|
||||
if len(id) != 32: # 256bit
|
||||
raise ValueError("id must be 256bits or 64 hex digits")
|
||||
id = hex_to_bin(hex_id, length=32)
|
||||
except ValueError as err:
|
||||
raise CommandError(f"object id {hex_id} is invalid [{str(err)}].")
|
||||
try:
|
||||
|
@ -264,9 +261,7 @@ def do_debug_parse_obj(self, args, repository, manifest):
|
|||
# get the object from id
|
||||
hex_id = args.id
|
||||
try:
|
||||
id = unhexlify(hex_id)
|
||||
if len(id) != 32: # 256bit
|
||||
raise ValueError("id must be 256bits or 64 hex digits")
|
||||
id = hex_to_bin(hex_id, length=32)
|
||||
except ValueError as err:
|
||||
raise CommandError(f"object id {hex_id} is invalid [{str(err)}].")
|
||||
|
||||
|
@ -289,9 +284,7 @@ def do_debug_format_obj(self, args, repository, manifest):
|
|||
# get the object from id
|
||||
hex_id = args.id
|
||||
try:
|
||||
id = unhexlify(hex_id)
|
||||
if len(id) != 32: # 256bit
|
||||
raise ValueError("id must be 256bits or 64 hex digits")
|
||||
id = hex_to_bin(hex_id, length=32)
|
||||
except ValueError as err:
|
||||
raise CommandError(f"object id {hex_id} is invalid [{str(err)}].")
|
||||
|
||||
|
@ -315,9 +308,7 @@ def do_debug_put_obj(self, args, repository):
|
|||
data = f.read()
|
||||
hex_id = args.id
|
||||
try:
|
||||
id = unhexlify(hex_id)
|
||||
if len(id) != 32: # 256bit
|
||||
raise ValueError("id must be 256bits or 64 hex digits")
|
||||
id = hex_to_bin(hex_id, length=32)
|
||||
except ValueError as err:
|
||||
raise CommandError(f"object id {hex_id} is invalid [{str(err)}].")
|
||||
|
||||
|
@ -331,7 +322,7 @@ def do_debug_delete_obj(self, args, repository):
|
|||
modified = False
|
||||
for hex_id in args.ids:
|
||||
try:
|
||||
id = unhexlify(hex_id)
|
||||
id = hex_to_bin(hex_id, length=32)
|
||||
except ValueError:
|
||||
print("object id %s is invalid." % hex_id)
|
||||
else:
|
||||
|
@ -350,7 +341,7 @@ def do_debug_refcount_obj(self, args, repository, manifest, cache):
|
|||
"""display refcounts for the objects with the given IDs"""
|
||||
for hex_id in args.ids:
|
||||
try:
|
||||
id = unhexlify(hex_id)
|
||||
id = hex_to_bin(hex_id, length=32)
|
||||
except ValueError:
|
||||
print("object id %s is invalid." % hex_id)
|
||||
else:
|
||||
|
@ -370,7 +361,7 @@ def do_debug_dump_hints(self, args, repository):
|
|||
segments=repository.segments,
|
||||
compact=repository.compact,
|
||||
storage_quota_use=repository.storage_quota_use,
|
||||
shadow_index={hexlify(k).decode(): v for k, v in repository.shadow_index.items()},
|
||||
shadow_index={bin_to_hex(k): v for k, v in repository.shadow_index.items()},
|
||||
)
|
||||
with dash_open(args.path, "w") as fd:
|
||||
json.dump(hints, fd, indent=4)
|
||||
|
|
|
@ -2,7 +2,6 @@
|
|||
import os
|
||||
import shutil
|
||||
import stat
|
||||
from binascii import unhexlify
|
||||
from collections import namedtuple
|
||||
from time import perf_counter
|
||||
|
||||
|
@ -17,7 +16,7 @@
|
|||
from .helpers import Location
|
||||
from .helpers import Error
|
||||
from .helpers import get_cache_dir, get_security_dir
|
||||
from .helpers import bin_to_hex, parse_stringified_list
|
||||
from .helpers import bin_to_hex, hex_to_bin, parse_stringified_list
|
||||
from .helpers import format_file_size
|
||||
from .helpers import safe_ns
|
||||
from .helpers import yes
|
||||
|
@ -299,7 +298,7 @@ def load(self):
|
|||
self._config.read_file(fd)
|
||||
self._check_upgrade(self.config_path)
|
||||
self.id = self._config.get("cache", "repository")
|
||||
self.manifest_id = unhexlify(self._config.get("cache", "manifest"))
|
||||
self.manifest_id = hex_to_bin(self._config.get("cache", "manifest"))
|
||||
self.timestamp = self._config.get("cache", "timestamp", fallback=None)
|
||||
self.key_type = self._config.get("cache", "key_type", fallback=None)
|
||||
self.ignored_features = set(parse_stringified_list(self._config.get("cache", "ignored_features", fallback="")))
|
||||
|
@ -733,8 +732,8 @@ def cached_archives():
|
|||
fns = os.listdir(archive_path)
|
||||
# filenames with 64 hex digits == 256bit,
|
||||
# or compact indices which are 64 hex digits + ".compact"
|
||||
return {unhexlify(fn) for fn in fns if len(fn) == 64} | {
|
||||
unhexlify(fn[:64]) for fn in fns if len(fn) == 72 and fn.endswith(".compact")
|
||||
return {hex_to_bin(fn) for fn in fns if len(fn) == 64} | {
|
||||
hex_to_bin(fn[:64]) for fn in fns if len(fn) == 72 and fn.endswith(".compact")
|
||||
}
|
||||
else:
|
||||
return set()
|
||||
|
|
|
@ -2,7 +2,6 @@
|
|||
import hmac
|
||||
import os
|
||||
import textwrap
|
||||
from binascii import a2b_base64, b2a_base64, hexlify
|
||||
from hashlib import sha256, pbkdf2_hmac
|
||||
from typing import Literal, Callable, ClassVar
|
||||
|
||||
|
@ -382,7 +381,7 @@ def detect(cls, repository, manifest_data):
|
|||
return key
|
||||
|
||||
def _load(self, key_data, passphrase):
|
||||
cdata = a2b_base64(key_data)
|
||||
cdata = binascii.a2b_base64(key_data)
|
||||
data = self.decrypt_key_file(cdata, passphrase)
|
||||
if data:
|
||||
data = msgpack.unpackb(data)
|
||||
|
@ -507,7 +506,7 @@ def _save(self, passphrase, algorithm):
|
|||
chunk_seed=self.chunk_seed,
|
||||
)
|
||||
data = self.encrypt_key_file(msgpack.packb(key.as_dict()), passphrase, algorithm)
|
||||
key_data = "\n".join(textwrap.wrap(b2a_base64(data).decode("ascii")))
|
||||
key_data = "\n".join(textwrap.wrap(binascii.b2a_base64(data).decode("ascii")))
|
||||
return key_data
|
||||
|
||||
def change_passphrase(self, passphrase=None):
|
||||
|
@ -547,7 +546,7 @@ def create(cls, repository, args, *, other_key=None):
|
|||
|
||||
def sanity_check(self, filename, id):
|
||||
file_id = self.FILE_ID.encode() + b" "
|
||||
repo_id = hexlify(id)
|
||||
repo_id = bin_to_hex(id).encode("ascii")
|
||||
with open(filename, "rb") as fd:
|
||||
# we do the magic / id check in binary mode to avoid stumbling over
|
||||
# decoding errors if somebody has binary files in the keys dir for some reason.
|
||||
|
@ -567,7 +566,7 @@ def sanity_check(self, filename, id):
|
|||
raise KeyfileInvalidError(self.repository._location.canonical_path(), filename)
|
||||
key_b64 = "".join(lines[1:])
|
||||
try:
|
||||
key = a2b_base64(key_b64)
|
||||
key = binascii.a2b_base64(key_b64)
|
||||
except binascii.Error:
|
||||
logger.warning(f"borg key sanity check: key line 2+ does not look like base64. [{filename}]")
|
||||
raise KeyfileInvalidError(self.repository._location.canonical_path(), filename)
|
||||
|
|
|
@ -1,10 +1,9 @@
|
|||
import binascii
|
||||
import pkgutil
|
||||
import textwrap
|
||||
from binascii import unhexlify, a2b_base64, b2a_base64
|
||||
from hashlib import sha256
|
||||
|
||||
from ..helpers import Error, yes, bin_to_hex, dash_open
|
||||
from ..helpers import Error, yes, bin_to_hex, hex_to_bin, dash_open
|
||||
from ..manifest import Manifest, NoManifestError
|
||||
from ..repository import Repository
|
||||
from ..repoobj import RepoObj
|
||||
|
@ -127,7 +126,7 @@ def grouped(s):
|
|||
|
||||
export = "To restore key use borg key import --paper /path/to/repo\n\n"
|
||||
|
||||
binary = a2b_base64(self.keyblob)
|
||||
binary = binascii.a2b_base64(self.keyblob)
|
||||
export += "BORG PAPER KEY v1\n"
|
||||
lines = (len(binary) + 17) // 18
|
||||
repoid = bin_to_hex(self.repository.id)[:18]
|
||||
|
@ -218,9 +217,9 @@ def import_paperkey(self, args):
|
|||
print("each line must contain exactly one '-', try again")
|
||||
continue
|
||||
try:
|
||||
part = unhexlify(data)
|
||||
except binascii.Error:
|
||||
print("only characters 0-9 and a-f and '-' are valid, try again")
|
||||
part = hex_to_bin(data)
|
||||
except ValueError as e:
|
||||
print(f"only characters 0-9 and a-f and '-' are valid, try again [{e}]")
|
||||
continue
|
||||
if sha256_truncated(idx.to_bytes(2, byteorder="big") + part, 2) != checksum:
|
||||
print(f"line checksum did not match, try line {idx} again")
|
||||
|
@ -234,7 +233,7 @@ def import_paperkey(self, args):
|
|||
print("The overall checksum did not match, retry or enter a blank line to abort.")
|
||||
continue
|
||||
|
||||
self.keyblob = "\n".join(textwrap.wrap(b2a_base64(result).decode("ascii"))) + "\n"
|
||||
self.keyblob = "\n".join(textwrap.wrap(binascii.b2a_base64(result).decode("ascii"))) + "\n"
|
||||
self.store_keyblob(args)
|
||||
break
|
||||
|
||||
|
|
|
@ -5,7 +5,6 @@
|
|||
import stat
|
||||
import struct
|
||||
import time
|
||||
from binascii import unhexlify
|
||||
from collections import defaultdict
|
||||
from configparser import ConfigParser
|
||||
from datetime import datetime, timezone
|
||||
|
@ -18,7 +17,7 @@
|
|||
from .helpers import Error, ErrorWithTraceback, IntegrityError, format_file_size, parse_file_size
|
||||
from .helpers import Location
|
||||
from .helpers import ProgressIndicatorPercent
|
||||
from .helpers import bin_to_hex
|
||||
from .helpers import bin_to_hex, hex_to_bin
|
||||
from .helpers import secure_erase, safe_unlink
|
||||
from .helpers import msgpack
|
||||
from .helpers.lrucache import LRUCache
|
||||
|
@ -490,7 +489,7 @@ def open(self, path, exclusive, lock_wait=None, lock=True):
|
|||
if self.storage_quota is None:
|
||||
# self.storage_quota is None => no explicit storage_quota was specified, use repository setting.
|
||||
self.storage_quota = parse_file_size(self.config.get("repository", "storage_quota", fallback=0))
|
||||
self.id = unhexlify(self.config.get("repository", "id").strip())
|
||||
self.id = hex_to_bin(self.config.get("repository", "id").strip(), length=32)
|
||||
self.io = LoggedIO(self.path, self.max_segment_size, self.segments_per_dir)
|
||||
|
||||
def _load_hints(self):
|
||||
|
|
|
@ -1,5 +1,5 @@
|
|||
import binascii
|
||||
import os
|
||||
from binascii import unhexlify, b2a_base64, a2b_base64
|
||||
|
||||
import pytest
|
||||
|
||||
|
@ -7,7 +7,7 @@
|
|||
from ...crypto.key import AESOCBRepoKey, AESOCBKeyfileKey, CHPOKeyfileKey, Passphrase
|
||||
from ...crypto.keymanager import RepoIdMismatch, NotABorgKeyFile
|
||||
from ...helpers import EXIT_ERROR, CommandError
|
||||
from ...helpers import bin_to_hex
|
||||
from ...helpers import bin_to_hex, hex_to_bin
|
||||
from ...helpers import msgpack
|
||||
from ...repository import Repository
|
||||
from .. import key
|
||||
|
@ -223,12 +223,12 @@ def test_key_export_paperkey(archivers, request):
|
|||
repo_id = "e294423506da4e1ea76e8dcdf1a3919624ae3ae496fddf905610c351d3f09239"
|
||||
export_file = archiver.output_path + "/exported"
|
||||
cmd(archiver, "rcreate", KF_ENCRYPTION)
|
||||
_set_repository_id(archiver.repository_path, unhexlify(repo_id))
|
||||
_set_repository_id(archiver.repository_path, hex_to_bin(repo_id))
|
||||
key_file = archiver.keys_path + "/" + os.listdir(archiver.keys_path)[0]
|
||||
|
||||
with open(key_file, "w") as fd:
|
||||
fd.write(CHPOKeyfileKey.FILE_ID + " " + repo_id + "\n")
|
||||
fd.write(b2a_base64(b"abcdefghijklmnopqrstu").decode())
|
||||
fd.write(binascii.b2a_base64(b"abcdefghijklmnopqrstu").decode())
|
||||
|
||||
cmd(archiver, "key", "export", "--paper", export_file)
|
||||
|
||||
|
@ -251,12 +251,12 @@ def test_key_import_paperkey(archivers, request):
|
|||
archiver = request.getfixturevalue(archivers)
|
||||
repo_id = "e294423506da4e1ea76e8dcdf1a3919624ae3ae496fddf905610c351d3f09239"
|
||||
cmd(archiver, "rcreate", KF_ENCRYPTION)
|
||||
_set_repository_id(archiver.repository_path, unhexlify(repo_id))
|
||||
_set_repository_id(archiver.repository_path, hex_to_bin(repo_id))
|
||||
|
||||
key_file = archiver.keys_path + "/" + os.listdir(archiver.keys_path)[0]
|
||||
with open(key_file, "w") as fd:
|
||||
fd.write(AESOCBKeyfileKey.FILE_ID + " " + repo_id + "\n")
|
||||
fd.write(b2a_base64(b"abcdefghijklmnopqrstu").decode())
|
||||
fd.write(binascii.b2a_base64(b"abcdefghijklmnopqrstu").decode())
|
||||
|
||||
typed_input = (
|
||||
b"2 / e29442 3506da 4e1ea7 / 25f62a 5a3d41 02\n" # Forgot to type "-"
|
||||
|
@ -298,7 +298,7 @@ def test_init_defaults_to_argon2(archivers, request):
|
|||
archiver = request.getfixturevalue(archivers)
|
||||
cmd(archiver, "rcreate", RK_ENCRYPTION)
|
||||
with Repository(archiver.repository_path) as repository:
|
||||
key = msgpack.unpackb(a2b_base64(repository.load_key()))
|
||||
key = msgpack.unpackb(binascii.a2b_base64(repository.load_key()))
|
||||
assert key["algorithm"] == "argon2 chacha20-poly1305"
|
||||
|
||||
|
||||
|
@ -309,7 +309,7 @@ def test_change_passphrase_does_not_change_algorithm_argon2(archivers, request):
|
|||
cmd(archiver, "key", "change-passphrase")
|
||||
|
||||
with Repository(archiver.repository_path) as repository:
|
||||
key = msgpack.unpackb(a2b_base64(repository.load_key()))
|
||||
key = msgpack.unpackb(binascii.a2b_base64(repository.load_key()))
|
||||
assert key["algorithm"] == "argon2 chacha20-poly1305"
|
||||
|
||||
|
||||
|
@ -319,5 +319,5 @@ def test_change_location_does_not_change_algorithm_argon2(archivers, request):
|
|||
cmd(archiver, "key", "change-location", "repokey")
|
||||
|
||||
with Repository(archiver.repository_path) as repository:
|
||||
key = msgpack.unpackb(a2b_base64(repository.load_key()))
|
||||
key = msgpack.unpackb(binascii.a2b_base64(repository.load_key()))
|
||||
assert key["algorithm"] == "argon2 chacha20-poly1305"
|
||||
|
|
|
@ -1,10 +1,10 @@
|
|||
import os
|
||||
from binascii import hexlify
|
||||
|
||||
from ...constants import * # NOQA
|
||||
from ...repository import Repository
|
||||
from ...manifest import Manifest
|
||||
from ...compress import ZSTD, ZLIB, LZ4, CNONE
|
||||
from ...helpers import bin_to_hex
|
||||
|
||||
from . import create_regular_file, cmd, RK_ENCRYPTION
|
||||
|
||||
|
@ -27,15 +27,7 @@ def check_compression(ctype, clevel, olevel):
|
|||
) # will also decompress according to metadata
|
||||
m_olevel = meta.get("olevel", -1)
|
||||
m_psize = meta.get("psize", -1)
|
||||
print(
|
||||
hexlify(id).decode(),
|
||||
meta["ctype"],
|
||||
meta["clevel"],
|
||||
meta["csize"],
|
||||
meta["size"],
|
||||
m_olevel,
|
||||
m_psize,
|
||||
)
|
||||
print(bin_to_hex(id), meta["ctype"], meta["clevel"], meta["csize"], meta["size"], m_olevel, m_psize)
|
||||
# this is not as easy as one thinks due to the DecidingCompressor choosing the smallest of
|
||||
# (desired compressed, lz4 compressed, not compressed).
|
||||
assert meta["ctype"] in (ctype, LZ4.ID, CNONE.ID)
|
||||
|
|
|
@ -1,7 +1,5 @@
|
|||
from binascii import unhexlify
|
||||
|
||||
from .. import checksums
|
||||
from ..helpers import bin_to_hex
|
||||
from ..helpers import bin_to_hex, hex_to_bin
|
||||
|
||||
|
||||
def test_xxh64():
|
||||
|
@ -10,7 +8,7 @@ def test_xxh64():
|
|||
assert (
|
||||
bin_to_hex(
|
||||
checksums.xxh64(
|
||||
unhexlify(
|
||||
hex_to_bin(
|
||||
"6f663f01c118abdea553373d5eae44e7dac3b6829b46b9bbeff202b6c592c22d724"
|
||||
"fb3d25a347cca6c5b8f20d567e4bb04b9cfa85d17f691590f9a9d32e8ccc9102e9d"
|
||||
"cf8a7e6716280cd642ce48d03fdf114c9f57c20d9472bb0f81c147645e6fa3d331"
|
||||
|
|
|
@ -1,10 +1,10 @@
|
|||
from io import BytesIO
|
||||
from binascii import unhexlify
|
||||
|
||||
from .chunker import cf
|
||||
from ..chunker import Chunker
|
||||
from ..crypto.low_level import blake2b_256
|
||||
from ..constants import * # NOQA
|
||||
from ..helpers import hex_to_bin
|
||||
|
||||
|
||||
def test_chunkpoints_unchanged():
|
||||
|
@ -34,4 +34,4 @@ def twist(size):
|
|||
# The "correct" hash below matches the existing chunker behavior.
|
||||
# Future chunker optimisations must not change this, or existing repos will bloat.
|
||||
overall_hash = blake2b_256(b"", b"".join(runs))
|
||||
assert overall_hash == unhexlify("b559b0ac8df8daaa221201d018815114241ea5c6609d98913cd2246a702af4e3")
|
||||
assert overall_hash == hex_to_bin("b559b0ac8df8daaa221201d018815114241ea5c6609d98913cd2246a702af4e3")
|
||||
|
|
|
@ -1,7 +1,6 @@
|
|||
# Note: these tests are part of the self test, do not use or import pytest functionality here.
|
||||
# See borg.selftest for details. If you add/remove test methods, update SELFTEST_COUNT
|
||||
|
||||
from binascii import hexlify
|
||||
from unittest.mock import MagicMock
|
||||
import unittest
|
||||
|
||||
|
@ -9,7 +8,7 @@
|
|||
from ..crypto.low_level import bytes_to_long, bytes_to_int, long_to_bytes
|
||||
from ..crypto.low_level import AES, hmac_sha256
|
||||
from ..crypto.key import CHPOKeyfileKey, AESOCBRepoKey, FlexiKey
|
||||
from ..helpers import msgpack
|
||||
from ..helpers import msgpack, bin_to_hex
|
||||
|
||||
from . import BaseTestCase
|
||||
|
||||
|
@ -46,10 +45,10 @@ def test_AES256_CTR_HMAC_SHA256(self):
|
|||
mac = hdr_mac_iv_cdata[1:33]
|
||||
iv = hdr_mac_iv_cdata[33:41]
|
||||
cdata = hdr_mac_iv_cdata[41:]
|
||||
self.assert_equal(hexlify(hdr), b"42")
|
||||
self.assert_equal(hexlify(mac), b"af90b488b0cc4a8f768fe2d6814fa65aec66b148135e54f7d4d29a27f22f57a8")
|
||||
self.assert_equal(hexlify(iv), b"0000000000000000")
|
||||
self.assert_equal(hexlify(cdata), b"c6efb702de12498f34a2c2bbc8149e759996d08bf6dc5c610aefc0c3a466")
|
||||
self.assert_equal(bin_to_hex(hdr), "42")
|
||||
self.assert_equal(bin_to_hex(mac), "af90b488b0cc4a8f768fe2d6814fa65aec66b148135e54f7d4d29a27f22f57a8")
|
||||
self.assert_equal(bin_to_hex(iv), "0000000000000000")
|
||||
self.assert_equal(bin_to_hex(cdata), "c6efb702de12498f34a2c2bbc8149e759996d08bf6dc5c610aefc0c3a466")
|
||||
self.assert_equal(cs.next_iv(), 2)
|
||||
# auth-then-decrypt
|
||||
cs = AES256_CTR_HMAC_SHA256(mac_key, enc_key, header_len=len(header), aad_offset=1)
|
||||
|
@ -74,10 +73,10 @@ def test_AES256_CTR_HMAC_SHA256_aad(self):
|
|||
mac = hdr_mac_iv_cdata[3:35]
|
||||
iv = hdr_mac_iv_cdata[35:43]
|
||||
cdata = hdr_mac_iv_cdata[43:]
|
||||
self.assert_equal(hexlify(hdr), b"123456")
|
||||
self.assert_equal(hexlify(mac), b"7659a915d9927072ef130258052351a17ef882692893c3850dd798c03d2dd138")
|
||||
self.assert_equal(hexlify(iv), b"0000000000000000")
|
||||
self.assert_equal(hexlify(cdata), b"c6efb702de12498f34a2c2bbc8149e759996d08bf6dc5c610aefc0c3a466")
|
||||
self.assert_equal(bin_to_hex(hdr), "123456")
|
||||
self.assert_equal(bin_to_hex(mac), "7659a915d9927072ef130258052351a17ef882692893c3850dd798c03d2dd138")
|
||||
self.assert_equal(bin_to_hex(iv), "0000000000000000")
|
||||
self.assert_equal(bin_to_hex(cdata), "c6efb702de12498f34a2c2bbc8149e759996d08bf6dc5c610aefc0c3a466")
|
||||
self.assert_equal(cs.next_iv(), 2)
|
||||
# auth-then-decrypt
|
||||
cs = AES256_CTR_HMAC_SHA256(mac_key, enc_key, header_len=len(header), aad_offset=1)
|
||||
|
@ -99,13 +98,13 @@ def test_AE(self):
|
|||
# (ciphersuite class, exp_mac, exp_cdata)
|
||||
(
|
||||
AES256_OCB,
|
||||
b"b6909c23c9aaebd9abbe1ff42097652d",
|
||||
b"877ce46d2f62dee54699cebc3ba41d9ab613f7c486778c1b3636664b1493",
|
||||
"b6909c23c9aaebd9abbe1ff42097652d",
|
||||
"877ce46d2f62dee54699cebc3ba41d9ab613f7c486778c1b3636664b1493",
|
||||
),
|
||||
(
|
||||
CHACHA20_POLY1305,
|
||||
b"fd08594796e0706cde1e8b461e3e0555",
|
||||
b"a093e4b0387526f085d3c40cca84a35230a5c0dd766453b77ba38bcff775",
|
||||
"fd08594796e0706cde1e8b461e3e0555",
|
||||
"a093e4b0387526f085d3c40cca84a35230a5c0dd766453b77ba38bcff775",
|
||||
),
|
||||
]
|
||||
for cs_cls, exp_mac, exp_cdata in tests:
|
||||
|
@ -117,10 +116,10 @@ def test_AE(self):
|
|||
iv = hdr_mac_iv_cdata[1:13]
|
||||
mac = hdr_mac_iv_cdata[13:29]
|
||||
cdata = hdr_mac_iv_cdata[29:]
|
||||
self.assert_equal(hexlify(hdr), b"23")
|
||||
self.assert_equal(hexlify(mac), exp_mac)
|
||||
self.assert_equal(hexlify(iv), b"000000000000000000000000")
|
||||
self.assert_equal(hexlify(cdata), exp_cdata)
|
||||
self.assert_equal(bin_to_hex(hdr), "23")
|
||||
self.assert_equal(bin_to_hex(mac), exp_mac)
|
||||
self.assert_equal(bin_to_hex(iv), "000000000000000000000000")
|
||||
self.assert_equal(bin_to_hex(cdata), exp_cdata)
|
||||
self.assert_equal(cs.next_iv(), 1)
|
||||
# auth/decrypt
|
||||
cs = cs_cls(key, iv_int, header_len=len(header), aad_offset=1)
|
||||
|
@ -142,13 +141,13 @@ def test_AEAD(self):
|
|||
# (ciphersuite class, exp_mac, exp_cdata)
|
||||
(
|
||||
AES256_OCB,
|
||||
b"f2748c412af1c7ead81863a18c2c1893",
|
||||
b"877ce46d2f62dee54699cebc3ba41d9ab613f7c486778c1b3636664b1493",
|
||||
"f2748c412af1c7ead81863a18c2c1893",
|
||||
"877ce46d2f62dee54699cebc3ba41d9ab613f7c486778c1b3636664b1493",
|
||||
),
|
||||
(
|
||||
CHACHA20_POLY1305,
|
||||
b"b7e7c9a79f2404e14f9aad156bf091dd",
|
||||
b"a093e4b0387526f085d3c40cca84a35230a5c0dd766453b77ba38bcff775",
|
||||
"b7e7c9a79f2404e14f9aad156bf091dd",
|
||||
"a093e4b0387526f085d3c40cca84a35230a5c0dd766453b77ba38bcff775",
|
||||
),
|
||||
]
|
||||
for cs_cls, exp_mac, exp_cdata in tests:
|
||||
|
@ -160,10 +159,10 @@ def test_AEAD(self):
|
|||
iv = hdr_mac_iv_cdata[3:15]
|
||||
mac = hdr_mac_iv_cdata[15:31]
|
||||
cdata = hdr_mac_iv_cdata[31:]
|
||||
self.assert_equal(hexlify(hdr), b"123456")
|
||||
self.assert_equal(hexlify(mac), exp_mac)
|
||||
self.assert_equal(hexlify(iv), b"000000000000000000000000")
|
||||
self.assert_equal(hexlify(cdata), exp_cdata)
|
||||
self.assert_equal(bin_to_hex(hdr), "123456")
|
||||
self.assert_equal(bin_to_hex(mac), exp_mac)
|
||||
self.assert_equal(bin_to_hex(iv), "000000000000000000000000")
|
||||
self.assert_equal(bin_to_hex(cdata), exp_cdata)
|
||||
self.assert_equal(cs.next_iv(), 1)
|
||||
# auth/decrypt
|
||||
cs = cs_cls(key, iv_int, header_len=len(header), aad_offset=1)
|
||||
|
|
|
@ -1,10 +1,9 @@
|
|||
import tempfile
|
||||
from binascii import hexlify, unhexlify, a2b_base64
|
||||
from binascii import a2b_base64
|
||||
from unittest.mock import MagicMock
|
||||
|
||||
import pytest
|
||||
|
||||
from ..crypto.key import bin_to_hex
|
||||
from ..crypto.key import PlaintextKey, AuthenticatedKey, Blake2AuthenticatedKey
|
||||
from ..crypto.key import RepoKey, KeyfileKey, Blake2RepoKey, Blake2KeyfileKey
|
||||
from ..crypto.key import AEADKeyBase
|
||||
|
@ -18,6 +17,7 @@
|
|||
from ..helpers import Location
|
||||
from ..helpers import msgpack
|
||||
from ..constants import KEY_ALGORITHMS
|
||||
from ..helpers import hex_to_bin, bin_to_hex
|
||||
|
||||
|
||||
class TestKey:
|
||||
|
@ -35,10 +35,10 @@ class MockArgs:
|
|||
F84MsMMiqpbz4KVICeBZhfAaTPs4W7BC63qml0ZXJhdGlvbnPOAAGGoKRzYWx02gAgLENQ
|
||||
2uVCoR7EnAoiRzn8J+orbojKtJlNCnQ31SSC8rendmVyc2lvbgE=""".strip()
|
||||
|
||||
keyfile2_cdata = bytes.fromhex(
|
||||
keyfile2_cdata = hex_to_bin(
|
||||
"003be7d57280d1a42add9f3f36ea363bbc5e9349ad01ddec0634a54dd02959e70500000000000003ec063d2cbcacba6b"
|
||||
)
|
||||
keyfile2_id = unhexlify("c3fbf14bc001ebcc3cd86e696c13482ed071740927cd7cbe1b01b4bfcee49314")
|
||||
keyfile2_id = hex_to_bin("c3fbf14bc001ebcc3cd86e696c13482ed071740927cd7cbe1b01b4bfcee49314")
|
||||
|
||||
keyfile_blake2_key_file = """
|
||||
BORG_KEY 0000000000000000000000000000000000000000000000000000000000000000
|
||||
|
@ -54,7 +54,7 @@ class MockArgs:
|
|||
UTHFJg343jqml0ZXJhdGlvbnPOAAGGoKRzYWx02gAgz3YaUZZ/s+UWywj97EY5b4KhtJYi
|
||||
qkPqtDDxs2j/T7+ndmVyc2lvbgE=""".strip()
|
||||
|
||||
keyfile_blake2_cdata = bytes.fromhex(
|
||||
keyfile_blake2_cdata = hex_to_bin(
|
||||
"04d6040f5ef80e0a8ac92badcbe3dee83b7a6b53d5c9a58c4eed14964cb10ef591040404040404040d1e65cc1f435027"
|
||||
)
|
||||
# Verified against b2sum. Entire string passed to BLAKE2, including the padded 64 byte key contained in
|
||||
|
@ -64,7 +64,7 @@ class MockArgs:
|
|||
# 000000000000000000000000000000000000000000000000000000000000000000000000000000
|
||||
# 00000000000000000000007061796c6f6164
|
||||
# p a y l o a d
|
||||
keyfile_blake2_id = bytes.fromhex("d8bc68e961c79f99be39061589e5179b2113cd9226e07b08ddd4a1fef7ce93fb")
|
||||
keyfile_blake2_id = hex_to_bin("d8bc68e961c79f99be39061589e5179b2113cd9226e07b08ddd4a1fef7ce93fb")
|
||||
|
||||
@pytest.fixture
|
||||
def keys_dir(self, request, monkeypatch, tmpdir):
|
||||
|
@ -119,7 +119,7 @@ def test_plaintext(self):
|
|||
key = PlaintextKey.create(None, None)
|
||||
chunk = b"foo"
|
||||
id = key.id_hash(chunk)
|
||||
assert hexlify(id) == b"2c26b46b68ffc68ff99b453c1d30413413422d706483bfa0f98a5e886266e7ae"
|
||||
assert bin_to_hex(id) == "2c26b46b68ffc68ff99b453c1d30413413422d706483bfa0f98a5e886266e7ae"
|
||||
assert chunk == key.decrypt(id, key.encrypt(id, chunk))
|
||||
|
||||
def test_keyfile(self, monkeypatch, keys_dir):
|
||||
|
|
Loading…
Reference in a new issue