mirror of https://github.com/borgbackup/borg.git
197 lines
8.4 KiB
Python
197 lines
8.4 KiB
Python
from struct import Struct
|
|
|
|
from .constants import * # NOQA
|
|
from .helpers import msgpack, workarounds
|
|
from .helpers.errors import IntegrityError
|
|
from .compress import Compressor, LZ4_COMPRESSOR, get_compressor
|
|
|
|
# workaround for lost passphrase or key in "authenticated" or "authenticated-blake2" mode
|
|
AUTHENTICATED_NO_KEY = "authenticated_no_key" in workarounds
|
|
|
|
|
|
class RepoObj:
|
|
meta_len_hdr = Struct("<H") # 16bit unsigned int
|
|
|
|
@classmethod
|
|
def extract_crypted_data(cls, data: bytes) -> bytes:
|
|
# used for crypto type detection
|
|
offs = cls.meta_len_hdr.size
|
|
meta_len = cls.meta_len_hdr.unpack(data[:offs])[0]
|
|
return data[offs + meta_len :]
|
|
|
|
def __init__(self, key):
|
|
self.key = key
|
|
# Some commands write new chunks (e.g. rename) but don't take a --compression argument. This duplicates
|
|
# the default used by those commands who do take a --compression argument.
|
|
self.compressor = LZ4_COMPRESSOR
|
|
|
|
def id_hash(self, data: bytes) -> bytes:
|
|
return self.key.id_hash(data)
|
|
|
|
def format(
|
|
self,
|
|
id: bytes,
|
|
meta: dict,
|
|
data: bytes,
|
|
compress: bool = True,
|
|
size: int = None,
|
|
ctype: int = None,
|
|
clevel: int = None,
|
|
ro_type: str = None,
|
|
) -> bytes:
|
|
assert isinstance(ro_type, str)
|
|
assert ro_type != ROBJ_DONTCARE
|
|
meta["type"] = ro_type
|
|
assert isinstance(id, bytes)
|
|
assert isinstance(meta, dict)
|
|
assert isinstance(data, (bytes, memoryview))
|
|
assert compress or size is not None and ctype is not None and clevel is not None
|
|
if compress:
|
|
assert size is None or size == len(data)
|
|
meta, data_compressed = self.compressor.compress(meta, data)
|
|
else:
|
|
assert isinstance(size, int)
|
|
meta["size"] = size
|
|
assert isinstance(ctype, int)
|
|
meta["ctype"] = ctype
|
|
assert isinstance(clevel, int)
|
|
meta["clevel"] = clevel
|
|
data_compressed = data # is already compressed, is NOT prefixed by type/level bytes
|
|
meta["csize"] = len(data_compressed)
|
|
data_encrypted = self.key.encrypt(id, data_compressed)
|
|
meta_packed = msgpack.packb(meta)
|
|
meta_encrypted = self.key.encrypt(id, meta_packed)
|
|
hdr = self.meta_len_hdr.pack(len(meta_encrypted))
|
|
return hdr + meta_encrypted + data_encrypted
|
|
|
|
def parse_meta(self, id: bytes, cdata: bytes, ro_type: str) -> dict:
|
|
# when calling parse_meta, enough cdata needs to be supplied to contain completely the
|
|
# meta_len_hdr and the encrypted, packed metadata. it is allowed to provide more cdata.
|
|
assert isinstance(id, bytes)
|
|
assert isinstance(cdata, bytes)
|
|
assert isinstance(ro_type, str)
|
|
obj = memoryview(cdata)
|
|
offs = self.meta_len_hdr.size
|
|
hdr = obj[:offs]
|
|
len_meta_encrypted = self.meta_len_hdr.unpack(hdr)[0]
|
|
assert offs + len_meta_encrypted <= len(obj)
|
|
meta_encrypted = obj[offs : offs + len_meta_encrypted]
|
|
meta_packed = self.key.decrypt(id, meta_encrypted)
|
|
meta = msgpack.unpackb(meta_packed)
|
|
if ro_type != ROBJ_DONTCARE and meta["type"] != ro_type:
|
|
raise IntegrityError(f"ro_type expected: {ro_type} got: {meta['type']}")
|
|
return meta
|
|
|
|
def parse(
|
|
self, id: bytes, cdata: bytes, decompress: bool = True, want_compressed: bool = False, ro_type: str = None
|
|
) -> tuple[dict, bytes]:
|
|
"""
|
|
Parse a repo object into metadata and data (decrypt it, maybe decompress, maybe verify if the chunk plaintext
|
|
corresponds to the chunk id via assert_id()).
|
|
|
|
Tweaking options (default is usually fine):
|
|
- decompress=True, want_compressed=False: slow, verifying. returns decompressed data (default).
|
|
- decompress=True, want_compressed=True: slow, verifying. returns compressed data (caller wants to reuse it).
|
|
- decompress=False, want_compressed=True: quick, not verifying. returns compressed data (caller wants to reuse).
|
|
- decompress=False, want_compressed=False: invalid
|
|
"""
|
|
assert isinstance(ro_type, str)
|
|
assert not (not decompress and not want_compressed), "invalid parameter combination!"
|
|
assert isinstance(id, bytes)
|
|
assert isinstance(cdata, bytes)
|
|
obj = memoryview(cdata)
|
|
offs = self.meta_len_hdr.size
|
|
hdr = obj[:offs]
|
|
len_meta_encrypted = self.meta_len_hdr.unpack(hdr)[0]
|
|
assert offs + len_meta_encrypted <= len(obj)
|
|
meta_encrypted = obj[offs : offs + len_meta_encrypted]
|
|
offs += len_meta_encrypted
|
|
meta_packed = self.key.decrypt(id, meta_encrypted)
|
|
meta_compressed = msgpack.unpackb(meta_packed) # means: before adding more metadata in decompress block
|
|
if ro_type != ROBJ_DONTCARE and meta_compressed["type"] != ro_type:
|
|
raise IntegrityError(f"ro_type expected: {ro_type} got: {meta_compressed['type']}")
|
|
data_encrypted = obj[offs:]
|
|
data_compressed = self.key.decrypt(id, data_encrypted) # does not include the type/level bytes
|
|
if decompress:
|
|
ctype = meta_compressed["ctype"]
|
|
clevel = meta_compressed["clevel"]
|
|
csize = meta_compressed["csize"] # always the overall size
|
|
assert csize == len(data_compressed)
|
|
psize = meta_compressed.get(
|
|
"psize", csize
|
|
) # obfuscation: psize (payload size) is potentially less than csize.
|
|
assert psize <= csize
|
|
compr_hdr = bytes((ctype, clevel))
|
|
compressor_cls, compression_level = Compressor.detect(compr_hdr)
|
|
compressor = compressor_cls(level=compression_level)
|
|
meta, data = compressor.decompress(dict(meta_compressed), data_compressed[:psize])
|
|
if not AUTHENTICATED_NO_KEY:
|
|
self.key.assert_id(id, data)
|
|
else:
|
|
meta, data = None, None
|
|
return meta_compressed if want_compressed else meta, data_compressed if want_compressed else data
|
|
|
|
|
|
class RepoObj1: # legacy
|
|
@classmethod
|
|
def extract_crypted_data(cls, data: bytes) -> bytes:
|
|
# used for crypto type detection
|
|
return data
|
|
|
|
def __init__(self, key):
|
|
self.key = key
|
|
self.compressor = get_compressor("lz4", legacy_mode=True)
|
|
|
|
def id_hash(self, data: bytes) -> bytes:
|
|
return self.key.id_hash(data)
|
|
|
|
def format(
|
|
self,
|
|
id: bytes,
|
|
meta: dict,
|
|
data: bytes,
|
|
compress: bool = True,
|
|
size: int = None,
|
|
ctype: int = None,
|
|
clevel: int = None,
|
|
ro_type: str = None,
|
|
) -> bytes:
|
|
assert isinstance(id, bytes)
|
|
assert meta == {}
|
|
assert isinstance(data, (bytes, memoryview))
|
|
assert ro_type is not None
|
|
assert compress or size is not None and ctype is not None and clevel is not None
|
|
if compress:
|
|
assert size is None or size == len(data)
|
|
meta, data_compressed = self.compressor.compress(meta, data)
|
|
else:
|
|
assert isinstance(size, int)
|
|
data_compressed = data # is already compressed, must include type/level bytes
|
|
data_encrypted = self.key.encrypt(id, data_compressed)
|
|
return data_encrypted
|
|
|
|
def parse_meta(self, id: bytes, cdata: bytes) -> dict:
|
|
raise NotImplementedError("parse_meta is not available for RepoObj1")
|
|
|
|
def parse(
|
|
self, id: bytes, cdata: bytes, decompress: bool = True, want_compressed: bool = False, ro_type: str = None
|
|
) -> tuple[dict, bytes]:
|
|
assert not (not decompress and not want_compressed), "invalid parameter combination!"
|
|
assert isinstance(id, bytes)
|
|
assert isinstance(cdata, bytes)
|
|
assert ro_type is not None
|
|
data_compressed = self.key.decrypt(id, cdata)
|
|
compressor_cls, compression_level = Compressor.detect(data_compressed[:2])
|
|
compressor = compressor_cls(level=compression_level, legacy_mode=True)
|
|
meta_compressed = {}
|
|
meta_compressed["ctype"] = compressor.ID
|
|
meta_compressed["clevel"] = compressor.level
|
|
meta_compressed["csize"] = len(data_compressed)
|
|
if decompress:
|
|
meta, data = compressor.decompress(None, data_compressed)
|
|
if not AUTHENTICATED_NO_KEY:
|
|
self.key.assert_id(id, data)
|
|
else:
|
|
meta, data = None, None
|
|
return meta_compressed if want_compressed else meta, data_compressed if want_compressed else data
|