mirror of
https://github.com/borgbackup/borg.git
synced 2024-12-23 16:26:29 +00:00
323 lines
10 KiB
Python
323 lines
10 KiB
Python
import json
|
|
import os
|
|
import socket
|
|
import time
|
|
|
|
from borg.helpers import Error, ErrorWithTraceback
|
|
|
|
ADD, REMOVE = 'add', 'remove'
|
|
SHARED, EXCLUSIVE = 'shared', 'exclusive'
|
|
|
|
# only determine the PID and hostname once.
|
|
# for FUSE mounts, we fork a child process that needs to release
|
|
# the lock made by the parent, so it needs to use the same PID for that.
|
|
_pid = os.getpid()
|
|
_hostname = socket.gethostname()
|
|
|
|
|
|
def get_id():
|
|
"""Get identification tuple for 'us'"""
|
|
thread_id = 0
|
|
return _hostname, _pid, thread_id
|
|
|
|
|
|
class TimeoutTimer:
|
|
"""
|
|
A timer for timeout checks (can also deal with no timeout, give timeout=None [default]).
|
|
It can also compute and optionally execute a reasonable sleep time (e.g. to avoid
|
|
polling too often or to support thread/process rescheduling).
|
|
"""
|
|
def __init__(self, timeout=None, sleep=None):
|
|
"""
|
|
Initialize a timer.
|
|
|
|
:param timeout: time out interval [s] or None (no timeout)
|
|
:param sleep: sleep interval [s] (>= 0: do sleep call, <0: don't call sleep)
|
|
or None (autocompute: use 10% of timeout [but not more than 60s],
|
|
or 1s for no timeout)
|
|
"""
|
|
if timeout is not None and timeout < 0:
|
|
raise ValueError("timeout must be >= 0")
|
|
self.timeout_interval = timeout
|
|
if sleep is None:
|
|
if timeout is None:
|
|
sleep = 1.0
|
|
else:
|
|
sleep = min(60.0, timeout / 10.0)
|
|
self.sleep_interval = sleep
|
|
self.start_time = None
|
|
self.end_time = None
|
|
|
|
def __repr__(self):
|
|
return "<%s: start=%r end=%r timeout=%r sleep=%r>" % (
|
|
self.__class__.__name__, self.start_time, self.end_time,
|
|
self.timeout_interval, self.sleep_interval)
|
|
|
|
def start(self):
|
|
self.start_time = time.time()
|
|
if self.timeout_interval is not None:
|
|
self.end_time = self.start_time + self.timeout_interval
|
|
return self
|
|
|
|
def sleep(self):
|
|
if self.sleep_interval >= 0:
|
|
time.sleep(self.sleep_interval)
|
|
|
|
def timed_out(self):
|
|
return self.end_time is not None and time.time() >= self.end_time
|
|
|
|
def timed_out_or_sleep(self):
|
|
if self.timed_out():
|
|
return True
|
|
else:
|
|
self.sleep()
|
|
return False
|
|
|
|
|
|
class LockError(Error):
|
|
"""Failed to acquire the lock {}."""
|
|
|
|
|
|
class LockErrorT(ErrorWithTraceback):
|
|
"""Failed to acquire the lock {}."""
|
|
|
|
|
|
class LockTimeout(LockError):
|
|
"""Failed to create/acquire the lock {} (timeout)."""
|
|
|
|
|
|
class LockFailed(LockErrorT):
|
|
"""Failed to create/acquire the lock {} ({})."""
|
|
|
|
|
|
class NotLocked(LockErrorT):
|
|
"""Failed to release the lock {} (was not locked)."""
|
|
|
|
|
|
class NotMyLock(LockErrorT):
|
|
"""Failed to release the lock {} (was/is locked, but not by me)."""
|
|
|
|
|
|
class ExclusiveLock:
|
|
"""An exclusive Lock based on mkdir fs operation being atomic.
|
|
|
|
If possible, try to use the contextmanager here like::
|
|
|
|
with ExclusiveLock(...) as lock:
|
|
...
|
|
|
|
This makes sure the lock is released again if the block is left, no
|
|
matter how (e.g. if an exception occurred).
|
|
"""
|
|
def __init__(self, path, timeout=None, sleep=None, id=None):
|
|
self.timeout = timeout
|
|
self.sleep = sleep
|
|
self.path = os.path.abspath(path)
|
|
self.id = id or get_id()
|
|
self.unique_name = os.path.join(self.path, "%s.%d-%x" % self.id)
|
|
|
|
def __enter__(self):
|
|
return self.acquire()
|
|
|
|
def __exit__(self, *exc):
|
|
self.release()
|
|
|
|
def __repr__(self):
|
|
return "<%s: %r>" % (self.__class__.__name__, self.unique_name)
|
|
|
|
def acquire(self, timeout=None, sleep=None):
|
|
if timeout is None:
|
|
timeout = self.timeout
|
|
if sleep is None:
|
|
sleep = self.sleep
|
|
timer = TimeoutTimer(timeout, sleep).start()
|
|
while True:
|
|
try:
|
|
os.mkdir(self.path)
|
|
except FileExistsError: # already locked
|
|
if self.by_me():
|
|
return self
|
|
if timer.timed_out_or_sleep():
|
|
raise LockTimeout(self.path)
|
|
except OSError as err:
|
|
raise LockFailed(self.path, str(err)) from None
|
|
else:
|
|
with open(self.unique_name, "wb"):
|
|
pass
|
|
return self
|
|
|
|
def release(self):
|
|
if not self.is_locked():
|
|
raise NotLocked(self.path)
|
|
if not self.by_me():
|
|
raise NotMyLock(self.path)
|
|
os.unlink(self.unique_name)
|
|
os.rmdir(self.path)
|
|
|
|
def is_locked(self):
|
|
return os.path.exists(self.path)
|
|
|
|
def by_me(self):
|
|
return os.path.exists(self.unique_name)
|
|
|
|
def break_lock(self):
|
|
if self.is_locked():
|
|
for name in os.listdir(self.path):
|
|
os.unlink(os.path.join(self.path, name))
|
|
os.rmdir(self.path)
|
|
|
|
|
|
class LockRoster:
|
|
"""
|
|
A Lock Roster to track shared/exclusive lockers.
|
|
|
|
Note: you usually should call the methods with an exclusive lock held,
|
|
to avoid conflicting access by multiple threads/processes/machines.
|
|
"""
|
|
def __init__(self, path, id=None):
|
|
self.path = path
|
|
self.id = id or get_id()
|
|
|
|
def load(self):
|
|
try:
|
|
with open(self.path) as f:
|
|
data = json.load(f)
|
|
except (FileNotFoundError, ValueError):
|
|
# no or corrupt/empty roster file?
|
|
data = {}
|
|
return data
|
|
|
|
def save(self, data):
|
|
with open(self.path, "w") as f:
|
|
json.dump(data, f)
|
|
|
|
def remove(self):
|
|
try:
|
|
os.unlink(self.path)
|
|
except FileNotFoundError:
|
|
pass
|
|
|
|
def get(self, key):
|
|
roster = self.load()
|
|
return set(tuple(e) for e in roster.get(key, []))
|
|
|
|
def empty(self, *keys):
|
|
return all(not self.get(key) for key in keys)
|
|
|
|
def modify(self, key, op):
|
|
roster = self.load()
|
|
try:
|
|
elements = set(tuple(e) for e in roster[key])
|
|
except KeyError:
|
|
elements = set()
|
|
if op == ADD:
|
|
elements.add(self.id)
|
|
elif op == REMOVE:
|
|
elements.remove(self.id)
|
|
else:
|
|
raise ValueError('Unknown LockRoster op %r' % op)
|
|
roster[key] = list(list(e) for e in elements)
|
|
self.save(roster)
|
|
|
|
|
|
class Lock:
|
|
"""
|
|
A Lock for a resource that can be accessed in a shared or exclusive way.
|
|
Typically, write access to a resource needs an exclusive lock (1 writer,
|
|
noone is allowed reading) and read access to a resource needs a shared
|
|
lock (multiple readers are allowed).
|
|
|
|
If possible, try to use the contextmanager here like::
|
|
|
|
with Lock(...) as lock:
|
|
...
|
|
|
|
This makes sure the lock is released again if the block is left, no
|
|
matter how (e.g. if an exception occurred).
|
|
"""
|
|
def __init__(self, path, exclusive=False, sleep=None, timeout=None, id=None):
|
|
self.path = path
|
|
self.is_exclusive = exclusive
|
|
self.sleep = sleep
|
|
self.timeout = timeout
|
|
self.id = id or get_id()
|
|
# globally keeping track of shared and exclusive lockers:
|
|
self._roster = LockRoster(path + '.roster', id=id)
|
|
# an exclusive lock, used for:
|
|
# - holding while doing roster queries / updates
|
|
# - holding while the Lock instance itself is exclusive
|
|
self._lock = ExclusiveLock(path + '.exclusive', id=id, timeout=timeout)
|
|
|
|
def __enter__(self):
|
|
return self.acquire()
|
|
|
|
def __exit__(self, *exc):
|
|
self.release()
|
|
|
|
def __repr__(self):
|
|
return "<%s: %r>" % (self.__class__.__name__, self.id)
|
|
|
|
def acquire(self, exclusive=None, remove=None, sleep=None):
|
|
if exclusive is None:
|
|
exclusive = self.is_exclusive
|
|
sleep = sleep or self.sleep or 0.2
|
|
if exclusive:
|
|
self._wait_for_readers_finishing(remove, sleep)
|
|
self._roster.modify(EXCLUSIVE, ADD)
|
|
else:
|
|
with self._lock:
|
|
if remove is not None:
|
|
self._roster.modify(remove, REMOVE)
|
|
self._roster.modify(SHARED, ADD)
|
|
self.is_exclusive = exclusive
|
|
return self
|
|
|
|
def _wait_for_readers_finishing(self, remove, sleep):
|
|
timer = TimeoutTimer(self.timeout, sleep).start()
|
|
while True:
|
|
self._lock.acquire()
|
|
try:
|
|
if remove is not None:
|
|
self._roster.modify(remove, REMOVE)
|
|
if len(self._roster.get(SHARED)) == 0:
|
|
return # we are the only one and we keep the lock!
|
|
# restore the roster state as before (undo the roster change):
|
|
if remove is not None:
|
|
self._roster.modify(remove, ADD)
|
|
except:
|
|
# avoid orphan lock when an exception happens here, e.g. Ctrl-C!
|
|
self._lock.release()
|
|
raise
|
|
else:
|
|
self._lock.release()
|
|
if timer.timed_out_or_sleep():
|
|
raise LockTimeout(self.path)
|
|
|
|
def release(self):
|
|
if self.is_exclusive:
|
|
self._roster.modify(EXCLUSIVE, REMOVE)
|
|
if self._roster.empty(EXCLUSIVE, SHARED):
|
|
self._roster.remove()
|
|
self._lock.release()
|
|
else:
|
|
with self._lock:
|
|
self._roster.modify(SHARED, REMOVE)
|
|
if self._roster.empty(EXCLUSIVE, SHARED):
|
|
self._roster.remove()
|
|
|
|
def upgrade(self):
|
|
# WARNING: if multiple read-lockers want to upgrade, it will deadlock because they
|
|
# all will wait until the other read locks go away - and that won't happen.
|
|
if not self.is_exclusive:
|
|
self.acquire(exclusive=True, remove=SHARED)
|
|
|
|
def downgrade(self):
|
|
if self.is_exclusive:
|
|
self.acquire(exclusive=False, remove=EXCLUSIVE)
|
|
|
|
def got_exclusive_lock(self):
|
|
return self.is_exclusive and self._lock.is_locked() and self._lock.by_me()
|
|
|
|
def break_lock(self):
|
|
self._roster.remove()
|
|
self._lock.break_lock()
|