remove csize from item.chunks elements

This commit is contained in:
Thomas Waldmann 2022-06-10 20:36:58 +02:00
parent b9f9623a6d
commit ace5957524
11 changed files with 58 additions and 95 deletions

View File

@@ -359,7 +359,7 @@ class CacheChunkBuffer(ChunkBuffer):
self.stats = stats
def write_chunk(self, chunk):
id_, _, _ = self.cache.add_chunk(self.key.id_hash(chunk), chunk, self.stats, wait=False)
id_, _ = self.cache.add_chunk(self.key.id_hash(chunk), chunk, self.stats, wait=False)
self.cache.repository.async_response(wait=False)
return id_
@@ -921,7 +921,7 @@ Utilization of max. archive size: {csize_max:.0%}
item = Item(internal_dict=item)
if 'chunks' in item:
part = not self.consider_part_files and 'part' in item
for chunk_id, size, _ in item.chunks:
for chunk_id, size in item.chunks:
chunk_decref(chunk_id, stats, part=part)
except (TypeError, ValueError):
# if items metadata spans multiple chunks and one chunk got dropped somehow,
@@ -1279,7 +1279,7 @@ class FilesystemObjectProcessors:
# this needs to be done early, so that part files also get the patched mode.
item.mode = stat.S_IFREG | stat.S_IMODE(item.mode)
if 'chunks' in item: # create_helper might have put chunks from a previous hardlink there
[cache.chunk_incref(id_, self.stats) for id_, _, _ in item.chunks]
[cache.chunk_incref(id_, self.stats) for id_, _ in item.chunks]
else: # normal case, no "2nd+" hardlink
if not is_special_file:
hashed_path = safe_encode(os.path.join(self.cwd, path))
@@ -1761,7 +1761,7 @@ class ArchiveChecker:
has_chunks_healthy = False
chunks_healthy = chunks_current
for chunk_current, chunk_healthy in zip(chunks_current, chunks_healthy):
chunk_id, size, csize = chunk_healthy
chunk_id, size = chunk_healthy
if chunk_id not in self.chunks:
# a chunk of the healthy list is missing
if chunk_current == chunk_healthy:
@@ -1775,7 +1775,7 @@
logger.info('{}: {}: Previously missing file chunk is still missing (Byte {}-{}, Chunk {}). '
'It has an all-zero replacement chunk already.'.format(
archive_name, item.path, offset, offset + size, bin_to_hex(chunk_id)))
chunk_id, size, csize = chunk_current
chunk_id, size = chunk_current
if chunk_id in self.chunks:
add_reference(chunk_id, size)
else:
@@ -1794,7 +1794,7 @@
archive_name, item.path, offset, offset + size, bin_to_hex(chunk_id)))
add_reference(chunk_id, size)
mark_as_possibly_superseded(chunk_current[0]) # maybe orphaned the all-zero replacement chunk
chunk_list.append([chunk_id, size, csize]) # list-typed element as chunks_healthy is list-of-lists
chunk_list.append([chunk_id, size]) # list-typed element as chunks_healthy is list-of-lists
offset += size
if chunks_replaced and not has_chunks_healthy:
# if this is first repair, remember the correct chunk IDs, so we can maybe heal the file later
@@ -2046,7 +2046,7 @@ class ArchiveRecreater:
def process_chunks(self, archive, target, item):
if not self.recompress and not target.recreate_rechunkify:
for chunk_id, size, csize in item.chunks:
for chunk_id, size in item.chunks:
self.cache.chunk_incref(chunk_id, target.stats)
return item.chunks
chunk_iterator = self.iter_chunks(archive, target, list(item.chunks))
@@ -2070,7 +2070,7 @@
return chunk_entry
def iter_chunks(self, archive, target, chunks):
chunk_iterator = archive.pipeline.fetch_many([chunk_id for chunk_id, _, _ in chunks])
chunk_iterator = archive.pipeline.fetch_many([chunk_id for chunk_id, _ in chunks])
if target.recreate_rechunkify:
# The target.chunker will read the file contents through ChunkIteratorFileWrapper chunk-by-chunk
# (does not load the entire file into memory)

