From a229db0dced982e56cadcdccc9e97081a9abfeaf Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?Jonas=20Borgstr=C3=B6m?=
Date: Mon, 24 Feb 2014 23:37:21 +0100
Subject: [PATCH] check: More code cleanup

---
 attic/archive.py | 118 +++++++++++++++++++++++++++--------------------
 1 file changed, 68 insertions(+), 50 deletions(-)

diff --git a/attic/archive.py b/attic/archive.py
index 1d770084f..d3a387add 100644
--- a/attic/archive.py
+++ b/attic/archive.py
@@ -410,13 +410,11 @@ def __init__(self, validator):
         self.validator = validator
         self._buffered_data = []
         self._resync = False
-        self._skip = 0
         self._unpacker = msgpack.Unpacker(object_hook=StableDict)
 
     def resync(self):
         self._buffered_data = []
         self._resync = True
-        self._skip = 0
 
     def feed(self, data):
         if self._resync:
@@ -429,24 +427,24 @@ def __iter__(self):
 
     def __next__(self):
         if self._resync:
+            data = b''.join(self._buffered_data)
             while self._resync:
-                data = b''.join(self._buffered_data)[self._skip:]
                 if not data:
                     raise StopIteration
-                temp_unpacker = msgpack.Unpacker()
-                temp_unpacker.feed(data)
-                for item in temp_unpacker:
+                self._unpacker = msgpack.Unpacker(object_hook=StableDict)
+                self._unpacker.feed(data)
+                try:
+                    item = next(self._unpacker)
                     if self.validator(item):
                         self._resync = False
-                        self._unpacker = msgpack.Unpacker(object_hook=StableDict)
-                        self.feed(b''.join(self._buffered_data)[self._skip:])
-                        return self._unpacker.__next__()
-                    else:
-                        self._skip += 1
-                else:
-                    raise StopIteration
+                        return item
+                # Ignore exceptions that might be raised when feeding
+                # msgpack with invalid data
+                except (TypeError, ValueError, StopIteration):
+                    pass
+                data = data[1:]
         else:
-            return self._unpacker.__next__()
+            return next(self._unpacker)
 
 
 class ArchiveChecker:
@@ -460,7 +458,26 @@ def __init__(self):
     def __del__(self):
         shutil.rmtree(self.tmpdir)
 
+    def check(self, repository, progress=True, repair=False):
+        self.report_progress('Starting archive consistency check...')
+        self.repair = repair
+        self.progress = progress
+        self.repository = repository
+        self.init_chunks()
+        self.key = self.identify_key(repository)
+        if not Manifest.MANIFEST_ID in self.chunks:
+            self.manifest = self.rebuild_manifest()
+        else:
+            self.manifest, _ = Manifest.load(repository, key=self.key)
+        self.rebuild_refcounts()
+        self.verify_chunks()
+        if not self.error_found:
+            self.report_progress('Archive consistency check complete, no problems found.')
+        return self.repair or not self.error_found
+
     def init_chunks(self):
+        """Fetch a list of all object keys from repository
+        """
         # Explicity set the initial hash table capacity to avoid performance issues
         # due to hash table "resonance"
         capacity = int(len(self.repository) * 1.2)
@@ -486,6 +503,10 @@ def identify_key(self, repository):
         return key_factory(repository, cdata)
 
     def rebuild_manifest(self):
+        """Rebuild the manifest object if it is missing
+
+        Iterates through all objects in the repository looking for archive metadata blocks.
+        """
         self.report_progress('Rebuilding missing manifest, this might take some time...', error=True)
         manifest = Manifest(self.key, self.repository)
         for chunk_id, _ in self.chunks.iteritems():
@@ -501,42 +522,15 @@ def rebuild_manifest(self):
         self.report_progress('Manifest rebuild complete', error=True)
         return manifest
 
