1
0
Fork 0
mirror of https://github.com/borgbackup/borg.git synced 2025-03-15 00:21:56 +00:00

Merge pull request #94 from ThomasWaldmann/locking

New Locking code
This commit is contained in:
TW 2015-07-13 19:27:24 +02:00
commit b3f135c642
8 changed files with 442 additions and 76 deletions

View file

@ -10,8 +10,9 @@ import tarfile
import tempfile import tempfile
from .key import PlaintextKey from .key import PlaintextKey
from .helpers import Error, get_cache_dir, decode_dict, st_mtime_ns, unhexlify, UpgradableLock, int_to_bigint, \ from .helpers import Error, get_cache_dir, decode_dict, st_mtime_ns, unhexlify, int_to_bigint, \
bigint_to_int bigint_to_int
from .locking import UpgradableLock
from .hashindex import ChunkIndex from .hashindex import ChunkIndex
@ -123,7 +124,7 @@ class Cache:
def open(self): def open(self):
if not os.path.isdir(self.path): if not os.path.isdir(self.path):
raise Exception('%s Does not look like a Borg cache' % self.path) raise Exception('%s Does not look like a Borg cache' % self.path)
self.lock = UpgradableLock(os.path.join(self.path, 'config'), exclusive=True) self.lock = UpgradableLock(os.path.join(self.path, 'repo'), exclusive=True).acquire()
self.rollback() self.rollback()
def close(self): def close(self):

View file

@ -2,7 +2,6 @@ import argparse
import binascii import binascii
from collections import namedtuple from collections import namedtuple
import grp import grp
import msgpack
import os import os
import pwd import pwd
import re import re
@ -11,7 +10,8 @@ import time
from datetime import datetime, timezone, timedelta from datetime import datetime, timezone, timedelta
from fnmatch import translate from fnmatch import translate
from operator import attrgetter from operator import attrgetter
import fcntl
import msgpack
from . import hashindex from . import hashindex
from . import chunker from . import chunker
@ -31,46 +31,6 @@ class ExtensionModuleError(Error):
"""The Borg binary extension modules do not seem to be properly installed""" """The Borg binary extension modules do not seem to be properly installed"""
class UpgradableLock:
class ReadLockFailed(Error):
"""Failed to acquire read lock on {}"""
class WriteLockFailed(Error):
"""Failed to acquire write lock on {}"""
def __init__(self, path, exclusive=False):
self.path = path
try:
self.fd = open(path, 'r+')
except IOError:
self.fd = open(path, 'r')
try:
if exclusive:
fcntl.lockf(self.fd, fcntl.LOCK_EX)
else:
fcntl.lockf(self.fd, fcntl.LOCK_SH)
# Python 3.2 raises IOError, Python3.3+ raises OSError
except (IOError, OSError):
if exclusive:
raise self.WriteLockFailed(self.path)
else:
raise self.ReadLockFailed(self.path)
self.is_exclusive = exclusive
def upgrade(self):
try:
fcntl.lockf(self.fd, fcntl.LOCK_EX)
# Python 3.2 raises IOError, Python3.3+ raises OSError
except (IOError, OSError):
raise self.WriteLockFailed(self.path)
self.is_exclusive = True
def release(self):
fcntl.lockf(self.fd, fcntl.LOCK_UN)
self.fd.close()
def check_extension_modules(): def check_extension_modules():
from . import platform from . import platform
if hashindex.API_VERSION != 2: if hashindex.API_VERSION != 2:

286
borg/locking.py Normal file
View file

@ -0,0 +1,286 @@
import errno
import json
import os
import socket
import threading
import time
from borg.helpers import Error
# Roster operations and lock modes used throughout this module.
ADD = 'add'
REMOVE = 'remove'
SHARED = 'shared'
EXCLUSIVE = 'exclusive'


