diff --git a/src/borg/archive.py b/src/borg/archive.py index 1555536d5..194814687 100644 --- a/src/borg/archive.py +++ b/src/borg/archive.py @@ -19,7 +19,7 @@ logger = create_logger() from . import xattr -from .chunker import get_chunker, max_chunk_size +from .chunker import get_chunker, max_chunk_size, Chunk from .cache import ChunkListEntry from .crypto.key import key_factory from .compress import Compressor, CompressionSpec @@ -43,6 +43,7 @@ from .helpers import sig_int from .patterns import PathPrefixPattern, FnmatchPattern, IECommand from .item import Item, ArchiveItem, ItemDiff +from .lrucache import LRUCache from .platform import acl_get, acl_set, set_flags, get_flags, swidth, hostname from .remote import cache_if_remote from .repository import Repository, LIST_SCAN_LIMIT @@ -336,7 +337,9 @@ def flush(self, flush=False): self.buffer.seek(0) # The chunker returns a memoryview to its internal buffer, # thus a copy is needed before resuming the chunker iterator. - chunks = list(bytes(s) for s in self.chunker.chunkify(self.buffer)) + # note: this is the items metadata stream chunker, we only will get CH_DATA allocation here, + # thus chunk.data will always be data bytes. + chunks = list(bytes(chunk.data) for chunk in self.chunker.chunkify(self.buffer)) self.buffer.seek(0) self.buffer.truncate(0) # Leave the last partial chunk in the buffer unless flush is True @@ -1102,6 +1105,8 @@ def __init__(self, *, key, cache, self.checkpoint_interval = checkpoint_interval self.last_checkpoint = time.monotonic() self.rechunkify = rechunkify + self.zero_chunk_ids = LRUCache(10, dispose=lambda _: None) # length of all-zero chunk -> chunk_id + self.zeros = memoryview(bytes(MAX_DATA_SIZE)) def write_part_file(self, item, from_chunk, number): item = Item(internal_dict=item.as_dict()) @@ -1133,8 +1138,22 @@ def maybe_checkpoint(self, item, from_chunk, part_number, forced=False): def process_file_chunks(self, item, cache, stats, show_progress, chunk_iter, chunk_processor=None): if not chunk_processor: - def chunk_processor(data): - chunk_entry = cache.add_chunk(self.key.id_hash(data), data, stats, wait=False) + def chunk_processor(chunk): + allocation = chunk.meta['allocation'] + if allocation == CH_DATA: + data = chunk.data + chunk_id = self.key.id_hash(data) + elif allocation == CH_HOLE: + size = chunk.meta['size'] + data = self.zeros[:size] + try: + chunk_id = self.zero_chunk_ids[size] + except KeyError: + chunk_id = self.key.id_hash(data) + self.zero_chunk_ids[size] = chunk_id + else: + raise ValueError('unexpected allocation type') + chunk_entry = cache.add_chunk(chunk_id, data, stats, wait=False) self.cache.repository.async_response(wait=False) return chunk_entry @@ -1145,8 +1164,8 @@ def chunk_processor(data): del item.chunks_healthy from_chunk = 0 part_number = 1 - for data in chunk_iter: - item.chunks.append(chunk_processor(data)) + for chunk in chunk_iter: + item.chunks.append(chunk_processor(chunk)) if show_progress: stats.show_progress(item=item, dt=0.2) from_chunk, part_number = self.maybe_checkpoint(item, from_chunk, part_number, forced=False) @@ -1982,7 +2001,10 @@ def process_chunks(self, archive, target, item): chunk_processor = partial(self.chunk_processor, target) target.process_file_chunks(item, self.cache, target.stats, self.progress, chunk_iterator, chunk_processor) - def chunk_processor(self, target, data): + def chunk_processor(self, target, chunk): + # as this is recreate (we do not read from the fs), we never have holes here + assert chunk.meta['allocation'] == CH_DATA + data = chunk.data chunk_id = self.key.id_hash(data) if chunk_id in self.seen_chunks: return self.cache.chunk_incref(chunk_id, target.stats) @@ -2007,7 +2029,7 @@ def iter_chunks(self, archive, target, chunks): yield from target.chunker.chunkify(file) else: for chunk in chunk_iterator: - yield chunk + yield Chunk(chunk, size=len(chunk), allocation=CH_DATA) def save(self, archive, target, comment=None, replace_original=True): if self.dry_run: