Merge branch 'development' into hermes

# Conflicts:
#	bazarr/main.py
#	views/episodes.tpl
#	views/movie.tpl
#	views/providers.tpl
This commit is contained in:
Louis Vézina 2020-03-29 10:02:10 -04:00
commit d9453121a6
45 changed files with 3620 additions and 336 deletions

View File

@ -68,6 +68,7 @@ If you need something that is not already part of Bazarr, feel free to create a
* Supersubtitles
* Titlovi
* TVSubtitles
* Wizdom
* XSubs
* Zimuku

View File

@ -25,10 +25,10 @@ class EmbeddedSubsReader:
if 'subtitle' in data:
for detected_language in data['subtitle']:
language = detected_language['language'].alpha3
forced = detected_language['forced'] if 'forced' in detected_language else None
codec = detected_language['format'] if 'format' in detected_language else None
if language:
if 'language' in detected_language:
language = detected_language['language'].alpha3
forced = detected_language['forced'] if 'forced' in detected_language else None
codec = detected_language['format'] if 'format' in detected_language else None
subtitles_list.append([language, forced, codec])
else:
continue

View File

@ -34,6 +34,7 @@ from database import database, dict_mapper
from analytics import track_event
from locale import getpreferredencoding
import chardet
def get_video(path, title, sceneName, providers=None, media_type="movie"):
@ -342,6 +343,11 @@ def manual_search(path, language, hi, forced, providers, providers_auth, sceneNa
if len(releases) == 0:
releases = ['n/a']
if s.uploader and s.uploader.strip():
s_uploader = s.uploader.strip()
else:
s_uploader = 'n/a'
subtitles_list.append(
dict(score=round((score / max_score * 100), 2),
orig_score=score,
@ -350,7 +356,7 @@ def manual_search(path, language, hi, forced, providers, providers_auth, sceneNa
provider=s.provider_name,
subtitle=codecs.encode(pickle.dumps(s.make_picklable()), "base64").decode(),
url=s.page_link, matches=list(matches), dont_matches=list(not_matched),
release_info=releases))
release_info=releases, uploader=s_uploader))
final_subtitles = sorted(subtitles_list, key=lambda x: (x['orig_score'], x['score_without_hash']),
reverse=True)
@ -482,7 +488,34 @@ def manual_upload_subtitle(path, language, forced, title, scene_name, media_type
if os.path.exists(subtitle_path):
os.remove(subtitle_path)
subtitle.save(subtitle_path)
if settings.general.utf8_encode:
try:
os.remove(subtitle_path + ".tmp")
except:
pass
subtitle.save(subtitle_path + ".tmp")
with open(subtitle_path + ".tmp", 'rb') as fr:
text = fr.read()
try:
guess = chardet.detect(text)
text = text.decode(guess["encoding"])
text = text.encode('utf-8')
except UnicodeError:
logging.exception("BAZARR subtitles file doesn't seems to be text based. Skipping this file: " +
subtitle_path)
else:
with open(subtitle_path, 'wb') as fw:
fw.write(text)
finally:
try:
os.remove(subtitle_path + ".tmp")
except:
pass
else:
subtitle.save(subtitle_path)
if chmod:
os.chmod(subtitle_path, chmod)
@ -857,7 +890,7 @@ def refine_from_db(path, video):
"WHERE table_episodes.path = ?", (path_replace_reverse(path),), only_one=True)
if data:
video.series, year, country = series_re.match(data['seriesTitle']).groups()
video.series = data['seriesTitle']
video.season = int(data['season'])
video.episode = int(data['episode'])
video.title = data['episodeTitle']

View File

@ -382,11 +382,13 @@ def guess_external_subtitles(dest_folder, subtitles):
text = f.read()
try:
# to improve performance, use only the first 8K to detect encoding
if len(text) > 8192: guess = chardet.detect(text[:8192])
else: guess = chardet.detect(text)
if guess["confidence"] < 0.8:
# to improve performance, use only the first 32K to detect encoding
guess = chardet.detect(text[:32768])
logging.debug('BAZARR detected encoding %r', guess)
if guess["confidence"] < 0.6:
raise UnicodeError
if guess["confidence"] < 0.8 or guess["encoding"] == "ascii":
guess["encoding"] = "utf-8"
text = text.decode(guess["encoding"])
detected_language = guess_language(text)
except UnicodeError:

View File

@ -323,6 +323,10 @@ class Apprise(object):
# bad attachments
return False
# Allow Asset default value
body_format = self.asset.body_format \
if body_format is None else body_format
# Iterate over our loaded plugins
for server in self.find(tag):
if status is None:

View File

@ -86,23 +86,32 @@ class AppriseAsset(object):
'apprise-{TYPE}-{XY}{EXTENSION}',
))
def __init__(self, theme='default', image_path_mask=None,
image_url_mask=None, default_extension=None):
# This value can also be set on calls to Apprise.notify(). This allows
# you to let Apprise upfront the type of data being passed in. This
# must be of type NotifyFormat. Possible values could be:
# - NotifyFormat.TEXT
# - NotifyFormat.MARKDOWN
# - NotifyFormat.HTML
# - None
#
# If no format is specified (hence None), then no special pre-formating
# actions will take place during a notificaton. This has been and always
# will be the default.
body_format = None
def __init__(self, **kwargs):
"""
Asset Initialization
"""
if theme:
self.theme = theme
# Assign default arguments if specified
for key, value in kwargs.items():
if not hasattr(AppriseAsset, key):
raise AttributeError(
'AppriseAsset init(): '
'An invalid key {} was specified.'.format(key))
if image_path_mask is not None:
self.image_path_mask = image_path_mask
if image_url_mask is not None:
self.image_url_mask = image_url_mask
if default_extension is not None:
self.default_extension = default_extension
setattr(self, key, value)
def color(self, notify_type, color_type=None):
"""

View File

@ -102,7 +102,7 @@ class AppriseAttachment(object):
# Initialize our default cache value
cache = cache if cache is not None else self.cache
if isinstance(asset, AppriseAsset):
if asset is None:
# prepare default asset
asset = self.asset

View File

@ -115,7 +115,7 @@ class AppriseConfig(object):
# Initialize our default cache value
cache = cache if cache is not None else self.cache
if isinstance(asset, AppriseAsset):
if asset is None:
# prepare default asset
asset = self.asset
@ -165,6 +165,39 @@ class AppriseConfig(object):
# Return our status
return return_status
def add_config(self, content, asset=None, tag=None, format=None):
"""
Adds one configuration file in it's raw format. Content gets loaded as
a memory based object and only exists for the life of this
AppriseConfig object it was loaded into.
If you know the format ('yaml' or 'text') you can specify
it for slightly less overhead during this call. Otherwise the
configuration is auto-detected.
"""
if asset is None:
# prepare default asset
asset = self.asset
if not isinstance(content, six.string_types):
logger.warning(
"An invalid configuration (type={}) was specified.".format(
type(content)))
return False
logger.debug("Loading raw configuration: {}".format(content))
# Create ourselves a ConfigMemory Object to store our configuration
instance = config.ConfigMemory(
content=content, format=format, asset=asset, tag=tag)
# Add our initialized plugin to our server listings
self.configs.append(instance)
# Return our status
return True
def servers(self, tag=MATCH_ALL_TAG, *args, **kwargs):
"""
Returns all of our servers dynamically build based on parsed

View File

@ -24,10 +24,10 @@
# THE SOFTWARE.
__title__ = 'apprise'
__version__ = '0.8.2'
__version__ = '0.8.4'
__author__ = 'Chris Caron'
__license__ = 'MIT'
__copywrite__ = 'Copyright (C) 2019 Chris Caron <lead2gold@gmail.com>'
__copywrite__ = 'Copyright (C) 2020 Chris Caron <lead2gold@gmail.com>'
__email__ = 'lead2gold@gmail.com'
__status__ = 'Production'

View File

@ -28,6 +28,7 @@ import time
import mimetypes
from ..URLBase import URLBase
from ..utils import parse_bool
from ..AppriseLocale import gettext_lazy as _
class AttachBase(URLBase):
@ -61,7 +62,35 @@ class AttachBase(URLBase):
# 5 MB = 5242880 bytes
max_file_size = 5242880
def __init__(self, name=None, mimetype=None, cache=True, **kwargs):
# Here is where we define all of the arguments we accept on the url
# such as: schema://whatever/?overflow=upstream&format=text
# These act the same way as tokens except they are optional and/or
# have default values set if mandatory. This rule must be followed
template_args = {
'cache': {
'name': _('Cache Age'),
'type': 'int',
# We default to (600) which means we cache for 10 minutes
'default': 600,
},
'mime': {
'name': _('Forced Mime Type'),
'type': 'string',
},
'name': {
'name': _('Forced File Name'),
'type': 'string',
},
'verify': {
'name': _('Verify SSL'),
# SSL Certificate Authority Verification
'type': 'bool',
# Provide a default
'default': True,
},
}
def __init__(self, name=None, mimetype=None, cache=None, **kwargs):
"""
Initialize some general logging and common server arguments that will
keep things consistent when working with the configurations that
@ -109,19 +138,27 @@ class AttachBase(URLBase):
# Absolute path to attachment
self.download_path = None
# Set our cache flag; it can be True or a (positive) integer
try:
self.cache = cache if isinstance(cache, bool) else int(cache)
# Set our cache flag; it can be True, False, None, or a (positive)
# integer... nothing else
if cache is not None:
try:
self.cache = cache if isinstance(cache, bool) else int(cache)
except (TypeError, ValueError):
err = 'An invalid cache value ({}) was specified.'.format(
cache)
self.logger.warning(err)
raise TypeError(err)
# Some simple error checking
if self.cache < 0:
err = 'A negative cache value ({}) was specified.'.format(
cache)
self.logger.warning(err)
raise TypeError(err)
except (ValueError, TypeError):
err = 'An invalid cache value ({}) was specified.'.format(cache)
self.logger.warning(err)
raise TypeError(err)
else:
self.cache = None
# Validate mimetype if specified
if self._mimetype:
@ -211,12 +248,16 @@ class AttachBase(URLBase):
Simply returns true if the object has downloaded and stored the
attachment AND the attachment has not expired.
"""
cache = self.template_args['cache']['default'] \
if self.cache is None else self.cache
if self.download_path and os.path.isfile(self.download_path) \
and self.cache:
and cache:
# We have enough reason to look further into our cached content
# and verify it has not expired.
if self.cache is True:
if cache is True:
# return our fixed content as is; we will always cache it
return True
@ -224,7 +265,7 @@ class AttachBase(URLBase):
# content again.
try:
age_in_sec = time.time() - os.stat(self.download_path).st_mtime
if age_in_sec <= self.cache:
if age_in_sec <= cache:
return True
except (OSError, IOError):

View File

