From 32e773c15d04b9727860977d6f7888525186aacd Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Jonas=20Borgstr=C3=B6m?= Date: Sun, 16 Feb 2014 22:21:18 +0100 Subject: [PATCH] Implemented archive metadata checking and repair --- attic/archive.py | 230 ++++++++++++++++++++++++++++++++++-- attic/archiver.py | 21 ++-- attic/cache.py | 6 +- attic/helpers.py | 11 +- attic/repository.py | 3 +- attic/testsuite/archive.py | 4 +- attic/testsuite/archiver.py | 71 ++++++++++- 7 files changed, 310 insertions(+), 36 deletions(-) diff --git a/attic/archive.py b/attic/archive.py index 4ed9a1ab3..508538bb9 100644 --- a/attic/archive.py +++ b/attic/archive.py @@ -1,6 +1,10 @@ +from binascii import hexlify from datetime import datetime, timedelta, timezone from getpass import getuser -from itertools import zip_longest +from itertools import groupby +import shutil +import tempfile +from attic.key import key_factory import msgpack import os import socket @@ -10,8 +14,9 @@ import time from io import BytesIO from attic import xattr from attic.chunker import chunkify +from attic.hashindex import ChunkIndex from attic.helpers import Error, uid2user, user2uid, gid2group, group2gid, \ - Statistics, decode_dict, st_mtime_ns, make_path_safe + Manifest, Statistics, decode_dict, st_mtime_ns, make_path_safe ITEMS_BUFFER = 1024 * 1024 CHUNK_MIN = 1024 @@ -44,23 +49,26 @@ class DownloadPipeline: yield item def fetch_many(self, ids, is_preloaded=False): - for id_, data in zip_longest(ids, self.repository.get_many(ids, is_preloaded=is_preloaded)): + for id_, data in zip(ids, self.repository.get_many(ids, is_preloaded=is_preloaded)): yield self.key.decrypt(id_, data) class ChunkBuffer: BUFFER_SIZE = 1 * 1024 * 1024 - def __init__(self, cache, key, stats): + def __init__(self, key): self.buffer = BytesIO() self.packer = msgpack.Packer(unicode_errors='surrogateescape') - self.cache = cache self.chunks = [] self.key = key - self.stats = stats def add(self, item): self.buffer.write(self.packer.pack(item)) + if self.is_full(): + self.flush() + + def write_chunk(self, chunk): + raise NotImplementedError def flush(self, flush=False): if self.buffer.tell() == 0: @@ -72,8 +80,7 @@ class ChunkBuffer: # Leave the last parital chunk in the buffer unless flush is True end = None if flush or len(chunks) == 1 else -1 for chunk in chunks[:end]: - id_, _, _ = self.cache.add_chunk(self.key.id_hash(chunk), chunk, self.stats) - self.chunks.append(id_) + self.chunks.append(self.write_chunk(chunk)) if end == -1: self.buffer.write(chunks[-1]) @@ -81,6 +88,18 @@ class ChunkBuffer: return self.buffer.tell() > self.BUFFER_SIZE +class CacheChunkBuffer(ChunkBuffer): + + def __init__(self, cache, key, stats): + super(CacheChunkBuffer, self).__init__(key) + self.cache = cache + self.stats = stats + + def write_chunk(self, chunk): + id_, _, _ = self.cache.add_chunk(self.key.id_hash(chunk), chunk, self.stats) + return id_ + + class Archive: class DoesNotExist(Error): @@ -101,7 +120,7 @@ class Archive: self.name = name self.checkpoint_interval = checkpoint_interval self.numeric_owner = numeric_owner - self.items_buffer = ChunkBuffer(self.cache, self.key, self.stats) + self.items_buffer = CacheChunkBuffer(self.cache, self.key, self.stats) self.pipeline = DownloadPipeline(self.repository, self.key) if create: if name in manifest.archives: @@ -148,8 +167,6 @@ class Archive: if now - self.last_checkpoint > self.checkpoint_interval: self.last_checkpoint = now self.write_checkpoint() - if self.items_buffer.is_full(): - self.items_buffer.flush() def write_checkpoint(self): self.save(self.checkpoint_name) @@ -192,7 +209,7 @@ class Archive: cache.begin_txn() stats = Statistics() add(self.id) - for id, chunk in zip_longest(self.metadata[b'items'], self.repository.get_many(self.metadata[b'items'])): + for id, chunk in zip(self.metadata[b'items'], self.repository.get_many(self.metadata[b'items'])): add(id) unpacker.feed(self.key.decrypt(id, chunk)) for item in unpacker: @@ -306,7 +323,7 @@ class Archive: def delete(self, cache): unpacker = msgpack.Unpacker(use_list=False) - for id_, data in zip_longest(self.metadata[b'items'], self.repository.get_many(self.metadata[b'items'])): + for id_, data in zip(self.metadata[b'items'], self.repository.get_many(self.metadata[b'items'])): unpacker.feed(self.key.decrypt(id_, data)) self.cache.chunk_decref(id_) for item in unpacker: @@ -388,3 +405,190 @@ class Archive: def list_archives(repository, key, manifest, cache=None): for name, info in manifest.archives.items(): yield Archive(repository, key, manifest, name, cache=cache) + + +class ArchiveChecker: + + def __init__(self): + self.error_found = False + self.progress = True + self.possibly_superseded = set() + self.tmpdir = tempfile.mkdtemp() + + def __del__(self): + shutil.rmtree(self.tmpdir) + + def init_chunks(self): + self.chunks = ChunkIndex.create(os.path.join(self.tmpdir, 'chunks').encode('utf-8')) + marker = None + while True: + result = self.repository.list(limit=10000, marker=marker) + if not result: + break + marker = result[-1] + for id_ in result: + self.chunks[id_] = (0, 0, 0) + + def report_progress(self, msg, error=False): + if error: + self.error_found = True + if error or self.progress: + print(msg, file=sys.stderr) + sys.stderr.flush() + + def identify_key(self, repository): + cdata = repository.get(next(self.chunks.iteritems())[0]) + return key_factory(repository, cdata) + + def rebuild_manifest(self): + self.report_progress('Rebuilding missing manifest, this might take some time...', error=True) + manifest = Manifest(self.key, self.repository) + for chunk_id, _ in self.chunks.iteritems(): + cdata = self.repository.get(chunk_id) + data = self.key.decrypt(chunk_id, cdata) + try: + archive = msgpack.unpackb(data) + except: + continue + if isinstance(archive, dict) and b'items' in archive and b'cmdline' in archive: + self.report_progress('Found archive ' + archive[b'name'].decode('utf-8'), error=True) + manifest.archives[archive[b'name'].decode('utf-8')] = {b'id': chunk_id, b'time': archive[b'time']} + self.report_progress('Manifest rebuild complete', error=True) + return manifest + + def check(self, repository, progress=True, repair=False): + self.report_progress('Starting archive consistency check...') + self.repair = repair + self.progress = progress + self.repository = repository + self.init_chunks() + self.key = self.identify_key(repository) + if not Manifest.MANIFEST_ID in self.chunks: + self.manifest = self.rebuild_manifest() + else: + self.manifest, _ = Manifest.load(repository) + self.rebuild_chunks() + self.verify_chunks() + if not self.error_found: + self.report_progress('Archive consistency check complete, no errors found.') + return self.repair or not self.error_found + + def verify_chunks(self): + unused = set() + for id_, (count, size, csize) in self.chunks.iteritems(): + if count == 0: + unused.add(id_) + unexpected = unused - self.possibly_superseded + if unexpected: + self.report_progress('{} excessive objects found'.format(len(unexpected)), error=True) + if self.repair: + for id_ in unused: + self.repository.delete(id_) + self.manifest.write() + self.repository.commit() + + def rebuild_chunks(self): + # Exclude the manifest from chunks + del self.chunks[Manifest.MANIFEST_ID] + def record_unused(id_): + if self.chunks.get(id_, (0,))[0] == 0: + self.possibly_superseded.add(id_) + + def add_callback(chunk): + id_ = self.key.id_hash(chunk) + cdata = self.key.encrypt(chunk) + add_reference(id_, len(chunk), len(cdata), cdata) + return id_ + + def add_reference(id_, size, csize, cdata=None): + try: + count, _, _ = self.chunks[id_] + self.chunks[id_] = count + 1, size, csize + except KeyError: + assert cdata is not None + self.chunks[id_] = 1, size, csize + if self.repair: + self.repository.put(id_, cdata) + + def verify_file_chunks(item): + offset = 0 + chunk_list = [] + for chunk_id, size, csize in item[b'chunks']: + if not chunk_id in self.chunks: + # If a file chunk is missing, create an all empty replacement chunk + self.report_progress('{}: Missing file chunk detected (Byte {}-{})'.format(item[b'path'].decode('utf-8', 'surrogateescape'), offset, offset + size), error=True) + data = bytes(size) + chunk_id = self.key.id_hash(data) + cdata = self.key.encrypt(data) + csize = len(cdata) + add_reference(chunk_id, size, csize, cdata) + else: + add_reference(chunk_id, size, csize) + chunk_list.append((chunk_id, size, csize)) + offset += size + item[b'chunks'] = chunk_list + + def msgpack_resync(data): + data = memoryview(data) + while data: + unpacker = msgpack.Unpacker() + unpacker.feed(data) + item = next(unpacker) + if isinstance(item, dict) and b'path' in item: + return data + data = data[1:] + + def robust_iterator(archive): + prev_state = None + state = 0 + def missing_chunk_detector(chunk_id): + nonlocal state + if state % 2 != int(not chunk_id in self.chunks): + state += 1 + return state + + for state, items in groupby(archive[b'items'], missing_chunk_detector): + if state != prev_state: + unpacker = msgpack.Unpacker() + prev_state = state + if state % 2: + self.report_progress('Archive metadata damage detected', error=True) + return + items = list(items) + for i, (chunk_id, cdata) in enumerate(zip(items, self.repository.get_many(items))): + data = self.key.decrypt(chunk_id, cdata) + if state and i == 0: + data = msgpack_resync(data) + unpacker.feed(data) + for item in unpacker: + yield item + + for name, info in list(self.manifest.archives.items()): + self.report_progress('Analyzing archive: ' + name) + archive_id = info[b'id'] + if not archive_id in self.chunks: + self.report_progress('Archive metadata block is missing', error=True) + del self.manifest.archives[name] + continue + items_buffer = ChunkBuffer(self.key) + items_buffer.write_chunk = add_callback + cdata = self.repository.get(archive_id) + data = self.key.decrypt(archive_id, cdata) + archive = msgpack.unpackb(data) + if archive[b'version'] != 1: + raise Exception('Unknown archive metadata version') + decode_dict(archive, (b'name', b'hostname', b'username', b'time')) # fixme: argv + for item in robust_iterator(archive): + if b'chunks' in item: + verify_file_chunks(item) + items_buffer.add(item) + items_buffer.flush(flush=True) + for previous_item_id in archive[b'items']: + record_unused(previous_item_id) + archive[b'items'] = items_buffer.chunks + data = msgpack.packb(archive, unicode_errors='surrogateescape') + new_archive_id = self.key.id_hash(data) + cdata = self.key.encrypt(data) + add_reference(new_archive_id, len(data), len(cdata), cdata) + record_unused(archive_id) + info[b'id'] = new_archive_id diff --git a/attic/archiver.py b/attic/archiver.py index a4a828f82..d7060e649 100644 --- a/attic/archiver.py +++ b/attic/archiver.py @@ -7,7 +7,7 @@ import stat import sys from attic import __version__ -from attic.archive import Archive +from attic.archive import Archive, ArchiveChecker from attic.repository import Repository from attic.cache import Cache from attic.key import key_creator @@ -53,8 +53,7 @@ class Archiver: print('Initializing repository at "%s"' % args.repository.orig) repository = self.open_repository(args.repository, create=True) key = key_creator(repository, args) - manifest = Manifest() - manifest.repository = repository + manifest = Manifest(key, repository) manifest.key = key manifest.write() repository.commit() @@ -65,10 +64,9 @@ class Archiver: """ repository = self.open_repository(args.repository) if args.repair: - while True: - self.print_error("""Warning: check --repair is an experimental feature that might result -in data loss. Checking and repairing archive metadata consistency is not yet -supported so some types of corruptions will be undetected and not repaired. + while not os.environ.get('ATTIC_CHECK_I_KWOW_WHAT_I_AM_DOING'): + self.print_error("""Warning: 'check --repair' is an experimental feature that might result +in data loss. Type "Yes I am sure" if you understand this and want to continue.\n""") if input('Do you want to continue? ') == 'Yes I am sure': @@ -76,8 +74,11 @@ Type "Yes I am sure" if you understand this and want to continue.\n""") if args.progress is None: args.progress = sys.stdout.isatty() or args.verbose if not repository.check(progress=args.progress, repair=args.repair): - self.exit_code = 1 - return self.exit_code + return 1 + + if not ArchiveChecker().check(repository, progress=args.progress, repair=args.repair): + return 1 + return 0 def do_change_passphrase(self, args): """Change repository key file passphrase @@ -85,7 +86,7 @@ Type "Yes I am sure" if you understand this and want to continue.\n""") repository = self.open_repository(args.repository) manifest, key = Manifest.load(repository) key.change_passphrase() - return self.exit_code + return 0 def do_create(self, args): """Create new archive diff --git a/attic/cache.py b/attic/cache.py index 5f3e08c19..62c4b725c 100644 --- a/attic/cache.py +++ b/attic/cache.py @@ -16,17 +16,17 @@ class Cache(object): class RepositoryReplay(Error): """Cache is newer than repository, refusing to continue""" - def __init__(self, repository, key, manifest): + def __init__(self, repository, key, manifest, path=None, sync=True): self.timestamp = None self.txn_active = False self.repository = repository self.key = key self.manifest = manifest - self.path = os.path.join(get_cache_dir(), hexlify(repository.id).decode('ascii')) + self.path = path or os.path.join(get_cache_dir(), hexlify(repository.id).decode('ascii')) if not os.path.exists(self.path): self.create() self.open() - if self.manifest.id != self.manifest_id: + if sync and self.manifest.id != self.manifest_id: # If repository is older than the cache something fishy is going on if self.timestamp and self.timestamp > manifest.timestamp: raise self.RepositoryReplay() diff --git a/attic/helpers.py b/attic/helpers.py index 7287704da..2f805ebed 100644 --- a/attic/helpers.py +++ b/attic/helpers.py @@ -56,17 +56,18 @@ class Manifest: MANIFEST_ID = b'\0' * 32 - def __init__(self): + def __init__(self, key, repository): self.archives = {} self.config = {} + self.key = key + self.repository = repository @classmethod def load(cls, repository): from .key import key_factory - manifest = cls() - manifest.repository = repository - cdata = repository.get(manifest.MANIFEST_ID) - manifest.key = key = key_factory(repository, cdata) + cdata = repository.get(cls.MANIFEST_ID) + key = key_factory(repository, cdata) + manifest = cls(key, repository) data = key.decrypt(None, cdata) manifest.id = key.id_hash(data) m = msgpack.unpackb(data) diff --git a/attic/repository.py b/attic/repository.py index e171a20f5..5db0afa35 100644 --- a/attic/repository.py +++ b/attic/repository.py @@ -202,6 +202,7 @@ class Repository(object): sys.stderr.flush() assert not self._active_txn + report_progress('Starting repository check...') index_transaction_id = self.get_index_transaction_id() segments_transaction_id = self.io.get_segments_transaction_id(index_transaction_id) if index_transaction_id is None and segments_transaction_id is None: @@ -271,7 +272,7 @@ class Repository(object): if current_index and len(current_index) != len(self.index): report_progress('Index object count mismatch. {} != {}'.format(len(current_index), len(self.index)), error=True) if not error_found: - report_progress('Check complete, no errors found.') + report_progress('Repository check complete, no errors found.') if repair: self.write_index() else: diff --git a/attic/testsuite/archive.py b/attic/testsuite/archive.py index 25e8bb64e..8a61eb41c 100644 --- a/attic/testsuite/archive.py +++ b/attic/testsuite/archive.py @@ -1,6 +1,6 @@ import msgpack from attic.testsuite import AtticTestCase -from attic.archive import ChunkBuffer +from attic.archive import CacheChunkBuffer from attic.key import PlaintextKey @@ -20,7 +20,7 @@ class ChunkBufferTestCase(AtticTestCase): data = [{b'foo': 1}, {b'bar': 2}] cache = MockCache() key = PlaintextKey() - chunks = ChunkBuffer(cache, key, None) + chunks = CacheChunkBuffer(cache, key, None) for d in data: chunks.add(d) chunks.flush() diff --git a/attic/testsuite/archiver.py b/attic/testsuite/archiver.py index 8f8945549..72ef59864 100644 --- a/attic/testsuite/archiver.py +++ b/attic/testsuite/archiver.py @@ -9,7 +9,9 @@ import time import unittest from hashlib import sha256 from attic import xattr +from attic.archive import Archive from attic.archiver import Archiver +from attic.helpers import Manifest from attic.repository import Repository from attic.testsuite import AtticTestCase from attic.crypto import bytes_to_long, num_aes_blocks @@ -20,7 +22,7 @@ try: except ImportError: has_llfuse = False -src_dir = os.path.join(os.getcwd(), os.path.dirname(__file__), '..', '..') +src_dir = os.path.join(os.getcwd(), os.path.dirname(__file__), '..') class changedir: @@ -35,11 +37,12 @@ class changedir: os.chdir(self.old) -class ArchiverTestCase(AtticTestCase): +class ArchiverTestCaseBase(AtticTestCase): prefix = '' def setUp(self): + os.environ['ATTIC_CHECK_I_KWOW_WHAT_I_AM_DOING'] = '1' self.archiver = Archiver() self.tmpdir = tempfile.mkdtemp() self.repository_path = os.path.join(self.tmpdir, 'repository') @@ -96,6 +99,9 @@ class ArchiverTestCase(AtticTestCase): def create_src_archive(self, name): self.attic('create', self.repository_location + '::' + name, src_dir) + +class ArchiverTestCase(ArchiverTestCaseBase): + def create_regual_file(self, name, size=0): filename = os.path.join(self.input_path, name) if not os.path.exists(os.path.dirname(filename)): @@ -299,6 +305,67 @@ class ArchiverTestCase(AtticTestCase): self.verify_aes_counter_uniqueness('passphrase') +class ArchiverCheckTestCase(ArchiverTestCaseBase): + + def setUp(self): + super(ArchiverCheckTestCase, self).setUp() + self.attic('init', self.repository_location) + self.create_src_archive('archive1') + self.create_src_archive('archive2') + + def open_archive(self, name): + repository = Repository(self.repository_path) + manifest, key = Manifest.load(repository) + archive = Archive(repository, key, manifest, name) + return archive, repository + + def test_missing_file_chunk(self): + archive, repository = self.open_archive('archive1') + for item in archive.iter_items(): + if item[b'path'].endswith('testsuite/archiver.py'): + repository.delete(item[b'chunks'][-1][0]) + break + repository.commit() + self.attic('check', self.repository_location, exit_code=1) + self.attic('check', '--repair', self.repository_location, exit_code=0) + self.attic('check', self.repository_location, exit_code=0) + + def test_missing_archive_item_chunk(self): + archive, repository = self.open_archive('archive1') + repository.delete(archive.metadata[b'items'][-1]) + repository.commit() + self.attic('check', self.repository_location, exit_code=1) + self.attic('check', '--repair', self.repository_location, exit_code=0) + self.attic('check', self.repository_location, exit_code=0) + + def test_missing_archive_metadata(self): + archive, repository = self.open_archive('archive1') + repository.delete(archive.id) + repository.commit() + self.attic('check', self.repository_location, exit_code=1) + self.attic('check', '--repair', self.repository_location, exit_code=0) + self.attic('check', self.repository_location, exit_code=0) + + def test_missing_manifest(self): + archive, repository = self.open_archive('archive1') + repository.delete(Manifest.MANIFEST_ID) + repository.commit() + self.attic('check', self.repository_location, exit_code=1) + self.attic('check', '--repair', '--progress', self.repository_location, exit_code=0) + self.attic('check', '--progress', self.repository_location, exit_code=0) + + def test_extra_chunks(self): + self.attic('check', self.repository_location, exit_code=0) + repository = Repository(self.repository_location) + repository.put(b'01234567890123456789012345678901', b'xxxx') + repository.commit() + repository.close() + self.attic('check', self.repository_location, exit_code=1) + self.attic('check', self.repository_location, exit_code=1) + self.attic('check', '--repair', self.repository_location, exit_code=0) + self.attic('check', self.repository_location, exit_code=0) + self.attic('verify', self.repository_location + '::archive1', exit_code=0) + class RemoteArchiverTestCase(ArchiverTestCase): prefix = '__testsuite__:'