# Note: these tests are part of the self test, do not use or import pytest functionality here. # See borg.selftest for details. If you add/remove test methods, update SELFTEST_COUNT from io import BytesIO from ..chunker import ChunkerFixed, Chunker, get_chunker, buzhash, buzhash_update from ..constants import * # NOQA from . import BaseTestCase def cf(chunks): """chunk filter""" # this is to simplify testing: either return the data piece (bytes) or the hole length (int). def _cf(chunk): if chunk.meta["allocation"] == CH_DATA: assert len(chunk.data) == chunk.meta["size"] return bytes(chunk.data) # make sure we have bytes, not memoryview if chunk.meta["allocation"] in (CH_HOLE, CH_ALLOC): assert chunk.data is None return chunk.meta["size"] assert False, "unexpected allocation value" return [_cf(chunk) for chunk in chunks] class ChunkerFixedTestCase(BaseTestCase): def test_chunkify_just_blocks(self): data = b"foobar" * 1500 chunker = ChunkerFixed(4096) parts = cf(chunker.chunkify(BytesIO(data))) self.assert_equal(parts, [data[0:4096], data[4096:8192], data[8192:]]) def test_chunkify_header_and_blocks(self): data = b"foobar" * 1500 chunker = ChunkerFixed(4096, 123) parts = cf(chunker.chunkify(BytesIO(data))) self.assert_equal( parts, [data[0:123], data[123 : 123 + 4096], data[123 + 4096 : 123 + 8192], data[123 + 8192 :]] ) def test_chunkify_just_blocks_fmap_complete(self): data = b"foobar" * 1500 chunker = ChunkerFixed(4096) fmap = [(0, 4096, True), (4096, 8192, True), (8192, 99999999, True)] parts = cf(chunker.chunkify(BytesIO(data), fmap=fmap)) self.assert_equal(parts, [data[0:4096], data[4096:8192], data[8192:]]) def test_chunkify_header_and_blocks_fmap_complete(self): data = b"foobar" * 1500 chunker = ChunkerFixed(4096, 123) fmap = [(0, 123, True), (123, 4096, True), (123 + 4096, 4096, True), (123 + 8192, 4096, True)] parts = cf(chunker.chunkify(BytesIO(data), fmap=fmap)) self.assert_equal( parts, [data[0:123], data[123 : 123 + 4096], data[123 + 4096 : 123 + 8192], data[123 + 8192 :]] ) def test_chunkify_header_and_blocks_fmap_zeros(self): data = b"H" * 123 + b"_" * 4096 + b"X" * 4096 + b"_" * 4096 chunker = ChunkerFixed(4096, 123) fmap = [(0, 123, True), (123, 4096, False), (123 + 4096, 4096, True), (123 + 8192, 4096, False)] parts = cf(chunker.chunkify(BytesIO(data), fmap=fmap)) # because we marked the '_' ranges as holes, we will get hole ranges instead! self.assert_equal(parts, [data[0:123], 4096, data[123 + 4096 : 123 + 8192], 4096]) def test_chunkify_header_and_blocks_fmap_partial(self): data = b"H" * 123 + b"_" * 4096 + b"X" * 4096 + b"_" * 4096 chunker = ChunkerFixed(4096, 123) fmap = [ (0, 123, True), # (123, 4096, False), (123 + 4096, 4096, True), # (123+8192, 4096, False), ] parts = cf(chunker.chunkify(BytesIO(data), fmap=fmap)) # because we left out the '_' ranges from the fmap, we will not get them at all! self.assert_equal(parts, [data[0:123], data[123 + 4096 : 123 + 8192]]) class ChunkerTestCase(BaseTestCase): def test_chunkify(self): data = b"0" * int(1.5 * (1 << CHUNK_MAX_EXP)) + b"Y" parts = cf(Chunker(0, 1, CHUNK_MAX_EXP, 2, 2).chunkify(BytesIO(data))) self.assert_equal(len(parts), 2) self.assert_equal(b"".join(parts), data) self.assert_equal(cf(Chunker(0, 1, CHUNK_MAX_EXP, 2, 2).chunkify(BytesIO(b""))), []) self.assert_equal( cf(Chunker(0, 1, CHUNK_MAX_EXP, 2, 2).chunkify(BytesIO(b"foobarboobaz" * 3))), [b"fooba", b"rboobaz", b"fooba", b"rboobaz", b"fooba", b"rboobaz"], ) self.assert_equal( cf(Chunker(1, 1, CHUNK_MAX_EXP, 2, 2).chunkify(BytesIO(b"foobarboobaz" * 3))), [b"fo", b"obarb", b"oob", b"azf", b"oobarb", b"oob", b"azf", b"oobarb", b"oobaz"], ) self.assert_equal( cf(Chunker(2, 1, CHUNK_MAX_EXP, 2, 2).chunkify(BytesIO(b"foobarboobaz" * 3))), [b"foob", b"ar", b"boobazfoob", b"ar", b"boobazfoob", b"ar", b"boobaz"], ) self.assert_equal( cf(Chunker(0, 2, CHUNK_MAX_EXP, 2, 3).chunkify(BytesIO(b"foobarboobaz" * 3))), [b"foobarboobaz" * 3] ) self.assert_equal( cf(Chunker(1, 2, CHUNK_MAX_EXP, 2, 3).chunkify(BytesIO(b"foobarboobaz" * 3))), [b"foobar", b"boobazfo", b"obar", b"boobazfo", b"obar", b"boobaz"], ) self.assert_equal( cf(Chunker(2, 2, CHUNK_MAX_EXP, 2, 3).chunkify(BytesIO(b"foobarboobaz" * 3))), [b"foob", b"arboobaz", b"foob", b"arboobaz", b"foob", b"arboobaz"], ) self.assert_equal( cf(Chunker(0, 3, CHUNK_MAX_EXP, 2, 3).chunkify(BytesIO(b"foobarboobaz" * 3))), [b"foobarboobaz" * 3] ) self.assert_equal( cf(Chunker(1, 3, CHUNK_MAX_EXP, 2, 3).chunkify(BytesIO(b"foobarboobaz" * 3))), [b"foobarbo", b"obazfoobar", b"boobazfo", b"obarboobaz"], ) self.assert_equal( cf(Chunker(2, 3, CHUNK_MAX_EXP, 2, 3).chunkify(BytesIO(b"foobarboobaz" * 3))), [b"foobarboobaz", b"foobarboobaz", b"foobarboobaz"], ) def test_buzhash(self): self.assert_equal(buzhash(b"abcdefghijklmnop", 0), 3795437769) self.assert_equal(buzhash(b"abcdefghijklmnop", 1), 3795400502) self.assert_equal( buzhash(b"abcdefghijklmnop", 1), buzhash_update(buzhash(b"Xabcdefghijklmno", 1), ord("X"), ord("p"), 16, 1) ) # Test with more than 31 bytes to make sure our barrel_shift macro works correctly self.assert_equal(buzhash(b"abcdefghijklmnopqrstuvwxyzabcdefghijklmnopqrstuvwxyz", 0), 566521248) def test_small_reads(self): class SmallReadFile: input = b"a" * (20 + 1) def read(self, nbytes): self.input = self.input[:-1] return self.input[:1] chunker = get_chunker(*CHUNKER_PARAMS, seed=0) reconstructed = b"".join(cf(chunker.chunkify(SmallReadFile()))) assert reconstructed == b"a" * 20