From 31e5318e66b531d2a4ed0eb065098e7f124aa32d Mon Sep 17 00:00:00 2001
From: Thomas Waldmann
Date: Fri, 20 Sep 2024 09:12:15 +0200
Subject: [PATCH] storelocking: fixes / cleanups

- on explicit request, update .last_refresh_dt inside _create_lock / _delete_lock
- reset .last_refresh_dt if we kill our own lock
- be more precise, have exactly the datetime of the lock in .last_refresh_dt
- cosmetic: do refresh/stale time comparisons always in the same way
---
 src/borg/storelocking.py | 39 ++++++++++++++++++++++-----------------
 1 file changed, 22 insertions(+), 17 deletions(-)

diff --git a/src/borg/storelocking.py b/src/borg/storelocking.py
index 19d92c43d..2e8e9aad5 100644
--- a/src/borg/storelocking.py
+++ b/src/borg/storelocking.py
@@ -93,7 +93,7 @@ def __exit__(self, exc_type, exc_val, exc_tb):
     def __repr__(self):
         return f"<{self.__class__.__name__}: {self.id!r}>"
 
-    def _create_lock(self, *, exclusive=None):
+    def _create_lock(self, *, exclusive=None, update_last_refresh=False):
         assert exclusive is not None
         now = datetime.datetime.now(datetime.timezone.utc)
         timestamp = now.isoformat(timespec="milliseconds")
@@ -102,20 +102,28 @@ def _create_lock(self, *, exclusive=None):
         key = bin_to_hex(xxh64(value))
         logger.debug(f"LOCK-CREATE: creating lock in store. key: {key}, lock: {lock}.")
         self.store.store(f"locks/{key}", value)
-        self.last_refresh_dt = now
+        if update_last_refresh:
+            # we parse the timestamp str to get *precisely* the datetime in the lock:
+            self.last_refresh_dt = datetime.datetime.fromisoformat(timestamp)
         return key
 
-    def _delete_lock(self, key, *, ignore_not_found=False):
+    def _delete_lock(self, key, *, ignore_not_found=False, update_last_refresh=False):
         logger.debug(f"LOCK-DELETE: deleting lock from store. key: {key}.")
         try:
             self.store.delete(f"locks/{key}")
         except ObjectNotFound:
             if not ignore_not_found:
                 raise
+        finally:
+            if update_last_refresh:
+                self.last_refresh_dt = None
+
+    def _is_our_lock(self, lock):
+        return self.id == (lock["hostid"], lock["processid"], lock["threadid"])
 
     def _is_stale_lock(self, lock):
         now = datetime.datetime.now(datetime.timezone.utc)
-        if lock["dt"] < now - self.stale_td:
+        if now > lock["dt"] + self.stale_td:
             logger.debug(f"LOCK-STALE: lock is too old, it was not refreshed. lock: {lock}.")
             return True
         if not platform.process_alive(lock["hostid"], lock["processid"], lock["threadid"]):
@@ -137,7 +145,7 @@ def _get_locks(self):
             lock["dt"] = datetime.datetime.fromisoformat(lock["time"])
             if self._is_stale_lock(lock):
                 # ignore it and delete it (even if it is not from us)
-                self._delete_lock(key, ignore_not_found=True)
+                self._delete_lock(key, ignore_not_found=True, update_last_refresh=self._is_our_lock(lock))
             else:
                 locks[key] = lock
         return locks
@@ -163,7 +171,7 @@ def acquire(self):
             exclusive_locks = self._find_locks(only_exclusive=True)
             if len(exclusive_locks) == 0:
                 # looks like there are no exclusive locks, create our lock.
-                key = self._create_lock(exclusive=self.is_exclusive)
+                key = self._create_lock(exclusive=self.is_exclusive, update_last_refresh=True)
                 # obviously we have a race condition here: other client(s) might have created exclusive
                 # lock(s) at the same time in parallel. thus we have to check again.
                 time.sleep(
@@ -183,7 +191,7 @@ def acquire(self):
                             break  # timeout
                     else:
                         logger.debug("LOCK-ACQUIRE: someone else also created an exclusive lock, deleting ours.")
-                        self._delete_lock(key, ignore_not_found=True)
+                        self._delete_lock(key, ignore_not_found=True, update_last_refresh=True)
                 else:  # not is_exclusive
                     if len(exclusive_locks) == 0:
                         logger.debug("LOCK-ACQUIRE: success! no exclusive locks detected.")
@@ -191,7 +199,7 @@
                         return self
                     else:
                         logger.debug("LOCK-ACQUIRE: exclusive locks detected, deleting our shared lock.")
-                        self._delete_lock(key, ignore_not_found=True)
+                        self._delete_lock(key, ignore_not_found=True, update_last_refresh=True)
             # wait a random bit before retrying
             time.sleep(self.retry_delay_min + (self.retry_delay_max - self.retry_delay_min) * random.random())
         logger.debug("LOCK-ACQUIRE: timeout while trying to acquire a lock.")
@@ -209,7 +217,7 @@ def release(self, *, ignore_not_found=False):
         assert len(locks) == 1
         lock = locks[0]
         logger.debug(f"LOCK-RELEASE: releasing lock: {lock}.")
-        self._delete_lock(lock["key"], ignore_not_found=True)
+        self._delete_lock(lock["key"], ignore_not_found=True, update_last_refresh=True)
 
     def got_exclusive_lock(self):
         locks = self._find_locks(only_mine=True, only_exclusive=True)
@@ -231,10 +239,8 @@ def migrate_lock(self, old_id, new_id):
         old_locks = self._find_locks(only_mine=True)
         assert len(old_locks) == 1
         self.id = new_id
-        self._create_lock(exclusive=old_locks[0]["exclusive"])
-        self._delete_lock(old_locks[0]["key"])
-        now = datetime.datetime.now(datetime.timezone.utc)
-        self.last_refresh_dt = now
+        self._create_lock(exclusive=old_locks[0]["exclusive"], update_last_refresh=True)
+        self._delete_lock(old_locks[0]["key"], update_last_refresh=False)
 
     def refresh(self):
         """refresh the lock - call this frequently, but not later than every seconds"""
@@ -255,8 +261,7 @@ def refresh(self):
                raise LockTimeout(str(self.store))
            assert len(old_locks) == 1  # there shouldn't be more than 1
            old_lock = old_locks[0]
-           if old_lock["dt"] < now - self.refresh_td:
+           if now > old_lock["dt"] + self.refresh_td:
                logger.debug(f"LOCK-REFRESH: lock needs a refresh. lock: {old_lock}.")
-               self._create_lock(exclusive=old_lock["exclusive"])
-               self._delete_lock(old_lock["key"])
-               self.last_refresh_dt = now
+               self._create_lock(exclusive=old_lock["exclusive"], update_last_refresh=True)
+               self._delete_lock(old_lock["key"], update_last_refresh=False)
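
Illustration only, not part of the patch: the "cosmetic" item standardizes all refresh/stale checks on the
pattern "now > reference datetime + timedelta", and .last_refresh_dt is now re-parsed from the exact
timestamp string written into the lock. A minimal sketch of that convention follows; the helper name
is_due and the example timedelta values are assumptions for illustration, they do not come from the patch.

import datetime

def is_due(reference_dt, td):
    # convention used after this patch: a check fires once "now" is past reference + delta
    now = datetime.datetime.now(datetime.timezone.utc)
    return now > reference_dt + td

# .last_refresh_dt is taken from the very timestamp stored in the lock, re-parsed so it
# matches the stored lock content exactly (millisecond precision):
timestamp = datetime.datetime.now(datetime.timezone.utc).isoformat(timespec="milliseconds")
last_refresh_dt = datetime.datetime.fromisoformat(timestamp)

refresh_td = datetime.timedelta(minutes=10)  # assumed example value
stale_td = datetime.timedelta(minutes=30)    # assumed example value

print(is_due(last_refresh_dt, refresh_td))  # False right after creating/refreshing the lock
print(is_due(last_refresh_dt, stale_td))    # False well before the lock would count as stale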