From 6e011b935430ee2d4cf8953314692bbd8b057eee Mon Sep 17 00:00:00 2001
From: Marian Beermann
Date: Sat, 27 May 2017 21:50:28 +0200
Subject: [PATCH] cache: compact hashindex before writing to chunks.archive.d

---
 src/borg/_hashindex.c           | 76 +++++++++++++++++++++++++++++----
 src/borg/cache.py               | 47 ++++++++++++++------
 src/borg/hashindex.pyx          | 18 +++++---
 src/borg/helpers.py             |  2 +-
 src/borg/remote.py              |  4 +-
 src/borg/testsuite/hashindex.py | 15 ++++++-
 6 files changed, 128 insertions(+), 34 deletions(-)

diff --git a/src/borg/_hashindex.c b/src/borg/_hashindex.c
index 824a6eec3..1fb30c0fa 100644
--- a/src/borg/_hashindex.c
+++ b/src/borg/_hashindex.c
@@ -109,10 +109,11 @@ static int hash_sizes[] = {
 #define EPRINTF_PATH(path, msg, ...) fprintf(stderr, "hashindex: %s: " msg " (%s)\n", path, ##__VA_ARGS__, strerror(errno))
 
 #ifdef Py_PYTHON_H
-static HashIndex *hashindex_read(PyObject *file_py);
+static HashIndex *hashindex_read(PyObject *file_py, int permit_compact);
 static void hashindex_write(HashIndex *index, PyObject *file_py);
 #endif
 
+static uint64_t hashindex_compact(HashIndex *index);
 static HashIndex *hashindex_init(int capacity, int key_size, int value_size);
 static const void *hashindex_get(HashIndex *index, const void *key);
 static int hashindex_set(HashIndex *index, const void *key, const void *value);
@@ -273,7 +274,7 @@ count_empty(HashIndex *index)
 #ifdef Py_PYTHON_H
 
 static HashIndex *
-hashindex_read(PyObject *file_py)
+hashindex_read(PyObject *file_py, int permit_compact)
 {
     Py_ssize_t length, buckets_length, bytes_read;
     Py_buffer header_buffer;
@@ -393,14 +394,16 @@ hashindex_read(PyObject *file_py)
     }
     index->buckets = index->buckets_buffer.buf;
 
-    index->min_empty = get_min_empty(index->num_buckets);
-    index->num_empty = count_empty(index);
+    if(!permit_compact) {
+        index->min_empty = get_min_empty(index->num_buckets);
+        index->num_empty = count_empty(index);
 
-    if(index->num_empty < index->min_empty) {
-        /* too many tombstones here / not enough empty buckets, do a same-size rebuild */
-        if(!hashindex_resize(index, index->num_buckets)) {
-            PyErr_Format(PyExc_ValueError, "Failed to rebuild table");
-            goto fail_free_buckets;
+        if(index->num_empty < index->min_empty) {
+            /* too many tombstones here / not enough empty buckets, do a same-size rebuild */
+            if(!hashindex_resize(index, index->num_buckets)) {
+                PyErr_Format(PyExc_ValueError, "Failed to rebuild table");
+                goto fail_free_buckets;
+            }
         }
     }
 
@@ -620,6 +623,61 @@ hashindex_next_key(HashIndex *index, const void *key)
     return BUCKET_ADDR(index, idx);
 }
 
+static uint64_t
+hashindex_compact(HashIndex *index)
+{
+    int idx = 0;
+    int start_idx;
+    int begin_used_idx;
+    int empty_slot_count, count, buckets_to_copy;
+    int compact_tail_idx = 0;
+    uint64_t saved_size = (index->num_buckets - index->num_entries) * (uint64_t)index->bucket_size;
+
+    if(index->num_buckets - index->num_entries == 0) {
+        /* already compact */
+        return 0;
+    }
+
+    while(idx < index->num_buckets) {
+        /* Phase 1: Find some empty slots */
+        start_idx = idx;
+        while(idx < index->num_buckets && (BUCKET_IS_EMPTY(index, idx) || BUCKET_IS_DELETED(index, idx))) {
+            idx++;
+        }
+
+        /* everything from start_idx to idx is empty or deleted */
+        count = empty_slot_count = idx - start_idx;
+        begin_used_idx = idx;
+
+        if(!empty_slot_count) {
+            memmove(BUCKET_ADDR(index, compact_tail_idx), BUCKET_ADDR(index, idx), index->bucket_size); /* src may equal dst here */
+            idx++;
+            compact_tail_idx++;
+            continue;
+        }
+
+        /* Phase 2: Find some non-empty/non-deleted slots we can move to the compact tail */
+
+        while(idx < index->num_buckets && empty_slot_count && !(BUCKET_IS_EMPTY(index, idx) || BUCKET_IS_DELETED(index, idx))) {
+            idx++;
+            empty_slot_count--;
+        }
+
+        buckets_to_copy = count - empty_slot_count;
+
+        if(!buckets_to_copy) {
+            /* Nothing to move, reached end of the buckets array with no used buckets. */
+            break;
+        }
+
+        memcpy(BUCKET_ADDR(index, compact_tail_idx), BUCKET_ADDR(index, begin_used_idx), buckets_to_copy * index->bucket_size);
+        compact_tail_idx += buckets_to_copy;
+    }
+
+    index->num_buckets = index->num_entries;
+    return saved_size;
+}
+
 static int
 hashindex_len(HashIndex *index)
 {
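For reviewers: the two-phase loop above amounts to a stable partition of the bucket array. Runs of used buckets slide down over runs of empty/deleted buckets until all entries sit contiguously at the front, then num_buckets is clamped to num_entries. A minimal Python model of that behavior (illustrative only, not part of the patch; it moves buckets one at a time where the C code copies whole runs):

    def compact(buckets):
        """Model of hashindex_compact(): None = empty or deleted slot."""
        tail = 0                            # compact_tail_idx in the C code
        for bucket in buckets:
            if bucket is not None:          # used bucket: slide it down
                buckets[tail] = bucket
                tail += 1
        saved = len(buckets) - tail         # dropped buckets; C returns this times bucket_size
        del buckets[tail:]                  # num_buckets = num_entries
        return saved

    buckets = [None, ('key1', 1), None, None, ('key2', 2), ('key3', 3)]
    assert compact(buckets) == 3
    assert buckets == [('key1', 1), ('key2', 2), ('key3', 3)]

Entry order is preserved but hash placement is destroyed, which is why a compacted index supports only sequential iteration and merging, not lookups.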
diff --git a/src/borg/cache.py b/src/borg/cache.py
index 7a9fce029..3f89d67a4 100644
--- a/src/borg/cache.py
+++ b/src/borg/cache.py
@@ -536,6 +536,10 @@ def sync(self):
         archive indexes.
         """
         archive_path = os.path.join(self.path, 'chunks.archive.d')
+        # Instrumentation
+        processed_item_metadata_bytes = 0
+        processed_item_metadata_chunks = 0
+        compact_chunks_archive_saved_space = 0
 
         def mkpath(id, suffix=''):
             id_hex = bin_to_hex(id)
@@ -545,8 +549,10 @@ def mkpath(id, suffix=''):
         def cached_archives():
             if self.do_cache:
                 fns = os.listdir(archive_path)
-                # filenames with 64 hex digits == 256bit
-                return set(unhexlify(fn) for fn in fns if len(fn) == 64)
+                # filenames with 64 hex digits == 256bit,
+                # or compact indices which are 64 hex digits + ".compact"
+                return set(unhexlify(fn) for fn in fns if len(fn) == 64) | \
+                       set(unhexlify(fn[:64]) for fn in fns if len(fn) == 72 and fn.endswith('.compact'))
             else:
                 return set()
 
@@ -558,13 +564,21 @@ def cleanup_outdated(ids):
             cleanup_cached_archive(id)
 
         def cleanup_cached_archive(id):
-            os.unlink(mkpath(id))
             try:
+                os.unlink(mkpath(id))
                 os.unlink(mkpath(id) + '.integrity')
             except FileNotFoundError:
                 pass
+            try:
+                os.unlink(mkpath(id, suffix='.compact'))
+                os.unlink(mkpath(id, suffix='.compact') + '.integrity')
+            except FileNotFoundError:
+                pass
 
         def fetch_and_build_idx(archive_id, decrypted_repository, chunk_idx):
+            nonlocal processed_item_metadata_bytes
+            nonlocal processed_item_metadata_chunks
+            nonlocal compact_chunks_archive_saved_space
             csize, data = decrypted_repository.get(archive_id)
             chunk_idx.add(archive_id, 1, len(data), csize)
             archive = ArchiveItem(internal_dict=msgpack.unpackb(data))
@@ -573,9 +587,12 @@ def fetch_and_build_idx(archive_id, decrypted_repository, chunk_idx):
             sync = CacheSynchronizer(chunk_idx)
             for item_id, (csize, data) in zip(archive.items, decrypted_repository.get_many(archive.items)):
                 chunk_idx.add(item_id, 1, len(data), csize)
+                processed_item_metadata_bytes += len(data)
+                processed_item_metadata_chunks += 1
                 sync.feed(data)
             if self.do_cache:
-                fn = mkpath(archive_id)
+                compact_chunks_archive_saved_space += chunk_idx.compact()
+                fn = mkpath(archive_id, suffix='.compact')
                 fn_tmp = mkpath(archive_id, suffix='.tmp')
                 try:
                     with DetachedIntegrityCheckedFile(path=fn_tmp, write=True,
@@ -612,7 +629,7 @@ def create_master_idx(chunk_idx):
             # due to hash table "resonance".
             master_index_capacity = int(len(self.repository) / ChunkIndex.MAX_LOAD_FACTOR)
             if archive_ids:
-                chunk_idx = None
+                chunk_idx = None if not self.do_cache else ChunkIndex(master_index_capacity)
                 pi = ProgressIndicatorPercent(total=len(archive_ids), step=0.1,
                                               msg='%3.0f%% Syncing chunks cache. Processing archive %s',
                                               msgid='cache.sync')
@@ -624,8 +641,12 @@ def create_master_idx(chunk_idx):
                                 archive_chunk_idx_path = mkpath(archive_id)
                                 logger.info("Reading cached archive chunk index for %s ...", archive_name)
                                 try:
-                                    with DetachedIntegrityCheckedFile(path=archive_chunk_idx_path, write=False) as fd:
-                                        archive_chunk_idx = ChunkIndex.read(fd)
+                                    try:
+                                        with DetachedIntegrityCheckedFile(path=archive_chunk_idx_path + '.compact', write=False) as fd:
+                                            archive_chunk_idx = ChunkIndex.read(fd, permit_compact=True)
+                                    except FileNotFoundError:
+                                        with DetachedIntegrityCheckedFile(path=archive_chunk_idx_path, write=False) as fd:
+                                            archive_chunk_idx = ChunkIndex.read(fd)
                                 except FileIntegrityError as fie:
                                     logger.error('Cached archive chunk index of %s is corrupted: %s', archive_name, fie)
                                     # Delete it and fetch a new index
@@ -639,18 +660,16 @@ def create_master_idx(chunk_idx):
                                 archive_chunk_idx = ChunkIndex()
                                 fetch_and_build_idx(archive_id, decrypted_repository, archive_chunk_idx)
                             logger.info("Merging into master chunks index ...")
-                            if chunk_idx is None:
-                                # we just use the first archive's idx as starting point,
-                                # to avoid growing the hash table from 0 size and also
-                                # to save 1 merge call.
-                                chunk_idx = archive_chunk_idx
-                            else:
-                                chunk_idx.merge(archive_chunk_idx)
+                            chunk_idx.merge(archive_chunk_idx)
                         else:
                             chunk_idx = chunk_idx or ChunkIndex(master_index_capacity)
                             logger.info('Fetching archive index for %s ...', archive_name)
                             fetch_and_build_idx(archive_id, decrypted_repository, chunk_idx)
                 pi.finish()
+                logger.debug('Cache sync: processed %s (%d chunks) of metadata',
+                             format_file_size(processed_item_metadata_bytes), processed_item_metadata_chunks)
+                logger.debug('Cache sync: compact chunks.archive.d storage saved %s',
+                             format_file_size(compact_chunks_archive_saved_space))
             logger.info('Done.')
             return chunk_idx
 
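The resulting on-disk behavior: chunks.archive.d now gains one "<64-hex-id>.compact" file per archive, while plain "<64-hex-id>" files written by older versions are still recognized, read, and cleaned up. Reduced to its essentials, the read side above behaves like this sketch (illustrative only; the real code wraps both reads in DetachedIntegrityCheckedFile and the surrounding FileIntegrityError handling shown in the hunk):

    from borg.hashindex import ChunkIndex

    def read_archive_chunk_index(path):
        # path: "<cache_dir>/chunks.archive.d/<64-hex-id>", without suffix
        try:
            with open(path + '.compact', 'rb') as fd:
                # compact index: fine for iteration/merge, not for lookups
                return ChunkIndex.read(fd, permit_compact=True)
        except FileNotFoundError:
            with open(path, 'rb') as fd:    # index written by an older borg
                return ChunkIndex.read(fd)

Writes still go to the '.tmp' path first (see fn_tmp above), so an interrupted sync does not leave a half-written '.compact' file under its final name.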
diff --git a/src/borg/hashindex.pyx b/src/borg/hashindex.pyx
index 338683440..f8c3f84bb 100644
--- a/src/borg/hashindex.pyx
+++ b/src/borg/hashindex.pyx
@@ -8,14 +8,14 @@ from libc.stdint cimport uint32_t, UINT32_MAX, uint64_t
 from libc.errno cimport errno
 from cpython.exc cimport PyErr_SetFromErrnoWithFilename
 
-API_VERSION = '1.1_02'
+API_VERSION = '1.1_03'
 
 
 cdef extern from "_hashindex.c":
     ctypedef struct HashIndex:
         pass
 
-    HashIndex *hashindex_read(object file_py) except *
+    HashIndex *hashindex_read(object file_py, int permit_compact) except *
     HashIndex *hashindex_init(int capacity, int key_size, int value_size)
     void hashindex_free(HashIndex *index)
     int hashindex_len(HashIndex *index)
@@ -25,6 +25,7 @@ cdef extern from "_hashindex.c":
     void *hashindex_next_key(HashIndex *index, void *key)
     int hashindex_delete(HashIndex *index, void *key)
    int hashindex_set(HashIndex *index, void *key, void *value)
+    uint64_t hashindex_compact(HashIndex *index)
     uint32_t _htole32(uint32_t v)
     uint32_t _le32toh(uint32_t v)
 
@@ -73,14 +74,14 @@ cdef class IndexBase:
     MAX_LOAD_FACTOR = HASH_MAX_LOAD
     MAX_VALUE = _MAX_VALUE
 
-    def __cinit__(self, capacity=0, path=None, key_size=32):
+    def __cinit__(self, capacity=0, path=None, key_size=32, permit_compact=False):
         self.key_size = key_size
         if path:
             if isinstance(path, (str, bytes)):
                 with open(path, 'rb') as fd:
-                    self.index = hashindex_read(fd)
+                    self.index = hashindex_read(fd, permit_compact)
             else:
-                self.index = hashindex_read(path)
+                self.index = hashindex_read(path, permit_compact)
             assert self.index, 'hashindex_read() returned NULL with no exception set'
         else:
             self.index = hashindex_init(capacity, self.key_size, self.value_size)
@@ -92,8 +93,8 @@ cdef class IndexBase:
             hashindex_free(self.index)
 
     @classmethod
-    def read(cls, path):
-        return cls(path=path)
+    def read(cls, path, permit_compact=False):
+        return cls(path=path, permit_compact=permit_compact)
 
     def write(self, path):
         if isinstance(path, (str, bytes)):
@@ -140,6 +141,9 @@ cdef class IndexBase:
         """Return size (bytes) of hash table."""
        return hashindex_size(self.index)
 
+    def compact(self):
+        return hashindex_compact(self.index)
+
 
 cdef class NSIndex(IndexBase):
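The new Cython surface, exercised end to end (an illustrative sketch; file names are examples): compact() drops all empty and deleted buckets and returns the number of bytes a subsequent write() will save, and read(..., permit_compact=True) skips the empty-bucket accounting and same-size rebuild that a lookup-capable index would need:

    from borg.hashindex import ChunkIndex

    idx = ChunkIndex()
    idx[b'\x00' * 32] = (1, 100, 100)       # (refcount, size, csize)
    saved_bytes = idx.compact()             # empty buckets dropped
    idx.write('example.compact')            # example file name

    # A compacted index has no empty buckets left, so hash probing could
    # not terminate; it must be re-read with the explicit opt-in and then
    # only iterated or merged, never used for lookups:
    idx2 = ChunkIndex.read('example.compact', permit_compact=True)
    assert len(idx2) == 1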
diff --git a/src/borg/helpers.py b/src/borg/helpers.py
index 7e4d4baf9..e47277e89 100644
--- a/src/borg/helpers.py
+++ b/src/borg/helpers.py
@@ -126,7 +126,7 @@ def check_python():
 
 def check_extension_modules():
     from . import platform, compress, item
-    if hashindex.API_VERSION != '1.1_02':
+    if hashindex.API_VERSION != '1.1_03':
         raise ExtensionModuleError
     if chunker.API_VERSION != '1.1_01':
         raise ExtensionModuleError
diff --git a/src/borg/remote.py b/src/borg/remote.py
index 63b5e817a..c316404db 100644
--- a/src/borg/remote.py
+++ b/src/borg/remote.py
@@ -642,8 +642,8 @@ def __exit__(self, exc_type, exc_val, exc_tb):
         # in any case, we want to cleanly close the repo, even if the
         # rollback can not succeed (e.g. because the connection was
         # already closed) and raised another exception:
-        logger.debug('RemoteRepository: %d bytes sent, %d bytes received, %d messages sent',
-                     self.tx_bytes, self.rx_bytes, self.msgid)
+        logger.debug('RemoteRepository: %s sent, %s received, %d messages sent',
+                     format_file_size(self.tx_bytes), format_file_size(self.rx_bytes), self.msgid)
         self.close()
 
     @property
diff --git a/src/borg/testsuite/hashindex.py b/src/borg/testsuite/hashindex.py
index 120c01b44..5550e1adc 100644
--- a/src/borg/testsuite/hashindex.py
+++ b/src/borg/testsuite/hashindex.py
@@ -4,7 +4,7 @@
 import tempfile
 import zlib
 
-from ..hashindex import NSIndex, ChunkIndex
+from ..hashindex import NSIndex, ChunkIndex, ChunkIndexEntry
 from .. import hashindex
 from ..crypto.file_integrity import IntegrityCheckedFile, FileIntegrityError
 from . import BaseTestCase
@@ -156,6 +156,19 @@ def test_chunk_indexer(self):
         # the index should now be empty
         assert list(index.iteritems()) == []
 
+    def test_vacuum(self):
+        idx1 = ChunkIndex()
+        idx1[H(1)] = 1, 100, 100
+        idx1[H(2)] = 2, 200, 200
+        idx1[H(3)] = 3, 300, 300
+        idx1.compact()
+        assert idx1.size() == 18 + 3 * (32 + 3 * 4)
+        data = list(idx1.iteritems())
+        assert (H(1), ChunkIndexEntry(1, 100, 100)) in data
+        assert (H(2), ChunkIndexEntry(2, 200, 200)) in data
+        assert (H(3), ChunkIndexEntry(3, 300, 300)) in data
+
+
 class HashIndexSizeTestCase(BaseTestCase):
 
     def test_size_on_disk(self):
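The size assertion in test_vacuum pins down exactly what compaction saves: a serialized hashindex is an 18-byte header (8-byte magic, two 32-bit counts, key size and value size as one byte each) followed by one bucket per slot, and a ChunkIndex bucket is a 32-byte key plus three 32-bit values. Worked out (illustrative):

    HEADER_SIZE = 18        # magic(8) + num_entries(4) + num_buckets(4) + key_size(1) + value_size(1)
    KEY_SIZE = 32           # 256-bit chunk id
    VALUE_SIZE = 3 * 4      # refcount, size, csize: one uint32 each

    bucket_size = KEY_SIZE + VALUE_SIZE            # 44 bytes
    assert HEADER_SIZE + 3 * bucket_size == 150    # == 18 + 3 * (32 + 3 * 4)

Before compact() the same three entries would be written with every empty bucket of the freshly allocated table (on the order of a thousand slots); afterwards only the three used buckets reach the disk.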