diff --git a/darc/remote.py b/darc/remote.py index 55d83260a..17cf116bd 100644 --- a/darc/remote.py +++ b/darc/remote.py @@ -12,6 +12,21 @@ from .store import Store BUFSIZE = 1024 * 1024 +class ChannelNotifyer(object): + + def __init__(self, channel): + self.channel = channel + self.enabled = True + + def set(self): + if self.enabled: + with self.channel.lock: + self.channel.out_buffer_cv.notifyAll() + + def clear(self): + pass + + class StoreServer(object): def __init__(self): @@ -39,7 +54,8 @@ class StoreServer(object): except Exception, e: sys.stdout.write(msgpack.packb((1, msgid, e.__class__.__name__, None))) else: - sys.stdout.write(msgpack.packb((1, msgid, None, res))) + if method not in ('put', 'delete'): + sys.stdout.write(msgpack.packb((1, msgid, None, res))) sys.stdout.flush() if es: return @@ -75,38 +91,53 @@ class RemoteStore(object): self.client.connect(**params) break except (paramiko.PasswordRequiredException, - paramiko.AuthenticationException, - paramiko.SSHException): - if not 'password' in params: - params['password'] = getpass.getpass('Password for %(username)s@%(hostname)s:' % params) - else: - raise + paramiko.AuthenticationException, + paramiko.SSHException): + if not 'password' in params: + params['password'] = getpass.getpass('Password for %(username)s@%(hostname)s:' % params) + else: + raise self.unpacker = msgpack.Unpacker() self.transport = self.client.get_transport() self.channel = self.transport.open_session() + self.notifier = ChannelNotifyer(self.channel) + self.channel.in_buffer.set_event(self.notifier) + self.channel.in_stderr_buffer.set_event(self.notifier) self.channel.exec_command('darc serve') self.msgid = 0 self.id, self.tid = self._cmd('open', (location.path, create)) - def _cmd(self, cmd, args): + def _cmd(self, *args, **kw): + self.notifier.enabled = True + try: + return self._cmd2(*args, **kw) + finally: + self.notifier.enabled = False + + def _cmd2(self, cmd, args, defer=False): self.msgid += 1 - self.channel.sendall(msgpack.packb((0, self.msgid, cmd, args))) + odata = msgpack.packb((0, self.msgid, cmd, args)) while True: - r, w, e = select.select([self.channel], [], [self.channel], 10) - if r: - if self.channel.closed: - raise Exception('Connection closed') - if self.channel.recv_stderr_ready(): - print >> sys.stderr, 'remote stderr:', self.channel.recv_stderr(BUFSIZE) - elif self.channel.recv_ready(): - self.unpacker.feed(self.channel.recv(BUFSIZE)) - for type, msgid, error, res in self.unpacker: - if error: - raise self.RPCError(error) - return res - if e: - raise Exception('ssh channel error') + if self.channel.closed: + raise Exception('Connection closed') + if odata and self.channel.send_ready(): + n = self.channel.send(odata) + if n > 0: + odata = odata[n:] + if not odata and defer: + return + elif self.channel.recv_stderr_ready(): + print >> sys.stderr, 'remote stderr:', self.channel.recv_stderr(BUFSIZE) + elif self.channel.recv_ready(): + self.unpacker.feed(self.channel.recv(BUFSIZE)) + for type, msgid, error, res in self.unpacker: + if error: + raise self.RPCError(error) + return res + else: + with self.channel.lock: + self.channel.out_buffer_cv.wait(10) def commit(self, *args): self._cmd('commit', args) @@ -125,13 +156,13 @@ class RemoteStore(object): def put(self, *args): try: - return self._cmd('put', args) + return self._cmd('put', args, defer=True) except self.RPCError, e: if e.name == 'AlreadyExists': raise self.AlreadyExists def delete(self, *args): - return self._cmd('delete', args) + return self._cmd('delete', args, defer=True) def list(self, *args): return self._cmd('list', args) diff --git a/darc/store.py b/darc/store.py index 322a9d28d..6b3cd7b2c 100644 --- a/darc/store.py +++ b/darc/store.py @@ -205,8 +205,7 @@ class Store(object): raise self.DoesNotExist def list(self, ns, marker=None, limit=1000000): - for key, value in self.get_index(ns).iteritems(marker=marker, limit=limit): - yield key + return [key for key, value in self.get_index(ns).iteritems(marker=marker, limit=limit)] class BandIO(object):