Upgrade Apprise to fix issue with Discord notification.

This commit is contained in:
Louis Vézina 2019-11-24 20:15:55 -05:00
parent 777913bd40
commit 1f8b5bd2e1
17 changed files with 5307 additions and 0 deletions

View File

@ -0,0 +1,270 @@
# -*- coding: utf-8 -*-
#
# Copyright (C) 2019 Chris Caron <lead2gold@gmail.com>
# All rights reserved.
#
# This code is licensed under the MIT License.
#
# Permission is hereby granted, free of charge, to any person obtaining a copy
# of this software and associated documentation files(the "Software"), to deal
# in the Software without restriction, including without limitation the rights
# to use, copy, modify, merge, publish, distribute, sublicense, and / or sell
# copies of the Software, and to permit persons to whom the Software is
# furnished to do so, subject to the following conditions :
#
# The above copyright notice and this permission notice shall be included in
# all copies or substantial portions of the Software.
#
# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT.IN NO EVENT SHALL THE
# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
# THE SOFTWARE.
import six
from . import attachment
from . import URLBase
from .AppriseAsset import AppriseAsset
from .logger import logger
from .utils import GET_SCHEMA_RE
class AppriseAttachment(object):
"""
Our Apprise Attachment File Manager
"""
def __init__(self, paths=None, asset=None, cache=True, **kwargs):
"""
Loads all of the paths/urls specified (if any).
The path can either be a single string identifying one explicit
location, otherwise you can pass in a series of locations to scan
via a list.
By default we cache our responses so that subsiquent calls does not
cause the content to be retrieved again. For local file references
this makes no difference at all. But for remote content, this does
mean more then one call can be made to retrieve the (same) data. This
method can be somewhat inefficient if disabled. Only disable caching
if you understand the consequences.
You can alternatively set the cache value to an int identifying the
number of seconds the previously retrieved can exist for before it
should be considered expired.
It's also worth nothing that the cache value is only set to elements
that are not already of subclass AttachBase()
"""
# Initialize our attachment listings
self.attachments = list()
# Set our cache flag
self.cache = cache
# Prepare our Asset Object
self.asset = \
asset if isinstance(asset, AppriseAsset) else AppriseAsset()
# Now parse any paths specified
if paths is not None:
# Store our path(s)
if not self.add(paths):
# Parse Source domain based on from_addr
raise TypeError("One or more attachments could not be added.")
def add(self, attachments, asset=None, cache=None):
"""
Adds one or more attachments into our list.
By default we cache our responses so that subsiquent calls does not
cause the content to be retrieved again. For local file references
this makes no difference at all. But for remote content, this does
mean more then one call can be made to retrieve the (same) data. This
method can be somewhat inefficient if disabled. Only disable caching
if you understand the consequences.
You can alternatively set the cache value to an int identifying the
number of seconds the previously retrieved can exist for before it
should be considered expired.
It's also worth nothing that the cache value is only set to elements
that are not already of subclass AttachBase()
"""
# Initialize our return status
return_status = True
# Initialize our default cache value
cache = cache if cache is not None else self.cache
if isinstance(asset, AppriseAsset):
# prepare default asset
asset = self.asset
if isinstance(attachments, attachment.AttachBase):
# Go ahead and just add our attachments into our list
self.attachments.append(attachments)
return True
elif isinstance(attachments, six.string_types):
# Save our path
attachments = (attachments, )
elif not isinstance(attachments, (tuple, set, list)):
logger.error(
'An invalid attachment url (type={}) was '
'specified.'.format(type(attachments)))
return False
# Iterate over our attachments
for _attachment in attachments:
if isinstance(_attachment, attachment.AttachBase):
# Go ahead and just add our attachment into our list
self.attachments.append(_attachment)
continue
elif not isinstance(_attachment, six.string_types):
logger.warning(
"An invalid attachment (type={}) was specified.".format(
type(_attachment)))
return_status = False
continue
logger.debug("Loading attachment: {}".format(_attachment))
# Instantiate ourselves an object, this function throws or
# returns None if it fails
instance = AppriseAttachment.instantiate(
_attachment, asset=asset, cache=cache)
if not isinstance(instance, attachment.AttachBase):
return_status = False
continue
# Add our initialized plugin to our server listings
self.attachments.append(instance)
# Return our status
return return_status
@staticmethod
def instantiate(url, asset=None, cache=None, suppress_exceptions=True):
"""
Returns the instance of a instantiated attachment plugin based on
the provided Attachment URL. If the url fails to be parsed, then None
is returned.
A specified cache value will over-ride anything set
"""
# Attempt to acquire the schema at the very least to allow our
# attachment based urls.
schema = GET_SCHEMA_RE.match(url)
if schema is None:
# Plan B is to assume we're dealing with a file
schema = attachment.AttachFile.protocol
url = '{}://{}'.format(schema, URLBase.quote(url))
else:
# Ensure our schema is always in lower case
schema = schema.group('schema').lower()
# Some basic validation
if schema not in attachment.SCHEMA_MAP:
logger.warning('Unsupported schema {}.'.format(schema))
return None
# Parse our url details of the server object as dictionary containing
# all of the information parsed from our URL
results = attachment.SCHEMA_MAP[schema].parse_url(url)
if not results:
# Failed to parse the server URL
logger.warning('Unparseable URL {}.'.format(url))
return None
# Prepare our Asset Object
results['asset'] = \
asset if isinstance(asset, AppriseAsset) else AppriseAsset()
if cache is not None:
# Force an over-ride of the cache value to what we have specified
results['cache'] = cache
if suppress_exceptions:
try:
# Attempt to create an instance of our plugin using the parsed
# URL information
attach_plugin = \
attachment.SCHEMA_MAP[results['schema']](**results)
except Exception:
# the arguments are invalid or can not be used.
logger.warning('Could not load URL: %s' % url)
return None
else:
# Attempt to create an instance of our plugin using the parsed
# URL information but don't wrap it in a try catch
attach_plugin = attachment.SCHEMA_MAP[results['schema']](**results)
return attach_plugin
def clear(self):
"""
Empties our attachment list
"""
self.attachments[:] = []
def size(self):
"""
Returns the total size of accumulated attachments
"""
return sum([len(a) for a in self.attachments if len(a) > 0])
def pop(self, index=-1):
"""
Removes an indexed Apprise Attachment from the stack and returns it.
by default the last element is poped from the list
"""
# Remove our entry
return self.attachments.pop(index)
def __getitem__(self, index):
"""
Returns the indexed entry of a loaded apprise attachments
"""
return self.attachments[index]
def __bool__(self):
"""
Allows the Apprise object to be wrapped in an Python 3.x based 'if
statement'. True is returned if at least one service has been loaded.
"""
return True if self.attachments else False
def __nonzero__(self):
"""
Allows the Apprise object to be wrapped in an Python 2.x based 'if
statement'. True is returned if at least one service has been loaded.
"""
return True if self.attachments else False
def __iter__(self):
"""
Returns an iterator to our attachment list
"""
return iter(self.attachments)
def __len__(self):
"""
Returns the number of attachment entries loaded
"""
return len(self.attachments)

View File

@ -0,0 +1,333 @@
# -*- coding: utf-8 -*-
#
# Copyright (C) 2019 Chris Caron <lead2gold@gmail.com>
# All rights reserved.
#
# This code is licensed under the MIT License.
#
# Permission is hereby granted, free of charge, to any person obtaining a copy
# of this software and associated documentation files(the "Software"), to deal
# in the Software without restriction, including without limitation the rights
# to use, copy, modify, merge, publish, distribute, sublicense, and / or sell
# copies of the Software, and to permit persons to whom the Software is
# furnished to do so, subject to the following conditions :
#
# The above copyright notice and this permission notice shall be included in
# all copies or substantial portions of the Software.
#
# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT.IN NO EVENT SHALL THE
# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
# THE SOFTWARE.
import os
import time
import mimetypes
from ..URLBase import URLBase
from ..utils import parse_bool
class AttachBase(URLBase):
"""
This is the base class for all supported attachment types
"""
# For attachment type detection; this amount of data is read into memory
# 128KB (131072B)
max_detect_buffer_size = 131072
# Unknown mimetype
unknown_mimetype = 'application/octet-stream'
# Our filename when we can't otherwise determine one
unknown_filename = 'apprise-attachment'
# Our filename extension when we can't otherwise determine one
unknown_filename_extension = '.obj'
# The strict argument is a flag specifying whether the list of known MIME
# types is limited to only the official types registered with IANA. When
# strict is True, only the IANA types are supported; when strict is False
# (the default), some additional non-standard but commonly used MIME types
# are also recognized.
strict = False
# The maximum file-size we will accept for an attachment size. If this is
# set to zero (0), then no check is performed
# 1 MB = 1048576 bytes
# 5 MB = 5242880 bytes
max_file_size = 5242880
def __init__(self, name=None, mimetype=None, cache=True, **kwargs):
"""
Initialize some general logging and common server arguments that will
keep things consistent when working with the configurations that
inherit this class.
Optionally provide a filename to over-ride name associated with the
actual file retrieved (from where-ever).
The mime-type is automatically detected, but you can over-ride this by
explicitly stating what it should be.
By default we cache our responses so that subsiquent calls does not
cause the content to be retrieved again. For local file references
this makes no difference at all. But for remote content, this does
mean more then one call can be made to retrieve the (same) data. This
method can be somewhat inefficient if disabled. Only disable caching
if you understand the consequences.
You can alternatively set the cache value to an int identifying the
number of seconds the previously retrieved can exist for before it
should be considered expired.
"""
super(AttachBase, self).__init__(**kwargs)
if not mimetypes.inited:
# Ensure mimetypes has been initialized
mimetypes.init()
# Attach Filename (does not have to be the same as path)
self._name = name
# The mime type of the attached content. This is detected if not
# otherwise specified.
self._mimetype = mimetype
# The detected_mimetype, this is only used as a fallback if the
# mimetype wasn't forced by the user
self.detected_mimetype = None
# The detected filename by calling child class. A detected filename
# is always used if no force naming was specified.
self.detected_name = None
# Absolute path to attachment
self.download_path = None
# Set our cache flag; it can be True or a (positive) integer
try:
self.cache = cache if isinstance(cache, bool) else int(cache)
if self.cache < 0:
err = 'A negative cache value ({}) was specified.'.format(
cache)
self.logger.warning(err)
raise TypeError(err)
except (ValueError, TypeError):
err = 'An invalid cache value ({}) was specified.'.format(cache)
self.logger.warning(err)
raise TypeError(err)
# Validate mimetype if specified
if self._mimetype:
if next((t for t in mimetypes.types_map.values()
if self._mimetype == t), None) is None:
err = 'An invalid mime-type ({}) was specified.'.format(
mimetype)
self.logger.warning(err)
raise TypeError(err)
return
@property
def path(self):
"""
Returns the absolute path to the filename. If this is not known or
is know but has been considered expired (due to cache setting), then
content is re-retrieved prior to returning.
"""
if not self.exists():
# we could not obtain our path
return None
return self.download_path
@property
def name(self):
"""
Returns the filename
"""
if self._name:
# return our fixed content
return self._name
if not self.exists():
# we could not obtain our name
return None
if not self.detected_name:
# If we get here, our download was successful but we don't have a
# filename based on our content.
extension = mimetypes.guess_extension(self.mimetype)
self.detected_name = '{}{}'.format(
self.unknown_filename,
extension if extension else self.unknown_filename_extension)
return self.detected_name
@property
def mimetype(self):
"""
Returns mime type (if one is present).
Content is cached once determied to prevent overhead of future
calls.
"""
if self._mimetype:
# return our pre-calculated cached content
return self._mimetype
if not self.exists():
# we could not obtain our attachment
return None
if not self.detected_mimetype:
# guess_type() returns: (type, encoding) and sets type to None
# if it can't otherwise determine it.
try:
# Directly reference _name and detected_name to prevent
# recursion loop (as self.name calls this function)
self.detected_mimetype, _ = mimetypes.guess_type(
self._name if self._name
else self.detected_name, strict=self.strict)
except TypeError:
# Thrown if None was specified in filename section
pass
# Return our mime type
return self.detected_mimetype \
if self.detected_mimetype else self.unknown_mimetype
def exists(self):
"""
Simply returns true if the object has downloaded and stored the
attachment AND the attachment has not expired.
"""
if self.download_path and os.path.isfile(self.download_path) \
and self.cache:
# We have enough reason to look further into our cached content
# and verify it has not expired.
if self.cache is True:
# return our fixed content as is; we will always cache it
return True
# Verify our cache time to determine whether we will get our
# content again.
try:
age_in_sec = time.time() - os.stat(self.download_path).st_mtime
if age_in_sec <= self.cache:
return True
except (OSError, IOError):
# The file is not present
pass
return self.download()
def invalidate(self):
"""
Release any temporary data that may be open by child classes.
Externally fetched content should be automatically cleaned up when
this function is called.
This function should also reset the following entries to None:
- detected_name : Should identify a human readable filename
- download_path: Must contain a absolute path to content
- detected_mimetype: Should identify mimetype of content
"""
self.detected_name = None
self.download_path = None
self.detected_mimetype = None
return
def download(self):
"""
This function must be over-ridden by inheriting classes.
Inherited classes MUST populate:
- detected_name: Should identify a human readable filename
- download_path: Must contain a absolute path to content
- detected_mimetype: Should identify mimetype of content
If a download fails, you should ensure these values are set to None.
"""
raise NotImplementedError(
"download() is implimented by the child class.")
@staticmethod
def parse_url(url, verify_host=True, mimetype_db=None):
"""Parses the URL and returns it broken apart into a dictionary.
This is very specific and customized for Apprise.
Args:
url (str): The URL you want to fully parse.
verify_host (:obj:`bool`, optional): a flag kept with the parsed
URL which some child classes will later use to verify SSL
keys (if SSL transactions take place). Unless under very
specific circumstances, it is strongly recomended that
you leave this default value set to True.
Returns:
A dictionary is returned containing the URL fully parsed if
successful, otherwise None is returned.
"""
results = URLBase.parse_url(url, verify_host=verify_host)
if not results:
# We're done; we failed to parse our url
return results
# Allow overriding the default config mime type
if 'mime' in results['qsd']:
results['mimetype'] = results['qsd'].get('mime', '') \
.strip().lower()
# Allow overriding the default file name
if 'name' in results['qsd']:
results['name'] = results['qsd'].get('name', '') \
.strip().lower()
# Our cache value
if 'cache' in results['qsd']:
# First try to get it's integer value
try:
results['cache'] = int(results['qsd']['cache'])
except (ValueError, TypeError):
# No problem, it just isn't an integer; now treat it as a bool
# instead:
results['cache'] = parse_bool(results['qsd']['cache'])
return results
def __len__(self):
"""
Returns the filesize of the attachment.
"""
return os.path.getsize(self.path) if self.path else 0
def __bool__(self):
"""
Allows the Apprise object to be wrapped in an Python 3.x based 'if
statement'. True is returned if our content was downloaded correctly.
"""
return True if self.path else False
def __nonzero__(self):
"""
Allows the Apprise object to be wrapped in an Python 2.x based 'if
statement'. True is returned if our content was downloaded correctly.
"""
return True if self.path else False

View File

