diff --git a/docs/faq.rst b/docs/faq.rst index 6df7b0355..5585d7d09 100644 --- a/docs/faq.rst +++ b/docs/faq.rst @@ -894,7 +894,7 @@ I am having troubles with some network/FUSE/special filesystem, why? -------------------------------------------------------------------- Borg is doing nothing special in the filesystem, it only uses very -common and compatible operations (even the locking is just "mkdir"). +common and compatible operations (even the locking is just "rename"). So, if you are encountering issues like slowness, corruption or malfunction when using a specific filesystem, please try if you can reproduce the issues @@ -1055,7 +1055,7 @@ Here's a (incomplete) list of some major changes: nor the pbkdf2 iteration count in "passphrase" mode) * simple sparse file support, great for virtual machine disk files * can read special files (e.g. block devices) or from stdin, write to stdout -* mkdir-based locking is more compatible than attic's posix locking +* rename-based locking is more compatible than attic's posix locking * uses fadvise to not spoil / blow up the fs cache * better error messages / exception handling * better logging, screen output, progress indication diff --git a/docs/internals/data-structures.rst b/docs/internals/data-structures.rst index 241a25de9..3f54da93b 100644 --- a/docs/internals/data-structures.rst +++ b/docs/internals/data-structures.rst @@ -977,16 +977,19 @@ Lock files Borg uses locks to get (exclusive or shared) access to the cache and the repository. -The locking system is based on creating a directory `lock.exclusive` (for -exclusive locks). Inside the lock directory, there is a file indicating +The locking system is based on renaming a temporary directory +to `lock.exclusive` (for +exclusive locks). Inside this directory, there is a file indicating hostname, process id and thread id of the lock holder. There is also a json file `lock.roster` that keeps a directory of all shared and exclusive lockers. 
-If the process can create the `lock.exclusive` directory for a resource, it has -the lock for it. If creation fails (because the directory has already been -created by some other process), lock acquisition fails. +If the process is able to rename a temporary directory (with the +host/process/thread identifier prepared inside it) in the resource directory +to `lock.exclusive`, it has the lock for it. If renaming fails +(because this directory already exists and its host/process/thread identifier +denotes a thread that is still alive on that host), lock acquisition fails. The cache lock is usually in `~/.cache/borg/REPOID/lock.*`. The repository lock is in `repository/lock.*`. diff --git a/docs/man/borg.1 b/docs/man/borg.1 index f2f69906a..2cd4fb64e 100644 --- a/docs/man/borg.1 +++ b/docs/man/borg.1 @@ -585,7 +585,10 @@ Large repositories may require large files (>2 GB). .IP \(bu 2 Up to 1000 files per directory (10000 for repositories initialized with Borg 1.0) .IP \(bu 2 -mkdir(2) should be atomic, since it is used for locking +rename(2) / MoveFile(Ex) should work as specified, i.e. on the same file system +it should be a move (not a copy) operation, and in case of a directory +it should fail if the destination exists and is not an empty directory, +since this is used for locking .IP \(bu 2 Hardlinks are needed for \fIborg_upgrade\fP \fB\-\-inplace\fP .UNINDENT diff --git a/docs/usage/general/file-systems.rst.inc b/docs/usage/general/file-systems.rst.inc index 5a7d75e85..137697ca9 100644 --- a/docs/usage/general/file-systems.rst.inc +++ b/docs/usage/general/file-systems.rst.inc @@ -22,5 +22,8 @@ and readable after one of the failures mentioned above occurred, run - Typically, file sizes up to a few hundred MB. Large repositories may require large files (>2 GB). - Up to 1000 files per directory (10000 for repositories initialized with Borg 1.0) -- mkdir(2) should be atomic, since it is used for locking +- rename(2) / MoveFile(Ex) should work as specified, i.e. 
on the same file system + it should be a move (not a copy) operation, and in case of a directory + it should fail if the destination exists and is not an empty directory, + since this is used for locking. - Hardlinks are needed for :ref:`borg_upgrade` ``--inplace`` diff --git a/src/borg/locking.py b/src/borg/locking.py index 37d1f8f7b..34f5281fb 100644 --- a/src/borg/locking.py +++ b/src/borg/locking.py @@ -1,6 +1,7 @@ import errno import json import os +import tempfile import time from . import platform @@ -124,22 +125,42 @@ class ExclusiveLock: timeout = self.timeout if sleep is None: sleep = self.sleep - timer = TimeoutTimer(timeout, sleep).start() - while True: - try: - os.mkdir(self.path) - except FileExistsError: # already locked - if self.by_me(): + parent_path, base_name = os.path.split(self.path) + unique_base_name = os.path.basename(self.unique_name) + temp_path = None + try: + temp_path = tempfile.mkdtemp(".tmp", base_name + '.', parent_path) + temp_unique_name = os.path.join(temp_path, unique_base_name) + with open(temp_unique_name, "wb"): + pass + except OSError as err: + raise LockFailed(self.path, str(err)) from None + else: + timer = TimeoutTimer(timeout, sleep).start() + while True: + try: + os.rename(temp_path, self.path) + except OSError: # already locked + if self.by_me(): + return self + self.kill_stale_lock() + if timer.timed_out_or_sleep(): + raise LockTimeout(self.path) from None + else: + temp_path = None # see finally:-block below return self - self.kill_stale_lock() - if timer.timed_out_or_sleep(): - raise LockTimeout(self.path) - except OSError as err: - raise LockFailed(self.path, str(err)) from None - else: - with open(self.unique_name, "wb"): + finally: + if temp_path is not None: + # Renaming failed for some reason, so temp_path still exists and + # should be cleaned up anyway. Try to clean up, but don't crash. 
+ try: + os.unlink(temp_unique_name) + except: + pass + try: + os.rmdir(temp_path) + except: pass - return self def release(self): if not self.is_locked(): @@ -147,7 +168,15 @@ class ExclusiveLock: if not self.by_me(): raise NotMyLock(self.path) os.unlink(self.unique_name) - os.rmdir(self.path) + try: + os.rmdir(self.path) + except OSError as err: + if err.errno not in (errno.ENOTEMPTY, errno.ENOENT): + # EACCES or EIO or ... = we cannot operate anyway, so re-throw + raise err + # else: + # Directory is not empty or doesn't exist any more. + # this means we lost the race to somebody else -- which is ok. def is_locked(self): return os.path.exists(self.path) @@ -156,42 +185,47 @@ class ExclusiveLock: return os.path.exists(self.unique_name) def kill_stale_lock(self): - for name in os.listdir(self.path): - try: - host_pid, thread_str = name.rsplit('-', 1) - host, pid_str = host_pid.rsplit('.', 1) - pid = int(pid_str) - thread = int(thread_str) - except ValueError: - # Malformed lock name? Or just some new format we don't understand? - logger.error("Found malformed lock %s in %s. Please check/fix manually.", name, self.path) - return False + try: + names = os.listdir(self.path) + except FileNotFoundError: # another process did our job in the meantime. + pass + else: + for name in names: + try: + host_pid, thread_str = name.rsplit('-', 1) + host, pid_str = host_pid.rsplit('.', 1) + pid = int(pid_str) + thread = int(thread_str) + except ValueError: + # Malformed lock name? Or just some new format we don't understand? + logger.error("Found malformed lock %s in %s. 
Please check/fix manually.", name, self.path) + return False - if platform.process_alive(host, pid, thread): - return False + if platform.process_alive(host, pid, thread): + return False - if not self.kill_stale_locks: - if not self.stale_warning_printed: - # Log this at warning level to hint the user at the ability - logger.warning("Found stale lock %s, but not deleting because self.kill_stale_locks = False.", name) - self.stale_warning_printed = True - return False + if not self.kill_stale_locks: + if not self.stale_warning_printed: + # Log this at warning level to hint the user at the ability + logger.warning("Found stale lock %s, but not deleting because self.kill_stale_locks = False.", name) + self.stale_warning_printed = True + return False - try: - os.unlink(os.path.join(self.path, name)) - logger.warning('Killed stale lock %s.', name) - except OSError as err: - if not self.stale_warning_printed: - # This error will bubble up and likely result in locking failure - logger.error('Found stale lock %s, but cannot delete due to %s', name, str(err)) - self.stale_warning_printed = True - return False + try: + os.unlink(os.path.join(self.path, name)) + logger.warning('Killed stale lock %s.', name) + except OSError as err: + if not self.stale_warning_printed: + # This error will bubble up and likely result in locking failure + logger.error('Found stale lock %s, but cannot delete due to %s', name, str(err)) + self.stale_warning_printed = True + return False try: os.rmdir(self.path) except OSError as err: - if err.errno == errno.ENOTEMPTY: - # Directory is not empty = we lost the race to somebody else + if err.errno == errno.ENOTEMPTY or err.errno == errno.ENOENT: + # Directory is not empty or doesn't exist any more = we lost the race to somebody else--which is ok. return False # EACCES or EIO or ... 
= we cannot operate anyway logger.error('Failed to remove lock dir: %s', str(err)) diff --git a/src/borg/testsuite/locking.py b/src/borg/testsuite/locking.py index de14e394a..7a0f219cd 100644 --- a/src/borg/testsuite/locking.py +++ b/src/borg/testsuite/locking.py @@ -1,5 +1,7 @@ import random import time +from threading import Thread, Lock as ThreadingLock +from traceback import format_exc import pytest @@ -10,6 +12,8 @@ from ..locking import TimeoutTimer, ExclusiveLock, Lock, LockRoster, \ ID1 = "foo", 1, 1 ID2 = "bar", 2, 2 +RACE_TEST_NUM_THREADS = 40 +RACE_TEST_DURATION = 0.4 # seconds @pytest.fixture() @@ -90,6 +94,78 @@ class TestExclusiveLock: assert lock.by_me() # we still have the lock assert old_unique_name != new_unique_name # locking filename is different now + def test_race_condition(self, lockpath): + + class SynchronizedCounter: + + def __init__(self, count=0): + self.lock = ThreadingLock() + self.count = count + self.maxcount = count + + def value(self): + with self.lock: + return self.count + + def maxvalue(self): + with self.lock: + return self.maxcount + + def incr(self): + with self.lock: + self.count += 1 + if self.count > self.maxcount: + self.maxcount = self.count + return self.count + + def decr(self): + with self.lock: + self.count -= 1 + return self.count + + def print_locked(msg): + with print_lock: + print(msg) + + def acquire_release_loop(id, timeout, thread_id, lock_owner_counter, exception_counter, print_lock, last_thread=None): + print_locked("Thread %2d: Starting acquire_release_loop(id=%s, timeout=%d); lockpath=%s" % (thread_id, id, timeout, lockpath)) + timer = TimeoutTimer(timeout, -1).start() + cycle = 0 + + while not timer.timed_out(): + cycle += 1 + try: + with ExclusiveLock(lockpath, id=id, timeout=timeout/20, sleep=-1): # This timeout is only for not exceeding the given timeout by more than 5%. With sleep<0 it's constantly polling anyway. 
+ lock_owner_count = lock_owner_counter.incr() + print_locked("Thread %2d: Acquired the lock. It's my %d. loop cycle. I am the %d. who has the lock concurrently." % (thread_id, cycle, lock_owner_count)) + time.sleep(0.005) + lock_owner_count = lock_owner_counter.decr() + print_locked("Thread %2d: Releasing the lock, finishing my %d. loop cycle. Currently, %d colleagues still have the lock." % (thread_id, cycle, lock_owner_count)) + except LockTimeout: + print_locked("Thread %2d: Got LockTimeout, finishing my %d. loop cycle." % (thread_id, cycle)) + except: + exception_count = exception_counter.incr() + e = format_exc() + print_locked("Thread %2d: Exception thrown, finishing my %d. loop cycle. It's the %d. exception seen until now: %s" % (thread_id, cycle, exception_count, e)) + + print_locked("Thread %2d: Loop timed out--terminating after %d loop cycles." % (thread_id, cycle)) + if last_thread is not None: # joining its predecessor, if any + last_thread.join() + + print('') + lock_owner_counter = SynchronizedCounter() + exception_counter = SynchronizedCounter() + print_lock = ThreadingLock() + thread = None + for thread_id in range(RACE_TEST_NUM_THREADS): + thread = Thread(target=acquire_release_loop, args=(('foo', thread_id, 0), RACE_TEST_DURATION, thread_id, lock_owner_counter, exception_counter, print_lock, thread)) + thread.start() + thread.join() # joining the last thread + + assert lock_owner_counter.maxvalue() > 0, 'Never gained the lock? Something went wrong here...' + assert lock_owner_counter.maxvalue() <= 1, "Maximal number of concurrent lock holders was %d. So exclusivity is broken." % (lock_owner_counter.maxvalue()) + assert exception_counter.value() == 0, "ExclusiveLock threw %d exceptions due to unclean concurrency handling." % (exception_counter.value()) + class TestLock: def test_shared(self, lockpath):