mirror of https://github.com/borgbackup/borg.git
465 lines
16 KiB
Python
465 lines
16 KiB
Python
import errno
|
|
import json
|
|
import os
|
|
import tempfile
|
|
import time
|
|
|
|
from . import platform
|
|
from .helpers import Error, ErrorWithTraceback
|
|
from .logger import create_logger
|
|
|
|
# Operations understood by LockRoster.modify():
# ADD inserts our id, REMOVE discards it silently, REMOVE2 raises KeyError if it is absent.
ADD = 'add'
REMOVE = 'remove'
REMOVE2 = 'remove2'

# Keys of the roster dictionary: the lists of shared and of exclusive lock holders.
SHARED = 'shared'
EXCLUSIVE = 'exclusive'
|
|
|
|
# Module-level logger (created for this module's __name__); used below to report
# malformed, stale and killed locks.
logger = create_logger(__name__)
|
|
|
|
|
|
class TimeoutTimer:
    """
    Deadline helper for polling loops, including a "never time out" mode.

    Besides answering "has the timeout expired?", it knows a sensible pause
    length between two polls and can perform that pause itself (so callers do
    not poll too often and other threads/processes get a chance to run).
    """

    def __init__(self, timeout=None, sleep=None):
        """
        Set up the timer; it is not running until start() is called.

        :param timeout: timeout interval [s], or None to never time out (default)
        :param sleep: pause between polls [s]; >= 0 means sleep() really sleeps,
                      < 0 means sleep() does nothing, None picks a value
                      automatically: 10% of timeout (capped at 60s), or 1s when
                      there is no timeout
        :raises ValueError: if timeout is negative
        """
        never_expires = timeout is None
        if not never_expires and timeout < 0:
            raise ValueError("timeout must be >= 0")
        if sleep is None:
            sleep = 1.0 if never_expires else min(60.0, timeout / 10.0)
        self.timeout_interval = timeout
        self.sleep_interval = sleep
        # both stay None until start() has been called
        self.start_time = None
        self.end_time = None

    def __repr__(self):
        template = "<{}: start={!r} end={!r} timeout={!r} sleep={!r}>"
        return template.format(
            self.__class__.__name__,
            self.start_time,
            self.end_time,
            self.timeout_interval,
            self.sleep_interval,
        )

    def start(self):
        """Record the start time (and the deadline, if there is a timeout); return self."""
        now = time.time()
        self.start_time = now
        if self.timeout_interval is not None:
            self.end_time = now + self.timeout_interval
        return self

    def sleep(self):
        """Pause for the configured interval; a negative interval disables the pause."""
        if self.sleep_interval < 0:
            return
        time.sleep(self.sleep_interval)

    def timed_out(self):
        """Return True once the deadline has passed; always False without a deadline."""
        deadline = self.end_time
        if deadline is None:
            return False
        return time.time() >= deadline

    def timed_out_or_sleep(self):
        """Return True if timed out; otherwise pause once and return False."""
        if not self.timed_out():
            self.sleep()
            return False
        return True
|
|
|
|
|
|
# Base class (derived from Error) of the lock errors below that do not derive
# from ErrorWithTraceback; LockTimeout is its subclass in this module.
# NOTE(review): the docstring carries a "{}" placeholder and is presumably used
# as the message template by the Error base class -- keep it verbatim; confirm in helpers.
class LockError(Error):
    """Failed to acquire the lock {}."""
    exit_mcode = 70  # NOTE(review): presumably the exit code reported for this error -- confirm
|
|
|
|
|
|
# Base class (derived from ErrorWithTraceback) of LockFailed, NotLocked and NotMyLock.
# NOTE(review): the docstring carries a "{}" placeholder and is presumably used
# as the message template by the base class -- keep it verbatim; confirm in helpers.
class LockErrorT(ErrorWithTraceback):
    """Failed to acquire the lock {}."""
    exit_mcode = 71  # NOTE(review): presumably the exit code reported for this error -- confirm
|
|
|
|
|
|
# Raised by ExclusiveLock.acquire() with (lock path, OS error text) when the
# temporary lock directory or the lock file inside it cannot be created.
# NOTE(review): the two "{}" placeholders in the docstring presumably receive
# those two arguments -- keep the docstring verbatim; confirm in helpers.
class LockFailed(LockErrorT):
    """Failed to create/acquire the lock {} ({})."""
    exit_mcode = 72  # NOTE(review): presumably the exit code reported for this error -- confirm
