From 217da798ab47087b5cb58ec7140f5f562e90631f Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Jonas=20Borgstr=C3=B6m?= Date: Mon, 12 Sep 2011 21:34:09 +0200 Subject: [PATCH] Added generic callback support --- darc/archive.py | 10 ++++++---- darc/archiver.py | 8 +++++--- darc/remote.py | 11 ++++++----- darc/store.py | 3 +++ 4 files changed, 20 insertions(+), 12 deletions(-) diff --git a/darc/archive.py b/darc/archive.py index 13fe4f26a..cc61c87e3 100644 --- a/darc/archive.py +++ b/darc/archive.py @@ -203,10 +203,12 @@ def extract_item(self, item, dest=None, start_cb=None, restore_attrs=True): os.makedirs(os.path.dirname(path)) # Hard link? if 'source' in item: - source = os.path.join(dest, item['source']) - if os.path.exists(path): - os.unlink(path) - os.link(source, path) + def link_cb(_, __, item): + source = os.path.join(dest, item['source']) + if os.path.exists(path): + os.unlink(path) + os.link(source, path) + self.store.add_callback(link_cb, item) else: def extract_cb(chunk, error, (id, i)): if i == 0: diff --git a/darc/archiver.py b/darc/archiver.py index 4550b580c..a0256ca4b 100644 --- a/darc/archiver.py +++ b/darc/archiver.py @@ -137,8 +137,10 @@ def extract_cb(item): else: archive.extract_item(item, args.dest, start_cb) if dirs and not item['path'].startswith(dirs[-1]['path']): - # Extract directories twice to make sure mtime is correctly restored - archive.extract_item(dirs.pop(-1), args.dest) + def cb(_, __, item): + # Extract directories twice to make sure mtime is correctly restored + archive.extract_item(item, args.dest) + store.add_callback(cb, dirs.pop(-1)) store = self.open_store(args.archive) key = Key(store) manifest = Manifest(store, key) @@ -343,7 +345,7 @@ def run(self, args=None): subparser.add_argument('src', metavar='SRC', type=location_validator(), help='Store/Archive to list contents of') - subparser= subparsers.add_parser('verify', parents=[common_parser]) + subparser = subparsers.add_parser('verify', parents=[common_parser]) subparser.set_defaults(func=self.do_verify) subparser.add_argument('-i', '--include', dest='patterns', type=IncludePattern, action='append', diff --git a/darc/remote.py b/darc/remote.py index 99d51cf5f..e36a3a488 100644 --- a/darc/remote.py +++ b/darc/remote.py @@ -125,7 +125,7 @@ def cmd(self, cmd, args, callback=None, callback_data=None): self.odata.append(msgpack.packb((1, self.msgid, cmd, args))) self.recursion += 1 if callback: - self.callbacks[self.msgid] = callback, callback_data + self.add_callback(callback, callback_data) if self.recursion > 1: self.recursion -= 1 return @@ -146,8 +146,7 @@ def cmd(self, cmd, args, callback=None, callback_data=None): self.recursion -= 1 return res else: - c, d = self.callbacks.pop(msgid, (None, None)) - if c: + for c, d in self.callbacks.pop(msgid, []): c(res, error, d) elif self.odata and self.channel.send_ready(): data = self.odata.pop(0) @@ -184,6 +183,9 @@ def put(self, id, data, callback=None, callback_data=None): def delete(self, id, callback=None, callback_data=None): return self.cmd('delete', (id, ), callback, callback_data) + def add_callback(self, cb, data): + self.callbacks.setdefault(self.msgid, []).append((cb, data)) + def flush_rpc(self, counter=None, backlog=0): counter = counter or self.notifier.enabled while counter > backlog: @@ -199,8 +201,7 @@ def flush_rpc(self, counter=None, backlog=0): self.unpacker.feed(self.channel.recv(BUFSIZE)) for type, msgid, error, res in self.unpacker: self.notifier.enabled.dec() - c, d = self.callbacks.pop(msgid, (None, None)) - if c: + for c, d in self.callbacks.pop(msgid, []): c(res, error, d) if msgid == self.msgid: return diff --git a/darc/store.py b/darc/store.py index 6a042e998..da98fe275 100644 --- a/darc/store.py +++ b/darc/store.py @@ -235,6 +235,9 @@ def delete(self, id): def flush_rpc(self, *args): pass + def add_callback(self, cb, data): + cb(None, None, data) + class LoggedIO(object):