diff --git a/src/borg/cache.py b/src/borg/cache.py
index 3338c111b..9944dac71 100644
--- a/src/borg/cache.py
+++ b/src/borg/cache.py
@@ -248,15 +248,6 @@ def cache_dir(repository, path=None):
     return path or os.path.join(get_cache_dir(), repository.id_str)
 
 
-def files_cache_name():
-    suffix = os.environ.get("BORG_FILES_CACHE_SUFFIX", "")
-    return "files." + suffix if suffix else "files"
-
-
-def discover_files_cache_name(path):
-    return [fn for fn in os.listdir(path) if fn == "files" or fn.startswith("files.")][0]
-
-
 class CacheConfig:
     def __init__(self, repository, path=None, lock_wait=None):
         self.repository = repository
@@ -493,7 +484,183 @@ def format_tuple(self):
         return self.Summary(**stats)
 
 
-class LocalCache(CacheStatsMixin):
+class FilesCacheMixin:
+    """
+    Massively accelerate processing of unchanged files by caching their chunks list.
+    With that, we can avoid having to read and chunk them to get their chunks list.
+    """
+
+    FILES_CACHE_NAME = "files"
+
+    def __init__(self, cache_mode):
+        self.cache_mode = cache_mode
+        self.files = None
+        self._newest_cmtime = None
+
+    def files_cache_name(self):
+        suffix = os.environ.get("BORG_FILES_CACHE_SUFFIX", "")
+        return self.FILES_CACHE_NAME + "." + suffix if suffix else self.FILES_CACHE_NAME
+
+    def discover_files_cache_name(self, path):
+        return [
+            fn for fn in os.listdir(path) if fn == self.FILES_CACHE_NAME or fn.startswith(self.FILES_CACHE_NAME + ".")
+        ][0]
+
+    def _create_empty_files_cache(self, path):
+        with IntegrityCheckedFile(path=os.path.join(path, self.files_cache_name()), write=True) as fd:
+            pass  # empty file
+        return fd.integrity_data
+
+    def _read_files_cache(self):
+        if "d" in self.cache_mode:  # d(isabled)
+            return
+
+        self.files = {}
+        logger.debug("Reading files cache ...")
+        files_cache_logger.debug("FILES-CACHE-LOAD: starting...")
+        msg = None
+        try:
+            with IntegrityCheckedFile(
+                path=os.path.join(self.path, self.files_cache_name()),
+                write=False,
+                integrity_data=self.cache_config.integrity.get(self.files_cache_name()),
+            ) as fd:
+                u = msgpack.Unpacker(use_list=True)
+                while True:
+                    data = fd.read(64 * 1024)
+                    if not data:
+                        break
+                    u.feed(data)
+                    try:
+                        for path_hash, item in u:
+                            entry = FileCacheEntry(*item)
+                            # in the end, this takes about 240 Bytes per file
+                            self.files[path_hash] = msgpack.packb(entry._replace(age=entry.age + 1))
+                    except (TypeError, ValueError) as exc:
+                        msg = "The files cache seems invalid. [%s]" % str(exc)
+                        break
+        except OSError as exc:
+            msg = "The files cache can't be read. [%s]" % str(exc)
+        except FileIntegrityError as fie:
+            msg = "The files cache is corrupted. [%s]" % str(fie)
+        if msg is not None:
+            logger.warning(msg)
+            logger.warning("Continuing without files cache - expect lower performance.")
+            self.files = {}
+        files_cache_logger.debug("FILES-CACHE-LOAD: finished, %d entries loaded.", len(self.files))
+
+    def _write_files_cache(self):
+        if self._newest_cmtime is None:
+            # was never set because no files were modified/added
+            self._newest_cmtime = 2**63 - 1  # nanoseconds, good until y2262
+        ttl = int(os.environ.get("BORG_FILES_CACHE_TTL", 20))
+        files_cache_logger.debug("FILES-CACHE-SAVE: starting...")
+        with IntegrityCheckedFile(path=os.path.join(self.path, self.files_cache_name()), write=True) as fd:
+            entry_count = 0
+            for path_hash, item in self.files.items():
+                # Only keep files seen in this backup that are older than newest cmtime seen in this backup -
+                # this is to avoid issues with filesystem snapshots and cmtime granularity.
+                # Also keep files from older backups that have not reached BORG_FILES_CACHE_TTL yet.
+                entry = FileCacheEntry(*msgpack.unpackb(item))
+                if (
+                    entry.age == 0
+                    and timestamp_to_int(entry.cmtime) < self._newest_cmtime
+                    or entry.age > 0
+                    and entry.age < ttl
+                ):
+                    msgpack.pack((path_hash, entry), fd)
+                    entry_count += 1
+        files_cache_logger.debug("FILES-CACHE-KILL: removed all old entries with age >= TTL [%d]", ttl)
+        files_cache_logger.debug(
+            "FILES-CACHE-KILL: removed all current entries with newest cmtime %d", self._newest_cmtime
+        )
+        files_cache_logger.debug("FILES-CACHE-SAVE: finished, %d remaining entries saved.", entry_count)
+        return fd.integrity_data
+
+    def file_known_and_unchanged(self, hashed_path, path_hash, st):
+        """
+        Check if we know the file that has this path_hash (know == it is in our files cache) and
+        whether it is unchanged (the size/inode number/cmtime is same for stuff we check in this cache_mode).
+
+        :param hashed_path: the file's path as we gave it to hash(hashed_path)
+        :param path_hash: hash(hashed_path), to save some memory in the files cache
+        :param st: the file's stat() result
+        :return: known, chunks (known is True if we have infos about this file in the cache,
+                 chunks is a list[ChunkListEntry] IF the file has not changed, otherwise None).
+        """
+        if not stat.S_ISREG(st.st_mode):
+            return False, None
+        cache_mode = self.cache_mode
+        if "d" in cache_mode:  # d(isabled)
+            files_cache_logger.debug("UNKNOWN: files cache disabled")
+            return False, None
+        # note: r(echunk) does not need the files cache in this method, but the files cache will
+        # be updated and saved to disk to memorize the files. To preserve previous generations in
+        # the cache, this means that it also needs to get loaded from disk first.
+        if "r" in cache_mode:  # r(echunk)
+            files_cache_logger.debug("UNKNOWN: rechunking enforced")
+            return False, None
+        entry = self.files.get(path_hash)
+        if not entry:
+            files_cache_logger.debug("UNKNOWN: no file metadata in cache for: %r", hashed_path)
+            return False, None
+        # we know the file!
+        entry = FileCacheEntry(*msgpack.unpackb(entry))
+        if "s" in cache_mode and entry.size != st.st_size:
+            files_cache_logger.debug("KNOWN-CHANGED: file size has changed: %r", hashed_path)
+            return True, None
+        if "i" in cache_mode and entry.inode != st.st_ino:
+            files_cache_logger.debug("KNOWN-CHANGED: file inode number has changed: %r", hashed_path)
+            return True, None
+        if "c" in cache_mode and timestamp_to_int(entry.cmtime) != st.st_ctime_ns:
+            files_cache_logger.debug("KNOWN-CHANGED: file ctime has changed: %r", hashed_path)
+            return True, None
+        elif "m" in cache_mode and timestamp_to_int(entry.cmtime) != st.st_mtime_ns:
+            files_cache_logger.debug("KNOWN-CHANGED: file mtime has changed: %r", hashed_path)
+            return True, None
+        # we ignored the inode number in the comparison above or it is still same.
+        # if it is still the same, replacing it in the tuple doesn't change it.
+        # if we ignored it, a reason for doing that is that files were moved to a new
+        # disk / new fs (so a one-time change of inode number is expected) and we wanted
+        # to avoid everything getting chunked again. to be able to re-enable the inode
+        # number comparison in a future backup run (and avoid chunking everything
+        # again at that time), we need to update the inode number in the cache with what
+        # we see in the filesystem.
+        self.files[path_hash] = msgpack.packb(entry._replace(inode=st.st_ino, age=0))
+        chunks = [ChunkListEntry(*chunk) for chunk in entry.chunks]  # convert to list of namedtuple
+        return True, chunks
+
+    def memorize_file(self, hashed_path, path_hash, st, chunks):
+        if not stat.S_ISREG(st.st_mode):
+            return
+        cache_mode = self.cache_mode
+        # note: r(echunk) modes will update the files cache, d(isabled) mode won't
+        if "d" in cache_mode:
+            files_cache_logger.debug("FILES-CACHE-NOUPDATE: files cache disabled")
+            return
+        if "c" in cache_mode:
+            cmtime_type = "ctime"
+            cmtime_ns = safe_ns(st.st_ctime_ns)
+        elif "m" in cache_mode:
+            cmtime_type = "mtime"
+            cmtime_ns = safe_ns(st.st_mtime_ns)
+        else:  # neither 'c' nor 'm' in cache_mode, avoid UnboundLocalError
+            cmtime_type = "ctime"
+            cmtime_ns = safe_ns(st.st_ctime_ns)
+        entry = FileCacheEntry(
+            age=0, inode=st.st_ino, size=st.st_size, cmtime=int_to_timestamp(cmtime_ns), chunks=chunks
+        )
+        self.files[path_hash] = msgpack.packb(entry)
+        self._newest_cmtime = max(self._newest_cmtime or 0, cmtime_ns)
+        files_cache_logger.debug(
+            "FILES-CACHE-UPDATE: put %r [has %s] <- %r",
+            entry._replace(chunks="[%d entries]" % len(entry.chunks)),
+            cmtime_type,
+            hashed_path,
+        )
+
+
+class LocalCache(CacheStatsMixin, FilesCacheMixin):
     """
     Persistent, local (client-side) cache.
     """
@@ -516,13 +683,13 @@ def __init__(
         :param cache_mode: what shall be compared in the file stat infos vs. cached stat infos comparison
         """
         CacheStatsMixin.__init__(self, iec=iec)
+        FilesCacheMixin.__init__(self, cache_mode)
         assert isinstance(manifest, Manifest)
         self.manifest = manifest
         self.repository = manifest.repository
         self.key = manifest.key
         self.repo_objs = manifest.repo_objs
         self.progress = progress
-        self.cache_mode = cache_mode
         self.timestamp = None
         self.txn_active = False
         self.do_cache = os.environ.get("BORG_USE_CHUNKS_ARCHIVE", "yes").lower() in ["yes", "1", "true"]
@@ -571,8 +738,7 @@ def create(self):
         self.cache_config.create()
         ChunkIndex().write(os.path.join(self.path, "chunks"))
         os.makedirs(os.path.join(self.path, "chunks.archive.d"))
-        with SaveFile(os.path.join(self.path, files_cache_name()), binary=True):
-            pass  # empty file
+        self._create_empty_files_cache(self.path)
 
     def _do_open(self):
         self.cache_config.load()
@@ -582,10 +748,7 @@ def _do_open(self):
             integrity_data=self.cache_config.integrity.get("chunks"),
         ) as fd:
             self.chunks = ChunkIndex.read(fd)
-        if "d" in self.cache_mode:  # d(isabled)
-            self.files = None
-        else:
-            self._read_files()
+        self._read_files_cache()
 
     def open(self):
         if not os.path.isdir(self.path):
@@ -598,42 +761,6 @@ def close(self):
         self.cache_config.close()
         self.cache_config = None
 
-    def _read_files(self):
-        self.files = {}
-        self._newest_cmtime = None
-        logger.debug("Reading files cache ...")
-        files_cache_logger.debug("FILES-CACHE-LOAD: starting...")
-        msg = None
-        try:
-            with IntegrityCheckedFile(
-                path=os.path.join(self.path, files_cache_name()),
-                write=False,
-                integrity_data=self.cache_config.integrity.get(files_cache_name()),
-            ) as fd:
-                u = msgpack.Unpacker(use_list=True)
-                while True:
-                    data = fd.read(64 * 1024)
-                    if not data:
-                        break
-                    u.feed(data)
-                    try:
-                        for path_hash, item in u:
-                            entry = FileCacheEntry(*item)
-                            # in the end, this takes about 240 Bytes per file
-                            self.files[path_hash] = msgpack.packb(entry._replace(age=entry.age + 1))
-                    except (TypeError, ValueError) as exc:
-                        msg = "The files cache seems invalid. [%s]" % str(exc)
-                        break
-        except OSError as exc:
-            msg = "The files cache can't be read. [%s]" % str(exc)
-        except FileIntegrityError as fie:
-            msg = "The files cache is corrupted. [%s]" % str(fie)
-        if msg is not None:
-            logger.warning(msg)
-            logger.warning("Continuing without files cache - expect lower performance.")
-            self.files = {}
-        files_cache_logger.debug("FILES-CACHE-LOAD: finished, %d entries loaded.", len(self.files))
-
     def begin_txn(self):
         # Initialize transaction snapshot
         pi = ProgressIndicatorMessage(msgid="cache.begin_transaction")
@@ -645,10 +772,9 @@ def begin_txn(self):
         shutil.copy(os.path.join(self.path, "chunks"), txn_dir)
         pi.output("Initializing cache transaction: Reading files")
         try:
-            shutil.copy(os.path.join(self.path, files_cache_name()), txn_dir)
+            shutil.copy(os.path.join(self.path, self.files_cache_name()), txn_dir)
         except FileNotFoundError:
-            with SaveFile(os.path.join(txn_dir, files_cache_name()), binary=True):
-                pass  # empty file
+            self._create_empty_files_cache(txn_dir)
         os.replace(txn_dir, os.path.join(self.path, "txn.active"))
         self.txn_active = True
         pi.finish()
@@ -660,33 +786,9 @@ def commit(self):
         self.security_manager.save(self.manifest, self.key)
         pi = ProgressIndicatorMessage(msgid="cache.commit")
         if self.files is not None:
-            if self._newest_cmtime is None:
-                # was never set because no files were modified/added
-                self._newest_cmtime = 2**63 - 1  # nanoseconds, good until y2262
-            ttl = int(os.environ.get("BORG_FILES_CACHE_TTL", 20))
             pi.output("Saving files cache")
-            files_cache_logger.debug("FILES-CACHE-SAVE: starting...")
-            with IntegrityCheckedFile(path=os.path.join(self.path, files_cache_name()), write=True) as fd:
-                entry_count = 0
-                for path_hash, item in self.files.items():
-                    # Only keep files seen in this backup that are older than newest cmtime seen in this backup -
-                    # this is to avoid issues with filesystem snapshots and cmtime granularity.
-                    # Also keep files from older backups that have not reached BORG_FILES_CACHE_TTL yet.
-                    entry = FileCacheEntry(*msgpack.unpackb(item))
-                    if (
-                        entry.age == 0
-                        and timestamp_to_int(entry.cmtime) < self._newest_cmtime
-                        or entry.age > 0
-                        and entry.age < ttl
-                    ):
-                        msgpack.pack((path_hash, entry), fd)
-                        entry_count += 1
-                files_cache_logger.debug("FILES-CACHE-KILL: removed all old entries with age >= TTL [%d]", ttl)
-                files_cache_logger.debug(
-                    "FILES-CACHE-KILL: removed all current entries with newest cmtime %d", self._newest_cmtime
-                )
-                files_cache_logger.debug("FILES-CACHE-SAVE: finished, %d remaining entries saved.", entry_count)
-                self.cache_config.integrity[files_cache_name()] = fd.integrity_data
+            integrity_data = self._write_files_cache()
+            self.cache_config.integrity[self.files_cache_name()] = integrity_data
         pi.output("Saving chunks cache")
         with IntegrityCheckedFile(path=os.path.join(self.path, "chunks"), write=True) as fd:
             self.chunks.write(fd)
@@ -708,7 +810,7 @@ def rollback(self):
         if os.path.exists(txn_dir):
             shutil.copy(os.path.join(txn_dir, "config"), self.path)
             shutil.copy(os.path.join(txn_dir, "chunks"), self.path)
-            shutil.copy(os.path.join(txn_dir, discover_files_cache_name(txn_dir)), self.path)
+            shutil.copy(os.path.join(txn_dir, self.discover_files_cache_name(txn_dir)), self.path)
         txn_tmp = os.path.join(self.path, "txn.tmp")
         os.replace(txn_dir, txn_tmp)
         if os.path.exists(txn_tmp):
@@ -939,9 +1041,8 @@ def wipe_cache(self):
         with IntegrityCheckedFile(path=os.path.join(self.path, "chunks"), write=True) as fd:
             self.chunks.write(fd)
         self.cache_config.integrity["chunks"] = fd.integrity_data
-        with IntegrityCheckedFile(path=os.path.join(self.path, files_cache_name()), write=True) as fd:
-            pass  # empty file
-        self.cache_config.integrity[files_cache_name()] = fd.integrity_data
+        integrity_data = self._create_empty_files_cache(self.path)
+        self.cache_config.integrity[self.files_cache_name()] = integrity_data
         self.cache_config.manifest_id = ""
         self.cache_config._config.set("cache", "manifest", "")
         if not self.cache_config._config.has_section("integrity"):
@@ -1033,88 +1134,6 @@ def chunk_decref(self, id, size, stats, wait=True):
         else:
             stats.update(-size, False)
 
-    def file_known_and_unchanged(self, hashed_path, path_hash, st):
-        """
-        Check if we know the file that has this path_hash (know == it is in our files cache) and
-        whether it is unchanged (the size/inode number/cmtime is same for stuff we check in this cache_mode).
-
-        :param hashed_path: the file's path as we gave it to hash(hashed_path)
-        :param path_hash: hash(hashed_path), to save some memory in the files cache
-        :param st: the file's stat() result
-        :return: known, chunks (known is True if we have infos about this file in the cache,
-                 chunks is a list[ChunkListEntry] IF the file has not changed, otherwise None).
-        """
-        if not stat.S_ISREG(st.st_mode):
-            return False, None
-        cache_mode = self.cache_mode
-        if "d" in cache_mode:  # d(isabled)
-            files_cache_logger.debug("UNKNOWN: files cache disabled")
-            return False, None
-        # note: r(echunk) does not need the files cache in this method, but the files cache will
-        # be updated and saved to disk to memorize the files. To preserve previous generations in
-        # the cache, this means that it also needs to get loaded from disk first.
-        if "r" in cache_mode:  # r(echunk)
-            files_cache_logger.debug("UNKNOWN: rechunking enforced")
-            return False, None
-        entry = self.files.get(path_hash)
-        if not entry:
-            files_cache_logger.debug("UNKNOWN: no file metadata in cache for: %r", hashed_path)
-            return False, None
-        # we know the file!
-        entry = FileCacheEntry(*msgpack.unpackb(entry))
-        if "s" in cache_mode and entry.size != st.st_size:
-            files_cache_logger.debug("KNOWN-CHANGED: file size has changed: %r", hashed_path)
-            return True, None
-        if "i" in cache_mode and entry.inode != st.st_ino:
-            files_cache_logger.debug("KNOWN-CHANGED: file inode number has changed: %r", hashed_path)
-            return True, None
-        if "c" in cache_mode and timestamp_to_int(entry.cmtime) != st.st_ctime_ns:
-            files_cache_logger.debug("KNOWN-CHANGED: file ctime has changed: %r", hashed_path)
-            return True, None
-        elif "m" in cache_mode and timestamp_to_int(entry.cmtime) != st.st_mtime_ns:
-            files_cache_logger.debug("KNOWN-CHANGED: file mtime has changed: %r", hashed_path)
-            return True, None
-        # we ignored the inode number in the comparison above or it is still same.
-        # if it is still the same, replacing it in the tuple doesn't change it.
-        # if we ignored it, a reason for doing that is that files were moved to a new
-        # disk / new fs (so a one-time change of inode number is expected) and we wanted
-        # to avoid everything getting chunked again. to be able to re-enable the inode
-        # number comparison in a future backup run (and avoid chunking everything
-        # again at that time), we need to update the inode number in the cache with what
-        # we see in the filesystem.
-        self.files[path_hash] = msgpack.packb(entry._replace(inode=st.st_ino, age=0))
-        chunks = [ChunkListEntry(*chunk) for chunk in entry.chunks]  # convert to list of namedtuple
-        return True, chunks
-
-    def memorize_file(self, hashed_path, path_hash, st, chunks):
-        if not stat.S_ISREG(st.st_mode):
-            return
-        cache_mode = self.cache_mode
-        # note: r(echunk) modes will update the files cache, d(isabled) mode won't
-        if "d" in cache_mode:
-            files_cache_logger.debug("FILES-CACHE-NOUPDATE: files cache disabled")
-            return
-        if "c" in cache_mode:
-            cmtime_type = "ctime"
-            cmtime_ns = safe_ns(st.st_ctime_ns)
-        elif "m" in cache_mode:
-            cmtime_type = "mtime"
-            cmtime_ns = safe_ns(st.st_mtime_ns)
-        else:  # neither 'c' nor 'm' in cache_mode, avoid UnboundLocalError
-            cmtime_type = "ctime"
-            cmtime_ns = safe_ns(st.st_ctime_ns)
-        entry = FileCacheEntry(
-            age=0, inode=st.st_ino, size=st.st_size, cmtime=int_to_timestamp(cmtime_ns), chunks=chunks
-        )
-        self.files[path_hash] = msgpack.packb(entry)
-        self._newest_cmtime = max(self._newest_cmtime or 0, cmtime_ns)
-        files_cache_logger.debug(
-            "FILES-CACHE-UPDATE: put %r [has %s] <- %r",
-            entry._replace(chunks="[%d entries]" % len(entry.chunks)),
-            cmtime_type,
-            hashed_path,
-        )
-
 
 class AdHocCache(CacheStatsMixin):
     """
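
Note for reviewers: the retention rule in `_write_files_cache` (moved verbatim from `LocalCache.commit`) is the subtle part of this refactor. Entries recorded during the current backup (age == 0) are kept only if their cmtime is strictly older than the newest cmtime seen in that run, which sidesteps cmtime-granularity races and filesystem snapshots; entries carried over from earlier backups survive until their age reaches BORG_FILES_CACHE_TTL, with ages incremented by one on every load in `_read_files_cache`. A minimal standalone sketch of that predicate, assuming the simplified `Entry` tuple and `keep_entry` helper below (both hypothetical, not part of this diff):

    from collections import namedtuple

    # Hypothetical stand-in for FileCacheEntry; only the fields the
    # retention predicate inspects.
    Entry = namedtuple("Entry", "age cmtime_ns")

    def keep_entry(entry, newest_cmtime_ns, ttl=20):
        # age == 0: seen in this backup run; keep only if its cmtime is
        # strictly older than the newest cmtime observed in the run.
        if entry.age == 0:
            return entry.cmtime_ns < newest_cmtime_ns
        # age > 0: not seen this run; keep until it has stayed unseen
        # for BORG_FILES_CACHE_TTL consecutive cache loads.
        return entry.age < ttl

    # With newest cmtime 1000 ns: a current entry at the same tick is
    # dropped, an older one is kept, and a stale entry ages out at TTL.
    assert not keep_entry(Entry(age=0, cmtime_ns=1000), 1000)
    assert keep_entry(Entry(age=0, cmtime_ns=999), 1000)
    assert keep_entry(Entry(age=19, cmtime_ns=0), 1000)
    assert not keep_entry(Entry(age=20, cmtime_ns=0), 1000)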