import errno
import json
import os
import socket
import time

from borg.helpers import Error, ErrorWithTraceback

ADD, REMOVE = 'add', 'remove'
SHARED, EXCLUSIVE = 'shared', 'exclusive'

# only determine the PID and hostname once.
# for FUSE mounts, we fork a child process that needs to release
# the lock made by the parent, so it needs to use the same PID for that.
_pid = os.getpid()
_hostname = socket.gethostname()


def get_id():
    """Return the identification tuple (hostname, pid, thread_id) for 'us'.

    Hostname and PID are the values cached at import time; the thread id is
    always 0 here.
    """
    thread_id = 0
    return _hostname, _pid, thread_id


class TimeoutTimer:
    """
    A timer for timeout checks (can also deal with no timeout, give timeout=None [default]).
    It can also compute and optionally execute a reasonable sleep time (e.g. to avoid
    polling too often or to support thread/process rescheduling).
    """
    def __init__(self, timeout=None, sleep=None):
        """
        Initialize a timer.

        :param timeout: time out interval [s] or None (no timeout)
        :param sleep: sleep interval [s] (>= 0: do sleep call, <0: don't call sleep)
                      or None (autocompute: use 10% of timeout, or 1s for no timeout)
        :raises ValueError: if timeout is negative
        """
        if timeout is not None and timeout < 0:
            raise ValueError("timeout must be >= 0")
        self.timeout_interval = timeout
        if sleep is None:
            if timeout is None:
                sleep = 1.0
            else:
                sleep = timeout / 10.0
        self.sleep_interval = sleep
        # both stay None until start() is called; end_time also stays None
        # when there is no timeout, which makes timed_out() always False.
        self.start_time = None
        self.end_time = None

    def __repr__(self):
        return "<%s: start=%r end=%r timeout=%r sleep=%r>" % (
            self.__class__.__name__, self.start_time, self.end_time,
            self.timeout_interval, self.sleep_interval)

    def start(self):
        """Start (or restart) the timer and return self, so calls can be chained."""
        self.start_time = time.time()
        if self.timeout_interval is not None:
            self.end_time = self.start_time + self.timeout_interval
        return self

    def sleep(self):
        """Sleep for the configured interval; a negative interval means "do not sleep"."""
        if self.sleep_interval >= 0:
            time.sleep(self.sleep_interval)

    def timed_out(self):
        """Return True if a deadline was set by start() and it has been reached."""
        return self.end_time is not None and time.time() >= self.end_time

    def timed_out_or_sleep(self):
        """Return True if timed out, otherwise sleep once and return False."""
        if self.timed_out():
            return True
        else:
            self.sleep()
            return False


# NOTE: the docstrings of the exception classes below contain '{}' placeholders;
# presumably the Error base class formats them with the exception args to build
# the user-visible message (confirm in borg.helpers) - do not reword them casually.
class LockError(Error):
    """Failed to acquire the lock {}."""


class LockErrorT(ErrorWithTraceback):
    """Failed to acquire the lock {}."""


class LockTimeout(LockError):
    """Failed to create/acquire the lock {} (timeout)."""


class LockFailed(LockErrorT):
    """Failed to create/acquire the lock {} ({})."""


class NotLocked(LockErrorT):
    """Failed to release the lock {} (was not locked)."""


class NotMyLock(LockErrorT):
    """Failed to release the lock {} (was/is locked, but not by me)."""


