# -*- coding: utf-8 -*-
import io
import logging
import os
import zipfile

from subzero.language import Language
from guessit import guessit
from requests import Session

from subliminal_patch.providers import Provider
from subliminal_patch.subtitle import Subtitle
from subliminal.subtitle import fix_line_ending
from subliminal import __short_version__
from subliminal.cache import SHOW_EXPIRATION_TIME, region
from subliminal_patch.exceptions import ProviderError
from subliminal_patch.subtitle import guess_matches
from subliminal_patch.utils import sanitize
from subliminal.video import Episode, Movie

logger = logging.getLogger(__name__)


class WizdomSubtitle(Subtitle):
    """Wizdom Subtitle.

    Holds the metadata of one subtitle entry returned by the Wizdom API and
    knows how to compare itself against a video.
    """
    provider_name = 'wizdom'

    def __init__(self, language, hearing_impaired, page_link, series, season, episode, title, imdb_id, subtitle_id,
                 release):
        """Store the subtitle metadata.

        :param language: language of the subtitle.
        :param bool hearing_impaired: whether the subtitle is flagged as hearing impaired.
        :param str page_link: URL of the page listing the subtitle.
        :param series: series name the subtitle was searched with.
        :param season: season number (None for movies).
        :param episode: episode number (None for movies).
        :param title: title the subtitle was searched with.
        :param imdb_id: IMDB ID used for the search.
        :param subtitle_id: provider-side identifier of the subtitle.
        :param release: release name, also exposed as `release_info`.
        """
        super(WizdomSubtitle, self).__init__(language, hearing_impaired, page_link)
        self.series = series
        self.season = season
        self.episode = episode
        self.title = title
        self.imdb_id = imdb_id
        self.subtitle_id = subtitle_id
        self.release = release
        self.release_info = release

    @property
    def id(self):
        """Return the subtitle identifier as a string."""
        return str(self.subtitle_id)

    def get_matches(self, video):
        """Compute the set of properties on which this subtitle matches `video`.

        :param video: the video (Episode or Movie) to compare against.
        :return: names of the matching properties.
        :rtype: set
        """
        matches = set()

        # episode
        if isinstance(video, Episode):
            # series
            if video.series and (sanitize(self.title) in (
                    sanitize(name) for name in [video.series] + video.alternative_series)):
                matches.add('series')
            # season
            if video.season and self.season == video.season:
                matches.add('season')
            # episode
            if video.episode and self.episode == video.episode:
                matches.add('episode')
            # imdb_id
            if video.series_imdb_id and self.imdb_id == video.series_imdb_id:
                matches.add('series_imdb_id')
            # guess
            matches |= guess_matches(video, guessit(self.release, {'type': 'episode'}))
        # movie
        elif isinstance(video, Movie):
            # guess
            matches |= guess_matches(video, guessit(self.release, {'type': 'movie'}))

            # title
            if video.title and (sanitize(self.title) in (
                    sanitize(name) for name in [video.title] + video.alternative_titles)):
                matches.add('title')

        return matches


