mirror of
https://github.com/morpheus65535/bazarr
synced 2025-01-03 13:35:18 +00:00
648 lines
19 KiB
Python
648 lines
19 KiB
Python
from __future__ import unicode_literals
|
|
|
|
import argparse
|
|
import sys
|
|
from codecs import open as codecs_open
|
|
from functools import lru_cache
|
|
from os.path import isabs
|
|
from typing import Dict, List, Optional, Tuple, Type, Union
|
|
from urllib.parse import SplitResult, urlsplit
|
|
|
|
from .base import BaseTLDSourceParser, Registry
|
|
from .exceptions import (
|
|
TldBadUrl,
|
|
TldDomainNotFound,
|
|
TldImproperlyConfigured,
|
|
TldIOError,
|
|
)
|
|
from .helpers import project_dir
|
|
from .result import Result
|
|
from .trie import Trie
|
|
|
|
# codecs_open = open
|
|
|
|
|
|
__author__ = "Artur Barseghyan"
|
|
__copyright__ = "2013-2023 Artur Barseghyan"
|
|
__license__ = "MPL-1.1 OR GPL-2.0-only OR LGPL-2.1-or-later"
|
|
__all__ = (
|
|
"BaseMozillaTLDSourceParser",
|
|
"get_fld",
|
|
"get_tld",
|
|
"get_tld_names",
|
|
"get_tld_names_container",
|
|
"is_tld",
|
|
"MozillaTLDSourceParser",
|
|
"MozillaPublicOnlyTLDSourceParser",
|
|
"parse_tld",
|
|
"pop_tld_names_container",
|
|
"process_url",
|
|
"reset_tld_names",
|
|
"Result",
|
|
"tld_names",
|
|
"update_tld_names",
|
|
"update_tld_names_cli",
|
|
"update_tld_names_container",
|
|
)
|
|
|
|
tld_names: Dict[str, Trie] = {}
|
|
|
|
|
|
def get_tld_names_container() -> Dict[str, Trie]:
|
|
"""Get container of all tld names.
|
|
|
|
:return:
|
|
:rtype dict:
|
|
"""
|
|
global tld_names
|
|
return tld_names
|
|
|
|
|
|
def update_tld_names_container(
|
|
tld_names_local_path: str, trie_obj: Trie
|
|
) -> None:
|
|
"""Update TLD Names container item.
|
|
|
|
:param tld_names_local_path:
|
|
:param trie_obj:
|
|
:return:
|
|
"""
|
|
global tld_names
|
|
# tld_names.update({tld_names_local_path: trie_obj})
|
|
tld_names[tld_names_local_path] = trie_obj
|
|
|
|
|
|
def pop_tld_names_container(tld_names_local_path: str) -> None:
|
|
"""Remove TLD names container item.
|
|
|
|
:param tld_names_local_path:
|
|
:return:
|
|
"""
|
|
global tld_names
|
|
tld_names.pop(tld_names_local_path, None)
|
|
|
|
|
|
@lru_cache(maxsize=128, typed=True)
|
|
def update_tld_names(
|
|
fail_silently: bool = False, parser_uid: str = None
|
|
) -> bool:
|
|
"""Update TLD names.
|
|
|
|
:param fail_silently:
|
|
:param parser_uid:
|
|
:return:
|
|
"""
|
|
results: List[bool] = []
|
|
results_append = results.append
|
|
if parser_uid:
|
|
parser_cls = Registry.get(parser_uid, None)
|
|
if parser_cls and parser_cls.source_url:
|
|
results_append(
|
|
parser_cls.update_tld_names(fail_silently=fail_silently)
|
|
)
|
|
else:
|
|
for parser_uid, parser_cls in Registry.items():
|
|
if parser_cls and parser_cls.source_url:
|
|
results_append(
|
|
parser_cls.update_tld_names(fail_silently=fail_silently)
|
|
)
|
|
|
|
return all(results)
|
|
|
|
|
|
def update_tld_names_cli() -> int:
|
|
"""CLI wrapper for update_tld_names.
|
|
|
|
Since update_tld_names returns True on success, we need to negate the
|
|
result to match CLI semantics.
|
|
"""
|
|
parser = argparse.ArgumentParser(description="Update TLD names")
|
|
parser.add_argument(
|
|
"parser_uid",
|
|
nargs="?",
|
|
default=None,
|
|
help="UID of the parser to update TLD names for.",
|
|
)
|
|
parser.add_argument(
|
|
"--fail-silently",
|
|
dest="fail_silently",
|
|
default=False,
|
|
action="store_true",
|
|
help="Fail silently",
|
|
)
|
|
args = parser.parse_args(sys.argv[1:])
|
|
parser_uid = args.parser_uid
|
|
fail_silently = args.fail_silently
|
|
return int(
|
|
not update_tld_names(parser_uid=parser_uid, fail_silently=fail_silently)
|
|
)
|
|
|
|
|
|
def get_tld_names(
|
|
fail_silently: bool = False,
|
|
retry_count: int = 0,
|
|
parser_class: Type[BaseTLDSourceParser] = None,
|
|
) -> Dict[str, Trie]:
|
|
"""Build the ``tlds`` list if empty. Recursive.
|
|
|
|
:param fail_silently: If set to True, no exceptions are raised and None
|
|
is returned on failure.
|
|
:param retry_count: If greater than 1, we raise an exception in order
|
|
to avoid infinite loops.
|
|
:param parser_class:
|
|
:type fail_silently: bool
|
|
:type retry_count: int
|
|
:type parser_class: BaseTLDSourceParser
|
|
:return: List of TLD names
|
|
:rtype: obj:`tld.utils.Trie`
|
|
"""
|
|
if not parser_class:
|
|
parser_class = MozillaTLDSourceParser
|
|
|
|
return parser_class.get_tld_names(
|
|
fail_silently=fail_silently, retry_count=retry_count
|
|
)
|
|
|
|
|
|
# **************************************************************************
|
|
# **************************** Parser classes ******************************
|
|
# **************************************************************************
|
|
|
|
|
|
class BaseMozillaTLDSourceParser(BaseTLDSourceParser):
|
|
@classmethod
|
|
def get_tld_names(
|
|
cls, fail_silently: bool = False, retry_count: int = 0
|
|
) -> Optional[Dict[str, Trie]]:
|
|
"""Parse.
|
|
|
|
:param fail_silently:
|
|
:param retry_count:
|
|
:return:
|
|
"""
|
|
if retry_count > 1:
|
|
if fail_silently:
|
|
return None
|
|
else:
|
|
raise TldIOError
|
|
|
|
global tld_names
|
|
_tld_names = tld_names
|
|
# _tld_names = get_tld_names_container()
|
|
|
|
# If already loaded, return
|
|
if (
|
|
cls.local_path in _tld_names
|
|
and _tld_names[cls.local_path] is not None
|
|
):
|
|
return _tld_names
|
|
|
|
try:
|
|
# Load the TLD names file
|
|
if isabs(cls.local_path):
|
|
local_path = cls.local_path
|
|
else:
|
|
local_path = project_dir(cls.local_path)
|
|
local_file = codecs_open(local_path, "r", encoding="utf8")
|
|
trie = Trie()
|
|
trie_add = trie.add # Performance opt
|
|
# Make a list of it all, strip all garbage
|
|
private_section = False
|
|
include_private = cls.include_private
|
|
|
|
for line in local_file:
|
|
if "===BEGIN PRIVATE DOMAINS===" in line:
|
|
private_section = True
|
|
|
|
if private_section and not include_private:
|
|
break
|
|
|
|
# Puny code TLD names
|
|
if "// xn--" in line:
|
|
line = line.split()[1]
|
|
|
|
if line[0] in ("/", "\n"):
|
|
continue
|
|
|
|
trie_add(f"{line.strip()}", private=private_section)
|
|
|
|
update_tld_names_container(cls.local_path, trie)
|
|
|
|
local_file.close()
|
|
except IOError:
|
|
# Grab the file
|
|
cls.update_tld_names(fail_silently=fail_silently)
|
|
# Increment ``retry_count`` in order to avoid infinite loops
|
|
retry_count += 1
|
|
# Run again
|
|
return cls.get_tld_names(
|
|
fail_silently=fail_silently, retry_count=retry_count
|
|
)
|
|
except Exception as err:
|
|
if fail_silently:
|
|
return None
|
|
else:
|
|
raise err
|
|
finally:
|
|
try:
|
|
local_file.close()
|
|
except Exception:
|
|
pass
|
|
|
|
return _tld_names
|
|
|
|
|
|
class MozillaTLDSourceParser(BaseMozillaTLDSourceParser):
|
|
"""Mozilla TLD source."""
|
|
|
|
uid: str = "mozilla"
|
|
source_url: str = "https://publicsuffix.org/list/public_suffix_list.dat"
|
|
local_path: str = "res/effective_tld_names.dat.txt"
|
|
|
|
|
|
class MozillaPublicOnlyTLDSourceParser(BaseMozillaTLDSourceParser):
|
|
"""Mozilla TLD source."""
|
|
|
|
uid: str = "mozilla_public_only"
|
|
source_url: str = (
|
|
"https://publicsuffix.org/list/public_suffix_list.dat?publiconly"
|
|
)
|
|
local_path: str = "res/effective_tld_names_public_only.dat.txt"
|
|
include_private: bool = False
|
|
|
|
|
|
# **************************************************************************
|
|
# **************************** Core functions ******************************
|
|
# **************************************************************************
|
|
|
|
|
|
def process_url(
|
|
url: Union[str, SplitResult],
|
|
fail_silently: bool = False,
|
|
fix_protocol: bool = False,
|
|
search_public: bool = True,
|
|
search_private: bool = True,
|
|
parser_class: Type[BaseTLDSourceParser] = MozillaTLDSourceParser,
|
|
) -> Union[Tuple[List[str], int, SplitResult], Tuple[None, None, SplitResult]]:
|
|
"""Process URL.
|
|
|
|
:param parser_class:
|
|
:param url:
|
|
:param fail_silently:
|
|
:param fix_protocol:
|
|
:param search_public:
|
|
:param search_private:
|
|
:return:
|
|
"""
|
|
if not (search_public or search_private):
|
|
raise TldImproperlyConfigured(
|
|
"Either `search_public` or `search_private` (or both) shall be "
|
|
"set to True."
|
|
)
|
|
|
|
# Init
|
|
_tld_names = get_tld_names(
|
|
fail_silently=fail_silently, parser_class=parser_class
|
|
)
|
|
|
|
if not isinstance(url, SplitResult):
|
|
if fix_protocol and not url.startswith(("//", "http://", "https://")):
|
|
url = f"https://{url}"
|
|
|
|
# Get parsed URL as we might need it later
|
|
try:
|
|
parsed_url = urlsplit(url)
|
|
except ValueError as e:
|
|
if fail_silently:
|
|
return None, None, url
|
|
else:
|
|
raise e
|
|
else:
|
|
parsed_url = url
|
|
|
|
# Get (sub) domain name
|
|
domain_name = parsed_url.hostname
|
|
|
|
if not domain_name:
|
|
if fail_silently:
|
|
return None, None, parsed_url
|
|
else:
|
|
raise TldBadUrl(url=url)
|
|
|
|
domain_name = domain_name.lower()
|
|
|
|
# This will correctly handle dots at the end of domain name in URLs like
|
|
# https://github.com............/barseghyanartur/tld/
|
|
if domain_name.endswith("."):
|
|
domain_name = domain_name.rstrip(".")
|
|
|
|
domain_parts = domain_name.split(".")
|
|
tld_names_local_path = parser_class.local_path
|
|
|
|
# Now we query our Trie iterating on the domain parts in reverse order
|
|
node = _tld_names[tld_names_local_path].root
|
|
current_length = 0
|
|
tld_length = 0
|
|
match = None
|
|
len_domain_parts = len(domain_parts)
|
|
for i in range(len_domain_parts - 1, -1, -1):
|
|
part = domain_parts[i]
|
|
|
|
# Cannot go deeper
|
|
if node.children is None:
|
|
break
|
|
|
|
# Exception
|
|
if part == node.exception:
|
|
break
|
|
|
|
child = node.children.get(part)
|
|
|
|
# Wildcards
|
|
if child is None:
|
|
child = node.children.get("*")
|
|
|
|
# If the current part is not in current node's children, we can stop
|
|
if child is None:
|
|
break
|
|
|
|
# Else we move deeper and increment our tld offset
|
|
current_length += 1
|
|
node = child
|
|
|
|
if node.leaf:
|
|
tld_length = current_length
|
|
match = node
|
|
|
|
# Checking the node we finished on is a leaf and is one we allow
|
|
if (
|
|
(match is None)
|
|
or (not match.leaf)
|
|
or (not search_public and not match.private)
|
|
or (not search_private and match.private)
|
|
):
|
|
if fail_silently:
|
|
return None, None, parsed_url
|
|
else:
|
|
raise TldDomainNotFound(domain_name=domain_name)
|
|
|
|
if len_domain_parts == tld_length:
|
|
non_zero_i = -1 # hostname = tld
|
|
else:
|
|
non_zero_i = max(1, len_domain_parts - tld_length)
|
|
|
|
return domain_parts, non_zero_i, parsed_url
|
|
|
|
|
|
def get_fld(
|
|
url: Union[str, SplitResult],
|
|
fail_silently: bool = False,
|
|
fix_protocol: bool = False,
|
|
search_public: bool = True,
|
|
search_private: bool = True,
|
|
parser_class: Type[BaseTLDSourceParser] = None,
|
|
**kwargs,
|
|
) -> Optional[str]:
|
|
"""Extract the first level domain.
|
|
|
|
Extract the top level domain based on the mozilla's effective TLD names
|
|
dat file. Returns a string. May throw ``TldBadUrl`` or
|
|
``TldDomainNotFound`` exceptions if there's bad URL provided or no TLD
|
|
match found respectively.
|
|
|
|
:param url: URL to get top level domain from.
|
|
:param fail_silently: If set to True, no exceptions are raised and None
|
|
is returned on failure.
|
|
:param fix_protocol: If set to True, missing or wrong protocol is
|
|
ignored (https is appended instead).
|
|
:param search_public: If set to True, search in public domains.
|
|
:param search_private: If set to True, search in private domains.
|
|
:param parser_class:
|
|
:type url: str | SplitResult
|
|
:type fail_silently: bool
|
|
:type fix_protocol: bool
|
|
:type search_public: bool
|
|
:type search_private: bool
|
|
:return: String with top level domain (if ``as_object`` argument
|
|
is set to False) or a ``tld.utils.Result`` object (if ``as_object``
|
|
argument is set to True); returns None on failure.
|
|
:rtype: str
|
|
"""
|
|
if "as_object" in kwargs:
|
|
raise TldImproperlyConfigured(
|
|
"`as_object` argument is deprecated for `get_fld`. Use `get_tld` "
|
|
"instead."
|
|
)
|
|
|
|
if not parser_class:
|
|
parser_class = (
|
|
MozillaTLDSourceParser
|
|
if search_private
|
|
else MozillaPublicOnlyTLDSourceParser
|
|
)
|
|
|
|
domain_parts, non_zero_i, parsed_url = process_url(
|
|
url=url,
|
|
fail_silently=fail_silently,
|
|
fix_protocol=fix_protocol,
|
|
search_public=search_public,
|
|
search_private=search_private,
|
|
parser_class=parser_class,
|
|
)
|
|
|
|
if domain_parts is None:
|
|
return None
|
|
|
|
# This should be None when domain_parts is None
|
|
# but mypy isn't quite smart enough to figure that out yet
|
|
assert non_zero_i is not None
|
|
if non_zero_i < 0:
|
|
# hostname = tld
|
|
return parsed_url.hostname
|
|
|
|
return ".".join(domain_parts[non_zero_i - 1 :])
|
|
|
|
|
|
def get_tld(
|
|
url: Union[str, SplitResult],
|
|
fail_silently: bool = False,
|
|
as_object: bool = False,
|
|
fix_protocol: bool = False,
|
|
search_public: bool = True,
|
|
search_private: bool = True,
|
|
parser_class: Type[BaseTLDSourceParser] = None,
|
|
) -> Optional[Union[str, Result]]:
|
|
"""Extract the top level domain.
|
|
|
|
Extract the top level domain based on the mozilla's effective TLD names
|
|
dat file. Returns a string. May throw ``TldBadUrl`` or
|
|
``TldDomainNotFound`` exceptions if there's bad URL provided or no TLD
|
|
match found respectively.
|
|
|
|
:param url: URL to get top level domain from.
|
|
:param fail_silently: If set to True, no exceptions are raised and None
|
|
is returned on failure.
|
|
:param as_object: If set to True, ``tld.utils.Result`` object is returned,
|
|
``domain``, ``suffix`` and ``tld`` properties.
|
|
:param fix_protocol: If set to True, missing or wrong protocol is
|
|
ignored (https is appended instead).
|
|
:param search_public: If set to True, search in public domains.
|
|
:param search_private: If set to True, search in private domains.
|
|
:param parser_class:
|
|
:type url: str | SplitResult
|
|
:type fail_silently: bool
|
|
:type as_object: bool
|
|
:type fix_protocol: bool
|
|
:type search_public: bool
|
|
:type search_private: bool
|
|
:return: String with top level domain (if ``as_object`` argument
|
|
is set to False) or a ``tld.utils.Result`` object (if ``as_object``
|
|
argument is set to True); returns None on failure.
|
|
:rtype: str
|
|
"""
|
|
if not parser_class:
|
|
parser_class = (
|
|
MozillaTLDSourceParser
|
|
if search_private
|
|
else MozillaPublicOnlyTLDSourceParser
|
|
)
|
|
|
|
domain_parts, non_zero_i, parsed_url = process_url(
|
|
url=url,
|
|
fail_silently=fail_silently,
|
|
fix_protocol=fix_protocol,
|
|
search_public=search_public,
|
|
search_private=search_private,
|
|
parser_class=parser_class,
|
|
)
|
|
|
|
if domain_parts is None:
|
|
return None
|
|
|
|
# This should be None when domain_parts is None
|
|
# but mypy isn't quite smart enough to figure that out yet
|
|
assert non_zero_i is not None
|
|
|
|
if not as_object:
|
|
if non_zero_i < 0:
|
|
# hostname = tld
|
|
return parsed_url.hostname
|
|
return ".".join(domain_parts[non_zero_i:])
|
|
|
|
if non_zero_i < 0:
|
|
# hostname = tld
|
|
subdomain = ""
|
|
domain = ""
|
|
# This is checked in `process_url`, but the type is
|
|
# ambiguous (Optional[str]) so this assertion is just to satisfy mypy
|
|
assert parsed_url.hostname is not None, "No hostname in URL"
|
|
_tld = parsed_url.hostname
|
|
else:
|
|
subdomain = ".".join(domain_parts[: non_zero_i - 1])
|
|
domain = ".".join(domain_parts[non_zero_i - 1 : non_zero_i])
|
|
_tld = ".".join(domain_parts[non_zero_i:])
|
|
|
|
return Result(
|
|
subdomain=subdomain, domain=domain, tld=_tld, parsed_url=parsed_url
|
|
)
|
|
|
|
|
|
def parse_tld(
|
|
url: Union[str, SplitResult],
|
|
fail_silently: bool = False,
|
|
fix_protocol: bool = False,
|
|
search_public: bool = True,
|
|
search_private: bool = True,
|
|
parser_class: Type[BaseTLDSourceParser] = None,
|
|
) -> Union[Tuple[None, None, None], Tuple[str, str, str]]:
|
|
"""Parse TLD into parts.
|
|
|
|
:param url:
|
|
:param fail_silently:
|
|
:param fix_protocol:
|
|
:param search_public:
|
|
:param search_private:
|
|
:param parser_class:
|
|
:return: Tuple (tld, domain, subdomain)
|
|
:rtype: tuple
|
|
"""
|
|
if not parser_class:
|
|
parser_class = (
|
|
MozillaTLDSourceParser
|
|
if search_private
|
|
else MozillaPublicOnlyTLDSourceParser
|
|
)
|
|
|
|
try:
|
|
obj = get_tld(
|
|
url,
|
|
fail_silently=fail_silently,
|
|
as_object=True,
|
|
fix_protocol=fix_protocol,
|
|
search_public=search_public,
|
|
search_private=search_private,
|
|
parser_class=parser_class,
|
|
)
|
|
if obj is None:
|
|
return None, None, None
|
|
|
|
return obj.tld, obj.domain, obj.subdomain # type: ignore
|
|
|
|
except (TldBadUrl, TldDomainNotFound, TldImproperlyConfigured, TldIOError):
|
|
pass
|
|
|
|
return None, None, None
|
|
|
|
|
|
def is_tld(
|
|
value: Union[str, SplitResult],
|
|
search_public: bool = True,
|
|
search_private: bool = True,
|
|
parser_class: Type[BaseTLDSourceParser] = None,
|
|
) -> bool:
|
|
"""Check if given URL is tld.
|
|
|
|
:param value: URL to get top level domain from.
|
|
:param search_public: If set to True, search in public domains.
|
|
:param search_private: If set to True, search in private domains.
|
|
:param parser_class:
|
|
:type value: str
|
|
:type search_public: bool
|
|
:type search_private: bool
|
|
:return:
|
|
:rtype: bool
|
|
"""
|
|
if not parser_class:
|
|
parser_class = (
|
|
MozillaTLDSourceParser
|
|
if search_private
|
|
else MozillaPublicOnlyTLDSourceParser
|
|
)
|
|
|
|
_tld = get_tld(
|
|
url=value,
|
|
fail_silently=True,
|
|
fix_protocol=True,
|
|
search_public=search_public,
|
|
search_private=search_private,
|
|
parser_class=parser_class,
|
|
)
|
|
return value == _tld
|
|
|
|
|
|
def reset_tld_names(tld_names_local_path: str = None) -> None:
|
|
"""Reset the ``tld_names`` to empty value.
|
|
|
|
If ``tld_names_local_path`` is given, removes specified
|
|
entry from ``tld_names`` instead.
|
|
|
|
:param tld_names_local_path:
|
|
:type tld_names_local_path: str
|
|
:return:
|
|
"""
|
|
|
|
if tld_names_local_path:
|
|
pop_tld_names_container(tld_names_local_path)
|
|
else:
|
|
global tld_names
|
|
tld_names = {}
|