repository: set/query flags, iteration over flagged items (NSIndex)

use this to query or set/clear flags in the "extra" word.

also: remove direct access to the "extra" word, adapt tests.
This commit is contained in:
Thomas Waldmann 2022-06-09 20:15:25 +02:00
parent bf9f42320e
commit e5ea016115
5 changed files with 130 additions and 38 deletions

View File

@ -210,7 +210,7 @@ cdef class FuseVersionsIndex(IndexBase):
return hashindex_get(self.index, <unsigned char *>key) != NULL
NSIndexEntry = namedtuple('NSIndexEntry', 'segment offset size extra')
NSIndexEntry = namedtuple('NSIndexEntry', 'segment offset size')
cdef class NSIndex(IndexBase):
@ -224,7 +224,7 @@ cdef class NSIndex(IndexBase):
raise KeyError(key)
cdef uint32_t segment = _le32toh(data[0])
assert segment <= _MAX_VALUE, "maximum number of segments reached"
return NSIndexEntry(segment, _le32toh(data[1]), _le32toh(data[2]), _le32toh(data[3]))
return NSIndexEntry(segment, _le32toh(data[1]), _le32toh(data[2]))
def __setitem__(self, key, value):
assert len(key) == self.key_size
@ -234,7 +234,7 @@ cdef class NSIndex(IndexBase):
data[0] = _htole32(segment)
data[1] = _htole32(value[1])
data[2] = _htole32(value[2])
data[3] = _htole32(value[3])
data[3] = 0 # init flags to all cleared
if not hashindex_set(self.index, <unsigned char *>key, data):
raise Exception('hashindex_set failed')
@ -247,9 +247,12 @@ cdef class NSIndex(IndexBase):
assert segment <= _MAX_VALUE, "maximum number of segments reached"
return data != NULL
def iteritems(self, marker=None):
def iteritems(self, marker=None, mask=0, value=0):
"""iterate over all items or optionally only over items having specific flag values"""
cdef const unsigned char *key
iter = NSKeyIterator(self.key_size)
assert isinstance(mask, int)
assert isinstance(value, int)
iter = NSKeyIterator(self.key_size, mask, value)
iter.idx = self
iter.index = self.index
if marker:
@ -259,6 +262,20 @@ cdef class NSIndex(IndexBase):
iter.key = key - self.key_size
return iter
def flags(self, key, mask=0xFFFFFFFF, value=None):
    """
    Query and optionally set flag bits kept in the entry's 4th ("extra") uint32 word.

    :param key: entry key, must be exactly self.key_size bytes
    :param mask: bitmask selecting which flag bits to query (and, if setting, to modify)
    :param value: if an int, overwrite the masked bits with ``value & mask``;
                  if None (default), only query
    :raises KeyError: if *key* is not present in the index
    :return: the masked flags value as it was *before* any modification
    """
    assert len(key) == self.key_size
    assert isinstance(mask, int)
    data = <uint32_t *>hashindex_get(self.index, <unsigned char *>key)
    if not data:
        raise KeyError(key)
    # data[3] is the flags word, stored little-endian on disk/in the table
    flags = _le32toh(data[3])
    if isinstance(value, int):
        new_flags = flags & ~mask # clear masked bits
        new_flags |= value & mask # set value bits
        data[3] = _htole32(new_flags)
    return flags & mask # always return previous flags value
cdef class NSKeyIterator:
cdef NSIndex idx
@ -266,27 +283,38 @@ cdef class NSKeyIterator:
cdef const unsigned char *key
cdef int key_size
cdef int exhausted
cdef int flag_mask
cdef int flag_value
def __cinit__(self, key_size):
def __cinit__(self, key_size, mask, value):
self.key = NULL
self.key_size = key_size
# note: mask and value both default to 0, so they will match all entries
self.flag_mask = _htole32(mask)
self.flag_value = _htole32(value)
self.exhausted = 0
def __iter__(self):
return self
def __next__(self):
cdef uint32_t *value
if self.exhausted:
raise StopIteration
self.key = hashindex_next_key(self.index, <unsigned char *>self.key)
if not self.key:
self.exhausted = 1
raise StopIteration
cdef uint32_t *value = <uint32_t *>(self.key + self.key_size)
while True:
self.key = hashindex_next_key(self.index, <unsigned char *>self.key)
if not self.key:
self.exhausted = 1
raise StopIteration
value = <uint32_t *> (self.key + self.key_size)
if value[3] & self.flag_mask == self.flag_value:
# we found a matching entry!
break
cdef uint32_t segment = _le32toh(value[0])
assert segment <= _MAX_VALUE, "maximum number of segments reached"
return ((<char *>self.key)[:self.key_size],
NSIndexEntry(segment, _le32toh(value[1]), _le32toh(value[2]), _le32toh(value[3])))
NSIndexEntry(segment, _le32toh(value[1]), _le32toh(value[2])))
cdef class NSIndex1(IndexBase): # legacy borg 1.x

