from datetime import datetime import boto3 from error import error # 定数 LOG_LEVEL = {'i': 'Info', 'e': 'Error'} DIRECTORY_TARGET = '/target/' DIRECTORY_WORK = '/work/' DIRECTORY_DONE = '/done/' DIRECTORY_WARNING = '/warning/' # クラス変数 s3_client = boto3.client('s3') s3_resource = boto3.resource('s3') def end(bucket_name, target_data_source, target_file_name, warning_info, log_info): """終了処理 Args: bucket_name : バケット名 target_data_source : 投入データのディレクトリ名よりデータソースに該当する部分 target_file_name : 投入データのファイル名 warning_info : Warning情報 log_info : ログに記載するデータソース名とファイル名 """ # ① 終了処理開始ログを出力する print(f'{str(datetime.now())} {log_info} {LOG_LEVEL["i"]} I-END-01 - 終了処理を開始します') try: # ② 投入データファイルをS3バケット内のworkディレクトリから、以下ファイル名でdoneディレクトリに移動(コピー削除)する work_key = target_data_source + DIRECTORY_WORK + target_file_name work_obj = s3_resource.Object(bucket_name, work_key) work_response = work_obj.get() work_body = work_response["Body"].read() done_file_name = str(datetime.now()) + '_' + target_file_name done_key = target_data_source + DIRECTORY_DONE + done_file_name done_obj = s3_resource.Object(bucket_name, done_key) done_obj.put(Body=work_body) s3_client.delete_object(Bucket=bucket_name, Key=work_key) print(f'{str(datetime.now())} {log_info} {LOG_LEVEL["i"]} I-END-02 - workディレクトリの {target_file_name} をdoneディレクトリに移動しました 移動後ファイル名:{done_file_name}') # ③ S3バケット内のtargetディレクトリに存在する「投入データファイル名.doing」ファイルを削除する doing_file_name = target_file_name + '.doing' doing_key = target_data_source + DIRECTORY_TARGET + doing_file_name s3_client.delete_object(Bucket=bucket_name, Key=doing_key) print(f'{str(datetime.now())} {log_info} {LOG_LEVEL["i"]} I-END-03 - targetディレクトリの {doing_file_name} を削除しました') # ④ Warning情報が存在するか確認する print(f'{str(datetime.now())} {log_info} {LOG_LEVEL["i"]} I-END-04 - Warning情報の存在チェック') if warning_info: print(f'{str(datetime.now())} {log_info} {LOG_LEVEL["i"]} I-END-05 - Warning情報は存在しました') # warningファイルの作成 warning_file_name = str(datetime.now()) + '_' + target_file_name + '_war.log' warning_key = target_data_source + DIRECTORY_WARNING + warning_file_name warning_obj = s3_resource.Object(bucket_name, warning_key) warning_obj.put(Body=warning_info) print(f'{str(datetime.now())} {log_info} {LOG_LEVEL["i"]} I-END-06 - warningディレクトリに {warning_file_name} を作成しました') # warning処理結果ファイルの作成 result_warning_file_name = target_file_name + '.warning' result_warning_key = target_data_source + DIRECTORY_TARGET + result_warning_file_name result_warning_obj = s3_resource.Object(bucket_name, result_warning_key) result_warning_obj.put(Body='') print(f'{str(datetime.now())} {log_info} {LOG_LEVEL["i"]} I-END-07 - targetディレクトリに {result_warning_file_name} を作成しました') else: # done処理結果ファイルの作成 print(f'{str(datetime.now())} {log_info} {LOG_LEVEL["i"]} I-END-08 - Warning情報は存在しませんでした') result_done_file_name = target_file_name + '.done' result_done_key = target_data_source + DIRECTORY_TARGET + result_done_file_name result_done_obj = s3_resource.Object(bucket_name, result_done_key) result_done_obj.put(Body='') print(f'{str(datetime.now())} {log_info} {LOG_LEVEL["i"]} I-END-09 - targetディレクトリに {result_done_file_name} を作成しました') except Exception as e: print(f'{str(datetime.now())} {log_info} {LOG_LEVEL["e"]} E-END-99 - エラー内容:{e}') error(bucket_name, target_data_source, target_file_name, log_info) # ⑤ 終了処理終了ログを出力する print(f'{str(datetime.now())} {log_info} {LOG_LEVEL["i"]} I-END-10 - 終了処理を終了します')