Argenteam provider: fix decoding error

This commit is contained in:
Vitiko 2021-11-30 17:40:51 -04:00
parent e1386aedc0
commit d2bbc479bc
2 changed files with 110 additions and 112 deletions

View File

@ -7,8 +7,7 @@ import io
import time
import urllib.parse
from json.decoder import JSONDecodeError
from simplejson.errors import JSONDecodeError
from zipfile import ZipFile
from guessit import guessit
from requests import Session
@ -19,8 +18,8 @@ from subliminal_patch.subtitle import Subtitle, guess_matches
from subliminal_patch.providers.mixins import ProviderSubtitleArchiveMixin
from subzero.language import Language
BASE_URL = "https://argenteam.net/"
API_URL = BASE_URL + "api/v1/"
BASE_URL = "https://argenteam.net"
API_URL = f"{BASE_URL}/api/v1"
logger = logging.getLogger(__name__)
@ -69,10 +68,9 @@ class ArgenteamProvider(Provider, ProviderSubtitleArchiveMixin):
multi_result_throttle = 2 # seconds
def __init__(self):
self.session = None
self.session = Session()
def initialize(self):
self.session = Session()
self.session.headers.update(
{"User-Agent": os.environ.get("SZ_USER_AGENT", "Sub-Zero/2")}
)
@ -80,118 +78,20 @@ class ArgenteamProvider(Provider, ProviderSubtitleArchiveMixin):
def terminate(self):
    """Close the HTTP session held in ``self.session``."""
    active_session = self.session
    active_session.close()
def search_ids(self, title, **kwargs):
    """Query the search endpoint and return the IDs of acceptable results.

    Args:
        title: Title used as the search query.
        **kwargs: Optional ``season`` and ``episode`` (when both are truthy
            the search is treated as an episode search and ``SxxEyy`` is
            appended to the query), ``imdb_id`` and ``year`` (used for
            movie searches) and ``titles`` (alternative titles).

    Returns:
        list: The ``id`` of every accepted result. Empty when the response
        body is not valid JSON or contains no ``results``.

    Raises:
        Whatever ``raise_for_status()`` raises for an unsuccessful HTTP
        response.
    """
    query = title
    titles = kwargs.get("titles") or []

    is_episode = bool(kwargs.get("season") and kwargs.get("episode"))
    if is_episode:
        query = f"{title} S{kwargs['season']:02}E{kwargs['episode']:02}"

    logger.debug(f"Searching ID (episode: {is_episode}) for {query}")

    r = self.session.get(API_URL + "search", params={"q": query}, timeout=10)
    r.raise_for_status()

    try:
        results = r.json()
    except ValueError:
        # Both the stdlib and simplejson JSONDecodeError subclass ValueError,
        # so this works regardless of which JSON backend raised the error.
        logger.debug("Invalid JSON received for %s query", query)
        return []

    if not results.get("results"):
        return []

    # NOTE(review): ``title`` is a named parameter, so it can never show up in
    # ``kwargs``; as written this is always falsy and the title/year filter
    # below never runs. Confirm the intended condition before enabling it.
    title_year = kwargs.get("year") and kwargs.get("title")
    year_titles = None  # built at most once instead of on every iteration

    match_ids = []
    for result in results["results"]:
        if result["type"] == "movie" and is_episode:
            continue

        imdb = f"tt{result.get('imdb', 'n/a')}"
        if not is_episode and imdb == kwargs.get("imdb_id"):
            logger.debug("Movie matched by IMDB ID, taking shortcut")
            match_ids = [result["id"]]
            break

        # advanced title check in case of multiple movie results
        if results["total"] > 1 and not is_episode and title_year:
            if year_titles is None:
                year_titles = [
                    f"{sanitize(name)} {kwargs['year']}" for name in titles
                ]
            if sanitize(result["title"]) not in year_titles:
                continue

        match_ids.append(result["id"])

    if match_ids:
        ids = ", ".join(str(match_id) for match_id in match_ids)
        logger.debug("Found matching IDs: %s", ids)
    else:
        logger.debug("Nothing found from %s query", query)

    return match_ids
