import datetime import logging import os from zoneinfo import ZoneInfo import boto3 import pyzipper from pyzipper.zipfile import BadZipFile # 環境変数 DATA_IMPORT_BUCKET = os.environ["DATA_IMPORT_BUCKET"] HCP_WEB_TARGET_FOLDER = os.environ["HCP_WEB_TARGET_FOLDER"] HCP_WEB_BACKUP_BUCKET = os.environ["HCP_WEB_BACKUP_BUCKET"] BACKUP_ZIPFILE_FOLDER = os.environ["BACKUP_ZIPFILE_FOLDER"] BACKUP_DATA_IMPORT_FOLDER = os.environ["BACKUP_DATA_IMPORT_FOLDER"] DATA_IMPORT_FILENAME = os.environ["DATA_IMPORT_FILENAME"] MEDPASS_ZIP_PASSWORD_PARAMETER_STORE_KEY = os.environ["MEDPASS_ZIP_PASSWORD_PARAMETER_STORE_KEY"] LOG_LEVEL = os.environ["LOG_LEVEL"] TZ = os.environ["TZ"] # 定数 # 多重起動抑制用のコントロールファイルの拡張子 EXCLUSIVE_CONTROL_FILE_EXT = '.doing' # tmpフォルダパス PATH_TMP = '/tmp' # 拡張子 ZIP_FILE_EXT = 'zip' CSV_FILE_EXT = 'csv' # S3クライアント s3_client = boto3.client('s3') # SystemsManagerクライアント ssm_client = boto3.client('ssm') # logger設定 logger = logging.getLogger() def log_datetime_convert_tz(*arg): """ログに出力するタイムスタンプのロケールを変更する(JST指定)""" return datetime.datetime.now(ZoneInfo(TZ)).timetuple() formatter = logging.Formatter( '[%(levelname)s]\t%(asctime)s\t%(message)s\n', '%Y-%m-%d %H:%M:%S' ) formatter.converter = log_datetime_convert_tz for handler in logger.handlers: handler.setFormatter(formatter) level = logging.getLevelName(LOG_LEVEL) if not isinstance(level, int): level = logging.INFO logger.setLevel(level) def extract_zip_with_password(zip_filepath: str, extract_to_folder: str, password: str) -> os.path: """ 暗号化ZIPを解凍する。 :param zip_filepath: ZIPファイルが保管されているフォルダパス :param extract_to_folder: ZIPファイルの解凍先フォルダ :param password: ZIPパスワード :return 解凍されたファイルパス """ # ZIPを解凍 try: with pyzipper.AESZipFile(zip_filepath) as z: # ZIP内のファイルは1つのみ inner_filename = z.filelist[0].filename z.extractall(path=extract_to_folder, pwd=password.encode()) except Exception as e: raise e return os.path.join(extract_to_folder, inner_filename) def get_s3_event_parameter(event: dict) -> tuple[str, str, str, str]: s3_event = event["Records"][0]["s3"] event_bucket_name: str = s3_event["bucket"]["name"] event_object_key: str = s3_event["object"]["key"] event_file_name: str = os.path.basename(event_object_key) event_folder_name: str = os.path.dirname(event_object_key).split('/')[0] return event_bucket_name, event_object_key, event_file_name, event_folder_name def get_ssm_params(parameter_key: str, with_decryption: bool = True) -> str: """SSMパラメータストアから指定されたパラメータ名の値を取得する""" response = ssm_client.get_parameter( Name=parameter_key, WithDecryption=with_decryption) parameter_value: str = response['Parameter']['Value'] return parameter_value def delete_doing_file(event: dict) -> None: """.doingファイルをバケット上から削除する""" # イベント情報を取得 ( event_bucket_name, event_object_key, _, _ ) = get_s3_event_parameter(event) # ⑨ メモリに保持したバケット名/フォルダ名内の「受信データファイル名.doing」ファイルを削除する s3_client.delete_object( Bucket=event_bucket_name, Key=f'{event_object_key}{EXCLUSIVE_CONTROL_FILE_EXT}') def handler(event, context) -> None: try: # ① 処理開始ログを出力する logger.info('I-01-01 処理開始 medパスデータ解凍・復号化・転送処理') # ② 処理開始時に受け取ったイベント情報をログに出力する # バケット名・フォルダ名・受信データファイル名をメモリに保持 ( event_bucket_name, event_object_key, event_file_name, event_folder_name ) = get_s3_event_parameter(event) logger.info(f'I-02-01 受信バケット:{event_bucket_name}') logger.info(f'I-02-01 フォルダ名:{event_folder_name}') logger.info(f'I-02-01 ファイル名:{event_file_name}') # ③ S3イベントによるLambdaの重複発火防止の為、メモリに保持したバケット名/フォルダ名内に、「受信データファイル名.doing」ファイルが存在するかチェックする try: s3_client.head_object( Bucket=event_bucket_name, Key=f'{event_object_key}{EXCLUSIVE_CONTROL_FILE_EXT}') logger.error( f'E-01-01 {event_bucket_name}/{event_object_key}は現在処理中です。処理を終了します。') return except Exception: # .doingファイルが見つからなかった場合は、処理を続行する # メモリに保持したバケット名/フォルダ名内に、「受信データファイル名.doing」ファイルを作成する logger.info('I-03-01 medパスデータの解凍・復号化・転送を開始します') s3_client.put_object( Bucket=event_bucket_name, Key=f'{event_object_key}{EXCLUSIVE_CONTROL_FILE_EXT}', Body=b'') # ④ S3から暗号化ZIPファイルを読み込む try: logger.info( f'I-04-01 暗号化ZIPファイル読込 読込元:{event_bucket_name}/{event_object_key}') s3_client.download_file( event_bucket_name, event_object_key, os.path.join(PATH_TMP, event_file_name)) logger.info('I-04-02 暗号化ZIPファイルをダウンロードしました') except Exception as e: logger.exception(f'E-04-01 暗号化ZIPファイルのダウンロードに失敗しました エラー内容:{e}') delete_doing_file(event) return # ⑤ ZIP解凍パスワードをSSM パラメータストアから取得する try: logger.info( f'I-05-01 ZIP解凍パスワードを読み込み 読込元:{MEDPASS_ZIP_PASSWORD_PARAMETER_STORE_KEY}') zip_password = get_ssm_params( MEDPASS_ZIP_PASSWORD_PARAMETER_STORE_KEY) except Exception as e: logger.exception(f'E-05-01 ZIP解凍パスワードの読み込みに失敗しました エラー内容:{e}') delete_doing_file(event) return # ⑥ ZIPファイルを解凍してローカルに保存 try: logger.info(f'I-05-02 ZIP解凍開始') extracted_zip_file_path = extract_zip_with_password( os.path.join(PATH_TMP, event_file_name), PATH_TMP, zip_password) except RuntimeError as e: if 'password' in str(e).lower(): # パスワードが間違っている場合のエラー logger.exception( f'E-05-02 ZIPのパスワードが不正のため、解凍に失敗しました エラー内容:{e}') delete_doing_file(event) return else: # 想定外のエラー raise e # ZIPファイルが壊れている場合のエラー except BadZipFile as e: logger.exception(f'E-05-03 ZIPの形式が不正のため、解凍に失敗しました エラー内容:{e}') delete_doing_file(event) return # データ登録用にファイルをリネーム # ZIPファイル名がyyyymmdd.zipのため、年月日部分をデータ登録用ファイル名の末尾につけ、拡張子をCSVに変更 data_import_file_name = f'{DATA_IMPORT_FILENAME}_{event_file_name.replace(ZIP_FILE_EXT, CSV_FILE_EXT)}' logger.info(f'I-05-03 ZIP解凍成功') # ⑥ 受信した暗号化ZIPファイルと解凍後のファイルをバックアップする backup_copy_source = { 'Bucket': event_bucket_name, 'Key': event_object_key} execute_date_yyyymmdd = datetime.date.today().strftime('%Y/%m/%d') # ZIPファイルのバックアップ s3_client.copy_object( Bucket=HCP_WEB_BACKUP_BUCKET, Key=f'{BACKUP_ZIPFILE_FOLDER}/{execute_date_yyyymmdd}/{event_file_name}', CopySource=backup_copy_source ) logger.info( f'I-06-01 medパス受信データのバックアップ完了:{HCP_WEB_BACKUP_BUCKET}/{BACKUP_ZIPFILE_FOLDER}/{execute_date_yyyymmdd}/{event_file_name}') # 解凍後ファイルのバックアップ s3_client.upload_file( extracted_zip_file_path, Bucket=HCP_WEB_BACKUP_BUCKET, Key=f'{BACKUP_DATA_IMPORT_FOLDER}/{execute_date_yyyymmdd}/{data_import_file_name}' ) logger.info( f'I-06-02 medパス解凍後データのバックアップ完了:{HCP_WEB_BACKUP_BUCKET}/{BACKUP_DATA_IMPORT_FOLDER}/{execute_date_yyyymmdd}/{data_import_file_name}') # ⑦ 解凍後のファイルをデータ登録バケットに転送する data_import_copy_source = {'Bucket': HCP_WEB_BACKUP_BUCKET, 'Key': f'{BACKUP_DATA_IMPORT_FOLDER}/{execute_date_yyyymmdd}/{data_import_file_name}'} s3_client.copy_object( Bucket=DATA_IMPORT_BUCKET, Key=f'{HCP_WEB_TARGET_FOLDER}/{data_import_file_name}', CopySource=data_import_copy_source ) # アップロード後、元のバケットからは削除する s3_client.delete_object(Bucket=event_bucket_name, Key=event_object_key) logger.info( f'I-07-01 medパス解凍後データの転送完了:{DATA_IMPORT_BUCKET}/{HCP_WEB_TARGET_FOLDER}/{data_import_file_name}') # ⑧ メモリに保持したバケット名/フォルダ名内の「受信データファイル名.doing」ファイルを削除する delete_doing_file(event) logger.info('I-08-01 処理終了 medパスデータ解凍・復号化・転送処理') except Exception as e: logger.exception(f'想定外のエラーが発生しました。処理を終了します。 例外内容:{e}') delete_doing_file(event) raise e