Source code for awslimitchecker.alerts.pagerdutyv1

"""
awslimitchecker/alerts/pagerdutyv1.py

The latest version of this package is available at:
<https://github.com/jantman/awslimitchecker>

################################################################################
Copyright 2015-2019 Jason Antman <jason@jasonantman.com>

    This file is part of awslimitchecker, also known as awslimitchecker.

    awslimitchecker is free software: you can redistribute it and/or modify
    it under the terms of the GNU Affero General Public License as published by
    the Free Software Foundation, either version 3 of the License, or
    (at your option) any later version.

    awslimitchecker is distributed in the hope that it will be useful,
    but WITHOUT ANY WARRANTY; without even the implied warranty of
    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
    GNU Affero General Public License for more details.

    You should have received a copy of the GNU Affero General Public License
    along with awslimitchecker.  If not, see <http://www.gnu.org/licenses/>.

The Copyright and Authors attributions contained herein may not be removed or
otherwise altered, except to add the Author attribution of a contributor to
this work. (Additional Terms pursuant to Section 7b of the AGPL v3)
################################################################################
While not legally required, I sincerely request that anyone who finds
bugs please submit them at <https://github.com/jantman/awslimitchecker> or
to me via email, and that you send any contributions or improvements
either as a pull request on GitHub, or to me via email.
################################################################################

AUTHORS:
Jason Antman <jason@jasonantman.com> <http://www.jasonantman.com>
################################################################################
"""

import os
import logging
import urllib3
import json

from .base import AlertProvider
from awslimitchecker.utils import issue_string_tuple

logger = logging.getLogger(__name__)


