diff --git a/darc/archiver.py b/darc/archiver.py index 1d571e42a..c98c08768 100644 --- a/darc/archiver.py +++ b/darc/archiver.py @@ -10,15 +10,18 @@ from .cache import Cache from .keychain import Keychain from .helpers import location_validator, format_file_size, format_time,\ format_file_mode, walk_path, IncludePattern, ExcludePattern, exclude_path - +from .remote import StoreServer, RemoteStore class Archiver(object): def __init__(self): self.exit_code = 0 - def open_store(self, location): - return Store(location.path) + def open_store(self, location, create=False): + if location.host: + return RemoteStore(location, create=create) + else: + return Store(location.path, create=create) def print_error(self, msg, *args): msg = args and msg % args or msg @@ -38,9 +41,16 @@ class Archiver(object): print msg, def do_init(self, args): - Store(args.store.path, create=True) + self.open_store(args.store, create=True) return self.exit_code + def do_serve(self, args): + try: + return StoreServer().serve() + except Exception, e: + self.print_error('eek', repr(e)) + return 1 + def do_create(self, args): store = self.open_store(args.archive) keychain = Keychain(args.keychain) @@ -143,7 +153,7 @@ class Archiver(object): archive.get_items() for item in archive.items: if stat.S_ISREG(item['mode']) and not 'source' in item: - self.print_verbose('%s ...', item['path'], newline=False) + self.print_verbose('%s ...', item['path'].decode('utf-8'), newline=False) if archive.verify_file(item): self.print_verbose('OK') else: @@ -207,6 +217,9 @@ class Archiver(object): type=location_validator(archive=False), help='Store to initialize') + subparser = subparsers.add_parser('serve') + subparser.set_defaults(func=self.do_serve) + subparser = subparsers.add_parser('create') subparser.set_defaults(func=self.do_create) subparser.add_argument('-i', '--include', dest='patterns', diff --git a/darc/cache.py b/darc/cache.py index acd02da18..25bda823e 100644 --- a/darc/cache.py +++ b/darc/cache.py @@ -60,7 +60,6 @@ class Cache(object): yield key, (value[0] + 1,) + value[1:] def save(self): - assert self.store.state == self.store.OPEN cache = {'version': 1, 'tid': self.store.tid, 'chunk_counts': self.chunk_counts, diff --git a/darc/remote.py b/darc/remote.py new file mode 100644 index 000000000..e742e98f5 --- /dev/null +++ b/darc/remote.py @@ -0,0 +1,127 @@ +import fcntl +import msgpack +import os +import paramiko +import select +import sys +import getpass + +from .store import Store + + +BUFSIZE = 1024 * 1024 + + +class StoreServer(object): + + def __init__(self): + self.store = None + + def serve(self): + # Make stdin non-blocking + fl = fcntl.fcntl(sys.stdin.fileno(), fcntl.F_GETFL) + fcntl.fcntl(sys.stdin.fileno(), fcntl.F_SETFL, fl | os.O_NONBLOCK) + unpacker = msgpack.Unpacker() + while True: + r, w, es = select.select([sys.stdin], [], [], 10) + if r: + data = os.read(sys.stdin.fileno(), BUFSIZE) + if not data: + return + unpacker.feed(data) + for type, msgid, method, args in unpacker: + try: + if method == 'open': + self.store = Store(*args) + res = self.store.id, self.store.tid + else: + res = getattr(self.store, method)(*args) + except Exception, e: + sys.stdout.write(msgpack.packb((1, msgid, e.__class__.__name__, None))) + else: + sys.stdout.write(msgpack.packb((1, msgid, None, res))) + sys.stdout.flush() + if es: + return + + +class RemoteStore(object): + + class DoesNotExist(Exception): + pass + + class AlreadyExists(Exception): + pass + + class RPCError(Exception): + + def __init__(self, name): + self.name = name + + + def __init__(self, location, create=False): + self.client = paramiko.SSHClient() + self.client.set_missing_host_key_policy(paramiko.AutoAddPolicy()) + params = {'username': location.user or getpass.getuser(), + 'hostname': location.host} + while True: + try: + self.client.connect(**params) + break + except paramiko.PasswordRequiredException: + params['password'] = getpass.getpass('Password for %(username)s@%(hostname)s:' % params) + + self.unpacker = msgpack.Unpacker() + self.transport = self.client.get_transport() + self.channel = self.transport.open_session() + self.channel.exec_command('darc serve') + self.msgid = 0 + self.id, self.tid = self._cmd('open', (location.path, create)) + + def _cmd(self, cmd, args): + self.msgid += 1 + self.channel.sendall(msgpack.packb((0, self.msgid, cmd, args))) + while True: + r, w, e = select.select([self.channel], [], [self.channel], 10) + if r: + if self.channel.recv_stderr_ready(): + raise Exception(self.channel.recv_stderr(BUFSIZE)) + elif self.channel.recv_ready(): + self.unpacker.feed(self.channel.recv(BUFSIZE)) + for type, msgid, error, res in self.unpacker: + if error: + raise self.RPCError(error) + return res + else: + raise Exception('Read event but no data?!?') + if e: + raise Exception('ssh channel error') + + def commit(self, *args): + self._cmd('commit', args) + self.tid += 1 + + def rollback(self, *args): + return self._cmd('rollback', args) + + def get(self, *args): + try: + return self._cmd('get', args) + except self.RPCError, e: + if e.name == 'DoesNotExist': + raise self.DoesNotExist + raise + + def put(self, *args): + try: + return self._cmd('put', args) + except self.RPCError, e: + if e.name == 'AlreadyExists': + raise self.AlreadyExists + + def delete(self, *args): + return self._cmd('delete', args) + + def list(self, *args): + return self._cmd('list', args) + diff --git a/darc/store.py b/darc/store.py index c773485ef..ff7670de0 100644 --- a/darc/store.py +++ b/darc/store.py @@ -206,8 +206,7 @@ class Store(object): if marker: sql += ' AND id >= :marker' args['marker'] = Binary(marker) - for row in self.cursor.execute(sql + ' LIMIT ' + str(max_keys), args): - yield str(row[0]) + return list(str(row[0]) for row in self.cursor.execute(sql + ' LIMIT ' + str(max_keys), args)) class StoreTestCase(unittest.TestCase): diff --git a/setup.py b/setup.py index b18c4236c..aa3cfc46b 100644 --- a/setup.py +++ b/setup.py @@ -3,7 +3,7 @@ import sys from setuptools import setup, Extension -dependencies = ['pycrypto', 'msgpack-python', 'pbkdf2.py', 'xattr'] +dependencies = ['pycrypto', 'msgpack-python', 'pbkdf2.py', 'xattr', 'paramiko'] if sys.version_info < (2, 7): dependencies.append('argparse')