View File

@ -821,7 +821,7 @@ class Repository:
except LoggedIO.SegmentFull:
complete_xfer()
new_segment, offset = self.io.write_put(key, data)
self.index[key] = NSIndexEntry(new_segment, offset, len(data), in_index.extra)
self.index[key] = NSIndexEntry(new_segment, offset, len(data))
segments.setdefault(new_segment, 0)
segments[new_segment] += 1
segments[segment] -= 1
@ -937,7 +937,7 @@ class Repository:
self.segments[in_index.segment] -= 1
except KeyError:
pass
self.index[key] = NSIndexEntry(segment, offset, size, 0)
self.index[key] = NSIndexEntry(segment, offset, size)
self.segments[segment] += 1
self.storage_quota_use += header_size(tag) + size
elif tag == TAG_DELETE:
@ -1182,7 +1182,7 @@ class Repository:
self.index = self.open_index(transaction_id)
at_start = marker is None
# smallest valid seg is <uint32> 0, smallest valid offs is <uint32> 8
start_segment, start_offset, _, _ = (0, 0, 0, 0) if at_start else self.index[marker]
start_segment, start_offset, _ = (0, 0, 0) if at_start else self.index[marker]
result = []
for segment, filename in self.io.segment_iterator(start_segment):
obj_iterator = self.io.iter_objects(segment, start_offset, read_data=False, include_data=False)
@ -1212,7 +1212,7 @@ class Repository:
if not self.index:
self.index = self.open_index(self.get_transaction_id())
try:
in_index = NSIndexEntry(*((self.index[id] + (None, None))[:4])) # legacy: no size/extra
in_index = NSIndexEntry(*((self.index[id] + (None, ))[:3])) # legacy: index entries have no size element
return self.io.read(in_index.segment, in_index.offset, id, expected_size=in_index.size)
except KeyError:
raise self.ObjectNotFound(id, self.path) from None
@ -1243,7 +1243,7 @@ class Repository:
self.storage_quota_use += header_size(TAG_PUT2) + len(data)
self.segments.setdefault(segment, 0)
self.segments[segment] += 1
self.index[id] = NSIndexEntry(segment, offset, len(data), 0)
self.index[id] = NSIndexEntry(segment, offset, len(data))
if self.storage_quota and self.storage_quota_use > self.storage_quota:
self.transaction_doomed = self.StorageQuotaExceeded(
format_file_size(self.storage_quota), format_file_size(self.storage_quota_use))

View File

@ -33,7 +33,7 @@ SELFTEST_CASES = [
ChunkerTestCase,
]
SELFTEST_COUNT = 36
SELFTEST_COUNT = 37
class SelfTestResult(TestResult):

View File

