Repository code cleanup

This commit is contained in:
Jonas Borgström 2014-02-08 13:31:51 +01:00
parent c22bc30a06
commit 2252616f9e
3 changed files with 57 additions and 55 deletions

View File

@ -50,6 +50,7 @@ class Repository(object):
self.path = path self.path = path
self.io = None self.io = None
self.lock = None self.lock = None
self.index = None
if create: if create:
self.create(path) self.create(path)
self.open(path) self.open(path)
@ -76,8 +77,14 @@ class Repository(object):
with open(os.path.join(path, 'config'), 'w') as fd: with open(os.path.join(path, 'config'), 'w') as fd:
config.write(fd) config.write(fd)
def get_index_transaction_id(self):
    """Return the highest transaction id among the ``index.<N>`` files.

    Scans the repository directory (``self.path``) for files named
    ``index.`` followed by one or more ASCII digits and returns the largest
    such number as an int. Returns None when no such file exists.
    """
    prefix = 'index.'
    # str.isdigit() also accepts characters such as superscript digits that
    # int() refuses to parse, so restrict the suffix to plain ASCII digits.
    ascii_digits = frozenset('0123456789')
    transaction_ids = []
    for name in os.listdir(self.path):
        if not name.startswith(prefix):
            continue
        suffix = name[len(prefix):]
        if suffix and ascii_digits.issuperset(suffix):
            transaction_ids.append(int(suffix))
    if transaction_ids:
        return max(transaction_ids)
    return None
