raise OSError if acl_to_text / acl_from_text returns NULL

Also did a small structural refactors there.
This commit is contained in:
Thomas Waldmann 2024-02-26 20:07:10 +01:00
parent a75945ed0d
commit b3554cdc0f
No known key found for this signature in database
GPG Key ID: 243ACFA951F78E01
3 changed files with 49 additions and 46 deletions

View File

@ -130,7 +130,7 @@ def acl_get(path, item, st, numeric_ids=False, fd=None):
raise OSError(errno.errno, os.strerror(errno.errno), os.fsdecode(path)) raise OSError(errno.errno, os.strerror(errno.errno), os.fsdecode(path))
text = acl_to_text(acl, NULL) text = acl_to_text(acl, NULL)
if text == NULL: if text == NULL:
return raise OSError(errno.errno, os.strerror(errno.errno), os.fsdecode(path))
if numeric_ids: if numeric_ids:
item['acl_extended'] = _remove_non_numeric_identifier(text) item['acl_extended'] = _remove_non_numeric_identifier(text)
else: else:
@ -145,14 +145,14 @@ def acl_set(path, item, numeric_ids=False, fd=None):
acl_text = item.get('acl_extended') acl_text = item.get('acl_extended')
if acl_text is not None: if acl_text is not None:
try: try:
if isinstance(path, str):
path = os.fsencode(path)
if numeric_ids: if numeric_ids:
acl = acl_from_text(acl_text) acl = acl_from_text(acl_text)
else: else:
acl = acl_from_text(<bytes>_remove_numeric_id_if_possible(acl_text)) acl = acl_from_text(<bytes>_remove_numeric_id_if_possible(acl_text))
if acl == NULL: if acl == NULL:
return raise OSError(errno.errno, os.strerror(errno.errno), os.fsdecode(path))
if isinstance(path, str):
path = os.fsencode(path)
if fd is not None: if fd is not None:
if acl_set_fd_np(fd, acl, ACL_TYPE_EXTENDED) == -1: if acl_set_fd_np(fd, acl, ACL_TYPE_EXTENDED) == -1:
raise OSError(errno.errno, os.strerror(errno.errno), os.fsdecode(path)) raise OSError(errno.errno, os.strerror(errno.errno), os.fsdecode(path))

View File

@ -122,23 +122,21 @@ def setxattr(path, name, value, *, follow_symlinks=False):
cdef _get_acl(p, type, item, attribute, flags, fd=None): cdef _get_acl(p, type, item, attribute, flags, fd=None):
cdef acl_t acl = NULL cdef acl_t acl
cdef char *text = NULL cdef char *text
try: if fd is not None:
if fd is not None: acl = acl_get_fd_np(fd, type)
acl = acl_get_fd_np(fd, type)
else:
acl = acl_get_link_np(p, type)
if acl is not NULL:
text = acl_to_text_np(acl, NULL, flags)
if text is not NULL:
item[attribute] = text
finally:
acl_free(text)
acl_free(acl)
else: else:
acl = acl_get_link_np(p, type)
if acl == NULL:
raise OSError(errno.errno, os.strerror(errno.errno), os.fsdecode(p)) raise OSError(errno.errno, os.strerror(errno.errno), os.fsdecode(p))
text = acl_to_text_np(acl, NULL, flags)
if text == NULL:
acl_free(acl)
raise OSError(errno.errno, os.strerror(errno.errno), os.fsdecode(p))
item[attribute] = text
acl_free(text)
acl_free(acl)
def acl_get(path, item, st, numeric_ids=False, fd=None): def acl_get(path, item, st, numeric_ids=False, fd=None):
"""Saves ACL Entries """Saves ACL Entries
@ -168,16 +166,17 @@ cdef _set_acl(path, type, item, attribute, numeric_ids=False, fd=None):
elif numeric_ids and type in (ACL_TYPE_ACCESS, ACL_TYPE_DEFAULT): elif numeric_ids and type in (ACL_TYPE_ACCESS, ACL_TYPE_DEFAULT):
text = posix_acl_use_stored_uid_gid(text) text = posix_acl_use_stored_uid_gid(text)
acl = acl_from_text(<bytes>text) acl = acl_from_text(<bytes>text)
if acl: if acl == NULL:
try: raise OSError(errno.errno, os.strerror(errno.errno), os.fsdecode(path))
if fd is not None: try:
if acl_set_fd_np(fd, acl, type) == -1: if fd is not None:
raise OSError(errno.errno, os.strerror(errno.errno), os.fsdecode(path)) if acl_set_fd_np(fd, acl, type) == -1:
else: raise OSError(errno.errno, os.strerror(errno.errno), os.fsdecode(path))
if acl_set_link_np(path, type, acl) == -1: else:
raise OSError(errno.errno, os.strerror(errno.errno), os.fsdecode(path)) if acl_set_link_np(path, type, acl) == -1:
finally: raise OSError(errno.errno, os.strerror(errno.errno), os.fsdecode(path))
acl_free(acl) finally:
acl_free(acl)
cdef _nfs4_use_stored_uid_gid(acl): cdef _nfs4_use_stored_uid_gid(acl):