@ -0,0 +1,129 @@
# -*- coding: utf-8 -*-
#
# Copyright (C) 2019 Chris Caron <lead2gold@gmail.com>
# All rights reserved.
#
# This code is licensed under the MIT License.
#
# Permission is hereby granted, free of charge, to any person obtaining a copy
# of this software and associated documentation files(the "Software"), to deal
# in the Software without restriction, including without limitation the rights
# to use, copy, modify, merge, publish, distribute, sublicense, and / or sell
# copies of the Software, and to permit persons to whom the Software is
# furnished to do so, subject to the following conditions :
#
# The above copyright notice and this permission notice shall be included in
# all copies or substantial portions of the Software.
#
# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT.IN NO EVENT SHALL THE
# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
# THE SOFTWARE.
import re
import os
from .AttachBase import AttachBase
from ..AppriseLocale import gettext_lazy as _
class AttachFile(AttachBase):
"""
A wrapper for File based attachment sources
"""
# The default descriptive name associated with the service
service_name = _('Local File')
# The default protocol
protocol = 'file'
def __init__(self, path, **kwargs):
"""
Initialize Local File Attachment Object
"""
super(AttachFile, self).__init__(**kwargs)
# Store path but mark it dirty since we have not performed any
# verification at this point.
self.dirty_path = os.path.expanduser(path)
return
def url(self, privacy=False, *args, **kwargs):
"""
Returns the URL built dynamically based on specified arguments.
"""
# Define any arguments set
args = {}
if self._mimetype:
# A mime-type was enforced
args['mime'] = self._mimetype
if self._name:
# A name was enforced
args['name'] = self._name
return 'file://{path}{args}'.format(
path=self.quote(self.dirty_path),
args='?{}'.format(self.urlencode(args)) if args else '',
)
def download(self, **kwargs):
"""
Perform retrieval of our data.
For file base attachments, our data already exists, so we only need to
validate it.
"""
# Ensure any existing content set has been invalidated
self.invalidate()
if not os.path.isfile(self.dirty_path):
return False
if self.max_file_size > 0 and \
os.path.getsize(self.dirty_path) > self.max_file_size:
# The content to attach is to large
self.logger.error(
'Content exceeds allowable maximum file length '
'({}KB): {}'.format(
int(self.max_file_size / 1024), self.url(privacy=True)))
# Return False (signifying a failure)
return False
# We're good to go if we get here. Set our minimum requirements of
# a call do download() before returning a success
self.download_path = self.dirty_path
self.detected_name = os.path.basename(self.download_path)
# We don't need to set our self.detected_mimetype as it can be
# pulled at the time it's needed based on the detected_name
return True
@staticmethod
def parse_url(url):
"""
Parses the URL so that we can handle all different file paths
and return it as our path object
"""
results = AttachBase.parse_url(url, verify_host=False)
if not results:
# We're done early; it's not a good URL
return results
match = re.match(r'file://(?P<path>[^?]+)(\?.*)?', url, re.I)
if not match:
return None
results['path'] = AttachFile.unquote(match.group('path'))
return results

View File

@ -0,0 +1,321 @@
# -*- coding: utf-8 -*-
#
# Copyright (C) 2019 Chris Caron <lead2gold@gmail.com>
# All rights reserved.
#
# This code is licensed under the MIT License.
#
# Permission is hereby granted, free of charge, to any person obtaining a copy
# of this software and associated documentation files(the "Software"), to deal
# in the Software without restriction, including without limitation the rights
# to use, copy, modify, merge, publish, distribute, sublicense, and / or sell
# copies of the Software, and to permit persons to whom the Software is
# furnished to do so, subject to the following conditions :
#
# The above copyright notice and this permission notice shall be included in
# all copies or substantial portions of the Software.
#
# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT.IN NO EVENT SHALL THE
# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
# THE SOFTWARE.
import re
import os
import six
import requests
from tempfile import NamedTemporaryFile
from .AttachBase import AttachBase
from ..URLBase import PrivacyMode
from ..AppriseLocale import gettext_lazy as _
class AttachHTTP(AttachBase):
"""
A wrapper for HTTP based attachment sources
"""
# The default descriptive name associated with the service
service_name = _('Web Based')
# The default protocol
protocol = 'http'
# The default secure protocol
secure_protocol = 'https'
# The maximum number of seconds to wait for a connection to be established
# before out-right just giving up
connection_timeout_sec = 5.0
# The number of bytes in memory to read from the remote source at a time
chunk_size = 8192
def __init__(self, headers=None, **kwargs):
"""
Initialize HTTP Object
headers can be a dictionary of key/value pairs that you want to
additionally include as part of the server headers to post with
"""
super(AttachHTTP, self).__init__(**kwargs)
self.schema = 'https' if self.secure else 'http'
self.fullpath = kwargs.get('fullpath')
if not isinstance(self.fullpath, six.string_types):
self.fullpath = '/'
self.headers = {}
if headers:
# Store our extra headers
self.headers.update(headers)
# Where our content is written to upon a call to download.
self._temp_file = None
return
def download(self, **kwargs):
"""
Perform retrieval of the configuration based on the specified request
"""
# Ensure any existing content set has been invalidated
self.invalidate()
# prepare header
headers = {
'User-Agent': self.app_id,
}
# Apply any/all header over-rides defined
headers.update(self.headers)
auth = None
if self.user:
auth = (self.user, self.password)
url = '%s://%s' % (self.schema, self.host)
if isinstance(self.port, int):
url += ':%d' % self.port
url += self.fullpath
self.logger.debug('HTTP POST URL: %s (cert_verify=%r)' % (
url, self.verify_certificate,
))
# Where our request object will temporarily live.
r = None
# Always call throttle before any remote server i/o is made
self.throttle()
try:
# Make our request
with requests.get(
url,
headers=headers,
auth=auth,
verify=self.verify_certificate,
timeout=self.connection_timeout_sec,
stream=True) as r:
# Handle Errors
r.raise_for_status()
# Get our file-size (if known)
try:
file_size = int(r.headers.get('Content-Length', '0'))
except (TypeError, ValueError):
# Handle edge case where Content-Length is a bad value
file_size = 0
# Perform a little Q/A on file limitations and restrictions
if self.max_file_size > 0 and file_size > self.max_file_size:
# The content retrieved is to large
self.logger.error(
'HTTP response exceeds allowable maximum file length '
'({}KB): {}'.format(
int(self.max_file_size / 1024),
self.url(privacy=True)))
# Return False (signifying a failure)
return False
# Detect config format based on mime if the format isn't
# already enforced
self.detected_mimetype = r.headers.get('Content-Type')
d = r.headers.get('Content-Disposition', '')
result = re.search(
"filename=['\"]?(?P<name>[^'\"]+)['\"]?", d, re.I)
if result:
self.detected_name = result.group('name').strip()
# Create a temporary file to work with
self._temp_file = NamedTemporaryFile()
# Get our chunk size
chunk_size = self.chunk_size
# Track all bytes written to disk
bytes_written = 0
# If we get here, we can now safely write our content to disk
for chunk in r.iter_content(chunk_size=chunk_size):
# filter out keep-alive chunks
if chunk:
self._temp_file.write(chunk)
bytes_written = self._temp_file.tell()
# Prevent a case where Content-Length isn't provided
# we don't want to fetch beyond our limits
if self.max_file_size > 0:
if bytes_written > self.max_file_size:
# The content retrieved is to large
self.logger.error(
'HTTP response exceeds allowable maximum '
'file length ({}KB): {}'.format(
int(self.max_file_size / 1024),
self.url(privacy=True)))
# Invalidate any variables previously set
self.invalidate()
# Return False (signifying a failure)
return False
elif bytes_written + chunk_size \
> self.max_file_size:
# Adjust out next read to accomodate up to our
# limit +1. This will prevent us from readig
# to much into our memory buffer
self.max_file_size - bytes_written + 1
# Ensure our content is flushed to disk for post-processing
self._temp_file.flush()
# Set our minimum requirements for a successful download() call
self.download_path = self._temp_file.name
if not self.detected_name:
self.detected_name = os.path.basename(self.fullpath)
except requests.RequestException as e:
self.logger.error(
'A Connection error occured retrieving HTTP '
'configuration from %s.' % self.host)
self.logger.debug('Socket Exception: %s' % str(e))
# Invalidate any variables previously set
self.invalidate()
# Return False (signifying a failure)
return False
except (IOError, OSError):
# IOError is present for backwards compatibility with Python
# versions older then 3.3. >= 3.3 throw OSError now.
# Could not open and/or write the temporary file
self.logger.error(
'Could not write attachment to disk: {}'.format(
self.url(privacy=True)))
# Invalidate any variables previously set
self.invalidate()
# Return False (signifying a failure)
return False
# Return our success
return True
def invalidate(self):
"""
Close our temporary file
"""
if self._temp_file:
self._temp_file.close()
self._temp_file = None
super(AttachHTTP, self).invalidate()
def url(self, privacy=False, *args, **kwargs):
"""
Returns the URL built dynamically based on specified arguments.
"""
# Prepare our cache value
if isinstance(self.cache, bool) or not self.cache:
cache = 'yes' if self.cache else 'no'
else:
cache = int(self.cache)
# Define any arguments set
args = {
'verify': 'yes' if self.verify_certificate else 'no',
'cache': cache,
}
if self._mimetype:
# A format was enforced
args['mime'] = self._mimetype
if self._name:
# A name was enforced
args['name'] = self._name
# Append our headers into our args
args.update({'+{}'.format(k): v for k, v in self.headers.items()})
# Determine Authentication
auth = ''
if self.user and self.password:
auth = '{user}:{password}@'.format(
user=self.quote(self.user, safe=''),
password=self.pprint(
self.password, privacy, mode=PrivacyMode.Secret, safe=''),
)
elif self.user:
auth = '{user}@'.format(
user=self.quote(self.user, safe=''),
)
default_port = 443 if self.secure else 80
return '{schema}://{auth}{hostname}{port}{fullpath}/?{args}'.format(
schema=self.secure_protocol if self.secure else self.protocol,
auth=auth,
hostname=self.quote(self.host, safe=''),
port='' if self.port is None or self.port == default_port
else ':{}'.format(self.port),
fullpath=self.quote(self.fullpath, safe='/'),
args=self.urlencode(args),
)
@staticmethod
def parse_url(url):
"""
Parses the URL and returns enough arguments that can allow
us to substantiate this object.
"""
results = AttachBase.parse_url(url)
if not results:
# We're done early as we couldn't load the results
return results
# Add our headers that the user can potentially over-ride if they wish
# to to our returned result set
results['headers'] = results['qsd-']
results['headers'].update(results['qsd+'])
return results

View File

@ -0,0 +1,119 @@
# -*- coding: utf-8 -*-
#
# Copyright (C) 2019 Chris Caron <lead2gold@gmail.com>
# All rights reserved.
#
# This code is licensed under the MIT License.
#
# Permission is hereby granted, free of charge, to any person obtaining a copy
# of this software and associated documentation files(the "Software"), to deal
# in the Software without restriction, including without limitation the rights
# to use, copy, modify, merge, publish, distribute, sublicense, and / or sell
# copies of the Software, and to permit persons to whom the Software is
# furnished to do so, subject to the following conditions :
#
# The above copyright notice and this permission notice shall be included in
# all copies or substantial portions of the Software.
#
# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT.IN NO EVENT SHALL THE
# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
# THE SOFTWARE.
import six
import re
from os import listdir
from os.path import dirname
from os.path import abspath
# Maintains a mapping of all of the attachment services
SCHEMA_MAP = {}
__all__ = []
# Load our Lookup Matrix
def __load_matrix(path=abspath(dirname(__file__)), name='apprise.attachment'):
"""
Dynamically load our schema map; this allows us to gracefully
skip over modules we simply don't have the dependencies for.
"""
# Used for the detection of additional Attachment Services objects
# The .py extension is optional as we support loading directories too
module_re = re.compile(r'^(?P<name>Attach[a-z0-9]+)(\.py)?$', re.I)
for f in listdir(path):
match = module_re.match(f)
if not match:
# keep going
continue
# Store our notification/plugin name:
plugin_name = match.group('name')
try:
module = __import__(
'{}.{}'.format(name, plugin_name),
globals(), locals(),
fromlist=[plugin_name])
except ImportError:
# No problem, we can't use this object
continue
if not hasattr(module, plugin_name):
# Not a library we can load as it doesn't follow the simple rule
# that the class must bear the same name as the notification
# file itself.
continue
# Get our plugin
plugin = getattr(module, plugin_name)
if not hasattr(plugin, 'app_id'):
# Filter out non-notification modules
continue
elif plugin_name in __all__:
# we're already handling this object
continue
# Add our module name to our __all__
__all__.append(plugin_name)
# Ensure we provide the class as the reference to this directory and
# not the module:
globals()[plugin_name] = plugin
# Load protocol(s) if defined
proto = getattr(plugin, 'protocol', None)
if isinstance(proto, six.string_types):
if proto not in SCHEMA_MAP:
SCHEMA_MAP[proto] = plugin
elif isinstance(proto, (set, list, tuple)):
# Support iterables list types
for p in proto:
if p not in SCHEMA_MAP:
SCHEMA_MAP[p] = plugin
# Load secure protocol(s) if defined
protos = getattr(plugin, 'secure_protocol', None)
if isinstance(protos, six.string_types):
if protos not in SCHEMA_MAP:
SCHEMA_MAP[protos] = plugin
if isinstance(protos, (set, list, tuple)):
# Support iterables list types
for p in protos:
if p not in SCHEMA_MAP:
SCHEMA_MAP[p] = plugin
return SCHEMA_MAP
# Dynamically build our schema base
__load_matrix()

View File

@ -0,0 +1,382 @@
# Translations template for apprise.
# Copyright (C) 2019 Chris Caron
# This file is distributed under the same license as the apprise project.
# FIRST AUTHOR <EMAIL@ADDRESS>, 2019.
#
#, fuzzy
msgid ""
msgstr ""
"Project-Id-Version: apprise 0.8.1\n"
"Report-Msgid-Bugs-To: lead2gold@gmail.com\n"
"POT-Creation-Date: 2019-10-13 21:39-0400\n"
"PO-Revision-Date: YEAR-MO-DA HO:MI+ZONE\n"
"Last-Translator: FULL NAME <EMAIL@ADDRESS>\n"
"Language-Team: LANGUAGE <LL@li.org>\n"
"MIME-Version: 1.0\n"
"Content-Type: text/plain; charset=utf-8\n"
"Content-Transfer-Encoding: 8bit\n"
"Generated-By: Babel 2.7.0\n"
msgid "API Key"
msgstr ""
msgid "API Secret"
msgstr ""
msgid "Access Key"
msgstr ""
msgid "Access Key ID"
msgstr ""
msgid "Access Secret"
msgstr ""
msgid "Access Token"
msgstr ""
msgid "Account SID"
msgstr ""
msgid "Add Tokens"
msgstr ""
msgid "Application Key"
msgstr ""
msgid "Application Secret"
msgstr ""
msgid "Auth Token"
msgstr ""
msgid "Authentication Key"
msgstr ""
msgid "Authorization Token"
msgstr ""
msgid "Avatar Image"
msgstr ""
msgid "Batch Mode"
msgstr ""
msgid "Blind Carbon Copy"
msgstr ""
msgid "Bot Name"
msgstr ""
msgid "Bot Token"
msgstr ""
msgid "Cache Results"
msgstr ""
msgid "Carbon Copy"
msgstr ""
msgid "Channels"
msgstr ""
msgid "Consumer Key"
msgstr ""
msgid "Consumer Secret"
msgstr ""
msgid "Country"
msgstr ""
msgid "Detect Bot Owner"
msgstr ""
msgid "Device ID"
msgstr ""
msgid "Display Footer"
msgstr ""
msgid "Domain"
msgstr ""
msgid "Duration"
msgstr ""
msgid "Email"
msgstr ""
msgid "Encrypted Password"
msgstr ""
msgid "Encrypted Salt"
msgstr ""
msgid "Event"
msgstr ""
msgid "Events"
msgstr ""
msgid "Expire"
msgstr ""
msgid "Facility"
msgstr ""
msgid "Footer Logo"
msgstr ""
msgid "From Email"
msgstr ""
msgid "From Name"
msgstr ""
msgid "From Phone No"
msgstr ""
msgid "Group"
msgstr ""
msgid "HTTP Header"
msgstr ""
msgid "Hostname"
msgstr ""
msgid "IRC Colors"
msgstr ""
msgid "Include Image"
msgstr ""
msgid "Log PID"
msgstr ""
msgid "Log to STDERR"
msgstr ""
msgid "Message Hook"
msgstr ""
msgid "Message Mode"
msgstr ""
msgid "Modal"
msgstr ""
msgid "Notify Format"
msgstr ""
msgid "Organization"
msgstr ""
msgid "Originating Address"
msgstr ""
msgid "Overflow Mode"
msgstr ""
msgid "Password"
msgstr ""
msgid "Path"
msgstr ""
msgid "Port"
msgstr ""
msgid "Prefix"
msgstr ""
msgid "Priority"
msgstr ""
msgid "Project ID"
msgstr ""
msgid "Provider Key"
msgstr ""
msgid "Region"
msgstr ""
msgid "Region Name"
msgstr ""
msgid "Remove Tokens"
msgstr ""
msgid "Retry"
msgstr ""
msgid "Rooms"
msgstr ""
msgid "Route"
msgstr ""
msgid "SMTP Server"
msgstr ""
msgid "Schema"
msgstr ""
msgid "Secret Access Key"
msgstr ""
msgid "Secret Key"
msgstr ""
msgid "Secure Mode"
msgstr ""
msgid "Sender ID"
msgstr ""
msgid "Server Key"
msgstr ""
msgid "Server Timeout"
msgstr ""
msgid "Sound"
msgstr ""
msgid "Source Email"
msgstr ""
msgid "Source JID"
msgstr ""
msgid "Source Phone No"
msgstr ""
msgid "Target Channel"
msgstr ""
msgid "Target Channel ID"
msgstr ""
msgid "Target Chat ID"
msgstr ""
msgid "Target Device"
msgstr ""
msgid "Target Device ID"
msgstr ""
msgid "Target Email"
msgstr ""
msgid "Target Emails"
msgstr ""
msgid "Target Encoded ID"
msgstr ""
msgid "Target JID"
msgstr ""
msgid "Target Phone No"
msgstr ""
msgid "Target Room Alias"
msgstr ""
msgid "Target Room ID"
msgstr ""
msgid "Target Short Code"
msgstr ""
msgid "Target Tag ID"
msgstr ""
msgid "Target Topic"
msgstr ""
msgid "Target User"
msgstr ""
msgid "Targets"
msgstr ""
msgid "Template"
msgstr ""
msgid "Template Data"
msgstr ""
msgid "Text To Speech"
msgstr ""
msgid "To Channel ID"
msgstr ""
msgid "To Email"
msgstr ""
msgid "To User ID"
msgstr ""
msgid "Token"
msgstr ""
msgid "Token A"
msgstr ""
msgid "Token B"
msgstr ""
msgid "Token C"
msgstr ""
msgid "Urgency"
msgstr ""
msgid "Use Avatar"
msgstr ""
msgid "User Key"
msgstr ""
msgid "User Name"
msgstr ""
msgid "Username"
msgstr ""
msgid "Verify SSL"
msgstr ""
msgid "Version"
msgstr ""
msgid "Webhook"
msgstr ""
msgid "Webhook ID"
msgstr ""
msgid "Webhook Mode"
msgstr ""
msgid "Webhook Token"
msgstr ""
msgid "X-Axis"
msgstr ""
msgid "XEP"
msgstr ""
msgid "Y-Axis"
msgstr ""
msgid "ttl"
msgstr ""

