diff --git a/src/borg/remote.py b/src/borg/remote.py index 8d6f861fb..594cc4a8a 100644 --- a/src/borg/remote.py +++ b/src/borg/remote.py @@ -1,10 +1,8 @@ -import atexit import errno import functools import inspect import logging import os -import queue import select import shlex import shutil @@ -14,10 +12,8 @@ import tempfile import textwrap import time -import traceback from subprocess import Popen, PIPE -import borg.logger from . import __version__ from .compress import Compressor from .constants import * # NOQA @@ -25,13 +21,12 @@ from .helpers import bin_to_hex from .helpers import get_limited_unpacker from .helpers import replace_placeholders -from .helpers import sysinfo from .helpers import format_file_size from .helpers import safe_unlink from .helpers import prepare_subprocess_env, ignore_sigint from .helpers import get_socket_filename from .locking import LockTimeout, NotLocked, NotMyLock, LockFailed -from .logger import create_logger, borg_serve_log_queue +from .logger import create_logger from .helpers import msgpack from .repository import Repository from .version import parse_version, format_version @@ -48,25 +43,6 @@ RATELIMIT_PERIOD = 0.1 -def os_write(fd, data): - """os.write wrapper so we do not lose data for partial writes.""" - # TODO: this issue is fixed in cygwin since at least 2.8.0, remove this - # wrapper / workaround when this version is considered ancient. - # This is happening frequently on cygwin due to its small pipe buffer size of only 64kiB - # and also due to its different blocking pipe behaviour compared to Linux/*BSD. - # Neither Linux nor *BSD ever do partial writes on blocking pipes, unless interrupted by a - # signal, in which case serve() would terminate. - amount = remaining = len(data) - while remaining: - count = os.write(fd, data) - remaining -= count - if not remaining: - break - data = data[count:] - time.sleep(count * 1e-09) - return amount - - class ConnectionClosed(Error): """Connection closed by remote host""" @@ -134,281 +110,6 @@ class ConnectionBrokenWithHint(Error): # servers still get compatible input. -class RepositoryServer: # pragma: no cover - rpc_methods = ( - "__len__", - "check", - "commit", - "delete", - "destroy", - "get", - "list", - "negotiate", - "open", - "close", - "info", - "put", - "rollback", - "save_key", - "load_key", - "break_lock", - "inject_exception", - "get_manifest", - "put_manifest", - ) - - def __init__(self, restrict_to_paths, restrict_to_repositories, append_only, storage_quota, use_socket): - self.repository = None - self.restrict_to_paths = restrict_to_paths - self.restrict_to_repositories = restrict_to_repositories - # This flag is parsed from the serve command line via Archiver.do_serve, - # i.e. it reflects local system policy and generally ranks higher than - # whatever the client wants, except when initializing a new repository - # (see RepositoryServer.open below). - self.append_only = append_only - self.storage_quota = storage_quota - self.client_version = None # we update this after client sends version information - if use_socket is False: - self.socket_path = None - elif use_socket is True: # --socket - self.socket_path = get_socket_filename() - else: # --socket=/some/path - self.socket_path = use_socket - - def filter_args(self, f, kwargs): - """Remove unknown named parameters from call, because client did (implicitly) say it's ok.""" - known = set(inspect.signature(f).parameters) - return {name: kwargs[name] for name in kwargs if name in known} - - def send_queued_log(self): - while True: - try: - # lr_dict contents see BorgQueueHandler - lr_dict = borg_serve_log_queue.get_nowait() - except queue.Empty: - break - else: - msg = msgpack.packb({LOG: lr_dict}) - os_write(self.stdout_fd, msg) - - def serve(self): - def inner_serve(): - os.set_blocking(self.stdin_fd, False) - assert not os.get_blocking(self.stdin_fd) - os.set_blocking(self.stdout_fd, True) - assert os.get_blocking(self.stdout_fd) - - unpacker = get_limited_unpacker("server") - shutdown_serve = False - while True: - # before processing any new RPCs, send out all pending log output - self.send_queued_log() - - if shutdown_serve: - # shutdown wanted! get out of here after sending all log output. - assert self.repository is None - return - - # process new RPCs - r, w, es = select.select([self.stdin_fd], [], [], 10) - if r: - data = os.read(self.stdin_fd, BUFSIZE) - if not data: - shutdown_serve = True - continue - unpacker.feed(data) - for unpacked in unpacker: - if isinstance(unpacked, dict): - msgid = unpacked[MSGID] - method = unpacked[MSG] - args = unpacked[ARGS] - else: - if self.repository is not None: - self.repository.close() - raise UnexpectedRPCDataFormatFromClient(__version__) - try: - if method not in self.rpc_methods: - raise InvalidRPCMethod(method) - try: - f = getattr(self, method) - except AttributeError: - f = getattr(self.repository, method) - args = self.filter_args(f, args) - res = f(**args) - except BaseException as e: - ex_short = traceback.format_exception_only(e.__class__, e) - ex_full = traceback.format_exception(*sys.exc_info()) - ex_trace = True - if isinstance(e, Error): - ex_short = [e.get_message()] - ex_trace = e.traceback - if isinstance(e, (Repository.DoesNotExist, Repository.AlreadyExists, PathNotAllowed)): - # These exceptions are reconstructed on the client end in RemoteRepository.call_many(), - # and will be handled just like locally raised exceptions. Suppress the remote traceback - # for these, except ErrorWithTraceback, which should always display a traceback. - pass - else: - logging.debug("\n".join(ex_full)) - - sys_info = sysinfo() - try: - msg = msgpack.packb( - { - MSGID: msgid, - "exception_class": e.__class__.__name__, - "exception_args": e.args, - "exception_full": ex_full, - "exception_short": ex_short, - "exception_trace": ex_trace, - "sysinfo": sys_info, - } - ) - except TypeError: - msg = msgpack.packb( - { - MSGID: msgid, - "exception_class": e.__class__.__name__, - "exception_args": [ - x if isinstance(x, (str, bytes, int)) else None for x in e.args - ], - "exception_full": ex_full, - "exception_short": ex_short, - "exception_trace": ex_trace, - "sysinfo": sys_info, - } - ) - os_write(self.stdout_fd, msg) - else: - os_write(self.stdout_fd, msgpack.packb({MSGID: msgid, RESULT: res})) - if es: - shutdown_serve = True - continue - - if self.socket_path: # server for socket:// connections - try: - # remove any left-over socket file - os.unlink(self.socket_path) - except OSError: - if os.path.exists(self.socket_path): - raise - sock_dir = os.path.dirname(self.socket_path) - os.makedirs(sock_dir, exist_ok=True) - if self.socket_path.endswith(".sock"): - pid_file = self.socket_path.replace(".sock", ".pid") - else: - pid_file = self.socket_path + ".pid" - pid = os.getpid() - with open(pid_file, "w") as f: - f.write(str(pid)) - atexit.register(functools.partial(os.remove, pid_file)) - atexit.register(functools.partial(os.remove, self.socket_path)) - sock = socket.socket(family=socket.AF_UNIX, type=socket.SOCK_STREAM) - sock.bind(self.socket_path) # this creates the socket file in the fs - sock.listen(0) # no backlog - os.chmod(self.socket_path, mode=0o0770) # group members may use the socket, too. - print(f"borg serve: PID {pid}, listening on socket {self.socket_path} ...", file=sys.stderr) - - while True: - connection, client_address = sock.accept() - print(f"Accepted a connection on socket {self.socket_path} ...", file=sys.stderr) - self.stdin_fd = connection.makefile("rb").fileno() - self.stdout_fd = connection.makefile("wb").fileno() - inner_serve() - print(f"Finished with connection on socket {self.socket_path} .", file=sys.stderr) - else: # server for one ssh:// connection - self.stdin_fd = sys.stdin.fileno() - self.stdout_fd = sys.stdout.fileno() - inner_serve() - - def negotiate(self, client_data): - if isinstance(client_data, dict): - self.client_version = client_data["client_version"] - else: - self.client_version = BORG_VERSION # seems to be newer than current version (no known old format) - - # not a known old format, send newest negotiate this version knows - return {"server_version": BORG_VERSION} - - def _resolve_path(self, path): - if isinstance(path, bytes): - path = os.fsdecode(path) - if path.startswith("/~/"): # /~/x = path x relative to own home dir - home_dir = os.environ.get("HOME") or os.path.expanduser("~%s" % os.environ.get("USER", "")) - path = os.path.join(home_dir, path[3:]) - elif path.startswith("/./"): # /./x = path x relative to cwd - path = path[3:] - return os.path.realpath(path) - - def open( - self, path, create=False, lock_wait=None, lock=True, exclusive=None, append_only=False, make_parent_dirs=False - ): - logging.debug("Resolving repository path %r", path) - path = self._resolve_path(path) - logging.debug("Resolved repository path to %r", path) - path_with_sep = os.path.join(path, "") # make sure there is a trailing slash (os.sep) - if self.restrict_to_paths: - # if --restrict-to-path P is given, we make sure that we only operate in/below path P. - # for the prefix check, it is important that the compared paths both have trailing slashes, - # so that a path /foobar will NOT be accepted with --restrict-to-path /foo option. - for restrict_to_path in self.restrict_to_paths: - restrict_to_path_with_sep = os.path.join(os.path.realpath(restrict_to_path), "") # trailing slash - if path_with_sep.startswith(restrict_to_path_with_sep): - break - else: - raise PathNotAllowed(path) - if self.restrict_to_repositories: - for restrict_to_repository in self.restrict_to_repositories: - restrict_to_repository_with_sep = os.path.join(os.path.realpath(restrict_to_repository), "") - if restrict_to_repository_with_sep == path_with_sep: - break - else: - raise PathNotAllowed(path) - # "borg init" on "borg serve --append-only" (=self.append_only) does not create an append only repo, - # while "borg init --append-only" (=append_only) does, regardless of the --append-only (self.append_only) - # flag for serve. - append_only = (not create and self.append_only) or append_only - self.repository = Repository( - path, - create, - lock_wait=lock_wait, - lock=lock, - append_only=append_only, - storage_quota=self.storage_quota, - exclusive=exclusive, - make_parent_dirs=make_parent_dirs, - send_log_cb=self.send_queued_log, - ) - self.repository.__enter__() # clean exit handled by serve() method - return self.repository.id - - def close(self): - if self.repository is not None: - self.repository.__exit__(None, None, None) - self.repository = None - borg.logger.flush_logging() - self.send_queued_log() - - def inject_exception(self, kind): - s1 = "test string" - s2 = "test string2" - if kind == "DoesNotExist": - raise Repository.DoesNotExist(s1) - elif kind == "AlreadyExists": - raise Repository.AlreadyExists(s1) - elif kind == "CheckNeeded": - raise Repository.CheckNeeded(s1) - elif kind == "IntegrityError": - raise IntegrityError(s1) - elif kind == "PathNotAllowed": - raise PathNotAllowed("foo") - elif kind == "ObjectNotFound": - raise Repository.ObjectNotFound(s1, s2) - elif kind == "InvalidRPCMethod": - raise InvalidRPCMethod(s1) - elif kind == "divide": - 0 // 0 - - class SleepingBandwidthLimiter: def __init__(self, limit): if limit: