2014-06-13 18:07:01 +00:00
|
|
|
import errno
|
2016-11-10 08:56:18 +00:00
|
|
|
import functools
|
|
|
|
import inspect
|
2017-05-17 15:17:15 +00:00
|
|
|
import json
|
2015-12-12 17:25:38 +00:00
|
|
|
import logging
|
2010-11-15 21:18:47 +00:00
|
|
|
import os
|
|
|
|
import select
|
2015-10-05 22:54:00 +00:00
|
|
|
import shlex
|
2016-05-19 23:54:42 +00:00
|
|
|
import shutil
|
2017-05-28 11:16:52 +00:00
|
|
|
import struct
|
2010-11-15 21:18:47 +00:00
|
|
|
import sys
|
2014-03-13 21:29:47 +00:00
|
|
|
import tempfile
|
2017-01-12 14:01:41 +00:00
|
|
|
import textwrap
|
2017-01-14 02:07:11 +00:00
|
|
|
import time
|
2017-05-17 15:17:15 +00:00
|
|
|
import traceback
|
2016-05-30 23:18:03 +00:00
|
|
|
from subprocess import Popen, PIPE
|
2015-03-21 01:17:19 +00:00
|
|
|
|
2016-05-30 23:18:03 +00:00
|
|
|
from . import __version__
|
2019-07-14 09:39:58 +00:00
|
|
|
from .compress import Compressor
|
2017-06-23 03:56:41 +00:00
|
|
|
from .constants import * # NOQA
|
2016-05-30 22:33:13 +00:00
|
|
|
from .helpers import Error, IntegrityError
|
|
|
|
from .helpers import bin_to_hex
|
2017-12-21 05:18:49 +00:00
|
|
|
from .helpers import get_base_dir
|
2017-06-24 16:31:34 +00:00
|
|
|
from .helpers import get_limited_unpacker
|
2017-05-17 15:17:15 +00:00
|
|
|
from .helpers import replace_placeholders
|
|
|
|
from .helpers import sysinfo
|
2016-05-19 23:54:42 +00:00
|
|
|
from .helpers import format_file_size
|
2022-02-15 18:39:58 +00:00
|
|
|
from .helpers import safe_unlink
|
2017-09-23 17:04:46 +00:00
|
|
|
from .helpers import prepare_subprocess_env
|
2017-05-17 15:17:15 +00:00
|
|
|
from .logger import create_logger, setup_logging
|
2018-07-01 00:34:48 +00:00
|
|
|
from .helpers import msgpack
|
2017-06-24 16:31:34 +00:00
|
|
|
from .repository import Repository
|
2016-11-10 08:56:18 +00:00
|
|
|
from .version import parse_version, format_version
|
2022-03-16 23:24:49 +00:00
|
|
|
from .checksums import xxh64
|
2021-01-03 16:37:16 +00:00
|
|
|
from .helpers.datastruct import EfficientCollectionQueue
|
2017-01-12 14:01:41 +00:00
|
|
|
|
|
|
|
logger = create_logger(__name__)

RPC_PROTOCOL_VERSION = 2  # legacy integer protocol version used by borg 0.29.0 .. 1.0.x clients/servers

BORG_VERSION = parse_version(__version__)

MSGID, MSG, ARGS, RESULT = 'i', 'm', 'a', 'r'  # short keys of the dict-based (borg >= 1.1) wire format

# presumably limits the number of unacknowledged RPC requests; used elsewhere in this module -- confirm
MAX_INFLIGHT = 100

RATELIMIT_PERIOD = 0.1  # seconds; refill granularity of SleepingBandwidthLimiter
|
|
|
|
|
2011-07-05 19:29:15 +00:00
|
|
|
|
2017-01-14 02:07:11 +00:00
|
|
|
def os_write(fd, data):
    """os.write wrapper so we do not lose data for partial writes."""
    # TODO: this issue is fixed in cygwin since at least 2.8.0, remove this
    # wrapper / workaround when this version is considered ancient.
    # This is happening frequently on cygwin due to its small pipe buffer size of only 64kiB
    # and also due to its different blocking pipe behaviour compared to Linux/*BSD.
    # Neither Linux nor *BSD ever do partial writes on blocking pipes, unless interrupted by a
    # signal, in which case serve() would terminate.
    total = len(data)
    pending = total
    while pending:
        written = os.write(fd, data)
        pending -= written
        if pending:
            # partial write: drop the part already written and give the
            # reader a moment (~1ns per written byte) to drain the pipe.
            data = data[written:]
            time.sleep(written * 1e-09)
    return total
|
|
|
|
|
|
|
|
|
2013-12-15 19:35:29 +00:00
|
|
|
# NOTE: the docstring doubles as the user-facing message template (Error convention
# in .helpers -- presumably; confirm), so its text must not be changed casually.
class ConnectionClosed(Error):
    """Connection closed by remote host"""
|
2013-06-28 11:31:57 +00:00
|
|
|
|
|
|
|
|
2015-10-31 21:41:08 +00:00
|
|
|
# The {} placeholder is filled with a hint for the user (e.g. 'Is borg working on the server?').
class ConnectionClosedWithHint(ConnectionClosed):
    """Connection closed by remote host. {}"""
|
|
|
|
|
|
|
|
|
2014-03-24 20:28:59 +00:00
|
|
|
# Raised by RepositoryServer.open when a repository path falls outside
# --restrict-to-path / --restrict-to-repository; {} is the offending path.
class PathNotAllowed(Error):
    """Repository path not allowed: {}"""
|
2014-03-24 20:28:59 +00:00
|
|
|
|
2015-07-11 16:31:49 +00:00
|
|
|
|
2015-01-11 13:06:59 +00:00
|
|
|
# Raised by RepositoryServer.serve when a client requests a method that is
# not in RepositoryServer.rpc_methods; {} is the method name.
class InvalidRPCMethod(Error):
    """RPC method {} is not valid"""
|
2014-03-24 20:28:59 +00:00
|
|
|
|
|
|
|
|
2016-08-22 17:50:53 +00:00
|
|
|
# Raised server-side when unpacked request data is neither a dict nor a 4-tuple;
# {} is filled with the server's borg version (see serve()).
class UnexpectedRPCDataFormatFromClient(Error):
    """Borg {}: Got unexpected RPC data format from client."""
|
|
|
|
|
|
|
|
|
|
|
|
class UnexpectedRPCDataFormatFromServer(Error):
    """Got unexpected RPC data format from server:\n{}"""

    def __init__(self, data):
        """Build the error message from a short, printable excerpt of *data*.

        Decodable bytes are shown as text (first 128 chars); otherwise the
        first 128 bytes are rendered as wrapped hex pairs.
        """
        try:
            excerpt = data.decode()[:128]
        except UnicodeDecodeError:
            hex_pairs = ('%02X' % byte for byte in data[:128])
            excerpt = textwrap.fill(' '.join(hex_pairs), 16 * 3)
        super().__init__(excerpt)
