mirror of https://github.com/borgbackup/borg.git

commit d27b7a7981 (parent: 57268909f8)

    cache: remove transactions, load files/chunks cache on demand

8 changed files with 40 additions and 104 deletions
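The diff below removes the cache's begin_txn/commit/rollback machinery. Instead, the files cache and the chunks index are loaded lazily on first access (via properties backed by _files/_chunks) and written back once when the cache is closed. A minimal standalone sketch of that pattern (simplified names, not the exact borg API):

    class LazyCache:
        def __init__(self):
            self._chunks = None  # nothing loaded yet

        @property
        def chunks(self):
            # load from the repository only when first accessed
            if self._chunks is None:
                self._chunks = self._load_chunks_from_repo()
            return self._chunks

        def _load_chunks_from_repo(self):
            return {}  # stand-in for the real repository scan

        def close(self):
            # persist once at the end, instead of per-transaction commit()
            if self._chunks is not None:
                self._write_chunks(self._chunks)
            self._chunks = None

        def _write_chunks(self, chunks):
            pass  # stand-in for writing the on-disk cache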
@@ -659,7 +659,6 @@ Duration: {0.duration}
             pass
         self.manifest.archives[name] = (self.id, metadata.time)
         self.manifest.write()
-        self.cache.commit()
         return metadata

     def calc_stats(self, cache, want_unique=True):
@@ -74,11 +74,7 @@ class KeysMixIn:
         manifest.repo_objs.key = key_new
         manifest.write()

-        # we need to rewrite cache config and security key-type info,
-        # so that the cached key-type will match the repo key-type.
-        cache.begin_txn()  # need to start a cache transaction, otherwise commit() does nothing.
         cache.key = key_new
-        cache.commit()

         loc = key_new.find_key() if hasattr(key_new, "find_key") else None
         if args.keep:
@@ -143,7 +143,6 @@ class PruneMixIn:
                 raise Error("Got Ctrl-C / SIGINT.")
             elif uncommitted_deletes > 0:
                 manifest.write()
-                cache.commit()

     def build_parser_prune(self, subparsers, common_parser, mid_common_parser):
         from ._common import process_epilog
@@ -49,7 +49,6 @@ class RecreateMixIn:
                 logger.info("Skipped archive %s: Nothing to do. Archive was not processed.", name)
         if not args.dry_run:
             manifest.write()
-            cache.commit()

     def build_parser_recreate(self, subparsers, common_parser, mid_common_parser):
         from ._common import process_epilog
@@ -17,7 +17,6 @@ class RenameMixIn:
         """Rename an existing archive"""
         archive.rename(args.newname)
         manifest.write()
-        cache.commit()

     def build_parser_rename(self, subparsers, common_parser, mid_common_parser):
         from ._common import process_epilog
@@ -391,9 +391,15 @@ class FilesCacheMixin:

     def __init__(self, cache_mode):
         self.cache_mode = cache_mode
-        self.files = None
+        self._files = None
         self._newest_cmtime = None

+    @property
+    def files(self):
+        if self._files is None:
+            self._files = self._read_files_cache()
+        return self._files
+
     def files_cache_name(self):
         suffix = os.environ.get("BORG_FILES_CACHE_SUFFIX", "")
         return self.FILES_CACHE_NAME + "." + suffix if suffix else self.FILES_CACHE_NAME
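FilesCacheMixin.files is now a lazy property backed by _files: the on-disk files cache is read only when an operation actually touches it. The cache file name still honors BORG_FILES_CACHE_SUFFIX, so several clients can keep separate files caches for one repository; roughly (a sketch mirroring files_cache_name() above, assuming FILES_CACHE_NAME is "files"):

    import os

    FILES_CACHE_NAME = "files"

    def files_cache_name():
        suffix = os.environ.get("BORG_FILES_CACHE_SUFFIX", "")
        # "files.clientA" when BORG_FILES_CACHE_SUFFIX=clientA, else "files"
        return FILES_CACHE_NAME + "." + suffix if suffix else FILES_CACHE_NAME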
@@ -412,7 +418,7 @@ class FilesCacheMixin:
         if "d" in self.cache_mode:  # d(isabled)
             return

-        self.files = {}
+        files = {}
         logger.debug("Reading files cache ...")
         files_cache_logger.debug("FILES-CACHE-LOAD: starting...")
         msg = None
@@ -432,7 +438,7 @@ class FilesCacheMixin:
                 for path_hash, item in u:
                     entry = FileCacheEntry(*item)
                     # in the end, this takes about 240 Bytes per file
-                    self.files[path_hash] = msgpack.packb(entry._replace(age=entry.age + 1))
+                    files[path_hash] = msgpack.packb(entry._replace(age=entry.age + 1))
             except (TypeError, ValueError) as exc:
                 msg = "The files cache seems invalid. [%s]" % str(exc)
                 break
@@ -443,18 +449,20 @@ class FilesCacheMixin:
         if msg is not None:
             logger.warning(msg)
             logger.warning("Continuing without files cache - expect lower performance.")
-            self.files = {}
-        files_cache_logger.debug("FILES-CACHE-LOAD: finished, %d entries loaded.", len(self.files))
+            files = {}
+        files_cache_logger.debug("FILES-CACHE-LOAD: finished, %d entries loaded.", len(files))
+        return files

-    def _write_files_cache(self):
+    def _write_files_cache(self, files):
         if self._newest_cmtime is None:
             # was never set because no files were modified/added
             self._newest_cmtime = 2**63 - 1  # nanoseconds, good until y2262
         ttl = int(os.environ.get("BORG_FILES_CACHE_TTL", 20))
         files_cache_logger.debug("FILES-CACHE-SAVE: starting...")
         # TODO: use something like SaveFile here, but that didn't work due to SyncFile missing .seek().
         with IntegrityCheckedFile(path=os.path.join(self.path, self.files_cache_name()), write=True) as fd:
             entry_count = 0
-            for path_hash, item in self.files.items():
+            for path_hash, item in files.items():
                 # Only keep files seen in this backup that are older than newest cmtime seen in this backup -
                 # this is to avoid issues with filesystem snapshots and cmtime granularity.
                 # Also keep files from older backups that have not reached BORG_FILES_CACHE_TTL yet.
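_read_files_cache() now returns the dict and _write_files_cache() takes it as a parameter, instead of both mutating self.files; the property above owns the state. Note the aging scheme visible in the load loop: every entry's age is bumped by one on each load, and entries past BORG_FILES_CACHE_TTL are dropped at save time. A self-contained sketch of that round-trip (Entry is a simplified stand-in; the real FileCacheEntry has more fields, and the real TTL check happens at save time, inlined here for brevity):

    from collections import namedtuple
    import msgpack

    Entry = namedtuple("Entry", "age size mtime")  # simplified stand-in

    def load(packed_items, ttl=20):
        files = {}
        for path_hash, item in packed_items:
            entry = Entry(*item)
            aged = entry._replace(age=entry.age + 1)  # bump age on every load
            if aged.age <= ttl:  # entries past the TTL are dropped
                files[path_hash] = msgpack.packb(aged)
        return files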
@@ -562,18 +570,23 @@ class ChunksMixin:
     Chunks index related code for misc. Cache implementations.
     """

+    def __init__(self):
+        self._chunks = None
+
+    @property
+    def chunks(self):
+        if self._chunks is None:
+            self._chunks = self._load_chunks_from_repo()
+        return self._chunks
+
     def chunk_incref(self, id, size, stats):
         assert isinstance(size, int) and size > 0
-        if not self._txn_active:
-            self.begin_txn()
         count, _size = self.chunks.incref(id)
         stats.update(size, False)
         return ChunkListEntry(id, size)

     def chunk_decref(self, id, size, stats, wait=True):
         assert isinstance(size, int) and size > 0
-        if not self._txn_active:
-            self.begin_txn()
         count, _size = self.chunks.decref(id)
         if count == 0:
             del self.chunks[id]
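With the chunks property in place, the "if not self._txn_active: self.begin_txn()" guards scattered through chunk_incref/chunk_decref/seen_chunk/add_chunk become unnecessary: the first attribute access triggers the repository scan, later accesses hit the in-memory index. Usage-wise (a sketch; some_id/other_id are placeholders):

    cache = AdHocCache(manifest)        # constructor as shown further below
    entry = cache.chunks.get(some_id)   # first access runs _load_chunks_from_repo()
    entry = cache.chunks.get(other_id)  # already loaded, no repository access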
@@ -583,8 +596,6 @@ class ChunksMixin:
             stats.update(-size, False)

     def seen_chunk(self, id, size=None):
-        if not self._txn_active:
-            self.begin_txn()
         entry = self.chunks.get(id, ChunkIndexEntry(0, None))
         if entry.refcount and size is not None:
             assert isinstance(entry.size, int)
@@ -609,8 +620,6 @@ class ChunksMixin:
         ro_type=ROBJ_FILE_STREAM,
     ):
         assert ro_type is not None
-        if not self._txn_active:
-            self.begin_txn()
         if size is None:
             if compress:
                 size = len(data)  # data is still uncompressed
@@ -641,7 +650,7 @@ class ChunksMixin:
                 break
             marker = result[-1][0]
             # All chunks from the repository have a refcount of MAX_VALUE, which is sticky,
-            # therefore we can't/won't delete them. Chunks we added ourselves in this transaction
+            # therefore we can't/won't delete them. Chunks we added ourselves in this borg run
             # are tracked correctly.
             init_entry = ChunkIndexEntry(refcount=ChunkIndex.MAX_VALUE, size=0)  # plaintext size
             for id, stored_size in result:
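The comment edit tracks the vocabulary change: without transactions, the unit of work is simply "this borg run". The semantics are unchanged: chunks discovered in the repository get the sticky ChunkIndex.MAX_VALUE refcount, so borg never deletes chunks whose true reference count it does not know; only chunks added during this run are counted exactly. Sticky-counter behavior in miniature (illustrative only, not the real ChunkIndex and not its real sentinel value):

    MAX_VALUE = 2**32 - 1  # assumed sentinel, for illustration

    def incref(count):
        return count if count == MAX_VALUE else count + 1  # sticky at the top

    def decref(count):
        return count if count == MAX_VALUE else count - 1  # sentinel never decreases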
@@ -684,13 +693,13 @@ class AdHocWithFilesCache(FilesCacheMixin, ChunksMixin):
         :param cache_mode: what shall be compared in the file stat infos vs. cached stat infos comparison
         """
         FilesCacheMixin.__init__(self, cache_mode)
+        ChunksMixin.__init__(self)
         assert isinstance(manifest, Manifest)
         self.manifest = manifest
         self.repository = manifest.repository
         self.key = manifest.key
         self.repo_objs = manifest.repo_objs
         self.progress = progress
-        self._txn_active = False

         self.path = cache_dir(self.repository, path)
         self.security_manager = SecurityManager(self.repository)
@@ -714,10 +723,12 @@ class AdHocWithFilesCache(FilesCacheMixin, ChunksMixin):
         raise

     def __enter__(self):
+        self._chunks = None
         return self

     def __exit__(self, exc_type, exc_val, exc_tb):
         self.close()
+        self._chunks = None

     def create(self):
         """Create a new empty cache at `self.path`"""
@@ -727,69 +738,24 @@ class AdHocWithFilesCache(FilesCacheMixin, ChunksMixin):
         self.cache_config.create()
         self._create_empty_files_cache(self.path)

-    def _do_open(self):
-        self.cache_config.load()
-        self.chunks = self._load_chunks_from_repo()
-        self._read_files_cache()
-
     def open(self):
         if not os.path.isdir(self.path):
             raise Exception("%s Does not look like a Borg cache" % self.path)
         self.cache_config.open()
-        self.rollback()
+        self.cache_config.load()

     def close(self):
-        if self.cache_config is not None:
-            self.cache_config.close()
-            self.cache_config = None
-
-    def begin_txn(self):
-        # Initialize transaction snapshot
-        pi = ProgressIndicatorMessage(msgid="cache.begin_transaction")
-        txn_dir = os.path.join(self.path, "txn.tmp")
-        os.mkdir(txn_dir)
-        pi.output("Initializing cache transaction: Reading config")
-        shutil.copy(os.path.join(self.path, "config"), txn_dir)
-        pi.output("Initializing cache transaction: Reading files")
-        try:
-            shutil.copy(os.path.join(self.path, self.files_cache_name()), txn_dir)
-        except FileNotFoundError:
-            self._create_empty_files_cache(txn_dir)
-        os.replace(txn_dir, os.path.join(self.path, "txn.active"))
-        pi.finish()
-        self._txn_active = True
-
-    def commit(self):
-        if not self._txn_active:
-            return
         self.security_manager.save(self.manifest, self.key)
-        pi = ProgressIndicatorMessage(msgid="cache.commit")
-        if self.files is not None:
+        pi = ProgressIndicatorMessage(msgid="cache.close")
+        if self._files is not None:
             pi.output("Saving files cache")
-            integrity_data = self._write_files_cache()
+            integrity_data = self._write_files_cache(self._files)
             self.cache_config.integrity[self.files_cache_name()] = integrity_data
         pi.output("Saving cache config")
         self.cache_config.save(self.manifest)
-        os.replace(os.path.join(self.path, "txn.active"), os.path.join(self.path, "txn.tmp"))
-        shutil.rmtree(os.path.join(self.path, "txn.tmp"))
-        self._txn_active = False
+        self.cache_config.close()
         pi.finish()
-
-    def rollback(self):
-        # Remove partial transaction
-        if os.path.exists(os.path.join(self.path, "txn.tmp")):
-            shutil.rmtree(os.path.join(self.path, "txn.tmp"))
-        # Roll back active transaction
-        txn_dir = os.path.join(self.path, "txn.active")
-        if os.path.exists(txn_dir):
-            shutil.copy(os.path.join(txn_dir, "config"), self.path)
-            shutil.copy(os.path.join(txn_dir, self.discover_files_cache_name(txn_dir)), self.path)
-            txn_tmp = os.path.join(self.path, "txn.tmp")
-            os.replace(txn_dir, txn_tmp)
-            if os.path.exists(txn_tmp):
-                shutil.rmtree(txn_tmp)
-        self._txn_active = False
-        self._do_open()
+        self.cache_config = None

     def check_cache_compatibility(self):
         my_features = Manifest.SUPPORTED_REPO_FEATURES
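This is the heart of the commit: begin_txn()/commit()/rollback() and the txn.tmp/txn.active directory dance are gone, and close() now does the single save at the end (security info, files cache if it was loaded, cache config). The lifecycle collapses to open -> lazy loads and in-memory updates -> close. As a usage sketch (constructor arguments as in the class above):

    with AdHocWithFilesCache(manifest, progress=False) as cache:
        ...  # files/chunks caches load on first access
    # __exit__ -> close(): saves files cache + cache config exactly once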
@@ -805,7 +771,7 @@ class AdHocWithFilesCache(FilesCacheMixin, ChunksMixin):

     def wipe_cache(self):
         logger.warning("Discarding incompatible cache and forcing a cache rebuild")
-        self.chunks = ChunkIndex()
+        self._chunks = ChunkIndex()
         self._create_empty_files_cache(self.path)
         self.cache_config.manifest_id = ""
         self.cache_config._config.set("cache", "manifest", "")
@@ -835,12 +801,12 @@ class AdHocCache(ChunksMixin):
     """

     def __init__(self, manifest, warn_if_unencrypted=True, lock_wait=None, iec=False):
+        ChunksMixin.__init__(self)
         assert isinstance(manifest, Manifest)
         self.manifest = manifest
         self.repository = manifest.repository
         self.key = manifest.key
         self.repo_objs = manifest.repo_objs
-        self._txn_active = False

         self.security_manager = SecurityManager(self.repository)
         self.security_manager.assert_secure(manifest, self.key, lock_wait=lock_wait)
@@ -848,10 +814,13 @@ class AdHocCache(ChunksMixin):
     # Public API

     def __enter__(self):
+        self._chunks = None
         return self

     def __exit__(self, exc_type, exc_val, exc_tb):
-        pass
+        if exc_type is None:
+            self.security_manager.save(self.manifest, self.key)
+        self._chunks = None

     files = None  # type: ignore
     cache_mode = "d"
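AdHocCache now saves the security/manifest info in __exit__, and only when the block finished without an exception, so a failed run does not advance the persisted state. The guard pattern in general form (a sketch, not borg code):

    class PersistOnSuccess:
        def __enter__(self):
            self.state = None  # loaded lazily elsewhere
            return self

        def __exit__(self, exc_type, exc_val, exc_tb):
            if exc_type is None:  # clean exit: safe to persist
                self.save()
            self.state = None     # drop in-memory state either way

        def save(self):
            pass  # stand-in for the real persistence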
@@ -862,17 +831,3 @@ class AdHocCache(ChunksMixin):

     def memorize_file(self, hashed_path, path_hash, st, chunks):
         pass
-
-    def commit(self):
-        if not self._txn_active:
-            return
-        self.security_manager.save(self.manifest, self.key)
-        self._txn_active = False
-
-    def rollback(self):
-        self._txn_active = False
-        del self.chunks
-
-    def begin_txn(self):
-        self._txn_active = True
-        self.chunks = self._load_chunks_from_repo()
@@ -264,9 +264,7 @@ def test_unknown_mandatory_feature_in_cache(archivers, request):
     repository._location = Location(archiver.repository_location)
     manifest = Manifest.load(repository, Manifest.NO_OPERATION_CHECK)
     with Cache(repository, manifest) as cache:
-        cache.begin_txn()
         cache.cache_config.mandatory_features = {"unknown-feature"}
-        cache.commit()

     if archiver.FORK_DEFAULT:
         cmd(archiver, "create", "test", "input")
@@ -59,15 +59,6 @@ class TestAdHocCache:
         assert cache.cache_mode == "d"
         assert cache.files is None

-    def test_txn(self, cache):
-        assert not cache._txn_active
-        cache.seen_chunk(H(5))
-        assert cache._txn_active
-        assert cache.chunks
-        cache.rollback()
-        assert not cache._txn_active
-        assert not hasattr(cache, "chunks")
-
     def test_incref_after_add_chunk(self, cache):
         assert cache.add_chunk(H(3), {}, b"5678", stats=Statistics()) == (H(3), 4)
         assert cache.chunk_incref(H(3), 4, Statistics()) == (H(3), 4)