vorta/src/vorta/views/archive_tab.py

213 lines
8.4 KiB
Python
Raw Normal View History

import sys
from datetime import timedelta
from PyQt5 import uic, QtCore
from PyQt5.QtCore import QSize
from PyQt5.QtGui import QIcon
from PyQt5.QtWidgets import QTableWidgetItem, QTableView, QHeaderView, QComboBox, QToolButton, QButtonGroup, QToolBar
2018-10-27 17:24:34 +00:00
from vorta.borg.prune import BorgPruneThread
from vorta.borg.list import BorgListThread
from vorta.borg.check import BorgCheckThread
from vorta.borg.mount import BorgMountThread
from vorta.borg.umount import BorgUmountThread
from vorta.views.extract_dialog import ExtractDialog
from vorta.utils import get_asset, pretty_bytes, choose_folder_dialog
from vorta.models import BackupProfileMixin, ArchiveModel
2018-10-27 17:24:34 +00:00
uifile = get_asset('UI/archivetab.ui')
ArchiveTabUI, ArchiveTabBase = uic.loadUiType(uifile, from_imports=True, import_from='vorta.views')
2018-10-27 17:24:34 +00:00
class ArchiveTab(ArchiveTabBase, ArchiveTabUI, BackupProfileMixin):
prune_intervals = ['hour', 'day', 'week', 'month', 'year']
2018-10-27 17:24:34 +00:00
def __init__(self, parent=None):
super().__init__(parent)
self.setupUi(parent)
self.mount_point = None
2018-10-27 17:24:34 +00:00
header = self.archiveTable.horizontalHeader()
2018-10-27 17:24:34 +00:00
header.setVisible(True)
header.setSectionResizeMode(0, QHeaderView.ResizeToContents)
2018-10-27 17:24:34 +00:00
header.setSectionResizeMode(1, QHeaderView.ResizeToContents)
header.setSectionResizeMode(2, QHeaderView.ResizeToContents)
header.setSectionResizeMode(3, QHeaderView.Stretch)
header.setStretchLastSection(True)
2018-10-27 17:24:34 +00:00
if sys.platform != 'darwin':
self._set_status('') # Set platform-specific hints.
self.archiveTable.setSelectionBehavior(QTableView.SelectRows)
self.archiveTable.setEditTriggers(QTableView.NoEditTriggers)
self.archiveTable.setAlternatingRowColors(True)
2018-10-27 17:24:34 +00:00
# Populate pruning options from database
for i in self.prune_intervals:
getattr(self, f'prune_{i}').setValue(getattr(self.profile(), f'prune_{i}'))
getattr(self, f'prune_{i}').valueChanged.connect(self.save_prune_setting)
self.mountButton.clicked.connect(self.mount_action)
self.listButton.clicked.connect(self.list_action)
self.pruneButton.clicked.connect(self.prune_action)
self.checkButton.clicked.connect(self.check_action)
self.extractButton.clicked.connect(self.extract_action)
2018-10-27 17:24:34 +00:00
self.populate_from_profile()
2018-10-27 17:24:34 +00:00
def _set_status(self, text):
self.mountErrors.setText(text)
self.mountErrors.repaint()
def _toggle_all_buttons(self, enabled=True):
self.checkButton.setEnabled(enabled)
self.listButton.setEnabled(enabled)
self.pruneButton.setEnabled(enabled)
self.mountButton.setEnabled(enabled)
def populate_from_profile(self):
profile = self.profile()
if profile.repo is not None:
self.currentRepoLabel.setText(profile.repo.url)
archives = [s for s in profile.repo.archives.select().order_by(ArchiveModel.time.desc())]
2018-10-27 17:24:34 +00:00
for row, archive in enumerate(archives):
self.archiveTable.insertRow(row)
formatted_time = archive.time.strftime('%Y-%m-%d %H:%M')
self.archiveTable.setItem(row, 0, QTableWidgetItem(formatted_time))
self.archiveTable.setItem(row, 1, QTableWidgetItem(pretty_bytes(archive.size)))
if archive.duration is not None:
formatted_duration = str(timedelta(seconds=round(archive.duration)))
else:
formatted_duration = ''
self.archiveTable.setItem(row, 2, QTableWidgetItem(formatted_duration))
self.archiveTable.setItem(row, 3, QTableWidgetItem(archive.name))
self.archiveTable.setRowCount(len(archives))
self._toggle_all_buttons(enabled=True)
2018-11-02 11:14:54 +00:00
else:
self.archiveTable.setRowCount(0)
self.currentRepoLabel.setText('N/A')
self._toggle_all_buttons(enabled=False)
def check_action(self):
params = BorgCheckThread.prepare(self.profile())
if params['ok']:
thread = BorgCheckThread(params['cmd'], params, parent=self)
thread.updated.connect(self._set_status)
thread.result.connect(self.check_result)
self._toggle_all_buttons(False)
thread.start()
def check_result(self, result):
if result['returncode'] == 0:
self._toggle_all_buttons(True)
def prune_action(self):
params = BorgPruneThread.prepare(self.profile())
if params['ok']:
thread = BorgPruneThread(params['cmd'], params, parent=self)
thread.updated.connect(self._set_status)
thread.result.connect(self.prune_result)
self._toggle_all_buttons(False)
thread.start()
def prune_result(self, result):
if result['returncode'] == 0:
self._set_status('Pruning finished.')
self.list_action()
else:
self._toggle_all_buttons(True)
def list_action(self):
params = BorgListThread.prepare(self.profile())
if params['ok']:
thread = BorgListThread(params['cmd'], params, parent=self)
thread.updated.connect(self._set_status)
thread.result.connect(self.list_result)
self._toggle_all_buttons(False)
thread.start()
def list_result(self, result):
self._toggle_all_buttons(True)
if result['returncode'] == 0:
self._set_status('Refreshed snapshots.')
self.populate_from_profile()
def mount_action(self):
profile = self.profile()
params = BorgMountThread.prepare(profile)
if not params['ok']:
self._set_status(params['message'])
return
# Conditions are met (borg binary available, etc)
row_selected = self.archiveTable.selectionModel().selectedRows()
2018-10-27 17:24:34 +00:00
if row_selected:
snapshot_cell = self.archiveTable.item(row_selected[0].row(), 3)
2018-10-27 17:24:34 +00:00
if snapshot_cell:
snapshot_name = snapshot_cell.text()
params['cmd'][-1] += f'::{snapshot_name}'
2018-10-27 17:24:34 +00:00
def receive():
mount_point = dialog.selectedFiles()
if mount_point:
params['cmd'].append(mount_point[0])
self.mount_point = mount_point[0]
if params['ok']:
self._toggle_all_buttons(False)
thread = BorgMountThread(params['cmd'], params, parent=self)
thread.updated.connect(self.mountErrors.setText)
thread.result.connect(self.mount_result)
thread.start()
dialog = choose_folder_dialog(self, "Choose Mount Point")
dialog.open(receive)
def mount_result(self, result):
self._toggle_all_buttons(True)
2018-10-27 17:24:34 +00:00
if result['returncode'] == 0:
self._set_status('Mounted successfully.')
self.mountButton.setText('Unmount')
self.mountButton.clicked.disconnect()
self.mountButton.clicked.connect(self.umount_action)
else:
self.mount_point = None
def umount_action(self):
if self.mount_point is not None:
profile = self.profile()
params = BorgUmountThread.prepare(profile)
if not params['ok']:
self._set_status(params['message'])
return
if self.mount_point in params['active_mount_points']:
params['cmd'].append(self.mount_point)
thread = BorgUmountThread(params['cmd'], params, parent=self)
thread.updated.connect(self.mountErrors.setText)
thread.result.connect(self.umount_result)
thread.start()
else:
self._set_status('Mount point not active. Try restarting Vorta.')
return
def umount_result(self, result):
self._toggle_all_buttons(True)
if result['returncode'] == 0:
self._set_status('Un-mounted successfully.')
self.mountButton.setText('Mount')
self.mountButton.clicked.disconnect()
self.mountButton.clicked.connect(self.mount_action)
self.mount_point = None
def save_prune_setting(self, new_value):
profile = self.profile()
for i in self.prune_intervals:
setattr(profile, f'prune_{i}', getattr(self, f'prune_{i}').value())
profile.save()
def extract_action(self):
window = ExtractDialog()
window.setParent(self, QtCore.Qt.Sheet)
window.show()