1
0
Fork 0
mirror of https://github.com/borgbackup/borg.git synced 2025-02-25 07:23:28 +00:00

Merge pull request #7108 from pgerber/dotdot

Sanitize paths during archive creation and extraction
This commit is contained in:
TW 2023-06-10 14:38:57 +02:00 committed by GitHub
commit aca2021112
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
15 changed files with 221 additions and 51 deletions

View file

@ -32,7 +32,7 @@
from .platform import uid2user, user2uid, gid2group, group2gid
from .helpers import parse_timestamp, archive_ts_now
from .helpers import OutputTimestamp, format_timedelta, format_file_size, file_status, FileSize
from .helpers import safe_encode, make_path_safe, remove_surrogates, text_to_json, join_cmd
from .helpers import safe_encode, make_path_safe, remove_surrogates, text_to_json, join_cmd, remove_dotdot_prefixes
from .helpers import StableDict
from .helpers import bin_to_hex
from .helpers import safe_ns
@ -853,8 +853,6 @@ def same_item(item, st):
return
dest = self.cwd
if item.path.startswith(("/", "../")):
raise Exception("Path should be relative and local")
path = os.path.join(dest, item.path)
# Attempt to remove existing files, ignore errors on failure
try:
@ -1376,8 +1374,8 @@ def __init__(
@contextmanager
def create_helper(self, path, st, status=None, hardlinkable=True):
safe_path = make_path_safe(path)
item = Item(path=safe_path)
sanitized_path = remove_dotdot_prefixes(path)
item = Item(path=sanitized_path)
hardlinked = hardlinkable and st.st_nlink > 1
hl_chunks = None
update_map = False

View file

@ -9,6 +9,7 @@
from ..cache import Cache, assert_secure
from ..helpers import Error
from ..helpers import SortBySpec, positive_int_validator, location_validator, Location, relative_time_marker_validator
from ..helpers import Highlander
from ..helpers.nanorst import rst_to_terminal
from ..manifest import Manifest, AI_HUMAN_SORT_KEYS
from ..patterns import PatternMatcher
@ -246,20 +247,6 @@ def wrapper(self, args, repository, manifest, **kwargs):
return wrapper
class Highlander(argparse.Action):
"""make sure some option is only given once"""
def __init__(self, *args, **kwargs):
self.__called = False
super().__init__(*args, **kwargs)
def __call__(self, parser, namespace, values, option_string=None):
if self.__called:
raise argparse.ArgumentError(self, "There can be only one.")
self.__called = True
setattr(namespace, self.dest, values)
# You can use :ref:`xyz` in the following usage pages. However, for plain-text view,
# e.g. through "borg ... --help", define a substitution for the reference here.
# It will replace the entire :ref:`foo` verbatim.

View file

@ -27,6 +27,7 @@
from ..helpers import flags_root, flags_dir, flags_special_follow, flags_special
from ..helpers import sig_int, ignore_sigint
from ..helpers import iter_separated
from ..helpers import MakePathSafeAction
from ..manifest import Manifest
from ..patterns import PatternMatcher
from ..platform import is_win32
@ -766,7 +767,7 @@ def build_parser_create(self, subparsers, common_parser, mid_common_parser):
metavar="NAME",
dest="stdin_name",
default="stdin",
action=Highlander,
action=MakePathSafeAction,
help="use NAME in archive for stdin data (default: %(default)r)",
)
subparser.add_argument(

View file

@ -13,7 +13,7 @@
from .errors import Error, ErrorWithTraceback, IntegrityError, DecompressionError
from .fs import ensure_dir, join_base_dir, get_socket_filename
from .fs import get_security_dir, get_keys_dir, get_base_dir, get_cache_dir, get_config_dir, get_runtime_dir
from .fs import dir_is_tagged, dir_is_cachedir, make_path_safe, scandir_inorder
from .fs import dir_is_tagged, dir_is_cachedir, remove_dotdot_prefixes, make_path_safe, scandir_inorder
from .fs import secure_erase, safe_unlink, dash_open, os_open, os_stat, umount
from .fs import O_, flags_root, flags_dir, flags_special_follow, flags_special, flags_base, flags_normal, flags_noatime
from .fs import HardLinkManager
@ -31,6 +31,7 @@
from .parseformat import BaseFormatter, ArchiveFormatter, ItemFormatter, file_status
from .parseformat import swidth_slice, ellipsis_truncate
from .parseformat import BorgJsonEncoder, basic_json_data, json_print, json_dump, prepare_dump_dict
from .parseformat import Highlander, MakePathSafeAction
from .process import daemonize, daemonizing
from .process import signal_handler, raising_signal_handler, sig_int, ignore_sigint, SigHup, SigTerm
from .process import popen_with_error_handling, is_terminal, prepare_subprocess_env, create_filter_process

View file

@ -217,12 +217,66 @@ def dir_is_tagged(path, exclude_caches, exclude_if_present):
return tag_names
_safe_re = re.compile(r"^((\.\.)?/+)+")
def make_path_safe(path):
"""Make path safe by making it relative and local"""
return _safe_re.sub("", path) or "."
"""
Make path safe by making it relative and normalized.
`path` is sanitized by making it relative, removing
consecutive slashes (e.g. '//'), removing '.' elements,
and removing trailing slashes.
For reasons of security, a ValueError is raised should
`path` contain any '..' elements.
"""
path = path.lstrip("/")
if "\\" in path: # borg always wants slashes, never backslashes.
raise ValueError(f"unexpected backslash(es) in path {path!r}")
if path.startswith("../") or "/../" in path or path.endswith("/..") or path == "..":
raise ValueError(f"unexpected '..' element in path {path!r}")
path = os.path.normpath(path)
return path
_dotdot_re = re.compile(r"^(\.\./)+")
def remove_dotdot_prefixes(path):
"""
Remove '../'s at the beginning of `path`. Additionally,
the path is made relative.
`path` is expected to be normalized already (e.g. via `os.path.normpath()`).
"""
path = path.lstrip("/")
path = _dotdot_re.sub("", path)
if path in ["", ".."]:
return "."
return path
def assert_sanitized_path(path):
assert isinstance(path, str)
# `path` should have been sanitized earlier. Some features,
# like pattern matching rely on a sanitized path. As a
# precaution we check here again.
if make_path_safe(path) != path:
raise ValueError(f"path {path!r} is not sanitized")
return path
def to_sanitized_path(path):
assert isinstance(path, str)
# Legacy versions of Borg still allowed non-sanitized paths
# to be stored. So, we sanitize them when reading.
#
# Borg 2 ensures paths are safe before storing them. Thus, when
# support for reading Borg 1 archives is dropped, this should be
# changed to a simple check to verify paths aren't malicious.
# Namely, absolute paths and paths containing '..' elements must
# be rejected.
#
# Also checks for '..' elements in `path` for reasons of security.
return make_path_safe(path)
class HardLinkManager:

View file

@ -19,7 +19,7 @@
logger = create_logger()
from .errors import Error
from .fs import get_keys_dir
from .fs import get_keys_dir, make_path_safe
from .msgpack import Timestamp
from .time import OutputTimestamp, format_time, safe_timestamp
from .. import __version__ as borg_version
@ -840,7 +840,7 @@ class FakeArchive:
from ..item import Item
fake_item = Item(mode=0, path="", user="", group="", mtime=0, uid=0, gid=0)
fake_item = Item(mode=0, path="foo", user="", group="", mtime=0, uid=0, gid=0)
formatter = cls(FakeArchive, "")
keys = []
keys.extend(formatter.call_keys.keys())
@ -1147,3 +1147,28 @@ def decode(d):
return res
return decode(d)
class Highlander(argparse.Action):
"""make sure some option is only given once"""
def __init__(self, *args, **kwargs):
self.__called = False
super().__init__(*args, **kwargs)
def __call__(self, parser, namespace, values, option_string=None):
if self.__called:
raise argparse.ArgumentError(self, "There can be only one.")
self.__called = True
setattr(namespace, self.dest, values)
class MakePathSafeAction(Highlander):
def __call__(self, parser, namespace, path, option_string=None):
try:
sanitized_path = make_path_safe(path)
except ValueError as e:
raise argparse.ArgumentError(self, e)
if sanitized_path == ".":
raise argparse.ArgumentError(self, f"{path!r} is not a valid file name")
setattr(namespace, self.dest, sanitized_path)

View file

@ -7,6 +7,7 @@ from cpython.bytes cimport PyBytes_AsStringAndSize
from .constants import ITEM_KEYS, ARCHIVE_KEYS
from .helpers import StableDict
from .helpers import format_file_size
from .helpers.fs import assert_sanitized_path, to_sanitized_path
from .helpers.msgpack import timestamp_to_int, int_to_timestamp, Timestamp
from .helpers.time import OutputTimestamp, safe_timestamp
@ -262,7 +263,7 @@ cdef class Item(PropDict):
# properties statically defined, so that IDEs can know their names:
path = PropDictProperty(str, 'surrogate-escaped str')
path = PropDictProperty(str, 'surrogate-escaped str', encode=assert_sanitized_path, decode=to_sanitized_path)
source = PropDictProperty(str, 'surrogate-escaped str') # legacy borg 1.x. borg 2: see .target
target = PropDictProperty(str, 'surrogate-escaped str')
user = PropDictProperty(str, 'surrogate-escaped str')

View file

@ -23,6 +23,7 @@
from ..helpers import EXIT_SUCCESS, EXIT_WARNING, EXIT_ERROR
from .. import platform
# Note: this is used by borg.selftest, do not use or import py.test functionality here.
from ..fuse_impl import llfuse, has_pyfuse3, has_llfuse
@ -61,6 +62,24 @@ def same_ts_ns(ts_ns1, ts_ns2):
return diff_ts <= diff_max
rejected_dotdot_paths = (
"..",
"../",
"../etc/shadow",
"/..",
"/../",
"/../etc",
"/../etc/",
"etc/..",
"/etc/..",
"/etc/../etc/shadow",
"//etc/..",
"etc//..",
"etc/..//",
"foo/../bar",
)
@contextmanager
def unopened_tempfile():
with tempfile.TemporaryDirectory() as tempdir:

View file

@ -8,6 +8,7 @@
import pytest
from . import BaseTestCase
from . import rejected_dotdot_paths
from ..crypto.key import PlaintextKey
from ..archive import Archive, CacheChunkBuffer, RobustUnpacker, valid_msgpacked_dict, ITEM_KEYS, Statistics
from ..archive import BackupOSError, backup_io, backup_io_iter, get_item_uid_gid
@ -394,3 +395,9 @@ def test_get_item_uid_gid():
# as there is nothing, it'll fall back to uid_default/gid_default.
assert uid == 0
assert gid == 16
def test_reject_non_sanitized_item():
for path in rejected_dotdot_paths:
with pytest.raises(ValueError, match="unexpected '..' element in path"):
Item(path=path, user="root", group="root")

View file

@ -279,6 +279,33 @@ def test_create_no_permission_file(self):
assert "input/file2" not in out # it skipped file2
assert "input/file3" in out
def test_sanitized_stdin_name(self):
self.cmd(f"--repo={self.repository_location}", "rcreate", RK_ENCRYPTION)
self.cmd(f"--repo={self.repository_location}", "create", "--stdin-name", "./a//path", "test", "-", input=b"")
item = json.loads(self.cmd(f"--repo={self.repository_location}", "list", "test", "--json-lines"))
assert item["path"] == "a/path"
def test_dotdot_stdin_name(self):
self.cmd(f"--repo={self.repository_location}", "rcreate", RK_ENCRYPTION)
output = self.cmd(
f"--repo={self.repository_location}",
"create",
"--stdin-name",
"foo/../bar",
"test",
"-",
input=b"",
exit_code=2,
)
assert output.endswith("'..' element in path 'foo/../bar'" + os.linesep)
def test_dot_stdin_name(self):
self.cmd(f"--repo={self.repository_location}", "rcreate", RK_ENCRYPTION)
output = self.cmd(
f"--repo={self.repository_location}", "create", "--stdin-name", "./", "test", "-", input=b"", exit_code=2
)
assert output.endswith("'./' is not a valid file name" + os.linesep)
def test_create_content_from_command(self):
self.cmd(f"--repo={self.repository_location}", "rcreate", RK_ENCRYPTION)
input_data = "some test content"
@ -586,7 +613,7 @@ def test_exclude_keep_tagged(self):
)
self._assert_test_keep_tagged()
def test_path_normalization(self):
def test_path_sanitation(self):
self.cmd(f"--repo={self.repository_location}", "rcreate", RK_ENCRYPTION)
self.create_regular_file("dir1/dir2/file", size=1024 * 80)
with changedir("input/dir1/dir2"):
@ -595,7 +622,7 @@ def test_path_normalization(self):
self.assert_not_in("..", output)
self.assert_in(" input/dir1/dir2/file", output)
def test_exclude_normalization(self):
def test_exclude_sanitation(self):
self.cmd(f"--repo={self.repository_location}", "rcreate", RK_ENCRYPTION)
self.create_regular_file("file1", size=1024 * 80)
self.create_regular_file("file2", size=1024 * 80)

