mirror of https://github.com/borgbackup/borg.git
migrate locks to child PID when daemonize is used
also: increase platform api version due to change in get_process_id behaviour.
This commit is contained in:
parent
72c8ec2583
commit
6f94949a36
|
@ -22,6 +22,7 @@ from .hashindex import FuseVersionsIndex
|
||||||
from .helpers import daemonize, hardlinkable, signal_handler, format_file_size
|
from .helpers import daemonize, hardlinkable, signal_handler, format_file_size
|
||||||
from .item import Item
|
from .item import Item
|
||||||
from .lrucache import LRUCache
|
from .lrucache import LRUCache
|
||||||
|
from .remote import RemoteRepository
|
||||||
|
|
||||||
# Does this version of llfuse support ns precision?
|
# Does this version of llfuse support ns precision?
|
||||||
have_fuse_xtime_ns = hasattr(llfuse.EntryAttributes, 'st_mtime_ns')
|
have_fuse_xtime_ns = hasattr(llfuse.EntryAttributes, 'st_mtime_ns')
|
||||||
|
@ -285,7 +286,10 @@ class FuseOperations(llfuse.Operations):
|
||||||
self._create_filesystem()
|
self._create_filesystem()
|
||||||
llfuse.init(self, mountpoint, options)
|
llfuse.init(self, mountpoint, options)
|
||||||
if not foreground:
|
if not foreground:
|
||||||
daemonize()
|
old_id, new_id = daemonize()
|
||||||
|
if not isinstance(self.repository_uncached, RemoteRepository):
|
||||||
|
# local repo and the locking process' PID just changed, migrate it:
|
||||||
|
self.repository_uncached.migrate_lock(old_id, new_id)
|
||||||
|
|
||||||
# If the file system crashes, we do not want to umount because in that
|
# If the file system crashes, we do not want to umount because in that
|
||||||
# case the mountpoint suddenly appears to become empty. This can have
|
# case the mountpoint suddenly appears to become empty. This can have
|
||||||
|
|
|
@ -28,7 +28,7 @@ def check_extension_modules():
|
||||||
raise ExtensionModuleError
|
raise ExtensionModuleError
|
||||||
if borg.crypto.low_level.API_VERSION != '1.1_02':
|
if borg.crypto.low_level.API_VERSION != '1.1_02':
|
||||||
raise ExtensionModuleError
|
raise ExtensionModuleError
|
||||||
if platform.API_VERSION != platform.OS_API_VERSION != '1.1_01':
|
if platform.API_VERSION != platform.OS_API_VERSION != '1.1_02':
|
||||||
raise ExtensionModuleError
|
raise ExtensionModuleError
|
||||||
if item.API_VERSION != '1.1_03':
|
if item.API_VERSION != '1.1_03':
|
||||||
raise ExtensionModuleError
|
raise ExtensionModuleError
|
||||||
|
|
|
@ -11,7 +11,12 @@ logger = create_logger()
|
||||||
|
|
||||||
|
|
||||||
def daemonize():
|
def daemonize():
|
||||||
"""Detach process from controlling terminal and run in background"""
|
"""Detach process from controlling terminal and run in background
|
||||||
|
|
||||||
|
Returns: old and new get_process_id tuples
|
||||||
|
"""
|
||||||
|
from ..platform import get_process_id
|
||||||
|
old_id = get_process_id()
|
||||||
pid = os.fork()
|
pid = os.fork()
|
||||||
if pid:
|
if pid:
|
||||||
os._exit(0)
|
os._exit(0)
|
||||||
|
@ -27,6 +32,8 @@ def daemonize():
|
||||||
os.dup2(fd, 0)
|
os.dup2(fd, 0)
|
||||||
os.dup2(fd, 1)
|
os.dup2(fd, 1)
|
||||||
os.dup2(fd, 2)
|
os.dup2(fd, 2)
|
||||||
|
new_id = get_process_id()
|
||||||
|
return old_id, new_id
|
||||||
|
|
||||||
|
|
||||||
class SignalException(BaseException):
|
class SignalException(BaseException):
|
||||||
|
|
|
@ -202,6 +202,16 @@ class ExclusiveLock:
|
||||||
os.unlink(os.path.join(self.path, name))
|
os.unlink(os.path.join(self.path, name))
|
||||||
os.rmdir(self.path)
|
os.rmdir(self.path)
|
||||||
|
|
||||||
|
def migrate_lock(self, old_id, new_id):
|
||||||
|
"""migrate the lock ownership from old_id to new_id"""
|
||||||
|
if self.id == old_id:
|
||||||
|
new_unique_name = os.path.join(self.path, "%s.%d-%x" % new_id)
|
||||||
|
if self.is_locked() and self.by_me():
|
||||||
|
with open(new_unique_name, "wb"):
|
||||||
|
pass
|
||||||
|
os.unlink(self.unique_name)
|
||||||
|
self.id, self.unique_name = new_id, new_unique_name
|
||||||
|
|
||||||
|
|
||||||
class LockRoster:
|
class LockRoster:
|
||||||
"""
|
"""
|
||||||
|
@ -271,6 +281,25 @@ class LockRoster:
|
||||||
roster[key] = list(list(e) for e in elements)
|
roster[key] = list(list(e) for e in elements)
|
||||||
self.save(roster)
|
self.save(roster)
|
||||||
|
|
||||||
|
def migrate_lock(self, key, old_id, new_id):
|
||||||
|
"""migrate the lock ownership from old_id to new_id"""
|
||||||
|
if self.id == old_id:
|
||||||
|
# need to temporarily switch off stale lock killing as we want to
|
||||||
|
# rather migrate than kill them (at least the one made by old_id).
|
||||||
|
killing, self.kill_stale_locks = self.kill_stale_locks, False
|
||||||
|
try:
|
||||||
|
try:
|
||||||
|
self.modify(key, REMOVE)
|
||||||
|
except KeyError:
|
||||||
|
# entry was not there, so no need to add a new one, but still update our id
|
||||||
|
self.id = new_id
|
||||||
|
else:
|
||||||
|
# old entry removed, update our id and add a updated entry
|
||||||
|
self.id = new_id
|
||||||
|
self.modify(key, ADD)
|
||||||
|
finally:
|
||||||
|
self.kill_stale_locks = killing
|
||||||
|
|
||||||
|
|
||||||
class Lock:
|
class Lock:
|
||||||
"""
|
"""
|
||||||
|
@ -373,3 +402,14 @@ class Lock:
|
||||||
def break_lock(self):
|
def break_lock(self):
|
||||||
self._roster.remove()
|
self._roster.remove()
|
||||||
self._lock.break_lock()
|
self._lock.break_lock()
|
||||||
|
|
||||||
|
def migrate_lock(self, old_id, new_id):
|
||||||
|
if self.id == old_id:
|
||||||
|
self.id = new_id
|
||||||
|
if self.is_exclusive:
|
||||||
|
self._lock.migrate_lock(old_id, new_id)
|
||||||
|
self._roster.migrate_lock(EXCLUSIVE, old_id, new_id)
|
||||||
|
else:
|
||||||
|
with self._lock:
|
||||||
|
self._lock.migrate_lock(old_id, new_id)
|
||||||
|
self._roster.migrate_lock(SHARED, old_id, new_id)
|
||||||
|
|
|
@ -15,7 +15,7 @@ platform API: that way platform APIs provided by the platform-specific support m
|
||||||
are correctly composed into the base functionality.
|
are correctly composed into the base functionality.
|
||||||
"""
|
"""
|
||||||
|
|
||||||
API_VERSION = '1.1_01'
|
API_VERSION = '1.1_02'
|
||||||
|
|
||||||
fdatasync = getattr(os, 'fdatasync', os.fsync)
|
fdatasync = getattr(os, 'fdatasync', os.fsync)
|
||||||
|
|
||||||
|
|
|
@ -4,7 +4,7 @@ from ..helpers import user2uid, group2gid
|
||||||
from ..helpers import safe_decode, safe_encode
|
from ..helpers import safe_decode, safe_encode
|
||||||
from .posix import swidth
|
from .posix import swidth
|
||||||
|
|
||||||
API_VERSION = '1.1_01'
|
API_VERSION = '1.1_02'
|
||||||
|
|
||||||
cdef extern from "sys/acl.h":
|
cdef extern from "sys/acl.h":
|
||||||
ctypedef struct _acl_t:
|
ctypedef struct _acl_t:
|
||||||
|
|
|
@ -4,7 +4,7 @@ from ..helpers import posix_acl_use_stored_uid_gid
|
||||||
from ..helpers import safe_encode, safe_decode
|
from ..helpers import safe_encode, safe_decode
|
||||||
from .posix import swidth
|
from .posix import swidth
|
||||||
|
|
||||||
API_VERSION = '1.1_01'
|
API_VERSION = '1.1_02'
|
||||||
|
|
||||||
cdef extern from "errno.h":
|
cdef extern from "errno.h":
|
||||||
int errno
|
int errno
|
||||||
|
|
|
@ -13,7 +13,7 @@ from .posix import swidth
|
||||||
from libc cimport errno
|
from libc cimport errno
|
||||||
from libc.stdint cimport int64_t
|
from libc.stdint cimport int64_t
|
||||||
|
|
||||||
API_VERSION = '1.1_01'
|
API_VERSION = '1.1_02'
|
||||||
|
|
||||||
cdef extern from "sys/types.h":
|
cdef extern from "sys/types.h":
|
||||||
int ACL_TYPE_ACCESS
|
int ACL_TYPE_ACCESS
|
||||||
|
|
|
@ -18,23 +18,21 @@ def swidth(s):
|
||||||
return str_len
|
return str_len
|
||||||
|
|
||||||
|
|
||||||
# only determine the PID and hostname once.
|
# for performance reasons, only determine the hostname once.
|
||||||
# for FUSE mounts, we fork a child process that needs to release
|
|
||||||
# the lock made by the parent, so it needs to use the same PID for that.
|
|
||||||
_pid = os.getpid()
|
|
||||||
# XXX this sometimes requires live internet access for issuing a DNS query in the background.
|
# XXX this sometimes requires live internet access for issuing a DNS query in the background.
|
||||||
_hostname = '%s@%s' % (socket.getfqdn(), uuid.getnode())
|
_hostname = '%s@%s' % (socket.getfqdn(), uuid.getnode())
|
||||||
|
|
||||||
|
|
||||||
def get_process_id():
|
def get_process_id():
|
||||||
"""
|
"""
|
||||||
Return identification tuple (hostname, pid, thread_id) for 'us'. If this is a FUSE process, then the PID will be
|
Return identification tuple (hostname, pid, thread_id) for 'us'.
|
||||||
that of the parent, not the forked FUSE child.
|
This always returns the current pid, which might be different from before, e.g. if daemonize() was used.
|
||||||
|
|
||||||
Note: Currently thread_id is *always* zero.
|
Note: Currently thread_id is *always* zero.
|
||||||
"""
|
"""
|
||||||
thread_id = 0
|
thread_id = 0
|
||||||
return _hostname, _pid, thread_id
|
pid = os.getpid()
|
||||||
|
return _hostname, pid, thread_id
|
||||||
|
|
||||||
|
|
||||||
def process_alive(host, pid, thread):
|
def process_alive(host, pid, thread):
|
||||||
|
|
|
@ -346,6 +346,11 @@ class Repository:
|
||||||
def break_lock(self):
|
def break_lock(self):
|
||||||
Lock(os.path.join(self.path, 'lock')).break_lock()
|
Lock(os.path.join(self.path, 'lock')).break_lock()
|
||||||
|
|
||||||
|
def migrate_lock(self, old_id, new_id):
|
||||||
|
# note: only needed for local repos
|
||||||
|
if self.lock is not None:
|
||||||
|
self.lock.migrate_lock(old_id, new_id)
|
||||||
|
|
||||||
def open(self, path, exclusive, lock_wait=None, lock=True):
|
def open(self, path, exclusive, lock_wait=None, lock=True):
|
||||||
self.path = path
|
self.path = path
|
||||||
if not os.path.isdir(path):
|
if not os.path.isdir(path):
|
||||||
|
|
|
@ -3,6 +3,7 @@ import time
|
||||||
|
|
||||||
import pytest
|
import pytest
|
||||||
|
|
||||||
|
from ..helpers import daemonize
|
||||||
from ..platform import get_process_id, process_alive
|
from ..platform import get_process_id, process_alive
|
||||||
from ..locking import TimeoutTimer, ExclusiveLock, Lock, LockRoster, \
|
from ..locking import TimeoutTimer, ExclusiveLock, Lock, LockRoster, \
|
||||||
ADD, REMOVE, SHARED, EXCLUSIVE, LockTimeout, NotLocked, NotMyLock
|
ADD, REMOVE, SHARED, EXCLUSIVE, LockTimeout, NotLocked, NotMyLock
|
||||||
|
@ -76,6 +77,19 @@ class TestExclusiveLock:
|
||||||
with pytest.raises(LockTimeout):
|
with pytest.raises(LockTimeout):
|
||||||
ExclusiveLock(lockpath, id=our_id, kill_stale_locks=True, timeout=0.1).acquire()
|
ExclusiveLock(lockpath, id=our_id, kill_stale_locks=True, timeout=0.1).acquire()
|
||||||
|
|
||||||
|
def test_migrate_lock(self, lockpath):
|
||||||
|
old_id, new_id = ID1, ID2
|
||||||
|
assert old_id[1] != new_id[1] # different PIDs (like when doing daemonize())
|
||||||
|
lock = ExclusiveLock(lockpath, id=old_id).acquire()
|
||||||
|
assert lock.id == old_id # lock is for old id / PID
|
||||||
|
old_unique_name = lock.unique_name
|
||||||
|
assert lock.by_me() # we have the lock
|
||||||
|
lock.migrate_lock(old_id, new_id) # fix the lock
|
||||||
|
assert lock.id == new_id # lock corresponds to the new id / PID
|
||||||
|
new_unique_name = lock.unique_name
|
||||||
|
assert lock.by_me() # we still have the lock
|
||||||
|
assert old_unique_name != new_unique_name # locking filename is different now
|
||||||
|
|
||||||
|
|
||||||
class TestLock:
|
class TestLock:
|
||||||
def test_shared(self, lockpath):
|
def test_shared(self, lockpath):
|
||||||
|
@ -155,6 +169,22 @@ class TestLock:
|
||||||
with pytest.raises(LockTimeout):
|
with pytest.raises(LockTimeout):
|
||||||
Lock(lockpath, id=our_id, kill_stale_locks=True, timeout=0.1).acquire()
|
Lock(lockpath, id=our_id, kill_stale_locks=True, timeout=0.1).acquire()
|
||||||
|
|
||||||
|
def test_migrate_lock(self, lockpath):
|
||||||
|
old_id, new_id = ID1, ID2
|
||||||
|
assert old_id[1] != new_id[1] # different PIDs (like when doing daemonize())
|
||||||
|
|
||||||
|
lock = Lock(lockpath, id=old_id, exclusive=True).acquire()
|
||||||
|
assert lock.id == old_id
|
||||||
|
lock.migrate_lock(old_id, new_id) # fix the lock
|
||||||
|
assert lock.id == new_id
|
||||||
|
lock.release()
|
||||||
|
|
||||||
|
lock = Lock(lockpath, id=old_id, exclusive=False).acquire()
|
||||||
|
assert lock.id == old_id
|
||||||
|
lock.migrate_lock(old_id, new_id) # fix the lock
|
||||||
|
assert lock.id == new_id
|
||||||
|
lock.release()
|
||||||
|
|
||||||
|
|
||||||
@pytest.fixture()
|
@pytest.fixture()
|
||||||
def rosterpath(tmpdir):
|
def rosterpath(tmpdir):
|
||||||
|
@ -207,3 +237,14 @@ class TestLockRoster:
|
||||||
other_killer_roster = LockRoster(rosterpath, kill_stale_locks=True)
|
other_killer_roster = LockRoster(rosterpath, kill_stale_locks=True)
|
||||||
# Did not kill us, since we're alive
|
# Did not kill us, since we're alive
|
||||||
assert other_killer_roster.get(SHARED) == {our_id, cant_know_if_dead_id}
|
assert other_killer_roster.get(SHARED) == {our_id, cant_know_if_dead_id}
|
||||||
|
|
||||||
|
def test_migrate_lock(self, rosterpath):
|
||||||
|
old_id, new_id = ID1, ID2
|
||||||
|
assert old_id[1] != new_id[1] # different PIDs (like when doing daemonize())
|
||||||
|
roster = LockRoster(rosterpath, id=old_id)
|
||||||
|
assert roster.id == old_id
|
||||||
|
roster.modify(SHARED, ADD)
|
||||||
|
assert roster.get(SHARED) == {old_id}
|
||||||
|
roster.migrate_lock(SHARED, old_id, new_id) # fix the lock
|
||||||
|
assert roster.id == new_id
|
||||||
|
assert roster.get(SHARED) == {new_id}
|
||||||
|
|
Loading…
Reference in New Issue