from __future__ import unicode_literals import argparse from codecs import open as codecs_open from functools import lru_cache # codecs_open = open from os.path import isabs import sys from typing import Dict, Type, Union, Tuple, List, Optional from urllib.parse import urlsplit, SplitResult from .base import BaseTLDSourceParser from .exceptions import ( TldBadUrl, TldDomainNotFound, TldImproperlyConfigured, TldIOError, ) from .helpers import project_dir from .trie import Trie from .registry import Registry from .result import Result __author__ = 'Artur Barseghyan' __copyright__ = '2013-2020 Artur Barseghyan' __license__ = 'MPL-1.1 OR GPL-2.0-only OR LGPL-2.1-or-later' __all__ = ( 'BaseMozillaTLDSourceParser', 'get_fld', 'get_tld', 'get_tld_names', 'get_tld_names_container', 'is_tld', 'MozillaTLDSourceParser', 'parse_tld', 'pop_tld_names_container', 'process_url', 'reset_tld_names', 'Result', 'tld_names', 'update_tld_names', 'update_tld_names_cli', 'update_tld_names_container', ) tld_names: Dict[str, Trie] = {} def get_tld_names_container() -> Dict[str, Trie]: """Get container of all tld names. :return: :rtype dict: """ global tld_names return tld_names def update_tld_names_container(tld_names_local_path: str, trie_obj: Trie) -> None: """Update TLD Names container item. :param tld_names_local_path: :param trie_obj: :return: """ global tld_names # tld_names.update({tld_names_local_path: trie_obj}) tld_names[tld_names_local_path] = trie_obj def pop_tld_names_container(tld_names_local_path: str) -> None: """Remove TLD names container item. :param tld_names_local_path: :return: """ global tld_names tld_names.pop(tld_names_local_path, None) @lru_cache(maxsize=128, typed=True) def update_tld_names( fail_silently: bool = False, parser_uid: str = None ) -> bool: """Update TLD names. :param fail_silently: :param parser_uid: :return: """ results: List[bool] = [] results_append = results.append if parser_uid: parser_cls = Registry.get(parser_uid, None) if parser_cls and parser_cls.source_url: results_append( parser_cls.update_tld_names(fail_silently=fail_silently) ) else: for parser_uid, parser_cls in Registry.items(): if parser_cls and parser_cls.source_url: results_append( parser_cls.update_tld_names(fail_silently=fail_silently) ) return all(results) def update_tld_names_cli() -> int: """CLI wrapper for update_tld_names. Since update_tld_names returns True on success, we need to negate the result to match CLI semantics. """ parser = argparse.ArgumentParser(description='Update TLD names') parser.add_argument( 'parser_uid', nargs='?', default=None, help="UID of the parser to update TLD names for.", ) parser.add_argument( '--fail-silently', dest="fail_silently", default=False, action='store_true', help="Fail silently", ) args = parser.parse_args(sys.argv[1:]) parser_uid = args.parser_uid fail_silently = args.fail_silently return int( not update_tld_names( parser_uid=parser_uid, fail_silently=fail_silently ) ) def get_tld_names( fail_silently: bool = False, retry_count: int = 0, parser_class: Type[BaseTLDSourceParser] = None ) -> Dict[str, Trie]: """Build the ``tlds`` list if empty. Recursive. :param fail_silently: If set to True, no exceptions are raised and None is returned on failure. :param retry_count: If greater than 1, we raise an exception in order to avoid infinite loops. :param parser_class: :type fail_silently: bool :type retry_count: int :type parser_class: BaseTLDSourceParser :return: List of TLD names :rtype: obj:`tld.utils.Trie` """ if not parser_class: parser_class = MozillaTLDSourceParser return parser_class.get_tld_names( fail_silently=fail_silently, retry_count=retry_count ) # ************************************************************************** # **************************** Parser classes ****************************** # ************************************************************************** class BaseMozillaTLDSourceParser(BaseTLDSourceParser): @classmethod def get_tld_names( cls, fail_silently: bool = False, retry_count: int = 0 ) -> Optional[Dict[str, Trie]]: """Parse. :param fail_silently: :param retry_count: :return: """ if retry_count > 1: if fail_silently: return None else: raise TldIOError global tld_names _tld_names = tld_names # _tld_names = get_tld_names_container() # If already loaded, return if ( cls.local_path in _tld_names and _tld_names[cls.local_path] is not None ): return _tld_names try: # Load the TLD names file if isabs(cls.local_path): local_path = cls.local_path else: local_path = project_dir(cls.local_path) local_file = codecs_open( local_path, 'r', encoding='utf8' ) trie = Trie() trie_add = trie.add # Performance opt # Make a list of it all, strip all garbage private_section = False for line in local_file: if '===BEGIN PRIVATE DOMAINS===' in line: private_section = True # Puny code TLD names if '// xn--' in line: line = line.split()[1] if line[0] in ('/', '\n'): continue trie_add( f'{line.strip()}', private=private_section ) update_tld_names_container(cls.local_path, trie) local_file.close() except IOError as err: # Grab the file cls.update_tld_names( fail_silently=fail_silently ) # Increment ``retry_count`` in order to avoid infinite loops retry_count += 1 # Run again return cls.get_tld_names( fail_silently=fail_silently, retry_count=retry_count ) except Exception as err: if fail_silently: return None else: raise err finally: try: local_file.close() except Exception: pass return _tld_names class MozillaTLDSourceParser(BaseMozillaTLDSourceParser): """Mozilla TLD source.""" uid: str = 'mozilla' source_url: str = 'https://publicsuffix.org/list/public_suffix_list.dat' local_path: str = 'res/effective_tld_names.dat.txt' # ************************************************************************** # **************************** Core functions ****************************** # ************************************************************************** def process_url( url: str, fail_silently: bool = False, fix_protocol: bool = False, search_public: bool = True, search_private: bool = True, parser_class: Type[BaseTLDSourceParser] = MozillaTLDSourceParser ) -> Union[Tuple[List[str], int, SplitResult], Tuple[None, None, SplitResult]]: """Process URL. :param parser_class: :param url: :param fail_silently: :param fix_protocol: :param search_public: :param search_private: :return: """ if not (search_public or search_private): raise TldImproperlyConfigured( "Either `search_public` or `search_private` (or both) shall be " "set to True." ) # Init _tld_names = get_tld_names( fail_silently=fail_silently, parser_class=parser_class ) if not isinstance(url, SplitResult): url = url.lower() if ( fix_protocol and not url.startswith(('//', 'http://', 'https://')) ): url = f'https://{url}' # Get parsed URL as we might need it later parsed_url = urlsplit(url) else: parsed_url = url # Get (sub) domain name domain_name = parsed_url.hostname if not domain_name: if fail_silently: return None, None, parsed_url else: raise TldBadUrl(url=url) # This will correctly handle dots at the end of domain name in URLs like # https://github.com............/barseghyanartur/tld/ if domain_name.endswith('.'): domain_name = domain_name.rstrip('.') domain_parts = domain_name.split('.') tld_names_local_path = parser_class.local_path # Now we query our Trie iterating on the domain parts in reverse order node = _tld_names[tld_names_local_path].root current_length = 0 tld_length = 0 match = None len_domain_parts = len(domain_parts) for i in range(len_domain_parts-1, -1, -1): part = domain_parts[i] # Cannot go deeper if node.children is None: break # Exception if part == node.exception: break child = node.children.get(part) # Wildcards if child is None: child = node.children.get('*') # If the current part is not in current node's children, we can stop if child is None: break # Else we move deeper and increment our tld offset current_length += 1 node = child if node.leaf: tld_length = current_length match = node # Checking the node we finished on is a leaf and is one we allow if ( (match is None) or (not match.leaf) or (not search_public and not match.private) or (not search_private and match.private) ): if fail_silently: return None, None, parsed_url else: raise TldDomainNotFound(domain_name=domain_name) if len_domain_parts == tld_length: non_zero_i = -1 # hostname = tld else: non_zero_i = max(1, len_domain_parts - tld_length) return domain_parts, non_zero_i, parsed_url def get_fld( url: str, fail_silently: bool = False, fix_protocol: bool = False, search_public: bool = True, search_private: bool = True, parser_class: Type[BaseTLDSourceParser] = MozillaTLDSourceParser, **kwargs ) -> Optional[str]: """Extract the first level domain. Extract the top level domain based on the mozilla's effective TLD names dat file. Returns a string. May throw ``TldBadUrl`` or ``TldDomainNotFound`` exceptions if there's bad URL provided or no TLD match found respectively. :param url: URL to get top level domain from. :param fail_silently: If set to True, no exceptions are raised and None is returned on failure. :param fix_protocol: If set to True, missing or wrong protocol is ignored (https is appended instead). :param search_public: If set to True, search in public domains. :param search_private: If set to True, search in private domains. :param parser_class: :type url: str :type fail_silently: bool :type fix_protocol: bool :type search_public: bool :type search_private: bool :return: String with top level domain (if ``as_object`` argument is set to False) or a ``tld.utils.Result`` object (if ``as_object`` argument is set to True); returns None on failure. :rtype: str """ if 'as_object' in kwargs: raise TldImproperlyConfigured( "`as_object` argument is deprecated for `get_fld`. Use `get_tld` " "instead." ) domain_parts, non_zero_i, parsed_url = process_url( url=url, fail_silently=fail_silently, fix_protocol=fix_protocol, search_public=search_public, search_private=search_private, parser_class=parser_class ) if domain_parts is None: return None # This should be None when domain_parts is None # but mypy isn't quite smart enough to figure that out yet assert non_zero_i is not None if non_zero_i < 0: # hostname = tld return parsed_url.hostname return ".".join(domain_parts[non_zero_i-1:]) def get_tld( url: str, fail_silently: bool = False, as_object: bool = False, fix_protocol: bool = False, search_public: bool = True, search_private: bool = True, parser_class: Type[BaseTLDSourceParser] = MozillaTLDSourceParser ) -> Optional[Union[str, Result]]: """Extract the top level domain. Extract the top level domain based on the mozilla's effective TLD names dat file. Returns a string. May throw ``TldBadUrl`` or ``TldDomainNotFound`` exceptions if there's bad URL provided or no TLD match found respectively. :param url: URL to get top level domain from. :param fail_silently: If set to True, no exceptions are raised and None is returned on failure. :param as_object: If set to True, ``tld.utils.Result`` object is returned, ``domain``, ``suffix`` and ``tld`` properties. :param fix_protocol: If set to True, missing or wrong protocol is ignored (https is appended instead). :param search_public: If set to True, search in public domains. :param search_private: If set to True, search in private domains. :param parser_class: :type url: str :type fail_silently: bool :type as_object: bool :type fix_protocol: bool :type search_public: bool :type search_private: bool :return: String with top level domain (if ``as_object`` argument is set to False) or a ``tld.utils.Result`` object (if ``as_object`` argument is set to True); returns None on failure. :rtype: str """ domain_parts, non_zero_i, parsed_url = process_url( url=url, fail_silently=fail_silently, fix_protocol=fix_protocol, search_public=search_public, search_private=search_private, parser_class=parser_class ) if domain_parts is None: return None # This should be None when domain_parts is None # but mypy isn't quite smart enough to figure that out yet assert non_zero_i is not None if not as_object: if non_zero_i < 0: # hostname = tld return parsed_url.hostname return ".".join(domain_parts[non_zero_i:]) if non_zero_i < 0: # hostname = tld subdomain = "" domain = "" # This is checked in process_url but the type is ambiguous (Optional[str]) # so this assertion is just to satisfy mypy assert parsed_url.hostname is not None, "No hostname in URL" _tld = parsed_url.hostname else: subdomain = ".".join(domain_parts[:non_zero_i-1]) domain = ".".join( domain_parts[non_zero_i-1:non_zero_i] ) _tld = ".".join(domain_parts[non_zero_i:]) return Result( subdomain=subdomain, domain=domain, tld=_tld, parsed_url=parsed_url ) def parse_tld( url: str, fail_silently: bool = False, fix_protocol: bool = False, search_public: bool = True, search_private: bool = True, parser_class: Type[BaseTLDSourceParser] = MozillaTLDSourceParser ) -> Union[Tuple[None, None, None], Tuple[str, str, str]]: """Parse TLD into parts. :param url: :param fail_silently: :param fix_protocol: :param search_public: :param search_private: :param parser_class: :return: Tuple (tld, domain, subdomain) :rtype: tuple """ try: obj = get_tld( url, fail_silently=fail_silently, as_object=True, fix_protocol=fix_protocol, search_public=search_public, search_private=search_private, parser_class=parser_class ) if obj is None: return None, None, None return obj.tld, obj.domain, obj.subdomain # type: ignore except ( TldBadUrl, TldDomainNotFound, TldImproperlyConfigured, TldIOError ): pass return None, None, None def is_tld( value: str, search_public: bool = True, search_private: bool = True, parser_class: Type[BaseTLDSourceParser] = MozillaTLDSourceParser ) -> bool: """Check if given URL is tld. :param value: URL to get top level domain from. :param search_public: If set to True, search in public domains. :param search_private: If set to True, search in private domains. :param parser_class: :type value: str :type search_public: bool :type search_private: bool :return: :rtype: bool """ _tld = get_tld( url=value, fail_silently=True, fix_protocol=True, search_public=search_public, search_private=search_private, parser_class=parser_class ) return value == _tld def reset_tld_names(tld_names_local_path: str = None) -> None: """Reset the ``tld_names`` to empty value. If ``tld_names_local_path`` is given, removes specified entry from ``tld_names`` instead. :param tld_names_local_path: :type tld_names_local_path: str :return: """ if tld_names_local_path: pop_tld_names_container(tld_names_local_path) else: global tld_names tld_names = {}