acls cleanup (linux), #6908

https://github.com/borgbackup/borg/issues/6908#issuecomment-1224872992

also:
- added some type assertions
- made NULL pointer checks more explicit
- fsencode only called when needed
- structure of code more similar now on all OSes
This commit is contained in:
Thomas Waldmann 2022-09-14 13:09:43 +02:00
parent 6a1c64b0dc
commit 3280603e43
4 changed files with 67 additions and 56 deletions

View File

@ -81,6 +81,7 @@ def setxattr(path, name, value, *, follow_symlinks=False):
def _remove_numeric_id_if_possible(acl):
"""Replace the user/group field with the local uid/gid if possible
"""
assert isinstance(acl, bytes)
entries = []
for entry in safe_decode(acl).split('\n'):
if entry:
@ -98,6 +99,7 @@ def _remove_numeric_id_if_possible(acl):
def _remove_non_numeric_identifier(acl):
"""Remove user and group names from the acl
"""
assert isinstance(acl, bytes)
entries = []
for entry in safe_decode(acl).split('\n'):
if entry:
@ -113,22 +115,20 @@ def _remove_non_numeric_identifier(acl):
def acl_get(path, item, st, numeric_ids=False, fd=None):
cdef acl_t acl = NULL
cdef char *text = NULL
if isinstance(path, str):
path = os.fsencode(path)
try:
if fd is not None:
acl = acl_get_fd_np(fd, ACL_TYPE_EXTENDED)
else:
if isinstance(path, str):
path = os.fsencode(path)
acl = acl_get_link_np(path, ACL_TYPE_EXTENDED)
if acl == NULL:
return
text = acl_to_text(acl, NULL)
if text == NULL:
return
if numeric_ids:
item['acl_extended'] = _remove_non_numeric_identifier(text)
else:
item['acl_extended'] = text
if acl is not NULL:
text = acl_to_text(acl, NULL)
if text is not NULL:
if numeric_ids:
item['acl_extended'] = _remove_non_numeric_identifier(text)
else:
item['acl_extended'] = text
finally:
acl_free(text)
acl_free(acl)
@ -143,13 +143,12 @@ def acl_set(path, item, numeric_ids=False, fd=None):
acl = acl_from_text(acl_text)
else:
acl = acl_from_text(<bytes>_remove_numeric_id_if_possible(acl_text))
if acl == NULL:
return
if isinstance(path, str):
path = os.fsencode(path)
if fd is not None:
acl_set_fd_np(fd, acl, ACL_TYPE_EXTENDED)
else:
acl_set_link_np(path, ACL_TYPE_EXTENDED, acl)
if acl is not NULL:
if fd is not None:
acl_set_fd_np(fd, acl, ACL_TYPE_EXTENDED)
else:
if isinstance(path, str):
path = os.fsencode(path)
acl_set_link_np(path, ACL_TYPE_EXTENDED, acl)
finally:
acl_free(acl)

View File

@ -106,17 +106,19 @@ def setxattr(path, name, value, *, follow_symlinks=False):
cdef _get_acl(p, type, item, attribute, flags, fd=None):
cdef acl_t acl
cdef char *text
if fd is not None:
acl = acl_get_fd_np(fd, type)
else:
acl = acl_get_link_np(p, type)
if acl:
text = acl_to_text_np(acl, NULL, flags)
if text:
item[attribute] = text
acl_free(text)
cdef acl_t acl = NULL
cdef char *text = NULL
try:
if fd is not None:
acl = acl_get_fd_np(fd, type)
else:
acl = acl_get_link_np(p, type)
if acl is not NULL:
text = acl_to_text_np(acl, NULL, flags)
if text is not NULL:
item[attribute] = text
finally:
acl_free(text)
acl_free(acl)
@ -139,26 +141,29 @@ def acl_get(path, item, st, numeric_ids=False, fd=None):
_get_acl(path, ACL_TYPE_DEFAULT, item, 'acl_default', flags, fd=fd)
cdef _set_acl(p, type, item, attribute, numeric_ids=False, fd=None):
cdef acl_t acl
cdef _set_acl(path, type, item, attribute, numeric_ids=False, fd=None):
cdef acl_t acl = NULL
text = item.get(attribute)
if text:
if text is not None:
if numeric_ids and type == ACL_TYPE_NFS4:
text = _nfs4_use_stored_uid_gid(text)
elif numeric_ids and type in(ACL_TYPE_ACCESS, ACL_TYPE_DEFAULT):
elif numeric_ids and type in (ACL_TYPE_ACCESS, ACL_TYPE_DEFAULT):
text = posix_acl_use_stored_uid_gid(text)
acl = acl_from_text(<bytes>text)
if acl:
if fd is not None:
acl_set_fd_np(fd, acl, type)
else:
acl_set_link_np(p, type, acl)
try:
acl = acl_from_text(<bytes> text)
if acl is not NULL:
if fd is not None:
acl_set_fd_np(fd, acl, type)
else:
acl_set_link_np(path, type, acl)
finally:
acl_free(acl)
cdef _nfs4_use_stored_uid_gid(acl):
"""Replace the user/group field with the stored uid/gid
"""
assert isinstance(acl, bytes)
entries = []
for entry in safe_decode(acl).split('\n'):
if entry:

