1
0
Fork 0
mirror of https://github.com/borgbackup/borg.git synced 2025-01-01 12:45:34 +00:00

Merge pull request #8444 from ThomasWaldmann/refresh-lock-many-unchanged-files

Refresh lock at more places
This commit is contained in:
TW 2024-10-02 14:13:39 +02:00 committed by GitHub
commit 5a87b41e37
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
2 changed files with 21 additions and 1 deletions

View file

@ -4,7 +4,7 @@
import shutil import shutil
import stat import stat
from collections import namedtuple from collections import namedtuple
from datetime import datetime, timezone from datetime import datetime, timezone, timedelta
from time import perf_counter from time import perf_counter
from .logger import create_logger from .logger import create_logger
@ -709,6 +709,8 @@ class ChunksMixin:
def __init__(self): def __init__(self):
self._chunks = None self._chunks = None
self.last_refresh_dt = datetime.now(timezone.utc)
self.refresh_td = timedelta(seconds=60)
@property @property
def chunks(self): def chunks(self):
@ -751,13 +753,18 @@ def add_chunk(
size = len(data) # data is still uncompressed size = len(data) # data is still uncompressed
else: else:
raise ValueError("when giving compressed data for a chunk, the uncompressed size must be given also") raise ValueError("when giving compressed data for a chunk, the uncompressed size must be given also")
now = datetime.now(timezone.utc)
exists = self.seen_chunk(id, size) exists = self.seen_chunk(id, size)
if exists: if exists:
# if borg create is processing lots of unchanged files (no content and not metadata changes),
# there could be a long time without any repository operations and the repo lock would get stale.
self.refresh_lock(now)
return self.reuse_chunk(id, size, stats) return self.reuse_chunk(id, size, stats)
cdata = self.repo_objs.format( cdata = self.repo_objs.format(
id, meta, data, compress=compress, size=size, ctype=ctype, clevel=clevel, ro_type=ro_type id, meta, data, compress=compress, size=size, ctype=ctype, clevel=clevel, ro_type=ro_type
) )
self.repository.put(id, cdata, wait=wait) self.repository.put(id, cdata, wait=wait)
self.last_refresh_dt = now # .put also refreshed the lock
self.chunks.add(id, ChunkIndex.MAX_VALUE, size) self.chunks.add(id, ChunkIndex.MAX_VALUE, size)
stats.update(size, not exists) stats.update(size, not exists)
return ChunkListEntry(id, size) return ChunkListEntry(id, size)
@ -767,6 +774,13 @@ def _write_chunks_cache(self, chunks):
write_chunkindex_to_repo_cache(self.repository, self._chunks, compact=True, clear=True) write_chunkindex_to_repo_cache(self.repository, self._chunks, compact=True, clear=True)
self._chunks = None # nothing there (cleared!) self._chunks = None # nothing there (cleared!)
def refresh_lock(self, now):
if now > self.last_refresh_dt + self.refresh_td:
# the repository lock needs to get refreshed regularly, or it will be killed as stale.
# refreshing the lock is not part of the repository API, so we do it indirectly via repository.info.
self.repository.info()
self.last_refresh_dt = now
class AdHocWithFilesCache(FilesCacheMixin, ChunksMixin): class AdHocWithFilesCache(FilesCacheMixin, ChunksMixin):
""" """

View file

@ -476,25 +476,31 @@ def migrate_lock(self, old_id, new_id):
self.lock.migrate_lock(old_id, new_id) self.lock.migrate_lock(old_id, new_id)
def get_manifest(self): def get_manifest(self):
self._lock_refresh()
try: try:
return self.store.load("config/manifest") return self.store.load("config/manifest")
except StoreObjectNotFound: except StoreObjectNotFound:
raise NoManifestError raise NoManifestError
def put_manifest(self, data): def put_manifest(self, data):
self._lock_refresh()
return self.store.store("config/manifest", data) return self.store.store("config/manifest", data)
def store_list(self, name): def store_list(self, name):
self._lock_refresh()
try: try:
return list(self.store.list(name)) return list(self.store.list(name))
except StoreObjectNotFound: except StoreObjectNotFound:
return [] return []
def store_load(self, name): def store_load(self, name):
self._lock_refresh()
return self.store.load(name) return self.store.load(name)
def store_store(self, name, value): def store_store(self, name, value):
self._lock_refresh()
return self.store.store(name, value) return self.store.store(name, value)
def store_delete(self, name): def store_delete(self, name):
self._lock_refresh()
return self.store.delete(name) return self.store.delete(name)