borg/src/borg/testsuite/__init__.py

Ignoring revisions in .git-blame-ignore-revs. Click here to bypass and see the normal blame view.

215 lines
5.9 KiB
Python
Raw Normal View History

from contextlib import contextmanager
import functools
2013-07-26 11:18:57 +00:00
import os
try:
import posix
except ImportError:
posix = None
import stat
2013-07-26 11:18:57 +00:00
import sys
2013-07-29 11:57:43 +00:00
import sysconfig
import tempfile
2013-06-24 20:41:05 +00:00
import unittest
# Note: this is used by borg.selftest, do not *require* pytest functionality here.
try:
from pytest import raises
except: # noqa
raises = None
2023-07-25 22:19:37 +00:00
from ..fuse_impl import llfuse, has_llfuse, has_pyfuse3 # NOQA
from .. import platform
from ..platformflags import is_win32
# Does this version of llfuse support ns precision?
have_fuse_mtime_ns = hasattr(llfuse.EntryAttributes, "st_mtime_ns") if llfuse else False
has_lchflags = hasattr(os, "lchflags") or sys.platform.startswith("linux")
try:
with tempfile.NamedTemporaryFile() as file:
platform.set_flags(file.name, stat.UF_NODUMP)
except OSError:
has_lchflags = False
2015-02-28 02:43:08 +00:00
# The mtime get/set precision varies on different OS and Python versions
if posix and "HAVE_FUTIMENS" in getattr(posix, "_have_functions", []):
st_mtime_ns_round = 0 # 1ns resolution
2013-07-29 11:57:43 +00:00
elif "HAVE_UTIMES" in sysconfig.get_config_vars():
st_mtime_ns_round = -3 # 1us resolution
2013-07-29 11:57:43 +00:00
else:
st_mtime_ns_round = -9 # 1s resolution
2013-07-29 11:57:43 +00:00
if sys.platform.startswith("netbsd"):
st_mtime_ns_round = -4 # 10us - strange: only >1 microsecond resolution here?
def same_ts_ns(ts_ns1, ts_ns2):
"""compare 2 timestamps (both in nanoseconds) whether they are (roughly) equal"""
diff_ts = int(abs(ts_ns1 - ts_ns2))
diff_max = 10 ** (-st_mtime_ns_round)
return diff_ts <= diff_max
2013-07-26 11:18:57 +00:00
2013-06-24 20:41:05 +00:00
Sanitize paths during archive creation/extraction/... Paths are not always sanitized when creating an archive and, more importantly, never when extracting one. The following example shows how this can be used to attempt to write a file outside the extraction directory: $ echo abcdef | borg create -r ~/borg/a --stdin-name x/../../../../../etc/shadow archive-1 - $ borg list -r ~/borg/a archive-1 -rw-rw---- root root 7 Sun, 2022-10-23 19:14:27 x/../../../../../etc/shadow $ mkdir borg/target $ cd borg/target $ borg extract -r ~/borg/a archive-1 x/../../../../../etc/shadow: makedirs: [Errno 13] Permission denied: '/home/user/borg/target/x/../../../../../etc' Note that Borg tries to extract the file to /etc/shadow and the permission error is a result of the user not having access. This patch ensures file names are sanitized before archiving. As for files extracted from the archive, paths are sanitized by making all paths relative, removing '.' elements, and removing superfluous slashes (as in '//'). '..' elements, however, are rejected outright. The reasoning here is that it is easy to start a path with './' or insert a '//' by accident (e.g. via --stdin-name or import-tar). '..', however, seem unlikely to be the result of an accident and could indicate a tampered repository. With paths being sanitized as they are being read, this "errors" will be corrected during the `borg transfer` required when upgrading to Borg 2. Hence, the sanitation, when reading the archive, can be removed once support for reading v1 repositories is dropped. V2 repository will not contain non-sanitized paths. Of course, a check for absolute paths and '..' elements needs to kept in place to detect tempered archives. I recommend treating this as a security issue. I see the following cases where extracting a file outside the extraction path could constitute a security risk: a) When extraction is done as a different user than archive creation. The user that created the archive may be able to get a file overwritten as a different user. b) When the archive is created on one host and extracted on another. The user that created the archive may be able to get a file overwritten on another host. c) When an archive is created and extracted after a OS reinstall. When a host is suspected compromised, it is common to reinstall (or set up a new machine), extract the backups and then evaluate their integrity. A user that manipulates the archive before such a reinstall may be able to get a file overwritten outside the extraction path and may evade integrity checks. Notably absent is the creation and extraction on the same host as the same user. In such case, an adversary must be assumed to be able to replace any file directly. This also (partially) fixes #7099.
2022-10-23 16:39:09 +00:00
rejected_dotdot_paths = (
"..",
"../",
"../etc/shadow",
"/..",
"/../",
"/../etc",
"/../etc/",
"etc/..",
"/etc/..",
"/etc/../etc/shadow",
"//etc/..",
"etc//..",
"etc/..//",
"foo/../bar",
)
@contextmanager
def unopened_tempfile():
with tempfile.TemporaryDirectory() as tempdir:
yield os.path.join(tempdir, "file")
@contextmanager
def changedir(dir):
cwd = os.getcwd()
os.chdir(dir)
yield
os.chdir(cwd)
def is_root():
"""return True if running with high privileges, like as root"""
if is_win32:
return False # TODO
else:
return os.getuid() == 0
2022-02-27 18:31:33 +00:00
@functools.lru_cache
def are_symlinks_supported():
with unopened_tempfile() as filepath:
try:
os.symlink("somewhere", filepath)
if os.stat(filepath, follow_symlinks=False) and os.readlink(filepath) == "somewhere":
return True
except OSError:
pass
return False
2022-02-27 18:31:33 +00:00
@functools.lru_cache
def are_hardlinks_supported():
if not hasattr(os, "link"):
# some pythons do not have os.link
return False
with unopened_tempfile() as file1path, unopened_tempfile() as file2path:
open(file1path, "w").close()
try:
os.link(file1path, file2path)
stat1 = os.stat(file1path)
stat2 = os.stat(file2path)
if stat1.st_nlink == stat2.st_nlink == 2 and stat1.st_ino == stat2.st_ino:
return True
except OSError:
pass
return False
2022-02-27 18:31:33 +00:00
@functools.lru_cache
def are_fifos_supported():
with unopened_tempfile() as filepath:
try:
os.mkfifo(filepath)
return True
except OSError:
pass
except NotImplementedError:
pass
except AttributeError:
pass
return False
2022-02-27 18:31:33 +00:00
@functools.lru_cache
def is_utime_fully_supported():
with unopened_tempfile() as filepath:
# Some filesystems (such as SSHFS) don't support utime on symlinks
if are_symlinks_supported():
os.symlink("something", filepath)
else:
open(filepath, "w").close()
try:
os.utime(filepath, (1000, 2000), follow_symlinks=False)
new_stats = os.stat(filepath, follow_symlinks=False)
if new_stats.st_atime == 1000 and new_stats.st_mtime == 2000:
return True
2018-10-29 10:54:24 +00:00
except OSError:
pass
except NotImplementedError:
pass
return False
2022-02-27 18:31:33 +00:00
@functools.lru_cache
def is_birthtime_fully_supported():
if not hasattr(os.stat_result, "st_birthtime"):
return False
with unopened_tempfile() as filepath:
# Some filesystems (such as SSHFS) don't support utime on symlinks
if are_symlinks_supported():
os.symlink("something", filepath)
else:
open(filepath, "w").close()
try:
birthtime, mtime, atime = 946598400, 946684800, 946771200
os.utime(filepath, (atime, birthtime), follow_symlinks=False)
os.utime(filepath, (atime, mtime), follow_symlinks=False)
new_stats = os.stat(filepath, follow_symlinks=False)
if new_stats.st_birthtime == birthtime and new_stats.st_mtime == mtime and new_stats.st_atime == atime:
return True
2018-10-29 10:54:24 +00:00
except OSError:
pass
except NotImplementedError:
pass
return False
def no_selinux(x):
# selinux fails our FUSE tests, thus ignore selinux xattrs
SELINUX_KEY = b"security.selinux"
if isinstance(x, dict):
return {k: v for k, v in x.items() if k != SELINUX_KEY}
if isinstance(x, list):
return [k for k in x if k != SELINUX_KEY]
2015-05-09 18:47:50 +00:00
class BaseTestCase(unittest.TestCase):
assert_in = unittest.TestCase.assertIn
assert_not_in = unittest.TestCase.assertNotIn
2013-06-24 20:41:05 +00:00
assert_equal = unittest.TestCase.assertEqual
assert_not_equal = unittest.TestCase.assertNotEqual
2023-07-25 13:14:46 +00:00
assert_raises = staticmethod(raises) if raises else unittest.TestCase.assertRaises # type: ignore
class FakeInputs:
"""Simulate multiple user inputs, can be used as input() replacement"""
def __init__(self, inputs):
self.inputs = inputs
def __call__(self, prompt=None):
if prompt is not None:
print(prompt, end="")
try:
return self.inputs.pop(0)
except IndexError:
raise EOFError from None