RepositoryServer: do not use stderr for logging, see #7604

Instead, install a handler that sends the LogRecord dicts to a queue.
That queue is then emptied in the borg serve main loop and
the LogRecords are sent msgpacked via stdout to the client,
similar to the RPC results.

On the client side, the LogRecords are recreated from the
received dicts and fed into the client-side logging system.

As we use msgpacked LogRecord dicts, we don't need JSON for
this purpose on the borg serve side any more.
On the client side, the LogRecords will then be either formatted
as normal text or as JSON log output (by the client-side log
formatter).
This commit is contained in:
Thomas Waldmann 2023-05-25 00:55:43 +02:00
parent ca68dd2565
commit e351e67aee
No known key found for this signature in database
GPG Key ID: 243ACFA951F78E01
4 changed files with 61 additions and 145 deletions

View File

@ -482,8 +482,8 @@ class Archiver(
func = get_func(args)
# do not use loggers before this!
is_serve = func == self.do_serve
self.log_json = args.log_json or is_serve
setup_logging(level=args.log_level, json=self.log_json)
self.log_json = args.log_json and not is_serve
setup_logging(level=args.log_level, is_serve=is_serve, json=self.log_json)
args.progress |= is_serve
self._setup_implied_logging(vars(args))
self._setup_topic_debugging(args)

View File

@ -36,9 +36,30 @@ import logging
import logging.config
import logging.handlers # needed for handlers defined there being configurable in logging.conf file
import os
import queue
import warnings
configured = False
borg_serve_log_queue = queue.SimpleQueue()
class BorgQueueHandler(logging.handlers.QueueHandler):
    """borg serve writes log record dicts to a borg_serve_log_queue"""

    def prepare(self, record: logging.LogRecord) -> dict:
        # Flatten the record into exactly the keyword arguments the
        # LogRecord constructor accepts, so the receiving side can
        # rebuild an equivalent record via logging.LogRecord(**d).
        return {
            "name": record.name,
            "level": record.levelno,
            "pathname": record.pathname,
            "lineno": record.lineno,
            "msg": record.msg,
            "args": record.args,
            "exc_info": record.exc_info,
            "func": record.funcName,
            "sinfo": record.stack_info,
        }
# use something like this to ignore warnings:
# warnings.filterwarnings('ignore', r'... regex for warning message to ignore ...')
@ -53,7 +74,7 @@ def _log_warning(message, category, filename, lineno, file=None, line=None):
logger.warning(msg)
def setup_logging(stream=None, conf_fname=None, env_var="BORG_LOGGING_CONF", level="info", json=False):
def setup_logging(stream=None, conf_fname=None, env_var="BORG_LOGGING_CONF", level="info", is_serve=False, json=False):
"""setup logging module according to the arguments provided
if conf_fname is given (or the config file name can be determined via
@ -61,6 +82,8 @@ def setup_logging(stream=None, conf_fname=None, env_var="BORG_LOGGING_CONF", lev
otherwise, set up a stream handler logger on stderr (by default, if no
stream is provided).
is_serve: are we setting up the logging for "borg serve"?
"""
global configured
err_msg = None
@ -86,7 +109,7 @@ def setup_logging(stream=None, conf_fname=None, env_var="BORG_LOGGING_CONF", lev
err_msg = str(err)
# if we did not / not successfully load a logging configuration, fallback to this:
logger = logging.getLogger("")
handler = logging.StreamHandler(stream)
handler = BorgQueueHandler(borg_serve_log_queue) if is_serve else logging.StreamHandler(stream)
fmt = "%(message)s"
formatter = JsonFormatter(fmt) if json else logging.Formatter(fmt)
handler.setFormatter(formatter)

View File

@ -1,9 +1,9 @@
import errno
import functools
import inspect
import json
import logging
import os
import queue
import select
import shlex
import shutil
@ -26,7 +26,7 @@ from .helpers import sysinfo
from .helpers import format_file_size
from .helpers import safe_unlink
from .helpers import prepare_subprocess_env, ignore_sigint
from .logger import create_logger
from .logger import create_logger, borg_serve_log_queue
from .helpers import msgpack
from .repository import Repository
from .version import parse_version, format_version
@ -36,7 +36,7 @@ from .helpers.datastruct import EfficientCollectionQueue
logger = create_logger(__name__)
BORG_VERSION = parse_version(__version__)
MSGID, MSG, ARGS, RESULT = "i", "m", "a", "r"
MSGID, MSG, ARGS, RESULT, LOG = "i", "m", "a", "r", "l"
MAX_INFLIGHT = 100
@ -154,25 +154,28 @@ class RepositoryServer: # pragma: no cover
def serve(self):
stdin_fd = sys.stdin.fileno()
stdout_fd = sys.stdout.fileno()
stderr_fd = sys.stdout.fileno()
os.set_blocking(stdin_fd, False)
os.set_blocking(stdout_fd, True)
os.set_blocking(stderr_fd, True)
unpacker = get_limited_unpacker("server")
while True:
# before processing any new RPCs, send out all pending log output
while True:
try:
# lr_dict contents see BorgQueueHandler
lr_dict = borg_serve_log_queue.get_nowait()
except queue.Empty:
break
else:
msg = msgpack.packb({LOG: lr_dict})
os_write(stdout_fd, msg)
# process new RPCs
r, w, es = select.select([stdin_fd], [], [], 10)
if r:
data = os.read(stdin_fd, BUFSIZE)
if not data:
if self.repository is not None:
self.repository.close()
else:
os_write(
stderr_fd,
"Borg {}: Got connection close before repository was opened.\n".format(
__version__
).encode(),
)
return
unpacker.feed(data)
for unpacked in unpacker:
@ -726,10 +729,18 @@ class RemoteRepository:
self.rx_bytes += len(data)
self.unpacker.feed(data)
for unpacked in self.unpacker:
if isinstance(unpacked, dict):
msgid = unpacked[MSGID]
else:
if not isinstance(unpacked, dict):
raise UnexpectedRPCDataFormatFromServer(data)
lr_dict = unpacked.get(LOG)
if lr_dict is not None:
# Re-emit remote log messages locally.
_logger = logging.getLogger(lr_dict["name"])
if _logger.isEnabledFor(lr_dict["level"]):
_logger.handle(logging.LogRecord(**lr_dict))
continue
msgid = unpacked[MSGID]
if msgid in self.ignore_responses:
self.ignore_responses.remove(msgid)
# async methods never return values, but may raise exceptions.
@ -755,8 +766,14 @@ class RemoteRepository:
if lines and not lines[-1].endswith((b"\r", b"\n")):
self.stderr_received = lines.pop()
# now we have complete lines in <lines> and any partial line in self.stderr_received.
_logger = logging.getLogger()
for line in lines:
handle_remote_line(line.decode()) # decode late, avoid partial utf-8 sequences
# borg serve (remote/server side) should not emit stuff on stderr,
# but e.g. the ssh process (local/client side) might output errors there.
assert line.endswith((b"\r", b"\n"))
# something came in on stderr, log it to not lose it.
# decode late, avoid partial utf-8 sequences.
_logger.warning("stderr: " + line.decode().strip())
if w:
while (
(len(self.to_send) <= maximum_to_send)
@ -886,57 +903,6 @@ class RemoteRepository:
self.preload_ids += ids
def handle_remote_line(line):
    """
    Handle a remote log line.

    This function is remarkably complex because it handles multiple wire formats.

    :param line: one complete text line received from the remote side's stderr;
                 must end with ``\r`` or ``\n``.
    """
    # caller guarantees complete lines only (partial lines are buffered upstream)
    assert line.endswith(("\r", "\n"))
    if line.startswith("{"):
        # structured wire format: one JSON object per line
        msg = json.loads(line)

        if msg["type"] not in ("progress_message", "progress_percent", "log_message"):
            logger.warning("Dropped remote log message with unknown type %r: %s", msg["type"], line)
            return

        if msg["type"] == "log_message":
            # Re-emit log messages on the same level as the remote to get correct log suppression and verbosity.
            # unknown level names fall back to CRITICAL so the message is never silently dropped
            level = getattr(logging, msg["levelname"], logging.CRITICAL)
            assert isinstance(level, int)
            target_logger = logging.getLogger(msg["name"])
            msg["message"] = "Remote: " + msg["message"]
            # In JSON mode, we manually check whether the log message should be propagated.
            if logging.getLogger("borg").json and level >= target_logger.getEffectiveLevel():
                sys.stderr.write(json.dumps(msg) + "\n")
            else:
                target_logger.log(level, "%s", msg["message"])
        elif msg["type"].startswith("progress_"):
            # Progress messages are a bit more complex.
            # First of all, we check whether progress output is enabled. This is signalled
            # through the effective level of the borg.output.progress logger
            # (also see ProgressIndicatorBase in borg.helpers).
            progress_logger = logging.getLogger("borg.output.progress")
            if progress_logger.getEffectiveLevel() == logging.INFO:
                # When progress output is enabled, we check whether the client is in
                # --log-json mode, as signalled by the "json" attribute on the "borg" logger.
                if logging.getLogger("borg").json:
                    # In --log-json mode we re-emit the progress JSON line as sent by the server,
                    # with the message, if any, prefixed with "Remote: ".
                    if "message" in msg:
                        msg["message"] = "Remote: " + msg["message"]
                    sys.stderr.write(json.dumps(msg) + "\n")
                elif "message" in msg:
                    # In text log mode we write only the message to stderr and terminate with \r
                    # (carriage return, i.e. move the write cursor back to the beginning of the line)
                    # so that the next message, progress or not, overwrites it. This mirrors the behaviour
                    # of local progress displays.
                    sys.stderr.write("Remote: " + msg["message"] + "\r")
    else:
        # We don't know what priority the line had.
        logging.getLogger("").warning("stderr/remote: " + line.strip())
class RepositoryNoCache:
"""A not caching Repository wrapper, passes through to repository.

View File

@ -1,4 +1,3 @@
import io
import logging
import os
import shutil
@ -13,7 +12,7 @@ from ..helpers import Location
from ..helpers import IntegrityError
from ..helpers import msgpack
from ..locking import Lock, LockFailed
from ..remote import RemoteRepository, InvalidRPCMethod, PathNotAllowed, handle_remote_line
from ..remote import RemoteRepository, InvalidRPCMethod, PathNotAllowed
from ..repository import Repository, LoggedIO, MAGIC, MAX_DATA_SIZE, TAG_DELETE, TAG_PUT2, TAG_PUT, TAG_COMMIT
from ..repoobj import RepoObj
from . import BaseTestCase
@ -1064,75 +1063,3 @@ class RemoteRepositoryCheckTestCase(RepositoryCheckTestCase):
def test_repair_missing_segment(self):
# skip this test, files in RemoteRepository cannot be deleted
pass
class RemoteLoggerTestCase(BaseTestCase):
    """Tests for handle_remote_line: routing of remote log lines into the local logging system."""

    def setUp(self):
        # route the root logger into an in-memory stream so tests can assert on its output
        self.stream = io.StringIO()
        self.handler = logging.StreamHandler(self.stream)
        logging.getLogger().handlers[:] = [self.handler]
        # detach handlers from the child loggers the tests target, so records
        # only show up where a test explicitly installs a handler
        logging.getLogger("borg.repository").handlers[:] = []
        logging.getLogger("borg.repository.foo").handlers[:] = []
        # capture stderr
        sys.stderr.flush()
        self.old_stderr = sys.stderr
        self.stderr = sys.stderr = io.StringIO()

    def tearDown(self):
        # restore the real stderr captured in setUp
        sys.stderr = self.old_stderr

    def test_stderr_messages(self):
        # unstructured (non-JSON) lines are logged as warnings on the root logger,
        # not written through to stderr
        handle_remote_line("unstructured stderr message\n")
        self.assert_equal(self.stream.getvalue(), "stderr/remote: unstructured stderr message\n")
        self.assert_equal(self.stderr.getvalue(), "")

    def test_post11_format_messages(self):
        # JSON log_message lines are re-emitted on the local logger named in the message
        self.handler.setLevel(logging.DEBUG)
        logging.getLogger().setLevel(logging.DEBUG)

        msg = (
            """{"type": "log_message", "levelname": "INFO", "name": "borg.repository", "msgid": 42,"""
            """ "message": "borg >= 1.1 format message"}\n"""
        )
        handle_remote_line(msg)
        self.assert_equal(self.stream.getvalue(), "Remote: borg >= 1.1 format message\n")
        self.assert_equal(self.stderr.getvalue(), "")

    def test_remote_messages_screened(self):
        # remote INFO messages are suppressed when the local level is WARNING
        # default borg config for root logger
        self.handler.setLevel(logging.WARNING)
        logging.getLogger().setLevel(logging.WARNING)

        msg = (
            """{"type": "log_message", "levelname": "INFO", "name": "borg.repository", "msgid": 42,"""
            """ "message": "new format info message"}\n"""
        )
        handle_remote_line(msg)
        self.assert_equal(self.stream.getvalue(), "")
        self.assert_equal(self.stderr.getvalue(), "")

    def test_info_to_correct_local_child(self):
        # a message named "borg.repository" must reach exactly that logger's handler,
        # not the sibling "borg.repository.foo" handler and not the root handler
        logging.getLogger("borg.repository").setLevel(logging.INFO)
        logging.getLogger("borg.repository.foo").setLevel(logging.INFO)
        # default borg config for root logger
        self.handler.setLevel(logging.WARNING)
        logging.getLogger().setLevel(logging.WARNING)

        child_stream = io.StringIO()
        child_handler = logging.StreamHandler(child_stream)
        child_handler.setLevel(logging.INFO)
        logging.getLogger("borg.repository").handlers[:] = [child_handler]
        foo_stream = io.StringIO()
        foo_handler = logging.StreamHandler(foo_stream)
        foo_handler.setLevel(logging.INFO)
        logging.getLogger("borg.repository.foo").handlers[:] = [foo_handler]

        msg = (
            """{"type": "log_message", "levelname": "INFO", "name": "borg.repository", "msgid": 42,"""
            """ "message": "new format child message"}\n"""
        )
        handle_remote_line(msg)
        self.assert_equal(foo_stream.getvalue(), "")
        self.assert_equal(child_stream.getvalue(), "Remote: new format child message\n")
        self.assert_equal(self.stream.getvalue(), "")
        self.assert_equal(self.stderr.getvalue(), "")