From 6eb65d07f976b7bd86174cf44fd73243db8adc5e Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?Jonas=20Borgstr=C3=B6m?=
Date: Sun, 17 Oct 2010 17:44:41 +0200
Subject: [PATCH] Switched to using avro serialization for archives

---
 dedupestore/__init__.py | 39 ++++++++++++++++++++++++++++++++++-
 dedupestore/archiver.py | 45 ++++++++++++++++++++++++++++-------------
 dedupestore/cache.py    | 37 +++++++++++++++++++--------------
 3 files changed, 91 insertions(+), 30 deletions(-)

diff --git a/dedupestore/__init__.py b/dedupestore/__init__.py
index ae23f852c..7653d0051 100644
--- a/dedupestore/__init__.py
+++ b/dedupestore/__init__.py
@@ -1 +1,38 @@
-# This is a python package
\ No newline at end of file
+# This is a python package
+
+ARCHIVE_SCHEMA = """
+{
+    "name": "Archive",
+    "type": "record",
+    "fields" : [
+        { "name": "name", "type": "string" },
+        { "name": "ts", "type": "string" },
+        { "name": "chunks", "type": { "type": "array", "items":
+            { "type": "record",
+              "name": "Chunk",
+              "fields": [
+                { "name": "id", "type": {"type": "fixed", "size": 32, "name": "sha256" }},
+                { "name": "size", "type": "int" }
+              ]
+            }
+        }},
+        { "name": "items", "type": {"type": "array", "items":
+            {
+                "type": "record",
+                "name": "Item",
+                "fields": [
+                    { "name": "type", "type":
+                        { "name": "ItemType", "type": "enum", "symbols": ["FILE", "DIRECTORY"] } },
+                    { "name": "path", "type": "string" },
+                    { "name": "size", "type": ["null", "long"] },
+                    { "name": "chunks", "type": ["null",
+                        { "type": "array", "items": "int" }
+                    ]}
+                ]
+            }
+        }}
+    ]
+}
+"""
+from avro import schema
+archive_schema = schema.parse(ARCHIVE_SCHEMA)
diff --git a/dedupestore/archiver.py b/dedupestore/archiver.py
index dd5d53ba0..9e5b59585 100644
--- a/dedupestore/archiver.py
+++ b/dedupestore/archiver.py
@@ -2,10 +2,14 @@ import os
 import hashlib
 import logging
 import zlib
-import cPickle
 import argparse
 import sys
+from cStringIO import StringIO
+from datetime import datetime
+from avro import io
+
+from dedupestore import archive_schema
 
 from chunkifier import chunkify
 from cache import Cache, NS_ARCHIVES, NS_CHUNKS
 from bandstore import BandStore
@@ -41,26 +45,39 @@ class Archive(object):
         data = self.store.get(NS_ARCHIVES, id)
         if hashlib.sha256(data).digest() != id:
             raise Exception('Archive hash did not match')
-        archive = cPickle.loads(zlib.decompress(data))
+        buffer = StringIO(zlib.decompress(data))
+        reader = io.DatumReader(archive_schema)
+        decoder = io.BinaryDecoder(buffer)
+        archive = reader.read(decoder)
         self.items = archive['items']
         self.name = archive['name']
         self.chunks = archive['chunks']
-        for i, (id, csize, osize) in enumerate(archive['chunks']):
-            self.chunk_idx[i] = id
+        for i, chunk in enumerate(archive['chunks']):
+            self.chunk_idx[i] = chunk['id']
 
     def save(self, name):
-        archive = {'name': name, 'items': self.items, 'chunks': self.chunks}
-        data = zlib.compress(cPickle.dumps(archive))
+        archive = {
+            'name': name,
+            'ts': datetime.utcnow().isoformat(),
+            'items': self.items,
+            'chunks': self.chunks
+        }
+        writer = StringIO()
+        encoder = io.BinaryEncoder(writer)
+        datum_writer = io.DatumWriter(archive_schema)
+        datum_writer.write(archive, encoder)
+        data = zlib.compress(writer.getvalue())
+        print 'archive size: %d' % len(data)
         self.id = hashlib.sha256(data).digest()
         self.store.put(NS_ARCHIVES, self.id, data)
         self.store.commit()
 
-    def add_chunk(self, id, csize, osize):
+    def add_chunk(self, id, size):
         try:
             return self.chunk_idx[id]
         except KeyError:
             idx = len(self.chunks)
-            self.chunks.append((id, csize, osize))
+            self.chunks.append(dict(id=id, size=size))
             self.chunk_idx[id] = idx
             return idx
 
@@ -77,10 +94,10 @@ class Archive(object):
             chunk_count.setdefault(id, 0)
             chunk_count[id] += 1
         for id, c in chunk_count.items():
-            count, csize, osize = cache.chunkmap[id]
-            total_csize += csize
+            count, size = cache.chunkmap[id]
+            total_csize += size
             if c == count:
-                total_usize += csize
+                total_usize += size
         return dict(osize=total_osize, csize=total_csize, usize=total_usize)
 
     def list(self):
@@ -93,7 +110,7 @@ class Archive(object):
         assert item['path'][0] not in ('/', '\\', ':')
         path = os.path.join(dest, item['path'])
         logging.info(path)
-        if item['type'] == 'DIR':
+        if item['type'] == 'DIRECTORY':
             if not os.path.exists(path):
                 os.makedirs(path)
         if item['type'] == 'FILE':
@@ -142,7 +159,7 @@ class Archive(object):
         if name in cache.archives:
             raise NameError('Archive already exists')
         for path in paths:
-            for root, dirs, files in os.walk(path):
+            for root, dirs, files in os.walk(unicode(path)):
                 for d in dirs:
                     p = os.path.join(root, d)
                     self.items.append(self.process_dir(p, cache))
@@ -158,7 +175,7 @@ class Archive(object):
     def process_dir(self, path, cache):
         path = path.lstrip('/\\:')
         logging.info(path)
-        return {'type': 'DIR', 'path': path}
+        return {'type': 'DIRECTORY', 'path': path}
 
     def process_file(self, path, cache):
         try:
diff --git a/dedupestore/cache.py b/dedupestore/cache.py
index 1475db54c..5de1b3700 100644
--- a/dedupestore/cache.py
+++ b/dedupestore/cache.py
@@ -1,8 +1,11 @@
-import cPickle
 import hashlib
 import os
-import sys
 import zlib
+from avro import io
+from cStringIO import StringIO
+import cPickle
+
+from dedupestore import archive_schema
 
 NS_ARCHIVES = 'ARCHIVES'
 NS_CHUNKS = 'CHUNKS'
@@ -48,13 +51,17 @@ class Cache(object):
             data = self.store.get(NS_ARCHIVES, id)
             if hashlib.sha256(data).digest() != id:
                 raise Exception('Archive hash did not match')
-            archive = cPickle.loads(zlib.decompress(data))
+
+            buffer = StringIO(zlib.decompress(data))
+            reader = io.DatumReader(archive_schema)
+            decoder = io.BinaryDecoder(buffer)
+            archive = reader.read(decoder)
             self.archives[archive['name']] = id
-            for id, csize, osize in archive['chunks']:
+            for id, size in archive['chunks']:
                 if self.seen_chunk(id):
                     self.chunk_incref(id)
                 else:
-                    self.init_chunk(id, csize, osize)
+                    self.init_chunk(id, size)
 
     def save(self):
         assert self.store.state == self.store.OPEN
@@ -78,27 +85,27 @@ class Cache(object):
         data = hashlib.sha256(data).digest() + data
         csize = len(data)
         self.store.put(NS_CHUNKS, id, data)
-        return self.init_chunk(id, csize, osize)
+        return self.init_chunk(id, csize)
 
-    def init_chunk(self, id, csize, osize):
-        self.chunkmap[id] = (1, csize, osize)
-        return id, csize, osize
+    def init_chunk(self, id, size):
+        self.chunkmap[id] = (1, size)
+        return id, size
 
     def seen_chunk(self, id):
-        count, csize, osize = self.chunkmap.get(id, (0, 0, 0))
+        count, size = self.chunkmap.get(id, (0, 0))
         return count
 
     def chunk_incref(self, id):
-        count, csize, osize = self.chunkmap[id]
-        self.chunkmap[id] = (count + 1, csize, osize)
-        return id, csize, osize
+        count, size = self.chunkmap[id]
+        self.chunkmap[id] = (count + 1, size)
+        return id, size
 
     def chunk_decref(self, id):
-        count, csize, osize = self.chunkmap[id]
+        count, size = self.chunkmap[id]
         if count == 1:
             del self.chunkmap[id]
             self.store.delete(NS_CHUNKS, id)
         else:
-            self.chunkmap[id] = (count - 1, csize, osize)
+            self.chunkmap[id] = (count - 1, size)
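
For reference, the new Archive.save()/Archive.load() round trip in this patch
boils down to the sketch below. This is a hypothetical standalone snippet, not
part of the commit: it assumes the Python 2 avro bindings of the time and
reuses the archive_schema parsed in dedupestore/__init__.py; the 32-byte
"sha256" fixed field must be a raw digest string, and the sample datum values
are made up for illustration.

    import hashlib
    import zlib
    from cStringIO import StringIO
    from avro import io
    from dedupestore import archive_schema

    # A minimal datum matching the Archive schema: one chunk, one item.
    # Union fields ("size", "chunks" on Item) take None for the null branch.
    datum = {
        'name': 'test', 'ts': '2010-10-17T17:44:41',
        'chunks': [{'id': '\x00' * 32, 'size': 1234}],
        'items': [{'type': 'DIRECTORY', 'path': 'home',
                   'size': None, 'chunks': None}],
    }

    # Encode as Archive.save() does: avro binary, zlib, then a sha256 key.
    buffer = StringIO()
    io.DatumWriter(archive_schema).write(datum, io.BinaryEncoder(buffer))
    data = zlib.compress(buffer.getvalue())
    key = hashlib.sha256(data).digest()

    # Decode as Archive.load() does after verifying the key against the data.
    decoder = io.BinaryDecoder(StringIO(zlib.decompress(data)))
    restored = io.DatumReader(archive_schema).read(decoder)
    assert restored['name'] == 'test'
    assert restored['chunks'][0]['size'] == 1234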