"""
awslimitchecker/services/rds.py
The latest version of this package is available at:
<https://github.com/jantman/awslimitchecker>
################################################################################
Copyright 2015 Jason Antman <jason@jasonantman.com> <http://www.jasonantman.com>
This file is part of awslimitchecker, also known as awslimitchecker.
awslimitchecker is free software: you can redistribute it and/or modify
it under the terms of the GNU Affero General Public License as published by
the Free Software Foundation, either version 3 of the License, or
(at your option) any later version.
awslimitchecker is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU Affero General Public License for more details.
You should have received a copy of the GNU Affero General Public License
along with awslimitchecker. If not, see <http://www.gnu.org/licenses/>.
The Copyright and Authors attributions contained herein may not be removed or
otherwise altered, except to add the Author attribution of a contributor to
this work. (Additional Terms pursuant to Section 7b of the AGPL v3)
################################################################################
While not legally required, I sincerely request that anyone who finds
bugs please submit them at <https://github.com/jantman/pydnstest> or
to me via email, and that you send any contributions or improvements
either as a pull request on GitHub, or to me via email.
################################################################################
AUTHORS:
Jason Antman <jason@jasonantman.com> <http://www.jasonantman.com>
################################################################################
"""
import abc # noqa
import boto
import boto.rds2
import logging
from .base import _AwsService
from ..limit import AwsLimit
from ..utils import boto_query_wrapper
logger = logging.getLogger(__name__)
[docs]class _RDSService(_AwsService):
service_name = 'RDS'
[docs] def connect(self):
"""Connect to API if not already connected; set self.conn."""
if self.conn is not None:
return
elif self.region:
self.conn = self.connect_via(boto.rds2.connect_to_region)
else:
self.conn = boto.connect_rds2()
[docs] def find_usage(self):
"""
Determine the current usage for each limit of this service,
and update corresponding Limit via
:py:meth:`~.AwsLimit._add_current_usage`.
"""
logger.debug("Checking usage for service %s", self.service_name)
self.connect()
for lim in self.limits.values():
lim._reset_usage()
self._find_usage_instances()
self._find_usage_snapshots()
self._find_usage_param_groups()
self._find_usage_subnet_groups()
self._find_usage_option_groups()
self._find_usage_event_subscriptions()
self._find_usage_security_groups()
self._find_usage_reserved_instances()
self._have_usage = True
logger.debug("Done checking usage.")
[docs] def _find_usage_instances(self):
"""find usage for DB Instances and related limits"""
# instance count
instances = boto_query_wrapper(
self.conn.describe_db_instances,
alc_marker_path=[
'DescribeDBInstancesResponse',
'DescribeDBInstancesResult',
'Marker'
],
alc_data_path=[
'DescribeDBInstancesResponse',
'DescribeDBInstancesResult',
'DBInstances'
],
alc_marker_param='marker'
)
instances = instances[
'DescribeDBInstancesResponse'][
'DescribeDBInstancesResult']['DBInstances']
self.limits['DB instances']._add_current_usage(
len(instances),
aws_type='AWS::RDS::DBInstance'
)
# per-instance limits
allocated_gb = 0
for i in instances:
allocated_gb += i['AllocatedStorage']
self.limits['Read replicas per master']._add_current_usage(
len(i['ReadReplicaDBInstanceIdentifiers']),
aws_type='AWS::RDS::DBInstance',
resource_id=i['DBInstanceIdentifier']
)
# overall storage quota
self.limits['Storage quota (GB)']._add_current_usage(
allocated_gb,
aws_type='AWS::RDS::DBInstance'
)
[docs] def _find_usage_reserved_instances(self):
"""find usage for reserved instances"""
reserved = boto_query_wrapper(
self.conn.describe_reserved_db_instances,
alc_marker_path=[
'DescribeReservedDBInstancesResponse',
'DescribeReservedDBInstancesResult',
"Marker"
],
alc_data_path=[
'DescribeReservedDBInstancesResponse',
'DescribeReservedDBInstancesResult',
'ReservedDBInstances'
],
alc_marker_param='marker'
)[
'DescribeReservedDBInstancesResponse'][
'DescribeReservedDBInstancesResult'][
'ReservedDBInstances']
self.limits['Reserved Instances']._add_current_usage(
len(reserved),
aws_type='AWS::RDS::DBInstance'
)
[docs] def _find_usage_snapshots(self):
"""find usage for (manual) DB snapshots"""
snaps = boto_query_wrapper(
self.conn.describe_db_snapshots,
alc_marker_path=[
"DescribeDBSnapshotsResponse",
"DescribeDBSnapshotsResult",
'Marker'
],
alc_data_path=[
"DescribeDBSnapshotsResponse",
"DescribeDBSnapshotsResult",
"DBSnapshots"
],
alc_marker_param='marker'
)
snaps = snaps[
"DescribeDBSnapshotsResponse"]["DescribeDBSnapshotsResult"][
"DBSnapshots"]
num_manual_snaps = 0
for snap in snaps:
if snap['SnapshotType'] == 'manual':
num_manual_snaps += 1
self.limits['DB snapshots per user']._add_current_usage(
num_manual_snaps,
aws_type='AWS::RDS::DBSnapshot'
)
[docs] def _find_usage_param_groups(self):
"""find usage for parameter groups"""
params = boto_query_wrapper(
self.conn.describe_db_parameter_groups,
alc_marker_path=[
"DescribeDBParameterGroupsResponse",
"DescribeDBParameterGroupsResult",
'Marker'
],
alc_data_path=[
"DescribeDBParameterGroupsResponse",
"DescribeDBParameterGroupsResult",
"DBParameterGroups"
],
alc_marker_param='marker'
)
params = params[
"DescribeDBParameterGroupsResponse"][
"DescribeDBParameterGroupsResult"][
"DBParameterGroups"]
self.limits['DB parameter groups']._add_current_usage(
len(params),
aws_type='AWS::RDS::DBParameterGroup'
)
[docs] def _find_usage_subnet_groups(self):
"""find usage for subnet groups"""
groups = boto_query_wrapper(
self.conn.describe_db_subnet_groups,
alc_marker_path=[
"DescribeDBSubnetGroupsResponse",
"DescribeDBSubnetGroupsResult",
"Marker"
],
alc_data_path=[
"DescribeDBSubnetGroupsResponse",
"DescribeDBSubnetGroupsResult",
"DBSubnetGroups"
],
alc_marker_param='marker'
)[
"DescribeDBSubnetGroupsResponse"][
"DescribeDBSubnetGroupsResult"][
"DBSubnetGroups"]
count = 0
for group in groups:
count += 1
self.limits['Subnets per Subnet Group']._add_current_usage(
len(group['Subnets']),
aws_type='AWS::RDS::DBSubnetGroup',
resource_id=group["DBSubnetGroupName"],
)
self.limits['Subnet Groups']._add_current_usage(
count,
aws_type='AWS::RDS::DBSubnetGroup',
)
[docs] def _find_usage_option_groups(self):
"""find usage for option groups"""
groups = boto_query_wrapper(
self.conn.describe_option_groups,
alc_marker_path=[
"DescribeOptionGroupsResponse",
"DescribeOptionGroupsResult",
"Marker"
],
alc_data_path=[
"DescribeOptionGroupsResponse",
"DescribeOptionGroupsResult",
"OptionGroupsList"
],
alc_marker_param='marker'
)[
"DescribeOptionGroupsResponse"][
"DescribeOptionGroupsResult"]["OptionGroupsList"]
self.limits['Option Groups']._add_current_usage(
len(groups),
aws_type='AWS::RDS::DBOptionGroup',
)
[docs] def _find_usage_event_subscriptions(self):
"""find usage for event subscriptions"""
subs = boto_query_wrapper(
self.conn.describe_event_subscriptions,
alc_marker_path=[
"DescribeEventSubscriptionsResponse",
"DescribeEventSubscriptionsResult",
"Marker"
],
alc_data_path=[
"DescribeEventSubscriptionsResponse",
"DescribeEventSubscriptionsResult",
"EventSubscriptionsList"
],
alc_marker_param='marker'
)[
"DescribeEventSubscriptionsResponse"][
"DescribeEventSubscriptionsResult"][
"EventSubscriptionsList"]
self.limits['Event Subscriptions']._add_current_usage(
len(subs),
aws_type='AWS::RDS::EventSubscription',
)
[docs] def _find_usage_security_groups(self):
"""find usage for security groups"""
groups = boto_query_wrapper(
self.conn.describe_db_security_groups,
alc_marker_path=[
"DescribeDBSecurityGroupsResponse",
"DescribeDBSecurityGroupsResult",
"Marker"
],
alc_data_path=[
"DescribeDBSecurityGroupsResponse",
"DescribeDBSecurityGroupsResult",
"DBSecurityGroups"
],
alc_marker_param='marker'
)[
"DescribeDBSecurityGroupsResponse"][
"DescribeDBSecurityGroupsResult"][
"DBSecurityGroups"]
vpc_count = 0
classic_count = 0
for group in groups:
if group['VpcId'] is None:
classic_count += 1
else:
vpc_count += 1
self.limits['Max auths per security group']._add_current_usage(
len(group["EC2SecurityGroups"]) + len(group["IPRanges"]),
aws_type='AWS::RDS::DBSecurityGroup',
resource_id=group['DBSecurityGroupName']
)
self.limits['DB security groups']._add_current_usage(
classic_count,
aws_type='AWS::RDS::DBSecurityGroup',
)
self.limits['VPC Security Groups']._add_current_usage(
vpc_count,
aws_type='AWS::RDS::DBSecurityGroup',
)
[docs] def get_limits(self):
"""
Return all known limits for this service, as a dict of their names
to :py:class:`~.AwsLimit` objects.
:returns: dict of limit names to :py:class:`~.AwsLimit` objects
:rtype: dict
"""
if self.limits != {}:
return self.limits
limits = {}
limits['DB instances'] = AwsLimit(
'DB instances',
self,
40,
self.warning_threshold,
self.critical_threshold,
limit_type='AWS::RDS::DBInstance',
)
limits['Reserved Instances'] = AwsLimit(
'Reserved Instances',
self,
40,
self.warning_threshold,
self.critical_threshold,
limit_type='AWS::RDS::DBInstance',
)
limits['Storage quota (GB)'] = AwsLimit(
'Storage quota (GB)',
self,
100000,
self.warning_threshold,
self.critical_threshold,
limit_type='AWS::RDS::DBInstance',
)
limits['DB snapshots per user'] = AwsLimit(
'DB snapshots per user',
self,
50,
self.warning_threshold,
self.critical_threshold,
limit_type='AWS::RDS::DBSnapshot',
)
limits['DB parameter groups'] = AwsLimit(
'DB parameter groups',
self,
50,
self.warning_threshold,
self.critical_threshold,
limit_type='AWS::RDS::DBParameterGroup',
)
limits['DB security groups'] = AwsLimit(
'DB security groups',
self,
25,
self.warning_threshold,
self.critical_threshold,
limit_type='AWS::RDS::DBSecurityGroup',
)
limits['VPC Security Groups'] = AwsLimit(
'VPC Security Groups',
self,
5,
self.warning_threshold,
self.critical_threshold,
limit_type='AWS::RDS::DBSecurityGroup',
)
limits['Subnet Groups'] = AwsLimit(
'Subnet Groups',
self,
20,
self.warning_threshold,
self.critical_threshold,
limit_type='AWS::RDS::DBSubnetGroup',
)
limits['Subnets per Subnet Group'] = AwsLimit(
'Subnets per Subnet Group',
self,
20,
self.warning_threshold,
self.critical_threshold,
limit_type='AWS::RDS::DBSubnetGroup',
)
limits['Option Groups'] = AwsLimit(
'Option Groups',
self,
20,
self.warning_threshold,
self.critical_threshold,
limit_type='AWS::RDS::DBOptionGroup',
)
limits['Event Subscriptions'] = AwsLimit(
'Event Subscriptions',
self,
20,
self.warning_threshold,
self.critical_threshold,
limit_type='AWS::RDS::DBEventSubscription',
)
limits['Read replicas per master'] = AwsLimit(
'Read replicas per master',
self,
5,
self.warning_threshold,
self.critical_threshold,
limit_type='AWS::RDS::DBInstance',
)
# this is the number of rules per security group
limits['Max auths per security group'] = AwsLimit(
'Max auths per security group',
self,
20,
self.warning_threshold,
self.critical_threshold,
limit_type='AWS::RDS::DBSecurityGroup',
limit_subtype='AWS::RDS::DBSecurityGroupIngress',
)
self.limits = limits
return limits
[docs] def required_iam_permissions(self):
"""
Return a list of IAM Actions required for this Service to function
properly. All Actions will be shown with an Effect of "Allow"
and a Resource of "*".
:returns: list of IAM Action strings
:rtype: list
"""
return [
"rds:DescribeDBInstances",
"rds:DescribeDBParameterGroups",
"rds:DescribeDBSecurityGroups",
"rds:DescribeDBSnapshots",
"rds:DescribeDBSubnetGroups",
"rds:DescribeEventSubscriptions",
"rds:DescribeOptionGroups",
"rds:DescribeReservedDBInstances",
]