mirror of https://github.com/borgbackup/borg.git
Item implementation + tests
This commit is contained in:
parent
b8303a38bf
commit
c6f5989905
|
@ -0,0 +1,129 @@
|
|||
from .constants import ITEM_KEYS
|
||||
from .helpers import safe_encode, safe_decode, StableDict
|
||||
|
||||
# we want str keys for this code
|
||||
ITEM_KEYS = set(key.decode() for key in ITEM_KEYS)
|
||||
|
||||
|
||||
class Item:
|
||||
"""
|
||||
Item abstraction that deals with validation and the low-level details internally:
|
||||
|
||||
- msgpack gives us a dict with bytes-typed keys - but we do not want to have the ugly
|
||||
bytes-typed keys and the hard-to-type dict item access all over the place (like: item[b'keyname']),
|
||||
so we define properties (and use it like: item.keyname)
|
||||
- msgpack gives us byte-typed values for stuff that should be str, we need to decode/encode them here.
|
||||
- we want to be safe against typos in keys and badly typed values, so we check them.
|
||||
|
||||
Items are created either from msgpack unpacker output, from another dict or
|
||||
built step-by-step by setting attributes.
|
||||
|
||||
If an Item shall be serialized, give as_dict() method output to msgpack packer.
|
||||
"""
|
||||
|
||||
VALID_KEYS = ITEM_KEYS
|
||||
|
||||
def __init__(self, data_dict=None, **kw):
|
||||
if data_dict is None:
|
||||
data = kw
|
||||
elif not isinstance(data_dict, dict):
|
||||
raise TypeError("data_dict must be dict")
|
||||
else:
|
||||
data = data_dict
|
||||
# internally, we want an dict with only str-typed keys
|
||||
_dict = {}
|
||||
for k, v in data.items():
|
||||
if isinstance(k, bytes):
|
||||
k = k.decode()
|
||||
elif not isinstance(k, str):
|
||||
raise TypeError("dict keys must be str or bytes, not %r" % k)
|
||||
_dict[k] = v
|
||||
unknown_keys = set(_dict) - self.VALID_KEYS
|
||||
if unknown_keys:
|
||||
raise ValueError("dict contains unknown keys %s" % ','.join(unknown_keys))
|
||||
self._dict = _dict
|
||||
|
||||
def as_dict(self):
|
||||
"""return the internal dictionary"""
|
||||
return self._dict # XXX use StableDict?
|
||||
|
||||
def _check_key(self, key):
|
||||
"""make sure key is of type str and known"""
|
||||
if not isinstance(key, str):
|
||||
raise TypeError("key must be str")
|
||||
if key not in self.VALID_KEYS:
|
||||
raise ValueError("key '%s' unknown" % key)
|
||||
return key
|
||||
|
||||
def __contains__(self, key):
|
||||
"""do we have this key?"""
|
||||
return self._check_key(key) in self._dict
|
||||
|
||||
def get(self, key, default=None):
|
||||
"""get value for key, return default if key does not exist"""
|
||||
return getattr(self, self._check_key(key), default)
|
||||
|
||||
def _make_property(key, value_type, value_type_name=None, encode=None, decode=None):
|
||||
"""return a property that deals with self._dict[key]:
|
||||
|
||||
- sets the value (checking type and optionally encoding it)
|
||||
- gets the value (optionally decoding it)
|
||||
- deletes the entry from the internal dict
|
||||
- creates reasonable docstring and exceptions / exception messages
|
||||
"""
|
||||
assert isinstance(key, str)
|
||||
if value_type_name is None:
|
||||
value_type_name = value_type.__name__
|
||||
doc = "%s (%s)" % (key, value_type_name)
|
||||
type_error_msg = "%s value must be %s" % (key, value_type_name)
|
||||
attr_error_msg = "attribute %s not found" % key
|
||||
|
||||
def _get(self):
|
||||
try:
|
||||
value = self._dict[key]
|
||||
except KeyError:
|
||||
raise AttributeError(attr_error_msg) from None
|
||||
if decode is not None:
|
||||
value = decode(value)
|
||||
return value
|
||||
|
||||
def _set(self, value):
|
||||
if not isinstance(value, value_type):
|
||||
raise TypeError(type_error_msg)
|
||||
if encode is not None:
|
||||
value = encode(value)
|
||||
self._dict[key] = value
|
||||
|
||||
def _del(self):
|
||||
try:
|
||||
del self._dict[key]
|
||||
except KeyError:
|
||||
raise AttributeError(attr_error_msg) from None
|
||||
|
||||
return property(_get, _set, _del, doc=doc)
|
||||
|
||||
# properties statically defined, so that IDEs can know their names:
|
||||
|
||||
path = _make_property('path', str, 'surrogate-escaped str', encode=safe_encode, decode=safe_decode)
|
||||
source = _make_property('source', str, 'surrogate-escaped str', encode=safe_encode, decode=safe_decode)
|
||||
user = _make_property('user', str, 'surrogate-escaped str', encode=safe_encode, decode=safe_decode)
|
||||
group = _make_property('group', str, 'surrogate-escaped str', encode=safe_encode, decode=safe_decode)
|
||||
acl_access = _make_property('acl_access', str, 'surrogate-escaped str', encode=safe_encode, decode=safe_decode)
|
||||
acl_default = _make_property('acl_default', str, 'surrogate-escaped str', encode=safe_encode, decode=safe_decode)
|
||||
acl_extended = _make_property('acl_extended', str, 'surrogate-escaped str', encode=safe_encode, decode=safe_decode)
|
||||
acl_nfs4 = _make_property('acl_nfs4', str, 'surrogate-escaped str', encode=safe_encode, decode=safe_decode)
|
||||
|
||||
mode = _make_property('mode', int)
|
||||
uid = _make_property('uid', int)
|
||||
gid = _make_property('gid', int)
|
||||
atime = _make_property('atime', int)
|
||||
ctime = _make_property('ctime', int)
|
||||
mtime = _make_property('mtime', int)
|
||||
rdev = _make_property('rdev', int)
|
||||
bsdflags = _make_property('bsdflags', int)
|
||||
|
||||
hardlink_master = _make_property('hardlink_master', bool)
|
||||
|
||||
chunks = _make_property('chunks', list)
|
||||
|
||||
xattrs = _make_property('xattrs', StableDict)
|
|
@ -0,0 +1,106 @@
|
|||
import pytest
|
||||
|
||||
from ..item import Item
|
||||
from ..helpers import StableDict
|
||||
|
||||
|
||||
def test_item_empty():
|
||||
item = Item()
|
||||
|
||||
assert item.as_dict() == {}
|
||||
|
||||
assert 'path' not in item
|
||||
with pytest.raises(ValueError):
|
||||
'invalid-key' in item
|
||||
with pytest.raises(TypeError):
|
||||
b'path' in item
|
||||
with pytest.raises(TypeError):
|
||||
42 in item
|
||||
|
||||
assert item.get('mode') is None
|
||||
assert item.get('mode', 0o666) == 0o666
|
||||
with pytest.raises(ValueError):
|
||||
item.get('invalid-key')
|
||||
with pytest.raises(TypeError):
|
||||
item.get(b'mode')
|
||||
with pytest.raises(TypeError):
|
||||
item.get(42)
|
||||
|
||||
with pytest.raises(AttributeError):
|
||||
item.path
|
||||
|
||||
with pytest.raises(AttributeError):
|
||||
del item.path
|
||||
|
||||
|
||||
def test_item_from_dict():
|
||||
# does not matter whether we get str or bytes keys
|
||||
item = Item({b'path': b'/a/b/c', b'mode': 0o666})
|
||||
assert item.path == '/a/b/c'
|
||||
assert item.mode == 0o666
|
||||
assert 'path' in item
|
||||
|
||||
item = Item({'path': b'/a/b/c', 'mode': 0o666})
|
||||
assert item.path == '/a/b/c'
|
||||
assert item.mode == 0o666
|
||||
assert 'mode' in item
|
||||
|
||||
|
||||
def test_item_from_kw():
|
||||
item = Item(path=b'/a/b/c', mode=0o666)
|
||||
assert item.path == '/a/b/c'
|
||||
assert item.mode == 0o666
|
||||
|
||||
|
||||
def test_item_int_property():
|
||||
item = Item()
|
||||
item.mode = 0o666
|
||||
assert item.mode == 0o666
|
||||
assert item.as_dict() == {'mode': 0o666}
|
||||
del item.mode
|
||||
assert item.as_dict() == {}
|
||||
with pytest.raises(TypeError):
|
||||
item.mode = "invalid"
|
||||
|
||||
|
||||
def test_item_se_str_property():
|
||||
# start simple
|
||||
item = Item()
|
||||
item.path = '/a/b/c'
|
||||
assert item.path == '/a/b/c'
|
||||
assert item.as_dict() == {'path': b'/a/b/c'}
|
||||
del item.path
|
||||
assert item.as_dict() == {}
|
||||
with pytest.raises(TypeError):
|
||||
item.path = 42
|
||||
|
||||
# non-utf-8 path, needing surrogate-escaping for latin-1 u-umlaut
|
||||
item = Item({'path': b'/a/\xfc/c'})
|
||||
assert item.path == '/a/\udcfc/c' # getting a surrogate-escaped representation
|
||||
assert item.as_dict() == {'path': b'/a/\xfc/c'}
|
||||
del item.path
|
||||
assert 'path' not in item
|
||||
item.path = '/a/\udcfc/c' # setting using a surrogate-escaped representation
|
||||
assert item.as_dict() == {'path': b'/a/\xfc/c'}
|
||||
|
||||
|
||||
def test_item_list_property():
|
||||
item = Item()
|
||||
item.chunks = []
|
||||
assert item.chunks == []
|
||||
item.chunks.append(0)
|
||||
assert item.chunks == [0]
|
||||
item.chunks.append(1)
|
||||
assert item.chunks == [0, 1]
|
||||
assert item.as_dict() == {'chunks': [0, 1]}
|
||||
|
||||
|
||||
def test_item_dict_property():
|
||||
item = Item()
|
||||
item.xattrs = StableDict()
|
||||
assert item.xattrs == StableDict()
|
||||
item.xattrs['foo'] = 'bar'
|
||||
assert item.xattrs['foo'] == 'bar'
|
||||
item.xattrs['bar'] = 'baz'
|
||||
assert item.xattrs == StableDict({'foo': 'bar', 'bar': 'baz'})
|
||||
assert item.as_dict() == {'xattrs': {'foo': 'bar', 'bar': 'baz'}}
|
Loading…
Reference in New Issue