From 252c1b9802e26be76350dc106cbf36746693c313 Mon Sep 17 00:00:00 2001 From: Marian Beermann Date: Fri, 8 Apr 2016 15:38:13 +0200 Subject: [PATCH] Auto-recover from corrupted index/hints file(s) And don't swallow all OSErrors when creating archives. We need to work on that on a more general level... --- borg/hashindex.pyx | 2 +- borg/helpers.py | 4 +++ borg/repository.py | 47 ++++++++++++++++++++++++++++++------ borg/testsuite/repository.py | 42 +++++++++++++++++++++++++++++++- 4 files changed, 86 insertions(+), 9 deletions(-) diff --git a/borg/hashindex.pyx b/borg/hashindex.pyx index a99c0f602..459eed7b0 100644 --- a/borg/hashindex.pyx +++ b/borg/hashindex.pyx @@ -63,7 +63,7 @@ cdef class IndexBase: path = os.fsencode(path) self.index = hashindex_read(path) if not self.index: - raise Exception('hashindex_read failed') + raise RuntimeError('hashindex_read failed') else: self.index = hashindex_init(capacity, self.key_size, self.value_size) if not self.index: diff --git a/borg/helpers.py b/borg/helpers.py index 15c01bb7c..4fa4e4575 100644 --- a/borg/helpers.py +++ b/borg/helpers.py @@ -65,6 +65,10 @@ class ErrorWithTraceback(Error): traceback = True +class InternalOSError(ErrorWithTraceback): + """Error while accessing repository / cache files""" + + class IntegrityError(ErrorWithTraceback): """Data integrity error""" diff --git a/borg/repository.py b/borg/repository.py index 1620c8278..d59466358 100644 --- a/borg/repository.py +++ b/borg/repository.py @@ -15,7 +15,8 @@ import msgpack from .constants import * # NOQA -from .helpers import Error, ErrorWithTraceback, IntegrityError, Location, ProgressIndicatorPercent, bin_to_hex +from .helpers import Error, ErrorWithTraceback, IntegrityError, InternalOSError, Location, ProgressIndicatorPercent, \ + bin_to_hex from .hashindex import NSIndex from .locking import UpgradableLock, LockError, LockErrorT from .lrucache import LRUCache @@ -178,7 +179,7 @@ def get_index_transaction_id(self): else: return None - def get_transaction_id(self): + def check_transaction(self): index_transaction_id = self.get_index_transaction_id() segments_transaction_id = self.io.get_segments_transaction_id() if index_transaction_id is not None and segments_transaction_id is None: @@ -191,6 +192,9 @@ def get_transaction_id(self): else: replay_from = index_transaction_id self.replay_segments(replay_from, segments_transaction_id) + + def get_transaction_id(self): + self.check_transaction() return self.get_index_transaction_id() def break_lock(self): @@ -231,10 +235,23 @@ def commit(self, save_space=False): self.write_index() self.rollback() - def open_index(self, transaction_id): + def open_index(self, transaction_id, auto_recover=True): if transaction_id is None: return NSIndex() - return NSIndex.read((os.path.join(self.path, 'index.%d') % transaction_id).encode('utf-8')) + index_path = (os.path.join(self.path, 'index.%d') % transaction_id).encode('utf-8') + try: + return NSIndex.read(index_path) + except RuntimeError as re: + assert str(re) == 'hashindex_read failed' # everything else means we're in *deep* trouble + # corrupted index file, need to replay segments + os.unlink(os.path.join(self.path, 'hints.%d' % transaction_id)) + os.unlink(os.path.join(self.path, 'index.%d' % transaction_id)) + if not auto_recover: + raise + self.prepare_txn(self.get_transaction_id()) + # don't leave an open transaction around + self.commit() + return self.open_index(self.get_transaction_id()) def prepare_txn(self, transaction_id, do_cleanup=True): self._active_txn = True @@ -247,15 +264,31 @@ def prepare_txn(self, transaction_id, do_cleanup=True): self._active_txn = False raise if not self.index or transaction_id is None: - self.index = self.open_index(transaction_id) + try: + self.index = self.open_index(transaction_id, False) + except RuntimeError: + self.check_transaction() + self.index = self.open_index(transaction_id, False) if transaction_id is None: self.segments = {} # XXX bad name: usage_count_of_segment_x = self.segments[x] self.compact = FreeSpace() # XXX bad name: freeable_space_of_segment_x = self.compact[x] else: if do_cleanup: self.io.cleanup(transaction_id) - with open(os.path.join(self.path, 'hints.%d' % transaction_id), 'rb') as fd: - hints = msgpack.unpack(fd) + try: + with open(os.path.join(self.path, 'hints.%d' % transaction_id), 'rb') as fd: + hints = msgpack.unpack(fd) + except (msgpack.UnpackException, msgpack.ExtraData, FileNotFoundError) as e: + # corrupted or deleted hints file, need to replay segments + if not isinstance(e, FileNotFoundError): + os.unlink(os.path.join(self.path, 'hints.%d' % transaction_id)) + # index must exist at this point + os.unlink(os.path.join(self.path, 'index.%d' % transaction_id)) + self.check_transaction() + self.prepare_txn(transaction_id) + return + except OSError as os_error: + raise InternalOSError from os_error if hints[b'version'] == 1: logger.debug('Upgrading from v1 hints.%d', transaction_id) self.segments = hints[b'segments'] diff --git a/borg/testsuite/repository.py b/borg/testsuite/repository.py index 85f4af457..346711424 100644 --- a/borg/testsuite/repository.py +++ b/borg/testsuite/repository.py @@ -7,7 +7,7 @@ from unittest.mock import patch from ..hashindex import NSIndex -from ..helpers import Location, IntegrityError +from ..helpers import Location, IntegrityError, InternalOSError from ..locking import UpgradableLock, LockFailed from ..remote import RemoteRepository, InvalidRPCMethod, ConnectionClosedWithHint from ..repository import Repository, LoggedIO, MAGIC @@ -270,6 +270,46 @@ def segments_in_repository(): assert segments_in_repository() == 6 +class RepositoryAuxiliaryCorruptionTestCase(RepositoryTestCaseBase): + def setUp(self): + super().setUp() + self.repository.put(b'00000000000000000000000000000000', b'foo') + self.repository.commit() + self.repository.close() + + def do_commit(self): + with self.repository: + self.repository.put(b'00000000000000000000000000000000', b'fox') + self.repository.commit() + + def test_corrupted_hints(self): + with open(os.path.join(self.repository.path, 'hints.0'), 'ab') as fp: + fp.write(b'123456789') + self.do_commit() + + def test_deleted_hints(self): + os.unlink(os.path.join(self.repository.path, 'hints.0')) + self.do_commit() + + def test_unreadable_hints(self): + hints = os.path.join(self.repository.path, 'hints.0') + os.unlink(hints) + os.mkdir(hints) + with self.assert_raises(InternalOSError): + self.do_commit() + + def test_index(self): + with open(os.path.join(self.repository.path, 'index.0'), 'wb') as fp: + fp.write(b'123456789') + self.do_commit() + + def test_index_outside_transaction(self): + with open(os.path.join(self.repository.path, 'index.0'), 'wb') as fp: + fp.write(b'123456789') + with self.repository: + assert len(self.repository) == 1 + + class RepositoryCheckTestCase(RepositoryTestCaseBase): def list_indices(self):