View File

@@ -424,7 +424,7 @@ class Archiver:
for item in other_archive.iter_items():
if 'chunks' in item:
chunks = []
for chunk_id, size, _ in item.chunks:
for chunk_id, size in item.chunks:
refcount = cache.seen_chunk(chunk_id, size)
if refcount == 0: # target repo does not yet have this chunk
if not dry_run:
@@ -1331,7 +1331,7 @@ class Archiver:
"""
Return a file-like object that reads from the chunks of *item*.
"""
chunk_iterator = archive.pipeline.fetch_many([chunk_id for chunk_id, _, _ in item.chunks],
chunk_iterator = archive.pipeline.fetch_many([chunk_id for chunk_id, _ in item.chunks],
is_preloaded=True)
if pi:
info = [remove_surrogates(item.path)]

View File

@@ -907,7 +907,7 @@ class LocalCache(CacheStatsMixin):
self.repository.put(id, data, wait=wait)
self.chunks.add(id, 1, size, csize)
stats.update(size)
return ChunkListEntry(id, size, csize)
return ChunkListEntry(id, size)
def seen_chunk(self, id, size=None):
refcount, stored_size, _ = self.chunks.get(id, ChunkIndexEntry(0, None, None))
@@ -921,14 +921,14 @@ class LocalCache(CacheStatsMixin):
def chunk_incref(self, id, stats, size=None, part=False):
if not self.txn_active:
self.begin_txn()
count, _size, csize = self.chunks.incref(id)
count, _size, _ = self.chunks.incref(id)
stats.update(_size, part=part)
return ChunkListEntry(id, _size, csize)
return ChunkListEntry(id, _size)
def chunk_decref(self, id, stats, wait=True, part=False):
if not self.txn_active:
self.begin_txn()
count, size, csize = self.chunks.decref(id)
count, size, _ = self.chunks.decref(id)
if count == 0:
del self.chunks[id]
self.repository.delete(id, wait=wait)
@@ -1075,7 +1075,7 @@ Chunk index: {0.total_unique_chunks:20d} unknown"""
self.repository.put(id, data, wait=wait)
self.chunks.add(id, 1, size, csize)
stats.update(size)
return ChunkListEntry(id, size, csize)
return ChunkListEntry(id, size)
def seen_chunk(self, id, size=None):
if not self._txn_active:
@@ -1097,7 +1097,7 @@ Chunk index: {0.total_unique_chunks:20d} unknown"""
size = _size or size
assert size
stats.update(size, part=part)
return ChunkListEntry(id, size, csize)
return ChunkListEntry(id, size)
def chunk_decref(self, id, stats, wait=True, part=False):
if not self._txn_active:

View File

@@ -39,10 +39,8 @@ cache_sync_init(HashIndex *chunks)
/* needs to be set only once */
ctx->ctx.user.chunks = chunks;
ctx->ctx.user.parts.size = 0;
ctx->ctx.user.parts.csize = 0;
ctx->ctx.user.parts.num_files = 0;
ctx->ctx.user.totals.size = 0;
ctx->ctx.user.totals.csize = 0;
ctx->ctx.user.totals.num_files = 0;
ctx->buf = NULL;
ctx->head = 0;
@@ -91,18 +89,6 @@ cache_sync_size_parts(const CacheSyncCtx *ctx)
return ctx->ctx.user.parts.size;
}
static uint64_t
cache_sync_csize_totals(const CacheSyncCtx *ctx)
{
return ctx->ctx.user.totals.csize;
}
static uint64_t
cache_sync_csize_parts(const CacheSyncCtx *ctx)
{
return ctx->ctx.user.parts.csize;
}
/**
* feed data to the cache synchronizer
* 0 = abort, 1 = continue

View File

@@ -86,14 +86,12 @@ typedef struct unpack_user {
/*
* processing ChunkListEntry tuple:
* expect_key, expect_size, expect_csize, expect_entry_end
* expect_key, expect_size, expect_entry_end
*/
/* next thing must be the key (raw, l=32) */
expect_key,
/* next thing must be the size (int) */
expect_size,
/* next thing must be the csize (int) */
expect_csize,
/* next thing must be the end of the CLE (array_end) */
expect_entry_end,
@@ -103,23 +101,22 @@ typedef struct unpack_user {
/* collect values here for current chunklist entry */
struct {
unsigned char key[32];
uint32_t csize;
uint32_t size;
} current;
/* summing up chunks sizes here within a single item */
struct {
uint64_t size, csize;
uint64_t size;
} item;
/* total sizes and files count coming from all files */
struct {
uint64_t size, csize, num_files;
uint64_t size, num_files;
} totals;
/* total sizes and files count coming from part files */
struct {
uint64_t size, csize, num_files;
uint64_t size, num_files;
} parts;
} unpack_user;
@@ -147,10 +144,6 @@ static inline int unpack_callback_uint64(unpack_user* u, int64_t d)
switch(u->expect) {
case expect_size:
u->current.size = d;
u->expect = expect_csize;
break;
case expect_csize:
u->current.csize = d;
u->expect = expect_entry_end;
break;
default:
@@ -239,7 +232,7 @@ static inline int unpack_callback_array(unpack_user* u, unsigned int n)
case expect_entry_begin_or_chunks_end:
/* b'chunks': [ (
* ^ */
if(n != 3) {
if(n != 2) {
SET_LAST_ERROR("Invalid chunk list entry length");
return -1;
}
@@ -283,18 +276,16 @@ static inline int unpack_callback_array_end(unpack_user* u)
refcount += 1;
cache_entry[0] = _htole32(MIN(refcount, _MAX_VALUE));
} else {
/* refcount, size, csize */
/* refcount, size */
cache_values[0] = _htole32(1);
cache_values[1] = _htole32(u->current.size);
cache_values[2] = _htole32(u->current.csize);
cache_values[2] = _htole32(0); /* fake csize for now */
if(!hashindex_set(u->chunks, u->current.key, cache_values)) {
SET_LAST_ERROR("hashindex_set failed");
return -1;
}
}
u->item.size += u->current.size;
u->item.csize += u->current.csize;
u->expect = expect_entry_begin_or_chunks_end;
break;
case expect_entry_begin_or_chunks_end:
@@ -330,7 +321,6 @@ static inline int unpack_callback_map(unpack_user* u, unsigned int n)
u->part = 0;
u->has_chunks = 0;
u->item.size = 0;
u->item.csize = 0;
}
if(u->inside_chunks) {
@@ -372,11 +362,9 @@ static inline int unpack_callback_map_end(unpack_user* u)
if(u->part) {
u->parts.num_files += 1;
u->parts.size += u->item.size;
u->parts.csize += u->item.csize;
}
u->totals.num_files += 1;
u->totals.size += u->item.size;
u->totals.csize += u->item.csize;
}
}
return 0;

