implement async_response, add wait=True for add_chunk/chunk_decref

Before this changeset, async responses were:
- if not an error: ignored
- if an error: raised as the response to an arbitrary, unrelated later command

Now, after sending async commands, the async_response command must be used
to process outstanding responses / exceptions.

We avoid piling up lots of unprocessed results when latency is high: we do NOT
first wait until ALL responses have arrived, but can begin processing responses
as soon as they come in.
Calls with wait=False just return what has already been received.
Repeated calls with wait=True, until None is returned, will fetch all responses.
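
For illustration (not code from this changeset; assumes `repository` is a
RemoteRepository on which some commands were already sent with wait=False),
the two consumption styles look roughly like this, matching how the diffs
below use them:

    # opportunistic, non-blocking: pick up whatever has already arrived
    # (returns None immediately if nothing is buffered yet)
    repository.async_response(wait=False)

    # full drain: repeat with wait=True until None, which means "all done";
    # exceptions from failed async commands are raised out of these calls
    while repository.async_response(wait=True) is not None:
        pass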

Async commands could now actually return non-exception, non-None results, but
this is not used yet. None responses are still dropped.

The motivation for this is to have a clear separation between a request
blowing up because it (itself) failed, and failures unrelated to that request /
to that line in the source code.

also: fix processing for async repo obj deletes

exception_ignored is a special object that is "not None" (as None is used to signal
"finished processing async results") but is also not a potential async response result value.

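The pattern in isolation (a generic sketch; `repository` and `HarmlessError`
are invented placeholders, not names from this changeset):

    exception_ignored = object()  # unique sentinel: "not None", equal to nothing else

    def fetch_async_response(wait=True):
        try:
            # may legitimately return None, meaning "all async results processed"
            return repository.async_response(wait=wait)
        except HarmlessError:
            # swallowed on purpose - but returning None here would make the
            # caller's drain loop stop early, so return the sentinel instead
            return exception_ignored
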
Also:

added wait=True to chunk_decref() and add_chunk()

This makes async processing explicit: the default is synchronous, and you only
need to be careful and take extra steps for async processing if you explicitly
request async by calling with wait=False (usually for speed reasons).

To process async results, use async_response; see above.
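
In code, the two modes look like this (a sketch; cache / repository as used
elsewhere in this commit):

    # default: synchronous - a failure raises right at this call site
    cache.chunk_decref(id, stats)

    # explicit opt-in to async (pipelined, faster) - failures surface later
    cache.chunk_decref(id, stats, wait=False)
    while repository.async_response(wait=True) is not None:
        pass  # collect deferred results / re-raise deferred exceptions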
Thomas Waldmann 2017-03-05 05:19:32 +01:00
parent 713889dbfd
commit 945880af47
6 changed files with 92 additions and 19 deletions

src/borg/archive.py

@@ -260,7 +260,8 @@ class CacheChunkBuffer(ChunkBuffer):
         self.stats = stats

     def write_chunk(self, chunk):
-        id_, _, _ = self.cache.add_chunk(self.key.id_hash(chunk.data), chunk, self.stats)
+        id_, _, _ = self.cache.add_chunk(self.key.id_hash(chunk.data), chunk, self.stats, wait=False)
+        self.cache.repository.async_response(wait=False)
         return id_
@@ -469,6 +470,8 @@ Utilization of max. archive size: {csize_max:.0%}
         data = self.key.pack_and_authenticate_metadata(metadata.as_dict(), context=b'archive')
         self.id = self.key.id_hash(data)
         self.cache.add_chunk(self.id, Chunk(data), self.stats)
+        while self.repository.async_response(wait=True) is not None:
+            pass
         self.manifest.archives[name] = (self.id, metadata.time)
         self.manifest.write()
         self.repository.commit()
@@ -730,18 +733,27 @@ Utilization of max. archive size: {csize_max:.0%}
         class ChunksIndexError(Error):
             """Chunk ID {} missing from chunks index, corrupted chunks index - aborting transaction."""

-        def chunk_decref(id, stats):
-            nonlocal error
+        exception_ignored = object()
+
+        def fetch_async_response(wait=True):
             try:
-                self.cache.chunk_decref(id, stats)
-            except KeyError:
-                cid = bin_to_hex(id)
-                raise ChunksIndexError(cid)
+                return self.repository.async_response(wait=wait)
             except Repository.ObjectNotFound as e:
+                nonlocal error
                 # object not in repo - strange, but we wanted to delete it anyway.
                 if forced == 0:
                     raise
                 error = True
+                return exception_ignored  # must not return None here
+
+        def chunk_decref(id, stats):
+            try:
+                self.cache.chunk_decref(id, stats, wait=False)
+            except KeyError:
+                cid = bin_to_hex(id)
+                raise ChunksIndexError(cid)
+            else:
+                fetch_async_response(wait=False)

         error = False
         try:
@@ -778,6 +790,10 @@ Utilization of max. archive size: {csize_max:.0%}
             # some harmless exception.
             chunk_decref(self.id, stats)
             del self.manifest.archives[self.name]
+            while fetch_async_response(wait=True) is not None:
+                # we did async deletes, process outstanding results (== exceptions),
+                # so there is nothing pending when we return and our caller wants to commit.
+                pass
         if error:
             logger.warning('forced deletion succeeded, but the deleted archive was corrupted.')
             logger.warning('borg check --repair is required to free all space.')
@@ -865,7 +881,9 @@ Utilization of max. archive size: {csize_max:.0%}
     def chunk_file(self, item, cache, stats, chunk_iter, chunk_processor=None, **chunk_kw):
         if not chunk_processor:
             def chunk_processor(data):
-                return cache.add_chunk(self.key.id_hash(data), Chunk(data, **chunk_kw), stats)
+                chunk_entry = cache.add_chunk(self.key.id_hash(data), Chunk(data, **chunk_kw), stats, wait=False)
+                self.cache.repository.async_response(wait=False)
+                return chunk_entry
         item.chunks = []
         from_chunk = 0
@@ -1654,7 +1672,8 @@ class ArchiveRecreater:
                 if Compressor.detect(old_chunk.data).name == compression_spec.name:
                     # Stored chunk has the same compression we wanted
                     overwrite = False
-            chunk_entry = self.cache.add_chunk(chunk_id, chunk, target.stats, overwrite=overwrite)
+            chunk_entry = self.cache.add_chunk(chunk_id, chunk, target.stats, overwrite=overwrite, wait=False)
+            self.cache.repository.async_response(wait=False)
             self.seen_chunks.add(chunk_entry.id)
             return chunk_entry

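The archive.py changes above all apply one pattern; condensed into a single
hypothetical helper (the helper name is invented; Chunk as imported in
borg.archive):

    def store_chunks_async(cache, key, blobs, stats):
        for data in blobs:
            # queue the write without blocking on the server round trip
            cache.add_chunk(key.id_hash(data), Chunk(data), stats, wait=False)
            # opportunistically surface errors from earlier async writes
            cache.repository.async_response(wait=False)
        # drain before the caller commits, so nothing stays pending
        while cache.repository.async_response(wait=True) is not None:
            pass
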
src/borg/cache.py

@@ -524,7 +524,7 @@ Chunk index: {0.total_unique_chunks:20d} {0.total_chunks:20d}"""
         self.do_cache = os.path.isdir(archive_path)
         self.chunks = create_master_idx(self.chunks)

-    def add_chunk(self, id, chunk, stats, overwrite=False):
+    def add_chunk(self, id, chunk, stats, overwrite=False, wait=True):
         if not self.txn_active:
             self.begin_txn()
         size = len(chunk.data)
@@ -533,7 +533,7 @@ Chunk index: {0.total_unique_chunks:20d} {0.total_chunks:20d}"""
             return self.chunk_incref(id, stats)
         data = self.key.encrypt(chunk)
         csize = len(data)
-        self.repository.put(id, data, wait=False)
+        self.repository.put(id, data, wait=wait)
         self.chunks.add(id, 1, size, csize)
         stats.update(size, csize, not refcount)
         return ChunkListEntry(id, size, csize)
@@ -554,13 +554,13 @@ Chunk index: {0.total_unique_chunks:20d} {0.total_chunks:20d}"""
         stats.update(size, csize, False)
         return ChunkListEntry(id, size, csize)

-    def chunk_decref(self, id, stats):
+    def chunk_decref(self, id, stats, wait=True):
         if not self.txn_active:
             self.begin_txn()
         count, size, csize = self.chunks.decref(id)
         if count == 0:
             del self.chunks[id]
-            self.repository.delete(id, wait=False)
+            self.repository.delete(id, wait=wait)
             stats.update(-size, -csize, True)
         else:
             stats.update(-size, -csize, False)

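The net effect, in sketch form (cache / id / chunk / stats as in the
surrounding code): the cache methods just forward the flag to the repository
layer, and a put/delete only happens when refcounting actually requires one.

    cache.add_chunk(id, chunk, stats)              # repository.put(..., wait=True)  - sync
    cache.add_chunk(id, chunk, stats, wait=False)  # repository.put(..., wait=False) - async
    cache.chunk_decref(id, stats)                  # repository.delete(..., wait=True)
    cache.chunk_decref(id, stats, wait=False)      # repository.delete(..., wait=False)
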
src/borg/remote.py

@@ -513,6 +513,7 @@ class RemoteRepository:
         self.chunkid_to_msgids = {}
         self.ignore_responses = set()
         self.responses = {}
+        self.async_responses = {}
         self.ratelimit = SleepingBandwidthLimiter(args.remote_ratelimit * 1024 if args and args.remote_ratelimit else 0)
         self.unpacker = get_limited_unpacker('client')
         self.server_version = parse_version('1.0.8')  # fallback version if server is too old to send version information
@@ -670,8 +671,8 @@ This problem will go away as soon as the server has been upgraded to 1.0.7+.
         for resp in self.call_many(cmd, [args], **kw):
             return resp

-    def call_many(self, cmd, calls, wait=True, is_preloaded=False):
-        if not calls:
+    def call_many(self, cmd, calls, wait=True, is_preloaded=False, async_wait=True):
+        if not calls and cmd != 'async_responses':
             return

         def pop_preload_msgid(chunkid):
@@ -726,6 +727,22 @@ This problem will go away as soon as the server has been upgraded to 1.0.7+.
                             return
                 except KeyError:
                     break
+            if cmd == 'async_responses':
+                while True:
+                    try:
+                        msgid, unpacked = self.async_responses.popitem()
+                    except KeyError:
+                        # there is nothing left what we already have received
+                        if async_wait and self.ignore_responses:
+                            # but do not return if we shall wait and there is something left to wait for:
+                            break
+                        else:
+                            return
+                    else:
+                        if b'exception_class' in unpacked:
+                            handle_error(unpacked)
+                        else:
+                            yield unpacked[RESULT]
             if self.to_send or ((calls or self.preload_ids) and len(waiting_for) < MAX_INFLIGHT):
                 w_fds = [self.stdin_fd]
             else:
@@ -755,8 +772,14 @@ This problem will go away as soon as the server has been upgraded to 1.0.7+.
                         raise UnexpectedRPCDataFormatFromServer(data)
                     if msgid in self.ignore_responses:
                         self.ignore_responses.remove(msgid)
-                        # async methods never return values, but may raise exceptions.
                         if b'exception_class' in unpacked:
-                            handle_error(unpacked)
+                            self.async_responses[msgid] = unpacked
+                        else:
+                            # we currently do not have async result values except "None",
+                            # so we do not add them into async_responses.
+                            if unpacked[RESULT] is not None:
+                                self.async_responses[msgid] = unpacked
                     else:
                         self.responses[msgid] = unpacked
             elif fd is self.stderr_fd:
@@ -805,7 +828,7 @@ This problem will go away as soon as the server has been upgraded to 1.0.7+.
                     # that the fd should be writable
                     if e.errno != errno.EAGAIN:
                         raise
-        self.ignore_responses |= set(waiting_for)
+        self.ignore_responses |= set(waiting_for)  # we lose order here

     @api(since=parse_version('1.0.0'),
          append_only={'since': parse_version('1.0.7'), 'previously': False})
@@ -883,6 +906,10 @@ This problem will go away as soon as the server has been upgraded to 1.0.7+.
             self.p.wait()
             self.p = None

+    def async_response(self, wait=True):
+        for resp in self.call_many('async_responses', calls=[], wait=True, async_wait=wait):
+            return resp
+
     def preload(self, ids):
         self.preload_ids += ids

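The async_response() wrapper at the end of this hunk returns the first item
the generator yields, or None if it yields nothing; functionally it could be
written as (an illustrative one-liner, not the committed code):

    def async_response(self, wait=True):
        # one async result, or None when there is nothing (left) to report
        return next(self.call_many('async_responses', calls=[], wait=True, async_wait=wait), None)
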
src/borg/repository.py

@@ -862,6 +862,11 @@ class Repository:
             yield self.get(id_)

     def put(self, id, data, wait=True):
+        """put a repo object
+
+        Note: when doing calls with wait=False this gets async and caller must
+        deal with async results / exceptions later.
+        """
         if not self._active_txn:
             self.prepare_txn(self.get_transaction_id())
         try:
@@ -881,6 +886,11 @@
         self.index[id] = segment, offset

     def delete(self, id, wait=True):
+        """delete a repo object
+
+        Note: when doing calls with wait=False this gets async and caller must
+        deal with async results / exceptions later.
+        """
         if not self._active_txn:
             self.prepare_txn(self.get_transaction_id())
         try:
@@ -895,6 +905,17 @@
         self.compact[segment] += size
         self.segments.setdefault(segment, 0)

+    def async_response(self, wait=True):
+        """Get one async result (only applies to remote repositories).
+
+        async commands (== calls with wait=False, e.g. delete and put) have no results,
+        but may raise exceptions. These async exceptions must get collected later via
+        async_response() calls. Repeat the call until it returns None.
+        The previous calls might either return one (non-None) result or raise an exception.
+        If wait=True is given and there are outstanding responses, it will wait for them
+        to arrive. With wait=False, it will only return already received responses.
+        """
+
     def preload(self, ids):
         """Preload objects (only applies to remote repositories)
         """

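Since failures of wait=False calls surface only from async_response(), a
caller that wants to tolerate a specific error must catch it around the drain
loop, mirroring what Archive.delete does above (a sketch):

    repository.delete(id, wait=False)  # queued; does not raise here, even if id is missing
    try:
        while repository.async_response(wait=True) is not None:
            pass
    except Repository.ObjectNotFound:
        pass  # the failed delete is reported only at this point
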
src/borg/testsuite/archive.py

@@ -63,10 +63,15 @@ This archive: 20 B 10 B 10 B"""
     class MockCache:

+        class MockRepo:
+            def async_response(self, wait=True):
+                pass
+
         def __init__(self):
             self.objects = {}
+            self.repository = self.MockRepo()

-        def add_chunk(self, id, chunk, stats=None):
+        def add_chunk(self, id, chunk, stats=None, wait=True):
             self.objects[id] = chunk.data
             return id, len(chunk.data), len(chunk.data)

src/borg/testsuite/archiver.py

@@ -1255,7 +1255,8 @@ class ArchiverTestCase(ArchiverTestCaseBase):
                     repository.delete(first_chunk_id)
                     repository.commit()
                     break
-        self.cmd('delete', '--force', self.repository_location + '::test')
+        output = self.cmd('delete', '--force', self.repository_location + '::test')
+        self.assert_in('deleted archive was corrupted', output)
         self.cmd('check', '--repair', self.repository_location)
         output = self.cmd('list', self.repository_location)
         self.assert_not_in('test', output)