diff --git a/darc/crypto.py b/darc/crypto.py index e5e6e69fe..1235d1556 100644 --- a/darc/crypto.py +++ b/darc/crypto.py @@ -1,10 +1,11 @@ +import sys from ctypes import cdll, c_char_p, c_int, c_uint, c_void_p, POINTER, create_string_buffer from ctypes.util import find_library import struct libcrypto = cdll.LoadLibrary(find_library('crypto')) # Default libcrypto on OS X is too old, try the brew version -if not hasattr(libcrypto, 'PKCS5_PBKDF2_HMAC'): +if not hasattr(libcrypto, 'PKCS5_PBKDF2_HMAC') and sys.platform == 'darwin': libcrypto = cdll.LoadLibrary('/usr/local/opt/openssl/lib/libcrypto.dylib') libcrypto.PKCS5_PBKDF2_HMAC.argtypes = (c_char_p, c_int, c_char_p, c_int, c_int, c_void_p, c_int, c_char_p) libcrypto.EVP_sha256.restype = c_void_p diff --git a/darc/testsuite/xattr.py b/darc/testsuite/xattr.py index 99e844b9c..37c10f277 100644 --- a/darc/testsuite/xattr.py +++ b/darc/testsuite/xattr.py @@ -6,28 +6,36 @@ from darc.xattr import lsetxattr, llistxattr, lgetxattr, get_all, set, flistxatt class XattrTestCase(DarcTestCase): + def setUp(self): + self.tmpfile = tempfile.NamedTemporaryFile(dir=os.getcwd()) + self.symlink = os.path.join(os.path.dirname(self.tmpfile.name), 'symlink') + os.symlink(self.tmpfile.name, self.symlink) + + def tearDown(self): + os.unlink(self.symlink) + def test_low_level(self): - with tempfile.NamedTemporaryFile(dir=os.getcwd()) as fd: - self.assert_equal(llistxattr(fd.name), []) - lsetxattr(fd.name, b'user.foo', b'bar') - self.assert_equal(llistxattr(fd.name), [b'user.foo']) - self.assert_equal(lgetxattr(fd.name, b'user.foo'), b'bar') + self.assert_equal(llistxattr(self.tmpfile.name), []) + self.assert_equal(llistxattr(self.symlink), []) + lsetxattr(self.tmpfile.name, b'user.foo', b'bar') + self.assert_equal(llistxattr(self.tmpfile.name), [b'user.foo']) + self.assert_equal(lgetxattr(self.tmpfile.name, b'user.foo'), b'bar') + self.assert_equal(llistxattr(self.symlink), []) def test_low_level_fileno(self): - with tempfile.NamedTemporaryFile(dir=os.getcwd()) as fd: - self.assert_equal(flistxattr(fd.fileno()), []) - fsetxattr(fd.fileno(), b'user.foo', b'bar') - self.assert_equal(flistxattr(fd.fileno()), [b'user.foo']) - self.assert_equal(fgetxattr(fd.fileno(), b'user.foo'), b'bar') + self.assert_equal(flistxattr(self.tmpfile.fileno()), []) + fsetxattr(self.tmpfile.fileno(), b'user.foo', b'bar') + self.assert_equal(flistxattr(self.tmpfile.fileno()), [b'user.foo']) + self.assert_equal(fgetxattr(self.tmpfile.fileno(), b'user.foo'), b'bar') def test_high_level(self): - with tempfile.NamedTemporaryFile(dir=os.getcwd()) as fd: - self.assert_equal(get_all(fd.name), {}) - set(fd.name, b'foo', b'bar') - self.assert_equal(get_all(fd.name), {b'foo': b'bar'}) + self.assert_equal(get_all(self.tmpfile.name), {}) + self.assert_equal(get_all(self.symlink), {}) + set(self.tmpfile.name, b'foo', b'bar') + self.assert_equal(get_all(self.tmpfile.name), {b'foo': b'bar'}) + self.assert_equal(get_all(self.symlink), {}) def test_high_level_fileno(self): - with tempfile.NamedTemporaryFile(dir=os.getcwd()) as fd: - self.assert_equal(get_all(fd.fileno()), {}) - set(fd.fileno(), b'foo', b'bar') - self.assert_equal(get_all(fd.fileno()), {b'foo': b'bar'}) + self.assert_equal(get_all(self.tmpfile.fileno()), {}) + set(self.tmpfile.fileno(), b'foo', b'bar') + self.assert_equal(get_all(self.tmpfile.fileno()), {b'foo': b'bar'}) diff --git a/darc/xattr.py b/darc/xattr.py index a047838a6..50a89a5cb 100644 --- a/darc/xattr.py +++ b/darc/xattr.py @@ -7,13 +7,14 @@ from ctypes.util import find_library libc = CDLL(find_library('c'), use_errno=True) + def _check(rv, path=None): if rv < 0: raise OSError(get_errno(), path) return rv -if sys.platform == 'linux': +if sys.platform.startswith('linux'): libc.llistxattr.argtypes = (c_char_p, c_char_p, c_size_t) libc.llistxattr.restype = c_ssize_t libc.flistxattr.argtypes = (c_int, c_char_p, c_size_t) @@ -27,23 +28,22 @@ if sys.platform == 'linux': libc.fgetxattr.argtypes = (c_int, c_char_p, c_char_p, c_size_t) libc.fgetxattr.restype = c_ssize_t - def set(path_or_fd, name, value): if isinstance(path_or_fd, int): fsetxattr(path_or_fd, b'user.' + name, value) else: lsetxattr(path_or_fd, b'user.' + name, value) - def get_all(path_or_fd): """Return a dictionary with all (user) xattrs for "path_or_fd" + + Symbolic links are not followed """ if isinstance(path_or_fd, int): return dict((name[5:], fgetxattr(path_or_fd, name)) for name in flistxattr(path_or_fd) if name.startswith(b'user.')) else: return dict((name[5:], lgetxattr(path_or_fd, name)) for name in llistxattr(path_or_fd) if name.startswith(b'user.')) - def llistxattr(path): path = os.fsencode(path) n = _check(libc.llistxattr(path, None, 0), path) @@ -55,7 +55,6 @@ if sys.platform == 'linux': raise Exception('llistxattr failed') return namebuf.raw.split(b'\0')[:-1] - def flistxattr(fd): n = _check(libc.flistxattr(fd, None, 0)) if n == 0: @@ -66,15 +65,12 @@ if sys.platform == 'linux': raise Exception('flistxattr failed') return namebuf.raw.split(b'\0')[:-1] - def lsetxattr(path, name, value, flags=0): _check(libc.lsetxattr(os.fsencode(path), name, value, len(value), flags), path) - def fsetxattr(fd, name, value, flags=0): _check(libc.fsetxattr(fd, name, value, len(value), flags)) - def lgetxattr(path, name): path = os.fsencode(path) n = _check(libc.lgetxattr(path, name, None, 0)) @@ -86,7 +82,6 @@ if sys.platform == 'linux': raise Exception('lgetxattr failed') return valuebuf.raw - def fgetxattr(fd, name): n = _check(libc.fgetxattr(fd, name, None, 0)) if n == 0: @@ -95,6 +90,7 @@ if sys.platform == 'linux': n2 = _check(libc.fgetxattr(fd, name, valuebuf, n)) if n2 != n: raise Exception('fgetxattr failed') + return valuebuf.raw elif sys.platform == 'darwin': libc.listxattr.argtypes = (c_char_p, c_char_p, c_size_t, c_int) @@ -118,16 +114,16 @@ elif sys.platform == 'darwin': else: lsetxattr(path_or_fd, name, value) - def get_all(path_or_fd): """Return a dictionary with all (user) xattrs for "path_or_fd" + + Symbolic links are not followed """ if isinstance(path_or_fd, int): return dict((name, fgetxattr(path_or_fd, name)) for name in flistxattr(path_or_fd)) else: return dict((name, lgetxattr(path_or_fd, name)) for name in llistxattr(path_or_fd)) - def llistxattr(path): path = os.fsencode(path) n = _check(libc.listxattr(path, None, 0, XATTR_NOFOLLOW), path) @@ -139,7 +135,6 @@ elif sys.platform == 'darwin': raise Exception('llistxattr failed') return namebuf.raw.split(b'\0')[:-1] - def flistxattr(fd): n = _check(libc.flistxattr(fd, None, 0, 0)) if n == 0: @@ -150,14 +145,11 @@ elif sys.platform == 'darwin': raise Exception('flistxattr failed') return namebuf.raw.split(b'\0')[:-1] - def lsetxattr(path, name, value, flags=XATTR_NOFOLLOW): - rv = _check(libc.setxattr(os.fsencode(path), name, value, len(value), 0, flags), path) - + _check(libc.setxattr(os.fsencode(path), name, value, len(value), 0, flags), path) def fsetxattr(fd, name, value, flags=0): - rv = _check(libc.fsetxattr(fd, name, value, len(value), 0, flags)) - + _check(libc.fsetxattr(fd, name, value, len(value), 0, flags)) def lgetxattr(path, name): path = os.fsencode(path) @@ -170,7 +162,6 @@ elif sys.platform == 'darwin': raise Exception('getxattr failed') return valuebuf.raw - def fgetxattr(fd, name): n = _check(libc.fgetxattr(fd, name, None, 0, 0, 0)) if n == 0: @@ -182,4 +173,4 @@ elif sys.platform == 'darwin': return valuebuf.raw else: - raise Exception('Unsupported platform') + raise Exception('Unsupported platform: %s' % sys.platform)