Binary file not shown.

View file

@ -129,6 +129,29 @@ def test_import_tar(self, tar_format="PAX"):
self.cmd(f"--repo={self.repository_location}", "extract", "dst")
self.assert_dirs_equal("input", "output/input", ignore_ns=True, ignore_xattrs=True)
def test_import_unusual_tar(self):
# Contains these, unusual entries:
# /foobar
# ./bar
# ./foo2/
# ./foo//bar
# ./
tar_archive = os.path.join(os.path.dirname(__file__), "unusual_paths.tar")
self.cmd(f"--repo={self.repository_location}", "rcreate", "--encryption=none")
self.cmd(f"--repo={self.repository_location}", "import-tar", "dst", tar_archive)
files = self.cmd(f"--repo={self.repository_location}", "list", "dst", "--format", "{path}{NL}").splitlines()
self.assert_equal(set(files), {"foobar", "bar", "foo2", "foo/bar", "."})
def test_import_tar_with_dotdot(self):
# Contains this file:
# ../../../../etc/shadow
tar_archive = os.path.join(os.path.dirname(__file__), "dotdot_path.tar")
self.cmd(f"--repo={self.repository_location}", "rcreate", "--encryption=none")
with pytest.raises(ValueError, match="unexpected '..' element in path '../../../../etc/shadow'"):
self.cmd(f"--repo={self.repository_location}", "import-tar", "dst", tar_archive, exit_code=2)
@requires_gzip
def test_import_tar_gz(self, tar_format="GNU"):
if not shutil.which("gzip"):
@ -212,3 +235,8 @@ class RemoteArchiverTestCase(RemoteArchiverTestCaseBase, ArchiverTestCase):
@unittest.skipUnless("binary" in BORG_EXES, "no borg.exe available")
class ArchiverTestCaseBinary(ArchiverTestCaseBinaryBase, ArchiverTestCase):
"""runs the same tests, but via the borg binary"""
@unittest.skip("does not work with binaries")
def test_import_tar_with_dotdot(self):
# the test checks for a raised exception. that can't work if the code runs in a separate process.
pass

