2014-06-13 18:07:01 +00:00
|
|
|
import errno
|
2016-11-10 08:56:18 +00:00
|
|
|
import functools
|
|
|
|
import inspect
|
2017-05-17 15:17:15 +00:00
|
|
|
import json
|
2015-12-12 17:25:38 +00:00
|
|
|
import logging
|
2010-11-15 21:18:47 +00:00
|
|
|
import os
|
|
|
|
import select
|
2015-10-05 22:54:00 +00:00
|
|
|
import shlex
|
2016-05-19 23:54:42 +00:00
|
|
|
import shutil
|
2017-05-28 11:16:52 +00:00
|
|
|
import struct
|
2010-11-15 21:18:47 +00:00
|
|
|
import sys
|
2014-03-13 21:29:47 +00:00
|
|
|
import tempfile
|
2017-01-12 14:01:41 +00:00
|
|
|
import textwrap
|
2017-01-14 02:07:11 +00:00
|
|
|
import time
|
2017-05-17 15:17:15 +00:00
|
|
|
import traceback
|
2016-05-30 23:18:03 +00:00
|
|
|
from subprocess import Popen, PIPE
|
2015-03-21 01:17:19 +00:00
|
|
|
|
2016-05-30 23:18:03 +00:00
|
|
|
from . import __version__
|
2019-07-14 09:39:58 +00:00
|
|
|
from .compress import Compressor
|
2017-06-23 03:56:41 +00:00
|
|
|
from .constants import * # NOQA
|
2016-05-30 22:33:13 +00:00
|
|
|
from .helpers import Error, IntegrityError
|
|
|
|
from .helpers import bin_to_hex
|
2017-12-21 05:18:49 +00:00
|
|
|
from .helpers import get_base_dir
|
2017-06-24 16:31:34 +00:00
|
|
|
from .helpers import get_limited_unpacker
|
2017-05-17 15:17:15 +00:00
|
|
|
from .helpers import replace_placeholders
|
|
|
|
from .helpers import sysinfo
|
2016-05-19 23:54:42 +00:00
|
|
|
from .helpers import format_file_size
|
2022-02-15 18:39:58 +00:00
|
|
|
from .helpers import safe_unlink
|
2022-08-06 10:24:50 +00:00
|
|
|
from .helpers import prepare_subprocess_env, ignore_sigint
|
2017-05-17 15:17:15 +00:00
|
|
|
from .logger import create_logger, setup_logging
|
2018-07-01 00:34:48 +00:00
|
|
|
from .helpers import msgpack
|
2017-06-24 16:31:34 +00:00
|
|
|
from .repository import Repository
|
2016-11-10 08:56:18 +00:00
|
|
|
from .version import parse_version, format_version
|
2022-03-16 23:24:49 +00:00
|
|
|
from .checksums import xxh64
|
2021-01-03 16:37:16 +00:00
|
|
|
from .helpers.datastruct import EfficientCollectionQueue
|
2017-01-12 14:01:41 +00:00
|
|
|
|
|
|
|
# Module-level logger.
logger = create_logger(__name__)

# Version constant of the legacy (borg 1.0.x) positional RPC protocol.
RPC_PROTOCOL_VERSION = 2
# This build's own version as a parsed tuple, used during negotiation.
BORG_VERSION = parse_version(__version__)
# Short keys used in the dict-based RPC message format (msgid, method, args, result).
MSGID, MSG, ARGS, RESULT = "i", "m", "a", "r"

# Maximum number of RPC requests that may be in flight (sent, not yet answered) at once.
MAX_INFLIGHT = 100

# Length of one bandwidth-accounting period in seconds (see SleepingBandwidthLimiter).
RATELIMIT_PERIOD = 0.1
|
|
|
|
|
2011-07-05 19:29:15 +00:00
|
|
|
|
2017-01-14 02:07:11 +00:00
|
|
|
def os_write(fd, data):
    """os.write wrapper so we do not lose data for partial writes."""
    # TODO: this issue is fixed in cygwin since at least 2.8.0, remove this
    # wrapper / workaround when this version is considered ancient.
    # This is happening frequently on cygwin due to its small pipe buffer size of only 64kiB
    # and also due to its different blocking pipe behaviour compared to Linux/*BSD.
    # Neither Linux nor *BSD ever do partial writes on blocking pipes, unless interrupted by a
    # signal, in which case serve() would terminate.
    total = len(data)
    pending = data
    while pending:
        written = os.write(fd, pending)
        pending = pending[written:]
        if pending:
            # tiny pause (1ns per byte written) before retrying, to let the pipe drain
            time.sleep(written * 1e-09)
    return total
|
|
|
|
|
|
|
|
|
2013-12-15 19:35:29 +00:00
|
|
|
# Raised when the remote side closes the connection unexpectedly.
class ConnectionClosed(Error):
    """Connection closed by remote host"""
|
2013-06-28 11:31:57 +00:00
|
|
|
|
|
|
|
|
2015-10-31 21:41:08 +00:00
|
|
|
# ConnectionClosed variant carrying an extra hint for the user; the {} placeholder
# is filled with the constructor argument by the Error base class machinery.
class ConnectionClosedWithHint(ConnectionClosed):
    """Connection closed by remote host. {}"""
|
|
|
|
|
|
|
|
|
2014-03-24 20:28:59 +00:00
|
|
|
# Raised by the server when a client asks for a repository path outside the
# configured --restrict-to-path / --restrict-to-repository allowlists (see
# RepositoryServer.open).
class PathNotAllowed(Error):
    """Repository path not allowed: {}"""
|
2014-03-24 20:28:59 +00:00
|
|
|
|
2015-07-11 16:31:49 +00:00
|
|
|
|
2015-01-11 13:06:59 +00:00
|
|
|
# Raised when a requested RPC method name is not in RepositoryServer.rpc_methods.
class InvalidRPCMethod(Error):
    """RPC method {} is not valid"""
|
2014-03-24 20:28:59 +00:00
|
|
|
|
|
|
|
|
2016-08-22 17:50:53 +00:00
|
|
|
# Raised server-side when a client message is neither the dict format nor the
# legacy 4-tuple format; the {} placeholder is filled with the server version.
class UnexpectedRPCDataFormatFromClient(Error):
    """Borg {}: Got unexpected RPC data format from client."""
|
|
|
|
|
|
|
|
|
|
|
|
class UnexpectedRPCDataFormatFromServer(Error):
    """Got unexpected RPC data format from server:\n{}"""

    def __init__(self, data):
        """Build the error message from the first 128 units of *data*.

        If the bytes decode as text, show them as text; otherwise show a
        wrapped hex dump (16 bytes per line).
        """
        try:
            preview = data.decode()[:128]
        except UnicodeDecodeError:
            raw = data[:128]
            hex_bytes = ["%02X" % byte for byte in raw]
            preview = textwrap.fill(" ".join(hex_bytes), 16 * 3)
        super().__init__(preview)
|
2016-08-22 17:50:53 +00:00
|
|
|
|
|
|
|
|
2016-11-10 08:56:18 +00:00
|
|
|
# Protocol compatibility:
|
|
|
|
# In general the server is responsible for rejecting too old clients and the client it responsible for rejecting
|
|
|
|
# too old servers. This ensures that the knowledge what is compatible is always held by the newer component.
|
|
|
|
#
|
|
|
|
# The server can do checks for the client version in RepositoryServer.negotiate. If the client_data is 2 then
|
|
|
|
# client is in the version range [0.29.0, 1.0.x] inclusive. For newer clients client_data is a dict which contains
|
|
|
|
# client_version.
|
|
|
|
#
|
|
|
|
# For the client the return of the negotiate method is either 2 if the server is in the version range [0.29.0, 1.0.x]
|
|
|
|
# inclusive, or it is a dict which includes the server version.
|
|
|
|
#
|
2020-07-07 21:01:55 +00:00
|
|
|
# All method calls on the remote repository object must be allowlisted in RepositoryServer.rpc_methods and have api
|
2016-11-10 08:56:18 +00:00
|
|
|
# stubs in RemoteRepository. The @api decorator on these stubs is used to set server version requirements.
|
|
|
|
#
|
|
|
|
# Method parameters are identified only by name and never by position. Unknown parameters are ignored by the server side.
|
|
|
|
# If a new parameter is important and may not be ignored, on the client a parameter specific version requirement needs
|
|
|
|
# to be added.
|
|
|
|
# When parameters are removed, they need to be preserved as defaulted parameters on the client stubs so that older
|
|
|
|
# servers still get compatible input.
|
|
|
|
|
|
|
|
|
|
|
|
# Maps RPC method name -> tuple of parameter names in the positional order used
# by the legacy (borg 1.0.x) tuple-based protocol. Used by
# RepositoryServer.positional_to_named to translate old-style positional calls
# into the named-argument protocol.
compatMap = {
    "check": ("repair", "save_space"),
    "commit": ("save_space",),
    "rollback": (),
    "destroy": (),
    "__len__": (),
    "list": ("limit", "marker"),
    "put": ("id", "data"),
    "get": ("id",),
    "delete": ("id",),
    "save_key": ("keydata",),
    "load_key": (),
    "break_lock": (),
    "negotiate": ("client_data",),
    "open": ("path", "create", "lock_wait", "lock", "exclusive", "append_only"),
    "info": (),
    "get_free_nonce": (),
    "commit_nonce_reservation": ("next_unreserved", "start_nonce"),
}
|
|
|
|
|
|
|
|
|
2015-08-12 02:28:31 +00:00
|
|
|
class RepositoryServer:  # pragma: no cover
    """Serve a local borg Repository to one remote client over the stdin/stdout pipe pair.

    Reads msgpack-encoded RPC requests from stdin, dispatches them to methods on
    this object or on the opened Repository, and writes msgpack-encoded replies
    to stdout. Supports both the legacy tuple-based protocol (borg 1.0.x) and
    the dict-based protocol (borg 1.1+).
    """

    # Allowlist of method names a client may invoke; anything else raises InvalidRPCMethod.
    rpc_methods = (
        "__len__",
        "check",
        "commit",
        "delete",
        "destroy",
        "get",
        "list",
        "scan",
        "negotiate",
        "open",
        "info",
        "put",
        "rollback",
        "save_key",
        "load_key",
        "break_lock",
        "get_free_nonce",
        "commit_nonce_reservation",
        "inject_exception",
    )

    def __init__(self, restrict_to_paths, restrict_to_repositories, append_only, storage_quota):
        """
        :param restrict_to_paths: path prefixes the client may open repositories under (falsy = unrestricted)
        :param restrict_to_repositories: exact repository paths the client may open (falsy = unrestricted)
        :param append_only: server-side append-only policy flag (from the serve command line)
        :param storage_quota: storage quota to apply to repositories opened by this server
        """
        # No repository is open until the client calls open().
        self.repository = None
        self.restrict_to_paths = restrict_to_paths
        self.restrict_to_repositories = restrict_to_repositories
        # This flag is parsed from the serve command line via Archiver.do_serve,
        # i.e. it reflects local system policy and generally ranks higher than
        # whatever the client wants, except when initializing a new repository
        # (see RepositoryServer.open below).
        self.append_only = append_only
        self.storage_quota = storage_quota
        self.client_version = parse_version(
            "1.0.8"
        )  # fallback version if client is too old to send version information

    def positional_to_named(self, method, argv):
        """Translate from positional protocol to named protocol."""
        try:
            return {name: argv[pos] for pos, name in enumerate(compatMap[method])}
        except IndexError:
            if method == "open" and len(argv) == 4:
                # borg clients < 1.0.7 use open() with 4 args
                mapping = compatMap[method][:4]
            else:
                raise
            return {name: argv[pos] for pos, name in enumerate(mapping)}

    def filter_args(self, f, kwargs):
        """Remove unknown named parameters from call, because client did (implicitly) say it's ok."""
        known = set(inspect.signature(f).parameters)
        return {name: kwargs[name] for name in kwargs if name in known}

    def serve(self):
        """Main server loop: read requests from stdin, dispatch, reply on stdout.

        Returns when the client closes the connection (EOF on stdin).
        """
        stdin_fd = sys.stdin.fileno()
        stdout_fd = sys.stdout.fileno()
        stderr_fd = sys.stdout.fileno()  # NOTE(review): bound to *stdout*'s fd, not stderr's -- confirm intent
        # Non-blocking stdin so select() drives the loop; blocking writes so no reply data is dropped.
        os.set_blocking(stdin_fd, False)
        os.set_blocking(stdout_fd, True)
        os.set_blocking(stderr_fd, True)
        unpacker = get_limited_unpacker("server")
        while True:
            # Wait up to 10s for request data (no write/exceptional fds are watched).
            r, w, es = select.select([stdin_fd], [], [], 10)
            if r:
                data = os.read(stdin_fd, BUFSIZE)
                if not data:
                    # EOF: client hung up. Close the repository (if one was opened) and stop serving.
                    if self.repository is not None:
                        self.repository.close()
                    else:
                        os_write(
                            stderr_fd,
                            "Borg {}: Got connection close before repository was opened.\n".format(
                                __version__
                            ).encode(),
                        )
                    return
                unpacker.feed(data)
                for unpacked in unpacker:
                    if isinstance(unpacked, dict):
                        # dict-based protocol (borg 1.1+)
                        dictFormat = True
                        msgid = unpacked[MSGID]
                        method = unpacked[MSG]
                        args = unpacked[ARGS]
                    elif isinstance(unpacked, tuple) and len(unpacked) == 4:
                        # legacy positional protocol (borg 1.0.x)
                        dictFormat = False
                        # The first field 'type' was always 1 and has always been ignored
                        _, msgid, method, args = unpacked
                        args = self.positional_to_named(method, args)
                    else:
                        if self.repository is not None:
                            self.repository.close()
                        raise UnexpectedRPCDataFormatFromClient(__version__)
                    try:
                        if method not in self.rpc_methods:
                            raise InvalidRPCMethod(method)
                        # Prefer a method on the server object itself (e.g. negotiate, open);
                        # fall back to the opened Repository for everything else.
                        try:
                            f = getattr(self, method)
                        except AttributeError:
                            f = getattr(self.repository, method)
                        args = self.filter_args(f, args)
                        res = f(**args)
                    except BaseException as e:
                        if dictFormat:
                            ex_short = traceback.format_exception_only(e.__class__, e)
                            ex_full = traceback.format_exception(*sys.exc_info())
                            ex_trace = True
                            if isinstance(e, Error):
                                ex_short = [e.get_message()]
                                ex_trace = e.traceback
                            if isinstance(e, (Repository.DoesNotExist, Repository.AlreadyExists, PathNotAllowed)):
                                # These exceptions are reconstructed on the client end in RemoteRepository.call_many(),
                                # and will be handled just like locally raised exceptions. Suppress the remote traceback
                                # for these, except ErrorWithTraceback, which should always display a traceback.
                                pass
                            else:
                                logging.debug("\n".join(ex_full))

                            # Serialize the exception for the client; fall back to a sanitized
                            # args list if the original e.args are not msgpack-serializable.
                            try:
                                msg = msgpack.packb(
                                    {
                                        MSGID: msgid,
                                        "exception_class": e.__class__.__name__,
                                        "exception_args": e.args,
                                        "exception_full": ex_full,
                                        "exception_short": ex_short,
                                        "exception_trace": ex_trace,
                                        "sysinfo": sysinfo(),
                                    }
                                )
                            except TypeError:
                                msg = msgpack.packb(
                                    {
                                        MSGID: msgid,
                                        "exception_class": e.__class__.__name__,
                                        "exception_args": [
                                            x if isinstance(x, (str, bytes, int)) else None for x in e.args
                                        ],
                                        "exception_full": ex_full,
                                        "exception_short": ex_short,
                                        "exception_trace": ex_trace,
                                        "sysinfo": sysinfo(),
                                    }
                                )

                            os_write(stdout_fd, msg)
                        else:
                            # Legacy protocol: log details server-side, send only class name + generic hint.
                            if isinstance(e, (Repository.DoesNotExist, Repository.AlreadyExists, PathNotAllowed)):
                                # These exceptions are reconstructed on the client end in RemoteRepository.call_many(),
                                # and will be handled just like locally raised exceptions. Suppress the remote traceback
                                # for these, except ErrorWithTraceback, which should always display a traceback.
                                pass
                            else:
                                if isinstance(e, Error):
                                    tb_log_level = logging.ERROR if e.traceback else logging.DEBUG
                                    msg = e.get_message()
                                else:
                                    tb_log_level = logging.ERROR
                                    msg = "%s Exception in RPC call" % e.__class__.__name__
                                tb = f"{traceback.format_exc()}\n{sysinfo()}"
                                logging.error(msg)
                                logging.log(tb_log_level, tb)
                            exc = "Remote Exception (see remote log for the traceback)"
                            os_write(stdout_fd, msgpack.packb((1, msgid, e.__class__.__name__, exc)))
                    else:
                        # Success: reply in whichever protocol flavor the request used.
                        if dictFormat:
                            os_write(stdout_fd, msgpack.packb({MSGID: msgid, RESULT: res}))
                        else:
                            os_write(stdout_fd, msgpack.packb((1, msgid, None, res)))
            if es:
                # NOTE(review): es is always empty because no exceptional fds are passed to
                # select() above, so this branch looks unreachable -- confirm before removing.
                self.repository.close()
                return

    def negotiate(self, client_data):
        """RPC: protocol/version negotiation; returns the server version information."""
        # old format used in 1.0.x
        if client_data == RPC_PROTOCOL_VERSION:
            return RPC_PROTOCOL_VERSION
        # clients since 1.1.0b3 use a dict as client_data
        # clients since 1.1.0b6 support json log format from server
        if isinstance(client_data, dict):
            self.client_version = client_data["client_version"]
            # Switch server-side logging to the JSON-based wire format the client understands,
            # keeping the currently configured root-logger level.
            level = logging.getLevelName(logging.getLogger("").level)
            setup_logging(is_serve=True, json=True, level=level)
            logger.debug("Initialized logging system for JSON-based protocol")
        else:
            self.client_version = BORG_VERSION  # seems to be newer than current version (no known old format)

        # not a known old format, send newest negotiate this version knows
        return {"server_version": BORG_VERSION}

    def _resolve_path(self, path):
        """Expand the /~/ and /./ prefixes and return the canonical real path."""
        if isinstance(path, bytes):
            path = os.fsdecode(path)
        if path.startswith("/~/"):  # /~/x = path x relative to own home dir
            path = os.path.join(get_base_dir(), path[3:])
        elif path.startswith("/./"):  # /./x = path x relative to cwd
            path = path[3:]
        return os.path.realpath(path)

    def open(
        self, path, create=False, lock_wait=None, lock=True, exclusive=None, append_only=False, make_parent_dirs=False
    ):
        """RPC: open (or create) the repository at *path*, enforcing the configured restrictions.

        Returns the repository id.
        Raises PathNotAllowed if *path* falls outside the allowlists.
        """
        logging.debug("Resolving repository path %r", path)
        path = self._resolve_path(path)
        logging.debug("Resolved repository path to %r", path)
        path_with_sep = os.path.join(path, "")  # make sure there is a trailing slash (os.sep)
        if self.restrict_to_paths:
            # if --restrict-to-path P is given, we make sure that we only operate in/below path P.
            # for the prefix check, it is important that the compared paths both have trailing slashes,
            # so that a path /foobar will NOT be accepted with --restrict-to-path /foo option.
            for restrict_to_path in self.restrict_to_paths:
                restrict_to_path_with_sep = os.path.join(os.path.realpath(restrict_to_path), "")  # trailing slash
                if path_with_sep.startswith(restrict_to_path_with_sep):
                    break
            else:
                raise PathNotAllowed(path)
        if self.restrict_to_repositories:
            # exact-match allowlist: the resolved path must equal one of the allowed repositories
            for restrict_to_repository in self.restrict_to_repositories:
                restrict_to_repository_with_sep = os.path.join(os.path.realpath(restrict_to_repository), "")
                if restrict_to_repository_with_sep == path_with_sep:
                    break
            else:
                raise PathNotAllowed(path)
        # "borg init" on "borg serve --append-only" (=self.append_only) does not create an append only repo,
        # while "borg init --append-only" (=append_only) does, regardless of the --append-only (self.append_only)
        # flag for serve.
        append_only = (not create and self.append_only) or append_only
        self.repository = Repository(
            path,
            create,
            lock_wait=lock_wait,
            lock=lock,
            append_only=append_only,
            storage_quota=self.storage_quota,
            exclusive=exclusive,
            make_parent_dirs=make_parent_dirs,
        )
        self.repository.__enter__()  # clean exit handled by serve() method
        return self.repository.id

    def inject_exception(self, kind):
        """RPC (testing only): raise an exception of the given *kind* on the server side."""
        s1 = "test string"
        s2 = "test string2"
        if kind == "DoesNotExist":
            raise Repository.DoesNotExist(s1)
        elif kind == "AlreadyExists":
            raise Repository.AlreadyExists(s1)
        elif kind == "CheckNeeded":
            raise Repository.CheckNeeded(s1)
        elif kind == "IntegrityError":
            raise IntegrityError(s1)
        elif kind == "PathNotAllowed":
            raise PathNotAllowed("foo")
        elif kind == "ObjectNotFound":
            raise Repository.ObjectNotFound(s1, s2)
        elif kind == "InvalidRPCMethod":
            raise InvalidRPCMethod(s1)
        elif kind == "divide":
            # deliberately trigger a ZeroDivisionError
            0 // 0
