From 575efb5e0b49e5366e235ee122ccee83942862f6 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Jonas=20Borgstr=C3=B6m?= Date: Tue, 23 Feb 2010 21:34:28 +0100 Subject: [PATCH] Switch from Repository to Store. --- dedupstore/archiver.py | 123 ++++++++++++--------- dedupstore/repository.py | 223 --------------------------------------- dedupstore/store.py | 79 +++++++------- 3 files changed, 116 insertions(+), 309 deletions(-) delete mode 100644 dedupstore/repository.py diff --git a/dedupstore/archiver.py b/dedupstore/archiver.py index aef0e60db..cdaa697eb 100644 --- a/dedupstore/archiver.py +++ b/dedupstore/archiver.py @@ -3,7 +3,7 @@ import sys import hashlib import zlib import cPickle -from repository import Repository +from store import Store CHUNKSIZE = 256 * 1024 @@ -11,21 +11,19 @@ CHUNKSIZE = 256 * 1024 class Cache(object): """Client Side cache """ - def __init__(self, path, repo): - self.repo = repo + def __init__(self, path, store): + self.store = store self.path = path - self.chunkmap = {} - self.archives = [] - self.tid = -1 + self.tid = 'unknown' self.open() - if self.tid != self.repo.tid: - print self.tid, self.repo.tid + if self.tid != self.store.tid: + print self.tid.encode('hex'), self.store.tid.encode('hex') self.create() def open(self): - if self.repo.tid == 0: + if self.store.tid == '': return - filename = os.path.join(self.path, '%s.cache' % self.repo.uuid) + filename = os.path.join(self.path, '%s.cache' % self.store.uuid) if not os.path.exists(filename): return print 'Reading cache: ', filename, '...' @@ -35,59 +33,75 @@ class Cache(object): self.archives = data['archives'] print 'done' + def update_manifest(self): + print 'old manifest', self.tid.encode('hex') + if self.tid: + self.chunk_decref(self.tid) + manifest = {'archives': self.archives.values()} + hash = self.add_chunk(zlib.compress(cPickle.dumps(manifest))) + print 'new manifest', hash.encode('hex') + self.store.commit(hash) + def create(self): + self.archives = {} + self.chunkmap = {} + self.tid = self.store.tid + if self.store.tid == '': + return print 'Recreating cache...' - for archive in self.repo.listdir('archives'): - self.archives.append(archive) - data = self.repo.get_file(os.path.join('archives', archive)) - a = cPickle.loads(zlib.decompress(data)) - for item in a['items']: + self.chunk_incref(self.store.tid) + manifest = cPickle.loads(zlib.decompress(self.store.get(self.store.tid))) + for hash in manifest['archives']: + self.chunk_incref(hash) + archive = cPickle.loads(zlib.decompress(self.store.get(hash))) + self.archives[archive['name']] = hash + for item in archive['items']: if item['type'] == 'FILE': for c in item['chunks']: self.chunk_incref(c) - self.tid = self.repo.tid print 'done' def save(self): - assert self.repo.state == Repository.OPEN - print 'saving',self.tid, self.repo.tid - data = {'chunkmap': self.chunkmap, 'tid': self.repo.tid, 'archives': self.archives} - filename = os.path.join(self.path, '%s.cache' % self.repo.uuid) + assert self.store.state == Store.OPEN + print 'saving cache' + data = {'chunkmap': self.chunkmap, 'tid': self.store.tid, 'archives': self.archives} + filename = os.path.join(self.path, '%s.cache' % self.store.uuid) print 'Saving cache as:', filename with open(filename, 'wb') as fd: fd.write(zlib.compress(cPickle.dumps(data))) print 'done' - def chunk_filename(self, sha): - hex = sha.encode('hex') - return 'chunks/%s/%s/%s' % (hex[:2], hex[2:4], hex[4:]) - def add_chunk(self, data): - sha = hashlib.sha1(data).digest() - if not self.seen_chunk(sha): - self.repo.put_file(self.chunk_filename(sha), data) + hash = hashlib.sha1(data).digest() + if not self.seen_chunk(hash): + self.store.put(data, hash) else: - print 'seen chunk', sha.encode('hex') - self.chunk_incref(sha) - return sha + print 'seen chunk', hash.encode('hex') + self.chunk_incref(hash) + return hash - def seen_chunk(self, sha): - return self.chunkmap.get(sha, 0) > 0 + def seen_chunk(self, hash): + return self.chunkmap.get(hash, 0) > 0 - def chunk_incref(self, sha): - self.chunkmap.setdefault(sha, 0) - self.chunkmap[sha] += 1 + def chunk_incref(self, hash): + self.chunkmap.setdefault(hash, 0) + self.chunkmap[hash] += 1 + + def chunk_decref(self, hash): + count = self.chunkmap.get(hash, 0) - 1 + assert count >= 0 + self.chunkmap[hash] = count + if not count: + print 'deleting chunk: ', hash.encode('hex') + self.store.delete(hash) + return count - def chunk_decref(self, sha): - assert self.chunkmap.get(sha, 0) > 0 - self.chunkmap[sha] -= 1 - return self.chunkmap[sha] class Archiver(object): def __init__(self): - self.repo = Repository('/tmp/repo') - self.cache = Cache('/tmp/cache', self.repo) + self.store = Store('/tmp/store') + self.cache = Cache('/tmp/cache', self.store) def create_archive(self, archive_name, path): if archive_name in self.cache.archives: @@ -101,11 +115,23 @@ class Archiver(object): name = os.path.join(root, f) items.append(self.process_file(name, self.cache)) archive = {'name': name, 'items': items} - zdata = zlib.compress(cPickle.dumps(archive)) - self.repo.put_file(os.path.join('archives', archive_name), zdata) - self.cache.archives.append(archive_name) - print 'Archive file size: %d' % len(zdata) - self.repo.commit() + hash = self.cache.add_chunk(zlib.compress(cPickle.dumps(archive))) + self.cache.archives[archive_name] = hash + self.cache.update_manifest() + self.cache.save() + + def delete_archive(self, archive_name): + hash = self.cache.archives.get(archive_name) + if not hash: + raise Exception('Archive "%s" does not exist' % archive_name) + archive = cPickle.loads(zlib.decompress(self.store.get(hash))) + self.cache.chunk_decref(hash) + for item in archive['items']: + if item['type'] == 'FILE': + for c in item['chunks']: + self.cache.chunk_decref(c) + del self.cache.archives[archive_name] + self.cache.update_manifest() self.cache.save() def process_dir(self, path, cache): @@ -128,7 +154,10 @@ class Archiver(object): def main(): archiver = Archiver() - archiver.create_archive(sys.argv[1], sys.argv[2]) + if sys.argv[1] == 'delete': + archiver.delete_archive(sys.argv[2]) + else: + archiver.create_archive(sys.argv[1], sys.argv[2]) if __name__ == '__main__': main() \ No newline at end of file diff --git a/dedupstore/repository.py b/dedupstore/repository.py deleted file mode 100644 index 9e996d92a..000000000 --- a/dedupstore/repository.py +++ /dev/null @@ -1,223 +0,0 @@ -#!/usr/bin/env python -import fcntl -import tempfile -import logging -import os -import posixpath -import shutil -import unittest -import uuid - -log = logging.getLogger('') - - -class Repository(object): - """ - """ - IDLE = 'Idle' - OPEN = 'Open' - ACTIVE = 'Active' - VERSION = 'DEDUPSTORE REPOSITORY VERSION 1' - - def __init__(self, path): - self.tid = -1 - self.state = Repository.IDLE - if not os.path.exists(path): - self.create(path) - self.open(path) - - def create(self, path): - log.info('Initializing Repository at "%s"' % path) - os.mkdir(path) - open(os.path.join(path, 'VERSION'), 'wb').write(self.VERSION) - open(os.path.join(path, 'uuid'), 'wb').write(str(uuid.uuid4())) - open(os.path.join(path, 'tid'), 'wb').write('0') - os.mkdir(os.path.join(path, 'data')) - - def open(self, path): - self.path = path - if not os.path.isdir(path): - raise Exception('%s Does not look like a repository') - version_path = os.path.join(path, 'version') - if not os.path.exists(version_path) or open(version_path, 'rb').read() != self.VERSION: - raise Exception('%s Does not look like a repository2') - self.uuid = open(os.path.join(path, 'uuid'), 'rb').read() - self.lock_fd = open(os.path.join(path, 'lock'), 'w') - fcntl.flock(self.lock_fd, fcntl.LOCK_EX) - self.tid = int(open(os.path.join(path, 'tid'), 'r').read()) - self.recover() - - def recover(self): - if os.path.exists(os.path.join(self.path, 'txn-active')): - self.rollback() - if os.path.exists(os.path.join(self.path, 'txn-commit')): - self.apply_txn() - if os.path.exists(os.path.join(self.path, 'txn-applied')): - shutil.rmtree(os.path.join(self.path, 'txn-applied')) - self.state = Repository.OPEN - - def close(self): - self.recover() - self.lock_fd.close() - self.state = Repository.IDLE - - def commit(self): - """ - """ - if self.state == Repository.OPEN: - return - assert self.state == Repository.ACTIVE - remove_fd = open(os.path.join(self.path, 'txn-active', 'remove'), 'wb') - remove_fd.write('\n'.join(self.txn_removed)) - remove_fd.close() - add_fd = open(os.path.join(self.path, 'txn-active', 'add_index'), 'wb') - add_fd.write('\n'.join(self.txn_added)) - add_fd.close() - tid_fd = open(os.path.join(self.path, 'txn-active', 'tid'), 'wb') - tid_fd.write(str(self.tid + 1)) - tid_fd.close() - os.rename(os.path.join(self.path, 'txn-active'), - os.path.join(self.path, 'txn-commit')) - self.apply_txn() - - def apply_txn(self): - assert os.path.isdir(os.path.join(self.path, 'txn-commit')) - tid = int(open(os.path.join(self.path, 'txn-commit', 'tid'), 'rb').read()) - assert tid >= self.tid - remove_list = [line.strip() for line in - open(os.path.join(self.path, 'txn-commit', 'remove'), 'rb').readlines()] - for name in remove_list: - path = os.path.join(self.path, 'data', name) - os.unlink(path) - add_list = [line.strip() for line in - open(os.path.join(self.path, 'txn-commit', 'add_index'), 'rb').readlines()] - for name in add_list: - destname = os.path.join(self.path, 'data', name) - if not os.path.exists(os.path.dirname(destname)): - os.makedirs(os.path.dirname(destname)) - shutil.move(os.path.join(self.path, 'txn-commit', 'add', name), destname) - tid_fd = open(os.path.join(self.path, 'tid'), 'wb') - tid_fd.write(str(tid)) - tid_fd.close() - os.rename(os.path.join(self.path, 'txn-commit'), - os.path.join(self.path, 'txn-applied')) - shutil.rmtree(os.path.join(self.path, 'txn-applied')) - self.tid = tid - self.state = Repository.OPEN - - def rollback(self): - """ - """ - txn_path = os.path.join(self.path, 'txn-active') - if os.path.exists(txn_path): - shutil.rmtree(txn_path) - self.state = Repository.OPEN - - def prepare_txn(self): - if self.state == Repository.ACTIVE: - return os.path.join(self.path, 'txn-active') - elif self.state == Repository.OPEN: - os.mkdir(os.path.join(self.path, 'txn-active')) - os.mkdir(os.path.join(self.path, 'txn-active', 'add')) - self.txn_removed = [] - self.txn_added = [] - self.state = Repository.ACTIVE - - def get_file(self, path): - """ - """ - if os.path.exists(os.path.join(self.path, 'txn-active', 'add', path)): - return open(os.path.join(self.path, 'txn-active', 'add', path), 'rb').read() - elif os.path.exists(os.path.join(self.path, 'data', path)): - return open(os.path.join(self.path, 'data', path), 'rb').read() - else: - raise Exception('FileNotFound: %s' % path) - - def put_file(self, path, data): - """ - """ - self.prepare_txn() - if (path in self.txn_added or - (path not in self.txn_removed and os.path.exists(os.path.join(self.path, 'data', path)))): - raise Exception('FileAlreadyExists: %s' % path) - if path in self.txn_removed: - self.txn_removed.remove(path) - if path not in self.txn_added: - self.txn_added.append(path) - filename = os.path.join(self.path, 'txn-active', 'add', path) - if not os.path.exists(os.path.dirname(filename)): - os.makedirs(os.path.dirname(filename)) - fd = open(filename, 'wb') - try: - fd.write(data) - finally: - fd.close() - - def delete(self, path): - """ - """ - self.prepare_txn() - if os.path.exists(os.path.join(self.path, 'txn-active', 'add', path)): - os.unlink(os.path.join(self.path, 'txn-active', 'add', path)) - elif os.path.exists(os.path.join(self.path, 'data', path)): - self.txn_removed.append(path) - else: - raise Exception('FileNotFound: %s' % path) - - def listdir(self, path): - """ - """ - entries = set(os.listdir(os.path.join(self.path, 'data', path))) - if self.state == Repository.ACTIVE: - txn_entries = set(os.listdir(os.path.join(self.path, 'txn-active', 'add', path))) - entries = entries.union(txn_entries) - for e in entries: - if posixpath.join(path, e) in self.txn_removed: - entries.remove(e) - return list(entries) - - def mkdir(self, path): - """ - """ - - def rmdir(self, path): - """ - """ - -class RepositoryTestCase(unittest.TestCase): - - def setUp(self): - self.tmppath = tempfile.mkdtemp() - self.repo = Repository(os.path.join(self.tmppath, 'repo')) - - def tearDown(self): - shutil.rmtree(self.tmppath) - - def test1(self): - self.assertEqual(self.repo.tid, 0) - self.assertEqual(self.repo.state, Repository.OPEN) - self.assertEqual(self.repo.listdir(''), []) - self.repo.put_file('foo', 'SOMEDATA') - self.assertRaises(Exception, lambda: self.repo.put_file('foo', 'SOMETHINGELSE')) - self.assertEqual(self.repo.get_file('foo'), 'SOMEDATA') - self.assertEqual(self.repo.listdir(''), ['foo']) - self.repo.rollback() - self.assertEqual(self.repo.listdir(''), []) - - def test2(self): - self.repo.put_file('foo', 'SOMEDATA') - self.repo.put_file('bar', 'SOMEDATAbar') - self.assertEqual(self.repo.listdir(''), ['foo', 'bar']) - self.assertEqual(self.repo.get_file('foo'), 'SOMEDATA') - self.repo.delete('foo') - self.assertRaises(Exception, lambda: self.repo.get_file('foo')) - self.assertEqual(self.repo.listdir(''), ['bar']) - self.assertEqual(self.repo.state, Repository.ACTIVE) - self.assertEqual(os.path.exists(os.path.join(self.tmppath, 'repo', 'data', 'bar')), False) - self.repo.commit() - self.assertEqual(os.path.exists(os.path.join(self.tmppath, 'repo', 'data', 'bar')), True) - self.assertEqual(self.repo.listdir(''), ['bar']) - self.assertEqual(self.repo.state, Repository.IDLE) - -if __name__ == '__main__': - unittest.main() diff --git a/dedupstore/store.py b/dedupstore/store.py index db09b4906..69f9db4fc 100644 --- a/dedupstore/store.py +++ b/dedupstore/store.py @@ -17,7 +17,7 @@ class Store(object): VERSION = 'DEDUPSTORE VERSION 1' def __init__(self, path): - self.tid = -1 + self.tid = '' self.state = Store.IDLE if not os.path.exists(path): self.create(path) @@ -27,7 +27,7 @@ class Store(object): os.mkdir(path) open(os.path.join(path, 'version'), 'wb').write(self.VERSION) open(os.path.join(path, 'uuid'), 'wb').write(str(uuid.uuid4())) - open(os.path.join(path, 'tid'), 'wb').write('0') + open(os.path.join(path, 'tid'), 'wb').write('') os.mkdir(os.path.join(path, 'data')) def open(self, path): @@ -40,7 +40,7 @@ class Store(object): self.uuid = open(os.path.join(path, 'uuid'), 'rb').read() self.lock_fd = open(os.path.join(path, 'lock'), 'w') fcntl.flock(self.lock_fd, fcntl.LOCK_EX) - self.tid = int(open(os.path.join(path, 'tid'), 'r').read()) + self.tid = open(os.path.join(path, 'tid'), 'r').read() self.recover() def recover(self): @@ -59,7 +59,7 @@ class Store(object): self.lock_fd.close() self.state = Store.IDLE - def commit(self): + def commit(self, tid): """ """ if self.state == Store.OPEN: @@ -70,15 +70,14 @@ class Store(object): with open(os.path.join(self.path, 'txn-active', 'write_index'), 'wb') as fd: fd.write('\n'.join(self.txn_write)) with open(os.path.join(self.path, 'txn-active', 'tid'), 'wb') as fd: - fd.write(str(self.tid + 1)) + fd.write(tid) os.rename(os.path.join(self.path, 'txn-active'), os.path.join(self.path, 'txn-commit')) self.recover() def apply_txn(self): assert os.path.isdir(os.path.join(self.path, 'txn-commit')) - tid = int(open(os.path.join(self.path, 'txn-commit', 'tid'), 'rb').read()) - assert tid == self.tid + 1 + tid = open(os.path.join(self.path, 'txn-commit', 'tid'), 'rb').read() delete_list = [line.strip() for line in open(os.path.join(self.path, 'txn-commit', 'delete_index'), 'rb').readlines()] for name in delete_list: @@ -92,7 +91,7 @@ class Store(object): os.makedirs(os.path.dirname(destname)) os.rename(os.path.join(self.path, 'txn-commit', 'write', name), destname) with open(os.path.join(self.path, 'tid'), 'wb') as fd: - fd.write(str(tid)) + fd.write(tid) os.rename(os.path.join(self.path, 'txn-commit'), os.path.join(self.path, 'txn-applied')) shutil.rmtree(os.path.join(self.path, 'txn-applied')) @@ -111,50 +110,50 @@ class Store(object): os.mkdir(os.path.join(self.path, 'txn-active', 'write')) self.state = Store.ACTIVE - def _filename(self, sha, base=''): - hex = sha.encode('hex') + def _filename(self, hash, base=''): + hex = hash.encode('hex') return os.path.join(base, hex[:2], hex[2:4], hex[4:]) - def get(self, sha): + def get(self, hash): """ """ - path = self._filename(sha) + path = self._filename(hash) if path in self.txn_write: filename = os.path.join(self.path, 'txn-active', 'write', path) return open(filename, 'rb').read() - filename = self._filename(sha, os.path.join(self.path, 'objects')) + filename = self._filename(hash, os.path.join(self.path, 'objects')) if os.path.exists(filename): return open(filename, 'rb').read() else: - raise Exception('Object %s does not exist' % sha.encode('hex')) + raise Exception('Object %s does not exist' % hash.encode('hex')) - def put(self, data, sha=None): + def put(self, data, hash=None): """ """ - if not sha: - sha = hashlib.sha1(data).digest() + if not hash: + hash = hashlib.sha1(data).digest() self.prepare_txn() - path = self._filename(sha) - filename = self._filename(sha, os.path.join(self.path, 'objects')) + path = self._filename(hash) + filename = self._filename(hash, os.path.join(self.path, 'objects')) if (path in self.txn_write or (path not in self.txn_delete and os.path.exists(filename))): - raise Exception('Object already exists: %s' % sha.encode('hex')) + raise Exception('Object already exists: %s' % hash.encode('hex')) if path in self.txn_delete: self.txn_delete.remove(path) if path not in self.txn_write: self.txn_write.append(path) - filename = self._filename(sha, os.path.join(self.path, 'txn-active', 'write')) + filename = self._filename(hash, os.path.join(self.path, 'txn-active', 'write')) if not os.path.exists(os.path.dirname(filename)): os.makedirs(os.path.dirname(filename)) with open(filename, 'wb') as fd: fd.write(data) - return sha + return hash - def delete(self, sha): + def delete(self, hash): """ """ self.prepare_txn() - path = self._filename(sha) + path = self._filename(hash) if path in self.txn_write: self.txn_write.remove(path) os.unlink(filename) @@ -163,7 +162,7 @@ class Store(object): if os.path.exists(filename): self.txn_delete.append(path) else: - raise Exception('Object does not exist: %s' % sha.encode('hex')) + raise Exception('Object does not exist: %s' % hash.encode('hex')) class StoreTestCase(unittest.TestCase): @@ -176,31 +175,33 @@ class StoreTestCase(unittest.TestCase): shutil.rmtree(self.tmppath) def test1(self): - self.assertEqual(self.store.tid, 0) + self.assertEqual(self.store.tid, '') self.assertEqual(self.store.state, Store.OPEN) - SOMEDATA_sha = self.store.put('SOMEDATA') + SOMEDATA_hash = self.store.put('SOMEDATA') self.assertRaises(Exception, lambda: self.store.put('SOMEDATA')) - self.assertEqual(self.store.get(SOMEDATA_sha), 'SOMEDATA') + self.assertEqual(self.store.get(SOMEDATA_hash), 'SOMEDATA') self.store.rollback() self.assertRaises(Exception, lambda: self.store.get('SOMEDATA')) - self.assertEqual(self.store.tid, 0) + self.assertEqual(self.store.tid, '') def test2(self): - self.assertEqual(self.store.tid, 0) + self.assertEqual(self.store.tid, '') self.assertEqual(self.store.state, Store.OPEN) - SOMEDATA_sha = self.store.put('SOMEDATA') - self.assertEqual(self.store.get(SOMEDATA_sha), 'SOMEDATA') - self.store.commit() - self.assertEqual(self.store.get(SOMEDATA_sha), 'SOMEDATA') - self.assertEqual(self.store.tid, 1) - self.store.delete(SOMEDATA_sha) + SOMEDATA_hash = self.store.put('SOMEDATA') + self.assertEqual(self.store.get(SOMEDATA_hash), 'SOMEDATA') + self.store.commit(SOMEDATA_hash) + self.assertEqual(self.store.tid, SOMEDATA_hash) + self.assertEqual(self.store.get(SOMEDATA_hash), 'SOMEDATA') + self.store.delete(SOMEDATA_hash) self.assertRaises(Exception, lambda: self.store.get('SOMEDATA')) self.store.rollback() - self.assertEqual(self.store.get(SOMEDATA_sha), 'SOMEDATA') - self.store.delete(SOMEDATA_sha) + self.assertEqual(self.store.get(SOMEDATA_hash), 'SOMEDATA') + self.store.delete(SOMEDATA_hash) self.assertRaises(Exception, lambda: self.store.get('SOMEDATA')) - self.store.commit() + self.store.commit('Something Else') + self.assertEqual(self.store.tid, 'Something Else') self.assertRaises(Exception, lambda: self.store.get('SOMEDATA')) + if __name__ == '__main__': unittest.main()