mirror of https://github.com/borgbackup/borg.git
1213 lines
53 KiB
Python
1213 lines
53 KiB
Python
import configparser
|
|
import os
|
|
import shutil
|
|
import stat
|
|
from binascii import unhexlify
|
|
from collections import namedtuple
|
|
from time import perf_counter
|
|
|
|
from .logger import create_logger
|
|
|
|
logger = create_logger()
|
|
|
|
files_cache_logger = create_logger("borg.debug.files_cache")
|
|
|
|
from .constants import CACHE_README, FILES_CACHE_MODE_DISABLED
|
|
from .hashindex import ChunkIndex, ChunkIndexEntry, CacheSynchronizer
|
|
from .helpers import Location
|
|
from .helpers import Error
|
|
from .helpers import get_cache_dir, get_security_dir
|
|
from .helpers import bin_to_hex, parse_stringified_list
|
|
from .helpers import format_file_size
|
|
from .helpers import safe_ns
|
|
from .helpers import yes
|
|
from .helpers import remove_surrogates
|
|
from .helpers import ProgressIndicatorPercent, ProgressIndicatorMessage
|
|
from .helpers import set_ec, EXIT_WARNING
|
|
from .helpers import safe_unlink
|
|
from .helpers import msgpack
|
|
from .helpers.msgpack import int_to_timestamp, timestamp_to_int
|
|
from .item import ArchiveItem, ChunkListEntry
|
|
from .crypto.key import PlaintextKey
|
|
from .crypto.file_integrity import IntegrityCheckedFile, DetachedIntegrityCheckedFile, FileIntegrityError
|
|
from .locking import Lock
|
|
from .manifest import Manifest
|
|
from .platform import SaveFile
|
|
from .remote import cache_if_remote
|
|
from .repository import LIST_SCAN_LIMIT
|
|
|
|
# note: cmtime might be either a ctime or an mtime timestamp
|
|
# One files cache record; field order defines the serialized layout.
FileCacheEntry = namedtuple("FileCacheEntry", ["age", "inode", "size", "cmtime", "chunk_ids"])
|
|
|
|
|
|
class SecurityManager:
    """
    Tracks repositories. Ensures that nothing bad happens (repository swaps,
    replay attacks, unknown repositories etc.).

    This is complicated by the Cache being initially used for this, while
    only some commands actually use the Cache, which meant that other commands
    did not perform these checks.

    Further complications were created by the Cache being a cache, so it
    could be legitimately deleted, which is annoying because Borg didn't
    recognize repositories after that.

    Therefore a second location, the security database (see get_security_dir),
    was introduced which stores this information. However, this means that
    the code has to deal with a cache existing but no security DB entry,
    or inconsistencies between the security DB and the cache which have to
    be reconciled, and also with no cache existing but a security DB entry.
    """

    def __init__(self, repository):
        """Compute the security dir and the paths of its three state files for ``repository``."""
        self.repository = repository
        self.dir = get_security_dir(repository.id_str, legacy=(repository.version == 1))
        self.cache_dir = cache_dir(repository)
        self.key_type_file = os.path.join(self.dir, "key-type")
        self.location_file = os.path.join(self.dir, "location")
        self.manifest_ts_file = os.path.join(self.dir, "manifest-timestamp")

    @staticmethod
    def destroy(repository, path=None):
        """destroy the security dir for ``repository`` or at ``path``"""
        path = path or get_security_dir(repository.id_str, legacy=(repository.version == 1))
        if os.path.exists(path):
            shutil.rmtree(path)

    def known(self):
        """Return True if the key type, location and manifest timestamp files all exist."""
        return all(os.path.exists(f) for f in (self.key_type_file, self.location_file, self.manifest_ts_file))

    def key_matches(self, key):
        """Return True if the stored key type equals ``str(key.TYPE)``.

        Returns False if the repository is not known yet, if the stored key
        type differs, or if the key type file can not be read (in the latter
        case a warning is logged).
        """
        if not self.known():
            return False
        try:
            with open(self.key_type_file) as fd:
                stored_key_type = fd.read()
        except OSError as exc:
            logger.warning("Could not read/parse key type file: %s", exc)
            # an unreadable key type can not be confirmed, so it does not match
            return False
        return stored_key_type == str(key.TYPE)

    def save(self, manifest, key):
        """Write current repository location, key type and manifest timestamp to the security dir."""
        logger.debug("security: saving state for %s to %s", self.repository.id_str, self.dir)
        current_location = self.repository._location.canonical_path()
        logger.debug("security: current location   %s", current_location)
        logger.debug("security: key type           %s", str(key.TYPE))
        logger.debug("security: manifest timestamp %s", manifest.timestamp)
        with SaveFile(self.location_file) as fd:
            fd.write(current_location)
        with SaveFile(self.key_type_file) as fd:
            fd.write(str(key.TYPE))
        with SaveFile(self.manifest_ts_file) as fd:
            fd.write(manifest.timestamp)

    def assert_location_matches(self, cache_config=None):
        """Ask for confirmation if the repository location differs from the remembered one.

        Raises Cache.RepositoryAccessAborted if the user does not confirm.
        If the new location is accepted, it is written to the security dir
        and, if ``cache_config`` is given, the cache config is saved as well.
        """
        # Warn user before sending data to a relocated repository
        try:
            with open(self.location_file) as fd:
                previous_location = fd.read()
            logger.debug("security: read previous location %r", previous_location)
        except FileNotFoundError:
            logger.debug("security: previous location file %s not found", self.location_file)
            previous_location = None
        except OSError as exc:
            logger.warning("Could not read previous location file: %s", exc)
            previous_location = None
        if cache_config and cache_config.previous_location and previous_location != cache_config.previous_location:
            # Reconcile cache and security dir; we take the cache location.
            previous_location = cache_config.previous_location
            logger.debug("security: using previous_location of cache: %r", previous_location)

        repository_location = self.repository._location.canonical_path()
        if previous_location and previous_location != repository_location:
            msg = (
                "Warning: The repository at location {} was previously located at {}\n".format(
                    repository_location, previous_location
                )
                + "Do you want to continue? [yN] "
            )
            if not yes(
                msg,
                false_msg="Aborting.",
                invalid_msg="Invalid answer, aborting.",
                retry=False,
                env_var_override="BORG_RELOCATED_REPO_ACCESS_IS_OK",
            ):
                raise Cache.RepositoryAccessAborted()
            # adapt on-disk config immediately if the new location was accepted
            logger.debug("security: updating location stored in cache and security dir")
            with SaveFile(self.location_file) as fd:
                fd.write(repository_location)
            if cache_config:
                cache_config.save()

    def assert_no_manifest_replay(self, manifest, key, cache_config=None):
        """Raise if the remembered manifest timestamp is newer than ``manifest.timestamp``.

        The remembered timestamp is the maximum of the one in the security dir
        and, if ``cache_config`` is given, the one in the cache config.
        Raises Cache.RepositoryIDNotUnique for a PlaintextKey, otherwise
        Cache.RepositoryReplay.
        """
        try:
            with open(self.manifest_ts_file) as fd:
                timestamp = fd.read()
            logger.debug("security: read manifest timestamp %r", timestamp)
        except FileNotFoundError:
            logger.debug("security: manifest timestamp file %s not found", self.manifest_ts_file)
            timestamp = ""
        except OSError as exc:
            logger.warning("Could not read manifest timestamp file: %s", exc)
            timestamp = ""
        if cache_config:
            # timestamps are compared as strings
            timestamp = max(timestamp, cache_config.timestamp or "")
        logger.debug("security: determined newest manifest timestamp as %s", timestamp)
        # If repository is older than the cache or security dir something fishy is going on
        if timestamp and timestamp > manifest.timestamp:
            if isinstance(key, PlaintextKey):
                raise Cache.RepositoryIDNotUnique()
            else:
                raise Cache.RepositoryReplay()

    def assert_key_type(self, key, cache_config=None):
        """Raise Cache.EncryptionMethodMismatch if the key type differs from the remembered one."""
        # Make sure an encrypted repository has not been swapped for an unencrypted repository
        if cache_config and cache_config.key_type is not None and cache_config.key_type != str(key.TYPE):
            raise Cache.EncryptionMethodMismatch()
        if self.known() and not self.key_matches(key):
            raise Cache.EncryptionMethodMismatch()

    def assert_secure(self, manifest, key, *, cache_config=None, warn_if_unencrypted=True, lock_wait=None):
        """Run all security checks for the repository.

        If no ``cache_config`` is given, a CacheConfig for the repository is
        created and, if its config file exists, opened for the duration of
        the checks.
        """
        # warn_if_unencrypted=False is only used for initializing a new repository.
        # Thus, avoiding asking about a repository that's currently initializing.
        self.assert_access_unknown(warn_if_unencrypted, manifest, key)
        if cache_config:
            self._assert_secure(manifest, key, cache_config)
        else:
            cache_config = CacheConfig(self.repository, lock_wait=lock_wait)
            if cache_config.exists():
                with cache_config:
                    self._assert_secure(manifest, key, cache_config)
            else:
                self._assert_secure(manifest, key)
        logger.debug("security: repository checks ok, allowing access")

    def _assert_secure(self, manifest, key, cache_config=None):
        """Check location, key type and manifest timestamp; remember the repository if it is unknown."""
        self.assert_location_matches(cache_config)
        self.assert_key_type(key, cache_config)
        self.assert_no_manifest_replay(manifest, key, cache_config)
        if not self.known():
            logger.debug("security: remembering previously unknown repository")
            self.save(manifest, key)

    def assert_access_unknown(self, warn_if_unencrypted, manifest, key):
        """Ask before accessing a previously unknown, not logically encrypted repository.

        If access is allowed (or ``warn_if_unencrypted`` is False), the
        repository state is saved; otherwise Cache.CacheInitAbortedError is raised.
        """
        # warn_if_unencrypted=False is only used for initializing a new repository.
        # Thus, avoiding asking about a repository that's currently initializing.
        if not key.logically_encrypted and not self.known():
            msg = (
                "Warning: Attempting to access a previously unknown unencrypted repository!\n"
                + "Do you want to continue? [yN] "
            )
            allow_access = not warn_if_unencrypted or yes(
                msg,
                false_msg="Aborting.",
                invalid_msg="Invalid answer, aborting.",
                retry=False,
                env_var_override="BORG_UNKNOWN_UNENCRYPTED_REPO_ACCESS_IS_OK",
            )
            if allow_access:
                if warn_if_unencrypted:
                    logger.debug("security: remembering unknown unencrypted repository (explicitly allowed)")
                else:
                    logger.debug("security: initializing unencrypted repository")
                self.save(manifest, key)
            else:
                raise Cache.CacheInitAbortedError()
