mirror of https://github.com/morpheus65535/bazarr
625 lines
18 KiB
Python
625 lines
18 KiB
Python
|
from __future__ import unicode_literals
|
||
|
import argparse
|
||
|
from codecs import open as codecs_open
|
||
|
from functools import lru_cache
|
||
|
# codecs_open = open
|
||
|
from os.path import isabs
|
||
|
import sys
|
||
|
from typing import Dict, Type, Union, Tuple, List, Optional
|
||
|
from urllib.parse import urlsplit, SplitResult
|
||
|
|
||
|
from .base import BaseTLDSourceParser
|
||
|
from .exceptions import (
|
||
|
TldBadUrl,
|
||
|
TldDomainNotFound,
|
||
|
TldImproperlyConfigured,
|
||
|
TldIOError,
|
||
|
)
|
||
|
from .helpers import project_dir
|
||
|
from .trie import Trie
|
||
|
from .registry import Registry
|
||
|
from .result import Result
|
||
|
|
||
|
# Package metadata.
__author__ = 'Artur Barseghyan'
__copyright__ = '2013-2020 Artur Barseghyan'
__license__ = 'MPL-1.1 OR GPL-2.0-only OR LGPL-2.1-or-later'
# Public names of this module (what ``from ... import *`` exports).
__all__ = (
    'BaseMozillaTLDSourceParser',
    'get_fld',
    'get_tld',
    'get_tld_names',
    'get_tld_names_container',
    'is_tld',
    'MozillaTLDSourceParser',
    'parse_tld',
    'pop_tld_names_container',
    'process_url',
    'reset_tld_names',
    'Result',
    'tld_names',
    'update_tld_names',
    'update_tld_names_cli',
    'update_tld_names_container',
)

# Module-level cache of parsed TLD data. Keys are a parser class's
# ``local_path``; values are the ``Trie`` built from that file. Entries are
# written by ``update_tld_names_container`` and removed by
# ``pop_tld_names_container`` / ``reset_tld_names``.
tld_names: Dict[str, Trie] = {}
|
||
|
|
||
|
|
||
|
def get_tld_names_container() -> Dict[str, Trie]:
    """Return the module-level container holding every loaded TLD trie.

    The live ``tld_names`` mapping is returned (not a copy), so changes made
    through the returned object are visible to the rest of the module.

    :return: Mapping of parser ``local_path`` to its ``Trie``.
    :rtype: dict
    """
    # A plain read of a module global does not need a ``global`` statement.
    return tld_names
|
||
|
|
||
|
|
||
|
def update_tld_names_container(tld_names_local_path: str,
                               trie_obj: Trie) -> None:
    """Store (or overwrite) one entry of the TLD names container.

    :param tld_names_local_path: Key of the entry; the ``local_path`` of the
        parser the trie was built for.
    :param trie_obj: Trie to associate with that key.
    :return: None; the module-level ``tld_names`` mapping is mutated in place.
    """
    # Item assignment mutates the existing dict, so the name is not rebound
    # and no ``global`` declaration is required.
    tld_names[tld_names_local_path] = trie_obj
|
||
|
|
||
|
|
||
|
def pop_tld_names_container(tld_names_local_path: str) -> None:
    """Drop one entry from the TLD names container.

    A key that is not present is ignored silently.

    :param tld_names_local_path: Key of the entry to remove.
    :return: None; the module-level ``tld_names`` mapping is mutated in place.
    """
    # The ``None`` default keeps ``pop`` from raising ``KeyError``.
    tld_names.pop(tld_names_local_path, None)
