From e351e67aee227d8482e228d27597942b598c946a Mon Sep 17 00:00:00 2001 From: Thomas Waldmann Date: Thu, 25 May 2023 00:55:43 +0200 Subject: [PATCH 1/9] RepositoryServer: do not use stderr for logging, see #7604 Instead, install a handler that sends the LogRecord dicts to a queue. That queue is then emptied in the borg serve main loop and the LogRecords are sent msgpacked via stdout to the client, similar to the RPC results. On the client side, the LogRecords are recreated from the received dicts and fed into the clientside logging system. As we use msgpacked LogRecord dicts, we don't need JSON for this purpose on the borg serve side any more. On the client side, the LogRecords will then be either formatted as normal text or as JSON log output (by the clientside log formatter). --- src/borg/archiver/__init__.py | 4 +- src/borg/logger.py | 27 ++++++++- src/borg/remote.py | 100 ++++++++++--------------------- src/borg/testsuite/repository.py | 75 +---------------------- 4 files changed, 61 insertions(+), 145 deletions(-) diff --git a/src/borg/archiver/__init__.py b/src/borg/archiver/__init__.py index 69f14d5cc..915594d1b 100644 --- a/src/borg/archiver/__init__.py +++ b/src/borg/archiver/__init__.py @@ -482,8 +482,8 @@ class Archiver( func = get_func(args) # do not use loggers before this! is_serve = func == self.do_serve - self.log_json = args.log_json or is_serve - setup_logging(level=args.log_level, json=self.log_json) + self.log_json = args.log_json and not is_serve + setup_logging(level=args.log_level, is_serve=is_serve, json=self.log_json) args.progress |= is_serve self._setup_implied_logging(vars(args)) self._setup_topic_debugging(args) diff --git a/src/borg/logger.py b/src/borg/logger.py index 6788ff118..1422902f0 100644 --- a/src/borg/logger.py +++ b/src/borg/logger.py @@ -36,9 +36,30 @@ import logging import logging.config import logging.handlers # needed for handlers defined there being configurable in logging.conf file import os +import queue import warnings configured = False +borg_serve_log_queue = queue.SimpleQueue() + + +class BorgQueueHandler(logging.handlers.QueueHandler): + """borg serve writes log record dicts to a borg_serve_log_queue""" + + def prepare(self, record: logging.LogRecord) -> dict: + return dict( + # kwargs needed for LogRecord constructor: + name=record.name, + level=record.levelno, + pathname=record.pathname, + lineno=record.lineno, + msg=record.msg, + args=record.args, + exc_info=record.exc_info, + func=record.funcName, + sinfo=record.stack_info, + ) + # use something like this to ignore warnings: # warnings.filterwarnings('ignore', r'... regex for warning message to ignore ...') @@ -53,7 +74,7 @@ def _log_warning(message, category, filename, lineno, file=None, line=None): logger.warning(msg) -def setup_logging(stream=None, conf_fname=None, env_var="BORG_LOGGING_CONF", level="info", json=False): +def setup_logging(stream=None, conf_fname=None, env_var="BORG_LOGGING_CONF", level="info", is_serve=False, json=False): """setup logging module according to the arguments provided if conf_fname is given (or the config file name can be determined via @@ -61,6 +82,8 @@ def setup_logging(stream=None, conf_fname=None, env_var="BORG_LOGGING_CONF", lev otherwise, set up a stream handler logger on stderr (by default, if no stream is provided). + + is_serve: are we setting up the logging for "borg serve"? """ global configured err_msg = None @@ -86,7 +109,7 @@ def setup_logging(stream=None, conf_fname=None, env_var="BORG_LOGGING_CONF", lev err_msg = str(err) # if we did not / not successfully load a logging configuration, fallback to this: logger = logging.getLogger("") - handler = logging.StreamHandler(stream) + handler = BorgQueueHandler(borg_serve_log_queue) if is_serve else logging.StreamHandler(stream) fmt = "%(message)s" formatter = JsonFormatter(fmt) if json else logging.Formatter(fmt) handler.setFormatter(formatter) diff --git a/src/borg/remote.py b/src/borg/remote.py index d70ebada3..6d459cf24 100644 --- a/src/borg/remote.py +++ b/src/borg/remote.py @@ -1,9 +1,9 @@ import errno import functools import inspect -import json import logging import os +import queue import select import shlex import shutil @@ -26,7 +26,7 @@ from .helpers import sysinfo from .helpers import format_file_size from .helpers import safe_unlink from .helpers import prepare_subprocess_env, ignore_sigint -from .logger import create_logger +from .logger import create_logger, borg_serve_log_queue from .helpers import msgpack from .repository import Repository from .version import parse_version, format_version @@ -36,7 +36,7 @@ from .helpers.datastruct import EfficientCollectionQueue logger = create_logger(__name__) BORG_VERSION = parse_version(__version__) -MSGID, MSG, ARGS, RESULT = "i", "m", "a", "r" +MSGID, MSG, ARGS, RESULT, LOG = "i", "m", "a", "r", "l" MAX_INFLIGHT = 100 @@ -154,25 +154,28 @@ class RepositoryServer: # pragma: no cover def serve(self): stdin_fd = sys.stdin.fileno() stdout_fd = sys.stdout.fileno() - stderr_fd = sys.stdout.fileno() os.set_blocking(stdin_fd, False) os.set_blocking(stdout_fd, True) - os.set_blocking(stderr_fd, True) unpacker = get_limited_unpacker("server") while True: + # before processing any new RPCs, send out all pending log output + while True: + try: + # lr_dict contents see BorgQueueHandler + lr_dict = borg_serve_log_queue.get_nowait() + except queue.Empty: + break + else: + msg = msgpack.packb({LOG: lr_dict}) + os_write(stdout_fd, msg) + + # process new RPCs r, w, es = select.select([stdin_fd], [], [], 10) if r: data = os.read(stdin_fd, BUFSIZE) if not data: if self.repository is not None: self.repository.close() - else: - os_write( - stderr_fd, - "Borg {}: Got connection close before repository was opened.\n".format( - __version__ - ).encode(), - ) return unpacker.feed(data) for unpacked in unpacker: @@ -726,10 +729,18 @@ class RemoteRepository: self.rx_bytes += len(data) self.unpacker.feed(data) for unpacked in self.unpacker: - if isinstance(unpacked, dict): - msgid = unpacked[MSGID] - else: + if not isinstance(unpacked, dict): raise UnexpectedRPCDataFormatFromServer(data) + + lr_dict = unpacked.get(LOG) + if lr_dict is not None: + # Re-emit remote log messages locally. + _logger = logging.getLogger(lr_dict["name"]) + if _logger.isEnabledFor(lr_dict["level"]): + _logger.handle(logging.LogRecord(**lr_dict)) + continue + + msgid = unpacked[MSGID] if msgid in self.ignore_responses: self.ignore_responses.remove(msgid) # async methods never return values, but may raise exceptions. @@ -755,8 +766,14 @@ class RemoteRepository: if lines and not lines[-1].endswith((b"\r", b"\n")): self.stderr_received = lines.pop() # now we have complete lines in and any partial line in self.stderr_received. + _logger = logging.getLogger() for line in lines: - handle_remote_line(line.decode()) # decode late, avoid partial utf-8 sequences + # borg serve (remote/server side) should not emit stuff on stderr, + # but e.g. the ssh process (local/client side) might output errors there. + assert line.endswith((b"\r", b"\n")) + # something came in on stderr, log it to not lose it. + # decode late, avoid partial utf-8 sequences. + _logger.warning("stderr: " + line.decode().strip()) if w: while ( (len(self.to_send) <= maximum_to_send) @@ -886,57 +903,6 @@ class RemoteRepository: self.preload_ids += ids -def handle_remote_line(line): - """ - Handle a remote log line. - - This function is remarkably complex because it handles multiple wire formats. - """ - assert line.endswith(("\r", "\n")) - if line.startswith("{"): - msg = json.loads(line) - - if msg["type"] not in ("progress_message", "progress_percent", "log_message"): - logger.warning("Dropped remote log message with unknown type %r: %s", msg["type"], line) - return - - if msg["type"] == "log_message": - # Re-emit log messages on the same level as the remote to get correct log suppression and verbosity. - level = getattr(logging, msg["levelname"], logging.CRITICAL) - assert isinstance(level, int) - target_logger = logging.getLogger(msg["name"]) - msg["message"] = "Remote: " + msg["message"] - # In JSON mode, we manually check whether the log message should be propagated. - if logging.getLogger("borg").json and level >= target_logger.getEffectiveLevel(): - sys.stderr.write(json.dumps(msg) + "\n") - else: - target_logger.log(level, "%s", msg["message"]) - elif msg["type"].startswith("progress_"): - # Progress messages are a bit more complex. - # First of all, we check whether progress output is enabled. This is signalled - # through the effective level of the borg.output.progress logger - # (also see ProgressIndicatorBase in borg.helpers). - progress_logger = logging.getLogger("borg.output.progress") - if progress_logger.getEffectiveLevel() == logging.INFO: - # When progress output is enabled, we check whether the client is in - # --log-json mode, as signalled by the "json" attribute on the "borg" logger. - if logging.getLogger("borg").json: - # In --log-json mode we re-emit the progress JSON line as sent by the server, - # with the message, if any, prefixed with "Remote: ". - if "message" in msg: - msg["message"] = "Remote: " + msg["message"] - sys.stderr.write(json.dumps(msg) + "\n") - elif "message" in msg: - # In text log mode we write only the message to stderr and terminate with \r - # (carriage return, i.e. move the write cursor back to the beginning of the line) - # so that the next message, progress or not, overwrites it. This mirrors the behaviour - # of local progress displays. - sys.stderr.write("Remote: " + msg["message"] + "\r") - else: - # We don't know what priority the line had. - logging.getLogger("").warning("stderr/remote: " + line.strip()) - - class RepositoryNoCache: """A not caching Repository wrapper, passes through to repository. diff --git a/src/borg/testsuite/repository.py b/src/borg/testsuite/repository.py index 93957584a..f04b11784 100644 --- a/src/borg/testsuite/repository.py +++ b/src/borg/testsuite/repository.py @@ -1,4 +1,3 @@ -import io import logging import os import shutil @@ -13,7 +12,7 @@ from ..helpers import Location from ..helpers import IntegrityError from ..helpers import msgpack from ..locking import Lock, LockFailed -from ..remote import RemoteRepository, InvalidRPCMethod, PathNotAllowed, handle_remote_line +from ..remote import RemoteRepository, InvalidRPCMethod, PathNotAllowed from ..repository import Repository, LoggedIO, MAGIC, MAX_DATA_SIZE, TAG_DELETE, TAG_PUT2, TAG_PUT, TAG_COMMIT from ..repoobj import RepoObj from . import BaseTestCase @@ -1064,75 +1063,3 @@ class RemoteRepositoryCheckTestCase(RepositoryCheckTestCase): def test_repair_missing_segment(self): # skip this test, files in RemoteRepository cannot be deleted pass - - -class RemoteLoggerTestCase(BaseTestCase): - def setUp(self): - self.stream = io.StringIO() - self.handler = logging.StreamHandler(self.stream) - logging.getLogger().handlers[:] = [self.handler] - logging.getLogger("borg.repository").handlers[:] = [] - logging.getLogger("borg.repository.foo").handlers[:] = [] - # capture stderr - sys.stderr.flush() - self.old_stderr = sys.stderr - self.stderr = sys.stderr = io.StringIO() - - def tearDown(self): - sys.stderr = self.old_stderr - - def test_stderr_messages(self): - handle_remote_line("unstructured stderr message\n") - self.assert_equal(self.stream.getvalue(), "stderr/remote: unstructured stderr message\n") - self.assert_equal(self.stderr.getvalue(), "") - - def test_post11_format_messages(self): - self.handler.setLevel(logging.DEBUG) - logging.getLogger().setLevel(logging.DEBUG) - - msg = ( - """{"type": "log_message", "levelname": "INFO", "name": "borg.repository", "msgid": 42,""" - """ "message": "borg >= 1.1 format message"}\n""" - ) - handle_remote_line(msg) - self.assert_equal(self.stream.getvalue(), "Remote: borg >= 1.1 format message\n") - self.assert_equal(self.stderr.getvalue(), "") - - def test_remote_messages_screened(self): - # default borg config for root logger - self.handler.setLevel(logging.WARNING) - logging.getLogger().setLevel(logging.WARNING) - - msg = ( - """{"type": "log_message", "levelname": "INFO", "name": "borg.repository", "msgid": 42,""" - """ "message": "new format info message"}\n""" - ) - handle_remote_line(msg) - self.assert_equal(self.stream.getvalue(), "") - self.assert_equal(self.stderr.getvalue(), "") - - def test_info_to_correct_local_child(self): - logging.getLogger("borg.repository").setLevel(logging.INFO) - logging.getLogger("borg.repository.foo").setLevel(logging.INFO) - # default borg config for root logger - self.handler.setLevel(logging.WARNING) - logging.getLogger().setLevel(logging.WARNING) - - child_stream = io.StringIO() - child_handler = logging.StreamHandler(child_stream) - child_handler.setLevel(logging.INFO) - logging.getLogger("borg.repository").handlers[:] = [child_handler] - foo_stream = io.StringIO() - foo_handler = logging.StreamHandler(foo_stream) - foo_handler.setLevel(logging.INFO) - logging.getLogger("borg.repository.foo").handlers[:] = [foo_handler] - - msg = ( - """{"type": "log_message", "levelname": "INFO", "name": "borg.repository", "msgid": 42,""" - """ "message": "new format child message"}\n""" - ) - handle_remote_line(msg) - self.assert_equal(foo_stream.getvalue(), "") - self.assert_equal(child_stream.getvalue(), "Remote: new format child message\n") - self.assert_equal(self.stream.getvalue(), "") - self.assert_equal(self.stderr.getvalue(), "") From 0be545dc4513a15b88a7cfd94f713ee423575a09 Mon Sep 17 00:00:00 2001 From: Thomas Waldmann Date: Thu, 25 May 2023 01:34:06 +0200 Subject: [PATCH 2/9] remove ProgressIndicatorEndless (not used) --- src/borg/helpers/__init__.py | 2 +- src/borg/helpers/progress.py | 34 ---------------------------------- src/borg/testsuite/helpers.py | 31 +------------------------------ 3 files changed, 2 insertions(+), 65 deletions(-) diff --git a/src/borg/helpers/__init__.py b/src/borg/helpers/__init__.py index e42d78415..1a745c49a 100644 --- a/src/borg/helpers/__init__.py +++ b/src/borg/helpers/__init__.py @@ -33,7 +33,7 @@ from .parseformat import BorgJsonEncoder, basic_json_data, json_print, json_dump from .process import daemonize, daemonizing from .process import signal_handler, raising_signal_handler, sig_int, ignore_sigint, SigHup, SigTerm from .process import popen_with_error_handling, is_terminal, prepare_subprocess_env, create_filter_process -from .progress import ProgressIndicatorPercent, ProgressIndicatorEndless, ProgressIndicatorMessage +from .progress import ProgressIndicatorPercent, ProgressIndicatorMessage from .time import parse_timestamp, timestamp, safe_timestamp, safe_s, safe_ns, MAX_S, SUPPORT_32BIT_PLATFORMS from .time import format_time, format_timedelta, OutputTimestamp, archive_ts_now from .yes_no import yes, TRUISH, FALSISH, DEFAULTISH diff --git a/src/borg/helpers/progress.py b/src/borg/helpers/progress.py index 782c7fb34..f57446409 100644 --- a/src/borg/helpers/progress.py +++ b/src/borg/helpers/progress.py @@ -162,37 +162,3 @@ class ProgressIndicatorPercent(ProgressIndicatorBase): if justify: message = justify_to_terminal_size(message) self.logger.info(message) - - -class ProgressIndicatorEndless: - def __init__(self, step=10, file=None): - """ - Progress indicator (long row of dots) - - :param step: every Nth call, call the func - :param file: output file, default: sys.stderr - """ - self.counter = 0 # call counter - self.triggered = 0 # increases 1 per trigger event - self.step = step # trigger every calls - if file is None: - file = sys.stderr - self.file = file - - def progress(self): - self.counter += 1 - trigger = self.counter % self.step == 0 - if trigger: - self.triggered += 1 - return trigger - - def show(self): - trigger = self.progress() - if trigger: - return self.output(self.triggered) - - def output(self, triggered): - print(".", end="", file=self.file, flush=True) - - def finish(self): - print(file=self.file) diff --git a/src/borg/testsuite/helpers.py b/src/borg/testsuite/helpers.py index eba51b311..6e8dd55b3 100644 --- a/src/borg/testsuite/helpers.py +++ b/src/borg/testsuite/helpers.py @@ -34,7 +34,7 @@ from ..helpers import yes, TRUISH, FALSISH, DEFAULTISH from ..helpers import StableDict, bin_to_hex from ..helpers import parse_timestamp, ChunkIteratorFileWrapper, ChunkerParams from ..helpers import archivename_validator, text_validator -from ..helpers import ProgressIndicatorPercent, ProgressIndicatorEndless +from ..helpers import ProgressIndicatorPercent from ..helpers import swidth_slice from ..helpers import chunkit from ..helpers import safe_ns, safe_s, SUPPORT_32BIT_PLATFORMS @@ -1073,35 +1073,6 @@ def test_progress_percentage_quiet(capfd): assert err == "" -def test_progress_endless(capfd): - pi = ProgressIndicatorEndless(step=1, file=sys.stderr) - pi.show() - out, err = capfd.readouterr() - assert err == "." - pi.show() - out, err = capfd.readouterr() - assert err == "." - pi.finish() - out, err = capfd.readouterr() - assert err == "\n" - - -def test_progress_endless_step(capfd): - pi = ProgressIndicatorEndless(step=2, file=sys.stderr) - pi.show() - out, err = capfd.readouterr() - assert err == "" # no output here as we have step == 2 - pi.show() - out, err = capfd.readouterr() - assert err == "." - pi.show() - out, err = capfd.readouterr() - assert err == "" # no output here as we have step == 2 - pi.show() - out, err = capfd.readouterr() - assert err == "." - - def test_partial_format(): assert partial_format("{space:10}", {"space": " "}) == " " * 10 assert partial_format("{foobar}", {"bar": "wrong", "foobar": "correct"}) == "correct" From c3a4568870c34eba715307c084dd58d0cd2c02a8 Mon Sep 17 00:00:00 2001 From: Thomas Waldmann Date: Fri, 26 May 2023 03:44:50 +0200 Subject: [PATCH 3/9] channel progress output via logging system - simplify progress output (no \r, no terminal size related tweaks) - emit progress output via the logging system (so it does not use stderr of borg serve) - progress code always logs a json string, the json has all needed to either do json log output or plain text log output. - use formatters to generate plain or json output from that. - clean up setup_logging - use a StderrHandler that always uses the **current** sys.stderr - tweak TestPassphrase to not accidentally trigger just because of seeing 12 in output --- src/borg/helpers/progress.py | 96 +++--------------------- src/borg/logger.py | 60 ++++++++++++--- src/borg/testsuite/archiver/__init__.py | 2 - src/borg/testsuite/archiver/check_cmd.py | 3 - src/borg/testsuite/helpers.py | 57 ++++---------- 5 files changed, 77 insertions(+), 141 deletions(-) diff --git a/src/borg/helpers/progress.py b/src/borg/helpers/progress.py index f57446409..8d6f8caff 100644 --- a/src/borg/helpers/progress.py +++ b/src/borg/helpers/progress.py @@ -1,28 +1,15 @@ import logging import json -import sys import time -from shutil import get_terminal_size from ..logger import create_logger logger = create_logger() -from .parseformat import ellipsis_truncate - - -def justify_to_terminal_size(message): - terminal_space = get_terminal_size(fallback=(-1, -1))[0] - # justify only if we are outputting to a terminal - if terminal_space != -1: - return message.ljust(terminal_space) - return message - class ProgressIndicatorBase: LOGGER = "borg.output.progress" JSON_TYPE: str = None - json = False operation_id_counter = 0 @@ -33,73 +20,27 @@ class ProgressIndicatorBase: return cls.operation_id_counter def __init__(self, msgid=None): - self.handler = None self.logger = logging.getLogger(self.LOGGER) self.id = self.operation_id() self.msgid = msgid - # If there are no handlers, set one up explicitly because the - # terminator and propagation needs to be set. If there are, - # they must have been set up by BORG_LOGGING_CONF: skip setup. - if not self.logger.handlers: - self.handler = logging.StreamHandler(stream=sys.stderr) - self.handler.setLevel(logging.INFO) - logger = logging.getLogger("borg") - # Some special attributes on the borg logger, created by setup_logging - # But also be able to work without that - try: - formatter = logger.formatter - terminator = "\n" if logger.json else "\r" - self.json = logger.json - except AttributeError: - terminator = "\r" - else: - self.handler.setFormatter(formatter) - self.handler.terminator = terminator - - self.logger.addHandler(self.handler) - if self.logger.level == logging.NOTSET: - self.logger.setLevel(logging.WARN) - self.logger.propagate = False - - # If --progress is not set then the progress logger level will be WARN - # due to setup_implied_logging (it may be NOTSET with a logging config file, - # but the interactions there are generally unclear), so self.emit becomes - # False, which is correct. - # If --progress is set then the level will be INFO as per setup_implied_logging; - # note that this is always the case for serve processes due to a "args.progress |= is_serve". - # In this case self.emit is True. - self.emit = self.logger.getEffectiveLevel() == logging.INFO - - def __del__(self): - if self.handler is not None: - self.logger.removeHandler(self.handler) - self.handler.close() - - def output_json(self, *, finished=False, **kwargs): - assert self.json - if not self.emit: - return + def make_json(self, *, finished=False, **kwargs): kwargs.update( dict(operation=self.id, msgid=self.msgid, type=self.JSON_TYPE, finished=finished, time=time.time()) ) - print(json.dumps(kwargs), file=sys.stderr, flush=True) + return json.dumps(kwargs) def finish(self): - if self.json: - self.output_json(finished=True) - else: - self.output("") + j = self.make_json(message="", finished=True) + self.logger.info(j) class ProgressIndicatorMessage(ProgressIndicatorBase): JSON_TYPE = "progress_message" def output(self, msg): - if self.json: - self.output_json(message=msg) - else: - self.logger.info(justify_to_terminal_size(msg)) + j = self.make_json(message=msg) + self.logger.info(j) class ProgressIndicatorPercent(ProgressIndicatorBase): @@ -141,24 +82,11 @@ class ProgressIndicatorPercent(ProgressIndicatorBase): """ pct = self.progress(current, increase) if pct is not None: - # truncate the last argument, if no space is available if info is not None: - if not self.json: - from ..platform import swidth # avoid circular import + return self.output(self.msg % tuple([pct] + info), info=info) + else: + return self.output(self.msg % pct) - # no need to truncate if we're not outputting to a terminal - terminal_space = get_terminal_size(fallback=(-1, -1))[0] - if terminal_space != -1: - space = terminal_space - swidth(self.msg % tuple([pct] + info[:-1] + [""])) - info[-1] = ellipsis_truncate(info[-1], space) - return self.output(self.msg % tuple([pct] + info), justify=False, info=info) - - return self.output(self.msg % pct) - - def output(self, message, justify=True, info=None): - if self.json: - self.output_json(message=message, current=self.counter, total=self.total, info=info) - else: - if justify: - message = justify_to_terminal_size(message) - self.logger.info(message) + def output(self, message, info=None): + j = self.make_json(message=message, current=self.counter, total=self.total, info=info) + self.logger.info(j) diff --git a/src/borg/logger.py b/src/borg/logger.py index 1422902f0..b4bc070f8 100644 --- a/src/borg/logger.py +++ b/src/borg/logger.py @@ -37,10 +37,11 @@ import logging.config import logging.handlers # needed for handlers defined there being configurable in logging.conf file import os import queue +import sys import warnings configured = False -borg_serve_log_queue = queue.SimpleQueue() +borg_serve_log_queue: queue.SimpleQueue = queue.SimpleQueue() class BorgQueueHandler(logging.handlers.QueueHandler): @@ -61,6 +62,35 @@ class BorgQueueHandler(logging.handlers.QueueHandler): ) +class StderrHandler(logging.StreamHandler): + """ + This class is like a StreamHandler using sys.stderr, but always uses + whatever sys.stderr is currently set to rather than the value of + sys.stderr at handler construction time. + """ + + def __init__(self, stream=None): + logging.Handler.__init__(self) + + @property + def stream(self): + return sys.stderr + + +class TextProgressFormatter(logging.Formatter): + def format(self, record: logging.LogRecord) -> str: + # record.msg contains json (because we always do json for progress log) + j = json.loads(record.msg) + # inside the json, the text log line can be found under "message" + return f"{j['message']}" + + +class JSONProgressFormatter(logging.Formatter): + def format(self, record: logging.LogRecord) -> str: + # record.msg contains json (because we always do json for progress log) + return f"{record.msg}" + + # use something like this to ignore warnings: # warnings.filterwarnings('ignore', r'... regex for warning message to ignore ...') @@ -100,25 +130,37 @@ def setup_logging(stream=None, conf_fname=None, env_var="BORG_LOGGING_CONF", lev logging.config.fileConfig(f) configured = True logger = logging.getLogger(__name__) - borg_logger = logging.getLogger("borg") - borg_logger.json = json logger.debug(f'using logging configuration read from "{conf_fname}"') warnings.showwarning = _log_warning return None except Exception as err: # XXX be more precise err_msg = str(err) + # if we did not / not successfully load a logging configuration, fallback to this: - logger = logging.getLogger("") - handler = BorgQueueHandler(borg_serve_log_queue) if is_serve else logging.StreamHandler(stream) + level = level.upper() fmt = "%(message)s" formatter = JsonFormatter(fmt) if json else logging.Formatter(fmt) + SHandler = StderrHandler if stream is None else logging.StreamHandler + handler = BorgQueueHandler(borg_serve_log_queue) if is_serve else SHandler(stream) handler.setFormatter(formatter) - borg_logger = logging.getLogger("borg") - borg_logger.formatter = formatter - borg_logger.json = json + logger = logging.getLogger() + (h.close() for h in list(logger.handlers)) + logger.handlers.clear() logger.addHandler(handler) - logger.setLevel(level.upper()) + logger.setLevel(level) + + bop_formatter = JSONProgressFormatter() if json else TextProgressFormatter() + bop_handler = BorgQueueHandler(borg_serve_log_queue) if is_serve else SHandler(stream) + bop_handler.setFormatter(bop_formatter) + bop_logger = logging.getLogger("borg.output.progress") + (h.close() for h in list(bop_logger.handlers)) + bop_logger.handlers.clear() + bop_logger.addHandler(bop_handler) + bop_logger.setLevel("INFO") + bop_logger.propagate = False + configured = True + logger = logging.getLogger(__name__) if err_msg: logger.warning(f'setup_logging for "{conf_fname}" failed with "{err_msg}".') diff --git a/src/borg/testsuite/archiver/__init__.py b/src/borg/testsuite/archiver/__init__.py index 932dc0f9c..cdbc12193 100644 --- a/src/borg/testsuite/archiver/__init__.py +++ b/src/borg/testsuite/archiver/__init__.py @@ -22,7 +22,6 @@ from ...helpers import Location from ...helpers import EXIT_SUCCESS from ...helpers import bin_to_hex from ...manifest import Manifest -from ...logger import setup_logging from ...remote import RemoteRepository from ...repository import Repository from .. import has_lchflags @@ -155,7 +154,6 @@ class ArchiverTestCaseBase(BaseTestCase): os.chdir(self._old_wd) # note: ignore_errors=True as workaround for issue #862 shutil.rmtree(self.tmpdir, ignore_errors=True) - setup_logging() def cmd(self, *args, **kw): exit_code = kw.pop("exit_code", 0) diff --git a/src/borg/testsuite/archiver/check_cmd.py b/src/borg/testsuite/archiver/check_cmd.py index 9669ee7ac..390199baf 100644 --- a/src/borg/testsuite/archiver/check_cmd.py +++ b/src/borg/testsuite/archiver/check_cmd.py @@ -1,4 +1,3 @@ -import logging import shutil import unittest from unittest.mock import patch @@ -26,8 +25,6 @@ class ArchiverCheckTestCase(ArchiverTestCaseBase): self.assert_in("Starting repository check", output) self.assert_in("Starting archive consistency check", output) self.assert_in("Checking segments", output) - # reset logging to new process default to avoid need for fork=True on next check - logging.getLogger("borg.output.progress").setLevel(logging.NOTSET) output = self.cmd(f"--repo={self.repository_location}", "check", "-v", "--repository-only", exit_code=0) self.assert_in("Starting repository check", output) self.assert_not_in("Starting archive consistency check", output) diff --git a/src/borg/testsuite/helpers.py b/src/borg/testsuite/helpers.py index 6e8dd55b3..3dee5755f 100644 --- a/src/borg/testsuite/helpers.py +++ b/src/borg/testsuite/helpers.py @@ -998,65 +998,36 @@ def test_yes_env_output(capfd, monkeypatch): assert "yes" in err -def test_progress_percentage_sameline(capfd, monkeypatch): - # run the test as if it was in a 4x1 terminal - monkeypatch.setenv("COLUMNS", "4") - monkeypatch.setenv("LINES", "1") +def test_progress_percentage(capfd): pi = ProgressIndicatorPercent(1000, step=5, start=0, msg="%3.0f%%") pi.logger.setLevel("INFO") pi.show(0) out, err = capfd.readouterr() - assert err == " 0%\r" + assert err == " 0%\n" pi.show(420) pi.show(680) out, err = capfd.readouterr() - assert err == " 42%\r 68%\r" + assert err == " 42%\n 68%\n" pi.show(1000) out, err = capfd.readouterr() - assert err == "100%\r" + assert err == "100%\n" pi.finish() out, err = capfd.readouterr() - assert err == " " * 4 + "\r" + assert err == "\n" -@pytest.mark.skipif(is_win32, reason="no working swidth() implementation on this platform") -def test_progress_percentage_widechars(capfd, monkeypatch): - st = "スター・トレック" # "startrek" :-) - assert swidth(st) == 16 - path = "/カーク船長です。" # "Captain Kirk" - assert swidth(path) == 17 - spaces = " " * 4 # to avoid usage of "..." - width = len("100%") + 1 + swidth(st) + 1 + swidth(path) + swidth(spaces) - monkeypatch.setenv("COLUMNS", str(width)) - monkeypatch.setenv("LINES", "1") - pi = ProgressIndicatorPercent(100, step=5, start=0, msg=f"%3.0f%% {st} %s") - pi.logger.setLevel("INFO") - pi.show(0, info=[path]) - out, err = capfd.readouterr() - assert err == f" 0% {st} {path}{spaces}\r" - pi.show(100, info=[path]) - out, err = capfd.readouterr() - assert err == f"100% {st} {path}{spaces}\r" - pi.finish() - out, err = capfd.readouterr() - assert err == " " * width + "\r" - - -def test_progress_percentage_step(capfd, monkeypatch): - # run the test as if it was in a 4x1 terminal - monkeypatch.setenv("COLUMNS", "4") - monkeypatch.setenv("LINES", "1") +def test_progress_percentage_step(capfd): pi = ProgressIndicatorPercent(100, step=2, start=0, msg="%3.0f%%") pi.logger.setLevel("INFO") pi.show() out, err = capfd.readouterr() - assert err == " 0%\r" + assert err == " 0%\n" pi.show() out, err = capfd.readouterr() assert err == "" # no output at 1% as we have step == 2 pi.show() out, err = capfd.readouterr() - assert err == " 2%\r" + assert err == " 2%\n" def test_progress_percentage_quiet(capfd): @@ -1293,19 +1264,19 @@ def test_safe_unlink_is_safe_ENOSPC(tmpdir, monkeypatch): class TestPassphrase: def test_passphrase_new_verification(self, capsys, monkeypatch): - monkeypatch.setattr(getpass, "getpass", lambda prompt: "12aöäü") + monkeypatch.setattr(getpass, "getpass", lambda prompt: "1234aöäü") monkeypatch.setenv("BORG_DISPLAY_PASSPHRASE", "no") Passphrase.new() out, err = capsys.readouterr() - assert "12" not in out - assert "12" not in err + assert "1234" not in out + assert "1234" not in err monkeypatch.setenv("BORG_DISPLAY_PASSPHRASE", "yes") passphrase = Passphrase.new() out, err = capsys.readouterr() - assert "313261c3b6c3a4c3bc" not in out - assert "313261c3b6c3a4c3bc" in err - assert passphrase == "12aöäü" + assert "3132333461c3b6c3a4c3bc" not in out + assert "3132333461c3b6c3a4c3bc" in err + assert passphrase == "1234aöäü" monkeypatch.setattr(getpass, "getpass", lambda prompt: "1234/@=") Passphrase.new() From dac4609468560a269f4a1571a9686ccf166fdcc2 Mon Sep 17 00:00:00 2001 From: Thomas Waldmann Date: Fri, 26 May 2023 04:42:37 +0200 Subject: [PATCH 4/9] remove_handlers --- src/borg/logger.py | 12 ++++++++---- 1 file changed, 8 insertions(+), 4 deletions(-) diff --git a/src/borg/logger.py b/src/borg/logger.py index b4bc070f8..d5f7f81c7 100644 --- a/src/borg/logger.py +++ b/src/borg/logger.py @@ -104,6 +104,12 @@ def _log_warning(message, category, filename, lineno, file=None, line=None): logger.warning(msg) +def remove_handlers(logger): + for handler in logger.handlers[:]: + handler.close() + logger.removeHandler(handler) + + def setup_logging(stream=None, conf_fname=None, env_var="BORG_LOGGING_CONF", level="info", is_serve=False, json=False): """setup logging module according to the arguments provided @@ -144,8 +150,7 @@ def setup_logging(stream=None, conf_fname=None, env_var="BORG_LOGGING_CONF", lev handler = BorgQueueHandler(borg_serve_log_queue) if is_serve else SHandler(stream) handler.setFormatter(formatter) logger = logging.getLogger() - (h.close() for h in list(logger.handlers)) - logger.handlers.clear() + remove_handlers(logger) logger.addHandler(handler) logger.setLevel(level) @@ -153,8 +158,7 @@ def setup_logging(stream=None, conf_fname=None, env_var="BORG_LOGGING_CONF", lev bop_handler = BorgQueueHandler(borg_serve_log_queue) if is_serve else SHandler(stream) bop_handler.setFormatter(bop_formatter) bop_logger = logging.getLogger("borg.output.progress") - (h.close() for h in list(bop_logger.handlers)) - bop_logger.handlers.clear() + remove_handlers(bop_logger) bop_logger.addHandler(bop_handler) bop_logger.setLevel("INFO") bop_logger.propagate = False From 746cef1cbabf8fdc125db615a8df766c6953a122 Mon Sep 17 00:00:00 2001 From: Thomas Waldmann Date: Fri, 26 May 2023 12:47:31 +0200 Subject: [PATCH 5/9] teardown logging in exec_cmd for normal borg command invocation: - logging is set up in Archiver.run - the atexit handler calls logging.shutdown when process terminates for tests: - Archiver.run called by exec_cmd - no atexit handler executed as process lives on - borg.logger.teardown (calls shutdown and configured=False) now called in exec_cmd --- src/borg/logger.py | 7 +++++++ src/borg/testsuite/archiver/__init__.py | 6 +++++- 2 files changed, 12 insertions(+), 1 deletion(-) diff --git a/src/borg/logger.py b/src/borg/logger.py index d5f7f81c7..a3049ea92 100644 --- a/src/borg/logger.py +++ b/src/borg/logger.py @@ -106,10 +106,17 @@ def _log_warning(message, category, filename, lineno, file=None, line=None): def remove_handlers(logger): for handler in logger.handlers[:]: + handler.flush() handler.close() logger.removeHandler(handler) +def teardown_logging(): + global configured + logging.shutdown() + configured = False + + def setup_logging(stream=None, conf_fname=None, env_var="BORG_LOGGING_CONF", level="info", is_serve=False, json=False): """setup logging module according to the arguments provided diff --git a/src/borg/testsuite/archiver/__init__.py b/src/borg/testsuite/archiver/__init__.py index cdbc12193..56b66a439 100644 --- a/src/borg/testsuite/archiver/__init__.py +++ b/src/borg/testsuite/archiver/__init__.py @@ -21,6 +21,7 @@ from ...constants import * # NOQA from ...helpers import Location from ...helpers import EXIT_SUCCESS from ...helpers import bin_to_hex +from ...logger import teardown_logging from ...manifest import Manifest from ...remote import RemoteRepository from ...repository import Repository @@ -81,7 +82,10 @@ def exec_cmd(*args, archiver=None, fork=False, exe=None, input=b"", binary_outpu except SystemExit as e: output_text.flush() return e.code, output.getvalue() if binary_output else output.getvalue().decode() - ret = archiver.run(args) + try: + ret = archiver.run(args) + finally: + teardown_logging() # usually done via atexit, but we do not exit here output_text.flush() return ret, output.getvalue() if binary_output else output.getvalue().decode() finally: From ac4b5c35daa6c7524ca0ca417e703bff85097914 Mon Sep 17 00:00:00 2001 From: Thomas Waldmann Date: Fri, 26 May 2023 13:06:40 +0200 Subject: [PATCH 6/9] borg serve: shutdown server after sending all queued log records --- src/borg/remote.py | 16 +++++++++++----- 1 file changed, 11 insertions(+), 5 deletions(-) diff --git a/src/borg/remote.py b/src/borg/remote.py index 6d459cf24..4b001e056 100644 --- a/src/borg/remote.py +++ b/src/borg/remote.py @@ -157,6 +157,7 @@ class RepositoryServer: # pragma: no cover os.set_blocking(stdin_fd, False) os.set_blocking(stdout_fd, True) unpacker = get_limited_unpacker("server") + shutdown_serve = False while True: # before processing any new RPCs, send out all pending log output while True: @@ -169,14 +170,19 @@ class RepositoryServer: # pragma: no cover msg = msgpack.packb({LOG: lr_dict}) os_write(stdout_fd, msg) + if shutdown_serve: + # shutdown wanted! get out of here after sending all log output. + if self.repository is not None: + self.repository.close() + return + # process new RPCs r, w, es = select.select([stdin_fd], [], [], 10) if r: data = os.read(stdin_fd, BUFSIZE) if not data: - if self.repository is not None: - self.repository.close() - return + shutdown_serve = True + continue unpacker.feed(data) for unpacked in unpacker: if isinstance(unpacked, dict): @@ -241,8 +247,8 @@ class RepositoryServer: # pragma: no cover else: os_write(stdout_fd, msgpack.packb({MSGID: msgid, RESULT: res})) if es: - self.repository.close() - return + shutdown_serve = True + continue def negotiate(self, client_data): if isinstance(client_data, dict): From f84951b53ce449ea2b2ae57b28410b639b7b95fa Mon Sep 17 00:00:00 2001 From: Thomas Waldmann Date: Fri, 26 May 2023 16:53:28 +0200 Subject: [PATCH 7/9] add logging debugging functionality --- src/borg/archiver/__init__.py | 3 ++- src/borg/logger.py | 37 ++++++++++++++++++++++++++++++----- 2 files changed, 34 insertions(+), 6 deletions(-) diff --git a/src/borg/archiver/__init__.py b/src/borg/archiver/__init__.py index 915594d1b..f9cad6179 100644 --- a/src/borg/archiver/__init__.py +++ b/src/borg/archiver/__init__.py @@ -483,7 +483,8 @@ class Archiver( # do not use loggers before this! is_serve = func == self.do_serve self.log_json = args.log_json and not is_serve - setup_logging(level=args.log_level, is_serve=is_serve, json=self.log_json) + func_name = getattr(func, "__name__", "none") + setup_logging(level=args.log_level, is_serve=is_serve, log_json=self.log_json, func=func_name) args.progress |= is_serve self._setup_implied_logging(vars(args)) self._setup_topic_debugging(args) diff --git a/src/borg/logger.py b/src/borg/logger.py index a3049ea92..f158aa797 100644 --- a/src/borg/logger.py +++ b/src/borg/logger.py @@ -38,8 +38,12 @@ import logging.handlers # needed for handlers defined there being configurable import os import queue import sys +import time +from typing import Optional import warnings +logging_debugging_path: Optional[str] = None # if set, write borg.logger debugging log to path/borg-*.log + configured = False borg_serve_log_queue: queue.SimpleQueue = queue.SimpleQueue() @@ -117,7 +121,9 @@ def teardown_logging(): configured = False -def setup_logging(stream=None, conf_fname=None, env_var="BORG_LOGGING_CONF", level="info", is_serve=False, json=False): +def setup_logging( + stream=None, conf_fname=None, env_var="BORG_LOGGING_CONF", level="info", is_serve=False, log_json=False, func=None +): """setup logging module according to the arguments provided if conf_fname is given (or the config file name can be determined via @@ -152,24 +158,45 @@ def setup_logging(stream=None, conf_fname=None, env_var="BORG_LOGGING_CONF", lev # if we did not / not successfully load a logging configuration, fallback to this: level = level.upper() fmt = "%(message)s" - formatter = JsonFormatter(fmt) if json else logging.Formatter(fmt) + formatter = JsonFormatter(fmt) if log_json else logging.Formatter(fmt) SHandler = StderrHandler if stream is None else logging.StreamHandler handler = BorgQueueHandler(borg_serve_log_queue) if is_serve else SHandler(stream) handler.setFormatter(formatter) logger = logging.getLogger() remove_handlers(logger) - logger.addHandler(handler) logger.setLevel(level) - bop_formatter = JSONProgressFormatter() if json else TextProgressFormatter() + if logging_debugging_path is not None: + # add an addtl. root handler for debugging purposes + log_fname = os.path.join(logging_debugging_path, f"borg-{'serve' if is_serve else 'client'}-root.log") + handler2 = logging.StreamHandler(open(log_fname, "a")) + handler2.setFormatter(formatter) + logger.addHandler(handler2) + logger.warning(f"--- {func} ---") # only handler2 shall get this + + logger.addHandler(handler) # do this late, so handler is not added while debug handler is set up + + bop_formatter = JSONProgressFormatter() if log_json else TextProgressFormatter() bop_handler = BorgQueueHandler(borg_serve_log_queue) if is_serve else SHandler(stream) bop_handler.setFormatter(bop_formatter) bop_logger = logging.getLogger("borg.output.progress") remove_handlers(bop_logger) - bop_logger.addHandler(bop_handler) bop_logger.setLevel("INFO") bop_logger.propagate = False + if logging_debugging_path is not None: + # add an addtl. progress handler for debugging purposes + log_fname = os.path.join(logging_debugging_path, f"borg-{'serve' if is_serve else 'client'}-progress.log") + bop_handler2 = logging.StreamHandler(open(log_fname, "a")) + bop_handler2.setFormatter(bop_formatter) + bop_logger.addHandler(bop_handler2) + json_dict = dict( + message=f"--- {func} ---", operation=0, msgid="", type="progress_message", finished=False, time=time.time() + ) + bop_logger.warning(json.dumps(json_dict)) # only bop_handler2 shall get this + + bop_logger.addHandler(bop_handler) # do this late, so bop_handler is not added while debug handler is set up + configured = True logger = logging.getLogger(__name__) From 48c7879887b4fca6c6ac0f22df3560e450270263 Mon Sep 17 00:00:00 2001 From: Thomas Waldmann Date: Sat, 27 May 2023 11:32:03 +0200 Subject: [PATCH 8/9] RemoteRepository: add .close method - tears down logging (so no new log output is generated afterwards) - sends all queued log output - then returns also: make stdin_fd / stdout_fd instance variables --- src/borg/remote.py | 46 +++++++++++++++++++++++++++++----------------- 1 file changed, 29 insertions(+), 17 deletions(-) diff --git a/src/borg/remote.py b/src/borg/remote.py index 4b001e056..4976e6c7f 100644 --- a/src/borg/remote.py +++ b/src/borg/remote.py @@ -15,6 +15,7 @@ import time import traceback from subprocess import Popen, PIPE +import borg.logger from . import __version__ from .compress import Compressor from .constants import * # NOQA @@ -125,6 +126,7 @@ class RepositoryServer: # pragma: no cover "scan", "negotiate", "open", + "close", "info", "put", "rollback", @@ -151,24 +153,27 @@ class RepositoryServer: # pragma: no cover known = set(inspect.signature(f).parameters) return {name: kwargs[name] for name in kwargs if name in known} + def send_queued_log(self): + while True: + try: + # lr_dict contents see BorgQueueHandler + lr_dict = borg_serve_log_queue.get_nowait() + except queue.Empty: + break + else: + msg = msgpack.packb({LOG: lr_dict}) + os_write(self.stdout_fd, msg) + def serve(self): - stdin_fd = sys.stdin.fileno() - stdout_fd = sys.stdout.fileno() - os.set_blocking(stdin_fd, False) - os.set_blocking(stdout_fd, True) + self.stdin_fd = sys.stdin.fileno() + self.stdout_fd = sys.stdout.fileno() + os.set_blocking(self.stdin_fd, False) + os.set_blocking(self.stdout_fd, True) unpacker = get_limited_unpacker("server") shutdown_serve = False while True: # before processing any new RPCs, send out all pending log output - while True: - try: - # lr_dict contents see BorgQueueHandler - lr_dict = borg_serve_log_queue.get_nowait() - except queue.Empty: - break - else: - msg = msgpack.packb({LOG: lr_dict}) - os_write(stdout_fd, msg) + self.send_queued_log() if shutdown_serve: # shutdown wanted! get out of here after sending all log output. @@ -177,9 +182,9 @@ class RepositoryServer: # pragma: no cover return # process new RPCs - r, w, es = select.select([stdin_fd], [], [], 10) + r, w, es = select.select([self.stdin_fd], [], [], 10) if r: - data = os.read(stdin_fd, BUFSIZE) + data = os.read(self.stdin_fd, BUFSIZE) if not data: shutdown_serve = True continue @@ -243,9 +248,9 @@ class RepositoryServer: # pragma: no cover } ) - os_write(stdout_fd, msg) + os_write(self.stdout_fd, msg) else: - os_write(stdout_fd, msgpack.packb({MSGID: msgid, RESULT: res})) + os_write(self.stdout_fd, msgpack.packb({MSGID: msgid, RESULT: res})) if es: shutdown_serve = True continue @@ -310,6 +315,12 @@ class RepositoryServer: # pragma: no cover self.repository.__enter__() # clean exit handled by serve() method return self.repository.id + def close(self): + if self.repository is not None: + self.repository.__exit__(None, None, None) + borg.logger.teardown_logging() + self.send_queued_log() + def inject_exception(self, kind): s1 = "test string" s2 = "test string2" @@ -895,6 +906,7 @@ class RemoteRepository: """actual remoting is done via self.call in the @api decorator""" def close(self): + self.call("close", {}, wait=True) if self.p: self.p.stdin.close() self.p.stdout.close() From e2ea5cf164abf32670f83953faf3bc923a705837 Mon Sep 17 00:00:00 2001 From: Thomas Waldmann Date: Mon, 29 May 2023 17:30:09 +0200 Subject: [PATCH 9/9] tests: fix usage of .reopen() also: add missing param to RemoteRepositoryTestCase.open method, but ignore it. --- src/borg/testsuite/repository.py | 69 ++++++++++++++------------------ 1 file changed, 29 insertions(+), 40 deletions(-) diff --git a/src/borg/testsuite/repository.py b/src/borg/testsuite/repository.py index f04b11784..2b33bf9c1 100644 --- a/src/borg/testsuite/repository.py +++ b/src/borg/testsuite/repository.py @@ -97,13 +97,12 @@ class RepositoryTestCase(RepositoryTestCaseBase): self.repository.delete(key50) self.assert_raises(Repository.ObjectNotFound, lambda: self.repository.get(key50)) self.repository.commit(compact=False) - self.repository.close() - with self.open() as repository2: - self.assert_raises(Repository.ObjectNotFound, lambda: repository2.get(key50)) - for x in range(100): - if x == 50: - continue - self.assert_equal(pdchunk(repository2.get(H(x))), b"SOMEDATA") + self.reopen() + self.assert_raises(Repository.ObjectNotFound, lambda: self.repository.get(key50)) + for x in range(100): + if x == 50: + continue + self.assert_equal(pdchunk(self.repository.get(H(x))), b"SOMEDATA") def test2(self): """Test multiple sequential transactions""" @@ -158,17 +157,14 @@ class RepositoryTestCase(RepositoryTestCaseBase): # put self.repository.put(H(0), fchunk(b"foo")) self.repository.commit(compact=False) - self.repository.close() + self.reopen() # replace - self.repository = self.open() - with self.repository: - self.repository.put(H(0), fchunk(b"bar")) - self.repository.commit(compact=False) + self.repository.put(H(0), fchunk(b"bar")) + self.repository.commit(compact=False) + self.reopen() # delete - self.repository = self.open() - with self.repository: - self.repository.delete(H(0)) - self.repository.commit(compact=False) + self.repository.delete(H(0)) + self.repository.commit(compact=False) def test_list(self): for x in range(100): @@ -275,14 +271,11 @@ class RepositoryTestCase(RepositoryTestCaseBase): # we do not set flags for H(0), so we can later check their default state. self.repository.flags(H(1), mask=0x00000007, value=0x00000006) self.repository.commit(compact=False) - self.repository.close() - - self.repository = self.open() - with self.repository: - # we query all flags to check if the initial flags were all zero and - # only the ones we explicitly set to one are as expected. - self.assert_equal(self.repository.flags(H(0), mask=0xFFFFFFFF), 0x00000000) - self.assert_equal(self.repository.flags(H(1), mask=0xFFFFFFFF), 0x00000006) + self.reopen() + # we query all flags to check if the initial flags were all zero and + # only the ones we explicitly set to one are as expected. + self.assert_equal(self.repository.flags(H(0), mask=0xFFFFFFFF), 0x00000000) + self.assert_equal(self.repository.flags(H(1), mask=0xFFFFFFFF), 0x00000006) class LocalRepositoryTestCase(RepositoryTestCaseBase): @@ -336,12 +329,10 @@ class LocalRepositoryTestCase(RepositoryTestCaseBase): last_segment = self.repository.io.get_latest_segment() with open(self.repository.io.segment_filename(last_segment + 1), "wb") as f: f.write(MAGIC + b"crapcrapcrap") - self.repository.close() + self.reopen() # usually, opening the repo and starting a transaction should trigger a cleanup. - self.repository = self.open() - with self.repository: - self.repository.put(H(0), fchunk(b"bar")) # this may trigger compact_segments() - self.repository.commit(compact=True) + self.repository.put(H(0), fchunk(b"bar")) # this may trigger compact_segments() + self.repository.commit(compact=True) # the point here is that nothing blows up with an exception. @@ -384,11 +375,10 @@ class RepositoryCommitTestCase(RepositoryTestCaseBase): os.unlink(os.path.join(self.repository.path, name)) with patch.object(Lock, "upgrade", side_effect=LockFailed) as upgrade: self.reopen(exclusive=None) # simulate old client that always does lock upgrades - with self.repository: - # the repo is only locked by a shared read lock, but to replay segments, - # we need an exclusive write lock - check if the lock gets upgraded. - self.assert_raises(LockFailed, lambda: len(self.repository)) - upgrade.assert_called_once_with() + # the repo is only locked by a shared read lock, but to replay segments, + # we need an exclusive write lock - check if the lock gets upgraded. + self.assert_raises(LockFailed, lambda: len(self.repository)) + upgrade.assert_called_once_with() def test_replay_lock_upgrade(self): self.add_keys() @@ -397,11 +387,10 @@ class RepositoryCommitTestCase(RepositoryTestCaseBase): os.unlink(os.path.join(self.repository.path, name)) with patch.object(Lock, "upgrade", side_effect=LockFailed) as upgrade: self.reopen(exclusive=False) # current client usually does not do lock upgrade, except for replay - with self.repository: - # the repo is only locked by a shared read lock, but to replay segments, - # we need an exclusive write lock - check if the lock gets upgraded. - self.assert_raises(LockFailed, lambda: len(self.repository)) - upgrade.assert_called_once_with() + # the repo is only locked by a shared read lock, but to replay segments, + # we need an exclusive write lock - check if the lock gets upgraded. + self.assert_raises(LockFailed, lambda: len(self.repository)) + upgrade.assert_called_once_with() def test_crash_before_deleting_compacted_segments(self): self.add_keys() @@ -931,7 +920,7 @@ class RepositoryHintsTestCase(RepositoryTestCaseBase): class RemoteRepositoryTestCase(RepositoryTestCase): repository = None # type: RemoteRepository - def open(self, create=False): + def open(self, create=False, exclusive=UNSPECIFIED): return RemoteRepository( Location("ssh://__testsuite__" + os.path.join(self.tmppath, "repository")), exclusive=True, create=create )