View File

@ -0,0 +1,293 @@
# English translations for apprise.
# Copyright (C) 2019 Chris Caron
# This file is distributed under the same license as the apprise project.
# Chris Caron <lead2gold@gmail.com>, 2019.
#
msgid ""
msgstr ""
"Project-Id-Version: apprise 0.7.6\n"
"Report-Msgid-Bugs-To: lead2gold@gmail.com\n"
"POT-Creation-Date: 2019-05-28 16:56-0400\n"
"PO-Revision-Date: 2019-05-24 20:00-0400\n"
"Last-Translator: Chris Caron <lead2gold@gmail.com>\n"
"Language: en\n"
"Language-Team: en <LL@li.org>\n"
"Plural-Forms: nplurals=2; plural=(n != 1)\n"
"MIME-Version: 1.0\n"
"Content-Type: text/plain; charset=utf-8\n"
"Content-Transfer-Encoding: 8bit\n"
"Generated-By: Babel 2.6.0\n"
msgid "API Key"
msgstr ""
msgid "Access Key"
msgstr ""
msgid "Access Key ID"
msgstr ""
msgid "Access Secret"
msgstr ""
msgid "Access Token"
msgstr ""
msgid "Account SID"
msgstr ""
msgid "Add Tokens"
msgstr ""
msgid "Application Key"
msgstr ""
msgid "Application Secret"
msgstr ""
msgid "Auth Token"
msgstr ""
msgid "Authorization Token"
msgstr ""
msgid "Avatar Image"
msgstr ""
msgid "Bot Name"
msgstr ""
msgid "Bot Token"
msgstr ""
msgid "Channels"
msgstr ""
msgid "Consumer Key"
msgstr ""
msgid "Consumer Secret"
msgstr ""
msgid "Detect Bot Owner"
msgstr ""
msgid "Device ID"
msgstr ""
msgid "Display Footer"
msgstr ""
msgid "Domain"
msgstr ""
msgid "Duration"
msgstr ""
msgid "Events"
msgstr ""
msgid "Footer Logo"
msgstr ""
msgid "From Email"
msgstr ""
msgid "From Name"
msgstr ""
msgid "From Phone No"
msgstr ""
msgid "Group"
msgstr ""
msgid "HTTP Header"
msgstr ""
msgid "Hostname"
msgstr ""
msgid "Include Image"
msgstr ""
msgid "Modal"
msgstr ""
msgid "Notify Format"
msgstr ""
msgid "Organization"
msgstr ""
msgid "Overflow Mode"
msgstr ""
msgid "Password"
msgstr ""
msgid "Port"
msgstr ""
msgid "Priority"
msgstr ""
msgid "Provider Key"
msgstr ""
msgid "Region"
msgstr ""
msgid "Region Name"
msgstr ""
msgid "Remove Tokens"
msgstr ""
msgid "Rooms"
msgstr ""
msgid "SMTP Server"
msgstr ""
msgid "Schema"
msgstr ""
msgid "Secret Access Key"
msgstr ""
msgid "Secret Key"
msgstr ""
msgid "Secure Mode"
msgstr ""
msgid "Server Timeout"
msgstr ""
msgid "Sound"
msgstr ""
msgid "Source JID"
msgstr ""
msgid "Target Channel"
msgstr ""
msgid "Target Chat ID"
msgstr ""
msgid "Target Device"
msgstr ""
msgid "Target Device ID"
msgstr ""
msgid "Target Email"
msgstr ""
msgid "Target Emails"
msgstr ""
msgid "Target Encoded ID"
msgstr ""
msgid "Target JID"
msgstr ""
msgid "Target Phone No"
msgstr ""
msgid "Target Room Alias"
msgstr ""
msgid "Target Room ID"
msgstr ""
msgid "Target Short Code"
msgstr ""
msgid "Target Tag ID"
msgstr ""
msgid "Target Topic"
msgstr ""
msgid "Target User"
msgstr ""
msgid "Targets"
msgstr ""
msgid "Text To Speech"
msgstr ""
msgid "To Channel ID"
msgstr ""
msgid "To Email"
msgstr ""
msgid "To User ID"
msgstr ""
msgid "Token"
msgstr ""
msgid "Token A"
msgstr ""
msgid "Token B"
msgstr ""
msgid "Token C"
msgstr ""
msgid "Urgency"
msgstr ""
msgid "Use Avatar"
msgstr ""
msgid "User"
msgstr ""
msgid "User Key"
msgstr ""
msgid "User Name"
msgstr ""
msgid "Username"
msgstr ""
msgid "Verify SSL"
msgstr ""
msgid "Version"
msgstr ""
msgid "Webhook"
msgstr ""
msgid "Webhook ID"
msgstr ""
msgid "Webhook Mode"
msgstr ""
msgid "Webhook Token"
msgstr ""
msgid "X-Axis"
msgstr ""
msgid "XEP"
msgstr ""
msgid "Y-Axis"
msgstr ""
#~ msgid "Access Key Secret"
#~ msgstr ""

View File

@ -0,0 +1,327 @@
# -*- coding: utf-8 -*-
#
# Copyright (C) 2019 Chris Caron <lead2gold@gmail.com>
# All rights reserved.
#
# This code is licensed under the MIT License.
#
# Permission is hereby granted, free of charge, to any person obtaining a copy
# of this software and associated documentation files(the "Software"), to deal
# in the Software without restriction, including without limitation the rights
# to use, copy, modify, merge, publish, distribute, sublicense, and / or sell
# copies of the Software, and to permit persons to whom the Software is
# furnished to do so, subject to the following conditions :
#
# The above copyright notice and this permission notice shall be included in
# all copies or substantial portions of the Software.
#
# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT.IN NO EVENT SHALL THE
# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
# THE SOFTWARE.
# To use this plugin, simply signup with clicksend:
# https://www.clicksend.com/
#
# You're done at this point, you only need to know your user/pass that
# you signed up with.
# The following URLs would be accepted by Apprise:
# - clicksend://{user}:{password}@{phoneno}
# - clicksend://{user}:{password}@{phoneno1}/{phoneno2}
# The API reference used to build this plugin was documented here:
# https://developers.clicksend.com/docs/rest/v3/
#
import re
import requests
from json import dumps
from base64 import b64encode
from .NotifyBase import NotifyBase
from ..URLBase import PrivacyMode
from ..common import NotifyType
from ..utils import parse_list
from ..utils import parse_bool
from ..AppriseLocale import gettext_lazy as _
# Extend HTTP Error Messages
CLICKSEND_HTTP_ERROR_MAP = {
401: 'Unauthorized - Invalid Token.',
}
# Some Phone Number Detection
IS_PHONE_NO = re.compile(r'^\+?(?P<phone>[0-9\s)(+-]+)\s*$')
# Used to break path apart into list of channels
TARGET_LIST_DELIM = re.compile(r'[ \t\r\n,#\\/]+')
class NotifyClickSend(NotifyBase):
"""
A wrapper for ClickSend Notifications
"""
# The default descriptive name associated with the Notification
service_name = 'ClickSend'
# The services URL
service_url = 'https://clicksend.com/'
# The default secure protocol
secure_protocol = 'clicksend'
# A URL that takes you to the setup/help of the specific protocol
setup_url = 'https://github.com/caronc/apprise/wiki/Notify_clicksend'
# ClickSend uses the http protocol with JSON requests
notify_url = 'https://rest.clicksend.com/v3/sms/send'
# The maximum length of the body
body_maxlen = 160
# A title can not be used for SMS Messages. Setting this to zero will
# cause any title (if defined) to get placed into the message body.
title_maxlen = 0
# The maximum SMS batch size accepted by the ClickSend API
default_batch_size = 1000
# Define object templates
templates = (
'{schema}://{user}:{password}@{targets}',
)
# Define our template tokens
template_tokens = dict(NotifyBase.template_tokens, **{
'user': {
'name': _('User Name'),
'type': 'string',
'required': True,
},
'password': {
'name': _('Password'),
'type': 'string',
'private': True,
'required': True,
},
'target_phone': {
'name': _('Target Phone No'),
'type': 'string',
'prefix': '+',
'regex': (r'^[0-9\s)(+-]+$', 'i'),
'map_to': 'targets',
},
'targets': {
'name': _('Targets'),
'type': 'list:string',
'required': True,
},
})
# Define our template arguments
template_args = dict(NotifyBase.template_args, **{
'to': {
'alias_of': 'targets',
},
'batch': {
'name': _('Batch Mode'),
'type': 'bool',
'default': False,
},
})
def __init__(self, targets=None, batch=False, **kwargs):
"""
Initialize ClickSend Object
"""
super(NotifyClickSend, self).__init__(**kwargs)
# Prepare Batch Mode Flag
self.batch = batch
# Parse our targets
self.targets = list()
if not (self.user and self.password):
msg = 'A ClickSend user/pass was not provided.'
self.logger.warning(msg)
raise TypeError(msg)
for target in parse_list(targets):
# Validate targets and drop bad ones:
result = IS_PHONE_NO.match(target)
if result:
# Further check our phone # for it's digit count
result = ''.join(re.findall(r'\d+', result.group('phone')))
if len(result) < 11 or len(result) > 14:
self.logger.warning(
'Dropped invalid phone # '
'({}) specified.'.format(target),
)
continue
# store valid phone number
self.targets.append(result)
continue
self.logger.warning(
'Dropped invalid phone # '
'({}) specified.'.format(target))
def send(self, body, title='', notify_type=NotifyType.INFO, **kwargs):
"""
Perform ClickSend Notification
"""
if len(self.targets) == 0:
# There were no services to notify
self.logger.warning('There were no ClickSend targets to notify.')
return False
headers = {
'User-Agent': self.app_id,
'Content-Type': 'application/json; charset=utf-8',
'Authorization': 'Basic {}'.format(
b64encode('{}:{}'.format(
self.user, self.password).encode('utf-8'))),
}
# error tracking (used for function return)
has_error = False
# prepare JSON Object
payload = {
'messages': []
}
# Send in batches if identified to do so
default_batch_size = 1 if not self.batch else self.default_batch_size
for index in range(0, len(self.targets), default_batch_size):
payload['messages'] = [{
'source': 'php',
'body': body,
'to': '+{}'.format(to),
} for to in self.targets[index:index + default_batch_size]]
self.logger.debug('ClickSend POST URL: %s (cert_verify=%r)' % (
self.notify_url, self.verify_certificate,
))
self.logger.debug('ClickSend Payload: %s' % str(payload))
# Always call throttle before any remote server i/o is made
self.throttle()
try:
r = requests.post(
self.notify_url,
data=dumps(payload),
headers=headers,
verify=self.verify_certificate,
)
if r.status_code != requests.codes.ok:
# We had a problem
status_str = \
NotifyClickSend.http_response_code_lookup(
r.status_code, CLICKSEND_HTTP_ERROR_MAP)
self.logger.warning(
'Failed to send {} ClickSend notification{}: '
'{}{}error={}.'.format(
len(payload['messages']),
' to {}'.format(self.targets[index])
if default_batch_size == 1 else '(s)',
status_str,
', ' if status_str else '',
r.status_code))
self.logger.debug(
'Response Details:\r\n{}'.format(r.content))
# Mark our failure
has_error = True
continue
else:
self.logger.info(
'Sent {} ClickSend notification{}.'
.format(
len(payload['messages']),
' to {}'.format(self.targets[index])
if default_batch_size == 1 else '(s)',
))
except requests.RequestException as e:
self.logger.warning(
'A Connection error occured sending {} ClickSend '
'notification(s).'.format(len(payload['messages'])))
self.logger.debug('Socket Exception: %s' % str(e))
# Mark our failure
has_error = True
continue
return not has_error
def url(self, privacy=False, *args, **kwargs):
"""
Returns the URL built dynamically based on specified arguments.
"""
# Define any arguments set
args = {
'format': self.notify_format,
'overflow': self.overflow_mode,
'verify': 'yes' if self.verify_certificate else 'no',
'batch': 'yes' if self.batch else 'no',
}
# Setup Authentication
auth = '{user}:{password}@'.format(
user=NotifyClickSend.quote(self.user, safe=''),
password=self.pprint(
self.password, privacy, mode=PrivacyMode.Secret, safe=''),
)
return '{schema}://{auth}{targets}?{args}'.format(
schema=self.secure_protocol,
auth=auth,
targets='/'.join(
[NotifyClickSend.quote(x, safe='') for x in self.targets]),
args=NotifyClickSend.urlencode(args),
)
@staticmethod
def parse_url(url):
"""
Parses the URL and returns enough arguments that can allow
us to substantiate this object.
"""
results = NotifyBase.parse_url(url, verify_host=False)
if not results:
# We're done early as we couldn't load the results
return results
# All elements are targets
results['targets'] = [NotifyClickSend.unquote(results['host'])]
# All entries after the hostname are additional targets
results['targets'].extend(
NotifyClickSend.split_path(results['fullpath']))
# Get Batch Mode Flag
results['batch'] = \
parse_bool(results['qsd'].get('batch', False))
# Support the 'to' variable so that we can support rooms this way too
# The 'to' makes it easier to use yaml configuration
if 'to' in results['qsd'] and len(results['qsd']['to']):
results['targets'] += [x for x in filter(
bool, TARGET_LIST_DELIM.split(
NotifyClickSend.unquote(results['qsd']['to'])))]
return results

