mirror of https://github.com/borgbase/vorta
Prevent running backups on metered network. By @ktosiek
This commit is contained in:
parent
79f29b2430
commit
a0e7d50318
|
@ -17,7 +17,8 @@
|
|||
"--talk-name=org.freedesktop.Flatpak.*",
|
||||
"--talk-name=org.freedesktop.secrets",
|
||||
"--socket=ssh-auth",
|
||||
"--talk-name=org.freedesktop.Notifications"
|
||||
"--talk-name=org.freedesktop.Notifications",
|
||||
"--system-talk-name=org.freedesktop.NetworkManager"
|
||||
],
|
||||
"build-options": {
|
||||
"env": {
|
||||
|
|
|
@ -26,15 +26,15 @@
|
|||
</font>
|
||||
</property>
|
||||
<property name="currentIndex">
|
||||
<number>0</number>
|
||||
<number>1</number>
|
||||
</property>
|
||||
<widget class="QWidget" name="schedule">
|
||||
<property name="geometry">
|
||||
<rect>
|
||||
<x>0</x>
|
||||
<y>0</y>
|
||||
<width>663</width>
|
||||
<height>354</height>
|
||||
<width>669</width>
|
||||
<height>361</height>
|
||||
</rect>
|
||||
</property>
|
||||
<property name="font">
|
||||
|
@ -355,8 +355,8 @@
|
|||
<rect>
|
||||
<x>0</x>
|
||||
<y>0</y>
|
||||
<width>663</width>
|
||||
<height>354</height>
|
||||
<width>669</width>
|
||||
<height>361</height>
|
||||
</rect>
|
||||
</property>
|
||||
<attribute name="label">
|
||||
|
@ -367,15 +367,48 @@
|
|||
<number>0</number>
|
||||
</property>
|
||||
<item row="1" column="0">
|
||||
<widget class="QListWidget" name="wifiListWidget"/>
|
||||
</item>
|
||||
<item row="0" column="0">
|
||||
<widget class="QLabel" name="wifiListLabel">
|
||||
<property name="enabled">
|
||||
<bool>true</bool>
|
||||
</property>
|
||||
<property name="text">
|
||||
<string>Allowed Networks:</string>
|
||||
</property>
|
||||
</widget>
|
||||
</item>
|
||||
<item row="0" column="0">
|
||||
<widget class="QCheckBox" name="dontRunOnMeteredNetworksCheckBox">
|
||||
<property name="text">
|
||||
<string>Don't run backup over metered networks</string>
|
||||
</property>
|
||||
<property name="tristate">
|
||||
<bool>false</bool>
|
||||
</property>
|
||||
</widget>
|
||||
</item>
|
||||
<item row="2" column="0">
|
||||
<widget class="QListWidget" name="wifiListWidget">
|
||||
<property name="sizePolicy">
|
||||
<sizepolicy hsizetype="Expanding" vsizetype="Expanding">
|
||||
<horstretch>0</horstretch>
|
||||
<verstretch>1</verstretch>
|
||||
</sizepolicy>
|
||||
</property>
|
||||
</widget>
|
||||
</item>
|
||||
<item row="3" column="0">
|
||||
<spacer name="verticalSpacer_3">
|
||||
<property name="orientation">
|
||||
<enum>Qt::Vertical</enum>
|
||||
</property>
|
||||
<property name="sizeHint" stdset="0">
|
||||
<size>
|
||||
<width>20</width>
|
||||
<height>0</height>
|
||||
</size>
|
||||
</property>
|
||||
</spacer>
|
||||
</item>
|
||||
</layout>
|
||||
</widget>
|
||||
<widget class="QWidget" name="page">
|
||||
|
@ -383,8 +416,8 @@
|
|||
<rect>
|
||||
<x>0</x>
|
||||
<y>0</y>
|
||||
<width>663</width>
|
||||
<height>354</height>
|
||||
<width>669</width>
|
||||
<height>361</height>
|
||||
</rect>
|
||||
</property>
|
||||
<attribute name="label">
|
||||
|
@ -444,8 +477,8 @@
|
|||
<rect>
|
||||
<x>0</x>
|
||||
<y>0</y>
|
||||
<width>663</width>
|
||||
<height>354</height>
|
||||
<width>669</width>
|
||||
<height>361</height>
|
||||
</rect>
|
||||
</property>
|
||||
<attribute name="label">
|
||||
|
|
|
@ -4,7 +4,7 @@ from dateutil import parser
|
|||
import subprocess
|
||||
|
||||
from vorta.i18n import trans_late
|
||||
from vorta.utils import get_current_wifi, format_archive_name, borg_compat
|
||||
from vorta.utils import format_archive_name, borg_compat, network_status_monitor
|
||||
from vorta.models import SourceFileModel, ArchiveModel, WifiSettingModel, RepoModel
|
||||
from .borg_thread import BorgThread
|
||||
|
||||
|
@ -78,7 +78,7 @@ class BorgCreateThread(BorgThread):
|
|||
ret['message'] = trans_late('messages', 'Add some folders to back up first.')
|
||||
return ret
|
||||
|
||||
current_wifi = get_current_wifi()
|
||||
current_wifi = network_status_monitor.get_current_wifi()
|
||||
if current_wifi is not None:
|
||||
wifi_is_disallowed = WifiSettingModel.select().where(
|
||||
(
|
||||
|
@ -92,6 +92,11 @@ class BorgCreateThread(BorgThread):
|
|||
if wifi_is_disallowed.count() > 0 and profile.repo.is_remote_repo():
|
||||
ret['message'] = trans_late('messages', 'Current Wifi is not allowed.')
|
||||
return ret
|
||||
|
||||
if profile.dont_run_on_metered_networks and network_status_monitor.is_network_metered():
|
||||
ret['message'] = trans_late('messages', 'Not running backup over metered connection.')
|
||||
return ret
|
||||
|
||||
ret['profile'] = profile
|
||||
ret['repo'] = profile.repo
|
||||
|
||||
|
|
|
@ -15,7 +15,7 @@ from playhouse.migrate import SqliteMigrator, migrate
|
|||
from vorta.i18n import trans_late
|
||||
from vorta.utils import slugify
|
||||
|
||||
SCHEMA_VERSION = 14
|
||||
SCHEMA_VERSION = 15
|
||||
|
||||
db = pw.Proxy()
|
||||
|
||||
|
@ -89,6 +89,7 @@ class BackupProfileModel(pw.Model):
|
|||
prune_prefix = pw.CharField(default="{hostname}-{profile_slug}-")
|
||||
pre_backup_cmd = pw.CharField(default='')
|
||||
post_backup_cmd = pw.CharField(default='')
|
||||
dont_run_on_metered_networks = pw.BooleanField(default=True)
|
||||
|
||||
def refresh(self):
|
||||
return type(self).get(self._pk_expr())
|
||||
|
@ -351,6 +352,13 @@ def init_db(con=None):
|
|||
migrator.add_column(SettingsModel._meta.table_name,
|
||||
'str_value', pw.CharField(default='')))
|
||||
|
||||
if current_schema.version < 15:
|
||||
_apply_schema_update(
|
||||
current_schema, 15,
|
||||
migrator.add_column(BackupProfileModel._meta.table_name,
|
||||
'dont_run_on_metered_networks', pw.BooleanField(default=True))
|
||||
)
|
||||
|
||||
# Create missing settings and update labels. Leave setting values untouched.
|
||||
for setting in get_misc_settings():
|
||||
s, created = SettingsModel.get_or_create(key=setting['key'], defaults=setting)
|
||||
|
|
|
@ -0,0 +1,54 @@
|
|||
import sys
|
||||
from datetime import datetime
|
||||
from typing import Optional, NamedTuple, List
|
||||
|
||||
|
||||
class NetworkStatusMonitor:
|
||||
@classmethod
|
||||
def get_network_status_monitor(cls) -> 'NetworkStatusMonitor':
|
||||
if sys.platform == 'darwin':
|
||||
from .darwin import DarwinNetworkStatus
|
||||
return DarwinNetworkStatus()
|
||||
else:
|
||||
from .network_manager import NetworkManagerMonitor, UnsupportedException
|
||||
try:
|
||||
return NetworkManagerMonitor()
|
||||
except UnsupportedException:
|
||||
return NullNetworkStatusMonitor()
|
||||
|
||||
def is_network_status_available(self):
|
||||
"""Is the network status really available, and not just a dummy implementation?"""
|
||||
return type(self) != NetworkStatusMonitor
|
||||
|
||||
def is_network_metered(self) -> bool:
|
||||
"""Is the currently connected network a metered connection?"""
|
||||
raise NotImplementedError()
|
||||
|
||||
def get_current_wifi(self) -> Optional[str]:
|
||||
"""Get current SSID or None if Wifi is off."""
|
||||
raise NotImplementedError()
|
||||
|
||||
def get_known_wifis(self) -> Optional[List['SystemWifiInfo']]:
|
||||
"""Get WiFi networks known to system."""
|
||||
raise NotImplementedError()
|
||||
|
||||
|
||||
class SystemWifiInfo(NamedTuple):
|
||||
ssid: str
|
||||
last_connected: Optional[datetime]
|
||||
|
||||
|
||||
class NullNetworkStatusMonitor(NetworkStatusMonitor):
|
||||
"""Dummy implementation, in case we don't have one for current platform."""
|
||||
|
||||
def is_network_status_available(self):
|
||||
return False
|
||||
|
||||
def is_network_metered(self) -> bool:
|
||||
return False
|
||||
|
||||
def get_current_wifi(self) -> Optional[str]:
|
||||
pass
|
||||
|
||||
def get_known_wifis(self) -> Optional[List['SystemWifiInfo']]:
|
||||
pass
|
|
@ -0,0 +1,83 @@
|
|||
import plistlib
|
||||
import shlex
|
||||
import subprocess
|
||||
import xml
|
||||
from typing import Optional, Iterator
|
||||
|
||||
from peewee import format_date_time
|
||||
|
||||
from vorta.log import logger
|
||||
from vorta.network_status.abc import NetworkStatusMonitor, SystemWifiInfo
|
||||
|
||||
|
||||
class DarwinNetworkStatus(NetworkStatusMonitor):
|
||||
def is_network_metered(self) -> bool:
|
||||
return any(is_network_metered(d) for d in get_network_devices())
|
||||
|
||||
def get_current_wifi(self) -> Optional[str]:
|
||||
"""
|
||||
Get current SSID or None if Wifi is off.
|
||||
|
||||
From https://gist.github.com/keithweaver/00edf356e8194b89ed8d3b7bbead000c
|
||||
"""
|
||||
cmd = ['/System/Library/PrivateFrameworks/Apple80211.framework/Versions/Current/Resources/airport', '-I']
|
||||
process = subprocess.Popen(cmd, stdout=subprocess.PIPE)
|
||||
out, err = process.communicate()
|
||||
process.wait()
|
||||
for line in out.decode("utf-8").split('\n'):
|
||||
split_line = line.strip().split(':')
|
||||
if split_line[0] == 'SSID':
|
||||
return split_line[1].strip()
|
||||
|
||||
def get_known_wifis(self):
|
||||
from vorta.models import WifiSettingModel
|
||||
plist_path = '/Library/Preferences/SystemConfiguration/com.apple.airport.preferences.plist'
|
||||
|
||||
try:
|
||||
plist_file = open(plist_path, 'rb')
|
||||
wifis = plistlib.load(plist_file).get('KnownNetworks')
|
||||
except xml.parsers.expat.ExpatError:
|
||||
logger.error('Unable to parse list of Wifi networks.')
|
||||
return None
|
||||
|
||||
result = []
|
||||
if wifis is not None:
|
||||
for wifi in wifis.values():
|
||||
raw_last_connected = wifi.get('LastConnected', None)
|
||||
last_connected = None if not raw_last_connected \
|
||||
else format_date_time(raw_last_connected, WifiSettingModel.last_connected.formats)
|
||||
ssid = wifi.get('SSIDString', None)
|
||||
|
||||
if ssid is None:
|
||||
continue
|
||||
|
||||
result.append(SystemWifiInfo(ssid=ssid, last_connected=last_connected))
|
||||
|
||||
return result
|
||||
|
||||
|
||||
def get_network_devices() -> Iterator[str]:
|
||||
for line in call_networksetup_listallhardwareports().splitlines():
|
||||
if line.startswith(b'Device: '):
|
||||
yield line.split()[1].strip().decode('ascii')
|
||||
|
||||
|
||||
def is_network_metered(bsd_device) -> bool:
|
||||
return b'ANDROID_METERED' in call_ipconfig_getpacket(bsd_device)
|
||||
|
||||
|
||||
def call_ipconfig_getpacket(bsd_device):
|
||||
cmd = ['ipconfig', 'getpacket', bsd_device]
|
||||
try:
|
||||
return subprocess.check_output(cmd)
|
||||
except subprocess.CalledProcessError:
|
||||
logger.warn("Command %s failed", shlex.join(cmd))
|
||||
return b''
|
||||
|
||||
|
||||
def call_networksetup_listallhardwareports():
|
||||
cmd = ['networksetup', '-listallhardwareports']
|
||||
try:
|
||||
return subprocess.check_output(cmd)
|
||||
except subprocess.CalledProcessError:
|
||||
logger.warn("Command %s failed", shlex.join(cmd))
|
|
@ -0,0 +1,164 @@
|
|||
import logging
|
||||
from datetime import datetime
|
||||
from enum import Enum
|
||||
from typing import Optional, List, Any, NamedTuple, Mapping
|
||||
|
||||
from PyQt5 import QtDBus
|
||||
from PyQt5.QtCore import QObject, QVersionNumber
|
||||
|
||||
from vorta.network_status.abc import NetworkStatusMonitor, SystemWifiInfo
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
|
||||
class NetworkManagerMonitor(NetworkStatusMonitor):
|
||||
def __init__(self, nm_adapter: 'NetworkManagerDBusAdapter' = None):
|
||||
self._nm = nm_adapter or NetworkManagerDBusAdapter.get_system_nm_adapter()
|
||||
|
||||
def is_network_metered(self) -> bool:
|
||||
return self._nm.get_global_metered_status() in (NMMetered.YES, NMMetered.GUESS_YES)
|
||||
|
||||
def get_current_wifi(self) -> Optional[str]:
|
||||
# Only check the primary connection. VPN over WiFi will still show the WiFi as Primary Connection.
|
||||
# We don't check all active connections, as NM won't disable WiFi when connecting a cable.
|
||||
active_connection_path = self._nm.get_primary_connection_path()
|
||||
if not active_connection_path:
|
||||
return
|
||||
active_connection = self._nm.get_active_connection_info(active_connection_path)
|
||||
if active_connection.type == '802-11-wireless':
|
||||
settings = self._nm.get_settings(active_connection.connection)
|
||||
ssid = self._get_ssid_from_settings(settings)
|
||||
if ssid:
|
||||
return ssid
|
||||
|
||||
def get_known_wifis(self) -> Optional[List[SystemWifiInfo]]:
|
||||
wifis = []
|
||||
for connection_path in self._nm.get_connections_paths():
|
||||
settings = self._nm.get_settings(connection_path)
|
||||
ssid = self._get_ssid_from_settings(settings)
|
||||
if ssid:
|
||||
timestamp = settings['connection'].get('timestamp')
|
||||
wifis.append(SystemWifiInfo(
|
||||
ssid=ssid,
|
||||
last_connected=timestamp and datetime.utcfromtimestamp(timestamp),
|
||||
))
|
||||
return wifis
|
||||
|
||||
def _get_ssid_from_settings(self, settings):
|
||||
wireless_settings = settings.get('802-11-wireless') or {}
|
||||
raw_ssid = wireless_settings.get('ssid')
|
||||
ssid = raw_ssid and decode_ssid(raw_ssid)
|
||||
return ssid
|
||||
|
||||
|
||||
def decode_ssid(raw_ssid: List[int]) -> Optional[str]:
|
||||
"""SSIDs are binary strings, but we need something to show to the user."""
|
||||
# Best effort UTF-8 decoding, as most SSIDs are UTF-8 (or even ASCII)
|
||||
str_ssid = bytes(raw_ssid).decode('utf-8', 'surrogateescape')
|
||||
if str_ssid.isprintable():
|
||||
return str_ssid
|
||||
else:
|
||||
return ''.join(
|
||||
c if c.isprintable() else ascii(c)[1:-1]
|
||||
for c in str_ssid
|
||||
)
|
||||
|
||||
|
||||
class UnsupportedException(Exception):
|
||||
"""NetworkManager is not available"""
|
||||
|
||||
|
||||
class DBusException(Exception):
|
||||
"""Failed to call a DBus method"""
|
||||
|
||||
|
||||
class NetworkManagerDBusAdapter(QObject):
|
||||
"""Simple adapter to NetworkManager's DBus interface.
|
||||
This should be the only part of NM support that needs manual testing."""
|
||||
BUS_NAME = 'org.freedesktop.NetworkManager'
|
||||
NM_PATH = '/org/freedesktop/NetworkManager'
|
||||
|
||||
def __init__(self, parent, bus):
|
||||
super().__init__(parent)
|
||||
self._bus = bus
|
||||
self._nm = self._get_iface(self.NM_PATH, 'org.freedesktop.NetworkManager')
|
||||
|
||||
@classmethod
|
||||
def get_system_nm_adapter(cls) -> 'NetworkManagerDBusAdapter':
|
||||
bus = QtDBus.QDBusConnection.systemBus()
|
||||
if not bus.isConnected():
|
||||
raise UnsupportedException("Can't connect to system bus")
|
||||
nm_adapter = cls(parent=None, bus=bus)
|
||||
if not nm_adapter.isValid():
|
||||
raise UnsupportedException("Can't connect to NetworkManager")
|
||||
return nm_adapter
|
||||
|
||||
def isValid(self):
|
||||
if not self._nm.isValid():
|
||||
return False
|
||||
nm_version = self._get_nm_version()
|
||||
if nm_version < QVersionNumber(1, 2):
|
||||
logger.warning('NetworkManager version 1.2 or later required, found %s', nm_version.toString())
|
||||
return False
|
||||
return True
|
||||
|
||||
def get_primary_connection_path(self) -> Optional[str]:
|
||||
return read_dbus_property(self._nm, 'PrimaryConnection')
|
||||
|
||||
def get_active_connection_info(self, active_connection_path) -> 'ActiveConnectionInfo':
|
||||
active_connection = self._get_iface(active_connection_path, 'org.freedesktop.NetworkManager.Connection.Active')
|
||||
return ActiveConnectionInfo(
|
||||
connection=read_dbus_property(active_connection, 'Connection'),
|
||||
type=read_dbus_property(active_connection, 'Type'),
|
||||
)
|
||||
|
||||
def get_connections_paths(self) -> List[str]:
|
||||
settings_manager = self._get_iface(self.NM_PATH + '/Settings', 'org.freedesktop.NetworkManager.Settings')
|
||||
return get_result(settings_manager.call('ListConnections'))
|
||||
|
||||
def get_settings(self, connection_path) -> Mapping[str, Mapping[str, Any]]:
|
||||
settings = self._get_iface(connection_path, 'org.freedesktop.NetworkManager.Settings.Connection')
|
||||
return get_result(settings.call('GetSettings'))
|
||||
|
||||
def get_global_metered_status(self) -> 'NMMetered':
|
||||
return NMMetered(read_dbus_property(self._nm, 'Metered'))
|
||||
|
||||
def _get_nm_version(self):
|
||||
version, _suffindex = QVersionNumber.fromString(read_dbus_property(self._nm, 'Version'))
|
||||
return version
|
||||
|
||||
def _get_iface(self, path, interface) -> QtDBus.QDBusInterface:
|
||||
return QtDBus.QDBusInterface(self.BUS_NAME, path, interface, self._bus)
|
||||
|
||||
|
||||
def read_dbus_property(obj, property):
|
||||
# QDBusInterface.property() didn't work for some reason
|
||||
props = QtDBus.QDBusInterface(obj.service(), obj.path(), 'org.freedesktop.DBus.Properties', obj.connection())
|
||||
msg = props.call('Get', obj.interface(), property)
|
||||
return get_result(msg)
|
||||
|
||||
|
||||
def get_result(msg: QtDBus.QDBusMessage) -> Any:
|
||||
if msg.type() == msg.MessageType.ReplyMessage:
|
||||
return msg.arguments()[0]
|
||||
else:
|
||||
raise DBusException("DBus call failed: {}".format(msg.arguments()))
|
||||
|
||||
|
||||
class ActiveConnectionInfo(NamedTuple):
|
||||
connection: str
|
||||
type: str
|
||||
|
||||
|
||||
class NMMetered(Enum):
|
||||
UNKNOWN = 0
|
||||
YES = 1
|
||||
NO = 2
|
||||
GUESS_YES = 3
|
||||
GUESS_NO = 4
|
||||
|
||||
|
||||
class NMDeviceType(Enum):
|
||||
# Only the types we care about
|
||||
UNKNOWN = 0
|
||||
WIFI = 2
|
|
@ -4,12 +4,9 @@ import getpass
|
|||
import operator
|
||||
import os
|
||||
import platform
|
||||
import plistlib
|
||||
import re
|
||||
import subprocess
|
||||
import sys
|
||||
import unicodedata
|
||||
import xml
|
||||
from collections import defaultdict
|
||||
from datetime import datetime as dt
|
||||
from functools import reduce
|
||||
|
@ -25,12 +22,15 @@ from PyQt5.QtWidgets import QApplication, QFileDialog, QSystemTrayIcon
|
|||
from vorta.borg._compatibility import BorgCompatibility
|
||||
from vorta.keyring.abc import VortaKeyring
|
||||
from vorta.log import logger
|
||||
from vorta.network_status.abc import NetworkStatusMonitor
|
||||
|
||||
QApplication.setAttribute(QtCore.Qt.AA_EnableHighDpiScaling, True) # enable highdpi scaling
|
||||
QApplication.setAttribute(QtCore.Qt.AA_UseHighDpiPixmaps, True) # use highdpi icons
|
||||
|
||||
keyring = VortaKeyring.get_keyring()
|
||||
logger.info('Using %s Keyring implementation.', keyring.__class__.__name__)
|
||||
network_status_monitor = NetworkStatusMonitor.get_network_status_monitor()
|
||||
logger.info('Using %s NetworkStatusMonitor implementation.', network_status_monitor.__class__.__name__)
|
||||
|
||||
borg_compat = BorgCompatibility()
|
||||
|
||||
|
@ -127,63 +127,33 @@ def get_sorted_wifis(profile):
|
|||
|
||||
from vorta.models import WifiSettingModel
|
||||
|
||||
if sys.platform == 'darwin':
|
||||
plist_path = '/Library/Preferences/SystemConfiguration/com.apple.airport.preferences.plist'
|
||||
system_wifis = network_status_monitor.get_known_wifis()
|
||||
if system_wifis is None:
|
||||
# Don't show any networks if we can't get the current list
|
||||
return []
|
||||
|
||||
try:
|
||||
plist_file = open(plist_path, 'rb')
|
||||
wifis = plistlib.load(plist_file).get('KnownNetworks')
|
||||
except xml.parsers.expat.ExpatError:
|
||||
logger.error('Unable to parse list of Wifi networks.')
|
||||
return []
|
||||
for wifi in system_wifis:
|
||||
db_wifi, created = WifiSettingModel.get_or_create(
|
||||
ssid=wifi.ssid,
|
||||
profile=profile.id,
|
||||
defaults={'last_connected': wifi.last_connected, 'allowed': True}
|
||||
)
|
||||
|
||||
if wifis is not None:
|
||||
for wifi in wifis.values():
|
||||
timestamp = wifi.get('LastConnected', None)
|
||||
ssid = wifi.get('SSIDString', None)
|
||||
# update last connected time
|
||||
if not created and db_wifi.last_connected != wifi.last_connected:
|
||||
db_wifi.last_connected = wifi.last_connected
|
||||
db_wifi.save()
|
||||
|
||||
if ssid is None:
|
||||
continue
|
||||
|
||||
db_wifi, created = WifiSettingModel.get_or_create(
|
||||
ssid=ssid,
|
||||
profile=profile.id,
|
||||
defaults={'last_connected': timestamp, 'allowed': True}
|
||||
)
|
||||
|
||||
# update last connected time
|
||||
if not created and db_wifi.last_connected != timestamp:
|
||||
db_wifi.last_connected = timestamp
|
||||
db_wifi.save()
|
||||
|
||||
# remove Wifis that were deleted in the system.
|
||||
deleted_wifis = WifiSettingModel.select() \
|
||||
.where(WifiSettingModel.ssid.not_in([w['SSIDString'] for w in wifis.values() if 'SSIDString' in w]))
|
||||
for wifi in deleted_wifis:
|
||||
wifi.delete_instance()
|
||||
# remove Wifis that were deleted in the system.
|
||||
deleted_wifis = WifiSettingModel.select() \
|
||||
.where(WifiSettingModel.ssid.not_in([wifi.ssid for wifi in system_wifis]))
|
||||
for wifi in deleted_wifis:
|
||||
wifi.delete_instance()
|
||||
|
||||
return WifiSettingModel.select() \
|
||||
.where(WifiSettingModel.profile == profile.id).order_by(-WifiSettingModel.last_connected)
|
||||
|
||||
|
||||
def get_current_wifi():
|
||||
"""
|
||||
Get current SSID or None if Wifi is off.
|
||||
|
||||
From https://gist.github.com/keithweaver/00edf356e8194b89ed8d3b7bbead000c
|
||||
"""
|
||||
|
||||
if sys.platform == 'darwin':
|
||||
cmd = ['/System/Library/PrivateFrameworks/Apple80211.framework/Versions/Current/Resources/airport', '-I']
|
||||
process = subprocess.Popen(cmd, stdout=subprocess.PIPE)
|
||||
out, err = process.communicate()
|
||||
process.wait()
|
||||
for line in out.decode("utf-8").split('\n'):
|
||||
split_line = line.strip().split(':')
|
||||
if split_line[0] == 'SSID':
|
||||
return split_line[1].strip()
|
||||
|
||||
|
||||
def parse_args():
|
||||
parser = argparse.ArgumentParser(description='Vorta Backup GUI for Borg.')
|
||||
parser.add_argument('--version', '-V',
|
||||
|
|
|
@ -1,5 +1,3 @@
|
|||
import sys
|
||||
|
||||
from PyQt5 import QtCore, uic
|
||||
from PyQt5.QtWidgets import QShortcut, QMessageBox
|
||||
from PyQt5.QtGui import QKeySequence
|
||||
|
@ -7,7 +5,7 @@ from PyQt5.QtGui import QKeySequence
|
|||
from vorta.borg.borg_thread import BorgThread
|
||||
from vorta.i18n import trans_late
|
||||
from vorta.models import BackupProfileModel, SettingsModel
|
||||
from vorta.utils import borg_compat, get_asset, is_system_tray_available
|
||||
from vorta.utils import borg_compat, get_asset, is_system_tray_available, network_status_monitor
|
||||
from vorta.views.utils import get_colored_icon
|
||||
|
||||
from .archive_tab import ArchiveTab
|
||||
|
@ -76,7 +74,7 @@ class MainWindow(MainWindowBase, MainWindowUI):
|
|||
self.profileAddButton.clicked.connect(self.profile_add_action)
|
||||
|
||||
# OS-specific startup options:
|
||||
if sys.platform != 'darwin':
|
||||
if not network_status_monitor.is_network_status_available():
|
||||
# Hide Wifi-rule section in schedule tab.
|
||||
self.scheduleTab.wifiListLabel.hide()
|
||||
self.scheduleTab.wifiListWidget.hide()
|
||||
|
|
|
@ -25,6 +25,9 @@ class ScheduleTab(ScheduleBase, ScheduleUI, BackupProfileMixin):
|
|||
self.scheduleApplyButton.clicked.connect(self.on_scheduler_apply)
|
||||
self.app.backup_finished_event.connect(self.init_logs)
|
||||
|
||||
self.dontRunOnMeteredNetworksCheckBox.stateChanged.connect(
|
||||
self.on_dont_run_on_metered_networks_changed)
|
||||
|
||||
self.init_logs()
|
||||
self.populate_from_profile()
|
||||
self.set_icons()
|
||||
|
@ -52,6 +55,8 @@ class ScheduleTab(ScheduleBase, ScheduleUI, BackupProfileMixin):
|
|||
self.validationCheckBox.setTristate(False)
|
||||
self.pruneCheckBox.setTristate(False)
|
||||
|
||||
self.dontRunOnMeteredNetworksCheckBox.setChecked(profile.dont_run_on_metered_networks)
|
||||
|
||||
self.preBackupCmdLineEdit.setText(profile.pre_backup_cmd)
|
||||
self.postBackupCmdLineEdit.setText(profile.post_backup_cmd)
|
||||
self.postBackupCmdLineEdit.textEdited.connect(
|
||||
|
@ -131,3 +136,8 @@ class ScheduleTab(ScheduleBase, ScheduleUI, BackupProfileMixin):
|
|||
profile.save()
|
||||
self.app.scheduler.reload()
|
||||
self._draw_next_scheduled_backup()
|
||||
|
||||
def on_dont_run_on_metered_networks_changed(self, state):
|
||||
profile = self.profile()
|
||||
profile.dont_run_on_metered_networks = state
|
||||
profile.save()
|
||||
|
|
|
@ -28,6 +28,7 @@ def init_db(qapp):
|
|||
|
||||
profile = BackupProfileModel.get(id=1)
|
||||
profile.repo = new_repo.id
|
||||
profile.dont_run_on_metered_networks = False
|
||||
profile.save()
|
||||
|
||||
test_archive = ArchiveModel(snapshot_id='99999', name='test-archive', time=dt(2000, 1, 1, 0, 0), repo=1)
|
||||
|
|
|
@ -0,0 +1,107 @@
|
|||
import pytest
|
||||
|
||||
from vorta.network_status import darwin
|
||||
|
||||
|
||||
@pytest.mark.parametrize('getpacket_output_name, expected', [
|
||||
('normal_router', False),
|
||||
('phone', True),
|
||||
])
|
||||
def test_is_network_metered(getpacket_output_name, expected, monkeypatch):
|
||||
def mock_getpacket(device):
|
||||
assert device == 'en0'
|
||||
return GETPACKET_OUTPUTS[getpacket_output_name]
|
||||
|
||||
monkeypatch.setattr(darwin, 'call_ipconfig_getpacket', mock_getpacket)
|
||||
|
||||
result = darwin.is_network_metered('en0')
|
||||
assert result == expected
|
||||
|
||||
|
||||
def test_get_network_devices(monkeypatch):
|
||||
monkeypatch.setattr(darwin, 'call_networksetup_listallhardwareports', lambda: NETWORKSETUP_OUTPUT)
|
||||
|
||||
result = list(darwin.get_network_devices())
|
||||
assert result == ['Bluetooth-Modem', 'en0', 'en1', 'en2', 'bridge0']
|
||||
|
||||
|
||||
GETPACKET_OUTPUTS = {
|
||||
'normal_router': b"""\
|
||||
op = BOOTREPLY
|
||||
htype = 1
|
||||
flags = 0
|
||||
hlen = 6
|
||||
hops = 0
|
||||
xid = 0x8dc8db4d
|
||||
secs = 0
|
||||
ciaddr = 0.0.0.0
|
||||
yiaddr = 172.16.13.237
|
||||
siaddr = 0.0.0.0
|
||||
giaddr = 0.0.0.0
|
||||
chaddr = 8c:85:90:ad:ee:a3
|
||||
sname =
|
||||
file =
|
||||
options:
|
||||
Options count is 9
|
||||
dhcp_message_type (uint8): ACK 0x5
|
||||
subnet_mask (ip): 255.255.252.0
|
||||
router (ip_mult): {172.16.12.1}
|
||||
domain_name_server (ip_mult): {172.16.12.1, 8.8.8.8}
|
||||
domain_name (string): .
|
||||
lease_time (uint32): 0xe10
|
||||
interface_mtu (uint16): 0x5dc
|
||||
server_identifier (ip): 172.16.12.1
|
||||
end (none):
|
||||
""",
|
||||
'phone': b"""\
|
||||
op = BOOTREPLY
|
||||
htype = 1
|
||||
flags = 0
|
||||
hlen = 6
|
||||
hops = 0
|
||||
xid = 0x8dc8db4e
|
||||
secs = 0
|
||||
ciaddr = 0.0.0.0
|
||||
yiaddr = 192.168.43.223
|
||||
siaddr = 192.168.43.242
|
||||
giaddr = 0.0.0.0
|
||||
chaddr = 8c:85:90:ad:ee:a3
|
||||
sname =
|
||||
file =
|
||||
options:
|
||||
Options count is 11
|
||||
dhcp_message_type (uint8): ACK 0x5
|
||||
server_identifier (ip): 192.168.43.242
|
||||
lease_time (uint32): 0xe0f
|
||||
renewal_t1_time_value (uint32): 0x707
|
||||
rebinding_t2_time_value (uint32): 0xc4d
|
||||
subnet_mask (ip): 255.255.255.0
|
||||
broadcast_address (ip): 192.168.43.255
|
||||
router (ip_mult): {192.168.43.242}
|
||||
domain_name_server (ip_mult): {192.168.43.242}
|
||||
vendor_specific (opaque):
|
||||
0000 41 4e 44 52 4f 49 44 5f 4d 45 54 45 52 45 44 ANDROID_METERED
|
||||
"""
|
||||
}
|
||||
|
||||
NETWORKSETUP_OUTPUT = b"""\
|
||||
Hardware Port: Bluetooth DUN
|
||||
Device: Bluetooth-Modem
|
||||
Ethernet Address: N/A
|
||||
|
||||
Hardware Port: Wi-Fi
|
||||
Device: en0
|
||||
Ethernet Address: d7:02:65:7c:1e:14
|
||||
|
||||
Hardware Port: Bluetooth PAN
|
||||
Device: en1
|
||||
Ethernet Address: N/A
|
||||
|
||||
Hardware Port: Thunderbolt 1
|
||||
Device: en2
|
||||
Ethernet Address: bb:e8:c3:25:2b:12
|
||||
|
||||
Hardware Port: Thunderbolt Bridge
|
||||
Device: bridge0
|
||||
Ethernet Address: N/A
|
||||
"""
|
|
@ -0,0 +1,123 @@
|
|||
from datetime import datetime
|
||||
from unittest.mock import MagicMock
|
||||
|
||||
import pytest
|
||||
|
||||
from vorta.network_status.abc import SystemWifiInfo
|
||||
from vorta.network_status.network_manager import NetworkManagerMonitor, NMMetered, NetworkManagerDBusAdapter, \
|
||||
ActiveConnectionInfo, decode_ssid
|
||||
|
||||
|
||||
@pytest.fixture
|
||||
def mock_adapter():
|
||||
return MagicMock(
|
||||
spec_set=NetworkManagerDBusAdapter,
|
||||
wraps=UncallableNetworkManagerDBusAdapter()
|
||||
)
|
||||
|
||||
|
||||
@pytest.fixture
|
||||
def nm_monitor(mock_adapter):
|
||||
return NetworkManagerMonitor(nm_adapter=mock_adapter)
|
||||
|
||||
|
||||
def test_is_network_status_available(nm_monitor):
|
||||
assert nm_monitor.is_network_status_available() is True
|
||||
|
||||
|
||||
@pytest.mark.parametrize('global_metered_status, expected', [
|
||||
(NMMetered.UNKNOWN, False),
|
||||
(NMMetered.YES, True),
|
||||
(NMMetered.NO, False),
|
||||
(NMMetered.GUESS_YES, True),
|
||||
(NMMetered.GUESS_NO, False),
|
||||
])
|
||||
def test_is_network_metered(global_metered_status, expected, nm_monitor):
|
||||
nm_monitor._nm.get_global_metered_status.return_value = global_metered_status
|
||||
|
||||
result = nm_monitor.is_network_metered()
|
||||
|
||||
assert result == expected
|
||||
|
||||
|
||||
@pytest.mark.parametrize('connection_path, connection_type, type_settings, expected', [
|
||||
('/org/freedesktop/NetworkManager/ActiveConnection/1',
|
||||
'802-11-wireless', {'ssid': bytes([84, 69, 83, 84])}, 'TEST'),
|
||||
('/org/freedesktop/NetworkManager/ActiveConnection/2',
|
||||
'802-11-ethernet', {}, None),
|
||||
])
|
||||
def test_get_current_wifi(connection_path, connection_type, type_settings, expected, nm_monitor):
|
||||
nm_monitor._nm.get_primary_connection_path.return_value = connection_path
|
||||
nm_monitor._nm.get_active_connection_info.return_value = ActiveConnectionInfo(
|
||||
connection='/org/freedesktop/NetworkManager/Settings/12',
|
||||
type=connection_type
|
||||
)
|
||||
nm_monitor._nm.get_settings.side_effect = [{connection_type: type_settings}]
|
||||
|
||||
result = nm_monitor.get_current_wifi()
|
||||
|
||||
assert result == expected
|
||||
|
||||
|
||||
def test_get_current_wifi_with_no_connection(nm_monitor):
|
||||
nm_monitor._nm.get_primary_connection_path.return_value = None
|
||||
|
||||
assert nm_monitor.get_current_wifi() is None
|
||||
|
||||
|
||||
def test_get_known_wifis(nm_monitor):
|
||||
nm_monitor._nm.get_connections_paths.return_value = ['/org/freedesktop/NetworkManager/Settings/12']
|
||||
nm_monitor._nm.get_settings.return_value = {
|
||||
'connection': {'timestamp': 1597303736},
|
||||
'802-11-wireless': {'ssid': [84, 69, 83, 84]},
|
||||
}
|
||||
|
||||
result = nm_monitor.get_known_wifis()
|
||||
|
||||
assert result == [SystemWifiInfo(
|
||||
ssid='TEST',
|
||||
last_connected=datetime(2020, 8, 13, 7, 28, 56),
|
||||
)]
|
||||
|
||||
|
||||
def test_get_known_wifis_with_never_used_connection(nm_monitor):
|
||||
nm_monitor._nm.get_connections_paths.return_value = ['/org/freedesktop/NetworkManager/Settings/12']
|
||||
nm_monitor._nm.get_settings.return_value = {
|
||||
'connection': {},
|
||||
'802-11-wireless': {'ssid': [84, 69, 83, 84]},
|
||||
}
|
||||
|
||||
result = nm_monitor.get_known_wifis()
|
||||
|
||||
assert result == [SystemWifiInfo(
|
||||
ssid='TEST',
|
||||
last_connected=None,
|
||||
)]
|
||||
|
||||
|
||||
def test_get_known_wifis_with_no_wifi_connections(nm_monitor):
|
||||
nm_monitor._nm.get_connections_paths.return_value = ['/org/freedesktop/NetworkManager/Settings/12']
|
||||
nm_monitor._nm.get_settings.return_value = {
|
||||
'connection': {},
|
||||
'802-11-ethernet': {},
|
||||
}
|
||||
|
||||
result = nm_monitor.get_known_wifis()
|
||||
|
||||
assert result == []
|
||||
|
||||
|
||||
@pytest.mark.parametrize('ssid_bytes, expected', [
|
||||
([84, 69, 83, 84], 'TEST'),
|
||||
([240, 159, 150, 150], '🖖'),
|
||||
([0, 1, 2, 10, 34, 39], '\\x00\\x01\\x02\\n"\''),
|
||||
])
|
||||
def test_decode_ssid(ssid_bytes, expected):
|
||||
result = decode_ssid(ssid_bytes)
|
||||
assert result == expected
|
||||
|
||||
|
||||
class UncallableNetworkManagerDBusAdapter(NetworkManagerDBusAdapter):
|
||||
def __init__(self):
|
||||
# Skip parent setup, this way none of the DBus calls can happen in tests
|
||||
super(NetworkManagerDBusAdapter, self).__init__(parent=None)
|
Loading…
Reference in New Issue