xattr: move to platform package, use cython, fixes #2495

this code used to live in borg.xattr and used ctypes
(and was the only ctypes-using code in borg).

the low level code now was converted to cython and
the platform code moved to platform package.

got rid of the code that tried to find the libc.
This commit is contained in:
Thomas Waldmann 2018-06-21 23:53:47 +02:00
parent 02c48836fa
commit 99149684bf
9 changed files with 335 additions and 350 deletions

View File

@ -6,6 +6,7 @@ Platform-specific APIs.
Public APIs are documented in platform.base.
"""
from .base import listxattr, getxattr, setxattr, ENOATTR
from .base import acl_get, acl_set
from .base import set_flags, get_flags
from .base import SaveFile, SyncFile, sync_dir, fdatasync, safe_fadvise
@ -21,12 +22,15 @@ if not sys.platform.startswith(('win32', )):
if sys.platform.startswith('linux'): # pragma: linux only
from .linux import API_VERSION as OS_API_VERSION
from .linux import listxattr, getxattr, setxattr
from .linux import acl_get, acl_set
from .linux import set_flags, get_flags
from .linux import SyncFile
elif sys.platform.startswith('freebsd'): # pragma: freebsd only
from .freebsd import API_VERSION as OS_API_VERSION
from .freebsd import listxattr, getxattr, setxattr
from .freebsd import acl_get, acl_set
elif sys.platform == 'darwin': # pragma: darwin only
from .darwin import API_VERSION as OS_API_VERSION
from .darwin import listxattr, getxattr, setxattr
from .darwin import acl_get, acl_set

View File

@ -17,10 +17,47 @@ platform API: that way platform APIs provided by the platform-specific support m
are correctly composed into the base functionality.
"""
API_VERSION = '1.1_03'
API_VERSION = '1.2_01'
fdatasync = getattr(os, 'fdatasync', os.fsync)
from .xattr import ENOATTR
def listxattr(path, *, follow_symlinks=True):
"""
Return list of xattr names on a file.
*path* can either be a path (str or bytes) or an open file descriptor (int).
*follow_symlinks* indicates whether symlinks should be followed
and only applies when *path* is not an open file descriptor.
"""
return []
def getxattr(path, name, *, follow_symlinks=True):
"""
Read xattr and return its value (as bytes) or None if its empty.
*path* can either be a path (str or bytes) or an open file descriptor (int).
*name* is the name of the xattr to read (str).
*follow_symlinks* indicates whether symlinks should be followed
and only applies when *path* is not an open file descriptor.
"""
def setxattr(path, name, value, *, follow_symlinks=True):
"""
Write xattr on *path*.
*path* can either be a path (str or bytes) or an open file descriptor (int).
*name* is the name of the xattr to read (str).
*value* is the value to write. It is either bytes or None. The latter
signals that the value shall be empty (size equals zero).
*follow_symlinks* indicates whether symlinks should be followed
and only applies when *path* is not an open file descriptor.
"""
def acl_get(path, item, st, numeric_owner=False):
"""

View File

@ -1,9 +1,26 @@
import os
from libc.stdint cimport uint32_t
from ..helpers import user2uid, group2gid
from ..helpers import safe_decode, safe_encode
from .xattr import _listxattr_inner, _getxattr_inner, _setxattr_inner, split_string0
API_VERSION = '1.1_03'
API_VERSION = '1.2_01'
cdef extern from "sys/xattr.h":
ssize_t c_listxattr "listxattr" (const char *path, char *list, size_t size, int flags)
ssize_t c_flistxattr "flistxattr" (int filedes, char *list, size_t size, int flags)
ssize_t c_getxattr "getxattr" (const char *path, const char *name, void *value, size_t size, uint32_t pos, int flags)
ssize_t c_fgetxattr "fgetxattr" (int filedes, const char *name, void *value, size_t size, uint32_t pos, int flags)
int c_setxattr "setxattr" (const char *path, const char *name, const void *value, size_t size, uint32_t pos, int flags)
int c_fsetxattr "fsetxattr" (int filedes, const char *name, const void *value, size_t size, uint32_t pos, int flags)
int XATTR_NOFOLLOW
cdef int XATTR_NOFLAGS = 0x0000
cdef extern from "sys/acl.h":
ctypedef struct _acl_t:
@ -18,6 +35,50 @@ cdef extern from "sys/acl.h":
int ACL_TYPE_EXTENDED
def listxattr(path, *, follow_symlinks=True):
def func(path, buf, size):
cdef char *buffer = <char *> buf
if isinstance(path, int):
return c_flistxattr(path, buffer, size, XATTR_NOFLAGS)
else:
if follow_symlinks:
return c_listxattr(path, buffer, size, XATTR_NOFLAGS)
else:
return c_listxattr(path, buffer, size, XATTR_NOFOLLOW)
n, buf = _listxattr_inner(func, path)
return [os.fsdecode(name) for name in split_string0(buf[:n]) if name]
def getxattr(path, name, *, follow_symlinks=True):
def func(path, name, buf, size):
cdef char *buffer = <char *> buf
if isinstance(path, int):
return c_fgetxattr(path, name, buffer, size, 0, XATTR_NOFLAGS)
else:
if follow_symlinks:
return c_getxattr(path, name, buffer, size, 0, XATTR_NOFLAGS)
else:
return c_getxattr(path, name, buffer, size, 0, XATTR_NOFOLLOW)
n, buf = _getxattr_inner(func, path, name)
return buf[:n] or None
def setxattr(path, name, value, *, follow_symlinks=True):
def func(path, name, value, size):
cdef char *val = NULL if value is None else <char *> value
if isinstance(path, int):
return c_fsetxattr(path, name, val, size, 0, XATTR_NOFLAGS)
else:
if follow_symlinks:
return c_setxattr(path, name, val, size, 0, XATTR_NOFLAGS)
else:
return c_setxattr(path, name, val, size, 0, XATTR_NOFOLLOW)
_setxattr_inner(func, path, name, value)
def _remove_numeric_id_if_possible(acl):
"""Replace the user/group field with the local uid/gid if possible
"""

View File

@ -2,13 +2,29 @@ import os
from ..helpers import posix_acl_use_stored_uid_gid
from ..helpers import safe_encode, safe_decode
from .xattr import _listxattr_inner, _getxattr_inner, _setxattr_inner, split_lstring
API_VERSION = '1.1_03'
API_VERSION = '1.2_01'
cdef extern from "errno.h":
int errno
int EINVAL
cdef extern from "sys/extattr.h":
ssize_t c_extattr_list_file "extattr_list_file" (const char *path, int attrnamespace, void *data, size_t nbytes)
ssize_t c_extattr_list_link "extattr_list_link" (const char *path, int attrnamespace, void *data, size_t nbytes)
ssize_t c_extattr_list_fd "extattr_list_fd" (int fd, int attrnamespace, void *data, size_t nbytes)
ssize_t c_extattr_get_file "extattr_get_file" (const char *path, int attrnamespace, const char *attrname, void *data, size_t nbytes)
ssize_t c_extattr_get_link "extattr_get_link" (const char *path, int attrnamespace, const char *attrname, void *data, size_t nbytes)
ssize_t c_extattr_get_fd "extattr_get_fd" (int fd, int attrnamespace, const char *attrname, void *data, size_t nbytes)
int c_extattr_set_file "extattr_set_file" (const char *path, int attrnamespace, const char *attrname, const void *data, size_t nbytes)
int c_extattr_set_link "extattr_set_link" (const char *path, int attrnamespace, const char *attrname, const void *data, size_t nbytes)
int c_extattr_set_fd "extattr_set_fd" (int fd, int attrnamespace, const char *attrname, const void *data, size_t nbytes)
int EXTATTR_NAMESPACE_USER
cdef extern from "sys/types.h":
int ACL_TYPE_ACCESS
int ACL_TYPE_DEFAULT
@ -32,6 +48,50 @@ cdef extern from "unistd.h":
int _PC_ACL_NFS4
def listxattr(path, *, follow_symlinks=True):
def func(path, buf, size):
cdef char *buffer = <char *> buf
if isinstance(path, int):
return c_extattr_list_fd(path, EXTATTR_NAMESPACE_USER, buffer, size)
else:
if follow_symlinks:
return c_extattr_list_file(path, EXTATTR_NAMESPACE_USER, buffer, size)
else:
return c_extattr_list_link(path, EXTATTR_NAMESPACE_USER, buffer, size)
n, buf = _listxattr_inner(func, path)
return [os.fsdecode(name) for name in split_lstring(buf[:n]) if name]
def getxattr(path, name, *, follow_symlinks=True):
def func(path, name, buf, size):
cdef char *buffer = <char *> buf
if isinstance(path, int):
return c_extattr_get_fd(path, EXTATTR_NAMESPACE_USER, name, buffer, size)
else:
if follow_symlinks:
return c_extattr_get_file(path, EXTATTR_NAMESPACE_USER, name, buffer, size)
else:
return c_extattr_get_link(path, EXTATTR_NAMESPACE_USER, name, buffer, size)
n, buf = _getxattr_inner(func, path, name)
return buf[:n] or None
def setxattr(path, name, value, *, follow_symlinks=True):
def func(path, name, value, size):
cdef char *val = NULL if value is None else <char *> value
if isinstance(path, int):
return c_extattr_set_fd(path, EXTATTR_NAMESPACE_USER, name, val, size)
else:
if follow_symlinks:
return c_extattr_set_file(path, EXTATTR_NAMESPACE_USER, name, val, size)
else:
return c_extattr_set_link(path, EXTATTR_NAMESPACE_USER, name, val, size)
_setxattr_inner(func, path, name, value)
cdef _get_acl(p, type, item, attribute, int flags):
cdef acl_t acl
cdef char *text

View File

@ -8,11 +8,25 @@ from ..helpers import user2uid, group2gid
from ..helpers import safe_decode, safe_encode
from .base import SyncFile as BaseSyncFile
from .base import safe_fadvise
from .xattr import _listxattr_inner, _getxattr_inner, _setxattr_inner, split_string0
from libc cimport errno
from libc.stdint cimport int64_t
API_VERSION = '1.1_03'
API_VERSION = '1.2_01'
cdef extern from "attr/xattr.h":
ssize_t c_listxattr "listxattr" (const char *path, char *list, size_t size)
ssize_t c_llistxattr "llistxattr" (const char *path, char *list, size_t size)
ssize_t c_flistxattr "flistxattr" (int filedes, char *list, size_t size)
ssize_t c_getxattr "getxattr" (const char *path, const char *name, void *value, size_t size)
ssize_t c_lgetxattr "lgetxattr" (const char *path, const char *name, void *value, size_t size)
ssize_t c_fgetxattr "fgetxattr" (int filedes, const char *name, void *value, size_t size)
int c_setxattr "setxattr" (const char *path, const char *name, const void *value, size_t size, int flags)
int c_lsetxattr "lsetxattr" (const char *path, const char *name, const void *value, size_t size, int flags)
int c_fsetxattr "fsetxattr" (int filedes, const char *name, const void *value, size_t size, int flags)
cdef extern from "sys/types.h":
int ACL_TYPE_ACCESS
@ -62,6 +76,52 @@ cdef extern from "string.h":
_comment_re = re.compile(' *#.*', re.M)
def listxattr(path, *, follow_symlinks=True):
def func(path, buf, size):
cdef char *buffer = <char *> buf
if isinstance(path, int):
return c_flistxattr(path, buffer, size)
else:
if follow_symlinks:
return c_listxattr(path, buffer, size)
else:
return c_llistxattr(path, buffer, size)
n, buf = _listxattr_inner(func, path)
return [os.fsdecode(name) for name in split_string0(buf[:n])
if name and not name.startswith(b'system.posix_acl_')]
def getxattr(path, name, *, follow_symlinks=True):
def func(path, name, buf, size):
cdef char *buffer = <char *> buf
if isinstance(path, int):
return c_fgetxattr(path, name, buffer, size)
else:
if follow_symlinks:
return c_getxattr(path, name, buffer, size)
else:
return c_lgetxattr(path, name, buffer, size)
n, buf = _getxattr_inner(func, path, name)
return buf[:n] or None
def setxattr(path, name, value, *, follow_symlinks=True):
def func(path, name, value, size):
cdef char *val = NULL if value is None else <char *> value
flags = 0
if isinstance(path, int):
return c_fsetxattr(path, name, val, size, flags)
else:
if follow_symlinks:
return c_setxattr(path, name, val, size, flags)
else:
return c_lsetxattr(path, name, val, size, flags)
_setxattr_inner(func, path, name, value)
BSD_TO_LINUX_FLAGS = {
stat.UF_NODUMP: FS_NODUMP_FL,
stat.UF_IMMUTABLE: FS_IMMUTABLE_FL,

View File

@ -1,11 +1,16 @@
import errno
import os
from libc.errno cimport errno as c_errno
cdef extern from "wchar.h":
cdef int wcswidth(const Py_UNICODE *str, size_t n)
def get_errno():
return c_errno
def swidth(s):
str_len = len(s)
terminal_width = wcswidth(s, str_len)

100
src/borg/platform/xattr.py Normal file
View File

@ -0,0 +1,100 @@
import errno
import os
from .posix import get_errno
from ..helpers import Buffer
try:
ENOATTR = errno.ENOATTR
except AttributeError:
# on some platforms, ENOATTR is missing, use ENODATA there
ENOATTR = errno.ENODATA
buffer = Buffer(bytearray, limit=2**24)
def split_string0(buf):
"""split a list of zero-terminated strings into python not-zero-terminated bytes"""
if isinstance(buf, bytearray):
buf = bytes(buf) # use a bytes object, so we return a list of bytes objects
return buf.split(b'\0')[:-1]
def split_lstring(buf):
"""split a list of length-prefixed strings into python not-length-prefixed bytes"""
result = []
mv = memoryview(buf)
while mv:
length = mv[0]
result.append(bytes(mv[1:1 + length]))
mv = mv[1 + length:]
return result
class BufferTooSmallError(Exception):
"""the buffer given to a xattr function was too small for the result."""
def _check(rv, path=None, detect_buffer_too_small=False):
if rv < 0:
e = get_errno()
if detect_buffer_too_small and e == errno.ERANGE:
# listxattr and getxattr signal with ERANGE that they need a bigger result buffer.
# setxattr signals this way that e.g. a xattr key name is too long / inacceptable.
raise BufferTooSmallError
else:
try:
msg = os.strerror(e)
except ValueError:
msg = ''
if isinstance(path, int):
path = '<FD %d>' % path
raise OSError(e, msg, path)
if detect_buffer_too_small and rv >= len(buffer):
# freebsd does not error with ERANGE if the buffer is too small,
# it just fills the buffer, truncates and returns.
# so, we play safe and just assume that result is truncated if
# it happens to be a full buffer.
raise BufferTooSmallError
return rv
def _listxattr_inner(func, path):
if isinstance(path, str):
path = os.fsencode(path)
size = len(buffer)
while True:
buf = buffer.get(size)
try:
n = _check(func(path, buf, size), path, detect_buffer_too_small=True)
except BufferTooSmallError:
size *= 2
else:
return n, buf
def _getxattr_inner(func, path, name):
if isinstance(path, str):
path = os.fsencode(path)
name = os.fsencode(name)
size = len(buffer)
while True:
buf = buffer.get(size)
try:
n = _check(func(path, name, buf, size), path, detect_buffer_too_small=True)
except BufferTooSmallError:
size *= 2
else:
return n, buf
def _setxattr_inner(func, path, name, value):
if isinstance(path, str):
path = os.fsencode(path)
name = os.fsencode(name)
value = value and os.fsencode(value)
size = len(value) if value else 0
_check(func(path, name, value, size), path, detect_buffer_too_small=False)

View File

@ -4,7 +4,8 @@ import unittest
import pytest
from ..xattr import is_enabled, getxattr, setxattr, listxattr, buffer, split_lstring
from ..platform.xattr import buffer, split_lstring
from ..xattr import is_enabled, getxattr, setxattr, listxattr
from . import BaseTestCase

View File

@ -1,26 +1,11 @@
"""A basic extended attributes (xattr) implementation for Linux, FreeBSD and MacOS X."""
import errno
import os
import re
import subprocess
import sys
import tempfile
from ctypes import CDLL, create_string_buffer, c_ssize_t, c_size_t, c_char_p, c_int, c_uint32, get_errno
from ctypes.util import find_library
from distutils.version import LooseVersion
from .helpers import Buffer, prepare_subprocess_env
from .platform import listxattr, getxattr, setxattr, ENOATTR
try:
ENOATTR = errno.ENOATTR
except AttributeError:
# on some platforms, ENOATTR is missing, use ENODATA there
ENOATTR = errno.ENODATA
buffer = Buffer(create_string_buffer, limit=2**24)
XATTR_FAKEROOT = True # fakeroot with xattr support required (>= 1.20.2?)
def is_enabled(path=None):
@ -60,331 +45,3 @@ def get_all(path, follow_symlinks=True):
except OSError as e:
if e.errno in (errno.ENOTSUP, errno.EPERM):
return {}
libc_name = find_library('c')
if libc_name is None:
# find_library didn't work, maybe we are on some minimal system that misses essential
# tools used by find_library, like ldconfig, gcc/cc, objdump.
# so we can only try some "usual" names for the C library:
if sys.platform.startswith('linux'):
libc_name = 'libc.so.6'
elif sys.platform.startswith(('freebsd', 'netbsd')):
libc_name = 'libc.so'
elif sys.platform == 'darwin':
libc_name = 'libc.dylib'
else:
msg = "Can't find C library. No fallback known. Try installing ldconfig, gcc/cc or objdump."
print(msg, file=sys.stderr) # logger isn't initialized at this stage
raise Exception(msg)
# If we are running with fakeroot on Linux, then use the xattr functions of fakeroot. This is needed by
# the 'test_extract_capabilities' test, but also allows xattrs to work with fakeroot on Linux in normal use.
# TODO: Check whether fakeroot supports xattrs on all platforms supported below.
# TODO: If that's the case then we can make Borg fakeroot-xattr-compatible on these as well.
XATTR_FAKEROOT = False
if sys.platform.startswith('linux'):
LD_PRELOAD = os.environ.get('LD_PRELOAD', '')
preloads = re.split("[ :]", LD_PRELOAD)
for preload in preloads:
if preload.startswith("libfakeroot"):
env = prepare_subprocess_env(system=True)
fakeroot_output = subprocess.check_output(['fakeroot', '-v'], env=env)
fakeroot_version = LooseVersion(fakeroot_output.decode('ascii').split()[-1])
if fakeroot_version >= LooseVersion("1.20.2"):
# 1.20.2 has been confirmed to have xattr support
# 1.18.2 has been confirmed not to have xattr support
# Versions in-between are unknown
libc_name = preload
XATTR_FAKEROOT = True
break
try:
libc = CDLL(libc_name, use_errno=True)
except OSError as e:
msg = "Can't find C library [%s]. Try installing ldconfig, gcc/cc or objdump." % e
raise Exception(msg)
def split_string0(buf):
"""split a list of zero-terminated strings into python not-zero-terminated bytes"""
return buf.split(b'\0')[:-1]
def split_lstring(buf):
"""split a list of length-prefixed strings into python not-length-prefixed bytes"""
result = []
mv = memoryview(buf)
while mv:
length = mv[0]
result.append(bytes(mv[1:1 + length]))
mv = mv[1 + length:]
return result
class BufferTooSmallError(Exception):
"""the buffer given to a xattr function was too small for the result."""
def _check(rv, path=None, detect_buffer_too_small=False):
if rv < 0:
e = get_errno()
if detect_buffer_too_small and e == errno.ERANGE:
# listxattr and getxattr signal with ERANGE that they need a bigger result buffer.
# setxattr signals this way that e.g. a xattr key name is too long / inacceptable.
raise BufferTooSmallError
else:
try:
msg = os.strerror(e)
except ValueError:
msg = ''
if isinstance(path, int):
path = '<FD %d>' % path
raise OSError(e, msg, path)
if detect_buffer_too_small and rv >= len(buffer):
# freebsd does not error with ERANGE if the buffer is too small,
# it just fills the buffer, truncates and returns.
# so, we play sure and just assume that result is truncated if
# it happens to be a full buffer.
raise BufferTooSmallError
return rv
def _listxattr_inner(func, path):
if isinstance(path, str):
path = os.fsencode(path)
size = len(buffer)
while True:
buf = buffer.get(size)
try:
n = _check(func(path, buf, size), path, detect_buffer_too_small=True)
except BufferTooSmallError:
size *= 2
else:
return n, buf.raw
def _getxattr_inner(func, path, name):
if isinstance(path, str):
path = os.fsencode(path)
name = os.fsencode(name)
size = len(buffer)
while True:
buf = buffer.get(size)
try:
n = _check(func(path, name, buf, size), path, detect_buffer_too_small=True)
except BufferTooSmallError:
size *= 2
else:
return n, buf.raw
def _setxattr_inner(func, path, name, value):
if isinstance(path, str):
path = os.fsencode(path)
name = os.fsencode(name)
value = value and os.fsencode(value)
size = len(value) if value else 0
_check(func(path, name, value, size), path, detect_buffer_too_small=False)
if sys.platform.startswith('linux'): # pragma: linux only
libc.llistxattr.argtypes = (c_char_p, c_char_p, c_size_t)
libc.llistxattr.restype = c_ssize_t
libc.flistxattr.argtypes = (c_int, c_char_p, c_size_t)
libc.flistxattr.restype = c_ssize_t
libc.lsetxattr.argtypes = (c_char_p, c_char_p, c_char_p, c_size_t, c_int)
libc.lsetxattr.restype = c_int
libc.fsetxattr.argtypes = (c_int, c_char_p, c_char_p, c_size_t, c_int)
libc.fsetxattr.restype = c_int
libc.lgetxattr.argtypes = (c_char_p, c_char_p, c_char_p, c_size_t)
libc.lgetxattr.restype = c_ssize_t
libc.fgetxattr.argtypes = (c_int, c_char_p, c_char_p, c_size_t)
libc.fgetxattr.restype = c_ssize_t
def listxattr(path, *, follow_symlinks=True):
def func(path, buf, size):
if isinstance(path, int):
return libc.flistxattr(path, buf, size)
else:
if follow_symlinks:
return libc.listxattr(path, buf, size)
else:
return libc.llistxattr(path, buf, size)
n, buf = _listxattr_inner(func, path)
return [os.fsdecode(name) for name in split_string0(buf[:n])
if name and not name.startswith(b'system.posix_acl_')]
def getxattr(path, name, *, follow_symlinks=True):
def func(path, name, buf, size):
if isinstance(path, int):
return libc.fgetxattr(path, name, buf, size)
else:
if follow_symlinks:
return libc.getxattr(path, name, buf, size)
else:
return libc.lgetxattr(path, name, buf, size)
n, buf = _getxattr_inner(func, path, name)
return buf[:n] or None
def setxattr(path, name, value, *, follow_symlinks=True):
def func(path, name, value, size):
flags = 0
if isinstance(path, int):
return libc.fsetxattr(path, name, value, size, flags)
else:
if follow_symlinks:
return libc.setxattr(path, name, value, size, flags)
else:
return libc.lsetxattr(path, name, value, size, flags)
_setxattr_inner(func, path, name, value)
elif sys.platform == 'darwin': # pragma: darwin only
libc.listxattr.argtypes = (c_char_p, c_char_p, c_size_t, c_int)
libc.listxattr.restype = c_ssize_t
libc.flistxattr.argtypes = (c_int, c_char_p, c_size_t)
libc.flistxattr.restype = c_ssize_t
libc.setxattr.argtypes = (c_char_p, c_char_p, c_char_p, c_size_t, c_uint32, c_int)
libc.setxattr.restype = c_int
libc.fsetxattr.argtypes = (c_int, c_char_p, c_char_p, c_size_t, c_uint32, c_int)
libc.fsetxattr.restype = c_int
libc.getxattr.argtypes = (c_char_p, c_char_p, c_char_p, c_size_t, c_uint32, c_int)
libc.getxattr.restype = c_ssize_t
libc.fgetxattr.argtypes = (c_int, c_char_p, c_char_p, c_size_t, c_uint32, c_int)
libc.fgetxattr.restype = c_ssize_t
XATTR_NOFLAGS = 0x0000
XATTR_NOFOLLOW = 0x0001
def listxattr(path, *, follow_symlinks=True):
def func(path, buf, size):
if isinstance(path, int):
return libc.flistxattr(path, buf, size, XATTR_NOFLAGS)
else:
if follow_symlinks:
return libc.listxattr(path, buf, size, XATTR_NOFLAGS)
else:
return libc.listxattr(path, buf, size, XATTR_NOFOLLOW)
n, buf = _listxattr_inner(func, path)
return [os.fsdecode(name) for name in split_string0(buf[:n]) if name]
def getxattr(path, name, *, follow_symlinks=True):
def func(path, name, buf, size):
if isinstance(path, int):
return libc.fgetxattr(path, name, buf, size, 0, XATTR_NOFLAGS)
else:
if follow_symlinks:
return libc.getxattr(path, name, buf, size, 0, XATTR_NOFLAGS)
else:
return libc.getxattr(path, name, buf, size, 0, XATTR_NOFOLLOW)
n, buf = _getxattr_inner(func, path, name)
return buf[:n] or None
def setxattr(path, name, value, *, follow_symlinks=True):
def func(path, name, value, size):
if isinstance(path, int):
return libc.fsetxattr(path, name, value, size, 0, XATTR_NOFLAGS)
else:
if follow_symlinks:
return libc.setxattr(path, name, value, size, 0, XATTR_NOFLAGS)
else:
return libc.setxattr(path, name, value, size, 0, XATTR_NOFOLLOW)
_setxattr_inner(func, path, name, value)
elif sys.platform.startswith('freebsd'): # pragma: freebsd only
libc.extattr_list_fd.argtypes = (c_int, c_int, c_char_p, c_size_t)
libc.extattr_list_fd.restype = c_ssize_t
libc.extattr_list_link.argtypes = (c_char_p, c_int, c_char_p, c_size_t)
libc.extattr_list_link.restype = c_ssize_t
libc.extattr_list_file.argtypes = (c_char_p, c_int, c_char_p, c_size_t)
libc.extattr_list_file.restype = c_ssize_t
libc.extattr_get_fd.argtypes = (c_int, c_int, c_char_p, c_char_p, c_size_t)
libc.extattr_get_fd.restype = c_ssize_t
libc.extattr_get_link.argtypes = (c_char_p, c_int, c_char_p, c_char_p, c_size_t)
libc.extattr_get_link.restype = c_ssize_t
libc.extattr_get_file.argtypes = (c_char_p, c_int, c_char_p, c_char_p, c_size_t)
libc.extattr_get_file.restype = c_ssize_t
libc.extattr_set_fd.argtypes = (c_int, c_int, c_char_p, c_char_p, c_size_t)
libc.extattr_set_fd.restype = c_int
libc.extattr_set_link.argtypes = (c_char_p, c_int, c_char_p, c_char_p, c_size_t)
libc.extattr_set_link.restype = c_int
libc.extattr_set_file.argtypes = (c_char_p, c_int, c_char_p, c_char_p, c_size_t)
libc.extattr_set_file.restype = c_int
ns = EXTATTR_NAMESPACE_USER = 0x0001
def listxattr(path, *, follow_symlinks=True):
def func(path, buf, size):
if isinstance(path, int):
return libc.extattr_list_fd(path, ns, buf, size)
else:
if follow_symlinks:
return libc.extattr_list_file(path, ns, buf, size)
else:
return libc.extattr_list_link(path, ns, buf, size)
n, buf = _listxattr_inner(func, path)
return [os.fsdecode(name) for name in split_lstring(buf[:n]) if name]
def getxattr(path, name, *, follow_symlinks=True):
def func(path, name, buf, size):
if isinstance(path, int):
return libc.extattr_get_fd(path, ns, name, buf, size)
else:
if follow_symlinks:
return libc.extattr_get_file(path, ns, name, buf, size)
else:
return libc.extattr_get_link(path, ns, name, buf, size)
n, buf = _getxattr_inner(func, path, name)
return buf[:n] or None
def setxattr(path, name, value, *, follow_symlinks=True):
def func(path, name, value, size):
if isinstance(path, int):
return libc.extattr_set_fd(path, ns, name, value, size)
else:
if follow_symlinks:
return libc.extattr_set_file(path, ns, name, value, size)
else:
return libc.extattr_set_link(path, ns, name, value, size)
_setxattr_inner(func, path, name, value)
else: # pragma: unknown platform only
def listxattr(path, *, follow_symlinks=True):
"""
Return list of xattr names on a file.
*path* can either be a path (str or bytes) or an open file descriptor (int).
*follow_symlinks* indicates whether symlinks should be followed
and only applies when *path* is not an open file descriptor.
"""
return []
def getxattr(path, name, *, follow_symlinks=True):
"""
Read xattr and return its value (as bytes) or None if its empty.
*path* can either be a path (str or bytes) or an open file descriptor (int).
*name* is the name of the xattr to read (str).
*follow_symlinks* indicates whether symlinks should be followed
and only applies when *path* is not an open file descriptor.
"""
def setxattr(path, name, value, *, follow_symlinks=True):
"""
Write xattr on *path*.
*path* can either be a path (str or bytes) or an open file descriptor (int).
*name* is the name of the xattr to read (str).
*value* is the value to write. It is either bytes or None. The latter
signals that the value shall be empty (size equals zero).
*follow_symlinks* indicates whether symlinks should be followed
and only applies when *path* is not an open file descriptor.
"""