def get_id():
    """Return a (hostname, pid, thread-id) tuple identifying the caller."""
    # mask the thread id to 32 bits so it serializes compactly and portably
    thread_id = threading.current_thread().ident & 0xffffffff
    return socket.gethostname(), os.getpid(), thread_id
class TimeoutTimer:
    """
    A timer for timeout checks (can also deal with no timeout, give timeout=None [default]).

    It can also compute and optionally execute a reasonable sleep time, e.g. to
    avoid polling too often or to support thread/process rescheduling.
    """
    def __init__(self, timeout=None, sleep=None):
        """
        Initialize a timer.

        :param timeout: time out interval [s] or None (no timeout)
        :param sleep: sleep interval [s] (>= 0: do sleep call, <0: don't call sleep)
                      or None (autocompute: use 10% of timeout, or 1s for no timeout)
        """
        if timeout is not None and timeout < 0:
            raise ValueError("timeout must be >= 0")
        self.timeout_interval = timeout
        if sleep is None:
            # default polling interval: 10% of the timeout, or 1s when unbounded
            sleep = 1.0 if timeout is None else timeout / 10.0
        self.sleep_interval = sleep
        self.start_time = None
        self.end_time = None

    def __repr__(self):
        return "<%s: start=%r end=%r timeout=%r sleep=%r>" % (
            self.__class__.__name__, self.start_time, self.end_time,
            self.timeout_interval, self.sleep_interval)

    def start(self):
        """Start (or restart) the timer; return self for call chaining."""
        now = time.time()
        self.start_time = now
        if self.timeout_interval is not None:
            self.end_time = now + self.timeout_interval
        return self

    def sleep(self):
        """Sleep for the configured interval (negative interval: no-op)."""
        if self.sleep_interval >= 0:
            time.sleep(self.sleep_interval)

    def timed_out(self):
        """Return True if a timeout was configured and has expired."""
        if self.end_time is None:
            return False
        return time.time() >= self.end_time

    def timed_out_or_sleep(self):
        """Return True when timed out; otherwise sleep once and return False."""
        if self.timed_out():
            return True
        self.sleep()
        return False
class ExclusiveLock:
    """An exclusive Lock based on mkdir fs operation being atomic.

    The lock is represented by the directory <path>: mkdir either creates it
    (lock acquired) or fails because it already exists (lock held).  A file
    named after the holder's id inside that directory records ownership, so
    re-acquisition by the same id succeeds and releases can be validated.
    Works across processes and across machines sharing a filesystem.
    """
    # NOTE: the docstrings of these nested exceptions are message templates
    # (borg's Error formats them with the constructor args) - do not reword.
    class LockError(Error):
        """Failed to acquire the lock {}."""
    class LockTimeout(LockError):
        """Failed to create/acquire the lock {} (timeout)."""
    class LockFailed(LockError):
        """Failed to create/acquire the lock {} ({})."""
    class UnlockError(Error):
        """Failed to release the lock {}."""
    class NotLocked(UnlockError):
        """Failed to release the lock {} (was not locked)."""
    class NotMyLock(UnlockError):
        """Failed to release the lock {} (was/is locked, but not by me)."""

    def __init__(self, path, timeout=None, sleep=None, id=None):
        """
        :param path: directory used as the lock (created atomically via mkdir)
        :param timeout: seconds to keep retrying acquire, None = retry forever
        :param sleep: seconds to sleep between retries (see TimeoutTimer)
        :param id: identification tuple of the holder, default: this thread's id
        """
        self.timeout = timeout
        self.sleep = sleep
        self.path = os.path.abspath(path)
        self.id = id or get_id()
        # unique ownership marker inside the lock dir: <hostname>.<pid>-<tid hex>
        self.unique_name = os.path.join(self.path, "%s.%d-%x" % self.id)

    def __enter__(self):
        return self.acquire()

    def __exit__(self, *exc):
        self.release()

    def __repr__(self):
        return "<%s: %r>" % (self.__class__.__name__, self.unique_name)

    def acquire(self, timeout=None, sleep=None):
        """Acquire the lock, retrying until success or timeout; return self.

        Acquiring a lock we already hold (same id) succeeds immediately.
        Raises LockTimeout on timeout, LockFailed on other mkdir errors.
        """
        if timeout is None:
            timeout = self.timeout
        if sleep is None:
            sleep = self.sleep
        timer = TimeoutTimer(timeout, sleep).start()
        while True:
            try:
                os.mkdir(self.path)  # atomic: exactly one contender succeeds
            except OSError as err:
                if err.errno == errno.EEXIST:  # already locked
                    if self.by_me():
                        return self  # held by us already - reentrant success
                    if timer.timed_out_or_sleep():
                        raise self.LockTimeout(self.path)
                else:
                    raise self.LockFailed(self.path, str(err))
            else:
                # we got the lock - record our ownership inside the lock dir
                with open(self.unique_name, "wb"):
                    pass
                return self

    def release(self):
        """Release the lock; raise NotLocked / NotMyLock if we do not hold it."""
        if not self.is_locked():
            raise self.NotLocked(self.path)
        if not self.by_me():
            raise self.NotMyLock(self.path)
        os.unlink(self.unique_name)
        os.rmdir(self.path)

    def is_locked(self):
        """Return True if the lock is held by anybody."""
        return os.path.exists(self.path)

    def by_me(self):
        """Return True if the lock is held by us (our unique marker exists)."""
        return os.path.exists(self.unique_name)

    def break_lock(self):
        """Forcibly remove the lock, no matter who is holding it."""
        if self.is_locked():
            for name in os.listdir(self.path):
                os.unlink(os.path.join(self.path, name))
            os.rmdir(self.path)