Binary file not shown.

View file

@ -25,7 +25,7 @@
PlaceholderError,
replace_placeholders,
)
from ..helpers import make_path_safe, clean_lines
from ..helpers import remove_dotdot_prefixes, make_path_safe, clean_lines
from ..helpers import interval
from ..helpers import get_base_dir, get_cache_dir, get_keys_dir, get_security_dir, get_config_dir, get_runtime_dir
from ..helpers import is_slow_msgpack
@ -48,6 +48,7 @@
from ..platform import is_cygwin, is_win32, is_darwin, swidth
from . import BaseTestCase, FakeInputs, are_hardlinks_supported
from . import rejected_dotdot_paths
def test_bin_to_hex():
@ -393,16 +394,37 @@ def test_chunkerparams():
ChunkerParams("fixed,%d,%d" % (4096, MAX_DATA_SIZE + 1)) # too big header size
class RemoveDotdotPrefixesTestCase(BaseTestCase):
def test(self):
self.assert_equal(remove_dotdot_prefixes("."), ".")
self.assert_equal(remove_dotdot_prefixes(".."), ".")
self.assert_equal(remove_dotdot_prefixes("/"), ".")
self.assert_equal(remove_dotdot_prefixes("//"), ".")
self.assert_equal(remove_dotdot_prefixes("foo"), "foo")
self.assert_equal(remove_dotdot_prefixes("foo/bar"), "foo/bar")
self.assert_equal(remove_dotdot_prefixes("/foo/bar"), "foo/bar")
self.assert_equal(remove_dotdot_prefixes("../foo/bar"), "foo/bar")
class MakePathSafeTestCase(BaseTestCase):
def test(self):
self.assert_equal(make_path_safe("."), ".")
self.assert_equal(make_path_safe("./"), ".")
self.assert_equal(make_path_safe("./foo"), "foo")
self.assert_equal(make_path_safe(".//foo"), "foo")
self.assert_equal(make_path_safe(".//foo//bar//"), "foo/bar")
self.assert_equal(make_path_safe("/foo/bar"), "foo/bar")
self.assert_equal(make_path_safe("/foo/bar"), "foo/bar")
self.assert_equal(make_path_safe("/f/bar"), "f/bar")
self.assert_equal(make_path_safe("fo/bar"), "fo/bar")
self.assert_equal(make_path_safe("../foo/bar"), "foo/bar")
self.assert_equal(make_path_safe("../../foo/bar"), "foo/bar")
self.assert_equal(make_path_safe("/"), ".")
self.assert_equal(make_path_safe("/"), ".")
self.assert_equal(make_path_safe("//foo/bar"), "foo/bar")
self.assert_equal(make_path_safe("//foo/./bar"), "foo/bar")
self.assert_equal(make_path_safe(".test"), ".test")
self.assert_equal(make_path_safe(".test."), ".test.")
self.assert_equal(make_path_safe("..test.."), "..test..")
self.assert_equal(make_path_safe("/te..st/foo/bar"), "te..st/foo/bar")
self.assert_equal(make_path_safe("/..test../abc//"), "..test../abc")
for path in rejected_dotdot_paths:
with pytest.raises(ValueError, match="unexpected '..' element in path"):
make_path_safe(path)
class MockArchive:

View file

@ -38,8 +38,8 @@ def test_item_empty():
@pytest.mark.parametrize(
"item_dict, path, mode",
[ # does not matter whether we get str or bytes keys
({b"path": "/a/b/c", b"mode": 0o666}, "/a/b/c", 0o666),
({"path": "/a/b/c", "mode": 0o666}, "/a/b/c", 0o666),
({b"path": "a/b/c", b"mode": 0o666}, "a/b/c", 0o666),
({"path": "a/b/c", "mode": 0o666}, "a/b/c", 0o666),
],
)
def test_item_from_dict(item_dict, path, mode):
@ -64,8 +64,8 @@ def test_item_invalid(invalid_item, error):
def test_item_from_kw():
item = Item(path="/a/b/c", mode=0o666)
assert item.path == "/a/b/c"
item = Item(path="a/b/c", mode=0o666)
assert item.path == "a/b/c"
assert item.mode == 0o666
@ -91,22 +91,22 @@ def test_item_mptimestamp_property(atime):
def test_item_se_str_property():
# start simple
item = Item()
item.path = "/a/b/c"
assert item.path == "/a/b/c"
assert item.as_dict() == {"path": "/a/b/c"}
item.path = "a/b/c"
assert item.path == "a/b/c"
assert item.as_dict() == {"path": "a/b/c"}
del item.path
assert item.as_dict() == {}
with pytest.raises(TypeError):
item.path = 42
# non-utf-8 path, needing surrogate-escaping for latin-1 u-umlaut
item = Item(internal_dict={"path": b"/a/\xfc/c"})
assert item.path == "/a/\udcfc/c" # getting a surrogate-escaped representation
assert item.as_dict() == {"path": "/a/\udcfc/c"}
item = Item(internal_dict={"path": b"a/\xfc/c"})
assert item.path == "a/\udcfc/c" # getting a surrogate-escaped representation
assert item.as_dict() == {"path": "a/\udcfc/c"}
del item.path
assert "path" not in item
item.path = "/a/\udcfc/c" # setting using a surrogate-escaped representation
assert item.as_dict() == {"path": "/a/\udcfc/c"}
item.path = "a/\udcfc/c" # setting using a surrogate-escaped representation
assert item.as_dict() == {"path": "a/\udcfc/c"}
def test_item_list_property():