1
0
Fork 0
mirror of https://github.com/borgbackup/borg.git synced 2025-01-04 06:21:46 +00:00

transfer: verify chunks we get using assert_id, fixes #7383

this needs to decompress and to hash the chunk data,
but better let's play safe.

at least we still can avoid the (re-)compression with
borg transfer (which is often much more expensive
than decompression).
This commit is contained in:
Thomas Waldmann 2023-03-24 21:58:43 +01:00
parent 8dc52b23db
commit 3d65cb3fea
No known key found for this signature in database
GPG key ID: 243ACFA951F78E01
3 changed files with 43 additions and 20 deletions

View file

@ -106,8 +106,11 @@ def do_transfer(self, args, *, repository, manifest, cache, other_repository=Non
if refcount == 0: # target repo does not yet have this chunk
if not dry_run:
cdata = other_repository.get(chunk_id)
# keep compressed payload same, avoid decompression / recompression
meta, data = other_manifest.repo_objs.parse(chunk_id, cdata, decompress=False)
# keep compressed payload same, verify via assert_id (that will
# decompress, but avoid needing to compress it again):
meta, data = other_manifest.repo_objs.parse(
chunk_id, cdata, decompress=True, want_compressed=True
)
meta, data = upgrader.upgrade_compressed_chunk(meta, data)
chunk_entry = cache.add_chunk(
chunk_id,

View file

@ -70,7 +70,20 @@ def parse_meta(self, id: bytes, cdata: bytes) -> dict:
meta = msgpack.unpackb(meta_packed)
return meta
def parse(self, id: bytes, cdata: bytes, decompress: bool = True) -> tuple[dict, bytes]:
def parse(
self, id: bytes, cdata: bytes, decompress: bool = True, want_compressed: bool = False
) -> tuple[dict, bytes]:
"""
Parse a repo object into metadata and data (decrypt it, maybe decompress, maybe verify if the chunk plaintext
corresponds to the chunk id via assert_id()).
Tweaking options (default is usually fine):
- decompress=True, want_compressed=False: slow, verifying. returns decompressed data (default).
- decompress=True, want_compressed=True: slow, verifying. returns compressed data (caller wants to reuse it).
- decompress=False, want_compressed=True: quick, not verifying. returns compressed data (caller wants to reuse).
- decompress=False, want_compressed=False: invalid
"""
assert not (not decompress and not want_compressed), "invalid parameter combination!"
assert isinstance(id, bytes)
assert isinstance(cdata, bytes)
obj = memoryview(cdata)
@ -81,24 +94,26 @@ def parse(self, id: bytes, cdata: bytes, decompress: bool = True) -> tuple[dict,
meta_encrypted = obj[offs : offs + len_meta_encrypted]
offs += len_meta_encrypted
meta_packed = self.key.decrypt(id, meta_encrypted)
meta = msgpack.unpackb(meta_packed)
meta_compressed = msgpack.unpackb(meta_packed) # means: before adding more metadata in decompress block
data_encrypted = obj[offs:]
data_compressed = self.key.decrypt(id, data_encrypted)
data_compressed = self.key.decrypt(id, data_encrypted) # does not include the type/level bytes
if decompress:
ctype = meta["ctype"]
clevel = meta["clevel"]
csize = meta["csize"] # always the overall size
ctype = meta_compressed["ctype"]
clevel = meta_compressed["clevel"]
csize = meta_compressed["csize"] # always the overall size
assert csize == len(data_compressed)
psize = meta.get("psize", csize) # obfuscation: psize (payload size) is potentially less than csize.
psize = meta_compressed.get(
"psize", csize
) # obfuscation: psize (payload size) is potentially less than csize.
assert psize <= csize
compr_hdr = bytes((ctype, clevel))
compressor_cls, compression_level = Compressor.detect(compr_hdr)
compressor = compressor_cls(level=compression_level)
meta, data = compressor.decompress(meta, data_compressed[:psize])
meta, data = compressor.decompress(dict(meta_compressed), data_compressed[:psize])
self.key.assert_id(id, data)
else:
data = data_compressed # does not include the type/level bytes
return meta, data
meta, data = None, None
return meta_compressed if want_compressed else meta, data_compressed if want_compressed else data
class RepoObj1: # legacy
@ -140,19 +155,22 @@ def format(
def parse_meta(self, id: bytes, cdata: bytes) -> dict:
raise NotImplementedError("parse_meta is not available for RepoObj1")
def parse(self, id: bytes, cdata: bytes, decompress: bool = True) -> tuple[dict, bytes]:
def parse(
self, id: bytes, cdata: bytes, decompress: bool = True, want_compressed: bool = False
) -> tuple[dict, bytes]:
assert not (not decompress and not want_compressed), "invalid parameter combination!"
assert isinstance(id, bytes)
assert isinstance(cdata, bytes)
data_compressed = self.key.decrypt(id, cdata)
compressor_cls, compression_level = Compressor.detect(data_compressed[:2])
compressor = compressor_cls(level=compression_level, legacy_mode=True)
meta_compressed = {}
meta_compressed["ctype"] = compressor.ID
meta_compressed["clevel"] = compressor.level
meta_compressed["csize"] = len(data_compressed)
if decompress:
meta, data = compressor.decompress(None, data_compressed)
self.key.assert_id(id, data)
else:
meta = {}
meta["ctype"] = compressor.ID
meta["clevel"] = compressor.level
meta["csize"] = len(data_compressed)
data = data_compressed
return meta, data
meta, data = None, None
return meta_compressed if want_compressed else meta, data_compressed if want_compressed else data

View file

@ -68,7 +68,9 @@ def test_borg1_borg2_transition(key):
repo_objs1 = RepoObj1(key)
id = repo_objs1.id_hash(data)
borg1_cdata = repo_objs1.format(id, meta, data)
meta1, compr_data1 = repo_objs1.parse(id, borg1_cdata, decompress=False) # borg transfer avoids (de)compression
meta1, compr_data1 = repo_objs1.parse(
id, borg1_cdata, decompress=True, want_compressed=True
) # avoid re-compression
# in borg 1, we can only get this metadata after decrypting the whole chunk (and we do not have "size" here):
assert meta1["ctype"] == LZ4.ID # default compression
assert meta1["clevel"] == 0xFF # lz4 does not know levels (yet?)