class LockRoster:
    """
    A Lock Roster to track shared/exclusive lockers.

    The roster is persisted as a small JSON file at *path*.
    Note: you usually should call the methods with an exclusive lock held,
    to avoid conflicting access by multiple threads/processes/machines.
    """
    def __init__(self, path, id=None):
        self.path = path
        self.id = id or get_id()

    def load(self):
        """Read the roster from disk; a missing file yields an empty roster."""
        try:
            with open(self.path) as f:
                return json.load(f)
        except IOError as err:
            if err.errno != errno.ENOENT:
                raise
            return {}

    def save(self, data):
        """Persist the roster dict *data* as JSON."""
        with open(self.path, "w") as f:
            json.dump(data, f)

    def remove(self):
        """Delete the roster file."""
        os.unlink(self.path)

    def get(self, key):
        """Return the set of locker id tuples registered under *key*."""
        return set(tuple(entry) for entry in self.load().get(key, []))

    def modify(self, key, op):
        """Register (op=ADD) or deregister (op=REMOVE) our id under *key*."""
        roster = self.load()
        # JSON stores lists; normalize to a set of tuples for set arithmetic
        entries = set(tuple(entry) for entry in roster.get(key, []))
        if op == ADD:
            entries.add(self.id)
        elif op == REMOVE:
            entries.remove(self.id)
        else:
            raise ValueError('Unknown LockRoster op %r' % op)
        roster[key] = [list(entry) for entry in entries]
        self.save(roster)
