mirror of
https://github.com/borgbackup/borg.git
synced 2025-02-20 21:27:32 +00:00
Merge pull request #2523 from enkore/f/tarexport-tests
expor-tar: more tests
This commit is contained in:
commit
3db27f950e
4 changed files with 115 additions and 10 deletions
|
@ -63,6 +63,7 @@
|
|||
from .helpers import basic_json_data, json_print
|
||||
from .helpers import replace_placeholders
|
||||
from .helpers import ChunkIteratorFileWrapper
|
||||
from .helpers import popen_with_error_handling
|
||||
from .patterns import ArgparsePatternAction, ArgparseExcludeFileAction, ArgparsePatternFileAction, parse_exclude_pattern
|
||||
from .patterns import PatternMatcher
|
||||
from .item import Item
|
||||
|
@ -747,9 +748,9 @@ def do_export_tar(self, args, repository, manifest, key, archive):
|
|||
# There is no deadlock potential here (the subprocess docs warn about this), because
|
||||
# communication with the process is a one-way road, i.e. the process can never block
|
||||
# for us to do something while we block on the process for something different.
|
||||
filtercmd = shlex.split(filter)
|
||||
logger.debug('--tar-filter command line: %s', filtercmd)
|
||||
filterproc = subprocess.Popen(filtercmd, stdin=subprocess.PIPE, stdout=filterout)
|
||||
filterproc = popen_with_error_handling(filter, stdin=subprocess.PIPE, stdout=filterout, log_prefix='--tar-filter: ')
|
||||
if not filterproc:
|
||||
return EXIT_ERROR
|
||||
# Always close the pipe, otherwise the filter process would not notice when we are done.
|
||||
tarstream = filterproc.stdin
|
||||
tarstream_close = True
|
||||
|
@ -771,7 +772,7 @@ def do_export_tar(self, args, repository, manifest, key, archive):
|
|||
rc = filterproc.wait()
|
||||
if rc:
|
||||
logger.error('--tar-filter exited with code %d, output file is likely unusable!', rc)
|
||||
self.exit_code = set_ec(EXIT_ERROR)
|
||||
self.exit_code = EXIT_ERROR
|
||||
else:
|
||||
logger.debug('filter exited with code %d', rc)
|
||||
|
||||
|
|
|
@ -11,9 +11,11 @@
|
|||
import platform
|
||||
import pwd
|
||||
import re
|
||||
import shlex
|
||||
import signal
|
||||
import socket
|
||||
import stat
|
||||
import subprocess
|
||||
import sys
|
||||
import textwrap
|
||||
import threading
|
||||
|
@ -1962,3 +1964,34 @@ def secure_erase(path):
|
|||
fd.flush()
|
||||
os.fsync(fd.fileno())
|
||||
os.unlink(path)
|
||||
|
||||
|
||||
def popen_with_error_handling(cmd_line: str, log_prefix='', **kwargs):
|
||||
"""
|
||||
Handle typical errors raised by subprocess.Popen. Return None if an error occurred,
|
||||
otherwise return the Popen object.
|
||||
|
||||
*cmd_line* is split using shlex (e.g. 'gzip -9' => ['gzip', '-9']).
|
||||
|
||||
Log messages will be prefixed with *log_prefix*; if set, it should end with a space
|
||||
(e.g. log_prefix='--some-option: ').
|
||||
|
||||
Does not change the exit code.
|
||||
"""
|
||||
assert not kwargs.get('shell'), 'Sorry pal, shell mode is a no-no'
|
||||
try:
|
||||
command = shlex.split(cmd_line)
|
||||
if not command:
|
||||
raise ValueError('an empty command line is not permitted')
|
||||
except ValueError as ve:
|
||||
logger.error('%s%s', log_prefix, ve)
|
||||
return
|
||||
logger.debug('%scommand line: %s', log_prefix, command)
|
||||
try:
|
||||
return subprocess.Popen(command, **kwargs)
|
||||
except FileNotFoundError:
|
||||
logger.error('%sexecutable not found: %s', log_prefix, command[0])
|
||||
return
|
||||
except PermissionError:
|
||||
logger.error('%spermission denied: %s', log_prefix, command[0])
|
||||
return
|
||||
|
|
|
@ -728,7 +728,9 @@ def _extract_hardlinks_setup(self):
|
|||
self.cmd('init', '--encryption=repokey', self.repository_location)
|
||||
self.cmd('create', self.repository_location + '::test', 'input')
|
||||
|
||||
@pytest.mark.skipif(not are_hardlinks_supported(), reason='hardlinks not supported')
|
||||
requires_hardlinks = pytest.mark.skipif(not are_hardlinks_supported(), reason='hardlinks not supported')
|
||||
|
||||
@requires_hardlinks
|
||||
def test_strip_components_links(self):
|
||||
self._extract_hardlinks_setup()
|
||||
with changedir('output'):
|
||||
|
@ -741,7 +743,7 @@ def test_strip_components_links(self):
|
|||
self.cmd('extract', self.repository_location + '::test')
|
||||
assert os.stat('input/dir1/hardlink').st_nlink == 4
|
||||
|
||||
@pytest.mark.skipif(not are_hardlinks_supported(), reason='hardlinks not supported')
|
||||
@requires_hardlinks
|
||||
def test_extract_hardlinks(self):
|
||||
self._extract_hardlinks_setup()
|
||||
with changedir('output'):
|
||||
|
@ -2386,10 +2388,10 @@ def test_export_tar(self):
|
|||
os.unlink('input/flagfile')
|
||||
self.cmd('init', '--encryption=repokey', self.repository_location)
|
||||
self.cmd('create', self.repository_location + '::test', 'input')
|
||||
self.cmd('export-tar', self.repository_location + '::test', 'simple.tar')
|
||||
self.cmd('export-tar', self.repository_location + '::test', 'simple.tar', '--progress')
|
||||
with changedir('output'):
|
||||
# This probably assumes GNU tar. Note -p switch to extract permissions regardless of umask.
|
||||
subprocess.check_output(['tar', 'xpf', '../simple.tar'])
|
||||
subprocess.check_call(['tar', 'xpf', '../simple.tar'])
|
||||
self.assert_dirs_equal('input', 'output/input', ignore_bsdflags=True, ignore_xattrs=True, ignore_ns=True)
|
||||
|
||||
@requires_gnutar
|
||||
|
@ -2401,11 +2403,53 @@ def test_export_tar_gz(self):
|
|||
os.unlink('input/flagfile')
|
||||
self.cmd('init', '--encryption=repokey', self.repository_location)
|
||||
self.cmd('create', self.repository_location + '::test', 'input')
|
||||
self.cmd('export-tar', self.repository_location + '::test', 'simple.tar.gz')
|
||||
list = self.cmd('export-tar', self.repository_location + '::test', 'simple.tar.gz', '--list')
|
||||
assert 'input/file1\n' in list
|
||||
assert 'input/dir2\n' in list
|
||||
with changedir('output'):
|
||||
subprocess.check_output(['tar', 'xpf', '../simple.tar.gz'])
|
||||
subprocess.check_call(['tar', 'xpf', '../simple.tar.gz'])
|
||||
self.assert_dirs_equal('input', 'output/input', ignore_bsdflags=True, ignore_xattrs=True, ignore_ns=True)
|
||||
|
||||
@requires_gnutar
|
||||
def test_export_tar_strip_components(self):
|
||||
if not shutil.which('gzip'):
|
||||
pytest.skip('gzip is not installed')
|
||||
self.create_test_files()
|
||||
os.unlink('input/flagfile')
|
||||
self.cmd('init', '--encryption=repokey', self.repository_location)
|
||||
self.cmd('create', self.repository_location + '::test', 'input')
|
||||
list = self.cmd('export-tar', self.repository_location + '::test', 'simple.tar', '--strip-components=1', '--list')
|
||||
# --list's path are those before processing with --strip-components
|
||||
assert 'input/file1\n' in list
|
||||
assert 'input/dir2\n' in list
|
||||
with changedir('output'):
|
||||
subprocess.check_call(['tar', 'xpf', '../simple.tar'])
|
||||
self.assert_dirs_equal('input', 'output/', ignore_bsdflags=True, ignore_xattrs=True, ignore_ns=True)
|
||||
|
||||
@requires_hardlinks
|
||||
@requires_gnutar
|
||||
def test_export_tar_strip_components_links(self):
|
||||
self._extract_hardlinks_setup()
|
||||
self.cmd('export-tar', self.repository_location + '::test', 'output.tar', '--strip-components=2')
|
||||
with changedir('output'):
|
||||
subprocess.check_call(['tar', 'xpf', '../output.tar'])
|
||||
assert os.stat('hardlink').st_nlink == 2
|
||||
assert os.stat('subdir/hardlink').st_nlink == 2
|
||||
assert os.stat('aaaa').st_nlink == 2
|
||||
assert os.stat('source2').st_nlink == 2
|
||||
|
||||
@requires_hardlinks
|
||||
@requires_gnutar
|
||||
def test_extract_hardlinks(self):
|
||||
self._extract_hardlinks_setup()
|
||||
self.cmd('export-tar', self.repository_location + '::test', 'output.tar', 'input/dir1')
|
||||
with changedir('output'):
|
||||
subprocess.check_call(['tar', 'xpf', '../output.tar'])
|
||||
assert os.stat('input/dir1/hardlink').st_nlink == 2
|
||||
assert os.stat('input/dir1/subdir/hardlink').st_nlink == 2
|
||||
assert os.stat('input/dir1/aaaa').st_nlink == 2
|
||||
assert os.stat('input/dir1/source2').st_nlink == 2
|
||||
|
||||
|
||||
@unittest.skipUnless('binary' in BORG_EXES, 'no borg.exe available')
|
||||
class ArchiverTestCaseBinary(ArchiverTestCase):
|
||||
|
|
|
@ -2,6 +2,7 @@
|
|||
import hashlib
|
||||
import io
|
||||
import os
|
||||
import shutil
|
||||
import sys
|
||||
from datetime import datetime, timezone, timedelta
|
||||
from time import mktime, strptime, sleep
|
||||
|
@ -26,6 +27,7 @@
|
|||
from ..helpers import swidth_slice
|
||||
from ..helpers import chunkit
|
||||
from ..helpers import safe_ns, safe_s, SUPPORT_32BIT_PLATFORMS
|
||||
from ..helpers import popen_with_error_handling
|
||||
|
||||
from . import BaseTestCase, FakeInputs
|
||||
|
||||
|
@ -816,3 +818,28 @@ def test_safe_timestamps():
|
|||
datetime.utcfromtimestamp(beyond_y10k)
|
||||
assert datetime.utcfromtimestamp(safe_s(beyond_y10k)) > datetime(2262, 1, 1)
|
||||
assert datetime.utcfromtimestamp(safe_ns(beyond_y10k) / 1000000000) > datetime(2262, 1, 1)
|
||||
|
||||
|
||||
class TestPopenWithErrorHandling:
|
||||
@pytest.mark.skipif(not shutil.which('test'), reason='"test" binary is needed')
|
||||
def test_simple(self):
|
||||
proc = popen_with_error_handling('test 1')
|
||||
assert proc.wait() == 0
|
||||
|
||||
@pytest.mark.skipif(shutil.which('borg-foobar-test-notexist'), reason='"borg-foobar-test-notexist" binary exists (somehow?)')
|
||||
def test_not_found(self):
|
||||
proc = popen_with_error_handling('borg-foobar-test-notexist 1234')
|
||||
assert proc is None
|
||||
|
||||
@pytest.mark.parametrize('cmd', (
|
||||
'mismatched "quote',
|
||||
'foo --bar="baz',
|
||||
''
|
||||
))
|
||||
def test_bad_syntax(self, cmd):
|
||||
proc = popen_with_error_handling(cmd)
|
||||
assert proc is None
|
||||
|
||||
def test_shell(self):
|
||||
with pytest.raises(AssertionError):
|
||||
popen_with_error_handling('', shell=True)
|
||||
|
|
Loading…
Reference in a new issue