borg/src/borg/locking.py


import errno
import json
import os
import tempfile
import time
from . import platform
from .helpers import Error, ErrorWithTraceback
from .logger import create_logger
ADD, REMOVE, REMOVE2 = "add", "remove", "remove2"
SHARED, EXCLUSIVE = "shared", "exclusive"
logger = create_logger(__name__)
class TimeoutTimer:
"""
A timer for timeout checks (can also deal with "never timeout").
It can also compute and optionally execute a reasonable sleep time (e.g. to avoid
polling too often or to support thread/process rescheduling).
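
    Illustrative usage (a sketch; `job_finished` is a made-up predicate)::

        timer = TimeoutTimer(timeout=5.0, sleep=0.1).start()
        while not job_finished():
            if timer.timed_out_or_sleep():
                break  # timed out; handle as appropriate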
"""
def __init__(self, timeout=None, sleep=None):
"""
Initialize a timer.
:param timeout: time out interval [s] or None (never timeout, wait forever) [default]
:param sleep: sleep interval [s] (>= 0: do sleep call, <0: don't call sleep)
or None (autocompute: use 10% of timeout [but not more than 60s],
or 1s for "never timeout" mode)
"""
if timeout is not None and timeout < 0:
raise ValueError("timeout must be >= 0")
self.timeout_interval = timeout
if sleep is None:
if timeout is None:
sleep = 1.0
else:
sleep = min(60.0, timeout / 10.0)
self.sleep_interval = sleep
self.start_time = None
self.end_time = None
def __repr__(self):
return "<{}: start={!r} end={!r} timeout={!r} sleep={!r}>".format(
self.__class__.__name__, self.start_time, self.end_time, self.timeout_interval, self.sleep_interval
)
def start(self):
self.start_time = time.time()
if self.timeout_interval is not None:
self.end_time = self.start_time + self.timeout_interval
return self
def sleep(self):
if self.sleep_interval >= 0:
time.sleep(self.sleep_interval)
def timed_out(self):
return self.end_time is not None and time.time() >= self.end_time
def timed_out_or_sleep(self):
if self.timed_out():
return True
else:
self.sleep()
return False
class LockError(Error):
"""Failed to acquire the lock {}."""
exit_mcode = 70
class LockErrorT(ErrorWithTraceback):
"""Failed to acquire the lock {}."""
exit_mcode = 71
class LockFailed(LockErrorT):
"""Failed to create/acquire the lock {} ({})."""
exit_mcode = 72
class LockTimeout(LockError):
"""Failed to create/acquire the lock {} (timeout)."""
exit_mcode = 73
class NotLocked(LockErrorT):
"""Failed to release the lock {} (was not locked)."""
exit_mcode = 74
class NotMyLock(LockErrorT):
"""Failed to release the lock {} (was/is locked, but not by me)."""
exit_mcode = 75
class ExclusiveLock:
"""An exclusive Lock based on mkdir fs operation being atomic.
If possible, try to use the contextmanager here like::
with ExclusiveLock(...) as lock:
...
    This makes sure the lock is released again no matter how the block is left
    (e.g. if an exception occurred).
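
    On disk, the lock is a directory at `path`; while locked, it contains one file
    whose name encodes the owner as "<hostid>.<pid>-<thread_hex>" (see unique_name).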
"""
def __init__(self, path, timeout=None, sleep=None, id=None):
self.timeout = timeout
self.sleep = sleep
self.path = os.path.abspath(path)
self.id = id or platform.get_process_id()
self.unique_name = os.path.join(self.path, "%s.%d-%x" % self.id)
self.kill_stale_locks = True
self.stale_warning_printed = False
def __enter__(self):
return self.acquire()
def __exit__(self, *exc):
self.release()
def __repr__(self):
return f"<{self.__class__.__name__}: {self.unique_name!r}>"
def acquire(self, timeout=None, sleep=None):
if timeout is None:
timeout = self.timeout
if sleep is None:
sleep = self.sleep
parent_path, base_name = os.path.split(self.path)
unique_base_name = os.path.basename(self.unique_name)
temp_path = None
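        # Strategy: build a temp dir next to the lock path that already contains
        # our unique file, then (below) atomically rename it onto self.path.
        # If the rename succeeds, we own the lock; if self.path already exists,
        # somebody else holds it and we retry until the timer runs out.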
try:
temp_path = tempfile.mkdtemp(".tmp", base_name + ".", parent_path)
temp_unique_name = os.path.join(temp_path, unique_base_name)
with open(temp_unique_name, "wb"):
pass
except OSError as err:
raise LockFailed(self.path, str(err)) from None
else:
timer = TimeoutTimer(timeout, sleep).start()
while True:
try:
os.replace(temp_path, self.path)
except OSError: # already locked
if self.by_me():
return self
self.kill_stale_lock()
if timer.timed_out_or_sleep():
raise LockTimeout(self.path) from None
else:
temp_path = None # see finally:-block below
return self
finally:
if temp_path is not None:
# Renaming failed for some reason, so temp_dir still exists and
# should be cleaned up anyway. Try to clean up, but don't crash.
try:
os.unlink(temp_unique_name)
except: # noqa
pass
try:
os.rmdir(temp_path)
except: # noqa
pass
def release(self):
if not self.is_locked():
raise NotLocked(self.path)
if not self.by_me():
raise NotMyLock(self.path)
os.unlink(self.unique_name)
for retry in range(42):
try:
os.rmdir(self.path)
except OSError as err:
if err.errno in (errno.EACCES,):
# windows behaving strangely? -> just try again.
continue
if err.errno not in (errno.ENOTEMPTY, errno.EEXIST, errno.ENOENT):
                    # EIO or other errors = we cannot operate anyway, so re-throw
raise err
# else:
# Directory is not empty or doesn't exist any more.
# this means we lost the race to somebody else -- which is ok.
return
def is_locked(self):
return os.path.exists(self.path)
def by_me(self):
return os.path.exists(self.unique_name)
def kill_stale_lock(self):
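        """Remove lock files left behind by dead processes ("stale locks").

        Lock file names encode their owner as "<hostid>.<pid>-<thread_hex>"; an entry
        counts as stale if that process is no longer alive. Returns True only if all
        entries were stale and the (then empty) lock directory could be removed, too.
        """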
try:
names = os.listdir(self.path)
except FileNotFoundError: # another process did our job in the meantime.
return False
except PermissionError: # win32 might throw this.
return False
else:
for name in names:
try:
host_pid, thread_str = name.rsplit("-", 1)
host, pid_str = host_pid.rsplit(".", 1)
pid = int(pid_str)
thread = int(thread_str, 16)
except ValueError:
# Malformed lock name? Or just some new format we don't understand?
logger.error("Found malformed lock %s in %s. Please check/fix manually.", name, self.path)
return False
if platform.process_alive(host, pid, thread):
return False
if not self.kill_stale_locks:
if not self.stale_warning_printed:
                        # Warn only once, to hint the user at the (currently disabled) stale lock removal ability.
logger.warning(
"Found stale lock %s, but not deleting because self.kill_stale_locks = False.", name
)
self.stale_warning_printed = True
return False
try:
os.unlink(os.path.join(self.path, name))
logger.warning("Killed stale lock %s.", name)
except OSError as err:
if not self.stale_warning_printed:
                    # The stale lock cannot be removed, so acquiring the lock will likely fail later.
logger.error("Found stale lock %s, but cannot delete due to %s", name, str(err))
self.stale_warning_printed = True
return False
try:
os.rmdir(self.path)
except OSError as err:
if err.errno in (errno.ENOTEMPTY, errno.EEXIST, errno.ENOENT):
                # Directory is not empty or doesn't exist any more = we lost the race to somebody else -- which is ok.
return False
# EACCES or EIO or ... = we cannot operate anyway
logger.error("Failed to remove lock dir: %s", str(err))
return False
return True
def break_lock(self):
if self.is_locked():
for name in os.listdir(self.path):
os.unlink(os.path.join(self.path, name))
os.rmdir(self.path)
def migrate_lock(self, old_id, new_id):
"""migrate the lock ownership from old_id to new_id"""
assert self.id == old_id
new_unique_name = os.path.join(self.path, "%s.%d-%x" % new_id)
if self.is_locked() and self.by_me():
with open(new_unique_name, "wb"):
pass
os.unlink(self.unique_name)
self.id, self.unique_name = new_id, new_unique_name
class LockRoster:
"""
A Lock Roster to track shared/exclusive lockers.
Note: you usually should call the methods with an exclusive lock held,
to avoid conflicting access by multiple threads/processes/machines.
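
    On disk, the roster is a small JSON file mapping the SHARED/EXCLUSIVE keys to
    lists of lock owner ids; the values below are illustrative only::

        {"shared": [["myhostid", 1234, 0]], "exclusive": []}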
"""
def __init__(self, path, id=None):
self.path = path
self.id = id or platform.get_process_id()
self.kill_stale_locks = True
def load(self):
try:
with open(self.path) as f:
data = json.load(f)
# Just nuke the stale locks early on load
if self.kill_stale_locks:
for key in (SHARED, EXCLUSIVE):
try:
entries = data[key]
except KeyError:
continue
elements = set()
for host, pid, thread in entries:
if platform.process_alive(host, pid, thread):
elements.add((host, pid, thread))
else:
logger.warning(
"Removed stale %s roster lock for host %s pid %d thread %d.", key, host, pid, thread
)
data[key] = list(elements)
except (FileNotFoundError, ValueError):
# no or corrupt/empty roster file?
data = {}
return data
def save(self, data):
with open(self.path, "w") as f:
json.dump(data, f)
def remove(self):
try:
os.unlink(self.path)
except FileNotFoundError:
pass
def get(self, key):
roster = self.load()
return {tuple(e) for e in roster.get(key, [])}
def empty(self, *keys):
return all(not self.get(key) for key in keys)
def modify(self, key, op):
roster = self.load()
try:
elements = {tuple(e) for e in roster[key]}
except KeyError:
elements = set()
if op == ADD:
elements.add(self.id)
elif op == REMOVE:
            # note: we ignore the case where the element is already gone.
# this has been frequently seen in teardowns involving Repository.__del__ and Repository.__exit__.
elements.discard(self.id)
elif op == REMOVE2:
            # for callers that do not want to ignore a missing element (raises KeyError).
elements.remove(self.id)
else:
raise ValueError("Unknown LockRoster op %r" % op)
roster[key] = list(list(e) for e in elements)
self.save(roster)
def migrate_lock(self, key, old_id, new_id):
"""migrate the lock ownership from old_id to new_id"""
assert self.id == old_id
# need to switch off stale lock killing temporarily as we want to
# migrate rather than kill them (at least the one made by old_id).
killing, self.kill_stale_locks = self.kill_stale_locks, False
try:
try:
self.modify(key, REMOVE2)
except KeyError:
# entry was not there, so no need to add a new one, but still update our id
self.id = new_id
else:
                # old entry removed, update our id and add an updated entry
self.id = new_id
self.modify(key, ADD)
finally:
self.kill_stale_locks = killing
class Lock:
"""
A Lock for a resource that can be accessed in a shared or exclusive way.
    Typically, write access to a resource needs an exclusive lock (one writer,
    no readers allowed) and read access to a resource needs a shared lock
    (multiple readers are allowed).
If possible, try to use the contextmanager here like::
with Lock(...) as lock:
...
    This makes sure the lock is released again no matter how the block is left
    (e.g. if an exception occurred).
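
    Implementation note: a Lock creates two artifacts next to `path`: a JSON roster
    file at `path + ".roster"` that tracks shared/exclusive participants, and an
    ExclusiveLock at `path + ".exclusive"` that guards roster updates (and that is
    kept held while the Lock itself is held exclusively).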
"""
def __init__(self, path, exclusive=False, sleep=None, timeout=None, id=None):
self.path = path
self.is_exclusive = exclusive
self.sleep = sleep
self.timeout = timeout
self.id = id or platform.get_process_id()
# globally keeping track of shared and exclusive lockers:
self._roster = LockRoster(path + ".roster", id=id)
# an exclusive lock, used for:
# - holding while doing roster queries / updates
# - holding while the Lock itself is exclusive
self._lock = ExclusiveLock(path + ".exclusive", id=id, timeout=timeout)
def __enter__(self):
return self.acquire()
def __exit__(self, *exc):
self.release()
def __repr__(self):
return f"<{self.__class__.__name__}: {self.id!r}>"
def acquire(self, exclusive=None, remove=None, sleep=None):
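        # `remove` names a roster key (SHARED or EXCLUSIVE) whose entry for our id is
        # dropped while acquiring; upgrade() / downgrade() use this to switch lock
        # modes without fully releasing in between.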
if exclusive is None:
exclusive = self.is_exclusive
sleep = sleep or self.sleep or 0.2
if exclusive:
self._wait_for_readers_finishing(remove, sleep)
self._roster.modify(EXCLUSIVE, ADD)
else:
with self._lock:
if remove is not None:
self._roster.modify(remove, REMOVE)
self._roster.modify(SHARED, ADD)
self.is_exclusive = exclusive
return self
def _wait_for_readers_finishing(self, remove, sleep):
timer = TimeoutTimer(self.timeout, sleep).start()
while True:
self._lock.acquire()
try:
if remove is not None:
self._roster.modify(remove, REMOVE)
if len(self._roster.get(SHARED)) == 0:
return # we are the only one and we keep the lock!
# restore the roster state as before (undo the roster change):
if remove is not None:
self._roster.modify(remove, ADD)
except: # noqa
# avoid orphan lock when an exception happens here, e.g. Ctrl-C!
self._lock.release()
raise
else:
self._lock.release()
if timer.timed_out_or_sleep():
raise LockTimeout(self.path)
def release(self):
if self.is_exclusive:
self._roster.modify(EXCLUSIVE, REMOVE)
if self._roster.empty(EXCLUSIVE, SHARED):
self._roster.remove()
self._lock.release()
else:
with self._lock:
self._roster.modify(SHARED, REMOVE)
if self._roster.empty(EXCLUSIVE, SHARED):
self._roster.remove()
def upgrade(self):
# WARNING: if multiple read-lockers want to upgrade, it will deadlock because they
# all will wait until the other read locks go away - and that won't happen.
if not self.is_exclusive:
self.acquire(exclusive=True, remove=SHARED)
def downgrade(self):
if self.is_exclusive:
self.acquire(exclusive=False, remove=EXCLUSIVE)
def got_exclusive_lock(self):
return self.is_exclusive and self._lock.is_locked() and self._lock.by_me()
def break_lock(self):
self._roster.remove()
self._lock.break_lock()
def migrate_lock(self, old_id, new_id):
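        """migrate the lock ownership from old_id to new_id"""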
assert self.id == old_id
self.id = new_id
if self.is_exclusive:
self._lock.migrate_lock(old_id, new_id)
self._roster.migrate_lock(EXCLUSIVE, old_id, new_id)
else:
with self._lock:
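                # the inner helper lock was just acquired under old_id; migrate it as
                # well, so releasing it at the end of this with-block works under new_id.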
self._lock.migrate_lock(old_id, new_id)
self._roster.migrate_lock(SHARED, old_id, new_id)