class WizdomProvider(Provider):
    """Wizdom Provider."""
    languages = {Language(code) for code in ['heb']}
    video_types = (Episode, Movie)
    server_url = 'wizdom.xyz'

    _tmdb_api_key = 'a51ee051bcd762543373903de296e0a3'

    def __init__(self):
        self.session = None

    def initialize(self):
        """Open the HTTP session used for every request of this provider."""
        self.session = Session()
        self.session.headers['User-Agent'] = 'Subliminal/{}'.format(__short_version__)

    def terminate(self):
        """Close the HTTP session, if one was opened."""
        # initialize() may never have been called, in which case there is nothing to close
        if self.session is not None:
            self.session.close()

    @region.cache_on_arguments(expiration_time=SHOW_EXPIRATION_TIME)
    def _search_imdb_id(self, title, year, is_movie):
        """Search the IMDB ID for the given `title` and `year` through TMDB.

        :param str title: title to search for.
        :param int year: year to search for (or 0 if not relevant).
        :param bool is_movie: If True the TMDB `movie` category is searched, otherwise the `tv` category.
        :return: the IMDB ID for the given title and year (or None if not found).
        :rtype: str

        """
        # make the search
        logger.info('Searching IMDB ID for %r%r', title, '' if not year else ' ({})'.format(year))
        category = 'movie' if is_movie else 'tv'
        title = title.replace('\'', '')

        # get TMDB ID first
        # the query is passed through `params` so that titles containing characters such as '&' or '#'
        # are URL-encoded instead of corrupting the query string
        # NOTE(review): these requests go over plain HTTP - consider HTTPS if the endpoint supports it
        search_params = {'api_key': self._tmdb_api_key, 'query': title, 'language': 'en'}
        if year:
            search_params['year'] = year
        r = self.session.get('http://api.tmdb.org/3/search/{}'.format(category), params=search_params)
        r.raise_for_status()

        tmdb_results = r.json().get('results')
        if not tmdb_results:
            return None

        # only the first search result is considered
        tmdb_id = tmdb_results[0].get('id')
        if not tmdb_id:
            return None

        # get actual IMDB ID from TMDB (for the tv category it is read from the external_ids endpoint)
        r = self.session.get(
            'http://api.tmdb.org/3/{}/{}{}'.format(category, tmdb_id, '' if is_movie else '/external_ids'),
            params={'api_key': self._tmdb_api_key, 'language': 'en'})
        r.raise_for_status()

        imdb_id = r.json().get('imdb_id')
        return str(imdb_id) if imdb_id else None

    def query(self, title, season=None, episode=None, year=None, filename=None, imdb_id=None):
        """Query Wizdom for the subtitles of a movie or an episode.

        :param str title: title of the movie or series.
        :param season: season number; the search is treated as a movie search unless both
            `season` and `episode` are truthy.
        :param episode: episode number.
        :param year: year passed to the IMDB ID lookup when `imdb_id` is not given.
        :param filename: accepted for interface compatibility, currently unused.
        :param imdb_id: IMDB ID to search with; looked up from the title when falsy.
        :return: the subtitles found (empty when nothing was found).
        :rtype: list of :class:`WizdomSubtitle`
        :raises requests.HTTPError: if a request fails with a status other than 500.
        """
        # search for the IMDB ID if needed.
        is_movie = not (season and episode)
        imdb_id = imdb_id or self._search_imdb_id(title, year, is_movie)
        if not imdb_id:
            return []

        # search
        logger.debug('Using IMDB ID %r', imdb_id)
        url = 'https://{}/api/releases/{}'.format(self.server_url, imdb_id)
        page_link = 'http://{}/#/{}/{}'.format(self.server_url, 'movies' if is_movie else 'series', imdb_id)

        # get the list of subtitles
        logger.debug('Getting the list of subtitles')
        r = self.session.get(url)
        # a 500 response is treated as "nothing found" rather than as an error
        if r.status_code == 500:
            logger.debug('No subtitles found for imdb id %s', imdb_id)
            return []

        r.raise_for_status()
        try:
            results = r.json()
        except ValueError:
            return []

        # filter irrelevant results
        if not is_movie:
            results = results.get('subs', [])
            # there are two formats of result jsons - seasons list and seasons dict
            if isinstance(results, list):
                # index `season` only exists when the list is strictly longer than `season`
                results = results[season] if len(results) > season else {}
            else:
                results = results.get(str(season), {})
            results = results.get(str(episode), [])
        else:
            results = results.get('subs', [])

        # loop over results; keyed by subtitle id so that duplicated ids collapse to a single entry
        subtitles = {}
        for result in results:
            language = Language('heb')
            hearing_impaired = False
            subtitle_id = result['id']
            release = result['version']

            # the same title is used for both the `series` and the `title` fields
            subtitle = WizdomSubtitle(language, hearing_impaired, page_link, title, season, episode, title, imdb_id,
                                      subtitle_id, release)

            logger.debug('Found subtitle %r', subtitle)
            subtitles[subtitle_id] = subtitle

        return list(subtitles.values())

    def list_subtitles(self, video, languages):
        """List the subtitles available for `video` in the requested `languages`.

        Each known title of the video is queried in turn and the first non-empty result is returned.

        :param video: the video (Episode or Movie) to search subtitles for.
        :param languages: languages to keep.
        :return: the matching subtitles (empty when none of the titles gave a result).
        :rtype: list of :class:`WizdomSubtitle`
        """
        season = episode = None
        year = video.year
        filename = video.name

        if isinstance(video, Episode):
            titles = [video.series] + video.alternative_series
            season = video.season
            episode = video.episode
            imdb_id = video.series_imdb_id
        else:
            titles = [video.title] + video.alternative_titles
            imdb_id = video.imdb_id

        for title in titles:
            subtitles = [s for s in self.query(title, season, episode, year, filename, imdb_id)
                         if s.language in languages]
            if subtitles:
                return subtitles

        return []

    def download_subtitle(self, subtitle):
        """Download the zip archive of `subtitle` and store its content on the subtitle.

        :param subtitle: the subtitle to download; `subtitle.content` is set on success.
        :raises ProviderError: if the archive holds no `.srt`/`.sub` file, or more than one.
        :raises requests.HTTPError: if the download request fails.
        """
        # download
        url = 'http://zip.{}/{}.zip'.format(self.server_url, subtitle.subtitle_id)
        r = self.session.get(url, headers={'Referer': subtitle.page_link}, timeout=10)
        r.raise_for_status()

        # an empty response leaves the subtitle without content
        if len(r.content) == 0:
            return

        # open the zip
        with zipfile.ZipFile(io.BytesIO(r.content)) as zf:
            # remove some filenames from the namelist
            namelist = [n for n in zf.namelist() if os.path.splitext(n)[1] in ['.srt', '.sub']]
            if not namelist:
                # fail with a provider error instead of an IndexError on namelist[0]
                raise ProviderError('No subtitle file found in the archive')
            if len(namelist) > 1:
                raise ProviderError('More than one file to unzip')

            subtitle.content = fix_line_ending(zf.read(namelist[0]))