|
||
|
|
||
|
|
||
|
@lru_cache(maxsize=128, typed=True)
def update_tld_names(
    fail_silently: bool = False,
    parser_uid: str = None
) -> bool:
    """Ask the registered parser(s) to update their TLD names.

    With a ``parser_uid`` only the parser registered under that UID is
    updated; otherwise every parser found in ``Registry`` is. Parsers that
    are missing or have no ``source_url`` are skipped.

    NOTE(review): the function is wrapped in ``lru_cache``, so a repeated
    call with the same arguments returns the remembered result without
    running the update again.

    :param fail_silently: Forwarded to each parser's ``update_tld_names``.
    :param parser_uid: UID of a single parser to update, or a falsy value to
        update all registered parsers.
    :return: True when every attempted update returned a truthy value (also
        True when nothing was attempted), False otherwise.
    """
    if parser_uid:
        parser_classes = (Registry.get(parser_uid, None),)
    else:
        parser_classes = (
            registered_cls for _uid, registered_cls in Registry.items()
        )

    # Build the complete list first so that every eligible parser is updated
    # even if an earlier one reports failure (``all`` on a generator would
    # stop at the first falsy result).
    outcomes: List[bool] = [
        candidate.update_tld_names(fail_silently=fail_silently)
        for candidate in parser_classes
        if candidate and candidate.source_url
    ]
    return all(outcomes)
|
||
|
|
||
|
|
||
|
def update_tld_names_cli() -> int:
    """Command-line entry point around ``update_tld_names``.

    Reads an optional positional ``parser_uid`` and an optional
    ``--fail-silently`` flag from ``sys.argv``.

    :return: Process exit status: 0 when ``update_tld_names`` reported
        success (truthy), 1 otherwise.
    """
    arg_parser = argparse.ArgumentParser(description='Update TLD names')
    arg_parser.add_argument(
        'parser_uid',
        nargs='?',
        default=None,
        help="UID of the parser to update TLD names for.",
    )
    arg_parser.add_argument(
        '--fail-silently',
        dest="fail_silently",
        default=False,
        action='store_true',
        help="Fail silently",
    )
    options = arg_parser.parse_args(sys.argv[1:])

    succeeded = update_tld_names(
        parser_uid=options.parser_uid,
        fail_silently=options.fail_silently
    )
    # Shell convention is the inverse of Python truthiness: 0 means success.
    return 0 if succeeded else 1
|
||
|
|
||
|
|
||
|
def get_tld_names(
    fail_silently: bool = False,
    retry_count: int = 0,
    parser_class: Type[BaseTLDSourceParser] = None
) -> Dict[str, Trie]:
    """Return the TLD names container, loading the parser's data if needed.

    Thin wrapper that delegates to ``parser_class.get_tld_names``.

    :param fail_silently: Forwarded to the parser's ``get_tld_names``.
    :param retry_count: Forwarded to the parser's ``get_tld_names``.
    :param parser_class: Parser class to use; a falsy value selects
        ``MozillaTLDSourceParser``.
    :type fail_silently: bool
    :type retry_count: int
    :type parser_class: BaseTLDSourceParser
    :return: Whatever the parser's ``get_tld_names`` returns.
    :rtype: dict
    """
    chosen_parser = parser_class or MozillaTLDSourceParser
    return chosen_parser.get_tld_names(
        fail_silently=fail_silently,
        retry_count=retry_count
    )
