From 740898d83ba9d83589bcbd607050d23007de318a Mon Sep 17 00:00:00 2001 From: Marian Beermann Date: Tue, 7 Mar 2017 15:13:59 +0100 Subject: [PATCH] CacheSynchronizer --- setup.py | 2 +- src/borg/_cache.c | 159 +++++++++++++++++++++++++++++++++++++++++ src/borg/cache.py | 12 +--- src/borg/hashindex.pyx | 38 ++++++++-- src/borg/helpers.py | 2 +- 5 files changed, 198 insertions(+), 15 deletions(-) create mode 100644 src/borg/_cache.c diff --git a/setup.py b/setup.py index 726c849c3..6878ed96b 100644 --- a/setup.py +++ b/setup.py @@ -600,7 +600,7 @@ if not on_rtd: ext_modules += [ Extension('borg.compress', [compress_source], libraries=['lz4'], include_dirs=include_dirs, library_dirs=library_dirs, define_macros=define_macros), Extension('borg.crypto.low_level', [crypto_ll_source], libraries=crypto_libraries, include_dirs=include_dirs, library_dirs=library_dirs, define_macros=define_macros), - Extension('borg.hashindex', [hashindex_source]), + Extension('borg.hashindex', [hashindex_source], libraries=['msgpackc']), Extension('borg.item', [item_source]), Extension('borg.algorithms.chunker', [chunker_source]), Extension('borg.algorithms.checksums', [checksums_source]), diff --git a/src/borg/_cache.c b/src/borg/_cache.c new file mode 100644 index 000000000..880608ff7 --- /dev/null +++ b/src/borg/_cache.c @@ -0,0 +1,159 @@ + +#include + +// 2**32 - 1025 +#define _MAX_VALUE ( (uint32_t) 4294966271 ) + +#define MIN(x, y) ((x) < (y) ? (x): (y)) + +typedef struct { + HashIndex *chunks; + + msgpack_unpacker unpacker; + msgpack_unpacked unpacked; + const char *error; +} CacheSyncCtx; + +static CacheSyncCtx * +cache_sync_init(HashIndex *chunks) +{ + CacheSyncCtx *ctx; + if (!(ctx = malloc(sizeof(CacheSyncCtx)))) { + return NULL; + } + + ctx->chunks = chunks; + ctx->error = NULL; + + if(!msgpack_unpacker_init(&ctx->unpacker, MSGPACK_UNPACKER_INIT_BUFFER_SIZE)) { + free(ctx); + return NULL; + } + + msgpack_unpacked_init(&ctx->unpacked); + + return ctx; +} + +static void +cache_sync_free(CacheSyncCtx *ctx) +{ + msgpack_unpacker_destroy(&ctx->unpacker); + msgpack_unpacked_destroy(&ctx->unpacked); + free(ctx); +} + +static const char * +cache_sync_error(CacheSyncCtx *ctx) +{ + return ctx->error; +} + +static int +cache_process_chunks(CacheSyncCtx *ctx, msgpack_object_array *array) +{ + uint32_t i; + const char *key; + uint32_t cache_values[3]; + uint32_t *cache_entry; + uint64_t refcount; + msgpack_object *current; + for (i = 0; i < array->size; i++) { + current = &array->ptr[i]; + + if (current->type != MSGPACK_OBJECT_ARRAY || current->via.array.size != 3 + || current->via.array.ptr[0].type != MSGPACK_OBJECT_STR || current->via.array.ptr[0].via.str.size != 32 + || current->via.array.ptr[1].type != MSGPACK_OBJECT_POSITIVE_INTEGER + || current->via.array.ptr[2].type != MSGPACK_OBJECT_POSITIVE_INTEGER) { + ctx->error = "Malformed chunk list entry"; + return 0; + } + + key = current->via.array.ptr[0].via.str.ptr; + cache_entry = (uint32_t*) hashindex_get(ctx->chunks, key); + if (cache_entry) { + refcount = _le32toh(cache_entry[0]); + refcount += 1; + cache_entry[0] = _htole32(MIN(refcount, _MAX_VALUE)); + } else { + /* refcount, size, csize */ + cache_values[0] = 1; + cache_values[1] = current->via.array.ptr[1].via.u64; + cache_values[2] = current->via.array.ptr[2].via.u64; + if (!hashindex_set(ctx->chunks, key, cache_values)) { + ctx->error = "hashindex_set failed"; + return 0; + } + } + } + return 1; +} + +/** + * feed data to the cache synchronizer + * 0 = abort, 1 = continue + * abort is a regular condition, check cache_sync_error + */ +static int +cache_sync_feed(CacheSyncCtx *ctx, void *data, uint32_t length) +{ + msgpack_unpack_return unpack_status; + + /* grow buffer if necessary */ + if (msgpack_unpacker_buffer_capacity(&ctx->unpacker) < length) { + if (!msgpack_unpacker_reserve_buffer(&ctx->unpacker, length)) { + return 0; + } + } + + memcpy(msgpack_unpacker_buffer(&ctx->unpacker), data, length); + msgpack_unpacker_buffer_consumed(&ctx->unpacker, length); + + do { + unpack_status = msgpack_unpacker_next(&ctx->unpacker, &ctx->unpacked); + + switch (unpack_status) { + case MSGPACK_UNPACK_SUCCESS: + { + uint32_t i; + msgpack_object *item = &ctx->unpacked.data; + msgpack_object_kv *current; + + if (item->type != MSGPACK_OBJECT_MAP) { + ctx->error = "Unexpected data type in item stream"; + return 0; + } + + for (i = 0; i < item->via.map.size; i++) { + current = &item->via.map.ptr[i]; + + if (current->key.type != MSGPACK_OBJECT_STR) { + ctx->error = "Invalid key data type in item"; + return 0; + } + + if (current->key.via.str.size == 6 + && !memcmp(current->key.via.str.ptr, "chunks", 6)) { + + if (current->val.type != MSGPACK_OBJECT_ARRAY) { + ctx->error = "Unexpected value type of item chunks"; + return 0; + } + + if (!cache_process_chunks(ctx, ¤t->val.via.array)) { + return 0; + } + } + } + } + break; + case MSGPACK_UNPACK_PARSE_ERROR: + ctx->error = "Malformed msgpack"; + return 0; + default: + break; + } + } while (unpack_status != MSGPACK_UNPACK_CONTINUE); + + return 1; +} diff --git a/src/borg/cache.py b/src/borg/cache.py index 13045f0e9..c9fa70b7f 100644 --- a/src/borg/cache.py +++ b/src/borg/cache.py @@ -12,7 +12,7 @@ from .logger import create_logger logger = create_logger() from .constants import CACHE_README -from .hashindex import ChunkIndex, ChunkIndexEntry +from .hashindex import ChunkIndex, ChunkIndexEntry, CacheSynchronizer from .helpers import Location from .helpers import Error from .helpers import get_cache_dir, get_security_dir @@ -571,17 +571,11 @@ Chunk index: {0.total_unique_chunks:20d} {0.total_chunks:20d}""" archive = ArchiveItem(internal_dict=msgpack.unpackb(data)) if archive.version != 1: raise Exception('Unknown archive metadata version') - unpacker = msgpack.Unpacker() + sync = CacheSynchronizer(chunk_idx) for item_id, chunk in zip(archive.items, repository.get_many(archive.items)): data = key.decrypt(item_id, chunk) chunk_idx.add(item_id, 1, len(data), len(chunk)) - unpacker.feed(data) - for item in unpacker: - if not isinstance(item, dict): - logger.error('Error: Did not get expected metadata dict - archive corrupted!') - continue # XXX: continue?! - for chunk_id, size, csize in item.get(b'chunks', []): - chunk_idx.add(chunk_id, 1, size, csize) + sync.feed(data) if self.do_cache: fn = mkpath(archive_id) fn_tmp = mkpath(archive_id, suffix='.tmp') diff --git a/src/borg/hashindex.pyx b/src/borg/hashindex.pyx index 2409836fe..75ba1df35 100644 --- a/src/borg/hashindex.pyx +++ b/src/borg/hashindex.pyx @@ -8,7 +8,7 @@ from libc.stdint cimport uint32_t, UINT32_MAX, uint64_t from libc.errno cimport errno from cpython.exc cimport PyErr_SetFromErrnoWithFilename -API_VERSION = '1.1_01' +API_VERSION = '1.1_02' cdef extern from "_hashindex.c": @@ -31,6 +31,18 @@ cdef extern from "_hashindex.c": double HASH_MAX_LOAD +cdef extern from "_cache.c": + ctypedef struct CacheSyncCtx: + pass + + CacheSyncCtx *cache_sync_init(HashIndex *chunks) + const char *cache_sync_error(CacheSyncCtx *ctx) + int cache_sync_feed(CacheSyncCtx *ctx, void *data, uint32_t length) + void cache_sync_free(CacheSyncCtx *ctx) + + uint32_t _MAX_VALUE + + cdef _NoDefault = object() """ @@ -50,9 +62,6 @@ AssertionError is raised instead. assert UINT32_MAX == 2**32-1 -# module-level constant because cdef's in classes can't have default values -cdef uint32_t _MAX_VALUE = 2**32-1025 - assert _MAX_VALUE % 2 == 1 @@ -375,3 +384,24 @@ cdef class ChunkKeyIterator: cdef uint32_t refcount = _le32toh(value[0]) assert refcount <= _MAX_VALUE, "invalid reference count" return (self.key)[:self.key_size], ChunkIndexEntry(refcount, _le32toh(value[1]), _le32toh(value[2])) + + +cdef class CacheSynchronizer: + cdef ChunkIndex chunks + cdef CacheSyncCtx *sync + + def __cinit__(self, chunks): + self.chunks = chunks + self.sync = cache_sync_init(self.chunks.index) + if not self.sync: + raise Exception('cache_sync_init failed') + + def __dealloc__(self): + if self.sync: + cache_sync_free(self.sync) + + def feed(self, chunk): + if not cache_sync_feed(self.sync, chunk, len(chunk)): + error = cache_sync_error(self.sync) + if error is not None: + raise Exception('cache_sync_feed failed: ' + error.decode('ascii')) diff --git a/src/borg/helpers.py b/src/borg/helpers.py index db66b822a..7e4d4baf9 100644 --- a/src/borg/helpers.py +++ b/src/borg/helpers.py @@ -126,7 +126,7 @@ def check_python(): def check_extension_modules(): from . import platform, compress, item - if hashindex.API_VERSION != '1.1_01': + if hashindex.API_VERSION != '1.1_02': raise ExtensionModuleError if chunker.API_VERSION != '1.1_01': raise ExtensionModuleError