class UpgradableLock:
    """
    A Lock for a resource that can be accessed in a shared or exclusive way.
    Typically, write access to a resource needs an exclusive lock (1 writer,
    no one is allowed reading) and read access to a resource needs a shared
    lock (multiple readers are allowed).
    """
    # NOTE: the docstrings of these nested exceptions are message templates
    # (borg's Error formats them with the constructor args) - do not reword.
    class SharedLockFailed(Error):
        """Failed to acquire shared lock [{}]"""
    class ExclusiveLockFailed(Error):
        """Failed to acquire write lock [{}]"""

    def __init__(self, path, exclusive=False, sleep=None, id=None):
        """
        :param path: base path; <path>.roster and <path>.lock files are derived
        :param exclusive: True: want an exclusive lock, False: want a shared lock
        :param sleep: seconds between polls while waiting for readers (default 0.2)
        :param id: identification tuple of the locker, default: this thread's id
        """
        self.path = path
        self.is_exclusive = exclusive
        self.sleep = sleep
        self.id = id or get_id()
        # globally keeping track of shared and exclusive lockers:
        self._roster = LockRoster(path + '.roster', id=id)
        # an exclusive lock, used for:
        # - holding while doing roster queries / updates
        # - holding while the UpgradableLock itself is exclusive
        self._lock = ExclusiveLock(path + '.lock', id=id)

    def __enter__(self):
        return self.acquire()

    def __exit__(self, *exc):
        self.release()

    def __repr__(self):
        return "<%s: %r>" % (self.__class__.__name__, self.id)

    def acquire(self, exclusive=None, remove=None, sleep=None):
        """Acquire the lock in shared or exclusive mode; return self.

        :param exclusive: override the mode chosen at construction time
        :param remove: roster key (SHARED/EXCLUSIVE) to deregister first,
                       used by upgrade()/downgrade() to switch modes atomically
        :param sleep: seconds between polls while waiting for readers
        Raises SharedLockFailed / ExclusiveLockFailed on failure.
        """
        if exclusive is None:
            exclusive = self.is_exclusive
        sleep = sleep or self.sleep or 0.2
        try:
            if exclusive:
                # returns with self._lock held, all readers gone
                self._wait_for_readers_finishing(remove, sleep)
                self._roster.modify(EXCLUSIVE, ADD)
            else:
                # only hold self._lock briefly, to update the roster
                with self._lock:
                    if remove is not None:
                        self._roster.modify(remove, REMOVE)
                    self._roster.modify(SHARED, ADD)
            self.is_exclusive = exclusive
            return self
        except ExclusiveLock.LockError as err:
            msg = str(err)
            if exclusive:
                raise self.ExclusiveLockFailed(msg)
            else:
                raise self.SharedLockFailed(msg)

    def _wait_for_readers_finishing(self, remove, sleep):
        """Poll until no shared lockers remain; keep self._lock held on return."""
        while True:
            self._lock.acquire()
            if remove is not None:
                # drop our previous registration exactly once (first iteration)
                self._roster.modify(remove, REMOVE)
                remove = None
            if len(self._roster.get(SHARED)) == 0:
                return  # we are the only one and we keep the lock!
            # readers still present: let go of the lock so they can finish
            self._lock.release()
            time.sleep(sleep)

    def release(self):
        """Release the lock: deregister from the roster, free the inner lock."""
        if self.is_exclusive:
            self._roster.modify(EXCLUSIVE, REMOVE)
            self._lock.release()
        else:
            with self._lock:
                self._roster.modify(SHARED, REMOVE)

    def upgrade(self):
        """Upgrade a shared lock to an exclusive one (no-op if already exclusive)."""
        if not self.is_exclusive:
            self.acquire(exclusive=True, remove=SHARED)

    def downgrade(self):
        """Downgrade an exclusive lock to a shared one (no-op if already shared)."""
        if self.is_exclusive:
            self.acquire(exclusive=False, remove=EXCLUSIVE)

    def break_lock(self):
        """Forcibly remove roster and inner lock, no matter who holds them."""
        self._roster.remove()
        self._lock.break_lock()

View file

