"""End-to-end tests that drive the ``attic`` command line interface.

Each test runs against a fresh temporary directory that holds the
repository, the input tree, the extraction target, the key directory and
the cache directory.
"""
import os
from io import StringIO
import stat
import subprocess
import sys
import shutil
import tempfile
import time
import unittest
from hashlib import sha256
from attic import xattr
from attic.archive import Archive, ChunkBuffer
from attic.archiver import Archiver
from attic.crypto import bytes_to_long, num_aes_blocks
from attic.helpers import Manifest
from attic.remote import RemoteRepository, PathNotAllowed
from attic.repository import Repository
from attic.testsuite import AtticTestCase
from attic.testsuite.mock import patch

# llfuse is optional; the FUSE mount tests are skipped when it is missing.
try:
    import llfuse
    has_llfuse = True
except ImportError:
    has_llfuse = False

# os.lchflags only exists on some platforms.
has_lchflags = hasattr(os, 'lchflags')

# Parent directory of the directory containing this file; used as the
# payload for the "src" archives created by create_src_archive().
src_dir = os.path.join(os.getcwd(), os.path.dirname(__file__), '..')


class changedir:
    """Context manager that switches the working directory to ``dir`` and
    restores the previous one on exit, even if the body raised.
    """

    def __init__(self, dir):
        self.dir = dir

    def __enter__(self):
        self.old = os.getcwd()
        os.chdir(self.dir)

    def __exit__(self, *args, **kw):
        os.chdir(self.old)


class ArchiverTestCaseBase(AtticTestCase):
    """Common fixture for the archiver tests.

    ``prefix`` is prepended to the repository path to build the location
    passed on the command line; subclasses override it to select another
    repository transport.
    """

    prefix = ''

    def setUp(self):
        """Create the per-test directory layout and chdir into it."""
        os.environ['ATTIC_CHECK_I_KNOW_WHAT_I_AM_DOING'] = '1'
        self.archiver = Archiver()
        self.tmpdir = tempfile.mkdtemp()
        self.repository_path = os.path.join(self.tmpdir, 'repository')
        self.repository_location = self.prefix + self.repository_path
        self.input_path = os.path.join(self.tmpdir, 'input')
        self.output_path = os.path.join(self.tmpdir, 'output')
        self.keys_path = os.path.join(self.tmpdir, 'keys')
        self.cache_path = os.path.join(self.tmpdir, 'cache')
        self.exclude_file_path = os.path.join(self.tmpdir, 'excludes')
        os.environ['ATTIC_KEYS_DIR'] = self.keys_path
        os.environ['ATTIC_CACHE_DIR'] = self.cache_path
        os.mkdir(self.input_path)
        os.mkdir(self.output_path)
        os.mkdir(self.keys_path)
        os.mkdir(self.cache_path)
        # Exclude file with one pattern, one comment line and a blank line.
        with open(self.exclude_file_path, 'wb') as fd:
            fd.write(b'input/file2\n# A commment line, then a blank line\n\n')
        self._old_wd = os.getcwd()
        os.chdir(self.tmpdir)

    def tearDown(self):
        """Restore the working directory and remove the temporary tree."""
        # Leave the temporary directory *before* deleting it: removing the
        # current working directory is not possible on every platform, and
        # if rmtree fails the original working directory must still have
        # been restored so that subsequent tests are not affected.
        os.chdir(self._old_wd)
        shutil.rmtree(self.tmpdir)

    def attic(self, *args, **kw):
        """Run an attic command and return its captured output.

        :param args: command line arguments passed to the archiver
        :param exit_code: (keyword) expected exit code, defaults to 0
        :param fork: (keyword) when true, run ``python -m attic.archiver``
            in a subprocess instead of calling the archiver in-process
        :returns: the captured output as ``str`` (stdout only when forked,
            stdout and stderr combined when run in-process)

        Fails the test if the actual exit code differs from ``exit_code``;
        in that case the captured output is printed first.
        """
        exit_code = kw.get('exit_code', 0)
        fork = kw.get('fork', False)
        if fork:
            try:
                output = subprocess.check_output((sys.executable, '-m', 'attic.archiver') + args)
                ret = 0
            except subprocess.CalledProcessError as e:
                output = e.output
                ret = e.returncode
            output = os.fsdecode(output)
            if ret != exit_code:
                print(output)
            self.assert_equal(exit_code, ret)
            return output
        args = list(args)
        stdout, stderr = sys.stdout, sys.stderr
        try:
            output = StringIO()
            sys.stdout = sys.stderr = output
            ret = self.archiver.run(args)
            # Restore the real streams before printing so that diagnostics
            # are not swallowed by the capture buffer.
            sys.stdout, sys.stderr = stdout, stderr
            if ret != exit_code:
                print(output.getvalue())
            self.assert_equal(exit_code, ret)
            return output.getvalue()
        finally:
            sys.stdout, sys.stderr = stdout, stderr

    def create_src_archive(self, name):
        """Create archive ``name`` in the test repository from ``src_dir``."""
        self.attic('create', self.repository_location + '::' + name, src_dir)


