From e351e67aee227d8482e228d27597942b598c946a Mon Sep 17 00:00:00 2001 From: Thomas Waldmann Date: Thu, 25 May 2023 00:55:43 +0200 Subject: [PATCH] RepositoryServer: do not use stderr for logging, see #7604 Instead, install a handler that sends the LogRecord dicts to a queue. That queue is then emptied in the borg serve main loop and the LogRecords are sent msgpacked via stdout to the client, similar to the RPC results. On the client side, the LogRecords are recreated from the received dicts and fed into the clientside logging system. As we use msgpacked LogRecord dicts, we don't need JSON for this purpose on the borg serve side any more. On the client side, the LogRecords will then be either formatted as normal text or as JSON log output (by the clientside log formatter). --- src/borg/archiver/__init__.py | 4 +- src/borg/logger.py | 27 ++++++++- src/borg/remote.py | 100 ++++++++++--------------------- src/borg/testsuite/repository.py | 75 +---------------------- 4 files changed, 61 insertions(+), 145 deletions(-) diff --git a/src/borg/archiver/__init__.py b/src/borg/archiver/__init__.py index 69f14d5cc..915594d1b 100644 --- a/src/borg/archiver/__init__.py +++ b/src/borg/archiver/__init__.py @@ -482,8 +482,8 @@ class Archiver( func = get_func(args) # do not use loggers before this! is_serve = func == self.do_serve - self.log_json = args.log_json or is_serve - setup_logging(level=args.log_level, json=self.log_json) + self.log_json = args.log_json and not is_serve + setup_logging(level=args.log_level, is_serve=is_serve, json=self.log_json) args.progress |= is_serve self._setup_implied_logging(vars(args)) self._setup_topic_debugging(args) diff --git a/src/borg/logger.py b/src/borg/logger.py index 6788ff118..1422902f0 100644 --- a/src/borg/logger.py +++ b/src/borg/logger.py @@ -36,9 +36,30 @@ import logging import logging.config import logging.handlers # needed for handlers defined there being configurable in logging.conf file import os +import queue import warnings configured = False +borg_serve_log_queue = queue.SimpleQueue() + + +class BorgQueueHandler(logging.handlers.QueueHandler): + """borg serve writes log record dicts to a borg_serve_log_queue""" + + def prepare(self, record: logging.LogRecord) -> dict: + return dict( + # kwargs needed for LogRecord constructor: + name=record.name, + level=record.levelno, + pathname=record.pathname, + lineno=record.lineno, + msg=record.msg, + args=record.args, + exc_info=record.exc_info, + func=record.funcName, + sinfo=record.stack_info, + ) + # use something like this to ignore warnings: # warnings.filterwarnings('ignore', r'... regex for warning message to ignore ...') @@ -53,7 +74,7 @@ def _log_warning(message, category, filename, lineno, file=None, line=None): logger.warning(msg) -def setup_logging(stream=None, conf_fname=None, env_var="BORG_LOGGING_CONF", level="info", json=False): +def setup_logging(stream=None, conf_fname=None, env_var="BORG_LOGGING_CONF", level="info", is_serve=False, json=False): """setup logging module according to the arguments provided if conf_fname is given (or the config file name can be determined via @@ -61,6 +82,8 @@ def setup_logging(stream=None, conf_fname=None, env_var="BORG_LOGGING_CONF", lev otherwise, set up a stream handler logger on stderr (by default, if no stream is provided). + + is_serve: are we setting up the logging for "borg serve"? """ global configured err_msg = None @@ -86,7 +109,7 @@ def setup_logging(stream=None, conf_fname=None, env_var="BORG_LOGGING_CONF", lev err_msg = str(err) # if we did not / not successfully load a logging configuration, fallback to this: logger = logging.getLogger("") - handler = logging.StreamHandler(stream) + handler = BorgQueueHandler(borg_serve_log_queue) if is_serve else logging.StreamHandler(stream) fmt = "%(message)s" formatter = JsonFormatter(fmt) if json else logging.Formatter(fmt) handler.setFormatter(formatter) diff --git a/src/borg/remote.py b/src/borg/remote.py index d70ebada3..6d459cf24 100644 --- a/src/borg/remote.py +++ b/src/borg/remote.py @@ -1,9 +1,9 @@ import errno import functools import inspect -import json import logging import os +import queue import select import shlex import shutil @@ -26,7 +26,7 @@ from .helpers import sysinfo from .helpers import format_file_size from .helpers import safe_unlink from .helpers import prepare_subprocess_env, ignore_sigint -from .logger import create_logger +from .logger import create_logger, borg_serve_log_queue from .helpers import msgpack from .repository import Repository from .version import parse_version, format_version @@ -36,7 +36,7 @@ from .helpers.datastruct import EfficientCollectionQueue logger = create_logger(__name__) BORG_VERSION = parse_version(__version__) -MSGID, MSG, ARGS, RESULT = "i", "m", "a", "r" +MSGID, MSG, ARGS, RESULT, LOG = "i", "m", "a", "r", "l" MAX_INFLIGHT = 100 @@ -154,25 +154,28 @@ class RepositoryServer: # pragma: no cover def serve(self): stdin_fd = sys.stdin.fileno() stdout_fd = sys.stdout.fileno() - stderr_fd = sys.stdout.fileno() os.set_blocking(stdin_fd, False) os.set_blocking(stdout_fd, True) - os.set_blocking(stderr_fd, True) unpacker = get_limited_unpacker("server") while True: + # before processing any new RPCs, send out all pending log output + while True: + try: + # lr_dict contents see BorgQueueHandler + lr_dict = borg_serve_log_queue.get_nowait() + except queue.Empty: + break + else: + msg = msgpack.packb({LOG: lr_dict}) + os_write(stdout_fd, msg) + + # process new RPCs r, w, es = select.select([stdin_fd], [], [], 10) if r: data = os.read(stdin_fd, BUFSIZE) if not data: if self.repository is not None: self.repository.close() - else: - os_write( - stderr_fd, - "Borg {}: Got connection close before repository was opened.\n".format( - __version__ - ).encode(), - ) return unpacker.feed(data) for unpacked in unpacker: @@ -726,10 +729,18 @@ class RemoteRepository: self.rx_bytes += len(data) self.unpacker.feed(data) for unpacked in self.unpacker: - if isinstance(unpacked, dict): - msgid = unpacked[MSGID] - else: + if not isinstance(unpacked, dict): raise UnexpectedRPCDataFormatFromServer(data) + + lr_dict = unpacked.get(LOG) + if lr_dict is not None: + # Re-emit remote log messages locally. + _logger = logging.getLogger(lr_dict["name"]) + if _logger.isEnabledFor(lr_dict["level"]): + _logger.handle(logging.LogRecord(**lr_dict)) + continue + + msgid = unpacked[MSGID] if msgid in self.ignore_responses: self.ignore_responses.remove(msgid) # async methods never return values, but may raise exceptions. @@ -755,8 +766,14 @@ class RemoteRepository: if lines and not lines[-1].endswith((b"\r", b"\n")): self.stderr_received = lines.pop() # now we have complete lines in and any partial line in self.stderr_received. + _logger = logging.getLogger() for line in lines: - handle_remote_line(line.decode()) # decode late, avoid partial utf-8 sequences + # borg serve (remote/server side) should not emit stuff on stderr, + # but e.g. the ssh process (local/client side) might output errors there. + assert line.endswith((b"\r", b"\n")) + # something came in on stderr, log it to not lose it. + # decode late, avoid partial utf-8 sequences. + _logger.warning("stderr: " + line.decode().strip()) if w: while ( (len(self.to_send) <= maximum_to_send) @@ -886,57 +903,6 @@ class RemoteRepository: self.preload_ids += ids -def handle_remote_line(line): - """ - Handle a remote log line. - - This function is remarkably complex because it handles multiple wire formats. - """ - assert line.endswith(("\r", "\n")) - if line.startswith("{"): - msg = json.loads(line) - - if msg["type"] not in ("progress_message", "progress_percent", "log_message"): - logger.warning("Dropped remote log message with unknown type %r: %s", msg["type"], line) - return - - if msg["type"] == "log_message": - # Re-emit log messages on the same level as the remote to get correct log suppression and verbosity. - level = getattr(logging, msg["levelname"], logging.CRITICAL) - assert isinstance(level, int) - target_logger = logging.getLogger(msg["name"]) - msg["message"] = "Remote: " + msg["message"] - # In JSON mode, we manually check whether the log message should be propagated. - if logging.getLogger("borg").json and level >= target_logger.getEffectiveLevel(): - sys.stderr.write(json.dumps(msg) + "\n") - else: - target_logger.log(level, "%s", msg["message"]) - elif msg["type"].startswith("progress_"): - # Progress messages are a bit more complex. - # First of all, we check whether progress output is enabled. This is signalled - # through the effective level of the borg.output.progress logger - # (also see ProgressIndicatorBase in borg.helpers). - progress_logger = logging.getLogger("borg.output.progress") - if progress_logger.getEffectiveLevel() == logging.INFO: - # When progress output is enabled, we check whether the client is in - # --log-json mode, as signalled by the "json" attribute on the "borg" logger. - if logging.getLogger("borg").json: - # In --log-json mode we re-emit the progress JSON line as sent by the server, - # with the message, if any, prefixed with "Remote: ". - if "message" in msg: - msg["message"] = "Remote: " + msg["message"] - sys.stderr.write(json.dumps(msg) + "\n") - elif "message" in msg: - # In text log mode we write only the message to stderr and terminate with \r - # (carriage return, i.e. move the write cursor back to the beginning of the line) - # so that the next message, progress or not, overwrites it. This mirrors the behaviour - # of local progress displays. - sys.stderr.write("Remote: " + msg["message"] + "\r") - else: - # We don't know what priority the line had. - logging.getLogger("").warning("stderr/remote: " + line.strip()) - - class RepositoryNoCache: """A not caching Repository wrapper, passes through to repository. diff --git a/src/borg/testsuite/repository.py b/src/borg/testsuite/repository.py index 93957584a..f04b11784 100644 --- a/src/borg/testsuite/repository.py +++ b/src/borg/testsuite/repository.py @@ -1,4 +1,3 @@ -import io import logging import os import shutil @@ -13,7 +12,7 @@ from ..helpers import Location from ..helpers import IntegrityError from ..helpers import msgpack from ..locking import Lock, LockFailed -from ..remote import RemoteRepository, InvalidRPCMethod, PathNotAllowed, handle_remote_line +from ..remote import RemoteRepository, InvalidRPCMethod, PathNotAllowed from ..repository import Repository, LoggedIO, MAGIC, MAX_DATA_SIZE, TAG_DELETE, TAG_PUT2, TAG_PUT, TAG_COMMIT from ..repoobj import RepoObj from . import BaseTestCase @@ -1064,75 +1063,3 @@ class RemoteRepositoryCheckTestCase(RepositoryCheckTestCase): def test_repair_missing_segment(self): # skip this test, files in RemoteRepository cannot be deleted pass - - -class RemoteLoggerTestCase(BaseTestCase): - def setUp(self): - self.stream = io.StringIO() - self.handler = logging.StreamHandler(self.stream) - logging.getLogger().handlers[:] = [self.handler] - logging.getLogger("borg.repository").handlers[:] = [] - logging.getLogger("borg.repository.foo").handlers[:] = [] - # capture stderr - sys.stderr.flush() - self.old_stderr = sys.stderr - self.stderr = sys.stderr = io.StringIO() - - def tearDown(self): - sys.stderr = self.old_stderr - - def test_stderr_messages(self): - handle_remote_line("unstructured stderr message\n") - self.assert_equal(self.stream.getvalue(), "stderr/remote: unstructured stderr message\n") - self.assert_equal(self.stderr.getvalue(), "") - - def test_post11_format_messages(self): - self.handler.setLevel(logging.DEBUG) - logging.getLogger().setLevel(logging.DEBUG) - - msg = ( - """{"type": "log_message", "levelname": "INFO", "name": "borg.repository", "msgid": 42,""" - """ "message": "borg >= 1.1 format message"}\n""" - ) - handle_remote_line(msg) - self.assert_equal(self.stream.getvalue(), "Remote: borg >= 1.1 format message\n") - self.assert_equal(self.stderr.getvalue(), "") - - def test_remote_messages_screened(self): - # default borg config for root logger - self.handler.setLevel(logging.WARNING) - logging.getLogger().setLevel(logging.WARNING) - - msg = ( - """{"type": "log_message", "levelname": "INFO", "name": "borg.repository", "msgid": 42,""" - """ "message": "new format info message"}\n""" - ) - handle_remote_line(msg) - self.assert_equal(self.stream.getvalue(), "") - self.assert_equal(self.stderr.getvalue(), "") - - def test_info_to_correct_local_child(self): - logging.getLogger("borg.repository").setLevel(logging.INFO) - logging.getLogger("borg.repository.foo").setLevel(logging.INFO) - # default borg config for root logger - self.handler.setLevel(logging.WARNING) - logging.getLogger().setLevel(logging.WARNING) - - child_stream = io.StringIO() - child_handler = logging.StreamHandler(child_stream) - child_handler.setLevel(logging.INFO) - logging.getLogger("borg.repository").handlers[:] = [child_handler] - foo_stream = io.StringIO() - foo_handler = logging.StreamHandler(foo_stream) - foo_handler.setLevel(logging.INFO) - logging.getLogger("borg.repository.foo").handlers[:] = [foo_handler] - - msg = ( - """{"type": "log_message", "levelname": "INFO", "name": "borg.repository", "msgid": 42,""" - """ "message": "new format child message"}\n""" - ) - handle_remote_line(msg) - self.assert_equal(foo_stream.getvalue(), "") - self.assert_equal(child_stream.getvalue(), "Remote: new format child message\n") - self.assert_equal(self.stream.getvalue(), "") - self.assert_equal(self.stderr.getvalue(), "")