1
0
Fork 0
mirror of https://github.com/borgbackup/borg.git synced 2024-12-23 00:07:38 +00:00

More xattr code cleanup

This commit is contained in:
Jonas Borgström 2013-06-27 22:10:15 +02:00
parent 4ff095c073
commit bf4b13d9dc
4 changed files with 37 additions and 45 deletions

View file

@ -17,16 +17,16 @@ def tearDown(self):
def test_low_level(self):
self.assert_equal(llistxattr(self.tmpfile.name), [])
self.assert_equal(llistxattr(self.symlink), [])
lsetxattr(self.tmpfile.name, b'user.foo', b'bar')
self.assert_equal(llistxattr(self.tmpfile.name), [b'user.foo'])
self.assert_equal(lgetxattr(self.tmpfile.name, b'user.foo'), b'bar')
lsetxattr(self.tmpfile.name, b'foo', b'bar')
self.assert_equal(llistxattr(self.tmpfile.name), [b'foo'])
self.assert_equal(lgetxattr(self.tmpfile.name, b'foo'), b'bar')
self.assert_equal(llistxattr(self.symlink), [])
def test_low_level_fileno(self):
self.assert_equal(flistxattr(self.tmpfile.fileno()), [])
fsetxattr(self.tmpfile.fileno(), b'user.foo', b'bar')
self.assert_equal(flistxattr(self.tmpfile.fileno()), [b'user.foo'])
self.assert_equal(fgetxattr(self.tmpfile.fileno(), b'user.foo'), b'bar')
fsetxattr(self.tmpfile.fileno(), b'foo', b'bar')
self.assert_equal(flistxattr(self.tmpfile.fileno()), [b'foo'])
self.assert_equal(fgetxattr(self.tmpfile.fileno(), b'foo'), b'bar')
def test_high_level(self):
self.assert_equal(get_all(self.tmpfile.name), {})

View file

@ -1,4 +1,6 @@
"""A basic extended attributes (xattr) implementation for Linux
"""A basic extended attributes (xattr) implementation for Linux and MacOS X
On Linux only the "user." namespace is accessed
"""
import os
import sys
@ -8,6 +10,24 @@
libc = CDLL(find_library('c'), use_errno=True)
def set(path_or_fd, name, value):
if isinstance(path_or_fd, int):
fsetxattr(path_or_fd, name, value)
else:
lsetxattr(path_or_fd, name, value)
def get_all(path_or_fd):
"""Return a dictionary with all (user) xattrs for "path_or_fd"
Symbolic links are not followed
"""
if isinstance(path_or_fd, int):
return dict((name, fgetxattr(path_or_fd, name)) for name in flistxattr(path_or_fd))
else:
return dict((name, lgetxattr(path_or_fd, name)) for name in llistxattr(path_or_fd))
def _check(rv, path=None):
if rv < 0:
raise OSError(get_errno(), path)
@ -28,22 +48,6 @@ def _check(rv, path=None):
libc.fgetxattr.argtypes = (c_int, c_char_p, c_char_p, c_size_t)
libc.fgetxattr.restype = c_ssize_t
def set(path_or_fd, name, value):
if isinstance(path_or_fd, int):
fsetxattr(path_or_fd, b'user.' + name, value)
else:
lsetxattr(path_or_fd, b'user.' + name, value)
def get_all(path_or_fd):
"""Return a dictionary with all (user) xattrs for "path_or_fd"
Symbolic links are not followed
"""
if isinstance(path_or_fd, int):
return dict((name[5:], fgetxattr(path_or_fd, name)) for name in flistxattr(path_or_fd) if name.startswith(b'user.'))
else:
return dict((name[5:], lgetxattr(path_or_fd, name)) for name in llistxattr(path_or_fd) if name.startswith(b'user.'))
def llistxattr(path):
path = os.fsencode(path)
n = _check(libc.llistxattr(path, None, 0), path)
@ -53,7 +57,7 @@ def llistxattr(path):
n2 = _check(libc.llistxattr(path, namebuf, n))
if n2 != n:
raise Exception('llistxattr failed')
return namebuf.raw.split(b'\0')[:-1]
return [name[5:] for name in namebuf.raw.split(b'\0')[:-1] if name.startswith(b'user.')]
def flistxattr(fd):
n = _check(libc.flistxattr(fd, None, 0))
@ -63,16 +67,17 @@ def flistxattr(fd):
n2 = _check(libc.flistxattr(fd, namebuf, n))
if n2 != n:
raise Exception('flistxattr failed')
return namebuf.raw.split(b'\0')[:-1]
return [name[5:] for name in namebuf.raw.split(b'\0')[:-1] if name.startswith(b'user.')]
def lsetxattr(path, name, value, flags=0):
_check(libc.lsetxattr(os.fsencode(path), name, value, len(value), flags), path)
_check(libc.lsetxattr(os.fsencode(path), b'user.' + name, value, len(value), flags), path)
def fsetxattr(fd, name, value, flags=0):
_check(libc.fsetxattr(fd, name, value, len(value), flags))
_check(libc.fsetxattr(fd, b'user.' + name, value, len(value), flags))
def lgetxattr(path, name):
path = os.fsencode(path)
name = b'user.' + name
n = _check(libc.lgetxattr(path, name, None, 0))
if n == 0:
return None
@ -83,6 +88,7 @@ def lgetxattr(path, name):
return valuebuf.raw
def fgetxattr(fd, name):
name = b'user.' + name
n = _check(libc.fgetxattr(fd, name, None, 0))
if n == 0:
return None
@ -108,22 +114,6 @@ def fgetxattr(fd, name):
XATTR_NOFOLLOW = 0x0001
def set(path_or_fd, name, value):
if isinstance(path_or_fd, int):
fsetxattr(path_or_fd, name, value)
else:
lsetxattr(path_or_fd, name, value)
def get_all(path_or_fd):
"""Return a dictionary with all (user) xattrs for "path_or_fd"
Symbolic links are not followed
"""
if isinstance(path_or_fd, int):
return dict((name, fgetxattr(path_or_fd, name)) for name in flistxattr(path_or_fd))
else:
return dict((name, lgetxattr(path_or_fd, name)) for name in llistxattr(path_or_fd))
def llistxattr(path):
path = os.fsencode(path)
n = _check(libc.listxattr(path, None, 0, XATTR_NOFOLLOW), path)

View file

@ -55,12 +55,14 @@ def __init__(self, *args, **kwargs):
url='http://github.com/jborg/darc/',
description='Deduplicating ARChiver written in Python',
license='BSD',
platforms=['Linux', 'MacOS X'],
classifiers=[
'Development Status :: 4 - Beta',
'Environment :: Console',
'Intended Audience :: System Administrators',
'License :: OSI Approved :: BSD License',
'Operating System :: POSIX',
'Operating System :: MacOS :: MacOS X',
'Operating System :: POSIX :: Linux',
'Programming Language :: Python',
'Topic :: Security :: Cryptography',
'Topic :: System :: Archiving :: Backup',

View file

@ -3,4 +3,4 @@ envlist = py32, py33
[testenv]
changedir = docs # Change dir to avoid import problem
commands = fakeroot {envpython} -m darc.testsuite.run -bv []
commands = {envpython} -m darc.testsuite.run -bv []