|
||
|
|
||
|
|
||
|
# **************************************************************************
|
||
|
# **************************** Parser classes ******************************
|
||
|
# **************************************************************************
|
||
|
|
||
|
class BaseMozillaTLDSourceParser(BaseTLDSourceParser):
    """Base parser for TLD source files read line by line.

    The format handled here: lines starting with ``/`` are comments, a line
    containing ``===BEGIN PRIVATE DOMAINS===`` switches to the private
    section, and every other non-blank line is one suffix entry.
    """

    @classmethod
    def get_tld_names(
        cls,
        fail_silently: bool = False,
        retry_count: int = 0
    ) -> Optional[Dict[str, Trie]]:
        """Load ``cls.local_path`` into a ``Trie`` and return the container.

        If the trie for ``cls.local_path`` is already cached, the container
        is returned immediately. If reading the file fails with ``IOError``,
        ``cls.update_tld_names`` is called (presumably to fetch the file -
        confirm against the base class) and loading is retried.

        :param fail_silently: If True, return None instead of raising.
        :param retry_count: Number of retries already made; more than one
            retry aborts to avoid endless recursion.
        :return: The module-level ``tld_names`` container, or None on
            failure when ``fail_silently`` is True.
        :raises TldIOError: If ``retry_count`` exceeds 1 and
            ``fail_silently`` is False.
        """
        if retry_count > 1:
            if fail_silently:
                return None
            raise TldIOError

        # Read-only access to the module-level cache; entries are written
        # through ``update_tld_names_container`` below.
        _tld_names = tld_names

        # If already loaded, return
        if _tld_names.get(cls.local_path) is not None:
            return _tld_names

        try:
            # Relative paths are resolved by ``project_dir``.
            if isabs(cls.local_path):
                local_path = cls.local_path
            else:
                local_path = project_dir(cls.local_path)

            trie = Trie()
            trie_add = trie.add  # Performance opt
            private_section = False

            # ``with`` guarantees the handle is closed on every exit path and
            # avoids touching an unbound name when the open itself fails.
            with codecs_open(local_path, 'r', encoding='utf8') as local_file:
                for line in local_file:
                    if '===BEGIN PRIVATE DOMAINS===' in line:
                        private_section = True

                    # Puny code TLD names: take the second token of the
                    # ``// xn--...`` comment line as the entry.
                    if '// xn--' in line:
                        line = line.split()[1]

                    # Strip before testing so whitespace-only lines and
                    # ``\r\n`` blank lines (``codecs.open`` does no newline
                    # translation) are skipped instead of being added as an
                    # empty entry.
                    entry = line.strip()
                    if not entry or entry.startswith('/'):
                        continue

                    trie_add(entry, private=private_section)

            update_tld_names_container(cls.local_path, trie)
        except IOError:
            # Grab the file
            cls.update_tld_names(
                fail_silently=fail_silently
            )
            # Run again, incrementing ``retry_count`` in order to avoid
            # infinite loops
            return cls.get_tld_names(
                fail_silently=fail_silently,
                retry_count=retry_count + 1
            )
        except Exception:
            if fail_silently:
                return None
            # Bare ``raise`` keeps the original traceback intact.
            raise

        return _tld_names
|
||
|
|
||
|
|
||
|
class MozillaTLDSourceParser(BaseMozillaTLDSourceParser):
    """Mozilla TLD source.

    Concrete parser configuration; all loading logic is inherited from
    ``BaseMozillaTLDSourceParser``. This class is the default
    ``parser_class`` of the lookup functions in this module.
    """

    # Identifier of this parser. Presumably the key it is registered under
    # in ``Registry`` (see ``update_tld_names``) - confirm against the base
    # class.
    uid: str = 'mozilla'
    # Remote location of the suffix list.
    source_url: str = 'https://publicsuffix.org/list/public_suffix_list.dat'
    # Local copy of the list. A relative path is resolved through
    # ``project_dir`` when the file is loaded.
    local_path: str = 'res/effective_tld_names.dat.txt'