|
2016-08-22 17:50:53 +00:00
|
|
|
|
|
|
|
|
2016-11-10 08:56:18 +00:00
|
|
|
# Protocol compatibility:
|
|
|
|
# In general the server is responsible for rejecting too old clients and the client it responsible for rejecting
|
|
|
|
# too old servers. This ensures that the knowledge what is compatible is always held by the newer component.
|
|
|
|
#
|
|
|
|
# The server can do checks for the client version in RepositoryServer.negotiate. If the client_data is 2 then
|
|
|
|
# client is in the version range [0.29.0, 1.0.x] inclusive. For newer clients client_data is a dict which contains
|
|
|
|
# client_version.
|
|
|
|
#
|
|
|
|
# For the client the return of the negotiate method is either 2 if the server is in the version range [0.29.0, 1.0.x]
|
|
|
|
# inclusive, or it is a dict which includes the server version.
|
|
|
|
#
|
2020-07-07 21:01:55 +00:00
|
|
|
# All method calls on the remote repository object must be allowlisted in RepositoryServer.rpc_methods and have api
|
2016-11-10 08:56:18 +00:00
|
|
|
# stubs in RemoteRepository. The @api decorator on these stubs is used to set server version requirements.
|
|
|
|
#
|
|
|
|
# Method parameters are identified only by name and never by position. Unknown parameters are ignored by the server side.
|
|
|
|
# If a new parameter is important and may not be ignored, on the client a parameter specific version requirement needs
|
|
|
|
# to be added.
|
|
|
|
# When parameters are removed, they need to be preserved as defaulted parameters on the client stubs so that older
|
|
|
|
# servers still get compatible input.
|
|
|
|
|
|
|
|
|
|
|
|
# Positional argument order that old (tuple-protocol, borg < 1.1) clients use for
# each RPC method; positional_to_named() uses this to translate to named arguments.
compatMap = {
    'check': ('repair', 'save_space', ),
    'commit': ('save_space', ),
    'rollback': (),
    'destroy': (),
    '__len__': (),
    'list': ('limit', 'marker', ),
    'put': ('id', 'data', ),
    'get': ('id', ),
    'delete': ('id', ),
    'save_key': ('keydata', ),
    'load_key': (),
    'break_lock': (),
    'negotiate': ('client_data', ),
    'open': ('path', 'create', 'lock_wait', 'lock', 'exclusive', 'append_only', ),
    'get_free_nonce': (),
    'commit_nonce_reservation': ('next_unreserved', 'start_nonce', ),
}
|
|
|
|
|
|
|
|
|
2015-08-12 02:28:31 +00:00
|
|
|
class RepositoryServer:  # pragma: no cover
    """Server side of the RPC protocol: dispatches allowlisted calls to a local Repository."""
    # Allowlist: only methods named here may be invoked via RPC (checked in serve()).
    rpc_methods = (
        '__len__',
        'check',
        'commit',
        'delete',
        'destroy',
        'get',
        'list',
        'scan',
        'negotiate',
        'open',
        'put',
        'rollback',
        'save_key',
        'load_key',
        'break_lock',
        'get_free_nonce',
        'commit_nonce_reservation',
        'inject_exception',
    )
|
2010-11-15 21:18:47 +00:00
|
|
|
|
2017-06-02 21:38:02 +00:00
|
|
|
def __init__(self, restrict_to_paths, restrict_to_repositories, append_only, storage_quota):
    """Set up server-side policy; the Repository itself is created later in open()."""
    self.repository = None
    self.restrict_to_paths = restrict_to_paths
    self.restrict_to_repositories = restrict_to_repositories
    # This flag is parsed from the serve command line via Archiver.do_serve,
    # i.e. it reflects local system policy and generally ranks higher than
    # whatever the client wants, except when initializing a new repository
    # (see RepositoryServer.open below).
    self.append_only = append_only
    self.storage_quota = storage_quota
    self.client_version = parse_version('1.0.8')  # fallback version if client is too old to send version information
|
|
|
|
|
|
|
|
def positional_to_named(self, method, argv):
    """Translate from positional protocol to named protocol."""
    names = compatMap[method]
    try:
        return {name: argv[idx] for idx, name in enumerate(names)}
    except IndexError:
        # fewer positional args than the current compatMap expects
        if method != 'open' or len(argv) != 4:
            raise
        # borg clients < 1.0.7 use open() with 4 args
        names = names[:4]
    return {name: argv[idx] for idx, name in enumerate(names)}
|
2016-11-10 08:56:18 +00:00
|
|
|
|
|
|
|
def filter_args(self, f, kwargs):
    """Remove unknown named parameters from call, because client did (implicitly) say it's ok."""
    accepted = inspect.signature(f).parameters.keys()
    return {name: value for name, value in kwargs.items() if name in accepted}
|
2010-11-15 21:18:47 +00:00
|
|
|
|
|
|
|
def serve(self):
    """Main server loop: read msgpack RPC requests from stdin, dispatch, write replies to stdout."""
    stdin_fd = sys.stdin.fileno()
    stdout_fd = sys.stdout.fileno()
    stderr_fd = sys.stdout.fileno()  # NOTE(review): bound to stdout, not stderr -- looks like it should be sys.stderr.fileno(); confirm intent
    # stdin is polled via select(), so it must not block; replies must not be dropped, so writes block.
    os.set_blocking(stdin_fd, False)
    os.set_blocking(stdout_fd, True)
    os.set_blocking(stderr_fd, True)
    unpacker = get_limited_unpacker('server')
    while True:
        # 10s timeout so the exceptional-condition check below runs even when idle
        r, w, es = select.select([stdin_fd], [], [], 10)
        if r:
            data = os.read(stdin_fd, BUFSIZE)
            if not data:
                # EOF: client hung up -- close the repository (if any) and terminate.
                if self.repository is not None:
                    self.repository.close()
                else:
                    os_write(stderr_fd, 'Borg {}: Got connection close before repository was opened.\n'
                             .format(__version__).encode())
                return
            unpacker.feed(data)
            for unpacked in unpacker:
                # two wire formats: dict (borg >= 1.1) or 4-tuple (borg <= 1.0.x)
                if isinstance(unpacked, dict):
                    dictFormat = True
                    msgid = unpacked[MSGID]
                    method = unpacked[MSG]
                    args = unpacked[ARGS]
                elif isinstance(unpacked, tuple) and len(unpacked) == 4:
                    dictFormat = False
                    # The first field 'type' was always 1 and has always been ignored
                    _, msgid, method, args = unpacked
                    args = self.positional_to_named(method, args)
                else:
                    if self.repository is not None:
                        self.repository.close()
                    raise UnexpectedRPCDataFormatFromClient(__version__)
                try:
                    # only allowlisted methods may be invoked remotely
                    if method not in self.rpc_methods:
                        raise InvalidRPCMethod(method)
                    # prefer a method on the server object itself, fall back to the Repository
                    try:
                        f = getattr(self, method)
                    except AttributeError:
                        f = getattr(self.repository, method)
                    args = self.filter_args(f, args)
                    res = f(**args)
                except BaseException as e:
                    if dictFormat:
                        ex_short = traceback.format_exception_only(e.__class__, e)
                        ex_full = traceback.format_exception(*sys.exc_info())
                        ex_trace = True
                        if isinstance(e, Error):
                            ex_short = [e.get_message()]
                            ex_trace = e.traceback
                        if isinstance(e, (Repository.DoesNotExist, Repository.AlreadyExists, PathNotAllowed)):
                            # These exceptions are reconstructed on the client end in RemoteRepository.call_many(),
                            # and will be handled just like locally raised exceptions. Suppress the remote traceback
                            # for these, except ErrorWithTraceback, which should always display a traceback.
                            pass
                        else:
                            logging.debug('\n'.join(ex_full))

                        try:
                            msg = msgpack.packb({MSGID: msgid,
                                                 'exception_class': e.__class__.__name__,
                                                 'exception_args': e.args,
                                                 'exception_full': ex_full,
                                                 'exception_short': ex_short,
                                                 'exception_trace': ex_trace,
                                                 'sysinfo': sysinfo()})
                        except TypeError:
                            # some exception args may not be msgpack-serializable; replace those with None
                            msg = msgpack.packb({MSGID: msgid,
                                                 'exception_class': e.__class__.__name__,
                                                 'exception_args': [x if isinstance(x, (str, bytes, int)) else None
                                                                    for x in e.args],
                                                 'exception_full': ex_full,
                                                 'exception_short': ex_short,
                                                 'exception_trace': ex_trace,
                                                 'sysinfo': sysinfo()})

                        os_write(stdout_fd, msg)
                    else:
                        if isinstance(e, (Repository.DoesNotExist, Repository.AlreadyExists, PathNotAllowed)):
                            # These exceptions are reconstructed on the client end in RemoteRepository.call_many(),
                            # and will be handled just like locally raised exceptions. Suppress the remote traceback
                            # for these, except ErrorWithTraceback, which should always display a traceback.
                            pass
                        else:
                            if isinstance(e, Error):
                                tb_log_level = logging.ERROR if e.traceback else logging.DEBUG
                                msg = e.get_message()
                            else:
                                tb_log_level = logging.ERROR
                                msg = '%s Exception in RPC call' % e.__class__.__name__
                            tb = f'{traceback.format_exc()}\n{sysinfo()}'
                            logging.error(msg)
                            logging.log(tb_log_level, tb)
                        # legacy clients only get the class name and a generic hint
                        exc = 'Remote Exception (see remote log for the traceback)'
                        os_write(stdout_fd, msgpack.packb((1, msgid, e.__class__.__name__, exc)))
                else:
                    if dictFormat:
                        os_write(stdout_fd, msgpack.packb({MSGID: msgid, RESULT: res}))
                    else:
                        os_write(stdout_fd, msgpack.packb((1, msgid, None, res)))
        if es:
            # exceptional condition on stdin: connection is gone
            self.repository.close()
            return