@ -9,7 +9,8 @@ import sys
from zlib import crc32 from zlib import crc32
from .hashindex import NSIndex from .hashindex import NSIndex
from .helpers import Error, IntegrityError, read_msgpack, write_msgpack, unhexlify, UpgradableLock from .helpers import Error, IntegrityError, read_msgpack, write_msgpack, unhexlify
from .locking import UpgradableLock
from .lrucache import LRUCache from .lrucache import LRUCache
MAX_OBJECT_SIZE = 20 * 1024 * 1024 MAX_OBJECT_SIZE = 20 * 1024 * 1024
@ -113,11 +114,11 @@ class Repository:
self.path = path self.path = path
if not os.path.isdir(path): if not os.path.isdir(path):
raise self.DoesNotExist(path) raise self.DoesNotExist(path)
self.lock = UpgradableLock(os.path.join(path, 'repo'), exclusive).acquire()
self.config = RawConfigParser() self.config = RawConfigParser()
self.config.read(os.path.join(self.path, 'config')) self.config.read(os.path.join(self.path, 'config'))
if 'repository' not in self.config.sections() or self.config.getint('repository', 'version') != 1: if 'repository' not in self.config.sections() or self.config.getint('repository', 'version') != 1:
raise self.InvalidRepository(path) raise self.InvalidRepository(path)
self.lock = UpgradableLock(os.path.join(path, 'config'), exclusive)
self.max_segment_size = self.config.getint('repository', 'max_segment_size') self.max_segment_size = self.config.getint('repository', 'max_segment_size')
self.segments_per_dir = self.config.getint('repository', 'segments_per_dir') self.segments_per_dir = self.config.getint('repository', 'segments_per_dir')
self.id = unhexlify(self.config.get('repository', 'id').strip()) self.id = unhexlify(self.config.get('repository', 'id').strip())
@ -148,7 +149,7 @@ class Repository:
self._active_txn = True self._active_txn = True
try: try:
self.lock.upgrade() self.lock.upgrade()
except UpgradableLock.WriteLockFailed: except UpgradableLock.ExclusiveLockFailed:
# if upgrading the lock to exclusive fails, we do not have an # if upgrading the lock to exclusive fails, we do not have an
# active transaction. this is important for "serve" mode, where # active transaction. this is important for "serve" mode, where
# the repository instance lives on - even if exceptions happened. # the repository instance lives on - even if exceptions happened.

View file

@ -1,13 +1,11 @@
import hashlib import hashlib
from time import mktime, strptime from time import mktime, strptime
from datetime import datetime, timezone, timedelta from datetime import datetime, timezone, timedelta
import os
import tempfile
import unittest
import msgpack import msgpack
from ..helpers import adjust_patterns, exclude_path, Location, format_timedelta, ExcludePattern, make_path_safe, UpgradableLock, prune_within, prune_split, \ from ..helpers import adjust_patterns, exclude_path, Location, format_timedelta, ExcludePattern, make_path_safe, \
prune_within, prune_split, \
StableDict, int_to_bigint, bigint_to_int, parse_timestamp StableDict, int_to_bigint, bigint_to_int, parse_timestamp
from . import BaseTestCase from . import BaseTestCase
@ -119,24 +117,6 @@ class MakePathSafeTestCase(BaseTestCase):
self.assert_equal(make_path_safe('/'), '.') self.assert_equal(make_path_safe('/'), '.')
class UpgradableLockTestCase(BaseTestCase):
def test(self):
file = tempfile.NamedTemporaryFile()
lock = UpgradableLock(file.name)
lock.upgrade()
lock.upgrade()
lock.release()
@unittest.skipIf(os.getuid() == 0, 'Root can always open files for writing')
def test_read_only_lock_file(self):
file = tempfile.NamedTemporaryFile()
os.chmod(file.name, 0o444)
lock = UpgradableLock(file.name)
self.assert_raises(UpgradableLock.WriteLockFailed, lock.upgrade)
lock.release()
class MockArchive: class MockArchive:
def __init__(self, ts): def __init__(self, ts):

121
borg/testsuite/locking.py Normal file
View file

@ -0,0 +1,121 @@
import time
import pytest
from ..locking import get_id, TimeoutTimer, ExclusiveLock , UpgradableLock, LockRoster, ADD, REMOVE, SHARED, EXCLUSIVE
# Fixed fake locker ids so tests are independent of the real host/pid/tid.
ID1 = ("foo", 1, 1)
ID2 = ("bar", 2, 2)


