mirror of https://github.com/borgbackup/borg.git
mount_cmds converted
This commit is contained in:
parent
91dd71e3b1
commit
0a104a3435
|
@ -14,16 +14,23 @@ from .. import has_lchflags, llfuse
|
||||||
from .. import changedir, no_selinux, same_ts_ns
|
from .. import changedir, no_selinux, same_ts_ns
|
||||||
from .. import are_symlinks_supported, are_hardlinks_supported, are_fifos_supported
|
from .. import are_symlinks_supported, are_hardlinks_supported, are_fifos_supported
|
||||||
from ..platform import fakeroot_detected
|
from ..platform import fakeroot_detected
|
||||||
from . import ArchiverTestCaseBase, ArchiverTestCaseBinaryBase, RemoteArchiverTestCaseBase, RK_ENCRYPTION, BORG_EXES
|
from . import RK_ENCRYPTION, cmd, assert_dirs_equal, create_regular_file, create_src_archive, open_archive
|
||||||
from . import src_file, requires_hardlinks
|
from . import src_file, requires_hardlinks, _extract_hardlinks_setup, fuse_mount, create_test_files
|
||||||
|
|
||||||
|
|
||||||
class ArchiverTestCase(ArchiverTestCaseBase):
|
def pytest_generate_tests(metafunc):
|
||||||
@requires_hardlinks
|
# Generate tests for different scenarios: local repository, remote repository, and using the borg binary.
|
||||||
@unittest.skipUnless(llfuse, "llfuse not installed")
|
if "archivers" in metafunc.fixturenames:
|
||||||
def test_fuse_mount_hardlinks(self):
|
metafunc.parametrize("archivers", ["archiver", "remote_archiver", "binary_archiver"])
|
||||||
self._extract_hardlinks_setup()
|
|
||||||
mountpoint = os.path.join(self.tmpdir, "mountpoint")
|
|
||||||
|
@requires_hardlinks
|
||||||
|
@unittest.skipUnless(llfuse, "llfuse not installed")
|
||||||
|
def test_fuse_mount_hardlinks(archivers, request):
|
||||||
|
archiver = request.getfixturevalue(archivers)
|
||||||
|
repo_location = archiver.repository_location
|
||||||
|
_extract_hardlinks_setup(archiver)
|
||||||
|
mountpoint = os.path.join(archiver.tmpdir, "mountpoint")
|
||||||
# we need to get rid of permissions checking because fakeroot causes issues with it.
|
# we need to get rid of permissions checking because fakeroot causes issues with it.
|
||||||
# On all platforms, borg defaults to "default_permissions" and we need to get rid of it via "ignore_permissions".
|
# On all platforms, borg defaults to "default_permissions" and we need to get rid of it via "ignore_permissions".
|
||||||
# On macOS (darwin), we additionally need "defer_permissions" to switch off the checks in osxfuse.
|
# On macOS (darwin), we additionally need "defer_permissions" to switch off the checks in osxfuse.
|
||||||
|
@ -31,23 +38,23 @@ class ArchiverTestCase(ArchiverTestCaseBase):
|
||||||
ignore_perms = ["-o", "ignore_permissions,defer_permissions"]
|
ignore_perms = ["-o", "ignore_permissions,defer_permissions"]
|
||||||
else:
|
else:
|
||||||
ignore_perms = ["-o", "ignore_permissions"]
|
ignore_perms = ["-o", "ignore_permissions"]
|
||||||
with self.fuse_mount(
|
with fuse_mount(repo_location, mountpoint, "-a", "test", "--strip-components=2", *ignore_perms), changedir(
|
||||||
self.repository_location, mountpoint, "-a", "test", "--strip-components=2", *ignore_perms
|
os.path.join(mountpoint, "test")
|
||||||
), changedir(os.path.join(mountpoint, "test")):
|
):
|
||||||
assert os.stat("hardlink").st_nlink == 2
|
assert os.stat("hardlink").st_nlink == 2
|
||||||
assert os.stat("subdir/hardlink").st_nlink == 2
|
assert os.stat("subdir/hardlink").st_nlink == 2
|
||||||
assert open("subdir/hardlink", "rb").read() == b"123456"
|
assert open("subdir/hardlink", "rb").read() == b"123456"
|
||||||
assert os.stat("aaaa").st_nlink == 2
|
assert os.stat("aaaa").st_nlink == 2
|
||||||
assert os.stat("source2").st_nlink == 2
|
assert os.stat("source2").st_nlink == 2
|
||||||
with self.fuse_mount(
|
with fuse_mount(repo_location, mountpoint, "input/dir1", "-a", "test", *ignore_perms), changedir(
|
||||||
self.repository_location, mountpoint, "input/dir1", "-a", "test", *ignore_perms
|
os.path.join(mountpoint, "test")
|
||||||
), changedir(os.path.join(mountpoint, "test")):
|
):
|
||||||
assert os.stat("input/dir1/hardlink").st_nlink == 2
|
assert os.stat("input/dir1/hardlink").st_nlink == 2
|
||||||
assert os.stat("input/dir1/subdir/hardlink").st_nlink == 2
|
assert os.stat("input/dir1/subdir/hardlink").st_nlink == 2
|
||||||
assert open("input/dir1/subdir/hardlink", "rb").read() == b"123456"
|
assert open("input/dir1/subdir/hardlink", "rb").read() == b"123456"
|
||||||
assert os.stat("input/dir1/aaaa").st_nlink == 2
|
assert os.stat("input/dir1/aaaa").st_nlink == 2
|
||||||
assert os.stat("input/dir1/source2").st_nlink == 2
|
assert os.stat("input/dir1/source2").st_nlink == 2
|
||||||
with self.fuse_mount(self.repository_location, mountpoint, "-a", "test", *ignore_perms), changedir(
|
with fuse_mount(repo_location, mountpoint, "-a", "test", *ignore_perms), changedir(
|
||||||
os.path.join(mountpoint, "test")
|
os.path.join(mountpoint, "test")
|
||||||
):
|
):
|
||||||
assert os.stat("input/source").st_nlink == 4
|
assert os.stat("input/source").st_nlink == 4
|
||||||
|
@ -56,8 +63,14 @@ class ArchiverTestCase(ArchiverTestCaseBase):
|
||||||
assert os.stat("input/dir1/subdir/hardlink").st_nlink == 4
|
assert os.stat("input/dir1/subdir/hardlink").st_nlink == 4
|
||||||
assert open("input/dir1/subdir/hardlink", "rb").read() == b"123456"
|
assert open("input/dir1/subdir/hardlink", "rb").read() == b"123456"
|
||||||
|
|
||||||
@unittest.skipUnless(llfuse, "llfuse not installed")
|
|
||||||
def test_fuse(self):
|
@unittest.skipUnless(llfuse, "llfuse not installed")
|
||||||
|
def test_fuse(archivers, request):
|
||||||
|
archiver = request.getfixturevalue(archivers)
|
||||||
|
if archiver.EXE and fakeroot_detected():
|
||||||
|
pytest.skip("test_fuse with the binary is not compatible with fakeroot")
|
||||||
|
repo_location, input_path = archiver.repository_location, archiver.input_path
|
||||||
|
|
||||||
def has_noatime(some_file):
|
def has_noatime(some_file):
|
||||||
atime_before = os.stat(some_file).st_atime_ns
|
atime_before = os.stat(some_file).st_atime_ns
|
||||||
try:
|
try:
|
||||||
|
@ -69,28 +82,28 @@ class ArchiverTestCase(ArchiverTestCaseBase):
|
||||||
noatime_used = flags_noatime != flags_normal
|
noatime_used = flags_noatime != flags_normal
|
||||||
return noatime_used and atime_before == atime_after
|
return noatime_used and atime_before == atime_after
|
||||||
|
|
||||||
self.cmd(f"--repo={self.repository_location}", "rcreate", RK_ENCRYPTION)
|
cmd(archiver, f"--repo={repo_location}", "rcreate", RK_ENCRYPTION)
|
||||||
self.create_test_files()
|
create_test_files(input_path)
|
||||||
have_noatime = has_noatime("input/file1")
|
have_noatime = has_noatime("input/file1")
|
||||||
self.cmd(f"--repo={self.repository_location}", "create", "--exclude-nodump", "--atime", "archive", "input")
|
cmd(archiver, f"--repo={repo_location}", "create", "--exclude-nodump", "--atime", "archive", "input")
|
||||||
self.cmd(f"--repo={self.repository_location}", "create", "--exclude-nodump", "--atime", "archive2", "input")
|
cmd(archiver, f"--repo={repo_location}", "create", "--exclude-nodump", "--atime", "archive2", "input")
|
||||||
if has_lchflags:
|
if has_lchflags:
|
||||||
# remove the file we did not backup, so input and output become equal
|
# remove the file that we did not back up, so input and output become equal
|
||||||
os.remove(os.path.join("input", "flagfile"))
|
os.remove(os.path.join("input", "flagfile"))
|
||||||
mountpoint = os.path.join(self.tmpdir, "mountpoint")
|
mountpoint = os.path.join(archiver.tmpdir, "mountpoint")
|
||||||
# mount the whole repository, archive contents shall show up in archivename subdirs of mountpoint:
|
# mount the whole repository, archive contents shall show up in archivename subdirectories of mountpoint:
|
||||||
with self.fuse_mount(self.repository_location, mountpoint):
|
with fuse_mount(repo_location, mountpoint):
|
||||||
# flags are not supported by the FUSE mount
|
# flags are not supported by the FUSE mount
|
||||||
# we also ignore xattrs here, they are tested separately
|
# we also ignore xattrs here, they are tested separately
|
||||||
self.assert_dirs_equal(
|
assert_dirs_equal(
|
||||||
self.input_path, os.path.join(mountpoint, "archive", "input"), ignore_flags=True, ignore_xattrs=True
|
input_path, os.path.join(mountpoint, "archive", "input"), ignore_flags=True, ignore_xattrs=True
|
||||||
)
|
)
|
||||||
self.assert_dirs_equal(
|
assert_dirs_equal(
|
||||||
self.input_path, os.path.join(mountpoint, "archive2", "input"), ignore_flags=True, ignore_xattrs=True
|
input_path, os.path.join(mountpoint, "archive2", "input"), ignore_flags=True, ignore_xattrs=True
|
||||||
)
|
)
|
||||||
with self.fuse_mount(self.repository_location, mountpoint, "-a", "archive"):
|
with fuse_mount(repo_location, mountpoint, "-a", "archive"):
|
||||||
self.assert_dirs_equal(
|
assert_dirs_equal(
|
||||||
self.input_path, os.path.join(mountpoint, "archive", "input"), ignore_flags=True, ignore_xattrs=True
|
input_path, os.path.join(mountpoint, "archive", "input"), ignore_flags=True, ignore_xattrs=True
|
||||||
)
|
)
|
||||||
# regular file
|
# regular file
|
||||||
in_fn = "input/file1"
|
in_fn = "input/file1"
|
||||||
|
@ -140,7 +153,7 @@ class ArchiverTestCase(ArchiverTestCaseBase):
|
||||||
try:
|
try:
|
||||||
in_fn = "input/fusexattr"
|
in_fn = "input/fusexattr"
|
||||||
out_fn = os.fsencode(os.path.join(mountpoint, "archive", "input", "fusexattr"))
|
out_fn = os.fsencode(os.path.join(mountpoint, "archive", "input", "fusexattr"))
|
||||||
if not xattr.XATTR_FAKEROOT and xattr.is_enabled(self.input_path):
|
if not xattr.XATTR_FAKEROOT and xattr.is_enabled(input_path):
|
||||||
assert sorted(no_selinux(xattr.listxattr(out_fn))) == [b"user.empty", b"user.foo"]
|
assert sorted(no_selinux(xattr.listxattr(out_fn))) == [b"user.empty", b"user.foo"]
|
||||||
assert xattr.getxattr(out_fn, b"user.foo") == b"bar"
|
assert xattr.getxattr(out_fn, b"user.foo") == b"bar"
|
||||||
assert xattr.getxattr(out_fn, b"user.empty") == b""
|
assert xattr.getxattr(out_fn, b"user.empty") == b""
|
||||||
|
@ -159,20 +172,23 @@ class ArchiverTestCase(ArchiverTestCaseBase):
|
||||||
else:
|
else:
|
||||||
raise
|
raise
|
||||||
|
|
||||||
@unittest.skipUnless(llfuse, "llfuse not installed")
|
|
||||||
def test_fuse_versions_view(self):
|
@unittest.skipUnless(llfuse, "llfuse not installed")
|
||||||
self.cmd(f"--repo={self.repository_location}", "rcreate", RK_ENCRYPTION)
|
def test_fuse_versions_view(archivers, request):
|
||||||
self.create_regular_file("test", contents=b"first")
|
archiver = request.getfixturevalue(archivers)
|
||||||
|
repo_location, input_path = archiver.repository_location, archiver.input_path
|
||||||
|
cmd(archiver, f"--repo={repo_location}", "rcreate", RK_ENCRYPTION)
|
||||||
|
create_regular_file(input_path, "test", contents=b"first")
|
||||||
if are_hardlinks_supported():
|
if are_hardlinks_supported():
|
||||||
self.create_regular_file("hardlink1", contents=b"123456")
|
create_regular_file(input_path, "hardlink1", contents=b"123456")
|
||||||
os.link("input/hardlink1", "input/hardlink2")
|
os.link("input/hardlink1", "input/hardlink2")
|
||||||
os.link("input/hardlink1", "input/hardlink3")
|
os.link("input/hardlink1", "input/hardlink3")
|
||||||
self.cmd(f"--repo={self.repository_location}", "create", "archive1", "input")
|
cmd(archiver, f"--repo={repo_location}", "create", "archive1", "input")
|
||||||
self.create_regular_file("test", contents=b"second")
|
create_regular_file(input_path, "test", contents=b"second")
|
||||||
self.cmd(f"--repo={self.repository_location}", "create", "archive2", "input")
|
cmd(archiver, f"--repo={repo_location}", "create", "archive2", "input")
|
||||||
mountpoint = os.path.join(self.tmpdir, "mountpoint")
|
mountpoint = os.path.join(archiver.tmpdir, "mountpoint")
|
||||||
# mount the whole repository, archive contents shall show up in versioned view:
|
# mount the whole repository, archive contents shall show up in versioned view:
|
||||||
with self.fuse_mount(self.repository_location, mountpoint, "-o", "versions"):
|
with fuse_mount(repo_location, mountpoint, "-o", "versions"):
|
||||||
path = os.path.join(mountpoint, "input", "test") # filename shows up as directory ...
|
path = os.path.join(mountpoint, "input", "test") # filename shows up as directory ...
|
||||||
files = os.listdir(path)
|
files = os.listdir(path)
|
||||||
assert all(f.startswith("test.") for f in files) # ... with files test.xxxxx in there
|
assert all(f.startswith("test.") for f in files) # ... with files test.xxxxx in there
|
||||||
|
@ -184,19 +200,22 @@ class ArchiverTestCase(ArchiverTestCaseBase):
|
||||||
assert os.stat(hl1).st_ino == os.stat(hl2).st_ino == os.stat(hl3).st_ino
|
assert os.stat(hl1).st_ino == os.stat(hl2).st_ino == os.stat(hl3).st_ino
|
||||||
assert open(hl3, "rb").read() == b"123456"
|
assert open(hl3, "rb").read() == b"123456"
|
||||||
# similar again, but exclude the 1st hardlink:
|
# similar again, but exclude the 1st hardlink:
|
||||||
with self.fuse_mount(self.repository_location, mountpoint, "-o", "versions", "-e", "input/hardlink1"):
|
with fuse_mount(repo_location, mountpoint, "-o", "versions", "-e", "input/hardlink1"):
|
||||||
if are_hardlinks_supported():
|
if are_hardlinks_supported():
|
||||||
hl2 = os.path.join(mountpoint, "input", "hardlink2", "hardlink2.00001")
|
hl2 = os.path.join(mountpoint, "input", "hardlink2", "hardlink2.00001")
|
||||||
hl3 = os.path.join(mountpoint, "input", "hardlink3", "hardlink3.00001")
|
hl3 = os.path.join(mountpoint, "input", "hardlink3", "hardlink3.00001")
|
||||||
assert os.stat(hl2).st_ino == os.stat(hl3).st_ino
|
assert os.stat(hl2).st_ino == os.stat(hl3).st_ino
|
||||||
assert open(hl3, "rb").read() == b"123456"
|
assert open(hl3, "rb").read() == b"123456"
|
||||||
|
|
||||||
@unittest.skipUnless(llfuse, "llfuse not installed")
|
|
||||||
def test_fuse_allow_damaged_files(self):
|
@unittest.skipUnless(llfuse, "llfuse not installed")
|
||||||
self.cmd(f"--repo={self.repository_location}", "rcreate", RK_ENCRYPTION)
|
def test_fuse_allow_damaged_files(archivers, request):
|
||||||
self.create_src_archive("archive")
|
archiver = request.getfixturevalue(archivers)
|
||||||
|
repo_location, repo_path = archiver.repository_location, archiver.repository_path
|
||||||
|
cmd(archiver, f"--repo={repo_location}", "rcreate", RK_ENCRYPTION)
|
||||||
|
create_src_archive(archiver, "archive")
|
||||||
# Get rid of a chunk and repair it
|
# Get rid of a chunk and repair it
|
||||||
archive, repository = self.open_archive("archive")
|
archive, repository = open_archive(repo_path, "archive")
|
||||||
with repository:
|
with repository:
|
||||||
for item in archive.iter_items():
|
for item in archive.iter_items():
|
||||||
if item.path.endswith(src_file):
|
if item.path.endswith(src_file):
|
||||||
|
@ -206,40 +225,48 @@ class ArchiverTestCase(ArchiverTestCaseBase):
|
||||||
else:
|
else:
|
||||||
assert False # missed the file
|
assert False # missed the file
|
||||||
repository.commit(compact=False)
|
repository.commit(compact=False)
|
||||||
self.cmd(f"--repo={self.repository_location}", "check", "--repair", exit_code=0)
|
cmd(archiver, f"--repo={repo_location}", "check", "--repair", exit_code=0)
|
||||||
|
|
||||||
mountpoint = os.path.join(self.tmpdir, "mountpoint")
|
mountpoint = os.path.join(archiver.tmpdir, "mountpoint")
|
||||||
with self.fuse_mount(self.repository_location, mountpoint, "-a", "archive"):
|
with fuse_mount(repo_location, mountpoint, "-a", "archive"):
|
||||||
with pytest.raises(OSError) as excinfo:
|
with pytest.raises(OSError) as excinfo:
|
||||||
open(os.path.join(mountpoint, "archive", path))
|
open(os.path.join(mountpoint, "archive", path))
|
||||||
assert excinfo.value.errno == errno.EIO
|
assert excinfo.value.errno == errno.EIO
|
||||||
with self.fuse_mount(self.repository_location, mountpoint, "-a", "archive", "-o", "allow_damaged_files"):
|
with fuse_mount(repo_location, mountpoint, "-a", "archive", "-o", "allow_damaged_files"):
|
||||||
open(os.path.join(mountpoint, "archive", path)).close()
|
open(os.path.join(mountpoint, "archive", path)).close()
|
||||||
|
|
||||||
@unittest.skipUnless(llfuse, "llfuse not installed")
|
|
||||||
def test_fuse_mount_options(self):
|
|
||||||
self.cmd(f"--repo={self.repository_location}", "rcreate", RK_ENCRYPTION)
|
|
||||||
self.create_src_archive("arch11")
|
|
||||||
self.create_src_archive("arch12")
|
|
||||||
self.create_src_archive("arch21")
|
|
||||||
self.create_src_archive("arch22")
|
|
||||||
|
|
||||||
mountpoint = os.path.join(self.tmpdir, "mountpoint")
|
@unittest.skipUnless(llfuse, "llfuse not installed")
|
||||||
with self.fuse_mount(self.repository_location, mountpoint, "--first=2", "--sort=name"):
|
def test_fuse_mount_options(archivers, request):
|
||||||
|
archiver = request.getfixturevalue(archivers)
|
||||||
|
repo_location = archiver.repository_location
|
||||||
|
cmd(archiver, f"--repo={repo_location}", "rcreate", RK_ENCRYPTION)
|
||||||
|
create_src_archive(archiver, "arch11")
|
||||||
|
create_src_archive(archiver, "arch12")
|
||||||
|
create_src_archive(archiver, "arch21")
|
||||||
|
create_src_archive(archiver, "arch22")
|
||||||
|
|
||||||
|
mountpoint = os.path.join(archiver.tmpdir, "mountpoint")
|
||||||
|
with fuse_mount(repo_location, mountpoint, "--first=2", "--sort=name"):
|
||||||
assert sorted(os.listdir(os.path.join(mountpoint))) == ["arch11", "arch12"]
|
assert sorted(os.listdir(os.path.join(mountpoint))) == ["arch11", "arch12"]
|
||||||
with self.fuse_mount(self.repository_location, mountpoint, "--last=2", "--sort=name"):
|
with fuse_mount(repo_location, mountpoint, "--last=2", "--sort=name"):
|
||||||
assert sorted(os.listdir(os.path.join(mountpoint))) == ["arch21", "arch22"]
|
assert sorted(os.listdir(os.path.join(mountpoint))) == ["arch21", "arch22"]
|
||||||
with self.fuse_mount(self.repository_location, mountpoint, "--match-archives=sh:arch1*"):
|
with fuse_mount(repo_location, mountpoint, "--match-archives=sh:arch1*"):
|
||||||
assert sorted(os.listdir(os.path.join(mountpoint))) == ["arch11", "arch12"]
|
assert sorted(os.listdir(os.path.join(mountpoint))) == ["arch11", "arch12"]
|
||||||
with self.fuse_mount(self.repository_location, mountpoint, "--match-archives=sh:arch2*"):
|
with fuse_mount(repo_location, mountpoint, "--match-archives=sh:arch2*"):
|
||||||
assert sorted(os.listdir(os.path.join(mountpoint))) == ["arch21", "arch22"]
|
assert sorted(os.listdir(os.path.join(mountpoint))) == ["arch21", "arch22"]
|
||||||
with self.fuse_mount(self.repository_location, mountpoint, "--match-archives=sh:arch*"):
|
with fuse_mount(repo_location, mountpoint, "--match-archives=sh:arch*"):
|
||||||
assert sorted(os.listdir(os.path.join(mountpoint))) == ["arch11", "arch12", "arch21", "arch22"]
|
assert sorted(os.listdir(os.path.join(mountpoint))) == ["arch11", "arch12", "arch21", "arch22"]
|
||||||
with self.fuse_mount(self.repository_location, mountpoint, "--match-archives=nope"):
|
with fuse_mount(repo_location, mountpoint, "--match-archives=nope"):
|
||||||
assert sorted(os.listdir(os.path.join(mountpoint))) == []
|
assert sorted(os.listdir(os.path.join(mountpoint))) == []
|
||||||
|
|
||||||
@unittest.skipUnless(llfuse, "llfuse not installed")
|
|
||||||
def test_migrate_lock_alive(self):
|
@unittest.skipUnless(llfuse, "llfuse not installed")
|
||||||
|
def test_migrate_lock_alive(archivers, request):
|
||||||
|
archiver = request.getfixturevalue(archivers)
|
||||||
|
if archiver.prefix == "ssh://__testsuite__":
|
||||||
|
pytest.skip("only works locally")
|
||||||
|
repo_location = archiver.repository_location
|
||||||
"""Both old_id and new_id must not be stale during lock migration / daemonization."""
|
"""Both old_id and new_id must not be stale during lock migration / daemonization."""
|
||||||
from functools import wraps
|
from functools import wraps
|
||||||
import pickle
|
import pickle
|
||||||
|
@ -247,7 +274,7 @@ class ArchiverTestCase(ArchiverTestCaseBase):
|
||||||
|
|
||||||
# Check results are communicated from the borg mount background process
|
# Check results are communicated from the borg mount background process
|
||||||
# to the pytest process by means of a serialized dict object stored in this file.
|
# to the pytest process by means of a serialized dict object stored in this file.
|
||||||
assert_data_file = os.path.join(self.tmpdir, "migrate_lock_assert_data.pickle")
|
assert_data_file = os.path.join(archiver.tmpdir, "migrate_lock_assert_data.pickle")
|
||||||
|
|
||||||
# Decorates Lock.migrate_lock() with process_alive() checks before and after.
|
# Decorates Lock.migrate_lock() with process_alive() checks before and after.
|
||||||
# (We don't want to mix testing code into runtime.)
|
# (We don't want to mix testing code into runtime.)
|
||||||
|
@ -279,10 +306,7 @@ class ArchiverTestCase(ArchiverTestCaseBase):
|
||||||
assert_data["exception.extr_tb"] = traceback.extract_tb(e.__traceback__)
|
assert_data["exception.extr_tb"] = traceback.extract_tb(e.__traceback__)
|
||||||
finally:
|
finally:
|
||||||
assert_data["after"].update(
|
assert_data["after"].update(
|
||||||
{
|
{"old_id_alive": platform.process_alive(*old_id), "new_id_alive": platform.process_alive(*new_id)}
|
||||||
"old_id_alive": platform.process_alive(*old_id),
|
|
||||||
"new_id_alive": platform.process_alive(*new_id),
|
|
||||||
}
|
|
||||||
)
|
)
|
||||||
try:
|
try:
|
||||||
with open(assert_data_file, "wb") as _out:
|
with open(assert_data_file, "wb") as _out:
|
||||||
|
@ -296,12 +320,12 @@ class ArchiverTestCase(ArchiverTestCaseBase):
|
||||||
# Decorate
|
# Decorate
|
||||||
Lock.migrate_lock = write_assert_data(Lock.migrate_lock)
|
Lock.migrate_lock = write_assert_data(Lock.migrate_lock)
|
||||||
try:
|
try:
|
||||||
self.cmd(f"--repo={self.repository_location}", "rcreate", "--encryption=none")
|
cmd(archiver, f"--repo={repo_location}", "rcreate", "--encryption=none")
|
||||||
self.create_src_archive("arch")
|
create_src_archive(archiver, "arch")
|
||||||
mountpoint = os.path.join(self.tmpdir, "mountpoint")
|
mountpoint = os.path.join(archiver.tmpdir, "mountpoint")
|
||||||
# In order that the decoration is kept for the borg mount process, we must not spawn, but actually fork;
|
# In order that the decoration is kept for the borg mount process, we must not spawn, but actually fork;
|
||||||
# not to be confused with the forking in borg.helpers.daemonize() which is done as well.
|
# not to be confused with the forking in borg.helpers.daemonize() which is done as well.
|
||||||
with self.fuse_mount(self.repository_location, mountpoint, os_fork=True):
|
with fuse_mount(repo_location, mountpoint, os_fork=True):
|
||||||
pass
|
pass
|
||||||
with open(assert_data_file, "rb") as _in:
|
with open(assert_data_file, "rb") as _in:
|
||||||
assert_data = pickle.load(_in)
|
assert_data = pickle.load(_in)
|
||||||
|
@ -341,20 +365,3 @@ class ArchiverTestCase(ArchiverTestCaseBase):
|
||||||
finally:
|
finally:
|
||||||
# Undecorate
|
# Undecorate
|
||||||
Lock.migrate_lock = Lock.migrate_lock.__wrapped__
|
Lock.migrate_lock = Lock.migrate_lock.__wrapped__
|
||||||
|
|
||||||
|
|
||||||
class RemoteArchiverTestCase(RemoteArchiverTestCaseBase, ArchiverTestCase):
|
|
||||||
"""run the same tests, but with a remote repository"""
|
|
||||||
|
|
||||||
@unittest.skip("only works locally")
|
|
||||||
def test_migrate_lock_alive(self):
|
|
||||||
pass
|
|
||||||
|
|
||||||
|
|
||||||
@unittest.skipUnless("binary" in BORG_EXES, "no borg.exe available")
|
|
||||||
class ArchiverTestCaseBinary(ArchiverTestCaseBinaryBase, ArchiverTestCase):
|
|
||||||
def test_fuse(self):
|
|
||||||
if fakeroot_detected():
|
|
||||||
unittest.skip("test_fuse with the binary is not compatible with fakeroot")
|
|
||||||
else:
|
|
||||||
super().test_fuse()
|
|
||||||
|
|
Loading…
Reference in New Issue