From d27b7a7981b509e01b3acf6abeb6de9c98afa9a6 Mon Sep 17 00:00:00 2001
From: Thomas Waldmann
Date: Wed, 28 Aug 2024 22:14:12 +0200
Subject: [PATCH] cache: remove transactions, load files/chunks cache on demand

---
 src/borg/archive.py                   |   1 -
 src/borg/archiver/key_cmds.py         |   4 -
 src/borg/archiver/prune_cmd.py        |   1 -
 src/borg/archiver/recreate_cmd.py     |   1 -
 src/borg/archiver/rename_cmd.py       |   1 -
 src/borg/cache.py                     | 125 +++++++++-----------------
 src/borg/testsuite/archiver/checks.py |   2 -
 src/borg/testsuite/cache.py           |   9 --
 8 files changed, 40 insertions(+), 104 deletions(-)

diff --git a/src/borg/archive.py b/src/borg/archive.py
index c44eb506f..84bab78ef 100644
--- a/src/borg/archive.py
+++ b/src/borg/archive.py
@@ -659,7 +659,6 @@ Duration: {0.duration}
             pass
         self.manifest.archives[name] = (self.id, metadata.time)
         self.manifest.write()
-        self.cache.commit()
         return metadata
 
     def calc_stats(self, cache, want_unique=True):
diff --git a/src/borg/archiver/key_cmds.py b/src/borg/archiver/key_cmds.py
index f2e5c12d9..6fbac5f09 100644
--- a/src/borg/archiver/key_cmds.py
+++ b/src/borg/archiver/key_cmds.py
@@ -74,11 +74,7 @@ class KeysMixIn:
         manifest.repo_objs.key = key_new
         manifest.write()
 
-        # we need to rewrite cache config and security key-type info,
-        # so that the cached key-type will match the repo key-type.
-        cache.begin_txn()  # need to start a cache transaction, otherwise commit() does nothing.
         cache.key = key_new
-        cache.commit()
 
         loc = key_new.find_key() if hasattr(key_new, "find_key") else None
         if args.keep:
diff --git a/src/borg/archiver/prune_cmd.py b/src/borg/archiver/prune_cmd.py
index 6c7ebf3ff..a33901770 100644
--- a/src/borg/archiver/prune_cmd.py
+++ b/src/borg/archiver/prune_cmd.py
@@ -143,7 +143,6 @@ class PruneMixIn:
                 raise Error("Got Ctrl-C / SIGINT.")
             elif uncommitted_deletes > 0:
                 manifest.write()
-                cache.commit()
 
     def build_parser_prune(self, subparsers, common_parser, mid_common_parser):
         from ._common import process_epilog
diff --git a/src/borg/archiver/recreate_cmd.py b/src/borg/archiver/recreate_cmd.py
index 90260efab..7d30b41d3 100644
--- a/src/borg/archiver/recreate_cmd.py
+++ b/src/borg/archiver/recreate_cmd.py
@@ -49,7 +49,6 @@ class RecreateMixIn:
                     logger.info("Skipped archive %s: Nothing to do. Archive was not processed.", name)
             if not args.dry_run:
                 manifest.write()
-                cache.commit()
 
     def build_parser_recreate(self, subparsers, common_parser, mid_common_parser):
         from ._common import process_epilog
diff --git a/src/borg/archiver/rename_cmd.py b/src/borg/archiver/rename_cmd.py
index 39b36571a..cb8660c48 100644
--- a/src/borg/archiver/rename_cmd.py
+++ b/src/borg/archiver/rename_cmd.py
@@ -17,7 +17,6 @@ class RenameMixIn:
         """Rename an existing archive"""
         archive.rename(args.newname)
         manifest.write()
-        cache.commit()
 
     def build_parser_rename(self, subparsers, common_parser, mid_common_parser):
         from ._common import process_epilog
diff --git a/src/borg/cache.py b/src/borg/cache.py
index 0f452cb9c..7c126c3cc 100644
--- a/src/borg/cache.py
+++ b/src/borg/cache.py
@@ -391,9 +391,15 @@ class FilesCacheMixin:
 
     def __init__(self, cache_mode):
         self.cache_mode = cache_mode
-        self.files = None
+        self._files = None
         self._newest_cmtime = None
 
+    @property
+    def files(self):
+        if self._files is None:
+            self._files = self._read_files_cache()
+        return self._files
+
     def files_cache_name(self):
         suffix = os.environ.get("BORG_FILES_CACHE_SUFFIX", "")
         return self.FILES_CACHE_NAME + "." + suffix if suffix else self.FILES_CACHE_NAME
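The hunk above is the heart of the "load on demand" part of this change: the files cache is no longer read when the cache is opened, but on first access of the `files` property. A minimal, self-contained sketch of that pattern follows (illustration only, not borg code; the class name and the stub loader are made up):

    class LazyFilesCache:
        def __init__(self):
            self._files = None  # nothing read from disk yet

        @property
        def files(self):
            # the first access triggers the (potentially expensive) load,
            # later accesses reuse the already-loaded dict
            if self._files is None:
                self._files = self._read_files_cache()
            return self._files

        def _read_files_cache(self):
            # stub loader; borg reads a msgpack-encoded, integrity-checked file here
            return {}

The same pattern is applied to the chunks index further down (ChunksMixin.chunks), so a command that never touches one of the two caches never pays for loading it.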
@@ -412,7 +418,7 @@ class FilesCacheMixin:
         if "d" in self.cache_mode:  # d(isabled)
             return
 
-        self.files = {}
+        files = {}
         logger.debug("Reading files cache ...")
         files_cache_logger.debug("FILES-CACHE-LOAD: starting...")
         msg = None
@@ -432,7 +438,7 @@ class FilesCacheMixin:
                     for path_hash, item in u:
                         entry = FileCacheEntry(*item)
                         # in the end, this takes about 240 Bytes per file
-                        self.files[path_hash] = msgpack.packb(entry._replace(age=entry.age + 1))
+                        files[path_hash] = msgpack.packb(entry._replace(age=entry.age + 1))
                 except (TypeError, ValueError) as exc:
                     msg = "The files cache seems invalid. [%s]" % str(exc)
                     break
@@ -443,18 +449,20 @@ class FilesCacheMixin:
 
         if msg is not None:
             logger.warning(msg)
             logger.warning("Continuing without files cache - expect lower performance.")
-            self.files = {}
-        files_cache_logger.debug("FILES-CACHE-LOAD: finished, %d entries loaded.", len(self.files))
+            files = {}
+        files_cache_logger.debug("FILES-CACHE-LOAD: finished, %d entries loaded.", len(files))
+        return files
 
-    def _write_files_cache(self):
+    def _write_files_cache(self, files):
         if self._newest_cmtime is None:  # was never set because no files were modified/added
             self._newest_cmtime = 2**63 - 1  # nanoseconds, good until y2262
         ttl = int(os.environ.get("BORG_FILES_CACHE_TTL", 20))
         files_cache_logger.debug("FILES-CACHE-SAVE: starting...")
+        # TODO: use something like SaveFile here, but that didn't work due to SyncFile missing .seek().
         with IntegrityCheckedFile(path=os.path.join(self.path, self.files_cache_name()), write=True) as fd:
             entry_count = 0
-            for path_hash, item in self.files.items():
+            for path_hash, item in files.items():
                 # Only keep files seen in this backup that are older than newest cmtime seen in this backup -
                 # this is to avoid issues with filesystem snapshots and cmtime granularity.
                 # Also keep files from older backups that have not reached BORG_FILES_CACHE_TTL yet.
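The two hunks above also document the aging scheme: on load, every entry's age is bumped by one, and on save, only sufficiently fresh entries are kept. The following toy model restates the keep-decision described in the comments above (illustration only, not borg's actual filter, which works on packed entries; it assumes, as in borg, that entries memorized during the current backup carry age 0):

    import os

    TTL = int(os.environ.get("BORG_FILES_CACHE_TTL", 20))  # same default as in the hunk above

    def keep_entry(age: int, cmtime_ns: int, newest_cmtime_ns: int) -> bool:
        if age == 0:
            # entry was seen/updated in this backup: only trust it if its cmtime is
            # strictly older than the newest cmtime seen, to sidestep snapshot and
            # cmtime-granularity issues
            return cmtime_ns < newest_cmtime_ns
        # entry carried over from older backups: keep it until it reaches the TTL
        return age < TTL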
@@ -562,18 +570,23 @@ class ChunksMixin:
     Chunks index related code for misc. Cache implementations.
     """
 
+    def __init__(self):
+        self._chunks = None
+
+    @property
+    def chunks(self):
+        if self._chunks is None:
+            self._chunks = self._load_chunks_from_repo()
+        return self._chunks
+
     def chunk_incref(self, id, size, stats):
         assert isinstance(size, int) and size > 0
-        if not self._txn_active:
-            self.begin_txn()
         count, _size = self.chunks.incref(id)
         stats.update(size, False)
         return ChunkListEntry(id, size)
 
     def chunk_decref(self, id, size, stats, wait=True):
         assert isinstance(size, int) and size > 0
-        if not self._txn_active:
-            self.begin_txn()
         count, _size = self.chunks.decref(id)
         if count == 0:
             del self.chunks[id]
@@ -583,8 +596,6 @@ class ChunksMixin:
             stats.update(-size, False)
 
     def seen_chunk(self, id, size=None):
-        if not self._txn_active:
-            self.begin_txn()
         entry = self.chunks.get(id, ChunkIndexEntry(0, None))
         if entry.refcount and size is not None:
             assert isinstance(entry.size, int)
@@ -609,8 +620,6 @@ class ChunksMixin:
         ro_type=ROBJ_FILE_STREAM,
     ):
         assert ro_type is not None
-        if not self._txn_active:
-            self.begin_txn()
         if size is None:
             if compress:
                 size = len(data)  # data is still uncompressed
@@ -641,7 +650,7 @@ class ChunksMixin:
                 break
             marker = result[-1][0]
             # All chunks from the repository have a refcount of MAX_VALUE, which is sticky,
-            # therefore we can't/won't delete them. Chunks we added ourselves in this transaction
+            # therefore we can't/won't delete them. Chunks we added ourselves in this borg run
             # are tracked correctly.
             init_entry = ChunkIndexEntry(refcount=ChunkIndex.MAX_VALUE, size=0)  # plaintext size
             for id, stored_size in result:
@@ -684,13 +693,13 @@ class AdHocWithFilesCache(FilesCacheMixin, ChunksMixin):
         :param cache_mode: what shall be compared in the file stat infos vs. cached stat infos comparison
         """
         FilesCacheMixin.__init__(self, cache_mode)
+        ChunksMixin.__init__(self)
         assert isinstance(manifest, Manifest)
         self.manifest = manifest
         self.repository = manifest.repository
         self.key = manifest.key
         self.repo_objs = manifest.repo_objs
         self.progress = progress
-        self._txn_active = False
 
         self.path = cache_dir(self.repository, path)
         self.security_manager = SecurityManager(self.repository)
@@ -714,10 +723,12 @@ class AdHocWithFilesCache(FilesCacheMixin, ChunksMixin):
             raise
 
     def __enter__(self):
+        self._chunks = None
         return self
 
     def __exit__(self, exc_type, exc_val, exc_tb):
         self.close()
+        self._chunks = None
 
     def create(self):
         """Create a new empty cache at `self.path`"""
@@ -727,69 +738,24 @@ class AdHocWithFilesCache(FilesCacheMixin, ChunksMixin):
         self.cache_config.create()
         self._create_empty_files_cache(self.path)
 
-    def _do_open(self):
-        self.cache_config.load()
-        self.chunks = self._load_chunks_from_repo()
-        self._read_files_cache()
-
     def open(self):
         if not os.path.isdir(self.path):
             raise Exception("%s Does not look like a Borg cache" % self.path)
         self.cache_config.open()
-        self.rollback()
+        self.cache_config.load()
 
     def close(self):
-        if self.cache_config is not None:
-            self.cache_config.close()
-            self.cache_config = None
-
-    def begin_txn(self):
-        # Initialize transaction snapshot
-        pi = ProgressIndicatorMessage(msgid="cache.begin_transaction")
-        txn_dir = os.path.join(self.path, "txn.tmp")
-        os.mkdir(txn_dir)
-        pi.output("Initializing cache transaction: Reading config")
-        shutil.copy(os.path.join(self.path, "config"), txn_dir)
-        pi.output("Initializing cache transaction: Reading files")
-        try:
-            shutil.copy(os.path.join(self.path, self.files_cache_name()), txn_dir)
-        except FileNotFoundError:
-            self._create_empty_files_cache(txn_dir)
-        os.replace(txn_dir, os.path.join(self.path, "txn.active"))
-        pi.finish()
-        self._txn_active = True
-
-    def commit(self):
-        if not self._txn_active:
-            return
         self.security_manager.save(self.manifest, self.key)
-        pi = ProgressIndicatorMessage(msgid="cache.commit")
-        if self.files is not None:
+        pi = ProgressIndicatorMessage(msgid="cache.close")
+        if self._files is not None:
             pi.output("Saving files cache")
-            integrity_data = self._write_files_cache()
+            integrity_data = self._write_files_cache(self._files)
             self.cache_config.integrity[self.files_cache_name()] = integrity_data
         pi.output("Saving cache config")
         self.cache_config.save(self.manifest)
-        os.replace(os.path.join(self.path, "txn.active"), os.path.join(self.path, "txn.tmp"))
-        shutil.rmtree(os.path.join(self.path, "txn.tmp"))
-        self._txn_active = False
+        self.cache_config.close()
         pi.finish()
-
-    def rollback(self):
-        # Remove partial transaction
-        if os.path.exists(os.path.join(self.path, "txn.tmp")):
-            shutil.rmtree(os.path.join(self.path, "txn.tmp"))
-        # Roll back active transaction
-        txn_dir = os.path.join(self.path, "txn.active")
-        if os.path.exists(txn_dir):
-            shutil.copy(os.path.join(txn_dir, "config"), self.path)
-            shutil.copy(os.path.join(txn_dir, self.discover_files_cache_name(txn_dir)), self.path)
-            txn_tmp = os.path.join(self.path, "txn.tmp")
-            os.replace(txn_dir, txn_tmp)
-            if os.path.exists(txn_tmp):
-                shutil.rmtree(txn_tmp)
-        self._txn_active = False
-        self._do_open()
+        self.cache_config = None
 
     def check_cache_compatibility(self):
         my_features = Manifest.SUPPORTED_REPO_FEATURES
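With begin_txn()/commit()/rollback() gone, the persistent cache state is written exactly once, when close() runs at the end of the cache's context-manager block; that is why the archiver hunks at the top of this patch simply drop their cache.commit() calls after manifest.write(). A toy model of that lifecycle is sketched below (illustration only, not borg code; the class and helper names are made up):

    class CacheLike:
        """Toy model of the new lifecycle: no commit()/rollback(), one write in close()."""

        def __init__(self):
            self._files = None   # loaded on demand, as sketched earlier
            self._config = {}

        def __enter__(self):
            return self

        def __exit__(self, exc_type, exc_val, exc_tb):
            self.close()

        def close(self):
            if self._files is not None:          # write only what was actually loaded
                self._write_files_cache(self._files)
            self._save_config()

        def _write_files_cache(self, files):
            pass  # stands in for the msgpack + IntegrityCheckedFile write above

        def _save_config(self):
            pass  # stands in for cache_config.save(manifest) and cache_config.close()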
@@ -805,7 +771,7 @@ class AdHocWithFilesCache(FilesCacheMixin, ChunksMixin):
 
     def wipe_cache(self):
         logger.warning("Discarding incompatible cache and forcing a cache rebuild")
-        self.chunks = ChunkIndex()
+        self._chunks = ChunkIndex()
         self._create_empty_files_cache(self.path)
         self.cache_config.manifest_id = ""
         self.cache_config._config.set("cache", "manifest", "")
@@ -835,12 +801,12 @@ class AdHocCache(ChunksMixin):
     """
 
     def __init__(self, manifest, warn_if_unencrypted=True, lock_wait=None, iec=False):
+        ChunksMixin.__init__(self)
         assert isinstance(manifest, Manifest)
         self.manifest = manifest
         self.repository = manifest.repository
         self.key = manifest.key
         self.repo_objs = manifest.repo_objs
-        self._txn_active = False
 
         self.security_manager = SecurityManager(self.repository)
         self.security_manager.assert_secure(manifest, self.key, lock_wait=lock_wait)
@@ -848,10 +814,13 @@ class AdHocCache(ChunksMixin):
     # Public API
 
     def __enter__(self):
+        self._chunks = None
         return self
 
     def __exit__(self, exc_type, exc_val, exc_tb):
-        pass
+        if exc_type is None:
+            self.security_manager.save(self.manifest, self.key)
+        self._chunks = None
 
     files = None  # type: ignore
     cache_mode = "d"
@@ -862,17 +831,3 @@ class AdHocCache(ChunksMixin):
 
     def memorize_file(self, hashed_path, path_hash, st, chunks):
         pass
-
-    def commit(self):
-        if not self._txn_active:
-            return
-        self.security_manager.save(self.manifest, self.key)
-        self._txn_active = False
-
-    def rollback(self):
-        self._txn_active = False
-        del self.chunks
-
-    def begin_txn(self):
-        self._txn_active = True
-        self.chunks = self._load_chunks_from_repo()
diff --git a/src/borg/testsuite/archiver/checks.py b/src/borg/testsuite/archiver/checks.py
index efea8e1fb..e6c407e8d 100644
--- a/src/borg/testsuite/archiver/checks.py
+++ b/src/borg/testsuite/archiver/checks.py
@@ -264,9 +264,7 @@ def test_unknown_mandatory_feature_in_cache(archivers, request):
         repository._location = Location(archiver.repository_location)
         manifest = Manifest.load(repository, Manifest.NO_OPERATION_CHECK)
         with Cache(repository, manifest) as cache:
-            cache.begin_txn()
             cache.cache_config.mandatory_features = {"unknown-feature"}
-            cache.commit()
 
     if archiver.FORK_DEFAULT:
         cmd(archiver, "create", "test", "input")
diff --git a/src/borg/testsuite/cache.py b/src/borg/testsuite/cache.py
index beca83505..2d86bede0 100644
--- a/src/borg/testsuite/cache.py
+++ b/src/borg/testsuite/cache.py
@@ -59,15 +59,6 @@ class TestAdHocCache:
         assert cache.cache_mode == "d"
         assert cache.files is None
 
-    def test_txn(self, cache):
-        assert not cache._txn_active
-        cache.seen_chunk(H(5))
-        assert cache._txn_active
-        assert cache.chunks
-        cache.rollback()
-        assert not cache._txn_active
-        assert not hasattr(cache, "chunks")
-
     def test_incref_after_add_chunk(self, cache):
         assert cache.add_chunk(H(3), {}, b"5678", stats=Statistics()) == (H(3), 4)
         assert cache.chunk_incref(H(3), 4, Statistics()) == (H(3), 4)
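For AdHocCache (the chunks-only cache without a files cache), the __exit__ hunk above replaces commit() with a save-on-clean-exit: the security/manifest info is persisted only if the block finished without an exception, and the in-memory chunks index is dropped either way. The same behaviour in isolation (illustration only, not borg code; the class and helper names are made up):

    class AdHocLikeSession:
        def __enter__(self):
            self._chunks = None                  # force a fresh on-demand load per run
            return self

        def __exit__(self, exc_type, exc_val, exc_tb):
            if exc_type is None:                 # persist only after a clean run
                self._save_security_info()
            self._chunks = None                  # drop the in-memory chunks index either way

        def _save_security_info(self):
            pass  # stands in for security_manager.save(manifest, key)

This is also why the removed test_txn test above no longer applies: there is no _txn_active flag left to assert on, and the chunks index simply reloads on the next access.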