|
||
|
|
||
|
# **************************************************************************
|
||
|
# **************************** Core functions ******************************
|
||
|
# **************************************************************************
|
||
|
|
||
|
|
||
|
def process_url(
    url: str,
    fail_silently: bool = False,
    fix_protocol: bool = False,
    search_public: bool = True,
    search_private: bool = True,
    parser_class: Type[BaseTLDSourceParser] = MozillaTLDSourceParser
) -> Union[Tuple[List[str], int, SplitResult], Tuple[None, None, SplitResult]]:
    """Split a URL's hostname into parts and locate where its TLD starts.

    :param url: URL string, or an already parsed ``SplitResult``. Strings
        are lower-cased before parsing.
    :param fail_silently: If True, failures are reported as
        ``(None, None, parsed_url)`` instead of raising.
    :param fix_protocol: If True and the string does not start with ``//``,
        ``http://`` or ``https://``, ``https://`` is prepended.
    :param search_public: If True, accept matches from the public section.
    :param search_private: If True, accept matches from the private section.
    :param parser_class: Parser class whose TLD data is used.
    :return: ``(domain_parts, non_zero_i, parsed_url)`` where
        ``domain_parts`` are the dot-separated hostname labels and
        ``non_zero_i`` is the index of the first TLD label, or -1 when the
        whole hostname is a TLD. On silent failure the first two items are
        None.
    :raises TldImproperlyConfigured: If both ``search_public`` and
        ``search_private`` are False.
    :raises TldBadUrl: If the URL has no hostname (unless ``fail_silently``).
    :raises TldDomainNotFound: If no allowed TLD matches (unless
        ``fail_silently``).
    """
    if not (search_public or search_private):
        raise TldImproperlyConfigured(
            "Either `search_public` or `search_private` (or both) shall be "
            "set to True."
        )

    # Init
    _tld_names = get_tld_names(
        fail_silently=fail_silently,
        parser_class=parser_class
    )

    if isinstance(url, SplitResult):
        parsed_url = url
    else:
        url = url.lower()

        if (
            fix_protocol and not url.startswith(('//', 'http://', 'https://'))
        ):
            url = f'https://{url}'

        # Get parsed URL as we might need it later.
        # NOTE(review): ``urlsplit`` can raise ``ValueError`` on malformed
        # input; that is not intercepted here, even with ``fail_silently``.
        parsed_url = urlsplit(url)

    # Get (sub) domain name
    domain_name = parsed_url.hostname

    if not domain_name:
        if fail_silently:
            return None, None, parsed_url
        raise TldBadUrl(url=url)

    # This will correctly handle dots at the end of domain name in URLs like
    # https://github.com............/barseghyanartur/tld/
    domain_name = domain_name.rstrip('.')

    domain_parts = domain_name.split('.')
    tld_names_local_path = parser_class.local_path

    # ``get_tld_names`` may hand back None (or a container without this
    # parser's trie) when it failed silently. Honour ``fail_silently`` here
    # instead of crashing with ``TypeError``/``KeyError`` on the lookup below.
    if fail_silently and (
        _tld_names is None
        or _tld_names.get(tld_names_local_path) is None
    ):
        return None, None, parsed_url

    # Now we query our Trie iterating on the domain parts in reverse order
    node = _tld_names[tld_names_local_path].root
    current_length = 0
    tld_length = 0
    match = None
    for part in reversed(domain_parts):
        # Cannot go deeper
        if node.children is None:
            break

        # Exception: this label is excluded at the current node, so the
        # match stays at whatever was found so far.
        if part == node.exception:
            break

        child = node.children.get(part)

        # Wildcards
        if child is None:
            child = node.children.get('*')

        # If the current part is not in current node's children, we can stop
        if child is None:
            break

        # Else we move deeper and increment our tld offset
        current_length += 1
        node = child

        # Remember the deepest leaf seen; deeper non-leaf nodes do not
        # extend the match.
        if node.leaf:
            tld_length = current_length
            match = node

    # Checking the node we finished on is a leaf and is one we allow
    if (
        (match is None) or
        (not match.leaf) or
        (not search_public and not match.private) or
        (not search_private and match.private)
    ):
        if fail_silently:
            return None, None, parsed_url
        raise TldDomainNotFound(domain_name=domain_name)

    len_domain_parts = len(domain_parts)
    if len_domain_parts == tld_length:
        non_zero_i = -1  # hostname = tld
    else:
        non_zero_i = max(1, len_domain_parts - tld_length)

    return domain_parts, non_zero_i, parsed_url
