diff --git a/darc/cache.py b/darc/cache.py index 284eb32a0..7cdd05b93 100644 --- a/darc/cache.py +++ b/darc/cache.py @@ -124,6 +124,22 @@ def rollback(self): def sync(self): """Initializes cache by fetching and reading all archive indicies """ + def cb(chunk, error, id): + assert not error + data, items_hash = self.key.decrypt(chunk) + assert self.key.id_hash(data) == id + unpacker.feed(data) + for item in unpacker: + try: + for id, size, csize in item['chunks']: + try: + count, size, csize = self.chunks[id] + self.chunks[id] = count + 1, size, csize + except KeyError: + self.chunks[id] = 1, size, csize + pass + except KeyError: + pass self.begin_txn() print 'Initializing cache...' self.chunks.clear() @@ -133,25 +149,13 @@ def sync(self): archive = msgpack.unpackb(data) print 'Analyzing archive:', archive['name'] for id, size, csize in archive['items']: - data, hash = self.key.decrypt(self.store.get(NS_CHUNK, id)) - assert self.key.id_hash(data) == id try: count, size, csize = self.chunks[id] self.chunks[id] = count + 1, size, csize except KeyError: self.chunks[id] = 1, size, csize - unpacker.feed(data) - for item in unpacker: - try: - for id, size, csize in item['chunks']: - try: - count, size, csize = self.chunks[id] - self.chunks[id] = count + 1, size, csize - except KeyError: - self.chunks[id] = 1, size, csize - pass - except KeyError: - pass + self.store.get(NS_CHUNK, id, callback=cb, callback_data=id) + self.store.flush_rpc() def add_chunk(self, id, data): if not self.txn_active: