Updated Apprise to 0.8.8

This commit is contained in:
Louis Vézina 2020-09-14 08:24:45 -04:00
parent e6b8b1ad19
commit 5d6f453d3f
8 changed files with 3021 additions and 0 deletions

View File

@ -0,0 +1,434 @@
# -*- coding: utf-8 -*-
#
# Copyright (C) 2020 Chris Caron <lead2gold@gmail.com>
# All rights reserved.
#
# This code is licensed under the MIT License.
#
# Permission is hereby granted, free of charge, to any person obtaining a copy
# of this software and associated documentation files(the "Software"), to deal
# in the Software without restriction, including without limitation the rights
# to use, copy, modify, merge, publish, distribute, sublicense, and / or sell
# copies of the Software, and to permit persons to whom the Software is
# furnished to do so, subject to the following conditions :
#
# The above copyright notice and this permission notice shall be included in
# all copies or substantial portions of the Software.
#
# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT.IN NO EVENT SHALL THE
# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
# THE SOFTWARE.
from .NotifyBase import NotifyBase
from ..URLBase import PrivacyMode
from ..common import NotifyImageSize
from ..common import NotifyType
from ..utils import parse_bool
from ..AppriseLocale import gettext_lazy as _
# Default our global support flag
NOTIFY_GROWL_SUPPORT_ENABLED = False
try:
import gntp.notifier
# We're good to go!
NOTIFY_GROWL_SUPPORT_ENABLED = True
except ImportError:
# No problem; we just simply can't support this plugin until
# gntp is installed
pass
# Priorities
class GrowlPriority(object):
LOW = -2
MODERATE = -1
NORMAL = 0
HIGH = 1
EMERGENCY = 2
GROWL_PRIORITIES = (
GrowlPriority.LOW,
GrowlPriority.MODERATE,
GrowlPriority.NORMAL,
GrowlPriority.HIGH,
GrowlPriority.EMERGENCY,
)
class NotifyGrowl(NotifyBase):
"""
A wrapper to Growl Notifications
"""
# The default descriptive name associated with the Notification
service_name = 'Growl'
# The services URL
service_url = 'http://growl.info/'
# The default protocol
protocol = 'growl'
# A URL that takes you to the setup/help of the specific protocol
setup_url = 'https://github.com/caronc/apprise/wiki/Notify_growl'
# Allows the user to specify the NotifyImageSize object
image_size = NotifyImageSize.XY_72
# This entry is a bit hacky, but it allows us to unit-test this library
# in an environment that simply doesn't have the windows packages
# available to us. It also allows us to handle situations where the
# packages actually are present but we need to test that they aren't.
# If anyone is seeing this had knows a better way of testing this
# outside of what is defined in test/test_growl_plugin.py, please
# let me know! :)
_enabled = NOTIFY_GROWL_SUPPORT_ENABLED
# Disable throttle rate for Growl requests since they are normally
# local anyway
request_rate_per_sec = 0
# Limit results to just the first 10 line otherwise there is just to much
# content to display
body_max_line_count = 2
# Default Growl Port
default_port = 23053
# The Growl notification type used
growl_notification_type = "New Messages"
# Define object templates
templates = (
'{schema}://{host}',
'{schema}://{host}:{port}',
'{schema}://{password}@{host}',
'{schema}://{password}@{host}:{port}',
)
# Define our template tokens
template_tokens = dict(NotifyBase.template_tokens, **{
'host': {
'name': _('Hostname'),
'type': 'string',
'required': True,
},
'port': {
'name': _('Port'),
'type': 'int',
'min': 1,
'max': 65535,
},
'password': {
'name': _('Password'),
'type': 'string',
'private': True,
},
})
# Define our template arguments
template_args = dict(NotifyBase.template_args, **{
'priority': {
'name': _('Priority'),
'type': 'choice:int',
'values': GROWL_PRIORITIES,
'default': GrowlPriority.NORMAL,
},
'version': {
'name': _('Version'),
'type': 'choice:int',
'values': (1, 2),
'default': 2,
},
'image': {
'name': _('Include Image'),
'type': 'bool',
'default': True,
'map_to': 'include_image',
},
'sticky': {
'name': _('Sticky'),
'type': 'bool',
'default': True,
'map_to': 'sticky',
},
})
def __init__(self, priority=None, version=2, include_image=True,
sticky=False, **kwargs):
"""
Initialize Growl Object
"""
super(NotifyGrowl, self).__init__(**kwargs)
if not self.port:
self.port = self.default_port
# The Priority of the message
if priority not in GROWL_PRIORITIES:
self.priority = GrowlPriority.NORMAL
else:
self.priority = priority
# Our Registered object
self.growl = None
# Sticky flag
self.sticky = sticky
# Store Version
self.version = version
# Track whether or not we want to send an image with our notification
# or not.
self.include_image = include_image
return
def register(self):
"""
Registers with the Growl server
"""
payload = {
'applicationName': self.app_id,
'notifications': [self.growl_notification_type, ],
'defaultNotifications': [self.growl_notification_type, ],
'hostname': self.host,
'port': self.port,
}
if self.password is not None:
payload['password'] = self.password
self.logger.debug('Growl Registration Payload: %s' % str(payload))
self.growl = gntp.notifier.GrowlNotifier(**payload)
try:
self.growl.register()
except gntp.errors.NetworkError:
msg = 'A network error error occurred registering ' \
'with Growl at {}.'.format(self.host)
self.logger.warning(msg)
return False
except gntp.errors.ParseError:
msg = 'A parsing error error occurred registering ' \
'with Growl at {}.'.format(self.host)
self.logger.warning(msg)
return False
except gntp.errors.AuthError:
msg = 'An authentication error error occurred registering ' \
'with Growl at {}.'.format(self.host)
self.logger.warning(msg)
return False
except gntp.errors.UnsupportedError:
msg = 'An unsupported error occurred registering with ' \
'Growl at {}.'.format(self.host)
self.logger.warning(msg)
return False
self.logger.debug(
'Growl server registration completed successfully.'
)
# Return our state
return True
def send(self, body, title='', notify_type=NotifyType.INFO, **kwargs):
"""
Perform Growl Notification
"""
if not self._enabled:
self.logger.warning(
"Growl Notifications are not supported by this system; "
"`pip install gntp`.")
return False
# Register ourselves with the server if we haven't done so already
if not self.growl and not self.register():
# We failed to register
return False
icon = None
if self.version >= 2:
# URL Based
icon = None if not self.include_image \
else self.image_url(notify_type)
else:
# Raw
icon = None if not self.include_image \
else self.image_raw(notify_type)
payload = {
'noteType': self.growl_notification_type,
'title': title,
'description': body,
'icon': icon is not None,
'sticky': self.sticky,
'priority': self.priority,
}
self.logger.debug('Growl Payload: %s' % str(payload))
# Update icon of payload to be raw data; this is intentionally done
# here after we spit the debug message above (so we don't try to
# print the binary contents of an image
payload['icon'] = icon
# Always call throttle before any remote server i/o is made
self.throttle()
try:
# Perform notification
response = self.growl.notify(**payload)
if not isinstance(response, bool):
self.logger.warning(
'Growl notification failed to send with response: %s' %
str(response),
)
else:
self.logger.info('Sent Growl notification.')
except gntp.errors.BaseError as e:
# Since Growl servers listen for UDP broadcasts, it's possible
# that you will never get to this part of the code since there is
# no acknowledgement as to whether it accepted what was sent to it
# or not.
# However, if the host/server is unavailable, you will get to this
# point of the code.
self.logger.warning(
'A Connection error occurred sending Growl '
'notification to %s.' % self.host)
self.logger.debug('Growl Exception: %s' % str(e))
# Return; we're done
return False
return True
def url(self, privacy=False, *args, **kwargs):
"""
Returns the URL built dynamically based on specified arguments.
"""
_map = {
GrowlPriority.LOW: 'low',
GrowlPriority.MODERATE: 'moderate',
GrowlPriority.NORMAL: 'normal',
GrowlPriority.HIGH: 'high',
GrowlPriority.EMERGENCY: 'emergency',
}
# Define any URL parameters
params = {
'image': 'yes' if self.include_image else 'no',
'sticky': 'yes' if self.sticky else 'no',
'priority':
_map[GrowlPriority.NORMAL] if self.priority not in _map
else _map[self.priority],
'version': self.version,
}
# Extend our parameters
params.update(self.url_parameters(privacy=privacy, *args, **kwargs))
auth = ''
if self.user:
# The growl password is stored in the user field
auth = '{password}@'.format(
password=self.pprint(
self.user, privacy, mode=PrivacyMode.Secret, safe=''),
)
return '{schema}://{auth}{hostname}{port}/?{params}'.format(
schema=self.secure_protocol if self.secure else self.protocol,
auth=auth,
# never encode hostname since we're expecting it to be a valid one
hostname=self.host,
port='' if self.port is None or self.port == self.default_port
else ':{}'.format(self.port),
params=NotifyGrowl.urlencode(params),
)
@staticmethod
def parse_url(url):
"""
Parses the URL and returns enough arguments that can allow
us to re-instantiate this object.
"""
results = NotifyBase.parse_url(url)
if not results:
# We're done early as we couldn't load the results
return results
version = None
if 'version' in results['qsd'] and len(results['qsd']['version']):
# Allow the user to specify the version of the protocol to use.
try:
version = int(
NotifyGrowl.unquote(
results['qsd']['version']).strip().split('.')[0])
except (AttributeError, IndexError, TypeError, ValueError):
NotifyGrowl.logger.warning(
'An invalid Growl version of "%s" was specified and will '
'be ignored.' % results['qsd']['version']
)
pass
if 'priority' in results['qsd'] and len(results['qsd']['priority']):
_map = {
'l': GrowlPriority.LOW,
'm': GrowlPriority.MODERATE,
'n': GrowlPriority.NORMAL,
'h': GrowlPriority.HIGH,
'e': GrowlPriority.EMERGENCY,
}
try:
results['priority'] = \
_map[results['qsd']['priority'][0].lower()]
except KeyError:
# No priority was set
pass
# Because of the URL formatting, the password is actually where the
# username field is. For this reason, we just preform this small hack
# to make it (the URL) conform correctly. The following strips out the
# existing password entry (if exists) so that it can be swapped with
# the new one we specify.
if results.get('password', None) is None:
results['password'] = results.get('user', None)
# Include images with our message
results['include_image'] = \
parse_bool(results['qsd'].get('image',
NotifyGrowl.template_args['image']['default']))
# Include images with our message
results['sticky'] = \
parse_bool(results['qsd'].get('sticky',
NotifyGrowl.template_args['sticky']['default']))
# Set our version
if version:
results['version'] = version
return results

View File

@ -0,0 +1,869 @@
# -*- coding: utf-8 -*-
#
# Copyright (C) 2020 Chris Caron <lead2gold@gmail.com>
# All rights reserved.
#
# This code is licensed under the MIT License.
#
# Permission is hereby granted, free of charge, to any person obtaining a copy
# of this software and associated documentation files(the "Software"), to deal
# in the Software without restriction, including without limitation the rights
# to use, copy, modify, merge, publish, distribute, sublicense, and / or sell
# copies of the Software, and to permit persons to whom the Software is
# furnished to do so, subject to the following conditions :
#
# The above copyright notice and this permission notice shall be included in
# all copies or substantial portions of the Software.
#
# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT.IN NO EVENT SHALL THE
# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
# THE SOFTWARE.
# For LaMetric to work, you need to first setup a custom application on their
# website. it can be done as follows:
# Cloud Mode:
# 1. Sign Up and login to the developer webpage https://developer.lametric.com
# 2. Create a **Notification App** if you haven't already done so from:
# https://developer.lametric.com/applications/sources
# 3. Provide it an app name, a description and privacy URL (which can point to
# anywhere; I set mine to `http://localhost`). No permissions are
# required.
# 4. Access your newly created app so that you can acquire both the
# **Client ID** and the **Client Secret** here:
# https://developer.lametric.com/applications/sources
# Device Mode:
# - Sign Up and login to the developer webpage https://developer.lametric.com
# - Locate your Device API Key; you can find it here:
# https://developer.lametric.com/user/devices
# - From here you can get your your API Key for the device you plan to notify.
# - Your devices IP Address can be found in LaMetric Time app at:
# Settings -> Wi-Fi -> IP Address
# A great source for API examples (Device Mode):
# - https://lametric-documentation.readthedocs.io/en/latest/reference-docs\
# /device-notifications.html
# A great source for API examples (Cloud Mode):
# - https://lametric-documentation.readthedocs.io/en/latest/reference-docs\
# /lametric-cloud-reference.html
# A great source for the icon reference:
# - https://developer.lametric.com/icons
import re
import six
import requests
from json import dumps
from .NotifyBase import NotifyBase
from ..URLBase import PrivacyMode
from ..common import NotifyType
from ..utils import validate_regex
from ..AppriseLocale import gettext_lazy as _
from ..utils import is_hostname
from ..utils import is_ipaddr
class LametricMode(object):
"""
Define Lametric Notification Modes
"""
# App posts upstream to the developer API on Lametric's website
CLOUD = "cloud"
# Device mode posts directly to the device that you identify
DEVICE = "device"
LAMETRIC_MODES = (
LametricMode.CLOUD,
LametricMode.DEVICE,
)
class LametricPriority(object):
"""
Priority of the message
"""
# info: this priority means that notification will be displayed on the
# same “level” as all other notifications on the device that come
# from apps (for example facebook app). This notification will not
# be shown when screensaver is active. By default message is sent
# with "info" priority. This level of notification should be used
# for notifications like news, weather, temperature, etc.
INFO = 'info'
# warning: notifications with this priority will interrupt ones sent with
# lower priority (“info”). Should be used to notify the user
# about something important but not critical. For example,
# events like “someone is coming home” should use this priority
# when sending notifications from smart home.
WARNING = 'warning'
# critical: the most important notifications. Interrupts notification
# with priority info or warning and is displayed even if
# screensaver is active. Use with care as these notifications
# can pop in the middle of the night. Must be used only for
# really important notifications like notifications from smoke
# detectors, water leak sensors, etc. Use it for events that
# require human interaction immediately.
CRITICAL = 'critical'
LAMETRIC_PRIORITIES = (
LametricPriority.INFO,
LametricPriority.WARNING,
LametricPriority.CRITICAL,
)
class LametricIconType(object):
"""
Represents the nature of notification.
"""
# info - "i" icon will be displayed prior to the notification. Means that
# notification contains information, no need to take actions on it.
INFO = 'info'
# alert: "!!!" icon will be displayed prior to the notification. Use it
# when you want the user to pay attention to that notification as
# it indicates that something bad happened and user must take
# immediate action.
ALERT = 'alert'
# none: no notification icon will be shown.
NONE = 'none'
LAMETRIC_ICON_TYPES = (
LametricIconType.INFO,
LametricIconType.ALERT,
LametricIconType.NONE,
)
class LametricSoundCategory(object):
"""
Define Sound Categories
"""
NOTIFICATIONS = "notifications"
ALARMS = "alarms"
class LametricSound(object):
"""
There are 2 categories of sounds, to make things simple we just lump them
all togther in one class object.
Syntax is (Category, (AlarmID, Alias1, Alias2, ...))
"""
# Alarm Category Sounds
ALARM01 = (LametricSoundCategory.ALARMS, ('alarm1', 'a1', 'a01'))
ALARM02 = (LametricSoundCategory.ALARMS, ('alarm2', 'a2', 'a02'))
ALARM03 = (LametricSoundCategory.ALARMS, ('alarm3', 'a3', 'a03'))
ALARM04 = (LametricSoundCategory.ALARMS, ('alarm4', 'a4', 'a04'))
ALARM05 = (LametricSoundCategory.ALARMS, ('alarm5', 'a5', 'a05'))
ALARM06 = (LametricSoundCategory.ALARMS, ('alarm6', 'a6', 'a06'))
ALARM07 = (LametricSoundCategory.ALARMS, ('alarm7', 'a7', 'a07'))
ALARM08 = (LametricSoundCategory.ALARMS, ('alarm8', 'a8', 'a08'))
ALARM09 = (LametricSoundCategory.ALARMS, ('alarm9', 'a9', 'a09'))
ALARM10 = (LametricSoundCategory.ALARMS, ('alarm10', 'a10'))
ALARM11 = (LametricSoundCategory.ALARMS, ('alarm11', 'a11'))
ALARM12 = (LametricSoundCategory.ALARMS, ('alarm12', 'a12'))
ALARM13 = (LametricSoundCategory.ALARMS, ('alarm13', 'a13'))
# Notification Category Sounds
BICYCLE = (LametricSoundCategory.NOTIFICATIONS, ('bicycle', 'bike'))
CAR = (LametricSoundCategory.NOTIFICATIONS, ('car', ))
CASH = (LametricSoundCategory.NOTIFICATIONS, ('cash', ))
CAT = (LametricSoundCategory.NOTIFICATIONS, ('cat', ))
DOG01 = (LametricSoundCategory.NOTIFICATIONS, ('dog', 'dog1', 'dog01'))
DOG02 = (LametricSoundCategory.NOTIFICATIONS, ('dog2', 'dog02'))
ENERGY = (LametricSoundCategory.NOTIFICATIONS, ('energy', ))
KNOCK = (LametricSoundCategory.NOTIFICATIONS, ('knock-knock', 'knock'))
EMAIL = (LametricSoundCategory.NOTIFICATIONS, (
'letter_email', 'letter', 'email'))
LOSE01 = (LametricSoundCategory.NOTIFICATIONS, ('lose1', 'lose01', 'lose'))
LOSE02 = (LametricSoundCategory.NOTIFICATIONS, ('lose2', 'lose02'))
NEGATIVE01 = (LametricSoundCategory.NOTIFICATIONS, (
'negative1', 'negative01', 'neg01', 'neg1', '-'))
NEGATIVE02 = (LametricSoundCategory.NOTIFICATIONS, (
'negative2', 'negative02', 'neg02', 'neg2', '--'))
NEGATIVE03 = (LametricSoundCategory.NOTIFICATIONS, (
'negative3', 'negative03', 'neg03', 'neg3', '---'))
NEGATIVE04 = (LametricSoundCategory.NOTIFICATIONS, (
'negative4', 'negative04', 'neg04', 'neg4', '----'))
NEGATIVE05 = (LametricSoundCategory.NOTIFICATIONS, (
'negative5', 'negative05', 'neg05', 'neg5', '-----'))
NOTIFICATION01 = (LametricSoundCategory.NOTIFICATIONS, (
'notification', 'notification1', 'notification01', 'not01', 'not1'))
NOTIFICATION02 = (LametricSoundCategory.NOTIFICATIONS, (
'notification2', 'notification02', 'not02', 'not2'))
NOTIFICATION03 = (LametricSoundCategory.NOTIFICATIONS, (
'notification3', 'notification03', 'not03', 'not3'))
NOTIFICATION04 = (LametricSoundCategory.NOTIFICATIONS, (
'notification4', 'notification04', 'not04', 'not4'))
OPEN_DOOR = (LametricSoundCategory.NOTIFICATIONS, (
'open_door', 'open', 'door'))
POSITIVE01 = (LametricSoundCategory.NOTIFICATIONS, (
'positive1', 'positive01', 'pos01', 'p1', '+'))
POSITIVE02 = (LametricSoundCategory.NOTIFICATIONS, (
'positive2', 'positive02', 'pos02', 'p2', '++'))
POSITIVE03 = (LametricSoundCategory.NOTIFICATIONS, (
'positive3', 'positive03', 'pos03', 'p3', '+++'))
POSITIVE04 = (LametricSoundCategory.NOTIFICATIONS, (
'positive4', 'positive04', 'pos04', 'p4', '++++'))
POSITIVE05 = (LametricSoundCategory.NOTIFICATIONS, (
'positive5', 'positive05', 'pos05', 'p5', '+++++'))
POSITIVE06 = (LametricSoundCategory.NOTIFICATIONS, (
'positive6', 'positive06', 'pos06', 'p6', '++++++'))
STATISTIC = (LametricSoundCategory.NOTIFICATIONS, ('statistic', 'stat'))
THUNDER = (LametricSoundCategory.NOTIFICATIONS, ('thunder'))
WATER01 = (LametricSoundCategory.NOTIFICATIONS, ('water1', 'water01'))
WATER02 = (LametricSoundCategory.NOTIFICATIONS, ('water2', 'water02'))
WIN01 = (LametricSoundCategory.NOTIFICATIONS, ('win', 'win01', 'win1'))
WIN02 = (LametricSoundCategory.NOTIFICATIONS, ('win2', 'win02'))
WIND = (LametricSoundCategory.NOTIFICATIONS, ('wind', ))
WIND_SHORT = (LametricSoundCategory.NOTIFICATIONS, ('wind_short', ))
# A listing of all the sounds; the order DOES matter, content is read from
# top down and then right to left (over aliases). Longer similar sounding
# elements should be placed higher in the list over others. for example
# ALARM10 should come before ALARM01 (because ALARM01 can match on 'alarm1'
# which is very close to 'alarm10'
LAMETRIC_SOUNDS = (
# Alarm Category Entries
LametricSound.ALARM13, LametricSound.ALARM12, LametricSound.ALARM11,
LametricSound.ALARM10, LametricSound.ALARM09, LametricSound.ALARM08,
LametricSound.ALARM07, LametricSound.ALARM06, LametricSound.ALARM05,
LametricSound.ALARM04, LametricSound.ALARM03, LametricSound.ALARM02,
LametricSound.ALARM01,
# Notification Category Entries
LametricSound.BICYCLE, LametricSound.CAR, LametricSound.CASH,
LametricSound.CAT, LametricSound.DOG02, LametricSound.DOG01,
LametricSound.ENERGY, LametricSound.KNOCK, LametricSound.EMAIL,
LametricSound.LOSE02, LametricSound.LOSE01, LametricSound.NEGATIVE01,
LametricSound.NEGATIVE02, LametricSound.NEGATIVE03,
LametricSound.NEGATIVE04, LametricSound.NEGATIVE05,
LametricSound.NOTIFICATION04, LametricSound.NOTIFICATION03,
LametricSound.NOTIFICATION02, LametricSound.NOTIFICATION01,
LametricSound.OPEN_DOOR, LametricSound.POSITIVE01,
LametricSound.POSITIVE02, LametricSound.POSITIVE03,
LametricSound.POSITIVE04, LametricSound.POSITIVE05,
LametricSound.POSITIVE01, LametricSound.STATISTIC, LametricSound.THUNDER,
LametricSound.WATER02, LametricSound.WATER01, LametricSound.WIND,
LametricSound.WIND_SHORT, LametricSound.WIN01, LametricSound.WIN02,
)
class NotifyLametric(NotifyBase):
"""
A wrapper for LaMetric Notifications
"""
# The default descriptive name associated with the Notification
service_name = 'LaMetric'
# The services URL
service_url = 'https://lametric.com'
# The default protocol
protocol = 'lametric'
# The default secure protocol
secure_protocol = 'lametrics'
# Allow 300 requests per minute.
# 60/300 = 0.2
request_rate_per_sec = 0.20
# A URL that takes you to the setup/help of the specific protocol
setup_url = 'https://github.com/caronc/apprise/wiki/Notify_lametric'
# Lametric does have titles when creating a message
title_maxlen = 0
# URL used for notifying Lametric App's created in the Dev Portal
cloud_notify_url = 'https://developer.lametric.com/api/v1' \
'/dev/widget/update/com.lametric.{client_id}'
# URL used for local notifications directly to the device
device_notify_url = '{schema}://{host}{port}/api/v2/device/notifications'
# The Device User ID
default_device_user = 'dev'
# Track all icon mappings back to Apprise Icon NotifyType's
# See: https://developer.lametric.com/icons
# Icon ID looks like <prefix>XXX, where <prefix> is:
# - "i" (for static icon)
# - "a" (for animation)
# - XXX - is the number of the icon and can be found at:
# https://developer.lametric.com/icons
lametric_icon_id_mapping = {
# 620/Info
NotifyType.INFO: 'i620',
# 9182/info_good
NotifyType.SUCCESS: 'i9182',
# 9183/info_caution
NotifyType.WARNING: 'i9183',
# 9184/info_error
NotifyType.FAILURE: 'i9184',
}
# Define object templates
templates = (
# App Mode
'{schema}://{client_id}@{secret}',
# Device Mode
'{schema}://{apikey}@{host}',
'{schema}://{apikey}@{host}:{port}',
'{schema}://{user}:{apikey}@{host}:{port}',
)
# Define our template tokens
template_tokens = dict(NotifyBase.template_tokens, **{
'apikey': {
'name': _('Device API Key'),
'type': 'string',
'private': True,
},
'host': {
'name': _('Hostname'),
'type': 'string',
'required': True,
},
'port': {
'name': _('Port'),
'type': 'int',
'min': 1,
'max': 65535,
'default': 8080,
},
'user': {
'name': _('Username'),
'type': 'string',
},
'client_id': {
'name': _('Client ID'),
'type': 'string',
'private': True,
'regex': (r'^[a-z0-9-]+$', 'i'),
},
'secret': {
'name': _('Client Secret'),
'type': 'string',
'private': True,
},
})
# Define our template arguments
template_args = dict(NotifyBase.template_args, **{
'oauth_id': {
'alias_of': 'client_id',
},
'oauth_secret': {
'alias_of': 'secret',
},
'apikey': {
'alias_of': 'apikey',
},
'priority': {
'name': _('Priority'),
'type': 'choice:string',
'values': LAMETRIC_PRIORITIES,
'default': LametricPriority.INFO,
},
'icon': {
'name': _('Custom Icon'),
'type': 'string',
},
'icon_type': {
'name': _('Icon Type'),
'type': 'choice:string',
'values': LAMETRIC_ICON_TYPES,
'default': LametricIconType.NONE,
},
'mode': {
'name': _('Mode'),
'type': 'choice:string',
'values': LAMETRIC_MODES,
'default': LametricMode.DEVICE,
},
'sound': {
'name': _('Sound'),
'type': 'string',
},
# Lifetime is in seconds
'cycles': {
'name': _('Cycles'),
'type': 'int',
'min': 0,
'default': 1,
},
})
def __init__(self, apikey=None, client_id=None, secret=None, priority=None,
icon=None, icon_type=None, sound=None, mode=None,
cycles=None, **kwargs):
"""
Initialize LaMetric Object
"""
super(NotifyLametric, self).__init__(**kwargs)
self.mode = mode.strip().lower() \
if isinstance(mode, six.string_types) \
else self.template_args['mode']['default']
if self.mode not in LAMETRIC_MODES:
msg = 'An invalid LaMetric Mode ({}) was specified.'.format(mode)
self.logger.warning(msg)
raise TypeError(msg)
# Default Cloud Arguments
self.secret = None
self.client_id = None
# Default Device Arguments
self.apikey = None
if self.mode == LametricMode.CLOUD:
# Client ID
self.client_id = validate_regex(
client_id, *self.template_tokens['client_id']['regex'])
if not self.client_id:
msg = 'An invalid LaMetric Client OAuth2 ID ' \
'({}) was specified.'.format(client_id)
self.logger.warning(msg)
raise TypeError(msg)
# Client Secret
self.secret = validate_regex(secret)
if not self.secret:
msg = 'An invalid LaMetric Client OAuth2 Secret ' \
'({}) was specified.'.format(secret)
self.logger.warning(msg)
raise TypeError(msg)
else: # LametricMode.DEVICE
# API Key
self.apikey = validate_regex(apikey)
if not self.apikey:
msg = 'An invalid LaMetric Device API Key ' \
'({}) was specified.'.format(apikey)
self.logger.warning(msg)
raise TypeError(msg)
if priority not in LAMETRIC_PRIORITIES:
self.priority = self.template_args['priority']['default']
else:
self.priority = priority
# assign our icon (if it was defined); we also eliminate
# any hashtag (#) entries that might be present
self.icon = re.search(r'[#\s]*(?P<value>.+?)\s*$', icon) \
.group('value') if isinstance(icon, six.string_types) else None
if icon_type not in LAMETRIC_ICON_TYPES:
self.icon_type = self.template_args['icon_type']['default']
else:
self.icon_type = icon_type
# The number of times the message should be displayed
self.cycles = self.template_args['cycles']['default'] \
if not (isinstance(cycles, int) and
cycles > self.template_args['cycles']['min']) else cycles
self.sound = None
if isinstance(sound, six.string_types):
# If sound is set, get it's match
self.sound = self.sound_lookup(sound.strip().lower())
if self.sound is None:
self.logger.warning(
'An invalid LaMetric sound ({}) was specified.'.format(
sound))
return
@staticmethod
def sound_lookup(lookup):
"""
A simple match function that takes string and returns the
LametricSound object it was found in.
"""
for x in LAMETRIC_SOUNDS:
match = next((f for f in x[1] if f.startswith(lookup)), None)
if match:
# We're done
return x
# No match was found
return None
def _cloud_notification_payload(self, body, notify_type, headers):
"""
Return URL and payload for cloud directed requests
"""
# Update header entries
headers.update({
'X-Access-Token': self.secret,
'Cache-Control': 'no-cache',
})
if self.sound:
self.logger.warning(
'LaMetric sound setting is unavailable in Cloud mode')
if self.priority != self.template_args['priority']['default']:
self.logger.warning(
'LaMetric priority setting is unavailable in Cloud mode')
if self.icon_type != self.template_args['icon_type']['default']:
self.logger.warning(
'LaMetric icon_type setting is unavailable in Cloud mode')
if self.cycles != self.template_args['cycles']['default']:
self.logger.warning(
'LaMetric cycle settings is unavailable in Cloud mode')
# Assign our icon if the user specified a custom one, otherwise
# choose from our pre-set list (based on notify_type)
icon = self.icon if self.icon \
else self.lametric_icon_id_mapping[notify_type]
# Our Payload
# Cloud Notifications don't have as much functionality
# You can not set priority and/or sound
payload = {
"frames": [
{
"icon": icon,
"text": body,
}
]
}
# Prepare our Cloud Notify URL
notify_url = self.cloud_notify_url.format(client_id=self.client_id)
# Return request parameters
return (notify_url, None, payload)
def _device_notification_payload(self, body, notify_type, headers):
"""
Return URL and Payload for Device directed requests
"""
# Assign our icon if the user specified a custom one, otherwise
# choose from our pre-set list (based on notify_type)
icon = self.icon if self.icon \
else self.lametric_icon_id_mapping[notify_type]
# Our Payload
payload = {
# Priority of the message
"priority": self.priority,
# Icon Type: Represents the nature of notification
"icon_type": self.icon_type,
# The time notification lives in queue to be displayed in
# milliseconds (ms). The default lifetime is 2 minutes (120000ms).
# If notification stayed in queue for longer than lifetime
# milliseconds - it will not be displayed.
"lifetime": 120000,
"model": {
# cycles - the number of times message should be displayed. If
# cycles is set to 0, notification will stay on the screen
# until user dismisses it manually. By default it is set to 1.
"cycles": self.cycles,
"frames": [
{
"icon": icon,
"text": body,
}
]
}
}
if self.sound:
# Sound was set, so add it to the payload
payload["model"]["sound"] = {
# The sound category
"category": self.sound[0],
# The first element of our tuple is always the id
"id": self.sound[1][0],
# repeat - defines the number of times sound must be played.
# If set to 0 sound will be played until notification is
# dismissed. By default the value is set to 1.
"repeat": 1,
}
if not self.user:
# Use default user if there wasn't one otherwise specified
self.user = self.default_device_user
# Prepare our authentication
auth = (self.user, self.password)
# Prepare our Direct Access Notify URL
notify_url = self.device_notify_url.format(
schema="https" if self.secure else "http",
host=self.host,
port=':{}'.format(
self.port if self.port
else self.template_tokens['port']['default']))
# Return request parameters
return (notify_url, auth, payload)
def send(self, body, title='', notify_type=NotifyType.INFO, **kwargs):
"""
Perform LaMetric Notification
"""
# Prepare our headers:
headers = {
'User-Agent': self.app_id,
'Content-Type': 'application/json',
'Accept': 'application/json',
}
# Depending on the mode, the payload is gathered by
# - _device_notification_payload()
# - _cloud_notification_payload()
(notify_url, auth, payload) = getattr(
self, '_{}_notification_payload'.format(self.mode))(
body=body, notify_type=notify_type, headers=headers)
self.logger.debug('LaMetric POST URL: %s (cert_verify=%r)' % (
notify_url, self.verify_certificate,
))
self.logger.debug('LaMetric Payload: %s' % str(payload))
# Always call throttle before any remote server i/o is made
self.throttle()
try:
r = requests.post(
notify_url,
data=dumps(payload),
headers=headers,
auth=auth,
verify=self.verify_certificate,
timeout=self.request_timeout,
)
# An ideal response would be:
# {
# "success": {
# "id": "<notification id>"
# }
# }
if r.status_code not in (
requests.codes.created, requests.codes.ok):
# We had a problem
status_str = \
NotifyLametric.http_response_code_lookup(r.status_code)
self.logger.warning(
'Failed to send LaMetric notification: '
'{}{}error={}.'.format(
status_str,
', ' if status_str else '',
r.status_code))
self.logger.debug('Response Details:\r\n{}'.format(r.content))
# Return; we're done
return False
else:
self.logger.info('Sent LaMetric notification.')
except requests.RequestException as e:
self.logger.warning(
'A Connection error occurred sending LaMetric '
'notification to %s.' % self.host)
self.logger.debug('Socket Exception: %s' % str(e))
# Return; we're done
return False
return True
def url(self, privacy=False, *args, **kwargs):
"""
Returns the URL built dynamically based on specified arguments.
"""
# Define any URL parameters
params = {
'mode': self.mode,
}
# Extend our parameters
params.update(self.url_parameters(privacy=privacy, *args, **kwargs))
if self.icon:
# Assign our icon IF one was specified
params['icon'] = self.icon
if self.mode == LametricMode.CLOUD:
# Upstream/LaMetric App Return
return '{schema}://{client_id}@{secret}/?{params}'.format(
schema=self.protocol,
client_id=self.pprint(self.client_id, privacy, safe=''),
secret=self.pprint(
self.secret, privacy, mode=PrivacyMode.Secret, safe=''),
params=NotifyLametric.urlencode(params))
#
# If we reach here then we're dealing with LametricMode.DEVICE
#
if self.priority != self.template_args['priority']['default']:
params['priority'] = self.priority
if self.icon_type != self.template_args['icon_type']['default']:
params['icon_type'] = self.icon_type
if self.cycles != self.template_args['cycles']['default']:
params['cycles'] = self.cycles
if self.sound:
# Store our sound entry
# The first element of our tuple is always the id
params['sound'] = self.sound[1][0]
auth = ''
if self.user and self.password:
auth = '{user}:{apikey}@'.format(
user=NotifyLametric.quote(self.user, safe=''),
apikey=self.pprint(self.apikey, privacy, safe=''),
)
else: # self.apikey is set
auth = '{apikey}@'.format(
apikey=self.pprint(self.apikey, privacy, safe=''),
)
# Local Return
return '{schema}://{auth}{hostname}{port}/?{params}'.format(
schema=self.secure_protocol if self.secure else self.protocol,
auth=auth,
# never encode hostname since we're expecting it to be a valid one
hostname=self.host,
port='' if self.port is None
or self.port == self.template_tokens['port']['default']
else ':{}'.format(self.port),
params=NotifyLametric.urlencode(params),
)
@staticmethod
def parse_url(url):
"""
Parses the URL and returns enough arguments that can allow
us to re-instantiate this object.
"""
results = NotifyBase.parse_url(url, verify_host=False)
if not results:
# We're done early as we couldn't load the results
return results
if results.get('user') and not results.get('password'):
# Handle URL like:
# schema://user@host
# This becomes the password
results['password'] = results['user']
results['user'] = None
# Priority Handling
if 'priority' in results['qsd'] and len(results['qsd']['priority']):
results['priority'] = results['qsd']['priority'].strip().lower()
# Icon Type
if 'icon' in results['qsd'] and len(results['qsd']['icon']):
results['icon'] = results['qsd']['icon'].strip().lower()
# Icon Type
if 'icon_type' in results['qsd'] and len(results['qsd']['icon_type']):
results['icon_type'] = results['qsd']['icon_type'].strip().lower()
# Sound
if 'sound' in results['qsd'] and len(results['qsd']['sound']):
results['sound'] = results['qsd']['sound'].strip().lower()
# We can detect the mode based on the validity of the hostname
results['mode'] = LametricMode.DEVICE \
if (is_hostname(results['host']) or
is_ipaddr(results['host'])) else LametricMode.CLOUD
# Mode override
if 'mode' in results['qsd'] and len(results['qsd']['mode']):
results['mode'] = NotifyLametric.unquote(results['qsd']['mode'])
# API Key (Device Mode)
if results['mode'] == LametricMode.DEVICE:
if 'apikey' in results['qsd'] and len(results['qsd']['apikey']):
# Extract API Key from an argument
results['apikey'] = \
NotifyLametric.unquote(results['qsd']['apikey'])
else:
results['apikey'] = \
NotifyLametric.unquote(results['password'])
elif results['mode'] == LametricMode.CLOUD:
# OAuth2 ID (Cloud Mode)
if 'oauth_id' in results['qsd'] \
and len(results['qsd']['oauth_id']):
# Extract the OAuth2 Key from an argument
results['client_id'] = \
NotifyLametric.unquote(results['qsd']['oauth_id'])
else:
results['client_id'] = \
NotifyLametric.unquote(results['password'])
# OAuth2 Secret (Cloud Mode)
if 'oauth_secret' in results['qsd'] and \
len(results['qsd']['oauth_secret']):
# Extract the API Secret from an argument
results['secret'] = \
NotifyLametric.unquote(results['qsd']['oauth_secret'])
else:
results['secret'] = \
NotifyLametric.unquote(results['host'])
# Set cycles
try:
results['cycles'] = abs(int(results['qsd'].get('cycles')))
except (TypeError, ValueError):
# Not a valid integer; ignore entry
pass
return results

View File

@ -0,0 +1,219 @@
# -*- coding: utf-8 -*-
#
# Copyright (C) 2020 Chris Caron <lead2gold@gmail.com>
# All rights reserved.
#
# This code is licensed under the MIT License.
#
# Permission is hereby granted, free of charge, to any person obtaining a copy
# of this software and associated documentation files(the "Software"), to deal
# in the Software without restriction, including without limitation the rights
# to use, copy, modify, merge, publish, distribute, sublicense, and / or sell
# copies of the Software, and to permit persons to whom the Software is
# furnished to do so, subject to the following conditions :
#
# The above copyright notice and this permission notice shall be included in
# all copies or substantial portions of the Software.
#
# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT.IN NO EVENT SHALL THE
# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
# THE SOFTWARE.
from __future__ import absolute_import
from __future__ import print_function
import platform
import subprocess
import os
from .NotifyBase import NotifyBase
from ..common import NotifyImageSize
from ..common import NotifyType
from ..utils import parse_bool
from ..AppriseLocale import gettext_lazy as _
class NotifyMacOSX(NotifyBase):
"""
A wrapper for the MacOS X terminal-notifier tool
Source: https://github.com/julienXX/terminal-notifier
"""
# The default descriptive name associated with the Notification
service_name = 'MacOSX Notification'
# The default protocol
protocol = 'macosx'
# A URL that takes you to the setup/help of the specific protocol
setup_url = 'https://github.com/caronc/apprise/wiki/Notify_macosx'
# Allows the user to specify the NotifyImageSize object
image_size = NotifyImageSize.XY_128
# Disable throttle rate for MacOSX requests since they are normally
# local anyway
request_rate_per_sec = 0
# Limit results to just the first 10 line otherwise there is just to much
# content to display
body_max_line_count = 10
# The path to the terminal-notifier
notify_path = '/usr/local/bin/terminal-notifier'
# Define object templates
templates = (
'{schema}://',
)
# Define our template arguments
template_args = dict(NotifyBase.template_args, **{
'image': {
'name': _('Include Image'),
'type': 'bool',
'default': True,
'map_to': 'include_image',
},
# Play the NAME sound when the notification appears.
# Sound names are listed in Sound Preferences.
# Use 'default' for the default sound.
'sound': {
'name': _('Sound'),
'type': 'string',
},
})
def __init__(self, sound=None, include_image=True, **kwargs):
"""
Initialize MacOSX Object
"""
super(NotifyMacOSX, self).__init__(**kwargs)
# Track whether or not we want to send an image with our notification
# or not.
self.include_image = include_image
self._enabled = False
if platform.system() == 'Darwin':
# Check this is Mac OS X 10.8, or higher
major, minor = platform.mac_ver()[0].split('.')[:2]
# Toggle our _enabled flag if verion is correct and executable
# found. This is done in such a way to provide verbosity to the
# end user so they know why it may or may not work for them.
if not (int(major) > 10 or (int(major) == 10 and int(minor) >= 8)):
self.logger.warning(
"MacOSX Notifications require your OS to be at least "
"v10.8 (detected {}.{})".format(major, minor))
elif not os.access(self.notify_path, os.X_OK):
self.logger.warning(
"MacOSX Notifications require '{}' to be in place."
.format(self.notify_path))
else:
# We're good to go
self._enabled = True
# Set sound object (no q/a for now)
self.sound = sound
return
def send(self, body, title='', notify_type=NotifyType.INFO, **kwargs):
"""
Perform MacOSX Notification
"""
if not self._enabled:
self.logger.warning(
"MacOSX Notifications are not supported by this system.")
return False
# Start with our notification path
cmd = [
self.notify_path,
'-message', body,
]
# Title is an optional switch
if title:
cmd.extend(['-title', title])
# The sound to play
if self.sound:
cmd.extend(['-sound', self.sound])
# Support any defined images if set
image_path = None if not self.include_image \
else self.image_url(notify_type)
if image_path:
cmd.extend(['-appIcon', image_path])
# Always call throttle before any remote server i/o is made
self.throttle()
# Send our notification
output = subprocess.Popen(
cmd, stdout=subprocess.PIPE, stderr=subprocess.STDOUT)
# Wait for process to complete
output.wait()
if output.returncode:
self.logger.warning('Failed to send MacOSX notification.')
self.logger.exception('MacOSX Exception')
return False
self.logger.info('Sent MacOSX notification.')
return True
def url(self, privacy=False, *args, **kwargs):
"""
Returns the URL built dynamically based on specified arguments.
"""
# Define any URL parametrs
params = {
'image': 'yes' if self.include_image else 'no',
}
# Extend our parameters
params.update(self.url_parameters(privacy=privacy, *args, **kwargs))
if self.sound:
# Store our sound
params['sound'] = self.sound
return '{schema}://_/?{params}'.format(
schema=self.protocol,
params=NotifyMacOSX.urlencode(params),
)
@staticmethod
def parse_url(url):
"""
There are no parameters nessisary for this protocol; simply having
gnome:// is all you need. This function just makes sure that
is in place.
"""
results = NotifyBase.parse_url(url, verify_host=False)
# Include images with our message
results['include_image'] = \
parse_bool(results['qsd'].get('image', True))
# Support 'sound'
if 'sound' in results['qsd'] and len(results['qsd']['sound']):
results['sound'] = NotifyMacOSX.unquote(results['qsd']['sound'])
return results