def get_query_matches(self, video, **kwargs):
    """Collect the names of the attributes on which ``video`` and kwargs agree.

    ``kwargs["movie_kind"]`` must be ``"episode"`` for an ``Episode`` video or
    ``"movie"`` for a ``Movie`` video; any other combination is logged and
    yields an empty set.

    Returns:
        set: Match names such as ``"series"``, ``"season"``, ``"episode"``,
        ``"tvdb_id"``, ``"title"``, ``"imdb_id"`` and ``"year"``.
    """
    found = set()
    kind = kwargs.get("movie_kind")

    def title_among(names):
        # Evaluated lazily so sanitize() only runs when the video has a name.
        wanted = sanitize(kwargs.get("title"))
        return wanted in (sanitize(name) for name in names)

    if isinstance(video, Episode) and kind == "episode":
        if video.series and title_among([video.series] + video.alternative_series):
            found.add("series")

        if video.season and video.season == kwargs.get("season"):
            found.add("season")

        if video.episode and video.episode == kwargs.get("episode"):
            found.add("episode")

        if video.tvdb_id and str(video.tvdb_id) == kwargs.get("tvdb_id"):
            found.add("tvdb_id")

        # year (year is not available for series, but we assume it matches)
        found.add("year")
        return found

    if isinstance(video, Movie) and kind == "movie":
        if video.title and title_among([video.title] + video.alternative_titles):
            found.add("title")

        if video.imdb_id and str(video.imdb_id) == f"tt{kwargs.get('imdb_id')}":
            found.add("imdb_id")

        if video.year and video.year == kwargs.get("year"):
            found.add("year")
        return found

    logger.info(f"{kwargs.get('movie_kind')} is not a valid movie_kind")
    return found
def combine_release_info(self, release_dict):
    """Join the non-empty release fields with dots.

    The ``source``, ``codec``, ``tags`` and ``team`` entries of
    ``release_dict`` are read in that order; falsy or missing entries are
    skipped. Returns ``"Unknown"`` when none of them is set.
    """
    fields = ("source", "codec", "tags", "team")
    present = list(filter(None, map(release_dict.get, fields)))
    if not present:
        return "Unknown"
    return ".".join(present)