def test_id():
    """get_id() yields a plausible (hostname, pid, thread-id) triple."""
    host, pid, tid = get_id()
    assert isinstance(host, str)
    assert len(host) > 0
    assert isinstance(pid, int)
    assert pid > 0
    assert isinstance(tid, int)
class TestTimeoutTimer:
    def test_timeout(self):
        """A started timer expires once the timeout interval has passed."""
        limit = 0.5
        timer = TimeoutTimer(limit).start()
        assert not timer.timed_out()
        time.sleep(limit * 1.5)
        assert timer.timed_out()

    def test_notimeout_sleep(self):
        """Without a timeout the timer never expires but sleeps on each poll."""
        pause = 0.5
        timer = TimeoutTimer(None, pause).start()
        for polls in (1, 2):
            assert not timer.timed_out_or_sleep()
            assert time.time() >= timer.start_time + polls * pause
@pytest.fixture()
def lockpath(tmpdir):
    # path for a lock inside pytest's per-test temporary directory
    return str(tmpdir.join('lock'))
class TestExclusiveLock:
    def test_checks(self, lockpath):
        """While held, the lock reports locked and locked-by-me."""
        with ExclusiveLock(lockpath, timeout=1) as lock:
            assert lock.is_locked()
            assert lock.by_me()

    def test_acquire_break_reacquire(self, lockpath):
        """After break_lock(), somebody else can acquire the lock."""
        first = ExclusiveLock(lockpath, id=ID1).acquire()
        first.break_lock()
        with ExclusiveLock(lockpath, id=ID2):
            pass

    def test_timeout(self, lockpath):
        """Trying to acquire a held lock raises LockTimeout after the timeout."""
        with ExclusiveLock(lockpath, id=ID1):
            contender = ExclusiveLock(lockpath, id=ID2, timeout=0.1)
            with pytest.raises(ExclusiveLock.LockTimeout):
                contender.acquire()
class TestUpgradableLock:
    def test_shared(self, lockpath):
        """Two shared lockers coexist; both show up in the roster."""
        lock1 = UpgradableLock(lockpath, exclusive=False, id=ID1).acquire()
        lock2 = UpgradableLock(lockpath, exclusive=False, id=ID2).acquire()
        assert len(lock1._roster.get(SHARED)) == 2
        assert len(lock1._roster.get(EXCLUSIVE)) == 0
        lock1.release()
        lock2.release()

    def test_exclusive(self, lockpath):
        """An exclusive locker registers under EXCLUSIVE only."""
        with UpgradableLock(lockpath, exclusive=True, id=ID1) as lock:
            assert len(lock._roster.get(SHARED)) == 0
            assert len(lock._roster.get(EXCLUSIVE)) == 1

    def test_upgrade(self, lockpath):
        """upgrade() turns a shared lock exclusive; repeating it is a no-op."""
        with UpgradableLock(lockpath, exclusive=False) as lock:
            lock.upgrade()
            lock.upgrade()  # NOP
            assert len(lock._roster.get(SHARED)) == 0
            assert len(lock._roster.get(EXCLUSIVE)) == 1

    def test_downgrade(self, lockpath):
        """downgrade() turns an exclusive lock shared; repeating it is a no-op."""
        with UpgradableLock(lockpath, exclusive=True) as lock:
            lock.downgrade()
            lock.downgrade()  # NOP
            assert len(lock._roster.get(SHARED)) == 1
            assert len(lock._roster.get(EXCLUSIVE)) == 0

    def test_break(self, lockpath):
        """break_lock() wipes the roster and frees the lock for others."""
        lock = UpgradableLock(lockpath, exclusive=True, id=ID1).acquire()
        lock.break_lock()
        assert len(lock._roster.get(SHARED)) == 0
        assert len(lock._roster.get(EXCLUSIVE)) == 0
        with UpgradableLock(lockpath, exclusive=True, id=ID2):
            pass