def open(self, path): def open(self, path):
self.head = None
self.path = path self.path = path
if not os.path.isdir(path): if not os.path.isdir(path):
raise self.DoesNotExist(path) raise self.DoesNotExist(path)
@ -99,7 +106,7 @@ class Repository(object):
self.lock.release() self.lock.release()
self.lock = None self.lock = None
def commit(self, rollback=True): def commit(self):
"""Commit transaction """Commit transaction
""" """
self.io.write_commit() self.io.write_commit()
@ -107,13 +114,9 @@ class Repository(object):
self.write_index() self.write_index()
self.rollback() self.rollback()
def _available_indices(self, reverse=False):
names = [int(name[6:]) for name in os.listdir(self.path) if re.match('index\.\d+', name)]
names.sort(reverse=reverse)
return names
def open_index(self, head, read_only=False): def open_index(self, head, read_only=False):
if head is None: if head is None:
self.lock.upgrade()
self.index = NSIndex.create(os.path.join(self.path, 'index.tmp').encode('utf-8')) self.index = NSIndex.create(os.path.join(self.path, 'index.tmp').encode('utf-8'))
self.segments = {} self.segments = {}
self.compact = set() self.compact = set()
@ -121,6 +124,8 @@ class Repository(object):
if read_only: if read_only:
self.index = NSIndex((os.path.join(self.path, 'index.%d') % head).encode('utf-8'), readonly=True) self.index = NSIndex((os.path.join(self.path, 'index.%d') % head).encode('utf-8'), readonly=True)
else: else:
self.lock.upgrade()
self.io.cleanup()
shutil.copy(os.path.join(self.path, 'index.%d' % head), shutil.copy(os.path.join(self.path, 'index.%d' % head),
os.path.join(self.path, 'index.tmp')) os.path.join(self.path, 'index.tmp'))
self.index = NSIndex(os.path.join(self.path, 'index.tmp').encode('utf-8')) self.index = NSIndex(os.path.join(self.path, 'index.tmp').encode('utf-8'))
@ -211,6 +216,8 @@ class Repository(object):
This method verifies all segment checksums and makes sure This method verifies all segment checksums and makes sure
the index is consistent with the data stored in the segments. the index is consistent with the data stored in the segments.
""" """
if not self.index:
self.open_index(self.io.head, read_only=True)
progress_time = None progress_time = None
error_found = False error_found = False
def report_progress(msg, error=False): def report_progress(msg, error=False):
@ -220,7 +227,7 @@ class Repository(object):
if error or progress: if error or progress:
print(msg, file=sys.stderr) print(msg, file=sys.stderr)
seen = set() seen = set()
for segment, filename in self.io._segment_names(): for segment, filename in self.io.segment_iterator():
if progress: if progress:
if int(time.time()) != progress_time: if int(time.time()) != progress_time:
progress_time = int(time.time()) progress_time = int(time.time())
@ -250,22 +257,24 @@ class Repository(object):
def rollback(self): def rollback(self):
""" """
""" """
self._active_txn = False
if self.io: if self.io:
self.io.close() self.io.close()
self.io = LoggedIO(self.path, self.max_segment_size, self.segments_per_dir) self.io = None
if self.io.head is not None and not os.path.exists(os.path.join(self.path, 'index.%d' % self.io.head)): self.index = None
self.lock.upgrade() self._active_txn = False
self.recover(self.path) self.io = LoggedIO(self.path, self.max_segment_size, self.segments_per_dir, self.get_index_transaction_id())
self.open_index(self.io.head, read_only=True)
def _len(self): def _len(self):
if not self.index:
self.open_index(self.io.head, read_only=True)
return len(self.index) return len(self.index)
def get(self, id): def get(self, id_):
if not self.index:
self.open_index(self.io.head, read_only=True)
try: try:
segment, offset = self.index[id] segment, offset = self.index[id_]
return self.io.read(segment, offset, id) return self.io.read(segment, offset, id_)
except KeyError: except KeyError:
raise self.DoesNotExist(self.path) raise self.DoesNotExist(self.path)
@ -276,7 +285,6 @@ class Repository(object):
def put(self, id, data, wait=True): def put(self, id, data, wait=True):
if not self._active_txn: if not self._active_txn:
self._active_txn = True self._active_txn = True
self.lock.upgrade()
self.open_index(self.io.head) self.open_index(self.io.head)
try: try:
segment, _ = self.index[id] segment, _ = self.index[id]
@ -295,7 +303,6 @@ class Repository(object):
def delete(self, id, wait=True): def delete(self, id, wait=True):
if not self._active_txn: if not self._active_txn:
self._active_txn = True self._active_txn = True
self.lock.upgrade()
self.open_index(self.io.head) self.open_index(self.io.head)
try: try:
segment, offset = self.index.pop(id) segment, offset = self.index.pop(id)
@ -326,7 +333,7 @@ class LoggedIO(object):
_commit = header_no_crc_fmt.pack(9, TAG_COMMIT) _commit = header_no_crc_fmt.pack(9, TAG_COMMIT)
COMMIT = crc_fmt.pack(crc32(_commit)) + _commit COMMIT = crc_fmt.pack(crc32(_commit)) + _commit
def __init__(self, path, limit, segments_per_dir, capacity=100): def __init__(self, path, limit, segments_per_dir, latest_index, capacity=100):
self.path = path self.path = path
self.fds = LRUCache(capacity) self.fds = LRUCache(capacity)
self.segment = None self.segment = None
@ -335,7 +342,7 @@ class LoggedIO(object):
self.offset = 0 self.offset = 0
self._write_fd = None self._write_fd = None
self.head = None self.head = None
self.cleanup() self.verify_segments_head(latest_index)
def close(self): def close(self):
for segment in list(self.fds.keys()): for segment in list(self.fds.keys()):
@ -343,37 +350,45 @@ class LoggedIO(object):
self.close_segment() self.close_segment()
self.fds = None # Just to make sure we're disabled self.fds = None # Just to make sure we're disabled
def _segment_names(self, reverse=False): def segment_iterator(self, reverse=False):
for dirpath, dirs, filenames in os.walk(os.path.join(self.path, 'data')): for dirpath, dirs, filenames in os.walk(os.path.join(self.path, 'data')):
dirs.sort(key=int, reverse=reverse) dirs.sort(key=int, reverse=reverse)
filenames = sorted((filename for filename in filenames if filename.isdigit()), key=int, reverse=reverse) filenames = sorted((filename for filename in filenames if filename.isdigit()), key=int, reverse=reverse)
for filename in filenames: for filename in filenames:
yield int(filename), os.path.join(dirpath, filename) yield int(filename), os.path.join(dirpath, filename)
def verify_segments_head(self, latest_index):
    """Verify that the transaction id is consistent with the index transaction id

    Walks the segments from newest to oldest. ``self.segment`` is first reset
    to 0; when a committed segment matching the index is found, ``self.head``
    is set to it and ``self.segment`` to ``head + 1``. With no segments and
    no index, ``self.head`` is left untouched.

    ``latest_index`` is the transaction id of the newest index file, or None
    when there is no index.

    Raises Repository.CheckNeeded (carrying the repository path) when the
    segments and the index disagree.
    """
    self.segment = 0
    for segment, filename in self.segment_iterator(reverse=True):
        if latest_index is None or segment < latest_index:
            # Either segments exist without any index, or the index is newer
            # than any committed transaction found
            raise Repository.CheckNeeded(self.path)
        if self.is_committed_segment(filename):
            if segment > latest_index:
                # The committed transaction found is newer than the index
                raise Repository.CheckNeeded(self.path)
            self.head = segment
            self.segment = self.head + 1
            break
    else:
        # Loop finished without finding a committed segment
        if latest_index is not None:
            # An index has been found but no committed transaction
            raise Repository.CheckNeeded(self.path)
