repository.scan: only iterate over committed chunks

Otherwise, if we scan+get+put (e.g. read/modify/write chunks to recompress them),
the scan would run past the last commit and into the newly written chunks
(and potentially never terminate).
Thomas Waldmann 2022-09-19 11:05:07 +02:00
parent 7e3b6752cb
commit ce08f92090
2 changed files with 42 additions and 14 deletions
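To make the failure mode concrete, here is a minimal sketch of the scan+get+put pattern the commit message describes, assuming an open borg Repository instance `repository` and a hypothetical `recompress()` helper (neither the helper nor this loop is part of the commit):

    # read/modify/write every committed chunk, e.g. to recompress it
    for id in repository.scan():              # must yield only chunks committed before the scan started
        data = repository.get(id)             # read the committed chunk
        repository.put(id, recompress(data))  # hypothetical helper; the put lands after the last commit
    repository.commit()                       # commit the rewritten chunks

Without an upper bound on the scan, the iterator would eventually reach the segments that put() is writing and could keep yielding the freshly rewritten chunks, so the loop might never terminate.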

src/borg/repository.py

@@ -1163,9 +1163,7 @@ class Repository:
         When segment or segment+offset is given, limit processing to this location only.
         """
-        for current_segment, filename in self.io.segment_iterator(segment=segment):
-            if segment is not None and current_segment > segment:
-                break
+        for current_segment, filename in self.io.segment_iterator(start_segment=segment, end_segment=segment):
             try:
                 for tag, key, current_offset, _, data in self.io.iter_objects(
                     segment=current_segment, offset=offset or 0
@@ -1223,14 +1221,14 @@ class Repository:
         """
         if limit is not None and limit < 1:
             raise ValueError("please use limit > 0 or limit = None")
+        transaction_id = self.get_transaction_id()
         if not self.index:
-            transaction_id = self.get_transaction_id()
             self.index = self.open_index(transaction_id)
         at_start = marker is None
         # smallest valid seg is <uint32> 0, smallest valid offs is <uint32> 8
         start_segment, start_offset, _ = (0, 0, 0) if at_start else self.index[marker]
         result = []
-        for segment, filename in self.io.segment_iterator(start_segment):
+        for segment, filename in self.io.segment_iterator(start_segment, transaction_id):
             obj_iterator = self.io.iter_objects(segment, start_offset, read_data=False)
             while True:
                 try:
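For reference, the limit/marker protocol that scan() exposes (and that the hunk above keeps working) can paginate a large repository. A hedged usage sketch, assuming scan() returns an empty list once exhausted, with `process()` as a hypothetical consumer:

    marker = None
    while True:
        ids = repository.scan(limit=1000, marker=marker)
        if not ids:
            break
        process(ids)      # hypothetical consumer of one batch of ids
        marker = ids[-1]  # resume right after the last id returned

Passing transaction_id as the end segment is what bounds each of these calls to segments committed as of the current transaction.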
@@ -1392,23 +1390,26 @@ class LoggedIO:
         safe_fadvise(fd.fileno(), 0, 0, "DONTNEED")
         fd.close()
 
-    def segment_iterator(self, segment=None, reverse=False):
-        if segment is None:
-            segment = 0 if not reverse else 2**32 - 1
+    def segment_iterator(self, start_segment=None, end_segment=None, reverse=False):
+        if start_segment is None:
+            start_segment = 0 if not reverse else 2**32 - 1
+        if end_segment is None:
+            end_segment = 2**32 - 1 if not reverse else 0
         data_path = os.path.join(self.path, "data")
-        start_segment_dir = segment // self.segments_per_dir
+        start_segment_dir = start_segment // self.segments_per_dir
+        end_segment_dir = end_segment // self.segments_per_dir
         dirs = os.listdir(data_path)
         if not reverse:
-            dirs = [dir for dir in dirs if dir.isdigit() and int(dir) >= start_segment_dir]
+            dirs = [dir for dir in dirs if dir.isdigit() and start_segment_dir <= int(dir) <= end_segment_dir]
         else:
-            dirs = [dir for dir in dirs if dir.isdigit() and int(dir) <= start_segment_dir]
+            dirs = [dir for dir in dirs if dir.isdigit() and start_segment_dir >= int(dir) >= end_segment_dir]
         dirs = sorted(dirs, key=int, reverse=reverse)
         for dir in dirs:
             filenames = os.listdir(os.path.join(data_path, dir))
             if not reverse:
-                filenames = [filename for filename in filenames if filename.isdigit() and int(filename) >= segment]
+                filenames = [filename for filename in filenames if filename.isdigit() and start_segment <= int(filename) <= end_segment]
             else:
-                filenames = [filename for filename in filenames if filename.isdigit() and int(filename) <= segment]
+                filenames = [filename for filename in filenames if filename.isdigit() and start_segment >= int(filename) >= end_segment]
             filenames = sorted(filenames, key=int, reverse=reverse)
             for filename in filenames:
                 # Note: Do not filter out logically deleted segments (see "File system interaction" above),
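The directory-level filter above works because segment files live at data/<segment // segments_per_dir>/<segment>, so whole directories outside the requested range can be skipped without listing their contents. A standalone sketch of the forward (non-reverse) case, with hypothetical names:

    import os

    def dirs_in_range(data_path, segments_per_dir, start_segment, end_segment):
        # mirror of the dirs filter above: keep only directories that can
        # contain segments in [start_segment, end_segment]
        start_dir = start_segment // segments_per_dir
        end_dir = end_segment // segments_per_dir
        dirs = [d for d in os.listdir(data_path) if d.isdigit() and start_dir <= int(d) <= end_dir]
        return sorted(dirs, key=int)

When scan() passes the current transaction id as end_segment, this range check is what keeps iteration from running into segments written after the last commit.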

src/borg/testsuite/repository.py

@@ -197,11 +197,38 @@ class RepositoryTestCase(RepositoryTestCaseBase):
         second_half = self.repository.scan(marker=first_half[-1])
         assert len(second_half) == 50
         assert second_half == all[50:]
         assert len(self.repository.scan(limit=50)) == 50
         # check result order == on-disk order (which is hash order)
         for x in range(100):
             assert all[x] == H(x)
 
+    def test_scan_modify(self):
+        for x in range(100):
+            self.repository.put(H(x), fchunk(b"ORIGINAL"))
+        self.repository.commit(compact=False)
+        # now we scan, read and modify chunks at the same time
+        count = 0
+        for id in self.repository.scan():
+            # scan results are in same order as we put the chunks into the repo (into the segment file)
+            assert id == H(count)
+            chunk = self.repository.get(id)
+            # check that we **only** get data that was committed when we started scanning
+            # and that we do not run into the new data we put into the repo.
+            assert pdchunk(chunk) == b"ORIGINAL"
+            count += 1
+            self.repository.put(id, fchunk(b"MODIFIED"))
+        assert count == 100
+        self.repository.commit()
+        # now we have committed all the modified chunks, and we must get **only** the modified ones.
+        count = 0
+        for id in self.repository.scan():
+            # scan results are in same order as we put the chunks into the repo (into the segment file)
+            assert id == H(count)
+            chunk = self.repository.get(id)
+            assert pdchunk(chunk) == b"MODIFIED"
+            count += 1
+        assert count == 100
+
     def test_max_data_size(self):
         max_data = b"x" * (MAX_DATA_SIZE - RepoObj.meta_len_hdr.size)
         self.repository.put(H(0), fchunk(max_data))