From 8b054a17c31076e240cee2f23960ae2e073a5a8d Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Jonas=20Borgstr=C3=B6m?= Date: Wed, 17 Oct 2012 11:40:23 +0200 Subject: [PATCH] Remote store rewrite. No more paramiko. --- darc/archive.py | 107 ++++++++++------------------ darc/archiver.py | 79 ++++++++++----------- darc/cache.py | 28 ++++++-- darc/helpers.py | 51 +++----------- darc/remote.py | 176 +++++++++++++---------------------------------- darc/store.py | 16 ++--- 6 files changed, 157 insertions(+), 300 deletions(-) diff --git a/darc/archive.py b/darc/archive.py index cc703a166..fff4553b4 100644 --- a/darc/archive.py +++ b/darc/archive.py @@ -1,6 +1,7 @@ from __future__ import with_statement from datetime import datetime, timedelta from getpass import getuser +from itertools import izip import msgpack import os import socket @@ -12,7 +13,7 @@ from xattr import xattr, XATTR_NOFOLLOW from ._speedups import chunkify from .helpers import uid2user, user2uid, gid2group, group2gid, \ - Counter, encode_filename, Statistics + encode_filename, Statistics ITEMS_BUFFER = 1024 * 1024 CHUNK_SIZE = 64 * 1024 @@ -81,24 +82,12 @@ class Archive(object): def __repr__(self): return 'Archive(%r)' % self.name - def iter_items(self, callback): + def iter_items(self): unpacker = msgpack.Unpacker() - counter = Counter(0) - - def cb(chunk, error, id): - if error: - raise error - assert not error - counter.dec() - data = self.key.decrypt(id, chunk) - unpacker.feed(data) - for item in unpacker: - callback(item) for id in self.metadata['items']: - # Limit the number of concurrent items requests to 10 - self.store.flush_rpc(counter, 10) - counter.inc() - self.store.get(id, callback=cb, callback_data=id) + unpacker.feed(self.key.decrypt(id, self.store.get(id))) + for item in unpacker: + yield item def add_item(self, item): self.items.write(msgpack.packb(item)) @@ -155,10 +144,11 @@ class Archive(object): def calc_stats(self, cache): # This function is a bit evil since it abuses the cache to calculate # the stats. The cache transaction must be rolled back afterwards - def cb(chunk, error, id): - assert not error - data = self.key.decrypt(id, chunk) - unpacker.feed(data) + unpacker = msgpack.Unpacker() + cache.begin_txn() + stats = Statistics() + for id in self.metadata['items']: + unpacker.feed(self.key.decrypt(id, self.store.get(id))) for item in unpacker: try: for id, size, csize in item['chunks']: @@ -168,15 +158,9 @@ class Archive(object): self.cache.chunks[id] = count - 1, size, csize except KeyError: pass - unpacker = msgpack.Unpacker() - cache.begin_txn() - stats = Statistics() - for id in self.metadata['items']: - self.store.get(id, callback=cb, callback_data=id) count, size, csize = self.cache.chunks[id] stats.update(size, csize, count == 1) self.cache.chunks[id] = count - 1, size, csize - self.store.flush_rpc() cache.rollback() return stats @@ -195,33 +179,25 @@ class Archive(object): os.makedirs(os.path.dirname(path)) # Hard link? if 'source' in item: - def link_cb(_, __, item): - source = os.path.join(dest, item['source']) - if os.path.exists(path): - os.unlink(path) - os.link(source, path) - self.store.add_callback(link_cb, item) + source = os.path.join(dest, item['source']) + if os.path.exists(path): + os.unlink(path) + os.link(source, path) else: - def extract_cb(chunk, error, (id, i)): - if i == 0: - state['fd'] = open(path, 'wb') - start_cb(item) - assert not error - data = self.key.decrypt(id, chunk) - state['fd'].write(data) - if i == n - 1: - state['fd'].close() - self.restore_attrs(path, item) - state = {} n = len(item['chunks']) ## 0 chunks indicates an empty (0 bytes) file if n == 0: open(path, 'wb').close() start_cb(item) - self.restore_attrs(path, item) else: - for i, (id, size, csize) in enumerate(item['chunks']): - self.store.get(id, callback=extract_cb, callback_data=(id, i)) + fd = open(path, 'wb') + start_cb(item) + ids = [id for id, size, csize in item['chunks']] + for id, chunk in izip(ids, self.store.get_many(ids)): + data = self.key.decrypt(id, chunk) + fd.write(data) + fd.close() + self.restore_attrs(path, item) elif stat.S_ISFIFO(mode): if not os.path.exists(os.path.dirname(path)): os.makedirs(os.path.dirname(path)) @@ -269,31 +245,24 @@ class Archive(object): os.utime(path, (item['mtime'], item['mtime'])) def verify_file(self, item, start, result): - def verify_chunk(chunk, error, (id, i)): - if error: - if not state: - result(item, False) - state[True] = True - return - if i == 0: - start(item) - self.key.decrypt(id, chunk) - if i == n - 1: - result(item, True) - state = {} - n = len(item['chunks']) - if n == 0: + if not item['chunks']: start(item) result(item, True) else: - for i, (id, size, csize) in enumerate(item['chunks']): - self.store.get(id, callback=verify_chunk, callback_data=(id, i)) + start(item) + ids = [id for id, size, csize in item['chunks']] + try: + for id, chunk in izip(ids, self.store.get_many(ids)): + self.key.decrypt(id, chunk) + except Exception: + result(item, False) + return + result(item, True) def delete(self, cache): - def callback(chunk, error, id): - assert not error - data = self.key.decrypt(id, chunk) - unpacker.feed(data) + unpacker = msgpack.Unpacker() + for id in self.metadata['items']: + unpacker.feed(self.key.decrypt(id, self.store.get(id))) for item in unpacker: try: for chunk_id, size, csize in item['chunks']: @@ -301,10 +270,6 @@ class Archive(object): except KeyError: pass self.cache.chunk_decref(id) - unpacker = msgpack.Unpacker() - for id in self.metadata['items']: - self.store.get(id, callback=callback, callback_data=id) - self.store.flush_rpc() self.cache.chunk_decref(self.id) del self.manifest.archives[self.name] self.manifest.write() diff --git a/darc/archiver.py b/darc/archiver.py index 38340b3d5..65a65d20c 100644 --- a/darc/archiver.py +++ b/darc/archiver.py @@ -10,7 +10,7 @@ from .store import Store from .cache import Cache from .key import Key from .helpers import location_validator, format_time, \ - format_file_mode, IncludePattern, ExcludePattern, exclude_path, to_localtime, \ + format_file_mode, IncludePattern, ExcludePattern, exclude_path, adjust_patterns, to_localtime, \ get_cache_dir, format_timedelta, prune_split, Manifest, Location from .remote import StoreServer, RemoteStore @@ -157,27 +157,22 @@ class Archiver(object): def start_cb(item): self.print_verbose(item['path']) - def extract_cb(item): - if exclude_path(item['path'], args.patterns): - return - if stat.S_ISDIR(item['mode']): - dirs.append(item) - archive.extract_item(item, args.dest, start_cb, restore_attrs=False) - else: - archive.extract_item(item, args.dest, start_cb) - if dirs and not item['path'].startswith(dirs[-1]['path']): - def cb(_, __, item): - # Extract directories twice to make sure mtime is correctly restored - archive.extract_item(item, args.dest) - store.add_callback(cb, dirs.pop(-1)) store = self.open_store(args.archive) key = Key(store) manifest = Manifest(store, key) archive = Archive(store, key, manifest, args.archive.archive, numeric_owner=args.numeric_owner) dirs = [] - archive.iter_items(extract_cb) - store.flush_rpc() + for item in archive.iter_items(): + if exclude_path(item['path'], args.patterns): + continue + if stat.S_ISDIR(item['mode']): + dirs.append(item) + archive.extract_item(item, args.dest, start_cb, restore_attrs=False) + else: + archive.extract_item(item, args.dest, start_cb) + if dirs and not item['path'].startswith(dirs[-1]['path']): + archive.extract_item(dirs.pop(-1), args.dest) while dirs: archive.extract_item(dirs.pop(-1), args.dest) return self.exit_code @@ -192,36 +187,33 @@ class Archiver(object): return self.exit_code def do_list(self, args): - def callback(item): - type = tmap.get(item['mode'] / 4096, '?') - mode = format_file_mode(item['mode']) - size = 0 - if type == '-': - try: - size = sum(size for _, size, _ in item['chunks']) - except KeyError: - pass - mtime = format_time(datetime.fromtimestamp(item['mtime'])) - if 'source' in item: - if type == 'l': - extra = ' -> %s' % item['source'] - else: - type = 'h' - extra = ' link to %s' % item['source'] - else: - extra = '' - print '%s%s %-6s %-6s %8d %s %s%s' % (type, mode, item['user'] or item['uid'], - item['group'] or item['gid'], size, mtime, - item['path'], extra) - store = self.open_store(args.src) key = Key(store) manifest = Manifest(store, key) if args.src.archive: tmap = {1: 'p', 2: 'c', 4: 'd', 6: 'b', 010: '-', 012: 'l', 014: 's'} archive = Archive(store, key, manifest, args.src.archive) - archive.iter_items(callback) - store.flush_rpc() + for item in archive.iter_items(): + type = tmap.get(item['mode'] / 4096, '?') + mode = format_file_mode(item['mode']) + size = 0 + if type == '-': + try: + size = sum(size for _, size, _ in item['chunks']) + except KeyError: + pass + mtime = format_time(datetime.fromtimestamp(item['mtime'])) + if 'source' in item: + if type == 'l': + extra = ' -> %s' % item['source'] + else: + type = 'h' + extra = ' link to %s' % item['source'] + else: + extra = '' + print '%s%s %-6s %-6s %8d %s %s%s' % (type, mode, item['user'] or item['uid'], + item['group'] or item['gid'], size, mtime, + item['path'], extra) else: for archive in sorted(Archive.list_archives(store, key, manifest), key=attrgetter('ts')): print '%-20s %s' % (archive.metadata['name'], to_localtime(archive.ts).strftime('%c')) @@ -242,14 +234,11 @@ class Archiver(object): else: self.print_verbose('ERROR') self.print_error('%s: verification failed' % item['path']) - - def callback(item): + for item in archive.iter_items(): if exclude_path(item['path'], args.patterns): return if stat.S_ISREG(item['mode']) and 'chunks' in item: archive.verify_file(item, start_cb, result_cb) - archive.iter_items(callback) - store.flush_rpc() return self.exit_code def do_info(self, args): @@ -425,6 +414,8 @@ class Archiver(object): help='Store to prune') args = parser.parse_args(args) + if getattr(args, 'patterns', None): + adjust_patterns(args.patterns) self.verbose = args.verbose return args.func(args) diff --git a/darc/cache.py b/darc/cache.py index 10ac0338b..2caa71314 100644 --- a/darc/cache.py +++ b/darc/cache.py @@ -5,7 +5,7 @@ import msgpack import os import shutil -from .helpers import error_callback, get_cache_dir +from .helpers import get_cache_dir from .hashindex import ChunkIndex @@ -41,7 +41,7 @@ class Cache(object): config.write(fd) ChunkIndex.create(os.path.join(self.path, 'chunks')) with open(os.path.join(self.path, 'files'), 'wb') as fd: - pass # empty file + pass # empty file def open(self): if not os.path.isdir(self.path): @@ -158,8 +158,24 @@ class Cache(object): archive = msgpack.unpackb(data) print 'Analyzing archive:', archive['name'] for id in archive['items']: - self.store.get(id, callback=cb, callback_data=id) - self.store.flush_rpc() + chunk = self.store.get(id) + try: + count, size, csize = self.chunks[id] + self.chunks[id] = count + 1, size, csize + except KeyError: + self.chunks[id] = 1, len(data), len(chunk) + unpacker.feed(self.key.decrypt(id, chunk)) + for item in unpacker: + try: + for id, size, csize in item['chunks']: + try: + count, size, csize = self.chunks[id] + self.chunks[id] = count + 1, size, csize + except KeyError: + self.chunks[id] = 1, size, csize + pass + except KeyError: + pass def add_chunk(self, id, data, stats): if not self.txn_active: @@ -169,7 +185,7 @@ class Cache(object): size = len(data) data = self.key.encrypt(data) csize = len(data) - self.store.put(id, data, callback=error_callback) + self.store.put(id, data, wait=False) self.chunks[id] = (1, size, csize) stats.update(size, csize, True) return id, size, csize @@ -191,7 +207,7 @@ class Cache(object): count, size, csize = self.chunks[id] if count == 1: del self.chunks[id] - self.store.delete(id, callback=error_callback) + self.store.delete(id, wait=False) else: self.chunks[id] = (count - 1, size, csize) diff --git a/darc/helpers.py b/darc/helpers.py index bc40fc0c4..fb6401d99 100644 --- a/darc/helpers.py +++ b/darc/helpers.py @@ -84,34 +84,13 @@ class Statistics(object): if sys.platform == 'darwin': def encode_filename(name): try: - name.decode('utf-8') - return name + return name.decode('utf-8') except UnicodeDecodeError: return urllib.quote(name) else: encode_filename = str -class Counter(object): - - __slots__ = ('v',) - - def __init__(self, value=0): - self.v = value - - def inc(self, amount=1): - self.v += amount - - def dec(self, amount=1): - self.v -= amount - - def __cmp__(self, x): - return cmp(self.v, x) - - def __repr__(self): - return '' % self.v - - def get_keys_dir(): """Determine where to store keys and cache""" return os.environ.get('DARC_KEYS_DIR', @@ -124,32 +103,18 @@ def get_cache_dir(): os.path.join(os.path.expanduser('~'), '.darc', 'cache')) -def deferrable(f): - def wrapper(*args, **kw): - callback = kw.pop('callback', None) - if callback: - data = kw.pop('callback_data', None) - try: - res = f(*args, **kw) - except Exception, e: - callback(None, e, data) - else: - callback(res, None, data) - else: - return f(*args, **kw) - return wrapper - - -def error_callback(res, error, data): - if res: - raise res - - def to_localtime(ts): """Convert datetime object from UTC to local time zone""" return ts - timedelta(seconds=time.altzone) +def adjust_patterns(patterns): + if patterns and isinstance(patterns[-1], IncludePattern): + patterns.append(ExcludePattern('*')) + elif patterns and isinstance(patterns[-1], ExcludePattern): + patterns.append(IncludePattern('*')) + + def exclude_path(path, patterns): """Used by create and extract sub-commands to determine if an item should be processed or not diff --git a/darc/remote.py b/darc/remote.py index 1689acdaf..b598dfdf8 100644 --- a/darc/remote.py +++ b/darc/remote.py @@ -2,31 +2,14 @@ from __future__ import with_statement import fcntl import msgpack import os -import paramiko import select +from subprocess import Popen, PIPE import sys import getpass from .store import Store -from .helpers import Counter - -BUFSIZE = 1024 * 1024 - - -class ChannelNotifyer(object): - - def __init__(self, channel): - self.channel = channel - self.enabled = Counter() - - def set(self): - if self.enabled > 0: - with self.channel.lock: - self.channel.out_buffer_cv.notifyAll() - - def clear(self): - pass +BUFSIZE = 10 * 1024 * 1024 class StoreServer(object): @@ -87,134 +70,73 @@ class RemoteStore(object): def __init__(self, name): self.name = name - def __init__(self, location, create=False): - self.client = paramiko.SSHClient() - self.client.set_missing_host_key_policy(paramiko.AutoAddPolicy()) - params = {'username': location.user or getpass.getuser(), - 'hostname': location.host, 'port': location.port} - while True: - try: - self.client.connect(**params) - break - except (paramiko.PasswordRequiredException, - paramiko.AuthenticationException, - paramiko.SSHException): - if not 'password' in params: - params['password'] = getpass.getpass('Password for %(username)s@%(hostname)s:' % params) - else: - raise - self.unpacker = msgpack.Unpacker() - self.transport = self.client.get_transport() - self.channel = self.transport.open_session() - self.notifier = ChannelNotifyer(self.channel) - self.channel.in_buffer.set_event(self.notifier) - self.channel.in_stderr_buffer.set_event(self.notifier) - self.channel.exec_command('darc serve') - self.callbacks = {} self.msgid = 0 - self.recursion = 0 - self.odata = [] - # Negotiate protocol version - version = self.cmd('negotiate', (1,)) + args = ['ssh', '-p', str(location.port), '%s@%s' % (location.user or getpass.getuser(), location.host), 'darc', 'serve'] + self.p = Popen(args, bufsize=0, stdin=PIPE, stdout=PIPE) + self.stdout_fd = self.p.stdout.fileno() + version = self.call('negotiate', (1,)) if version != 1: raise Exception('Server insisted on using unsupported protocol version %d' % version) - self.id = self.cmd('open', (location.path, create)) + self.id = self.call('open', (location.path, create)) - def wait(self, write=True): - with self.channel.lock: - if ((not write or self.channel.out_window_size == 0) and - len(self.channel.in_buffer._buffer) == 0 and - len(self.channel.in_stderr_buffer._buffer) == 0): - self.channel.out_buffer_cv.wait(1) + def __del__(self): + self.p.stdin.close() + self.p.stdout.close() + self.p.wait() - def cmd(self, cmd, args, callback=None, callback_data=None): - self.msgid += 1 - self.notifier.enabled.inc() - self.odata.append(msgpack.packb((1, self.msgid, cmd, args))) - self.recursion += 1 - if callback: - self.add_callback(callback, callback_data) - if self.recursion > 1: - self.recursion -= 1 - return - while True: - if self.channel.closed: - self.recursion -= 1 - raise Exception('Connection closed') - elif self.channel.recv_stderr_ready(): - print >> sys.stderr, 'remote stderr:', self.channel.recv_stderr(BUFSIZE) - elif self.channel.recv_ready(): - self.unpacker.feed(self.channel.recv(BUFSIZE)) - for type, msgid, error, res in self.unpacker: - self.notifier.enabled.dec() - if msgid == self.msgid: - if error: - self.recursion -= 1 - raise self.RPCError(error) - self.recursion -= 1 - return res - else: - for c, d in self.callbacks.pop(msgid, []): - c(res, error, d) - elif self.odata and self.channel.send_ready(): - data = self.odata.pop(0) - n = self.channel.send(data) - if n != len(data): - self.odata.insert(0, data[n:]) - if not self.odata and callback: - self.recursion -= 1 - return - else: - self.wait(self.odata) + def _read(self, msgids): + data = os.read(self.stdout_fd, BUFSIZE) + self.unpacker.feed(data) + for type, msgid, error, res in self.unpacker: + if error: + raise self.RPCError(error) + if msgid in msgids: + msgids.remove(msgid) + yield res + + def call(self, cmd, args, wait=True): + for res in self.call_multi(cmd, [args], wait=wait): + return res + + def call_multi(self, cmd, argsv, wait=True): + msgids = set() + for args in argsv: + if select.select([self.stdout_fd], [], [], 0)[0]: + for res in self._read(msgids): + yield res + self.msgid += 1 + msgid = self.msgid + msgids.add(msgid) + self.p.stdin.write(msgpack.packb((1, msgid, cmd, args))) + while msgids and wait: + for res in self._read(msgids): + yield res def commit(self, *args): - self.cmd('commit', args) + self.call('commit', args) def rollback(self, *args): - return self.cmd('rollback', args) + return self.call('rollback', args) - def get(self, id, callback=None, callback_data=None): + def get(self, id): try: - return self.cmd('get', (id, ), callback, callback_data) + return self.call('get', (id, )) except self.RPCError, e: if e.name == 'DoesNotExist': raise self.DoesNotExist raise - def put(self, id, data, callback=None, callback_data=None): + def get_many(self, ids): + return self.call_multi('get', [(id, ) for id in ids]) + + def put(self, id, data, wait=True): try: - return self.cmd('put', (id, data), callback, callback_data) + return self.call('put', (id, data), wait=wait) except self.RPCError, e: if e.name == 'AlreadyExists': raise self.AlreadyExists - def delete(self, id, callback=None, callback_data=None): - return self.cmd('delete', (id, ), callback, callback_data) - - def add_callback(self, cb, data): - self.callbacks.setdefault(self.msgid, []).append((cb, data)) - - def flush_rpc(self, counter=None, backlog=0): - counter = counter or self.notifier.enabled - while counter > backlog: - if self.channel.closed: - raise Exception('Connection closed') - elif self.odata and self.channel.send_ready(): - n = self.channel.send(self.odata) - if n > 0: - self.odata = self.odata[n:] - elif self.channel.recv_stderr_ready(): - print >> sys.stderr, 'remote stderr:', self.channel.recv_stderr(BUFSIZE) - elif self.channel.recv_ready(): - self.unpacker.feed(self.channel.recv(BUFSIZE)) - for type, msgid, error, res in self.unpacker: - self.notifier.enabled.dec() - for c, d in self.callbacks.pop(msgid, []): - c(res, error, d) - if msgid == self.msgid: - return - else: - self.wait(self.odata) - + def delete(self, id, wait=True): + return self.call('delete', (id, ), wait=wait) diff --git a/darc/store.py b/darc/store.py index e37534313..e3bb83fb5 100644 --- a/darc/store.py +++ b/darc/store.py @@ -10,7 +10,7 @@ import unittest from zlib import crc32 from .hashindex import NSIndex -from .helpers import IntegrityError, deferrable, read_msgpack, write_msgpack +from .helpers import IntegrityError, read_msgpack, write_msgpack from .lrucache import LRUCache MAX_OBJECT_SIZE = 20 * 1024 * 1024 @@ -194,7 +194,6 @@ class Store(object): self.recover(self.path) self.open_index(self.io.head, read_only=True) - @deferrable def get(self, id): try: segment, offset = self.index[id] @@ -202,8 +201,11 @@ class Store(object): except KeyError: raise self.DoesNotExist - @deferrable - def put(self, id, data): + def get_many(self, ids): + for id in ids: + yield self.get(id) + + def put(self, id, data, wait=True): if not self._active_txn: self._active_txn = True self.open_index(self.io.head) @@ -219,8 +221,7 @@ class Store(object): self.segments[segment] += 1 self.index[id] = segment, offset - @deferrable - def delete(self, id): + def delete(self, id, wait=True): if not self._active_txn: self._active_txn = True self.open_index(self.io.head) @@ -232,9 +233,6 @@ class Store(object): except KeyError: raise self.DoesNotExist - def flush_rpc(self, *args): - pass - def add_callback(self, cb, data): cb(None, None, data)