From 5cbafdfc2091c4c0d8ae792035c142d86a048326 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Jonas=20Borgstr=C3=B6m?= Date: Wed, 24 Feb 2010 20:38:27 +0100 Subject: [PATCH] Switched to a namespace base object store implementation. --- dedupstore/archiver.py | 59 +++++++----------- dedupstore/store.py | 138 +++++++++++++++++++++++++++-------------- 2 files changed, 113 insertions(+), 84 deletions(-) diff --git a/dedupstore/archiver.py b/dedupstore/archiver.py index e1013da3d..799503e63 100644 --- a/dedupstore/archiver.py +++ b/dedupstore/archiver.py @@ -8,7 +8,8 @@ from optparse import OptionParser from store import Store CHUNKSIZE = 256 * 1024 - +NS_ARCHIVES = 'ARCHIVES' +NS_CHUNKS = 'CHUNKS' class Cache(object): """Client Side cache @@ -16,14 +17,14 @@ class Cache(object): def __init__(self, path, store): self.store = store self.path = path - self.tid = 'unknown' + self.tid = -1 self.open() if self.tid != self.store.tid: - print self.tid.encode('hex'), self.store.tid.encode('hex') + print self.tid, self.store.tid self.create() def open(self): - if self.store.tid == '': + if self.store.tid == -1: return filename = os.path.join(self.path, '%s.cache' % self.store.uuid) if not os.path.exists(filename): @@ -32,31 +33,16 @@ class Cache(object): data = cPickle.loads(zlib.decompress(open(filename, 'rb').read())) self.chunkmap = data['chunkmap'] self.tid = data['tid'] - self.archives = data['archives'] print 'done' - def update_manifest(self): - print 'old manifest', self.tid.encode('hex') - if self.tid: - self.chunk_decref(self.tid) - manifest = {'archives': self.archives.values()} - hash = self.add_chunk(zlib.compress(cPickle.dumps(manifest))) - print 'new manifest', hash.encode('hex') - self.store.commit(hash) - def create(self): - self.archives = {} self.chunkmap = {} self.tid = self.store.tid - if self.store.tid == '': + if self.store.tid == 0: return print 'Recreating cache...' - self.chunk_incref(self.store.tid) - manifest = cPickle.loads(zlib.decompress(self.store.get(self.store.tid))) - for hash in manifest['archives']: - self.chunk_incref(hash) - archive = cPickle.loads(zlib.decompress(self.store.get(hash))) - self.archives[archive['name']] = hash + for id in self.store.list(NS_ARCHIVES): + archive = cPickle.loads(zlib.decompress(self.store.get(NS_ARCHIVES, id))) for item in archive['items']: if item['type'] == 'FILE': for c in item['chunks']: @@ -66,7 +52,7 @@ class Cache(object): def save(self): assert self.store.state == Store.OPEN print 'saving cache' - data = {'chunkmap': self.chunkmap, 'tid': self.store.tid, 'archives': self.archives} + data = {'chunkmap': self.chunkmap, 'tid': self.store.tid} filename = os.path.join(self.path, '%s.cache' % self.store.uuid) print 'Saving cache as:', filename with open(filename, 'wb') as fd: @@ -76,7 +62,7 @@ class Cache(object): def add_chunk(self, data): hash = hashlib.sha1(data).digest() if not self.seen_chunk(hash): - self.store.put(data, hash) + self.store.put(NS_CHUNKS, hash, data) else: print 'seen chunk', hash.encode('hex') self.chunk_incref(hash) @@ -95,7 +81,7 @@ class Cache(object): self.chunkmap[hash] = count if not count: print 'deleting chunk: ', hash.encode('hex') - self.store.delete(hash) + self.store.delete(NS_CHUNKS, hash) return count @@ -106,8 +92,8 @@ class Archiver(object): self.cache = Cache('/tmp/cache', self.store) def create_archive(self, archive_name, paths): - if archive_name in self.cache.archives: - raise Exception('Archive "%s" already exists' % archive_name) +# if archive_name in self.cache.archives: +# raise Exception('Archive "%s" already exists' % archive_name) items = [] for path in paths: for root, dirs, files in os.walk(path): @@ -117,24 +103,21 @@ class Archiver(object): for f in files: name = os.path.join(root, f) items.append(self.process_file(name, self.cache)) - archive = {'name': name, 'items': items} - hash = self.cache.add_chunk(zlib.compress(cPickle.dumps(archive))) - self.cache.archives[archive_name] = hash - self.cache.update_manifest() + archive = {'name': archive_name, 'items': items} + hash = self.store.put(NS_ARCHIVES, archive_name, zlib.compress(cPickle.dumps(archive))) + self.store.commit() self.cache.save() def delete_archive(self, archive_name): - hash = self.cache.archives.get(archive_name) - if not hash: - raise Exception('Archive "%s" does not exist' % archive_name) - archive = cPickle.loads(zlib.decompress(self.store.get(hash))) - self.cache.chunk_decref(hash) + archive = cPickle.loads(zlib.decompress(self.store.get(NS_ARCHIVES, archive_name))) + self.store.delete(NS_ARCHIVES, archive_name) +# if not hash: +# raise Exception('Archive "%s" does not exist' % archive_name) for item in archive['items']: if item['type'] == 'FILE': for c in item['chunks']: self.cache.chunk_decref(c) - del self.cache.archives[archive_name] - self.cache.update_manifest() + self.store.commit() self.cache.save() def process_dir(self, path, cache): diff --git a/dedupstore/store.py b/dedupstore/store.py index 69f9db4fc..920f8fab7 100644 --- a/dedupstore/store.py +++ b/dedupstore/store.py @@ -17,7 +17,7 @@ class Store(object): VERSION = 'DEDUPSTORE VERSION 1' def __init__(self, path): - self.tid = '' + self.tid = '-1' self.state = Store.IDLE if not os.path.exists(path): self.create(path) @@ -27,7 +27,7 @@ class Store(object): os.mkdir(path) open(os.path.join(path, 'version'), 'wb').write(self.VERSION) open(os.path.join(path, 'uuid'), 'wb').write(str(uuid.uuid4())) - open(os.path.join(path, 'tid'), 'wb').write('') + open(os.path.join(path, 'tid'), 'wb').write('0') os.mkdir(os.path.join(path, 'data')) def open(self, path): @@ -40,7 +40,7 @@ class Store(object): self.uuid = open(os.path.join(path, 'uuid'), 'rb').read() self.lock_fd = open(os.path.join(path, 'lock'), 'w') fcntl.flock(self.lock_fd, fcntl.LOCK_EX) - self.tid = open(os.path.join(path, 'tid'), 'r').read() + self.tid = int(open(os.path.join(path, 'tid'), 'r').read()) self.recover() def recover(self): @@ -59,7 +59,7 @@ class Store(object): self.lock_fd.close() self.state = Store.IDLE - def commit(self, tid): + def commit(self): """ """ if self.state == Store.OPEN: @@ -70,28 +70,29 @@ class Store(object): with open(os.path.join(self.path, 'txn-active', 'write_index'), 'wb') as fd: fd.write('\n'.join(self.txn_write)) with open(os.path.join(self.path, 'txn-active', 'tid'), 'wb') as fd: - fd.write(tid) + fd.write(str(self.tid + 1)) os.rename(os.path.join(self.path, 'txn-active'), os.path.join(self.path, 'txn-commit')) self.recover() def apply_txn(self): assert os.path.isdir(os.path.join(self.path, 'txn-commit')) - tid = open(os.path.join(self.path, 'txn-commit', 'tid'), 'rb').read() + tid = int(open(os.path.join(self.path, 'txn-commit', 'tid'), 'rb').read()) + assert tid == self.tid + 1 delete_list = [line.strip() for line in open(os.path.join(self.path, 'txn-commit', 'delete_index'), 'rb').readlines()] for name in delete_list: - path = os.path.join(self.path, 'objects', name) + path = os.path.join(self.path, 'data', name) os.unlink(path) write_list = [line.strip() for line in open(os.path.join(self.path, 'txn-commit', 'write_index'), 'rb').readlines()] for name in write_list: - destname = os.path.join(self.path, 'objects', name) + destname = os.path.join(self.path, 'data', name) if not os.path.exists(os.path.dirname(destname)): os.makedirs(os.path.dirname(destname)) os.rename(os.path.join(self.path, 'txn-commit', 'write', name), destname) with open(os.path.join(self.path, 'tid'), 'wb') as fd: - fd.write(tid) + fd.write(str(tid)) os.rename(os.path.join(self.path, 'txn-commit'), os.path.join(self.path, 'txn-applied')) shutil.rmtree(os.path.join(self.path, 'txn-applied')) @@ -110,60 +111,90 @@ class Store(object): os.mkdir(os.path.join(self.path, 'txn-active', 'write')) self.state = Store.ACTIVE - def _filename(self, hash, base=''): - hex = hash.encode('hex') - return os.path.join(base, hex[:2], hex[2:4], hex[4:]) + def _filename(self, ns, id, base=''): + ns = ns.encode('hex') + id = id.encode('hex') + return os.path.join(base, ns, id[:2], id[2:4], id[4:]) - def get(self, hash): + def get(self, ns, id): """ """ - path = self._filename(hash) + path = self._filename(ns, id) if path in self.txn_write: filename = os.path.join(self.path, 'txn-active', 'write', path) return open(filename, 'rb').read() - filename = self._filename(hash, os.path.join(self.path, 'objects')) + if path in self.txn_delete: + raise Exception('Object %s does not exist' % hash.encode('hex')) + filename = self._filename(ns, id, os.path.join(self.path, 'data')) if os.path.exists(filename): return open(filename, 'rb').read() else: raise Exception('Object %s does not exist' % hash.encode('hex')) - def put(self, data, hash=None): + def put(self, ns, id, data): """ """ - if not hash: - hash = hashlib.sha1(data).digest() self.prepare_txn() - path = self._filename(hash) - filename = self._filename(hash, os.path.join(self.path, 'objects')) + path = self._filename(ns, id) + filename = self._filename(ns, id, os.path.join(self.path, 'data')) if (path in self.txn_write or (path not in self.txn_delete and os.path.exists(filename))): - raise Exception('Object already exists: %s' % hash.encode('hex')) + raise Exception('Object already exists: %s:%s' % (ns.encode('hex'), id.encode('hex'))) if path in self.txn_delete: self.txn_delete.remove(path) if path not in self.txn_write: self.txn_write.append(path) - filename = self._filename(hash, os.path.join(self.path, 'txn-active', 'write')) + filename = self._filename(ns, id, os.path.join(self.path, 'txn-active', 'write')) if not os.path.exists(os.path.dirname(filename)): os.makedirs(os.path.dirname(filename)) with open(filename, 'wb') as fd: fd.write(data) - return hash - def delete(self, hash): + def delete(self, ns, id): """ """ self.prepare_txn() - path = self._filename(hash) + path = self._filename(ns, id) if path in self.txn_write: + filename = self._filename(ns, id, os.path.join(self.path, 'txn-active', 'write')) self.txn_write.remove(path) os.unlink(filename) else: - filename = os.path.join(self.path, 'objects', path) + filename = os.path.join(self.path, 'data', path) if os.path.exists(filename): self.txn_delete.append(path) else: raise Exception('Object does not exist: %s' % hash.encode('hex')) + def list(self, ns, prefix='', marker=None, max_keys=1000000): + for x in self.foo(os.path.join(self.path, 'data', ns.encode('hex')), + prefix, marker, '', max_keys): + yield x + + + def foo(self, path, prefix, marker, base, max_keys): + n = 0 + for name in sorted(os.listdir(path)): + if n >= max_keys: + return + dirs = [] + names = [] + id = name.decode('hex') + if os.path.isdir(os.path.join(path, name)): + if prefix and not id.startswith(prefix[:len(id)]): + continue + for x in self.foo(os.path.join(path, name), + prefix[len(id):], marker, + base + id, max_keys - n): + yield x + n += 1 + else: + if prefix and not id.startswith(prefix): + continue + if not marker or base + id >= marker: + yield base + id + n += 1 + class StoreTestCase(unittest.TestCase): @@ -175,32 +206,47 @@ class StoreTestCase(unittest.TestCase): shutil.rmtree(self.tmppath) def test1(self): - self.assertEqual(self.store.tid, '') + self.assertEqual(self.store.tid, 0) self.assertEqual(self.store.state, Store.OPEN) - SOMEDATA_hash = self.store.put('SOMEDATA') - self.assertRaises(Exception, lambda: self.store.put('SOMEDATA')) - self.assertEqual(self.store.get(SOMEDATA_hash), 'SOMEDATA') + self.store.put('SOMENS', 'SOMEID', 'SOMEDATA') + self.assertRaises(Exception, lambda: self.store.put('SOMENS', 'SOMEID', 'SOMEDATA')) + self.assertEqual(self.store.get('SOMENS', 'SOMEID'), 'SOMEDATA') self.store.rollback() - self.assertRaises(Exception, lambda: self.store.get('SOMEDATA')) - self.assertEqual(self.store.tid, '') + self.assertRaises(Exception, lambda: self.store.get('SOMENS', 'SOMEID')) + self.assertEqual(self.store.tid, 0) def test2(self): - self.assertEqual(self.store.tid, '') + self.assertEqual(self.store.tid, 0) self.assertEqual(self.store.state, Store.OPEN) - SOMEDATA_hash = self.store.put('SOMEDATA') - self.assertEqual(self.store.get(SOMEDATA_hash), 'SOMEDATA') - self.store.commit(SOMEDATA_hash) - self.assertEqual(self.store.tid, SOMEDATA_hash) - self.assertEqual(self.store.get(SOMEDATA_hash), 'SOMEDATA') - self.store.delete(SOMEDATA_hash) - self.assertRaises(Exception, lambda: self.store.get('SOMEDATA')) + self.store.put('SOMENS', 'SOMEID', 'SOMEDATA') + self.assertEqual(self.store.get('SOMENS', 'SOMEID'), 'SOMEDATA') + self.store.commit() + self.assertEqual(self.store.tid, 1) + self.assertEqual(self.store.get('SOMENS', 'SOMEID'), 'SOMEDATA') + self.store.delete('SOMENS', 'SOMEID') + self.assertRaises(Exception, lambda: self.store.get('SOMENS', 'SOMEID')) self.store.rollback() - self.assertEqual(self.store.get(SOMEDATA_hash), 'SOMEDATA') - self.store.delete(SOMEDATA_hash) - self.assertRaises(Exception, lambda: self.store.get('SOMEDATA')) - self.store.commit('Something Else') - self.assertEqual(self.store.tid, 'Something Else') - self.assertRaises(Exception, lambda: self.store.get('SOMEDATA')) + self.assertEqual(self.store.get('SOMENS', 'SOMEID'), 'SOMEDATA') + self.store.delete('SOMENS', 'SOMEID') + self.assertRaises(Exception, lambda: self.store.get('SOMENS', 'SOMEID')) + self.store.commit() + self.assertEqual(self.store.tid, 2) + self.assertRaises(Exception, lambda: self.store.get('SOMENS', 'SOMEID')) + + def test_list(self): + self.store.put('SOMENS', 'SOMEID12', 'SOMEDATA') + self.store.put('SOMENS', 'SOMEID', 'SOMEDATA') + self.store.put('SOMENS', 'SOMEID1', 'SOMEDATA') + self.store.put('SOMENS', 'SOMEID123', 'SOMEDATA') + self.store.commit() + self.assertEqual(list(self.store.list('SOMENS', max_keys=3)), + ['SOMEID', 'SOMEID1', 'SOMEID12']) + self.assertEqual(list(self.store.list('SOMENS', marker='SOMEID12')), + ['SOMEID12', 'SOMEID123']) + self.assertEqual(list(self.store.list('SOMENS', prefix='SOMEID1', max_keys=2)), + ['SOMEID1', 'SOMEID12']) + self.assertEqual(list(self.store.list('SOMENS', prefix='SOMEID1', marker='SOMEID12')), + ['SOMEID12', 'SOMEID123']) if __name__ == '__main__':