2022-07-12 15:55:59 +09:00

290 lines
12 KiB
Python
Raw Blame History

This file contains invisible Unicode characters

This file contains invisible Unicode characters that are indistinguishable to humans but may be processed differently by a computer. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

"""
Viewセキュリティオプション付与チェック用Lambda関数のエントリーポイント
"""
import botocore
from aws.s3 import ConfigBucket
from aws.sns import SNSNotifier
from aws.ssm import SSMParameterStore
from constants import (CHECK_TARGET_SCHEMAS,
INFORMATION_SCHEMA_SECURITY_TYPE_INVOKER, MAIL_INDENT,
RESPONSE_CODE_NO_SUCH_KEY,
RESPONSE_CODE_PARAMETER_NOT_FOUND, RESPONSE_ERROR,
RESPONSE_ERROR_CODE)
from database import Database
from dto.no_security_option_view import NoSecurityOptionView
from environments import (CONFIG_BUCKET_NAME, MBJ_NOTICE_TOPIC,
NDS_NOTICE_TOPIC, NOTICE_MAIL_BODY_TEMPLATE_PATH,
NOTICE_MAIL_TITLE_TEMPLATE_PATH)
from exceptions import (DatabaseConnectionException, FileNotFoundException,
JSONParseException, MeDaCaException,
ParameterNotFoundException, QueryExecutionException,
SNSPublishException)
from json_perser import JSONParser
from medaca_logger import MeDaCaLogger
logger = MeDaCaLogger.get_logger()
def handler(event, context):
try:
# ① 処理開始ログを出力する
logger.info('I-01-01', '処理開始 Viewセキュリティオプション付与チェック')
# ② 設定ファイル[チェック対象スキーマ名ファイル]を読み込む
logger.info('I-02-01', 'チェック対象スキーマ名ファイルを読み込み 開始')
check_target_schemas = read_check_target_schemas()
logger.info('I-02-02', f'チェック対象スキーマ名ファイルを読み込み 終了 チェック対象スキーマ名:{check_target_schemas}')
# ③ データベースに接続する
logger.info('I-03-01', 'データベースへの接続開始 開始')
# DB接続のためのパラメータ取得
db_host, db_user_name, db_user_password = read_db_param_from_parameter_store()
connection = connection_database(db_host, db_user_name, db_user_password)
logger.info('I-03-02', 'データベースへの接続開始 成功')
# ④ Viewのオプションを確認するため、データを取得する
logger.info('I-04-01', 'Viewセキュリティオプション チェック開始')
check_result = fetch_view_security_options(connection, check_target_schemas)
logger.debug('D-04-01', f'取得データ:{check_result}')
if len(check_result) == 0:
logger.info('I-04-02', 'Viewセキュリティオプション 未設定のViewはありません。処理を終了します。')
return
logger.info('I-05-01', 'Viewセキュリティオプション 未設定のViewがあるため、メール送信処理を開始します。')
# ⑤ 取得できたデータをもとに、メール通知する文言を作成する
no_security_option_views = [NoSecurityOptionView(*row) for row in check_result]
logger.info(
'I-05-02', f'通知メール(タイトル)テンプレートファイル読込 読込元:{CONFIG_BUCKET_NAME}/{NOTICE_MAIL_TITLE_TEMPLATE_PATH}')
mail_title = read_mail_title()
logger.info(
'I-05-03', '通知メール(タイトル)テンプレートファイルを読み込みました')
logger.info(
'I-05-04', f'通知メール(本文)テンプレートファイル読込 読込元:{CONFIG_BUCKET_NAME}/{NOTICE_MAIL_BODY_TEMPLATE_PATH}')
mail_body_template = read_mail_body_template()
logger.info(
'I-05-05', '通知メール(本文)テンプレートファイルを読み込みました')
mail_body = make_notice_mail_body(no_security_option_views, mail_body_template)
logger.info('I-05-06', f'メール送信指示をします 送信先トピック:{MBJ_NOTICE_TOPIC}')
notice_to_mbj(mail_title, mail_body)
logger.info('I-05-07', 'メール送信指示をしました')
except MeDaCaException as e:
logger.exception(e.error_id, e)
logger.error('E-ERR-01', f'処理異常通知の送信指示をしました 通知先トピック:{NDS_NOTICE_TOPIC}')
notice_to_nds(e.error_id, e)
raise e
except Exception as e:
logger.exception('E-99', f'想定外のエラーが発生しました エラー内容:{e}')
logger.error('E-ERR-01', f'処理異常通知の送信指示をしました 通知先トピック:{NDS_NOTICE_TOPIC}')
notice_to_nds('E-99', e)
raise e
finally:
# ⑥ 処理終了ログを出力する
logger.info('I-06-01', '処理終了 Viewセキュリティオプション付与チェック')
def read_check_target_schemas() -> list:
"""設定ファイル[チェック対象スキーマ名ファイル]を読み込む
Raises:
FileNotFoundException: ファイルが読み込めなかったエラー
JSONParseException: JSONを辞書オブジェクトに変換できなかったエラー
Exception: 想定外のエラー
Returns:
list: チェック対象のスキーマ名のリスト
"""
try:
config_bucket = ConfigBucket()
check_target_schema_names = config_bucket.check_target_schema_names
except botocore.exceptions.ClientError as e:
if e.response[RESPONSE_ERROR][RESPONSE_ERROR_CODE] == RESPONSE_CODE_NO_SUCH_KEY:
raise FileNotFoundException('E-02-01', f'チェック対象スキーマ名ファイルの読み込みに失敗しました エラー内容:{e}')
else:
raise Exception(e)
try:
json_parser = JSONParser(check_target_schema_names)
check_target_schemas = json_parser.parse()[CHECK_TARGET_SCHEMAS]
except Exception as e:
raise JSONParseException('E-02-01', f'チェック対象スキーマ名ファイルの読み込みに失敗しました エラー内容:{e}')
return check_target_schemas
def read_db_param_from_parameter_store() -> tuple:
"""パラメータストアからDB接続情報を取得する
Raises:
ParameterNotFoundException: 指定されたパラメータが存在しないエラー
Exception: 想定外のエラー
Returns:
tuple: DB接続情報
"""
try:
parameter_store = SSMParameterStore()
db_host = parameter_store.db_host
db_user_name = parameter_store.db_user_name
db_user_password = parameter_store.db_user_password
return db_host, db_user_name, db_user_password
except botocore.exceptions.ClientError as e:
if e.response[RESPONSE_ERROR][RESPONSE_ERROR_CODE] == RESPONSE_CODE_PARAMETER_NOT_FOUND:
raise ParameterNotFoundException('E-03-01', f'パラメータストアの取得に失敗しました エラー内容:{e}')
else:
raise Exception(e)
def connection_database(host: str, user_name: str, password: str) -> Database:
"""データベース接続
Args:
host (str): DBホスト
user_name (str): DBユーザー名
password (str): DBパスワード
Raises:
DatabaseConnectionException: データベースへの接続に失敗したエラー
Returns:
Database: データベース操作クラス
"""
try:
database = Database(host, user_name, password)
database.connect()
return database
except Exception as e:
raise DatabaseConnectionException('E-03-02', f'データベースへの接続に失敗しました エラー内容:{e}')
def fetch_view_security_options(connection: Database, check_target_schemas: list) -> tuple:
"""SECURITY INVOKERのついていないViewの一覧を取得する
Args:
connection (Database): 接続済みDB操作クラス
check_target_schemas (str): チェック対象のスキーマ一覧
Raises:
QueryExecutionException: クエリ実行エラー
Returns:
tuple: クエリ実行結果
"""
select_view_security_option_sql = f"""
SELECT
TABLE_SCHEMA,
TABLE_NAME,
DEFINER
FROM
INFORMATION_SCHEMA.VIEWS
WHERE
TABLE_SCHEMA IN (
{','.join([f"'{schema_name}'" for schema_name in check_target_schemas])}
)
AND SECURITY_TYPE <> '{INFORMATION_SCHEMA_SECURITY_TYPE_INVOKER}'
"""
try:
with connection.query(select_view_security_option_sql) as cursor:
result = cursor.fetchall()
return result
except Exception as e:
raise QueryExecutionException('E-04-01', f'Viewセキュリティオプションチェックに失敗しました エラー内容{e}')
def make_notice_mail_body(no_security_option_views: list[NoSecurityOptionView], mail_body_template: str) -> tuple[str]:
"""メール本文を生成します
Args:
view_security_options (list[NoSecurityOptionView]): チェック対象のビュー一覧
mail_body_template (str): メール本文のテンプレート
Returns:
tuple[str]: メール本文
"""
mail_message = MAIL_INDENT.join(
[f'{option.schema_name}.{option.table_name}' for option in no_security_option_views])
mail_body = mail_body_template.format(no_option_views=mail_message)
return mail_body
def read_mail_title() -> str:
"""メールタイトルを読み込む
Raises:
FileNotFoundException: ファイルが読み込めなかったエラー
Exception: 想定外のエラー
Returns:
str: メールタイトル
"""
try:
config_bucket = ConfigBucket()
mail_title = config_bucket.notice_mail_title_template
except botocore.exceptions.ClientError as e:
if e.response[RESPONSE_ERROR][RESPONSE_ERROR_CODE] == RESPONSE_CODE_NO_SUCH_KEY:
raise FileNotFoundException('E-05-01', f'通知メール(タイトル)テンプレートファイルの読み込みに失敗しました エラー内容:{e}')
else:
raise Exception(e)
return mail_title
def read_mail_body_template() -> str:
"""メール本文を読み込む
Raises:
FileNotFoundException: ファイルが読み込めなかったエラー
Exception: 想定外のエラー
Returns:
str: メール本文
"""
try:
config_bucket = ConfigBucket()
mail_body_template = config_bucket.notice_mail_body_template
except botocore.exceptions.ClientError as e:
if e.response[RESPONSE_ERROR][RESPONSE_ERROR_CODE] == RESPONSE_CODE_NO_SUCH_KEY:
raise FileNotFoundException('E-05-02', f'通知メール(本文)テンプレートファイルの読み込みに失敗しました エラー内容:{e}')
else:
raise Exception(e)
return mail_body_template
def notice_to_mbj(mail_title: str, mail_body: str) -> None:
"""MBJへ通知を行います
Args:
mail_title (str): メールタイトル
mail_body (str): メール本文
Raises:
SNSPublishException: SNSでの通知失敗した場合のエラー
"""
try:
notifier = SNSNotifier()
notifier.publish_to_mbj(mail_title, mail_body)
except Exception as e:
raise SNSPublishException('E-98', f'通知の送信指示に失敗しました エラー内容:{e}')
def notice_to_nds(error_id: str, error_message: str) -> None:
"""NDSに処理以上通知を行う
Args:
error_id (str): エラーID
error_message (str): エラーメッセージ
Raises:
SNSPublishException: SNSでの通知失敗した場合のエラー
"""
try:
notifier = SNSNotifier()
notifier.publish_to_nds(error_id, error_message)
except Exception as e:
raise SNSPublishException('E-98', f'通知の送信指示に失敗しました エラー内容:{e}')
# ローカル実行用
if __name__ == '__main__':
handler({}, {})