|
|
|
|
|
2016-11-10 08:56:18 +00:00
|
|
|
def negotiate(self, client_data):
    """Protocol handshake: record the client version, return the server's."""
    if client_data == RPC_PROTOCOL_VERSION:
        # old format used in 1.0.x
        return RPC_PROTOCOL_VERSION
    # clients since 1.1.0b3 use a dict as client_data
    # clients since 1.1.0b6 support json log format from server
    if not isinstance(client_data, dict):
        self.client_version = BORG_VERSION  # seems to be newer than current version (no known old format)
    else:
        self.client_version = client_data['client_version']
        level = logging.getLevelName(logging.getLogger('').level)
        setup_logging(is_serve=True, json=True, level=level)
        logger.debug('Initialized logging system for JSON-based protocol')

    # not a known old format, send newest negotiate this version knows
    return {'server_version': BORG_VERSION}
|
2011-09-12 19:51:23 +00:00
|
|
|
|
2016-12-17 13:33:40 +00:00
|
|
|
def _resolve_path(self, path):
|
2016-11-10 08:56:18 +00:00
|
|
|
if isinstance(path, bytes):
|
|
|
|
path = os.fsdecode(path)
|
2016-12-17 13:33:40 +00:00
|
|
|
# Leading slash is always present with URI (ssh://), but not with short-form (who@host:path).
|
|
|
|
if path.startswith('/~/'): # /~/x = path x relative to home dir
|
2017-12-21 05:18:49 +00:00
|
|
|
path = os.path.join(get_base_dir(), path[3:])
|
2016-12-17 13:33:40 +00:00
|
|
|
elif path.startswith('~/'):
|
2017-12-21 05:18:49 +00:00
|
|
|
path = os.path.join(get_base_dir(), path[2:])
|
2016-12-17 13:33:40 +00:00
|
|
|
elif path.startswith('/~'): # /~username/x = relative to "user" home dir
|
|
|
|
path = os.path.expanduser(path[1:])
|
|
|
|
elif path.startswith('~'):
|
|
|
|
path = os.path.expanduser(path)
|
2016-10-12 22:38:04 +00:00
|
|
|
elif path.startswith('/./'): # /./x = path x relative to cwd
|
|
|
|
path = path[3:]
|
2016-12-17 13:33:40 +00:00
|
|
|
return os.path.realpath(path)
|
|
|
|
|
2019-02-04 16:12:11 +00:00
|
|
|
def open(self, path, create=False, lock_wait=None, lock=True, exclusive=None, append_only=False,
         make_parent_dirs=False):
    """Open (or create) the repository at *path* after enforcing server-side restrictions.

    Keeps the Repository object in self.repository and returns its id.
    Raises PathNotAllowed if *path* violates --restrict-to-path/--restrict-to-repository.
    """
    logging.debug('Resolving repository path %r', path)
    path = self._resolve_path(path)
    logging.debug('Resolved repository path to %r', path)
    path_with_sep = os.path.join(path, '')  # make sure there is a trailing slash (os.sep)
    if self.restrict_to_paths:
        # if --restrict-to-path P is given, we make sure that we only operate in/below path P.
        # for the prefix check, it is important that the compared paths both have trailing slashes,
        # so that a path /foobar will NOT be accepted with --restrict-to-path /foo option.
        for restrict_to_path in self.restrict_to_paths:
            restrict_to_path_with_sep = os.path.join(os.path.realpath(restrict_to_path), '')  # trailing slash
            if path_with_sep.startswith(restrict_to_path_with_sep):
                break
        else:
            raise PathNotAllowed(path)
    if self.restrict_to_repositories:
        # --restrict-to-repository requires an exact repository directory match, not just a prefix
        for restrict_to_repository in self.restrict_to_repositories:
            restrict_to_repository_with_sep = os.path.join(os.path.realpath(restrict_to_repository), '')
            if restrict_to_repository_with_sep == path_with_sep:
                break
        else:
            raise PathNotAllowed(path)
    # "borg init" on "borg serve --append-only" (=self.append_only) does not create an append only repo,
    # while "borg init --append-only" (=append_only) does, regardless of the --append-only (self.append_only)
    # flag for serve.
    append_only = (not create and self.append_only) or append_only
    self.repository = Repository(path, create, lock_wait=lock_wait, lock=lock,
                                 append_only=append_only,
                                 storage_quota=self.storage_quota,
                                 exclusive=exclusive,
                                 make_parent_dirs=make_parent_dirs)
    self.repository.__enter__()  # clean exit handled by serve() method
    return self.repository.id
|
2010-11-17 21:40:39 +00:00
|
|
|
|
2016-11-08 22:19:06 +00:00
|
|
|
def inject_exception(self, kind):
    """Raise a canned exception of the requested *kind* (test-suite aid).

    Unknown kinds are silently ignored.
    """
    s1 = 'test string'
    s2 = 'test string2'
    # lambdas build the exception lazily so nothing is resolved for unknown kinds
    builders = {
        'DoesNotExist': lambda: Repository.DoesNotExist(s1),
        'AlreadyExists': lambda: Repository.AlreadyExists(s1),
        'CheckNeeded': lambda: Repository.CheckNeeded(s1),
        'IntegrityError': lambda: IntegrityError(s1),
        'PathNotAllowed': lambda: PathNotAllowed('foo'),
        'ObjectNotFound': lambda: Repository.ObjectNotFound(s1, s2),
        'InvalidRPCMethod': lambda: InvalidRPCMethod(s1),
    }
    if kind == 'divide':
        0 // 0
    elif kind in builders:
        raise builders[kind]()
|
|
|
|
|
2010-11-15 21:18:47 +00:00
|
|
|
|
2016-09-15 20:13:35 +00:00
|
|
|
class SleepingBandwidthLimiter:
    """Token-bucket style rate limiter around os.write, sleeping when the quota runs out.

    *limit* is bytes/second; a falsy limit disables limiting entirely.
    """
    def __init__(self, limit):
        if limit:
            # quota granted per RATELIMIT_PERIOD window
            self.ratelimit = int(limit * RATELIMIT_PERIOD)
            self.ratelimit_last = time.monotonic()
            self.ratelimit_quota = self.ratelimit
        else:
            self.ratelimit = None

    def write(self, fd, to_send):
        """Write (part of) *to_send* to *fd*, honoring the rate limit; returns bytes written."""
        if self.ratelimit:
            now = time.monotonic()
            # refill the quota if a full period has passed; cap bursts at 2 periods worth
            if self.ratelimit_last + RATELIMIT_PERIOD <= now:
                self.ratelimit_quota += self.ratelimit
                if self.ratelimit_quota > 2 * self.ratelimit:
                    self.ratelimit_quota = 2 * self.ratelimit
                self.ratelimit_last = now
            # quota exhausted: sleep out the rest of the period, then refill
            if self.ratelimit_quota == 0:
                tosleep = self.ratelimit_last + RATELIMIT_PERIOD - now
                time.sleep(tosleep)
                self.ratelimit_quota += self.ratelimit
                self.ratelimit_last = time.monotonic()
            # only send as much as the remaining quota allows (caller retries the rest)
            if len(to_send) > self.ratelimit_quota:
                to_send = to_send[:self.ratelimit_quota]
        written = os.write(fd, to_send)
        if self.ratelimit:
            self.ratelimit_quota -= written
        return written