-    def check(self, repository, progress=True, repair=False):
-        self.report_progress('Starting archive consistency check...')
-        self.repair = repair
-        self.progress = progress
-        self.repository = repository
-        self.init_chunks()
-        self.key = self.identify_key(repository)
-        if not Manifest.MANIFEST_ID in self.chunks:
-            self.manifest = self.rebuild_manifest()
-        else:
-            self.manifest, _ = Manifest.load(repository, key=self.key)
-        self.rebuild_chunks()
-        self.verify_chunks()
-        if not self.error_found:
-            self.report_progress('Archive consistency check complete, no problems found.')
-        return self.repair or not self.error_found
+    def rebuild_refcounts(self):
+        """Rebuild object reference counts by walking the metadata
 
-    def verify_chunks(self):
-        unused = set()
-        for id_, (count, size, csize) in self.chunks.iteritems():
-            if count == 0:
-                unused.add(id_)
-        orphaned = unused - self.possibly_superseded
-        if orphaned:
-            self.report_progress('{} orphaned objects found'.format(len(orphaned)), error=True)
-        if self.repair:
-            for id_ in unused:
-                self.repository.delete(id_)
-            self.manifest.write()
-            self.repository.commit()
-
-    def rebuild_chunks(self):
+        Missing and/or incorrect data is repaired when detected
+        """
         # Exclude the manifest from chunks
         del self.chunks[Manifest.MANIFEST_ID]
 
-        def record_unused(id_):
+        def mark_as_possibly_superseded(id_):
             if self.chunks.get(id_, (0,))[0] == 0:
                 self.possibly_superseded.add(id_)
 
@@ -557,6 +551,11 @@ def add_reference(id_, size, csize, cdata=None):
                     self.repository.put(id_, cdata)
 
         def verify_file_chunks(item):
+            """Verifies that all file chunks are present
+
+            Missing file chunks will be replaced with new chunks of the same
+            length containing all zeros.
+            """
             offset = 0
             chunk_list = []
             for chunk_id, size, csize in item[b'chunks']:
@@ -575,6 +574,10 @@
             item[b'chunks'] = chunk_list
 
         def robust_iterator(archive):
+            """Iterates through all archive items
+
+            Missing item chunks will be skipped and the msgpack stream will be restarted
+            """
             unpacker = RobustUnpacker(lambda item: isinstance(item, dict) and b'path' in item)
             _state = 0
             def missing_chunk_detector(chunk_id):
@@ -602,25 +605,40 @@ def missing_chunk_detector(chunk_id):
                 self.report_progress('Archive metadata block is missing', error=True)
                 del self.manifest.archives[name]
                 continue
-            items_buffer = ChunkBuffer(self.key)
-            items_buffer.write_chunk = add_callback
+            mark_as_possibly_superseded(archive_id)
             cdata = self.repository.get(archive_id)
             data = self.key.decrypt(archive_id, cdata)
             archive = StableDict(msgpack.unpackb(data))
             if archive[b'version'] != 1:
                 raise Exception('Unknown archive metadata version')
             decode_dict(archive, (b'name', b'hostname', b'username', b'time'))  # fixme: argv
+            items_buffer = ChunkBuffer(self.key)
+            items_buffer.write_chunk = add_callback
            for item in robust_iterator(archive):
                 if b'chunks' in item:
                     verify_file_chunks(item)
                 items_buffer.add(item)
             items_buffer.flush(flush=True)
             for previous_item_id in archive[b'items']:
-                record_unused(previous_item_id)
+                mark_as_possibly_superseded(previous_item_id)
             archive[b'items'] = items_buffer.chunks
             data = msgpack.packb(archive, unicode_errors='surrogateescape')
             new_archive_id = self.key.id_hash(data)
             cdata = self.key.encrypt(data)
             add_reference(new_archive_id, len(data), len(cdata), cdata)
-            record_unused(archive_id)
             info[b'id'] = new_archive_id
+
+    def verify_chunks(self):
+        unused = set()
+        for id_, (count, size, csize) in self.chunks.iteritems():
+            if count == 0:
+                unused.add(id_)
+        orphaned = unused - self.possibly_superseded
+        if orphaned:
+            self.report_progress('{} orphaned objects found'.format(len(orphaned)), error=True)
+        if self.repair:
+            for id_ in unused:
+                self.repository.delete(id_)
+            self.manifest.write()
+            self.repository.commit()