check: More code cleanup

This commit is contained in:
Jonas Borgström 2014-02-24 23:37:21 +01:00
parent 9e8a944a2a
commit a229db0dce
1 changed files with 68 additions and 50 deletions

View File

@ -410,13 +410,11 @@ class RobustUnpacker():
self.validator = validator
self._buffered_data = []
self._resync = False
self._skip = 0
self._unpacker = msgpack.Unpacker(object_hook=StableDict)
def resync(self):
self._buffered_data = []
self._resync = True
self._skip = 0
def feed(self, data):
if self._resync:
@ -429,24 +427,24 @@ class RobustUnpacker():
def __next__(self):
if self._resync:
data = b''.join(self._buffered_data)
while self._resync:
data = b''.join(self._buffered_data)[self._skip:]
if not data:
raise StopIteration
temp_unpacker = msgpack.Unpacker()
temp_unpacker.feed(data)
for item in temp_unpacker:
self._unpacker = msgpack.Unpacker(object_hook=StableDict)
self._unpacker.feed(data)
try:
item = next(self._unpacker)
if self.validator(item):
self._resync = False
self._unpacker = msgpack.Unpacker(object_hook=StableDict)
self.feed(b''.join(self._buffered_data)[self._skip:])
return self._unpacker.__next__()
else:
self._skip += 1
else:
raise StopIteration
return item
# Ignore exceptions that might be raised when feeding
# msgpack with invalid data
except (TypeError, ValueError, StopIteration):
pass
data = data[1:]
else:
return self._unpacker.__next__()
return next(self._unpacker)
class ArchiveChecker:
@ -460,7 +458,26 @@ class ArchiveChecker:
def __del__(self):
shutil.rmtree(self.tmpdir)
def check(self, repository, progress=True, repair=False):
self.report_progress('Starting archive consistency check...')
self.repair = repair
self.progress = progress
self.repository = repository
self.init_chunks()
self.key = self.identify_key(repository)
if not Manifest.MANIFEST_ID in self.chunks:
self.manifest = self.rebuild_manifest()
else:
self.manifest, _ = Manifest.load(repository, key=self.key)
self.rebuild_refcounts()
self.verify_chunks()
if not self.error_found:
self.report_progress('Archive consistency check complete, no problems found.')
return self.repair or not self.error_found
def init_chunks(self):
"""Fetch a list of all object keys from repository
"""
# Explicity set the initial hash table capacity to avoid performance issues
# due to hash table "resonance"
capacity = int(len(self.repository) * 1.2)
@ -486,6 +503,10 @@ class ArchiveChecker:
return key_factory(repository, cdata)
def rebuild_manifest(self):
"""Rebuild the manifest object if it is missing
Iterates through all objects in the repository looking for archive metadata blocks.
"""
self.report_progress('Rebuilding missing manifest, this might take some time...', error=True)
manifest = Manifest(self.key, self.repository)
for chunk_id, _ in self.chunks.iteritems():
@ -501,42 +522,15 @@ class ArchiveChecker:
self.report_progress('Manifest rebuild complete', error=True)
return manifest
def check(self, repository, progress=True, repair=False):
self.report_progress('Starting archive consistency check...')
self.repair = repair
self.progress = progress
self.repository = repository
self.init_chunks()
self.key = self.identify_key(repository)
if not Manifest.MANIFEST_ID in self.chunks:
self.manifest = self.rebuild_manifest()
else:
self.manifest, _ = Manifest.load(repository, key=self.key)
self.rebuild_chunks()
self.verify_chunks()
if not self.error_found:
self.report_progress('Archive consistency check complete, no problems found.')
return self.repair or not self.error_found
def rebuild_refcounts(self):
"""Rebuild object reference counts by walking the metadata
def verify_chunks(self):
unused = set()
for id_, (count, size, csize) in self.chunks.iteritems():
if count == 0:
unused.add(id_)
orphaned = unused - self.possibly_superseded
if orphaned:
self.report_progress('{} orphaned objects found'.format(len(orphaned)), error=True)
if self.repair:
for id_ in unused:
self.repository.delete(id_)
self.manifest.write()
self.repository.commit()
def rebuild_chunks(self):
Missing and/or incorrect data is repaired when detected
"""
# Exclude the manifest from chunks
del self.chunks[Manifest.MANIFEST_ID]
def record_unused(id_):
def mark_as_possibly_superseded(id_):
if self.chunks.get(id_, (0,))[0] == 0:
self.possibly_superseded.add(id_)
@ -557,6 +551,11 @@ class ArchiveChecker:
self.repository.put(id_, cdata)
def verify_file_chunks(item):
"""Verifies that all file chunks are present
Missing file chunks will be replaced with new chunks of the same
length containing all zeros.
"""
offset = 0
chunk_list = []
for chunk_id, size, csize in item[b'chunks']:
@ -575,6 +574,10 @@ class ArchiveChecker:
item[b'chunks'] = chunk_list
def robust_iterator(archive):
"""Iterates through all archive items
Missing item chunks will be skipped and the msgpack stream will be restarted
"""
unpacker = RobustUnpacker(lambda item: isinstance(item, dict) and b'path' in item)
_state = 0
def missing_chunk_detector(chunk_id):
@ -602,25 +605,40 @@ class ArchiveChecker:
self.report_progress('Archive metadata block is missing', error=True)
del self.manifest.archives[name]
continue
items_buffer = ChunkBuffer(self.key)
items_buffer.write_chunk = add_callback
mark_as_possibly_superseded(archive_id)
cdata = self.repository.get(archive_id)
data = self.key.decrypt(archive_id, cdata)
archive = StableDict(msgpack.unpackb(data))
if archive[b'version'] != 1:
raise Exception('Unknown archive metadata version')
decode_dict(archive, (b'name', b'hostname', b'username', b'time')) # fixme: argv
items_buffer = ChunkBuffer(self.key)
items_buffer.write_chunk = add_callback
for item in robust_iterator(archive):
if b'chunks' in item:
verify_file_chunks(item)
items_buffer.add(item)
items_buffer.flush(flush=True)
for previous_item_id in archive[b'items']:
record_unused(previous_item_id)
mark_as_possibly_superseded(previous_item_id)
archive[b'items'] = items_buffer.chunks
data = msgpack.packb(archive, unicode_errors='surrogateescape')
new_archive_id = self.key.id_hash(data)
cdata = self.key.encrypt(data)
add_reference(new_archive_id, len(data), len(cdata), cdata)
record_unused(archive_id)
info[b'id'] = new_archive_id
def verify_chunks(self):
unused = set()
for id_, (count, size, csize) in self.chunks.iteritems():
if count == 0:
unused.add(id_)
orphaned = unused - self.possibly_superseded
if orphaned:
self.report_progress('{} orphaned objects found'.format(len(orphaned)), error=True)
if self.repair:
for id_ in unused:
self.repository.delete(id_)
self.manifest.write()
self.repository.commit()