View File

@ -0,0 +1,241 @@
# -*- coding: utf-8 -*-
#
# Copyright (C) 2019 Chris Caron <lead2gold@gmail.com>
# All rights reserved.
#
# This code is licensed under the MIT License.
#
# Permission is hereby granted, free of charge, to any person obtaining a copy
# of this software and associated documentation files(the "Software"), to deal
# in the Software without restriction, including without limitation the rights
# to use, copy, modify, merge, publish, distribute, sublicense, and / or sell
# copies of the Software, and to permit persons to whom the Software is
# furnished to do so, subject to the following conditions :
#
# The above copyright notice and this permission notice shall be included in
# all copies or substantial portions of the Software.
#
# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT.IN NO EVENT SHALL THE
# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
# THE SOFTWARE.
# To use this plugin, you must have a Kumulos account set up. Add a client
# and link it with your phone using the phone app (using your Companion App
# option in the profile menu area):
# Android: https://play.google.com/store/apps/\
# details?id=com.kumulos.companion
# iOS: https://apps.apple.com/us/app/kumulos/id1463947782
#
# The API reference used to build this plugin was documented here:
# https://docs.kumulos.com/messaging/api/#sending-in-app-messages
#
import requests
from json import dumps
from .NotifyBase import NotifyBase
from ..common import NotifyType
from ..utils import validate_regex
from ..AppriseLocale import gettext_lazy as _
# Extend HTTP Error Messages
KUMULOS_HTTP_ERROR_MAP = {
401: 'Unauthorized - Invalid API and/or Server Key.',
422: 'Unprocessable Entity - The request was unparsable.',
400: 'Bad Request - Targeted users do not exist or have unsubscribed.',
}
class NotifyKumulos(NotifyBase):
"""
A wrapper for Kumulos Notifications
"""
# The default descriptive name associated with the Notification
service_name = 'Kumulos'
# The services URL
service_url = 'https://kumulos.com/'
# The default secure protocol
secure_protocol = 'kumulos'
# A URL that takes you to the setup/help of the specific protocol
setup_url = 'https://github.com/caronc/apprise/wiki/Notify_kumulos'
# Kumulos uses the http protocol with JSON requests
notify_url = 'https://messages.kumulos.com/v2/notifications'
# The maximum allowable characters allowed in the title per message
title_maxlen = 64
# The maximum allowable characters allowed in the body per message
body_maxlen = 240
# Define object templates
templates = (
'{schema}://{apikey}/{serverkey}',
)
# Define our template tokens
template_tokens = dict(NotifyBase.template_tokens, **{
'apikey': {
'name': _('API Key'),
'type': 'string',
'private': True,
'required': True,
# UUID4
'regex': (r'^[0-9a-f]{8}-[0-9a-f]{4}-4[0-9a-f]{3}-'
r'[89ab][0-9a-f]{3}-[0-9a-f]{12}$', 'i')
},
'serverkey': {
'name': _('Server Key'),
'type': 'string',
'private': True,
'required': True,
'regex': (r'^[A-Z0-9+]{36}$', 'i'),
},
})
def __init__(self, apikey, serverkey, **kwargs):
"""
Initialize Kumulos Object
"""
super(NotifyKumulos, self).__init__(**kwargs)
# API Key (associated with project)
self.apikey = validate_regex(
apikey, *self.template_tokens['apikey']['regex'])
if not self.apikey:
msg = 'An invalid Kumulos API Key ' \
'({}) was specified.'.format(apikey)
self.logger.warning(msg)
raise TypeError(msg)
# Server Key (associated with project)
self.serverkey = validate_regex(
serverkey, *self.template_tokens['serverkey']['regex'])
if not self.serverkey:
msg = 'An invalid Kumulos Server Key ' \
'({}) was specified.'.format(serverkey)
self.logger.warning(msg)
raise TypeError(msg)
def send(self, body, title='', notify_type=NotifyType.INFO, **kwargs):
"""
Perform Kumulos Notification
"""
headers = {
'User-Agent': self.app_id,
'Content-Type': 'application/json',
'Accept': 'application/json',
}
# prepare JSON Object
payload = {
'target': {
'broadcast': True,
},
'content': {
'title': title,
'message': body,
},
}
# Determine Authentication
auth = (self.apikey, self.serverkey)
self.logger.debug('Kumulos POST URL: %s (cert_verify=%r)' % (
self.notify_url, self.verify_certificate,
))
self.logger.debug('Kumulos Payload: %s' % str(payload))
# Always call throttle before any remote server i/o is made
self.throttle()
try:
r = requests.post(
self.notify_url,
data=dumps(payload),
headers=headers,
auth=auth,
verify=self.verify_certificate,
)
if r.status_code != requests.codes.ok:
# We had a problem
status_str = \
NotifyKumulos.http_response_code_lookup(
r.status_code, KUMULOS_HTTP_ERROR_MAP)
self.logger.warning(
'Failed to send Kumulos notification: '
'{}{}error={}.'.format(
status_str,
', ' if status_str else '',
r.status_code))
self.logger.debug(
'Response Details:\r\n{}'.format(r.content))
return False
else:
self.logger.info('Sent Kumulos notification.')
except requests.RequestException as e:
self.logger.warning(
'A Connection error occured sending Kumulos '
'notification.')
self.logger.debug('Socket Exception: %s' % str(e))
return False
return True
def url(self, privacy=False, *args, **kwargs):
"""
Returns the URL built dynamically based on specified arguments.
"""
# Define any arguments set
args = {
'format': self.notify_format,
'overflow': self.overflow_mode,
'verify': 'yes' if self.verify_certificate else 'no',
}
return '{schema}://{apikey}/{serverkey}/?{args}'.format(
schema=self.secure_protocol,
apikey=self.pprint(self.apikey, privacy, safe=''),
serverkey=self.pprint(self.serverkey, privacy, safe=''),
args=NotifyKumulos.urlencode(args),
)
@staticmethod
def parse_url(url):
"""
Parses the URL and returns enough arguments that can allow
us to substantiate this object.
"""
results = NotifyBase.parse_url(url)
if not results:
# We're done early as we couldn't load the results
return results
# The first token is stored in the hostname
results['apikey'] = NotifyKumulos.unquote(results['host'])
# Now fetch the remaining tokens
try:
results['serverkey'] = \
NotifyKumulos.split_path(results['fullpath'])[0]
except IndexError:
# no token
results['serverkey'] = None
return results

View File

@ -0,0 +1,370 @@
# -*- coding: utf-8 -*-
#
# Copyright (C) 2019 Chris Caron <lead2gold@gmail.com>
# All rights reserved.
#
# This code is licensed under the MIT License.
#
# Permission is hereby granted, free of charge, to any person obtaining a copy
# of this software and associated documentation files(the "Software"), to deal
# in the Software without restriction, including without limitation the rights
# to use, copy, modify, merge, publish, distribute, sublicense, and / or sell
# copies of the Software, and to permit persons to whom the Software is
# furnished to do so, subject to the following conditions :
#
# The above copyright notice and this permission notice shall be included in
# all copies or substantial portions of the Software.
#
# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT.IN NO EVENT SHALL THE
# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
# THE SOFTWARE.
# Create an account https://msg91.com/ if you don't already have one
#
# Get your (authkey) from the dashboard here:
# - https://world.msg91.com/user/index.php#api
#
# Get details on the API used in this plugin here:
# - https://world.msg91.com/apidoc/textsms/send-sms.php
import re
import requests
from .NotifyBase import NotifyBase
from ..common import NotifyType
from ..utils import parse_list
from ..utils import validate_regex
from ..AppriseLocale import gettext_lazy as _
# Some Phone Number Detection
IS_PHONE_NO = re.compile(r'^\+?(?P<phone>[0-9\s)(+-]+)\s*$')
class MSG91Route(object):
"""
Transactional SMS Routes
route=1 for promotional, route=4 for transactional SMS.
"""
PROMOTIONAL = 1
TRANSACTIONAL = 4
# Used for verification
MSG91_ROUTES = (
MSG91Route.PROMOTIONAL,
MSG91Route.TRANSACTIONAL,
)
class MSG91Country(object):
"""
Optional value that can be specified on the MSG91 api
"""
INTERNATIONAL = 0
USA = 1
INDIA = 91
# Used for verification
MSG91_COUNTRIES = (
MSG91Country.INTERNATIONAL,
MSG91Country.USA,
MSG91Country.INDIA,
)
class NotifyMSG91(NotifyBase):
"""
A wrapper for MSG91 Notifications
"""
# The default descriptive name associated with the Notification
service_name = 'MSG91'
# The services URL
service_url = 'https://msg91.com'
# The default protocol
secure_protocol = 'msg91'
# A URL that takes you to the setup/help of the specific protocol
setup_url = 'https://github.com/caronc/apprise/wiki/Notify_msg91'
# MSG91 uses the http protocol with JSON requests
notify_url = 'https://world.msg91.com/api/sendhttp.php'
# The maximum length of the body
body_maxlen = 140
# A title can not be used for SMS Messages. Setting this to zero will
# cause any title (if defined) to get placed into the message body.
title_maxlen = 0
# Define object templates
templates = (
'{schema}://{authkey}/{targets}',
'{schema}://{sender}@{authkey}/{targets}',
)
# Define our template tokens
template_tokens = dict(NotifyBase.template_tokens, **{
'authkey': {
'name': _('Authentication Key'),
'type': 'string',
'required': True,
'private': True,
'regex': (r'^[a-z0-9]+$', 'i'),
},
'target_phone': {
'name': _('Target Phone No'),
'type': 'string',
'prefix': '+',
'regex': (r'^[0-9\s)(+-]+$', 'i'),
'map_to': 'targets',
},
'targets': {
'name': _('Targets'),
'type': 'list:string',
},
'sender': {
'name': _('Sender ID'),
'type': 'string',
},
})
# Define our template arguments
template_args = dict(NotifyBase.template_args, **{
'to': {
'alias_of': 'targets',
},
'route': {
'name': _('Route'),
'type': 'choice:int',
'values': MSG91_ROUTES,
'default': MSG91Route.TRANSACTIONAL,
},
'country': {
'name': _('Country'),
'type': 'choice:int',
'values': MSG91_COUNTRIES,
},
})
def __init__(self, authkey, targets=None, sender=None, route=None,
country=None, **kwargs):
"""
Initialize MSG91 Object
"""
super(NotifyMSG91, self).__init__(**kwargs)
# Authentication Key (associated with project)
self.authkey = validate_regex(
authkey, *self.template_tokens['authkey']['regex'])
if not self.authkey:
msg = 'An invalid MSG91 Authentication Key ' \
'({}) was specified.'.format(authkey)
self.logger.warning(msg)
raise TypeError(msg)
if route is None:
self.route = self.template_args['route']['default']
else:
try:
self.route = int(route)
if self.route not in MSG91_ROUTES:
# Let outer except catch thi
raise ValueError()
except (ValueError, TypeError):
msg = 'The MSG91 route specified ({}) is invalid.'\
.format(route)
self.logger.warning(msg)
raise TypeError(msg)
if country:
try:
self.country = int(country)
if self.country not in MSG91_COUNTRIES:
# Let outer except catch thi
raise ValueError()
except (ValueError, TypeError):
msg = 'The MSG91 country specified ({}) is invalid.'\
.format(country)
self.logger.warning(msg)
raise TypeError(msg)
else:
self.country = country
# Store our sender
self.sender = sender
# Parse our targets
self.targets = list()
for target in parse_list(targets):
# Validate targets and drop bad ones:
result = IS_PHONE_NO.match(target)
if result:
# Further check our phone # for it's digit count
result = ''.join(re.findall(r'\d+', result.group('phone')))
if len(result) < 11 or len(result) > 14:
self.logger.warning(
'Dropped invalid phone # '
'({}) specified.'.format(target),
)
continue
# store valid phone number
self.targets.append(result)
continue
self.logger.warning(
'Dropped invalid phone # '
'({}) specified.'.format(target),
)
if not self.targets:
# We have a bot token and no target(s) to message
msg = 'No MSG91 targets to notify.'
self.logger.warning(msg)
raise TypeError(msg)
return
def send(self, body, title='', notify_type=NotifyType.INFO, **kwargs):
"""
Perform MSG91 Notification
"""
# Prepare our headers
headers = {
'User-Agent': self.app_id,
'Content-Type': 'application/x-www-form-urlencoded',
}
# Prepare our payload
payload = {
'sender': self.sender if self.sender else self.app_id,
'authkey': self.authkey,
'message': body,
'response': 'json',
# target phone numbers are sent with a comma delimiter
'mobiles': ','.join(self.targets),
'route': str(self.route),
}
if self.country:
payload['country'] = str(self.country)
# Some Debug Logging
self.logger.debug('MSG91 POST URL: {} (cert_verify={})'.format(
self.notify_url, self.verify_certificate))
self.logger.debug('MSG91 Payload: {}' .format(payload))
# Always call throttle before any remote server i/o is made
self.throttle()
try:
r = requests.post(
self.notify_url,
data=payload,
headers=headers,
verify=self.verify_certificate,
)
if r.status_code != requests.codes.ok:
# We had a problem
status_str = \
NotifyMSG91.http_response_code_lookup(
r.status_code)
self.logger.warning(
'Failed to send MSG91 notification to {}: '
'{}{}error={}.'.format(
','.join(self.targets),
status_str,
', ' if status_str else '',
r.status_code))
self.logger.debug(
'Response Details:\r\n{}'.format(r.content))
return False
else:
self.logger.info(
'Sent MSG91 notification to %s.' % ','.join(self.targets))
except requests.RequestException as e:
self.logger.warning(
'A Connection error occured sending MSG91:%s '
'notification.' % ','.join(self.targets)
)
self.logger.debug('Socket Exception: %s' % str(e))
return False
return True
def url(self, privacy=False, *args, **kwargs):
"""
Returns the URL built dynamically based on specified arguments.
"""
# Define any arguments set
args = {
'format': self.notify_format,
'overflow': self.overflow_mode,
'verify': 'yes' if self.verify_certificate else 'no',
'route': str(self.route),
}
if self.country:
args['country'] = str(self.country)
return '{schema}://{authkey}/{targets}/?{args}'.format(
schema=self.secure_protocol,
authkey=self.pprint(self.authkey, privacy, safe=''),
targets='/'.join(
[NotifyMSG91.quote(x, safe='') for x in self.targets]),
args=NotifyMSG91.urlencode(args))
@staticmethod
def parse_url(url):
"""
Parses the URL and returns enough arguments that can allow
us to substantiate this object.
"""
results = NotifyBase.parse_url(url)
if not results:
# We're done early as we couldn't load the results
return results
# Get our entries; split_path() looks after unquoting content for us
# by default
results['targets'] = NotifyMSG91.split_path(results['fullpath'])
# The hostname is our authentication key
results['authkey'] = NotifyMSG91.unquote(results['host'])
if 'route' in results['qsd'] and len(results['qsd']['route']):
results['route'] = results['qsd']['route']
if 'country' in results['qsd'] and len(results['qsd']['country']):
results['country'] = results['qsd']['country']
# Support the 'to' variable so that we can support targets this way too
# The 'to' makes it easier to use yaml configuration
if 'to' in results['qsd'] and len(results['qsd']['to']):
results['targets'] += \
NotifyMSG91.parse_list(results['qsd']['to'])
return results

View File