|
|
|
|
|
|
# Raised with the lock path by ExclusiveLock.acquire() and
# Lock._wait_for_readers_finishing() when their TimeoutTimer reports a timeout.
# NOTE(review): the docstring's "{}" placeholder presumably receives the path --
# keep the docstring verbatim; confirm in helpers.
class LockTimeout(LockError):
    """Failed to create/acquire the lock {} (timeout)."""
    exit_mcode = 73  # NOTE(review): presumably the exit code reported for this error -- confirm
|
|
|
|
|
|
# Raised with the lock path by ExclusiveLock.release() when the lock directory
# does not exist.
# NOTE(review): the docstring's "{}" placeholder presumably receives the path --
# keep the docstring verbatim; confirm in helpers.
class NotLocked(LockErrorT):
    """Failed to release the lock {} (was not locked)."""
    exit_mcode = 74  # NOTE(review): presumably the exit code reported for this error -- confirm
|
|
|
|
|
|
# Raised with the lock path by ExclusiveLock.release() when the lock directory
# exists but does not contain this locker's own lock file.
# NOTE(review): the docstring's "{}" placeholder presumably receives the path --
# keep the docstring verbatim; confirm in helpers.
class NotMyLock(LockErrorT):
    """Failed to release the lock {} (was/is locked, but not by me)."""
    exit_mcode = 75  # NOTE(review): presumably the exit code reported for this error -- confirm
