1
0
Fork 0
mirror of https://github.com/borgbackup/borg.git synced 2024-12-26 01:37:20 +00:00

repository: simplify LoggedIO._read

Code gets simpler if we always only use the (shorter) header_fmt.
That format ALWAYS applies to all tags borg writes.

If the tag unpacked from that header indicates that there is also a chunkid
to read (as for PUT and DEL), we can detect that inside _read and
then read the chunkid from the fd.
This commit is contained in:
Thomas Waldmann 2022-03-31 15:42:07 +02:00
parent b9ea17da77
commit cfa34bdf71

View file

@ -807,7 +807,7 @@ def complete_xfer(intermediate=True):
# do not remove entry with empty shadowed_segments list here, # do not remove entry with empty shadowed_segments list here,
# it is needed for shadowed_put_exists code (see below)! # it is needed for shadowed_put_exists code (see below)!
pass pass
self.storage_quota_use -= len(data) + self.io.put_header_fmt.size self.storage_quota_use -= len(data) + self.io.HEADER_ID_SIZE
elif tag == TAG_DELETE and not in_index: elif tag == TAG_DELETE and not in_index:
# If the shadow index doesn't contain this key, then we can't say if there's a shadowed older tag, # If the shadow index doesn't contain this key, then we can't say if there's a shadowed older tag,
# therefore we do not drop the delete, but write it to a current segment. # therefore we do not drop the delete, but write it to a current segment.
@ -1208,7 +1208,7 @@ def put(self, id, data, wait=True):
# be in the repo index (and we won't need it in the shadow_index). # be in the repo index (and we won't need it in the shadow_index).
self._delete(id, segment, offset, update_shadow_index=False) self._delete(id, segment, offset, update_shadow_index=False)
segment, offset = self.io.write_put(id, data) segment, offset = self.io.write_put(id, data)
self.storage_quota_use += len(data) + self.io.put_header_fmt.size self.storage_quota_use += len(data) + self.io.HEADER_ID_SIZE
self.segments.setdefault(segment, 0) self.segments.setdefault(segment, 0)
self.segments[segment] += 1 self.segments[segment] += 1
self.index[id] = segment, offset self.index[id] = segment, offset
@ -1269,8 +1269,6 @@ class SegmentFull(Exception):
header_fmt = struct.Struct('<IIB') header_fmt = struct.Struct('<IIB')
assert header_fmt.size == 9 assert header_fmt.size == 9
put_header_fmt = struct.Struct('<IIB32s')
assert put_header_fmt.size == 41
header_no_crc_fmt = struct.Struct('<IB') header_no_crc_fmt = struct.Struct('<IB')
assert header_no_crc_fmt.size == 5 assert header_no_crc_fmt.size == 5
crc_fmt = struct.Struct('<I') crc_fmt = struct.Struct('<I')
@ -1279,6 +1277,8 @@ class SegmentFull(Exception):
_commit = header_no_crc_fmt.pack(9, TAG_COMMIT) _commit = header_no_crc_fmt.pack(9, TAG_COMMIT)
COMMIT = crc_fmt.pack(crc32(_commit)) + _commit COMMIT = crc_fmt.pack(crc32(_commit)) + _commit
HEADER_ID_SIZE = header_fmt.size + 32
def __init__(self, path, limit, segments_per_dir, capacity=90): def __init__(self, path, limit, segments_per_dir, capacity=90):
self.path = path self.path = path
self.fds = LRUCache(capacity, dispose=self._close_fd) self.fds = LRUCache(capacity, dispose=self._close_fd)
@ -1490,7 +1490,7 @@ def iter_objects(self, segment, offset=0, include_data=False, read_data=True):
offset = MAGIC_LEN offset = MAGIC_LEN
header = fd.read(self.header_fmt.size) header = fd.read(self.header_fmt.size)
while header: while header:
size, tag, key, data = self._read(fd, self.header_fmt, header, segment, offset, size, tag, key, data = self._read(fd, header, segment, offset,
(TAG_PUT, TAG_DELETE, TAG_COMMIT), (TAG_PUT, TAG_DELETE, TAG_COMMIT),
read_data=read_data) read_data=read_data)
if include_data: if include_data:
@ -1549,31 +1549,25 @@ def read(self, segment, offset, id, read_data=True):
self._write_fd.sync() self._write_fd.sync()
fd = self.get_fd(segment) fd = self.get_fd(segment)
fd.seek(offset) fd.seek(offset)
header = fd.read(self.put_header_fmt.size) header = fd.read(self.header_fmt.size)
size, tag, key, data = self._read(fd, self.put_header_fmt, header, segment, offset, (TAG_PUT, ), read_data) size, tag, key, data = self._read(fd, header, segment, offset, (TAG_PUT,), read_data)
if id != key: if id != key:
raise IntegrityError('Invalid segment entry header, is not for wanted id [segment {}, offset {}]'.format( raise IntegrityError('Invalid segment entry header, is not for wanted id [segment {}, offset {}]'.format(
segment, offset)) segment, offset))
return data if read_data else size return data if read_data else size
def _read(self, fd, fmt, header, segment, offset, acceptable_tags, read_data=True): def _read(self, fd, header, segment, offset, acceptable_tags, read_data=True):
# some code shared by read() and iter_objects() # some code shared by read() and iter_objects()
# See comment on MAX_TAG_ID for details # See comment on MAX_TAG_ID for details
assert max(acceptable_tags) <= MAX_TAG_ID, 'Exceeding MAX_TAG_ID will break backwards compatibility' assert max(acceptable_tags) <= MAX_TAG_ID, 'Exceeding MAX_TAG_ID will break backwards compatibility'
fmt = self.header_fmt
try: try:
hdr_tuple = fmt.unpack(header) hdr_tuple = fmt.unpack(header)
except struct.error as err: except struct.error as err:
raise IntegrityError('Invalid segment entry header [segment {}, offset {}]: {}'.format( raise IntegrityError('Invalid segment entry header [segment {}, offset {}]: {}'.format(
segment, offset, err)) from None segment, offset, err)) from None
if fmt is self.put_header_fmt: crc, size, tag = hdr_tuple
crc, size, tag, key = hdr_tuple length = size - fmt.size # we already read the header
elif fmt is self.header_fmt:
crc, size, tag = hdr_tuple
key = None
else:
raise TypeError("_read called with unsupported format")
if size > MAX_OBJECT_SIZE: if size > MAX_OBJECT_SIZE:
# if you get this on an archive made with borg < 1.0.7 and millions of files and # if you get this on an archive made with borg < 1.0.7 and millions of files and
# you need to restore it, you can disable this check by using "if False:" above. # you need to restore it, you can disable this check by using "if False:" above.
@ -1582,30 +1576,31 @@ def _read(self, fd, fmt, header, segment, offset, acceptable_tags, read_data=Tru
if size < fmt.size: if size < fmt.size:
raise IntegrityError('Invalid segment entry size {} - too small [segment {}, offset {}]'.format( raise IntegrityError('Invalid segment entry size {} - too small [segment {}, offset {}]'.format(
size, segment, offset)) size, segment, offset))
length = size - fmt.size if tag in (TAG_PUT, TAG_DELETE):
if read_data: key = fd.read(32)
length -= 32
if len(key) != 32:
raise IntegrityError(
'Segment entry key short read [segment {}, offset {}]: expected {}, got {} bytes'.format(
segment, offset, 32, len(key)))
else:
key = None
if read_data and tag == TAG_PUT:
data = fd.read(length) data = fd.read(length)
if len(data) != length: if len(data) != length:
raise IntegrityError('Segment entry data short read [segment {}, offset {}]: expected {}, got {} bytes'.format( raise IntegrityError('Segment entry data short read [segment {}, offset {}]: expected {}, got {} bytes'.format(
segment, offset, length, len(data))) segment, offset, length, len(data)))
if crc32(data, crc32(memoryview(header)[4:])) & 0xffffffff != crc: if crc32(data, crc32(key, crc32(memoryview(header)[4:]))) & 0xffffffff != crc:
raise IntegrityError('Segment entry checksum mismatch [segment {}, offset {}]'.format( raise IntegrityError('Segment entry checksum mismatch [segment {}, offset {}]'.format(
segment, offset)) segment, offset))
if key is None and tag in (TAG_PUT, TAG_DELETE):
key, data = data[:32], data[32:]
else: else:
if key is None and tag in (TAG_PUT, TAG_DELETE):
key = fd.read(32)
length -= 32
if len(key) != 32:
raise IntegrityError('Segment entry key short read [segment {}, offset {}]: expected {}, got {} bytes'.format(
segment, offset, 32, len(key)))
oldpos = fd.tell()
seeked = fd.seek(length, os.SEEK_CUR) - oldpos
data = None data = None
if seeked != length: if length > 0:
raise IntegrityError('Segment entry data short seek [segment {}, offset {}]: expected {}, got {} bytes'.format( oldpos = fd.tell()
segment, offset, length, seeked)) seeked = fd.seek(length, os.SEEK_CUR) - oldpos
if seeked != length:
raise IntegrityError('Segment entry data short seek [segment {}, offset {}]: expected {}, got {} bytes'.format(
segment, offset, length, seeked))
if tag not in acceptable_tags: if tag not in acceptable_tags:
raise IntegrityError('Invalid segment entry header, did not get acceptable tag [segment {}, offset {}]'.format( raise IntegrityError('Invalid segment entry header, did not get acceptable tag [segment {}, offset {}]'.format(
segment, offset)) segment, offset))
@ -1617,7 +1612,7 @@ def write_put(self, id, data, raise_full=False):
# this would push the segment entry size beyond MAX_OBJECT_SIZE. # this would push the segment entry size beyond MAX_OBJECT_SIZE.
raise IntegrityError(f'More than allowed put data [{data_size} > {MAX_DATA_SIZE}]') raise IntegrityError(f'More than allowed put data [{data_size} > {MAX_DATA_SIZE}]')
fd = self.get_write_fd(want_new=(id == Manifest.MANIFEST_ID), raise_full=raise_full) fd = self.get_write_fd(want_new=(id == Manifest.MANIFEST_ID), raise_full=raise_full)
size = data_size + self.put_header_fmt.size size = data_size + self.HEADER_ID_SIZE
offset = self.offset offset = self.offset
header = self.header_no_crc_fmt.pack(size, TAG_PUT) header = self.header_no_crc_fmt.pack(size, TAG_PUT)
crc = self.crc_fmt.pack(crc32(data, crc32(id, crc32(header))) & 0xffffffff) crc = self.crc_fmt.pack(crc32(data, crc32(id, crc32(header))) & 0xffffffff)
@ -1627,11 +1622,11 @@ def write_put(self, id, data, raise_full=False):
def write_delete(self, id, raise_full=False): def write_delete(self, id, raise_full=False):
fd = self.get_write_fd(want_new=(id == Manifest.MANIFEST_ID), raise_full=raise_full) fd = self.get_write_fd(want_new=(id == Manifest.MANIFEST_ID), raise_full=raise_full)
header = self.header_no_crc_fmt.pack(self.put_header_fmt.size, TAG_DELETE) header = self.header_no_crc_fmt.pack(self.HEADER_ID_SIZE, TAG_DELETE)
crc = self.crc_fmt.pack(crc32(id, crc32(header)) & 0xffffffff) crc = self.crc_fmt.pack(crc32(id, crc32(header)) & 0xffffffff)
fd.write(b''.join((crc, header, id))) fd.write(b''.join((crc, header, id)))
self.offset += self.put_header_fmt.size self.offset += self.HEADER_ID_SIZE
return self.segment, self.put_header_fmt.size return self.segment, self.HEADER_ID_SIZE
def write_commit(self, intermediate=False): def write_commit(self, intermediate=False):
# Intermediate commits go directly into the current segment - this makes checking their validity more # Intermediate commits go directly into the current segment - this makes checking their validity more
@ -1646,4 +1641,4 @@ def write_commit(self, intermediate=False):
return self.segment - 1 # close_segment() increments it return self.segment - 1 # close_segment() increments it
assert LoggedIO.put_header_fmt.size == 41 # see constants.MAX_OBJECT_SIZE assert LoggedIO.HEADER_ID_SIZE == 41 # see constants.MAX_OBJECT_SIZE