From e053307523f7d60b1b5507d5079a67191a38af05 Mon Sep 17 00:00:00 2001
From: Thomas Waldmann
Date: Fri, 1 Nov 2024 15:03:50 +0100
Subject: [PATCH] reduce memory consumption of files cache, fixes #5756

- refactor packing/unpacking of fc entries into separate functions
- instead of a chunks list entry being a tuple of 256bit id [bytes] and
  32bit size [int], only store a stable 32bit index into kv array of
  ChunkIndex (where we also have id and size [and refcount]).
- only done in memory, the on-disk format has (id, size) tuples.

memory consumption (N = entry.chunks list element count,
X = overhead for rest of entry):

- previously:

  - packed = packb(dict(..., chunks=[(id1, size1), (id2, size2), ...]))
  - packed size ~= X + N * (1 + (34 + 5)) Bytes

- now:

  - packed = packb(dict(..., chunks=[ix1, ix2, ...]))
  - packed size ~= X + N * 5 Bytes
---
 src/borg/cache.py      | 55 ++++++++++++++++++++++++++++++++++++------
 src/borg/hashindex.pyx |  6 +++++
 2 files changed, 54 insertions(+), 7 deletions(-)

diff --git a/src/borg/cache.py b/src/borg/cache.py
index 93a187270..7c5990526 100644
--- a/src/borg/cache.py
+++ b/src/borg/cache.py
@@ -381,6 +381,46 @@ def __init__(self, cache_mode, archive_name=None, start_backup=None):
         self._newest_path_hashes = set()
         self.start_backup = start_backup
 
+    def compress_entry(self, entry):
+        """
+        compress a files cache entry:
+
+        - use the ChunkIndex to "compress" the entry's chunks list (256bit key + 32bit size -> 32bit index).
+        - use msgpack to pack the entry (reduce memory usage by packing and having less python objects).
+
+        Note: the result is only valid while the ChunkIndex is in memory!
+        """
+        assert isinstance(self.chunks, ChunkIndex), f"{self.chunks} is not a ChunkIndex"
+        assert isinstance(entry, FileCacheEntry)
+        compressed_chunks = []
+        for id, size in entry.chunks:
+            cie = self.chunks.get(id)
+            assert cie is not None
+            assert cie.refcount > 0
+            assert size == cie.size
+            idx = self.chunks.k_to_idx(id)
+            compressed_chunks.append(idx)
+        entry = entry._replace(chunks=compressed_chunks)
+        return msgpack.packb(entry)
+
+    def decompress_entry(self, entry_packed):
+        """reverse operation of compress_entry"""
+        assert isinstance(self.chunks, ChunkIndex), f"{self.chunks} is not a ChunkIndex"
+        assert isinstance(entry_packed, bytes)
+        entry = msgpack.unpackb(entry_packed)
+        entry = FileCacheEntry(*entry)
+        chunks = []
+        for idx in entry.chunks:
+            assert isinstance(idx, int), f"{idx} is not an int"
+            id = self.chunks.idx_to_k(idx)
+            cie = self.chunks.get(id)
+            assert cie is not None
+            assert cie.refcount > 0
+            assert cie.size > 0
+            chunks.append((id, cie.size))
+        entry = entry._replace(chunks=chunks)
+        return entry
+
     @property
     def files(self):
         if self._files is None:
@@ -440,7 +480,7 @@ def _build_files_cache(self):
                     mtime=int_to_timestamp(mtime_ns),
                     chunks=item.chunks,
                 )
-                files[path_hash] = msgpack.packb(entry)  # takes about 240 Bytes per file
+                files[path_hash] = self.compress_entry(entry)
         # deal with special snapshot / timestamp granularity case, see FAQ:
         for path_hash in self._newest_path_hashes:
             del files[path_hash]
@@ -483,8 +523,8 @@ def _read_files_cache(self):
             try:
                 for path_hash, item in u:
                     entry = FileCacheEntry(*item)
-                    # in the end, this takes about 240 Bytes per file
-                    files[path_hash] = msgpack.packb(entry._replace(age=entry.age + 1))
+                    entry = entry._replace(age=entry.age + 1)
+                    files[path_hash] = self.compress_entry(entry)
             except (TypeError, ValueError) as exc:
                 msg = "The files cache seems invalid. [%s]" % str(exc)
                 break
@@ -514,7 +554,7 @@ def _write_files_cache(self, files):
         age_discarded = 0
         race_discarded = 0
         for path_hash, item in files.items():
-            entry = FileCacheEntry(*msgpack.unpackb(item))
+            entry = FileCacheEntry(*self.decompress_entry(item))
             if entry.age == 0:  # current entries
                 if max(timestamp_to_int(entry.ctime), timestamp_to_int(entry.mtime)) < discard_after:
                     # Only keep files seen in this backup that old enough not to suffer race conditions relating
@@ -567,7 +607,7 @@ def file_known_and_unchanged(self, hashed_path, path_hash, st):
            files_cache_logger.debug("UNKNOWN: no file metadata in cache for: %r", hashed_path)
            return False, None
        # we know the file!
-        entry = FileCacheEntry(*self.decompress_entry(entry))
+        entry = FileCacheEntry(*self.decompress_entry(entry))
        if "s" in cache_mode and entry.size != st.st_size:
            files_cache_logger.debug("KNOWN-CHANGED: file size has changed: %r", hashed_path)
            return True, None
@@ -590,7 +630,8 @@ def file_known_and_unchanged(self, hashed_path, path_hash, st):
         # to avoid everything getting chunked again. to be able to re-enable the
         # V comparison in a future backup run (and avoid chunking everything again at
         # that time), we need to update V in the cache with what we see in the filesystem.
-        self.files[path_hash] = msgpack.packb(entry._replace(inode=st.st_ino, ctime=ctime, mtime=mtime, age=0))
+        entry = entry._replace(inode=st.st_ino, ctime=ctime, mtime=mtime, age=0)
+        self.files[path_hash] = self.compress_entry(entry)
         chunks = [ChunkListEntry(*chunk) for chunk in entry.chunks]  # convert to list of namedtuple
         return True, chunks
 
@@ -611,7 +652,7 @@ def memorize_file(self, hashed_path, path_hash, st, chunks):
             mtime=int_to_timestamp(mtime_ns),
             chunks=chunks,
         )
-        self.files[path_hash] = msgpack.packb(entry)
+        self.files[path_hash] = self.compress_entry(entry)
         self._newest_cmtime = max(self._newest_cmtime or 0, ctime_ns)
         self._newest_cmtime = max(self._newest_cmtime or 0, mtime_ns)
         files_cache_logger.debug(
diff --git a/src/borg/hashindex.pyx b/src/borg/hashindex.pyx
index 978d923fd..c596928b1 100644
--- a/src/borg/hashindex.pyx
+++ b/src/borg/hashindex.pyx
@@ -73,6 +73,12 @@ class ChunkIndex(HTProxyMixin, MutableMapping):
     def stats(self):
         return self.ht.stats
 
+    def k_to_idx(self, key):
+        return self.ht.k_to_idx(key)
+
+    def idx_to_k(self, idx):
+        return self.ht.idx_to_k(idx)
+
 
 FuseVersionsIndexEntry = namedtuple('FuseVersionsIndexEntry', 'version hash')
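
Note (illustration only, not part of the patch): a minimal sketch of the size
estimate from the commit message, using the msgpack library directly; the ids,
sizes and index values below are made-up example data, not real borg chunks.

    import msgpack

    N = 1000
    ids = [bytes([i % 256]) * 32 for i in range(N)]   # fake 256bit chunk ids
    sizes = [100_000 + i for i in range(N)]           # sizes > 64 KiB pack as uint32 (5 bytes)
    indexes = [70_000 + i for i in range(N)]          # fake 32bit ChunkIndex slot indexes

    old = msgpack.packb(list(zip(ids, sizes)))        # chunks list as (id, size) tuples
    new = msgpack.packb(indexes)                      # chunks list as indexes only

    print(len(old) / N)  # ~40 Bytes/chunk: 1 (fixarray) + 34 (bin8 id) + 5 (uint32 size)
    print(len(new) / N)  # ~5 Bytes/chunk: 5 (uint32 index)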
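
Likewise a toy illustration of the stable key <-> index round trip that
k_to_idx() / idx_to_k() expose and that compress_entry() / decompress_entry()
rely on; TinyIndex is a hypothetical stand-in written for this note, not the
real ChunkIndex hashtable.

    # TinyIndex is a made-up stand-in for ChunkIndex: a key gets a small slot index
    # that stays stable for the lifetime of the object, so the index can replace the
    # key in memory and be mapped back to it later.
    class TinyIndex:
        def __init__(self):
            self._keys = []    # slot index -> key
            self._slots = {}   # key -> slot index
            self._sizes = {}   # key -> chunk size

        def add(self, key, size):
            if key not in self._slots:
                self._slots[key] = len(self._keys)
                self._keys.append(key)
            self._sizes[key] = size

        def k_to_idx(self, key):
            return self._slots[key]

        def idx_to_k(self, idx):
            return self._keys[idx]

    ti = TinyIndex()
    chunk_id = b"\x01" * 32
    ti.add(chunk_id, size=4096)
    i = ti.k_to_idx(chunk_id)             # 32 byte key -> small int
    assert ti.idx_to_k(i) == chunk_id     # ... and back, while ti is in memory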