diff --git a/borg/archiver.py b/borg/archiver.py index 3583870ce..ff0c259ba 100644 --- a/borg/archiver.py +++ b/borg/archiver.py @@ -34,6 +34,9 @@ from .remote import RepositoryServer, RemoteRepository has_lchflags = hasattr(os, 'lchflags') +# default umask, overriden by --umask, defaults to read/write only for owner +UMASK_DEFAULT = 0o077 + class ToggleAction(argparse.Action): """argparse action to handle "toggle" flags easily @@ -55,9 +58,10 @@ class Archiver: self.exit_code = EXIT_SUCCESS self.lock_wait = lock_wait - def open_repository(self, location, create=False, exclusive=False, lock=True): + def open_repository(self, args, create=False, exclusive=False, lock=True): + location = args.location # note: 'location' must be always present in args if location.proto == 'ssh': - repository = RemoteRepository(location, create=create, lock_wait=self.lock_wait, lock=lock) + repository = RemoteRepository(location, create=create, lock_wait=self.lock_wait, lock=lock, args=args) else: repository = Repository(location.path, create=create, exclusive=exclusive, lock_wait=self.lock_wait, lock=lock) repository._location = location @@ -84,8 +88,8 @@ class Archiver: def do_init(self, args): """Initialize an empty repository""" - logger.info('Initializing repository at "%s"' % args.repository.canonical_path()) - repository = self.open_repository(args.repository, create=True, exclusive=True) + logger.info('Initializing repository at "%s"' % args.location.canonical_path()) + repository = self.open_repository(args, create=True, exclusive=True) key = key_creator(repository, args) manifest = Manifest(key, repository) manifest.key = key @@ -96,7 +100,7 @@ class Archiver: def do_check(self, args): """Check repository consistency""" - repository = self.open_repository(args.repository, exclusive=args.repair) + repository = self.open_repository(args, exclusive=args.repair) if args.repair: msg = ("'check --repair' is an experimental feature that might result in data loss." + "\n" + @@ -108,14 +112,14 @@ class Archiver: if not repository.check(repair=args.repair, save_space=args.save_space): return EXIT_WARNING if not args.repo_only and not ArchiveChecker().check( - repository, repair=args.repair, archive=args.repository.archive, + repository, repair=args.repair, archive=args.location.archive, last=args.last, save_space=args.save_space): return EXIT_WARNING return EXIT_SUCCESS def do_change_passphrase(self, args): """Change repository key file passphrase""" - repository = self.open_repository(args.repository) + repository = self.open_repository(args) manifest, key = Manifest.load(repository) key.change_passphrase() return EXIT_SUCCESS @@ -126,13 +130,13 @@ class Archiver: dry_run = args.dry_run t0 = datetime.now() if not dry_run: - repository = self.open_repository(args.archive, exclusive=True) + repository = self.open_repository(args, exclusive=True) manifest, key = Manifest.load(repository) compr_args = dict(buffer=COMPR_BUFFER) compr_args.update(args.compression) key.compressor = Compressor(**compr_args) cache = Cache(repository, key, manifest, do_files=args.cache_files, lock_wait=self.lock_wait) - archive = Archive(repository, key, manifest, args.archive.archive, cache=cache, + archive = Archive(repository, key, manifest, args.location.archive, cache=cache, create=True, checkpoint_interval=args.checkpoint_interval, numeric_owner=args.numeric_owner, progress=args.progress, chunker_params=args.chunker_params, start=t0) @@ -146,9 +150,9 @@ class Archiver: except IOError: pass # Add local repository dir to inode_skip list - if not args.archive.host: + if not args.location.host: try: - st = os.stat(args.archive.path) + st = os.stat(args.location.path) skip_inodes.add((st.st_ino, st.st_dev)) except IOError: pass @@ -271,9 +275,9 @@ class Archiver: logger.warning('Warning: File system encoding is "ascii", extracting non-ascii filenames will not be supported.') if sys.platform.startswith(('linux', 'freebsd', 'netbsd', 'openbsd', 'darwin', )): logger.warning('Hint: You likely need to fix your locale setup. E.g. install locales and use: LANG=en_US.UTF-8') - repository = self.open_repository(args.archive) + repository = self.open_repository(args) manifest, key = Manifest.load(repository) - archive = Archive(repository, key, manifest, args.archive.archive, + archive = Archive(repository, key, manifest, args.location.archive, numeric_owner=args.numeric_owner) patterns = adjust_patterns(args.paths, args.excludes) dry_run = args.dry_run @@ -313,10 +317,10 @@ class Archiver: def do_rename(self, args): """Rename an existing archive""" - repository = self.open_repository(args.archive, exclusive=True) + repository = self.open_repository(args, exclusive=True) manifest, key = Manifest.load(repository) cache = Cache(repository, key, manifest, lock_wait=self.lock_wait) - archive = Archive(repository, key, manifest, args.archive.archive, cache=cache) + archive = Archive(repository, key, manifest, args.location.archive, cache=cache) archive.rename(args.name) manifest.write() repository.commit() @@ -325,11 +329,11 @@ class Archiver: def do_delete(self, args): """Delete an existing repository or archive""" - repository = self.open_repository(args.target, exclusive=True) + repository = self.open_repository(args, exclusive=True) manifest, key = Manifest.load(repository) cache = Cache(repository, key, manifest, do_files=args.cache_files, lock_wait=self.lock_wait) - if args.target.archive: - archive = Archive(repository, key, manifest, args.target.archive, cache=cache) + if args.location.archive: + archive = Archive(repository, key, manifest, args.location.archive, cache=cache) stats = Statistics() archive.delete(stats) manifest.write() @@ -368,11 +372,11 @@ class Archiver: self.print_error('%s: Mountpoint must be a writable directory' % args.mountpoint) return self.exit_code - repository = self.open_repository(args.src) + repository = self.open_repository(args) try: manifest, key = Manifest.load(repository) - if args.src.archive: - archive = Archive(repository, key, manifest, args.src.archive) + if args.location.archive: + archive = Archive(repository, key, manifest, args.location.archive) else: archive = None operations = FuseOperations(key, repository, manifest, archive) @@ -388,10 +392,10 @@ class Archiver: def do_list(self, args): """List archive or repository contents""" - repository = self.open_repository(args.src) + repository = self.open_repository(args) manifest, key = Manifest.load(repository) - if args.src.archive: - archive = Archive(repository, key, manifest, args.src.archive) + if args.location.archive: + archive = Archive(repository, key, manifest, args.location.archive) if args.short: for item in archive.iter_items(): print(remove_surrogates(item[b'path'])) @@ -432,10 +436,10 @@ class Archiver: def do_info(self, args): """Show archive details such as disk space used""" - repository = self.open_repository(args.archive) + repository = self.open_repository(args) manifest, key = Manifest.load(repository) cache = Cache(repository, key, manifest, do_files=args.cache_files, lock_wait=self.lock_wait) - archive = Archive(repository, key, manifest, args.archive.archive, cache=cache) + archive = Archive(repository, key, manifest, args.location.archive, cache=cache) stats = archive.calc_stats(cache) print('Name:', archive.name) print('Fingerprint: %s' % hexlify(archive.id).decode('ascii')) @@ -451,7 +455,7 @@ class Archiver: def do_prune(self, args): """Prune repository archives according to specified rules""" - repository = self.open_repository(args.repository, exclusive=True) + repository = self.open_repository(args, exclusive=True) manifest, key = Manifest.load(repository) cache = Cache(repository, key, manifest, do_files=args.cache_files, lock_wait=self.lock_wait) archives = manifest.list_archive_infos(sort_by='ts', reverse=True) # just a ArchiveInfo list @@ -506,7 +510,7 @@ class Archiver: # to be implemented. # XXX: should auto-detect if it is an attic repository here - repo = AtticRepositoryUpgrader(args.repository.path, create=False) + repo = AtticRepositoryUpgrader(args.location.path, create=False) try: repo.upgrade(args.dry_run, inplace=args.inplace) except NotImplementedError as e: @@ -515,9 +519,9 @@ class Archiver: def do_debug_dump_archive_items(self, args): """dump (decrypted, decompressed) archive items metadata (not: data)""" - repository = self.open_repository(args.archive) + repository = self.open_repository(args) manifest, key = Manifest.load(repository) - archive = Archive(repository, key, manifest, args.archive.archive) + archive = Archive(repository, key, manifest, args.location.archive) for i, item_id in enumerate(archive.metadata[b'items']): data = key.decrypt(item_id, repository.get(item_id)) filename = '%06d_%s.items' %(i, hexlify(item_id).decode('ascii')) @@ -529,7 +533,7 @@ class Archiver: def do_debug_get_obj(self, args): """get object contents from the repository and write it into file""" - repository = self.open_repository(args.repository) + repository = self.open_repository(args) manifest, key = Manifest.load(repository) hex_id = args.id try: @@ -549,7 +553,7 @@ class Archiver: def do_debug_put_obj(self, args): """put file(s) contents into the repository""" - repository = self.open_repository(args.repository) + repository = self.open_repository(args) manifest, key = Manifest.load(repository) for path in args.paths: with open(path, "rb") as f: @@ -562,7 +566,7 @@ class Archiver: def do_debug_delete_obj(self, args): """delete the objects with the given IDs from the repo""" - repository = self.open_repository(args.repository) + repository = self.open_repository(args) manifest, key = Manifest.load(repository) modified = False for hex_id in args.ids: @@ -584,7 +588,7 @@ class Archiver: def do_break_lock(self, args): """Break the repository lock (e.g. in case it was left by a dead borg.""" - repository = self.open_repository(args.repository, lock=False) + repository = self.open_repository(args, lock=False) try: repository.break_lock() Cache.break_lock(repository) @@ -673,9 +677,9 @@ class Archiver: help='show/log the return code (rc)') common_parser.add_argument('--no-files-cache', dest='cache_files', action='store_false', help='do not load/update the file metadata cache used to detect unchanged files') - common_parser.add_argument('--umask', dest='umask', type=lambda s: int(s, 8), default=RemoteRepository.umask, metavar='M', + common_parser.add_argument('--umask', dest='umask', type=lambda s: int(s, 8), default=UMASK_DEFAULT, metavar='M', help='set umask to M (local and remote, default: %(default)04o)') - common_parser.add_argument('--remote-path', dest='remote_path', default=RemoteRepository.remote_path, metavar='PATH', + common_parser.add_argument('--remote-path', dest='remote_path', default='borg', metavar='PATH', help='set remote path to executable (default: "%(default)s")') parser = argparse.ArgumentParser(prog=prog, description='Borg - Deduplicated Backups') @@ -703,7 +707,7 @@ class Archiver: description=self.do_init.__doc__, epilog=init_epilog, formatter_class=argparse.RawDescriptionHelpFormatter) subparser.set_defaults(func=self.do_init) - subparser.add_argument('repository', metavar='REPOSITORY', nargs='?', default='', + subparser.add_argument('location', metavar='REPOSITORY', nargs='?', default='', type=location_validator(archive=False), help='repository to create') subparser.add_argument('-e', '--encryption', dest='encryption', @@ -751,7 +755,7 @@ class Archiver: epilog=check_epilog, formatter_class=argparse.RawDescriptionHelpFormatter) subparser.set_defaults(func=self.do_check) - subparser.add_argument('repository', metavar='REPOSITORY_OR_ARCHIVE', nargs='?', default='', + subparser.add_argument('location', metavar='REPOSITORY_OR_ARCHIVE', nargs='?', default='', type=location_validator(), help='repository or archive to check consistency of') subparser.add_argument('--repository-only', dest='repo_only', action='store_true', @@ -779,7 +783,7 @@ class Archiver: epilog=change_passphrase_epilog, formatter_class=argparse.RawDescriptionHelpFormatter) subparser.set_defaults(func=self.do_change_passphrase) - subparser.add_argument('repository', metavar='REPOSITORY', nargs='?', default='', + subparser.add_argument('location', metavar='REPOSITORY', nargs='?', default='', type=location_validator(archive=False)) create_epilog = textwrap.dedent(""" @@ -853,7 +857,7 @@ class Archiver: subparser.add_argument('-n', '--dry-run', dest='dry_run', action='store_true', default=False, help='do not create a backup archive') - subparser.add_argument('archive', metavar='ARCHIVE', + subparser.add_argument('location', metavar='ARCHIVE', type=location_validator(archive=True), help='name of archive to create (must be also a valid directory name)') subparser.add_argument('paths', metavar='PATH', nargs='+', type=str, @@ -893,7 +897,7 @@ class Archiver: subparser.add_argument('--sparse', dest='sparse', action='store_true', default=False, help='create holes in output sparse file from all-zero chunks') - subparser.add_argument('archive', metavar='ARCHIVE', + subparser.add_argument('location', metavar='ARCHIVE', type=location_validator(archive=True), help='archive to extract') subparser.add_argument('paths', metavar='PATH', nargs='*', type=str, @@ -907,7 +911,7 @@ class Archiver: epilog=rename_epilog, formatter_class=argparse.RawDescriptionHelpFormatter) subparser.set_defaults(func=self.do_rename) - subparser.add_argument('archive', metavar='ARCHIVE', + subparser.add_argument('location', metavar='ARCHIVE', type=location_validator(archive=True), help='archive to rename') subparser.add_argument('name', metavar='NEWNAME', type=str, @@ -932,7 +936,7 @@ class Archiver: subparser.add_argument('--save-space', dest='save_space', action='store_true', default=False, help='work slower, but using less space') - subparser.add_argument('target', metavar='TARGET', nargs='?', default='', + subparser.add_argument('location', metavar='TARGET', nargs='?', default='', type=location_validator(), help='archive or repository to delete') @@ -949,7 +953,7 @@ class Archiver: help='only print file/directory names, nothing else') subparser.add_argument('-p', '--prefix', dest='prefix', type=str, help='only consider archive names starting with this prefix') - subparser.add_argument('src', metavar='REPOSITORY_OR_ARCHIVE', nargs='?', default='', + subparser.add_argument('location', metavar='REPOSITORY_OR_ARCHIVE', nargs='?', default='', type=location_validator(), help='repository/archive to list contents of') @@ -964,7 +968,7 @@ class Archiver: epilog=mount_epilog, formatter_class=argparse.RawDescriptionHelpFormatter) subparser.set_defaults(func=self.do_mount) - subparser.add_argument('src', metavar='REPOSITORY_OR_ARCHIVE', type=location_validator(), + subparser.add_argument('location', metavar='REPOSITORY_OR_ARCHIVE', type=location_validator(), help='repository/archive to mount') subparser.add_argument('mountpoint', metavar='MOUNTPOINT', type=str, help='where to mount filesystem') @@ -982,7 +986,7 @@ class Archiver: epilog=info_epilog, formatter_class=argparse.RawDescriptionHelpFormatter) subparser.set_defaults(func=self.do_info) - subparser.add_argument('archive', metavar='ARCHIVE', + subparser.add_argument('location', metavar='ARCHIVE', type=location_validator(archive=True), help='archive to display information about') @@ -996,7 +1000,7 @@ class Archiver: epilog=break_lock_epilog, formatter_class=argparse.RawDescriptionHelpFormatter) subparser.set_defaults(func=self.do_break_lock) - subparser.add_argument('repository', metavar='REPOSITORY', + subparser.add_argument('location', metavar='REPOSITORY', type=location_validator(archive=False), help='repository for which to break the locks') @@ -1052,7 +1056,7 @@ class Archiver: subparser.add_argument('--save-space', dest='save_space', action='store_true', default=False, help='work slower, but using less space') - subparser.add_argument('repository', metavar='REPOSITORY', nargs='?', default='', + subparser.add_argument('location', metavar='REPOSITORY', nargs='?', default='', type=location_validator(archive=False), help='repository to prune') @@ -1104,7 +1108,7 @@ class Archiver: default=False, action='store_true', help="""rewrite repository in place, with no chance of going back to older versions of the repository.""") - subparser.add_argument('repository', metavar='REPOSITORY', nargs='?', default='', + subparser.add_argument('location', metavar='REPOSITORY', nargs='?', default='', type=location_validator(archive=False), help='path to the repository to be upgraded') @@ -1126,7 +1130,7 @@ class Archiver: epilog=debug_dump_archive_items_epilog, formatter_class=argparse.RawDescriptionHelpFormatter) subparser.set_defaults(func=self.do_debug_dump_archive_items) - subparser.add_argument('archive', metavar='ARCHIVE', + subparser.add_argument('location', metavar='ARCHIVE', type=location_validator(archive=True), help='archive to dump') @@ -1138,7 +1142,7 @@ class Archiver: epilog=debug_get_obj_epilog, formatter_class=argparse.RawDescriptionHelpFormatter) subparser.set_defaults(func=self.do_debug_get_obj) - subparser.add_argument('repository', metavar='REPOSITORY', nargs='?', default='', + subparser.add_argument('location', metavar='REPOSITORY', nargs='?', default='', type=location_validator(archive=False), help='repository to use') subparser.add_argument('id', metavar='ID', type=str, @@ -1154,7 +1158,7 @@ class Archiver: epilog=debug_put_obj_epilog, formatter_class=argparse.RawDescriptionHelpFormatter) subparser.set_defaults(func=self.do_debug_put_obj) - subparser.add_argument('repository', metavar='REPOSITORY', nargs='?', default='', + subparser.add_argument('location', metavar='REPOSITORY', nargs='?', default='', type=location_validator(archive=False), help='repository to use') subparser.add_argument('paths', metavar='PATH', nargs='+', type=str, @@ -1168,7 +1172,7 @@ class Archiver: epilog=debug_delete_obj_epilog, formatter_class=argparse.RawDescriptionHelpFormatter) subparser.set_defaults(func=self.do_debug_delete_obj) - subparser.add_argument('repository', metavar='REPOSITORY', nargs='?', default='', + subparser.add_argument('location', metavar='REPOSITORY', nargs='?', default='', type=location_validator(archive=False), help='repository to use') subparser.add_argument('ids', metavar='IDs', nargs='+', type=str, @@ -1187,9 +1191,7 @@ class Archiver: def run(self, args): os.umask(args.umask) # early, before opening files self.lock_wait = args.lock_wait - RemoteRepository.remote_path = args.remote_path - RemoteRepository.umask = args.umask - setup_logging(level=args.log_level) # do not use loggers before this! + setup_logging(level=args.log_level, is_serve=args.func == self.do_serve) # do not use loggers before this! check_extension_modules() keys_dir = get_keys_dir() if not os.path.exists(keys_dir): @@ -1261,7 +1263,7 @@ def main(): # pragma: no cover msg += "\n%s\n%s" % (traceback.format_exc(), sysinfo()) exit_code = e.exit_code except RemoteRepository.RPCError as e: - msg = 'Remote Exception.\n%s\n%s' % (str(e), sysinfo()) + msg = '%s\n%s' % (str(e), sysinfo()) exit_code = EXIT_ERROR except Exception: msg = 'Local Exception.\n%s\n%s' % (traceback.format_exc(), sysinfo()) diff --git a/borg/helpers.py b/borg/helpers.py index df6f1163b..925dfb113 100644 --- a/borg/helpers.py +++ b/borg/helpers.py @@ -912,7 +912,8 @@ class ProgressIndicatorPercent: return self.output(pct) def output(self, percent): - print(self.msg % percent, file=self.file, end='\r' if self.same_line else '\n') + print(self.msg % percent, file=self.file, end='\r' if self.same_line else '\n') # python 3.3 gives us flush=True + self.file.flush() def finish(self): if self.same_line: diff --git a/borg/key.py b/borg/key.py index a712cd133..a39b1e318 100644 --- a/borg/key.py +++ b/borg/key.py @@ -363,7 +363,7 @@ class KeyfileKey(KeyfileKeyBase): raise KeyfileNotFoundError(self.repository._location.canonical_path(), get_keys_dir()) def get_new_target(self, args): - filename = args.repository.to_key_filename() + filename = args.location.to_key_filename() path = filename i = 1 while os.path.exists(path): diff --git a/borg/logger.py b/borg/logger.py index a40f676c7..f2350f8d7 100644 --- a/borg/logger.py +++ b/borg/logger.py @@ -52,7 +52,7 @@ def _log_warning(message, category, filename, lineno, file=None, line=None): logger.warning(msg) -def setup_logging(stream=None, conf_fname=None, env_var='BORG_LOGGING_CONF', level='info'): +def setup_logging(stream=None, conf_fname=None, env_var='BORG_LOGGING_CONF', level='info', is_serve=False): """setup logging module according to the arguments provided if conf_fname is given (or the config file name can be determined via @@ -60,6 +60,9 @@ def setup_logging(stream=None, conf_fname=None, env_var='BORG_LOGGING_CONF', lev otherwise, set up a stream handler logger on stderr (by default, if no stream is provided). + + if is_serve == True, we configure a special log format as expected by + the borg client log message interceptor. """ global configured err_msg = None @@ -84,9 +87,11 @@ def setup_logging(stream=None, conf_fname=None, env_var='BORG_LOGGING_CONF', lev # if we did not / not successfully load a logging configuration, fallback to this: logger = logging.getLogger('') handler = logging.StreamHandler(stream) - # other formatters will probably want this, but let's remove clutter on stderr - # example: - # handler.setFormatter(logging.Formatter('%(name)s: %(message)s')) + if is_serve: + fmt = '$LOG %(levelname)s Remote: %(message)s' + else: + fmt = '%(message)s' + handler.setFormatter(logging.Formatter(fmt)) logger.addHandler(handler) logger.setLevel(level.upper()) configured = True diff --git a/borg/remote.py b/borg/remote.py index f724b80d7..1fcd97c50 100644 --- a/borg/remote.py +++ b/borg/remote.py @@ -1,5 +1,6 @@ import errno import fcntl +import logging import os import select import shlex @@ -10,7 +11,7 @@ import traceback from . import __version__ -from .helpers import Error, IntegrityError +from .helpers import Error, IntegrityError, sysinfo from .repository import Repository import msgpack @@ -62,12 +63,16 @@ class RepositoryServer: # pragma: no cover def serve(self): stdin_fd = sys.stdin.fileno() stdout_fd = sys.stdout.fileno() + stderr_fd = sys.stdout.fileno() # Make stdin non-blocking fl = fcntl.fcntl(stdin_fd, fcntl.F_GETFL) fcntl.fcntl(stdin_fd, fcntl.F_SETFL, fl | os.O_NONBLOCK) # Make stdout blocking fl = fcntl.fcntl(stdout_fd, fcntl.F_GETFL) fcntl.fcntl(stdout_fd, fcntl.F_SETFL, fl & ~os.O_NONBLOCK) + # Make stderr blocking + fl = fcntl.fcntl(stderr_fd, fcntl.F_GETFL) + fcntl.fcntl(stderr_fd, fcntl.F_SETFL, fl & ~os.O_NONBLOCK) unpacker = msgpack.Unpacker(use_list=False) while True: r, w, es = select.select([stdin_fd], [], [], 10) @@ -90,7 +95,9 @@ class RepositoryServer: # pragma: no cover f = getattr(self.repository, method) res = f(*args) except BaseException as e: - exc = "Remote Traceback by Borg %s%s%s" % (__version__, os.linesep, traceback.format_exc()) + logging.exception('Borg %s: exception in RPC call:', __version__) + logging.error(sysinfo()) + exc = "Remote Exception (see remote log for the traceback)" os.write(stdout_fd, msgpack.packb((1, msgid, e.__class__.__name__, exc))) else: os.write(stdout_fd, msgpack.packb((1, msgid, None, res))) @@ -117,15 +124,12 @@ class RepositoryServer: # pragma: no cover class RemoteRepository: extra_test_args = [] - remote_path = 'borg' - # default umask, overriden by --umask, defaults to read/write only for owner - umask = 0o077 class RPCError(Exception): def __init__(self, name): self.name = name - def __init__(self, location, create=False, lock_wait=None, lock=True): + def __init__(self, location, create=False, lock_wait=None, lock=True, args=None): self.location = location self.preload_ids = [] self.msgid = 0 @@ -135,21 +139,19 @@ class RemoteRepository: self.responses = {} self.unpacker = msgpack.Unpacker(use_list=False) self.p = None - # XXX: ideally, the testsuite would subclass Repository and - # override ssh_cmd() instead of this crude hack, although - # __testsuite__ is not a valid domain name so this is pretty - # safe. - if location.host == '__testsuite__': - args = [sys.executable, '-m', 'borg.archiver', 'serve' ] + self.extra_test_args - else: # pragma: no cover - args = self.ssh_cmd(location) - self.p = Popen(args, bufsize=0, stdin=PIPE, stdout=PIPE) + testing = location.host == '__testsuite__' + borg_cmd = self.borg_cmd(args, testing) + if not testing: + borg_cmd = self.ssh_cmd(location) + borg_cmd + self.p = Popen(borg_cmd, bufsize=0, stdin=PIPE, stdout=PIPE, stderr=PIPE) self.stdin_fd = self.p.stdin.fileno() self.stdout_fd = self.p.stdout.fileno() + self.stderr_fd = self.p.stderr.fileno() fcntl.fcntl(self.stdin_fd, fcntl.F_SETFL, fcntl.fcntl(self.stdin_fd, fcntl.F_GETFL) | os.O_NONBLOCK) fcntl.fcntl(self.stdout_fd, fcntl.F_SETFL, fcntl.fcntl(self.stdout_fd, fcntl.F_GETFL) | os.O_NONBLOCK) - self.r_fds = [self.stdout_fd] - self.x_fds = [self.stdin_fd, self.stdout_fd] + fcntl.fcntl(self.stderr_fd, fcntl.F_SETFL, fcntl.fcntl(self.stderr_fd, fcntl.F_GETFL) | os.O_NONBLOCK) + self.r_fds = [self.stdout_fd, self.stderr_fd] + self.x_fds = [self.stdin_fd, self.stdout_fd, self.stderr_fd] try: version = self.call('negotiate', RPC_PROTOCOL_VERSION) @@ -165,10 +167,28 @@ class RemoteRepository: def __repr__(self): return '<%s %s>' % (self.__class__.__name__, self.location.canonical_path()) - def umask_flag(self): - return ['--umask', '%03o' % self.umask] + def borg_cmd(self, args, testing): + """return a borg serve command line""" + # give some args/options to "borg serve" process as they were given to us + opts = [] + if args is not None: + opts.append('--umask=%03o' % args.umask) + root_logger = logging.getLogger() + if root_logger.isEnabledFor(logging.DEBUG): + opts.append('--debug') + elif root_logger.isEnabledFor(logging.INFO): + opts.append('--info') + elif root_logger.isEnabledFor(logging.WARNING): + pass # warning is default + else: + raise ValueError('log level missing, fix this code') + if testing: + return [sys.executable, '-m', 'borg.archiver', 'serve' ] + opts + self.extra_test_args + else: # pragma: no cover + return [args.remote_path, 'serve'] + opts def ssh_cmd(self, location): + """return a ssh command line that can be prefixed to a borg command line""" args = shlex.split(os.environ.get('BORG_RSH', 'ssh')) if location.port: args += ['-p', str(location.port)] @@ -176,8 +196,6 @@ class RemoteRepository: args.append('%s@%s' % (location.user, location.host)) else: args.append('%s' % location.host) - # use local umask also for the remote process - args += [self.remote_path, 'serve'] + self.umask_flag() return args def call(self, cmd, *args, **kw): @@ -228,19 +246,32 @@ class RemoteRepository: r, w, x = select.select(self.r_fds, w_fds, self.x_fds, 1) if x: raise Exception('FD exception occurred') - if r: - data = os.read(self.stdout_fd, BUFSIZE) - if not data: - raise ConnectionClosed() - self.unpacker.feed(data) - for unpacked in self.unpacker: - if not (isinstance(unpacked, tuple) and len(unpacked) == 4): - raise Exception("Unexpected RPC data format.") - type, msgid, error, res = unpacked - if msgid in self.ignore_responses: - self.ignore_responses.remove(msgid) - else: - self.responses[msgid] = error, res + for fd in r: + if fd is self.stdout_fd: + data = os.read(fd, BUFSIZE) + if not data: + raise ConnectionClosed() + self.unpacker.feed(data) + for unpacked in self.unpacker: + if not (isinstance(unpacked, tuple) and len(unpacked) == 4): + raise Exception("Unexpected RPC data format.") + type, msgid, error, res = unpacked + if msgid in self.ignore_responses: + self.ignore_responses.remove(msgid) + else: + self.responses[msgid] = error, res + elif fd is self.stderr_fd: + data = os.read(fd, 32768) + if not data: + raise ConnectionClosed() + data = data.decode('utf-8') + for line in data.splitlines(True): # keepends=True for py3.3+ + if line.startswith('$LOG '): + _, level, msg = line.split(' ', 2) + level = getattr(logging, level, logging.CRITICAL) # str -> int + logging.log(level, msg.rstrip()) + else: + sys.stderr.write("Remote: " + line) if w: while not self.to_send and (calls or self.preload_ids) and len(waiting_for) < 100: if calls: diff --git a/borg/testsuite/key.py b/borg/testsuite/key.py index 2f234dd8a..b2011d8f5 100644 --- a/borg/testsuite/key.py +++ b/borg/testsuite/key.py @@ -13,7 +13,7 @@ from . import BaseTestCase class KeyTestCase(BaseTestCase): class MockArgs: - repository = Location(tempfile.mkstemp()[1]) + location = Location(tempfile.mkstemp()[1]) keyfile2_key_file = """ BORG_KEY 0000000000000000000000000000000000000000000000000000000000000000 diff --git a/borg/testsuite/repository.py b/borg/testsuite/repository.py index 4094df407..7027cb59a 100644 --- a/borg/testsuite/repository.py +++ b/borg/testsuite/repository.py @@ -1,5 +1,6 @@ import os import shutil +import sys import tempfile from mock import patch @@ -326,13 +327,24 @@ class RemoteRepositoryTestCase(RepositoryTestCase): self.assert_raises(InvalidRPCMethod, lambda: self.repository.call('__init__', None)) def test_ssh_cmd(self): - assert self.repository.umask is not None - assert self.repository.ssh_cmd(Location('example.com:foo')) == ['ssh', 'example.com', 'borg', 'serve'] + self.repository.umask_flag() - assert self.repository.ssh_cmd(Location('ssh://example.com/foo')) == ['ssh', 'example.com', 'borg', 'serve'] + self.repository.umask_flag() - assert self.repository.ssh_cmd(Location('ssh://user@example.com/foo')) == ['ssh', 'user@example.com', 'borg', 'serve'] + self.repository.umask_flag() - assert self.repository.ssh_cmd(Location('ssh://user@example.com:1234/foo')) == ['ssh', '-p', '1234', 'user@example.com', 'borg', 'serve'] + self.repository.umask_flag() + assert self.repository.ssh_cmd(Location('example.com:foo')) == ['ssh', 'example.com'] + assert self.repository.ssh_cmd(Location('ssh://example.com/foo')) == ['ssh', 'example.com'] + assert self.repository.ssh_cmd(Location('ssh://user@example.com/foo')) == ['ssh', 'user@example.com'] + assert self.repository.ssh_cmd(Location('ssh://user@example.com:1234/foo')) == ['ssh', '-p', '1234', 'user@example.com'] os.environ['BORG_RSH'] = 'ssh --foo' - assert self.repository.ssh_cmd(Location('example.com:foo')) == ['ssh', '--foo', 'example.com', 'borg', 'serve'] + self.repository.umask_flag() + assert self.repository.ssh_cmd(Location('example.com:foo')) == ['ssh', '--foo', 'example.com'] + + def test_borg_cmd(self): + class MockArgs: + remote_path = 'borg' + umask = 0o077 + + assert self.repository.borg_cmd(None, testing=True) == [sys.executable, '-m', 'borg.archiver', 'serve' ] + args = MockArgs() + # note: test logger is on info log level, so --info gets added automagically + assert self.repository.borg_cmd(args, testing=False) == ['borg', 'serve', '--umask=077', '--info'] + args.remote_path = 'borg-0.28.2' + assert self.repository.borg_cmd(args, testing=False) == ['borg-0.28.2', 'serve', '--umask=077', '--info'] class RemoteRepositoryCheckTestCase(RepositoryCheckTestCase): diff --git a/borg/testsuite/upgrader.py b/borg/testsuite/upgrader.py index 3d0459126..9a1f823f9 100644 --- a/borg/testsuite/upgrader.py +++ b/borg/testsuite/upgrader.py @@ -12,7 +12,7 @@ except ImportError: from ..upgrader import AtticRepositoryUpgrader, AtticKeyfileKey from ..helpers import get_keys_dir from ..key import KeyfileKey -from ..remote import RemoteRepository +from ..archiver import UMASK_DEFAULT from ..repository import Repository @@ -169,7 +169,7 @@ def test_convert_all(tmpdir, attic_repo, attic_key_file, inplace): orig_inode = first_inode(attic_repo.path) repo = AtticRepositoryUpgrader(str(tmpdir), create=False) # replicate command dispatch, partly - os.umask(RemoteRepository.umask) + os.umask(UMASK_DEFAULT) backup = repo.upgrade(dryrun=False, inplace=inplace) if inplace: assert backup is None @@ -179,7 +179,7 @@ def test_convert_all(tmpdir, attic_repo, attic_key_file, inplace): assert first_inode(repo.path) != first_inode(backup) # i have seen cases where the copied tree has world-readable # permissions, which is wrong - assert stat_segment(backup).st_mode & 0o007 == 0 + assert stat_segment(backup).st_mode & UMASK_DEFAULT == 0 assert key_valid(attic_key_file.path) assert repo_valid(tmpdir)