bazarr/libs/subliminal_patch/providers/subf2m.py

522 lines
16 KiB
Python
Raw Normal View History

2022-04-18 02:39:37 +00:00
# -*- coding: utf-8 -*-
from difflib import SequenceMatcher
import functools
2022-04-18 02:39:37 +00:00
import logging
import re
import time
import urllib.parse
2022-04-18 02:39:37 +00:00
from bs4 import BeautifulSoup as bso
from guessit import guessit
from requests import Session
from subliminal.exceptions import ConfigurationError
2022-04-18 02:39:37 +00:00
from subliminal_patch.core import Episode
from subliminal_patch.core import Movie
from subliminal_patch.exceptions import APIThrottled
2022-04-18 02:39:37 +00:00
from subliminal_patch.providers import Provider
2022-04-18 20:58:11 +00:00
from subliminal_patch.providers.utils import get_archive_from_bytes
from subliminal_patch.providers.utils import get_subtitle_from_archive
from subliminal_patch.providers.utils import update_matches
from subliminal_patch.subtitle import Subtitle
2022-04-18 02:39:37 +00:00
from subzero.language import Language
logger = logging.getLogger(__name__)
class Subf2mSubtitle(Subtitle):
    """Subtitle entry scraped from a subf2m listing page."""

    provider_name = "subf2m"
    hash_verifiable = False

    def __init__(self, language, page_link, release_info, episode_number=None):
        """Store the scraped metadata and seed the set of assumed matches.

        :param language: language of the subtitle.
        :param page_link: URL of the subtitle page; also used as the id.
        :param release_info: release names/comment text taken from the page.
        :param episode_number: episode the subtitle was requested for, or
            ``None`` when the subtitle belongs to a movie.
        """
        super().__init__(language, page_link=page_link)

        self.release_info = release_info
        self.episode_number = episode_number
        self.episode_title = None

        # Matches assumed up front; get_matches() adds the ones derived
        # from release_info.
        if episode_number is None:
            self._matches = {"title", "year", "imdb_id"}
        else:
            self._matches = {
                "title",
                "series",
                "year",
                "season",
                "episode",
                "imdb_id",
            }

    def get_matches(self, video):
        """Return the matches for *video*, updated from the release info."""
        update_matches(self._matches, video, self.release_info)
        return self._matches

    @property
    def id(self):
        """Identifier of the subtitle: its page link."""
        return self.page_link
_BASE_URL = "https://subf2m.co"

# TODO: add more seasons and languages

