mirror of https://github.com/borgbackup/borg.git
147 lines
4.8 KiB
Python
147 lines
4.8 KiB
Python
import time
|
|
|
|
import pytest
|
|
|
|
from ..locking import get_id, TimeoutTimer, ExclusiveLock, Lock, LockRoster, \
|
|
ADD, REMOVE, SHARED, EXCLUSIVE, LockTimeout
|
|
|
|
|
|
ID1 = "foo", 1, 1
|
|
ID2 = "bar", 2, 2
|
|
|
|
|
|
def test_id():
|
|
hostname, pid, tid = get_id()
|
|
assert isinstance(hostname, str)
|
|
assert isinstance(pid, int)
|
|
assert isinstance(tid, int)
|
|
assert len(hostname) > 0
|
|
assert pid > 0
|
|
|
|
|
|
class TestTimeoutTimer:
|
|
def test_timeout(self):
|
|
timeout = 0.5
|
|
t = TimeoutTimer(timeout).start()
|
|
assert not t.timed_out()
|
|
time.sleep(timeout * 1.5)
|
|
assert t.timed_out()
|
|
|
|
def test_notimeout_sleep(self):
|
|
timeout, sleep = None, 0.5
|
|
t = TimeoutTimer(timeout, sleep).start()
|
|
assert not t.timed_out_or_sleep()
|
|
assert time.time() >= t.start_time + 1 * sleep
|
|
assert not t.timed_out_or_sleep()
|
|
assert time.time() >= t.start_time + 2 * sleep
|
|
|
|
|
|
@pytest.fixture()
|
|
def lockpath(tmpdir):
|
|
return str(tmpdir.join('lock'))
|
|
|
|
|
|
class TestExclusiveLock:
|
|
def test_checks(self, lockpath):
|
|
with ExclusiveLock(lockpath, timeout=1) as lock:
|
|
assert lock.is_locked() and lock.by_me()
|
|
|
|
def test_acquire_break_reacquire(self, lockpath):
|
|
lock = ExclusiveLock(lockpath, id=ID1).acquire()
|
|
lock.break_lock()
|
|
with ExclusiveLock(lockpath, id=ID2):
|
|
pass
|
|
|
|
def test_timeout(self, lockpath):
|
|
with ExclusiveLock(lockpath, id=ID1):
|
|
with pytest.raises(LockTimeout):
|
|
ExclusiveLock(lockpath, id=ID2, timeout=0.1).acquire()
|
|
|
|
|
|
class TestLock:
|
|
def test_shared(self, lockpath):
|
|
lock1 = Lock(lockpath, exclusive=False, id=ID1).acquire()
|
|
lock2 = Lock(lockpath, exclusive=False, id=ID2).acquire()
|
|
assert len(lock1._roster.get(SHARED)) == 2
|
|
assert len(lock1._roster.get(EXCLUSIVE)) == 0
|
|
assert not lock1._roster.empty(SHARED, EXCLUSIVE)
|
|
assert lock1._roster.empty(EXCLUSIVE)
|
|
lock1.release()
|
|
lock2.release()
|
|
|
|
def test_exclusive(self, lockpath):
|
|
with Lock(lockpath, exclusive=True, id=ID1) as lock:
|
|
assert len(lock._roster.get(SHARED)) == 0
|
|
assert len(lock._roster.get(EXCLUSIVE)) == 1
|
|
assert not lock._roster.empty(SHARED, EXCLUSIVE)
|
|
|
|
def test_upgrade(self, lockpath):
|
|
with Lock(lockpath, exclusive=False) as lock:
|
|
lock.upgrade()
|
|
lock.upgrade() # NOP
|
|
assert len(lock._roster.get(SHARED)) == 0
|
|
assert len(lock._roster.get(EXCLUSIVE)) == 1
|
|
assert not lock._roster.empty(SHARED, EXCLUSIVE)
|
|
|
|
def test_downgrade(self, lockpath):
|
|
with Lock(lockpath, exclusive=True) as lock:
|
|
lock.downgrade()
|
|
lock.downgrade() # NOP
|
|
assert len(lock._roster.get(SHARED)) == 1
|
|
assert len(lock._roster.get(EXCLUSIVE)) == 0
|
|
|
|
def test_got_exclusive_lock(self, lockpath):
|
|
lock = Lock(lockpath, exclusive=True, id=ID1)
|
|
assert not lock.got_exclusive_lock()
|
|
lock.acquire()
|
|
assert lock.got_exclusive_lock()
|
|
lock.release()
|
|
assert not lock.got_exclusive_lock()
|
|
|
|
def test_break(self, lockpath):
|
|
lock = Lock(lockpath, exclusive=True, id=ID1).acquire()
|
|
lock.break_lock()
|
|
assert len(lock._roster.get(SHARED)) == 0
|
|
assert len(lock._roster.get(EXCLUSIVE)) == 0
|
|
with Lock(lockpath, exclusive=True, id=ID2):
|
|
pass
|
|
|
|
def test_timeout(self, lockpath):
|
|
with Lock(lockpath, exclusive=False, id=ID1):
|
|
with pytest.raises(LockTimeout):
|
|
Lock(lockpath, exclusive=True, id=ID2, timeout=0.1).acquire()
|
|
with Lock(lockpath, exclusive=True, id=ID1):
|
|
with pytest.raises(LockTimeout):
|
|
Lock(lockpath, exclusive=False, id=ID2, timeout=0.1).acquire()
|
|
with Lock(lockpath, exclusive=True, id=ID1):
|
|
with pytest.raises(LockTimeout):
|
|
Lock(lockpath, exclusive=True, id=ID2, timeout=0.1).acquire()
|
|
|
|
|
|
@pytest.fixture()
|
|
def rosterpath(tmpdir):
|
|
return str(tmpdir.join('roster'))
|
|
|
|
|
|
class TestLockRoster:
|
|
def test_empty(self, rosterpath):
|
|
roster = LockRoster(rosterpath)
|
|
empty = roster.load()
|
|
roster.save(empty)
|
|
assert empty == {}
|
|
|
|
def test_modify_get(self, rosterpath):
|
|
roster1 = LockRoster(rosterpath, id=ID1)
|
|
assert roster1.get(SHARED) == set()
|
|
roster1.modify(SHARED, ADD)
|
|
assert roster1.get(SHARED) == {ID1, }
|
|
roster2 = LockRoster(rosterpath, id=ID2)
|
|
roster2.modify(SHARED, ADD)
|
|
assert roster2.get(SHARED) == {ID1, ID2, }
|
|
roster1 = LockRoster(rosterpath, id=ID1)
|
|
roster1.modify(SHARED, REMOVE)
|
|
assert roster1.get(SHARED) == {ID2, }
|
|
roster2 = LockRoster(rosterpath, id=ID2)
|
|
roster2.modify(SHARED, REMOVE)
|
|
assert roster2.get(SHARED) == set()
|