|
|
|
|
|
|
class ExclusiveLock:
    """An exclusive lock represented by a directory in the file system.

    The lock is the directory ``path``; it contains one empty file named
    ``<host>.<pid>-<thread in hex>`` that identifies the holder. It is taken by
    preparing such a directory under a temporary name and renaming it to
    ``path``, relying on the rename either fully succeeding or failing.

    If possible, try to use the contextmanager here like::

        with ExclusiveLock(...) as lock:
            ...

    This makes sure the lock is released again if the block is left, no
    matter how (e.g. if an exception occurred).
    """

    def __init__(self, path, timeout=None, sleep=None, id=None):
        """
        :param path: path of the lock directory (made absolute)
        :param timeout: default timeout [s] for acquire(), None = wait forever
        :param sleep: default polling interval [s] for acquire(), see TimeoutTimer
        :param id: (host, pid, thread) tuple identifying the locker; defaults to
                   platform.get_process_id()
        """
        self.timeout = timeout
        self.sleep = sleep
        self.path = os.path.abspath(path)
        self.id = id or platform.get_process_id()
        # our own lock file inside the lock directory
        self.unique_name = os.path.join(self.path, "%s.%d-%x" % self.id)
        self.kill_stale_locks = True
        self.stale_warning_printed = False

    def __enter__(self):
        return self.acquire()

    def __exit__(self, *exc):
        self.release()

    def __repr__(self):
        return f"<{self.__class__.__name__}: {self.unique_name!r}>"

    def acquire(self, timeout=None, sleep=None):
        """
        Acquire the lock, polling until it is free or the timeout expires.

        :param timeout: timeout [s]; None uses the value given to the constructor
        :param sleep: polling interval [s]; None uses the value given to the constructor
        :return: self
        :raises LockFailed: the temporary lock directory / lock file could not be created
        :raises LockTimeout: the lock could not be obtained before the timeout expired
        """
        if timeout is None:
            timeout = self.timeout
        if sleep is None:
            sleep = self.sleep
        parent_path, base_name = os.path.split(self.path)
        unique_base_name = os.path.basename(self.unique_name)
        temp_path = None
        try:
            # prepare a complete lock directory (including our lock file) under a temporary name
            temp_path = tempfile.mkdtemp(".tmp", base_name + '.', parent_path)
            temp_unique_name = os.path.join(temp_path, unique_base_name)
            with open(temp_unique_name, "wb"):
                pass
        except OSError as err:
            raise LockFailed(self.path, str(err)) from None
        else:
            timer = TimeoutTimer(timeout, sleep).start()
            while True:
                try:
                    os.replace(temp_path, self.path)
                except OSError:  # already locked
                    if self.by_me():
                        # we hold the lock already; the temp dir is removed in the finally-block
                        return self
                    self.kill_stale_lock()
                    if timer.timed_out_or_sleep():
                        raise LockTimeout(self.path) from None
                else:
                    temp_path = None  # see finally:-block below
                    return self
        finally:
            if temp_path is not None:
                # Renaming failed for some reason, so temp_dir still exists and
                # should be cleaned up anyway. Try to clean up, but don't crash.
                # Only file system errors are ignored, so Ctrl-C / SystemExit
                # arriving during the cleanup are not silently swallowed.
                try:
                    os.unlink(temp_unique_name)
                except OSError:
                    pass
                try:
                    os.rmdir(temp_path)
                except OSError:
                    pass

    def release(self):
        """
        Release the lock: delete our lock file, then try to remove the lock directory.

        :raises NotLocked: the lock directory does not exist
        :raises NotMyLock: the lock directory exists, but our lock file is not in it
        :raises OSError: removing the directory failed for a reason other than
                         "not empty" / "already gone"
        """
        if not self.is_locked():
            raise NotLocked(self.path)
        if not self.by_me():
            raise NotMyLock(self.path)
        os.unlink(self.unique_name)
        try:
            os.rmdir(self.path)
        except OSError as err:
            if err.errno not in (errno.ENOTEMPTY, errno.EEXIST, errno.ENOENT):
                # EACCES or EIO or ... = we cannot operate anyway, so re-throw
                raise
            # else:
            # Directory is not empty or doesn't exist any more.
            # this means we lost the race to somebody else -- which is ok.

    def is_locked(self):
        """Return True if the lock directory exists (no matter who holds the lock)."""
        return os.path.exists(self.path)

    def by_me(self):
        """Return True if our own lock file exists, i.e. the lock is held by us."""
        return os.path.exists(self.unique_name)

    def kill_stale_lock(self):
        """
        Delete the lock files of holders that are not alive any more.

        :return: True if every lock file was stale and got deleted and the lock
                 directory was removed; False if a live holder, a malformed
                 name, disabled killing or an OS error prevented that; None if
                 the lock directory did not exist.
        """
        try:
            names = os.listdir(self.path)
        except FileNotFoundError:  # another process did our job in the meantime.
            pass
        else:
            for name in names:
                try:
                    # lock file names look like <host>.<pid>-<thread in hex>
                    host_pid, thread_str = name.rsplit('-', 1)
                    host, pid_str = host_pid.rsplit('.', 1)
                    pid = int(pid_str)
                    thread = int(thread_str, 16)
                except ValueError:
                    # Malformed lock name? Or just some new format we don't understand?
                    logger.error("Found malformed lock %s in %s. Please check/fix manually.", name, self.path)
                    return False

                if platform.process_alive(host, pid, thread):
                    return False

                if not self.kill_stale_locks:
                    if not self.stale_warning_printed:
                        # Log this at warning level (only once) so the user notices
                        # that a stale lock exists and is deliberately kept.
                        logger.warning("Found stale lock %s, but not deleting because self.kill_stale_locks = False.", name)
                        self.stale_warning_printed = True
                    return False

                try:
                    os.unlink(os.path.join(self.path, name))
                    logger.warning('Killed stale lock %s.', name)
                except OSError as err:
                    if not self.stale_warning_printed:
                        # This error will bubble up and likely result in locking failure
                        logger.error('Found stale lock %s, but cannot delete due to %s', name, str(err))
                        self.stale_warning_printed = True
                    return False

            try:
                os.rmdir(self.path)
            except OSError as err:
                if err.errno in (errno.ENOTEMPTY, errno.EEXIST, errno.ENOENT):
                    # Directory is not empty or doesn't exist any more = we lost the race to somebody else--which is ok.
                    return False
                # EACCES or EIO or ... = we cannot operate anyway
                logger.error('Failed to remove lock dir: %s', str(err))
                return False

            return True

    def break_lock(self):
        """Forcibly remove the lock: delete all lock files and the lock directory, if present."""
        if self.is_locked():
            for name in os.listdir(self.path):
                os.unlink(os.path.join(self.path, name))
            os.rmdir(self.path)

    def migrate_lock(self, old_id, new_id):
        """migrate the lock ownership from old_id to new_id"""
        assert self.id == old_id
        new_unique_name = os.path.join(self.path, "%s.%d-%x" % new_id)
        if self.is_locked() and self.by_me():
            # create the new lock file first, so the lock directory is never empty in between
            with open(new_unique_name, "wb"):
                pass
            os.unlink(self.unique_name)
        self.id, self.unique_name = new_id, new_unique_name