# English ordinals indexed by ``season - 1``. They are lowercased and compared
# against the season name parsed from search result titles, so the spelling
# has to be the standard one.
_SEASONS = (
    "First",
    "Second",
    "Third",
    "Fourth",
    "Fifth",
    "Sixth",
    "Seventh",
    "Eighth",
    "Ninth",
    "Tenth",
    "Eleventh",
    "Twelfth",
    "Thirteenth",
    "Fourteenth",
    "Fifteenth",
    "Sixteenth",
    "Seventeenth",
    "Eighteenth",
    "Nineteenth",
    "Twentieth",
)
# Language names mapped to three-letter codes; the provider class feeds the
# codes to ``Language.fromalpha3b``.
_LANGUAGE_MAP = dict(
    english="eng",
    farsi_persian="per",
    arabic="ara",
    spanish="spa",
    portuguese="por",
    italian="ita",
    dutch="dut",
    hebrew="heb",
    indonesian="ind",
    danish="dan",
    norwegian="nor",
    bengali="ben",
    bulgarian="bul",
    croatian="hrv",
    swedish="swe",
    vietnamese="vie",
    czech="cze",
    finnish="fin",
    french="fre",
    german="ger",
    greek="gre",
    hungarian="hun",
    icelandic="ice",
    japanese="jpn",
    macedonian="mac",
    malay="may",
    polish="pol",
    romanian="rum",
    russian="rus",
    serbian="srp",
    thai="tha",
    turkish="tur",
)
# Browser-like request headers merged into the session headers on
# initialization. The user agent is not part of this set.
_DEFAULT_HEADERS = {
    "authority": "subf2m.co",
    "accept": (
        "text/html,application/xhtml+xml,application/xml;q=0.9,"
        "image/avif,image/webp,image/apng,*/*;q=0.8,"
        "application/signed-exchange;v=b3;q=0.7"
    ),
    "accept-language": "en-US,en;q=0.9",
    "referer": "https://subf2m.co",
    # Client hints
    "sec-ch-ua": '"Chromium";v="111", "Not(A:Brand";v="8"',
    "sec-ch-ua-mobile": "?0",
    "sec-ch-ua-platform": '"Unknown"',
    # Fetch metadata
    "sec-fetch-dest": "document",
    "sec-fetch-mode": "navigate",
    "sec-fetch-site": "same-origin",
    "sec-fetch-user": "?1",
    "upgrade-insecure-requests": "1",
}
2022-04-18 02:39:37 +00:00
2022-04-18 20:58:11 +00:00
class Subf2mProvider(Provider):
    """Subtitle provider that scrapes the search and listing pages of subf2m."""

    provider_name = "subf2m"

    # "<title>" optionally followed by " (<year>)"
    _movie_title_regex = re.compile(r"^(.+?)( \((\d{4})\))?$")
    # "<title> - <season name> season" / "<title> (<season name> series)",
    # optionally followed by " (<year>)"
    _tv_show_title_regex = re.compile(
        r"^(.+?) [-\(]\s?(.*?) (season|series)\)?( \((\d{4})\))?$"
    )
    # Fallback: "<title> <one or two digit number>"
    _tv_show_title_alt_regex = re.compile(r"(.+)\s(\d{1,2})(?:\s|$)")

    # URL language segment -> Language
    _supported_languages = {}
    _supported_languages["brazillian-portuguese"] = Language("por", "BR")
    for key, val in _LANGUAGE_MAP.items():
        _supported_languages[key] = Language.fromalpha3b(val)

    # Language -> URL language segment
    _supported_languages_reversed = {
        val: key for key, val in _supported_languages.items()
    }

    languages = set(_supported_languages.values())

    video_types = (Episode, Movie)
    subtitle_class = Subf2mSubtitle

    def __init__(self, user_agent, verify_ssl=True, session_factory=None):
        """Store the configuration; no network access happens here.

        :param user_agent: user agent sent with every request.
        :param verify_ssl: value assigned to the session's ``verify`` attribute.
        :param session_factory: optional callable returning the session to
            use; ``requests.Session`` is used when it is ``None``.
        :raises ConfigurationError: if *user_agent* is empty or blank.
        """
        super().__init__()

        if not (user_agent or "").strip():
            raise ConfigurationError("User-agent config missing")

        self._user_agent = user_agent
        self._verify_ssl = verify_ssl
        self._session_factory = session_factory

    def initialize(self):
        """Create the HTTP session and apply the default headers."""
        if self._session_factory is not None:
            self._session = self._session_factory()
        else:
            logger.debug("No session factory set. Using default requests.Session.")
            self._session = Session()

        self._session.verify = self._verify_ssl
        self._session.headers.update(_DEFAULT_HEADERS)
        self._session.headers.update({"user-agent": self._user_agent})

    def terminate(self):
        """Close the HTTP session."""
        self._session.close()

    def _safe_get_text(self, url, retry=3, default_return=""):
        """Fetch *url* and return its body with blank lines removed.

        404 and 503 responses are retried up to *retry* times, waiting three
        seconds between attempts. *default_return* is returned when access is
        forbidden (403) or when every attempt failed.

        :raises requests.HTTPError: for any other unsuccessful status code.
        """
        for attempt in range(1, retry + 1):
            req = self._session.get(url, stream=True)
            try:
                if req.status_code == 403:
                    logger.debug("Access to this resource is forbidden: %s", url)
                    return default_return

                # Sometimes subf2m will return 404 or 503. This error usually
                # disappears retrying the query
                if req.status_code in (404, 503):
                    if attempt < retry:
                        logger.debug(
                            "503/404 returned. Trying again [%d] in 3 seconds",
                            attempt,
                        )
                        time.sleep(3)
                    continue

                req.raise_for_status()

                # Without an encoding iter_lines() yields bytes even with
                # decode_unicode=True, which would break the join below
                if req.encoding is None:
                    req.encoding = "utf-8"

                return "\n".join(
                    line for line in req.iter_lines(decode_unicode=True) if line
                )
            finally:
                # Streamed responses keep their connection until closed
                req.close()

        logger.debug("No usable response from %s after %d attempts", url, retry)
        return default_return

    def _gen_results(self, query):
        """Yield the title anchors of the search results for *query*."""
        query = urllib.parse.quote(query)
        url = f"{_BASE_URL}/subtitles/searchbytitle?query={query}&l="

        text = self._safe_get_text(url)
        soup = bso(text, "html.parser")

        yield from soup.select("li div[class='title'] a")

    @staticmethod
    def _top_hrefs(candidates, return_len):
        """Return up to *return_len* unique hrefs, most similar first."""
        ranked = sorted(candidates, key=lambda item: item["similarity"], reverse=True)
        # dict.fromkeys drops duplicates while keeping the ranking order
        hrefs = list(dict.fromkeys(item["href"] for item in ranked[:return_len]))
        if hrefs:
            logger.debug("Results: %s", hrefs)
        return hrefs

    def _search_movie(self, title, year, return_len=3):
        """Return the paths of the movies best matching *title* and *year*.

        Only results whose year equals *year* are kept. The returned list is
        ordered from most to least similar title and holds at most
        *return_len* entries.
        """
        title = title.lower()
        year = str(year)

        candidates = []
        for result in self._gen_results(title):
            text = result.text.lower()
            match = self._movie_title_regex.match(text)
            if not match:
                continue

            match_title = match.group(1)
            match_year = match.group(3)
            if year == match_year:
                candidates.append(
                    {
                        "href": result.get("href"),
                        "similarity": SequenceMatcher(None, title, match_title).ratio(),
                    }
                )

        return self._top_hrefs(candidates, return_len)

    def _search_tv_show_season(self, title, season, year=None, return_len=3):
        """Return the paths of the season pages best matching the arguments.

        A result is kept when its season name equals the ordinal or the number
        of *season*, or contains "complete". Results mentioning *year* get a
        small ranking bonus. The returned list is ordered from best to worst
        and holds at most *return_len* entries.

        :return: list of paths, or ``None`` if *season* is not supported.
        """
        # Explicit range check: season 0 must not wrap around to the last
        # ordinal through negative indexing
        if not isinstance(season, int) or not 1 <= season <= len(_SEASONS):
            logger.debug("Season number not supported: %s", season)
            return None

        season_strs = (_SEASONS[season - 1].lower(), str(season))
        # Site titles are lowercased before matching, so compare like with like
        lowered_title = title.lower()

        candidates = []
        for result in self._gen_results(title):
            text = result.text.lower()

            match = self._tv_show_title_regex.match(text)
            if not match:
                match = self._tv_show_title_alt_regex.match(text)

            if not match:
                logger.debug("Series title not matched: %s", text)
                continue

            match_title = match.group(1).strip()
            match_season = match.group(2).strip().lower()

            if match_season in season_strs or "complete" in match_season:
                logger.debug("OK: '%s' IN %s|complete", match_season, season_strs)
                plus = 0.1 if year and str(year) in text else 0
                candidates.append(
                    {
                        "href": result.get("href"),
                        "similarity": SequenceMatcher(
                            None, lowered_title, match_title
                        ).ratio()
                        + plus,
                    }
                )
            else:
                logger.debug("Invalid: '%s' IN %s|complete", match_season, season_strs)

        return self._top_hrefs(candidates, return_len)

    def _find_movie_subtitles(self, path, language, imdb_id):
        """Return the subtitles listed under *path* for *language*.

        Nothing is returned when the page links to a different IMDB ID.
        """
        soup = self._get_subtitle_page_soup(path, language)
        if not _match_imdb(soup, imdb_id):
            return []

        subtitles = []
        for item in soup.select("li.item"):
            subtitle = _get_subtitle_from_item(item, language)
            if subtitle is None:
                continue

            logger.debug("Found subtitle: %s", subtitle)
            subtitles.append(subtitle)

        return subtitles

    def _find_episode_subtitles(
        self, path, season, episode, language, episode_title=None, imdb_id=None
    ):
        """Return the subtitles under *path* that cover *season*/*episode*.

        Items are accepted when they match the exact episode or when they are
        season packs (season guessed, no episode guessed). Items labelled
        "complete series" without a guessed season are treated as belonging to
        the requested season. Nothing is returned when the page links to a
        different IMDB ID.
        """
        soup = self._get_subtitle_page_soup(path, language)
        if not _match_imdb(soup, imdb_id):
            return []

        subtitles = []
        for item in soup.select("li.item"):
            clean_text = " ".join(item.text.split())
            if not clean_text:
                continue

            # First try with the special episode matches for subf2m
            guess = _get_episode_from_release(clean_text)
            if guess is None:
                guess = _memoized_episode_guess(clean_text)

            # The guess may come from an lru_cache, so it is only read here;
            # writing into it would leak this call's season into later calls
            if "season" in guess:
                guessed_seasons = guess["season"]
            elif "complete series" in clean_text.lower():
                logger.debug("Complete series pack found: %s", clean_text)
                guessed_seasons = [season]
            else:
                logger.debug("Nothing guessed from release: %s", clean_text)
                continue

            if season not in guessed_seasons:
                continue

            if "episode" not in guess:
                logger.debug("Season pack found: %s", clean_text)
            elif episode in guess["episode"]:
                logger.debug("Episode match found: %s - %s", guess, clean_text)
            else:
                continue

            subtitle = _get_subtitle_from_item(item, language, episode)
            if subtitle is None:
                continue

            subtitle.episode_title = episode_title

            logger.debug("Found subtitle: %s", subtitle)
            subtitles.append(subtitle)

        return subtitles

    def _get_subtitle_page_soup(self, path, language):
        """Fetch and parse the listing page of *path* for *language*."""
        language_path = self._supported_languages_reversed[language]
        text = self._safe_get_text(f"{_BASE_URL}{path}/{language_path}")
        return bso(text, "html.parser")

    def list_subtitles(self, video, languages):
        """Return the subtitles found for *video* in the supported *languages*.

        Candidate pages are visited from best to worst match; the search stops
        at the first page that yields any subtitle.
        """
        is_episode = isinstance(video, Episode)

        if is_episode:
            paths = self._search_tv_show_season(video.series, video.season, video.year)
        else:
            paths = self._search_movie(video.title, video.year)

        if not paths:
            logger.debug("No results")
            return []

        languages = {lang for lang in languages if lang in self.languages}

        subs = []
        for path in paths:
            logger.debug("Looking for subs from %s", path)

            for language in languages:
                if is_episode:
                    subs.extend(
                        self._find_episode_subtitles(
                            path,
                            video.season,
                            video.episode,
                            language,
                            video.title,
                            video.series_imdb_id,
                        )
                    )
                else:
                    subs.extend(
                        self._find_movie_subtitles(path, language, video.imdb_id)
                    )

            if subs:
                logger.debug("Good path found: %s. Not running over others.", path)
                break

        return subs

    def download_subtitle(self, subtitle):
        """Download the archive of *subtitle* and store the extracted content.

        :raises APIThrottled: if the download link cannot be found on the
            subtitle page or the downloaded data is not a valid archive.
        """
        # TODO: add MustGetBlacklisted support

        text = self._safe_get_text(subtitle.page_link)
        soup = bso(text, "html.parser")
        try:
            download_url = _BASE_URL + str(
                soup.select_one("a[id='downloadButton']")["href"]  # type: ignore
            )
        except (AttributeError, KeyError, TypeError):
            raise APIThrottled(f"Couldn't get download url from {subtitle.page_link}")

        downloaded = self._session.get(download_url, allow_redirects=True)

        archive = get_archive_from_bytes(downloaded.content)
        if archive is None:
            raise APIThrottled(f"Invalid archive: {subtitle.page_link}")

        subtitle.content = get_subtitle_from_archive(
            archive,
            episode=subtitle.episode_number,
            episode_title=subtitle.episode_title,
        )