View File

@ -0,0 +1,704 @@
# -*- coding: utf-8 -*-
#
# Copyright (C) 2020 Chris Caron <lead2gold@gmail.com>
# All rights reserved.
#
# This code is licensed under the MIT License.
#
# Permission is hereby granted, free of charge, to any person obtaining a copy
# of this software and associated documentation files(the "Software"), to deal
# in the Software without restriction, including without limitation the rights
# to use, copy, modify, merge, publish, distribute, sublicense, and / or sell
# copies of the Software, and to permit persons to whom the Software is
# furnished to do so, subject to the following conditions :
#
# The above copyright notice and this permission notice shall be included in
# all copies or substantial portions of the Software.
#
# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT.IN NO EVENT SHALL THE
# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
# THE SOFTWARE.
# API Details:
# https://docs.microsoft.com/en-us/previous-versions/office/\
# office-365-api/?redirectedfrom=MSDN
# Information on sending an email:
# https://docs.microsoft.com/en-us/graph/api/user-sendmail\
# ?view=graph-rest-1.0&tabs=http
# Steps to get your Microsoft Client ID, Client Secret, and Tenant ID:
# 1. You should have valid Microsoft personal account. Go to Azure Portal
# 2. Go to -> Microsoft Active Directory --> App Registrations
# 3. Click new -> give any name (your choice) in Name field -> select
# personal Microsoft accounts only --> Register
# 4. Now you have your client_id & Tenant id.
# 5. To create client_secret , go to active directory ->
# Certificate & Tokens -> New client secret
# **This is auto-generated string which may have '@' and '?'
# characters in it. You should encode these to prevent
# from having any issues.**
# 6. Now need to set permission Active directory -> API permissions ->
# Add permission (search mail) , add relevant permission.
# 7. Set the redirect uri (Web) to:
# https://login.microsoftonline.com/common/oauth2/nativeclient
#
# ...and click register.
#
# This needs to be inserted into the "Redirect URI" text box as simply
# checking the check box next to this link seems to be insufficient.
# This is the default redirect uri used by this library, but you can use
# any other if you want.
#
# 8. Now you're good to go
import requests
from datetime import datetime
from datetime import timedelta
from json import loads
from json import dumps
from .NotifyBase import NotifyBase
from ..URLBase import PrivacyMode
from ..common import NotifyFormat
from ..common import NotifyType
from ..utils import is_email
from ..utils import parse_emails
from ..utils import validate_regex
from ..AppriseLocale import gettext_lazy as _
class NotifyOffice365(NotifyBase):
"""
A wrapper for Office 365 Notifications
"""
# The default descriptive name associated with the Notification
service_name = 'Office 365'
# The services URL
service_url = 'https://office.com/'
# The default protocol
secure_protocol = 'o365'
# Allow 300 requests per minute.
# 60/300 = 0.2
request_rate_per_sec = 0.20
# A URL that takes you to the setup/help of the specific protocol
setup_url = 'https://github.com/caronc/apprise/wiki/Notify_office365'
# URL to Microsoft Graph Server
graph_url = 'https://graph.microsoft.com'
# Authentication URL
auth_url = 'https://login.microsoftonline.com/{tenant}/oauth2/v2.0/token'
# Use all the direct application permissions you have configured for your
# app. The endpoint should issue a token for the ones associated with the
# resource you want to use.
# see https://docs.microsoft.com/en-us/azure/active-directory/develop/\
# v2-permissions-and-consent#the-default-scope
scope = '.default'
# Default Notify Format
notify_format = NotifyFormat.HTML
# Define object templates
templates = (
'{schema}://{tenant}:{email}/{client_id}/{secret}',
'{schema}://{tenant}:{email}/{client_id}/{secret}/{targets}',
)
# Define our template tokens
template_tokens = dict(NotifyBase.template_tokens, **{
'tenant': {
'name': _('Tenant Domain'),
'type': 'string',
'required': True,
'private': True,
'regex': (r'^[a-z0-9-]+$', 'i'),
},
'email': {
'name': _('Account Email'),
'type': 'string',
'required': True,
},
'client_id': {
'name': _('Client ID'),
'type': 'string',
'required': True,
'private': True,
'regex': (r'^[a-z0-9-]+$', 'i'),
},
'secret': {
'name': _('Client Secret'),
'type': 'string',
'private': True,
'required': True,
},
'targets': {
'name': _('Target Emails'),
'type': 'list:string',
},
})
# Define our template arguments
template_args = dict(NotifyBase.template_args, **{
'to': {
'alias_of': 'targets',
},
'cc': {
'name': _('Carbon Copy'),
'type': 'list:string',
},
'bcc': {
'name': _('Blind Carbon Copy'),
'type': 'list:string',
},
'oauth_id': {
'alias_of': 'client_id',
},
'oauth_secret': {
'alias_of': 'secret',
},
})
def __init__(self, tenant, email, client_id, secret,
targets=None, cc=None, bcc=None, **kwargs):
"""
Initialize Office 365 Object
"""
super(NotifyOffice365, self).__init__(**kwargs)
# Tenant identifier
self.tenant = validate_regex(
tenant, *self.template_tokens['tenant']['regex'])
if not self.tenant:
msg = 'An invalid Office 365 Tenant' \
'({}) was specified.'.format(tenant)
self.logger.warning(msg)
raise TypeError(msg)
result = is_email(email)
if not result:
msg = 'An invalid Office 365 Email Account ID' \
'({}) was specified.'.format(email)
self.logger.warning(msg)
raise TypeError(msg)
# Otherwise store our the email address
self.email = result['full_email']
# Client Key (associated with generated OAuth2 Login)
self.client_id = validate_regex(
client_id, *self.template_tokens['client_id']['regex'])
if not self.client_id:
msg = 'An invalid Office 365 Client OAuth2 ID ' \
'({}) was specified.'.format(client_id)
self.logger.warning(msg)
raise TypeError(msg)
# Client Secret (associated with generated OAuth2 Login)
self.secret = validate_regex(secret)
if not self.secret:
msg = 'An invalid Office 365 Client OAuth2 Secret ' \
'({}) was specified.'.format(secret)
self.logger.warning(msg)
raise TypeError(msg)
# For tracking our email -> name lookups
self.names = {}
# Acquire Carbon Copies
self.cc = set()
# Acquire Blind Carbon Copies
self.bcc = set()
# Parse our targets
self.targets = list()
if targets:
for recipient in parse_emails(targets):
# Validate recipients (to:) and drop bad ones:
result = is_email(recipient)
if result:
# Add our email to our target list
self.targets.append(
(result['name'] if result['name'] else False,
result['full_email']))
continue
self.logger.warning(
'Dropped invalid To email ({}) specified.'
.format(recipient))
else:
# If our target email list is empty we want to add ourselves to it
self.targets.append((False, self.email))
# Validate recipients (cc:) and drop bad ones:
for recipient in parse_emails(cc):
email = is_email(recipient)
if email:
self.cc.add(email['full_email'])
# Index our name (if one exists)
self.names[email['full_email']] = \
email['name'] if email['name'] else False
continue
self.logger.warning(
'Dropped invalid Carbon Copy email '
'({}) specified.'.format(recipient),
)
# Validate recipients (bcc:) and drop bad ones:
for recipient in parse_emails(bcc):
email = is_email(recipient)
if email:
self.bcc.add(email['full_email'])
# Index our name (if one exists)
self.names[email['full_email']] = \
email['name'] if email['name'] else False
continue
self.logger.warning(
'Dropped invalid Blind Carbon Copy email '
'({}) specified.'.format(recipient),
)
# Our token is acquired upon a successful login
self.token = None
# Presume that our token has expired 'now'
self.token_expiry = datetime.now()
return
def send(self, body, title='', notify_type=NotifyType.INFO, **kwargs):
"""
Perform Office 365 Notification
"""
# error tracking (used for function return)
has_error = False
if not self.targets:
# There is no one to email; we're done
self.logger.warning(
'There are no Email recipients to notify')
return False
# Setup our Content Type
content_type = \
'HTML' if self.notify_format == NotifyFormat.HTML else 'Text'
# Prepare our payload
payload = {
'Message': {
'Subject': title,
'Body': {
'ContentType': content_type,
'Content': body,
},
},
'SaveToSentItems': 'false'
}
# Create a copy of the email list
emails = list(self.targets)
# Define our URL to post to
url = '{graph_url}/v1.0/users/{email}/sendmail'.format(
email=self.email,
graph_url=self.graph_url,
)
while len(emails):
# authenticate ourselves if we aren't already; but this function
# also tracks if our token we have is still valid and will
# re-authenticate ourselves if nessisary.
if not self.authenticate():
# We could not authenticate ourselves; we're done
return False
# Get our email to notify
to_name, to_addr = emails.pop(0)
# Strip target out of cc list if in To or Bcc
cc = (self.cc - self.bcc - set([to_addr]))
# Strip target out of bcc list if in To
bcc = (self.bcc - set([to_addr]))
# Prepare our email
payload['Message']['ToRecipients'] = [{
'EmailAddress': {
'Address': to_addr
}
}]
if to_name:
# Apply our To Name
payload['Message']['ToRecipients'][0]['EmailAddress']['Name'] \
= to_name
self.logger.debug('Email To: {}'.format(to_addr))
if cc:
# Prepare our CC list
payload['Message']['CcRecipients'] = []
for addr in cc:
_payload = {'Address': addr}
if self.names.get(addr):
_payload['Name'] = self.names[addr]
# Store our address in our payload
payload['Message']['CcRecipients']\
.append({'EmailAddress': _payload})
self.logger.debug('Email Cc: {}'.format(', '.join(
['{}{}'.format(
'' if self.names.get(e)
else '{}: '.format(self.names[e]), e) for e in cc])))
if bcc:
# Prepare our CC list
payload['Message']['BccRecipients'] = []
for addr in bcc:
_payload = {'Address': addr}
if self.names.get(addr):
_payload['Name'] = self.names[addr]
# Store our address in our payload
payload['Message']['BccRecipients']\
.append({'EmailAddress': _payload})
self.logger.debug('Email Bcc: {}'.format(', '.join(
['{}{}'.format(
'' if self.names.get(e)
else '{}: '.format(self.names[e]), e) for e in bcc])))
# Perform upstream fetch
postokay, response = self._fetch(
url=url, payload=dumps(payload),
content_type='application/json')
# Test if we were okay
if not postokay:
has_error = True
return not has_error
def authenticate(self):
"""
Logs into and acquires us an authentication token to work with
"""
if self.token and self.token_expiry > datetime.now():
# If we're already authenticated and our token is still valid
self.logger.debug(
'Already authenticate with token {}'.format(self.token))
return True
# If we reach here, we've either expired, or we need to authenticate
# for the first time.
# Prepare our payload
payload = {
'client_id': self.client_id,
'client_secret': self.secret,
'scope': '{graph_url}/{scope}'.format(
graph_url=self.graph_url,
scope=self.scope),
'grant_type': 'client_credentials',
}
# Prepare our URL
url = self.auth_url.format(tenant=self.tenant)
# A response looks like the following:
# {
# "token_type": "Bearer",
# "expires_in": 3599,
# "access_token": "eyJ0eXAiOiJKV1QiLCJhbGciOiJSzI1NiIsInNBXBP..."
# }
#
# Where expires_in defines the number of seconds the key is valid for
# before it must be renewed.
# Alternatively, this could happen too...
# {
# "error": "invalid_scope",
# "error_description": "AADSTS70011: Blah... Blah Blah... Blah",
# "error_codes": [
# 70011
# ],
# "timestamp": "2020-01-09 02:02:12Z",
# "trace_id": "255d1aef-8c98-452f-ac51-23d051240864",
# "correlation_id": "fb3d2015-bc17-4bb9-bb85-30c5cf1aaaa7"
# }
postokay, response = self._fetch(url=url, payload=payload)
if not postokay:
return False
# Reset our token
self.token = None
try:
# Extract our time from our response and subtrace 10 seconds from
# it to give us some wiggle/grace people to re-authenticate if we
# need to
self.token_expiry = datetime.now() + \
timedelta(seconds=int(response.get('expires_in')) - 10)
except (ValueError, AttributeError, TypeError):
# ValueError: expires_in wasn't an integer
# TypeError: expires_in was None
# AttributeError: we could not extract anything from our response
# object.
return False
# Go ahead and store our token if it's available
self.token = response.get('access_token')
# We're authenticated
return True if self.token else False
def _fetch(self, url, payload,
content_type='application/x-www-form-urlencoded'):
"""
Wrapper to request object
"""
# Prepare our headers:
headers = {
'User-Agent': self.app_id,
'Content-Type': content_type,
}
if self.token:
# Are we authenticated?
headers['Authorization'] = 'Bearer ' + self.token
# Default content response object
content = {}
# Some Debug Logging
self.logger.debug('Office 365 POST URL: {} (cert_verify={})'.format(
url, self.verify_certificate))
self.logger.debug('Office 365 Payload: {}' .format(payload))
# Always call throttle before any remote server i/o is made
self.throttle()
# fetch function
try:
r = requests.post(
url,
data=payload,
headers=headers,
verify=self.verify_certificate,
timeout=self.request_timeout,
)
if r.status_code not in (
requests.codes.ok, requests.codes.accepted):
# We had a problem
status_str = \
NotifyOffice365.http_response_code_lookup(r.status_code)
self.logger.warning(
'Failed to send Office 365 POST to {}: '
'{}error={}.'.format(
url,
', ' if status_str else '',
r.status_code))
self.logger.debug(
'Response Details:\r\n{}'.format(r.content))
# Mark our failure
return (False, content)
try:
content = loads(r.content)
except (AttributeError, TypeError, ValueError):
# ValueError = r.content is Unparsable
# TypeError = r.content is None
# AttributeError = r is None
content = {}
except requests.RequestException as e:
self.logger.warning(
'Exception received when sending Office 365 POST to {}: '.
format(url))
self.logger.debug('Socket Exception: %s' % str(e))
# Mark our failure
return (False, content)
return (True, content)
def url(self, privacy=False, *args, **kwargs):
"""
Returns the URL built dynamically based on specified arguments.
"""
# Our URL parameters
params = self.url_parameters(privacy=privacy, *args, **kwargs)
if self.cc:
# Handle our Carbon Copy Addresses
params['cc'] = ','.join(
['{}{}'.format(
'' if not self.names.get(e)
else '{}:'.format(self.names[e]), e) for e in self.cc])
if self.bcc:
# Handle our Blind Carbon Copy Addresses
params['bcc'] = ','.join(
['{}{}'.format(
'' if not self.names.get(e)
else '{}:'.format(self.names[e]), e) for e in self.bcc])
return '{schema}://{tenant}:{email}/{client_id}/{secret}' \
'/{targets}/?{params}'.format(
schema=self.secure_protocol,
tenant=self.pprint(self.tenant, privacy, safe=''),
# email does not need to be escaped because it should
# already be a valid host and username at this point
email=self.email,
client_id=self.pprint(self.client_id, privacy, safe=''),
secret=self.pprint(
self.secret, privacy, mode=PrivacyMode.Secret,
safe=''),
targets='/'.join(
[NotifyOffice365.quote('{}{}'.format(
'' if not e[0] else '{}:'.format(e[0]), e[1]),
safe='') for e in self.targets]),
params=NotifyOffice365.urlencode(params))
@staticmethod
def parse_url(url):
"""
Parses the URL and returns enough arguments that can allow
us to re-instantiate this object.
"""
results = NotifyBase.parse_url(url, verify_host=False)
if not results:
# We're done early as we couldn't load the results
return results
# Now make a list of all our path entries
# We need to read each entry back one at a time in reverse order
# where each email found we mark as a target. Once we run out
# of targets, the presume the remainder of the entries are part
# of the secret key (since it can contain slashes in it)
entries = NotifyOffice365.split_path(results['fullpath'])
try:
# Get our client_id is the first entry on the path
results['client_id'] = NotifyOffice365.unquote(entries.pop(0))
except IndexError:
# no problem, we may get the client_id another way through
# arguments...
pass
# Prepare our target listing
results['targets'] = list()
while entries:
# Pop the last entry
entry = NotifyOffice365.unquote(entries.pop(-1))
if is_email(entry):
# Store our email and move on
results['targets'].append(entry)
continue
# If we reach here, the entry we just popped is part of the secret
# key, so put it back
entries.append(NotifyOffice365.quote(entry, safe=''))
# We're done
break
# Initialize our tenant
results['tenant'] = None
# Assemble our secret key which is a combination of the host followed
# by all entries in the full path that follow up until the first email
results['secret'] = '/'.join(
[NotifyOffice365.unquote(x) for x in entries])
# Assemble our client id from the user@hostname
if results['password']:
results['email'] = '{}@{}'.format(
NotifyOffice365.unquote(results['password']),
NotifyOffice365.unquote(results['host']),
)
# Update our tenant
results['tenant'] = NotifyOffice365.unquote(results['user'])
else:
# No tenant specified..
results['email'] = '{}@{}'.format(
NotifyOffice365.unquote(results['user']),
NotifyOffice365.unquote(results['host']),
)
# OAuth2 ID
if 'oauth_id' in results['qsd'] and len(results['qsd']['oauth_id']):
# Extract the API Key from an argument
results['client_id'] = \
NotifyOffice365.unquote(results['qsd']['oauth_id'])
# OAuth2 Secret
if 'oauth_secret' in results['qsd'] and \
len(results['qsd']['oauth_secret']):
# Extract the API Secret from an argument
results['secret'] = \
NotifyOffice365.unquote(results['qsd']['oauth_secret'])
# Tenant
if 'from' in results['qsd'] and \
len(results['qsd']['from']):
# Extract the sending account's information
results['email'] = \
NotifyOffice365.unquote(results['qsd']['from'])
# Tenant
if 'tenant' in results['qsd'] and \
len(results['qsd']['tenant']):
# Extract the Tenant from the argument
results['tenant'] = \
NotifyOffice365.unquote(results['qsd']['tenant'])
# Support the 'to' variable so that we can support targets this way too
# The 'to' makes it easier to use yaml configuration
if 'to' in results['qsd'] and len(results['qsd']['to']):
results['targets'] += \
NotifyOffice365.parse_list(results['qsd']['to'])
# Handle Carbon Copy Addresses
if 'cc' in results['qsd'] and len(results['qsd']['cc']):
results['cc'] = results['qsd']['cc']
# Handle Blind Carbon Copy Addresses
if 'bcc' in results['qsd'] and len(results['qsd']['bcc']):
results['bcc'] = results['qsd']['bcc']
return results

View File

@ -0,0 +1,304 @@
# -*- coding: utf-8 -*-
#
# Copyright (C) 2020 Chris Caron <lead2gold@gmail.com>
# All rights reserved.
#
# This code is licensed under the MIT License.
#
# Permission is hereby granted, free of charge, to any person obtaining a copy
# of this software and associated documentation files(the "Software"), to deal
# in the Software without restriction, including without limitation the rights
# to use, copy, modify, merge, publish, distribute, sublicense, and / or sell
# copies of the Software, and to permit persons to whom the Software is
# furnished to do so, subject to the following conditions :
#
# The above copyright notice and this permission notice shall be included in
# all copies or substantial portions of the Software.
#
# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT.IN NO EVENT SHALL THE
# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
# THE SOFTWARE.
import re
import requests
from .NotifyBase import NotifyBase
from ..common import NotifyType
from ..utils import is_email
from ..utils import parse_list
from ..utils import parse_bool
from ..utils import validate_regex
from ..AppriseLocale import gettext_lazy as _
# Some Phone Number Detection
IS_PHONE_NO = re.compile(r'^\+?(?P<phone>[0-9\s)(+-]+)\s*$')
class NotifyPopcornNotify(NotifyBase):
"""
A wrapper for PopcornNotify Notifications
"""
# The default descriptive name associated with the Notification
service_name = 'PopcornNotify'
# The services URL
service_url = 'https://popcornnotify.com/'
# The default protocol
secure_protocol = 'popcorn'
# A URL that takes you to the setup/help of the specific protocol
setup_url = 'https://github.com/caronc/apprise/wiki/Notify_popcornnotify'
# PopcornNotify uses the http protocol
notify_url = 'https://popcornnotify.com/notify'
# The maximum targets to include when doing batch transfers
default_batch_size = 10
# Define object templates
templates = (
'{schema}://{apikey}/{targets}',
)
# Define our template tokens
template_tokens = dict(NotifyBase.template_tokens, **{
'apikey': {
'name': _('API Key'),
'type': 'string',
'regex': (r'^[a-z0-9]+$', 'i'),
'required': True,
},
'target_phone': {
'name': _('Target Phone No'),
'type': 'string',
'prefix': '+',
'regex': (r'^[0-9\s)(+-]+$', 'i'),
'map_to': 'targets',
},
'target_email': {
'name': _('Target Email'),
'type': 'string',
'map_to': 'targets',
},
'targets': {
'name': _('Targets'),
'type': 'list:string',
}
})
# Define our template arguments
template_args = dict(NotifyBase.template_args, **{
'to': {
'alias_of': 'targets',
},
'batch': {
'name': _('Batch Mode'),
'type': 'bool',
'default': False,
},
})
def __init__(self, apikey, targets=None, batch=False, **kwargs):
"""
Initialize PopcornNotify Object
"""
super(NotifyPopcornNotify, self).__init__(**kwargs)
# Access Token (associated with project)
self.apikey = validate_regex(
apikey, *self.template_tokens['apikey']['regex'])
if not self.apikey:
msg = 'An invalid PopcornNotify API Key ' \
'({}) was specified.'.format(apikey)
self.logger.warning(msg)
raise TypeError(msg)
# Prepare Batch Mode Flag
self.batch = batch
# Parse our targets
self.targets = list()
for target in parse_list(targets):
# Validate targets and drop bad ones:
result = IS_PHONE_NO.match(target)
if result:
# Further check our phone # for it's digit count
result = ''.join(re.findall(r'\d+', result.group('phone')))
if len(result) < 11 or len(result) > 14:
self.logger.warning(
'Dropped invalid phone # '
'({}) specified.'.format(target),
)
continue
# store valid phone number
self.targets.append(result)
continue
result = is_email(target)
if result:
# store valid email
self.targets.append(result['full_email'])
continue
self.logger.warning(
'Dropped invalid target '
'({}) specified.'.format(target),
)
def send(self, body, title='', notify_type=NotifyType.INFO, **kwargs):
"""
Perform PopcornNotify Notification
"""
if len(self.targets) == 0:
# There were no services to notify
self.logger.warning(
'There were no PopcornNotify targets to notify.')
return False
# error tracking (used for function return)
has_error = False
# Prepare our headers
headers = {
'User-Agent': self.app_id,
'Content-Type': 'application/x-www-form-urlencoded',
}
# Prepare our payload
payload = {
'message': body,
'subject': title,
}
auth = (self.apikey, None)
# Send in batches if identified to do so
batch_size = 1 if not self.batch else self.default_batch_size
for index in range(0, len(self.targets), batch_size):
# Prepare our recipients
payload['recipients'] = \
','.join(self.targets[index:index + batch_size])
self.logger.debug('PopcornNotify POST URL: %s (cert_verify=%r)' % (
self.notify_url, self.verify_certificate,
))
self.logger.debug('PopcornNotify Payload: %s' % str(payload))
# Always call throttle before any remote server i/o is made
self.throttle()
try:
r = requests.post(
self.notify_url,
auth=auth,
data=payload,
headers=headers,
verify=self.verify_certificate,
timeout=self.request_timeout,
)
if r.status_code != requests.codes.ok:
# We had a problem
status_str = \
NotifyPopcornNotify.http_response_code_lookup(
r.status_code)
self.logger.warning(
'Failed to send {} PopcornNotify notification{}: '
'{}{}error={}.'.format(
len(self.targets[index:index + batch_size]),
' to {}'.format(self.targets[index])
if batch_size == 1 else '(s)',
status_str,
', ' if status_str else '',
r.status_code))
self.logger.debug(
'Response Details:\r\n{}'.format(r.content))
# Mark our failure
has_error = True
continue
else:
self.logger.info(
'Sent {} PopcornNotify notification{}.'
.format(
len(self.targets[index:index + batch_size]),
' to {}'.format(self.targets[index])
if batch_size == 1 else '(s)',
))
except requests.RequestException as e:
self.logger.warning(
'A Connection error occured sending {} PopcornNotify '
'notification(s).'.format(
len(self.targets[index:index + batch_size])))
self.logger.debug('Socket Exception: %s' % str(e))
# Mark our failure
has_error = True
continue
return not has_error
def url(self, privacy=False, *args, **kwargs):
"""
Returns the URL built dynamically based on specified arguments.
"""
# Define any URL parameters
params = {
'batch': 'yes' if self.batch else 'no',
}
# Extend our parameters
params.update(self.url_parameters(privacy=privacy, *args, **kwargs))
return '{schema}://{apikey}/{targets}/?{params}'.format(
schema=self.secure_protocol,
apikey=self.pprint(self.apikey, privacy, safe=''),
targets='/'.join(
[NotifyPopcornNotify.quote(x, safe='') for x in self.targets]),
params=NotifyPopcornNotify.urlencode(params))
@staticmethod
def parse_url(url):
"""
Parses the URL and returns enough arguments that can allow
us to re-instantiate this object.
"""
results = NotifyBase.parse_url(url, verify_host=False)
if not results:
# We're done early as we couldn't load the results
return results
# Get our entries; split_path() looks after unquoting content for us
# by default
results['targets'] = \
NotifyPopcornNotify.split_path(results['fullpath'])
# The hostname is our authentication key
results['apikey'] = NotifyPopcornNotify.unquote(results['host'])
# Support the 'to' variable so that we can support targets this way too
# The 'to' makes it easier to use yaml configuration
if 'to' in results['qsd'] and len(results['qsd']['to']):
results['targets'] += \
NotifyPopcornNotify.parse_list(results['qsd']['to'])
# Get Batch Mode Flag
results['batch'] = \
parse_bool(results['qsd'].get('batch', False))
return results