class ArchiverTestCase(ArchiverTestCaseBase):
    """Functional tests for create/extract/list/delete/prune/mount."""

    def create_regual_file(self, name, size=0):
        """Create ``name`` below the input directory filled with ``size``
        ``X`` bytes, creating missing parent directories.
        """
        filename = os.path.join(self.input_path, name)
        if not os.path.exists(os.path.dirname(filename)):
            os.makedirs(os.path.dirname(filename))
        with open(filename, 'wb') as fd:
            fd.write(b'X' * size)

    def create_test_files(self):
        """Create a minimal test case including all supported file types
        """
        # File
        self.create_regual_file('empty', size=0)
        self.create_regual_file('file1', size=1024 * 80)
        # Directory
        self.create_regual_file('dir2/file2', size=1024 * 80)
        # File owner
        os.chown('input/file1', 100, 200)
        # File mode
        os.chmod('input/file1', 0o7755)
        os.chmod('input/dir2', 0o555)
        # Block device
        os.mknod('input/bdev', 0o600 | stat.S_IFBLK, os.makedev(10, 20))
        # Char device
        os.mknod('input/cdev', 0o600 | stat.S_IFCHR, os.makedev(30, 40))
        # Hard link
        os.link(os.path.join(self.input_path, 'file1'),
                os.path.join(self.input_path, 'hardlink'))
        # Symlink
        os.symlink('somewhere', os.path.join(self.input_path, 'link1'))
        if xattr.is_enabled():
            xattr.setxattr(os.path.join(self.input_path, 'file1'), 'user.foo', b'bar')
            xattr.setxattr(os.path.join(self.input_path, 'link1'), 'user.foo_symlink', b'bar_symlink', follow_symlinks=False)
        # FIFO node
        os.mkfifo(os.path.join(self.input_path, 'fifo1'))
        if has_lchflags:
            os.lchflags(os.path.join(self.input_path, 'file1'), stat.UF_NODUMP)

    # Round trip: create two archives, extract one and compare with input.
    def test_basic_functionality(self):
        self.create_test_files()
        self.attic('init', self.repository_location)
        self.attic('create', self.repository_location + '::test', 'input')
        self.attic('create', self.repository_location + '::test.2', 'input')
        with changedir('output'):
            self.attic('extract', self.repository_location + '::test')
        self.assert_equal(len(self.attic('list', self.repository_location).splitlines()), 2)
        self.assert_equal(len(self.attic('list', self.repository_location + '::test').splitlines()), 10)
        self.assert_dirs_equal('input', 'output/input')
        info_output = self.attic('info', self.repository_location + '::test')
        shutil.rmtree(self.cache_path)
        info_output2 = self.attic('info', self.repository_location + '::test')
        # info_output2 starts with some "initializing cache" text but should
        # end the same way as info_output
        assert info_output2.endswith(info_output)

    # Path arguments, --exclude and --exclude-from restrict what is extracted.
    def test_extract_include_exclude(self):
        self.attic('init', self.repository_location)
        self.create_regual_file('file1', size=1024 * 80)
        self.create_regual_file('file2', size=1024 * 80)
        self.create_regual_file('file3', size=1024 * 80)
        self.create_regual_file('file4', size=1024 * 80)
        self.attic('create', '--exclude=input/file4', self.repository_location + '::test', 'input')
        with changedir('output'):
            self.attic('extract', self.repository_location + '::test', 'input/file1', )
        self.assert_equal(sorted(os.listdir('output/input')), ['file1'])
        with changedir('output'):
            self.attic('extract', '--exclude=input/file2', self.repository_location + '::test')
        self.assert_equal(sorted(os.listdir('output/input')), ['file1', 'file3'])
        with changedir('output'):
            self.attic('extract', '--exclude-from=' + self.exclude_file_path, self.repository_location + '::test')
        self.assert_equal(sorted(os.listdir('output/input')), ['file1', 'file3'])

    # Archived paths must not contain ".." components.
    def test_path_normalization(self):
        self.attic('init', self.repository_location)
        self.create_regual_file('dir1/dir2/file', size=1024 * 80)
        with changedir('input/dir1/dir2'):
            self.attic('create', self.repository_location + '::test', '../../../input/dir1/../dir1/dir2/..')
        output = self.attic('list', self.repository_location + '::test')
        self.assert_not_in('..', output)
        self.assert_in(' input/dir1/dir2/file', output)

    # Passing the same path twice to "create" must succeed.
    def test_repeated_files(self):
        self.create_regual_file('file1', size=1024 * 80)
        self.attic('init', self.repository_location)
        self.attic('create', self.repository_location + '::test', 'input', 'input')

    def test_overwrite(self):
        self.create_regual_file('file1', size=1024 * 80)
        self.create_regual_file('dir2/file2', size=1024 * 80)
        self.attic('init', self.repository_location)
        self.attic('create', self.repository_location + '::test', 'input')
        # Overwriting regular files and directories should be supported
        os.mkdir('output/input')
        os.mkdir('output/input/file1')
        os.mkdir('output/input/dir2')
        with changedir('output'):
            self.attic('extract', self.repository_location + '::test')
        self.assert_dirs_equal('input', 'output/input')
        # But non-empty dirs should fail
        os.unlink('output/input/file1')
        os.mkdir('output/input/file1')
        os.mkdir('output/input/file1/dir')
        with changedir('output'):
            self.attic('extract', self.repository_location + '::test', exit_code=1)

    def test_delete(self):
        self.create_regual_file('file1', size=1024 * 80)
        self.create_regual_file('dir2/file2', size=1024 * 80)
        self.attic('init', self.repository_location)
        self.attic('create', self.repository_location + '::test', 'input')
        self.attic('create', self.repository_location + '::test.2', 'input')
        self.attic('extract', '--dry-run', self.repository_location + '::test')
        self.attic('extract', '--dry-run', self.repository_location + '::test.2')
        self.attic('delete', self.repository_location + '::test')
        self.attic('extract', '--dry-run', self.repository_location + '::test.2')
        self.attic('delete', self.repository_location + '::test.2')
        # Make sure all data except the manifest has been deleted
        repository = Repository(self.repository_path)
        try:
            self.assert_equal(len(repository), 1)
        finally:
            # Close explicitly instead of relying on garbage collection.
            repository.close()

    # Overwriting bytes inside a segment file must make "check" fail.
    def test_corrupted_repository(self):
        self.attic('init', self.repository_location)
        self.create_src_archive('test')
        self.attic('extract', '--dry-run', self.repository_location + '::test')
        self.attic('check', self.repository_location)
        segment_dir = os.path.join(self.tmpdir, 'repository', 'data', '0')
        # Pick the entry that sorts last in the segment directory.
        name = sorted(os.listdir(segment_dir), reverse=True)[0]
        # The context manager guarantees the handle is closed (and the
        # write flushed) even if seek/write raises.
        with open(os.path.join(segment_dir, name), 'r+') as fd:
            fd.seek(100)
            fd.write('XXXX')
        self.attic('check', self.repository_location, exit_code=1)

    # Reading from a repository without write permission must work.
    def test_readonly_repository(self):
        self.attic('init', self.repository_location)
        self.create_src_archive('test')
        # An argument list (no shell) keeps this working when the temporary
        # directory path contains spaces or shell metacharacters.
        subprocess.call(['chmod', '-R', 'ugo-w', self.repository_path])
        try:
            self.attic('extract', '--dry-run', self.repository_location + '::test')
        finally:
            # Restore permissions so shutil.rmtree is able to delete it
            subprocess.call(['chmod', '-R', 'u+w', self.repository_path])

    # Deprecated command and option spellings still work but print a notice.
    def test_cmdline_compatibility(self):
        self.create_regual_file('file1', size=1024 * 80)
        self.attic('init', self.repository_location)
        self.attic('create', self.repository_location + '::test', 'input')
        output = self.attic('verify', '-v', self.repository_location + '::test')
        self.assert_in('"attic verify" has been deprecated', output)
        output = self.attic('prune', self.repository_location, '--hourly=1')
        self.assert_in('"--hourly" has been deprecated. Use "--keep-hourly" instead', output)

    def test_prune_repository(self):
        self.attic('init', self.repository_location)
        self.attic('create', self.repository_location + '::test1', src_dir)
        self.attic('create', self.repository_location + '::test2', src_dir)
        # A dry run reports what would happen but must not delete anything.
        output = self.attic('prune', '-v', '--dry-run', self.repository_location, '--keep-daily=2')
        self.assert_in('Keeping archive: test2', output)
        self.assert_in('Would prune: test1', output)
        output = self.attic('list', self.repository_location)
        self.assert_in('test1', output)
        self.assert_in('test2', output)
        self.attic('prune', self.repository_location, '--keep-daily=2')
        output = self.attic('list', self.repository_location)
        self.assert_not_in('test1', output)
        self.assert_in('test2', output)

    # Running without arguments or with -h exits via SystemExit.
    def test_usage(self):
        self.assert_raises(SystemExit, lambda: self.attic())
        self.assert_raises(SystemExit, lambda: self.attic('-h'))

    @unittest.skipUnless(has_llfuse, 'llfuse not installed')
    def test_fuse_mount_repository(self):
        mountpoint = os.path.join(self.tmpdir, 'mountpoint')
        os.mkdir(mountpoint)
        self.attic('init', self.repository_location)
        self.create_test_files()
        self.attic('create', self.repository_location + '::archive', 'input')
        self.attic('create', self.repository_location + '::archive2', 'input')
        try:
            self.attic('mount', self.repository_location, mountpoint, fork=True)
            self.wait_for_mount(mountpoint)
            self.assert_dirs_equal(self.input_path, os.path.join(mountpoint, 'archive', 'input'))
            self.assert_dirs_equal(self.input_path, os.path.join(mountpoint, 'archive2', 'input'))
        finally:
            # Argument lists avoid shell quoting problems with the path.
            if sys.platform.startswith('linux'):
                subprocess.call(['fusermount', '-u', mountpoint])
            else:
                subprocess.call(['umount', mountpoint])
            os.rmdir(mountpoint)
            # Give the daemon some time to exit
            time.sleep(.2)

    @unittest.skipUnless(has_llfuse, 'llfuse not installed')
    def test_fuse_mount_archive(self):
        mountpoint = os.path.join(self.tmpdir, 'mountpoint')
        os.mkdir(mountpoint)
        self.attic('init', self.repository_location)
        self.create_test_files()
        self.attic('create', self.repository_location + '::archive', 'input')
        try:
            self.attic('mount', self.repository_location + '::archive', mountpoint, fork=True)
            self.wait_for_mount(mountpoint)
            self.assert_dirs_equal(self.input_path, os.path.join(mountpoint, 'input'))
        finally:
            # Argument lists avoid shell quoting problems with the path.
            if sys.platform.startswith('linux'):
                subprocess.call(['fusermount', '-u', mountpoint])
            else:
                subprocess.call(['umount', mountpoint])
            os.rmdir(mountpoint)
            # Give the daemon some time to exit
            time.sleep(.2)

    def verify_aes_counter_uniqueness(self, method):
        """Check that no AES counter value is used for two different chunks.

        :param method: value passed to ``attic init --encryption=``
        """
        seen = set()  # Chunks already seen
        used = set()  # counter values already used

        def verify_uniqueness():
            repository = Repository(self.repository_path)
            try:
                index = repository.get_read_only_index(repository.get_transaction_id())
                for key, _ in index.iteritems():
                    data = repository.get(key)
                    digest = sha256(data).digest()
                    if digest not in seen:
                        seen.add(digest)
                        # NOTE(review): the offsets assume a 41 byte header
                        # whose last 8 bytes hold the nonce - confirm
                        # against the key/encryption module.
                        num_blocks = num_aes_blocks(len(data) - 41)
                        nonce = bytes_to_long(data[33:41])
                        for counter in range(nonce, nonce + num_blocks):
                            self.assert_not_in(counter, used)
                            used.add(counter)
            finally:
                # Close explicitly instead of relying on garbage collection.
                repository.close()

        self.create_test_files()
        os.environ['ATTIC_PASSPHRASE'] = 'passphrase'
        self.attic('init', '--encryption=' + method, self.repository_location)
        verify_uniqueness()
        self.attic('create', self.repository_location + '::test', 'input')
        verify_uniqueness()
        self.attic('create', self.repository_location + '::test.2', 'input')
        verify_uniqueness()
        self.attic('delete', self.repository_location + '::test.2')
        verify_uniqueness()
        # The used counter values must form the contiguous range 0..n-1.
        self.assert_equal(used, set(range(len(used))))

    def test_aes_counter_uniqueness_keyfile(self):
        self.verify_aes_counter_uniqueness('keyfile')

    def test_aes_counter_uniqueness_passphrase(self):
        self.verify_aes_counter_uniqueness('passphrase')


