diff --git a/src/borg/archive.py b/src/borg/archive.py index 50c90721d..1e0b8df0c 100644 --- a/src/borg/archive.py +++ b/src/borg/archive.py @@ -1392,6 +1392,60 @@ class FilesystemObjectProcessors: return status +class TarfileObjectProcessors: + def __init__(self, *, cache, key, + add_item, process_file_chunks, + chunker_params, show_progress, + log_json, iec, file_status_printer=None): + self.cache = cache + self.key = key + self.add_item = add_item + self.process_file_chunks = process_file_chunks + self.show_progress = show_progress + self.print_file_status = file_status_printer or (lambda *args: None) + + self.stats = Statistics(output_json=log_json, iec=iec) # threading: done by cache (including progress) + self.chunker = get_chunker(*chunker_params, seed=key.chunk_seed, sparse=False) + + @contextmanager + def create_helper(self, tarinfo, status=None, type=None): + item = Item(path=make_path_safe(tarinfo.name), mode=tarinfo.mode | type, + uid=tarinfo.uid, gid=tarinfo.gid, user=tarinfo.uname, group=tarinfo.gname, + mtime=tarinfo.mtime * 1000**3) + yield item, status + # if we get here, "with"-block worked ok without error/exception, the item was processed ok... + self.add_item(item, stats=self.stats) + + def process_dir(self, *, tarinfo, status, type): + with self.create_helper(tarinfo, status, type) as (item, status): + return status + + def process_fifo(self, *, tarinfo, status, type): + with self.create_helper(tarinfo, status, type) as (item, status): + return status + + def process_dev(self, *, tarinfo, status, type): + with self.create_helper(tarinfo, status, type) as (item, status): + item.rdev = os.makedev(tarinfo.devmajor, tarinfo.devminor) + return status + + def process_link(self, *, tarinfo, status, type): + with self.create_helper(tarinfo, status, type) as (item, status): + item.source = tarinfo.linkname + return status + + def process_file(self, *, tarinfo, status, type, tar): + with self.create_helper(tarinfo, status, type) as (item, status): + self.print_file_status(status, tarinfo.name) + status = None # we already printed the status + fd = tar.extractfile(tarinfo) + self.process_file_chunks(item, self.cache, self.stats, self.show_progress, + backup_io_iter(self.chunker.chunkify(fd))) + item.get_size(memorize=True) + self.stats.nfiles += 1 + return status + + def valid_msgpacked_dict(d, keys_serialized): """check if the data looks like a msgpacked dict""" d_len = len(d) diff --git a/src/borg/archiver.py b/src/borg/archiver.py index d52e0e5cf..d69e0e454 100644 --- a/src/borg/archiver.py +++ b/src/borg/archiver.py @@ -39,7 +39,7 @@ try: from .algorithms.checksums import crc32 from .archive import Archive, ArchiveChecker, ArchiveRecreater, Statistics, is_special from .archive import BackupError, BackupOSError, backup_io, OsOpen, stat_update_check - from .archive import FilesystemObjectProcessors, MetadataCollector, ChunksProcessor + from .archive import FilesystemObjectProcessors, TarfileObjectProcessors, MetadataCollector, ChunksProcessor from .archive import has_link from .cache import Cache, assert_secure, SecurityManager from .constants import * # NOQA @@ -68,13 +68,14 @@ try: from .helpers import basic_json_data, json_print from .helpers import replace_placeholders from .helpers import ChunkIteratorFileWrapper - from .helpers import popen_with_error_handling, prepare_subprocess_env + from .helpers import popen_with_error_handling, prepare_subprocess_env, create_filter_process from .helpers import dash_open from .helpers import umount from .helpers import flags_root, flags_dir, flags_special_follow, flags_special from .helpers import msgpack from .helpers import sig_int from .helpers import iter_separated + from .helpers import get_tar_filter from .nanorst import rst_to_terminal from .patterns import ArgparsePatternAction, ArgparseExcludeFileAction, ArgparsePatternFileAction, parse_exclude_pattern from .patterns import PatternMatcher @@ -951,67 +952,17 @@ class Archiver: # that it has to be installed -- hardly a problem, considering that # the decompressor must be installed as well to make use of the exported tarball! - filter = None - if args.tar_filter == 'auto': - # Note that filter remains None if tarfile is '-'. - if args.tarfile.endswith('.tar.gz'): - filter = 'gzip' - elif args.tarfile.endswith('.tar.bz2'): - filter = 'bzip2' - elif args.tarfile.endswith('.tar.xz'): - filter = 'xz' - logger.debug('Automatically determined tar filter: %s', filter) - else: - filter = args.tar_filter + filter = get_tar_filter(args.tarfile, decompress=False) if args.tar_filter == 'auto' else args.tar_filter tarstream = dash_open(args.tarfile, 'wb') tarstream_close = args.tarfile != '-' - if filter: - # When we put a filter between us and the final destination, - # the selected output (tarstream until now) becomes the output of the filter (=filterout). - # The decision whether to close that or not remains the same. - filterout = tarstream - filterout_close = tarstream_close - env = prepare_subprocess_env(system=True) - # There is no deadlock potential here (the subprocess docs warn about this), because - # communication with the process is a one-way road, i.e. the process can never block - # for us to do something while we block on the process for something different. - filterproc = popen_with_error_handling(filter, stdin=subprocess.PIPE, stdout=filterout, - log_prefix='--tar-filter: ', env=env) - if not filterproc: - return EXIT_ERROR - # Always close the pipe, otherwise the filter process would not notice when we are done. - tarstream = filterproc.stdin - tarstream_close = True - - # The | (pipe) symbol instructs tarfile to use a streaming mode of operation - # where it never seeks on the passed fileobj. - tar = tarfile.open(fileobj=tarstream, mode='w|', format=tarfile.GNU_FORMAT) - - self._export_tar(args, archive, tar) - - # This does not close the fileobj (tarstream) we passed to it -- a side effect of the | mode. - tar.close() - - if tarstream_close: - tarstream.close() - - if filter: - logger.debug('Done creating tar, waiting for filter to die...') - rc = filterproc.wait() - if rc: - logger.error('--tar-filter exited with code %d, output file is likely unusable!', rc) - self.exit_code = EXIT_ERROR - else: - logger.debug('filter exited with code %d', rc) - - if filterout_close: - filterout.close() + with create_filter_process(filter, stream=tarstream, stream_close=tarstream_close, inbound=False) as _stream: + self._export_tar(args, archive, _stream) return self.exit_code - def _export_tar(self, args, archive, tar): + def _export_tar(self, args, archive, tarstream): matcher = self.build_matcher(args.patterns, args.paths) progress = args.progress @@ -1027,6 +978,10 @@ class Archiver: filter = self.build_filter(matcher, peek_and_store_hardlink_masters, strip_components) + # The | (pipe) symbol instructs tarfile to use a streaming mode of operation + # where it never seeks on the passed fileobj. + tar = tarfile.open(fileobj=tarstream, mode='w|', format=tarfile.GNU_FORMAT) + if progress: pi = ProgressIndicatorPercent(msg='%5.1f%% Processing: %s', step=0.1, msgid='extract') pi.output('Calculating size') @@ -1138,6 +1093,9 @@ class Archiver: if pi: pi.finish() + # This does not close the fileobj (tarstream) we passed to it -- a side effect of the | mode. + tar.close() + for pattern in matcher.get_unmatched_include_patterns(): self.print_warning("Include pattern '%s' never matched.", pattern) return self.exit_code @@ -1700,6 +1658,90 @@ class Archiver: cache.commit() return self.exit_code + @with_repository(cache=True, exclusive=True, compatibility=(Manifest.Operation.WRITE,)) + def do_import_tar(self, args, repository, manifest, key, cache): + self.output_filter = args.output_filter + self.output_list = args.output_list + + filter = get_tar_filter(args.tarfile, decompress=True) if args.tar_filter == 'auto' else args.tar_filter + + tarstream = dash_open(args.tarfile, 'rb') + tarstream_close = args.tarfile != '-' + + with create_filter_process(filter, stream=tarstream, stream_close=tarstream_close, inbound=True) as _stream: + self._import_tar(args, repository, manifest, key, cache, _stream) + + return self.exit_code + + def _import_tar(self, args, repository, manifest, key, cache, tarstream): + t0 = datetime.utcnow() + t0_monotonic = time.monotonic() + + archive = Archive(repository, key, manifest, args.location.archive, cache=cache, + create=True, checkpoint_interval=args.checkpoint_interval, + progress=args.progress, + chunker_params=args.chunker_params, start=t0, start_monotonic=t0_monotonic, + log_json=args.log_json) + cp = ChunksProcessor(cache=cache, key=key, + add_item=archive.add_item, write_checkpoint=archive.write_checkpoint, + checkpoint_interval=args.checkpoint_interval, rechunkify=False) + tfo = TarfileObjectProcessors(cache=cache, key=key, + process_file_chunks=cp.process_file_chunks, add_item=archive.add_item, + chunker_params=args.chunker_params, show_progress=args.progress, + log_json=args.log_json, iec=args.iec, + file_status_printer=self.print_file_status) + + tar = tarfile.open(fileobj=tarstream, mode='r|') + + while True: + tarinfo = tar.next() + if not tarinfo: + break + if tarinfo.isreg(): + status = tfo.process_file(tarinfo=tarinfo, status='A', type=stat.S_IFREG, tar=tar) + archive.stats.nfiles += 1 + elif tarinfo.isdir(): + status = tfo.process_dir(tarinfo=tarinfo, status='d', type=stat.S_IFDIR) + elif tarinfo.issym(): + status = tfo.process_link(tarinfo=tarinfo, status='s', type=stat.S_IFLNK) + elif tarinfo.islnk(): + # tar uses the same hardlink model as borg (rather vice versa); the first instance of a hardlink + # is stored as a regular file, later instances are special entries referencing back to the + # first instance. + status = tfo.process_link(tarinfo=tarinfo, status='h', type=stat.S_IFREG) + elif tarinfo.isblk(): + status = tfo.process_dev(tarinfo=tarinfo, status='b', type=stat.S_IFBLK) + elif tarinfo.ischr(): + status = tfo.process_dev(tarinfo=tarinfo, status='c', type=stat.S_IFCHR) + elif tarinfo.isfifo(): + status = tfo.process_fifo(tarinfo=tarinfo, status='f', type=stat.S_IFIFO) + else: + status = 'E' + self.print_warning('%s: Unsupported tarinfo type %s', tarinfo.name, tarinfo.type) + self.print_file_status(status, tarinfo.name) + + # This does not close the fileobj (tarstream) we passed to it -- a side effect of the | mode. + tar.close() + + if args.progress: + archive.stats.show_progress(final=True) + archive.stats += tfo.stats + archive.save(comment=args.comment, timestamp=args.timestamp) + args.stats |= args.json + if args.stats: + if args.json: + json_print(basic_json_data(archive.manifest, cache=archive.cache, extra={ + 'archive': archive, + })) + else: + log_multi(DASHES, + str(archive), + DASHES, + STATS_HEADER, + str(archive.stats), + str(archive.cache), + DASHES, logger=logging.getLogger('borg.output.stats')) + @with_repository(manifest=False, exclusive=True) def do_with_lock(self, args, repository): """run a user specified command with the repository lock held""" @@ -3749,11 +3791,13 @@ class Archiver: based on its file extension and pipe the tarball through an appropriate filter before writing it to FILE: - - .tar.gz: gzip - - .tar.bz2: bzip2 - - .tar.xz: xz + - .tar.gz or .tgz: gzip + - .tar.bz2 or .tbz: bzip2 + - .tar.xz or .txz: xz + - .tar.zstd: zstd + - .tar.lz4: lz4 - Alternatively a ``--tar-filter`` program may be explicitly specified. It should + Alternatively, a ``--tar-filter`` program may be explicitly specified. It should read the uncompressed tar stream from stdin and write a compressed/filtered tar stream to stdout. @@ -4612,6 +4656,83 @@ class Archiver: subparser.add_argument('args', metavar='ARGS', nargs=argparse.REMAINDER, help='command arguments') + # borg import-tar + import_tar_epilog = process_epilog(""" + This command creates a backup archive from a tarball. + + When giving '-' as path, Borg will read a tar stream from standard input. + + By default (--tar-filter=auto) Borg will detect whether the file is compressed + based on its file extension and pipe the file through an appropriate filter: + + - .tar.gz or .tgz: gzip -d + - .tar.bz2 or .tbz: bzip2 -d + - .tar.xz or .txz: xz -d + - .tar.zstd: zstd -d + - .tar.lz4: lz4 -d + + Alternatively, a --tar-filter program may be explicitly specified. It should + read compressed data from stdin and output an uncompressed tar stream on + stdout. + + Most documentation of borg create applies. Note that this command does not + support excluding files. + + import-tar is a lossy conversion: + BSD flags, ACLs, extended attributes (xattrs), atime and ctime are not exported. + Timestamp resolution is limited to whole seconds, not the nanosecond resolution + otherwise supported by Borg. + + A ``--sparse`` option (as found in borg create) is not supported. + + import-tar reads POSIX.1-1988 (ustar), POSIX.1-2001 (pax), GNU tar, UNIX V7 tar + and SunOS tar with extended attributes. + """) + subparser = subparsers.add_parser('import-tar', parents=[common_parser], add_help=False, + description=self.do_import_tar.__doc__, + epilog=import_tar_epilog, + formatter_class=argparse.RawDescriptionHelpFormatter, + help=self.do_import_tar.__doc__) + subparser.set_defaults(func=self.do_import_tar) + subparser.add_argument('--tar-filter', dest='tar_filter', default='auto', + help='filter program to pipe data through') + subparser.add_argument('-s', '--stats', dest='stats', + action='store_true', default=False, + help='print statistics for the created archive') + subparser.add_argument('--list', dest='output_list', + action='store_true', default=False, + help='output verbose list of items (files, dirs, ...)') + subparser.add_argument('--filter', dest='output_filter', metavar='STATUSCHARS', + help='only display items with the given status characters') + subparser.add_argument('--json', action='store_true', + help='output stats as JSON (implies --stats)') + + archive_group = subparser.add_argument_group('Archive options') + archive_group.add_argument('--comment', dest='comment', metavar='COMMENT', default='', + help='add a comment text to the archive') + archive_group.add_argument('--timestamp', dest='timestamp', + type=timestamp, default=None, + metavar='TIMESTAMP', + help='manually specify the archive creation date/time (UTC, yyyy-mm-ddThh:mm:ss format). ' + 'alternatively, give a reference file/directory.') + archive_group.add_argument('-c', '--checkpoint-interval', dest='checkpoint_interval', + type=int, default=1800, metavar='SECONDS', + help='write checkpoint every SECONDS seconds (Default: 1800)') + archive_group.add_argument('--chunker-params', dest='chunker_params', + type=ChunkerParams, default=CHUNKER_PARAMS, + metavar='PARAMS', + help='specify the chunker parameters (ALGO, CHUNK_MIN_EXP, CHUNK_MAX_EXP, ' + 'HASH_MASK_BITS, HASH_WINDOW_SIZE). default: %s,%d,%d,%d,%d' % CHUNKER_PARAMS) + archive_group.add_argument('-C', '--compression', metavar='COMPRESSION', dest='compression', + type=CompressionSpec, default=CompressionSpec('lz4'), + help='select compression algorithm, see the output of the ' + '"borg help compression" command for details.') + + subparser.add_argument('location', metavar='ARCHIVE', + type=location_validator(archive=True), + help='name of archive to create (must be also a valid directory name)') + subparser.add_argument('tarfile', metavar='TARFILE', + help='input tar file. "-" to read from stdin instead.') return parser def get_args(self, argv, cmd): diff --git a/src/borg/helpers/misc.py b/src/borg/helpers/misc.py index ea8655ab5..5ec6fca6e 100644 --- a/src/borg/helpers/misc.py +++ b/src/borg/helpers/misc.py @@ -238,3 +238,21 @@ def iter_separated(fd, sep=None, read_size=4096): # or if there was no data before EOF if len(part) > 0: yield part + + +def get_tar_filter(fname, decompress): + # Note that filter is None if fname is '-'. + if fname.endswith(('.tar.gz', '.tgz')): + filter = 'gzip -d' if decompress else 'gzip' + elif fname.endswith(('.tar.bz2', '.tbz')): + filter = 'bzip2 -d' if decompress else 'bzip2' + elif fname.endswith(('.tar.xz', '.txz')): + filter = 'xz -d' if decompress else 'xz' + elif fname.endswith(('.tar.lz4', )): + filter = 'lz4 -d' if decompress else 'lz4' + elif fname.endswith(('.tar.zstd', )): + filter = 'zstd -d' if decompress else 'zstd' + else: + filter = None + logger.debug('Automatically determined tar filter: %s', filter) + return filter diff --git a/src/borg/helpers/process.py b/src/borg/helpers/process.py index 2e61ad0a7..8847c09af 100644 --- a/src/borg/helpers/process.py +++ b/src/borg/helpers/process.py @@ -15,7 +15,7 @@ from ..platformflags import is_win32, is_linux, is_freebsd, is_darwin from ..logger import create_logger logger = create_logger() -from ..helpers import EXIT_SUCCESS, EXIT_WARNING, EXIT_SIGNAL_BASE +from ..helpers import EXIT_SUCCESS, EXIT_WARNING, EXIT_SIGNAL_BASE, Error @contextlib.contextmanager @@ -300,3 +300,45 @@ def prepare_subprocess_env(system, env=None): # for information, give borg version to the subprocess env['BORG_VERSION'] = __version__ return env + + +@contextlib.contextmanager +def create_filter_process(cmd, stream, stream_close, inbound=True): + if cmd: + # put a filter process between stream and us (e.g. a [de]compression command) + # inbound: --> filter --> us + # outbound: us --> filter --> + filter_stream = stream + filter_stream_close = stream_close + env = prepare_subprocess_env(system=True) + # There is no deadlock potential here (the subprocess docs warn about this), because + # communication with the process is a one-way road, i.e. the process can never block + # for us to do something while we block on the process for something different. + if inbound: + proc = popen_with_error_handling(cmd, stdout=subprocess.PIPE, stdin=filter_stream, + log_prefix='filter-process: ', env=env) + else: + proc = popen_with_error_handling(cmd, stdin=subprocess.PIPE, stdout=filter_stream, + log_prefix='filter-process: ', env=env) + if not proc: + raise Error('filter %s: process creation failed' % (cmd, )) + stream = proc.stdout if inbound else proc.stdin + # inbound: do not close the pipe (this is the task of the filter process [== writer]) + # outbound: close the pipe, otherwise the filter process would not notice when we are done. + stream_close = not inbound + + try: + yield stream + + finally: + if stream_close: + stream.close() + + if cmd: + logger.debug('Done, waiting for filter to die...') + rc = proc.wait() + logger.debug('filter cmd exited with code %d', rc) + if filter_stream_close: + filter_stream.close() + if rc: + raise Error('filter %s failed, rc=%d' % (cmd, rc)) diff --git a/src/borg/testsuite/archiver.py b/src/borg/testsuite/archiver.py index c0d4d06dc..409ca2cf7 100644 --- a/src/borg/testsuite/archiver.py +++ b/src/borg/testsuite/archiver.py @@ -3413,6 +3413,31 @@ id: 2 / e29442 3506da 4e1ea7 / 25f62a 5a3d41 - 02 assert os.stat('input/dir1/aaaa').st_nlink == 2 assert os.stat('input/dir1/source2').st_nlink == 2 + def test_import_tar(self): + self.create_test_files() + os.unlink('input/flagfile') + self.cmd('init', '--encryption=none', self.repository_location) + self.cmd('create', self.repository_location + '::src', 'input') + self.cmd('export-tar', self.repository_location + '::src', 'simple.tar') + self.cmd('import-tar', self.repository_location + '::dst', 'simple.tar') + with changedir(self.output_path): + self.cmd('extract', self.repository_location + '::dst') + self.assert_dirs_equal('input', 'output/input', ignore_ns=True, ignore_xattrs=True) + + @requires_gzip + def test_import_tar_gz(self): + if not shutil.which('gzip'): + pytest.skip('gzip is not installed') + self.create_test_files() + os.unlink('input/flagfile') + self.cmd('init', '--encryption=none', self.repository_location) + self.cmd('create', self.repository_location + '::src', 'input') + self.cmd('export-tar', self.repository_location + '::src', 'simple.tgz') + self.cmd('import-tar', self.repository_location + '::dst', 'simple.tgz') + with changedir(self.output_path): + self.cmd('extract', self.repository_location + '::dst') + self.assert_dirs_equal('input', 'output/input', ignore_ns=True, ignore_xattrs=True) + def test_detect_attic_repo(self): path = make_attic_repo(self.repository_path) cmds = [