From 3b928a455828ec15bf492fe599eb70c1ea326247 Mon Sep 17 00:00:00 2001
From: Marian Beermann
Date: Wed, 14 Jun 2017 12:15:46 +0200
Subject: [PATCH] fuse: refactor ItemCache

---
 src/borg/fuse.py               | 258 ++++++++++++++++++++++-----------
 src/borg/lrucache.py           |   8 +
 src/borg/testsuite/lrucache.py |   3 +
 3 files changed, 182 insertions(+), 87 deletions(-)

diff --git a/src/borg/fuse.py b/src/borg/fuse.py
index 00cdd7306..b2db06813 100644
--- a/src/borg/fuse.py
+++ b/src/borg/fuse.py
@@ -2,6 +2,7 @@ import errno
 import io
 import os
 import stat
+import struct
 import sys
 import tempfile
 import time
@@ -35,76 +36,162 @@ else:
 
 
 class ItemCache:
-    GROW_BY = 2 * 1024 * 1024
+    """
+    This is the "meat" of the file system's metadata storage.
+
+    This class generates inode numbers that efficiently index items in archives,
+    and retrieves items from these inode numbers.
+    """
+
+    # Approximately 230000 items (depends on the average number of items per metadata chunk).
+    # Since growing a bytearray has to copy it, growing it repeatedly converges to O(n^2) total cost;
+    # however, this is not yet relevant due to the swiftness of copying memory. If it becomes an issue,
+    # use an anonymous mmap and just resize that (or, if on 64 bit, make it so big you never need
+    # to resize it in the first place; that's free).
+    GROW_META_BY = 2 * 1024 * 1024
+
+    indirect_entry_struct = struct.Struct('=cII')
+    assert indirect_entry_struct.size == 9
 
     def __init__(self, decrypted_repository):
         self.decrypted_repository = decrypted_repository
-        self.data = bytearray()
-        self.writeptr = 0
-        self.fd = tempfile.TemporaryFile(prefix='borg-tmp')
+        # self.meta, the "meta-array", is a densely packed array of metadata about where items can be found.
+        # It is indexed by the inode number minus self.offset. (This is in a way eerily similar to how the first
+        # unices did this).
+        # The meta-array contains chunk IDs and item entries (described in iter_archive_items).
+        # The chunk IDs are referenced by item entries through relative offsets,
+        # which are bounded by the metadata chunk size.
+        self.meta = bytearray()
+        # The current write offset in self.meta
+        self.write_offset = 0
+
+        # Offset added to meta-indices, resulting in an inode,
+        # or subtracted from an inode, resulting in a meta-index.
         self.offset = 1000000
 
-    def new_stream(self):
-        self.stream_offset = 0
-        self.chunk_begin = 0
-        self.chunk_length = 0
-        self.current_item = b''
+        # A temporary file that contains direct items, i.e. items directly cached in this layer.
+        # These are items that span more than one chunk and thus cannot be efficiently cached
+        # by the object cache (self.decrypted_repository), which would require variable-length structures;
+        # possible, but not worth the effort, see iter_archive_items.
+        self.fd = tempfile.TemporaryFile(prefix='borg-tmp')
 
-    def set_current_id(self, chunk_id, chunk_length):
-        self.chunk_id = chunk_id
-        self.chunk_begin += self.chunk_length
-        self.chunk_length = chunk_length
+        # A small LRU cache for chunks requested by ItemCache.get() from the object cache;
+        # this significantly speeds up directory traversal and similar operations, which
+        # tend to re-read the same chunks over and over.
+        # The capacity is kept low because increasing it does not provide any significant advantage,
+        # but makes LRUCache's quadratic behaviour noticeable and consumes some memory.
+        self.chunks = LRUCache(capacity=10, dispose=lambda _: None)
 
-    def write_bytes(self, msgpacked_bytes):
-        self.current_item += msgpacked_bytes
-        self.stream_offset += len(msgpacked_bytes)
-
-    def unpacked(self):
-        msgpacked_bytes = self.current_item
-        self.current_item = b''
-        self.last_context_sensitive = self.stream_offset - len(msgpacked_bytes) <= self.chunk_begin
-        self.last_length = len(msgpacked_bytes)
-        self.last_item = msgpacked_bytes
-
-    def inode_for_current_item(self):
-        if self.writeptr + 37 >= len(self.data):
-            self.data = self.data + bytes(self.GROW_BY)
-
-        if self.last_context_sensitive:
-            pos = self.fd.seek(0, io.SEEK_END)
-            self.fd.write(self.last_item)
-            self.data[self.writeptr:self.writeptr+9] = b'S' + pos.to_bytes(8, 'little')
-            self.writeptr += 9
-            return self.writeptr - 9 + self.offset
-        else:
-            self.data[self.writeptr:self.writeptr+1] = b'I'
-            self.data[self.writeptr+1:self.writeptr+33] = self.chunk_id
-            last_item_offset = self.stream_offset - self.last_length
-            last_item_offset -= self.chunk_begin
-            self.data[self.writeptr+33:self.writeptr+37] = last_item_offset.to_bytes(4, 'little')
-            self.writeptr += 37
-            return self.writeptr - 37 + self.offset
+        # Instrumentation
+        # Count of indirect items, i.e. data is cached in the object cache, in this cache
+        self.indirect_items = 0
+        # Count of direct items, i.e. data is in self.fd
+        self.direct_items = 0
 
     def get(self, inode):
         offset = inode - self.offset
         if offset < 0:
             raise ValueError('ItemCache.get() called with an invalid inode number')
-        is_context_sensitive = self.data[offset] == ord(b'S')
-
-        # print(is_context_sensitive)
-        if is_context_sensitive:
-            fd_offset = int.from_bytes(self.data[offset+1:offset+9], 'little')
+        if self.meta[offset] == ord(b'S'):
+            fd_offset = int.from_bytes(self.meta[offset + 1:offset + 9], 'little')
             self.fd.seek(fd_offset, io.SEEK_SET)
             return Item(internal_dict=next(msgpack.Unpacker(self.fd, read_size=1024)))
         else:
-            chunk_id = bytes(self.data[offset+1:offset+33])
-            chunk_offset = int.from_bytes(self.data[offset+33:offset+37], 'little')
-            csize, chunk = next(self.decrypted_repository.get_many([chunk_id]))
+            _, chunk_id_relative_offset, chunk_offset = self.indirect_entry_struct.unpack_from(self.meta, offset)
+            chunk_id_offset = offset - chunk_id_relative_offset
+            # bytearray slices are bytearrays as well, explicitly convert to bytes()
+            chunk_id = bytes(self.meta[chunk_id_offset:chunk_id_offset + 32])
+            chunk = self.chunks.get(chunk_id)
+            if chunk is None:
+                csize, chunk = next(self.decrypted_repository.get_many([chunk_id]))
+                self.chunks[chunk_id] = chunk
             data = memoryview(chunk)[chunk_offset:]
             unpacker = msgpack.Unpacker()
             unpacker.feed(data)
             return Item(internal_dict=next(unpacker))
 
+    def iter_archive_items(self, archive_item_ids):
+        unpacker = msgpack.Unpacker()
+
+        stream_offset = 0
+        chunk_begin = 0
+        last_chunk_length = 0
+        msgpacked_bytes = b''
+
+        write_offset = self.write_offset
+        meta = self.meta
+        pack_indirect_into = self.indirect_entry_struct.pack_into
+
+        def write_bytes(append_msgpacked_bytes):
+            nonlocal msgpacked_bytes
+            nonlocal stream_offset
+            msgpacked_bytes += append_msgpacked_bytes
+            stream_offset += len(append_msgpacked_bytes)
+
+        for key, (csize, data) in zip(archive_item_ids, self.decrypted_repository.get_many(archive_item_ids)):
+            # Store the chunk ID in the meta-array
+            if write_offset + 32 >= len(meta):
+                self.meta = meta = meta + bytes(self.GROW_META_BY)
+            meta[write_offset:write_offset + 32] = key
+            current_id_offset = write_offset
+            write_offset += 32
+
+            # The chunk boundaries cannot be tracked through write_bytes, because the unpack state machine
+            # *can* and *will* consume partial items, so calls to write_bytes are unrelated to chunk boundaries.
+            chunk_begin += last_chunk_length
+            last_chunk_length = len(data)
+
+            unpacker.feed(data)
+            while True:
+                try:
+                    item = unpacker.unpack(write_bytes)
+                except msgpack.OutOfData:
+                    # Need more data, feed the next chunk
+                    break
+
+                current_item_length = len(msgpacked_bytes)
+                current_spans_chunks = stream_offset - current_item_length <= chunk_begin
+                current_item = msgpacked_bytes
+                msgpacked_bytes = b''
+
+                if write_offset + 9 >= len(meta):
+                    self.meta = meta = meta + bytes(self.GROW_META_BY)
+
+                # Item entries in the meta-array come in two different flavours, both nine bytes long.
+                # (1) for items that span chunks:
+                #
+                #     'S' + 8 byte offset into the self.fd file, where the msgpacked item starts.
+                #
+                # (2) for items that are completely contained in one chunk, which usually is the great majority
+                #     (about 700:1 for system backups):
+                #
+                #     'I' + 4 byte offset where the chunk ID is + 4 byte offset in the chunk
+                #     where the msgpacked item starts
+                #
+                #     The chunk ID offset is the number of bytes _back_ from the start of the entry, i.e.:
+                #
+                #     |Chunk ID| ....          |I1234abcd|
+                #      ^------ offset ----------^
+
+                if current_spans_chunks:
+                    pos = self.fd.seek(0, io.SEEK_END)
+                    self.fd.write(current_item)
+                    meta[write_offset:write_offset + 9] = b'S' + pos.to_bytes(8, 'little')
+                    write_offset += 9
+                    self.direct_items += 1
+                    inode = write_offset - 9 + self.offset
+                else:
+                    item_offset = stream_offset - current_item_length - chunk_begin
+                    pack_indirect_into(meta, write_offset, b'I', write_offset - current_id_offset, item_offset)
+                    write_offset += 9
+                    self.indirect_items += 1
+                    inode = write_offset - 9 + self.offset
+
+                yield inode, Item(internal_dict=item)
+
+        self.write_offset = write_offset
 
 
 class FuseOperations(llfuse.Operations):
     """Export archive as a fuse filesystem
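
[Illustration, not part of the patch: a self-contained toy sketch of the indirect
entry scheme documented in the comments above. It stores a chunk ID followed by an
'I' entry in a small meta-array and resolves the entry again, mirroring what
iter_archive_items and ItemCache.get do; all names and values here are made up.]

    import struct

    # Same layout as ItemCache.indirect_entry_struct: 1 byte flavour tag,
    # 4 byte relative offset back to the chunk ID, 4 byte offset into the chunk.
    indirect_entry = struct.Struct('=cII')
    assert indirect_entry.size == 9

    meta = bytearray(64)        # toy meta-array
    chunk_id = b'\xaa' * 32     # stand-in for a real 32 byte chunk ID

    # Store the chunk ID first, as iter_archive_items does.
    meta[0:32] = chunk_id
    current_id_offset = 0

    # Append an 'I' entry for an item that starts 17 bytes into that chunk.
    entry_offset = 32
    indirect_entry.pack_into(meta, entry_offset, b'I',
                             entry_offset - current_id_offset, 17)

    # Resolution, mirroring ItemCache.get():
    flavour, id_rel, item_offset = indirect_entry.unpack_from(meta, entry_offset)
    assert flavour == b'I' and item_offset == 17
    id_offset = entry_offset - id_rel
    assert bytes(meta[id_offset:id_offset + 32]) == chunk_id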
@@ -120,9 +207,17 @@ class FuseOperations(llfuse.Operations):
         self.args = args
         self.manifest = manifest
         self.key = key
-        self._inode_count = 0
+        # Maps inode numbers to Item instances. This is used for synthetic inodes,
+        # i.e. file-system objects that are made up by FuseOperations and are not contained
+        # in the archives. For example archive directories or intermediate directories
+        # not contained in archives.
         self.items = {}
+        # _inode_count is the current count of synthetic inodes, i.e. those in self.items
+        self._inode_count = 0
+        # Maps inode numbers to the inode number of the parent
         self.parent = {}
+        # Maps inode numbers to a dictionary mapping byte directory entry names to their inode numbers,
+        # i.e. this contains all dirents of everything that is mounted. (It becomes really big).
         self.contents = defaultdict(dict)
         self.default_uid = os.getuid()
         self.default_gid = os.getgid()
@@ -150,15 +245,15 @@ class FuseOperations(llfuse.Operations):
         self.pending_archives[archive_inode] = archive_name
 
     def sig_info_handler(self, sig_no, stack):
-        logger.debug('fuse: %d inodes, %d synth inodes, %d edges (%s)',
-                     self._inode_count, len(self.items), len(self.parent),
+        logger.debug('fuse: %d synth inodes, %d edges (%s)',
+                     self._inode_count, len(self.parent),
                      # getsizeof is the size of the dict itself; key and value are two small-ish integers,
                      # which are shared due to code structure (this has been verified).
                      format_file_size(sys.getsizeof(self.parent) + len(self.parent) * sys.getsizeof(self._inode_count)))
         logger.debug('fuse: %d pending archives', len(self.pending_archives))
-        logger.debug('fuse: ItemCache %d entries, meta-array %s, dependent items %s',
-                     self._inode_count - len(self.items),
-                     format_file_size(sys.getsizeof(self.cache.data)),
+        logger.debug('fuse: ItemCache %d entries (%d direct, %d indirect), meta-array size %s, direct items size %s',
+                     self.cache.direct_items + self.cache.indirect_items, self.cache.direct_items, self.cache.indirect_items,
+                     format_file_size(sys.getsizeof(self.cache.meta)),
                      format_file_size(os.stat(self.cache.fd.fileno()).st_size))
         logger.debug('fuse: data cache: %d/%d entries, %s', len(self.data_cache.items()), self.data_cache._capacity,
                      format_file_size(sum(len(chunk) for key, chunk in self.data_cache.items())))
@@ -212,42 +307,31 @@ class FuseOperations(llfuse.Operations):
         """
         self.file_versions = {}  # for versions mode: original path -> version
         t0 = time.perf_counter()
-        unpacker = msgpack.Unpacker()
         archive = Archive(self.repository_uncached, self.key, self.manifest, archive_name,
                           consider_part_files=self.args.consider_part_files)
-        self.cache.new_stream()
-        for key, (csize, data) in zip(archive.metadata.items, self.decrypted_repository.get_many(archive.metadata.items)):
-            self.cache.set_current_id(key, len(data))
-            unpacker.feed(data)
-            while True:
+        for item_inode, item in self.cache.iter_archive_items(archive.metadata.items):
+            path = os.fsencode(item.path)
+            is_dir = stat.S_ISDIR(item.mode)
+            if is_dir:
                 try:
-                    item = unpacker.unpack(self.cache.write_bytes)
-                except msgpack.OutOfData:
-                    break
-                self.cache.unpacked()
-                item = Item(internal_dict=item)
-                path = os.fsencode(item.path)
-                is_dir = stat.S_ISDIR(item.mode)
-                if is_dir:
-                    try:
-                        # This can happen if an archive was created with a command line like
-                        # $ borg create ... dir1/file dir1
-                        # In this case the code below will have created a default_dir inode for dir1 already.
-                        inode = self._find_inode(path, prefix)
-                    except KeyError:
-                        pass
-                    else:
-                        self.items[inode] = item
-                        continue
-                segments = prefix + path.split(b'/')
-                parent = 1
-                for segment in segments[:-1]:
-                    parent = self.process_inner(segment, parent)
-                self.process_leaf(segments[-1], item, parent, prefix, is_dir)
+                    # This can happen if an archive was created with a command line like
+                    # $ borg create ... dir1/file dir1
+                    # In this case the code below will have created a default_dir inode for dir1 already.
+                    inode = self._find_inode(path, prefix)
+                except KeyError:
+                    pass
+                else:
+                    self.items[inode] = item
+                    continue
+            segments = prefix + path.split(b'/')
+            parent = 1
+            for segment in segments[:-1]:
+                parent = self.process_inner(segment, parent)
+            self.process_leaf(segments[-1], item, parent, prefix, is_dir, item_inode)
         duration = time.perf_counter() - t0
         logger.debug('fuse: process_archive completed in %.1f s for archive %s', duration, archive.name)
 
-    def process_leaf(self, name, item, parent, prefix, is_dir):
+    def process_leaf(self, name, item, parent, prefix, is_dir, item_inode):
         def file_version(item):
             if 'chunks' in item:
                 ident = 0
@@ -290,7 +374,7 @@
             item.nlink = item.get('nlink', 1) + 1
             self.items[inode] = item
         else:
-            inode = self.cache.inode_for_current_item()
+            inode = item_inode
         self.parent[inode] = parent
         if name:
             self.contents[parent][name] = inode
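
[Illustration, not part of the patch: the feed/unpack/OutOfData pattern that
iter_archive_items is built around, shown with toy data. The patch additionally
passes the write_bytes callback to Unpacker.unpack() to capture the exact bytes
of each unpacked item; that callback parameter exists in the msgpack-python
versions borg pins here and is omitted in this sketch.]

    import msgpack

    # Three msgpacked items glued into one stream, then split into two "chunks"
    # so that the second item straddles the chunk boundary.
    items = [b'a' * 10, b'b' * 20, b'c' * 5]
    stream = b''.join(msgpack.packb(i) for i in items)
    chunks = [stream[:15], stream[15:]]

    unpacker = msgpack.Unpacker()
    unpacked = []
    for data in chunks:
        unpacker.feed(data)
        while True:
            try:
                unpacked.append(unpacker.unpack())
            except msgpack.OutOfData:
                # Need more data, feed the next chunk
                break
    assert unpacked == items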
diff --git a/src/borg/lrucache.py b/src/borg/lrucache.py
index 4d3ba73b7..03db283f5 100644
--- a/src/borg/lrucache.py
+++ b/src/borg/lrucache.py
@@ -28,6 +28,14 @@ class LRUCache:
     def __contains__(self, key):
         return key in self._cache
 
+    def get(self, key, default=None):
+        value = self._cache.get(key, default)
+        if value is default:
+            return value
+        self._lru.remove(key)
+        self._lru.append(key)
+        return value
+
     def clear(self):
         for value in self._cache.values():
             self._dispose(value)
diff --git a/src/borg/testsuite/lrucache.py b/src/borg/testsuite/lrucache.py
index 9fb4f92b6..eea171d64 100644
--- a/src/borg/testsuite/lrucache.py
+++ b/src/borg/testsuite/lrucache.py
@@ -19,7 +19,10 @@ class TestLRUCache:
         assert 'b' in c
         with pytest.raises(KeyError):
             c['a']
+        assert c.get('a') is None
+        assert c.get('a', 'foo') == 'foo'
         assert c['b'] == 1
+        assert c.get('b') == 1
         assert c['c'] == 2
         c['d'] = 3
         assert len(c) == 2
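
[Illustration, not part of the patch: how the new LRUCache.get() interacts with
eviction order, assuming borg's LRUCache semantics (the least recently used key
is evicted once capacity is exceeded). Unlike a plain dict.get(), a hit also
refreshes the key's LRU position.]

    from borg.lrucache import LRUCache

    c = LRUCache(capacity=2, dispose=lambda _: None)
    c['a'] = 1
    c['b'] = 2
    assert c.get('a') == 1      # hit: returns the value, marks 'a' most recently used
    c['c'] = 3                  # over capacity: evicts 'b', the least recently used
    assert c.get('b') is None   # miss: returns the default instead of raising KeyError
    assert 'a' in c and 'c' in c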