@ -0,0 +1,370 @@
# -*- coding: utf-8 -*-
#
# Copyright (C) 2019 Chris Caron <lead2gold@gmail.com>
# All rights reserved.
#
# This code is licensed under the MIT License.
#
# Permission is hereby granted, free of charge, to any person obtaining a copy
# of this software and associated documentation files(the "Software"), to deal
# in the Software without restriction, including without limitation the rights
# to use, copy, modify, merge, publish, distribute, sublicense, and / or sell
# copies of the Software, and to permit persons to whom the Software is
# furnished to do so, subject to the following conditions :
#
# The above copyright notice and this permission notice shall be included in
# all copies or substantial portions of the Software.
#
# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT.IN NO EVENT SHALL THE
# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
# THE SOFTWARE.
# Create an account https://messagebird.com if you don't already have one
#
# Get your (apikey) and api example from the dashboard here:
# - https://dashboard.messagebird.com/en/user/index
#
import re
import requests
from .NotifyBase import NotifyBase
from ..common import NotifyType
from ..utils import parse_list
from ..utils import validate_regex
from ..AppriseLocale import gettext_lazy as _
# Some Phone Number Detection
IS_PHONE_NO = re.compile(r'^\+?(?P<phone>[0-9\s)(+-]+)\s*$')
class NotifyMessageBird(NotifyBase):
"""
A wrapper for MessageBird Notifications
"""
# The default descriptive name associated with the Notification
service_name = 'MessageBird'
# The services URL
service_url = 'https://messagebird.com'
# The default protocol
secure_protocol = 'msgbird'
# A URL that takes you to the setup/help of the specific protocol
setup_url = 'https://github.com/caronc/apprise/wiki/Notify_messagebird'
# MessageBird uses the http protocol with JSON requests
notify_url = 'https://rest.messagebird.com/messages'
# The maximum length of the body
body_maxlen = 140
# A title can not be used for SMS Messages. Setting this to zero will
# cause any title (if defined) to get placed into the message body.
title_maxlen = 0
# Define object templates
templates = (
'{schema}://{apikey}/{source}',
'{schema}://{apikey}/{source}/{targets}',
)
# Define our template tokens
template_tokens = dict(NotifyBase.template_tokens, **{
'apikey': {
'name': _('API Key'),
'type': 'string',
'required': True,
'private': True,
'regex': (r'^[a-z0-9]{25}$', 'i'),
},
'source': {
'name': _('Source Phone No'),
'type': 'string',
'prefix': '+',
'required': True,
'regex': (r'^[0-9\s)(+-]+$', 'i'),
},
'target_phone': {
'name': _('Target Phone No'),
'type': 'string',
'prefix': '+',
'regex': (r'^[0-9\s)(+-]+$', 'i'),
'map_to': 'targets',
},
'targets': {
'name': _('Targets'),
'type': 'list:string',
}
})
# Define our template arguments
template_args = dict(NotifyBase.template_args, **{
'to': {
'alias_of': 'targets',
},
'from': {
'alias_of': 'source',
},
})
def __init__(self, apikey, source, targets=None, **kwargs):
"""
Initialize MessageBird Object
"""
super(NotifyMessageBird, self).__init__(**kwargs)
# API Key (associated with project)
self.apikey = validate_regex(
apikey, *self.template_tokens['apikey']['regex'])
if not self.apikey:
msg = 'An invalid MessageBird API Key ' \
'({}) was specified.'.format(apikey)
self.logger.warning(msg)
raise TypeError(msg)
result = IS_PHONE_NO.match(source)
if not result:
msg = 'The MessageBird source specified ({}) is invalid.'\
.format(source)
self.logger.warning(msg)
raise TypeError(msg)
# Further check our phone # for it's digit count
result = ''.join(re.findall(r'\d+', result.group('phone')))
if len(result) < 11 or len(result) > 14:
msg = 'The MessageBird source # specified ({}) is invalid.'\
.format(source)
self.logger.warning(msg)
raise TypeError(msg)
# Store our source
self.source = result
# Parse our targets
self.targets = list()
targets = parse_list(targets)
if not targets:
# No sources specified, use our own phone no
self.targets.append(self.source)
return
# otherwise, store all of our target numbers
for target in targets:
# Validate targets and drop bad ones:
result = IS_PHONE_NO.match(target)
if result:
# Further check our phone # for it's digit count
result = ''.join(re.findall(r'\d+', result.group('phone')))
if len(result) < 11 or len(result) > 14:
self.logger.warning(
'Dropped invalid phone # '
'({}) specified.'.format(target),
)
continue
# store valid phone number
self.targets.append(result)
continue
self.logger.warning(
'Dropped invalid phone # '
'({}) specified.'.format(target),
)
if not self.targets:
# We have a bot token and no target(s) to message
msg = 'No MessageBird targets to notify.'
self.logger.warning(msg)
raise TypeError(msg)
return
def send(self, body, title='', notify_type=NotifyType.INFO, **kwargs):
"""
Perform MessageBird Notification
"""
# error tracking (used for function return)
has_error = False
# Prepare our headers
headers = {
'User-Agent': self.app_id,
'Content-Type': 'application/x-www-form-urlencoded',
'Authorization': 'AccessKey {}'.format(self.apikey),
}
# Prepare our payload
payload = {
'originator': '+{}'.format(self.source),
'recipients': None,
'body': body,
}
# Create a copy of the targets list
targets = list(self.targets)
while len(targets):
# Get our target to notify
target = targets.pop(0)
# Prepare our user
payload['recipients'] = '+{}'.format(target)
# Some Debug Logging
self.logger.debug(
'MessageBird POST URL: {} (cert_verify={})'.format(
self.notify_url, self.verify_certificate))
self.logger.debug('MessageBird Payload: {}' .format(payload))
# Always call throttle before any remote server i/o is made
self.throttle()
try:
r = requests.post(
self.notify_url,
data=payload,
headers=headers,
verify=self.verify_certificate,
)
# Sample output of a successful transmission
# {
# "originator": "+15553338888",
# "body": "test",
# "direction": "mt",
# "mclass": 1,
# "reference": null,
# "createdDatetime": "2019-08-22T01:32:18+00:00",
# "recipients": {
# "totalCount": 1,
# "totalSentCount": 1,
# "totalDeliveredCount": 0,
# "totalDeliveryFailedCount": 0,
# "items": [
# {
# "status": "sent",
# "statusDatetime": "2019-08-22T01:32:18+00:00",
# "recipient": 15553338888,
# "messagePartCount": 1
# }
# ]
# },
# "validity": null,
# "gateway": 10,
# "typeDetails": {},
# "href": "https://rest.messagebird.com/messages/\
# b5d424244a5b4fd0b5b5728bccaafc23",
# "datacoding": "plain",
# "scheduledDatetime": null,
# "type": "sms",
# "id": "b5d424244a5b4fd0b5b5728bccaafc23"
# }
if r.status_code not in (
requests.codes.ok, requests.codes.created):
# We had a problem
status_str = \
NotifyMessageBird.http_response_code_lookup(
r.status_code)
self.logger.warning(
'Failed to send MessageBird notification to {}: '
'{}{}error={}.'.format(
','.join(target),
status_str,
', ' if status_str else '',
r.status_code))
self.logger.debug(
'Response Details:\r\n{}'.format(r.content))
# Mark our failure
has_error = True
continue
else:
self.logger.info(
'Sent MessageBird notification to {}.'.format(target))
except requests.RequestException as e:
self.logger.warning(
'A Connection error occured sending MessageBird:%s ' % (
target) + 'notification.'
)
self.logger.debug('Socket Exception: %s' % str(e))
# Mark our failure
has_error = True
continue
return not has_error
def url(self, privacy=False, *args, **kwargs):
"""
Returns the URL built dynamically based on specified arguments.
"""
# Define any arguments set
args = {
'format': self.notify_format,
'overflow': self.overflow_mode,
'verify': 'yes' if self.verify_certificate else 'no',
}
return '{schema}://{apikey}/{source}/{targets}/?{args}'.format(
schema=self.secure_protocol,
apikey=self.pprint(self.apikey, privacy, safe=''),
source=self.source,
targets='/'.join(
[NotifyMessageBird.quote(x, safe='') for x in self.targets]),
args=NotifyMessageBird.urlencode(args))
@staticmethod
def parse_url(url):
"""
Parses the URL and returns enough arguments that can allow
us to substantiate this object.
"""
results = NotifyBase.parse_url(url)
if not results:
# We're done early as we couldn't load the results
return results
# Get our entries; split_path() looks after unquoting content for us
# by default
results['targets'] = NotifyMessageBird.split_path(results['fullpath'])
try:
# The first path entry is the source/originator
results['source'] = results['targets'].pop(0)
except IndexError:
# No path specified... this URL is potentially un-parseable; we can
# hope for a from= entry
pass
# The hostname is our authentication key
results['apikey'] = NotifyMessageBird.unquote(results['host'])
# Support the 'to' variable so that we can support targets this way too
# The 'to' makes it easier to use yaml configuration
if 'to' in results['qsd'] and len(results['qsd']['to']):
results['targets'] += \
NotifyMessageBird.parse_list(results['qsd']['to'])
if 'from' in results['qsd'] and len(results['qsd']['from']):
results['source'] = \
NotifyMessageBird.unquote(results['qsd']['from'])
return results

View File

@ -0,0 +1,380 @@
# -*- coding: utf-8 -*-
#
# Copyright (C) 2019 Chris Caron <lead2gold@gmail.com>
# All rights reserved.
#
# This code is licensed under the MIT License.
#
# Permission is hereby granted, free of charge, to any person obtaining a copy
# of this software and associated documentation files(the "Software"), to deal
# in the Software without restriction, including without limitation the rights
# to use, copy, modify, merge, publish, distribute, sublicense, and / or sell
# copies of the Software, and to permit persons to whom the Software is
# furnished to do so, subject to the following conditions :
#
# The above copyright notice and this permission notice shall be included in
# all copies or substantial portions of the Software.
#
# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT.IN NO EVENT SHALL THE
# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
# OUT OF OR IN CON
# 1. Simply visit https://notica.us
# 2. You'll be provided a new variation of the website which will look
# something like: https://notica.us/?abc123.
# ^
# |
# token
#
# Your token is actually abc123 (do not include/grab the question mark)
# You can use that URL as is directly in Apprise, or you can follow
# the next step which shows you how to assemble the Apprise URL:
#
# 3. With respect to the above, your apprise URL would be:
# notica://abc123
#
import re
import six
import requests
from .NotifyBase import NotifyBase
from ..URLBase import PrivacyMode
from ..common import NotifyType
from ..utils import validate_regex
from ..AppriseLocale import gettext_lazy as _
class NoticaMode(object):
"""
Tracks if we're accessing the notica upstream server or a locally hosted
one.
"""
# We're dealing with a self hosted service
SELFHOSTED = 'selfhosted'
# We're dealing with the official hosted service at https://notica.us
OFFICIAL = 'official'
# Define our Notica Modes
NOTICA_MODES = (
NoticaMode.SELFHOSTED,
NoticaMode.OFFICIAL,
)
class NotifyNotica(NotifyBase):
"""
A wrapper for Notica Notifications
"""
# The default descriptive name associated with the Notification
service_name = 'Notica'
# The services URL
service_url = 'https://notica.us/'
# Insecure protocol (for those self hosted requests)
protocol = 'notica'
# The default protocol (this is secure for notica)
secure_protocol = 'noticas'
# A URL that takes you to the setup/help of the specific protocol
setup_url = 'https://github.com/caronc/apprise/wiki/Notify_notica'
# Notica URL
notify_url = 'https://notica.us/?{token}'
# Notica does not support a title
title_maxlen = 0
# Define object templates
templates = (
'{schema}://{token}',
# Self-hosted notica servers
'{schema}://{host}/{token}',
'{schema}://{host}:{port}/{token}',
'{schema}://{user}@{host}/{token}',
'{schema}://{user}@{host}:{port}/{token}',
'{schema}://{user}:{password}@{host}/{token}',
'{schema}://{user}:{password}@{host}:{port}/{token}',
)
# Define our template tokens
template_tokens = dict(NotifyBase.template_tokens, **{
'token': {
'name': _('Token'),
'type': 'string',
'private': True,
'required': True,
'regex': r'^\?*(?P<token>[^/]+)\s*$'
},
'host': {
'name': _('Hostname'),
'type': 'string',
},
'port': {
'name': _('Port'),
'type': 'int',
'min': 1,
'max': 65535,
},
'user': {
'name': _('Username'),
'type': 'string',
},
'password': {
'name': _('Password'),
'type': 'string',
'private': True,
},
})
# Define any kwargs we're using
template_kwargs = {
'headers': {
'name': _('HTTP Header'),
'prefix': '+',
},
}
def __init__(self, token, headers=None, **kwargs):
"""
Initialize Notica Object
"""
super(NotifyNotica, self).__init__(**kwargs)
# Token (associated with project)
self.token = validate_regex(token)
if not self.token:
msg = 'An invalid Notica Token ' \
'({}) was specified.'.format(token)
self.logger.warning(msg)
raise TypeError(msg)
# Setup our mode
self.mode = NoticaMode.SELFHOSTED if self.host else NoticaMode.OFFICIAL
# prepare our fullpath
self.fullpath = kwargs.get('fullpath')
if not isinstance(self.fullpath, six.string_types):
self.fullpath = '/'
self.headers = {}
if headers:
# Store our extra headers
self.headers.update(headers)
return
def send(self, body, title='', notify_type=NotifyType.INFO, **kwargs):
"""
Perform Notica Notification
"""
headers = {
'User-Agent': self.app_id,
'Content-Type': 'application/x-www-form-urlencoded'
}
# Prepare our payload
payload = 'd:{}'.format(body)
# Auth is used for SELFHOSTED queries
auth = None
if self.mode is NoticaMode.OFFICIAL:
# prepare our notify url
notify_url = self.notify_url.format(token=self.token)
else:
# Prepare our self hosted URL
# Apply any/all header over-rides defined
headers.update(self.headers)
if self.user:
auth = (self.user, self.password)
# Set our schema
schema = 'https' if self.secure else 'http'
# Prepare our notify_url
notify_url = '%s://%s' % (schema, self.host)
if isinstance(self.port, int):
notify_url += ':%d' % self.port
notify_url += '{fullpath}?token={token}'.format(
fullpath=self.fullpath,
token=self.token)
self.logger.debug('Notica POST URL: %s (cert_verify=%r)' % (
notify_url, self.verify_certificate,
))
self.logger.debug('Notica Payload: %s' % str(payload))
# Always call throttle before any remote server i/o is made
self.throttle()
try:
r = requests.post(
notify_url.format(token=self.token),
data=payload,
headers=headers,
auth=auth,
verify=self.verify_certificate,
)
if r.status_code != requests.codes.ok:
# We had a problem
status_str = \
NotifyNotica.http_response_code_lookup(r.status_code)
self.logger.warning(
'Failed to send Notica notification:'
'{}{}error={}.'.format(
status_str,
', ' if status_str else '',
r.status_code))
self.logger.debug('Response Details:\r\n{}'.format(r.content))
# Return; we're done
return False
else:
self.logger.info('Sent Notica notification.')
except requests.RequestException as e:
self.logger.warning(
'A Connection error occured sending Notica notification.',
)
self.logger.debug('Socket Exception: %s' % str(e))
# Return; we're done
return False
return True
def url(self, privacy=False, *args, **kwargs):
"""
Returns the URL built dynamically based on specified arguments.
"""
# Define any arguments set
args = {
'format': self.notify_format,
'overflow': self.overflow_mode,
'verify': 'yes' if self.verify_certificate else 'no',
}
if self.mode == NoticaMode.OFFICIAL:
# Official URLs are easy to assemble
return '{schema}://{token}/?{args}'.format(
schema=self.protocol,
token=self.pprint(self.token, privacy, safe=''),
args=NotifyNotica.urlencode(args),
)
# If we reach here then we are assembling a self hosted URL
# Append our headers into our args
args.update({'+{}'.format(k): v for k, v in self.headers.items()})
# Authorization can be used for self-hosted sollutions
auth = ''
# Determine Authentication
if self.user and self.password:
auth = '{user}:{password}@'.format(
user=NotifyNotica.quote(self.user, safe=''),
password=self.pprint(
self.password, privacy, mode=PrivacyMode.Secret, safe=''),
)
elif self.user:
auth = '{user}@'.format(
user=NotifyNotica.quote(self.user, safe=''),
)
default_port = 443 if self.secure else 80
return '{schema}://{auth}{hostname}{port}{fullpath}{token}/?{args}' \
.format(
schema=self.secure_protocol
if self.secure else self.protocol,
auth=auth,
hostname=NotifyNotica.quote(self.host, safe=''),
port='' if self.port is None or self.port == default_port
else ':{}'.format(self.port),
fullpath=NotifyNotica.quote(
self.fullpath, safe='/'),
token=self.pprint(self.token, privacy, safe=''),
args=NotifyNotica.urlencode(args),
)
@staticmethod
def parse_url(url):
"""
Parses the URL and returns enough arguments that can allow
us to substantiate this object.
"""
results = NotifyBase.parse_url(url, verify_host=False)
if not results:
# We're done early as we couldn't load the results
return results
# Get unquoted entries
entries = NotifyNotica.split_path(results['fullpath'])
if not entries:
# If there are no path entries, then we're only dealing with the
# official website
results['mode'] = NoticaMode.OFFICIAL
# Store our token using the host
results['token'] = NotifyNotica.unquote(results['host'])
# Unset our host
results['host'] = None
else:
# Otherwise we're running a self hosted instance
results['mode'] = NoticaMode.SELFHOSTED
# The last element in the list is our token
results['token'] = entries.pop()
# Re-assemble our full path
results['fullpath'] = \
'/' if not entries else '/{}/'.format('/'.join(entries))
# Add our headers that the user can potentially over-ride if they
# wish to to our returned result set
results['headers'] = results['qsd-']
results['headers'].update(results['qsd+'])
return results
@staticmethod
def parse_native_url(url):
"""
Support https://notica.us/?abc123
"""
result = re.match(
r'^https?://notica\.us/?'
r'\??(?P<token>[^&]+)([&\s]*(?P<args>.+))?$', url, re.I)
if result:
return NotifyNotica.parse_url(
'{schema}://{token}/{args}'.format(
schema=NotifyNotica.protocol,
token=result.group('token'),
args='' if not result.group('args')
else '?{}'.format(result.group('args'))))
return None

