
Merge pull request #8399 from ThomasWaldmann/storelocking-updates

storelocking: fixes / cleanups
Commit: 275e5e136c
Author: TW
Date: 2024-09-20 14:28:01 +02:00 (committed by GitHub)
GPG key ID: B5690EEEBB952194 (no known key found for this signature in database)

@@ -93,7 +93,7 @@ def __exit__(self, exc_type, exc_val, exc_tb):
     def __repr__(self):
         return f"<{self.__class__.__name__}: {self.id!r}>"
 
-    def _create_lock(self, *, exclusive=None):
+    def _create_lock(self, *, exclusive=None, update_last_refresh=False):
         assert exclusive is not None
         now = datetime.datetime.now(datetime.timezone.utc)
         timestamp = now.isoformat(timespec="milliseconds")
@@ -102,20 +102,28 @@ def _create_lock(self, *, exclusive=None):
         key = bin_to_hex(xxh64(value))
         logger.debug(f"LOCK-CREATE: creating lock in store. key: {key}, lock: {lock}.")
         self.store.store(f"locks/{key}", value)
-        self.last_refresh_dt = now
+        if update_last_refresh:
+            # we parse the timestamp str to get *precisely* the datetime in the lock:
+            self.last_refresh_dt = datetime.datetime.fromisoformat(timestamp)
         return key
 
-    def _delete_lock(self, key, *, ignore_not_found=False):
+    def _delete_lock(self, key, *, ignore_not_found=False, update_last_refresh=False):
         logger.debug(f"LOCK-DELETE: deleting lock from store. key: {key}.")
         try:
             self.store.delete(f"locks/{key}")
         except ObjectNotFound:
             if not ignore_not_found:
                 raise
+        finally:
+            if update_last_refresh:
+                self.last_refresh_dt = None
+
+    def _is_our_lock(self, lock):
+        return self.id == (lock["hostid"], lock["processid"], lock["threadid"])
 
     def _is_stale_lock(self, lock):
         now = datetime.datetime.now(datetime.timezone.utc)
-        if lock["dt"] < now - self.stale_td:
+        if now > lock["dt"] + self.stale_td:
             logger.debug(f"LOCK-STALE: lock is too old, it was not refreshed. lock: {lock}.")
             return True
         if not platform.process_alive(lock["hostid"], lock["processid"], lock["threadid"]):
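One detail in the _create_lock() change above deserves a note: the lock's "time" field is serialized with timespec="milliseconds", so the stored value is truncated relative to the now datetime it was derived from. Parsing the timestamp string back therefore yields exactly the datetime that is in the lock record, which is why last_refresh_dt is now set from fromisoformat(timestamp) instead of from now. A minimal standalone sketch of that difference (plain Python, independent of borg's Lock class):

import datetime

# Serialize "now" at millisecond precision, as the lock record does,
# then parse the string back to get exactly the stored datetime.
now = datetime.datetime.now(datetime.timezone.utc)
timestamp = now.isoformat(timespec="milliseconds")
parsed = datetime.datetime.fromisoformat(timestamp)

# "now" still carries microseconds, the stored/parsed value does not,
# so the two can differ by a fraction of a millisecond:
assert parsed <= now
assert now - parsed < datetime.timedelta(milliseconds=1)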
@@ -137,7 +145,7 @@ def _get_locks(self):
             lock["dt"] = datetime.datetime.fromisoformat(lock["time"])
             if self._is_stale_lock(lock):
                 # ignore it and delete it (even if it is not from us)
-                self._delete_lock(key, ignore_not_found=True)
+                self._delete_lock(key, ignore_not_found=True, update_last_refresh=self._is_our_lock(lock))
             else:
                 locks[key] = lock
         return locks
@@ -163,7 +171,7 @@ def acquire(self):
             exclusive_locks = self._find_locks(only_exclusive=True)
             if len(exclusive_locks) == 0:
                 # looks like there are no exclusive locks, create our lock.
-                key = self._create_lock(exclusive=self.is_exclusive)
+                key = self._create_lock(exclusive=self.is_exclusive, update_last_refresh=True)
                 # obviously we have a race condition here: other client(s) might have created exclusive
                 # lock(s) at the same time in parallel. thus we have to check again.
                 time.sleep(
@@ -183,7 +191,7 @@ def acquire(self):
                             break  # timeout
                     else:
                         logger.debug("LOCK-ACQUIRE: someone else also created an exclusive lock, deleting ours.")
-                        self._delete_lock(key, ignore_not_found=True)
+                        self._delete_lock(key, ignore_not_found=True, update_last_refresh=True)
                 else:  # not is_exclusive
                     if len(exclusive_locks) == 0:
                         logger.debug("LOCK-ACQUIRE: success! no exclusive locks detected.")
@@ -191,7 +199,7 @@ def acquire(self):
                         return self
                     else:
                         logger.debug("LOCK-ACQUIRE: exclusive locks detected, deleting our shared lock.")
-                        self._delete_lock(key, ignore_not_found=True)
+                        self._delete_lock(key, ignore_not_found=True, update_last_refresh=True)
             # wait a random bit before retrying
             time.sleep(self.retry_delay_min + (self.retry_delay_max - self.retry_delay_min) * random.random())
         logger.debug("LOCK-ACQUIRE: timeout while trying to acquire a lock.")
@@ -209,7 +217,7 @@ def release(self, *, ignore_not_found=False):
         assert len(locks) == 1
         lock = locks[0]
         logger.debug(f"LOCK-RELEASE: releasing lock: {lock}.")
-        self._delete_lock(lock["key"], ignore_not_found=True)
+        self._delete_lock(lock["key"], ignore_not_found=True, update_last_refresh=True)
 
     def got_exclusive_lock(self):
         locks = self._find_locks(only_mine=True, only_exclusive=True)
@@ -231,10 +239,8 @@ def migrate_lock(self, old_id, new_id):
         old_locks = self._find_locks(only_mine=True)
         assert len(old_locks) == 1
         self.id = new_id
-        self._create_lock(exclusive=old_locks[0]["exclusive"])
-        self._delete_lock(old_locks[0]["key"])
-        now = datetime.datetime.now(datetime.timezone.utc)
-        self.last_refresh_dt = now
+        self._create_lock(exclusive=old_locks[0]["exclusive"], update_last_refresh=True)
+        self._delete_lock(old_locks[0]["key"], update_last_refresh=False)
 
     def refresh(self):
         """refresh the lock - call this frequently, but not later than every <stale> seconds"""
@@ -255,8 +261,7 @@ def refresh(self):
             raise LockTimeout(str(self.store))
         assert len(old_locks) == 1  # there shouldn't be more than 1
         old_lock = old_locks[0]
-        if old_lock["dt"] < now - self.refresh_td:
+        if now > old_lock["dt"] + self.refresh_td:
             logger.debug(f"LOCK-REFRESH: lock needs a refresh. lock: {old_lock}.")
-            self._create_lock(exclusive=old_lock["exclusive"])
-            self._delete_lock(old_lock["key"])
-            self.last_refresh_dt = now
+            self._create_lock(exclusive=old_lock["exclusive"], update_last_refresh=True)
+            self._delete_lock(old_lock["key"], update_last_refresh=False)
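Taken together, the changes converge on one bookkeeping rule: last_refresh_dt is maintained by _create_lock()/_delete_lock() themselves and only touched when the caller passes update_last_refresh=True; refresh() and migrate_lock() create the replacement lock first (which updates last_refresh_dt) and then delete the old one without clearing it. A simplified, hypothetical sketch of that rule (not borg's actual Lock class; an in-memory dict stands in for the store):

import datetime

class MiniLock:
    def __init__(self):
        self.last_refresh_dt = None
        self._locks = {}  # key -> timestamp string, stands in for the store

    def _create_lock(self, *, update_last_refresh=False):
        now = datetime.datetime.now(datetime.timezone.utc)
        timestamp = now.isoformat(timespec="milliseconds")
        key = timestamp  # the real code hashes the serialized lock record
        self._locks[key] = timestamp
        if update_last_refresh:
            self.last_refresh_dt = datetime.datetime.fromisoformat(timestamp)
        return key

    def _delete_lock(self, key, *, update_last_refresh=False):
        self._locks.pop(key, None)
        if update_last_refresh:
            self.last_refresh_dt = None

    def refresh(self, old_key):
        # create the replacement first, then drop the old lock; only the
        # create step touches last_refresh_dt, the delete leaves it alone.
        new_key = self._create_lock(update_last_refresh=True)
        self._delete_lock(old_key, update_last_refresh=False)
        return new_key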