From d4310dd4cf898bf506290e2606166974826eb51a Mon Sep 17 00:00:00 2001
From: Thomas Waldmann
Date: Fri, 8 Nov 2024 01:49:01 +0100
Subject: [PATCH 1/5] chunk index cache: use cache/chunks.<hash>, see #8503

- doesn't need a separate file for the hash
- we can later write multiple partial chunkindexes to the cache

also: add upgrade code that renames the cache from previous borg versions.
---
 src/borg/archive.py              |  17 ++---
 src/borg/archiver/compact_cmd.py |   2 +-
 src/borg/cache.py                | 110 +++++++++++++++++++++----------
 src/borg/repository.py           |   2 +-
 4 files changed, 81 insertions(+), 50 deletions(-)

diff --git a/src/borg/archive.py b/src/borg/archive.py
index 0887741c5..5ddf9790d 100644
--- a/src/borg/archive.py
+++ b/src/borg/archive.py
@@ -22,7 +22,7 @@
 from . import xattr
 from .chunker import get_chunker, Chunk
-from .cache import ChunkListEntry, build_chunkindex_from_repo
+from .cache import ChunkListEntry, build_chunkindex_from_repo, delete_chunkindex_cache
 from .crypto.key import key_factory, UnsupportedPayloadError
 from .compress import CompressionSpec
 from .constants import *  # NOQA
@@ -50,7 +50,7 @@
 from .item import Item, ArchiveItem, ItemDiff
 from .platform import acl_get, acl_set, set_flags, get_flags, swidth, hostname
 from .remote import RemoteRepository, cache_if_remote
-from .repository import Repository, NoManifestError, StoreObjectNotFound
+from .repository import Repository, NoManifestError
 from .repoobj import RepoObj

 has_link = hasattr(os, "link")
@@ -2140,18 +2140,9 @@ def valid_item(obj):

     def finish(self):
         if self.repair:
+            # we may have deleted chunks, remove the chunks index cache!
             logger.info("Deleting chunks cache in repository - next repository access will cause a rebuild.")
-            # we may have deleted chunks, invalidate/remove the chunks index cache!
-            try:
-                self.repository.store_delete("cache/chunks_hash")
-            except (Repository.ObjectNotFound, StoreObjectNotFound):
-                # TODO: ^ seem like RemoteRepository raises Repository.ONF instead of StoreONF
-                pass
-            try:
-                self.repository.store_delete("cache/chunks")
-            except (Repository.ObjectNotFound, StoreObjectNotFound):
-                # TODO: ^ seem like RemoteRepository raises Repository.ONF instead of StoreONF
-                pass
+            delete_chunkindex_cache(self.repository)
             logger.info("Writing Manifest.")
             self.manifest.write()
diff --git a/src/borg/archiver/compact_cmd.py b/src/borg/archiver/compact_cmd.py
index 2ead37488..0e6a6ba07 100644
--- a/src/borg/archiver/compact_cmd.py
+++ b/src/borg/archiver/compact_cmd.py
@@ -65,7 +65,7 @@ def save_chunk_index(self):
             # as we put the wrong size in there, we need to clean up the size:
             self.chunks[id] = entry._replace(size=0)
         # now self.chunks is an uptodate ChunkIndex, usable for general borg usage!
-        write_chunkindex_to_repo_cache(self.repository, self.chunks, clear=True, force_write=True)
+        write_chunkindex_to_repo_cache(self.repository, self.chunks, clear=True, force_write=True, delete_other=True)
         self.chunks = None  # nothing there (cleared!)

     def analyze_archives(self) -> Tuple[Set, Set, int, int, int]:
diff --git a/src/borg/cache.py b/src/borg/cache.py
index d807472a0..0953460f2 100644
--- a/src/borg/cache.py
+++ b/src/borg/cache.py
@@ -13,6 +13,8 @@
 files_cache_logger = create_logger("borg.debug.files_cache")

+from borgstore.store import ItemInfo
+
 from .constants import CACHE_README, FILES_CACHE_MODE_DISABLED, ROBJ_FILE_STREAM, TIME_DIFFERS2_NS
 from .checksums import xxh64
 from .hashindex import ChunkIndex, ChunkIndexEntry
@@ -663,63 +665,101 @@ def memorize_file(self, hashed_path, path_hash, st, chunks):
         )


-def load_chunks_hash(repository) -> bytes:
+def try_upgrade_to_b14(repository):
+    # TODO: remove this before 2.0.0 release
     try:
         hash = repository.store_load("cache/chunks_hash")
-        logger.debug(f"cache/chunks_hash is '{bin_to_hex(hash)}'.")
     except (Repository.ObjectNotFound, StoreObjectNotFound):
         # TODO: ^ seem like RemoteRepository raises Repository.ONF instead of StoreONF
-        hash = b""
-        logger.debug("cache/chunks_hash missing!")
-    return hash
+        pass  # likely already upgraded
+    else:
+        old_name = "cache/chunks"
+        new_name = f"cache/chunks.{bin_to_hex(hash)}"
+        logger.debug(f"renaming {old_name} to {new_name}.")
+        repository.store_move(old_name, new_name)
+        repository.store_delete("cache/chunks_hash")
+
+
+def list_chunkindex_hashes(repository):
+    hashes = set()
+    for info in repository.store_list("cache"):
+        info = ItemInfo(*info)  # RPC does not give namedtuple
+        if info.name.startswith("chunks."):
+            hash = info.name.removeprefix("chunks.")
+            hashes.add(hash)
+    logger.debug(f"cached chunk indexes: {hashes}")
+    return hashes
+
+
+def delete_chunkindex_cache(repository):
+    hashes = list_chunkindex_hashes(repository)
+    for hash in hashes:
+        cache_name = f"cache/chunks.{hash}"
+        try:
+            repository.store_delete(cache_name)
+        except (Repository.ObjectNotFound, StoreObjectNotFound):
+            # TODO: ^ seem like RemoteRepository raises Repository.ONF instead of StoreONF
+            pass
+    logger.debug(f"cached chunk indexes deleted: {hashes}")


 CHUNKINDEX_HASH_SEED = 2


-def write_chunkindex_to_repo_cache(repository, chunks, *, clear=False, force_write=False):
-    cached_hash = load_chunks_hash(repository)
+def write_chunkindex_to_repo_cache(repository, chunks, *, clear=False, force_write=False, delete_other=False):
+    cached_hashes = list_chunkindex_hashes(repository)
     with io.BytesIO() as f:
         chunks.write(f)
         data = f.getvalue()
     if clear:
         # if we don't need the in-memory chunks index anymore:
         chunks.clear()  # free memory, immediately
-    new_hash = xxh64(data, seed=CHUNKINDEX_HASH_SEED)
-    if force_write or new_hash != cached_hash:
-        # when an updated chunks index is stored into the cache, we also store its hash into the cache.
+    new_hash = bin_to_hex(xxh64(data, seed=CHUNKINDEX_HASH_SEED))
+    if force_write or new_hash not in cached_hashes:
+        # when an updated chunks index is stored into the cache, we also store its hash as part of the name.
         # when a client is loading the chunks index from a cache, it has to compare its xxh64
-        # hash against cache/chunks_hash in the repository. if it is the same, the cache
-        # is valid. If it is different, the cache is either corrupted or out of date and
-        # has to be discarded.
-        # when some functionality is DELETING chunks from the repository, it has to either update
-        # both cache/chunks and cache/chunks_hash (like borg compact does) or it has to delete both,
+        # hash against the hash in its name. if it is the same, the cache is valid.
+        # if it is different, the cache is either corrupted or out of date and has to be discarded.
+        # when some functionality is DELETING chunks from the repository, it has to delete
+        # all existing cache/chunks.* and maybe write a new, valid cache/chunks.<hash>,
         # so that all clients will discard any client-local chunks index caches.
-        logger.debug(f"caching chunks index {bin_to_hex(new_hash)} in repository...")
-        repository.store_store("cache/chunks", data)
-        repository.store_store("cache/chunks_hash", new_hash)
+        cache_name = f"cache/chunks.{new_hash}"
+        logger.debug(f"caching chunks index as {cache_name} in repository...")
+        repository.store_store(cache_name, data)
+        if delete_other:
+            for hash in cached_hashes:
+                cache_name = f"cache/chunks.{hash}"
+                try:
+                    repository.store_delete(cache_name)
+                except (Repository.ObjectNotFound, StoreObjectNotFound):
+                    # TODO: ^ seem like RemoteRepository raises Repository.ONF instead of StoreONF
+                    pass
+            logger.debug(f"cached chunk indexes deleted: {cached_hashes}")
     return new_hash


 def build_chunkindex_from_repo(repository, *, disable_caches=False, cache_immediately=False):
-    chunks = None
+    try_upgrade_to_b14(repository)
     # first, try to load a pre-computed and centrally cached chunks index:
     if not disable_caches:
-        wanted_hash = load_chunks_hash(repository)
-        logger.debug(f"trying to get cached chunk index (id {bin_to_hex(wanted_hash)}) from the repo...")
-        try:
-            chunks_data = repository.store_load("cache/chunks")
-        except (Repository.ObjectNotFound, StoreObjectNotFound):
-            # TODO: ^ seem like RemoteRepository raises Repository.ONF instead of StoreONF
-            logger.debug("cache/chunks not found in the repository.")
-        else:
-            if xxh64(chunks_data, seed=CHUNKINDEX_HASH_SEED) == wanted_hash:
-                logger.debug("cache/chunks is valid.")
-                with io.BytesIO(chunks_data) as f:
-                    chunks = ChunkIndex.read(f)
-                return chunks
+        hashes = list_chunkindex_hashes(repository)
+        assert len(hashes) <= 1, f"chunk indexes: {hashes}"  # later we change to multiple chunkindexes...
+        for hash in hashes:
+            cache_name = f"cache/chunks.{hash}"
+            logger.debug(f"trying to load {cache_name} from the repo...")
+            try:
+                chunks_data = repository.store_load(cache_name)
+            except (Repository.ObjectNotFound, StoreObjectNotFound):
+                # TODO: ^ seem like RemoteRepository raises Repository.ONF instead of StoreONF
+                logger.debug(f"{cache_name} not found in the repository.")
             else:
-                logger.debug("cache/chunks is invalid.")
+                if xxh64(chunks_data, seed=CHUNKINDEX_HASH_SEED) == hex_to_bin(hash):
+                    logger.debug(f"{cache_name} is valid.")
+                    with io.BytesIO(chunks_data) as f:
+                        chunks = ChunkIndex.read(f)
+                    return chunks
+                else:
+                    logger.debug(f"{cache_name} is invalid.")
     # if we didn't get anything from the cache, compute the ChunkIndex the slow way:
     logger.debug("querying the chunk IDs list from the repo...")
     chunks = ChunkIndex()
@@ -741,7 +781,7 @@ def build_chunkindex_from_repo(repository, *, disable_caches=False, cache_immedi
     logger.debug(f"queried {num_chunks} chunk IDs in {duration} s, ~{speed}/s")
     if cache_immediately:
         # immediately update cache/chunks, so we only rarely have to do it the slow way:
-        write_chunkindex_to_repo_cache(repository, chunks, clear=False, force_write=True)
+        write_chunkindex_to_repo_cache(repository, chunks, clear=False, force_write=True, delete_other=True)
     return chunks
@@ -817,7 +857,7 @@ def add_chunk(

     def _write_chunks_cache(self, chunks):
         # this is called from .close, so we can clear here:
-        write_chunkindex_to_repo_cache(self.repository, self._chunks, clear=True)
+        write_chunkindex_to_repo_cache(self.repository, self._chunks, clear=True, delete_other=True)
         self._chunks = None  # nothing there (cleared!)

     def refresh_lock(self, now):
diff --git a/src/borg/repository.py b/src/borg/repository.py
index 5f7ac27e6..395e31296 100644
--- a/src/borg/repository.py
+++ b/src/borg/repository.py
@@ -385,7 +385,7 @@ def check_object(obj):
             # if we did a full pass in one go, we built a complete, uptodate ChunkIndex, cache it!
             from .cache import write_chunkindex_to_repo_cache

-            write_chunkindex_to_repo_cache(self, chunks, clear=True, force_write=True)
+            write_chunkindex_to_repo_cache(self, chunks, clear=True, force_write=True, delete_other=True)
         except StoreObjectNotFound:
             # it can be that there is no "data/" at all, then it crashes when iterating infos.
             pass
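The core idea of PATCH 1/5 is that the cache entry's name carries its own checksum: the serialized ChunkIndex is stored under cache/chunks.<hash>, where <hash> is the xxh64 of the data, so a reader can validate the blob without fetching a second store object. Below is a minimal sketch of that round-trip, using a plain dict as a stand-in for the borgstore key/value store and hashlib.sha256 in place of borg's seeded xxh64; put_chunkindex and get_chunkindex are hypothetical names, not borg API:

    import hashlib

    store = {}  # stand-in for the repository's key/value store

    def put_chunkindex(data):
        # name the entry after the digest of its own content
        digest = hashlib.sha256(data).hexdigest()
        store[f"cache/chunks.{digest}"] = data
        return digest

    def get_chunkindex(digest):
        data = store.get(f"cache/chunks.{digest}")
        if data is None:
            return None  # not found
        if hashlib.sha256(data).hexdigest() != digest:
            return None  # corrupted or out of date: discard, rebuild the slow way
        return data

    digest = put_chunkindex(b"serialized ChunkIndex bytes")
    assert get_chunkindex(digest) == b"serialized ChunkIndex bytes"

Because the name is derived from the content, an unchanged index never needs rewriting (the `new_hash not in cached_hashes` check above), and several differently named partial indexes can coexist under cache/ - which the later patches in this series exploit.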
From 4a6fcc26d7115d76b18bdba0f08d337bd442caa8 Mon Sep 17 00:00:00 2001
From: Thomas Waldmann
Date: Fri, 8 Nov 2024 19:43:44 +0100
Subject: [PATCH 2/5] use stable chunkindex list order

---
 src/borg/cache.py | 5 +++--
 1 file changed, 3 insertions(+), 2 deletions(-)

diff --git a/src/borg/cache.py b/src/borg/cache.py
index 0953460f2..696db4d79 100644
--- a/src/borg/cache.py
+++ b/src/borg/cache.py
@@ -681,12 +681,13 @@ def try_upgrade_to_b14(repository):


 def list_chunkindex_hashes(repository):
-    hashes = set()
+    hashes = []
     for info in repository.store_list("cache"):
         info = ItemInfo(*info)  # RPC does not give namedtuple
         if info.name.startswith("chunks."):
             hash = info.name.removeprefix("chunks.")
-            hashes.add(hash)
+            hashes.append(hash)
+    hashes = sorted(hashes)
     logger.debug(f"cached chunk indexes: {hashes}")
     return hashes
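A likely motivation for PATCH 2/5: iteration order of a str-keyed set varies between Python processes (hash randomization), so two runs could report or process the same cached indexes in different order; a sorted list makes the debug output and any later merge order deterministic. A tiny illustration with made-up hash names (the real names come from repository.store_list("cache")):

    names = {"chunks.e3a1", "chunks.0b5d", "chunks.77f2"}  # hypothetical listing
    hashes = sorted(n.removeprefix("chunks.") for n in names if n.startswith("chunks."))
    print(hashes)  # deterministically ['0b5d', '77f2', 'e3a1'], whatever the set order was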
From 00f8cdc9a7d2d326ffd50011bd7f99daf98eb274 Mon Sep 17 00:00:00 2001
From: Thomas Waldmann
Date: Fri, 8 Nov 2024 20:24:33 +0100
Subject: [PATCH 3/5] when building the chunk index, merge all we have in the cache

---
 src/borg/cache.py | 55 +++++++++++++++++++++++++++++++-------------------
 1 file changed, 36 insertions(+), 19 deletions(-)

diff --git a/src/borg/cache.py b/src/borg/cache.py
index 696db4d79..977268358 100644
--- a/src/borg/cache.py
+++ b/src/borg/cache.py
@@ -739,28 +739,45 @@ def write_chunkindex_to_repo_cache(repository, chunks, *, clear=False, force_wri
     return new_hash


+def read_chunkindex_from_repo_cache(repository, hash):
+    cache_name = f"cache/chunks.{hash}"
+    logger.debug(f"trying to load {cache_name} from the repo...")
+    try:
+        chunks_data = repository.store_load(cache_name)
+    except (Repository.ObjectNotFound, StoreObjectNotFound):
+        # TODO: ^ seem like RemoteRepository raises Repository.ONF instead of StoreONF
+        logger.debug(f"{cache_name} not found in the repository.")
+    else:
+        if xxh64(chunks_data, seed=CHUNKINDEX_HASH_SEED) == hex_to_bin(hash):
+            logger.debug(f"{cache_name} is valid.")
+            with io.BytesIO(chunks_data) as f:
+                chunks = ChunkIndex.read(f)
+            return chunks
+        else:
+            logger.debug(f"{cache_name} is invalid.")
+
+
 def build_chunkindex_from_repo(repository, *, disable_caches=False, cache_immediately=False):
     try_upgrade_to_b14(repository)
-    # first, try to load a pre-computed and centrally cached chunks index:
+    # first, try to build a fresh, mostly complete chunk index from centrally cached chunk indexes:
     if not disable_caches:
         hashes = list_chunkindex_hashes(repository)
-        assert len(hashes) <= 1, f"chunk indexes: {hashes}"  # later we change to multiple chunkindexes...
-        for hash in hashes:
-            cache_name = f"cache/chunks.{hash}"
-            logger.debug(f"trying to load {cache_name} from the repo...")
-            try:
-                chunks_data = repository.store_load(cache_name)
-            except (Repository.ObjectNotFound, StoreObjectNotFound):
-                # TODO: ^ seem like RemoteRepository raises Repository.ONF instead of StoreONF
-                logger.debug(f"{cache_name} not found in the repository.")
-            else:
-                if xxh64(chunks_data, seed=CHUNKINDEX_HASH_SEED) == hex_to_bin(hash):
-                    logger.debug(f"{cache_name} is valid.")
-                    with io.BytesIO(chunks_data) as f:
-                        chunks = ChunkIndex.read(f)
-                    return chunks
-                else:
-                    logger.debug(f"{cache_name} is invalid.")
+        if hashes:  # we have at least one cached chunk index!
+            merged = 0
+            chunks = ChunkIndex()  # we'll merge all we find into this
+            for hash in hashes:
+                chunks_to_merge = read_chunkindex_from_repo_cache(repository, hash)
+                if chunks_to_merge is not None:
+                    logger.debug(f"cached chunk index {hash} gets merged...")
+                    for k, v in chunks_to_merge.items():
+                        chunks[k] = v
+                    merged += 1
+                    chunks_to_merge.clear()
+            if merged > 0:
+                if merged > 1 and cache_immediately:
+                    # immediately update cache/chunks, so we don't have to merge these again:
+                    write_chunkindex_to_repo_cache(repository, chunks, clear=False, force_write=True, delete_other=True)
+                return chunks
     # if we didn't get anything from the cache, compute the ChunkIndex the slow way:
     logger.debug("querying the chunk IDs list from the repo...")
     chunks = ChunkIndex()
@@ -858,7 +875,7 @@ def add_chunk(

     def _write_chunks_cache(self, chunks):
         # this is called from .close, so we can clear here:
-        write_chunkindex_to_repo_cache(self.repository, self._chunks, clear=True, delete_other=True)
+        write_chunkindex_to_repo_cache(self.repository, self._chunks, clear=True, delete_other=False)
         self._chunks = None  # nothing there (cleared!)

     def refresh_lock(self, now):
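The merge in PATCH 3/5 is a plain last-writer-wins dict merge: every partial index contributes its entries, and overlapping chunk ids simply overwrite (their entries describe the same chunk, so this is assumed to be harmless). A sketch with dicts standing in for ChunkIndex; the ids and entry values are made up:

    partial_a = {b"id1": "entry1", b"id2": "entry2"}
    partial_b = {b"id2": "entry2", b"id3": "entry3"}  # overlaps partial_a on id2

    merged = {}
    for part in (partial_a, partial_b):
        for k, v in part.items():
            merged[k] = v  # last writer wins
        part.clear()  # free memory immediately, as the patch does

    assert set(merged) == {b"id1", b"id2", b"id3"}

Note the `merged > 1 and cache_immediately` guard: the merged result is only written back when more than one index was actually combined, so a single valid cached index is not pointlessly rewritten.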
From 43a27f294dc0d062485c1c463e2a964099dd2de9 Mon Sep 17 00:00:00 2001
From: Thomas Waldmann
Date: Fri, 8 Nov 2024 21:23:43 +0100
Subject: [PATCH 4/5] caching a merged chunk index: only delete what we merged

---
 src/borg/cache.py | 32 +++++++++++++++++++++-----------
 1 file changed, 21 insertions(+), 11 deletions(-)

diff --git a/src/borg/cache.py b/src/borg/cache.py
index 977268358..17c2b6c7a 100644
--- a/src/borg/cache.py
+++ b/src/borg/cache.py
@@ -707,7 +707,9 @@ def delete_chunkindex_cache(repository):
 CHUNKINDEX_HASH_SEED = 2


-def write_chunkindex_to_repo_cache(repository, chunks, *, clear=False, force_write=False, delete_other=False):
+def write_chunkindex_to_repo_cache(
+    repository, chunks, *, clear=False, force_write=False, delete_other=False, delete_these=None
+):
     cached_hashes = list_chunkindex_hashes(repository)
     with io.BytesIO() as f:
         chunks.write(f)
@@ -728,14 +730,20 @@ def write_chunkindex_to_repo_cache(repository, chunks, *, clear=False, force_wri
         logger.debug(f"caching chunks index as {cache_name} in repository...")
         repository.store_store(cache_name, data)
         if delete_other:
-            for hash in cached_hashes:
-                cache_name = f"cache/chunks.{hash}"
-                try:
-                    repository.store_delete(cache_name)
-                except (Repository.ObjectNotFound, StoreObjectNotFound):
-                    # TODO: ^ seem like RemoteRepository raises Repository.ONF instead of StoreONF
-                    pass
-            logger.debug(f"cached chunk indexes deleted: {cached_hashes}")
+            delete_these = cached_hashes
+        elif delete_these:
+            pass
+        else:
+            delete_these = []
+        for hash in delete_these:
+            cache_name = f"cache/chunks.{hash}"
+            try:
+                repository.store_delete(cache_name)
+            except (Repository.ObjectNotFound, StoreObjectNotFound):
+                # TODO: ^ seem like RemoteRepository raises Repository.ONF instead of StoreONF
+                pass
+        if delete_these:
+            logger.debug(f"cached chunk indexes deleted: {delete_these}")
     return new_hash
@@ -776,7 +784,9 @@ def build_chunkindex_from_repo(repository, *, disable_caches=False, cache_immedi
             if merged > 0:
                 if merged > 1 and cache_immediately:
                     # immediately update cache/chunks, so we don't have to merge these again:
-                    write_chunkindex_to_repo_cache(repository, chunks, clear=False, force_write=True, delete_other=True)
+                    write_chunkindex_to_repo_cache(
+                        repository, chunks, clear=False, force_write=True, delete_these=hashes
+                    )
                 return chunks
     # if we didn't get anything from the cache, compute the ChunkIndex the slow way:
     logger.debug("querying the chunk IDs list from the repo...")
@@ -875,7 +885,7 @@ def add_chunk(

     def _write_chunks_cache(self, chunks):
         # this is called from .close, so we can clear here:
-        write_chunkindex_to_repo_cache(self.repository, self._chunks, clear=True, delete_other=False)
+        write_chunkindex_to_repo_cache(self.repository, self._chunks, clear=True)
         self._chunks = None  # nothing there (cleared!)

     def refresh_lock(self, now):

From 56493fc62b31d57ba0cae42e98fcad6adb32bfc0 Mon Sep 17 00:00:00 2001
From: Thomas Waldmann
Date: Fri, 8 Nov 2024 22:10:39 +0100
Subject: [PATCH 5/5] chunk index cache: fix "upgrade" code

nice try, but due to other changes after b13, the cache is invalid anyway.
---
 src/borg/cache.py | 18 +++++++-----------
 1 file changed, 7 insertions(+), 11 deletions(-)

diff --git a/src/borg/cache.py b/src/borg/cache.py
index 17c2b6c7a..efd3282d9 100644
--- a/src/borg/cache.py
+++ b/src/borg/cache.py
@@ -667,17 +667,13 @@ def memorize_file(self, hashed_path, path_hash, st, chunks):

 def try_upgrade_to_b14(repository):
     # TODO: remove this before 2.0.0 release
-    try:
-        hash = repository.store_load("cache/chunks_hash")
-    except (Repository.ObjectNotFound, StoreObjectNotFound):
-        # TODO: ^ seem like RemoteRepository raises Repository.ONF instead of StoreONF
-        pass  # likely already upgraded
-    else:
-        old_name = "cache/chunks"
-        new_name = f"cache/chunks.{bin_to_hex(hash)}"
-        logger.debug(f"renaming {old_name} to {new_name}.")
-        repository.store_move(old_name, new_name)
-        repository.store_delete("cache/chunks_hash")
+    # we just delete any present chunk index cache here, it is invalid due to the
+    # refcount -> flags change we did and due to the different CHUNKINDEX_HASH_SEED.
+    for name in "chunks_hash", "chunks":
+        try:
+            repository.store_delete(f"cache/{name}")
+        except (Repository.ObjectNotFound, StoreObjectNotFound):
+            pass  # likely already upgraded


 def list_chunkindex_hashes(repository):
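After PATCH 4/5 and 5/5, write_chunkindex_to_repo_cache distinguishes three deletion policies: delete_other drops every previously cached index (compact, check, full rebuilds), delete_these drops only an explicit list (the indexes a caller just merged), and the default deletes nothing (a client adding one more partial index on close). A condensed sketch of just that selection logic; select_deletions is a hypothetical helper, not borg API:

    def select_deletions(cached_hashes, *, delete_other=False, delete_these=None):
        if delete_other:
            return list(cached_hashes)  # wipe everything we did not just write
        return list(delete_these or [])  # only what the caller merged, or nothing

    assert select_deletions(["a", "b"], delete_other=True) == ["a", "b"]
    assert select_deletions(["a", "b"], delete_these=["a"]) == ["a"]
    assert select_deletions(["a", "b"]) == []

The final patch's simpler upgrade path fits the same model: pre-b14 caches cannot be valid anymore (refcount -> flags change, different CHUNKINDEX_HASH_SEED), so they are deleted outright rather than renamed.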