class ArchiverCheckTestCase(ArchiverTestCaseBase):
    """Tests for ``attic check`` and ``attic check --repair``."""

    def setUp(self):
        """Create a repository holding the archives archive1 and archive2."""
        super().setUp()
        # The repository is populated while ChunkBuffer.BUFFER_SIZE is
        # patched down to 10.
        with patch.object(ChunkBuffer, 'BUFFER_SIZE', 10):
            self.attic('init', self.repository_location)
            self.create_src_archive('archive1')
            self.create_src_archive('archive2')

    def open_archive(self, name):
        """Open archive ``name`` directly, bypassing the command line.

        :returns: ``(archive, repository)``; the repository is left open
            so that callers can modify it.
        """
        repository = Repository(self.repository_path)
        manifest, key = Manifest.load(repository)
        archive = Archive(repository, key, manifest, name)
        return archive, repository

    # --repository-only / --archives-only select which check phases run.
    def test_check_usage(self):
        output = self.attic('check', self.repository_location, exit_code=0)
        self.assert_in('Starting repository check', output)
        self.assert_in('Starting archive consistency check', output)
        output = self.attic('check', '--repository-only', self.repository_location, exit_code=0)
        self.assert_in('Starting repository check', output)
        self.assert_not_in('Starting archive consistency check', output)
        output = self.attic('check', '--archives-only', self.repository_location, exit_code=0)
        self.assert_not_in('Starting repository check', output)
        self.assert_in('Starting archive consistency check', output)

    def test_missing_file_chunk(self):
        archive, repository = self.open_archive('archive1')
        # Delete the last chunk of the first item whose path matches.
        for item in archive.iter_items():
            if item[b'path'].endswith('testsuite/archiver.py'):
                repository.delete(item[b'chunks'][-1][0])
                break
        repository.commit()
        self.attic('check', self.repository_location, exit_code=1)
        self.attic('check', '--repair', self.repository_location, exit_code=0)
        self.attic('check', self.repository_location, exit_code=0)

    def test_missing_archive_item_chunk(self):
        archive, repository = self.open_archive('archive1')
        repository.delete(archive.metadata[b'items'][-5])
        repository.commit()
        self.attic('check', self.repository_location, exit_code=1)
        self.attic('check', '--repair', self.repository_location, exit_code=0)
        self.attic('check', self.repository_location, exit_code=0)

    def test_missing_archive_metadata(self):
        archive, repository = self.open_archive('archive1')
        repository.delete(archive.id)
        repository.commit()
        self.attic('check', self.repository_location, exit_code=1)
        self.attic('check', '--repair', self.repository_location, exit_code=0)
        self.attic('check', self.repository_location, exit_code=0)

    def test_missing_manifest(self):
        archive, repository = self.open_archive('archive1')
        repository.delete(Manifest.MANIFEST_ID)
        repository.commit()
        self.attic('check', self.repository_location, exit_code=1)
        # The repair output must mention both archives.
        output = self.attic('check', '--repair', self.repository_location, exit_code=0)
        self.assert_in('archive1', output)
        self.assert_in('archive2', output)
        self.attic('check', self.repository_location, exit_code=0)

    def test_extra_chunks(self):
        self.attic('check', self.repository_location, exit_code=0)
        # Repository is given the filesystem path, as everywhere else in
        # this module (identical to the location here: prefix is '').
        repository = Repository(self.repository_path)
        repository.put(b'01234567890123456789012345678901', b'xxxx')
        repository.commit()
        repository.close()
        # A check without --repair must keep failing on repeated runs.
        self.attic('check', self.repository_location, exit_code=1)
        self.attic('check', self.repository_location, exit_code=1)
        self.attic('check', '--repair', self.repository_location, exit_code=0)
        self.attic('check', self.repository_location, exit_code=0)
        self.attic('extract', '--dry-run', self.repository_location + '::archive1', exit_code=0)


class RemoteArchiverTestCase(ArchiverTestCase):
    """Run all ArchiverTestCase tests with the ``__testsuite__:`` location
    prefix, plus tests specific to ``--restrict-to-path``.
    """

    prefix = '__testsuite__:'

    def test_remote_repo_restrict_to_path(self):
        self.attic('init', self.repository_location)
        path_prefix = os.path.dirname(self.repository_path)
        # A restriction that does not cover the repository path is rejected.
        with patch.object(RemoteRepository, 'extra_test_args', ['--restrict-to-path', '/foo']):
            self.assert_raises(PathNotAllowed, lambda: self.attic('init', self.repository_location + '_1'))
        with patch.object(RemoteRepository, 'extra_test_args', ['--restrict-to-path', path_prefix]):
            self.attic('init', self.repository_location + '_2')
        # With several restrictions, one matching prefix is sufficient.
        with patch.object(RemoteRepository, 'extra_test_args', ['--restrict-to-path', '/foo', '--restrict-to-path', path_prefix]):
            self.attic('init', self.repository_location + '_3')