mirror of
https://github.com/borgbase/vorta
synced 2025-01-02 21:25:48 +00:00
added borg logic for validating urls #1631
This commit is contained in:
parent
b2cf5b1fc9
commit
f7fa3fa58d
2 changed files with 198 additions and 1 deletions
|
@ -530,3 +530,190 @@ def func(x):
|
|||
return i, item
|
||||
|
||||
return None
|
||||
|
||||
|
||||
class Location:
|
||||
"""Object representing a repository location"""
|
||||
|
||||
# user must not contain "@", ":" or "/".
|
||||
# Quoting adduser error message:
|
||||
# "To avoid problems, the username should consist only of letters, digits,
|
||||
# underscores, periods, at signs and dashes, and not start with a dash
|
||||
# (as defined by IEEE Std 1003.1-2001)."
|
||||
# We use "@" as separator between username and hostname, so we must
|
||||
# disallow it within the pure username part.
|
||||
optional_user_re = r"""
|
||||
(?:(?P<user>[^@:/]+)@)?
|
||||
"""
|
||||
|
||||
# path must not contain :: (it ends at :: or string end), but may contain single colons.
|
||||
# to avoid ambiguities with other regexes, it must also not start with ":" nor with "//" nor with "ssh://".
|
||||
local_path_re = r"""
|
||||
(?!(:|//|ssh://|socket://|smb://)) # not starting with ":" or // or ssh:// or socket://
|
||||
(?P<path>([^:]|(:(?!:)))+) # any chars, but no "::"
|
||||
"""
|
||||
|
||||
# file_path must not contain :: (it ends at :: or string end), but may contain single colons.
|
||||
# it must start with a / and that slash is part of the path.
|
||||
file_path_re = r"""
|
||||
(?P<path>(([^/]*)/([^:]|(:(?!:)))+)) # start opt. servername, then /, then any chars, but no "::"
|
||||
"""
|
||||
|
||||
# abs_path must not contain :: (it ends at :: or string end), but may contain single colons.
|
||||
# it must start with a / and that slash is part of the path.
|
||||
abs_path_re = r"""
|
||||
(?P<path>(/([^:]|(:(?!:)))+)) # start with /, then any chars, but no "::"
|
||||
"""
|
||||
|
||||
# host NAME, or host IP ADDRESS (v4 or v6, v6 must be in square brackets)
|
||||
host_re = r"""
|
||||
(?P<host>(
|
||||
(?!\[)[^:/]+(?<!\]) # hostname or v4 addr, not containing : or / (does not match v6 addr: no brackets!)
|
||||
|
|
||||
\[[0-9a-fA-F:.]+\]) # ipv6 address in brackets
|
||||
)
|
||||
"""
|
||||
|
||||
# regexes for misc. kinds of supported location specifiers:
|
||||
ssh_re = re.compile(
|
||||
r"""
|
||||
(?P<proto>ssh):// # ssh://
|
||||
"""
|
||||
+ optional_user_re
|
||||
+ host_re
|
||||
+ r""" # user@ (optional), host name or address
|
||||
(?::(?P<port>\d+))? # :port (optional)
|
||||
"""
|
||||
+ abs_path_re,
|
||||
re.VERBOSE,
|
||||
) # path
|
||||
|
||||
socket_re = re.compile(
|
||||
r"""
|
||||
(?P<proto>socket):// # socket://
|
||||
"""
|
||||
+ abs_path_re,
|
||||
re.VERBOSE,
|
||||
) # path
|
||||
|
||||
file_re = re.compile(
|
||||
r"""
|
||||
(?P<proto>file):// # file://
|
||||
"""
|
||||
+ file_path_re,
|
||||
re.VERBOSE,
|
||||
) # servername/path or path
|
||||
|
||||
local_re = re.compile(local_path_re, re.VERBOSE) # local path
|
||||
|
||||
win_file_re = re.compile(
|
||||
r"""
|
||||
(?:file://)? # optional file protocol
|
||||
(?P<path>
|
||||
(?:[a-zA-Z]:)? # Drive letter followed by a colon (optional)
|
||||
(?:[^:]+) # Anything which does not contain a :, at least one char
|
||||
)
|
||||
""",
|
||||
re.VERBOSE,
|
||||
)
|
||||
|
||||
def __init__(self, text="", overrides={}, other=False):
|
||||
self.repo_env_var = "BORG_OTHER_REPO" if other else "BORG_REPO"
|
||||
self.valid = False
|
||||
self.proto = None
|
||||
self.user = None
|
||||
self._host = None
|
||||
self.port = None
|
||||
self.path = None
|
||||
self.raw = None
|
||||
self.processed = None
|
||||
self.parse(text, overrides)
|
||||
|
||||
def parse(self, text, overrides={}):
|
||||
if not text:
|
||||
# we did not get a text to parse, so we try to fetch from the environment
|
||||
text = os.environ.get(self.repo_env_var)
|
||||
if text is None:
|
||||
return
|
||||
|
||||
self.raw = text # as given by user, might contain placeholders
|
||||
# self.processed = replace_placeholders(self.raw, overrides) # after placeholder replacement
|
||||
valid = self._parse(text)
|
||||
if valid:
|
||||
self.valid = True
|
||||
else:
|
||||
self.valid = False
|
||||
|
||||
def _parse(self, text):
|
||||
def normpath_special(p):
|
||||
# avoid that normpath strips away our relative path hack and even makes p absolute
|
||||
relative = p.startswith("/./")
|
||||
p = os.path.normpath(p)
|
||||
return ("/." + p) if relative else p
|
||||
|
||||
m = self.ssh_re.match(text)
|
||||
if m:
|
||||
self.proto = m.group("proto")
|
||||
self.user = m.group("user")
|
||||
self._host = m.group("host")
|
||||
self.port = m.group("port") and int(m.group("port")) or None
|
||||
self.path = normpath_special(m.group("path"))
|
||||
return True
|
||||
m = self.file_re.match(text)
|
||||
if m:
|
||||
self.proto = m.group("proto")
|
||||
self.path = normpath_special(m.group("path"))
|
||||
return True
|
||||
m = self.socket_re.match(text)
|
||||
if m:
|
||||
self.proto = m.group("proto")
|
||||
self.path = normpath_special(m.group("path"))
|
||||
return True
|
||||
m = self.local_re.match(text)
|
||||
if m:
|
||||
self.proto = "file"
|
||||
self.path = normpath_special(m.group("path"))
|
||||
return True
|
||||
|
||||
return False
|
||||
|
||||
def __str__(self):
|
||||
items = [
|
||||
"proto=%r" % self.proto,
|
||||
"user=%r" % self.user,
|
||||
"host=%r" % self.host,
|
||||
"port=%r" % self.port,
|
||||
"path=%r" % self.path,
|
||||
]
|
||||
return ", ".join(items)
|
||||
|
||||
def __repr__(self):
|
||||
return "Location(%s)" % self
|
||||
|
||||
@property
|
||||
def host(self):
|
||||
# strip square brackets used for IPv6 addrs
|
||||
if self._host is not None:
|
||||
return self._host.lstrip("[").rstrip("]")
|
||||
|
||||
def canonical_path(self):
|
||||
if self.proto in ("file", "socket"):
|
||||
return self.path
|
||||
else:
|
||||
if self.path and self.path.startswith("~"):
|
||||
path = "/" + self.path # /~/x = path x relative to home dir
|
||||
elif self.path and not self.path.startswith("/"):
|
||||
path = "/./" + self.path # /./x = path x relative to cwd
|
||||
else:
|
||||
path = self.path
|
||||
return "ssh://{}{}{}{}".format(
|
||||
f"{self.user}@" if self.user else "",
|
||||
self._host, # needed for ipv6 addrs
|
||||
f":{self.port}" if self.port else "",
|
||||
path,
|
||||
)
|
||||
|
||||
|
||||
def is_valid_url(url):
|
||||
location = Location(url)
|
||||
return True if location.valid else False
|
||||
|
|
|
@ -14,7 +14,13 @@
|
|||
from vorta.borg.init import BorgInitJob
|
||||
from vorta.keyring.abc import VortaKeyring
|
||||
from vorta.store.models import RepoModel
|
||||
from vorta.utils import borg_compat, choose_file_dialog, get_asset, get_private_keys
|
||||
from vorta.utils import (
|
||||
borg_compat,
|
||||
choose_file_dialog,
|
||||
get_asset,
|
||||
get_private_keys,
|
||||
is_valid_url,
|
||||
)
|
||||
from vorta.views.partials.password_input import PasswordInput, PasswordLineEdit
|
||||
from vorta.views.utils import get_colored_icon
|
||||
|
||||
|
@ -102,6 +108,10 @@ def init_ssh_key(self):
|
|||
|
||||
def validate(self):
|
||||
"""Pre-flight check for valid input and borg binary."""
|
||||
if not is_valid_url(self.values['repo_url']):
|
||||
self._set_status(self.tr('Please use a supported URL format.'))
|
||||
return False
|
||||
|
||||
if self.is_remote_repo and not re.match(r'.+:.+', self.values['repo_url']):
|
||||
self._set_status(self.tr('Please enter a valid repo URL or select a local path.'))
|
||||
return False
|
||||
|
|
Loading…
Reference in a new issue