diff --git a/borg/archive.py b/borg/archive.py index 9964f5f27..653e2dd1c 100644 --- a/borg/archive.py +++ b/borg/archive.py @@ -754,6 +754,9 @@ def valid_msgpacked_dict(d, keys_serialized): class RobustUnpacker: """A restartable/robust version of the streaming msgpack unpacker """ + class UnpackerCrashed(Exception): + """raise if unpacker crashed""" + def __init__(self, validator, item_keys): super().__init__() self.item_keys = [msgpack.packb(name) for name in item_keys] @@ -776,6 +779,14 @@ class RobustUnpacker: return self def __next__(self): + def unpack_next(): + try: + return next(self._unpacker) + except (TypeError, ValueError) as err: + # transform exceptions that might be raised when feeding + # msgpack with invalid data to a more specific exception + raise self.UnpackerCrashed(str(err)) + if self._resync: data = b''.join(self._buffered_data) while self._resync: @@ -788,17 +799,17 @@ class RobustUnpacker: self._unpacker = msgpack.Unpacker(object_hook=StableDict) self._unpacker.feed(data) try: - item = next(self._unpacker) + item = unpack_next() + except (self.UnpackerCrashed, StopIteration): + # as long as we are resyncing, we also ignore StopIteration + pass + else: if self.validator(item): self._resync = False return item - # Ignore exceptions that might be raised when feeding - # msgpack with invalid data - except (TypeError, ValueError, StopIteration): - pass data = data[1:] else: - return next(self._unpacker) + return unpack_next() class ArchiveChecker: @@ -1017,6 +1028,9 @@ class ArchiveChecker: yield item else: report('Did not get expected metadata dict when unpacking item metadata', chunk_id, i) + except RobustUnpacker.UnpackerCrashed as err: + report('Unpacker crashed while unpacking item metadata, trying to resync...', chunk_id, i) + unpacker.resync() except Exception: report('Exception while unpacking item metadata', chunk_id, i) raise