|
|
|
|
|
2010-11-15 21:18:47 +00:00
|
|
|
|
2016-09-15 20:13:35 +00:00
|
|
|
class SleepingBandwidthLimiter:
    """Rate-limit writes to a file descriptor by sleeping when the per-period quota is spent."""

    def __init__(self, limit):
        """*limit* is the allowed rate in bytes/second; a falsy value disables limiting."""
        if not limit:
            self.ratelimit = None
            return
        self.ratelimit = int(limit * RATELIMIT_PERIOD)  # byte quota granted per period
        self.ratelimit_last = time.monotonic()  # start of the current accounting period
        self.ratelimit_quota = self.ratelimit  # bytes still allowed in this period

    def write(self, fd, to_send):
        """Write (part of) *to_send* to *fd* within the rate limit; return bytes written."""
        if self.ratelimit:
            now = time.monotonic()
            # Refill the quota once a full period has elapsed, capped at two periods' worth.
            if now >= self.ratelimit_last + RATELIMIT_PERIOD:
                self.ratelimit_quota = min(self.ratelimit_quota + self.ratelimit, 2 * self.ratelimit)
                self.ratelimit_last = now
            # Quota exhausted: sleep out the rest of the period, then grant a fresh quota.
            if self.ratelimit_quota == 0:
                tosleep = self.ratelimit_last + RATELIMIT_PERIOD - now
                time.sleep(tosleep)
                self.ratelimit_quota += self.ratelimit
                self.ratelimit_last = time.monotonic()
            # Never send more than the remaining quota in one call.
            if len(to_send) > self.ratelimit_quota:
                to_send = to_send[: self.ratelimit_quota]
        written = os.write(fd, to_send)
        if self.ratelimit:
            self.ratelimit_quota -= written
        return written
|
|
|
|
|
|
|
|
|
2016-11-10 08:56:18 +00:00
|
|
|
def api(*, since, **kwargs_decorator):
    """Check version requirements and use self.call to do the remote method call.

    <since> specifies the version in which borg introduced this method.
    Calling this method when connected to an older version will fail without transmitting anything to the server.

    Further kwargs can be used to encode version specific restrictions:

    <previously> is the value resulting in the behaviour before introducing the new parameter.
    If a previous hardcoded behaviour is parameterized in a version, this allows calls that use the previously
    hardcoded behaviour to pass through and generates an error if another behaviour is requested by the client.
    E.g. when 'append_only' was introduced in 1.0.7 the previous behaviour was what now is append_only=False.
    Thus @api(..., append_only={'since': parse_version('1.0.7'), 'previously': False}) allows calls
    with append_only=False for all version but rejects calls using append_only=True on versions older than 1.0.7.

    <dontcare> is a flag to set the behaviour if an old version is called the new way.
    If set to True, the method is called without the (not yet supported) parameter (this should be done if that is the
    more desirable behaviour). If False, an exception is generated.
    E.g. before 'threshold' was introduced in 1.2.0a8, a hardcoded threshold of 0.1 was used in commit().
    """

    def decorator(f):
        @functools.wraps(f)
        def do_rpc(self, *args, **kwargs):
            # Bind the call against the stub's signature so every parameter can be
            # handled by name, regardless of how the caller passed it.
            sig = inspect.signature(f)
            bound_args = sig.bind(self, *args, **kwargs)
            named = {}  # Arguments for the remote process
            extra = {}  # Arguments for the local process
            for name, param in sig.parameters.items():
                if name == "self":
                    continue
                if name in bound_args.arguments:
                    if name == "wait":
                        # 'wait' controls local call behaviour and is never transmitted.
                        extra[name] = bound_args.arguments[name]
                    else:
                        named[name] = bound_args.arguments[name]
                else:
                    # Parameter not supplied by the caller: send the stub's default
                    # explicitly, so the server always sees a fully populated arg dict.
                    if param.default is not param.empty:
                        named[name] = param.default

            # Reject the whole method if the server predates it.
            if self.server_version < since:
                raise self.RPCServerOutdated(f.__name__, format_version(since))

            # Per-parameter version restrictions (see docstring above).
            for name, restriction in kwargs_decorator.items():
                if restriction["since"] <= self.server_version:
                    continue  # server is new enough for this parameter
                if "previously" in restriction and named[name] == restriction["previously"]:
                    continue  # caller requests the old hardcoded behaviour - compatible
                if restriction.get("dontcare", False):
                    continue  # pass through; old servers ignore unknown named parameters

                raise self.RPCServerOutdated(
                    f"{f.__name__} {name}={named[name]!s}", format_version(restriction["since"])
                )

            return self.call(f.__name__, named, **extra)

        return do_rpc

    return decorator
