1
0
Fork 0
mirror of https://github.com/borgbackup/borg.git synced 2025-01-01 12:45:34 +00:00

remove unused remote.RepositoryServer

This commit is contained in:
Thomas Waldmann 2024-08-22 19:25:44 +02:00
parent e23231b2c4
commit d9f24def6a
No known key found for this signature in database
GPG key ID: 243ACFA951F78E01

View file

@ -1,10 +1,8 @@
import atexit
import errno
import functools
import inspect
import logging
import os
import queue
import select
import shlex
import shutil
@ -14,10 +12,8 @@
import tempfile
import textwrap
import time
import traceback
from subprocess import Popen, PIPE
import borg.logger
from . import __version__
from .compress import Compressor
from .constants import * # NOQA
@ -25,13 +21,12 @@
from .helpers import bin_to_hex
from .helpers import get_limited_unpacker
from .helpers import replace_placeholders
from .helpers import sysinfo
from .helpers import format_file_size
from .helpers import safe_unlink
from .helpers import prepare_subprocess_env, ignore_sigint
from .helpers import get_socket_filename
from .locking import LockTimeout, NotLocked, NotMyLock, LockFailed
from .logger import create_logger, borg_serve_log_queue
from .logger import create_logger
from .helpers import msgpack
from .repository import Repository
from .version import parse_version, format_version
@ -48,25 +43,6 @@
RATELIMIT_PERIOD = 0.1
def os_write(fd, data):
"""os.write wrapper so we do not lose data for partial writes."""
# TODO: this issue is fixed in cygwin since at least 2.8.0, remove this
# wrapper / workaround when this version is considered ancient.
# This is happening frequently on cygwin due to its small pipe buffer size of only 64kiB
# and also due to its different blocking pipe behaviour compared to Linux/*BSD.
# Neither Linux nor *BSD ever do partial writes on blocking pipes, unless interrupted by a
# signal, in which case serve() would terminate.
amount = remaining = len(data)
while remaining:
count = os.write(fd, data)
remaining -= count
if not remaining:
break
data = data[count:]
time.sleep(count * 1e-09)
return amount
class ConnectionClosed(Error):
"""Connection closed by remote host"""
@ -134,281 +110,6 @@ class ConnectionBrokenWithHint(Error):
# servers still get compatible input.
class RepositoryServer: # pragma: no cover
rpc_methods = (
"__len__",
"check",
"commit",
"delete",
"destroy",
"get",
"list",
"negotiate",
"open",
"close",
"info",
"put",
"rollback",
"save_key",
"load_key",
"break_lock",
"inject_exception",
"get_manifest",
"put_manifest",
)
def __init__(self, restrict_to_paths, restrict_to_repositories, append_only, storage_quota, use_socket):
self.repository = None
self.restrict_to_paths = restrict_to_paths
self.restrict_to_repositories = restrict_to_repositories
# This flag is parsed from the serve command line via Archiver.do_serve,
# i.e. it reflects local system policy and generally ranks higher than
# whatever the client wants, except when initializing a new repository
# (see RepositoryServer.open below).
self.append_only = append_only
self.storage_quota = storage_quota
self.client_version = None # we update this after client sends version information
if use_socket is False:
self.socket_path = None
elif use_socket is True: # --socket
self.socket_path = get_socket_filename()
else: # --socket=/some/path
self.socket_path = use_socket
def filter_args(self, f, kwargs):
"""Remove unknown named parameters from call, because client did (implicitly) say it's ok."""
known = set(inspect.signature(f).parameters)
return {name: kwargs[name] for name in kwargs if name in known}
def send_queued_log(self):
while True:
try:
# lr_dict contents see BorgQueueHandler
lr_dict = borg_serve_log_queue.get_nowait()
except queue.Empty:
break
else:
msg = msgpack.packb({LOG: lr_dict})
os_write(self.stdout_fd, msg)
def serve(self):
def inner_serve():
os.set_blocking(self.stdin_fd, False)
assert not os.get_blocking(self.stdin_fd)
os.set_blocking(self.stdout_fd, True)
assert os.get_blocking(self.stdout_fd)
unpacker = get_limited_unpacker("server")
shutdown_serve = False
while True:
# before processing any new RPCs, send out all pending log output
self.send_queued_log()
if shutdown_serve:
# shutdown wanted! get out of here after sending all log output.
assert self.repository is None
return
# process new RPCs
r, w, es = select.select([self.stdin_fd], [], [], 10)
if r:
data = os.read(self.stdin_fd, BUFSIZE)
if not data:
shutdown_serve = True
continue
unpacker.feed(data)
for unpacked in unpacker:
if isinstance(unpacked, dict):
msgid = unpacked[MSGID]
method = unpacked[MSG]
args = unpacked[ARGS]
else:
if self.repository is not None:
self.repository.close()
raise UnexpectedRPCDataFormatFromClient(__version__)
try:
if method not in self.rpc_methods:
raise InvalidRPCMethod(method)
try:
f = getattr(self, method)
except AttributeError:
f = getattr(self.repository, method)
args = self.filter_args(f, args)
res = f(**args)
except BaseException as e:
ex_short = traceback.format_exception_only(e.__class__, e)
ex_full = traceback.format_exception(*sys.exc_info())
ex_trace = True
if isinstance(e, Error):
ex_short = [e.get_message()]
ex_trace = e.traceback
if isinstance(e, (Repository.DoesNotExist, Repository.AlreadyExists, PathNotAllowed)):
# These exceptions are reconstructed on the client end in RemoteRepository.call_many(),
# and will be handled just like locally raised exceptions. Suppress the remote traceback
# for these, except ErrorWithTraceback, which should always display a traceback.
pass
else:
logging.debug("\n".join(ex_full))
sys_info = sysinfo()
try:
msg = msgpack.packb(
{
MSGID: msgid,
"exception_class": e.__class__.__name__,
"exception_args": e.args,
"exception_full": ex_full,
"exception_short": ex_short,
"exception_trace": ex_trace,
"sysinfo": sys_info,
}
)
except TypeError:
msg = msgpack.packb(
{
MSGID: msgid,
"exception_class": e.__class__.__name__,
"exception_args": [
x if isinstance(x, (str, bytes, int)) else None for x in e.args
],
"exception_full": ex_full,
"exception_short": ex_short,
"exception_trace": ex_trace,
"sysinfo": sys_info,
}
)
os_write(self.stdout_fd, msg)
else:
os_write(self.stdout_fd, msgpack.packb({MSGID: msgid, RESULT: res}))
if es:
shutdown_serve = True
continue
if self.socket_path: # server for socket:// connections
try:
# remove any left-over socket file
os.unlink(self.socket_path)
except OSError:
if os.path.exists(self.socket_path):
raise
sock_dir = os.path.dirname(self.socket_path)
os.makedirs(sock_dir, exist_ok=True)
if self.socket_path.endswith(".sock"):
pid_file = self.socket_path.replace(".sock", ".pid")
else:
pid_file = self.socket_path + ".pid"
pid = os.getpid()
with open(pid_file, "w") as f:
f.write(str(pid))
atexit.register(functools.partial(os.remove, pid_file))
atexit.register(functools.partial(os.remove, self.socket_path))
sock = socket.socket(family=socket.AF_UNIX, type=socket.SOCK_STREAM)
sock.bind(self.socket_path) # this creates the socket file in the fs
sock.listen(0) # no backlog
os.chmod(self.socket_path, mode=0o0770) # group members may use the socket, too.
print(f"borg serve: PID {pid}, listening on socket {self.socket_path} ...", file=sys.stderr)
while True:
connection, client_address = sock.accept()
print(f"Accepted a connection on socket {self.socket_path} ...", file=sys.stderr)
self.stdin_fd = connection.makefile("rb").fileno()
self.stdout_fd = connection.makefile("wb").fileno()
inner_serve()
print(f"Finished with connection on socket {self.socket_path} .", file=sys.stderr)
else: # server for one ssh:// connection
self.stdin_fd = sys.stdin.fileno()
self.stdout_fd = sys.stdout.fileno()
inner_serve()
def negotiate(self, client_data):
if isinstance(client_data, dict):
self.client_version = client_data["client_version"]
else:
self.client_version = BORG_VERSION # seems to be newer than current version (no known old format)
# not a known old format, send newest negotiate this version knows
return {"server_version": BORG_VERSION}
def _resolve_path(self, path):
if isinstance(path, bytes):
path = os.fsdecode(path)
if path.startswith("/~/"): # /~/x = path x relative to own home dir
home_dir = os.environ.get("HOME") or os.path.expanduser("~%s" % os.environ.get("USER", ""))
path = os.path.join(home_dir, path[3:])
elif path.startswith("/./"): # /./x = path x relative to cwd
path = path[3:]
return os.path.realpath(path)
def open(
self, path, create=False, lock_wait=None, lock=True, exclusive=None, append_only=False, make_parent_dirs=False
):
logging.debug("Resolving repository path %r", path)
path = self._resolve_path(path)
logging.debug("Resolved repository path to %r", path)
path_with_sep = os.path.join(path, "") # make sure there is a trailing slash (os.sep)
if self.restrict_to_paths:
# if --restrict-to-path P is given, we make sure that we only operate in/below path P.
# for the prefix check, it is important that the compared paths both have trailing slashes,
# so that a path /foobar will NOT be accepted with --restrict-to-path /foo option.
for restrict_to_path in self.restrict_to_paths:
restrict_to_path_with_sep = os.path.join(os.path.realpath(restrict_to_path), "") # trailing slash
if path_with_sep.startswith(restrict_to_path_with_sep):
break
else:
raise PathNotAllowed(path)
if self.restrict_to_repositories:
for restrict_to_repository in self.restrict_to_repositories:
restrict_to_repository_with_sep = os.path.join(os.path.realpath(restrict_to_repository), "")
if restrict_to_repository_with_sep == path_with_sep:
break
else:
raise PathNotAllowed(path)
# "borg init" on "borg serve --append-only" (=self.append_only) does not create an append only repo,
# while "borg init --append-only" (=append_only) does, regardless of the --append-only (self.append_only)
# flag for serve.
append_only = (not create and self.append_only) or append_only
self.repository = Repository(
path,
create,
lock_wait=lock_wait,
lock=lock,
append_only=append_only,
storage_quota=self.storage_quota,
exclusive=exclusive,
make_parent_dirs=make_parent_dirs,
send_log_cb=self.send_queued_log,
)
self.repository.__enter__() # clean exit handled by serve() method
return self.repository.id
def close(self):
if self.repository is not None:
self.repository.__exit__(None, None, None)
self.repository = None
borg.logger.flush_logging()
self.send_queued_log()
def inject_exception(self, kind):
s1 = "test string"
s2 = "test string2"
if kind == "DoesNotExist":
raise Repository.DoesNotExist(s1)
elif kind == "AlreadyExists":
raise Repository.AlreadyExists(s1)
elif kind == "CheckNeeded":
raise Repository.CheckNeeded(s1)
elif kind == "IntegrityError":
raise IntegrityError(s1)
elif kind == "PathNotAllowed":
raise PathNotAllowed("foo")
elif kind == "ObjectNotFound":
raise Repository.ObjectNotFound(s1, s2)
elif kind == "InvalidRPCMethod":
raise InvalidRPCMethod(s1)
elif kind == "divide":
0 // 0
class SleepingBandwidthLimiter:
def __init__(self, limit):
if limit: