Source code for controltowerlib.resources.resources

#!/usr/bin/env python
# -*- coding: utf-8 -*-
# File: resources.py
#
# Copyright 2020 Costas Tyfoxylos
#
# Permission is hereby granted, free of charge, to any person obtaining a copy
#  of this software and associated documentation files (the "Software"), to
#  deal in the Software without restriction, including without limitation the
#  rights to use, copy, modify, merge, publish, distribute, sublicense, and/or
#  sell copies of the Software, and to permit persons to whom the Software is
#  furnished to do so, subject to the following conditions:
#
# The above copyright notice and this permission notice shall be included in
#  all copies or substantial portions of the Software.
#
# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
#  IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
#  FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
#  AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
#  LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
#  FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
#  DEALINGS IN THE SOFTWARE.
#

"""
resources module.

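Models the resources exposed by Control Tower: the account factory, service
control policies, guard rails, accounts and organizational units.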

.. _Google Python Style Guide:
   http://google.github.io/styleguide/pyguide.html
"""

from time import sleep

from awsauthenticationlib.awsauthenticationlib import LoggerMixin

from controltowerlib.controltowerlibexceptions import (NonExistentSCP,
                                                       ControlTowerBusy,
                                                       NoSuspendedOU)

__author__ = '''Costas Tyfoxylos <ctyfoxylos@schubergphilis.com>'''
__docformat__ = '''google'''
__date__ = '''18-02-2020'''
__copyright__ = '''Copyright 2020, Costas Tyfoxylos'''
__license__ = '''MIT'''
__maintainer__ = '''Costas Tyfoxylos'''
__email__ = '''<ctyfoxylos@schubergphilis.com>'''
__status__ = '''Development'''  # "Prototype", "Development", "Production".


class AccountFactory:  # pylint: disable=too-few-public-methods, too-many-instance-attributes
    """Models the account factory data of service catalog.

    All public attributes are read once from the provided data at construction time; an attribute is None when
    its key is missing from the data.
    """

    def __init__(self, service_catalog_client, data):
        """Initializes the account factory.

        Args:
            service_catalog_client: The service catalog client, kept on the instance for later use.
            data (dict): The account factory data to expose as attributes.

        """
        self._service_catalog = service_catalog_client
        self._data = data
        lookup = data.get
        self.has_default_path = lookup('HasDefaultPath')
        self.id = lookup('Id')  # pylint: disable=invalid-name
        self.name = lookup('Name')
        self.owner = lookup('Owner')
        self.product_id = lookup('ProductId')
        self.short_description = lookup('ShortDescription')
        self.type = lookup('Type')
class ServiceControlPolicy:
    """Models the data of a service control policy.

    Every property reads its value from the wrapped data on access and is None when the key is missing.
    """

    def __init__(self, data):
        """Initializes the policy.

        Args:
            data (dict): The policy data to expose through the properties.

        """
        self._data = data

    def _lookup(self, key):
        """Reads a single entry of the wrapped data, None if it is not there."""
        return self._data.get(key)

    @property
    def arn(self):
        """The 'Arn' entry of the policy data."""
        return self._lookup('Arn')

    @property
    def aws_managed(self):
        """The 'AwsManaged' entry of the policy data."""
        return self._lookup('AwsManaged')

    @property
    def description(self):
        """The 'Description' entry of the policy data."""
        return self._lookup('Description')

    @property
    def id(self):  # pylint: disable=invalid-name
        """The 'Id' entry of the policy data."""
        return self._lookup('Id')

    @property
    def name(self):
        """The 'Name' entry of the policy data."""
        return self._lookup('Name')

    @property
    def type(self):
        """The 'Type' entry of the policy data."""
        return self._lookup('Type')
class GuardRail(LoggerMixin):
    """Models the guard rail data.

    The descriptive properties read from the data handed in at construction; only the compliancy status is
    requested from the api on access.
    """

    def __init__(self, control_tower, data):
        """Initializes the guard rail.

        Args:
            control_tower: The control tower object whose session, url and payload builder are used to query the api.
            data (dict): The data of the guard rail as returned by the api.

        """
        self.control_tower = control_tower
        self._data_ = data

    @property
    def _data(self):
        """The data of the guard rail as returned by the api."""
        return self._data_

    @property
    def behavior(self):
        """Behavior."""
        return self._data_.get('Behavior')

    @property
    def category(self):
        """Category."""
        return self._data_.get('Category')

    @property
    def description(self):
        """Description."""
        return self._data_.get('Description')

    @property
    def display_name(self):
        """DisplayName."""
        return self._data_.get('DisplayName')

    @property
    def name(self):
        """Name."""
        return self._data_.get('Name')

    @property
    def provider(self):
        """Provider."""
        return self._data_.get('Provider')

    @property
    def regional_preference(self):
        """Regional preference."""
        return self._data_.get('RegionalPreference')

    @property
    def type(self):
        """Type."""
        return self._data_.get('Type')

    @property
    def compliancy_status(self):
        """Compliancy status.

        Posts a ``getGuardrailComplianceStatus`` request for this guard rail on every access; nothing is cached.

        Returns:
            The 'ComplianceStatus' entry of the response, None if the request was not successful.

        """
        payload = self.control_tower._get_api_payload(content_string={'GuardrailName': self.name},  # pylint: disable=protected-access
                                                      target='getGuardrailComplianceStatus')
        self.logger.debug('Trying to get the compliancy status with payload "%s"', payload)
        response = self.control_tower.session.post(self.control_tower.url, json=payload)
        if not response.ok:
            # The message used to talk about the landing zone drift, which is a different call altogether.
            self.logger.error('Failed to get the compliancy status of guard rail "%s" with response status "%s" and '
                              'response text "%s"', self.name, response.status_code, response.text)
            return None
        return response.json().get('ComplianceStatus')
class CoreAccount:
    """Models the core landing zone account data.

    Combines the label given at construction with the account data; data backed properties are None when
    their key is missing.
    """

    def __init__(self, control_tower, account_label, data):
        """Initializes the core account.

        Args:
            control_tower: The control tower object, kept on the instance.
            account_label: The label of the account, exposed through ``label``.
            data (dict): The data of the account as returned by the api.

        """
        self.control_tower = control_tower
        self._label = account_label
        self._data_ = data

    @property
    def _data(self):
        """The data of the account as returned by the api."""
        return self._data_

    @property
    def label(self):
        """The label the account was constructed with."""
        return self._label

    @property
    def email(self):
        """The 'AccountEmail' entry of the account data."""
        account_data = self._data_
        return account_data.get('AccountEmail')

    @property
    def id(self):  # pylint: disable=invalid-name
        """The 'AccountId' entry of the account data."""
        account_data = self._data_
        return account_data.get('AccountId')

    @property
    def core_resource_mappings(self):
        """The 'CoreResourceMappings' entry of the account data."""
        account_data = self._data_
        return account_data.get('CoreResourceMappings')

    @property
    def stack_set_arn(self):
        """The 'StackSetARN' entry of the account data."""
        account_data = self._data_
        return account_data.get('StackSetARN')
class ControlTowerAccount(LoggerMixin):  # pylint: disable=too-many-public-methods
    """Models the account data.

    Wraps the account data returned by the control tower api and lazily enriches it with the provisioned
    product and record data looked up through the service catalog client. The lazily retrieved data is cached
    on the instance until ``_refresh`` is called.
    """

    def __init__(self, control_tower, data, info_polling_interval=30):
        """Initializes the account.

        Args:
            control_tower: The control tower object; its ``service_catalog`` and ``organizations`` attributes are
                reused by this object.
            data (dict): The data of the account as returned by the api.
            info_polling_interval (int): Seconds to sleep between checks while ``delete`` waits for control tower.

        """
        self.control_tower = control_tower
        self.service_catalog = control_tower.service_catalog
        self.organizations = control_tower.organizations
        self._data_ = data
        self._service_catalog_data_ = None
        self._record_data_ = None
        self._info_polling_interval = info_polling_interval

    @property
    def _data(self):
        """The data of the account as returned by the api."""
        return self._data_

    @property
    def _service_catalog_data(self):
        """The provisioned product data of the account, an empty dict if service catalog reports no result."""
        if self._service_catalog_data_ is None:
            data = self.service_catalog.search_provisioned_products(Filters={'SearchQuery': [f'physicalId:{self.id}']})
            # The last product is used, without mutating the response and without failing on an empty list.
            products = data.get('ProvisionedProducts') or [{}]
            self._service_catalog_data_ = products[-1] if data.get('TotalResultsCount') else {}
        return self._service_catalog_data_

    @property
    def _record_data(self):
        """The data of the last service catalog record, an empty dict if there is no last record id."""
        if self._record_data_ is None:
            if not self.last_record_id:
                self._record_data_ = {}
            else:
                self._record_data_ = self.service_catalog.describe_record(Id=self.last_record_id)
        return self._record_data_

    @property
    def email(self):
        """Email."""
        return self._data_.get('AccountEmail')

    @property
    def id(self):  # pylint: disable=invalid-name
        """Id."""
        return self._data_.get('AccountId')

    @property
    def name(self):
        """Name."""
        return self._data_.get('AccountName')

    @property
    def arn(self):
        """Arn."""
        return self._data_.get('Arn')

    @property
    def owner(self):
        """Owner."""
        return self._data_.get('Owner')

    @property
    def provision_state(self):
        """Provision state."""
        return self._data_.get('ProvisionState')

    @property
    def status(self):
        """Status."""
        return self._data_.get('Status')

    @property
    def landing_zone_version(self):
        """Landing zone version."""
        return self._data_.get('DeployedLandingZoneVersion')

    @staticmethod
    def _parse_version(version):
        """Converts a dotted version into a tuple of ints so that versions order numerically.

        Trailing zero components are dropped so that "3" and "3.0" compare as equal.

        Args:
            version: The version to parse, anything whose string form is dot separated integers.

        Returns:
            parts (tuple): The integer components of the version.

        Raises:
            ValueError: If a component of the version is not an integer.

        """
        parts = [int(part) for part in str(version).strip().split('.')]
        while parts and parts[-1] == 0:
            parts.pop()
        return tuple(parts)

    @property
    def has_available_update(self):
        """If the account is behind the landing zone version.

        Versions are compared component by component instead of as floats, so "2.10" is newer than "2.9".

        Returns:
            True if the account version is lower than the control tower one, False otherwise or if either
            version is not known.

        """
        deployed = self.landing_zone_version
        latest = self.control_tower.landing_zone_version
        if deployed is None or latest is None:
            self.logger.warning('Unable to compare landing zone versions, account version is "%s" and control tower '
                                'version is "%s"', deployed, latest)
            return False
        return self._parse_version(deployed) < self._parse_version(latest)

    @property
    def guardrail_compliance_status(self):
        """Retrieves the guardrail compliancy status for the account.

        Returns:
            status (str): COMPLIANT|NON COMPLIANT, or False if the request to the api was not successful.

        """
        payload = self.control_tower._get_api_payload(content_string={'AccountId': self.id},  # pylint: disable=protected-access
                                                      target='getGuardrailComplianceStatus')
        response = self.control_tower.session.post(self.control_tower.url, json=payload)
        if not response.ok:
            self.logger.error('Failed to get compliancy status from api.')
            return False
        return response.json().get('ComplianceStatus')

    @property
    def organizational_unit(self):
        """Organizational Unit."""
        return self.control_tower.get_organizational_unit_by_id(self._data_.get('ParentOrganizationalUnitId'))

    @property
    def stack_arn(self):
        """Stack Arn."""
        return self._service_catalog_data.get('Arn')

    @property
    def created_time(self):
        """Created Time."""
        return self._service_catalog_data.get('CreatedTime')

    @property
    def service_catalog_id(self):
        """Service Catalog ID."""
        return self._service_catalog_data.get('Id')

    @property
    def idempotency_token(self):
        """Idempotency Token."""
        return self._service_catalog_data.get('IdempotencyToken')

    @property
    def last_record_id(self):
        """Last Record ID."""
        return self._service_catalog_data.get('LastRecordId')

    @property
    def physical_id(self):
        """Physical ID."""
        return self._service_catalog_data.get('PhysicalId')

    @property
    def service_catalog_product_id(self):
        """Service catalog product ID."""
        return self._service_catalog_data.get('ProductId')

    @property
    def provisioning_artifact_id(self):
        """Provisioning artifact ID."""
        return self._service_catalog_data.get('ProvisioningArtifactId')

    @property
    def service_catalog_tags(self):
        """Service catalog tags."""
        return self._service_catalog_data.get('Tags')

    @property
    def service_catalog_type(self):
        """Service catalog type."""
        return self._service_catalog_data.get('Type')

    @property
    def service_catalog_status(self):
        """Service catalog status."""
        return self._service_catalog_data.get('Status')

    @property
    def service_catalog_user_arn(self):
        """Service catalog user arn."""
        return self._service_catalog_data.get('UserArn')

    @property
    def user_arn_session(self):
        """User arn session."""
        return self._service_catalog_data.get('UserArnSession')

    def _refresh(self):
        """Reloads the account data through control tower and drops the cached service catalog and record data."""
        self._data_ = self.control_tower.get_account_by_id(self.id)._data  # pylint: disable=protected-access
        self._record_data_ = None
        self._service_catalog_data_ = None

    def _get_record_entry(self, output_key):
        """Finds the first record output with the provided output key, an empty dict if there is none."""
        return next((entry for entry in self._record_data.get('RecordOutputs', [])
                     if entry.get('OutputKey', '') == output_key), {})

    @property
    def sso_user_email(self):
        """SSO user email."""
        return self._get_record_entry(output_key='SSOUserEmail').get('OutputValue')

    @property
    def sso_user_portal(self):
        """SSO user portal."""
        return self._get_record_entry(output_key='SSOUserPortal').get('OutputValue')

    def detach_service_control_policy(self, name):
        """Detaches a Service Control Policy from the account.

        Args:
            name (str): The name of the SCP to detach

        Returns:
            result (bool): True on success, False otherwise.

        Raises:
            NonExistentSCP: If no SCP with the provided name can be found.

        """
        return self._action_service_control_policy('detach', name)

    def attach_service_control_policy(self, name):
        """Attaches a Service Control Policy to the account.

        Args:
            name (str): The name of the SCP to attach

        Returns:
            result (bool): True on success, False otherwise.

        Raises:
            NonExistentSCP: If no SCP with the provided name can be found.

        """
        return self._action_service_control_policy('attach', name)

    def _action_service_control_policy(self, action, scp_name):
        """Executes an attach or detach of a Service Control Policy on the account.

        Args:
            action (str): The prefix of the organizations client method to call, "attach" or "detach".
            scp_name (str): The name of the SCP to act upon.

        Returns:
            result (bool): True if the api answered with a 200 status code, False otherwise.

        Raises:
            NonExistentSCP: If no SCP with the provided name can be found.

        """
        scp = self.control_tower.get_service_control_policy_by_name(scp_name)
        if not scp:
            raise NonExistentSCP(scp_name)
        # Resolves to organizations.attach_policy or organizations.detach_policy.
        response = getattr(self.organizations, f'{action}_policy')(PolicyId=scp.id, TargetId=self.id)
        if response.get('ResponseMetadata', {}).get('HTTPStatusCode') != 200:
            self.logger.error('Failed to %s SCP "%s" to account with response "%s"', action, scp.name, response)
            return False
        self.logger.debug('Successfully %sed SCP "%s" to account', action, scp.name)
        return True

    def _terminate(self):
        """Terminates an account that is in error.

        Returns:
            response (dict): The response from the api of the termination request.

        """
        return self.service_catalog.terminate_provisioned_product(ProvisionedProductId=self.service_catalog_id)

    def delete(self, suspended_ou_name=None):
        """Deletes the account by terminating its provisioned product.

        When a suspended OU name is provided the call also waits until control tower is no longer busy, moves
        the account from the root OU to that OU, attaches the SCP carrying the same name as the OU and
        detaches the "FullAWSAccess" SCP. The results of the attach and detach calls are not checked.

        Args:
            suspended_ou_name (str): The name of the OU to move the account to, optional.

        Returns:
            The response of the termination request if no suspended OU name is provided, True otherwise.

        Raises:
            NoSuspendedOU: If a suspended OU name is provided but no such OU can be found.
            NonExistentSCP: If one of the SCPs to attach or detach can not be found.

        """
        if not suspended_ou_name:
            return self._terminate()
        # The OU is validated before terminating so nothing is changed when it does not exist.
        suspended_ou = self.control_tower.get_organizational_unit_by_name(suspended_ou_name)
        if not suspended_ou:
            raise NoSuspendedOU(suspended_ou_name)
        self._terminate()
        while self.control_tower.busy:
            self.logger.debug('Waiting for control tower to terminate the account...')
            sleep(self._info_polling_interval)
        self.logger.debug('Moving account from root OU to %s', suspended_ou_name)
        self.organizations.move_account(AccountId=self.id,
                                        SourceParentId=self.control_tower.root_ou.id,
                                        DestinationParentId=suspended_ou.id)
        self.logger.debug('Attaching SCP %s to account', suspended_ou_name)
        self.attach_service_control_policy(suspended_ou_name)
        self.logger.debug('Detaching full access SCP from account')
        self.detach_service_control_policy('FullAWSAccess')
        return True

    def update(self):
        """Updates the account in service catalog.

        Nothing is requested when the account has no available update.

        Returns:
            True if the call succeeded or no update was needed, False otherwise.

        Raises:
            ControlTowerBusy: If an update is available but control tower is busy.

        """
        if not self.has_available_update:
            return True
        if self.control_tower.busy:
            raise ControlTowerBusy
        arguments = {'ProductId': self.control_tower._account_factory.product_id,  # pylint: disable=protected-access
                     'ProvisionedProductName': self.name,
                     'ProvisioningArtifactId': self.control_tower._active_artifact.get('Id'),  # pylint: disable=protected-access
                     'ProvisioningParameters': [{'Key': 'AccountName',
                                                 'Value': self.name,
                                                 'UsePreviousValue': True},
                                                {'Key': 'AccountEmail',
                                                 'Value': self.email,
                                                 'UsePreviousValue': True},
                                                {'Key': 'SSOUserFirstName',
                                                 'Value': 'Control',
                                                 'UsePreviousValue': True},
                                                {'Key': 'SSOUserLastName',
                                                 'Value': 'Tower',
                                                 'UsePreviousValue': True},
                                                {'Key': 'SSOUserEmail',
                                                 'Value': self.email,
                                                 'UsePreviousValue': True},
                                                {'Key': 'ManagedOrganizationalUnit',
                                                 'Value': self.organizational_unit.name,
                                                 'UsePreviousValue': True}]}
        response = self.service_catalog.update_provisioned_product(**arguments)
        return response.get('ResponseMetadata', {}).get('HTTPStatusCode') == 200
class OrganizationsOU:
    """Model the data of an Organizations managed OU.

    Each property is read from the wrapped data on access and is None when its key is missing.
    """

    def __init__(self, data):
        """Initializes the OU.

        Args:
            data (dict): The OU data to expose through the properties.

        """
        self._data = data

    @property
    def id(self):  # pylint: disable=invalid-name
        """The id of the OU."""
        ou_data = self._data
        return ou_data.get('Id')

    @property
    def name(self):
        """The name of the OU."""
        ou_data = self._data
        return ou_data.get('Name')

    @property
    def arn(self):
        """The arn of the OU."""
        ou_data = self._data
        return ou_data.get('Arn')
class ControlTowerOU:
    """Model the data of a Control Tower managed OU.

    Each property is read from the wrapped data on access and is None when its key is missing.
    """

    def __init__(self, control_tower, data):
        """Initializes the OU.

        Args:
            control_tower: The control tower object that ``delete`` delegates to.
            data (dict): The OU data to expose through the properties.

        """
        self.control_tower = control_tower
        self._data = data

    def _lookup(self, key):
        """Reads a single entry of the wrapped data, None if it is not there."""
        return self._data.get(key)

    @property
    def create_date(self):
        """The date the ou was created in timestamp."""
        return self._lookup('CreateDate')

    @property
    def id(self):  # pylint: disable=invalid-name
        """OU ID."""
        return self._lookup('OrganizationalUnitId')

    @property
    def name(self):
        """The name of the OU."""
        return self._lookup('OrganizationalUnitName')

    @property
    def type(self):
        """The type of the OU."""
        return self._lookup('OrganizationalUnitType')

    @property
    def parent_ou_id(self):
        """The id of the parent OU."""
        return self._lookup('ParentOrganizationalUnitId')

    @property
    def parent_ou_name(self):
        """The name of the parent OU."""
        return self._lookup('ParentOrganizationalUnitName')

    def delete(self):
        """Deletes the ou.

        The deletion is delegated to control tower, identifying the OU by its name.

        Returns:
            response (bool): True on success, False otherwise.

        """
        ou_name = self.name
        return self.control_tower.delete_organizational_unit(ou_name)