An almost working banded store impl

This commit is contained in:
Jonas Borgström 2010-05-26 21:41:55 +02:00
parent 62a4a37544
commit 4e476d7653
2 changed files with 202 additions and 3 deletions

View File

@ -6,8 +6,8 @@ from optparse import OptionParser
from chunkifier import chunkify
from cache import Cache, NS_ARCHIVES, NS_CHUNKS
from sqlitestore import SqliteStore
#from sqlitestore import SqliteStore
from bandstore import BandStore
CHUNK_SIZE = 256 * 1024
@ -205,7 +205,7 @@ class Archiver(object):
help="Display archive statistics", metavar="ARCHIVE")
(options, args) = parser.parse_args()
if options.store:
self.store = SqliteStore(options.store)
self.store = BandStore(options.store)
else:
parser.error('No store path specified')
self.cache = Cache(self.store)

199
dedupestore/bandstore.py Normal file
View File

@ -0,0 +1,199 @@
#!/usr/bin/env python
import os
import tempfile
import shutil
import unittest
import sqlite3
import uuid
class BandStore(object):
"""
"""
class DoesNotExist(KeyError):
""""""
class AlreadyExists(KeyError):
""""""
IDLE = 'Idle'
OPEN = 'Open'
ACTIVE = 'Active'
BAND_LIMIT = 1024 * 1024 * 10
def __init__(self, path):
if not os.path.exists(path):
self.create(path)
self.path = path
self.cnx = sqlite3.connect(os.path.join(path, 'index.db'))
self.cursor = self.cnx.cursor()
self._begin()
def _begin(self):
self.uuid, self.tid, self.next_band = self.cursor.execute('SELECT uuid, tid, nextband FROM system').fetchone()
self.state = self.OPEN
self.band = None
self.to_delete = set()
def create(self, path):
os.mkdir(path)
os.mkdir(os.path.join(path, 'bands'))
cnx = sqlite3.connect(os.path.join(path, 'index.db'))
cnx.execute('CREATE TABLE objects(ns TEXT NOT NULL, id NOT NULL, '
'band NOT NULL, offset NOT NULL, size NOT NULL)')
cnx.execute('CREATE TABLE system(uuid NOT NULL, tid NOT NULL, nextband NOT NULL)')
cnx.execute('INSERT INTO system VALUES(?,?,?)', (uuid.uuid1().hex, 0, 0))
cnx.execute('CREATE UNIQUE INDEX objects_pk ON objects(ns, id)')
def close(self):
self.cnx.close()
def commit(self):
"""
"""
self.band = None
for b in self.to_delete:
objects = self.cursor.execute('SELECT ns, id, offset, size '
'FROM objects WHERE band=?', (b,)).fetchall()
for o in objects:
band, offset, size = self.store_data(self.retrieve_data(b, *o[2:]))
self.cursor.execute('UPDATE objects SET band=?, offset=?, size=? '
'WHERE ns=? AND id=?', (band, offset, size, o[0], o[1]))
os.unlink(os.path.join(self.path, 'bands', str(b)))
self.cursor.execute('UPDATE system SET tid=tid+1, nextband=?',
(self.next_band,))
self.cnx.commit()
self.tid += 1
self._begin()
def rollback(self):
"""
"""
self.cnx.rollback()
self._begin()
def get(self, ns, id):
"""
"""
self.cursor.execute('SELECT band, offset, size FROM objects WHERE ns=? and id=?',
(ns.encode('hex'), id.encode('hex')))
row = self.cursor.fetchone()
if row:
return self.retrieve_data(*row)
else:
raise self.DoesNotExist
def retrieve_data(self, band, offset, size):
with open(os.path.join(self.path, 'bands', str(band)), 'rb') as fd:
fd.seek(offset)
return fd.read(size)
def store_data(self, data):
if self.band is None:
self.band = self.next_band
assert not os.path.exists(os.path.join(self.path, 'bands', str(self.band)))
self.next_band += 1
band = self.band
with open(os.path.join(self.path, 'bands', str(band)), 'ab') as fd:
offset = fd.tell()
fd.write(data)
if offset + len(data) > self.BAND_LIMIT:
self.band = None
return band, offset, len(data)
def put(self, ns, id, data):
"""
"""
try:
band, offset, size = self.store_data(data)
self.cursor.execute('INSERT INTO objects (ns, id, band, offset, size) '
'VALUES(?, ?, ?, ?, ?)',
(ns.encode('hex'), id.encode('hex'),
band, offset, size))
except sqlite3.IntegrityError:
raise self.AlreadyExists
def delete(self, ns, id):
"""
"""
self.cursor.execute('SELECT band FROM objects WHERE ns=? and id=?',
(ns.encode('hex'), id.encode('hex')))
row = self.cursor.fetchone()
if not row:
raise self.DoesNotExist
self.cursor.execute('DELETE FROM objects WHERE ns=? AND id=?',
(ns.encode('hex'), id.encode('hex')))
self.to_delete.add(row[0])
def list(self, ns, prefix='', marker=None, max_keys=1000000):
"""
"""
condition = ''
if prefix:
condition += ' AND id LIKE :prefix'
if marker:
condition += ' AND id >= :marker'
args = dict(ns=ns.encode('hex'), prefix=prefix.encode('hex') + '%',
marker=marker and marker.encode('hex'))
for row in self.cursor.execute('SELECT id FROM objects WHERE '
'ns=:ns ' + condition + ' LIMIT ' + str(max_keys),
args):
yield row[0].decode('hex')
class BandStoreTestCase(unittest.TestCase):
def setUp(self):
self.tmppath = tempfile.mkdtemp()
self.store = BandStore(os.path.join(self.tmppath, 'store'))
def tearDown(self):
shutil.rmtree(self.tmppath)
def test1(self):
self.assertEqual(self.store.tid, 0)
self.assertEqual(self.store.state, self.store.OPEN)
self.store.put('SOMENS', 'SOMEID', 'SOMEDATA')
self.assertRaises(self.store.AlreadyExists, lambda: self.store.put('SOMENS', 'SOMEID', 'SOMEDATA'))
self.assertEqual(self.store.get('SOMENS', 'SOMEID'), 'SOMEDATA')
self.store.rollback()
self.assertRaises(self.store.DoesNotExist, lambda: self.store.get('SOMENS', 'SOMEID'))
self.assertEqual(self.store.tid, 0)
def test2(self):
self.assertEqual(self.store.tid, 0)
self.assertEqual(self.store.state, self.store.OPEN)
self.store.put('SOMENS', 'SOMEID', 'SOMEDATA')
self.assertEqual(self.store.get('SOMENS', 'SOMEID'), 'SOMEDATA')
self.store.commit()
self.assertEqual(self.store.tid, 1)
self.assertEqual(self.store.get('SOMENS', 'SOMEID'), 'SOMEDATA')
self.store.delete('SOMENS', 'SOMEID')
self.assertRaises(self.store.DoesNotExist, lambda: self.store.get('SOMENS', 'SOMEID'))
self.store.rollback()
self.assertEqual(self.store.get('SOMENS', 'SOMEID'), 'SOMEDATA')
self.store.delete('SOMENS', 'SOMEID')
self.assertRaises(self.store.DoesNotExist, lambda: self.store.get('SOMENS', 'SOMEID'))
self.store.commit()
self.assertEqual(self.store.tid, 2)
self.assertRaises(self.store.DoesNotExist, lambda: self.store.get('SOMENS', 'SOMEID'))
def test_list(self):
self.store.put('SOMENS', 'SOMEID12', 'SOMEDATA')
self.store.put('SOMENS', 'SOMEID', 'SOMEDATA')
self.store.put('SOMENS', 'SOMEID1', 'SOMEDATA')
self.store.put('SOMENS', 'SOMEID123', 'SOMEDATA')
self.store.commit()
self.assertEqual(list(self.store.list('SOMENS', max_keys=3)),
['SOMEID', 'SOMEID1', 'SOMEID12'])
self.assertEqual(list(self.store.list('SOMENS', marker='SOMEID12')),
['SOMEID12', 'SOMEID123'])
self.assertEqual(list(self.store.list('SOMENS', prefix='SOMEID1', max_keys=2)),
['SOMEID1', 'SOMEID12'])
self.assertEqual(list(self.store.list('SOMENS', prefix='SOMEID1', marker='SOMEID12')),
['SOMEID12', 'SOMEID123'])
if __name__ == '__main__':
unittest.main()