from datetime import datetime import boto3 import io import csv import re import sys from error import error from common import debug_log # 定数 LOG_LEVEL = {"i": 'Info', "e": 'Error'} MAPPING_FILE_NAME = 'configmap.config' DIRECTORY_TARGET = '/target/' DIRECTORY_WORK = '/work/' DIRECTORY_SETTINGS = '/settings/' # クラス変数 s3_client = boto3.client('s3') s3_resource = boto3.resource('s3') def init(bucket_name, target_key, target_data_source, target_file_name, log_info, mode): """初期処理 Args: bucket_name : バケット名 target_key : 投入データのフルパス target_data_source : 投入データのディレクトリ名よりデータソースに該当する部分 target_file_name : 投入データのファイル名 log_info : ログに記載するデータソース名とファイル名 mode : 処理モード Returns: settings_key : 投入データに該当する個別設定ファイルのフルパス """ try: debug_log(f'引数 bucket_name : {bucket_name}', log_info, mode) debug_log(f'引数 target_key : {target_key}', log_info, mode) debug_log(f'引数 target_data_source : {target_data_source}', log_info, mode) debug_log(f'引数 target_file_name : {target_file_name}', log_info, mode) debug_log(f'引数 log_info : {log_info}', log_info, mode) debug_log(f'引数 mode : {mode}', log_info, mode) # ① 初期処理開始ログを出力する print(f'{datetime.now():%Y-%m-%d %H:%M:%S} {log_info} {LOG_LEVEL["i"]} I-INI-01 - 初期処理を開始します') except Exception as e: print(f'{datetime.now():%Y-%m-%d %H:%M:%S} {log_info} {LOG_LEVEL["e"]} E-INI-99 - エラー内容:{e}') error(bucket_name, target_data_source, target_file_name, log_info) try: # ② S3バケット内のtargetディレクトリに「投入データファイル名.doing」ファイルが存在するかチェックする doing_file_name = target_file_name + '.doing' doing_key = target_data_source + DIRECTORY_TARGET + doing_file_name print(f'{datetime.now():%Y-%m-%d %H:%M:%S} {log_info} {LOG_LEVEL["i"]} I-INI-02 - doingファイル:{doing_file_name} の存在チェック') s3_client.head_object(Bucket=bucket_name, Key=doing_key) print(f'{datetime.now():%Y-%m-%d %H:%M:%S} {log_info} {LOG_LEVEL["e"]} E-INI-01 - 投入データ {target_file_name} は既に処理中です') sys.exit() except Exception as e: print(f'{datetime.now():%Y-%m-%d %H:%M:%S} {log_info} {LOG_LEVEL["i"]} I-INI-03 - doingファイルは存在しませんでした') try: # ③ S3バケット内のtargetディレクトリに、「投入データファイル名.doing」ファイルを作成する doing_obj = s3_resource.Object(bucket_name, doing_key) doing_obj.put(Body='') print(f'{datetime.now():%Y-%m-%d %H:%M:%S} {log_info} {LOG_LEVEL["i"]} I-INI-04 - targetディレクトリに {doing_file_name} を作成しました') # ④ 投入データファイルをS3バケット内のtargetディレクトリから、workディレクトリに移動(コピー削除)する copy_source = { 'Bucket': bucket_name, 'Key': target_key } work_key = target_data_source + DIRECTORY_WORK + target_file_name work_obj = s3_resource.Object(bucket_name, work_key) work_obj.copy(copy_source) s3_client.delete_object(Bucket=bucket_name, Key=target_key) print(f'{datetime.now():%Y-%m-%d %H:%M:%S} {log_info} {LOG_LEVEL["i"]} I-INI-05 - 投入データ {target_file_name} をworkディレクトリに移動しました') except Exception as e: print(f'{datetime.now():%Y-%m-%d %H:%M:%S} {log_info} {LOG_LEVEL["e"]} E-INI-99 - エラー内容:{e}') error(bucket_name, target_data_source, target_file_name, log_info) try: # ⑤ S3バケット内のtargetディレクトリの以下ファイル群を削除(前回分の削除)する # doneファイルの存在確認 print(f'{datetime.now():%Y-%m-%d %H:%M:%S} {log_info} {LOG_LEVEL["i"]} I-INI-06 - doneファイル:{target_file_name}.done の存在チェック') result_done_key = target_data_source + DIRECTORY_TARGET + target_file_name + '.done' s3_client.head_object(Bucket=bucket_name, Key=result_done_key) s3_client.delete_object(Bucket=bucket_name, Key=result_done_key) print(f'{datetime.now():%Y-%m-%d %H:%M:%S} {log_info} {LOG_LEVEL["i"]} I-INI-07 - doneファイルが存在したため削除しました') except Exception as e: print(f'{datetime.now():%Y-%m-%d %H:%M:%S} {log_info} {LOG_LEVEL["i"]} I-INI-08 - doneファイルは存在しませんでした') try: # warningファイルの存在確認 print(f'{datetime.now():%Y-%m-%d %H:%M:%S} {log_info} {LOG_LEVEL["i"]} I-INI-09 - warningファイル:{target_file_name}.warning の存在チェック') result_warning_key = target_data_source + DIRECTORY_TARGET + target_file_name + '.warning' s3_client.head_object(Bucket=bucket_name, Key=result_warning_key) s3_client.delete_object(Bucket=bucket_name, Key=result_warning_key) print(f'{datetime.now():%Y-%m-%d %H:%M:%S} {log_info} {LOG_LEVEL["i"]} I-INI-10 - warningファイルが存在したため削除しました') except Exception as e: print(f'{datetime.now():%Y-%m-%d %H:%M:%S} {log_info} {LOG_LEVEL["i"]} I-INI-11 - warningファイルは存在しませんでした') try: # errorファイルの存在確認 print(f'{datetime.now():%Y-%m-%d %H:%M:%S} {log_info} {LOG_LEVEL["i"]} I-INI-12 - errorファイル:{target_file_name}.error の存在チェック') result_error_key = target_data_source + DIRECTORY_TARGET + target_file_name + '.error' s3_client.head_object(Bucket=bucket_name, Key=result_error_key) s3_client.delete_object(Bucket=bucket_name, Key=result_error_key) print(f'{datetime.now():%Y-%m-%d %H:%M:%S} {log_info} {LOG_LEVEL["i"]} I-INI-13 - errorファイルが存在したため削除しました') except Exception as e: print(f'{datetime.now():%Y-%m-%d %H:%M:%S} {log_info} {LOG_LEVEL["i"]} I-INI-14 - errorファイルは存在しませんでした') try: # ⑥ 個別設定マッピングリストが存在するかチェックする print(f'{datetime.now():%Y-%m-%d %H:%M:%S} {log_info} {LOG_LEVEL["i"]} I-INI-15 - 個別設定マッピングリスト:{MAPPING_FILE_NAME} の存在チェック') mapping_key = target_data_source + DIRECTORY_SETTINGS + MAPPING_FILE_NAME s3_client.head_object(Bucket=bucket_name, Key=mapping_key) print(f'{datetime.now():%Y-%m-%d %H:%M:%S} {log_info} {LOG_LEVEL["i"]} I-INI-16 - 個別設定マッピングリストの存在を確認しました') except Exception as e: print(f'{datetime.now():%Y-%m-%d %H:%M:%S} {log_info} {LOG_LEVEL["e"]} E-INI-02 - 個別設定マッピングリストが存在しません') error(bucket_name, target_data_source, target_file_name, log_info) try: # ⑦ 個別設定ファイルを特定する print(f'{datetime.now():%Y-%m-%d %H:%M:%S} {log_info} {LOG_LEVEL["i"]} I-INI-17 - 個別設定ファイルを検索します') mapping_obj = s3_resource.Object(bucket_name, mapping_key) mapping_response = mapping_obj.get() mapping_body = io.TextIOWrapper(io.BytesIO(mapping_response["Body"].read()), encoding='utf-8') settings_file_name = '' for row in csv.reader(mapping_body, delimiter='\t'): match_result = re.fullmatch(row[0], target_file_name) if match_result is not None: settings_file_name = row[1].rstrip() print(f'{datetime.now():%Y-%m-%d %H:%M:%S} {log_info} {LOG_LEVEL["i"]} I-INI-18 - 個別設定ファイル:{settings_file_name} を特定しました') break if not settings_file_name: print(f'{datetime.now():%Y-%m-%d %H:%M:%S} {log_info} {LOG_LEVEL["e"]} E-INI-03 - 個別設定ファイルが特定出来ません') error(bucket_name, target_data_source, target_file_name, log_info) except Exception as e: print(f'{datetime.now():%Y-%m-%d %H:%M:%S} {log_info} {LOG_LEVEL["e"]} E-INI-99 - エラー内容:{e}') error(bucket_name, target_data_source, target_file_name, log_info) try: # ⑧ ⑦の個別設定ファイルが存在するかチェックする print(f'{datetime.now():%Y-%m-%d %H:%M:%S} {log_info} {LOG_LEVEL["i"]} I-INI-19 - 個別設定ファイル:{settings_file_name} の存在チェック') settings_key = target_data_source + DIRECTORY_SETTINGS + settings_file_name s3_client.head_object(Bucket=bucket_name, Key=settings_key) print(f'{datetime.now():%Y-%m-%d %H:%M:%S} {log_info} {LOG_LEVEL["i"]} I-INI-20 - 個別設定ファイルの存在を確認しました') except Exception as e: print(f'{datetime.now():%Y-%m-%d %H:%M:%S} {log_info} {LOG_LEVEL["e"]} E-INI-04 - 個別設定ファイルが存在しません') error(bucket_name, target_data_source, target_file_name, log_info) try: # ⑨ 初期処理終了ログを出力する print(f'{datetime.now():%Y-%m-%d %H:%M:%S} {log_info} {LOG_LEVEL["i"]} I-INI-21 - 初期処理を終了します') return settings_key except Exception as e: print(f'{datetime.now():%Y-%m-%d %H:%M:%S} {log_info} {LOG_LEVEL["e"]} E-INI-99 - エラー内容:{e}') error(bucket_name, target_data_source, target_file_name, log_info)