diff --git a/src/borg/archive.py b/src/borg/archive.py index 16ab75e31..a081261bf 100644 --- a/src/borg/archive.py +++ b/src/borg/archive.py @@ -1041,12 +1041,18 @@ class ArchiveChecker: def verify_data(self): logger.info('Starting cryptographic data integrity verification...') - count = len(self.chunks) + chunks_count_index = len(self.chunks) + chunks_count_segments = 0 errors = 0 defect_chunks = [] - pi = ProgressIndicatorPercent(total=count, msg="Verifying data %6.2f%%", step=0.01) - for chunk_infos in chunkit(self.chunks.iteritems(), 100): - chunk_ids = [chunk_id for chunk_id, _ in chunk_infos] + pi = ProgressIndicatorPercent(total=chunks_count_index, msg="Verifying data %6.2f%%", step=0.01) + marker = None + while True: + chunk_ids = self.repository.scan(limit=100, marker=marker) + if not chunk_ids: + break + chunks_count_segments += len(chunk_ids) + marker = chunk_ids[-1] chunk_data_iter = self.repository.get_many(chunk_ids) chunk_ids_revd = list(reversed(chunk_ids)) while chunk_ids_revd: @@ -1074,6 +1080,10 @@ class ArchiveChecker: logger.error('chunk %s, integrity error: %s', bin_to_hex(chunk_id), integrity_error) defect_chunks.append(chunk_id) pi.finish() + if chunks_count_index != chunks_count_segments: + logger.error('Repo/Chunks index object count vs. segment files object count mismatch.') + logger.error('Repo/Chunks index: %d objects != segment files: %d objects', + chunks_count_index, chunks_count_segments) if defect_chunks: if self.repair: # if we kill the defect chunk here, subsequent actions within this "borg check" @@ -1106,7 +1116,8 @@ class ArchiveChecker: for defect_chunk in defect_chunks: logger.debug('chunk %s is defect.', bin_to_hex(defect_chunk)) log = logger.error if errors else logger.info - log('Finished cryptographic data integrity verification, verified %d chunks with %d integrity errors.', count, errors) + log('Finished cryptographic data integrity verification, verified %d chunks with %d integrity errors.', + chunks_count_segments, errors) def rebuild_manifest(self): """Rebuild the manifest object if it is missing diff --git a/src/borg/remote.py b/src/borg/remote.py index 294bd40cf..d48fa9495 100644 --- a/src/borg/remote.py +++ b/src/borg/remote.py @@ -62,6 +62,7 @@ class RepositoryServer: # pragma: no cover 'destroy', 'get', 'list', + 'scan', 'negotiate', 'open', 'put', @@ -467,6 +468,9 @@ This problem will go away as soon as the server has been upgraded to 1.0.7+. def list(self, limit=None, marker=None): return self.call('list', limit, marker) + def scan(self, limit=None, marker=None): + return self.call('scan', limit, marker) + def get(self, id_): for resp in self.get_many([id_]): return resp diff --git a/src/borg/repository.py b/src/borg/repository.py index 095a9ba7f..e0f006c20 100644 --- a/src/borg/repository.py +++ b/src/borg/repository.py @@ -2,7 +2,7 @@ import errno import os import shutil import struct -from binascii import unhexlify +from binascii import hexlify, unhexlify from collections import defaultdict from configparser import ConfigParser from datetime import datetime @@ -750,10 +750,53 @@ class Repository: return id in self.index def list(self, limit=None, marker=None): + """ + list IDs starting from after id - in index (pseudo-random) order. + """ if not self.index: self.index = self.open_index(self.get_transaction_id()) return [id_ for id_, _ in islice(self.index.iteritems(marker=marker), limit)] + def scan(self, limit=None, marker=None): + """ + list IDs starting from after id - in on-disk order, so that a client + fetching data in this order does linear reads and reuses stuff from disk cache. + + We rely on repository.check() has run already (either now or some time before) and that: + - if we are called from a borg check command, self.index is a valid, fresh, in-sync repo index. + - if we are called from elsewhere, either self.index or the on-disk index is valid and in-sync. + - the repository segments are valid (no CRC errors). + if we encounter CRC errors in segment entry headers, rest of segment is skipped. + """ + if limit is not None and limit < 1: + raise ValueError('please use limit > 0 or limit = None') + if not self.index: + transaction_id = self.get_transaction_id() + self.index = self.open_index(transaction_id) + at_start = marker is None + # smallest valid seg is 0, smallest valid offs is 8 + marker_segment, marker_offset = (0, 0) if at_start else self.index[marker] + result = [] + for segment, filename in self.io.segment_iterator(): + if segment < marker_segment: + continue + obj_iterator = self.io.iter_objects(segment, read_data=False, include_data=False) + while True: + try: + tag, id, offset, size = next(obj_iterator) + except (StopIteration, IntegrityError): + # either end-of-segment or an error - we can not seek to objects at + # higher offsets than one that has an error in the header fields + break + if segment == marker_segment and offset <= marker_offset: + continue + if tag == TAG_PUT and (segment, offset) == self.index.get(id): + # we have found an existing and current object + result.append(id) + if len(result) == limit: + return result + return result + def get(self, id_): if not self.index: self.index = self.open_index(self.get_transaction_id()) diff --git a/src/borg/testsuite/repository.py b/src/borg/testsuite/repository.py index 90c46f4d7..dc6f656be 100644 --- a/src/borg/testsuite/repository.py +++ b/src/borg/testsuite/repository.py @@ -133,6 +133,7 @@ class RepositoryTestCase(RepositoryTestCaseBase): def test_list(self): for x in range(100): self.repository.put(H(x), b'SOMEDATA') + self.repository.commit() all = self.repository.list() self.assert_equal(len(all), 100) first_half = self.repository.list(limit=50) @@ -143,6 +144,23 @@ class RepositoryTestCase(RepositoryTestCaseBase): self.assert_equal(second_half, all[50:]) self.assert_equal(len(self.repository.list(limit=50)), 50) + def test_scan(self): + for x in range(100): + self.repository.put(H(x), b'SOMEDATA') + self.repository.commit() + all = self.repository.scan() + assert len(all) == 100 + first_half = self.repository.scan(limit=50) + assert len(first_half) == 50 + assert first_half == all[:50] + second_half = self.repository.scan(marker=first_half[-1]) + assert len(second_half) == 50 + assert second_half == all[50:] + assert len(self.repository.scan(limit=50)) == 50 + # check result order == on-disk order (which is hash order) + for x in range(100): + assert all[x] == H(x) + def test_max_data_size(self): max_data = b'x' * MAX_DATA_SIZE self.repository.put(H(0), max_data)