1
0
Fork 0
mirror of https://github.com/morpheus65535/bazarr synced 2024-12-30 19:46:25 +00:00
bazarr/libs/gitdb/test/test_util.py

101 lines
3.2 KiB
Python
Raw Normal View History

# Copyright (C) 2010, 2011 Sebastian Thiel (byronimo@gmail.com) and contributors
#
# This module is part of GitDB and is released under
# the New BSD License: http://www.opensource.org/licenses/bsd-license.php
"""Test for object db"""
import tempfile
import os
from gitdb.test.lib import TestBase
from gitdb.util import (
to_hex_sha,
to_bin_sha,
NULL_HEX_SHA,
LockedFD
)
class TestUtils(TestBase):
def test_basics(self):
assert to_hex_sha(NULL_HEX_SHA) == NULL_HEX_SHA
assert len(to_bin_sha(NULL_HEX_SHA)) == 20
assert to_hex_sha(to_bin_sha(NULL_HEX_SHA)) == NULL_HEX_SHA.encode("ascii")
def _cmp_contents(self, file_path, data):
# raise if data from file at file_path
# does not match data string
with open(file_path, "rb") as fp:
assert fp.read() == data.encode("ascii")
def test_lockedfd(self):
my_file = tempfile.mktemp()
orig_data = "hello"
new_data = "world"
with open(my_file, "wb") as my_file_fp:
my_file_fp.write(orig_data.encode("ascii"))
try:
lfd = LockedFD(my_file)
lockfilepath = lfd._lockfilepath()
# cannot end before it was started
self.failUnlessRaises(AssertionError, lfd.rollback)
self.failUnlessRaises(AssertionError, lfd.commit)
# open for writing
assert not os.path.isfile(lockfilepath)
wfd = lfd.open(write=True)
assert lfd._fd is wfd
assert os.path.isfile(lockfilepath)
# write data and fail
os.write(wfd, new_data.encode("ascii"))
lfd.rollback()
assert lfd._fd is None
self._cmp_contents(my_file, orig_data)
assert not os.path.isfile(lockfilepath)
# additional call doesn't fail
lfd.commit()
lfd.rollback()
# test reading
lfd = LockedFD(my_file)
rfd = lfd.open(write=False)
assert os.read(rfd, len(orig_data)) == orig_data.encode("ascii")
assert os.path.isfile(lockfilepath)
# deletion rolls back
del(lfd)
assert not os.path.isfile(lockfilepath)
# write data - concurrently
lfd = LockedFD(my_file)
olfd = LockedFD(my_file)
assert not os.path.isfile(lockfilepath)
wfdstream = lfd.open(write=True, stream=True) # this time as stream
assert os.path.isfile(lockfilepath)
# another one fails
self.failUnlessRaises(IOError, olfd.open)
wfdstream.write(new_data.encode("ascii"))
lfd.commit()
assert not os.path.isfile(lockfilepath)
self._cmp_contents(my_file, new_data)
# could test automatic _end_writing on destruction
finally:
os.remove(my_file)
# END final cleanup
# try non-existing file for reading
lfd = LockedFD(tempfile.mktemp())
try:
lfd.open(write=False)
except OSError:
assert not os.path.exists(lfd._lockfilepath())
else:
self.fail("expected OSError")
# END handle exceptions