|
|
|
|
|
|
def assert_secure(repository, manifest, lock_wait):
    """Run the SecurityManager checks for ``repository`` using the key of ``manifest``."""
    SecurityManager(repository).assert_secure(manifest, manifest.key, lock_wait=lock_wait)
|
|
|
|
|
|
def recanonicalize_relative_location(cache_location, repository):
    """Return the location to treat as the previous location of ``repository``.

    If ``cache_location`` differs from the repository's canonical location
    only by a leading "/~/" instead of "/./" in the path, the repository's
    canonical location is returned; otherwise ``cache_location`` is returned
    unchanged.
    """
    # borg < 1.0.8rc1 had different canonicalization for the repo location (see #1655 and #1741).
    current_location = repository._location.canonical_path()
    current = Location(current_location)
    cached = Location(cache_location)

    same_endpoint = all(
        getattr(cached, attr) == getattr(current, attr) for attr in ("proto", "user", "host", "port")
    )
    if not same_endpoint:
        return cache_location
    if not (cached.path and current.path):
        return cache_location
    if cached.path.startswith("/~/") and current.path.startswith("/./") and cached.path[3:] == current.path[3:]:
        # everything is same except the expected change in relative path canonicalization,
        # update previous_location to avoid warning / user query about changed location:
        return current_location
    return cache_location
|
|
|
|
|
|
def cache_dir(repository, path=None):
    """Return ``path`` if given, else the directory named after the repository id below get_cache_dir()."""
    if path:
        return path
    return os.path.join(get_cache_dir(), repository.id_str)
|
|
|
|
|
|
def files_cache_name():
    """Return the files cache file name: "files", or "files.<suffix>" if BORG_FILES_CACHE_SUFFIX is set."""
    suffix = os.environ.get("BORG_FILES_CACHE_SUFFIX", "")
    if not suffix:
        return "files"
    return f"files.{suffix}"
|
|
|
|
|
|
def discover_files_cache_name(path):
    """Return the first entry of ``path`` named "files" or starting with "files.".

    Raises IndexError if there is no such entry.
    """
    candidates = [name for name in os.listdir(path) if name == "files" or name.startswith("files.")]
    return candidates[0]
