mirror of
https://github.com/borgbackup/borg.git
synced 2024-12-25 01:06:50 +00:00
Merge pull request #7038 from ThomasWaldmann/fix-scan
Fix repository.scan
This commit is contained in:
commit
c0e674ce61
4 changed files with 78 additions and 35 deletions
|
@ -1749,11 +1749,10 @@ def verify_data(self):
|
|||
)
|
||||
marker = None
|
||||
while True:
|
||||
chunk_ids = self.repository.scan(limit=100, marker=marker)
|
||||
chunk_ids, marker = self.repository.scan(limit=100, marker=marker)
|
||||
if not chunk_ids:
|
||||
break
|
||||
chunks_count_segments += len(chunk_ids)
|
||||
marker = chunk_ids[-1]
|
||||
chunk_data_iter = self.repository.get_many(chunk_ids)
|
||||
chunk_ids_revd = list(reversed(chunk_ids))
|
||||
while chunk_ids_revd:
|
||||
|
|
|
@ -155,11 +155,12 @@ def decrypt_dump(i, id, cdata, tag=None, segment=None, offset=None):
|
|||
marker = None
|
||||
i = 0
|
||||
while True:
|
||||
result = repository.scan(limit=LIST_SCAN_LIMIT, marker=marker) # must use on-disk order scanning here
|
||||
if not result:
|
||||
ids, marker = repository.scan(
|
||||
limit=LIST_SCAN_LIMIT, marker=marker
|
||||
) # must use on-disk order scanning here
|
||||
if not ids:
|
||||
break
|
||||
marker = result[-1]
|
||||
for id in result:
|
||||
for id in ids:
|
||||
cdata = repository.get(id)
|
||||
decrypt_dump(i, id, cdata)
|
||||
i += 1
|
||||
|
@ -207,11 +208,10 @@ def print_finding(info, wanted, data, offset):
|
|||
last_id = None
|
||||
i = 0
|
||||
while True:
|
||||
result = repository.scan(limit=LIST_SCAN_LIMIT, marker=marker) # must use on-disk order scanning here
|
||||
if not result:
|
||||
ids, marker = repository.scan(limit=LIST_SCAN_LIMIT, marker=marker) # must use on-disk order scanning here
|
||||
if not ids:
|
||||
break
|
||||
marker = result[-1]
|
||||
for id in result:
|
||||
for id in ids:
|
||||
cdata = repository.get(id)
|
||||
_, data = repo_objs.parse(id, cdata)
|
||||
|
||||
|
|
|
@ -1163,9 +1163,7 @@ def scan_low_level(self, segment=None, offset=None):
|
|||
|
||||
When segment or segment+offset is given, limit processing to this location only.
|
||||
"""
|
||||
for current_segment, filename in self.io.segment_iterator(segment=segment):
|
||||
if segment is not None and current_segment > segment:
|
||||
break
|
||||
for current_segment, filename in self.io.segment_iterator(start_segment=segment, end_segment=segment):
|
||||
try:
|
||||
for tag, key, current_offset, _, data in self.io.iter_objects(
|
||||
segment=current_segment, offset=offset or 0
|
||||
|
@ -1211,9 +1209,14 @@ def list(self, limit=None, marker=None, mask=0, value=0):
|
|||
|
||||
def scan(self, limit=None, marker=None):
|
||||
"""
|
||||
list <limit> IDs starting from after id <marker> - in on-disk order, so that a client
|
||||
list <limit> IDs starting from after <marker> - in on-disk order, so that a client
|
||||
fetching data in this order does linear reads and reuses stuff from disk cache.
|
||||
|
||||
marker can either be None (default, meaning "start from the beginning") or the object
|
||||
returned from a previous scan call (meaning "continue scanning where we stopped previously").
|
||||
|
||||
returns: list of chunk ids, marker
|
||||
|
||||
We rely on repository.check() has run already (either now or some time before) and that:
|
||||
|
||||
- if we are called from a borg check command, self.index is a valid, fresh, in-sync repo index.
|
||||
|
@ -1223,14 +1226,15 @@ def scan(self, limit=None, marker=None):
|
|||
"""
|
||||
if limit is not None and limit < 1:
|
||||
raise ValueError("please use limit > 0 or limit = None")
|
||||
transaction_id = self.get_transaction_id()
|
||||
if not self.index:
|
||||
transaction_id = self.get_transaction_id()
|
||||
self.index = self.open_index(transaction_id)
|
||||
at_start = marker is None
|
||||
# smallest valid seg is <uint32> 0, smallest valid offs is <uint32> 8
|
||||
start_segment, start_offset, _ = (0, 0, 0) if at_start else self.index[marker]
|
||||
result = []
|
||||
for segment, filename in self.io.segment_iterator(start_segment):
|
||||
start_segment, start_offset = marker if marker is not None else (0, 0)
|
||||
ids, segment, offset = [], 0, 0
|
||||
# we only scan up to end_segment == transaction_id to only scan **committed** chunks,
|
||||
# avoiding scanning into newly written chunks.
|
||||
for segment, filename in self.io.segment_iterator(start_segment, transaction_id):
|
||||
obj_iterator = self.io.iter_objects(segment, start_offset, read_data=False)
|
||||
while True:
|
||||
try:
|
||||
|
@ -1249,10 +1253,10 @@ def scan(self, limit=None, marker=None):
|
|||
in_index = self.index.get(id)
|
||||
if in_index and (in_index.segment, in_index.offset) == (segment, offset):
|
||||
# we have found an existing and current object
|
||||
result.append(id)
|
||||
if len(result) == limit:
|
||||
return result
|
||||
return result
|
||||
ids.append(id)
|
||||
if len(ids) == limit:
|
||||
return ids, (segment, offset)
|
||||
return ids, (segment, offset)
|
||||
|
||||
def flags(self, id, mask=0xFFFFFFFF, value=None):
|
||||
"""
|
||||
|
@ -1392,23 +1396,34 @@ def _close_fd(self, ts_fd):
|
|||
safe_fadvise(fd.fileno(), 0, 0, "DONTNEED")
|
||||
fd.close()
|
||||
|
||||
def segment_iterator(self, segment=None, reverse=False):
|
||||
if segment is None:
|
||||
segment = 0 if not reverse else 2**32 - 1
|
||||
def segment_iterator(self, start_segment=None, end_segment=None, reverse=False):
|
||||
if start_segment is None:
|
||||
start_segment = 0 if not reverse else 2**32 - 1
|
||||
if end_segment is None:
|
||||
end_segment = 2**32 - 1 if not reverse else 0
|
||||
data_path = os.path.join(self.path, "data")
|
||||
start_segment_dir = segment // self.segments_per_dir
|
||||
start_segment_dir = start_segment // self.segments_per_dir
|
||||
end_segment_dir = end_segment // self.segments_per_dir
|
||||
dirs = os.listdir(data_path)
|
||||
if not reverse:
|
||||
dirs = [dir for dir in dirs if dir.isdigit() and int(dir) >= start_segment_dir]
|
||||
dirs = [dir for dir in dirs if dir.isdigit() and start_segment_dir <= int(dir) <= end_segment_dir]
|
||||
else:
|
||||
dirs = [dir for dir in dirs if dir.isdigit() and int(dir) <= start_segment_dir]
|
||||
dirs = [dir for dir in dirs if dir.isdigit() and start_segment_dir >= int(dir) >= end_segment_dir]
|
||||
dirs = sorted(dirs, key=int, reverse=reverse)
|
||||
for dir in dirs:
|
||||
filenames = os.listdir(os.path.join(data_path, dir))
|
||||
if not reverse:
|
||||
filenames = [filename for filename in filenames if filename.isdigit() and int(filename) >= segment]
|
||||
filenames = [
|
||||
filename
|
||||
for filename in filenames
|
||||
if filename.isdigit() and start_segment <= int(filename) <= end_segment
|
||||
]
|
||||
else:
|
||||
filenames = [filename for filename in filenames if filename.isdigit() and int(filename) <= segment]
|
||||
filenames = [
|
||||
filename
|
||||
for filename in filenames
|
||||
if filename.isdigit() and start_segment >= int(filename) >= end_segment
|
||||
]
|
||||
filenames = sorted(filenames, key=int, reverse=reverse)
|
||||
for filename in filenames:
|
||||
# Note: Do not filter out logically deleted segments (see "File system interaction" above),
|
||||
|
|
|
@ -189,19 +189,48 @@ def test_scan(self):
|
|||
for x in range(100):
|
||||
self.repository.put(H(x), fchunk(b"SOMEDATA"))
|
||||
self.repository.commit(compact=False)
|
||||
all = self.repository.scan()
|
||||
all, _ = self.repository.scan()
|
||||
assert len(all) == 100
|
||||
first_half = self.repository.scan(limit=50)
|
||||
first_half, marker = self.repository.scan(limit=50)
|
||||
assert len(first_half) == 50
|
||||
assert first_half == all[:50]
|
||||
second_half = self.repository.scan(marker=first_half[-1])
|
||||
second_half, _ = self.repository.scan(marker=marker)
|
||||
assert len(second_half) == 50
|
||||
assert second_half == all[50:]
|
||||
assert len(self.repository.scan(limit=50)) == 50
|
||||
# check result order == on-disk order (which is hash order)
|
||||
for x in range(100):
|
||||
assert all[x] == H(x)
|
||||
|
||||
def test_scan_modify(self):
|
||||
for x in range(100):
|
||||
self.repository.put(H(x), fchunk(b"ORIGINAL"))
|
||||
self.repository.commit(compact=False)
|
||||
# now we scan, read and modify chunks at the same time
|
||||
count = 0
|
||||
ids, _ = self.repository.scan()
|
||||
for id in ids:
|
||||
# scan results are in same order as we put the chunks into the repo (into the segment file)
|
||||
assert id == H(count)
|
||||
chunk = self.repository.get(id)
|
||||
# check that we **only** get data that was committed when we started scanning
|
||||
# and that we do not run into the new data we put into the repo.
|
||||
assert pdchunk(chunk) == b"ORIGINAL"
|
||||
count += 1
|
||||
self.repository.put(id, fchunk(b"MODIFIED"))
|
||||
assert count == 100
|
||||
self.repository.commit()
|
||||
|
||||
# now we have committed all the modified chunks, and **only** must get the modified ones.
|
||||
count = 0
|
||||
ids, _ = self.repository.scan()
|
||||
for id in ids:
|
||||
# scan results are in same order as we put the chunks into the repo (into the segment file)
|
||||
assert id == H(count)
|
||||
chunk = self.repository.get(id)
|
||||
assert pdchunk(chunk) == b"MODIFIED"
|
||||
count += 1
|
||||
assert count == 100
|
||||
|
||||
def test_max_data_size(self):
|
||||
max_data = b"x" * (MAX_DATA_SIZE - RepoObj.meta_len_hdr.size)
|
||||
self.repository.put(H(0), fchunk(max_data))
|
||||
|
|
Loading…
Reference in a new issue