1
0
Fork 0
mirror of https://github.com/borgbackup/borg.git synced 2025-02-23 06:31:58 +00:00

Merge pull request #1292 from ThomasWaldmann/more-fuse-tests

More fuse tests
This commit is contained in:
enkore 2016-07-08 22:19:48 +02:00 committed by GitHub
commit 9bfd233ce1
2 changed files with 70 additions and 37 deletions

View file

@ -93,6 +93,21 @@ def _assert_dirs_equal_cmp(self, diff):
for sub_diff in diff.subdirs.values():
self._assert_dirs_equal_cmp(sub_diff)
@contextmanager
def fuse_mount(self, location, mountpoint):
os.mkdir(mountpoint)
self.cmd('mount', location, mountpoint, fork=True)
self.wait_for_mount(mountpoint)
yield
if sys.platform.startswith('linux'):
cmd = 'fusermount -u %s' % mountpoint
else:
cmd = 'umount %s' % mountpoint
os.system(cmd)
os.rmdir(mountpoint)
# Give the daemon some time to exit
time.sleep(.2)
def wait_for_mount(self, path, timeout=5):
"""Wait until a filesystem is mounted on `path`
"""

View file

@ -974,46 +974,68 @@ def test_help(self):
assert 'This command initializes' not in self.cmd('help', 'init', '--usage-only')
@unittest.skipUnless(has_llfuse, 'llfuse not installed')
def test_fuse_mount_repository(self):
mountpoint = os.path.join(self.tmpdir, 'mountpoint')
os.mkdir(mountpoint)
def test_fuse(self):
self.cmd('init', self.repository_location)
self.create_test_files()
self.cmd('create', self.repository_location + '::archive', 'input')
self.cmd('create', self.repository_location + '::archive2', 'input')
try:
self.cmd('mount', self.repository_location, mountpoint, fork=True)
self.wait_for_mount(mountpoint)
mountpoint = os.path.join(self.tmpdir, 'mountpoint')
# mount the whole repository, archive contents shall show up in archivename subdirs of mountpoint:
with self.fuse_mount(self.repository_location, mountpoint):
self.assert_dirs_equal(self.input_path, os.path.join(mountpoint, 'archive', 'input'))
self.assert_dirs_equal(self.input_path, os.path.join(mountpoint, 'archive2', 'input'))
finally:
if sys.platform.startswith('linux'):
os.system('fusermount -u ' + mountpoint)
else:
os.system('umount ' + mountpoint)
os.rmdir(mountpoint)
# Give the daemon some time to exit
time.sleep(.2)
@unittest.skipUnless(has_llfuse, 'llfuse not installed')
def test_fuse_mount_archive(self):
mountpoint = os.path.join(self.tmpdir, 'mountpoint')
os.mkdir(mountpoint)
self.cmd('init', self.repository_location)
self.create_test_files()
self.cmd('create', self.repository_location + '::archive', 'input')
try:
self.cmd('mount', self.repository_location + '::archive', mountpoint, fork=True)
self.wait_for_mount(mountpoint)
# mount only 1 archive, its contents shall show up directly in mountpoint:
with self.fuse_mount(self.repository_location + '::archive', mountpoint):
self.assert_dirs_equal(self.input_path, os.path.join(mountpoint, 'input'))
finally:
if sys.platform.startswith('linux'):
os.system('fusermount -u ' + mountpoint)
# regular file
in_fn = 'input/file1'
out_fn = os.path.join(mountpoint, 'input', 'file1')
# stat
sti1 = os.stat(in_fn)
sto1 = os.stat(out_fn)
assert sti1.st_mode == sto1.st_mode
assert sti1.st_uid == sto1.st_uid
assert sti1.st_gid == sto1.st_gid
assert sti1.st_size == sto1.st_size
assert sti1.st_atime == sto1.st_atime
assert sti1.st_ctime == sto1.st_ctime
assert sti1.st_mtime == sto1.st_mtime
# note: there is another hardlink to this, see below
assert sti1.st_nlink == sto1.st_nlink == 2
# read
with open(in_fn, 'rb') as in_f, open(out_fn, 'rb') as out_f:
assert in_f.read() == out_f.read()
# list/read xattrs
if xattr.is_enabled(self.input_path):
assert xattr.listxattr(out_fn) == ['user.foo', ]
assert xattr.getxattr(out_fn, 'user.foo') == b'bar'
else:
os.system('umount ' + mountpoint)
os.rmdir(mountpoint)
# Give the daemon some time to exit
time.sleep(.2)
assert xattr.listxattr(out_fn) == []
try:
xattr.getxattr(out_fn, 'user.foo')
except OSError as e:
assert e.errno == llfuse.ENOATTR
else:
assert False, "expected OSError(ENOATTR), but no error was raised"
# hardlink (to 'input/file1')
in_fn = 'input/hardlink'
out_fn = os.path.join(mountpoint, 'input', 'hardlink')
sti2 = os.stat(in_fn)
sto2 = os.stat(out_fn)
assert sti2.st_nlink == sto2.st_nlink == 2
assert sto1.st_ino == sto2.st_ino
# symlink
in_fn = 'input/link1'
out_fn = os.path.join(mountpoint, 'input', 'link1')
sti = os.stat(in_fn, follow_symlinks=False)
sto = os.stat(out_fn, follow_symlinks=False)
assert stat.S_ISLNK(sti.st_mode)
assert stat.S_ISLNK(sto.st_mode)
assert os.readlink(in_fn) == os.readlink(out_fn)
# FIFO
out_fn = os.path.join(mountpoint, 'input', 'fifo1')
sto = os.stat(out_fn)
assert stat.S_ISFIFO(sto.st_mode)
def verify_aes_counter_uniqueness(self, method):
seen = set() # Chunks already seen
@ -1187,11 +1209,7 @@ def test_remote_repo_restrict_to_path(self):
# this was introduced because some tests expect stderr contents to show up
# in "output" also. Also, the non-forking exec_cmd catches both, too.
@unittest.skip('deadlock issues')
def test_fuse_mount_repository(self):
pass
@unittest.skip('deadlock issues')
def test_fuse_mount_archive(self):
def test_fuse(self):
pass
@unittest.skip('only works locally')