class ExclusiveLock:
    """An exclusive Lock based on mkdir fs operation being atomic.

    The lock is a directory at `path`; the holder is identified by an empty
    marker file inside it whose name is derived from the holder's id.

    If possible, try to use the contextmanager here like:
    with ExclusiveLock(...) as lock:
        ...
    This makes sure the lock is released again if the block is left, no
    matter how (e.g. if an exception occurred).
    """
    def __init__(self, path, timeout=None, sleep=None, id=None):
        """
        :param path: path of the lock directory (made absolute)
        :param timeout: default timeout [s] for acquire(), None means wait forever
        :param sleep: default polling interval [s] for acquire(), see TimeoutTimer
        :param id: (hostname, pid, thread_id) tuple, defaults to get_id()
        """
        self.timeout = timeout
        self.sleep = sleep
        self.path = os.path.abspath(path)
        self.id = id or get_id()
        self.unique_name = os.path.join(self.path, "%s.%d-%x" % self.id)

    def __enter__(self):
        return self.acquire()

    def __exit__(self, *exc):
        self.release()

    def __repr__(self):
        return "<%s: %r>" % (self.__class__.__name__, self.unique_name)

    def acquire(self, timeout=None, sleep=None):
        """Acquire the lock, polling until it is free or the timeout expires.

        Acquiring a lock that is already held by us returns immediately.

        :param timeout: timeout [s], None means use the value given to __init__
        :param sleep: polling interval [s], None means use the value given to __init__
        :return: self
        :raises LockTimeout: if the lock is held by someone else until the timeout
        :raises LockFailed: if the lock directory can not be created for another
                            reason than "already exists"
        """
        if timeout is None:
            timeout = self.timeout
        if sleep is None:
            sleep = self.sleep
        timer = TimeoutTimer(timeout, sleep).start()
        while True:
            try:
                os.mkdir(self.path)
            except OSError as err:
                if err.errno == errno.EEXIST:  # already locked
                    if self.by_me():
                        return self
                    if timer.timed_out_or_sleep():
                        raise LockTimeout(self.path)
                else:
                    raise LockFailed(self.path, str(err))
            else:
                try:
                    with open(self.unique_name, "wb"):
                        pass
                except BaseException:
                    # We created the lock directory but could not mark it as ours
                    # (I/O error, Ctrl-C, ...). Remove it again, otherwise nobody
                    # (including us) could ever acquire this lock without break_lock().
                    for cleanup, target in ((os.unlink, self.unique_name),
                                            (os.rmdir, self.path)):
                        try:
                            cleanup(target)
                        except OSError:
                            pass  # best effort, the original error is what matters
                    raise
                return self

    def release(self):
        """Release the lock.

        :raises NotLocked: if the lock directory does not exist
        :raises NotMyLock: if the lock exists, but our marker file is not in it
        """
        if not self.is_locked():
            raise NotLocked(self.path)
        if not self.by_me():
            raise NotMyLock(self.path)
        os.unlink(self.unique_name)
        os.rmdir(self.path)

    def is_locked(self):
        """Return True if the lock directory exists (no matter who holds the lock)."""
        return os.path.exists(self.path)

    def by_me(self):
        """Return True if our marker file exists, i.e. the lock is held by us."""
        return os.path.exists(self.unique_name)

    def break_lock(self):
        """Forcibly remove the lock, no matter who holds it."""
        if self.is_locked():
            for name in os.listdir(self.path):
                os.unlink(os.path.join(self.path, name))
            os.rmdir(self.path)


class LockRoster:
    """
    A Lock Roster to track shared/exclusive lockers.

    The roster is a JSON file mapping a key (e.g. SHARED or EXCLUSIVE) to a
    list of locker ids.

    Note: you usually should call the methods with an exclusive lock held,
    to avoid conflicting access by multiple threads/processes/machines.
    """
    def __init__(self, path, id=None):
        """
        :param path: path of the roster file
        :param id: (hostname, pid, thread_id) tuple, defaults to get_id()
        """
        self.path = path
        self.id = id or get_id()

    def load(self):
        """Return the roster dict; a missing or unparseable file gives an empty dict."""
        try:
            with open(self.path) as f:
                data = json.load(f)
        except IOError as err:
            if err.errno != errno.ENOENT:
                raise
            data = {}
        except ValueError:
            # corrupt/empty roster file?
            data = {}
        return data

    def save(self, data):
        """Write the roster dict `data` to the roster file as JSON."""
        with open(self.path, "w") as f:
            json.dump(data, f)

    def remove(self):
        """Delete the roster file; a file that does not exist is not an error."""
        try:
            os.unlink(self.path)
        except OSError as e:
            if e.errno != errno.ENOENT:
                raise

    def get(self, key):
        """Return the set of locker ids (as tuples) stored under `key`."""
        roster = self.load()
        return {tuple(e) for e in roster.get(key, [])}

    def modify(self, key, op):
        """Add our id to / remove our id from the ids stored under `key` and save.

        :param key: roster key, e.g. SHARED or EXCLUSIVE
        :param op: ADD or REMOVE
        :raises KeyError: if op is REMOVE and our id is not stored under `key`
        :raises ValueError: if op is neither ADD nor REMOVE
        """
        roster = self.load()
        try:
            elements = {tuple(e) for e in roster[key]}
        except KeyError:
            elements = set()
        if op == ADD:
            elements.add(self.id)
        elif op == REMOVE:
            elements.remove(self.id)
        else:
            raise ValueError('Unknown LockRoster op %r' % op)
        # JSON has no tuples/sets, so store a list of lists
        roster[key] = [list(e) for e in elements]
        self.save(roster)


