Source code for awslimitchecker.trustedadvisor

"""
awslimitchecker/trustedadvisor.py

The latest version of this package is available at:
<https://github.com/jantman/awslimitchecker>

################################################################################
Copyright 2015 Jason Antman <jason@jasonantman.com> <http://www.jasonantman.com>

    This file is part of awslimitchecker, also known as awslimitchecker.

    awslimitchecker is free software: you can redistribute it and/or modify
    it under the terms of the GNU Affero General Public License as published by
    the Free Software Foundation, either version 3 of the License, or
    (at your option) any later version.

    awslimitchecker is distributed in the hope that it will be useful,
    but WITHOUT ANY WARRANTY; without even the implied warranty of
    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
    GNU Affero General Public License for more details.

    You should have received a copy of the GNU Affero General Public License
    along with awslimitchecker.  If not, see <http://www.gnu.org/licenses/>.

The Copyright and Authors attributions contained herein may not be removed or
otherwise altered, except to add the Author attribution of a contributor to
this work. (Additional Terms pursuant to Section 7b of the AGPL v3)
################################################################################
While not legally required, I sincerely request that anyone who finds
bugs please submit them at <https://github.com/jantman/awslimitchecker> or
to me via email, and that you send any contributions or improvements
either as a pull request on GitHub, or to me via email.
################################################################################

AUTHORS:
Jason Antman <jason@jasonantman.com> <http://www.jasonantman.com>
################################################################################
"""

import boto
import boto.support
from dateutil import parser
import logging
from .connectable import Connectable
from .utils import boto_query_wrapper

logger = logging.getLogger(__name__)


[docs]class TrustedAdvisor(Connectable): """ Class to handle interaction with TrustedAdvisor API, polling TA and updating limits from TA information. """ service_name = 'TrustedAdvisor'
[docs] def __init__(self, account_id=None, account_role=None, region=None, external_id=None): """ Class to contain all TrustedAdvisor-related logic. :param account_id: `AWS Account ID <http://docs.aws.amazon.com/general/ latest/gr/acct-identifiers.html>`_ (12-digit string, currently numeric) for the account to connect to (destination) via STS :type account_id: str :param account_role: the name of an `IAM Role <http://docs.aws.amazon.com/IAM/latest/UserGuide/id_roles. html>`_ (in the destination account) to assume :param region: AWS region name to connect to :type region: str :type account_role: str :param external_id: (optional) the `External ID <http://docs.aws.amazon. com/IAM/latest/UserGuide/id_roles_create_for-user_externalid.html>`_ string to use when assuming a role via STS. :type external_id: str """ self.conn = None self.have_ta = True self.account_id = account_id self.account_role = account_role self.region = 'us-east-1' self.ta_region = region self.external_id = external_id
[docs] def connect(self): """Connect to API if not already connected; set self.conn.""" if self.conn is not None: return if self.ta_region: logger.debug("Connecting to Support API (TrustedAdvisor) in %s", self.region) self.conn = self.connect_via(boto.support.connect_to_region) else: logger.debug("Connecting to Support API (TrustedAdvisor)") self.conn = boto.connect_support() logger.debug("Connected to Support API")
[docs] def update_limits(self, services): """ Poll 'Service Limits' check results from Trusted Advisor, if possible. Iterate over all :py:class:`~.AwsLimit` objects for the given services and update their limits from TA if present in TA checks. :param services: dict of service name (string) to :py:class:`~._AwsService` objects :type services: dict """ self.connect() ta_results = self._poll() self._update_services(ta_results, services)
[docs] def _poll(self): """ Poll Trusted Advisor (Support) API for limit checks. Return a dict of service name (string) keys to nested dict vals, where each key is a limit name and each value the current numeric limit. e.g.: :: { 'EC2': { 'SomeLimit': 10, } } """ logger.info("Beginning TrustedAdvisor poll") tmp = self._get_limit_check_id() if not self.have_ta: logger.info('TrustedAdvisor.have_ta is False; not polling TA') return {} if tmp is None: logger.critical("Unable to find 'Service Limits' Trusted Advisor " "check; not using Trusted Advisor data.") return check_id, metadata = tmp region = self.ta_region or self.conn.region.name checks = boto_query_wrapper( self.conn.describe_trusted_advisor_check_result, check_id ) check_datetime = parser.parse(checks['result']['timestamp']) logger.debug("Got TrustedAdvisor data for check %s as of %s", check_id, check_datetime) res = {} for check in checks['result']['flaggedResources']: if check['region'] != region: continue data = dict(zip(metadata, check['metadata'])) if data['Service'] not in res: res[data['Service']] = {} res[data['Service']][data['Limit Name']] = int(data['Limit Amount']) logger.info("Finished TrustedAdvisor poll") return res
[docs] def _get_limit_check_id(self): """ Query currently-available TA checks, return the check ID and metadata of the 'performance/Service Limits' check. :returns: 2-tuple of Service Limits TA check ID (string), metadata (list), or (None, None). :rtype: tuple """ logger.debug("Querying Trusted Advisor checks") try: checks = boto_query_wrapper( self.conn.describe_trusted_advisor_checks, 'en' )['checks'] except boto.exception.JSONResponseError as ex: if ( '__type' in ex.body and ex.body['__type'] == 'SubscriptionRequiredException' ): logger.warning( "Cannot check TrustedAdvisor: %s", ex.message ) self.have_ta = False return (None, None) else: raise ex for check in checks: if ( check['category'] == 'performance' and check['name'] == 'Service Limits' ): logger.debug("Found TA check; id=%s", check['id']) return ( check['id'], check['metadata'] ) logger.debug("Unable to find check with category 'performance' and " "name 'Service Limits'.") return (None, None)
[docs] def _update_services(self, ta_results, services): """ Given a dict of TrustedAdvisor check results from :py:meth:`~._poll` and a dict of Service objects passed in to :py:meth:`~.update_limits`, updated the TrustedAdvisor limits for all services. :param ta_results: results returned by :py:meth:`~._poll` :type ta_results: dict :param services: dict of service names to _AwsService objects :type services: dict """ logger.debug("Updating TA limits on all services") for svc_name in sorted(ta_results.keys()): limits = ta_results[svc_name] if svc_name not in services: logger.info("TrustedAdvisor returned check results for " "unknown service '%s'", svc_name) continue service = services[svc_name] for lim_name in sorted(limits.keys()): try: # @TODO - if we have ANY MORE special cases, we need a # better way of handling this - maybe with a mapping if svc_name == 'VPC' and lim_name == 'VPC Elastic IP ' \ 'addresses (EIPs)': services['EC2']._set_ta_limit( lim_name, limits[lim_name] ) else: service._set_ta_limit(lim_name, limits[lim_name]) except ValueError: logger.info("TrustedAdvisor returned check results for " "unknown limit '%s' (service %s)", lim_name, svc_name) logger.info("Done updating TA limits on all services")