|
|
|
|
|
|
class CacheConfig:
    """Access to the ``config`` file inside a repository's cache directory.

    Used as a context manager, entering acquires the exclusive cache lock and
    loads the config; leaving releases the lock.
    """

    def __init__(self, repository, path=None, lock_wait=None):
        """Remember the cache location for ``repository``; nothing is read or locked yet.

        :param path: cache directory to use instead of the default one
        :param lock_wait: passed as ``timeout`` to the cache Lock
        """
        self.repository = repository
        self.path = cache_dir(repository, path)
        logger.debug("Using %s as cache", self.path)
        self.config_path = os.path.join(self.path, "config")
        self.lock = None
        self.lock_wait = lock_wait

    def __enter__(self):
        self.open()
        return self

    def __exit__(self, exc_type, exc_val, exc_tb):
        self.close()

    def exists(self):
        """Return True if the cache config file exists."""
        return os.path.exists(self.config_path)

    def create(self):
        """Write a fresh version 1 config file; the config must not exist yet."""
        assert not self.exists()
        config = configparser.ConfigParser(interpolation=None)
        config.add_section("cache")
        config.set("cache", "version", "1")
        config.set("cache", "repository", self.repository.id_str)
        config.set("cache", "manifest", "")
        config.add_section("integrity")
        config.set("integrity", "manifest", "")
        with SaveFile(self.config_path) as fd:
            config.write(fd)

    def open(self):
        """Acquire the exclusive cache lock and load the config.

        If loading fails, the lock is released again before the exception is
        re-raised; otherwise a failing ``with cache_config:`` would leave the
        lock behind, because ``__exit__`` does not run when ``__enter__`` raises.
        """
        self.lock = Lock(os.path.join(self.path, "lock"), exclusive=True, timeout=self.lock_wait).acquire()
        try:
            self.load()
        except BaseException:
            # close() is a no-op if the lock was already released (e.g. by _check_upgrade)
            self.close()
            raise

    def load(self):
        """Read the config file and populate the attributes derived from it."""
        self._config = configparser.ConfigParser(interpolation=None)
        with open(self.config_path) as fd:
            self._config.read_file(fd)
        self._check_upgrade(self.config_path)
        self.id = self._config.get("cache", "repository")
        self.manifest_id = unhexlify(self._config.get("cache", "manifest"))
        self.timestamp = self._config.get("cache", "timestamp", fallback=None)
        self.key_type = self._config.get("cache", "key_type", fallback=None)
        self.ignored_features = set(parse_stringified_list(self._config.get("cache", "ignored_features", fallback="")))
        self.mandatory_features = set(
            parse_stringified_list(self._config.get("cache", "mandatory_features", fallback=""))
        )
        try:
            self.integrity = dict(self._config.items("integrity"))
            if self._config.get("cache", "manifest") != self.integrity.pop("manifest"):
                # The cache config file is updated (parsed with ConfigParser, the state of the ConfigParser
                # is modified and then written out.), not re-created.
                # Thus, older versions will leave our [integrity] section alone, making the section's data invalid.
                # Therefore, we also add the manifest ID to this section and
                # can discern whether an older version interfered by comparing the manifest IDs of this section
                # and the main [cache] section.
                self.integrity = {}
                logger.warning("Cache integrity data not available: old Borg version modified the cache.")
        except configparser.NoSectionError:
            logger.debug("Cache integrity: No integrity data found (files, chunks). Cache is from old version.")
            self.integrity = {}
        previous_location = self._config.get("cache", "previous_location", fallback=None)
        if previous_location:
            self.previous_location = recanonicalize_relative_location(previous_location, self.repository)
        else:
            self.previous_location = None
        # the in-memory config always carries the current location, so the next save() persists it
        self._config.set("cache", "previous_location", self.repository._location.canonical_path())

    def save(self, manifest=None, key=None):
        """Write the config file.

        If ``manifest`` is given, manifest id/timestamp, feature sets and
        integrity data are updated first; if ``key`` is given, the key type
        is updated first.
        """
        if manifest:
            self._config.set("cache", "manifest", manifest.id_str)
            self._config.set("cache", "timestamp", manifest.timestamp)
            self._config.set("cache", "ignored_features", ",".join(self.ignored_features))
            self._config.set("cache", "mandatory_features", ",".join(self.mandatory_features))
            if not self._config.has_section("integrity"):
                self._config.add_section("integrity")
            for file, integrity_data in self.integrity.items():
                self._config.set("integrity", file, integrity_data)
            # see load(): used to detect whether the [integrity] section matches the [cache] section
            self._config.set("integrity", "manifest", manifest.id_str)
        if key:
            self._config.set("cache", "key_type", str(key.TYPE))
        with SaveFile(self.config_path) as fd:
            self._config.write(fd)

    def close(self):
        """Release the cache lock, if it is held. Safe to call more than once."""
        if self.lock is not None:
            self.lock.release()
            self.lock = None

    def _check_upgrade(self, config_path):
        """Raise (after releasing the lock) if the loaded config is not a version 1 cache config."""
        try:
            cache_version = self._config.getint("cache", "version")
            wanted_version = 1
            if cache_version != wanted_version:
                self.close()
                raise Exception(
                    "%s has unexpected cache version %d (wanted: %d)." % (config_path, cache_version, wanted_version)
                )
        except configparser.NoSectionError:
            self.close()
            raise Exception("%s does not look like a Borg cache." % config_path) from None
|
|
|
|
|
|
class Cache:
    """Client Side cache"""

    class RepositoryIDNotUnique(Error):
        """Cache is newer than repository - do you have multiple, independently updated repos with same ID?"""

    class RepositoryReplay(Error):
        """Cache, or information obtained from the security directory is newer than repository - this is either an attack or unsafe (multiple repos with same ID)"""

    class CacheInitAbortedError(Error):
        """Cache initialization aborted"""

    class RepositoryAccessAborted(Error):
        """Repository access aborted"""

    class EncryptionMethodMismatch(Error):
        """Repository encryption method changed since last access, refusing to continue"""

    @staticmethod
    def break_lock(repository, path=None):
        # forcibly remove the lock of the cache for ``repository`` or at ``path``
        lock_path = os.path.join(cache_dir(repository, path), "lock")
        Lock(lock_path, exclusive=True).break_lock()

    @staticmethod
    def destroy(repository, path=None):
        """destroy the cache for ``repository`` or at ``path``"""
        path = cache_dir(repository, path)
        config = os.path.join(path, "config")
        if not os.path.exists(config):
            # without a config file nothing is removed
            return
        os.remove(config)  # kill config first
        shutil.rmtree(path)

    def __new__(
        cls,
        repository,
        manifest,
        path=None,
        sync=True,
        warn_if_unencrypted=True,
        progress=False,
        lock_wait=None,
        permit_adhoc_cache=False,
        cache_mode=FILES_CACHE_MODE_DISABLED,
        iec=False,
    ):
        # Factory: returns a LocalCache or, if permitted and the local cache
        # is missing or out of sync, an AdHocCache.
        def make_local_cache():
            local_kwargs = dict(
                manifest=manifest,
                path=path,
                sync=sync,
                warn_if_unencrypted=warn_if_unencrypted,
                progress=progress,
                iec=iec,
                lock_wait=lock_wait,
                cache_mode=cache_mode,
            )
            return LocalCache(**local_kwargs)

        if not permit_adhoc_cache:
            return make_local_cache()

        # ad-hoc cache may be permitted, but if the local cache is in sync it'd be stupid to invalidate
        # it by needlessly using the ad-hoc cache.
        # Check if the local cache exists and is in sync.
        cache_config = CacheConfig(repository, path, lock_wait)
        cache_in_sync = False
        if cache_config.exists():
            with cache_config:
                cache_in_sync = cache_config.manifest_id == manifest.id
        # Don't nest cache locks: the config lock is released before a LocalCache is created.
        if cache_in_sync:
            # Local cache is in sync, use it
            logger.debug("Cache: choosing local cache (in sync)")
            return make_local_cache()
        logger.debug("Cache: choosing ad-hoc cache (local cache does not exist or is not in sync)")
        return AdHocCache(manifest=manifest, lock_wait=lock_wait, iec=iec)
|
|
|
|
|
|
class CacheStatsMixin:
    """Mixin providing summary statistics; relies on ``self.chunks`` and ``self.manifest`` of the host class."""

    Summary = namedtuple("Summary", ["total_size", "unique_size", "total_unique_chunks", "total_chunks"])

    str_format = """\
Original size: {0.total_size}
Deduplicated size: {0.unique_size}
Unique chunks: {0.total_unique_chunks}
Total chunks: {0.total_chunks}
"""

    def __init__(self, iec=False):
        # iec is handed to format_file_size() when sizes are formatted
        self.iec = iec

    def __str__(self):
        summary = self.format_tuple()
        return self.str_format.format(summary)

    def stats(self):
        """Return the Summary fields as a dict, with total_size summed up over all archives."""
        from .archive import Archive

        # XXX: this should really be moved down to `hashindex.pyx`
        _, unique_size, total_unique_chunks, total_chunks = self.chunks.summarize()
        # since borg 1.2 we have new archive metadata telling the total size per archive,
        # so we can just sum up all archives to get the "all archives" stats:
        total_size = sum(
            Archive(self.manifest, archive_name).calc_stats(self, want_unique=False).osize
            for archive_name in self.manifest.archives
        )
        summary = self.Summary(total_size, unique_size, total_unique_chunks, total_chunks)
        return summary._asdict()

    def format_tuple(self):
        """Return a Summary whose two size fields are formatted via format_file_size()."""
        values = self.stats()
        for size_field in ("total_size", "unique_size"):
            values[size_field] = format_file_size(values[size_field], iec=self.iec)
        return self.Summary(**values)
