1
0
Fork 0
mirror of https://github.com/morpheus65535/bazarr synced 2024-12-28 10:38:26 +00:00
bazarr/libs/cloudscraper/captcha/capmonster.py
morpheus65535 cb420628f8
Cloudflare improvements (#1448)
* Upgraded cloudscraper to fix multiple issues with providers that use an antibot page.

* Fixed subs4series provider. It now requires an anti-captcha provider to download subtitles. One captcha will have to be solved for each download. #1442
2021-06-23 15:54:28 -04:00

190 lines
5.6 KiB
Python

from __future__ import absolute_import
import requests
try:
from urlparse import urlparse
except ImportError:
from urllib.parse import urlparse
from ..exceptions import (
CaptchaServiceUnavailable,
CaptchaAPIError,
CaptchaTimeout,
CaptchaParameter,
CaptchaBadJobID
)
try:
import polling2
except ImportError:
raise ImportError("Please install the python module 'polling2' via pip")
from . import Captcha
class captchaSolver(Captcha):
    """Captcha solver that delegates to the CapMonster Cloud HTTP API.

    A solve is a two-step exchange: ``createTask`` submits the captcha
    (``requestSolve``) and ``getTaskResult`` is polled until the answer is
    ready (``requestJob``). ``getCaptchaAnswer`` is the entry point that
    validates the caller's parameters and drives both steps.
    """

    def __init__(self):
        super(captchaSolver, self).__init__('capmonster')
        self.host = 'https://api.capmonster.cloud'
        self.session = requests.Session()
        # Both are (re)assigned on every getCaptchaAnswer() call; they are
        # initialised here so the attributes always exist.
        self.clientKey = None
        self.proxy = None

    # ------------------------------------------------------------------------------- #

    @staticmethod
    def checkErrorStatus(response):
        """Raise if *response* carries a server-side or API-level error.

        Args:
            response: the HTTP response returned by the session for a
                CapMonster API call.

        Raises:
            CaptchaServiceUnavailable: the HTTP status is 500 or 502.
            CaptchaAPIError: the JSON payload has ``errorId == 1``; the
                message is ``errorDescription`` when present, otherwise
                ``errorCode``.
        """
        if response.status_code in [500, 502]:
            raise CaptchaServiceUnavailable(
                f'CapMonster: Server Side Error {response.status_code}'
            )

        payload = response.json()
        if payload.get('errorId') == 1:
            if 'errorDescription' in payload:
                raise CaptchaAPIError(payload['errorDescription'])
            raise CaptchaAPIError(payload.get('errorCode'))

    # ------------------------------------------------------------------------------- #

    def requestJob(self, taskID):
        """Poll ``getTaskResult`` until the task is ready and return the token.

        The endpoint is polled with ``step=5`` and ``timeout=180``; each
        HTTP request itself has a 30 second timeout.

        Args:
            taskID: task identifier previously returned by ``requestSolve``.

        Returns:
            The ``solution.gRecaptchaResponse`` value of the finished task.

        Raises:
            CaptchaBadJobID: *taskID* is empty.
            CaptchaTimeout: polling returned a falsy response.
            polling2.TimeoutException: propagated from ``polling2.poll``
                (handled by ``getCaptchaAnswer``).
        """
        if not taskID:
            raise CaptchaBadJobID(
                'CapMonster: Error bad task id to request Captcha.'
            )

        def _checkRequest(response):
            self.checkErrorStatus(response)
            # .get() keeps polling on a payload without 'status' instead of
            # aborting the whole solve with a KeyError.
            if response.ok and response.json().get('status') == 'ready':
                return True
            return None

        response = polling2.poll(
            lambda: self.session.post(
                f'{self.host}/getTaskResult',
                json={
                    'clientKey': self.clientKey,
                    'taskId': taskID
                },
                timeout=30
            ),
            check_success=_checkRequest,
            step=5,
            timeout=180
        )

        if response:
            return response.json()['solution']['gRecaptchaResponse']

        raise CaptchaTimeout(
            "CapMonster: Error failed to solve Captcha."
        )

    # ------------------------------------------------------------------------------- #

    def requestSolve(self, captchaType, url, siteKey):
        """Submit a captcha to ``createTask`` and return the new task id.

        Args:
            captchaType: ``'reCaptcha'`` selects ``NoCaptchaTask``; any other
                value selects ``HCaptchaTask``.
            url: address of the page showing the captcha (``websiteURL``).
            siteKey: the captcha site key (``websiteKey``).

        Returns:
            The ``taskId`` value from the API response.

        Raises:
            CaptchaBadJobID: polling returned a falsy response.
            polling2.TimeoutException: propagated from ``polling2.poll``
                (handled by ``getCaptchaAnswer``).
        """
        def _checkRequest(response):
            self.checkErrorStatus(response)
            # .get() keeps polling on a payload without 'taskId' instead of
            # aborting with a KeyError.
            if response.ok and response.json().get('taskId'):
                return True
            return None

        taskType = 'NoCaptchaTask' if captchaType == 'reCaptcha' else 'HCaptchaTask'
        task = {
            'websiteURL': url,
            'websiteKey': siteKey,
            'softId': 37,
            'type': taskType
        }

        if self.proxy:
            task.update(self.proxy)
        else:
            # Without a proxy the "...Proxyless" task variant is requested.
            task['type'] = f"{taskType}Proxyless"

        data = {
            'clientKey': self.clientKey,
            'task': task
        }

        response = polling2.poll(
            lambda: self.session.post(
                f'{self.host}/createTask',
                json=data,
                allow_redirects=False,
                timeout=30
            ),
            check_success=_checkRequest,
            step=5,
            timeout=180
        )

        if response:
            return response.json()['taskId']

        raise CaptchaBadJobID(
            'CapMonster: Error no task id was returned.'
        )

    # ------------------------------------------------------------------------------- #

    @staticmethod
    def _parseProxy(proxyURL):
        """Translate a proxy URL into the proxy fields added to the task.

        Raises:
            CaptchaParameter: the URL has no scheme, no netloc, or no port
                and the scheme has no known default port.
        """
        hostParsed = urlparse(proxyURL)

        if not hostParsed.scheme:
            raise CaptchaParameter('Cannot parse proxy correctly, bad scheme')

        if not hostParsed.netloc:
            raise CaptchaParameter('Cannot parse proxy correctly, bad netloc')

        ports = {
            'http': 80,
            'https': 443
        }

        # The default port is keyed on the parsed scheme. The previous code
        # read self.proxy['proxyType'] while that very dict was still being
        # built, which failed for every proxy URL without an explicit port.
        proxyPort = hostParsed.port if hostParsed.port else ports.get(hostParsed.scheme)
        if not proxyPort:
            raise CaptchaParameter('Cannot parse proxy correctly, bad port')

        return {
            'proxyType': hostParsed.scheme,
            'proxyAddress': hostParsed.hostname,
            'proxyPort': proxyPort,
            'proxyLogin': hostParsed.username,
            'proxyPassword': hostParsed.password,
        }

    # ------------------------------------------------------------------------------- #

    def getCaptchaAnswer(self, captchaType, url, siteKey, captchaParams):
        """Solve a captcha through CapMonster and return the answer token.

        Args:
            captchaType: forwarded to ``requestSolve``.
            url: address of the page showing the captcha.
            siteKey: the captcha site key.
            captchaParams: mapping of solver options. ``clientKey`` is
                required. When ``proxy`` is set and ``no_proxy`` is not, the
                ``https`` entry of ``proxy`` is parsed and sent with the task.

        Returns:
            The token returned by ``requestJob``.

        Raises:
            CaptchaParameter: ``clientKey`` is missing or the proxy URL
                cannot be parsed.
            CaptchaTimeout: creating or polling the task timed out.
        """
        taskID = None

        if not captchaParams.get('clientKey'):
            raise CaptchaParameter(
                "CapMonster: Missing clientKey parameter."
            )

        self.clientKey = captchaParams.get('clientKey')

        if captchaParams.get('proxy') and not captchaParams.get('no_proxy'):
            self.proxy = self._parseProxy(
                captchaParams.get('proxy', {}).get('https')
            )
        else:
            self.proxy = None

        try:
            taskID = self.requestSolve(captchaType, url, siteKey)
            return self.requestJob(taskID)
        except polling2.TimeoutException:
            # reportJob is not defined in this class. It is only called when
            # the base class provides it, so a missing method cannot replace
            # the CaptchaTimeout below with an AttributeError.
            reportJob = getattr(self, 'reportJob', None)
            try:
                if taskID and callable(reportJob):
                    reportJob(taskID)
            except polling2.TimeoutException:
                raise CaptchaTimeout(
                    "CapMonster: Captcha solve took to long and also failed "
                    f"reporting the task with task id {taskID}."
                )

            raise CaptchaTimeout(
                "CapMonster: Captcha solve took to long to execute "
                f"task id {taskID}, aborting."
            )
# ------------------------------------------------------------------------------- #
# Instantiate the solver once at import time; the instance is not bound to a
# name here. NOTE(review): presumably the Captcha base-class constructor
# registers it under the name 'capmonster' passed in __init__ — confirm.
captchaSolver()