class UpgradableLock:
    """
    A Lock for a resource that can be accessed in a shared or exclusive way.
    Typically, write access to a resource needs an exclusive lock (1 writer,
    no one is allowed reading) and read access to a resource needs a shared
    lock (multiple readers are allowed).

    If possible, try to use the contextmanager here like:
    with UpgradableLock(...) as lock:
        ...
    This makes sure the lock is released again if the block is left, no
    matter how (e.g. if an exception occurred).
    """
    def __init__(self, path, exclusive=False, sleep=None, timeout=None, id=None):
        """
        :param path: base path; path + '.roster' and path + '.exclusive' are used
        :param exclusive: whether acquire() takes an exclusive lock by default
        :param sleep: polling interval [s] while waiting for readers to finish
        :param timeout: timeout [s] for acquiring, None means wait forever
        :param id: (hostname, pid, thread_id) tuple, defaults to get_id()
        """
        self.path = path
        self.is_exclusive = exclusive
        self.sleep = sleep
        self.timeout = timeout
        self.id = id or get_id()
        # globally keeping track of shared and exclusive lockers:
        self._roster = LockRoster(path + '.roster', id=id)
        # an exclusive lock, used for:
        # - holding while doing roster queries / updates
        # - holding while the UpgradableLock itself is exclusive
        self._lock = ExclusiveLock(path + '.exclusive', id=id, timeout=timeout)

    def __enter__(self):
        return self.acquire()

    def __exit__(self, *exc):
        self.release()

    def __repr__(self):
        return "<%s: %r>" % (self.__class__.__name__, self.id)

    def acquire(self, exclusive=None, remove=None, sleep=None):
        """Acquire the lock in shared or exclusive mode.

        :param exclusive: True/False, or None to use the current self.is_exclusive
        :param remove: roster key to remove our id from while switching modes
                       (used by upgrade() / downgrade()), or None
        :param sleep: polling interval [s]; falls back to self.sleep, then to 0.2
        :return: self
        :raises LockTimeout: if waiting for the lock / for readers timed out
        """
        if exclusive is None:
            exclusive = self.is_exclusive
        sleep = sleep or self.sleep or 0.2
        if exclusive:
            # on return, the inner exclusive lock is held by us
            self._wait_for_readers_finishing(remove, sleep)
            try:
                self._roster.modify(EXCLUSIVE, ADD)
            except BaseException:
                # avoid orphan lock if the roster update fails or is interrupted
                self._lock.release()
                raise
        else:
            with self._lock:
                if remove is not None:
                    self._roster.modify(remove, REMOVE)
                self._roster.modify(SHARED, ADD)
        self.is_exclusive = exclusive
        return self

    def _wait_for_readers_finishing(self, remove, sleep):
        """Take the inner exclusive lock and keep it once no shared lockers are left.

        While shared lockers remain, the inner lock is released again between
        polls so that they are able to update the roster.

        :param remove: roster key to remove our id from (done once), or None
        :param sleep: polling interval [s]
        :raises LockTimeout: if shared lockers are still present at the timeout
        """
        timer = TimeoutTimer(self.timeout, sleep).start()
        while True:
            self._lock.acquire()
            try:
                if remove is not None:
                    self._roster.modify(remove, REMOVE)
                    remove = None
                if not self._roster.get(SHARED):
                    return  # we are the only one and we keep the lock!
            except BaseException:
                # avoid orphan lock when an exception happens here, e.g. Ctrl-C!
                self._lock.release()
                raise
            else:
                self._lock.release()
            if timer.timed_out_or_sleep():
                raise LockTimeout(self.path)

    def release(self):
        """Release the lock in whatever mode (shared / exclusive) it is currently held."""
        if self.is_exclusive:
            self._roster.modify(EXCLUSIVE, REMOVE)
            self._lock.release()
        else:
            with self._lock:
                self._roster.modify(SHARED, REMOVE)

    def upgrade(self):
        """Turn a shared lock into an exclusive one (no-op if already exclusive)."""
        if not self.is_exclusive:
            self.acquire(exclusive=True, remove=SHARED)

    def downgrade(self):
        """Turn an exclusive lock into a shared one (no-op if already shared)."""
        if self.is_exclusive:
            self.acquire(exclusive=False, remove=EXCLUSIVE)

    def break_lock(self):
        """Forcibly remove the roster and the inner exclusive lock."""
        self._roster.remove()
        self._lock.break_lock()