1
0
Fork 0
mirror of https://github.com/borgbackup/borg.git synced 2024-12-25 01:06:50 +00:00

more str vs bytes fixing

This commit is contained in:
Thomas Waldmann 2022-05-19 23:12:21 +02:00
parent 8e87f1111b
commit 33444be926
3 changed files with 70 additions and 28 deletions

View file

@ -22,7 +22,7 @@
from ..helpers.passphrase import Passphrase, PasswordRetriesExceeded, PassphraseWrong
from ..helpers import msgpack
from ..helpers.manifest import Manifest
from ..item import Key, EncryptedKey
from ..item import Key, EncryptedKey, want_bytes
from ..platform import SaveFile
from .nonces import NonceManager
@ -250,8 +250,10 @@ def unpack_and_verify_manifest(self, data, force_tam_not_required=False):
return unpacked, False
tam_hmac = tam.get('hmac')
tam_salt = tam.get('salt')
if not isinstance(tam_salt, bytes) or not isinstance(tam_hmac, bytes):
if not isinstance(tam_salt, (bytes, str)) or not isinstance(tam_hmac, (bytes, str)):
raise TAMInvalid()
tam_hmac = want_bytes(tam_hmac) # legacy
tam_salt = want_bytes(tam_salt) # legacy
offset = data.index(tam_hmac)
data[offset:offset + 64] = bytes(64)
tam_key = self._tam_key(tam_salt, context=b'manifest')

View file

@ -205,8 +205,8 @@ def borg1_hardlink_slave(self, item): # legacy
def hardlink_id_from_path(self, path):
"""compute a hardlink id from a path"""
assert isinstance(path, bytes)
return hashlib.sha256(path).digest()
assert isinstance(path, str)
return hashlib.sha256(path.encode('utf-8', errors='surrogateescape')).digest()
def hardlink_id_from_inode(self, *, ino, dev):
"""compute a hardlink id from an inode"""

View file

@ -15,11 +15,11 @@ cdef extern from "_item.c":
API_VERSION = '1.2_01'
def fix_key(data, key):
def fix_key(data, key, *, errors='strict'):
"""if k is a bytes-typed key, migrate key/value to a str-typed key in dict data"""
if isinstance(key, bytes):
value = data.pop(key)
key = key.decode()
key = key.decode('utf-8', errors=errors)
data[key] = value
assert isinstance(key, str)
return key
@ -29,46 +29,77 @@ def fix_str_value(data, key, errors='surrogateescape'):
"""makes sure that data[key] is a str (decode if it is bytes)"""
assert isinstance(key, str) # fix_key must be called first
value = data[key]
if isinstance(value, bytes):
value = value.decode('utf-8', errors=errors)
data[key] = value
assert isinstance(value, str)
value = want_str(value, errors=errors)
data[key] = value
return value
def fix_list_of_str(t):
def fix_bytes_value(data, key):
    """Make sure that data[key] is bytes (encode if it is str).

    The value is passed through ``want_bytes`` and the result is written back
    into *data* under the same key, so the dict is modified in place.

    :param data: dict containing *key*.
    :param key: str-typed key (``fix_key`` must be called first).
    :return: the value now stored at ``data[key]``.
    :raises AssertionError: if *key* is not a str.
    :raises KeyError: if *key* is not present in *data*.
    """
    assert isinstance(key, str)  # fix_key must be called first
    value = data[key]
    value = want_bytes(value)
    data[key] = value
    return value
def fix_list_of_str(v):
"""make sure we have a list of str"""
assert isinstance(t, (tuple, list))
l = [e.decode() if isinstance(e, bytes) else e for e in t]
assert all(isinstance(e, str) for e in l), repr(l)
return l
assert isinstance(v, (tuple, list))
return [want_str(e) for e in v]
def fix_tuple_of_str(t):
def fix_list_of_bytes(v):
    """Make sure we have a list of bytes.

    Every element of *v* is passed through ``want_bytes``; the input sequence
    itself is not modified.

    :param v: tuple or list of elements.
    :return: a new list with one ``want_bytes`` result per element of *v*.
    :raises AssertionError: if *v* is neither a tuple nor a list.
    """
    assert isinstance(v, (tuple, list))
    return [want_bytes(e) for e in v]
