mirror of https://github.com/borgbackup/borg.git
Compare commits: 7eab72de9a ... 7028d277f8 (17 commits)

Author | SHA1 | Date
---|---|---
TW | 7028d277f8 |
Thomas Waldmann | 1389bd10bc |
Thomas Waldmann | 8f2d41ab6c |
Thomas Waldmann | 30f6f1a4c4 |
Thomas Waldmann | b05e3067b8 |
Thomas Waldmann | 6087d7487b |
Thomas Waldmann | 5ec557908c |
Thomas Waldmann | 186d536d55 |
Thomas Waldmann | 62289ff513 |
Thomas Waldmann | 08003f4065 |
Thomas Waldmann | 26c6a1035a |
Thomas Waldmann | eafb80ccc0 |
Thomas Waldmann | c5e130d03d |
TW | 411c763fb8 |
Thomas Waldmann | 54a85bf56d |
TW | 4d2eb0cb1b |
Thomas Waldmann | d893b899fc |
@@ -9,7 +9,7 @@ jobs:
   lint:
     runs-on: ubuntu-22.04
     steps:
-      - uses: actions/checkout@v3
+      - uses: actions/checkout@v4
       - uses: psf/black@stable
         with:
          version: "~= 23.0"
@@ -78,11 +78,11 @@ jobs:
           # just fetching 1 commit is not enough for setuptools-scm, so we fetch all
           fetch-depth: 0
       - name: Set up Python ${{ matrix.python-version }}
-        uses: actions/setup-python@v4
+        uses: actions/setup-python@v5
         with:
           python-version: ${{ matrix.python-version }}
       - name: Cache pip
-        uses: actions/cache@v3
+        uses: actions/cache@v4
         with:
           path: ~/.cache/pip
           key: ${{ runner.os }}-pip-${{ hashFiles('requirements.d/development.txt') }}
@@ -114,7 +114,7 @@ jobs:
           #sudo -E bash -c "tox -e py"
           tox --skip-missing-interpreters
       - name: Upload coverage to Codecov
-        uses: codecov/codecov-action@v3
+        uses: codecov/codecov-action@v4
         env:
           OS: ${{ runner.os }}
           python: ${{ matrix.python-version }}

@@ -643,14 +643,14 @@ Duration: {0.duration}
         # so we can already remove it here, the next .save() will then commit this cleanup.
         # remove its manifest entry, remove its ArchiveItem chunk, remove its item_ptrs chunks:
         del self.manifest.archives[self.checkpoint_name]
-        self.cache.chunk_decref(self.id, self.stats)
+        self.cache.chunk_decref(self.id, 1, self.stats)
         for id in metadata.item_ptrs:
-            self.cache.chunk_decref(id, self.stats)
+            self.cache.chunk_decref(id, 1, self.stats)
         # also get rid of that part item, we do not want to have it in next checkpoint or final archive
         tail_chunks = self.items_buffer.restore_chunks_state()
         # tail_chunks contain the tail of the archive items metadata stream, not needed for next commit.
         for id in tail_chunks:
-            self.cache.chunk_decref(id, self.stats)
+            self.cache.chunk_decref(id, 1, self.stats)  # TODO can we have real size here?

     def save(self, name=None, comment=None, timestamp=None, stats=None, additional_metadata=None):
         name = name or self.name
@@ -1024,7 +1024,7 @@ Duration: {0.duration}
         new_id = self.key.id_hash(data)
         self.cache.add_chunk(new_id, {}, data, stats=self.stats, ro_type=ROBJ_ARCHIVE_META)
         self.manifest.archives[self.name] = (new_id, metadata.time)
-        self.cache.chunk_decref(self.id, self.stats)
+        self.cache.chunk_decref(self.id, 1, self.stats)
         self.id = new_id

     def rename(self, name):
@@ -1052,12 +1052,15 @@ Duration: {0.duration}
                 error = True
                 return exception_ignored  # must not return None here

-        def chunk_decref(id, stats):
+        def chunk_decref(id, size, stats):
             try:
-                self.cache.chunk_decref(id, stats, wait=False)
+                self.cache.chunk_decref(id, size, stats, wait=False)
             except KeyError:
-                cid = bin_to_hex(id)
-                raise ChunksIndexError(cid)
+                nonlocal error
+                if forced == 0:
+                    cid = bin_to_hex(id)
+                    raise ChunksIndexError(cid)
+                error = True
             else:
                 fetch_async_response(wait=False)

@@ -1073,13 +1076,13 @@ Duration: {0.duration}
             pi.show(i)
             _, data = self.repo_objs.parse(items_id, data, ro_type=ROBJ_ARCHIVE_STREAM)
             unpacker.feed(data)
-            chunk_decref(items_id, stats)
+            chunk_decref(items_id, 1, stats)
             try:
                 for item in unpacker:
                     item = Item(internal_dict=item)
                     if "chunks" in item:
                         for chunk_id, size in item.chunks:
-                            chunk_decref(chunk_id, stats)
+                            chunk_decref(chunk_id, size, stats)
             except (TypeError, ValueError):
                 # if items metadata spans multiple chunks and one chunk got dropped somehow,
                 # it could be that unpacker yields bad types
@@ -1096,12 +1099,12 @@ Duration: {0.duration}

         # delete the blocks that store all the references that end up being loaded into metadata.items:
         for id in self.metadata.item_ptrs:
-            chunk_decref(id, stats)
+            chunk_decref(id, 1, stats)

         # in forced delete mode, we try hard to delete at least the manifest entry,
         # if possible also the archive superblock, even if processing the items raises
         # some harmless exception.
-        chunk_decref(self.id, stats)
+        chunk_decref(self.id, 1, stats)
         del self.manifest.archives[self.name]
         while fetch_async_response(wait=True) is not None:
             # we did async deletes, process outstanding results (== exceptions),
@@ -1510,7 +1513,7 @@ class FilesystemObjectProcessors:
                 except BackupOSError:
                     # see comments in process_file's exception handler, same issue here.
                     for chunk in item.get("chunks", []):
-                        cache.chunk_decref(chunk.id, self.stats, wait=False)
+                        cache.chunk_decref(chunk.id, chunk.size, self.stats, wait=False)
                     raise
                 else:
                     item.get_size(memorize=True)
@@ -1544,7 +1547,7 @@ class FilesystemObjectProcessors:
                 item.chunks = []
                 for chunk_id, chunk_size in hl_chunks:
                     # process one-by-one, so we will know in item.chunks how far we got
-                    chunk_entry = cache.chunk_incref(chunk_id, self.stats)
+                    chunk_entry = cache.chunk_incref(chunk_id, chunk_size, self.stats)
                     item.chunks.append(chunk_entry)
             else:  # normal case, no "2nd+" hardlink
                 if not is_special_file:
@@ -1552,26 +1555,26 @@ class FilesystemObjectProcessors:
                     started_hashing = time.monotonic()
                     path_hash = self.key.id_hash(hashed_path)
                     self.stats.hashing_time += time.monotonic() - started_hashing
-                    known, ids = cache.file_known_and_unchanged(hashed_path, path_hash, st)
+                    known, chunks = cache.file_known_and_unchanged(hashed_path, path_hash, st)
                 else:
                     # in --read-special mode, we may be called for special files.
                     # there should be no information in the cache about special files processed in
                     # read-special mode, but we better play safe as this was wrong in the past:
                     hashed_path = path_hash = None
-                    known, ids = False, None
-                if ids is not None:
+                    known, chunks = False, None
+                if chunks is not None:
                     # Make sure all ids are available
-                    for id_ in ids:
-                        if not cache.seen_chunk(id_):
+                    for chunk in chunks:
+                        if not cache.seen_chunk(chunk.id):
                             # cache said it is unmodified, but we lost a chunk: process file like modified
                             status = "M"
                             break
                     else:
                         item.chunks = []
-                        for chunk_id in ids:
+                        for chunk in chunks:
                             # process one-by-one, so we will know in item.chunks how far we got
-                            chunk_entry = cache.chunk_incref(chunk_id, self.stats)
-                            item.chunks.append(chunk_entry)
+                            cache.chunk_incref(chunk.id, chunk.size, self.stats)
+                            item.chunks.append(chunk)
                         status = "U"  # regular file, unchanged
                 else:
                     status = "M" if known else "A"  # regular file, modified or added
@@ -1606,7 +1609,7 @@ class FilesystemObjectProcessors:
                     # block or char device will change without its mtime/size/inode changing.
                     # also, we must not memorize a potentially inconsistent/corrupt file that
                     # changed while we backed it up.
-                    cache.memorize_file(hashed_path, path_hash, st, [c.id for c in item.chunks])
+                    cache.memorize_file(hashed_path, path_hash, st, item.chunks)
                 self.stats.files_stats[status] += 1  # must be done late
                 if not changed_while_backup:
                     status = None  # we already called print_file_status
@@ -1620,7 +1623,7 @@ class FilesystemObjectProcessors:
             # but we will not add an item (see add_item in create_helper) and thus
             # they would be orphaned chunks in case that we commit the transaction.
             for chunk in item.get("chunks", []):
-                cache.chunk_decref(chunk.id, self.stats, wait=False)
+                cache.chunk_decref(chunk.id, chunk.size, self.stats, wait=False)
             # Now that we have cleaned up the chunk references, we can re-raise the exception.
             # This will skip processing of this file, but might retry or continue with the next one.
             raise
@@ -1731,7 +1734,7 @@ class TarfileObjectProcessors:
         except BackupOSError:
             # see comment in FilesystemObjectProcessors.process_file, same issue here.
             for chunk in item.get("chunks", []):
-                self.cache.chunk_decref(chunk.id, self.stats, wait=False)
+                self.cache.chunk_decref(chunk.id, chunk.size, self.stats, wait=False)
             raise

@@ -2328,10 +2331,10 @@ class ArchiveChecker:
         unused = {id_ for id_, entry in self.chunks.iteritems() if entry.refcount == 0}
         orphaned = unused - self.possibly_superseded
         if orphaned:
-            logger.error(f"{len(orphaned)} orphaned objects found!")
+            logger.info(f"{len(orphaned)} orphaned (unused) objects found.")
             for chunk_id in orphaned:
                 logger.debug(f"chunk {bin_to_hex(chunk_id)} is orphaned.")
-            self.error_found = True
+            # To support working with AdHocCache or NewCache, we do not set self.error_found = True.
         if self.repair and unused:
             logger.info(
                 "Deleting %d orphaned and %d superseded objects..." % (len(orphaned), len(self.possibly_superseded))
@@ -2444,7 +2447,7 @@ class ArchiveRecreater:
     def process_chunks(self, archive, target, item):
         if not target.recreate_rechunkify:
             for chunk_id, size in item.chunks:
-                self.cache.chunk_incref(chunk_id, target.stats)
+                self.cache.chunk_incref(chunk_id, size, target.stats)
             return item.chunks
         chunk_iterator = self.iter_chunks(archive, target, list(item.chunks))
         chunk_processor = partial(self.chunk_processor, target)
@@ -2452,8 +2455,9 @@ class ArchiveRecreater:

     def chunk_processor(self, target, chunk):
         chunk_id, data = cached_hash(chunk, self.key.id_hash)
+        size = len(data)
         if chunk_id in self.seen_chunks:
-            return self.cache.chunk_incref(chunk_id, target.stats)
+            return self.cache.chunk_incref(chunk_id, size, target.stats)
         chunk_entry = self.cache.add_chunk(chunk_id, {}, data, stats=target.stats, wait=False, ro_type=ROBJ_FILE_STREAM)
         self.cache.repository.async_response(wait=False)
         self.seen_chunks.add(chunk_entry.id)

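All of the archive.py changes above follow one theme: chunk refcounting calls now pass the chunk's size together with its id, so cache implementations that do not track sizes themselves can still update the statistics. A minimal sketch of the new calling convention — the `cache`, `stats`, and chunk-list objects here are hypothetical stand-ins, not the real borg classes:

```python
from collections import namedtuple

# borg's chunk list entries carry (id, size); sizes now travel with every incref/decref.
ChunkListEntry = namedtuple("ChunkListEntry", "id size")

def drop_item_chunks(cache, chunks, stats):
    # old call style was cache.chunk_decref(chunk.id, stats);
    # the new style passes the known size (or 1 as a placeholder where no real size
    # is known, e.g. for archive metadata chunks, as in the hunks above).
    for chunk in chunks:
        cache.chunk_decref(chunk.id, chunk.size, stats, wait=False)

def reuse_item_chunks(cache, chunks, stats):
    # unchanged-file fast path: re-reference known chunks one by one,
    # so item.chunks records how far we got if something fails midway.
    item_chunks = []
    for chunk in chunks:
        cache.chunk_incref(chunk.id, chunk.size, stats)
        item_chunks.append(chunk)
    return item_chunks
```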
@@ -225,6 +225,7 @@ class CreateMixIn:
                     progress=args.progress,
                     lock_wait=self.lock_wait,
                     permit_adhoc_cache=args.no_cache_sync,
+                    force_adhoc_cache=args.no_cache_sync_forced,
                     cache_mode=args.files_cache_mode,
                     iec=args.iec,
                 ) as cache:
@@ -803,6 +804,12 @@ class CreateMixIn:
             action="store_true",
             help="experimental: do not synchronize the cache. Implies not using the files cache.",
         )
+        subparser.add_argument(
+            "--no-cache-sync-forced",
+            dest="no_cache_sync_forced",
+            action="store_true",
+            help="experimental: do not synchronize the cache (forced). Implies not using the files cache.",
+        )
         subparser.add_argument(
             "--stdin-name",
             metavar="NAME",

@@ -59,16 +59,9 @@ class RInfoMixIn:
             output += f" out of {format_file_size(storage_quota, iec=args.iec)}"
         output += "\n"

-        output += (
-            textwrap.dedent(
-                """
-                Cache: {cache.path}
-                Security dir: {security_dir}
-                """
-            )
-            .strip()
-            .format(**info)
-        )
+        if hasattr(info["cache"], "path"):
+            output += "Cache: {cache.path}\n".format(**info)
+        output += "Security dir: {security_dir}\n".format(**info)

         print(output)
         print(str(cache))

@@ -143,7 +143,7 @@ class TransferMixIn:
                             transfer_size += size
                         else:
                             if not dry_run:
-                                chunk_entry = cache.chunk_incref(chunk_id, archive.stats)
+                                chunk_entry = cache.chunk_incref(chunk_id, size, archive.stats)
                                 chunks.append(chunk_entry)
                             present_size += size
                 if not dry_run:

@@ -35,8 +35,8 @@ from .platform import SaveFile
 from .remote import cache_if_remote
 from .repository import LIST_SCAN_LIMIT

-# note: cmtime might me either a ctime or a mtime timestamp
-FileCacheEntry = namedtuple("FileCacheEntry", "age inode size cmtime chunk_ids")
+# note: cmtime might be either a ctime or a mtime timestamp, chunks is a list of ChunkListEntry
+FileCacheEntry = namedtuple("FileCacheEntry", "age inode size cmtime chunks")


 class SecurityManager:
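The `FileCacheEntry` change above means the files cache now stores full (id, size) chunk entries instead of bare chunk ids. A small illustrative sketch — all values are made up, and borg actually msgpack-packs each entry and stores the cmtime as a timestamp object:

```python
from collections import namedtuple

ChunkListEntry = namedtuple("ChunkListEntry", "id size")
FileCacheEntry = namedtuple("FileCacheEntry", "age inode size cmtime chunks")

# Before: the last field was chunk_ids, a list of bare 32-byte ids, so chunk sizes
# had to be looked up in the chunks index. Now each entry carries (id, size), which
# lets an unchanged file be re-added without consulting the chunks index for sizes.
entry = FileCacheEntry(
    age=0,
    inode=1234,                        # example inode number
    size=4096,                         # example file size
    cmtime=1_700_000_000_000_000_000,  # example timestamp in nanoseconds
    chunks=[ChunkListEntry(id=b"\x00" * 32, size=4096)],
)
```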
@@ -248,15 +248,6 @@ def cache_dir(repository, path=None):
     return path or os.path.join(get_cache_dir(), repository.id_str)


-def files_cache_name():
-    suffix = os.environ.get("BORG_FILES_CACHE_SUFFIX", "")
-    return "files." + suffix if suffix else "files"
-
-
-def discover_files_cache_name(path):
-    return [fn for fn in os.listdir(path) if fn == "files" or fn.startswith("files.")][0]
-
-
 class CacheConfig:
     def __init__(self, repository, path=None, lock_wait=None):
         self.repository = repository
@@ -413,6 +404,7 @@ class Cache:
         progress=False,
         lock_wait=None,
         permit_adhoc_cache=False,
+        force_adhoc_cache=False,
         cache_mode=FILES_CACHE_MODE_DISABLED,
         iec=False,
     ):
@@ -428,9 +420,23 @@ class Cache:
                 cache_mode=cache_mode,
             )

+        def newcache():
+            return NewCache(
+                manifest=manifest,
+                path=path,
+                warn_if_unencrypted=warn_if_unencrypted,
+                progress=progress,
+                iec=iec,
+                lock_wait=lock_wait,
+                cache_mode=cache_mode,
+            )
+
         def adhoc():
             return AdHocCache(manifest=manifest, lock_wait=lock_wait, iec=iec)

+        if force_adhoc_cache:
+            return adhoc()
+
         if not permit_adhoc_cache:
             return local()

@@ -489,7 +495,302 @@ Total chunks: {0.total_chunks}
         return self.Summary(**stats)


-class LocalCache(CacheStatsMixin):
+class FilesCacheMixin:
+    """
+    Massively accelerate processing of unchanged files by caching their chunks list.
+    With that, we can avoid having to read and chunk them to get their chunks list.
+    """
+
+    FILES_CACHE_NAME = "files"
+
+    def __init__(self, cache_mode):
+        self.cache_mode = cache_mode
+        self.files = None
+        self._newest_cmtime = None
+
+    def files_cache_name(self):
+        suffix = os.environ.get("BORG_FILES_CACHE_SUFFIX", "")
+        return self.FILES_CACHE_NAME + "." + suffix if suffix else self.FILES_CACHE_NAME
+
+    def discover_files_cache_name(self, path):
+        return [
+            fn for fn in os.listdir(path) if fn == self.FILES_CACHE_NAME or fn.startswith(self.FILES_CACHE_NAME + ".")
+        ][0]
+
+    def _create_empty_files_cache(self, path):
+        with SaveFile(os.path.join(path, self.files_cache_name()), binary=True):
+            pass  # empty file
+
+    def _read_files_cache(self):
+        if "d" in self.cache_mode:  # d(isabled)
+            return
+
+        self.files = {}
+        logger.debug("Reading files cache ...")
+        files_cache_logger.debug("FILES-CACHE-LOAD: starting...")
+        msg = None
+        try:
+            with IntegrityCheckedFile(
+                path=os.path.join(self.path, self.files_cache_name()),
+                write=False,
+                integrity_data=self.cache_config.integrity.get(self.files_cache_name()),
+            ) as fd:
+                u = msgpack.Unpacker(use_list=True)
+                while True:
+                    data = fd.read(64 * 1024)
+                    if not data:
+                        break
+                    u.feed(data)
+                    try:
+                        for path_hash, item in u:
+                            entry = FileCacheEntry(*item)
+                            # in the end, this takes about 240 Bytes per file
+                            self.files[path_hash] = msgpack.packb(entry._replace(age=entry.age + 1))
+                    except (TypeError, ValueError) as exc:
+                        msg = "The files cache seems invalid. [%s]" % str(exc)
+                        break
+        except OSError as exc:
+            msg = "The files cache can't be read. [%s]" % str(exc)
+        except FileIntegrityError as fie:
+            msg = "The files cache is corrupted. [%s]" % str(fie)
+        if msg is not None:
+            logger.warning(msg)
+            logger.warning("Continuing without files cache - expect lower performance.")
+            self.files = {}
+        files_cache_logger.debug("FILES-CACHE-LOAD: finished, %d entries loaded.", len(self.files))
+
+    def _write_files_cache(self):
+        if self._newest_cmtime is None:
+            # was never set because no files were modified/added
+            self._newest_cmtime = 2**63 - 1  # nanoseconds, good until y2262
+        ttl = int(os.environ.get("BORG_FILES_CACHE_TTL", 20))
+        files_cache_logger.debug("FILES-CACHE-SAVE: starting...")
+        with IntegrityCheckedFile(path=os.path.join(self.path, self.files_cache_name()), write=True) as fd:
+            entry_count = 0
+            for path_hash, item in self.files.items():
+                # Only keep files seen in this backup that are older than newest cmtime seen in this backup -
+                # this is to avoid issues with filesystem snapshots and cmtime granularity.
+                # Also keep files from older backups that have not reached BORG_FILES_CACHE_TTL yet.
+                entry = FileCacheEntry(*msgpack.unpackb(item))
+                if (
+                    entry.age == 0
+                    and timestamp_to_int(entry.cmtime) < self._newest_cmtime
+                    or entry.age > 0
+                    and entry.age < ttl
+                ):
+                    msgpack.pack((path_hash, entry), fd)
+                    entry_count += 1
+        files_cache_logger.debug("FILES-CACHE-KILL: removed all old entries with age >= TTL [%d]", ttl)
+        files_cache_logger.debug(
+            "FILES-CACHE-KILL: removed all current entries with newest cmtime %d", self._newest_cmtime
+        )
+        files_cache_logger.debug("FILES-CACHE-SAVE: finished, %d remaining entries saved.", entry_count)
+        return fd.integrity_data
+
+    def file_known_and_unchanged(self, hashed_path, path_hash, st):
+        """
+        Check if we know the file that has this path_hash (know == it is in our files cache) and
+        whether it is unchanged (the size/inode number/cmtime is same for stuff we check in this cache_mode).
+
+        :param hashed_path: the file's path as we gave it to hash(hashed_path)
+        :param path_hash: hash(hashed_path), to save some memory in the files cache
+        :param st: the file's stat() result
+        :return: known, chunks (known is True if we have infos about this file in the cache,
+                               chunks is a list[ChunkListEntry] IF the file has not changed, otherwise None).
+        """
+        if not stat.S_ISREG(st.st_mode):
+            return False, None
+        cache_mode = self.cache_mode
+        if "d" in cache_mode:  # d(isabled)
+            files_cache_logger.debug("UNKNOWN: files cache disabled")
+            return False, None
+        # note: r(echunk) does not need the files cache in this method, but the files cache will
+        # be updated and saved to disk to memorize the files. To preserve previous generations in
+        # the cache, this means that it also needs to get loaded from disk first.
+        if "r" in cache_mode:  # r(echunk)
+            files_cache_logger.debug("UNKNOWN: rechunking enforced")
+            return False, None
+        entry = self.files.get(path_hash)
+        if not entry:
+            files_cache_logger.debug("UNKNOWN: no file metadata in cache for: %r", hashed_path)
+            return False, None
+        # we know the file!
+        entry = FileCacheEntry(*msgpack.unpackb(entry))
+        if "s" in cache_mode and entry.size != st.st_size:
+            files_cache_logger.debug("KNOWN-CHANGED: file size has changed: %r", hashed_path)
+            return True, None
+        if "i" in cache_mode and entry.inode != st.st_ino:
+            files_cache_logger.debug("KNOWN-CHANGED: file inode number has changed: %r", hashed_path)
+            return True, None
+        if "c" in cache_mode and timestamp_to_int(entry.cmtime) != st.st_ctime_ns:
+            files_cache_logger.debug("KNOWN-CHANGED: file ctime has changed: %r", hashed_path)
+            return True, None
+        elif "m" in cache_mode and timestamp_to_int(entry.cmtime) != st.st_mtime_ns:
+            files_cache_logger.debug("KNOWN-CHANGED: file mtime has changed: %r", hashed_path)
+            return True, None
+        # we ignored the inode number in the comparison above or it is still same.
+        # if it is still the same, replacing it in the tuple doesn't change it.
+        # if we ignored it, a reason for doing that is that files were moved to a new
+        # disk / new fs (so a one-time change of inode number is expected) and we wanted
+        # to avoid everything getting chunked again. to be able to re-enable the inode
+        # number comparison in a future backup run (and avoid chunking everything
+        # again at that time), we need to update the inode number in the cache with what
+        # we see in the filesystem.
+        self.files[path_hash] = msgpack.packb(entry._replace(inode=st.st_ino, age=0))
+        chunks = [ChunkListEntry(*chunk) for chunk in entry.chunks]  # convert to list of namedtuple
+        return True, chunks
+
+    def memorize_file(self, hashed_path, path_hash, st, chunks):
+        if not stat.S_ISREG(st.st_mode):
+            return
+        cache_mode = self.cache_mode
+        # note: r(echunk) modes will update the files cache, d(isabled) mode won't
+        if "d" in cache_mode:
+            files_cache_logger.debug("FILES-CACHE-NOUPDATE: files cache disabled")
+            return
+        if "c" in cache_mode:
+            cmtime_type = "ctime"
+            cmtime_ns = safe_ns(st.st_ctime_ns)
+        elif "m" in cache_mode:
+            cmtime_type = "mtime"
+            cmtime_ns = safe_ns(st.st_mtime_ns)
+        else:  # neither 'c' nor 'm' in cache_mode, avoid UnboundLocalError
+            cmtime_type = "ctime"
+            cmtime_ns = safe_ns(st.st_ctime_ns)
+        entry = FileCacheEntry(
+            age=0, inode=st.st_ino, size=st.st_size, cmtime=int_to_timestamp(cmtime_ns), chunks=chunks
+        )
+        self.files[path_hash] = msgpack.packb(entry)
+        self._newest_cmtime = max(self._newest_cmtime or 0, cmtime_ns)
+        files_cache_logger.debug(
+            "FILES-CACHE-UPDATE: put %r [has %s] <- %r",
+            entry._replace(chunks="[%d entries]" % len(entry.chunks)),
+            cmtime_type,
+            hashed_path,
+        )
+
+
+class ChunksMixin:
+    """
+    Chunks index related code for misc. Cache implementations.
+    """
+
+    def chunk_incref(self, id, size, stats):
+        assert isinstance(size, int) and size > 0
+        if not self._txn_active:
+            self.begin_txn()
+        count, _size = self.chunks.incref(id)
+        stats.update(size, False)
+        return ChunkListEntry(id, size)
+
+    def chunk_decref(self, id, size, stats, wait=True):
+        assert isinstance(size, int) and size > 0
+        if not self._txn_active:
+            self.begin_txn()
+        count, _size = self.chunks.decref(id)
+        if count == 0:
+            del self.chunks[id]
+            self.repository.delete(id, wait=wait)
+            stats.update(-size, True)
+        else:
+            stats.update(-size, False)
+
+    def seen_chunk(self, id, size=None):
+        if not self._txn_active:
+            self.begin_txn()
+        entry = self.chunks.get(id, ChunkIndexEntry(0, None))
+        if entry.refcount and size is not None:
+            assert isinstance(entry.size, int)
+            if entry.size:
+                # LocalCache: has existing size information and uses *size* to make an effort at detecting collisions.
+                if size != entry.size:
+                    # we already have a chunk with that id, but different size.
+                    # this is either a hash collision (unlikely) or corruption or a bug.
+                    raise Exception(
+                        "chunk has same id [%r], but different size (stored: %d new: %d)!" % (id, entry.size, size)
+                    )
+            else:
+                # NewCache / AdHocCache:
+                # Here *size* is used to update the chunk's size information, which will be zero for existing chunks.
+                self.chunks[id] = entry._replace(size=size)
+        return entry.refcount
+
+    def add_chunk(
+        self,
+        id,
+        meta,
+        data,
+        *,
+        stats,
+        wait=True,
+        compress=True,
+        size=None,
+        ctype=None,
+        clevel=None,
+        ro_type=ROBJ_FILE_STREAM,
+    ):
+        assert ro_type is not None
+        if not self._txn_active:
+            self.begin_txn()
+        if size is None:
+            if compress:
+                size = len(data)  # data is still uncompressed
+            else:
+                raise ValueError("when giving compressed data for a chunk, the uncompressed size must be given also")
+        refcount = self.seen_chunk(id, size)
+        if refcount:
+            return self.chunk_incref(id, size, stats)
+        cdata = self.repo_objs.format(
+            id, meta, data, compress=compress, size=size, ctype=ctype, clevel=clevel, ro_type=ro_type
+        )
+        self.repository.put(id, cdata, wait=wait)
+        self.chunks.add(id, 1, size)
+        stats.update(size, not refcount)
+        return ChunkListEntry(id, size)
+
+    def _load_chunks_from_repo(self):
+        # Explicitly set the initial usable hash table capacity to avoid performance issues
+        # due to hash table "resonance".
+        # Since we're creating an archive, add 10 % from the start.
+        num_chunks = len(self.repository)
+        chunks = ChunkIndex(usable=num_chunks * 1.1)
+        pi = ProgressIndicatorPercent(
+            total=num_chunks, msg="Downloading chunk list... %3.0f%%", msgid="cache.download_chunks"
+        )
+        t0 = perf_counter()
+        num_requests = 0
+        marker = None
+        while True:
+            result = self.repository.list(limit=LIST_SCAN_LIMIT, marker=marker)
+            num_requests += 1
+            if not result:
+                break
+            pi.show(increase=len(result))
+            marker = result[-1]
+            # All chunks from the repository have a refcount of MAX_VALUE, which is sticky,
+            # therefore we can't/won't delete them. Chunks we added ourselves in this transaction
+            # (e.g. checkpoint archives) are tracked correctly.
+            init_entry = ChunkIndexEntry(refcount=ChunkIndex.MAX_VALUE, size=0)
+            for id_ in result:
+                chunks[id_] = init_entry
+        assert len(chunks) == num_chunks
+        # LocalCache does not contain the manifest, either.
+        del chunks[self.manifest.MANIFEST_ID]
+        duration = perf_counter() - t0 or 0.01
+        pi.finish()
+        logger.debug(
+            "Cache: downloaded %d chunk IDs in %.2f s (%d requests), ~%s/s",
+            num_chunks,
+            duration,
+            num_requests,
+            format_file_size(num_chunks * 34 / duration),
+        )
+        # Chunk IDs in a list are encoded in 34 bytes: 1 byte msgpack header, 1 byte length, 32 ID bytes.
+        # Protocol overhead is neglected in this calculation.
+        return chunks
+
+
+class LocalCache(CacheStatsMixin, FilesCacheMixin, ChunksMixin):
     """
     Persistent, local (client-side) cache.
     """
@@ -512,15 +813,15 @@ class LocalCache(CacheStatsMixin):
         :param cache_mode: what shall be compared in the file stat infos vs. cached stat infos comparison
         """
         CacheStatsMixin.__init__(self, iec=iec)
+        FilesCacheMixin.__init__(self, cache_mode)
         assert isinstance(manifest, Manifest)
         self.manifest = manifest
         self.repository = manifest.repository
         self.key = manifest.key
         self.repo_objs = manifest.repo_objs
         self.progress = progress
-        self.cache_mode = cache_mode
         self.timestamp = None
-        self.txn_active = False
+        self._txn_active = False

         self.path = cache_dir(self.repository, path)
         self.security_manager = SecurityManager(self.repository)
@@ -561,8 +862,7 @@ class LocalCache(CacheStatsMixin):
         self.cache_config.create()
         ChunkIndex().write(os.path.join(self.path, "chunks"))
         os.makedirs(os.path.join(self.path, "chunks.archive.d"))
-        with SaveFile(os.path.join(self.path, files_cache_name()), binary=True):
-            pass  # empty file
+        self._create_empty_files_cache(self.path)

     def _do_open(self):
         self.cache_config.load()
@@ -572,10 +872,7 @@ class LocalCache(CacheStatsMixin):
             integrity_data=self.cache_config.integrity.get("chunks"),
         ) as fd:
             self.chunks = ChunkIndex.read(fd)
-        if "d" in self.cache_mode:  # d(isabled)
-            self.files = None
-        else:
-            self._read_files()
+        self._read_files_cache()

     def open(self):
         if not os.path.isdir(self.path):
@@ -588,42 +885,6 @@ class LocalCache(CacheStatsMixin):
         self.cache_config.close()
         self.cache_config = None

-    def _read_files(self):
-        self.files = {}
-        self._newest_cmtime = None
-        logger.debug("Reading files cache ...")
-        files_cache_logger.debug("FILES-CACHE-LOAD: starting...")
-        msg = None
-        try:
-            with IntegrityCheckedFile(
-                path=os.path.join(self.path, files_cache_name()),
-                write=False,
-                integrity_data=self.cache_config.integrity.get(files_cache_name()),
-            ) as fd:
-                u = msgpack.Unpacker(use_list=True)
-                while True:
-                    data = fd.read(64 * 1024)
-                    if not data:
-                        break
-                    u.feed(data)
-                    try:
-                        for path_hash, item in u:
-                            entry = FileCacheEntry(*item)
-                            # in the end, this takes about 240 Bytes per file
-                            self.files[path_hash] = msgpack.packb(entry._replace(age=entry.age + 1))
-                    except (TypeError, ValueError) as exc:
-                        msg = "The files cache seems invalid. [%s]" % str(exc)
-                        break
-        except OSError as exc:
-            msg = "The files cache can't be read. [%s]" % str(exc)
-        except FileIntegrityError as fie:
-            msg = "The files cache is corrupted. [%s]" % str(fie)
-        if msg is not None:
-            logger.warning(msg)
-            logger.warning("Continuing without files cache - expect lower performance.")
-            self.files = {}
-        files_cache_logger.debug("FILES-CACHE-LOAD: finished, %d entries loaded.", len(self.files))
-
     def begin_txn(self):
         # Initialize transaction snapshot
         pi = ProgressIndicatorMessage(msgid="cache.begin_transaction")
@@ -635,48 +896,23 @@ class LocalCache(CacheStatsMixin):
         shutil.copy(os.path.join(self.path, "chunks"), txn_dir)
         pi.output("Initializing cache transaction: Reading files")
         try:
-            shutil.copy(os.path.join(self.path, files_cache_name()), txn_dir)
+            shutil.copy(os.path.join(self.path, self.files_cache_name()), txn_dir)
         except FileNotFoundError:
-            with SaveFile(os.path.join(txn_dir, files_cache_name()), binary=True):
-                pass  # empty file
+            self._create_empty_files_cache(txn_dir)
         os.replace(txn_dir, os.path.join(self.path, "txn.active"))
-        self.txn_active = True
+        self._txn_active = True
         pi.finish()

     def commit(self):
         """Commit transaction"""
-        if not self.txn_active:
+        if not self._txn_active:
             return
         self.security_manager.save(self.manifest, self.key)
         pi = ProgressIndicatorMessage(msgid="cache.commit")
         if self.files is not None:
-            if self._newest_cmtime is None:
-                # was never set because no files were modified/added
-                self._newest_cmtime = 2**63 - 1  # nanoseconds, good until y2262
-            ttl = int(os.environ.get("BORG_FILES_CACHE_TTL", 20))
             pi.output("Saving files cache")
-            files_cache_logger.debug("FILES-CACHE-SAVE: starting...")
-            with IntegrityCheckedFile(path=os.path.join(self.path, files_cache_name()), write=True) as fd:
-                entry_count = 0
-                for path_hash, item in self.files.items():
-                    # Only keep files seen in this backup that are older than newest cmtime seen in this backup -
-                    # this is to avoid issues with filesystem snapshots and cmtime granularity.
-                    # Also keep files from older backups that have not reached BORG_FILES_CACHE_TTL yet.
-                    entry = FileCacheEntry(*msgpack.unpackb(item))
-                    if (
-                        entry.age == 0
-                        and timestamp_to_int(entry.cmtime) < self._newest_cmtime
-                        or entry.age > 0
-                        and entry.age < ttl
-                    ):
-                        msgpack.pack((path_hash, entry), fd)
-                        entry_count += 1
-            files_cache_logger.debug("FILES-CACHE-KILL: removed all old entries with age >= TTL [%d]", ttl)
-            files_cache_logger.debug(
-                "FILES-CACHE-KILL: removed all current entries with newest cmtime %d", self._newest_cmtime
-            )
-            files_cache_logger.debug("FILES-CACHE-SAVE: finished, %d remaining entries saved.", entry_count)
-            self.cache_config.integrity[files_cache_name()] = fd.integrity_data
+            integrity_data = self._write_files_cache()
+            self.cache_config.integrity[self.files_cache_name()] = integrity_data
         pi.output("Saving chunks cache")
         with IntegrityCheckedFile(path=os.path.join(self.path, "chunks"), write=True) as fd:
             self.chunks.write(fd)
@@ -685,7 +921,7 @@ class LocalCache(CacheStatsMixin):
         self.cache_config.save(self.manifest, self.key)
         os.replace(os.path.join(self.path, "txn.active"), os.path.join(self.path, "txn.tmp"))
         shutil.rmtree(os.path.join(self.path, "txn.tmp"))
-        self.txn_active = False
+        self._txn_active = False
         pi.finish()

     def rollback(self):
@@ -698,12 +934,12 @@ class LocalCache(CacheStatsMixin):
         if os.path.exists(txn_dir):
             shutil.copy(os.path.join(txn_dir, "config"), self.path)
             shutil.copy(os.path.join(txn_dir, "chunks"), self.path)
-            shutil.copy(os.path.join(txn_dir, discover_files_cache_name(txn_dir)), self.path)
+            shutil.copy(os.path.join(txn_dir, self.discover_files_cache_name(txn_dir)), self.path)
             txn_tmp = os.path.join(self.path, "txn.tmp")
             os.replace(txn_dir, txn_tmp)
             if os.path.exists(txn_tmp):
                 shutil.rmtree(txn_tmp)
-        self.txn_active = False
+        self._txn_active = False
         self._do_open()

     def sync(self):
@@ -930,8 +1166,7 @@ class LocalCache(CacheStatsMixin):
         shutil.rmtree(os.path.join(self.path, "chunks.archive.d"))
         os.makedirs(os.path.join(self.path, "chunks.archive.d"))
         self.chunks = ChunkIndex()
-        with SaveFile(os.path.join(self.path, files_cache_name()), binary=True):
-            pass  # empty file
+        self._create_empty_files_cache(self.path)
         self.cache_config.manifest_id = ""
         self.cache_config._config.set("cache", "manifest", "")

@@ -948,149 +1183,171 @@ class LocalCache(CacheStatsMixin):
         self.cache_config.ignored_features.update(repo_features - my_features)
         self.cache_config.mandatory_features.update(repo_features & my_features)

-    def add_chunk(
-        self,
-        id,
-        meta,
-        data,
-        *,
-        stats,
-        wait=True,
-        compress=True,
-        size=None,
-        ctype=None,
-        clevel=None,
-        ro_type=ROBJ_FILE_STREAM,
-    ):
-        assert ro_type is not None
-        if not self.txn_active:
-            self.begin_txn()
-        if size is None and compress:
-            size = len(data)  # data is still uncompressed
-        refcount = self.seen_chunk(id, size)
-        if refcount:
-            return self.chunk_incref(id, stats)
-        if size is None:
-            raise ValueError("when giving compressed data for a new chunk, the uncompressed size must be given also")
-        cdata = self.repo_objs.format(
-            id, meta, data, compress=compress, size=size, ctype=ctype, clevel=clevel, ro_type=ro_type
-        )
-        self.repository.put(id, cdata, wait=wait)
-        self.chunks.add(id, 1, size)
-        stats.update(size, not refcount)
-        return ChunkListEntry(id, size)
-
-    def seen_chunk(self, id, size=None):
-        refcount, stored_size = self.chunks.get(id, ChunkIndexEntry(0, None))
-        if size is not None and stored_size is not None and size != stored_size:
-            # we already have a chunk with that id, but different size.
-            # this is either a hash collision (unlikely) or corruption or a bug.
-            raise Exception(
-                "chunk has same id [%r], but different size (stored: %d new: %d)!" % (id, stored_size, size)
-            )
-        return refcount
-
-    def chunk_incref(self, id, stats, size=None):
-        if not self.txn_active:
-            self.begin_txn()
-        count, _size = self.chunks.incref(id)
-        stats.update(_size, False)
-        return ChunkListEntry(id, _size)
-
-    def chunk_decref(self, id, stats, wait=True):
-        if not self.txn_active:
-            self.begin_txn()
-        count, size = self.chunks.decref(id)
-        if count == 0:
-            del self.chunks[id]
-            self.repository.delete(id, wait=wait)
-            stats.update(-size, True)
-        else:
-            stats.update(-size, False)
-
-    def file_known_and_unchanged(self, hashed_path, path_hash, st):
-        """
-        Check if we know the file that has this path_hash (know == it is in our files cache) and
-        whether it is unchanged (the size/inode number/cmtime is same for stuff we check in this cache_mode).
-
-        :param hashed_path: the file's path as we gave it to hash(hashed_path)
-        :param path_hash: hash(hashed_path), to save some memory in the files cache
-        :param st: the file's stat() result
-        :return: known, ids (known is True if we have infos about this file in the cache,
-                            ids is the list of chunk ids IF the file has not changed, otherwise None).
-        """
-        if not stat.S_ISREG(st.st_mode):
-            return False, None
-        cache_mode = self.cache_mode
-        if "d" in cache_mode:  # d(isabled)
-            files_cache_logger.debug("UNKNOWN: files cache disabled")
-            return False, None
-        # note: r(echunk) does not need the files cache in this method, but the files cache will
-        # be updated and saved to disk to memorize the files. To preserve previous generations in
-        # the cache, this means that it also needs to get loaded from disk first.
-        if "r" in cache_mode:  # r(echunk)
-            files_cache_logger.debug("UNKNOWN: rechunking enforced")
-            return False, None
-        entry = self.files.get(path_hash)
-        if not entry:
-            files_cache_logger.debug("UNKNOWN: no file metadata in cache for: %r", hashed_path)
-            return False, None
-        # we know the file!
-        entry = FileCacheEntry(*msgpack.unpackb(entry))
-        if "s" in cache_mode and entry.size != st.st_size:
-            files_cache_logger.debug("KNOWN-CHANGED: file size has changed: %r", hashed_path)
-            return True, None
-        if "i" in cache_mode and entry.inode != st.st_ino:
-            files_cache_logger.debug("KNOWN-CHANGED: file inode number has changed: %r", hashed_path)
-            return True, None
-        if "c" in cache_mode and timestamp_to_int(entry.cmtime) != st.st_ctime_ns:
-            files_cache_logger.debug("KNOWN-CHANGED: file ctime has changed: %r", hashed_path)
-            return True, None
-        elif "m" in cache_mode and timestamp_to_int(entry.cmtime) != st.st_mtime_ns:
-            files_cache_logger.debug("KNOWN-CHANGED: file mtime has changed: %r", hashed_path)
-            return True, None
-        # we ignored the inode number in the comparison above or it is still same.
-        # if it is still the same, replacing it in the tuple doesn't change it.
-        # if we ignored it, a reason for doing that is that files were moved to a new
-        # disk / new fs (so a one-time change of inode number is expected) and we wanted
-        # to avoid everything getting chunked again. to be able to re-enable the inode
-        # number comparison in a future backup run (and avoid chunking everything
-        # again at that time), we need to update the inode number in the cache with what
-        # we see in the filesystem.
-        self.files[path_hash] = msgpack.packb(entry._replace(inode=st.st_ino, age=0))
-        return True, entry.chunk_ids
-
-    def memorize_file(self, hashed_path, path_hash, st, ids):
-        if not stat.S_ISREG(st.st_mode):
-            return
-        cache_mode = self.cache_mode
-        # note: r(echunk) modes will update the files cache, d(isabled) mode won't
-        if "d" in cache_mode:
-            files_cache_logger.debug("FILES-CACHE-NOUPDATE: files cache disabled")
-            return
-        if "c" in cache_mode:
-            cmtime_type = "ctime"
-            cmtime_ns = safe_ns(st.st_ctime_ns)
-        elif "m" in cache_mode:
-            cmtime_type = "mtime"
-            cmtime_ns = safe_ns(st.st_mtime_ns)
-        else:  # neither 'c' nor 'm' in cache_mode, avoid UnboundLocalError
-            cmtime_type = "ctime"
-            cmtime_ns = safe_ns(st.st_ctime_ns)
-        entry = FileCacheEntry(
-            age=0, inode=st.st_ino, size=st.st_size, cmtime=int_to_timestamp(cmtime_ns), chunk_ids=ids
-        )
-        self.files[path_hash] = msgpack.packb(entry)
-        self._newest_cmtime = max(self._newest_cmtime or 0, cmtime_ns)
-        files_cache_logger.debug(
-            "FILES-CACHE-UPDATE: put %r [has %s] <- %r",
-            entry._replace(chunk_ids="[%d entries]" % len(entry.chunk_ids)),
-            cmtime_type,
-            hashed_path,
-        )
+
+class NewCache(CacheStatsMixin, FilesCacheMixin, ChunksMixin):
+    """
+    Like AdHocCache, but with a files cache.
+    """
+
+    def __init__(
+        self,
+        manifest,
+        path=None,
+        warn_if_unencrypted=True,
+        progress=False,
+        lock_wait=None,
+        cache_mode=FILES_CACHE_MODE_DISABLED,
+        iec=False,
+    ):
+        """
+        :param warn_if_unencrypted: print warning if accessing unknown unencrypted repository
+        :param lock_wait: timeout for lock acquisition (int [s] or None [wait forever])
+        :param cache_mode: what shall be compared in the file stat infos vs. cached stat infos comparison
+        """
+        CacheStatsMixin.__init__(self, iec=iec)
+        FilesCacheMixin.__init__(self, cache_mode)
+        assert isinstance(manifest, Manifest)
+        self.manifest = manifest
+        self.repository = manifest.repository
+        self.key = manifest.key
+        self.repo_objs = manifest.repo_objs
+        self.progress = progress
+        self.timestamp = None
+        self._txn_active = False
+
+        self.path = cache_dir(self.repository, path)
+        self.security_manager = SecurityManager(self.repository)
+        self.cache_config = CacheConfig(self.repository, self.path, lock_wait)
+
+        # Warn user before sending data to a never seen before unencrypted repository
+        if not os.path.exists(self.path):
+            self.security_manager.assert_access_unknown(warn_if_unencrypted, manifest, self.key)
+            self.create()
+
+        self.open()
+        try:
+            self.security_manager.assert_secure(manifest, self.key, cache_config=self.cache_config)
+
+            if not self.check_cache_compatibility():
+                self.wipe_cache()
+
+            self.update_compatibility()
+        except:  # noqa
+            self.close()
+            raise
+
+    def __enter__(self):
+        return self
+
+    def __exit__(self, exc_type, exc_val, exc_tb):
+        self.close()
+
+    def create(self):
+        """Create a new empty cache at `self.path`"""
+        os.makedirs(self.path)
+        with open(os.path.join(self.path, "README"), "w") as fd:
+            fd.write(CACHE_README)
+        self.cache_config.create()
+        self._create_empty_files_cache(self.path)
+
+    def _do_open(self):
+        self.cache_config.load()
+        self.chunks = self._load_chunks_from_repo()
+        self._read_files_cache()
+
+    def open(self):
+        if not os.path.isdir(self.path):
+            raise Exception("%s Does not look like a Borg cache" % self.path)
+        self.cache_config.open()
+        self.rollback()
+
+    def close(self):
+        if self.cache_config is not None:
+            self.cache_config.close()
+            self.cache_config = None
+
+    def begin_txn(self):
+        # Initialize transaction snapshot
+        pi = ProgressIndicatorMessage(msgid="cache.begin_transaction")
+        txn_dir = os.path.join(self.path, "txn.tmp")
+        os.mkdir(txn_dir)
+        pi.output("Initializing cache transaction: Reading config")
+        shutil.copy(os.path.join(self.path, "config"), txn_dir)
+        pi.output("Initializing cache transaction: Reading files")
+        try:
+            shutil.copy(os.path.join(self.path, self.files_cache_name()), txn_dir)
+        except FileNotFoundError:
+            self._create_empty_files_cache(txn_dir)
+        os.replace(txn_dir, os.path.join(self.path, "txn.active"))
+        pi.finish()
+        self._txn_active = True
+
+    def commit(self):
+        if not self._txn_active:
+            return
+        self.security_manager.save(self.manifest, self.key)
+        pi = ProgressIndicatorMessage(msgid="cache.commit")
+        if self.files is not None:
+            pi.output("Saving files cache")
+            integrity_data = self._write_files_cache()
+            self.cache_config.integrity[self.files_cache_name()] = integrity_data
+        pi.output("Saving cache config")
+        self.cache_config.save(self.manifest, self.key)
+        os.replace(os.path.join(self.path, "txn.active"), os.path.join(self.path, "txn.tmp"))
+        shutil.rmtree(os.path.join(self.path, "txn.tmp"))
+        self._txn_active = False
+        pi.finish()
+
+    def rollback(self):
+        # Remove partial transaction
+        if os.path.exists(os.path.join(self.path, "txn.tmp")):
+            shutil.rmtree(os.path.join(self.path, "txn.tmp"))
+        # Roll back active transaction
+        txn_dir = os.path.join(self.path, "txn.active")
+        if os.path.exists(txn_dir):
+            shutil.copy(os.path.join(txn_dir, "config"), self.path)
+            shutil.copy(os.path.join(txn_dir, self.discover_files_cache_name(txn_dir)), self.path)
+            txn_tmp = os.path.join(self.path, "txn.tmp")
+            os.replace(txn_dir, txn_tmp)
+            if os.path.exists(txn_tmp):
+                shutil.rmtree(txn_tmp)
+        self._txn_active = False
+        self._do_open()
+
+    def check_cache_compatibility(self):
+        my_features = Manifest.SUPPORTED_REPO_FEATURES
+        if self.cache_config.ignored_features & my_features:
+            # The cache might not contain references of chunks that need a feature that is mandatory for some operation
+            # and which this version supports. To avoid corruption while executing that operation force rebuild.
+            return False
+        if not self.cache_config.mandatory_features <= my_features:
+            # The cache was build with consideration to at least one feature that this version does not understand.
+            # This client might misinterpret the cache. Thus force a rebuild.
+            return False
+        return True
+
+    def wipe_cache(self):
+        logger.warning("Discarding incompatible cache and forcing a cache rebuild")
+        self.chunks = ChunkIndex()
+        self._create_empty_files_cache(self.path)
+        self.cache_config.manifest_id = ""
+        self.cache_config._config.set("cache", "manifest", "")
+
+        self.cache_config.ignored_features = set()
+        self.cache_config.mandatory_features = set()
+
+    def update_compatibility(self):
+        operation_to_features_map = self.manifest.get_all_mandatory_features()
+        my_features = Manifest.SUPPORTED_REPO_FEATURES
+        repo_features = set()
+        for operation, features in operation_to_features_map.items():
+            repo_features.update(features)
+
+        self.cache_config.ignored_features.update(repo_features - my_features)
+        self.cache_config.mandatory_features.update(repo_features & my_features)


-class AdHocCache(CacheStatsMixin):
+class AdHocCache(CacheStatsMixin, ChunksMixin):
     """
     Ad-hoc, non-persistent cache.

@@ -1135,59 +1392,9 @@ Chunk index: {0.total_unique_chunks:20d} unknown"""
         files_cache_logger.debug("UNKNOWN: files cache not implemented")
         return False, None

-    def memorize_file(self, hashed_path, path_hash, st, ids):
+    def memorize_file(self, hashed_path, path_hash, st, chunks):
         pass

-    def add_chunk(self, id, meta, data, *, stats, wait=True, compress=True, size=None, ro_type=ROBJ_FILE_STREAM):
-        assert ro_type is not None
-        if not self._txn_active:
-            self.begin_txn()
-        if size is None and compress:
-            size = len(data)  # data is still uncompressed
-        if size is None:
-            raise ValueError("when giving compressed data for a chunk, the uncompressed size must be given also")
-        refcount = self.seen_chunk(id, size)
-        if refcount:
-            return self.chunk_incref(id, stats, size=size)
-        cdata = self.repo_objs.format(id, meta, data, compress=compress, ro_type=ro_type)
-        self.repository.put(id, cdata, wait=wait)
-        self.chunks.add(id, 1, size)
-        stats.update(size, not refcount)
-        return ChunkListEntry(id, size)
-
-    def seen_chunk(self, id, size=None):
-        if not self._txn_active:
-            self.begin_txn()
-        entry = self.chunks.get(id, ChunkIndexEntry(0, None))
-        if entry.refcount and size and not entry.size:
-            # The LocalCache has existing size information and uses *size* to make an effort at detecting collisions.
-            # This is of course not possible for the AdHocCache.
-            # Here *size* is used to update the chunk's size information, which will be zero for existing chunks.
-            self.chunks[id] = entry._replace(size=size)
-        return entry.refcount
-
-    def chunk_incref(self, id, stats, size=None):
-        if not self._txn_active:
-            self.begin_txn()
-        count, _size = self.chunks.incref(id)
-        # When _size is 0 and size is not given, then this chunk has not been locally visited yet (seen_chunk with
-        # size or add_chunk); we can't add references to those (size=0 is invalid) and generally don't try to.
-        size = _size or size
-        assert size
-        stats.update(size, False)
-        return ChunkListEntry(id, size)
-
-    def chunk_decref(self, id, stats, wait=True):
-        if not self._txn_active:
-            self.begin_txn()
-        count, size = self.chunks.decref(id)
-        if count == 0:
-            del self.chunks[id]
-            self.repository.delete(id, wait=wait)
-            stats.update(-size, True)
-        else:
-            stats.update(-size, False)
-
     def commit(self):
         if not self._txn_active:
             return

@@ -1200,41 +1407,4 @@ Chunk index: {0.total_unique_chunks:20d} unknown"""

     def begin_txn(self):
         self._txn_active = True
-        # Explicitly set the initial usable hash table capacity to avoid performance issues
-        # due to hash table "resonance".
-        # Since we're creating an archive, add 10 % from the start.
-        num_chunks = len(self.repository)
-        self.chunks = ChunkIndex(usable=num_chunks * 1.1)
-        pi = ProgressIndicatorPercent(
-            total=num_chunks, msg="Downloading chunk list... %3.0f%%", msgid="cache.download_chunks"
-        )
-        t0 = perf_counter()
-        num_requests = 0
-        marker = None
-        while True:
-            result = self.repository.list(limit=LIST_SCAN_LIMIT, marker=marker)
-            num_requests += 1
-            if not result:
-                break
-            pi.show(increase=len(result))
-            marker = result[-1]
-            # All chunks from the repository have a refcount of MAX_VALUE, which is sticky,
-            # therefore we can't/won't delete them. Chunks we added ourselves in this transaction
-            # (e.g. checkpoint archives) are tracked correctly.
-            init_entry = ChunkIndexEntry(refcount=ChunkIndex.MAX_VALUE, size=0)
-            for id_ in result:
-                self.chunks[id_] = init_entry
-        assert len(self.chunks) == num_chunks
-        # LocalCache does not contain the manifest, either.
-        del self.chunks[self.manifest.MANIFEST_ID]
-        duration = perf_counter() - t0 or 0.01
-        pi.finish()
-        logger.debug(
-            "AdHocCache: downloaded %d chunk IDs in %.2f s (%d requests), ~%s/s",
-            num_chunks,
-            duration,
-            num_requests,
-            format_file_size(num_chunks * 34 / duration),
-        )
-        # Chunk IDs in a list are encoded in 34 bytes: 1 byte msgpack header, 1 byte length, 32 ID bytes.
-        # Protocol overhead is neglected in this calculation.
+        self.chunks = self._load_chunks_from_repo()

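The cache.py diff above splits the shared logic into two mixins and adds a third cache implementation. A rough sketch of how the three flavors are composed (class bodies omitted here; see the hunks above for the real methods):

```python
# Composition of the cache classes after this change (sketch, not the real definitions):
class CacheStatsMixin: ...    # statistics / summary formatting
class FilesCacheMixin: ...    # files cache: skip re-reading/re-chunking unchanged files
class ChunksMixin: ...        # chunks index: refcounts and sizes, add/incref/decref

class LocalCache(CacheStatsMixin, FilesCacheMixin, ChunksMixin):
    """Persistent, local (client-side) cache."""

class NewCache(CacheStatsMixin, FilesCacheMixin, ChunksMixin):
    """Like AdHocCache, but with a files cache; its chunks index is rebuilt
    from a repository listing (_load_chunks_from_repo) instead of being persisted."""

class AdHocCache(CacheStatsMixin, ChunksMixin):
    """Ad-hoc, non-persistent cache: no files cache at all."""
```

Cache() routes between them: the new `--no-cache-sync-forced` flag forces the AdHocCache, while the existing permit_adhoc_cache path keeps its current behavior (the remainder of the routing is not shown in the hunk above).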
@@ -1184,13 +1184,13 @@ class BorgJsonEncoder(json.JSONEncoder):
         from ..repository import Repository
         from ..remote import RemoteRepository
         from ..archive import Archive
-        from ..cache import LocalCache, AdHocCache
+        from ..cache import LocalCache, AdHocCache, NewCache

         if isinstance(o, Repository) or isinstance(o, RemoteRepository):
             return {"id": bin_to_hex(o.id), "location": o._location.canonical_path()}
         if isinstance(o, Archive):
             return o.info()
-        if isinstance(o, LocalCache):
+        if isinstance(o, (LocalCache, NewCache)):
             return {"path": o.path, "stats": o.stats()}
         if isinstance(o, AdHocCache):
             return {"stats": o.stats()}

@@ -100,7 +100,7 @@ def format_timedelta(td):
     s = ts % 60
     m = int(ts / 60) % 60
     h = int(ts / 3600) % 24
-    txt = "%.2f seconds" % s
+    txt = "%.3f seconds" % s
     if m:
         txt = "%d minutes %s" % (m, txt)
     if h:

@@ -61,8 +61,8 @@ def test_stats_format(stats):
 Number of files: 1
 Original size: 20 B
 Deduplicated size: 20 B
-Time spent in hashing: 0.00 seconds
-Time spent in chunking: 0.00 seconds
+Time spent in hashing: 0.000 seconds
+Time spent in chunking: 0.000 seconds
 Added files: 0
 Unchanged files: 0
 Modified files: 0

@@ -338,10 +338,11 @@ def test_extra_chunks(archivers, request):
     with Repository(archiver.repository_location, exclusive=True) as repository:
         repository.put(b"01234567890123456789012345678901", b"xxxx")
         repository.commit(compact=False)
-    cmd(archiver, "check", exit_code=1)
-    cmd(archiver, "check", exit_code=1)
+    output = cmd(archiver, "check", "-v", exit_code=0)  # orphans are not considered warnings anymore
+    assert "1 orphaned (unused) objects found." in output
     cmd(archiver, "check", "--repair", exit_code=0)
-    cmd(archiver, "check", exit_code=0)
+    output = cmd(archiver, "check", "-v", exit_code=0)
+    assert "orphaned (unused) objects found." not in output
     cmd(archiver, "extract", "archive1", "--dry-run", exit_code=0)

@@ -277,6 +277,7 @@ def test_unknown_mandatory_feature_in_cache(archivers, request):
     repository._location = Location(archiver.repository_location)
     manifest = Manifest.load(repository, Manifest.NO_OPERATION_CHECK)
     with Cache(repository, manifest) as cache:
+        is_localcache = isinstance(cache, LocalCache)
         cache.begin_txn()
         cache.cache_config.mandatory_features = {"unknown-feature"}
         cache.commit()
@@ -295,7 +296,8 @@ def test_unknown_mandatory_feature_in_cache(archivers, request):
         with patch.object(LocalCache, "wipe_cache", wipe_wrapper):
             cmd(archiver, "create", "test", "input")

-        assert called
+        if is_localcache:
+            assert called

     with Repository(archiver.repository_path, exclusive=True) as repository:
         if remote_repo:

@@ -32,7 +32,7 @@ def test_check_corrupted_repository(archiver):
 def corrupt_archiver(archiver):
     create_test_files(archiver.input_path)
     cmd(archiver, "rcreate", RK_ENCRYPTION)
-    archiver.cache_path = json.loads(cmd(archiver, "rinfo", "--json"))["cache"]["path"]
+    archiver.cache_path = json.loads(cmd(archiver, "rinfo", "--json"))["cache"].get("path")


 def corrupt(file, amount=1):
@@ -45,7 +45,14 @@ def corrupt(file, amount=1):

 def test_cache_chunks(archiver):
     corrupt_archiver(archiver)
-    corrupt(os.path.join(archiver.cache_path, "chunks"))
+    if archiver.cache_path is None:
+        pytest.skip("no cache path for this kind of Cache implementation")
+
+    chunks_index = os.path.join(archiver.cache_path, "chunks")
+    if not os.path.exists(chunks_index):
+        pytest.skip("Only works with LocalCache.")
+
+    corrupt(chunks_index)
     if archiver.FORK_DEFAULT:
         out = cmd(archiver, "rinfo", exit_code=2)
         assert "failed integrity check" in out
@@ -56,6 +63,9 @@ def test_cache_chunks(archiver):

 def test_cache_files(archiver):
     corrupt_archiver(archiver)
+    if archiver.cache_path is None:
+        pytest.skip("no cache path for this kind of Cache implementation")
+
     cmd(archiver, "create", "test", "input")
     corrupt(os.path.join(archiver.cache_path, "files"))
     out = cmd(archiver, "create", "test1", "input")
@@ -65,6 +75,9 @@ def test_cache_files(archiver):

 def test_chunks_archive(archiver):
     corrupt_archiver(archiver)
+    if archiver.cache_path is None:
+        pytest.skip("no cache path for this kind of Cache implementation")
+
     cmd(archiver, "create", "test1", "input")
     # Find ID of test1, so we can corrupt it later :)
     target_id = cmd(archiver, "rlist", "--format={id}{NL}").strip()
@@ -75,6 +88,8 @@ def test_chunks_archive(archiver):
     cmd(archiver, "rinfo", "--json")

     chunks_archive = os.path.join(archiver.cache_path, "chunks.archive.d")
+    if not os.path.exists(chunks_archive):
+        pytest.skip("Only LocalCache has a per-archive chunks index cache.")
     assert len(os.listdir(chunks_archive)) == 4  # two archives, one chunks cache and one .integrity file each

     corrupt(os.path.join(chunks_archive, target_id + ".compact"))
@@ -96,6 +111,9 @@ def test_chunks_archive(archiver):

 def test_old_version_interfered(archiver):
     corrupt_archiver(archiver)
+    if archiver.cache_path is None:
+        pytest.skip("no cache path for this kind of Cache implementation")
+
     # Modify the main manifest ID without touching the manifest ID in the integrity section.
     # This happens if a version without integrity checking modifies the cache.
     config_path = os.path.join(archiver.cache_path, "config")

@@ -168,7 +168,12 @@ def test_debug_refcount_obj(archivers, request):
     create_json = json.loads(cmd(archiver, "create", "--json", "test", "input"))
     archive_id = create_json["archive"]["id"]
     output = cmd(archiver, "debug", "refcount-obj", archive_id).strip()
-    assert output == f"object {archive_id} has 1 referrers [info from chunks cache]."
+    # LocalCache does precise refcounting, so we'll get 1 reference for the archive.
+    # AdHocCache or NewCache doesn't, we'll get ChunkIndex.MAX_VALUE as refcount.
+    assert (
+        output == f"object {archive_id} has 1 referrers [info from chunks cache]."
+        or output == f"object {archive_id} has 4294966271 referrers [info from chunks cache]."
+    )

     # Invalid IDs do not abort or return an error
     output = cmd(archiver, "debug", "refcount-obj", "124", "xyza").strip()

@@ -189,7 +189,7 @@ class TestAdHocCache:

     def test_does_not_delete_existing_chunks(self, repository, cache):
         assert cache.seen_chunk(H(1)) == ChunkIndex.MAX_VALUE
-        cache.chunk_decref(H(1), Statistics())
+        cache.chunk_decref(H(1), 1, Statistics())
         assert repository.get(H(1)) == b"1234"

     def test_seen_chunk_add_chunk_size(self, cache):
@@ -199,7 +199,7 @@ class TestAdHocCache:
         """E.g. checkpoint archives"""
         cache.add_chunk(H(5), {}, b"1010", stats=Statistics())
         assert cache.seen_chunk(H(5)) == 1
-        cache.chunk_decref(H(5), Statistics())
+        cache.chunk_decref(H(5), 1, Statistics())
         assert not cache.seen_chunk(H(5))
         with pytest.raises(Repository.ObjectNotFound):
             repository.get(H(5))
@@ -220,9 +220,9 @@ class TestAdHocCache:

     def test_incref_after_add_chunk(self, cache):
         assert cache.add_chunk(H(3), {}, b"5678", stats=Statistics()) == (H(3), 4)
-        assert cache.chunk_incref(H(3), Statistics()) == (H(3), 4)
+        assert cache.chunk_incref(H(3), 4, Statistics()) == (H(3), 4)

     def test_existing_incref_after_add_chunk(self, cache):
         """This case occurs with part files, see Archive.chunk_file."""
         assert cache.add_chunk(H(1), {}, b"5678", stats=Statistics()) == (H(1), 4)
-        assert cache.chunk_incref(H(1), Statistics()) == (H(1), 4)
+        assert cache.chunk_incref(H(1), 4, Statistics()) == (H(1), 4)

@@ -368,7 +368,7 @@ def test_text_invalid(text):
 def test_format_timedelta():
     t0 = datetime(2001, 1, 1, 10, 20, 3, 0)
     t1 = datetime(2001, 1, 1, 12, 20, 4, 100000)
-    assert format_timedelta(t1 - t0) == "2 hours 1.10 seconds"
+    assert format_timedelta(t1 - t0) == "2 hours 1.100 seconds"


 @pytest.mark.parametrize(

@@ -84,8 +84,8 @@ class UpgraderFrom12To20:
             chunks, chunks_healthy = self.hlm.retrieve(id=hlid, default=(None, None))
             if chunks is not None:
                 item.chunks = chunks
-                for chunk_id, _ in chunks:
-                    self.cache.chunk_incref(chunk_id, self.archive.stats)
+                for chunk_id, chunk_size in chunks:
+                    self.cache.chunk_incref(chunk_id, chunk_size, self.archive.stats)
             if chunks_healthy is not None:
                 item.chunks_healthy = chunks
         del item.source  # not used for hardlinks any more, replaced by hlid