|
|
|
|
|
|
|
|
|
2016-11-10 08:56:18 +00:00
|
|
|
def api(*, since, **kwargs_decorator):
    """Check version requirements and use self.call to do the remote method call.

    <since> specifies the version in which borg introduced this method.
    Calling this method when connected to an older version will fail without transmitting anything to the server.

    Further kwargs can be used to encode version specific restrictions:

    <previously> is the value resulting in the behaviour before introducing the new parameter.
    If a previous hardcoded behaviour is parameterized in a version, this allows calls that use the previously
    hardcoded behaviour to pass through and generates an error if another behaviour is requested by the client.
    E.g. when 'append_only' was introduced in 1.0.7 the previous behaviour was what now is append_only=False.
    Thus @api(..., append_only={'since': parse_version('1.0.7'), 'previously': False}) allows calls
    with append_only=False for all version but rejects calls using append_only=True on versions older than 1.0.7.

    <dontcare> is a flag to set the behaviour if an old version is called the new way.
    If set to True, the method is called without the (not yet supported) parameter (this should be done if that is the
    more desirable behaviour). If False, an exception is generated.
    E.g. before 'threshold' was introduced in 1.2.0a8, a hardcoded threshold of 0.1 was used in commit().
    """
    def decorator(f):
        @functools.wraps(f)
        def do_rpc(self, *args, **kwargs):
            sig = inspect.signature(f)
            bound_args = sig.bind(self, *args, **kwargs)
            named = {}  # Arguments for the remote process
            extra = {}  # Arguments for the local process
            for name, param in sig.parameters.items():
                if name == 'self':
                    continue
                if name in bound_args.arguments:
                    # 'wait' controls local response handling and is never sent to the server
                    if name == 'wait':
                        extra[name] = bound_args.arguments[name]
                    else:
                        named[name] = bound_args.arguments[name]
                else:
                    # fill in stub defaults so restriction checks below see concrete values
                    if param.default is not param.empty:
                        named[name] = param.default

            # reject outright if the server predates this method
            if self.server_version < since:
                raise self.RPCServerOutdated(f.__name__, format_version(since))

            # per-parameter version restrictions (see docstring above)
            for name, restriction in kwargs_decorator.items():
                if restriction['since'] <= self.server_version:
                    continue
                if 'previously' in restriction and named[name] == restriction['previously']:
                    continue
                if restriction.get('dontcare', False):
                    continue

                raise self.RPCServerOutdated(f"{f.__name__} {name}={named[name]!s}",
                                             format_version(restriction['since']))

            return self.call(f.__name__, named, **extra)
        return do_rpc
    return decorator
|
|
|
|
|
|
|
|
|
2015-03-17 22:03:36 +00:00
|
|
|
class RemoteRepository:
|
2014-03-24 20:28:59 +00:00
|
|
|
extra_test_args = []
|
2010-11-15 21:18:47 +00:00
|
|
|
|
|
|
|
class RPCError(Exception):
    """Wraps an exception that was raised on the server side and shipped over RPC."""

    def __init__(self, unpacked):
        # for borg < 1.1: unpacked only has 'exception_class' as key
        # for borg 1.1+: unpacked has keys: 'exception_args', 'exception_full', 'exception_short', 'sysinfo'
        self.unpacked = unpacked

    def get_message(self):
        """Short human-readable message; falls back to the class name for old servers."""
        if 'exception_short' not in self.unpacked:
            return self.exception_class
        return '\n'.join(self.unpacked['exception_short'])

    @property
    def traceback(self):
        """Whether a traceback should be shown; old servers did not send this flag."""
        return self.unpacked.get('exception_trace', True)

    @property
    def exception_class(self):
        return self.unpacked['exception_class']

    @property
    def exception_full(self):
        """Full remote traceback text, or a placeholder note for old servers."""
        if 'exception_full' not in self.unpacked:
            return self.get_message() + '\nRemote Exception (see remote log for the traceback)'
        return '\n'.join(self.unpacked['exception_full'])

    @property
    def sysinfo(self):
        """Remote system information string; empty for old servers."""
        return self.unpacked.get('sysinfo', '')
|
2010-11-15 21:18:47 +00:00
|
|
|
|
2016-11-10 08:56:18 +00:00
|
|
|
# Raised by the @api decorator; the docstring presumably doubles as the user-facing
# message template (Error convention), formatted with (method, required_version).
class RPCServerOutdated(Error):
    """Borg server is too old for {}. Required version {}"""

    @property
    def method(self):
        # the RPC method (or "method param=value") the server is too old for
        return self.args[0]

    @property
    def required_version(self):
        # minimum server version string (see format_version)
        return self.args[1]
|
|
|
|
|
2016-11-10 11:12:32 +00:00
|
|
|
# If compatibility with 1.0.x is no longer needed, replace all checks of this with True and simplify the code
dictFormat = False  # outside of __init__ for testing of legacy free protocol
|
|
|
|
|
2019-02-04 16:12:11 +00:00
|
|
|
def __init__(self, location, create=False, exclusive=False, lock_wait=None, lock=True, append_only=False,
             make_parent_dirs=False, args=None):
    """Spawn the remote 'borg serve' (via ssh unless testing), negotiate, and open the repository."""
    self.location = self._location = location
    self.preload_ids = []
    self.msgid = 0
    self.rx_bytes = 0
    self.tx_bytes = 0
    self.to_send = EfficientCollectionQueue(1024 * 1024, bytes)
    self.stderr_received = b''  # incomplete stderr line bytes received (no \n yet)
    self.chunkid_to_msgids = {}
    self.ignore_responses = set()
    self.responses = {}
    self.async_responses = {}
    self.shutdown_time = None
    # upload_ratelimit is KiB/s on the command line, bytes/s here; 0 disables limiting
    self.ratelimit = SleepingBandwidthLimiter(args.upload_ratelimit * 1024 if args and args.upload_ratelimit else 0)
    self.upload_buffer_size_limit = args.upload_buffer * 1024 * 1024 if args and args.upload_buffer else 0
    self.unpacker = get_limited_unpacker('client')
    self.server_version = parse_version('1.0.8')  # fallback version if server is too old to send version information
    self.p = None
    self._args = args
    testing = location.host == '__testsuite__'
    # when testing, we invoke and talk to a borg process directly (no ssh).
    # when not testing, we invoke the system-installed ssh binary to talk to a remote borg.
    env = prepare_subprocess_env(system=not testing)
    borg_cmd = self.borg_cmd(args, testing)
    if not testing:
        borg_cmd = self.ssh_cmd(location) + borg_cmd
    logger.debug('SSH command line: %s', borg_cmd)
    self.p = Popen(borg_cmd, bufsize=0, stdin=PIPE, stdout=PIPE, stderr=PIPE, env=env)
    self.stdin_fd = self.p.stdin.fileno()
    self.stdout_fd = self.p.stdout.fileno()
    self.stderr_fd = self.p.stderr.fileno()
    # all pipes are multiplexed via select(), so none of them may block
    os.set_blocking(self.stdin_fd, False)
    os.set_blocking(self.stdout_fd, False)
    os.set_blocking(self.stderr_fd, False)
    self.r_fds = [self.stdout_fd, self.stderr_fd]
    self.x_fds = [self.stdin_fd, self.stdout_fd, self.stderr_fd]

    try:
        try:
            version = self.call('negotiate', {'client_data': {
                'client_version': BORG_VERSION,
            }})
        except ConnectionClosed:
            raise ConnectionClosedWithHint('Is borg working on the server?') from None
        # old servers answer with the plain protocol int, new ones with a dict
        if version == RPC_PROTOCOL_VERSION:
            self.dictFormat = False
        elif isinstance(version, dict) and 'server_version' in version:
            self.dictFormat = True
            self.server_version = version['server_version']
        else:
            raise Exception('Server insisted on using unsupported protocol version %s' % version)

        def do_open():
            self.id = self.open(path=self.location.path, create=create, lock_wait=lock_wait,
                                lock=lock, exclusive=exclusive, append_only=append_only,
                                make_parent_dirs=make_parent_dirs)

        if self.dictFormat:
            do_open()
        else:
            # Ugly detection of versions prior to 1.0.7: If open throws it has to be 1.0.6 or lower
            try:
                do_open()
            except self.RPCError as err:
                if err.exception_class != 'TypeError':
                    raise
                msg = """\
Please note:
If you see a TypeError complaining about the number of positional arguments
given to open(), you can ignore it if it comes from a borg version < 1.0.7.
This TypeError is a cosmetic side effect of the compatibility code borg
clients >= 1.0.7 have to support older borg servers.
This problem will go away as soon as the server has been upgraded to 1.0.7+.
"""
                # emit this msg in the same way as the 'Remote: ...' lines that show the remote TypeError
                sys.stderr.write(msg)
                self.server_version = parse_version('1.0.6')
                compatMap['open'] = ('path', 'create', 'lock_wait', 'lock', )
                # try again with corrected version and compatMap
                do_open()
    except Exception:
        self.close()
        raise
