mirror of
https://github.com/borgbackup/borg.git
synced 2024-12-23 16:26:29 +00:00
168 lines
6.3 KiB
Python
168 lines
6.3 KiB
Python
"""A basic extended attributes (xattr) implementation for Linux and MacOS X
|
|
"""
|
|
import os
|
|
import sys
|
|
import tempfile
|
|
|
|
|
|
def is_enabled():
|
|
"""Determine if xattr is enabled on the filesystem
|
|
"""
|
|
with tempfile.TemporaryFile() as fd:
|
|
try:
|
|
setxattr(fd.fileno(), 'user.name', b'value')
|
|
except OSError:
|
|
return False
|
|
return getxattr(fd.fileno(), 'user.name') == b'value'
|
|
|
|
|
|
def get_all(path, follow_symlinks=True):
|
|
return dict((name, getxattr(path, name, follow_symlinks=follow_symlinks))
|
|
for name in listxattr(path, follow_symlinks=follow_symlinks))
|
|
|
|
|
|
try:
|
|
# Currently only available on Python 3.3+ on Linux
|
|
from os import getxattr, setxattr, listxattr2
|
|
except ImportError:
|
|
from ctypes import CDLL, create_string_buffer, c_ssize_t, c_size_t, c_char_p, c_int, c_uint32, get_errno
|
|
from ctypes.util import find_library
|
|
libc = CDLL(find_library('c'), use_errno=True)
|
|
|
|
def _check(rv, path=None):
|
|
if rv < 0:
|
|
raise OSError(get_errno(), path)
|
|
return rv
|
|
|
|
if sys.platform.startswith('linux'):
|
|
libc.llistxattr.argtypes = (c_char_p, c_char_p, c_size_t)
|
|
libc.llistxattr.restype = c_ssize_t
|
|
libc.flistxattr.argtypes = (c_int, c_char_p, c_size_t)
|
|
libc.flistxattr.restype = c_ssize_t
|
|
libc.lsetxattr.argtypes = (c_char_p, c_char_p, c_char_p, c_size_t, c_int)
|
|
libc.lsetxattr.restype = c_int
|
|
libc.fsetxattr.argtypes = (c_int, c_char_p, c_char_p, c_size_t, c_int)
|
|
libc.fsetxattr.restype = c_int
|
|
libc.lgetxattr.argtypes = (c_char_p, c_char_p, c_char_p, c_size_t)
|
|
libc.lgetxattr.restype = c_ssize_t
|
|
libc.fgetxattr.argtypes = (c_int, c_char_p, c_char_p, c_size_t)
|
|
libc.fgetxattr.restype = c_ssize_t
|
|
|
|
def listxattr(path, *, follow_symlinks=True):
|
|
if isinstance(path, str):
|
|
path = os.fsencode(path)
|
|
if isinstance(path, int):
|
|
func = libc.flistxattr
|
|
elif follow_symlinks:
|
|
func = libc.listxattr
|
|
else:
|
|
func = libc.llistxattr
|
|
n = _check(func(path, None, 0), path)
|
|
if n == 0:
|
|
return []
|
|
namebuf = create_string_buffer(n)
|
|
n2 = _check(func(path, namebuf, n), path)
|
|
if n2 != n:
|
|
raise Exception('listxattr failed')
|
|
return [os.fsdecode(name) for name in namebuf.raw.split(b'\0')[:-1]]
|
|
|
|
def getxattr(path, name, *, follow_symlinks=True):
|
|
name = os.fsencode(name)
|
|
if isinstance(path, str):
|
|
path = os.fsencode(path)
|
|
if isinstance(path, int):
|
|
func = libc.fgetxattr
|
|
elif follow_symlinks:
|
|
func = libc.getxattr
|
|
else:
|
|
func = libc.lgetxattr
|
|
n = _check(func(path, name, None, 0))
|
|
if n == 0:
|
|
return
|
|
valuebuf = create_string_buffer(n)
|
|
n2 = _check(func(path, name, valuebuf, n), path)
|
|
if n2 != n:
|
|
raise Exception('getxattr failed')
|
|
return valuebuf.raw
|
|
|
|
def setxattr(path, name, value, *, follow_symlinks=True):
|
|
name = os.fsencode(name)
|
|
value = os.fsencode(value)
|
|
if isinstance(path, str):
|
|
path = os.fsencode(path)
|
|
if isinstance(path, int):
|
|
func = libc.fsetxattr
|
|
elif follow_symlinks:
|
|
func = libc.setxattr
|
|
else:
|
|
func = libc.lsetxattr
|
|
_check(func(path, name, value, len(value), 0), path)
|
|
|
|
elif sys.platform == 'darwin':
|
|
libc.listxattr.argtypes = (c_char_p, c_char_p, c_size_t, c_int)
|
|
libc.listxattr.restype = c_ssize_t
|
|
libc.flistxattr.argtypes = (c_int, c_char_p, c_size_t)
|
|
libc.flistxattr.restype = c_ssize_t
|
|
libc.setxattr.argtypes = (c_char_p, c_char_p, c_char_p, c_size_t, c_uint32, c_int)
|
|
libc.setxattr.restype = c_int
|
|
libc.fsetxattr.argtypes = (c_int, c_char_p, c_char_p, c_size_t, c_uint32, c_int)
|
|
libc.fsetxattr.restype = c_int
|
|
libc.getxattr.argtypes = (c_char_p, c_char_p, c_char_p, c_size_t, c_uint32, c_int)
|
|
libc.getxattr.restype = c_ssize_t
|
|
libc.fgetxattr.argtypes = (c_int, c_char_p, c_char_p, c_size_t, c_uint32, c_int)
|
|
libc.fgetxattr.restype = c_ssize_t
|
|
|
|
XATTR_NOFOLLOW = 0x0001
|
|
|
|
def listxattr(path, *, follow_symlinks=True):
|
|
func = libc.listxattr
|
|
flags = 0
|
|
if isinstance(path, str):
|
|
path = os.fsencode(path)
|
|
if isinstance(path, int):
|
|
func = libc.flistxattr
|
|
elif not follow_symlinks:
|
|
flags = XATTR_NOFOLLOW
|
|
n = _check(func(path, None, 0, flags), path)
|
|
if n == 0:
|
|
return []
|
|
namebuf = create_string_buffer(n)
|
|
n2 = _check(func(path, namebuf, n, flags), path)
|
|
if n2 != n:
|
|
raise Exception('listxattr failed')
|
|
return [os.fsdecode(name) for name in namebuf.raw.split(b'\0')[:-1]]
|
|
|
|
def getxattr(path, name, *, follow_symlinks=True):
|
|
name = os.fsencode(name)
|
|
func = libc.getxattr
|
|
flags = 0
|
|
if isinstance(path, str):
|
|
path = os.fsencode(path)
|
|
if isinstance(path, int):
|
|
func = libc.fgetxattr
|
|
elif not follow_symlinks:
|
|
flags = XATTR_NOFOLLOW
|
|
n = _check(func(path, name, None, 0, 0, flags))
|
|
if n == 0:
|
|
return
|
|
valuebuf = create_string_buffer(n)
|
|
n2 = _check(func(path, name, valuebuf, n, 0, flags), path)
|
|
if n2 != n:
|
|
raise Exception('getxattr failed')
|
|
return valuebuf.raw
|
|
|
|
def setxattr(path, name, value, *, follow_symlinks=True):
|
|
name = os.fsencode(name)
|
|
value = os.fsencode(value)
|
|
func = libc.setxattr
|
|
flags = 0
|
|
if isinstance(path, str):
|
|
path = os.fsencode(path)
|
|
if isinstance(path, int):
|
|
func = libc.fsetxattr
|
|
elif not follow_symlinks:
|
|
flags = XATTR_NOFOLLOW
|
|
_check(func(path, name, value, len(value), 0, flags), path)
|
|
|
|
else:
|
|
raise Exception('Unsupported platform: %s' % sys.platform)
|