bazarr/libs/gitdb/test/test_stream.py

165 lines
5.6 KiB
Python

# Copyright (C) 2010, 2011 Sebastian Thiel (byronimo@gmail.com) and contributors
#
# This module is part of GitDB and is released under
# the New BSD License: http://www.opensource.org/licenses/bsd-license.php
"""Test for object db"""
from gitdb.test.lib import (
TestBase,
DummyStream,
make_bytes,
make_object,
fixture_path
)
from gitdb import (
DecompressMemMapReader,
FDCompressedSha1Writer,
LooseObjectDB,
Sha1Writer,
MemoryDB,
IStream,
)
from gitdb.util import hex_to_bin
import zlib
from gitdb.typ import (
str_blob_type
)
import tempfile
import os
from io import BytesIO
class TestStream(TestBase):
"""Test stream classes"""
data_sizes = (15, 10000, 1000 * 1024 + 512)
def _assert_stream_reader(self, stream, cdata, rewind_stream=lambda s: None):
"""Make stream tests - the orig_stream is seekable, allowing it to be
rewound and reused
:param cdata: the data we expect to read from stream, the contents
:param rewind_stream: function called to rewind the stream to make it ready
for reuse"""
ns = 10
assert len(cdata) > ns - 1, "Data must be larger than %i, was %i" % (ns, len(cdata))
# read in small steps
ss = len(cdata) // ns
for i in range(ns):
data = stream.read(ss)
chunk = cdata[i * ss:(i + 1) * ss]
assert data == chunk
# END for each step
rest = stream.read()
if rest:
assert rest == cdata[-len(rest):]
# END handle rest
if isinstance(stream, DecompressMemMapReader):
assert len(stream.data()) == stream.compressed_bytes_read()
# END handle special type
rewind_stream(stream)
# read everything
rdata = stream.read()
assert rdata == cdata
if isinstance(stream, DecompressMemMapReader):
assert len(stream.data()) == stream.compressed_bytes_read()
# END handle special type
def test_decompress_reader(self):
for close_on_deletion in range(2):
for with_size in range(2):
for ds in self.data_sizes:
cdata = make_bytes(ds, randomize=False)
# zdata = zipped actual data
# cdata = original content data
# create reader
if with_size:
# need object data
zdata = zlib.compress(make_object(str_blob_type, cdata))
typ, size, reader = DecompressMemMapReader.new(zdata, close_on_deletion)
assert size == len(cdata)
assert typ == str_blob_type
# even if we don't set the size, it will be set automatically on first read
test_reader = DecompressMemMapReader(zdata, close_on_deletion=False)
assert test_reader._s == len(cdata)
else:
# here we need content data
zdata = zlib.compress(cdata)
reader = DecompressMemMapReader(zdata, close_on_deletion, len(cdata))
assert reader._s == len(cdata)
# END get reader
self._assert_stream_reader(reader, cdata, lambda r: r.seek(0))
# put in a dummy stream for closing
dummy = DummyStream()
reader._m = dummy
assert not dummy.closed
del(reader)
assert dummy.closed == close_on_deletion
# END for each datasize
# END whether size should be used
# END whether stream should be closed when deleted
def test_sha_writer(self):
writer = Sha1Writer()
assert 2 == writer.write("hi".encode("ascii"))
assert len(writer.sha(as_hex=1)) == 40
assert len(writer.sha(as_hex=0)) == 20
# make sure it does something ;)
prev_sha = writer.sha()
writer.write("hi again".encode("ascii"))
assert writer.sha() != prev_sha
def test_compressed_writer(self):
for ds in self.data_sizes:
fd, path = tempfile.mkstemp()
ostream = FDCompressedSha1Writer(fd)
data = make_bytes(ds, randomize=False)
# for now, just a single write, code doesn't care about chunking
assert len(data) == ostream.write(data)
ostream.close()
# its closed already
self.failUnlessRaises(OSError, os.close, fd)
# read everything back, compare to data we zip
fd = os.open(path, os.O_RDONLY | getattr(os, 'O_BINARY', 0))
written_data = os.read(fd, os.path.getsize(path))
assert len(written_data) == os.path.getsize(path)
os.close(fd)
assert written_data == zlib.compress(data, 1) # best speed
os.remove(path)
# END for each os
def test_decompress_reader_special_case(self):
odb = LooseObjectDB(fixture_path('objects'))
mdb = MemoryDB()
for sha in (b'888401851f15db0eed60eb1bc29dec5ddcace911',
b'7bb839852ed5e3a069966281bb08d50012fb309b',):
ostream = odb.stream(hex_to_bin(sha))
# if there is a bug, we will be missing one byte exactly !
data = ostream.read()
assert len(data) == ostream.size
# Putting it back in should yield nothing new - after all, we have
dump = mdb.store(IStream(ostream.type, ostream.size, BytesIO(data)))
assert dump.hexsha == sha
# end for each loose object sha to test