|
|
|
|
|
|
|
|
|
2015-03-17 22:03:36 +00:00
|
|
|
class RemoteRepository:
|
2022-07-15 11:26:35 +00:00
|
|
|
extra_test_args = [] # type: ignore
|
2010-11-15 21:18:47 +00:00
|
|
|
|
|
|
|
class RPCError(Exception):
|
2016-11-08 22:18:18 +00:00
|
|
|
def __init__(self, unpacked):
|
2022-05-06 01:59:10 +00:00
|
|
|
# for borg < 1.1: unpacked only has 'exception_class' as key
|
|
|
|
# for borg 1.1+: unpacked has keys: 'exception_args', 'exception_full', 'exception_short', 'sysinfo'
|
2016-11-08 22:18:18 +00:00
|
|
|
self.unpacked = unpacked
|
|
|
|
|
|
|
|
def get_message(self):
|
2022-05-06 01:59:10 +00:00
|
|
|
if "exception_short" in self.unpacked:
|
|
|
|
return "\n".join(self.unpacked["exception_short"])
|
2016-11-08 22:18:18 +00:00
|
|
|
else:
|
|
|
|
return self.exception_class
|
|
|
|
|
2017-05-31 16:48:48 +00:00
|
|
|
@property
|
|
|
|
def traceback(self):
|
2022-05-06 01:59:10 +00:00
|
|
|
return self.unpacked.get("exception_trace", True)
|
2017-05-31 16:48:48 +00:00
|
|
|
|
2016-11-08 22:18:18 +00:00
|
|
|
@property
|
|
|
|
def exception_class(self):
|
2022-05-06 01:59:10 +00:00
|
|
|
return self.unpacked["exception_class"]
|
2016-11-08 22:18:18 +00:00
|
|
|
|
|
|
|
@property
|
|
|
|
def exception_full(self):
|
2022-05-06 01:59:10 +00:00
|
|
|
if "exception_full" in self.unpacked:
|
|
|
|
return "\n".join(self.unpacked["exception_full"])
|
2016-11-08 22:18:18 +00:00
|
|
|
else:
|
|
|
|
return self.get_message() + "\nRemote Exception (see remote log for the traceback)"
|
|
|
|
|
|
|
|
@property
|
|
|
|
def sysinfo(self):
|
2022-05-06 01:59:10 +00:00
|
|
|
if "sysinfo" in self.unpacked:
|
|
|
|
return self.unpacked["sysinfo"]
|
2016-11-08 22:18:18 +00:00
|
|
|
else:
|
|
|
|
return ""
|
2010-11-15 21:18:47 +00:00
|
|
|
|
2016-11-10 08:56:18 +00:00
|
|
|
class RPCServerOutdated(Error):
|
|
|
|
"""Borg server is too old for {}. Required version {}"""
|
|
|
|
|
|
|
|
@property
|
|
|
|
def method(self):
|
|
|
|
return self.args[0]
|
|
|
|
|
|
|
|
@property
|
|
|
|
def required_version(self):
|
|
|
|
return self.args[1]
|
|
|
|
|
2016-11-10 11:12:32 +00:00
|
|
|
# If compatibility with 1.0.x is not longer needed, replace all checks of this with True and simplify the code
|
|
|
|
dictFormat = False # outside of __init__ for testing of legacy free protocol
|
|
|
|
|
2019-02-04 16:12:11 +00:00
|
|
|
def __init__(
|
|
|
|
self,
|
|
|
|
location,
|
|
|
|
create=False,
|
|
|
|
exclusive=False,
|
|
|
|
lock_wait=None,
|
|
|
|
lock=True,
|
|
|
|
append_only=False,
|
|
|
|
make_parent_dirs=False,
|
|
|
|
args=None,
|
|
|
|
):
|
2015-12-19 13:30:05 +00:00
|
|
|
self.location = self._location = location
|
2014-01-22 19:58:48 +00:00
|
|
|
self.preload_ids = []
|
|
|
|
self.msgid = 0
|
2017-02-23 15:50:52 +00:00
|
|
|
self.rx_bytes = 0
|
|
|
|
self.tx_bytes = 0
|
2021-01-03 16:37:16 +00:00
|
|
|
self.to_send = EfficientCollectionQueue(1024 * 1024, bytes)
|
2017-09-24 23:15:13 +00:00
|
|
|
self.stderr_received = b"" # incomplete stderr line bytes received (no \n yet)
|
2016-08-05 19:51:01 +00:00
|
|
|
self.chunkid_to_msgids = {}
|
2014-01-22 19:58:48 +00:00
|
|
|
self.ignore_responses = set()
|
|
|
|
self.responses = {}
|
2017-03-05 04:19:32 +00:00
|
|
|
self.async_responses = {}
|
2017-03-21 22:44:47 +00:00
|
|
|
self.shutdown_time = None
|
2021-04-16 13:48:10 +00:00
|
|
|
self.ratelimit = SleepingBandwidthLimiter(args.upload_ratelimit * 1024 if args and args.upload_ratelimit else 0)
|
|
|
|
self.upload_buffer_size_limit = args.upload_buffer * 1024 * 1024 if args and args.upload_buffer else 0
|
2017-02-17 04:00:37 +00:00
|
|
|
self.unpacker = get_limited_unpacker("client")
|
2016-11-10 08:56:18 +00:00
|
|
|
self.server_version = parse_version(
|
|
|
|
"1.0.8"
|
|
|
|
) # fallback version if server is too old to send version information
|
2014-01-22 19:58:48 +00:00
|
|
|
self.p = None
|
2019-01-31 12:54:17 +00:00
|
|
|
self._args = args
|
2015-12-12 14:31:43 +00:00
|
|
|
testing = location.host == "__testsuite__"
|
2017-09-23 17:04:46 +00:00
|
|
|
# when testing, we invoke and talk to a borg process directly (no ssh).
|
|
|
|
# when not testing, we invoke the system-installed ssh binary to talk to a remote borg.
|
|
|
|
env = prepare_subprocess_env(system=not testing)
|
2015-12-12 14:31:43 +00:00
|
|
|
borg_cmd = self.borg_cmd(args, testing)
|
|
|
|
if not testing:
|
|
|
|
borg_cmd = self.ssh_cmd(location) + borg_cmd
|
2017-01-12 14:01:41 +00:00
|
|
|
logger.debug("SSH command line: %s", borg_cmd)
|
2022-08-06 10:24:50 +00:00
|
|
|
# we do not want the ssh getting killed by Ctrl-C/SIGINT because it is needed for clean shutdown of borg.
|
|
|
|
# borg's SIGINT handler tries to write a checkpoint and requires the remote repo connection.
|
|
|
|
self.p = Popen(borg_cmd, bufsize=0, stdin=PIPE, stdout=PIPE, stderr=PIPE, env=env, preexec_fn=ignore_sigint)
|
2012-11-27 23:03:35 +00:00
|
|
|
self.stdin_fd = self.p.stdin.fileno()
|
2012-10-17 09:40:23 +00:00
|
|
|
self.stdout_fd = self.p.stdout.fileno()
|
2015-12-12 20:24:21 +00:00
|
|
|
self.stderr_fd = self.p.stderr.fileno()
|
2017-07-29 18:37:14 +00:00
|
|
|
os.set_blocking(self.stdin_fd, False)
|
|
|
|
os.set_blocking(self.stdout_fd, False)
|
|
|
|
os.set_blocking(self.stderr_fd, False)
|
2015-12-12 20:24:21 +00:00
|
|
|
self.r_fds = [self.stdout_fd, self.stderr_fd]
|
|
|
|
self.x_fds = [self.stdin_fd, self.stdout_fd, self.stderr_fd]
|
2012-11-27 23:03:35 +00:00
|
|
|
|
2015-06-18 21:18:05 +00:00
|
|
|
try:
|
2016-07-30 17:42:25 +00:00
|
|
|
try:
|
2017-05-17 18:49:52 +00:00
|
|
|
version = self.call("negotiate", {"client_data": {"client_version": BORG_VERSION}})
|
2016-07-30 17:42:25 +00:00
|
|
|
except ConnectionClosed:
|
|
|
|
raise ConnectionClosedWithHint("Is borg working on the server?") from None
|
2016-11-10 08:56:18 +00:00
|
|
|
if version == RPC_PROTOCOL_VERSION:
|
|
|
|
self.dictFormat = False
|
2022-05-06 01:59:10 +00:00
|
|
|
elif isinstance(version, dict) and "server_version" in version:
|
2016-11-10 08:56:18 +00:00
|
|
|
self.dictFormat = True
|
2022-05-06 01:59:10 +00:00
|
|
|
self.server_version = version["server_version"]
|
2016-11-10 08:56:18 +00:00
|
|
|
else:
|
|
|
|
raise Exception("Server insisted on using unsupported protocol version %s" % version)
|
|
|
|
|
2016-07-31 19:19:30 +00:00
|
|
|
def do_open():
|
|
|
|
self.id = self.open(
|
|
|
|
path=self.location.path,
|
|
|
|
create=create,
|
|
|
|
lock_wait=lock_wait,
|
2019-02-04 16:12:11 +00:00
|
|
|
lock=lock,
|
|
|
|
exclusive=exclusive,
|
|
|
|
append_only=append_only,
|
|
|
|
make_parent_dirs=make_parent_dirs,
|
|
|
|
)
|
2022-06-30 21:55:51 +00:00
|
|
|
info = self.info()
|
|
|
|
self.version = info["version"]
|
|
|
|
self.append_only = info["append_only"]
|
2016-07-31 19:19:30 +00:00
|
|
|
|
|
|
|
if self.dictFormat:
|
|
|
|
do_open()
|
|
|
|
else:
|
|
|
|
# Ugly detection of versions prior to 1.0.7: If open throws it has to be 1.0.6 or lower
|
|
|
|
try:
|
|
|
|
do_open()
|
|
|
|
except self.RPCError as err:
|
2016-11-08 22:18:18 +00:00
|
|
|
if err.exception_class != "TypeError":
|
2016-07-31 19:19:30 +00:00
|
|
|
raise
|
|
|
|
msg = """\
|
2016-08-09 15:35:27 +00:00
|
|
|
Please note:
|
|
|
|
If you see a TypeError complaining about the number of positional arguments
|
|
|
|
given to open(), you can ignore it if it comes from a borg version < 1.0.7.
|
|
|
|
This TypeError is a cosmetic side effect of the compatibility code borg
|
|
|
|
clients >= 1.0.7 have to support older borg servers.
|
|
|
|
This problem will go away as soon as the server has been upgraded to 1.0.7+.
|
|
|
|
"""
|
2016-07-31 19:19:30 +00:00
|
|
|
# emit this msg in the same way as the 'Remote: ...' lines that show the remote TypeError
|
|
|
|
sys.stderr.write(msg)
|
|
|
|
self.server_version = parse_version("1.0.6")
|
2017-11-02 15:03:55 +00:00
|
|
|
compatMap["open"] = ("path", "create", "lock_wait", "lock")
|
2016-07-31 19:19:30 +00:00
|
|
|
# try again with corrected version and compatMap
|
|
|
|
do_open()
|
2016-04-03 19:44:29 +00:00
|
|
|
except Exception:
|
|
|
|
self.close()
|
|
|
|
raise
|
2012-10-17 09:40:23 +00:00
|
|
|
|
|
|
|
def __del__(self):
    """Destructor safety net; normally close() has been called long before this runs."""
    leftover = len(self.responses)
    if leftover:
        logging.debug("still %d cached responses left in RemoteRepository" % (leftover,))
    if self.p:
        # The server subprocess should already have been reaped via close();
        # reaching this point means some code path failed to clean up properly.
        self.close()
        assert False, "cleanup happened in Repository.__del__"
