mirror of https://github.com/morpheus65535/bazarr
501 lines
17 KiB
Python
501 lines
17 KiB
Python
# -*- coding: utf-8 -*-
|
|
import io
|
|
import logging
|
|
import os
|
|
import json
|
|
|
|
from subzero.language import Language
|
|
from guessit import guessit
|
|
from requests import Session
|
|
|
|
from subliminal.providers import ParserBeautifulSoup
|
|
from subliminal_patch.providers import Provider
|
|
from subliminal_patch.subtitle import Subtitle
|
|
from subliminal.subtitle import fix_line_ending
|
|
from subliminal import __short_version__
|
|
from subliminal.cache import SHOW_EXPIRATION_TIME, region
|
|
from subliminal.exceptions import AuthenticationError, ConfigurationError
|
|
from subliminal_patch.subtitle import guess_matches
|
|
from subliminal_patch.utils import sanitize
|
|
from subliminal.video import Episode, Movie
|
|
|
|
logger = logging.getLogger(__name__)
|
|
|
|
|
|
class KtuvitSubtitle(Subtitle):
|
|
"""Ktuvit Subtitle."""
|
|
|
|
provider_name = "ktuvit"
|
|
|
|
def __init__(
|
|
self,
|
|
language,
|
|
hearing_impaired,
|
|
page_link,
|
|
series,
|
|
season,
|
|
episode,
|
|
title,
|
|
imdb_id,
|
|
ktuvit_id,
|
|
subtitle_id,
|
|
release,
|
|
):
|
|
super(KtuvitSubtitle, self).__init__(language, hearing_impaired, page_link)
|
|
self.series = series
|
|
self.season = season
|
|
self.episode = episode
|
|
self.title = title
|
|
self.imdb_id = imdb_id
|
|
self.ktuvit_id = ktuvit_id
|
|
self.subtitle_id = subtitle_id
|
|
self.release = release
|
|
|
|
def __repr__(self):
|
|
return "<%s [%s] %r [%s:%s]>" % (
|
|
self.__class__.__name__,
|
|
self.subtitle_id,
|
|
self.page_link,
|
|
self.language,
|
|
self._guessed_encoding,
|
|
)
|
|
|
|
@property
|
|
def id(self):
|
|
return str(self.subtitle_id)
|
|
|
|
@property
|
|
def release_info(self):
|
|
return self.release
|
|
|
|
def get_matches(self, video):
|
|
matches = set()
|
|
# episode
|
|
if isinstance(video, Episode):
|
|
# series
|
|
if video.series and (
|
|
sanitize(self.title)
|
|
in (
|
|
sanitize(name) for name in [video.series] + video.alternative_series
|
|
)
|
|
):
|
|
matches.add("series")
|
|
# season
|
|
if video.season and self.season == video.season:
|
|
matches.add("season")
|
|
# episode
|
|
if video.episode and self.episode == video.episode:
|
|
matches.add("episode")
|
|
# imdb_id
|
|
if video.series_imdb_id and self.imdb_id == video.series_imdb_id:
|
|
matches.add("series_imdb_id")
|
|
# guess
|
|
matches |= guess_matches(video, guessit(self.release, {"type": "episode"}))
|
|
# movie
|
|
elif isinstance(video, Movie):
|
|
# guess
|
|
matches |= guess_matches(video, guessit(self.release, {"type": "movie"}))
|
|
|
|
# title
|
|
if video.title and (
|
|
sanitize(self.title)
|
|
in (sanitize(name) for name in [video.title] + video.alternative_titles)
|
|
):
|
|
matches.add("title")
|
|
|
|
return matches
|
|
|
|
|
|
class KtuvitProvider(Provider):
|
|
"""Ktuvit Provider."""
|
|
|
|
languages = {Language(l) for l in ["heb"]}
|
|
server_url = "https://www.ktuvit.me/"
|
|
sign_in_url = "Services/MembershipService.svc/Login"
|
|
search_url = "Services/ContentProvider.svc/SearchPage_search"
|
|
movie_info_url = "MovieInfo.aspx?ID="
|
|
episode_info_url = "Services/GetModuleAjax.ashx?"
|
|
request_download_id_url = "Services/ContentProvider.svc/RequestSubtitleDownload"
|
|
download_link = "Services/DownloadFile.ashx?DownloadIdentifier="
|
|
subtitle_class = KtuvitSubtitle
|
|
no_subtitle_str = 'אין כתוביות'
|
|
|
|
_tmdb_api_key = "a51ee051bcd762543373903de296e0a3"
|
|
|
|
def __init__(self, email=None, hashed_password=None):
|
|
if any((email, hashed_password)) and not all((email, hashed_password)):
|
|
raise ConfigurationError("Email and Hashed Password must be specified")
|
|
|
|
self.email = email
|
|
self.hashed_password = hashed_password
|
|
self.logged_in = False
|
|
self.session = None
|
|
self.login_cookie = None
|
|
|
|
def initialize(self):
|
|
self.session = Session()
|
|
|
|
# login
|
|
if self.email and self.hashed_password:
|
|
logger.info("Logging in")
|
|
|
|
data = {"request": {"Email": self.email, "Password": self.hashed_password}}
|
|
|
|
self.session.headers["Accept-Encoding"] = "gzip"
|
|
self.session.headers["Accept-Language"] = "en-us,en;q=0.5"
|
|
self.session.headers["Pragma"] = "no-cache"
|
|
self.session.headers["Cache-Control"] = "no-cache"
|
|
self.session.headers["Content-Type"] = "application/json"
|
|
self.session.headers["User-Agent"]: os.environ.get(
|
|
"SZ_USER_AGENT", "Sub-Zero/2"
|
|
)
|
|
|
|
r = self.session.post(
|
|
self.server_url + self.sign_in_url,
|
|
json=data,
|
|
allow_redirects=False,
|
|
timeout=10,
|
|
)
|
|
|
|
if r.content:
|
|
is_success = False
|
|
try:
|
|
is_success = self.parse_d_response(
|
|
r, "IsSuccess", False, "Authentication to the provider"
|
|
)
|
|
except json.decoder.JSONDecodeError:
|
|
logger.info("Failed to Login to Ktuvit")
|
|
if not is_success:
|
|
error_message = ''
|
|
try:
|
|
error_message = self.parse_d_response(r, "ErrorMessage", "[None]")
|
|
except json.decode.JSONDecoderError:
|
|
raise AuthenticationError(
|
|
"Error Logging in to Ktuvit Provider: " + str(r.content)
|
|
)
|
|
raise AuthenticationError(
|
|
"Error Logging in to Ktuvit Provider: " + error_message
|
|
)
|
|
else:
|
|
cookie_split = r.headers["set-cookie"].split("Login=")
|
|
if len(cookie_split) != 2:
|
|
self.logged_in = False
|
|
raise AuthenticationError(
|
|
"Login Failed, didn't receive valid cookie in response"
|
|
)
|
|
|
|
self.login_cookie = cookie_split[1].split(";")[0]
|
|
logger.debug("Logged in with cookie: " + self.login_cookie)
|
|
|
|
self.logged_in = True
|
|
|
|
def terminate(self):
|
|
self.session.close()
|
|
|
|
@region.cache_on_arguments(expiration_time=SHOW_EXPIRATION_TIME)
|
|
def _search_imdb_id(self, title, year, is_movie):
|
|
"""Search the IMDB ID for the given `title` and `year`.
|
|
|
|
:param str title: title to search for.
|
|
:param int year: year to search for (or 0 if not relevant).
|
|
:param bool is_movie: If True, IMDB ID will be searched for in TMDB instead of Wizdom.
|
|
:return: the IMDB ID for the given title and year (or None if not found).
|
|
:rtype: str
|
|
|
|
"""
|
|
# make the search
|
|
logger.info(
|
|
"Searching IMDB ID for %r%r",
|
|
title,
|
|
"" if not year else " ({})".format(year),
|
|
)
|
|
category = "movie" if is_movie else "tv"
|
|
title = title.replace("'", "")
|
|
# get TMDB ID first
|
|
r = self.session.get(
|
|
"http://api.tmdb.org/3/search/{}?api_key={}&query={}{}&language=en".format(
|
|
category,
|
|
self._tmdb_api_key,
|
|
title,
|
|
"" if not year else "&year={}".format(year),
|
|
)
|
|
)
|
|
r.raise_for_status()
|
|
tmdb_results = r.json().get("results")
|
|
if tmdb_results:
|
|
tmdb_id = tmdb_results[0].get("id")
|
|
if tmdb_id:
|
|
# get actual IMDB ID from TMDB
|
|
r = self.session.get(
|
|
"http://api.tmdb.org/3/{}/{}{}?api_key={}&language=en".format(
|
|
category,
|
|
tmdb_id,
|
|
"" if is_movie else "/external_ids",
|
|
self._tmdb_api_key,
|
|
)
|
|
)
|
|
r.raise_for_status()
|
|
imdb_id = r.json().get("imdb_id")
|
|
if imdb_id:
|
|
return str(imdb_id)
|
|
else:
|
|
return None
|
|
return None
|
|
|
|
def query(
|
|
self, title, season=None, episode=None, year=None, filename=None, imdb_id=None
|
|
):
|
|
if not self.logged_in:
|
|
logger.info("Not logged in to Ktuvit. Returning 0 results")
|
|
return {}
|
|
|
|
# search for the IMDB ID if needed.
|
|
is_movie = not (season and episode)
|
|
imdb_id = imdb_id or self._search_imdb_id(title, year, is_movie)
|
|
if not imdb_id:
|
|
return {}
|
|
|
|
# search
|
|
logger.debug("Using IMDB ID %r", imdb_id)
|
|
|
|
query = {
|
|
"FilmName": title,
|
|
"Actors": [],
|
|
"Studios": [],
|
|
"Directors": [],
|
|
"Genres": [],
|
|
"Countries": [],
|
|
"Languages": [],
|
|
"Year": "",
|
|
"Rating": [],
|
|
"Page": 1,
|
|
"SearchType": "0",
|
|
"WithSubsOnly": False,
|
|
}
|
|
|
|
if not is_movie:
|
|
query["SearchType"] = "1"
|
|
|
|
if year:
|
|
query["Year"] = year
|
|
|
|
# get the list of subtitles
|
|
logger.debug("Getting the list of subtitles")
|
|
|
|
url = self.server_url + self.search_url
|
|
logger.debug("Calling URL: {} with request: {}".format(url, str({"request": query})))
|
|
|
|
r = self.session.post(url, json={"request": query}, timeout=10)
|
|
r.raise_for_status()
|
|
|
|
if r.content:
|
|
results = self.parse_d_response(r, "Films", [], "Films/Series Information")
|
|
else:
|
|
return {}
|
|
|
|
# loop over results
|
|
subtitles = {}
|
|
for result in results:
|
|
imdb_link = result["IMDB_Link"]
|
|
imdb_link = imdb_link[0:-1] if imdb_link.endswith("/") else imdb_link
|
|
results_imdb_id = imdb_link.split("/")[-1]
|
|
|
|
if results_imdb_id != imdb_id:
|
|
logger.debug(
|
|
"Subtitles is for IMDB %r but actual IMDB ID is %r",
|
|
results_imdb_id,
|
|
imdb_id,
|
|
)
|
|
continue
|
|
|
|
language = Language("heb")
|
|
hearing_impaired = False
|
|
ktuvit_id = result["ID"]
|
|
page_link = self.server_url + self.movie_info_url + ktuvit_id
|
|
|
|
if is_movie:
|
|
subs = self._search_movie(ktuvit_id)
|
|
else:
|
|
subs = self._search_tvshow(ktuvit_id, season, episode)
|
|
|
|
logger.debug('Got {} Subs from Ktuvit'.format(len(subs)))
|
|
for sub in subs:
|
|
# otherwise create it
|
|
subtitle = KtuvitSubtitle(
|
|
language,
|
|
hearing_impaired,
|
|
page_link,
|
|
title,
|
|
season,
|
|
episode,
|
|
title,
|
|
imdb_id,
|
|
ktuvit_id,
|
|
sub["sub_id"],
|
|
sub["rls"],
|
|
)
|
|
logger.debug("Found subtitle %r", subtitle)
|
|
subtitles[sub["sub_id"]] = subtitle
|
|
|
|
return subtitles.values()
|
|
|
|
def _search_tvshow(self, id, season, episode):
|
|
subs = []
|
|
|
|
url = (
|
|
self.server_url
|
|
+ self.episode_info_url
|
|
+ "moduleName=SubtitlesList&SeriesID={}&Season={}&Episode={}".format(
|
|
id, season, episode
|
|
)
|
|
)
|
|
r = self.session.get(url, timeout=10)
|
|
r.raise_for_status()
|
|
|
|
if len(r.content) < 10:
|
|
logger.debug("Too short content-length in response: [{}]. Treating as No Subtitles Found ".format(str(r.content)))
|
|
return []
|
|
|
|
sub_list = ParserBeautifulSoup(r.content, ["html.parser"])
|
|
sub_rows = sub_list("tr")
|
|
|
|
if sub_list.find("tr") and sub_list.find("tr").find("td") and sub_list.find("tr").find("td").get_text() == self.no_subtitle_str:
|
|
logger.debug("No Subtitles Found. URL " + url)
|
|
return subs
|
|
|
|
for row in sub_rows:
|
|
columns = row.find_all("td")
|
|
sub = {"id": id}
|
|
|
|
for index, column in enumerate(columns):
|
|
if index == 0:
|
|
sub["rls"] = column.get_text().strip().split("\n")[0]
|
|
if index == 5:
|
|
sub["sub_id"] = column.find("input", attrs={"data-sub-id": True})[
|
|
"data-sub-id"
|
|
]
|
|
|
|
if 'sub_id' in sub:
|
|
subs.append(sub)
|
|
return subs
|
|
|
|
def _search_movie(self, movie_id):
|
|
subs = []
|
|
url = self.server_url + self.movie_info_url + movie_id
|
|
r = self.session.get(url, timeout=10)
|
|
r.raise_for_status()
|
|
|
|
if len(r.content) < 10:
|
|
logger.debug("Too short content-length in response: [{}]. Treating as No Subtitles Found ".format(str(r.content)))
|
|
return []
|
|
|
|
html = ParserBeautifulSoup(r.content, ["html.parser"])
|
|
sub_rows = html.select("table#subtitlesList tbody > tr")
|
|
|
|
for row in sub_rows:
|
|
columns = row.find_all("td")
|
|
sub = {"id": movie_id}
|
|
for index, column in enumerate(columns):
|
|
if index == 0:
|
|
sub["rls"] = column.get_text().strip().split("\n")[0]
|
|
if index == 5:
|
|
sub["sub_id"] = column.find("a", attrs={"data-subtitle-id": True})[
|
|
"data-subtitle-id"
|
|
]
|
|
|
|
if 'sub_id' in sub:
|
|
subs.append(sub)
|
|
return subs
|
|
|
|
def list_subtitles(self, video, languages):
|
|
season = episode = None
|
|
year = video.year
|
|
filename = video.name
|
|
|
|
if isinstance(video, Episode):
|
|
titles = [video.series] + video.alternative_series
|
|
season = video.season
|
|
episode = video.episode
|
|
imdb_id = video.series_imdb_id
|
|
else:
|
|
titles = [video.title] + video.alternative_titles
|
|
imdb_id = video.imdb_id
|
|
|
|
for title in titles:
|
|
subtitles = [
|
|
s
|
|
for s in self.query(title, season, episode, year, filename, imdb_id)
|
|
if s.language in languages
|
|
]
|
|
if subtitles:
|
|
return subtitles
|
|
|
|
return []
|
|
|
|
def download_subtitle(self, subtitle):
|
|
if isinstance(subtitle, KtuvitSubtitle):
|
|
download_identifier_request = {
|
|
"FilmID": subtitle.ktuvit_id,
|
|
"SubtitleID": subtitle.subtitle_id,
|
|
"FontSize": 0,
|
|
"FontColor": "",
|
|
"PredefinedLayout": -1,
|
|
}
|
|
|
|
logger.debug(
|
|
"Download Identifier Request data: "
|
|
+ str(json.dumps({"request": download_identifier_request}))
|
|
)
|
|
|
|
# download
|
|
url = self.server_url + self.request_download_id_url
|
|
r = self.session.post(
|
|
url, json={"request": download_identifier_request}, timeout=10
|
|
)
|
|
r.raise_for_status()
|
|
|
|
if r.content:
|
|
download_identifier = self.parse_d_response(r, "DownloadIdentifier")
|
|
|
|
url = self.server_url + self.download_link + download_identifier
|
|
|
|
r = self.session.get(url, timeout=10)
|
|
r.raise_for_status()
|
|
|
|
if not r.content:
|
|
logger.debug(
|
|
"Unable to download subtitle. No data returned from provider"
|
|
)
|
|
return
|
|
|
|
subtitle.content = fix_line_ending(r.content)
|
|
|
|
def parse_d_response(self, response, field, default_value=None, message=None):
|
|
message = message if message else field
|
|
|
|
try:
|
|
response_content = response.json()
|
|
except json.decoder.JSONDecodeError as ex:
|
|
raise json.decoder.JSONDecodeError(
|
|
"Unable to parse JSON returned while getting " + message, ex.doc, ex.pos
|
|
)
|
|
else:
|
|
# kept for manual debugging when needed:
|
|
# logger.debug("Parsing d response_content: " + str(response_content))
|
|
|
|
if "d" in response_content:
|
|
response_content = json.loads(response_content["d"])
|
|
value = response_content.get(field, default_value)
|
|
|
|
if not value and value != default_value:
|
|
raise json.decoder.JSONDecodeError(
|
|
"Missing " + message, str(response_content), 0
|
|
)
|
|
else:
|
|
raise json.decoder.JSONDecodeError(
|
|
"Incomplete JSON returned while getting " + message,
|
|
str(response_content),
|
|
0
|
|
)
|
|
return value
|