diff --git a/darc/store.py b/darc/store.py index 0ca6a58c0..5f04b7c3b 100644 --- a/darc/store.py +++ b/darc/store.py @@ -69,16 +69,6 @@ def open(self, path): self.lock_fd = open(os.path.join(path, 'README'), 'r+') fcntl.flock(self.lock_fd, fcntl.LOCK_EX) self.rollback() - self.config = RawConfigParser() - self.config.read(os.path.join(path, 'config')) - if self.config.getint('store', 'version') != 1: - raise Exception('%s Does not look like a darc store') - next_band = self.config.getint('store', 'next_band') - max_band_size = self.config.getint('store', 'max_band_size') - bands_per_dir = self.config.getint('store', 'bands_per_dir') - self.meta = dict(self.config.items('meta')) - self.io = BandIO(self.path, next_band, max_band_size, bands_per_dir) - self.io.cleanup() def read_dict(self, filename): with open(filename, 'rb') as fd: @@ -142,16 +132,16 @@ def compact_bands(self): return self.io.close_band() def lookup(key): - return key in self.index + return self.index.get(key, (-1, -1))[0] == band bands = self.bands for band in self.compact: if bands[band] > 0: for key, data in self.io.iter_objects(band, lookup): new_band, offset = self.io.write(key, data) self.index[key] = new_band, offset - bands[band] -= 1 bands.setdefault(new_band, 0) bands[new_band] += 1 + bands[band] -= 1 self.write_dict(os.path.join(self.path, 'delete'), tuple(self.compact)) def rollback(self): @@ -178,6 +168,16 @@ def rollback(self): shutil.rmtree(os.path.join(self.path, 'txn.tmp')) self.index = NSIndex(os.path.join(self.path, 'index')) self.bands = self.read_dict(os.path.join(self.path, 'band')) + self.config = RawConfigParser() + self.config.read(os.path.join(self.path, 'config')) + if self.config.getint('store', 'version') != 1: + raise Exception('%s Does not look like a darc store') + next_band = self.config.getint('store', 'next_band') + max_band_size = self.config.getint('store', 'max_band_size') + bands_per_dir = self.config.getint('store', 'bands_per_dir') + self.meta = dict(self.config.items('meta')) + self.io = BandIO(self.path, next_band, max_band_size, bands_per_dir) + self.io.cleanup() self.txn_active = False @deferrable @@ -192,6 +192,12 @@ def get(self, id): def put(self, id, data): if not self.txn_active: self.begin_txn() + try: + band, _ = self.index[id] + self.bands[band] -= 1 + self.compact.add(band) + except KeyError: + pass band, offset = self.io.write(id, data) self.bands.setdefault(band, 0) self.bands[band] += 1 @@ -228,6 +234,7 @@ def __init__(self, path, nextband, limit, bands_per_dir, capacity=100): def close(self): for band in self.fds.keys(): self.fds.pop(band).close() + self.fds = None # Just to make sure we're disabled def cleanup(self): """Delete band files left by aborted transactions @@ -336,6 +343,17 @@ def test1(self): self.store.close() store2 = Store(os.path.join(self.tmppath, 'store')) + def test2(self): + """Test multiple sequential transactions + """ + self.store.put('00000000000000000000000000000000', 'foo') + self.store.put('00000000000000000000000000000001', 'foo') + self.store.commit() + self.store.delete('00000000000000000000000000000000') + self.store.put('00000000000000000000000000000001', 'bar') + self.store.commit() + self.assertEqual(self.store.get('00000000000000000000000000000001'), 'bar') + def suite(): return unittest.TestLoader().loadTestsFromTestCase(StoreTestCase)