View File

@ -261,12 +261,14 @@ def acl_get(path, item, st, numeric_ids=False, fd=None):
raise OSError(errno.errno, os.strerror(errno.errno), os.fsdecode(path)) raise OSError(errno.errno, os.strerror(errno.errno), os.fsdecode(path))
if access_acl: if access_acl:
access_text = acl_to_text(access_acl, NULL) access_text = acl_to_text(access_acl, NULL)
if access_text: if access_text == NULL:
item['acl_access'] = converter(access_text) raise OSError(errno.errno, os.strerror(errno.errno), os.fsdecode(path))
item['acl_access'] = converter(access_text)
if default_acl: if default_acl:
default_text = acl_to_text(default_acl, NULL) default_text = acl_to_text(default_acl, NULL)
if default_text: if default_text == NULL:
item['acl_default'] = converter(default_text) raise OSError(errno.errno, os.strerror(errno.errno), os.fsdecode(path))
item['acl_default'] = converter(default_text)
finally: finally:
acl_free(access_text) acl_free(access_text)
acl_free(access_acl) acl_free(access_acl)
@ -291,24 +293,26 @@ def acl_set(path, item, numeric_ids=False, fd=None):
access_text = item.get('acl_access') access_text = item.get('acl_access')
if access_text is not None: if access_text is not None:
try: try:
access_acl = acl_from_text(<bytes> converter(access_text)) access_acl = acl_from_text(<bytes>converter(access_text))
if access_acl is not NULL: if access_acl == NULL:
if fd is not None: raise OSError(errno.errno, os.strerror(errno.errno), os.fsdecode(path))
if acl_set_fd(fd, access_acl) == -1: if fd is not None:
raise OSError(errno.errno, os.strerror(errno.errno), os.fsdecode(path)) if acl_set_fd(fd, access_acl) == -1:
else: raise OSError(errno.errno, os.strerror(errno.errno), os.fsdecode(path))
if acl_set_file(path, ACL_TYPE_ACCESS, access_acl) == -1: else:
raise OSError(errno.errno, os.strerror(errno.errno), os.fsdecode(path)) if acl_set_file(path, ACL_TYPE_ACCESS, access_acl) == -1:
raise OSError(errno.errno, os.strerror(errno.errno), os.fsdecode(path))
finally: finally:
acl_free(access_acl) acl_free(access_acl)
default_text = item.get('acl_default') default_text = item.get('acl_default')
if default_text is not None: if default_text is not None:
try: try:
default_acl = acl_from_text(<bytes> converter(default_text)) default_acl = acl_from_text(<bytes>converter(default_text))
if default_acl is not NULL: if default_acl == NULL:
# only directories can get a default ACL. there is no fd-based api to set it. raise OSError(errno.errno, os.strerror(errno.errno), os.fsdecode(path))
if acl_set_file(path, ACL_TYPE_DEFAULT, default_acl) == -1: # only directories can get a default ACL. there is no fd-based api to set it.
raise OSError(errno.errno, os.strerror(errno.errno), os.fsdecode(path)) if acl_set_file(path, ACL_TYPE_DEFAULT, default_acl) == -1:
raise OSError(errno.errno, os.strerror(errno.errno), os.fsdecode(path))
finally: finally:
acl_free(default_acl) acl_free(default_acl)