Source code for awslimitchecker.trustedadvisor

"""
awslimitchecker/trustedadvisor.py

The latest version of this package is available at:
<https://github.com/jantman/awslimitchecker>

################################################################################
Copyright 2015 Jason Antman <jason@jasonantman.com> <http://www.jasonantman.com>

    This file is part of awslimitchecker, also known as awslimitchecker.

    awslimitchecker is free software: you can redistribute it and/or modify
    it under the terms of the GNU Affero General Public License as published by
    the Free Software Foundation, either version 3 of the License, or
    (at your option) any later version.

    awslimitchecker is distributed in the hope that it will be useful,
    but WITHOUT ANY WARRANTY; without even the implied warranty of
    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
    GNU Affero General Public License for more details.

    You should have received a copy of the GNU Affero General Public License
    along with awslimitchecker.  If not, see <http://www.gnu.org/licenses/>.

The Copyright and Authors attributions contained herein may not be removed or
otherwise altered, except to add the Author attribution of a contributor to
this work. (Additional Terms pursuant to Section 7b of the AGPL v3)
################################################################################
While not legally required, I sincerely request that anyone who finds
bugs please submit them at <https://github.com/jantman/awslimitchecker> or
to me via email, and that you send any contributions or improvements
either as a pull request on GitHub, or to me via email.
################################################################################

AUTHORS:
Jason Antman <jason@jasonantman.com> <http://www.jasonantman.com>
################################################################################
"""

from botocore.exceptions import ClientError
from dateutil import parser
import logging
from .connectable import Connectable

logger = logging.getLogger(__name__)


