raise IntegrityError if ro_type is not as expected

This commit is contained in:
Thomas Waldmann 2023-09-16 18:31:46 +02:00
parent cb4676048a
commit 170380c657
No known key found for this signature in database
GPG Key ID: 243ACFA951F78E01
2 changed files with 8 additions and 4 deletions

View File

@ -2,6 +2,7 @@ from struct import Struct
from .constants import * # NOQA
from .helpers import msgpack, workarounds
from .helpers.errors import IntegrityError
from .compress import Compressor, LZ4_COMPRESSOR, get_compressor
# workaround for lost passphrase or key in "authenticated" or "authenticated-blake2" mode
@ -77,7 +78,8 @@ class RepoObj:
meta_encrypted = obj[offs : offs + len_meta_encrypted]
meta_packed = self.key.decrypt(id, meta_encrypted)
meta = msgpack.unpackb(meta_packed)
assert ro_type == ROBJ_DONTCARE or meta["type"] == ro_type
if ro_type != ROBJ_DONTCARE and meta["type"] != ro_type:
raise IntegrityError(f"ro_type expected: {ro_type} got: {meta['type']}")
return meta
def parse(
@ -106,7 +108,8 @@ class RepoObj:
offs += len_meta_encrypted
meta_packed = self.key.decrypt(id, meta_encrypted)
meta_compressed = msgpack.unpackb(meta_packed) # means: before adding more metadata in decompress block
assert ro_type == ROBJ_DONTCARE or meta_compressed["type"] == ro_type
if ro_type != ROBJ_DONTCARE and meta_compressed["type"] != ro_type:
raise IntegrityError(f"ro_type expected: {ro_type} got: {meta_compressed['type']}")
data_encrypted = obj[offs:]
data_compressed = self.key.decrypt(id, data_encrypted) # does not include the type/level bytes
if decompress:

View File

@ -2,6 +2,7 @@ import pytest
from ..constants import ROBJ_FILE_STREAM, ROBJ_MANIFEST, ROBJ_ARCHIVE_META
from ..crypto.key import PlaintextKey
from ..helpers.errors import IntegrityError
from ..repository import Repository
from ..repoobj import RepoObj, RepoObj1
from ..compress import LZ4
@ -113,7 +114,7 @@ def test_spoof_manifest(key):
cdata = repo_objs.format(id, {}, data, ro_type=ROBJ_FILE_STREAM)
# let's assume an attacker somehow managed to replace the manifest with that repo object.
# as borg always give the ro_type it wants to read, this should fail:
with pytest.raises(AssertionError):
with pytest.raises(IntegrityError):
repo_objs.parse(id, cdata, ro_type=ROBJ_MANIFEST)
@ -125,5 +126,5 @@ def test_spoof_archive(key):
cdata = repo_objs.format(id, {}, data, ro_type=ROBJ_FILE_STREAM)
# let's assume an attacker somehow managed to replace an archive with that repo object.
# as borg always give the ro_type it wants to read, this should fail:
with pytest.raises(AssertionError):
with pytest.raises(IntegrityError):
repo_objs.parse(id, cdata, ro_type=ROBJ_ARCHIVE_META)