Merge pull request #7742 from bigtedde/remove-BaseTestCase

PR: Remove BaseTestCase from `testsuite/`
TW 2023-07-29 22:14:33 +02:00 committed by GitHub
commit 1e7dec1541
3 changed files with 323 additions and 302 deletions
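The conversion follows one mechanical pattern throughout: each BaseTestCase subclass using self.assert_equal(...) becomes a module-level pytest function using plain assert, and repeated per-value checks collapse into @pytest.mark.parametrize cases. A minimal before/after sketch of the pattern, reusing the interval() example from the diff below:

# before: unittest-style class, as removed in this PR
class IntervalTestCase(BaseTestCase):
    def test_interval(self):
        self.assert_equal(interval("1H"), 1)
        self.assert_equal(interval("1d"), 24)

# after: plain pytest function; each parameter pair runs as its own test
import pytest

@pytest.mark.parametrize("timeframe, num_hours", [("1H", 1), ("1d", 24)])
def test_interval(timeframe, num_hours):
    assert interval(timeframe) == num_hours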


@@ -7,7 +7,6 @@ from unittest.mock import Mock
import pytest
from . import BaseTestCase
from . import rejected_dotdot_paths
from ..crypto.key import PlaintextKey
from ..archive import Archive, CacheChunkBuffer, RobustUnpacker, valid_msgpacked_dict, ITEM_KEYS, Statistics
@@ -133,97 +132,98 @@ class MockCache:
return id, len(data)
class ChunkBufferTestCase(BaseTestCase):
def test_cache_chunk_buffer(self):
data = [Item(path="p1"), Item(path="p2")]
cache = MockCache()
key = PlaintextKey(None)
chunks = CacheChunkBuffer(cache, key, None)
for d in data:
chunks.add(d)
chunks.flush()
chunks.flush(flush=True)
self.assert_equal(len(chunks.chunks), 2)
unpacker = msgpack.Unpacker()
for id in chunks.chunks:
unpacker.feed(cache.objects[id])
self.assert_equal(data, [Item(internal_dict=d) for d in unpacker])
def test_partial_cache_chunk_buffer(self):
big = "0123456789abcdefghijklmnopqrstuvwxyz" * 25000
data = [Item(path="full", target=big), Item(path="partial", target=big)]
cache = MockCache()
key = PlaintextKey(None)
chunks = CacheChunkBuffer(cache, key, None)
for d in data:
chunks.add(d)
chunks.flush(flush=False)
# the code is expected to leave the last partial chunk in the buffer
self.assert_equal(len(chunks.chunks), 3)
assert chunks.buffer.tell() > 0
# now really flush
chunks.flush(flush=True)
self.assert_equal(len(chunks.chunks), 4)
assert chunks.buffer.tell() == 0
unpacker = msgpack.Unpacker()
for id in chunks.chunks:
unpacker.feed(cache.objects[id])
self.assert_equal(data, [Item(internal_dict=d) for d in unpacker])
def test_cache_chunk_buffer():
data = [Item(path="p1"), Item(path="p2")]
cache = MockCache()
key = PlaintextKey(None)
chunks = CacheChunkBuffer(cache, key, None)
for d in data:
chunks.add(d)
chunks.flush()
chunks.flush(flush=True)
assert len(chunks.chunks) == 2
unpacker = msgpack.Unpacker()
for id in chunks.chunks:
unpacker.feed(cache.objects[id])
assert data == [Item(internal_dict=d) for d in unpacker]
class RobustUnpackerTestCase(BaseTestCase):
def make_chunks(self, items):
return b"".join(msgpack.packb({"path": item}) for item in items)
def test_partial_cache_chunk_buffer():
big = "0123456789abcdefghijklmnopqrstuvwxyz" * 25000
data = [Item(path="full", target=big), Item(path="partial", target=big)]
cache = MockCache()
key = PlaintextKey(None)
chunks = CacheChunkBuffer(cache, key, None)
for d in data:
chunks.add(d)
chunks.flush(flush=False)
# the code is expected to leave the last partial chunk in the buffer
assert len(chunks.chunks) == 3
assert chunks.buffer.tell() > 0
# now really flush
chunks.flush(flush=True)
assert len(chunks.chunks) == 4
assert chunks.buffer.tell() == 0
unpacker = msgpack.Unpacker()
for id in chunks.chunks:
unpacker.feed(cache.objects[id])
assert data == [Item(internal_dict=d) for d in unpacker]
def _validator(self, value):
return isinstance(value, dict) and value.get("path") in ("foo", "bar", "boo", "baz")
def process(self, input):
unpacker = RobustUnpacker(validator=self._validator, item_keys=ITEM_KEYS)
result = []
for should_sync, chunks in input:
if should_sync:
unpacker.resync()
for data in chunks:
unpacker.feed(data)
for item in unpacker:
result.append(item)
return result
def make_chunks(items):
return b"".join(msgpack.packb({"path": item}) for item in items)
def test_extra_garbage_no_sync(self):
chunks = [
(False, [self.make_chunks(["foo", "bar"])]),
(False, [b"garbage"] + [self.make_chunks(["boo", "baz"])]),
]
result = self.process(chunks)
self.assert_equal(
result, [{"path": "foo"}, {"path": "bar"}, 103, 97, 114, 98, 97, 103, 101, {"path": "boo"}, {"path": "baz"}]
)
def split(self, left, length):
parts = []
while left:
parts.append(left[:length])
left = left[length:]
return parts
def _validator(value):
return isinstance(value, dict) and value.get("path") in ("foo", "bar", "boo", "baz")
def test_correct_stream(self):
chunks = self.split(self.make_chunks(["foo", "bar", "boo", "baz"]), 2)
input = [(False, chunks)]
result = self.process(input)
self.assert_equal(result, [{"path": "foo"}, {"path": "bar"}, {"path": "boo"}, {"path": "baz"}])
def test_missing_chunk(self):
chunks = self.split(self.make_chunks(["foo", "bar", "boo", "baz"]), 4)
input = [(False, chunks[:3]), (True, chunks[4:])]
result = self.process(input)
self.assert_equal(result, [{"path": "foo"}, {"path": "boo"}, {"path": "baz"}])
def process(input):
unpacker = RobustUnpacker(validator=_validator, item_keys=ITEM_KEYS)
result = []
for should_sync, chunks in input:
if should_sync:
unpacker.resync()
for data in chunks:
unpacker.feed(data)
for item in unpacker:
result.append(item)
return result
def test_corrupt_chunk(self):
chunks = self.split(self.make_chunks(["foo", "bar", "boo", "baz"]), 4)
input = [(False, chunks[:3]), (True, [b"gar", b"bage"] + chunks[3:])]
result = self.process(input)
self.assert_equal(result, [{"path": "foo"}, {"path": "boo"}, {"path": "baz"}])
def test_extra_garbage_no_sync():
chunks = [(False, [make_chunks(["foo", "bar"])]), (False, [b"garbage"] + [make_chunks(["boo", "baz"])])]
res = process(chunks)
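# without a resync, msgpack yields the garbage bytes as raw integers:
# b"garbage" == bytes([103, 97, 114, 98, 97, 103, 101])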
assert res == [{"path": "foo"}, {"path": "bar"}, 103, 97, 114, 98, 97, 103, 101, {"path": "boo"}, {"path": "baz"}]
def split(left, length):
parts = []
while left:
parts.append(left[:length])
left = left[length:]
return parts
def test_correct_stream():
chunks = split(make_chunks(["foo", "bar", "boo", "baz"]), 2)
input = [(False, chunks)]
result = process(input)
assert result == [{"path": "foo"}, {"path": "bar"}, {"path": "boo"}, {"path": "baz"}]
def test_missing_chunk():
chunks = split(make_chunks(["foo", "bar", "boo", "baz"]), 4)
input = [(False, chunks[:3]), (True, chunks[4:])]
result = process(input)
assert result == [{"path": "foo"}, {"path": "boo"}, {"path": "baz"}]
def test_corrupt_chunk():
chunks = split(make_chunks(["foo", "bar", "boo", "baz"]), 4)
input = [(False, chunks[:3]), (True, [b"gar", b"bage"] + chunks[3:])]
result = process(input)
assert result == [{"path": "foo"}, {"path": "boo"}, {"path": "baz"}]
@pytest.fixture


@@ -5,35 +5,33 @@ from .chunker import cf
from ..chunker import Chunker
from ..crypto.low_level import blake2b_256
from ..constants import * # NOQA
from . import BaseTestCase
class ChunkerRegressionTestCase(BaseTestCase):
def test_chunkpoints_unchanged(self):
def twist(size):
x = 1
a = bytearray(size)
for i in range(size):
x = (x * 1103515245 + 12345) & 0x7FFFFFFF
a[i] = x & 0xFF
return a
def test_chunkpoints_unchanged():
def twist(size):
x = 1
a = bytearray(size)
for i in range(size):
x = (x * 1103515245 + 12345) & 0x7FFFFFFF
a[i] = x & 0xFF
return a
data = twist(100000)
data = twist(100000)
runs = []
for winsize in (65, 129, HASH_WINDOW_SIZE, 7351):
for minexp in (4, 6, 7, 11, 12):
for maxexp in (15, 17):
if minexp >= maxexp:
continue
for maskbits in (4, 7, 10, 12):
for seed in (1849058162, 1234567653):
fh = BytesIO(data)
chunker = Chunker(seed, minexp, maxexp, maskbits, winsize)
chunks = [blake2b_256(b"", c) for c in cf(chunker.chunkify(fh, -1))]
runs.append(blake2b_256(b"", b"".join(chunks)))
runs = []
for winsize in (65, 129, HASH_WINDOW_SIZE, 7351):
for minexp in (4, 6, 7, 11, 12):
for maxexp in (15, 17):
if minexp >= maxexp:
continue
for maskbits in (4, 7, 10, 12):
for seed in (1849058162, 1234567653):
fh = BytesIO(data)
chunker = Chunker(seed, minexp, maxexp, maskbits, winsize)
chunks = [blake2b_256(b"", c) for c in cf(chunker.chunkify(fh, -1))]
runs.append(blake2b_256(b"", b"".join(chunks)))
# The "correct" hash below matches the existing chunker behavior.
# Future chunker optimisations must not change this, or existing repos will bloat.
overall_hash = blake2b_256(b"", b"".join(runs))
self.assert_equal(overall_hash, unhexlify("b559b0ac8df8daaa221201d018815114241ea5c6609d98913cd2246a702af4e3"))
# The "correct" hash below matches the existing chunker behavior.
# Future chunker optimisations must not change this, or existing repos will bloat.
overall_hash = blake2b_256(b"", b"".join(runs))
assert overall_hash == unhexlify("b559b0ac8df8daaa221201d018815114241ea5c6609d98913cd2246a702af4e3")
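The nested hashing above is what lets one digest guard every parameter combination: each run's digest covers the concatenation of its per-chunk digests, and the overall digest covers all runs, so moving any single chunk boundary changes the final value. A condensed sketch of that construction, assuming borg's blake2b_256(key, data) helper (the same one the test imports via ..crypto.low_level):

from borg.crypto.low_level import blake2b_256

def fingerprint(runs_of_chunks):
    # per-run digest: hash of the concatenated per-chunk digests
    runs = [
        blake2b_256(b"", b"".join(blake2b_256(b"", c) for c in chunks))
        for chunks in runs_of_chunks
    ]
    # overall digest: sensitive to any boundary change in any run
    return blake2b_256(b"", b"".join(runs))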


@@ -46,8 +46,7 @@ from ..helpers import safe_unlink
from ..helpers import text_to_json, binary_to_json
from ..helpers.passphrase import Passphrase, PasswordRetriesExceeded
from ..platform import is_cygwin, is_win32, is_darwin
from . import BaseTestCase, FakeInputs, are_hardlinks_supported
from . import FakeInputs, are_hardlinks_supported
from . import rejected_dotdot_paths
@@ -365,68 +364,87 @@ def test_text_invalid(text):
tv(text)
class FormatTimedeltaTestCase(BaseTestCase):
def test(self):
t0 = datetime(2001, 1, 1, 10, 20, 3, 0)
t1 = datetime(2001, 1, 1, 12, 20, 4, 100000)
self.assert_equal(format_timedelta(t1 - t0), "2 hours 1.10 seconds")
def test_format_timedelta():
t0 = datetime(2001, 1, 1, 10, 20, 3, 0)
t1 = datetime(2001, 1, 1, 12, 20, 4, 100000)
assert format_timedelta(t1 - t0) == "2 hours 1.10 seconds"
def test_chunkerparams():
assert ChunkerParams("default") == ("buzhash", 19, 23, 21, 4095)
assert ChunkerParams("19,23,21,4095") == ("buzhash", 19, 23, 21, 4095)
assert ChunkerParams("buzhash,19,23,21,4095") == ("buzhash", 19, 23, 21, 4095)
assert ChunkerParams("10,23,16,4095") == ("buzhash", 10, 23, 16, 4095)
assert ChunkerParams("fixed,4096") == ("fixed", 4096, 0)
assert ChunkerParams("fixed,4096,200") == ("fixed", 4096, 200)
# invalid values checking
with pytest.raises(ArgumentTypeError):
ChunkerParams("crap,1,2,3,4") # invalid algo
with pytest.raises(ArgumentTypeError):
ChunkerParams("buzhash,5,7,6,4095") # too small min. size
with pytest.raises(ArgumentTypeError):
ChunkerParams("buzhash,19,24,21,4095") # too big max. size
with pytest.raises(ArgumentTypeError):
ChunkerParams("buzhash,23,19,21,4095") # violates min <= mask <= max
with pytest.raises(ArgumentTypeError):
ChunkerParams("fixed,63") # too small block size
with pytest.raises(ArgumentTypeError):
ChunkerParams("fixed,%d,%d" % (MAX_DATA_SIZE + 1, 4096)) # too big block size
with pytest.raises(ArgumentTypeError):
ChunkerParams("fixed,%d,%d" % (4096, MAX_DATA_SIZE + 1)) # too big header size
@pytest.mark.parametrize(
"chunker_params, expected_return",
[
("default", ("buzhash", 19, 23, 21, 4095)),
("19,23,21,4095", ("buzhash", 19, 23, 21, 4095)),
("buzhash,19,23,21,4095", ("buzhash", 19, 23, 21, 4095)),
("10,23,16,4095", ("buzhash", 10, 23, 16, 4095)),
("fixed,4096", ("fixed", 4096, 0)),
("fixed,4096,200", ("fixed", 4096, 200)),
],
)
def test_valid_chunkerparams(chunker_params, expected_return):
assert ChunkerParams(chunker_params) == expected_return
class RemoveDotdotPrefixesTestCase(BaseTestCase):
def test(self):
self.assert_equal(remove_dotdot_prefixes("."), ".")
self.assert_equal(remove_dotdot_prefixes(".."), ".")
self.assert_equal(remove_dotdot_prefixes("/"), ".")
self.assert_equal(remove_dotdot_prefixes("//"), ".")
self.assert_equal(remove_dotdot_prefixes("foo"), "foo")
self.assert_equal(remove_dotdot_prefixes("foo/bar"), "foo/bar")
self.assert_equal(remove_dotdot_prefixes("/foo/bar"), "foo/bar")
self.assert_equal(remove_dotdot_prefixes("../foo/bar"), "foo/bar")
@pytest.mark.parametrize(
"invalid_chunker_params",
[
"crap,1,2,3,4", # invalid algo
"buzhash,5,7,6,4095", # too small min. size
"buzhash,19,24,21,4095", # too big max. size
"buzhash,23,19,21,4095", # violates min <= mask <= max
"fixed,63", # too small block size
"fixed,%d,%d" % (MAX_DATA_SIZE + 1, 4096), # too big block size
"fixed,%d,%d" % (4096, MAX_DATA_SIZE + 1), # too big header size
],
)
def test_invalid_chunkerparams(invalid_chunker_params):
with pytest.raises(ArgumentTypeError):
ChunkerParams(invalid_chunker_params)
class MakePathSafeTestCase(BaseTestCase):
def test(self):
self.assert_equal(make_path_safe("."), ".")
self.assert_equal(make_path_safe("./"), ".")
self.assert_equal(make_path_safe("./foo"), "foo")
self.assert_equal(make_path_safe(".//foo"), "foo")
self.assert_equal(make_path_safe(".//foo//bar//"), "foo/bar")
self.assert_equal(make_path_safe("/foo/bar"), "foo/bar")
self.assert_equal(make_path_safe("//foo/bar"), "foo/bar")
self.assert_equal(make_path_safe("//foo/./bar"), "foo/bar")
self.assert_equal(make_path_safe(".test"), ".test")
self.assert_equal(make_path_safe(".test."), ".test.")
self.assert_equal(make_path_safe("..test.."), "..test..")
self.assert_equal(make_path_safe("/te..st/foo/bar"), "te..st/foo/bar")
self.assert_equal(make_path_safe("/..test../abc//"), "..test../abc")
@pytest.mark.parametrize(
"original_path, expected_path",
[
(".", "."),
("..", "."),
("/", "."),
("//", "."),
("foo", "foo"),
("foo/bar", "foo/bar"),
("/foo/bar", "foo/bar"),
("../foo/bar", "foo/bar"),
],
)
def test_remove_dotdot_prefixes(original_path, expected_path):
assert remove_dotdot_prefixes(original_path) == expected_path
for path in rejected_dotdot_paths:
with pytest.raises(ValueError, match="unexpected '..' element in path"):
make_path_safe(path)
@pytest.mark.parametrize(
"original_path, expected_path",
[
(".", "."),
("./", "."),
("/foo", "foo"),
("//foo", "foo"),
(".//foo//bar//", "foo/bar"),
("/foo/bar", "foo/bar"),
("//foo/bar", "foo/bar"),
("//foo/./bar", "foo/bar"),
(".test", ".test"),
(".test.", ".test."),
("..test..", "..test.."),
("/te..st/foo/bar", "te..st/foo/bar"),
("/..test../abc//", "..test../abc"),
],
)
def test_valid_make_path_safe(original_path, expected_path):
assert make_path_safe(original_path) == expected_path
@pytest.mark.parametrize("path", rejected_dotdot_paths)
def test_invalid_make_path_safe(path):
with pytest.raises(ValueError, match="unexpected '..' element in path"):
make_path_safe(path)
class MockArchive:
@@ -503,7 +521,7 @@ def test_prune_split_keep_oldest():
MockArchive(datetime(2018, 1, 1, 10, 0, 0, tzinfo=local_tz), 1),
# an interim backup
MockArchive(datetime(2018, 12, 30, 10, 0, 0, tzinfo=local_tz), 2),
# year end backups
# year-end backups
MockArchive(datetime(2018, 12, 31, 10, 0, 0, tzinfo=local_tz), 3),
MockArchive(datetime(2019, 12, 31, 10, 0, 0, tzinfo=local_tz), 4),
]
@@ -527,9 +545,6 @@ def test_prune_split_keep_oldest():
def test_prune_split_no_archives():
def subset(lst, ids):
return {i for i in lst if i.id in ids}
archives = []
kept_because = {}
@@ -539,77 +554,70 @@ def test_prune_split_no_archives():
assert kept_because == {}
class IntervalTestCase(BaseTestCase):
def test_interval(self):
self.assert_equal(interval("1H"), 1)
self.assert_equal(interval("1d"), 24)
self.assert_equal(interval("1w"), 168)
self.assert_equal(interval("1m"), 744)
self.assert_equal(interval("1y"), 8760)
def test_interval_time_unit(self):
with pytest.raises(ArgumentTypeError) as exc:
interval("H")
self.assert_equal(exc.value.args, ('Unexpected interval number "": expected an integer greater than 0',))
with pytest.raises(ArgumentTypeError) as exc:
interval("-1d")
self.assert_equal(exc.value.args, ('Unexpected interval number "-1": expected an integer greater than 0',))
with pytest.raises(ArgumentTypeError) as exc:
interval("food")
self.assert_equal(exc.value.args, ('Unexpected interval number "foo": expected an integer greater than 0',))
def test_interval_number(self):
with pytest.raises(ArgumentTypeError) as exc:
interval("5")
self.assert_equal(
exc.value.args, ("Unexpected interval time unit \"5\": expected one of ['H', 'd', 'w', 'm', 'y']",)
)
@pytest.mark.parametrize("timeframe, num_hours", [("1H", 1), ("1d", 24), ("1w", 168), ("1m", 744), ("1y", 8760)])
def test_interval(timeframe, num_hours):
assert interval(timeframe) == num_hours
class PruneWithinTestCase(BaseTestCase):
def test_prune_within(self):
def subset(lst, indices):
return {lst[i] for i in indices}
def dotest(test_archives, within, indices):
for ta in test_archives, reversed(test_archives):
kept_because = {}
keep = prune_within(ta, interval(within), kept_because)
self.assert_equal(set(keep), subset(test_archives, indices))
assert all("within" == kept_because[a.id][0] for a in keep)
# 1 minute, 1.5 hours, 2.5 hours, 3.5 hours, 25 hours, 49 hours
test_offsets = [60, 90 * 60, 150 * 60, 210 * 60, 25 * 60 * 60, 49 * 60 * 60]
now = datetime.now(timezone.utc)
test_dates = [now - timedelta(seconds=s) for s in test_offsets]
test_archives = [MockArchive(date, i) for i, date in enumerate(test_dates)]
dotest(test_archives, "1H", [0])
dotest(test_archives, "2H", [0, 1])
dotest(test_archives, "3H", [0, 1, 2])
dotest(test_archives, "24H", [0, 1, 2, 3])
dotest(test_archives, "26H", [0, 1, 2, 3, 4])
dotest(test_archives, "2d", [0, 1, 2, 3, 4])
dotest(test_archives, "50H", [0, 1, 2, 3, 4, 5])
dotest(test_archives, "3d", [0, 1, 2, 3, 4, 5])
dotest(test_archives, "1w", [0, 1, 2, 3, 4, 5])
dotest(test_archives, "1m", [0, 1, 2, 3, 4, 5])
dotest(test_archives, "1y", [0, 1, 2, 3, 4, 5])
@pytest.mark.parametrize(
"invalid_interval, error_tuple",
[
("H", ('Unexpected interval number "": expected an integer greater than 0',)),
("-1d", ('Unexpected interval number "-1": expected an integer greater than 0',)),
("food", ('Unexpected interval number "foo": expected an integer greater than 0',)),
],
)
def test_interval_time_unit(invalid_interval, error_tuple):
with pytest.raises(ArgumentTypeError) as exc:
interval(invalid_interval)
assert exc.value.args == error_tuple
class StableDictTestCase(BaseTestCase):
def test(self):
d = StableDict(foo=1, bar=2, boo=3, baz=4)
self.assert_equal(list(d.items()), [("bar", 2), ("baz", 4), ("boo", 3), ("foo", 1)])
self.assert_equal(hashlib.md5(msgpack.packb(d)).hexdigest(), "fc78df42cd60691b3ac3dd2a2b39903f")
def test_interval_number():
with pytest.raises(ArgumentTypeError) as exc:
interval("5")
assert exc.value.args == ("Unexpected interval time unit \"5\": expected one of ['H', 'd', 'w', 'm', 'y']",)
class TestParseTimestamp(BaseTestCase):
def test(self):
self.assert_equal(
parse_timestamp("2015-04-19T20:25:00.226410"), datetime(2015, 4, 19, 20, 25, 0, 226410, timezone.utc)
)
self.assert_equal(parse_timestamp("2015-04-19T20:25:00"), datetime(2015, 4, 19, 20, 25, 0, 0, timezone.utc))
def test_prune_within():
def subset(lst, indices):
return {lst[i] for i in indices}
def dotest(test_archives, within, indices):
for ta in test_archives, reversed(test_archives):
kept_because = {}
keep = prune_within(ta, interval(within), kept_because)
assert set(keep) == subset(test_archives, indices)
assert all("within" == kept_because[a.id][0] for a in keep)
# 1 minute, 1.5 hours, 2.5 hours, 3.5 hours, 25 hours, 49 hours
test_offsets = [60, 90 * 60, 150 * 60, 210 * 60, 25 * 60 * 60, 49 * 60 * 60]
now = datetime.now(timezone.utc)
test_dates = [now - timedelta(seconds=s) for s in test_offsets]
test_archives = [MockArchive(date, i) for i, date in enumerate(test_dates)]
dotest(test_archives, "1H", [0])
dotest(test_archives, "2H", [0, 1])
dotest(test_archives, "3H", [0, 1, 2])
dotest(test_archives, "24H", [0, 1, 2, 3])
dotest(test_archives, "26H", [0, 1, 2, 3, 4])
dotest(test_archives, "2d", [0, 1, 2, 3, 4])
dotest(test_archives, "50H", [0, 1, 2, 3, 4, 5])
dotest(test_archives, "3d", [0, 1, 2, 3, 4, 5])
dotest(test_archives, "1w", [0, 1, 2, 3, 4, 5])
dotest(test_archives, "1m", [0, 1, 2, 3, 4, 5])
dotest(test_archives, "1y", [0, 1, 2, 3, 4, 5])
def test_stable_dict():
d = StableDict(foo=1, bar=2, boo=3, baz=4)
assert list(d.items()) == [("bar", 2), ("baz", 4), ("boo", 3), ("foo", 1)]
assert hashlib.md5(msgpack.packb(d)).hexdigest() == "fc78df42cd60691b3ac3dd2a2b39903f"
def test_parse_timestamp():
assert parse_timestamp("2015-04-19T20:25:00.226410") == datetime(2015, 4, 19, 20, 25, 0, 226410, timezone.utc)
assert parse_timestamp("2015-04-19T20:25:00") == datetime(2015, 4, 19, 20, 25, 0, 0, timezone.utc)
def test_get_base_dir(monkeypatch):
@@ -814,70 +822,79 @@ def test_get_runtime_dir(monkeypatch):
assert get_runtime_dir() == "/var/tmp"
def test_file_size():
@pytest.mark.parametrize(
"size, fmt",
[
(0, "0 B"), # no rounding necessary for those
(1, "1 B"),
(142, "142 B"),
(999, "999 B"),
(1000, "1.00 kB"), # rounding starts here
(1001, "1.00 kB"), # should be rounded away
(1234, "1.23 kB"), # should be rounded down
(1235, "1.24 kB"), # should be rounded up
(1010, "1.01 kB"), # rounded down as well
(999990000, "999.99 MB"), # rounded down
(999990001, "999.99 MB"), # rounded down
(999995000, "1.00 GB"), # rounded up to next unit
(10**6, "1.00 MB"), # and all the remaining units, megabytes
(10**9, "1.00 GB"), # gigabytes
(10**12, "1.00 TB"), # terabytes
(10**15, "1.00 PB"), # petabytes
(10**18, "1.00 EB"), # exabytes
(10**21, "1.00 ZB"), # zettabytes
(10**24, "1.00 YB"), # yottabytes
(-1, "-1 B"), # negative value
(-1010, "-1.01 kB"), # negative value with rounding
],
)
def test_file_size(size, fmt):
"""test the size formatting routines"""
si_size_map = {
0: "0 B", # no rounding necessary for those
1: "1 B",
142: "142 B",
999: "999 B",
1000: "1.00 kB", # rounding starts here
1001: "1.00 kB", # should be rounded away
1234: "1.23 kB", # should be rounded down
1235: "1.24 kB", # should be rounded up
1010: "1.01 kB", # rounded down as well
999990000: "999.99 MB", # rounded down
999990001: "999.99 MB", # rounded down
999995000: "1.00 GB", # rounded up to next unit
10**6: "1.00 MB", # and all the remaining units, megabytes
10**9: "1.00 GB", # gigabytes
10**12: "1.00 TB", # terabytes
10**15: "1.00 PB", # petabytes
10**18: "1.00 EB", # exabytes
10**21: "1.00 ZB", # zettabytes
10**24: "1.00 YB", # yottabytes
-1: "-1 B", # negative value
-1010: "-1.01 kB", # negative value with rounding
}
for size, fmt in si_size_map.items():
assert format_file_size(size) == fmt
def test_file_size_iec():
"""test the size formatting routines"""
iec_size_map = {
0: "0 B",
2**0: "1 B",
2**10: "1.00 KiB",
2**20: "1.00 MiB",
2**30: "1.00 GiB",
2**40: "1.00 TiB",
2**50: "1.00 PiB",
2**60: "1.00 EiB",
2**70: "1.00 ZiB",
2**80: "1.00 YiB",
-(2**0): "-1 B",
-(2**10): "-1.00 KiB",
-(2**20): "-1.00 MiB",
}
for size, fmt in iec_size_map.items():
assert format_file_size(size, iec=True) == fmt
def test_file_size_precision():
assert format_file_size(1234, precision=1) == "1.2 kB" # rounded down
assert format_file_size(1254, precision=1) == "1.3 kB" # rounded up
assert format_file_size(999990000, precision=1) == "1.0 GB" # and not 999.9 MB or 1000.0 MB
def test_file_size_sign():
si_size_map = {0: "0 B", 1: "+1 B", 1234: "+1.23 kB", -1: "-1 B", -1234: "-1.23 kB"}
for size, fmt in si_size_map.items():
assert format_file_size(size, sign=True) == fmt
assert format_file_size(size) == fmt
@pytest.mark.parametrize(
"string,value", (("1", 1), ("20", 20), ("5K", 5000), ("1.75M", 1750000), ("1e+9", 1e9), ("-1T", -1e12))
"size, fmt",
[
(0, "0 B"),
(2**0, "1 B"),
(2**10, "1.00 KiB"),
(2**20, "1.00 MiB"),
(2**30, "1.00 GiB"),
(2**40, "1.00 TiB"),
(2**50, "1.00 PiB"),
(2**60, "1.00 EiB"),
(2**70, "1.00 ZiB"),
(2**80, "1.00 YiB"),
(-(2**0), "-1 B"),
(-(2**10), "-1.00 KiB"),
(-(2**20), "-1.00 MiB"),
],
)
def test_file_size_iec(size, fmt):
"""test the size formatting routines"""
assert format_file_size(size, iec=True) == fmt
@pytest.mark.parametrize(
"original_size, formatted_size",
[
(1234, "1.2 kB"), # rounded down
(1254, "1.3 kB"), # rounded up
(999990000, "1.0 GB"), # and not 999.9 MB or 1000.0 MB
],
)
def test_file_size_precision(original_size, formatted_size):
assert format_file_size(original_size, precision=1) == formatted_size
@pytest.mark.parametrize("size, fmt", [(0, "0 B"), (1, "+1 B"), (1234, "+1.23 kB"), (-1, "-1 B"), (-1234, "-1.23 kB")])
def test_file_size_sign(size, fmt):
assert format_file_size(size, sign=True) == fmt
@pytest.mark.parametrize(
"string, value", [("1", 1), ("20", 20), ("5K", 5000), ("1.75M", 1750000), ("1e+9", 1e9), ("-1T", -1e12)]
)
def test_parse_file_size(string, value):
assert parse_file_size(string) == int(value)
@@ -1106,12 +1123,18 @@ def test_progress_percentage_quiet(capfd):
assert err == ""
def test_partial_format():
assert partial_format("{space:10}", {"space": " "}) == " " * 10
assert partial_format("{foobar}", {"bar": "wrong", "foobar": "correct"}) == "correct"
assert partial_format("{unknown_key}", {}) == "{unknown_key}"
assert partial_format("{key}{{escaped_key}}", {}) == "{key}{{escaped_key}}"
assert partial_format("{{escaped_key}}", {"escaped_key": 1234}) == "{{escaped_key}}"
@pytest.mark.parametrize(
"fmt, items_map, expected_result",
[
("{space:10}", {"space": " "}, " " * 10),
("{foobar}", {"bar": "wrong", "foobar": "correct"}, "correct"),
("{unknown_key}", {}, "{unknown_key}"),
("{key}{{escaped_key}}", {}, "{key}{{escaped_key}}"),
("{{escaped_key}}", {"escaped_key": 1234}, "{{escaped_key}}"),
],
)
def test_partial_format(fmt, items_map, expected_result):
assert partial_format(fmt, items_map) == expected_result
def test_chunk_file_wrapper():