View File

@ -0,0 +1,378 @@
# -*- coding: utf-8 -*-
#
# Copyright (C) 2019 Chris Caron <lead2gold@gmail.com>
# All rights reserved.
#
# This code is licensed under the MIT License.
#
# Permission is hereby granted, free of charge, to any person obtaining a copy
# of this software and associated documentation files(the "Software"), to deal
# in the Software without restriction, including without limitation the rights
# to use, copy, modify, merge, publish, distribute, sublicense, and / or sell
# copies of the Software, and to permit persons to whom the Software is
# furnished to do so, subject to the following conditions :
#
# The above copyright notice and this permission notice shall be included in
# all copies or substantial portions of the Software.
#
# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT.IN NO EVENT SHALL THE
# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
# THE SOFTWARE.
# Notifico allows you to relay notifications into IRC channels.
#
# 1. visit https://n.tkte.ch and sign up for an account
# 2. create a project; either manually or sync with github
# 3. from within the project, you can create a message hook
#
# the URL will look something like this:
# https://n.tkte.ch/h/2144/uJmKaBW9WFk42miB146ci3Kj
# ^ ^
# | |
# project id message hook
#
# This plugin also supports taking the URL (as identified above) directly
# as well.
import re
import requests
from .NotifyBase import NotifyBase
from ..common import NotifyType
from ..utils import parse_bool
from ..utils import validate_regex
from ..AppriseLocale import gettext_lazy as _
class NotificoFormat(object):
# Resets all formating
Reset = '\x0F'
# Formatting
Bold = '\x02'
Italic = '\x1D'
Underline = '\x1F'
BGSwap = '\x16'
class NotificoColor(object):
# Resets Color
Reset = '\x03'
# Colors
White = '\x0300'
Black = '\x0301'
Blue = '\x0302'
Green = '\x0303'
Red = '\x0304'
Brown = '\x0305'
Purple = '\x0306'
Orange = '\x0307'
Yellow = '\x0308',
LightGreen = '\x0309'
Teal = '\x0310'
LightCyan = '\x0311'
LightBlue = '\x0312'
Violet = '\x0313'
Grey = '\x0314'
LightGrey = '\x0315'
class NotifyNotifico(NotifyBase):
"""
A wrapper for Notifico Notifications
"""
# The default descriptive name associated with the Notification
service_name = 'Notifico'
# The services URL
service_url = 'https://n.tkte.ch'
# The default protocol
protocol = 'notifico'
# The default secure protocol
secure_protocol = 'notifico'
# A URL that takes you to the setup/help of the specific protocol
setup_url = 'https://github.com/caronc/apprise/wiki/Notify_notifico'
# Plain Text Notification URL
notify_url = 'https://n.tkte.ch/h/{proj}/{hook}'
# The title is not used
title_maxlen = 0
# The maximum allowable characters allowed in the body per message
body_maxlen = 512
# Define object templates
templates = (
'{schema}://{project_id}/{msghook}',
)
# Define our template arguments
template_tokens = dict(NotifyBase.template_tokens, **{
# The Project ID is found as the first part of the URL
# /1234/........................
'project_id': {
'name': _('Project ID'),
'type': 'string',
'required': True,
'private': True,
'regex': (r'^[0-9]+$', ''),
},
# The Message Hook follows the Project ID
# /..../AbCdEfGhIjKlMnOpQrStUvWX
'msghook': {
'name': _('Message Hook'),
'type': 'string',
'required': True,
'private': True,
'regex': (r'^[a-z0-9]+$', 'i'),
},
})
# Define our template arguments
template_args = dict(NotifyBase.template_args, **{
# You can optionally pass IRC colors into
'color': {
'name': _('IRC Colors'),
'type': 'bool',
'default': True,
},
# You can optionally pass IRC color into
'prefix': {
'name': _('Prefix'),
'type': 'bool',
'default': True,
},
})
def __init__(self, project_id, msghook, color=True, prefix=True,
**kwargs):
"""
Initialize Notifico Object
"""
super(NotifyNotifico, self).__init__(**kwargs)
# Assign our message hook
self.project_id = validate_regex(
project_id, *self.template_tokens['project_id']['regex'])
if not self.project_id:
msg = 'An invalid Notifico Project ID ' \
'({}) was specified.'.format(project_id)
self.logger.warning(msg)
raise TypeError(msg)
# Assign our message hook
self.msghook = validate_regex(
msghook, *self.template_tokens['msghook']['regex'])
if not self.msghook:
msg = 'An invalid Notifico Message Token ' \
'({}) was specified.'.format(msghook)
self.logger.warning(msg)
raise TypeError(msg)
# Prefix messages with a [?] where ? identifies the message type
# such as if it's an error, warning, info, or success
self.prefix = prefix
# Send colors
self.color = color
# Prepare our notification URL now:
self.api_url = self.notify_url.format(
proj=self.project_id,
hook=self.msghook,
)
return
def url(self, privacy=False, *args, **kwargs):
"""
Returns the URL built dynamically based on specified arguments.
"""
# Define any arguments set
args = {
'format': self.notify_format,
'overflow': self.overflow_mode,
'verify': 'yes' if self.verify_certificate else 'no',
'color': 'yes' if self.color else 'no',
'prefix': 'yes' if self.prefix else 'no',
}
return '{schema}://{proj}/{hook}/?{args}'.format(
schema=self.secure_protocol,
proj=self.pprint(self.project_id, privacy, safe=''),
hook=self.pprint(self.msghook, privacy, safe=''),
args=NotifyNotifico.urlencode(args),
)
def send(self, body, title='', notify_type=NotifyType.INFO, **kwargs):
"""
wrapper to _send since we can alert more then one channel
"""
# prepare our headers
headers = {
'User-Agent': self.app_id,
'Content-Type': 'application/x-www-form-urlencoded; charset=utf-8',
}
# Prepare our IRC Prefix
color = ''
token = ''
if notify_type == NotifyType.INFO:
color = NotificoColor.Teal
token = 'i'
elif notify_type == NotifyType.SUCCESS:
color = NotificoColor.LightGreen
token = ''
elif notify_type == NotifyType.WARNING:
color = NotificoColor.Orange
token = '!'
elif notify_type == NotifyType.FAILURE:
color = NotificoColor.Red
token = ''
if self.color:
# Colors were specified, make sure we capture and correctly
# allow them to exist inline in the message
# \g<1> is less ambigious than \1
body = re.sub(r'\\x03(\d{0,2})', '\x03\g<1>', body)
else:
# no colors specified, make sure we strip out any colors found
# to make the string read-able
body = re.sub(r'\\x03(\d{1,2}(,[0-9]{1,2})?)?', '', body)
# Prepare our payload
payload = {
'payload': body if not self.prefix
else '{}[{}]{} {}{}{}: {}{}'.format(
# Token [?] at the head
color if self.color else '',
token,
NotificoColor.Reset if self.color else '',
# App ID
NotificoFormat.Bold if self.color else '',
self.app_id,
NotificoFormat.Reset if self.color else '',
# Message Body
body,
# Reset
NotificoFormat.Reset if self.color else '',
),
}
self.logger.debug('Notifico GET URL: %s (cert_verify=%r)' % (
self.api_url, self.verify_certificate))
self.logger.debug('Notifico Payload: %s' % str(payload))
# Always call throttle before any remote server i/o is made
self.throttle()
try:
r = requests.get(
self.api_url,
params=payload,
headers=headers,
verify=self.verify_certificate,
)
if r.status_code != requests.codes.ok:
# We had a problem
status_str = \
NotifyNotifico.http_response_code_lookup(r.status_code)
self.logger.warning(
'Failed to send Notifico notification: '
'{}{}error={}.'.format(
status_str,
', ' if status_str else '',
r.status_code))
self.logger.debug('Response Details:\r\n{}'.format(r.content))
# Return; we're done
return False
else:
self.logger.info('Sent Notifico notification.')
except requests.RequestException as e:
self.logger.warning(
'A Connection error occured sending Notifico '
'notification.')
self.logger.debug('Socket Exception: %s' % str(e))
# Return; we're done
return False
return True
@staticmethod
def parse_url(url):
"""
Parses the URL and returns enough arguments that can allow
us to substantiate this object.
"""
results = NotifyBase.parse_url(url)
if not results:
# We're done early as we couldn't load the results
return results
# The first token is stored in the hostname
results['project_id'] = NotifyNotifico.unquote(results['host'])
# Get Message Hook
try:
results['msghook'] = NotifyNotifico.split_path(
results['fullpath'])[0]
except IndexError:
results['msghook'] = None
# Include Color
results['color'] = \
parse_bool(results['qsd'].get('color', True))
# Include Prefix
results['prefix'] = \
parse_bool(results['qsd'].get('prefix', True))
return results
@staticmethod
def parse_native_url(url):
"""
Support https://n.tkte.ch/h/PROJ_ID/MESSAGE_HOOK/
"""
result = re.match(
r'^https?://n\.tkte\.ch/h/'
r'(?P<proj>[0-9]+)/'
r'(?P<hook>[A-Z0-9]+)/?'
r'(?P<args>\?.+)?$', url, re.I)
if result:
return NotifyNotifico.parse_url(
'{schema}://{proj}/{hook}/{args}'.format(
schema=NotifyNotifico.secure_protocol,
proj=result.group('proj'),
hook=result.group('hook'),
args='' if not result.group('args')
else result.group('args')))
return None

View File

@ -0,0 +1,297 @@
# -*- coding: utf-8 -*-
#
# Copyright (C) 2019 Chris Caron <lead2gold@gmail.com>
# All rights reserved.
#
# This code is licensed under the MIT License.
#
# Permission is hereby granted, free of charge, to any person obtaining a copy
# of this software and associated documentation files(the "Software"), to deal
# in the Software without restriction, including without limitation the rights
# to use, copy, modify, merge, publish, distribute, sublicense, and / or sell
# copies of the Software, and to permit persons to whom the Software is
# furnished to do so, subject to the following conditions :
#
# The above copyright notice and this permission notice shall be included in
# all copies or substantial portions of the Software.
#
# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT.IN NO EVENT SHALL THE
# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
# THE SOFTWARE.
import requests
from json import dumps
from .NotifyBase import NotifyBase
from ..URLBase import PrivacyMode
from ..common import NotifyType
from ..utils import validate_regex
from ..AppriseLocale import gettext_lazy as _
class NotifyPushjet(NotifyBase):
"""
A wrapper for Pushjet Notifications
"""
# The default descriptive name associated with the Notification
service_name = 'Pushjet'
# The default protocol
protocol = 'pjet'
# The default secure protocol
secure_protocol = 'pjets'
# A URL that takes you to the setup/help of the specific protocol
setup_url = 'https://github.com/caronc/apprise/wiki/Notify_pushjet'
# Disable throttle rate for Pushjet requests since they are normally
# local anyway (the remote/online service is no more)
request_rate_per_sec = 0
# Define object templates
templates = (
'{schema}://{host}:{port}/{secret_key}',
'{schema}://{host}/{secret_key}',
'{schema}://{user}:{password}@{host}:{port}/{secret_key}',
'{schema}://{user}:{password}@{host}/{secret_key}',
# Kept for backwards compatibility; will be depricated eventually
'{schema}://{secret_key}@{host}',
'{schema}://{secret_key}@{host}:{port}',
)
# Define our tokens
template_tokens = dict(NotifyBase.template_tokens, **{
'host': {
'name': _('Hostname'),
'type': 'string',
'required': True,
},
'port': {
'name': _('Port'),
'type': 'int',
'min': 1,
'max': 65535,
},
'secret_key': {
'name': _('Secret Key'),
'type': 'string',
'required': True,
'private': True,
},
'user': {
'name': _('Username'),
'type': 'string',
},
'password': {
'name': _('Password'),
'type': 'string',
'private': True,
},
})
template_args = dict(NotifyBase.template_args, **{
'secret': {
'alias_of': 'secret_key',
},
})
def __init__(self, secret_key, **kwargs):
"""
Initialize Pushjet Object
"""
super(NotifyPushjet, self).__init__(**kwargs)
# Secret Key (associated with project)
self.secret_key = validate_regex(secret_key)
if not self.secret_key:
msg = 'An invalid Pushjet Secret Key ' \
'({}) was specified.'.format(secret_key)
self.logger.warning(msg)
raise TypeError(msg)
return
def url(self, privacy=False, *args, **kwargs):
"""
Returns the URL built dynamically based on specified arguments.
"""
# Define any arguments set
args = {
'format': self.notify_format,
'overflow': self.overflow_mode,
'verify': 'yes' if self.verify_certificate else 'no',
}
default_port = 443 if self.secure else 80
# Determine Authentication
auth = ''
if self.user and self.password:
auth = '{user}:{password}@'.format(
user=NotifyPushjet.quote(self.user, safe=''),
password=self.pprint(
self.password, privacy, mode=PrivacyMode.Secret, safe=''),
)
return '{schema}://{auth}{hostname}{port}/{secret}/?{args}'.format(
schema=self.secure_protocol if self.secure else self.protocol,
auth=auth,
hostname=NotifyPushjet.quote(self.host, safe=''),
port='' if self.port is None or self.port == default_port
else ':{}'.format(self.port),
secret=self.pprint(
self.secret_key, privacy, mode=PrivacyMode.Secret, safe=''),
args=NotifyPushjet.urlencode(args),
)
def send(self, body, title='', notify_type=NotifyType.INFO, **kwargs):
"""
Perform Pushjet Notification
"""
params = {
'secret': self.secret_key,
}
# prepare Pushjet Object
payload = {
'message': body,
'title': title,
'link': None,
'level': None,
}
headers = {
'User-Agent': self.app_id,
'Content-Type': 'application/x-www-form-urlencoded; charset=utf-8',
}
auth = None
if self.user:
auth = (self.user, self.password)
notify_url = '{schema}://{host}{port}/message/'.format(
schema="https" if self.secure else "http",
host=self.host,
port=':{}'.format(self.port) if self.port else '')
self.logger.debug('Pushjet POST URL: %s (cert_verify=%r)' % (
notify_url, self.verify_certificate,
))
self.logger.debug('Pushjet Payload: %s' % str(payload))
# Always call throttle before any remote server i/o is made
self.throttle()
try:
r = requests.post(
notify_url,
params=params,
data=dumps(payload),
headers=headers,
auth=auth,
verify=self.verify_certificate,
)
if r.status_code != requests.codes.ok:
# We had a problem
status_str = \
NotifyPushjet.http_response_code_lookup(r.status_code)
self.logger.warning(
'Failed to send Pushjet notification: '
'{}{}error={}.'.format(
status_str,
', ' if status_str else '',
r.status_code))
self.logger.debug('Response Details:\r\n{}'.format(r.content))
# Return; we're done
return False
else:
self.logger.info('Sent Pushjet notification.')
except requests.RequestException as e:
self.logger.warning(
'A Connection error occured sending Pushjet '
'notification to %s.' % self.host)
self.logger.debug('Socket Exception: %s' % str(e))
# Return; we're done
return False
return True
@staticmethod
def parse_url(url):
"""
Parses the URL and returns enough arguments that can allow
us to substantiate this object.
Syntax:
pjet://hostname/secret_key
pjet://hostname:port/secret_key
pjet://user:pass@hostname/secret_key
pjet://user:pass@hostname:port/secret_key
pjets://hostname/secret_key
pjets://hostname:port/secret_key
pjets://user:pass@hostname/secret_key
pjets://user:pass@hostname:port/secret_key
Legacy (Depricated) Syntax:
pjet://secret_key@hostname
pjet://secret_key@hostname:port
pjets://secret_key@hostname
pjets://secret_key@hostname:port
"""
results = NotifyBase.parse_url(url)
if not results:
# We're done early as we couldn't load the results
return results
try:
# Retrieve our secret_key from the first entry in the url path
results['secret_key'] = \
NotifyPushjet.split_path(results['fullpath'])[0]
except IndexError:
# no secret key specified
results['secret_key'] = None
# Allow over-riding the secret by specifying it as an argument
# this allows people who have http-auth infront to login
# through it in addition to supporting the secret key
if 'secret' in results['qsd'] and len(results['qsd']['secret']):
results['secret_key'] = \
NotifyPushjet.unquote(results['qsd']['secret'])
if results.get('secret_key') is None:
# Deprication Notice issued for v0.7.9
NotifyPushjet.logger.deprecate(
'The Pushjet URL contains secret_key in the user field'
' which will be deprecated in an upcoming '
'release. Please place this in the path of the URL instead.'
)
# Store it as it's value based on the user field
results['secret_key'] = \
NotifyPushjet.unquote(results.get('user'))
# there is no way http-auth is enabled, be sure to unset the
# current defined user (if present). This is done due to some
# logic that takes place in the send() since we support http-auth.
results['user'] = None
results['password'] = None
return results

