diff --git a/src/borg/archiver/compact_cmd.py b/src/borg/archiver/compact_cmd.py index 8fa37e90c..8f529d544 100644 --- a/src/borg/archiver/compact_cmd.py +++ b/src/borg/archiver/compact_cmd.py @@ -10,7 +10,7 @@ from ..helpers import ProgressIndicatorPercent from ..manifest import Manifest from ..remote import RemoteRepository -from ..repository import Repository +from ..repository import Repository, repo_lister from ..logger import create_logger @@ -49,18 +49,12 @@ def garbage_collect(self): def get_repository_chunks(self) -> ChunkIndex: """Build a dict id -> size of all chunks present in the repository""" chunks = ChunkIndex() - marker = None - while True: - result = self.repository.list(limit=LIST_SCAN_LIMIT, marker=marker) - if not result: - break - marker = result[-1][0] - for id, stored_size in result: - # we add this id to the chunks index, using refcount == 0, because - # we do not know yet whether it is actually referenced from some archives. - # we "abuse" the size field here. usually there is the plaintext size, - # but we use it for the size of the stored object here. - chunks[id] = ChunkIndexEntry(refcount=0, size=stored_size) + for id, stored_size in repo_lister(self.repository, limit=LIST_SCAN_LIMIT): + # we add this id to the chunks index, using refcount == 0, because + # we do not know yet whether it is actually referenced from some archives. + # we "abuse" the size field here. usually there is the plaintext size, + # but we use it for the size of the stored object here. + chunks[id] = ChunkIndexEntry(refcount=0, size=stored_size) return chunks def save_chunk_index(self): diff --git a/src/borg/archiver/debug_cmd.py b/src/borg/archiver/debug_cmd.py index 874ef58df..b3ddb6bfc 100644 --- a/src/borg/archiver/debug_cmd.py +++ b/src/borg/archiver/debug_cmd.py @@ -15,7 +15,7 @@ from ..helpers import CommandError, RTError from ..manifest import Manifest from ..platform import get_process_id -from ..repository import Repository, LIST_SCAN_LIMIT +from ..repository import Repository, LIST_SCAN_LIMIT, repo_lister from ..repoobj import RepoObj from ._common import with_repository, Highlander @@ -130,15 +130,9 @@ def decrypt_dump(id, cdata): cdata = repository.get(id) key = key_factory(repository, cdata) repo_objs = RepoObj(key) - marker = None - while True: - result = repository.list(limit=LIST_SCAN_LIMIT, marker=marker) - if not result: - break - marker = result[-1][0] - for id, stored_size in result: - cdata = repository.get(id) - decrypt_dump(id, cdata) + for id, stored_size in repo_lister(repository, limit=LIST_SCAN_LIMIT): + cdata = repository.get(id) + decrypt_dump(id, cdata) print("Done.") @with_repository(manifest=False) @@ -177,38 +171,32 @@ def print_finding(info, wanted, data, offset): key = key_factory(repository, cdata) repo_objs = RepoObj(key) - marker = None last_data = b"" last_id = None i = 0 - while True: - result = repository.list(limit=LIST_SCAN_LIMIT, marker=marker) - if not result: - break - marker = result[-1][0] - for id, stored_size in result: - cdata = repository.get(id) - _, data = repo_objs.parse(id, cdata, ro_type=ROBJ_DONTCARE) + for id, stored_size in repo_lister(repository, limit=LIST_SCAN_LIMIT): + cdata = repository.get(id) + _, data = repo_objs.parse(id, cdata, ro_type=ROBJ_DONTCARE) - # try to locate wanted sequence crossing the border of last_data and data - boundary_data = last_data[-(len(wanted) - 1) :] + data[: len(wanted) - 1] - if wanted in boundary_data: - boundary_data = last_data[-(len(wanted) - 1 + context) :] + data[: len(wanted) - 1 + context] - offset = boundary_data.find(wanted) - info = "%d %s | %s" % (i, last_id.hex(), id.hex()) - print_finding(info, wanted, boundary_data, offset) + # try to locate wanted sequence crossing the border of last_data and data + boundary_data = last_data[-(len(wanted) - 1) :] + data[: len(wanted) - 1] + if wanted in boundary_data: + boundary_data = last_data[-(len(wanted) - 1 + context) :] + data[: len(wanted) - 1 + context] + offset = boundary_data.find(wanted) + info = "%d %s | %s" % (i, last_id.hex(), id.hex()) + print_finding(info, wanted, boundary_data, offset) - # try to locate wanted sequence in data - count = data.count(wanted) - if count: - offset = data.find(wanted) # only determine first occurrence's offset - info = "%d %s #%d" % (i, id.hex(), count) - print_finding(info, wanted, data, offset) + # try to locate wanted sequence in data + count = data.count(wanted) + if count: + offset = data.find(wanted) # only determine first occurrence's offset + info = "%d %s #%d" % (i, id.hex(), count) + print_finding(info, wanted, data, offset) - last_id, last_data = id, data - i += 1 - if i % 10000 == 0: - print("%d objects processed." % i) + last_id, last_data = id, data + i += 1 + if i % 10000 == 0: + print("%d objects processed." % i) print("Done.") @with_repository(manifest=False) diff --git a/src/borg/cache.py b/src/borg/cache.py index 28d4951ae..70101fcea 100644 --- a/src/borg/cache.py +++ b/src/borg/cache.py @@ -31,7 +31,7 @@ from .manifest import Manifest from .platform import SaveFile from .remote import RemoteRepository -from .repository import LIST_SCAN_LIMIT, Repository, StoreObjectNotFound +from .repository import LIST_SCAN_LIMIT, Repository, StoreObjectNotFound, repo_lister # chunks is a list of ChunkListEntry FileCacheEntry = namedtuple("FileCacheEntry", "age inode size ctime mtime chunks") @@ -680,22 +680,14 @@ def build_chunkindex_from_repo(repository, *, disable_caches=False, cache_immedi logger.debug("querying the chunk IDs list from the repo...") chunks = ChunkIndex() t0 = perf_counter() - num_requests = 0 num_chunks = 0 - marker = None - while True: - result = repository.list(limit=LIST_SCAN_LIMIT, marker=marker) - num_requests += 1 - if not result: - break - marker = result[-1][0] - # The repo says it has these chunks, so we assume they are referenced chunks. - # We do not care for refcounting anymore, so we just set refcount = MAX_VALUE. - # We do not know the plaintext size (!= stored_size), thus we set size = 0. - init_entry = ChunkIndexEntry(refcount=ChunkIndex.MAX_VALUE, size=0) - for id, stored_size in result: - num_chunks += 1 - chunks[id] = init_entry + # The repo says it has these chunks, so we assume they are referenced chunks. + # We do not care for refcounting anymore, so we just set refcount = MAX_VALUE. + # We do not know the plaintext size (!= stored_size), thus we set size = 0. + init_entry = ChunkIndexEntry(refcount=ChunkIndex.MAX_VALUE, size=0) + for id, stored_size in repo_lister(repository, limit=LIST_SCAN_LIMIT): + num_chunks += 1 + chunks[id] = init_entry # Cache does not contain the manifest. if not isinstance(repository, (Repository, RemoteRepository)): del chunks[Manifest.MANIFEST_ID] @@ -703,7 +695,7 @@ def build_chunkindex_from_repo(repository, *, disable_caches=False, cache_immedi # Chunk IDs in a list are encoded in 34 bytes: 1 byte msgpack header, 1 byte length, 32 ID bytes. # Protocol overhead is neglected in this calculation. speed = format_file_size(num_chunks * 34 / duration) - logger.debug(f"queried {num_chunks} chunk IDs in {duration} s ({num_requests} requests), ~{speed}/s") + logger.debug(f"queried {num_chunks} chunk IDs in {duration} s, ~{speed}/s") if cache_immediately: # immediately update cache/chunks, so we only rarely have to do it the slow way: write_chunkindex_to_repo_cache(repository, chunks, compact=False, clear=False, force_write=True) diff --git a/src/borg/repository.py b/src/borg/repository.py index 66db295c2..c293358b0 100644 --- a/src/borg/repository.py +++ b/src/borg/repository.py @@ -18,6 +18,18 @@ logger = create_logger(__name__) +def repo_lister(repository, *, limit=None): + marker = None + finished = False + while not finished: + result = repository.list(limit=limit, marker=marker) + finished = (len(result) < limit) if limit is not None else (len(result) == 0) + if not finished: + marker = result[-1][0] + for id, stored_size in result: + yield id, stored_size + + class Repository: """borgstore based key value store""" diff --git a/src/borg/testsuite/archiver/repo_compress_cmd.py b/src/borg/testsuite/archiver/repo_compress_cmd.py index 9e29e8927..1d06a14d5 100644 --- a/src/borg/testsuite/archiver/repo_compress_cmd.py +++ b/src/borg/testsuite/archiver/repo_compress_cmd.py @@ -1,7 +1,7 @@ import os from ...constants import * # NOQA -from ...repository import Repository +from ...repository import Repository, repo_lister from ...manifest import Manifest from ...compress import ZSTD, ZLIB, LZ4, CNONE from ...helpers import bin_to_hex @@ -15,30 +15,24 @@ def check_compression(ctype, clevel, olevel): repository = Repository(archiver.repository_path, exclusive=True) with repository: manifest = Manifest.load(repository, Manifest.NO_OPERATION_CHECK) - marker = None - while True: - result = repository.list(limit=LIST_SCAN_LIMIT, marker=marker) - if not result: - break - marker = result[-1][0] - for id, _ in result: - chunk = repository.get(id, read_data=True) - meta, data = manifest.repo_objs.parse( - id, chunk, ro_type=ROBJ_DONTCARE - ) # will also decompress according to metadata - m_olevel = meta.get("olevel", -1) - m_psize = meta.get("psize", -1) - print(bin_to_hex(id), meta["ctype"], meta["clevel"], meta["csize"], meta["size"], m_olevel, m_psize) - # this is not as easy as one thinks due to the DecidingCompressor choosing the smallest of - # (desired compressed, lz4 compressed, not compressed). - assert meta["ctype"] in (ctype, LZ4.ID, CNONE.ID) - assert meta["clevel"] in (clevel, 255) # LZ4 and CNONE has level 255 - if olevel != -1: # we expect obfuscation - assert "psize" in meta - assert m_olevel == olevel - else: - assert "psize" not in meta - assert "olevel" not in meta + for id, _ in repo_lister(repository, limit=LIST_SCAN_LIMIT): + chunk = repository.get(id, read_data=True) + meta, data = manifest.repo_objs.parse( + id, chunk, ro_type=ROBJ_DONTCARE + ) # will also decompress according to metadata + m_olevel = meta.get("olevel", -1) + m_psize = meta.get("psize", -1) + print(bin_to_hex(id), meta["ctype"], meta["clevel"], meta["csize"], meta["size"], m_olevel, m_psize) + # this is not as easy as one thinks due to the DecidingCompressor choosing the smallest of + # (desired compressed, lz4 compressed, not compressed). + assert meta["ctype"] in (ctype, LZ4.ID, CNONE.ID) + assert meta["clevel"] in (clevel, 255) # LZ4 and CNONE has level 255 + if olevel != -1: # we expect obfuscation + assert "psize" in meta + assert m_olevel == olevel + else: + assert "psize" not in meta + assert "olevel" not in meta create_regular_file(archiver.input_path, "file1", size=1024 * 10) create_regular_file(archiver.input_path, "file2", contents=os.urandom(1024 * 10))