View File

@ -0,0 +1,376 @@
# -*- coding: utf-8 -*-
#
# Copyright (C) 2020 Chris Caron <lead2gold@gmail.com>
# All rights reserved.
#
# This code is licensed under the MIT License.
#
# Permission is hereby granted, free of charge, to any person obtaining a copy
# of this software and associated documentation files(the "Software"), to deal
# in the Software without restriction, including without limitation the rights
# to use, copy, modify, merge, publish, distribute, sublicense, and / or sell
# copies of the Software, and to permit persons to whom the Software is
# furnished to do so, subject to the following conditions :
#
# The above copyright notice and this permission notice shall be included in
# all copies or substantial portions of the Software.
#
# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT.IN NO EVENT SHALL THE
# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
# THE SOFTWARE.
# To use this service you will need a Spontit account from their website
# at https://spontit.com/
#
# After you have an account created:
# - Visit your profile at https://spontit.com/profile and take note of your
# {username}. It might look something like: user12345678901
# - Next generate an API key at https://spontit.com/secret_keys. This will
# generate a very long alpha-numeric string we'll refer to as the
# {apikey}
# The Spontit Syntax is as follows:
# spontit://{username}@{apikey}
import re
import requests
from json import loads
from .NotifyBase import NotifyBase
from ..common import NotifyType
from ..utils import parse_list
from ..utils import validate_regex
from ..AppriseLocale import gettext_lazy as _
# Syntax suggests you use a hashtag '#' to help distinguish we're dealing
# with a channel.
# Secondly we extract the user information only if it's
# specified. If not, we use the user of the person sending the notification
# Finally the channel identifier is detected
CHANNEL_REGEX = re.compile(
r'^\s*(#|%23)?((@|%40)?(?P<user>[a-z0-9_]+)([/\\]|%2F))?'
r'(?P<channel>[a-z0-9_-]+)\s*$', re.I)
class NotifySpontit(NotifyBase):
"""
A wrapper for Spontit Notifications
"""
# The default descriptive name associated with the Notification
service_name = 'Spontit'
# The services URL
service_url = 'https://spontit.com/'
# All notification requests are secure
secure_protocol = 'spontit'
# Allow 300 requests per minute.
# 60/300 = 0.2
request_rate_per_sec = 0.20
# A URL that takes you to the setup/help of the specific protocol
setup_url = 'https://github.com/caronc/apprise/wiki/Notify_spontit'
# Spontit single notification URL
notify_url = 'https://api.spontit.com/v3/push'
# The maximum length of the body
body_maxlen = 5000
# The maximum length of the title
title_maxlen = 100
# If we don't have the specified min length, then we don't bother using
# the body directive
spontit_body_minlen = 100
# Subtitle support; this is the maximum allowed characters defined by
# the API page
spontit_subtitle_maxlen = 20
# Define object templates
templates = (
'{schema}://{user}@{apikey}',
'{schema}://{user}@{apikey}/{targets}',
)
# Define our template tokens
template_tokens = dict(NotifyBase.template_tokens, **{
'user': {
'name': _('User ID'),
'type': 'string',
'required': True,
'regex': (r'^[a-z0-9_-]+$', 'i'),
},
'apikey': {
'name': _('API Key'),
'type': 'string',
'required': True,
'private': True,
'regex': (r'^[a-z0-9]+$', 'i'),
},
# Target Channel ID's
# If a slash is used; you must escape it
# If no slash is used; channel is presumed to be your own
'target_channel': {
'name': _('Target Channel ID'),
'type': 'string',
'prefix': '#',
'regex': (r'^[0-9\s)(+-]+$', 'i'),
'map_to': 'targets',
},
'targets': {
'name': _('Targets'),
'type': 'list:string',
'required': True,
},
})
# Define our template arguments
template_args = dict(NotifyBase.template_args, **{
'to': {
'alias_of': 'targets',
},
'subtitle': {
# Subtitle is available for MacOS users
'name': _('Subtitle'),
'type': 'string',
},
})
def __init__(self, apikey, targets=None, subtitle=None, **kwargs):
"""
Initialize Spontit Object
"""
super(NotifySpontit, self).__init__(**kwargs)
# User ID (associated with project)
user = validate_regex(
self.user, *self.template_tokens['user']['regex'])
if not user:
msg = 'An invalid Spontit User ID ' \
'({}) was specified.'.format(self.user)
self.logger.warning(msg)
raise TypeError(msg)
# use cleaned up version
self.user = user
# API Key (associated with project)
self.apikey = validate_regex(
apikey, *self.template_tokens['apikey']['regex'])
if not self.apikey:
msg = 'An invalid Spontit API Key ' \
'({}) was specified.'.format(apikey)
self.logger.warning(msg)
raise TypeError(msg)
# Save our subtitle information
self.subtitle = subtitle
# Parse our targets
self.targets = list()
for target in parse_list(targets):
# Validate targets and drop bad ones:
result = CHANNEL_REGEX.match(target)
if result:
# Just extract the channel
self.targets.append(
'{}'.format(result.group('channel')))
continue
self.logger.warning(
'Dropped invalid channel/user ({}) specified.'.format(target))
return
def send(self, body, title='', notify_type=NotifyType.INFO, **kwargs):
"""
Sends Message
"""
# error tracking (used for function return)
has_error = False
# Prepare our headers
headers = {
'User-Agent': self.app_id,
'Content-Type': 'application/json',
'X-Authorization': self.apikey,
'X-UserId': self.user,
}
# use the list directly
targets = list(self.targets)
if not len(targets):
# The user did not specify a channel and therefore wants to notify
# the main account only. We just set a substitute marker of
# None so that our while loop below can still process one iteration
targets = [None, ]
while len(targets):
# Get our target(s) to notify
target = targets.pop(0)
# Prepare our payload
payload = {
'message': body,
}
# Use our body directive if we exceed the minimum message
# limitation
if len(body) > self.spontit_body_minlen:
payload['message'] = '{}...'.format(
body[:self.spontit_body_minlen - 3])
payload['body'] = body
if self.subtitle:
# Set title if specified
payload['subtitle'] = \
self.subtitle[:self.spontit_subtitle_maxlen]
elif self.app_desc:
# fall back to app description
payload['subtitle'] = \
self.app_desc[:self.spontit_subtitle_maxlen]
elif self.app_id:
# fall back to app id
payload['subtitle'] = \
self.app_id[:self.spontit_subtitle_maxlen]
if title:
# Set title if specified
payload['pushTitle'] = title
if target is not None:
payload['channelName'] = target
# Some Debug Logging
self.logger.debug(
'Spontit POST URL: {} (cert_verify={})'.format(
self.notify_url, self.verify_certificate))
self.logger.debug('Spontit Payload: {}' .format(payload))
# Always call throttle before any remote server i/o is made
self.throttle()
try:
r = requests.post(
self.notify_url,
params=payload,
headers=headers,
verify=self.verify_certificate,
timeout=self.request_timeout,
)
if r.status_code not in (
requests.codes.created, requests.codes.ok):
status_str = \
NotifyBase.http_response_code_lookup(
r.status_code)
try:
# Update our status response if we can
json_response = loads(r.content)
status_str = json_response.get('message', status_str)
except (AttributeError, TypeError, ValueError):
# ValueError = r.content is Unparsable
# TypeError = r.content is None
# AttributeError = r is None
# We could not parse JSON response.
# We will just use the status we already have.
pass
self.logger.warning(
'Failed to send Spontit notification to {}: '
'{}{}error={}.'.format(
target,
status_str,
', ' if status_str else '',
r.status_code))
self.logger.debug(
'Response Details:\r\n{}'.format(r.content))
# Mark our failure
has_error = True
continue
# If we reach here; the message was sent
self.logger.info(
'Sent Spontit notification to {}.'.format(target))
self.logger.debug(
'Response Details:\r\n{}'.format(r.content))
except requests.RequestException as e:
self.logger.warning(
'A Connection error occurred sending Spontit:%s ' % (
', '.join(self.targets)) + 'notification.'
)
self.logger.debug('Socket Exception: %s' % str(e))
# Mark our failure
has_error = True
continue
return not has_error
def url(self, privacy=False, *args, **kwargs):
"""
Returns the URL built dynamically based on specified arguments.
"""
# Our URL parameters
params = self.url_parameters(privacy=privacy, *args, **kwargs)
if self.subtitle:
params['subtitle'] = self.subtitle
return '{schema}://{userid}@{apikey}/{targets}?{params}'.format(
schema=self.secure_protocol,
userid=self.user,
apikey=self.pprint(self.apikey, privacy, safe=''),
targets='/'.join(
[NotifySpontit.quote(x, safe='') for x in self.targets]),
params=NotifySpontit.urlencode(params))
@staticmethod
def parse_url(url):
"""
Parses the URL and returns enough arguments that can allow
us to re-instantiate this object.
"""
results = NotifyBase.parse_url(url, verify_host=False)
if not results:
# We're done early as we couldn't load the results
return results
# Get our entries; split_path() looks after unquoting content for us
# by default
results['targets'] = NotifySpontit.split_path(results['fullpath'])
# The hostname is our authentication key
results['apikey'] = NotifySpontit.unquote(results['host'])
# Support MacOS subtitle option
if 'subtitle' in results['qsd'] and len(results['qsd']['subtitle']):
results['subtitle'] = results['qsd']['subtitle']
# Support the 'to' variable so that we can support targets this way too
# The 'to' makes it easier to use yaml configuration
if 'to' in results['qsd'] and len(results['qsd']['to']):
results['targets'] += \
NotifySpontit.parse_list(results['qsd']['to'])
return results

