diff --git a/darc/archive.py b/darc/archive.py index 2614bb728..30722ee2d 100644 --- a/darc/archive.py +++ b/darc/archive.py @@ -6,12 +6,12 @@ import socket import stat import sys -from os.path import dirname +from zlib import crc32 from xattr import xattr, XATTR_NOFOLLOW from . import NS_ARCHIVE_METADATA, NS_CHUNK from ._speedups import chunkify -from .helpers import uid2user, user2uid, gid2group, group2gid, IntegrityError +from .helpers import uid2user, user2uid, gid2group, group2gid, IntegrityError, Counter CHUNK_SIZE = 64 * 1024 WINDOW_SIZE = 4096 @@ -31,7 +31,6 @@ def __init__(self, store, key, name=None, cache=None): self.cache = cache self.items = '' self.items_refs = [] - self.items_prefix = '' self.items_ids = [] self.hard_links = {} if name: @@ -54,28 +53,33 @@ def ts(self): def iter_items(self, callback): unpacker = msgpack.Unpacker() + counter = Counter(0) def cb(chunk, error, id): + counter.dec() + print len(chunk) data, items_hash = self.key.decrypt(chunk) assert self.key.id_hash(data) == id unpacker.feed(data) for item in unpacker: callback(item) for id, size, csize in self.metadata['items']: + # Limit the number of concurrent items requests to 3 + self.store.flush_rpc(counter, 10) + counter.inc() self.store.get(NS_CHUNK, id, callback=cb, callback_data=id) def add_item(self, item, refs=None): data = msgpack.packb(item) - prefix = dirname(item['path']) - if self.items_prefix and self.items_prefix != prefix: + if crc32(item['path'].encode('utf-8')) % 1000 == 0: self.flush_items() if refs: self.items_refs += refs self.items += data - self.items_prefix = prefix def flush_items(self): if not self.items: return + print 'flush', len(self.items) id = self.key.id_hash(self.items) if self.cache.seen_chunk(id): self.items_ids.append(self.cache.chunk_incref(id)) @@ -85,7 +89,6 @@ def flush_items(self): self.items_ids.append(self.cache.add_chunk(id, self.items)) self.items = '' self.items_refs = [] - self.items_prefix = '' def save(self, name, cache): self.id = self.key.archive_hash(name) @@ -171,27 +174,28 @@ def extract_item(self, item, dest=None, start_cb=None): os.unlink(path) os.link(source, path) else: - def extract_cb(chunk, error, (id, i, last)): - if i==0: + def extract_cb(chunk, error, (id, i)): + if i == 0: + state['fd'] = open(path, 'wb') start_cb(item) assert not error data, hash = self.key.decrypt(chunk) if self.key.id_hash(data) != id: raise IntegrityError('chunk hash did not match') - fd.write(data) - if last: - fd.close() + state['fd'].write(data) + if i == n - 1: + state['fd'].close() self.restore_attrs(path, item) - - fd = open(path, 'wb') + state = {} n = len(item['chunks']) + ## 0 chunks indicates an empty (0 bytes) file if n == 0: + open(path, 'wb').close() start_cb(item) self.restore_attrs(path, item) - fd.close() else: for i, (id, size, csize) in enumerate(item['chunks']): - self.store.get(NS_CHUNK, id, callback=extract_cb, callback_data=(id, i, i==n-1)) + self.store.get(NS_CHUNK, id, callback=extract_cb, callback_data=(id, i)) else: raise Exception('Unknown archive item type %r' % item['mode']) diff --git a/darc/helpers.py b/darc/helpers.py index fa299f5f4..e254b73e2 100644 --- a/darc/helpers.py +++ b/darc/helpers.py @@ -10,6 +10,25 @@ import struct import time +class Counter(object): + + __slots__ = ('v',) + + def __init__(self, value=0): + self.v = value + + def inc(self, amount=1): + self.v += amount + + def dec(self, amount=1): + self.v -= amount + + def __cmp__(self, x): + return cmp(self.v, x) + + def __repr__(self): + return '' % self.v + def deferrable(f): def wrapper(*args, **kw): diff --git a/darc/remote.py b/darc/remote.py index fcd4ee45f..998f5aabe 100644 --- a/darc/remote.py +++ b/darc/remote.py @@ -7,6 +7,7 @@ import getpass from .store import Store +from .helpers import Counter BUFSIZE = 1024 * 1024 @@ -16,10 +17,10 @@ class ChannelNotifyer(object): def __init__(self, channel): self.channel = channel - self.enabled = 0 + self.enabled = Counter() def set(self): - if self.enabled: + if self.enabled > 0: with self.channel.lock: self.channel.out_buffer_cv.notifyAll() @@ -106,6 +107,8 @@ def __init__(self, location, create=False): self.channel.exec_command('darc serve') self.callbacks = {} self.msgid = 0 + self.recursion = 0 + self.odata = '' self.id, self.tid = self.cmd('open', (location.path, create)) def wait(self, write=True): @@ -113,39 +116,46 @@ def wait(self, write=True): if ((not write or self.channel.out_window_size == 0) and len(self.channel.in_buffer._buffer) == 0 and len(self.channel.in_stderr_buffer._buffer) == 0): - self.channel.out_buffer_cv.wait(10) + self.channel.out_buffer_cv.wait(1) def cmd(self, cmd, args, callback=None, callback_data=None): self.msgid += 1 - self.notifier.enabled += 1 - odata = msgpack.packb((0, self.msgid, cmd, args)) + self.notifier.enabled.inc() + self.odata += msgpack.packb((0, self.msgid, cmd, args)) + self.recursion += 1 if callback: self.callbacks[self.msgid] = callback, callback_data + if self.recursion > 1: + self.recursion -= 1 + return while True: if self.channel.closed: + self.recursion -= 1 raise Exception('Connection closed') elif self.channel.recv_stderr_ready(): print >> sys.stderr, 'remote stderr:', self.channel.recv_stderr(BUFSIZE) elif self.channel.recv_ready(): self.unpacker.feed(self.channel.recv(BUFSIZE)) for type, msgid, error, res in self.unpacker: - self.notifier.enabled -= 1 + self.notifier.enabled.dec() if msgid == self.msgid: if error: raise self.RPCError(error) + self.recursion -= 1 return res else: c, d = self.callbacks.pop(msgid, (None, None)) if c: c(res, error, d) - elif odata and self.channel.send_ready(): - n = self.channel.send(odata) + elif self.odata and self.channel.send_ready(): + n = self.channel.send(self.odata) if n > 0: - odata = odata[n:] - if not odata and callback: + self.odata = self.odata[n:] + if not self.odata and callback: + self.recursion -= 1 return else: - self.wait(odata) + self.wait(self.odata) def commit(self, *args): self.cmd('commit', args) @@ -176,20 +186,26 @@ def delete(self, ns, id, callback=None, callback_data=None): def list(self, *args): return self.cmd('list', args) - def flush_rpc(self): - while True: + def flush_rpc(self, counter=None, backlog=0): + counter = counter or self.notifier.enabled + while counter > backlog: if self.channel.closed: raise Exception('Connection closed') + elif self.odata and self.channel.send_ready(): + n = self.channel.send(self.odata) + if n > 0: + self.odata = self.odata[n:] elif self.channel.recv_stderr_ready(): print >> sys.stderr, 'remote stderr:', self.channel.recv_stderr(BUFSIZE) elif self.channel.recv_ready(): self.unpacker.feed(self.channel.recv(BUFSIZE)) for type, msgid, error, res in self.unpacker: - self.notifier.enabled -= 1 + self.notifier.enabled.dec() c, d = self.callbacks.pop(msgid, (None, None)) if c: c(res, error, d) if msgid == self.msgid: return else: - self.wait() + self.wait(self.odata) +