|
|
|
|
|
|
class LocalCache(CacheStatsMixin):
|
|
"""
|
|
Persistent, local (client-side) cache.
|
|
"""
|
|
|
|
def __init__(
    self,
    manifest,
    path=None,
    sync=True,
    warn_if_unencrypted=True,
    progress=False,
    lock_wait=None,
    cache_mode=FILES_CACHE_MODE_DISABLED,
    iec=False,
):
    """
    Open the cache (creating it first if its directory does not exist) and run the security checks.

    :param warn_if_unencrypted: print warning if accessing unknown unencrypted repository
    :param lock_wait: timeout for lock acquisition (int [s] or None [wait forever])
    :param sync: do :meth:`.sync`
    :param cache_mode: what shall be compared in the file stat infos vs. cached stat infos comparison
    """
    CacheStatsMixin.__init__(self, iec=iec)
    assert isinstance(manifest, Manifest)
    self.manifest = manifest
    self.repository = manifest.repository
    self.key = manifest.key
    self.repo_objs = manifest.repo_objs
    self.progress = progress
    self.cache_mode = cache_mode
    self.timestamp = None
    self.txn_active = False

    self.path = cache_dir(self.repository, path)
    self.security_manager = SecurityManager(self.repository)
    self.cache_config = CacheConfig(self.repository, self.path, lock_wait)

    cache_missing = not os.path.exists(self.path)
    if cache_missing:
        # Warn user before sending data to a never seen before unencrypted repository
        self.security_manager.assert_access_unknown(warn_if_unencrypted, manifest, self.key)
        self.create()

    self.open()
    try:
        self.security_manager.assert_secure(manifest, self.key, cache_config=self.cache_config)

        if not self.check_cache_compatibility():
            self.wipe_cache()

        self.update_compatibility()

        if sync and self.manifest.id != self.cache_config.manifest_id:
            self.sync()
            self.commit()
    except BaseException:
        # whatever went wrong (including interrupts): release the cache lock, then re-raise
        self.close()
        raise
|
|
|
|
def __enter__(self):
    """Context manager entry: the cache itself is the managed object."""
    return self
|
|
|
|
def __exit__(self, exc_type, exc_val, exc_tb):
    """Context manager exit: close the cache; exceptions are not suppressed."""
    self.close()
|
|
|
|
def create(self):
    """Create a new empty cache at `self.path`"""
    cache_path = self.path
    os.makedirs(cache_path)
    readme_path = os.path.join(cache_path, "README")
    with open(readme_path, "w") as readme:
        readme.write(CACHE_README)
    self.cache_config.create()
    # empty chunks index and the directory for the per-archive chunk indexes
    ChunkIndex().write(os.path.join(cache_path, "chunks"))
    os.makedirs(os.path.join(cache_path, "chunks.archive.d"))
    files_cache_path = os.path.join(cache_path, files_cache_name())
    with SaveFile(files_cache_path, binary=True):
        pass  # empty file
|
|
|
|
def _do_open(self):
    """Load the cache config, the chunks index and (unless disabled by cache_mode) the files cache."""
    self.cache_config.load()
    chunks_path = os.path.join(self.path, "chunks")
    chunks_integrity = self.cache_config.integrity.get("chunks")
    with IntegrityCheckedFile(path=chunks_path, write=False, integrity_data=chunks_integrity) as fd:
        self.chunks = ChunkIndex.read(fd)
    if "d" not in self.cache_mode:
        self._read_files()
    else:  # d(isabled)
        self.files = None
|
|
|
|
def open(self):
    """Lock the cache, roll back any unfinished transaction and load the cache contents.

    Raises Exception if `self.path` is not a directory.
    If rolling back / loading fails after the cache lock was acquired, the
    lock is released again before the exception is re-raised, so a failed
    open does not leave the cache locked.
    """
    if not os.path.isdir(self.path):
        raise Exception("%s Does not look like a Borg cache" % self.path)
    self.cache_config.open()
    try:
        self.rollback()
    except BaseException:
        # undo exactly what was done above: give the cache lock back
        self.cache_config.close()
        raise
|
|
|
|
def close(self):
    """Close the cache config (releasing its lock) and drop the reference; later calls do nothing."""
    config = self.cache_config
    if config is None:
        return
    config.close()
    self.cache_config = None
|
|
|
|
def _read_files(self):
    """Load the files cache into `self.files`, incrementing the age of every entry.

    If the files cache can not be read, is invalid or fails the integrity
    check, a warning is logged and `self.files` is left empty.
    """
    self.files = {}
    self._newest_cmtime = None
    logger.debug("Reading files cache ...")
    files_cache_logger.debug("FILES-CACHE-LOAD: starting...")
    problem = None
    try:
        cache_name = files_cache_name()
        with IntegrityCheckedFile(
            path=os.path.join(self.path, cache_name),
            write=False,
            integrity_data=self.cache_config.integrity.get(cache_name),
        ) as fd:
            unpacker = msgpack.Unpacker(use_list=True)
            while True:
                block = fd.read(64 * 1024)
                if not block:
                    break
                unpacker.feed(block)
                try:
                    for path_hash, raw_entry in unpacker:
                        entry = FileCacheEntry(*raw_entry)
                        aged_entry = entry._replace(age=entry.age + 1)
                        # in the end, this takes about 240 Bytes per file
                        self.files[path_hash] = msgpack.packb(aged_entry)
                except (TypeError, ValueError) as exc:
                    problem = "The files cache seems invalid. [%s]" % str(exc)
                    break
    except OSError as exc:
        problem = "The files cache can't be read. [%s]" % str(exc)
    except FileIntegrityError as fie:
        problem = "The files cache is corrupted. [%s]" % str(fie)
    if problem is not None:
        logger.warning(problem)
        logger.warning("Continuing without files cache - expect lower performance.")
        # discard whatever was loaded before the problem showed up
        self.files = {}
    files_cache_logger.debug("FILES-CACHE-LOAD: finished, %d entries loaded.", len(self.files))
|
|
|
|
def begin_txn(self):
    """Start a transaction by snapshotting config, chunks and files cache into "txn.active"."""
    # Initialize transaction snapshot
    pi = ProgressIndicatorMessage(msgid="cache.begin_transaction")
    snapshot_dir = os.path.join(self.path, "txn.tmp")
    os.mkdir(snapshot_dir)
    for message, name in (
        ("Initializing cache transaction: Reading config", "config"),
        ("Initializing cache transaction: Reading chunks", "chunks"),
    ):
        pi.output(message)
        shutil.copy(os.path.join(self.path, name), snapshot_dir)
    pi.output("Initializing cache transaction: Reading files")
    cache_name = files_cache_name()
    try:
        shutil.copy(os.path.join(self.path, cache_name), snapshot_dir)
    except FileNotFoundError:
        # no files cache yet: snapshot an empty one
        with SaveFile(os.path.join(snapshot_dir, cache_name), binary=True):
            pass  # empty file
    # the snapshot only becomes "active" once it is complete
    os.replace(snapshot_dir, os.path.join(self.path, "txn.active"))
    self.txn_active = True
    pi.finish()
|
|
|
|
def commit(self):
    """Commit transaction"""
    if not self.txn_active:
        return
    self.security_manager.save(self.manifest, self.key)
    pi = ProgressIndicatorMessage(msgid="cache.commit")
    if self.files is not None:
        if self._newest_cmtime is None:
            # was never set because no files were modified/added
            self._newest_cmtime = 2**63 - 1  # nanoseconds, good until y2262
        ttl = int(os.environ.get("BORG_FILES_CACHE_TTL", 20))
        pi.output("Saving files cache")
        files_cache_logger.debug("FILES-CACHE-SAVE: starting...")
        cache_name = files_cache_name()
        with IntegrityCheckedFile(path=os.path.join(self.path, cache_name), write=True) as fd:
            entry_count = 0
            for path_hash, packed_entry in self.files.items():
                # Only keep files seen in this backup that are older than newest cmtime seen in this backup -
                # this is to avoid issues with filesystem snapshots and cmtime granularity.
                # Also keep files from older backups that have not reached BORG_FILES_CACHE_TTL yet.
                entry = FileCacheEntry(*msgpack.unpackb(packed_entry))
                if entry.age == 0:
                    keep = timestamp_to_int(entry.cmtime) < self._newest_cmtime
                else:
                    keep = 0 < entry.age < ttl
                if keep:
                    msgpack.pack((path_hash, entry), fd)
                    entry_count += 1
        files_cache_logger.debug("FILES-CACHE-KILL: removed all old entries with age >= TTL [%d]", ttl)
        files_cache_logger.debug(
            "FILES-CACHE-KILL: removed all current entries with newest cmtime %d", self._newest_cmtime
        )
        files_cache_logger.debug("FILES-CACHE-SAVE: finished, %d remaining entries saved.", entry_count)
        # read only after the file has been closed
        self.cache_config.integrity[cache_name] = fd.integrity_data
    pi.output("Saving chunks cache")
    with IntegrityCheckedFile(path=os.path.join(self.path, "chunks"), write=True) as fd:
        self.chunks.write(fd)
    self.cache_config.integrity["chunks"] = fd.integrity_data
    pi.output("Saving cache config")
    self.cache_config.save(self.manifest, self.key)
    # retire the snapshot: rename it first, then delete it
    retired_dir = os.path.join(self.path, "txn.tmp")
    os.replace(os.path.join(self.path, "txn.active"), retired_dir)
    shutil.rmtree(retired_dir)
    self.txn_active = False
    pi.finish()
