1
0
Fork 0
mirror of https://github.com/borgbase/vorta synced 2025-01-03 13:45:49 +00:00
vorta/tests/test_lock.py
Bastien 54d8bbe6b1
Implement multiple queues. By @bastiencyr (#1045)
Scheduler now has the ability to run jobs on multiple repositories concurrently and run multiple jobs on one repo (by queuing them).

For each repository, there is one queue. I have represented a queue by a 'site'. Between sites (i.e., repositories), tasks run concurrently. On one site, tasks run one by one. The user can also run tasks by adding them to the queue, but cannot run multiple backups at once because the start backup button is disabled while a job is running.
2021-10-04 15:31:41 +04:00

44 lines
1.8 KiB
Python

import pytest
from PyQt5 import QtCore
import vorta.borg.borg_job
import vorta.application
def test_create_perm_error(qapp, borg_json_output, mocker, qtbot):
    """Check the message shown when a backup run reports a permission error.

    ``Popen`` in ``vorta.borg.borg_job`` is replaced by a mock that carries
    the ``create_perm`` output from the ``borg_json_output`` fixture. The test
    clicks the start button, waits until a ``_msg`` attribute appears on
    ``qapp`` and asserts on the beginning of its text.
    """
    main = qapp.main_window
    # QMessageBox.show is patched out -- presumably so that no real dialog
    # pops up while the test runs.
    mocker.patch.object(vorta.application.QMessageBox, 'show')

    stdout, stderr = borg_json_output('create_perm')
    popen_result = mocker.MagicMock(stdout=stdout, stderr=stderr, returncode=0)
    mocker.patch.object(vorta.borg.borg_job, 'Popen', return_value=popen_result)

    qtbot.mouseClick(main.createStartBtn, QtCore.Qt.LeftButton)
    try:
        qtbot.waitUntil(lambda: hasattr(qapp, '_msg'), **pytest._wait_defaults)
        assert qapp._msg.text().startswith("You do not have permission")
    finally:
        # Remove the attribute even when the wait or the assertion fails, so a
        # later test waiting on hasattr(qapp, '_msg') cannot be satisfied by a
        # stale object. The guard covers a timeout where _msg never appeared.
        if hasattr(qapp, '_msg'):
            del qapp._msg
def test_create_lock(qapp, borg_json_output, mocker, qtbot):
    """Check the locked-repository message and the follow-up break-lock flow.

    First phase: ``Popen`` is mocked with the ``create_lock`` fixture output,
    the start button is clicked and the text of ``qapp._msg`` is checked.
    Second phase: ``Popen`` is re-mocked with the ``create_break`` output, the
    message box is accepted and the test waits for the progress text to
    report that the lock was broken.
    """
    main = qapp.main_window
    # QMessageBox.show is patched out -- presumably so that no real dialog
    # pops up while the test runs.
    mocker.patch.object(vorta.application.QMessageBox, 'show')

    try:
        # Trigger locked repo
        stdout, stderr = borg_json_output('create_lock')
        popen_result = mocker.MagicMock(stdout=stdout, stderr=stderr, returncode=0)
        mocker.patch.object(vorta.borg.borg_job, 'Popen', return_value=popen_result)

        qtbot.mouseClick(main.createStartBtn, QtCore.Qt.LeftButton)
        qtbot.waitUntil(lambda: hasattr(qapp, '_msg'), **pytest._wait_defaults)
        assert "The repository at" in qapp._msg.text()

        # Break locked repo
        stdout, stderr = borg_json_output('create_break')
        popen_result = mocker.MagicMock(stdout=stdout, stderr=stderr, returncode=0)
        mocker.patch.object(vorta.borg.borg_job, 'Popen', return_value=popen_result)

        # Prevent thread collision: wait for the start button to be enabled
        # again before accepting the message box.
        qtbot.waitUntil(lambda: main.createStartBtn.isEnabled(), **pytest._wait_defaults)
        qapp._msg.accept()

        exp_message_text = 'Repository lock broken. Please redo your last action.'
        qtbot.waitUntil(lambda: main.progressText.text() == exp_message_text, **pytest._wait_defaults)
    finally:
        # Mirror the cleanup done in test_create_perm_error: do not leave the
        # attribute behind on qapp, where a later hasattr(qapp, '_msg') wait
        # would see it. The guard covers a timeout where _msg never appeared.
        if hasattr(qapp, '_msg'):
            del qapp._msg