1
0
Fork 0
mirror of https://github.com/morpheus65535/bazarr synced 2024-12-27 18:17:46 +00:00
bazarr/libs/subliminal_patch/providers/argenteam.py

265 lines
9 KiB
Python
Raw Normal View History

2018-10-31 16:08:29 +00:00
# coding=utf-8
2019-09-17 02:04:27 +00:00
from __future__ import absolute_import
2018-10-31 16:08:29 +00:00
import logging
import os
import io
import time
from zipfile import ZipFile
from guessit import guessit
from requests import Session
from subliminal import Episode, Movie
from subliminal.utils import sanitize
2018-10-31 16:08:29 +00:00
from subliminal_patch.providers import Provider
from subliminal_patch.subtitle import Subtitle, guess_matches
from subliminal_patch.providers.mixins import ProviderSubtitleArchiveMixin
from subzero.language import Language
BASE_URL = "https://argenteam.net/"
API_URL = BASE_URL + "api/v1/"
2018-10-31 16:08:29 +00:00
logger = logging.getLogger(__name__)
class ArgenteamSubtitle(Subtitle):
provider_name = "argenteam"
2018-10-31 16:08:29 +00:00
hearing_impaired_verifiable = False
def __init__(self, language, page_link, download_link, release_info, matches):
super(ArgenteamSubtitle, self).__init__(language, page_link=page_link)
self.page_link = page_link
2018-10-31 16:08:29 +00:00
self.download_link = download_link
self.found_matches = matches
self.release_info = release_info
2018-10-31 16:08:29 +00:00
@property
def id(self):
return self.download_link
def get_matches(self, video):
# Download links always have the srt filename with the release info.
# We combine it with the release info as guessit will return the first key match.
new_file = self.download_link.split("/")[-1] + self.release_info
self.found_matches |= guess_matches(video, guessit(new_file))
return self.found_matches
2018-10-31 16:08:29 +00:00
class ArgenteamProvider(Provider, ProviderSubtitleArchiveMixin):
provider_name = "argenteam"
languages = {Language.fromalpha2(l) for l in ["es"]}
2018-10-31 16:08:29 +00:00
video_types = (Episode, Movie)
subtitle_class = ArgenteamSubtitle
hearing_impaired_verifiable = False
language_list = list(languages)
multi_result_throttle = 2 # seconds
def __init__(self):
self.session = None
def initialize(self):
self.session = Session()
self.session.headers = {
"User-Agent": os.environ.get("SZ_USER_AGENT", "Sub-Zero/2")
}
2018-10-31 16:08:29 +00:00
def terminate(self):
self.session.close()
def search_ids(self, title, **kwargs):
2018-10-31 16:08:29 +00:00
query = title
titles = kwargs.get("titles") or []
2018-10-31 16:08:29 +00:00
is_episode = False
if kwargs.get("season") and kwargs.get("episode"):
2018-10-31 16:08:29 +00:00
is_episode = True
query = f"{title} S{kwargs['season']:02}E{kwargs['episode']:02}"
logger.info(f"Searching ID (episode: {is_episode}) for {query}")
2018-10-31 16:08:29 +00:00
r = self.session.get(API_URL + "search", params={"q": query}, timeout=10)
2018-10-31 16:08:29 +00:00
r.raise_for_status()
2018-10-31 16:08:29 +00:00
results = r.json()
match_ids = []
if results["total"] >= 1:
2018-10-31 16:08:29 +00:00
for result in results["results"]:
if (result["type"] == "episode" and not is_episode) or (
result["type"] == "movie" and is_episode
):
2018-10-31 16:08:29 +00:00
continue
# shortcut in case of matching imdb id (don't match NoneType)
if not is_episode and f"tt{result.get('imdb', 'n/a')}" == kwargs.get(
"imdb_id"
):
logger.debug(f"Movie matched by IMDB ID, taking shortcut")
match_ids = [result["id"]]
2018-10-31 16:08:29 +00:00
break
# advanced title check in case of multiple movie results
if results["total"] > 1:
if not is_episode and kwargs.get("year"):
if result["title"] and not (
sanitize(result["title"])
in (
"%s %s" % (sanitize(name), kwargs.get("year"))
for name in titles
)
):
2018-10-31 16:08:29 +00:00
continue
match_ids.append(result["id"])
2018-10-31 16:08:29 +00:00
else:
logger.error(f"No episode ID found for {query}")
2018-10-31 16:08:29 +00:00
if match_ids:
logger.debug(
f"Found matching IDs: {', '.join(str(id) for id in match_ids)}"
)
2018-10-31 16:08:29 +00:00
return match_ids
def get_query_matches(self, video, **kwargs):
matches = set()
if isinstance(video, Episode) and kwargs.get("movie_kind") == "episode":
if video.series and (
sanitize(kwargs.get("title"))
in (
sanitize(name) for name in [video.series] + video.alternative_series
)
):
matches.add("series")
if video.season and kwargs.get("season") == video.season:
matches.add("season")
if video.episode and kwargs.get("episode") == video.episode:
matches.add("episode")
if video.tvdb_id and kwargs.get("tvdb_id") == str(video.tvdb_id):
matches.add("tvdb_id")
# year (year is not available for series, but we assume it matches)
matches.add("year")
elif isinstance(video, Movie) and kwargs.get("movie_kind") == "movie":
if video.title and (
sanitize(kwargs.get("title"))
in (sanitize(name) for name in [video.title] + video.alternative_titles)
):
matches.add("title")
if video.imdb_id and f"tt{kwargs.get('imdb_id')}" == str(video.imdb_id):
matches.add("imdb_id")
if video.year and kwargs.get("year") == video.year:
matches.add("year")
else:
logger.info(f"{kwargs.get('movie_kind')} is not a valid movie_kind")
return matches
def combine_release_info(self, release_dict):
keys = ("source", "codec", "tags", "team")
combine = [release_dict.get(key) for key in keys if release_dict.get(key)]
if combine:
return ".".join(combine)
return "Unknown"
2018-10-31 16:08:29 +00:00
def query(self, title, video, titles=None):
is_episode = isinstance(video, Episode)
season = episode = None
url = API_URL + "movie"
2018-10-31 16:08:29 +00:00
if is_episode:
season = video.season
episode = video.episode
url = API_URL + "episode"
argenteam_ids = self.search_ids(
title, season=season, episode=episode, titles=titles
)
2018-10-31 16:08:29 +00:00
else:
argenteam_ids = self.search_ids(
title, year=video.year, imdb_id=video.imdb_id, titles=titles
)
2018-10-31 16:08:29 +00:00
if not argenteam_ids:
return []
language = self.language_list[0]
subtitles = []
has_multiple_ids = len(argenteam_ids) > 1
for aid in argenteam_ids:
response = self.session.get(url, params={"id": aid}, timeout=10)
2018-10-31 16:08:29 +00:00
response.raise_for_status()
content = response.json()
if not content:
continue
2021-02-23 11:50:36 +00:00
if not content.get("releases"):
continue
imdb_id = year = None
returned_title = title
if not is_episode and "info" in content:
imdb_id = content["info"].get("imdb")
year = content["info"].get("year")
returned_title = content["info"].get("title", title)
for r in content["releases"]:
for s in r["subtitles"]:
movie_kind = "episode" if is_episode else "movie"
page_link = f"{BASE_URL}{movie_kind}/{aid}"
release_info = self.combine_release_info(r)
download_link = s["uri"].replace("http", "https")
matches_ = self.get_query_matches(
video,
movie_kind=movie_kind,
season=season,
episode=episode,
title=returned_title,
year=year,
imdb_id=imdb_id,
tvdb_id=content.get("tvdb"),
)
subtitles.append(
ArgenteamSubtitle(
language,
page_link,
download_link,
release_info,
matches_,
)
)
2018-10-31 16:08:29 +00:00
if has_multiple_ids:
time.sleep(self.multi_result_throttle)
return subtitles
def list_subtitles(self, video, languages):
if isinstance(video, Episode):
2020-10-16 20:36:03 +00:00
titles = [video.series] + video.alternative_series[:2]
2018-10-31 16:08:29 +00:00
else:
2020-10-16 20:36:03 +00:00
titles = [video.title] + video.alternative_titles[:2]
2018-10-31 16:08:29 +00:00
for title in titles:
subs = self.query(title, video, titles=titles)
if subs:
return subs
time.sleep(self.multi_result_throttle)
return []
def download_subtitle(self, subtitle):
# download as a zip
logger.info("Downloading subtitle %r", subtitle)
2018-10-31 16:08:29 +00:00
r = self.session.get(subtitle.download_link, timeout=10)
r.raise_for_status()
# open the zip
with ZipFile(io.BytesIO(r.content)) as zf:
subtitle.content = self.get_subtitle_from_archive(subtitle, zf)