borg/dedupestore/bandstore.py

207 lines
7.5 KiB
Python

#!/usr/bin/env python
import binascii
import os
import shutil
import sqlite3
import tempfile
import unittest
import uuid
class BandStore(object):
    """Transactional object store backed by append-only "band" files.

    Object payloads are appended to numbered files under ``<path>/bands``.
    An SQLite database at ``<path>/index.db`` maps each ``(ns, id)`` pair to
    the ``(band, offset, size)`` location of its payload.  Index changes are
    recorded by :meth:`commit` and discarded by :meth:`rollback`.

    Namespaces and ids are byte strings; text is accepted and encoded as
    UTF-8.  Both are stored hex-encoded in the index.  Payloads must be
    byte strings because band files are opened in binary mode.
    """

    class DoesNotExist(KeyError):
        """Raised when the requested ``(ns, id)`` is not in the index."""

    class AlreadyExists(KeyError):
        """Raised when storing an ``(ns, id)`` that is already in the index."""

    # Only OPEN is assigned by this class; IDLE and ACTIVE are not used here.
    IDLE = 'Idle'
    OPEN = 'Open'
    ACTIVE = 'Active'
    # Once a band grows past this many bytes the next write starts a new one.
    BAND_LIMIT = 1024 * 1024 * 10

    def __init__(self, path):
        """Open the store at *path*, creating it first if it does not exist."""
        if not os.path.exists(path):
            self.create(path)
        self.path = path
        self.cnx = sqlite3.connect(os.path.join(path, 'index.db'))
        self.cursor = self.cnx.cursor()
        self._begin()

    @staticmethod
    def _hex(value):
        """Return *value* (bytes, or text encoded as UTF-8) as hex text.

        The result is always a text string so that SQLite stores it as TEXT,
        which the ``LIKE`` prefix match in :meth:`list` relies on.
        """
        if not isinstance(value, (bytes, bytearray)):
            value = value.encode('utf-8')
        return binascii.hexlify(value).decode('ascii')

    def _begin(self):
        """Reload transaction state from the index and drop stray bands.

        Band files numbered ``nextband`` or higher were written by a
        transaction that never committed, so they are removed.
        """
        row = self.cursor.execute(
            'SELECT uuid, tid, nextband FROM system').fetchone()
        self.uuid, self.tid, self.next_band = row
        self.state = self.OPEN
        self.band = None
        self.to_delete = set()
        band = self.next_band
        while os.path.exists(self.band_filename(band)):
            os.unlink(self.band_filename(band))
            band += 1

    def create(self, path):
        """Create the directory layout and an empty index at *path*."""
        os.mkdir(path)
        os.mkdir(os.path.join(path, 'bands'))
        cnx = sqlite3.connect(os.path.join(path, 'index.db'))
        try:
            cnx.execute('CREATE TABLE objects(ns TEXT NOT NULL, id NOT NULL, '
                        'band NOT NULL, offset NOT NULL, size NOT NULL)')
            cnx.execute('CREATE TABLE system(uuid NOT NULL, tid NOT NULL, nextband NOT NULL)')
            cnx.execute('CREATE UNIQUE INDEX objects_pk ON objects(ns, id)')
            cnx.execute('INSERT INTO system VALUES(?,?,?)', (uuid.uuid1().hex, 0, 0))
            # The INSERT opens an implicit transaction; without an explicit
            # commit it is rolled back when the connection goes away.
            cnx.commit()
        finally:
            cnx.close()

    def close(self):
        """Close the index database connection."""
        self.cnx.close()

    def commit(self):
        """Commit the current transaction and start a new one.

        Every band that lost an object during the transaction is compacted:
        its surviving objects are copied into a fresh band and re-pointed in
        the index.  The old band files are removed only after the index has
        been committed, so the committed index never references a file that
        has already been deleted.
        """
        self.band = None
        obsolete = sorted(self.to_delete)
        for old_band in obsolete:
            survivors = self.cursor.execute(
                'SELECT ns, id, offset, size '
                'FROM objects WHERE band=?', (old_band,)).fetchall()
            for ns_hex, id_hex, old_offset, old_size in survivors:
                payload = self.retrieve_data(old_band, old_offset, old_size)
                band, offset, size = self.store_data(payload)
                self.cursor.execute(
                    'UPDATE objects SET band=?, offset=?, size=? '
                    'WHERE ns=? AND id=?',
                    (band, offset, size, ns_hex, id_hex))
        self.cursor.execute('UPDATE system SET tid=tid+1, nextband=?',
                            (self.next_band,))
        self.cnx.commit()
        # Reload tid/nextband from the index and reset per-transaction state.
        self._begin()
        for old_band in obsolete:
            os.unlink(self.band_filename(old_band))

    def rollback(self):
        """Discard all changes made since the last commit."""
        self.cnx.rollback()
        self._begin()

    def get(self, ns, id):
        """Return the payload stored under ``(ns, id)``.

        Raises :class:`DoesNotExist` if there is no such object.
        """
        self.cursor.execute(
            'SELECT band, offset, size FROM objects WHERE ns=? and id=?',
            (self._hex(ns), self._hex(id)))
        row = self.cursor.fetchone()
        if row is None:
            raise self.DoesNotExist
        return self.retrieve_data(*row)

    def band_filename(self, band):
        """Return the path of the file holding band number *band*."""
        return os.path.join(self.path, 'bands', str(band))

    def retrieve_data(self, band, offset, size):
        """Read *size* bytes starting at *offset* from band *band*."""
        with open(self.band_filename(band), 'rb') as fd:
            fd.seek(offset)
            return fd.read(size)

    def store_data(self, data):
        """Append *data* to the current band, opening a new one if needed.

        Returns the ``(band, offset, size)`` triple locating the payload.
        """
        if self.band is None:
            self.band = self.next_band
            assert not os.path.exists(self.band_filename(self.band))
            self.next_band += 1
        band = self.band
        with open(self.band_filename(band), 'ab') as fd:
            # The initial position of an append-mode stream is not the same
            # on every platform; seek explicitly so tell() is the file size.
            fd.seek(0, os.SEEK_END)
            offset = fd.tell()
            fd.write(data)
        if offset + len(data) > self.BAND_LIMIT:
            self.band = None
        return band, offset, len(data)

    def put(self, ns, id, data):
        """Store *data* under ``(ns, id)``.

        Raises :class:`AlreadyExists` if the key is already present.  The key
        is checked before anything is written so that a rejected put does not
        leave orphaned bytes in the band file.
        """
        ns_hex = self._hex(ns)
        id_hex = self._hex(id)
        self.cursor.execute('SELECT 1 FROM objects WHERE ns=? AND id=?',
                            (ns_hex, id_hex))
        if self.cursor.fetchone() is not None:
            raise self.AlreadyExists
        band, offset, size = self.store_data(data)
        try:
            self.cursor.execute(
                'INSERT INTO objects (ns, id, band, offset, size) '
                'VALUES(?, ?, ?, ?, ?)',
                (ns_hex, id_hex, band, offset, size))
        except sqlite3.IntegrityError:
            # Backstop for the unique index on (ns, id).
            raise self.AlreadyExists

    def delete(self, ns, id):
        """Remove ``(ns, id)`` from the index.

        The band that held the object is scheduled for compaction at the
        next commit.  Raises :class:`DoesNotExist` if there is no such object.
        """
        key = (self._hex(ns), self._hex(id))
        self.cursor.execute('SELECT band FROM objects WHERE ns=? and id=?', key)
        row = self.cursor.fetchone()
        if not row:
            raise self.DoesNotExist
        self.cursor.execute('DELETE FROM objects WHERE ns=? AND id=?', key)
        self.to_delete.add(row[0])

    def list(self, ns, prefix='', marker=None, max_keys=1000000):
        """Yield the ids stored in namespace *ns* in ascending order.

        *prefix* restricts the result to ids starting with it, *marker* to
        ids greater than or equal to it, and *max_keys* caps the number of
        ids yielded.  Ids are yielded as byte strings.
        """
        conditions = ['ns=:ns']
        args = {'ns': self._hex(ns), 'max_keys': max_keys}
        if prefix:
            conditions.append('id LIKE :prefix')
            args['prefix'] = self._hex(prefix) + '%'
        if marker:
            conditions.append('id >= :marker')
            args['marker'] = self._hex(marker)
        # Hex text sorts in the same order as the underlying bytes, and an
        # explicit ORDER BY is what makes marker/max_keys paging well defined.
        query = ('SELECT id FROM objects WHERE ' + ' AND '.join(conditions) +
                 ' ORDER BY id LIMIT :max_keys')
        # A dedicated cursor keeps the listing alive if the caller uses
        # other methods (which share self.cursor) while iterating.
        for row in self.cnx.execute(query, args):
            yield binascii.unhexlify(row[0].encode('ascii'))
