diff --git a/src/borg/archiver/__init__.py b/src/borg/archiver/__init__.py index 69f14d5cc..915594d1b 100644 --- a/src/borg/archiver/__init__.py +++ b/src/borg/archiver/__init__.py @@ -482,8 +482,8 @@ class Archiver( func = get_func(args) # do not use loggers before this! is_serve = func == self.do_serve - self.log_json = args.log_json or is_serve - setup_logging(level=args.log_level, json=self.log_json) + self.log_json = args.log_json and not is_serve + setup_logging(level=args.log_level, is_serve=is_serve, json=self.log_json) args.progress |= is_serve self._setup_implied_logging(vars(args)) self._setup_topic_debugging(args) diff --git a/src/borg/logger.py b/src/borg/logger.py index 6788ff118..1422902f0 100644 --- a/src/borg/logger.py +++ b/src/borg/logger.py @@ -36,9 +36,30 @@ import logging import logging.config import logging.handlers # needed for handlers defined there being configurable in logging.conf file import os +import queue import warnings configured = False +borg_serve_log_queue = queue.SimpleQueue() + + +class BorgQueueHandler(logging.handlers.QueueHandler): + """borg serve writes log record dicts to a borg_serve_log_queue""" + + def prepare(self, record: logging.LogRecord) -> dict: + return dict( + # kwargs needed for LogRecord constructor: + name=record.name, + level=record.levelno, + pathname=record.pathname, + lineno=record.lineno, + msg=record.msg, + args=record.args, + exc_info=record.exc_info, + func=record.funcName, + sinfo=record.stack_info, + ) + # use something like this to ignore warnings: # warnings.filterwarnings('ignore', r'... regex for warning message to ignore ...') @@ -53,7 +74,7 @@ def _log_warning(message, category, filename, lineno, file=None, line=None): logger.warning(msg) -def setup_logging(stream=None, conf_fname=None, env_var="BORG_LOGGING_CONF", level="info", json=False): +def setup_logging(stream=None, conf_fname=None, env_var="BORG_LOGGING_CONF", level="info", is_serve=False, json=False): """setup logging module according to the arguments provided if conf_fname is given (or the config file name can be determined via @@ -61,6 +82,8 @@ def setup_logging(stream=None, conf_fname=None, env_var="BORG_LOGGING_CONF", lev otherwise, set up a stream handler logger on stderr (by default, if no stream is provided). + + is_serve: are we setting up the logging for "borg serve"? """ global configured err_msg = None @@ -86,7 +109,7 @@ def setup_logging(stream=None, conf_fname=None, env_var="BORG_LOGGING_CONF", lev err_msg = str(err) # if we did not / not successfully load a logging configuration, fallback to this: logger = logging.getLogger("") - handler = logging.StreamHandler(stream) + handler = BorgQueueHandler(borg_serve_log_queue) if is_serve else logging.StreamHandler(stream) fmt = "%(message)s" formatter = JsonFormatter(fmt) if json else logging.Formatter(fmt) handler.setFormatter(formatter) diff --git a/src/borg/remote.py b/src/borg/remote.py index d70ebada3..6d459cf24 100644 --- a/src/borg/remote.py +++ b/src/borg/remote.py @@ -1,9 +1,9 @@ import errno import functools import inspect -import json import logging import os +import queue import select import shlex import shutil @@ -26,7 +26,7 @@ from .helpers import sysinfo from .helpers import format_file_size from .helpers import safe_unlink from .helpers import prepare_subprocess_env, ignore_sigint -from .logger import create_logger +from .logger import create_logger, borg_serve_log_queue from .helpers import msgpack from .repository import Repository from .version import parse_version, format_version @@ -36,7 +36,7 @@ from .helpers.datastruct import EfficientCollectionQueue logger = create_logger(__name__) BORG_VERSION = parse_version(__version__) -MSGID, MSG, ARGS, RESULT = "i", "m", "a", "r" +MSGID, MSG, ARGS, RESULT, LOG = "i", "m", "a", "r", "l" MAX_INFLIGHT = 100 @@ -154,25 +154,28 @@ class RepositoryServer: # pragma: no cover def serve(self): stdin_fd = sys.stdin.fileno() stdout_fd = sys.stdout.fileno() - stderr_fd = sys.stdout.fileno() os.set_blocking(stdin_fd, False) os.set_blocking(stdout_fd, True) - os.set_blocking(stderr_fd, True) unpacker = get_limited_unpacker("server") while True: + # before processing any new RPCs, send out all pending log output + while True: + try: + # lr_dict contents see BorgQueueHandler + lr_dict = borg_serve_log_queue.get_nowait() + except queue.Empty: + break + else: + msg = msgpack.packb({LOG: lr_dict}) + os_write(stdout_fd, msg) + + # process new RPCs r, w, es = select.select([stdin_fd], [], [], 10) if r: data = os.read(stdin_fd, BUFSIZE) if not data: if self.repository is not None: self.repository.close() - else: - os_write( - stderr_fd, - "Borg {}: Got connection close before repository was opened.\n".format( - __version__ - ).encode(), - ) return unpacker.feed(data) for unpacked in unpacker: @@ -726,10 +729,18 @@ class RemoteRepository: self.rx_bytes += len(data) self.unpacker.feed(data) for unpacked in self.unpacker: - if isinstance(unpacked, dict): - msgid = unpacked[MSGID] - else: + if not isinstance(unpacked, dict): raise UnexpectedRPCDataFormatFromServer(data) + + lr_dict = unpacked.get(LOG) + if lr_dict is not None: + # Re-emit remote log messages locally. + _logger = logging.getLogger(lr_dict["name"]) + if _logger.isEnabledFor(lr_dict["level"]): + _logger.handle(logging.LogRecord(**lr_dict)) + continue + + msgid = unpacked[MSGID] if msgid in self.ignore_responses: self.ignore_responses.remove(msgid) # async methods never return values, but may raise exceptions. @@ -755,8 +766,14 @@ class RemoteRepository: if lines and not lines[-1].endswith((b"\r", b"\n")): self.stderr_received = lines.pop() # now we have complete lines in and any partial line in self.stderr_received. + _logger = logging.getLogger() for line in lines: - handle_remote_line(line.decode()) # decode late, avoid partial utf-8 sequences + # borg serve (remote/server side) should not emit stuff on stderr, + # but e.g. the ssh process (local/client side) might output errors there. + assert line.endswith((b"\r", b"\n")) + # something came in on stderr, log it to not lose it. + # decode late, avoid partial utf-8 sequences. + _logger.warning("stderr: " + line.decode().strip()) if w: while ( (len(self.to_send) <= maximum_to_send) @@ -886,57 +903,6 @@ class RemoteRepository: self.preload_ids += ids -def handle_remote_line(line): - """ - Handle a remote log line. - - This function is remarkably complex because it handles multiple wire formats. - """ - assert line.endswith(("\r", "\n")) - if line.startswith("{"): - msg = json.loads(line) - - if msg["type"] not in ("progress_message", "progress_percent", "log_message"): - logger.warning("Dropped remote log message with unknown type %r: %s", msg["type"], line) - return - - if msg["type"] == "log_message": - # Re-emit log messages on the same level as the remote to get correct log suppression and verbosity. - level = getattr(logging, msg["levelname"], logging.CRITICAL) - assert isinstance(level, int) - target_logger = logging.getLogger(msg["name"]) - msg["message"] = "Remote: " + msg["message"] - # In JSON mode, we manually check whether the log message should be propagated. - if logging.getLogger("borg").json and level >= target_logger.getEffectiveLevel(): - sys.stderr.write(json.dumps(msg) + "\n") - else: - target_logger.log(level, "%s", msg["message"]) - elif msg["type"].startswith("progress_"): - # Progress messages are a bit more complex. - # First of all, we check whether progress output is enabled. This is signalled - # through the effective level of the borg.output.progress logger - # (also see ProgressIndicatorBase in borg.helpers). - progress_logger = logging.getLogger("borg.output.progress") - if progress_logger.getEffectiveLevel() == logging.INFO: - # When progress output is enabled, we check whether the client is in - # --log-json mode, as signalled by the "json" attribute on the "borg" logger. - if logging.getLogger("borg").json: - # In --log-json mode we re-emit the progress JSON line as sent by the server, - # with the message, if any, prefixed with "Remote: ". - if "message" in msg: - msg["message"] = "Remote: " + msg["message"] - sys.stderr.write(json.dumps(msg) + "\n") - elif "message" in msg: - # In text log mode we write only the message to stderr and terminate with \r - # (carriage return, i.e. move the write cursor back to the beginning of the line) - # so that the next message, progress or not, overwrites it. This mirrors the behaviour - # of local progress displays. - sys.stderr.write("Remote: " + msg["message"] + "\r") - else: - # We don't know what priority the line had. - logging.getLogger("").warning("stderr/remote: " + line.strip()) - - class RepositoryNoCache: """A not caching Repository wrapper, passes through to repository. diff --git a/src/borg/testsuite/repository.py b/src/borg/testsuite/repository.py index 93957584a..f04b11784 100644 --- a/src/borg/testsuite/repository.py +++ b/src/borg/testsuite/repository.py @@ -1,4 +1,3 @@ -import io import logging import os import shutil @@ -13,7 +12,7 @@ from ..helpers import Location from ..helpers import IntegrityError from ..helpers import msgpack from ..locking import Lock, LockFailed -from ..remote import RemoteRepository, InvalidRPCMethod, PathNotAllowed, handle_remote_line +from ..remote import RemoteRepository, InvalidRPCMethod, PathNotAllowed from ..repository import Repository, LoggedIO, MAGIC, MAX_DATA_SIZE, TAG_DELETE, TAG_PUT2, TAG_PUT, TAG_COMMIT from ..repoobj import RepoObj from . import BaseTestCase @@ -1064,75 +1063,3 @@ class RemoteRepositoryCheckTestCase(RepositoryCheckTestCase): def test_repair_missing_segment(self): # skip this test, files in RemoteRepository cannot be deleted pass - - -class RemoteLoggerTestCase(BaseTestCase): - def setUp(self): - self.stream = io.StringIO() - self.handler = logging.StreamHandler(self.stream) - logging.getLogger().handlers[:] = [self.handler] - logging.getLogger("borg.repository").handlers[:] = [] - logging.getLogger("borg.repository.foo").handlers[:] = [] - # capture stderr - sys.stderr.flush() - self.old_stderr = sys.stderr - self.stderr = sys.stderr = io.StringIO() - - def tearDown(self): - sys.stderr = self.old_stderr - - def test_stderr_messages(self): - handle_remote_line("unstructured stderr message\n") - self.assert_equal(self.stream.getvalue(), "stderr/remote: unstructured stderr message\n") - self.assert_equal(self.stderr.getvalue(), "") - - def test_post11_format_messages(self): - self.handler.setLevel(logging.DEBUG) - logging.getLogger().setLevel(logging.DEBUG) - - msg = ( - """{"type": "log_message", "levelname": "INFO", "name": "borg.repository", "msgid": 42,""" - """ "message": "borg >= 1.1 format message"}\n""" - ) - handle_remote_line(msg) - self.assert_equal(self.stream.getvalue(), "Remote: borg >= 1.1 format message\n") - self.assert_equal(self.stderr.getvalue(), "") - - def test_remote_messages_screened(self): - # default borg config for root logger - self.handler.setLevel(logging.WARNING) - logging.getLogger().setLevel(logging.WARNING) - - msg = ( - """{"type": "log_message", "levelname": "INFO", "name": "borg.repository", "msgid": 42,""" - """ "message": "new format info message"}\n""" - ) - handle_remote_line(msg) - self.assert_equal(self.stream.getvalue(), "") - self.assert_equal(self.stderr.getvalue(), "") - - def test_info_to_correct_local_child(self): - logging.getLogger("borg.repository").setLevel(logging.INFO) - logging.getLogger("borg.repository.foo").setLevel(logging.INFO) - # default borg config for root logger - self.handler.setLevel(logging.WARNING) - logging.getLogger().setLevel(logging.WARNING) - - child_stream = io.StringIO() - child_handler = logging.StreamHandler(child_stream) - child_handler.setLevel(logging.INFO) - logging.getLogger("borg.repository").handlers[:] = [child_handler] - foo_stream = io.StringIO() - foo_handler = logging.StreamHandler(foo_stream) - foo_handler.setLevel(logging.INFO) - logging.getLogger("borg.repository.foo").handlers[:] = [foo_handler] - - msg = ( - """{"type": "log_message", "levelname": "INFO", "name": "borg.repository", "msgid": 42,""" - """ "message": "new format child message"}\n""" - ) - handle_remote_line(msg) - self.assert_equal(foo_stream.getvalue(), "") - self.assert_equal(child_stream.getvalue(), "Remote: new format child message\n") - self.assert_equal(self.stream.getvalue(), "") - self.assert_equal(self.stderr.getvalue(), "")