|
|
|
|
def rollback(self):
    """Roll back partial and aborted transactions"""
    tmp_dir = os.path.join(self.path, "txn.tmp")
    active_dir = os.path.join(self.path, "txn.active")
    # Remove partial transaction
    if os.path.exists(tmp_dir):
        shutil.rmtree(tmp_dir)
    # Roll back active transaction
    if os.path.exists(active_dir):
        # restore the snapshotted files over the current ones
        for name in ("config", "chunks", discover_files_cache_name(active_dir)):
            shutil.copy(os.path.join(active_dir, name), self.path)
        os.replace(active_dir, tmp_dir)
        if os.path.exists(tmp_dir):
            shutil.rmtree(tmp_dir)
    self.txn_active = False
    self._do_open()
|
|
|
|
def sync(self):
|
|
"""Re-synchronize chunks cache with repository.
|
|
|
|
Maintains a directory with known backup archive indexes, so it only
|
|
needs to fetch infos from repo and build a chunk index once per backup
|
|
archive.
|
|
If out of sync, missing archive indexes get added, outdated indexes
|
|
get removed and a new master chunks index is built by merging all
|
|
archive indexes.
|
|
"""
|
|
archive_path = os.path.join(self.path, "chunks.archive.d")
|
|
# Instrumentation
|
|
processed_item_metadata_bytes = 0
|
|
processed_item_metadata_chunks = 0
|
|
compact_chunks_archive_saved_space = 0
|
|
|
|
def mkpath(id, suffix=""):
|
|
id_hex = bin_to_hex(id)
|
|
path = os.path.join(archive_path, id_hex + suffix)
|
|
return path
|
|
|
|
def cached_archives():
|
|
if self.do_cache:
|
|
fns = os.listdir(archive_path)
|
|
# filenames with 64 hex digits == 256bit,
|
|
# or compact indices which are 64 hex digits + ".compact"
|
|
return {unhexlify(fn) for fn in fns if len(fn) == 64} | {
|
|
unhexlify(fn[:64]) for fn in fns if len(fn) == 72 and fn.endswith(".compact")
|
|
}
|
|
else:
|
|
return set()
|
|
|
|
def repo_archives():
|
|
return {info.id for info in self.manifest.archives.list()}
|
|
|
|
def cleanup_outdated(ids):
|
|
for id in ids:
|
|
cleanup_cached_archive(id)
|
|
|
|
def cleanup_cached_archive(id, cleanup_compact=True):
|
|
try:
|
|
os.unlink(mkpath(id))
|
|
os.unlink(mkpath(id) + ".integrity")
|
|
except FileNotFoundError:
|
|
pass
|
|
if not cleanup_compact:
|
|
return
|
|
try:
|
|
os.unlink(mkpath(id, suffix=".compact"))
|
|
os.unlink(mkpath(id, suffix=".compact") + ".integrity")
|
|
except FileNotFoundError:
|
|
pass
|
|
|
|
def fetch_and_build_idx(archive_id, decrypted_repository, chunk_idx):
|
|
nonlocal processed_item_metadata_bytes
|
|
nonlocal processed_item_metadata_chunks
|
|
csize, data = decrypted_repository.get(archive_id)
|
|
chunk_idx.add(archive_id, 1, len(data))
|
|
archive = ArchiveItem(internal_dict=msgpack.unpackb(data))
|
|
if archive.version not in (1, 2): # legacy
|
|
raise Exception("Unknown archive metadata version")
|
|
if archive.version == 1:
|
|
items = archive.items
|
|
elif archive.version == 2:
|
|
items = []
|
|
for chunk_id, (csize, data) in zip(archive.item_ptrs, decrypted_repository.get_many(archive.item_ptrs)):
|
|
chunk_idx.add(chunk_id, 1, len(data))
|
|
ids = msgpack.unpackb(data)
|
|
items.extend(ids)
|
|
sync = CacheSynchronizer(chunk_idx)
|
|
for item_id, (csize, data) in zip(items, decrypted_repository.get_many(items)):
|
|
chunk_idx.add(item_id, 1, len(data))
|
|
processed_item_metadata_bytes += len(data)
|
|
processed_item_metadata_chunks += 1
|
|
sync.feed(data)
|
|
if self.do_cache:
|
|
write_archive_index(archive_id, chunk_idx)
|
|
|
|
def write_archive_index(archive_id, chunk_idx):
|
|
nonlocal compact_chunks_archive_saved_space
|
|
compact_chunks_archive_saved_space += chunk_idx.compact()
|
|
fn = mkpath(archive_id, suffix=".compact")
|
|
fn_tmp = mkpath(archive_id, suffix=".tmp")
|
|
try:
|
|
with DetachedIntegrityCheckedFile(
|
|
path=fn_tmp, write=True, filename=bin_to_hex(archive_id) + ".compact"
|
|
) as fd:
|
|
chunk_idx.write(fd)
|
|
except Exception:
|
|
safe_unlink(fn_tmp)
|
|
else:
|
|
os.replace(fn_tmp, fn)
|
|
|
|
def read_archive_index(archive_id, archive_name):
|
|
archive_chunk_idx_path = mkpath(archive_id)
|
|
logger.info("Reading cached archive chunk index for %s", archive_name)
|
|
try:
|
|
try:
|
|
# Attempt to load compact index first
|
|
with DetachedIntegrityCheckedFile(path=archive_chunk_idx_path + ".compact", write=False) as fd:
|
|
archive_chunk_idx = ChunkIndex.read(fd, permit_compact=True)
|
|
# In case a non-compact index exists, delete it.
|
|
cleanup_cached_archive(archive_id, cleanup_compact=False)
|
|
# Compact index read - return index, no conversion necessary (below).
|
|
return archive_chunk_idx
|
|
except FileNotFoundError:
|
|
# No compact index found, load non-compact index, and convert below.
|
|
with DetachedIntegrityCheckedFile(path=archive_chunk_idx_path, write=False) as fd:
|
|
archive_chunk_idx = ChunkIndex.read(fd)
|
|
except FileIntegrityError as fie:
|
|
logger.error("Cached archive chunk index of %s is corrupted: %s", archive_name, fie)
|
|
# Delete corrupted index, set warning. A new index must be build.
|
|
cleanup_cached_archive(archive_id)
|
|
set_ec(EXIT_WARNING)
|
|
return None
|
|
|
|
# Convert to compact index. Delete the existing index first.
|
|
logger.debug("Found non-compact index for %s, converting to compact.", archive_name)
|
|
cleanup_cached_archive(archive_id)
|
|
write_archive_index(archive_id, archive_chunk_idx)
|
|
return archive_chunk_idx
|
|
|
|
def get_archive_ids_to_names(archive_ids):
|
|
# Pass once over all archives and build a mapping from ids to names.
|
|
# The easier approach, doing a similar loop for each archive, has
|
|
# square complexity and does about a dozen million functions calls
|
|
# with 1100 archives (which takes 30s CPU seconds _alone_).
|
|
archive_names = {}
|
|
for info in self.manifest.archives.list():
|
|
if info.id in archive_ids:
|
|
archive_names[info.id] = info.name
|
|
assert len(archive_names) == len(archive_ids)
|
|
return archive_names
|
|
|
|
def create_master_idx(chunk_idx):
    """
    Rebuild the master chunks index from the chunk indexes of all archives in the repository.

    With self.do_cache set, each archive contributes a separate index (read from the cached
    archive indexes or fetched and built) which is merged into the master index. Otherwise
    every archive is fetched directly into the master index.

    :param chunk_idx: the current master index; it is cleared before anything else happens
    :return: the rebuilt master index; if there are no archives, this is the cleared *chunk_idx*
    """
    logger.debug("Synchronizing chunks index...")
    cached_ids = cached_archives()
    archive_ids = repo_archives()
    stale_ids = cached_ids - archive_ids
    logger.info(
        "Cached archive chunk indexes: %d fresh, %d stale, %d need fetching.",
        len(archive_ids & cached_ids),
        len(stale_ids),
        len(archive_ids - cached_ids),
    )
    # deallocates old hashindex, creates empty hashindex:
    chunk_idx.clear()
    cleanup_outdated(stale_ids)
    # Explicitly set the usable initial hash table capacity to avoid performance issues
    # due to hash table "resonance".
    master_index_capacity = len(self.repository)
    if not archive_ids:
        # Nothing to process: hand back the (now empty) index we were given.
        logger.debug("Chunks index sync done.")
        return chunk_idx

    chunk_idx = ChunkIndex(usable=master_index_capacity) if self.do_cache else None
    pi = ProgressIndicatorPercent(
        total=len(archive_ids),
        step=0.1,
        msg="%3.0f%% Syncing chunks index. Processing archive %s.",
        msgid="cache.sync",
    )
    for archive_id, archive_name in get_archive_ids_to_names(archive_ids).items():
        pi.show(info=[remove_surrogates(archive_name)])  # legacy. borg2 always has pure unicode arch names.
        if not self.do_cache:
            # No per-archive indexes: fetch straight into the master index.
            chunk_idx = chunk_idx or ChunkIndex(usable=master_index_capacity)
            logger.info("Fetching archive index for %s.", archive_name)
            fetch_and_build_idx(archive_id, decrypted_repository, chunk_idx)
            continue
        if archive_id in cached_ids:
            archive_chunk_idx = read_archive_index(archive_id, archive_name)
            if archive_chunk_idx is None:
                # The cached index was unusable, treat this archive as not cached.
                cached_ids.remove(archive_id)
        if archive_id not in cached_ids:
            # Do not make this an else branch; *archive_id* may just have been removed
            # from *cached_ids* above because its cached index could not be read.
            logger.info("Fetching and building archive index for %s.", archive_name)
            archive_chunk_idx = ChunkIndex()
            fetch_and_build_idx(archive_id, decrypted_repository, archive_chunk_idx)
        logger.debug("Merging into master chunks index.")
        chunk_idx.merge(archive_chunk_idx)
    pi.finish()
    logger.debug(
        "Chunks index sync: processed %s (%d chunks) of metadata.",
        format_file_size(processed_item_metadata_bytes),
        processed_item_metadata_chunks,
    )
    logger.debug(
        "Chunks index sync: compact chunks.archive.d storage saved %s bytes.",
        format_file_size(compact_chunks_archive_saved_space),
    )
    logger.debug("Chunks index sync done.")
    return chunk_idx
