refactor files cache code into FilesCacheMixin class

Thomas Waldmann 2023-09-22 20:28:17 +02:00
parent 08003f4065
commit 62289ff513
GPG Key ID: 243ACFA951F78E01
1 changed file with 185 additions and 167 deletions


@@ -248,15 +248,6 @@ def cache_dir(repository, path=None):
return path or os.path.join(get_cache_dir(), repository.id_str)
def files_cache_name():
suffix = os.environ.get("BORG_FILES_CACHE_SUFFIX", "")
return "files." + suffix if suffix else "files"
def discover_files_cache_name(path):
return [fn for fn in os.listdir(path) if fn == "files" or fn.startswith("files.")][0]
class CacheConfig:
def __init__(self, repository, path=None, lock_wait=None):
self.repository = repository
@@ -493,7 +484,182 @@ Total chunks: {0.total_chunks}
return self.Summary(**stats)
class LocalCache(CacheStatsMixin):
class FilesCacheMixin:
"""
Massively accelerate processing of unchanged files by caching their chunks list.
With that, we can avoid having to read and chunk them to get their chunks list.
"""
FILES_CACHE_NAME = "files"
def __init__(self, cache_mode):
self.cache_mode = cache_mode
self.files = None
self._newest_cmtime = None
def files_cache_name(self):
suffix = os.environ.get("BORG_FILES_CACHE_SUFFIX", "")
return self.FILES_CACHE_NAME + "." + suffix if suffix else self.FILES_CACHE_NAME
def discover_files_cache_name(self, path):
return [
fn for fn in os.listdir(path) if fn == self.FILES_CACHE_NAME or fn.startswith(self.FILES_CACHE_NAME + ".")
][0]
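# Illustrative sketch (hypothetical helper, not borg code): the ternary above
# binds as ("files." + suffix) if suffix else "files", so for example:
def _files_cache_name_demo(suffix):
    return "files." + suffix if suffix else "files"

assert _files_cache_name_demo("prod") == "files.prod"  # hypothetical suffix
assert _files_cache_name_demo("") == "files"           # suffix unset/empty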
def _create_empty_files_cache(self, path):
with SaveFile(os.path.join(path, self.files_cache_name()), binary=True):
pass # empty file
def _read_files_cache(self):
if "d" in self.cache_mode: # d(isabled)
return
self.files = {}
logger.debug("Reading files cache ...")
files_cache_logger.debug("FILES-CACHE-LOAD: starting...")
msg = None
try:
with IntegrityCheckedFile(
path=os.path.join(self.path, self.files_cache_name()),
write=False,
integrity_data=self.cache_config.integrity.get(self.files_cache_name()),
) as fd:
u = msgpack.Unpacker(use_list=True)
while True:
data = fd.read(64 * 1024)
if not data:
break
u.feed(data)
try:
for path_hash, item in u:
entry = FileCacheEntry(*item)
# in the end, this takes about 240 Bytes per file
self.files[path_hash] = msgpack.packb(entry._replace(age=entry.age + 1))
except (TypeError, ValueError) as exc:
msg = "The files cache seems invalid. [%s]" % str(exc)
break
except OSError as exc:
msg = "The files cache can't be read. [%s]" % str(exc)
except FileIntegrityError as fie:
msg = "The files cache is corrupted. [%s]" % str(fie)
if msg is not None:
logger.warning(msg)
logger.warning("Continuing without files cache - expect lower performance.")
self.files = {}
files_cache_logger.debug("FILES-CACHE-LOAD: finished, %d entries loaded.", len(self.files))
def _write_files_cache(self):
if self._newest_cmtime is None:
# was never set because no files were modified/added
self._newest_cmtime = 2**63 - 1 # nanoseconds, good until y2262
ttl = int(os.environ.get("BORG_FILES_CACHE_TTL", 20))
files_cache_logger.debug("FILES-CACHE-SAVE: starting...")
with IntegrityCheckedFile(path=os.path.join(self.path, self.files_cache_name()), write=True) as fd:
entry_count = 0
for path_hash, item in self.files.items():
# Only keep files seen in this backup that are older than newest cmtime seen in this backup -
# this is to avoid issues with filesystem snapshots and cmtime granularity.
# Also keep files from older backups that have not reached BORG_FILES_CACHE_TTL yet.
entry = FileCacheEntry(*msgpack.unpackb(item))
if (
entry.age == 0
and timestamp_to_int(entry.cmtime) < self._newest_cmtime
or entry.age > 0
and entry.age < ttl
):
msgpack.pack((path_hash, entry), fd)
entry_count += 1
files_cache_logger.debug("FILES-CACHE-KILL: removed all old entries with age >= TTL [%d]", ttl)
files_cache_logger.debug(
"FILES-CACHE-KILL: removed all current entries with newest cmtime %d", self._newest_cmtime
)
files_cache_logger.debug("FILES-CACHE-SAVE: finished, %d remaining entries saved.", entry_count)
return fd.integrity_data
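# Illustrative sketch (hypothetical helper, not borg code) of the retention
# rule above, with plain ints standing in for unpacked entries:
def _kept(age, cmtime, ttl=20, newest_cmtime=1000):
    return age == 0 and cmtime < newest_cmtime or age > 0 and age < ttl

assert _kept(age=0, cmtime=999)        # seen this backup, safely older
assert not _kept(age=0, cmtime=1000)   # too new: cmtime granularity risk
assert _kept(age=5, cmtime=1000)       # older generation, still below TTL
assert not _kept(age=20, cmtime=0)     # reached TTL: expired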
def file_known_and_unchanged(self, hashed_path, path_hash, st):
"""
Check if we know the file that has this path_hash (know == it is in our files cache) and
whether it is unchanged (the size/inode number/cmtime is the same for the fields we check in this cache_mode).
:param hashed_path: the file's path as we gave it to hash(hashed_path)
:param path_hash: hash(hashed_path), to save some memory in the files cache
:param st: the file's stat() result
:return: known, chunks (known is True if we have info about this file in the cache,
chunks is a list[ChunkListEntry] IF the file has not changed, otherwise None).
"""
if not stat.S_ISREG(st.st_mode):
return False, None
cache_mode = self.cache_mode
if "d" in cache_mode: # d(isabled)
files_cache_logger.debug("UNKNOWN: files cache disabled")
return False, None
# note: r(echunk) does not need the files cache in this method, but the files cache will
# be updated and saved to disk to memorize the files. To preserve previous generations in
# the cache, this means that it also needs to get loaded from disk first.
if "r" in cache_mode: # r(echunk)
files_cache_logger.debug("UNKNOWN: rechunking enforced")
return False, None
entry = self.files.get(path_hash)
if not entry:
files_cache_logger.debug("UNKNOWN: no file metadata in cache for: %r", hashed_path)
return False, None
# we know the file!
entry = FileCacheEntry(*msgpack.unpackb(entry))
if "s" in cache_mode and entry.size != st.st_size:
files_cache_logger.debug("KNOWN-CHANGED: file size has changed: %r", hashed_path)
return True, None
if "i" in cache_mode and entry.inode != st.st_ino:
files_cache_logger.debug("KNOWN-CHANGED: file inode number has changed: %r", hashed_path)
return True, None
if "c" in cache_mode and timestamp_to_int(entry.cmtime) != st.st_ctime_ns:
files_cache_logger.debug("KNOWN-CHANGED: file ctime has changed: %r", hashed_path)
return True, None
elif "m" in cache_mode and timestamp_to_int(entry.cmtime) != st.st_mtime_ns:
files_cache_logger.debug("KNOWN-CHANGED: file mtime has changed: %r", hashed_path)
return True, None
# we ignored the inode number in the comparison above or it is still same.
# if it is still the same, replacing it in the tuple doesn't change it.
# if we ignored it, a reason for doing that is that files were moved to a new
# disk / new fs (so a one-time change of inode number is expected) and we wanted
# to avoid everything getting chunked again. to be able to re-enable the inode
# number comparison in a future backup run (and avoid chunking everything
# again at that time), we need to update the inode number in the cache with what
# we see in the filesystem.
self.files[path_hash] = msgpack.packb(entry._replace(inode=st.st_ino, age=0))
chunks = [ChunkListEntry(*chunk) for chunk in entry.chunks] # convert to list of namedtuple
return True, chunks
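# The method above has exactly three outcomes (summary, not borg code):
#   (False, None)   file unknown: not cached, cache disabled, or rechunk forced
#   (True, None)    file known but changed: one of the enabled checks failed
#   (True, chunks)  file known and unchanged: the cached chunk list is reusable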
def memorize_file(self, hashed_path, path_hash, st, chunks):
if not stat.S_ISREG(st.st_mode):
return
cache_mode = self.cache_mode
# note: r(echunk) modes will update the files cache, d(isabled) mode won't
if "d" in cache_mode:
files_cache_logger.debug("FILES-CACHE-NOUPDATE: files cache disabled")
return
if "c" in cache_mode:
cmtime_type = "ctime"
cmtime_ns = safe_ns(st.st_ctime_ns)
elif "m" in cache_mode:
cmtime_type = "mtime"
cmtime_ns = safe_ns(st.st_mtime_ns)
else: # neither 'c' nor 'm' in cache_mode, avoid UnboundLocalError
cmtime_type = "ctime"
cmtime_ns = safe_ns(st.st_ctime_ns)
entry = FileCacheEntry(
age=0, inode=st.st_ino, size=st.st_size, cmtime=int_to_timestamp(cmtime_ns), chunks=chunks
)
self.files[path_hash] = msgpack.packb(entry)
self._newest_cmtime = max(self._newest_cmtime or 0, cmtime_ns)
files_cache_logger.debug(
"FILES-CACHE-UPDATE: put %r [has %s] <- %r",
entry._replace(chunks="[%d entries]" % len(entry.chunks)),
cmtime_type,
hashed_path,
)
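# Hypothetical caller sketch (names like chunk_file/reuse_chunks are stand-ins,
# not borg APIs) showing how the two mixin methods cooperate per file:
#
#   known, chunks = cache.file_known_and_unchanged(hashed_path, path_hash, st)
#   if chunks is not None:
#       reuse_chunks(chunks)                  # unchanged: skip read + chunking
#   else:
#       chunks = chunk_file(hashed_path)      # new or changed: chunk again
#       cache.memorize_file(hashed_path, path_hash, st, chunks)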
class LocalCache(CacheStatsMixin, FilesCacheMixin):
"""
Persistent, local (client-side) cache.
"""
@@ -516,13 +682,13 @@ class LocalCache(CacheStatsMixin):
:param cache_mode: what shall be compared in the file stat infos vs. cached stat infos comparison
"""
CacheStatsMixin.__init__(self, iec=iec)
FilesCacheMixin.__init__(self, cache_mode)
assert isinstance(manifest, Manifest)
self.manifest = manifest
self.repository = manifest.repository
self.key = manifest.key
self.repo_objs = manifest.repo_objs
self.progress = progress
self.cache_mode = cache_mode
self.timestamp = None
self.txn_active = False
@@ -565,8 +731,7 @@ class LocalCache(CacheStatsMixin):
self.cache_config.create()
ChunkIndex().write(os.path.join(self.path, "chunks"))
os.makedirs(os.path.join(self.path, "chunks.archive.d"))
with SaveFile(os.path.join(self.path, files_cache_name()), binary=True):
pass # empty file
self._create_empty_files_cache(self.path)
def _do_open(self):
self.cache_config.load()
@@ -576,10 +741,7 @@ class LocalCache(CacheStatsMixin):
integrity_data=self.cache_config.integrity.get("chunks"),
) as fd:
self.chunks = ChunkIndex.read(fd)
if "d" in self.cache_mode: # d(isabled)
self.files = None
else:
self._read_files()
self._read_files_cache()
def open(self):
if not os.path.isdir(self.path):
@@ -592,42 +754,6 @@ class LocalCache(CacheStatsMixin):
self.cache_config.close()
self.cache_config = None
def _read_files(self):
self.files = {}
self._newest_cmtime = None
logger.debug("Reading files cache ...")
files_cache_logger.debug("FILES-CACHE-LOAD: starting...")
msg = None
try:
with IntegrityCheckedFile(
path=os.path.join(self.path, files_cache_name()),
write=False,
integrity_data=self.cache_config.integrity.get(files_cache_name()),
) as fd:
u = msgpack.Unpacker(use_list=True)
while True:
data = fd.read(64 * 1024)
if not data:
break
u.feed(data)
try:
for path_hash, item in u:
entry = FileCacheEntry(*item)
# in the end, this takes about 240 Bytes per file
self.files[path_hash] = msgpack.packb(entry._replace(age=entry.age + 1))
except (TypeError, ValueError) as exc:
msg = "The files cache seems invalid. [%s]" % str(exc)
break
except OSError as exc:
msg = "The files cache can't be read. [%s]" % str(exc)
except FileIntegrityError as fie:
msg = "The files cache is corrupted. [%s]" % str(fie)
if msg is not None:
logger.warning(msg)
logger.warning("Continuing without files cache - expect lower performance.")
self.files = {}
files_cache_logger.debug("FILES-CACHE-LOAD: finished, %d entries loaded.", len(self.files))
def begin_txn(self):
# Initialize transaction snapshot
pi = ProgressIndicatorMessage(msgid="cache.begin_transaction")
@@ -639,10 +765,9 @@ class LocalCache(CacheStatsMixin):
shutil.copy(os.path.join(self.path, "chunks"), txn_dir)
pi.output("Initializing cache transaction: Reading files")
try:
shutil.copy(os.path.join(self.path, files_cache_name()), txn_dir)
shutil.copy(os.path.join(self.path, self.files_cache_name()), txn_dir)
except FileNotFoundError:
with SaveFile(os.path.join(txn_dir, files_cache_name()), binary=True):
pass # empty file
self._create_empty_files_cache(txn_dir)
os.replace(txn_dir, os.path.join(self.path, "txn.active"))
self.txn_active = True
pi.finish()
@@ -654,33 +779,9 @@ class LocalCache(CacheStatsMixin):
self.security_manager.save(self.manifest, self.key)
pi = ProgressIndicatorMessage(msgid="cache.commit")
if self.files is not None:
if self._newest_cmtime is None:
# was never set because no files were modified/added
self._newest_cmtime = 2**63 - 1 # nanoseconds, good until y2262
ttl = int(os.environ.get("BORG_FILES_CACHE_TTL", 20))
pi.output("Saving files cache")
files_cache_logger.debug("FILES-CACHE-SAVE: starting...")
with IntegrityCheckedFile(path=os.path.join(self.path, files_cache_name()), write=True) as fd:
entry_count = 0
for path_hash, item in self.files.items():
# Only keep files seen in this backup that are older than newest cmtime seen in this backup -
# this is to avoid issues with filesystem snapshots and cmtime granularity.
# Also keep files from older backups that have not reached BORG_FILES_CACHE_TTL yet.
entry = FileCacheEntry(*msgpack.unpackb(item))
if (
entry.age == 0
and timestamp_to_int(entry.cmtime) < self._newest_cmtime
or entry.age > 0
and entry.age < ttl
):
msgpack.pack((path_hash, entry), fd)
entry_count += 1
files_cache_logger.debug("FILES-CACHE-KILL: removed all old entries with age >= TTL [%d]", ttl)
files_cache_logger.debug(
"FILES-CACHE-KILL: removed all current entries with newest cmtime %d", self._newest_cmtime
)
files_cache_logger.debug("FILES-CACHE-SAVE: finished, %d remaining entries saved.", entry_count)
self.cache_config.integrity[files_cache_name()] = fd.integrity_data
integrity_data = self._write_files_cache()
self.cache_config.integrity[self.files_cache_name()] = integrity_data
pi.output("Saving chunks cache")
with IntegrityCheckedFile(path=os.path.join(self.path, "chunks"), write=True) as fd:
self.chunks.write(fd)
@@ -702,7 +803,7 @@ class LocalCache(CacheStatsMixin):
if os.path.exists(txn_dir):
shutil.copy(os.path.join(txn_dir, "config"), self.path)
shutil.copy(os.path.join(txn_dir, "chunks"), self.path)
shutil.copy(os.path.join(txn_dir, discover_files_cache_name(txn_dir)), self.path)
shutil.copy(os.path.join(txn_dir, self.discover_files_cache_name(txn_dir)), self.path)
txn_tmp = os.path.join(self.path, "txn.tmp")
os.replace(txn_dir, txn_tmp)
if os.path.exists(txn_tmp):
@@ -934,8 +1035,7 @@ class LocalCache(CacheStatsMixin):
shutil.rmtree(os.path.join(self.path, "chunks.archive.d"))
os.makedirs(os.path.join(self.path, "chunks.archive.d"))
self.chunks = ChunkIndex()
with SaveFile(os.path.join(self.path, files_cache_name()), binary=True):
pass # empty file
self._create_empty_files_cache(self.path)
self.cache_config.manifest_id = ""
self.cache_config._config.set("cache", "manifest", "")
@@ -1019,88 +1119,6 @@ class LocalCache(CacheStatsMixin):
else:
stats.update(-size, False)
def file_known_and_unchanged(self, hashed_path, path_hash, st):
"""
Check if we know the file that has this path_hash (know == it is in our files cache) and
whether it is unchanged (the size/inode number/cmtime is the same for the fields we check in this cache_mode).
:param hashed_path: the file's path as we gave it to hash(hashed_path)
:param path_hash: hash(hashed_path), to save some memory in the files cache
:param st: the file's stat() result
:return: known, chunks (known is True if we have info about this file in the cache,
chunks is a list[ChunkListEntry] IF the file has not changed, otherwise None).
"""
if not stat.S_ISREG(st.st_mode):
return False, None
cache_mode = self.cache_mode
if "d" in cache_mode: # d(isabled)
files_cache_logger.debug("UNKNOWN: files cache disabled")
return False, None
# note: r(echunk) does not need the files cache in this method, but the files cache will
# be updated and saved to disk to memorize the files. To preserve previous generations in
# the cache, this means that it also needs to get loaded from disk first.
if "r" in cache_mode: # r(echunk)
files_cache_logger.debug("UNKNOWN: rechunking enforced")
return False, None
entry = self.files.get(path_hash)
if not entry:
files_cache_logger.debug("UNKNOWN: no file metadata in cache for: %r", hashed_path)
return False, None
# we know the file!
entry = FileCacheEntry(*msgpack.unpackb(entry))
if "s" in cache_mode and entry.size != st.st_size:
files_cache_logger.debug("KNOWN-CHANGED: file size has changed: %r", hashed_path)
return True, None
if "i" in cache_mode and entry.inode != st.st_ino:
files_cache_logger.debug("KNOWN-CHANGED: file inode number has changed: %r", hashed_path)
return True, None
if "c" in cache_mode and timestamp_to_int(entry.cmtime) != st.st_ctime_ns:
files_cache_logger.debug("KNOWN-CHANGED: file ctime has changed: %r", hashed_path)
return True, None
elif "m" in cache_mode and timestamp_to_int(entry.cmtime) != st.st_mtime_ns:
files_cache_logger.debug("KNOWN-CHANGED: file mtime has changed: %r", hashed_path)
return True, None
# we ignored the inode number in the comparison above or it is still same.
# if it is still the same, replacing it in the tuple doesn't change it.
# if we ignored it, a reason for doing that is that files were moved to a new
# disk / new fs (so a one-time change of inode number is expected) and we wanted
# to avoid everything getting chunked again. to be able to re-enable the inode
# number comparison in a future backup run (and avoid chunking everything
# again at that time), we need to update the inode number in the cache with what
# we see in the filesystem.
self.files[path_hash] = msgpack.packb(entry._replace(inode=st.st_ino, age=0))
chunks = [ChunkListEntry(*chunk) for chunk in entry.chunks] # convert to list of namedtuple
return True, chunks
def memorize_file(self, hashed_path, path_hash, st, chunks):
if not stat.S_ISREG(st.st_mode):
return
cache_mode = self.cache_mode
# note: r(echunk) modes will update the files cache, d(isabled) mode won't
if "d" in cache_mode:
files_cache_logger.debug("FILES-CACHE-NOUPDATE: files cache disabled")
return
if "c" in cache_mode:
cmtime_type = "ctime"
cmtime_ns = safe_ns(st.st_ctime_ns)
elif "m" in cache_mode:
cmtime_type = "mtime"
cmtime_ns = safe_ns(st.st_mtime_ns)
else: # neither 'c' nor 'm' in cache_mode, avoid UnboundLocalError
cmtime_type = "ctime"
cmtime_ns = safe_ns(st.st_ctime_ns)
entry = FileCacheEntry(
age=0, inode=st.st_ino, size=st.st_size, cmtime=int_to_timestamp(cmtime_ns), chunks=chunks
)
self.files[path_hash] = msgpack.packb(entry)
self._newest_cmtime = max(self._newest_cmtime or 0, cmtime_ns)
files_cache_logger.debug(
"FILES-CACHE-UPDATE: put %r [has %s] <- %r",
entry._replace(chunks="[%d entries]" % len(entry.chunks)),
cmtime_type,
hashed_path,
)
class AdHocCache(CacheStatsMixin):
"""