mirror of
https://github.com/morpheus65535/bazarr
synced 2024-12-30 19:46:25 +00:00
253 lines
9.4 KiB
Python
253 lines
9.4 KiB
Python
# coding=utf-8
|
|
|
|
import io
|
|
import logging
|
|
import os
|
|
import time
|
|
import inflect
|
|
import cfscrape
|
|
|
|
from random import randint
|
|
from zipfile import ZipFile
|
|
|
|
from babelfish import language_converters
|
|
from guessit import guessit
|
|
from requests import Session
|
|
from dogpile.cache.api import NO_VALUE
|
|
from subliminal import Episode, ProviderError
|
|
from subliminal.cache import region
|
|
from subliminal.utils import sanitize_release_group
|
|
from subliminal_patch.providers import Provider
|
|
from subliminal_patch.providers.mixins import ProviderSubtitleArchiveMixin
|
|
from subliminal_patch.subtitle import Subtitle, guess_matches
|
|
from subliminal_patch.converters.subscene import language_ids, supported_languages
|
|
from subscene_api.subscene import search, Subtitle as APISubtitle
|
|
from subzero.language import Language
|
|
|
|
# Module-level logger for this provider.
logger = logging.getLogger(__name__)

# Register the "subscene" converter entry point with babelfish.
language_converters.register('subscene = subliminal_patch.converters.subscene:SubsceneConverter')

# Shared inflect engine; the provider uses it to spell out season ordinals when building search terms.
p = inflect.engine()
|
|
|
|
|
|
class SubsceneSubtitle(Subtitle):
    """
    A single subtitle entry listed on Subscene.

    Stores the release title of the entry; once :meth:`get_matches` has run against an episode it also holds the
    season/episode guessed from that title and whether the entry was classified as a season pack.
    """
    provider_name = 'subscene'
    hearing_impaired_verifiable = True
    is_pack = False
    page_link = None
    season = None
    episode = None
    releases = None

    def __init__(self, language, release_info, hearing_impaired=False, page_link=None, encoding=None, mods=None,
                 asked_for_release_group=None, asked_for_episode=None):
        """
        Build the subtitle and remember the release title plus what the caller was originally looking for.
        """
        super(SubsceneSubtitle, self).__init__(language, hearing_impaired=hearing_impaired, page_link=page_link,
                                               encoding=encoding, mods=mods)
        # the release title is exposed under both attribute names
        self.release_info = release_info
        self.releases = release_info

        self.asked_for_release_group = asked_for_release_group
        self.asked_for_episode = asked_for_episode

        # filled in by get_matches() for episodes
        self.season = None
        self.episode = None

    @classmethod
    def from_api(cls, s):
        """
        Alternate constructor: build an instance from a subtitle object returned by the subscene API wrapper.
        """
        lang = Language.fromsubscene(s.language.strip())
        return cls(lang, s.title, hearing_impaired=s.hearing_impaired, page_link=s.url)

    @property
    def id(self):
        """The subtitle's page link doubles as its identifier."""
        return self.page_link

    @property
    def numeric_id(self):
        """Last path segment of the page link."""
        return self.page_link.rsplit("/", 1)[-1]

    def get_matches(self, video):
        """
        Compute the set of match names between this subtitle and `video`.

        An exact equality between the stripped release title and the video's original file name (without
        extension) counts as a "hash" match. The release title is then parsed with guessit (as an episode or as a
        movie, depending on the video type) and compared against the video. For episodes, a season match without
        any episode in the guess marks the subtitle as a pack and grants the "episode" match.

        The resulting set is stored on ``self.matches`` and returned.
        """
        found = set()

        if self.release_info.strip() == get_video_filename(video):
            logger.debug("Using hash match as the release name is the same")
            found.add("hash")

        is_episode = isinstance(video, Episode)
        guess = guessit(self.release_info, {'type': 'episode' if is_episode else 'movie'})

        if is_episode:
            self.season = guess.get("season")
            self.episode = guess.get("episode")

        found |= guess_matches(video, guess)

        if is_episode and "season" in found and "episode" not in guess:
            # season matched but the title names no episode: treat it as a pack
            found.add("episode")
            logger.debug("%r is a pack", self)
            self.is_pack = True

        # fall back to a substring comparison of the sanitized release groups
        if (video.release_group
                and "release_group" not in found
                and "release_group" in guess
                and sanitize_release_group(video.release_group) in sanitize_release_group(guess["release_group"])):
            found.add("release_group")

        self.matches = found
        return found

    def get_download_link(self, session):
        """Resolve the zipped download URL for this subtitle's page using `session`."""
        zipped_url = APISubtitle.get_zipped_url(self.page_link, session)
        return zipped_url
|
|
|
|
|
|
def get_video_filename(video):
    """Return the base name of ``video.original_name`` with its final extension removed."""
    base_name = os.path.basename(video.original_name)
    stem, _extension = os.path.splitext(base_name)
    return stem