|
2012-10-17 09:40:23 +00:00
|
|
|
|
|
|
|
def __del__(self):
|
2016-08-09 21:26:56 +00:00
|
|
|
if len(self.responses):
|
2016-11-10 08:41:33 +00:00
|
|
|
logging.debug('still %d cached responses left in RemoteRepository' % (len(self.responses),))
|
2016-03-22 23:41:15 +00:00
|
|
|
if self.p:
|
|
|
|
self.close()
|
2016-11-10 08:41:33 +00:00
|
|
|
assert False, 'cleanup happened in Repository.__del__'
|
2012-11-30 20:47:35 +00:00
|
|
|
|
2015-07-14 22:01:07 +00:00
|
|
|
def __repr__(self):
|
2022-02-27 18:31:33 +00:00
|
|
|
return f'<{self.__class__.__name__} {self.location.canonical_path()}>'
|
2015-07-14 22:01:07 +00:00
|
|
|
|
2016-03-22 23:41:15 +00:00
|
|
|
def __enter__(self):
|
|
|
|
return self
|
|
|
|
|
|
|
|
def __exit__(self, exc_type, exc_val, exc_tb):
    """Context-manager exit: roll back on error, then always close the connection.

    When an exception is propagating, a shutdown deadline is set so a slow or
    dead remote cannot stall teardown indefinitely.
    """
    try:
        if exc_type is not None:
            # Bound the remaining shutdown work to ~30 seconds.
            self.shutdown_time = time.monotonic() + 30
            self.rollback()
    finally:
        # Close cleanly in every case, even if the rollback above failed
        # (e.g. because the connection was already gone) and raised.
        logger.debug('RemoteRepository: %s bytes sent, %s bytes received, %d messages sent',
                     format_file_size(self.tx_bytes), format_file_size(self.rx_bytes), self.msgid)
        self.close()
|
2016-03-22 23:41:15 +00:00
|
|
|
|
2016-04-23 20:42:56 +00:00
|
|
|
@property
def id_str(self):
    """The binary repository ID rendered as a hexadecimal string."""
    binary_id = self.id
    return bin_to_hex(binary_id)
|
|
|
|
|
2015-12-12 14:31:43 +00:00
|
|
|
def borg_cmd(self, args, testing):
    """return a borg serve command line"""
    # Forward selected args/options to the 'borg serve' process as they were given to us.
    opts = []
    if args is not None:
        root_logger = logging.getLogger()
        # Pick the single verbosity flag that matches the effective root log level.
        # isEnabledFor(DEBUG) is the most verbose, so the first hit wins.
        level_flags = (
            (logging.DEBUG, '--debug'),
            (logging.INFO, '--info'),
            (logging.WARNING, None),  # warning is default
            (logging.ERROR, '--error'),
            (logging.CRITICAL, '--critical'),
        )
        for level, flag in level_flags:
            if root_logger.isEnabledFor(level):
                if flag is not None:
                    opts.append(flag)
                break
        else:
            raise ValueError('log level missing, fix this code')

        # Tell the remote server about debug topics it may need to consider.
        # Note that debug topics are usable for "spew" or "trace" logs which would
        # be too plentiful to transfer for normal use, so the server doesn't send
        # them unless explicitly enabled.
        #
        # Needless to say, if you do --debug-topic=repository.compaction, for example,
        # with a 1.0.x server it won't work, because the server does not recognize the
        # option.
        #
        # This is not considered a problem, since this is a debugging feature that
        # should not be used for regular use.
        for topic in args.debug_topics:
            if '.' not in topic:
                topic = 'borg.debug.' + topic
            if 'repository' in topic:
                opts.append(f'--debug-topic={topic}')

        if 'storage_quota' in args and args.storage_quota:
            opts.append(f'--storage-quota={args.storage_quota}')
    env_vars = []
    if testing:
        return env_vars + [sys.executable, '-m', 'borg.archiver', 'serve'] + opts + self.extra_test_args
    else:  # pragma: no cover
        remote_path = args.remote_path or os.environ.get('BORG_REMOTE_PATH', 'borg')
        remote_path = replace_placeholders(remote_path)
        return env_vars + [remote_path, 'serve'] + opts
|
2015-10-05 22:51:20 +00:00
|
|
|
|
|
|
|
def ssh_cmd(self, location):
    """return a ssh command line that can be prefixed to a borg command line"""
    # Explicit --rsh wins over the BORG_RSH environment variable; default is plain ssh.
    rsh = self._args.rsh or os.environ.get('BORG_RSH', 'ssh')
    command = shlex.split(rsh)
    if location.port:
        command.extend(['-p', str(location.port)])
    # Target is user@host when a user was given, bare host otherwise.
    if location.user:
        target = f'{location.user}@{location.host}'
    else:
        target = f'{location.host}'
    command.append(target)
    return command
|
|
|
|
|
2016-11-10 08:56:18 +00:00
|
|
|
def named_to_positional(self, method, kwargs):
    """Order the values in *kwargs* into the positional list old-protocol servers expect."""
    ordering = compatMap[method]
    return [kwargs[name] for name in ordering]
|
|
|
|
|
|
|
|
def call(self, cmd, args, **kw):
    """Perform a single remote call and return its first (only) result, or None."""
    results = self.call_many(cmd, [args], **kw)
    for result in results:
        return result