|
|
|
|
|
|
class LockRoster:
    """
    Bookkeeping of who currently holds a shared or an exclusive lock.

    The roster is a JSON file mapping a key (SHARED / EXCLUSIVE) to a list of
    (host, pid, thread) holder ids.

    Note: you usually should call the methods with an exclusive lock held,
    to avoid conflicting access by multiple threads/processes/machines.
    """

    def __init__(self, path, id=None):
        """
        :param path: path of the roster file
        :param id: (host, pid, thread) id of this locker; defaults to
                   platform.get_process_id()
        """
        self.path = path
        self.id = id or platform.get_process_id()
        self.kill_stale_locks = True

    def _without_stale(self, key, entries):
        """Return the set of entries whose owner is still alive, logging each dropped one."""
        alive = set()
        for host, pid, thread in entries:
            if not platform.process_alive(host, pid, thread):
                logger.warning('Removed stale %s roster lock for host %s pid %d thread %d.',
                               key, host, pid, thread)
                continue
            alive.add((host, pid, thread))
        return alive

    def load(self):
        """
        Read the roster file and return its content as a dict.

        A missing or unparseable file yields an empty dict. If kill_stale_locks
        is set, entries of dead holders are dropped from the returned data.
        """
        try:
            with open(self.path) as roster_file:
                data = json.load(roster_file)
            # Just nuke the stale locks early on load
            if self.kill_stale_locks:
                for key in (SHARED, EXCLUSIVE):
                    try:
                        entries = data[key]
                    except KeyError:
                        continue
                    data[key] = list(self._without_stale(key, entries))
        except (FileNotFoundError, ValueError):
            # no or corrupt/empty roster file?
            data = {}
        return data

    def save(self, data):
        """Write data to the roster file as JSON."""
        with open(self.path, "w") as roster_file:
            json.dump(data, roster_file)

    def remove(self):
        """Delete the roster file; a file that is already gone is not an error."""
        try:
            os.unlink(self.path)
        except FileNotFoundError:
            pass

    def get(self, key):
        """Return the set of holder ids (as tuples) stored under key."""
        return set(map(tuple, self.load().get(key, [])))

    def empty(self, *keys):
        """Return True if none of the given keys has any holder."""
        return not any(self.get(key) for key in keys)

    def modify(self, key, op):
        """
        Apply op (ADD, REMOVE or REMOVE2) for our own id to the entries of key and save.

        :raises KeyError: op is REMOVE2 and our id is not present
        :raises ValueError: op is not a known operation
        """
        roster = self.load()
        try:
            holders = {tuple(entry) for entry in roster[key]}
        except KeyError:
            holders = set()
        if op == ADD:
            holders.add(self.id)
        elif op == REMOVE:
            # note: we ignore it if the element is already not present anymore.
            # this has been frequently seen in teardowns involving Repository.__del__ and Repository.__exit__.
            holders.discard(self.id)
        elif op == REMOVE2:
            # needed for callers that do not want to ignore.
            holders.remove(self.id)
        else:
            raise ValueError('Unknown LockRoster op %r' % op)
        roster[key] = [list(holder) for holder in holders]
        self.save(roster)

    def migrate_lock(self, key, old_id, new_id):
        """migrate the lock ownership from old_id to new_id"""
        assert self.id == old_id
        # need to temporarily switch off stale lock killing as we want to
        # rather migrate than kill them (at least the one made by old_id).
        previous_setting = self.kill_stale_locks
        self.kill_stale_locks = False
        try:
            try:
                self.modify(key, REMOVE2)
            except KeyError:
                # entry was not there, so no need to add a new one
                had_entry = False
            else:
                had_entry = True
            # our id is updated in both cases
            self.id = new_id
            if had_entry:
                # old entry removed, add an updated entry
                self.modify(key, ADD)
        finally:
            self.kill_stale_locks = previous_setting