|
|
|
|
|
|
class SubsceneProvider(Provider, ProviderSubtitleArchiveMixin):
    """
    Subscene provider.

    Searches Subscene by title, not by release file name: episodes are looked up as "<series> - <Nth> Season"
    (plus a "<series> SXX" pack search once the season has fully aired), movies by title and year. It doesn't open
    every found subtitle page to avoid massive hammering, thus it can't determine whether a subtitle is
    only-foreign or not.
    """
    subtitle_class = SubsceneSubtitle
    # NOTE: this is the very set object imported from the converter module; the update below extends it in place
    # with the forced variant of every language.
    languages = supported_languages
    languages.update(set(Language.rebuild(lang, forced=True) for lang in languages))

    session = None
    skip_wrong_fps = False
    hearing_impaired_verifiable = True
    only_foreign = False

    search_throttle = 2  # seconds

    def __init__(self, only_foreign=False):
        """
        :param only_foreign: when true, request foreign-only results and tag every result as forced.
        """
        self.only_foreign = only_foreign

    def initialize(self):
        """Create the HTTP session and give it a randomly picked User-Agent."""
        logger.info("Creating session")
        self.session = Session()
        from .utils import FIRST_THOUSAND_OR_SO_USER_AGENTS as AGENT_LIST
        self.session.headers['User-Agent'] = AGENT_LIST[randint(0, len(AGENT_LIST) - 1)]

    def terminate(self):
        """Close the HTTP session; a no-op for the session if `initialize` never created one."""
        logger.info("Closing session")
        if self.session is not None:
            self.session.close()

    def _create_filters(self, languages):
        """
        Build the filter values (hearing impaired, foreign-only, language ids) and store them in ``self.filters``.

        Languages whose alpha3 code has no entry in `language_ids` are left out of the language filter.
        """
        self.filters = dict(HearingImpaired="2")
        if self.only_foreign:
            self.filters["ForeignOnly"] = "True"
            logger.info("Only searching for foreign/forced subtitles")

        self.filters["LanguageFilter"] = ",".join(str(language_ids[lang.alpha3]) for lang in languages
                                                  if lang.alpha3 in language_ids)

        logger.debug("Filter created: '%s'", self.filters)

    def _enable_filters(self):
        """Apply the previously built filters by storing them as cookies on the session."""
        self.session.cookies.update(self.filters)
        logger.debug("Filters applied")

    def list_subtitles(self, video, languages):
        """
        Return the subtitles found for `video` whose language is in `languages`.

        Returns an empty list without searching when the video's original name is unknown.
        """
        if not video.original_name:
            logger.info("Skipping search because we don't know the original release name")
            return []

        self._create_filters(languages)
        self._enable_filters()
        return [s for s in self.query(video) if s.language in languages]

    def download_subtitle(self, subtitle):
        """
        Fill ``subtitle.content`` from its zip archive.

        If the subtitle already carries archive bytes in ``pack_data`` they are tried first; a `ProviderError`
        during that extraction falls through to a fresh download. After a fresh download the raw archive bytes are
        kept in ``subtitle.pack_data``. HTTP errors surface through ``raise_for_status()``.
        """
        if subtitle.pack_data:
            logger.info("Using previously downloaded pack data")
            archive = ZipFile(io.BytesIO(subtitle.pack_data))
            subtitle.pack_data = None

            try:
                subtitle.content = self.get_subtitle_from_archive(subtitle, archive)
                return
            except ProviderError:
                # the stored archive did not yield a subtitle; download it again below
                pass

        # open the archive
        r = self.session.get(subtitle.get_download_link(self.session), timeout=10)
        r.raise_for_status()
        archive_stream = io.BytesIO(r.content)
        archive = ZipFile(archive_stream)

        subtitle.content = self.get_subtitle_from_archive(subtitle, archive)

        # store archive as pack_data for later caching
        subtitle.pack_data = r.content

    def parse_results(self, video, film):
        """
        Turn the subtitles of a search result (`film`) into `SubsceneSubtitle` objects.

        Each subtitle records the release group (and, for episodes, the episode number) that was asked for. When
        the provider runs in foreign-only mode the language of every result is rebuilt as forced.
        """
        subtitles = []
        for s in film.subtitles:
            subtitle = SubsceneSubtitle.from_api(s)
            subtitle.asked_for_release_group = video.release_group
            if isinstance(video, Episode):
                subtitle.asked_for_episode = video.episode

            if self.only_foreign:
                subtitle.language = Language.rebuild(subtitle.language, forced=True)

            subtitles.append(subtitle)
            logger.debug('Found subtitle %r', subtitle)

        return subtitles

    def query(self, video):
        """
        Search Subscene for `video` and return every subtitle found, unfiltered by language.

        The search by release file name is currently disabled; only title-based searches are performed.
        Searches are spaced out by ``search_throttle`` seconds.
        """
        subtitles = []

        if isinstance(video, Episode):
            all_series = [video.series] + video.alternative_series
            more_than_one = len(all_series) > 1
            for series in all_series:
                # e.g. "<series> - First Season"
                term = u"%s - %s Season" % (series, p.number_to_words("%sth" % video.season).capitalize())
                logger.debug('Searching for alternative results: %s', term)
                film = search(term, session=self.session, release=False)
                if film and film.subtitles:
                    logger.debug('Alternative results found: %s', len(film.subtitles))
                    subtitles += self.parse_results(video, film)
                else:
                    logger.debug('No alternative results found')

                # packs
                if video.season_fully_aired:
                    term = u"%s S%02i" % (series, video.season)
                    logger.debug('Searching for packs: %s', term)
                    time.sleep(self.search_throttle)
                    film = search(term, session=self.session)
                    if film and film.subtitles:
                        logger.debug('Pack results found: %s', len(film.subtitles))
                        subtitles += self.parse_results(video, film)
                    else:
                        logger.debug('No pack results found')
                else:
                    logger.debug("Not searching for packs, because the season hasn't fully aired")
                if more_than_one:
                    time.sleep(self.search_throttle)
        else:
            all_titles = [video.title] + video.alternative_titles
            more_than_one = len(all_titles) > 1
            for title in all_titles:
                logger.debug('Searching for movie results: %s', title)
                film = search(title, year=video.year, session=self.session, limit_to=None, release=False)
                if film and film.subtitles:
                    subtitles += self.parse_results(video, film)
                if more_than_one:
                    time.sleep(self.search_throttle)

        logger.info("%s subtitles found", len(subtitles))
        return subtitles
|