|
|
|
|
|
2017-03-05 04:19:32 +00:00
|
|
|
def call_many(self, cmd, calls, wait=True, is_preloaded=False, async_wait=True):
    """Send RPC *cmd* for each args-dict in *calls* and yield the results.

    This is the core client-side RPC pump: it multiplexes writes to the
    server's stdin and reads from its stdout/stderr via select(), keeping up
    to MAX_INFLIGHT requests outstanding and interleaving preload ('get'
    prefetch) traffic.

    :param cmd: RPC method name; the special name 'async_responses' drains
        previously collected async results/exceptions instead of sending calls.
    :param calls: iterable of kwargs-dicts, one per invocation.
    :param wait: if True, keep pumping until all responses have arrived.
    :param is_preloaded: caller guarantees the chunk ids were preload()ed
        (only supported for 'get').
    :param async_wait: for 'async_responses', whether to keep waiting while
        responses are still outstanding.
    :raises: the exception reconstructed by handle_error(), ConnectionClosed,
        UnexpectedRPCDataFormatFromServer.
    """
    if not calls and cmd != 'async_responses':
        return

    def send_buffer():
        # Flush as much of the queued outgoing msgpack data as the fd accepts.
        if self.to_send:
            try:
                written = self.ratelimit.write(self.stdin_fd, self.to_send.peek_front())
                self.tx_bytes += written
                self.to_send.pop_front(written)
            except OSError as e:
                # io.write might raise EAGAIN even though select indicates
                # that the fd should be writable.
                # EWOULDBLOCK is added for defensive programming sake.
                if e.errno not in [errno.EAGAIN, errno.EWOULDBLOCK]:
                    raise

    def pop_preload_msgid(chunkid):
        # Take the oldest msgid queued for this preloaded chunk; drop the
        # bookkeeping entry once no msgids remain for it.
        msgid = self.chunkid_to_msgids[chunkid].pop(0)
        if not self.chunkid_to_msgids[chunkid]:
            del self.chunkid_to_msgids[chunkid]
        return msgid

    def handle_error(unpacked):
        # Map a server-side exception description back to a local exception.
        # Old servers (< the dict protocol) do not transmit exception_args.
        error = unpacked['exception_class']
        old_server = 'exception_args' not in unpacked
        args = unpacked.get('exception_args')

        if error == 'DoesNotExist':
            raise Repository.DoesNotExist(self.location.processed)
        elif error == 'AlreadyExists':
            raise Repository.AlreadyExists(self.location.processed)
        elif error == 'CheckNeeded':
            raise Repository.CheckNeeded(self.location.processed)
        elif error == 'IntegrityError':
            if old_server:
                raise IntegrityError('(not available)')
            else:
                raise IntegrityError(args[0])
        elif error == 'AtticRepository':
            if old_server:
                raise Repository.AtticRepository('(not available)')
            else:
                raise Repository.AtticRepository(args[0])
        elif error == 'PathNotAllowed':
            if old_server:
                raise PathNotAllowed('(unknown)')
            else:
                raise PathNotAllowed(args[0])
        elif error == 'ParentPathDoesNotExist':
            raise Repository.ParentPathDoesNotExist(args[0])
        elif error == 'ObjectNotFound':
            if old_server:
                raise Repository.ObjectNotFound('(not available)', self.location.processed)
            else:
                raise Repository.ObjectNotFound(args[0], self.location.processed)
        elif error == 'InvalidRPCMethod':
            if old_server:
                raise InvalidRPCMethod('(not available)')
            else:
                raise InvalidRPCMethod(args[0])
        else:
            # Unknown class: surface the raw server-side info.
            raise self.RPCError(unpacked)

    calls = list(calls)
    waiting_for = []
    # When not waiting, allow the outgoing queue to grow up to the buffer
    # limit; when waiting, send eagerly (limit 0 -> flush whenever possible).
    maximum_to_send = 0 if wait else self.upload_buffer_size_limit
    send_buffer()  # Try to send data, as some cases (async_response) will never try to send data otherwise.
    while wait or calls:
        if self.shutdown_time and time.monotonic() > self.shutdown_time:
            # we are shutting this RemoteRepository down already, make sure we do not waste
            # a lot of time in case a lot of async stuff is coming in or remote is gone or slow.
            logger.debug('shutdown_time reached, shutting down with %d waiting_for and %d async_responses.',
                         len(waiting_for), len(self.async_responses))
            return
        # Deliver any responses that arrived for the requests we issued,
        # strictly in issue order.
        while waiting_for:
            try:
                unpacked = self.responses.pop(waiting_for[0])
                waiting_for.pop(0)
                if 'exception_class' in unpacked:
                    handle_error(unpacked)
                else:
                    yield unpacked[RESULT]
                    if not waiting_for and not calls:
                        return
            except KeyError:
                # Next expected response has not arrived yet.
                break
        if cmd == 'async_responses':
            while True:
                try:
                    msgid, unpacked = self.async_responses.popitem()
                except KeyError:
                    # there is nothing left what we already have received
                    if async_wait and self.ignore_responses:
                        # but do not return if we shall wait and there is something left to wait for:
                        break
                    else:
                        return
                else:
                    if 'exception_class' in unpacked:
                        handle_error(unpacked)
                    else:
                        yield unpacked[RESULT]
        # Only poll stdin for writability if there is (or may be) data to send.
        if self.to_send or ((calls or self.preload_ids) and len(waiting_for) < MAX_INFLIGHT):
            w_fds = [self.stdin_fd]
        else:
            w_fds = []
        r, w, x = select.select(self.r_fds, w_fds, self.x_fds, 1)
        if x:
            raise Exception('FD exception occurred')
        for fd in r:
            if fd is self.stdout_fd:
                data = os.read(fd, BUFSIZE)
                if not data:
                    raise ConnectionClosed()
                self.rx_bytes += len(data)
                self.unpacker.feed(data)
                for unpacked in self.unpacker:
                    if isinstance(unpacked, dict):
                        # New (dict) protocol message.
                        msgid = unpacked[MSGID]
                    elif isinstance(unpacked, tuple) and len(unpacked) == 4:
                        # Old tuple protocol; normalize into the dict shape.
                        # The first field 'type' was always 1 and has always been ignored
                        _, msgid, error, res = unpacked
                        if error:
                            # ignore res, because it is only a fixed string anyway.
                            unpacked = {MSGID: msgid, 'exception_class': error}
                        else:
                            unpacked = {MSGID: msgid, RESULT: res}
                    else:
                        raise UnexpectedRPCDataFormatFromServer(data)
                    if msgid in self.ignore_responses:
                        self.ignore_responses.remove(msgid)
                        # async methods never return values, but may raise exceptions.
                        if 'exception_class' in unpacked:
                            self.async_responses[msgid] = unpacked
                        else:
                            # we currently do not have async result values except "None",
                            # so we do not add them into async_responses.
                            if unpacked[RESULT] is not None:
                                self.async_responses[msgid] = unpacked
                    else:
                        self.responses[msgid] = unpacked
            elif fd is self.stderr_fd:
                data = os.read(fd, 32768)
                if not data:
                    raise ConnectionClosed()
                self.rx_bytes += len(data)
                # deal with incomplete lines (may appear due to block buffering)
                if self.stderr_received:
                    data = self.stderr_received + data
                    self.stderr_received = b''
                lines = data.splitlines(keepends=True)
                if lines and not lines[-1].endswith((b'\r', b'\n')):
                    self.stderr_received = lines.pop()
                # now we have complete lines in <lines> and any partial line in self.stderr_received.
                for line in lines:
                    handle_remote_line(line.decode())  # decode late, avoid partial utf-8 sequences
        if w:
            # stdin is writable: queue more requests (regular calls first,
            # then preload prefetches) and flush the buffer.
            while (len(self.to_send) <= maximum_to_send) and (calls or self.preload_ids) and len(waiting_for) < MAX_INFLIGHT:
                if calls:
                    if is_preloaded:
                        assert cmd == 'get', "is_preload is only supported for 'get'"
                        if calls[0]['id'] in self.chunkid_to_msgids:
                            waiting_for.append(pop_preload_msgid(calls.pop(0)['id']))
                    else:
                        args = calls.pop(0)
                        if cmd == 'get' and args['id'] in self.chunkid_to_msgids:
                            # A preload request is already in flight for this chunk.
                            waiting_for.append(pop_preload_msgid(args['id']))
                        else:
                            self.msgid += 1
                            waiting_for.append(self.msgid)
                            if self.dictFormat:
                                self.to_send.push_back(msgpack.packb({MSGID: self.msgid, MSG: cmd, ARGS: args}))
                            else:
                                self.to_send.push_back(msgpack.packb((1, self.msgid, cmd, self.named_to_positional(cmd, args))))
                if not self.to_send and self.preload_ids:
                    chunk_id = self.preload_ids.pop(0)
                    args = {'id': chunk_id}
                    self.msgid += 1
                    self.chunkid_to_msgids.setdefault(chunk_id, []).append(self.msgid)
                    if self.dictFormat:
                        self.to_send.push_back(msgpack.packb({MSGID: self.msgid, MSG: 'get', ARGS: args}))
                    else:
                        self.to_send.push_back(msgpack.packb((1, self.msgid, 'get', self.named_to_positional('get', args))))

            send_buffer()
    self.ignore_responses |= set(waiting_for)  # we lose order here
|
2011-07-06 20:16:07 +00:00
|
|
|
|
2016-07-31 19:19:30 +00:00
|
|
|
@api(since=parse_version('1.0.0'),
     append_only={'since': parse_version('1.0.7'), 'previously': False},
     make_parent_dirs={'since': parse_version('1.1.9'), 'previously': False})
def open(self, path, create=False, lock_wait=None, lock=True, exclusive=False, append_only=False,
         make_parent_dirs=False):
    """Open the repository at *path* on the server.

    actual remoting is done via self.call in the @api decorator; the
    'since'/'previously' entries above presumably let the decorator adapt the
    call for servers older than the given versions — confirm against the api
    decorator definition (not in this view).
    """
|
|
|
|
|
2019-03-10 19:21:22 +00:00
|
|
|
@api(since=parse_version('1.0.0'),
     max_duration={'since': parse_version('1.2.0a4'), 'previously': 0})
def check(self, repair=False, save_space=False, max_duration=0):
    """Run a remote repository check.

    actual remoting is done via self.call in the @api decorator
    (max_duration is only meaningful for servers >= 1.2.0a4).
    """
|
2014-02-04 22:49:10 +00:00
|
|
|
|
2018-06-24 17:08:49 +00:00
|
|
|
@api(since=parse_version('1.0.0'),
     compact={'since': parse_version('1.2.0a0'), 'previously': True, 'dontcare': True},
     threshold={'since': parse_version('1.2.0a8'), 'previously': 0.1, 'dontcare': True},
     cleanup_commits={'since': parse_version('1.2.0a0'), 'previously': False, 'dontcare': True})
def commit(self, save_space=False, compact=True, threshold=0.1, cleanup_commits=False):
    """Commit pending changes in the remote repository.

    actual remoting is done via self.call in the @api decorator;
    compact/threshold/cleanup_commits are 1.2.x-era options ('dontcare'
    presumably means older servers may silently ignore them — confirm
    against the api decorator definition).
    """
|
2010-11-15 21:18:47 +00:00
|
|
|
|
2016-11-10 08:56:18 +00:00
|
|
|
@api(since=parse_version('1.0.0'))
def rollback(self):
    """Roll back uncommitted changes in the remote repository;
    actual remoting is done via self.call in the @api decorator"""
