diff --git a/src/borg/archive.py b/src/borg/archive.py
index 8d85724c..8db9e28c 100644
--- a/src/borg/archive.py
+++ b/src/borg/archive.py
@@ -296,7 +296,7 @@ class DownloadPipeline:
         """
         hlids_preloaded = set()
         unpacker = msgpack.Unpacker(use_list=False)
-        for data in self.fetch_many(ids):
+        for data in self.fetch_many(ids, ro_type=ROBJ_ARCHIVE_STREAM):
             unpacker.feed(data)
             for _item in unpacker:
                 item = Item(internal_dict=_item)
@@ -318,9 +318,10 @@ class DownloadPipeline:
                     self.repository.preload([c.id for c in item.chunks])
             yield item
 
-    def fetch_many(self, ids, is_preloaded=False):
+    def fetch_many(self, ids, is_preloaded=False, ro_type=None):
+        assert ro_type is not None
         for id_, cdata in zip(ids, self.repository.get_many(ids, is_preloaded=is_preloaded)):
-            _, data = self.repo_objs.parse(id_, cdata)
+            _, data = self.repo_objs.parse(id_, cdata, ro_type=ro_type)
             yield data
 
 
@@ -393,7 +394,9 @@ class CacheChunkBuffer(ChunkBuffer):
         self.stats = stats
 
     def write_chunk(self, chunk):
-        id_, _ = self.cache.add_chunk(self.key.id_hash(chunk), {}, chunk, stats=self.stats, wait=False)
+        id_, _ = self.cache.add_chunk(
+            self.key.id_hash(chunk), {}, chunk, stats=self.stats, wait=False, ro_type=ROBJ_ARCHIVE_STREAM
+        )
         logger.debug(f"writing item metadata stream chunk {bin_to_hex(id_)}")
         self.cache.repository.async_response(wait=False)
         return id_
@@ -422,7 +425,7 @@ def archive_get_items(metadata, *, repo_objs, repository):
         assert "items" not in metadata
         items = []
         for id, cdata in zip(metadata.item_ptrs, repository.get_many(metadata.item_ptrs)):
-            _, data = repo_objs.parse(id, cdata)
+            _, data = repo_objs.parse(id, cdata, ro_type=ROBJ_ARCHIVE_CHUNKIDS)
             ids = msgpack.unpackb(data)
             items.extend(ids)
         return items
@@ -440,9 +443,9 @@ def archive_put_items(chunk_ids, *, repo_objs, cache=None, stats=None, add_refer
         id = repo_objs.id_hash(data)
         logger.debug(f"writing item_ptrs chunk {bin_to_hex(id)}")
         if cache is not None and stats is not None:
-            cache.add_chunk(id, {}, data, stats=stats)
+            cache.add_chunk(id, {}, data, stats=stats, ro_type=ROBJ_ARCHIVE_CHUNKIDS)
         elif add_reference is not None:
-            cdata = repo_objs.format(id, {}, data)
+            cdata = repo_objs.format(id, {}, data, ro_type=ROBJ_ARCHIVE_CHUNKIDS)
             add_reference(id, len(data), cdata)
         else:
             raise NotImplementedError
@@ -531,7 +534,7 @@ class Archive:
 
     def _load_meta(self, id):
         cdata = self.repository.get(id)
-        _, data = self.repo_objs.parse(id, cdata)
+        _, data = self.repo_objs.parse(id, cdata, ro_type=ROBJ_ARCHIVE_META)
         archive, _ = self.key.unpack_and_verify_archive(data)
         metadata = ArchiveItem(internal_dict=archive)
         if metadata.version not in (1, 2):  # legacy: still need to read v1 archives
@@ -702,7 +705,7 @@ Duration: {0.duration}
         data = self.key.pack_and_authenticate_metadata(metadata.as_dict(), context=b"archive")
         self.id = self.repo_objs.id_hash(data)
         try:
-            self.cache.add_chunk(self.id, {}, data, stats=self.stats)
+            self.cache.add_chunk(self.id, {}, data, stats=self.stats, ro_type=ROBJ_ARCHIVE_META)
         except IntegrityError as err:
             err_msg = str(err)
             # hack to avoid changing the RPC protocol by introducing new (more specific) exception class
@@ -740,7 +743,7 @@ Duration: {0.duration}
         for id, chunk in zip(self.metadata.items, self.repository.get_many(self.metadata.items)):
             pi.show(increase=1)
             add(id)
-            _, data = self.repo_objs.parse(id, chunk)
+            _, data = self.repo_objs.parse(id, chunk, ro_type=ROBJ_ARCHIVE_STREAM)
             sync.feed(data)
         unique_size = archive_index.stats_against(cache.chunks)[1]
         pi.finish()
@@ -826,7 +829,9 @@ Duration: {0.duration}
                     # it would get stuck.
                     if "chunks" in item:
                         item_chunks_size = 0
-                        for data in self.pipeline.fetch_many([c.id for c in item.chunks], is_preloaded=True):
+                        for data in self.pipeline.fetch_many(
+                            [c.id for c in item.chunks], is_preloaded=True, ro_type=ROBJ_FILE_STREAM
+                        ):
                             if pi:
                                 pi.show(increase=len(data), info=[remove_surrogates(item.path)])
                             if stdout:
@@ -878,7 +883,7 @@ Duration: {0.duration}
                 fd = open(path, "wb")
             with fd:
                 ids = [c.id for c in item.chunks]
-                for data in self.pipeline.fetch_many(ids, is_preloaded=True):
+                for data in self.pipeline.fetch_many(ids, is_preloaded=True, ro_type=ROBJ_FILE_STREAM):
                     if pi:
                         pi.show(increase=len(data), info=[remove_surrogates(item.path)])
                     with backup_io("write"):
@@ -1027,7 +1032,7 @@ Duration: {0.duration}
         del metadata.items
         data = self.key.pack_and_authenticate_metadata(metadata.as_dict(), context=b"archive")
         new_id = self.key.id_hash(data)
-        self.cache.add_chunk(new_id, {}, data, stats=self.stats)
+        self.cache.add_chunk(new_id, {}, data, stats=self.stats, ro_type=ROBJ_ARCHIVE_META)
         self.manifest.archives[self.name] = (new_id, metadata.time)
         self.cache.chunk_decref(self.id, self.stats)
         self.id = new_id
@@ -1076,7 +1081,7 @@ Duration: {0.duration}
         for i, (items_id, data) in enumerate(zip(items_ids, self.repository.get_many(items_ids))):
             if progress:
                 pi.show(i)
-            _, data = self.repo_objs.parse(items_id, data)
+            _, data = self.repo_objs.parse(items_id, data, ro_type=ROBJ_ARCHIVE_STREAM)
             unpacker.feed(data)
             chunk_decref(items_id, stats)
             try:
@@ -1132,8 +1137,8 @@ Duration: {0.duration}
                 path,
                 item1,
                 item2,
-                archive1.pipeline.fetch_many([c.id for c in item1.get("chunks", [])]),
-                archive2.pipeline.fetch_many([c.id for c in item2.get("chunks", [])]),
+                archive1.pipeline.fetch_many([c.id for c in item1.get("chunks", [])], ro_type=ROBJ_FILE_STREAM),
+                archive2.pipeline.fetch_many([c.id for c in item2.get("chunks", [])], ro_type=ROBJ_FILE_STREAM),
                 can_compare_chunk_ids=can_compare_chunk_ids,
             )
 
@@ -1319,7 +1324,7 @@ class ChunksProcessor:
             started_hashing = time.monotonic()
             chunk_id, data = cached_hash(chunk, self.key.id_hash)
             stats.hashing_time += time.monotonic() - started_hashing
-            chunk_entry = cache.add_chunk(chunk_id, {}, data, stats=stats, wait=False)
+            chunk_entry = cache.add_chunk(chunk_id, {}, data, stats=stats, wait=False, ro_type=ROBJ_FILE_STREAM)
             self.cache.repository.async_response(wait=False)
             return chunk_entry
 
@@ -1898,7 +1903,7 @@ class ArchiveChecker:
             else:
                 try:
                     # we must decompress, so it'll call assert_id() in there:
-                    self.repo_objs.parse(chunk_id, encrypted_data, decompress=True)
+                    self.repo_objs.parse(chunk_id, encrypted_data, decompress=True, ro_type=ROBJ_DONTCARE)
                 except IntegrityErrorBase as integrity_error:
                     self.error_found = True
                     errors += 1
@@ -1930,7 +1935,7 @@ class ArchiveChecker:
                 try:
                     encrypted_data = self.repository.get(defect_chunk)
                     # we must decompress, so it'll call assert_id() in there:
-                    self.repo_objs.parse(defect_chunk, encrypted_data, decompress=True)
+                    self.repo_objs.parse(defect_chunk, encrypted_data, decompress=True, ro_type=ROBJ_DONTCARE)
                 except IntegrityErrorBase:
                     # failed twice -> get rid of this chunk
                     del self.chunks[defect_chunk]
@@ -1978,7 +1983,7 @@ class ArchiveChecker:
             pi.show()
             cdata = self.repository.get(chunk_id)
             try:
-                _, data = self.repo_objs.parse(chunk_id, cdata)
+                _, data = self.repo_objs.parse(chunk_id, cdata, ro_type=ROBJ_DONTCARE)
             except IntegrityErrorBase as exc:
                 logger.error("Skipping corrupted chunk: %s", exc)
                 self.error_found = True
@@ -2043,7 +2048,7 @@ class ArchiveChecker:
 
         def add_callback(chunk):
             id_ = self.key.id_hash(chunk)
-            cdata = self.repo_objs.format(id_, {}, chunk)
+            cdata = self.repo_objs.format(id_, {}, chunk, ro_type=ROBJ_ARCHIVE_STREAM)
             add_reference(id_, len(chunk), cdata)
             return id_
 
@@ -2066,7 +2071,7 @@ class ArchiveChecker:
             def replacement_chunk(size):
                 chunk = Chunk(None, allocation=CH_ALLOC, size=size)
                 chunk_id, data = cached_hash(chunk, self.key.id_hash)
-                cdata = self.repo_objs.format(chunk_id, {}, data)
+                cdata = self.repo_objs.format(chunk_id, {}, data, ro_type=ROBJ_FILE_STREAM)
                 return chunk_id, size, cdata
 
             offset = 0
@@ -2197,7 +2202,7 @@ class ArchiveChecker:
             unpacker.resync()
             for chunk_id, cdata in zip(items, repository.get_many(items)):
                 try:
-                    _, data = self.repo_objs.parse(chunk_id, cdata)
+                    _, data = self.repo_objs.parse(chunk_id, cdata, ro_type=ROBJ_ARCHIVE_STREAM)
                     unpacker.feed(data)
                     for item in unpacker:
                         valid, reason = valid_item(item)
@@ -2260,7 +2265,7 @@ class ArchiveChecker:
             mark_as_possibly_superseded(archive_id)
             cdata = self.repository.get(archive_id)
             try:
-                _, data = self.repo_objs.parse(archive_id, cdata)
+                _, data = self.repo_objs.parse(archive_id, cdata, ro_type=ROBJ_ARCHIVE_META)
             except IntegrityError as integrity_error:
                 logger.error("Archive metadata block %s is corrupted: %s", bin_to_hex(archive_id), integrity_error)
                 self.error_found = True
@@ -2298,7 +2303,7 @@ class ArchiveChecker:
                 )
                 data = self.key.pack_and_authenticate_metadata(archive.as_dict(), context=b"archive", salt=salt)
                 new_archive_id = self.key.id_hash(data)
-                cdata = self.repo_objs.format(new_archive_id, {}, data)
+                cdata = self.repo_objs.format(new_archive_id, {}, data, ro_type=ROBJ_ARCHIVE_META)
                 add_reference(new_archive_id, len(data), cdata)
                 self.manifest.archives[info.name] = (new_archive_id, info.ts)
         pi.finish()
@@ -2434,13 +2439,13 @@ class ArchiveRecreater:
         chunk_id, data = cached_hash(chunk, self.key.id_hash)
         if chunk_id in self.seen_chunks:
             return self.cache.chunk_incref(chunk_id, target.stats)
-        chunk_entry = self.cache.add_chunk(chunk_id, {}, data, stats=target.stats, wait=False)
+        chunk_entry = self.cache.add_chunk(chunk_id, {}, data, stats=target.stats, wait=False, ro_type=ROBJ_FILE_STREAM)
         self.cache.repository.async_response(wait=False)
         self.seen_chunks.add(chunk_entry.id)
         return chunk_entry
 
     def iter_chunks(self, archive, target, chunks):
-        chunk_iterator = archive.pipeline.fetch_many([chunk_id for chunk_id, _ in chunks])
+        chunk_iterator = archive.pipeline.fetch_many([chunk_id for chunk_id, _ in chunks], ro_type=ROBJ_FILE_STREAM)
         if target.recreate_rechunkify:
             # The target.chunker will read the file contents through ChunkIteratorFileWrapper chunk-by-chunk
             # (does not load the entire file into memory)
diff --git a/src/borg/archiver/debug_cmd.py b/src/borg/archiver/debug_cmd.py
index 6c0d1892..cab8075c 100644
--- a/src/borg/archiver/debug_cmd.py
+++ b/src/borg/archiver/debug_cmd.py
@@ -35,7 +35,7 @@ class DebugMixIn:
         repo_objs = manifest.repo_objs
         archive = Archive(manifest, args.name)
         for i, item_id in enumerate(archive.metadata.items):
-            _, data = repo_objs.parse(item_id, repository.get(item_id))
+            _, data = repo_objs.parse(item_id, repository.get(item_id), ro_type=ROBJ_ARCHIVE_STREAM)
             filename = "%06d_%s.items" % (i, bin_to_hex(item_id))
             print("Dumping", filename)
             with open(filename, "wb") as fd:
@@ -65,7 +65,8 @@ class DebugMixIn:
             fd.write(do_indent(prepare_dump_dict(archive_meta_orig)))
             fd.write(",\n")
 
-            _, data = repo_objs.parse(archive_meta_orig["id"], repository.get(archive_meta_orig["id"]))
+            archive_id = archive_meta_orig["id"]
+            _, data = repo_objs.parse(archive_id, repository.get(archive_id), ro_type=ROBJ_ARCHIVE_META)
             archive_org_dict = msgpack.unpackb(data, object_hook=StableDict)
 
             fd.write(' "_meta":\n')
@@ -77,10 +78,10 @@ class DebugMixIn:
             first = True
             items = []
             for chunk_id in archive_org_dict["item_ptrs"]:
-                _, data = repo_objs.parse(chunk_id, repository.get(chunk_id))
+                _, data = repo_objs.parse(chunk_id, repository.get(chunk_id), ro_type=ROBJ_ARCHIVE_CHUNKIDS)
                 items.extend(msgpack.unpackb(data))
             for item_id in items:
-                _, data = repo_objs.parse(item_id, repository.get(item_id))
+                _, data = repo_objs.parse(item_id, repository.get(item_id), ro_type=ROBJ_ARCHIVE_STREAM)
                 unpacker.feed(data)
                 for item in unpacker:
                     item = prepare_dump_dict(item)
@@ -101,7 +102,7 @@ class DebugMixIn:
     def do_debug_dump_manifest(self, args, repository, manifest):
         """dump decoded repository manifest"""
         repo_objs = manifest.repo_objs
-        _, data = repo_objs.parse(manifest.MANIFEST_ID, repository.get(manifest.MANIFEST_ID))
+        _, data = repo_objs.parse(manifest.MANIFEST_ID, repository.get(manifest.MANIFEST_ID), ro_type=ROBJ_MANIFEST)
 
         meta = prepare_dump_dict(msgpack.unpackb(data, object_hook=StableDict))
 
@@ -116,7 +117,7 @@ class DebugMixIn:
 
         def decrypt_dump(i, id, cdata, tag=None, segment=None, offset=None):
             if cdata is not None:
-                _, data = repo_objs.parse(id, cdata)
+                _, data = repo_objs.parse(id, cdata, ro_type=ROBJ_DONTCARE)
             else:
                 _, data = {}, b""
             tag_str = "" if tag is None else "_" + tag
@@ -211,7 +212,7 @@ class DebugMixIn:
                 break
            for id in ids:
                 cdata = repository.get(id)
-                _, data = repo_objs.parse(id, cdata)
+                _, data = repo_objs.parse(id, cdata, ro_type=ROBJ_DONTCARE)
 
                 # try to locate wanted sequence crossing the border of last_data and data
                 boundary_data = last_data[-(len(wanted) - 1) :] + data[: len(wanted) - 1]
@@ -284,7 +285,7 @@ class DebugMixIn:
             cdata = f.read()
 
         repo_objs = manifest.repo_objs
-        meta, data = repo_objs.parse(id=id, cdata=cdata)
+        meta, data = repo_objs.parse(id=id, cdata=cdata, ro_type=ROBJ_DONTCARE)
 
         with open(args.json_path, "w") as f:
             json.dump(meta, f)
@@ -315,7 +316,8 @@ class DebugMixIn:
             meta = json.load(f)
 
         repo_objs = manifest.repo_objs
-        data_encrypted = repo_objs.format(id=id, meta=meta, data=data)
+        # TODO: support misc repo object types other than ROBJ_FILE_STREAM
+        data_encrypted = repo_objs.format(id=id, meta=meta, data=data, ro_type=ROBJ_FILE_STREAM)
 
         with open(args.object_path, "wb") as f:
             f.write(data_encrypted)
diff --git a/src/borg/archiver/rcompress_cmd.py b/src/borg/archiver/rcompress_cmd.py
index 038fd70b..40d7571d 100644
--- a/src/borg/archiver/rcompress_cmd.py
+++ b/src/borg/archiver/rcompress_cmd.py
@@ -37,7 +37,7 @@ def find_chunks(repository, repo_objs, stats, ctype, clevel, olevel):
         if not chunk_ids:
             break
         for id, chunk_no_data in zip(chunk_ids, repository.get_many(chunk_ids, read_data=False)):
-            meta = repo_objs.parse_meta(id, chunk_no_data)
+            meta = repo_objs.parse_meta(id, chunk_no_data, ro_type=ROBJ_DONTCARE)
             compr_found = meta["ctype"], meta["clevel"], meta.get("olevel", -1)
             if compr_found != compr_wanted:
                 recompress_ids.append(id)
@@ -57,13 +57,14 @@ def process_chunks(repository, repo_objs, stats, recompress_ids, olevel):
     for id, chunk in zip(recompress_ids, repository.get_many(recompress_ids, read_data=True)):
         old_size = len(chunk)
         stats["old_size"] += old_size
-        meta, data = repo_objs.parse(id, chunk)
+        meta, data = repo_objs.parse(id, chunk, ro_type=ROBJ_DONTCARE)
+        ro_type = meta.pop("type", None)
         compr_old = meta["ctype"], meta["clevel"], meta.get("olevel", -1)
         if olevel == -1:
             # if the chunk was obfuscated, but should not be in future, remove related metadata
             meta.pop("olevel", None)
             meta.pop("psize", None)
-        chunk = repo_objs.format(id, meta, data)
+        chunk = repo_objs.format(id, meta, data, ro_type=ro_type)
         compr_done = meta["ctype"], meta["clevel"], meta.get("olevel", -1)
         if compr_done != compr_old:
             # we actually changed something
diff --git a/src/borg/archiver/tar_cmds.py b/src/borg/archiver/tar_cmds.py
index b18cb24d..0504f97e 100644
--- a/src/borg/archiver/tar_cmds.py
+++ b/src/borg/archiver/tar_cmds.py
@@ -115,7 +115,9 @@ class TarMixIn:
             """
             Return a file-like object that reads from the chunks of *item*.
             """
-            chunk_iterator = archive.pipeline.fetch_many([chunk_id for chunk_id, _ in item.chunks], is_preloaded=True)
+            chunk_iterator = archive.pipeline.fetch_many(
+                [chunk_id for chunk_id, _ in item.chunks], is_preloaded=True, ro_type=ROBJ_FILE_STREAM
+            )
             if pi:
                 info = [remove_surrogates(item.path)]
                 return ChunkIteratorFileWrapper(
diff --git a/src/borg/archiver/transfer_cmd.py b/src/borg/archiver/transfer_cmd.py
index 49529a6c..f820ef63 100644
--- a/src/borg/archiver/transfer_cmd.py
+++ b/src/borg/archiver/transfer_cmd.py
@@ -111,7 +111,11 @@ class TransferMixIn:
                                 # keep compressed payload same, verify via assert_id (that will
                                 # decompress, but avoid needing to compress it again):
                                 meta, data = other_manifest.repo_objs.parse(
-                                    chunk_id, cdata, decompress=True, want_compressed=True
+                                    chunk_id,
+                                    cdata,
+                                    decompress=True,
+                                    want_compressed=True,
+                                    ro_type=ROBJ_FILE_STREAM,
                                 )
                                 meta, data = upgrader.upgrade_compressed_chunk(meta, data)
                                 chunk_entry = cache.add_chunk(
@@ -124,12 +128,20 @@ class TransferMixIn:
                                     size=size,
                                     ctype=meta["ctype"],
                                     clevel=meta["clevel"],
+                                    ro_type=ROBJ_FILE_STREAM,
                                 )
                             elif args.recompress == "always":
                                 # always decompress and re-compress file data chunks
-                                meta, data = other_manifest.repo_objs.parse(chunk_id, cdata)
+                                meta, data = other_manifest.repo_objs.parse(
+                                    chunk_id, cdata, ro_type=ROBJ_FILE_STREAM
+                                )
                                 chunk_entry = cache.add_chunk(
-                                    chunk_id, meta, data, stats=archive.stats, wait=False
+                                    chunk_id,
+                                    meta,
+                                    data,
+                                    stats=archive.stats,
+                                    wait=False,
+                                    ro_type=ROBJ_FILE_STREAM,
                                 )
                             else:
                                 raise ValueError(f"unsupported recompress mode: {args.recompress}")
diff --git a/src/borg/cache.py b/src/borg/cache.py
index 4f0032e1..fb99309a 100644
--- a/src/borg/cache.py
+++ b/src/borg/cache.py
@@ -12,7 +12,7 @@ logger = create_logger()
 
 files_cache_logger = create_logger("borg.debug.files_cache")
 
-from .constants import CACHE_README, FILES_CACHE_MODE_DISABLED
+from .constants import CACHE_README, FILES_CACHE_MODE_DISABLED, ROBJ_FILE_STREAM
 from .hashindex import ChunkIndex, ChunkIndexEntry, CacheSynchronizer
 from .helpers import Location
 from .helpers import Error
@@ -939,7 +939,21 @@ class LocalCache(CacheStatsMixin):
         self.cache_config.ignored_features.update(repo_features - my_features)
         self.cache_config.mandatory_features.update(repo_features & my_features)
 
-    def add_chunk(self, id, meta, data, *, stats, wait=True, compress=True, size=None, ctype=None, clevel=None):
+    def add_chunk(
+        self,
+        id,
+        meta,
+        data,
+        *,
+        stats,
+        wait=True,
+        compress=True,
+        size=None,
+        ctype=None,
+        clevel=None,
+        ro_type=ROBJ_FILE_STREAM,
+    ):
+        assert ro_type is not None
         if not self.txn_active:
             self.begin_txn()
         if size is None and compress:
@@ -949,7 +963,9 @@ class LocalCache(CacheStatsMixin):
             return self.chunk_incref(id, stats)
         if size is None:
             raise ValueError("when giving compressed data for a new chunk, the uncompressed size must be given also")
-        cdata = self.repo_objs.format(id, meta, data, compress=compress, size=size, ctype=ctype, clevel=clevel)
+        cdata = self.repo_objs.format(
+            id, meta, data, compress=compress, size=size, ctype=ctype, clevel=clevel, ro_type=ro_type
+        )
         self.repository.put(id, cdata, wait=wait)
         self.chunks.add(id, 1, size)
         stats.update(size, not refcount)
@@ -1113,7 +1129,8 @@ Chunk index: {0.total_unique_chunks:20d} unknown"""
     def memorize_file(self, hashed_path, path_hash, st, ids):
         pass
 
