mirror of
https://github.com/borgbackup/borg.git
synced 2024-12-24 08:45:13 +00:00
locking: fix ExclusiveLock race condition bug, fixes #4923 - ExclusiveLock is now based on os.rename instead of os.mkdir. - catch FileNotFoundError observed under race condition in ExclusiveLock.release() and .kill_stale_lock() - added TestExclusiveLock.test_race_condition() which reveals issue #4923 - updated docs - locking: use "raise LockTimeout from None" for prettier traceback Co-authored-by: Thomas Portmann <thomas@portmann.org> Co-authored-by: Thomas Waldmann <tw@waldmann-edv.de>
This commit is contained in:
parent
f2f8692f86
commit
dfc5e915cc
6 changed files with 172 additions and 53 deletions
|
@ -894,7 +894,7 @@ I am having troubles with some network/FUSE/special filesystem, why?
|
|||
--------------------------------------------------------------------
|
||||
|
||||
Borg is doing nothing special in the filesystem, it only uses very
|
||||
common and compatible operations (even the locking is just "mkdir").
|
||||
common and compatible operations (even the locking is just "rename").
|
||||
|
||||
So, if you are encountering issues like slowness, corruption or malfunction
|
||||
when using a specific filesystem, please try if you can reproduce the issues
|
||||
|
@ -1055,7 +1055,7 @@ Here's a (incomplete) list of some major changes:
|
|||
nor the pbkdf2 iteration count in "passphrase" mode)
|
||||
* simple sparse file support, great for virtual machine disk files
|
||||
* can read special files (e.g. block devices) or from stdin, write to stdout
|
||||
* mkdir-based locking is more compatible than attic's posix locking
|
||||
* rename-based locking is more compatible than attic's posix locking
|
||||
* uses fadvise to not spoil / blow up the fs cache
|
||||
* better error messages / exception handling
|
||||
* better logging, screen output, progress indication
|
||||
|
|
|
@ -977,16 +977,19 @@ Lock files
|
|||
Borg uses locks to get (exclusive or shared) access to the cache and
|
||||
the repository.
|
||||
|
||||
The locking system is based on creating a directory `lock.exclusive` (for
|
||||
exclusive locks). Inside the lock directory, there is a file indicating
|
||||
The locking system is based on renaming a temporary directory
|
||||
to `lock.exclusive` (for
|
||||
exclusive locks). Inside this directory, there is a file indicating
|
||||
hostname, process id and thread id of the lock holder.
|
||||
|
||||
There is also a json file `lock.roster` that keeps a directory of all shared
|
||||
and exclusive lockers.
|
||||
|
||||
If the process can create the `lock.exclusive` directory for a resource, it has
|
||||
the lock for it. If creation fails (because the directory has already been
|
||||
created by some other process), lock acquisition fails.
|
||||
If the process is able to rename a temporary directory (with the
|
||||
host/process/thread identifier prepared inside it) in the resource directory
|
||||
to `lock.exclusive`, it has the lock for it. If renaming fails
|
||||
(because this directory already exists and its host/process/thread identifier
|
||||
denotes a thread on the host which is still alive), lock acquisition fails.
|
||||
|
||||
The cache lock is usually in `~/.cache/borg/REPOID/lock.*`.
|
||||
The repository lock is in `repository/lock.*`.
|
||||
|
|
|
@ -585,7 +585,10 @@ Large repositories may require large files (>2 GB).
|
|||
.IP \(bu 2
|
||||
Up to 1000 files per directory (10000 for repositories initialized with Borg 1.0)
|
||||
.IP \(bu 2
|
||||
mkdir(2) should be atomic, since it is used for locking
|
||||
rename(2) / MoveFile(Ex) should work as specified, i.e. on the same file system
|
||||
it should be a move (not a copy) operation, and in case of a directory
|
||||
it should fail if the destination exists and is not an empty directory,
|
||||
since this is used for locking
|
||||
.IP \(bu 2
|
||||
Hardlinks are needed for \fIborg_upgrade\fP \fB\-\-inplace\fP
|
||||
.UNINDENT
|
||||
|
|
|
@ -22,5 +22,8 @@ and readable after one of the failures mentioned above occurred, run
|
|||
- Typically, file sizes up to a few hundred MB.
|
||||
Large repositories may require large files (>2 GB).
|
||||
- Up to 1000 files per directory (10000 for repositories initialized with Borg 1.0)
|
||||
- mkdir(2) should be atomic, since it is used for locking
|
||||
- rename(2) / MoveFile(Ex) should work as specified, i.e. on the same file system
|
||||
it should be a move (not a copy) operation, and in case of a directory
|
||||
it should fail if the destination exists and is not an empty directory,
|
||||
since this is used for locking.
|
||||
- Hardlinks are needed for :ref:`borg_upgrade` ``--inplace``
|
||||
|
|
|
@ -1,6 +1,7 @@
|
|||
import errno
|
||||
import json
|
||||
import os
|
||||
import tempfile
|
||||
import time
|
||||
|
||||
from . import platform
|
||||
|
@ -124,22 +125,42 @@ def acquire(self, timeout=None, sleep=None):
|
|||
timeout = self.timeout
|
||||
if sleep is None:
|
||||
sleep = self.sleep
|
||||
timer = TimeoutTimer(timeout, sleep).start()
|
||||
while True:
|
||||
try:
|
||||
os.mkdir(self.path)
|
||||
except FileExistsError: # already locked
|
||||
if self.by_me():
|
||||
parent_path, base_name = os.path.split(self.path)
|
||||
unique_base_name = os.path.basename(self.unique_name)
|
||||
temp_path = None
|
||||
try:
|
||||
temp_path = tempfile.mkdtemp(".tmp", base_name + '.', parent_path)
|
||||
temp_unique_name = os.path.join(temp_path, unique_base_name)
|
||||
with open(temp_unique_name, "wb"):
|
||||
pass
|
||||
except OSError as err:
|
||||
raise LockFailed(self.path, str(err)) from None
|
||||
else:
|
||||
timer = TimeoutTimer(timeout, sleep).start()
|
||||
while True:
|
||||
try:
|
||||
os.rename(temp_path, self.path)
|
||||
except OSError: # already locked
|
||||
if self.by_me():
|
||||
return self
|
||||
self.kill_stale_lock()
|
||||
if timer.timed_out_or_sleep():
|
||||
raise LockTimeout(self.path) from None
|
||||
else:
|
||||
temp_path = None # see finally:-block below
|
||||
return self
|
||||
self.kill_stale_lock()
|
||||
if timer.timed_out_or_sleep():
|
||||
raise LockTimeout(self.path)
|
||||
except OSError as err:
|
||||
raise LockFailed(self.path, str(err)) from None
|
||||
else:
|
||||
with open(self.unique_name, "wb"):
|
||||
finally:
|
||||
if temp_path is not None:
|
||||
# Renaming failed for some reason, so temp_dir still exists and
|
||||
# should be cleaned up anyway. Try to clean up, but don't crash.
|
||||
try:
|
||||
os.unlink(temp_unique_name)
|
||||
except:
|
||||
pass
|
||||
try:
|
||||
os.rmdir(temp_path)
|
||||
except:
|
||||
pass
|
||||
return self
|
||||
|
||||
def release(self):
|
||||
if not self.is_locked():
|
||||
|
@ -147,7 +168,15 @@ def release(self):
|
|||
if not self.by_me():
|
||||
raise NotMyLock(self.path)
|
||||
os.unlink(self.unique_name)
|
||||
os.rmdir(self.path)
|
||||
try:
|
||||
os.rmdir(self.path)
|
||||
except OSError as err:
|
||||
if err.errno not in (errno.ENOTEMPTY, errno.ENOENT):
|
||||
# EACCES or EIO or ... = we cannot operate anyway, so re-throw
|
||||
raise err
|
||||
# else:
|
||||
# Directory is not empty or doesn't exist any more.
|
||||
# this means we lost the race to somebody else -- which is ok.
|
||||
|
||||
def is_locked(self):
|
||||
return os.path.exists(self.path)
|
||||
|
@ -156,42 +185,47 @@ def by_me(self):
|
|||
return os.path.exists(self.unique_name)
|
||||
|
||||
def kill_stale_lock(self):
|
||||
for name in os.listdir(self.path):
|
||||
try:
|
||||
host_pid, thread_str = name.rsplit('-', 1)
|
||||
host, pid_str = host_pid.rsplit('.', 1)
|
||||
pid = int(pid_str)
|
||||
thread = int(thread_str)
|
||||
except ValueError:
|
||||
# Malformed lock name? Or just some new format we don't understand?
|
||||
logger.error("Found malformed lock %s in %s. Please check/fix manually.", name, self.path)
|
||||
return False
|
||||
try:
|
||||
names = os.listdir(self.path)
|
||||
except FileNotFoundError: # another process did our job in the meantime.
|
||||
pass
|
||||
else:
|
||||
for name in names:
|
||||
try:
|
||||
host_pid, thread_str = name.rsplit('-', 1)
|
||||
host, pid_str = host_pid.rsplit('.', 1)
|
||||
pid = int(pid_str)
|
||||
thread = int(thread_str)
|
||||
except ValueError:
|
||||
# Malformed lock name? Or just some new format we don't understand?
|
||||
logger.error("Found malformed lock %s in %s. Please check/fix manually.", name, self.path)
|
||||
return False
|
||||
|
||||
if platform.process_alive(host, pid, thread):
|
||||
return False
|
||||
if platform.process_alive(host, pid, thread):
|
||||
return False
|
||||
|
||||
if not self.kill_stale_locks:
|
||||
if not self.stale_warning_printed:
|
||||
# Log this at warning level to hint the user at the ability
|
||||
logger.warning("Found stale lock %s, but not deleting because self.kill_stale_locks = False.", name)
|
||||
self.stale_warning_printed = True
|
||||
return False
|
||||
if not self.kill_stale_locks:
|
||||
if not self.stale_warning_printed:
|
||||
# Log this at warning level to hint the user at the ability
|
||||
logger.warning("Found stale lock %s, but not deleting because self.kill_stale_locks = False.", name)
|
||||
self.stale_warning_printed = True
|
||||
return False
|
||||
|
||||
try:
|
||||
os.unlink(os.path.join(self.path, name))
|
||||
logger.warning('Killed stale lock %s.', name)
|
||||
except OSError as err:
|
||||
if not self.stale_warning_printed:
|
||||
# This error will bubble up and likely result in locking failure
|
||||
logger.error('Found stale lock %s, but cannot delete due to %s', name, str(err))
|
||||
self.stale_warning_printed = True
|
||||
return False
|
||||
try:
|
||||
os.unlink(os.path.join(self.path, name))
|
||||
logger.warning('Killed stale lock %s.', name)
|
||||
except OSError as err:
|
||||
if not self.stale_warning_printed:
|
||||
# This error will bubble up and likely result in locking failure
|
||||
logger.error('Found stale lock %s, but cannot delete due to %s', name, str(err))
|
||||
self.stale_warning_printed = True
|
||||
return False
|
||||
|
||||
try:
|
||||
os.rmdir(self.path)
|
||||
except OSError as err:
|
||||
if err.errno == errno.ENOTEMPTY:
|
||||
# Directory is not empty = we lost the race to somebody else
|
||||
if err.errno == errno.ENOTEMPTY or err.errno == errno.ENOENT:
|
||||
# Directory is not empty or doesn't exist any more = we lost the race to somebody else--which is ok.
|
||||
return False
|
||||
# EACCES or EIO or ... = we cannot operate anyway
|
||||
logger.error('Failed to remove lock dir: %s', str(err))
|
||||
|
|
|
@ -1,5 +1,7 @@
|
|||
import random
|
||||
import time
|
||||
from threading import Thread, Lock as ThreadingLock
|
||||
from traceback import format_exc
|
||||
|
||||
import pytest
|
||||
|
||||
|
@ -10,6 +12,8 @@
|
|||
|
||||
ID1 = "foo", 1, 1
|
||||
ID2 = "bar", 2, 2
|
||||
RACE_TEST_NUM_THREADS = 40
|
||||
RACE_TEST_DURATION = 0.4 # seconds
|
||||
|
||||
|
||||
@pytest.fixture()
|
||||
|
@ -90,6 +94,78 @@ def test_migrate_lock(self, lockpath):
|
|||
assert lock.by_me() # we still have the lock
|
||||
assert old_unique_name != new_unique_name # locking filename is different now
|
||||
|
||||
def test_race_condition(self, lockpath):
|
||||
|
||||
class SynchronizedCounter:
|
||||
|
||||
def __init__(self, count=0):
|
||||
self.lock = ThreadingLock()
|
||||
self.count = count
|
||||
self.maxcount = count
|
||||
|
||||
def value(self):
|
||||
with self.lock:
|
||||
return self.count
|
||||
|
||||
def maxvalue(self):
|
||||
with self.lock:
|
||||
return self.maxcount
|
||||
|
||||
def incr(self):
|
||||
with self.lock:
|
||||
self.count += 1
|
||||
if self.count > self.maxcount:
|
||||
self.maxcount = self.count
|
||||
return self.count
|
||||
|
||||
def decr(self):
|
||||
with self.lock:
|
||||
self.count -= 1
|
||||
return self.count
|
||||
|
||||
def print_locked(msg):
|
||||
with print_lock:
|
||||
print(msg)
|
||||
|
||||
def acquire_release_loop(id, timeout, thread_id, lock_owner_counter, exception_counter, print_lock, last_thread=None):
|
||||
print_locked("Thread %2d: Starting acquire_release_loop(id=%s, timeout=%d); lockpath=%s" % (thread_id, id, timeout, lockpath))
|
||||
timer = TimeoutTimer(timeout, -1).start()
|
||||
cycle = 0
|
||||
|
||||
while not timer.timed_out():
|
||||
cycle += 1
|
||||
try:
|
||||
with ExclusiveLock(lockpath, id=id, timeout=timeout/20, sleep=-1): # This timeout is only for not exceeding the given timeout by more than 5%. With sleep<0 it's constantly polling anyway.
|
||||
lock_owner_count = lock_owner_counter.incr()
|
||||
print_locked("Thread %2d: Acquired the lock. It's my %d. loop cycle. I am the %d. who has the lock concurrently." % (thread_id, cycle, lock_owner_count))
|
||||
time.sleep(0.005)
|
||||
lock_owner_count = lock_owner_counter.decr()
|
||||
print_locked("Thread %2d: Releasing the lock, finishing my %d. loop cycle. Currently, %d colleagues still have the lock." % (thread_id, cycle, lock_owner_count))
|
||||
except LockTimeout:
|
||||
print_locked("Thread %2d: Got LockTimeout, finishing my %d. loop cycle." % (thread_id, cycle))
|
||||
except:
|
||||
exception_count = exception_counter.incr()
|
||||
e = format_exc()
|
||||
print_locked("Thread %2d: Exception thrown, finishing my %d. loop cycle. It's the %d. exception seen until now: %s" % (thread_id, cycle, exception_count, e))
|
||||
|
||||
print_locked("Thread %2d: Loop timed out--terminating after %d loop cycles." % (thread_id, cycle))
|
||||
if last_thread is not None: # joining its predecessor, if any
|
||||
last_thread.join()
|
||||
|
||||
print('')
|
||||
lock_owner_counter = SynchronizedCounter()
|
||||
exception_counter = SynchronizedCounter()
|
||||
print_lock = ThreadingLock()
|
||||
thread = None
|
||||
for thread_id in range(RACE_TEST_NUM_THREADS):
|
||||
thread = Thread(target=acquire_release_loop, args=(('foo', thread_id, 0), RACE_TEST_DURATION, thread_id, lock_owner_counter, exception_counter, print_lock, thread))
|
||||
thread.start()
|
||||
thread.join() # joining the last thread
|
||||
|
||||
assert lock_owner_counter.maxvalue() > 0, 'Never gained the lock? Something went wrong here...'
|
||||
assert lock_owner_counter.maxvalue() <= 1, "Maximal number of concurrent lock holders was %d. So exclusivity is broken." % (lock_owner_counter.maxvalue())
|
||||
assert exception_counter.value() == 0, "ExclusiveLock threw %d exceptions due to unclean concurrency handling." % (exception_counter.value())
|
||||
|
||||
|
||||
class TestLock:
|
||||
def test_shared(self, lockpath):
|
||||
|
|
Loading…
Reference in a new issue