class BandStoreTestCase(unittest.TestCase):
    """Exercise BandStore transactions against a throwaway on-disk store.

    Keys and payloads are bytes literals, which behave the same as native
    strings on Python 2 and are what binary band files require on Python 3.
    """

    def setUp(self):
        """Create a fresh store inside a temporary directory."""
        self.tmppath = tempfile.mkdtemp()
        self.store = BandStore(os.path.join(self.tmppath, 'store'))

    def tearDown(self):
        """Close the store before removing its directory."""
        try:
            # Release the SQLite connection so the database file is not
            # still held open while the directory is deleted.
            self.store.close()
        finally:
            shutil.rmtree(self.tmppath)

    def test1(self):
        """A rolled-back put leaves no object behind and keeps tid at 0."""
        self.assertEqual(self.store.tid, 0)
        self.assertEqual(self.store.state, self.store.OPEN)
        self.store.put(b'SOMENS', b'SOMEID', b'SOMEDATA')
        self.assertRaises(self.store.AlreadyExists,
                          self.store.put, b'SOMENS', b'SOMEID', b'SOMEDATA')
        self.assertEqual(self.store.get(b'SOMENS', b'SOMEID'), b'SOMEDATA')
        self.store.rollback()
        self.assertRaises(self.store.DoesNotExist,
                          self.store.get, b'SOMENS', b'SOMEID')
        self.assertEqual(self.store.tid, 0)

    def test2(self):
        """Commits bump tid; deletes are undone by rollback, kept by commit."""
        self.assertEqual(self.store.tid, 0)
        self.assertEqual(self.store.state, self.store.OPEN)
        self.store.put(b'SOMENS', b'SOMEID', b'SOMEDATA')
        self.assertEqual(self.store.get(b'SOMENS', b'SOMEID'), b'SOMEDATA')
        self.store.commit()
        self.assertEqual(self.store.tid, 1)
        self.assertEqual(self.store.get(b'SOMENS', b'SOMEID'), b'SOMEDATA')
        self.store.delete(b'SOMENS', b'SOMEID')
        self.assertRaises(self.store.DoesNotExist,
                          self.store.get, b'SOMENS', b'SOMEID')
        self.store.rollback()
        self.assertEqual(self.store.get(b'SOMENS', b'SOMEID'), b'SOMEDATA')
        self.store.delete(b'SOMENS', b'SOMEID')
        self.assertRaises(self.store.DoesNotExist,
                          self.store.get, b'SOMENS', b'SOMEID')
        self.store.commit()
        self.assertEqual(self.store.tid, 2)
        self.assertRaises(self.store.DoesNotExist,
                          self.store.get, b'SOMENS', b'SOMEID')

    def test_list(self):
        """Listing honours max_keys, marker and prefix."""
        self.store.put(b'SOMENS', b'SOMEID12', b'SOMEDATA')
        self.store.put(b'SOMENS', b'SOMEID', b'SOMEDATA')
        self.store.put(b'SOMENS', b'SOMEID1', b'SOMEDATA')
        self.store.put(b'SOMENS', b'SOMEID123', b'SOMEDATA')
        self.store.commit()
        self.assertEqual(list(self.store.list(b'SOMENS', max_keys=3)),
                         [b'SOMEID', b'SOMEID1', b'SOMEID12'])
        self.assertEqual(list(self.store.list(b'SOMENS', marker=b'SOMEID12')),
                         [b'SOMEID12', b'SOMEID123'])
        self.assertEqual(list(self.store.list(b'SOMENS', prefix=b'SOMEID1', max_keys=2)),
                         [b'SOMEID1', b'SOMEID12'])
        self.assertEqual(list(self.store.list(b'SOMENS', prefix=b'SOMEID1', marker=b'SOMEID12')),
                         [b'SOMEID12', b'SOMEID123'])
# Run the unittest test cases in this module when executed as a script.
if __name__ == '__main__':
    unittest.main()