diff --git a/src/borg/archive.py b/src/borg/archive.py index 6efc304b7..bfc2d533c 100644 --- a/src/borg/archive.py +++ b/src/borg/archive.py @@ -260,7 +260,8 @@ class CacheChunkBuffer(ChunkBuffer): self.stats = stats def write_chunk(self, chunk): - id_, _, _ = self.cache.add_chunk(self.key.id_hash(chunk.data), chunk, self.stats) + id_, _, _ = self.cache.add_chunk(self.key.id_hash(chunk.data), chunk, self.stats, wait=False) + self.cache.repository.async_response(wait=False) return id_ @@ -469,6 +470,8 @@ Utilization of max. archive size: {csize_max:.0%} data = self.key.pack_and_authenticate_metadata(metadata.as_dict(), context=b'archive') self.id = self.key.id_hash(data) self.cache.add_chunk(self.id, Chunk(data), self.stats) + while self.repository.async_response(wait=True) is not None: + pass self.manifest.archives[name] = (self.id, metadata.time) self.manifest.write() self.repository.commit() @@ -730,18 +733,27 @@ Utilization of max. archive size: {csize_max:.0%} class ChunksIndexError(Error): """Chunk ID {} missing from chunks index, corrupted chunks index - aborting transaction.""" - def chunk_decref(id, stats): - nonlocal error + exception_ignored = object() + + def fetch_async_response(wait=True): try: - self.cache.chunk_decref(id, stats) - except KeyError: - cid = bin_to_hex(id) - raise ChunksIndexError(cid) + return self.repository.async_response(wait=wait) except Repository.ObjectNotFound as e: + nonlocal error # object not in repo - strange, but we wanted to delete it anyway. if forced == 0: raise error = True + return exception_ignored # must not return None here + + def chunk_decref(id, stats): + try: + self.cache.chunk_decref(id, stats, wait=False) + except KeyError: + cid = bin_to_hex(id) + raise ChunksIndexError(cid) + else: + fetch_async_response(wait=False) error = False try: @@ -778,6 +790,10 @@ Utilization of max. archive size: {csize_max:.0%} # some harmless exception. chunk_decref(self.id, stats) del self.manifest.archives[self.name] + while fetch_async_response(wait=True) is not None: + # we did async deletes, process outstanding results (== exceptions), + # so there is nothing pending when we return and our caller wants to commit. + pass if error: logger.warning('forced deletion succeeded, but the deleted archive was corrupted.') logger.warning('borg check --repair is required to free all space.') @@ -865,7 +881,9 @@ Utilization of max. archive size: {csize_max:.0%} def chunk_file(self, item, cache, stats, chunk_iter, chunk_processor=None, **chunk_kw): if not chunk_processor: def chunk_processor(data): - return cache.add_chunk(self.key.id_hash(data), Chunk(data, **chunk_kw), stats) + chunk_entry = cache.add_chunk(self.key.id_hash(data), Chunk(data, **chunk_kw), stats, wait=False) + self.cache.repository.async_response(wait=False) + return chunk_entry item.chunks = [] from_chunk = 0 @@ -1654,7 +1672,8 @@ class ArchiveRecreater: if Compressor.detect(old_chunk.data).name == compression_spec.name: # Stored chunk has the same compression we wanted overwrite = False - chunk_entry = self.cache.add_chunk(chunk_id, chunk, target.stats, overwrite=overwrite) + chunk_entry = self.cache.add_chunk(chunk_id, chunk, target.stats, overwrite=overwrite, wait=False) + self.cache.repository.async_response(wait=False) self.seen_chunks.add(chunk_entry.id) return chunk_entry diff --git a/src/borg/cache.py b/src/borg/cache.py index 79c0a4979..b3d7e12f2 100644 --- a/src/borg/cache.py +++ b/src/borg/cache.py @@ -524,7 +524,7 @@ Chunk index: {0.total_unique_chunks:20d} {0.total_chunks:20d}""" self.do_cache = os.path.isdir(archive_path) self.chunks = create_master_idx(self.chunks) - def add_chunk(self, id, chunk, stats, overwrite=False): + def add_chunk(self, id, chunk, stats, overwrite=False, wait=True): if not self.txn_active: self.begin_txn() size = len(chunk.data) @@ -533,7 +533,7 @@ Chunk index: {0.total_unique_chunks:20d} {0.total_chunks:20d}""" return self.chunk_incref(id, stats) data = self.key.encrypt(chunk) csize = len(data) - self.repository.put(id, data, wait=False) + self.repository.put(id, data, wait=wait) self.chunks.add(id, 1, size, csize) stats.update(size, csize, not refcount) return ChunkListEntry(id, size, csize) @@ -554,13 +554,13 @@ Chunk index: {0.total_unique_chunks:20d} {0.total_chunks:20d}""" stats.update(size, csize, False) return ChunkListEntry(id, size, csize) - def chunk_decref(self, id, stats): + def chunk_decref(self, id, stats, wait=True): if not self.txn_active: self.begin_txn() count, size, csize = self.chunks.decref(id) if count == 0: del self.chunks[id] - self.repository.delete(id, wait=False) + self.repository.delete(id, wait=wait) stats.update(-size, -csize, True) else: stats.update(-size, -csize, False) diff --git a/src/borg/remote.py b/src/borg/remote.py index 7bb9ece04..d3af82707 100644 --- a/src/borg/remote.py +++ b/src/borg/remote.py @@ -513,6 +513,8 @@ class RemoteRepository: self.chunkid_to_msgids = {} self.ignore_responses = set() self.responses = {} + self.async_responses = {} + self.shutdown_time = None self.ratelimit = SleepingBandwidthLimiter(args.remote_ratelimit * 1024 if args and args.remote_ratelimit else 0) self.unpacker = get_limited_unpacker('client') self.server_version = parse_version('1.0.8') # fallback version if server is too old to send version information @@ -604,6 +606,7 @@ This problem will go away as soon as the server has been upgraded to 1.0.7+. def __exit__(self, exc_type, exc_val, exc_tb): try: if exc_type is not None: + self.shutdown_time = time.monotonic() + 30 self.rollback() finally: # in any case, we want to cleanly close the repo, even if the @@ -670,8 +673,8 @@ This problem will go away as soon as the server has been upgraded to 1.0.7+. for resp in self.call_many(cmd, [args], **kw): return resp - def call_many(self, cmd, calls, wait=True, is_preloaded=False): - if not calls: + def call_many(self, cmd, calls, wait=True, is_preloaded=False, async_wait=True): + if not calls and cmd != 'async_responses': return def pop_preload_msgid(chunkid): @@ -714,6 +717,12 @@ This problem will go away as soon as the server has been upgraded to 1.0.7+. calls = list(calls) waiting_for = [] while wait or calls: + if self.shutdown_time and time.monotonic() > self.shutdown_time: + # we are shutting this RemoteRepository down already, make sure we do not waste + # a lot of time in case a lot of async stuff is coming in or remote is gone or slow. + logger.debug('shutdown_time reached, shutting down with %d waiting_for and %d async_responses.', + len(waiting_for), len(self.async_responses)) + return while waiting_for: try: unpacked = self.responses.pop(waiting_for[0]) @@ -726,6 +735,22 @@ This problem will go away as soon as the server has been upgraded to 1.0.7+. return except KeyError: break + if cmd == 'async_responses': + while True: + try: + msgid, unpacked = self.async_responses.popitem() + except KeyError: + # there is nothing left what we already have received + if async_wait and self.ignore_responses: + # but do not return if we shall wait and there is something left to wait for: + break + else: + return + else: + if b'exception_class' in unpacked: + handle_error(unpacked) + else: + yield unpacked[RESULT] if self.to_send or ((calls or self.preload_ids) and len(waiting_for) < MAX_INFLIGHT): w_fds = [self.stdin_fd] else: @@ -755,8 +780,14 @@ This problem will go away as soon as the server has been upgraded to 1.0.7+. raise UnexpectedRPCDataFormatFromServer(data) if msgid in self.ignore_responses: self.ignore_responses.remove(msgid) + # async methods never return values, but may raise exceptions. if b'exception_class' in unpacked: - handle_error(unpacked) + self.async_responses[msgid] = unpacked + else: + # we currently do not have async result values except "None", + # so we do not add them into async_responses. + if unpacked[RESULT] is not None: + self.async_responses[msgid] = unpacked else: self.responses[msgid] = unpacked elif fd is self.stderr_fd: @@ -805,7 +836,7 @@ This problem will go away as soon as the server has been upgraded to 1.0.7+. # that the fd should be writable if e.errno != errno.EAGAIN: raise - self.ignore_responses |= set(waiting_for) + self.ignore_responses |= set(waiting_for) # we lose order here @api(since=parse_version('1.0.0'), append_only={'since': parse_version('1.0.7'), 'previously': False}) @@ -883,6 +914,10 @@ This problem will go away as soon as the server has been upgraded to 1.0.7+. self.p.wait() self.p = None + def async_response(self, wait=True): + for resp in self.call_many('async_responses', calls=[], wait=True, async_wait=wait): + return resp + def preload(self, ids): self.preload_ids += ids diff --git a/src/borg/repository.py b/src/borg/repository.py index d7b84ab38..2073eec07 100644 --- a/src/borg/repository.py +++ b/src/borg/repository.py @@ -785,6 +785,7 @@ class Repository: self._active_txn = False def rollback(self): + # note: when used in remote mode, this is time limited, see RemoteRepository.shutdown_time. self._rollback(cleanup=False) def __len__(self): @@ -862,6 +863,11 @@ class Repository: yield self.get(id_) def put(self, id, data, wait=True): + """put a repo object + + Note: when doing calls with wait=False this gets async and caller must + deal with async results / exceptions later. + """ if not self._active_txn: self.prepare_txn(self.get_transaction_id()) try: @@ -881,6 +887,11 @@ class Repository: self.index[id] = segment, offset def delete(self, id, wait=True): + """delete a repo object + + Note: when doing calls with wait=False this gets async and caller must + deal with async results / exceptions later. + """ if not self._active_txn: self.prepare_txn(self.get_transaction_id()) try: @@ -895,6 +906,17 @@ class Repository: self.compact[segment] += size self.segments.setdefault(segment, 0) + def async_response(self, wait=True): + """Get one async result (only applies to remote repositories). + + async commands (== calls with wait=False, e.g. delete and put) have no results, + but may raise exceptions. These async exceptions must get collected later via + async_response() calls. Repeat the call until it returns None. + The previous calls might either return one (non-None) result or raise an exception. + If wait=True is given and there are outstanding responses, it will wait for them + to arrive. With wait=False, it will only return already received responses. + """ + def preload(self, ids): """Preload objects (only applies to remote repositories) """ diff --git a/src/borg/testsuite/archive.py b/src/borg/testsuite/archive.py index 7bf3f5b63..dc172d438 100644 --- a/src/borg/testsuite/archive.py +++ b/src/borg/testsuite/archive.py @@ -63,10 +63,15 @@ This archive: 20 B 10 B 10 B"" class MockCache: + class MockRepo: + def async_response(self, wait=True): + pass + def __init__(self): self.objects = {} + self.repository = self.MockRepo() - def add_chunk(self, id, chunk, stats=None): + def add_chunk(self, id, chunk, stats=None, wait=True): self.objects[id] = chunk.data return id, len(chunk.data), len(chunk.data) diff --git a/src/borg/testsuite/archiver.py b/src/borg/testsuite/archiver.py index 52cb05a6a..e44f2753b 100644 --- a/src/borg/testsuite/archiver.py +++ b/src/borg/testsuite/archiver.py @@ -1255,7 +1255,8 @@ class ArchiverTestCase(ArchiverTestCaseBase): repository.delete(first_chunk_id) repository.commit() break - self.cmd('delete', '--force', self.repository_location + '::test') + output = self.cmd('delete', '--force', self.repository_location + '::test') + self.assert_in('deleted archive was corrupted', output) self.cmd('check', '--repair', self.repository_location) output = self.cmd('list', self.repository_location) self.assert_not_in('test', output)