repository: implement PUT2: header crc32, overall xxh64, fixes #1704

note: this required a slight increase of MAX_OBJECT_SIZE so that MAX_DATA_SIZE
      could stay the same as before.

For PUT2, compute the hash over the whole entry (header and content, excluding
hash and crc32 fields, because the crc32 computation includes the hash).
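
To make the new entry layout concrete, here is a minimal sketch of PUT2
serialization (it mirrors write_put in the diff below; zlib.crc32 and the PyPI
xxhash package are stand-ins for borg's crc32 and StreamingXXH64, and unlike
the real write_put it builds the entry in one bytes object):

    import struct
    import zlib
    import xxhash  # stand-in for borg.checksums.StreamingXXH64

    crc_fmt = struct.Struct('<I')             # 4-byte crc32
    header_no_crc_fmt = struct.Struct('<IB')  # 4-byte size + 1-byte tag
    TAG_PUT2 = 3

    def serialize_put2(id, data):
        # size counts the whole entry: crc(4) + size/tag(5) + id(32) + hash(8) + data
        size = crc_fmt.size + header_no_crc_fmt.size + 32 + 8 + len(data)
        header = header_no_crc_fmt.pack(size, TAG_PUT2)
        # xxh64 over header (sans crc) + id + data, i.e. everything except crc and hash
        entry_hash = xxhash.xxh64(header + id + data).digest()
        # crc32 over header + id + entry_hash, so the crc also covers the hash
        crc = crc_fmt.pack(zlib.crc32(entry_hash, zlib.crc32(id, zlib.crc32(header))) & 0xffffffff)
        return crc + header + id + entry_hash + data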

Also: refactor crc32 checks into a function, use f-strings, and structure _read
in a more logical, sequential order.

write_put: avoid creating a large temporary bytes object

why use xxh64?
- fast even without hardware acceleration
- borg depends on it already anyway
- stronger than crc32 and strong enough for this purpose
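
For the streaming form used by the new entry_hash helper (which feeds header,
key and data separately, so no large temporary bytes object is built), a sketch
with the PyPI xxhash package standing in for borg's wrapper:

    import xxhash

    def entry_hash(*parts):
        h = xxhash.xxh64()      # borg wraps this as StreamingXXH64
        for part in parts:
            h.update(part)      # header, key, data are fed piecewise
        return h.digest()       # 8 bytes == ENTRY_HASH_SIZE
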
Thomas Waldmann 2022-03-31 18:21:46 +02:00
parent 7d33ad3db1
commit 52f75d7722
3 changed files with 144 additions and 78 deletions

src/borg/constants.py

@@ -33,14 +33,15 @@ CACHE_TAG_CONTENTS = b'Signature: 8a477f597d28d172789f06886806bc55'
# bytes. That's why it's 500 MiB instead of 512 MiB.
DEFAULT_MAX_SEGMENT_SIZE = 500 * 1024 * 1024
# 20 MiB minus 41 bytes for a Repository header (because the "size" field in the Repository includes
# the header, and the total size was set to 20 MiB).
# in borg < 1.3, this has been defined like this:
# 20 MiB minus 41 bytes for a PUT header (because the "size" field in the Repository includes
# the header, and the total size was set to precisely 20 MiB for borg < 1.3).
MAX_DATA_SIZE = 20971479
# MAX_OBJECT_SIZE = <20 MiB (MAX_DATA_SIZE) + 41 bytes for a Repository PUT header, which consists of
# a 1 byte tag ID, 4 byte CRC, 4 byte size and 32 bytes for the ID.
MAX_OBJECT_SIZE = MAX_DATA_SIZE + 41 # see LoggedIO.put_header_fmt.size assertion in repository module
assert MAX_OBJECT_SIZE == 20 * 1024 * 1024
# MAX_OBJECT_SIZE = MAX_DATA_SIZE + len(PUT2 header)
# note: for borg >= 1.3, this makes the MAX_OBJECT_SIZE grow slightly over the precise 20MiB used by
# borg < 1.3, but this is not expected to cause any issues.
MAX_OBJECT_SIZE = MAX_DATA_SIZE + 41 + 8 # see assertion at end of repository module
# repo config max_segment_size value must be below this limit to stay within uint32 offsets:
MAX_SEGMENT_SIZE_LIMIT = 2 ** 32 - MAX_OBJECT_SIZE
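
The size relations in this hunk can be checked with a quick arithmetic sketch:

    MAX_DATA_SIZE = 20971479
    assert MAX_DATA_SIZE == 20 * 1024 * 1024 - 41       # 20 MiB minus the old PUT header
    MAX_OBJECT_SIZE = MAX_DATA_SIZE + 41 + 8            # PUT2 header adds an 8-byte xxh64
    assert MAX_OBJECT_SIZE == 20 * 1024 * 1024 + 8      # 8 bytes over the old, precise 20 MiB
    MAX_SEGMENT_SIZE_LIMIT = 2 ** 32 - MAX_OBJECT_SIZE  # keeps segment offsets within uint32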

src/borg/repository.py

@@ -25,7 +25,7 @@ from .locking import Lock, LockError, LockErrorT
from .logger import create_logger
from .lrucache import LRUCache
from .platform import SaveFile, SyncFile, sync_dir, safe_fadvise
from .checksums import crc32
from .checksums import crc32, StreamingXXH64
from .crypto.file_integrity import IntegrityCheckedFile, FileIntegrityError
logger = create_logger(__name__)
@@ -34,9 +34,11 @@ MAGIC = b'BORG_SEG'
MAGIC_LEN = len(MAGIC)
ATTIC_MAGIC = b'ATTICSEG'
assert len(ATTIC_MAGIC) == MAGIC_LEN
TAG_PUT = 0
TAG_DELETE = 1
TAG_COMMIT = 2
TAG_PUT2 = 3
# Highest ID usable as TAG_* value
#
@@ -788,7 +790,7 @@ class Repository:
continue
in_index = self.index.get(key)
is_index_object = in_index == (segment, offset)
if tag == TAG_PUT and is_index_object:
if tag in (TAG_PUT2, TAG_PUT) and is_index_object:
try:
new_segment, offset = self.io.write_put(key, data, raise_full=True)
except LoggedIO.SegmentFull:
@@ -798,7 +800,10 @@
segments.setdefault(new_segment, 0)
segments[new_segment] += 1
segments[segment] -= 1
elif tag == TAG_PUT and not is_index_object:
if tag == TAG_PUT:
# old tag is PUT, but new will be PUT2 and use a bit more storage
self.storage_quota_use += self.io.ENTRY_HASH_SIZE
elif tag in (TAG_PUT2, TAG_PUT) and not is_index_object:
# If this is a PUT shadowed by a later tag, then it will be gone when this segment is deleted after
# this loop. Therefore it is removed from the shadow index.
try:
@@ -807,7 +812,10 @@
# do not remove entry with empty shadowed_segments list here,
# it is needed for shadowed_put_exists code (see below)!
pass
self.storage_quota_use -= len(data) + self.io.HEADER_ID_SIZE
if tag == TAG_PUT2:
self.storage_quota_use -= len(data) + self.io.HEADER_ID_SIZE + self.io.ENTRY_HASH_SIZE
elif tag == TAG_PUT:
self.storage_quota_use -= len(data) + self.io.HEADER_ID_SIZE
elif tag == TAG_DELETE and not in_index:
# If the shadow index doesn't contain this key, then we can't say if there's a shadowed older tag,
# therefore we do not drop the delete, but write it to a current segment.
@@ -830,7 +838,7 @@
# Consider the following series of operations if we would not do this, ie. this entire if:
# would be removed.
# Columns are segments, lines are different keys (line 1 = some key, line 2 = some other key)
# Legend: P=TAG_PUT, D=TAG_DELETE, c=commit, i=index is written for latest commit
# Legend: P=TAG_PUT/TAG_PUT2, D=TAG_DELETE, c=commit, i=index is written for latest commit
#
# Segment | 1 | 2 | 3
# --------+-------+-----+------
@@ -899,7 +907,7 @@
"""some code shared between replay_segments and check"""
self.segments[segment] = 0
for tag, key, offset, size in objects:
if tag == TAG_PUT:
if tag in (TAG_PUT2, TAG_PUT):
try:
# If this PUT supersedes an older PUT, mark the old segment for compaction and count the free space
s, _ = self.index[key]
@@ -950,7 +958,7 @@
self.compact[segment] = 0
for tag, key, offset, size in self.io.iter_objects(segment, read_data=False):
if tag == TAG_PUT:
if tag in (TAG_PUT2, TAG_PUT):
if self.index.get(key, (-1, -1)) != (segment, offset):
# This PUT is superseded later
self.compact[segment] += size
@@ -1169,7 +1177,7 @@
# also, for the next segment, we need to start at offset 0.
start_offset = 0
continue
if tag == TAG_PUT and (segment, offset) == self.index.get(id):
if tag in (TAG_PUT2, TAG_PUT) and (segment, offset) == self.index.get(id):
# we have found an existing and current object
result.append(id)
if len(result) == limit:
@@ -1208,7 +1216,7 @@
# be in the repo index (and we won't need it in the shadow_index).
self._delete(id, segment, offset, update_shadow_index=False)
segment, offset = self.io.write_put(id, data)
self.storage_quota_use += len(data) + self.io.HEADER_ID_SIZE
self.storage_quota_use += len(data) + self.io.HEADER_ID_SIZE + self.io.ENTRY_HASH_SIZE
self.segments.setdefault(segment, 0)
self.segments[segment] += 1
self.index[id] = segment, offset
@@ -1278,6 +1286,7 @@ class LoggedIO:
COMMIT = crc_fmt.pack(crc32(_commit)) + _commit
HEADER_ID_SIZE = header_fmt.size + 32
ENTRY_HASH_SIZE = 8
def __init__(self, path, limit, segments_per_dir, capacity=90):
self.path = path
@@ -1475,7 +1484,8 @@
Return object iterator for *segment*.
If read_data is False then include_data must be False as well.
Integrity checks are skipped: all data obtained from the iterator must be considered informational.
See the _read() docstring about confidence in the returned data.
The iterator returns four-tuples of (tag, key, offset, data|size).
"""
@@ -1491,7 +1501,7 @@
header = fd.read(self.header_fmt.size)
while header:
size, tag, key, data = self._read(fd, header, segment, offset,
(TAG_PUT, TAG_DELETE, TAG_COMMIT),
(TAG_PUT2, TAG_DELETE, TAG_COMMIT, TAG_PUT),
read_data=read_data)
if include_data:
yield tag, key, offset, data
@@ -1528,8 +1538,25 @@
dst_fd.write(MAGIC)
while len(d) >= self.header_fmt.size:
crc, size, tag = self.header_fmt.unpack(d[:self.header_fmt.size])
if size > MAX_OBJECT_SIZE or tag > MAX_TAG_ID or size < self.header_fmt.size \
or size > len(d) or crc32(d[4:size]) & 0xffffffff != crc:
size_invalid = size > MAX_OBJECT_SIZE or size < self.header_fmt.size or size > len(d)
if size_invalid or tag > MAX_TAG_ID:
d = d[1:]
continue
if tag == TAG_PUT2:
c_offset = self.HEADER_ID_SIZE + self.ENTRY_HASH_SIZE
# skip if header is invalid
if crc32(d[4:c_offset]) & 0xffffffff != crc:
d = d[1:]
continue
# skip if content is invalid
if self.entry_hash(d[4:self.HEADER_ID_SIZE], d[c_offset:size]) != d[self.HEADER_ID_SIZE:c_offset]:
d = d[1:]
continue
elif tag in (TAG_DELETE, TAG_COMMIT, TAG_PUT):
if crc32(d[4:size]) & 0xffffffff != crc:
d = d[1:]
continue
else: # tag unknown
d = d[1:]
continue
dst_fd.write(d[:size])
@@ -1538,72 +1565,108 @@
del d
data.release()
def entry_hash(self, *data):
h = StreamingXXH64()
for d in data:
h.update(d)
return h.digest()
def read(self, segment, offset, id, read_data=True):
"""
Read entry from *segment* at *offset* with *id*.
If read_data is False the size of the entry is returned instead.
If read_data is False the size of the entry is returned instead and integrity checks are skipped.
The return value should thus be considered informational.
See the _read() docstring about confidence in the returned data.
"""
if segment == self.segment and self._write_fd:
self._write_fd.sync()
fd = self.get_fd(segment)
fd.seek(offset)
header = fd.read(self.header_fmt.size)
size, tag, key, data = self._read(fd, header, segment, offset, (TAG_PUT,), read_data)
size, tag, key, data = self._read(fd, header, segment, offset, (TAG_PUT2, TAG_PUT), read_data)
if id != key:
raise IntegrityError('Invalid segment entry header, is not for wanted id [segment {}, offset {}]'.format(
segment, offset))
return data if read_data else size
def _read(self, fd, header, segment, offset, acceptable_tags, read_data=True):
# some code shared by read() and iter_objects()
"""
Code shared by read() and iter_objects().
Confidence in returned data:
PUT2 tags, read_data == True: crc32 check (header) plus digest check (header+data)
PUT2 tags, read_data == False: crc32 check (header)
PUT tags, read_data == True: crc32 check (header+data)
PUT tags, read_data == False: crc32 check can not be done, all data obtained must be considered informational
"""
def check_crc32(wanted, header, *data):
result = crc32(memoryview(header)[4:]) # skip first 32 bits of the header, they contain the crc.
for d in data:
result = crc32(d, result)
if result & 0xffffffff != wanted:
raise IntegrityError(f'Segment entry header checksum mismatch [segment {segment}, offset {offset}]')
# See comment on MAX_TAG_ID for details
assert max(acceptable_tags) <= MAX_TAG_ID, 'Exceeding MAX_TAG_ID will break backwards compatibility'
key = data = None
fmt = self.header_fmt
try:
hdr_tuple = fmt.unpack(header)
except struct.error as err:
raise IntegrityError('Invalid segment entry header [segment {}, offset {}]: {}'.format(
segment, offset, err)) from None
raise IntegrityError(f'Invalid segment entry header [segment {segment}, offset {offset}]: {err}') from None
crc, size, tag = hdr_tuple
length = size - fmt.size # we already read the header
if size > MAX_OBJECT_SIZE:
# if you get this on an archive made with borg < 1.0.7 and millions of files and
# you need to restore it, you can disable this check by using "if False:" above.
raise IntegrityError('Invalid segment entry size {} - too big [segment {}, offset {}]'.format(
size, segment, offset))
raise IntegrityError(f'Invalid segment entry size {size} - too big [segment {segment}, offset {offset}]')
if size < fmt.size:
raise IntegrityError('Invalid segment entry size {} - too small [segment {}, offset {}]'.format(
size, segment, offset))
if tag in (TAG_PUT, TAG_DELETE):
raise IntegrityError(f'Invalid segment entry size {size} - too small [segment {segment}, offset {offset}]')
if tag not in (TAG_PUT2, TAG_DELETE, TAG_COMMIT, TAG_PUT):
raise IntegrityError(f'Invalid segment entry header, did not get a known tag '
f'[segment {segment}, offset {offset}]')
if tag not in acceptable_tags:
raise IntegrityError(f'Invalid segment entry header, did not get acceptable tag '
f'[segment {segment}, offset {offset}]')
if tag == TAG_COMMIT:
check_crc32(crc, header)
# that's all for COMMITs.
else:
# all other tags (TAG_PUT2, TAG_DELETE, TAG_PUT) have a key
key = fd.read(32)
length -= 32
if len(key) != 32:
raise IntegrityError(
'Segment entry key short read [segment {}, offset {}]: expected {}, got {} bytes'.format(
segment, offset, 32, len(key)))
else:
key = None
if read_data and tag == TAG_PUT:
data = fd.read(length)
if len(data) != length:
raise IntegrityError('Segment entry data short read [segment {}, offset {}]: expected {}, got {} bytes'.format(
segment, offset, length, len(data)))
if crc32(data, crc32(key, crc32(memoryview(header)[4:]))) & 0xffffffff != crc:
raise IntegrityError('Segment entry checksum mismatch [segment {}, offset {}]'.format(
segment, offset))
else:
data = None
if length > 0:
oldpos = fd.tell()
seeked = fd.seek(length, os.SEEK_CUR) - oldpos
if seeked != length:
raise IntegrityError('Segment entry data short seek [segment {}, offset {}]: expected {}, got {} bytes'.format(
segment, offset, length, seeked))
if tag not in acceptable_tags:
raise IntegrityError('Invalid segment entry header, did not get acceptable tag [segment {}, offset {}]'.format(
segment, offset))
raise IntegrityError(f'Segment entry key short read [segment {segment}, offset {offset}]: '
f'expected {32}, got {len(key)} bytes')
if tag == TAG_DELETE:
check_crc32(crc, header, key)
# that's all for DELETEs.
else:
# TAG_PUT: we can not do a crc32 header check here, because the crc32 is computed over header+data!
# for the check, see code below when read_data is True.
if tag == TAG_PUT2:
entry_hash = fd.read(self.ENTRY_HASH_SIZE)
length -= self.ENTRY_HASH_SIZE
if len(entry_hash) != self.ENTRY_HASH_SIZE:
raise IntegrityError(f'Segment entry hash short read [segment {segment}, offset {offset}]: '
f'expected {self.ENTRY_HASH_SIZE}, got {len(entry_hash)} bytes')
check_crc32(crc, header, key, entry_hash)
if not read_data: # seek over data
oldpos = fd.tell()
seeked = fd.seek(length, os.SEEK_CUR) - oldpos
if seeked != length:
raise IntegrityError(f'Segment entry data short seek [segment {segment}, offset {offset}]: '
f'expected {length}, got {seeked} bytes')
else: # read data!
data = fd.read(length)
if len(data) != length:
raise IntegrityError(f'Segment entry data short read [segment {segment}, offset {offset}]: '
f'expected {length}, got {len(data)} bytes')
if tag == TAG_PUT2:
if self.entry_hash(memoryview(header)[4:], key, data) != entry_hash:
raise IntegrityError(f'Segment entry hash mismatch [segment {segment}, offset {offset}]')
elif tag == TAG_PUT:
check_crc32(crc, header, key, data)
return size, tag, key, data
def write_put(self, id, data, raise_full=False):
@@ -1612,11 +1675,13 @@
# this would push the segment entry size beyond MAX_OBJECT_SIZE.
raise IntegrityError(f'More than allowed put data [{data_size} > {MAX_DATA_SIZE}]')
fd = self.get_write_fd(want_new=(id == Manifest.MANIFEST_ID), raise_full=raise_full)
size = data_size + self.HEADER_ID_SIZE
size = data_size + self.HEADER_ID_SIZE + self.ENTRY_HASH_SIZE
offset = self.offset
header = self.header_no_crc_fmt.pack(size, TAG_PUT)
crc = self.crc_fmt.pack(crc32(data, crc32(id, crc32(header))) & 0xffffffff)
fd.write(b''.join((crc, header, id, data)))
header = self.header_no_crc_fmt.pack(size, TAG_PUT2)
entry_hash = self.entry_hash(header, id, data)
crc = self.crc_fmt.pack(crc32(entry_hash, crc32(id, crc32(header))) & 0xffffffff)
fd.write(b''.join((crc, header, id, entry_hash)))
fd.write(data)
self.offset += size
return self.segment, offset
@@ -1641,4 +1706,4 @@
return self.segment - 1 # close_segment() increments it
assert LoggedIO.HEADER_ID_SIZE == 41 # see constants.MAX_OBJECT_SIZE
assert LoggedIO.HEADER_ID_SIZE + LoggedIO.ENTRY_HASH_SIZE == 41 + 8 # see constants.MAX_OBJECT_SIZE
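
To summarize the verification order _read implements for PUT2 entries, a
read-side sketch (offsets follow HEADER_ID_SIZE = 41 and ENTRY_HASH_SIZE = 8
from above; zlib.crc32 and the PyPI xxhash package again stand in for borg's
helpers):

    import struct
    import zlib
    import xxhash

    header_fmt = struct.Struct('<IIB')  # crc32(4) + size(4) + tag(1) = 9 bytes

    def verify_put2(entry):
        crc, size, tag = header_fmt.unpack(entry[:9])
        key, entry_hash, data = entry[9:41], entry[41:49], entry[49:size]
        # cheap check first: crc32 over header (sans crc) + key + hash; possible without data
        if zlib.crc32(entry[4:49]) & 0xffffffff != crc:
            raise ValueError('segment entry header checksum mismatch')
        # full check when data is read: xxh64 over header (sans crc) + key + data
        h = xxhash.xxh64()
        for part in (entry[4:9], key, data):
            h.update(part)
        if h.digest() != entry_hash:
            raise ValueError('segment entry hash mismatch')
        return key, data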

src/borg/testsuite/repository.py

@@ -14,7 +14,7 @@ from ..helpers import IntegrityError
from ..helpers import msgpack
from ..locking import Lock, LockFailed
from ..remote import RemoteRepository, InvalidRPCMethod, PathNotAllowed, ConnectionClosedWithHint, handle_remote_line
from ..repository import Repository, LoggedIO, MAGIC, MAX_DATA_SIZE, TAG_DELETE, TAG_PUT, TAG_COMMIT
from ..repository import Repository, LoggedIO, MAGIC, MAX_DATA_SIZE, TAG_DELETE, TAG_PUT2, TAG_PUT, TAG_COMMIT
from . import BaseTestCase
from .hashindex import H
@@ -58,7 +58,7 @@ class RepositoryTestCaseBase(BaseTestCase):
label = label + ': ' if label is not None else ''
H_trans = {H(i): i for i in range(10)}
H_trans[None] = -1 # key == None appears in commits
tag_trans = {TAG_PUT: 'put', TAG_DELETE: 'del', TAG_COMMIT: 'comm'}
tag_trans = {TAG_PUT2: 'put2', TAG_PUT: 'put', TAG_DELETE: 'del', TAG_COMMIT: 'comm'}
for segment, fn in self.repository.io.segment_iterator():
for tag, key, offset, size in self.repository.io.iter_objects(segment):
print("%s%s H(%d) -> %s[%d..+%d]" % (label, tag_trans[tag], H_trans[key], fn, offset, size))
@@ -185,13 +185,13 @@ class LocalRepositoryTestCase(RepositoryTestCaseBase):
def _assert_sparse(self):
# The superseded 123456... PUT
assert self.repository.compact[0] == 41 + 9
assert self.repository.compact[0] == 41 + 8 + 9
# a COMMIT
assert self.repository.compact[1] == 9
# The DELETE issued by the superseding PUT (or issued directly)
assert self.repository.compact[2] == 41
self.repository._rebuild_sparse(0)
assert self.repository.compact[0] == 41 + 9
assert self.repository.compact[0] == 41 + 8 + 9
def test_sparse1(self):
self.repository.put(H(0), b'foo')
@@ -213,10 +213,10 @@ class LocalRepositoryTestCase(RepositoryTestCaseBase):
self.repository.io._write_fd.sync()
# The on-line tracking works on a per-object basis...
assert self.repository.compact[0] == 41 + 41 + 4
assert self.repository.compact[0] == 41 + 8 + 41 + 4
self.repository._rebuild_sparse(0)
# ...while _rebuild_sparse can mark whole segments as completely sparse (which then includes the segment magic)
assert self.repository.compact[0] == 41 + 41 + 4 + len(MAGIC)
assert self.repository.compact[0] == 41 + 8 + 41 + 4 + len(MAGIC)
self.repository.commit(compact=True)
assert 0 not in [segment for segment, _ in self.repository.io.segment_iterator()]
@@ -459,42 +459,42 @@ class QuotaTestCase(RepositoryTestCaseBase):
def test_tracking(self):
assert self.repository.storage_quota_use == 0
self.repository.put(H(1), bytes(1234))
assert self.repository.storage_quota_use == 1234 + 41
assert self.repository.storage_quota_use == 1234 + 41 + 8
self.repository.put(H(2), bytes(5678))
assert self.repository.storage_quota_use == 1234 + 5678 + 2 * 41
assert self.repository.storage_quota_use == 1234 + 5678 + 2 * (41 + 8)
self.repository.delete(H(1))
assert self.repository.storage_quota_use == 1234 + 5678 + 2 * 41 # we have not compacted yet
assert self.repository.storage_quota_use == 1234 + 5678 + 2 * (41 + 8) # we have not compacted yet
self.repository.commit(compact=False)
assert self.repository.storage_quota_use == 1234 + 5678 + 2 * 41 # we have not compacted yet
assert self.repository.storage_quota_use == 1234 + 5678 + 2 * (41 + 8) # we have not compacted yet
self.reopen()
with self.repository:
# Open new transaction; hints and thus quota data is not loaded unless needed.
self.repository.put(H(3), b'')
self.repository.delete(H(3))
assert self.repository.storage_quota_use == 1234 + 5678 + 3 * 41 # we have not compacted yet
assert self.repository.storage_quota_use == 1234 + 5678 + 3 * (41 + 8) # we have not compacted yet
self.repository.commit(compact=True)
assert self.repository.storage_quota_use == 5678 + 41
assert self.repository.storage_quota_use == 5678 + 41 + 8
def test_exceed_quota(self):
assert self.repository.storage_quota_use == 0
self.repository.storage_quota = 50
self.repository.storage_quota = 80
self.repository.put(H(1), b'')
assert self.repository.storage_quota_use == 41
assert self.repository.storage_quota_use == 41 + 8
self.repository.commit(compact=False)
with pytest.raises(Repository.StorageQuotaExceeded):
self.repository.put(H(2), b'')
assert self.repository.storage_quota_use == 82
assert self.repository.storage_quota_use == (41 + 8) * 2
with pytest.raises(Repository.StorageQuotaExceeded):
self.repository.commit(compact=False)
assert self.repository.storage_quota_use == 82
assert self.repository.storage_quota_use == (41 + 8) * 2
self.reopen()
with self.repository:
self.repository.storage_quota = 100
self.repository.storage_quota = 150
# Open new transaction; hints and thus quota data is not loaded unless needed.
self.repository.put(H(1), b'')
assert self.repository.storage_quota_use == 82 # we have 2 puts for H(1) here and not yet compacted.
assert self.repository.storage_quota_use == (41 + 8) * 2 # we have 2 puts for H(1) here and not yet compacted.
self.repository.commit(compact=True)
assert self.repository.storage_quota_use == 41 # now we have compacted.
assert self.repository.storage_quota_use == 41 + 8 # now we have compacted.
class NonceReservation(RepositoryTestCaseBase):
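
The 41 + 8 that recurs in these quota and compaction numbers is the per-entry
PUT2 overhead; the arithmetic as a sketch:

    HEADER_SIZE = 4 + 4 + 1             # crc32 + size + tag ('<IIB')
    HEADER_ID_SIZE = HEADER_SIZE + 32   # plus the 32-byte object id -> 41
    ENTRY_HASH_SIZE = 8                 # xxh64 digest
    assert HEADER_ID_SIZE + ENTRY_HASH_SIZE == 41 + 8  # overhead per PUT2 entry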