|
2012-11-30 20:47:35 +00:00
|
|
|
|
2015-07-14 22:01:07 +00:00
|
|
|
def __repr__(self):
    """Human-readable representation including the canonical repository location."""
    return "<{} {}>".format(self.__class__.__name__, self.location.canonical_path())
|
2015-07-14 22:01:07 +00:00
|
|
|
|
2016-03-22 23:41:15 +00:00
|
|
|
def __enter__(self):
    """Context-manager entry; the connection was already established in __init__."""
    return self
|
|
|
|
|
|
|
|
def __exit__(self, exc_type, exc_val, exc_tb):
    """Context-manager exit: roll back on error, then always close the connection."""
    error_occurred = exc_type is not None
    try:
        if error_occurred:
            # An exception is propagating: cap the remaining shutdown work so a
            # dead or very slow remote cannot stall us forever (checked in call_many).
            self.shutdown_time = time.monotonic() + 30
        self.rollback()
    finally:
        # in any case, we want to cleanly close the repo, even if the
        # rollback can not succeed (e.g. because the connection was
        # already closed) and raised another exception:
        logger.debug(
            "RemoteRepository: %s bytes sent, %s bytes received, %d messages sent",
            format_file_size(self.tx_bytes),
            format_file_size(self.rx_bytes),
            self.msgid,
        )
        self.close()
|
2016-03-22 23:41:15 +00:00
|
|
|
|
2016-04-23 20:42:56 +00:00
|
|
|
@property
def id_str(self):
    """The binary repository id, rendered as a hexadecimal string."""
    hex_id = bin_to_hex(self.id)
    return hex_id
|
|
|
|
|
2015-12-12 14:31:43 +00:00
|
|
|
def borg_cmd(self, args, testing):
    """return a borg serve command line"""
    # Forward our own verbosity settings to the 'borg serve' process,
    # so the server logs at the same level the client was invoked with.
    opts = []
    if args is not None:
        root_logger = logging.getLogger()
        level_flags = (
            (logging.DEBUG, "--debug"),
            (logging.INFO, "--info"),
            (logging.WARNING, None),  # warning is the default level, no flag needed
            (logging.ERROR, "--error"),
            (logging.CRITICAL, "--critical"),
        )
        for level, flag in level_flags:
            if root_logger.isEnabledFor(level):
                if flag is not None:
                    opts.append(flag)
                break
        else:
            raise ValueError("log level missing, fix this code")

        # Tell the remote server about debug topics it may need to consider.
        # Debug topics carry "spew"/"trace"-volume output, so the server only
        # emits them when explicitly enabled. A 1.0.x server does not know
        # the --debug-topic option at all; since this is a debugging-only
        # feature, that is not considered a problem. Only repository-related
        # topics are meaningful on the server side.
        for topic in args.debug_topics:
            if "." not in topic:
                topic = "borg.debug." + topic
            if "repository" in topic:
                opts.append("--debug-topic=%s" % topic)

        if "storage_quota" in args and args.storage_quota:
            opts.append("--storage-quota=%s" % args.storage_quota)
    env_vars = []
    if testing:
        # Run 'borg serve' in-tree through the current interpreter.
        return env_vars + [sys.executable, "-m", "borg", "serve"] + opts + self.extra_test_args
    else:  # pragma: no cover
        remote_path = args.remote_path or os.environ.get("BORG_REMOTE_PATH", "borg")
        remote_path = replace_placeholders(remote_path)
        return env_vars + [remote_path, "serve"] + opts
|
2015-10-05 22:51:20 +00:00
|
|
|
|
|
|
|
def ssh_cmd(self, location):
    """return a ssh command line that can be prefixed to a borg command line"""
    # --rsh / BORG_RSH may carry extra options, so it is parsed shell-style.
    command = shlex.split(self._args.rsh or os.environ.get("BORG_RSH", "ssh"))
    if location.port:
        command.extend(["-p", str(location.port)])
    if location.user:
        command.append(f"{location.user}@{location.host}")
    else:
        command.append("%s" % location.host)
    return command
|
|
|
|
|
2016-11-10 08:56:18 +00:00
|
|
|
def named_to_positional(self, method, kwargs):
    """Flatten *kwargs* into the positional argument order old servers expect for *method*."""
    ordered_names = compatMap[method]
    return [kwargs[name] for name in ordered_names]
|
|
|
|
|
|
|
|
def call(self, cmd, args, **kw):
    """Invoke remote method *cmd* once and return its single result (see call_many)."""
    for result in self.call_many(cmd, [args], **kw):
        return result
