Compare commits

...

17 Commits

Author SHA1 Message Date
TW 7028d277f8
Merge 1389bd10bc into 411c763fb8 2024-04-04 06:33:33 -05:00
Thomas Waldmann 1389bd10bc
fix test_debug_refcount_obj for misc. refcounts 2024-04-04 13:33:17 +02:00
Thomas Waldmann 8f2d41ab6c
fix test part that only works with LocalCache 2024-04-04 13:33:15 +02:00
Thomas Waldmann 30f6f1a4c4
skip tests requiring the chunks index (archive)
Only LocalCache implements these.
2024-04-04 13:33:14 +02:00
Thomas Waldmann b05e3067b8
check: do not consider orphan chunks a problem
if we use AdHocCache or NewCache, we do not have precise refcounting.
thus, we do not delete repo objects, as their refcounts never reach zero.

check --repair will just remove the orphans.
2024-04-04 13:33:12 +02:00
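
For illustration, a minimal sketch of the changed check behavior (condensed from the ArchiveChecker hunk further below; the function name and signature here are made up, and unused / possibly_superseded are sets of chunk IDs):

import logging

logger = logging.getLogger(__name__)

def report_orphans(unused, possibly_superseded, repair=False):
    """Illustrative only: orphans are reported, but no longer flagged as errors."""
    orphaned = unused - possibly_superseded
    if orphaned:
        # info, not error: with AdHocCache/NewCache the refcounts are not precise,
        # so unused (orphaned) objects are expected and harmless.
        logger.info("%d orphaned (unused) objects found.", len(orphaned))
    if repair and unused:
        # "check --repair" still removes orphaned and superseded objects.
        logger.info("Deleting %d orphaned and %d superseded objects...",
                    len(orphaned), len(possibly_superseded))
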
Thomas Waldmann 6087d7487b
WIP (tests failing) implement NewCache
Also:
- move common code to ChunksMixin
- always use ._txn_active (not .txn_active)
2024-04-04 13:33:10 +02:00
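
A rough sketch of the class layout after this refactor, using the mixin and class names that appear in the cache.py diff below (bodies elided; this is not the actual implementation):

class CacheStatsMixin:
    """Statistics formatting shared by all cache implementations."""

class FilesCacheMixin:
    """Files cache handling (load/save/lookup), shared by LocalCache and NewCache."""

class ChunksMixin:
    """Chunks index operations (add_chunk, chunk_incref, chunk_decref, seen_chunk)."""

class LocalCache(CacheStatsMixin, FilesCacheMixin, ChunksMixin):
    """Persistent, local (client-side) cache."""

class NewCache(CacheStatsMixin, FilesCacheMixin, ChunksMixin):
    """Like AdHocCache, but with a files cache."""

class AdHocCache(CacheStatsMixin, ChunksMixin):
    """Non-persistent cache; its chunks index is built ad hoc from the repository."""
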
Thomas Waldmann 5ec557908c
AdHocCache has no cache persistence
thus:

- no cache.path
- skip on-disk cache corruption tests for AdHocCache
2024-04-04 13:33:09 +02:00
Thomas Waldmann 186d536d55
tolerate missing chunks with delete --force
if a chunk is missing in the repo, it will also be missing in an ad-hoc
built chunks index.
2024-04-04 13:33:07 +02:00
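
A hedged sketch of the tolerance this adds (the real change is in the nested chunk_decref helper of Archive.delete, shown in the archive.py diff below; this standalone function and its parameters are illustrative only):

def tolerant_chunk_decref(cache, id, size, stats, forced):
    """Illustrative only: with delete --force, a chunk that is missing from the
    (possibly ad-hoc built) chunks index is recorded as an error, not fatal."""
    try:
        cache.chunk_decref(id, size, stats, wait=False)
        return False  # no error
    except KeyError:
        if forced == 0:
            raise  # normal delete: a missing chunk is a real problem
        return True   # forced delete: note the error and keep going
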
Thomas Waldmann 62289ff513
refactor files cache code into FilesCacheMixin class 2024-04-04 13:33:05 +02:00
Thomas Waldmann 08003f4065
create --no-cache-sync-forced option
when given, force using the AdHocCache.
2024-04-04 13:33:04 +02:00
Thomas Waldmann 26c6a1035a
fix AdHocCache.add_chunk signature (ctype, clevel kwargs) 2024-04-04 13:33:02 +02:00
Thomas Waldmann eafb80ccc0
always give id and size to chunk_incref/chunk_decref
incref: returns (id, size), so it needs the size if it can't
get it from the chunks index. also needed for updating stats.

decref: the caller does not always have the chunk size (e.g. for
metadata chunks); as we consider 0 an invalid size, we call with
size == 1 in that case. Thus, stats might be slightly off.
2024-04-04 13:33:01 +02:00
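
To illustrate the new calling convention, a tiny hypothetical stand-in cache (not Borg's classes): both methods now take the size explicitly, and callers that do not know the real size pass 1, because 0 is treated as invalid.

from collections import namedtuple

ChunkListEntry = namedtuple("ChunkListEntry", "id size")

class TinyCache:
    """Hypothetical stand-in showing the new (id, size, stats) signatures."""

    def __init__(self):
        self.chunks = {}  # id -> [refcount, size]

    def chunk_incref(self, id, size, stats):
        assert isinstance(size, int) and size > 0  # 0 is invalid; unknown sizes are passed as 1
        self.chunks[id][0] += 1                    # assumes the chunk is already in the index
        stats.append(size)                         # stats may be slightly off when size == 1 was guessed
        return ChunkListEntry(id, size)

    def chunk_decref(self, id, size, stats, wait=True):
        assert isinstance(size, int) and size > 0
        refcount_and_size = self.chunks[id]
        refcount_and_size[0] -= 1
        if refcount_and_size[0] == 0:
            del self.chunks[id]                    # a real cache would also delete the repository object
        stats.append(-size)
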
Thomas Waldmann c5e130d03d
files cache: add chunk size information
the files cache used to have only the chunk ids,
so it had to rely on the chunks index having the
size information - which is problematic with e.g.
the AdHocCache (size == 0 for all chunks that are not new) and blocked using the files cache there.
2024-04-04 13:32:55 +02:00
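
A small sketch of the data-layout change (names taken from the cache.py diff below): FileCacheEntry.chunks now stores (id, size) pairs instead of bare chunk IDs, so the files cache no longer depends on the chunks index for size information.

from collections import namedtuple

ChunkListEntry = namedtuple("ChunkListEntry", "id size")
# previously: namedtuple("FileCacheEntry", "age inode size cmtime chunk_ids")
FileCacheEntry = namedtuple("FileCacheEntry", "age inode size cmtime chunks")

entry = FileCacheEntry(
    age=0,
    inode=1234,                          # illustrative values
    size=3,
    cmtime=1_700_000_000_000_000_000,    # ns; the real code wraps this via int_to_timestamp()
    chunks=[ChunkListEntry(id=b"\x00" * 32, size=3)],
)
# With sizes stored here, the files cache also works with caches whose chunks
# index reports size == 0 for pre-existing chunks (e.g. AdHocCache).
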
TW 411c763fb8
Merge pull request #8182 from ThomasWaldmann/fix-test-ht-master
format_timedelta: use 3 decimal digits (ms)
2024-04-04 13:31:58 +02:00
Thomas Waldmann 54a85bf56d
format_timedelta: use 3 decimal digits (ms)
maybe this fixes the frequently failing test.
also, giving ms makes more sense than 10ms granularity.
2024-04-04 12:45:28 +02:00
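
A minimal illustration of the formatting change for the seconds part (the full helper also prepends minutes/hours/days; this standalone function is just for demonstration):

def format_seconds(ts: float) -> str:
    s = ts % 60
    return "%.3f seconds" % s   # was "%.2f seconds", i.e. 10 ms granularity

print(format_seconds(0.5))      # -> "0.500 seconds"
print(format_seconds(61.25))    # -> "1.250 seconds" (minutes are handled separately)
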
TW 4d2eb0cb1b
Merge pull request #8181 from ThomasWaldmann/github-actions-update-master
update github actions
2024-04-03 19:33:35 +02:00
Thomas Waldmann d893b899fc
update github actions
(avoid deprecation warnings)
2024-04-03 18:26:35 +02:00
17 changed files with 579 additions and 379 deletions

View File

@@ -9,7 +9,7 @@ jobs:
lint: lint:
runs-on: ubuntu-22.04 runs-on: ubuntu-22.04
steps: steps:
- uses: actions/checkout@v3 - uses: actions/checkout@v4
- uses: psf/black@stable - uses: psf/black@stable
with: with:
version: "~= 23.0" version: "~= 23.0"

View File

@@ -78,11 +78,11 @@ jobs:
# just fetching 1 commit is not enough for setuptools-scm, so we fetch all # just fetching 1 commit is not enough for setuptools-scm, so we fetch all
fetch-depth: 0 fetch-depth: 0
- name: Set up Python ${{ matrix.python-version }} - name: Set up Python ${{ matrix.python-version }}
uses: actions/setup-python@v4 uses: actions/setup-python@v5
with: with:
python-version: ${{ matrix.python-version }} python-version: ${{ matrix.python-version }}
- name: Cache pip - name: Cache pip
uses: actions/cache@v3 uses: actions/cache@v4
with: with:
path: ~/.cache/pip path: ~/.cache/pip
key: ${{ runner.os }}-pip-${{ hashFiles('requirements.d/development.txt') }} key: ${{ runner.os }}-pip-${{ hashFiles('requirements.d/development.txt') }}
@@ -114,7 +114,7 @@ jobs:
#sudo -E bash -c "tox -e py" #sudo -E bash -c "tox -e py"
tox --skip-missing-interpreters tox --skip-missing-interpreters
- name: Upload coverage to Codecov - name: Upload coverage to Codecov
uses: codecov/codecov-action@v3 uses: codecov/codecov-action@v4
env: env:
OS: ${{ runner.os }} OS: ${{ runner.os }}
python: ${{ matrix.python-version }} python: ${{ matrix.python-version }}

View File

@@ -643,14 +643,14 @@ Duration: {0.duration}
# so we can already remove it here, the next .save() will then commit this cleanup. # so we can already remove it here, the next .save() will then commit this cleanup.
# remove its manifest entry, remove its ArchiveItem chunk, remove its item_ptrs chunks: # remove its manifest entry, remove its ArchiveItem chunk, remove its item_ptrs chunks:
del self.manifest.archives[self.checkpoint_name] del self.manifest.archives[self.checkpoint_name]
self.cache.chunk_decref(self.id, self.stats) self.cache.chunk_decref(self.id, 1, self.stats)
for id in metadata.item_ptrs: for id in metadata.item_ptrs:
self.cache.chunk_decref(id, self.stats) self.cache.chunk_decref(id, 1, self.stats)
# also get rid of that part item, we do not want to have it in next checkpoint or final archive # also get rid of that part item, we do not want to have it in next checkpoint or final archive
tail_chunks = self.items_buffer.restore_chunks_state() tail_chunks = self.items_buffer.restore_chunks_state()
# tail_chunks contain the tail of the archive items metadata stream, not needed for next commit. # tail_chunks contain the tail of the archive items metadata stream, not needed for next commit.
for id in tail_chunks: for id in tail_chunks:
self.cache.chunk_decref(id, self.stats) self.cache.chunk_decref(id, 1, self.stats) # TODO can we have real size here?
def save(self, name=None, comment=None, timestamp=None, stats=None, additional_metadata=None): def save(self, name=None, comment=None, timestamp=None, stats=None, additional_metadata=None):
name = name or self.name name = name or self.name
@@ -1024,7 +1024,7 @@ Duration: {0.duration}
new_id = self.key.id_hash(data) new_id = self.key.id_hash(data)
self.cache.add_chunk(new_id, {}, data, stats=self.stats, ro_type=ROBJ_ARCHIVE_META) self.cache.add_chunk(new_id, {}, data, stats=self.stats, ro_type=ROBJ_ARCHIVE_META)
self.manifest.archives[self.name] = (new_id, metadata.time) self.manifest.archives[self.name] = (new_id, metadata.time)
self.cache.chunk_decref(self.id, self.stats) self.cache.chunk_decref(self.id, 1, self.stats)
self.id = new_id self.id = new_id
def rename(self, name): def rename(self, name):
@@ -1052,12 +1052,15 @@ Duration: {0.duration}
error = True error = True
return exception_ignored # must not return None here return exception_ignored # must not return None here
def chunk_decref(id, stats): def chunk_decref(id, size, stats):
try: try:
self.cache.chunk_decref(id, stats, wait=False) self.cache.chunk_decref(id, size, stats, wait=False)
except KeyError: except KeyError:
cid = bin_to_hex(id) nonlocal error
raise ChunksIndexError(cid) if forced == 0:
cid = bin_to_hex(id)
raise ChunksIndexError(cid)
error = True
else: else:
fetch_async_response(wait=False) fetch_async_response(wait=False)
@@ -1073,13 +1076,13 @@ Duration: {0.duration}
pi.show(i) pi.show(i)
_, data = self.repo_objs.parse(items_id, data, ro_type=ROBJ_ARCHIVE_STREAM) _, data = self.repo_objs.parse(items_id, data, ro_type=ROBJ_ARCHIVE_STREAM)
unpacker.feed(data) unpacker.feed(data)
chunk_decref(items_id, stats) chunk_decref(items_id, 1, stats)
try: try:
for item in unpacker: for item in unpacker:
item = Item(internal_dict=item) item = Item(internal_dict=item)
if "chunks" in item: if "chunks" in item:
for chunk_id, size in item.chunks: for chunk_id, size in item.chunks:
chunk_decref(chunk_id, stats) chunk_decref(chunk_id, size, stats)
except (TypeError, ValueError): except (TypeError, ValueError):
# if items metadata spans multiple chunks and one chunk got dropped somehow, # if items metadata spans multiple chunks and one chunk got dropped somehow,
# it could be that unpacker yields bad types # it could be that unpacker yields bad types
@@ -1096,12 +1099,12 @@ Duration: {0.duration}
# delete the blocks that store all the references that end up being loaded into metadata.items: # delete the blocks that store all the references that end up being loaded into metadata.items:
for id in self.metadata.item_ptrs: for id in self.metadata.item_ptrs:
chunk_decref(id, stats) chunk_decref(id, 1, stats)
# in forced delete mode, we try hard to delete at least the manifest entry, # in forced delete mode, we try hard to delete at least the manifest entry,
# if possible also the archive superblock, even if processing the items raises # if possible also the archive superblock, even if processing the items raises
# some harmless exception. # some harmless exception.
chunk_decref(self.id, stats) chunk_decref(self.id, 1, stats)
del self.manifest.archives[self.name] del self.manifest.archives[self.name]
while fetch_async_response(wait=True) is not None: while fetch_async_response(wait=True) is not None:
# we did async deletes, process outstanding results (== exceptions), # we did async deletes, process outstanding results (== exceptions),
@@ -1510,7 +1513,7 @@ class FilesystemObjectProcessors:
except BackupOSError: except BackupOSError:
# see comments in process_file's exception handler, same issue here. # see comments in process_file's exception handler, same issue here.
for chunk in item.get("chunks", []): for chunk in item.get("chunks", []):
cache.chunk_decref(chunk.id, self.stats, wait=False) cache.chunk_decref(chunk.id, chunk.size, self.stats, wait=False)
raise raise
else: else:
item.get_size(memorize=True) item.get_size(memorize=True)
@@ -1544,7 +1547,7 @@ class FilesystemObjectProcessors:
item.chunks = [] item.chunks = []
for chunk_id, chunk_size in hl_chunks: for chunk_id, chunk_size in hl_chunks:
# process one-by-one, so we will know in item.chunks how far we got # process one-by-one, so we will know in item.chunks how far we got
chunk_entry = cache.chunk_incref(chunk_id, self.stats) chunk_entry = cache.chunk_incref(chunk_id, chunk_size, self.stats)
item.chunks.append(chunk_entry) item.chunks.append(chunk_entry)
else: # normal case, no "2nd+" hardlink else: # normal case, no "2nd+" hardlink
if not is_special_file: if not is_special_file:
@@ -1552,26 +1555,26 @@ class FilesystemObjectProcessors:
started_hashing = time.monotonic() started_hashing = time.monotonic()
path_hash = self.key.id_hash(hashed_path) path_hash = self.key.id_hash(hashed_path)
self.stats.hashing_time += time.monotonic() - started_hashing self.stats.hashing_time += time.monotonic() - started_hashing
known, ids = cache.file_known_and_unchanged(hashed_path, path_hash, st) known, chunks = cache.file_known_and_unchanged(hashed_path, path_hash, st)
else: else:
# in --read-special mode, we may be called for special files. # in --read-special mode, we may be called for special files.
# there should be no information in the cache about special files processed in # there should be no information in the cache about special files processed in
# read-special mode, but we better play safe as this was wrong in the past: # read-special mode, but we better play safe as this was wrong in the past:
hashed_path = path_hash = None hashed_path = path_hash = None
known, ids = False, None known, chunks = False, None
if ids is not None: if chunks is not None:
# Make sure all ids are available # Make sure all ids are available
for id_ in ids: for chunk in chunks:
if not cache.seen_chunk(id_): if not cache.seen_chunk(chunk.id):
# cache said it is unmodified, but we lost a chunk: process file like modified # cache said it is unmodified, but we lost a chunk: process file like modified
status = "M" status = "M"
break break
else: else:
item.chunks = [] item.chunks = []
for chunk_id in ids: for chunk in chunks:
# process one-by-one, so we will know in item.chunks how far we got # process one-by-one, so we will know in item.chunks how far we got
chunk_entry = cache.chunk_incref(chunk_id, self.stats) cache.chunk_incref(chunk.id, chunk.size, self.stats)
item.chunks.append(chunk_entry) item.chunks.append(chunk)
status = "U" # regular file, unchanged status = "U" # regular file, unchanged
else: else:
status = "M" if known else "A" # regular file, modified or added status = "M" if known else "A" # regular file, modified or added
@@ -1606,7 +1609,7 @@ class FilesystemObjectProcessors:
# block or char device will change without its mtime/size/inode changing. # block or char device will change without its mtime/size/inode changing.
# also, we must not memorize a potentially inconsistent/corrupt file that # also, we must not memorize a potentially inconsistent/corrupt file that
# changed while we backed it up. # changed while we backed it up.
cache.memorize_file(hashed_path, path_hash, st, [c.id for c in item.chunks]) cache.memorize_file(hashed_path, path_hash, st, item.chunks)
self.stats.files_stats[status] += 1 # must be done late self.stats.files_stats[status] += 1 # must be done late
if not changed_while_backup: if not changed_while_backup:
status = None # we already called print_file_status status = None # we already called print_file_status
@@ -1620,7 +1623,7 @@ class FilesystemObjectProcessors:
# but we will not add an item (see add_item in create_helper) and thus # but we will not add an item (see add_item in create_helper) and thus
# they would be orphaned chunks in case that we commit the transaction. # they would be orphaned chunks in case that we commit the transaction.
for chunk in item.get("chunks", []): for chunk in item.get("chunks", []):
cache.chunk_decref(chunk.id, self.stats, wait=False) cache.chunk_decref(chunk.id, chunk.size, self.stats, wait=False)
# Now that we have cleaned up the chunk references, we can re-raise the exception. # Now that we have cleaned up the chunk references, we can re-raise the exception.
# This will skip processing of this file, but might retry or continue with the next one. # This will skip processing of this file, but might retry or continue with the next one.
raise raise
@@ -1731,7 +1734,7 @@ class TarfileObjectProcessors:
except BackupOSError: except BackupOSError:
# see comment in FilesystemObjectProcessors.process_file, same issue here. # see comment in FilesystemObjectProcessors.process_file, same issue here.
for chunk in item.get("chunks", []): for chunk in item.get("chunks", []):
self.cache.chunk_decref(chunk.id, self.stats, wait=False) self.cache.chunk_decref(chunk.id, chunk.size, self.stats, wait=False)
raise raise
@@ -2328,10 +2331,10 @@ class ArchiveChecker:
unused = {id_ for id_, entry in self.chunks.iteritems() if entry.refcount == 0} unused = {id_ for id_, entry in self.chunks.iteritems() if entry.refcount == 0}
orphaned = unused - self.possibly_superseded orphaned = unused - self.possibly_superseded
if orphaned: if orphaned:
logger.error(f"{len(orphaned)} orphaned objects found!") logger.info(f"{len(orphaned)} orphaned (unused) objects found.")
for chunk_id in orphaned: for chunk_id in orphaned:
logger.debug(f"chunk {bin_to_hex(chunk_id)} is orphaned.") logger.debug(f"chunk {bin_to_hex(chunk_id)} is orphaned.")
self.error_found = True # To support working with AdHocCache or NewCache, we do not set self.error_found = True.
if self.repair and unused: if self.repair and unused:
logger.info( logger.info(
"Deleting %d orphaned and %d superseded objects..." % (len(orphaned), len(self.possibly_superseded)) "Deleting %d orphaned and %d superseded objects..." % (len(orphaned), len(self.possibly_superseded))
@@ -2444,7 +2447,7 @@ class ArchiveRecreater:
def process_chunks(self, archive, target, item): def process_chunks(self, archive, target, item):
if not target.recreate_rechunkify: if not target.recreate_rechunkify:
for chunk_id, size in item.chunks: for chunk_id, size in item.chunks:
self.cache.chunk_incref(chunk_id, target.stats) self.cache.chunk_incref(chunk_id, size, target.stats)
return item.chunks return item.chunks
chunk_iterator = self.iter_chunks(archive, target, list(item.chunks)) chunk_iterator = self.iter_chunks(archive, target, list(item.chunks))
chunk_processor = partial(self.chunk_processor, target) chunk_processor = partial(self.chunk_processor, target)
@@ -2452,8 +2455,9 @@ class ArchiveRecreater:
def chunk_processor(self, target, chunk): def chunk_processor(self, target, chunk):
chunk_id, data = cached_hash(chunk, self.key.id_hash) chunk_id, data = cached_hash(chunk, self.key.id_hash)
size = len(data)
if chunk_id in self.seen_chunks: if chunk_id in self.seen_chunks:
return self.cache.chunk_incref(chunk_id, target.stats) return self.cache.chunk_incref(chunk_id, size, target.stats)
chunk_entry = self.cache.add_chunk(chunk_id, {}, data, stats=target.stats, wait=False, ro_type=ROBJ_FILE_STREAM) chunk_entry = self.cache.add_chunk(chunk_id, {}, data, stats=target.stats, wait=False, ro_type=ROBJ_FILE_STREAM)
self.cache.repository.async_response(wait=False) self.cache.repository.async_response(wait=False)
self.seen_chunks.add(chunk_entry.id) self.seen_chunks.add(chunk_entry.id)

View File

@@ -225,6 +225,7 @@ class CreateMixIn:
progress=args.progress, progress=args.progress,
lock_wait=self.lock_wait, lock_wait=self.lock_wait,
permit_adhoc_cache=args.no_cache_sync, permit_adhoc_cache=args.no_cache_sync,
force_adhoc_cache=args.no_cache_sync_forced,
cache_mode=args.files_cache_mode, cache_mode=args.files_cache_mode,
iec=args.iec, iec=args.iec,
) as cache: ) as cache:
@@ -803,6 +804,12 @@ class CreateMixIn:
action="store_true", action="store_true",
help="experimental: do not synchronize the cache. Implies not using the files cache.", help="experimental: do not synchronize the cache. Implies not using the files cache.",
) )
subparser.add_argument(
"--no-cache-sync-forced",
dest="no_cache_sync_forced",
action="store_true",
help="experimental: do not synchronize the cache (forced). Implies not using the files cache.",
)
subparser.add_argument( subparser.add_argument(
"--stdin-name", "--stdin-name",
metavar="NAME", metavar="NAME",

View File

@@ -59,16 +59,9 @@ class RInfoMixIn:
output += f" out of {format_file_size(storage_quota, iec=args.iec)}" output += f" out of {format_file_size(storage_quota, iec=args.iec)}"
output += "\n" output += "\n"
output += ( if hasattr(info["cache"], "path"):
textwrap.dedent( output += "Cache: {cache.path}\n".format(**info)
""" output += "Security dir: {security_dir}\n".format(**info)
Cache: {cache.path}
Security dir: {security_dir}
"""
)
.strip()
.format(**info)
)
print(output) print(output)
print(str(cache)) print(str(cache))

View File

@@ -143,7 +143,7 @@ class TransferMixIn:
transfer_size += size transfer_size += size
else: else:
if not dry_run: if not dry_run:
chunk_entry = cache.chunk_incref(chunk_id, archive.stats) chunk_entry = cache.chunk_incref(chunk_id, size, archive.stats)
chunks.append(chunk_entry) chunks.append(chunk_entry)
present_size += size present_size += size
if not dry_run: if not dry_run:

View File

@@ -35,8 +35,8 @@ from .platform import SaveFile
from .remote import cache_if_remote from .remote import cache_if_remote
from .repository import LIST_SCAN_LIMIT from .repository import LIST_SCAN_LIMIT
# note: cmtime might me either a ctime or a mtime timestamp # note: cmtime might be either a ctime or a mtime timestamp, chunks is a list of ChunkListEntry
FileCacheEntry = namedtuple("FileCacheEntry", "age inode size cmtime chunk_ids") FileCacheEntry = namedtuple("FileCacheEntry", "age inode size cmtime chunks")
class SecurityManager: class SecurityManager:
@@ -248,15 +248,6 @@ def cache_dir(repository, path=None):
return path or os.path.join(get_cache_dir(), repository.id_str) return path or os.path.join(get_cache_dir(), repository.id_str)
def files_cache_name():
suffix = os.environ.get("BORG_FILES_CACHE_SUFFIX", "")
return "files." + suffix if suffix else "files"
def discover_files_cache_name(path):
return [fn for fn in os.listdir(path) if fn == "files" or fn.startswith("files.")][0]
class CacheConfig: class CacheConfig:
def __init__(self, repository, path=None, lock_wait=None): def __init__(self, repository, path=None, lock_wait=None):
self.repository = repository self.repository = repository
@@ -413,6 +404,7 @@ class Cache:
progress=False, progress=False,
lock_wait=None, lock_wait=None,
permit_adhoc_cache=False, permit_adhoc_cache=False,
force_adhoc_cache=False,
cache_mode=FILES_CACHE_MODE_DISABLED, cache_mode=FILES_CACHE_MODE_DISABLED,
iec=False, iec=False,
): ):
@@ -428,9 +420,23 @@ class Cache:
cache_mode=cache_mode, cache_mode=cache_mode,
) )
def newcache():
return NewCache(
manifest=manifest,
path=path,
warn_if_unencrypted=warn_if_unencrypted,
progress=progress,
iec=iec,
lock_wait=lock_wait,
cache_mode=cache_mode,
)
def adhoc(): def adhoc():
return AdHocCache(manifest=manifest, lock_wait=lock_wait, iec=iec) return AdHocCache(manifest=manifest, lock_wait=lock_wait, iec=iec)
if force_adhoc_cache:
return adhoc()
if not permit_adhoc_cache: if not permit_adhoc_cache:
return local() return local()
@@ -489,7 +495,302 @@ Total chunks: {0.total_chunks}
return self.Summary(**stats) return self.Summary(**stats)
class LocalCache(CacheStatsMixin): class FilesCacheMixin:
"""
Massively accelerate processing of unchanged files by caching their chunks list.
With that, we can avoid having to read and chunk them to get their chunks list.
"""
FILES_CACHE_NAME = "files"
def __init__(self, cache_mode):
self.cache_mode = cache_mode
self.files = None
self._newest_cmtime = None
def files_cache_name(self):
suffix = os.environ.get("BORG_FILES_CACHE_SUFFIX", "")
return self.FILES_CACHE_NAME + "." + suffix if suffix else self.FILES_CACHE_NAME
def discover_files_cache_name(self, path):
return [
fn for fn in os.listdir(path) if fn == self.FILES_CACHE_NAME or fn.startswith(self.FILES_CACHE_NAME + ".")
][0]
def _create_empty_files_cache(self, path):
with SaveFile(os.path.join(path, self.files_cache_name()), binary=True):
pass # empty file
def _read_files_cache(self):
if "d" in self.cache_mode: # d(isabled)
return
self.files = {}
logger.debug("Reading files cache ...")
files_cache_logger.debug("FILES-CACHE-LOAD: starting...")
msg = None
try:
with IntegrityCheckedFile(
path=os.path.join(self.path, self.files_cache_name()),
write=False,
integrity_data=self.cache_config.integrity.get(self.files_cache_name()),
) as fd:
u = msgpack.Unpacker(use_list=True)
while True:
data = fd.read(64 * 1024)
if not data:
break
u.feed(data)
try:
for path_hash, item in u:
entry = FileCacheEntry(*item)
# in the end, this takes about 240 Bytes per file
self.files[path_hash] = msgpack.packb(entry._replace(age=entry.age + 1))
except (TypeError, ValueError) as exc:
msg = "The files cache seems invalid. [%s]" % str(exc)
break
except OSError as exc:
msg = "The files cache can't be read. [%s]" % str(exc)
except FileIntegrityError as fie:
msg = "The files cache is corrupted. [%s]" % str(fie)
if msg is not None:
logger.warning(msg)
logger.warning("Continuing without files cache - expect lower performance.")
self.files = {}
files_cache_logger.debug("FILES-CACHE-LOAD: finished, %d entries loaded.", len(self.files))
def _write_files_cache(self):
if self._newest_cmtime is None:
# was never set because no files were modified/added
self._newest_cmtime = 2**63 - 1 # nanoseconds, good until y2262
ttl = int(os.environ.get("BORG_FILES_CACHE_TTL", 20))
files_cache_logger.debug("FILES-CACHE-SAVE: starting...")
with IntegrityCheckedFile(path=os.path.join(self.path, self.files_cache_name()), write=True) as fd:
entry_count = 0
for path_hash, item in self.files.items():
# Only keep files seen in this backup that are older than newest cmtime seen in this backup -
# this is to avoid issues with filesystem snapshots and cmtime granularity.
# Also keep files from older backups that have not reached BORG_FILES_CACHE_TTL yet.
entry = FileCacheEntry(*msgpack.unpackb(item))
if (
entry.age == 0
and timestamp_to_int(entry.cmtime) < self._newest_cmtime
or entry.age > 0
and entry.age < ttl
):
msgpack.pack((path_hash, entry), fd)
entry_count += 1
files_cache_logger.debug("FILES-CACHE-KILL: removed all old entries with age >= TTL [%d]", ttl)
files_cache_logger.debug(
"FILES-CACHE-KILL: removed all current entries with newest cmtime %d", self._newest_cmtime
)
files_cache_logger.debug("FILES-CACHE-SAVE: finished, %d remaining entries saved.", entry_count)
return fd.integrity_data
def file_known_and_unchanged(self, hashed_path, path_hash, st):
"""
Check if we know the file that has this path_hash (know == it is in our files cache) and
whether it is unchanged (the size/inode number/cmtime is same for stuff we check in this cache_mode).
:param hashed_path: the file's path as we gave it to hash(hashed_path)
:param path_hash: hash(hashed_path), to save some memory in the files cache
:param st: the file's stat() result
:return: known, chunks (known is True if we have infos about this file in the cache,
chunks is a list[ChunkListEntry] IF the file has not changed, otherwise None).
"""
if not stat.S_ISREG(st.st_mode):
return False, None
cache_mode = self.cache_mode
if "d" in cache_mode: # d(isabled)
files_cache_logger.debug("UNKNOWN: files cache disabled")
return False, None
# note: r(echunk) does not need the files cache in this method, but the files cache will
# be updated and saved to disk to memorize the files. To preserve previous generations in
# the cache, this means that it also needs to get loaded from disk first.
if "r" in cache_mode: # r(echunk)
files_cache_logger.debug("UNKNOWN: rechunking enforced")
return False, None
entry = self.files.get(path_hash)
if not entry:
files_cache_logger.debug("UNKNOWN: no file metadata in cache for: %r", hashed_path)
return False, None
# we know the file!
entry = FileCacheEntry(*msgpack.unpackb(entry))
if "s" in cache_mode and entry.size != st.st_size:
files_cache_logger.debug("KNOWN-CHANGED: file size has changed: %r", hashed_path)
return True, None
if "i" in cache_mode and entry.inode != st.st_ino:
files_cache_logger.debug("KNOWN-CHANGED: file inode number has changed: %r", hashed_path)
return True, None
if "c" in cache_mode and timestamp_to_int(entry.cmtime) != st.st_ctime_ns:
files_cache_logger.debug("KNOWN-CHANGED: file ctime has changed: %r", hashed_path)
return True, None
elif "m" in cache_mode and timestamp_to_int(entry.cmtime) != st.st_mtime_ns:
files_cache_logger.debug("KNOWN-CHANGED: file mtime has changed: %r", hashed_path)
return True, None
# we ignored the inode number in the comparison above or it is still same.
# if it is still the same, replacing it in the tuple doesn't change it.
# if we ignored it, a reason for doing that is that files were moved to a new
# disk / new fs (so a one-time change of inode number is expected) and we wanted
# to avoid everything getting chunked again. to be able to re-enable the inode
# number comparison in a future backup run (and avoid chunking everything
# again at that time), we need to update the inode number in the cache with what
# we see in the filesystem.
self.files[path_hash] = msgpack.packb(entry._replace(inode=st.st_ino, age=0))
chunks = [ChunkListEntry(*chunk) for chunk in entry.chunks] # convert to list of namedtuple
return True, chunks
def memorize_file(self, hashed_path, path_hash, st, chunks):
if not stat.S_ISREG(st.st_mode):
return
cache_mode = self.cache_mode
# note: r(echunk) modes will update the files cache, d(isabled) mode won't
if "d" in cache_mode:
files_cache_logger.debug("FILES-CACHE-NOUPDATE: files cache disabled")
return
if "c" in cache_mode:
cmtime_type = "ctime"
cmtime_ns = safe_ns(st.st_ctime_ns)
elif "m" in cache_mode:
cmtime_type = "mtime"
cmtime_ns = safe_ns(st.st_mtime_ns)
else: # neither 'c' nor 'm' in cache_mode, avoid UnboundLocalError
cmtime_type = "ctime"
cmtime_ns = safe_ns(st.st_ctime_ns)
entry = FileCacheEntry(
age=0, inode=st.st_ino, size=st.st_size, cmtime=int_to_timestamp(cmtime_ns), chunks=chunks
)
self.files[path_hash] = msgpack.packb(entry)
self._newest_cmtime = max(self._newest_cmtime or 0, cmtime_ns)
files_cache_logger.debug(
"FILES-CACHE-UPDATE: put %r [has %s] <- %r",
entry._replace(chunks="[%d entries]" % len(entry.chunks)),
cmtime_type,
hashed_path,
)
class ChunksMixin:
"""
Chunks index related code for misc. Cache implementations.
"""
def chunk_incref(self, id, size, stats):
assert isinstance(size, int) and size > 0
if not self._txn_active:
self.begin_txn()
count, _size = self.chunks.incref(id)
stats.update(size, False)
return ChunkListEntry(id, size)
def chunk_decref(self, id, size, stats, wait=True):
assert isinstance(size, int) and size > 0
if not self._txn_active:
self.begin_txn()
count, _size = self.chunks.decref(id)
if count == 0:
del self.chunks[id]
self.repository.delete(id, wait=wait)
stats.update(-size, True)
else:
stats.update(-size, False)
def seen_chunk(self, id, size=None):
if not self._txn_active:
self.begin_txn()
entry = self.chunks.get(id, ChunkIndexEntry(0, None))
if entry.refcount and size is not None:
assert isinstance(entry.size, int)
if entry.size:
# LocalCache: has existing size information and uses *size* to make an effort at detecting collisions.
if size != entry.size:
# we already have a chunk with that id, but different size.
# this is either a hash collision (unlikely) or corruption or a bug.
raise Exception(
"chunk has same id [%r], but different size (stored: %d new: %d)!" % (id, entry.size, size)
)
else:
# NewCache / AdHocCache:
# Here *size* is used to update the chunk's size information, which will be zero for existing chunks.
self.chunks[id] = entry._replace(size=size)
return entry.refcount
def add_chunk(
self,
id,
meta,
data,
*,
stats,
wait=True,
compress=True,
size=None,
ctype=None,
clevel=None,
ro_type=ROBJ_FILE_STREAM,
):
assert ro_type is not None
if not self._txn_active:
self.begin_txn()
if size is None:
if compress:
size = len(data) # data is still uncompressed
else:
raise ValueError("when giving compressed data for a chunk, the uncompressed size must be given also")
refcount = self.seen_chunk(id, size)
if refcount:
return self.chunk_incref(id, size, stats)
cdata = self.repo_objs.format(
id, meta, data, compress=compress, size=size, ctype=ctype, clevel=clevel, ro_type=ro_type
)
self.repository.put(id, cdata, wait=wait)
self.chunks.add(id, 1, size)
stats.update(size, not refcount)
return ChunkListEntry(id, size)
def _load_chunks_from_repo(self):
# Explicitly set the initial usable hash table capacity to avoid performance issues
# due to hash table "resonance".
# Since we're creating an archive, add 10 % from the start.
num_chunks = len(self.repository)
chunks = ChunkIndex(usable=num_chunks * 1.1)
pi = ProgressIndicatorPercent(
total=num_chunks, msg="Downloading chunk list... %3.0f%%", msgid="cache.download_chunks"
)
t0 = perf_counter()
num_requests = 0
marker = None
while True:
result = self.repository.list(limit=LIST_SCAN_LIMIT, marker=marker)
num_requests += 1
if not result:
break
pi.show(increase=len(result))
marker = result[-1]
# All chunks from the repository have a refcount of MAX_VALUE, which is sticky,
# therefore we can't/won't delete them. Chunks we added ourselves in this transaction
# (e.g. checkpoint archives) are tracked correctly.
init_entry = ChunkIndexEntry(refcount=ChunkIndex.MAX_VALUE, size=0)
for id_ in result:
chunks[id_] = init_entry
assert len(chunks) == num_chunks
# LocalCache does not contain the manifest, either.
del chunks[self.manifest.MANIFEST_ID]
duration = perf_counter() - t0 or 0.01
pi.finish()
logger.debug(
"Cache: downloaded %d chunk IDs in %.2f s (%d requests), ~%s/s",
num_chunks,
duration,
num_requests,
format_file_size(num_chunks * 34 / duration),
)
# Chunk IDs in a list are encoded in 34 bytes: 1 byte msgpack header, 1 byte length, 32 ID bytes.
# Protocol overhead is neglected in this calculation.
return chunks
class LocalCache(CacheStatsMixin, FilesCacheMixin, ChunksMixin):
""" """
Persistent, local (client-side) cache. Persistent, local (client-side) cache.
""" """
@@ -512,15 +813,15 @@ class LocalCache(CacheStatsMixin):
:param cache_mode: what shall be compared in the file stat infos vs. cached stat infos comparison :param cache_mode: what shall be compared in the file stat infos vs. cached stat infos comparison
""" """
CacheStatsMixin.__init__(self, iec=iec) CacheStatsMixin.__init__(self, iec=iec)
FilesCacheMixin.__init__(self, cache_mode)
assert isinstance(manifest, Manifest) assert isinstance(manifest, Manifest)
self.manifest = manifest self.manifest = manifest
self.repository = manifest.repository self.repository = manifest.repository
self.key = manifest.key self.key = manifest.key
self.repo_objs = manifest.repo_objs self.repo_objs = manifest.repo_objs
self.progress = progress self.progress = progress
self.cache_mode = cache_mode
self.timestamp = None self.timestamp = None
self.txn_active = False self._txn_active = False
self.path = cache_dir(self.repository, path) self.path = cache_dir(self.repository, path)
self.security_manager = SecurityManager(self.repository) self.security_manager = SecurityManager(self.repository)
@@ -561,8 +862,7 @@ class LocalCache(CacheStatsMixin):
self.cache_config.create() self.cache_config.create()
ChunkIndex().write(os.path.join(self.path, "chunks")) ChunkIndex().write(os.path.join(self.path, "chunks"))
os.makedirs(os.path.join(self.path, "chunks.archive.d")) os.makedirs(os.path.join(self.path, "chunks.archive.d"))
with SaveFile(os.path.join(self.path, files_cache_name()), binary=True): self._create_empty_files_cache(self.path)
pass # empty file
def _do_open(self): def _do_open(self):
self.cache_config.load() self.cache_config.load()
@@ -572,10 +872,7 @@ class LocalCache(CacheStatsMixin):
integrity_data=self.cache_config.integrity.get("chunks"), integrity_data=self.cache_config.integrity.get("chunks"),
) as fd: ) as fd:
self.chunks = ChunkIndex.read(fd) self.chunks = ChunkIndex.read(fd)
if "d" in self.cache_mode: # d(isabled) self._read_files_cache()
self.files = None
else:
self._read_files()
def open(self): def open(self):
if not os.path.isdir(self.path): if not os.path.isdir(self.path):
@@ -588,42 +885,6 @@ class LocalCache(CacheStatsMixin):
self.cache_config.close() self.cache_config.close()
self.cache_config = None self.cache_config = None
def _read_files(self):
self.files = {}
self._newest_cmtime = None
logger.debug("Reading files cache ...")
files_cache_logger.debug("FILES-CACHE-LOAD: starting...")
msg = None
try:
with IntegrityCheckedFile(
path=os.path.join(self.path, files_cache_name()),
write=False,
integrity_data=self.cache_config.integrity.get(files_cache_name()),
) as fd:
u = msgpack.Unpacker(use_list=True)
while True:
data = fd.read(64 * 1024)
if not data:
break
u.feed(data)
try:
for path_hash, item in u:
entry = FileCacheEntry(*item)
# in the end, this takes about 240 Bytes per file
self.files[path_hash] = msgpack.packb(entry._replace(age=entry.age + 1))
except (TypeError, ValueError) as exc:
msg = "The files cache seems invalid. [%s]" % str(exc)
break
except OSError as exc:
msg = "The files cache can't be read. [%s]" % str(exc)
except FileIntegrityError as fie:
msg = "The files cache is corrupted. [%s]" % str(fie)
if msg is not None:
logger.warning(msg)
logger.warning("Continuing without files cache - expect lower performance.")
self.files = {}
files_cache_logger.debug("FILES-CACHE-LOAD: finished, %d entries loaded.", len(self.files))
def begin_txn(self): def begin_txn(self):
# Initialize transaction snapshot # Initialize transaction snapshot
pi = ProgressIndicatorMessage(msgid="cache.begin_transaction") pi = ProgressIndicatorMessage(msgid="cache.begin_transaction")
@@ -635,48 +896,23 @@ class LocalCache(CacheStatsMixin):
shutil.copy(os.path.join(self.path, "chunks"), txn_dir) shutil.copy(os.path.join(self.path, "chunks"), txn_dir)
pi.output("Initializing cache transaction: Reading files") pi.output("Initializing cache transaction: Reading files")
try: try:
shutil.copy(os.path.join(self.path, files_cache_name()), txn_dir) shutil.copy(os.path.join(self.path, self.files_cache_name()), txn_dir)
except FileNotFoundError: except FileNotFoundError:
with SaveFile(os.path.join(txn_dir, files_cache_name()), binary=True): self._create_empty_files_cache(txn_dir)
pass # empty file
os.replace(txn_dir, os.path.join(self.path, "txn.active")) os.replace(txn_dir, os.path.join(self.path, "txn.active"))
self.txn_active = True self._txn_active = True
pi.finish() pi.finish()
def commit(self): def commit(self):
"""Commit transaction""" """Commit transaction"""
if not self.txn_active: if not self._txn_active:
return return
self.security_manager.save(self.manifest, self.key) self.security_manager.save(self.manifest, self.key)
pi = ProgressIndicatorMessage(msgid="cache.commit") pi = ProgressIndicatorMessage(msgid="cache.commit")
if self.files is not None: if self.files is not None:
if self._newest_cmtime is None:
# was never set because no files were modified/added
self._newest_cmtime = 2**63 - 1 # nanoseconds, good until y2262
ttl = int(os.environ.get("BORG_FILES_CACHE_TTL", 20))
pi.output("Saving files cache") pi.output("Saving files cache")
files_cache_logger.debug("FILES-CACHE-SAVE: starting...") integrity_data = self._write_files_cache()
with IntegrityCheckedFile(path=os.path.join(self.path, files_cache_name()), write=True) as fd: self.cache_config.integrity[self.files_cache_name()] = integrity_data
entry_count = 0
for path_hash, item in self.files.items():
# Only keep files seen in this backup that are older than newest cmtime seen in this backup -
# this is to avoid issues with filesystem snapshots and cmtime granularity.
# Also keep files from older backups that have not reached BORG_FILES_CACHE_TTL yet.
entry = FileCacheEntry(*msgpack.unpackb(item))
if (
entry.age == 0
and timestamp_to_int(entry.cmtime) < self._newest_cmtime
or entry.age > 0
and entry.age < ttl
):
msgpack.pack((path_hash, entry), fd)
entry_count += 1
files_cache_logger.debug("FILES-CACHE-KILL: removed all old entries with age >= TTL [%d]", ttl)
files_cache_logger.debug(
"FILES-CACHE-KILL: removed all current entries with newest cmtime %d", self._newest_cmtime
)
files_cache_logger.debug("FILES-CACHE-SAVE: finished, %d remaining entries saved.", entry_count)
self.cache_config.integrity[files_cache_name()] = fd.integrity_data
pi.output("Saving chunks cache") pi.output("Saving chunks cache")
with IntegrityCheckedFile(path=os.path.join(self.path, "chunks"), write=True) as fd: with IntegrityCheckedFile(path=os.path.join(self.path, "chunks"), write=True) as fd:
self.chunks.write(fd) self.chunks.write(fd)
@@ -685,7 +921,7 @@ class LocalCache(CacheStatsMixin):
self.cache_config.save(self.manifest, self.key) self.cache_config.save(self.manifest, self.key)
os.replace(os.path.join(self.path, "txn.active"), os.path.join(self.path, "txn.tmp")) os.replace(os.path.join(self.path, "txn.active"), os.path.join(self.path, "txn.tmp"))
shutil.rmtree(os.path.join(self.path, "txn.tmp")) shutil.rmtree(os.path.join(self.path, "txn.tmp"))
self.txn_active = False self._txn_active = False
pi.finish() pi.finish()
def rollback(self): def rollback(self):
@@ -698,12 +934,12 @@ class LocalCache(CacheStatsMixin):
if os.path.exists(txn_dir): if os.path.exists(txn_dir):
shutil.copy(os.path.join(txn_dir, "config"), self.path) shutil.copy(os.path.join(txn_dir, "config"), self.path)
shutil.copy(os.path.join(txn_dir, "chunks"), self.path) shutil.copy(os.path.join(txn_dir, "chunks"), self.path)
shutil.copy(os.path.join(txn_dir, discover_files_cache_name(txn_dir)), self.path) shutil.copy(os.path.join(txn_dir, self.discover_files_cache_name(txn_dir)), self.path)
txn_tmp = os.path.join(self.path, "txn.tmp") txn_tmp = os.path.join(self.path, "txn.tmp")
os.replace(txn_dir, txn_tmp) os.replace(txn_dir, txn_tmp)
if os.path.exists(txn_tmp): if os.path.exists(txn_tmp):
shutil.rmtree(txn_tmp) shutil.rmtree(txn_tmp)
self.txn_active = False self._txn_active = False
self._do_open() self._do_open()
def sync(self): def sync(self):
@@ -930,8 +1166,7 @@ class LocalCache(CacheStatsMixin):
shutil.rmtree(os.path.join(self.path, "chunks.archive.d")) shutil.rmtree(os.path.join(self.path, "chunks.archive.d"))
os.makedirs(os.path.join(self.path, "chunks.archive.d")) os.makedirs(os.path.join(self.path, "chunks.archive.d"))
self.chunks = ChunkIndex() self.chunks = ChunkIndex()
with SaveFile(os.path.join(self.path, files_cache_name()), binary=True): self._create_empty_files_cache(self.path)
pass # empty file
self.cache_config.manifest_id = "" self.cache_config.manifest_id = ""
self.cache_config._config.set("cache", "manifest", "") self.cache_config._config.set("cache", "manifest", "")
@@ -948,149 +1183,171 @@ class LocalCache(CacheStatsMixin):
self.cache_config.ignored_features.update(repo_features - my_features) self.cache_config.ignored_features.update(repo_features - my_features)
self.cache_config.mandatory_features.update(repo_features & my_features) self.cache_config.mandatory_features.update(repo_features & my_features)
def add_chunk(
class NewCache(CacheStatsMixin, FilesCacheMixin, ChunksMixin):
"""
Like AdHocCache, but with a files cache.
"""
def __init__(
self, self,
id, manifest,
meta, path=None,
data, warn_if_unencrypted=True,
*, progress=False,
stats, lock_wait=None,
wait=True, cache_mode=FILES_CACHE_MODE_DISABLED,
compress=True, iec=False,
size=None,
ctype=None,
clevel=None,
ro_type=ROBJ_FILE_STREAM,
): ):
assert ro_type is not None
if not self.txn_active:
self.begin_txn()
if size is None and compress:
size = len(data) # data is still uncompressed
refcount = self.seen_chunk(id, size)
if refcount:
return self.chunk_incref(id, stats)
if size is None:
raise ValueError("when giving compressed data for a new chunk, the uncompressed size must be given also")
cdata = self.repo_objs.format(
id, meta, data, compress=compress, size=size, ctype=ctype, clevel=clevel, ro_type=ro_type
)
self.repository.put(id, cdata, wait=wait)
self.chunks.add(id, 1, size)
stats.update(size, not refcount)
return ChunkListEntry(id, size)
def seen_chunk(self, id, size=None):
refcount, stored_size = self.chunks.get(id, ChunkIndexEntry(0, None))
if size is not None and stored_size is not None and size != stored_size:
# we already have a chunk with that id, but different size.
# this is either a hash collision (unlikely) or corruption or a bug.
raise Exception(
"chunk has same id [%r], but different size (stored: %d new: %d)!" % (id, stored_size, size)
)
return refcount
def chunk_incref(self, id, stats, size=None):
if not self.txn_active:
self.begin_txn()
count, _size = self.chunks.incref(id)
stats.update(_size, False)
return ChunkListEntry(id, _size)
def chunk_decref(self, id, stats, wait=True):
if not self.txn_active:
self.begin_txn()
count, size = self.chunks.decref(id)
if count == 0:
del self.chunks[id]
self.repository.delete(id, wait=wait)
stats.update(-size, True)
else:
stats.update(-size, False)
def file_known_and_unchanged(self, hashed_path, path_hash, st):
""" """
Check if we know the file that has this path_hash (know == it is in our files cache) and :param warn_if_unencrypted: print warning if accessing unknown unencrypted repository
whether it is unchanged (the size/inode number/cmtime is same for stuff we check in this cache_mode). :param lock_wait: timeout for lock acquisition (int [s] or None [wait forever])
:param cache_mode: what shall be compared in the file stat infos vs. cached stat infos comparison
:param hashed_path: the file's path as we gave it to hash(hashed_path)
:param path_hash: hash(hashed_path), to save some memory in the files cache
:param st: the file's stat() result
:return: known, ids (known is True if we have infos about this file in the cache,
ids is the list of chunk ids IF the file has not changed, otherwise None).
""" """
if not stat.S_ISREG(st.st_mode): CacheStatsMixin.__init__(self, iec=iec)
return False, None FilesCacheMixin.__init__(self, cache_mode)
cache_mode = self.cache_mode assert isinstance(manifest, Manifest)
if "d" in cache_mode: # d(isabled) self.manifest = manifest
files_cache_logger.debug("UNKNOWN: files cache disabled") self.repository = manifest.repository
return False, None self.key = manifest.key
# note: r(echunk) does not need the files cache in this method, but the files cache will self.repo_objs = manifest.repo_objs
# be updated and saved to disk to memorize the files. To preserve previous generations in self.progress = progress
# the cache, this means that it also needs to get loaded from disk first. self.timestamp = None
if "r" in cache_mode: # r(echunk) self._txn_active = False
files_cache_logger.debug("UNKNOWN: rechunking enforced")
return False, None
entry = self.files.get(path_hash)
if not entry:
files_cache_logger.debug("UNKNOWN: no file metadata in cache for: %r", hashed_path)
return False, None
# we know the file!
entry = FileCacheEntry(*msgpack.unpackb(entry))
if "s" in cache_mode and entry.size != st.st_size:
files_cache_logger.debug("KNOWN-CHANGED: file size has changed: %r", hashed_path)
return True, None
if "i" in cache_mode and entry.inode != st.st_ino:
files_cache_logger.debug("KNOWN-CHANGED: file inode number has changed: %r", hashed_path)
return True, None
if "c" in cache_mode and timestamp_to_int(entry.cmtime) != st.st_ctime_ns:
files_cache_logger.debug("KNOWN-CHANGED: file ctime has changed: %r", hashed_path)
return True, None
elif "m" in cache_mode and timestamp_to_int(entry.cmtime) != st.st_mtime_ns:
files_cache_logger.debug("KNOWN-CHANGED: file mtime has changed: %r", hashed_path)
return True, None
# we ignored the inode number in the comparison above or it is still same.
# if it is still the same, replacing it in the tuple doesn't change it.
# if we ignored it, a reason for doing that is that files were moved to a new
# disk / new fs (so a one-time change of inode number is expected) and we wanted
# to avoid everything getting chunked again. to be able to re-enable the inode
# number comparison in a future backup run (and avoid chunking everything
# again at that time), we need to update the inode number in the cache with what
# we see in the filesystem.
self.files[path_hash] = msgpack.packb(entry._replace(inode=st.st_ino, age=0))
return True, entry.chunk_ids
def memorize_file(self, hashed_path, path_hash, st, ids): self.path = cache_dir(self.repository, path)
if not stat.S_ISREG(st.st_mode): self.security_manager = SecurityManager(self.repository)
self.cache_config = CacheConfig(self.repository, self.path, lock_wait)
# Warn user before sending data to a never seen before unencrypted repository
if not os.path.exists(self.path):
self.security_manager.assert_access_unknown(warn_if_unencrypted, manifest, self.key)
self.create()
self.open()
try:
self.security_manager.assert_secure(manifest, self.key, cache_config=self.cache_config)
if not self.check_cache_compatibility():
self.wipe_cache()
self.update_compatibility()
except: # noqa
self.close()
raise
def __enter__(self):
return self
def __exit__(self, exc_type, exc_val, exc_tb):
self.close()
def create(self):
"""Create a new empty cache at `self.path`"""
os.makedirs(self.path)
with open(os.path.join(self.path, "README"), "w") as fd:
fd.write(CACHE_README)
self.cache_config.create()
self._create_empty_files_cache(self.path)
def _do_open(self):
self.cache_config.load()
self.chunks = self._load_chunks_from_repo()
self._read_files_cache()
def open(self):
if not os.path.isdir(self.path):
raise Exception("%s Does not look like a Borg cache" % self.path)
self.cache_config.open()
self.rollback()
def close(self):
if self.cache_config is not None:
self.cache_config.close()
self.cache_config = None
def begin_txn(self):
# Initialize transaction snapshot
pi = ProgressIndicatorMessage(msgid="cache.begin_transaction")
txn_dir = os.path.join(self.path, "txn.tmp")
os.mkdir(txn_dir)
pi.output("Initializing cache transaction: Reading config")
shutil.copy(os.path.join(self.path, "config"), txn_dir)
pi.output("Initializing cache transaction: Reading files")
try:
shutil.copy(os.path.join(self.path, self.files_cache_name()), txn_dir)
except FileNotFoundError:
self._create_empty_files_cache(txn_dir)
os.replace(txn_dir, os.path.join(self.path, "txn.active"))
pi.finish()
self._txn_active = True
def commit(self):
if not self._txn_active:
return return
cache_mode = self.cache_mode self.security_manager.save(self.manifest, self.key)
# note: r(echunk) modes will update the files cache, d(isabled) mode won't pi = ProgressIndicatorMessage(msgid="cache.commit")
if "d" in cache_mode: if self.files is not None:
files_cache_logger.debug("FILES-CACHE-NOUPDATE: files cache disabled") pi.output("Saving files cache")
return integrity_data = self._write_files_cache()
if "c" in cache_mode: self.cache_config.integrity[self.files_cache_name()] = integrity_data
cmtime_type = "ctime" pi.output("Saving cache config")
cmtime_ns = safe_ns(st.st_ctime_ns) self.cache_config.save(self.manifest, self.key)
elif "m" in cache_mode: os.replace(os.path.join(self.path, "txn.active"), os.path.join(self.path, "txn.tmp"))
cmtime_type = "mtime" shutil.rmtree(os.path.join(self.path, "txn.tmp"))
cmtime_ns = safe_ns(st.st_mtime_ns) self._txn_active = False
else: # neither 'c' nor 'm' in cache_mode, avoid UnboundLocalError pi.finish()
cmtime_type = "ctime"
cmtime_ns = safe_ns(st.st_ctime_ns) def rollback(self):
entry = FileCacheEntry( # Remove partial transaction
age=0, inode=st.st_ino, size=st.st_size, cmtime=int_to_timestamp(cmtime_ns), chunk_ids=ids if os.path.exists(os.path.join(self.path, "txn.tmp")):
) shutil.rmtree(os.path.join(self.path, "txn.tmp"))
self.files[path_hash] = msgpack.packb(entry) # Roll back active transaction
self._newest_cmtime = max(self._newest_cmtime or 0, cmtime_ns) txn_dir = os.path.join(self.path, "txn.active")
files_cache_logger.debug( if os.path.exists(txn_dir):
"FILES-CACHE-UPDATE: put %r [has %s] <- %r", shutil.copy(os.path.join(txn_dir, "config"), self.path)
entry._replace(chunk_ids="[%d entries]" % len(entry.chunk_ids)), shutil.copy(os.path.join(txn_dir, self.discover_files_cache_name(txn_dir)), self.path)
cmtime_type, txn_tmp = os.path.join(self.path, "txn.tmp")
hashed_path, os.replace(txn_dir, txn_tmp)
) if os.path.exists(txn_tmp):
shutil.rmtree(txn_tmp)
self._txn_active = False
self._do_open()
def check_cache_compatibility(self):
my_features = Manifest.SUPPORTED_REPO_FEATURES
if self.cache_config.ignored_features & my_features:
# The cache might not contain references of chunks that need a feature that is mandatory for some operation
# and which this version supports. To avoid corruption while executing that operation force rebuild.
return False
if not self.cache_config.mandatory_features <= my_features:
# The cache was build with consideration to at least one feature that this version does not understand.
# This client might misinterpret the cache. Thus force a rebuild.
return False
return True
def wipe_cache(self):
logger.warning("Discarding incompatible cache and forcing a cache rebuild")
self.chunks = ChunkIndex()
self._create_empty_files_cache(self.path)
self.cache_config.manifest_id = ""
self.cache_config._config.set("cache", "manifest", "")
self.cache_config.ignored_features = set()
self.cache_config.mandatory_features = set()
def update_compatibility(self):
operation_to_features_map = self.manifest.get_all_mandatory_features()
my_features = Manifest.SUPPORTED_REPO_FEATURES
repo_features = set()
for operation, features in operation_to_features_map.items():
repo_features.update(features)
self.cache_config.ignored_features.update(repo_features - my_features)
self.cache_config.mandatory_features.update(repo_features & my_features)
class AdHocCache(CacheStatsMixin): class AdHocCache(CacheStatsMixin, ChunksMixin):
""" """
Ad-hoc, non-persistent cache. Ad-hoc, non-persistent cache.
@@ -1135,59 +1392,9 @@ Chunk index: {0.total_unique_chunks:20d} unknown"""
files_cache_logger.debug("UNKNOWN: files cache not implemented") files_cache_logger.debug("UNKNOWN: files cache not implemented")
return False, None return False, None
def memorize_file(self, hashed_path, path_hash, st, ids): def memorize_file(self, hashed_path, path_hash, st, chunks):
pass pass
def add_chunk(self, id, meta, data, *, stats, wait=True, compress=True, size=None, ro_type=ROBJ_FILE_STREAM):
assert ro_type is not None
if not self._txn_active:
self.begin_txn()
if size is None and compress:
size = len(data) # data is still uncompressed
if size is None:
raise ValueError("when giving compressed data for a chunk, the uncompressed size must be given also")
refcount = self.seen_chunk(id, size)
if refcount:
return self.chunk_incref(id, stats, size=size)
cdata = self.repo_objs.format(id, meta, data, compress=compress, ro_type=ro_type)
self.repository.put(id, cdata, wait=wait)
self.chunks.add(id, 1, size)
stats.update(size, not refcount)
return ChunkListEntry(id, size)
def seen_chunk(self, id, size=None):
if not self._txn_active:
self.begin_txn()
entry = self.chunks.get(id, ChunkIndexEntry(0, None))
if entry.refcount and size and not entry.size:
# The LocalCache has existing size information and uses *size* to make an effort at detecting collisions.
# This is of course not possible for the AdHocCache.
# Here *size* is used to update the chunk's size information, which will be zero for existing chunks.
self.chunks[id] = entry._replace(size=size)
return entry.refcount
def chunk_incref(self, id, stats, size=None):
if not self._txn_active:
self.begin_txn()
count, _size = self.chunks.incref(id)
# When _size is 0 and size is not given, then this chunk has not been locally visited yet (seen_chunk with
# size or add_chunk); we can't add references to those (size=0 is invalid) and generally don't try to.
size = _size or size
assert size
stats.update(size, False)
return ChunkListEntry(id, size)
def chunk_decref(self, id, stats, wait=True):
if not self._txn_active:
self.begin_txn()
count, size = self.chunks.decref(id)
if count == 0:
del self.chunks[id]
self.repository.delete(id, wait=wait)
stats.update(-size, True)
else:
stats.update(-size, False)
def commit(self): def commit(self):
if not self._txn_active: if not self._txn_active:
return return
@@ -1200,41 +1407,4 @@ Chunk index: {0.total_unique_chunks:20d} unknown"""
def begin_txn(self): def begin_txn(self):
self._txn_active = True self._txn_active = True
# Explicitly set the initial usable hash table capacity to avoid performance issues self.chunks = self._load_chunks_from_repo()
# due to hash table "resonance".
# Since we're creating an archive, add 10 % from the start.
num_chunks = len(self.repository)
self.chunks = ChunkIndex(usable=num_chunks * 1.1)
pi = ProgressIndicatorPercent(
total=num_chunks, msg="Downloading chunk list... %3.0f%%", msgid="cache.download_chunks"
)
t0 = perf_counter()
num_requests = 0
marker = None
while True:
result = self.repository.list(limit=LIST_SCAN_LIMIT, marker=marker)
num_requests += 1
if not result:
break
pi.show(increase=len(result))
marker = result[-1]
# All chunks from the repository have a refcount of MAX_VALUE, which is sticky,
# therefore we can't/won't delete them. Chunks we added ourselves in this transaction
# (e.g. checkpoint archives) are tracked correctly.
init_entry = ChunkIndexEntry(refcount=ChunkIndex.MAX_VALUE, size=0)
for id_ in result:
self.chunks[id_] = init_entry
assert len(self.chunks) == num_chunks
# LocalCache does not contain the manifest, either.
del self.chunks[self.manifest.MANIFEST_ID]
duration = perf_counter() - t0 or 0.01
pi.finish()
logger.debug(
"AdHocCache: downloaded %d chunk IDs in %.2f s (%d requests), ~%s/s",
num_chunks,
duration,
num_requests,
format_file_size(num_chunks * 34 / duration),
)
# Chunk IDs in a list are encoded in 34 bytes: 1 byte msgpack header, 1 byte length, 32 ID bytes.
# Protocol overhead is neglected in this calculation.

View File

@@ -1184,13 +1184,13 @@ class BorgJsonEncoder(json.JSONEncoder):
from ..repository import Repository from ..repository import Repository
from ..remote import RemoteRepository from ..remote import RemoteRepository
from ..archive import Archive from ..archive import Archive
from ..cache import LocalCache, AdHocCache from ..cache import LocalCache, AdHocCache, NewCache
if isinstance(o, Repository) or isinstance(o, RemoteRepository): if isinstance(o, Repository) or isinstance(o, RemoteRepository):
return {"id": bin_to_hex(o.id), "location": o._location.canonical_path()} return {"id": bin_to_hex(o.id), "location": o._location.canonical_path()}
if isinstance(o, Archive): if isinstance(o, Archive):
return o.info() return o.info()
if isinstance(o, LocalCache): if isinstance(o, (LocalCache, NewCache)):
return {"path": o.path, "stats": o.stats()} return {"path": o.path, "stats": o.stats()}
if isinstance(o, AdHocCache): if isinstance(o, AdHocCache):
return {"stats": o.stats()} return {"stats": o.stats()}

View File

@@ -100,7 +100,7 @@ def format_timedelta(td):
s = ts % 60 s = ts % 60
m = int(ts / 60) % 60 m = int(ts / 60) % 60
h = int(ts / 3600) % 24 h = int(ts / 3600) % 24
txt = "%.2f seconds" % s txt = "%.3f seconds" % s
if m: if m:
txt = "%d minutes %s" % (m, txt) txt = "%d minutes %s" % (m, txt)
if h: if h:
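
The hunk shows only part of format_timedelta(); a self-contained sketch of the visible formatting logic (days handling omitted), enough to reproduce the new millisecond output:

from datetime import timedelta

def format_timedelta_sketch(td: timedelta) -> str:
    ts = td.total_seconds()
    s = ts % 60
    m = int(ts / 60) % 60
    h = int(ts / 3600) % 24
    txt = "%.3f seconds" % s  # three decimals now, i.e. millisecond granularity
    if m:
        txt = "%d minutes %s" % (m, txt)
    if h:
        txt = "%d hours %s" % (h, txt)
    return txt

print(format_timedelta_sketch(timedelta(hours=2, seconds=1.1)))  # "2 hours 1.100 seconds"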


@@ -61,8 +61,8 @@ def test_stats_format(stats):
Number of files: 1
Original size: 20 B
Deduplicated size: 20 B
-Time spent in hashing: 0.00 seconds
-Time spent in chunking: 0.00 seconds
+Time spent in hashing: 0.000 seconds
+Time spent in chunking: 0.000 seconds
Added files: 0
Unchanged files: 0
Modified files: 0


@@ -338,10 +338,11 @@ def test_extra_chunks(archivers, request):
    with Repository(archiver.repository_location, exclusive=True) as repository:
        repository.put(b"01234567890123456789012345678901", b"xxxx")
        repository.commit(compact=False)
-    cmd(archiver, "check", exit_code=1)
-    cmd(archiver, "check", exit_code=1)
+    output = cmd(archiver, "check", "-v", exit_code=0)  # orphans are not considered warnings anymore
+    assert "1 orphaned (unused) objects found." in output
    cmd(archiver, "check", "--repair", exit_code=0)
-    cmd(archiver, "check", exit_code=0)
+    output = cmd(archiver, "check", "-v", exit_code=0)
+    assert "orphaned (unused) objects found." not in output
    cmd(archiver, "extract", "archive1", "--dry-run", exit_code=0)


@@ -277,6 +277,7 @@ def test_unknown_mandatory_feature_in_cache(archivers, request):
            repository._location = Location(archiver.repository_location)
        manifest = Manifest.load(repository, Manifest.NO_OPERATION_CHECK)
        with Cache(repository, manifest) as cache:
+            is_localcache = isinstance(cache, LocalCache)
            cache.begin_txn()
            cache.cache_config.mandatory_features = {"unknown-feature"}
            cache.commit()

@@ -295,7 +296,8 @@ def test_unknown_mandatory_feature_in_cache(archivers, request):
        with patch.object(LocalCache, "wipe_cache", wipe_wrapper):
            cmd(archiver, "create", "test", "input")
-        assert called
+        if is_localcache:
+            assert called

    with Repository(archiver.repository_path, exclusive=True) as repository:
        if remote_repo:


@@ -32,7 +32,7 @@ def test_check_corrupted_repository(archiver):
def corrupt_archiver(archiver):
    create_test_files(archiver.input_path)
    cmd(archiver, "rcreate", RK_ENCRYPTION)
-    archiver.cache_path = json.loads(cmd(archiver, "rinfo", "--json"))["cache"]["path"]
+    archiver.cache_path = json.loads(cmd(archiver, "rinfo", "--json"))["cache"].get("path")


def corrupt(file, amount=1):

@@ -45,7 +45,14 @@ def corrupt(file, amount=1):
def test_cache_chunks(archiver):
    corrupt_archiver(archiver)
-    corrupt(os.path.join(archiver.cache_path, "chunks"))
+    if archiver.cache_path is None:
+        pytest.skip("no cache path for this kind of Cache implementation")
+    chunks_index = os.path.join(archiver.cache_path, "chunks")
+    if not os.path.exists(chunks_index):
+        pytest.skip("Only works with LocalCache.")
+    corrupt(chunks_index)
    if archiver.FORK_DEFAULT:
        out = cmd(archiver, "rinfo", exit_code=2)
        assert "failed integrity check" in out

@@ -56,6 +63,9 @@ def test_cache_chunks(archiver):
def test_cache_files(archiver):
    corrupt_archiver(archiver)
+    if archiver.cache_path is None:
+        pytest.skip("no cache path for this kind of Cache implementation")
    cmd(archiver, "create", "test", "input")
    corrupt(os.path.join(archiver.cache_path, "files"))
    out = cmd(archiver, "create", "test1", "input")

@@ -65,6 +75,9 @@ def test_chunks_archive(archiver):
def test_chunks_archive(archiver):
    corrupt_archiver(archiver)
+    if archiver.cache_path is None:
+        pytest.skip("no cache path for this kind of Cache implementation")
    cmd(archiver, "create", "test1", "input")
    # Find ID of test1, so we can corrupt it later :)
    target_id = cmd(archiver, "rlist", "--format={id}{NL}").strip()

@@ -75,6 +88,8 @@ def test_chunks_archive(archiver):
    cmd(archiver, "rinfo", "--json")
    chunks_archive = os.path.join(archiver.cache_path, "chunks.archive.d")
+    if not os.path.exists(chunks_archive):
+        pytest.skip("Only LocalCache has a per-archive chunks index cache.")
    assert len(os.listdir(chunks_archive)) == 4  # two archives, one chunks cache and one .integrity file each
    corrupt(os.path.join(chunks_archive, target_id + ".compact"))

@@ -96,6 +111,9 @@ def test_chunks_archive(archiver):
def test_old_version_interfered(archiver):
    corrupt_archiver(archiver)
+    if archiver.cache_path is None:
+        pytest.skip("no cache path for this kind of Cache implementation")
    # Modify the main manifest ID without touching the manifest ID in the integrity section.
    # This happens if a version without integrity checking modifies the cache.
    config_path = os.path.join(archiver.cache_path, "config")
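
The corrupt() helper's body is not shown in this diff; a rough sketch of what such a helper could do (byte-flip a small region of the file so integrity checks trip), purely for illustration and not the real test helper:

def corrupt_sketch(path: str, amount: int = 1, offset: int = 0) -> None:
    """Flip `amount` bytes at `offset` in `path` (illustrative only)."""
    with open(path, "r+b") as f:
        f.seek(offset)
        data = f.read(amount)
        f.seek(offset)
        f.write(bytes(b ^ 0xFF for b in data))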


@@ -168,7 +168,12 @@ def test_debug_refcount_obj(archivers, request):
    create_json = json.loads(cmd(archiver, "create", "--json", "test", "input"))
    archive_id = create_json["archive"]["id"]
    output = cmd(archiver, "debug", "refcount-obj", archive_id).strip()
-    assert output == f"object {archive_id} has 1 referrers [info from chunks cache]."
+    # LocalCache does precise refcounting, so we'll get 1 reference for the archive.
+    # AdHocCache or NewCache doesn't, we'll get ChunkIndex.MAX_VALUE as refcount.
+    assert (
+        output == f"object {archive_id} has 1 referrers [info from chunks cache]."
+        or output == f"object {archive_id} has 4294966271 referrers [info from chunks cache]."
+    )

    # Invalid IDs do not abort or return an error
    output = cmd(archiver, "debug", "refcount-obj", "124", "xyza").strip()
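
The literal 4294966271 in the new assertion is presumably ChunkIndex.MAX_VALUE: assuming the refcount is a 32-bit counter whose topmost 1025 values are reserved by the hash index, the number falls out directly:

MAX_VALUE = 2**32 - 1025  # assumed layout: top 1025 counter values reserved
assert MAX_VALUE == 4294966271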


@@ -189,7 +189,7 @@ class TestAdHocCache:
    def test_does_not_delete_existing_chunks(self, repository, cache):
        assert cache.seen_chunk(H(1)) == ChunkIndex.MAX_VALUE
-        cache.chunk_decref(H(1), Statistics())
+        cache.chunk_decref(H(1), 1, Statistics())
        assert repository.get(H(1)) == b"1234"

    def test_seen_chunk_add_chunk_size(self, cache):

@@ -199,7 +199,7 @@ class TestAdHocCache:
        """E.g. checkpoint archives"""
        cache.add_chunk(H(5), {}, b"1010", stats=Statistics())
        assert cache.seen_chunk(H(5)) == 1
-        cache.chunk_decref(H(5), Statistics())
+        cache.chunk_decref(H(5), 1, Statistics())
        assert not cache.seen_chunk(H(5))
        with pytest.raises(Repository.ObjectNotFound):
            repository.get(H(5))

@@ -220,9 +220,9 @@ class TestAdHocCache:
    def test_incref_after_add_chunk(self, cache):
        assert cache.add_chunk(H(3), {}, b"5678", stats=Statistics()) == (H(3), 4)
-        assert cache.chunk_incref(H(3), Statistics()) == (H(3), 4)
+        assert cache.chunk_incref(H(3), 4, Statistics()) == (H(3), 4)

    def test_existing_incref_after_add_chunk(self, cache):
        """This case occurs with part files, see Archive.chunk_file."""
        assert cache.add_chunk(H(1), {}, b"5678", stats=Statistics()) == (H(1), 4)
-        assert cache.chunk_incref(H(1), Statistics()) == (H(1), 4)
+        assert cache.chunk_incref(H(1), 4, Statistics()) == (H(1), 4)
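
The updated assertions pin down the refcount semantics of caches without a persistent chunks index: pre-existing repository chunks carry a sticky MAX_VALUE refcount and are never deleted on decref, while chunks added in the current transaction are counted precisely. A toy model of exactly that behavior (not borg code; MAX_VALUE assumed as above):

MAX_VALUE = 2**32 - 1025  # assumed sentinel for "refcount unknown/sticky"

class ToyChunkIndex:
    def __init__(self):
        self.refs = {}  # id -> (refcount, size)

    def add(self, id_, size):
        count, _ = self.refs.get(id_, (0, size))
        if count != MAX_VALUE:
            count += 1
        self.refs[id_] = (count, size)

    def decref(self, id_):
        count, size = self.refs[id_]
        if count == MAX_VALUE:  # sticky: pre-existing repo chunks are kept
            return count, size
        count -= 1
        if count == 0:
            del self.refs[id_]  # chunks we added ourselves may be deleted
        else:
            self.refs[id_] = (count, size)
        return count, size

idx = ToyChunkIndex()
idx.refs[b"old"] = (MAX_VALUE, 1)          # chunk that already existed in the repository
idx.add(b"new", 4)                         # chunk added during this transaction
assert idx.decref(b"old")[0] == MAX_VALUE  # not deleted, refcount stays sticky
assert idx.decref(b"new")[0] == 0          # dropped once its real refcount hits zero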


@@ -368,7 +368,7 @@ def test_text_invalid(text):
def test_format_timedelta():
    t0 = datetime(2001, 1, 1, 10, 20, 3, 0)
    t1 = datetime(2001, 1, 1, 12, 20, 4, 100000)
-    assert format_timedelta(t1 - t0) == "2 hours 1.10 seconds"
+    assert format_timedelta(t1 - t0) == "2 hours 1.100 seconds"


@pytest.mark.parametrize(


@@ -84,8 +84,8 @@ class UpgraderFrom12To20:
        chunks, chunks_healthy = self.hlm.retrieve(id=hlid, default=(None, None))
        if chunks is not None:
            item.chunks = chunks
-            for chunk_id, _ in chunks:
-                self.cache.chunk_incref(chunk_id, self.archive.stats)
+            for chunk_id, chunk_size in chunks:
+                self.cache.chunk_incref(chunk_id, chunk_size, self.archive.stats)
        if chunks_healthy is not None:
            item.chunks_healthy = chunks
        del item.source  # not used for hardlinks any more, replaced by hlid