borg/borg/testsuite/upgrader.py

209 lines
6.8 KiB
Python

import os
import pytest
try:
import attic.repository
import attic.key
import attic.helpers
except ImportError:
attic = None
from ..upgrader import AtticRepositoryUpgrader, AtticKeyfileKey
from ..helpers import get_keys_dir
from ..key import KeyfileKey
from ..archiver import UMASK_DEFAULT
from ..repository import Repository
def repo_valid(path):
"""
utility function to check if borg can open a repository
:param path: the path to the repository
:returns: if borg can check the repository
"""
repository = Repository(str(path), create=False)
# can't check raises() because check() handles the error
state = repository.check()
repository.close()
return state
def key_valid(path):
"""
check that the new keyfile is alright
:param path: the path to the key file
:returns: if the file starts with the borg magic string
"""
keyfile = os.path.join(get_keys_dir(),
os.path.basename(path))
with open(keyfile, 'r') as f:
return f.read().startswith(KeyfileKey.FILE_ID)
@pytest.fixture()
def attic_repo(tmpdir):
"""
create an attic repo with some stuff in it
:param tmpdir: path to the repository to be created
:returns: a attic.repository.Repository object
"""
attic_repo = attic.repository.Repository(str(tmpdir), create=True)
# throw some stuff in that repo, copied from `RepositoryTestCase.test1`
for x in range(100):
attic_repo.put(('%-32d' % x).encode('ascii'), b'SOMEDATA')
attic_repo.commit()
attic_repo.close()
return attic_repo
@pytest.fixture(params=[True, False])
def inplace(request):
return request.param
@pytest.mark.skipif(attic is None, reason='cannot find an attic install')
def test_convert_segments(tmpdir, attic_repo, inplace):
"""test segment conversion
this will load the given attic repository, list all the segments
then convert them one at a time. we need to close the repo before
conversion otherwise we have errors from borg
:param tmpdir: a temporary directory to run the test in (builtin
fixture)
:param attic_repo: a populated attic repository (fixture)
"""
# check should fail because of magic number
assert not repo_valid(tmpdir)
repo = AtticRepositoryUpgrader(str(tmpdir), create=False)
segments = [filename for i, filename in repo.io.segment_iterator()]
repo.close()
repo.convert_segments(segments, dryrun=False, inplace=inplace)
repo.convert_cache(dryrun=False)
assert repo_valid(tmpdir)
class MockArgs:
"""
mock attic location
this is used to simulate a key location with a properly loaded
repository object to create a key file
"""
def __init__(self, path):
self.repository = attic.helpers.Location(path)
@pytest.fixture()
def attic_key_file(attic_repo, tmpdir):
"""
create an attic key file from the given repo, in the keys
subdirectory of the given tmpdir
:param attic_repo: an attic.repository.Repository object (fixture
define above)
:param tmpdir: a temporary directory (a builtin fixture)
:returns: the KeyfileKey object as returned by
attic.key.KeyfileKey.create()
"""
keys_dir = str(tmpdir.mkdir('keys'))
# we use the repo dir for the created keyfile, because we do
# not want to clutter existing keyfiles
os.environ['ATTIC_KEYS_DIR'] = keys_dir
# we use the same directory for the converted files, which
# will clutter the previously created one, which we don't care
# about anyways. in real runs, the original key will be retained.
os.environ['BORG_KEYS_DIR'] = keys_dir
os.environ['ATTIC_PASSPHRASE'] = 'test'
return attic.key.KeyfileKey.create(attic_repo,
MockArgs(keys_dir))
@pytest.mark.skipif(attic is None, reason='cannot find an attic install')
def test_keys(tmpdir, attic_repo, attic_key_file):
"""test key conversion
test that we can convert the given key to a properly formatted
borg key. assumes that the ATTIC_KEYS_DIR and BORG_KEYS_DIR have
been properly populated by the attic_key_file fixture.
:param tmpdir: a temporary directory (a builtin fixture)
:param attic_repo: an attic.repository.Repository object (fixture
define above)
:param attic_key_file: an attic.key.KeyfileKey (fixture created above)
"""
repository = AtticRepositoryUpgrader(str(tmpdir), create=False)
keyfile = AtticKeyfileKey.find_key_file(repository)
AtticRepositoryUpgrader.convert_keyfiles(keyfile, dryrun=False)
assert key_valid(attic_key_file.path)
@pytest.mark.skipif(attic is None, reason='cannot find an attic install')
def test_convert_all(tmpdir, attic_repo, attic_key_file, inplace):
"""test all conversion steps
this runs everything. mostly redundant test, since everything is
done above. yet we expect a NotImplementedError because we do not
convert caches yet.
:param tmpdir: a temporary directory (a builtin fixture)
:param attic_repo: an attic.repository.Repository object (fixture
define above)
:param attic_key_file: an attic.key.KeyfileKey (fixture created above)
"""
# check should fail because of magic number
assert not repo_valid(tmpdir)
def stat_segment(path):
return os.stat(os.path.join(path, 'data', '0', '0'))
def first_inode(path):
return stat_segment(path).st_ino
orig_inode = first_inode(attic_repo.path)
repo = AtticRepositoryUpgrader(str(tmpdir), create=False)
# replicate command dispatch, partly
os.umask(UMASK_DEFAULT)
backup = repo.upgrade(dryrun=False, inplace=inplace)
if inplace:
assert backup is None
assert first_inode(repo.path) == orig_inode
else:
assert backup
assert first_inode(repo.path) != first_inode(backup)
# i have seen cases where the copied tree has world-readable
# permissions, which is wrong
assert stat_segment(backup).st_mode & UMASK_DEFAULT == 0
assert key_valid(attic_key_file.path)
assert repo_valid(tmpdir)
def test_hardlink(tmpdir, inplace):
"""test that we handle hard links properly
that is, if we are in "inplace" mode, hardlinks should *not*
change (ie. we write to the file directly, so we do not rewrite the
whole file, and we do not re-create the file).
if we are *not* in inplace mode, then the inode should change, as
we are supposed to leave the original inode alone."""
a = str(tmpdir.join('a'))
with open(a, 'wb') as tmp:
tmp.write(b'aXXX')
b = str(tmpdir.join('b'))
os.link(a, b)
AtticRepositoryUpgrader.header_replace(b, b'a', b'b', inplace=inplace)
if not inplace:
assert os.stat(a).st_ino != os.stat(b).st_ino
else:
assert os.stat(a).st_ino == os.stat(b).st_ino
with open(b, 'rb') as tmp:
assert tmp.read() == b'bXXX'