|
|
|
|
|
|
class Lock:
    """
    A shared/exclusive lock for a resource.

    Typically, write access to a resource needs an exclusive lock (1 writer,
    no one is allowed reading) and read access to a resource needs a shared
    lock (multiple readers are allowed).

    If possible, try to use the contextmanager here like::

        with Lock(...) as lock:
            ...

    This makes sure the lock is released again if the block is left, no
    matter how (e.g. if an exception occurred).
    """

    def __init__(self, path, exclusive=False, sleep=None, timeout=None, id=None):
        """
        :param path: base path; '.roster' and '.exclusive' are appended for the
                     roster file and the exclusive lock directory
        :param exclusive: whether acquire() takes an exclusive lock by default
        :param sleep: polling interval [s], see acquire()
        :param timeout: timeout [s], None = wait forever
        :param id: (host, pid, thread) id of this locker; defaults to
                   platform.get_process_id()
        """
        self.path = path
        self.is_exclusive = exclusive
        self.sleep = sleep
        self.timeout = timeout
        self.id = id or platform.get_process_id()
        # globally keeping track of shared and exclusive lockers:
        self._roster = LockRoster(path + '.roster', id=id)
        # an exclusive lock, used for:
        # - holding while doing roster queries / updates
        # - holding while the Lock itself is exclusive
        self._lock = ExclusiveLock(path + '.exclusive', id=id, timeout=timeout)

    def __enter__(self):
        return self.acquire()

    def __exit__(self, *exc):
        self.release()

    def __repr__(self):
        return f"<{self.__class__.__name__}: {self.id!r}>"

    def acquire(self, exclusive=None, remove=None, sleep=None):
        """
        Acquire the lock and register ourselves in the roster.

        :param exclusive: True/False for an exclusive/shared lock, None for the
                          mode this Lock currently has
        :param remove: roster key to remove ourselves from first (used by
                       upgrade() / downgrade()), or None
        :param sleep: polling interval [s]; falls back to the constructor
                      value, then to 0.2
        :return: self
        """
        want_exclusive = self.is_exclusive if exclusive is None else exclusive
        sleep = sleep or self.sleep or 0.2
        if not want_exclusive:
            # shared: hold the exclusive lock only while updating the roster
            with self._lock:
                if remove is not None:
                    self._roster.modify(remove, REMOVE)
                self._roster.modify(SHARED, ADD)
        else:
            # exclusive: the exclusive lock stays held after this call returns
            self._wait_for_readers_finishing(remove, sleep)
            self._roster.modify(EXCLUSIVE, ADD)
        self.is_exclusive = want_exclusive
        return self

    def _wait_for_readers_finishing(self, remove, sleep):
        """
        Poll until the roster lists no shared lockers; return with the exclusive lock held.

        :raises LockTimeout: shared lockers were still present when the timeout expired
        """
        timer = TimeoutTimer(self.timeout, sleep).start()
        while True:
            self._lock.acquire()
            try:
                if remove is not None:
                    self._roster.modify(remove, REMOVE)
                if len(self._roster.get(SHARED)) == 0:
                    return  # we are the only one and we keep the lock!
                # restore the roster state as before (undo the roster change):
                if remove is not None:
                    self._roster.modify(remove, ADD)
            except BaseException:
                # avoid orphan lock when an exception happens here, e.g. Ctrl-C!
                self._lock.release()
                raise
            else:
                self._lock.release()
            if timer.timed_out_or_sleep():
                raise LockTimeout(self.path)

    def _leave_roster(self, key):
        """Remove our id from key's entries and delete the roster file if nobody is left."""
        self._roster.modify(key, REMOVE)
        if self._roster.empty(EXCLUSIVE, SHARED):
            self._roster.remove()

    def release(self):
        """Unregister from the roster and, in exclusive mode, give up the exclusive lock."""
        if not self.is_exclusive:
            with self._lock:
                self._leave_roster(SHARED)
            return
        self._leave_roster(EXCLUSIVE)
        self._lock.release()

    def upgrade(self):
        """Turn a shared lock into an exclusive one (no-op if already exclusive)."""
        # WARNING: if multiple read-lockers want to upgrade, it will deadlock because they
        # all will wait until the other read locks go away - and that won't happen.
        if self.is_exclusive:
            return
        self.acquire(exclusive=True, remove=SHARED)

    def downgrade(self):
        """Turn an exclusive lock into a shared one (no-op if not exclusive)."""
        if not self.is_exclusive:
            return
        self.acquire(exclusive=False, remove=EXCLUSIVE)

    def got_exclusive_lock(self):
        """Tell whether we are in exclusive mode and the exclusive lock exists and is ours."""
        return self.is_exclusive and self._lock.is_locked() and self._lock.by_me()

    def break_lock(self):
        """Forcibly remove the roster file and the exclusive lock."""
        self._roster.remove()
        self._lock.break_lock()

    def _migrate_parts(self, key, old_id, new_id):
        """Migrate the exclusive lock and our roster entry under key to new_id."""
        self._lock.migrate_lock(old_id, new_id)
        self._roster.migrate_lock(key, old_id, new_id)

    def migrate_lock(self, old_id, new_id):
        """Migrate the lock ownership from old_id to new_id."""
        assert self.id == old_id
        self.id = new_id
        if self.is_exclusive:
            self._migrate_parts(EXCLUSIVE, old_id, new_id)
        else:
            with self._lock:
                self._migrate_parts(SHARED, old_id, new_id)
|