Added initial fuse test case

This commit is contained in:
Jonas Borgström 2013-07-25 22:54:19 +02:00
parent 023da55875
commit 0b19df3cea
3 changed files with 59 additions and 7 deletions

View File

@ -4,7 +4,8 @@ python:
- "3.3"
# command to install dependencies
install:
- "pip install --use-mirrors llfuse"
- "pip install --use-mirrors Cython"
- "pip install -e ."
# command to run tests
script: fakeroot python -m attic.testsuite.run -vb
script: fakeroot -u python -m attic.testsuite.run -vb

View File

@ -76,7 +76,7 @@ class AtticOperations(llfuse.Operations):
entry.st_mode = item[b'mode']
entry.st_nlink = item.get(b'nlink', 1)
entry.st_uid = item[b'uid']
entry.st_gid = item[b'uid']
entry.st_gid = item[b'gid']
entry.st_rdev = item.get(b'rdev', 0)
entry.st_size = size
entry.st_blksize = 512

View File

@ -2,19 +2,29 @@ import filecmp
import os
from io import StringIO
import stat
import subprocess
import sys
import shutil
import tempfile
import time
import unittest
from attic import xattr
from attic.archiver import Archiver
from attic.repository import Repository
from attic.testsuite import AtticTestCase
try:
import llfuse
has_llfuse = True
except ImportError:
has_llfuse = False
has_mtime_ns = sys.version >= '3.3'
utime_supports_fd = os.utime in getattr(os, 'supports_fd', {})
src_dir = os.path.join(os.getcwd(), os.path.dirname(__file__), '..', '..')
class changedir:
def __init__(self, dir):
self.dir = dir
@ -53,8 +63,21 @@ class ArchiverTestCase(AtticTestCase):
shutil.rmtree(self.tmpdir)
os.chdir(self._old_wd)
def attic(self, *args, **kwargs):
exit_code = kwargs.get('exit_code', 0)
def attic(self, *args, **kw):
exit_code = kw.get('exit_code', 0)
fork = kw.get('fork', False)
if fork:
try:
output = subprocess.check_output((sys.executable, '-m', 'attic.archiver') + args)
ret = 0
except subprocess.CalledProcessError as e:
output = e.output
ret = e.returncode
output = os.fsdecode(output)
if ret != exit_code:
print(output)
self.assert_equal(exit_code, ret)
return output
args = list(args)
try:
stdout, stderr = sys.stdout, sys.stderr
@ -86,19 +109,24 @@ class ArchiverTestCase(AtticTestCase):
except EnvironmentError:
return {}
def diff_dirs(self, dir1, dir2):
def diff_dirs(self, dir1, dir2, fuse=False):
diff = filecmp.dircmp(dir1, dir2)
self.assert_equal(diff.left_only, [])
self.assert_equal(diff.right_only, [])
self.assert_equal(diff.diff_files, [])
self.assert_equal(diff.funny_files, [])
for filename in diff.common:
path1 = os.path.join(dir1, filename)
path2 = os.path.join(dir2, filename)
s1 = os.lstat(path1)
s2 = os.lstat(path2)
attrs = ['st_mode', 'st_uid', 'st_gid', 'st_rdev']
if not fuse or not os.path.isdir(path1):
# dir nlink is always 1 on our fuse fileystem
attrs.append('st_nlink')
if not os.path.islink(path1) or utime_supports_fd:
attrs.append('st_mtime_ns' if has_mtime_ns else 'st_mtime')
# Fuse api is does not support ns precision
attrs.append('st_mtime_ns' if has_mtime_ns and not fuse else 'st_mtime')
d1 = [filename] + [getattr(s1, a) for a in attrs]
d2 = [filename] + [getattr(s2, a) for a in attrs]
# 'st_mtime precision is limited'
@ -109,7 +137,9 @@ class ArchiverTestCase(AtticTestCase):
d2.append(self.get_xattrs(path2))
self.assert_equal(d1, d2)
def test_basic_functionality(self):
def create_test_files(self):
"""Create a minimal test case including all supported file types
"""
# File
self.create_regual_file('file1', size=1024 * 80)
# Directory
@ -132,6 +162,9 @@ class ArchiverTestCase(AtticTestCase):
os.symlink('somewhere', os.path.join(self.input_path, 'link1'))
# FIFO node
os.mkfifo(os.path.join(self.input_path, 'fifo1'))
def test_basic_functionality(self):
self.create_test_files()
self.attic('init', self.repository_location)
self.attic('create', self.repository_location + '::test', 'input')
self.attic('create', self.repository_location + '::test.2', 'input')
@ -218,6 +251,24 @@ class ArchiverTestCase(AtticTestCase):
self.assert_raises(SystemExit, lambda: self.attic())
self.assert_raises(SystemExit, lambda: self.attic('-h'))
@unittest.skipUnless(has_llfuse, 'llfuse not installed')
def test_mount(self):
mountpoint = os.path.join(self.tmpdir, 'mountpoint')
os.mkdir(mountpoint)
self.attic('init', self.repository_location)
self.create_test_files()
self.attic('create', self.repository_location + '::archive', 'input')
try:
self.attic('mount', self.repository_location + '::archive', mountpoint, fork=True)
# Give fs some time to appear
time.sleep(.2)
self.diff_dirs(self.input_path, os.path.join(mountpoint, 'input'), fuse=True)
finally:
os.system('fusermount -u ' + mountpoint)
os.rmdir(mountpoint)
# Give the daemon some time to exit
time.sleep(.2)
class RemoteArchiverTestCase(ArchiverTestCase):
prefix = '__testsuite__:'