View File

@ -0,0 +1,468 @@
# -*- coding: utf-8 -*-
#
# Copyright (C) 2019 Chris Caron <lead2gold@gmail.com>
# All rights reserved.
#
# This code is licensed under the MIT License.
#
# Permission is hereby granted, free of charge, to any person obtaining a copy
# of this software and associated documentation files(the "Software"), to deal
# in the Software without restriction, including without limitation the rights
# to use, copy, modify, merge, publish, distribute, sublicense, and / or sell
# copies of the Software, and to permit persons to whom the Software is
# furnished to do so, subject to the following conditions :
#
# The above copyright notice and this permission notice shall be included in
# all copies or substantial portions of the Software.
#
# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT.IN NO EVENT SHALL THE
# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
# THE SOFTWARE.
#
# You will need an API Key for this plugin to work.
# From the Settings -> API Keys you can click "Create API Key" if you don't
# have one already. The key must have at least the "Mail Send" permission
# to work.
#
# The schema to use the plugin looks like this:
# {schema}://{apikey}:{from_email}
#
# Your {from_email} must be comprissed of your Sendgrid Authenticated
# Domain. The same domain must have 'Link Branding' turned on as well or it
# will not work. This can be seen from Settings -> Sender Authentication.
# If you're (SendGrid) verified domain is example.com, then your schema may
# look something like this:
# Simple API Reference:
# - https://sendgrid.com/docs/API_Reference/Web_API_v3/index.html
# - https://sendgrid.com/docs/ui/sending-email/\
# how-to-send-an-email-with-dynamic-transactional-templates/
import requests
from json import dumps
from .NotifyBase import NotifyBase
from ..common import NotifyFormat
from ..common import NotifyType
from ..utils import parse_list
from ..utils import GET_EMAIL_RE
from ..utils import validate_regex
from ..AppriseLocale import gettext_lazy as _
# Extend HTTP Error Messages
SENDGRID_HTTP_ERROR_MAP = {
401: 'Unauthorized - You do not have authorization to make the request.',
413: 'Payload To Large - The JSON payload you have included in your '
'request is too large.',
429: 'Too Many Requests - The number of requests you have made exceeds '
'SendGrids rate limitations.',
}
class NotifySendGrid(NotifyBase):
"""
A wrapper for Notify SendGrid Notifications
"""
# The default descriptive name associated with the Notification
service_name = 'SendGrid'
# The services URL
service_url = 'https://sendgrid.com'
# The default secure protocol
secure_protocol = 'sendgrid'
# A URL that takes you to the setup/help of the specific protocol
setup_url = 'https://github.com/caronc/apprise/wiki/Notify_sendgrid'
# Default to markdown
notify_format = NotifyFormat.HTML
# The default Email API URL to use
notify_url = 'https://api.sendgrid.com/v3/mail/send'
# Allow 300 requests per minute.
# 60/300 = 0.2
request_rate_per_sec = 0.2
# The default subject to use if one isn't specified.
default_empty_subject = '<no subject>'
# Define object templates
templates = (
'{schema}://{apikey}:{from_email}',
'{schema}://{apikey}:{from_email}/{targets}',
)
# Define our template arguments
template_tokens = dict(NotifyBase.template_tokens, **{
'apikey': {
'name': _('API Key'),
'type': 'string',
'private': True,
'required': True,
'regex': (r'^[A-Z0-9._-]+$', 'i'),
},
'from_email': {
'name': _('Source Email'),
'type': 'string',
'required': True,
},
'target_email': {
'name': _('Target Email'),
'type': 'string',
'map_to': 'targets',
},
'targets': {
'name': _('Targets'),
'type': 'list:string',
},
})
# Define our template arguments
template_args = dict(NotifyBase.template_args, **{
'to': {
'alias_of': 'targets',
},
'cc': {
'name': _('Carbon Copy'),
'type': 'list:string',
},
'bcc': {
'name': _('Blind Carbon Copy'),
'type': 'list:string',
},
'template': {
# Template ID
# The template ID is 64 characters with one dash (d-uuid)
'name': _('Template'),
'type': 'string',
},
})
# Support Template Dynamic Variables (Substitutions)
template_kwargs = {
'template_data': {
'name': _('Template Data'),
'prefix': '+',
},
}
def __init__(self, apikey, from_email, targets=None, cc=None,
bcc=None, template=None, template_data=None, **kwargs):
"""
Initialize Notify SendGrid Object
"""
super(NotifySendGrid, self).__init__(**kwargs)
# API Key (associated with project)
self.apikey = validate_regex(
apikey, *self.template_tokens['apikey']['regex'])
if not self.apikey:
msg = 'An invalid SendGrid API Key ' \
'({}) was specified.'.format(apikey)
self.logger.warning(msg)
raise TypeError(msg)
self.from_email = from_email
try:
result = GET_EMAIL_RE.match(self.from_email)
if not result:
# let outer exception handle this
raise TypeError
except (TypeError, AttributeError):
msg = 'Invalid ~From~ email specified: {}'.format(self.from_email)
self.logger.warning(msg)
raise TypeError(msg)
# Acquire Targets (To Emails)
self.targets = list()
# Acquire Carbon Copies
self.cc = set()
# Acquire Blind Carbon Copies
self.bcc = set()
# Now our dynamic template (if defined)
self.template = template
# Now our dynamic template data (if defined)
self.template_data = template_data \
if isinstance(template_data, dict) else {}
# Validate recipients (to:) and drop bad ones:
for recipient in parse_list(targets):
if GET_EMAIL_RE.match(recipient):
self.targets.append(recipient)
continue
self.logger.warning(
'Dropped invalid email '
'({}) specified.'.format(recipient),
)
# Validate recipients (cc:) and drop bad ones:
for recipient in parse_list(cc):
if GET_EMAIL_RE.match(recipient):
self.cc.add(recipient)
continue
self.logger.warning(
'Dropped invalid Carbon Copy email '
'({}) specified.'.format(recipient),
)
# Validate recipients (bcc:) and drop bad ones:
for recipient in parse_list(bcc):
if GET_EMAIL_RE.match(recipient):
self.bcc.add(recipient)
continue
self.logger.warning(
'Dropped invalid Blind Carbon Copy email '
'({}) specified.'.format(recipient),
)
if len(self.targets) == 0:
# Notify ourselves
self.targets.append(self.from_email)
return
def url(self, privacy=False, *args, **kwargs):
"""
Returns the URL built dynamically based on specified arguments.
"""
# Define any arguments set
args = {
'format': self.notify_format,
'overflow': self.overflow_mode,
'verify': 'yes' if self.verify_certificate else 'no',
}
if len(self.cc) > 0:
# Handle our Carbon Copy Addresses
args['cc'] = ','.join(self.cc)
if len(self.bcc) > 0:
# Handle our Blind Carbon Copy Addresses
args['bcc'] = ','.join(self.bcc)
if self.template:
# Handle our Template ID if if was specified
args['template'] = self.template
# Append our template_data into our args
args.update({'+{}'.format(k): v
for k, v in self.template_data.items()})
# a simple boolean check as to whether we display our target emails
# or not
has_targets = \
not (len(self.targets) == 1 and self.targets[0] == self.from_email)
return '{schema}://{apikey}:{from_email}/{targets}?{args}'.format(
schema=self.secure_protocol,
apikey=self.pprint(self.apikey, privacy, safe=''),
from_email=self.quote(self.from_email, safe='@'),
targets='' if not has_targets else '/'.join(
[NotifySendGrid.quote(x, safe='') for x in self.targets]),
args=NotifySendGrid.urlencode(args),
)
def send(self, body, title='', notify_type=NotifyType.INFO, **kwargs):
"""
Perform SendGrid Notification
"""
headers = {
'User-Agent': self.app_id,
'Content-Type': 'application/json',
'Authorization': 'Bearer {}'.format(self.apikey),
}
# error tracking (used for function return)
has_error = False
# A Simple Email Payload Template
_payload = {
'personalizations': [{
# Placeholder
'to': [{'email': None}],
}],
'from': {
'email': self.from_email,
},
# A subject is a requirement, so if none is specified we must
# set a default with at least 1 character or SendGrid will deny
# our request
'subject': title if title else self.default_empty_subject,
'content': [{
'type': 'text/plain'
if self.notify_format == NotifyFormat.TEXT else 'text/html',
'value': body,
}],
}
if self.template:
_payload['template_id'] = self.template
if self.template_data:
_payload['personalizations'][0]['dynamic_template_data'] = \
{k: v for k, v in self.template_data.items()}
targets = list(self.targets)
while len(targets) > 0:
target = targets.pop(0)
# Create a copy of our template
payload = _payload.copy()
# the cc, bcc, to field must be unique or SendMail will fail, the
# below code prepares this by ensuring the target isn't in the cc
# list or bcc list. It also makes sure the cc list does not contain
# any of the bcc entries
cc = (self.cc - self.bcc - set([target]))
bcc = (self.bcc - set([target]))
# Set our target
payload['personalizations'][0]['to'][0]['email'] = target
if len(cc):
payload['personalizations'][0]['cc'] = \
[{'email': email} for email in cc]
if len(bcc):
payload['personalizations'][0]['bcc'] = \
[{'email': email} for email in bcc]
self.logger.debug('SendGrid POST URL: %s (cert_verify=%r)' % (
self.notify_url, self.verify_certificate,
))
self.logger.debug('SendGrid Payload: %s' % str(payload))
# Always call throttle before any remote server i/o is made
self.throttle()
try:
r = requests.post(
self.notify_url,
data=dumps(payload),
headers=headers,
verify=self.verify_certificate,
)
if r.status_code not in (
requests.codes.ok, requests.codes.accepted):
# We had a problem
status_str = \
NotifySendGrid.http_response_code_lookup(
r.status_code, SENDGRID_HTTP_ERROR_MAP)
self.logger.warning(
'Failed to send SendGrid notification to {}: '
'{}{}error={}.'.format(
target,
status_str,
', ' if status_str else '',
r.status_code))
self.logger.debug(
'Response Details:\r\n{}'.format(r.content))
# Mark our failure
has_error = True
continue
else:
self.logger.info(
'Sent SendGrid notification to {}.'.format(target))
except requests.RequestException as e:
self.logger.warning(
'A Connection error occured sending SendGrid '
'notification to {}.'.format(target))
self.logger.debug('Socket Exception: %s' % str(e))
# Mark our failure
has_error = True
continue
return not has_error
@staticmethod
def parse_url(url):
"""
Parses the URL and returns enough arguments that can allow
us to substantiate this object.
"""
results = NotifyBase.parse_url(url)
if not results:
# We're done early as we couldn't load the results
return results
# Our URL looks like this:
# {schema}://{apikey}:{from_email}/{targets}
#
# which actually equates to:
# {schema}://{user}:{password}@{host}/{email1}/{email2}/etc..
# ^ ^ ^
# | | |
# apikey -from addr-
if not results.get('user'):
# An API Key as not properly specified
return None
if not results.get('password'):
# A From Email was not correctly specified
return None
# Prepare our API Key
results['apikey'] = NotifySendGrid.unquote(results['user'])
# Prepare our From Email Address
results['from_email'] = '{}@{}'.format(
NotifySendGrid.unquote(results['password']),
NotifySendGrid.unquote(results['host']),
)
# Acquire our targets
results['targets'] = NotifySendGrid.split_path(results['fullpath'])
# The 'to' makes it easier to use yaml configuration
if 'to' in results['qsd'] and len(results['qsd']['to']):
results['targets'] += \
NotifySendGrid.parse_list(results['qsd']['to'])
# Handle Carbon Copy Addresses
if 'cc' in results['qsd'] and len(results['qsd']['cc']):
results['cc'] = \
NotifySendGrid.parse_list(results['qsd']['cc'])
# Handle Blind Carbon Copy Addresses
if 'bcc' in results['qsd'] and len(results['qsd']['bcc']):
results['bcc'] = \
NotifySendGrid.parse_list(results['qsd']['bcc'])
# Handle Blind Carbon Copy Addresses
if 'template' in results['qsd'] and len(results['qsd']['template']):
results['template'] = \
NotifySendGrid.unquote(results['qsd']['template'])
# Add any template substitutions
results['template_data'] = results['qsd+']
return results

View File

