From c125ecc2f9effff465dfb84ea1f01eb4b0e1b661 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Jonas=20Borgstr=C3=B6m?= Date: Wed, 6 Jul 2011 22:16:07 +0200 Subject: [PATCH] Some experimental remote store optimizations --- darc/archive.py | 11 ++++---- darc/remote.py | 75 ++++++++++++++++++++++++++++++++++++++----------- darc/store.py | 4 +++ 3 files changed, 68 insertions(+), 22 deletions(-) diff --git a/darc/archive.py b/darc/archive.py index 69f0417a4..ab5b297e7 100644 --- a/darc/archive.py +++ b/darc/archive.py @@ -6,6 +6,7 @@ import os import socket import stat import sys +from itertools import izip from xattr import xattr, XATTR_NOFOLLOW from . import NS_ARCHIVE_METADATA, NS_ARCHIVE_ITEMS, NS_ARCHIVE_CHUNKS, NS_CHUNK, \ @@ -196,14 +197,14 @@ class Archive(object): os.utime(path, (item['atime'], item['mtime'])) def verify_file(self, item): - for id in item['chunks']: - try: - magic, data, hash = self.keychain.decrypt(self.store.get(NS_CHUNK, id)) + try: + for id, chunk in izip(item['chunks'], self.store.get_many(NS_CHUNK, item['chunks'])): + magic, data, hash = self.keychain.decrypt(chunk) assert magic == PACKET_CHUNK if self.keychain.id_hash(data) != id: raise IntegrityError('chunk id did not match') - except IntegrityError: - return False + except IntegrityError: + return False return True def delete(self, cache): diff --git a/darc/remote.py b/darc/remote.py index 17cf116bd..1edffe052 100644 --- a/darc/remote.py +++ b/darc/remote.py @@ -16,7 +16,7 @@ class ChannelNotifyer(object): def __init__(self, channel): self.channel = channel - self.enabled = True + self.enabled = 0 def set(self): if self.enabled: @@ -55,7 +55,12 @@ class StoreServer(object): sys.stdout.write(msgpack.packb((1, msgid, e.__class__.__name__, None))) else: if method not in ('put', 'delete'): - sys.stdout.write(msgpack.packb((1, msgid, None, res))) + if hasattr(res, 'next'): + for r in res: + sys.stdout.write(msgpack.packb((1, msgid, None, r))) + sys.stdout.write(msgpack.packb((2, msgid, None, None))) + else: + sys.stdout.write(msgpack.packb((1, msgid, None, res))) sys.stdout.flush() if es: return @@ -106,18 +111,12 @@ class RemoteStore(object): self.channel.in_stderr_buffer.set_event(self.notifier) self.channel.exec_command('darc serve') self.msgid = 0 - self.id, self.tid = self._cmd('open', (location.path, create)) + self.id, self.tid = self.cmd('open', (location.path, create)) - def _cmd(self, *args, **kw): - self.notifier.enabled = True - try: - return self._cmd2(*args, **kw) - finally: - self.notifier.enabled = False - - def _cmd2(self, cmd, args, defer=False): + def cmd(self, cmd, args, defer=False): self.msgid += 1 odata = msgpack.packb((0, self.msgid, cmd, args)) + self.notifier.enabled += 1 while True: if self.channel.closed: raise Exception('Connection closed') @@ -126,6 +125,7 @@ class RemoteStore(object): if n > 0: odata = odata[n:] if not odata and defer: + self.notifier.enabled -= 1 return elif self.channel.recv_stderr_ready(): print >> sys.stderr, 'remote stderr:', self.channel.recv_stderr(BUFSIZE) @@ -134,21 +134,62 @@ class RemoteStore(object): for type, msgid, error, res in self.unpacker: if error: raise self.RPCError(error) + self.notifier.enabled -= 1 return res else: with self.channel.lock: self.channel.out_buffer_cv.wait(10) + def cmd_iter(self, cmd, args): + self.msgid += 1 + odata = msgpack.packb((0, self.msgid, cmd, args)) + self.notifier.enabled += 1 + try: + while True: + if self.channel.closed: + raise Exception('Connection closed') + if odata and self.channel.send_ready(): + n = self.channel.send(odata) + if n > 0: + odata = odata[n:] + if self.channel.recv_stderr_ready(): + print >> sys.stderr, 'remote stderr:', self.channel.recv_stderr(BUFSIZE) + elif self.channel.recv_ready(): + self.unpacker.feed(self.channel.recv(BUFSIZE)) + for type, msgid, error, res in self.unpacker: + if msgid < self.msgid: + continue + if error: + raise self.RPCError(error) + if type == 1: + yield res + else: + return + else: + with self.channel.lock: + self.channel.out_buffer_cv.wait(10) + finally: + self.notifier.enabled -= 1 + def commit(self, *args): - self._cmd('commit', args) + self.cmd('commit', args) self.tid += 1 def rollback(self, *args): - return self._cmd('rollback', args) + return self.cmd('rollback', args) def get(self, *args): try: - return self._cmd('get', args) + return self.cmd('get', args) + except self.RPCError, e: + print e.name + if e.name == 'DoesNotExist': + raise self.DoesNotExist + raise + + def get_many(self, *args): + try: + return self.cmd_iter('get_many', args) except self.RPCError, e: if e.name == 'DoesNotExist': raise self.DoesNotExist @@ -156,14 +197,14 @@ class RemoteStore(object): def put(self, *args): try: - return self._cmd('put', args, defer=True) + return self.cmd('put', args, defer=True) except self.RPCError, e: if e.name == 'AlreadyExists': raise self.AlreadyExists def delete(self, *args): - return self._cmd('delete', args, defer=True) + return self.cmd('delete', args, defer=True) def list(self, *args): - return self._cmd('list', args) + return self.cmd('list', args) diff --git a/darc/store.py b/darc/store.py index 6b3cd7b2c..9543f30f4 100644 --- a/darc/store.py +++ b/darc/store.py @@ -185,6 +185,10 @@ class Store(object): except KeyError: raise self.DoesNotExist + def get_many(self, ns, ids): + for id in ids: + yield self.get(ns, id) + def put(self, ns, id, data): if not self.txn_active: self.begin_txn()