|
2010-11-15 21:18:47 +00:00
|
|
|
|
2016-11-10 08:56:18 +00:00
|
|
|
@api(since=parse_version('1.0.0'))
def destroy(self):
    """Delete the remote repository;
    actual remoting is done via self.call in the @api decorator"""
|
2015-03-09 15:02:06 +00:00
|
|
|
|
2016-11-10 08:56:18 +00:00
|
|
|
@api(since=parse_version('1.0.0'))
def __len__(self):
    """Number of objects in the remote repository;
    actual remoting is done via self.call in the @api decorator"""
|
2014-02-08 23:17:32 +00:00
|
|
|
|
2016-11-10 08:56:18 +00:00
|
|
|
@api(since=parse_version('1.0.0'))
def list(self, limit=None, marker=None):
    """List object ids from the remote repository (paged via limit/marker);
    actual remoting is done via self.call in the @api decorator"""
|
2014-02-10 20:51:25 +00:00
|
|
|
|
2016-11-10 08:56:18 +00:00
|
|
|
@api(since=parse_version('1.1.0b3'))
def scan(self, limit=None, marker=None):
    """Scan object ids from the remote repository (requires server >= 1.1.0b3);
    actual remoting is done via self.call in the @api decorator"""
|
2016-09-23 19:45:01 +00:00
|
|
|
|
2016-11-10 10:08:45 +00:00
|
|
|
def get(self, id):
    """Return the data of the single object *id*, or None if nothing is yielded."""
    results = self.get_many([id])
    for data in results:
        return data
|
|
|
|
|
|
|
|
def get_many(self, ids, is_preloaded=False):
    """Yield the data of each object in *ids*, preserving order."""
    requests = [{'id': id} for id in ids]
    yield from self.call_many('get', requests, is_preloaded=is_preloaded)
|
2011-07-06 20:16:07 +00:00
|
|
|
|
2016-11-10 08:56:18 +00:00
|
|
|
@api(since=parse_version('1.0.0'))
def put(self, id, data, wait=True):
    """Store *data* under *id* in the remote repository;
    actual remoting is done via self.call in the @api decorator"""
|
2012-10-17 09:40:23 +00:00
|
|
|
|
2016-11-10 08:56:18 +00:00
|
|
|
@api(since=parse_version('1.0.0'))
def delete(self, id, wait=True):
    """Delete the object *id* from the remote repository;
    actual remoting is done via self.call in the @api decorator"""
|
2012-11-30 20:47:35 +00:00
|
|
|
|
2016-11-10 08:56:18 +00:00
|
|
|
@api(since=parse_version('1.0.0'))
def save_key(self, keydata):
    """Store key material in the remote repository;
    actual remoting is done via self.call in the @api decorator"""
|
2015-07-14 22:01:07 +00:00
|
|
|
|
2016-11-10 08:56:18 +00:00
|
|
|
@api(since=parse_version('1.0.0'))
def load_key(self):
    """Load key material from the remote repository;
    actual remoting is done via self.call in the @api decorator"""
|
2015-07-14 22:01:07 +00:00
|
|
|
|
2016-11-10 08:56:18 +00:00
|
|
|
@api(since=parse_version('1.0.0'))
def get_free_nonce(self):
    """Fetch the next free AES nonce value from the server;
    actual remoting is done via self.call in the @api decorator"""
|
2016-07-25 18:38:31 +00:00
|
|
|
|
2016-11-10 08:56:18 +00:00
|
|
|
@api(since=parse_version('1.0.0'))
def commit_nonce_reservation(self, next_unreserved, start_nonce):
    """Reserve a nonce range on the server;
    actual remoting is done via self.call in the @api decorator"""
|
2016-07-25 18:38:31 +00:00
|
|
|
|
2016-11-10 08:56:18 +00:00
|
|
|
@api(since=parse_version('1.0.0'))
def break_lock(self):
    """Forcibly break a stale repository lock on the server;
    actual remoting is done via self.call in the @api decorator"""
|
2015-11-21 19:50:53 +00:00
|
|
|
|
2012-11-30 20:47:35 +00:00
|
|
|
def close(self):
    """Shut down the 'borg serve' subprocess, if one is still running."""
    if not self.p:
        return
    # Closing stdin signals EOF to the server; drain stdout and reap the child.
    self.p.stdin.close()
    self.p.stdout.close()
    self.p.wait()
    self.p = None
|
2014-01-22 19:58:48 +00:00
|
|
|
|
2017-03-05 04:19:32 +00:00
|
|
|
def async_response(self, wait=True):
    """Return one collected async result/exception, or None if there is none."""
    responses = self.call_many('async_responses', calls=[], wait=True, async_wait=wait)
    for response in responses:
        return response
|
|
|
|
|
2014-01-22 19:58:48 +00:00
|
|
|
def preload(self, ids):
    """Queue chunk *ids* so later get_many() calls can prefetch them."""
    self.preload_ids.extend(ids)
|
2014-03-13 21:29:47 +00:00
|
|
|
|
|
|
|
|
2016-05-27 21:16:31 +00:00
|
|
|
def handle_remote_line(line):
    """
    Handle a remote log line.

    This function is remarkably complex because it handles multiple wire formats.

    Three formats are recognized, newest first:
    1. JSON lines ('{'-prefixed) — borg serve >= 1.1.0b6, same shape as --log-json.
    2. '$LOG <level> [<logname>] Remote: <msg>' — borg serve 0.xx, 1.0.x, 1.1.0b1..b5.
    3. Anything else — plain 1.0.x-and-older stderr output.

    :param line: one complete decoded stderr line (must end with '\\r' or '\\n').
    """
    assert line.endswith(('\r', '\n'))
    if line.startswith('{'):
        # This format is used by Borg since 1.1.0b6 for new-protocol clients.
        # It is the same format that is exposed by --log-json.
        msg = json.loads(line)

        if msg['type'] not in ('progress_message', 'progress_percent', 'log_message'):
            logger.warning('Dropped remote log message with unknown type %r: %s', msg['type'], line)
            return

        if msg['type'] == 'log_message':
            # Re-emit log messages on the same level as the remote to get correct log suppression and verbosity.
            level = getattr(logging, msg['levelname'], logging.CRITICAL)
            assert isinstance(level, int)
            target_logger = logging.getLogger(msg['name'])
            msg['message'] = 'Remote: ' + msg['message']
            # In JSON mode, we manually check whether the log message should be propagated.
            if logging.getLogger('borg').json and level >= target_logger.getEffectiveLevel():
                sys.stderr.write(json.dumps(msg) + '\n')
            else:
                target_logger.log(level, '%s', msg['message'])
        elif msg['type'].startswith('progress_'):
            # Progress messages are a bit more complex.
            # First of all, we check whether progress output is enabled. This is signalled
            # through the effective level of the borg.output.progress logger
            # (also see ProgressIndicatorBase in borg.helpers).
            progress_logger = logging.getLogger('borg.output.progress')
            if progress_logger.getEffectiveLevel() == logging.INFO:
                # When progress output is enabled, we check whether the client is in
                # --log-json mode, as signalled by the "json" attribute on the "borg" logger.
                if logging.getLogger('borg').json:
                    # In --log-json mode we re-emit the progress JSON line as sent by the server,
                    # with the message, if any, prefixed with "Remote: ".
                    if 'message' in msg:
                        msg['message'] = 'Remote: ' + msg['message']
                    sys.stderr.write(json.dumps(msg) + '\n')
                elif 'message' in msg:
                    # In text log mode we write only the message to stderr and terminate with \r
                    # (carriage return, i.e. move the write cursor back to the beginning of the line)
                    # so that the next message, progress or not, overwrites it. This mirrors the behaviour
                    # of local progress displays.
                    sys.stderr.write('Remote: ' + msg['message'] + '\r')
    elif line.startswith('$LOG '):
        # This format is used by borg serve 0.xx, 1.0.x and 1.1.0b1..b5.
        # It prefixed log lines with $LOG as a marker, followed by the log level
        # and optionally a logger name, then "Remote:" as a separator followed by the original
        # message.
        _, level, msg = line.split(' ', 2)
        level = getattr(logging, level, logging.CRITICAL)  # str -> int
        if msg.startswith('Remote:'):
            # server format: '$LOG <level> Remote: <msg>'
            logging.log(level, msg.rstrip())
        else:
            # server format '$LOG <level> <logname> Remote: <msg>'
            logname, msg = msg.split(' ', 1)
            logging.getLogger(logname).log(level, msg.rstrip())
    else:
        # Plain 1.0.x and older format - re-emit to stderr (mirroring what the 1.0.x
        # client did) or as a generic log message.
        # We don't know what priority the line had.
        if logging.getLogger('borg').json:
            logging.getLogger('').warning('Remote: ' + line.strip())
        else:
            # In non-JSON mode we circumvent logging to preserve carriage returns (\r)
            # which are generated by remote progress displays.
            sys.stderr.write('Remote: ' + line)