@ -0,0 +1,336 @@
# -*- coding: utf-8 -*-
#
# Copyright (C) 2019 Chris Caron <lead2gold@gmail.com>
# All rights reserved.
#
# This code is licensed under the MIT License.
#
# Permission is hereby granted, free of charge, to any person obtaining a copy
# of this software and associated documentation files(the "Software"), to deal
# in the Software without restriction, including without limitation the rights
# to use, copy, modify, merge, publish, distribute, sublicense, and / or sell
# copies of the Software, and to permit persons to whom the Software is
# furnished to do so, subject to the following conditions :
#
# The above copyright notice and this permission notice shall be included in
# all copies or substantial portions of the Software.
#
# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT.IN NO EVENT SHALL THE
# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
# THE SOFTWARE.
from os import urandom
from json import loads
import requests
from .NotifyBase import NotifyBase
from ..URLBase import PrivacyMode
from ..common import NotifyType
from ..utils import validate_regex
from ..AppriseLocale import gettext_lazy as _
# Default our global support flag
CRYPTOGRAPHY_AVAILABLE = False
try:
from cryptography.hazmat.primitives import padding
from cryptography.hazmat.primitives.ciphers import Cipher
from cryptography.hazmat.primitives.ciphers import algorithms
from cryptography.hazmat.primitives.ciphers import modes
from cryptography.hazmat.backends import default_backend
from base64 import urlsafe_b64encode
import hashlib
CRYPTOGRAPHY_AVAILABLE = True
except ImportError:
# no problem; this just means the added encryption functionality isn't
# available. You can still send a SimplePush message
pass
class NotifySimplePush(NotifyBase):
"""
A wrapper for SimplePush Notifications
"""
# The default descriptive name associated with the Notification
service_name = 'SimplePush'
# The services URL
service_url = 'https://simplepush.io/'
# The default secure protocol
secure_protocol = 'spush'
# A URL that takes you to the setup/help of the specific protocol
setup_url = 'https://github.com/caronc/apprise/wiki/Notify_simplepush'
# SimplePush uses the http protocol with SimplePush requests
notify_url = 'https://api.simplepush.io/send'
# The maximum allowable characters allowed in the body per message
body_maxlen = 10000
# Defines the maximum allowable characters in the title
title_maxlen = 1024
# Define object templates
templates = (
'{schema}://{apikey}',
'{schema}://{salt}:{password}@{apikey}',
)
# Define our template tokens
template_tokens = dict(NotifyBase.template_tokens, **{
'apikey': {
'name': _('API Key'),
'type': 'string',
'private': True,
'required': True,
},
# Used for encrypted logins
'password': {
'name': _('Encrypted Password'),
'type': 'string',
'private': True,
},
'salt': {
'name': _('Encrypted Salt'),
'type': 'string',
'private': True,
'map_to': 'user',
},
})
# Define our template arguments
template_args = dict(NotifyBase.template_args, **{
'event': {
'name': _('Event'),
'type': 'string',
},
})
def __init__(self, apikey, event=None, **kwargs):
"""
Initialize SimplePush Object
"""
super(NotifySimplePush, self).__init__(**kwargs)
# API Key (associated with project)
self.apikey = validate_regex(apikey)
if not self.apikey:
msg = 'An invalid SimplePush API Key ' \
'({}) was specified.'.format(apikey)
self.logger.warning(msg)
raise TypeError(msg)
if event:
# Event Name (associated with project)
self.event = validate_regex(event)
if not self.event:
msg = 'An invalid SimplePush Event Name ' \
'({}) was specified.'.format(event)
self.logger.warning(msg)
raise TypeError(msg)
else:
# Default Event Name
self.event = None
# Encrypt Message (providing support is available)
if self.password and self.user and not CRYPTOGRAPHY_AVAILABLE:
# Provide the end user at least some notification that they're
# not getting what they asked for
self.logger.warning(
'SimplePush extended encryption is not supported by this '
'system.')
# Used/cached in _encrypt() function
self._iv = None
self._iv_hex = None
self._key = None
def _encrypt(self, content):
"""
Encrypts message for use with SimplePush
"""
if self._iv is None:
# initialization vector and cache it
self._iv = urandom(algorithms.AES.block_size // 8)
# convert vector into hex string (used in payload)
self._iv_hex = ''.join(["{:02x}".format(ord(self._iv[idx:idx + 1]))
for idx in range(len(self._iv))]).upper()
# encrypted key and cache it
self._key = bytes(bytearray.fromhex(
hashlib.sha1('{}{}'.format(self.password, self.user)
.encode('utf-8')).hexdigest()[0:32]))
padder = padding.PKCS7(algorithms.AES.block_size).padder()
content = padder.update(content.encode()) + padder.finalize()
encryptor = Cipher(
algorithms.AES(self._key),
modes.CBC(self._iv),
default_backend()).encryptor()
return urlsafe_b64encode(
encryptor.update(content) + encryptor.finalize())
def send(self, body, title='', notify_type=NotifyType.INFO, **kwargs):
"""
Perform SimplePush Notification
"""
headers = {
'User-Agent': self.app_id,
'Content-type': "application/x-www-form-urlencoded",
}
# Prepare our payload
payload = {
'key': self.apikey,
}
if self.password and self.user and CRYPTOGRAPHY_AVAILABLE:
body = self._encrypt(body)
title = self._encrypt(title)
payload.update({
'encrypted': 'true',
'iv': self._iv_hex,
})
# prepare SimplePush Object
payload.update({
'msg': body,
'title': title,
})
if self.event:
# Store Event
payload['event'] = self.event
self.logger.debug('SimplePush POST URL: %s (cert_verify=%r)' % (
self.notify_url, self.verify_certificate,
))
self.logger.debug('SimplePush Payload: %s' % str(payload))
# We need to rely on the status string returned in the SimplePush
# response
status_str = None
status = None
# Always call throttle before any remote server i/o is made
self.throttle()
try:
r = requests.post(
self.notify_url,
data=payload,
headers=headers,
verify=self.verify_certificate,
)
# Get our SimplePush response (if it's possible)
try:
json_response = loads(r.content)
status_str = json_response.get('message')
status = json_response.get('status')
except (TypeError, ValueError, AttributeError):
# TypeError = r.content is not a String
# ValueError = r.content is Unparsable
# AttributeError = r.content is None
pass
if r.status_code != requests.codes.ok or status != 'OK':
# We had a problem
status_str = status_str if status_str else\
NotifyBase.http_response_code_lookup(r.status_code)
self.logger.warning(
'Failed to send SimplePush notification:'
'{}{}error={}.'.format(
status_str,
', ' if status_str else '',
r.status_code))
self.logger.debug('Response Details:\r\n{}'.format(r.content))
# Return; we're done
return False
else:
self.logger.info('Sent SimplePush notification.')
except requests.RequestException as e:
self.logger.warning(
'A Connection error occured sending SimplePush notification.')
self.logger.debug('Socket Exception: %s' % str(e))
# Return; we're done
return False
return True
def url(self, privacy=False, *args, **kwargs):
"""
Returns the URL built dynamically based on specified arguments.
"""
# Define any arguments set
args = {
'format': self.notify_format,
'overflow': self.overflow_mode,
'verify': 'yes' if self.verify_certificate else 'no',
}
if self.event:
args['event'] = self.event
# Determine Authentication
auth = ''
if self.user and self.password:
auth = '{salt}:{password}@'.format(
salt=self.pprint(
self.user, privacy, mode=PrivacyMode.Secret, safe=''),
password=self.pprint(
self.password, privacy, mode=PrivacyMode.Secret, safe=''),
)
return '{schema}://{auth}{apikey}/?{args}'.format(
schema=self.secure_protocol,
auth=auth,
apikey=self.pprint(self.apikey, privacy, safe=''),
args=NotifySimplePush.urlencode(args),
)
@staticmethod
def parse_url(url):
"""
Parses the URL and returns enough arguments that can allow
us to substantiate this object.
"""
results = NotifyBase.parse_url(url)
if not results:
# We're done early as we couldn't load the results
return results
# Set the API Key
results['apikey'] = NotifySimplePush.unquote(results['host'])
# Event
if 'event' in results['qsd'] and len(results['qsd']['event']):
# Extract the account sid from an argument
results['event'] = \
NotifySimplePush.unquote(results['qsd']['event'])
return results

View File

@ -0,0 +1,293 @@
# -*- coding: utf-8 -*-
#
# Copyright (C) 2019 Chris Caron <lead2gold@gmail.com>
# All rights reserved.
#
# This code is licensed under the MIT License.
#
# Permission is hereby granted, free of charge, to any person obtaining a copy
# of this software and associated documentation files(the "Software"), to deal
# in the Software without restriction, including without limitation the rights
# to use, copy, modify, merge, publish, distribute, sublicense, and / or sell
# copies of the Software, and to permit persons to whom the Software is
# furnished to do so, subject to the following conditions :
#
# The above copyright notice and this permission notice shall be included in
# all copies or substantial portions of the Software.
#
# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT.IN NO EVENT SHALL THE
# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
# THE SOFTWARE.
import syslog
from .NotifyBase import NotifyBase
from ..common import NotifyType
from ..utils import parse_bool
from ..AppriseLocale import gettext_lazy as _
class SyslogFacility:
"""
All of the supported facilities
"""
KERN = 'kern'
USER = 'user'
MAIL = 'mail'
DAEMON = 'daemon'
AUTH = 'auth'
SYSLOG = 'syslog'
LPR = 'lpr'
NEWS = 'news'
UUCP = 'uucp'
CRON = 'cron'
LOCAL0 = 'local0'
LOCAL1 = 'local1'
LOCAL2 = 'local2'
LOCAL3 = 'local3'
LOCAL4 = 'local4'
LOCAL5 = 'local5'
LOCAL6 = 'local6'
LOCAL7 = 'local7'
SYSLOG_FACILITY_MAP = {
SyslogFacility.KERN: syslog.LOG_KERN,
SyslogFacility.USER: syslog.LOG_USER,
SyslogFacility.MAIL: syslog.LOG_MAIL,
SyslogFacility.DAEMON: syslog.LOG_DAEMON,
SyslogFacility.AUTH: syslog.LOG_AUTH,
SyslogFacility.SYSLOG: syslog.LOG_SYSLOG,
SyslogFacility.LPR: syslog.LOG_LPR,
SyslogFacility.NEWS: syslog.LOG_NEWS,
SyslogFacility.UUCP: syslog.LOG_UUCP,
SyslogFacility.CRON: syslog.LOG_CRON,
SyslogFacility.LOCAL0: syslog.LOG_LOCAL0,
SyslogFacility.LOCAL1: syslog.LOG_LOCAL1,
SyslogFacility.LOCAL2: syslog.LOG_LOCAL2,
SyslogFacility.LOCAL3: syslog.LOG_LOCAL3,
SyslogFacility.LOCAL4: syslog.LOG_LOCAL4,
SyslogFacility.LOCAL5: syslog.LOG_LOCAL5,
SyslogFacility.LOCAL6: syslog.LOG_LOCAL6,
SyslogFacility.LOCAL7: syslog.LOG_LOCAL7,
}
SYSLOG_FACILITY_RMAP = {
syslog.LOG_KERN: SyslogFacility.KERN,
syslog.LOG_USER: SyslogFacility.USER,
syslog.LOG_MAIL: SyslogFacility.MAIL,
syslog.LOG_DAEMON: SyslogFacility.DAEMON,
syslog.LOG_AUTH: SyslogFacility.AUTH,
syslog.LOG_SYSLOG: SyslogFacility.SYSLOG,
syslog.LOG_LPR: SyslogFacility.LPR,
syslog.LOG_NEWS: SyslogFacility.NEWS,
syslog.LOG_UUCP: SyslogFacility.UUCP,
syslog.LOG_CRON: SyslogFacility.CRON,
syslog.LOG_LOCAL0: SyslogFacility.LOCAL0,
syslog.LOG_LOCAL1: SyslogFacility.LOCAL1,
syslog.LOG_LOCAL2: SyslogFacility.LOCAL2,
syslog.LOG_LOCAL3: SyslogFacility.LOCAL3,
syslog.LOG_LOCAL4: SyslogFacility.LOCAL4,
syslog.LOG_LOCAL5: SyslogFacility.LOCAL5,
syslog.LOG_LOCAL6: SyslogFacility.LOCAL6,
syslog.LOG_LOCAL7: SyslogFacility.LOCAL7,
}
class NotifySyslog(NotifyBase):
"""
A wrapper for Syslog Notifications
"""
# The default descriptive name associated with the Notification
service_name = 'Syslog'
# The services URL
service_url = 'https://tools.ietf.org/html/rfc5424'
# The default secure protocol
secure_protocol = 'syslog'
# A URL that takes you to the setup/help of the specific protocol
setup_url = 'https://github.com/caronc/apprise/wiki/Notify_syslog'
# Disable throttle rate for Syslog requests since they are normally
# local anyway
request_rate_per_sec = 0
# Title to be added to body if present
title_maxlen = 0
# Define object templates
templates = (
'{schema}://',
'{schema}://{facility}',
)
# Define our template tokens
template_tokens = dict(NotifyBase.template_tokens, **{
'facility': {
'name': _('Facility'),
'type': 'choice:string',
'values': [k for k in SYSLOG_FACILITY_MAP.keys()],
'default': SyslogFacility.USER,
},
})
# Define our template arguments
template_args = dict(NotifyBase.template_args, **{
'facility': {
# We map back to the same element defined in template_tokens
'alias_of': 'facility',
},
'logpid': {
'name': _('Log PID'),
'type': 'bool',
'default': True,
'map_to': 'log_pid',
},
'logperror': {
'name': _('Log to STDERR'),
'type': 'bool',
'default': False,
'map_to': 'log_perror',
},
})
def __init__(self, facility=None, log_pid=True, log_perror=False,
**kwargs):
"""
Initialize Syslog Object
"""
super(NotifySyslog, self).__init__(**kwargs)
if facility:
try:
self.facility = SYSLOG_FACILITY_MAP[facility]
except KeyError:
msg = 'An invalid syslog facility ' \
'({}) was specified.'.format(facility)
self.logger.warning(msg)
raise TypeError(msg)
else:
self.facility = \
SYSLOG_FACILITY_MAP[
self.template_tokens['facility']['default']]
# Logging Options
self.logoptions = 0
# Include PID with each message.
# This may not appear evident if using journalctl since the pid
# will always display itself; however it will appear visible
# for log_perror combinations
self.log_pid = log_pid
# Print to stderr as well.
self.log_perror = log_perror
if log_pid:
self.logoptions |= syslog.LOG_PID
if log_perror:
self.logoptions |= syslog.LOG_PERROR
# Initialize our loggig
syslog.openlog(
self.app_id, logoption=self.logoptions, facility=self.facility)
return
def send(self, body, title='', notify_type=NotifyType.INFO, **kwargs):
"""
Perform Syslog Notification
"""
_pmap = {
NotifyType.INFO: syslog.LOG_INFO,
NotifyType.SUCCESS: syslog.LOG_NOTICE,
NotifyType.FAILURE: syslog.LOG_CRIT,
NotifyType.WARNING: syslog.LOG_WARNING,
}
# Always call throttle before any remote server i/o is made
self.throttle()
try:
syslog.syslog(_pmap[notify_type], body)
except KeyError:
# An invalid notification type was specified
self.logger.warning(
'An invalid notification type '
'({}) was specified.'.format(notify_type))
return False
return True
def url(self, privacy=False, *args, **kwargs):
"""
Returns the URL built dynamically based on specified arguments.
"""
# Define any arguments set
args = {
'logperror': 'yes' if self.log_perror else 'no',
'logpid': 'yes' if self.log_pid else 'no',
'format': self.notify_format,
'overflow': self.overflow_mode,
'verify': 'yes' if self.verify_certificate else 'no',
}
return '{schema}://{facility}/?{args}'.format(
facility=self.template_tokens['facility']['default']
if self.facility not in SYSLOG_FACILITY_RMAP
else SYSLOG_FACILITY_RMAP[self.facility],
schema=self.secure_protocol,
args=NotifySyslog.urlencode(args),
)
@staticmethod
def parse_url(url):
"""
Parses the URL and returns enough arguments that can allow
us to substantiate this object.
"""
results = NotifyBase.parse_url(url, verify_host=False)
if not results:
return results
# if specified; save hostname into facility
facility = None if not results['host'] \
else NotifySyslog.unquote(results['host'])
# However if specified on the URL, that will over-ride what was
# identified
if 'facility' in results['qsd'] and len(results['qsd']['facility']):
facility = results['qsd']['facility'].lower()
if facility and facility not in SYSLOG_FACILITY_MAP:
# Find first match; if no match is found we set the result
# to the matching key. This allows us to throw a TypeError
# during the __init__() call. The benifit of doing this
# check here is if we do have a valid match, we can support
# short form matches like 'u' which will match against user
facility = next((f for f in SYSLOG_FACILITY_MAP.keys()
if f.startswith(facility)), facility)
# Save facility
results['facility'] = facility
# Include PID as part of the message logged
results['log_pid'] = \
parse_bool(results['qsd'].get('logpid', True))
# Print to stderr as well.
results['log_perror'] = \
parse_bool(results['qsd'].get('logperror', False))
return results