From 3f20f8ed7dbfe59d1ef7e5ba1c034c79e0502e13 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Jonas=20Borgstr=C3=B6m?= Date: Sun, 17 Jul 2011 23:53:23 +0200 Subject: [PATCH] Asynchronous extract and verify mode --- darc/archive.py | 62 +++++++++++++++++++++++++++++------------------- darc/archiver.py | 23 +++++++++++------- darc/remote.py | 44 ++++++++++++++++++++++++---------- darc/store.py | 3 +++ 4 files changed, 88 insertions(+), 44 deletions(-) diff --git a/darc/archive.py b/darc/archive.py index a38ef3160..05f66397b 100644 --- a/darc/archive.py +++ b/darc/archive.py @@ -126,7 +126,7 @@ class Archive(object): usize += size return osize, csize, usize - def extract_item(self, item, dest=None): + def extract_item(self, item, dest=None, start_cb=None): dest = dest or os.getcwdu() dir_stat_queue = [] assert item['path'][0] not in ('/', '\\', ':') @@ -159,17 +159,29 @@ class Archive(object): os.unlink(path) os.link(source, path) else: - with open(path, 'wb') as fd: - for id in item['chunks']: - try: - magic, data, hash = self.keychain.decrypt(self.store.get(NS_CHUNK, id)) - assert magic == PACKET_CHUNK - if self.keychain.id_hash(data) != id: - raise IntegrityError('chunk hash did not match') - fd.write(data) - except ValueError: - raise Exception('Invalid chunk checksum') - self.restore_attrs(path, item) + def extract_cb(chunk, error, (id, i, last)): + if i==0: + start_cb(item) + assert not error + magic, data, hash = self.keychain.decrypt(chunk) + assert magic == PACKET_CHUNK + if self.keychain.id_hash(data) != id: + raise IntegrityError('chunk hash did not match') + fd.write(data) + if last: + self.restore_attrs(path, item) + fd.close() + + fd = open(path, 'wb') + n = len(item['chunks']) + if n == 0: + start_cb(item) + self.restore_attrs(path, item) + fd.close() + else: + for i, id in enumerate(item['chunks']): + self.store.get(NS_CHUNK, id, callback=extract_cb, callback_data=(id, i, i==n-1)) + else: raise Exception('Unknown archive item type %r' % item['mode']) @@ -196,22 +208,24 @@ class Archive(object): # FIXME: We should really call futimes here (c extension required) os.utime(path, (item['atime'], item['mtime'])) - def verify_file(self, item): - def verify_chunk(chunk, error, id): + def verify_file(self, item, start, result): + def verify_chunk(chunk, error, (id, i, last)): + if i == 0: + start(item) assert not error magic, data, hash = self.keychain.decrypt(chunk) assert magic == PACKET_CHUNK if self.keychain.id_hash(data) != id: - raise IntegrityError() - try: - for id in item['chunks'][:-1]: - self.store.get(NS_CHUNK, id, callback=verify_chunk, callback_data=id) - last = item['chunks'][-1] - chunk = self.store.get(NS_CHUNK, last) - verify_chunk(chunk, None, last) - return True - except IntegrityError: - return False + result(item, False) + elif last: + result(item, True) + n = len(item['chunks']) + if n == 0: + start(item) + result(item, True) + else: + for i, id in enumerate(item['chunks']): + self.store.get(NS_CHUNK, id, callback=verify_chunk, callback_data=(id, i, i==n-1)) def delete(self, cache): for id, size in self.get_chunks(): diff --git a/darc/archiver.py b/darc/archiver.py index bc9477d83..16f418413 100644 --- a/darc/archiver.py +++ b/darc/archiver.py @@ -109,6 +109,8 @@ class Archiver(object): self.print_error('Unknown file type: %s', path) def do_extract(self, args): + def start_cb(item): + self.print_verbose(item['path'].decode('utf-8')) store = self.open_store(args.archive) keychain = Keychain(args.keychain) archive = Archive(store, keychain, args.archive.archive) @@ -116,13 +118,13 @@ class Archiver(object): for item in archive.get_items(): if exclude_path(item['path'], args.patterns): continue - self.print_verbose(item['path'].decode('utf-8')) - archive.extract_item(item, args.dest) + archive.extract_item(item, args.dest, start_cb) if stat.S_ISDIR(item['mode']): dirs.append(item) if dirs and not item['path'].startswith(dirs[-1]['path']): # Extract directories twice to make sure mtime is correctly restored archive.extract_item(dirs.pop(-1), args.dest) + store.flush_rpc() while dirs: archive.extract_item(dirs.pop(-1), args.dest) return self.exit_code @@ -166,16 +168,21 @@ class Archiver(object): store = self.open_store(args.archive) keychain = Keychain(args.keychain) archive = Archive(store, keychain, args.archive.archive) + def start_cb(item): + self.print_verbose('%s ...', item['path'].decode('utf-8'), newline=False) + def result_cb(item, success): + if success: + self.print_verbose('OK') + else: + self.print_verbose('ERROR') + self.print_error('%s: verification failed' % item['path']) + for item in archive.get_items(): if exclude_path(item['path'], args.patterns): continue if stat.S_ISREG(item['mode']) and not 'source' in item: - self.print_verbose('%s ...', item['path'].decode('utf-8'), newline=False) - if archive.verify_file(item): - self.print_verbose('OK') - else: - self.print_verbose('ERROR') - self.print_error('%s: verification failed' % item['path']) + archive.verify_file(item, start_cb, result_cb) + store.flush_rpc() return self.exit_code def do_info(self, args): diff --git a/darc/remote.py b/darc/remote.py index 1820f70fe..b3fe7b35e 100644 --- a/darc/remote.py +++ b/darc/remote.py @@ -108,21 +108,22 @@ class RemoteStore(object): self.msgid = 0 self.id, self.tid = self.cmd('open', (location.path, create)) + def wait(self): + with self.channel.lock: + if (self.channel.out_window_size == 0 and + not self.channel.recv_ready() and + not self.channel.recv_stderr_ready()): + self.channel.out_buffer_cv.wait(10) + def cmd(self, cmd, args, callback=None, callback_data=None): self.msgid += 1 self.notifier.enabled += 1 + odata = msgpack.packb((0, self.msgid, cmd, args)) if callback: self.callbacks[self.msgid] = callback, callback_data - odata = msgpack.packb((0, self.msgid, cmd, args)) while True: if self.channel.closed: raise Exception('Connection closed') - if odata and self.channel.send_ready(): - n = self.channel.send(odata) - if n > 0: - odata = odata[n:] - if not odata and callback: - return elif self.channel.recv_stderr_ready(): print >> sys.stderr, 'remote stderr:', self.channel.recv_stderr(BUFSIZE) elif self.channel.recv_ready(): @@ -137,12 +138,14 @@ class RemoteStore(object): c, d = self.callbacks.pop(msgid, (None, None)) if c: c(res, error, d) + elif odata and self.channel.send_ready(): + n = self.channel.send(odata) + if n > 0: + odata = odata[n:] + if not odata and callback: + return else: - with self.channel.lock: - if (self.channel.out_window_size == 0 and - not self.channel.recv_ready() and - not self.channel.recv_stderr_ready()): - self.channel.out_buffer_cv.wait(10) + self.wait() def commit(self, *args): self.cmd('commit', args) @@ -173,3 +176,20 @@ class RemoteStore(object): def list(self, *args): return self.cmd('list', args) + def flush_rpc(self): + while True: + if self.channel.closed: + raise Exception('Connection closed') + elif self.channel.recv_stderr_ready(): + print >> sys.stderr, 'remote stderr:', self.channel.recv_stderr(BUFSIZE) + elif self.channel.recv_ready(): + self.unpacker.feed(self.channel.recv(BUFSIZE)) + for type, msgid, error, res in self.unpacker: + self.notifier.enabled -= 1 + c, d = self.callbacks.pop(msgid, (None, None)) + if c: + c(res, error, d) + if msgid == self.msgid: + return + else: + self.wait() diff --git a/darc/store.py b/darc/store.py index 8e113f5f1..dcc9ae92d 100644 --- a/darc/store.py +++ b/darc/store.py @@ -211,6 +211,9 @@ class Store(object): def list(self, ns, marker=None, limit=1000000): return [key for key, value in self.get_index(ns).iteritems(marker=marker, limit=limit)] + def flush_rpc(self): + pass + class BandIO(object):