Compare commits


17 Commits

Author SHA1 Message Date
TW 7028d277f8
Merge 1389bd10bc into 411c763fb8 2024-04-04 06:33:33 -05:00
Thomas Waldmann 1389bd10bc
fix test_debug_refcount_obj for misc. refcounts 2024-04-04 13:33:17 +02:00
Thomas Waldmann 8f2d41ab6c
fix test part that only works with LocalCache 2024-04-04 13:33:15 +02:00
Thomas Waldmann 30f6f1a4c4
skip tests requiring the chunks index (archive)
Only LocalCache implements these.
2024-04-04 13:33:14 +02:00
Thomas Waldmann b05e3067b8
check: do not consider orphan chunks a problem
if we use AdHocCache or NewCache, we do not have precise refcounting.
thus, we do not delete repo objects, because their refcount never drops to zero.

check --repair will just remove the orphans.
2024-04-04 13:33:12 +02:00
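A minimal, self-contained sketch of the relaxed check behaviour described above (report_orphans is a hypothetical name; the real change is in the ArchiveChecker hunk further down): orphans are logged as info and only removed by check --repair, instead of being flagged as errors.

import logging

logger = logging.getLogger(__name__)

def report_orphans(chunk_refcounts, possibly_superseded):
    """Return orphaned chunk ids; log them as info, not as an error."""
    unused = {id_ for id_, refcount in chunk_refcounts.items() if refcount == 0}
    orphaned = unused - possibly_superseded
    if orphaned:
        # with AdHocCache/NewCache, refcounts are imprecise, so orphans are expected
        logger.info("%d orphaned (unused) objects found.", len(orphaned))
    return orphaned  # "check --repair" would delete these together with the superseded objects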
Thomas Waldmann 6087d7487b
WIP (tests failing) implement NewCache
Also:
- move common code to ChunksMixin
- always use ._txn_active (not .txn_active)
2024-04-04 13:33:10 +02:00
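A rough sketch of the resulting class layout (condensed from the cache.py diff further down; method bodies elided): the shared chunks-index code moves into ChunksMixin, the files cache into FilesCacheMixin, and the three cache implementations differ mainly in what they persist.

class CacheStatsMixin: ...
class FilesCacheMixin: ...
class ChunksMixin: ...

class LocalCache(CacheStatsMixin, FilesCacheMixin, ChunksMixin):
    """persistent chunks index and files cache"""

class NewCache(CacheStatsMixin, FilesCacheMixin, ChunksMixin):
    """chunks index built ad hoc from the repository, plus a persistent files cache"""

class AdHocCache(CacheStatsMixin, ChunksMixin):
    """ad-hoc chunks index, no files cache, no persistence"""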
Thomas Waldmann 5ec557908c
AdHocCache has no cache persistence
thus:

- no cache.path
- skip on-disk cache corruption tests for AdHocCache
2024-04-04 13:33:09 +02:00
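Condensed from the rinfo and test changes further down: callers may no longer assume cache.path exists, and on-disk corruption tests skip when it is absent. A hedged sketch (cache_path_or_skip is a hypothetical helper, not part of borg's test suite):

import pytest

def cache_path_or_skip(cache):
    path = getattr(cache, "path", None)  # AdHocCache has no .path attribute
    if path is None:
        pytest.skip("no cache path for this kind of Cache implementation")
    return path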
Thomas Waldmann 186d536d55
tolerate missing chunks with delete --force
if a chunk is missing in the repo, it will also be missing in an ad-hoc
built chunks index.
2024-04-04 13:33:07 +02:00
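A condensed sketch of the inner chunk_decref wrapper changed in the Archive delete hunk further down (safe_chunk_decref is a hypothetical name): with --force, a chunk missing from the ad-hoc built chunks index is noted and skipped instead of aborting the delete.

def safe_chunk_decref(cache, id, size, stats, forced):
    """Decref a chunk; tolerate a missing chunk when deleting with --force."""
    try:
        cache.chunk_decref(id, size, stats, wait=False)
        return True
    except KeyError:
        if forced == 0:
            raise  # normal delete: a missing chunk is a hard error
        return False  # forced delete: note the problem and keep going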
Thomas Waldmann 62289ff513
refactor files cache code into FilesCacheMixin class 2024-04-04 13:33:05 +02:00
Thomas Waldmann 08003f4065
create --no-cache-sync-forced option
when given, force using the AdHocCache.
2024-04-04 13:33:04 +02:00
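The option wiring, condensed from the argparse and Cache factory hunks further down; this standalone snippet only demonstrates the flag's shape and how it is meant to be consumed.

import argparse

parser = argparse.ArgumentParser()
parser.add_argument(
    "--no-cache-sync-forced",
    dest="no_cache_sync_forced",
    action="store_true",
    help="experimental: do not synchronize the cache (forced). Implies not using the files cache.",
)
args = parser.parse_args(["--no-cache-sync-forced"])
# borg create passes this on as Cache(..., force_adhoc_cache=args.no_cache_sync_forced),
# and the factory then returns an AdHocCache unconditionally.
assert args.no_cache_sync_forced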
Thomas Waldmann 26c6a1035a
fix AdHocCache.add_chunk signature (ctype, clevel kwargs) 2024-04-04 13:33:02 +02:00
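The keyword-only signature shared by all cache implementations after this fix (shape copied from the ChunksMixin.add_chunk definition further down; AnyCache and the ROBJ_FILE_STREAM stub exist only so this sketch stands alone):

ROBJ_FILE_STREAM = "file_stream"  # stand-in for borg's constant

class AnyCache:
    def add_chunk(
        self, id, meta, data, *, stats, wait=True, compress=True,
        size=None, ctype=None, clevel=None, ro_type=ROBJ_FILE_STREAM,
    ):
        ...  # see ChunksMixin.add_chunk in the diff for the real body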
Thomas Waldmann eafb80ccc0
always give id and size to chunk_incref/chunk_decref
incref: returns (id, size), so it needs the size if it can't
get it from the chunks index; the size is also needed for updating stats.

decref: the caller does not always have the chunk size (e.g. for
metadata chunks); since we consider 0 an invalid size, we call with
size == 1 in that case. thus, stats might be slightly off.
2024-04-04 13:33:01 +02:00
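A self-contained toy version of the new calling convention (mirroring ChunksMixin further down, with borg's chunks index and Statistics replaced by a plain dict and an int): both calls take an explicit positive size, and metadata chunks of unknown size pass 1.

from collections import namedtuple

ChunkListEntry = namedtuple("ChunkListEntry", "id size")

class ToyCache:
    def __init__(self):
        self.refcounts = {}   # chunk id -> refcount
        self.stored = 0       # stand-in for the stats object

    def chunk_incref(self, id, size, stats=None):
        assert isinstance(size, int) and size > 0
        self.refcounts[id] = self.refcounts.get(id, 0) + 1
        self.stored += size
        return ChunkListEntry(id, size)

    def chunk_decref(self, id, size, stats=None, wait=True):
        assert isinstance(size, int) and size > 0
        self.refcounts[id] -= 1
        if self.refcounts[id] == 0:
            del self.refcounts[id]  # the real caches also delete the repo object here
        self.stored -= size

cache = ToyCache()
assert cache.chunk_incref(b"data-id", 4) == (b"data-id", 4)
cache.chunk_decref(b"data-id", 4)
cache.chunk_incref(b"meta-id", 1)  # size unknown (metadata chunk) -> pass 1; stats may be slightly off
cache.chunk_decref(b"meta-id", 1)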
Thomas Waldmann c5e130d03d
files cache: add chunk size information
the files cache used to store only the chunk ids,
so it had to rely on the chunks index for the size
information - which is problematic with e.g. the
AdHocCache (size==0 for all chunks that are not new) and blocked using the files cache there.
2024-04-04 13:32:55 +02:00
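A standalone illustration of the data-layout change (mirroring the FileCacheEntry change in the cache.py diff further down): files cache entries now carry (id, size) pairs instead of bare chunk ids, so sizes no longer have to come from the chunks index.

from collections import namedtuple

ChunkListEntry = namedtuple("ChunkListEntry", "id size")
# before: FileCacheEntry = namedtuple("FileCacheEntry", "age inode size cmtime chunk_ids")
FileCacheEntry = namedtuple("FileCacheEntry", "age inode size cmtime chunks")

entry = FileCacheEntry(
    age=0, inode=1234, size=8, cmtime=1712230000000000000,
    chunks=[(b"\x01" * 32, 4), (b"\x02" * 32, 4)],
)
# what file_known_and_unchanged now hands back for an unchanged file:
chunks = [ChunkListEntry(*chunk) for chunk in entry.chunks]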
TW 411c763fb8
Merge pull request #8182 from ThomasWaldmann/fix-test-ht-master
format_timedelta: use 3 decimal digits (ms)
2024-04-04 13:31:58 +02:00
Thomas Waldmann 54a85bf56d
format_timedelta: use 3 decimal digits (ms)
maybe this fixes the frequently failing test.
also, millisecond precision makes more sense than 10 ms granularity.
2024-04-04 12:45:28 +02:00
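A tiny standalone check of the new precision (values taken from the updated test further down): seconds are now formatted with three decimals.

from datetime import datetime

td = datetime(2001, 1, 1, 12, 20, 4, 100000) - datetime(2001, 1, 1, 10, 20, 3, 0)
seconds = td.total_seconds() % 60
print("%.3f seconds" % seconds)  # "1.100 seconds" (was "1.10 seconds" with %.2f)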
TW 4d2eb0cb1b
Merge pull request #8181 from ThomasWaldmann/github-actions-update-master
update github actions
2024-04-03 19:33:35 +02:00
Thomas Waldmann d893b899fc
update github actions
(avoid deprecation warnings)
2024-04-03 18:26:35 +02:00
17 changed files with 579 additions and 379 deletions

View File

@ -9,7 +9,7 @@ jobs:
lint:
runs-on: ubuntu-22.04
steps:
- uses: actions/checkout@v3
- uses: actions/checkout@v4
- uses: psf/black@stable
with:
version: "~= 23.0"

View File

@ -78,11 +78,11 @@ jobs:
# just fetching 1 commit is not enough for setuptools-scm, so we fetch all
fetch-depth: 0
- name: Set up Python ${{ matrix.python-version }}
uses: actions/setup-python@v4
uses: actions/setup-python@v5
with:
python-version: ${{ matrix.python-version }}
- name: Cache pip
uses: actions/cache@v3
uses: actions/cache@v4
with:
path: ~/.cache/pip
key: ${{ runner.os }}-pip-${{ hashFiles('requirements.d/development.txt') }}
@ -114,7 +114,7 @@ jobs:
#sudo -E bash -c "tox -e py"
tox --skip-missing-interpreters
- name: Upload coverage to Codecov
uses: codecov/codecov-action@v3
uses: codecov/codecov-action@v4
env:
OS: ${{ runner.os }}
python: ${{ matrix.python-version }}

View File

@ -643,14 +643,14 @@ Duration: {0.duration}
# so we can already remove it here, the next .save() will then commit this cleanup.
# remove its manifest entry, remove its ArchiveItem chunk, remove its item_ptrs chunks:
del self.manifest.archives[self.checkpoint_name]
self.cache.chunk_decref(self.id, self.stats)
self.cache.chunk_decref(self.id, 1, self.stats)
for id in metadata.item_ptrs:
self.cache.chunk_decref(id, self.stats)
self.cache.chunk_decref(id, 1, self.stats)
# also get rid of that part item, we do not want to have it in next checkpoint or final archive
tail_chunks = self.items_buffer.restore_chunks_state()
# tail_chunks contain the tail of the archive items metadata stream, not needed for next commit.
for id in tail_chunks:
self.cache.chunk_decref(id, self.stats)
self.cache.chunk_decref(id, 1, self.stats) # TODO can we have real size here?
def save(self, name=None, comment=None, timestamp=None, stats=None, additional_metadata=None):
name = name or self.name
@ -1024,7 +1024,7 @@ Duration: {0.duration}
new_id = self.key.id_hash(data)
self.cache.add_chunk(new_id, {}, data, stats=self.stats, ro_type=ROBJ_ARCHIVE_META)
self.manifest.archives[self.name] = (new_id, metadata.time)
self.cache.chunk_decref(self.id, self.stats)
self.cache.chunk_decref(self.id, 1, self.stats)
self.id = new_id
def rename(self, name):
@ -1052,12 +1052,15 @@ Duration: {0.duration}
error = True
return exception_ignored # must not return None here
def chunk_decref(id, stats):
def chunk_decref(id, size, stats):
try:
self.cache.chunk_decref(id, stats, wait=False)
self.cache.chunk_decref(id, size, stats, wait=False)
except KeyError:
cid = bin_to_hex(id)
raise ChunksIndexError(cid)
nonlocal error
if forced == 0:
cid = bin_to_hex(id)
raise ChunksIndexError(cid)
error = True
else:
fetch_async_response(wait=False)
@ -1073,13 +1076,13 @@ Duration: {0.duration}
pi.show(i)
_, data = self.repo_objs.parse(items_id, data, ro_type=ROBJ_ARCHIVE_STREAM)
unpacker.feed(data)
chunk_decref(items_id, stats)
chunk_decref(items_id, 1, stats)
try:
for item in unpacker:
item = Item(internal_dict=item)
if "chunks" in item:
for chunk_id, size in item.chunks:
chunk_decref(chunk_id, stats)
chunk_decref(chunk_id, size, stats)
except (TypeError, ValueError):
# if items metadata spans multiple chunks and one chunk got dropped somehow,
# it could be that unpacker yields bad types
@ -1096,12 +1099,12 @@ Duration: {0.duration}
# delete the blocks that store all the references that end up being loaded into metadata.items:
for id in self.metadata.item_ptrs:
chunk_decref(id, stats)
chunk_decref(id, 1, stats)
# in forced delete mode, we try hard to delete at least the manifest entry,
# if possible also the archive superblock, even if processing the items raises
# some harmless exception.
chunk_decref(self.id, stats)
chunk_decref(self.id, 1, stats)
del self.manifest.archives[self.name]
while fetch_async_response(wait=True) is not None:
# we did async deletes, process outstanding results (== exceptions),
@ -1510,7 +1513,7 @@ class FilesystemObjectProcessors:
except BackupOSError:
# see comments in process_file's exception handler, same issue here.
for chunk in item.get("chunks", []):
cache.chunk_decref(chunk.id, self.stats, wait=False)
cache.chunk_decref(chunk.id, chunk.size, self.stats, wait=False)
raise
else:
item.get_size(memorize=True)
@ -1544,7 +1547,7 @@ class FilesystemObjectProcessors:
item.chunks = []
for chunk_id, chunk_size in hl_chunks:
# process one-by-one, so we will know in item.chunks how far we got
chunk_entry = cache.chunk_incref(chunk_id, self.stats)
chunk_entry = cache.chunk_incref(chunk_id, chunk_size, self.stats)
item.chunks.append(chunk_entry)
else: # normal case, no "2nd+" hardlink
if not is_special_file:
@ -1552,26 +1555,26 @@ class FilesystemObjectProcessors:
started_hashing = time.monotonic()
path_hash = self.key.id_hash(hashed_path)
self.stats.hashing_time += time.monotonic() - started_hashing
known, ids = cache.file_known_and_unchanged(hashed_path, path_hash, st)
known, chunks = cache.file_known_and_unchanged(hashed_path, path_hash, st)
else:
# in --read-special mode, we may be called for special files.
# there should be no information in the cache about special files processed in
# read-special mode, but we better play safe as this was wrong in the past:
hashed_path = path_hash = None
known, ids = False, None
if ids is not None:
known, chunks = False, None
if chunks is not None:
# Make sure all ids are available
for id_ in ids:
if not cache.seen_chunk(id_):
for chunk in chunks:
if not cache.seen_chunk(chunk.id):
# cache said it is unmodified, but we lost a chunk: process file like modified
status = "M"
break
else:
item.chunks = []
for chunk_id in ids:
for chunk in chunks:
# process one-by-one, so we will know in item.chunks how far we got
chunk_entry = cache.chunk_incref(chunk_id, self.stats)
item.chunks.append(chunk_entry)
cache.chunk_incref(chunk.id, chunk.size, self.stats)
item.chunks.append(chunk)
status = "U" # regular file, unchanged
else:
status = "M" if known else "A" # regular file, modified or added
@ -1606,7 +1609,7 @@ class FilesystemObjectProcessors:
# block or char device will change without its mtime/size/inode changing.
# also, we must not memorize a potentially inconsistent/corrupt file that
# changed while we backed it up.
cache.memorize_file(hashed_path, path_hash, st, [c.id for c in item.chunks])
cache.memorize_file(hashed_path, path_hash, st, item.chunks)
self.stats.files_stats[status] += 1 # must be done late
if not changed_while_backup:
status = None # we already called print_file_status
@ -1620,7 +1623,7 @@ class FilesystemObjectProcessors:
# but we will not add an item (see add_item in create_helper) and thus
# they would be orphaned chunks in case that we commit the transaction.
for chunk in item.get("chunks", []):
cache.chunk_decref(chunk.id, self.stats, wait=False)
cache.chunk_decref(chunk.id, chunk.size, self.stats, wait=False)
# Now that we have cleaned up the chunk references, we can re-raise the exception.
# This will skip processing of this file, but might retry or continue with the next one.
raise
@ -1731,7 +1734,7 @@ class TarfileObjectProcessors:
except BackupOSError:
# see comment in FilesystemObjectProcessors.process_file, same issue here.
for chunk in item.get("chunks", []):
self.cache.chunk_decref(chunk.id, self.stats, wait=False)
self.cache.chunk_decref(chunk.id, chunk.size, self.stats, wait=False)
raise
@ -2328,10 +2331,10 @@ class ArchiveChecker:
unused = {id_ for id_, entry in self.chunks.iteritems() if entry.refcount == 0}
orphaned = unused - self.possibly_superseded
if orphaned:
logger.error(f"{len(orphaned)} orphaned objects found!")
logger.info(f"{len(orphaned)} orphaned (unused) objects found.")
for chunk_id in orphaned:
logger.debug(f"chunk {bin_to_hex(chunk_id)} is orphaned.")
self.error_found = True
# To support working with AdHocCache or NewCache, we do not set self.error_found = True.
if self.repair and unused:
logger.info(
"Deleting %d orphaned and %d superseded objects..." % (len(orphaned), len(self.possibly_superseded))
@ -2444,7 +2447,7 @@ class ArchiveRecreater:
def process_chunks(self, archive, target, item):
if not target.recreate_rechunkify:
for chunk_id, size in item.chunks:
self.cache.chunk_incref(chunk_id, target.stats)
self.cache.chunk_incref(chunk_id, size, target.stats)
return item.chunks
chunk_iterator = self.iter_chunks(archive, target, list(item.chunks))
chunk_processor = partial(self.chunk_processor, target)
@ -2452,8 +2455,9 @@ class ArchiveRecreater:
def chunk_processor(self, target, chunk):
chunk_id, data = cached_hash(chunk, self.key.id_hash)
size = len(data)
if chunk_id in self.seen_chunks:
return self.cache.chunk_incref(chunk_id, target.stats)
return self.cache.chunk_incref(chunk_id, size, target.stats)
chunk_entry = self.cache.add_chunk(chunk_id, {}, data, stats=target.stats, wait=False, ro_type=ROBJ_FILE_STREAM)
self.cache.repository.async_response(wait=False)
self.seen_chunks.add(chunk_entry.id)

View File

@ -225,6 +225,7 @@ class CreateMixIn:
progress=args.progress,
lock_wait=self.lock_wait,
permit_adhoc_cache=args.no_cache_sync,
force_adhoc_cache=args.no_cache_sync_forced,
cache_mode=args.files_cache_mode,
iec=args.iec,
) as cache:
@ -803,6 +804,12 @@ class CreateMixIn:
action="store_true",
help="experimental: do not synchronize the cache. Implies not using the files cache.",
)
subparser.add_argument(
"--no-cache-sync-forced",
dest="no_cache_sync_forced",
action="store_true",
help="experimental: do not synchronize the cache (forced). Implies not using the files cache.",
)
subparser.add_argument(
"--stdin-name",
metavar="NAME",

View File

@ -59,16 +59,9 @@ class RInfoMixIn:
output += f" out of {format_file_size(storage_quota, iec=args.iec)}"
output += "\n"
output += (
textwrap.dedent(
"""
Cache: {cache.path}
Security dir: {security_dir}
"""
)
.strip()
.format(**info)
)
if hasattr(info["cache"], "path"):
output += "Cache: {cache.path}\n".format(**info)
output += "Security dir: {security_dir}\n".format(**info)
print(output)
print(str(cache))

View File

@ -143,7 +143,7 @@ class TransferMixIn:
transfer_size += size
else:
if not dry_run:
chunk_entry = cache.chunk_incref(chunk_id, archive.stats)
chunk_entry = cache.chunk_incref(chunk_id, size, archive.stats)
chunks.append(chunk_entry)
present_size += size
if not dry_run:

View File

@ -35,8 +35,8 @@ from .platform import SaveFile
from .remote import cache_if_remote
from .repository import LIST_SCAN_LIMIT
# note: cmtime might me either a ctime or a mtime timestamp
FileCacheEntry = namedtuple("FileCacheEntry", "age inode size cmtime chunk_ids")
# note: cmtime might be either a ctime or a mtime timestamp, chunks is a list of ChunkListEntry
FileCacheEntry = namedtuple("FileCacheEntry", "age inode size cmtime chunks")
class SecurityManager:
@ -248,15 +248,6 @@ def cache_dir(repository, path=None):
return path or os.path.join(get_cache_dir(), repository.id_str)
def files_cache_name():
suffix = os.environ.get("BORG_FILES_CACHE_SUFFIX", "")
return "files." + suffix if suffix else "files"
def discover_files_cache_name(path):
return [fn for fn in os.listdir(path) if fn == "files" or fn.startswith("files.")][0]
class CacheConfig:
def __init__(self, repository, path=None, lock_wait=None):
self.repository = repository
@ -413,6 +404,7 @@ class Cache:
progress=False,
lock_wait=None,
permit_adhoc_cache=False,
force_adhoc_cache=False,
cache_mode=FILES_CACHE_MODE_DISABLED,
iec=False,
):
@ -428,9 +420,23 @@ class Cache:
cache_mode=cache_mode,
)
def newcache():
return NewCache(
manifest=manifest,
path=path,
warn_if_unencrypted=warn_if_unencrypted,
progress=progress,
iec=iec,
lock_wait=lock_wait,
cache_mode=cache_mode,
)
def adhoc():
return AdHocCache(manifest=manifest, lock_wait=lock_wait, iec=iec)
if force_adhoc_cache:
return adhoc()
if not permit_adhoc_cache:
return local()
@ -489,7 +495,302 @@ Total chunks: {0.total_chunks}
return self.Summary(**stats)
class LocalCache(CacheStatsMixin):
class FilesCacheMixin:
"""
Massively accelerate processing of unchanged files by caching their chunks list.
With that, we can avoid having to read and chunk them to get their chunks list.
"""
FILES_CACHE_NAME = "files"
def __init__(self, cache_mode):
self.cache_mode = cache_mode
self.files = None
self._newest_cmtime = None
def files_cache_name(self):
suffix = os.environ.get("BORG_FILES_CACHE_SUFFIX", "")
return self.FILES_CACHE_NAME + "." + suffix if suffix else self.FILES_CACHE_NAME
def discover_files_cache_name(self, path):
return [
fn for fn in os.listdir(path) if fn == self.FILES_CACHE_NAME or fn.startswith(self.FILES_CACHE_NAME + ".")
][0]
def _create_empty_files_cache(self, path):
with SaveFile(os.path.join(path, self.files_cache_name()), binary=True):
pass # empty file
def _read_files_cache(self):
if "d" in self.cache_mode: # d(isabled)
return
self.files = {}
logger.debug("Reading files cache ...")
files_cache_logger.debug("FILES-CACHE-LOAD: starting...")
msg = None
try:
with IntegrityCheckedFile(
path=os.path.join(self.path, self.files_cache_name()),
write=False,
integrity_data=self.cache_config.integrity.get(self.files_cache_name()),
) as fd:
u = msgpack.Unpacker(use_list=True)
while True:
data = fd.read(64 * 1024)
if not data:
break
u.feed(data)
try:
for path_hash, item in u:
entry = FileCacheEntry(*item)
# in the end, this takes about 240 Bytes per file
self.files[path_hash] = msgpack.packb(entry._replace(age=entry.age + 1))
except (TypeError, ValueError) as exc:
msg = "The files cache seems invalid. [%s]" % str(exc)
break
except OSError as exc:
msg = "The files cache can't be read. [%s]" % str(exc)
except FileIntegrityError as fie:
msg = "The files cache is corrupted. [%s]" % str(fie)
if msg is not None:
logger.warning(msg)
logger.warning("Continuing without files cache - expect lower performance.")
self.files = {}
files_cache_logger.debug("FILES-CACHE-LOAD: finished, %d entries loaded.", len(self.files))
def _write_files_cache(self):
if self._newest_cmtime is None:
# was never set because no files were modified/added
self._newest_cmtime = 2**63 - 1 # nanoseconds, good until y2262
ttl = int(os.environ.get("BORG_FILES_CACHE_TTL", 20))
files_cache_logger.debug("FILES-CACHE-SAVE: starting...")
with IntegrityCheckedFile(path=os.path.join(self.path, self.files_cache_name()), write=True) as fd:
entry_count = 0
for path_hash, item in self.files.items():
# Only keep files seen in this backup that are older than newest cmtime seen in this backup -
# this is to avoid issues with filesystem snapshots and cmtime granularity.
# Also keep files from older backups that have not reached BORG_FILES_CACHE_TTL yet.
entry = FileCacheEntry(*msgpack.unpackb(item))
if (
entry.age == 0
and timestamp_to_int(entry.cmtime) < self._newest_cmtime
or entry.age > 0
and entry.age < ttl
):
msgpack.pack((path_hash, entry), fd)
entry_count += 1
files_cache_logger.debug("FILES-CACHE-KILL: removed all old entries with age >= TTL [%d]", ttl)
files_cache_logger.debug(
"FILES-CACHE-KILL: removed all current entries with newest cmtime %d", self._newest_cmtime
)
files_cache_logger.debug("FILES-CACHE-SAVE: finished, %d remaining entries saved.", entry_count)
return fd.integrity_data
def file_known_and_unchanged(self, hashed_path, path_hash, st):
"""
Check if we know the file that has this path_hash (know == it is in our files cache) and
whether it is unchanged (the size/inode number/cmtime is same for stuff we check in this cache_mode).
:param hashed_path: the file's path as we gave it to hash(hashed_path)
:param path_hash: hash(hashed_path), to save some memory in the files cache
:param st: the file's stat() result
:return: known, chunks (known is True if we have infos about this file in the cache,
chunks is a list[ChunkListEntry] IF the file has not changed, otherwise None).
"""
if not stat.S_ISREG(st.st_mode):
return False, None
cache_mode = self.cache_mode
if "d" in cache_mode: # d(isabled)
files_cache_logger.debug("UNKNOWN: files cache disabled")
return False, None
# note: r(echunk) does not need the files cache in this method, but the files cache will
# be updated and saved to disk to memorize the files. To preserve previous generations in
# the cache, this means that it also needs to get loaded from disk first.
if "r" in cache_mode: # r(echunk)
files_cache_logger.debug("UNKNOWN: rechunking enforced")
return False, None
entry = self.files.get(path_hash)
if not entry:
files_cache_logger.debug("UNKNOWN: no file metadata in cache for: %r", hashed_path)
return False, None
# we know the file!
entry = FileCacheEntry(*msgpack.unpackb(entry))
if "s" in cache_mode and entry.size != st.st_size:
files_cache_logger.debug("KNOWN-CHANGED: file size has changed: %r", hashed_path)
return True, None
if "i" in cache_mode and entry.inode != st.st_ino:
files_cache_logger.debug("KNOWN-CHANGED: file inode number has changed: %r", hashed_path)
return True, None
if "c" in cache_mode and timestamp_to_int(entry.cmtime) != st.st_ctime_ns:
files_cache_logger.debug("KNOWN-CHANGED: file ctime has changed: %r", hashed_path)
return True, None
elif "m" in cache_mode and timestamp_to_int(entry.cmtime) != st.st_mtime_ns:
files_cache_logger.debug("KNOWN-CHANGED: file mtime has changed: %r", hashed_path)
return True, None
# we ignored the inode number in the comparison above or it is still same.
# if it is still the same, replacing it in the tuple doesn't change it.
# if we ignored it, a reason for doing that is that files were moved to a new
# disk / new fs (so a one-time change of inode number is expected) and we wanted
# to avoid everything getting chunked again. to be able to re-enable the inode
# number comparison in a future backup run (and avoid chunking everything
# again at that time), we need to update the inode number in the cache with what
# we see in the filesystem.
self.files[path_hash] = msgpack.packb(entry._replace(inode=st.st_ino, age=0))
chunks = [ChunkListEntry(*chunk) for chunk in entry.chunks] # convert to list of namedtuple
return True, chunks
def memorize_file(self, hashed_path, path_hash, st, chunks):
if not stat.S_ISREG(st.st_mode):
return
cache_mode = self.cache_mode
# note: r(echunk) modes will update the files cache, d(isabled) mode won't
if "d" in cache_mode:
files_cache_logger.debug("FILES-CACHE-NOUPDATE: files cache disabled")
return
if "c" in cache_mode:
cmtime_type = "ctime"
cmtime_ns = safe_ns(st.st_ctime_ns)
elif "m" in cache_mode:
cmtime_type = "mtime"
cmtime_ns = safe_ns(st.st_mtime_ns)
else: # neither 'c' nor 'm' in cache_mode, avoid UnboundLocalError
cmtime_type = "ctime"
cmtime_ns = safe_ns(st.st_ctime_ns)
entry = FileCacheEntry(
age=0, inode=st.st_ino, size=st.st_size, cmtime=int_to_timestamp(cmtime_ns), chunks=chunks
)
self.files[path_hash] = msgpack.packb(entry)
self._newest_cmtime = max(self._newest_cmtime or 0, cmtime_ns)
files_cache_logger.debug(
"FILES-CACHE-UPDATE: put %r [has %s] <- %r",
entry._replace(chunks="[%d entries]" % len(entry.chunks)),
cmtime_type,
hashed_path,
)
class ChunksMixin:
"""
Chunks index related code for misc. Cache implementations.
"""
def chunk_incref(self, id, size, stats):
assert isinstance(size, int) and size > 0
if not self._txn_active:
self.begin_txn()
count, _size = self.chunks.incref(id)
stats.update(size, False)
return ChunkListEntry(id, size)
def chunk_decref(self, id, size, stats, wait=True):
assert isinstance(size, int) and size > 0
if not self._txn_active:
self.begin_txn()
count, _size = self.chunks.decref(id)
if count == 0:
del self.chunks[id]
self.repository.delete(id, wait=wait)
stats.update(-size, True)
else:
stats.update(-size, False)
def seen_chunk(self, id, size=None):
if not self._txn_active:
self.begin_txn()
entry = self.chunks.get(id, ChunkIndexEntry(0, None))
if entry.refcount and size is not None:
assert isinstance(entry.size, int)
if entry.size:
# LocalCache: has existing size information and uses *size* to make an effort at detecting collisions.
if size != entry.size:
# we already have a chunk with that id, but different size.
# this is either a hash collision (unlikely) or corruption or a bug.
raise Exception(
"chunk has same id [%r], but different size (stored: %d new: %d)!" % (id, entry.size, size)
)
else:
# NewCache / AdHocCache:
# Here *size* is used to update the chunk's size information, which will be zero for existing chunks.
self.chunks[id] = entry._replace(size=size)
return entry.refcount
def add_chunk(
self,
id,
meta,
data,
*,
stats,
wait=True,
compress=True,
size=None,
ctype=None,
clevel=None,
ro_type=ROBJ_FILE_STREAM,
):
assert ro_type is not None
if not self._txn_active:
self.begin_txn()
if size is None:
if compress:
size = len(data) # data is still uncompressed
else:
raise ValueError("when giving compressed data for a chunk, the uncompressed size must be given also")
refcount = self.seen_chunk(id, size)
if refcount:
return self.chunk_incref(id, size, stats)
cdata = self.repo_objs.format(
id, meta, data, compress=compress, size=size, ctype=ctype, clevel=clevel, ro_type=ro_type
)
self.repository.put(id, cdata, wait=wait)
self.chunks.add(id, 1, size)
stats.update(size, not refcount)
return ChunkListEntry(id, size)
def _load_chunks_from_repo(self):
# Explicitly set the initial usable hash table capacity to avoid performance issues
# due to hash table "resonance".
# Since we're creating an archive, add 10 % from the start.
num_chunks = len(self.repository)
chunks = ChunkIndex(usable=num_chunks * 1.1)
pi = ProgressIndicatorPercent(
total=num_chunks, msg="Downloading chunk list... %3.0f%%", msgid="cache.download_chunks"
)
t0 = perf_counter()
num_requests = 0
marker = None
while True:
result = self.repository.list(limit=LIST_SCAN_LIMIT, marker=marker)
num_requests += 1
if not result:
break
pi.show(increase=len(result))
marker = result[-1]
# All chunks from the repository have a refcount of MAX_VALUE, which is sticky,
# therefore we can't/won't delete them. Chunks we added ourselves in this transaction
# (e.g. checkpoint archives) are tracked correctly.
init_entry = ChunkIndexEntry(refcount=ChunkIndex.MAX_VALUE, size=0)
for id_ in result:
chunks[id_] = init_entry
assert len(chunks) == num_chunks
# LocalCache does not contain the manifest, either.
del chunks[self.manifest.MANIFEST_ID]
duration = perf_counter() - t0 or 0.01
pi.finish()
logger.debug(
"Cache: downloaded %d chunk IDs in %.2f s (%d requests), ~%s/s",
num_chunks,
duration,
num_requests,
format_file_size(num_chunks * 34 / duration),
)
# Chunk IDs in a list are encoded in 34 bytes: 1 byte msgpack header, 1 byte length, 32 ID bytes.
# Protocol overhead is neglected in this calculation.
return chunks
class LocalCache(CacheStatsMixin, FilesCacheMixin, ChunksMixin):
"""
Persistent, local (client-side) cache.
"""
@ -512,15 +813,15 @@ class LocalCache(CacheStatsMixin):
:param cache_mode: what shall be compared in the file stat infos vs. cached stat infos comparison
"""
CacheStatsMixin.__init__(self, iec=iec)
FilesCacheMixin.__init__(self, cache_mode)
assert isinstance(manifest, Manifest)
self.manifest = manifest
self.repository = manifest.repository
self.key = manifest.key
self.repo_objs = manifest.repo_objs
self.progress = progress
self.cache_mode = cache_mode
self.timestamp = None
self.txn_active = False
self._txn_active = False
self.path = cache_dir(self.repository, path)
self.security_manager = SecurityManager(self.repository)
@ -561,8 +862,7 @@ class LocalCache(CacheStatsMixin):
self.cache_config.create()
ChunkIndex().write(os.path.join(self.path, "chunks"))
os.makedirs(os.path.join(self.path, "chunks.archive.d"))
with SaveFile(os.path.join(self.path, files_cache_name()), binary=True):
pass # empty file
self._create_empty_files_cache(self.path)
def _do_open(self):
self.cache_config.load()
@ -572,10 +872,7 @@ class LocalCache(CacheStatsMixin):
integrity_data=self.cache_config.integrity.get("chunks"),
) as fd:
self.chunks = ChunkIndex.read(fd)
if "d" in self.cache_mode: # d(isabled)
self.files = None
else:
self._read_files()
self._read_files_cache()
def open(self):
if not os.path.isdir(self.path):
@ -588,42 +885,6 @@ class LocalCache(CacheStatsMixin):
self.cache_config.close()
self.cache_config = None
def _read_files(self):
self.files = {}
self._newest_cmtime = None
logger.debug("Reading files cache ...")
files_cache_logger.debug("FILES-CACHE-LOAD: starting...")
msg = None
try:
with IntegrityCheckedFile(
path=os.path.join(self.path, files_cache_name()),
write=False,
integrity_data=self.cache_config.integrity.get(files_cache_name()),
) as fd:
u = msgpack.Unpacker(use_list=True)
while True:
data = fd.read(64 * 1024)
if not data:
break
u.feed(data)
try:
for path_hash, item in u:
entry = FileCacheEntry(*item)
# in the end, this takes about 240 Bytes per file
self.files[path_hash] = msgpack.packb(entry._replace(age=entry.age + 1))
except (TypeError, ValueError) as exc:
msg = "The files cache seems invalid. [%s]" % str(exc)
break
except OSError as exc:
msg = "The files cache can't be read. [%s]" % str(exc)
except FileIntegrityError as fie:
msg = "The files cache is corrupted. [%s]" % str(fie)
if msg is not None:
logger.warning(msg)
logger.warning("Continuing without files cache - expect lower performance.")
self.files = {}
files_cache_logger.debug("FILES-CACHE-LOAD: finished, %d entries loaded.", len(self.files))
def begin_txn(self):
# Initialize transaction snapshot
pi = ProgressIndicatorMessage(msgid="cache.begin_transaction")
@ -635,48 +896,23 @@ class LocalCache(CacheStatsMixin):
shutil.copy(os.path.join(self.path, "chunks"), txn_dir)
pi.output("Initializing cache transaction: Reading files")
try:
shutil.copy(os.path.join(self.path, files_cache_name()), txn_dir)
shutil.copy(os.path.join(self.path, self.files_cache_name()), txn_dir)
except FileNotFoundError:
with SaveFile(os.path.join(txn_dir, files_cache_name()), binary=True):
pass # empty file
self._create_empty_files_cache(txn_dir)
os.replace(txn_dir, os.path.join(self.path, "txn.active"))
self.txn_active = True
self._txn_active = True
pi.finish()
def commit(self):
"""Commit transaction"""
if not self.txn_active:
if not self._txn_active:
return
self.security_manager.save(self.manifest, self.key)
pi = ProgressIndicatorMessage(msgid="cache.commit")
if self.files is not None:
if self._newest_cmtime is None:
# was never set because no files were modified/added
self._newest_cmtime = 2**63 - 1 # nanoseconds, good until y2262
ttl = int(os.environ.get("BORG_FILES_CACHE_TTL", 20))
pi.output("Saving files cache")
files_cache_logger.debug("FILES-CACHE-SAVE: starting...")
with IntegrityCheckedFile(path=os.path.join(self.path, files_cache_name()), write=True) as fd:
entry_count = 0
for path_hash, item in self.files.items():
# Only keep files seen in this backup that are older than newest cmtime seen in this backup -
# this is to avoid issues with filesystem snapshots and cmtime granularity.
# Also keep files from older backups that have not reached BORG_FILES_CACHE_TTL yet.
entry = FileCacheEntry(*msgpack.unpackb(item))
if (
entry.age == 0
and timestamp_to_int(entry.cmtime) < self._newest_cmtime
or entry.age > 0
and entry.age < ttl
):
msgpack.pack((path_hash, entry), fd)
entry_count += 1
files_cache_logger.debug("FILES-CACHE-KILL: removed all old entries with age >= TTL [%d]", ttl)
files_cache_logger.debug(
"FILES-CACHE-KILL: removed all current entries with newest cmtime %d", self._newest_cmtime
)
files_cache_logger.debug("FILES-CACHE-SAVE: finished, %d remaining entries saved.", entry_count)
self.cache_config.integrity[files_cache_name()] = fd.integrity_data
integrity_data = self._write_files_cache()
self.cache_config.integrity[self.files_cache_name()] = integrity_data
pi.output("Saving chunks cache")
with IntegrityCheckedFile(path=os.path.join(self.path, "chunks"), write=True) as fd:
self.chunks.write(fd)
@ -685,7 +921,7 @@ class LocalCache(CacheStatsMixin):
self.cache_config.save(self.manifest, self.key)
os.replace(os.path.join(self.path, "txn.active"), os.path.join(self.path, "txn.tmp"))
shutil.rmtree(os.path.join(self.path, "txn.tmp"))
self.txn_active = False
self._txn_active = False
pi.finish()
def rollback(self):
@ -698,12 +934,12 @@ class LocalCache(CacheStatsMixin):
if os.path.exists(txn_dir):
shutil.copy(os.path.join(txn_dir, "config"), self.path)
shutil.copy(os.path.join(txn_dir, "chunks"), self.path)
shutil.copy(os.path.join(txn_dir, discover_files_cache_name(txn_dir)), self.path)
shutil.copy(os.path.join(txn_dir, self.discover_files_cache_name(txn_dir)), self.path)
txn_tmp = os.path.join(self.path, "txn.tmp")
os.replace(txn_dir, txn_tmp)
if os.path.exists(txn_tmp):
shutil.rmtree(txn_tmp)
self.txn_active = False
self._txn_active = False
self._do_open()
def sync(self):
@ -930,8 +1166,7 @@ class LocalCache(CacheStatsMixin):
shutil.rmtree(os.path.join(self.path, "chunks.archive.d"))
os.makedirs(os.path.join(self.path, "chunks.archive.d"))
self.chunks = ChunkIndex()
with SaveFile(os.path.join(self.path, files_cache_name()), binary=True):
pass # empty file
self._create_empty_files_cache(self.path)
self.cache_config.manifest_id = ""
self.cache_config._config.set("cache", "manifest", "")
@ -948,149 +1183,171 @@ class LocalCache(CacheStatsMixin):
self.cache_config.ignored_features.update(repo_features - my_features)
self.cache_config.mandatory_features.update(repo_features & my_features)
def add_chunk(
class NewCache(CacheStatsMixin, FilesCacheMixin, ChunksMixin):
"""
Like AdHocCache, but with a files cache.
"""
def __init__(
self,
id,
meta,
data,
*,
stats,
wait=True,
compress=True,
size=None,
ctype=None,
clevel=None,
ro_type=ROBJ_FILE_STREAM,
manifest,
path=None,
warn_if_unencrypted=True,
progress=False,
lock_wait=None,
cache_mode=FILES_CACHE_MODE_DISABLED,
iec=False,
):
assert ro_type is not None
if not self.txn_active:
self.begin_txn()
if size is None and compress:
size = len(data) # data is still uncompressed
refcount = self.seen_chunk(id, size)
if refcount:
return self.chunk_incref(id, stats)
if size is None:
raise ValueError("when giving compressed data for a new chunk, the uncompressed size must be given also")
cdata = self.repo_objs.format(
id, meta, data, compress=compress, size=size, ctype=ctype, clevel=clevel, ro_type=ro_type
)
self.repository.put(id, cdata, wait=wait)
self.chunks.add(id, 1, size)
stats.update(size, not refcount)
return ChunkListEntry(id, size)
def seen_chunk(self, id, size=None):
refcount, stored_size = self.chunks.get(id, ChunkIndexEntry(0, None))
if size is not None and stored_size is not None and size != stored_size:
# we already have a chunk with that id, but different size.
# this is either a hash collision (unlikely) or corruption or a bug.
raise Exception(
"chunk has same id [%r], but different size (stored: %d new: %d)!" % (id, stored_size, size)
)
return refcount
def chunk_incref(self, id, stats, size=None):
if not self.txn_active:
self.begin_txn()
count, _size = self.chunks.incref(id)
stats.update(_size, False)
return ChunkListEntry(id, _size)
def chunk_decref(self, id, stats, wait=True):
if not self.txn_active:
self.begin_txn()
count, size = self.chunks.decref(id)
if count == 0:
del self.chunks[id]
self.repository.delete(id, wait=wait)
stats.update(-size, True)
else:
stats.update(-size, False)
def file_known_and_unchanged(self, hashed_path, path_hash, st):
"""
Check if we know the file that has this path_hash (know == it is in our files cache) and
whether it is unchanged (the size/inode number/cmtime is same for stuff we check in this cache_mode).
:param hashed_path: the file's path as we gave it to hash(hashed_path)
:param path_hash: hash(hashed_path), to save some memory in the files cache
:param st: the file's stat() result
:return: known, ids (known is True if we have infos about this file in the cache,
ids is the list of chunk ids IF the file has not changed, otherwise None).
:param warn_if_unencrypted: print warning if accessing unknown unencrypted repository
:param lock_wait: timeout for lock acquisition (int [s] or None [wait forever])
:param cache_mode: what shall be compared in the file stat infos vs. cached stat infos comparison
"""
if not stat.S_ISREG(st.st_mode):
return False, None
cache_mode = self.cache_mode
if "d" in cache_mode: # d(isabled)
files_cache_logger.debug("UNKNOWN: files cache disabled")
return False, None
# note: r(echunk) does not need the files cache in this method, but the files cache will
# be updated and saved to disk to memorize the files. To preserve previous generations in
# the cache, this means that it also needs to get loaded from disk first.
if "r" in cache_mode: # r(echunk)
files_cache_logger.debug("UNKNOWN: rechunking enforced")
return False, None
entry = self.files.get(path_hash)
if not entry:
files_cache_logger.debug("UNKNOWN: no file metadata in cache for: %r", hashed_path)
return False, None
# we know the file!
entry = FileCacheEntry(*msgpack.unpackb(entry))
if "s" in cache_mode and entry.size != st.st_size:
files_cache_logger.debug("KNOWN-CHANGED: file size has changed: %r", hashed_path)
return True, None
if "i" in cache_mode and entry.inode != st.st_ino:
files_cache_logger.debug("KNOWN-CHANGED: file inode number has changed: %r", hashed_path)
return True, None
if "c" in cache_mode and timestamp_to_int(entry.cmtime) != st.st_ctime_ns:
files_cache_logger.debug("KNOWN-CHANGED: file ctime has changed: %r", hashed_path)
return True, None
elif "m" in cache_mode and timestamp_to_int(entry.cmtime) != st.st_mtime_ns:
files_cache_logger.debug("KNOWN-CHANGED: file mtime has changed: %r", hashed_path)
return True, None
# we ignored the inode number in the comparison above or it is still same.
# if it is still the same, replacing it in the tuple doesn't change it.
# if we ignored it, a reason for doing that is that files were moved to a new
# disk / new fs (so a one-time change of inode number is expected) and we wanted
# to avoid everything getting chunked again. to be able to re-enable the inode
# number comparison in a future backup run (and avoid chunking everything
# again at that time), we need to update the inode number in the cache with what
# we see in the filesystem.
self.files[path_hash] = msgpack.packb(entry._replace(inode=st.st_ino, age=0))
return True, entry.chunk_ids
CacheStatsMixin.__init__(self, iec=iec)
FilesCacheMixin.__init__(self, cache_mode)
assert isinstance(manifest, Manifest)
self.manifest = manifest
self.repository = manifest.repository
self.key = manifest.key
self.repo_objs = manifest.repo_objs
self.progress = progress
self.timestamp = None
self._txn_active = False
def memorize_file(self, hashed_path, path_hash, st, ids):
if not stat.S_ISREG(st.st_mode):
self.path = cache_dir(self.repository, path)
self.security_manager = SecurityManager(self.repository)
self.cache_config = CacheConfig(self.repository, self.path, lock_wait)
# Warn user before sending data to a never seen before unencrypted repository
if not os.path.exists(self.path):
self.security_manager.assert_access_unknown(warn_if_unencrypted, manifest, self.key)
self.create()
self.open()
try:
self.security_manager.assert_secure(manifest, self.key, cache_config=self.cache_config)
if not self.check_cache_compatibility():
self.wipe_cache()
self.update_compatibility()
except: # noqa
self.close()
raise
def __enter__(self):
return self
def __exit__(self, exc_type, exc_val, exc_tb):
self.close()
def create(self):
"""Create a new empty cache at `self.path`"""
os.makedirs(self.path)
with open(os.path.join(self.path, "README"), "w") as fd:
fd.write(CACHE_README)
self.cache_config.create()
self._create_empty_files_cache(self.path)
def _do_open(self):
self.cache_config.load()
self.chunks = self._load_chunks_from_repo()
self._read_files_cache()
def open(self):
if not os.path.isdir(self.path):
raise Exception("%s Does not look like a Borg cache" % self.path)
self.cache_config.open()
self.rollback()
def close(self):
if self.cache_config is not None:
self.cache_config.close()
self.cache_config = None
def begin_txn(self):
# Initialize transaction snapshot
pi = ProgressIndicatorMessage(msgid="cache.begin_transaction")
txn_dir = os.path.join(self.path, "txn.tmp")
os.mkdir(txn_dir)
pi.output("Initializing cache transaction: Reading config")
shutil.copy(os.path.join(self.path, "config"), txn_dir)
pi.output("Initializing cache transaction: Reading files")
try:
shutil.copy(os.path.join(self.path, self.files_cache_name()), txn_dir)
except FileNotFoundError:
self._create_empty_files_cache(txn_dir)
os.replace(txn_dir, os.path.join(self.path, "txn.active"))
pi.finish()
self._txn_active = True
def commit(self):
if not self._txn_active:
return
cache_mode = self.cache_mode
# note: r(echunk) modes will update the files cache, d(isabled) mode won't
if "d" in cache_mode:
files_cache_logger.debug("FILES-CACHE-NOUPDATE: files cache disabled")
return
if "c" in cache_mode:
cmtime_type = "ctime"
cmtime_ns = safe_ns(st.st_ctime_ns)
elif "m" in cache_mode:
cmtime_type = "mtime"
cmtime_ns = safe_ns(st.st_mtime_ns)
else: # neither 'c' nor 'm' in cache_mode, avoid UnboundLocalError
cmtime_type = "ctime"
cmtime_ns = safe_ns(st.st_ctime_ns)
entry = FileCacheEntry(
age=0, inode=st.st_ino, size=st.st_size, cmtime=int_to_timestamp(cmtime_ns), chunk_ids=ids
)
self.files[path_hash] = msgpack.packb(entry)
self._newest_cmtime = max(self._newest_cmtime or 0, cmtime_ns)
files_cache_logger.debug(
"FILES-CACHE-UPDATE: put %r [has %s] <- %r",
entry._replace(chunk_ids="[%d entries]" % len(entry.chunk_ids)),
cmtime_type,
hashed_path,
)
self.security_manager.save(self.manifest, self.key)
pi = ProgressIndicatorMessage(msgid="cache.commit")
if self.files is not None:
pi.output("Saving files cache")
integrity_data = self._write_files_cache()
self.cache_config.integrity[self.files_cache_name()] = integrity_data
pi.output("Saving cache config")
self.cache_config.save(self.manifest, self.key)
os.replace(os.path.join(self.path, "txn.active"), os.path.join(self.path, "txn.tmp"))
shutil.rmtree(os.path.join(self.path, "txn.tmp"))
self._txn_active = False
pi.finish()
def rollback(self):
# Remove partial transaction
if os.path.exists(os.path.join(self.path, "txn.tmp")):
shutil.rmtree(os.path.join(self.path, "txn.tmp"))
# Roll back active transaction
txn_dir = os.path.join(self.path, "txn.active")
if os.path.exists(txn_dir):
shutil.copy(os.path.join(txn_dir, "config"), self.path)
shutil.copy(os.path.join(txn_dir, self.discover_files_cache_name(txn_dir)), self.path)
txn_tmp = os.path.join(self.path, "txn.tmp")
os.replace(txn_dir, txn_tmp)
if os.path.exists(txn_tmp):
shutil.rmtree(txn_tmp)
self._txn_active = False
self._do_open()
def check_cache_compatibility(self):
my_features = Manifest.SUPPORTED_REPO_FEATURES
if self.cache_config.ignored_features & my_features:
# The cache might not contain references of chunks that need a feature that is mandatory for some operation
# and which this version supports. To avoid corruption while executing that operation force rebuild.
return False
if not self.cache_config.mandatory_features <= my_features:
# The cache was build with consideration to at least one feature that this version does not understand.
# This client might misinterpret the cache. Thus force a rebuild.
return False
return True
def wipe_cache(self):
logger.warning("Discarding incompatible cache and forcing a cache rebuild")
self.chunks = ChunkIndex()
self._create_empty_files_cache(self.path)
self.cache_config.manifest_id = ""
self.cache_config._config.set("cache", "manifest", "")
self.cache_config.ignored_features = set()
self.cache_config.mandatory_features = set()
def update_compatibility(self):
operation_to_features_map = self.manifest.get_all_mandatory_features()
my_features = Manifest.SUPPORTED_REPO_FEATURES
repo_features = set()
for operation, features in operation_to_features_map.items():
repo_features.update(features)
self.cache_config.ignored_features.update(repo_features - my_features)
self.cache_config.mandatory_features.update(repo_features & my_features)
class AdHocCache(CacheStatsMixin):
class AdHocCache(CacheStatsMixin, ChunksMixin):
"""
Ad-hoc, non-persistent cache.
@ -1135,59 +1392,9 @@ Chunk index: {0.total_unique_chunks:20d} unknown"""
files_cache_logger.debug("UNKNOWN: files cache not implemented")
return False, None
def memorize_file(self, hashed_path, path_hash, st, ids):
def memorize_file(self, hashed_path, path_hash, st, chunks):
pass
def add_chunk(self, id, meta, data, *, stats, wait=True, compress=True, size=None, ro_type=ROBJ_FILE_STREAM):
assert ro_type is not None
if not self._txn_active:
self.begin_txn()
if size is None and compress:
size = len(data) # data is still uncompressed
if size is None:
raise ValueError("when giving compressed data for a chunk, the uncompressed size must be given also")
refcount = self.seen_chunk(id, size)
if refcount:
return self.chunk_incref(id, stats, size=size)
cdata = self.repo_objs.format(id, meta, data, compress=compress, ro_type=ro_type)
self.repository.put(id, cdata, wait=wait)
self.chunks.add(id, 1, size)
stats.update(size, not refcount)
return ChunkListEntry(id, size)
def seen_chunk(self, id, size=None):
if not self._txn_active:
self.begin_txn()
entry = self.chunks.get(id, ChunkIndexEntry(0, None))
if entry.refcount and size and not entry.size:
# The LocalCache has existing size information and uses *size* to make an effort at detecting collisions.
# This is of course not possible for the AdHocCache.
# Here *size* is used to update the chunk's size information, which will be zero for existing chunks.
self.chunks[id] = entry._replace(size=size)
return entry.refcount
def chunk_incref(self, id, stats, size=None):
if not self._txn_active:
self.begin_txn()
count, _size = self.chunks.incref(id)
# When _size is 0 and size is not given, then this chunk has not been locally visited yet (seen_chunk with
# size or add_chunk); we can't add references to those (size=0 is invalid) and generally don't try to.
size = _size or size
assert size
stats.update(size, False)
return ChunkListEntry(id, size)
def chunk_decref(self, id, stats, wait=True):
if not self._txn_active:
self.begin_txn()
count, size = self.chunks.decref(id)
if count == 0:
del self.chunks[id]
self.repository.delete(id, wait=wait)
stats.update(-size, True)
else:
stats.update(-size, False)
def commit(self):
if not self._txn_active:
return
@ -1200,41 +1407,4 @@ Chunk index: {0.total_unique_chunks:20d} unknown"""
def begin_txn(self):
self._txn_active = True
# Explicitly set the initial usable hash table capacity to avoid performance issues
# due to hash table "resonance".
# Since we're creating an archive, add 10 % from the start.
num_chunks = len(self.repository)
self.chunks = ChunkIndex(usable=num_chunks * 1.1)
pi = ProgressIndicatorPercent(
total=num_chunks, msg="Downloading chunk list... %3.0f%%", msgid="cache.download_chunks"
)
t0 = perf_counter()
num_requests = 0
marker = None
while True:
result = self.repository.list(limit=LIST_SCAN_LIMIT, marker=marker)
num_requests += 1
if not result:
break
pi.show(increase=len(result))
marker = result[-1]
# All chunks from the repository have a refcount of MAX_VALUE, which is sticky,
# therefore we can't/won't delete them. Chunks we added ourselves in this transaction
# (e.g. checkpoint archives) are tracked correctly.
init_entry = ChunkIndexEntry(refcount=ChunkIndex.MAX_VALUE, size=0)
for id_ in result:
self.chunks[id_] = init_entry
assert len(self.chunks) == num_chunks
# LocalCache does not contain the manifest, either.
del self.chunks[self.manifest.MANIFEST_ID]
duration = perf_counter() - t0 or 0.01
pi.finish()
logger.debug(
"AdHocCache: downloaded %d chunk IDs in %.2f s (%d requests), ~%s/s",
num_chunks,
duration,
num_requests,
format_file_size(num_chunks * 34 / duration),
)
# Chunk IDs in a list are encoded in 34 bytes: 1 byte msgpack header, 1 byte length, 32 ID bytes.
# Protocol overhead is neglected in this calculation.
self.chunks = self._load_chunks_from_repo()

View File

@ -1184,13 +1184,13 @@ class BorgJsonEncoder(json.JSONEncoder):
from ..repository import Repository
from ..remote import RemoteRepository
from ..archive import Archive
from ..cache import LocalCache, AdHocCache
from ..cache import LocalCache, AdHocCache, NewCache
if isinstance(o, Repository) or isinstance(o, RemoteRepository):
return {"id": bin_to_hex(o.id), "location": o._location.canonical_path()}
if isinstance(o, Archive):
return o.info()
if isinstance(o, LocalCache):
if isinstance(o, (LocalCache, NewCache)):
return {"path": o.path, "stats": o.stats()}
if isinstance(o, AdHocCache):
return {"stats": o.stats()}

View File

@ -100,7 +100,7 @@ def format_timedelta(td):
s = ts % 60
m = int(ts / 60) % 60
h = int(ts / 3600) % 24
txt = "%.2f seconds" % s
txt = "%.3f seconds" % s
if m:
txt = "%d minutes %s" % (m, txt)
if h:

View File

@ -61,8 +61,8 @@ def test_stats_format(stats):
Number of files: 1
Original size: 20 B
Deduplicated size: 20 B
Time spent in hashing: 0.00 seconds
Time spent in chunking: 0.00 seconds
Time spent in hashing: 0.000 seconds
Time spent in chunking: 0.000 seconds
Added files: 0
Unchanged files: 0
Modified files: 0

View File

@ -338,10 +338,11 @@ def test_extra_chunks(archivers, request):
with Repository(archiver.repository_location, exclusive=True) as repository:
repository.put(b"01234567890123456789012345678901", b"xxxx")
repository.commit(compact=False)
cmd(archiver, "check", exit_code=1)
cmd(archiver, "check", exit_code=1)
output = cmd(archiver, "check", "-v", exit_code=0) # orphans are not considered warnings anymore
assert "1 orphaned (unused) objects found." in output
cmd(archiver, "check", "--repair", exit_code=0)
cmd(archiver, "check", exit_code=0)
output = cmd(archiver, "check", "-v", exit_code=0)
assert "orphaned (unused) objects found." not in output
cmd(archiver, "extract", "archive1", "--dry-run", exit_code=0)

View File

@ -277,6 +277,7 @@ def test_unknown_mandatory_feature_in_cache(archivers, request):
repository._location = Location(archiver.repository_location)
manifest = Manifest.load(repository, Manifest.NO_OPERATION_CHECK)
with Cache(repository, manifest) as cache:
is_localcache = isinstance(cache, LocalCache)
cache.begin_txn()
cache.cache_config.mandatory_features = {"unknown-feature"}
cache.commit()
@ -295,7 +296,8 @@ def test_unknown_mandatory_feature_in_cache(archivers, request):
with patch.object(LocalCache, "wipe_cache", wipe_wrapper):
cmd(archiver, "create", "test", "input")
assert called
if is_localcache:
assert called
with Repository(archiver.repository_path, exclusive=True) as repository:
if remote_repo:

View File

@ -32,7 +32,7 @@ def test_check_corrupted_repository(archiver):
def corrupt_archiver(archiver):
create_test_files(archiver.input_path)
cmd(archiver, "rcreate", RK_ENCRYPTION)
archiver.cache_path = json.loads(cmd(archiver, "rinfo", "--json"))["cache"]["path"]
archiver.cache_path = json.loads(cmd(archiver, "rinfo", "--json"))["cache"].get("path")
def corrupt(file, amount=1):
@ -45,7 +45,14 @@ def corrupt(file, amount=1):
def test_cache_chunks(archiver):
corrupt_archiver(archiver)
corrupt(os.path.join(archiver.cache_path, "chunks"))
if archiver.cache_path is None:
pytest.skip("no cache path for this kind of Cache implementation")
chunks_index = os.path.join(archiver.cache_path, "chunks")
if not os.path.exists(chunks_index):
pytest.skip("Only works with LocalCache.")
corrupt(chunks_index)
if archiver.FORK_DEFAULT:
out = cmd(archiver, "rinfo", exit_code=2)
assert "failed integrity check" in out
@ -56,6 +63,9 @@ def test_cache_chunks(archiver):
def test_cache_files(archiver):
corrupt_archiver(archiver)
if archiver.cache_path is None:
pytest.skip("no cache path for this kind of Cache implementation")
cmd(archiver, "create", "test", "input")
corrupt(os.path.join(archiver.cache_path, "files"))
out = cmd(archiver, "create", "test1", "input")
@ -65,6 +75,9 @@ def test_cache_files(archiver):
def test_chunks_archive(archiver):
corrupt_archiver(archiver)
if archiver.cache_path is None:
pytest.skip("no cache path for this kind of Cache implementation")
cmd(archiver, "create", "test1", "input")
# Find ID of test1, so we can corrupt it later :)
target_id = cmd(archiver, "rlist", "--format={id}{NL}").strip()
@ -75,6 +88,8 @@ def test_chunks_archive(archiver):
cmd(archiver, "rinfo", "--json")
chunks_archive = os.path.join(archiver.cache_path, "chunks.archive.d")
if not os.path.exists(chunks_archive):
pytest.skip("Only LocalCache has a per-archive chunks index cache.")
assert len(os.listdir(chunks_archive)) == 4 # two archives, one chunks cache and one .integrity file each
corrupt(os.path.join(chunks_archive, target_id + ".compact"))
@ -96,6 +111,9 @@ def test_chunks_archive(archiver):
def test_old_version_interfered(archiver):
corrupt_archiver(archiver)
if archiver.cache_path is None:
pytest.skip("no cache path for this kind of Cache implementation")
# Modify the main manifest ID without touching the manifest ID in the integrity section.
# This happens if a version without integrity checking modifies the cache.
config_path = os.path.join(archiver.cache_path, "config")

View File

@ -168,7 +168,12 @@ def test_debug_refcount_obj(archivers, request):
create_json = json.loads(cmd(archiver, "create", "--json", "test", "input"))
archive_id = create_json["archive"]["id"]
output = cmd(archiver, "debug", "refcount-obj", archive_id).strip()
assert output == f"object {archive_id} has 1 referrers [info from chunks cache]."
# LocalCache does precise refcounting, so we'll get 1 reference for the archive.
# AdHocCache or NewCache doesn't, we'll get ChunkIndex.MAX_VALUE as refcount.
assert (
output == f"object {archive_id} has 1 referrers [info from chunks cache]."
or output == f"object {archive_id} has 4294966271 referrers [info from chunks cache]."
)
# Invalid IDs do not abort or return an error
output = cmd(archiver, "debug", "refcount-obj", "124", "xyza").strip()

View File

@ -189,7 +189,7 @@ class TestAdHocCache:
def test_does_not_delete_existing_chunks(self, repository, cache):
assert cache.seen_chunk(H(1)) == ChunkIndex.MAX_VALUE
cache.chunk_decref(H(1), Statistics())
cache.chunk_decref(H(1), 1, Statistics())
assert repository.get(H(1)) == b"1234"
def test_seen_chunk_add_chunk_size(self, cache):
@ -199,7 +199,7 @@ class TestAdHocCache:
"""E.g. checkpoint archives"""
cache.add_chunk(H(5), {}, b"1010", stats=Statistics())
assert cache.seen_chunk(H(5)) == 1
cache.chunk_decref(H(5), Statistics())
cache.chunk_decref(H(5), 1, Statistics())
assert not cache.seen_chunk(H(5))
with pytest.raises(Repository.ObjectNotFound):
repository.get(H(5))
@ -220,9 +220,9 @@ class TestAdHocCache:
def test_incref_after_add_chunk(self, cache):
assert cache.add_chunk(H(3), {}, b"5678", stats=Statistics()) == (H(3), 4)
assert cache.chunk_incref(H(3), Statistics()) == (H(3), 4)
assert cache.chunk_incref(H(3), 4, Statistics()) == (H(3), 4)
def test_existing_incref_after_add_chunk(self, cache):
"""This case occurs with part files, see Archive.chunk_file."""
assert cache.add_chunk(H(1), {}, b"5678", stats=Statistics()) == (H(1), 4)
assert cache.chunk_incref(H(1), Statistics()) == (H(1), 4)
assert cache.chunk_incref(H(1), 4, Statistics()) == (H(1), 4)

View File

@ -368,7 +368,7 @@ def test_text_invalid(text):
def test_format_timedelta():
t0 = datetime(2001, 1, 1, 10, 20, 3, 0)
t1 = datetime(2001, 1, 1, 12, 20, 4, 100000)
assert format_timedelta(t1 - t0) == "2 hours 1.10 seconds"
assert format_timedelta(t1 - t0) == "2 hours 1.100 seconds"
@pytest.mark.parametrize(

View File

@ -84,8 +84,8 @@ class UpgraderFrom12To20:
chunks, chunks_healthy = self.hlm.retrieve(id=hlid, default=(None, None))
if chunks is not None:
item.chunks = chunks
for chunk_id, _ in chunks:
self.cache.chunk_incref(chunk_id, self.archive.stats)
for chunk_id, chunk_size in chunks:
self.cache.chunk_incref(chunk_id, chunk_size, self.archive.stats)
if chunks_healthy is not None:
item.chunks_healthy = chunks
del item.source # not used for hardlinks any more, replaced by hlid