new locking code

exclusive locking by atomic mkdir fs operation
on top of that, shared (read) locks and exclusive (write) locks using a json roster.
This commit is contained in:
Thomas Waldmann 2015-07-13 12:22:49 +02:00
parent 434dac0e48
commit e4c519b1e9
5 changed files with 337 additions and 56 deletions

View File

@ -124,7 +124,7 @@ class Cache:
def open(self):
if not os.path.isdir(self.path):
raise Exception('%s Does not look like a Borg cache' % self.path)
self.lock = UpgradableLock(os.path.join(self.path, 'config'), exclusive=True)
self.lock = UpgradableLock(os.path.join(self.path, 'repo'), exclusive=True).acquire()
self.rollback()
def close(self):

View File

@ -1,43 +1,246 @@
import fcntl
import errno
import json
import os
import socket
import threading
import time
from borg.helpers import Error
ADD, REMOVE = 'add', 'remove'
SHARED, EXCLUSIVE = 'shared', 'exclusive'
class UpgradableLock:
class ReadLockFailed(Error):
"""Failed to acquire read lock on {}"""
def get_id():
"""Get identification tuple for 'us'"""
hostname = socket.gethostname()
pid = os.getpid()
tid = threading.current_thread().ident & 0xffffffff
return hostname, pid, tid
class WriteLockFailed(Error):
"""Failed to acquire write lock on {}"""
def __init__(self, path, exclusive=False):
self.path = path
try:
self.fd = open(path, 'r+')
except IOError:
self.fd = open(path, 'r')
try:
if exclusive:
fcntl.lockf(self.fd, fcntl.LOCK_EX)
class ExclusiveLock:
"""An exclusive Lock based on mkdir fs operation being atomic"""
class LockError(Error):
"""Failed to acquire the lock {}."""
class LockTimeout(LockError):
"""Failed to create/acquire the lock {} (timeout)."""
class LockFailed(LockError):
"""Failed to create/acquire the lock {} ({})."""
class UnlockError(Error):
"""Failed to release the lock {}."""
class NotLocked(UnlockError):
"""Failed to release the lock {} (was not locked)."""
class NotMyLock(UnlockError):
"""Failed to release the lock {} (was/is locked, but not by me)."""
def __init__(self, path, timeout=None, sleep=None, id=None):
self.timeout = timeout
self.sleep = sleep
self.path = os.path.abspath(path)
self.id = id or get_id()
self.unique_name = os.path.join(self.path, "%s.%d-%x" % self.id)
def __enter__(self):
return self.acquire()
def __exit__(self, *exc):
self.release()
def __repr__(self):
return "<%s: %r>" % (self.__class__.__name__, self.unique_name)
def _get_timing(self, timeout, sleep):
if timeout is None:
timeout = self.timeout
start = end = time.time()
if timeout is not None and timeout > 0:
end += timeout
if sleep is None:
sleep = self.sleep
if sleep is None:
if timeout is None:
sleep = 1.0
else:
fcntl.lockf(self.fd, fcntl.LOCK_SH)
# Python 3.2 raises IOError, Python3.3+ raises OSError
except (IOError, OSError):
if exclusive:
raise self.WriteLockFailed(self.path)
else:
raise self.ReadLockFailed(self.path)
self.is_exclusive = exclusive
sleep = max(0, timeout / 10.0)
return start, sleep, end, timeout
def upgrade(self):
try:
fcntl.lockf(self.fd, fcntl.LOCK_EX)
# Python 3.2 raises IOError, Python3.3+ raises OSError
except (IOError, OSError):
raise self.WriteLockFailed(self.path)
self.is_exclusive = True
def acquire(self, timeout=None, sleep=None):
start, sleep, end, timeout = self._get_timing(timeout, sleep)
while True:
try:
os.mkdir(self.path)
except OSError as err:
if err.errno == errno.EEXIST: # already locked
if self.by_me():
return self
if timeout is not None and time.time() > end:
raise self.LockTimeout(self.path)
time.sleep(sleep)
else:
raise self.LockFailed(self.path, str(err))
else:
with open(self.unique_name, "wb"):
pass
return self
def release(self):
fcntl.lockf(self.fd, fcntl.LOCK_UN)
self.fd.close()
if not self.is_locked():
raise self.NotLocked(self.path)
if not self.by_me():
raise self.NotMyLock(self.path)
os.unlink(self.unique_name)
os.rmdir(self.path)
def is_locked(self):
return os.path.exists(self.path)
def by_me(self):
return os.path.exists(self.unique_name)
def break_lock(self):
if self.is_locked():
for name in os.listdir(self.path):
os.unlink(os.path.join(self.path, name))
os.rmdir(self.path)
class LockRoster:
"""
A Lock Roster to track shared/exclusive lockers.
Note: you usually should call the methods with an exclusive lock held,
to avoid conflicting access by multiple threads/processes/machines.
"""
def __init__(self, path, id=None):
self.path = path
self.id = id or get_id()
def load(self):
try:
with open(self.path) as f:
data = json.load(f)
except IOError as err:
if err.errno != errno.ENOENT:
raise
data = {}
return data
def save(self, data):
with open(self.path, "w") as f:
json.dump(data, f)
def remove(self):
os.unlink(self.path)
def get(self, key):
roster = self.load()
return set(tuple(e) for e in roster.get(key, []))
def modify(self, key, op):
roster = self.load()
try:
elements = set(tuple(e) for e in roster[key])
except KeyError:
elements = set()
if op == ADD:
elements.add(self.id)
elif op == REMOVE:
elements.remove(self.id)
else:
raise ValueError('Unknown LockRoster op %r' % op)
roster[key] = list(list(e) for e in elements)
self.save(roster)
class UpgradableLock:
"""
A Lock for a resource that can be accessed in a shared or exclusive way.
Typically, write access to a resource needs an exclusive lock (1 writer,
noone is allowed reading) and read access to a resource needs a shared
lock (multiple readers are allowed).
"""
class SharedLockFailed(Error):
"""Failed to acquire shared lock [{}]"""
class ExclusiveLockFailed(Error):
"""Failed to acquire write lock [{}]"""
def __init__(self, path, exclusive=False, sleep=None, id=None):
self.path = path
self.is_exclusive = exclusive
self.sleep = sleep
self.id = id or get_id()
# globally keeping track of shared and exclusive lockers:
self._roster = LockRoster(path + '.roster', id=id)
# an exclusive lock, used for:
# - holding while doing roster queries / updates
# - holding while the UpgradableLock itself is exclusive
self._lock = ExclusiveLock(path + '.lock', id=id)
def __enter__(self):
return self.acquire()
def __exit__(self, *exc):
self.release()
def __repr__(self):
return "<%s: %r>" % (self.__class__.__name__, self.id)
def acquire(self, exclusive=None, remove=None, sleep=None):
if exclusive is None:
exclusive = self.is_exclusive
sleep = sleep or self.sleep or 0.2
try:
if exclusive:
self._wait_for_readers_finishing(remove, sleep)
self._roster.modify(EXCLUSIVE, ADD)
else:
with self._lock:
if remove is not None:
self._roster.modify(remove, REMOVE)
self._roster.modify(SHARED, ADD)
self.is_exclusive = exclusive
return self
except ExclusiveLock.LockError as err:
msg = str(err)
if exclusive:
raise self.ExclusiveLockFailed(msg)
else:
raise self.SharedLockFailed(msg)
def _wait_for_readers_finishing(self, remove, sleep):
while True:
self._lock.acquire()
if remove is not None:
self._roster.modify(remove, REMOVE)
remove = None
if len(self._roster.get(SHARED)) == 0:
return # we are the only one and we keep the lock!
self._lock.release()
time.sleep(sleep)
def release(self):
if self.is_exclusive:
self._roster.modify(EXCLUSIVE, REMOVE)
self._lock.release()
else:
with self._lock:
self._roster.modify(SHARED, REMOVE)
def upgrade(self):
if not self.is_exclusive:
self.acquire(exclusive=True, remove=SHARED)
def downgrade(self):
if self.is_exclusive:
self.acquire(exclusive=False, remove=EXCLUSIVE)
def break_lock(self):
self._roster.remove()
self._lock.break_lock()