2022-04-18 02:39:37 +00:00
@functools.lru_cache(maxsize=2048)
def _memoized_episode_guess(content):
    """Return the cached guessit result for the release text *content*.

    The returned object is shared between calls with the same *content*.
    """
    options = {
        "type": "episode",
        # Use include to save time from unnecessary checks.
        # Add codec keys to avoid matching x264, 5.1, etc as episode info
        "includes": ["season", "episode", "video_codec", "audio_codec"],
        "enforce_list": True,
    }
    return guessit(content, options)
# Matches "<season|s><x> - <y>" release labels such as "Season 1 - 05" or
# "S02-10". Both numbers need at least one digit: allowing empty numbers lets
# a stray "s -" earlier in the text become the first (unusable) match and
# hide a real season/episode pair that follows it.
_EPISODE_SPECIAL_RE = re.compile(
    r"(season|s)\s*?(?P<x>\d{1,2})\s?[-]\s?(?P<y>\d{1,2})", flags=re.IGNORECASE
)
def _match_imdb(soup, imdb_id):
    """Tell whether the page in *soup* may belong to *imdb_id*.

    The ID is taken from the last path segment of the header link. Pages from
    which no ID can be read are accepted.

    :return: ``False`` only when an ID was read and differs from *imdb_id*.
    """
    header_link_selector = (
        "#content > div.subtitles.byFilm > div.box.clearfix > div.top.left > div.header > h2 > a"
    )

    try:
        href = soup.select_one(header_link_selector).get("href")  # type: ignore
        parsed_imdb_id = href.split("/")[-1].strip()  # type: ignore
    except AttributeError:
        # Either the link or its href is missing
        logger.debug("Couldn't get IMDB ID")
        logger.debug("Matching subtitles as IMDB ID was not parsed.")
        return True

    if parsed_imdb_id == imdb_id:
        logger.debug("Good IMDB ID: '%s' == '%s'", parsed_imdb_id, imdb_id)
        return True

    logger.debug("Wrong IMDB ID: '%s' != '%s'", parsed_imdb_id, imdb_id)
    return False
