From 9e8a944a2ad652d0463be2b36d52e93f8ccc4bca Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Jonas=20Borgstr=C3=B6m?= Date: Mon, 24 Feb 2014 22:43:17 +0100 Subject: [PATCH] check: archive metadata recovery improvements --- attic/archive.py | 86 +++++++++++++++++++++++++------------ attic/testsuite/archive.py | 54 ++++++++++++++++++++++- attic/testsuite/archiver.py | 14 +++--- 3 files changed, 120 insertions(+), 34 deletions(-) diff --git a/attic/archive.py b/attic/archive.py index 50e37a901..1d770084f 100644 --- a/attic/archive.py +++ b/attic/archive.py @@ -402,6 +402,53 @@ class Archive: yield Archive(repository, key, manifest, name, cache=cache) +class RobustUnpacker(): + """A restartable/robust version of the streaming msgpack unpacker + """ + def __init__(self, validator): + super(RobustUnpacker, self).__init__() + self.validator = validator + self._buffered_data = [] + self._resync = False + self._skip = 0 + self._unpacker = msgpack.Unpacker(object_hook=StableDict) + + def resync(self): + self._buffered_data = [] + self._resync = True + self._skip = 0 + + def feed(self, data): + if self._resync: + self._buffered_data.append(data) + else: + self._unpacker.feed(data) + + def __iter__(self): + return self + + def __next__(self): + if self._resync: + while self._resync: + data = b''.join(self._buffered_data)[self._skip:] + if not data: + raise StopIteration + temp_unpacker = msgpack.Unpacker() + temp_unpacker.feed(data) + for item in temp_unpacker: + if self.validator(item): + self._resync = False + self._unpacker = msgpack.Unpacker(object_hook=StableDict) + self.feed(b''.join(self._buffered_data)[self._skip:]) + return self._unpacker.__next__() + else: + self._skip += 1 + else: + raise StopIteration + else: + return self._unpacker.__next__() + + class ArchiveChecker: def __init__(self): @@ -527,38 +574,23 @@ class ArchiveChecker: offset += size item[b'chunks'] = chunk_list - def msgpack_resync(data): - data = memoryview(data) - while data: - unpacker = msgpack.Unpacker() - unpacker.feed(data) - item = next(unpacker) - if isinstance(item, dict) and b'path' in item: - return data - data = data[1:] - def robust_iterator(archive): - prev_state = None - state = 0 + unpacker = RobustUnpacker(lambda item: isinstance(item, dict) and b'path' in item) + _state = 0 def missing_chunk_detector(chunk_id): - nonlocal state - if state % 2 != int(not chunk_id in self.chunks): - state += 1 - return state - + nonlocal _state + if _state % 2 != int(not chunk_id in self.chunks): + _state += 1 + return _state for state, items in groupby(archive[b'items'], missing_chunk_detector): - if state != prev_state: - unpacker = msgpack.Unpacker(object_hook=StableDict) - prev_state = state + items = list(items) if state % 2: self.report_progress('Archive metadata damage detected', error=True) - return - items = list(items) - for i, (chunk_id, cdata) in enumerate(zip(items, self.repository.get_many(items))): - data = self.key.decrypt(chunk_id, cdata) - if state and i == 0: - data = msgpack_resync(data) - unpacker.feed(data) + continue + if state > 0: + unpacker.resync() + for chunk_id, cdata in zip(items, self.repository.get_many(items)): + unpacker.feed(self.key.decrypt(chunk_id, cdata)) for item in unpacker: yield item diff --git a/attic/testsuite/archive.py b/attic/testsuite/archive.py index 8a61eb41c..79b639530 100644 --- a/attic/testsuite/archive.py +++ b/attic/testsuite/archive.py @@ -1,6 +1,6 @@ import msgpack from attic.testsuite import AtticTestCase -from attic.archive import CacheChunkBuffer +from attic.archive import CacheChunkBuffer, RobustUnpacker from attic.key import PlaintextKey @@ -30,3 +30,55 @@ class ChunkBufferTestCase(AtticTestCase): for id in chunks.chunks: unpacker.feed(cache.objects[id]) self.assert_equal(data, list(unpacker)) + + +class RobustUnpackerTestCase(AtticTestCase): + + def make_chunks(self, items): + return b''.join(msgpack.packb(item) for item in items) + + def _validator(self, value): + return value in (b'foo', b'bar', b'boo', b'baz') + + def process(self, input): + unpacker = RobustUnpacker(validator=self._validator) + result = [] + for should_sync, chunks in input: + if should_sync: + unpacker.resync() + for data in chunks: + unpacker.feed(data) + for item in unpacker: + result.append(item) + return result + + def test_extra_garbage_no_sync(self): + chunks = [(False, [self.make_chunks([b'foo', b'bar'])]), + (False, [b'garbage'] + [self.make_chunks([b'boo', b'baz'])])] + result = self.process(chunks) + self.assert_equal(result, [b'foo', b'bar', 103, 97, 114, 98, 97, 103, 101, b'boo', b'baz']) + + def split(self, left, length): + parts = [] + while left: + parts.append(left[:length]) + left = left[length:] + return parts + + def test_correct_stream(self): + chunks = self.split(self.make_chunks([b'foo', b'bar', b'boo', b'baz']), 2) + input = [(False, chunks)] + result = self.process(input) + self.assert_equal(result, [b'foo', b'bar', b'boo', b'baz']) + + def test_missing_chunk(self): + chunks = self.split(self.make_chunks([b'foo', b'bar', b'boo', b'baz']), 2) + input = [(False, chunks[:3]), (True, chunks[4:])] + result = self.process(input) + self.assert_equal(result, [b'foo', b'boo', b'baz']) + + def test_corrupt_chunk(self): + chunks = self.split(self.make_chunks([b'foo', b'bar', b'boo', b'baz']), 2) + input = [(False, chunks[:3]), (True, [b'gar', b'bage'] + chunks[3:])] + result = self.process(input) + self.assert_equal(result, [b'foo', b'boo', b'baz']) diff --git a/attic/testsuite/archiver.py b/attic/testsuite/archiver.py index 90eea3118..875df8d02 100644 --- a/attic/testsuite/archiver.py +++ b/attic/testsuite/archiver.py @@ -9,12 +9,13 @@ import time import unittest from hashlib import sha256 from attic import xattr -from attic.archive import Archive +from attic.archive import Archive, ChunkBuffer from attic.archiver import Archiver +from attic.crypto import bytes_to_long, num_aes_blocks from attic.helpers import Manifest from attic.repository import Repository from attic.testsuite import AtticTestCase -from attic.crypto import bytes_to_long, num_aes_blocks +from attic.testsuite.mock import patch try: import llfuse @@ -328,9 +329,10 @@ class ArchiverCheckTestCase(ArchiverTestCaseBase): def setUp(self): super(ArchiverCheckTestCase, self).setUp() - self.attic('init', self.repository_location) - self.create_src_archive('archive1') - self.create_src_archive('archive2') + with patch.object(ChunkBuffer, 'BUFFER_SIZE', 10): + self.attic('init', self.repository_location) + self.create_src_archive('archive1') + self.create_src_archive('archive2') def open_archive(self, name): repository = Repository(self.repository_path) @@ -351,7 +353,7 @@ class ArchiverCheckTestCase(ArchiverTestCaseBase): def test_missing_archive_item_chunk(self): archive, repository = self.open_archive('archive1') - repository.delete(archive.metadata[b'items'][-1]) + repository.delete(archive.metadata[b'items'][-5]) repository.commit() self.attic('check', self.repository_location, exit_code=1) self.attic('check', '--repair', self.repository_location, exit_code=0)