-    def add_chunk(self, id, meta, data, *, stats, wait=True, compress=True, size=None):
+    def add_chunk(self, id, meta, data, *, stats, wait=True, compress=True, size=None, ro_type=ROBJ_FILE_STREAM):
+        assert ro_type is not None
         if not self._txn_active:
             self.begin_txn()
         if size is None and compress:
@@ -1123,7 +1140,7 @@ Chunk index: {0.total_unique_chunks:20d} unknown"""
         refcount = self.seen_chunk(id, size)
         if refcount:
             return self.chunk_incref(id, stats, size=size)
-        cdata = self.repo_objs.format(id, meta, data, compress=compress)
+        cdata = self.repo_objs.format(id, meta, data, compress=compress, ro_type=ro_type)
         self.repository.put(id, cdata, wait=wait)
         self.chunks.add(id, 1, size)
         stats.update(size, not refcount)
diff --git a/src/borg/constants.py b/src/borg/constants.py
index 54528b61..7f4cbc31 100644
--- a/src/borg/constants.py
+++ b/src/borg/constants.py
@@ -33,6 +33,14 @@ UMASK_DEFAULT = 0o077
 # forcing to 0o100XXX later
 STDIN_MODE_DEFAULT = 0o660
 
+# RepoObj types
+ROBJ_MANIFEST = "M"  # Manifest (directory of archives, other metadata) object
+ROBJ_ARCHIVE_META = "A"  # main archive metadata object
+ROBJ_ARCHIVE_CHUNKIDS = "C"  # objects with a list of archive metadata stream chunkids
+ROBJ_ARCHIVE_STREAM = "S"  # archive metadata stream chunk (containing items)
+ROBJ_FILE_STREAM = "F"  # file content stream chunk (containing user data)
+ROBJ_DONTCARE = "*"  # used to parse without type assertion (= accept any type)
+
 # in borg < 1.3, this has been defined like this:
 # 20 MiB minus 41 bytes for a PUT header (because the "size" field in the Repository includes
 # the header, and the total size was set to precisely 20 MiB for borg < 1.3).
diff --git a/src/borg/fuse.py b/src/borg/fuse.py
index 376020d9..92f14587 100644
--- a/src/borg/fuse.py
+++ b/src/borg/fuse.py
@@ -10,6 +10,7 @@ import time
 from collections import defaultdict
 from signal import SIGINT
 
+from .constants import ROBJ_FILE_STREAM
 from .fuse_impl import llfuse, has_pyfuse3
 
 
@@ -688,7 +689,7 @@ class FuseOperations(llfuse.Operations, FuseBackend):
                     # evict fully read chunk from cache
                     del self.data_cache[id]
             else:
-                _, data = self.repo_objs.parse(id, self.repository_uncached.get(id))
+                _, data = self.repo_objs.parse(id, self.repository_uncached.get(id), ro_type=ROBJ_FILE_STREAM)
                 if offset + n < len(data):
                     # chunk was only partially read, cache it
                     self.data_cache[id] = data
diff --git a/src/borg/helpers/misc.py b/src/borg/helpers/misc.py
index d844b763..1f687a0b 100644
--- a/src/borg/helpers/misc.py
+++ b/src/borg/helpers/misc.py
@@ -13,6 +13,7 @@ logger = create_logger()
 
 from . import msgpack
 from .. import __version__ as borg_version
+from ..constants import ROBJ_FILE_STREAM
 
 
 def sysinfo():
@@ -123,7 +124,7 @@ class ChunkIteratorFileWrapper:
 
 
 def open_item(archive, item):
     """Return file-like object for archived item (with chunks)."""
-    chunk_iterator = archive.pipeline.fetch_many([c.id for c in item.chunks])
+    chunk_iterator = archive.pipeline.fetch_many([c.id for c in item.chunks], ro_type=ROBJ_FILE_STREAM)
     return ChunkIteratorFileWrapper(chunk_iterator)
 
diff --git a/src/borg/helpers/parseformat.py b/src/borg/helpers/parseformat.py
index b4a455a0..7f845604 100644
--- a/src/borg/helpers/parseformat.py
+++ b/src/borg/helpers/parseformat.py
@@ -933,7 +933,7 @@ class ItemFormatter(BaseFormatter):
             hash = self.xxh64()
         elif hash_function in self.hash_algorithms:
             hash = hashlib.new(hash_function)
-        for data in self.archive.pipeline.fetch_many([c.id for c in item.chunks]):
+        for data in self.archive.pipeline.fetch_many([c.id for c in item.chunks], ro_type=ROBJ_FILE_STREAM):
             hash.update(data)
         return hash.hexdigest()
 
diff --git a/src/borg/manifest.py b/src/borg/manifest.py
index a9609884..047a2a4f 100644
--- a/src/borg/manifest.py
+++ b/src/borg/manifest.py
@@ -250,7 +250,7 @@ class Manifest:
         if not key:
             key = key_factory(repository, cdata, ro_cls=ro_cls)
         manifest = cls(key, repository, ro_cls=ro_cls)
-        _, data = manifest.repo_objs.parse(cls.MANIFEST_ID, cdata)
+        _, data = manifest.repo_objs.parse(cls.MANIFEST_ID, cdata, ro_type=ROBJ_MANIFEST)
         manifest_dict = key.unpack_and_verify_manifest(data)
         m = ManifestItem(internal_dict=manifest_dict)
         manifest.id = manifest.repo_objs.id_hash(data)
@@ -315,4 +315,4 @@ class Manifest:
         )
         data = self.key.pack_and_authenticate_metadata(manifest.as_dict())
         self.id = self.repo_objs.id_hash(data)
-        self.repository.put(self.MANIFEST_ID, self.repo_objs.format(self.MANIFEST_ID, {}, data))
+        self.repository.put(self.MANIFEST_ID, self.repo_objs.format(self.MANIFEST_ID, {}, data, ro_type=ROBJ_MANIFEST))
diff --git a/src/borg/remote.py b/src/borg/remote.py
index 40b8b62f..4a1551f0 100644
--- a/src/borg/remote.py
+++ b/src/borg/remote.py
@@ -1204,7 +1204,7 @@ def cache_if_remote(repository, *, decrypted_cache=False, pack=None, unpack=None
             return csize, decrypted
 
         def transform(id_, data):
-            meta, decrypted = repo_objs.parse(id_, data)
+            meta, decrypted = repo_objs.parse(id_, data, ro_type=ROBJ_DONTCARE)
             csize = meta.get("csize", len(data))
             return csize, decrypted
 
diff --git a/src/borg/repoobj.py b/src/borg/repoobj.py
index 557d4a60..8514db4d 100644
--- a/src/borg/repoobj.py
+++ b/src/borg/repoobj.py
@@ -1,5 +1,6 @@
 from struct import Struct
 
+from .constants import *  # NOQA
 from .helpers import msgpack, workarounds
 from .compress import Compressor, LZ4_COMPRESSOR, get_compressor
 
@@ -35,7 +36,11 @@ class RepoObj:
         size: int = None,
         ctype: int = None,
         clevel: int = None,
+        ro_type: str = None,
     ) -> bytes:
+        assert isinstance(ro_type, str)
+        assert ro_type != ROBJ_DONTCARE
+        meta["type"] = ro_type
         assert isinstance(id, bytes)
         assert isinstance(meta, dict)
         assert isinstance(data, (bytes, memoryview))
@@ -58,11 +63,12 @@ class RepoObj:
         hdr = self.meta_len_hdr.pack(len(meta_encrypted))
         return hdr + meta_encrypted + data_encrypted
 
-    def parse_meta(self, id: bytes, cdata: bytes) -> dict:
+    def parse_meta(self, id: bytes, cdata: bytes, ro_type: str) -> dict:
         # when calling parse_meta, enough cdata needs to be supplied to contain completely the
         # meta_len_hdr and the encrypted, packed metadata. it is allowed to provide more cdata.
         assert isinstance(id, bytes)
         assert isinstance(cdata, bytes)
+        assert isinstance(ro_type, str)
         obj = memoryview(cdata)
         offs = self.meta_len_hdr.size
         hdr = obj[:offs]
@@ -71,10 +77,11 @@ class RepoObj:
         meta_encrypted = obj[offs : offs + len_meta_encrypted]
         meta_packed = self.key.decrypt(id, meta_encrypted)
         meta = msgpack.unpackb(meta_packed)
+        assert ro_type == ROBJ_DONTCARE or meta["type"] == ro_type
         return meta
 
     def parse(
-        self, id: bytes, cdata: bytes, decompress: bool = True, want_compressed: bool = False
+        self, id: bytes, cdata: bytes, decompress: bool = True, want_compressed: bool = False, ro_type: str = None
     ) -> tuple[dict, bytes]:
         """
         Parse a repo object into metadata and data (decrypt it, maybe decompress, maybe verify if the chunk plaintext
@@ -86,6 +93,7 @@ class RepoObj:
         - decompress=False, want_compressed=True: quick, not verifying. returns compressed data (caller wants to reuse).
         - decompress=False, want_compressed=False: invalid
         """
+        assert isinstance(ro_type, str)
         assert not (not decompress and not want_compressed), "invalid parameter combination!"
         assert isinstance(id, bytes)
         assert isinstance(cdata, bytes)
@@ -98,6 +106,7 @@ class RepoObj:
         offs += len_meta_encrypted
         meta_packed = self.key.decrypt(id, meta_encrypted)
         meta_compressed = msgpack.unpackb(meta_packed)  # means: before adding more metadata in decompress block
+        assert ro_type == ROBJ_DONTCARE or meta_compressed["type"] == ro_type
         data_encrypted = obj[offs:]
         data_compressed = self.key.decrypt(id, data_encrypted)  # does not include the type/level bytes
         if decompress:
@@ -142,10 +151,12 @@ class RepoObj1:  # legacy
         size: int = None,
         ctype: int = None,
         clevel: int = None,
+        ro_type: str = None,
     ) -> bytes:
         assert isinstance(id, bytes)
         assert meta == {}
         assert isinstance(data, (bytes, memoryview))
+        assert ro_type is not None
         assert compress or size is not None and ctype is not None and clevel is not None
         if compress:
             assert size is None or size == len(data)
@@ -160,11 +171,12 @@ class RepoObj1:  # legacy
         raise NotImplementedError("parse_meta is not available for RepoObj1")
 
     def parse(
-        self, id: bytes, cdata: bytes, decompress: bool = True, want_compressed: bool = False
+        self, id: bytes, cdata: bytes, decompress: bool = True, want_compressed: bool = False, ro_type: str = None
     ) -> tuple[dict, bytes]:
         assert not (not decompress and not want_compressed), "invalid parameter combination!"
         assert isinstance(id, bytes)
         assert isinstance(cdata, bytes)
+        assert ro_type is not None
         data_compressed = self.key.decrypt(id, cdata)
         compressor_cls, compression_level = Compressor.detect(data_compressed[:2])
         compressor = compressor_cls(level=compression_level, legacy_mode=True)
diff --git a/src/borg/testsuite/archive.py b/src/borg/testsuite/archive.py
index cefd6293..361a8289 100644
--- a/src/borg/testsuite/archive.py
+++ b/src/borg/testsuite/archive.py
@@ -127,7 +127,8 @@ class MockCache:
         self.objects = {}
         self.repository = self.MockRepo()
 
-    def add_chunk(self, id, meta, data, stats=None, wait=True):
+    def add_chunk(self, id, meta, data, stats=None, wait=True, ro_type=None):
+        assert ro_type is not None
         self.objects[id] = data
         return id, len(data)
 
diff --git a/src/borg/testsuite/archiver/check_cmd.py b/src/borg/testsuite/archiver/check_cmd.py
index 5ddce343..01c313d5 100644
--- a/src/borg/testsuite/archiver/check_cmd.py
+++ b/src/borg/testsuite/archiver/check_cmd.py
@@ -243,7 +243,7 @@ def test_manifest_rebuild_duplicate_archive(archivers, request):
         }
         archive = repo_objs.key.pack_and_authenticate_metadata(archive_dict, context=b"archive")
         archive_id = repo_objs.id_hash(archive)
-        repository.put(archive_id, repo_objs.format(archive_id, {}, archive))
+        repository.put(archive_id, repo_objs.format(archive_id, {}, archive, ro_type=ROBJ_ARCHIVE_META))
         repository.commit(compact=False)
     cmd(archiver, "check", exit_code=1)
     cmd(archiver, "check", "--repair", exit_code=0)
diff --git a/src/borg/testsuite/archiver/checks.py b/src/borg/testsuite/archiver/checks.py
index f6311c8e..c3ed2720 100644
--- a/src/borg/testsuite/archiver/checks.py
+++ b/src/borg/testsuite/archiver/checks.py
@@ -337,6 +337,7 @@ def spoof_manifest(repository):
                     "timestamp": (datetime.now(tz=timezone.utc) + timedelta(days=1)).isoformat(timespec="microseconds"),
                 }
             ),
+            ro_type=ROBJ_MANIFEST,
         )
         repository.put(Manifest.MANIFEST_ID, cdata)
         repository.commit(compact=False)
@@ -357,6 +358,7 @@ def test_fresh_init_tam_required(archiver):
                     "timestamp": (datetime.now(tz=timezone.utc) + timedelta(days=1)).isoformat(timespec="microseconds"),
                 }
            ),
+            ro_type=ROBJ_MANIFEST,
         )
         repository.put(Manifest.MANIFEST_ID, cdata)
         repository.commit(compact=False)
@@ -397,7 +399,7 @@ def write_archive_without_tam(repository, archive_name):
         }
     )
     archive_id = manifest.repo_objs.id_hash(archive_data)
-    cdata = manifest.repo_objs.format(archive_id, {}, archive_data)
+    cdata = manifest.repo_objs.format(archive_id, {}, archive_data, ro_type=ROBJ_ARCHIVE_META)
     repository.put(archive_id, cdata)
    manifest.archives[archive_name] = (archive_id, datetime.now())
     manifest.write()
diff --git a/src/borg/testsuite/archiver/rcompress_cmd.py b/src/borg/testsuite/archiver/rcompress_cmd.py
index 3fa8a34b..dbae4483 100644
--- a/src/borg/testsuite/archiver/rcompress_cmd.py
+++ b/src/borg/testsuite/archiver/rcompress_cmd.py
@@ -22,7 +22,9 @@ def test_rcompress(archiver):
                     break
                 for id in ids:
                     chunk = repository.get(id, read_data=True)
-                    meta, data = manifest.repo_objs.parse(id, chunk)  # will also decompress according to metadata
+                    meta, data = manifest.repo_objs.parse(
+                        id, chunk, ro_type=ROBJ_DONTCARE
+                    )  # will also decompress according to metadata
                     m_olevel = meta.get("olevel", -1)
                     m_psize = meta.get("psize", -1)
                     print(
diff --git a/src/borg/testsuite/remote.py b/src/borg/testsuite/remote.py
index b5cb2e64..a2d84bce 100644
--- a/src/borg/testsuite/remote.py
+++ b/src/borg/testsuite/remote.py
@@ -6,6 +6,7 @@ from unittest.mock import patch
 
 import pytest
 
+from ..constants import ROBJ_FILE_STREAM
 from ..remote import SleepingBandwidthLimiter, RepositoryCache, cache_if_remote
 from ..repository import Repository
 from ..crypto.key import PlaintextKey
@@ -205,7 +206,7 @@ class TestRepositoryCache:
 
     def _put_encrypted_object(self, repo_objs, repository, data):
         id_ = repo_objs.id_hash(data)
-        repository.put(id_, repo_objs.format(id_, {}, data))
+        repository.put(id_, repo_objs.format(id_, {}, data, ro_type=ROBJ_FILE_STREAM))
         return id_
 
     @pytest.fixture
diff --git a/src/borg/testsuite/repoobj.py b/src/borg/testsuite/repoobj.py
index 5d11ad93..7f923f57 100644
--- a/src/borg/testsuite/repoobj.py
+++ b/src/borg/testsuite/repoobj.py
@@ -1,5 +1,6 @@
 import pytest
 
+from ..constants import ROBJ_FILE_STREAM, ROBJ_MANIFEST, ROBJ_ARCHIVE_META
 from ..crypto.key import PlaintextKey
 from ..repository import Repository
 from ..repoobj import RepoObj, RepoObj1
@@ -21,14 +22,14 @@ def test_format_parse_roundtrip(key):
     data = b"foobar" * 10
     id = repo_objs.id_hash(data)
     meta = {"custom": "something"}  # size and csize are computed automatically
-    cdata = repo_objs.format(id, meta, data)
+    cdata = repo_objs.format(id, meta, data, ro_type=ROBJ_FILE_STREAM)
 
-    got_meta = repo_objs.parse_meta(id, cdata)
+    got_meta = repo_objs.parse_meta(id, cdata, ro_type=ROBJ_FILE_STREAM)
     assert got_meta["size"] == len(data)
     assert got_meta["csize"] < len(data)
     assert got_meta["custom"] == "something"
 
-    got_meta, got_data = repo_objs.parse(id, cdata)
+    got_meta, got_data = repo_objs.parse(id, cdata, ro_type=ROBJ_FILE_STREAM)
     assert got_meta["size"] == len(data)
     assert got_meta["csize"] < len(data)
     assert got_meta["custom"] == "something"
@@ -44,11 +45,11 @@ def test_format_parse_roundtrip_borg1(key):  # legacy
     data = b"foobar" * 10
     id = repo_objs.id_hash(data)
     meta = {}  # borg1 does not support this kind of metadata
-    cdata = repo_objs.format(id, meta, data)
+    cdata = repo_objs.format(id, meta, data, ro_type=ROBJ_FILE_STREAM)
 
     # borg1 does not support separate metadata and borg2 does not invoke parse_meta for borg1 repos
 
-    got_meta, got_data = repo_objs.parse(id, cdata)
+    got_meta, got_data = repo_objs.parse(id, cdata, ro_type=ROBJ_FILE_STREAM)
     assert got_meta["size"] == len(data)
     assert got_meta["csize"] < len(data)
     assert data == got_data
@@ -67,9 +68,9 @@ def test_borg1_borg2_transition(key):
     len_data = len(data)
     repo_objs1 = RepoObj1(key)
     id = repo_objs1.id_hash(data)
-    borg1_cdata = repo_objs1.format(id, meta, data)
+    borg1_cdata = repo_objs1.format(id, meta, data, ro_type=ROBJ_FILE_STREAM)
     meta1, compr_data1 = repo_objs1.parse(
-        id, borg1_cdata, decompress=True, want_compressed=True
+        id, borg1_cdata, decompress=True, want_compressed=True, ro_type=ROBJ_FILE_STREAM
     )  # avoid re-compression
     # in borg 1, we can only get this metadata after decrypting the whole chunk (and we do not have "size" here):
     assert meta1["ctype"] == LZ4.ID  # default compression
@@ -80,18 +81,49 @@ def test_borg1_borg2_transition(key):
     # note: as we did not decompress, we do not have "size" and we need to get it from somewhere else.
     # here, we just use len_data. for borg transfer, we also know the size from another metadata source.
     borg2_cdata = repo_objs2.format(
-        id, dict(meta1), compr_data1[2:], compress=False, size=len_data, ctype=meta1["ctype"], clevel=meta1["clevel"]
+        id,
+        dict(meta1),
+        compr_data1[2:],
+        compress=False,
+        size=len_data,
+        ctype=meta1["ctype"],
+        clevel=meta1["clevel"],
+        ro_type=ROBJ_FILE_STREAM,
     )
-    meta2, data2 = repo_objs2.parse(id, borg2_cdata)
+    meta2, data2 = repo_objs2.parse(id, borg2_cdata, ro_type=ROBJ_FILE_STREAM)
     assert data2 == data
     assert meta2["ctype"] == LZ4.ID
     assert meta2["clevel"] == 0xFF
     assert meta2["csize"] == meta1["csize"] - 2  # borg2 does not store the type/level bytes there
     assert meta2["size"] == len_data
 
-    meta2 = repo_objs2.parse_meta(id, borg2_cdata)
+    meta2 = repo_objs2.parse_meta(id, borg2_cdata, ro_type=ROBJ_FILE_STREAM)
     # now, in borg 2, we have nice and separately decrypted metadata (no need to decrypt the whole chunk):
     assert meta2["ctype"] == LZ4.ID
     assert meta2["clevel"] == 0xFF
     assert meta2["csize"] == meta1["csize"] - 2  # borg2 does not store the type/level bytes there
     assert meta2["size"] == len_data
+
+
+def test_spoof_manifest(key):
+    repo_objs = RepoObj(key)
+    data = b"fake or malicious manifest data"  # file content could be provided by attacker.
+    id = repo_objs.id_hash(data)
+    # create a repo object containing user data (file content data).
+    cdata = repo_objs.format(id, {}, data, ro_type=ROBJ_FILE_STREAM)
+    # let's assume an attacker somehow managed to replace the manifest with that repo object.
+    # as borg always give the ro_type it wants to read, this should fail:
+    with pytest.raises(AssertionError):
+        repo_objs.parse(id, cdata, ro_type=ROBJ_MANIFEST)
+
+
+def test_spoof_archive(key):
+    repo_objs = RepoObj(key)
+    data = b"fake or malicious archive data"  # file content could be provided by attacker.
+    id = repo_objs.id_hash(data)
+    # create a repo object containing user data (file content data).
+    cdata = repo_objs.format(id, {}, data, ro_type=ROBJ_FILE_STREAM)
+    # let's assume an attacker somehow managed to replace an archive with that repo object.
+    # as borg always give the ro_type it wants to read, this should fail:
+    with pytest.raises(AssertionError):
+        repo_objs.parse(id, cdata, ro_type=ROBJ_ARCHIVE_META)