diff --git a/src/borg/constants.py b/src/borg/constants.py
index 4c0f43870..208991ee7 100644
--- a/src/borg/constants.py
+++ b/src/borg/constants.py
@@ -33,14 +33,15 @@ CACHE_TAG_CONTENTS = b'Signature: 8a477f597d28d172789f06886806bc55'
 # bytes. That's why it's 500 MiB instead of 512 MiB.
 DEFAULT_MAX_SEGMENT_SIZE = 500 * 1024 * 1024

-# 20 MiB minus 41 bytes for a Repository header (because the "size" field in the Repository includes
-# the header, and the total size was set to 20 MiB).
+# in borg < 1.3, this has been defined like this:
+# 20 MiB minus 41 bytes for a PUT header (because the "size" field in the Repository includes
+# the header, and the total size was set to precisely 20 MiB for borg < 1.3).
 MAX_DATA_SIZE = 20971479

-# MAX_OBJECT_SIZE = <20 MiB (MAX_DATA_SIZE) + 41 bytes for a Repository PUT header, which consists of
-# a 1 byte tag ID, 4 byte CRC, 4 byte size and 32 bytes for the ID.
-MAX_OBJECT_SIZE = MAX_DATA_SIZE + 41  # see LoggedIO.put_header_fmt.size assertion in repository module
-assert MAX_OBJECT_SIZE == 20 * 1024 * 1024
+# MAX_OBJECT_SIZE = MAX_DATA_SIZE + len(PUT2 header)
+# note: for borg >= 1.3, this makes the MAX_OBJECT_SIZE grow slightly over the precise 20MiB used by
+# borg < 1.3, but this is not expected to cause any issues.
+MAX_OBJECT_SIZE = MAX_DATA_SIZE + 41 + 8  # see assertion at end of repository module

 # repo config max_segment_size value must be below this limit to stay within uint32 offsets:
 MAX_SEGMENT_SIZE_LIMIT = 2 ** 32 - MAX_OBJECT_SIZE
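
For reference, the arithmetic behind these constants: a legacy PUT entry carries 41 bytes of overhead (a 9-byte header of 4-byte CRC32, 4-byte size and 1-byte tag, plus the 32-byte id), and a PUT2 entry adds an 8-byte xxh64 entry hash on top. A standalone sketch (numbers only, no borg imports) that checks the new values:

```python
# Sketch verifying the constants above (standalone, illustrative).
CRC_SIZE, SIZE_FIELD, TAG_SIZE, KEY_SIZE, HASH_SIZE = 4, 4, 1, 32, 8

HEADER_ID_SIZE = CRC_SIZE + SIZE_FIELD + TAG_SIZE + KEY_SIZE  # 41 bytes (PUT)
PUT2_OVERHEAD = HEADER_ID_SIZE + HASH_SIZE                    # 49 bytes (PUT2)

MAX_DATA_SIZE = 20 * 1024 * 1024 - 41    # 20971479, kept for compatibility
MAX_OBJECT_SIZE = MAX_DATA_SIZE + PUT2_OVERHEAD

assert MAX_DATA_SIZE == 20971479
assert MAX_OBJECT_SIZE == 20 * 1024 * 1024 + 8  # now slightly over 20 MiB
```

Keeping MAX_DATA_SIZE at its historical value means objects written by older repos still fit; only the on-disk entry grows by 8 bytes.
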
diff --git a/src/borg/repository.py b/src/borg/repository.py
index 86fea881f..db85910f2 100644
--- a/src/borg/repository.py
+++ b/src/borg/repository.py
@@ -25,7 +25,7 @@ from .locking import Lock, LockError, LockErrorT
 from .logger import create_logger
 from .lrucache import LRUCache
 from .platform import SaveFile, SyncFile, sync_dir, safe_fadvise
-from .checksums import crc32
+from .checksums import crc32, StreamingXXH64
 from .crypto.file_integrity import IntegrityCheckedFile, FileIntegrityError

 logger = create_logger(__name__)
@@ -34,9 +34,11 @@ MAGIC = b'BORG_SEG'
 MAGIC_LEN = len(MAGIC)
 ATTIC_MAGIC = b'ATTICSEG'
 assert len(ATTIC_MAGIC) == MAGIC_LEN
+
 TAG_PUT = 0
 TAG_DELETE = 1
 TAG_COMMIT = 2
+TAG_PUT2 = 3

 # Highest ID usable as TAG_* value
 #
@@ -788,7 +790,7 @@ class Repository:
                     continue
                 in_index = self.index.get(key)
                 is_index_object = in_index == (segment, offset)
-                if tag == TAG_PUT and is_index_object:
+                if tag in (TAG_PUT2, TAG_PUT) and is_index_object:
                     try:
                         new_segment, offset = self.io.write_put(key, data, raise_full=True)
                     except LoggedIO.SegmentFull:
@@ -798,7 +800,10 @@ class Repository:
                     segments.setdefault(new_segment, 0)
                     segments[new_segment] += 1
                     segments[segment] -= 1
-                elif tag == TAG_PUT and not is_index_object:
+                    if tag == TAG_PUT:
+                        # old tag is PUT, but new will be PUT2 and use a bit more storage
+                        self.storage_quota_use += self.io.ENTRY_HASH_SIZE
+                elif tag in (TAG_PUT2, TAG_PUT) and not is_index_object:
                     # If this is a PUT shadowed by a later tag, then it will be gone when this segment is deleted after
                     # this loop. Therefore it is removed from the shadow index.
                     try:
@@ -807,7 +812,10 @@ class Repository:
                         # do not remove entry with empty shadowed_segments list here,
                         # it is needed for shadowed_put_exists code (see below)!
                         pass
-                    self.storage_quota_use -= len(data) + self.io.HEADER_ID_SIZE
+                    if tag == TAG_PUT2:
+                        self.storage_quota_use -= len(data) + self.io.HEADER_ID_SIZE + self.io.ENTRY_HASH_SIZE
+                    elif tag == TAG_PUT:
+                        self.storage_quota_use -= len(data) + self.io.HEADER_ID_SIZE
                 elif tag == TAG_DELETE and not in_index:
                     # If the shadow index doesn't contain this key, then we can't say if there's a shadowed older tag,
                     # therefore we do not drop the delete, but write it to a current segment.
@@ -830,7 +838,7 @@ class Repository:
                     # Consider the following series of operations if we would not do this, ie. this entire if:
                     # would be removed.
                     # Columns are segments, lines are different keys (line 1 = some key, line 2 = some other key)
-                    # Legend: P=TAG_PUT, D=TAG_DELETE, c=commit, i=index is written for latest commit
+                    # Legend: P=TAG_PUT/TAG_PUT2, D=TAG_DELETE, c=commit, i=index is written for latest commit
                     #
                     # Segment | 1     | 2   | 3
                     # --------+-------+-----+------
@@ -899,7 +907,7 @@ class Repository:
        """some code shared between replay_segments and check"""
        self.segments[segment] = 0
        for tag, key, offset, size in objects:
-            if tag == TAG_PUT:
+            if tag in (TAG_PUT2, TAG_PUT):
                try:
                    # If this PUT supersedes an older PUT, mark the old segment for compaction and count the free space
                    s, _ = self.index[key]
@@ -950,7 +958,7 @@ class Repository:

             self.compact[segment] = 0
             for tag, key, offset, size in self.io.iter_objects(segment, read_data=False):
-                if tag == TAG_PUT:
+                if tag in (TAG_PUT2, TAG_PUT):
                     if self.index.get(key, (-1, -1)) != (segment, offset):
                         # This PUT is superseded later
                         self.compact[segment] += size
@@ -1169,7 +1177,7 @@ class Repository:
                 # also, for the next segment, we need to start at offset 0.
                 start_offset = 0
                 continue
-            if tag == TAG_PUT and (segment, offset) == self.index.get(id):
+            if tag in (TAG_PUT2, TAG_PUT) and (segment, offset) == self.index.get(id):
                 # we have found an existing and current object
                 result.append(id)
                 if len(result) == limit:
@@ -1208,7 +1216,7 @@ class Repository:
             # be in the repo index (and we won't need it in the shadow_index).
             self._delete(id, segment, offset, update_shadow_index=False)
         segment, offset = self.io.write_put(id, data)
-        self.storage_quota_use += len(data) + self.io.HEADER_ID_SIZE
+        self.storage_quota_use += len(data) + self.io.HEADER_ID_SIZE + self.io.ENTRY_HASH_SIZE
         self.segments.setdefault(segment, 0)
         self.segments[segment] += 1
         self.index[id] = segment, offset
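
A minimal sketch of the quota bookkeeping these Repository hunks implement, assuming the constant values from above; put_cost() is a hypothetical helper, not borg API. Each stored object is charged its payload plus the full PUT2 overhead, and compacting a legacy PUT re-charges ENTRY_HASH_SIZE because the rewritten entry becomes a PUT2:

```python
# Illustrative only: mirrors the storage_quota_use bookkeeping in the diff.
HEADER_ID_SIZE = 41   # crc + size + tag + key
ENTRY_HASH_SIZE = 8   # xxh64 digest, PUT2 entries only

def put_cost(data: bytes) -> int:
    # what Repository.put() adds to storage_quota_use per stored object
    return len(data) + HEADER_ID_SIZE + ENTRY_HASH_SIZE

quota_use = 0
quota_use += put_cost(bytes(1234))
quota_use += put_cost(bytes(5678))
assert quota_use == 1234 + 5678 + 2 * (41 + 8)  # matches test_tracking below
```
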
""" @@ -1491,7 +1501,7 @@ class LoggedIO: header = fd.read(self.header_fmt.size) while header: size, tag, key, data = self._read(fd, header, segment, offset, - (TAG_PUT, TAG_DELETE, TAG_COMMIT), + (TAG_PUT2, TAG_DELETE, TAG_COMMIT, TAG_PUT), read_data=read_data) if include_data: yield tag, key, offset, data @@ -1528,8 +1538,25 @@ class LoggedIO: dst_fd.write(MAGIC) while len(d) >= self.header_fmt.size: crc, size, tag = self.header_fmt.unpack(d[:self.header_fmt.size]) - if size > MAX_OBJECT_SIZE or tag > MAX_TAG_ID or size < self.header_fmt.size \ - or size > len(d) or crc32(d[4:size]) & 0xffffffff != crc: + size_invalid = size > MAX_OBJECT_SIZE or size < self.header_fmt.size or size > len(d) + if size_invalid or tag > MAX_TAG_ID: + d = d[1:] + continue + if tag == TAG_PUT2: + c_offset = self.HEADER_ID_SIZE + self.ENTRY_HASH_SIZE + # skip if header is invalid + if crc32(d[4:c_offset]) & 0xffffffff != crc: + d = d[1:] + continue + # skip if content is invalid + if self.entry_hash(d[4:self.HEADER_ID_SIZE], d[c_offset:size]) != d[self.HEADER_ID_SIZE:c_offset]: + d = d[1:] + continue + elif tag in (TAG_DELETE, TAG_COMMIT, TAG_PUT): + if crc32(d[4:size]) & 0xffffffff != crc: + d = d[1:] + continue + else: # tag unknown d = d[1:] continue dst_fd.write(d[:size]) @@ -1538,72 +1565,108 @@ class LoggedIO: del d data.release() + def entry_hash(self, *data): + h = StreamingXXH64() + for d in data: + h.update(d) + return h.digest() + def read(self, segment, offset, id, read_data=True): """ Read entry from *segment* at *offset* with *id*. + If read_data is False the size of the entry is returned instead. - If read_data is False the size of the entry is returned instead and integrity checks are skipped. - The return value should thus be considered informational. + See the _read() docstring about confidence in the returned data. """ if segment == self.segment and self._write_fd: self._write_fd.sync() fd = self.get_fd(segment) fd.seek(offset) header = fd.read(self.header_fmt.size) - size, tag, key, data = self._read(fd, header, segment, offset, (TAG_PUT,), read_data) + size, tag, key, data = self._read(fd, header, segment, offset, (TAG_PUT2, TAG_PUT), read_data) if id != key: raise IntegrityError('Invalid segment entry header, is not for wanted id [segment {}, offset {}]'.format( segment, offset)) return data if read_data else size def _read(self, fd, header, segment, offset, acceptable_tags, read_data=True): - # some code shared by read() and iter_objects() + """ + Code shared by read() and iter_objects(). + + Confidence in returned data: + PUT2 tags, read_data == True: crc32 check (header) plus digest check (header+data) + PUT2 tags, read_data == False: crc32 check (header) + PUT tags, read_data == True: crc32 check (header+data) + PUT tags, read_data == False: crc32 check can not be done, all data obtained must be considered informational + """ + def check_crc32(wanted, header, *data): + result = crc32(memoryview(header)[4:]) # skip first 32 bits of the header, they contain the crc. 
@@ -1538,72 +1565,108 @@ class LoggedIO:
             del d
         data.release()

+    def entry_hash(self, *data):
+        h = StreamingXXH64()
+        for d in data:
+            h.update(d)
+        return h.digest()
+
     def read(self, segment, offset, id, read_data=True):
         """
         Read entry from *segment* at *offset* with *id*.
+        If read_data is False the size of the entry is returned instead.

-        If read_data is False the size of the entry is returned instead and integrity checks are skipped.
-        The return value should thus be considered informational.
+        See the _read() docstring about confidence in the returned data.
         """
         if segment == self.segment and self._write_fd:
             self._write_fd.sync()
         fd = self.get_fd(segment)
         fd.seek(offset)
         header = fd.read(self.header_fmt.size)
-        size, tag, key, data = self._read(fd, header, segment, offset, (TAG_PUT,), read_data)
+        size, tag, key, data = self._read(fd, header, segment, offset, (TAG_PUT2, TAG_PUT), read_data)
         if id != key:
             raise IntegrityError('Invalid segment entry header, is not for wanted id [segment {}, offset {}]'.format(
                 segment, offset))
         return data if read_data else size

     def _read(self, fd, header, segment, offset, acceptable_tags, read_data=True):
-        # some code shared by read() and iter_objects()
+        """
+        Code shared by read() and iter_objects().
+
+        Confidence in returned data:
+        PUT2 tags, read_data == True: crc32 check (header) plus digest check (header+data)
+        PUT2 tags, read_data == False: crc32 check (header)
+        PUT tags, read_data == True: crc32 check (header+data)
+        PUT tags, read_data == False: crc32 check can not be done, all data obtained must be considered informational
+        """
+        def check_crc32(wanted, header, *data):
+            result = crc32(memoryview(header)[4:])  # skip first 32 bits of the header, they contain the crc.
+            for d in data:
+                result = crc32(d, result)
+            if result & 0xffffffff != wanted:
+                raise IntegrityError(f'Segment entry header checksum mismatch [segment {segment}, offset {offset}]')
+
         # See comment on MAX_TAG_ID for details
         assert max(acceptable_tags) <= MAX_TAG_ID, 'Exceeding MAX_TAG_ID will break backwards compatibility'
+        key = data = None
         fmt = self.header_fmt
         try:
             hdr_tuple = fmt.unpack(header)
         except struct.error as err:
-            raise IntegrityError('Invalid segment entry header [segment {}, offset {}]: {}'.format(
-                segment, offset, err)) from None
+            raise IntegrityError(f'Invalid segment entry header [segment {segment}, offset {offset}]: {err}') from None
         crc, size, tag = hdr_tuple
         length = size - fmt.size  # we already read the header
         if size > MAX_OBJECT_SIZE:
             # if you get this on an archive made with borg < 1.0.7 and millions of files and
             # you need to restore it, you can disable this check by using "if False:" above.
-            raise IntegrityError('Invalid segment entry size {} - too big [segment {}, offset {}]'.format(
-                size, segment, offset))
+            raise IntegrityError(f'Invalid segment entry size {size} - too big [segment {segment}, offset {offset}]')
         if size < fmt.size:
-            raise IntegrityError('Invalid segment entry size {} - too small [segment {}, offset {}]'.format(
-                size, segment, offset))
-        if tag in (TAG_PUT, TAG_DELETE):
+            raise IntegrityError(f'Invalid segment entry size {size} - too small [segment {segment}, offset {offset}]')
+        if tag not in (TAG_PUT2, TAG_DELETE, TAG_COMMIT, TAG_PUT):
+            raise IntegrityError(f'Invalid segment entry header, did not get a known tag '
+                                 f'[segment {segment}, offset {offset}]')
+        if tag not in acceptable_tags:
+            raise IntegrityError(f'Invalid segment entry header, did not get acceptable tag '
+                                 f'[segment {segment}, offset {offset}]')
+        if tag == TAG_COMMIT:
+            check_crc32(crc, header)
+            # that's all for COMMITs.
+        else:
+            # all other tags (TAG_PUT2, TAG_DELETE, TAG_PUT) have a key
             key = fd.read(32)
             length -= 32
             if len(key) != 32:
-                raise IntegrityError(
-                    'Segment entry key short read [segment {}, offset {}]: expected {}, got {} bytes'.format(
-                        segment, offset, 32, len(key)))
-        else:
-            key = None
-        if read_data and tag == TAG_PUT:
-            data = fd.read(length)
-            if len(data) != length:
-                raise IntegrityError('Segment entry data short read [segment {}, offset {}]: expected {}, got {} bytes'.format(
-                    segment, offset, length, len(data)))
-            if crc32(data, crc32(key, crc32(memoryview(header)[4:]))) & 0xffffffff != crc:
-                raise IntegrityError('Segment entry checksum mismatch [segment {}, offset {}]'.format(
-                    segment, offset))
-        else:
-            data = None
-            if length > 0:
-                oldpos = fd.tell()
-                seeked = fd.seek(length, os.SEEK_CUR) - oldpos
-                if seeked != length:
-                    raise IntegrityError('Segment entry data short seek [segment {}, offset {}]: expected {}, got {} bytes'.format(
-                        segment, offset, length, seeked))
-        if tag not in acceptable_tags:
-            raise IntegrityError('Invalid segment entry header, did not get acceptable tag [segment {}, offset {}]'.format(
-                segment, offset))
+                raise IntegrityError(f'Segment entry key short read [segment {segment}, offset {offset}]: '
+                                     f'expected {32}, got {len(key)} bytes')
+            if tag == TAG_DELETE:
+                check_crc32(crc, header, key)
+                # that's all for DELETEs.
+            else:
+                # TAG_PUT: we can not do a crc32 header check here, because the crc32 is computed over header+data!
+                # for the check, see code below when read_data is True.
+                if tag == TAG_PUT2:
+                    entry_hash = fd.read(self.ENTRY_HASH_SIZE)
+                    length -= self.ENTRY_HASH_SIZE
+                    if len(entry_hash) != self.ENTRY_HASH_SIZE:
+                        raise IntegrityError(f'Segment entry hash short read [segment {segment}, offset {offset}]: '
+                                             f'expected {self.ENTRY_HASH_SIZE}, got {len(entry_hash)} bytes')
+                    check_crc32(crc, header, key, entry_hash)
+                if not read_data:  # seek over data
+                    oldpos = fd.tell()
+                    seeked = fd.seek(length, os.SEEK_CUR) - oldpos
+                    if seeked != length:
+                        raise IntegrityError(f'Segment entry data short seek [segment {segment}, offset {offset}]: '
+                                             f'expected {length}, got {seeked} bytes')
+                else:  # read data!
+                    data = fd.read(length)
+                    if len(data) != length:
+                        raise IntegrityError(f'Segment entry data short read [segment {segment}, offset {offset}]: '
+                                             f'expected {length}, got {len(data)} bytes')
+                    if tag == TAG_PUT2:
+                        if self.entry_hash(memoryview(header)[4:], key, data) != entry_hash:
+                            raise IntegrityError(f'Segment entry hash mismatch [segment {segment}, offset {offset}]')
+                    elif tag == TAG_PUT:
+                        check_crc32(crc, header, key, data)
         return size, tag, key, data

     def write_put(self, id, data, raise_full=False):
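
The two integrity primitives used by _read() above, sketched standalone. The third-party xxhash package stands in for borg's StreamingXXH64 (an assumption; borg ships its own binding), and the crc32 chaining follows zlib semantics, which borg's checksums.crc32 mirrors:

```python
import zlib
import xxhash  # stand-in for borg's StreamingXXH64 (assumption)

def entry_hash(*parts: bytes) -> bytes:
    # streaming xxh64 over all parts, 8-byte digest like ENTRY_HASH_SIZE
    h = xxhash.xxh64()
    for p in parts:
        h.update(p)
    return h.digest()

def check_crc32(wanted: int, header: bytes, *data: bytes) -> None:
    # skip the first 4 header bytes: they hold the stored crc itself
    result = zlib.crc32(memoryview(header)[4:])
    for d in data:
        result = zlib.crc32(d, result)  # chain, exactly as in the diff
    if result & 0xffffffff != wanted:
        raise ValueError('checksum mismatch')
```
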
@@ -1612,11 +1675,13 @@ class LoggedIO:
             # this would push the segment entry size beyond MAX_OBJECT_SIZE.
             raise IntegrityError(f'More than allowed put data [{data_size} > {MAX_DATA_SIZE}]')
         fd = self.get_write_fd(want_new=(id == Manifest.MANIFEST_ID), raise_full=raise_full)
-        size = data_size + self.HEADER_ID_SIZE
+        size = data_size + self.HEADER_ID_SIZE + self.ENTRY_HASH_SIZE
         offset = self.offset
-        header = self.header_no_crc_fmt.pack(size, TAG_PUT)
-        crc = self.crc_fmt.pack(crc32(data, crc32(id, crc32(header))) & 0xffffffff)
-        fd.write(b''.join((crc, header, id, data)))
+        header = self.header_no_crc_fmt.pack(size, TAG_PUT2)
+        entry_hash = self.entry_hash(header, id, data)
+        crc = self.crc_fmt.pack(crc32(entry_hash, crc32(id, crc32(header))) & 0xffffffff)
+        fd.write(b''.join((crc, header, id, entry_hash)))
+        fd.write(data)
         self.offset += size
         return self.segment, offset

@@ -1641,4 +1706,4 @@ class LoggedIO:
         return self.segment - 1  # close_segment() increments it


-assert LoggedIO.HEADER_ID_SIZE == 41  # see constants.MAX_OBJECT_SIZE
+assert LoggedIO.HEADER_ID_SIZE + LoggedIO.ENTRY_HASH_SIZE == 41 + 8  # see constants.MAX_OBJECT_SIZE
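
Putting the pieces together, a hypothetical build_put2() showing the byte order write_put() emits (crc, header, id, entry hash, then data); the struct formats are assumptions derived from HEADER_ID_SIZE = 9 + 32, and the helper reuses the xxhash stand-in from the previous sketch:

```python
import struct
import zlib
import xxhash  # stand-in for StreamingXXH64, see previous sketch

HEADER_NO_CRC_FMT = struct.Struct('<IB')  # size, tag (assumption)
TAG_PUT2 = 3

def build_put2(id32: bytes, data: bytes) -> bytes:
    assert len(id32) == 32
    size = len(data) + 41 + 8  # HEADER_ID_SIZE + ENTRY_HASH_SIZE + payload
    header = HEADER_NO_CRC_FMT.pack(size, TAG_PUT2)
    # xxh64 covers header-without-crc + id + data ...
    entry_hash = xxhash.xxh64(header + id32 + data).digest()
    # ... while the crc32 covers header + id + entry hash, but not the data
    crc = zlib.crc32(entry_hash, zlib.crc32(id32, zlib.crc32(header))) & 0xffffffff
    return struct.pack('<I', crc) + header + id32 + entry_hash + data
```

Splitting responsibilities this way is what lets readers verify a PUT2 header cheaply (crc only) and verify the payload independently (xxh64 only) when they actually read it.
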
diff --git a/src/borg/testsuite/repository.py b/src/borg/testsuite/repository.py
index c52820e33..83cf728e4 100644
--- a/src/borg/testsuite/repository.py
+++ b/src/borg/testsuite/repository.py
@@ -14,7 +14,7 @@ from ..helpers import IntegrityError
 from ..helpers import msgpack
 from ..locking import Lock, LockFailed
 from ..remote import RemoteRepository, InvalidRPCMethod, PathNotAllowed, ConnectionClosedWithHint, handle_remote_line
-from ..repository import Repository, LoggedIO, MAGIC, MAX_DATA_SIZE, TAG_DELETE, TAG_PUT, TAG_COMMIT
+from ..repository import Repository, LoggedIO, MAGIC, MAX_DATA_SIZE, TAG_DELETE, TAG_PUT2, TAG_PUT, TAG_COMMIT
 from . import BaseTestCase
 from .hashindex import H

@@ -58,7 +58,7 @@ class RepositoryTestCaseBase(BaseTestCase):
         label = label + ': ' if label is not None else ''
         H_trans = {H(i): i for i in range(10)}
         H_trans[None] = -1  # key == None appears in commits
-        tag_trans = {TAG_PUT: 'put', TAG_DELETE: 'del', TAG_COMMIT: 'comm'}
+        tag_trans = {TAG_PUT2: 'put2', TAG_PUT: 'put', TAG_DELETE: 'del', TAG_COMMIT: 'comm'}
         for segment, fn in self.repository.io.segment_iterator():
             for tag, key, offset, size in self.repository.io.iter_objects(segment):
                 print("%s%s H(%d) -> %s[%d..+%d]" % (label, tag_trans[tag], H_trans[key], fn, offset, size))
@@ -185,13 +185,13 @@ class LocalRepositoryTestCase(RepositoryTestCaseBase):
     def _assert_sparse(self):
         # The superseded 123456... PUT
-        assert self.repository.compact[0] == 41 + 9
+        assert self.repository.compact[0] == 41 + 8 + 9
         # a COMMIT
         assert self.repository.compact[1] == 9
         # The DELETE issued by the superseding PUT (or issued directly)
         assert self.repository.compact[2] == 41
         self.repository._rebuild_sparse(0)
-        assert self.repository.compact[0] == 41 + 9
+        assert self.repository.compact[0] == 41 + 8 + 9

     def test_sparse1(self):
         self.repository.put(H(0), b'foo')
@@ -213,10 +213,10 @@ class LocalRepositoryTestCase(RepositoryTestCaseBase):
         self.repository.io._write_fd.sync()

         # The on-line tracking works on a per-object basis...
-        assert self.repository.compact[0] == 41 + 41 + 4
+        assert self.repository.compact[0] == 41 + 8 + 41 + 4
         self.repository._rebuild_sparse(0)
         # ...while _rebuild_sparse can mark whole segments as completely sparse (which then includes the segment magic)
-        assert self.repository.compact[0] == 41 + 41 + 4 + len(MAGIC)
+        assert self.repository.compact[0] == 41 + 8 + 41 + 4 + len(MAGIC)
         self.repository.commit(compact=True)
         assert 0 not in [segment for segment, _ in self.repository.io.segment_iterator()]
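
The asserted sizes in these hunks fall straight out of the entry layout: a PUT2 costs 49 bytes plus payload, while DELETE and COMMIT entries are unchanged. A quick arithmetic check (illustrative only; the 9-byte payload is the value the sparse tests store):

```python
# Entry overheads under the new format (illustrative arithmetic):
put2 = lambda payload: 9 + 32 + 8 + payload  # header + id + xxh64 + data
delete = 9 + 32                              # header + id, no entry hash
commit = 9                                   # header only

assert put2(9) == 41 + 8 + 9  # the superseded 9-byte PUT in _assert_sparse
assert delete == 41           # DELETE entries are unchanged
assert commit == 9            # COMMIT entries are unchanged
```
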
@@ -459,42 +459,42 @@ class QuotaTestCase(RepositoryTestCaseBase):
     def test_tracking(self):
         assert self.repository.storage_quota_use == 0
         self.repository.put(H(1), bytes(1234))
-        assert self.repository.storage_quota_use == 1234 + 41
+        assert self.repository.storage_quota_use == 1234 + 41 + 8
         self.repository.put(H(2), bytes(5678))
-        assert self.repository.storage_quota_use == 1234 + 5678 + 2 * 41
+        assert self.repository.storage_quota_use == 1234 + 5678 + 2 * (41 + 8)
         self.repository.delete(H(1))
-        assert self.repository.storage_quota_use == 1234 + 5678 + 2 * 41  # we have not compacted yet
+        assert self.repository.storage_quota_use == 1234 + 5678 + 2 * (41 + 8)  # we have not compacted yet
         self.repository.commit(compact=False)
-        assert self.repository.storage_quota_use == 1234 + 5678 + 2 * 41  # we have not compacted yet
+        assert self.repository.storage_quota_use == 1234 + 5678 + 2 * (41 + 8)  # we have not compacted yet
         self.reopen()
         with self.repository:
             # Open new transaction; hints and thus quota data is not loaded unless needed.
             self.repository.put(H(3), b'')
             self.repository.delete(H(3))
-            assert self.repository.storage_quota_use == 1234 + 5678 + 3 * 41  # we have not compacted yet
+            assert self.repository.storage_quota_use == 1234 + 5678 + 3 * (41 + 8)  # we have not compacted yet
             self.repository.commit(compact=True)
-            assert self.repository.storage_quota_use == 5678 + 41
+            assert self.repository.storage_quota_use == 5678 + 41 + 8

     def test_exceed_quota(self):
         assert self.repository.storage_quota_use == 0
-        self.repository.storage_quota = 50
+        self.repository.storage_quota = 80
         self.repository.put(H(1), b'')
-        assert self.repository.storage_quota_use == 41
+        assert self.repository.storage_quota_use == 41 + 8
         self.repository.commit(compact=False)
         with pytest.raises(Repository.StorageQuotaExceeded):
             self.repository.put(H(2), b'')
-        assert self.repository.storage_quota_use == 82
+        assert self.repository.storage_quota_use == (41 + 8) * 2
         with pytest.raises(Repository.StorageQuotaExceeded):
             self.repository.commit(compact=False)
-        assert self.repository.storage_quota_use == 82
+        assert self.repository.storage_quota_use == (41 + 8) * 2
         self.reopen()
         with self.repository:
-            self.repository.storage_quota = 100
+            self.repository.storage_quota = 150
             # Open new transaction; hints and thus quota data is not loaded unless needed.
             self.repository.put(H(1), b'')
-            assert self.repository.storage_quota_use == 82  # we have 2 puts for H(1) here and not yet compacted.
+            assert self.repository.storage_quota_use == (41 + 8) * 2  # we have 2 puts for H(1) here and not yet compacted.
             self.repository.commit(compact=True)
-            assert self.repository.storage_quota_use == 41  # now we have compacted.
+            assert self.repository.storage_quota_use == 41 + 8  # now we have compacted.


 class NonceReservation(RepositoryTestCaseBase):
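
Why the quota limits move from 50/100 to 80/150: an empty-payload object now costs 49 bytes instead of 41, and the test still needs "one put fits, two do not" before the quota is raised and "two uncompacted puts fit" after. Checked arithmetic (illustrative):

```python
cost = 41 + 8                 # one PUT2 entry with empty payload
assert cost <= 80 < 2 * cost  # quota 80: first put fits, second exceeds it
assert 2 * cost <= 150        # quota 150: both uncompacted puts for H(1) fit
```
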