diff --git a/darc/archive.py b/darc/archive.py
index 394714cac..33d7b1ee4 100644
--- a/darc/archive.py
+++ b/darc/archive.py
@@ -1,7 +1,7 @@
 from __future__ import with_statement
 from datetime import datetime, timedelta
 from getpass import getuser
-from itertools import izip
+from itertools import izip_longest
 import msgpack
 import os
 import socket
@@ -29,6 +29,7 @@ def __init__(self, unpacker, filter):
         self.unpacker = iter(unpacker)
         self.filter = filter
         self.stack = []
+        self.peeks = 0
         self._peek = None
         self._peek_iter = None
         global foo
@@ -39,9 +40,12 @@ def __iter__(self):
 
     def next(self):
         if self.stack:
-            return self.stack.pop(0)
-        self._peek = None
-        return self.get_next()
+            item = self.stack.pop(0)
+        else:
+            self._peek = None
+            item = self.get_next()
+        self.peeks = max(0, self.peeks - len(item.get('chunks', [])))
+        return item
 
     def get_next(self):
         next = self.unpacker.next()
@@ -52,7 +56,7 @@ def get_next(self):
     def peek(self):
         while True:
             while not self._peek or not self._peek_iter:
-                if len(self.stack) > 100:
+                if self.peeks > 100:
                     raise StopIteration
                 self._peek = self.get_next()
                 self.stack.append(self._peek)
@@ -61,7 +65,9 @@
                 else:
                     self._peek_iter = None
             try:
-                return self._peek_iter.next()
+                item = self._peek_iter.next()
+                self.peeks += 1
+                return item
             except StopIteration:
                 self._peek = None
 
@@ -186,25 +192,26 @@ def save(self, name=None):
         self.cache.commit()
 
     def calc_stats(self, cache):
+        def add(id):
+            count, size, csize = self.cache.chunks[id]
+            stats.update(size, csize, count == 1)
+            self.cache.chunks[id] = count - 1, size, csize
         # This function is a bit evil since it abuses the cache to calculate
         # the stats. The cache transaction must be rolled back afterwards
         unpacker = msgpack.Unpacker()
         cache.begin_txn()
         stats = Statistics()
-        for id in self.metadata['items']:
-            unpacker.feed(self.key.decrypt(id, self.store.get(id)))
+        add(self.id)
+        for id, chunk in izip_longest(self.metadata['items'], self.store.get_many(self.metadata['items'])):
+            unpacker.feed(self.key.decrypt(id, chunk))
             for item in unpacker:
                 try:
                     for id, size, csize in item['chunks']:
-                        count, _, _ = self.cache.chunks[id]
-                        stats.update(size, csize, count == 1)
-                        stats.nfiles += 1
-                        self.cache.chunks[id] = count - 1, size, csize
+                        add(id)
+                    stats.nfiles += 1
                 except KeyError:
                     pass
-        count, size, csize = self.cache.chunks[id]
-        stats.update(size, csize, count == 1)
-        self.cache.chunks[id] = count - 1, size, csize
+        add(id)
         cache.rollback()
         return stats
 
@@ -237,7 +244,7 @@ def extract_item(self, item, dest=None, start_cb=None, restore_attrs=True, peek=
                 fd = open(path, 'wb')
                 start_cb(item)
                 ids = [id for id, size, csize in item['chunks']]
-                for id, chunk in izip(ids, self.store.get_many(ids, peek)):
+                for id, chunk in izip_longest(ids, self.store.get_many(ids, peek)):
                     data = self.key.decrypt(id, chunk)
                     fd.write(data)
                 fd.close()
@@ -296,9 +303,8 @@ def verify_file(self, item, start, result, peek=None):
             start(item)
             ids = [id for id, size, csize in item['chunks']]
            try:
-                for id, chunk in izip(ids, self.store.get_many(ids, peek)):
-                    if chunk:
-                        self.key.decrypt(id, chunk)
+                for id, chunk in izip_longest(ids, self.store.get_many(ids, peek)):
+                    self.key.decrypt(id, chunk)
             except Exception, e:
                 result(item, False)
                 return
diff --git a/darc/remote.py b/darc/remote.py
index 6064b57ee..90108407a 100644
--- a/darc/remote.py
+++ b/darc/remote.py
@@ -75,7 +75,7 @@ def __init__(self, location, create=False):
         self.cache = LRUCache(200)
         self.to_send = ''
         self.extra = {}
-        self.pending_cache = {}
+        self.pending = {}
         self.unpacker = msgpack.Unpacker()
         self.msgid = 0
         self.received_msgid = 0
@@ -106,13 +106,10 @@ def _read(self):
             self.received_msgid = msgid
             if error:
                 raise self.RPCError(error)
-            if msgid in self.pending_cache:
-                args = self.pending_cache.pop(msgid)
-                self.cache[args] = msgid, res
-            else:
-                print 'unknown response'
-            for args in self.extra.pop(msgid, []):
-                to_yield.append(self.cache[args][1])
+            args = self.pending.pop(msgid)
+            self.cache[args] = msgid, res
+            for args, resp in self.extra.pop(msgid, []):
+                to_yield.append(resp or self.cache[args][1])
         for res in to_yield:
             yield res
 
@@ -127,12 +124,12 @@ def gen_request(self, cmd, argsv):
             if not args in self.cache:
                 self.msgid += 1
                 msgid = self.msgid
-                self.pending_cache[msgid] = args
+                self.pending[msgid] = args
                 self.cache[args] = msgid, None
                 data.append(msgpack.packb((1, msgid, cmd, args)))
             msgid, resp = self.cache[args]
             m = max(m, msgid)
-            self.extra.setdefault(m, []).append(args)
+            self.extra.setdefault(m, []).append((args, resp))
         return ''.join(data)
 
     def gen_cache_requests(self, cmd, peek):
@@ -146,7 +143,7 @@ def gen_cache_requests(self, cmd, peek):
                 continue
             self.msgid += 1
             msgid = self.msgid
-            self.pending_cache[msgid] = args
+            self.pending[msgid] = args
             self.cache[args] = msgid, None
             data.append(msgpack.packb((1, msgid, cmd, args)))
         return ''.join(data)
@@ -156,9 +153,9 @@ def call_multi(self, cmd, argsv, wait=True, peek=None):
         left = len(argsv)
         data = self.gen_request(cmd, argsv)
         self.to_send += data
-        for args in self.extra.pop(self.received_msgid, []):
+        for args, resp in self.extra.pop(self.received_msgid, []):
             left -= 1
-            yield self.cache[args][1]
+            yield resp or self.cache[args][1]
         while left:
            r, w, x = select.select(self.r_fds, w_fds, self.x_fds, 1)
             if x:
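
Note on the izip -> izip_longest switch above: itertools.izip stops at the end of the shorter
iterable, so an id whose chunk never arrives from the store would simply be skipped, while
izip_longest pads the missing side with None and the following decrypt call sees the gap.
A minimal sketch of that difference (Python 3 spelling zip/zip_longest; the ids/chunks values
are made up for illustration, not darc data):

from itertools import zip_longest   # izip_longest in Python 2

ids = ['id-1', 'id-2', 'id-3']
chunks = ['chunk-1', 'chunk-2']      # one chunk missing

# zip()/izip() truncates at the shorter input: the missing chunk is silently dropped.
print(list(zip(ids, chunks)))
# [('id-1', 'chunk-1'), ('id-2', 'chunk-2')]

# zip_longest()/izip_longest() pads with None, so the consumer still sees id-3
# and can fail loudly (e.g. a decrypt call rejecting None) instead of skipping it.
print(list(zip_longest(ids, chunks)))
# [('id-1', 'chunk-1'), ('id-2', 'chunk-2'), ('id-3', None)]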