@pytest.fixture()
def rosterpath(tmpdir):
    # path for a roster file inside pytest's per-test temporary directory
    return str(tmpdir.join('roster'))
class TestLockRoster:
    def test_empty(self, rosterpath):
        """A fresh roster loads as an empty dict and round-trips cleanly."""
        roster = LockRoster(rosterpath)
        state = roster.load()
        roster.save(state)
        assert state == {}

    def test_modify_get(self, rosterpath):
        """ADD/REMOVE register and deregister exactly the caller's own id."""
        roster1 = LockRoster(rosterpath, id=ID1)
        assert roster1.get(SHARED) == set()
        roster1.modify(SHARED, ADD)
        assert roster1.get(SHARED) == {ID1}
        roster2 = LockRoster(rosterpath, id=ID2)
        roster2.modify(SHARED, ADD)
        assert roster2.get(SHARED) == {ID1, ID2}
        roster1 = LockRoster(rosterpath, id=ID1)
        roster1.modify(SHARED, REMOVE)
        assert roster1.get(SHARED) == {ID2}
        roster2 = LockRoster(rosterpath, id=ID2)
        roster2.modify(SHARED, REMOVE)
        assert roster2.get(SHARED) == set()

View file

@ -3,7 +3,8 @@ import shutil
import tempfile import tempfile
from ..hashindex import NSIndex from ..hashindex import NSIndex
from ..helpers import Location, IntegrityError, UpgradableLock from ..helpers import Location, IntegrityError
from ..locking import UpgradableLock
from ..remote import RemoteRepository, InvalidRPCMethod from ..remote import RemoteRepository, InvalidRPCMethod
from ..repository import Repository from ..repository import Repository
from . import BaseTestCase from . import BaseTestCase
@ -156,9 +157,9 @@ class RepositoryCommitTestCase(RepositoryTestCaseBase):
for name in os.listdir(self.repository.path): for name in os.listdir(self.repository.path):
if name.startswith('index.'): if name.startswith('index.'):
os.unlink(os.path.join(self.repository.path, name)) os.unlink(os.path.join(self.repository.path, name))
with patch.object(UpgradableLock, 'upgrade', side_effect=UpgradableLock.WriteLockFailed) as upgrade: with patch.object(UpgradableLock, 'upgrade', side_effect=UpgradableLock.ExclusiveLockFailed) as upgrade:
self.reopen() self.reopen()
self.assert_raises(UpgradableLock.WriteLockFailed, lambda: len(self.repository)) self.assert_raises(UpgradableLock.ExclusiveLockFailed, lambda: len(self.repository))
upgrade.assert_called_once_with() upgrade.assert_called_once_with()
def test_crash_before_write_index(self): def test_crash_before_write_index(self):

24
tox.ini
View file

@ -1,11 +1,27 @@
# tox configuration - if you change anything here, run this to verify:
# fakeroot -u tox --recreate
#
# Invocation examples:
# fakeroot -u tox # run all tests
# fakeroot -u tox -e py32 # run all tests, but only on python 3.2
# fakeroot -u tox borg.testsuite.locking # only run 1 test module
# fakeroot -u tox borg.testsuite.locking -- -k '"not Timer"' # exclude some tests
# fakeroot -u tox borg.testsuite -- -v # verbose py.test
#
# Important notes:
# Without fakeroot -u some tests will fail.
# When using -- to give options to py.test, you MUST also give borg.testsuite[.module].
[tox] [tox]
envlist = py32, py33, py34 envlist = py32, py33, py34
[testenv] [testenv]
# Change dir to avoid import problem # Change dir to avoid import problem for cython code. The directory does
changedir = {envdir} # not really matter, should be just different from the toplevel dir.
changedir = {toxworkdir}
deps = deps =
pytest pytest
mock mock
commands = py.test commands = py.test --pyargs {posargs:borg.testsuite}
passenv = * # fakeroot -u needs some env vars # fakeroot -u needs some env vars:
passenv = *