|
2016-05-27 21:16:31 +00:00
|
|
|
|
|
|
|
|
2016-01-16 22:42:54 +00:00
|
|
|
class RepositoryNoCache:
    """Pass-through Repository wrapper that performs no caching.

    Exists so that callers can use the same API (including the context
    manager protocol) as RepositoryCache and switch between the two freely.

    *transform* is a callable taking two arguments, key and raw repository data.
    Whatever it returns is what get()/get_many() produce; by default the raw
    repository data is passed through unchanged.
    """

    def __init__(self, repository, transform=None):
        self.repository = repository
        if not transform:
            transform = lambda key, data: data
        self.transform = transform

    def close(self):
        """Nothing to release; present for API parity with RepositoryCache."""

    def __enter__(self):
        return self

    def __exit__(self, exc_type, exc_val, exc_tb):
        self.close()

    def get(self, key):
        """Return the (possibly transformed) data for a single *key*."""
        results = self.get_many([key], cache=False)
        return next(results)

    def get_many(self, keys, cache=True):
        """Yield (possibly transformed) data for every key, in order."""
        pairs = zip(keys, self.repository.get_many(keys))
        yield from (self.transform(key, data) for key, data in pairs)

    def log_instrumentation(self):
        """No statistics are collected; present for API parity with RepositoryCache."""
|
|
|
|
|
2016-01-16 22:42:54 +00:00
|
|
|
|
|
|
|
class RepositoryCache(RepositoryNoCache):
    """
    A caching Repository wrapper.

    Caches Repository GET operations locally.

    *pack* and *unpack* complement *transform* of the base class.
    *pack* receives the output of *transform* and should return bytes,
    which are stored in the cache. *unpack* receives these bytes and
    should return the initial data (as returned by *transform*).
    """

    def __init__(self, repository, pack=None, unpack=None, transform=None):
        super().__init__(repository, transform)
        # pack/unpack default to identity (cache stores the transformed data as-is).
        self.pack = pack or (lambda data: data)
        self.unpack = unpack or (lambda data: data)
        # Set of keys currently present as files in the on-disk cache directory.
        self.cache = set()
        self.basedir = tempfile.mkdtemp(prefix='borg-cache-')
        self.query_size_limit()
        # Total bytes currently stored in the cache directory.
        self.size = 0
        # Instrumentation
        self.hits = 0
        self.misses = 0
        self.slow_misses = 0
        self.slow_lat = 0.0
        self.evictions = 0
        self.enospc = 0

    def query_size_limit(self):
        # Cap the cache at 25% of free space on the cache filesystem, max 2 GiB.
        available_space = shutil.disk_usage(self.basedir).free
        self.size_limit = int(min(available_space * 0.25, 2**31))

    def key_filename(self, key):
        # One file per cached object, named by the hex form of its key.
        return os.path.join(self.basedir, bin_to_hex(key))

    def backoff(self):
        """Evict arbitrary entries until the cache is below 90% of the (refreshed) limit."""
        self.query_size_limit()
        target_size = int(0.9 * self.size_limit)
        while self.size > target_size and self.cache:
            # set.pop() removes an arbitrary key — eviction order is unspecified.
            key = self.cache.pop()
            file = self.key_filename(key)
            self.size -= os.stat(file).st_size
            os.unlink(file)
            self.evictions += 1

    def add_entry(self, key, data, cache):
        """Transform *data*; if *cache* is true, also persist the packed form on disk.

        Returns the transformed data either way.
        """
        transformed = self.transform(key, data)
        if not cache:
            return transformed
        packed = self.pack(transformed)
        file = self.key_filename(key)
        try:
            with open(file, 'wb') as fd:
                fd.write(packed)
        except OSError as os_error:
            # Remove any partially written file before deciding how to proceed.
            try:
                safe_unlink(file)
            except FileNotFoundError:
                pass  # open() could have failed as well
            if os_error.errno == errno.ENOSPC:
                # Disk full: shrink the cache and carry on without caching this entry.
                self.enospc += 1
                self.backoff()
            else:
                raise
        else:
            self.size += len(packed)
            self.cache.add(key)
            if self.size > self.size_limit:
                self.backoff()
        return transformed

    def log_instrumentation(self):
        logger.debug('RepositoryCache: current items %d, size %s / %s, %d hits, %d misses, %d slow misses (+%.1fs), '
                     '%d evictions, %d ENOSPC hit',
                     len(self.cache), format_file_size(self.size), format_file_size(self.size_limit),
                     self.hits, self.misses, self.slow_misses, self.slow_lat,
                     self.evictions, self.enospc)

    def close(self):
        self.log_instrumentation()
        self.cache.clear()
        shutil.rmtree(self.basedir)

    def get_many(self, keys, cache=True):
        # Request all not-yet-cached keys from the repository in one batch;
        # the iterator is consumed lazily and in lockstep with *keys* below.
        unknown_keys = [key for key in keys if key not in self.cache]
        repository_iterator = zip(unknown_keys, self.repository.get_many(unknown_keys))
        for key in keys:
            if key in self.cache:
                file = self.key_filename(key)
                with open(file, 'rb') as fd:
                    self.hits += 1
                    yield self.unpack(fd.read())
            else:
                for key_, data in repository_iterator:
                    if key_ == key:
                        transformed = self.add_entry(key, data, cache)
                        self.misses += 1
                        yield transformed
                        break
                else:
                    # slow path: eviction during this get_many removed this key from the cache
                    t0 = time.perf_counter()
                    data = self.repository.get(key)
                    self.slow_lat += time.perf_counter() - t0
                    transformed = self.add_entry(key, data, cache)
                    self.slow_misses += 1
                    yield transformed
        # Consume any pending requests
        for _ in repository_iterator:
            pass
|
2014-03-26 21:42:20 +00:00
|
|
|
|
|
|
|
|
2017-06-03 10:14:17 +00:00
|
|
|
def cache_if_remote(repository, *, decrypted_cache=False, pack=None, unpack=None, transform=None, force_cache=False):
    """
    Build the appropriate cache wrapper for *repository*: a RepositoryCache for
    remote repositories (or when *force_cache* is set), a RepositoryNoCache
    otherwise.

    When *decrypted_cache* is a key object, get() and get_many() return
    (csize, plaintext) tuples instead of the raw repository data: objects are
    decrypted (and thereby MAC- and ID-checked) only once and cached in
    decrypted form, which saves CPU on repeated access. Cached entries are
    LZ4-compressed and guarded by an xxh64 checksum.

    *decrypted_cache* is mutually exclusive with custom pack/unpack/transform
    callables (ValueError otherwise).
    """
    if decrypted_cache:
        if pack or unpack or transform:
            raise ValueError('decrypted_cache and pack/unpack/transform are incompatible')
        key = decrypted_cache
        # cache entry header: 32 bit csize, 64 bit (8 byte) xxh64
        entry_header = struct.Struct('=I8s')
        lz4 = Compressor('lz4')

        def _pack(item):
            # item is the (csize, plaintext) tuple produced by _transform below
            csize, plaintext = item
            body = lz4.compress(plaintext)
            return entry_header.pack(csize, xxh64(body)) + body

        def _unpack(raw):
            view = memoryview(raw)
            csize, digest = entry_header.unpack(view[:entry_header.size])
            body = view[entry_header.size:]
            if digest != xxh64(body):
                raise IntegrityError('detected corrupted data in metadata cache')
            return csize, lz4.decompress(body)

        def _transform(id_, data):
            # record the in-repository (compressed) size before decrypting
            return len(data), key.decrypt(id_, data)

        pack, unpack, transform = _pack, _unpack, _transform

    if force_cache or isinstance(repository, RemoteRepository):
        return RepositoryCache(repository, pack, unpack, transform)
    return RepositoryNoCache(repository, transform)
|