|
|
|
|
# The cache can be used by a command that e.g. only checks against Manifest.Operation.WRITE,
|
|
# which does not have to include all flags from Manifest.Operation.READ.
|
|
# Since the sync will attempt to read archives, check compatibility with Manifest.Operation.READ.
|
|
self.manifest.check_repository_compatibility((Manifest.Operation.READ,))
|
|
|
|
self.begin_txn()
|
|
with cache_if_remote(self.repository, decrypted_cache=self.repo_objs) as decrypted_repository:
|
|
# TEMPORARY HACK: to avoid archive index caching, create a FILE named ~/.cache/borg/REPOID/chunks.archive.d -
|
|
# this is only recommended if you have a fast, low latency connection to your repo (e.g. if repo is local disk)
|
|
self.do_cache = os.path.isdir(archive_path)
|
|
self.chunks = create_master_idx(self.chunks)
|
|
|
|
def check_cache_compatibility(self):
    """
    Tell whether the existing cache can be used by this client version.

    :return: False if the cache has to be discarded and rebuilt, True otherwise
    """
    supported = Manifest.SUPPORTED_REPO_FEATURES
    config = self.cache_config
    if config.ignored_features & supported:
        # The cache might not contain references of chunks that need a feature that is mandatory
        # for some operation and which this version supports. To avoid corruption while executing
        # that operation, force a rebuild.
        return False
    # If the cache was built with consideration to at least one feature that this version does
    # not understand, this client might misinterpret the cache: force a rebuild in that case, too.
    return config.mandatory_features <= supported
|
|
|
|
def wipe_cache(self):
    """
    Throw away all cached state so that the cache gets rebuilt.

    Empties the chunks.archive.d directory (if it is a directory), replaces the chunks index
    by a fresh one, truncates the files cache and resets the manifest id and the feature
    sets stored in the cache config.
    """
    logger.warning("Discarding incompatible cache and forcing a cache rebuild")
    archive_path = os.path.join(self.path, "chunks.archive.d")
    if os.path.isdir(archive_path):
        # Only recreate it if it was a directory before.
        shutil.rmtree(archive_path)
        os.makedirs(archive_path)
    self.chunks = ChunkIndex()
    files_cache_path = os.path.join(self.path, files_cache_name())
    with SaveFile(files_cache_path, binary=True):
        pass  # empty file

    config = self.cache_config
    config.manifest_id = ""
    config._config.set("cache", "manifest", "")
    config.ignored_features = set()
    config.mandatory_features = set()
|
|
|
|
def update_compatibility(self):
    """
    Record in the cache config which repository features were honoured or ignored.

    All mandatory features of all operations of the manifest are collected; those this
    version supports are added to the cache config's mandatory_features, the others to
    its ignored_features.
    """
    features_by_operation = self.manifest.get_all_mandatory_features()
    supported = Manifest.SUPPORTED_REPO_FEATURES
    # The operations themselves do not matter here, only the union of their features.
    repo_features = set().union(*features_by_operation.values())

    config = self.cache_config
    config.ignored_features.update(repo_features - supported)
    config.mandatory_features.update(repo_features & supported)
