1
0
Fork 0
mirror of https://github.com/borgbackup/borg.git synced 2025-01-01 12:45:34 +00:00

parameterized compression tests

This commit is contained in:
bigtedde 2023-05-29 16:31:56 -07:00
parent 87b74f3b0d
commit e0f6685498

View file

@ -2,138 +2,96 @@
import os
import zlib
try:
import lzma
except ImportError:
lzma = None
import pytest
from ..compress import get_compressor, Compressor, CompressionSpec, CNONE, ZLIB, LZ4, LZMA, ZSTD, Auto
buffer = bytes(2**16)
data = b"fooooooooobaaaaaaaar" * 10
DATA = b"fooooooooobaaaaaaaar" * 10
params = dict(name="zlib", level=6)
def test_get_compressor():
c = get_compressor(name="none")
assert isinstance(c, CNONE)
c = get_compressor(name="lz4")
assert isinstance(c, LZ4)
c = get_compressor(name="zlib")
assert isinstance(c, ZLIB)
with pytest.raises(KeyError):
get_compressor(name="foobar")
@pytest.mark.parametrize(
"c_type, expected_compressor",
[("none", CNONE), ("lz4", LZ4), ("zlib", ZLIB), ("lzma", LZMA), ("zstd", ZSTD), ("foobar", None)],
)
def test_get_compressor(c_type, expected_compressor):
if expected_compressor is not None:
compressor = get_compressor(name=c_type)
assert isinstance(compressor, expected_compressor)
else:
with pytest.raises(KeyError):
get_compressor(name=c_type)
def test_cnull():
c = get_compressor(name="none")
meta, cdata = c.compress({}, data)
assert len(cdata) >= len(data)
assert data in cdata # it's not compressed and just in there 1:1
assert data == c.decompress(meta, cdata)[1]
assert data == Compressor(**params).decompress(meta, cdata)[1] # autodetect
def test_lz4():
c = get_compressor(name="lz4")
meta, cdata = c.compress({}, data)
assert len(cdata) < len(data)
assert data == c.decompress(meta, cdata)[1]
assert data == Compressor(**params).decompress(meta, cdata)[1] # autodetect
@pytest.mark.parametrize("c_type", ["none", "lz4", "zlib", "zstd", "lzma"])
def test_compression_types(c_type):
c = get_compressor(name=c_type)
meta, cdata = c.compress({}, DATA)
if c_type == "none":
assert len(cdata) >= len(DATA) # it's not compressed and just in there 1:1
else:
assert len(cdata) < len(DATA)
assert DATA == c.decompress(meta, cdata)[1]
assert DATA == Compressor(**params).decompress(meta, cdata)[1] # autodetect
def test_lz4_buffer_allocation(monkeypatch):
# disable fallback to no compression on incompressible data
monkeypatch.setattr(LZ4, "decide", lambda always_compress: LZ4)
# test with a rather huge data object to see if buffer allocation / resizing works
data = os.urandom(5 * 2**20) * 10 # 50MiB badly compressible data
assert len(data) == 50 * 2**20
incompressible_data = os.urandom(5 * 2**20) * 10 # 50MiB badly compressible data
c = Compressor("lz4")
meta, cdata = c.compress({}, data)
assert len(cdata) >= len(data)
assert data == c.decompress(meta, cdata)[1]
meta, cdata = c.compress({}, incompressible_data)
assert len(incompressible_data) == 50 * 2**20
assert len(cdata) >= len(incompressible_data)
assert incompressible_data == c.decompress(meta, cdata)[1]
def test_zlib():
c = get_compressor(name="zlib")
meta, cdata = c.compress({}, data)
assert len(cdata) < len(data)
assert data == c.decompress(meta, cdata)[1]
assert data == Compressor(**params).decompress(meta, cdata)[1] # autodetect
def test_lzma():
if lzma is None:
pytest.skip("No lzma support found.")
c = get_compressor(name="lzma")
meta, cdata = c.compress({}, data)
assert len(cdata) < len(data)
assert data == c.decompress(meta, cdata)[1]
assert data == Compressor(**params).decompress(meta, cdata)[1] # autodetect
def test_zstd():
c = get_compressor(name="zstd")
meta, cdata = c.compress({}, data)
assert len(cdata) < len(data)
assert data == c.decompress(meta, cdata)[1]
assert data == Compressor(**params).decompress(meta, cdata)[1] # autodetect
def test_autodetect_invalid():
@pytest.mark.parametrize("invalid_cdata", [b"\xff\xfftotalcrap", b"\x08\x00notreallyzlib"])
def test_autodetect_invalid(invalid_cdata):
with pytest.raises(ValueError):
Compressor(**params, legacy_mode=True).decompress({}, b"\xff\xfftotalcrap")
with pytest.raises(ValueError):
Compressor(**params, legacy_mode=True).decompress({}, b"\x08\x00notreallyzlib")
Compressor(**params, legacy_mode=True).decompress({}, invalid_cdata)
def test_zlib_legacy_compat():
# for compatibility reasons, we do not add an extra header for zlib,
# nor do we expect one when decompressing / autodetecting
# nor do we expect one when decompressing / auto-detecting
for level in range(10):
c = get_compressor(name="zlib_legacy", level=level, legacy_mode=True)
meta1, cdata1 = c.compress({}, data)
cdata2 = zlib.compress(data, level)
meta1, cdata1 = c.compress({}, DATA)
cdata2 = zlib.compress(DATA, level)
assert cdata1 == cdata2
meta2, data2 = c.decompress({}, cdata2)
assert data == data2
# _, data2 = Compressor(**params).decompress({}, cdata2)
# assert data == data2
assert DATA == data2
def test_compressor():
params_list = [
@pytest.mark.parametrize(
"c_params",
[
dict(name="none"),
dict(name="lz4"),
dict(name="zstd", level=1),
dict(name="zstd", level=3),
# avoiding high zstd levels, memory needs unclear
dict(name="zstd", level=3), # avoiding high zstd levels, memory needs unclear
dict(name="zlib", level=0),
dict(name="zlib", level=6),
dict(name="zlib", level=9),
]
if lzma:
params_list += [
dict(name="lzma", level=0),
dict(name="lzma", level=6),
# we do not test lzma on level 9 because of the huge memory needs
]
for params in params_list:
c = Compressor(**params)
meta_c, data_compressed = c.compress({}, data)
assert "ctype" in meta_c
assert "clevel" in meta_c
assert meta_c["csize"] == len(data_compressed)
assert meta_c["size"] == len(data)
meta_d, data_decompressed = c.decompress(meta_c, data_compressed)
assert data == data_decompressed
assert "ctype" in meta_d
assert "clevel" in meta_d
assert meta_d["csize"] == len(data_compressed)
assert meta_d["size"] == len(data)
dict(name="lzma", level=0),
dict(name="lzma", level=6), # we do not test lzma on level 9 because of the huge memory needs
],
)
def test_compressor(c_params):
c = Compressor(**c_params)
meta_c, data_compressed = c.compress({}, DATA)
assert "ctype" in meta_c
assert "clevel" in meta_c
assert meta_c["csize"] == len(data_compressed)
assert meta_c["size"] == len(DATA)
meta_d, data_decompressed = c.decompress(meta_c, data_compressed)
assert DATA == data_decompressed
assert "ctype" in meta_d
assert "clevel" in meta_d
assert meta_d["csize"] == len(data_compressed)
assert meta_d["size"] == len(DATA)
def test_auto():
@ -157,60 +115,60 @@ def test_auto():
assert meta["csize"] == len(compressed)
def test_obfuscate():
compressor = CompressionSpec("obfuscate,1,none").compressor
data = bytes(10000)
_, compressed = compressor.compress({}, data)
assert len(data) <= len(compressed) <= len(data) * 101
# compressing 100 times the same data should give at least 50 different result sizes
assert len({len(compressor.compress({}, data)[1]) for i in range(100)}) > 50
cs = CompressionSpec("obfuscate,2,lz4")
assert isinstance(cs.inner.compressor, LZ4)
@pytest.mark.parametrize(
"specs, c_type, result_range, obfuscation_factor",
[
("obfuscate,1,none", CNONE, 50, 10**1),
("obfuscate,2,lz4", LZ4, 10, 10**2),
("obfuscate,6,zstd,3", ZSTD, 90, 10**6),
("obfuscate,2,auto,zstd,10", Auto, 10, 10**2),
],
)
def test_factor_obfuscation(specs, c_type, result_range, obfuscation_factor: int):
# Testing relative random reciprocal size variation, obfuscation spec 1 to 6 inclusive
# obfuscate_factor = 10**(obfuscation spec)
cs = CompressionSpec(specs)
assert isinstance(cs.inner.compressor, c_type)
compressor = cs.compressor
data = bytes(10000)
_, compressed = compressor.compress({}, data)
min_compress, max_compress = 0.2, 0.001 # estimate compression factor outer boundaries
assert max_compress * len(data) <= len(compressed) <= min_compress * len(data) * 1001
if c_type is CNONE: # no compression
assert len(data) <= len(compressed) <= len(data) * (10 * obfuscation_factor) + 1
else: # with compression
min_compress, max_compress = 0.2, 0.001 # estimate compression factor outer boundaries
assert max_compress * len(data) <= len(compressed) <= min_compress * len(data) * (10 * obfuscation_factor) + 1
assert len({len(compressor.compress({}, data)[1]) for i in range(100)}) > result_range
# compressing 100 times the same data should give multiple different result sizes
assert len({len(compressor.compress({}, data)[1]) for i in range(100)}) > 10
cs = CompressionSpec("obfuscate,6,zstd,3")
assert isinstance(cs.inner.compressor, ZSTD)
compressor = cs.compressor
data = bytes(10000)
_, compressed = compressor.compress({}, data)
min_compress, max_compress = 0.2, 0.001 # estimate compression factor outer boundaries
assert max_compress * len(data) <= len(compressed) <= min_compress * len(data) * 10000001
# compressing 100 times the same data should give multiple different result sizes
assert len({len(compressor.compress({}, data)[1]) for i in range(100)}) > 90
cs = CompressionSpec("obfuscate,2,auto,zstd,10")
assert isinstance(cs.inner.compressor, Auto)
@pytest.mark.parametrize(
"specs, c_type, obfuscation_padding",
[
("obfuscate,110,none", CNONE, 2**10), # up to 1KiB padding
("obfuscate,120,lz4", LZ4, 2**20), # up to 1MiB padding
("obfuscate,123,zstd,3", ZSTD, 2**23), # max, up to 8MiB padding
],
)
def test_additive_obfuscation(specs, c_type, obfuscation_padding: int):
# Testing randomly sized padding, obfuscation spec 110 to 123 inclusive
# obfuscate_padding = 2 ** (obfuscation spec - 100)
cs = CompressionSpec(specs)
assert isinstance(cs.inner.compressor, c_type)
compressor = cs.compressor
data = bytes(10000)
_, compressed = compressor.compress({}, data)
min_compress, max_compress = 0.2, 0.001 # estimate compression factor outer boundaries
assert max_compress * len(data) <= len(compressed) <= min_compress * len(data) * 1001
# compressing 100 times the same data should give multiple different result sizes
assert len({len(compressor.compress({}, data)[1]) for i in range(100)}) > 10
cs = CompressionSpec("obfuscate,110,none")
assert isinstance(cs.inner.compressor, CNONE)
compressor = cs.compressor
data = bytes(1000)
_, compressed = compressor.compress({}, data)
assert 1000 <= len(compressed) <= 1000 + 1024
data = bytes(1100)
_, compressed = compressor.compress({}, data)
assert 1100 <= len(compressed) <= 1100 + 1024
data_list = (bytes(1000), bytes(1100))
for data in data_list:
_, compressed = compressor.compress({}, data)
if c_type is CNONE: # no compression
assert len(data) <= len(compressed) <= len(data) + obfuscation_padding
else: # with compression
min_compress, max_compress = 0.2, 0.001 # estimate compression factor outer boundaries
assert max_compress * len(data) <= len(compressed) <= min_compress * len(data) * obfuscation_padding
def test_obfuscate_meta():
compressor = CompressionSpec("obfuscate,3,lz4").compressor
meta = {}
data = bytes(10000)
meta, compressed = compressor.compress(meta, data)
meta, compressed = compressor.compress({}, data)
assert "ctype" in meta
assert meta["ctype"] == LZ4.ID
assert "clevel" in meta
@ -221,51 +179,34 @@ def test_obfuscate_meta():
assert "psize" in meta
psize = meta["psize"]
assert 0 < psize < 100
assert csize - psize >= 0 # there is a obfuscation trailer
assert csize - psize >= 0 # there is an obfuscation trailer
trailer = compressed[psize:]
assert not trailer or set(trailer) == {0} # trailer is all-zero-bytes
def test_compression_specs():
@pytest.mark.parametrize(
"c_type, c_name", [(CNONE, "none"), (LZ4, "lz4"), (ZLIB, "zlib"), (LZMA, "lzma"), (ZSTD, "zstd")]
)
def test_default_compression_level(c_type, c_name):
cs = CompressionSpec(c_name).compressor
assert isinstance(cs, c_type)
if c_type in (ZLIB, LZMA):
assert cs.level == 6
elif c_type is ZSTD:
assert cs.level == 3
@pytest.mark.parametrize(
"c_type, c_name, c_levels", [(ZLIB, "zlib", [0, 9]), (LZMA, "lzma", [0, 9]), (ZSTD, "zstd", [1, 22])]
)
def test_specified_compression_level(c_type, c_name, c_levels):
for level in c_levels:
cs = CompressionSpec(f"{c_name},{level}").compressor
assert isinstance(cs, c_type)
assert cs.level == level
@pytest.mark.parametrize("invalid_spec", ["", "lzma,9,invalid", "invalid"])
def test_invalid_compression_level(invalid_spec):
with pytest.raises(argparse.ArgumentTypeError):
CompressionSpec("")
assert isinstance(CompressionSpec("none").compressor, CNONE)
assert isinstance(CompressionSpec("lz4").compressor, LZ4)
zlib = CompressionSpec("zlib").compressor
assert isinstance(zlib, ZLIB)
assert zlib.level == 6
zlib = CompressionSpec("zlib,0").compressor
assert isinstance(zlib, ZLIB)
assert zlib.level == 0
zlib = CompressionSpec("zlib,9").compressor
assert isinstance(zlib, ZLIB)
assert zlib.level == 9
with pytest.raises(argparse.ArgumentTypeError):
CompressionSpec("zlib,9,invalid")
lzma = CompressionSpec("lzma").compressor
assert isinstance(lzma, LZMA)
assert lzma.level == 6
lzma = CompressionSpec("lzma,0").compressor
assert isinstance(lzma, LZMA)
assert lzma.level == 0
lzma = CompressionSpec("lzma,9").compressor
assert isinstance(lzma, LZMA)
assert lzma.level == 9
zstd = CompressionSpec("zstd").compressor
assert isinstance(zstd, ZSTD)
assert zstd.level == 3
zstd = CompressionSpec("zstd,1").compressor
assert isinstance(zstd, ZSTD)
assert zstd.level == 1
zstd = CompressionSpec("zstd,22").compressor
assert isinstance(zstd, ZSTD)
assert zstd.level == 22
with pytest.raises(argparse.ArgumentTypeError):
CompressionSpec("lzma,9,invalid")
with pytest.raises(argparse.ArgumentTypeError):
CompressionSpec("invalid")
CompressionSpec(invalid_spec)