|
|
|
|
|
2017-03-05 04:19:32 +00:00
|
|
|
def call_many(self, cmd, calls, wait=True, is_preloaded=False, async_wait=True):
    """Send remote method *cmd* once per argument dict in *calls*; yield the results.

    This is the client-side RPC pump: it multiplexes request sending, response
    collection, chunk preloading and stderr/log forwarding over the pipes to the
    'borg serve' process, driven by select() on non-blocking fds.

    :param cmd: name of the remote method to call
    :param calls: iterable of kwargs dicts, one per invocation
    :param wait: if True, keep pumping until all responses arrived; if False,
        return once everything was queued/sent (responses become "ignored"
        and are picked up later as async responses)
    :param is_preloaded: if True, every id was preloaded earlier ('get' only)
    :param async_wait: if True, the special 'async_responses' drain mode waits
        for still-outstanding responses instead of returning early
    """
    if not calls and cmd != "async_responses":
        return

    def send_buffer():
        # Push as much of the queued request bytes down stdin as the pipe accepts.
        if self.to_send:
            try:
                written = self.ratelimit.write(self.stdin_fd, self.to_send.peek_front())
                self.tx_bytes += written
                self.to_send.pop_front(written)
            except OSError as e:
                # io.write might raise EAGAIN even though select indicates
                # that the fd should be writable.
                # EWOULDBLOCK is added for defensive programming sake.
                if e.errno not in [errno.EAGAIN, errno.EWOULDBLOCK]:
                    raise

    def pop_preload_msgid(chunkid):
        # Take the oldest preload request msgid queued for this chunk id.
        msgid = self.chunkid_to_msgids[chunkid].pop(0)
        if not self.chunkid_to_msgids[chunkid]:
            del self.chunkid_to_msgids[chunkid]
        return msgid

    def handle_error(unpacked):
        # Map a server-side exception back to the matching local exception type.
        # Old servers (pre dict-format) do not transmit exception args.
        error = unpacked["exception_class"]
        old_server = "exception_args" not in unpacked
        args = unpacked.get("exception_args")

        if error == "DoesNotExist":
            raise Repository.DoesNotExist(self.location.processed)
        elif error == "AlreadyExists":
            raise Repository.AlreadyExists(self.location.processed)
        elif error == "CheckNeeded":
            raise Repository.CheckNeeded(self.location.processed)
        elif error == "IntegrityError":
            if old_server:
                raise IntegrityError("(not available)")
            else:
                raise IntegrityError(args[0])
        elif error == "PathNotAllowed":
            if old_server:
                raise PathNotAllowed("(unknown)")
            else:
                raise PathNotAllowed(args[0])
        elif error == "ParentPathDoesNotExist":
            raise Repository.ParentPathDoesNotExist(args[0])
        elif error == "ObjectNotFound":
            if old_server:
                raise Repository.ObjectNotFound("(not available)", self.location.processed)
            else:
                raise Repository.ObjectNotFound(args[0], self.location.processed)
        elif error == "InvalidRPCMethod":
            if old_server:
                raise InvalidRPCMethod("(not available)")
            else:
                raise InvalidRPCMethod(args[0])
        else:
            # Unknown exception class: surface the raw payload.
            raise self.RPCError(unpacked)

    calls = list(calls)
    waiting_for = []  # msgids of requests sent but not yet answered, in send order
    # When not waiting, allow buffering up to the configured limit before blocking.
    maximum_to_send = 0 if wait else self.upload_buffer_size_limit
    send_buffer()  # Try to send data, as some cases (async_response) will never try to send data otherwise.
    while wait or calls:
        if self.shutdown_time and time.monotonic() > self.shutdown_time:
            # we are shutting this RemoteRepository down already, make sure we do not waste
            # a lot of time in case a lot of async stuff is coming in or remote is gone or slow.
            logger.debug(
                "shutdown_time reached, shutting down with %d waiting_for and %d async_responses.",
                len(waiting_for),
                len(self.async_responses),
            )
            return
        # Yield any responses that already arrived, strictly in request order.
        while waiting_for:
            try:
                unpacked = self.responses.pop(waiting_for[0])
                waiting_for.pop(0)
                if "exception_class" in unpacked:
                    handle_error(unpacked)
                else:
                    yield unpacked[RESULT]
                    if not waiting_for and not calls:
                        return
            except KeyError:
                break
        if cmd == "async_responses":
            # Drain mode: hand out results/exceptions of earlier fire-and-forget calls.
            while True:
                try:
                    msgid, unpacked = self.async_responses.popitem()
                except KeyError:
                    # there is nothing left what we already have received
                    if async_wait and self.ignore_responses:
                        # but do not return if we shall wait and there is something left to wait for:
                        break
                    else:
                        return
                else:
                    if "exception_class" in unpacked:
                        handle_error(unpacked)
                    else:
                        yield unpacked[RESULT]
        # Only ask select() for writability if we actually have something to send
        # and are below the in-flight request cap.
        if self.to_send or ((calls or self.preload_ids) and len(waiting_for) < MAX_INFLIGHT):
            w_fds = [self.stdin_fd]
        else:
            w_fds = []
        r, w, x = select.select(self.r_fds, w_fds, self.x_fds, 1)
        if x:
            raise Exception("FD exception occurred")
        for fd in r:
            if fd is self.stdout_fd:
                # Protocol channel: msgpack-encoded responses from the server.
                data = os.read(fd, BUFSIZE)
                if not data:
                    raise ConnectionClosed()
                self.rx_bytes += len(data)
                self.unpacker.feed(data)
                for unpacked in self.unpacker:
                    if isinstance(unpacked, dict):
                        # New (dict) wire format.
                        msgid = unpacked[MSGID]
                    elif isinstance(unpacked, tuple) and len(unpacked) == 4:
                        # Old (tuple) wire format.
                        # The first field 'type' was always 1 and has always been ignored
                        _, msgid, error, res = unpacked
                        if error:
                            # ignore res, because it is only a fixed string anyway.
                            unpacked = {MSGID: msgid, "exception_class": error}
                        else:
                            unpacked = {MSGID: msgid, RESULT: res}
                    else:
                        raise UnexpectedRPCDataFormatFromServer(data)
                    if msgid in self.ignore_responses:
                        self.ignore_responses.remove(msgid)
                        # async methods never return values, but may raise exceptions.
                        if "exception_class" in unpacked:
                            self.async_responses[msgid] = unpacked
                        else:
                            # we currently do not have async result values except "None",
                            # so we do not add them into async_responses.
                            if unpacked[RESULT] is not None:
                                self.async_responses[msgid] = unpacked
                    else:
                        self.responses[msgid] = unpacked
            elif fd is self.stderr_fd:
                # Log channel: plain text lines from the server's stderr.
                data = os.read(fd, 32768)
                if not data:
                    raise ConnectionClosed()
                self.rx_bytes += len(data)
                # deal with incomplete lines (may appear due to block buffering)
                if self.stderr_received:
                    data = self.stderr_received + data
                    self.stderr_received = b""
                lines = data.splitlines(keepends=True)
                if lines and not lines[-1].endswith((b"\r", b"\n")):
                    self.stderr_received = lines.pop()
                # now we have complete lines in <lines> and any partial line in self.stderr_received.
                for line in lines:
                    handle_remote_line(line.decode())  # decode late, avoid partial utf-8 sequences
        if w:
            # stdin is writable: queue more requests (regular calls first, then preloads).
            while (
                (len(self.to_send) <= maximum_to_send)
                and (calls or self.preload_ids)
                and len(waiting_for) < MAX_INFLIGHT
            ):
                if calls:
                    if is_preloaded:
                        assert cmd == "get", "is_preload is only supported for 'get'"
                        if calls[0]["id"] in self.chunkid_to_msgids:
                            waiting_for.append(pop_preload_msgid(calls.pop(0)["id"]))
                    else:
                        args = calls.pop(0)
                        if cmd == "get" and args["id"] in self.chunkid_to_msgids:
                            # A preload request for this chunk is already in flight; reuse it.
                            waiting_for.append(pop_preload_msgid(args["id"]))
                        else:
                            self.msgid += 1
                            waiting_for.append(self.msgid)
                            if self.dictFormat:
                                self.to_send.push_back(msgpack.packb({MSGID: self.msgid, MSG: cmd, ARGS: args}))
                            else:
                                self.to_send.push_back(
                                    msgpack.packb((1, self.msgid, cmd, self.named_to_positional(cmd, args)))
                                )
                if not self.to_send and self.preload_ids:
                    # Opportunistically request preload chunks while the pipe is idle.
                    chunk_id = self.preload_ids.pop(0)
                    args = {"id": chunk_id}
                    self.msgid += 1
                    self.chunkid_to_msgids.setdefault(chunk_id, []).append(self.msgid)
                    if self.dictFormat:
                        self.to_send.push_back(msgpack.packb({MSGID: self.msgid, MSG: "get", ARGS: args}))
                    else:
                        self.to_send.push_back(
                            msgpack.packb((1, self.msgid, "get", self.named_to_positional("get", args)))
                        )

        send_buffer()
    self.ignore_responses |= set(waiting_for)  # we lose order here
|
2011-07-06 20:16:07 +00:00
|
|
|
|
2016-07-31 19:19:30 +00:00
|
|
|
@api(
    since=parse_version("1.0.0"),
    append_only={"since": parse_version("1.0.7"), "previously": False},
    make_parent_dirs={"since": parse_version("1.1.9"), "previously": False},
)
def open(
    self, path, create=False, lock_wait=None, lock=True, exclusive=False, append_only=False, make_parent_dirs=False
):
    """Open (or, with create=True, create) the repository at *path* on the server.

    actual remoting is done via self.call in the @api decorator
    """
|
|
|
|
|
2022-06-30 21:55:51 +00:00
|
|
|
@api(since=parse_version("2.0.0a3"))
def info(self):
    """Return server-side repository information (includes 'version' and 'append_only').

    actual remoting is done via self.call in the @api decorator
    """
|
|
|
|
|
2019-03-10 19:21:22 +00:00
|
|
|
@api(since=parse_version("1.0.0"), max_duration={"since": parse_version("1.2.0a4"), "previously": 0})
def check(self, repair=False, save_space=False, max_duration=0):
    """Run a repository check on the server.

    actual remoting is done via self.call in the @api decorator
    NOTE(review): per the decorator, servers older than 1.2.0a4 do not know
    *max_duration*; 'previously' presumably is the value assumed for them.
    """
|
2014-02-04 22:49:10 +00:00
|
|
|
|
2018-06-24 17:08:49 +00:00
|
|
|
@api(
    since=parse_version("1.0.0"),
    compact={"since": parse_version("1.2.0a0"), "previously": True, "dontcare": True},
    threshold={"since": parse_version("1.2.0a8"), "previously": 0.1, "dontcare": True},
)
def commit(self, save_space=False, compact=True, threshold=0.1):
    """Commit the current transaction on the server.

    actual remoting is done via self.call in the @api decorator
    """
|
2010-11-15 21:18:47 +00:00
|
|
|
|
2016-11-10 08:56:18 +00:00
|
|
|
@api(since=parse_version("1.0.0"))
def rollback(self):
    """Roll back the current transaction on the server.

    actual remoting is done via self.call in the @api decorator
    """
|
2010-11-15 21:18:47 +00:00
|
|
|
|
2016-11-10 08:56:18 +00:00
|
|
|
@api(since=parse_version("1.0.0"))
def destroy(self):
    """Destroy the repository on the server.

    actual remoting is done via self.call in the @api decorator
    """
|
2015-03-09 15:02:06 +00:00
|
|
|
|
2016-11-10 08:56:18 +00:00
|
|
|
@api(since=parse_version("1.0.0"))
def __len__(self):
    """Return the number of objects in the remote repository.

    actual remoting is done via self.call in the @api decorator
    """
