diff --git a/borg/archive.py b/borg/archive.py index 0aa92b4dc..f5a3d2964 100644 --- a/borg/archive.py +++ b/borg/archive.py @@ -834,7 +834,6 @@ class ArchiveChecker: raise i += 1 - repository = cache_if_remote(self.repository) if archive is None: # we need last N or all archives archive_items = sorted(self.manifest.archives.items(), reverse=True, @@ -848,37 +847,39 @@ class ArchiveChecker: archive_items = [item for item in self.manifest.archives.items() if item[0] == archive] num_archives = 1 end = 1 - for i, (name, info) in enumerate(archive_items[:end]): - logger.info('Analyzing archive {} ({}/{})'.format(name, num_archives - i, num_archives)) - archive_id = info[b'id'] - if archive_id not in self.chunks: - logger.error('Archive metadata block is missing!') - self.error_found = True - del self.manifest.archives[name] - continue - mark_as_possibly_superseded(archive_id) - cdata = self.repository.get(archive_id) - data = self.key.decrypt(archive_id, cdata) - archive = StableDict(msgpack.unpackb(data)) - if archive[b'version'] != 1: - raise Exception('Unknown archive metadata version') - decode_dict(archive, (b'name', b'hostname', b'username', b'time')) - archive[b'cmdline'] = [arg.decode('utf-8', 'surrogateescape') for arg in archive[b'cmdline']] - items_buffer = ChunkBuffer(self.key) - items_buffer.write_chunk = add_callback - for item in robust_iterator(archive): - if b'chunks' in item: - verify_file_chunks(item) - items_buffer.add(item) - items_buffer.flush(flush=True) - for previous_item_id in archive[b'items']: - mark_as_possibly_superseded(previous_item_id) - archive[b'items'] = items_buffer.chunks - data = msgpack.packb(archive, unicode_errors='surrogateescape') - new_archive_id = self.key.id_hash(data) - cdata = self.key.encrypt(data) - add_reference(new_archive_id, len(data), len(cdata), cdata) - info[b'id'] = new_archive_id + + with cache_if_remote(self.repository) as repository: + for i, (name, info) in enumerate(archive_items[:end]): + 
logger.info('Analyzing archive {} ({}/{})'.format(name, num_archives - i, num_archives)) + archive_id = info[b'id'] + if archive_id not in self.chunks: + logger.error('Archive metadata block is missing!') + self.error_found = True + del self.manifest.archives[name] + continue + mark_as_possibly_superseded(archive_id) + cdata = self.repository.get(archive_id) + data = self.key.decrypt(archive_id, cdata) + archive = StableDict(msgpack.unpackb(data)) + if archive[b'version'] != 1: + raise Exception('Unknown archive metadata version') + decode_dict(archive, (b'name', b'hostname', b'username', b'time')) + archive[b'cmdline'] = [arg.decode('utf-8', 'surrogateescape') for arg in archive[b'cmdline']] + items_buffer = ChunkBuffer(self.key) + items_buffer.write_chunk = add_callback + for item in robust_iterator(archive): + if b'chunks' in item: + verify_file_chunks(item) + items_buffer.add(item) + items_buffer.flush(flush=True) + for previous_item_id in archive[b'items']: + mark_as_possibly_superseded(previous_item_id) + archive[b'items'] = items_buffer.chunks + data = msgpack.packb(archive, unicode_errors='surrogateescape') + new_archive_id = self.key.id_hash(data) + cdata = self.key.encrypt(data) + add_reference(new_archive_id, len(data), len(cdata), cdata) + info[b'id'] = new_archive_id def orphan_chunks_check(self): if self.check_all: diff --git a/borg/archiver.py b/borg/archiver.py index 9cb7e3d1e..8791178b1 100644 --- a/borg/archiver.py +++ b/borg/archiver.py @@ -30,7 +30,7 @@ from .repository import Repository from .cache import Cache from .key import key_creator from .archive import Archive, ArchiveChecker, CHUNKER_PARAMS -from .remote import RepositoryServer, RemoteRepository +from .remote import RepositoryServer, RemoteRepository, cache_if_remote has_lchflags = hasattr(os, 'lchflags') @@ -380,18 +380,19 @@ class Archiver: repository = self.open_repository(args) try: - manifest, key = Manifest.load(repository) - if args.location.archive: - archive = 
Archive(repository, key, manifest, args.location.archive) - else: - archive = None - operations = FuseOperations(key, repository, manifest, archive) - logger.info("Mounting filesystem") - try: - operations.mount(args.mountpoint, args.options, args.foreground) - except RuntimeError: - # Relevant error message already printed to stderr by fuse - self.exit_code = EXIT_ERROR + with cache_if_remote(repository) as cached_repo: + manifest, key = Manifest.load(repository) + if args.location.archive: + archive = Archive(repository, key, manifest, args.location.archive) + else: + archive = None + operations = FuseOperations(key, repository, manifest, archive, cached_repo) + logger.info("Mounting filesystem") + try: + operations.mount(args.mountpoint, args.options, args.foreground) + except RuntimeError: + # Relevant error message already printed to stderr by fuse + self.exit_code = EXIT_ERROR finally: repository.close() return self.exit_code diff --git a/borg/cache.py b/borg/cache.py index eaefc990e..bfb817ab4 100644 --- a/borg/cache.py +++ b/borg/cache.py @@ -340,12 +340,12 @@ Chunk index: {0.total_unique_chunks:20d} {0.total_chunks:20d}""" pass self.begin_txn() - repository = cache_if_remote(self.repository) - legacy_cleanup() - # TEMPORARY HACK: to avoid archive index caching, create a FILE named ~/.cache/borg/REPOID/chunks.archive.d - - # this is only recommended if you have a fast, low latency connection to your repo (e.g. if repo is local disk) - self.do_cache = os.path.isdir(archive_path) - self.chunks = create_master_idx(self.chunks) + with cache_if_remote(self.repository) as repository: + legacy_cleanup() + # TEMPORARY HACK: to avoid archive index caching, create a FILE named ~/.cache/borg/REPOID/chunks.archive.d - + # this is only recommended if you have a fast, low latency connection to your repo (e.g. 
if repo is local disk) + self.do_cache = os.path.isdir(archive_path) + self.chunks = create_master_idx(self.chunks) def add_chunk(self, id, data, stats): if not self.txn_active: diff --git a/borg/fuse.py b/borg/fuse.py index 448fe02a4..36f761e40 100644 --- a/borg/fuse.py +++ b/borg/fuse.py @@ -8,7 +8,6 @@ import tempfile import time from .archive import Archive from .helpers import daemonize -from .remote import cache_if_remote import msgpack @@ -34,11 +33,11 @@ class ItemCache: class FuseOperations(llfuse.Operations): """Export archive as a fuse filesystem """ - def __init__(self, key, repository, manifest, archive): + def __init__(self, key, repository, manifest, archive, cached_repo): super().__init__() self._inode_count = 0 self.key = key - self.repository = cache_if_remote(repository) + self.repository = cached_repo self.items = {} self.parent = {} self.contents = defaultdict(dict) diff --git a/borg/remote.py b/borg/remote.py index 391a70fac..d8092ec71 100644 --- a/borg/remote.py +++ b/borg/remote.py @@ -359,21 +359,45 @@ class RemoteRepository: self.preload_ids += ids -class RepositoryCache: +class RepositoryNoCache: + """A non-caching Repository wrapper, passes through to repository. + + Just to have the same API (including the context manager) as RepositoryCache. + """ + def __init__(self, repository): + self.repository = repository + + def close(self): + pass + + def __enter__(self): + return self + + def __exit__(self, exc_type, exc_val, exc_tb): + self.close() + + def get(self, key): + return next(self.get_many([key])) + + def get_many(self, keys): + for data in self.repository.get_many(keys): + yield data + + +class RepositoryCache(RepositoryNoCache): """A caching Repository wrapper Caches Repository GET operations using a local temporary Repository.
""" def __init__(self, repository): - self.repository = repository + super().__init__(repository) tmppath = tempfile.mkdtemp(prefix='borg-tmp') self.caching_repo = Repository(tmppath, create=True, exclusive=True) - def __del__(self): - self.caching_repo.destroy() - - def get(self, key): - return next(self.get_many([key])) + def close(self): + if self.caching_repo is not None: + self.caching_repo.destroy() + self.caching_repo = None def get_many(self, keys): unknown_keys = [key for key in keys if key not in self.caching_repo] @@ -395,4 +419,5 @@ class RepositoryCache: def cache_if_remote(repository): if isinstance(repository, RemoteRepository): return RepositoryCache(repository) - return repository + else: + return RepositoryNoCache(repository)