
misc cleanups

- remove more hashindex tests
- remove IndexBase, _hashindex.c remainders
- remove early borg2 NSIndex
- remove hashindex_variant (we only support the borg 1.x repo index
  aka NSIndex1; everything else uses the borghash-based stuff)
- adapt code / tests so they use NSIndex1 (not NSIndex)
- minor fixes
- NSIndex1 can read the old borg 1.x on-disk format, but not write it.
- NSIndex1 can read/write the new borghash on-disk format (see the sketch below).
- adapt legacyrepository code to work with NSIndex1 (segment, offset)
  values instead of NSIndex (segment, offset, size).
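
As a hedged illustration of that read/write asymmetry (the index file names and
the key below are assumptions for the example, not taken from this diff):

    # minimal sketch, assuming an existing borg 1.x index file "index.1234"
    from borg.hashindex import NSIndex1, NSIndex1Entry

    idx = NSIndex1.read("index.1234")  # reads both BORG_IDX (borg 1.x) and BORGHASH formats
    key = bytes(32)  # keys are 32-byte ids
    idx[key] = NSIndex1Entry(segment=1, offset=0)  # values are (segment, offset), no size element
    idx.write("index.new")  # writes only the new borghash on-disk format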
This commit is contained in:
Thomas Waldmann 2024-10-25 16:22:17 +02:00
parent 57e46cdfe5
commit 64bda1a636
5 changed files with 43 additions and 367 deletions

View file

@@ -3,242 +3,17 @@ import os
import struct
cimport cython
from libc.stdint cimport uint32_t, UINT32_MAX, uint64_t
from borghash cimport _borghash
API_VERSION = '1.2_01'
cdef extern from "_hashindex.c":
ctypedef struct HashIndex:
pass
HashIndex *hashindex_read(object file_py, int permit_compact, int legacy) except *
HashIndex *hashindex_init(int capacity, int key_size, int value_size)
void hashindex_free(HashIndex *index)
int hashindex_len(HashIndex *index)
int hashindex_size(HashIndex *index)
void hashindex_write(HashIndex *index, object file_py, int legacy) except *
unsigned char *hashindex_get(HashIndex *index, unsigned char *key)
unsigned char *hashindex_next_key(HashIndex *index, unsigned char *key)
int hashindex_delete(HashIndex *index, unsigned char *key)
int hashindex_set(HashIndex *index, unsigned char *key, void *value)
uint64_t hashindex_compact(HashIndex *index)
uint32_t _htole32(uint32_t v)
uint32_t _le32toh(uint32_t v)
double HASH_MAX_LOAD
_MAX_VALUE = 4294966271UL # 2**32 - 1025
cdef _NoDefault = object()
"""
The HashIndex is *not* a general-purpose data structure. The value size must be at least 4 bytes, and these
first bytes are used for in-band signalling in the data structure itself.
The constant MAX_VALUE defines the valid range for these 4 bytes when interpreted as a uint32_t from 0
to MAX_VALUE (inclusive). The following reserved values beyond MAX_VALUE are currently in use
(byte order is LE)::
0xffffffff marks empty entries in the hashtable
0xfffffffe marks deleted entries in the hashtable
None of the publicly available classes in this module will accept or return a reserved value;
AssertionError is raised instead.
"""
assert UINT32_MAX == 2**32-1
_MAX_VALUE = 4294966271UL # 2**32 - 1025
assert _MAX_VALUE % 2 == 1
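# hedged illustration (not part of this diff) of the reserved range documented above:
# _MAX_VALUE == 2**32 - 1025; every uint32 beyond it is reserved, currently:
#   0xffffffff (UINT32_MAX)     -> empty bucket
#   0xfffffffe (UINT32_MAX - 1) -> deleted bucket (tombstone)
# so a stored value v holds real data iff 0 <= v <= _MAX_VALUE.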
def hashindex_variant(fn):
"""peek into an index file and find out what it is"""
with open(fn, 'rb') as f:
magic = f.read(8) # MAGIC_LEN
if magic == b'BORG_IDX':
return 1 # legacy
if magic == b'BORG2IDX':
return 2
if magic == b'12345678': # used by unit tests
return 2 # just return the current variant
raise ValueError(f'unknown hashindex magic: {magic!r}')
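# usage sketch (hypothetical index file name), matching the dispatch removed elsewhere in this commit:
# variant = hashindex_variant("index.1234")
# idx = (NSIndex if variant == 2 else NSIndex1).read("index.1234")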
@cython.internal
cdef class IndexBase:
cdef HashIndex *index
cdef int key_size
legacy = 0
_key_size = 32
MAX_LOAD_FACTOR = HASH_MAX_LOAD
MAX_VALUE = _MAX_VALUE
def __cinit__(self, capacity=0, path=None, permit_compact=False, usable=None):
self.key_size = self._key_size
if path:
if isinstance(path, (str, bytes)):
with open(path, 'rb') as fd:
self.index = hashindex_read(fd, permit_compact, self.legacy)
else:
self.index = hashindex_read(path, permit_compact, self.legacy)
assert self.index, 'hashindex_read() returned NULL with no exception set'
else:
if usable is not None:
capacity = int(usable / self.MAX_LOAD_FACTOR)
self.index = hashindex_init(capacity, self.key_size, self.value_size)
if not self.index:
raise Exception('hashindex_init failed')
def __dealloc__(self):
if self.index:
hashindex_free(self.index)
@classmethod
def read(cls, path, permit_compact=False):
return cls(path=path, permit_compact=permit_compact)
def write(self, path):
if isinstance(path, (str, bytes)):
with open(path, 'wb') as fd:
hashindex_write(self.index, fd, self.legacy)
else:
hashindex_write(self.index, path, self.legacy)
def clear(self):
hashindex_free(self.index)
self.index = hashindex_init(0, self.key_size, self.value_size)
if not self.index:
raise Exception('hashindex_init failed')
def setdefault(self, key, value):
if key not in self:
self[key] = value
return self[key]
def __delitem__(self, key):
assert len(key) == self.key_size
rc = hashindex_delete(self.index, <unsigned char *>key)
if rc == 1:
return # success
if rc == -1:
raise KeyError(key)
if rc == 0:
raise Exception('hashindex_delete failed')
def get(self, key, default=None):
try:
return self[key]
except KeyError:
return default
def pop(self, key, default=_NoDefault):
try:
value = self[key]
del self[key]
return value
except KeyError:
if default is not _NoDefault:
return default
raise
def __len__(self):
return hashindex_len(self.index)
def size(self):
"""Return size (bytes) of hash table."""
return hashindex_size(self.index)
def compact(self):
return hashindex_compact(self.index)
NSIndexEntry = namedtuple('NSIndexEntry', 'segment offset size')
cdef class NSIndex(IndexBase):
value_size = 12
def __getitem__(self, key):
assert len(key) == self.key_size
data = <uint32_t *>hashindex_get(self.index, <unsigned char *>key)
if not data:
raise KeyError(key)
cdef uint32_t segment = _le32toh(data[0])
assert segment <= _MAX_VALUE, "maximum number of segments reached"
return NSIndexEntry(segment, _le32toh(data[1]), _le32toh(data[2]))
def __setitem__(self, key, value):
assert len(key) == self.key_size
cdef uint32_t[3] data
assert len(value) == len(data)
cdef uint32_t segment = value[0]
assert segment <= _MAX_VALUE, "maximum number of segments reached"
data[0] = _htole32(segment)
data[1] = _htole32(value[1])
data[2] = _htole32(value[2])
if not hashindex_set(self.index, <unsigned char *>key, data):
raise Exception('hashindex_set failed')
def __contains__(self, key):
cdef uint32_t segment
assert len(key) == self.key_size
data = <uint32_t *>hashindex_get(self.index, <unsigned char *>key)
if data != NULL:
segment = _le32toh(data[0])
assert segment <= _MAX_VALUE, "maximum number of segments reached"
return data != NULL
def iteritems(self, marker=None):
"""iterate over all items or optionally only over items having specific flag values"""
cdef const unsigned char *key
iter = NSKeyIterator(self.key_size)
iter.idx = self
iter.index = self.index
if marker:
key = hashindex_get(self.index, <unsigned char *>marker)
if key == NULL:
raise IndexError
iter.key = key - self.key_size
return iter
cdef class NSKeyIterator:
cdef NSIndex idx
cdef HashIndex *index
cdef const unsigned char *key
cdef int key_size
cdef int exhausted
def __cinit__(self, key_size):
self.key = NULL
self.key_size = key_size
self.exhausted = 0
def __iter__(self):
return self
def __next__(self):
cdef uint32_t *value
if self.exhausted:
raise StopIteration
self.key = hashindex_next_key(self.index, <unsigned char *>self.key)
if not self.key:
self.exhausted = 1
raise StopIteration
value = <uint32_t *> (self.key + self.key_size)
cdef uint32_t segment = _le32toh(value[0])
assert segment <= _MAX_VALUE, "maximum number of segments reached"
return ((<char *>self.key)[:self.key_size],
NSIndexEntry(segment, _le32toh(value[1]), _le32toh(value[2])))
ChunkIndexEntry = namedtuple('ChunkIndexEntry', 'refcount size')
@@ -288,7 +63,7 @@ class ChunkIndex:
return default
def compact(self):
pass
return 0
def clear(self):
pass
@@ -340,6 +115,8 @@ NSIndex1Entry = namedtuple('NSIndex1bEntry', 'segment offset')
class NSIndex1: # legacy borg 1.x
MAX_VALUE = 2**32 - 1 # borghash has the full uint32_t range
def __init__(self, capacity=1000, path=None, permit_compact=False, usable=None):
if usable is not None:
capacity = usable * 2 # load factor 0.5
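# e.g. usable=1000 -> capacity=2000, i.e. the hash table stays at most half full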
@@ -368,11 +145,21 @@ class NSIndex1: # legacy borg 1.x
except KeyError:
return default
def pop(self, key, default=_NoDefault):
try:
value = self[key]
del self[key]
return value
except KeyError:
if default is not _NoDefault:
return default
raise
def iteritems(self):
yield from self.ht.iteritems()
def compact(self):
pass
return 0
def clear(self):
pass
@@ -382,10 +169,10 @@ class NSIndex1: # legacy borg 1.x
return cls(path=path)
def write(self, path):
raise NotImplementedError
self.ht.write(path) # only for unit tests
def size(self):
raise NotImplementedError
return self.ht.size() # not quite correct as this is not the on-disk read-only format.
def _read(self, path):
if isinstance(path, str):
@@ -395,6 +182,14 @@ class NSIndex1: # legacy borg 1.x
self._read_fd(path)
def _read_fd(self, fd):
magic = fd.read(8)
fd.seek(0)
if magic == b"BORG_IDX": # used for borg transfer borg 1.x -> borg 2
self._read_fd_borg1(fd)
if magic == b"BORGHASH": # only for unit tests
self.ht = _borghash.HashTableNT.read(fd)
def _read_fd_borg1(self, fd):
MAGIC = b"BORG_IDX" # borg 1.x
HEADER_FMT = "<8sIIBB" # magic, entries, buckets, ksize, vsize
header_size = struct.calcsize(HEADER_FMT)
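# assumed continuation (the rest of this method is outside the hunk shown here):
# header = fd.read(header_size)
# magic, entries, buckets, ksize, vsize = struct.unpack(HEADER_FMT, header)
# assert magic == MAGIC and ksize == 32 and vsize == 8  # 8 bytes: uint32 segment + uint32 offset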

View file

@@ -13,7 +13,7 @@
from typing import Callable, DefaultDict
from .constants import * # NOQA
from .hashindex import NSIndexEntry, NSIndex, NSIndex1, hashindex_variant
from .hashindex import NSIndex1Entry, NSIndex1
from .helpers import Error, ErrorWithTraceback, IntegrityError, format_file_size, parse_file_size
from .helpers import Location
from .helpers import ProgressIndicatorPercent
@@ -562,16 +562,12 @@ def _read_integrity(self, transaction_id, key):
def open_index(self, transaction_id, auto_recover=True):
if transaction_id is None:
return NSIndex()
return NSIndex1()
index_path = os.path.join(self.path, "index.%d" % transaction_id)
variant = hashindex_variant(index_path)
integrity_data = self._read_integrity(transaction_id, "index")
try:
with IntegrityCheckedFile(index_path, write=False, integrity_data=integrity_data) as fd:
if variant == 2:
return NSIndex.read(fd)
if variant == 1: # legacy
return NSIndex1.read(fd)
return NSIndex1.read(fd)
except (ValueError, OSError, FileIntegrityError) as exc:
logger.warning("Repository index missing or corrupted, trying to recover from: %s", exc)
os.unlink(index_path)
@@ -864,7 +860,7 @@ def complete_xfer(intermediate=True):
except LoggedIO.SegmentFull:
complete_xfer()
new_segment, offset = self.io.write_put(key, data)
self.index[key] = NSIndexEntry(new_segment, offset, len(data))
self.index[key] = NSIndex1Entry(new_segment, offset)
segments.setdefault(new_segment, 0)
segments[new_segment] += 1
segments[segment] -= 1
@@ -1001,7 +997,7 @@ def _update_index(self, segment, objects, report=None):
self.shadow_index.setdefault(key, []).append(in_index.segment)
except KeyError:
pass
self.index[key] = NSIndexEntry(segment, offset, size)
self.index[key] = NSIndex1Entry(segment, offset)
self.segments[segment] += 1
self.storage_quota_use += header_size(tag) + size
elif tag == TAG_DELETE:
@@ -1015,7 +1011,7 @@ def _update_index(self, segment, objects, report=None):
# the old index is not necessarily valid for this transaction (e.g. compaction); if the segment
# is already gone, then it was already compacted.
self.segments[in_index.segment] -= 1
self.compact[in_index.segment] += header_size(tag) + in_index.size
self.compact[in_index.segment] += header_size(tag) + 0
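# the "+ 0" stands in for the entry size: NSIndex1 entries no longer carry a size, so only the header can be accounted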
self.shadow_index.setdefault(key, []).append(in_index.segment)
elif tag == TAG_COMMIT:
continue
@@ -1219,8 +1215,8 @@ def get(self, id, read_data=True):
if not self.index:
self.index = self.open_index(self.get_transaction_id())
try:
in_index = NSIndexEntry(*((self.index[id] + (None,))[:3])) # legacy: index entries have no size element
return self.io.read(in_index.segment, in_index.offset, id, expected_size=in_index.size, read_data=read_data)
in_index = NSIndex1Entry(*(self.index[id][:2])) # legacy: index entries have no size element
return self.io.read(in_index.segment, in_index.offset, id, read_data=read_data)
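# without a size element in the index entry, expected_size can no longer be passed to io.read()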
except KeyError:
raise self.ObjectNotFound(id, self.path) from None
@@ -1245,12 +1241,12 @@ def put(self, id, data, wait=True):
# it is essential to do a delete first to get correct quota bookkeeping
# and also a correctly updated shadow_index, so that the compaction code
# does not wrongly resurrect an old PUT by dropping a DEL that is still needed.
self._delete(id, in_index.segment, in_index.offset, in_index.size)
self._delete(id, in_index.segment, in_index.offset, 0)
segment, offset = self.io.write_put(id, data)
self.storage_quota_use += header_size(TAG_PUT2) + len(data)
self.segments.setdefault(segment, 0)
self.segments[segment] += 1
self.index[id] = NSIndexEntry(segment, offset, len(data))
self.index[id] = NSIndex1Entry(segment, offset)
if self.storage_quota and self.storage_quota_use > self.storage_quota:
self.transaction_doomed = self.StorageQuotaExceeded(
format_file_size(self.storage_quota), format_file_size(self.storage_quota_use)
@@ -1269,7 +1265,7 @@ def delete(self, id, wait=True):
in_index = self.index.pop(id)
except KeyError:
raise self.ObjectNotFound(id, self.path) from None
self._delete(id, in_index.segment, in_index.offset, in_index.size)
self._delete(id, in_index.segment, in_index.offset, 0)
def _delete(self, id, segment, offset, size):
# common code used by put and delete

View file

@@ -1,75 +0,0 @@
# more hashindex tests. kept separate so we can use pytest here.
import os
import random
import pytest
from ..hashindex import NSIndex
def verify_hash_table(kv, idx):
"""kv should be a python dictionary and idx an NSIndex. Check that idx
has the expected entries and the right number of entries.
"""
for k, v in kv.items():
assert k in idx and idx[k] == (v, v, v)
assert len(idx) == len(kv)
def make_hashtables(*, entries, loops):
idx = NSIndex()
kv = {}
for i in range(loops):
# put some entries
for j in range(entries):
k = random.randbytes(32)
v = random.randint(0, NSIndex.MAX_VALUE - 1)
idx[k] = (v, v, v)
kv[k] = v
# check and delete a random amount of entries
delete_keys = random.sample(list(kv), k=random.randint(0, len(kv)))
for k in delete_keys:
v = kv.pop(k)
assert idx.pop(k) == (v, v, v)
verify_hash_table(kv, idx)
return idx, kv
@pytest.mark.skipif("BORG_TESTS_SLOW" not in os.environ, reason="slow tests not enabled, use BORG_TESTS_SLOW=1")
def test_hashindex_stress():
"""checks if the hashtable behaves as expected
This can be used in _hashindex.c before running this test to provoke more collisions (don't forget to compile):
#define HASH_MAX_LOAD .99
#define HASH_MAX_EFF_LOAD .999
"""
make_hashtables(entries=10000, loops=1000) # we do quite some assertions while making them
def test_hashindex_compact():
"""test that we do not lose or corrupt data by the compaction nor by expanding/rebuilding"""
idx, kv = make_hashtables(entries=5000, loops=5)
size_noncompact = idx.size()
# compact the hashtable (remove empty/tombstone buckets)
saved_space = idx.compact()
# did we actually compact (reduce space usage)?
size_compact = idx.size()
assert saved_space > 0
assert size_noncompact - size_compact == saved_space
# did we lose anything?
verify_hash_table(kv, idx)
# now expand the hashtable again. trigger a resize/rebuild by adding an entry.
k = b"x" * 32
idx[k] = (0, 0, 0)
kv[k] = 0
size_rebuilt = idx.size()
assert size_rebuilt > size_compact + 1
# did we lose anything?
verify_hash_table(kv, idx)
@pytest.mark.skipif("BORG_TESTS_SLOW" not in os.environ, reason="slow tests not enabled, use BORG_TESTS_SLOW=1")
def test_hashindex_compact_stress():
for _ in range(100):
test_hashindex_compact()

View file

@@ -4,7 +4,7 @@
import hashlib
import struct
from ..hashindex import NSIndex, ChunkIndex
from ..hashindex import ChunkIndex
from . import BaseTestCase
@@ -29,48 +29,8 @@ def test_chunkindex_add(self):
def test_keyerror(self):
chunks = ChunkIndex()
x = H2(1)
with self.assert_raises(KeyError):
chunks[H(1)]
chunks[x]
with self.assert_raises(struct.error):
chunks.add(H(1), -1, 0)
class IndexCorruptionTestCase(BaseTestCase):
def test_bug_4829(self):
from struct import pack
def HH(x, y, z):
# make a 32-byte key that depends on x, y, z.
# the bucket index is computed from the first 4 bytes, so the same x means a
# collision in the hashtable; giving a specific x targets bucket index x.
# y creates different keys and does not go into the bucket index calculation.
# so: same x + different y --> collision
return pack("<IIIIIIII", x, y, z, 0, 0, 0, 0, 0) # 8 * 4 == 32
idx = NSIndex()
# create lots of colliding entries
for y in range(700): # stay below max load not to trigger resize
idx[HH(0, y, 0)] = (0, y, 0)
assert idx.size() == 1024 + 1031 * 44 # header + 1031 buckets
# delete lots of the collisions, creating lots of tombstones
for y in range(400): # stay above min load not to trigger resize
del idx[HH(0, y, 0)]
# create lots of colliding entries, within the not yet used part of the hashtable
for y in range(330): # stay below max load not to trigger resize
# at y == 259 a resize will happen due to going beyond max EFFECTIVE load
# if the bug is present, that element will be inserted at the wrong place.
# and because it will be at the wrong place, it can not be found again.
idx[HH(600, y, 0)] = 600, y, 0
# now check if the hashtable contents are as expected:
assert [idx.get(HH(0, y, 0)) for y in range(400, 700)] == [(0, y, 0) for y in range(400, 700)]
assert [HH(0, y, 0) in idx for y in range(400)] == [False for y in range(400)] # deleted entries
# this will fail at HH(600, 259) if the bug is present.
assert [idx.get(HH(600, y, 0)) for y in range(330)] == [(600, y, 0) for y in range(330)]
chunks.add(x, -1, 0)

View file

@@ -7,7 +7,7 @@
import pytest
from ..checksums import xxh64
from ..hashindex import NSIndex
from ..hashindex import NSIndex1
from ..helpers import Location
from ..helpers import IntegrityError
from ..helpers import msgpack
@@ -268,7 +268,7 @@ def test_sparse_delete(repository):
repository.delete(H(0))
repository.io._write_fd.sync()
# the on-line tracking works on a per-object basis...
assert repository.compact[0] == 41 + 8 + 41 + len(chunk0)
assert repository.compact[0] == 41 + 8 + 41 + 0 # len(chunk0) information is lost
repository._rebuild_sparse(0)
# ...while _rebuild_sparse can mark whole segments as completely sparse (which then includes the segment magic)
assert repository.compact[0] == 41 + 8 + 41 + len(chunk0) + len(MAGIC)
@@ -805,7 +805,7 @@ def get_head(repo_path):
def open_index(repo_path):
return NSIndex.read(os.path.join(repo_path, f"index.{get_head(repo_path)}"))
return NSIndex1.read(os.path.join(repo_path, f"index.{get_head(repo_path)}"))
def corrupt_object(repo_path, id_):