View File

@ -114,11 +114,11 @@ class Repository:
self.path = path
if not os.path.isdir(path):
raise self.DoesNotExist(path)
self.lock = UpgradableLock(os.path.join(path, 'repo'), exclusive).acquire()
self.config = RawConfigParser()
self.config.read(os.path.join(self.path, 'config'))
if 'repository' not in self.config.sections() or self.config.getint('repository', 'version') != 1:
raise self.InvalidRepository(path)
self.lock = UpgradableLock(os.path.join(path, 'config'), exclusive)
self.max_segment_size = self.config.getint('repository', 'max_segment_size')
self.segments_per_dir = self.config.getint('repository', 'segments_per_dir')
self.id = unhexlify(self.config.get('repository', 'id').strip())
@ -149,7 +149,7 @@ class Repository:
self._active_txn = True
try:
self.lock.upgrade()
except UpgradableLock.WriteLockFailed:
except UpgradableLock.ExclusiveLockFailed:
# if upgrading the lock to exclusive fails, we do not have an
# active transaction. this is important for "serve" mode, where
# the repository instance lives on - even if exceptions happened.

View File

@ -1,24 +1,102 @@
import os
import tempfile
import unittest
import pytest
from ..locking import UpgradableLock
from . import BaseTestCase
from ..locking import get_id, ExclusiveLock, UpgradableLock, LockRoster, ADD, REMOVE, SHARED, EXCLUSIVE
class UpgradableLockTestCase(BaseTestCase):
ID1 = "foo", 1, 1
ID2 = "bar", 2, 2
def test(self):
file = tempfile.NamedTemporaryFile()
lock = UpgradableLock(file.name)
lock.upgrade()
lock.upgrade()
lock.release()
def test_id():
hostname, pid, tid = get_id()
assert isinstance(hostname, str)
assert isinstance(pid, int)
assert isinstance(tid, int)
assert len(hostname) > 0
assert pid > 0
@unittest.skipIf(os.getuid() == 0, 'Root can always open files for writing')
def test_read_only_lock_file(self):
file = tempfile.NamedTemporaryFile()
os.chmod(file.name, 0o444)
lock = UpgradableLock(file.name)
self.assert_raises(UpgradableLock.WriteLockFailed, lock.upgrade)
lock.release()
@pytest.fixture()
def lockpath(tmpdir):
return str(tmpdir.join('lock'))
class TestExclusiveLock:
def test_checks(self, lockpath):
with ExclusiveLock(lockpath, timeout=1) as lock:
assert lock.is_locked() and lock.by_me()
def test_acquire_break_reacquire(self, lockpath):
lock = ExclusiveLock(lockpath, id=ID1).acquire()
lock.break_lock()
with ExclusiveLock(lockpath, id=ID2):
pass
def test_timeout(self, lockpath):
with ExclusiveLock(lockpath, id=ID1):
with pytest.raises(ExclusiveLock.LockTimeout):
ExclusiveLock(lockpath, id=ID2, timeout=0.1).acquire()
class TestUpgradableLock:
def test_shared(self, lockpath):
lock1 = UpgradableLock(lockpath, exclusive=False, id=ID1).acquire()
lock2 = UpgradableLock(lockpath, exclusive=False, id=ID2).acquire()
assert len(lock1._roster.get(SHARED)) == 2
assert len(lock1._roster.get(EXCLUSIVE)) == 0
lock1.release()
lock2.release()
def test_exclusive(self, lockpath):
with UpgradableLock(lockpath, exclusive=True, id=ID1) as lock:
assert len(lock._roster.get(SHARED)) == 0
assert len(lock._roster.get(EXCLUSIVE)) == 1
def test_upgrade(self, lockpath):
with UpgradableLock(lockpath, exclusive=False) as lock:
lock.upgrade()
lock.upgrade() # NOP
assert len(lock._roster.get(SHARED)) == 0
assert len(lock._roster.get(EXCLUSIVE)) == 1
def test_downgrade(self, lockpath):
with UpgradableLock(lockpath, exclusive=True) as lock:
lock.downgrade()
lock.downgrade() # NOP
assert len(lock._roster.get(SHARED)) == 1
assert len(lock._roster.get(EXCLUSIVE)) == 0
def test_break(self, lockpath):
lock = UpgradableLock(lockpath, exclusive=True, id=ID1).acquire()
lock.break_lock()
assert len(lock._roster.get(SHARED)) == 0
assert len(lock._roster.get(EXCLUSIVE)) == 0
with UpgradableLock(lockpath, exclusive=True, id=ID2):
pass
@pytest.fixture()
def rosterpath(tmpdir):
return str(tmpdir.join('roster'))
class TestLockRoster:
def test_empty(self, rosterpath):
roster = LockRoster(rosterpath)
empty = roster.load()
roster.save(empty)
assert empty == {}
def test_modify_get(self, rosterpath):
roster1 = LockRoster(rosterpath, id=ID1)
assert roster1.get(SHARED) == set()
roster1.modify(SHARED, ADD)
assert roster1.get(SHARED) == {ID1, }
roster2 = LockRoster(rosterpath, id=ID2)
roster2.modify(SHARED, ADD)
assert roster2.get(SHARED) == {ID1, ID2, }
roster1 = LockRoster(rosterpath, id=ID1)
roster1.modify(SHARED, REMOVE)
assert roster1.get(SHARED) == {ID2, }
roster2 = LockRoster(rosterpath, id=ID2)
roster2.modify(SHARED, REMOVE)
assert roster2.get(SHARED) == set()

View File

@ -157,9 +157,9 @@ class RepositoryCommitTestCase(RepositoryTestCaseBase):
for name in os.listdir(self.repository.path):
if name.startswith('index.'):
os.unlink(os.path.join(self.repository.path, name))
with patch.object(UpgradableLock, 'upgrade', side_effect=UpgradableLock.WriteLockFailed) as upgrade:
with patch.object(UpgradableLock, 'upgrade', side_effect=UpgradableLock.ExclusiveLockFailed) as upgrade:
self.reopen()
self.assert_raises(UpgradableLock.WriteLockFailed, lambda: len(self.repository))
self.assert_raises(UpgradableLock.ExclusiveLockFailed, lambda: len(self.repository))
upgrade.assert_called_once_with()
def test_crash_before_write_index(self):