archiver: add test for "create -" and "extract --stdout"

This commit is contained in:
Marian Beermann 2017-06-27 16:06:01 +02:00
parent 4af1142693
commit f037e2b64f
1 changed files with 26 additions and 6 deletions

View File

@ -19,7 +19,7 @@ from configparser import ConfigParser
from datetime import datetime
from datetime import timedelta
from hashlib import sha256
from io import StringIO
from io import BytesIO, StringIO
from unittest.mock import patch
import msgpack
@ -61,7 +61,7 @@ from . import key
src_dir = os.path.abspath(os.path.join(os.path.dirname(__file__), '..'))
def exec_cmd(*args, archiver=None, fork=False, exe=None, input=b'', **kw):
def exec_cmd(*args, archiver=None, fork=False, exe=None, input=b'', binary_output=False, **kw):
if fork:
try:
if exe is None:
@ -78,12 +78,18 @@ def exec_cmd(*args, archiver=None, fork=False, exe=None, input=b'', **kw):
except SystemExit as e: # possibly raised by argparse
output = ''
ret = e.code
return ret, os.fsdecode(output)
if binary_output:
return ret, output
else:
return ret, os.fsdecode(output)
else:
stdin, stdout, stderr = sys.stdin, sys.stdout, sys.stderr
try:
sys.stdin = StringIO(input.decode())
sys.stdout = sys.stderr = output = StringIO()
sys.stdin.buffer = BytesIO(input)
output = BytesIO()
# Always use utf-8 here, to simply .decode() below
output_text = sys.stdout = sys.stderr = io.TextIOWrapper(output, encoding='utf-8')
if archiver is None:
archiver = Archiver()
archiver.prerun_checks = lambda *args: None
@ -95,9 +101,11 @@ def exec_cmd(*args, archiver=None, fork=False, exe=None, input=b'', **kw):
# actions that abort early (eg. --help) where given. Catch this and return
# the error code as-if we invoked a Borg binary.
except SystemExit as e:
return e.code, output.getvalue()
output_text.flush()
return e.code, output.getvalue() if binary_output else output.getvalue().decode()
ret = archiver.run(args)
return ret, output.getvalue()
output_text.flush()
return ret, output.getvalue() if binary_output else output.getvalue().decode()
finally:
sys.stdin, sys.stdout, sys.stderr = stdin, stdout, stderr
@ -927,6 +935,18 @@ class ArchiverTestCase(ArchiverTestCaseBase):
os.mkdir('input/cache3')
os.link('input/cache1/%s' % CACHE_TAG_NAME, 'input/cache3/%s' % CACHE_TAG_NAME)
def test_create_stdin(self):
self.cmd('init', '--encryption=repokey', self.repository_location)
input_data = b'\x00foo\n\nbar\n \n'
self.cmd('create', self.repository_location + '::test', '-', input=input_data)
item = json.loads(self.cmd('list', '--json-lines', self.repository_location + '::test'))
assert item['uid'] == 0
assert item['gid'] == 0
assert item['size'] == len(input_data)
assert item['path'] == 'stdin'
extracted_data = self.cmd('extract', '--stdout', self.repository_location + '::test', binary_output=True)
assert extracted_data == input_data
def test_create_without_root(self):
"""test create without a root"""
self.cmd('init', '--encryption=repokey', self.repository_location)