always give id and size to chunk_incref/chunk_decref

incref: returns (id, size), so it needs the size in case it cannot
get it from the chunks index; the size is also needed to update the stats.

decref: the caller does not always have the chunk size (e.g. for
metadata chunks). As we consider 0 an invalid size, we call with
size == 1 in that case; thus, the stats might be slightly off.
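
A minimal sketch of the resulting call convention (the names cache, chunk_id,
chunk_size, metadata_id and stats are illustrative, not taken from the diff below):

entry = cache.chunk_incref(chunk_id, chunk_size, stats)  # returns ChunkListEntry(chunk_id, chunk_size)
cache.chunk_decref(chunk_id, chunk_size, stats)          # caller knows the real chunk size
cache.chunk_decref(metadata_id, 1, stats)                # size unknown (e.g. metadata chunk): pass the fake size 1
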
Thomas Waldmann 2023-09-21 21:38:09 +02:00
parent 4488c077a7
commit 17fce18b44
5 changed files with 53 additions and 46 deletions


@@ -643,14 +643,14 @@ Duration: {0.duration}
# so we can already remove it here, the next .save() will then commit this cleanup.
# remove its manifest entry, remove its ArchiveItem chunk, remove its item_ptrs chunks:
del self.manifest.archives[self.checkpoint_name]
self.cache.chunk_decref(self.id, self.stats)
self.cache.chunk_decref(self.id, 1, self.stats)
for id in metadata.item_ptrs:
self.cache.chunk_decref(id, self.stats)
self.cache.chunk_decref(id, 1, self.stats)
# also get rid of that part item, we do not want to have it in next checkpoint or final archive
tail_chunks = self.items_buffer.restore_chunks_state()
# tail_chunks contain the tail of the archive items metadata stream, not needed for next commit.
for id in tail_chunks:
self.cache.chunk_decref(id, self.stats)
self.cache.chunk_decref(id, 1, self.stats) # TODO can we have real size here?
def save(self, name=None, comment=None, timestamp=None, stats=None, additional_metadata=None):
name = name or self.name
@@ -1024,7 +1024,7 @@ Duration: {0.duration}
new_id = self.key.id_hash(data)
self.cache.add_chunk(new_id, {}, data, stats=self.stats, ro_type=ROBJ_ARCHIVE_META)
self.manifest.archives[self.name] = (new_id, metadata.time)
self.cache.chunk_decref(self.id, self.stats)
self.cache.chunk_decref(self.id, 1, self.stats)
self.id = new_id
def rename(self, name):
@@ -1052,9 +1052,9 @@ Duration: {0.duration}
error = True
return exception_ignored # must not return None here
def chunk_decref(id, stats):
def chunk_decref(id, size, stats):
try:
self.cache.chunk_decref(id, stats, wait=False)
self.cache.chunk_decref(id, size, stats, wait=False)
except KeyError:
cid = bin_to_hex(id)
raise ChunksIndexError(cid)
@@ -1073,13 +1073,13 @@ Duration: {0.duration}
pi.show(i)
_, data = self.repo_objs.parse(items_id, data, ro_type=ROBJ_ARCHIVE_STREAM)
unpacker.feed(data)
chunk_decref(items_id, stats)
chunk_decref(items_id, 1, stats)
try:
for item in unpacker:
item = Item(internal_dict=item)
if "chunks" in item:
for chunk_id, size in item.chunks:
chunk_decref(chunk_id, stats)
chunk_decref(chunk_id, size, stats)
except (TypeError, ValueError):
# if items metadata spans multiple chunks and one chunk got dropped somehow,
# it could be that unpacker yields bad types
@@ -1096,12 +1096,12 @@ Duration: {0.duration}
# delete the blocks that store all the references that end up being loaded into metadata.items:
for id in self.metadata.item_ptrs:
chunk_decref(id, stats)
chunk_decref(id, 1, stats)
# in forced delete mode, we try hard to delete at least the manifest entry,
# if possible also the archive superblock, even if processing the items raises
# some harmless exception.
chunk_decref(self.id, stats)
chunk_decref(self.id, 1, stats)
del self.manifest.archives[self.name]
while fetch_async_response(wait=True) is not None:
# we did async deletes, process outstanding results (== exceptions),
@@ -1510,7 +1510,7 @@ class FilesystemObjectProcessors:
except BackupOSError:
# see comments in process_file's exception handler, same issue here.
for chunk in item.get("chunks", []):
cache.chunk_decref(chunk.id, self.stats, wait=False)
cache.chunk_decref(chunk.id, chunk.size, self.stats, wait=False)
raise
else:
item.get_size(memorize=True)
@@ -1544,7 +1544,7 @@ class FilesystemObjectProcessors:
item.chunks = []
for chunk_id, chunk_size in hl_chunks:
# process one-by-one, so we will know in item.chunks how far we got
chunk_entry = cache.chunk_incref(chunk_id, self.stats)
chunk_entry = cache.chunk_incref(chunk_id, chunk_size, self.stats)
item.chunks.append(chunk_entry)
else: # normal case, no "2nd+" hardlink
if not is_special_file:
@@ -1570,10 +1570,8 @@ class FilesystemObjectProcessors:
item.chunks = []
for chunk in chunks:
# process one-by-one, so we will know in item.chunks how far we got
chunk_entry = cache.chunk_incref(chunk.id, self.stats)
# chunk.size is from files cache, chunk_entry.size from index:
assert chunk == chunk_entry
item.chunks.append(chunk_entry)
cache.chunk_incref(chunk.id, chunk.size, self.stats)
item.chunks.append(chunk)
status = "U" # regular file, unchanged
else:
status = "M" if known else "A" # regular file, modified or added
@@ -1622,7 +1620,7 @@ class FilesystemObjectProcessors:
# but we will not add an item (see add_item in create_helper) and thus
# they would be orphaned chunks in case that we commit the transaction.
for chunk in item.get("chunks", []):
cache.chunk_decref(chunk.id, self.stats, wait=False)
cache.chunk_decref(chunk.id, chunk.size, self.stats, wait=False)
# Now that we have cleaned up the chunk references, we can re-raise the exception.
# This will skip processing of this file, but might retry or continue with the next one.
raise
@@ -1733,7 +1731,7 @@ class TarfileObjectProcessors:
except BackupOSError:
# see comment in FilesystemObjectProcessors.process_file, same issue here.
for chunk in item.get("chunks", []):
self.cache.chunk_decref(chunk.id, self.stats, wait=False)
self.cache.chunk_decref(chunk.id, chunk.size, self.stats, wait=False)
raise
@@ -2446,7 +2444,7 @@ class ArchiveRecreater:
def process_chunks(self, archive, target, item):
if not target.recreate_rechunkify:
for chunk_id, size in item.chunks:
self.cache.chunk_incref(chunk_id, target.stats)
self.cache.chunk_incref(chunk_id, size, target.stats)
return item.chunks
chunk_iterator = self.iter_chunks(archive, target, list(item.chunks))
chunk_processor = partial(self.chunk_processor, target)
@@ -2454,8 +2452,9 @@ class ArchiveRecreater:
def chunk_processor(self, target, chunk):
chunk_id, data = cached_hash(chunk, self.key.id_hash)
size = len(data)
if chunk_id in self.seen_chunks:
return self.cache.chunk_incref(chunk_id, target.stats)
return self.cache.chunk_incref(chunk_id, size, target.stats)
chunk_entry = self.cache.add_chunk(chunk_id, {}, data, stats=target.stats, wait=False, ro_type=ROBJ_FILE_STREAM)
self.cache.repository.async_response(wait=False)
self.seen_chunks.add(chunk_entry.id)


@@ -143,7 +143,7 @@ class TransferMixIn:
transfer_size += size
else:
if not dry_run:
chunk_entry = cache.chunk_incref(chunk_id, archive.stats)
chunk_entry = cache.chunk_incref(chunk_id, size, archive.stats)
chunks.append(chunk_entry)
present_size += size
if not dry_run:


@@ -979,11 +979,14 @@ class LocalCache(CacheStatsMixin):
assert ro_type is not None
if not self.txn_active:
self.begin_txn()
if size is None and compress:
size = len(data) # data is still uncompressed
if size is None:
if compress:
size = len(data) # data is still uncompressed
else:
raise ValueError("when giving compressed data for a chunk, the uncompressed size must be given also")
refcount = self.seen_chunk(id, size)
if refcount:
return self.chunk_incref(id, stats)
return self.chunk_incref(id, size, stats)
if size is None:
raise ValueError("when giving compressed data for a new chunk, the uncompressed size must be given also")
cdata = self.repo_objs.format(
@@ -1004,17 +1007,21 @@
)
return refcount
def chunk_incref(self, id, stats, size=None):
def chunk_incref(self, id, size, stats):
assert isinstance(size, int) and size > 0
if not self.txn_active:
self.begin_txn()
count, _size = self.chunks.incref(id)
stats.update(_size, False)
return ChunkListEntry(id, _size)
assert size == _size
stats.update(size, False)
return ChunkListEntry(id, size)
def chunk_decref(self, id, stats, wait=True):
def chunk_decref(self, id, size, stats, wait=True):
assert isinstance(size, int) and size > 0
if not self.txn_active:
self.begin_txn()
count, size = self.chunks.decref(id)
count, _size = self.chunks.decref(id)
assert size == 1 or size == _size # don't check if caller gave fake size 1
if count == 0:
del self.chunks[id]
self.repository.delete(id, wait=wait)
@@ -1157,13 +1164,14 @@ Chunk index: {0.total_unique_chunks:20d} unknown"""
assert ro_type is not None
if not self._txn_active:
self.begin_txn()
if size is None and compress:
size = len(data) # data is still uncompressed
if size is None:
raise ValueError("when giving compressed data for a chunk, the uncompressed size must be given also")
if compress:
size = len(data) # data is still uncompressed
else:
raise ValueError("when giving compressed data for a chunk, the uncompressed size must be given also")
refcount = self.seen_chunk(id, size)
if refcount:
return self.chunk_incref(id, stats, size=size)
return self.chunk_incref(id, size, stats)
cdata = self.repo_objs.format(id, meta, data, compress=compress, ro_type=ro_type)
self.repository.put(id, cdata, wait=wait)
self.chunks.add(id, 1, size)
@@ -1181,21 +1189,21 @@ Chunk index: {0.total_unique_chunks:20d} unknown"""
self.chunks[id] = entry._replace(size=size)
return entry.refcount
def chunk_incref(self, id, stats, size=None):
def chunk_incref(self, id, size, stats):
assert isinstance(size, int) and size > 0
if not self._txn_active:
self.begin_txn()
count, _size = self.chunks.incref(id)
# When _size is 0 and size is not given, then this chunk has not been locally visited yet (seen_chunk with
# size or add_chunk); we can't add references to those (size=0 is invalid) and generally don't try to.
size = _size or size
assert size
assert size == _size
stats.update(size, False)
return ChunkListEntry(id, size)
def chunk_decref(self, id, stats, wait=True):
def chunk_decref(self, id, size, stats, wait=True):
assert isinstance(size, int) and size > 0
if not self._txn_active:
self.begin_txn()
count, size = self.chunks.decref(id)
count, _size = self.chunks.decref(id)
assert size == 1 or size == _size # don't check if caller gave fake size 1
if count == 0:
del self.chunks[id]
self.repository.delete(id, wait=wait)


@@ -189,7 +189,7 @@ class TestAdHocCache:
def test_does_not_delete_existing_chunks(self, repository, cache):
assert cache.seen_chunk(H(1)) == ChunkIndex.MAX_VALUE
cache.chunk_decref(H(1), Statistics())
cache.chunk_decref(H(1), 1, Statistics())
assert repository.get(H(1)) == b"1234"
def test_seen_chunk_add_chunk_size(self, cache):
@@ -199,7 +199,7 @@ class TestAdHocCache:
"""E.g. checkpoint archives"""
cache.add_chunk(H(5), {}, b"1010", stats=Statistics())
assert cache.seen_chunk(H(5)) == 1
cache.chunk_decref(H(5), Statistics())
cache.chunk_decref(H(5), 1, Statistics())
assert not cache.seen_chunk(H(5))
with pytest.raises(Repository.ObjectNotFound):
repository.get(H(5))
@@ -220,9 +220,9 @@ class TestAdHocCache:
def test_incref_after_add_chunk(self, cache):
assert cache.add_chunk(H(3), {}, b"5678", stats=Statistics()) == (H(3), 4)
assert cache.chunk_incref(H(3), Statistics()) == (H(3), 4)
assert cache.chunk_incref(H(3), 4, Statistics()) == (H(3), 4)
def test_existing_incref_after_add_chunk(self, cache):
"""This case occurs with part files, see Archive.chunk_file."""
assert cache.add_chunk(H(1), {}, b"5678", stats=Statistics()) == (H(1), 4)
assert cache.chunk_incref(H(1), Statistics()) == (H(1), 4)
assert cache.chunk_incref(H(1), 4, Statistics()) == (H(1), 4)


@@ -84,8 +84,8 @@ class UpgraderFrom12To20:
chunks, chunks_healthy = self.hlm.retrieve(id=hlid, default=(None, None))
if chunks is not None:
item.chunks = chunks
for chunk_id, _ in chunks:
self.cache.chunk_incref(chunk_id, self.archive.stats)
for chunk_id, chunk_size in chunks:
self.cache.chunk_incref(chunk_id, chunk_size, self.archive.stats)
if chunks_healthy is not None:
item.chunks_healthy = chunks
del item.source # not used for hardlinks any more, replaced by hlid