@ -87,8 +87,8 @@ class HashIndexTestCase(BaseTestCase):
del idx
def test_nsindex(self):
self._generic_test(NSIndex, lambda x: (x, x, x, x),
'c9fe5878800d2a0691b667c665a00d4a186e204e891076d6b109016940742bed')
self._generic_test(NSIndex, lambda x: (x, x, x),
'7d70671d0b7e9d2f51b2691ecf35184b9f8ecc1202cceb2748c905c8fc04c256')
def test_chunkindex(self):
self._generic_test(ChunkIndex, lambda x: (x, x),
@ -153,6 +153,70 @@ class HashIndexTestCase(BaseTestCase):
assert chunks == 1 + 2 + 3
assert unique_chunks == 3
def test_flags(self):
    """flags() can query, set and clear individual flag bits of an entry."""
    idx = NSIndex()
    key = H(0)
    # querying flags of a key that is not in the index raises KeyError
    self.assert_raises(KeyError, idx.flags, key, 0)
    idx[key] = 0, 0, 0  # create entry
    # bits 0 and 1 must both be cleared right after entry creation
    self.assert_equal(idx.flags(key, mask=3), 0)
    # set bit 0, then bit 1, verifying each one individually
    for bit in (1, 2):
        idx.flags(key, mask=bit, value=bit)
        self.assert_equal(idx.flags(key, mask=bit), bit)
    # with both bits set, querying them together yields 3
    self.assert_equal(idx.flags(key, mask=3), 3)
    # clear bit 1, then bit 0, verifying each one individually
    for bit in (2, 1):
        idx.flags(key, mask=bit, value=0)
        self.assert_equal(idx.flags(key, mask=bit), 0)
    # with both bits cleared, querying them together yields 0
    self.assert_equal(idx.flags(key, mask=3), 0)
def test_flags_iteritems(self):
    """iteritems(mask=..., value=...) yields exactly the entries whose flags match.

    Fixes the misleading reuse of the result variable ``k1`` for the
    flagged2 / flagged3 / combined checks and collapses the four
    copy-pasted setup loops into one data-driven loop.
    """
    idx = NSIndex()
    # one key group per flags value 0..3 (bits 0 and 1 in all combinations)
    keys_by_flags = {
        0: {H(i) for i in (1, 2, 3, 42)},
        1: {H(i) for i in (11, 12, 13, 142)},
        2: {H(i) for i in (21, 22, 23, 242)},
        3: {H(i) for i in (31, 32, 33, 342)},
    }
    for flags, keys in keys_by_flags.items():
        for key in keys:
            idx[key] = 0, 0, 0  # create entry (flags start out all cleared)
            idx.flags(key, mask=3, value=flags)  # for flags == 0 this is a no-op
    # without a filter, iteration yields all entries
    all_keys = set().union(*keys_by_flags.values())
    self.assert_equal({k for k, v in idx.iteritems()}, all_keys)
    # filtering on both bits selects exactly one group per flags value
    for flags, keys in keys_by_flags.items():
        found = {k for k, v in idx.iteritems(mask=3, value=flags)}
        self.assert_equal(found, keys)
    # filtering on bit 0 only: set -> groups 1 and 3, cleared -> groups 0 and 2
    k_bit0_set = {k for k, v in idx.iteritems(mask=1, value=1)}
    self.assert_equal(k_bit0_set, keys_by_flags[1] | keys_by_flags[3])
    k_bit0_cleared = {k for k, v in idx.iteritems(mask=1, value=0)}
    self.assert_equal(k_bit0_cleared, keys_by_flags[0] | keys_by_flags[2])
class HashIndexExtraTestCase(BaseTestCase):
"""These tests are separate because they should not become part of the selftest.
@ -531,38 +595,38 @@ class IndexCorruptionTestCase(BaseTestCase):
from struct import pack
def HH(w, x, y, z):
# make some 32byte long thing that depends on w, x, y, z.
# same w will mean a collision in the hashtable as bucket index is computed from
# first 4 bytes. giving a specific w targets bucket index w.
# x is to create different keys and does not go into the bucket index calculation.
# so, same w + different x --> collision
return pack('<IIIIIIII', w, x, y, z, 0, 0, 0, 0) # 8 * 4 == 32
def HH(x, y, z):
# make some 32byte long thing that depends on x, y, z.
# same x will mean a collision in the hashtable as bucket index is computed from
# first 4 bytes. giving a specific x targets bucket index x.
# y is to create different keys and does not go into the bucket index calculation.
# so, same x + different y --> collision
return pack('<IIIIIIII', x, y, z, 0, 0, 0, 0, 0) # 8 * 4 == 32
idx = NSIndex()
# create lots of colliding entries
for x in range(700): # stay below max load to not trigger resize
idx[HH(0, x, 0, 0)] = (0, x, 0, 0)
for y in range(700): # stay below max load to not trigger resize
idx[HH(0, y, 0)] = (0, y, 0)
assert idx.size() == 1031 * 48 + 18 # 1031 buckets + header
# delete lots of the collisions, creating lots of tombstones
for x in range(400): # stay above min load to not trigger resize
del idx[HH(0, x, 0, 0)]
for y in range(400): # stay above min load to not trigger resize
del idx[HH(0, y, 0)]
# create lots of colliding entries, within the not yet used part of the hashtable
for x in range(330): # stay below max load to not trigger resize
# at x == 259 a resize will happen due to going beyond max EFFECTIVE load
for y in range(330): # stay below max load to not trigger resize
# at y == 259 a resize will happen due to going beyond max EFFECTIVE load
# if the bug is present, that element will be inserted at the wrong place.
# and because it will be at the wrong place, it can not be found again.
idx[HH(600, x, 0, 0)] = 600, x, 0, 0
idx[HH(600, y, 0)] = 600, y, 0
# now check if hashtable contents is as expected:
assert [idx.get(HH(0, x, 0, 0)) for x in range(400, 700)] == [(0, x, 0, 0) for x in range(400, 700)]
assert [idx.get(HH(0, y, 0)) for y in range(400, 700)] == [(0, y, 0) for y in range(400, 700)]
assert [HH(0, x, 0, 0) in idx for x in range(400)] == [False for x in range(400)] # deleted entries
assert [HH(0, y, 0) in idx for y in range(400)] == [False for y in range(400)] # deleted entries
# this will fail at HH(600, 259) if the bug is present.
assert [idx.get(HH(600, x, 0, 0)) for x in range(330)] == [(600, x, 0, 0) for x in range(330)]
assert [idx.get(HH(600, y, 0)) for y in range(330)] == [(600, y, 0) for y in range(330)]

View File

@ -714,7 +714,7 @@ class RepositoryCheckTestCase(RepositoryTestCaseBase):
def corrupt_object(self, id_):
idx = self.open_index()
segment, offset, _, _ = idx[H(id_)]
segment, offset, _ = idx[H(id_)]
with open(os.path.join(self.tmppath, 'repository', 'data', '0', str(segment)), 'r+b') as fd:
fd.seek(offset)
fd.write(b'BOOM')