|
||
|
|
||
|
|
||
|
def get_fld(
    url: str,
    fail_silently: bool = False,
    fix_protocol: bool = False,
    search_public: bool = True,
    search_private: bool = True,
    parser_class: Type[BaseTLDSourceParser] = MozillaTLDSourceParser,
    **kwargs
) -> Optional[str]:
    """Extract the first level domain.

    The first level domain is the label directly in front of the TLD joined
    with the TLD itself. When the whole hostname is a TLD, the hostname is
    returned unchanged. May raise ``TldBadUrl`` or ``TldDomainNotFound``
    (via ``process_url``) unless ``fail_silently`` is set.

    :param url: URL to get the first level domain from.
    :param fail_silently: If set to True, no lookup exceptions are raised
        and None is returned on failure.
    :param fix_protocol: If set to True, a missing protocol is replaced by
        ``https://``.
    :param search_public: If set to True, search in public domains.
    :param search_private: If set to True, search in private domains.
    :param parser_class: Parser class whose TLD data is used.
    :param kwargs: Only inspected for the rejected ``as_object`` key; other
        keys are ignored.
    :type url: str
    :type fail_silently: bool
    :type fix_protocol: bool
    :type search_public: bool
    :type search_private: bool
    :return: String with the first level domain; None on failure.
    :rtype: str
    :raises TldImproperlyConfigured: If ``as_object`` is passed.
    """
    if 'as_object' in kwargs:
        raise TldImproperlyConfigured(
            "`as_object` argument is deprecated for `get_fld`. Use `get_tld` "
            "instead."
        )

    parts, tld_start, split_url = process_url(
        url=url,
        fail_silently=fail_silently,
        fix_protocol=fix_protocol,
        search_public=search_public,
        search_private=search_private,
        parser_class=parser_class
    )

    if parts is None:
        return None

    # ``tld_start`` is only None together with ``parts``; the assertion
    # exists for the type checker.
    assert tld_start is not None
    if tld_start < 0:
        # hostname = tld
        return split_url.hostname

    # One label before the TLD, plus the TLD labels.
    fld_labels = parts[tld_start-1:]
    return ".".join(fld_labels)
|
||
|
|
||
|
|
||
|
def get_tld(
    url: str,
    fail_silently: bool = False,
    as_object: bool = False,
    fix_protocol: bool = False,
    search_public: bool = True,
    search_private: bool = True,
    parser_class: Type[BaseTLDSourceParser] = MozillaTLDSourceParser
) -> Optional[Union[str, Result]]:
    """Extract the top level domain.

    The hostname is split by ``process_url``; everything from the first TLD
    label onwards forms the result. May raise ``TldBadUrl`` or
    ``TldDomainNotFound`` (via ``process_url``) unless ``fail_silently`` is
    set.

    :param url: URL to get top level domain from.
    :param fail_silently: If set to True, no lookup exceptions are raised
        and None is returned on failure.
    :param as_object: If set to True, a ``Result`` object carrying
        ``subdomain``, ``domain``, ``tld`` and ``parsed_url`` is returned
        instead of a string.
    :param fix_protocol: If set to True, a missing protocol is replaced by
        ``https://``.
    :param search_public: If set to True, search in public domains.
    :param search_private: If set to True, search in private domains.
    :param parser_class: Parser class whose TLD data is used.
    :type url: str
    :type fail_silently: bool
    :type as_object: bool
    :type fix_protocol: bool
    :type search_public: bool
    :type search_private: bool
    :return: String with top level domain (``as_object`` False) or a
        ``Result`` object (``as_object`` True); None on failure.
    :rtype: str
    """
    parts, tld_start, split_url = process_url(
        url=url,
        fail_silently=fail_silently,
        fix_protocol=fix_protocol,
        search_public=search_public,
        search_private=search_private,
        parser_class=parser_class
    )

    if parts is None:
        return None

    # ``tld_start`` is only None together with ``parts``; the assertion
    # exists for the type checker.
    assert tld_start is not None

    # A negative index is ``process_url``'s marker for "hostname = tld".
    hostname_is_tld = tld_start < 0

    if not as_object:
        if hostname_is_tld:
            return split_url.hostname
        return ".".join(parts[tld_start:])

    if hostname_is_tld:
        # ``process_url`` already rejected empty hostnames; the assertion
        # narrows the Optional[str] type.
        assert split_url.hostname is not None, "No hostname in URL"
        return Result(
            subdomain="",
            domain="",
            tld=split_url.hostname,
            parsed_url=split_url
        )

    return Result(
        subdomain=".".join(parts[:tld_start-1]),
        domain=".".join(parts[tld_start-1:tld_start]),
        tld=".".join(parts[tld_start:]),
        parsed_url=split_url
    )
