From b8d1bc1ca891a65a7276d33488791a4878731a94 Mon Sep 17 00:00:00 2001 From: Marian Beermann Date: Sat, 21 May 2016 19:17:14 +0200 Subject: [PATCH] Repository: don't read+verify old entries for size retrieval --- borg/repository.py | 67 +++++++++++++++++++++++++++++++++------------- 1 file changed, 48 insertions(+), 19 deletions(-) diff --git a/borg/repository.py b/borg/repository.py index 7d4e972d1..439c06511 100644 --- a/borg/repository.py +++ b/borg/repository.py @@ -358,7 +358,7 @@ class Repository: # the old index is not necessarily valid for this transaction (e.g. compaction); if the segment # is already gone, then it was already compacted. self.segments[s] -= 1 - size = len(self.io.read(s, offset, key)) + size = self.io.read(s, offset, key, read_data=False) self.compact[s] += size elif tag == TAG_COMMIT: continue @@ -377,7 +377,7 @@ class Repository: if self.segments[segment] == 0: self.compact[segment] += self.io.segment_size(segment) return - for tag, key, offset, size in self.io.iter_objects(segment): + for tag, key, offset, size in self.io.iter_objects(segment, read_data=False): if tag == TAG_PUT: if self.index.get(key, (-1, -1)) != (segment, offset): # This PUT is superseded later @@ -504,7 +504,7 @@ class Repository: pass else: self.segments[segment] -= 1 - size = len(self.io.read(segment, offset, id)) + size = self.io.read(segment, offset, id, read_data=False) self.compact[segment] += size segment, size = self.io.write_delete(id) self.compact[segment] += size @@ -522,7 +522,7 @@ class Repository: except KeyError: raise self.ObjectNotFound(id, self.path) from None self.segments[segment] -= 1 - size = len(self.io.read(segment, offset, id)) + size = self.io.read(segment, offset, id, read_data=False) self.compact[segment] += size segment, size = self.io.write_delete(id) self.compact[segment] += size @@ -683,7 +683,15 @@ class LoggedIO: def segment_size(self, segment): return os.path.getsize(self.segment_filename(segment)) - def iter_objects(self, segment, include_data=False): + def iter_objects(self, segment, include_data=False, read_data=True): + """ + Return object iterator for *segment*. + + If read_data is False then include_data must be False as well. + Integrity checks are skipped: all data obtained from the iterator must be considered informational. + + The iterator returns four-tuples of (tag, key, offset, data|size). + """ fd = self.get_fd(segment) fd.seek(0) if fd.read(MAGIC_LEN) != MAGIC: @@ -692,7 +700,8 @@ class LoggedIO: header = fd.read(self.header_fmt.size) while header: size, tag, key, data = self._read(fd, self.header_fmt, header, segment, offset, - (TAG_PUT, TAG_DELETE, TAG_COMMIT)) + (TAG_PUT, TAG_DELETE, TAG_COMMIT), + read_data=read_data) if include_data: yield tag, key, offset, data else: @@ -720,19 +729,25 @@ class LoggedIO: fd.write(data[:size]) data = data[size:] - def read(self, segment, offset, id): + def read(self, segment, offset, id, read_data=True): + """ + Read entry from *segment* at *offset* with *id*. + + If read_data is False the size of the entry is returned instead and integrity checks are skipped. + The return value should thus be considered informational. 
+ """ if segment == self.segment and self._write_fd: self._write_fd.sync() fd = self.get_fd(segment) fd.seek(offset) header = fd.read(self.put_header_fmt.size) - size, tag, key, data = self._read(fd, self.put_header_fmt, header, segment, offset, (TAG_PUT, )) + size, tag, key, data = self._read(fd, self.put_header_fmt, header, segment, offset, (TAG_PUT, ), read_data) if id != key: raise IntegrityError('Invalid segment entry header, is not for wanted id [segment {}, offset {}]'.format( segment, offset)) - return data + return data if read_data else size - def _read(self, fd, fmt, header, segment, offset, acceptable_tags): + def _read(self, fd, fmt, header, segment, offset, acceptable_tags, read_data=True): # some code shared by read() and iter_objects() try: hdr_tuple = fmt.unpack(header) @@ -750,18 +765,32 @@ class LoggedIO: raise IntegrityError('Invalid segment entry size [segment {}, offset {}]'.format( segment, offset)) length = size - fmt.size - data = fd.read(length) - if len(data) != length: - raise IntegrityError('Segment entry data short read [segment {}, offset {}]: expected {}, got {} bytes'.format( - segment, offset, length, len(data))) - if crc32(data, crc32(memoryview(header)[4:])) & 0xffffffff != crc: - raise IntegrityError('Segment entry checksum mismatch [segment {}, offset {}]'.format( - segment, offset)) + if read_data: + data = fd.read(length) + if len(data) != length: + raise IntegrityError('Segment entry data short read [segment {}, offset {}]: expected {}, got {} bytes'.format( + segment, offset, length, len(data))) + if crc32(data, crc32(memoryview(header)[4:])) & 0xffffffff != crc: + raise IntegrityError('Segment entry checksum mismatch [segment {}, offset {}]'.format( + segment, offset)) + if key is None and tag in (TAG_PUT, TAG_DELETE): + key, data = data[:32], data[32:] + else: + if key is None and tag in (TAG_PUT, TAG_DELETE): + key = fd.read(32) + length -= 32 + if len(key) != 32: + raise IntegrityError('Segment entry key short read [segment {}, offset {}]: expected {}, got {} bytes'.format( + segment, offset, 32, len(key))) + oldpos = fd.tell() + seeked = fd.seek(length, os.SEEK_CUR) - oldpos + data = None + if seeked != length: + raise IntegrityError('Segment entry data short seek [segment {}, offset {}]: expected {}, got {} bytes'.format( + segment, offset, length, seeked)) if tag not in acceptable_tags: raise IntegrityError('Invalid segment entry header, did not get acceptable tag [segment {}, offset {}]'.format( segment, offset)) - if key is None and tag in (TAG_PUT, TAG_DELETE): - key, data = data[:32], data[32:] return size, tag, key, data def write_put(self, id, data, raise_full=False):
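
Note on the technique: with read_data=False, _read() unpacks only the fixed-size entry header (plus the 32-byte key for PUT/DELETE entries when the caller's format does not already include it) and then seeks past the payload instead of reading and CRC-checking it. Below is a minimal, self-contained sketch of that idea for illustration only; it is not borg code. The '<IIB' / '<IIB32s' layouts are assumptions modeled on LoggedIO's header_fmt / put_header_fmt, and the helper names (write_put_entry, size_with_verification, size_without_verification) are made up for this example.

    import os
    import struct
    import tempfile
    from zlib import crc32

    header_fmt = struct.Struct('<IIB')          # crc, size, tag        (assumed layout)
    put_header_fmt = struct.Struct('<IIB32s')   # crc, size, tag, key   (assumed layout)
    TAG_PUT = 0

    def write_put_entry(fd, key, data):
        # entry = crc32 | size | tag | key | data; the crc covers everything after itself
        size = put_header_fmt.size + len(data)
        crc_body = struct.pack('<IB32s', size, TAG_PUT, key) + data
        fd.write(struct.pack('<I', crc32(crc_body) & 0xffffffff) + crc_body)

    def size_with_verification(fd, offset):
        # old behaviour: read the payload, verify the checksum, return the payload length
        fd.seek(offset)
        crc, size, tag, key = put_header_fmt.unpack(fd.read(put_header_fmt.size))
        data = fd.read(size - put_header_fmt.size)
        if crc32(data, crc32(struct.pack('<IB32s', size, tag, key))) & 0xffffffff != crc:
            raise ValueError('checksum mismatch')
        return len(data)

    def size_without_verification(fd, offset):
        # new behaviour: unpack only the header, seek past key + payload, return the entry size
        fd.seek(offset)
        crc, size, tag = header_fmt.unpack(fd.read(header_fmt.size))
        fd.seek(size - header_fmt.size, os.SEEK_CUR)
        return size

    with tempfile.TemporaryFile() as fd:
        write_put_entry(fd, b'k' * 32, b'x' * 1000)
        print(size_with_verification(fd, 0))     # 1000: payload bytes only
        print(size_without_verification(fd, 0))  # 1041: header + key + payload

As the patch's "return data if read_data else size" shows, the fast path also changes what gets accounted: the compaction counters now add the whole on-disk entry size rather than the payload length that len(self.io.read(...)) used to yield, and no payload bytes are read or checksummed along the way.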