def _get_episode_from_release(release: str):
    """Extract season and episode from a "<season> - <episode>" release label.

    :return: ``{"season": [int], "episode": [int]}``, or ``None`` when the
        special pattern is absent or its numbers cannot be converted.
    """
    found = _EPISODE_SPECIAL_RE.search(release)
    if found is None:
        return None

    try:
        raw_season, raw_episode = found.group("x", "y")
        return {"season": [int(raw_season)], "episode": [int(raw_episode)]}
    except (IndexError, ValueError):
        return None
def _get_subtitle_from_item(item, language, episode_number=None):
    """Build a ``Subf2mSubtitle`` from one listing entry.

    The release info is the text of every child of the ``scrolllist`` element
    plus the optional comment paragraph, one per line, with empty entries
    dropped.

    :param item: parsed listing entry (``li.item`` element).
    :param language: language to assign to the subtitle.
    :param episode_number: episode number to store on the subtitle, if any.
    :return: the subtitle, or ``None`` when the entry has no download link.
    """
    release_list = item.find("ul", {"class": "scrolllist"})
    # find() returns None when the list is missing; iterating None would raise
    release_info = (
        [rel.text.strip() for rel in release_list] if release_list is not None else []
    )

    try:
        text = item.find("div", {"class": "comment-col"}).find("p").text
        release_info.append(text.replace("\n", " ").strip())
    except AttributeError:
        # The comment is optional
        pass

    release_info = "\n".join(entry for entry in release_info if entry)

    try:
        path = item.find("a", {"class": "download icon-download"})["href"]  # type: ignore
    except (AttributeError, KeyError, TypeError):
        # TypeError: find() returned None, which cannot be subscripted
        logger.debug("Couldn't get path: %s", item)
        return None

    return Subf2mSubtitle(language, _BASE_URL + path, release_info, episode_number)