bazarr/libs/subliminal_patch/providers/argenteam.py

# coding=utf-8
from __future__ import absolute_import
from json import JSONDecodeError
import logging
import os
import urllib.parse
from requests import Session
from subliminal import Episode
from subliminal import Movie
from subliminal_patch.providers import Provider
from subliminal_patch.providers.mixins import ProviderSubtitleArchiveMixin
from subliminal_patch.providers.utils import get_archive_from_bytes
from subliminal_patch.providers.utils import get_subtitle_from_archive
from subliminal_patch.providers.utils import update_matches
from subliminal_patch.subtitle import Subtitle
from subzero.language import Language

BASE_URL = "https://argenteam.net"
API_URL = f"{BASE_URL}/api/v1"
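
# API layout, as inferred from the calls in this module: /search is queried
# with the bare IMDB number (optionally followed by "SxxEyy") and is expected
# to return a "results" list of {"id": ...} entries; /movie and /episode are
# queried with one of those ids and are expected to return a "releases" list,
# where each release carries a "subtitles" list (each with a download "uri")
# plus optional "source", "codec", "tags" and "team" fields.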

logger = logging.getLogger(__name__)


class ArgenteamSubtitle(Subtitle):
    provider_name = "argenteam"
    hearing_impaired_verifiable = False

    def __init__(self, language, page_link, download_link, release_info, matches):
        super(ArgenteamSubtitle, self).__init__(language, page_link=page_link)
        self._found_matches = matches
        self.page_link = page_link
        self.download_link = download_link
        self.release_info = release_info

    @property
    def id(self):
        return self.download_link

    def get_matches(self, video):
        update_matches(self._found_matches, video, self.release_info)
        return self._found_matches


class ArgenteamProvider(Provider, ProviderSubtitleArchiveMixin):
    provider_name = "argenteam"
    languages = {Language("spa", "MX")}
    video_types = (Episode, Movie)
    subtitle_class = ArgenteamSubtitle
    _default_lang = Language("spa", "MX")

    def __init__(self):
        self.session = Session()

    def initialize(self):
        self.session.headers.update(
            {"User-Agent": os.environ.get("SZ_USER_AGENT", "Sub-Zero/2")}
        )

    def terminate(self):
        self.session.close()

    def query(self, video):
        is_episode = isinstance(video, Episode)
        imdb_id = video.series_imdb_id if is_episode else video.imdb_id

        if not imdb_id:
            # Without an IMDB ID there is nothing to search the API with
            logger.debug("%s doesn't have an IMDB ID. Can't search", video)
            return []

        if is_episode:
            argenteam_ids = self._search_ids(
                imdb_id, season=video.season, episode=video.episode
            )
        else:
            argenteam_ids = self._search_ids(imdb_id)

        if not argenteam_ids:
            logger.debug("No IDs found")
            return []

        return self._parse_subtitles(argenteam_ids, is_episode)

    def _parse_subtitles(self, ids, is_episode=True):
        movie_kind = "episode" if is_episode else "movie"

        subtitles = []

        for aid in ids:
            response = self.session.get(
                f"{API_URL}/{movie_kind}", params={"id": aid}, timeout=10
            )
            response.raise_for_status()

            try:
                content = response.json()
            except JSONDecodeError:
                continue

            if not content or not content.get("releases"):
                continue

            for r in content["releases"]:
                for s in r["subtitles"]:
                    page_link = f"{BASE_URL}/{movie_kind}/{aid}"
                    release_info = self._combine_release_info(r, s)
                    logger.debug("Got release info: %s", release_info)

                    download_link = s["uri"].replace("http://", "https://")

                    # Already matched within query
                    if is_episode:
                        matches = {"series", "title", "season", "episode", "imdb_id", "year"}
                    else:
                        matches = {"title", "year", "imdb_id"}

                    subtitles.append(
                        ArgenteamSubtitle(
                            self._default_lang,
                            page_link,
                            download_link,
                            release_info,
                            matches,
                        )
                    )

        return subtitles

    def list_subtitles(self, video, languages):
        return self.query(video)
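
    # Downloads are handled as compressed archives; the shared helpers below
    # unpack the archive and extract the actual subtitle file from it.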
    def download_subtitle(self, subtitle):
        r = self.session.get(subtitle.download_link, timeout=10)
        r.raise_for_status()

        archive = get_archive_from_bytes(r.content)
        subtitle.content = get_subtitle_from_archive(archive)

    def _search_ids(self, identifier, **kwargs):
        """
        :param identifier: imdb_id or title (without year)
        """
        # IMDB ids come in as "tt1234567"; drop the leading "t" characters so
        # the query carries only the bare number
        identifier = identifier.lstrip("tt")

        query = identifier
        if kwargs.get("season") and kwargs.get("episode"):
            # e.g. "1234567 S01E01" for an episode search
            query = f"{identifier} S{kwargs['season']:02}E{kwargs['episode']:02}"

        logger.debug("Searching ID for %s", query)

        r = self.session.get(f"{API_URL}/search", params={"q": query}, timeout=10)
        r.raise_for_status()

        try:
            results = r.json()
        except JSONDecodeError:
            return []

        if not results.get("results"):
            return []

        match_ids = [result["id"] for result in results["results"]]
        logger.debug("Found matching IDs: %s", match_ids)

        return match_ids

    def _combine_release_info(self, release_dict, subtitle_dict):
        # Start with the subtitle file name taken from the download URI
        releases = [
            urllib.parse.unquote(subtitle_dict.get("uri", "Unknown").split("/")[-1])
        ]

        # Append a synthesized release string (e.g. "BluRay.x264-TEAM") built
        # from whichever of source/codec/tags/team are present in the release
        combine = [
            release_dict.get(key)
            for key in ("source", "codec", "tags")
            if release_dict.get(key)
        ]
        if combine:
            r_info = ".".join(combine)
            if release_dict.get("team"):
                r_info += f"-{release_dict['team']}"

            releases.append(r_info)

        return "\n".join(releases)