From 2252616f9e749f382dcc0324c603710685aad151 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Jonas=20Borgstr=C3=B6m?= Date: Sat, 8 Feb 2014 13:31:51 +0100 Subject: [PATCH] Repository code cleanup --- attic/repository.py | 97 ++++++++++++++++++++--------------- attic/testsuite/archiver.py | 1 + attic/testsuite/repository.py | 14 ----- 3 files changed, 57 insertions(+), 55 deletions(-) diff --git a/attic/repository.py b/attic/repository.py index 5b79245be..2c821fb43 100644 --- a/attic/repository.py +++ b/attic/repository.py @@ -50,6 +50,7 @@ class Repository(object): self.path = path self.io = None self.lock = None + self.index = None if create: self.create(path) self.open(path) @@ -76,8 +77,14 @@ class Repository(object): with open(os.path.join(path, 'config'), 'w') as fd: config.write(fd) + def get_index_transaction_id(self): + indicies = sorted((int(name[6:]) for name in os.listdir(self.path) if name.startswith('index.') and name[6:].isdigit())) + if indicies: + return indicies[-1] + else: + return None + def open(self, path): - self.head = None self.path = path if not os.path.isdir(path): raise self.DoesNotExist(path) @@ -99,7 +106,7 @@ class Repository(object): self.lock.release() self.lock = None - def commit(self, rollback=True): + def commit(self): """Commit transaction """ self.io.write_commit() @@ -107,13 +114,9 @@ class Repository(object): self.write_index() self.rollback() - def _available_indices(self, reverse=False): - names = [int(name[6:]) for name in os.listdir(self.path) if re.match('index\.\d+', name)] - names.sort(reverse=reverse) - return names - def open_index(self, head, read_only=False): if head is None: + self.lock.upgrade() self.index = NSIndex.create(os.path.join(self.path, 'index.tmp').encode('utf-8')) self.segments = {} self.compact = set() @@ -121,6 +124,8 @@ class Repository(object): if read_only: self.index = NSIndex((os.path.join(self.path, 'index.%d') % head).encode('utf-8'), readonly=True) else: + self.lock.upgrade() + self.io.cleanup() shutil.copy(os.path.join(self.path, 'index.%d' % head), os.path.join(self.path, 'index.tmp')) self.index = NSIndex(os.path.join(self.path, 'index.tmp').encode('utf-8')) @@ -211,6 +216,8 @@ class Repository(object): This method verifies all segment checksums and makes sure the index is consistent with the data stored in the segments. """ + if not self.index: + self.open_index(self.io.head, read_only=True) progress_time = None error_found = False def report_progress(msg, error=False): @@ -220,7 +227,7 @@ class Repository(object): if error or progress: print(msg, file=sys.stderr) seen = set() - for segment, filename in self.io._segment_names(): + for segment, filename in self.io.segment_iterator(): if progress: if int(time.time()) != progress_time: progress_time = int(time.time()) @@ -250,22 +257,24 @@ class Repository(object): def rollback(self): """ """ - self._active_txn = False if self.io: self.io.close() - self.io = LoggedIO(self.path, self.max_segment_size, self.segments_per_dir) - if self.io.head is not None and not os.path.exists(os.path.join(self.path, 'index.%d' % self.io.head)): - self.lock.upgrade() - self.recover(self.path) - self.open_index(self.io.head, read_only=True) + self.io = None + self.index = None + self._active_txn = False + self.io = LoggedIO(self.path, self.max_segment_size, self.segments_per_dir, self.get_index_transaction_id()) def _len(self): + if not self.index: + self.open_index(self.io.head, read_only=True) return len(self.index) - def get(self, id): + def get(self, id_): + if not self.index: + self.open_index(self.io.head, read_only=True) try: - segment, offset = self.index[id] - return self.io.read(segment, offset, id) + segment, offset = self.index[id_] + return self.io.read(segment, offset, id_) except KeyError: raise self.DoesNotExist(self.path) @@ -276,7 +285,6 @@ class Repository(object): def put(self, id, data, wait=True): if not self._active_txn: self._active_txn = True - self.lock.upgrade() self.open_index(self.io.head) try: segment, _ = self.index[id] @@ -295,7 +303,6 @@ class Repository(object): def delete(self, id, wait=True): if not self._active_txn: self._active_txn = True - self.lock.upgrade() self.open_index(self.io.head) try: segment, offset = self.index.pop(id) @@ -326,7 +333,7 @@ class LoggedIO(object): _commit = header_no_crc_fmt.pack(9, TAG_COMMIT) COMMIT = crc_fmt.pack(crc32(_commit)) + _commit - def __init__(self, path, limit, segments_per_dir, capacity=100): + def __init__(self, path, limit, segments_per_dir, latest_index, capacity=100): self.path = path self.fds = LRUCache(capacity) self.segment = None @@ -335,7 +342,7 @@ class LoggedIO(object): self.offset = 0 self._write_fd = None self.head = None - self.cleanup() + self.verify_segments_head(latest_index) def close(self): for segment in list(self.fds.keys()): @@ -343,37 +350,45 @@ class LoggedIO(object): self.close_segment() self.fds = None # Just to make sure we're disabled - def _segment_names(self, reverse=False): + def segment_iterator(self, reverse=False): for dirpath, dirs, filenames in os.walk(os.path.join(self.path, 'data')): dirs.sort(key=int, reverse=reverse) filenames = sorted((filename for filename in filenames if filename.isdigit()), key=int, reverse=reverse) for filename in filenames: yield int(filename), os.path.join(dirpath, filename) + def verify_segments_head(self, latest_index): + """Verify that the transaction id is consistent with the index transaction id + """ + self.segment = 0 + for segment, filename in self.segment_iterator(reverse=True): + if latest_index is None or segment < latest_index: + # The index is newer than any committed transaction found + raise Repository.CheckNeeded() + if self.is_committed_segment(filename): + if segment > latest_index: + # The committed transaction found is newer than the index + raise Repository.CheckNeeded() + self.head = segment + self.segment = self.head + 1 + break + else: + if latest_index is not None: + # An index has been found but no committed transaction + raise Repository.CheckNeeded() + def cleanup(self): """Delete segment files left by aborted transactions """ - self.head = None - self.segment = 0 - to_delete = [] - for segment, filename in self._segment_names(reverse=True): - if self.is_complete_segment(filename): - self.head = segment - self.segment = self.head + 1 - for filename in to_delete: - os.unlink(filename) - break + for segment, filename in self.segment_iterator(reverse=True): + if segment > self.head: + os.unlink(filename) else: - to_delete.append(filename) - else: - # Abort if no transaction is found, otherwise all segments - # would be deleted - if to_delete: - raise Repository.CheckNeeded(self.path) + break - - - def is_complete_segment(self, filename): + def is_committed_segment(self, filename): + """Check if segment ends with a COMMIT_TAG tag + """ with open(filename, 'rb') as fd: try: fd.seek(-self.header_fmt.size, os.SEEK_END) diff --git a/attic/testsuite/archiver.py b/attic/testsuite/archiver.py index b3d5ca8aa..891ec6008 100644 --- a/attic/testsuite/archiver.py +++ b/attic/testsuite/archiver.py @@ -263,6 +263,7 @@ class ArchiverTestCase(AtticTestCase): def verify_uniqueness(): repository = Repository(self.repository_path) + repository.open_index(repository.io.head) for key, _ in repository.index.iteritems(): data = repository.get(key) hash = sha256(data).digest() diff --git a/attic/testsuite/repository.py b/attic/testsuite/repository.py index c9cb4b27d..778e7bfab 100644 --- a/attic/testsuite/repository.py +++ b/attic/testsuite/repository.py @@ -49,20 +49,6 @@ class RepositoryTestCase(AtticTestCase): self.repository.commit() self.assert_equal(self.repository.get(b'00000000000000000000000000000001'), b'bar') - def test_index_rebuild(self): - """Verify that repository index rebuild works properly - """ - def extract_and_unlink_index(): - index_name = [n for n in os.listdir(os.path.join(self.tmppath, 'repository')) if n.startswith('index')][0] - idx = NSIndex(os.path.join(self.tmppath, 'repository', index_name)) - os.unlink(os.path.join(self.tmppath, 'repository', index_name)) - return list(idx.iteritems()) - self.test2() - self.repository.close() - before = extract_and_unlink_index() - self.open() - self.assert_equal(before, extract_and_unlink_index()) - def test_consistency(self): """Test cache consistency """