mirror of https://github.com/borgbackup/borg.git
Merge pull request #981 from ThomasWaldmann/items-refactor
Items refactoring
This commit is contained in:
commit
1a0277021c
|
@ -681,11 +681,15 @@ def posix_acl_use_stored_uid_gid(acl):
|
|||
|
||||
def safe_decode(s, coding='utf-8', errors='surrogateescape'):
|
||||
"""decode bytes to str, with round-tripping "invalid" bytes"""
|
||||
if s is None:
|
||||
return None
|
||||
return s.decode(coding, errors)
|
||||
|
||||
|
||||
def safe_encode(s, coding='utf-8', errors='surrogateescape'):
|
||||
"""encode str to bytes, with round-tripping "invalid" bytes"""
|
||||
if s is None:
|
||||
return None
|
||||
return s.encode(coding, errors)
|
||||
|
||||
|
||||
|
|
|
@ -0,0 +1,141 @@
|
|||
from .constants import ITEM_KEYS
|
||||
from .helpers import safe_encode, safe_decode, bigint_to_int, int_to_bigint, StableDict
|
||||
|
||||
|
||||
class PropDict:
|
||||
"""
|
||||
Manage a dictionary via properties.
|
||||
|
||||
- initialization by giving a dict or kw args
|
||||
- on initialization, normalize dict keys to be str type
|
||||
- access dict via properties, like: x.key_name
|
||||
- membership check via: 'key_name' in x
|
||||
- optionally, encode when setting a value
|
||||
- optionally, decode when getting a value
|
||||
- be safe against typos in key names: check against VALID_KEYS
|
||||
- when setting a value: check type of value
|
||||
"""
|
||||
VALID_KEYS = None # override with <set of str> in child class
|
||||
|
||||
__slots__ = ("_dict", ) # avoid setting attributes not supported by properties
|
||||
|
||||
def __init__(self, data_dict=None, **kw):
|
||||
if data_dict is None:
|
||||
data = kw
|
||||
elif not isinstance(data_dict, dict):
|
||||
raise TypeError("data_dict must be dict")
|
||||
else:
|
||||
data = data_dict
|
||||
# internally, we want an dict with only str-typed keys
|
||||
_dict = {}
|
||||
for k, v in data.items():
|
||||
if isinstance(k, bytes):
|
||||
k = k.decode()
|
||||
elif not isinstance(k, str):
|
||||
raise TypeError("dict keys must be str or bytes, not %r" % k)
|
||||
_dict[k] = v
|
||||
unknown_keys = set(_dict) - self.VALID_KEYS
|
||||
if unknown_keys:
|
||||
raise ValueError("dict contains unknown keys %s" % ','.join(unknown_keys))
|
||||
self._dict = _dict
|
||||
|
||||
def as_dict(self):
|
||||
"""return the internal dictionary"""
|
||||
return StableDict(self._dict)
|
||||
|
||||
def _check_key(self, key):
|
||||
"""make sure key is of type str and known"""
|
||||
if not isinstance(key, str):
|
||||
raise TypeError("key must be str")
|
||||
if key not in self.VALID_KEYS:
|
||||
raise ValueError("key '%s' is not a valid key" % key)
|
||||
return key
|
||||
|
||||
def __contains__(self, key):
|
||||
"""do we have this key?"""
|
||||
return self._check_key(key) in self._dict
|
||||
|
||||
def get(self, key, default=None):
|
||||
"""get value for key, return default if key does not exist"""
|
||||
return getattr(self, self._check_key(key), default)
|
||||
|
||||
@staticmethod
|
||||
def _make_property(key, value_type, value_type_name=None, encode=None, decode=None):
|
||||
"""return a property that deals with self._dict[key]"""
|
||||
assert isinstance(key, str)
|
||||
if value_type_name is None:
|
||||
value_type_name = value_type.__name__
|
||||
doc = "%s (%s)" % (key, value_type_name)
|
||||
type_error_msg = "%s value must be %s" % (key, value_type_name)
|
||||
attr_error_msg = "attribute %s not found" % key
|
||||
|
||||
def _get(self):
|
||||
try:
|
||||
value = self._dict[key]
|
||||
except KeyError:
|
||||
raise AttributeError(attr_error_msg) from None
|
||||
if decode is not None:
|
||||
value = decode(value)
|
||||
return value
|
||||
|
||||
def _set(self, value):
|
||||
if not isinstance(value, value_type):
|
||||
raise TypeError(type_error_msg)
|
||||
if encode is not None:
|
||||
value = encode(value)
|
||||
self._dict[key] = value
|
||||
|
||||
def _del(self):
|
||||
try:
|
||||
del self._dict[key]
|
||||
except KeyError:
|
||||
raise AttributeError(attr_error_msg) from None
|
||||
|
||||
return property(_get, _set, _del, doc=doc)
|
||||
|
||||
|
||||
class Item(PropDict):
|
||||
"""
|
||||
Item abstraction that deals with validation and the low-level details internally:
|
||||
|
||||
Items are created either from msgpack unpacker output, from another dict, from kwargs or
|
||||
built step-by-step by setting attributes.
|
||||
|
||||
msgpack gives us a dict with bytes-typed keys, just give it to Item(d) and use item.key_name later.
|
||||
msgpack gives us byte-typed values for stuff that should be str, we automatically decode when getting
|
||||
such a property and encode when setting it.
|
||||
|
||||
If an Item shall be serialized, give as_dict() method output to msgpack packer.
|
||||
"""
|
||||
|
||||
VALID_KEYS = set(key.decode() for key in ITEM_KEYS) # we want str-typed keys
|
||||
|
||||
__slots__ = ("_dict", ) # avoid setting attributes not supported by properties
|
||||
|
||||
# properties statically defined, so that IDEs can know their names:
|
||||
|
||||
path = PropDict._make_property('path', str, 'surrogate-escaped str', encode=safe_encode, decode=safe_decode)
|
||||
source = PropDict._make_property('source', str, 'surrogate-escaped str', encode=safe_encode, decode=safe_decode)
|
||||
acl_access = PropDict._make_property('acl_access', str, 'surrogate-escaped str', encode=safe_encode, decode=safe_decode)
|
||||
acl_default = PropDict._make_property('acl_default', str, 'surrogate-escaped str', encode=safe_encode, decode=safe_decode)
|
||||
acl_extended = PropDict._make_property('acl_extended', str, 'surrogate-escaped str', encode=safe_encode, decode=safe_decode)
|
||||
acl_nfs4 = PropDict._make_property('acl_nfs4', str, 'surrogate-escaped str', encode=safe_encode, decode=safe_decode)
|
||||
|
||||
user = PropDict._make_property('user', (str, type(None)), 'surrogate-escaped str or None', encode=safe_encode, decode=safe_decode)
|
||||
group = PropDict._make_property('group', (str, type(None)), 'surrogate-escaped str or None', encode=safe_encode, decode=safe_decode)
|
||||
|
||||
mode = PropDict._make_property('mode', int)
|
||||
uid = PropDict._make_property('uid', int)
|
||||
gid = PropDict._make_property('gid', int)
|
||||
rdev = PropDict._make_property('rdev', int)
|
||||
bsdflags = PropDict._make_property('bsdflags', int)
|
||||
|
||||
atime = PropDict._make_property('atime', int, 'bigint', encode=int_to_bigint, decode=bigint_to_int)
|
||||
ctime = PropDict._make_property('ctime', int, 'bigint', encode=int_to_bigint, decode=bigint_to_int)
|
||||
mtime = PropDict._make_property('mtime', int, 'bigint', encode=int_to_bigint, decode=bigint_to_int)
|
||||
|
||||
hardlink_master = PropDict._make_property('hardlink_master', bool)
|
||||
|
||||
chunks = PropDict._make_property('chunks', list)
|
||||
|
||||
xattrs = PropDict._make_property('xattrs', StableDict)
|
|
@ -0,0 +1,147 @@
|
|||
import pytest
|
||||
|
||||
from ..item import Item
|
||||
from ..helpers import StableDict
|
||||
|
||||
|
||||
def test_item_empty():
|
||||
item = Item()
|
||||
|
||||
assert item.as_dict() == {}
|
||||
|
||||
assert 'path' not in item
|
||||
with pytest.raises(ValueError):
|
||||
'invalid-key' in item
|
||||
with pytest.raises(TypeError):
|
||||
b'path' in item
|
||||
with pytest.raises(TypeError):
|
||||
42 in item
|
||||
|
||||
assert item.get('mode') is None
|
||||
assert item.get('mode', 0o666) == 0o666
|
||||
with pytest.raises(ValueError):
|
||||
item.get('invalid-key')
|
||||
with pytest.raises(TypeError):
|
||||
item.get(b'mode')
|
||||
with pytest.raises(TypeError):
|
||||
item.get(42)
|
||||
|
||||
with pytest.raises(AttributeError):
|
||||
item.path
|
||||
|
||||
with pytest.raises(AttributeError):
|
||||
del item.path
|
||||
|
||||
|
||||
def test_item_from_dict():
|
||||
# does not matter whether we get str or bytes keys
|
||||
item = Item({b'path': b'/a/b/c', b'mode': 0o666})
|
||||
assert item.path == '/a/b/c'
|
||||
assert item.mode == 0o666
|
||||
assert 'path' in item
|
||||
|
||||
# does not matter whether we get str or bytes keys
|
||||
item = Item({'path': b'/a/b/c', 'mode': 0o666})
|
||||
assert item.path == '/a/b/c'
|
||||
assert item.mode == 0o666
|
||||
assert 'mode' in item
|
||||
|
||||
# invalid - no dict
|
||||
with pytest.raises(TypeError):
|
||||
Item(42)
|
||||
|
||||
# invalid - no bytes/str key
|
||||
with pytest.raises(TypeError):
|
||||
Item({42: 23})
|
||||
|
||||
# invalid - unknown key
|
||||
with pytest.raises(ValueError):
|
||||
Item({'foobar': 'baz'})
|
||||
|
||||
|
||||
def test_item_from_kw():
|
||||
item = Item(path=b'/a/b/c', mode=0o666)
|
||||
assert item.path == '/a/b/c'
|
||||
assert item.mode == 0o666
|
||||
|
||||
|
||||
def test_item_int_property():
|
||||
item = Item()
|
||||
item.mode = 0o666
|
||||
assert item.mode == 0o666
|
||||
assert item.as_dict() == {'mode': 0o666}
|
||||
del item.mode
|
||||
assert item.as_dict() == {}
|
||||
with pytest.raises(TypeError):
|
||||
item.mode = "invalid"
|
||||
|
||||
|
||||
def test_item_bigint_property():
|
||||
item = Item()
|
||||
small, big = 42, 2 ** 65
|
||||
item.atime = small
|
||||
assert item.atime == small
|
||||
assert item.as_dict() == {'atime': small}
|
||||
item.atime = big
|
||||
assert item.atime == big
|
||||
assert item.as_dict() == {'atime': b'\0' * 8 + b'\x02'}
|
||||
|
||||
|
||||
def test_item_user_group_none():
|
||||
item = Item()
|
||||
item.user = None
|
||||
assert item.user is None
|
||||
item.group = None
|
||||
assert item.group is None
|
||||
|
||||
|
||||
def test_item_se_str_property():
|
||||
# start simple
|
||||
item = Item()
|
||||
item.path = '/a/b/c'
|
||||
assert item.path == '/a/b/c'
|
||||
assert item.as_dict() == {'path': b'/a/b/c'}
|
||||
del item.path
|
||||
assert item.as_dict() == {}
|
||||
with pytest.raises(TypeError):
|
||||
item.path = 42
|
||||
|
||||
# non-utf-8 path, needing surrogate-escaping for latin-1 u-umlaut
|
||||
item = Item({'path': b'/a/\xfc/c'})
|
||||
assert item.path == '/a/\udcfc/c' # getting a surrogate-escaped representation
|
||||
assert item.as_dict() == {'path': b'/a/\xfc/c'}
|
||||
del item.path
|
||||
assert 'path' not in item
|
||||
item.path = '/a/\udcfc/c' # setting using a surrogate-escaped representation
|
||||
assert item.as_dict() == {'path': b'/a/\xfc/c'}
|
||||
|
||||
|
||||
def test_item_list_property():
|
||||
item = Item()
|
||||
item.chunks = []
|
||||
assert item.chunks == []
|
||||
item.chunks.append(0)
|
||||
assert item.chunks == [0]
|
||||
item.chunks.append(1)
|
||||
assert item.chunks == [0, 1]
|
||||
assert item.as_dict() == {'chunks': [0, 1]}
|
||||
|
||||
|
||||
def test_item_dict_property():
|
||||
item = Item()
|
||||
item.xattrs = StableDict()
|
||||
assert item.xattrs == StableDict()
|
||||
item.xattrs['foo'] = 'bar'
|
||||
assert item.xattrs['foo'] == 'bar'
|
||||
item.xattrs['bar'] = 'baz'
|
||||
assert item.xattrs == StableDict({'foo': 'bar', 'bar': 'baz'})
|
||||
assert item.as_dict() == {'xattrs': {'foo': 'bar', 'bar': 'baz'}}
|
||||
|
||||
|
||||
def test_unknown_property():
|
||||
# we do not want the user to be able to set unknown attributes -
|
||||
# they won't get into the .as_dict() result dictionary.
|
||||
# also they might be just typos of known attributes.
|
||||
item = Item()
|
||||
with pytest.raises(AttributeError):
|
||||
item.unknown_attribute = None
|
Loading…
Reference in New Issue