View File

@@ -377,7 +377,7 @@ class FuseBackend:
file_id = blake2b_128(path)
current_version, previous_id = self.versions_index.get(file_id, (0, None))
contents_id = blake2b_128(b''.join(chunk_id for chunk_id, _, _ in item.chunks))
contents_id = blake2b_128(b''.join(chunk_id for chunk_id, _ in item.chunks))
if contents_id != previous_id:
current_version += 1
@@ -658,7 +658,7 @@ class FuseOperations(llfuse.Operations, FuseBackend):
chunks = item.chunks
# note: using index iteration to avoid frequently copying big (sub)lists by slicing
for idx in range(chunk_no, len(chunks)):
id, s, csize = chunks[idx]
id, s = chunks[idx]
if s < offset:
offset -= s
chunk_offset += s

View File

@@ -49,8 +49,6 @@ cdef extern from "cache_sync/cache_sync.c":
uint64_t cache_sync_num_files_parts(const CacheSyncCtx *ctx)
uint64_t cache_sync_size_totals(const CacheSyncCtx *ctx)
uint64_t cache_sync_size_parts(const CacheSyncCtx *ctx)
uint64_t cache_sync_csize_totals(const CacheSyncCtx *ctx)
uint64_t cache_sync_csize_parts(const CacheSyncCtx *ctx)
int cache_sync_feed(CacheSyncCtx *ctx, void *data, uint32_t length)
void cache_sync_free(CacheSyncCtx *ctx)
@@ -544,11 +542,3 @@ cdef class CacheSynchronizer:
@property
def size_parts(self):
return cache_sync_size_parts(self.sync)
@property
def csize_totals(self):
return cache_sync_csize_totals(self.sync)
@property
def csize_parts(self):
return cache_sync_csize_parts(self.sync)

