from datetime import datetime import boto3 import sys # 定数 LOG_LEVEL = {'i': 'Info', 'e': 'Error'} DIRECTORY_TARGET = '/target/' DIRECTORY_WORK = '/work/' DIRECTORY_ERROR = '/error/' # クラス変数 s3_client = boto3.client('s3') s3_resource = boto3.resource('s3') def error(bucket_name, target_data_source, target_file_name, log_info): """エラー処理 Args: bucket_name : バケット名 target_data_source : 投入データのディレクトリ名よりデータソースに該当する部分 target_file_name : 投入データのファイル名 log_info : ログに記載するデータソース名とファイル名 """ try: # ① エラー処理開始ログを出力する print(f'{str(datetime.now())} {log_info} {LOG_LEVEL["i"]} I-ERR-01 - エラー処理を開始します') # ② 投入データファイルをS3バケット内のworkディレクトリから、以下ファイル名でerrorディレクトリに移動(コピー削除)する work_key = target_data_source + DIRECTORY_WORK + target_file_name work_obj = s3_resource.Object(bucket_name, work_key) work_response = work_obj.get() work_body = work_response["Body"].read() error_file_name = str(datetime.now()) + '_' + target_file_name error_key = target_data_source + DIRECTORY_ERROR + error_file_name error_obj = s3_resource.Object(bucket_name, error_key) error_obj.put(Body=work_body) s3_client.delete_object(Bucket=bucket_name, Key=work_key) print(f'{str(datetime.now())} {log_info} {LOG_LEVEL["i"]} I-ERR-02 - workディレクトリの {target_file_name} をerrorディレクトリに移動しました 移動後ファイル名:{error_file_name}') # ③ S3バケット内のtargetディレクトリに存在する「投入データファイル名.doing」ファイルを削除する doing_file_name = target_file_name + '.doing' doing_key = target_data_source + DIRECTORY_TARGET + doing_file_name s3_client.delete_object(Bucket=bucket_name, Key=doing_key) print(f'{str(datetime.now())} {log_info} {LOG_LEVEL["i"]} I-ERR-03 - targetディレクトリの {doing_file_name} を削除しました') # ④ S3バケット内のtargetディレクトリに、「投入データファイル名.error」ファイルを作成する result_error_file_name = target_file_name + '.error' result_error_key = target_data_source + DIRECTORY_TARGET + result_error_file_name result_error_obj = s3_resource.Object(bucket_name, result_error_key) result_error_obj.put(Body='') print(f'{str(datetime.now())} {log_info} {LOG_LEVEL["i"]} I-ERR-04 - targetディレクトリに {result_error_file_name} を作成しました') except Exception as e: print(f'{str(datetime.now())} {log_info} {LOG_LEVEL["e"]} E-ERR-99 - エラー内容:{e}') finally: # ⑤ 終了処理終了ログを出力する print(f'{str(datetime.now())} {log_info} {LOG_LEVEL["i"]} I-ERR-05 - エラー処理を終了します') # ⑥ 処理を終了する sys.exit()