|
|
|
|
def add_chunk(
    self, id, meta, data, *, stats, overwrite=False, wait=True, compress=True, size=None, ctype=None, clevel=None
):
    """
    Add a chunk to the repository and the chunks index, or reference an already known one.

    :param id: chunk id
    :param meta: chunk metadata, handed to self.repo_objs.format()
    :param data: chunk data; treated as uncompressed if *compress* is true
    :param stats: statistics object, updated with the chunk size
    :param overwrite: store the chunk even if the chunks index already knows its id
    :param wait: handed to self.repository.put()
    :param compress: handed to self.repo_objs.format(); if true and *size* is not given,
                     len(data) is used as size
    :param size: uncompressed size of the chunk
    :param ctype: handed to self.repo_objs.format()
    :param clevel: handed to self.repo_objs.format()
    :return: ChunkListEntry(id, size) for a stored chunk, else the result of chunk_incref()
    :raises ValueError: if a chunk needs to be stored, but its size is not known
    """
    if not self.txn_active:
        self.begin_txn()
    if compress and size is None:
        size = len(data)  # data is still uncompressed
    refcount = self.seen_chunk(id, size)
    is_new = not refcount
    if not (is_new or overwrite):
        # Known chunk that must not be overwritten: adding a reference is enough.
        return self.chunk_incref(id, stats)
    if size is None:
        raise ValueError("when giving compressed data for a new chunk, the uncompressed size must be given also")
    cdata = self.repo_objs.format(id, meta, data, compress=compress, size=size, ctype=ctype, clevel=clevel)
    self.repository.put(id, cdata, wait=wait)
    self.chunks.add(id, 1, size)
    stats.update(size, is_new)
    return ChunkListEntry(id, size)
|
|
|
|
def seen_chunk(self, id, size=None):
    """
    Return the reference count the chunks index has for a chunk id (0 if it is unknown).

    :param id: chunk id
    :param size: if given, it is compared against the size stored in the chunks index
    :return: reference count of the chunk
    :raises Exception: if both sizes are known, but differ
    """
    refcount, stored_size = self.chunks.get(id, ChunkIndexEntry(0, None))
    if size is None or stored_size is None or size == stored_size:
        return refcount
    # we already have a chunk with that id, but different size.
    # this is either a hash collision (unlikely) or corruption or a bug.
    raise Exception(
        "chunk has same id [%r], but different size (stored: %d new: %d)!" % (id, stored_size, size)
    )
|
|
|
|
def chunk_incref(self, id, stats, size=None):
    """
    Add one reference to a chunk in the chunks index.

    :param id: chunk id
    :param stats: statistics object, updated with the size stored in the chunks index
    :param size: accepted, but not used by this implementation
    :return: ChunkListEntry(id, size as stored in the chunks index)
    """
    if not self.txn_active:
        self.begin_txn()
    _refcount, stored_size = self.chunks.incref(id)
    stats.update(stored_size, False)
    return ChunkListEntry(id, stored_size)
|
|
|
|
def chunk_decref(self, id, stats, wait=True):
    """
    Drop one reference to a chunk; delete the chunk when no reference is left.

    :param id: chunk id
    :param stats: statistics object, updated with the negated chunk size
    :param wait: handed to self.repository.delete()
    """
    if not self.txn_active:
        self.begin_txn()
    remaining, size = self.chunks.decref(id)
    last_reference = remaining == 0
    if last_reference:
        # Nobody references this chunk any more: forget it and remove it from the repository.
        del self.chunks[id]
        self.repository.delete(id, wait=wait)
    stats.update(-size, last_reference)
|
|
|
|
def file_known_and_unchanged(self, hashed_path, path_hash, st):
    """
    Look up a file in the files cache and tell whether it is unchanged.

    "Known" means there is a files cache entry for path_hash, "unchanged" means that
    size / inode number / cmtime are the same, as far as self.cache_mode asks to check them.

    :param hashed_path: the file's path as we gave it to hash(hashed_path)
    :param path_hash: hash(hashed_path), to save some memory in the files cache
    :param st: the file's stat() result
    :return: known, ids (known is True if we have infos about this file in the cache,
             ids is the list of chunk ids IF the file has not changed, otherwise None).
    """
    if not stat.S_ISREG(st.st_mode):
        return False, None
    cache_mode = self.cache_mode
    if "d" in cache_mode:  # d(isabled)
        files_cache_logger.debug("UNKNOWN: files cache disabled")
        return False, None
    # note: r(echunk) does not need the files cache in this method, but the files cache will
    # be updated and saved to disk to memorize the files. To preserve previous generations in
    # the cache, this means that it also needs to get loaded from disk first.
    if "r" in cache_mode:  # r(echunk)
        files_cache_logger.debug("UNKNOWN: rechunking enforced")
        return False, None
    packed_entry = self.files.get(path_hash)
    if not packed_entry:
        files_cache_logger.debug("UNKNOWN: no file metadata in cache for: %r", hashed_path)
        return False, None
    # we know the file!
    entry = FileCacheEntry(*msgpack.unpackb(packed_entry))
    # The first enabled check that fails decides which message gets logged.
    change_msg = None
    if "s" in cache_mode and entry.size != st.st_size:
        change_msg = "KNOWN-CHANGED: file size has changed: %r"
    elif "i" in cache_mode and entry.inode != st.st_ino:
        change_msg = "KNOWN-CHANGED: file inode number has changed: %r"
    elif "c" in cache_mode and timestamp_to_int(entry.cmtime) != st.st_ctime_ns:
        change_msg = "KNOWN-CHANGED: file ctime has changed: %r"
    elif "m" in cache_mode and timestamp_to_int(entry.cmtime) != st.st_mtime_ns:
        change_msg = "KNOWN-CHANGED: file mtime has changed: %r"
    if change_msg is not None:
        files_cache_logger.debug(change_msg, hashed_path)
        return True, None
    # we ignored the inode number in the comparison above or it is still same.
    # if it is still the same, replacing it in the tuple doesn't change it.
    # if we ignored it, a reason for doing that is that files were moved to a new
    # disk / new fs (so a one-time change of inode number is expected) and we wanted
    # to avoid everything getting chunked again. to be able to re-enable the inode
    # number comparison in a future backup run (and avoid chunking everything
    # again at that time), we need to update the inode number in the cache with what
    # we see in the filesystem.
    refreshed_entry = entry._replace(inode=st.st_ino, age=0)
    self.files[path_hash] = msgpack.packb(refreshed_entry)
    return True, entry.chunk_ids
|
|
|
|
def memorize_file(self, hashed_path, path_hash, st, ids):
    """
    Store (or refresh) the files cache entry of a regular file.

    Nothing is stored for non-regular files or if the cache mode contains d(isabled).

    :param hashed_path: the file's path as we gave it to hash(hashed_path), used for logging only
    :param path_hash: hash(hashed_path), the key of the files cache entry
    :param st: the file's stat() result
    :param ids: the list of chunk ids of the file
    """
    if not stat.S_ISREG(st.st_mode):
        return
    cache_mode = self.cache_mode
    # note: r(echunk) modes will update the files cache, d(isabled) mode won't
    if "d" in cache_mode:
        files_cache_logger.debug("FILES-CACHE-NOUPDATE: files cache disabled")
        return
    if "c" in cache_mode:
        cmtime_type = "ctime"
        cmtime_ns = safe_ns(st.st_ctime_ns)
    elif "m" in cache_mode:
        cmtime_type = "mtime"
        cmtime_ns = safe_ns(st.st_mtime_ns)
    else:
        # Neither c(time) nor m(time) is part of the cache mode. The entry still needs a
        # timestamp, so fall back to ctime instead of failing with an UnboundLocalError.
        cmtime_type = "ctime"
        cmtime_ns = safe_ns(st.st_ctime_ns)
    entry = FileCacheEntry(
        age=0, inode=st.st_ino, size=st.st_size, cmtime=int_to_timestamp(cmtime_ns), chunk_ids=ids
    )
    self.files[path_hash] = msgpack.packb(entry)
    # Keep track of the newest timestamp memorized so far.
    self._newest_cmtime = max(self._newest_cmtime or 0, cmtime_ns)
    files_cache_logger.debug(
        "FILES-CACHE-UPDATE: put %r [has %s] <- %r",
        entry._replace(chunk_ids="[%d entries]" % len(entry.chunk_ids)),
        cmtime_type,
        hashed_path,
    )
