import errno import json import os import tempfile import time from . import platform from .helpers import Error, ErrorWithTraceback from .logger import create_logger ADD, REMOVE, REMOVE2 = "add", "remove", "remove2" SHARED, EXCLUSIVE = "shared", "exclusive" logger = create_logger(__name__) class TimeoutTimer: """ A timer for timeout checks (can also deal with "never timeout"). It can also compute and optionally execute a reasonable sleep time (e.g. to avoid polling too often or to support thread/process rescheduling). """ def __init__(self, timeout=None, sleep=None): """ Initialize a timer. :param timeout: time out interval [s] or None (never timeout, wait forever) [default] :param sleep: sleep interval [s] (>= 0: do sleep call, <0: don't call sleep) or None (autocompute: use 10% of timeout [but not more than 60s], or 1s for "never timeout" mode) """ if timeout is not None and timeout < 0: raise ValueError("timeout must be >= 0") self.timeout_interval = timeout if sleep is None: if timeout is None: sleep = 1.0 else: sleep = min(60.0, timeout / 10.0) self.sleep_interval = sleep self.start_time = None self.end_time = None def __repr__(self): return "<{}: start={!r} end={!r} timeout={!r} sleep={!r}>".format( self.__class__.__name__, self.start_time, self.end_time, self.timeout_interval, self.sleep_interval ) def start(self): self.start_time = time.time() if self.timeout_interval is not None: self.end_time = self.start_time + self.timeout_interval return self def sleep(self): if self.sleep_interval >= 0: time.sleep(self.sleep_interval) def timed_out(self): return self.end_time is not None and time.time() >= self.end_time def timed_out_or_sleep(self): if self.timed_out(): return True else: self.sleep() return False class LockError(Error): """Failed to acquire the lock {}.""" exit_mcode = 70 class LockErrorT(ErrorWithTraceback): """Failed to acquire the lock {}.""" exit_mcode = 71 class LockFailed(LockErrorT): """Failed to create/acquire the lock {} ({}).""" exit_mcode = 72 class LockTimeout(LockError): """Failed to create/acquire the lock {} (timeout).""" exit_mcode = 73 class NotLocked(LockErrorT): """Failed to release the lock {} (was not locked).""" exit_mcode = 74 class NotMyLock(LockErrorT): """Failed to release the lock {} (was/is locked, but not by me).""" exit_mcode = 75 class ExclusiveLock: """An exclusive Lock based on mkdir fs operation being atomic. If possible, try to use the contextmanager here like:: with ExclusiveLock(...) as lock: ... This makes sure the lock is released again if the block is left, no matter how (e.g. if an exception occurred). """ def __init__(self, path, timeout=None, sleep=None, id=None): self.timeout = timeout self.sleep = sleep self.path = os.path.abspath(path) self.id = id or platform.get_process_id() self.unique_name = os.path.join(self.path, "%s.%d-%x" % self.id) self.kill_stale_locks = True self.stale_warning_printed = False def __enter__(self): return self.acquire() def __exit__(self, *exc): self.release() def __repr__(self): return f"<{self.__class__.__name__}: {self.unique_name!r}>" def acquire(self, timeout=None, sleep=None): if timeout is None: timeout = self.timeout if sleep is None: sleep = self.sleep parent_path, base_name = os.path.split(self.path) unique_base_name = os.path.basename(self.unique_name) temp_path = None try: temp_path = tempfile.mkdtemp(".tmp", base_name + ".", parent_path) temp_unique_name = os.path.join(temp_path, unique_base_name) with open(temp_unique_name, "wb"): pass except OSError as err: raise LockFailed(self.path, str(err)) from None else: timer = TimeoutTimer(timeout, sleep).start() while True: try: os.replace(temp_path, self.path) except OSError: # already locked if self.by_me(): return self self.kill_stale_lock() if timer.timed_out_or_sleep(): raise LockTimeout(self.path) from None else: temp_path = None # see finally:-block below return self finally: if temp_path is not None: # Renaming failed for some reason, so temp_dir still exists and # should be cleaned up anyway. Try to clean up, but don't crash. try: os.unlink(temp_unique_name) except: # noqa pass try: os.rmdir(temp_path) except: # noqa pass def release(self): if not self.is_locked(): raise NotLocked(self.path) if not self.by_me(): raise NotMyLock(self.path) os.unlink(self.unique_name) for retry in range(42): try: os.rmdir(self.path) except OSError as err: if err.errno in (errno.EACCES,): # windows behaving strangely? -> just try again. continue if err.errno not in (errno.ENOTEMPTY, errno.EEXIST, errno.ENOENT): # EACCES or EIO or ... = we cannot operate anyway, so re-throw raise err # else: # Directory is not empty or doesn't exist any more. # this means we lost the race to somebody else -- which is ok. return def is_locked(self): return os.path.exists(self.path) def by_me(self): return os.path.exists(self.unique_name) def kill_stale_lock(self): try: names = os.listdir(self.path) except FileNotFoundError: # another process did our job in the meantime. return False except PermissionError: # win32 might throw this. return False else: for name in names: try: host_pid, thread_str = name.rsplit("-", 1) host, pid_str = host_pid.rsplit(".", 1) pid = int(pid_str) thread = int(thread_str, 16) except ValueError: # Malformed lock name? Or just some new format we don't understand? logger.error("Found malformed lock %s in %s. Please check/fix manually.", name, self.path) return False if platform.process_alive(host, pid, thread): return False if not self.kill_stale_locks: if not self.stale_warning_printed: # Log this at warning level to hint the user at the ability logger.warning( "Found stale lock %s, but not deleting because self.kill_stale_locks = False.", name ) self.stale_warning_printed = True return False try: os.unlink(os.path.join(self.path, name)) logger.warning("Killed stale lock %s.", name) except OSError as err: if not self.stale_warning_printed: # This error will bubble up and likely result in locking failure logger.error("Found stale lock %s, but cannot delete due to %s", name, str(err)) self.stale_warning_printed = True return False try: os.rmdir(self.path) except OSError as err: if err.errno in (errno.ENOTEMPTY, errno.EEXIST, errno.ENOENT): # Directory is not empty or doesn't exist any more = we lost the race to somebody else--which is ok. return False # EACCES or EIO or ... = we cannot operate anyway logger.error("Failed to remove lock dir: %s", str(err)) return False return True def break_lock(self): if self.is_locked(): for name in os.listdir(self.path): os.unlink(os.path.join(self.path, name)) os.rmdir(self.path) def migrate_lock(self, old_id, new_id): """migrate the lock ownership from old_id to new_id""" assert self.id == old_id new_unique_name = os.path.join(self.path, "%s.%d-%x" % new_id) if self.is_locked() and self.by_me(): with open(new_unique_name, "wb"): pass os.unlink(self.unique_name) self.id, self.unique_name = new_id, new_unique_name class LockRoster: """ A Lock Roster to track shared/exclusive lockers. Note: you usually should call the methods with an exclusive lock held, to avoid conflicting access by multiple threads/processes/machines. """ def __init__(self, path, id=None): self.path = path self.id = id or platform.get_process_id() self.kill_stale_locks = True def load(self): try: with open(self.path) as f: data = json.load(f) # Just nuke the stale locks early on load if self.kill_stale_locks: for key in (SHARED, EXCLUSIVE): try: entries = data[key] except KeyError: continue elements = set() for host, pid, thread in entries: if platform.process_alive(host, pid, thread): elements.add((host, pid, thread)) else: logger.warning( "Removed stale %s roster lock for host %s pid %d thread %d.", key, host, pid, thread ) data[key] = list(elements) except (FileNotFoundError, ValueError): # no or corrupt/empty roster file? data = {} return data def save(self, data): with open(self.path, "w") as f: json.dump(data, f) def remove(self): try: os.unlink(self.path) except FileNotFoundError: pass def get(self, key): roster = self.load() return {tuple(e) for e in roster.get(key, [])} def empty(self, *keys): return all(not self.get(key) for key in keys) def modify(self, key, op): roster = self.load() try: elements = {tuple(e) for e in roster[key]} except KeyError: elements = set() if op == ADD: elements.add(self.id) elif op == REMOVE: # note: we ignore it if the element is already not present anymore. # this has been frequently seen in teardowns involving Repository.__del__ and Repository.__exit__. elements.discard(self.id) elif op == REMOVE2: # needed for callers that do not want to ignore. elements.remove(self.id) else: raise ValueError("Unknown LockRoster op %r" % op) roster[key] = list(list(e) for e in elements) self.save(roster) def migrate_lock(self, key, old_id, new_id): """migrate the lock ownership from old_id to new_id""" assert self.id == old_id # need to switch off stale lock killing temporarily as we want to # migrate rather than kill them (at least the one made by old_id). killing, self.kill_stale_locks = self.kill_stale_locks, False try: try: self.modify(key, REMOVE2) except KeyError: # entry was not there, so no need to add a new one, but still update our id self.id = new_id else: # old entry removed, update our id and add a updated entry self.id = new_id self.modify(key, ADD) finally: self.kill_stale_locks = killing class Lock: """ A Lock for a resource that can be accessed in a shared or exclusive way. Typically, write access to a resource needs an exclusive lock (1 writer, no one is allowed reading) and read access to a resource needs a shared lock (multiple readers are allowed). If possible, try to use the contextmanager here like:: with Lock(...) as lock: ... This makes sure the lock is released again if the block is left, no matter how (e.g. if an exception occurred). """ def __init__(self, path, exclusive=False, sleep=None, timeout=None, id=None): self.path = path self.is_exclusive = exclusive self.sleep = sleep self.timeout = timeout self.id = id or platform.get_process_id() # globally keeping track of shared and exclusive lockers: self._roster = LockRoster(path + ".roster", id=id) # an exclusive lock, used for: # - holding while doing roster queries / updates # - holding while the Lock itself is exclusive self._lock = ExclusiveLock(path + ".exclusive", id=id, timeout=timeout) def __enter__(self): return self.acquire() def __exit__(self, *exc): self.release() def __repr__(self): return f"<{self.__class__.__name__}: {self.id!r}>" def acquire(self, exclusive=None, remove=None, sleep=None): if exclusive is None: exclusive = self.is_exclusive sleep = sleep or self.sleep or 0.2 if exclusive: self._wait_for_readers_finishing(remove, sleep) self._roster.modify(EXCLUSIVE, ADD) else: with self._lock: if remove is not None: self._roster.modify(remove, REMOVE) self._roster.modify(SHARED, ADD) self.is_exclusive = exclusive return self def _wait_for_readers_finishing(self, remove, sleep): timer = TimeoutTimer(self.timeout, sleep).start() while True: self._lock.acquire() try: if remove is not None: self._roster.modify(remove, REMOVE) if len(self._roster.get(SHARED)) == 0: return # we are the only one and we keep the lock! # restore the roster state as before (undo the roster change): if remove is not None: self._roster.modify(remove, ADD) except: # noqa # avoid orphan lock when an exception happens here, e.g. Ctrl-C! self._lock.release() raise else: self._lock.release() if timer.timed_out_or_sleep(): raise LockTimeout(self.path) def release(self): if self.is_exclusive: self._roster.modify(EXCLUSIVE, REMOVE) if self._roster.empty(EXCLUSIVE, SHARED): self._roster.remove() self._lock.release() else: with self._lock: self._roster.modify(SHARED, REMOVE) if self._roster.empty(EXCLUSIVE, SHARED): self._roster.remove() def upgrade(self): # WARNING: if multiple read-lockers want to upgrade, it will deadlock because they # all will wait until the other read locks go away - and that won't happen. if not self.is_exclusive: self.acquire(exclusive=True, remove=SHARED) def downgrade(self): if self.is_exclusive: self.acquire(exclusive=False, remove=EXCLUSIVE) def got_exclusive_lock(self): return self.is_exclusive and self._lock.is_locked() and self._lock.by_me() def break_lock(self): self._roster.remove() self._lock.break_lock() def migrate_lock(self, old_id, new_id): assert self.id == old_id self.id = new_id if self.is_exclusive: self._lock.migrate_lock(old_id, new_id) self._roster.migrate_lock(EXCLUSIVE, old_id, new_id) else: with self._lock: self._lock.migrate_lock(old_id, new_id) self._roster.migrate_lock(SHARED, old_id, new_id)