Experimental Linux ACL support (#66)

This commit is contained in:
Jonas Borgström 2014-04-13 20:26:46 +02:00
parent ebb597193d
commit 0ad5253d84
7 changed files with 256 additions and 19 deletions

View File

@ -8,10 +8,10 @@ Version 0.13
(feature release, released on X)
- Experimental Linux ACL support (#66)
- Added support for backup and restore of BSDFlags (OSX, FreeBSD) (#56)
- Fix bug where xattrs on symlinks were not correctly restored
Version 0.12
------------

View File

@ -14,6 +14,7 @@ import sys
import time
from io import BytesIO
from attic import xattr
from attic.platform import acl_get, acl_set
from attic.chunker import chunkify
from attic.hashindex import ChunkIndex
from attic.helpers import Error, uid2user, user2uid, gid2group, group2gid, \
@ -294,8 +295,8 @@ class Archive:
if not self.numeric_owner:
uid = user2uid(item[b'user'])
gid = group2gid(item[b'group'])
uid = uid or item[b'uid']
gid = gid or item[b'gid']
uid = item[b'uid'] if uid is None else uid
gid = item[b'gid'] if gid is None else gid
# This code is a bit of a mess due to os specific differences
try:
if fd:
@ -316,6 +317,7 @@ class Archive:
os.utime(path, None, ns=(item[b'mtime'], item[b'mtime']), follow_symlinks=False)
elif not symlink:
os.utime(path, (item[b'mtime'] / 10**9, item[b'mtime'] / 10**9))
acl_set(path, item, self.numeric_owner)
# Only available on OS X and FreeBSD
if has_lchflags and b'bsdflags' in item:
try:
@ -350,6 +352,7 @@ class Archive:
item[b'xattrs'] = StableDict(xattrs)
if has_lchflags and st.st_flags:
item[b'bsdflags'] = st.st_flags
item[b'acl'] = acl_get(path, item, self.numeric_owner)
return item
def process_item(self, path, st):

View File

@ -61,9 +61,11 @@ class UpgradableLock:
def check_extension_modules():
import attic.platform
if (attic.hashindex.API_VERSION != 1 or
attic.chunker.API_VERSION != 1 or
attic.crypto.API_VERSION != 1):
attic.crypto.API_VERSION != 1 or
attic.platform.API_VERSION != 1):
raise ExtensionModuleError
@ -328,35 +330,64 @@ def memoize(function):
@memoize
def uid2user(uid):
def uid2user(uid, default=None):
try:
return pwd.getpwuid(uid).pw_name
except KeyError:
return None
return default
@memoize
def user2uid(user):
def user2uid(user, default=None):
try:
return user and pwd.getpwnam(user).pw_uid
except KeyError:
return None
return default
@memoize
def gid2group(gid):
def gid2group(gid, default=None):
try:
return grp.getgrgid(gid).gr_name
except KeyError:
return None
return default
@memoize
def group2gid(group):
def group2gid(group, default=None):
try:
return group and grp.getgrnam(group).gr_gid
except KeyError:
return None
return default
def acl_use_local_uid_gid(acl):
"""Replace the user/group field with the local uid/gid if possible
"""
entries = []
for entry in acl.decode('ascii').split('\n'):
if entry:
fields = entry.split(':')
if fields[0] == 'user' and fields[1]:
fields[1] = user2uid(fields[1], fields[3])
elif fields[0] == 'group' and fields[1]:
fields[1] = group2gid(fields[1], fields[3])
entries.append(':'.join(entry.split(':')[:3]))
return ('\n'.join(entries)).encode('ascii')
def acl_use_stored_uid_gid(acl):
"""Replace the user/group field with the stored uid/gid
"""
entries = []
for entry in acl.decode('ascii').split('\n'):
if entry:
fields = entry.split(':')
if len(fields) == 4:
entries.append(':'.join([fields[0], fields[3], fields[2]]))
else:
entries.append(entry)
return ('\n'.join(entries)).encode('ascii')
class Location:

13
attic/platform.py Normal file
View File

@ -0,0 +1,13 @@
import os
platform = os.uname().sysname
if platform == 'Linux':
from attic.platform_linux import acl_get, acl_set, API_VERSION
else:
API_VERSION = 1
def acl_get(path, item, numeric_owner=False):
pass
def acl_set(path, item, numeric_owner=False):
pass

118
attic/platform_linux.pyx Normal file
View File

@ -0,0 +1,118 @@
import os
from attic.helpers import acl_use_local_uid_gid, acl_use_stored_uid_gid, user2uid, group2gid
API_VERSION = 1
cdef extern from "sys/types.h":
int ACL_TYPE_ACCESS
int ACL_TYPE_DEFAULT
cdef extern from "sys/acl.h":
ctypedef struct _acl_t:
pass
ctypedef _acl_t *acl_t
int acl_free(void *obj)
acl_t acl_get_file(const char *path, int type)
acl_t acl_set_file(const char *path, int type, acl_t acl)
acl_t acl_from_text(const char *buf)
char *acl_to_text(acl_t acl, ssize_t *len)
cdef extern from "acl/libacl.h":
int acl_extended_file_nofollow(const char *path)
def acl_append_numeric_ids(acl):
"""Extend the "POSIX 1003.1e draft standard 17" format with an additional uid/gid field
"""
entries = []
for entry in acl.decode('ascii').split('\n'):
if entry:
type, name, permission = entry.split(':')
if name and type == 'user':
entries.append(':'.join([type, name, permission, str(user2uid(name, name))]))
elif name and type == 'group':
entries.append(':'.join([type, name, permission, str(group2gid(name, name))]))
else:
entries.append(entry)
return ('\n'.join(entries)).encode('ascii')
def acl_numeric_ids(acl):
"""Replace the "POSIX 1003.1e draft standard 17" user/group field with uid/gid
"""
entries = []
for entry in acl.decode('ascii').split('\n'):
if entry:
type, name, permission = entry.split(':')
if name and type == 'user':
entries.append(':'.join([type, str(user2uid(name, name)), permission]))
elif name and type == 'group':
entries.append(':'.join([type, str(group2gid(name, name)), permission]))
else:
entries.append(entry)
return ('\n'.join(entries)).encode('ascii')
def acl_get(path, item, numeric_owner=False):
"""Saves ACL Entries
If `numeric_owner` is True the user/group field is not preserved only uid/gid
"""
cdef acl_t default_acl = NULL
cdef acl_t access_acl = NULL
cdef char *default_text = NULL
cdef char *access_text = NULL
if acl_extended_file_nofollow(<bytes>os.fsencode(path)) <= 0:
return
if numeric_owner:
converter = acl_numeric_ids
else:
converter = acl_append_numeric_ids
try:
access_acl = acl_get_file(<bytes>os.fsencode(path), ACL_TYPE_ACCESS)
if access_acl:
access_text = acl_to_text(access_acl, NULL)
if access_text:
item[b'acl_access'] = acl_append_numeric_ids(access_text)
default_acl = acl_get_file(<bytes>os.fsencode(path), ACL_TYPE_DEFAULT)
if default_acl:
default_text = acl_to_text(default_acl, NULL)
if default_text:
item[b'acl_default'] = acl_append_numeric_ids(default_text)
finally:
acl_free(default_text)
acl_free(default_acl)
acl_free(access_text)
acl_free(access_acl)
def acl_set(path, item, numeric_owner=False):
"""Restore ACL Entries
If `numeric_owner` is True the stored uid/gid is used instead
of the user/group names
"""
cdef acl_t access_acl = NULL
cdef acl_t default_acl = NULL
if numeric_owner:
converter = acl_use_stored_uid_gid
else:
converter = acl_use_local_uid_gid
access_text = item.get(b'acl_access')
default_text = item.get(b'acl_default')
if access_text:
try:
access_acl = acl_from_text(<bytes>converter(access_text))
if access_acl:
acl_set_file(<bytes>os.fsencode(path), ACL_TYPE_ACCESS, access_acl)
finally:
acl_free(access_acl)
if default_text:
try:
default_acl = acl_from_text(<bytes>converter(default_text))
if default_acl:
acl_set_file(<bytes>os.fsencode(path), ACL_TYPE_DEFAULT, default_acl)
finally:
acl_free(default_acl)

View File

@ -0,0 +1,65 @@
import os
import shutil
import tempfile
import unittest
from attic.platform import acl_get, acl_set
from attic.testsuite import AtticTestCase
ACCESS_ACL = """
user::rw-
user:root:rw-:0
user:9999:r--:9999
group::r--
group:root:r--:0
group:9999:r--:9999
mask::rw-
other::r--
""".strip().encode('ascii')
DEFAULT_ACL = """
user::rw-
user:root:r--:0
user:8888:r--:8888
group::r--
group:root:r--:0
group:8888:r--:8888
mask::rw-
other::r--
""".strip().encode('ascii')
def fakeroot_detected():
return 'FAKEROOTKEY' in os.environ
@unittest.skipIf(fakeroot_detected(), 'not compatible with fakeroot')
class PlatformLinuxTestCase(AtticTestCase):
def setUp(self):
self.tmpdir = tempfile.mkdtemp()
def tearDown(self):
shutil.rmtree(self.tmpdir)
def get_acl(self, path):
item = {}
acl_get(path, item)
return item
def set_acl(self, path, access=None, default=None):
item = {b'acl_access': access, b'acl_default': default}
acl_set(path, item)
def test_access_acl(self):
file = tempfile.NamedTemporaryFile()
self.assert_equal(self.get_acl(file.name), {})
self.set_acl(file.name, access=ACCESS_ACL)
self.assert_equal(self.get_acl(file.name)[b'acl_access'], ACCESS_ACL)
def test_default_acl(self):
self.assert_equal(self.get_acl(self.tmpdir), {})
self.set_acl(self.tmpdir, access=ACCESS_ACL, default=DEFAULT_ACL)
self.assert_equal(self.get_acl(self.tmpdir)[b'acl_access'], ACCESS_ACL)
self.assert_equal(self.get_acl(self.tmpdir)[b'acl_default'], DEFAULT_ACL)

View File

@ -9,6 +9,7 @@ versioneer.versionfile_build = 'attic/_version.py'
versioneer.tag_prefix = ''
versioneer.parentdir_prefix = 'Attic-' # dirname like 'myproject-1.2.0'
platform = os.uname().sysname
min_python = (3, 2)
if sys.version_info < min_python:
@ -23,6 +24,7 @@ except ImportError:
crypto_source = 'attic/crypto.pyx'
chunker_source = 'attic/chunker.pyx'
hashindex_source = 'attic/hashindex.pyx'
platform_linux_source = 'attic/platform_linux.pyx'
try:
from Cython.Distutils import build_ext
@ -36,7 +38,7 @@ try:
versioneer.cmd_sdist.__init__(self, *args, **kwargs)
def make_distribution(self):
self.filelist.extend(['attic/crypto.c', 'attic/chunker.c', 'attic/_chunker.c', 'attic/hashindex.c', 'attic/_hashindex.c'])
self.filelist.extend(['attic/crypto.c', 'attic/chunker.c', 'attic/_chunker.c', 'attic/hashindex.c', 'attic/_hashindex.c', 'attic/platform_linux.c'])
super(Sdist, self).make_distribution()
except ImportError:
@ -47,8 +49,9 @@ except ImportError:
crypto_source = crypto_source.replace('.pyx', '.c')
chunker_source = chunker_source.replace('.pyx', '.c')
hashindex_source = hashindex_source.replace('.pyx', '.c')
acl_source = platform_linux_source.replace('.pyx', '.c')
from distutils.command.build_ext import build_ext
if not all(os.path.exists(path) for path in [crypto_source, chunker_source, hashindex_source]):
if not all(os.path.exists(path) for path in [crypto_source, chunker_source, hashindex_source, acl_source]):
raise ImportError('The GIT version of Attic needs Cython. Install Cython or use a released version')
@ -77,6 +80,14 @@ with open('README.rst', 'r') as fd:
cmdclass = versioneer.get_cmdclass()
cmdclass.update({'build_ext': build_ext, 'sdist': Sdist})
ext_modules = [
Extension('attic.crypto', [crypto_source], libraries=['crypto'], include_dirs=include_dirs, library_dirs=library_dirs),
Extension('attic.chunker', [chunker_source]),
Extension('attic.hashindex', [hashindex_source])
]
if platform == 'Linux':
ext_modules.append(Extension('attic.platform_linux', [platform_linux_source], libraries=['acl']))
setup(
name='Attic',
version=versioneer.get_version(),
@ -102,10 +113,6 @@ setup(
packages=['attic', 'attic.testsuite'],
scripts=['scripts/attic'],
cmdclass=cmdclass,
ext_modules=[
Extension('attic.crypto', [crypto_source], libraries=['crypto'], include_dirs=include_dirs, library_dirs=library_dirs),
Extension('attic.chunker', [chunker_source]),
Extension('attic.hashindex', [hashindex_source])
],
ext_modules=ext_modules,
install_requires=['msgpack-python']
)