fuse: refactor ItemCache

Marian Beermann 2017-06-14 12:15:46 +02:00
parent 9fd79a9e56
commit 3b928a4558
3 changed files with 182 additions and 87 deletions


@@ -2,6 +2,7 @@ import errno
import io
import os
import stat
import struct
import sys
import tempfile
import time
@@ -35,76 +36,162 @@ else:
class ItemCache:
GROW_BY = 2 * 1024 * 1024
"""
This is the "meat" of the file system's metadata storage.
This class generates inode numbers that efficiently index items in archives,
and retrieves items from these inode numbers.
"""
# Approximately 230000 items (depends on the average number of items per metadata chunk)
# Since growing a bytearray has to copy it, growing it will converge to O(n^2); however,
# this is not yet relevant due to the swiftness of copying memory. If it becomes an issue,
# use an anonymous mmap and just resize that (or, if on 64 bit, make it so big you never need
# to resize it in the first place; that's free).
GROW_META_BY = 2 * 1024 * 1024
indirect_entry_struct = struct.Struct('=cII')
assert indirect_entry_struct.size == 9
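For context, a quick illustrative check (not part of the diff): the '=' prefix in the format string disables alignment padding, which is what makes the entry exactly 1 + 4 + 4 = 9 bytes:
import struct
assert struct.calcsize('=cII') == 9    # one tag byte plus two uint32 offsets, no padding
assert struct.calcsize('@cII') == 12   # with native alignment the tag would be padded (on most platforms)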
def __init__(self, decrypted_repository):
self.decrypted_repository = decrypted_repository
self.data = bytearray()
self.writeptr = 0
self.fd = tempfile.TemporaryFile(prefix='borg-tmp')
# self.meta, the "meta-array" is a densely packed array of metadata about where items can be found.
# It is indexed by the inode number minus self.offset. (This is in a way eerily similar to how the first
# unices did this).
# The meta-array contains chunk IDs and item entries (described in inode_for_current_item).
# The chunk IDs are referenced by item entries through relative offsets,
# which are bounded by the metadata chunk size.
self.meta = bytearray()
# The current write offset in self.meta
self.write_offset = 0
# Offset added to a meta-index, resulting in an inode,
# or subtracted from an inode, resulting in a meta-index.
self.offset = 1000000
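To make the offset arithmetic above concrete, a small sketch with made-up numbers (not part of the change): an entry written at meta-index 36 is handed out as inode 1000036, and get() recovers the meta-index by subtracting self.offset again.
offset = 1000000                     # self.offset
meta_index = 36                      # position of an item entry within self.meta
inode = meta_index + offset          # 1000036, the number handed out to llfuse
assert inode - offset == meta_index  # what get() does to locate the entry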
def new_stream(self):
self.stream_offset = 0
self.chunk_begin = 0
self.chunk_length = 0
self.current_item = b''
# A temporary file that contains direct items, i.e. items directly cached in this layer.
# These are items that span more than one chunk and thus cannot be efficiently cached
# by the object cache (self.decrypted_repository), which would require variable-length structures;
# possible but not worth the effort, see iter_archive_items.
self.fd = tempfile.TemporaryFile(prefix='borg-tmp')
def set_current_id(self, chunk_id, chunk_length):
self.chunk_id = chunk_id
self.chunk_begin += self.chunk_length
self.chunk_length = chunk_length
# A small LRU cache for chunks requested by ItemCache.get() from the object cache;
# this significantly speeds up directory traversal and similar operations which
# tend to re-read the same chunks over and over.
# The capacity is kept low because increasing it does not provide any significant advantage,
# but makes LRUCache's quadratic behaviour noticeable and consumes some memory.
self.chunks = LRUCache(capacity=10, dispose=lambda _: None)
def write_bytes(self, msgpacked_bytes):
self.current_item += msgpacked_bytes
self.stream_offset += len(msgpacked_bytes)
def unpacked(self):
msgpacked_bytes = self.current_item
self.current_item = b''
self.last_context_sensitive = self.stream_offset - len(msgpacked_bytes) <= self.chunk_begin
self.last_length = len(msgpacked_bytes)
self.last_item = msgpacked_bytes
def inode_for_current_item(self):
if self.writeptr + 37 >= len(self.data):
self.data = self.data + bytes(self.GROW_BY)
if self.last_context_sensitive:
pos = self.fd.seek(0, io.SEEK_END)
self.fd.write(self.last_item)
self.data[self.writeptr:self.writeptr+9] = b'S' + pos.to_bytes(8, 'little')
self.writeptr += 9
return self.writeptr - 9 + self.offset
else:
self.data[self.writeptr:self.writeptr+1] = b'I'
self.data[self.writeptr+1:self.writeptr+33] = self.chunk_id
last_item_offset = self.stream_offset - self.last_length
last_item_offset -= self.chunk_begin
self.data[self.writeptr+33:self.writeptr+37] = last_item_offset.to_bytes(4, 'little')
self.writeptr += 37
return self.writeptr - 37 + self.offset
# Instrumentation
# Count of indirect items, i.e. items whose data is cached in the object cache, not in this cache
self.indirect_items = 0
# Count of direct items, i.e. data is in self.fd
self.direct_items = 0
def get(self, inode):
offset = inode - self.offset
if offset < 0:
raise ValueError('ItemCache.get() called with an invalid inode number')
is_context_sensitive = self.data[offset] == ord(b'S')
# print(is_context_sensitive)
if is_context_sensitive:
fd_offset = int.from_bytes(self.data[offset+1:offset+9], 'little')
if self.meta[offset] == ord(b'S'):
fd_offset = int.from_bytes(self.meta[offset + 1:offset + 9], 'little')
self.fd.seek(fd_offset, io.SEEK_SET)
return Item(internal_dict=next(msgpack.Unpacker(self.fd, read_size=1024)))
else:
chunk_id = bytes(self.data[offset+1:offset+33])
chunk_offset = int.from_bytes(self.data[offset+33:offset+37], 'little')
csize, chunk = next(self.decrypted_repository.get_many([chunk_id]))
_, chunk_id_relative_offset, chunk_offset = self.indirect_entry_struct.unpack_from(self.meta, offset)
chunk_id_offset = offset - chunk_id_relative_offset
# bytearray slices are bytearrays as well, explicitly convert to bytes()
chunk_id = bytes(self.meta[chunk_id_offset:chunk_id_offset + 32])
chunk_offset = int.from_bytes(self.meta[offset + 5:offset + 9], 'little')
chunk = self.chunks.get(chunk_id)
if not chunk:
csize, chunk = next(self.decrypted_repository.get_many([chunk_id]))
self.chunks[chunk_id] = chunk
data = memoryview(chunk)[chunk_offset:]
unpacker = msgpack.Unpacker()
unpacker.feed(data)
return Item(internal_dict=next(unpacker))
def iter_archive_items(self, archive_item_ids):
unpacker = msgpack.Unpacker()
stream_offset = 0
chunk_begin = 0
last_chunk_length = 0
msgpacked_bytes = b''
write_offset = self.write_offset
meta = self.meta
pack_indirect_into = self.indirect_entry_struct.pack_into
def write_bytes(append_msgpacked_bytes):
nonlocal msgpacked_bytes
nonlocal stream_offset
msgpacked_bytes += append_msgpacked_bytes
stream_offset += len(append_msgpacked_bytes)
for key, (csize, data) in zip(archive_item_ids, self.decrypted_repository.get_many(archive_item_ids)):
# Store the chunk ID in the meta-array
if write_offset + 32 >= len(meta):
self.meta = meta = meta + bytes(self.GROW_META_BY)
meta[write_offset:write_offset + 32] = key
current_id_offset = write_offset
write_offset += 32
# The chunk boundaries cannot be tracked through write_bytes, because the unpack state machine
# *can* and *will* consume partial items, so calls to write_bytes are unrelated to chunk boundaries.
chunk_begin += last_chunk_length
last_chunk_length = len(data)
unpacker.feed(data)
while True:
try:
item = unpacker.unpack(write_bytes)
except msgpack.OutOfData:
# Need more data, feed the next chunk
break
current_item_length = len(msgpacked_bytes)
current_spans_chunks = stream_offset - current_item_length <= chunk_begin
current_item = msgpacked_bytes
msgpacked_bytes = b''
if write_offset + 9 >= len(meta):
self.meta = meta = meta + bytes(self.GROW_META_BY)
# item entries in the meta-array come in two different flavours, both nine bytes long.
# (1) for items that span chunks:
#
# 'S' + 8 byte offset into the self.fd file, where the msgpacked item starts.
#
# (2) for items that are completely contained in one chunk, which usually is the great majority
# (about 700:1 for system backups)
#
# 'I' + 4 byte offset where the chunk ID is + 4 byte offset in the chunk
# where the msgpacked item starts
#
# The chunk ID offset is the number of bytes _back_ from the start of the entry, i.e.:
#
# |Chunk ID| .... |S1234abcd|
# ^------ offset ----------^
if current_spans_chunks:
pos = self.fd.seek(0, io.SEEK_END)
self.fd.write(current_item)
meta[write_offset:write_offset + 9] = b'S' + pos.to_bytes(8, 'little')
write_offset += 9
self.direct_items += 1
inode = write_offset - 9 + self.offset
else:
item_offset = stream_offset - current_item_length - chunk_begin
pack_indirect_into(meta, write_offset, b'I', write_offset - current_id_offset, item_offset)
write_offset += 9
self.indirect_items += 1
inode = write_offset - 9 + self.offset
yield inode, Item(internal_dict=item)
self.write_offset = write_offset
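To make the two entry flavours described in the comment above easier to follow, here is a small decoding sketch (illustrative only, it simply mirrors what ItemCache.get() does with the meta-array):
import struct

indirect_entry = struct.Struct('=cII')  # b'I', chunk-ID back-offset, offset within the chunk

def decode_entry(meta, offset):
    if meta[offset] == ord(b'S'):
        # flavour (1): the item spans chunks, its bytes live in the temporary file
        return 'direct', int.from_bytes(meta[offset + 1:offset + 9], 'little')
    # flavour (2): the item is fully contained in one chunk held by the object cache
    _, id_back_offset, chunk_offset = indirect_entry.unpack_from(meta, offset)
    chunk_id = bytes(meta[offset - id_back_offset:offset - id_back_offset + 32])
    return 'indirect', chunk_id, chunk_offset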
class FuseOperations(llfuse.Operations):
"""Export archive as a fuse filesystem
@@ -120,9 +207,17 @@ class FuseOperations(llfuse.Operations):
self.args = args
self.manifest = manifest
self.key = key
self._inode_count = 0
# Maps inode numbers to Item instances. This is used for synthetic inodes,
# i.e. file-system objects that are made up by FuseOperations and are not contained
# in the archives. For example archive directories or intermediate directories
# not contained in archives.
self.items = {}
# _inode_count is the current count of synthetic inodes, i.e. those in self.items
self._inode_count = 0
# Maps inode numbers to the inode number of the parent
self.parent = {}
# Maps inode numbers to a dictionary mapping byte directory entry names to their inode numbers,
# i.e. this contains all dirents of everything that is mounted. (It becomes really big).
self.contents = defaultdict(dict)
self.default_uid = os.getuid()
self.default_gid = os.getgid()
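To make the shape of these maps concrete, a minimal made-up example (hypothetical inode numbers and names, not taken from the commit) for a mounted archive containing dir1/file:
# parent maps each inode to the inode of its parent directory
parent = {2: 1, 1000036: 2}
# contents maps a directory inode to its dirents: {name (bytes): inode}
contents = {1: {b'dir1': 2}, 2: {b'file': 1000036}}
# self.items would hold an Item instance only for the synthetic inode 2 (the default_dir for dir1)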
@@ -150,15 +245,15 @@ class FuseOperations(llfuse.Operations):
self.pending_archives[archive_inode] = archive_name
def sig_info_handler(self, sig_no, stack):
logger.debug('fuse: %d inodes, %d synth inodes, %d edges (%s)',
self._inode_count, len(self.items), len(self.parent),
logger.debug('fuse: %d synth inodes, %d edges (%s)',
self._inode_count, len(self.parent),
# getsizeof is the size of the dict itself; key and value are two small-ish integers,
# which are shared due to code structure (this has been verified).
format_file_size(sys.getsizeof(self.parent) + len(self.parent) * sys.getsizeof(self._inode_count)))
logger.debug('fuse: %d pending archives', len(self.pending_archives))
logger.debug('fuse: ItemCache %d entries, meta-array %s, dependent items %s',
self._inode_count - len(self.items),
format_file_size(sys.getsizeof(self.cache.data)),
logger.debug('fuse: ItemCache %d entries (%d direct, %d indirect), meta-array size %s, direct items size %s',
self.cache.direct_items + self.cache.indirect_items, self.cache.direct_items, self.cache.indirect_items,
format_file_size(sys.getsizeof(self.cache.meta)),
format_file_size(os.stat(self.cache.fd.fileno()).st_size))
logger.debug('fuse: data cache: %d/%d entries, %s', len(self.data_cache.items()), self.data_cache._capacity,
format_file_size(sum(len(chunk) for key, chunk in self.data_cache.items())))
@@ -212,42 +307,31 @@ class FuseOperations(llfuse.Operations):
"""
self.file_versions = {} # for versions mode: original path -> version
t0 = time.perf_counter()
unpacker = msgpack.Unpacker()
archive = Archive(self.repository_uncached, self.key, self.manifest, archive_name,
consider_part_files=self.args.consider_part_files)
self.cache.new_stream()
for key, (csize, data) in zip(archive.metadata.items, self.decrypted_repository.get_many(archive.metadata.items)):
self.cache.set_current_id(key, len(data))
unpacker.feed(data)
while True:
for item_inode, item in self.cache.iter_archive_items(archive.metadata.items):
path = os.fsencode(item.path)
is_dir = stat.S_ISDIR(item.mode)
if is_dir:
try:
item = unpacker.unpack(self.cache.write_bytes)
except msgpack.OutOfData:
break
self.cache.unpacked()
item = Item(internal_dict=item)
path = os.fsencode(item.path)
is_dir = stat.S_ISDIR(item.mode)
if is_dir:
try:
# This can happen if an archive was created with a command line like
# $ borg create ... dir1/file dir1
# In this case the code below will have created a default_dir inode for dir1 already.
inode = self._find_inode(path, prefix)
except KeyError:
pass
else:
self.items[inode] = item
continue
segments = prefix + path.split(b'/')
parent = 1
for segment in segments[:-1]:
parent = self.process_inner(segment, parent)
self.process_leaf(segments[-1], item, parent, prefix, is_dir)
# This can happen if an archive was created with a command line like
# $ borg create ... dir1/file dir1
# In this case the code below will have created a default_dir inode for dir1 already.
inode = self._find_inode(path, prefix)
except KeyError:
pass
else:
self.items[inode] = item
continue
segments = prefix + path.split(b'/')
parent = 1
for segment in segments[:-1]:
parent = self.process_inner(segment, parent)
self.process_leaf(segments[-1], item, parent, prefix, is_dir, item_inode)
duration = time.perf_counter() - t0
logger.debug('fuse: process_archive completed in %.1f s for archive %s', duration, archive.name)
def process_leaf(self, name, item, parent, prefix, is_dir):
def process_leaf(self, name, item, parent, prefix, is_dir, item_inode):
def file_version(item):
if 'chunks' in item:
ident = 0
@@ -290,7 +374,7 @@ class FuseOperations(llfuse.Operations):
item.nlink = item.get('nlink', 1) + 1
self.items[inode] = item
else:
inode = self.cache.inode_for_current_item()
inode = item_inode
self.parent[inode] = parent
if name:
self.contents[parent][name] = inode


@@ -28,6 +28,14 @@ class LRUCache:
def __contains__(self, key):
return key in self._cache
def get(self, key, default=None):
value = self._cache.get(key, default)
if value is default:
return value
self._lru.remove(key)
self._lru.append(key)
return value
def clear(self):
for value in self._cache.values():
self._dispose(value)
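As a usage note for the new get() (a sketch assuming the LRUCache(capacity, dispose) constructor used elsewhere in this commit): a hit via get() refreshes the LRU order just like __getitem__, so it protects that key from the next eviction:
from borg.lrucache import LRUCache

c = LRUCache(capacity=2, dispose=lambda _: None)
c['a'] = 1
c['b'] = 2
c.get('a')   # 'a' becomes most recently used
c['c'] = 3   # evicts 'b', the least recently used entry, not 'a'
assert 'a' in c and 'b' not in c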


@@ -19,7 +19,10 @@ class TestLRUCache:
assert 'b' in c
with pytest.raises(KeyError):
c['a']
assert c.get('a') is None
assert c.get('a', 'foo') == 'foo'
assert c['b'] == 1
assert c.get('b') == 1
assert c['c'] == 2
c['d'] = 3
assert len(c) == 2