From 48c7879887b4fca6c6ac0f22df3560e450270263 Mon Sep 17 00:00:00 2001 From: Thomas Waldmann Date: Sat, 27 May 2023 11:32:03 +0200 Subject: [PATCH] RemoteRepository: add .close method - tears down logging (so no new log output is generated afterwards) - sends all queued log output - then returns also: make stdin_fd / stdout_fd instance variables --- src/borg/remote.py | 46 +++++++++++++++++++++++++++++----------------- 1 file changed, 29 insertions(+), 17 deletions(-) diff --git a/src/borg/remote.py b/src/borg/remote.py index 4b001e056..4976e6c7f 100644 --- a/src/borg/remote.py +++ b/src/borg/remote.py @@ -15,6 +15,7 @@ import traceback from subprocess import Popen, PIPE +import borg.logger from . import __version__ from .compress import Compressor from .constants import * # NOQA @@ -125,6 +126,7 @@ class RepositoryServer: # pragma: no cover "scan", "negotiate", "open", + "close", "info", "put", "rollback", @@ -151,24 +153,27 @@ def filter_args(self, f, kwargs): known = set(inspect.signature(f).parameters) return {name: kwargs[name] for name in kwargs if name in known} + def send_queued_log(self): + while True: + try: + # lr_dict contents see BorgQueueHandler + lr_dict = borg_serve_log_queue.get_nowait() + except queue.Empty: + break + else: + msg = msgpack.packb({LOG: lr_dict}) + os_write(self.stdout_fd, msg) + def serve(self): - stdin_fd = sys.stdin.fileno() - stdout_fd = sys.stdout.fileno() - os.set_blocking(stdin_fd, False) - os.set_blocking(stdout_fd, True) + self.stdin_fd = sys.stdin.fileno() + self.stdout_fd = sys.stdout.fileno() + os.set_blocking(self.stdin_fd, False) + os.set_blocking(self.stdout_fd, True) unpacker = get_limited_unpacker("server") shutdown_serve = False while True: # before processing any new RPCs, send out all pending log output - while True: - try: - # lr_dict contents see BorgQueueHandler - lr_dict = borg_serve_log_queue.get_nowait() - except queue.Empty: - break - else: - msg = msgpack.packb({LOG: lr_dict}) - os_write(stdout_fd, msg) + self.send_queued_log() if shutdown_serve: # shutdown wanted! get out of here after sending all log output. @@ -177,9 +182,9 @@ def serve(self): return # process new RPCs - r, w, es = select.select([stdin_fd], [], [], 10) + r, w, es = select.select([self.stdin_fd], [], [], 10) if r: - data = os.read(stdin_fd, BUFSIZE) + data = os.read(self.stdin_fd, BUFSIZE) if not data: shutdown_serve = True continue @@ -243,9 +248,9 @@ def serve(self): } ) - os_write(stdout_fd, msg) + os_write(self.stdout_fd, msg) else: - os_write(stdout_fd, msgpack.packb({MSGID: msgid, RESULT: res})) + os_write(self.stdout_fd, msgpack.packb({MSGID: msgid, RESULT: res})) if es: shutdown_serve = True continue @@ -310,6 +315,12 @@ def open( self.repository.__enter__() # clean exit handled by serve() method return self.repository.id + def close(self): + if self.repository is not None: + self.repository.__exit__(None, None, None) + borg.logger.teardown_logging() + self.send_queued_log() + def inject_exception(self, kind): s1 = "test string" s2 = "test string2" @@ -895,6 +906,7 @@ def break_lock(self): """actual remoting is done via self.call in the @api decorator""" def close(self): + self.call("close", {}, wait=True) if self.p: self.p.stdin.close() self.p.stdout.close()