|
2014-02-08 23:17:32 +00:00
|
|
|
|
2016-11-10 08:56:18 +00:00
|
|
|
@api(since=parse_version("1.0.0"))
def list(self, limit=None, marker=None):
    """List object ids from the server (paginated via *limit*/*marker*).

    actual remoting is done via self.call in the @api decorator
    """
|
2014-02-10 20:51:25 +00:00
|
|
|
|
2016-11-10 08:56:18 +00:00
|
|
|
@api(since=parse_version("1.1.0b3"))
def scan(self, limit=None, marker=None):
    """Scan object ids on the server (paginated via *limit*/*marker*).

    actual remoting is done via self.call in the @api decorator
    """
|
2016-09-23 19:45:01 +00:00
|
|
|
|
2016-11-10 10:08:45 +00:00
|
|
|
def get(self, id):
    """Fetch a single object by *id*; thin wrapper around get_many()."""
    for obj in self.get_many([id]):
        return obj
|
|
|
|
|
|
|
|
def get_many(self, ids, is_preloaded=False):
    """Fetch multiple objects; yields data in the order of *ids*."""
    requests = [{"id": id} for id in ids]
    yield from self.call_many("get", requests, is_preloaded=is_preloaded)
|
2011-07-06 20:16:07 +00:00
|
|
|
|
2016-11-10 08:56:18 +00:00
|
|
|
@api(since=parse_version("1.0.0"))
def put(self, id, data, wait=True):
    """Store *data* under *id* on the server (optionally without waiting for the reply).

    actual remoting is done via self.call in the @api decorator
    """
|
2012-10-17 09:40:23 +00:00
|
|
|
|
2016-11-10 08:56:18 +00:00
|
|
|
@api(since=parse_version("1.0.0"))
def delete(self, id, wait=True):
    """Delete the object *id* on the server (optionally without waiting for the reply).

    actual remoting is done via self.call in the @api decorator
    """
|
2012-11-30 20:47:35 +00:00
|
|
|
|
2016-11-10 08:56:18 +00:00
|
|
|
@api(since=parse_version("1.0.0"))
def save_key(self, keydata):
    """Store *keydata* (repokey material) on the server.

    actual remoting is done via self.call in the @api decorator
    """
|
2015-07-14 22:01:07 +00:00
|
|
|
|
2016-11-10 08:56:18 +00:00
|
|
|
@api(since=parse_version("1.0.0"))
def load_key(self):
    """Fetch the stored key material from the server.

    actual remoting is done via self.call in the @api decorator
    """
|
2015-07-14 22:01:07 +00:00
|
|
|
|
2016-11-10 08:56:18 +00:00
|
|
|
@api(since=parse_version("1.0.0"))
def get_free_nonce(self):
    """Ask the server for the next free AES nonce value.

    actual remoting is done via self.call in the @api decorator
    """
|
2016-07-25 18:38:31 +00:00
|
|
|
|
2016-11-10 08:56:18 +00:00
|
|
|
@api(since=parse_version("1.0.0"))
def commit_nonce_reservation(self, next_unreserved, start_nonce):
    """Commit a nonce range reservation on the server.

    actual remoting is done via self.call in the @api decorator
    """
|
2016-07-25 18:38:31 +00:00
|
|
|
|
2016-11-10 08:56:18 +00:00
|
|
|
@api(since=parse_version("1.0.0"))
def break_lock(self):
    """Forcibly break the repository lock on the server.

    actual remoting is done via self.call in the @api decorator
    """
|
2015-11-21 19:50:53 +00:00
|
|
|
|
2012-11-30 20:47:35 +00:00
|
|
|
def close(self):
    """Shut down the connection: close the pipes and reap the server subprocess."""
    proc = self.p
    if not proc:
        return
    # Closing stdin signals EOF to the server, which lets it exit; then reap it.
    proc.stdin.close()
    proc.stdout.close()
    proc.wait()
    self.p = None
|
2014-01-22 19:58:48 +00:00
|
|
|
|
2017-03-05 04:19:32 +00:00
|
|
|
def async_response(self, wait=True):
    """Return one pending result/exception from earlier async calls, or None if none."""
    responses = self.call_many("async_responses", calls=[], wait=True, async_wait=wait)
    return next(responses, None)
|
|
|
|
|
2014-01-22 19:58:48 +00:00
|
|
|
def preload(self, ids):
    """Queue chunk *ids* for read-ahead; call_many requests them opportunistically."""
    self.preload_ids.extend(ids)
|
2014-03-13 21:29:47 +00:00
|
|
|
|
|
|
|
|
2016-05-27 21:16:31 +00:00
|
|
|
def handle_remote_line(line):
    """
    Handle a remote log line.

    This function is remarkably complex because it handles multiple wire formats.

    :param line: one complete stderr line from the server, already decoded to
        str and still carrying its line terminator ("\\r" or "\\n").
    """
    assert line.endswith(("\r", "\n"))
    if line.startswith("{"):
        # This format is used by Borg since 1.1.0b6 for new-protocol clients.
        # It is the same format that is exposed by --log-json.
        msg = json.loads(line)

        if msg["type"] not in ("progress_message", "progress_percent", "log_message"):
            logger.warning("Dropped remote log message with unknown type %r: %s", msg["type"], line)
            return

        if msg["type"] == "log_message":
            # Re-emit log messages on the same level as the remote to get correct log suppression and verbosity.
            level = getattr(logging, msg["levelname"], logging.CRITICAL)
            assert isinstance(level, int)
            target_logger = logging.getLogger(msg["name"])
            msg["message"] = "Remote: " + msg["message"]
            # In JSON mode, we manually check whether the log message should be propagated.
            if logging.getLogger("borg").json and level >= target_logger.getEffectiveLevel():
                sys.stderr.write(json.dumps(msg) + "\n")
            else:
                target_logger.log(level, "%s", msg["message"])
        elif msg["type"].startswith("progress_"):
            # Progress messages are a bit more complex.
            # First of all, we check whether progress output is enabled. This is signalled
            # through the effective level of the borg.output.progress logger
            # (also see ProgressIndicatorBase in borg.helpers).
            progress_logger = logging.getLogger("borg.output.progress")
            if progress_logger.getEffectiveLevel() == logging.INFO:
                # When progress output is enabled, we check whether the client is in
                # --log-json mode, as signalled by the "json" attribute on the "borg" logger.
                if logging.getLogger("borg").json:
                    # In --log-json mode we re-emit the progress JSON line as sent by the server,
                    # with the message, if any, prefixed with "Remote: ".
                    if "message" in msg:
                        msg["message"] = "Remote: " + msg["message"]
                    sys.stderr.write(json.dumps(msg) + "\n")
                elif "message" in msg:
                    # In text log mode we write only the message to stderr and terminate with \r
                    # (carriage return, i.e. move the write cursor back to the beginning of the line)
                    # so that the next message, progress or not, overwrites it. This mirrors the behaviour
                    # of local progress displays.
                    sys.stderr.write("Remote: " + msg["message"] + "\r")
    elif line.startswith("$LOG "):
        # This format is used by borg serve 0.xx, 1.0.x and 1.1.0b1..b5.
        # It prefixed log lines with $LOG as a marker, followed by the log level
        # and optionally a logger name, then "Remote:" as a separator followed by the original
        # message.
        _, level, msg = line.split(" ", 2)
        level = getattr(logging, level, logging.CRITICAL)  # str -> int
        if msg.startswith("Remote:"):
            # server format: '$LOG <level> Remote: <msg>'
            logging.log(level, msg.rstrip())
        else:
            # server format '$LOG <level> <logname> Remote: <msg>'
            logname, msg = msg.split(" ", 1)
            logging.getLogger(logname).log(level, msg.rstrip())
    else:
        # Plain 1.0.x and older format - re-emit to stderr (mirroring what the 1.0.x
        # client did) or as a generic log message.
        # We don't know what priority the line had.
        if logging.getLogger("borg").json:
            logging.getLogger("").warning("Remote: " + line.strip())
        else:
            # In non-JSON mode we circumvent logging to preserve carriage returns (\r)
            # which are generated by remote progress displays.
            sys.stderr.write("Remote: " + line)
|
2016-05-27 21:16:31 +00:00
|
|
|
|
|
|
|
|
2016-01-16 22:42:54 +00:00
|
|
|
class RepositoryNoCache:
    """A not caching Repository wrapper, passes through to repository.

    Just to have same API (including the context manager) as RepositoryCache.

    *transform* is a callable taking two arguments, key and raw repository data.
    The return value is returned from get()/get_many(). By default, the raw
    repository data is returned.
    """

    def __init__(self, repository, transform=None):
        self.repository = repository
        # Identity transform unless the caller supplies one.
        self.transform = transform if transform is not None else (lambda key, data: data)

    def close(self):
        # Nothing to release; exists for API symmetry with RepositoryCache.
        pass

    def __enter__(self):
        return self

    def __exit__(self, exc_type, exc_val, exc_tb):
        self.close()

    def get(self, key):
        items = self.get_many([key], cache=False)
        return next(items)

    def get_many(self, keys, cache=True):
        # *cache* is accepted (and ignored) for API compatibility.
        raw_items = self.repository.get_many(keys)
        for key, data in zip(keys, raw_items):
            yield self.transform(key, data)

    def log_instrumentation(self):
        # No statistics are collected here; exists for API symmetry.
        pass
