from struct import Struct from .constants import * # NOQA from .helpers import msgpack, workarounds from .helpers.errors import IntegrityError from .compress import Compressor, LZ4_COMPRESSOR, get_compressor # workaround for lost passphrase or key in "authenticated" or "authenticated-blake2" mode AUTHENTICATED_NO_KEY = "authenticated_no_key" in workarounds class RepoObj: meta_len_hdr = Struct(" bytes: # used for crypto type detection offs = cls.meta_len_hdr.size meta_len = cls.meta_len_hdr.unpack(data[:offs])[0] return data[offs + meta_len :] def __init__(self, key): self.key = key # Some commands write new chunks (e.g. rename) but don't take a --compression argument. This duplicates # the default used by those commands who do take a --compression argument. self.compressor = LZ4_COMPRESSOR def id_hash(self, data: bytes) -> bytes: return self.key.id_hash(data) def format( self, id: bytes, meta: dict, data: bytes, compress: bool = True, size: int = None, ctype: int = None, clevel: int = None, ro_type: str = None, ) -> bytes: assert isinstance(ro_type, str) assert ro_type != ROBJ_DONTCARE meta["type"] = ro_type assert isinstance(id, bytes) assert isinstance(meta, dict) assert isinstance(data, (bytes, memoryview)) assert compress or size is not None and ctype is not None and clevel is not None if compress: assert size is None or size == len(data) meta, data_compressed = self.compressor.compress(meta, data) else: assert isinstance(size, int) meta["size"] = size assert isinstance(ctype, int) meta["ctype"] = ctype assert isinstance(clevel, int) meta["clevel"] = clevel data_compressed = data # is already compressed, is NOT prefixed by type/level bytes meta["csize"] = len(data_compressed) data_encrypted = self.key.encrypt(id, data_compressed) meta_packed = msgpack.packb(meta) meta_encrypted = self.key.encrypt(id, meta_packed) hdr = self.meta_len_hdr.pack(len(meta_encrypted)) return hdr + meta_encrypted + data_encrypted def parse_meta(self, id: bytes, cdata: bytes, ro_type: str) -> dict: # when calling parse_meta, enough cdata needs to be supplied to contain completely the # meta_len_hdr and the encrypted, packed metadata. it is allowed to provide more cdata. assert isinstance(id, bytes) assert isinstance(cdata, bytes) assert isinstance(ro_type, str) obj = memoryview(cdata) offs = self.meta_len_hdr.size hdr = obj[:offs] len_meta_encrypted = self.meta_len_hdr.unpack(hdr)[0] assert offs + len_meta_encrypted <= len(obj) meta_encrypted = obj[offs : offs + len_meta_encrypted] meta_packed = self.key.decrypt(id, meta_encrypted) meta = msgpack.unpackb(meta_packed) if ro_type != ROBJ_DONTCARE and meta["type"] != ro_type: raise IntegrityError(f"ro_type expected: {ro_type} got: {meta['type']}") return meta def parse( self, id: bytes, cdata: bytes, decompress: bool = True, want_compressed: bool = False, ro_type: str = None ) -> tuple[dict, bytes]: """ Parse a repo object into metadata and data (decrypt it, maybe decompress, maybe verify if the chunk plaintext corresponds to the chunk id via assert_id()). Tweaking options (default is usually fine): - decompress=True, want_compressed=False: slow, verifying. returns decompressed data (default). - decompress=True, want_compressed=True: slow, verifying. returns compressed data (caller wants to reuse it). - decompress=False, want_compressed=True: quick, not verifying. returns compressed data (caller wants to reuse). - decompress=False, want_compressed=False: invalid """ assert isinstance(ro_type, str) assert not (not decompress and not want_compressed), "invalid parameter combination!" assert isinstance(id, bytes) assert isinstance(cdata, bytes) obj = memoryview(cdata) offs = self.meta_len_hdr.size hdr = obj[:offs] len_meta_encrypted = self.meta_len_hdr.unpack(hdr)[0] assert offs + len_meta_encrypted <= len(obj) meta_encrypted = obj[offs : offs + len_meta_encrypted] offs += len_meta_encrypted meta_packed = self.key.decrypt(id, meta_encrypted) meta_compressed = msgpack.unpackb(meta_packed) # means: before adding more metadata in decompress block if ro_type != ROBJ_DONTCARE and meta_compressed["type"] != ro_type: raise IntegrityError(f"ro_type expected: {ro_type} got: {meta_compressed['type']}") data_encrypted = obj[offs:] data_compressed = self.key.decrypt(id, data_encrypted) # does not include the type/level bytes if decompress: ctype = meta_compressed["ctype"] clevel = meta_compressed["clevel"] csize = meta_compressed["csize"] # always the overall size assert csize == len(data_compressed) psize = meta_compressed.get( "psize", csize ) # obfuscation: psize (payload size) is potentially less than csize. assert psize <= csize compr_hdr = bytes((ctype, clevel)) compressor_cls, compression_level = Compressor.detect(compr_hdr) compressor = compressor_cls(level=compression_level) meta, data = compressor.decompress(dict(meta_compressed), data_compressed[:psize]) if not AUTHENTICATED_NO_KEY: self.key.assert_id(id, data) else: meta, data = None, None return meta_compressed if want_compressed else meta, data_compressed if want_compressed else data class RepoObj1: # legacy @classmethod def extract_crypted_data(cls, data: bytes) -> bytes: # used for crypto type detection return data def __init__(self, key): self.key = key self.compressor = get_compressor("lz4", legacy_mode=True) def id_hash(self, data: bytes) -> bytes: return self.key.id_hash(data) def format( self, id: bytes, meta: dict, data: bytes, compress: bool = True, size: int = None, ctype: int = None, clevel: int = None, ro_type: str = None, ) -> bytes: assert isinstance(id, bytes) assert meta == {} assert isinstance(data, (bytes, memoryview)) assert ro_type is not None assert compress or size is not None and ctype is not None and clevel is not None if compress: assert size is None or size == len(data) meta, data_compressed = self.compressor.compress(meta, data) else: assert isinstance(size, int) data_compressed = data # is already compressed, must include type/level bytes data_encrypted = self.key.encrypt(id, data_compressed) return data_encrypted def parse_meta(self, id: bytes, cdata: bytes) -> dict: raise NotImplementedError("parse_meta is not available for RepoObj1") def parse( self, id: bytes, cdata: bytes, decompress: bool = True, want_compressed: bool = False, ro_type: str = None ) -> tuple[dict, bytes]: assert not (not decompress and not want_compressed), "invalid parameter combination!" assert isinstance(id, bytes) assert isinstance(cdata, bytes) assert ro_type is not None data_compressed = self.key.decrypt(id, cdata) compressor_cls, compression_level = Compressor.detect(data_compressed[:2]) compressor = compressor_cls(level=compression_level, legacy_mode=True) meta_compressed = {} meta_compressed["ctype"] = compressor.ID meta_compressed["clevel"] = compressor.level meta_compressed["csize"] = len(data_compressed) if decompress: meta, data = compressor.decompress(None, data_compressed) if not AUTHENTICATED_NO_KEY: self.key.assert_id(id, data) else: meta, data = None, None return meta_compressed if want_compressed else meta, data_compressed if want_compressed else data