Merge pull request #1703 from ThomasWaldmann/improve-scan

Repository.scan(): avoid re-reading same repo object headers repeatedly
This commit is contained in:
enkore 2016-10-10 11:16:39 +02:00 committed by GitHub
commit 9381c38f29
1 changed files with 33 additions and 16 deletions

View File

@ -775,12 +775,10 @@ class Repository:
self.index = self.open_index(transaction_id) self.index = self.open_index(transaction_id)
at_start = marker is None at_start = marker is None
# smallest valid seg is <uint32> 0, smallest valid offs is <uint32> 8 # smallest valid seg is <uint32> 0, smallest valid offs is <uint32> 8
marker_segment, marker_offset = (0, 0) if at_start else self.index[marker] start_segment, start_offset = (0, 0) if at_start else self.index[marker]
result = [] result = []
for segment, filename in self.io.segment_iterator(): for segment, filename in self.io.segment_iterator(start_segment):
if segment < marker_segment: obj_iterator = self.io.iter_objects(segment, start_offset, read_data=False, include_data=False)
continue
obj_iterator = self.io.iter_objects(segment, read_data=False, include_data=False)
while True: while True:
try: try:
tag, id, offset, size = next(obj_iterator) tag, id, offset, size = next(obj_iterator)
@ -788,7 +786,11 @@ class Repository:
# either end-of-segment or an error - we can not seek to objects at # either end-of-segment or an error - we can not seek to objects at
# higher offsets than one that has an error in the header fields # higher offsets than one that has an error in the header fields
break break
if segment == marker_segment and offset <= marker_offset: if start_offset > 0:
# we are using a marker and the marker points to the last object we have already
# returned in the previous scan() call - thus, we need to skip this one object.
# also, for the next segment, we need to start at offset 0.
start_offset = 0
continue continue
if tag == TAG_PUT and (segment, offset) == self.index.get(id): if tag == TAG_PUT and (segment, offset) == self.index.get(id):
# we have found an existing and current object # we have found an existing and current object
@ -886,14 +888,25 @@ class LoggedIO:
os.posix_fadvise(fd.fileno(), 0, 0, os.POSIX_FADV_DONTNEED) os.posix_fadvise(fd.fileno(), 0, 0, os.POSIX_FADV_DONTNEED)
fd.close() fd.close()
def segment_iterator(self, reverse=False): def segment_iterator(self, segment=None, reverse=False):
if segment is None:
segment = 0 if not reverse else 2 ** 32 - 1
data_path = os.path.join(self.path, 'data') data_path = os.path.join(self.path, 'data')
dirs = sorted((dir for dir in os.listdir(data_path) if dir.isdigit()), key=int, reverse=reverse) start_segment_dir = segment // self.segments_per_dir
dirs = os.listdir(data_path)
if not reverse:
dirs = [dir for dir in dirs if dir.isdigit() and int(dir) >= start_segment_dir]
else:
dirs = [dir for dir in dirs if dir.isdigit() and int(dir) <= start_segment_dir]
dirs = sorted(dirs, key=int, reverse=reverse)
for dir in dirs: for dir in dirs:
filenames = os.listdir(os.path.join(data_path, dir)) filenames = os.listdir(os.path.join(data_path, dir))
sorted_filenames = sorted((filename for filename in filenames if not reverse:
if filename.isdigit()), key=int, reverse=reverse) filenames = [filename for filename in filenames if filename.isdigit() and int(filename) >= segment]
for filename in sorted_filenames: else:
filenames = [filename for filename in filenames if filename.isdigit() and int(filename) <= segment]
filenames = sorted(filenames, key=int, reverse=reverse)
for filename in filenames:
yield int(filename), os.path.join(data_path, dir, filename) yield int(filename), os.path.join(data_path, dir, filename)
def get_latest_segment(self): def get_latest_segment(self):
@ -999,7 +1012,7 @@ class LoggedIO:
def segment_size(self, segment): def segment_size(self, segment):
return os.path.getsize(self.segment_filename(segment)) return os.path.getsize(self.segment_filename(segment))
def iter_objects(self, segment, include_data=False, read_data=True): def iter_objects(self, segment, offset=0, include_data=False, read_data=True):
""" """
Return object iterator for *segment*. Return object iterator for *segment*.
@ -1009,10 +1022,14 @@ class LoggedIO:
The iterator returns four-tuples of (tag, key, offset, data|size). The iterator returns four-tuples of (tag, key, offset, data|size).
""" """
fd = self.get_fd(segment) fd = self.get_fd(segment)
fd.seek(0) fd.seek(offset)
if fd.read(MAGIC_LEN) != MAGIC: if offset == 0:
raise IntegrityError('Invalid segment magic [segment {}, offset {}]'.format(segment, 0)) # we are touching this segment for the first time, check the MAGIC.
offset = MAGIC_LEN # Repository.scan() calls us with offset > 0 when it continues an ongoing iteration
# from a marker position - but then we have checked the magic before already.
if fd.read(MAGIC_LEN) != MAGIC:
raise IntegrityError('Invalid segment magic [segment {}, offset {}]'.format(segment, 0))
offset = MAGIC_LEN
header = fd.read(self.header_fmt.size) header = fd.read(self.header_fmt.size)
while header: while header:
size, tag, key, data = self._read(fd, self.header_fmt, header, segment, offset, size, tag, key, data = self._read(fd, self.header_fmt, header, segment, offset,