From 945880af47a3222db4bca32d8a347a09e72188ee Mon Sep 17 00:00:00 2001 From: Thomas Waldmann Date: Sun, 5 Mar 2017 05:19:32 +0100 Subject: [PATCH 1/2] implement async_response, add wait=True for add_chunk/chunk_decref Before this changeset, async responses were: - if not an error: ignored - if an error: raised as response to the arbitrary/unrelated next command Now, after sending async commands, the async_response command must be used to process outstanding responses / exceptions. We avoid piling up lots of responses in cases of high latency, because we do NOT first wait until ALL responses have arrived, but can begin to process responses right away. Calls with wait=False will just return what we have already received. Repeated calls with wait=True until None is returned will fetch all responses. Async commands could now actually have non-exception non-None results, but this is not used yet. None responses are still dropped. The motivation for this is to have a clear separation between a request blowing up because it (itself) failed and failures unrelated to that request / to that line in the source code. Also: fix processing for async repo obj deletes. exception_ignored is a special object that is "not None" (as None is used to signal "finished with processing async results") but is also not a potential async response result value. Also: added wait=True to chunk_decref() and add_chunk(). This makes async processing explicit - the default is synchronous and you only need to be careful and do extra steps for async processing if you explicitly request async by calling with wait=False (usually for speed reasons). To process async results, use async_response, see above. 
--- src/borg/archive.py | 37 +++++++++++++++++++++++++--------- src/borg/cache.py | 8 ++++---- src/borg/remote.py | 35 ++++++++++++++++++++++++++++---- src/borg/repository.py | 21 +++++++++++++++++++ src/borg/testsuite/archive.py | 7 ++++++- src/borg/testsuite/archiver.py | 3 ++- 6 files changed, 92 insertions(+), 19 deletions(-) diff --git a/src/borg/archive.py b/src/borg/archive.py index 6efc304b7..bfc2d533c 100644 --- a/src/borg/archive.py +++ b/src/borg/archive.py @@ -260,7 +260,8 @@ class CacheChunkBuffer(ChunkBuffer): self.stats = stats def write_chunk(self, chunk): - id_, _, _ = self.cache.add_chunk(self.key.id_hash(chunk.data), chunk, self.stats) + id_, _, _ = self.cache.add_chunk(self.key.id_hash(chunk.data), chunk, self.stats, wait=False) + self.cache.repository.async_response(wait=False) return id_ @@ -469,6 +470,8 @@ Utilization of max. archive size: {csize_max:.0%} data = self.key.pack_and_authenticate_metadata(metadata.as_dict(), context=b'archive') self.id = self.key.id_hash(data) self.cache.add_chunk(self.id, Chunk(data), self.stats) + while self.repository.async_response(wait=True) is not None: + pass self.manifest.archives[name] = (self.id, metadata.time) self.manifest.write() self.repository.commit() @@ -730,18 +733,27 @@ Utilization of max. archive size: {csize_max:.0%} class ChunksIndexError(Error): """Chunk ID {} missing from chunks index, corrupted chunks index - aborting transaction.""" - def chunk_decref(id, stats): - nonlocal error + exception_ignored = object() + + def fetch_async_response(wait=True): try: - self.cache.chunk_decref(id, stats) - except KeyError: - cid = bin_to_hex(id) - raise ChunksIndexError(cid) + return self.repository.async_response(wait=wait) except Repository.ObjectNotFound as e: + nonlocal error # object not in repo - strange, but we wanted to delete it anyway. 
if forced == 0: raise error = True + return exception_ignored # must not return None here + + def chunk_decref(id, stats): + try: + self.cache.chunk_decref(id, stats, wait=False) + except KeyError: + cid = bin_to_hex(id) + raise ChunksIndexError(cid) + else: + fetch_async_response(wait=False) error = False try: @@ -778,6 +790,10 @@ Utilization of max. archive size: {csize_max:.0%} # some harmless exception. chunk_decref(self.id, stats) del self.manifest.archives[self.name] + while fetch_async_response(wait=True) is not None: + # we did async deletes, process outstanding results (== exceptions), + # so there is nothing pending when we return and our caller wants to commit. + pass if error: logger.warning('forced deletion succeeded, but the deleted archive was corrupted.') logger.warning('borg check --repair is required to free all space.') @@ -865,7 +881,9 @@ Utilization of max. archive size: {csize_max:.0%} def chunk_file(self, item, cache, stats, chunk_iter, chunk_processor=None, **chunk_kw): if not chunk_processor: def chunk_processor(data): - return cache.add_chunk(self.key.id_hash(data), Chunk(data, **chunk_kw), stats) + chunk_entry = cache.add_chunk(self.key.id_hash(data), Chunk(data, **chunk_kw), stats, wait=False) + self.cache.repository.async_response(wait=False) + return chunk_entry item.chunks = [] from_chunk = 0 @@ -1654,7 +1672,8 @@ class ArchiveRecreater: if Compressor.detect(old_chunk.data).name == compression_spec.name: # Stored chunk has the same compression we wanted overwrite = False - chunk_entry = self.cache.add_chunk(chunk_id, chunk, target.stats, overwrite=overwrite) + chunk_entry = self.cache.add_chunk(chunk_id, chunk, target.stats, overwrite=overwrite, wait=False) + self.cache.repository.async_response(wait=False) self.seen_chunks.add(chunk_entry.id) return chunk_entry diff --git a/src/borg/cache.py b/src/borg/cache.py index 79c0a4979..b3d7e12f2 100644 --- a/src/borg/cache.py +++ b/src/borg/cache.py @@ -524,7 +524,7 @@ Chunk index: 
{0.total_unique_chunks:20d} {0.total_chunks:20d}""" self.do_cache = os.path.isdir(archive_path) self.chunks = create_master_idx(self.chunks) - def add_chunk(self, id, chunk, stats, overwrite=False): + def add_chunk(self, id, chunk, stats, overwrite=False, wait=True): if not self.txn_active: self.begin_txn() size = len(chunk.data) @@ -533,7 +533,7 @@ Chunk index: {0.total_unique_chunks:20d} {0.total_chunks:20d}""" return self.chunk_incref(id, stats) data = self.key.encrypt(chunk) csize = len(data) - self.repository.put(id, data, wait=False) + self.repository.put(id, data, wait=wait) self.chunks.add(id, 1, size, csize) stats.update(size, csize, not refcount) return ChunkListEntry(id, size, csize) @@ -554,13 +554,13 @@ Chunk index: {0.total_unique_chunks:20d} {0.total_chunks:20d}""" stats.update(size, csize, False) return ChunkListEntry(id, size, csize) - def chunk_decref(self, id, stats): + def chunk_decref(self, id, stats, wait=True): if not self.txn_active: self.begin_txn() count, size, csize = self.chunks.decref(id) if count == 0: del self.chunks[id] - self.repository.delete(id, wait=False) + self.repository.delete(id, wait=wait) stats.update(-size, -csize, True) else: stats.update(-size, -csize, False) diff --git a/src/borg/remote.py b/src/borg/remote.py index 7bb9ece04..d38a66e51 100644 --- a/src/borg/remote.py +++ b/src/borg/remote.py @@ -513,6 +513,7 @@ class RemoteRepository: self.chunkid_to_msgids = {} self.ignore_responses = set() self.responses = {} + self.async_responses = {} self.ratelimit = SleepingBandwidthLimiter(args.remote_ratelimit * 1024 if args and args.remote_ratelimit else 0) self.unpacker = get_limited_unpacker('client') self.server_version = parse_version('1.0.8') # fallback version if server is too old to send version information @@ -670,8 +671,8 @@ This problem will go away as soon as the server has been upgraded to 1.0.7+. 
for resp in self.call_many(cmd, [args], **kw): return resp - def call_many(self, cmd, calls, wait=True, is_preloaded=False): - if not calls: + def call_many(self, cmd, calls, wait=True, is_preloaded=False, async_wait=True): + if not calls and cmd != 'async_responses': return def pop_preload_msgid(chunkid): @@ -726,6 +727,22 @@ This problem will go away as soon as the server has been upgraded to 1.0.7+. return except KeyError: break + if cmd == 'async_responses': + while True: + try: + msgid, unpacked = self.async_responses.popitem() + except KeyError: + # there is nothing left what we already have received + if async_wait and self.ignore_responses: + # but do not return if we shall wait and there is something left to wait for: + break + else: + return + else: + if b'exception_class' in unpacked: + handle_error(unpacked) + else: + yield unpacked[RESULT] if self.to_send or ((calls or self.preload_ids) and len(waiting_for) < MAX_INFLIGHT): w_fds = [self.stdin_fd] else: @@ -755,8 +772,14 @@ This problem will go away as soon as the server has been upgraded to 1.0.7+. raise UnexpectedRPCDataFormatFromServer(data) if msgid in self.ignore_responses: self.ignore_responses.remove(msgid) + # async methods never return values, but may raise exceptions. if b'exception_class' in unpacked: - handle_error(unpacked) + self.async_responses[msgid] = unpacked + else: + # we currently do not have async result values except "None", + # so we do not add them into async_responses. + if unpacked[RESULT] is not None: + self.async_responses[msgid] = unpacked else: self.responses[msgid] = unpacked elif fd is self.stderr_fd: @@ -805,7 +828,7 @@ This problem will go away as soon as the server has been upgraded to 1.0.7+. 
# that the fd should be writable if e.errno != errno.EAGAIN: raise - self.ignore_responses |= set(waiting_for) + self.ignore_responses |= set(waiting_for) # we lose order here @api(since=parse_version('1.0.0'), append_only={'since': parse_version('1.0.7'), 'previously': False}) @@ -883,6 +906,10 @@ This problem will go away as soon as the server has been upgraded to 1.0.7+. self.p.wait() self.p = None + def async_response(self, wait=True): + for resp in self.call_many('async_responses', calls=[], wait=True, async_wait=wait): + return resp + def preload(self, ids): self.preload_ids += ids diff --git a/src/borg/repository.py b/src/borg/repository.py index d7b84ab38..862cd0561 100644 --- a/src/borg/repository.py +++ b/src/borg/repository.py @@ -862,6 +862,11 @@ class Repository: yield self.get(id_) def put(self, id, data, wait=True): + """put a repo object + + Note: when doing calls with wait=False this gets async and caller must + deal with async results / exceptions later. + """ if not self._active_txn: self.prepare_txn(self.get_transaction_id()) try: @@ -881,6 +886,11 @@ class Repository: self.index[id] = segment, offset def delete(self, id, wait=True): + """delete a repo object + + Note: when doing calls with wait=False this gets async and caller must + deal with async results / exceptions later. + """ if not self._active_txn: self.prepare_txn(self.get_transaction_id()) try: @@ -895,6 +905,17 @@ class Repository: self.compact[segment] += size self.segments.setdefault(segment, 0) + def async_response(self, wait=True): + """Get one async result (only applies to remote repositories). + + async commands (== calls with wait=False, e.g. delete and put) have no results, + but may raise exceptions. These async exceptions must get collected later via + async_response() calls. Repeat the call until it returns None. + The previous calls might either return one (non-None) result or raise an exception. 
+ If wait=True is given and there are outstanding responses, it will wait for them + to arrive. With wait=False, it will only return already received responses. + """ + def preload(self, ids): """Preload objects (only applies to remote repositories) """ diff --git a/src/borg/testsuite/archive.py b/src/borg/testsuite/archive.py index 7bf3f5b63..dc172d438 100644 --- a/src/borg/testsuite/archive.py +++ b/src/borg/testsuite/archive.py @@ -63,10 +63,15 @@ This archive: 20 B 10 B 10 B"" class MockCache: + class MockRepo: + def async_response(self, wait=True): + pass + def __init__(self): self.objects = {} + self.repository = self.MockRepo() - def add_chunk(self, id, chunk, stats=None): + def add_chunk(self, id, chunk, stats=None, wait=True): self.objects[id] = chunk.data return id, len(chunk.data), len(chunk.data) diff --git a/src/borg/testsuite/archiver.py b/src/borg/testsuite/archiver.py index 52cb05a6a..e44f2753b 100644 --- a/src/borg/testsuite/archiver.py +++ b/src/borg/testsuite/archiver.py @@ -1255,7 +1255,8 @@ class ArchiverTestCase(ArchiverTestCaseBase): repository.delete(first_chunk_id) repository.commit() break - self.cmd('delete', '--force', self.repository_location + '::test') + output = self.cmd('delete', '--force', self.repository_location + '::test') + self.assert_in('deleted archive was corrupted', output) self.cmd('check', '--repair', self.repository_location) output = self.cmd('list', self.repository_location) self.assert_not_in('test', output) From a9088135aa020311ac4f854e501c9ffbcaf8b002 Mon Sep 17 00:00:00 2001 From: Thomas Waldmann Date: Tue, 21 Mar 2017 23:44:47 +0100 Subject: [PATCH 2/2] RemoteRepository: shutdown with timeout --- src/borg/remote.py | 8 ++++++++ src/borg/repository.py | 1 + 2 files changed, 9 insertions(+) diff --git a/src/borg/remote.py b/src/borg/remote.py index d38a66e51..d3af82707 100644 --- a/src/borg/remote.py +++ b/src/borg/remote.py @@ -514,6 +514,7 @@ class RemoteRepository: self.ignore_responses = set() self.responses = 
{} self.async_responses = {} + self.shutdown_time = None self.ratelimit = SleepingBandwidthLimiter(args.remote_ratelimit * 1024 if args and args.remote_ratelimit else 0) self.unpacker = get_limited_unpacker('client') self.server_version = parse_version('1.0.8') # fallback version if server is too old to send version information @@ -605,6 +606,7 @@ This problem will go away as soon as the server has been upgraded to 1.0.7+. def __exit__(self, exc_type, exc_val, exc_tb): try: if exc_type is not None: + self.shutdown_time = time.monotonic() + 30 self.rollback() finally: # in any case, we want to cleanly close the repo, even if the @@ -715,6 +717,12 @@ This problem will go away as soon as the server has been upgraded to 1.0.7+. calls = list(calls) waiting_for = [] while wait or calls: + if self.shutdown_time and time.monotonic() > self.shutdown_time: + # we are shutting this RemoteRepository down already, make sure we do not waste + # a lot of time in case a lot of async stuff is coming in or remote is gone or slow. + logger.debug('shutdown_time reached, shutting down with %d waiting_for and %d async_responses.', + len(waiting_for), len(self.async_responses)) + return while waiting_for: try: unpacked = self.responses.pop(waiting_for[0]) diff --git a/src/borg/repository.py b/src/borg/repository.py index 862cd0561..2073eec07 100644 --- a/src/borg/repository.py +++ b/src/borg/repository.py @@ -785,6 +785,7 @@ class Repository: self._active_txn = False def rollback(self): + # note: when used in remote mode, this is time limited, see RemoteRepository.shutdown_time. self._rollback(cleanup=False) def __len__(self):