from datetime import datetime import boto3 import io import csv import re from error import error # 定数 LOG_LEVEL = {"i": 'Info', "e": 'Error'} MAPPING_FILE_NAME = 'configmap.config' DIRECTORY_TARGET = '/target/' DIRECTORY_WORK = '/work/' DIRECTORY_SETTINGS = '/settings/' # クラス変数 s3_client = boto3.client('s3') s3_resource = boto3.resource('s3') def init(bucket_name, target_key, target_data_source, target_file_name, log_info): """初期処理 Args: bucket_name : バケット名 target_key : 投入データのフルパス target_data_source : 投入データのディレクトリ名よりデータソースに該当する部分 target_file_name : 投入データのファイル名 log_info : ログに記載するデータソース名とファイル名 Returns: settings_key : 投入データに該当する個別設定ファイルのフルパス """ # ① 初期処理開始ログを出力する print(f'{str(datetime.now())} {log_info} {LOG_LEVEL["i"]} I-INI-01 - 初期処理を開始します') # ② S3バケット内のtargetディレクトリに「投入データファイル名.doing」ファイルが存在するかチェックする try: doing_file_name = target_file_name + '.doing' doing_key = target_data_source + DIRECTORY_TARGET + doing_file_name print(f'{str(datetime.now())} {log_info} {LOG_LEVEL["i"]} I-INI-02 - doingファイル:{doing_file_name} の存在チェック') s3_client.head_object(Bucket=bucket_name, Key=doing_key) print(f'{str(datetime.now())} {log_info} {LOG_LEVEL["e"]} E-INI-01 - 投入データ {target_file_name} は既に処理中です') error(bucket_name, target_data_source, target_file_name, log_info) except Exception as e: print(f'{str(datetime.now())} {log_info} {LOG_LEVEL["i"]} I-INI-03 - doingファイルは存在しませんでした') try: # ③ S3バケット内のtargetディレクトリに、「投入データファイル名.doing」ファイルを作成する doing_obj = s3_resource.Object(bucket_name, doing_key) doing_obj.put(Body='') print(f'{str(datetime.now())} {log_info} {LOG_LEVEL["i"]} I-INI-04 - targetディレクトリに {doing_file_name} を作成しました') # ④ 投入データファイルをS3バケット内のtargetディレクトリから、workディレクトリに移動(コピー削除)する target_obj = s3_resource.Object(bucket_name, target_key) target_response = target_obj.get() work_key = target_data_source + DIRECTORY_WORK + target_file_name work_body = target_response["Body"].read() work_obj = s3_resource.Object(bucket_name, work_key) work_obj.put(Body=work_body) s3_client.delete_object(Bucket=bucket_name, Key=target_key) print(f'{str(datetime.now())} {log_info} {LOG_LEVEL["i"]} I-INI-05 - 投入データ {target_file_name} をworkディレクトリに移動しました') except Exception as e: print(f'{str(datetime.now())} {log_info} {LOG_LEVEL["e"]} E-INI-99 - エラー内容:{e}') error(bucket_name, target_data_source, target_file_name, log_info) # ⑤ S3バケット内のtargetディレクトリの以下ファイル群を削除(前回分の削除)する try: # doneファイルの存在確認 print(f'{str(datetime.now())} {log_info} {LOG_LEVEL["i"]} I-INI-06 - doneファイル:{target_file_name}.done の存在チェック') result_done_key = target_data_source + DIRECTORY_TARGET + target_file_name + '.done' s3_client.head_object(Bucket=bucket_name, Key=result_done_key) s3_client.delete_object(Bucket=bucket_name, Key=result_done_key) print(f'{str(datetime.now())} {log_info} {LOG_LEVEL["i"]} I-INI-07 - doneファイルが存在したため削除しました') except Exception as e: print(f'{str(datetime.now())} {log_info} {LOG_LEVEL["i"]} I-INI-08 - doneファイルは存在しませんでした') try: # warningファイルの存在確認 print(f'{str(datetime.now())} {log_info} {LOG_LEVEL["i"]} I-INI-09 - warningファイル:{target_file_name}.warning の存在チェック') result_warning_key = target_data_source + DIRECTORY_TARGET + target_file_name + '.warning' s3_client.head_object(Bucket=bucket_name, Key=result_warning_key) s3_client.delete_object(Bucket=bucket_name, Key=result_warning_key) print(f'{str(datetime.now())} {log_info} {LOG_LEVEL["i"]} I-INI-10 - warningファイルが存在したため削除しました') except Exception as e: print(f'{str(datetime.now())} {log_info} {LOG_LEVEL["i"]} I-INI-11 - warningファイルは存在しませんでした') try: # errorファイルの存在確認 print(f'{str(datetime.now())} {log_info} {LOG_LEVEL["i"]} I-INI-12 - errorファイル:{target_file_name}.error の存在チェック') result_error_key = target_data_source + DIRECTORY_TARGET + target_file_name + '.error' s3_client.head_object(Bucket=bucket_name, Key=result_error_key) s3_client.delete_object(Bucket=bucket_name, Key=result_error_key) print(f'{str(datetime.now())} {log_info} {LOG_LEVEL["i"]} I-INI-13 - errorファイルが存在したため削除しました') except Exception as e: print(f'{str(datetime.now())} {log_info} {LOG_LEVEL["i"]} I-INI-14 - errorファイルは存在しませんでした') # ⑥ 個別設定マッピングリストが存在するかチェックする try: print(f'{str(datetime.now())} {log_info} {LOG_LEVEL["i"]} I-INI-15 - 個別設定マッピングリスト:{MAPPING_FILE_NAME} の存在チェック') mapping_key = target_data_source + DIRECTORY_SETTINGS + MAPPING_FILE_NAME s3_client.head_object(Bucket=bucket_name, Key=mapping_key) print(f'{str(datetime.now())} {log_info} {LOG_LEVEL["i"]} I-INI-16 - 個別設定マッピングリストの存在を確認しました') except Exception as e: print(f'{str(datetime.now())} {log_info} {LOG_LEVEL["e"]} E-INI-02 - 個別設定マッピングリストが存在しません') error(bucket_name, target_data_source, target_file_name, log_info) # ⑦ 個別設定ファイルを特定する try: print(f'{str(datetime.now())} {log_info} {LOG_LEVEL["i"]} I-INI-17 - 個別設定ファイルを検索します') mapping_obj = s3_resource.Object(bucket_name, mapping_key) mapping_response = mapping_obj.get() mapping_body = io.TextIOWrapper(io.BytesIO(mapping_response["Body"].read()), encoding='utf-8') settings_file_name = '' for row in csv.reader(mapping_body, delimiter='\t'): match_result = re.fullmatch(row[0], target_file_name) if match_result is not None: settings_file_name = row[1].rstrip() print(f'{str(datetime.now())} {log_info} {LOG_LEVEL["i"]} I-INI-18 - 個別設定ファイル:{settings_file_name} を特定しました') break if not settings_file_name: print(f'{str(datetime.now())} {log_info} {LOG_LEVEL["e"]} E-INI-03 - 個別設定ファイルが特定出来ません') error(bucket_name, target_data_source, target_file_name, log_info) except Exception as e: print(f'{str(datetime.now())} {log_info} {LOG_LEVEL["e"]} E-INI-99 - エラー内容:{e}') error(bucket_name, target_data_source, target_file_name, log_info) # ⑧ ⑦の個別設定ファイルが存在するかチェックする try: print(f'{str(datetime.now())} {log_info} {LOG_LEVEL["i"]} I-INI-19 - 個別設定ファイル:{settings_file_name} の存在チェック') settings_key = target_data_source + DIRECTORY_SETTINGS + settings_file_name s3_client.head_object(Bucket=bucket_name, Key=settings_key) print(f'{str(datetime.now())} {log_info} {LOG_LEVEL["i"]} I-INI-20 - 個別設定ファイルの存在を確認しました') except Exception as e: print(f'{str(datetime.now())} {log_info} {LOG_LEVEL["e"]} E-INI-04 - 個別設定ファイルが存在しません') error(bucket_name, target_data_source, target_file_name, log_info) # ⑨ 初期処理終了ログを出力する print(f'{str(datetime.now())} {log_info} {LOG_LEVEL["i"]} I-INI-21 - 初期処理を終了します') return settings_key