def fix_list_of_chunkentries(v):
    """Make sure we have a list of correct chunkentries.

    Each entry must be a 3-element tuple or list ``(id, size, csize)`` whose
    second and third elements are ints. The first element is passed through
    ``want_bytes``; the other two are kept as they are.

    :param v: tuple or list of chunk entries.
    :return: a new list of 3-element lists ``[id, size, csize]``.
    :raises AssertionError: if *v* or one of its entries does not have the
        expected type or length.
    """
    assert isinstance(v, (tuple, list))
    chunks = []
    for ce in v:
        assert isinstance(ce, (tuple, list))
        assert len(ce) == 3  # id, size, csize
        assert isinstance(ce[1], int)
        assert isinstance(ce[2], int)
        ce_fixed = [want_bytes(ce[0]), ce[1], ce[2]]  # list!
        chunks.append(ce_fixed)  # create a list of lists
    return chunks
def fix_tuple_of_str(v):
"""make sure we have a tuple of str"""
assert isinstance(t, (tuple, list))
t = tuple(e.decode() if isinstance(e, bytes) else e for e in t)
assert all(isinstance(e, str) for e in t), repr(t)
return t
assert isinstance(v, (tuple, list))
return tuple(want_str(e) for e in v)
def fix_tuple_of_str_and_int(t):
def fix_tuple_of_str_and_int(v):
"""make sure we have a tuple of str"""
assert isinstance(t, (tuple, list))
t = tuple(e.decode() if isinstance(e, bytes) else e for e in t)
assert isinstance(v, (tuple, list))
t = tuple(e.decode() if isinstance(e, bytes) else e for e in v)
assert all(isinstance(e, (str, int)) for e in t), repr(t)
return t
def want_bytes(v):
def want_bytes(v, *, errors='surrogateescape'):
"""we know that we want bytes and the value should be bytes"""
# legacy support: it being str can be caused by msgpack unpack decoding old data that was packed with use_bin_type=False
if isinstance(v, str):
v = v.encode('utf-8', errors='surrogateescape')
v = v.encode('utf-8', errors=errors)
assert isinstance(v, bytes)
return v
def want_str(v, *, errors='surrogateescape'):
    """We know that we want str and the value should be str.

    A bytes value is decoded as UTF-8 with the given error handler; a str
    value is returned unchanged.

    :param v: the value to coerce (str or bytes).
    :param errors: error handler passed to ``bytes.decode`` (keyword-only).
    :return: the value as str.
    :raises AssertionError: if *v* is neither str nor bytes.
    """
    if isinstance(v, bytes):
        v = v.decode('utf-8', errors=errors)
    assert isinstance(v, str)
    return v
class PropDict:
"""
Manage a dictionary via properties.
@ -349,6 +380,11 @@ class Item(PropDict):
k = fix_key(d, k)
if k in ('path', 'source', 'user', 'group'):
v = fix_str_value(d, k)
if k in ('chunks', 'chunks_healthy'):
v = fix_list_of_chunkentries(v)
if k in ('acl_access', 'acl_default', 'acl_extended', 'acl_nfs4'):
v = fix_bytes_value(d, k)
# TODO: xattrs
self._dict[k] = v
@ -476,6 +512,8 @@ class ArchiveItem(PropDict):
v = fix_tuple_of_str_and_int(v)
if k in ('cmdline', 'recreate_cmdline'):
v = fix_list_of_str(v)
if k == 'items':
v = fix_list_of_bytes(v)
self._dict[k] = v
@ -511,13 +549,15 @@ class ManifestItem(PropDict):
ad = v
assert isinstance(ad, dict)
for ak, av in list(ad.items()):
ak = fix_key(ad, ak)
ak = fix_key(ad, ak, errors='surrogateescape')
assert isinstance(av, dict)
for ik, iv in list(av.items()):
ik = fix_key(av, ik)
if ik == 'id':
fix_bytes_value(av, 'id')
if ik == 'time':
fix_str_value(av, 'time')
assert set(av) == {'id', 'time'}
assert isinstance(av['id'], bytes)
fix_str_value(av, 'time')
if k == 'timestamp':
v = fix_str_value(d, k, 'replace')
if k == 'config':