205 lines
8.7 KiB
Python
205 lines
8.7 KiB
Python
import datetime
|
||
import logging
|
||
import os
|
||
from zoneinfo import ZoneInfo
|
||
|
||
import boto3
|
||
import pyzipper
|
||
from pyzipper.zipfile import BadZipFile
|
||
|
||
# 環境変数
|
||
# mbj-newdwh2021-staging-data
|
||
DATA_IMPORT_BUCKET = os.environ["DATA_IMPORT_BUCKET"]
|
||
# mbj-newdwh2021-staging-backup-medpass
|
||
MEDPASS_BACKUP_BUCKET = os.environ["MEDPASS_BACKUP_BUCKET"]
|
||
MEDPASS_TARGET_FOLDER = os.environ["MEDPASS_TARGET_FOLDER"] # medpass/target
|
||
# /staging/s3/medpassZipExtractPassword
|
||
MEDPASS_ZIP_PASSWORD_PARAMETER_STORE_KEY = os.environ["MEDPASS_ZIP_PASSWORD_PARAMETER_STORE_KEY"]
|
||
LOG_LEVEL = os.environ["LOG_LEVEL"]
|
||
TZ = os.environ["TZ"]
|
||
|
||
# 定数
|
||
# 多重起動抑制用のコントロールファイルの拡張子
|
||
EXCLUSIVE_CONTROL_FILE_EXT = '.doing'
|
||
# tmpフォルダパス
|
||
PATH_TMP = '/tmp'
|
||
# 拡張子
|
||
ZIP_FILE_EXT = 'zip'
|
||
CSV_FILE_EXT = 'csv'
|
||
|
||
# S3クライアント
|
||
s3_client = boto3.client('s3')
|
||
# SystemsManagerクライアント
|
||
ssm_client = boto3.client('ssm')
|
||
|
||
# logger設定
|
||
logger = logging.getLogger()
|
||
|
||
|
||
def log_datetime_convert_tz(*arg):
|
||
"""ログに出力するタイムスタンプのロケールを変更する(JST指定)"""
|
||
return datetime.datetime.now(ZoneInfo(TZ)).timetuple()
|
||
|
||
|
||
formatter = logging.Formatter(
|
||
'[%(levelname)s]\t%(asctime)s\t%(message)s\n',
|
||
'%Y-%m-%d %H:%M:%S'
|
||
)
|
||
formatter.converter = log_datetime_convert_tz
|
||
for handler in logger.handlers:
|
||
handler.setFormatter(formatter)
|
||
|
||
level = logging.getLevelName(LOG_LEVEL)
|
||
if not isinstance(level, int):
|
||
level = logging.INFO
|
||
logger.setLevel(level)
|
||
|
||
|
||
def extract_zip_with_password(zip_filepath: str, extract_to_folder: str, password: str):
|
||
"""
|
||
暗号化ZIPを解凍する。
|
||
|
||
:param zip_filepath: ZIPファイルが保管されているフォルダパス
|
||
:param extract_to_folder: ZIPファイルの解凍先フォルダ
|
||
:param password: ZIPパスワード
|
||
"""
|
||
# ZIPを解凍
|
||
try:
|
||
with pyzipper.AESZipFile(zip_filepath) as z:
|
||
# ZIP内のファイルは1つのみ
|
||
inner_filename = z.filelist[0].filename
|
||
z.extractall(path=extract_to_folder, pwd=password.encode())
|
||
except BadZipFile as e:
|
||
pass
|
||
except Exception as e:
|
||
pass
|
||
|
||
logger.info(f'解凍先フォルダ: {os.listdir("/tmp")}')
|
||
logger.info(f'解凍ファイルパス: {os.path.join(extract_to_folder, inner_filename)}')
|
||
return os.path.join(extract_to_folder, inner_filename)
|
||
|
||
|
||
def get_s3_event_parameter(event: dict):
|
||
s3_event = event["Records"][0]["s3"]
|
||
event_bucket_name = s3_event["bucket"]["name"]
|
||
event_object_key = s3_event["object"]["key"]
|
||
event_file_name = os.path.basename(event_object_key)
|
||
event_folder_name = os.path.dirname(event_object_key).split('/')[0]
|
||
|
||
return event_bucket_name, event_object_key, event_file_name, event_folder_name
|
||
|
||
|
||
def get_ssm_params(parameter_key: str, with_decryption: bool = True):
|
||
response = ssm_client.get_parameter(
|
||
Name=parameter_key, WithDecryption=with_decryption)
|
||
parameter_value = response['Parameter']['Value']
|
||
return parameter_value
|
||
|
||
|
||
def delete_doing_file(event_bucket_name: str, event_object_key: str):
|
||
# ⑨ メモリに保持したバケット名/フォルダ名内の「受信データファイル名.doing」ファイルを削除する
|
||
s3_client.delete_object(
|
||
Bucket=event_bucket_name, Key=f'{event_object_key}{EXCLUSIVE_CONTROL_FILE_EXT}')
|
||
|
||
|
||
def handler(event, context):
|
||
try:
|
||
# ① 処理開始ログを出力する
|
||
logger.info('I-01-01 処理開始 medパスデータ解凍・復号化・転送処理')
|
||
|
||
# ② 処理開始時に受け取ったイベント情報をログに出力する
|
||
# バケット名・フォルダ名・受信データファイル名をメモリに保持
|
||
(
|
||
event_bucket_name,
|
||
event_object_key,
|
||
event_file_name,
|
||
event_folder_name
|
||
) = get_s3_event_parameter(event)
|
||
logger.info(f'I-02-01 受信バケット:{event_bucket_name}')
|
||
logger.info(f'I-02-01 フォルダ名:{event_folder_name}')
|
||
logger.info(f'I-02-01 ファイル名:{event_file_name}')
|
||
# ③ S3イベントによるLambdaの重複発火防止の為、メモリに保持したバケット名/フォルダ名内に、「受信データファイル名.doing」ファイルが存在するかチェックする
|
||
try:
|
||
s3_client.head_object(
|
||
Bucket=event_bucket_name, Key=f'{event_object_key}{EXCLUSIVE_CONTROL_FILE_EXT}')
|
||
logger.error(
|
||
f'E-01-01 {event_bucket_name}/{event_object_key}は現在処理中です。処理を終了します。')
|
||
return
|
||
except Exception:
|
||
# .doingファイルが見つからなかった場合は、処理を続行する
|
||
# メモリに保持したバケット名/フォルダ名内に、「受信データファイル名.doing」ファイルを作成する
|
||
logger.info('I-03-01 medパスデータの解凍・復号化・転送を開始します')
|
||
s3_client.put_object(
|
||
Bucket=event_bucket_name, Key=f'{event_object_key}{EXCLUSIVE_CONTROL_FILE_EXT}', Body=b'')
|
||
|
||
# ④ S3から暗号化ZIPファイルを読み込む
|
||
try:
|
||
logger.info(
|
||
f'I-04-01 暗号化ZIPファイル読込 読込元:{event_bucket_name}/{event_object_key}')
|
||
s3_client.download_file(
|
||
event_bucket_name, event_object_key, os.path.join(PATH_TMP, event_file_name))
|
||
logger.info('I-04-02 暗号化ZIPファイルをダウンロードしました')
|
||
except Exception as e:
|
||
logger.exception(f'E-04-01 暗号化ZIPファイルのダウンロードに失敗しました エラー内容:{e}')
|
||
raise e
|
||
|
||
# ⑤ ZIP解凍パスワードをSSM パラメータストアから取得する
|
||
try:
|
||
logger.info(
|
||
f'I-05-01 ZIP解凍パスワードを読み込み 読込元:{MEDPASS_ZIP_PASSWORD_PARAMETER_STORE_KEY}')
|
||
zip_password = get_ssm_params(
|
||
MEDPASS_ZIP_PASSWORD_PARAMETER_STORE_KEY)
|
||
except Exception as e:
|
||
logger.exception(f'E-05-01 ZIP解凍パスワードの読み込みに失敗しました エラー内容:{e}')
|
||
raise e
|
||
|
||
# ⑥ ZIPファイルを解凍してローカルに保存
|
||
try:
|
||
logger.info(f'I-06-01 ZIP解凍開始')
|
||
extracted_zip_file_path = extract_zip_with_password(
|
||
os.path.join(PATH_TMP, event_file_name), PATH_TMP, zip_password)
|
||
except Exception as e:
|
||
logger.exception(f'E-06-01 ZIPの解凍に失敗しました エラー内容:{e}')
|
||
raise e
|
||
logger.info(f'I-06-02 ZIP解凍成功')
|
||
# ⑦ バックアップS3バケットにコピー
|
||
copy_source = {'Bucket': event_bucket_name, 'Key': event_object_key}
|
||
execute_date_yyyymm = datetime.date.today().strftime('%Y/%m/%d')
|
||
s3_client.copy_object(
|
||
Bucket=MEDPASS_BACKUP_BUCKET,
|
||
Key=f'{execute_date_yyyymm}/{event_file_name}',
|
||
CopySource=copy_source
|
||
)
|
||
logger.info(
|
||
f'I-07-01 medパス受信データのバックアップ完了:{MEDPASS_BACKUP_BUCKET}/{execute_date_yyyymm}/{event_file_name}')
|
||
|
||
# ⑧ データ登録S3バケットにアップロード
|
||
csv_file_name = extracted_zip_file_path.split('/')[-1]
|
||
logger.info(csv_file_name)
|
||
s3_client.upload_file(
|
||
extracted_zip_file_path,
|
||
Bucket=DATA_IMPORT_BUCKET,
|
||
Key=f'{MEDPASS_TARGET_FOLDER}/{csv_file_name}'
|
||
)
|
||
|
||
# アップロード後、元のバケットからは削除する
|
||
s3_client.delete_object(Bucket=event_bucket_name, Key=event_object_key)
|
||
logger.info(
|
||
f'I-08-01 Encise受信データの転送完了:{DATA_IMPORT_BUCKET}/{MEDPASS_TARGET_FOLDER}/{csv_file_name}')
|
||
|
||
# ⑨ メモリに保持したバケット名/フォルダ名内の「受信データファイル名.doing」ファイルを削除する
|
||
delete_doing_file(event_bucket_name, event_object_key)
|
||
|
||
logger.info('I-08-09 処理終了 Medpass受信データ転送処理')
|
||
|
||
except Exception as e:
|
||
logger.exception(f'想定外のエラーが発生しました。処理を終了します。 例外内容:{e}')
|
||
(
|
||
event_bucket_name,
|
||
event_object_key,
|
||
_,
|
||
_
|
||
) = get_s3_event_parameter(event)
|
||
delete_doing_file(event_bucket_name, event_object_key)
|
||
raise e
|