|
|
|
|
|
2016-01-16 22:42:54 +00:00
|
|
|
|
|
|
|
class RepositoryCache(RepositoryNoCache):
|
2016-05-19 23:54:42 +00:00
|
|
|
"""
|
|
|
|
A caching Repository wrapper.
|
2016-01-16 22:42:54 +00:00
|
|
|
|
2016-05-19 23:54:42 +00:00
|
|
|
Caches Repository GET operations locally.
|
2017-05-28 11:16:52 +00:00
|
|
|
|
|
|
|
*pack* and *unpack* complement *transform* of the base class.
|
|
|
|
*pack* receives the output of *transform* and should return bytes,
|
|
|
|
which are stored in the cache. *unpack* receives these bytes and
|
|
|
|
should return the initial data (as returned by *transform*).
|
2016-01-16 22:42:54 +00:00
|
|
|
"""
|
2016-05-20 00:01:07 +00:00
|
|
|
|
2017-05-28 11:16:52 +00:00
|
|
|
def __init__(self, repository, pack=None, unpack=None, transform=None):
    """Wrap *repository* with an on-disk cache of GET results.

    :param pack: callable turning transformed data into bytes for the cache (default: identity)
    :param unpack: callable reversing *pack* when reading back (default: identity)
    :param transform: see RepositoryNoCache
    """
    super().__init__(repository, transform)
    self.pack = pack or (lambda data: data)
    self.unpack = unpack or (lambda data: data)
    self.cache = set()  # keys currently present in the cache directory
    self.basedir = tempfile.mkdtemp(prefix="borg-cache-")  # on-disk cache location
    self.query_size_limit()  # sets self.size_limit from free disk space
    self.size = 0  # total bytes of packed data currently cached
    # Instrumentation
    self.hits = 0
    self.misses = 0
    self.slow_misses = 0  # misses caused by eviction racing a running get_many()
    self.slow_lat = 0.0  # cumulative latency of slow misses, in seconds
    self.evictions = 0
    self.enospc = 0  # how often ENOSPC was hit while writing cache entries
|
|
|
|
|
|
|
|
def query_size_limit(self):
    """Recompute the cache size limit: 25 % of currently free disk space, capped at 2 GiB."""
    free_bytes = shutil.disk_usage(self.basedir).free
    self.size_limit = int(min(free_bytes * 0.25, 2**31))
|
|
|
|
|
|
|
|
def key_filename(self, key):
    """Map a binary *key* to its cache file path (hex-named, inside basedir)."""
    hex_name = bin_to_hex(key)
    return os.path.join(self.basedir, hex_name)
|
|
|
|
|
|
|
|
def backoff(self):
    """Evict arbitrary cached entries until the cache is below 90 % of the size limit."""
    self.query_size_limit()  # the limit tracks free disk space, so refresh it first
    target_size = int(0.9 * self.size_limit)
    while self.size > target_size and self.cache:
        evicted_key = self.cache.pop()
        path = self.key_filename(evicted_key)
        self.size -= os.stat(path).st_size
        os.unlink(path)
        self.evictions += 1
|
|
|
|
|
2017-05-28 11:16:52 +00:00
|
|
|
def add_entry(self, key, data, cache):
    """Transform *data* and, if *cache* is true, persist the packed result on disk.

    Returns the transformed data in any case. A failed write is cleaned up;
    ENOSPC triggers eviction (backoff) and leaves this key uncached, any
    other OSError is re-raised.
    """
    transformed = self.transform(key, data)
    if not cache:
        return transformed
    packed = self.pack(transformed)
    file = self.key_filename(key)
    try:
        with open(file, "wb") as fd:
            fd.write(packed)
    except OSError as os_error:
        try:
            safe_unlink(file)  # remove a possibly partially written file
        except FileNotFoundError:
            pass  # open() could have failed as well
        if os_error.errno == errno.ENOSPC:
            self.enospc += 1
            self.backoff()
        else:
            raise
    else:
        # Write succeeded: account for the entry and evict if over the limit.
        self.size += len(packed)
        self.cache.add(key)
        if self.size > self.size_limit:
            self.backoff()
    return transformed
|
2016-01-16 22:42:54 +00:00
|
|
|
|
2017-06-13 18:03:39 +00:00
|
|
|
def log_instrumentation(self):
|
2016-05-19 23:54:42 +00:00
|
|
|
logger.debug(
|
|
|
|
"RepositoryCache: current items %d, size %s / %s, %d hits, %d misses, %d slow misses (+%.1fs), "
|
2017-06-03 10:27:35 +00:00
|
|
|
"%d evictions, %d ENOSPC hit",
|
2016-05-19 23:54:42 +00:00
|
|
|
len(self.cache),
|
|
|
|
format_file_size(self.size),
|
|
|
|
format_file_size(self.size_limit),
|
|
|
|
self.hits,
|
|
|
|
self.misses,
|
|
|
|
self.slow_misses,
|
|
|
|
self.slow_lat,
|
2017-06-03 10:27:35 +00:00
|
|
|
self.evictions,
|
|
|
|
self.enospc,
|
|
|
|
)
|
2017-06-13 18:03:39 +00:00
|
|
|
|
|
|
|
def close(self):
|
|
|
|
self.log_instrumentation()
|
2016-05-19 23:54:42 +00:00
|
|
|
self.cache.clear()
|
|
|
|
shutil.rmtree(self.basedir)
|
|
|
|
|
|
|
|
    def get_many(self, keys, cache=True):
        """Yield the (unpacked/transformed) data for each of *keys*, in order.

        Keys already present in the on-disk cache are served locally; all
        other keys are requested from the repository in one up-front batch.
        *cache* controls whether freshly fetched entries are written back to
        the cache (passed through to add_entry()).
        """
        # Batch-request every key not currently cached. The zip pairs each
        # requested key with its result so they can be matched up below while
        # iterating *keys* in the caller's order.
        unknown_keys = [key for key in keys if key not in self.cache]
        repository_iterator = zip(unknown_keys, self.repository.get_many(unknown_keys))
        for key in keys:
            if key in self.cache:
                file = self.key_filename(key)
                with open(file, "rb") as fd:
                    self.hits += 1
                    yield self.unpack(fd.read())
            else:
                # Advance the shared batched iterator until this key's result
                # arrives; any earlier results belong to keys yielded before.
                for key_, data in repository_iterator:
                    if key_ == key:
                        transformed = self.add_entry(key, data, cache)
                        self.misses += 1
                        yield transformed
                        break
                else:
                    # slow path: eviction during this get_many removed this key from the cache
                    # (it was cached when unknown_keys was built, so it is not in
                    # the batch) — fetch it individually and time the round-trip.
                    t0 = time.perf_counter()
                    data = self.repository.get(key)
                    self.slow_lat += time.perf_counter() - t0
                    transformed = self.add_entry(key, data, cache)
                    self.slow_misses += 1
                    yield transformed
        # Consume any pending requests
        for _ in repository_iterator:
            pass
def cache_if_remote(repository, *, decrypted_cache=False, pack=None, unpack=None, transform=None, force_cache=False):
    """
    Return a Repository(No)Cache for *repository*.

    If *decrypted_cache* is a key object, then get and get_many will return a tuple
    (csize, plaintext) instead of the actual data in the repository. The cache will
    store decrypted data, which increases CPU efficiency (by avoiding repeatedly decrypting
    and more importantly MAC and ID checking cached objects).
    Internally, objects are compressed with LZ4.
    """
    if decrypted_cache and (pack or unpack or transform):
        raise ValueError("decrypted_cache and pack/unpack/transform are incompatible")
    elif decrypted_cache:
        key = decrypted_cache
        # Cache record layout: 32 bit csize, 64 bit (8 byte) xxh64 checksum,
        # then the LZ4-compressed plaintext.
        cache_struct = struct.Struct("=I8s")
        compressor = Compressor("lz4")

        def pack(data):
            # Compress the decrypted plaintext and prepend the csize/checksum header.
            csize, decrypted = data
            compressed = compressor.compress(decrypted)
            return cache_struct.pack(csize, xxh64(compressed)) + compressed

        def unpack(data):
            # Split header from payload without copying, verify, then decompress.
            data = memoryview(data)
            csize, checksum = cache_struct.unpack(data[: cache_struct.size])
            compressed = data[cache_struct.size :]
            if checksum != xxh64(compressed):
                raise IntegrityError("detected corrupted data in metadata cache")
            return csize, compressor.decompress(compressed)

        def transform(id_, data):
            # Decrypting also performs MAC and id verification; remember the
            # original (compressed/encrypted) size alongside the plaintext.
            csize = len(data)
            decrypted = key.decrypt(id_, data)
            return csize, decrypted

    # Only remote repositories benefit from the disk cache, unless forced
    # (e.g. for the decrypted-objects cache on a local repository).
    use_cache = isinstance(repository, RemoteRepository) or force_cache
    if use_cache:
        return RepositoryCache(repository, pack, unpack, transform)
    return RepositoryNoCache(repository, transform)