mirror of https://github.com/borgbackup/borg.git
Rewrote xattr support
This commit is contained in:
parent
6c5f494a82
commit
2281af5284
|
@ -280,7 +280,7 @@ class Archive(object):
|
||||||
xattrs = item.get(b'xattrs')
|
xattrs = item.get(b'xattrs')
|
||||||
if xattrs:
|
if xattrs:
|
||||||
for k, v in xattrs.items():
|
for k, v in xattrs.items():
|
||||||
xattr.set(fd or path, k, v)
|
xattr.setxattr(fd or path, k, v)
|
||||||
uid = gid = None
|
uid = gid = None
|
||||||
if not self.numeric_owner:
|
if not self.numeric_owner:
|
||||||
uid = user2uid(item[b'user'])
|
uid = user2uid(item[b'user'])
|
||||||
|
@ -350,7 +350,7 @@ class Archive(object):
|
||||||
if self.numeric_owner:
|
if self.numeric_owner:
|
||||||
item[b'user'] = item[b'group'] = None
|
item[b'user'] = item[b'group'] = None
|
||||||
try:
|
try:
|
||||||
xattrs = xattr.get_all(path)
|
xattrs = xattr.get_all(path, follow_symlinks=False)
|
||||||
if xattrs:
|
if xattrs:
|
||||||
item[b'xattrs'] = xattrs
|
item[b'xattrs'] = xattrs
|
||||||
except PermissionError:
|
except PermissionError:
|
||||||
|
|
|
@ -102,12 +102,10 @@ class AtticOperations(llfuse.Operations):
|
||||||
|
|
||||||
def listxattr(self, inode):
|
def listxattr(self, inode):
|
||||||
item = self.items[inode]
|
item = self.items[inode]
|
||||||
return [b'user.' + name for name in item.get(b'xattrs', {}).keys()]
|
return item.get(b'xattrs', {}).keys()
|
||||||
|
|
||||||
def getxattr(self, inode, name):
|
def getxattr(self, inode, name):
|
||||||
item = self.items[inode]
|
item = self.items[inode]
|
||||||
if name.startswith(b'user.'):
|
|
||||||
name = name[5:]
|
|
||||||
try:
|
try:
|
||||||
return item.get(b'xattrs', {})[name]
|
return item.get(b'xattrs', {})[name]
|
||||||
except KeyError:
|
except KeyError:
|
||||||
|
|
|
@ -3,7 +3,8 @@ import os
|
||||||
import sys
|
import sys
|
||||||
import time
|
import time
|
||||||
import unittest
|
import unittest
|
||||||
from attic import xattr
|
from attic.xattr import get_all
|
||||||
|
|
||||||
|
|
||||||
has_mtime_ns = sys.version >= '3.3'
|
has_mtime_ns = sys.version >= '3.3'
|
||||||
utime_supports_fd = os.utime in getattr(os, 'supports_fd', {})
|
utime_supports_fd = os.utime in getattr(os, 'supports_fd', {})
|
||||||
|
@ -18,7 +19,7 @@ class AtticTestCase(unittest.TestCase):
|
||||||
|
|
||||||
def _get_xattrs(self, path):
|
def _get_xattrs(self, path):
|
||||||
try:
|
try:
|
||||||
return xattr.get_all(path)
|
return get_all(path, follow_symlinks=False)
|
||||||
except EnvironmentError:
|
except EnvironmentError:
|
||||||
return {}
|
return {}
|
||||||
|
|
||||||
|
@ -81,6 +82,7 @@ def get_tests(suite):
|
||||||
class TestLoader(unittest.TestLoader):
|
class TestLoader(unittest.TestLoader):
|
||||||
"""A customzied test loader that properly detects and filters our test cases
|
"""A customzied test loader that properly detects and filters our test cases
|
||||||
"""
|
"""
|
||||||
|
|
||||||
def loadTestsFromName(self, pattern, module=None):
|
def loadTestsFromName(self, pattern, module=None):
|
||||||
suite = self.discover('attic.testsuite', '*.py')
|
suite = self.discover('attic.testsuite', '*.py')
|
||||||
tests = unittest.TestSuite()
|
tests = unittest.TestSuite()
|
||||||
|
|
|
@ -117,7 +117,7 @@ class ArchiverTestCase(AtticTestCase):
|
||||||
# Char device
|
# Char device
|
||||||
os.mknod('input/cdev', 0o600 | stat.S_IFCHR, os.makedev(30, 40))
|
os.mknod('input/cdev', 0o600 | stat.S_IFCHR, os.makedev(30, 40))
|
||||||
if xattr.is_enabled():
|
if xattr.is_enabled():
|
||||||
xattr.set(os.path.join(self.input_path, 'file1'), b'foo', b'bar')
|
xattr.setxattr(os.path.join(self.input_path, 'file1'), 'user.foo', b'bar')
|
||||||
# Hard link
|
# Hard link
|
||||||
os.link(os.path.join(self.input_path, 'file1'),
|
os.link(os.path.join(self.input_path, 'file1'),
|
||||||
os.path.join(self.input_path, 'hardlink'))
|
os.path.join(self.input_path, 'hardlink'))
|
||||||
|
|
|
@ -2,8 +2,7 @@ import os
|
||||||
import tempfile
|
import tempfile
|
||||||
import unittest
|
import unittest
|
||||||
from attic.testsuite import AtticTestCase
|
from attic.testsuite import AtticTestCase
|
||||||
from attic.xattr import lsetxattr, llistxattr, lgetxattr, get_all, set, flistxattr, fgetxattr, fsetxattr, is_enabled
|
from attic.xattr import is_enabled, getxattr, setxattr, listxattr
|
||||||
|
|
||||||
|
|
||||||
@unittest.skipUnless(is_enabled(), 'xattr not enabled on filesystem')
|
@unittest.skipUnless(is_enabled(), 'xattr not enabled on filesystem')
|
||||||
class XattrTestCase(AtticTestCase):
|
class XattrTestCase(AtticTestCase):
|
||||||
|
@ -16,28 +15,16 @@ class XattrTestCase(AtticTestCase):
|
||||||
def tearDown(self):
|
def tearDown(self):
|
||||||
os.unlink(self.symlink)
|
os.unlink(self.symlink)
|
||||||
|
|
||||||
def test_low_level(self):
|
def test(self):
|
||||||
self.assert_equal(llistxattr(self.tmpfile.name), [])
|
self.assert_equal(listxattr(self.tmpfile.name), [])
|
||||||
self.assert_equal(llistxattr(self.symlink), [])
|
self.assert_equal(listxattr(self.tmpfile.fileno()), [])
|
||||||
lsetxattr(self.tmpfile.name, b'foo', b'bar')
|
self.assert_equal(listxattr(self.symlink), [])
|
||||||
self.assert_equal(llistxattr(self.tmpfile.name), [b'foo'])
|
setxattr(self.tmpfile.name, 'user.foo', b'bar')
|
||||||
self.assert_equal(lgetxattr(self.tmpfile.name, b'foo'), b'bar')
|
setxattr(self.tmpfile.fileno(), 'user.bar', b'foo')
|
||||||
self.assert_equal(llistxattr(self.symlink), [])
|
self.assert_equal(set(listxattr(self.tmpfile.name)), set(['user.foo', 'user.bar']))
|
||||||
|
self.assert_equal(set(listxattr(self.tmpfile.fileno())), set(['user.foo', 'user.bar']))
|
||||||
def test_low_level_fileno(self):
|
self.assert_equal(set(listxattr(self.symlink)), set(['user.foo', 'user.bar']))
|
||||||
self.assert_equal(flistxattr(self.tmpfile.fileno()), [])
|
self.assert_equal(listxattr(self.symlink, follow_symlinks=False), [])
|
||||||
fsetxattr(self.tmpfile.fileno(), b'foo', b'bar')
|
self.assert_equal(getxattr(self.tmpfile.name, 'user.foo'), b'bar')
|
||||||
self.assert_equal(flistxattr(self.tmpfile.fileno()), [b'foo'])
|
self.assert_equal(getxattr(self.tmpfile.fileno(), 'user.foo'), b'bar')
|
||||||
self.assert_equal(fgetxattr(self.tmpfile.fileno(), b'foo'), b'bar')
|
self.assert_equal(getxattr(self.symlink, 'user.foo'), b'bar')
|
||||||
|
|
||||||
def test_high_level(self):
|
|
||||||
self.assert_equal(get_all(self.tmpfile.name), {})
|
|
||||||
self.assert_equal(get_all(self.symlink), {})
|
|
||||||
set(self.tmpfile.name, b'foo', b'bar')
|
|
||||||
self.assert_equal(get_all(self.tmpfile.name), {b'foo': b'bar'})
|
|
||||||
self.assert_equal(get_all(self.symlink), {})
|
|
||||||
|
|
||||||
def test_high_level_fileno(self):
|
|
||||||
self.assert_equal(get_all(self.tmpfile.fileno()), {})
|
|
||||||
set(self.tmpfile.fileno(), b'foo', b'bar')
|
|
||||||
self.assert_equal(get_all(self.tmpfile.fileno()), {b'foo': b'bar'})
|
|
||||||
|
|
315
attic/xattr.py
315
attic/xattr.py
|
@ -1,14 +1,8 @@
|
||||||
"""A basic extended attributes (xattr) implementation for Linux and MacOS X
|
"""A basic extended attributes (xattr) implementation for Linux and MacOS X
|
||||||
|
|
||||||
On Linux only the "user." namespace is accessed
|
|
||||||
"""
|
"""
|
||||||
import os
|
import os
|
||||||
import sys
|
import sys
|
||||||
import tempfile
|
import tempfile
|
||||||
from ctypes import CDLL, create_string_buffer, c_ssize_t, c_size_t, c_char_p, c_int, c_uint32, get_errno
|
|
||||||
from ctypes.util import find_library
|
|
||||||
|
|
||||||
libc = CDLL(find_library('c'), use_errno=True)
|
|
||||||
|
|
||||||
|
|
||||||
def is_enabled():
|
def is_enabled():
|
||||||
|
@ -16,163 +10,162 @@ def is_enabled():
|
||||||
"""
|
"""
|
||||||
with tempfile.TemporaryFile() as fd:
|
with tempfile.TemporaryFile() as fd:
|
||||||
try:
|
try:
|
||||||
set(fd.fileno(), b'name', b'value')
|
setxattr(fd.fileno(), 'user.name', b'value')
|
||||||
except OSError:
|
except OSError:
|
||||||
return False
|
return False
|
||||||
return get_all(fd.fileno()) == {b'name': b'value'}
|
return getxattr(fd.fileno(), 'user.name') == b'value'
|
||||||
|
|
||||||
|
|
||||||
def set(path_or_fd, name, value):
|
def get_all(path, follow_symlinks=True):
|
||||||
if isinstance(path_or_fd, int):
|
return dict((name, getxattr(path, name, follow_symlinks=follow_symlinks))
|
||||||
fsetxattr(path_or_fd, name, value)
|
for name in listxattr(path, follow_symlinks=follow_symlinks))
|
||||||
|
|
||||||
|
|
||||||
|
try:
|
||||||
|
# Currently only available on Python 3.3+ on Linux
|
||||||
|
from os import getxattr, setxattr, listxattr2
|
||||||
|
except ImportError:
|
||||||
|
from ctypes import CDLL, create_string_buffer, c_ssize_t, c_size_t, c_char_p, c_int, c_uint32, get_errno
|
||||||
|
from ctypes.util import find_library
|
||||||
|
libc = CDLL(find_library('c'), use_errno=True)
|
||||||
|
|
||||||
|
def _check(rv, path=None):
|
||||||
|
if rv < 0:
|
||||||
|
raise OSError(get_errno(), path)
|
||||||
|
return rv
|
||||||
|
|
||||||
|
if sys.platform.startswith('linux'):
|
||||||
|
libc.llistxattr.argtypes = (c_char_p, c_char_p, c_size_t)
|
||||||
|
libc.llistxattr.restype = c_ssize_t
|
||||||
|
libc.flistxattr.argtypes = (c_int, c_char_p, c_size_t)
|
||||||
|
libc.flistxattr.restype = c_ssize_t
|
||||||
|
libc.lsetxattr.argtypes = (c_char_p, c_char_p, c_char_p, c_size_t, c_int)
|
||||||
|
libc.lsetxattr.restype = c_int
|
||||||
|
libc.fsetxattr.argtypes = (c_int, c_char_p, c_char_p, c_size_t, c_int)
|
||||||
|
libc.fsetxattr.restype = c_int
|
||||||
|
libc.lgetxattr.argtypes = (c_char_p, c_char_p, c_char_p, c_size_t)
|
||||||
|
libc.lgetxattr.restype = c_ssize_t
|
||||||
|
libc.fgetxattr.argtypes = (c_int, c_char_p, c_char_p, c_size_t)
|
||||||
|
libc.fgetxattr.restype = c_ssize_t
|
||||||
|
|
||||||
|
def listxattr(path, *, follow_symlinks=True):
|
||||||
|
if isinstance(path, str):
|
||||||
|
path = os.fsencode(path)
|
||||||
|
if isinstance(path, int):
|
||||||
|
func = libc.flistxattr
|
||||||
|
elif follow_symlinks:
|
||||||
|
func = libc.listxattr
|
||||||
|
else:
|
||||||
|
func = libc.llistxattr
|
||||||
|
n = _check(func(path, None, 0), path)
|
||||||
|
if n == 0:
|
||||||
|
return []
|
||||||
|
namebuf = create_string_buffer(n)
|
||||||
|
n2 = _check(func(path, namebuf, n), path)
|
||||||
|
if n2 != n:
|
||||||
|
raise Exception('listxattr failed')
|
||||||
|
return [os.fsdecode(name) for name in namebuf.raw.split(b'\0')[:-1]]
|
||||||
|
|
||||||
|
def getxattr(path, name, *, follow_symlinks=True):
|
||||||
|
name = os.fsencode(name)
|
||||||
|
if isinstance(path, str):
|
||||||
|
path = os.fsencode(path)
|
||||||
|
if isinstance(path, int):
|
||||||
|
func = libc.fgetxattr
|
||||||
|
elif follow_symlinks:
|
||||||
|
func = libc.getxattr
|
||||||
|
else:
|
||||||
|
func = libc.lgetxattr
|
||||||
|
n = _check(func(path, name, None, 0))
|
||||||
|
if n == 0:
|
||||||
|
return
|
||||||
|
valuebuf = create_string_buffer(n)
|
||||||
|
n2 = _check(func(path, name, valuebuf, n), path)
|
||||||
|
if n2 != n:
|
||||||
|
raise Exception('getxattr failed')
|
||||||
|
return valuebuf.raw
|
||||||
|
|
||||||
|
def setxattr(path, name, value, *, follow_symlinks=True):
|
||||||
|
name = os.fsencode(name)
|
||||||
|
value = os.fsencode(value)
|
||||||
|
if isinstance(path, str):
|
||||||
|
path = os.fsencode(path)
|
||||||
|
if isinstance(path, int):
|
||||||
|
func = libc.fsetxattr
|
||||||
|
elif follow_symlinks:
|
||||||
|
func = libc.setxattr
|
||||||
|
else:
|
||||||
|
func = libc.lsetxattr
|
||||||
|
_check(func(path, name, value, len(value), 0), path)
|
||||||
|
|
||||||
|
elif sys.platform == 'darwin':
|
||||||
|
libc.listxattr.argtypes = (c_char_p, c_char_p, c_size_t, c_int)
|
||||||
|
libc.listxattr.restype = c_ssize_t
|
||||||
|
libc.flistxattr.argtypes = (c_int, c_char_p, c_size_t)
|
||||||
|
libc.flistxattr.restype = c_ssize_t
|
||||||
|
libc.setxattr.argtypes = (c_char_p, c_char_p, c_char_p, c_size_t, c_uint32, c_int)
|
||||||
|
libc.setxattr.restype = c_int
|
||||||
|
libc.fsetxattr.argtypes = (c_int, c_char_p, c_char_p, c_size_t, c_uint32, c_int)
|
||||||
|
libc.fsetxattr.restype = c_int
|
||||||
|
libc.getxattr.argtypes = (c_char_p, c_char_p, c_char_p, c_size_t, c_uint32, c_int)
|
||||||
|
libc.getxattr.restype = c_ssize_t
|
||||||
|
libc.fgetxattr.argtypes = (c_int, c_char_p, c_char_p, c_size_t, c_uint32, c_int)
|
||||||
|
libc.fgetxattr.restype = c_ssize_t
|
||||||
|
|
||||||
|
XATTR_NOFOLLOW = 0x0001
|
||||||
|
|
||||||
|
def listxattr(path, *, follow_symlinks=True):
|
||||||
|
func = libc.listxattr
|
||||||
|
flags = 0
|
||||||
|
if isinstance(path, str):
|
||||||
|
path = os.fsencode(path)
|
||||||
|
if isinstance(path, int):
|
||||||
|
func = libc.flistxattr
|
||||||
|
elif not follow_symlinks:
|
||||||
|
flags = XATTR_NOFOLLOW
|
||||||
|
n = _check(func(path, None, 0, flags), path)
|
||||||
|
if n == 0:
|
||||||
|
return []
|
||||||
|
namebuf = create_string_buffer(n)
|
||||||
|
n2 = _check(func(path, namebuf, n, flags), path)
|
||||||
|
if n2 != n:
|
||||||
|
raise Exception('listxattr failed')
|
||||||
|
return [os.fsdecode(name) for name in namebuf.raw.split(b'\0')[:-1]]
|
||||||
|
|
||||||
|
def getxattr(path, name, *, follow_symlinks=True):
|
||||||
|
name = os.fsencode(name)
|
||||||
|
func = libc.getxattr
|
||||||
|
flags = 0
|
||||||
|
if isinstance(path, str):
|
||||||
|
path = os.fsencode(path)
|
||||||
|
func = libc.fgetxattr
|
||||||
|
if isinstance(path, int):
|
||||||
|
func = libc.fgetxattr
|
||||||
|
elif not follow_symlinks:
|
||||||
|
flags = XATTR_NOFOLLOW
|
||||||
|
else:
|
||||||
|
func = libc.lgetxattr
|
||||||
|
n = _check(func(path, name, None, 0, 0, flags))
|
||||||
|
if n == 0:
|
||||||
|
return
|
||||||
|
valuebuf = create_string_buffer(n)
|
||||||
|
n2 = _check(func(path, name, valuebuf, n, 0, flags), path)
|
||||||
|
if n2 != n:
|
||||||
|
raise Exception('getxattr failed')
|
||||||
|
return valuebuf.raw
|
||||||
|
|
||||||
|
def setxattr(path, name, value, *, follow_symlinks=True):
|
||||||
|
name = os.fsencode(name)
|
||||||
|
value = os.fsencode(value)
|
||||||
|
func = libc.setxattr
|
||||||
|
flags = 0
|
||||||
|
if isinstance(path, str):
|
||||||
|
path = os.fsencode(path)
|
||||||
|
if isinstance(path, int):
|
||||||
|
func = libc.fsetxattr
|
||||||
|
elif not follow_symlinks:
|
||||||
|
flags = XATTR_NOFOLLOW
|
||||||
|
_check(func(path, name, value, len(value), 0, flags), path)
|
||||||
|
|
||||||
else:
|
else:
|
||||||
lsetxattr(path_or_fd, name, value)
|
raise Exception('Unsupported platform: %s' % sys.platform)
|
||||||
|
|
||||||
|
|
||||||
def get_all(path_or_fd):
|
|
||||||
"""Return a dictionary with all (user) xattrs for "path_or_fd"
|
|
||||||
|
|
||||||
Symbolic links are not followed
|
|
||||||
"""
|
|
||||||
if isinstance(path_or_fd, int):
|
|
||||||
return dict((name, fgetxattr(path_or_fd, name)) for name in flistxattr(path_or_fd))
|
|
||||||
else:
|
|
||||||
return dict((name, lgetxattr(path_or_fd, name)) for name in llistxattr(path_or_fd))
|
|
||||||
|
|
||||||
|
|
||||||
def _check(rv, path=None):
|
|
||||||
if rv < 0:
|
|
||||||
raise OSError(get_errno(), path)
|
|
||||||
return rv
|
|
||||||
|
|
||||||
|
|
||||||
if sys.platform.startswith('linux'):
|
|
||||||
libc.llistxattr.argtypes = (c_char_p, c_char_p, c_size_t)
|
|
||||||
libc.llistxattr.restype = c_ssize_t
|
|
||||||
libc.flistxattr.argtypes = (c_int, c_char_p, c_size_t)
|
|
||||||
libc.flistxattr.restype = c_ssize_t
|
|
||||||
libc.lsetxattr.argtypes = (c_char_p, c_char_p, c_char_p, c_size_t, c_int)
|
|
||||||
libc.lsetxattr.restype = c_int
|
|
||||||
libc.fsetxattr.argtypes = (c_int, c_char_p, c_char_p, c_size_t, c_int)
|
|
||||||
libc.fsetxattr.restype = c_int
|
|
||||||
libc.lgetxattr.argtypes = (c_char_p, c_char_p, c_char_p, c_size_t)
|
|
||||||
libc.lgetxattr.restype = c_ssize_t
|
|
||||||
libc.fgetxattr.argtypes = (c_int, c_char_p, c_char_p, c_size_t)
|
|
||||||
libc.fgetxattr.restype = c_ssize_t
|
|
||||||
|
|
||||||
def llistxattr(path):
|
|
||||||
path = os.fsencode(path)
|
|
||||||
n = _check(libc.llistxattr(path, None, 0), path)
|
|
||||||
if n == 0:
|
|
||||||
[]
|
|
||||||
namebuf = create_string_buffer(n)
|
|
||||||
n2 = _check(libc.llistxattr(path, namebuf, n))
|
|
||||||
if n2 != n:
|
|
||||||
raise Exception('llistxattr failed')
|
|
||||||
return [name[5:] for name in namebuf.raw.split(b'\0')[:-1] if name.startswith(b'user.')]
|
|
||||||
|
|
||||||
def flistxattr(fd):
|
|
||||||
n = _check(libc.flistxattr(fd, None, 0))
|
|
||||||
if n == 0:
|
|
||||||
[]
|
|
||||||
namebuf = create_string_buffer(n)
|
|
||||||
n2 = _check(libc.flistxattr(fd, namebuf, n))
|
|
||||||
if n2 != n:
|
|
||||||
raise Exception('flistxattr failed')
|
|
||||||
return [name[5:] for name in namebuf.raw.split(b'\0')[:-1] if name.startswith(b'user.')]
|
|
||||||
|
|
||||||
def lsetxattr(path, name, value, flags=0):
|
|
||||||
_check(libc.lsetxattr(os.fsencode(path), b'user.' + name, value, len(value), flags), path)
|
|
||||||
|
|
||||||
def fsetxattr(fd, name, value, flags=0):
|
|
||||||
_check(libc.fsetxattr(fd, b'user.' + name, value, len(value), flags))
|
|
||||||
|
|
||||||
def lgetxattr(path, name):
|
|
||||||
path = os.fsencode(path)
|
|
||||||
name = b'user.' + name
|
|
||||||
n = _check(libc.lgetxattr(path, name, None, 0))
|
|
||||||
if n == 0:
|
|
||||||
return None
|
|
||||||
valuebuf = create_string_buffer(n)
|
|
||||||
n2 = _check(libc.lgetxattr(path, name, valuebuf, n), path)
|
|
||||||
if n2 != n:
|
|
||||||
raise Exception('lgetxattr failed')
|
|
||||||
return valuebuf.raw
|
|
||||||
|
|
||||||
def fgetxattr(fd, name):
|
|
||||||
name = b'user.' + name
|
|
||||||
n = _check(libc.fgetxattr(fd, name, None, 0))
|
|
||||||
if n == 0:
|
|
||||||
return None
|
|
||||||
valuebuf = create_string_buffer(n)
|
|
||||||
n2 = _check(libc.fgetxattr(fd, name, valuebuf, n))
|
|
||||||
if n2 != n:
|
|
||||||
raise Exception('fgetxattr failed')
|
|
||||||
return valuebuf.raw
|
|
||||||
|
|
||||||
elif sys.platform == 'darwin':
|
|
||||||
libc.listxattr.argtypes = (c_char_p, c_char_p, c_size_t, c_int)
|
|
||||||
libc.listxattr.restype = c_ssize_t
|
|
||||||
libc.flistxattr.argtypes = (c_int, c_char_p, c_size_t)
|
|
||||||
libc.flistxattr.restype = c_ssize_t
|
|
||||||
libc.setxattr.argtypes = (c_char_p, c_char_p, c_char_p, c_size_t, c_uint32, c_int)
|
|
||||||
libc.setxattr.restype = c_int
|
|
||||||
libc.fsetxattr.argtypes = (c_int, c_char_p, c_char_p, c_size_t, c_uint32, c_int)
|
|
||||||
libc.fsetxattr.restype = c_int
|
|
||||||
libc.getxattr.argtypes = (c_char_p, c_char_p, c_char_p, c_size_t, c_uint32, c_int)
|
|
||||||
libc.getxattr.restype = c_ssize_t
|
|
||||||
libc.fgetxattr.argtypes = (c_int, c_char_p, c_char_p, c_size_t, c_uint32, c_int)
|
|
||||||
libc.fgetxattr.restype = c_ssize_t
|
|
||||||
|
|
||||||
XATTR_NOFOLLOW = 0x0001
|
|
||||||
|
|
||||||
def llistxattr(path):
|
|
||||||
path = os.fsencode(path)
|
|
||||||
n = _check(libc.listxattr(path, None, 0, XATTR_NOFOLLOW), path)
|
|
||||||
if n == 0:
|
|
||||||
[]
|
|
||||||
namebuf = create_string_buffer(n)
|
|
||||||
n2 = _check(libc.listxattr(path, namebuf, n, XATTR_NOFOLLOW))
|
|
||||||
if n2 != n:
|
|
||||||
raise Exception('llistxattr failed')
|
|
||||||
return namebuf.raw.split(b'\0')[:-1]
|
|
||||||
|
|
||||||
def flistxattr(fd):
|
|
||||||
n = _check(libc.flistxattr(fd, None, 0, 0))
|
|
||||||
if n == 0:
|
|
||||||
[]
|
|
||||||
namebuf = create_string_buffer(n)
|
|
||||||
n2 = _check(libc.flistxattr(fd, namebuf, n, 0))
|
|
||||||
if n2 != n:
|
|
||||||
raise Exception('flistxattr failed')
|
|
||||||
return namebuf.raw.split(b'\0')[:-1]
|
|
||||||
|
|
||||||
def lsetxattr(path, name, value, flags=XATTR_NOFOLLOW):
|
|
||||||
_check(libc.setxattr(os.fsencode(path), name, value, len(value), 0, flags), path)
|
|
||||||
|
|
||||||
def fsetxattr(fd, name, value, flags=0):
|
|
||||||
_check(libc.fsetxattr(fd, name, value, len(value), 0, flags))
|
|
||||||
|
|
||||||
def lgetxattr(path, name):
|
|
||||||
path = os.fsencode(path)
|
|
||||||
n = _check(libc.getxattr(path, name, None, 0, 0, XATTR_NOFOLLOW), path)
|
|
||||||
if n == 0:
|
|
||||||
return None
|
|
||||||
valuebuf = create_string_buffer(n)
|
|
||||||
n2 = _check(libc.getxattr(path, name, valuebuf, n, 0, XATTR_NOFOLLOW))
|
|
||||||
if n2 != n:
|
|
||||||
raise Exception('getxattr failed')
|
|
||||||
return valuebuf.raw
|
|
||||||
|
|
||||||
def fgetxattr(fd, name):
|
|
||||||
n = _check(libc.fgetxattr(fd, name, None, 0, 0, 0))
|
|
||||||
if n == 0:
|
|
||||||
return None
|
|
||||||
valuebuf = create_string_buffer(n)
|
|
||||||
n2 = _check(libc.fgetxattr(fd, name, valuebuf, n, 0, 0))
|
|
||||||
if n2 != n:
|
|
||||||
Exception('fgetxattr failed')
|
|
||||||
return valuebuf.raw
|
|
||||||
|
|
||||||
else:
|
|
||||||
raise Exception('Unsupported platform: %s' % sys.platform)
|
|
||||||
|
|
Loading…
Reference in New Issue