mirror of https://github.com/borgbackup/borg.git
Merge pull request #7603 from ThomasWaldmann/remove-legacy
borg.remote: remove legacy
This commit is contained in:
commit
fbb60140ac
|
@ -482,8 +482,8 @@ class Archiver(
|
||||||
func = get_func(args)
|
func = get_func(args)
|
||||||
# do not use loggers before this!
|
# do not use loggers before this!
|
||||||
is_serve = func == self.do_serve
|
is_serve = func == self.do_serve
|
||||||
setup_logging(level=args.log_level, is_serve=is_serve, json=args.log_json)
|
self.log_json = args.log_json or is_serve
|
||||||
self.log_json = args.log_json
|
setup_logging(level=args.log_level, json=self.log_json)
|
||||||
args.progress |= is_serve
|
args.progress |= is_serve
|
||||||
self._setup_implied_logging(vars(args))
|
self._setup_implied_logging(vars(args))
|
||||||
self._setup_topic_debugging(args)
|
self._setup_topic_debugging(args)
|
||||||
|
|
|
@ -53,7 +53,7 @@ def _log_warning(message, category, filename, lineno, file=None, line=None):
|
||||||
logger.warning(msg)
|
logger.warning(msg)
|
||||||
|
|
||||||
|
|
||||||
def setup_logging(stream=None, conf_fname=None, env_var="BORG_LOGGING_CONF", level="info", is_serve=False, json=False):
|
def setup_logging(stream=None, conf_fname=None, env_var="BORG_LOGGING_CONF", level="info", json=False):
|
||||||
"""setup logging module according to the arguments provided
|
"""setup logging module according to the arguments provided
|
||||||
|
|
||||||
if conf_fname is given (or the config file name can be determined via
|
if conf_fname is given (or the config file name can be determined via
|
||||||
|
@ -61,9 +61,6 @@ def setup_logging(stream=None, conf_fname=None, env_var="BORG_LOGGING_CONF", lev
|
||||||
|
|
||||||
otherwise, set up a stream handler logger on stderr (by default, if no
|
otherwise, set up a stream handler logger on stderr (by default, if no
|
||||||
stream is provided).
|
stream is provided).
|
||||||
|
|
||||||
if is_serve == True, we configure a special log format as expected by
|
|
||||||
the borg client log message interceptor.
|
|
||||||
"""
|
"""
|
||||||
global configured
|
global configured
|
||||||
err_msg = None
|
err_msg = None
|
||||||
|
@ -90,21 +87,12 @@ def setup_logging(stream=None, conf_fname=None, env_var="BORG_LOGGING_CONF", lev
|
||||||
# if we did not / not successfully load a logging configuration, fallback to this:
|
# if we did not / not successfully load a logging configuration, fallback to this:
|
||||||
logger = logging.getLogger("")
|
logger = logging.getLogger("")
|
||||||
handler = logging.StreamHandler(stream)
|
handler = logging.StreamHandler(stream)
|
||||||
if is_serve and not json:
|
fmt = "%(message)s"
|
||||||
fmt = "$LOG %(levelname)s %(name)s Remote: %(message)s"
|
|
||||||
else:
|
|
||||||
fmt = "%(message)s"
|
|
||||||
formatter = JsonFormatter(fmt) if json else logging.Formatter(fmt)
|
formatter = JsonFormatter(fmt) if json else logging.Formatter(fmt)
|
||||||
handler.setFormatter(formatter)
|
handler.setFormatter(formatter)
|
||||||
borg_logger = logging.getLogger("borg")
|
borg_logger = logging.getLogger("borg")
|
||||||
borg_logger.formatter = formatter
|
borg_logger.formatter = formatter
|
||||||
borg_logger.json = json
|
borg_logger.json = json
|
||||||
if configured and logger.handlers:
|
|
||||||
# The RepositoryServer can call setup_logging a second time to adjust the output
|
|
||||||
# mode from text-ish is_serve to json is_serve.
|
|
||||||
# Thus, remove the previously installed handler, if any.
|
|
||||||
logger.handlers[0].close()
|
|
||||||
logger.handlers.clear()
|
|
||||||
logger.addHandler(handler)
|
logger.addHandler(handler)
|
||||||
logger.setLevel(level.upper())
|
logger.setLevel(level.upper())
|
||||||
configured = True
|
configured = True
|
||||||
|
|
|
@ -27,7 +27,7 @@ from .helpers import sysinfo
|
||||||
from .helpers import format_file_size
|
from .helpers import format_file_size
|
||||||
from .helpers import safe_unlink
|
from .helpers import safe_unlink
|
||||||
from .helpers import prepare_subprocess_env, ignore_sigint
|
from .helpers import prepare_subprocess_env, ignore_sigint
|
||||||
from .logger import create_logger, setup_logging
|
from .logger import create_logger
|
||||||
from .helpers import msgpack
|
from .helpers import msgpack
|
||||||
from .repository import Repository
|
from .repository import Repository
|
||||||
from .version import parse_version, format_version
|
from .version import parse_version, format_version
|
||||||
|
@ -36,7 +36,6 @@ from .helpers.datastruct import EfficientCollectionQueue
|
||||||
|
|
||||||
logger = create_logger(__name__)
|
logger = create_logger(__name__)
|
||||||
|
|
||||||
RPC_PROTOCOL_VERSION = 2
|
|
||||||
BORG_VERSION = parse_version(__version__)
|
BORG_VERSION = parse_version(__version__)
|
||||||
MSGID, MSG, ARGS, RESULT = "i", "m", "a", "r"
|
MSGID, MSG, ARGS, RESULT = "i", "m", "a", "r"
|
||||||
|
|
||||||
|
@ -101,12 +100,7 @@ class UnexpectedRPCDataFormatFromServer(Error):
|
||||||
# In general the server is responsible for rejecting too old clients and the client it responsible for rejecting
|
# In general the server is responsible for rejecting too old clients and the client it responsible for rejecting
|
||||||
# too old servers. This ensures that the knowledge what is compatible is always held by the newer component.
|
# too old servers. This ensures that the knowledge what is compatible is always held by the newer component.
|
||||||
#
|
#
|
||||||
# The server can do checks for the client version in RepositoryServer.negotiate. If the client_data is 2 then
|
# For the client the return of the negotiate method is a dict which includes the server version.
|
||||||
# client is in the version range [0.29.0, 1.0.x] inclusive. For newer clients client_data is a dict which contains
|
|
||||||
# client_version.
|
|
||||||
#
|
|
||||||
# For the client the return of the negotiate method is either 2 if the server is in the version range [0.29.0, 1.0.x]
|
|
||||||
# inclusive, or it is a dict which includes the server version.
|
|
||||||
#
|
#
|
||||||
# All method calls on the remote repository object must be allowlisted in RepositoryServer.rpc_methods and have api
|
# All method calls on the remote repository object must be allowlisted in RepositoryServer.rpc_methods and have api
|
||||||
# stubs in RemoteRepository. The @api decorator on these stubs is used to set server version requirements.
|
# stubs in RemoteRepository. The @api decorator on these stubs is used to set server version requirements.
|
||||||
|
@ -118,25 +112,6 @@ class UnexpectedRPCDataFormatFromServer(Error):
|
||||||
# servers still get compatible input.
|
# servers still get compatible input.
|
||||||
|
|
||||||
|
|
||||||
compatMap = {
|
|
||||||
"check": ("repair",),
|
|
||||||
"commit": (),
|
|
||||||
"rollback": (),
|
|
||||||
"destroy": (),
|
|
||||||
"__len__": (),
|
|
||||||
"list": ("limit", "marker"),
|
|
||||||
"put": ("id", "data"),
|
|
||||||
"get": ("id",),
|
|
||||||
"delete": ("id",),
|
|
||||||
"save_key": ("keydata",),
|
|
||||||
"load_key": (),
|
|
||||||
"break_lock": (),
|
|
||||||
"negotiate": ("client_data",),
|
|
||||||
"open": ("path", "create", "lock_wait", "lock", "exclusive", "append_only"),
|
|
||||||
"info": (),
|
|
||||||
}
|
|
||||||
|
|
||||||
|
|
||||||
class RepositoryServer: # pragma: no cover
|
class RepositoryServer: # pragma: no cover
|
||||||
rpc_methods = (
|
rpc_methods = (
|
||||||
"__len__",
|
"__len__",
|
||||||
|
@ -170,21 +145,7 @@ class RepositoryServer: # pragma: no cover
|
||||||
# (see RepositoryServer.open below).
|
# (see RepositoryServer.open below).
|
||||||
self.append_only = append_only
|
self.append_only = append_only
|
||||||
self.storage_quota = storage_quota
|
self.storage_quota = storage_quota
|
||||||
self.client_version = parse_version(
|
self.client_version = None # we update this after client sends version information
|
||||||
"1.0.8"
|
|
||||||
) # fallback version if client is too old to send version information
|
|
||||||
|
|
||||||
def positional_to_named(self, method, argv):
|
|
||||||
"""Translate from positional protocol to named protocol."""
|
|
||||||
try:
|
|
||||||
return {name: argv[pos] for pos, name in enumerate(compatMap[method])}
|
|
||||||
except IndexError:
|
|
||||||
if method == "open" and len(argv) == 4:
|
|
||||||
# borg clients < 1.0.7 use open() with 4 args
|
|
||||||
mapping = compatMap[method][:4]
|
|
||||||
else:
|
|
||||||
raise
|
|
||||||
return {name: argv[pos] for pos, name in enumerate(mapping)}
|
|
||||||
|
|
||||||
def filter_args(self, f, kwargs):
|
def filter_args(self, f, kwargs):
|
||||||
"""Remove unknown named parameters from call, because client did (implicitly) say it's ok."""
|
"""Remove unknown named parameters from call, because client did (implicitly) say it's ok."""
|
||||||
|
@ -217,15 +178,9 @@ class RepositoryServer: # pragma: no cover
|
||||||
unpacker.feed(data)
|
unpacker.feed(data)
|
||||||
for unpacked in unpacker:
|
for unpacked in unpacker:
|
||||||
if isinstance(unpacked, dict):
|
if isinstance(unpacked, dict):
|
||||||
dictFormat = True
|
|
||||||
msgid = unpacked[MSGID]
|
msgid = unpacked[MSGID]
|
||||||
method = unpacked[MSG]
|
method = unpacked[MSG]
|
||||||
args = unpacked[ARGS]
|
args = unpacked[ARGS]
|
||||||
elif isinstance(unpacked, tuple) and len(unpacked) == 4:
|
|
||||||
dictFormat = False
|
|
||||||
# The first field 'type' was always 1 and has always been ignored
|
|
||||||
_, msgid, method, args = unpacked
|
|
||||||
args = self.positional_to_named(method, args)
|
|
||||||
else:
|
else:
|
||||||
if self.repository is not None:
|
if self.repository is not None:
|
||||||
self.repository.close()
|
self.repository.close()
|
||||||
|
@ -240,87 +195,56 @@ class RepositoryServer: # pragma: no cover
|
||||||
args = self.filter_args(f, args)
|
args = self.filter_args(f, args)
|
||||||
res = f(**args)
|
res = f(**args)
|
||||||
except BaseException as e:
|
except BaseException as e:
|
||||||
if dictFormat:
|
ex_short = traceback.format_exception_only(e.__class__, e)
|
||||||
ex_short = traceback.format_exception_only(e.__class__, e)
|
ex_full = traceback.format_exception(*sys.exc_info())
|
||||||
ex_full = traceback.format_exception(*sys.exc_info())
|
ex_trace = True
|
||||||
ex_trace = True
|
if isinstance(e, Error):
|
||||||
if isinstance(e, Error):
|
ex_short = [e.get_message()]
|
||||||
ex_short = [e.get_message()]
|
ex_trace = e.traceback
|
||||||
ex_trace = e.traceback
|
if isinstance(e, (Repository.DoesNotExist, Repository.AlreadyExists, PathNotAllowed)):
|
||||||
if isinstance(e, (Repository.DoesNotExist, Repository.AlreadyExists, PathNotAllowed)):
|
# These exceptions are reconstructed on the client end in RemoteRepository.call_many(),
|
||||||
# These exceptions are reconstructed on the client end in RemoteRepository.call_many(),
|
# and will be handled just like locally raised exceptions. Suppress the remote traceback
|
||||||
# and will be handled just like locally raised exceptions. Suppress the remote traceback
|
# for these, except ErrorWithTraceback, which should always display a traceback.
|
||||||
# for these, except ErrorWithTraceback, which should always display a traceback.
|
pass
|
||||||
pass
|
|
||||||
else:
|
|
||||||
logging.debug("\n".join(ex_full))
|
|
||||||
|
|
||||||
try:
|
|
||||||
msg = msgpack.packb(
|
|
||||||
{
|
|
||||||
MSGID: msgid,
|
|
||||||
"exception_class": e.__class__.__name__,
|
|
||||||
"exception_args": e.args,
|
|
||||||
"exception_full": ex_full,
|
|
||||||
"exception_short": ex_short,
|
|
||||||
"exception_trace": ex_trace,
|
|
||||||
"sysinfo": sysinfo(),
|
|
||||||
}
|
|
||||||
)
|
|
||||||
except TypeError:
|
|
||||||
msg = msgpack.packb(
|
|
||||||
{
|
|
||||||
MSGID: msgid,
|
|
||||||
"exception_class": e.__class__.__name__,
|
|
||||||
"exception_args": [
|
|
||||||
x if isinstance(x, (str, bytes, int)) else None for x in e.args
|
|
||||||
],
|
|
||||||
"exception_full": ex_full,
|
|
||||||
"exception_short": ex_short,
|
|
||||||
"exception_trace": ex_trace,
|
|
||||||
"sysinfo": sysinfo(),
|
|
||||||
}
|
|
||||||
)
|
|
||||||
|
|
||||||
os_write(stdout_fd, msg)
|
|
||||||
else:
|
else:
|
||||||
if isinstance(e, (Repository.DoesNotExist, Repository.AlreadyExists, PathNotAllowed)):
|
logging.debug("\n".join(ex_full))
|
||||||
# These exceptions are reconstructed on the client end in RemoteRepository.call_many(),
|
|
||||||
# and will be handled just like locally raised exceptions. Suppress the remote traceback
|
sys_info = sysinfo()
|
||||||
# for these, except ErrorWithTraceback, which should always display a traceback.
|
try:
|
||||||
pass
|
msg = msgpack.packb(
|
||||||
else:
|
{
|
||||||
if isinstance(e, Error):
|
MSGID: msgid,
|
||||||
tb_log_level = logging.ERROR if e.traceback else logging.DEBUG
|
"exception_class": e.__class__.__name__,
|
||||||
msg = e.get_message()
|
"exception_args": e.args,
|
||||||
else:
|
"exception_full": ex_full,
|
||||||
tb_log_level = logging.ERROR
|
"exception_short": ex_short,
|
||||||
msg = "%s Exception in RPC call" % e.__class__.__name__
|
"exception_trace": ex_trace,
|
||||||
tb = f"{traceback.format_exc()}\n{sysinfo()}"
|
"sysinfo": sys_info,
|
||||||
logging.error(msg)
|
}
|
||||||
logging.log(tb_log_level, tb)
|
)
|
||||||
exc = "Remote Exception (see remote log for the traceback)"
|
except TypeError:
|
||||||
os_write(stdout_fd, msgpack.packb((1, msgid, e.__class__.__name__, exc)))
|
msg = msgpack.packb(
|
||||||
|
{
|
||||||
|
MSGID: msgid,
|
||||||
|
"exception_class": e.__class__.__name__,
|
||||||
|
"exception_args": [x if isinstance(x, (str, bytes, int)) else None for x in e.args],
|
||||||
|
"exception_full": ex_full,
|
||||||
|
"exception_short": ex_short,
|
||||||
|
"exception_trace": ex_trace,
|
||||||
|
"sysinfo": sys_info,
|
||||||
|
}
|
||||||
|
)
|
||||||
|
|
||||||
|
os_write(stdout_fd, msg)
|
||||||
else:
|
else:
|
||||||
if dictFormat:
|
os_write(stdout_fd, msgpack.packb({MSGID: msgid, RESULT: res}))
|
||||||
os_write(stdout_fd, msgpack.packb({MSGID: msgid, RESULT: res}))
|
|
||||||
else:
|
|
||||||
os_write(stdout_fd, msgpack.packb((1, msgid, None, res)))
|
|
||||||
if es:
|
if es:
|
||||||
self.repository.close()
|
self.repository.close()
|
||||||
return
|
return
|
||||||
|
|
||||||
def negotiate(self, client_data):
|
def negotiate(self, client_data):
|
||||||
# old format used in 1.0.x
|
|
||||||
if client_data == RPC_PROTOCOL_VERSION:
|
|
||||||
return RPC_PROTOCOL_VERSION
|
|
||||||
# clients since 1.1.0b3 use a dict as client_data
|
|
||||||
# clients since 1.1.0b6 support json log format from server
|
|
||||||
if isinstance(client_data, dict):
|
if isinstance(client_data, dict):
|
||||||
self.client_version = client_data["client_version"]
|
self.client_version = client_data["client_version"]
|
||||||
level = logging.getLevelName(logging.getLogger("").level)
|
|
||||||
setup_logging(is_serve=True, json=True, level=level)
|
|
||||||
logger.debug("Initialized logging system for JSON-based protocol")
|
|
||||||
else:
|
else:
|
||||||
self.client_version = BORG_VERSION # seems to be newer than current version (no known old format)
|
self.client_version = BORG_VERSION # seems to be newer than current version (no known old format)
|
||||||
|
|
||||||
|
@ -495,15 +419,11 @@ class RemoteRepository:
|
||||||
|
|
||||||
class RPCError(Exception):
|
class RPCError(Exception):
|
||||||
def __init__(self, unpacked):
|
def __init__(self, unpacked):
|
||||||
# for borg < 1.1: unpacked only has 'exception_class' as key
|
# unpacked has keys: 'exception_args', 'exception_full', 'exception_short', 'sysinfo'
|
||||||
# for borg 1.1+: unpacked has keys: 'exception_args', 'exception_full', 'exception_short', 'sysinfo'
|
|
||||||
self.unpacked = unpacked
|
self.unpacked = unpacked
|
||||||
|
|
||||||
def get_message(self):
|
def get_message(self):
|
||||||
if "exception_short" in self.unpacked:
|
return "\n".join(self.unpacked["exception_short"])
|
||||||
return "\n".join(self.unpacked["exception_short"])
|
|
||||||
else:
|
|
||||||
return self.exception_class
|
|
||||||
|
|
||||||
@property
|
@property
|
||||||
def traceback(self):
|
def traceback(self):
|
||||||
|
@ -515,17 +435,11 @@ class RemoteRepository:
|
||||||
|
|
||||||
@property
|
@property
|
||||||
def exception_full(self):
|
def exception_full(self):
|
||||||
if "exception_full" in self.unpacked:
|
return "\n".join(self.unpacked["exception_full"])
|
||||||
return "\n".join(self.unpacked["exception_full"])
|
|
||||||
else:
|
|
||||||
return self.get_message() + "\nRemote Exception (see remote log for the traceback)"
|
|
||||||
|
|
||||||
@property
|
@property
|
||||||
def sysinfo(self):
|
def sysinfo(self):
|
||||||
if "sysinfo" in self.unpacked:
|
return self.unpacked["sysinfo"]
|
||||||
return self.unpacked["sysinfo"]
|
|
||||||
else:
|
|
||||||
return ""
|
|
||||||
|
|
||||||
class RPCServerOutdated(Error):
|
class RPCServerOutdated(Error):
|
||||||
"""Borg server is too old for {}. Required version {}"""
|
"""Borg server is too old for {}. Required version {}"""
|
||||||
|
@ -538,9 +452,6 @@ class RemoteRepository:
|
||||||
def required_version(self):
|
def required_version(self):
|
||||||
return self.args[1]
|
return self.args[1]
|
||||||
|
|
||||||
# If compatibility with 1.0.x is not longer needed, replace all checks of this with True and simplify the code
|
|
||||||
dictFormat = False # outside of __init__ for testing of legacy free protocol
|
|
||||||
|
|
||||||
def __init__(
|
def __init__(
|
||||||
self,
|
self,
|
||||||
location,
|
location,
|
||||||
|
@ -567,9 +478,7 @@ class RemoteRepository:
|
||||||
self.ratelimit = SleepingBandwidthLimiter(args.upload_ratelimit * 1024 if args and args.upload_ratelimit else 0)
|
self.ratelimit = SleepingBandwidthLimiter(args.upload_ratelimit * 1024 if args and args.upload_ratelimit else 0)
|
||||||
self.upload_buffer_size_limit = args.upload_buffer * 1024 * 1024 if args and args.upload_buffer else 0
|
self.upload_buffer_size_limit = args.upload_buffer * 1024 * 1024 if args and args.upload_buffer else 0
|
||||||
self.unpacker = get_limited_unpacker("client")
|
self.unpacker = get_limited_unpacker("client")
|
||||||
self.server_version = parse_version(
|
self.server_version = None # we update this after server sends its version
|
||||||
"1.0.8"
|
|
||||||
) # fallback version if server is too old to send version information
|
|
||||||
self.p = None
|
self.p = None
|
||||||
self._args = args
|
self._args = args
|
||||||
testing = location.host == "__testsuite__"
|
testing = location.host == "__testsuite__"
|
||||||
|
@ -597,51 +506,24 @@ class RemoteRepository:
|
||||||
version = self.call("negotiate", {"client_data": {"client_version": BORG_VERSION}})
|
version = self.call("negotiate", {"client_data": {"client_version": BORG_VERSION}})
|
||||||
except ConnectionClosed:
|
except ConnectionClosed:
|
||||||
raise ConnectionClosedWithHint("Is borg working on the server?") from None
|
raise ConnectionClosedWithHint("Is borg working on the server?") from None
|
||||||
if version == RPC_PROTOCOL_VERSION:
|
if isinstance(version, dict):
|
||||||
self.dictFormat = False
|
|
||||||
elif isinstance(version, dict) and "server_version" in version:
|
|
||||||
self.dictFormat = True
|
|
||||||
self.server_version = version["server_version"]
|
self.server_version = version["server_version"]
|
||||||
else:
|
else:
|
||||||
raise Exception("Server insisted on using unsupported protocol version %s" % version)
|
raise Exception("Server insisted on using unsupported protocol version %s" % version)
|
||||||
|
|
||||||
def do_open():
|
self.id = self.open(
|
||||||
self.id = self.open(
|
path=self.location.path,
|
||||||
path=self.location.path,
|
create=create,
|
||||||
create=create,
|
lock_wait=lock_wait,
|
||||||
lock_wait=lock_wait,
|
lock=lock,
|
||||||
lock=lock,
|
exclusive=exclusive,
|
||||||
exclusive=exclusive,
|
append_only=append_only,
|
||||||
append_only=append_only,
|
make_parent_dirs=make_parent_dirs,
|
||||||
make_parent_dirs=make_parent_dirs,
|
)
|
||||||
)
|
info = self.info()
|
||||||
info = self.info()
|
self.version = info["version"]
|
||||||
self.version = info["version"]
|
self.append_only = info["append_only"]
|
||||||
self.append_only = info["append_only"]
|
|
||||||
|
|
||||||
if self.dictFormat:
|
|
||||||
do_open()
|
|
||||||
else:
|
|
||||||
# Ugly detection of versions prior to 1.0.7: If open throws it has to be 1.0.6 or lower
|
|
||||||
try:
|
|
||||||
do_open()
|
|
||||||
except self.RPCError as err:
|
|
||||||
if err.exception_class != "TypeError":
|
|
||||||
raise
|
|
||||||
msg = """\
|
|
||||||
Please note:
|
|
||||||
If you see a TypeError complaining about the number of positional arguments
|
|
||||||
given to open(), you can ignore it if it comes from a borg version < 1.0.7.
|
|
||||||
This TypeError is a cosmetic side effect of the compatibility code borg
|
|
||||||
clients >= 1.0.7 have to support older borg servers.
|
|
||||||
This problem will go away as soon as the server has been upgraded to 1.0.7+.
|
|
||||||
"""
|
|
||||||
# emit this msg in the same way as the 'Remote: ...' lines that show the remote TypeError
|
|
||||||
sys.stderr.write(msg)
|
|
||||||
self.server_version = parse_version("1.0.6")
|
|
||||||
compatMap["open"] = ("path", "create", "lock_wait", "lock")
|
|
||||||
# try again with corrected version and compatMap
|
|
||||||
do_open()
|
|
||||||
except Exception:
|
except Exception:
|
||||||
self.close()
|
self.close()
|
||||||
raise
|
raise
|
||||||
|
@ -738,9 +620,6 @@ This problem will go away as soon as the server has been upgraded to 1.0.7+.
|
||||||
args.append("%s" % location.host)
|
args.append("%s" % location.host)
|
||||||
return args
|
return args
|
||||||
|
|
||||||
def named_to_positional(self, method, kwargs):
|
|
||||||
return [kwargs[name] for name in compatMap[method]]
|
|
||||||
|
|
||||||
def call(self, cmd, args, **kw):
|
def call(self, cmd, args, **kw):
|
||||||
for resp in self.call_many(cmd, [args], **kw):
|
for resp in self.call_many(cmd, [args], **kw):
|
||||||
return resp
|
return resp
|
||||||
|
@ -769,9 +648,11 @@ This problem will go away as soon as the server has been upgraded to 1.0.7+.
|
||||||
return msgid
|
return msgid
|
||||||
|
|
||||||
def handle_error(unpacked):
|
def handle_error(unpacked):
|
||||||
|
if "exception_class" not in unpacked:
|
||||||
|
return
|
||||||
|
|
||||||
error = unpacked["exception_class"]
|
error = unpacked["exception_class"]
|
||||||
old_server = "exception_args" not in unpacked
|
args = unpacked["exception_args"]
|
||||||
args = unpacked.get("exception_args")
|
|
||||||
|
|
||||||
if error == "DoesNotExist":
|
if error == "DoesNotExist":
|
||||||
raise Repository.DoesNotExist(self.location.processed)
|
raise Repository.DoesNotExist(self.location.processed)
|
||||||
|
@ -780,27 +661,15 @@ This problem will go away as soon as the server has been upgraded to 1.0.7+.
|
||||||
elif error == "CheckNeeded":
|
elif error == "CheckNeeded":
|
||||||
raise Repository.CheckNeeded(self.location.processed)
|
raise Repository.CheckNeeded(self.location.processed)
|
||||||
elif error == "IntegrityError":
|
elif error == "IntegrityError":
|
||||||
if old_server:
|
raise IntegrityError(args[0])
|
||||||
raise IntegrityError("(not available)")
|
|
||||||
else:
|
|
||||||
raise IntegrityError(args[0])
|
|
||||||
elif error == "PathNotAllowed":
|
elif error == "PathNotAllowed":
|
||||||
if old_server:
|
raise PathNotAllowed(args[0])
|
||||||
raise PathNotAllowed("(unknown)")
|
|
||||||
else:
|
|
||||||
raise PathNotAllowed(args[0])
|
|
||||||
elif error == "ParentPathDoesNotExist":
|
elif error == "ParentPathDoesNotExist":
|
||||||
raise Repository.ParentPathDoesNotExist(args[0])
|
raise Repository.ParentPathDoesNotExist(args[0])
|
||||||
elif error == "ObjectNotFound":
|
elif error == "ObjectNotFound":
|
||||||
if old_server:
|
raise Repository.ObjectNotFound(args[0], self.location.processed)
|
||||||
raise Repository.ObjectNotFound("(not available)", self.location.processed)
|
|
||||||
else:
|
|
||||||
raise Repository.ObjectNotFound(args[0], self.location.processed)
|
|
||||||
elif error == "InvalidRPCMethod":
|
elif error == "InvalidRPCMethod":
|
||||||
if old_server:
|
raise InvalidRPCMethod(args[0])
|
||||||
raise InvalidRPCMethod("(not available)")
|
|
||||||
else:
|
|
||||||
raise InvalidRPCMethod(args[0])
|
|
||||||
else:
|
else:
|
||||||
raise self.RPCError(unpacked)
|
raise self.RPCError(unpacked)
|
||||||
|
|
||||||
|
@ -822,12 +691,10 @@ This problem will go away as soon as the server has been upgraded to 1.0.7+.
|
||||||
try:
|
try:
|
||||||
unpacked = self.responses.pop(waiting_for[0])
|
unpacked = self.responses.pop(waiting_for[0])
|
||||||
waiting_for.pop(0)
|
waiting_for.pop(0)
|
||||||
if "exception_class" in unpacked:
|
handle_error(unpacked)
|
||||||
handle_error(unpacked)
|
yield unpacked[RESULT]
|
||||||
else:
|
if not waiting_for and not calls:
|
||||||
yield unpacked[RESULT]
|
return
|
||||||
if not waiting_for and not calls:
|
|
||||||
return
|
|
||||||
except KeyError:
|
except KeyError:
|
||||||
break
|
break
|
||||||
if cmd == "async_responses":
|
if cmd == "async_responses":
|
||||||
|
@ -842,10 +709,8 @@ This problem will go away as soon as the server has been upgraded to 1.0.7+.
|
||||||
else:
|
else:
|
||||||
return
|
return
|
||||||
else:
|
else:
|
||||||
if "exception_class" in unpacked:
|
handle_error(unpacked)
|
||||||
handle_error(unpacked)
|
yield unpacked[RESULT]
|
||||||
else:
|
|
||||||
yield unpacked[RESULT]
|
|
||||||
if self.to_send or ((calls or self.preload_ids) and len(waiting_for) < MAX_INFLIGHT):
|
if self.to_send or ((calls or self.preload_ids) and len(waiting_for) < MAX_INFLIGHT):
|
||||||
w_fds = [self.stdin_fd]
|
w_fds = [self.stdin_fd]
|
||||||
else:
|
else:
|
||||||
|
@ -863,14 +728,6 @@ This problem will go away as soon as the server has been upgraded to 1.0.7+.
|
||||||
for unpacked in self.unpacker:
|
for unpacked in self.unpacker:
|
||||||
if isinstance(unpacked, dict):
|
if isinstance(unpacked, dict):
|
||||||
msgid = unpacked[MSGID]
|
msgid = unpacked[MSGID]
|
||||||
elif isinstance(unpacked, tuple) and len(unpacked) == 4:
|
|
||||||
# The first field 'type' was always 1 and has always been ignored
|
|
||||||
_, msgid, error, res = unpacked
|
|
||||||
if error:
|
|
||||||
# ignore res, because it is only a fixed string anyway.
|
|
||||||
unpacked = {MSGID: msgid, "exception_class": error}
|
|
||||||
else:
|
|
||||||
unpacked = {MSGID: msgid, RESULT: res}
|
|
||||||
else:
|
else:
|
||||||
raise UnexpectedRPCDataFormatFromServer(data)
|
raise UnexpectedRPCDataFormatFromServer(data)
|
||||||
if msgid in self.ignore_responses:
|
if msgid in self.ignore_responses:
|
||||||
|
@ -918,23 +775,13 @@ This problem will go away as soon as the server has been upgraded to 1.0.7+.
|
||||||
else:
|
else:
|
||||||
self.msgid += 1
|
self.msgid += 1
|
||||||
waiting_for.append(self.msgid)
|
waiting_for.append(self.msgid)
|
||||||
if self.dictFormat:
|
self.to_send.push_back(msgpack.packb({MSGID: self.msgid, MSG: cmd, ARGS: args}))
|
||||||
self.to_send.push_back(msgpack.packb({MSGID: self.msgid, MSG: cmd, ARGS: args}))
|
|
||||||
else:
|
|
||||||
self.to_send.push_back(
|
|
||||||
msgpack.packb((1, self.msgid, cmd, self.named_to_positional(cmd, args)))
|
|
||||||
)
|
|
||||||
if not self.to_send and self.preload_ids:
|
if not self.to_send and self.preload_ids:
|
||||||
chunk_id = self.preload_ids.pop(0)
|
chunk_id = self.preload_ids.pop(0)
|
||||||
args = {"id": chunk_id}
|
args = {"id": chunk_id}
|
||||||
self.msgid += 1
|
self.msgid += 1
|
||||||
self.chunkid_to_msgids.setdefault(chunk_id, []).append(self.msgid)
|
self.chunkid_to_msgids.setdefault(chunk_id, []).append(self.msgid)
|
||||||
if self.dictFormat:
|
self.to_send.push_back(msgpack.packb({MSGID: self.msgid, MSG: "get", ARGS: args}))
|
||||||
self.to_send.push_back(msgpack.packb({MSGID: self.msgid, MSG: "get", ARGS: args}))
|
|
||||||
else:
|
|
||||||
self.to_send.push_back(
|
|
||||||
msgpack.packb((1, self.msgid, "get", self.named_to_positional("get", args)))
|
|
||||||
)
|
|
||||||
|
|
||||||
send_buffer()
|
send_buffer()
|
||||||
self.ignore_responses |= set(waiting_for) # we lose order here
|
self.ignore_responses |= set(waiting_for) # we lose order here
|
||||||
|
@ -1047,8 +894,6 @@ def handle_remote_line(line):
|
||||||
"""
|
"""
|
||||||
assert line.endswith(("\r", "\n"))
|
assert line.endswith(("\r", "\n"))
|
||||||
if line.startswith("{"):
|
if line.startswith("{"):
|
||||||
# This format is used by Borg since 1.1.0b6 for new-protocol clients.
|
|
||||||
# It is the same format that is exposed by --log-json.
|
|
||||||
msg = json.loads(line)
|
msg = json.loads(line)
|
||||||
|
|
||||||
if msg["type"] not in ("progress_message", "progress_percent", "log_message"):
|
if msg["type"] not in ("progress_message", "progress_percent", "log_message"):
|
||||||
|
@ -1087,30 +932,9 @@ def handle_remote_line(line):
|
||||||
# so that the next message, progress or not, overwrites it. This mirrors the behaviour
|
# so that the next message, progress or not, overwrites it. This mirrors the behaviour
|
||||||
# of local progress displays.
|
# of local progress displays.
|
||||||
sys.stderr.write("Remote: " + msg["message"] + "\r")
|
sys.stderr.write("Remote: " + msg["message"] + "\r")
|
||||||
elif line.startswith("$LOG "):
|
|
||||||
# This format is used by borg serve 0.xx, 1.0.x and 1.1.0b1..b5.
|
|
||||||
# It prefixed log lines with $LOG as a marker, followed by the log level
|
|
||||||
# and optionally a logger name, then "Remote:" as a separator followed by the original
|
|
||||||
# message.
|
|
||||||
_, level, msg = line.split(" ", 2)
|
|
||||||
level = getattr(logging, level, logging.CRITICAL) # str -> int
|
|
||||||
if msg.startswith("Remote:"):
|
|
||||||
# server format: '$LOG <level> Remote: <msg>'
|
|
||||||
logging.log(level, msg.rstrip())
|
|
||||||
else:
|
|
||||||
# server format '$LOG <level> <logname> Remote: <msg>'
|
|
||||||
logname, msg = msg.split(" ", 1)
|
|
||||||
logging.getLogger(logname).log(level, msg.rstrip())
|
|
||||||
else:
|
else:
|
||||||
# Plain 1.0.x and older format - re-emit to stderr (mirroring what the 1.0.x
|
|
||||||
# client did) or as a generic log message.
|
|
||||||
# We don't know what priority the line had.
|
# We don't know what priority the line had.
|
||||||
if logging.getLogger("borg").json:
|
logging.getLogger("").warning("stderr/remote: " + line.strip())
|
||||||
logging.getLogger("").warning("Remote: " + line.strip())
|
|
||||||
else:
|
|
||||||
# In non-JSON mode we circumvent logging to preserve carriage returns (\r)
|
|
||||||
# which are generated by remote progress displays.
|
|
||||||
sys.stderr.write("Remote: " + line)
|
|
||||||
|
|
||||||
|
|
||||||
class RepositoryNoCache:
|
class RepositoryNoCache:
|
||||||
|
|
|
@ -1047,34 +1047,6 @@ class RemoteRepositoryTestCase(RepositoryTestCase):
|
||||||
assert self.repository.ssh_cmd(Location("ssh://example.com/foo")) == ["ssh", "-i", "foo", "example.com"]
|
assert self.repository.ssh_cmd(Location("ssh://example.com/foo")) == ["ssh", "-i", "foo", "example.com"]
|
||||||
|
|
||||||
|
|
||||||
class RemoteLegacyFree(RepositoryTestCaseBase):
|
|
||||||
# Keep testing this so we can someday safely remove the legacy tuple format.
|
|
||||||
|
|
||||||
def open(self, create=False):
|
|
||||||
with patch.object(RemoteRepository, "dictFormat", True):
|
|
||||||
return RemoteRepository(
|
|
||||||
Location("ssh://__testsuite__" + os.path.join(self.tmppath, "repository")),
|
|
||||||
exclusive=True,
|
|
||||||
create=create,
|
|
||||||
)
|
|
||||||
|
|
||||||
def test_legacy_free(self):
|
|
||||||
# put
|
|
||||||
self.repository.put(H(0), fchunk(b"foo"))
|
|
||||||
self.repository.commit(compact=False)
|
|
||||||
self.repository.close()
|
|
||||||
# replace
|
|
||||||
self.repository = self.open()
|
|
||||||
with self.repository:
|
|
||||||
self.repository.put(H(0), fchunk(b"bar"))
|
|
||||||
self.repository.commit(compact=False)
|
|
||||||
# delete
|
|
||||||
self.repository = self.open()
|
|
||||||
with self.repository:
|
|
||||||
self.repository.delete(H(0))
|
|
||||||
self.repository.commit(compact=False)
|
|
||||||
|
|
||||||
|
|
||||||
class RemoteRepositoryCheckTestCase(RepositoryCheckTestCase):
|
class RemoteRepositoryCheckTestCase(RepositoryCheckTestCase):
|
||||||
def open(self, create=False):
|
def open(self, create=False):
|
||||||
return RemoteRepository(
|
return RemoteRepository(
|
||||||
|
@ -1111,29 +1083,18 @@ class RemoteLoggerTestCase(BaseTestCase):
|
||||||
|
|
||||||
def test_stderr_messages(self):
|
def test_stderr_messages(self):
|
||||||
handle_remote_line("unstructured stderr message\n")
|
handle_remote_line("unstructured stderr message\n")
|
||||||
self.assert_equal(self.stream.getvalue(), "")
|
self.assert_equal(self.stream.getvalue(), "stderr/remote: unstructured stderr message\n")
|
||||||
# stderr messages don't get an implicit newline
|
|
||||||
self.assert_equal(self.stderr.getvalue(), "Remote: unstructured stderr message\n")
|
|
||||||
|
|
||||||
def test_stderr_progress_messages(self):
|
|
||||||
handle_remote_line("unstructured stderr progress message\r")
|
|
||||||
self.assert_equal(self.stream.getvalue(), "")
|
|
||||||
# stderr messages don't get an implicit newline
|
|
||||||
self.assert_equal(self.stderr.getvalue(), "Remote: unstructured stderr progress message\r")
|
|
||||||
|
|
||||||
def test_pre11_format_messages(self):
|
|
||||||
self.handler.setLevel(logging.DEBUG)
|
|
||||||
logging.getLogger().setLevel(logging.DEBUG)
|
|
||||||
|
|
||||||
handle_remote_line("$LOG INFO Remote: borg < 1.1 format message\n")
|
|
||||||
self.assert_equal(self.stream.getvalue(), "Remote: borg < 1.1 format message\n")
|
|
||||||
self.assert_equal(self.stderr.getvalue(), "")
|
self.assert_equal(self.stderr.getvalue(), "")
|
||||||
|
|
||||||
def test_post11_format_messages(self):
|
def test_post11_format_messages(self):
|
||||||
self.handler.setLevel(logging.DEBUG)
|
self.handler.setLevel(logging.DEBUG)
|
||||||
logging.getLogger().setLevel(logging.DEBUG)
|
logging.getLogger().setLevel(logging.DEBUG)
|
||||||
|
|
||||||
handle_remote_line("$LOG INFO borg.repository Remote: borg >= 1.1 format message\n")
|
msg = (
|
||||||
|
"""{"type": "log_message", "levelname": "INFO", "name": "borg.repository", "msgid": 42,"""
|
||||||
|
""" "message": "borg >= 1.1 format message"}\n"""
|
||||||
|
)
|
||||||
|
handle_remote_line(msg)
|
||||||
self.assert_equal(self.stream.getvalue(), "Remote: borg >= 1.1 format message\n")
|
self.assert_equal(self.stream.getvalue(), "Remote: borg >= 1.1 format message\n")
|
||||||
self.assert_equal(self.stderr.getvalue(), "")
|
self.assert_equal(self.stderr.getvalue(), "")
|
||||||
|
|
||||||
|
@ -1142,7 +1103,11 @@ class RemoteLoggerTestCase(BaseTestCase):
|
||||||
self.handler.setLevel(logging.WARNING)
|
self.handler.setLevel(logging.WARNING)
|
||||||
logging.getLogger().setLevel(logging.WARNING)
|
logging.getLogger().setLevel(logging.WARNING)
|
||||||
|
|
||||||
handle_remote_line("$LOG INFO borg.repository Remote: new format info message\n")
|
msg = (
|
||||||
|
"""{"type": "log_message", "levelname": "INFO", "name": "borg.repository", "msgid": 42,"""
|
||||||
|
""" "message": "new format info message"}\n"""
|
||||||
|
)
|
||||||
|
handle_remote_line(msg)
|
||||||
self.assert_equal(self.stream.getvalue(), "")
|
self.assert_equal(self.stream.getvalue(), "")
|
||||||
self.assert_equal(self.stderr.getvalue(), "")
|
self.assert_equal(self.stderr.getvalue(), "")
|
||||||
|
|
||||||
|
@ -1162,7 +1127,11 @@ class RemoteLoggerTestCase(BaseTestCase):
|
||||||
foo_handler.setLevel(logging.INFO)
|
foo_handler.setLevel(logging.INFO)
|
||||||
logging.getLogger("borg.repository.foo").handlers[:] = [foo_handler]
|
logging.getLogger("borg.repository.foo").handlers[:] = [foo_handler]
|
||||||
|
|
||||||
handle_remote_line("$LOG INFO borg.repository Remote: new format child message\n")
|
msg = (
|
||||||
|
"""{"type": "log_message", "levelname": "INFO", "name": "borg.repository", "msgid": 42,"""
|
||||||
|
""" "message": "new format child message"}\n"""
|
||||||
|
)
|
||||||
|
handle_remote_line(msg)
|
||||||
self.assert_equal(foo_stream.getvalue(), "")
|
self.assert_equal(foo_stream.getvalue(), "")
|
||||||
self.assert_equal(child_stream.getvalue(), "Remote: new format child message\n")
|
self.assert_equal(child_stream.getvalue(), "Remote: new format child message\n")
|
||||||
self.assert_equal(self.stream.getvalue(), "")
|
self.assert_equal(self.stream.getvalue(), "")
|
||||||
|
|
Loading…
Reference in New Issue