class PagerDutyV1(AlertProvider):
    """
    Alert provider to send notifications to PagerDuty via the Events API V1.

    Critical problems (and run failures) are sent as ``trigger`` events using
    the critical service key; warnings are sent as ``trigger`` events using the
    warning service key, if one is configured. A successful run sends a
    ``resolve`` event to every configured service key.
    """

    #: URL that events are POSTed to.
    pd_url = (
        'https://events.pagerduty.com/generic/2010-04-15/create_event.json'
    )

    def __init__(
        self, region_name, account_alias=None, critical_service_key=None,
        warning_service_key=None, incident_key=None
    ):
        """
        Initialize PagerDutyV1 alert provider.

        :param region_name: the name of the region we're connected to
        :type region_name: str
        :param account_alias: Optional; an alias for the account that
          awslimitchecker is currently running against, to use in the default
          incident_key and description.
        :type account_alias: str
        :param critical_service_key: **Required**; the PagerDuty Integration
          Key for sending Critical events. Can also be specified via the
          ``PAGERDUTY_SERVICE_KEY_CRIT`` environment variable. If both are
          given, this parameter takes precedence.
        :type critical_service_key: str
        :param warning_service_key: Optional; the PagerDuty Integration Key
          for sending Warning events. Can also be specified via the
          ``PAGERDUTY_SERVICE_KEY_WARN`` environment variable. If both are
          given, this parameter takes precedence. If omitted, alerts will not
          be sent for warnings.
        :type warning_service_key: str
        :param incident_key: Optional; the PagerDuty incident/routing key to
          use, for de-duplication and resolving alerts. This string will have
          any occurrences of ``{account_alias}`` replaced with the account
          alias (or an empty string, if not specified) and any occurrences of
          ``{region_name}`` replaced with the current region name. If not
          specified, this will default to
          ``awslimitchecker-{account_alias}-{region_name}``.
        :type incident_key: str
        :raises RuntimeError: if no critical service key is available from
          either the parameter or the environment
        """
        super(PagerDutyV1, self).__init__(region_name)
        # Environment variable is the fallback; an explicit argument wins.
        self._service_key_crit = os.environ.get(
            'PAGERDUTY_SERVICE_KEY_CRIT', None
        )
        if critical_service_key is not None:
            self._service_key_crit = critical_service_key
        if self._service_key_crit is None:
            raise RuntimeError(
                'ERROR: PagerDutyV1 alert provider requires '
                'critical_service_key parameter or PAGERDUTY_SERVICE_KEY_CRIT '
                'environment variable.'
            )
        # The warning key is optional; None means "do not alert on warnings".
        self._service_key_warn = os.environ.get(
            'PAGERDUTY_SERVICE_KEY_WARN', None
        )
        if warning_service_key is not None:
            self._service_key_warn = warning_service_key
        self._account_alias = account_alias
        if incident_key is None:
            incident_key = 'awslimitchecker-{account_alias}-{region_name}'
        # NOTE(review): self._region_name is not assigned in this class; it is
        # presumably set by AlertProvider.__init__ - confirm against base.py.
        self._incident_key = incident_key.format(
            account_alias='' if self._account_alias is None
            else self._account_alias,
            region_name=self._region_name
        )

    def _send_event(self, service_key, payload):
        """
        Send an event to PagerDuty.

        ``payload`` is modified in place: its ``service_key`` item is set to
        ``service_key`` before the payload is serialized and POSTed.

        :param service_key: service key to send to
        :type service_key: str
        :param payload: data to send with event
        :type payload: dict
        :raises RuntimeError: if the API responds with any HTTP status other
          than 200
        """
        payload['service_key'] = service_key
        http = urllib3.PoolManager()
        # NOTE(review): the logged payload includes the service key.
        logger.info(
            'POSTing to PagerDuty Events API (%s): %s', self.pd_url, payload
        )
        # sort_keys gives a deterministic request body for a given payload
        encoded = json.dumps(payload, sort_keys=True).encode('utf-8')
        resp = http.request(
            'POST', self.pd_url,
            headers={'Content-type': 'application/json'},
            body=encoded
        )
        if resp.status == 200:
            logger.debug(
                'Successfully POSTed to PagerDuty; HTTP %d: %s',
                resp.status, resp.data
            )
            return
        raise RuntimeError(
            'ERROR creating PagerDuty Event; API responded HTTP %d: %s' % (
                resp.status, resp.data
            )
        )

    def _event_dict(self):
        """
        Return a skeleton dictionary for the PagerDuty V1 Event.

        The skeleton contains the ``incident_key``, the ``client`` name and a
        ``details`` dict holding the region and, when one was configured, the
        account alias. A new dict is built on every call.

        :return: skeleton of Event
        :rtype: dict
        """
        d = {
            'incident_key': self._incident_key,
            'details': {
                'region': self._region_name
            },
            'client': 'awslimitchecker'
        }
        if self._account_alias is not None:
            d['details']['account_alias'] = self._account_alias
        return d

    def _description_prefix(self):
        """
        Return the common beginning of every event description:
        ``awslimitchecker in [<account_alias> ]<region_name>``.

        :return: beginning of the event description
        :rtype: str
        """
        prefix = 'awslimitchecker in '
        if self._account_alias is not None:
            prefix += self._account_alias + ' '
        return prefix + self._region_name

    def on_success(self, duration=None):
        """
        Method called when no thresholds were breached, and run completed
        successfully. Should resolve any open incidents (if the service
        supports that functionality) or else simply return.

        Sends a ``resolve`` event using the critical service key and, if a
        warning service key is configured, a second one using that key.

        :param duration: duration of the usage/threshold checking run
        :type duration: float
        """
        data = self._event_dict()
        data['event_type'] = 'resolve'
        data['description'] = self._description_prefix() + ' found no problems'
        if duration:
            data['description'] += '; run completed in %.2f seconds' % duration
            data['details']['duration_seconds'] = duration
        self._send_event(self._service_key_crit, data)
        if self._service_key_warn is not None:
            self._send_event(self._service_key_warn, data)

    def _problems_dict(self, problems):
        """
        Make a dict of problems suitable for inclusion in Event details, and
        count the warnings and criticals it contains.

        :param problems: dict of service name to nested dict of limit name to
          limit, same format as the return value of
          :py:meth:`~.AwsLimitChecker.check_thresholds`. ``None`` or an empty
          dict yields zero counts and an empty summary.
        :type problems: dict or None
        :return: 3-tuple of (number of warnings, number of criticals, problems
          summary dict of service name to dict of limit name to the second
          element returned by ``issue_string_tuple`` for that limit)
        :rtype: tuple
        """
        res = {}
        w_count = 0
        c_count = 0
        if not problems:
            # Nothing to summarize; avoid failing on ``None``.
            return w_count, c_count, res
        # Sorted iteration keeps the resulting structure deterministic.
        for svc in sorted(problems.keys()):
            for lim_name in sorted(problems[svc].keys()):
                limit = problems[svc][lim_name]
                warns = limit.get_warnings()
                w_count += len(warns)
                crits = limit.get_criticals()
                c_count += len(crits)
                _, v = issue_string_tuple(
                    svc, limit, crits, warns, colorize=False
                )
                res.setdefault(svc, {})[lim_name] = v
        return w_count, c_count, res

    def on_critical(self, problems, problem_str, exc=None, duration=None):
        """
        Method called when the run encountered errors, or at least one critical
        threshold was met or crossed.

        Sends a ``trigger`` event using the critical service key.

        :param problems: dict of service name to nested dict of limit name to
          limit, same format as the return value of
          :py:meth:`~.AwsLimitChecker.check_thresholds`. ``None`` if ``exc`` is
          specified.
        :type problems: dict or None
        :param problem_str: String representation of ``problems``, as displayed
          in ``awslimitchecker`` command line output. ``None`` if ``exc`` is
          specified. Not used by this provider.
        :type problem_str: str or None
        :param exc: Exception object that was raised during the run (optional)
        :type exc: Exception
        :param duration: duration of the run
        :type duration: float
        """
        data = self._event_dict()
        data['event_type'] = 'trigger'
        data['description'] = self._description_prefix()
        if duration:
            data['description'] += ' ran in %.2f seconds and' % duration
            data['details']['duration_seconds'] = duration
        if exc is not None:
            exc_repr = repr(exc)
            data['description'] += ' failed with an exception: %s' % exc_repr
            data['details']['exception'] = exc_repr
        else:
            w_count, c_count, pdict = self._problems_dict(problems)
            data['description'] += ' crossed %d CRITICAL thresholds' % c_count
            if w_count > 0:
                data['description'] += ' and %d WARNING thresholds' % w_count
            data['details']['limits'] = pdict
        self._send_event(self._service_key_crit, data)

    def on_warning(self, problems, problem_str, duration=None):
        """
        Method called when one or more warning thresholds were crossed, but no
        criticals and the run did not encounter any errors.

        Sends a ``trigger`` event using the warning service key. If no warning
        service key was configured, nothing is sent.

        :param problems: dict of service name to nested dict of limit name to
          limit, same format as the return value of
          :py:meth:`~.AwsLimitChecker.check_thresholds`.
        :type problems: dict or None
        :param problem_str: String representation of ``problems``, as displayed
          in ``awslimitchecker`` command line output. Not used by this
          provider.
        :type problem_str: str or None
        :param duration: duration of the run
        :type duration: float
        """
        if self._service_key_warn is None:
            # Without a warning key there is nowhere to send the event;
            # POSTing with a null service key would only produce an API error.
            logger.debug(
                'No PagerDuty warning service key configured; '
                'not sending event for warnings.'
            )
            return
        data = self._event_dict()
        data['event_type'] = 'trigger'
        data['description'] = self._description_prefix()
        if duration:
            data['description'] += ' ran in %.2f seconds and' % duration
            data['details']['duration_seconds'] = duration
        w_count, _, pdict = self._problems_dict(problems)
        data['description'] += ' crossed %d WARNING thresholds' % w_count
        data['details']['limits'] = pdict
        self._send_event(self._service_key_warn, data)