borg/src/borg/cache_sync/cache_sync.c

139 lines
3.4 KiB
C

/*
* Borg cache synchronizer,
* high level interface.
*
* These routines parse msgpacked item metadata and update a HashIndex
* with all chunks that are referenced from the items.
*
* This file only contains some initialization and buffer management.
*
* The parser is split in two parts, somewhat similar to lexer/parser combinations:
*
* unpack_template.h munches msgpack and calls a specific callback for each object
* encountered (e.g. beginning of a map, an integer, a string, a map item etc.).
*
* unpack.h implements these callbacks and uses another state machine to
* extract chunk references from it.
*/
#include "unpack.h"
typedef struct {
unpack_context ctx;
char *buf;
size_t head;
size_t tail;
size_t size;
} CacheSyncCtx;
static CacheSyncCtx *
cache_sync_init(HashIndex *chunks)
{
CacheSyncCtx *ctx;
if (!(ctx = (CacheSyncCtx*)malloc(sizeof(CacheSyncCtx)))) {
return NULL;
}
unpack_init(&ctx->ctx);
/* needs to be set only once */
ctx->ctx.user.chunks = chunks;
ctx->ctx.user.totals.size = 0;
ctx->ctx.user.totals.num_files = 0;
ctx->buf = NULL;
ctx->head = 0;
ctx->tail = 0;
ctx->size = 0;
return ctx;
}
static void
cache_sync_free(CacheSyncCtx *ctx)
{
if(ctx->buf) {
free(ctx->buf);
}
free(ctx);
}
static const char *
cache_sync_error(const CacheSyncCtx *ctx)
{
return ctx->ctx.user.last_error;
}
static uint64_t
cache_sync_num_files_totals(const CacheSyncCtx *ctx)
{
return ctx->ctx.user.totals.num_files;
}
static uint64_t
cache_sync_size_totals(const CacheSyncCtx *ctx)
{
return ctx->ctx.user.totals.size;
}
/**
* feed data to the cache synchronizer
* 0 = abort, 1 = continue
* abort is a regular condition, check cache_sync_error
*/
static int
cache_sync_feed(CacheSyncCtx *ctx, void *data, uint32_t length)
{
size_t new_size;
int ret;
char *new_buf;
if(ctx->tail + length > ctx->size) {
if((ctx->tail - ctx->head) + length <= ctx->size) {
/* | XXXXX| -> move data in buffer backwards -> |XXXXX | */
memmove(ctx->buf, ctx->buf + ctx->head, ctx->tail - ctx->head);
ctx->tail -= ctx->head;
ctx->head = 0;
} else {
/* must expand buffer to fit all data */
new_size = (ctx->tail - ctx->head) + length;
new_buf = (char*) malloc(new_size);
if(!new_buf) {
ctx->ctx.user.last_error = "cache_sync_feed: unable to allocate buffer";
return 0;
}
if(ctx->buf) {
memcpy(new_buf, ctx->buf + ctx->head, ctx->tail - ctx->head);
free(ctx->buf);
}
ctx->buf = new_buf;
ctx->tail -= ctx->head;
ctx->head = 0;
ctx->size = new_size;
}
}
memcpy(ctx->buf + ctx->tail, data, length);
ctx->tail += length;
while(1) {
if(ctx->head >= ctx->tail) {
return 1; /* request more bytes */
}
ret = unpack_execute(&ctx->ctx, ctx->buf, ctx->tail, &ctx->head);
if(ret == 1) {
unpack_init(&ctx->ctx);
continue;
} else if(ret == 0) {
return 1;
} else {
if(!ctx->ctx.user.last_error) {
ctx->ctx.user.last_error = "Unknown error";
}
return 0;
}
}
/* unreachable */
return 1;
}