""" Viewセキュリティオプション付与チェック用Lambda関数のエントリーポイント """ import json import botocore from aws.s3 import ConfigBucket from aws.sns import SNSNotifier from aws.ssm import SSMParameterStore from constants import (CHECK_TARGET_SCHEMAS, INFORMATION_SCHEMA_SECURITY_TYPE_INVOKER, MAIL_INDENT, RESPONSE_CODE_NO_SUCH_KEY, RESPONSE_CODE_PARAMETER_NOT_FOUND, RESPONSE_ERROR, RESPONSE_ERROR_CODE) from database import Database from dto.no_security_option_view import NoSecurityOptionView from environments import (CONFIG_BUCKET_NAME, MBJ_NOTICE_TOPIC, NDS_NOTICE_TOPIC, NOTICE_MAIL_BODY_TEMPLATE_PATH, NOTICE_MAIL_TITLE_TEMPLATE_PATH) from exceptions import (DatabaseConnectionException, FileNotFoundException, MeDaCaException, ParameterNotFoundException, QueryExecutionException, SNSPublishException) from medaca_logger import MeDaCaLogger logger = MeDaCaLogger.get_logger() def handler(event, context): try: # ① 処理開始ログを出力する logger.info('I-01-01', '処理開始 Viewセキュリティオプション付与チェック') # ② 設定ファイル[チェック対象スキーマ名ファイル]を読み込む logger.info('I-02-01', 'チェック対象スキーマ名ファイルを読み込み 開始') check_target_schemas = read_check_target_schemas() logger.info('I-02-02', f'チェック対象スキーマ名ファイルを読み込み 終了 チェック対象スキーマ名:{check_target_schemas}') # ③ データベースに接続する logger.info('I-03-01', 'データベースへの接続開始 開始') # DB接続のためのパラメータ取得 db_host, db_user_name, db_user_password = read_db_param_from_parameter_store() connection = connection_database(db_host, db_user_name, db_user_password) logger.info('I-03-02', 'データベースへの接続開始 成功') # ④ Viewのオプションを確認するため、データを取得する logger.info('I-04-01', 'Viewセキュリティオプション チェック開始') check_result = fetch_view_security_options(connection, check_target_schemas) if len(check_result) == 0: logger.info('I-04-02', 'Viewセキュリティオプション 未設定のViewはありません。処理を終了します。') return logger.info('I-05-01', 'Viewセキュリティオプション 未設定のViewがあるため、メール送信処理を開始します。') # ⑤ 取得できたデータをもとに、メール通知する文言を作成する no_security_option_views = [NoSecurityOptionView(*row) for row in check_result] logger.info( 'I-05-02', f'通知メール(タイトル)テンプレートファイル読込 読込元:{CONFIG_BUCKET_NAME}/{NOTICE_MAIL_TITLE_TEMPLATE_PATH}') mail_title = read_mail_title() logger.info( 'I-05-03', '通知メール(タイトル)テンプレートファイルを読み込みました') logger.info( 'I-05-04', f'通知メール(本文)テンプレートファイル読込 読込元:{CONFIG_BUCKET_NAME}/{NOTICE_MAIL_BODY_TEMPLATE_PATH}') mail_body_template = read_mail_body_template() logger.info( 'I-05-05', '通知メール(本文)テンプレートファイルを読み込みました') mail_body = make_notice_mail_body(no_security_option_views, mail_body_template) logger.info('I-05-06', f'メール送信指示をします 送信先トピック:{MBJ_NOTICE_TOPIC}') notice_to_mbj(mail_title, mail_body) logger.info('I-05-07', 'メール送信指示をしました') except MeDaCaException as e: logger.exception(e.error_id, e) logger.error('E-ERR-01', f'処理異常通知の送信指示をしました 通知先トピック:{NDS_NOTICE_TOPIC}') notice_to_nds(e.error_id, e) raise e except Exception as e: logger.exception('E-99', f'想定外のエラーが発生しました エラー内容:{e}') logger.error('E-ERR-01', f'処理異常通知の送信指示をしました 通知先トピック:{NDS_NOTICE_TOPIC}') notice_to_nds('E-99', e) raise e finally: # ⑥ 処理終了ログを出力する logger.info('I-06-01', '処理終了 Viewセキュリティオプション付与チェック') def read_check_target_schemas() -> list: """設定ファイル[チェック対象スキーマ名ファイル]を読み込む Raises: FileNotFoundException: ファイルが読み込めなかったエラー Exception: 想定外のエラー Returns: list: チェック対象のスキーマ名のリスト """ try: config_bucket = ConfigBucket() check_target_schema_names = config_bucket.check_target_schema_names return json.loads(check_target_schema_names)[CHECK_TARGET_SCHEMAS] except botocore.exceptions.ClientError as e: if e.response[RESPONSE_ERROR][RESPONSE_ERROR_CODE] == RESPONSE_CODE_NO_SUCH_KEY: raise FileNotFoundException('E-02-01', f'チェック対象スキーマ名ファイルの読み込みに失敗しました エラー内容:{e}') else: raise Exception(e) def read_db_param_from_parameter_store() -> tuple: """パラメータストアからDB接続情報を取得する Raises: ParameterNotFoundException: 指定されたパラメータが存在しないエラー Exception: 想定外のエラー Returns: tuple: DB接続情報 """ try: parameter_store = SSMParameterStore() db_host = parameter_store.db_host db_user_name = parameter_store.db_user_name db_user_password = parameter_store.db_user_password return db_host, db_user_name, db_user_password except botocore.exceptions.ClientError as e: if e.response[RESPONSE_ERROR][RESPONSE_ERROR_CODE] == RESPONSE_CODE_PARAMETER_NOT_FOUND: raise ParameterNotFoundException('E-03-01', f'パラメータストアの取得に失敗しました エラー内容:{e}') else: raise Exception(e) def connection_database(host: str, user_name: str, password: str) -> Database: """データベース接続 Args: host (str): DBホスト user_name (str): DBユーザー名 password (str): DBパスワード Raises: DatabaseConnectionException: データベースへの接続に失敗したエラー Returns: Database: データベース操作クラス """ try: database = Database(host, user_name, password) database.connect() return database except Exception as e: raise DatabaseConnectionException('E-03-02', f'データベースへの接続に失敗しました エラー内容:{e}') def fetch_view_security_options(connection: Database, check_target_schemas: list) -> tuple: """SECURITY INVOKERのついていないViewの一覧を取得する Args: connection (Database): 接続済みDB操作クラス check_target_schemas (str): チェック対象のスキーマ一覧 Raises: QueryExecutionException: クエリ実行エラー Returns: tuple: クエリ実行結果 """ select_view_security_option_sql = f""" SELECT TABLE_SCHEMA, TABLE_NAME FROM INFORMATION_SCHEMA.VIEWS WHERE TABLE_SCHEMA IN ( {','.join([f"'{schema_name}'" for schema_name in check_target_schemas])} ) AND SECURITY_TYPE <> '{INFORMATION_SCHEMA_SECURITY_TYPE_INVOKER}' """ try: with connection.query(select_view_security_option_sql) as cursor: result = cursor.fetchall() return result except Exception as e: raise QueryExecutionException('E-04-01', f'Viewセキュリティオプションチェックに失敗しました エラー内容:{e}') def make_notice_mail_body(no_security_option_views: list[NoSecurityOptionView], mail_body_template: str) -> tuple[str]: """メール本文を生成します Args: view_security_options (list[NoSecurityOptionView]): チェック対象のビュー一覧 mail_body_template (str): メール本文のテンプレート Returns: tuple[str]: メール本文 """ mail_message = MAIL_INDENT.join( [f'{option.schema_name}.{option.table_name}' for option in no_security_option_views]) mail_body = mail_body_template.format(no_option_views=mail_message) return mail_body def read_mail_title() -> str: """メールタイトルを読み込む Raises: FileNotFoundException: ファイルが読み込めなかったエラー Exception: 想定外のエラー Returns: str: メールタイトル """ try: config_bucket = ConfigBucket() mail_title = config_bucket.notice_mail_title_template except botocore.exceptions.ClientError as e: if e.response[RESPONSE_ERROR][RESPONSE_ERROR_CODE] == RESPONSE_CODE_NO_SUCH_KEY: raise FileNotFoundException('E-05-01', f'通知メール(タイトル)テンプレートファイルの読み込みに失敗しました エラー内容:{e}') else: raise Exception(e) return mail_title def read_mail_body_template() -> str: """メール本文を読み込む Raises: FileNotFoundException: ファイルが読み込めなかったエラー Exception: 想定外のエラー Returns: str: メール本文 """ try: config_bucket = ConfigBucket() mail_body_template = config_bucket.notice_mail_body_template except botocore.exceptions.ClientError as e: if e.response[RESPONSE_ERROR][RESPONSE_ERROR_CODE] == RESPONSE_CODE_NO_SUCH_KEY: raise FileNotFoundException('E-05-02', f'通知メール(本文)テンプレートファイルの読み込みに失敗しました エラー内容:{e}') else: raise Exception(e) return mail_body_template def notice_to_mbj(mail_title: str, mail_body: str) -> None: """MBJへ通知を行います Args: mail_title (str): メールタイトル mail_body (str): メール本文 Raises: SNSPublishException: SNSでの通知失敗した場合のエラー """ try: notifier = SNSNotifier() notifier.publish_to_mbj(mail_title, mail_body) except Exception as e: raise SNSPublishException('E-98', f'通知の送信指示に失敗しました エラー内容:{e}') def notice_to_nds(error_id: str, error_message: str) -> None: """NDSに処理以上通知を行う Args: error_id (str): エラーID error_message (str): エラーメッセージ Raises: SNSPublishException: SNSでの通知失敗した場合のエラー """ try: notifier = SNSNotifier() notifier.publish_to_nds(error_id, error_message) except Exception as e: raise SNSPublishException('E-98', f'通知の送信指示に失敗しました エラー内容:{e}') # ローカル実行用 if __name__ == '__main__': handler({}, {})