@ -78,6 +78,11 @@ class AttachHTTP(AttachBase):
# Where our content is written to upon a call to download.
self._temp_file = None
# Our Query String Dictionary; we use this to track arguments
# specified that aren't otherwise part of this class
self.qsd = {k: v for k, v in kwargs.get('qsd', {}).items()
if k not in self.template_args}
return
def download(self, **kwargs):
@ -122,6 +127,7 @@ class AttachHTTP(AttachBase):
url,
headers=headers,
auth=auth,
params=self.qsd,
verify=self.verify_certificate,
timeout=self.connection_timeout_sec,
stream=True) as r:
@ -252,18 +258,21 @@ class AttachHTTP(AttachBase):
Returns the URL built dynamically based on specified arguments.
"""
# Prepare our cache value
if isinstance(self.cache, bool) or not self.cache:
cache = 'yes' if self.cache else 'no'
else:
cache = int(self.cache)
# Define any arguments set
args = {
'verify': 'yes' if self.verify_certificate else 'no',
'cache': cache,
}
# Prepare our cache value
if self.cache is not None:
if isinstance(self.cache, bool) or not self.cache:
cache = 'yes' if self.cache else 'no'
else:
cache = int(self.cache)
# Set our cache value
args['cache'] = cache
if self._mimetype:
# A format was enforced
args['mime'] = self._mimetype
@ -275,6 +284,9 @@ class AttachHTTP(AttachBase):
# Append our headers into our args
args.update({'+{}'.format(k): v for k, v in self.headers.items()})
# Apply any remaining entries to our URL
args.update(self.qsd)
# Determine Authentication
auth = ''
if self.user and self.password:
@ -290,7 +302,7 @@ class AttachHTTP(AttachBase):
default_port = 443 if self.secure else 80
return '{schema}://{auth}{hostname}{port}{fullpath}/?{args}'.format(
return '{schema}://{auth}{hostname}{port}{fullpath}?{args}'.format(
schema=self.secure_protocol if self.secure else self.protocol,
auth=auth,
hostname=self.quote(self.host, safe=''),

View File

@ -118,7 +118,9 @@ def print_version_msg():
help='Perform a trial run but only prints the notification '
'services to-be triggered to stdout. Notifications are never '
'sent using this mode.')
@click.option('--verbose', '-v', count=True)
@click.option('--verbose', '-v', count=True,
help='Makes the operation more talkative. Use multiple v to '
'increase the verbosity. I.e.: -vvvv')
@click.option('--version', '-V', is_flag=True,
help='Display the apprise version and exit.')
@click.argument('urls', nargs=-1,

View File

@ -92,7 +92,8 @@ class ConfigBase(URLBase):
# Store the encoding
self.encoding = kwargs.get('encoding')
if 'format' in kwargs:
if 'format' in kwargs \
and isinstance(kwargs['format'], six.string_types):
# Store the enforced config format
self.config_format = kwargs.get('format').lower()
@ -249,6 +250,109 @@ class ConfigBase(URLBase):
return results
@staticmethod
def detect_config_format(content, **kwargs):
"""
Takes the specified content and attempts to detect the format type
The function returns the actual format type if detected, otherwise
it returns None
"""
# Detect Format Logic:
# - A pound/hashtag (#) is alawys a comment character so we skip over
# lines matched here.
# - Detection begins on the first non-comment and non blank line
# matched.
# - If we find a string followed by a colon, we know we're dealing
# with a YAML file.
# - If we find a string that starts with a URL, or our tag
# definitions (accepting commas) followed by an equal sign we know
# we're dealing with a TEXT format.
# Define what a valid line should look like
valid_line_re = re.compile(
r'^\s*(?P<line>([;#]+(?P<comment>.*))|'
r'(?P<text>((?P<tag>[ \t,a-z0-9_-]+)=)?[a-z0-9]+://.*)|'
r'((?P<yaml>[a-z0-9]+):.*))?$', re.I)
try:
# split our content up to read line by line
content = re.split(r'\r*\n', content)
except TypeError:
# content was not expected string type
ConfigBase.logger.error('Invalid apprise config specified')
return None
# By default set our return value to None since we don't know
# what the format is yet
config_format = None
# iterate over each line of the file to attempt to detect it
# stop the moment a the type has been determined
for line, entry in enumerate(content, start=1):
result = valid_line_re.match(entry)
if not result:
# Invalid syntax
ConfigBase.logger.error(
'Undetectable apprise configuration found '
'based on line {}.'.format(line))
# Take an early exit
return None
# Attempt to detect configuration
if result.group('yaml'):
config_format = ConfigFormat.YAML
ConfigBase.logger.debug(
'Detected YAML configuration '
'based on line {}.'.format(line))
break
elif result.group('text'):
config_format = ConfigFormat.TEXT
ConfigBase.logger.debug(
'Detected TEXT configuration '
'based on line {}.'.format(line))
break
# If we reach here, we have a comment entry
# Adjust default format to TEXT
config_format = ConfigFormat.TEXT
return config_format
@staticmethod
def config_parse(content, asset=None, config_format=None, **kwargs):
"""
Takes the specified config content and loads it based on the specified
config_format. If a format isn't specified, then it is auto detected.
"""
if config_format is None:
# Detect the format
config_format = ConfigBase.detect_config_format(content)
if not config_format:
# We couldn't detect configuration
ConfigBase.logger.error('Could not detect configuration')
return list()
if config_format not in CONFIG_FORMATS:
# Invalid configuration type specified
ConfigBase.logger.error(
'An invalid configuration format ({}) was specified'.format(
config_format))
return list()
# Dynamically load our parse_ function based on our config format
fn = getattr(ConfigBase, 'config_parse_{}'.format(config_format))
# Execute our config parse function which always returns a list
return fn(content=content, asset=asset)
@staticmethod
def config_parse_text(content, asset=None):
"""
@ -270,9 +374,6 @@ class ConfigBase(URLBase):
<URL>
"""
# For logging, track the line number
line = 0
response = list()
# Define what a valid line should look like
@ -290,10 +391,7 @@ class ConfigBase(URLBase):
ConfigBase.logger.error('Invalid apprise text data specified')
return list()
for entry in content:
# Increment our line count
line += 1
for line, entry in enumerate(content, start=1):
result = valid_line_re.match(entry)
if not result:
# Invalid syntax

View File

@ -0,0 +1,82 @@
# -*- coding: utf-8 -*-
#
# Copyright (C) 2020 Chris Caron <lead2gold@gmail.com>
# All rights reserved.
#
# This code is licensed under the MIT License.
#
# Permission is hereby granted, free of charge, to any person obtaining a copy
# of this software and associated documentation files(the "Software"), to deal
# in the Software without restriction, including without limitation the rights
# to use, copy, modify, merge, publish, distribute, sublicense, and / or sell
# copies of the Software, and to permit persons to whom the Software is
# furnished to do so, subject to the following conditions :
#
# The above copyright notice and this permission notice shall be included in
# all copies or substantial portions of the Software.
#
# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT.IN NO EVENT SHALL THE
# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
# THE SOFTWARE.
from .ConfigBase import ConfigBase
from ..AppriseLocale import gettext_lazy as _
class ConfigMemory(ConfigBase):
"""
For information that was loaded from memory and does not
persist anywhere.
"""
# The default descriptive name associated with the service
service_name = _('Memory')
# The default protocol
protocol = 'memory'
def __init__(self, content, **kwargs):
"""
Initialize Memory Object
Memory objects just store the raw configuration in memory. There is
no external reference point. It's always considered cached.
"""
super(ConfigMemory, self).__init__(**kwargs)
# Store our raw config into memory
self.content = content
if self.config_format is None:
# Detect our format if possible
self.config_format = \
ConfigMemory.detect_config_format(self.content)
return
def url(self, privacy=False, *args, **kwargs):
"""
Returns the URL built dynamically based on specified arguments.
"""
return 'memory://'
def read(self, **kwargs):
"""
Simply return content stored into memory
"""
return self.content
@staticmethod
def parse_url(url):
"""
Memory objects have no parseable URL
"""
# These URLs can not be parsed
return None

View File

@ -1,21 +1,21 @@
# Translations template for apprise.
# Copyright (C) 2019 Chris Caron
# Copyright (C) 2020 Chris Caron
# This file is distributed under the same license as the apprise project.
# FIRST AUTHOR <EMAIL@ADDRESS>, 2019.
# FIRST AUTHOR <EMAIL@ADDRESS>, 2020.
#
#, fuzzy
msgid ""
msgstr ""
"Project-Id-Version: apprise 0.8.2\n"
"Project-Id-Version: apprise 0.8.4\n"
"Report-Msgid-Bugs-To: lead2gold@gmail.com\n"
"POT-Creation-Date: 2019-11-25 18:50-0500\n"
"POT-Creation-Date: 2020-02-01 12:59-0500\n"
"PO-Revision-Date: YEAR-MO-DA HO:MI+ZONE\n"
"Last-Translator: FULL NAME <EMAIL@ADDRESS>\n"
"Language-Team: LANGUAGE <LL@li.org>\n"
"MIME-Version: 1.0\n"
"Content-Type: text/plain; charset=utf-8\n"
"Content-Transfer-Encoding: 8bit\n"
"Generated-By: Babel 2.7.0\n"
"Generated-By: Babel 2.8.0\n"
msgid "API Key"
msgstr ""
@ -71,6 +71,9 @@ msgstr ""
msgid "Bot Token"
msgstr ""
msgid "Cache Age"
msgstr ""
msgid "Cache Results"
msgstr ""
@ -128,6 +131,12 @@ msgstr ""
msgid "Footer Logo"
msgstr ""
msgid "Forced File Name"
msgstr ""
msgid "Forced Mime Type"
msgstr ""
msgid "From Email"
msgstr ""
@ -164,6 +173,9 @@ msgstr ""
msgid "Log to STDERR"
msgstr ""
msgid "Memory"
msgstr ""
msgid "Message Hook"
msgstr ""
@ -203,6 +215,9 @@ msgstr ""
msgid "Priority"
msgstr ""
msgid "Private Key"
msgstr ""
msgid "Project ID"
msgstr ""
@ -365,6 +380,9 @@ msgstr ""
msgid "Version"
msgstr ""
msgid "Vibration"
msgstr ""
msgid "Web Based"
msgstr ""

View File

@ -86,7 +86,7 @@ class NotifyD7Networks(NotifyBase):
# The services URL
service_url = 'https://d7networks.com/'
# All pushover requests are secure
# All notification requests are secure
secure_protocol = 'd7sms'
# Allow 300 requests per minute.
@ -94,7 +94,7 @@ class NotifyD7Networks(NotifyBase):
request_rate_per_sec = 0.20
# A URL that takes you to the setup/help of the specific protocol
setup_url = 'https://github.com/caronc/apprise/wiki/Notify_twilio'
setup_url = 'https://github.com/caronc/apprise/wiki/Notify_d7networks'
# D7 Networks batch notification URL
notify_batch_url = 'http://rest-api.d7networks.com/secure/sendbatch'

View File

@ -51,6 +51,7 @@ from ..common import NotifyType
from ..utils import parse_bool
from ..utils import validate_regex
from ..AppriseLocale import gettext_lazy as _
from ..attachment.AttachBase import AttachBase
class NotifyDiscord(NotifyBase):
@ -312,6 +313,19 @@ class NotifyDiscord(NotifyBase):
# Always call throttle before any remote server i/o is made
self.throttle()
# Perform some simple error checking
if isinstance(attach, AttachBase):
if not attach:
# We could not access the attachment
self.logger.error(
'Could not access attachment {}.'.format(
attach.url(privacy=True)))
return False
self.logger.debug(
'Posting Discord attachment {}'.format(
attach.url(privacy=True)))
# Our attachment path (if specified)
files = None
try:

View File

@ -573,21 +573,22 @@ class NotifyEmail(NotifyBase):
# First attach our body to our content as the first element
base.attach(content)
attach_error = False
# Now store our attachments
for attachment in attach:
if not attachment:
# We could not load the attachment; take an early
# exit since this isn't what the end user wanted
self.logger.warning(
'The specified attachment could not be referenced:'
' {}.'.format(attachment.url(privacy=True)))
# We could not access the attachment
self.logger.error(
'Could not access attachment {}.'.format(
attachment.url(privacy=True)))
# Mark our failure
attach_error = True
break
return False
self.logger.debug(
'Preparing Email attachment {}'.format(
attachment.url(privacy=True)))
with open(attachment.path, "rb") as abody:
app = MIMEApplication(
@ -600,11 +601,6 @@ class NotifyEmail(NotifyBase):
base.attach(app)
if attach_error:
# Mark our error and quit early
has_error = True
break
# bind the socket variable to the current namespace
socket = None

View File

@ -0,0 +1,352 @@
# -*- coding: utf-8 -*-
#
# Copyright (C) 2019 Chris Caron <lead2gold@gmail.com>
# All rights reserved.
#
# This code is licensed under the MIT License.
#
# Permission is hereby granted, free of charge, to any person obtaining a copy
# of this software and associated documentation files(the "Software"), to deal
# in the Software without restriction, including without limitation the rights
# to use, copy, modify, merge, publish, distribute, sublicense, and / or sell
# copies of the Software, and to permit persons to whom the Software is
# furnished to do so, subject to the following conditions :
#
# The above copyright notice and this permission notice shall be included in
# all copies or substantial portions of the Software.
#
# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT.IN NO EVENT SHALL THE
# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
# THE SOFTWARE.
# Sources
# - https://dreambox.de/en/
# - https://dream.reichholf.net/wiki/Hauptseite
# - https://dream.reichholf.net/wiki/Enigma2:WebInterface#Message
# - https://github.com/E2OpenPlugins/e2openplugin-OpenWebif
# - https://github.com/E2OpenPlugins/e2openplugin-OpenWebif/wiki/\
# OpenWebif-API-documentation#message
import six
import requests
from json import loads
from .NotifyBase import NotifyBase
from ..URLBase import PrivacyMode
from ..common import NotifyType
from ..AppriseLocale import gettext_lazy as _
class Enigma2MessageType(object):
# Defines the Enigma2 notification types Apprise can map to
INFO = 1
WARNING = 2
ERROR = 3
# If a mapping fails, the default of Enigma2MessageType.INFO is used
MESSAGE_MAPPING = {
NotifyType.INFO: Enigma2MessageType.INFO,
NotifyType.SUCCESS: Enigma2MessageType.INFO,
NotifyType.WARNING: Enigma2MessageType.WARNING,
NotifyType.FAILURE: Enigma2MessageType.ERROR,
}
class NotifyEnigma2(NotifyBase):
"""
A wrapper for Enigma2 Notifications
"""
# The default descriptive name associated with the Notification
service_name = 'Enigma2'
# The services URL
service_url = 'https://dreambox.de/'
# The default protocol
protocol = 'enigma2'
# The default secure protocol
secure_protocol = 'enigma2s'
# A URL that takes you to the setup/help of the specific protocol
setup_url = 'https://github.com/caronc/apprise/wiki/Notify_enigma2'
# Enigma2 does not support a title
title_maxlen = 0
# The maximum allowable characters allowed in the body per message
body_maxlen = 1000
# Throttle a wee-bit to avoid thrashing
request_rate_per_sec = 0.5
# Define object templates
templates = (
'{schema}://{host}',
'{schema}://{host}:{port}',
'{schema}://{user}@{host}',
'{schema}://{user}@{host}:{port}',
'{schema}://{user}:{password}@{host}',
'{schema}://{user}:{password}@{host}:{port}',
'{schema}://{host}/{fullpath}',
'{schema}://{host}:{port}/{fullpath}',
'{schema}://{user}@{host}/{fullpath}',
'{schema}://{user}@{host}:{port}/{fullpath}',
'{schema}://{user}:{password}@{host}/{fullpath}',
'{schema}://{user}:{password}@{host}:{port}/{fullpath}',
)
# Define our template tokens
template_tokens = dict(NotifyBase.template_tokens, **{
'host': {
'name': _('Hostname'),
'type': 'string',
'required': True,
},
'port': {
'name': _('Port'),
'type': 'int',
'min': 1,
'max': 65535,
},
'user': {
'name': _('Username'),
'type': 'string',
},
'password': {
'name': _('Password'),
'type': 'string',
'private': True,
},
'fullpath': {
'name': _('Path'),
'type': 'string',
},
})
template_args = dict(NotifyBase.template_args, **{
'timeout': {
'name': _('Server Timeout'),
'type': 'int',
# The number of seconds to display the message for
'default': 13,
# -1 means infinit
'min': -1,
},
})
# Define any kwargs we're using
template_kwargs = {
'headers': {
'name': _('HTTP Header'),
'prefix': '+',
},
}
def __init__(self, timeout=None, headers=None, **kwargs):
"""
Initialize Enigma2 Object
headers can be a dictionary of key/value pairs that you want to
additionally include as part of the server headers to post with
"""
super(NotifyEnigma2, self).__init__(**kwargs)
try:
self.timeout = int(timeout)
if self.timeout < self.template_args['timeout']['min']:
# Bulletproof; can't go lower then min value
self.timeout = self.template_args['timeout']['min']
except (ValueError, TypeError):
# Use default timeout
self.timeout = self.template_args['timeout']['default']
self.fullpath = kwargs.get('fullpath')
if not isinstance(self.fullpath, six.string_types):
self.fullpath = '/'
self.headers = {}
if headers:
# Store our extra headers
self.headers.update(headers)
return
def url(self, privacy=False, *args, **kwargs):
"""
Returns the URL built dynamically based on specified arguments.
"""
# Define any arguments set
args = {
'format': self.notify_format,
'overflow': self.overflow_mode,
'verify': 'yes' if self.verify_certificate else 'no',
'timeout': str(self.timeout),
}
# Append our headers into our args
args.update({'+{}'.format(k): v for k, v in self.headers.items()})
# Determine Authentication
auth = ''
if self.user and self.password:
auth = '{user}:{password}@'.format(
user=NotifyEnigma2.quote(self.user, safe=''),
password=self.pprint(
self.password, privacy, mode=PrivacyMode.Secret, safe=''),
)
elif self.user:
auth = '{user}@'.format(
user=NotifyEnigma2.quote(self.user, safe=''),
)
default_port = 443 if self.secure else 80
return '{schema}://{auth}{hostname}{port}{fullpath}?{args}'.format(
schema=self.secure_protocol if self.secure else self.protocol,
auth=auth,
hostname=NotifyEnigma2.quote(self.host, safe=''),
port='' if self.port is None or self.port == default_port
else ':{}'.format(self.port),
fullpath=NotifyEnigma2.quote(self.fullpath, safe='/'),
args=NotifyEnigma2.urlencode(args),
)
def send(self, body, title='', notify_type=NotifyType.INFO, **kwargs):
"""
Perform Enigma2 Notification
"""
# prepare Enigma2 Object
headers = {
'User-Agent': self.app_id,
}
params = {
'text': body,
'type': MESSAGE_MAPPING.get(
notify_type, Enigma2MessageType.INFO),
'timeout': self.timeout,
}
# Apply any/all header over-rides defined
headers.update(self.headers)
auth = None
if self.user:
auth = (self.user, self.password)
# Set our schema
schema = 'https' if self.secure else 'http'
url = '%s://%s' % (schema, self.host)
if isinstance(self.port, int):
url += ':%d' % self.port
# Prepare our message URL
url += self.fullpath.rstrip('/') + '/api/message'
self.logger.debug('Enigma2 POST URL: %s (cert_verify=%r)' % (
url, self.verify_certificate,
))
self.logger.debug('Enigma2 Parameters: %s' % str(params))
# Always call throttle before any remote server i/o is made
self.throttle()
try:
r = requests.get(
url,
params=params,
headers=headers,
auth=auth,
verify=self.verify_certificate,
)
if r.status_code != requests.codes.ok:
# We had a problem
status_str = \
NotifyEnigma2.http_response_code_lookup(r.status_code)
self.logger.warning(
'Failed to send Enigma2 notification: '
'{}{}error={}.'.format(
status_str,
', ' if status_str else '',
r.status_code))
self.logger.debug('Response Details:\r\n{}'.format(r.content))
# Return; we're done
return False
# We were able to post our message; now lets evaluate the response
try:
# Acquire our result
result = loads(r.content).get('result', False)
except (AttributeError, TypeError, ValueError):
# ValueError = r.content is Unparsable
# TypeError = r.content is None
# AttributeError = r is None
# We could not parse JSON response.
result = False
if not result:
self.logger.warning(
'Failed to send Enigma2 notification: '
'There was no server acknowledgement.')
self.logger.debug('Response Details:\r\n{}'.format(r.content))
# Return; we're done
return False
self.logger.info('Sent Enigma2 notification.')
except requests.RequestException as e:
self.logger.warning(
'A Connection error occured sending Enigma2 '
'notification to %s.' % self.host)
self.logger.debug('Socket Exception: %s' % str(e))
# Return; we're done
return False
return True
@staticmethod
def parse_url(url):
"""
Parses the URL and returns enough arguments that can allow
us to substantiate this object.
"""
results = NotifyBase.parse_url(url)
if not results:
# We're done early as we couldn't load the results
return results
# Add our headers that the user can potentially over-ride if they wish
# to to our returned result set
results['headers'] = results['qsd-']
results['headers'].update(results['qsd+'])
# Tidy our header entries by unquoting them
results['headers'] = {
NotifyEnigma2.unquote(x): NotifyEnigma2.unquote(y)
for x, y in results['headers'].items()}
# Save timeout value (if specified)
if 'timeout' in results['qsd'] and len(results['qsd']['timeout']):
results['timeout'] = results['qsd']['timeout']
return results

View File

@ -71,7 +71,7 @@ class NotifyGitter(NotifyBase):
# The services URL
service_url = 'https://gitter.im/'
# All pushover requests are secure
# All notification requests are secure
secure_protocol = 'gitter'
# A URL that takes you to the setup/help of the specific protocol
@ -102,7 +102,7 @@ class NotifyGitter(NotifyBase):
# Define object templates
templates = (
'{schema}://{token}:{targets}/',
'{schema}://{token}/{targets}/',
)
# Define our template tokens

View File

@ -0,0 +1,377 @@
# -*- coding: utf-8 -*-
#
# Copyright (C) 2020 Chris Caron <lead2gold@gmail.com>
# All rights reserved.
#
# This code is licensed under the MIT License.
#
# Permission is hereby granted, free of charge, to any person obtaining a copy
# of this software and associated documentation files(the "Software"), to deal
# in the Software without restriction, including without limitation the rights
# to use, copy, modify, merge, publish, distribute, sublicense, and / or sell
# copies of the Software, and to permit persons to whom the Software is
# furnished to do so, subject to the following conditions :
#
# The above copyright notice and this permission notice shall be included in
# all copies or substantial portions of the Software.
#
# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT.IN NO EVENT SHALL THE
# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
# THE SOFTWARE.
# To use this service you will need a Kavenegar account from their website
# at https://kavenegar.com/
#
# After you've established your account you can get your API Key from your
# account profile: https://panel.kavenegar.com/client/setting/account
#
# This provider does not accept +1 (for example) as a country code. You need
# to specify 001 instead.
#
import re
import requests
from json import loads
from .NotifyBase import NotifyBase
from ..common import NotifyType
from ..utils import parse_list
from ..utils import validate_regex
from ..AppriseLocale import gettext_lazy as _
# Extend HTTP Error Messages
# Based on https://kavenegar.com/rest.html
KAVENEGAR_HTTP_ERROR_MAP = {
200: 'The request was approved',
400: 'Parameters are incomplete',
401: 'Account has been disabled',
402: 'The operation failed',
403: 'The API Key is invalid',
404: 'The method is unknown',
405: 'The GET/POST request is wrong',
406: 'Invalid mandatory parameters sent',
407: 'You canot access the information you want',
409: 'The server is unable to response',
411: 'The recipient is invalid',
412: 'The sender is invalid',
413: 'Message empty or message length exceeded',
414: 'The number of recipients is more than 200',
415: 'The start index is larger then the total',
416: 'The source IP of the service does not match the settings',
417: 'The submission date is incorrect, '
'either expired or not in the correct format',
418: 'Your account credit is insufficient',
422: 'Data cannot be processed due to invalid characters',
501: 'SMS can only be sent to the account holder number',
}
# Some Phone Number Detection
IS_PHONE_NO = re.compile(r'^\+?(?P<phone>[0-9\s)(+-]+)\s*$')
class NotifyKavenegar(NotifyBase):
"""
A wrapper for Kavenegar Notifications
"""
# The default descriptive name associated with the Notification
service_name = 'Kavenegar'
# The services URL
service_url = 'https://kavenegar.com/'
# All notification requests are secure
secure_protocol = 'kavenegar'
# Allow 300 requests per minute.
# 60/300 = 0.2
request_rate_per_sec = 0.20
# A URL that takes you to the setup/help of the specific protocol
setup_url = 'https://github.com/caronc/apprise/wiki/Notify_kavenegar'
# Kavenegar single notification URL
notify_url = 'http://api.kavenegar.com/v1/{apikey}/sms/send.json'
# The maximum length of the body
body_maxlen = 160
# A title can not be used for SMS Messages. Setting this to zero will
# cause any title (if defined) to get placed into the message body.
title_maxlen = 0
# Define object templates
templates = (
'{schema}://{apikey}/{targets}',
'{schema}://{source}@{apikey}/{targets}',
)
# Define our template tokens
template_tokens = dict(NotifyBase.template_tokens, **{
'apikey': {
'name': _('API Key'),
'type': 'string',
'required': True,
'private': True,
'regex': (r'^[a-z0-9]+$', 'i'),
},
'source': {
'name': _('Source Phone No'),
'type': 'string',
'prefix': '+',
'regex': (r'^[0-9\s)(+-]+$', 'i'),
},
'target_phone': {
'name': _('Target Phone No'),
'type': 'string',
'prefix': '+',
'regex': (r'^[0-9\s)(+-]+$', 'i'),
'map_to': 'targets',
},
'targets': {
'name': _('Targets'),
'type': 'list:string',
'required': True,
},
})
# Define our template arguments
template_args = dict(NotifyBase.template_args, **{
'to': {
'alias_of': 'targets',
},
'from': {
'alias_of': 'source',
},
})
def __init__(self, apikey, source=None, targets=None, **kwargs):
"""
Initialize Kavenegar Object
"""
super(NotifyKavenegar, self).__init__(**kwargs)
# API Key (associated with project)
self.apikey = validate_regex(
apikey, *self.template_tokens['apikey']['regex'])
if not self.apikey:
msg = 'An invalid Kavenegar API Key ' \
'({}) was specified.'.format(apikey)
self.logger.warning(msg)
raise TypeError(msg)
self.source = None
if source is not None:
result = IS_PHONE_NO.match(source)
if not result:
msg = 'The Kavenegar source specified ({}) is invalid.'\
.format(source)
self.logger.warning(msg)
raise TypeError(msg)
# Further check our phone # for it's digit count
result = ''.join(re.findall(r'\d+', result.group('phone')))
if len(result) < 11 or len(result) > 14:
msg = 'The MessageBird source # specified ({}) is invalid.'\
.format(source)
self.logger.warning(msg)
raise TypeError(msg)
# Store our source
self.source = result
# Parse our targets
self.targets = list()
for target in parse_list(targets):
# Validate targets and drop bad ones:
result = IS_PHONE_NO.match(target)
if result:
# Further check our phone # for it's digit count
# if it's less than 10, then we can assume it's
# a poorly specified phone no and spit a warning
result = ''.join(re.findall(r'\d+', result.group('phone')))
if len(result) < 11 or len(result) > 14:
self.logger.warning(
'Dropped invalid phone # '
'({}) specified.'.format(target),
)
continue
# store valid phone number
self.targets.append(result)
continue
self.logger.warning(
'Dropped invalid phone # ({}) specified.'.format(target))
if len(self.targets) == 0:
msg = 'There are no valid targets identified to notify.'
self.logger.warning(msg)
raise TypeError(msg)
return
def send(self, body, title='', notify_type=NotifyType.INFO, **kwargs):
"""
Sends SMS Message
"""
# error tracking (used for function return)
has_error = False
# Prepare our headers
headers = {
'User-Agent': self.app_id,
'Accept': 'application/json',
}
# Our URL
url = self.notify_url.format(apikey=self.apikey)
# use the list directly
targets = list(self.targets)
while len(targets):
# Get our target(s) to notify
target = targets.pop(0)
# Prepare our payload
payload = {
'receptor': target,
'message': body,
}
if self.source:
# Only set source if specified
payload['sender'] = self.source
# Some Debug Logging
self.logger.debug(
'Kavenegar POST URL: {} (cert_verify={})'.format(
url, self.verify_certificate))
self.logger.debug('Kavenegar Payload: {}' .format(payload))
# Always call throttle before any remote server i/o is made
self.throttle()
try:
r = requests.post(
url,
params=payload,
headers=headers,
verify=self.verify_certificate,
)
if r.status_code not in (
requests.codes.created, requests.codes.ok):
# We had a problem
status_str = \
NotifyBase.http_response_code_lookup(
r.status_code, KAVENEGAR_HTTP_ERROR_MAP)
try:
# Update our status response if we can
json_response = loads(r.content)
status_str = json_response.get('message', status_str)
except (AttributeError, TypeError, ValueError):
# ValueError = r.content is Unparsable
# TypeError = r.content is None
# AttributeError = r is None
# We could not parse JSON response.
# We will just use the status we already have.
pass
self.logger.warning(
'Failed to send Kavenegar SMS notification to {}: '
'{}{}error={}.'.format(
target,
status_str,
', ' if status_str else '',
r.status_code))
self.logger.debug(
'Response Details:\r\n{}'.format(r.content))
# Mark our failure
has_error = True
continue
# If we reach here; the message was sent
self.logger.info(
'Sent Kavenegar SMS notification to {}.'.format(target))
self.logger.debug(
'Response Details:\r\n{}'.format(r.content))
except requests.RequestException as e:
self.logger.warning(
'A Connection error occured sending Kavenegar:%s ' % (
', '.join(self.targets)) + 'notification.'
)
self.logger.debug('Socket Exception: %s' % str(e))
# Mark our failure
has_error = True
continue
return not has_error
def url(self, privacy=False, *args, **kwargs):
"""
Returns the URL built dynamically based on specified arguments.
"""
# Define any arguments set
args = {
'format': self.notify_format,
'overflow': self.overflow_mode,
'verify': 'yes' if self.verify_certificate else 'no',
}
return '{schema}://{source}{apikey}/{targets}?{args}'.format(
schema=self.secure_protocol,
source='' if not self.source else '{}@'.format(self.source),
apikey=self.pprint(self.apikey, privacy, safe=''),
targets='/'.join(
[NotifyKavenegar.quote(x, safe='') for x in self.targets]),
args=NotifyKavenegar.urlencode(args))
@staticmethod
def parse_url(url):
"""
Parses the URL and returns enough arguments that can allow
us to substantiate this object.
"""
results = NotifyBase.parse_url(url, verify_host=False)
if not results:
# We're done early as we couldn't load the results
return results
# Store the source if specified
if results.get('user', None):
results['source'] = results['user']
# Get our entries; split_path() looks after unquoting content for us
# by default
results['targets'] = NotifyKavenegar.split_path(results['fullpath'])
# The hostname is our authentication key
results['apikey'] = NotifyKavenegar.unquote(results['host'])
# Support the 'to' variable so that we can support targets this way too
# The 'to' makes it easier to use yaml configuration
if 'to' in results['qsd'] and len(results['qsd']['to']):
results['targets'] += \
NotifyKavenegar.parse_list(results['qsd']['to'])
if 'from' in results['qsd'] and len(results['qsd']['from']):
results['source'] = \
NotifyKavenegar.unquote(results['qsd']['from'])
return results

View File

@ -98,7 +98,7 @@ class NotifyMSG91(NotifyBase):
notify_url = 'https://world.msg91.com/api/sendhttp.php'
# The maximum length of the body
body_maxlen = 140
body_maxlen = 160
# A title can not be used for SMS Messages. Setting this to zero will
# cause any title (if defined) to get placed into the message body.

View File

@ -101,7 +101,7 @@ class NotifyMailgun(NotifyBase):
# The services URL
service_url = 'https://www.mailgun.com/'
# All pushover requests are secure
# All notification requests are secure
secure_protocol = 'mailgun'
# Mailgun advertises they allow 300 requests per minute.

View File

@ -41,6 +41,7 @@ from ..common import NotifyImageSize
from ..common import NotifyFormat
from ..utils import parse_bool
from ..utils import parse_list
from ..utils import validate_regex
from ..AppriseLocale import gettext_lazy as _
# Define default path
@ -74,12 +75,16 @@ class MatrixWebhookMode(object):
# Support the slack webhook plugin
SLACK = "slack"
# Support the t2bot webhook plugin
T2BOT = "t2bot"
# webhook modes are placed ito this list for validation purposes
MATRIX_WEBHOOK_MODES = (
MatrixWebhookMode.DISABLED,
MatrixWebhookMode.MATRIX,
MatrixWebhookMode.SLACK,
MatrixWebhookMode.T2BOT,
)
@ -122,6 +127,11 @@ class NotifyMatrix(NotifyBase):
# Define object templates
templates = (
# Targets are ignored when using t2bot mode; only a token is required
'{schema}://{token}',
'{schema}://{user}@{token}',
# All other non-t2bot setups require targets
'{schema}://{user}:{password}@{host}/{targets}',
'{schema}://{user}:{password}@{host}:{port}/{targets}',
'{schema}://{token}:{password}@{host}/{targets}',
@ -199,8 +209,7 @@ class NotifyMatrix(NotifyBase):
},
})
def __init__(self, targets=None, mode=None, include_image=False,
**kwargs):
def __init__(self, targets=None, mode=None, include_image=False, **kwargs):
"""
Initialize Matrix Object
"""
@ -233,6 +242,16 @@ class NotifyMatrix(NotifyBase):
self.logger.warning(msg)
raise TypeError(msg)
if self.mode == MatrixWebhookMode.T2BOT:
# t2bot configuration requires that a webhook id is specified
self.access_token = validate_regex(
self.host, r'^[a-z0-9]{64}$', 'i')
if not self.access_token:
msg = 'An invalid T2Bot/Matrix Webhook ID ' \
'({}) was specified.'.format(self.host)
self.logger.warning(msg)
raise TypeError(msg)
def send(self, body, title='', notify_type=NotifyType.INFO, **kwargs):
"""
Perform Matrix Notification
@ -257,20 +276,30 @@ class NotifyMatrix(NotifyBase):
'Content-Type': 'application/json',
}
# Acquire our access token from our URL
access_token = self.password if self.password else self.user
if self.mode != MatrixWebhookMode.T2BOT:
# Acquire our access token from our URL
access_token = self.password if self.password else self.user
default_port = 443 if self.secure else 80
default_port = 443 if self.secure else 80
# Prepare our URL
url = '{schema}://{hostname}:{port}/{webhook_path}/{token}'.format(
schema='https' if self.secure else 'http',
hostname=self.host,
port='' if self.port is None
or self.port == default_port else self.port,
webhook_path=MATRIX_V1_WEBHOOK_PATH,
token=access_token,
)
# Prepare our URL
url = '{schema}://{hostname}:{port}/{webhook_path}/{token}'.format(
schema='https' if self.secure else 'http',
hostname=self.host,
port='' if self.port is None
or self.port == default_port else self.port,
webhook_path=MATRIX_V1_WEBHOOK_PATH,
token=access_token,
)
else:
#
# t2bot Setup
#
# Prepare our URL
url = 'https://webhooks.t2bot.io/api/v1/matrix/hook/' \
'{token}'.format(token=self.access_token)
# Retrieve our payload
payload = getattr(self, '_{}_webhook_payload'.format(self.mode))(
@ -381,7 +410,7 @@ class NotifyMatrix(NotifyBase):
payload = {
'displayName':
self.user if self.user else self.matrix_default_user,
self.user if self.user else self.app_id,
'format': 'html',
}
@ -399,6 +428,27 @@ class NotifyMatrix(NotifyBase):
return payload
def _t2bot_webhook_payload(self, body, title='',
notify_type=NotifyType.INFO, **kwargs):
"""
Format the payload for a T2Bot Matrix based messages
"""
# Retrieve our payload
payload = self._matrix_webhook_payload(
body=body, title=title, notify_type=notify_type, **kwargs)
# Acquire our image url if we're configured to do so
image_url = None if not self.include_image else \
self.image_url(notify_type)
if image_url:
# t2bot can take an avatarUrl Entry
payload['avatarUrl'] = image_url
return payload
def _send_server_notification(self, body, title='',
notify_type=NotifyType.INFO, **kwargs):
"""
@ -867,6 +917,9 @@ class NotifyMatrix(NotifyBase):
))
self.logger.debug('Matrix Payload: %s' % str(payload))
# Initialize our response object
r = None
try:
r = fn(
url,
@ -948,7 +1001,8 @@ class NotifyMatrix(NotifyBase):
"""
Ensure we relinquish our token
"""
self._logout()
if self.mode != MatrixWebhookMode.T2BOT:
self._logout()
def url(self, privacy=False, *args, **kwargs):
"""
@ -997,12 +1051,14 @@ class NotifyMatrix(NotifyBase):
us to substantiate this object.
"""
results = NotifyBase.parse_url(url)
results = NotifyBase.parse_url(url, verify_host=False)
if not results:
# We're done early as we couldn't load the results
return results
if not results.get('host'):
return None
# Get our rooms
results['targets'] = NotifyMatrix.split_path(results['fullpath'])
@ -1040,4 +1096,37 @@ class NotifyMatrix(NotifyBase):
results['mode'] = results['qsd'].get(
'mode', results['qsd'].get('webhook'))
# t2bot detection... look for just a hostname, and/or just a user/host
# if we match this; we can go ahead and set the mode (but only if
# it was otherwise not set)
if results['mode'] is None \
and not results['password'] \
and not results['targets']:
# Default mode to t2bot
results['mode'] = MatrixWebhookMode.T2BOT
return results
@staticmethod
def parse_native_url(url):
"""
Support https://webhooks.t2bot.io/api/v1/matrix/hook/WEBHOOK_TOKEN/
"""
result = re.match(
r'^https?://webhooks\.t2bot\.io/api/v1/matrix/hook/'
r'(?P<webhook_token>[A-Z0-9_-]+)/?'
r'(?P<args>\?.+)?$', url, re.I)
if result:
mode = 'mode={}'.format(MatrixWebhookMode.T2BOT)
return NotifyMatrix.parse_url(
'{schema}://{webhook_token}/{args}'.format(
schema=NotifyMatrix.secure_protocol,
webhook_token=result.group('webhook_token'),
args='?{}'.format(mode) if not result.group('args')
else '{}&{}'.format(result.group('args'), mode)))
return None

View File

@ -63,7 +63,7 @@ class NotifyMessageBird(NotifyBase):
notify_url = 'https://rest.messagebird.com/messages'
# The maximum length of the body
body_maxlen = 140
body_maxlen = 160
# A title can not be used for SMS Messages. Setting this to zero will
# cause any title (if defined) to get placed into the message body.

View File

@ -64,21 +64,12 @@ class NotifyNexmo(NotifyBase):
notify_url = 'https://rest.nexmo.com/sms/json'
# The maximum length of the body
body_maxlen = 140
body_maxlen = 160
# A title can not be used for SMS Messages. Setting this to zero will
# cause any title (if defined) to get placed into the message body.
title_maxlen = 0
# Default Time To Live
# By default Nexmo attempt delivery for 72 hours, however the maximum
# effective value depends on the operator and is typically 24 - 48 hours.
# We recommend this value should be kept at its default or at least 30
# minutes.
default_ttl = 900000
ttl_max = 604800000
ttl_min = 20000
# Define object templates
templates = (
'{schema}://{apikey}:{secret}@{from_phone}',
@ -135,6 +126,12 @@ class NotifyNexmo(NotifyBase):
'secret': {
'alias_of': 'secret',
},
# Default Time To Live
# By default Nexmo attempt delivery for 72 hours, however the maximum
# effective value depends on the operator and is typically 24 - 48
# hours. We recommend this value should be kept at its default or at
# least 30 minutes.
'ttl': {
'name': _('ttl'),
'type': 'int',
@ -170,7 +167,7 @@ class NotifyNexmo(NotifyBase):
raise TypeError(msg)
# Set our Time to Live Flag
self.ttl = self.default_ttl
self.ttl = self.template_args['ttl']['default']
try:
self.ttl = int(ttl)
@ -178,7 +175,8 @@ class NotifyNexmo(NotifyBase):
# Do nothing
pass
if self.ttl < self.ttl_min or self.ttl > self.ttl_max:
if self.ttl < self.template_args['ttl']['min'] or \
self.ttl > self.template_args['ttl']['max']:
msg = 'The Nexmo TTL specified ({}) is out of range.'\
.format(self.ttl)
self.logger.warning(msg)

View File

@ -0,0 +1,294 @@
# -*- coding: utf-8 -*-
#
# Copyright (C) 2019 Chris Caron <lead2gold@gmail.com>
# All rights reserved.
#
# This code is licensed under the MIT License.
#
# Permission is hereby granted, free of charge, to any person obtaining a copy
# of this software and associated documentation files(the "Software"), to deal
# in the Software without restriction, including without limitation the rights
# to use, copy, modify, merge, publish, distribute, sublicense, and / or sell
# copies of the Software, and to permit persons to whom the Software is
# furnished to do so, subject to the following conditions :
#
# The above copyright notice and this permission notice shall be included in
# all copies or substantial portions of the Software.
#
# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT.IN NO EVENT SHALL THE
# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
# OUT OF OR IN CON
import requests
from .NotifyBase import NotifyBase
from ..URLBase import PrivacyMode
from ..common import NotifyType
from ..utils import parse_list
from ..AppriseLocale import gettext_lazy as _
class NotifyNextcloud(NotifyBase):
"""
A wrapper for Nextcloud Notifications
"""
# The default descriptive name associated with the Notification
service_name = 'Nextcloud'
# The services URL
service_url = 'https://nextcloud.com/'
# Insecure protocol (for those self hosted requests)
protocol = 'ncloud'
# The default protocol (this is secure for notica)
secure_protocol = 'nclouds'
# A URL that takes you to the setup/help of the specific protocol
setup_url = 'https://github.com/caronc/apprise/wiki/Notify_nextcloud'
# Nextcloud URL
notify_url = '{schema}://{host}/ocs/v2.php/apps/admin_notifications/' \
'api/v1/notifications/{target}'
# Nextcloud does not support a title
title_maxlen = 255
# Defines the maximum allowable characters per message.
body_maxlen = 4000
# Define object templates
templates = (
'{schema}://{user}:{password}@{host}/{targets}',
'{schema}://{user}:{password}@{host}:{port}/{targets}',
)
# Define our template tokens
template_tokens = dict(NotifyBase.template_tokens, **{
'host': {
'name': _('Hostname'),
'type': 'string',
'required': True,
},
'port': {
'name': _('Port'),
'type': 'int',
'min': 1,
'max': 65535,
},
'user': {
'name': _('Username'),
'type': 'string',
},
'password': {
'name': _('Password'),
'type': 'string',
'private': True,
},
'target_user': {
'name': _('Target User'),
'type': 'string',
'map_to': 'targets',
},
'targets': {
'name': _('Targets'),
'type': 'list:string',
'required': True,
},
})
# Define any kwargs we're using
template_kwargs = {
'headers': {
'name': _('HTTP Header'),
'prefix': '+',
},
}
def __init__(self, targets=None, headers=None, **kwargs):
"""
Initialize Nextcloud Object
"""
super(NotifyNextcloud, self).__init__(**kwargs)
self.targets = parse_list(targets)
if len(self.targets) == 0:
msg = 'At least one Nextcloud target user must be specified.'
self.logger.warning(msg)
raise TypeError(msg)
self.headers = {}
if headers:
# Store our extra headers
self.headers.update(headers)
return
def send(self, body, title='', notify_type=NotifyType.INFO, **kwargs):
"""
Perform Nextcloud Notification
"""
# Prepare our Header
headers = {
'User-Agent': self.app_id,
'OCS-APIREQUEST': 'true',
}
# Apply any/all header over-rides defined
headers.update(self.headers)
# error tracking (used for function return)
has_error = False
# Create a copy of the targets list
targets = list(self.targets)
while len(targets):
target = targets.pop(0)
# Prepare our Payload
payload = {
'shortMessage': title if title else self.app_desc,
}
if body:
# Only store the longMessage if a body was defined; nextcloud
# doesn't take kindly to empty longMessage entries.
payload['longMessage'] = body
auth = None
if self.user:
auth = (self.user, self.password)
notify_url = self.notify_url.format(
schema='https' if self.secure else 'http',
host=self.host if not isinstance(self.port, int)
else '{}:{}'.format(self.host, self.port),
target=target,
)
self.logger.debug('Nextcloud POST URL: %s (cert_verify=%r)' % (
notify_url, self.verify_certificate,
))
self.logger.debug('Nextcloud Payload: %s' % str(payload))
# Always call throttle before any remote server i/o is made
self.throttle()
try:
r = requests.post(
notify_url,
data=payload,
headers=headers,
auth=auth,
verify=self.verify_certificate,
)
if r.status_code != requests.codes.ok:
# We had a problem
status_str = \
NotifyNextcloud.http_response_code_lookup(
r.status_code)
self.logger.warning(
'Failed to send Nextcloud notification:'
'{}{}error={}.'.format(
status_str,
', ' if status_str else '',
r.status_code))
self.logger.debug(
'Response Details:\r\n{}'.format(r.content))
# track our failure
has_error = True
continue
else:
self.logger.info('Sent Nextcloud notification.')
except requests.RequestException as e:
self.logger.warning(
'A Connection error occured sending Nextcloud '
'notification.',
)
self.logger.debug('Socket Exception: %s' % str(e))
# track our failure
has_error = True
continue
return not has_error
def url(self, privacy=False, *args, **kwargs):
"""
Returns the URL built dynamically based on specified arguments.
"""
# Define any arguments set
args = {
'format': self.notify_format,
'overflow': self.overflow_mode,
'verify': 'yes' if self.verify_certificate else 'no',
}
# Append our headers into our args
args.update({'+{}'.format(k): v for k, v in self.headers.items()})
# Determine Authentication
auth = ''
if self.user and self.password:
auth = '{user}:{password}@'.format(
user=NotifyNextcloud.quote(self.user, safe=''),
password=self.pprint(
self.password, privacy, mode=PrivacyMode.Secret, safe=''),
)
elif self.user:
auth = '{user}@'.format(
user=NotifyNextcloud.quote(self.user, safe=''),
)
default_port = 443 if self.secure else 80
return '{schema}://{auth}{hostname}{port}/{targets}?{args}' \
.format(
schema=self.secure_protocol
if self.secure else self.protocol,
auth=auth,
hostname=NotifyNextcloud.quote(self.host, safe=''),
port='' if self.port is None or self.port == default_port
else ':{}'.format(self.port),
targets='/'.join([NotifyNextcloud.quote(x)
for x in self.targets]),
args=NotifyNextcloud.urlencode(args),
)
@staticmethod
def parse_url(url):
"""
Parses the URL and returns enough arguments that can allow
us to substantiate this object.
"""
results = NotifyBase.parse_url(url)
if not results:
# We're done early as we couldn't load the results
return results
# Fetch our targets
results['targets'] = \
NotifyNextcloud.split_path(results['fullpath'])
# The 'to' makes it easier to use yaml configuration
if 'to' in results['qsd'] and len(results['qsd']['to']):
results['targets'] += \
NotifyNextcloud.parse_list(results['qsd']['to'])
# Add our headers that the user can potentially over-ride if they
# wish to to our returned result set
results['headers'] = results['qsd-']
results['headers'].update(results['qsd+'])
return results

View File

@ -147,6 +147,19 @@ class NotifyPushBullet(NotifyBase):
# We need to upload our payload first so that we can source it
# in remaining messages
for attachment in attach:
# Perform some simple error checking
if not attachment:
# We could not access the attachment
self.logger.error(
'Could not access attachment {}.'.format(
attachment.url(privacy=True)))
return False
self.logger.debug(
'Preparing PushBullet attachment {}'.format(
attachment.url(privacy=True)))
# prepare payload
payload = {
'file_name': attachment.name,
@ -253,7 +266,7 @@ class NotifyPushBullet(NotifyBase):
continue
self.logger.info(
'Sent PushBullet attachment (%s) to "%s".' % (
'Sent PushBullet attachment ({}) to "{}".'.format(
attach_payload['file_name'], recipient))
return not has_error

View File

@ -0,0 +1,832 @@
# -*- coding: utf-8 -*-
#
# Copyright (C) 2019 Chris Caron <lead2gold@gmail.com>
# All rights reserved.
#
# This code is licensed under the MIT License.
#
# Permission is hereby granted, free of charge, to any person obtaining a copy
# of this software and associated documentation files(the "Software"), to deal
# in the Software without restriction, including without limitation the rights
# to use, copy, modify, merge, publish, distribute, sublicense, and / or sell
# copies of the Software, and to permit persons to whom the Software is
# furnished to do so, subject to the following conditions :
#
# The above copyright notice and this permission notice shall be included in
# all copies or substantial portions of the Software.
#
# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT.IN NO EVENT SHALL THE
# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
# THE SOFTWARE.
# We use io because it allows us to test the open() call
import io
import base64
import requests
from json import loads
from .NotifyBase import NotifyBase
from ..common import NotifyType
from ..utils import parse_list
from ..utils import validate_regex
from ..AppriseLocale import gettext_lazy as _
class PushSaferSound(object):
"""
Defines all of the supported PushSafe sounds
"""
# Silent
SILENT = 0
# Ahem (IM)
AHEM = 1
# Applause (Mail)
APPLAUSE = 2
# Arrow (Reminder)
ARROW = 3
# Baby (SMS)
BABY = 4
# Bell (Alarm)
BELL = 5
# Bicycle (Alarm2)
BICYCLE = 6
# Boing (Alarm3)
BOING = 7
# Buzzer (Alarm4)
BUZZER = 8
# Camera (Alarm5)
CAMERA = 9
# Car Horn (Alarm6)
CAR_HORN = 10
# Cash Register (Alarm7)
CASH_REGISTER = 11
# Chime (Alarm8)
CHIME = 12
# Creaky Door (Alarm9)
CREAKY_DOOR = 13
# Cuckoo Clock (Alarm10)
CUCKOO_CLOCK = 14
# Disconnect (Call)
DISCONNECT = 15
# Dog (Call2)
DOG = 16
# Doorbell (Call3)
DOORBELL = 17
# Fanfare (Call4)
FANFARE = 18
# Gun Shot (Call5)
GUN_SHOT = 19
# Honk (Call6)
HONK = 20
# Jaw Harp (Call7)
JAW_HARP = 21
# Morse (Call8)
MORSE = 22
# Electricity (Call9)
ELECTRICITY = 23
# Radio Tuner (Call10)
RADIO_TURNER = 24
# Sirens
SIRENS = 25
# Military Trumpets
MILITARY_TRUMPETS = 26
# Ufo
UFO = 27
# Whah Whah Whah
LONG_WHAH = 28
# Man Saying Goodbye
GOODBYE = 29
# Man Saying Hello
HELLO = 30
# Man Saying No
NO = 31
# Man Saying Ok
OKAY = 32
# Man Saying Ooohhhweee
OOOHHHWEEE = 33
# Man Saying Warning
WARNING = 34
# Man Saying Welcome
WELCOME = 35
# Man Saying Yeah
YEAH = 36
# Man Saying Yes
YES = 37
# Beep short
BEEP1 = 38
# Weeeee short
WEEE = 39
# Cut in and out short
CUTINOUT = 40
# Finger flicking glas short
FLICK_GLASS = 41
# Wa Wa Waaaa short
SHORT_WHAH = 42
# Laser short
LASER = 43
# Wind Chime short
WIND_CHIME = 44
# Echo short
ECHO = 45
# Zipper short
ZIPPER = 46
# HiHat short
HIHAT = 47
# Beep 2 short
BEEP2 = 48
# Beep 3 short
BEEP3 = 49
# Beep 4 short
BEEP4 = 50
# The Alarm is armed
ALARM_ARMED = 51
# The Alarm is disarmed
ALARM_DISARMED = 52
# The Backup is ready
BACKUP_READY = 53
# The Door is closed
DOOR_CLOSED = 54
# The Door is opend
DOOR_OPENED = 55
# The Window is closed
WINDOW_CLOSED = 56
# The Window is open
WINDOW_OPEN = 57
# The Light is off
LIGHT_ON = 58
# The Light is on
LIGHT_OFF = 59
# The Doorbell rings
DOORBELL_RANG = 60
PUSHSAFER_SOUND_MAP = {
# Device Default,
'silent': PushSaferSound.SILENT,
'ahem': PushSaferSound.AHEM,
'applause': PushSaferSound.APPLAUSE,
'arrow': PushSaferSound.ARROW,
'baby': PushSaferSound.BABY,
'bell': PushSaferSound.BELL,
'bicycle': PushSaferSound.BICYCLE,
'bike': PushSaferSound.BICYCLE,
'boing': PushSaferSound.BOING,
'buzzer': PushSaferSound.BUZZER,
'camera': PushSaferSound.CAMERA,
'carhorn': PushSaferSound.CAR_HORN,
'horn': PushSaferSound.CAR_HORN,
'cashregister': PushSaferSound.CASH_REGISTER,
'chime': PushSaferSound.CHIME,
'creakydoor': PushSaferSound.CREAKY_DOOR,
'cuckooclock': PushSaferSound.CUCKOO_CLOCK,
'cuckoo': PushSaferSound.CUCKOO_CLOCK,
'disconnect': PushSaferSound.DISCONNECT,
'dog': PushSaferSound.DOG,
'doorbell': PushSaferSound.DOORBELL,
'fanfare': PushSaferSound.FANFARE,
'gunshot': PushSaferSound.GUN_SHOT,
'honk': PushSaferSound.HONK,
'jawharp': PushSaferSound.JAW_HARP,
'morse': PushSaferSound.MORSE,
'electric': PushSaferSound.ELECTRICITY,
'radiotuner': PushSaferSound.RADIO_TURNER,
'sirens': PushSaferSound.SIRENS,
'militarytrumpets': PushSaferSound.MILITARY_TRUMPETS,
'military': PushSaferSound.MILITARY_TRUMPETS,
'trumpets': PushSaferSound.MILITARY_TRUMPETS,
'ufo': PushSaferSound.UFO,
'whahwhah': PushSaferSound.LONG_WHAH,
'whah': PushSaferSound.SHORT_WHAH,
'goodye': PushSaferSound.GOODBYE,
'hello': PushSaferSound.HELLO,
'no': PushSaferSound.NO,
'okay': PushSaferSound.OKAY,
'ok': PushSaferSound.OKAY,
'ooohhhweee': PushSaferSound.OOOHHHWEEE,
'warn': PushSaferSound.WARNING,
'warning': PushSaferSound.WARNING,
'welcome': PushSaferSound.WELCOME,
'yeah': PushSaferSound.YEAH,
'yes': PushSaferSound.YES,
'beep': PushSaferSound.BEEP1,
'beep1': PushSaferSound.BEEP1,
'weee': PushSaferSound.WEEE,
'wee': PushSaferSound.WEEE,
'cutinout': PushSaferSound.CUTINOUT,
'flickglass': PushSaferSound.FLICK_GLASS,
'laser': PushSaferSound.LASER,
'windchime': PushSaferSound.WIND_CHIME,
'echo': PushSaferSound.ECHO,
'zipper': PushSaferSound.ZIPPER,
'hihat': PushSaferSound.HIHAT,
'beep2': PushSaferSound.BEEP2,
'beep3': PushSaferSound.BEEP3,
'beep4': PushSaferSound.BEEP4,
'alarmarmed': PushSaferSound.ALARM_ARMED,
'armed': PushSaferSound.ALARM_ARMED,
'alarmdisarmed': PushSaferSound.ALARM_DISARMED,
'disarmed': PushSaferSound.ALARM_DISARMED,
'backupready': PushSaferSound.BACKUP_READY,
'dooropen': PushSaferSound.DOOR_OPENED,
'dopen': PushSaferSound.DOOR_OPENED,
'doorclosed': PushSaferSound.DOOR_CLOSED,
'dclosed': PushSaferSound.DOOR_CLOSED,
'windowopen': PushSaferSound.WINDOW_OPEN,
'wopen': PushSaferSound.WINDOW_OPEN,
'windowclosed': PushSaferSound.WINDOW_CLOSED,
'wclosed': PushSaferSound.WINDOW_CLOSED,
'lighton': PushSaferSound.LIGHT_ON,
'lon': PushSaferSound.LIGHT_ON,
'lightoff': PushSaferSound.LIGHT_OFF,
'loff': PushSaferSound.LIGHT_OFF,
'doorbellrang': PushSaferSound.DOORBELL_RANG,
}
# Priorities
class PushSaferPriority(object):
LOW = -2
MODERATE = -1
NORMAL = 0
HIGH = 1
EMERGENCY = 2
PUSHSAFER_PRIORITIES = (
PushSaferPriority.LOW,
PushSaferPriority.MODERATE,
PushSaferPriority.NORMAL,
PushSaferPriority.HIGH,
PushSaferPriority.EMERGENCY,
)
PUSHSAFER_PRIORITY_MAP = {
# short for 'low'
'low': PushSaferPriority.LOW,
# short for 'medium'
'medium': PushSaferPriority.MODERATE,
# short for 'normal'
'normal': PushSaferPriority.NORMAL,
# short for 'high'
'high': PushSaferPriority.HIGH,
# short for 'emergency'
'emergency': PushSaferPriority.EMERGENCY,
}
# Identify the priority ou want to designate as the fall back
DEFAULT_PRIORITY = "normal"
# Vibrations
class PushSaferVibration(object):
"""
Defines the acceptable vibration settings for notification
"""
# x1
LOW = 1
# x2
NORMAL = 2
# x3
HIGH = 3
# Identify all of the vibrations in one place
PUSHSAFER_VIBRATIONS = (
PushSaferVibration.LOW,
PushSaferVibration.NORMAL,
PushSaferVibration.HIGH,
)
# At this time, the following pictures can be attached to each notification
# at one time. When more are supported, just add their argument below
PICTURE_PARAMETER = (
'p',
'p2',
'p3',
)
# Flag used as a placeholder to sending to all devices
PUSHSAFER_SEND_TO_ALL = 'a'
class NotifyPushSafer(NotifyBase):
"""
A wrapper for PushSafer Notifications
"""
# The default descriptive name associated with the Notification
service_name = 'Pushsafer'
# The services URL
service_url = 'https://www.pushsafer.com/'
# The default insecure protocol
protocol = 'psafer'
# The default secure protocol
secure_protocol = 'psafers'
# Number of requests to a allow per second
request_rate_per_sec = 1.2
# The icon ID of 25 looks like a megaphone
default_pushsafer_icon = 25
# A URL that takes you to the setup/help of the specific protocol
setup_url = 'https://github.com/caronc/apprise/wiki/Notify_pushsafer'
# Defines the hostname to post content to; since this service supports
# both insecure and secure methods, we set the {schema} just before we
# post the message upstream.
notify_url = '{schema}://www.pushsafer.com/api'
# Define object templates
templates = (
'{schema}://{privatekey}',
'{schema}://{privatekey}/{targets}',
)
# Define our template tokens
template_tokens = dict(NotifyBase.template_tokens, **{
'privatekey': {
'name': _('Private Key'),
'type': 'string',
'private': True,
'required': True,
},
'target_device': {
'name': _('Target Device'),
'type': 'string',
'map_to': 'targets',
},
'target_email': {
'name': _('Target Email'),
'type': 'string',
'map_to': 'targets',
},
'targets': {
'name': _('Targets'),
'type': 'list:string',
},
})
# Define our template arguments
template_args = dict(NotifyBase.template_args, **{
'priority': {
'name': _('Priority'),
'type': 'choice:int',
'values': PUSHSAFER_PRIORITIES,
},
'sound': {
'name': _('Sound'),
'type': 'choice:string',
'values': PUSHSAFER_SOUND_MAP,
},
'vibration': {
'name': _('Vibration'),
'type': 'choice:int',
'values': PUSHSAFER_VIBRATIONS,
},
'to': {
'alias_of': 'targets',
},
})
def __init__(self, privatekey, targets=None, priority=None, sound=None,
vibration=None, **kwargs):
"""
Initialize PushSafer Object
"""
super(NotifyPushSafer, self).__init__(**kwargs)
#
# Priority
#
try:
# Acquire our priority if we can:
# - We accept both the integer form as well as a string
# representation
self.priority = int(priority)
except TypeError:
# NoneType means use Default; this is an okay exception
self.priority = None
except ValueError:
# Input is a string; attempt to get the lookup from our
# priority mapping
priority = priority.lower().strip()
# This little bit of black magic allows us to match against
# low, lo, l (for low);
# normal, norma, norm, nor, no, n (for normal)
# ... etc
match = next((key for key in PUSHSAFER_PRIORITY_MAP.keys()
if key.startswith(priority)), None) \
if priority else None
# Now test to see if we got a match
if not match:
msg = 'An invalid PushSafer priority ' \
'({}) was specified.'.format(priority)
self.logger.warning(msg)
raise TypeError(msg)
# store our successfully looked up priority
self.priority = PUSHSAFER_PRIORITY_MAP[match]
if self.priority is not None and \
self.priority not in PUSHSAFER_PRIORITY_MAP.values():
msg = 'An invalid PushSafer priority ' \
'({}) was specified.'.format(priority)
self.logger.warning(msg)
raise TypeError(msg)
#
# Sound
#
try:
# Acquire our sound if we can:
# - We accept both the integer form as well as a string
# representation
self.sound = int(sound)
except TypeError:
# NoneType means use Default; this is an okay exception
self.sound = None
except ValueError:
# Input is a string; attempt to get the lookup from our
# sound mapping
sound = sound.lower().strip()
# This little bit of black magic allows us to match against
# against multiple versions of the same string
# ... etc
match = next((key for key in PUSHSAFER_SOUND_MAP.keys()
if key.startswith(sound)), None) \
if sound else None
# Now test to see if we got a match
if not match:
msg = 'An invalid PushSafer sound ' \
'({}) was specified.'.format(sound)
self.logger.warning(msg)
raise TypeError(msg)
# store our successfully looked up sound
self.sound = PUSHSAFER_SOUND_MAP[match]
if self.sound is not None and \
self.sound not in PUSHSAFER_SOUND_MAP.values():
msg = 'An invalid PushSafer sound ' \
'({}) was specified.'.format(sound)
self.logger.warning(msg)
raise TypeError(msg)
#
# Vibration
#
try:
# Use defined integer as is if defined, no further error checking
# is performed
self.vibration = int(vibration)
except TypeError:
# NoneType means use Default; this is an okay exception
self.vibration = None
except ValueError:
msg = 'An invalid PushSafer vibration ' \
'({}) was specified.'.format(vibration)
self.logger.warning(msg)
raise TypeError(msg)
if self.vibration and self.vibration not in PUSHSAFER_VIBRATIONS:
msg = 'An invalid PushSafer vibration ' \
'({}) was specified.'.format(vibration)
self.logger.warning(msg)
raise TypeError(msg)
#
# Private Key (associated with project)
#
self.privatekey = validate_regex(privatekey)
if not self.privatekey:
msg = 'An invalid PushSafer Private Key ' \
'({}) was specified.'.format(privatekey)
self.logger.warning(msg)
raise TypeError(msg)
self.targets = parse_list(targets)
if len(self.targets) == 0:
self.targets = (PUSHSAFER_SEND_TO_ALL, )
return
def send(self, body, title='', notify_type=NotifyType.INFO, attach=None,
**kwargs):
"""
Perform PushSafer Notification
"""
# error tracking (used for function return)
has_error = False
# Initialize our list of attachments
attachments = []
if attach:
# We need to upload our payload first so that we can source it
# in remaining messages
for attachment in attach:
# prepare payload
if not attachment:
# We could not access the attachment
self.logger.error(
'Could not access attachment {}.'.format(
attachment.url(privacy=True)))
return False
if not attachment.mimetype.startswith('image/'):
# Attachment not supported; continue peacefully
self.logger.debug(
'Ignoring unsupported PushSafer attachment {}.'.format(
attachment.url(privacy=True)))
continue
self.logger.debug(
'Posting PushSafer attachment {}'.format(
attachment.url(privacy=True)))
try:
with io.open(attachment.path, 'rb') as f:
# Output must be in a DataURL format (that's what
# PushSafer calls it):
attachment = (
attachment.name,
'data:{};base64,{}'.format(
attachment.mimetype,
base64.b64encode(f.read())))
except (OSError, IOError) as e:
self.logger.warning(
'An I/O error occured while reading {}.'.format(
attachment.name if attachment else 'attachment'))
self.logger.debug('I/O Exception: %s' % str(e))
return False
# Save our pre-prepared payload for attachment posting
attachments.append(attachment)
# Create a copy of the targets list
targets = list(self.targets)
while len(targets):
recipient = targets.pop(0)
# prepare payload
payload = {
't': title,
'm': body,
# Our default icon to use
'i': self.default_pushsafer_icon,
# Notification Color
'c': self.color(notify_type),
# Target Recipient
'd': recipient,
}
if self.sound is not None:
# Only apply sound setting if it was specified
payload['s'] = str(self.sound)
if self.vibration is not None:
# Only apply vibration setting
payload['v'] = str(self.vibration)
if not attachments:
okay, response = self._send(payload)
if not okay:
has_error = True
continue
self.logger.info(
'Sent PushSafer notification to "%s".' % (recipient))
else:
# Create a copy of our payload object
_payload = payload.copy()
for idx in range(
0, len(attachments), len(PICTURE_PARAMETER)):
# Send our attachments to our same user (already prepared
# as our payload object)
for c, attachment in enumerate(
attachments[idx:idx + len(PICTURE_PARAMETER)]):
# Get our attachment information
filename, dataurl = attachment
_payload.update({PICTURE_PARAMETER[c]: dataurl})
self.logger.debug(
'Added attachment (%s) to "%s".' % (
filename, recipient))
okay, response = self._send(_payload)
if not okay:
has_error = True
continue
self.logger.info(
'Sent PushSafer attachment (%s) to "%s".' % (
filename, recipient))
# More then the maximum messages shouldn't cause all of
# the text to loop on future iterations
_payload = payload.copy()
_payload['t'] = ''
_payload['m'] = '...'
return not has_error
def _send(self, payload, **kwargs):
"""
Wrapper to the requests (post) object
"""
headers = {
'User-Agent': self.app_id,
}
# Prepare the notification URL to post to
notify_url = self.notify_url.format(
schema='https' if self.secure else 'http'
)
# Store the payload key
payload['k'] = self.privatekey
self.logger.debug('PushSafer POST URL: %s (cert_verify=%r)' % (
notify_url, self.verify_certificate,
))
self.logger.debug('PushSafer Payload: %s' % str(payload))
# Always call throttle before any remote server i/o is made
self.throttle()
# Default response type
response = None
# Initialize our Pushsafer expected responses
_code = None
_str = 'Unknown'
try:
# Open our attachment path if required:
r = requests.post(
notify_url,
data=payload,
headers=headers,
verify=self.verify_certificate,
)
try:
response = loads(r.content)
_code = response.get('status')
_str = response.get('success', _str) \
if _code == 1 else response.get('error', _str)
except (AttributeError, TypeError, ValueError):
# ValueError = r.content is Unparsable
# TypeError = r.content is None
# AttributeError = r is None
# Fall back to the existing unparsed value
response = r.content
if r.status_code not in (
requests.codes.ok, requests.codes.no_content):
# We had a problem
status_str = \
NotifyPushSafer.http_response_code_lookup(
r.status_code)
self.logger.warning(
'Failed to deliver payload to PushSafer:'
'{}{}error={}.'.format(
status_str,
', ' if status_str else '',
r.status_code))
self.logger.debug(
'Response Details:\r\n{}'.format(r.content))
return False, response
elif _code != 1:
# It's a bit backwards, but:
# 1 is returned if we succeed
# 0 is returned if we fail
self.logger.warning(
'Failed to deliver payload to PushSafer;'
' error={}.'.format(_str))
self.logger.debug(
'Response Details:\r\n{}'.format(r.content))
return False, response
# otherwise we were successful
return True, response
except requests.RequestException as e:
self.logger.warning(
'A Connection error occured communicating with PushSafer.')
self.logger.debug('Socket Exception: %s' % str(e))
return False, response
def url(self, privacy=False, *args, **kwargs):
"""
Returns the URL built dynamically based on specified arguments.
"""
# Define any arguments set
args = {
'format': self.notify_format,
'overflow': self.overflow_mode,
'verify': 'yes' if self.verify_certificate else 'no',
}
if self.priority is not None:
# Store our priority; but only if it was specified
args['priority'] = \
next((key for key, value in PUSHSAFER_PRIORITY_MAP.items()
if value == self.priority),
DEFAULT_PRIORITY) # pragma: no cover
if self.sound is not None:
# Store our sound; but only if it was specified
args['sound'] = \
next((key for key, value in PUSHSAFER_SOUND_MAP.items()
if value == self.sound), '') # pragma: no cover
if self.vibration is not None:
# Store our vibration; but only if it was specified
args['vibration'] = str(self.vibration)
targets = '/'.join([NotifyPushSafer.quote(x) for x in self.targets])
if targets == PUSHSAFER_SEND_TO_ALL:
# keyword is reserved for internal usage only; it's safe to remove
# it from the recipients list
targets = ''
return '{schema}://{privatekey}/{targets}?{args}'.format(
schema=self.secure_protocol if self.secure else self.protocol,
privatekey=self.pprint(self.privatekey, privacy, safe=''),
targets=targets,
args=NotifyPushSafer.urlencode(args))
@staticmethod
def parse_url(url):
"""
Parses the URL and returns enough arguments that can allow
us to substantiate this object.
"""
results = NotifyBase.parse_url(url)
if not results:
# We're done early as we couldn't load the results
return results
# Fetch our targets
results['targets'] = \
NotifyPushSafer.split_path(results['fullpath'])
# The 'to' makes it easier to use yaml configuration
if 'to' in results['qsd'] and len(results['qsd']['to']):
results['targets'] += \
NotifyPushSafer.parse_list(results['qsd']['to'])
# Setup the token; we store it in Private Key for global
# plugin consistency with naming conventions
results['privatekey'] = NotifyPushSafer.unquote(results['host'])
if 'priority' in results['qsd'] and len(results['qsd']['priority']):
results['priority'] = \
NotifyPushSafer.unquote(results['qsd']['priority'])
if 'sound' in results['qsd'] and len(results['qsd']['sound']):
results['sound'] = \
NotifyPushSafer.unquote(results['qsd']['sound'])
if 'vibration' in results['qsd'] and len(results['qsd']['vibration']):
results['vibration'] = \
NotifyPushSafer.unquote(results['qsd']['vibration'])
return results

View File

@ -68,7 +68,7 @@ class NotifyPushed(NotifyBase):
title_maxlen = 0
# The maximum allowable characters allowed in the body per message
body_maxlen = 140
body_maxlen = 160
# Define object templates
templates = (

View File

@ -32,6 +32,7 @@ from ..common import NotifyType
from ..utils import parse_list
from ..utils import validate_regex
from ..AppriseLocale import gettext_lazy as _
from ..attachment.AttachBase import AttachBase
# Flag used as a placeholder to sending to all devices
PUSHOVER_SEND_TO_ALL = 'ALL_DEVICES'
@ -140,6 +141,14 @@ class NotifyPushover(NotifyBase):
# Default Pushover sound
default_pushover_sound = PushoverSound.PUSHOVER
# 2.5MB is the maximum supported image filesize as per documentation
# here: https://pushover.net/api#attachments (Dec 26th, 2019)
attach_max_size_bytes = 2621440
# The regular expression of the current attachment supported mime types
# At this time it is only images
attach_supported_mime_type = r'^image/.*'
# Define object templates
templates = (
'{schema}://{user_key}@{token}',
@ -281,17 +290,12 @@ class NotifyPushover(NotifyBase):
raise TypeError(msg)
return
def send(self, body, title='', notify_type=NotifyType.INFO, **kwargs):
def send(self, body, title='', notify_type=NotifyType.INFO, attach=None,
**kwargs):
"""
Perform Pushover Notification
"""
headers = {
'User-Agent': self.app_id,
'Content-Type': 'application/x-www-form-urlencoded'
}
auth = (self.token, '')
# error tracking (used for function return)
has_error = False
@ -314,7 +318,7 @@ class NotifyPushover(NotifyBase):
'token': self.token,
'user': self.user_key,
'priority': str(self.priority),
'title': title,
'title': title if title else self.app_desc,
'message': body,
'device': device,
'sound': self.sound,
@ -323,60 +327,162 @@ class NotifyPushover(NotifyBase):
if self.priority == PushoverPriority.EMERGENCY:
payload.update({'retry': self.retry, 'expire': self.expire})
self.logger.debug('Pushover POST URL: %s (cert_verify=%r)' % (
self.notify_url, self.verify_certificate,
))
self.logger.debug('Pushover Payload: %s' % str(payload))
if attach:
# Create a copy of our payload
_payload = payload.copy()
# Always call throttle before any remote server i/o is made
self.throttle()
# Send with attachments
for attachment in attach:
# Simple send
if not self._send(_payload, attachment):
# Mark our failure
has_error = True
# clean exit from our attachment loop
break
try:
r = requests.post(
self.notify_url,
data=payload,
headers=headers,
auth=auth,
verify=self.verify_certificate,
)
if r.status_code != requests.codes.ok:
# We had a problem
status_str = \
NotifyPushover.http_response_code_lookup(
r.status_code, PUSHOVER_HTTP_ERROR_MAP)
self.logger.warning(
'Failed to send Pushover notification to {}: '
'{}{}error={}.'.format(
device,
status_str,
', ' if status_str else '',
r.status_code))
self.logger.debug(
'Response Details:\r\n{}'.format(r.content))
# To handle multiple attachments, clean up our message
_payload['title'] = '...'
_payload['message'] = attachment.name
# No need to alarm for each consecutive attachment uploaded
# afterwards
_payload['sound'] = PushoverSound.NONE
else:
# Simple send
if not self._send(payload):
# Mark our failure
has_error = True
continue
else:
self.logger.info(
'Sent Pushover notification to %s.' % device)
except requests.RequestException as e:
self.logger.warning(
'A Connection error occured sending Pushover:%s ' % (
device) + 'notification.'
)
self.logger.debug('Socket Exception: %s' % str(e))
# Mark our failure
has_error = True
continue
return not has_error
def _send(self, payload, attach=None):
"""
Wrapper to the requests (post) object
"""
if isinstance(attach, AttachBase):
# Perform some simple error checking
if not attach:
# We could not access the attachment
self.logger.error(
'Could not access attachment {}.'.format(
attach.url(privacy=True)))
return False
# Perform some basic checks as we want to gracefully skip
# over unsupported mime types.
if not re.match(
self.attach_supported_mime_type,
attach.mimetype,
re.I):
# No problem; we just don't support this attachment
# type; gracefully move along
self.logger.debug(
'Ignored unsupported Pushover attachment ({}): {}'
.format(
attach.mimetype,
attach.url(privacy=True)))
return True
# If we get here, we're dealing with a supported image.
# Verify that the filesize is okay though.
file_size = len(attach)
if not (file_size > 0
and file_size <= self.attach_max_size_bytes):
# File size is no good
self.logger.warning(
'Pushover attachment size ({}B) exceeds limit: {}'
.format(file_size, attach.url(privacy=True)))
return False
self.logger.debug(
'Posting Pushover attachment {}'.format(
attach.url(privacy=True)))
# Default Header
headers = {
'User-Agent': self.app_id,
}
# Authentication
auth = (self.token, '')
# Some default values for our request object to which we'll update
# depending on what our payload is
files = None
self.logger.debug('Pushover POST URL: %s (cert_verify=%r)' % (
self.notify_url, self.verify_certificate,
))
self.logger.debug('Pushover Payload: %s' % str(payload))
# Always call throttle before any remote server i/o is made
self.throttle()
try:
# Open our attachment path if required:
if attach:
files = {'attachment': (attach.name, open(attach.path, 'rb'))}
r = requests.post(
self.notify_url,
data=payload,
headers=headers,
files=files,
auth=auth,
verify=self.verify_certificate,
)
if r.status_code != requests.codes.ok:
# We had a problem
status_str = \
NotifyPushover.http_response_code_lookup(
r.status_code, PUSHOVER_HTTP_ERROR_MAP)
self.logger.warning(
'Failed to send Pushover notification to {}: '
'{}{}error={}.'.format(
payload['device'],
status_str,
', ' if status_str else '',
r.status_code))
self.logger.debug(
'Response Details:\r\n{}'.format(r.content))
return False
else:
self.logger.info(
'Sent Pushover notification to %s.' % payload['device'])
except requests.RequestException as e:
self.logger.warning(
'A Connection error occured sending Pushover:%s ' % (
payload['device']) + 'notification.'
)
self.logger.debug('Socket Exception: %s' % str(e))
return False
except (OSError, IOError) as e:
self.logger.warning(
'An I/O error occured while reading {}.'.format(
attach.name if attach else 'attachment'))
self.logger.debug('I/O Exception: %s' % str(e))
return False
finally:
# Close our file (if it's open) stored in the second element
# of our files tuple (index 1)
if files:
files['attachment'][1].close()
return True
def url(self, privacy=False, *args, **kwargs):
"""
Returns the URL built dynamically based on specified arguments.

View File

@ -89,7 +89,7 @@ class NotifySNS(NotifyBase):
# The maximum length of the body
# Source: https://docs.aws.amazon.com/sns/latest/api/API_Publish.html
body_maxlen = 140
body_maxlen = 160
# A title can not be used for SMS Messages. Setting this to zero will
# cause any title (if defined) to get placed into the message body.

View File

@ -0,0 +1,476 @@
# -*- coding: utf-8 -*-
#
# Copyright (C) 2019 Chris Caron <lead2gold@gmail.com>
# All rights reserved.
#
# This code is licensed under the MIT License.
#
# Permission is hereby granted, free of charge, to any person obtaining a copy
# of this software and associated documentation files(the "Software"), to deal
# in the Software without restriction, including without limitation the rights
# to use, copy, modify, merge, publish, distribute, sublicense, and / or sell
# copies of the Software, and to permit persons to whom the Software is
# furnished to do so, subject to the following conditions :
#
# The above copyright notice and this permission notice shall be included in
# all copies or substantial portions of the Software.
#
# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT.IN NO EVENT SHALL THE
# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
# THE SOFTWARE.
# To use this service you will need a Sinch account to which you can get your
# API_TOKEN and SERVICE_PLAN_ID right from your console/dashboard at:
# https://dashboard.sinch.com/sms/overview
#
# You will also need to send the SMS From a phone number or account id name.
# This is identified as the source (or where the SMS message will originate
# from). Activated phone numbers can be found on your dashboard here:
# - https://dashboard.sinch.com/numbers/your-numbers/numbers
#
import re
import six
import requests
import json
from .NotifyBase import NotifyBase
from ..URLBase import PrivacyMode
from ..common import NotifyType
from ..utils import parse_list
from ..utils import validate_regex
from ..AppriseLocale import gettext_lazy as _
# Some Phone Number Detection
IS_PHONE_NO = re.compile(r'^\+?(?P<phone>[0-9\s)(+-]+)\s*$')
class SinchRegion(object):
"""
Defines the Sinch Server Regions
"""
USA = 'us'
EUROPE = 'eu'
# Used for verification purposes
SINCH_REGIONS = (SinchRegion.USA, SinchRegion.EUROPE)
class NotifySinch(NotifyBase):
"""
A wrapper for Sinch Notifications
"""
# The default descriptive name associated with the Notification
service_name = 'Sinch'
# The services URL
service_url = 'https://sinch.com/'
# All notification requests are secure
secure_protocol = 'sinch'
# Allow 300 requests per minute.
# 60/300 = 0.2
request_rate_per_sec = 0.20
# the number of seconds undelivered messages should linger for
# in the Sinch queue
validity_period = 14400
# A URL that takes you to the setup/help of the specific protocol
setup_url = 'https://github.com/caronc/apprise/wiki/Notify_sinch'
# Sinch uses the http protocol with JSON requests
# - the 'spi' gets substituted with the Service Provider ID
# provided as part of the Apprise URL.
notify_url = 'https://{region}.sms.api.sinch.com/xms/v1/{spi}/batches'
# The maximum length of the body
body_maxlen = 160
# A title can not be used for SMS Messages. Setting this to zero will
# cause any title (if defined) to get placed into the message body.
title_maxlen = 0
# Define object templates
templates = (
'{schema}://{service_plan_id}:{api_token}@{from_phone}',
'{schema}://{service_plan_id}:{api_token}@{from_phone}/{targets}',
)
# Define our template tokens
template_tokens = dict(NotifyBase.template_tokens, **{
'service_plan_id': {
'name': _('Account SID'),
'type': 'string',
'private': True,
'required': True,
'regex': (r'^[a-f0-9]+$', 'i'),
},
'api_token': {
'name': _('Auth Token'),
'type': 'string',
'private': True,
'required': True,
'regex': (r'^[a-f0-9]+$', 'i'),
},
'from_phone': {
'name': _('From Phone No'),
'type': 'string',
'required': True,
'regex': (r'^\+?[0-9\s)(+-]+$', 'i'),
'map_to': 'source',
},
'target_phone': {
'name': _('Target Phone No'),
'type': 'string',
'prefix': '+',
'regex': (r'^[0-9\s)(+-]+$', 'i'),
'map_to': 'targets',
},
'short_code': {
'name': _('Target Short Code'),
'type': 'string',
'regex': (r'^[0-9]{5,6}$', 'i'),
'map_to': 'targets',
},
'targets': {
'name': _('Targets'),
'type': 'list:string',
},
})
# Define our template arguments
template_args = dict(NotifyBase.template_args, **{
'to': {
'alias_of': 'targets',
},
'from': {
'alias_of': 'from_phone',
},
'spi': {
'alias_of': 'service_plan_id',
},
'region': {
'name': _('Region'),
'type': 'string',
'regex': (r'^[a-z]{2}$', 'i'),
'default': SinchRegion.USA,
},
'token': {
'alias_of': 'api_token',
},
})
def __init__(self, service_plan_id, api_token, source, targets=None,
region=None, **kwargs):
"""
Initialize Sinch Object
"""
super(NotifySinch, self).__init__(**kwargs)
# The Account SID associated with the account
self.service_plan_id = validate_regex(
service_plan_id, *self.template_tokens['service_plan_id']['regex'])
if not self.service_plan_id:
msg = 'An invalid Sinch Account SID ' \
'({}) was specified.'.format(service_plan_id)
self.logger.warning(msg)
raise TypeError(msg)
# The Authentication Token associated with the account
self.api_token = validate_regex(
api_token, *self.template_tokens['api_token']['regex'])
if not self.api_token:
msg = 'An invalid Sinch Authentication Token ' \
'({}) was specified.'.format(api_token)
self.logger.warning(msg)
raise TypeError(msg)
# The Source Phone # and/or short-code
self.source = source
if not IS_PHONE_NO.match(self.source):
msg = 'The Account (From) Phone # or Short-code specified ' \
'({}) is invalid.'.format(source)
self.logger.warning(msg)
raise TypeError(msg)
# Setup our region
self.region = self.template_args['region']['default'] \
if not isinstance(region, six.string_types) else region.lower()
if self.region and self.region not in SINCH_REGIONS:
msg = 'The region specified ({}) is invalid.'.format(region)
self.logger.warning(msg)
raise TypeError(msg)
# Tidy source
self.source = re.sub(r'[^\d]+', '', self.source)
if len(self.source) < 11 or len(self.source) > 14:
# A short code is a special 5 or 6 digit telephone number
# that's shorter than a full phone number.
if len(self.source) not in (5, 6):
msg = 'The Account (From) Phone # specified ' \
'({}) is invalid.'.format(source)
self.logger.warning(msg)
raise TypeError(msg)
# else... it as a short code so we're okay
else:
# We're dealing with a phone number; so we need to just
# place a plus symbol at the end of it
self.source = '+{}'.format(self.source)
# Parse our targets
self.targets = list()
for target in parse_list(targets):
# Validate targets and drop bad ones:
result = IS_PHONE_NO.match(target)
if result:
# Further check our phone # for it's digit count
# if it's less than 10, then we can assume it's
# a poorly specified phone no and spit a warning
result = ''.join(re.findall(r'\d+', result.group('phone')))
if len(result) < 11 or len(result) > 14:
self.logger.warning(
'Dropped invalid phone # '
'({}) specified.'.format(target),
)
continue
# store valid phone number
self.targets.append('+{}'.format(result))
continue
self.logger.warning(
'Dropped invalid phone # '
'({}) specified.'.format(target),
)
if not self.targets:
if len(self.source) in (5, 6):
# raise a warning since we're a short-code. We need
# a number to message
msg = 'There are no valid Sinch targets to notify.'
self.logger.warning(msg)
raise TypeError(msg)
return
def send(self, body, title='', notify_type=NotifyType.INFO, **kwargs):
"""
Perform Sinch Notification
"""
# error tracking (used for function return)
has_error = False
# Prepare our headers
headers = {
'User-Agent': self.app_id,
'Authorization': 'Bearer {}'.format(self.api_token),
'Content-Type': 'application/json',
}
# Prepare our payload
payload = {
'body': body,
'from': self.source,
# The To gets populated in the loop below
'to': None,
}
# Prepare our Sinch URL (spi = Service Provider ID)
url = self.notify_url.format(
region=self.region, spi=self.service_plan_id)
# Create a copy of the targets list
targets = list(self.targets)
if len(targets) == 0:
# No sources specified, use our own phone no
targets.append(self.source)
while len(targets):
# Get our target to notify
target = targets.pop(0)
# Prepare our user
payload['to'] = [target]
# Some Debug Logging
self.logger.debug('Sinch POST URL: {} (cert_verify={})'.format(
url, self.verify_certificate))
self.logger.debug('Sinch Payload: {}' .format(payload))
# Always call throttle before any remote server i/o is made
self.throttle()
try:
r = requests.post(
url,
data=json.dumps(payload),
headers=headers,
verify=self.verify_certificate,
)
# The responsne might look like:
# {
# "id": "CJloRJOe3MtDITqx",
# "to": ["15551112222"],
# "from": "15553334444",
# "canceled": false,
# "body": "This is a test message from your Sinch account",
# "type": "mt_text",
# "created_at": "2020-01-14T01:05:20.694Z",
# "modified_at": "2020-01-14T01:05:20.694Z",
# "delivery_report": "none",
# "expire_at": "2020-01-17T01:05:20.694Z",
# "flash_message": false
# }
if r.status_code not in (
requests.codes.created, requests.codes.ok):
# We had a problem
status_str = \
NotifyBase.http_response_code_lookup(r.status_code)
# set up our status code to use
status_code = r.status_code
try:
# Update our status response if we can
json_response = json.loads(r.content)
status_code = json_response.get('code', status_code)
status_str = json_response.get('message', status_str)
except (AttributeError, TypeError, ValueError):
# ValueError = r.content is Unparsable
# TypeError = r.content is None
# AttributeError = r is None
# We could not parse JSON response.
# We will just use the status we already have.
pass
self.logger.warning(
'Failed to send Sinch notification to {}: '
'{}{}error={}.'.format(
target,
status_str,
', ' if status_str else '',
status_code))
self.logger.debug(
'Response Details:\r\n{}'.format(r.content))
# Mark our failure
has_error = True
continue
else:
self.logger.info(
'Sent Sinch notification to {}.'.format(target))
except requests.RequestException as e:
self.logger.warning(
'A Connection error occured sending Sinch:%s ' % (
target) + 'notification.'
)
self.logger.debug('Socket Exception: %s' % str(e))
# Mark our failure
has_error = True
continue
return not has_error
def url(self, privacy=False, *args, **kwargs):
"""
Returns the URL built dynamically based on specified arguments.
"""
# Define any arguments set
args = {
'format': self.notify_format,
'overflow': self.overflow_mode,
'verify': 'yes' if self.verify_certificate else 'no',
'region': self.region,
}
return '{schema}://{spi}:{token}@{source}/{targets}/?{args}'.format(
schema=self.secure_protocol,
spi=self.pprint(
self.service_plan_id, privacy, mode=PrivacyMode.Tail, safe=''),
token=self.pprint(self.api_token, privacy, safe=''),
source=NotifySinch.quote(self.source, safe=''),
targets='/'.join(
[NotifySinch.quote(x, safe='') for x in self.targets]),
args=NotifySinch.urlencode(args))
@staticmethod
def parse_url(url):
"""
Parses the URL and returns enough arguments that can allow
us to substantiate this object.
"""
results = NotifyBase.parse_url(url, verify_host=False)
if not results:
# We're done early as we couldn't load the results
return results
# Get our entries; split_path() looks after unquoting content for us
# by default
results['targets'] = NotifySinch.split_path(results['fullpath'])
# The hostname is our source number
results['source'] = NotifySinch.unquote(results['host'])
# Get our service_plan_ide and api_token from the user/pass config
results['service_plan_id'] = NotifySinch.unquote(results['user'])
results['api_token'] = NotifySinch.unquote(results['password'])
# Auth Token
if 'token' in results['qsd'] and len(results['qsd']['token']):
# Extract the account spi from an argument
results['api_token'] = \
NotifySinch.unquote(results['qsd']['token'])
# Account SID
if 'spi' in results['qsd'] and len(results['qsd']['spi']):
# Extract the account spi from an argument
results['service_plan_id'] = \
NotifySinch.unquote(results['qsd']['spi'])
# Support the 'from' and 'source' variable so that we can support
# targets this way too.
# The 'from' makes it easier to use yaml configuration
if 'from' in results['qsd'] and len(results['qsd']['from']):
results['source'] = \
NotifySinch.unquote(results['qsd']['from'])
if 'source' in results['qsd'] and len(results['qsd']['source']):
results['source'] = \
NotifySinch.unquote(results['qsd']['source'])
# Allow one to define a region
if 'region' in results['qsd'] and len(results['qsd']['region']):
results['region'] = \
NotifySinch.unquote(results['qsd']['region'])
# Support the 'to' variable so that we can support targets this way too
# The 'to' makes it easier to use yaml configuration
if 'to' in results['qsd'] and len(results['qsd']['to']):
results['targets'] += \
NotifySinch.parse_list(results['qsd']['to'])
return results

View File

@ -435,8 +435,18 @@ class NotifySlack(NotifyBase):
if attach and self.mode is SlackMode.BOT and attach_channel_list:
# Send our attachments (can only be done in bot mode)
for attachment in attach:
self.logger.info(
'Posting Slack Attachment {}'.format(attachment.name))
# Perform some simple error checking
if not attachment:
# We could not access the attachment
self.logger.error(
'Could not access attachment {}.'.format(
attachment.url(privacy=True)))
return False
self.logger.debug(
'Posting Slack attachment {}'.format(
attachment.url(privacy=True)))
# Prepare API Upload Payload
_payload = {
@ -515,25 +525,29 @@ class NotifySlack(NotifyBase):
'Response Details:\r\n{}'.format(r.content))
return False
try:
response = loads(r.content)
elif attach:
# Attachment posts return a JSON string
try:
response = loads(r.content)
except (AttributeError, TypeError, ValueError):
# ValueError = r.content is Unparsable
# TypeError = r.content is None
# AttributeError = r is None
pass
except (AttributeError, TypeError, ValueError):
# ValueError = r.content is Unparsable
# TypeError = r.content is None
# AttributeError = r is None
pass
if not (response and response.get('ok', True)):
# Bare minimum requirements not met
self.logger.warning(
'Failed to send {}to Slack: error={}.'.format(
attach.name if attach else '',
r.status_code))
if not (response and response.get('ok', True)):
# Bare minimum requirements not met
self.logger.warning(
'Failed to send {}to Slack: error={}.'.format(
attach.name if attach else '',
r.status_code))
self.logger.debug(
'Response Details:\r\n{}'.format(r.content))
return False
self.logger.debug(
'Response Details:\r\n{}'.format(r.content))
return False
else:
response = r.content
# Message Post Response looks like this:
# {

View File

@ -267,15 +267,22 @@ class NotifyTelegram(NotifyBase):
path = None
if isinstance(attach, AttachBase):
if not attach:
# We could not access the attachment
self.logger.error(
'Could not access attachment {}.'.format(
attach.url(privacy=True)))
return False
self.logger.debug(
'Posting Telegram attachment {}'.format(
attach.url(privacy=True)))
# Store our path to our file
path = attach.path
file_name = attach.name
mimetype = attach.mimetype
if not path:
# Could not load attachment
return False
# Process our attachment
function_name, key = \
next(((x['function_name'], x['key']) for x in self.mime_lookup
@ -639,10 +646,10 @@ class NotifyTelegram(NotifyBase):
if attach:
# Send our attachments now (if specified and if it exists)
for attachment in attach:
sent_attachment = self.send_media(
payload['chat_id'], notify_type, attach=attachment)
if not self.send_media(
payload['chat_id'], notify_type,
attach=attachment):
if not sent_attachment:
# We failed; don't continue
has_error = True
break

View File

@ -23,7 +23,7 @@
# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
# THE SOFTWARE.
# To use this service you will need a Twillio account to which you can get your
# To use this service you will need a Twilio account to which you can get your
# AUTH_TOKEN and ACCOUNT SID right from your console/dashboard at:
# https://www.twilio.com/console
#
@ -67,7 +67,7 @@ class NotifyTwilio(NotifyBase):
# The services URL
service_url = 'https://www.twilio.com/'
# All pushover requests are secure
# All notification requests are secure
secure_protocol = 'twilio'
# Allow 300 requests per minute.
@ -86,7 +86,7 @@ class NotifyTwilio(NotifyBase):
'{sid}/Messages.json'
# The maximum length of the body
body_maxlen = 140
body_maxlen = 160
# A title can not be used for SMS Messages. Setting this to zero will
# cause any title (if defined) to get placed into the message body.

View File

@ -217,9 +217,16 @@ def _sanitize_token(tokens, default_delimiter):
and 'default' not in tokens[key] \
and 'values' in tokens[key] \
and len(tokens[key]['values']) == 1:
# If there is only one choice; then make it the default
tokens[key]['default'] = \
tokens[key]['values'][0]
# - support dictionaries too
tokens[key]['default'] = tokens[key]['values'][0] \
if not isinstance(tokens[key]['values'], dict) \
else next(iter(tokens[key]['values']))
if 'values' in tokens[key] and isinstance(tokens[key]['values'], dict):
# Convert values into a list if it was defined as a dictionary
tokens[key]['values'] = [k for k in tokens[key]['values'].keys()]
if 'regex' in tokens[key]:
# Verify that we are a tuple; convert strings to tuples

View File

@ -327,6 +327,7 @@ class OpenSubtitlesProvider(ProviderRetryMixin, _OpenSubtitlesProvider):
hash, movie_name, movie_release_name, movie_year, movie_imdb_id,
series_season, series_episode, query_parameters, filename, encoding,
movie_fps, skip_wrong_fps=self.skip_wrong_fps)
subtitle.uploader = _subtitle_item['UserNickName'] if _subtitle_item['UserNickName'] else 'anonymous'
logger.debug('Found subtitle %r by %s', subtitle, matched_by)
subtitles.append(subtitle)

View File

@ -30,11 +30,11 @@ class SubdivxSubtitle(Subtitle):
provider_name = 'subdivx'
hash_verifiable = False
def __init__(self, language, page_link, description, title):
super(SubdivxSubtitle, self).__init__(language, hearing_impaired=False,
page_link=page_link)
self.description = description.lower()
def __init__(self, language, page_link, title, description, uploader):
super(SubdivxSubtitle, self).__init__(language, hearing_impaired=False, page_link=page_link)
self.title = title
self.description = description
self.uploader = uploader
@property
def id(self):
@ -120,7 +120,7 @@ class SubdivxSubtitlesProvider(Provider):
query += ' {:4d}'.format(year)
params = {
'buscar': query, # search string
'q': query, # search string
'accion': 5, # action search
'oxdown': 1, # order by downloads descending
'pg': 1 # page 1
@ -131,7 +131,7 @@ class SubdivxSubtitlesProvider(Provider):
language = self.language_list[0]
search_link = self.server_url + 'index.php'
while True:
response = self.session.get(search_link, params=params, timeout=10)
response = self.session.get(search_link, params=params, timeout=20)
self._check_response(response)
try:
@ -142,11 +142,11 @@ class SubdivxSubtitlesProvider(Provider):
subtitles += page_subtitles
if len(page_subtitles) >= 20:
params['pg'] += 1 # search next page
time.sleep(self.multi_result_throttle)
else:
break
if len(page_subtitles) < 20:
break # this is the last page
params['pg'] += 1 # search next page
time.sleep(self.multi_result_throttle)
return subtitles
@ -204,13 +204,17 @@ class SubdivxSubtitlesProvider(Provider):
title_soup, body_soup = title_soups[subtitle], body_soups[subtitle]
# title
title = title_soup.find("a").text.replace("Subtitulo de ", "")
page_link = title_soup.find("a")["href"].replace('http://', 'https://')
title = title_soup.find("a").text.replace("Subtitulos de ", "")
page_link = title_soup.find("a")["href"]
# body
# description
description = body_soup.find("div", {'id': 'buscador_detalle_sub'}).text
description = description.replace(",", " ").lower()
subtitle = self.subtitle_class(language, page_link, description, title)
# uploader
uploader = body_soup.find("a", {'class': 'link1'}).text
subtitle = self.subtitle_class(language, page_link, title, description, uploader)
logger.debug('Found subtitle %r', subtitle)
subtitles.append(subtitle)
@ -218,7 +222,7 @@ class SubdivxSubtitlesProvider(Provider):
return subtitles
def _get_download_link(self, subtitle):
response = self.session.get(subtitle.page_link, timeout=10)
response = self.session.get(subtitle.page_link, timeout=20)
self._check_response(response)
try:
page_soup = ParserBeautifulSoup(response.content.decode('iso-8859-1', 'ignore'), ['lxml', 'html.parser'])
@ -226,12 +230,10 @@ class SubdivxSubtitlesProvider(Provider):
for link_soup in links_soup:
if link_soup['href'].startswith('bajar'):
return self.server_url + link_soup['href']
links_soup = page_soup.find_all ("a", {'class': 'link1'})
links_soup = page_soup.find_all("a", {'class': 'link1'})
for link_soup in links_soup:
if "bajar.php" in link_soup['href']:
# not using link_soup['href'] directly because it's http://
dl_link = urlparse(link_soup['href'])
return self.server_url + dl_link.path + '?' + dl_link.query
return link_soup['href']
except Exception as e:
raise APIThrottled('Error parsing download link: ' + str(e))

View File

@ -132,7 +132,7 @@ class SubsSabBzProvider(Provider):
logger.debug('No subtitles found')
return subtitles
soup = BeautifulSoup(response.content, 'html.parser')
soup = BeautifulSoup(response.content, 'lxml')
rows = soup.findAll('tr', {'class': 'subs-row'})
# Search on first 20 rows only
@ -142,9 +142,13 @@ class SubsSabBzProvider(Provider):
element = a_element_wrapper.find('a')
if element:
link = element.get('href')
element = row.find('a', href = re.compile(r'.*showuser=.*'))
uploader = element.get_text() if element else None
logger.info('Found subtitle link %r', link)
subtitles = subtitles + self.download_archive_and_add_subtitle_files(link, language, video)
sub = self.download_archive_and_add_subtitle_files(link, language, video)
for s in sub:
s.uploader = uploader
subtitles = subtitles + sub
return subtitles
def list_subtitles(self, video, languages):

View File

@ -135,17 +135,23 @@ class SubsUnacsProvider(Provider):
logger.debug('No subtitles found')
return subtitles
soup = BeautifulSoup(response.content, 'html.parser')
rows = soup.findAll('td', {'class': 'tdMovie'})
soup = BeautifulSoup(response.content, 'lxml')
rows = soup.findAll('tr', onmouseover=True)
# Search on first 20 rows only
for row in rows[:20]:
element = row.find('a', {'class': 'tooltip'})
if element:
link = element.get('href')
logger.info('Found subtitle link %r', link)
subtitles = subtitles + self.download_archive_and_add_subtitle_files('https://subsunacs.net' + link, language, video)
a_element_wrapper = row.find('td', {'class': 'tdMovie'})
if a_element_wrapper:
element = a_element_wrapper.find('a', {'class': 'tooltip'})
if element:
link = element.get('href')
element = row.find('a', href = re.compile(r'.*/search\.php\?t=1\&memid=.*'))
uploader = element.get_text() if element else None
logger.info('Found subtitle link %r', link)
sub = self.download_archive_and_add_subtitle_files('https://subsunacs.net' + link, language, video)
for s in sub:
s.uploader = uploader
subtitles = subtitles + sub
return subtitles
def list_subtitles(self, video, languages):
@ -165,11 +171,16 @@ class SubsUnacsProvider(Provider):
subtitles = []
type = 'episode' if isinstance(video, Episode) else 'movie'
for file_name in archiveStream.namelist():
if file_name.lower().endswith(('.srt', '.sub')):
if file_name.lower().endswith(('.srt', '.sub', '.txt')):
file_is_txt = True if file_name.lower().endswith('.txt') else False
if file_is_txt and re.search(r'subsunacs\.net|танете част|прочети|^read ?me|procheti', file_name, re.I):
logger.info('Ignore readme txt file %r', file_name)
continue
logger.info('Found subtitle file %r', file_name)
subtitle = SubsUnacsSubtitle(language, file_name, type, video, link)
subtitle.content = archiveStream.read(file_name)
subtitles.append(subtitle)
if file_is_txt == False or subtitle.is_valid():
subtitles.append(subtitle)
return subtitles
def download_archive_and_add_subtitle_files(self, link, language, video ):

View File

@ -0,0 +1,207 @@
# -*- coding: utf-8 -*-
import io
import logging
import os
import zipfile
from babelfish import Language
from guessit import guessit
from requests import Session
from subliminal_patch.providers import Provider
from subliminal_patch.subtitle import Subtitle
from subliminal.subtitle import fix_line_ending
from subliminal import __short_version__
from subliminal.cache import SHOW_EXPIRATION_TIME, region
from subliminal_patch.exceptions import ProviderError
from subliminal_patch.subtitle import guess_matches
from subliminal_patch.utils import sanitize
from subliminal.video import Episode, Movie
logger = logging.getLogger(__name__)
class WizdomSubtitle(Subtitle):
"""Wizdom Subtitle."""
provider_name = 'wizdom'
def __init__(self, language, hearing_impaired, page_link, series, season, episode, title, imdb_id, subtitle_id,
release):
super(WizdomSubtitle, self).__init__(language, hearing_impaired, page_link)
self.series = series
self.season = season
self.episode = episode
self.title = title
self.imdb_id = imdb_id
self.subtitle_id = subtitle_id
self.release = release
@property
def id(self):
return str(self.subtitle_id)
def get_matches(self, video):
matches = set()
# episode
if isinstance(video, Episode):
# series
if video.series and (sanitize(self.title) in (
sanitize(name) for name in [video.series] + video.alternative_series)):
matches.add('series')
# season
if video.season and self.season == video.season:
matches.add('season')
# episode
if video.episode and self.episode == video.episode:
matches.add('episode')
# imdb_id
if video.series_imdb_id and self.imdb_id == video.series_imdb_id:
matches.add('series_imdb_id')
# guess
matches |= guess_matches(video, guessit(self.release, {'type': 'episode'}), partial=True)
# movie
elif isinstance(video, Movie):
# guess
matches |= guess_matches(video, guessit(self.release, {'type': 'movie'}), partial=True)
# title
if video.title and (sanitize(self.title) in (
sanitize(name) for name in [video.title] + video.alternative_titles)):
matches.add('title')
return matches
class WizdomProvider(Provider):
"""Wizdom Provider."""
languages = {Language.fromalpha2(l) for l in ['he']}
server_url = 'wizdom.xyz'
_tmdb_api_key = 'a51ee051bcd762543373903de296e0a3'
def __init__(self):
self.session = None
def initialize(self):
self.session = Session()
self.session.headers['User-Agent'] = 'Subliminal/{}'.format(__short_version__)
def terminate(self):
self.session.close()
@region.cache_on_arguments(expiration_time=SHOW_EXPIRATION_TIME)
def _search_imdb_id(self, title, year, is_movie):
"""Search the IMDB ID for the given `title` and `year`.
:param str title: title to search for.
:param int year: year to search for (or 0 if not relevant).
:param bool is_movie: If True, IMDB ID will be searched for in TMDB instead of Wizdom.
:return: the IMDB ID for the given title and year (or None if not found).
:rtype: str
"""
# make the search
logger.info('Searching IMDB ID for %r%r', title, '' if not year else ' ({})'.format(year))
category = 'movie' if is_movie else 'tv'
title = title.replace('\'', '')
# get TMDB ID first
r = self.session.get('http://api.tmdb.org/3/search/{}?api_key={}&query={}{}&language=en'.format(
category, self._tmdb_api_key, title, '' if not year else '&year={}'.format(year)))
r.raise_for_status()
tmdb_results = r.json().get('results')
if tmdb_results:
tmdb_id = tmdb_results[0].get('id')
if tmdb_id:
# get actual IMDB ID from TMDB
r = self.session.get('http://api.tmdb.org/3/{}/{}{}?api_key={}&language=en'.format(
category, tmdb_id, '' if is_movie else '/external_ids', self._tmdb_api_key))
r.raise_for_status()
return str(r.json().get('imdb_id', '')) or None
return None
def query(self, title, season=None, episode=None, year=None, filename=None, imdb_id=None):
# search for the IMDB ID if needed.
is_movie = not (season and episode)
imdb_id = imdb_id or self._search_imdb_id(title, year, is_movie)
if not imdb_id:
return {}
# search
logger.debug('Using IMDB ID %r', imdb_id)
url = 'http://json.{}/{}.json'.format(self.server_url, imdb_id)
page_link = 'http://{}/#/{}/{}'.format(self.server_url, 'movies' if is_movie else 'series', imdb_id)
# get the list of subtitles
logger.debug('Getting the list of subtitles')
r = self.session.get(url)
r.raise_for_status()
try:
results = r.json()
except ValueError:
return {}
# filter irrelevant results
if not is_movie:
results = results.get('subs', [])
# there are two formats of result jsons - seasons list and seasons dict
if isinstance(results, list):
results = results[season] if len(results) >= season else {}
else:
results = results.get(str(season), {})
results = results.get(str(episode), [])
else:
results = results.get('subs', [])
# loop over results
subtitles = {}
for result in results:
language = Language.fromalpha2('he')
hearing_impaired = False
subtitle_id = result['id']
release = result['version']
# otherwise create it
subtitle = WizdomSubtitle(language, hearing_impaired, page_link, title, season, episode, title, imdb_id,
subtitle_id, release)
logger.debug('Found subtitle %r', subtitle)
subtitles[subtitle_id] = subtitle
return subtitles.values()
def list_subtitles(self, video, languages):
season = episode = None
year = video.year
filename = video.name
imdb_id = video.imdb_id
if isinstance(video, Episode):
titles = [video.series] + video.alternative_series
season = video.season
episode = video.episode
imdb_id = video.series_imdb_id
else:
titles = [video.title] + video.alternative_titles
for title in titles:
subtitles = [s for s in
self.query(title, season, episode, year, filename, imdb_id) if s.language in languages]
if subtitles:
return subtitles
return []
def download_subtitle(self, subtitle):
# download
url = 'http://zip.{}/{}.zip'.format(self.server_url, subtitle.subtitle_id)
r = self.session.get(url, headers={'Referer': subtitle.page_link}, timeout=10)
r.raise_for_status()
# open the zip
with zipfile.ZipFile(io.BytesIO(r.content)) as zf:
# remove some filenames from the namelist
namelist = [n for n in zf.namelist() if os.path.splitext(n)[1] in ['.srt', '.sub']]
if len(namelist) > 1:
raise ProviderError('More than one file to unzip')
subtitle.content = fix_line_ending(zf.read(namelist[0]))

View File

@ -121,7 +121,7 @@ class YavkaNetProvider(Provider):
logger.debug('No subtitles found')
return subtitles
soup = BeautifulSoup(response.content, 'html.parser')
soup = BeautifulSoup(response.content, 'lxml')
rows = soup.findAll('tr', {'class': 'info'})
# Search on first 20 rows only
@ -129,9 +129,13 @@ class YavkaNetProvider(Provider):
element = row.find('a', {'class': 'selector'})
if element:
link = element.get('href')
element = row.find('a', {'class': 'click'})
uploader = element.get_text() if element else None
logger.info('Found subtitle link %r', link)
subtitles = subtitles + self.download_archive_and_add_subtitle_files('http://yavka.net/' + link, language, video)
sub = self.download_archive_and_add_subtitle_files('http://yavka.net/' + link, language, video)
for s in sub:
s.uploader = uploader
subtitles = subtitles + sub
return subtitles
def list_subtitles(self, video, languages):

View File

@ -4,6 +4,13 @@ import io
import logging
import os
import zipfile
import re
import copy
try:
from urlparse import urljoin
except ImportError:
from urllib.parse import urljoin
import rarfile
from subzero.language import Language
@ -13,7 +20,12 @@ from six import text_type
from subliminal import __short_version__
from subliminal.providers import ParserBeautifulSoup, Provider
from subliminal.subtitle import SUBTITLE_EXTENSIONS, Subtitle, fix_line_ending, guess_matches
from subliminal.subtitle import (
SUBTITLE_EXTENSIONS,
Subtitle,
fix_line_ending,
guess_matches,
)
from subliminal.video import Episode, Movie
logger = logging.getLogger(__name__)
@ -21,43 +33,50 @@ logger = logging.getLogger(__name__)
class ZimukuSubtitle(Subtitle):
"""Zimuku Subtitle."""
provider_name = 'zimuku'
def __init__(self, language, page_link, version, download_link):
provider_name = "zimuku"
def __init__(self, language, page_link, version, session):
super(ZimukuSubtitle, self).__init__(language, page_link=page_link)
self.version = version
self.download_link = download_link
self.hearing_impaired = None
self.encoding = 'utf-8'
self.hearing_impaired = False
self.encoding = "utf-8"
self.session = session
@property
def id(self):
return self.download_link
return self.version
def get_matches(self, video):
matches = set()
# episode
if isinstance(video, Episode):
# always make year a match
info = guessit(self.version, {"type": "episode"})
info["year"] = video.year
# other properties
matches |= guess_matches(video, guessit(self.version, {'type': 'episode'}), partial=True)
matches |= guess_matches(video, info, partial=True)
# movie
elif isinstance(video, Movie):
# other properties
matches |= guess_matches(video, guessit(self.version, {'type': 'movie'}), partial=True)
matches |= guess_matches(
video, guessit(self.version, {"type": "movie"}), partial=True
)
return matches
class ZimukuProvider(Provider):
"""Zimuku Provider."""
languages = {Language(l) for l in ['zho', 'eng']}
server_url = 'http://www.zimuku.la'
search_url = '/search?q={}'
download_url = 'http://www.zimuku.la/'
UserAgent = 'Mozilla/5.0 (compatible; MSIE 10.0; Windows NT 6.1; Trident/6.0)'
languages = {Language(l) for l in ["zho", "eng"]}
server_url = "http://www.zimuku.la"
search_url = "/search?q={}"
download_url = "http://www.zimuku.la/"
UserAgent = "Mozilla/5.0 (compatible; MSIE 10.0; Windows NT 6.1; Trident/6.0)"
subtitle_class = ZimukuSubtitle
@ -66,19 +85,52 @@ class ZimukuProvider(Provider):
def initialize(self):
self.session = Session()
self.session.headers['User-Agent'] = 'Subliminal/{}'.format(__short_version__)
self.session.headers["User-Agent"] = "Subliminal/{}".format(__short_version__)
def terminate(self):
self.session.close()
def _parse_episode_page(self, link):
r = self.session.get(link)
bs_obj = ParserBeautifulSoup(
r.content.decode("utf-8", "ignore"), ["html.parser"]
)
subs_body = bs_obj.find("div", class_="subs box clearfix").find("tbody")
subs = []
for sub in subs_body.find_all("tr"):
a = sub.find("a")
name = _extract_name(a.text)
name = os.path.splitext(name)[
0
] # remove ext because it can be an archive type
language = Language("eng")
for img in sub.find("td", class_="tac lang").find_all("img"):
if (
"hongkong" in img.attrs["src"]
or "china" in img.attrs["src"]
or "jollyroger" in img.attrs["src"]
):
language = Language("zho")
break
sub_page_link = urljoin(self.server_url, a.attrs["href"])
backup_session = copy.deepcopy(self.session)
backup_session.headers["Referer"] = link
subs.append(
self.subtitle_class(language, sub_page_link, name, backup_session)
)
return subs
def query(self, keyword, season=None, episode=None, year=None):
params = keyword
if season and episode:
params += ' S{season:02d}E{episode:02d}'.format(season=season, episode=episode)
if season:
params += ".S{season:02d}".format(season=season)
elif year:
params += ' {:4d}'.format(year)
params += " {:4d}".format(year)
logger.debug('Searching subtitles %r', params)
logger.debug("Searching subtitles %r", params)
subtitles = []
search_link = self.server_url + text_type(self.search_url).format(params)
@ -86,45 +138,33 @@ class ZimukuProvider(Provider):
r.raise_for_status()
if not r.content:
logger.debug('No data returned from provider')
logger.debug("No data returned from provider")
return []
soup = ParserBeautifulSoup(r.content.decode('utf-8', 'ignore'), ['lxml', 'html.parser'])
soup = ParserBeautifulSoup(
r.content.decode("utf-8", "ignore"), ["lxml", "html.parser"]
)
for entity in soup.select('div.item.prel.clearfix a:nth-of-type(2)'):
moviename = entity.text
entity_url = self.server_url + entity['href']
logger.debug(entity_url)
r = self.session.get(entity_url, timeout=30)
r.raise_for_status()
logger.debug('looking into ' + entity_url)
# non-shooter result page
if soup.find("div", {"class": "item"}):
logger.debug("enter a non-shooter page")
for item in soup.find_all("div", {"class": "item"}):
title_a = item.find("p", class_="tt clearfix").find("a")
if season:
title = title_a.text
season_cn1 = re.search("第(.*)季", title)
if not season_cn1:
season_cn1 = ""
else:
season_cn1 = season_cn1.group(1).strip()
season_cn2 = num_to_cn(str(season))
if season_cn1 != season_cn2:
continue
episode_link = self.server_url + title_a.attrs["href"]
new_subs = self._parse_episode_page(episode_link)
subtitles += new_subs
soup = ParserBeautifulSoup(r.content.decode('utf-8', 'ignore'), ['lxml', 'html.parser']).find("div", class_="subs box clearfix")
# loop over subtitles cells
subs = soup.tbody.find_all("tr")
for sub in subs:
page_link = '%s%s' % (self.server_url, sub.a.get('href').encode('utf-8'))
version = sub.a.text.encode('utf-8') or None
if version is None:
version = ""
try:
td = sub.find("td", class_="tac lang")
r2 = td.find_all("img")
langs = [x.get('title').encode('utf-8') for x in r2]
except:
langs = '未知'
name = '%s (%s)' % (version, ",".join(langs))
if ('English' in langs) and not(('简体中文' in langs) or ('繁體中文' in langs)):
language = Language('eng')
else:
language = Language('zho')
# read the item
subtitle = self.subtitle_class(language, page_link, version, page_link.replace("detail","dld"))
logger.debug('Found subtitle %r', subtitle)
subtitles.append(subtitle)
# NOTE: shooter result pages are ignored due to the existence of assrt provider
return subtitles
@ -140,70 +180,174 @@ class ZimukuProvider(Provider):
# query for subtitles with the show_id
for title in titles:
if isinstance(video, Episode):
subtitles += [s for s in self.query(title, season=video.season, episode=video.episode,
year=video.year)
if s.language in languages]
subtitles += [
s
for s in self.query(
title,
season=video.season,
episode=video.episode,
year=video.year,
)
if s.language in languages
]
elif isinstance(video, Movie):
subtitles += [s for s in self.query(title, year=video.year)
if s.language in languages]
subtitles += [
s
for s in self.query(title, year=video.year)
if s.language in languages
]
return subtitles
def download_subtitle(self, subtitle):
if isinstance(subtitle, ZimukuSubtitle):
# download the subtitle
logger.info('Downloading subtitle %r', subtitle)
r = self.session.get(subtitle.download_link, headers={'Referer': subtitle.page_link},
timeout=30)
r.raise_for_status()
def _get_archive_dowload_link(session, sub_page_link):
r = session.get(sub_page_link)
bs_obj = ParserBeautifulSoup(
r.content.decode("utf-8", "ignore"), ["html.parser"]
)
down_page_link = bs_obj.find("a", {"id": "down1"}).attrs["href"]
down_page_link = urljoin(sub_page_link, down_page_link)
r = session.get(down_page_link)
bs_obj = ParserBeautifulSoup(
r.content.decode("utf-8", "ignore"), ["html.parser"]
)
download_link = bs_obj.find("a", {"rel": "nofollow"})
download_link = download_link.attrs["href"]
download_link = urljoin(sub_page_link, download_link)
return download_link
if not r.content:
logger.debug('Unable to download subtitle. No data returned from provider')
# download the subtitle
logger.info("Downloading subtitle %r", subtitle)
self.session = subtitle.session
download_link = _get_archive_dowload_link(self.session, subtitle.page_link)
r = self.session.get(download_link, timeout=30)
r.raise_for_status()
filename = r.headers["Content-Disposition"]
if not r.content:
logger.debug("Unable to download subtitle. No data returned from provider")
return
archive_stream = io.BytesIO(r.content)
archive = None
if rarfile.is_rarfile(archive_stream):
logger.debug("Identified rar archive")
if ".rar" not in filename:
logger.debug(
".rar should be in the downloaded file name: {}".format(filename)
)
return
soup = ParserBeautifulSoup(r.content.decode('utf-8', 'ignore'), ['lxml', 'html.parser'])
links = soup.find("div", {"class":"clearfix"}).find_all('a')
# TODO: add settings for choice
for down_link in links:
url = down_link.get('href').encode('utf-8')
url = self.server_url + url
r = self.session.get(url, headers={'Referer': subtitle.download_link},
timeout=30)
r.raise_for_status()
if len(r.content) > 1024:
archive = rarfile.RarFile(archive_stream)
subtitle_content = _get_subtitle_from_archive(archive)
elif zipfile.is_zipfile(archive_stream):
logger.debug("Identified zip archive")
if ".zip" not in filename:
logger.debug(
".zip should be in the downloaded file name: {}".format(filename)
)
return
archive = zipfile.ZipFile(archive_stream)
subtitle_content = _get_subtitle_from_archive(archive)
else:
is_sub = ""
for sub_ext in SUBTITLE_EXTENSIONS:
if sub_ext in filename:
is_sub = sub_ext
break
if not is_sub:
logger.debug(
"unknown subtitle ext int downloaded file name: {}".format(filename)
)
return
logger.debug("Identified {} file".format(is_sub))
subtitle_content = r.content
archive_stream = io.BytesIO(r.content)
archive = None
if rarfile.is_rarfile(archive_stream):
logger.debug('Identified rar archive')
archive = rarfile.RarFile(archive_stream)
subtitle_content = _get_subtitle_from_archive(archive)
elif zipfile.is_zipfile(archive_stream):
logger.debug('Identified zip archive')
archive = zipfile.ZipFile(archive_stream)
subtitle_content = _get_subtitle_from_archive(archive)
else:
subtitle_content = r.content
if subtitle_content:
subtitle.content = fix_line_ending(subtitle_content)
else:
logger.debug('Could not extract subtitle from %r', archive)
if subtitle_content:
subtitle.content = fix_line_ending(subtitle_content)
else:
logger.debug("Could not extract subtitle from %r", archive)
def _get_subtitle_from_archive(archive):
for name in archive.namelist():
extract_subname, max_score = "", -1
for subname in archive.namelist():
# discard hidden files
if os.path.split(name)[-1].startswith('.'):
if os.path.split(subname)[-1].startswith("."):
continue
# discard non-subtitle files
if not name.lower().endswith(SUBTITLE_EXTENSIONS):
if not subname.lower().endswith(SUBTITLE_EXTENSIONS):
continue
return archive.read(name)
# prefer ass/ssa subtitles with double languages or simplified chinese
score = ("ass" in subname or "ssa" in subname) * 1
if "简体" in subname or "chs" in subname or ".gb." in subname:
score += 2
if "繁体" in subname or "cht" in subname or ".big5." in subname:
pass
if "chs.eng" in subname or "chs&eng" in subname:
score += 2
if "中英" in subname or "简英" in subname or "双语" in subname or "简体&英文" in subname:
score += 4
logger.debug("subtitle {}, score: {}".format(subname, score))
if score > max_score:
max_score = score
extract_subname = subname
return None
return archive.read(extract_subname) if max_score != -1 else None
def _extract_name(name):
""" filter out Chinese characters from subtitle names """
name, suffix = os.path.splitext(name)
c_pattern = "[\u4e00-\u9fff]"
e_pattern = "[a-zA-Z]"
c_indices = [m.start(0) for m in re.finditer(c_pattern, name)]
e_indices = [m.start(0) for m in re.finditer(e_pattern, name)]
target, discard = e_indices, c_indices
if len(target) == 0:
return ""
first_target, last_target = target[0], target[-1]
first_discard = discard[0] if discard else -1
last_discard = discard[-1] if discard else -1
if last_discard < first_target:
new_name = name[first_target:]
elif last_target < first_discard:
new_name = name[:first_discard]
else:
# try to find maximum continous part
result, start, end = [0, 1], -1, 0
while end < len(name):
while end not in e_indices and end < len(name):
end += 1
if end == len(name):
break
start = end
while end not in c_indices and end < len(name):
end += 1
if end - start > result[1] - result[0]:
result = [start, end]
print(result)
start = end
end += 1
new_name = name[result[0] : result[1]]
new_name = new_name.strip() + suffix
return new_name
def num_to_cn(number):
""" convert numbers(1-99) to Chinese """
assert number.isdigit() and 1 <= int(number) <= 99
trans_map = {n: c for n, c in zip(("123456789"), ("一二三四五六七八九"))}
if len(number) == 1:
return trans_map[number]
else:
part1 = "" if number[0] == "1" else trans_map[number[0]] + ""
part2 = trans_map[number[1]] if number[1] != "0" else ""
return part1 + part2

View File

@ -54,6 +54,7 @@ class Subtitle(Subtitle_):
is_pack = False
asked_for_release_group = None
asked_for_episode = None
uploader = None # string - uploader username
pack_data = None
_guessed_encoding = None