def query(self, title, video, titles=None):
is_episode = isinstance(video, Episode)
season = episode = None
url = API_URL + "movie"
url = f"{API_URL}/movie"
if is_episode:
season = video.season
episode = video.episode
url = API_URL + "episode"
argenteam_ids = self.search_ids(
url = f"{API_URL}/episode"
argenteam_ids = self._search_ids(
title, season=season, episode=episode, titles=titles
)
else:
argenteam_ids = self.search_ids(
argenteam_ids = self._search_ids(
title, year=video.year, imdb_id=video.imdb_id, titles=titles
)
@ -223,11 +123,11 @@ class ArgenteamProvider(Provider, ProviderSubtitleArchiveMixin):
for r in content["releases"]:
for s in r["subtitles"]:
movie_kind = "episode" if is_episode else "movie"
page_link = f"{BASE_URL}{movie_kind}/{aid}"
release_info = self.combine_release_info(r)
page_link = f"{BASE_URL}/{movie_kind}/{aid}"
release_info = self._combine_release_info(r)
download_link = s["uri"].replace("http://", "https://")
matches_ = self.get_query_matches(
matches_ = self._get_query_matches(
video,
movie_kind=movie_kind,
season=season,
@ -275,3 +175,101 @@ class ArgenteamProvider(Provider, ProviderSubtitleArchiveMixin):
# open the zip
with ZipFile(io.BytesIO(r.content)) as zf:
subtitle.content = self.get_subtitle_from_archive(subtitle, zf)
def _search_ids(self, title, **kwargs):
    """Query the search endpoint and return the IDs of acceptable results.

    Args:
        title: Title used as the search query.
        **kwargs: Optional ``season`` and ``episode`` (when both are truthy
            the search is treated as an episode search and ``SxxEyy`` is
            appended to the query), ``imdb_id`` and ``year`` (used for
            movie searches) and ``titles`` (alternative titles).

    Returns:
        list: The ``id`` of every accepted result. Empty when the response
        body is not valid JSON or contains no ``results``.

    Raises:
        Whatever ``raise_for_status()`` raises for an unsuccessful HTTP
        response.
    """
    query = title
    titles = kwargs.get("titles") or []

    is_episode = bool(kwargs.get("season") and kwargs.get("episode"))
    if is_episode:
        query = f"{title} S{kwargs['season']:02}E{kwargs['episode']:02}"

    logger.debug(f"Searching ID (episode: {is_episode}) for {query}")

    r = self.session.get(f"{API_URL}/search", params={"q": query}, timeout=10)
    r.raise_for_status()

    try:
        results = r.json()
    except ValueError:
        # Both the stdlib and simplejson JSONDecodeError subclass ValueError,
        # so this works regardless of which JSON backend raised the error.
        logger.debug("Invalid JSON received for %s query", query)
        return []

    if not results.get("results"):
        return []

    # NOTE(review): ``title`` is a named parameter, so it can never show up in
    # ``kwargs``; as written this is always falsy and the title/year filter
    # below never runs. Confirm the intended condition before enabling it.
    title_year = kwargs.get("year") and kwargs.get("title")
    year_titles = None  # built at most once instead of on every iteration

    match_ids = []
    for result in results["results"]:
        if result["type"] == "movie" and is_episode:
            continue

        imdb = f"tt{result.get('imdb', 'n/a')}"
        if not is_episode and imdb == kwargs.get("imdb_id"):
            logger.debug("Movie matched by IMDB ID, taking shortcut")
            match_ids = [result["id"]]
            break

        # advanced title check in case of multiple movie results
        if results["total"] > 1 and not is_episode and title_year:
            if year_titles is None:
                year_titles = [
                    f"{sanitize(name)} {kwargs['year']}" for name in titles
                ]
            if sanitize(result["title"]) not in year_titles:
                continue

        match_ids.append(result["id"])

    if match_ids:
        ids = ", ".join(str(match_id) for match_id in match_ids)
        logger.debug("Found matching IDs: %s", ids)
    else:
        logger.debug("Nothing found from %s query", query)

    return match_ids
def _get_query_matches(self, video, **kwargs):
    """Collect the names of the attributes on which ``video`` and kwargs agree.

    ``kwargs["movie_kind"]`` must be ``"episode"`` for an ``Episode`` video or
    ``"movie"`` for a ``Movie`` video; any other combination is logged and
    yields an empty set.

    Returns:
        set: Match names such as ``"series"``, ``"season"``, ``"episode"``,
        ``"tvdb_id"``, ``"title"``, ``"imdb_id"`` and ``"year"``.
    """
    result = set()
    requested_kind = kwargs.get("movie_kind")
    episode_mode = isinstance(video, Episode) and requested_kind == "episode"
    movie_mode = (
        not episode_mode
        and isinstance(video, Movie)
        and requested_kind == "movie"
    )

    def _same_title(known_names):
        # Kept lazy: sanitize() is only reached when the video carries a name.
        target = sanitize(kwargs.get("title"))
        return target in (sanitize(name) for name in known_names)

    if episode_mode:
        if video.series and _same_title([video.series] + video.alternative_series):
            result.add("series")

        if video.season and kwargs.get("season") == video.season:
            result.add("season")

        if video.episode and kwargs.get("episode") == video.episode:
            result.add("episode")

        if video.tvdb_id and kwargs.get("tvdb_id") == str(video.tvdb_id):
            result.add("tvdb_id")

        # year (year is not available for series, but we assume it matches)
        result.add("year")
    elif movie_mode:
        if video.title and _same_title([video.title] + video.alternative_titles):
            result.add("title")

        if video.imdb_id and f"tt{kwargs.get('imdb_id')}" == str(video.imdb_id):
            result.add("imdb_id")

        if video.year and kwargs.get("year") == video.year:
            result.add("year")
    else:
        logger.info(f"{kwargs.get('movie_kind')} is not a valid movie_kind")

    return result
def _combine_release_info(self, release_dict):
keys = ("source", "codec", "tags", "team")
combine = [release_dict.get(key) for key in keys if release_dict.get(key)]
if combine:
return ".".join(combine)
return "Unknown"

View File

@ -44,7 +44,7 @@ ftfy_defaults = {
class Subtitle(Subtitle_):
storage_path = None
release_info = None
matches = None
matches = {}
hash_verifiable = False
hearing_impaired_verifiable = False
mods = None