""" Viewセキュリティオプション付与チェック用Lambda関数のエントリーポイント """ import json import botocore from aws.s3 import ConfigBucket from aws.ssm import SSMParameterStore from constants import (CHECK_TARGET_SCHEMAS, INFORMATION_SCHEMA_SECURITY_TYPE_INVOKER, MAIL_INDENT, RESPONSE_CODE_NO_SUCH_KEY, RESPONSE_CODE_PARAMETER_NOT_FOUND, RESPONSE_ERROR, RESPONSE_ERROR_CODE) from database import Database from dto.view_secutiry_option import ViewSecurityOption from exceptions import (DatabaseConnectionException, FileNotFoundException, MeDaCaException, ParameterNotFoundException, QueryExecutionException) from medaca_logger import MeDaCaLogger logger = MeDaCaLogger.get_logger() def handler(event, context): try: logger.info('I-01-01', '処理開始 Viewセキュリティオプション付与チェック') logger.info('I-02-02', 'チェック対象スキーマ名ファイルを読み込み 開始') check_target_schemas = read_check_target_schemas() logger.info('I-02-02', f'チェック対象スキーマ名ファイルを読み込み 終了 チェック対象スキーマ名:{check_target_schemas}') # print(check_target_schemas) logger.info('I-03-01', 'データベースへの接続開始 開始') # DB接続のためのパラメータ取得 db_host, db_user_name, db_user_password = read_db_param_from_parameter_store() connection = connection_database(db_host, db_user_name, db_user_password) logger.info('I-03-01', 'データベースへの接続開始 終了') logger.info('I-04-01', 'Viewセキュリティオプション チェック開始') check_result = fetch_view_security_options(connection, check_target_schemas) if len(check_result) == 0: logger.info('I-04-02', 'Viewセキュリティオプション 未設定のViewはありません。処理を終了します。') return logger.info('I-04-01', 'Viewセキュリティオプション 未設定のViewがあるため、メール送信処理を開始します。') view_security_options = [ViewSecurityOption(*row) for row in check_result] mail_title, mail_body = make_notice_mail(view_security_options) print(mail_title, mail_body) except MeDaCaException as e: logger.exception(e.error_id, e) raise e except Exception as e: logger.exception('E-99', f'想定外のエラーが発生しました エラー内容:{e}') raise e finally: logger.info('I-06-01', '処理終了 Viewセキュリティオプション付与チェック') def read_check_target_schemas() -> list: """設定ファイル[チェック対象スキーマ名ファイル]を読み込む Raises: FileNotFoundException: ファイルが読み込めなかったエラー Exception: 想定外のエラー Returns: list: チェック対象のスキーマ名のリスト """ try: config_bucket = ConfigBucket() check_target_schema_names = config_bucket.check_target_schema_names return json.loads(check_target_schema_names)[CHECK_TARGET_SCHEMAS] except botocore.exceptions.ClientError as e: if e.response[RESPONSE_ERROR][RESPONSE_ERROR_CODE] == RESPONSE_CODE_NO_SUCH_KEY: raise FileNotFoundException('E-02-01', f'チェック対象スキーマ名ファイルの読み込みに失敗しました エラー内容:{e}') else: raise Exception(e) def read_db_param_from_parameter_store() -> tuple: """パラメータストアからDB接続情報を取得する Raises: ParameterNotFoundException: 指定されたパラメータが存在しないエラー Exception: 想定外のエラー Returns: tuple: DB接続情報 """ try: parameter_store = SSMParameterStore() db_host = parameter_store.db_host db_user_name = parameter_store.db_user_name db_user_password = parameter_store.db_user_password return db_host, db_user_name, db_user_password except botocore.exceptions.ClientError as e: if e.response[RESPONSE_ERROR][RESPONSE_ERROR_CODE] == RESPONSE_CODE_PARAMETER_NOT_FOUND: raise ParameterNotFoundException('E-03-02', f'パラメータストアの取得に失敗しました エラー内容:{e}') else: raise Exception(e) def connection_database(host: str, user_name: str, password: str) -> Database: """データベース接続 Args: host (str): DBホスト user_name (str): DBユーザー名 password (str): DBパスワード Raises: DatabaseConnectionException: データベースへの接続に失敗したエラー Returns: Database: データベース操作クラス """ try: database = Database(host, user_name, password) database.connect() return database except Exception as e: raise DatabaseConnectionException('E-03-02', f'データベースへの接続に失敗しました エラー内容:{e}') def fetch_view_security_options(connection: Database, check_target_schemas: list) -> tuple: """SECURITY INVOKERのついていないViewの一覧を取得する Args: connection (Database): 接続済みDB操作クラス check_target_schemas (str): チェック対象のスキーマ一覧 Raises: QueryExecutionException: クエリ実行エラー Returns: tuple: クエリ実行結果 """ select_view_security_option_sql = f""" SELECT TABLE_SCHEMA, TABLE_NAME FROM INFORMATION_SCHEMA.VIEWS WHERE TABLE_SCHEMA IN ( {','.join([f"'{schema_name}'" for schema_name in check_target_schemas])} ) AND SECURITY_TYPE <> '{INFORMATION_SCHEMA_SECURITY_TYPE_INVOKER}' """ try: with connection.query(select_view_security_option_sql) as cursor: result = cursor.fetchall() return result except Exception as e: raise QueryExecutionException('E-03-02', f'Viewセキュリティオプションチェックに失敗しました エラー内容:{e}') def make_notice_mail(view_security_options: list[ViewSecurityOption]): config_bucket = ConfigBucket() logger.info( 'I-05-02', f'通知メール(タイトル)テンプレートファイル読込 読込元:{config_bucket.bucket_name}/{config_bucket.mail_title_file_path}') mail_title_template = read_mail_title(config_bucket) logger.info( 'I-05-02', f'通知メール(本文)テンプレートファイル読込 読込元:{config_bucket.bucket_name}/{config_bucket.mail_body_file_path}') mail_body_template = read_mail_body_template(config_bucket) mail_message = MAIL_INDENT.join([f'{option.schema_name}.{option.table_name}' for option in view_security_options]) mail_body = mail_body_template.format(no_option_views=mail_message) return mail_title_template, mail_body def read_mail_title(config_bucket: ConfigBucket): """メールタイトルを読み込む Args: config_bucket (ConfigBucket): 設定ファイル保管バケット操作クラス Raises: FileNotFoundException: ファイルが読み込めなかったエラー Exception: 想定外のエラー Returns: str: メールタイトル """ try: mail_title = config_bucket.notice_mail_title_template except botocore.exceptions.ClientError as e: if e.response[RESPONSE_ERROR][RESPONSE_ERROR_CODE] == RESPONSE_CODE_NO_SUCH_KEY: raise FileNotFoundException('E-02-01', f'通知メール(タイトル)テンプレートファイルの読み込みに失敗しました エラー内容:{e}') else: raise Exception(e) return mail_title def read_mail_body_template(config_bucket: ConfigBucket): """メール本文を読み込む Args: config_bucket (ConfigBucket): 設定ファイル保管バケット操作クラス Raises: FileNotFoundException: ファイルが読み込めなかったエラー Exception: 想定外のエラー Returns: str: メール本文 """ try: mail_body_template = config_bucket.notice_mail_body_template except botocore.exceptions.ClientError as e: if e.response[RESPONSE_ERROR][RESPONSE_ERROR_CODE] == RESPONSE_CODE_NO_SUCH_KEY: raise FileNotFoundException('E-02-01', f'通知メール(本文)テンプレートファイルの読み込みに失敗しました エラー内容:{e}') else: raise Exception(e) return mail_body_template # ローカル実行用 if __name__ == '__main__': handler({}, {})