WIP (tests failing) implement NewCache

Also:
- move common code to ChunksMixin
- always use ._txn_active (not .txn_active)
Thomas Waldmann 2023-09-22 23:40:42 +02:00
parent 5ec557908c
commit 6087d7487b
2 changed files with 298 additions and 174 deletions
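For context on the `._txn_active` bullet above: ChunksMixin's methods test `self._txn_active`, so every cache class mixing it in must expose exactly that attribute name. A minimal illustrative sketch (simplified stand-ins, not the real borg classes):

# Illustrative only: why a mixin and its host classes must agree on one attribute name.
class TxnMixin:
    def _ensure_txn(self):
        if not self._txn_active:   # the mixin can only rely on one agreed-upon name
            self.begin_txn()

class HostCache(TxnMixin):
    def __init__(self):
        self._txn_active = False   # must match the name the mixin reads

    def begin_txn(self):
        self._txn_active = True

c = HostCache()
c._ensure_txn()
assert c._txn_active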

View File

@@ -420,6 +420,17 @@ class Cache:
cache_mode=cache_mode,
)
def newcache():
return NewCache(
manifest=manifest,
path=path,
warn_if_unencrypted=warn_if_unencrypted,
progress=progress,
iec=iec,
lock_wait=lock_wait,
cache_mode=cache_mode,
)
def adhoc():
return AdHocCache(manifest=manifest, lock_wait=lock_wait, iec=iec)
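The hunk above adds a `newcache()` factory closure alongside the existing `local()` and `adhoc()` ones. A self-contained sketch of how such per-implementation factories are typically dispatched; the `impl` selector and the stand-in factories are assumptions for illustration, not taken from this diff:

# Illustrative only: stand-in factories mirroring local()/newcache()/adhoc() above.
def demo_dispatch(impl: str):
    def local():
        return "LocalCache()"    # stand-in for the real constructor call
    def newcache():
        return "NewCache()"      # stand-in
    def adhoc():
        return "AdHocCache()"    # stand-in
    factories = {"local": local, "newcache": newcache, "adhoc": adhoc}
    try:
        return factories[impl]()
    except KeyError:
        raise ValueError(f"unknown cache implementation: {impl!r}")

print(demo_dispatch("newcache"))  # -> NewCache()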
@@ -659,7 +670,127 @@ class FilesCacheMixin:
)
class LocalCache(CacheStatsMixin, FilesCacheMixin):
class ChunksMixin:
"""
Chunks index related code shared by miscellaneous Cache implementations.
"""
def chunk_incref(self, id, size, stats):
assert isinstance(size, int) and size > 0
if not self._txn_active:
self.begin_txn()
count, _size = self.chunks.incref(id)
stats.update(size, False)
return ChunkListEntry(id, size)
def chunk_decref(self, id, size, stats, wait=True):
assert isinstance(size, int) and size > 0
if not self._txn_active:
self.begin_txn()
count, _size = self.chunks.decref(id)
if count == 0:
del self.chunks[id]
self.repository.delete(id, wait=wait)
stats.update(-size, True)
else:
stats.update(-size, False)
def seen_chunk(self, id, size=None):
if not self._txn_active:
self.begin_txn()
entry = self.chunks.get(id, ChunkIndexEntry(0, None))
if entry.refcount and size is not None:
assert isinstance(entry.size, int)
if entry.size:
# LocalCache: has existing size information and uses *size* to make an effort at detecting collisions.
if size != entry.size:
# we already have a chunk with that id, but different size.
# this is either a hash collision (unlikely) or corruption or a bug.
raise Exception(
"chunk has same id [%r], but different size (stored: %d new: %d)!" % (id, entry.size, size)
)
else:
# NewCache / AdHocCache:
# Here *size* is used to update the chunk's size information, which will be zero for existing chunks.
self.chunks[id] = entry._replace(size=size)
return entry.refcount
def add_chunk(
self,
id,
meta,
data,
*,
stats,
wait=True,
compress=True,
size=None,
ctype=None,
clevel=None,
ro_type=ROBJ_FILE_STREAM,
):
assert ro_type is not None
if not self._txn_active:
self.begin_txn()
if size is None:
if compress:
size = len(data) # data is still uncompressed
else:
raise ValueError("when giving compressed data for a chunk, the uncompressed size must be given also")
refcount = self.seen_chunk(id, size)
if refcount:
return self.chunk_incref(id, size, stats)
cdata = self.repo_objs.format(
id, meta, data, compress=compress, size=size, ctype=ctype, clevel=clevel, ro_type=ro_type
)
self.repository.put(id, cdata, wait=wait)
self.chunks.add(id, 1, size)
stats.update(size, not refcount)
return ChunkListEntry(id, size)
def _load_chunks_from_repo(self):
# Explicitly set the initial usable hash table capacity to avoid performance issues
# due to hash table "resonance".
# Since we're creating an archive, add 10 % from the start.
num_chunks = len(self.repository)
chunks = ChunkIndex(usable=num_chunks * 1.1)
pi = ProgressIndicatorPercent(
total=num_chunks, msg="Downloading chunk list... %3.0f%%", msgid="cache.download_chunks"
)
t0 = perf_counter()
num_requests = 0
marker = None
while True:
result = self.repository.list(limit=LIST_SCAN_LIMIT, marker=marker)
num_requests += 1
if not result:
break
pi.show(increase=len(result))
marker = result[-1]
# All chunks from the repository have a refcount of MAX_VALUE, which is sticky,
# therefore we can't/won't delete them. Chunks we added ourselves in this transaction
# (e.g. checkpoint archives) are tracked correctly.
init_entry = ChunkIndexEntry(refcount=ChunkIndex.MAX_VALUE, size=0)
for id_ in result:
chunks[id_] = init_entry
assert len(chunks) == num_chunks
# LocalCache does not contain the manifest, either.
del chunks[self.manifest.MANIFEST_ID]
duration = perf_counter() - t0 or 0.01
pi.finish()
logger.debug(
"Cache: downloaded %d chunk IDs in %.2f s (%d requests), ~%s/s",
num_chunks,
duration,
num_requests,
format_file_size(num_chunks * 34 / duration),
)
# Chunk IDs in a list are encoded in 34 bytes: 1 byte msgpack header, 1 byte length, 32 ID bytes.
# Protocol overhead is neglected in this calculation.
return chunks
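The comments in `_load_chunks_from_repo` above describe two behaviors worth spelling out: repository-derived entries start at the sticky `MAX_VALUE` refcount (so incref/decref never move them and they are never deleted), and each listed chunk ID costs about 34 bytes on the wire (1 byte msgpack header + 1 byte length + 32 ID bytes). A toy, self-contained sketch of those semantics; the dict-based index and the sentinel value are stand-ins, not borg's real ChunkIndex:

# Toy model only; borg's real ChunkIndex is a C hash table with its own MAX_VALUE.
MAX_VALUE = 2**32 - 1  # assumed sentinel for illustration; "sticky" = pinned there

class ToyChunkIndex(dict):
    def incref(self, id):
        count, size = self[id]
        if count != MAX_VALUE:          # sticky: never move off the sentinel
            count += 1
        self[id] = (count, size)
        return count, size

    def decref(self, id):
        count, size = self[id]
        if count != MAX_VALUE:
            count -= 1
        self[id] = (count, size)
        return count, size

idx = ToyChunkIndex()
idx[b"x" * 32] = (MAX_VALUE, 0)         # entry as seen in the repository
idx.decref(b"x" * 32)
assert idx[b"x" * 32][0] == MAX_VALUE   # still sticky -> never deleted

# Transfer estimate from the 34-bytes-per-ID comment above:
num_chunks = 1_000_000
print(f"~{num_chunks * 34 / 1e6:.0f} MB downloaded for {num_chunks:,} chunk IDs")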
class LocalCache(CacheStatsMixin, FilesCacheMixin, ChunksMixin):
"""
Persistent, local (client-side) cache.
"""
@@ -690,7 +821,7 @@ class LocalCache(CacheStatsMixin, FilesCacheMixin):
self.repo_objs = manifest.repo_objs
self.progress = progress
self.timestamp = None
self.txn_active = False
self._txn_active = False
self.path = cache_dir(self.repository, path)
self.security_manager = SecurityManager(self.repository)
@@ -769,12 +900,12 @@ class LocalCache(CacheStatsMixin, FilesCacheMixin):
except FileNotFoundError:
self._create_empty_files_cache(txn_dir)
os.replace(txn_dir, os.path.join(self.path, "txn.active"))
self.txn_active = True
self._txn_active = True
pi.finish()
def commit(self):
"""Commit transaction"""
if not self.txn_active:
if not self._txn_active:
return
self.security_manager.save(self.manifest, self.key)
pi = ProgressIndicatorMessage(msgid="cache.commit")
@@ -790,7 +921,7 @@ class LocalCache(CacheStatsMixin, FilesCacheMixin):
self.cache_config.save(self.manifest, self.key)
os.replace(os.path.join(self.path, "txn.active"), os.path.join(self.path, "txn.tmp"))
shutil.rmtree(os.path.join(self.path, "txn.tmp"))
self.txn_active = False
self._txn_active = False
pi.finish()
def rollback(self):
@@ -808,7 +939,7 @@ class LocalCache(CacheStatsMixin, FilesCacheMixin):
os.replace(txn_dir, txn_tmp)
if os.path.exists(txn_tmp):
shutil.rmtree(txn_tmp)
self.txn_active = False
self._txn_active = False
self._do_open()
def sync(self):
@@ -1052,75 +1183,171 @@ class LocalCache(CacheStatsMixin, FilesCacheMixin):
self.cache_config.ignored_features.update(repo_features - my_features)
self.cache_config.mandatory_features.update(repo_features & my_features)
    def add_chunk(
        self,
        id,
        meta,
        data,
        *,
        stats,
        wait=True,
        compress=True,
        size=None,
        ctype=None,
        clevel=None,
        ro_type=ROBJ_FILE_STREAM,
    ):
        assert ro_type is not None
        if not self.txn_active:
            self.begin_txn()
        if size is None:
            if compress:
                size = len(data)  # data is still uncompressed
            else:
                raise ValueError("when giving compressed data for a chunk, the uncompressed size must be given also")
        refcount = self.seen_chunk(id, size)
        if refcount:
            return self.chunk_incref(id, size, stats)
        if size is None:
            raise ValueError("when giving compressed data for a new chunk, the uncompressed size must be given also")
        cdata = self.repo_objs.format(
            id, meta, data, compress=compress, size=size, ctype=ctype, clevel=clevel, ro_type=ro_type
        )
        self.repository.put(id, cdata, wait=wait)
        self.chunks.add(id, 1, size)
        stats.update(size, not refcount)
        return ChunkListEntry(id, size)

    def seen_chunk(self, id, size=None):
        refcount, stored_size = self.chunks.get(id, ChunkIndexEntry(0, None))
        if size is not None and stored_size is not None and size != stored_size:
            # we already have a chunk with that id, but different size.
            # this is either a hash collision (unlikely) or corruption or a bug.
            raise Exception(
                "chunk has same id [%r], but different size (stored: %d new: %d)!" % (id, stored_size, size)
            )
        return refcount

    def chunk_incref(self, id, size, stats):
        assert isinstance(size, int) and size > 0
        if not self.txn_active:
            self.begin_txn()
        count, _size = self.chunks.incref(id)
        assert size == _size
        stats.update(size, False)
        return ChunkListEntry(id, size)

    def chunk_decref(self, id, size, stats, wait=True):
        assert isinstance(size, int) and size > 0
        if not self.txn_active:
            self.begin_txn()
        count, _size = self.chunks.decref(id)
        assert size == 1 or size == _size  # don't check if caller gave fake size 1
        if count == 0:
            del self.chunks[id]
            self.repository.delete(id, wait=wait)
            stats.update(-size, True)
        else:
            stats.update(-size, False)

class NewCache(CacheStatsMixin, FilesCacheMixin, ChunksMixin):
    """
    Like AdHocCache, but with a files cache.
    """
    def __init__(
        self,
        manifest,
        path=None,
        warn_if_unencrypted=True,
        progress=False,
        lock_wait=None,
        cache_mode=FILES_CACHE_MODE_DISABLED,
        iec=False,
    ):
        """
        :param warn_if_unencrypted: print warning if accessing unknown unencrypted repository
        :param lock_wait: timeout for lock acquisition (int [s] or None [wait forever])
        :param cache_mode: what shall be compared in the file stat infos vs. cached stat infos comparison
        """
        CacheStatsMixin.__init__(self, iec=iec)
        FilesCacheMixin.__init__(self, cache_mode)
        assert isinstance(manifest, Manifest)
        self.manifest = manifest
        self.repository = manifest.repository
        self.key = manifest.key
        self.repo_objs = manifest.repo_objs
        self.progress = progress
        self.timestamp = None
        self._txn_active = False
        self.path = cache_dir(self.repository, path)
        self.security_manager = SecurityManager(self.repository)
        self.cache_config = CacheConfig(self.repository, self.path, lock_wait)
        # Warn user before sending data to a never seen before unencrypted repository
        if not os.path.exists(self.path):
            self.security_manager.assert_access_unknown(warn_if_unencrypted, manifest, self.key)
            self.create()
        self.open()
        try:
            self.security_manager.assert_secure(manifest, self.key, cache_config=self.cache_config)
            if not self.check_cache_compatibility():
                self.wipe_cache()
            self.update_compatibility()
        except:  # noqa
            self.close()
            raise
def __enter__(self):
return self
def __exit__(self, exc_type, exc_val, exc_tb):
self.close()
def create(self):
"""Create a new empty cache at `self.path`"""
os.makedirs(self.path)
with open(os.path.join(self.path, "README"), "w") as fd:
fd.write(CACHE_README)
self.cache_config.create()
self._create_empty_files_cache(self.path)
def _do_open(self):
self.cache_config.load()
self.chunks = self._load_chunks_from_repo()
self._read_files_cache()
def open(self):
if not os.path.isdir(self.path):
raise Exception("%s Does not look like a Borg cache" % self.path)
self.cache_config.open()
self.rollback()
def close(self):
if self.cache_config is not None:
self.cache_config.close()
self.cache_config = None
def begin_txn(self):
# Initialize transaction snapshot
pi = ProgressIndicatorMessage(msgid="cache.begin_transaction")
txn_dir = os.path.join(self.path, "txn.tmp")
os.mkdir(txn_dir)
pi.output("Initializing cache transaction: Reading config")
shutil.copy(os.path.join(self.path, "config"), txn_dir)
pi.output("Initializing cache transaction: Reading files")
try:
shutil.copy(os.path.join(self.path, self.files_cache_name()), txn_dir)
except FileNotFoundError:
self._create_empty_files_cache(txn_dir)
os.replace(txn_dir, os.path.join(self.path, "txn.active"))
pi.finish()
self._txn_active = True
def commit(self):
if not self._txn_active:
return
self.security_manager.save(self.manifest, self.key)
pi = ProgressIndicatorMessage(msgid="cache.commit")
if self.files is not None:
pi.output("Saving files cache")
integrity_data = self._write_files_cache()
self.cache_config.integrity[self.files_cache_name()] = integrity_data
pi.output("Saving cache config")
self.cache_config.save(self.manifest, self.key)
os.replace(os.path.join(self.path, "txn.active"), os.path.join(self.path, "txn.tmp"))
shutil.rmtree(os.path.join(self.path, "txn.tmp"))
self._txn_active = False
pi.finish()
def rollback(self):
# Remove partial transaction
if os.path.exists(os.path.join(self.path, "txn.tmp")):
shutil.rmtree(os.path.join(self.path, "txn.tmp"))
# Roll back active transaction
txn_dir = os.path.join(self.path, "txn.active")
if os.path.exists(txn_dir):
shutil.copy(os.path.join(txn_dir, "config"), self.path)
shutil.copy(os.path.join(txn_dir, self.discover_files_cache_name(txn_dir)), self.path)
txn_tmp = os.path.join(self.path, "txn.tmp")
os.replace(txn_dir, txn_tmp)
if os.path.exists(txn_tmp):
shutil.rmtree(txn_tmp)
self._txn_active = False
self._do_open()
def check_cache_compatibility(self):
my_features = Manifest.SUPPORTED_REPO_FEATURES
if self.cache_config.ignored_features & my_features:
# The cache might not contain references to chunks that need a feature that is mandatory for some operation
# and which this version supports. To avoid corruption while executing that operation, force a rebuild.
return False
if not self.cache_config.mandatory_features <= my_features:
# The cache was built with consideration to at least one feature that this version does not understand.
# This client might misinterpret the cache. Thus force a rebuild.
return False
return True
def wipe_cache(self):
logger.warning("Discarding incompatible cache and forcing a cache rebuild")
self.chunks = ChunkIndex()
self._create_empty_files_cache(self.path)
self.cache_config.manifest_id = ""
self.cache_config._config.set("cache", "manifest", "")
self.cache_config.ignored_features = set()
self.cache_config.mandatory_features = set()
def update_compatibility(self):
operation_to_features_map = self.manifest.get_all_mandatory_features()
my_features = Manifest.SUPPORTED_REPO_FEATURES
repo_features = set()
for operation, features in operation_to_features_map.items():
repo_features.update(features)
self.cache_config.ignored_features.update(repo_features - my_features)
self.cache_config.mandatory_features.update(repo_features & my_features)
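A worked example of the set logic in `check_cache_compatibility` and `update_compatibility` above, using made-up feature names (only the set operations mirror the code):

# Made-up feature names for illustration.
SUPPORTED = {"feature-a", "feature-b"}        # this client's SUPPORTED_REPO_FEATURES
repo_features = {"feature-b", "feature-c"}    # features mandatory in the repo

ignored = repo_features - SUPPORTED           # {"feature-c"}: recorded as ignored
mandatory = repo_features & SUPPORTED         # {"feature-b"}: recorded as mandatory
assert ignored == {"feature-c"} and mandatory == {"feature-b"}

# The cache stays compatible only while both conditions hold; otherwise
# check_cache_compatibility() returns False and wipe_cache() rebuilds it.
compatible = not (ignored & SUPPORTED) and mandatory <= SUPPORTED
assert compatible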
class AdHocCache(CacheStatsMixin):
class AdHocCache(CacheStatsMixin, ChunksMixin):
"""
Ad-hoc, non-persistent cache.
@@ -1168,72 +1395,6 @@ Chunk index: {0.total_unique_chunks:20d} unknown"""
def memorize_file(self, hashed_path, path_hash, st, chunks):
pass
def add_chunk(
self,
id,
meta,
data,
*,
stats,
wait=True,
compress=True,
size=None,
ctype=None,
clevel=None,
ro_type=ROBJ_FILE_STREAM,
):
assert ro_type is not None
if not self._txn_active:
self.begin_txn()
if size is None:
if compress:
size = len(data) # data is still uncompressed
else:
raise ValueError("when giving compressed data for a chunk, the uncompressed size must be given also")
refcount = self.seen_chunk(id, size)
if refcount:
return self.chunk_incref(id, size, stats)
cdata = self.repo_objs.format(
id, meta, data, compress=compress, size=size, ctype=ctype, clevel=clevel, ro_type=ro_type
)
self.repository.put(id, cdata, wait=wait)
self.chunks.add(id, 1, size)
stats.update(size, not refcount)
return ChunkListEntry(id, size)
def seen_chunk(self, id, size=None):
if not self._txn_active:
self.begin_txn()
entry = self.chunks.get(id, ChunkIndexEntry(0, None))
if entry.refcount and size and not entry.size:
# The LocalCache has existing size information and uses *size* to make an effort at detecting collisions.
# This is of course not possible for the AdHocCache.
# Here *size* is used to update the chunk's size information, which will be zero for existing chunks.
self.chunks[id] = entry._replace(size=size)
return entry.refcount
def chunk_incref(self, id, size, stats):
assert isinstance(size, int) and size > 0
if not self._txn_active:
self.begin_txn()
count, _size = self.chunks.incref(id)
assert size == _size
stats.update(size, False)
return ChunkListEntry(id, size)
def chunk_decref(self, id, size, stats, wait=True):
assert isinstance(size, int) and size > 0
if not self._txn_active:
self.begin_txn()
count, _size = self.chunks.decref(id)
assert size == 1 or size == _size # don't check if caller gave fake size 1
if count == 0:
del self.chunks[id]
self.repository.delete(id, wait=wait)
stats.update(-size, True)
else:
stats.update(-size, False)
def commit(self):
if not self._txn_active:
return
@@ -1246,41 +1407,4 @@ Chunk index: {0.total_unique_chunks:20d} unknown"""
def begin_txn(self):
self._txn_active = True
# Explicitly set the initial usable hash table capacity to avoid performance issues
# due to hash table "resonance".
# Since we're creating an archive, add 10 % from the start.
num_chunks = len(self.repository)
self.chunks = ChunkIndex(usable=num_chunks * 1.1)
pi = ProgressIndicatorPercent(
total=num_chunks, msg="Downloading chunk list... %3.0f%%", msgid="cache.download_chunks"
)
t0 = perf_counter()
num_requests = 0
marker = None
while True:
result = self.repository.list(limit=LIST_SCAN_LIMIT, marker=marker)
num_requests += 1
if not result:
break
pi.show(increase=len(result))
marker = result[-1]
# All chunks from the repository have a refcount of MAX_VALUE, which is sticky,
# therefore we can't/won't delete them. Chunks we added ourselves in this transaction
# (e.g. checkpoint archives) are tracked correctly.
init_entry = ChunkIndexEntry(refcount=ChunkIndex.MAX_VALUE, size=0)
for id_ in result:
self.chunks[id_] = init_entry
assert len(self.chunks) == num_chunks
# LocalCache does not contain the manifest, either.
del self.chunks[self.manifest.MANIFEST_ID]
duration = perf_counter() - t0 or 0.01
pi.finish()
logger.debug(
"AdHocCache: downloaded %d chunk IDs in %.2f s (%d requests), ~%s/s",
num_chunks,
duration,
num_requests,
format_file_size(num_chunks * 34 / duration),
)
# Chunk IDs in a list are encoded in 34 bytes: 1 byte msgpack header, 1 byte length, 32 ID bytes.
# Protocol overhead is neglected in this calculation.
self.chunks = self._load_chunks_from_repo()

View File

@@ -1184,13 +1184,13 @@ class BorgJsonEncoder(json.JSONEncoder):
from ..repository import Repository
from ..remote import RemoteRepository
from ..archive import Archive
from ..cache import LocalCache, AdHocCache
from ..cache import LocalCache, AdHocCache, NewCache
if isinstance(o, Repository) or isinstance(o, RemoteRepository):
return {"id": bin_to_hex(o.id), "location": o._location.canonical_path()}
if isinstance(o, Archive):
return o.info()
if isinstance(o, LocalCache):
if isinstance(o, (LocalCache, NewCache)):
return {"path": o.path, "stats": o.stats()}
if isinstance(o, AdHocCache):
return {"stats": o.stats()}