diff --git a/src/borg/archive.py b/src/borg/archive.py
index f058e6ce8..891a16222 100644
--- a/src/borg/archive.py
+++ b/src/borg/archive.py
@@ -643,14 +643,14 @@ Duration: {0.duration}
         # so we can already remove it here, the next .save() will then commit this cleanup.
         # remove its manifest entry, remove its ArchiveItem chunk, remove its item_ptrs chunks:
         del self.manifest.archives[self.checkpoint_name]
-        self.cache.chunk_decref(self.id, self.stats)
+        self.cache.chunk_decref(self.id, 1, self.stats)
         for id in metadata.item_ptrs:
-            self.cache.chunk_decref(id, self.stats)
+            self.cache.chunk_decref(id, 1, self.stats)
         # also get rid of that part item, we do not want to have it in next checkpoint or final archive
         tail_chunks = self.items_buffer.restore_chunks_state()
         # tail_chunks contain the tail of the archive items metadata stream, not needed for next commit.
         for id in tail_chunks:
-            self.cache.chunk_decref(id, self.stats)
+            self.cache.chunk_decref(id, 1, self.stats)  # TODO can we have real size here?

     def save(self, name=None, comment=None, timestamp=None, stats=None, additional_metadata=None):
         name = name or self.name
@@ -1024,7 +1024,7 @@ Duration: {0.duration}
         new_id = self.key.id_hash(data)
         self.cache.add_chunk(new_id, {}, data, stats=self.stats, ro_type=ROBJ_ARCHIVE_META)
         self.manifest.archives[self.name] = (new_id, metadata.time)
-        self.cache.chunk_decref(self.id, self.stats)
+        self.cache.chunk_decref(self.id, 1, self.stats)
         self.id = new_id

     def rename(self, name):
@@ -1052,9 +1052,9 @@ Duration: {0.duration}
                 error = True
                 return exception_ignored  # must not return None here

-        def chunk_decref(id, stats):
+        def chunk_decref(id, size, stats):
             try:
-                self.cache.chunk_decref(id, stats, wait=False)
+                self.cache.chunk_decref(id, size, stats, wait=False)
             except KeyError:
                 cid = bin_to_hex(id)
                 raise ChunksIndexError(cid)
@@ -1073,13 +1073,13 @@ Duration: {0.duration}
                     pi.show(i)
                 _, data = self.repo_objs.parse(items_id, data, ro_type=ROBJ_ARCHIVE_STREAM)
                 unpacker.feed(data)
-                chunk_decref(items_id, stats)
+                chunk_decref(items_id, 1, stats)
                 try:
                     for item in unpacker:
                         item = Item(internal_dict=item)
                         if "chunks" in item:
                             for chunk_id, size in item.chunks:
-                                chunk_decref(chunk_id, stats)
+                                chunk_decref(chunk_id, size, stats)
                 except (TypeError, ValueError):
                     # if items metadata spans multiple chunks and one chunk got dropped somehow,
                     # it could be that unpacker yields bad types
@@ -1096,12 +1096,12 @@ Duration: {0.duration}

         # delete the blocks that store all the references that end up being loaded into metadata.items:
         for id in self.metadata.item_ptrs:
-            chunk_decref(id, stats)
+            chunk_decref(id, 1, stats)

         # in forced delete mode, we try hard to delete at least the manifest entry,
         # if possible also the archive superblock, even if processing the items raises
         # some harmless exception.
-        chunk_decref(self.id, stats)
+        chunk_decref(self.id, 1, stats)
         del self.manifest.archives[self.name]
         while fetch_async_response(wait=True) is not None:
             # we did async deletes, process outstanding results (== exceptions),
@@ -1510,7 +1510,7 @@ class FilesystemObjectProcessors:
             except BackupOSError:
                 # see comments in process_file's exception handler, same issue here.
                 for chunk in item.get("chunks", []):
-                    cache.chunk_decref(chunk.id, self.stats, wait=False)
+                    cache.chunk_decref(chunk.id, chunk.size, self.stats, wait=False)
                 raise
             else:
                 item.get_size(memorize=True)
@@ -1544,7 +1544,7 @@ class FilesystemObjectProcessors:
                     item.chunks = []
                     for chunk_id, chunk_size in hl_chunks:
                         # process one-by-one, so we will know in item.chunks how far we got
-                        chunk_entry = cache.chunk_incref(chunk_id, self.stats)
+                        chunk_entry = cache.chunk_incref(chunk_id, chunk_size, self.stats)
                         item.chunks.append(chunk_entry)
                 else:  # normal case, no "2nd+" hardlink
                     if not is_special_file:
@@ -1570,10 +1570,8 @@ class FilesystemObjectProcessors:
                         item.chunks = []
                         for chunk in chunks:
                             # process one-by-one, so we will know in item.chunks how far we got
-                            chunk_entry = cache.chunk_incref(chunk.id, self.stats)
-                            # chunk.size is from files cache, chunk_entry.size from index:
-                            assert chunk == chunk_entry
-                            item.chunks.append(chunk_entry)
+                            cache.chunk_incref(chunk.id, chunk.size, self.stats)
+                            item.chunks.append(chunk)
                         status = "U"  # regular file, unchanged
                     else:
                         status = "M" if known else "A"  # regular file, modified or added
@@ -1622,7 +1620,7 @@ class FilesystemObjectProcessors:
                 # but we will not add an item (see add_item in create_helper) and thus
                 # they would be orphaned chunks in case that we commit the transaction.
                 for chunk in item.get("chunks", []):
-                    cache.chunk_decref(chunk.id, self.stats, wait=False)
+                    cache.chunk_decref(chunk.id, chunk.size, self.stats, wait=False)
                 # Now that we have cleaned up the chunk references, we can re-raise the exception.
                 # This will skip processing of this file, but might retry or continue with the next one.
                 raise
@@ -1733,7 +1731,7 @@ class TarfileObjectProcessors:
             except BackupOSError:
                 # see comment in FilesystemObjectProcessors.process_file, same issue here.
                 for chunk in item.get("chunks", []):
-                    self.cache.chunk_decref(chunk.id, self.stats, wait=False)
+                    self.cache.chunk_decref(chunk.id, chunk.size, self.stats, wait=False)
                 raise
@@ -2446,7 +2444,7 @@ class ArchiveRecreater:
     def process_chunks(self, archive, target, item):
         if not target.recreate_rechunkify:
             for chunk_id, size in item.chunks:
-                self.cache.chunk_incref(chunk_id, target.stats)
+                self.cache.chunk_incref(chunk_id, size, target.stats)
             return item.chunks
         chunk_iterator = self.iter_chunks(archive, target, list(item.chunks))
         chunk_processor = partial(self.chunk_processor, target)
@@ -2454,8 +2452,9 @@ class ArchiveRecreater:
     def chunk_processor(self, target, chunk):
         chunk_id, data = cached_hash(chunk, self.key.id_hash)
+        size = len(data)
         if chunk_id in self.seen_chunks:
-            return self.cache.chunk_incref(chunk_id, target.stats)
+            return self.cache.chunk_incref(chunk_id, size, target.stats)
         chunk_entry = self.cache.add_chunk(chunk_id, {}, data, stats=target.stats, wait=False, ro_type=ROBJ_FILE_STREAM)
         self.cache.repository.async_response(wait=False)
         self.seen_chunks.add(chunk_entry.id)
diff --git a/src/borg/archiver/transfer_cmd.py b/src/borg/archiver/transfer_cmd.py
index 1922cf1cf..1ba8ed3c8 100644
--- a/src/borg/archiver/transfer_cmd.py
+++ b/src/borg/archiver/transfer_cmd.py
@@ -143,7 +143,7 @@ class TransferMixIn:
                                 transfer_size += size
                             else:
                                 if not dry_run:
-                                    chunk_entry = cache.chunk_incref(chunk_id, archive.stats)
+                                    chunk_entry = cache.chunk_incref(chunk_id, size, archive.stats)
                                     chunks.append(chunk_entry)
                                 present_size += size
                         if not dry_run:
diff --git a/src/borg/cache.py b/src/borg/cache.py
index 5ad9d38c4..5cd3cc497 100644
--- a/src/borg/cache.py
+++ b/src/borg/cache.py
@@ -979,11 +979,14 @@ class LocalCache(CacheStatsMixin):
         assert ro_type is not None
         if not self.txn_active:
             self.begin_txn()
-        if size is None and compress:
-            size = len(data)  # data is still uncompressed
+        if size is None:
+            if compress:
+                size = len(data)  # data is still uncompressed
+            else:
+                raise ValueError("when giving compressed data for a chunk, the uncompressed size must be given also")
         refcount = self.seen_chunk(id, size)
         if refcount:
-            return self.chunk_incref(id, stats)
+            return self.chunk_incref(id, size, stats)
         if size is None:
             raise ValueError("when giving compressed data for a new chunk, the uncompressed size must be given also")
         cdata = self.repo_objs.format(
@@ -1004,17 +1007,21 @@ class LocalCache(CacheStatsMixin):
         )
         return refcount

-    def chunk_incref(self, id, stats, size=None):
+    def chunk_incref(self, id, size, stats):
+        assert isinstance(size, int) and size > 0
         if not self.txn_active:
             self.begin_txn()
         count, _size = self.chunks.incref(id)
-        stats.update(_size, False)
-        return ChunkListEntry(id, _size)
+        assert size == _size
+        stats.update(size, False)
+        return ChunkListEntry(id, size)

-    def chunk_decref(self, id, stats, wait=True):
+    def chunk_decref(self, id, size, stats, wait=True):
+        assert isinstance(size, int) and size > 0
         if not self.txn_active:
             self.begin_txn()
-        count, size = self.chunks.decref(id)
+        count, _size = self.chunks.decref(id)
+        assert size == 1 or size == _size  # don't check if caller gave fake size 1
         if count == 0:
             del self.chunks[id]
             self.repository.delete(id, wait=wait)
@@ -1157,13 +1164,14 @@ Chunk index: {0.total_unique_chunks:20d} unknown"""
         assert ro_type is not None
         if not self._txn_active:
             self.begin_txn()
-        if size is None and compress:
-            size = len(data)  # data is still uncompressed
         if size is None:
-            raise ValueError("when giving compressed data for a chunk, the uncompressed size must be given also")
+            if compress:
+                size = len(data)  # data is still uncompressed
+            else:
+                raise ValueError("when giving compressed data for a chunk, the uncompressed size must be given also")
         refcount = self.seen_chunk(id, size)
         if refcount:
-            return self.chunk_incref(id, stats, size=size)
+            return self.chunk_incref(id, size, stats)
         cdata = self.repo_objs.format(id, meta, data, compress=compress, ro_type=ro_type)
         self.repository.put(id, cdata, wait=wait)
         self.chunks.add(id, 1, size)
@@ -1181,21 +1189,21 @@ Chunk index: {0.total_unique_chunks:20d} unknown"""
             self.chunks[id] = entry._replace(size=size)
         return entry.refcount

-    def chunk_incref(self, id, stats, size=None):
+    def chunk_incref(self, id, size, stats):
+        assert isinstance(size, int) and size > 0
         if not self._txn_active:
             self.begin_txn()
         count, _size = self.chunks.incref(id)
-        # When _size is 0 and size is not given, then this chunk has not been locally visited yet (seen_chunk with
-        # size or add_chunk); we can't add references to those (size=0 is invalid) and generally don't try to.
-        size = _size or size
-        assert size
+        assert size == _size
         stats.update(size, False)
         return ChunkListEntry(id, size)

-    def chunk_decref(self, id, stats, wait=True):
+    def chunk_decref(self, id, size, stats, wait=True):
+        assert isinstance(size, int) and size > 0
         if not self._txn_active:
             self.begin_txn()
-        count, size = self.chunks.decref(id)
+        count, _size = self.chunks.decref(id)
+        assert size == 1 or size == _size  # don't check if caller gave fake size 1
         if count == 0:
             del self.chunks[id]
             self.repository.delete(id, wait=wait)
diff --git a/src/borg/testsuite/cache.py b/src/borg/testsuite/cache.py
index 571a483d2..60cb870e3 100644
--- a/src/borg/testsuite/cache.py
+++ b/src/borg/testsuite/cache.py
@@ -189,7 +189,7 @@ class TestAdHocCache:

     def test_does_not_delete_existing_chunks(self, repository, cache):
         assert cache.seen_chunk(H(1)) == ChunkIndex.MAX_VALUE
-        cache.chunk_decref(H(1), Statistics())
+        cache.chunk_decref(H(1), 1, Statistics())
         assert repository.get(H(1)) == b"1234"

     def test_seen_chunk_add_chunk_size(self, cache):
@@ -199,7 +199,7 @@ class TestAdHocCache:
         """E.g. checkpoint archives"""
         cache.add_chunk(H(5), {}, b"1010", stats=Statistics())
         assert cache.seen_chunk(H(5)) == 1
-        cache.chunk_decref(H(5), Statistics())
+        cache.chunk_decref(H(5), 1, Statistics())
         assert not cache.seen_chunk(H(5))
         with pytest.raises(Repository.ObjectNotFound):
             repository.get(H(5))
@@ -220,9 +220,9 @@ class TestAdHocCache:

     def test_incref_after_add_chunk(self, cache):
         assert cache.add_chunk(H(3), {}, b"5678", stats=Statistics()) == (H(3), 4)
-        assert cache.chunk_incref(H(3), Statistics()) == (H(3), 4)
+        assert cache.chunk_incref(H(3), 4, Statistics()) == (H(3), 4)

     def test_existing_incref_after_add_chunk(self, cache):
         """This case occurs with part files, see Archive.chunk_file."""
         assert cache.add_chunk(H(1), {}, b"5678", stats=Statistics()) == (H(1), 4)
-        assert cache.chunk_incref(H(1), Statistics()) == (H(1), 4)
+        assert cache.chunk_incref(H(1), 4, Statistics()) == (H(1), 4)
diff --git a/src/borg/upgrade.py b/src/borg/upgrade.py
index a8d17ac41..22a27c18c 100644
--- a/src/borg/upgrade.py
+++ b/src/borg/upgrade.py
@@ -84,8 +84,8 @@ class UpgraderFrom12To20:
             chunks, chunks_healthy = self.hlm.retrieve(id=hlid, default=(None, None))
             if chunks is not None:
                 item.chunks = chunks
-                for chunk_id, _ in chunks:
-                    self.cache.chunk_incref(chunk_id, self.archive.stats)
+                for chunk_id, chunk_size in chunks:
+                    self.cache.chunk_incref(chunk_id, chunk_size, self.archive.stats)
             if chunks_healthy is not None:
                 item.chunks_healthy = chunks
             del item.source  # not used for hardlinks any more, replaced by hlid
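
Note (illustration, not part of the patch): the calling convention introduced above is
chunk_incref(id, size, stats) / chunk_decref(id, size, stats), where size is the chunk's
plaintext size; chunk_decref() also accepts the placeholder 1 ("fake size") when the real
size is not known to the caller, e.g. for archive metadata chunks. Below is a minimal,
self-contained sketch of that size/refcount bookkeeping using hypothetical toy names
(ToyChunkIndex, chunk_incref, chunk_decref) rather than borg's real classes; stats
handling is omitted.

    # Hypothetical sketch only -- not borg code. It mirrors the assertions added to
    # LocalCache/AdHocCache.chunk_incref/chunk_decref in the patch above.
    class ToyChunkIndex:
        """Toy stand-in for the chunks index: maps chunk id -> (refcount, size)."""

        def __init__(self):
            self._chunks = {}

        def add(self, id, refcount, size):
            self._chunks[id] = (refcount, size)

        def incref(self, id):
            count, size = self._chunks[id]
            self._chunks[id] = (count + 1, size)
            return count + 1, size

        def decref(self, id):
            count, size = self._chunks[id]
            self._chunks[id] = (count - 1, size)
            return count - 1, size


    def chunk_incref(index, id, size):
        # caller must pass the known plaintext size; it must match the index entry
        assert isinstance(size, int) and size > 0
        count, _size = index.incref(id)
        assert size == _size
        return id, size


    def chunk_decref(index, id, size):
        # size == 1 is accepted as a placeholder when the caller does not know
        # the real size (e.g. archive / item_ptrs metadata chunks)
        assert isinstance(size, int) and size > 0
        count, _size = index.decref(id)
        assert size == 1 or size == _size
        return count


    if __name__ == "__main__":
        index = ToyChunkIndex()
        index.add(b"id-1", 1, 4)          # a 4-byte chunk, refcount 1
        chunk_incref(index, b"id-1", 4)   # real size known: must match the index
        chunk_decref(index, b"id-1", 1)   # size unknown: pass the fake size 1
        chunk_decref(index, b"id-1", 4)   # real size known: checked against the index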