mirror of
https://github.com/borgbackup/borg.git
synced 2025-01-01 12:45:34 +00:00
Merge pull request #7607 from ThomasWaldmann/new-remote-logging
new remote and progress logging, cleanups
This commit is contained in:
commit
87b74f3b0d
9 changed files with 246 additions and 407 deletions
|
@ -482,8 +482,9 @@ def run(self, args):
|
|||
func = get_func(args)
|
||||
# do not use loggers before this!
|
||||
is_serve = func == self.do_serve
|
||||
self.log_json = args.log_json or is_serve
|
||||
setup_logging(level=args.log_level, json=self.log_json)
|
||||
self.log_json = args.log_json and not is_serve
|
||||
func_name = getattr(func, "__name__", "none")
|
||||
setup_logging(level=args.log_level, is_serve=is_serve, log_json=self.log_json, func=func_name)
|
||||
args.progress |= is_serve
|
||||
self._setup_implied_logging(vars(args))
|
||||
self._setup_topic_debugging(args)
|
||||
|
|
|
@ -33,7 +33,7 @@
|
|||
from .process import daemonize, daemonizing
|
||||
from .process import signal_handler, raising_signal_handler, sig_int, ignore_sigint, SigHup, SigTerm
|
||||
from .process import popen_with_error_handling, is_terminal, prepare_subprocess_env, create_filter_process
|
||||
from .progress import ProgressIndicatorPercent, ProgressIndicatorEndless, ProgressIndicatorMessage
|
||||
from .progress import ProgressIndicatorPercent, ProgressIndicatorMessage
|
||||
from .time import parse_timestamp, timestamp, safe_timestamp, safe_s, safe_ns, MAX_S, SUPPORT_32BIT_PLATFORMS
|
||||
from .time import format_time, format_timedelta, OutputTimestamp, archive_ts_now
|
||||
from .yes_no import yes, TRUISH, FALSISH, DEFAULTISH
|
||||
|
|
|
@ -1,28 +1,15 @@
|
|||
import logging
|
||||
import json
|
||||
import sys
|
||||
import time
|
||||
from shutil import get_terminal_size
|
||||
|
||||
from ..logger import create_logger
|
||||
|
||||
logger = create_logger()
|
||||
|
||||
from .parseformat import ellipsis_truncate
|
||||
|
||||
|
||||
def justify_to_terminal_size(message):
|
||||
terminal_space = get_terminal_size(fallback=(-1, -1))[0]
|
||||
# justify only if we are outputting to a terminal
|
||||
if terminal_space != -1:
|
||||
return message.ljust(terminal_space)
|
||||
return message
|
||||
|
||||
|
||||
class ProgressIndicatorBase:
|
||||
LOGGER = "borg.output.progress"
|
||||
JSON_TYPE: str = None
|
||||
json = False
|
||||
|
||||
operation_id_counter = 0
|
||||
|
||||
|
@ -33,73 +20,27 @@ def operation_id(cls):
|
|||
return cls.operation_id_counter
|
||||
|
||||
def __init__(self, msgid=None):
|
||||
self.handler = None
|
||||
self.logger = logging.getLogger(self.LOGGER)
|
||||
self.id = self.operation_id()
|
||||
self.msgid = msgid
|
||||
|
||||
# If there are no handlers, set one up explicitly because the
|
||||
# terminator and propagation needs to be set. If there are,
|
||||
# they must have been set up by BORG_LOGGING_CONF: skip setup.
|
||||
if not self.logger.handlers:
|
||||
self.handler = logging.StreamHandler(stream=sys.stderr)
|
||||
self.handler.setLevel(logging.INFO)
|
||||
logger = logging.getLogger("borg")
|
||||
# Some special attributes on the borg logger, created by setup_logging
|
||||
# But also be able to work without that
|
||||
try:
|
||||
formatter = logger.formatter
|
||||
terminator = "\n" if logger.json else "\r"
|
||||
self.json = logger.json
|
||||
except AttributeError:
|
||||
terminator = "\r"
|
||||
else:
|
||||
self.handler.setFormatter(formatter)
|
||||
self.handler.terminator = terminator
|
||||
|
||||
self.logger.addHandler(self.handler)
|
||||
if self.logger.level == logging.NOTSET:
|
||||
self.logger.setLevel(logging.WARN)
|
||||
self.logger.propagate = False
|
||||
|
||||
# If --progress is not set then the progress logger level will be WARN
|
||||
# due to setup_implied_logging (it may be NOTSET with a logging config file,
|
||||
# but the interactions there are generally unclear), so self.emit becomes
|
||||
# False, which is correct.
|
||||
# If --progress is set then the level will be INFO as per setup_implied_logging;
|
||||
# note that this is always the case for serve processes due to a "args.progress |= is_serve".
|
||||
# In this case self.emit is True.
|
||||
self.emit = self.logger.getEffectiveLevel() == logging.INFO
|
||||
|
||||
def __del__(self):
|
||||
if self.handler is not None:
|
||||
self.logger.removeHandler(self.handler)
|
||||
self.handler.close()
|
||||
|
||||
def output_json(self, *, finished=False, **kwargs):
|
||||
assert self.json
|
||||
if not self.emit:
|
||||
return
|
||||
def make_json(self, *, finished=False, **kwargs):
|
||||
kwargs.update(
|
||||
dict(operation=self.id, msgid=self.msgid, type=self.JSON_TYPE, finished=finished, time=time.time())
|
||||
)
|
||||
print(json.dumps(kwargs), file=sys.stderr, flush=True)
|
||||
return json.dumps(kwargs)
|
||||
|
||||
def finish(self):
|
||||
if self.json:
|
||||
self.output_json(finished=True)
|
||||
else:
|
||||
self.output("")
|
||||
j = self.make_json(message="", finished=True)
|
||||
self.logger.info(j)
|
||||
|
||||
|
||||
class ProgressIndicatorMessage(ProgressIndicatorBase):
|
||||
JSON_TYPE = "progress_message"
|
||||
|
||||
def output(self, msg):
|
||||
if self.json:
|
||||
self.output_json(message=msg)
|
||||
else:
|
||||
self.logger.info(justify_to_terminal_size(msg))
|
||||
j = self.make_json(message=msg)
|
||||
self.logger.info(j)
|
||||
|
||||
|
||||
class ProgressIndicatorPercent(ProgressIndicatorBase):
|
||||
|
@ -141,58 +82,11 @@ def show(self, current=None, increase=1, info=None):
|
|||
"""
|
||||
pct = self.progress(current, increase)
|
||||
if pct is not None:
|
||||
# truncate the last argument, if no space is available
|
||||
if info is not None:
|
||||
if not self.json:
|
||||
from ..platform import swidth # avoid circular import
|
||||
return self.output(self.msg % tuple([pct] + info), info=info)
|
||||
else:
|
||||
return self.output(self.msg % pct)
|
||||
|
||||
# no need to truncate if we're not outputting to a terminal
|
||||
terminal_space = get_terminal_size(fallback=(-1, -1))[0]
|
||||
if terminal_space != -1:
|
||||
space = terminal_space - swidth(self.msg % tuple([pct] + info[:-1] + [""]))
|
||||
info[-1] = ellipsis_truncate(info[-1], space)
|
||||
return self.output(self.msg % tuple([pct] + info), justify=False, info=info)
|
||||
|
||||
return self.output(self.msg % pct)
|
||||
|
||||
def output(self, message, justify=True, info=None):
|
||||
if self.json:
|
||||
self.output_json(message=message, current=self.counter, total=self.total, info=info)
|
||||
else:
|
||||
if justify:
|
||||
message = justify_to_terminal_size(message)
|
||||
self.logger.info(message)
|
||||
|
||||
|
||||
class ProgressIndicatorEndless:
|
||||
def __init__(self, step=10, file=None):
|
||||
"""
|
||||
Progress indicator (long row of dots)
|
||||
|
||||
:param step: every Nth call, call the func
|
||||
:param file: output file, default: sys.stderr
|
||||
"""
|
||||
self.counter = 0 # call counter
|
||||
self.triggered = 0 # increases 1 per trigger event
|
||||
self.step = step # trigger every <step> calls
|
||||
if file is None:
|
||||
file = sys.stderr
|
||||
self.file = file
|
||||
|
||||
def progress(self):
|
||||
self.counter += 1
|
||||
trigger = self.counter % self.step == 0
|
||||
if trigger:
|
||||
self.triggered += 1
|
||||
return trigger
|
||||
|
||||
def show(self):
|
||||
trigger = self.progress()
|
||||
if trigger:
|
||||
return self.output(self.triggered)
|
||||
|
||||
def output(self, triggered):
|
||||
print(".", end="", file=self.file, flush=True)
|
||||
|
||||
def finish(self):
|
||||
print(file=self.file)
|
||||
def output(self, message, info=None):
|
||||
j = self.make_json(message=message, current=self.counter, total=self.total, info=info)
|
||||
self.logger.info(j)
|
||||
|
|
|
@ -36,9 +36,64 @@
|
|||
import logging.config
|
||||
import logging.handlers # needed for handlers defined there being configurable in logging.conf file
|
||||
import os
|
||||
import queue
|
||||
import sys
|
||||
import time
|
||||
from typing import Optional
|
||||
import warnings
|
||||
|
||||
logging_debugging_path: Optional[str] = None # if set, write borg.logger debugging log to path/borg-*.log
|
||||
|
||||
configured = False
|
||||
borg_serve_log_queue: queue.SimpleQueue = queue.SimpleQueue()
|
||||
|
||||
|
||||
class BorgQueueHandler(logging.handlers.QueueHandler):
|
||||
"""borg serve writes log record dicts to a borg_serve_log_queue"""
|
||||
|
||||
def prepare(self, record: logging.LogRecord) -> dict:
|
||||
return dict(
|
||||
# kwargs needed for LogRecord constructor:
|
||||
name=record.name,
|
||||
level=record.levelno,
|
||||
pathname=record.pathname,
|
||||
lineno=record.lineno,
|
||||
msg=record.msg,
|
||||
args=record.args,
|
||||
exc_info=record.exc_info,
|
||||
func=record.funcName,
|
||||
sinfo=record.stack_info,
|
||||
)
|
||||
|
||||
|
||||
class StderrHandler(logging.StreamHandler):
|
||||
"""
|
||||
This class is like a StreamHandler using sys.stderr, but always uses
|
||||
whatever sys.stderr is currently set to rather than the value of
|
||||
sys.stderr at handler construction time.
|
||||
"""
|
||||
|
||||
def __init__(self, stream=None):
|
||||
logging.Handler.__init__(self)
|
||||
|
||||
@property
|
||||
def stream(self):
|
||||
return sys.stderr
|
||||
|
||||
|
||||
class TextProgressFormatter(logging.Formatter):
|
||||
def format(self, record: logging.LogRecord) -> str:
|
||||
# record.msg contains json (because we always do json for progress log)
|
||||
j = json.loads(record.msg)
|
||||
# inside the json, the text log line can be found under "message"
|
||||
return f"{j['message']}"
|
||||
|
||||
|
||||
class JSONProgressFormatter(logging.Formatter):
|
||||
def format(self, record: logging.LogRecord) -> str:
|
||||
# record.msg contains json (because we always do json for progress log)
|
||||
return f"{record.msg}"
|
||||
|
||||
|
||||
# use something like this to ignore warnings:
|
||||
# warnings.filterwarnings('ignore', r'... regex for warning message to ignore ...')
|
||||
|
@ -53,7 +108,22 @@ def _log_warning(message, category, filename, lineno, file=None, line=None):
|
|||
logger.warning(msg)
|
||||
|
||||
|
||||
def setup_logging(stream=None, conf_fname=None, env_var="BORG_LOGGING_CONF", level="info", json=False):
|
||||
def remove_handlers(logger):
|
||||
for handler in logger.handlers[:]:
|
||||
handler.flush()
|
||||
handler.close()
|
||||
logger.removeHandler(handler)
|
||||
|
||||
|
||||
def teardown_logging():
|
||||
global configured
|
||||
logging.shutdown()
|
||||
configured = False
|
||||
|
||||
|
||||
def setup_logging(
|
||||
stream=None, conf_fname=None, env_var="BORG_LOGGING_CONF", level="info", is_serve=False, log_json=False, func=None
|
||||
):
|
||||
"""setup logging module according to the arguments provided
|
||||
|
||||
if conf_fname is given (or the config file name can be determined via
|
||||
|
@ -61,6 +131,8 @@ def setup_logging(stream=None, conf_fname=None, env_var="BORG_LOGGING_CONF", lev
|
|||
|
||||
otherwise, set up a stream handler logger on stderr (by default, if no
|
||||
stream is provided).
|
||||
|
||||
is_serve: are we setting up the logging for "borg serve"?
|
||||
"""
|
||||
global configured
|
||||
err_msg = None
|
||||
|
@ -77,25 +149,56 @@ def setup_logging(stream=None, conf_fname=None, env_var="BORG_LOGGING_CONF", lev
|
|||
logging.config.fileConfig(f)
|
||||
configured = True
|
||||
logger = logging.getLogger(__name__)
|
||||
borg_logger = logging.getLogger("borg")
|
||||
borg_logger.json = json
|
||||
logger.debug(f'using logging configuration read from "{conf_fname}"')
|
||||
warnings.showwarning = _log_warning
|
||||
return None
|
||||
except Exception as err: # XXX be more precise
|
||||
err_msg = str(err)
|
||||
|
||||
# if we did not / not successfully load a logging configuration, fallback to this:
|
||||
logger = logging.getLogger("")
|
||||
handler = logging.StreamHandler(stream)
|
||||
level = level.upper()
|
||||
fmt = "%(message)s"
|
||||
formatter = JsonFormatter(fmt) if json else logging.Formatter(fmt)
|
||||
formatter = JsonFormatter(fmt) if log_json else logging.Formatter(fmt)
|
||||
SHandler = StderrHandler if stream is None else logging.StreamHandler
|
||||
handler = BorgQueueHandler(borg_serve_log_queue) if is_serve else SHandler(stream)
|
||||
handler.setFormatter(formatter)
|
||||
borg_logger = logging.getLogger("borg")
|
||||
borg_logger.formatter = formatter
|
||||
borg_logger.json = json
|
||||
logger.addHandler(handler)
|
||||
logger.setLevel(level.upper())
|
||||
logger = logging.getLogger()
|
||||
remove_handlers(logger)
|
||||
logger.setLevel(level)
|
||||
|
||||
if logging_debugging_path is not None:
|
||||
# add an addtl. root handler for debugging purposes
|
||||
log_fname = os.path.join(logging_debugging_path, f"borg-{'serve' if is_serve else 'client'}-root.log")
|
||||
handler2 = logging.StreamHandler(open(log_fname, "a"))
|
||||
handler2.setFormatter(formatter)
|
||||
logger.addHandler(handler2)
|
||||
logger.warning(f"--- {func} ---") # only handler2 shall get this
|
||||
|
||||
logger.addHandler(handler) # do this late, so handler is not added while debug handler is set up
|
||||
|
||||
bop_formatter = JSONProgressFormatter() if log_json else TextProgressFormatter()
|
||||
bop_handler = BorgQueueHandler(borg_serve_log_queue) if is_serve else SHandler(stream)
|
||||
bop_handler.setFormatter(bop_formatter)
|
||||
bop_logger = logging.getLogger("borg.output.progress")
|
||||
remove_handlers(bop_logger)
|
||||
bop_logger.setLevel("INFO")
|
||||
bop_logger.propagate = False
|
||||
|
||||
if logging_debugging_path is not None:
|
||||
# add an addtl. progress handler for debugging purposes
|
||||
log_fname = os.path.join(logging_debugging_path, f"borg-{'serve' if is_serve else 'client'}-progress.log")
|
||||
bop_handler2 = logging.StreamHandler(open(log_fname, "a"))
|
||||
bop_handler2.setFormatter(bop_formatter)
|
||||
bop_logger.addHandler(bop_handler2)
|
||||
json_dict = dict(
|
||||
message=f"--- {func} ---", operation=0, msgid="", type="progress_message", finished=False, time=time.time()
|
||||
)
|
||||
bop_logger.warning(json.dumps(json_dict)) # only bop_handler2 shall get this
|
||||
|
||||
bop_logger.addHandler(bop_handler) # do this late, so bop_handler is not added while debug handler is set up
|
||||
|
||||
configured = True
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
if err_msg:
|
||||
logger.warning(f'setup_logging for "{conf_fname}" failed with "{err_msg}".')
|
||||
|
|
|
@ -1,9 +1,9 @@
|
|||
import errno
|
||||
import functools
|
||||
import inspect
|
||||
import json
|
||||
import logging
|
||||
import os
|
||||
import queue
|
||||
import select
|
||||
import shlex
|
||||
import shutil
|
||||
|
@ -15,6 +15,7 @@
|
|||
import traceback
|
||||
from subprocess import Popen, PIPE
|
||||
|
||||
import borg.logger
|
||||
from . import __version__
|
||||
from .compress import Compressor
|
||||
from .constants import * # NOQA
|
||||
|
@ -26,7 +27,7 @@
|
|||
from .helpers import format_file_size
|
||||
from .helpers import safe_unlink
|
||||
from .helpers import prepare_subprocess_env, ignore_sigint
|
||||
from .logger import create_logger
|
||||
from .logger import create_logger, borg_serve_log_queue
|
||||
from .helpers import msgpack
|
||||
from .repository import Repository
|
||||
from .version import parse_version, format_version
|
||||
|
@ -36,7 +37,7 @@
|
|||
logger = create_logger(__name__)
|
||||
|
||||
BORG_VERSION = parse_version(__version__)
|
||||
MSGID, MSG, ARGS, RESULT = "i", "m", "a", "r"
|
||||
MSGID, MSG, ARGS, RESULT, LOG = "i", "m", "a", "r", "l"
|
||||
|
||||
MAX_INFLIGHT = 100
|
||||
|
||||
|
@ -125,6 +126,7 @@ class RepositoryServer: # pragma: no cover
|
|||
"scan",
|
||||
"negotiate",
|
||||
"open",
|
||||
"close",
|
||||
"info",
|
||||
"put",
|
||||
"rollback",
|
||||
|
@ -151,29 +153,41 @@ def filter_args(self, f, kwargs):
|
|||
known = set(inspect.signature(f).parameters)
|
||||
return {name: kwargs[name] for name in kwargs if name in known}
|
||||
|
||||
def serve(self):
|
||||
stdin_fd = sys.stdin.fileno()
|
||||
stdout_fd = sys.stdout.fileno()
|
||||
stderr_fd = sys.stdout.fileno()
|
||||
os.set_blocking(stdin_fd, False)
|
||||
os.set_blocking(stdout_fd, True)
|
||||
os.set_blocking(stderr_fd, True)
|
||||
unpacker = get_limited_unpacker("server")
|
||||
def send_queued_log(self):
|
||||
while True:
|
||||
r, w, es = select.select([stdin_fd], [], [], 10)
|
||||
try:
|
||||
# lr_dict contents see BorgQueueHandler
|
||||
lr_dict = borg_serve_log_queue.get_nowait()
|
||||
except queue.Empty:
|
||||
break
|
||||
else:
|
||||
msg = msgpack.packb({LOG: lr_dict})
|
||||
os_write(self.stdout_fd, msg)
|
||||
|
||||
def serve(self):
|
||||
self.stdin_fd = sys.stdin.fileno()
|
||||
self.stdout_fd = sys.stdout.fileno()
|
||||
os.set_blocking(self.stdin_fd, False)
|
||||
os.set_blocking(self.stdout_fd, True)
|
||||
unpacker = get_limited_unpacker("server")
|
||||
shutdown_serve = False
|
||||
while True:
|
||||
# before processing any new RPCs, send out all pending log output
|
||||
self.send_queued_log()
|
||||
|
||||
if shutdown_serve:
|
||||
# shutdown wanted! get out of here after sending all log output.
|
||||
if self.repository is not None:
|
||||
self.repository.close()
|
||||
return
|
||||
|
||||
# process new RPCs
|
||||
r, w, es = select.select([self.stdin_fd], [], [], 10)
|
||||
if r:
|
||||
data = os.read(stdin_fd, BUFSIZE)
|
||||
data = os.read(self.stdin_fd, BUFSIZE)
|
||||
if not data:
|
||||
if self.repository is not None:
|
||||
self.repository.close()
|
||||
else:
|
||||
os_write(
|
||||
stderr_fd,
|
||||
"Borg {}: Got connection close before repository was opened.\n".format(
|
||||
__version__
|
||||
).encode(),
|
||||
)
|
||||
return
|
||||
shutdown_serve = True
|
||||
continue
|
||||
unpacker.feed(data)
|
||||
for unpacked in unpacker:
|
||||
if isinstance(unpacked, dict):
|
||||
|
@ -234,12 +248,12 @@ def serve(self):
|
|||
}
|
||||
)
|
||||
|
||||
os_write(stdout_fd, msg)
|
||||
os_write(self.stdout_fd, msg)
|
||||
else:
|
||||
os_write(stdout_fd, msgpack.packb({MSGID: msgid, RESULT: res}))
|
||||
os_write(self.stdout_fd, msgpack.packb({MSGID: msgid, RESULT: res}))
|
||||
if es:
|
||||
self.repository.close()
|
||||
return
|
||||
shutdown_serve = True
|
||||
continue
|
||||
|
||||
def negotiate(self, client_data):
|
||||
if isinstance(client_data, dict):
|
||||
|
@ -301,6 +315,12 @@ def open(
|
|||
self.repository.__enter__() # clean exit handled by serve() method
|
||||
return self.repository.id
|
||||
|
||||
def close(self):
|
||||
if self.repository is not None:
|
||||
self.repository.__exit__(None, None, None)
|
||||
borg.logger.teardown_logging()
|
||||
self.send_queued_log()
|
||||
|
||||
def inject_exception(self, kind):
|
||||
s1 = "test string"
|
||||
s2 = "test string2"
|
||||
|
@ -726,10 +746,18 @@ def handle_error(unpacked):
|
|||
self.rx_bytes += len(data)
|
||||
self.unpacker.feed(data)
|
||||
for unpacked in self.unpacker:
|
||||
if isinstance(unpacked, dict):
|
||||
msgid = unpacked[MSGID]
|
||||
else:
|
||||
if not isinstance(unpacked, dict):
|
||||
raise UnexpectedRPCDataFormatFromServer(data)
|
||||
|
||||
lr_dict = unpacked.get(LOG)
|
||||
if lr_dict is not None:
|
||||
# Re-emit remote log messages locally.
|
||||
_logger = logging.getLogger(lr_dict["name"])
|
||||
if _logger.isEnabledFor(lr_dict["level"]):
|
||||
_logger.handle(logging.LogRecord(**lr_dict))
|
||||
continue
|
||||
|
||||
msgid = unpacked[MSGID]
|
||||
if msgid in self.ignore_responses:
|
||||
self.ignore_responses.remove(msgid)
|
||||
# async methods never return values, but may raise exceptions.
|
||||
|
@ -755,8 +783,14 @@ def handle_error(unpacked):
|
|||
if lines and not lines[-1].endswith((b"\r", b"\n")):
|
||||
self.stderr_received = lines.pop()
|
||||
# now we have complete lines in <lines> and any partial line in self.stderr_received.
|
||||
_logger = logging.getLogger()
|
||||
for line in lines:
|
||||
handle_remote_line(line.decode()) # decode late, avoid partial utf-8 sequences
|
||||
# borg serve (remote/server side) should not emit stuff on stderr,
|
||||
# but e.g. the ssh process (local/client side) might output errors there.
|
||||
assert line.endswith((b"\r", b"\n"))
|
||||
# something came in on stderr, log it to not lose it.
|
||||
# decode late, avoid partial utf-8 sequences.
|
||||
_logger.warning("stderr: " + line.decode().strip())
|
||||
if w:
|
||||
while (
|
||||
(len(self.to_send) <= maximum_to_send)
|
||||
|
@ -872,6 +906,7 @@ def break_lock(self):
|
|||
"""actual remoting is done via self.call in the @api decorator"""
|
||||
|
||||
def close(self):
|
||||
self.call("close", {}, wait=True)
|
||||
if self.p:
|
||||
self.p.stdin.close()
|
||||
self.p.stdout.close()
|
||||
|
@ -886,57 +921,6 @@ def preload(self, ids):
|
|||
self.preload_ids += ids
|
||||
|
||||
|
||||
def handle_remote_line(line):
|
||||
"""
|
||||
Handle a remote log line.
|
||||
|
||||
This function is remarkably complex because it handles multiple wire formats.
|
||||
"""
|
||||
assert line.endswith(("\r", "\n"))
|
||||
if line.startswith("{"):
|
||||
msg = json.loads(line)
|
||||
|
||||
if msg["type"] not in ("progress_message", "progress_percent", "log_message"):
|
||||
logger.warning("Dropped remote log message with unknown type %r: %s", msg["type"], line)
|
||||
return
|
||||
|
||||
if msg["type"] == "log_message":
|
||||
# Re-emit log messages on the same level as the remote to get correct log suppression and verbosity.
|
||||
level = getattr(logging, msg["levelname"], logging.CRITICAL)
|
||||
assert isinstance(level, int)
|
||||
target_logger = logging.getLogger(msg["name"])
|
||||
msg["message"] = "Remote: " + msg["message"]
|
||||
# In JSON mode, we manually check whether the log message should be propagated.
|
||||
if logging.getLogger("borg").json and level >= target_logger.getEffectiveLevel():
|
||||
sys.stderr.write(json.dumps(msg) + "\n")
|
||||
else:
|
||||
target_logger.log(level, "%s", msg["message"])
|
||||
elif msg["type"].startswith("progress_"):
|
||||
# Progress messages are a bit more complex.
|
||||
# First of all, we check whether progress output is enabled. This is signalled
|
||||
# through the effective level of the borg.output.progress logger
|
||||
# (also see ProgressIndicatorBase in borg.helpers).
|
||||
progress_logger = logging.getLogger("borg.output.progress")
|
||||
if progress_logger.getEffectiveLevel() == logging.INFO:
|
||||
# When progress output is enabled, we check whether the client is in
|
||||
# --log-json mode, as signalled by the "json" attribute on the "borg" logger.
|
||||
if logging.getLogger("borg").json:
|
||||
# In --log-json mode we re-emit the progress JSON line as sent by the server,
|
||||
# with the message, if any, prefixed with "Remote: ".
|
||||
if "message" in msg:
|
||||
msg["message"] = "Remote: " + msg["message"]
|
||||
sys.stderr.write(json.dumps(msg) + "\n")
|
||||
elif "message" in msg:
|
||||
# In text log mode we write only the message to stderr and terminate with \r
|
||||
# (carriage return, i.e. move the write cursor back to the beginning of the line)
|
||||
# so that the next message, progress or not, overwrites it. This mirrors the behaviour
|
||||
# of local progress displays.
|
||||
sys.stderr.write("Remote: " + msg["message"] + "\r")
|
||||
else:
|
||||
# We don't know what priority the line had.
|
||||
logging.getLogger("").warning("stderr/remote: " + line.strip())
|
||||
|
||||
|
||||
class RepositoryNoCache:
|
||||
"""A not caching Repository wrapper, passes through to repository.
|
||||
|
||||
|
|
|
@ -21,8 +21,8 @@
|
|||
from ...helpers import Location
|
||||
from ...helpers import EXIT_SUCCESS
|
||||
from ...helpers import bin_to_hex
|
||||
from ...logger import teardown_logging
|
||||
from ...manifest import Manifest
|
||||
from ...logger import setup_logging
|
||||
from ...remote import RemoteRepository
|
||||
from ...repository import Repository
|
||||
from .. import has_lchflags
|
||||
|
@ -82,7 +82,10 @@ def exec_cmd(*args, archiver=None, fork=False, exe=None, input=b"", binary_outpu
|
|||
except SystemExit as e:
|
||||
output_text.flush()
|
||||
return e.code, output.getvalue() if binary_output else output.getvalue().decode()
|
||||
ret = archiver.run(args)
|
||||
try:
|
||||
ret = archiver.run(args)
|
||||
finally:
|
||||
teardown_logging() # usually done via atexit, but we do not exit here
|
||||
output_text.flush()
|
||||
return ret, output.getvalue() if binary_output else output.getvalue().decode()
|
||||
finally:
|
||||
|
@ -155,7 +158,6 @@ def tearDown(self):
|
|||
os.chdir(self._old_wd)
|
||||
# note: ignore_errors=True as workaround for issue #862
|
||||
shutil.rmtree(self.tmpdir, ignore_errors=True)
|
||||
setup_logging()
|
||||
|
||||
def cmd(self, *args, **kw):
|
||||
exit_code = kw.pop("exit_code", 0)
|
||||
|
|
|
@ -1,4 +1,3 @@
|
|||
import logging
|
||||
import shutil
|
||||
import unittest
|
||||
from unittest.mock import patch
|
||||
|
@ -26,8 +25,6 @@ def test_check_usage(self):
|
|||
self.assert_in("Starting repository check", output)
|
||||
self.assert_in("Starting archive consistency check", output)
|
||||
self.assert_in("Checking segments", output)
|
||||
# reset logging to new process default to avoid need for fork=True on next check
|
||||
logging.getLogger("borg.output.progress").setLevel(logging.NOTSET)
|
||||
output = self.cmd(f"--repo={self.repository_location}", "check", "-v", "--repository-only", exit_code=0)
|
||||
self.assert_in("Starting repository check", output)
|
||||
self.assert_not_in("Starting archive consistency check", output)
|
||||
|
|
|
@ -34,7 +34,7 @@
|
|||
from ..helpers import StableDict, bin_to_hex
|
||||
from ..helpers import parse_timestamp, ChunkIteratorFileWrapper, ChunkerParams
|
||||
from ..helpers import archivename_validator, text_validator
|
||||
from ..helpers import ProgressIndicatorPercent, ProgressIndicatorEndless
|
||||
from ..helpers import ProgressIndicatorPercent
|
||||
from ..helpers import swidth_slice
|
||||
from ..helpers import chunkit
|
||||
from ..helpers import safe_ns, safe_s, SUPPORT_32BIT_PLATFORMS
|
||||
|
@ -998,65 +998,36 @@ def test_yes_env_output(capfd, monkeypatch):
|
|||
assert "yes" in err
|
||||
|
||||
|
||||
def test_progress_percentage_sameline(capfd, monkeypatch):
|
||||
# run the test as if it was in a 4x1 terminal
|
||||
monkeypatch.setenv("COLUMNS", "4")
|
||||
monkeypatch.setenv("LINES", "1")
|
||||
def test_progress_percentage(capfd):
|
||||
pi = ProgressIndicatorPercent(1000, step=5, start=0, msg="%3.0f%%")
|
||||
pi.logger.setLevel("INFO")
|
||||
pi.show(0)
|
||||
out, err = capfd.readouterr()
|
||||
assert err == " 0%\r"
|
||||
assert err == " 0%\n"
|
||||
pi.show(420)
|
||||
pi.show(680)
|
||||
out, err = capfd.readouterr()
|
||||
assert err == " 42%\r 68%\r"
|
||||
assert err == " 42%\n 68%\n"
|
||||
pi.show(1000)
|
||||
out, err = capfd.readouterr()
|
||||
assert err == "100%\r"
|
||||
assert err == "100%\n"
|
||||
pi.finish()
|
||||
out, err = capfd.readouterr()
|
||||
assert err == " " * 4 + "\r"
|
||||
assert err == "\n"
|
||||
|
||||
|
||||
@pytest.mark.skipif(is_win32, reason="no working swidth() implementation on this platform")
|
||||
def test_progress_percentage_widechars(capfd, monkeypatch):
|
||||
st = "スター・トレック" # "startrek" :-)
|
||||
assert swidth(st) == 16
|
||||
path = "/カーク船長です。" # "Captain Kirk"
|
||||
assert swidth(path) == 17
|
||||
spaces = " " * 4 # to avoid usage of "..."
|
||||
width = len("100%") + 1 + swidth(st) + 1 + swidth(path) + swidth(spaces)
|
||||
monkeypatch.setenv("COLUMNS", str(width))
|
||||
monkeypatch.setenv("LINES", "1")
|
||||
pi = ProgressIndicatorPercent(100, step=5, start=0, msg=f"%3.0f%% {st} %s")
|
||||
pi.logger.setLevel("INFO")
|
||||
pi.show(0, info=[path])
|
||||
out, err = capfd.readouterr()
|
||||
assert err == f" 0% {st} {path}{spaces}\r"
|
||||
pi.show(100, info=[path])
|
||||
out, err = capfd.readouterr()
|
||||
assert err == f"100% {st} {path}{spaces}\r"
|
||||
pi.finish()
|
||||
out, err = capfd.readouterr()
|
||||
assert err == " " * width + "\r"
|
||||
|
||||
|
||||
def test_progress_percentage_step(capfd, monkeypatch):
|
||||
# run the test as if it was in a 4x1 terminal
|
||||
monkeypatch.setenv("COLUMNS", "4")
|
||||
monkeypatch.setenv("LINES", "1")
|
||||
def test_progress_percentage_step(capfd):
|
||||
pi = ProgressIndicatorPercent(100, step=2, start=0, msg="%3.0f%%")
|
||||
pi.logger.setLevel("INFO")
|
||||
pi.show()
|
||||
out, err = capfd.readouterr()
|
||||
assert err == " 0%\r"
|
||||
assert err == " 0%\n"
|
||||
pi.show()
|
||||
out, err = capfd.readouterr()
|
||||
assert err == "" # no output at 1% as we have step == 2
|
||||
pi.show()
|
||||
out, err = capfd.readouterr()
|
||||
assert err == " 2%\r"
|
||||
assert err == " 2%\n"
|
||||
|
||||
|
||||
def test_progress_percentage_quiet(capfd):
|
||||
|
@ -1073,35 +1044,6 @@ def test_progress_percentage_quiet(capfd):
|
|||
assert err == ""
|
||||
|
||||
|
||||
def test_progress_endless(capfd):
|
||||
pi = ProgressIndicatorEndless(step=1, file=sys.stderr)
|
||||
pi.show()
|
||||
out, err = capfd.readouterr()
|
||||
assert err == "."
|
||||
pi.show()
|
||||
out, err = capfd.readouterr()
|
||||
assert err == "."
|
||||
pi.finish()
|
||||
out, err = capfd.readouterr()
|
||||
assert err == "\n"
|
||||
|
||||
|
||||
def test_progress_endless_step(capfd):
|
||||
pi = ProgressIndicatorEndless(step=2, file=sys.stderr)
|
||||
pi.show()
|
||||
out, err = capfd.readouterr()
|
||||
assert err == "" # no output here as we have step == 2
|
||||
pi.show()
|
||||
out, err = capfd.readouterr()
|
||||
assert err == "."
|
||||
pi.show()
|
||||
out, err = capfd.readouterr()
|
||||
assert err == "" # no output here as we have step == 2
|
||||
pi.show()
|
||||
out, err = capfd.readouterr()
|
||||
assert err == "."
|
||||
|
||||
|
||||
def test_partial_format():
|
||||
assert partial_format("{space:10}", {"space": " "}) == " " * 10
|
||||
assert partial_format("{foobar}", {"bar": "wrong", "foobar": "correct"}) == "correct"
|
||||
|
@ -1322,19 +1264,19 @@ def os_unlink(_):
|
|||
|
||||
class TestPassphrase:
|
||||
def test_passphrase_new_verification(self, capsys, monkeypatch):
|
||||
monkeypatch.setattr(getpass, "getpass", lambda prompt: "12aöäü")
|
||||
monkeypatch.setattr(getpass, "getpass", lambda prompt: "1234aöäü")
|
||||
monkeypatch.setenv("BORG_DISPLAY_PASSPHRASE", "no")
|
||||
Passphrase.new()
|
||||
out, err = capsys.readouterr()
|
||||
assert "12" not in out
|
||||
assert "12" not in err
|
||||
assert "1234" not in out
|
||||
assert "1234" not in err
|
||||
|
||||
monkeypatch.setenv("BORG_DISPLAY_PASSPHRASE", "yes")
|
||||
passphrase = Passphrase.new()
|
||||
out, err = capsys.readouterr()
|
||||
assert "313261c3b6c3a4c3bc" not in out
|
||||
assert "313261c3b6c3a4c3bc" in err
|
||||
assert passphrase == "12aöäü"
|
||||
assert "3132333461c3b6c3a4c3bc" not in out
|
||||
assert "3132333461c3b6c3a4c3bc" in err
|
||||
assert passphrase == "1234aöäü"
|
||||
|
||||
monkeypatch.setattr(getpass, "getpass", lambda prompt: "1234/@=")
|
||||
Passphrase.new()
|
||||
|
|
|
@ -1,4 +1,3 @@
|
|||
import io
|
||||
import logging
|
||||
import os
|
||||
import shutil
|
||||
|
@ -13,7 +12,7 @@
|
|||
from ..helpers import IntegrityError
|
||||
from ..helpers import msgpack
|
||||
from ..locking import Lock, LockFailed
|
||||
from ..remote import RemoteRepository, InvalidRPCMethod, PathNotAllowed, handle_remote_line
|
||||
from ..remote import RemoteRepository, InvalidRPCMethod, PathNotAllowed
|
||||
from ..repository import Repository, LoggedIO, MAGIC, MAX_DATA_SIZE, TAG_DELETE, TAG_PUT2, TAG_PUT, TAG_COMMIT
|
||||
from ..repoobj import RepoObj
|
||||
from . import BaseTestCase
|
||||
|
@ -98,13 +97,12 @@ def test1(self):
|
|||
self.repository.delete(key50)
|
||||
self.assert_raises(Repository.ObjectNotFound, lambda: self.repository.get(key50))
|
||||
self.repository.commit(compact=False)
|
||||
self.repository.close()
|
||||
with self.open() as repository2:
|
||||
self.assert_raises(Repository.ObjectNotFound, lambda: repository2.get(key50))
|
||||
for x in range(100):
|
||||
if x == 50:
|
||||
continue
|
||||
self.assert_equal(pdchunk(repository2.get(H(x))), b"SOMEDATA")
|
||||
self.reopen()
|
||||
self.assert_raises(Repository.ObjectNotFound, lambda: self.repository.get(key50))
|
||||
for x in range(100):
|
||||
if x == 50:
|
||||
continue
|
||||
self.assert_equal(pdchunk(self.repository.get(H(x))), b"SOMEDATA")
|
||||
|
||||
def test2(self):
|
||||
"""Test multiple sequential transactions"""
|
||||
|
@ -159,17 +157,14 @@ def test_single_kind_transactions(self):
|
|||
# put
|
||||
self.repository.put(H(0), fchunk(b"foo"))
|
||||
self.repository.commit(compact=False)
|
||||
self.repository.close()
|
||||
self.reopen()
|
||||
# replace
|
||||
self.repository = self.open()
|
||||
with self.repository:
|
||||
self.repository.put(H(0), fchunk(b"bar"))
|
||||
self.repository.commit(compact=False)
|
||||
self.repository.put(H(0), fchunk(b"bar"))
|
||||
self.repository.commit(compact=False)
|
||||
self.reopen()
|
||||
# delete
|
||||
self.repository = self.open()
|
||||
with self.repository:
|
||||
self.repository.delete(H(0))
|
||||
self.repository.commit(compact=False)
|
||||
self.repository.delete(H(0))
|
||||
self.repository.commit(compact=False)
|
||||
|
||||
def test_list(self):
|
||||
for x in range(100):
|
||||
|
@ -276,14 +271,11 @@ def test_flags_persistence(self):
|
|||
# we do not set flags for H(0), so we can later check their default state.
|
||||
self.repository.flags(H(1), mask=0x00000007, value=0x00000006)
|
||||
self.repository.commit(compact=False)
|
||||
self.repository.close()
|
||||
|
||||
self.repository = self.open()
|
||||
with self.repository:
|
||||
# we query all flags to check if the initial flags were all zero and
|
||||
# only the ones we explicitly set to one are as expected.
|
||||
self.assert_equal(self.repository.flags(H(0), mask=0xFFFFFFFF), 0x00000000)
|
||||
self.assert_equal(self.repository.flags(H(1), mask=0xFFFFFFFF), 0x00000006)
|
||||
self.reopen()
|
||||
# we query all flags to check if the initial flags were all zero and
|
||||
# only the ones we explicitly set to one are as expected.
|
||||
self.assert_equal(self.repository.flags(H(0), mask=0xFFFFFFFF), 0x00000000)
|
||||
self.assert_equal(self.repository.flags(H(1), mask=0xFFFFFFFF), 0x00000006)
|
||||
|
||||
|
||||
class LocalRepositoryTestCase(RepositoryTestCaseBase):
|
||||
|
@ -337,12 +329,10 @@ def test_uncommitted_garbage(self):
|
|||
last_segment = self.repository.io.get_latest_segment()
|
||||
with open(self.repository.io.segment_filename(last_segment + 1), "wb") as f:
|
||||
f.write(MAGIC + b"crapcrapcrap")
|
||||
self.repository.close()
|
||||
self.reopen()
|
||||
# usually, opening the repo and starting a transaction should trigger a cleanup.
|
||||
self.repository = self.open()
|
||||
with self.repository:
|
||||
self.repository.put(H(0), fchunk(b"bar")) # this may trigger compact_segments()
|
||||
self.repository.commit(compact=True)
|
||||
self.repository.put(H(0), fchunk(b"bar")) # this may trigger compact_segments()
|
||||
self.repository.commit(compact=True)
|
||||
# the point here is that nothing blows up with an exception.
|
||||
|
||||
|
||||
|
@ -385,11 +375,10 @@ def test_replay_lock_upgrade_old(self):
|
|||
os.unlink(os.path.join(self.repository.path, name))
|
||||
with patch.object(Lock, "upgrade", side_effect=LockFailed) as upgrade:
|
||||
self.reopen(exclusive=None) # simulate old client that always does lock upgrades
|
||||
with self.repository:
|
||||
# the repo is only locked by a shared read lock, but to replay segments,
|
||||
# we need an exclusive write lock - check if the lock gets upgraded.
|
||||
self.assert_raises(LockFailed, lambda: len(self.repository))
|
||||
upgrade.assert_called_once_with()
|
||||
# the repo is only locked by a shared read lock, but to replay segments,
|
||||
# we need an exclusive write lock - check if the lock gets upgraded.
|
||||
self.assert_raises(LockFailed, lambda: len(self.repository))
|
||||
upgrade.assert_called_once_with()
|
||||
|
||||
def test_replay_lock_upgrade(self):
|
||||
self.add_keys()
|
||||
|
@ -398,11 +387,10 @@ def test_replay_lock_upgrade(self):
|
|||
os.unlink(os.path.join(self.repository.path, name))
|
||||
with patch.object(Lock, "upgrade", side_effect=LockFailed) as upgrade:
|
||||
self.reopen(exclusive=False) # current client usually does not do lock upgrade, except for replay
|
||||
with self.repository:
|
||||
# the repo is only locked by a shared read lock, but to replay segments,
|
||||
# we need an exclusive write lock - check if the lock gets upgraded.
|
||||
self.assert_raises(LockFailed, lambda: len(self.repository))
|
||||
upgrade.assert_called_once_with()
|
||||
# the repo is only locked by a shared read lock, but to replay segments,
|
||||
# we need an exclusive write lock - check if the lock gets upgraded.
|
||||
self.assert_raises(LockFailed, lambda: len(self.repository))
|
||||
upgrade.assert_called_once_with()
|
||||
|
||||
def test_crash_before_deleting_compacted_segments(self):
|
||||
self.add_keys()
|
||||
|
@ -932,7 +920,7 @@ def test_hints_behaviour(self):
|
|||
class RemoteRepositoryTestCase(RepositoryTestCase):
|
||||
repository = None # type: RemoteRepository
|
||||
|
||||
def open(self, create=False):
|
||||
def open(self, create=False, exclusive=UNSPECIFIED):
|
||||
return RemoteRepository(
|
||||
Location("ssh://__testsuite__" + os.path.join(self.tmppath, "repository")), exclusive=True, create=create
|
||||
)
|
||||
|
@ -1064,75 +1052,3 @@ def test_repair_missing_commit_segment(self):
|
|||
def test_repair_missing_segment(self):
|
||||
# skip this test, files in RemoteRepository cannot be deleted
|
||||
pass
|
||||
|
||||
|
||||
class RemoteLoggerTestCase(BaseTestCase):
|
||||
def setUp(self):
|
||||
self.stream = io.StringIO()
|
||||
self.handler = logging.StreamHandler(self.stream)
|
||||
logging.getLogger().handlers[:] = [self.handler]
|
||||
logging.getLogger("borg.repository").handlers[:] = []
|
||||
logging.getLogger("borg.repository.foo").handlers[:] = []
|
||||
# capture stderr
|
||||
sys.stderr.flush()
|
||||
self.old_stderr = sys.stderr
|
||||
self.stderr = sys.stderr = io.StringIO()
|
||||
|
||||
def tearDown(self):
|
||||
sys.stderr = self.old_stderr
|
||||
|
||||
def test_stderr_messages(self):
|
||||
handle_remote_line("unstructured stderr message\n")
|
||||
self.assert_equal(self.stream.getvalue(), "stderr/remote: unstructured stderr message\n")
|
||||
self.assert_equal(self.stderr.getvalue(), "")
|
||||
|
||||
def test_post11_format_messages(self):
|
||||
self.handler.setLevel(logging.DEBUG)
|
||||
logging.getLogger().setLevel(logging.DEBUG)
|
||||
|
||||
msg = (
|
||||
"""{"type": "log_message", "levelname": "INFO", "name": "borg.repository", "msgid": 42,"""
|
||||
""" "message": "borg >= 1.1 format message"}\n"""
|
||||
)
|
||||
handle_remote_line(msg)
|
||||
self.assert_equal(self.stream.getvalue(), "Remote: borg >= 1.1 format message\n")
|
||||
self.assert_equal(self.stderr.getvalue(), "")
|
||||
|
||||
def test_remote_messages_screened(self):
|
||||
# default borg config for root logger
|
||||
self.handler.setLevel(logging.WARNING)
|
||||
logging.getLogger().setLevel(logging.WARNING)
|
||||
|
||||
msg = (
|
||||
"""{"type": "log_message", "levelname": "INFO", "name": "borg.repository", "msgid": 42,"""
|
||||
""" "message": "new format info message"}\n"""
|
||||
)
|
||||
handle_remote_line(msg)
|
||||
self.assert_equal(self.stream.getvalue(), "")
|
||||
self.assert_equal(self.stderr.getvalue(), "")
|
||||
|
||||
def test_info_to_correct_local_child(self):
|
||||
logging.getLogger("borg.repository").setLevel(logging.INFO)
|
||||
logging.getLogger("borg.repository.foo").setLevel(logging.INFO)
|
||||
# default borg config for root logger
|
||||
self.handler.setLevel(logging.WARNING)
|
||||
logging.getLogger().setLevel(logging.WARNING)
|
||||
|
||||
child_stream = io.StringIO()
|
||||
child_handler = logging.StreamHandler(child_stream)
|
||||
child_handler.setLevel(logging.INFO)
|
||||
logging.getLogger("borg.repository").handlers[:] = [child_handler]
|
||||
foo_stream = io.StringIO()
|
||||
foo_handler = logging.StreamHandler(foo_stream)
|
||||
foo_handler.setLevel(logging.INFO)
|
||||
logging.getLogger("borg.repository.foo").handlers[:] = [foo_handler]
|
||||
|
||||
msg = (
|
||||
"""{"type": "log_message", "levelname": "INFO", "name": "borg.repository", "msgid": 42,"""
|
||||
""" "message": "new format child message"}\n"""
|
||||
)
|
||||
handle_remote_line(msg)
|
||||
self.assert_equal(foo_stream.getvalue(), "")
|
||||
self.assert_equal(child_stream.getvalue(), "Remote: new format child message\n")
|
||||
self.assert_equal(self.stream.getvalue(), "")
|
||||
self.assert_equal(self.stderr.getvalue(), "")
|
||||
|
|
Loading…
Reference in a new issue