vorta/src/vorta/views/archive_tab.py

571 lines
24 KiB
Python
Raw Normal View History

import os.path
import sys
from datetime import timedelta
2019-07-04 19:17:09 +00:00
from PyQt5 import QtCore, uic
from PyQt5.QtGui import QDesktopServices
2019-07-04 19:17:09 +00:00
from PyQt5.QtWidgets import (QHeaderView, QMessageBox, QTableView,
QTableWidgetItem, QInputDialog, QMenu,
QToolButton, QApplication)
from vorta.borg.check import BorgCheckJob
from vorta.borg.delete import BorgDeleteJob
from vorta.borg.diff import BorgDiffJob
from vorta.borg.extract import BorgExtractJob
from vorta.borg.list_archive import BorgListArchiveJob
from vorta.borg.list_repo import BorgListRepoJob
from vorta.borg.info_archive import BorgInfoArchiveJob
from vorta.borg.mount import BorgMountJob
from vorta.borg.prune import BorgPruneJob
from vorta.borg.umount import BorgUmountJob
from vorta.borg.rename import BorgRenameJob
from vorta.i18n import trans_late
2019-07-04 19:17:09 +00:00
from vorta.models import ArchiveModel, BackupProfileMixin
from vorta.utils import (choose_file_dialog, format_archive_name, get_asset,
get_mount_points, pretty_bytes)
from vorta.views.source_tab import SizeItem
2019-07-04 19:17:09 +00:00
from vorta.views.diff_dialog import DiffDialog
from vorta.views.diff_result import DiffResult
from vorta.views.extract_dialog import ExtractDialog
from vorta.views.utils import get_colored_icon
2018-10-27 17:24:34 +00:00
uifile = get_asset('UI/archivetab.ui')
ArchiveTabUI, ArchiveTabBase = uic.loadUiType(uifile)
2018-10-27 17:24:34 +00:00
class ArchiveTab(ArchiveTabBase, ArchiveTabUI, BackupProfileMixin):
prune_intervals = ['hour', 'day', 'week', 'month', 'year']
def __init__(self, parent=None, app=None):
2018-10-27 17:24:34 +00:00
super().__init__(parent)
self.setupUi(parent)
self.mount_points = {}
self.menu = None
self.app = app
self.toolBox.setCurrentIndex(0)
2018-10-27 17:24:34 +00:00
header = self.archiveTable.horizontalHeader()
2018-10-27 17:24:34 +00:00
header.setVisible(True)
header.setSectionResizeMode(0, QHeaderView.ResizeToContents)
2018-10-27 17:24:34 +00:00
header.setSectionResizeMode(1, QHeaderView.ResizeToContents)
header.setSectionResizeMode(2, QHeaderView.ResizeToContents)
header.setSectionResizeMode(3, QHeaderView.Interactive)
header.setSectionResizeMode(4, QHeaderView.Stretch)
header.setStretchLastSection(True)
2018-10-27 17:24:34 +00:00
if sys.platform != 'darwin':
self._set_status('') # Set platform-specific hints.
self.archiveTable.setSelectionBehavior(QTableView.SelectRows)
self.archiveTable.setSelectionMode(QTableView.SingleSelection)
self.archiveTable.setEditTriggers(QTableView.NoEditTriggers)
self.archiveTable.setWordWrap(False)
self.archiveTable.setTextElideMode(QtCore.Qt.ElideLeft)
self.archiveTable.setAlternatingRowColors(True)
self.archiveTable.cellDoubleClicked.connect(self.cell_double_clicked)
self.archiveTable.setSortingEnabled(True)
2018-10-27 17:24:34 +00:00
self.listButton.clicked.connect(self.list_action)
self.pruneButton.clicked.connect(self.prune_action)
self.checkButton.clicked.connect(self.check_action)
2019-07-04 19:17:09 +00:00
self.diffButton.clicked.connect(self.diff_action)
self.archiveActionMenu = QMenu(parent=self)
self.archiveActionMenu.aboutToShow.connect(self.showArchiveActionMenu)
self.archiveActionButton.setMenu(self.archiveActionMenu)
self.archiveActionButton.setPopupMode(QToolButton.InstantPopup)
2018-10-27 17:24:34 +00:00
self.archiveNameTemplate.textChanged.connect(
lambda tpl, key='new_archive_name': self.save_archive_template(tpl, key))
self.prunePrefixTemplate.textChanged.connect(
lambda tpl, key='prune_prefix': self.save_archive_template(tpl, key))
self.populate_from_profile()
self.selected_archives = None
self.set_icons()
def set_icons(self):
"Used when changing between light- and dark mode"
self.checkButton.setIcon(get_colored_icon('check-circle'))
self.diffButton.setIcon(get_colored_icon('stream-solid'))
self.pruneButton.setIcon(get_colored_icon('cut'))
self.listButton.setIcon(get_colored_icon('refresh'))
self.toolBox.setItemIcon(0, get_colored_icon('tasks'))
self.toolBox.setItemIcon(1, get_colored_icon('cut'))
self.archiveActionButton.setIcon(get_colored_icon('ellipsis-v'))
def cancel_action(self):
self._set_status(self.tr("Action cancelled."))
self._toggle_all_buttons(True)
def _set_status(self, text):
self.mountErrors.setText(text)
self.mountErrors.repaint()
def _toggle_all_buttons(self, enabled=True):
for button in [self.checkButton, self.listButton, self.pruneButton,
self.diffButton, self.archiveActionButton]:
button.setEnabled(enabled)
button.repaint()
def showArchiveActionMenu(self):
archive_name = self.selected_archive_name()
menu = self.archiveActionMenu
menu.clear()
if not archive_name:
action = menu.addAction(self.tr("Select an archive first."))
action.setEnabled(False)
return menu
if archive_name in self.mount_points:
2021-10-28 11:02:09 +00:00
unmountAction = menu.addAction("Unmount", self.umount_action)
unmountAction.setIcon(get_colored_icon('eject'))
else:
mountAction = menu.addAction("Mount", self.mount_action)
mountAction.setIcon(get_colored_icon('folder-open'))
extractAction = menu.addAction("Extract", self.list_archive_action)
refreshAction = menu.addAction("Refresh", self.refresh_archive_action)
renameAction = menu.addAction("Rename", self.rename_action)
deleteAction = menu.addAction("Delete", self.delete_action)
extractAction.setIcon(get_colored_icon('cloud-download'))
refreshAction.setIcon(get_colored_icon('refresh'))
renameAction.setIcon(get_colored_icon('edit'))
deleteAction.setIcon(get_colored_icon('trash'))
return menu
def populate_from_profile(self):
"""Populate archive list and prune settings from profile."""
profile = self.profile()
if profile.repo is not None:
self.mount_points = get_mount_points(profile.repo.url)
self.toolBox.setItemText(0, self.tr('Archives for %s') % profile.repo.url)
archives = [s for s in profile.repo.archives.select().order_by(ArchiveModel.time.desc())]
2018-10-27 17:24:34 +00:00
sorting = self.archiveTable.isSortingEnabled()
self.archiveTable.setSortingEnabled(False)
for row, archive in enumerate(archives):
self.archiveTable.insertRow(row)
formatted_time = archive.time.strftime('%Y-%m-%d %H:%M')
self.archiveTable.setItem(row, 0, QTableWidgetItem(formatted_time))
self.archiveTable.setItem(row, 1, SizeItem(pretty_bytes(archive.size)))
if archive.duration is not None:
formatted_duration = str(timedelta(seconds=round(archive.duration)))
else:
formatted_duration = ''
self.archiveTable.setItem(row, 2, QTableWidgetItem(formatted_duration))
mount_point = self.mount_points.get(archive.name)
if mount_point is not None:
item = QTableWidgetItem(mount_point)
self.archiveTable.setItem(row, 3, item)
self.archiveTable.setItem(row, 4, QTableWidgetItem(archive.name))
self.archiveTable.setRowCount(len(archives))
self.archiveTable.setSortingEnabled(sorting)
item = self.archiveTable.item(0, 0)
self.archiveTable.scrollToItem(item)
self._toggle_all_buttons(enabled=True)
2018-11-02 11:14:54 +00:00
else:
self.mount_points = {}
self.archiveTable.setRowCount(0)
self.toolBox.setItemText(0, self.tr('Archives'))
self._toggle_all_buttons(enabled=False)
self.archiveNameTemplate.setText(profile.new_archive_name)
self.prunePrefixTemplate.setText(profile.prune_prefix)
# Populate pruning options from database
profile = self.profile()
for i in self.prune_intervals:
getattr(self, f'prune_{i}').setValue(getattr(profile, f'prune_{i}'))
getattr(self, f'prune_{i}').valueChanged.connect(self.save_prune_setting)
self.prune_keep_within.setText(profile.prune_keep_within)
self.prune_keep_within.editingFinished.connect(self.save_prune_setting)
def save_archive_template(self, tpl, key):
profile = self.profile()
try:
preview = self.tr('Preview: %s') % format_archive_name(profile, tpl)
setattr(profile, key, tpl)
profile.save()
except Exception:
preview = self.tr('Error in archive name template.')
if key == 'new_archive_name':
self.archiveNamePreview.setText(preview)
else:
self.prunePrefixPreview.setText(preview)
def check_action(self):
params = BorgCheckJob.prepare(self.profile())
if not params['ok']:
self._set_status(params['message'])
return
# Conditions are met (borg binary available, etc)
row_selected = self.archiveTable.selectionModel().selectedRows()
if row_selected:
archive_cell = self.archiveTable.item(row_selected[0].row(), 4)
if archive_cell:
archive_name = archive_cell.text()
params['cmd'][-1] += f'::{archive_name}'
job = BorgCheckJob(params['cmd'], params, self.profile().repo.id)
job.updated.connect(self._set_status)
job.result.connect(self.check_result)
self._toggle_all_buttons(False)
QApplication.instance().scheduler.jobs_manager.add_job(job)
def check_result(self, result):
if result['returncode'] == 0:
self._toggle_all_buttons(True)
def prune_action(self):
params = BorgPruneJob.prepare(self.profile())
if params['ok']:
job = BorgPruneJob(params['cmd'], params, self.profile().repo.id)
job.updated.connect(self._set_status)
job.result.connect(self.prune_result)
self._toggle_all_buttons(False)
QApplication.instance().scheduler.jobs_manager.add_job(job)
else:
self._set_status(params['message'])
def prune_result(self, result):
if result['returncode'] == 0:
self._set_status(self.tr('Pruning finished.'))
self.list_action()
else:
self._toggle_all_buttons(True)
def list_action(self):
params = BorgListRepoJob.prepare(self.profile())
if params['ok']:
job = BorgListRepoJob(params['cmd'], params, self.profile().repo.id)
job.updated.connect(self._set_status)
job.result.connect(self.list_result)
self._toggle_all_buttons(False)
QApplication.instance().scheduler.jobs_manager.add_job(job)
else:
self._set_status(params['message'])
def list_result(self, result):
self._toggle_all_buttons(True)
if result['returncode'] == 0:
self._set_status(self.tr('Refreshed archives.'))
self.populate_from_profile()
def refresh_archive_action(self):
archive_name = self.selected_archive_name()
if archive_name is not None:
params = BorgInfoArchiveJob.prepare(self.profile(), archive_name)
if params['ok']:
job = BorgInfoArchiveJob(params['cmd'], params, self.profile().repo.id)
job.updated.connect(self._set_status)
job.result.connect(self.refresh_archive_result)
self._toggle_all_buttons(False)
QApplication.instance().scheduler.jobs_manager.add_job(job)
def refresh_archive_result(self, result):
self._toggle_all_buttons(True)
if result['returncode'] == 0:
self._set_status(self.tr('Refreshed archive.'))
self.populate_from_profile()
def selected_archive_name(self):
row_selected = self.archiveTable.selectionModel().selectedRows()
if row_selected:
archive_cell = self.archiveTable.item(row_selected[0].row(), 4)
if archive_cell:
return archive_cell.text()
return None
def mount_action(self):
profile = self.profile()
params = BorgMountJob.prepare(profile)
if not params['ok']:
self._set_status(params['message'])
return
# Conditions are met (borg binary available, etc)
archive_name = self.selected_archive_name()
if archive_name:
params['cmd'][-1] += f'::{archive_name}'
params['current_archive'] = archive_name
2018-10-27 17:24:34 +00:00
def receive():
mount_point = dialog.selectedFiles()
if mount_point:
params['cmd'].append(mount_point[0])
if params.get('current_archive', False):
self.mount_points[params['current_archive']] = mount_point[0]
if params['ok']:
self._toggle_all_buttons(False)
job = BorgMountJob(params['cmd'], params, self.profile().repo.id)
job.updated.connect(self.mountErrors.setText)
job.result.connect(self.mount_result)
QApplication.instance().scheduler.jobs_manager.add_job(job)
dialog = choose_file_dialog(self, self.tr("Choose Mount Point"), want_folder=True)
dialog.open(receive)
def mount_result(self, result):
self._toggle_all_buttons(True)
2018-10-27 17:24:34 +00:00
if result['returncode'] == 0:
self._set_status(self.tr('Mounted successfully.'))
if result['params'].get('current_archive'):
archive_name = result['params']['current_archive']
row = self.row_of_archive(archive_name)
item = QTableWidgetItem(result['cmd'][-1])
self.archiveTable.setItem(row, 3, item)
def umount_action(self):
archive_name = self.selected_archive_name()
mount_point = self.mount_points.get(archive_name)
if mount_point is not None:
profile = self.profile()
params = BorgUmountJob.prepare(profile)
if not params['ok']:
self._set_status(params['message'])
return
params['current_archive'] = archive_name
if os.path.normpath(mount_point) in params['active_mount_points']:
params['cmd'].append(mount_point)
job = BorgUmountJob(params['cmd'], params, self.profile().repo.id)
job.updated.connect(self.mountErrors.setText)
job.result.connect(self.umount_result)
QApplication.instance().scheduler.jobs_manager.add_job(job)
else:
self._set_status(self.tr('Mount point not active.'))
return
def umount_result(self, result):
self._toggle_all_buttons(True)
archive_name = result['params']['current_archive']
if result['returncode'] == 0:
self._set_status(self.tr('Un-mounted successfully.'))
del self.mount_points[archive_name]
row = self.row_of_archive(archive_name)
item = QTableWidgetItem('')
self.archiveTable.setItem(row, 3, item)
else:
self._set_status(self.tr('Unmounting failed. Make sure no programs are using {}').format(
self.mount_points.get(archive_name)))
def save_prune_setting(self, new_value=None):
profile = self.profile()
for i in self.prune_intervals:
setattr(profile, f'prune_{i}', getattr(self, f'prune_{i}').value())
profile.prune_keep_within = self.prune_keep_within.text()
profile.save()
def list_archive_action(self):
profile = self.profile()
row_selected = self.archiveTable.selectionModel().selectedRows()
if row_selected:
archive_cell = self.archiveTable.item(row_selected[0].row(), 4)
if archive_cell:
archive_name = archive_cell.text()
params = BorgListArchiveJob.prepare(profile, archive_name)
if not params['ok']:
self._set_status(params['message'])
return
self._set_status('')
self._toggle_all_buttons(False)
job = BorgListArchiveJob(params['cmd'], params, self.profile().repo.id)
job.updated.connect(self.mountErrors.setText)
job.result.connect(self.list_archive_result)
QApplication.instance().scheduler.jobs_manager.add_job(job)
return job
else:
self._set_status(self.tr('Select an archive to restore first.'))
def list_archive_result(self, result):
self._set_status('')
if result['returncode'] == 0:
def process_result():
def receive():
extraction_folder = dialog.selectedFiles()
if extraction_folder:
params = BorgExtractJob.prepare(
self.profile(), archive.name, window.selected, extraction_folder[0])
if params['ok']:
self._toggle_all_buttons(False)
job = BorgExtractJob(params['cmd'], params, self.profile().repo.id)
job.updated.connect(self.mountErrors.setText)
job.result.connect(self.extract_archive_result)
QApplication.instance().scheduler.jobs_manager.add_job(job)
else:
self._set_status(params['message'])
dialog = choose_file_dialog(self, self.tr("Choose Extraction Point"), want_folder=True)
dialog.open(receive)
archive = ArchiveModel.get(name=result['params']['archive_name'])
window = ExtractDialog(result['data'], archive)
self._toggle_all_buttons(True)
window.setParent(self, QtCore.Qt.Sheet)
self._window = window # for testing
window.show()
window.accepted.connect(process_result)
def extract_archive_result(self, result):
self._toggle_all_buttons(True)
def cell_double_clicked(self, row, column):
if column == 3:
archive_name = self.selected_archive_name()
if not archive_name:
return
mount_point = self.mount_points.get(archive_name)
if mount_point is not None:
QDesktopServices.openUrl(QtCore.QUrl(f'file:///{mount_point}'))
def row_of_archive(self, archive_name):
items = self.archiveTable.findItems(archive_name, QtCore.Qt.MatchExactly)
rows = [item.row() for item in items if item.column() == 4]
return rows[0] if rows else None
def confirm_dialog(self, title, text):
2019-02-02 04:29:33 +00:00
msg = QMessageBox()
msg.setIcon(QMessageBox.Information)
msg.setText(text)
msg.setWindowTitle(title)
msg.setStandardButtons(QMessageBox.Yes | QMessageBox.Cancel)
msg.button(msg.Yes).setText(self.tr("Yes"))
msg.button(msg.Cancel).setText(self.tr("Cancel"))
return msg.exec_() == QMessageBox.Yes
def delete_action(self):
# Since this function modify the UI, we can't put the whole function in a JobQUeue.
params = BorgDeleteJob.prepare(self.profile())
if not params['ok']:
self._set_status(params['message'])
return
self.archive_name = self.selected_archive_name()
if self.archive_name is not None:
2019-02-02 04:29:33 +00:00
if not self.confirm_dialog(trans_late('ArchiveTab', "Confirm deletion"),
trans_late('ArchiveTab', "Are you sure you want to delete the archive?")):
return
params['cmd'][-1] += f'::{self.archive_name}'
job = BorgDeleteJob(params['cmd'], params, self.profile().repo.id)
job.updated.connect(self._set_status)
job.result.connect(self.delete_result)
self._toggle_all_buttons(False)
QApplication.instance().scheduler.jobs_manager.add_job(job)
else:
self._set_status(self.tr("No archive selected"))
def delete_result(self, result):
if result['returncode'] == 0:
self._set_status(self.tr('Archive deleted.'))
deleted_row = self.archiveTable.findItems(self.archive_name, QtCore.Qt.MatchExactly)[0].row()
self.archiveTable.removeRow(deleted_row)
ArchiveModel.get(name=self.archive_name).delete_instance()
del self.archive_name
else:
self._toggle_all_buttons(True)
2019-07-04 19:17:09 +00:00
def diff_action(self):
def process_result():
if window.selected_archives:
self.selected_archives = window.selected_archives
archive_cell_newer = self.archiveTable.item(self.selected_archives[0], 4)
archive_cell_older = self.archiveTable.item(self.selected_archives[1], 4)
2019-07-04 19:17:09 +00:00
if archive_cell_older and archive_cell_newer:
archive_name_newer = archive_cell_newer.text()
archive_name_older = archive_cell_older.text()
params = BorgDiffJob.prepare(profile, archive_name_older, archive_name_newer)
2019-07-04 19:17:09 +00:00
if params['ok']:
self._toggle_all_buttons(False)
job = BorgDiffJob(params['cmd'], params, self.profile().repo.id)
job.updated.connect(self.mountErrors.setText)
job.result.connect(self.list_diff_result)
QApplication.instance().scheduler.jobs_manager.add_job(job)
2019-07-04 19:17:09 +00:00
else:
self._set_status(params['message'])
profile = self.profile()
window = DiffDialog(self.archiveTable)
self._toggle_all_buttons(True)
window.setParent(self, QtCore.Qt.Sheet)
self._window = window # for testing
window.show()
window.accepted.connect(process_result)
2019-07-04 19:17:09 +00:00
def list_diff_result(self, result):
self._set_status('')
if result['returncode'] == 0:
archive_newer = ArchiveModel.get(name=result['params']['archive_name_newer'])
archive_older = ArchiveModel.get(name=result['params']['archive_name_older'])
window = DiffResult(result['data'], archive_newer, archive_older, result['params']['json_lines'])
2019-07-04 19:17:09 +00:00
self._toggle_all_buttons(True)
window.setParent(self, QtCore.Qt.Sheet)
self._resultwindow = window # for testing
2019-07-04 19:17:09 +00:00
window.show()
def rename_action(self):
profile = self.profile()
params = BorgRenameJob.prepare(profile)
if not params['ok']:
self._set_status(params['message'])
return
archive_name = self.selected_archive_name()
if archive_name is not None:
new_name, finished = QInputDialog.getText(
self,
self.tr("Change name"),
self.tr("New archive name:"),
text=archive_name)
if not finished:
return
if not new_name:
self._set_status(self.tr('Archive name cannot be blank.'))
return
new_name_exists = ArchiveModel.get_or_none(name=new_name, repo=profile.repo)
if new_name_exists is not None:
self._set_status(self.tr('An archive with this name already exists.'))
return
params['cmd'][-1] += f'::{archive_name}'
params['cmd'].append(new_name)
job = BorgRenameJob(params['cmd'], params, self.profile().repo.id)
job.updated.connect(self._set_status)
job.result.connect(self.rename_result)
self._toggle_all_buttons(False)
QApplication.instance().scheduler.jobs_manager.add_job(job)
else:
self._set_status(self.tr("No archive selected"))
def rename_result(self, result):
if result['returncode'] == 0:
self._set_status(self.tr('Archive renamed.'))
self.populate_from_profile()
else:
self._toggle_all_buttons(True)