|
|
|
|
|
|
class AdHocCache(CacheStatsMixin):
    """
    Ad-hoc, non-persistent cache.

    Compared to the standard LocalCache the AdHocCache does not maintain accurate reference count,
    nor does it provide a files cache (which would require persistence). Chunks that were not added
    during the current AdHocCache lifetime won't have correct size set (0 bytes) and will
    have an infinite reference count (MAX_VALUE).
    """

    # NOTE(review): the column alignment whitespace inside this template looks like it was
    # collapsed in this copy of the file - confirm against the upstream source.
    str_format = """\
All archives: unknown unknown unknown

Unique chunks Total chunks
Chunk index: {0.total_unique_chunks:20d} unknown"""

    def __init__(self, manifest, warn_if_unencrypted=True, lock_wait=None, iec=False):
        """
        :param manifest: the Manifest of the repository to work on
        :param warn_if_unencrypted: accepted, but not used by this constructor
        :param lock_wait: handed to SecurityManager.assert_secure()
        :param iec: handed to CacheStatsMixin.__init__()
        """
        CacheStatsMixin.__init__(self, iec=iec)
        assert isinstance(manifest, Manifest)
        self.manifest = manifest
        self.repository = manifest.repository
        self.key = manifest.key
        self.repo_objs = manifest.repo_objs
        self._txn_active = False

        self.security_manager = SecurityManager(self.repository)
        self.security_manager.assert_secure(manifest, self.key, lock_wait=lock_wait)

        logger.warning("Note: --no-cache-sync is an experimental feature.")

    # Public API

    def __enter__(self):
        return self

    def __exit__(self, exc_type, exc_val, exc_tb):
        pass

    # There is no files cache here, so it is reported as d(isabled).
    files = None  # type: ignore
    cache_mode = "d"

    def file_known_and_unchanged(self, hashed_path, path_hash, st):
        """Always report the file as unknown: (False, None)."""
        files_cache_logger.debug("UNKNOWN: files cache not implemented")
        return False, None

    def memorize_file(self, hashed_path, path_hash, st, ids):
        """Do nothing, there is no files cache to update."""
        pass

    def add_chunk(
        self, id, meta, data, *, stats, overwrite=False, wait=True, compress=True, size=None, ctype=None, clevel=None
    ):
        """
        Add a chunk to the repository and the chunks index, or reference an already known one.

        :param id: chunk id
        :param meta: chunk metadata, handed to self.repo_objs.format()
        :param data: chunk data; treated as uncompressed if *compress* is true
        :param stats: statistics object, updated with the chunk size
        :param overwrite: must be false, overwriting is not supported here
        :param wait: handed to self.repository.put()
        :param compress: handed to self.repo_objs.format(); if true and *size* is not given,
                         len(data) is used as size
        :param size: uncompressed size of the chunk
        :param ctype: handed to self.repo_objs.format()
        :param clevel: handed to self.repo_objs.format()
        :return: ChunkListEntry(id, size) for a stored chunk, else the result of chunk_incref()
        :raises ValueError: if the size of the chunk is not known
        """
        assert not overwrite, "AdHocCache does not permit overwrites — trying to use it for recreate?"
        if not self._txn_active:
            self.begin_txn()
        if size is None and compress:
            size = len(data)  # data is still uncompressed
        if size is None:
            raise ValueError("when giving compressed data for a chunk, the uncompressed size must be given also")
        refcount = self.seen_chunk(id, size)
        if refcount:
            return self.chunk_incref(id, stats, size=size)
        # size / ctype / clevel are forwarded so that the formatter receives the same
        # information as from the add_chunk of the persistent cache implementation.
        cdata = self.repo_objs.format(id, meta, data, compress=compress, size=size, ctype=ctype, clevel=clevel)
        self.repository.put(id, cdata, wait=wait)
        self.chunks.add(id, 1, size)
        stats.update(size, not refcount)
        return ChunkListEntry(id, size)

    def seen_chunk(self, id, size=None):
        """
        Return the reference count the chunks index has for a chunk id (0 if it is unknown).

        If the chunk is known, but has no size stored yet, a given *size* is stored for it.
        """
        if not self._txn_active:
            self.begin_txn()
        entry = self.chunks.get(id, ChunkIndexEntry(0, None))
        if entry.refcount and size and not entry.size:
            # The LocalCache has existing size information and uses *size* to make an effort at detecting collisions.
            # This is of course not possible for the AdHocCache.
            # Here *size* is used to update the chunk's size information, which will be zero for existing chunks.
            self.chunks[id] = entry._replace(size=size)
        return entry.refcount

    def chunk_incref(self, id, stats, size=None):
        """
        Add one reference to a chunk in the chunks index.

        :param size: used if the chunks index has no size (0) stored for the chunk
        :return: ChunkListEntry(id, size)
        """
        if not self._txn_active:
            self.begin_txn()
        count, _size = self.chunks.incref(id)
        # When _size is 0 and size is not given, then this chunk has not been locally visited yet (seen_chunk with
        # size or add_chunk); we can't add references to those (size=0 is invalid) and generally don't try to.
        size = _size or size
        assert size
        stats.update(size, False)
        return ChunkListEntry(id, size)

    def chunk_decref(self, id, stats, wait=True):
        """Drop one reference to a chunk; delete the chunk when no reference is left."""
        if not self._txn_active:
            self.begin_txn()
        count, size = self.chunks.decref(id)
        if count == 0:
            del self.chunks[id]
            self.repository.delete(id, wait=wait)
            stats.update(-size, True)
        else:
            stats.update(-size, False)

    def commit(self):
        """Save the security manager state and end the transaction (no-op without a transaction)."""
        if not self._txn_active:
            return
        self.security_manager.save(self.manifest, self.key)
        self._txn_active = False

    def rollback(self):
        """End the transaction and discard the chunks index (if there is one)."""
        self._txn_active = False
        try:
            del self.chunks
        except AttributeError:
            # No transaction was ever started, so there is no chunks index to discard.
            pass

    def begin_txn(self):
        """
        Start a transaction: build a fresh chunks index from the ids listed by the repository.
        """
        self._txn_active = True
        # Explicitly set the initial usable hash table capacity to avoid performance issues
        # due to hash table "resonance".
        # Since we're creating an archive, add 10 % from the start.
        num_chunks = len(self.repository)
        self.chunks = ChunkIndex(usable=num_chunks * 1.1)
        pi = ProgressIndicatorPercent(
            total=num_chunks, msg="Downloading chunk list... %3.0f%%", msgid="cache.download_chunks"
        )
        # All chunks from the repository have a refcount of MAX_VALUE, which is sticky,
        # therefore we can't/won't delete them. Chunks we added ourselves in this transaction
        # (e.g. checkpoint archives) are tracked correctly.
        # The entry is the same for every listed id, so it is created only once.
        init_entry = ChunkIndexEntry(refcount=ChunkIndex.MAX_VALUE, size=0)
        t0 = perf_counter()
        num_requests = 0
        marker = None
        while True:
            result = self.repository.list(limit=LIST_SCAN_LIMIT, marker=marker)
            num_requests += 1
            if not result:
                break
            pi.show(increase=len(result))
            # Continue the listing after the last id of this batch.
            marker = result[-1]
            for id_ in result:
                self.chunks[id_] = init_entry
        assert len(self.chunks) == num_chunks
        # LocalCache does not contain the manifest, either.
        del self.chunks[self.manifest.MANIFEST_ID]
        duration = perf_counter() - t0 or 0.01
        pi.finish()
        logger.debug(
            "AdHocCache: downloaded %d chunk IDs in %.2f s (%d requests), ~%s/s",
            num_chunks,
            duration,
            num_requests,
            format_file_size(num_chunks * 34 / duration),
        )
        # Chunk IDs in a list are encoded in 34 bytes: 1 byte msgpack header, 1 byte length, 32 ID bytes.
        # Protocol overhead is neglected in this calculation.
|