class TrustedAdvisor(Connectable):
    """
    Class to handle interaction with TrustedAdvisor API, polling TA and updating
    limits from TA information.
    """

    service_name = 'TrustedAdvisor'
    api_name = 'support'

    def __init__(self, all_services, account_id=None, account_role=None,
                 region=None, external_id=None, mfa_serial_number=None,
                 mfa_token=None):
        """
        Class to contain all TrustedAdvisor-related logic.

        :param all_services: :py:class:`~.checker.AwsLimitChecker` ``services``
          dictionary.
        :type all_services: dict
        :param account_id: `AWS Account ID <http://docs.aws.amazon.com/general/
          latest/gr/acct-identifiers.html>`_
          (12-digit string, currently numeric) for the account to connect to
          (destination) via STS
        :type account_id: str
        :param account_role: the name of an
          `IAM Role <http://docs.aws.amazon.com/IAM/latest/UserGuide/id_roles.
          html>`_
          (in the destination account) to assume
        :type account_role: str
        :param region: AWS region name; stored as ``ta_region`` and used by
          :py:meth:`~._poll` to filter Trusted Advisor results
        :type region: str
        :param external_id: (optional) the `External ID <http://docs.aws.amazon.
          com/IAM/latest/UserGuide/id_roles_create_for-user_externalid.html>`_
          string to use when assuming a role via STS.
        :type external_id: str
        :param mfa_serial_number: (optional) the `MFA Serial Number` string to
          use when assuming a role via STS.
        :type mfa_serial_number: str
        :param mfa_token: (optional) the `MFA Token` string to use when
          assuming a role via STS.
        :type mfa_token: str
        """
        self.conn = None
        # Flipped to False by _get_limit_check_id() when the account has no
        # Trusted Advisor (support) subscription.
        self.have_ta = True
        self.account_id = account_id
        self.account_role = account_role
        # The connection region is pinned regardless of the ``region``
        # argument; the requested region is kept separately in ``ta_region``.
        # NOTE(review): presumably because the Support API is only served
        # from us-east-1 - confirm against Connectable.connect().
        self.region = 'us-east-1'
        self.ta_region = region
        self.external_id = external_id
        self.mfa_serial_number = mfa_serial_number
        self.mfa_token = mfa_token
        self.all_services = all_services
        self.ta_services = self._make_ta_service_dict()
        self.limits_updated = False

    def update_limits(self):
        """
        Poll 'Service Limits' check results from Trusted Advisor, if possible.
        Iterate over all :py:class:`~.AwsLimit` objects in ``self.ta_services``
        and update their limits from TA if present in TA checks.

        Polling happens at most once per instance; subsequent calls return
        immediately.
        """
        if self.limits_updated:
            logger.debug('Already polled TA; skipping update')
            return
        self.connect()
        ta_results = self._poll()
        self._update_services(ta_results)
        self.limits_updated = True

    def _poll(self):
        """
        Poll Trusted Advisor (Support) API for limit checks.

        Return a dict of service name (string) keys to nested dict vals, where
        each key is a limit name and each value the current numeric limit.

        e.g.:
        ::

            {
                'EC2': {
                    'SomeLimit': 10,
                }
            }

        An empty dict is returned when Trusted Advisor is not available for
        the account, or when the 'Service Limits' check cannot be found.

        :returns: dict of service name to dict of limit name to limit value
        :rtype: dict
        """
        logger.info("Beginning TrustedAdvisor poll")
        found = self._get_limit_check_id()
        if not self.have_ta:
            logger.info('TrustedAdvisor.have_ta is False; not polling TA')
            return {}
        # _get_limit_check_id() signals "not found" with (None, None), so the
        # check ID itself must be tested; a bare None is tolerated as well in
        # case an override returns it.
        check_id, metadata = found if found is not None else (None, None)
        if check_id is None:
            logger.critical("Unable to find 'Service Limits' Trusted Advisor "
                            "check; not using Trusted Advisor data.")
            # Return an empty dict (not None) so that _update_services() can
            # iterate over the result without raising.
            return {}
        region = self.ta_region or self.conn._client_config.region_name
        checks = self.conn.describe_trusted_advisor_check_result(
            checkId=check_id, language='en'
        )
        check_datetime = parser.parse(checks['result']['timestamp'])
        logger.debug("Got TrustedAdvisor data for check %s as of %s",
                     check_id, check_datetime)
        res = {}
        for check in checks['result']['flaggedResources']:
            # Skip resources flagged for a different region; resources that
            # carry no 'region' key are always kept.
            if 'region' in check and check['region'] != region:
                continue
            # Pair the check's column names with this resource's values.
            data = dict(zip(metadata, check['metadata']))
            svc_limits = res.setdefault(data['Service'], {})
            svc_limits[data['Limit Name']] = int(data['Limit Amount'])
        logger.info("Finished TrustedAdvisor poll")
        return res

    def _get_limit_check_id(self):
        """
        Query currently-available TA checks, return the check ID and metadata
        of the 'performance/Service Limits' check.

        If the API reports ``SubscriptionRequiredException``, a warning is
        logged, ``self.have_ta`` is set to False and ``(None, None)`` is
        returned. Any other :py:class:`botocore.exceptions.ClientError` is
        re-raised.

        :returns: 2-tuple of Service Limits TA check ID (string),
          metadata (list), or (None, None).
        :rtype: tuple
        """
        logger.debug("Querying Trusted Advisor checks")
        try:
            checks = self.conn.describe_trusted_advisor_checks(
                language='en'
            )['checks']
        except ClientError as ex:
            if ex.response['Error']['Code'] != 'SubscriptionRequiredException':
                # Bare raise keeps the original traceback intact.
                raise
            logger.warning(
                "Cannot check TrustedAdvisor: %s",
                ex.response['Error']['Message']
            )
            self.have_ta = False
            return (None, None)
        for check in checks:
            if (
                check['category'] == 'performance' and
                check['name'] == 'Service Limits'
            ):
                logger.debug("Found TA check; id=%s", check['id'])
                return (
                    check['id'],
                    check['metadata']
                )
        logger.debug("Unable to find check with category 'performance' and "
                     "name 'Service Limits'.")
        return (None, None)

    def _update_services(self, ta_results):
        """
        Given a dict of TrustedAdvisor check results from :py:meth:`~._poll`,
        update the TrustedAdvisor limits for all matching limits in
        ``self.ta_services``.

        Services and limits that Trusted Advisor reports but that are not
        present in ``self.ta_services`` are logged and skipped.

        :param ta_results: results returned by :py:meth:`~._poll`
        :type ta_results: dict
        """
        logger.debug("Updating TA limits on all services")
        # Sorted iteration keeps the order of updates and log messages
        # deterministic.
        for svc_name in sorted(ta_results):
            svc_results = ta_results[svc_name]
            if svc_name not in self.ta_services:
                logger.info("TrustedAdvisor returned check results for "
                            "unknown service '%s'", svc_name)
                continue
            svc_limits = self.ta_services[svc_name]
            for lim_name in sorted(svc_results):
                if lim_name not in svc_limits:
                    logger.info("TrustedAdvisor returned check results for "
                                "unknown limit '%s' (service %s)",
                                lim_name,
                                svc_name)
                    continue
                svc_limits[lim_name]._set_ta_limit(svc_results[lim_name])
        logger.info("Done updating TA limits on all services")

    def _make_ta_service_dict(self):
        """
        Build our service and limits dict. This is laid out identical to
        ``self.all_services``, but keys limits by their ``ta_service_name``
        and ``ta_limit_name`` properties.

        :return: dict of TA service names to TA limit names to AwsLimit objects.
        :rtype: dict
        """
        res = {}
        for svc_obj in self.all_services.values():
            for lim in svc_obj.get_limits().values():
                res.setdefault(lim.ta_service_name, {})[lim.ta_limit_name] = lim
        return res