diff --git a/dedupestore/archive.py b/dedupestore/archive.py
index ff2cc548f..4a06ec640 100644
--- a/dedupestore/archive.py
+++ b/dedupestore/archive.py
@@ -1,58 +1,59 @@
 from datetime import datetime
 import hashlib
 import logging
-import msgpack
 import os
 import stat
 import sys
 import zlib
 
-from .cache import NS_ARCHIVES, NS_CHUNKS
+from .cache import NS_ARCHIVES, NS_CHUNKS, NS_CINDEX
 from .chunkifier import chunkify
-from .helpers import uid2user, user2uid, gid2group, group2gid
+from .helpers import uid2user, user2uid, gid2group, group2gid, pack, unpack
 
 CHUNK_SIZE = 55001
 
 
 class Archive(object):
 
-    def __init__(self, store, cache, name=None):
+    def __init__(self, store, name=None):
         self.store = store
-        self.cache = cache
         self.items = []
         self.chunks = []
         self.chunk_idx = {}
         self.hard_links = {}
         if name:
-            self.open(name)
+            self.load(hashlib.sha256(name).digest())
 
-    def open(self, name):
-        id = self.cache.archives[name]
-        data = self.store.get(NS_ARCHIVES, id)
-        if hashlib.sha256(data).digest() != id:
-            raise Exception('Archive hash did not match')
-        archive = msgpack.unpackb(zlib.decompress(data))
-        version = archive.get('version')
-        if version != 1:
-            raise Exception('Archive version %r not supported' % version)
+    def load(self, id):
+        self.id = id
+        archive = unpack(self.store.get(NS_ARCHIVES, self.id))
+        if archive['version'] != 1:
+            raise Exception('Archive version %r not supported' % archive['version'])
         self.items = archive['items']
         self.name = archive['name']
-        self.chunks = archive['chunks']
-        for i, chunk in enumerate(archive['chunks']):
+        cindex = unpack(self.store.get(NS_CINDEX, self.id))
+        assert cindex['version'] == 1
+        self.chunks = cindex['chunks']
+        for i, chunk in enumerate(self.chunks):
             self.chunk_idx[i] = chunk[0]
 
     def save(self, name):
+        self.id = hashlib.sha256(name).digest()
         archive = {
             'version': 1,
             'name': name,
-            'cmdline': ' '.join(sys.argv),
+            'cmdline': sys.argv,
             'ts': datetime.utcnow().isoformat(),
             'items': self.items,
-            'chunks': self.chunks
         }
-        data = zlib.compress(msgpack.packb(archive))
-        self.id = hashlib.sha256(data).digest()
+        _, data = pack(archive)
         self.store.put(NS_ARCHIVES, self.id, data)
+        cindex = {
+            'version': 1,
+            'chunks': self.chunks,
+        }
+        _, data = pack(cindex)
+        self.store.put(NS_CINDEX, self.id, data)
         self.store.commit()
 
     def add_chunk(self, id, size):
@@ -71,7 +72,7 @@ class Archive(object):
             osize += item['size']
         for id, size in self.chunks:
             csize += size
-            if self.cache.seen_chunk(id) == 1:
+            if cache.seen_chunk(id) == 1:
                 usize += size
         return osize, csize, usize
 
@@ -115,13 +116,10 @@ class Archive(object):
                 with open(path, 'wb') as fd:
                     for chunk in item['chunks']:
                         id = self.chunk_idx[chunk]
-                        data = self.store.get(NS_CHUNKS, id)
-                        cid = data[:32]
-                        data = data[32:]
-                        if hashlib.sha256(data).digest() != cid:
+                        try:
+                            fd.write(unpack(self.store.get(NS_CHUNKS, id)))
+                        except ValueError:
                             raise Exception('Invalid chunk checksum')
-                        data = zlib.decompress(data)
-                        fd.write(data)
                 self.restore_stat(path, item)
             else:
                 raise Exception('Unknown archive item type %r' % item['type'])
@@ -146,21 +144,20 @@ class Archive(object):
                 item['path'] = item['path'].decode('utf-8')
                 for chunk in item['chunks']:
                     id = self.chunk_idx[chunk]
-                    data = self.store.get(NS_CHUNKS, id)
-                    cid = data[:32]
-                    data = data[32:]
-                    if (hashlib.sha256(data).digest() != cid):
+                    try:
+                        unpack(self.store.get(NS_CHUNKS, id))
+                    except ValueError:
                         logging.error('%s ... ERROR', item['path'])
                         break
-                    else:
-                        logging.info('%s ... OK', item['path'])
+                else:
+                    logging.info('%s ... OK', item['path'])
 
     def delete(self, cache):
-        self.store.delete(NS_ARCHIVES, self.cache.archives[self.name])
+        self.store.delete(NS_ARCHIVES, self.id)
+        self.store.delete(NS_CINDEX, self.id)
        for id, size in self.chunks:
             cache.chunk_decref(id)
         self.store.commit()
-        del cache.archives[self.name]
         cache.save()
 
     def _walk(self, path):
@@ -172,7 +169,11 @@ class Archive(object):
             yield x
 
     def create(self, name, paths, cache):
-        if name in cache.archives:
+        try:
+            self.store.get(NS_ARCHIVES, name)
+        except self.store.DoesNotExist:
+            pass
+        else:
             raise NameError('Archive already exists')
         for path in paths:
             for path, st in self._walk(unicode(path)):
@@ -181,11 +182,10 @@ class Archive(object):
             elif stat.S_ISLNK(st.st_mode):
                 self.process_symlink(path, st)
             elif stat.S_ISREG(st.st_mode):
-                self.process_file(path, st)
+                self.process_file(path, st, cache)
             else:
                 logging.error('Unknown file type: %s', path)
         self.save(name)
-        cache.archives[name] = self.id
         cache.save()
 
     def process_dir(self, path, st):
@@ -210,7 +210,7 @@ class Archive(object):
             'gid': st.st_gid, 'group': gid2group(st.st_gid), 'ctime': st.st_ctime, 'mtime': st.st_mtime,
         })
 
-    def process_file(self, path, st):
+    def process_file(self, path, st, cache):
         safe_path = path.lstrip('/\\:')
         if st.st_nlink > 1:
             source = self.hard_links.get((st.st_ino, st.st_dev))
@@ -231,7 +231,7 @@ class Archive(object):
             chunks = []
             size = 0
             for chunk in chunkify(fd, CHUNK_SIZE, 30):
-                chunks.append(self.process_chunk(chunk))
+                chunks.append(self.process_chunk(chunk, cache))
                 size += len(chunk)
             self.items.append({
                 'type': 'FILE', 'path': safe_path, 'chunks': chunks, 'size': size,
@@ -241,14 +241,20 @@ class Archive(object):
             'ctime': st.st_ctime, 'mtime': st.st_mtime,
         })
 
-    def process_chunk(self, data):
+    def process_chunk(self, data, cache):
         id = hashlib.sha256(data).digest()
         try:
             return self.chunk_idx[id]
         except KeyError:
             idx = len(self.chunks)
-            size = self.cache.add_chunk(id, data)
+            size = cache.add_chunk(id, data)
             self.chunks.append((id, size))
             self.chunk_idx[id] = idx
             return idx
 
+    @staticmethod
+    def list_archives(store):
+        for id in store.list(NS_ARCHIVES):
+            archive = Archive(store)
+            archive.load(id)
+            yield archive
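
Note on the layout change in archive.py: an archive's id is now the SHA-256 of its
name instead of the hash of its serialized payload, and the chunk list moves out of
the archive metadata into a separate NS_CINDEX object stored under the same id. A
minimal sketch of the addressing scheme (the archive name is hypothetical):

    import hashlib

    name = 'monday-backup'               # hypothetical archive name
    id = hashlib.sha256(name).digest()   # deterministic id, shared by both objects
    # store.get(NS_ARCHIVES, id) -> packed archive metadata (name, ts, items, ...)
    # store.get(NS_CINDEX, id)   -> packed chunk index [(chunk_id, size), ...]

Because the id is derived from the name alone, Archive.__init__ can fetch an archive
directly without consulting the cache's name-to-id mapping, which is what allows the
'archives' dict to be dropped from the cache later in this patch.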
diff --git a/dedupestore/archiver.py b/dedupestore/archiver.py
index 77bad7d48..25014412d 100644
--- a/dedupestore/archiver.py
+++ b/dedupestore/archiver.py
@@ -12,49 +12,51 @@ class Archiver(object):
 
     def open_store(self, location):
         store = BandStore(location.path)
-        cache = Cache(store)
-        return store, cache
+        return store
 
     def exit_code_from_logger(self):
         return 1 if self.level_filter.count.get('ERROR') else 0
 
     def do_create(self, args):
-        store, cache = self.open_store(args.archive)
-        archive = Archive(store, cache)
+        store = self.open_store(args.archive)
+        cache = Cache(store)
+        archive = Archive(store)
         archive.create(args.archive.archive, args.paths, cache)
         return self.exit_code_from_logger()
 
     def do_extract(self, args):
-        store, cache = self.open_store(args.archive)
-        archive = Archive(store, cache, args.archive.archive)
+        store = self.open_store(args.archive)
+        archive = Archive(store, args.archive.archive)
         archive.extract(args.dest)
         return self.exit_code_from_logger()
 
     def do_delete(self, args):
-        store, cache = self.open_store(args.archive)
-        archive = Archive(store, cache, args.archive.archive)
+        store = self.open_store(args.archive)
+        cache = Cache(store)
+        archive = Archive(store, args.archive.archive)
         archive.delete(cache)
         return self.exit_code_from_logger()
 
     def do_list(self, args):
-        store, cache = self.open_store(args.src)
+        store = self.open_store(args.src)
         if args.src.archive:
-            archive = Archive(store, cache, args.src.archive)
+            archive = Archive(store, args.src.archive)
             archive.list()
         else:
-            for archive in sorted(cache.archives):
+            for archive in Archive.list_archives(store):
                 print archive
         return self.exit_code_from_logger()
 
     def do_verify(self, args):
-        store, cache = self.open_store(args.archive)
-        archive = Archive(store, cache, args.archive.archive)
+        store = self.open_store(args.archive)
+        archive = Archive(store, args.archive.archive)
         archive.verify()
         return self.exit_code_from_logger()
 
     def do_info(self, args):
-        store, cache = self.open_store(args.archive)
-        archive = Archive(store, cache, args.archive.archive)
+        store = self.open_store(args.archive)
+        cache = Cache(store)
+        archive = Archive(store, args.archive.archive)
         osize, csize, usize = archive.stats(cache)
         print 'Original size:', pretty_size(osize)
         print 'Compressed size:', pretty_size(csize)
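
Since open_store no longer builds a Cache, only the commands that touch chunk
reference counts (create, delete, info) pay the cost of constructing one; extract,
list, and verify now operate on the store alone. A rough sketch of the resulting
call pattern, mirroring do_extract above (the store path is illustrative):

    store = BandStore('/path/to/store')        # open the band store directly
    archive = Archive(store, 'monday-backup')  # loads metadata and chunk index
    archive.extract('/tmp/restore')            # no Cache needed on this read path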
diff --git a/dedupestore/cache.py b/dedupestore/cache.py
index 1debb260f..36b0a8da1 100644
--- a/dedupestore/cache.py
+++ b/dedupestore/cache.py
@@ -4,8 +4,11 @@ import msgpack
 import os
 import zlib
 
+from .helpers import pack, unpack
+
 NS_ARCHIVES = 'A'
 NS_CHUNKS = 'C'
+NS_CINDEX = 'I'
 
 
 class Cache(object):
@@ -37,25 +40,19 @@
         if data['store'] != self.store.uuid:
             raise Exception('Cache UUID mismatch')
         self.chunkmap = data['chunkmap']
-        self.archives = data['archives']
         self.tid = data['tid']
 
     def init(self):
         """Initializes cache by fetching and reading all archive indicies
         """
-        logging.info('Initialzing cache...')
+        logging.info('Initializing cache...')
         self.chunkmap = {}
-        self.archives = {}
         self.tid = self.store.tid
         if self.store.tid == 0:
             return
-        for id in list(self.store.list(NS_ARCHIVES)):
-            data = self.store.get(NS_ARCHIVES, id)
-            if hashlib.sha256(data).digest() != id:
-                raise Exception('Archive hash did not match')
-            archive = msgpack.unpackb(zlib.decompress(data))
-            self.archives[archive['name']] = id
-            for id, size in archive['chunks']:
+        for id in list(self.store.list(NS_CINDEX)):
+            cindex = unpack(self.store.get(NS_CINDEX, id))
+            for id, size in cindex['chunks']:
                 try:
                     count, size = self.chunkmap[id]
                     self.chunkmap[id] = count + 1, size
@@ -68,7 +65,8 @@
         data = {'version': 1,
                 'store': self.store.uuid,
                 'chunkmap': self.chunkmap,
-                'tid': self.store.tid, 'archives': self.archives}
+                'tid': self.store.tid,
+                }
         cachedir = os.path.dirname(self.path)
         if not os.path.exists(cachedir):
             os.makedirs(cachedir)
@@ -80,8 +78,7 @@
     def add_chunk(self, id, data):
         if self.seen_chunk(id):
             return self.chunk_incref(id)
-        data = zlib.compress(data)
-        data = hashlib.sha256(data).digest() + data
+        _, data = pack(data)
         csize = len(data)
         self.store.put(NS_CHUNKS, id, data)
         self.chunkmap[id] = (1, csize)
diff --git a/dedupestore/helpers.py b/dedupestore/helpers.py
index a639b30fd..3ae98c0f3 100644
--- a/dedupestore/helpers.py
+++ b/dedupestore/helpers.py
@@ -1,8 +1,27 @@
-import logging
 import argparse
-import re
 import grp
+import hashlib
+import logging
+import msgpack
 import pwd
+import re
+import zlib
+
+
+def pack(data):
+    data = zlib.compress(msgpack.packb(data))
+    id = hashlib.sha256(data).digest()
+    tid = 0
+    return id, msgpack.packb((1, tid, id, data))
+
+
+def unpack(data):
+    version, tid, id, data = msgpack.unpackb(data)
+    assert version == 1
+    if hashlib.sha256(data).digest() != id:
+        raise ValueError
+    return msgpack.unpackb(zlib.decompress(data))
+
 
 def memoize(function):
     cache = {}
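
The pack/unpack pair added above defines the envelope now shared by archives, chunk
indexes, and chunks alike: the payload is msgpack-encoded, zlib-compressed, and
wrapped in an outer msgpack tuple (version, tid, id, data), where id is the SHA-256
of the compressed payload and tid is a placeholder (always 0 for now). A round-trip
sketch using only the helpers defined above:

    # unpack() verifies the embedded SHA-256 and raises ValueError on mismatch,
    # which callers such as Archive.extract report as an invalid chunk checksum.
    id, blob = pack({'version': 1, 'chunks': []})
    assert unpack(blob) == {'version': 1, 'chunks': []}

Note that pack() also returns the hash of the compressed payload, but callers in
this patch key objects by their own ids and discard it (_, data = pack(...)).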