diff --git a/ecs/Dockerfile/Dockerfile b/ecs/Dockerfile/Dockerfile index 87ebe333..ec5fb0e0 100644 --- a/ecs/Dockerfile/Dockerfile +++ b/ecs/Dockerfile/Dockerfile @@ -6,12 +6,5 @@ WORKDIR /usr/src/app COPY requirements.txt ./ RUN pip install --no-cache-dir -r requirements.txt COPY dataimport ./ -# COPY controller.py ./ -# COPY ini.py ./ -# COPY chk.py ./ -# COPY main.py ./ -# COPY end.py ./ -# COPY warning.py ./ -# COPY error.py ./ -CMD [ "python", "./dataimport/controller.py" ] \ No newline at end of file +CMD [ "python", "./controller.py" ] diff --git a/ecs/Dockerfile/dataimport/chk.py b/ecs/Dockerfile/dataimport/chk.py index f6f795bb..6c043e76 100644 --- a/ecs/Dockerfile/dataimport/chk.py +++ b/ecs/Dockerfile/dataimport/chk.py @@ -4,8 +4,7 @@ import io import csv from error import error -s3_resource = boto3.resource('s3') - +# 定数 LOG_LEVEL = {'i': 'Info', 'e': 'Error'} SETTINGS_ITEM = { 'dataSource': 0, @@ -21,8 +20,17 @@ SETTINGS_ITEM = { 'loadSchemaName': 10, 'exSqlFileName': 11, } +LINE_FEED_CODE = { + 'CR': '\r', + 'LF': '\n', + 'CRLF': '\r\n', +} + +# クラス変数 +s3_resource = boto3.resource('s3') +# チェック例外クラス class CheckError(Exception): pass @@ -52,18 +60,18 @@ def check(bucket_name, target_key, target_data_source, target_file_name, setting target_obj = s3_resource.Object(bucket_name, target_key) target_response = target_obj.get() - target_data = io.TextIOWrapper(io.BytesIO(target_response["Body"].read()), encoding=settings_list[SETTINGS_ITEM["charCode"]], newline=settings_list[SETTINGS_ITEM["lineFeedCode"]]) + target_data = io.TextIOWrapper(io.BytesIO(target_response["Body"].read()), encoding=settings_list[SETTINGS_ITEM["charCode"]], newline=LINE_FEED_CODE[settings_list[SETTINGS_ITEM["lineFeedCode"]]]) for line in csv.reader(target_data, quotechar=settings_list[SETTINGS_ITEM["quotechar"]], delimiter=settings_list[SETTINGS_ITEM["delimiter"]]): - target_header_list = line.rstrip() + target_header_list = line break # ② C-1 項目数チェック print(f'{str(datetime.now())} {log_info} {LOG_LEVEL["i"]} I-CHK-02 - C-1のチェックを開始します') target_header_list_len = len(target_header_list) - if not target_header_list_len == int(settings_list[SETTINGS_ITEM["csvNumItems"]]): + if target_header_list_len == int(settings_list[SETTINGS_ITEM["csvNumItems"]]): print(f'{str(datetime.now())} {log_info} {LOG_LEVEL["i"]} I-CHK-03 - C-1:正常終了') else: - raise CheckError('E-CHK-01 - 項目数が一致しません') + raise CheckError(f'E-CHK-01 - 項目数が一致しません 個別設定ファイル項目数:{settings_list[SETTINGS_ITEM["csvNumItems"]]} 投入データ項目数:{target_header_list_len}') # ③ C-2 項目並び順チェック if int(settings_list[SETTINGS_ITEM["headerFlag"]]) == True: @@ -71,16 +79,16 @@ def check(bucket_name, target_key, target_data_source, target_file_name, setting settings_header_list = settings_list[SETTINGS_ITEM["csvNameItems"]].rstrip().split(',') for i in range(len(settings_header_list)): if not settings_header_list[i] == target_header_list[i]: - raise CheckError('E-CHK-02 - 項目順序が一致しません') + raise CheckError(f'E-CHK-02 - 項目順序が一致しません {i}番目の項目 個別設定ファイル項目:{settings_header_list[i]} 投入データ項目:{target_header_list[i]}') print(f'{str(datetime.now())} {log_info} {LOG_LEVEL["i"]} I-CHK-05 - C-2:正常終了') except CheckError as e: print(f'{str(datetime.now())} {log_info} {LOG_LEVEL["e"]} {e}') - error(bucket_name, target_data_source, target_file_name) + error(bucket_name, target_data_source, target_file_name, log_info) except Exception as e: print(f'{str(datetime.now())} {log_info} {LOG_LEVEL["e"]} E-CHK-99 - エラー内容:{e}') - error(bucket_name, target_data_source, target_file_name) + error(bucket_name, target_data_source, target_file_name, log_info) # ④ 終了ログの出力 print(f'{str(datetime.now())} {log_info} {LOG_LEVEL["i"]} I-CHK-06 - チェック処理を終了します') diff --git a/ecs/Dockerfile/dataimport/controller.py b/ecs/Dockerfile/dataimport/controller.py index 01bd22cd..e4b0d637 100644 --- a/ecs/Dockerfile/dataimport/controller.py +++ b/ecs/Dockerfile/dataimport/controller.py @@ -1,11 +1,8 @@ import os from datetime import datetime -import boto3 -import sys - - from ini import init from chk import check +from main import main from end import end # 引数 @@ -21,43 +18,36 @@ DB_PASS = os.environ["DB_PASS"] DB_USER = os.environ["DB_USER"] DB_INFO = {"host": DB_HOST, "name": DB_NAME, "pass": DB_PASS, "user": DB_USER} -s3_client = boto3.client('s3') -s3_resource = boto3.resource('s3') - -LOG_LEVEL = {"i": 'Info', "e": 'Error'} +# 定数 +LOG_LEVEL = {"i": 'Info'} LOG_INFO = f' {DATA_SOURCE_NAME} {FILE_NAME} ' -DIRECTORY_TARGET = '/target/' -DIRECTORY_WORK = '/work/' -DIRECTORY_DONE = '/done/' -DIRECTORY_WARNING = '/warning/' -def controller(): - """コントロール処理 +"""コントロール処理 - 各処理を呼び出すコントローラー +各処理を呼び出すコントローラー - """ +""" - # ① 開始ログの出力 - print(f'{str(datetime.now())} {LOG_INFO} {LOG_LEVEL["i"]} I-CTRL-01 - データ取込処理を開始します') - # ② 初期処理 - print(f'{str(datetime.now())} {LOG_INFO} {LOG_LEVEL["i"]} I-CTRL-02 - 初期処理の呼び出し') - settings_key = init(BUCKET_NAME, TARGET_KEY, DATA_SOURCE_NAME, FILE_NAME, LOG_INFO) +# ① 開始ログの出力 +print(f'{str(datetime.now())} {LOG_INFO} {LOG_LEVEL["i"]} I-CTRL-01 - データ取込処理を開始します') - # ③ チェック処理 - print(f'{str(datetime.now())} {LOG_INFO} {LOG_LEVEL["i"]} I-CTRL-03 - チェック処理の呼び出し') - check(BUCKET_NAME, TARGET_KEY, DATA_SOURCE_NAME, FILE_NAME, settings_key, LOG_INFO) +# ② 初期処理 +print(f'{str(datetime.now())} {LOG_INFO} {LOG_LEVEL["i"]} I-CTRL-02 - 初期処理の呼び出し') +settings_key = init(BUCKET_NAME, TARGET_KEY, DATA_SOURCE_NAME, FILE_NAME, LOG_INFO) - # ④ メイン処理 - print(f'{str(datetime.now())} {LOG_INFO} {LOG_LEVEL["i"]} I-CTRL-04 - メイン処理の呼び出し') - warning_info = '' - # warning_info = main(BUCKET_NAME, TARGET_KEY, DATA_SOURCE_NAME, FILE_NAME, settings_key, DB_INFO, LOG_INFO) +# ③ チェック処理 +print(f'{str(datetime.now())} {LOG_INFO} {LOG_LEVEL["i"]} I-CTRL-03 - チェック処理の呼び出し') +check(BUCKET_NAME, TARGET_KEY, DATA_SOURCE_NAME, FILE_NAME, settings_key, LOG_INFO) - # ⑤ 終了処理 - print(f'{str(datetime.now())} {LOG_INFO} {LOG_LEVEL["i"]} I-CTRL-05 - 終了処理の呼び出し') - end(BUCKET_NAME, DATA_SOURCE_NAME, FILE_NAME, warning_info, LOG_INFO) +# ④ メイン処理 +print(f'{str(datetime.now())} {LOG_INFO} {LOG_LEVEL["i"]} I-CTRL-04 - メイン処理の呼び出し') +warning_info = main(BUCKET_NAME, TARGET_KEY, DATA_SOURCE_NAME, FILE_NAME, settings_key, DB_INFO, LOG_INFO) - # ⑥ 終了ログの出力 - print(f'{str(datetime.now())} {LOG_INFO} {LOG_LEVEL["i"]} I-CTRL-06 - データ取込処理を終了します') +# ⑤ 終了処理 +print(f'{str(datetime.now())} {LOG_INFO} {LOG_LEVEL["i"]} I-CTRL-05 - 終了処理の呼び出し') +end(BUCKET_NAME, DATA_SOURCE_NAME, FILE_NAME, warning_info, LOG_INFO) + +# ⑥ 終了ログの出力 +print(f'{str(datetime.now())} {LOG_INFO} {LOG_LEVEL["i"]} I-CTRL-06 - データ取込処理を終了します') diff --git a/ecs/Dockerfile/dataimport/end.py b/ecs/Dockerfile/dataimport/end.py index 907a86cd..06bad863 100644 --- a/ecs/Dockerfile/dataimport/end.py +++ b/ecs/Dockerfile/dataimport/end.py @@ -2,15 +2,17 @@ from datetime import datetime import boto3 from error import error -s3_client = boto3.client('s3') -s3_resource = boto3.resource('s3') - +# 定数 LOG_LEVEL = {'i': 'Info', 'e': 'Error'} DIRECTORY_TARGET = '/target/' DIRECTORY_WORK = '/work/' DIRECTORY_DONE = '/done/' DIRECTORY_WARNING = '/warning/' +# クラス変数 +s3_client = boto3.client('s3') +s3_resource = boto3.resource('s3') + def end(bucket_name, target_data_source, target_file_name, warning_info, log_info): """終了処理 @@ -34,6 +36,7 @@ def end(bucket_name, target_data_source, target_file_name, warning_info, log_inf done_key = target_data_source + DIRECTORY_DONE + done_file_name done_obj = s3_resource.Object(bucket_name, done_key) done_obj.put(Body=work_body) + s3_client.delete_object(Bucket=bucket_name, Key=work_key) print(f'{str(datetime.now())} {log_info} {LOG_LEVEL["i"]} I-END-02 - workディレクトリの {target_file_name} をdoneディレクトリに移動しました 移動後ファイル名:{done_file_name}') # ③ doingファイルの削除 @@ -70,7 +73,7 @@ def end(bucket_name, target_data_source, target_file_name, warning_info, log_inf print(f'{str(datetime.now())} {log_info} {LOG_LEVEL["i"]} I-END-09 - targetディレクトリに {result_done_file_name} を作成しました') except Exception as e: print(f'{str(datetime.now())} {log_info} {LOG_LEVEL["e"]} E-END-99 - エラー内容:{e}') - error(bucket_name, target_data_source, target_file_name) + error(bucket_name, target_data_source, target_file_name, log_info) # ⑤ 終了ログの出力 print(f'{str(datetime.now())} {log_info} {LOG_LEVEL["i"]} I-END-0 - 終了処理を終了します') diff --git a/ecs/Dockerfile/dataimport/error.py b/ecs/Dockerfile/dataimport/error.py index e8de6e4f..d8d17c27 100644 --- a/ecs/Dockerfile/dataimport/error.py +++ b/ecs/Dockerfile/dataimport/error.py @@ -2,13 +2,15 @@ from datetime import datetime import boto3 import sys -s3_client = boto3.client('s3') -s3_resource = boto3.resource('s3') - +# 定数 LOG_LEVEL = {'i': 'Info', 'e': 'Error'} DIRECTORY_TARGET = '/target/' DIRECTORY_WORK = '/work/' -DIRECTORY_DONE = '/done/' +DIRECTORY_ERROR = '/error/' + +# クラス変数 +s3_client = boto3.client('s3') +s3_resource = boto3.resource('s3') def error(bucket_name, target_data_source, target_file_name, log_info): @@ -30,9 +32,10 @@ def error(bucket_name, target_data_source, target_file_name, log_info): work_response = work_obj.get() work_body = work_response["Body"].read() error_file_name = str(datetime.now()) + '_' + target_file_name - error_key = target_data_source + DIRECTORY_DONE + error_file_name + error_key = target_data_source + DIRECTORY_ERROR + error_file_name error_obj = s3_resource.Object(bucket_name, error_key) error_obj.put(Body=work_body) + s3_client.delete_object(Bucket=bucket_name, Key=work_key) print(f'{str(datetime.now())} {log_info} {LOG_LEVEL["i"]} I-ERR-02 - workディレクトリの {target_file_name} をerrorディレクトリに移動しました 移動後ファイル名:{error_file_name}') # ③ doingファイルの削除 diff --git a/ecs/Dockerfile/dataimport/ini.py b/ecs/Dockerfile/dataimport/ini.py index fbddb372..d3720db5 100644 --- a/ecs/Dockerfile/dataimport/ini.py +++ b/ecs/Dockerfile/dataimport/ini.py @@ -5,16 +5,17 @@ import csv import re from error import error -ecs_client = boto3.client('ecs') -s3_client = boto3.client('s3') -s3_resource = boto3.resource('s3') - +# 定数 LOG_LEVEL = {"i": 'Info', "e": 'Error'} MAPPING_FILE_NAME = 'configmap.config' DIRECTORY_TARGET = '/target/' DIRECTORY_WORK = '/work/' DIRECTORY_SETTINGS = '/settings/' +# クラス変数 +s3_client = boto3.client('s3') +s3_resource = boto3.resource('s3') + def init(bucket_name, target_key, target_data_source, target_file_name, log_info): """初期処理 @@ -40,7 +41,7 @@ def init(bucket_name, target_key, target_data_source, target_file_name, log_info try: s3_client.head_object(Bucket=bucket_name, Key=doing_key) print(f'{str(datetime.now())} {log_info} {LOG_LEVEL["e"]} E-INI-01 - 投入データ {target_file_name} は既に処理中です') - error(bucket_name, target_data_source, target_file_name) + error(bucket_name, target_data_source, target_file_name, log_info) except Exception as e: print(f'{str(datetime.now())} {log_info} {LOG_LEVEL["i"]} I-INI-03 - doingファイルは存在しませんでした') @@ -51,7 +52,7 @@ def init(bucket_name, target_key, target_data_source, target_file_name, log_info print(f'{str(datetime.now())} {log_info} {LOG_LEVEL["i"]} I-INI-04 - targetディレクトリに {doing_file_name} を作成しました') except Exception as e: print(f'{str(datetime.now())} {log_info} {LOG_LEVEL["e"]} E-INI-99 - エラー内容:{e}') - error(bucket_name, target_data_source, target_file_name) + error(bucket_name, target_data_source, target_file_name, log_info) # ④ 投入データファイルをworkディレクトリに移動 try: @@ -64,7 +65,7 @@ def init(bucket_name, target_key, target_data_source, target_file_name, log_info print(f'{str(datetime.now())} {log_info} {LOG_LEVEL["i"]} I-INI-05 - 投入データ {target_file_name} をworkディレクトリに移動しました') except Exception as e: print(f'{str(datetime.now())} {log_info} {LOG_LEVEL["e"]} E-INI-99 - エラー内容:{e}') - error(bucket_name, target_data_source, target_file_name) + error(bucket_name, target_data_source, target_file_name, log_info) # ⑤ 処理結果ファイルの削除 # doneファイルの存在確認 @@ -105,7 +106,7 @@ def init(bucket_name, target_key, target_data_source, target_file_name, log_info print(f'{str(datetime.now())} {log_info} {LOG_LEVEL["i"]} I-INI-16 - 個別設定マッピングリストの存在を確認しました') except Exception as e: print(f'{str(datetime.now())} {log_info} {LOG_LEVEL["e"]} E-INI-02 - 個別設定マッピングリストが存在しません') - error(bucket_name, target_data_source, target_file_name) + error(bucket_name, target_data_source, target_file_name, log_info) # ⑦ 個別設定ファイルの特定 print(f'{str(datetime.now())} {log_info} {LOG_LEVEL["i"]} I-INI-17 - 個別設定ファイルを検索します') @@ -113,6 +114,7 @@ def init(bucket_name, target_key, target_data_source, target_file_name, log_info mapping_obj = s3_resource.Object(bucket_name, mapping_key) mapping_response = mapping_obj.get() mapping_body = io.TextIOWrapper(io.BytesIO(mapping_response["Body"].read()), encoding='utf-8') + settings_file_name = '' for row in csv.reader(mapping_body, delimiter='\t'): match_result = re.fullmatch(row[0], target_file_name) if match_result is not None: @@ -120,12 +122,12 @@ def init(bucket_name, target_key, target_data_source, target_file_name, log_info print(f'{str(datetime.now())} {log_info} {LOG_LEVEL["i"]} I-INI-18 - 個別設定ファイル:{settings_file_name} を特定しました') break - if settings_file_name is None: + if not settings_file_name: print(f'{str(datetime.now())} {log_info} {LOG_LEVEL["e"]} E-INI-03 - 個別設定ファイルが特定出来ません') - error(bucket_name, target_data_source, target_file_name) + error(bucket_name, target_data_source, target_file_name, log_info) except Exception as e: print(f'{str(datetime.now())} {log_info} {LOG_LEVEL["e"]} E-INI-99 - エラー内容:{e}') - error(bucket_name, target_data_source, target_file_name) + error(bucket_name, target_data_source, target_file_name, log_info) # ⑧ 個別設定ファイルの存在確認 print(f'{str(datetime.now())} {log_info} {LOG_LEVEL["i"]} I-INI-19 - 個別設定ファイル:{settings_file_name} の存在チェック') @@ -135,7 +137,7 @@ def init(bucket_name, target_key, target_data_source, target_file_name, log_info print(f'{str(datetime.now())} {log_info} {LOG_LEVEL["i"]} I-INI-20 - 個別設定ファイルの存在を確認しました') except Exception as e: print(f'{str(datetime.now())} {log_info} {LOG_LEVEL["e"]} E-INI-04 - 個別設定ファイルが存在しません') - error(bucket_name, target_data_source, target_file_name) + error(bucket_name, target_data_source, target_file_name, log_info) # ⑨ 終了ログの出力 print(f'{str(datetime.now())} {log_info} {LOG_LEVEL["i"]} I-INI-21 - 初期処理を終了します') diff --git a/ecs/Dockerfile/dataimport/main.py b/ecs/Dockerfile/dataimport/main.py index 5b270062..a2551d83 100644 --- a/ecs/Dockerfile/dataimport/main.py +++ b/ecs/Dockerfile/dataimport/main.py @@ -5,10 +5,7 @@ import io import csv from error import error -ecs_client = boto3.client('ecs') -s3_client = boto3.client('s3') -s3_resource = boto3.resource('s3') - +# 定数 LOG_LEVEL = {"i": 'Info', "e": 'Error', "w": 'Warning'} SETTINGS_ITEM = { 'dataSource': 0, @@ -24,10 +21,19 @@ SETTINGS_ITEM = { 'loadSchemaName': 10, 'exSqlFileName': 11, } +LINE_FEED_CODE = { + 'CR': '\r', + 'LF': '\n', + 'CRLF': '\r\n', +} DIRECTORY_SETTINGS = '/settings/' +# クラス変数 +s3_client = boto3.client('s3') +s3_resource = boto3.resource('s3') -def init(bucket_name, target_key, target_data_source, target_file_name, settings_key, db_info, log_info): + +def main(bucket_name, target_key, target_data_source, target_file_name, settings_key, db_info, log_info): """メイン処理 Args: bucket_name : バケット名 @@ -49,7 +55,7 @@ def init(bucket_name, target_key, target_data_source, target_file_name, settings conn = pymysql.connect(host=db_info["host"], user=db_info["user"], passwd=db_info["pass"], db=db_info["name"], connect_timeout=5) except Exception as e: print(f'{str(datetime.now())} {log_info} {LOG_LEVEL["e"]} E-MAIN-99 - エラー内容:{e}') - error(bucket_name, target_data_source, target_file_name) + error(bucket_name, target_data_source, target_file_name, log_info) print(f'{str(datetime.now())} {log_info} {LOG_LEVEL["i"]} I-MAIN-02 - DB接続を開始しました') # ③ 個別設定ファイルのロードスキーマのテーブル名に記載されているテーブルをTRUNCATEする @@ -68,17 +74,17 @@ def init(bucket_name, target_key, target_data_source, target_file_name, settings print(f'{str(datetime.now())} {log_info} {LOG_LEVEL["i"]} I-MAIN-03 - {settings_list[SETTINGS_ITEM["loadSchemaName"]]} をTRUNCATEしました') except Exception as e: print(f'{str(datetime.now())} {log_info} {LOG_LEVEL["e"]} E-MAIN-99 - エラー内容:{e}') - error(bucket_name, target_data_source, target_file_name) + error(bucket_name, target_data_source, target_file_name, log_info) # ④ 投入データファイルを1行ごとにループする print(f'{str(datetime.now())} {log_info} {LOG_LEVEL["i"]} I-MAIN-04 - 投入データ {target_file_name} の読み込みを開始します') try: target_obj = s3_resource.Object(bucket_name, target_key) target_response = target_obj.get() - target_data = io.TextIOWrapper(io.BytesIO(target_response["Body"].read()), encoding=settings_list[SETTINGS_ITEM["charCode"]], newline=settings_list[SETTINGS_ITEM["lineFeedCode"]]) + target_data = io.TextIOWrapper(io.BytesIO(target_response["Body"].read()), encoding=settings_list[SETTINGS_ITEM["charCode"]], newline=LINE_FEED_CODE[settings_list[SETTINGS_ITEM["lineFeedCode"]]]) except Exception as e: print(f'{str(datetime.now())} {log_info} {LOG_LEVEL["e"]} E-MAIN-99 - エラー内容:{e}') - error(bucket_name, target_data_source, target_file_name) + error(bucket_name, target_data_source, target_file_name, log_info) process_count = 0 # 処理件数カウンタ normal_count = 0 # 正常終了件数カウンタ @@ -147,7 +153,7 @@ def init(bucket_name, target_key, target_data_source, target_file_name, settings print(f'{str(datetime.now())} {log_info} {LOG_LEVEL["i"]} I-MAIN-09 - 標準SQL:{settings_list[SETTINGS_ITEM["storageSchemaName"]]} のCOMIIT処理が正常終了しました') except Exception as e: print(f'{str(datetime.now())} {log_info} {LOG_LEVEL["e"]} E-MAIN-99 - エラー内容:{e}') - error(bucket_name, target_data_source, target_file_name) + error(bucket_name, target_data_source, target_file_name, log_info) # ⑦ 個別設定ファイルに拡張SQLファイル名が設定されているかチェック print(f'{str(datetime.now())} {log_info} {LOG_LEVEL["i"]} I-MAIN-10 - 拡張SQL設定が存在するかチェックします')