From 3a0085a155ad81784b6132ec1b8048b4d626bf95 Mon Sep 17 00:00:00 2001 From: Vitiko Date: Thu, 2 Feb 2023 16:24:11 -0400 Subject: [PATCH] Add Subscene Cloudscraper Provider [Experimental] An alternative module to the Subscene provider. * Uses similar parsing mechanisms of the Subfm2 provider * Uses cloudscraper for server calls * No login required Not yet implemented in Web UI. --- .../providers/subscene_cloudscraper.py | 411 ++++++++++++++++++ tests/subliminal_patch/test_subscene.py | 50 +++ 2 files changed, 461 insertions(+) create mode 100644 libs/subliminal_patch/providers/subscene_cloudscraper.py create mode 100644 tests/subliminal_patch/test_subscene.py diff --git a/libs/subliminal_patch/providers/subscene_cloudscraper.py b/libs/subliminal_patch/providers/subscene_cloudscraper.py new file mode 100644 index 000000000..89418cfb6 --- /dev/null +++ b/libs/subliminal_patch/providers/subscene_cloudscraper.py @@ -0,0 +1,411 @@ +# -*- coding: utf-8 -*- + +from difflib import SequenceMatcher +import functools +import logging +import re +import time +import urllib.parse + +from bs4 import BeautifulSoup as bso +import cloudscraper +from guessit import guessit +from requests import Session +from requests.exceptions import HTTPError +from subliminal_patch.core import Episode +from subliminal_patch.core import Movie +from subliminal_patch.exceptions import APIThrottled +from subliminal_patch.providers import Provider +from subliminal_patch.providers.utils import get_archive_from_bytes +from subliminal_patch.providers.utils import get_subtitle_from_archive +from subliminal_patch.providers.utils import update_matches +from subliminal_patch.subtitle import Subtitle +from subzero.language import Language + +from libs.subliminal.exceptions import ProviderError + +logger = logging.getLogger(__name__) + + +class SubsceneSubtitle(Subtitle): + provider_name = "subscene_cloudscraper" + hash_verifiable = False + + def __init__(self, language, page_link, release_info, episode_number=None): + super().__init__(language, page_link=page_link) + + self.release_info = release_info + self.episode_number = episode_number + self.episode_title = None + + self._matches = set( + ("title", "year") + if episode_number is None + else ("title", "series", "year", "season", "episode") + ) + + def get_matches(self, video): + update_matches(self._matches, video, self.release_info) + + return self._matches + + @property + def id(self): + return self.page_link + + +_BASE_URL = "https://subscene.com" + +# TODO: add more seasons and languages + +_SEASONS = ( + "First", + "Second", + "Third", + "Fourth", + "Fifth", + "Sixth", + "Seventh", + "Eighth", + "Ninth", + "Tenth", + "Eleventh", + "Twelfth", + "Thirdteenth", + "Fourthteenth", + "Fifteenth", + "Sixteenth", + "Seventeenth", + "Eightheenth", + "Nineteenth", + "Tweentieth", +) + +_LANGUAGE_MAP = { + "english": "eng", + "farsi_persian": "per", + "arabic": "ara", + "spanish": "spa", + "portuguese": "por", + "italian": "ita", + "dutch": "dut", + "hebrew": "heb", + "indonesian": "ind", + "danish": "dan", + "norwegian": "nor", + "bengali": "ben", + "bulgarian": "bul", + "croatian": "hrv", + "swedish": "swe", + "vietnamese": "vie", + "czech": "cze", + "finnish": "fin", + "french": "fre", + "german": "ger", + "greek": "gre", + "hungarian": "hun", + "icelandic": "ice", + "japanese": "jpn", + "macedonian": "mac", + "malay": "may", + "polish": "pol", + "romanian": "rum", + "russian": "rus", + "serbian": "srp", + "thai": "tha", + "turkish": "tur", +} + + +class SubsceneProvider(Provider): + provider_name = "subscene_cloudscraper" + + _movie_title_regex = re.compile(r"^(.+?)( \((\d{4})\))?$") + _tv_show_title_regex = re.compile( + r"^(.+?) [-\(]\s?(.*?) (season|series)\)?( \((\d{4})\))?$" + ) + _supported_languages = {} + _supported_languages["brazillian-portuguese"] = Language("por", "BR") + + for key, val in _LANGUAGE_MAP.items(): + _supported_languages[key] = Language.fromalpha3b(val) + + _supported_languages_reversed = { + val: key for key, val in _supported_languages.items() + } + + languages = set(_supported_languages.values()) + + video_types = (Episode, Movie) + subtitle_class = SubsceneSubtitle + + def initialize(self): + pass + + def terminate(self): + pass + + def _scraper_call(self, url, retry=7, method="GET", sleep=5, **kwargs): + last_exc = None + + for n in range(retry): + # Creating an instance for every try in order to avoid dropped connections. + + # This could probably be improved! + scraper = cloudscraper.create_scraper() + if method == "GET": + req = scraper.get(url, **kwargs) + elif method == "POST": + req = scraper.post(url, **kwargs) + else: + raise NotImplementedError(f"{method} not allowed") + + try: + req.raise_for_status() + except HTTPError as error: + logger.debug( + "'%s' returned. Trying again [%d] in %s", error, n + 1, sleep + ) + last_exc = error + time.sleep(sleep) + else: + return req + + raise ProviderError("403 Retry count exceeded") from last_exc + + def _gen_results(self, query): + url = ( + f"{_BASE_URL}/subtitles/searchbytitle?query={urllib.parse.quote(query)}&l=" + ) + + result = self._scraper_call(url, method="POST") + soup = bso(result.content, "html.parser") + + for title in soup.select("li div[class='title'] a"): + yield title + + def _search_movie(self, title, year): + title = title.lower() + year = str(year) + + found_movie = None + + results = [] + for result in self._gen_results(title): + text = result.text.lower() + match = self._movie_title_regex.match(text) + if not match: + continue + match_title = match.group(1) + match_year = match.group(3) + if year == match_year: + results.append( + { + "href": result.get("href"), + "similarity": SequenceMatcher(None, title, match_title).ratio(), + } + ) + + if results: + results.sort(key=lambda x: x["similarity"], reverse=True) + found_movie = results[0]["href"] + logger.debug("Movie found: %s", results[0]) + return found_movie + + def _search_tv_show_season(self, title, season, year=None): + try: + season_str = _SEASONS[season - 1].lower() + except IndexError: + logger.debug("Season number not supported: %s", season) + return None + + found_tv_show_season = None + + results = [] + for result in self._gen_results(title): + text = result.text.lower() + + match = self._tv_show_title_regex.match(text) + if not match: + logger.debug("Series title not matched: %s", text) + continue + else: + logger.debug("Series title matched: %s", text) + + match_title = match.group(1) + match_season = match.group(2) + + # Match "complete series" titles as they usually contain season packs + if season_str == match_season or "complete" in match_season: + plus = 0.1 if year and str(year) in text else 0 + results.append( + { + "href": result.get("href"), + "similarity": SequenceMatcher(None, title, match_title).ratio() + + plus, + } + ) + + if results: + results.sort(key=lambda x: x["similarity"], reverse=True) + found_tv_show_season = results[0]["href"] + logger.debug("TV Show season found: %s", results[0]) + + return found_tv_show_season + + def _find_movie_subtitles(self, path, language): + soup = self._get_subtitle_page_soup(path, language) + + subtitles = [] + for item in soup.select("tr"): + subtitle = _get_subtitle_from_item(item, language) + if subtitle is None: + continue + + logger.debug("Found subtitle: %s", subtitle) + subtitles.append(subtitle) + + return subtitles + + def _find_episode_subtitles( + self, path, season, episode, language, episode_title=None + ): + soup = self._get_subtitle_page_soup(path, language) + + subtitles = [] + + for item in soup.select("tr"): + valid_item = None + clean_text = " ".join(item.text.split()) + + if not clean_text: + continue + + # It will return list values + guess = _memoized_episode_guess(clean_text) + + if "season" not in guess: + if "complete series" in clean_text.lower(): + logger.debug("Complete series pack found: %s", clean_text) + guess["season"] = [season] + else: + logger.debug("Nothing guessed from release: %s", clean_text) + continue + + if season in guess["season"] and episode in guess.get("episode", []): + logger.debug("Episode match found: %s - %s", guess, clean_text) + valid_item = item + + elif season in guess["season"] and not "episode" in guess: + logger.debug("Season pack found: %s", clean_text) + valid_item = item + + if valid_item is None: + continue + + subtitle = _get_subtitle_from_item(item, language, episode) + + if subtitle is None: + continue + + subtitle.episode_title = episode_title + + logger.debug("Found subtitle: %s", subtitle) + subtitles.append(subtitle) + + return subtitles + + def _get_subtitle_page_soup(self, path, language): + language_path = self._supported_languages_reversed[language] + result = self._scraper_call(f"{_BASE_URL}{path}/{language_path}") + return bso(result.content, "html.parser") + + def list_subtitles(self, video, languages): + is_episode = isinstance(video, Episode) + + if is_episode: + result = self._search_tv_show_season(video.series, video.season, video.year) + else: + result = self._search_movie(video.title, video.year) + + if result is None: + logger.debug("No results") + return [] + + subtitles = [] + + for language in languages: + if is_episode: + subtitles.extend( + self._find_episode_subtitles( + result, video.season, video.episode, language, video.title + ) + ) + else: + subtitles.extend(self._find_movie_subtitles(result, language)) + + return subtitles + + def download_subtitle(self, subtitle): + # TODO: add MustGetBlacklisted support + + result = self._scraper_call(subtitle.page_link) + soup = bso(result.content, "html.parser") + try: + download_url = _BASE_URL + str( + soup.select_one("a[id='downloadButton']")["href"] # type: ignore + ) + except (AttributeError, KeyError, TypeError): + raise APIThrottled(f"Couldn't get download url from {subtitle.page_link}") + + downloaded = self._scraper_call(download_url) + archive = get_archive_from_bytes(downloaded.content) + + if archive is None: + raise APIThrottled(f"Invalid archive: {subtitle.page_link}") + + subtitle.content = get_subtitle_from_archive( + archive, + episode=subtitle.episode_number, + episode_title=subtitle.episode_title, + ) + + +@functools.lru_cache(2048) +def _memoized_episode_guess(content): + # Use include to save time from unnecessary checks + return guessit( + content, + { + "type": "episode", + # Add codec keys to avoid matching x264, 5.1, etc as episode info + "includes": ["season", "episode", "video_codec", "audio_codec"], + "enforce_list": True, + }, + ) + + +def _get_subtitle_from_item(item, language, episode_number=None): + release_infos = [] + + try: + release_infos.append(item.find("td", {"class": "a6"}).text.strip()) + except (AttributeError, KeyError): + pass + + try: + release_infos.append( + item.find("td", {"class": "a1"}).find_all("span")[-1].text.strip() + ) + except (AttributeError, KeyError): + pass + + release_info = "".join(r_info for r_info in release_infos if r_info) + + try: + path = item.find("td", {"class": "a1"}).find("a")["href"] + except (AttributeError, KeyError): + logger.debug("Couldn't get path: %s", item) + return None + + return SubsceneSubtitle(language, _BASE_URL + path, release_info, episode_number) diff --git a/tests/subliminal_patch/test_subscene.py b/tests/subliminal_patch/test_subscene.py new file mode 100644 index 000000000..72063aae3 --- /dev/null +++ b/tests/subliminal_patch/test_subscene.py @@ -0,0 +1,50 @@ +from subliminal_patch.providers import subscene_cloudscraper as subscene + + +def test_provider_scraper_call(): + with subscene.SubsceneProvider() as provider: + result = provider._scraper_call( + "https://subscene.com/subtitles/breaking-bad-fifth-season" + ) + assert result.status_code == 200 + + +def test_provider_gen_results(): + with subscene.SubsceneProvider() as provider: + assert list(provider._gen_results("Breaking Bad")) + + +def test_provider_search_movie(): + with subscene.SubsceneProvider() as provider: + result = provider._search_movie("Taxi Driver", 1976) + assert result == "/subtitles/taxi-driver" + + +def test_provider_find_movie_subtitles(languages): + with subscene.SubsceneProvider() as provider: + result = provider._find_movie_subtitles( + "/subtitles/taxi-driver", languages["en"] + ) + assert result + + +def test_provider_search_tv_show_season(): + with subscene.SubsceneProvider() as provider: + result = provider._search_tv_show_season("The Wire", 1) + assert result == "/subtitles/the-wire--first-season" + + +def test_provider_find_episode_subtitles(languages): + with subscene.SubsceneProvider() as provider: + result = provider._find_episode_subtitles( + "/subtitles/the-wire--first-season", 1, 1, languages["en"] + ) + assert result + + +def test_provider_download_subtitle(languages): + path = "https://subscene.com/subtitles/the-wire--first-season/english/115904" + subtitle = subscene.SubsceneSubtitle(languages["en"], path, "", 1) + with subscene.SubsceneProvider() as provider: + provider.download_subtitle(subtitle) + assert subtitle.is_valid()