diff --git a/src/borg/archive.py b/src/borg/archive.py
index 73d8cf868..04b6afe25 100644
--- a/src/borg/archive.py
+++ b/src/borg/archive.py
@@ -1749,11 +1749,10 @@ def verify_data(self):
         )
         marker = None
         while True:
-            chunk_ids = self.repository.scan(limit=100, marker=marker)
+            chunk_ids, marker = self.repository.scan(limit=100, marker=marker)
             if not chunk_ids:
                 break
             chunks_count_segments += len(chunk_ids)
-            marker = chunk_ids[-1]
             chunk_data_iter = self.repository.get_many(chunk_ids)
             chunk_ids_revd = list(reversed(chunk_ids))
             while chunk_ids_revd:
diff --git a/src/borg/archiver/debug_cmd.py b/src/borg/archiver/debug_cmd.py
index b8f49a68d..57811fc41 100644
--- a/src/borg/archiver/debug_cmd.py
+++ b/src/borg/archiver/debug_cmd.py
@@ -155,11 +155,12 @@ def decrypt_dump(i, id, cdata, tag=None, segment=None, offset=None):
             marker = None
             i = 0
             while True:
-                result = repository.scan(limit=LIST_SCAN_LIMIT, marker=marker)  # must use on-disk order scanning here
-                if not result:
+                ids, marker = repository.scan(
+                    limit=LIST_SCAN_LIMIT, marker=marker
+                )  # must use on-disk order scanning here
+                if not ids:
                     break
-                marker = result[-1]
-                for id in result:
+                for id in ids:
                     cdata = repository.get(id)
                     decrypt_dump(i, id, cdata)
                     i += 1
@@ -207,11 +208,10 @@ def print_finding(info, wanted, data, offset):
         last_id = None
         i = 0
         while True:
-            result = repository.scan(limit=LIST_SCAN_LIMIT, marker=marker)  # must use on-disk order scanning here
-            if not result:
+            ids, marker = repository.scan(limit=LIST_SCAN_LIMIT, marker=marker)  # must use on-disk order scanning here
+            if not ids:
                 break
-            marker = result[-1]
-            for id in result:
+            for id in ids:
                 cdata = repository.get(id)
                 _, data = repo_objs.parse(id, cdata)
 
diff --git a/src/borg/repository.py b/src/borg/repository.py
index 27b783c4e..e6c9015fa 100644
--- a/src/borg/repository.py
+++ b/src/borg/repository.py
@@ -1163,9 +1163,7 @@ def scan_low_level(self, segment=None, offset=None):
 
         When segment or segment+offset is given, limit processing to this location only.
         """
-        for current_segment, filename in self.io.segment_iterator(segment=segment):
-            if segment is not None and current_segment > segment:
-                break
+        for current_segment, filename in self.io.segment_iterator(start_segment=segment, end_segment=segment):
             try:
                 for tag, key, current_offset, _, data in self.io.iter_objects(
                     segment=current_segment, offset=offset or 0
@@ -1211,9 +1209,14 @@ def list(self, limit=None, marker=None, mask=0, value=0):
 
     def scan(self, limit=None, marker=None):
         """
-        list IDs starting from after id <marker> - in on-disk order, so that a client
+        list IDs starting from after <marker> - in on-disk order, so that a client
         fetching data in this order does linear reads and reuses stuff from disk cache.
 
+        marker can either be None (default, meaning "start from the beginning") or the object
+        returned from a previous scan call (meaning "continue scanning where we stopped previously").
+
+        returns: list of chunk ids, marker
+
         We rely on repository.check() has run already (either now or some time before) and that:
 
        - if we are called from a borg check command, self.index is a valid, fresh, in-sync repo index.
@@ -1223,14 +1226,15 @@ def scan(self, limit=None, marker=None):
         """
         if limit is not None and limit < 1:
             raise ValueError("please use limit > 0 or limit = None")
+        transaction_id = self.get_transaction_id()
         if not self.index:
-            transaction_id = self.get_transaction_id()
             self.index = self.open_index(transaction_id)
-        at_start = marker is None
         # smallest valid seg is 0, smallest valid offs is 8
-        start_segment, start_offset, _ = (0, 0, 0) if at_start else self.index[marker]
-        result = []
-        for segment, filename in self.io.segment_iterator(start_segment):
+        start_segment, start_offset = marker if marker is not None else (0, 0)
+        ids, segment, offset = [], 0, 0
+        # we only scan up to end_segment == transaction_id to only scan **committed** chunks,
+        # avoiding scanning into newly written chunks.
+        for segment, filename in self.io.segment_iterator(start_segment, transaction_id):
             obj_iterator = self.io.iter_objects(segment, start_offset, read_data=False)
             while True:
                 try:
@@ -1249,10 +1253,10 @@
                     in_index = self.index.get(id)
                     if in_index and (in_index.segment, in_index.offset) == (segment, offset):
                         # we have found an existing and current object
-                        result.append(id)
-                        if len(result) == limit:
-                            return result
-        return result
+                        ids.append(id)
+                        if len(ids) == limit:
+                            return ids, (segment, offset)
+        return ids, (segment, offset)
 
     def flags(self, id, mask=0xFFFFFFFF, value=None):
         """
@@ -1392,23 +1396,34 @@ def _close_fd(self, ts_fd):
         safe_fadvise(fd.fileno(), 0, 0, "DONTNEED")
         fd.close()
 
-    def segment_iterator(self, segment=None, reverse=False):
-        if segment is None:
-            segment = 0 if not reverse else 2**32 - 1
+    def segment_iterator(self, start_segment=None, end_segment=None, reverse=False):
+        if start_segment is None:
+            start_segment = 0 if not reverse else 2**32 - 1
+        if end_segment is None:
+            end_segment = 2**32 - 1 if not reverse else 0
         data_path = os.path.join(self.path, "data")
-        start_segment_dir = segment // self.segments_per_dir
+        start_segment_dir = start_segment // self.segments_per_dir
+        end_segment_dir = end_segment // self.segments_per_dir
         dirs = os.listdir(data_path)
         if not reverse:
-            dirs = [dir for dir in dirs if dir.isdigit() and int(dir) >= start_segment_dir]
+            dirs = [dir for dir in dirs if dir.isdigit() and start_segment_dir <= int(dir) <= end_segment_dir]
         else:
-            dirs = [dir for dir in dirs if dir.isdigit() and int(dir) <= start_segment_dir]
+            dirs = [dir for dir in dirs if dir.isdigit() and start_segment_dir >= int(dir) >= end_segment_dir]
         dirs = sorted(dirs, key=int, reverse=reverse)
         for dir in dirs:
             filenames = os.listdir(os.path.join(data_path, dir))
             if not reverse:
-                filenames = [filename for filename in filenames if filename.isdigit() and int(filename) >= segment]
+                filenames = [
+                    filename
+                    for filename in filenames
+                    if filename.isdigit() and start_segment <= int(filename) <= end_segment
+                ]
             else:
-                filenames = [filename for filename in filenames if filename.isdigit() and int(filename) <= segment]
+                filenames = [
+                    filename
+                    for filename in filenames
+                    if filename.isdigit() and start_segment >= int(filename) >= end_segment
+                ]
             filenames = sorted(filenames, key=int, reverse=reverse)
             for filename in filenames:
                 # Note: Do not filter out logically deleted segments (see "File system interaction" above),
diff --git a/src/borg/testsuite/repository.py b/src/borg/testsuite/repository.py
index d88ca6351..2b6440d0e 100644
--- a/src/borg/testsuite/repository.py
+++ b/src/borg/testsuite/repository.py
@@ -189,19 +189,48 @@ def test_scan(self):
         for x in range(100):
             self.repository.put(H(x), fchunk(b"SOMEDATA"))
         self.repository.commit(compact=False)
-        all = self.repository.scan()
+        all, _ = self.repository.scan()
         assert len(all) == 100
-        first_half = self.repository.scan(limit=50)
+        first_half, marker = self.repository.scan(limit=50)
         assert len(first_half) == 50
         assert first_half == all[:50]
-        second_half = self.repository.scan(marker=first_half[-1])
+        second_half, _ = self.repository.scan(marker=marker)
         assert len(second_half) == 50
         assert second_half == all[50:]
-        assert len(self.repository.scan(limit=50)) == 50
         # check result order == on-disk order (which is hash order)
         for x in range(100):
             assert all[x] == H(x)
 
+    def test_scan_modify(self):
+        for x in range(100):
+            self.repository.put(H(x), fchunk(b"ORIGINAL"))
+        self.repository.commit(compact=False)
+        # now we scan, read and modify chunks at the same time
+        count = 0
+        ids, _ = self.repository.scan()
+        for id in ids:
+            # scan results are in same order as we put the chunks into the repo (into the segment file)
+            assert id == H(count)
+            chunk = self.repository.get(id)
+            # check that we **only** get data that was committed when we started scanning
+            # and that we do not run into the new data we put into the repo.
+            assert pdchunk(chunk) == b"ORIGINAL"
+            count += 1
+            self.repository.put(id, fchunk(b"MODIFIED"))
+        assert count == 100
+        self.repository.commit()
+
+        # now we have committed all the modified chunks, and **only** must get the modified ones.
+        count = 0
+        ids, _ = self.repository.scan()
+        for id in ids:
+            # scan results are in same order as we put the chunks into the repo (into the segment file)
+            assert id == H(count)
+            chunk = self.repository.get(id)
+            assert pdchunk(chunk) == b"MODIFIED"
+            count += 1
+        assert count == 100
+
     def test_max_data_size(self):
         max_data = b"x" * (MAX_DATA_SIZE - RepoObj.meta_len_hdr.size)
         self.repository.put(H(0), fchunk(max_data))
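
For reference, a minimal sketch of how a caller pages through the changed scan() API; this is not part of the diff above. It assumes `repository` behaves like borg.repository.Repository after this change; the helper name iterate_chunk_ids and the batch size of 100 are only illustrative.

# Illustrative sketch only, not part of the change above. `repository` is assumed to
# behave like borg.repository.Repository after this change; the helper name
# `iterate_chunk_ids` and the batch size are made up for this example.
def iterate_chunk_ids(repository, limit=100):
    marker = None  # None means "start scanning at the beginning"
    while True:
        # scan() now returns the chunk IDs *and* the continuation marker,
        # so the caller no longer has to derive the marker from the last returned ID.
        ids, marker = repository.scan(limit=limit, marker=marker)
        if not ids:
            break  # no more committed objects
        yield from ids

Bounding the segment walk at the current transaction id means a scan() call only ever returns committed chunks, which is what test_scan_modify above exercises; returning the (segment, offset) marker to the caller is what makes that bound possible without the caller having to know about segment files.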