|
||
|
|
||
|
|
||
|
def parse_tld(
    url: str,
    fail_silently: bool = False,
    fix_protocol: bool = False,
    search_public: bool = True,
    search_private: bool = True,
    parser_class: Type[BaseTLDSourceParser] = MozillaTLDSourceParser
) -> Union[Tuple[None, None, None], Tuple[str, str, str]]:
    """Parse a URL into its TLD, domain and subdomain.

    The ``Tld*`` exceptions raised during the lookup are swallowed, so a
    failed lookup yields ``(None, None, None)`` whatever ``fail_silently``
    is set to.

    :param url: URL to parse.
    :param fail_silently: Forwarded to ``get_tld``.
    :param fix_protocol: Forwarded to ``get_tld``.
    :param search_public: Forwarded to ``get_tld``.
    :param search_private: Forwarded to ``get_tld``.
    :param parser_class: Forwarded to ``get_tld``.
    :return: Tuple (tld, domain, subdomain); three Nones on failure.
    :rtype: tuple
    """
    try:
        result = get_tld(
            url,
            fail_silently=fail_silently,
            as_object=True,
            fix_protocol=fix_protocol,
            search_public=search_public,
            search_private=search_private,
            parser_class=parser_class
        )
        if result is not None:
            return result.tld, result.domain, result.subdomain  # type: ignore
    except (
        TldBadUrl,
        TldDomainNotFound,
        TldImproperlyConfigured,
        TldIOError
    ):
        # Deliberate best effort: fall through to the "nothing found" tuple.
        pass

    return None, None, None
|
||
|
|
||
|
|
||
|
def is_tld(
    value: str,
    search_public: bool = True,
    search_private: bool = True,
    parser_class: Type[BaseTLDSourceParser] = MozillaTLDSourceParser
) -> bool:
    """Check whether the given value is itself a TLD.

    The value is run through ``get_tld`` (failing silently, with protocol
    fixing enabled) and compared to the TLD found. ``process_url``
    lower-cases string input, so the comparison is effectively against the
    lower-cased form.

    :param value: Candidate TLD.
    :param search_public: If set to True, search in public domains.
    :param search_private: If set to True, search in private domains.
    :param parser_class: Parser class whose TLD data is used.
    :type value: str
    :type search_public: bool
    :type search_private: bool
    :return: True if the extracted TLD equals ``value``.
    :rtype: bool
    """
    extracted_tld = get_tld(
        url=value,
        fail_silently=True,
        fix_protocol=True,
        search_public=search_public,
        search_private=search_private,
        parser_class=parser_class
    )
    # ``extracted_tld`` is None when nothing matched, which never equals a
    # string value.
    return value == extracted_tld
|
||
|
|
||
|
|
||
|
def reset_tld_names(tld_names_local_path: str = None) -> None:
    """Clear cached TLD data.

    Without an argument the module-level ``tld_names`` is rebound to a new
    empty dict. With ``tld_names_local_path`` only that entry is removed
    from the existing container.

    :param tld_names_local_path: Key of a single entry to drop; a falsy
        value resets everything.
    :type tld_names_local_path: str
    :return: None
    """
    global tld_names

    if not tld_names_local_path:
        # Rebinding (rather than ``.clear()``) leaves any dict object that
        # callers still hold untouched.
        tld_names = {}
        return

    pop_tld_names_container(tld_names_local_path)
|