1
0
Fork 0
mirror of https://github.com/borgbackup/borg.git synced 2025-01-01 12:45:34 +00:00

RemoteRepository: add .close method

- tears down logging (so no new log output is generated afterwards)
- sends all queued log output
- then returns

also: make stdin_fd / stdout_fd instance variables
This commit is contained in:
Thomas Waldmann 2023-05-27 11:32:03 +02:00
parent f84951b53c
commit 48c7879887
No known key found for this signature in database
GPG key ID: 243ACFA951F78E01

View file

@ -15,6 +15,7 @@
import traceback
from subprocess import Popen, PIPE
import borg.logger
from . import __version__
from .compress import Compressor
from .constants import * # NOQA
@ -125,6 +126,7 @@ class RepositoryServer: # pragma: no cover
"scan",
"negotiate",
"open",
"close",
"info",
"put",
"rollback",
@ -151,24 +153,27 @@ def filter_args(self, f, kwargs):
known = set(inspect.signature(f).parameters)
return {name: kwargs[name] for name in kwargs if name in known}
def send_queued_log(self):
while True:
try:
# lr_dict contents see BorgQueueHandler
lr_dict = borg_serve_log_queue.get_nowait()
except queue.Empty:
break
else:
msg = msgpack.packb({LOG: lr_dict})
os_write(self.stdout_fd, msg)
def serve(self):
stdin_fd = sys.stdin.fileno()
stdout_fd = sys.stdout.fileno()
os.set_blocking(stdin_fd, False)
os.set_blocking(stdout_fd, True)
self.stdin_fd = sys.stdin.fileno()
self.stdout_fd = sys.stdout.fileno()
os.set_blocking(self.stdin_fd, False)
os.set_blocking(self.stdout_fd, True)
unpacker = get_limited_unpacker("server")
shutdown_serve = False
while True:
# before processing any new RPCs, send out all pending log output
while True:
try:
# lr_dict contents see BorgQueueHandler
lr_dict = borg_serve_log_queue.get_nowait()
except queue.Empty:
break
else:
msg = msgpack.packb({LOG: lr_dict})
os_write(stdout_fd, msg)
self.send_queued_log()
if shutdown_serve:
# shutdown wanted! get out of here after sending all log output.
@ -177,9 +182,9 @@ def serve(self):
return
# process new RPCs
r, w, es = select.select([stdin_fd], [], [], 10)
r, w, es = select.select([self.stdin_fd], [], [], 10)
if r:
data = os.read(stdin_fd, BUFSIZE)
data = os.read(self.stdin_fd, BUFSIZE)
if not data:
shutdown_serve = True
continue
@ -243,9 +248,9 @@ def serve(self):
}
)
os_write(stdout_fd, msg)
os_write(self.stdout_fd, msg)
else:
os_write(stdout_fd, msgpack.packb({MSGID: msgid, RESULT: res}))
os_write(self.stdout_fd, msgpack.packb({MSGID: msgid, RESULT: res}))
if es:
shutdown_serve = True
continue
@ -310,6 +315,12 @@ def open(
self.repository.__enter__() # clean exit handled by serve() method
return self.repository.id
def close(self):
if self.repository is not None:
self.repository.__exit__(None, None, None)
borg.logger.teardown_logging()
self.send_queued_log()
def inject_exception(self, kind):
s1 = "test string"
s2 = "test string2"
@ -895,6 +906,7 @@ def break_lock(self):
"""actual remoting is done via self.call in the @api decorator"""
def close(self):
self.call("close", {}, wait=True)
if self.p:
self.p.stdin.close()
self.p.stdout.close()