mirror of https://github.com/borgbackup/borg.git
Merge pull request #310 from ThomasWaldmann/non-ascii-acls
Non ascii acls
This commit is contained in:
commit
79482a1c46
|
@ -516,14 +516,24 @@ def posix_acl_use_stored_uid_gid(acl):
|
|||
"""Replace the user/group field with the stored uid/gid
|
||||
"""
|
||||
entries = []
|
||||
for entry in acl.decode('ascii').split('\n'):
|
||||
for entry in safe_decode(acl).split('\n'):
|
||||
if entry:
|
||||
fields = entry.split(':')
|
||||
if len(fields) == 4:
|
||||
entries.append(':'.join([fields[0], fields[3], fields[2]]))
|
||||
else:
|
||||
entries.append(entry)
|
||||
return ('\n'.join(entries)).encode('ascii')
|
||||
return safe_encode('\n'.join(entries))
|
||||
|
||||
|
||||
def safe_decode(s, coding='utf-8', errors='surrogateescape'):
|
||||
"""decode bytes to str, with round-tripping "invalid" bytes"""
|
||||
return s.decode(coding, errors)
|
||||
|
||||
|
||||
def safe_encode(s, coding='utf-8', errors='surrogateescape'):
|
||||
"""encode str to bytes, with round-tripping "invalid" bytes"""
|
||||
return s.encode(coding, errors)
|
||||
|
||||
|
||||
class Location:
|
||||
|
|
|
@ -1,5 +1,5 @@
|
|||
import os
|
||||
from .helpers import user2uid, group2gid
|
||||
from .helpers import user2uid, group2gid, safe_decode, safe_encode
|
||||
|
||||
API_VERSION = 2
|
||||
|
||||
|
@ -20,7 +20,7 @@ def _remove_numeric_id_if_possible(acl):
|
|||
"""Replace the user/group field with the local uid/gid if possible
|
||||
"""
|
||||
entries = []
|
||||
for entry in acl.decode('ascii').split('\n'):
|
||||
for entry in safe_decode(acl).split('\n'):
|
||||
if entry:
|
||||
fields = entry.split(':')
|
||||
if fields[0] == 'user':
|
||||
|
@ -30,22 +30,22 @@ def _remove_numeric_id_if_possible(acl):
|
|||
if group2gid(fields[2]) is not None:
|
||||
fields[1] = fields[3] = ''
|
||||
entries.append(':'.join(fields))
|
||||
return ('\n'.join(entries)).encode('ascii')
|
||||
return safe_encode('\n'.join(entries))
|
||||
|
||||
|
||||
def _remove_non_numeric_identifier(acl):
|
||||
"""Remove user and group names from the acl
|
||||
"""
|
||||
entries = []
|
||||
for entry in acl.split(b'\n'):
|
||||
for entry in safe_decode(acl).split('\n'):
|
||||
if entry:
|
||||
fields = entry.split(b':')
|
||||
if fields[0] in (b'user', b'group'):
|
||||
fields[2] = b''
|
||||
entries.append(b':'.join(fields))
|
||||
fields = entry.split(':')
|
||||
if fields[0] in ('user', 'group'):
|
||||
fields[2] = ''
|
||||
entries.append(':'.join(fields))
|
||||
else:
|
||||
entries.append(entry)
|
||||
return b'\n'.join(entries)
|
||||
return safe_encode('\n'.join(entries))
|
||||
|
||||
|
||||
def acl_get(path, item, st, numeric_owner=False):
|
||||
|
|
|
@ -1,5 +1,5 @@
|
|||
import os
|
||||
from .helpers import posix_acl_use_stored_uid_gid
|
||||
from .helpers import posix_acl_use_stored_uid_gid, safe_encode, safe_decode
|
||||
|
||||
API_VERSION = 2
|
||||
|
||||
|
@ -78,14 +78,14 @@ cdef _nfs4_use_stored_uid_gid(acl):
|
|||
"""Replace the user/group field with the stored uid/gid
|
||||
"""
|
||||
entries = []
|
||||
for entry in acl.decode('ascii').split('\n'):
|
||||
for entry in safe_decode(acl).split('\n'):
|
||||
if entry:
|
||||
if entry.startswith('user:') or entry.startswith('group:'):
|
||||
fields = entry.split(':')
|
||||
entries.append(':'.join(fields[0], fields[5], *fields[2:-1]))
|
||||
else:
|
||||
entries.append(entry)
|
||||
return ('\n'.join(entries)).encode('ascii')
|
||||
return safe_encode('\n'.join(entries))
|
||||
|
||||
|
||||
def acl_set(path, item, numeric_owner=False):
|
||||
|
|
|
@ -1,7 +1,7 @@
|
|||
import os
|
||||
import re
|
||||
from stat import S_ISLNK
|
||||
from .helpers import posix_acl_use_stored_uid_gid, user2uid, group2gid
|
||||
from .helpers import posix_acl_use_stored_uid_gid, user2uid, group2gid, safe_decode, safe_encode
|
||||
|
||||
API_VERSION = 2
|
||||
|
||||
|
@ -31,22 +31,22 @@ def acl_use_local_uid_gid(acl):
|
|||
"""Replace the user/group field with the local uid/gid if possible
|
||||
"""
|
||||
entries = []
|
||||
for entry in acl.decode('ascii').split('\n'):
|
||||
for entry in safe_decode(acl).split('\n'):
|
||||
if entry:
|
||||
fields = entry.split(':')
|
||||
if fields[0] == 'user' and fields[1]:
|
||||
fields[1] = user2uid(fields[1], fields[3])
|
||||
fields[1] = str(user2uid(fields[1], fields[3]))
|
||||
elif fields[0] == 'group' and fields[1]:
|
||||
fields[1] = group2gid(fields[1], fields[3])
|
||||
entries.append(':'.join(entry.split(':')[:3]))
|
||||
return ('\n'.join(entries)).encode('ascii')
|
||||
fields[1] = str(group2gid(fields[1], fields[3]))
|
||||
entries.append(':'.join(fields[:3]))
|
||||
return safe_encode('\n'.join(entries))
|
||||
|
||||
|
||||
cdef acl_append_numeric_ids(acl):
|
||||
"""Extend the "POSIX 1003.1e draft standard 17" format with an additional uid/gid field
|
||||
"""
|
||||
entries = []
|
||||
for entry in _comment_re.sub('', acl.decode('ascii')).split('\n'):
|
||||
for entry in _comment_re.sub('', safe_decode(acl)).split('\n'):
|
||||
if entry:
|
||||
type, name, permission = entry.split(':')
|
||||
if name and type == 'user':
|
||||
|
@ -55,14 +55,14 @@ cdef acl_append_numeric_ids(acl):
|
|||
entries.append(':'.join([type, name, permission, str(group2gid(name, name))]))
|
||||
else:
|
||||
entries.append(entry)
|
||||
return ('\n'.join(entries)).encode('ascii')
|
||||
return safe_encode('\n'.join(entries))
|
||||
|
||||
|
||||
cdef acl_numeric_ids(acl):
|
||||
"""Replace the "POSIX 1003.1e draft standard 17" user/group field with uid/gid
|
||||
"""
|
||||
entries = []
|
||||
for entry in _comment_re.sub('', acl.decode('ascii')).split('\n'):
|
||||
for entry in _comment_re.sub('', safe_decode(acl)).split('\n'):
|
||||
if entry:
|
||||
type, name, permission = entry.split(':')
|
||||
if name and type == 'user':
|
||||
|
@ -73,7 +73,7 @@ cdef acl_numeric_ids(acl):
|
|||
entries.append(':'.join([type, gid, permission, gid]))
|
||||
else:
|
||||
entries.append(entry)
|
||||
return ('\n'.join(entries)).encode('ascii')
|
||||
return safe_encode('\n'.join(entries))
|
||||
|
||||
|
||||
def acl_get(path, item, st, numeric_owner=False):
|
||||
|
|
|
@ -72,6 +72,42 @@ class PlatformLinuxTestCase(BaseTestCase):
|
|||
self.assert_equal(self.get_acl(self.tmpdir)[b'acl_access'], ACCESS_ACL)
|
||||
self.assert_equal(self.get_acl(self.tmpdir)[b'acl_default'], DEFAULT_ACL)
|
||||
|
||||
def test_non_ascii_acl(self):
|
||||
# Testing non-ascii ACL processing to see whether our code is robust.
|
||||
# I have no idea whether non-ascii ACLs are allowed by the standard,
|
||||
# but in practice they seem to be out there and must not make our code explode.
|
||||
file = tempfile.NamedTemporaryFile()
|
||||
self.assert_equal(self.get_acl(file.name), {})
|
||||
nothing_special = 'user::rw-\ngroup::r--\nmask::rw-\nother::---\n'.encode('ascii')
|
||||
# TODO: can this be tested without having an existing system user übel with uid 666 gid 666?
|
||||
user_entry = 'user:übel:rw-:666'.encode('utf-8')
|
||||
user_entry_numeric = 'user:666:rw-:666'.encode('ascii')
|
||||
group_entry = 'group:übel:rw-:666'.encode('utf-8')
|
||||
group_entry_numeric = 'group:666:rw-:666'.encode('ascii')
|
||||
acl = b'\n'.join([nothing_special, user_entry, group_entry])
|
||||
self.set_acl(file.name, access=acl, numeric_owner=False)
|
||||
acl_access = self.get_acl(file.name, numeric_owner=False)[b'acl_access']
|
||||
self.assert_in(user_entry, acl_access)
|
||||
self.assert_in(group_entry, acl_access)
|
||||
acl_access_numeric = self.get_acl(file.name, numeric_owner=True)[b'acl_access']
|
||||
self.assert_in(user_entry_numeric, acl_access_numeric)
|
||||
self.assert_in(group_entry_numeric, acl_access_numeric)
|
||||
file2 = tempfile.NamedTemporaryFile()
|
||||
self.set_acl(file2.name, access=acl, numeric_owner=True)
|
||||
acl_access = self.get_acl(file2.name, numeric_owner=False)[b'acl_access']
|
||||
self.assert_in(user_entry, acl_access)
|
||||
self.assert_in(group_entry, acl_access)
|
||||
acl_access_numeric = self.get_acl(file.name, numeric_owner=True)[b'acl_access']
|
||||
self.assert_in(user_entry_numeric, acl_access_numeric)
|
||||
self.assert_in(group_entry_numeric, acl_access_numeric)
|
||||
|
||||
def test_utils(self):
|
||||
from ..platform_linux import acl_use_local_uid_gid
|
||||
self.assert_equal(acl_use_local_uid_gid(b'user:nonexistent1234:rw-:1234'), b'user:1234:rw-')
|
||||
self.assert_equal(acl_use_local_uid_gid(b'group:nonexistent1234:rw-:1234'), b'group:1234:rw-')
|
||||
self.assert_equal(acl_use_local_uid_gid(b'user:root:rw-:0'), b'user:0:rw-')
|
||||
self.assert_equal(acl_use_local_uid_gid(b'group:root:rw-:0'), b'group:0:rw-')
|
||||
|
||||
|
||||
@unittest.skipUnless(sys.platform.startswith('darwin'), 'OS X only test')
|
||||
@unittest.skipIf(fakeroot_detected(), 'not compatible with fakeroot')
|
||||
|
|
Loading…
Reference in New Issue