From d5d49e8a15e887fe252c8d4f619c1035d5358717 Mon Sep 17 00:00:00 2001 From: Thomas Waldmann Date: Wed, 13 Nov 2024 01:16:55 +0100 Subject: [PATCH 1/2] ChunkIndex: enable partial index updates - ChunkIndex: implement system flags - ChunkIndex: F_NEW flag as 1st system flag for newly added chunks - incrementally write only NEW chunks to repo/cache/chunks.* - merge all chunks.* when loading the ChunkIndex from the repo Also: the cached ChunkIndex only has the chunk IDs. All values are just dummies. The ChunkIndexEntry value can be used to set flags and track size, but we intentionally do not persist flags and size to the cache. The size information gets set when borg loads the files cache and "compresses" the chunks lists in the files cache entries. After that, all chunks referenced by the files cache will have a valid size as long as the ChunkIndex is in memory. This is needed so that "uncompress" can work. --- src/borg/archiver/compact_cmd.py | 9 ++--- src/borg/cache.py | 39 ++++++++++++++------- src/borg/hashindex.pyi | 6 +++- src/borg/hashindex.pyx | 52 +++++++++++++++++++++++++--- src/borg/testsuite/hashindex_test.py | 21 ++++++++++- 5 files changed, 101 insertions(+), 26 deletions(-) diff --git a/src/borg/archiver/compact_cmd.py b/src/borg/archiver/compact_cmd.py index 0e6a6ba07..35b08275e 100644 --- a/src/borg/archiver/compact_cmd.py +++ b/src/borg/archiver/compact_cmd.py @@ -58,13 +58,8 @@ def get_repository_chunks(self) -> ChunkIndex: return chunks def save_chunk_index(self): - # first clean up: - for id, entry in self.chunks.iteritems(): - # we already deleted the unused chunks, so everything left must be used: - assert entry.flags & ChunkIndex.F_USED - # as we put the wrong size in there, we need to clean up the size: - self.chunks[id] = entry._replace(size=0) - # now self.chunks is an uptodate ChunkIndex, usable for general borg usage! + # write_chunkindex_to_repo now removes all flags and size infos. + # we need this, as we put the wrong size in there. write_chunkindex_to_repo_cache(self.repository, self.chunks, clear=True, force_write=True, delete_other=True) self.chunks = None # nothing there (cleared!) diff --git a/src/borg/cache.py b/src/borg/cache.py index efd3282d9..76d43a6fd 100644 --- a/src/borg/cache.py +++ b/src/borg/cache.py @@ -396,9 +396,7 @@ def compress_entry(self, entry): assert isinstance(entry, FileCacheEntry) compressed_chunks = [] for id, size in entry.chunks: - cie = self.chunks.get(id) - assert cie is not None - assert cie.flags & ChunkIndex.F_USED + cie = self.chunks[id] # may raise KeyError if chunk id is not in repo if cie.size == 0: # size is not known in the chunks index yet self.chunks[id] = cie._replace(size=size) else: @@ -418,9 +416,7 @@ def decompress_entry(self, entry_packed): for idx in entry.chunks: assert isinstance(idx, int), f"{idx} is not an int" id = self.chunks.idx_to_k(idx) - cie = self.chunks.get(id) - assert cie is not None - assert cie.flags & ChunkIndex.F_USED + cie = self.chunks[id] assert cie.size > 0 chunks.append((id, cie.size)) entry = entry._replace(chunks=chunks) @@ -485,6 +481,7 @@ def _build_files_cache(self): mtime=int_to_timestamp(mtime_ns), chunks=item.chunks, ) + # note: if the repo is an a valid state, next line should not fail with KeyError: files[path_hash] = self.compress_entry(entry) # deal with special snapshot / timestamp granularity case, see FAQ: for path_hash in self._newest_path_hashes: @@ -529,7 +526,11 @@ def _read_files_cache(self): for path_hash, entry in u: entry = FileCacheEntry(*entry) entry = entry._replace(age=entry.age + 1) - files[path_hash] = self.compress_entry(entry) + try: + files[path_hash] = self.compress_entry(entry) + except KeyError: + # repo is missing a chunk referenced from entry + logger.debug(f"compress_entry failed for {entry}, skipping.") except (TypeError, ValueError) as exc: msg = "The files cache seems invalid. [%s]" % str(exc) break @@ -706,14 +707,23 @@ def delete_chunkindex_cache(repository): def write_chunkindex_to_repo_cache( repository, chunks, *, clear=False, force_write=False, delete_other=False, delete_these=None ): - cached_hashes = list_chunkindex_hashes(repository) + # the borghash code has no means to only serialize the F_NEW table entries, + # thus we copy only the new entries to a temporary table: + new_chunks = ChunkIndex() + # for now, we don't want to serialize the flags or the size, just the keys (chunk IDs): + cleaned_value = ChunkIndexEntry(flags=ChunkIndex.F_NONE, size=0) + for key, _ in chunks.iteritems(only_new=True): + new_chunks[key] = cleaned_value with io.BytesIO() as f: - chunks.write(f) + new_chunks.write(f) data = f.getvalue() + logger.debug(f"caching {len(new_chunks)} new chunks.") + new_chunks.clear() # free memory of the temporary table if clear: # if we don't need the in-memory chunks index anymore: chunks.clear() # free memory, immediately new_hash = bin_to_hex(xxh64(data, seed=CHUNKINDEX_HASH_SEED)) + cached_hashes = list_chunkindex_hashes(repository) if force_write or new_hash not in cached_hashes: # when an updated chunks index is stored into the cache, we also store its hash as part of the name. # when a client is loading the chunks index from a cache, it has to compare its xxh64 @@ -725,12 +735,15 @@ def write_chunkindex_to_repo_cache( cache_name = f"cache/chunks.{new_hash}" logger.debug(f"caching chunks index as {cache_name} in repository...") repository.store_store(cache_name, data) + # we have successfully stored to the repository, so we can clear all F_NEW flags now: + chunks.clear_new() + # delete some not needed cached chunk indexes, but never the one we just wrote: if delete_other: - delete_these = cached_hashes + delete_these = set(cached_hashes) - {new_hash} elif delete_these: - pass + delete_these = set(delete_these) - {new_hash} else: - delete_these = [] + delete_these = set() for hash in delete_these: cache_name = f"cache/chunks.{hash}" try: @@ -783,6 +796,8 @@ def build_chunkindex_from_repo(repository, *, disable_caches=False, cache_immedi write_chunkindex_to_repo_cache( repository, chunks, clear=False, force_write=True, delete_these=hashes ) + else: + chunks.clear_new() return chunks # if we didn't get anything from the cache, compute the ChunkIndex the slow way: logger.debug("querying the chunk IDs list from the repo...") diff --git a/src/borg/hashindex.pyi b/src/borg/hashindex.pyi index 6b236afa7..994e54b5a 100644 --- a/src/borg/hashindex.pyi +++ b/src/borg/hashindex.pyi @@ -13,8 +13,12 @@ CIE = Union[Tuple[int, int], Type[ChunkIndexEntry]] class ChunkIndex: F_NONE: int F_USED: int + F_NEW: int + M_USER: int + M_SYSTEM: int def add(self, key: bytes, size: int) -> None: ... - def iteritems(self, marker: bytes = ...) -> Iterator: ... + def iteritems(self, *, only_new: bool = ...) -> Iterator: ... + def clear_new(self) -> None: ... def __contains__(self, key: bytes) -> bool: ... def __getitem__(self, key: bytes) -> Type[ChunkIndexEntry]: ... def __setitem__(self, key: bytes, value: CIE) -> None: ... diff --git a/src/borg/hashindex.pyx b/src/borg/hashindex.pyx index 00fe68404..beca0540b 100644 --- a/src/borg/hashindex.pyx +++ b/src/borg/hashindex.pyx @@ -39,11 +39,16 @@ ChunkIndexEntry = namedtuple('ChunkIndexEntry', 'flags size') class ChunkIndex(HTProxyMixin, MutableMapping): """ - Mapping from key256 to (refcount32, size32) to track chunks in the repository. + Mapping from key256 to (flags32, size32) to track chunks in the repository. """ - # .flags values: 2^0 .. 2^31 + # .flags related values: F_NONE = 0 # all flags cleared - F_USED = 1 # chunk is used/referenced + M_USER = 0x00ffffff # mask for user flags + M_SYSTEM = 0xff000000 # mask for system flags + # user flags: + F_USED = 2 ** 0 # chunk is used/referenced + # system flags (internal use, always 0 to user, not changeable by user): + F_NEW = 2 ** 24 # a new chunk that is not present in repo/cache/chunks.* yet. def __init__(self, capacity=1000, path=None, usable=None): if path: @@ -53,8 +58,15 @@ class ChunkIndex(HTProxyMixin, MutableMapping): capacity = usable * 2 # load factor 0.5 self.ht = HashTableNT(key_size=32, value_format=" Date: Thu, 14 Nov 2024 21:44:56 +0100 Subject: [PATCH 2/2] update ChunkIndex cache in repo every 10mins --- src/borg/cache.py | 15 ++++++++++----- 1 file changed, 10 insertions(+), 5 deletions(-) diff --git a/src/borg/cache.py b/src/borg/cache.py index 76d43a6fd..eef00f1a0 100644 --- a/src/borg/cache.py +++ b/src/borg/cache.py @@ -833,6 +833,8 @@ def __init__(self): self._chunks = None self.last_refresh_dt = datetime.now(timezone.utc) self.refresh_td = timedelta(seconds=60) + self.chunks_cache_last_write = datetime.now(timezone.utc) + self.chunks_cache_write_td = timedelta(seconds=600) @property def chunks(self): @@ -879,6 +881,7 @@ def add_chunk( else: raise ValueError("when giving compressed data for a chunk, the uncompressed size must be given also") now = datetime.now(timezone.utc) + self._maybe_write_chunks_cache(now) exists = self.seen_chunk(id, size) if exists: # if borg create is processing lots of unchanged files (no content and not metadata changes), @@ -894,10 +897,10 @@ def add_chunk( stats.update(size, not exists) return ChunkListEntry(id, size) - def _write_chunks_cache(self, chunks): - # this is called from .close, so we can clear here: - write_chunkindex_to_repo_cache(self.repository, self._chunks, clear=True) - self._chunks = None # nothing there (cleared!) + def _maybe_write_chunks_cache(self, now, force=False, clear=False): + if force or now > self.chunks_cache_last_write + self.chunks_cache_write_td: + write_chunkindex_to_repo_cache(self.repository, self._chunks, clear=clear) + self.chunks_cache_last_write = now def refresh_lock(self, now): if now > self.last_refresh_dt + self.refresh_td: @@ -995,7 +998,9 @@ def close(self): for key, value in sorted(self._chunks.stats.items()): logger.debug(f"Chunks index stats: {key}: {value}") pi.output("Saving chunks cache") - self._write_chunks_cache(self._chunks) # cache/chunks in repo has a different integrity mechanism + # note: cache/chunks.* in repo has a different integrity mechanism + self._maybe_write_chunks_cache(self._chunks, force=True, clear=True) + self._chunks = None # nothing there (cleared!) pi.output("Saving cache config") self.cache_config.save(self.manifest) self.cache_config.close()