1
0
Fork 0
mirror of https://github.com/borgbackup/borg.git synced 2025-01-01 12:45:34 +00:00

remote repos: remove support for borg < 1.1.0 (rpc data format, version)

rpc format:
ancient borg used tuples in the rpc protocol,
but recent ones use easier-to-work-with dicts.

version info:
we expect dicts with server/client version now.
This commit is contained in:
Thomas Waldmann 2023-05-23 23:52:36 +02:00
parent 0ba40c16fb
commit 591d8efac4
No known key found for this signature in database
GPG key ID: 243ACFA951F78E01
2 changed files with 58 additions and 213 deletions

View file

@ -36,7 +36,6 @@
logger = create_logger(__name__)
RPC_PROTOCOL_VERSION = 2
BORG_VERSION = parse_version(__version__)
MSGID, MSG, ARGS, RESULT = "i", "m", "a", "r"
@ -101,12 +100,7 @@ def __init__(self, data):
# In general the server is responsible for rejecting too old clients and the client it responsible for rejecting
# too old servers. This ensures that the knowledge what is compatible is always held by the newer component.
#
# The server can do checks for the client version in RepositoryServer.negotiate. If the client_data is 2 then
# client is in the version range [0.29.0, 1.0.x] inclusive. For newer clients client_data is a dict which contains
# client_version.
#
# For the client the return of the negotiate method is either 2 if the server is in the version range [0.29.0, 1.0.x]
# inclusive, or it is a dict which includes the server version.
# For the client the return of the negotiate method is a dict which includes the server version.
#
# All method calls on the remote repository object must be allowlisted in RepositoryServer.rpc_methods and have api
# stubs in RemoteRepository. The @api decorator on these stubs is used to set server version requirements.
@ -118,25 +112,6 @@ def __init__(self, data):
# servers still get compatible input.
compatMap = {
"check": ("repair",),
"commit": (),
"rollback": (),
"destroy": (),
"__len__": (),
"list": ("limit", "marker"),
"put": ("id", "data"),
"get": ("id",),
"delete": ("id",),
"save_key": ("keydata",),
"load_key": (),
"break_lock": (),
"negotiate": ("client_data",),
"open": ("path", "create", "lock_wait", "lock", "exclusive", "append_only"),
"info": (),
}
class RepositoryServer: # pragma: no cover
rpc_methods = (
"__len__",
@ -170,21 +145,7 @@ def __init__(self, restrict_to_paths, restrict_to_repositories, append_only, sto
# (see RepositoryServer.open below).
self.append_only = append_only
self.storage_quota = storage_quota
self.client_version = parse_version(
"1.0.8"
) # fallback version if client is too old to send version information
def positional_to_named(self, method, argv):
"""Translate from positional protocol to named protocol."""
try:
return {name: argv[pos] for pos, name in enumerate(compatMap[method])}
except IndexError:
if method == "open" and len(argv) == 4:
# borg clients < 1.0.7 use open() with 4 args
mapping = compatMap[method][:4]
else:
raise
return {name: argv[pos] for pos, name in enumerate(mapping)}
self.client_version = None # we update this after client sends version information
def filter_args(self, f, kwargs):
"""Remove unknown named parameters from call, because client did (implicitly) say it's ok."""
@ -217,15 +178,9 @@ def serve(self):
unpacker.feed(data)
for unpacked in unpacker:
if isinstance(unpacked, dict):
dictFormat = True
msgid = unpacked[MSGID]
method = unpacked[MSG]
args = unpacked[ARGS]
elif isinstance(unpacked, tuple) and len(unpacked) == 4:
dictFormat = False
# The first field 'type' was always 1 and has always been ignored
_, msgid, method, args = unpacked
args = self.positional_to_named(method, args)
else:
if self.repository is not None:
self.repository.close()
@ -240,82 +195,53 @@ def serve(self):
args = self.filter_args(f, args)
res = f(**args)
except BaseException as e:
if dictFormat:
ex_short = traceback.format_exception_only(e.__class__, e)
ex_full = traceback.format_exception(*sys.exc_info())
ex_trace = True
if isinstance(e, Error):
ex_short = [e.get_message()]
ex_trace = e.traceback
if isinstance(e, (Repository.DoesNotExist, Repository.AlreadyExists, PathNotAllowed)):
# These exceptions are reconstructed on the client end in RemoteRepository.call_many(),
# and will be handled just like locally raised exceptions. Suppress the remote traceback
# for these, except ErrorWithTraceback, which should always display a traceback.
pass
else:
logging.debug("\n".join(ex_full))
try:
msg = msgpack.packb(
{
MSGID: msgid,
"exception_class": e.__class__.__name__,
"exception_args": e.args,
"exception_full": ex_full,
"exception_short": ex_short,
"exception_trace": ex_trace,
"sysinfo": sysinfo(),
}
)
except TypeError:
msg = msgpack.packb(
{
MSGID: msgid,
"exception_class": e.__class__.__name__,
"exception_args": [
x if isinstance(x, (str, bytes, int)) else None for x in e.args
],
"exception_full": ex_full,
"exception_short": ex_short,
"exception_trace": ex_trace,
"sysinfo": sysinfo(),
}
)
os_write(stdout_fd, msg)
ex_short = traceback.format_exception_only(e.__class__, e)
ex_full = traceback.format_exception(*sys.exc_info())
ex_trace = True
if isinstance(e, Error):
ex_short = [e.get_message()]
ex_trace = e.traceback
if isinstance(e, (Repository.DoesNotExist, Repository.AlreadyExists, PathNotAllowed)):
# These exceptions are reconstructed on the client end in RemoteRepository.call_many(),
# and will be handled just like locally raised exceptions. Suppress the remote traceback
# for these, except ErrorWithTraceback, which should always display a traceback.
pass
else:
if isinstance(e, (Repository.DoesNotExist, Repository.AlreadyExists, PathNotAllowed)):
# These exceptions are reconstructed on the client end in RemoteRepository.call_many(),
# and will be handled just like locally raised exceptions. Suppress the remote traceback
# for these, except ErrorWithTraceback, which should always display a traceback.
pass
else:
if isinstance(e, Error):
tb_log_level = logging.ERROR if e.traceback else logging.DEBUG
msg = e.get_message()
else:
tb_log_level = logging.ERROR
msg = "%s Exception in RPC call" % e.__class__.__name__
tb = f"{traceback.format_exc()}\n{sysinfo()}"
logging.error(msg)
logging.log(tb_log_level, tb)
exc = "Remote Exception (see remote log for the traceback)"
os_write(stdout_fd, msgpack.packb((1, msgid, e.__class__.__name__, exc)))
logging.debug("\n".join(ex_full))
try:
msg = msgpack.packb(
{
MSGID: msgid,
"exception_class": e.__class__.__name__,
"exception_args": e.args,
"exception_full": ex_full,
"exception_short": ex_short,
"exception_trace": ex_trace,
"sysinfo": sysinfo(),
}
)
except TypeError:
msg = msgpack.packb(
{
MSGID: msgid,
"exception_class": e.__class__.__name__,
"exception_args": [x if isinstance(x, (str, bytes, int)) else None for x in e.args],
"exception_full": ex_full,
"exception_short": ex_short,
"exception_trace": ex_trace,
"sysinfo": sysinfo(),
}
)
os_write(stdout_fd, msg)
else:
if dictFormat:
os_write(stdout_fd, msgpack.packb({MSGID: msgid, RESULT: res}))
else:
os_write(stdout_fd, msgpack.packb((1, msgid, None, res)))
os_write(stdout_fd, msgpack.packb({MSGID: msgid, RESULT: res}))
if es:
self.repository.close()
return
def negotiate(self, client_data):
# old format used in 1.0.x
if client_data == RPC_PROTOCOL_VERSION:
return RPC_PROTOCOL_VERSION
# clients since 1.1.0b3 use a dict as client_data
# clients since 1.1.0b6 support json log format from server
if isinstance(client_data, dict):
self.client_version = client_data["client_version"]
level = logging.getLevelName(logging.getLogger("").level)
@ -538,9 +464,6 @@ def method(self):
def required_version(self):
return self.args[1]
# If compatibility with 1.0.x is not longer needed, replace all checks of this with True and simplify the code
dictFormat = False # outside of __init__ for testing of legacy free protocol
def __init__(
self,
location,
@ -567,9 +490,7 @@ def __init__(
self.ratelimit = SleepingBandwidthLimiter(args.upload_ratelimit * 1024 if args and args.upload_ratelimit else 0)
self.upload_buffer_size_limit = args.upload_buffer * 1024 * 1024 if args and args.upload_buffer else 0
self.unpacker = get_limited_unpacker("client")
self.server_version = parse_version(
"1.0.8"
) # fallback version if server is too old to send version information
self.server_version = None # we update this after server sends its version
self.p = None
self._args = args
testing = location.host == "__testsuite__"
@ -597,51 +518,24 @@ def __init__(
version = self.call("negotiate", {"client_data": {"client_version": BORG_VERSION}})
except ConnectionClosed:
raise ConnectionClosedWithHint("Is borg working on the server?") from None
if version == RPC_PROTOCOL_VERSION:
self.dictFormat = False
elif isinstance(version, dict) and "server_version" in version:
self.dictFormat = True
if isinstance(version, dict):
self.server_version = version["server_version"]
else:
raise Exception("Server insisted on using unsupported protocol version %s" % version)
def do_open():
self.id = self.open(
path=self.location.path,
create=create,
lock_wait=lock_wait,
lock=lock,
exclusive=exclusive,
append_only=append_only,
make_parent_dirs=make_parent_dirs,
)
info = self.info()
self.version = info["version"]
self.append_only = info["append_only"]
self.id = self.open(
path=self.location.path,
create=create,
lock_wait=lock_wait,
lock=lock,
exclusive=exclusive,
append_only=append_only,
make_parent_dirs=make_parent_dirs,
)
info = self.info()
self.version = info["version"]
self.append_only = info["append_only"]
if self.dictFormat:
do_open()
else:
# Ugly detection of versions prior to 1.0.7: If open throws it has to be 1.0.6 or lower
try:
do_open()
except self.RPCError as err:
if err.exception_class != "TypeError":
raise
msg = """\
Please note:
If you see a TypeError complaining about the number of positional arguments
given to open(), you can ignore it if it comes from a borg version < 1.0.7.
This TypeError is a cosmetic side effect of the compatibility code borg
clients >= 1.0.7 have to support older borg servers.
This problem will go away as soon as the server has been upgraded to 1.0.7+.
"""
# emit this msg in the same way as the 'Remote: ...' lines that show the remote TypeError
sys.stderr.write(msg)
self.server_version = parse_version("1.0.6")
compatMap["open"] = ("path", "create", "lock_wait", "lock")
# try again with corrected version and compatMap
do_open()
except Exception:
self.close()
raise
@ -738,9 +632,6 @@ def ssh_cmd(self, location):
args.append("%s" % location.host)
return args
def named_to_positional(self, method, kwargs):
return [kwargs[name] for name in compatMap[method]]
def call(self, cmd, args, **kw):
for resp in self.call_many(cmd, [args], **kw):
return resp
@ -863,14 +754,6 @@ def handle_error(unpacked):
for unpacked in self.unpacker:
if isinstance(unpacked, dict):
msgid = unpacked[MSGID]
elif isinstance(unpacked, tuple) and len(unpacked) == 4:
# The first field 'type' was always 1 and has always been ignored
_, msgid, error, res = unpacked
if error:
# ignore res, because it is only a fixed string anyway.
unpacked = {MSGID: msgid, "exception_class": error}
else:
unpacked = {MSGID: msgid, RESULT: res}
else:
raise UnexpectedRPCDataFormatFromServer(data)
if msgid in self.ignore_responses:
@ -918,23 +801,13 @@ def handle_error(unpacked):
else:
self.msgid += 1
waiting_for.append(self.msgid)
if self.dictFormat:
self.to_send.push_back(msgpack.packb({MSGID: self.msgid, MSG: cmd, ARGS: args}))
else:
self.to_send.push_back(
msgpack.packb((1, self.msgid, cmd, self.named_to_positional(cmd, args)))
)
self.to_send.push_back(msgpack.packb({MSGID: self.msgid, MSG: cmd, ARGS: args}))
if not self.to_send and self.preload_ids:
chunk_id = self.preload_ids.pop(0)
args = {"id": chunk_id}
self.msgid += 1
self.chunkid_to_msgids.setdefault(chunk_id, []).append(self.msgid)
if self.dictFormat:
self.to_send.push_back(msgpack.packb({MSGID: self.msgid, MSG: "get", ARGS: args}))
else:
self.to_send.push_back(
msgpack.packb((1, self.msgid, "get", self.named_to_positional("get", args)))
)
self.to_send.push_back(msgpack.packb({MSGID: self.msgid, MSG: "get", ARGS: args}))
send_buffer()
self.ignore_responses |= set(waiting_for) # we lose order here

View file

@ -1047,34 +1047,6 @@ def test_borg_cmd(self):
assert self.repository.ssh_cmd(Location("ssh://example.com/foo")) == ["ssh", "-i", "foo", "example.com"]
class RemoteLegacyFree(RepositoryTestCaseBase):
# Keep testing this so we can someday safely remove the legacy tuple format.
def open(self, create=False):
with patch.object(RemoteRepository, "dictFormat", True):
return RemoteRepository(
Location("ssh://__testsuite__" + os.path.join(self.tmppath, "repository")),
exclusive=True,
create=create,
)
def test_legacy_free(self):
# put
self.repository.put(H(0), fchunk(b"foo"))
self.repository.commit(compact=False)
self.repository.close()
# replace
self.repository = self.open()
with self.repository:
self.repository.put(H(0), fchunk(b"bar"))
self.repository.commit(compact=False)
# delete
self.repository = self.open()
with self.repository:
self.repository.delete(H(0))
self.repository.commit(compact=False)
class RemoteRepositoryCheckTestCase(RepositoryCheckTestCase):
def open(self, create=False):
return RemoteRepository(