From 787cacf4319cbb36809ec9c3e1ef7dd7f99bab56 Mon Sep 17 00:00:00 2001 From: *lcOeIaePm0 Date: Wed, 13 Oct 2021 10:56:56 +0900 Subject: [PATCH 1/7] =?UTF-8?q?feat:DockerFile=E3=82=92=E4=BD=9C=E6=88=90?= =?UTF-8?q?=E3=81=97=E3=81=9F=E3=81=9F=E3=82=81=E3=80=81DockerFile?= =?UTF-8?q?=E8=B3=87=E6=9D=90=E3=82=92=E8=BF=BD=E5=8A=A0=E3=81=97=E3=81=9F?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- ecs/Dockerfile/Dockerfile | 10 ++++++++++ ecs/Dockerfile/main.py | 19 +++++++++++++++++++ ecs/Dockerfile/requirements.txt | 1 + 3 files changed, 30 insertions(+) create mode 100644 ecs/Dockerfile/Dockerfile create mode 100644 ecs/Dockerfile/main.py create mode 100644 ecs/Dockerfile/requirements.txt diff --git a/ecs/Dockerfile/Dockerfile b/ecs/Dockerfile/Dockerfile new file mode 100644 index 00000000..6934fec5 --- /dev/null +++ b/ecs/Dockerfile/Dockerfile @@ -0,0 +1,10 @@ +FROM python:3.9 + +ENV TZ="Asia/Tokyo" + +WORKDIR /usr/src/app +COPY requirements.txt ./ +RUN pip install --no-cache-dir -r requirements.txt +COPY main.py ./ + +CMD [ "python", "./main.py" ] \ No newline at end of file diff --git a/ecs/Dockerfile/main.py b/ecs/Dockerfile/main.py new file mode 100644 index 00000000..5b66d418 --- /dev/null +++ b/ecs/Dockerfile/main.py @@ -0,0 +1,19 @@ +import boto3 +import os +from os import path + +# EVENT_BUCKET = os.environ["EVENT_BUCKET"] +# EVENT_OBJECKT_KEY = os.environ["EVENT_OBJECKT_KEY"] + +# DESTINATION_BUCKET = os.environ["DESTINATION_BUCKET"] +# DESTINATION_OBJECKT_DIR = os.environ["DESTINATION_OBJECKT_DIR"] + +# s3 = boto3.resource("s3") + +# destination_object_key = path.join( +# DESTINATION_OBJECKT_DIR, path.basename(EVENT_OBJECKT_KEY) +# ) +# event_object = s3.Object(DESTINATION_BUCKET, destination_object_key) +# event_object.copy({"Bucket": EVENT_BUCKET, "Key": EVENT_OBJECKT_KEY}) + +print('呼び出し成功') \ No newline at end of file diff --git a/ecs/Dockerfile/requirements.txt b/ecs/Dockerfile/requirements.txt new file mode 100644 index 00000000..1db657b6 --- /dev/null +++ b/ecs/Dockerfile/requirements.txt @@ -0,0 +1 @@ +boto3 \ No newline at end of file From 12b06d0574b06a8f1e291c1d206c4525a46813e9 Mon Sep 17 00:00:00 2001 From: *lcOeIaePm0 Date: Thu, 14 Oct 2021 14:23:32 +0900 Subject: [PATCH 2/7] =?UTF-8?q?feat:python=E3=83=95=E3=82=A1=E3=82=A4?= =?UTF-8?q?=E3=83=AB=E3=82=92=E3=81=BE=E3=81=A8=E3=82=81=E3=82=8B=E3=81=9F?= =?UTF-8?q?=E3=82=81=E3=80=81dataimport=E3=83=95=E3=82=A9=E3=83=AB?= =?UTF-8?q?=E3=83=80=E3=82=92=E8=BF=BD=E5=8A=A0=E3=81=97=E3=81=9F?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- ecs/Dockerfile/Dockerfile | 11 +++++++++-- ecs/Dockerfile/{ => dataimport}/main.py | 0 2 files changed, 9 insertions(+), 2 deletions(-) rename ecs/Dockerfile/{ => dataimport}/main.py (100%) diff --git a/ecs/Dockerfile/Dockerfile b/ecs/Dockerfile/Dockerfile index 6934fec5..87ebe333 100644 --- a/ecs/Dockerfile/Dockerfile +++ b/ecs/Dockerfile/Dockerfile @@ -5,6 +5,13 @@ ENV TZ="Asia/Tokyo" WORKDIR /usr/src/app COPY requirements.txt ./ RUN pip install --no-cache-dir -r requirements.txt -COPY main.py ./ +COPY dataimport ./ +# COPY controller.py ./ +# COPY ini.py ./ +# COPY chk.py ./ +# COPY main.py ./ +# COPY end.py ./ +# COPY warning.py ./ +# COPY error.py ./ -CMD [ "python", "./main.py" ] \ No newline at end of file +CMD [ "python", "./dataimport/controller.py" ] \ No newline at end of file diff --git a/ecs/Dockerfile/main.py b/ecs/Dockerfile/dataimport/main.py similarity index 100% rename from ecs/Dockerfile/main.py rename to ecs/Dockerfile/dataimport/main.py From b0c6683fbbefb0bfe13568d42deb77db5a50ae0a Mon Sep 17 00:00:00 2001 From: *lcOeIaePm0 Date: Wed, 20 Oct 2021 16:16:58 +0900 Subject: [PATCH 3/7] =?UTF-8?q?feat:ECS=E3=82=B3=E3=83=B3=E3=83=86?= =?UTF-8?q?=E3=83=8A=E3=81=A7=E7=A8=BC=E5=83=8D=E3=81=99=E3=82=8B=E3=82=A2?= =?UTF-8?q?=E3=83=97=E3=83=AA=E3=82=B1=E3=83=BC=E3=82=B7=E3=83=A7=E3=83=B3?= =?UTF-8?q?=E3=83=95=E3=82=A1=E3=82=A4=E3=83=AB=E3=82=92=E8=BF=BD=E5=8A=A0?= =?UTF-8?q?=E3=81=97=E3=81=9F?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- ecs/Dockerfile/dataimport/chk.py | 86 ++++++++++ ecs/Dockerfile/dataimport/controller.py | 63 ++++++++ ecs/Dockerfile/dataimport/end.py | 76 +++++++++ ecs/Dockerfile/dataimport/error.py | 57 +++++++ ecs/Dockerfile/dataimport/ini.py | 142 +++++++++++++++++ ecs/Dockerfile/dataimport/main.py | 202 ++++++++++++++++++++++-- ecs/Dockerfile/requirements.txt | 3 +- 7 files changed, 615 insertions(+), 14 deletions(-) create mode 100644 ecs/Dockerfile/dataimport/chk.py create mode 100644 ecs/Dockerfile/dataimport/controller.py create mode 100644 ecs/Dockerfile/dataimport/end.py create mode 100644 ecs/Dockerfile/dataimport/error.py create mode 100644 ecs/Dockerfile/dataimport/ini.py diff --git a/ecs/Dockerfile/dataimport/chk.py b/ecs/Dockerfile/dataimport/chk.py new file mode 100644 index 00000000..f6f795bb --- /dev/null +++ b/ecs/Dockerfile/dataimport/chk.py @@ -0,0 +1,86 @@ +from datetime import datetime +import boto3 +import io +import csv +from error import error + +s3_resource = boto3.resource('s3') + +LOG_LEVEL = {'i': 'Info', 'e': 'Error'} +SETTINGS_ITEM = { + 'dataSource': 0, + 'delimiter': 1, + 'charCode': 2, + 'quotechar': 3, + 'lineFeedCode': 4, + 'headerFlag': 5, + 'csvNumItems': 6, + 'csvNameItems': 7, + 'dbColumuName': 8, + 'storageSchemaName': 9, + 'loadSchemaName': 10, + 'exSqlFileName': 11, +} + + +class CheckError(Exception): + pass + + +def check(bucket_name, target_key, target_data_source, target_file_name, settings_key, log_info): + """チェック処理 + Args: + bucket_name : バケット名 + target_key : 投入データのフルパス + target_data_source : 投入データのディレクトリ名よりデータソースに該当する部分 + target_file_name : 投入データのファイル名 + settings_key : 投入データに該当する個別設定ファイルのフルパス + log_info : ログに記載するデータソース名とファイル名 + Raises: + CheckError : チェックでエラーがあった場合に発生する例外 + """ + # ① 開始ログの出力 + print(f'{str(datetime.now())} {log_info} {LOG_LEVEL["i"]} I-CHK-01 - チェック処理を開始します') + + try: + # データ読込 + settings_obj = s3_resource.Object(bucket_name, settings_key) + settings_response = settings_obj.get() + settings_list = [] + for line in io.TextIOWrapper(io.BytesIO(settings_response["Body"].read()), encoding='utf-8'): + settings_list.append(line.rstrip()) + + target_obj = s3_resource.Object(bucket_name, target_key) + target_response = target_obj.get() + target_data = io.TextIOWrapper(io.BytesIO(target_response["Body"].read()), encoding=settings_list[SETTINGS_ITEM["charCode"]], newline=settings_list[SETTINGS_ITEM["lineFeedCode"]]) + for line in csv.reader(target_data, quotechar=settings_list[SETTINGS_ITEM["quotechar"]], delimiter=settings_list[SETTINGS_ITEM["delimiter"]]): + target_header_list = line.rstrip() + break + + # ② C-1 項目数チェック + print(f'{str(datetime.now())} {log_info} {LOG_LEVEL["i"]} I-CHK-02 - C-1のチェックを開始します') + target_header_list_len = len(target_header_list) + if not target_header_list_len == int(settings_list[SETTINGS_ITEM["csvNumItems"]]): + print(f'{str(datetime.now())} {log_info} {LOG_LEVEL["i"]} I-CHK-03 - C-1:正常終了') + else: + raise CheckError('E-CHK-01 - 項目数が一致しません') + + # ③ C-2 項目並び順チェック + if int(settings_list[SETTINGS_ITEM["headerFlag"]]) == True: + print(f'{str(datetime.now())} {log_info} {LOG_LEVEL["i"]} I-CHK-04 - C-2のチェックを開始します') + settings_header_list = settings_list[SETTINGS_ITEM["csvNameItems"]].rstrip().split(',') + for i in range(len(settings_header_list)): + if not settings_header_list[i] == target_header_list[i]: + raise CheckError('E-CHK-02 - 項目順序が一致しません') + print(f'{str(datetime.now())} {log_info} {LOG_LEVEL["i"]} I-CHK-05 - C-2:正常終了') + + except CheckError as e: + print(f'{str(datetime.now())} {log_info} {LOG_LEVEL["e"]} {e}') + error(bucket_name, target_data_source, target_file_name) + + except Exception as e: + print(f'{str(datetime.now())} {log_info} {LOG_LEVEL["e"]} E-CHK-99 - エラー内容:{e}') + error(bucket_name, target_data_source, target_file_name) + + # ④ 終了ログの出力 + print(f'{str(datetime.now())} {log_info} {LOG_LEVEL["i"]} I-CHK-06 - チェック処理を終了します') diff --git a/ecs/Dockerfile/dataimport/controller.py b/ecs/Dockerfile/dataimport/controller.py new file mode 100644 index 00000000..01bd22cd --- /dev/null +++ b/ecs/Dockerfile/dataimport/controller.py @@ -0,0 +1,63 @@ +import os +from datetime import datetime +import boto3 +import sys + + +from ini import init +from chk import check +from end import end + +# 引数 +BUCKET_NAME = os.environ["BUCKET_NAME"] +TARGET_KEY = os.environ["TARGET_KEY"] +DATA_SOURCE_NAME = os.environ["DATA_SOURCE_NAME"] +FILE_NAME = os.environ["FILE_NAME"] + +# 環境変数 +DB_HOST = os.environ["DB_HOST"] +DB_NAME = os.environ["DB_NAME"] +DB_PASS = os.environ["DB_PASS"] +DB_USER = os.environ["DB_USER"] +DB_INFO = {"host": DB_HOST, "name": DB_NAME, "pass": DB_PASS, "user": DB_USER} + +s3_client = boto3.client('s3') +s3_resource = boto3.resource('s3') + +LOG_LEVEL = {"i": 'Info', "e": 'Error'} +LOG_INFO = f' {DATA_SOURCE_NAME} {FILE_NAME} ' +DIRECTORY_TARGET = '/target/' +DIRECTORY_WORK = '/work/' +DIRECTORY_DONE = '/done/' +DIRECTORY_WARNING = '/warning/' + + +def controller(): + """コントロール処理 + + 各処理を呼び出すコントローラー + + """ + + # ① 開始ログの出力 + print(f'{str(datetime.now())} {LOG_INFO} {LOG_LEVEL["i"]} I-CTRL-01 - データ取込処理を開始します') + + # ② 初期処理 + print(f'{str(datetime.now())} {LOG_INFO} {LOG_LEVEL["i"]} I-CTRL-02 - 初期処理の呼び出し') + settings_key = init(BUCKET_NAME, TARGET_KEY, DATA_SOURCE_NAME, FILE_NAME, LOG_INFO) + + # ③ チェック処理 + print(f'{str(datetime.now())} {LOG_INFO} {LOG_LEVEL["i"]} I-CTRL-03 - チェック処理の呼び出し') + check(BUCKET_NAME, TARGET_KEY, DATA_SOURCE_NAME, FILE_NAME, settings_key, LOG_INFO) + + # ④ メイン処理 + print(f'{str(datetime.now())} {LOG_INFO} {LOG_LEVEL["i"]} I-CTRL-04 - メイン処理の呼び出し') + warning_info = '' + # warning_info = main(BUCKET_NAME, TARGET_KEY, DATA_SOURCE_NAME, FILE_NAME, settings_key, DB_INFO, LOG_INFO) + + # ⑤ 終了処理 + print(f'{str(datetime.now())} {LOG_INFO} {LOG_LEVEL["i"]} I-CTRL-05 - 終了処理の呼び出し') + end(BUCKET_NAME, DATA_SOURCE_NAME, FILE_NAME, warning_info, LOG_INFO) + + # ⑥ 終了ログの出力 + print(f'{str(datetime.now())} {LOG_INFO} {LOG_LEVEL["i"]} I-CTRL-06 - データ取込処理を終了します') diff --git a/ecs/Dockerfile/dataimport/end.py b/ecs/Dockerfile/dataimport/end.py new file mode 100644 index 00000000..907a86cd --- /dev/null +++ b/ecs/Dockerfile/dataimport/end.py @@ -0,0 +1,76 @@ +from datetime import datetime +import boto3 +from error import error + +s3_client = boto3.client('s3') +s3_resource = boto3.resource('s3') + +LOG_LEVEL = {'i': 'Info', 'e': 'Error'} +DIRECTORY_TARGET = '/target/' +DIRECTORY_WORK = '/work/' +DIRECTORY_DONE = '/done/' +DIRECTORY_WARNING = '/warning/' + + +def end(bucket_name, target_data_source, target_file_name, warning_info, log_info): + """終了処理 + Args: + bucket_name : バケット名 + target_data_source : 投入データのディレクトリ名よりデータソースに該当する部分 + target_file_name : 投入データのファイル名 + warning_info : Warning情報 + log_info : ログに記載するデータソース名とファイル名 + """ + # ① 開始ログの出力 + print(f'{str(datetime.now())} {log_info} {LOG_LEVEL["i"]} I-END-01 - 終了処理を開始します') + + try: + # ② 投入データファイルをdoneディレクトリに移動 + work_key = target_data_source + DIRECTORY_WORK + target_file_name + work_obj = s3_resource.Object(bucket_name, work_key) + work_response = work_obj.get() + work_body = work_response["Body"].read() + done_file_name = str(datetime.now()) + '_' + target_file_name + done_key = target_data_source + DIRECTORY_DONE + done_file_name + done_obj = s3_resource.Object(bucket_name, done_key) + done_obj.put(Body=work_body) + print(f'{str(datetime.now())} {log_info} {LOG_LEVEL["i"]} I-END-02 - workディレクトリの {target_file_name} をdoneディレクトリに移動しました 移動後ファイル名:{done_file_name}') + + # ③ doingファイルの削除 + doing_file_name = target_file_name + '.doing' + doing_key = target_data_source + DIRECTORY_TARGET + doing_file_name + s3_client.delete_object(Bucket=bucket_name, Key=doing_key) + print(f'{str(datetime.now())} {log_info} {LOG_LEVEL["i"]} I-END-03 - targetディレクトリの {doing_file_name} を削除しました') + + # ④ Warning情報の存在確認 + print(f'{str(datetime.now())} {log_info} {LOG_LEVEL["i"]} I-END-04 - Warning情報の存在チェック') + if warning_info: + print(f'{str(datetime.now())} {log_info} {LOG_LEVEL["i"]} I-END-05 - Warning情報は存在しました') + + # warningファイルの作成 + warning_file_name = str(datetime.now()) + '_' + target_file_name + '_war.log' + warning_key = target_data_source + DIRECTORY_WARNING + warning_file_name + warning_obj = s3_resource.Object(bucket_name, warning_key) + warning_obj.put(Body=warning_info) + print(f'{str(datetime.now())} {log_info} {LOG_LEVEL["i"]} I-END-06 - warningディレクトリに {warning_file_name} を作成しました') + + # warning処理結果ファイルの作成 + result_warning_file_name = target_file_name + '.warning' + result_warning_key = target_data_source + DIRECTORY_TARGET + result_warning_file_name + result_warning_obj = s3_resource.Object(bucket_name, result_warning_key) + result_warning_obj.put(Body='') + print(f'{str(datetime.now())} {log_info} {LOG_LEVEL["i"]} I-END-07 - targetディレクトリに {result_warning_file_name} を作成しました') + else: + # done処理結果ファイルの作成 + print(f'{str(datetime.now())} {log_info} {LOG_LEVEL["i"]} I-END-08 - Warning情報は存在しませんでした') + result_done_file_name = target_file_name + '.done' + result_done_key = target_data_source + DIRECTORY_TARGET + result_done_file_name + result_done_obj = s3_resource.Object(bucket_name, result_done_key) + result_done_obj.put(Body='') + print(f'{str(datetime.now())} {log_info} {LOG_LEVEL["i"]} I-END-09 - targetディレクトリに {result_done_file_name} を作成しました') + except Exception as e: + print(f'{str(datetime.now())} {log_info} {LOG_LEVEL["e"]} E-END-99 - エラー内容:{e}') + error(bucket_name, target_data_source, target_file_name) + + # ⑤ 終了ログの出力 + print(f'{str(datetime.now())} {log_info} {LOG_LEVEL["i"]} I-END-0 - 終了処理を終了します') diff --git a/ecs/Dockerfile/dataimport/error.py b/ecs/Dockerfile/dataimport/error.py new file mode 100644 index 00000000..e8de6e4f --- /dev/null +++ b/ecs/Dockerfile/dataimport/error.py @@ -0,0 +1,57 @@ +from datetime import datetime +import boto3 +import sys + +s3_client = boto3.client('s3') +s3_resource = boto3.resource('s3') + +LOG_LEVEL = {'i': 'Info', 'e': 'Error'} +DIRECTORY_TARGET = '/target/' +DIRECTORY_WORK = '/work/' +DIRECTORY_DONE = '/done/' + + +def error(bucket_name, target_data_source, target_file_name, log_info): + """エラー処理 + Args: + bucket_name : バケット名 + target_data_source : 投入データのディレクトリ名よりデータソースに該当する部分 + target_file_name : 投入データのファイル名 + log_info : ログに記載するデータソース名とファイル名 + """ + + # ① 開始ログの出力 + print(f'{str(datetime.now())} {log_info} {LOG_LEVEL["i"]} I-ERR-01 - エラー処理を開始します') + + try: + # ② 投入データファイルをerrorディレクトリに移動 + work_key = target_data_source + DIRECTORY_WORK + target_file_name + work_obj = s3_resource.Object(bucket_name, work_key) + work_response = work_obj.get() + work_body = work_response["Body"].read() + error_file_name = str(datetime.now()) + '_' + target_file_name + error_key = target_data_source + DIRECTORY_DONE + error_file_name + error_obj = s3_resource.Object(bucket_name, error_key) + error_obj.put(Body=work_body) + print(f'{str(datetime.now())} {log_info} {LOG_LEVEL["i"]} I-ERR-02 - workディレクトリの {target_file_name} をerrorディレクトリに移動しました 移動後ファイル名:{error_file_name}') + + # ③ doingファイルの削除 + doing_file_name = target_file_name + '.doing' + doing_key = target_data_source + DIRECTORY_TARGET + doing_file_name + s3_client.delete_object(Bucket=bucket_name, Key=doing_key) + print(f'{str(datetime.now())} {log_info} {LOG_LEVEL["i"]} I-ERR-03 - targetディレクトリの {doing_file_name} を削除しました') + + # ④ error処理結果ファイルの作成 + result_error_file_name = target_file_name + '.error' + result_error_key = target_data_source + DIRECTORY_TARGET + result_error_file_name + result_error_obj = s3_resource.Object(bucket_name, result_error_key) + result_error_obj.put(Body='') + print(f'{str(datetime.now())} {log_info} {LOG_LEVEL["i"]} I-ERR-04 - targetディレクトリに {result_error_file_name} を作成しました') + except Exception as e: + print(f'{str(datetime.now())} {log_info} {LOG_LEVEL["e"]} E-ERR-99 - エラー内容:{e}') + finally: + # ⑤ 終了ログの出力 + print(f'{str(datetime.now())} {log_info} {LOG_LEVEL["i"]} I-ERR-05 - エラー処理を終了します') + + # ⑥ 処理終了 + sys.exit() diff --git a/ecs/Dockerfile/dataimport/ini.py b/ecs/Dockerfile/dataimport/ini.py new file mode 100644 index 00000000..fbddb372 --- /dev/null +++ b/ecs/Dockerfile/dataimport/ini.py @@ -0,0 +1,142 @@ +from datetime import datetime +import boto3 +import io +import csv +import re +from error import error + +ecs_client = boto3.client('ecs') +s3_client = boto3.client('s3') +s3_resource = boto3.resource('s3') + +LOG_LEVEL = {"i": 'Info', "e": 'Error'} +MAPPING_FILE_NAME = 'configmap.config' +DIRECTORY_TARGET = '/target/' +DIRECTORY_WORK = '/work/' +DIRECTORY_SETTINGS = '/settings/' + + +def init(bucket_name, target_key, target_data_source, target_file_name, log_info): + """初期処理 + Args: + bucket_name : バケット名 + target_key : 投入データのフルパス + target_data_source : 投入データのディレクトリ名よりデータソースに該当する部分 + target_file_name : 投入データのファイル名 + log_info : ログに記載するデータソース名とファイル名 + Returns: + settings_key : 投入データに該当する個別設定ファイルのフルパス + """ + + # ① 開始ログの出力 + print(f'{str(datetime.now())} {log_info} {LOG_LEVEL["i"]} I-INI-01 - 初期処理を開始します') + + # doingファイル情報作成 + doing_file_name = target_file_name + '.doing' + doing_key = target_data_source + DIRECTORY_TARGET + doing_file_name + + # ② doingファイルの存在確認 + print(f'{str(datetime.now())} {log_info} {LOG_LEVEL["i"]} I-INI-02 - doingファイル:{doing_file_name} の存在チェック') + try: + s3_client.head_object(Bucket=bucket_name, Key=doing_key) + print(f'{str(datetime.now())} {log_info} {LOG_LEVEL["e"]} E-INI-01 - 投入データ {target_file_name} は既に処理中です') + error(bucket_name, target_data_source, target_file_name) + except Exception as e: + print(f'{str(datetime.now())} {log_info} {LOG_LEVEL["i"]} I-INI-03 - doingファイルは存在しませんでした') + + # ③ doingファイルの作成 + try: + doing_obj = s3_resource.Object(bucket_name, doing_key) + doing_obj.put(Body='') + print(f'{str(datetime.now())} {log_info} {LOG_LEVEL["i"]} I-INI-04 - targetディレクトリに {doing_file_name} を作成しました') + except Exception as e: + print(f'{str(datetime.now())} {log_info} {LOG_LEVEL["e"]} E-INI-99 - エラー内容:{e}') + error(bucket_name, target_data_source, target_file_name) + + # ④ 投入データファイルをworkディレクトリに移動 + try: + target_obj = s3_resource.Object(bucket_name, target_key) + target_response = target_obj.get() + work_key = target_data_source + DIRECTORY_WORK + target_file_name + work_body = target_response["Body"].read() + work_obj = s3_resource.Object(bucket_name, work_key) + work_obj.put(Body=work_body) + print(f'{str(datetime.now())} {log_info} {LOG_LEVEL["i"]} I-INI-05 - 投入データ {target_file_name} をworkディレクトリに移動しました') + except Exception as e: + print(f'{str(datetime.now())} {log_info} {LOG_LEVEL["e"]} E-INI-99 - エラー内容:{e}') + error(bucket_name, target_data_source, target_file_name) + + # ⑤ 処理結果ファイルの削除 + # doneファイルの存在確認 + print(f'{str(datetime.now())} {log_info} {LOG_LEVEL["i"]} I-INI-06 - doneファイル:{target_file_name}.done の存在チェック') + result_done_key = target_data_source + DIRECTORY_TARGET + target_file_name + '.done' + try: + s3_client.head_object(Bucket=bucket_name, Key=result_done_key) + s3_client.delete_object(Bucket=bucket_name, Key=result_done_key) + print(f'{str(datetime.now())} {log_info} {LOG_LEVEL["i"]} I-INI-07 - doneファイルが存在したため削除しました') + except Exception as e: + print(f'{str(datetime.now())} {log_info} {LOG_LEVEL["i"]} I-INI-08 - doneファイルは存在しませんでした') + + # warningファイルの存在確認 + print(f'{str(datetime.now())} {log_info} {LOG_LEVEL["i"]} I-INI-09 - warningファイル:{target_file_name}.warning の存在チェック') + result_warning_key = target_data_source + DIRECTORY_TARGET + target_file_name + '.warning' + try: + s3_client.head_object(Bucket=bucket_name, Key=result_warning_key) + s3_client.delete_object(Bucket=bucket_name, Key=result_warning_key) + print(f'{str(datetime.now())} {log_info} {LOG_LEVEL["i"]} I-INI-10 - warningファイルが存在したため削除しました') + except Exception as e: + print(f'{str(datetime.now())} {log_info} {LOG_LEVEL["i"]} I-INI-11 - warningファイルは存在しませんでした') + + # errorファイルの存在確認 + print(f'{str(datetime.now())} {log_info} {LOG_LEVEL["i"]} I-INI-12 - errorファイル:{target_file_name}.error の存在チェック') + result_error_key = target_data_source + DIRECTORY_TARGET + target_file_name + '.error' + try: + s3_client.head_object(Bucket=bucket_name, Key=result_error_key) + s3_client.delete_object(Bucket=bucket_name, Key=result_error_key) + print(f'{str(datetime.now())} {log_info} {LOG_LEVEL["i"]} I-INI-13 - errorファイルが存在したため削除しました') + except Exception as e: + print(f'{str(datetime.now())} {log_info} {LOG_LEVEL["i"]} I-INI-14 - errorファイルは存在しませんでした') + + # ⑥ 個別設定マッピングリストの存在確認 + print(f'{str(datetime.now())} {log_info} {LOG_LEVEL["i"]} I-INI-15 - 個別設定マッピングリスト:{MAPPING_FILE_NAME} の存在チェック') + mapping_key = target_data_source + DIRECTORY_SETTINGS + MAPPING_FILE_NAME + try: + s3_client.head_object(Bucket=bucket_name, Key=mapping_key) + print(f'{str(datetime.now())} {log_info} {LOG_LEVEL["i"]} I-INI-16 - 個別設定マッピングリストの存在を確認しました') + except Exception as e: + print(f'{str(datetime.now())} {log_info} {LOG_LEVEL["e"]} E-INI-02 - 個別設定マッピングリストが存在しません') + error(bucket_name, target_data_source, target_file_name) + + # ⑦ 個別設定ファイルの特定 + print(f'{str(datetime.now())} {log_info} {LOG_LEVEL["i"]} I-INI-17 - 個別設定ファイルを検索します') + try: + mapping_obj = s3_resource.Object(bucket_name, mapping_key) + mapping_response = mapping_obj.get() + mapping_body = io.TextIOWrapper(io.BytesIO(mapping_response["Body"].read()), encoding='utf-8') + for row in csv.reader(mapping_body, delimiter='\t'): + match_result = re.fullmatch(row[0], target_file_name) + if match_result is not None: + settings_file_name = row[1].rstrip() + print(f'{str(datetime.now())} {log_info} {LOG_LEVEL["i"]} I-INI-18 - 個別設定ファイル:{settings_file_name} を特定しました') + break + + if settings_file_name is None: + print(f'{str(datetime.now())} {log_info} {LOG_LEVEL["e"]} E-INI-03 - 個別設定ファイルが特定出来ません') + error(bucket_name, target_data_source, target_file_name) + except Exception as e: + print(f'{str(datetime.now())} {log_info} {LOG_LEVEL["e"]} E-INI-99 - エラー内容:{e}') + error(bucket_name, target_data_source, target_file_name) + + # ⑧ 個別設定ファイルの存在確認 + print(f'{str(datetime.now())} {log_info} {LOG_LEVEL["i"]} I-INI-19 - 個別設定ファイル:{settings_file_name} の存在チェック') + settings_key = target_data_source + DIRECTORY_SETTINGS + settings_file_name + try: + s3_client.head_object(Bucket=bucket_name, Key=settings_key) + print(f'{str(datetime.now())} {log_info} {LOG_LEVEL["i"]} I-INI-20 - 個別設定ファイルの存在を確認しました') + except Exception as e: + print(f'{str(datetime.now())} {log_info} {LOG_LEVEL["e"]} E-INI-04 - 個別設定ファイルが存在しません') + error(bucket_name, target_data_source, target_file_name) + + # ⑨ 終了ログの出力 + print(f'{str(datetime.now())} {log_info} {LOG_LEVEL["i"]} I-INI-21 - 初期処理を終了します') + return settings_key diff --git a/ecs/Dockerfile/dataimport/main.py b/ecs/Dockerfile/dataimport/main.py index 5b66d418..5b270062 100644 --- a/ecs/Dockerfile/dataimport/main.py +++ b/ecs/Dockerfile/dataimport/main.py @@ -1,19 +1,195 @@ +from datetime import datetime import boto3 -import os -from os import path +import pymysql +import io +import csv +from error import error -# EVENT_BUCKET = os.environ["EVENT_BUCKET"] -# EVENT_OBJECKT_KEY = os.environ["EVENT_OBJECKT_KEY"] +ecs_client = boto3.client('ecs') +s3_client = boto3.client('s3') +s3_resource = boto3.resource('s3') -# DESTINATION_BUCKET = os.environ["DESTINATION_BUCKET"] -# DESTINATION_OBJECKT_DIR = os.environ["DESTINATION_OBJECKT_DIR"] +LOG_LEVEL = {"i": 'Info', "e": 'Error', "w": 'Warning'} +SETTINGS_ITEM = { + 'dataSource': 0, + 'delimiter': 1, + 'charCode': 2, + 'quotechar': 3, + 'lineFeedCode': 4, + 'headerFlag': 5, + 'csvNumItems': 6, + 'csvNameItems': 7, + 'dbColumuName': 8, + 'storageSchemaName': 9, + 'loadSchemaName': 10, + 'exSqlFileName': 11, +} +DIRECTORY_SETTINGS = '/settings/' -# s3 = boto3.resource("s3") -# destination_object_key = path.join( -# DESTINATION_OBJECKT_DIR, path.basename(EVENT_OBJECKT_KEY) -# ) -# event_object = s3.Object(DESTINATION_BUCKET, destination_object_key) -# event_object.copy({"Bucket": EVENT_BUCKET, "Key": EVENT_OBJECKT_KEY}) +def init(bucket_name, target_key, target_data_source, target_file_name, settings_key, db_info, log_info): + """メイン処理 + Args: + bucket_name : バケット名 + target_key : 投入データのフルパス + target_data_source : 投入データのディレクトリ名よりデータソースに該当する部分 + target_file_name : 投入データのファイル名 + settings_key : 投入データに該当する個別設定ファイルのフルパス + db_info : データベース情報 + log_info : ログに記載するデータソース名とファイル名 + Returns: + warning_info : Warning情報 + """ -print('呼び出し成功') \ No newline at end of file + # ① メイン処理開始ログを出力する + print(f'{str(datetime.now())} {log_info} {LOG_LEVEL["i"]} I-MAIN-01 - メイン処理を開始します') + + # ② DB接続を開始する + try: + conn = pymysql.connect(host=db_info["host"], user=db_info["user"], passwd=db_info["pass"], db=db_info["name"], connect_timeout=5) + except Exception as e: + print(f'{str(datetime.now())} {log_info} {LOG_LEVEL["e"]} E-MAIN-99 - エラー内容:{e}') + error(bucket_name, target_data_source, target_file_name) + print(f'{str(datetime.now())} {log_info} {LOG_LEVEL["i"]} I-MAIN-02 - DB接続を開始しました') + + # ③ 個別設定ファイルのロードスキーマのテーブル名に記載されているテーブルをTRUNCATEする + try: + # 個別設定ファイルの読込 + settings_obj = s3_resource.Object(bucket_name, settings_key) + settings_response = settings_obj.get() + settings_list = [] + for line in io.TextIOWrapper(io.BytesIO(settings_response["Body"].read()), encoding='utf-8'): + settings_list.append(line.rstrip()) + + # TRUNCATE実行 + with conn.cursor() as cur: + sql_truncate = f'TRUNCATE table {settings_list[SETTINGS_ITEM["loadSchemaName"]]}' + cur.execute(sql_truncate) + print(f'{str(datetime.now())} {log_info} {LOG_LEVEL["i"]} I-MAIN-03 - {settings_list[SETTINGS_ITEM["loadSchemaName"]]} をTRUNCATEしました') + except Exception as e: + print(f'{str(datetime.now())} {log_info} {LOG_LEVEL["e"]} E-MAIN-99 - エラー内容:{e}') + error(bucket_name, target_data_source, target_file_name) + + # ④ 投入データファイルを1行ごとにループする + print(f'{str(datetime.now())} {log_info} {LOG_LEVEL["i"]} I-MAIN-04 - 投入データ {target_file_name} の読み込みを開始します') + try: + target_obj = s3_resource.Object(bucket_name, target_key) + target_response = target_obj.get() + target_data = io.TextIOWrapper(io.BytesIO(target_response["Body"].read()), encoding=settings_list[SETTINGS_ITEM["charCode"]], newline=settings_list[SETTINGS_ITEM["lineFeedCode"]]) + except Exception as e: + print(f'{str(datetime.now())} {log_info} {LOG_LEVEL["e"]} E-MAIN-99 - エラー内容:{e}') + error(bucket_name, target_data_source, target_file_name) + + process_count = 0 # 処理件数カウンタ + normal_count = 0 # 正常終了件数カウンタ + warning_count = 0 # ワーニング終了件数カウンター + warning_info = '' # ワーニング情報 + index = 0 # ループインデックス + + for line in csv.reader(target_data, quotechar=settings_list[SETTINGS_ITEM["quotechar"]], delimiter=settings_list[SETTINGS_ITEM["delimiter"]]): + try: + if settings_list[SETTINGS_ITEM["headerFlag"]] and index == 0: + index += 1 + print(f'{str(datetime.now())} {log_info} {LOG_LEVEL["i"]} I-MAIN-05 - ヘッダー行をスキップします') + continue + + # 処理件数カウント + process_count += 1 + + # SQL文生成 + sql = f'INSERT INTO {settings_list[SETTINGS_ITEM["loadSchemaName"]]} VALUES (' + for i in range(len(line)): + sql = f'{sql} "{line[i]}",' + + sql = f'{sql} "{target_file_name}",' # システム項目:取込ファイル名 + sql = f'{sql} "{index}",' # システム項目:取込ファイル行番号 + sql = f'{sql} "0",' # システム項目:論理削除フラグ + sql = f'{sql} CURRENT_USER(),' # システム項目:登録者 + sql = f'{sql} CURRENT_TIMESTAMP(),' # システム項目:登録日時 + sql = f'{sql} NULL,' # システム項目:更新者 + sql = f'{sql} NULL)' # システム項目:更新日時 + + # ロードスキーマのトランザクション開始 + with conn.cursor() as cur: + cur.execute(sql) + conn.commit() + normal_count += 1 + + index += 1 + except Exception as e: + warning_info = f'{warning_info} {index} ロードスキーマ登録時にエラーが発生しました {line} {e}\n' + print(f'{str(datetime.now())} {log_info} {LOG_LEVEL["w"]} W-MAIN-01 {index} ロードスキーマ登録時にエラーが発生しました {line} {e}') + + # ⑤ ④の処理結果件数をログ出力する + print(f'{str(datetime.now())} {log_info} {LOG_LEVEL["i"]} I-MAIN-06 - 投入データ件数:{process_count} 正常終了件数:{normal_count} Warning終了件数:{warning_count}') + + # ⑥ ロードスキーマのデータを蓄積スキーマにUPSERTする + print(f'{str(datetime.now())} {log_info} {LOG_LEVEL["i"]} I-MAIN-07 - ロードスキーマ({settings_list[SETTINGS_ITEM["loadSchemaName"]]})のデータを蓄積スキーマ({settings_list[SETTINGS_ITEM["storageSchemaName"]]})に登録します') + try: + # SQL文生成 + sql = f'INSERT INTO {settings_list[SETTINGS_ITEM["storageSchemaName"]]}' + sql = f'{sql} SELECT t.*' + sql = f'{sql} FROM {settings_list[SETTINGS_ITEM["loadSchemaName"]]} as t' + sql = f'{sql} ON DUPLICATE KEY UPDATE' + settings_db_columu_list = settings_list[SETTINGS_ITEM["dbColumuName"]].rstrip().split(',') + for i in range(len(settings_db_columu_list)): + sql = f'{sql} {settings_db_columu_list[i]}=t.{settings_db_columu_list[i]},' + sql = f'{sql} file_name=t.file_name,' # システム項目:取込ファイル名 + sql = f'{sql} file_row_cnt=t.file_row_cnt,' # システム項目:取込ファイル行番号 + sql = f'{sql} ins_user=t.ins_user,' # システム項目:登録者 + sql = f'{sql} ins_date=t.ins_date' # システム項目:登録日時 + + # トランザクション開始 + print(f'{str(datetime.now())} {log_info} {LOG_LEVEL["i"]} I-MAIN-08 - 標準SQL:{settings_list[SETTINGS_ITEM["storageSchemaName"]]} のトランザクションを開始します') + with conn.cursor() as cur: + cur.execute(sql) + conn.commit() + print(f'{str(datetime.now())} {log_info} {LOG_LEVEL["i"]} I-MAIN-09 - 標準SQL:{settings_list[SETTINGS_ITEM["storageSchemaName"]]} のCOMIIT処理が正常終了しました') + except Exception as e: + print(f'{str(datetime.now())} {log_info} {LOG_LEVEL["e"]} E-MAIN-99 - エラー内容:{e}') + error(bucket_name, target_data_source, target_file_name) + + # ⑦ 個別設定ファイルに拡張SQLファイル名が設定されているかチェック + print(f'{str(datetime.now())} {log_info} {LOG_LEVEL["i"]} I-MAIN-10 - 拡張SQL設定が存在するかチェックします') + if settings_list[SETTINGS_ITEM["exSqlFileName"]]: + print(f'{str(datetime.now())} {log_info} {LOG_LEVEL["i"]} I-MAIN-11 - 拡張SQL設定の存在を確認しました') + print(f'{str(datetime.now())} {log_info} {LOG_LEVEL["i"]} I-MAIN-12 - 拡張SQLファイル名:{settings_list[SETTINGS_ITEM["exSqlFileName"]]} の存在チェック') + ex_sql_key = target_data_source + DIRECTORY_SETTINGS + settings_list[SETTINGS_ITEM["exSqlFileName"]] + try: + s3_client.head_object(Bucket=bucket_name, Key=ex_sql_key) + ex_sql_file_exists = True + print(f'{str(datetime.now())} {log_info} {LOG_LEVEL["i"]} I-MAIN-13 - 拡張SQLファイル名の存在を確認しました') + except Exception as e: + warning_info = f'{warning_info} - 拡張SQLファイルが存在しません\n' + ex_sql_file_exists = False + print(f'{str(datetime.now())} {log_info} {LOG_LEVEL["w"]} W-MAIN-02 - 拡張SQLファイルが存在しません') + + try: + if ex_sql_file_exists: + # 拡張SQLファイルからSQL文生成 + ex_sqls_obj = s3_resource.Object(bucket_name, ex_sql_key) + ex_sql_response = ex_sqls_obj.get() + ex_sql = '' + for line in io.TextIOWrapper(io.BytesIO(ex_sql_response["Body"].read()), encoding='utf-8'): + ex_sql = f'{ex_sql} {line.rstrip()}' + + # トランザクション開始 + print(f'{str(datetime.now())} {log_info} {LOG_LEVEL["i"]} I-MAIN-14 - 拡張SQL:{settings_list[SETTINGS_ITEM["storageSchemaName"]]} のトランザクションを開始します') + with conn.cursor() as cur: + cur.execute(ex_sql) + conn.commit() + print(f'{str(datetime.now())} {log_info} {LOG_LEVEL["i"]} I-MAIN-15 - 拡張SQL:{settings_list[SETTINGS_ITEM["storageSchemaName"]]} のCOMIIT処理が正常終了しました') + except Exception as e: + warning_info = f'{warning_info} - 拡張SQLにエラーが発生しました:{e}\n' + print(f'{str(datetime.now())} {log_info} {LOG_LEVEL["w"]} W-MAIN-03 - 拡張SQLにエラーが発生しました:{e}') + else: + print(f'{str(datetime.now())} {log_info} {LOG_LEVEL["i"]} I-MAIN-16 - 拡張SQL設定の存在はありませんでした') + + # ⑧ DB接続を終了する + conn.close() + print(f'{str(datetime.now())} {log_info} {LOG_LEVEL["i"]} I-MAIN-17 - DB接続を終了しました') + + # ⑨ 終了ログの出力 + print(f'{str(datetime.now())} {log_info} {LOG_LEVEL["i"]} I-MAIN-18 - メイン処理を終了します') + + return warning_info diff --git a/ecs/Dockerfile/requirements.txt b/ecs/Dockerfile/requirements.txt index 1db657b6..7253f919 100644 --- a/ecs/Dockerfile/requirements.txt +++ b/ecs/Dockerfile/requirements.txt @@ -1 +1,2 @@ -boto3 \ No newline at end of file +boto3 +PyMySQL \ No newline at end of file From 25bb81f460b8a242243ac1b16faa8c5a8fab9e66 Mon Sep 17 00:00:00 2001 From: *lcOeIaePm0 Date: Thu, 21 Oct 2021 17:02:49 +0900 Subject: [PATCH 4/7] =?UTF-8?q?fix:=E5=8B=95=E4=BD=9C=E7=A2=BA=E8=AA=8D?= =?UTF-8?q?=E6=99=82=E3=81=AB=E7=99=BA=E7=94=9F=E3=81=97=E3=81=9F=E3=82=A8?= =?UTF-8?q?=E3=83=A9=E3=83=BC=E3=82=92=E8=A7=A3=E6=B6=88=E3=81=97=E3=81=9F?= =?UTF-8?q?=E3=81=9F=E3=82=81=E3=82=B3=E3=83=9F=E3=83=83=E3=83=88=E3=81=99?= =?UTF-8?q?=E3=82=8B?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- ecs/Dockerfile/Dockerfile | 9 +--- ecs/Dockerfile/dataimport/chk.py | 26 ++++++++---- ecs/Dockerfile/dataimport/controller.py | 56 ++++++++++--------------- ecs/Dockerfile/dataimport/end.py | 11 +++-- ecs/Dockerfile/dataimport/error.py | 13 +++--- ecs/Dockerfile/dataimport/ini.py | 26 ++++++------ ecs/Dockerfile/dataimport/main.py | 26 +++++++----- 7 files changed, 86 insertions(+), 81 deletions(-) diff --git a/ecs/Dockerfile/Dockerfile b/ecs/Dockerfile/Dockerfile index 87ebe333..ec5fb0e0 100644 --- a/ecs/Dockerfile/Dockerfile +++ b/ecs/Dockerfile/Dockerfile @@ -6,12 +6,5 @@ WORKDIR /usr/src/app COPY requirements.txt ./ RUN pip install --no-cache-dir -r requirements.txt COPY dataimport ./ -# COPY controller.py ./ -# COPY ini.py ./ -# COPY chk.py ./ -# COPY main.py ./ -# COPY end.py ./ -# COPY warning.py ./ -# COPY error.py ./ -CMD [ "python", "./dataimport/controller.py" ] \ No newline at end of file +CMD [ "python", "./controller.py" ] diff --git a/ecs/Dockerfile/dataimport/chk.py b/ecs/Dockerfile/dataimport/chk.py index f6f795bb..6c043e76 100644 --- a/ecs/Dockerfile/dataimport/chk.py +++ b/ecs/Dockerfile/dataimport/chk.py @@ -4,8 +4,7 @@ import io import csv from error import error -s3_resource = boto3.resource('s3') - +# 定数 LOG_LEVEL = {'i': 'Info', 'e': 'Error'} SETTINGS_ITEM = { 'dataSource': 0, @@ -21,8 +20,17 @@ SETTINGS_ITEM = { 'loadSchemaName': 10, 'exSqlFileName': 11, } +LINE_FEED_CODE = { + 'CR': '\r', + 'LF': '\n', + 'CRLF': '\r\n', +} + +# クラス変数 +s3_resource = boto3.resource('s3') +# チェック例外クラス class CheckError(Exception): pass @@ -52,18 +60,18 @@ def check(bucket_name, target_key, target_data_source, target_file_name, setting target_obj = s3_resource.Object(bucket_name, target_key) target_response = target_obj.get() - target_data = io.TextIOWrapper(io.BytesIO(target_response["Body"].read()), encoding=settings_list[SETTINGS_ITEM["charCode"]], newline=settings_list[SETTINGS_ITEM["lineFeedCode"]]) + target_data = io.TextIOWrapper(io.BytesIO(target_response["Body"].read()), encoding=settings_list[SETTINGS_ITEM["charCode"]], newline=LINE_FEED_CODE[settings_list[SETTINGS_ITEM["lineFeedCode"]]]) for line in csv.reader(target_data, quotechar=settings_list[SETTINGS_ITEM["quotechar"]], delimiter=settings_list[SETTINGS_ITEM["delimiter"]]): - target_header_list = line.rstrip() + target_header_list = line break # ② C-1 項目数チェック print(f'{str(datetime.now())} {log_info} {LOG_LEVEL["i"]} I-CHK-02 - C-1のチェックを開始します') target_header_list_len = len(target_header_list) - if not target_header_list_len == int(settings_list[SETTINGS_ITEM["csvNumItems"]]): + if target_header_list_len == int(settings_list[SETTINGS_ITEM["csvNumItems"]]): print(f'{str(datetime.now())} {log_info} {LOG_LEVEL["i"]} I-CHK-03 - C-1:正常終了') else: - raise CheckError('E-CHK-01 - 項目数が一致しません') + raise CheckError(f'E-CHK-01 - 項目数が一致しません 個別設定ファイル項目数:{settings_list[SETTINGS_ITEM["csvNumItems"]]} 投入データ項目数:{target_header_list_len}') # ③ C-2 項目並び順チェック if int(settings_list[SETTINGS_ITEM["headerFlag"]]) == True: @@ -71,16 +79,16 @@ def check(bucket_name, target_key, target_data_source, target_file_name, setting settings_header_list = settings_list[SETTINGS_ITEM["csvNameItems"]].rstrip().split(',') for i in range(len(settings_header_list)): if not settings_header_list[i] == target_header_list[i]: - raise CheckError('E-CHK-02 - 項目順序が一致しません') + raise CheckError(f'E-CHK-02 - 項目順序が一致しません {i}番目の項目 個別設定ファイル項目:{settings_header_list[i]} 投入データ項目:{target_header_list[i]}') print(f'{str(datetime.now())} {log_info} {LOG_LEVEL["i"]} I-CHK-05 - C-2:正常終了') except CheckError as e: print(f'{str(datetime.now())} {log_info} {LOG_LEVEL["e"]} {e}') - error(bucket_name, target_data_source, target_file_name) + error(bucket_name, target_data_source, target_file_name, log_info) except Exception as e: print(f'{str(datetime.now())} {log_info} {LOG_LEVEL["e"]} E-CHK-99 - エラー内容:{e}') - error(bucket_name, target_data_source, target_file_name) + error(bucket_name, target_data_source, target_file_name, log_info) # ④ 終了ログの出力 print(f'{str(datetime.now())} {log_info} {LOG_LEVEL["i"]} I-CHK-06 - チェック処理を終了します') diff --git a/ecs/Dockerfile/dataimport/controller.py b/ecs/Dockerfile/dataimport/controller.py index 01bd22cd..e4b0d637 100644 --- a/ecs/Dockerfile/dataimport/controller.py +++ b/ecs/Dockerfile/dataimport/controller.py @@ -1,11 +1,8 @@ import os from datetime import datetime -import boto3 -import sys - - from ini import init from chk import check +from main import main from end import end # 引数 @@ -21,43 +18,36 @@ DB_PASS = os.environ["DB_PASS"] DB_USER = os.environ["DB_USER"] DB_INFO = {"host": DB_HOST, "name": DB_NAME, "pass": DB_PASS, "user": DB_USER} -s3_client = boto3.client('s3') -s3_resource = boto3.resource('s3') - -LOG_LEVEL = {"i": 'Info', "e": 'Error'} +# 定数 +LOG_LEVEL = {"i": 'Info'} LOG_INFO = f' {DATA_SOURCE_NAME} {FILE_NAME} ' -DIRECTORY_TARGET = '/target/' -DIRECTORY_WORK = '/work/' -DIRECTORY_DONE = '/done/' -DIRECTORY_WARNING = '/warning/' -def controller(): - """コントロール処理 +"""コントロール処理 - 各処理を呼び出すコントローラー +各処理を呼び出すコントローラー - """ +""" - # ① 開始ログの出力 - print(f'{str(datetime.now())} {LOG_INFO} {LOG_LEVEL["i"]} I-CTRL-01 - データ取込処理を開始します') - # ② 初期処理 - print(f'{str(datetime.now())} {LOG_INFO} {LOG_LEVEL["i"]} I-CTRL-02 - 初期処理の呼び出し') - settings_key = init(BUCKET_NAME, TARGET_KEY, DATA_SOURCE_NAME, FILE_NAME, LOG_INFO) +# ① 開始ログの出力 +print(f'{str(datetime.now())} {LOG_INFO} {LOG_LEVEL["i"]} I-CTRL-01 - データ取込処理を開始します') - # ③ チェック処理 - print(f'{str(datetime.now())} {LOG_INFO} {LOG_LEVEL["i"]} I-CTRL-03 - チェック処理の呼び出し') - check(BUCKET_NAME, TARGET_KEY, DATA_SOURCE_NAME, FILE_NAME, settings_key, LOG_INFO) +# ② 初期処理 +print(f'{str(datetime.now())} {LOG_INFO} {LOG_LEVEL["i"]} I-CTRL-02 - 初期処理の呼び出し') +settings_key = init(BUCKET_NAME, TARGET_KEY, DATA_SOURCE_NAME, FILE_NAME, LOG_INFO) - # ④ メイン処理 - print(f'{str(datetime.now())} {LOG_INFO} {LOG_LEVEL["i"]} I-CTRL-04 - メイン処理の呼び出し') - warning_info = '' - # warning_info = main(BUCKET_NAME, TARGET_KEY, DATA_SOURCE_NAME, FILE_NAME, settings_key, DB_INFO, LOG_INFO) +# ③ チェック処理 +print(f'{str(datetime.now())} {LOG_INFO} {LOG_LEVEL["i"]} I-CTRL-03 - チェック処理の呼び出し') +check(BUCKET_NAME, TARGET_KEY, DATA_SOURCE_NAME, FILE_NAME, settings_key, LOG_INFO) - # ⑤ 終了処理 - print(f'{str(datetime.now())} {LOG_INFO} {LOG_LEVEL["i"]} I-CTRL-05 - 終了処理の呼び出し') - end(BUCKET_NAME, DATA_SOURCE_NAME, FILE_NAME, warning_info, LOG_INFO) +# ④ メイン処理 +print(f'{str(datetime.now())} {LOG_INFO} {LOG_LEVEL["i"]} I-CTRL-04 - メイン処理の呼び出し') +warning_info = main(BUCKET_NAME, TARGET_KEY, DATA_SOURCE_NAME, FILE_NAME, settings_key, DB_INFO, LOG_INFO) - # ⑥ 終了ログの出力 - print(f'{str(datetime.now())} {LOG_INFO} {LOG_LEVEL["i"]} I-CTRL-06 - データ取込処理を終了します') +# ⑤ 終了処理 +print(f'{str(datetime.now())} {LOG_INFO} {LOG_LEVEL["i"]} I-CTRL-05 - 終了処理の呼び出し') +end(BUCKET_NAME, DATA_SOURCE_NAME, FILE_NAME, warning_info, LOG_INFO) + +# ⑥ 終了ログの出力 +print(f'{str(datetime.now())} {LOG_INFO} {LOG_LEVEL["i"]} I-CTRL-06 - データ取込処理を終了します') diff --git a/ecs/Dockerfile/dataimport/end.py b/ecs/Dockerfile/dataimport/end.py index 907a86cd..06bad863 100644 --- a/ecs/Dockerfile/dataimport/end.py +++ b/ecs/Dockerfile/dataimport/end.py @@ -2,15 +2,17 @@ from datetime import datetime import boto3 from error import error -s3_client = boto3.client('s3') -s3_resource = boto3.resource('s3') - +# 定数 LOG_LEVEL = {'i': 'Info', 'e': 'Error'} DIRECTORY_TARGET = '/target/' DIRECTORY_WORK = '/work/' DIRECTORY_DONE = '/done/' DIRECTORY_WARNING = '/warning/' +# クラス変数 +s3_client = boto3.client('s3') +s3_resource = boto3.resource('s3') + def end(bucket_name, target_data_source, target_file_name, warning_info, log_info): """終了処理 @@ -34,6 +36,7 @@ def end(bucket_name, target_data_source, target_file_name, warning_info, log_inf done_key = target_data_source + DIRECTORY_DONE + done_file_name done_obj = s3_resource.Object(bucket_name, done_key) done_obj.put(Body=work_body) + s3_client.delete_object(Bucket=bucket_name, Key=work_key) print(f'{str(datetime.now())} {log_info} {LOG_LEVEL["i"]} I-END-02 - workディレクトリの {target_file_name} をdoneディレクトリに移動しました 移動後ファイル名:{done_file_name}') # ③ doingファイルの削除 @@ -70,7 +73,7 @@ def end(bucket_name, target_data_source, target_file_name, warning_info, log_inf print(f'{str(datetime.now())} {log_info} {LOG_LEVEL["i"]} I-END-09 - targetディレクトリに {result_done_file_name} を作成しました') except Exception as e: print(f'{str(datetime.now())} {log_info} {LOG_LEVEL["e"]} E-END-99 - エラー内容:{e}') - error(bucket_name, target_data_source, target_file_name) + error(bucket_name, target_data_source, target_file_name, log_info) # ⑤ 終了ログの出力 print(f'{str(datetime.now())} {log_info} {LOG_LEVEL["i"]} I-END-0 - 終了処理を終了します') diff --git a/ecs/Dockerfile/dataimport/error.py b/ecs/Dockerfile/dataimport/error.py index e8de6e4f..d8d17c27 100644 --- a/ecs/Dockerfile/dataimport/error.py +++ b/ecs/Dockerfile/dataimport/error.py @@ -2,13 +2,15 @@ from datetime import datetime import boto3 import sys -s3_client = boto3.client('s3') -s3_resource = boto3.resource('s3') - +# 定数 LOG_LEVEL = {'i': 'Info', 'e': 'Error'} DIRECTORY_TARGET = '/target/' DIRECTORY_WORK = '/work/' -DIRECTORY_DONE = '/done/' +DIRECTORY_ERROR = '/error/' + +# クラス変数 +s3_client = boto3.client('s3') +s3_resource = boto3.resource('s3') def error(bucket_name, target_data_source, target_file_name, log_info): @@ -30,9 +32,10 @@ def error(bucket_name, target_data_source, target_file_name, log_info): work_response = work_obj.get() work_body = work_response["Body"].read() error_file_name = str(datetime.now()) + '_' + target_file_name - error_key = target_data_source + DIRECTORY_DONE + error_file_name + error_key = target_data_source + DIRECTORY_ERROR + error_file_name error_obj = s3_resource.Object(bucket_name, error_key) error_obj.put(Body=work_body) + s3_client.delete_object(Bucket=bucket_name, Key=work_key) print(f'{str(datetime.now())} {log_info} {LOG_LEVEL["i"]} I-ERR-02 - workディレクトリの {target_file_name} をerrorディレクトリに移動しました 移動後ファイル名:{error_file_name}') # ③ doingファイルの削除 diff --git a/ecs/Dockerfile/dataimport/ini.py b/ecs/Dockerfile/dataimport/ini.py index fbddb372..d3720db5 100644 --- a/ecs/Dockerfile/dataimport/ini.py +++ b/ecs/Dockerfile/dataimport/ini.py @@ -5,16 +5,17 @@ import csv import re from error import error -ecs_client = boto3.client('ecs') -s3_client = boto3.client('s3') -s3_resource = boto3.resource('s3') - +# 定数 LOG_LEVEL = {"i": 'Info', "e": 'Error'} MAPPING_FILE_NAME = 'configmap.config' DIRECTORY_TARGET = '/target/' DIRECTORY_WORK = '/work/' DIRECTORY_SETTINGS = '/settings/' +# クラス変数 +s3_client = boto3.client('s3') +s3_resource = boto3.resource('s3') + def init(bucket_name, target_key, target_data_source, target_file_name, log_info): """初期処理 @@ -40,7 +41,7 @@ def init(bucket_name, target_key, target_data_source, target_file_name, log_info try: s3_client.head_object(Bucket=bucket_name, Key=doing_key) print(f'{str(datetime.now())} {log_info} {LOG_LEVEL["e"]} E-INI-01 - 投入データ {target_file_name} は既に処理中です') - error(bucket_name, target_data_source, target_file_name) + error(bucket_name, target_data_source, target_file_name, log_info) except Exception as e: print(f'{str(datetime.now())} {log_info} {LOG_LEVEL["i"]} I-INI-03 - doingファイルは存在しませんでした') @@ -51,7 +52,7 @@ def init(bucket_name, target_key, target_data_source, target_file_name, log_info print(f'{str(datetime.now())} {log_info} {LOG_LEVEL["i"]} I-INI-04 - targetディレクトリに {doing_file_name} を作成しました') except Exception as e: print(f'{str(datetime.now())} {log_info} {LOG_LEVEL["e"]} E-INI-99 - エラー内容:{e}') - error(bucket_name, target_data_source, target_file_name) + error(bucket_name, target_data_source, target_file_name, log_info) # ④ 投入データファイルをworkディレクトリに移動 try: @@ -64,7 +65,7 @@ def init(bucket_name, target_key, target_data_source, target_file_name, log_info print(f'{str(datetime.now())} {log_info} {LOG_LEVEL["i"]} I-INI-05 - 投入データ {target_file_name} をworkディレクトリに移動しました') except Exception as e: print(f'{str(datetime.now())} {log_info} {LOG_LEVEL["e"]} E-INI-99 - エラー内容:{e}') - error(bucket_name, target_data_source, target_file_name) + error(bucket_name, target_data_source, target_file_name, log_info) # ⑤ 処理結果ファイルの削除 # doneファイルの存在確認 @@ -105,7 +106,7 @@ def init(bucket_name, target_key, target_data_source, target_file_name, log_info print(f'{str(datetime.now())} {log_info} {LOG_LEVEL["i"]} I-INI-16 - 個別設定マッピングリストの存在を確認しました') except Exception as e: print(f'{str(datetime.now())} {log_info} {LOG_LEVEL["e"]} E-INI-02 - 個別設定マッピングリストが存在しません') - error(bucket_name, target_data_source, target_file_name) + error(bucket_name, target_data_source, target_file_name, log_info) # ⑦ 個別設定ファイルの特定 print(f'{str(datetime.now())} {log_info} {LOG_LEVEL["i"]} I-INI-17 - 個別設定ファイルを検索します') @@ -113,6 +114,7 @@ def init(bucket_name, target_key, target_data_source, target_file_name, log_info mapping_obj = s3_resource.Object(bucket_name, mapping_key) mapping_response = mapping_obj.get() mapping_body = io.TextIOWrapper(io.BytesIO(mapping_response["Body"].read()), encoding='utf-8') + settings_file_name = '' for row in csv.reader(mapping_body, delimiter='\t'): match_result = re.fullmatch(row[0], target_file_name) if match_result is not None: @@ -120,12 +122,12 @@ def init(bucket_name, target_key, target_data_source, target_file_name, log_info print(f'{str(datetime.now())} {log_info} {LOG_LEVEL["i"]} I-INI-18 - 個別設定ファイル:{settings_file_name} を特定しました') break - if settings_file_name is None: + if not settings_file_name: print(f'{str(datetime.now())} {log_info} {LOG_LEVEL["e"]} E-INI-03 - 個別設定ファイルが特定出来ません') - error(bucket_name, target_data_source, target_file_name) + error(bucket_name, target_data_source, target_file_name, log_info) except Exception as e: print(f'{str(datetime.now())} {log_info} {LOG_LEVEL["e"]} E-INI-99 - エラー内容:{e}') - error(bucket_name, target_data_source, target_file_name) + error(bucket_name, target_data_source, target_file_name, log_info) # ⑧ 個別設定ファイルの存在確認 print(f'{str(datetime.now())} {log_info} {LOG_LEVEL["i"]} I-INI-19 - 個別設定ファイル:{settings_file_name} の存在チェック') @@ -135,7 +137,7 @@ def init(bucket_name, target_key, target_data_source, target_file_name, log_info print(f'{str(datetime.now())} {log_info} {LOG_LEVEL["i"]} I-INI-20 - 個別設定ファイルの存在を確認しました') except Exception as e: print(f'{str(datetime.now())} {log_info} {LOG_LEVEL["e"]} E-INI-04 - 個別設定ファイルが存在しません') - error(bucket_name, target_data_source, target_file_name) + error(bucket_name, target_data_source, target_file_name, log_info) # ⑨ 終了ログの出力 print(f'{str(datetime.now())} {log_info} {LOG_LEVEL["i"]} I-INI-21 - 初期処理を終了します') diff --git a/ecs/Dockerfile/dataimport/main.py b/ecs/Dockerfile/dataimport/main.py index 5b270062..a2551d83 100644 --- a/ecs/Dockerfile/dataimport/main.py +++ b/ecs/Dockerfile/dataimport/main.py @@ -5,10 +5,7 @@ import io import csv from error import error -ecs_client = boto3.client('ecs') -s3_client = boto3.client('s3') -s3_resource = boto3.resource('s3') - +# 定数 LOG_LEVEL = {"i": 'Info', "e": 'Error', "w": 'Warning'} SETTINGS_ITEM = { 'dataSource': 0, @@ -24,10 +21,19 @@ SETTINGS_ITEM = { 'loadSchemaName': 10, 'exSqlFileName': 11, } +LINE_FEED_CODE = { + 'CR': '\r', + 'LF': '\n', + 'CRLF': '\r\n', +} DIRECTORY_SETTINGS = '/settings/' +# クラス変数 +s3_client = boto3.client('s3') +s3_resource = boto3.resource('s3') -def init(bucket_name, target_key, target_data_source, target_file_name, settings_key, db_info, log_info): + +def main(bucket_name, target_key, target_data_source, target_file_name, settings_key, db_info, log_info): """メイン処理 Args: bucket_name : バケット名 @@ -49,7 +55,7 @@ def init(bucket_name, target_key, target_data_source, target_file_name, settings conn = pymysql.connect(host=db_info["host"], user=db_info["user"], passwd=db_info["pass"], db=db_info["name"], connect_timeout=5) except Exception as e: print(f'{str(datetime.now())} {log_info} {LOG_LEVEL["e"]} E-MAIN-99 - エラー内容:{e}') - error(bucket_name, target_data_source, target_file_name) + error(bucket_name, target_data_source, target_file_name, log_info) print(f'{str(datetime.now())} {log_info} {LOG_LEVEL["i"]} I-MAIN-02 - DB接続を開始しました') # ③ 個別設定ファイルのロードスキーマのテーブル名に記載されているテーブルをTRUNCATEする @@ -68,17 +74,17 @@ def init(bucket_name, target_key, target_data_source, target_file_name, settings print(f'{str(datetime.now())} {log_info} {LOG_LEVEL["i"]} I-MAIN-03 - {settings_list[SETTINGS_ITEM["loadSchemaName"]]} をTRUNCATEしました') except Exception as e: print(f'{str(datetime.now())} {log_info} {LOG_LEVEL["e"]} E-MAIN-99 - エラー内容:{e}') - error(bucket_name, target_data_source, target_file_name) + error(bucket_name, target_data_source, target_file_name, log_info) # ④ 投入データファイルを1行ごとにループする print(f'{str(datetime.now())} {log_info} {LOG_LEVEL["i"]} I-MAIN-04 - 投入データ {target_file_name} の読み込みを開始します') try: target_obj = s3_resource.Object(bucket_name, target_key) target_response = target_obj.get() - target_data = io.TextIOWrapper(io.BytesIO(target_response["Body"].read()), encoding=settings_list[SETTINGS_ITEM["charCode"]], newline=settings_list[SETTINGS_ITEM["lineFeedCode"]]) + target_data = io.TextIOWrapper(io.BytesIO(target_response["Body"].read()), encoding=settings_list[SETTINGS_ITEM["charCode"]], newline=LINE_FEED_CODE[settings_list[SETTINGS_ITEM["lineFeedCode"]]]) except Exception as e: print(f'{str(datetime.now())} {log_info} {LOG_LEVEL["e"]} E-MAIN-99 - エラー内容:{e}') - error(bucket_name, target_data_source, target_file_name) + error(bucket_name, target_data_source, target_file_name, log_info) process_count = 0 # 処理件数カウンタ normal_count = 0 # 正常終了件数カウンタ @@ -147,7 +153,7 @@ def init(bucket_name, target_key, target_data_source, target_file_name, settings print(f'{str(datetime.now())} {log_info} {LOG_LEVEL["i"]} I-MAIN-09 - 標準SQL:{settings_list[SETTINGS_ITEM["storageSchemaName"]]} のCOMIIT処理が正常終了しました') except Exception as e: print(f'{str(datetime.now())} {log_info} {LOG_LEVEL["e"]} E-MAIN-99 - エラー内容:{e}') - error(bucket_name, target_data_source, target_file_name) + error(bucket_name, target_data_source, target_file_name, log_info) # ⑦ 個別設定ファイルに拡張SQLファイル名が設定されているかチェック print(f'{str(datetime.now())} {log_info} {LOG_LEVEL["i"]} I-MAIN-10 - 拡張SQL設定が存在するかチェックします') From 9ba49d2d89fb5846498f922a15ab4293ff915f3f Mon Sep 17 00:00:00 2001 From: *lcOeIaePm0 Date: Fri, 22 Oct 2021 13:06:20 +0900 Subject: [PATCH 5/7] =?UTF-8?q?refactor:=E3=82=B3=E3=83=A1=E3=83=B3?= =?UTF-8?q?=E3=83=88=E4=BD=8D=E7=BD=AE=E3=81=AE=E4=BF=AE=E6=AD=A3=E3=81=A8?= =?UTF-8?q?=E4=B8=8D=E8=A6=81try=E6=96=87=E3=82=92=E5=89=8A=E9=99=A4?= =?UTF-8?q?=E3=81=97=E3=81=9F?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- ecs/Dockerfile/dataimport/chk.py | 8 ++-- ecs/Dockerfile/dataimport/controller.py | 12 ++--- ecs/Dockerfile/dataimport/end.py | 12 ++--- ecs/Dockerfile/dataimport/error.py | 12 ++--- ecs/Dockerfile/dataimport/ini.py | 58 +++++++++++------------ ecs/Dockerfile/dataimport/main.py | 61 +++++++++++++------------ 6 files changed, 82 insertions(+), 81 deletions(-) diff --git a/ecs/Dockerfile/dataimport/chk.py b/ecs/Dockerfile/dataimport/chk.py index 6c043e76..dd70d836 100644 --- a/ecs/Dockerfile/dataimport/chk.py +++ b/ecs/Dockerfile/dataimport/chk.py @@ -47,7 +47,7 @@ def check(bucket_name, target_key, target_data_source, target_file_name, setting Raises: CheckError : チェックでエラーがあった場合に発生する例外 """ - # ① 開始ログの出力 + # ① チェック処理開始ログを出力する print(f'{str(datetime.now())} {log_info} {LOG_LEVEL["i"]} I-CHK-01 - チェック処理を開始します') try: @@ -65,7 +65,7 @@ def check(bucket_name, target_key, target_data_source, target_file_name, setting target_header_list = line break - # ② C-1 項目数チェック + # ② C-1の項目数チェックを開始する print(f'{str(datetime.now())} {log_info} {LOG_LEVEL["i"]} I-CHK-02 - C-1のチェックを開始します') target_header_list_len = len(target_header_list) if target_header_list_len == int(settings_list[SETTINGS_ITEM["csvNumItems"]]): @@ -73,7 +73,7 @@ def check(bucket_name, target_key, target_data_source, target_file_name, setting else: raise CheckError(f'E-CHK-01 - 項目数が一致しません 個別設定ファイル項目数:{settings_list[SETTINGS_ITEM["csvNumItems"]]} 投入データ項目数:{target_header_list_len}') - # ③ C-2 項目並び順チェック + # ③ C-2の項目並び順チェック開始する if int(settings_list[SETTINGS_ITEM["headerFlag"]]) == True: print(f'{str(datetime.now())} {log_info} {LOG_LEVEL["i"]} I-CHK-04 - C-2のチェックを開始します') settings_header_list = settings_list[SETTINGS_ITEM["csvNameItems"]].rstrip().split(',') @@ -90,5 +90,5 @@ def check(bucket_name, target_key, target_data_source, target_file_name, setting print(f'{str(datetime.now())} {log_info} {LOG_LEVEL["e"]} E-CHK-99 - エラー内容:{e}') error(bucket_name, target_data_source, target_file_name, log_info) - # ④ 終了ログの出力 + # ④ チェック処理終了ログを出力する print(f'{str(datetime.now())} {log_info} {LOG_LEVEL["i"]} I-CHK-06 - チェック処理を終了します') diff --git a/ecs/Dockerfile/dataimport/controller.py b/ecs/Dockerfile/dataimport/controller.py index e4b0d637..d04edf53 100644 --- a/ecs/Dockerfile/dataimport/controller.py +++ b/ecs/Dockerfile/dataimport/controller.py @@ -30,24 +30,24 @@ LOG_INFO = f' {DATA_SOURCE_NAME} {FILE_NAME} ' """ -# ① 開始ログの出力 +# ① データ取込処理開始ログを出力する print(f'{str(datetime.now())} {LOG_INFO} {LOG_LEVEL["i"]} I-CTRL-01 - データ取込処理を開始します') -# ② 初期処理 +# ② 初期処理を呼び出す print(f'{str(datetime.now())} {LOG_INFO} {LOG_LEVEL["i"]} I-CTRL-02 - 初期処理の呼び出し') settings_key = init(BUCKET_NAME, TARGET_KEY, DATA_SOURCE_NAME, FILE_NAME, LOG_INFO) -# ③ チェック処理 +# ③ チェック処理を呼び出す print(f'{str(datetime.now())} {LOG_INFO} {LOG_LEVEL["i"]} I-CTRL-03 - チェック処理の呼び出し') check(BUCKET_NAME, TARGET_KEY, DATA_SOURCE_NAME, FILE_NAME, settings_key, LOG_INFO) -# ④ メイン処理 +# ④ メイン処理を呼び出す print(f'{str(datetime.now())} {LOG_INFO} {LOG_LEVEL["i"]} I-CTRL-04 - メイン処理の呼び出し') warning_info = main(BUCKET_NAME, TARGET_KEY, DATA_SOURCE_NAME, FILE_NAME, settings_key, DB_INFO, LOG_INFO) -# ⑤ 終了処理 +# ⑤ 終了処理を呼び出す print(f'{str(datetime.now())} {LOG_INFO} {LOG_LEVEL["i"]} I-CTRL-05 - 終了処理の呼び出し') end(BUCKET_NAME, DATA_SOURCE_NAME, FILE_NAME, warning_info, LOG_INFO) -# ⑥ 終了ログの出力 +# ⑥ データ取込処理終了ログを出力する print(f'{str(datetime.now())} {LOG_INFO} {LOG_LEVEL["i"]} I-CTRL-06 - データ取込処理を終了します') diff --git a/ecs/Dockerfile/dataimport/end.py b/ecs/Dockerfile/dataimport/end.py index 06bad863..3e7025ad 100644 --- a/ecs/Dockerfile/dataimport/end.py +++ b/ecs/Dockerfile/dataimport/end.py @@ -23,11 +23,11 @@ def end(bucket_name, target_data_source, target_file_name, warning_info, log_inf warning_info : Warning情報 log_info : ログに記載するデータソース名とファイル名 """ - # ① 開始ログの出力 + # ① 終了処理開始ログを出力する print(f'{str(datetime.now())} {log_info} {LOG_LEVEL["i"]} I-END-01 - 終了処理を開始します') try: - # ② 投入データファイルをdoneディレクトリに移動 + # ② 投入データファイルをS3バケット内のworkディレクトリから、以下ファイル名でdoneディレクトリに移動(コピー削除)する work_key = target_data_source + DIRECTORY_WORK + target_file_name work_obj = s3_resource.Object(bucket_name, work_key) work_response = work_obj.get() @@ -39,13 +39,13 @@ def end(bucket_name, target_data_source, target_file_name, warning_info, log_inf s3_client.delete_object(Bucket=bucket_name, Key=work_key) print(f'{str(datetime.now())} {log_info} {LOG_LEVEL["i"]} I-END-02 - workディレクトリの {target_file_name} をdoneディレクトリに移動しました 移動後ファイル名:{done_file_name}') - # ③ doingファイルの削除 + # ③ S3バケット内のtargetディレクトリに存在する「投入データファイル名.doing」ファイルを削除する doing_file_name = target_file_name + '.doing' doing_key = target_data_source + DIRECTORY_TARGET + doing_file_name s3_client.delete_object(Bucket=bucket_name, Key=doing_key) print(f'{str(datetime.now())} {log_info} {LOG_LEVEL["i"]} I-END-03 - targetディレクトリの {doing_file_name} を削除しました') - # ④ Warning情報の存在確認 + # ④ Warning情報が存在するか確認する print(f'{str(datetime.now())} {log_info} {LOG_LEVEL["i"]} I-END-04 - Warning情報の存在チェック') if warning_info: print(f'{str(datetime.now())} {log_info} {LOG_LEVEL["i"]} I-END-05 - Warning情報は存在しました') @@ -75,5 +75,5 @@ def end(bucket_name, target_data_source, target_file_name, warning_info, log_inf print(f'{str(datetime.now())} {log_info} {LOG_LEVEL["e"]} E-END-99 - エラー内容:{e}') error(bucket_name, target_data_source, target_file_name, log_info) - # ⑤ 終了ログの出力 - print(f'{str(datetime.now())} {log_info} {LOG_LEVEL["i"]} I-END-0 - 終了処理を終了します') + # ⑤ 終了処理終了ログを出力する + print(f'{str(datetime.now())} {log_info} {LOG_LEVEL["i"]} I-END-10 - 終了処理を終了します') diff --git a/ecs/Dockerfile/dataimport/error.py b/ecs/Dockerfile/dataimport/error.py index d8d17c27..08a61238 100644 --- a/ecs/Dockerfile/dataimport/error.py +++ b/ecs/Dockerfile/dataimport/error.py @@ -22,11 +22,11 @@ def error(bucket_name, target_data_source, target_file_name, log_info): log_info : ログに記載するデータソース名とファイル名 """ - # ① 開始ログの出力 + # ① エラー処理開始ログを出力する print(f'{str(datetime.now())} {log_info} {LOG_LEVEL["i"]} I-ERR-01 - エラー処理を開始します') try: - # ② 投入データファイルをerrorディレクトリに移動 + # ② 投入データファイルをS3バケット内のworkディレクトリから、以下ファイル名でerrorディレクトリに移動(コピー削除)する work_key = target_data_source + DIRECTORY_WORK + target_file_name work_obj = s3_resource.Object(bucket_name, work_key) work_response = work_obj.get() @@ -38,13 +38,13 @@ def error(bucket_name, target_data_source, target_file_name, log_info): s3_client.delete_object(Bucket=bucket_name, Key=work_key) print(f'{str(datetime.now())} {log_info} {LOG_LEVEL["i"]} I-ERR-02 - workディレクトリの {target_file_name} をerrorディレクトリに移動しました 移動後ファイル名:{error_file_name}') - # ③ doingファイルの削除 + # ③ S3バケット内のtargetディレクトリに存在する「投入データファイル名.doing」ファイルを削除する doing_file_name = target_file_name + '.doing' doing_key = target_data_source + DIRECTORY_TARGET + doing_file_name s3_client.delete_object(Bucket=bucket_name, Key=doing_key) print(f'{str(datetime.now())} {log_info} {LOG_LEVEL["i"]} I-ERR-03 - targetディレクトリの {doing_file_name} を削除しました') - # ④ error処理結果ファイルの作成 + # ④ S3バケット内のtargetディレクトリに、「投入データファイル名.error」ファイルを作成する result_error_file_name = target_file_name + '.error' result_error_key = target_data_source + DIRECTORY_TARGET + result_error_file_name result_error_obj = s3_resource.Object(bucket_name, result_error_key) @@ -53,8 +53,8 @@ def error(bucket_name, target_data_source, target_file_name, log_info): except Exception as e: print(f'{str(datetime.now())} {log_info} {LOG_LEVEL["e"]} E-ERR-99 - エラー内容:{e}') finally: - # ⑤ 終了ログの出力 + # ⑤ 終了処理終了ログを出力する print(f'{str(datetime.now())} {log_info} {LOG_LEVEL["i"]} I-ERR-05 - エラー処理を終了します') - # ⑥ 処理終了 + # ⑥ 処理を終了する sys.exit() diff --git a/ecs/Dockerfile/dataimport/ini.py b/ecs/Dockerfile/dataimport/ini.py index d3720db5..5953895b 100644 --- a/ecs/Dockerfile/dataimport/ini.py +++ b/ecs/Dockerfile/dataimport/ini.py @@ -29,88 +29,84 @@ def init(bucket_name, target_key, target_data_source, target_file_name, log_info settings_key : 投入データに該当する個別設定ファイルのフルパス """ - # ① 開始ログの出力 + # ① 初期処理開始ログを出力する print(f'{str(datetime.now())} {log_info} {LOG_LEVEL["i"]} I-INI-01 - 初期処理を開始します') - # doingファイル情報作成 - doing_file_name = target_file_name + '.doing' - doing_key = target_data_source + DIRECTORY_TARGET + doing_file_name - # ② doingファイルの存在確認 - print(f'{str(datetime.now())} {log_info} {LOG_LEVEL["i"]} I-INI-02 - doingファイル:{doing_file_name} の存在チェック') + # ② S3バケット内のtargetディレクトリに「投入データファイル名.doing」ファイルが存在するかチェックする try: + doing_file_name = target_file_name + '.doing' + doing_key = target_data_source + DIRECTORY_TARGET + doing_file_name + print(f'{str(datetime.now())} {log_info} {LOG_LEVEL["i"]} I-INI-02 - doingファイル:{doing_file_name} の存在チェック') s3_client.head_object(Bucket=bucket_name, Key=doing_key) print(f'{str(datetime.now())} {log_info} {LOG_LEVEL["e"]} E-INI-01 - 投入データ {target_file_name} は既に処理中です') error(bucket_name, target_data_source, target_file_name, log_info) except Exception as e: print(f'{str(datetime.now())} {log_info} {LOG_LEVEL["i"]} I-INI-03 - doingファイルは存在しませんでした') - # ③ doingファイルの作成 try: + # ③ S3バケット内のtargetディレクトリに、「投入データファイル名.doing」ファイルを作成する doing_obj = s3_resource.Object(bucket_name, doing_key) doing_obj.put(Body='') print(f'{str(datetime.now())} {log_info} {LOG_LEVEL["i"]} I-INI-04 - targetディレクトリに {doing_file_name} を作成しました') - except Exception as e: - print(f'{str(datetime.now())} {log_info} {LOG_LEVEL["e"]} E-INI-99 - エラー内容:{e}') - error(bucket_name, target_data_source, target_file_name, log_info) - # ④ 投入データファイルをworkディレクトリに移動 - try: + # ④ 投入データファイルをS3バケット内のtargetディレクトリから、workディレクトリに移動(コピー削除)する target_obj = s3_resource.Object(bucket_name, target_key) target_response = target_obj.get() work_key = target_data_source + DIRECTORY_WORK + target_file_name work_body = target_response["Body"].read() work_obj = s3_resource.Object(bucket_name, work_key) work_obj.put(Body=work_body) + s3_client.delete_object(Bucket=bucket_name, Key=target_key) print(f'{str(datetime.now())} {log_info} {LOG_LEVEL["i"]} I-INI-05 - 投入データ {target_file_name} をworkディレクトリに移動しました') except Exception as e: print(f'{str(datetime.now())} {log_info} {LOG_LEVEL["e"]} E-INI-99 - エラー内容:{e}') error(bucket_name, target_data_source, target_file_name, log_info) - # ⑤ 処理結果ファイルの削除 - # doneファイルの存在確認 - print(f'{str(datetime.now())} {log_info} {LOG_LEVEL["i"]} I-INI-06 - doneファイル:{target_file_name}.done の存在チェック') - result_done_key = target_data_source + DIRECTORY_TARGET + target_file_name + '.done' + # ⑤ S3バケット内のtargetディレクトリの以下ファイル群を削除(前回分の削除)する try: + # doneファイルの存在確認 + print(f'{str(datetime.now())} {log_info} {LOG_LEVEL["i"]} I-INI-06 - doneファイル:{target_file_name}.done の存在チェック') + result_done_key = target_data_source + DIRECTORY_TARGET + target_file_name + '.done' s3_client.head_object(Bucket=bucket_name, Key=result_done_key) s3_client.delete_object(Bucket=bucket_name, Key=result_done_key) print(f'{str(datetime.now())} {log_info} {LOG_LEVEL["i"]} I-INI-07 - doneファイルが存在したため削除しました') except Exception as e: print(f'{str(datetime.now())} {log_info} {LOG_LEVEL["i"]} I-INI-08 - doneファイルは存在しませんでした') - # warningファイルの存在確認 - print(f'{str(datetime.now())} {log_info} {LOG_LEVEL["i"]} I-INI-09 - warningファイル:{target_file_name}.warning の存在チェック') - result_warning_key = target_data_source + DIRECTORY_TARGET + target_file_name + '.warning' try: + # warningファイルの存在確認 + print(f'{str(datetime.now())} {log_info} {LOG_LEVEL["i"]} I-INI-09 - warningファイル:{target_file_name}.warning の存在チェック') + result_warning_key = target_data_source + DIRECTORY_TARGET + target_file_name + '.warning' s3_client.head_object(Bucket=bucket_name, Key=result_warning_key) s3_client.delete_object(Bucket=bucket_name, Key=result_warning_key) print(f'{str(datetime.now())} {log_info} {LOG_LEVEL["i"]} I-INI-10 - warningファイルが存在したため削除しました') except Exception as e: print(f'{str(datetime.now())} {log_info} {LOG_LEVEL["i"]} I-INI-11 - warningファイルは存在しませんでした') - # errorファイルの存在確認 - print(f'{str(datetime.now())} {log_info} {LOG_LEVEL["i"]} I-INI-12 - errorファイル:{target_file_name}.error の存在チェック') - result_error_key = target_data_source + DIRECTORY_TARGET + target_file_name + '.error' try: + # errorファイルの存在確認 + print(f'{str(datetime.now())} {log_info} {LOG_LEVEL["i"]} I-INI-12 - errorファイル:{target_file_name}.error の存在チェック') + result_error_key = target_data_source + DIRECTORY_TARGET + target_file_name + '.error' s3_client.head_object(Bucket=bucket_name, Key=result_error_key) s3_client.delete_object(Bucket=bucket_name, Key=result_error_key) print(f'{str(datetime.now())} {log_info} {LOG_LEVEL["i"]} I-INI-13 - errorファイルが存在したため削除しました') except Exception as e: print(f'{str(datetime.now())} {log_info} {LOG_LEVEL["i"]} I-INI-14 - errorファイルは存在しませんでした') - # ⑥ 個別設定マッピングリストの存在確認 - print(f'{str(datetime.now())} {log_info} {LOG_LEVEL["i"]} I-INI-15 - 個別設定マッピングリスト:{MAPPING_FILE_NAME} の存在チェック') - mapping_key = target_data_source + DIRECTORY_SETTINGS + MAPPING_FILE_NAME + # ⑥ 個別設定マッピングリストが存在するかチェックする try: + print(f'{str(datetime.now())} {log_info} {LOG_LEVEL["i"]} I-INI-15 - 個別設定マッピングリスト:{MAPPING_FILE_NAME} の存在チェック') + mapping_key = target_data_source + DIRECTORY_SETTINGS + MAPPING_FILE_NAME s3_client.head_object(Bucket=bucket_name, Key=mapping_key) print(f'{str(datetime.now())} {log_info} {LOG_LEVEL["i"]} I-INI-16 - 個別設定マッピングリストの存在を確認しました') except Exception as e: print(f'{str(datetime.now())} {log_info} {LOG_LEVEL["e"]} E-INI-02 - 個別設定マッピングリストが存在しません') error(bucket_name, target_data_source, target_file_name, log_info) - # ⑦ 個別設定ファイルの特定 - print(f'{str(datetime.now())} {log_info} {LOG_LEVEL["i"]} I-INI-17 - 個別設定ファイルを検索します') + # ⑦ 個別設定ファイルを特定する try: + print(f'{str(datetime.now())} {log_info} {LOG_LEVEL["i"]} I-INI-17 - 個別設定ファイルを検索します') mapping_obj = s3_resource.Object(bucket_name, mapping_key) mapping_response = mapping_obj.get() mapping_body = io.TextIOWrapper(io.BytesIO(mapping_response["Body"].read()), encoding='utf-8') @@ -129,16 +125,16 @@ def init(bucket_name, target_key, target_data_source, target_file_name, log_info print(f'{str(datetime.now())} {log_info} {LOG_LEVEL["e"]} E-INI-99 - エラー内容:{e}') error(bucket_name, target_data_source, target_file_name, log_info) - # ⑧ 個別設定ファイルの存在確認 - print(f'{str(datetime.now())} {log_info} {LOG_LEVEL["i"]} I-INI-19 - 個別設定ファイル:{settings_file_name} の存在チェック') - settings_key = target_data_source + DIRECTORY_SETTINGS + settings_file_name + # ⑧ ⑦の個別設定ファイルが存在するかチェックする try: + print(f'{str(datetime.now())} {log_info} {LOG_LEVEL["i"]} I-INI-19 - 個別設定ファイル:{settings_file_name} の存在チェック') + settings_key = target_data_source + DIRECTORY_SETTINGS + settings_file_name s3_client.head_object(Bucket=bucket_name, Key=settings_key) print(f'{str(datetime.now())} {log_info} {LOG_LEVEL["i"]} I-INI-20 - 個別設定ファイルの存在を確認しました') except Exception as e: print(f'{str(datetime.now())} {log_info} {LOG_LEVEL["e"]} E-INI-04 - 個別設定ファイルが存在しません') error(bucket_name, target_data_source, target_file_name, log_info) - # ⑨ 終了ログの出力 + # ⑨ 初期処理終了ログを出力する print(f'{str(datetime.now())} {log_info} {LOG_LEVEL["i"]} I-INI-21 - 初期処理を終了します') return settings_key diff --git a/ecs/Dockerfile/dataimport/main.py b/ecs/Dockerfile/dataimport/main.py index a2551d83..ad77fdb4 100644 --- a/ecs/Dockerfile/dataimport/main.py +++ b/ecs/Dockerfile/dataimport/main.py @@ -50,35 +50,35 @@ def main(bucket_name, target_key, target_data_source, target_file_name, settings # ① メイン処理開始ログを出力する print(f'{str(datetime.now())} {log_info} {LOG_LEVEL["i"]} I-MAIN-01 - メイン処理を開始します') - # ② DB接続を開始する try: + # ② DB接続を開始する conn = pymysql.connect(host=db_info["host"], user=db_info["user"], passwd=db_info["pass"], db=db_info["name"], connect_timeout=5) - except Exception as e: - print(f'{str(datetime.now())} {log_info} {LOG_LEVEL["e"]} E-MAIN-99 - エラー内容:{e}') - error(bucket_name, target_data_source, target_file_name, log_info) - print(f'{str(datetime.now())} {log_info} {LOG_LEVEL["i"]} I-MAIN-02 - DB接続を開始しました') + print(f'{str(datetime.now())} {log_info} {LOG_LEVEL["i"]} I-MAIN-02 - DB接続を開始しました') - # ③ 個別設定ファイルのロードスキーマのテーブル名に記載されているテーブルをTRUNCATEする - try: - # 個別設定ファイルの読込 + # ③ タイムゾーンを変更する + with conn.cursor() as cur: + cur.execute(f'SET time_zone = "+9:00"') + result = cur.fetchall() + print(f'{str(datetime.now())} {log_info} {LOG_LEVEL["i"]} I-MAIN-03 - タイムゾーンを変更しました:{result}') + + # ③ 個別設定ファイルのロードスキーマのテーブル名に記載されているテーブルをTRUNCATEする settings_obj = s3_resource.Object(bucket_name, settings_key) settings_response = settings_obj.get() settings_list = [] for line in io.TextIOWrapper(io.BytesIO(settings_response["Body"].read()), encoding='utf-8'): settings_list.append(line.rstrip()) - # TRUNCATE実行 with conn.cursor() as cur: sql_truncate = f'TRUNCATE table {settings_list[SETTINGS_ITEM["loadSchemaName"]]}' cur.execute(sql_truncate) - print(f'{str(datetime.now())} {log_info} {LOG_LEVEL["i"]} I-MAIN-03 - {settings_list[SETTINGS_ITEM["loadSchemaName"]]} をTRUNCATEしました') + print(f'{str(datetime.now())} {log_info} {LOG_LEVEL["i"]} I-MAIN-04 - {settings_list[SETTINGS_ITEM["loadSchemaName"]]} をTRUNCATEしました') except Exception as e: print(f'{str(datetime.now())} {log_info} {LOG_LEVEL["e"]} E-MAIN-99 - エラー内容:{e}') error(bucket_name, target_data_source, target_file_name, log_info) # ④ 投入データファイルを1行ごとにループする - print(f'{str(datetime.now())} {log_info} {LOG_LEVEL["i"]} I-MAIN-04 - 投入データ {target_file_name} の読み込みを開始します') try: + print(f'{str(datetime.now())} {log_info} {LOG_LEVEL["i"]} I-MAIN-05 - 投入データ {target_file_name} の読み込みを開始します') target_obj = s3_resource.Object(bucket_name, target_key) target_response = target_obj.get() target_data = io.TextIOWrapper(io.BytesIO(target_response["Body"].read()), encoding=settings_list[SETTINGS_ITEM["charCode"]], newline=LINE_FEED_CODE[settings_list[SETTINGS_ITEM["lineFeedCode"]]]) @@ -96,7 +96,7 @@ def main(bucket_name, target_key, target_data_source, target_file_name, settings try: if settings_list[SETTINGS_ITEM["headerFlag"]] and index == 0: index += 1 - print(f'{str(datetime.now())} {log_info} {LOG_LEVEL["i"]} I-MAIN-05 - ヘッダー行をスキップします') + print(f'{str(datetime.now())} {log_info} {LOG_LEVEL["i"]} I-MAIN-06 - ヘッダー行をスキップします') continue # 処理件数カウント @@ -127,11 +127,12 @@ def main(bucket_name, target_key, target_data_source, target_file_name, settings print(f'{str(datetime.now())} {log_info} {LOG_LEVEL["w"]} W-MAIN-01 {index} ロードスキーマ登録時にエラーが発生しました {line} {e}') # ⑤ ④の処理結果件数をログ出力する - print(f'{str(datetime.now())} {log_info} {LOG_LEVEL["i"]} I-MAIN-06 - 投入データ件数:{process_count} 正常終了件数:{normal_count} Warning終了件数:{warning_count}') + print(f'{str(datetime.now())} {log_info} {LOG_LEVEL["i"]} I-MAIN-07 - 投入データ件数:{process_count} 正常終了件数:{normal_count} Warning終了件数:{warning_count}') # ⑥ ロードスキーマのデータを蓄積スキーマにUPSERTする - print(f'{str(datetime.now())} {log_info} {LOG_LEVEL["i"]} I-MAIN-07 - ロードスキーマ({settings_list[SETTINGS_ITEM["loadSchemaName"]]})のデータを蓄積スキーマ({settings_list[SETTINGS_ITEM["storageSchemaName"]]})に登録します') try: + print(f'{str(datetime.now())} {log_info} {LOG_LEVEL["i"]} I-MAIN-08 - ロードスキーマ({settings_list[SETTINGS_ITEM["loadSchemaName"]]})のデータを蓄積スキーマ({settings_list[SETTINGS_ITEM["storageSchemaName"]]})に登録します') + # SQL文生成 sql = f'INSERT INTO {settings_list[SETTINGS_ITEM["storageSchemaName"]]}' sql = f'{sql} SELECT t.*' @@ -146,25 +147,25 @@ def main(bucket_name, target_key, target_data_source, target_file_name, settings sql = f'{sql} ins_date=t.ins_date' # システム項目:登録日時 # トランザクション開始 - print(f'{str(datetime.now())} {log_info} {LOG_LEVEL["i"]} I-MAIN-08 - 標準SQL:{settings_list[SETTINGS_ITEM["storageSchemaName"]]} のトランザクションを開始します') + print(f'{str(datetime.now())} {log_info} {LOG_LEVEL["i"]} I-MAIN-09 - 標準SQL:{settings_list[SETTINGS_ITEM["storageSchemaName"]]} のトランザクションを開始します') with conn.cursor() as cur: cur.execute(sql) conn.commit() - print(f'{str(datetime.now())} {log_info} {LOG_LEVEL["i"]} I-MAIN-09 - 標準SQL:{settings_list[SETTINGS_ITEM["storageSchemaName"]]} のCOMIIT処理が正常終了しました') + print(f'{str(datetime.now())} {log_info} {LOG_LEVEL["i"]} I-MAIN-10 - 標準SQL:{settings_list[SETTINGS_ITEM["storageSchemaName"]]} のCOMIIT処理が正常終了しました') except Exception as e: print(f'{str(datetime.now())} {log_info} {LOG_LEVEL["e"]} E-MAIN-99 - エラー内容:{e}') error(bucket_name, target_data_source, target_file_name, log_info) # ⑦ 個別設定ファイルに拡張SQLファイル名が設定されているかチェック - print(f'{str(datetime.now())} {log_info} {LOG_LEVEL["i"]} I-MAIN-10 - 拡張SQL設定が存在するかチェックします') + print(f'{str(datetime.now())} {log_info} {LOG_LEVEL["i"]} I-MAIN-11 - 拡張SQL設定が存在するかチェックします') if settings_list[SETTINGS_ITEM["exSqlFileName"]]: - print(f'{str(datetime.now())} {log_info} {LOG_LEVEL["i"]} I-MAIN-11 - 拡張SQL設定の存在を確認しました') - print(f'{str(datetime.now())} {log_info} {LOG_LEVEL["i"]} I-MAIN-12 - 拡張SQLファイル名:{settings_list[SETTINGS_ITEM["exSqlFileName"]]} の存在チェック') - ex_sql_key = target_data_source + DIRECTORY_SETTINGS + settings_list[SETTINGS_ITEM["exSqlFileName"]] try: + print(f'{str(datetime.now())} {log_info} {LOG_LEVEL["i"]} I-MAIN-12 - 拡張SQL設定の存在を確認しました') + print(f'{str(datetime.now())} {log_info} {LOG_LEVEL["i"]} I-MAIN-13 - 拡張SQLファイル名:{settings_list[SETTINGS_ITEM["exSqlFileName"]]} の存在チェック') + ex_sql_key = target_data_source + DIRECTORY_SETTINGS + settings_list[SETTINGS_ITEM["exSqlFileName"]] s3_client.head_object(Bucket=bucket_name, Key=ex_sql_key) ex_sql_file_exists = True - print(f'{str(datetime.now())} {log_info} {LOG_LEVEL["i"]} I-MAIN-13 - 拡張SQLファイル名の存在を確認しました') + print(f'{str(datetime.now())} {log_info} {LOG_LEVEL["i"]} I-MAIN-14 - 拡張SQLファイル名の存在を確認しました') except Exception as e: warning_info = f'{warning_info} - 拡張SQLファイルが存在しません\n' ex_sql_file_exists = False @@ -180,22 +181,26 @@ def main(bucket_name, target_key, target_data_source, target_file_name, settings ex_sql = f'{ex_sql} {line.rstrip()}' # トランザクション開始 - print(f'{str(datetime.now())} {log_info} {LOG_LEVEL["i"]} I-MAIN-14 - 拡張SQL:{settings_list[SETTINGS_ITEM["storageSchemaName"]]} のトランザクションを開始します') + print(f'{str(datetime.now())} {log_info} {LOG_LEVEL["i"]} I-MAIN-15 - 拡張SQL:{settings_list[SETTINGS_ITEM["storageSchemaName"]]} のトランザクションを開始します') with conn.cursor() as cur: cur.execute(ex_sql) conn.commit() - print(f'{str(datetime.now())} {log_info} {LOG_LEVEL["i"]} I-MAIN-15 - 拡張SQL:{settings_list[SETTINGS_ITEM["storageSchemaName"]]} のCOMIIT処理が正常終了しました') + print(f'{str(datetime.now())} {log_info} {LOG_LEVEL["i"]} I-MAIN-16 - 拡張SQL:{settings_list[SETTINGS_ITEM["storageSchemaName"]]} のCOMIIT処理が正常終了しました') except Exception as e: warning_info = f'{warning_info} - 拡張SQLにエラーが発生しました:{e}\n' print(f'{str(datetime.now())} {log_info} {LOG_LEVEL["w"]} W-MAIN-03 - 拡張SQLにエラーが発生しました:{e}') else: - print(f'{str(datetime.now())} {log_info} {LOG_LEVEL["i"]} I-MAIN-16 - 拡張SQL設定の存在はありませんでした') + print(f'{str(datetime.now())} {log_info} {LOG_LEVEL["i"]} I-MAIN-17 - 拡張SQL設定の存在はありませんでした') # ⑧ DB接続を終了する - conn.close() - print(f'{str(datetime.now())} {log_info} {LOG_LEVEL["i"]} I-MAIN-17 - DB接続を終了しました') + try: + conn.close() + print(f'{str(datetime.now())} {log_info} {LOG_LEVEL["i"]} I-MAIN-18 - DB接続を終了しました') + except Exception as e: + print(f'{str(datetime.now())} {log_info} {LOG_LEVEL["e"]} E-MAIN-99 - エラー内容:{e}') + error(bucket_name, target_data_source, target_file_name, log_info) - # ⑨ 終了ログの出力 - print(f'{str(datetime.now())} {log_info} {LOG_LEVEL["i"]} I-MAIN-18 - メイン処理を終了します') + # ⑨ メイン処理終了ログを出力する + print(f'{str(datetime.now())} {log_info} {LOG_LEVEL["i"]} I-MAIN-19 - メイン処理を終了します') return warning_info From 4660a56d315af685acf52612e7a54c1298c4f329 Mon Sep 17 00:00:00 2001 From: *lcOeIaePm0 Date: Fri, 22 Oct 2021 16:53:34 +0900 Subject: [PATCH 6/7] =?UTF-8?q?feat:=E8=A4=87=E6=95=B0=E3=82=AF=E3=82=A8?= =?UTF-8?q?=E3=83=AA=E3=81=AB=E5=AF=BE=E5=BF=9C=E3=81=99=E3=82=8B=E3=81=9F?= =?UTF-8?q?=E3=82=81=E3=80=81DB=E6=8E=A5=E7=B6=9A=E3=82=AA=E3=83=97?= =?UTF-8?q?=E3=82=B7=E3=83=A7=E3=83=B3=E3=82=92=E8=BF=BD=E5=8A=A0=E3=81=97?= =?UTF-8?q?=E3=81=9F?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- ecs/Dockerfile/dataimport/main.py | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/ecs/Dockerfile/dataimport/main.py b/ecs/Dockerfile/dataimport/main.py index ad77fdb4..92eac609 100644 --- a/ecs/Dockerfile/dataimport/main.py +++ b/ecs/Dockerfile/dataimport/main.py @@ -1,6 +1,7 @@ from datetime import datetime import boto3 import pymysql +from pymysql.constants import CLIENT import io import csv from error import error @@ -52,7 +53,7 @@ def main(bucket_name, target_key, target_data_source, target_file_name, settings try: # ② DB接続を開始する - conn = pymysql.connect(host=db_info["host"], user=db_info["user"], passwd=db_info["pass"], db=db_info["name"], connect_timeout=5) + conn = pymysql.connect(host=db_info["host"], user=db_info["user"], passwd=db_info["pass"], db=db_info["name"], connect_timeout=5, client_flag=CLIENT.MULTI_STATEMENTS) print(f'{str(datetime.now())} {log_info} {LOG_LEVEL["i"]} I-MAIN-02 - DB接続を開始しました') # ③ タイムゾーンを変更する From 110dd431e8d54431dff8a18815dd15836a95a232 Mon Sep 17 00:00:00 2001 From: *lcOeIaePm0 Date: Mon, 25 Oct 2021 14:28:43 +0900 Subject: [PATCH 7/7] =?UTF-8?q?fix:doing=E3=83=95=E3=82=A1=E3=82=A4?= =?UTF-8?q?=E3=83=AB=E3=81=8C=E5=AD=98=E5=9C=A8=E3=81=97=E3=81=9F=E6=99=82?= =?UTF-8?q?=E3=81=AE=E5=87=A6=E7=90=86=E3=82=92=E4=BF=AE=E6=AD=A3=E3=81=97?= =?UTF-8?q?=E3=81=9F?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- ecs/Dockerfile/dataimport/ini.py | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/ecs/Dockerfile/dataimport/ini.py b/ecs/Dockerfile/dataimport/ini.py index 5953895b..b80af238 100644 --- a/ecs/Dockerfile/dataimport/ini.py +++ b/ecs/Dockerfile/dataimport/ini.py @@ -3,6 +3,7 @@ import boto3 import io import csv import re +import sys from error import error # 定数 @@ -40,7 +41,7 @@ def init(bucket_name, target_key, target_data_source, target_file_name, log_info print(f'{str(datetime.now())} {log_info} {LOG_LEVEL["i"]} I-INI-02 - doingファイル:{doing_file_name} の存在チェック') s3_client.head_object(Bucket=bucket_name, Key=doing_key) print(f'{str(datetime.now())} {log_info} {LOG_LEVEL["e"]} E-INI-01 - 投入データ {target_file_name} は既に処理中です') - error(bucket_name, target_data_source, target_file_name, log_info) + sys.exit() except Exception as e: print(f'{str(datetime.now())} {log_info} {LOG_LEVEL["i"]} I-INI-03 - doingファイルは存在しませんでした')