Remote store rewrite. No more paramiko.

Jonas Borgström 2012-10-17 11:40:23 +02:00
parent 93add0680d
commit 8b054a17c3
6 changed files with 157 additions and 300 deletions
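
The new transport drives a plain ssh subprocess instead of paramiko's channel machinery: the client writes msgpack-encoded (1, msgid, cmd, args) tuples to the child's stdin and reads (type, msgid, error, result) tuples back from its stdout (see darc/remote.py below). A minimal sketch of that framing, with an in-process toy server standing in for 'darc serve' (toy_server and the pipe plumbing are invented for the example):

import msgpack
import os
import threading

def toy_server(rfd, wfd):
    # Stand-in for 'darc serve': unpack requests, answer each msgid in order.
    unpacker = msgpack.Unpacker()
    while True:
        data = os.read(rfd, 4096)
        if not data:
            break
        unpacker.feed(data)
        for type, msgid, cmd, args in unpacker:
            result = ('echo', cmd, list(args))  # toy "RPC" result
            os.write(wfd, msgpack.packb((1, msgid, None, result)))

# One pipe per direction, like the subprocess' stdin/stdout.
c2s_r, c2s_w = os.pipe()
s2c_r, s2c_w = os.pipe()
threading.Thread(target=toy_server, args=(c2s_r, s2c_w)).start()

# Client side: write a request, then block reading the reply.
os.write(c2s_w, msgpack.packb((1, 1, 'negotiate', (1,))))
unpacker = msgpack.Unpacker()
unpacker.feed(os.read(s2c_r, 4096))
for type, msgid, error, result in unpacker:
    print('reply %d: error=%r result=%r' % (msgid, error, result))
os.close(c2s_w)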

darc/archive.py

@@ -1,6 +1,7 @@
 from __future__ import with_statement
 from datetime import datetime, timedelta
 from getpass import getuser
+from itertools import izip
 import msgpack
 import os
 import socket
@@ -12,7 +13,7 @@ from xattr import xattr, XATTR_NOFOLLOW
 from ._speedups import chunkify
 from .helpers import uid2user, user2uid, gid2group, group2gid, \
-    Counter, encode_filename, Statistics
+    encode_filename, Statistics
 
 ITEMS_BUFFER = 1024 * 1024
 CHUNK_SIZE = 64 * 1024
@@ -81,24 +82,12 @@ class Archive(object):
     def __repr__(self):
         return 'Archive(%r)' % self.name
 
-    def iter_items(self, callback):
+    def iter_items(self):
         unpacker = msgpack.Unpacker()
-        counter = Counter(0)
-        def cb(chunk, error, id):
-            if error:
-                raise error
-            assert not error
-            counter.dec()
-            data = self.key.decrypt(id, chunk)
-            unpacker.feed(data)
-            for item in unpacker:
-                callback(item)
         for id in self.metadata['items']:
-            # Limit the number of concurrent items requests to 10
-            self.store.flush_rpc(counter, 10)
-            counter.inc()
-            self.store.get(id, callback=cb, callback_data=id)
+            unpacker.feed(self.key.decrypt(id, self.store.get(id)))
+            for item in unpacker:
+                yield item
 
     def add_item(self, item):
         self.items.write(msgpack.packb(item))
@@ -155,10 +144,11 @@ class Archive(object):
     def calc_stats(self, cache):
         # This function is a bit evil since it abuses the cache to calculate
         # the stats. The cache transaction must be rolled back afterwards
-        def cb(chunk, error, id):
-            assert not error
-            data = self.key.decrypt(id, chunk)
-            unpacker.feed(data)
+        unpacker = msgpack.Unpacker()
+        cache.begin_txn()
+        stats = Statistics()
+        for id in self.metadata['items']:
+            unpacker.feed(self.key.decrypt(id, self.store.get(id)))
             for item in unpacker:
                 try:
                     for id, size, csize in item['chunks']:
@@ -168,15 +158,9 @@ class Archive(object):
                         self.cache.chunks[id] = count - 1, size, csize
                 except KeyError:
                     pass
-        unpacker = msgpack.Unpacker()
-        cache.begin_txn()
-        stats = Statistics()
-        for id in self.metadata['items']:
-            self.store.get(id, callback=cb, callback_data=id)
             count, size, csize = self.cache.chunks[id]
             stats.update(size, csize, count == 1)
             self.cache.chunks[id] = count - 1, size, csize
-        self.store.flush_rpc()
         cache.rollback()
         return stats
@@ -195,33 +179,25 @@ class Archive(object):
                 os.makedirs(os.path.dirname(path))
             # Hard link?
             if 'source' in item:
-                def link_cb(_, __, item):
-                    source = os.path.join(dest, item['source'])
-                    if os.path.exists(path):
-                        os.unlink(path)
-                    os.link(source, path)
-                self.store.add_callback(link_cb, item)
+                source = os.path.join(dest, item['source'])
+                if os.path.exists(path):
+                    os.unlink(path)
+                os.link(source, path)
             else:
-                def extract_cb(chunk, error, (id, i)):
-                    if i == 0:
-                        state['fd'] = open(path, 'wb')
-                        start_cb(item)
-                    assert not error
-                    data = self.key.decrypt(id, chunk)
-                    state['fd'].write(data)
-                    if i == n - 1:
-                        state['fd'].close()
-                        self.restore_attrs(path, item)
-                state = {}
                 n = len(item['chunks'])
                 ## 0 chunks indicates an empty (0 bytes) file
                 if n == 0:
                     open(path, 'wb').close()
                     start_cb(item)
                     self.restore_attrs(path, item)
                 else:
-                    for i, (id, size, csize) in enumerate(item['chunks']):
-                        self.store.get(id, callback=extract_cb, callback_data=(id, i))
+                    fd = open(path, 'wb')
+                    start_cb(item)
+                    ids = [id for id, size, csize in item['chunks']]
+                    for id, chunk in izip(ids, self.store.get_many(ids)):
+                        data = self.key.decrypt(id, chunk)
+                        fd.write(data)
+                    fd.close()
+                    self.restore_attrs(path, item)
         elif stat.S_ISFIFO(mode):
             if not os.path.exists(os.path.dirname(path)):
                 os.makedirs(os.path.dirname(path))
@@ -269,31 +245,24 @@ class Archive(object):
         os.utime(path, (item['mtime'], item['mtime']))
 
     def verify_file(self, item, start, result):
-        def verify_chunk(chunk, error, (id, i)):
-            if error:
-                if not state:
-                    result(item, False)
-                    state[True] = True
-                return
-            if i == 0:
-                start(item)
-            self.key.decrypt(id, chunk)
-            if i == n - 1:
-                result(item, True)
-        state = {}
-        n = len(item['chunks'])
-        if n == 0:
+        if not item['chunks']:
             start(item)
             result(item, True)
         else:
-            for i, (id, size, csize) in enumerate(item['chunks']):
-                self.store.get(id, callback=verify_chunk, callback_data=(id, i))
+            start(item)
+            ids = [id for id, size, csize in item['chunks']]
+            try:
+                for id, chunk in izip(ids, self.store.get_many(ids)):
+                    self.key.decrypt(id, chunk)
+            except Exception:
+                result(item, False)
+                return
+            result(item, True)
 
     def delete(self, cache):
-        def callback(chunk, error, id):
-            assert not error
-            data = self.key.decrypt(id, chunk)
-            unpacker.feed(data)
+        unpacker = msgpack.Unpacker()
+        for id in self.metadata['items']:
+            unpacker.feed(self.key.decrypt(id, self.store.get(id)))
             for item in unpacker:
                 try:
                     for chunk_id, size, csize in item['chunks']:
@@ -301,10 +270,6 @@ class Archive(object):
                 except KeyError:
                     pass
             self.cache.chunk_decref(id)
-        unpacker = msgpack.Unpacker()
-        for id in self.metadata['items']:
-            self.store.get(id, callback=callback, callback_data=id)
-        self.store.flush_rpc()
         self.cache.chunk_decref(self.id)
         del self.manifest.archives[self.name]
         self.manifest.write()
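
The rewritten iter_items, calc_stats, and delete all rely on the same streaming pattern: each decrypted metadata chunk is fed into a single msgpack.Unpacker, which buffers partial input and only yields complete items, so an item may safely straddle a chunk boundary. A self-contained sketch of just that buffering behaviour (the sample items and the split point are invented):

import msgpack

items = [{'path': '/etc/hosts', 'mode': 0}, {'path': '/etc/passwd', 'mode': 0}]
packed = b''.join(msgpack.packb(item) for item in items)

# Split the serialized stream mid-item, as archive metadata chunks may do.
chunks = [packed[:7], packed[7:]]

unpacker = msgpack.Unpacker()
for chunk in chunks:
    unpacker.feed(chunk)   # in darc: unpacker.feed(key.decrypt(id, store.get(id)))
    for item in unpacker:  # yields only fully buffered items
        print(item)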

darc/archiver.py

@@ -10,7 +10,7 @@ from .store import Store
 from .cache import Cache
 from .key import Key
 from .helpers import location_validator, format_time, \
-    format_file_mode, IncludePattern, ExcludePattern, exclude_path, to_localtime, \
+    format_file_mode, IncludePattern, ExcludePattern, exclude_path, adjust_patterns, to_localtime, \
     get_cache_dir, format_timedelta, prune_split, Manifest, Location
 from .remote import StoreServer, RemoteStore
@@ -157,27 +157,22 @@ class Archiver(object):
         def start_cb(item):
             self.print_verbose(item['path'])
-        def extract_cb(item):
-            if exclude_path(item['path'], args.patterns):
-                return
-            if stat.S_ISDIR(item['mode']):
-                dirs.append(item)
-                archive.extract_item(item, args.dest, start_cb, restore_attrs=False)
-            else:
-                archive.extract_item(item, args.dest, start_cb)
-            if dirs and not item['path'].startswith(dirs[-1]['path']):
-                def cb(_, __, item):
-                    # Extract directories twice to make sure mtime is correctly restored
-                    archive.extract_item(item, args.dest)
-                store.add_callback(cb, dirs.pop(-1))
         store = self.open_store(args.archive)
         key = Key(store)
         manifest = Manifest(store, key)
         archive = Archive(store, key, manifest, args.archive.archive,
                           numeric_owner=args.numeric_owner)
         dirs = []
-        archive.iter_items(extract_cb)
-        store.flush_rpc()
+        for item in archive.iter_items():
+            if exclude_path(item['path'], args.patterns):
+                continue
+            if stat.S_ISDIR(item['mode']):
+                dirs.append(item)
+                archive.extract_item(item, args.dest, start_cb, restore_attrs=False)
+            else:
+                archive.extract_item(item, args.dest, start_cb)
+            if dirs and not item['path'].startswith(dirs[-1]['path']):
+                archive.extract_item(dirs.pop(-1), args.dest)
         while dirs:
             archive.extract_item(dirs.pop(-1), args.dest)
         return self.exit_code
@@ -192,36 +187,33 @@ class Archiver(object):
         return self.exit_code
 
     def do_list(self, args):
-        def callback(item):
-            type = tmap.get(item['mode'] / 4096, '?')
-            mode = format_file_mode(item['mode'])
-            size = 0
-            if type == '-':
-                try:
-                    size = sum(size for _, size, _ in item['chunks'])
-                except KeyError:
-                    pass
-            mtime = format_time(datetime.fromtimestamp(item['mtime']))
-            if 'source' in item:
-                if type == 'l':
-                    extra = ' -> %s' % item['source']
-                else:
-                    type = 'h'
-                    extra = ' link to %s' % item['source']
-            else:
-                extra = ''
-            print '%s%s %-6s %-6s %8d %s %s%s' % (type, mode, item['user'] or item['uid'],
-                                                  item['group'] or item['gid'], size, mtime,
-                                                  item['path'], extra)
         store = self.open_store(args.src)
         key = Key(store)
         manifest = Manifest(store, key)
         if args.src.archive:
             tmap = {1: 'p', 2: 'c', 4: 'd', 6: 'b', 010: '-', 012: 'l', 014: 's'}
             archive = Archive(store, key, manifest, args.src.archive)
-            archive.iter_items(callback)
-            store.flush_rpc()
+            for item in archive.iter_items():
+                type = tmap.get(item['mode'] / 4096, '?')
+                mode = format_file_mode(item['mode'])
+                size = 0
+                if type == '-':
+                    try:
+                        size = sum(size for _, size, _ in item['chunks'])
+                    except KeyError:
+                        pass
+                mtime = format_time(datetime.fromtimestamp(item['mtime']))
+                if 'source' in item:
+                    if type == 'l':
+                        extra = ' -> %s' % item['source']
+                    else:
+                        type = 'h'
+                        extra = ' link to %s' % item['source']
+                else:
+                    extra = ''
+                print '%s%s %-6s %-6s %8d %s %s%s' % (type, mode, item['user'] or item['uid'],
+                                                      item['group'] or item['gid'], size, mtime,
+                                                      item['path'], extra)
         else:
             for archive in sorted(Archive.list_archives(store, key, manifest), key=attrgetter('ts')):
                 print '%-20s %s' % (archive.metadata['name'], to_localtime(archive.ts).strftime('%c'))
@@ -242,14 +234,11 @@ class Archiver(object):
             else:
                 self.print_verbose('ERROR')
                 self.print_error('%s: verification failed' % item['path'])
-        def callback(item):
+        for item in archive.iter_items():
             if exclude_path(item['path'], args.patterns):
                 return
             if stat.S_ISREG(item['mode']) and 'chunks' in item:
                 archive.verify_file(item, start_cb, result_cb)
-        archive.iter_items(callback)
-        store.flush_rpc()
         return self.exit_code
 
     def do_info(self, args):
@@ -425,6 +414,8 @@ class Archiver(object):
                             help='Store to prune')
         args = parser.parse_args(args)
+        if getattr(args, 'patterns', None):
+            adjust_patterns(args.patterns)
         self.verbose = args.verbose
         return args.func(args)
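
do_extract keeps a stack of directories and extracts each one a second time after the loop has left its subtree (and once more at the end for whatever remains): writing a directory's contents updates its mtime, so its attributes can only be restored afterwards. The stack discipline in isolation (the paths and the restore callback are invented):

def extract_in_order(items, restore):
    # items arrive in archive order: a directory precedes its contents
    dirs = []
    for path, is_dir in items:
        if dirs and not path.startswith(dirs[-1]):
            restore(dirs.pop())  # left that subtree: mtime is now stable
        if is_dir:
            dirs.append(path)
        # ... extract the item itself here; doing so bumps the parent
        # directory's mtime, hence the deferred restore above
    while dirs:
        restore(dirs.pop())

def report(path):
    print('restore attrs: %s' % path)

extract_in_order([('a/', True), ('a/x', False), ('b/', True), ('b/y', False)], report)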

darc/cache.py

@@ -5,7 +5,7 @@ import msgpack
 import os
 import shutil
 
-from .helpers import error_callback, get_cache_dir
+from .helpers import get_cache_dir
 from .hashindex import ChunkIndex
@@ -41,7 +41,7 @@ class Cache(object):
             config.write(fd)
         ChunkIndex.create(os.path.join(self.path, 'chunks'))
         with open(os.path.join(self.path, 'files'), 'wb') as fd:
-            pass # empty file
+            pass  # empty file
 
     def open(self):
         if not os.path.isdir(self.path):
@@ -158,8 +158,24 @@ class Cache(object):
             archive = msgpack.unpackb(data)
             print 'Analyzing archive:', archive['name']
             for id in archive['items']:
-                self.store.get(id, callback=cb, callback_data=id)
-            self.store.flush_rpc()
+                chunk = self.store.get(id)
+                try:
+                    count, size, csize = self.chunks[id]
+                    self.chunks[id] = count + 1, size, csize
+                except KeyError:
+                    self.chunks[id] = 1, len(data), len(chunk)
+                unpacker.feed(self.key.decrypt(id, chunk))
+                for item in unpacker:
+                    try:
+                        for id, size, csize in item['chunks']:
+                            try:
+                                count, size, csize = self.chunks[id]
+                                self.chunks[id] = count + 1, size, csize
+                            except KeyError:
+                                self.chunks[id] = 1, size, csize
+                            pass
+                    except KeyError:
+                        pass
 
     def add_chunk(self, id, data, stats):
         if not self.txn_active:
@@ -169,7 +185,7 @@ class Cache(object):
         size = len(data)
         data = self.key.encrypt(data)
         csize = len(data)
-        self.store.put(id, data, callback=error_callback)
+        self.store.put(id, data, wait=False)
         self.chunks[id] = (1, size, csize)
         stats.update(size, csize, True)
         return id, size, csize
@@ -191,7 +207,7 @@ class Cache(object):
         count, size, csize = self.chunks[id]
         if count == 1:
             del self.chunks[id]
-            self.store.delete(id, callback=error_callback)
+            self.store.delete(id, wait=False)
         else:
             self.chunks[id] = (count - 1, size, csize)
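
Cache entries in self.chunks are (refcount, size, csize) triples: add_chunk starts a chunk at count 1 and chunk_decref only issues a store delete when the last reference disappears. The same bookkeeping in miniature (plain dict, invented ids, no store or encryption):

chunks = {}

def incref(id, size, csize):
    count, _, _ = chunks.get(id, (0, size, csize))
    chunks[id] = (count + 1, size, csize)

def decref(id, deleted):
    count, size, csize = chunks[id]
    if count == 1:
        del chunks[id]
        deleted.append(id)  # in darc: store.delete(id, wait=False)
    else:
        chunks[id] = (count - 1, size, csize)

deleted = []
incref('c1', 100, 60)
incref('c1', 100, 60)
decref('c1', deleted)  # still referenced once
decref('c1', deleted)  # last reference: chunk is deleted
print(deleted)         # ['c1']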

darc/helpers.py

@@ -84,34 +84,13 @@ class Statistics(object):
 
 if sys.platform == 'darwin':
     def encode_filename(name):
         try:
-            name.decode('utf-8')
-            return name
+            return name.decode('utf-8')
         except UnicodeDecodeError:
             return urllib.quote(name)
 else:
     encode_filename = str
 
-class Counter(object):
-
-    __slots__ = ('v',)
-
-    def __init__(self, value=0):
-        self.v = value
-
-    def inc(self, amount=1):
-        self.v += amount
-
-    def dec(self, amount=1):
-        self.v -= amount
-
-    def __cmp__(self, x):
-        return cmp(self.v, x)
-
-    def __repr__(self):
-        return '<Counter(%r)>' % self.v
-
 
 def get_keys_dir():
     """Determine where to store keys and cache"""
     return os.environ.get('DARC_KEYS_DIR',
@@ -124,32 +103,18 @@ def get_cache_dir():
                           os.path.join(os.path.expanduser('~'), '.darc', 'cache'))
 
-def deferrable(f):
-    def wrapper(*args, **kw):
-        callback = kw.pop('callback', None)
-        if callback:
-            data = kw.pop('callback_data', None)
-            try:
-                res = f(*args, **kw)
-            except Exception, e:
-                callback(None, e, data)
-            else:
-                callback(res, None, data)
-        else:
-            return f(*args, **kw)
-    return wrapper
-
-
-def error_callback(res, error, data):
-    if res:
-        raise res
-
 
 def to_localtime(ts):
     """Convert datetime object from UTC to local time zone"""
     return ts - timedelta(seconds=time.altzone)
 
+
+def adjust_patterns(patterns):
+    if patterns and isinstance(patterns[-1], IncludePattern):
+        patterns.append(ExcludePattern('*'))
+    elif patterns and isinstance(patterns[-1], ExcludePattern):
+        patterns.append(IncludePattern('*'))
+
+
 def exclude_path(path, patterns):
     """Used by create and extract sub-commands to determine
     if an item should be processed or not
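
adjust_patterns pins down the default for paths no pattern matches: a list ending in an include gets a trailing ExcludePattern('*') (everything not explicitly included is skipped), and a list ending in an exclude gets IncludePattern('*'). Roughly, with simplified stand-in pattern classes (darc's real ones also carry the matching logic):

class IncludePattern(object):
    def __init__(self, pattern):
        self.pattern = pattern

class ExcludePattern(object):
    def __init__(self, pattern):
        self.pattern = pattern

def adjust_patterns(patterns):
    if patterns and isinstance(patterns[-1], IncludePattern):
        patterns.append(ExcludePattern('*'))
    elif patterns and isinstance(patterns[-1], ExcludePattern):
        patterns.append(IncludePattern('*'))

patterns = [IncludePattern('home/user/photos')]
adjust_patterns(patterns)
print([type(p).__name__ for p in patterns])  # ['IncludePattern', 'ExcludePattern']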

darc/remote.py

@@ -2,31 +2,14 @@ from __future__ import with_statement
 import fcntl
 import msgpack
 import os
-import paramiko
 import select
+from subprocess import Popen, PIPE
 import sys
 import getpass
 
 from .store import Store
-from .helpers import Counter
 
-BUFSIZE = 1024 * 1024
-
-
-class ChannelNotifyer(object):
-
-    def __init__(self, channel):
-        self.channel = channel
-        self.enabled = Counter()
-
-    def set(self):
-        if self.enabled > 0:
-            with self.channel.lock:
-                self.channel.out_buffer_cv.notifyAll()
-
-    def clear(self):
-        pass
+BUFSIZE = 10 * 1024 * 1024
 
 
 class StoreServer(object):
@@ -87,134 +70,73 @@ class RemoteStore(object):
         def __init__(self, name):
             self.name = name
 
     def __init__(self, location, create=False):
-        self.client = paramiko.SSHClient()
-        self.client.set_missing_host_key_policy(paramiko.AutoAddPolicy())
-        params = {'username': location.user or getpass.getuser(),
-                  'hostname': location.host, 'port': location.port}
-        while True:
-            try:
-                self.client.connect(**params)
-                break
-            except (paramiko.PasswordRequiredException,
-                    paramiko.AuthenticationException,
-                    paramiko.SSHException):
-                if not 'password' in params:
-                    params['password'] = getpass.getpass('Password for %(username)s@%(hostname)s:' % params)
-                else:
-                    raise
         self.unpacker = msgpack.Unpacker()
-        self.transport = self.client.get_transport()
-        self.channel = self.transport.open_session()
-        self.notifier = ChannelNotifyer(self.channel)
-        self.channel.in_buffer.set_event(self.notifier)
-        self.channel.in_stderr_buffer.set_event(self.notifier)
-        self.channel.exec_command('darc serve')
-        self.callbacks = {}
         self.msgid = 0
-        self.recursion = 0
-        self.odata = []
-        # Negotiate protocol version
-        version = self.cmd('negotiate', (1,))
+        args = ['ssh', '-p', str(location.port), '%s@%s' % (location.user or getpass.getuser(), location.host), 'darc', 'serve']
+        self.p = Popen(args, bufsize=0, stdin=PIPE, stdout=PIPE)
+        self.stdout_fd = self.p.stdout.fileno()
+        version = self.call('negotiate', (1,))
         if version != 1:
             raise Exception('Server insisted on using unsupported protocol version %d' % version)
-        self.id = self.cmd('open', (location.path, create))
+        self.id = self.call('open', (location.path, create))
 
-    def wait(self, write=True):
-        with self.channel.lock:
-            if ((not write or self.channel.out_window_size == 0) and
-                len(self.channel.in_buffer._buffer) == 0 and
-                len(self.channel.in_stderr_buffer._buffer) == 0):
-                self.channel.out_buffer_cv.wait(1)
+    def __del__(self):
+        self.p.stdin.close()
+        self.p.stdout.close()
+        self.p.wait()
 
-    def cmd(self, cmd, args, callback=None, callback_data=None):
-        self.msgid += 1
-        self.notifier.enabled.inc()
-        self.odata.append(msgpack.packb((1, self.msgid, cmd, args)))
-        self.recursion += 1
-        if callback:
-            self.add_callback(callback, callback_data)
-            if self.recursion > 1:
-                self.recursion -= 1
-                return
-        while True:
-            if self.channel.closed:
-                self.recursion -= 1
-                raise Exception('Connection closed')
-            elif self.channel.recv_stderr_ready():
-                print >> sys.stderr, 'remote stderr:', self.channel.recv_stderr(BUFSIZE)
-            elif self.channel.recv_ready():
-                self.unpacker.feed(self.channel.recv(BUFSIZE))
-                for type, msgid, error, res in self.unpacker:
-                    self.notifier.enabled.dec()
-                    if msgid == self.msgid:
-                        if error:
-                            self.recursion -= 1
-                            raise self.RPCError(error)
-                        self.recursion -= 1
-                        return res
-                    else:
-                        for c, d in self.callbacks.pop(msgid, []):
-                            c(res, error, d)
-            elif self.odata and self.channel.send_ready():
-                data = self.odata.pop(0)
-                n = self.channel.send(data)
-                if n != len(data):
-                    self.odata.insert(0, data[n:])
-                if not self.odata and callback:
-                    self.recursion -= 1
-                    return
-            else:
-                self.wait(self.odata)
+    def _read(self, msgids):
+        data = os.read(self.stdout_fd, BUFSIZE)
+        self.unpacker.feed(data)
+        for type, msgid, error, res in self.unpacker:
+            if error:
+                raise self.RPCError(error)
+            if msgid in msgids:
+                msgids.remove(msgid)
+                yield res
+
+    def call(self, cmd, args, wait=True):
+        for res in self.call_multi(cmd, [args], wait=wait):
+            return res
+
+    def call_multi(self, cmd, argsv, wait=True):
+        msgids = set()
+        for args in argsv:
+            if select.select([self.stdout_fd], [], [], 0)[0]:
+                for res in self._read(msgids):
+                    yield res
+            self.msgid += 1
+            msgid = self.msgid
+            msgids.add(msgid)
+            self.p.stdin.write(msgpack.packb((1, msgid, cmd, args)))
+        while msgids and wait:
+            for res in self._read(msgids):
+                yield res
 
     def commit(self, *args):
-        self.cmd('commit', args)
+        self.call('commit', args)
 
     def rollback(self, *args):
-        return self.cmd('rollback', args)
+        return self.call('rollback', args)
 
-    def get(self, id, callback=None, callback_data=None):
+    def get(self, id):
         try:
-            return self.cmd('get', (id, ), callback, callback_data)
+            return self.call('get', (id, ))
         except self.RPCError, e:
             if e.name == 'DoesNotExist':
                 raise self.DoesNotExist
             raise
 
-    def put(self, id, data, callback=None, callback_data=None):
+    def get_many(self, ids):
+        return self.call_multi('get', [(id, ) for id in ids])
+
+    def put(self, id, data, wait=True):
         try:
-            return self.cmd('put', (id, data), callback, callback_data)
+            return self.call('put', (id, data), wait=wait)
         except self.RPCError, e:
             if e.name == 'AlreadyExists':
                 raise self.AlreadyExists
 
-    def delete(self, id, callback=None, callback_data=None):
-        return self.cmd('delete', (id, ), callback, callback_data)
-
-    def add_callback(self, cb, data):
-        self.callbacks.setdefault(self.msgid, []).append((cb, data))
-
-    def flush_rpc(self, counter=None, backlog=0):
-        counter = counter or self.notifier.enabled
-        while counter > backlog:
-            if self.channel.closed:
-                raise Exception('Connection closed')
-            elif self.odata and self.channel.send_ready():
-                n = self.channel.send(self.odata)
-                if n > 0:
-                    self.odata = self.odata[n:]
-            elif self.channel.recv_stderr_ready():
-                print >> sys.stderr, 'remote stderr:', self.channel.recv_stderr(BUFSIZE)
-            elif self.channel.recv_ready():
-                self.unpacker.feed(self.channel.recv(BUFSIZE))
-                for type, msgid, error, res in self.unpacker:
-                    self.notifier.enabled.dec()
-                    for c, d in self.callbacks.pop(msgid, []):
-                        c(res, error, d)
-                    if msgid == self.msgid:
-                        return
-            else:
-                self.wait(self.odata)
+    def delete(self, id, wait=True):
+        return self.call('delete', (id, ), wait=wait)
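
call_multi is what keeps the synchronous transport fast: it writes requests back-to-back and only drains replies early when select reports the server's stdout as readable, so many get requests are in flight on one connection and wait=False callers never block at all. The shape of that loop, reduced to its essentials (the toy echo child below stands in for the remote 'darc serve'; everything else is invented for the example):

import msgpack
import os
import select
import sys
from subprocess import Popen, PIPE

# Toy stand-in for 'darc serve': replies (1, msgid, None, args) to each request.
child_src = (
    "import os, msgpack\n"
    "u = msgpack.Unpacker()\n"
    "while True:\n"
    "    data = os.read(0, 65536)\n"
    "    if not data: break\n"
    "    u.feed(data)\n"
    "    for type, msgid, cmd, args in u:\n"
    "        os.write(1, msgpack.packb((1, msgid, None, args)))\n"
)
p = Popen([sys.executable, '-c', child_src], bufsize=0, stdin=PIPE, stdout=PIPE)
stdout_fd = p.stdout.fileno()
unpacker = msgpack.Unpacker()
msgids = set()

def read_replies():
    unpacker.feed(os.read(stdout_fd, 65536))
    for type, msgid, error, res in unpacker:
        if msgid in msgids:
            msgids.remove(msgid)
            yield res

# Pipeline: keep writing; drain opportunistically when replies are pending.
msgid = 0
for args in [(n,) for n in range(1000)]:
    if select.select([stdout_fd], [], [], 0)[0]:
        for res in read_replies():
            pass
    msgid += 1
    msgids.add(msgid)
    p.stdin.write(msgpack.packb((1, msgid, 'get', args)))
while msgids:
    for res in read_replies():
        pass
p.stdin.close()
print('all replies drained')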

darc/store.py

@@ -10,7 +10,7 @@ import unittest
 from zlib import crc32
 
 from .hashindex import NSIndex
-from .helpers import IntegrityError, deferrable, read_msgpack, write_msgpack
+from .helpers import IntegrityError, read_msgpack, write_msgpack
 from .lrucache import LRUCache
 
 MAX_OBJECT_SIZE = 20 * 1024 * 1024
@@ -194,7 +194,6 @@ class Store(object):
             self.recover(self.path)
         self.open_index(self.io.head, read_only=True)
 
-    @deferrable
     def get(self, id):
         try:
             segment, offset = self.index[id]
@@ -202,8 +201,11 @@ class Store(object):
         except KeyError:
             raise self.DoesNotExist
 
-    @deferrable
-    def put(self, id, data):
+    def get_many(self, ids):
+        for id in ids:
+            yield self.get(id)
+
+    def put(self, id, data, wait=True):
         if not self._active_txn:
             self._active_txn = True
             self.open_index(self.io.head)
@@ -219,8 +221,7 @@ class Store(object):
         self.segments[segment] += 1
         self.index[id] = segment, offset
 
-    @deferrable
-    def delete(self, id):
+    def delete(self, id, wait=True):
         if not self._active_txn:
             self._active_txn = True
             self.open_index(self.io.head)
@@ -232,9 +233,6 @@ class Store(object):
         except KeyError:
             raise self.DoesNotExist
 
-    def flush_rpc(self, *args):
-        pass
-
-    def add_callback(self, cb, data):
-        cb(None, None, data)
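
With the callback API gone, local Store and RemoteStore now share one surface: get, a get_many that may be a plain generator, and put/delete taking a wait flag that only the remote side acts on. A hypothetical in-memory stand-in showing the contract the rest of this commit codes against (not darc code):

class MemoryStore(object):
    """Minimal object implementing the store interface this commit settles on."""
    class DoesNotExist(Exception):
        pass

    def __init__(self):
        self.data = {}

    def get(self, id):
        try:
            return self.data[id]
        except KeyError:
            raise self.DoesNotExist

    def get_many(self, ids):
        for id in ids:
            yield self.get(id)

    def put(self, id, data, wait=True):   # wait only matters for the remote store
        self.data[id] = data

    def delete(self, id, wait=True):
        del self.data[id]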