1
0
Fork 0
mirror of https://github.com/borgbackup/borg.git synced 2024-12-25 17:27:31 +00:00
borg/attic/testsuite/archive.py
2014-03-04 21:15:52 +01:00

88 lines
3 KiB
Python

import msgpack
from attic.testsuite import AtticTestCase
from attic.archive import CacheChunkBuffer, RobustUnpacker
from attic.key import PlaintextKey
class MockCache:
    """In-memory stand-in for a chunk cache that records every chunk it is given."""

    def __init__(self):
        # Maps chunk id -> chunk data for everything passed to add_chunk().
        self.objects = {}

    def add_chunk(self, id, data, stats=None):
        """Store *data* under *id* and echo the id back with its size.

        ``stats`` is accepted but not used by this mock.

        Returns the tuple ``(id, len(data), len(data))``.
        """
        self.objects[id] = data
        size = len(data)
        return id, size, size
class ChunkBufferTestCase(AtticTestCase):
    """Round-trip test for CacheChunkBuffer backed by a MockCache."""

    def test(self):
        """Items added to the buffer are read back unchanged from the stored chunks."""
        data = [{b'foo': 1}, {b'bar': 2}]
        cache = MockCache()
        key = PlaintextKey()
        chunks = CacheChunkBuffer(cache, key, None)
        for d in data:
            chunks.add(d)
            # NOTE(review): flushing after each item is presumably what yields
            # one chunk per item (two chunks asserted below) - confirm against
            # CacheChunkBuffer.flush().
            chunks.flush()
        chunks.flush(flush=True)
        self.assert_equal(len(chunks.chunks), 2)
        # Feed the stored chunks back through msgpack in order and compare
        # the decoded objects with what was added.
        unpacker = msgpack.Unpacker()
        for chunk_id in chunks.chunks:
            unpacker.feed(cache.objects[chunk_id])
        self.assert_equal(data, list(unpacker))
class RobustUnpackerTestCase(AtticTestCase):
    """Exercise RobustUnpacker on clean, garbage-prefixed, truncated and corrupted streams."""

    def make_chunks(self, items):
        """Return the concatenated msgpack encodings of ``{'path': item}`` for each item."""
        return b''.join(msgpack.packb({'path': item}) for item in items)

    def _validator(self, value):
        """Accept only dicts whose ``b'path'`` entry is one of the paths used by these tests."""
        return isinstance(value, dict) and value.get(b'path') in (b'foo', b'bar', b'boo', b'baz')

    def process(self, input):
        """Run *input* through a RobustUnpacker and return everything it yields.

        *input* is an iterable of ``(should_sync, chunks)`` pairs.  When
        ``should_sync`` is true, ``resync()`` is called on the unpacker before
        that pair's chunks are fed to it.
        """
        unpacker = RobustUnpacker(validator=self._validator)
        result = []
        for should_sync, chunks in input:
            if should_sync:
                unpacker.resync()
            for data in chunks:
                unpacker.feed(data)
                # Drain whatever the unpacker can produce after each feed.
                result.extend(unpacker)
        return result

    def test_extra_garbage_no_sync(self):
        """Without a resync, garbage bytes come through as plain integers."""
        chunks = [(False, [self.make_chunks([b'foo', b'bar'])]),
                  (False, [b'garbage'] + [self.make_chunks([b'boo', b'baz'])])]
        result = self.process(chunks)
        self.assert_equal(result, [
            {b'path': b'foo'}, {b'path': b'bar'},
            # The byte values of b'garbage', one integer per byte.
            103, 97, 114, 98, 97, 103, 101,
            {b'path': b'boo'},
            {b'path': b'baz'}])

    def split(self, left, length):
        """Cut *left* into consecutive slices of at most *length* items.

        Returns a list of slices; an empty *left* gives an empty list.

        Raises ValueError if *length* is not positive (a non-positive length
        can never consume the input).
        """
        if length <= 0:
            raise ValueError('length must be positive')
        return [left[start:start + length] for start in range(0, len(left), length)]

    def test_correct_stream(self):
        """An intact stream fed in 2-byte pieces yields every item."""
        chunks = self.split(self.make_chunks([b'foo', b'bar', b'boo', b'baz']), 2)
        stream = [(False, chunks)]
        result = self.process(stream)
        self.assert_equal(result, [{b'path': b'foo'}, {b'path': b'bar'}, {b'path': b'boo'}, {b'path': b'baz'}])

    def test_missing_chunk(self):
        """Dropping one 4-byte piece and resyncing loses only the 'bar' item."""
        chunks = self.split(self.make_chunks([b'foo', b'bar', b'boo', b'baz']), 4)
        # chunks[3] is deliberately left out; the second part asks for a resync.
        stream = [(False, chunks[:3]), (True, chunks[4:])]
        result = self.process(stream)
        self.assert_equal(result, [{b'path': b'foo'}, {b'path': b'boo'}, {b'path': b'baz'}])

    def test_corrupt_chunk(self):
        """Garbage injected mid-stream, followed by a resync, loses only the 'bar' item."""
        chunks = self.split(self.make_chunks([b'foo', b'bar', b'boo', b'baz']), 4)
        stream = [(False, chunks[:3]), (True, [b'gar', b'bage'] + chunks[3:])]
        result = self.process(stream)
        self.assert_equal(result, [{b'path': b'foo'}, {b'path': b'boo'}, {b'path': b'baz'}])