View File

@ -179,6 +179,7 @@ def get_flags(path, st, fd=None):
def acl_use_local_uid_gid(acl):
"""Replace the user/group field with the local uid/gid if possible
"""
assert isinstance(acl, bytes)
entries = []
for entry in safe_decode(acl).split('\n'):
if entry:
@ -194,6 +195,7 @@ def acl_use_local_uid_gid(acl):
cdef acl_append_numeric_ids(acl):
"""Extend the "POSIX 1003.1e draft standard 17" format with an additional uid/gid field
"""
assert isinstance(acl, bytes)
entries = []
for entry in _comment_re.sub('', safe_decode(acl)).split('\n'):
if entry:
@ -210,6 +212,7 @@ cdef acl_append_numeric_ids(acl):
cdef acl_numeric_ids(acl):
"""Replace the "POSIX 1003.1e draft standard 17" user/group field with uid/gid
"""
assert isinstance(acl, bytes)
entries = []
for entry in _comment_re.sub('', safe_decode(acl)).split('\n'):
if entry:
@ -249,22 +252,25 @@ def acl_get(path, item, st, numeric_ids=False, fd=None):
access_acl = acl_get_fd(fd)
else:
access_acl = acl_get_file(path, ACL_TYPE_ACCESS)
if access_acl is not NULL:
access_text = acl_to_text(access_acl, NULL)
if access_text is not NULL:
item['acl_access'] = converter(access_text)
finally:
acl_free(access_text)
acl_free(access_acl)
try:
if stat.S_ISDIR(st.st_mode):
# only directories can have a default ACL. there is no fd-based api to get it.
default_acl = acl_get_file(path, ACL_TYPE_DEFAULT)
if access_acl:
access_text = acl_to_text(access_acl, NULL)
if access_text:
item['acl_access'] = converter(access_text)
if default_acl:
default_text = acl_to_text(default_acl, NULL)
if default_text:
item['acl_default'] = converter(default_text)
if default_acl is not NULL:
default_text = acl_to_text(default_acl, NULL)
if default_text is not NULL:
item['acl_default'] = converter(default_text)
finally:
acl_free(default_text)
acl_free(default_acl)
acl_free(access_text)
acl_free(access_acl)
def acl_set(path, item, numeric_ids=False, fd=None):
@ -282,10 +288,10 @@ def acl_set(path, item, numeric_ids=False, fd=None):
else:
converter = acl_use_local_uid_gid
access_text = item.get('acl_access')
if access_text:
if access_text is not None:
try:
access_acl = acl_from_text(<bytes>converter(access_text))
if access_acl:
access_acl = acl_from_text(<bytes> converter(access_text))
if access_acl is not NULL:
if fd is not None:
acl_set_fd(fd, access_acl)
else:
@ -293,10 +299,10 @@ def acl_set(path, item, numeric_ids=False, fd=None):
finally:
acl_free(access_acl)
default_text = item.get('acl_default')
if default_text:
if default_text is not None:
try:
default_acl = acl_from_text(<bytes>converter(default_text))
if default_acl:
default_acl = acl_from_text(<bytes> converter(default_text))
if default_acl is not NULL:
# only directories can get a default ACL. there is no fd-based api to set it.
acl_set_file(path, ACL_TYPE_DEFAULT, default_acl)
finally:

View File

@ -112,6 +112,7 @@ def group2gid(group, default=None):
def posix_acl_use_stored_uid_gid(acl):
"""Replace the user/group field with the stored uid/gid
"""
assert isinstance(acl, bytes)
from ..helpers import safe_decode, safe_encode
entries = []
for entry in safe_decode(acl).split('\n'):