From 4f7f1fdbe0f5a159e1fb93769c007fe50e5b77f6 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Jonas=20Borgstr=C3=B6m?= Date: Thu, 18 Aug 2011 22:23:05 +0200 Subject: [PATCH] Store: rename bands to segments --- darc/store.py | 174 +++++++++++++++++++++++++------------------------- darc/test.py | 2 +- 2 files changed, 88 insertions(+), 88 deletions(-) diff --git a/darc/store.py b/darc/store.py index 5f04b7c3b..269174200 100644 --- a/darc/store.py +++ b/darc/store.py @@ -21,12 +21,12 @@ class Store(object): On disk layout: dir/README dir/config - dir/bands// - dir/band + dir/data// + dir/segments dir/index """ - DEFAULT_MAX_BAND_SIZE = 5 * 1024 * 1024 - DEFAULT_BANDS_PER_DIR = 10000 + DEFAULT_MAX_SEGMENT_SIZE = 5 * 1024 * 1024 + DEFAULT_SEGMENTS_PER_DIR = 10000 class DoesNotExist(KeyError): """Requested key does not exist""" @@ -47,18 +47,18 @@ class Store(object): os.mkdir(path) with open(os.path.join(path, 'README'), 'wb') as fd: fd.write('This is a DARC store') - os.mkdir(os.path.join(path, 'bands')) + os.mkdir(os.path.join(path, 'data')) config = RawConfigParser() config.add_section('store') config.set('store', 'version', '1') - config.set('store', 'bands_per_dir', self.DEFAULT_BANDS_PER_DIR) - config.set('store', 'max_band_size', self.DEFAULT_MAX_BAND_SIZE) - config.set('store', 'next_band', '0') + config.set('store', 'segments_per_dir', self.DEFAULT_SEGMENTS_PER_DIR) + config.set('store', 'max_segment_size', self.DEFAULT_MAX_SEGMENT_SIZE) + config.set('store', 'next_segment', '0') config.add_section('meta') config.set('meta', 'manifest', '') config.set('meta', 'id', os.urandom(32).encode('hex')) NSIndex.create(os.path.join(path, 'index')) - self.write_dict(os.path.join(path, 'band'), {}) + self.write_dict(os.path.join(path, 'segments'), {}) with open(os.path.join(path, 'config'), 'w') as fd: config.write(fd) @@ -79,14 +79,14 @@ class Store(object): fd.write(msgpack.packb(d)) os.rename(filename+'.tmp', filename) - def delete_bands(self): + def delete_segments(self): delete_path = os.path.join(self.path, 'delete') if os.path.exists(delete_path): - bands = self.read_dict(os.path.join(self.path, 'band')) - for band in self.read_dict(delete_path): - assert bands.pop(band, 0) == 0 - self.io.delete_band(band, missing_ok=True) - self.write_dict(os.path.join(self.path, 'band'), bands) + segments = self.read_dict(os.path.join(self.path, 'segments')) + for segment in self.read_dict(delete_path): + assert segments.pop(segment, 0) == 0 + self.io.delete_segment(segment, missing_ok=True) + self.write_dict(os.path.join(self.path, 'segments'), segments) def begin_txn(self): txn_dir = os.path.join(self.path, 'txn.tmp') @@ -94,7 +94,7 @@ class Store(object): os.mkdir(txn_dir) shutil.copy(os.path.join(self.path, 'config'), txn_dir) shutil.copy(os.path.join(self.path, 'index'), txn_dir) - shutil.copy(os.path.join(self.path, 'band'), txn_dir) + shutil.copy(os.path.join(self.path, 'segments'), txn_dir) os.rename(os.path.join(self.path, 'txn.tmp'), os.path.join(self.path, 'txn.active')) self.compact = set() @@ -108,9 +108,9 @@ class Store(object): """Commit transaction """ meta = meta or self.meta - self.compact_bands() + self.compact_segments() self.io.close() - self.config.set('store', 'next_band', self.io.band + 1) + self.config.set('store', 'next_segment', self.io.segment + 1) self.config.remove_section('meta') self.config.add_section('meta') for k, v in meta.items(): @@ -118,30 +118,30 @@ class Store(object): with open(os.path.join(self.path, 'config'), 'w') as fd: self.config.write(fd) self.index.flush() - self.write_dict(os.path.join(self.path, 'band'), self.bands) + self.write_dict(os.path.join(self.path, 'segments'), self.segments) # If we crash before this line, the transaction will be # rolled back by open() os.rename(os.path.join(self.path, 'txn.active'), os.path.join(self.path, 'txn.commit')) self.rollback() - def compact_bands(self): - """Compact sparse bands by copying data into new bands + def compact_segments(self): + """Compact sparse segments by copying data into new segments """ if not self.compact: return - self.io.close_band() + self.io.close_segment() def lookup(key): - return self.index.get(key, (-1, -1))[0] == band - bands = self.bands - for band in self.compact: - if bands[band] > 0: - for key, data in self.io.iter_objects(band, lookup): - new_band, offset = self.io.write(key, data) - self.index[key] = new_band, offset - bands.setdefault(new_band, 0) - bands[new_band] += 1 - bands[band] -= 1 + return self.index.get(key, (-1, -1))[0] == segment + segments = self.segments + for segment in self.compact: + if segments[segment] > 0: + for key, data in self.io.iter_objects(segment, lookup): + new_segment, offset = self.io.write(key, data) + self.index[key] = new_segment, offset + segments.setdefault(new_segment, 0) + segments[new_segment] += 1 + segments[segment] -= 1 self.write_dict(os.path.join(self.path, 'delete'), tuple(self.compact)) def rollback(self): @@ -149,7 +149,7 @@ class Store(object): """ # Commit any half committed transaction if os.path.exists(os.path.join(self.path, 'txn.commit')): - self.delete_bands() + self.delete_segments() os.rename(os.path.join(self.path, 'txn.commit'), os.path.join(self.path, 'txn.tmp')) @@ -161,30 +161,30 @@ class Store(object): if os.path.exists(txn_dir): shutil.copy(os.path.join(txn_dir, 'config'), self.path) shutil.copy(os.path.join(txn_dir, 'index'), self.path) - shutil.copy(os.path.join(txn_dir, 'band'), self.path) + shutil.copy(os.path.join(txn_dir, 'segments'), self.path) os.rename(txn_dir, os.path.join(self.path, 'txn.tmp')) # Remove partially removed transaction if os.path.exists(os.path.join(self.path, 'txn.tmp')): shutil.rmtree(os.path.join(self.path, 'txn.tmp')) self.index = NSIndex(os.path.join(self.path, 'index')) - self.bands = self.read_dict(os.path.join(self.path, 'band')) + self.segments = self.read_dict(os.path.join(self.path, 'segments')) self.config = RawConfigParser() self.config.read(os.path.join(self.path, 'config')) if self.config.getint('store', 'version') != 1: raise Exception('%s Does not look like a darc store') - next_band = self.config.getint('store', 'next_band') - max_band_size = self.config.getint('store', 'max_band_size') - bands_per_dir = self.config.getint('store', 'bands_per_dir') + next_segment = self.config.getint('store', 'next_segment') + max_segment_size = self.config.getint('store', 'max_segment_size') + segments_per_dir = self.config.getint('store', 'segments_per_dir') self.meta = dict(self.config.items('meta')) - self.io = BandIO(self.path, next_band, max_band_size, bands_per_dir) + self.io = SegmentIO(self.path, next_segment, max_segment_size, segments_per_dir) self.io.cleanup() self.txn_active = False @deferrable def get(self, id): try: - band, offset = self.index[id] - return self.io.read(band, offset, id) + segment, offset = self.index[id] + return self.io.read(segment, offset, id) except KeyError: raise self.DoesNotExist @@ -193,24 +193,24 @@ class Store(object): if not self.txn_active: self.begin_txn() try: - band, _ = self.index[id] - self.bands[band] -= 1 - self.compact.add(band) + segment, _ = self.index[id] + self.segments[segment] -= 1 + self.compact.add(segment) except KeyError: pass - band, offset = self.io.write(id, data) - self.bands.setdefault(band, 0) - self.bands[band] += 1 - self.index[id] = band, offset + segment, offset = self.io.write(id, data) + self.segments.setdefault(segment, 0) + self.segments[segment] += 1 + self.index[id] = segment, offset @deferrable def delete(self, id): if not self.txn_active: self.begin_txn() try: - band, offset = self.index.pop(id) - self.bands[band] -= 1 - self.compact.add(band) + segment, offset = self.index.pop(id) + self.segments[segment] -= 1 + self.compact.add(segment) except KeyError: raise self.DoesNotExist @@ -218,85 +218,85 @@ class Store(object): pass -class BandIO(object): +class SegmentIO(object): header_fmt = struct.Struct(' self.limit: - self.close_band() - fd = self.get_fd(self.band, write=True) + self.close_segment() + fd = self.get_fd(self.segment, write=True) fd.seek(self.offset) if self.offset == 0: - fd.write('DARCBAND') + fd.write('DSEGMENT') self.offset = 8 offset = self.offset hash = crc32(data) & 0xffffffff fd.write(self.header_fmt.pack(size, 0, hash, id)) fd.write(data) self.offset += size - return self.band, offset + return self.segment, offset - def close_band(self): - self.band += 1 + def close_segment(self): + self.segment += 1 self.offset = 0 diff --git a/darc/test.py b/darc/test.py index e2eef8cfb..6493ac5da 100644 --- a/darc/test.py +++ b/darc/test.py @@ -109,7 +109,7 @@ class Test(unittest.TestCase): def test_corrupted_store(self): self.create_src_archive('test') self.darc('verify', self.store_path + '::test') - fd = open(os.path.join(self.tmpdir, 'store', 'bands', '0', '0'), 'r+') + fd = open(os.path.join(self.tmpdir, 'store', 'data', '0', '0'), 'r+') fd.seek(100) fd.write('X') fd.close()