From c791921951ace5d4f08fab510fb5d55ece69d168 Mon Sep 17 00:00:00 2001 From: Marian Beermann Date: Tue, 13 Jun 2017 20:03:39 +0200 Subject: [PATCH 1/3] fuse: instrument caches note: signal processing can be arbitrarily delayed; Python processes signals as soon as the code returns into the interpreter loop, which doesn't happen unless libfuse returns control, i.e. some request has been sent to the file system. --- src/borg/fuse.py | 21 +++++++++++++++++++-- src/borg/remote.py | 8 +++++++- 2 files changed, 26 insertions(+), 3 deletions(-) diff --git a/src/borg/fuse.py b/src/borg/fuse.py index 4441c4063..b561bbb56 100644 --- a/src/borg/fuse.py +++ b/src/borg/fuse.py @@ -2,6 +2,7 @@ import io import os import stat +import sys import tempfile import time from collections import defaultdict @@ -16,7 +17,7 @@ logger = create_logger() from .archive import Archive -from .helpers import daemonize, hardlinkable +from .helpers import daemonize, hardlinkable, signal_handler, format_file_size from .item import Item from .lrucache import LRUCache @@ -97,6 +98,20 @@ def _create_filesystem(self): self.contents[1][os.fsencode(name)] = archive_inode self.pending_archives[archive_inode] = archive + def sig_info_handler(self, sig_no, stack): + logger.debug('fuse: %d inodes, %d synth inodes, %d edges (%s)', + self._inode_count, len(self.items), len(self.parent), + # getsizeof is the size of the dict itself; key and value are two small-ish integers, + # which are shared due to code structure (this has been verified). + format_file_size(sys.getsizeof(self.parent) + len(self.parent) * sys.getsizeof(self._inode_count))) + logger.debug('fuse: %d pending archives', len(self.pending_archives)) + logger.debug('fuse: ItemCache %d entries, %s', + self._inode_count - len(self.items), + format_file_size(os.stat(self.cache.fd.fileno()).st_size)) + logger.debug('fuse: data cache: %d/%d entries, %s', len(self.data_cache.items()), self.data_cache._capacity, + format_file_size(sum(len(chunk) for key, chunk in self.data_cache.items()))) + self.repository.log_instrumentation() + def mount(self, mountpoint, mount_options, foreground=False): """Mount filesystem on *mountpoint* with *mount_options*.""" options = ['fsname=borgfs', 'ro'] @@ -124,7 +139,9 @@ def mount(self, mountpoint, mount_options, foreground=False): # mirror. umount = False try: - signal = fuse_main() + with signal_handler('SIGUSR1', self.sig_info_handler), \ + signal_handler('SIGINFO', self.sig_info_handler): + signal = fuse_main() # no crash and no signal (or it's ^C and we're in the foreground) -> umount request umount = (signal is None or (signal == SIGINT and foreground)) finally: diff --git a/src/borg/remote.py b/src/borg/remote.py index a961384f0..670179a44 100644 --- a/src/borg/remote.py +++ b/src/borg/remote.py @@ -1088,6 +1088,9 @@ def get_many(self, keys, cache=True): for key, data in zip(keys, self.repository.get_many(keys)): yield self.transform(key, data) + def log_instrumentation(self): + pass + class RepositoryCache(RepositoryNoCache): """ @@ -1161,12 +1164,15 @@ def add_entry(self, key, data, cache): self.backoff() return transformed - def close(self): + def log_instrumentation(self): logger.debug('RepositoryCache: current items %d, size %s / %s, %d hits, %d misses, %d slow misses (+%.1fs), ' '%d evictions, %d ENOSPC hit', len(self.cache), format_file_size(self.size), format_file_size(self.size_limit), self.hits, self.misses, self.slow_misses, self.slow_lat, self.evictions, self.enospc) + + def close(self): + self.log_instrumentation() self.cache.clear() shutil.rmtree(self.basedir) From 879f72f227f519827f455d11615fb49e69c64f3a Mon Sep 17 00:00:00 2001 From: Marian Beermann Date: Tue, 13 Jun 2017 23:06:00 +0200 Subject: [PATCH 2/3] fuse: log process_archive timing the easier alternative to "/bin/time stat mountpoint//..." --- src/borg/fuse.py | 3 +++ 1 file changed, 3 insertions(+) diff --git a/src/borg/fuse.py b/src/borg/fuse.py index b561bbb56..731197ce9 100644 --- a/src/borg/fuse.py +++ b/src/borg/fuse.py @@ -160,6 +160,7 @@ def process_archive(self, archive, prefix=[]): """ self.file_versions = {} # for versions mode: original path -> version unpacker = msgpack.Unpacker() + t0 = time.perf_counter() for key, chunk in zip(archive.metadata.items, self.repository.get_many(archive.metadata.items)): data = self.key.decrypt(key, chunk) unpacker.feed(data) @@ -183,6 +184,8 @@ def process_archive(self, archive, prefix=[]): for segment in segments[:-1]: parent = self.process_inner(segment, parent) self.process_leaf(segments[-1], item, parent, prefix, is_dir) + duration = time.perf_counter() - t0 + logger.debug('fuse: process_archive completed in %.1f s for archive %s', duration, archive.name) def process_leaf(self, name, item, parent, prefix, is_dir): def file_version(item): From ff05895b7e282c275c84ebd5224c63a8d7fd6f2e Mon Sep 17 00:00:00 2001 From: Marian Beermann Date: Tue, 13 Jun 2017 23:14:52 +0200 Subject: [PATCH 3/3] fuse: don't keep all Archive() instances around they're only needed inside process_archive, and not needed in general for pending_archives. --- src/borg/fuse.py | 26 ++++++++++++-------------- 1 file changed, 12 insertions(+), 14 deletions(-) diff --git a/src/borg/fuse.py b/src/borg/fuse.py index 731197ce9..6239625cd 100644 --- a/src/borg/fuse.py +++ b/src/borg/fuse.py @@ -81,22 +81,18 @@ def __init__(self, key, repository, manifest, args, cached_repo): def _create_filesystem(self): self._create_dir(parent=1) # first call, create root dir (inode == 1) if self.args.location.archive: - archive = Archive(self.repository_uncached, self.key, self.manifest, self.args.location.archive, - consider_part_files=self.args.consider_part_files) - self.process_archive(archive) + self.process_archive(self.args.location.archive) else: archive_names = (x.name for x in self.manifest.archives.list_considering(self.args)) - for name in archive_names: - archive = Archive(self.repository_uncached, self.key, self.manifest, name, - consider_part_files=self.args.consider_part_files) + for archive_name in archive_names: if self.versions: # process archives immediately - self.process_archive(archive) + self.process_archive(archive_name) else: # lazy load archives, create archive placeholder inode archive_inode = self._create_dir(parent=1) - self.contents[1][os.fsencode(name)] = archive_inode - self.pending_archives[archive_inode] = archive + self.contents[1][os.fsencode(archive_name)] = archive_inode + self.pending_archives[archive_inode] = archive_name def sig_info_handler(self, sig_no, stack): logger.debug('fuse: %d inodes, %d synth inodes, %d edges (%s)', @@ -155,12 +151,14 @@ def _create_dir(self, parent): self.parent[ino] = parent return ino - def process_archive(self, archive, prefix=[]): + def process_archive(self, archive_name, prefix=[]): """Build fuse inode hierarchy from archive metadata """ self.file_versions = {} # for versions mode: original path -> version - unpacker = msgpack.Unpacker() t0 = time.perf_counter() + unpacker = msgpack.Unpacker() + archive = Archive(self.repository_uncached, self.key, self.manifest, archive_name, + consider_part_files=self.args.consider_part_files) for key, chunk in zip(archive.metadata.items, self.repository.get_many(archive.metadata.items)): data = self.key.decrypt(key, chunk) unpacker.feed(data) @@ -314,9 +312,9 @@ def getxattr(self, inode, name, ctx=None): def _load_pending_archive(self, inode): # Check if this is an archive we need to load - archive = self.pending_archives.pop(inode, None) - if archive: - self.process_archive(archive, [os.fsencode(archive.name)]) + archive_name = self.pending_archives.pop(inode, None) + if archive_name: + self.process_archive(archive_name, [os.fsencode(archive_name)]) def lookup(self, parent_inode, name, ctx=None): self._load_pending_archive(parent_inode)