View File

View File

@ -0,0 +1,115 @@
# -*- coding: utf-8 -*-
# Copyright (C) 2020 Chris Caron <lead2gold@gmail.com>
# All rights reserved.
#
# This code is licensed under the MIT License.
#
# Permission is hereby granted, free of charge, to any person obtaining a copy
# of this software and associated documentation files(the "Software"), to deal
# in the Software without restriction, including without limitation the rights
# to use, copy, modify, merge, publish, distribute, sublicense, and / or sell
# copies of the Software, and to permit persons to whom the Software is
# furnished to do so, subject to the following conditions :
#
# The above copyright notice and this permission notice shall be included in
# all copies or substantial portions of the Software.
#
# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT.IN NO EVENT SHALL THE
# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
# THE SOFTWARE.
import sys
import asyncio
from ..URLBase import URLBase
from ..logger import logger
# A global flag that tracks if we are Python v3.7 or higher
ASYNCIO_RUN_SUPPORT = \
sys.version_info.major > 3 or \
(sys.version_info.major == 3 and sys.version_info.minor >= 7)
def notify(coroutines, debug=False):
"""
A Wrapper to the AsyncNotifyBase.async_notify() calls allowing us
to call gather() and collect the responses
"""
# Create log entry
logger.info(
'Notifying {} service(s) asynchronous.'.format(len(coroutines)))
if ASYNCIO_RUN_SUPPORT:
# async reference produces a SyntaxError (E999) in Python v2.7
# For this reason we turn on the noqa flag
async def main(results, coroutines): # noqa: E999
"""
Task: Notify all servers specified and return our result set
through a mutable object.
"""
# send our notifications and store our result set into
# our results dictionary
results['response'] = \
await asyncio.gather(*coroutines, return_exceptions=True)
# Initialize a mutable object we can populate with our notification
# responses
results = {}
# Send our notifications
asyncio.run(main(results, coroutines), debug=debug)
# Acquire our return status
status = next((s for s in results['response'] if s is False), True)
else:
#
# The depricated way
#
# acquire access to our event loop
loop = asyncio.get_event_loop()
if debug:
# Enable debug mode
loop.set_debug(1)
# Send our notifications and acquire our status
results = loop.run_until_complete(asyncio.gather(*coroutines))
# Acquire our return status
status = next((r for r in results if r is False), True)
# Returns True if all notifications succeeded, otherwise False is
# returned.
return status
class AsyncNotifyBase(URLBase):
"""
asyncio wrapper for the NotifyBase object
"""
async def async_notify(self, *args, **kwargs): # noqa: E999
"""
Async Notification Wrapper
"""
try:
return self.notify(*args, **kwargs)
except TypeError:
# These our our internally thrown notifications
pass
except Exception:
# A catch all so we don't have to abort early
# just because one of our plugins has a bug in it.
logger.exception("Notification Exception")
return False