info: use CacheSynchronizer & HashIndex.stats_against

Marian Beermann 2017-06-13 14:15:37 +02:00
parent fd1efbac90
commit e189a4d302
5 changed files with 73 additions and 24 deletions
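
In short: `borg info`'s Archive.calc_stats used to abuse a cache transaction, decrementing refcounts in cache.chunks and rolling the transaction back afterwards. It now builds a per-archive ChunkIndex by feeding the decrypted item metadata stream to the C-level CacheSynchronizer (which also counts files along the way) and derives the statistics by comparing that index against the master chunk index via the new HashIndex.stats_against. The hashindex API version is bumped from 1.1_03 to 1.1_04 accordingly.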


@@ -25,7 +25,7 @@ from .cache import ChunkListEntry
 from .crypto.key import key_factory
 from .compress import Compressor, CompressionSpec
 from .constants import *  # NOQA
-from .hashindex import ChunkIndex, ChunkIndexEntry
+from .hashindex import ChunkIndex, ChunkIndexEntry, CacheSynchronizer
 from .helpers import Manifest
 from .helpers import hardlinkable
 from .helpers import ChunkIteratorFileWrapper, open_item
@@ -478,30 +478,22 @@ Utilization of max. archive size: {csize_max:.0%}
     def calc_stats(self, cache):
         def add(id):
-            count, size, csize = cache.chunks[id]
-            stats.update(size, csize, count == 1)
-            cache.chunks[id] = count - 1, size, csize
+            entry = cache.chunks[id]
+            archive_index.add(id, 1, entry.size, entry.csize)
 
-        def add_file_chunks(chunks):
-            for id, _, _ in chunks:
-                add(id)
-
-        # This function is a bit evil since it abuses the cache to calculate
-        # the stats. The cache transaction must be rolled back afterwards
-        unpacker = msgpack.Unpacker(use_list=False)
-        cache.begin_txn()
-        stats = Statistics()
+        archive_index = ChunkIndex()
+        sync = CacheSynchronizer(archive_index)
         add(self.id)
         pi = ProgressIndicatorPercent(total=len(self.metadata.items), msg='Calculating statistics... %3d%%')
         for id, chunk in zip(self.metadata.items, self.repository.get_many(self.metadata.items)):
             pi.show(increase=1)
             add(id)
             data = self.key.decrypt(id, chunk)
-            unpacker.feed(data)
-            for item in unpacker:
-                chunks = item.get(b'chunks')
-                if chunks is not None:
-                    stats.nfiles += 1
-                    add_file_chunks(chunks)
-        cache.rollback()
+            sync.feed(data)
+        stats = Statistics()
+        stats.osize, stats.csize, unique_size, stats.usize, unique_chunks, chunks = archive_index.stats_against(cache.chunks)
+        stats.nfiles = sync.num_files
         pi.finish()
         return stats
 
     @contextmanager
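
Note the unpacking order above: stats_against returns (size, csize, unique_size, unique_csize, unique_chunks, chunks), so stats.usize receives unique_csize, the deduplicated compressed size; unique_size, unique_chunks and chunks land in locals that calc_stats does not use.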


@@ -38,6 +38,7 @@ cache_sync_init(HashIndex *chunks)
     unpack_init(&ctx->ctx);
     /* needs to be set only once */
     ctx->ctx.user.chunks = chunks;
+    ctx->ctx.user.num_files = 0;
     ctx->buf = NULL;
     ctx->head = 0;
     ctx->tail = 0;
@@ -56,11 +57,17 @@
 }
 
 static const char *
-cache_sync_error(CacheSyncCtx *ctx)
+cache_sync_error(const CacheSyncCtx *ctx)
 {
     return ctx->ctx.user.last_error;
 }
 
+static uint64_t
+cache_sync_num_files(const CacheSyncCtx *ctx)
+{
+    return ctx->ctx.user.num_files;
+}
+
 /**
  * feed data to the cache synchronizer
  * 0 = abort, 1 = continue


@@ -50,6 +50,8 @@ typedef struct unpack_user {
     HashIndex *chunks;
 
+    uint64_t num_files;
+
     /*
      * We don't care about most stuff. This flag tells us whether we're at the chunks structure,
      * meaning:
@@ -358,6 +360,7 @@ static inline int unpack_callback_raw(unpack_user* u, const char* b, const char*
     if(length == 6 && !memcmp("chunks", p, 6)) {
         u->expect = expect_chunks_begin;
         u->inside_chunks = 1;
+        u->num_files++;
     } else {
         u->expect = expect_map_item_end;
     }
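
To make the counting rule concrete, here is an illustrative sketch of what one decoded item from the metadata stream roughly looks like (the field set is simplified and hypothetical; only the chunks key matters here). Every map carrying a chunks key describes a regular file's content, so num_files is incremented exactly once per such map:

# Illustrative sketch only -- simplified layout, not the full borg item format.
item = {
    b'path': b'home/user/notes.txt',
    b'mode': 0o100644,
    b'chunks': [
        # (32-byte chunk id, uncompressed size, compressed size)
        (b'\x01' * 32, 4096, 2011),
        (b'\x02' * 32, 1024, 600),
    ],
}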


@@ -9,7 +9,7 @@ from libc.errno cimport errno
 from cpython.exc cimport PyErr_SetFromErrnoWithFilename
 from cpython.buffer cimport PyBUF_SIMPLE, PyObject_GetBuffer, PyBuffer_Release
 
-API_VERSION = '1.1_03'
+API_VERSION = '1.1_04'
 
 cdef extern from "_hashindex.c":
@@ -38,7 +38,8 @@ cdef extern from "cache_sync/cache_sync.c":
         pass
 
     CacheSyncCtx *cache_sync_init(HashIndex *chunks)
-    const char *cache_sync_error(CacheSyncCtx *ctx)
+    const char *cache_sync_error(const CacheSyncCtx *ctx)
+    uint64_t cache_sync_num_files(const CacheSyncCtx *ctx)
    int cache_sync_feed(CacheSyncCtx *ctx, void *data, uint32_t length)
    void cache_sync_free(CacheSyncCtx *ctx)
@@ -329,6 +330,48 @@ cdef class ChunkIndex(IndexBase):
         return size, csize, unique_size, unique_csize, unique_chunks, chunks
 
+    def stats_against(self, ChunkIndex master_index):
+        """
+        Calculate chunk statistics of this index against *master_index*.
+
+        A chunk is counted as unique if the number of references
+        in this index matches the number of references in *master_index*.
+        This index must be a subset of *master_index*.
+
+        Return the same statistics tuple as summarize:
+        size, csize, unique_size, unique_csize, unique_chunks, chunks.
+        """
+
+        cdef uint64_t size = 0, csize = 0, unique_size = 0, unique_csize = 0, chunks = 0, unique_chunks = 0
+        cdef uint32_t our_refcount, chunk_size, chunk_csize
+        cdef const uint32_t *our_values
+        cdef const uint32_t *master_values
+        cdef const void *key = NULL
+        cdef HashIndex *master = master_index.index
+
+        while True:
+            key = hashindex_next_key(self.index, key)
+            if not key:
+                break
+            our_values = <const uint32_t*> (key + self.key_size)
+            master_values = <const uint32_t*> hashindex_get(master, key)
+            if not master_values:
+                raise ValueError('stats_against: key contained in self but not in master_index.')
+            our_refcount = _le32toh(our_values[0])
+            chunk_size = _le32toh(master_values[1])
+            chunk_csize = _le32toh(master_values[2])
+
+            chunks += our_refcount
+            size += <uint64_t> chunk_size * our_refcount
+            csize += <uint64_t> chunk_csize * our_refcount
+            if our_values[0] == master_values[0]:
+                # our refcount equals the master's refcount, so this chunk is unique to us
+                unique_chunks += 1
+                unique_size += chunk_size
+                unique_csize += chunk_csize
+
+        return size, csize, unique_size, unique_csize, unique_chunks, chunks
+
     def add(self, key, refs, size, csize):
         assert len(key) == self.key_size
         cdef uint32_t[3] data
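
A usage sketch for the new method (the keys and sizes are made up; a ChunkIndex maps a 32-byte chunk id to a (refcount, size, csize) triple):

from borg.hashindex import ChunkIndex

only_ours = b'\x01' * 32   # referenced by this archive alone
shared = b'\x02' * 32      # also referenced by some other archive

master = ChunkIndex()
master[only_ours] = (1, 100, 60)
master[shared] = (2, 200, 120)

archive = ChunkIndex()     # must be a subset of master
archive[only_ours] = (1, 100, 60)
archive[shared] = (1, 200, 120)

size, csize, unique_size, unique_csize, unique_chunks, chunks = archive.stats_against(master)
# chunks == 2, size == 300, csize == 180
# only_ours has matching refcounts (1 == 1), so it is unique to this archive:
# unique_chunks == 1, unique_size == 100, unique_csize == 60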
@@ -420,3 +463,7 @@ cdef class CacheSynchronizer:
             error = cache_sync_error(self.sync)
             if error != NULL:
                 raise ValueError('cache_sync_feed failed: ' + error.decode('ascii'))
+
+    @property
+    def num_files(self):
+        return cache_sync_num_files(self.sync)
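
And a sketch of the synchronizer half under the same caveat (the hand-packed item dict is illustrative; in calc_stats above, feed receives the decrypted chunks of the item metadata stream):

import msgpack
from borg.hashindex import ChunkIndex, CacheSynchronizer

archive_index = ChunkIndex()
sync = CacheSynchronizer(archive_index)

# feed one msgpack-encoded item; its 'chunks' entries are added to archive_index
item = {b'path': b'home/user/notes.txt',
        b'chunks': [(b'\x01' * 32, 4096, 2011)]}
sync.feed(msgpack.packb(item))

assert sync.num_files == 1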


@@ -131,7 +131,7 @@ class MandatoryFeatureUnsupported(Error):
 
 def check_extension_modules():
     from . import platform, compress, item
-    if hashindex.API_VERSION != '1.1_03':
+    if hashindex.API_VERSION != '1.1_04':
         raise ExtensionModuleError
     if chunker.API_VERSION != '1.1_01':
         raise ExtensionModuleError