From c786a5941eb560b51264c24e18f9c1442fb721bb Mon Sep 17 00:00:00 2001 From: Marian Beermann Date: Fri, 26 May 2017 22:54:27 +0200 Subject: [PATCH] CacheSynchronizer: redo as quasi FSM on top of unpack.h This is a (relatively) simple state machine running in the data callbacks invoked by the msgpack unpacking stack machine (the same machine is used in msgpack-c and msgpack-python, changes are minor and cosmetic, e.g. removal of msgpack_unpack_object, removal of the C++ template thus porting to C and so on). Compared to the previous solution this has multiple advantages - msgpack-c dependency is removed - this approach is faster and requires fewer and smaller memory allocations Testability of the two solutions does not differ in my professional opinion(tm). Two other changes were rolled up; _hashindex.c can be compiled without Python.h again (handy for fuzzing and testing); a "small" bug in the cache sync was fixed which allocated too large archive indices, leading to excessive archive.chunks.d disk usage (that actually gave me an idea). 
--- scripts/fuzz-cache-sync/HOWTO | 10 + scripts/fuzz-cache-sync/main.c | 30 ++ .../fuzz-cache-sync/testcase_dir/test_simple | Bin 0 -> 119 bytes setup.py | 4 +- src/borg/_cache.c | 159 -------- src/borg/_hashindex.c | 19 +- src/borg/cache.py | 2 +- src/borg/cache_sync/cache_sync.c | 108 +++++ src/borg/cache_sync/sysdep.h | 194 +++++++++ src/borg/cache_sync/unpack.h | 374 ++++++++++++++++++ src/borg/cache_sync/unpack_define.h | 95 +++++ src/borg/cache_sync/unpack_template.h | 359 +++++++++++++++++ src/borg/hashindex.pyx | 6 +- src/borg/testsuite/cache.py | 139 +++++++ 14 files changed, 1332 insertions(+), 167 deletions(-) create mode 100644 scripts/fuzz-cache-sync/HOWTO create mode 100644 scripts/fuzz-cache-sync/main.c create mode 100644 scripts/fuzz-cache-sync/testcase_dir/test_simple delete mode 100644 src/borg/_cache.c create mode 100644 src/borg/cache_sync/cache_sync.c create mode 100644 src/borg/cache_sync/sysdep.h create mode 100644 src/borg/cache_sync/unpack.h create mode 100644 src/borg/cache_sync/unpack_define.h create mode 100644 src/borg/cache_sync/unpack_template.h create mode 100644 src/borg/testsuite/cache.py diff --git a/scripts/fuzz-cache-sync/HOWTO b/scripts/fuzz-cache-sync/HOWTO new file mode 100644 index 00000000..ae144b28 --- /dev/null +++ b/scripts/fuzz-cache-sync/HOWTO @@ -0,0 +1,10 @@ +- Install AFL and the requirements for LLVM mode (see docs) +- Compile the fuzzing target, e.g. 
+ + AFL_HARDEN=1 afl-clang-fast main.c -o fuzz-target -O3 + + (other options, like using ASan or MSan are possible as well) +- Add additional test cases to testcase_dir +- Run afl, easiest (but inefficient) way; + + afl-fuzz -i testcase_dir -o findings_dir ./fuzz-target diff --git a/scripts/fuzz-cache-sync/main.c b/scripts/fuzz-cache-sync/main.c new file mode 100644 index 00000000..b25d925d --- /dev/null +++ b/scripts/fuzz-cache-sync/main.c @@ -0,0 +1,30 @@ +#include "../../src/borg/_hashindex.c" +#include "../../src/borg/cache_sync/cache_sync.c" + +#define BUFSZ 32768 + +int main() { + char buf[BUFSZ]; + int len, ret; + CacheSyncCtx *ctx; + HashIndex *idx; + + /* capacity, key size, value size */ + idx = hashindex_init(0, 32, 12); + ctx = cache_sync_init(idx); + + while (1) { + len = read(0, buf, BUFSZ); + if (!len) { + break; + } + ret = cache_sync_feed(ctx, buf, len); + if(!ret && cache_sync_error(ctx)) { + fprintf(stderr, "error: %s\n", cache_sync_error(ctx)); + return 1; + } + } + hashindex_free(idx); + cache_sync_free(ctx); + return 0; +} diff --git a/scripts/fuzz-cache-sync/testcase_dir/test_simple b/scripts/fuzz-cache-sync/testcase_dir/test_simple new file mode 100644 index 0000000000000000000000000000000000000000..0bf5a0ea1616fcbcec4e17de7b347b00f5661ca1 GIT binary patch literal 119 zcmZo&oR*)zI4Q9Rh^x-BTmmuAis>yWElw?3mYh+Vmt72{CQZJ@pkRO>7&0;up~{Gf F82}?xBQF2| literal 0 HcmV?d00001 diff --git a/setup.py b/setup.py index 6878ed96..951d61af 100644 --- a/setup.py +++ b/setup.py @@ -91,6 +91,8 @@ try: 'src/borg/crypto/low_level.c', 'src/borg/algorithms/chunker.c', 'src/borg/algorithms/buzhash.c', 'src/borg/hashindex.c', 'src/borg/_hashindex.c', + 'src/borg/cache_sync/cache_sync.c', 'src/borg/cache_sync/sysdep.h', 'src/borg/cache_sync/unpack.h', + 'src/borg/cache_sync/unpack_define.h', 'src/borg/cache_sync/unpack_template.h', 'src/borg/item.c', 'src/borg/algorithms/checksums.c', 'src/borg/algorithms/crc32_dispatch.c', 'src/borg/algorithms/crc32_clmul.c', 
'src/borg/algorithms/crc32_slice_by_8.c', @@ -600,7 +602,7 @@ if not on_rtd: ext_modules += [ Extension('borg.compress', [compress_source], libraries=['lz4'], include_dirs=include_dirs, library_dirs=library_dirs, define_macros=define_macros), Extension('borg.crypto.low_level', [crypto_ll_source], libraries=crypto_libraries, include_dirs=include_dirs, library_dirs=library_dirs, define_macros=define_macros), - Extension('borg.hashindex', [hashindex_source], libraries=['msgpackc']), + Extension('borg.hashindex', [hashindex_source]), Extension('borg.item', [item_source]), Extension('borg.algorithms.chunker', [chunker_source]), Extension('borg.algorithms.checksums', [checksums_source]), diff --git a/src/borg/_cache.c b/src/borg/_cache.c deleted file mode 100644 index 880608ff..00000000 --- a/src/borg/_cache.c +++ /dev/null @@ -1,159 +0,0 @@ - -#include - -// 2**32 - 1025 -#define _MAX_VALUE ( (uint32_t) 4294966271 ) - -#define MIN(x, y) ((x) < (y) ? (x): (y)) - -typedef struct { - HashIndex *chunks; - - msgpack_unpacker unpacker; - msgpack_unpacked unpacked; - const char *error; -} CacheSyncCtx; - -static CacheSyncCtx * -cache_sync_init(HashIndex *chunks) -{ - CacheSyncCtx *ctx; - if (!(ctx = malloc(sizeof(CacheSyncCtx)))) { - return NULL; - } - - ctx->chunks = chunks; - ctx->error = NULL; - - if(!msgpack_unpacker_init(&ctx->unpacker, MSGPACK_UNPACKER_INIT_BUFFER_SIZE)) { - free(ctx); - return NULL; - } - - msgpack_unpacked_init(&ctx->unpacked); - - return ctx; -} - -static void -cache_sync_free(CacheSyncCtx *ctx) -{ - msgpack_unpacker_destroy(&ctx->unpacker); - msgpack_unpacked_destroy(&ctx->unpacked); - free(ctx); -} - -static const char * -cache_sync_error(CacheSyncCtx *ctx) -{ - return ctx->error; -} - -static int -cache_process_chunks(CacheSyncCtx *ctx, msgpack_object_array *array) -{ - uint32_t i; - const char *key; - uint32_t cache_values[3]; - uint32_t *cache_entry; - uint64_t refcount; - msgpack_object *current; - for (i = 0; i < array->size; i++) { - current = 
&array->ptr[i]; - - if (current->type != MSGPACK_OBJECT_ARRAY || current->via.array.size != 3 - || current->via.array.ptr[0].type != MSGPACK_OBJECT_STR || current->via.array.ptr[0].via.str.size != 32 - || current->via.array.ptr[1].type != MSGPACK_OBJECT_POSITIVE_INTEGER - || current->via.array.ptr[2].type != MSGPACK_OBJECT_POSITIVE_INTEGER) { - ctx->error = "Malformed chunk list entry"; - return 0; - } - - key = current->via.array.ptr[0].via.str.ptr; - cache_entry = (uint32_t*) hashindex_get(ctx->chunks, key); - if (cache_entry) { - refcount = _le32toh(cache_entry[0]); - refcount += 1; - cache_entry[0] = _htole32(MIN(refcount, _MAX_VALUE)); - } else { - /* refcount, size, csize */ - cache_values[0] = 1; - cache_values[1] = current->via.array.ptr[1].via.u64; - cache_values[2] = current->via.array.ptr[2].via.u64; - if (!hashindex_set(ctx->chunks, key, cache_values)) { - ctx->error = "hashindex_set failed"; - return 0; - } - } - } - return 1; -} - -/** - * feed data to the cache synchronizer - * 0 = abort, 1 = continue - * abort is a regular condition, check cache_sync_error - */ -static int -cache_sync_feed(CacheSyncCtx *ctx, void *data, uint32_t length) -{ - msgpack_unpack_return unpack_status; - - /* grow buffer if necessary */ - if (msgpack_unpacker_buffer_capacity(&ctx->unpacker) < length) { - if (!msgpack_unpacker_reserve_buffer(&ctx->unpacker, length)) { - return 0; - } - } - - memcpy(msgpack_unpacker_buffer(&ctx->unpacker), data, length); - msgpack_unpacker_buffer_consumed(&ctx->unpacker, length); - - do { - unpack_status = msgpack_unpacker_next(&ctx->unpacker, &ctx->unpacked); - - switch (unpack_status) { - case MSGPACK_UNPACK_SUCCESS: - { - uint32_t i; - msgpack_object *item = &ctx->unpacked.data; - msgpack_object_kv *current; - - if (item->type != MSGPACK_OBJECT_MAP) { - ctx->error = "Unexpected data type in item stream"; - return 0; - } - - for (i = 0; i < item->via.map.size; i++) { - current = &item->via.map.ptr[i]; - - if (current->key.type != 
MSGPACK_OBJECT_STR) { - ctx->error = "Invalid key data type in item"; - return 0; - } - - if (current->key.via.str.size == 6 - && !memcmp(current->key.via.str.ptr, "chunks", 6)) { - - if (current->val.type != MSGPACK_OBJECT_ARRAY) { - ctx->error = "Unexpected value type of item chunks"; - return 0; - } - - if (!cache_process_chunks(ctx, &current->val.via.array)) { - return 0; - } - } - } - } - break; - case MSGPACK_UNPACK_PARSE_ERROR: - ctx->error = "Malformed msgpack"; - return 0; - default: - break; - } - } while (unpack_status != MSGPACK_UNPACK_CONTINUE); - - return 1; -} diff --git a/src/borg/_hashindex.c b/src/borg/_hashindex.c index b41d57b7..824a6eec 100644 --- a/src/borg/_hashindex.c +++ b/src/borg/_hashindex.c @@ -1,6 +1,6 @@ -#include #include +#include #include #include #include @@ -56,8 +56,10 @@ typedef struct { int lower_limit; int upper_limit; int min_empty; +#ifdef Py_PYTHON_H /* buckets may be backed by a Python buffer. If buckets_buffer.buf is NULL then this is not used. */ Py_buffer buckets_buffer; +#endif } HashIndex; /* prime (or w/ big prime factors) hash table sizes @@ -106,8 +108,11 @@ static int hash_sizes[] = { #define EPRINTF(msg, ...) fprintf(stderr, "hashindex: " msg "(%s)\n", ##__VA_ARGS__, strerror(errno)) #define EPRINTF_PATH(path, msg, ...) 
fprintf(stderr, "hashindex: %s: " msg " (%s)\n", path, ##__VA_ARGS__, strerror(errno)) +#ifdef Py_PYTHON_H static HashIndex *hashindex_read(PyObject *file_py); static void hashindex_write(HashIndex *index, PyObject *file_py); +#endif + static HashIndex *hashindex_init(int capacity, int key_size, int value_size); static const void *hashindex_get(HashIndex *index, const void *key); static int hashindex_set(HashIndex *index, const void *key, const void *value); @@ -120,9 +125,12 @@ static void hashindex_free(HashIndex *index); static void hashindex_free_buckets(HashIndex *index) { +#ifdef Py_PYTHON_H if(index->buckets_buffer.buf) { PyBuffer_Release(&index->buckets_buffer); - } else { + } else +#endif + { free(index->buckets); } } @@ -263,6 +271,7 @@ count_empty(HashIndex *index) /* Public API */ +#ifdef Py_PYTHON_H static HashIndex * hashindex_read(PyObject *file_py) { @@ -418,6 +427,7 @@ fail_decref_header: fail: return index; } +#endif static HashIndex * hashindex_init(int capacity, int key_size, int value_size) @@ -444,7 +454,9 @@ hashindex_init(int capacity, int key_size, int value_size) index->lower_limit = get_lower_limit(index->num_buckets); index->upper_limit = get_upper_limit(index->num_buckets); index->min_empty = get_min_empty(index->num_buckets); +#ifdef Py_PYTHON_H index->buckets_buffer.buf = NULL; +#endif for(i = 0; i < capacity; i++) { BUCKET_MARK_EMPTY(index, i); } @@ -458,7 +470,7 @@ hashindex_free(HashIndex *index) free(index); } - +#ifdef Py_PYTHON_H static void hashindex_write(HashIndex *index, PyObject *file_py) { @@ -521,6 +533,7 @@ hashindex_write(HashIndex *index, PyObject *file_py) return; } } +#endif static const void * hashindex_get(HashIndex *index, const void *key) diff --git a/src/borg/cache.py b/src/borg/cache.py index e34a3426..934f12a7 100644 --- a/src/borg/cache.py +++ b/src/borg/cache.py @@ -640,7 +640,7 @@ Chunk index: {0.total_unique_chunks:20d} {0.total_chunks:20d}""" # Do not make this an else branch; the FileIntegrityError 
exception handler # above can remove *archive_id* from *cached_ids*. logger.info('Fetching and building archive index for %s ...', archive_name) - archive_chunk_idx = ChunkIndex(master_index_capacity) + archive_chunk_idx = ChunkIndex() fetch_and_build_idx(archive_id, repository, self.key, archive_chunk_idx) logger.info("Merging into master chunks index ...") if chunk_idx is None: diff --git a/src/borg/cache_sync/cache_sync.c b/src/borg/cache_sync/cache_sync.c new file mode 100644 index 00000000..174e4b90 --- /dev/null +++ b/src/borg/cache_sync/cache_sync.c @@ -0,0 +1,108 @@ + +#include "unpack.h" + +typedef struct { + unpack_context ctx; + + char *buf; + size_t head; + size_t tail; + size_t size; +} CacheSyncCtx; + +static CacheSyncCtx * +cache_sync_init(HashIndex *chunks) +{ + CacheSyncCtx *ctx; + if (!(ctx = (CacheSyncCtx*)malloc(sizeof(CacheSyncCtx)))) { + return NULL; + } + + unpack_init(&ctx->ctx); + + ctx->ctx.user.chunks = chunks; + ctx->ctx.user.last_error = NULL; + ctx->ctx.user.level = 0; + ctx->ctx.user.inside_chunks = false; + ctx->buf = NULL; + ctx->head = 0; + ctx->tail = 0; + ctx->size = 0; + + return ctx; +} + +static void +cache_sync_free(CacheSyncCtx *ctx) +{ + if(ctx->buf) { + free(ctx->buf); + } + free(ctx); +} + +static const char * +cache_sync_error(CacheSyncCtx *ctx) +{ + return ctx->ctx.user.last_error; +} + +/** + * feed data to the cache synchronizer + * 0 = abort, 1 = continue + * abort is a regular condition, check cache_sync_error + */ +static int +cache_sync_feed(CacheSyncCtx *ctx, void *data, uint32_t length) +{ + size_t new_size; + int ret; + char *new_buf; + + if(ctx->tail + length > ctx->size) { + if((ctx->tail - ctx->head) + length <= ctx->size) { + /* | XXXXX| -> move data in buffer backwards -> |XXXXX | */ + memmove(ctx->buf, ctx->buf + ctx->head, ctx->tail - ctx->head); + ctx->tail -= ctx->head; + ctx->head = 0; + } else { + /* must expand buffer to fit all data */ + new_size = (ctx->tail - ctx->head) + length; + new_buf = 
(char*) malloc(new_size); + if(!new_buf) { + ctx->ctx.user.last_error = "cache_sync_feed: unable to allocate buffer"; + return 0; + } + memcpy(new_buf, ctx->buf + ctx->head, ctx->tail - ctx->head); + free(ctx->buf); + ctx->buf = new_buf; + ctx->tail -= ctx->head; + ctx->head = 0; + ctx->size = new_size; + } + } + + memcpy(ctx->buf + ctx->tail, data, length); + ctx->tail += length; + + while(1) { + if(ctx->head >= ctx->tail) { + return 1; /* request more bytes */ + } + + ret = unpack_execute(&ctx->ctx, ctx->buf, ctx->tail, &ctx->head); + if(ret == 1) { + unpack_init(&ctx->ctx); + continue; + } else if(ret == 0) { + return 1; + } else { + if(!ctx->ctx.user.last_error) { + ctx->ctx.user.last_error = "Unknown error"; + } + return 0; + } + } + /* unreachable */ + return 1; +} diff --git a/src/borg/cache_sync/sysdep.h b/src/borg/cache_sync/sysdep.h new file mode 100644 index 00000000..ed9c1bc0 --- /dev/null +++ b/src/borg/cache_sync/sysdep.h @@ -0,0 +1,194 @@ +/* + * MessagePack system dependencies + * + * Copyright (C) 2008-2010 FURUHASHI Sadayuki + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +#ifndef MSGPACK_SYSDEP_H__ +#define MSGPACK_SYSDEP_H__ + +#include <stdlib.h> +#include <stddef.h> +#if defined(_MSC_VER) && _MSC_VER < 1600 +typedef __int8 int8_t; +typedef unsigned __int8 uint8_t; +typedef __int16 int16_t; +typedef unsigned __int16 uint16_t; +typedef __int32 int32_t; +typedef unsigned __int32 uint32_t; +typedef __int64 int64_t; +typedef unsigned __int64 uint64_t; +#elif defined(_MSC_VER) // && _MSC_VER >= 1600 +#include <stdint.h> +#else +#include <stdint.h> +#include <stdbool.h> +#endif + +#ifdef _WIN32 +#define _msgpack_atomic_counter_header <windows.h> +typedef long _msgpack_atomic_counter_t; +#define _msgpack_sync_decr_and_fetch(ptr) InterlockedDecrement(ptr) +#define _msgpack_sync_incr_and_fetch(ptr) InterlockedIncrement(ptr) +#elif defined(__GNUC__) && ((__GNUC__*10 + __GNUC_MINOR__) < 41) +#define _msgpack_atomic_counter_header "gcc_atomic.h" +#else +typedef unsigned int _msgpack_atomic_counter_t; +#define _msgpack_sync_decr_and_fetch(ptr) __sync_sub_and_fetch(ptr, 1) +#define _msgpack_sync_incr_and_fetch(ptr) __sync_add_and_fetch(ptr, 1) +#endif + +#ifdef _WIN32 + +#ifdef __cplusplus +/* numeric_limits<T>::min,max */ +#ifdef max +#undef max +#endif +#ifdef min +#undef min +#endif +#endif + +#else +#include <arpa/inet.h> /* __BYTE_ORDER */ +#endif + +#if !defined(__LITTLE_ENDIAN__) && !defined(__BIG_ENDIAN__) +#if __BYTE_ORDER == __LITTLE_ENDIAN +#define __LITTLE_ENDIAN__ +#elif __BYTE_ORDER == __BIG_ENDIAN +#define __BIG_ENDIAN__ +#elif _WIN32 +#define __LITTLE_ENDIAN__ +#endif +#endif + + +#ifdef __LITTLE_ENDIAN__ + +#ifdef _WIN32 +# if defined(ntohs) +# define _msgpack_be16(x) ntohs(x) +# elif defined(_byteswap_ushort) || (defined(_MSC_VER) && _MSC_VER >= 1400) +# define _msgpack_be16(x) ((uint16_t)_byteswap_ushort((unsigned short)x)) +# else +# define _msgpack_be16(x) ( \ + ((((uint16_t)x) << 8) ) | \ + ((((uint16_t)x) >> 8) ) ) +# endif +#else +# define _msgpack_be16(x) ntohs(x) +#endif + +#ifdef _WIN32 +# if defined(ntohl) +# define _msgpack_be32(x) ntohl(x) +# elif defined(_byteswap_ulong) || 
(defined(_MSC_VER) && _MSC_VER >= 1400) +# define _msgpack_be32(x) ((uint32_t)_byteswap_ulong((unsigned long)x)) +# else +# define _msgpack_be32(x) \ + ( ((((uint32_t)x) << 24) ) | \ + ((((uint32_t)x) << 8) & 0x00ff0000U ) | \ + ((((uint32_t)x) >> 8) & 0x0000ff00U ) | \ + ((((uint32_t)x) >> 24) ) ) +# endif +#else +# define _msgpack_be32(x) ntohl(x) +#endif + +#if defined(_byteswap_uint64) || (defined(_MSC_VER) && _MSC_VER >= 1400) +# define _msgpack_be64(x) (_byteswap_uint64(x)) +#elif defined(bswap_64) +# define _msgpack_be64(x) bswap_64(x) +#elif defined(__DARWIN_OSSwapInt64) +# define _msgpack_be64(x) __DARWIN_OSSwapInt64(x) +#else +#define _msgpack_be64(x) \ + ( ((((uint64_t)x) << 56) ) | \ + ((((uint64_t)x) << 40) & 0x00ff000000000000ULL ) | \ + ((((uint64_t)x) << 24) & 0x0000ff0000000000ULL ) | \ + ((((uint64_t)x) << 8) & 0x000000ff00000000ULL ) | \ + ((((uint64_t)x) >> 8) & 0x00000000ff000000ULL ) | \ + ((((uint64_t)x) >> 24) & 0x0000000000ff0000ULL ) | \ + ((((uint64_t)x) >> 40) & 0x000000000000ff00ULL ) | \ + ((((uint64_t)x) >> 56) ) ) +#endif + +#define _msgpack_load16(cast, from) ((cast)( \ + (((uint16_t)((uint8_t*)(from))[0]) << 8) | \ + (((uint16_t)((uint8_t*)(from))[1]) ) )) + +#define _msgpack_load32(cast, from) ((cast)( \ + (((uint32_t)((uint8_t*)(from))[0]) << 24) | \ + (((uint32_t)((uint8_t*)(from))[1]) << 16) | \ + (((uint32_t)((uint8_t*)(from))[2]) << 8) | \ + (((uint32_t)((uint8_t*)(from))[3]) ) )) + +#define _msgpack_load64(cast, from) ((cast)( \ + (((uint64_t)((uint8_t*)(from))[0]) << 56) | \ + (((uint64_t)((uint8_t*)(from))[1]) << 48) | \ + (((uint64_t)((uint8_t*)(from))[2]) << 40) | \ + (((uint64_t)((uint8_t*)(from))[3]) << 32) | \ + (((uint64_t)((uint8_t*)(from))[4]) << 24) | \ + (((uint64_t)((uint8_t*)(from))[5]) << 16) | \ + (((uint64_t)((uint8_t*)(from))[6]) << 8) | \ + (((uint64_t)((uint8_t*)(from))[7]) ) )) + +#else + +#define _msgpack_be16(x) (x) +#define _msgpack_be32(x) (x) +#define _msgpack_be64(x) (x) + +#define 
_msgpack_load16(cast, from) ((cast)( \ + (((uint16_t)((uint8_t*)from)[0]) << 8) | \ + (((uint16_t)((uint8_t*)from)[1]) ) )) + +#define _msgpack_load32(cast, from) ((cast)( \ + (((uint32_t)((uint8_t*)from)[0]) << 24) | \ + (((uint32_t)((uint8_t*)from)[1]) << 16) | \ + (((uint32_t)((uint8_t*)from)[2]) << 8) | \ + (((uint32_t)((uint8_t*)from)[3]) ) )) + +#define _msgpack_load64(cast, from) ((cast)( \ + (((uint64_t)((uint8_t*)from)[0]) << 56) | \ + (((uint64_t)((uint8_t*)from)[1]) << 48) | \ + (((uint64_t)((uint8_t*)from)[2]) << 40) | \ + (((uint64_t)((uint8_t*)from)[3]) << 32) | \ + (((uint64_t)((uint8_t*)from)[4]) << 24) | \ + (((uint64_t)((uint8_t*)from)[5]) << 16) | \ + (((uint64_t)((uint8_t*)from)[6]) << 8) | \ + (((uint64_t)((uint8_t*)from)[7]) ) )) +#endif + + +#define _msgpack_store16(to, num) \ + do { uint16_t val = _msgpack_be16(num); memcpy(to, &val, 2); } while(0) +#define _msgpack_store32(to, num) \ + do { uint32_t val = _msgpack_be32(num); memcpy(to, &val, 4); } while(0) +#define _msgpack_store64(to, num) \ + do { uint64_t val = _msgpack_be64(num); memcpy(to, &val, 8); } while(0) + +/* +#define _msgpack_load16(cast, from) \ + ({ cast val; memcpy(&val, (char*)from, 2); _msgpack_be16(val); }) +#define _msgpack_load32(cast, from) \ + ({ cast val; memcpy(&val, (char*)from, 4); _msgpack_be32(val); }) +#define _msgpack_load64(cast, from) \ + ({ cast val; memcpy(&val, (char*)from, 8); _msgpack_be64(val); }) +*/ + + +#endif /* msgpack/sysdep.h */ diff --git a/src/borg/cache_sync/unpack.h b/src/borg/cache_sync/unpack.h new file mode 100644 index 00000000..0d71a940 --- /dev/null +++ b/src/borg/cache_sync/unpack.h @@ -0,0 +1,374 @@ +/* + * Borg cache synchronizer, + * based on a MessagePack for Python unpacking routine + * + * Copyright (C) 2009 Naoki INADA + * Copyright (c) 2017 Marian Beermann + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. 
+ * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +/* + * This limits the depth of the structures we can unpack, i.e. how many containers + * are nestable. + */ +#define MSGPACK_EMBED_STACK_SIZE (16) +#include "unpack_define.h" + +// 2**32 - 1025 +#define _MAX_VALUE ( (uint32_t) 4294966271 ) + +#define MIN(x, y) ((x) < (y) ? (x): (y)) + +#ifdef DEBUG +#define set_last_error(msg) \ + fprintf(stderr, "cache_sync parse error: %s\n", (msg)); \ + u->last_error = (msg); +#else +#define set_last_error(msg) \ + u->last_error = (msg); +#endif + +typedef struct unpack_user { + /* Item.chunks is at the top level; we don't care about anything else, + * only need to track the current level to navigate arbitrary and unknown structure. + * To discern keys from everything else on the top level we use expect_map_item_end. + */ + int level; + + const char *last_error; + + HashIndex *chunks; + + /* + * We don't care about most stuff. This flag tells us whether we're at the chunks structure, + * meaning: + * {'foo': 'bar', 'chunks': [...], 'stuff': ... 
} + * ^-HERE-^ + */ + int inside_chunks; + enum { + /* the next thing is a map key at the Item root level, + * and it might be the "chunks" key we're looking for */ + expect_chunks_map_key, + + /* blocking state to expect_chunks_map_key + * { 'stuff': , 'chunks': [ + * ecmk -> emie -> -> -> -> ecmk ecb eeboce + * (nested containers are tracked via level) + * ecmk=expect_chunks_map_key, emie=expect_map_item_end, ecb=expect_chunks_begin, + * eeboce=expect_entry_begin_or_chunks_end + */ + expect_map_item_end, + + /* next thing must be the chunks array (array) */ + expect_chunks_begin, + + /* next thing must either be another CLE (array) or end of Item.chunks (array_end) */ + expect_entry_begin_or_chunks_end, + + /* + * processing ChunkListEntry tuple: + * expect_key, expect_size, expect_csize, expect_entry_end + */ + /* next thing must be the key (raw, l=32) */ + expect_key, + /* next thing must be the size (int) */ + expect_size, + /* next thing must be the csize (int) */ + expect_csize, + /* next thing must be the end of the CLE (array_end) */ + expect_entry_end, + } expect; + + struct { + char key[32]; + uint32_t csize; + uint32_t size; + } current; +} unpack_user; + +struct unpack_context; +typedef struct unpack_context unpack_context; +typedef int (*execute_fn)(unpack_context *ctx, const char* data, size_t len, size_t* off); + +#define unexpected(what) \ + if(u->inside_chunks || u->expect == expect_chunks_map_key) { \ + set_last_error("Unexpected object: " what); \ + return -1; \ + } + +static inline int unpack_callback_int64(unpack_user* u, int64_t d) +{ + switch(u->expect) { + case expect_size: + u->current.size = d; + u->expect = expect_csize; + break; + case expect_csize: + u->current.csize = d; + u->expect = expect_entry_end; + break; + default: + unexpected("integer"); + } + return 0; +} + +static inline int unpack_callback_uint16(unpack_user* u, uint16_t d) +{ + return unpack_callback_int64(u, d); +} + +static inline int unpack_callback_uint8(unpack_user* 
u, uint8_t d) +{ + return unpack_callback_int64(u, d); +} + + +static inline int unpack_callback_uint32(unpack_user* u, uint32_t d) +{ + return unpack_callback_int64(u, d); +} + +static inline int unpack_callback_uint64(unpack_user* u, uint64_t d) +{ + return unpack_callback_int64(u, d); +} + +static inline int unpack_callback_int32(unpack_user* u, int32_t d) +{ + return unpack_callback_int64(u, d); +} + +static inline int unpack_callback_int16(unpack_user* u, int16_t d) +{ + return unpack_callback_int64(u, d); +} + +static inline int unpack_callback_int8(unpack_user* u, int8_t d) +{ + return unpack_callback_int64(u, d); +} + +/* Ain't got anything to do with those floats */ +static inline int unpack_callback_double(unpack_user* u, double d) +{ + (void)d; + unexpected("double"); + return 0; +} + +static inline int unpack_callback_float(unpack_user* u, float d) +{ + (void)d; + unexpected("float"); + return 0; +} + +/* nil/true/false — I/don't/care */ +static inline int unpack_callback_nil(unpack_user* u) +{ + unexpected("nil"); + return 0; +} + +static inline int unpack_callback_true(unpack_user* u) +{ + unexpected("true"); + return 0; +} + +static inline int unpack_callback_false(unpack_user* u) +{ + unexpected("false"); + return 0; +} + +static inline int unpack_callback_array(unpack_user* u, unsigned int n) +{ + switch(u->expect) { + case expect_chunks_begin: + /* b'chunks': [ + * ^ */ + u->expect = expect_entry_begin_or_chunks_end; + break; + case expect_entry_begin_or_chunks_end: + /* b'chunks': [ ( + * ^ */ + if(n != 3) { + set_last_error("Invalid chunk list entry length"); + return -1; + } + u->expect = expect_key; + break; + default: + if(u->inside_chunks) { + set_last_error("Unexpected array start"); + return -1; + } else { + u->level++; + return 0; + } + } + return 0; +} + +static inline int unpack_callback_array_item(unpack_user* u, unsigned int current) +{ + (void)u; (void)current; + return 0; +} + +static inline int 
unpack_callback_array_end(unpack_user* u) +{ + uint32_t *cache_entry; + uint32_t cache_values[3]; + uint64_t refcount; + + switch(u->expect) { + case expect_entry_end: + /* b'chunks': [ ( b'1234...', 123, 345 ) + * ^ */ + cache_entry = (uint32_t*) hashindex_get(u->chunks, u->current.key); + if (cache_entry) { + refcount = _le32toh(cache_entry[0]); + refcount += 1; + cache_entry[0] = _htole32(MIN(refcount, _MAX_VALUE)); + } else { + /* refcount, size, csize */ + cache_values[0] = _htole32(1); + cache_values[1] = _htole32(u->current.size); + cache_values[2] = _htole32(u->current.csize); + if (!hashindex_set(u->chunks, u->current.key, cache_values)) { + set_last_error("hashindex_set failed"); + return -1; + } + } + + u->expect = expect_entry_begin_or_chunks_end; + break; + case expect_entry_begin_or_chunks_end: + /* b'chunks': [ ] + * ^ */ + /* end of Item.chunks */ + u->inside_chunks = 0; + u->expect = expect_map_item_end; + break; + default: + if(u->inside_chunks) { + set_last_error("Invalid state transition (unexpected array end)"); + return -1; + } else { + u->level--; + return 0; + } + } + + return 0; +} + +static inline int unpack_callback_map(unpack_user* u, unsigned int n) +{ + (void)n; + + + if(u->level == 0) { + /* This begins a new Item */ + u->expect = expect_chunks_map_key; + } + + if(u->inside_chunks) { + unexpected("map"); + } + + u->level++; + + return 0; +} + +static inline int unpack_callback_map_item(unpack_user* u, unsigned int current) +{ + (void)u; (void)current; + if(u->level == 1) { + switch(u->expect) { + case expect_map_item_end: + u->expect = expect_chunks_map_key; + break; + default: + set_last_error("Unexpected map item"); + return -1; + } + } + return 0; +} + +static inline int unpack_callback_map_end(unpack_user* u) +{ + u->level--; + if(u->inside_chunks) { + set_last_error("Unexpected map end"); + return -1; + } + return 0; +} + +static inline int unpack_callback_raw(unpack_user* u, const char* b, const char* p, unsigned int l) +{ + /* 
raw = what Borg uses for binary stuff and strings as well */ + /* Note: p points to an internal buffer which contains l bytes. */ + (void)b; + + switch(u->expect) { + case expect_key: + if(l != 32) { + set_last_error("Incorrect key length"); + return -1; + } + memcpy(u->current.key, p, 32); + u->expect = expect_size; + break; + case expect_chunks_map_key: + if(l == 6 && !memcmp("chunks", p, 6)) { + u->expect = expect_chunks_begin; + u->inside_chunks = 1; + } else { + u->expect = expect_map_item_end; + } + break; + default: + if(u->inside_chunks) { + set_last_error("Unexpected bytes in chunks structure"); + return -1; + } + } + + return 0; +} + +static inline int unpack_callback_bin(unpack_user* u, const char* b, const char* p, unsigned int l) +{ + (void)u; (void)b; (void)p; (void)l; + unexpected("bin"); + return 0; +} + +static inline int unpack_callback_ext(unpack_user* u, const char* base, const char* pos, + unsigned int length) +{ + (void)u; (void)base; (void)pos; (void)length; + unexpected("ext"); + return 0; +} + +#include "unpack_template.h" diff --git a/src/borg/cache_sync/unpack_define.h b/src/borg/cache_sync/unpack_define.h new file mode 100644 index 00000000..d681277b --- /dev/null +++ b/src/borg/cache_sync/unpack_define.h @@ -0,0 +1,95 @@ +/* + * MessagePack unpacking routine template + * + * Copyright (C) 2008-2010 FURUHASHI Sadayuki + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +#ifndef MSGPACK_UNPACK_DEFINE_H__ +#define MSGPACK_UNPACK_DEFINE_H__ + +#include "sysdep.h" +#include <stdlib.h> +#include <string.h> +#include <assert.h> +#include <stdio.h> + +#ifdef __cplusplus +extern "C" { +#endif + + +#ifndef MSGPACK_EMBED_STACK_SIZE +#define MSGPACK_EMBED_STACK_SIZE 32 +#endif + + +// CS is first byte & 0x1f +typedef enum { + CS_HEADER = 0x00, // nil + + //CS_ = 0x01, + //CS_ = 0x02, // false + //CS_ = 0x03, // true + + CS_BIN_8 = 0x04, + CS_BIN_16 = 0x05, + CS_BIN_32 = 0x06, + + CS_EXT_8 = 0x07, + CS_EXT_16 = 0x08, + CS_EXT_32 = 0x09, + + CS_FLOAT = 0x0a, + CS_DOUBLE = 0x0b, + CS_UINT_8 = 0x0c, + CS_UINT_16 = 0x0d, + CS_UINT_32 = 0x0e, + CS_UINT_64 = 0x0f, + CS_INT_8 = 0x10, + CS_INT_16 = 0x11, + CS_INT_32 = 0x12, + CS_INT_64 = 0x13, + + //CS_FIXEXT1 = 0x14, + //CS_FIXEXT2 = 0x15, + //CS_FIXEXT4 = 0x16, + //CS_FIXEXT8 = 0x17, + //CS_FIXEXT16 = 0x18, + + CS_RAW_8 = 0x19, + CS_RAW_16 = 0x1a, + CS_RAW_32 = 0x1b, + CS_ARRAY_16 = 0x1c, + CS_ARRAY_32 = 0x1d, + CS_MAP_16 = 0x1e, + CS_MAP_32 = 0x1f, + + ACS_RAW_VALUE, + ACS_BIN_VALUE, + ACS_EXT_VALUE, +} msgpack_unpack_state; + + +typedef enum { + CT_ARRAY_ITEM, + CT_MAP_KEY, + CT_MAP_VALUE, +} msgpack_container_type; + + +#ifdef __cplusplus +} +#endif + +#endif /* msgpack/unpack_define.h */ diff --git a/src/borg/cache_sync/unpack_template.h b/src/borg/cache_sync/unpack_template.h new file mode 100644 index 00000000..aa7a4c0b --- /dev/null +++ b/src/borg/cache_sync/unpack_template.h @@ -0,0 +1,359 @@ +/* + * MessagePack unpacking routine template + * + * Copyright (C) 2008-2010 FURUHASHI Sadayuki + * Copyright (c) 2017 Marian Beermann + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. 
+ * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#ifndef USE_CASE_RANGE +#if !defined(_MSC_VER) +#define USE_CASE_RANGE +#endif +#endif + +typedef struct unpack_stack { + size_t size; + size_t count; + unsigned int ct; +} unpack_stack; + +struct unpack_context { + unpack_user user; + unsigned int cs; + unsigned int trail; + unsigned int top; + unpack_stack stack[MSGPACK_EMBED_STACK_SIZE]; +}; + +static inline void unpack_init(unpack_context* ctx) +{ + ctx->cs = CS_HEADER; + ctx->trail = 0; + ctx->top = 0; +} + +#define construct 1 + +static inline int unpack_execute(unpack_context* ctx, const char* data, size_t len, size_t* off) +{ + assert(len >= *off); + + const unsigned char* p = (unsigned char*)data + *off; + const unsigned char* const pe = (unsigned char*)data + len; + const void* n = NULL; + + unsigned int trail = ctx->trail; + unsigned int cs = ctx->cs; + unsigned int top = ctx->top; + unpack_stack* stack = ctx->stack; + unpack_user* user = &ctx->user; + + unpack_stack* c = NULL; + + int ret; + +#define construct_cb(name) \ + construct && unpack_callback ## name + +#define push_simple_value(func) \ + if(construct_cb(func)(user) < 0) { goto _failed; } \ + goto _push +#define push_fixed_value(func, arg) \ + if(construct_cb(func)(user, arg) < 0) { goto _failed; } \ + goto _push +#define push_variable_value(func, base, pos, len) \ + if(construct_cb(func)(user, \ + (const char*)base, (const char*)pos, len) < 0) { goto _failed; } \ + goto _push + +#define again_fixed_trail(_cs, trail_len) \ + trail = trail_len; \ + cs = _cs; \ + goto _fixed_trail_again +#define 
again_fixed_trail_if_zero(_cs, trail_len, ifzero) \ + trail = trail_len; \ + if(trail == 0) { goto ifzero; } \ + cs = _cs; \ + goto _fixed_trail_again + +#define start_container(func, count_, ct_) \ + if(top >= MSGPACK_EMBED_STACK_SIZE) { goto _failed; } /* FIXME */ \ + if(construct_cb(func)(user, count_) < 0) { goto _failed; } \ + if((count_) == 0) { \ + if (construct_cb(func##_end)(user) < 0) { goto _failed; } \ + goto _push; } \ + stack[top].ct = ct_; \ + stack[top].size = count_; \ + stack[top].count = 0; \ + ++top; \ + goto _header_again + +#define NEXT_CS(p) ((unsigned int)*p & 0x1f) + +#ifdef USE_CASE_RANGE +#define SWITCH_RANGE_BEGIN switch(*p) { +#define SWITCH_RANGE(FROM, TO) case FROM ... TO: +#define SWITCH_RANGE_DEFAULT default: +#define SWITCH_RANGE_END } +#else +#define SWITCH_RANGE_BEGIN { if(0) { +#define SWITCH_RANGE(FROM, TO) } else if(FROM <= *p && *p <= TO) { +#define SWITCH_RANGE_DEFAULT } else { +#define SWITCH_RANGE_END } } +#endif + + if(p == pe) { goto _out; } + do { + switch(cs) { + case CS_HEADER: + SWITCH_RANGE_BEGIN + SWITCH_RANGE(0x00, 0x7f) // Positive Fixnum + push_fixed_value(_uint8, *(uint8_t*)p); + SWITCH_RANGE(0xe0, 0xff) // Negative Fixnum + push_fixed_value(_int8, *(int8_t*)p); + SWITCH_RANGE(0xc0, 0xdf) // Variable + switch(*p) { + case 0xc0: // nil + push_simple_value(_nil); + //case 0xc1: // never used + case 0xc2: // false + push_simple_value(_false); + case 0xc3: // true + push_simple_value(_true); + case 0xc4: // bin 8 + again_fixed_trail(NEXT_CS(p), 1); + case 0xc5: // bin 16 + again_fixed_trail(NEXT_CS(p), 2); + case 0xc6: // bin 32 + again_fixed_trail(NEXT_CS(p), 4); + case 0xc7: // ext 8 + again_fixed_trail(NEXT_CS(p), 1); + case 0xc8: // ext 16 + again_fixed_trail(NEXT_CS(p), 2); + case 0xc9: // ext 32 + again_fixed_trail(NEXT_CS(p), 4); + case 0xca: // float + case 0xcb: // double + case 0xcc: // unsigned int 8 + case 0xcd: // unsigned int 16 + case 0xce: // unsigned int 32 + case 0xcf: // unsigned int 64 + case 
0xd0: // signed int 8 + case 0xd1: // signed int 16 + case 0xd2: // signed int 32 + case 0xd3: // signed int 64 + again_fixed_trail(NEXT_CS(p), 1 << (((unsigned int)*p) & 0x03)); + case 0xd4: // fixext 1 + case 0xd5: // fixext 2 + case 0xd6: // fixext 4 + case 0xd7: // fixext 8 + again_fixed_trail_if_zero(ACS_EXT_VALUE, + (1 << (((unsigned int)*p) & 0x03))+1, + _ext_zero); + case 0xd8: // fixext 16 + again_fixed_trail_if_zero(ACS_EXT_VALUE, 16+1, _ext_zero); + case 0xd9: // str 8 + again_fixed_trail(NEXT_CS(p), 1); + case 0xda: // raw 16 + case 0xdb: // raw 32 + case 0xdc: // array 16 + case 0xdd: // array 32 + case 0xde: // map 16 + case 0xdf: // map 32 + again_fixed_trail(NEXT_CS(p), 2 << (((unsigned int)*p) & 0x01)); + default: + goto _failed; + } + SWITCH_RANGE(0xa0, 0xbf) // FixRaw + again_fixed_trail_if_zero(ACS_RAW_VALUE, ((unsigned int)*p & 0x1f), _raw_zero); + SWITCH_RANGE(0x90, 0x9f) // FixArray + start_container(_array, ((unsigned int)*p) & 0x0f, CT_ARRAY_ITEM); + SWITCH_RANGE(0x80, 0x8f) // FixMap + start_container(_map, ((unsigned int)*p) & 0x0f, CT_MAP_KEY); + + SWITCH_RANGE_DEFAULT + goto _failed; + SWITCH_RANGE_END + // end CS_HEADER + + + _fixed_trail_again: + ++p; + + default: + if((size_t)(pe - p) < trail) { goto _out; } + n = p; p += trail - 1; + switch(cs) { + case CS_EXT_8: + again_fixed_trail_if_zero(ACS_EXT_VALUE, *(uint8_t*)n+1, _ext_zero); + case CS_EXT_16: + again_fixed_trail_if_zero(ACS_EXT_VALUE, + _msgpack_load16(uint16_t,n)+1, + _ext_zero); + case CS_EXT_32: + again_fixed_trail_if_zero(ACS_EXT_VALUE, + _msgpack_load32(uint32_t,n)+1, + _ext_zero); + case CS_FLOAT: { + union { uint32_t i; float f; } mem; + mem.i = _msgpack_load32(uint32_t,n); + push_fixed_value(_float, mem.f); } + case CS_DOUBLE: { + union { uint64_t i; double f; } mem; + mem.i = _msgpack_load64(uint64_t,n); +#if defined(__arm__) && !(__ARM_EABI__) // arm-oabi + // https://github.com/msgpack/msgpack-perl/pull/1 + mem.i = (mem.i & 0xFFFFFFFFUL) << 32UL | (mem.i >> 32UL); 
+#endif + push_fixed_value(_double, mem.f); } + case CS_UINT_8: + push_fixed_value(_uint8, *(uint8_t*)n); + case CS_UINT_16: + push_fixed_value(_uint16, _msgpack_load16(uint16_t,n)); + case CS_UINT_32: + push_fixed_value(_uint32, _msgpack_load32(uint32_t,n)); + case CS_UINT_64: + push_fixed_value(_uint64, _msgpack_load64(uint64_t,n)); + + case CS_INT_8: + push_fixed_value(_int8, *(int8_t*)n); + case CS_INT_16: + push_fixed_value(_int16, _msgpack_load16(int16_t,n)); + case CS_INT_32: + push_fixed_value(_int32, _msgpack_load32(int32_t,n)); + case CS_INT_64: + push_fixed_value(_int64, _msgpack_load64(int64_t,n)); + + case CS_BIN_8: + again_fixed_trail_if_zero(ACS_BIN_VALUE, *(uint8_t*)n, _bin_zero); + case CS_BIN_16: + again_fixed_trail_if_zero(ACS_BIN_VALUE, _msgpack_load16(uint16_t,n), _bin_zero); + case CS_BIN_32: + again_fixed_trail_if_zero(ACS_BIN_VALUE, _msgpack_load32(uint32_t,n), _bin_zero); + case ACS_BIN_VALUE: + _bin_zero: + push_variable_value(_bin, data, n, trail); + + case CS_RAW_8: + again_fixed_trail_if_zero(ACS_RAW_VALUE, *(uint8_t*)n, _raw_zero); + case CS_RAW_16: + again_fixed_trail_if_zero(ACS_RAW_VALUE, _msgpack_load16(uint16_t,n), _raw_zero); + case CS_RAW_32: + again_fixed_trail_if_zero(ACS_RAW_VALUE, _msgpack_load32(uint32_t,n), _raw_zero); + case ACS_RAW_VALUE: + _raw_zero: + push_variable_value(_raw, data, n, trail); + + case ACS_EXT_VALUE: + _ext_zero: + push_variable_value(_ext, data, n, trail); + + case CS_ARRAY_16: + start_container(_array, _msgpack_load16(uint16_t,n), CT_ARRAY_ITEM); + case CS_ARRAY_32: + /* FIXME security guard */ + start_container(_array, _msgpack_load32(uint32_t,n), CT_ARRAY_ITEM); + + case CS_MAP_16: + start_container(_map, _msgpack_load16(uint16_t,n), CT_MAP_KEY); + case CS_MAP_32: + /* FIXME security guard */ + start_container(_map, _msgpack_load32(uint32_t,n), CT_MAP_KEY); + + default: + goto _failed; + } + } + +_push: + if(top == 0) { goto _finish; } + c = &stack[top-1]; + switch(c->ct) { + case CT_ARRAY_ITEM: + 
if(construct_cb(_array_item)(user, c->count) < 0) { goto _failed; } + if(++c->count == c->size) { + if (construct_cb(_array_end)(user) < 0) { goto _failed; } + --top; + /*printf("stack pop %d\n", top);*/ + goto _push; + } + goto _header_again; + case CT_MAP_KEY: + c->ct = CT_MAP_VALUE; + goto _header_again; + case CT_MAP_VALUE: + if(construct_cb(_map_item)(user, c->count) < 0) { goto _failed; } + if(++c->count == c->size) { + if (construct_cb(_map_end)(user) < 0) { goto _failed; } + --top; + /*printf("stack pop %d\n", top);*/ + goto _push; + } + c->ct = CT_MAP_KEY; + goto _header_again; + + default: + goto _failed; + } + +_header_again: + cs = CS_HEADER; + ++p; + } while(p != pe); + goto _out; + + +_finish: + if (!construct) + unpack_callback_nil(user); + ++p; + ret = 1; + /* printf("-- finish --\n"); */ + goto _end; + +_failed: + /* printf("** FAILED **\n"); */ + ret = -1; + goto _end; + +_out: + ret = 0; + goto _end; + +_end: + ctx->cs = cs; + ctx->trail = trail; + ctx->top = top; + *off = p - (const unsigned char*)data; + + return ret; +#undef construct_cb +} + +#undef SWITCH_RANGE_BEGIN +#undef SWITCH_RANGE +#undef SWITCH_RANGE_DEFAULT +#undef SWITCH_RANGE_END +#undef push_simple_value +#undef push_fixed_value +#undef push_variable_value +#undef again_fixed_trail +#undef again_fixed_trail_if_zero +#undef start_container +#undef construct + +#undef NEXT_CS + +/* vim: set ts=4 sw=4 sts=4 expandtab */ diff --git a/src/borg/hashindex.pyx b/src/borg/hashindex.pyx index 75ba1df3..33868344 100644 --- a/src/borg/hashindex.pyx +++ b/src/borg/hashindex.pyx @@ -31,7 +31,7 @@ cdef extern from "_hashindex.c": double HASH_MAX_LOAD -cdef extern from "_cache.c": +cdef extern from "cache_sync/cache_sync.c": ctypedef struct CacheSyncCtx: pass @@ -403,5 +403,5 @@ cdef class CacheSynchronizer: def feed(self, chunk): if not cache_sync_feed(self.sync, chunk, len(chunk)): error = cache_sync_error(self.sync) - if error is not None: - raise Exception('cache_sync_feed failed: ' + 
error.decode('ascii')) + if error != NULL: + raise ValueError('cache_sync_feed failed: ' + error.decode('ascii')) diff --git a/src/borg/testsuite/cache.py b/src/borg/testsuite/cache.py new file mode 100644 index 00000000..690e50e3 --- /dev/null +++ b/src/borg/testsuite/cache.py @@ -0,0 +1,139 @@ + +from msgpack import packb + +import pytest + +from ..hashindex import ChunkIndex, CacheSynchronizer +from .hashindex import H + + +class TestCacheSynchronizer: + @pytest.fixture + def index(self): + return ChunkIndex() + + @pytest.fixture + def sync(self, index): + return CacheSynchronizer(index) + + def test_no_chunks(self, index, sync): + data = packb({ + 'foo': 'bar', + 'baz': 1234, + 'bar': 5678, + 'user': 'chunks', + 'chunks': [] + }) + sync.feed(data) + assert not len(index) + + def test_simple(self, index, sync): + data = packb({ + 'foo': 'bar', + 'baz': 1234, + 'bar': 5678, + 'user': 'chunks', + 'chunks': [ + (H(1), 1, 2), + (H(2), 2, 3), + ] + }) + sync.feed(data) + assert len(index) == 2 + assert index[H(1)] == (1, 1, 2) + assert index[H(2)] == (1, 2, 3) + + def test_multiple(self, index, sync): + data = packb({ + 'foo': 'bar', + 'baz': 1234, + 'bar': 5678, + 'user': 'chunks', + 'chunks': [ + (H(1), 1, 2), + (H(2), 2, 3), + ] + }) + data += packb({ + 'xattrs': { + 'security.foo': 'bar', + 'chunks': '123456', + }, + 'stuff': [ + (1, 2, 3), + ] + }) + data += packb({ + 'xattrs': { + 'security.foo': 'bar', + 'chunks': '123456', + }, + 'chunks': [ + (H(1), 1, 2), + (H(2), 2, 3), + ], + 'stuff': [ + (1, 2, 3), + ] + }) + data += packb({ + 'chunks': [ + (H(3), 1, 2), + ], + }) + data += packb({ + 'chunks': [ + (H(1), 1, 2), + ], + }) + + part1 = data[:70] + part2 = data[70:120] + part3 = data[120:] + sync.feed(part1) + sync.feed(part2) + sync.feed(part3) + assert len(index) == 3 + assert index[H(1)] == (3, 1, 2) + assert index[H(2)] == (2, 2, 3) + assert index[H(3)] == (1, 1, 2) + + @pytest.mark.parametrize('elem,error', ( + ({1: 2}, 'Unexpected object: map'), + 
(bytes(213), [ + 'Unexpected bytes in chunks structure', # structure 2/3 + 'Incorrect key length']), # structure 3/3 + (1, 'Unexpected object: integer'), + (1.0, 'Unexpected object: double'), + (True, 'Unexpected object: true'), + (False, 'Unexpected object: false'), + (None, 'Unexpected object: nil'), + )) + @pytest.mark.parametrize('structure', ( + lambda elem: {'chunks': elem}, + lambda elem: {'chunks': [elem]}, + lambda elem: {'chunks': [(elem, 1, 2)]}, + )) + def test_corrupted(self, sync, structure, elem, error): + packed = packb(structure(elem)) + with pytest.raises(ValueError) as excinfo: + sync.feed(packed) + if isinstance(error, str): + error = [error] + possible_errors = ['cache_sync_feed failed: ' + error for error in error] + assert str(excinfo.value) in possible_errors + + @pytest.mark.parametrize('data,error', ( + # Incorrect tuple length + ({'chunks': [(bytes(32), 2, 3, 4)]}, 'Invalid chunk list entry length'), + ({'chunks': [(bytes(32), 2)]}, 'Invalid chunk list entry length'), + # Incorrect types + ({'chunks': [(1, 2, 3)]}, 'Unexpected object: integer'), + ({'chunks': [(1, bytes(32), 2)]}, 'Unexpected object: integer'), + ({'chunks': [(bytes(32), 1.0, 2)]}, 'Unexpected object: double'), + )) + def test_corrupted_ancillary(self, index, sync, data, error): + packed = packb(data) + with pytest.raises(ValueError) as excinfo: + sync.feed(packed) + assert str(excinfo.value) == 'cache_sync_feed failed: ' + error