View File

@@ -61,10 +61,10 @@ def fix_list_of_chunkentries(v):
chunks = []
for ce in v:
assert isinstance(ce, (tuple, list))
assert len(ce) == 3 # id, size, csize
assert len(ce) in (2, 3) # id, size[, csize]
assert isinstance(ce[1], int)
assert isinstance(ce[2], int)
ce_fixed = [want_bytes(ce[0]), ce[1], ce[2]] # list!
assert len(ce) == 2 or isinstance(ce[2], int)
ce_fixed = [want_bytes(ce[0]), ce[1]] # list! id, size only, drop csize
chunks.append(ce_fixed) # create a list of lists
return chunks
@@ -227,7 +227,7 @@ class PropDict:
return property(_get, _set, _del, doc=doc)
ChunkListEntry = namedtuple('ChunkListEntry', 'id size csize')
ChunkListEntry = namedtuple('ChunkListEntry', 'id size')
class Item(PropDict):
"""

View File

@@ -95,7 +95,7 @@ class MockCache:
def add_chunk(self, id, chunk, stats=None, wait=True):
self.objects[id] = chunk
return id, len(chunk), len(chunk)
return id, len(chunk)
class ArchiveTimestampTestCase(BaseTestCase):

View File

@@ -43,14 +43,14 @@ class TestCacheSynchronizer:
'bar': 5678,
'user': 'chunks',
'chunks': [
(H(1), 1, 2),
(H(2), 2, 3),
(H(1), 1),
(H(2), 2),
]
})
sync.feed(data)
assert len(index) == 2
assert index[H(1)] == (1, 1, 2)
assert index[H(2)] == (1, 2, 3)
assert index[H(1)] == (1, 1, 0)
assert index[H(2)] == (1, 2, 0)
def test_multiple(self, index, sync):
data = packb({
@@ -59,8 +59,8 @@ class TestCacheSynchronizer:
'bar': 5678,
'user': 'chunks',
'chunks': [
(H(1), 1, 2),
(H(2), 2, 3),
(H(1), 1),
(H(2), 2),
]
})
data += packb({
@@ -78,8 +78,8 @@ class TestCacheSynchronizer:
'chunks': '123456',
},
'chunks': [
(H(1), 1, 2),
(H(2), 2, 3),
(H(1), 1),
(H(2), 2),
],
'stuff': [
(1, 2, 3),
@@ -87,12 +87,12 @@
})
data += packb({
'chunks': [
(H(3), 1, 2),
(H(3), 1),
],
})
data += packb({
'chunks': [
(H(1), 1, 2),
(H(1), 1),
],
})
@@ -103,9 +103,9 @@ class TestCacheSynchronizer:
sync.feed(part2)
sync.feed(part3)
assert len(index) == 3
assert index[H(1)] == (3, 1, 2)
assert index[H(2)] == (2, 2, 3)
assert index[H(3)] == (1, 1, 2)
assert index[H(1)] == (3, 1, 0)
assert index[H(2)] == (2, 2, 0)
assert index[H(3)] == (1, 1, 0)
@pytest.mark.parametrize('elem,error', (
({1: 2}, 'Unexpected object: map'),
@@ -121,7 +121,7 @@ class TestCacheSynchronizer:
@pytest.mark.parametrize('structure', (
lambda elem: {'chunks': elem},
lambda elem: {'chunks': [elem]},
lambda elem: {'chunks': [(elem, 1, 2)]},
lambda elem: {'chunks': [(elem, 1)]},
))
def test_corrupted(self, sync, structure, elem, error):
packed = packb(structure(elem))
@@ -135,11 +135,11 @@ class TestCacheSynchronizer:
@pytest.mark.parametrize('data,error', (
# Incorrect tuple length
({'chunks': [(bytes(32), 2, 3, 4)]}, 'Invalid chunk list entry length'),
({'chunks': [(bytes(32), 2)]}, 'Invalid chunk list entry length'),
({'chunks': [(bytes(32), )]}, 'Invalid chunk list entry length'),
# Incorrect types
({'chunks': [(1, 2, 3)]}, 'Unexpected object: integer'),
({'chunks': [(1, bytes(32), 2)]}, 'Unexpected object: integer'),
({'chunks': [(bytes(32), 1.0, 2)]}, 'Unexpected object: double'),
({'chunks': [(1, 2)]}, 'Unexpected object: integer'),
({'chunks': [(1, bytes(32))]}, 'Unexpected object: integer'),
({'chunks': [(bytes(32), 1.0)]}, 'Unexpected object: double'),
))
def test_corrupted_ancillary(self, index, sync, data, error):
packed = packb(data)
@@ -173,7 +173,7 @@ class TestCacheSynchronizer:
sync = CacheSynchronizer(index)
data = packb({
'chunks': [
(H(0), 1, 2),
(H(0), 1),
]
})
with pytest.raises(ValueError) as excinfo:
@@ -185,7 +185,7 @@ class TestCacheSynchronizer:
sync = CacheSynchronizer(index)
data = packb({
'chunks': [
(H(0), 1, 2),
(H(0), 1),
]
})
sync.feed(data)
@@ -196,7 +196,7 @@ class TestCacheSynchronizer:
sync = CacheSynchronizer(index)
data = packb({
'chunks': [
(H(0), 1, 2),
(H(0), 1),
]
})
sync.feed(data)
@@ -244,7 +244,7 @@ class TestAdHocCache:
cache.add_chunk(H(1), b'5678', Statistics(), overwrite=True)
def test_seen_chunk_add_chunk_size(self, cache):
assert cache.add_chunk(H(1), b'5678', Statistics()) == (H(1), 4, 0)
assert cache.add_chunk(H(1), b'5678', Statistics()) == (H(1), 4)
def test_deletes_chunks_during_lifetime(self, cache, repository):
"""E.g. checkpoint archives"""
@@ -270,10 +270,10 @@ class TestAdHocCache:
assert not hasattr(cache, 'chunks')
def test_incref_after_add_chunk(self, cache):
assert cache.add_chunk(H(3), b'5678', Statistics()) == (H(3), 4, 47)
assert cache.chunk_incref(H(3), Statistics()) == (H(3), 4, 47)
assert cache.add_chunk(H(3), b'5678', Statistics()) == (H(3), 4)
assert cache.chunk_incref(H(3), Statistics()) == (H(3), 4)
def test_existing_incref_after_add_chunk(self, cache):
"""This case occurs with part files, see Archive.chunk_file."""
assert cache.add_chunk(H(1), b'5678', Statistics()) == (H(1), 4, 0)
assert cache.chunk_incref(H(1), Statistics()) == (H(1), 4, 0)
assert cache.add_chunk(H(1), b'5678', Statistics()) == (H(1), 4)
assert cache.chunk_incref(H(1), Statistics()) == (H(1), 4)

View File

@@ -143,11 +143,10 @@ def test_unknown_property():
def test_item_file_size():
item = Item(mode=0o100666, chunks=[
ChunkListEntry(csize=0, size=1000, id=None),
ChunkListEntry(csize=0, size=2000, id=None),
ChunkListEntry(size=1000, id=None),
ChunkListEntry(size=2000, id=None),
])
assert item.get_size() == 3000
assert item.get_size(compressed=True) == 0 # no csize any more
item.get_size(memorize=True)
assert item.size == 3000