def cleanup(self): def cleanup(self):
"""Delete segment files left by aborted transactions """Delete segment files left by aborted transactions
""" """
self.head = None for segment, filename in self.segment_iterator(reverse=True):
self.segment = 0 if segment > self.head:
to_delete = []
for segment, filename in self._segment_names(reverse=True):
if self.is_complete_segment(filename):
self.head = segment
self.segment = self.head + 1
for filename in to_delete:
os.unlink(filename) os.unlink(filename)
else:
break break
else:
to_delete.append(filename)
else:
# Abort if no transaction is found, otherwise all segments
# would be deleted
if to_delete:
raise Repository.CheckNeeded(self.path)
def is_committed_segment(self, filename):
"""Check if segment ends with a COMMIT_TAG tag
def is_complete_segment(self, filename): """
with open(filename, 'rb') as fd: with open(filename, 'rb') as fd:
try: try:
fd.seek(-self.header_fmt.size, os.SEEK_END) fd.seek(-self.header_fmt.size, os.SEEK_END)

View File

@ -263,6 +263,7 @@ class ArchiverTestCase(AtticTestCase):
def verify_uniqueness(): def verify_uniqueness():
repository = Repository(self.repository_path) repository = Repository(self.repository_path)
repository.open_index(repository.io.head)
for key, _ in repository.index.iteritems(): for key, _ in repository.index.iteritems():
data = repository.get(key) data = repository.get(key)
hash = sha256(data).digest() hash = sha256(data).digest()

View File

@ -49,20 +49,6 @@ class RepositoryTestCase(AtticTestCase):
self.repository.commit() self.repository.commit()
self.assert_equal(self.repository.get(b'00000000000000000000000000000001'), b'bar') self.assert_equal(self.repository.get(b'00000000000000000000000000000001'), b'bar')
def test_index_rebuild(self):
    """Check that the index comes back unchanged after its file is deleted

    The index entries are captured and the index file removed; after the
    repository is reopened, the entries of the index file found then must
    equal the captured ones.
    """
    def extract_and_unlink_index():
        # Load the first index file found, remove it from disk and hand
        # back its entries as a list.
        repository_dir = os.path.join(self.tmppath, 'repository')
        candidates = [entry for entry in os.listdir(repository_dir) if entry.startswith('index')]
        index_name = candidates[0]
        loaded = NSIndex(os.path.join(self.tmppath, 'repository', index_name))
        os.unlink(os.path.join(self.tmppath, 'repository', index_name))
        return list(loaded.iteritems())

    self.test2()
    self.repository.close()
    entries_before = extract_and_unlink_index()
    self.open()
    self.assert_equal(entries_before, extract_and_unlink_index())
def test_consistency(self): def test_consistency(self):
"""Test cache consistency """Test cache consistency
""" """