From 06d741e994dbc313c0743d3212ace7915f54a1cd Mon Sep 17 00:00:00 2001 From: "shimoda.m@nds-tyo.co.jp" Date: Thu, 8 May 2025 13:48:40 +0900 Subject: [PATCH] =?UTF-8?q?feat:=20=E5=9C=A7=E7=B8=AE=E3=83=95=E3=83=A9?= =?UTF-8?q?=E3=82=B0ON/OFF=E3=81=AB=E3=82=88=E3=82=8A=E4=BD=BF=E7=94=A8?= =?UTF-8?q?=E3=81=99=E3=82=8B=E3=83=95=E3=82=A1=E3=82=A4=E3=83=AB=E3=83=90?= =?UTF-8?q?=E3=82=A4=E3=83=88=E3=82=92=E5=88=A4=E5=AE=9A=E3=81=99=E3=82=8B?= =?UTF-8?q?=E3=82=88=E3=81=86=E3=81=AB=E4=BF=AE=E6=AD=A3=E3=80=82=20refact?= =?UTF-8?q?or:=20DB=E6=8E=A5=E7=B6=9A=E6=99=82=E3=81=AB=E4=BB=BB=E6=84=8F?= =?UTF-8?q?=E3=81=AEPORT=E3=82=92=E6=8C=87=E5=AE=9A=E3=81=A7=E3=81=8D?= =?UTF-8?q?=E3=82=8B=E3=82=88=E3=81=86=E3=81=AB=E3=81=97=E3=81=9F=E3=80=82?= =?UTF-8?q?=E6=8C=87=E5=AE=9A=E3=81=95=E3=82=8C=E3=81=A6=E3=81=84=E3=81=AA?= =?UTF-8?q?=E3=81=84=E5=A0=B4=E5=90=88=E3=81=AF3306=E3=81=8C=E3=83=87?= =?UTF-8?q?=E3=83=95=E3=82=A9=E3=83=AB=E3=83=88=E3=80=82?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- ecs/dataimport/dataimport/controller.py | 39 ++++++++++++++++--------- ecs/dataimport/dataimport/main.py | 28 ++++++++++++++---- 2 files changed, 48 insertions(+), 19 deletions(-) diff --git a/ecs/dataimport/dataimport/controller.py b/ecs/dataimport/dataimport/controller.py index 79b54717..d9a639f3 100644 --- a/ecs/dataimport/dataimport/controller.py +++ b/ecs/dataimport/dataimport/controller.py @@ -1,10 +1,11 @@ import os from datetime import datetime -from ini import init + from chk import check -from main import main from end import end from error import error +from ini import init +from main import main # 引数 BUCKET_NAME = os.environ["BUCKET_NAME"] @@ -18,7 +19,9 @@ DB_HOST = os.environ["DB_HOST"] DB_NAME = os.environ["DB_NAME"] DB_PASS = os.environ["DB_PASS"] DB_USER = os.environ["DB_USER"] -DB_INFO = {"host": DB_HOST, "name": DB_NAME, "pass": DB_PASS, "user": DB_USER} +DB_PORT = int(os.environ.get("DB_PORT", 3306)) +DB_INFO = {"host": DB_HOST, "port": DB_PORT, + "name": DB_NAME, "pass": DB_PASS, "user": DB_USER} # 定数 LOG_LEVEL = {"i": 'Info'} @@ -33,26 +36,36 @@ LOG_INFO = f'{DATA_SOURCE_NAME} {FILE_NAME}' try: # ① データ取込処理開始ログを出力する - print(f'{datetime.now():%Y-%m-%d %H:%M:%S} {LOG_INFO} {LOG_LEVEL["i"]} I-CTRL-01 - データ取込処理を開始します') + print( + f'{datetime.now():%Y-%m-%d %H:%M:%S} {LOG_INFO} {LOG_LEVEL["i"]} I-CTRL-01 - データ取込処理を開始します') # ② 初期処理を呼び出す - print(f'{datetime.now():%Y-%m-%d %H:%M:%S} {LOG_INFO} {LOG_LEVEL["i"]} I-CTRL-02 - 初期処理の呼び出し') - settings_key = init(BUCKET_NAME, TARGET_KEY, DATA_SOURCE_NAME, FILE_NAME, LOG_INFO, MODE) + print( + f'{datetime.now():%Y-%m-%d %H:%M:%S} {LOG_INFO} {LOG_LEVEL["i"]} I-CTRL-02 - 初期処理の呼び出し') + settings_key = init(BUCKET_NAME, TARGET_KEY, + DATA_SOURCE_NAME, FILE_NAME, LOG_INFO, MODE) # ③ チェック処理を呼び出す - print(f'{datetime.now():%Y-%m-%d %H:%M:%S} {LOG_INFO} {LOG_LEVEL["i"]} I-CTRL-03 - チェック処理の呼び出し') - check(BUCKET_NAME, DATA_SOURCE_NAME, FILE_NAME, settings_key, LOG_INFO, MODE) + print( + f'{datetime.now():%Y-%m-%d %H:%M:%S} {LOG_INFO} {LOG_LEVEL["i"]} I-CTRL-03 - チェック処理の呼び出し') + check(BUCKET_NAME, DATA_SOURCE_NAME, + FILE_NAME, settings_key, LOG_INFO, MODE) # ④ メイン処理を呼び出す - print(f'{datetime.now():%Y-%m-%d %H:%M:%S} {LOG_INFO} {LOG_LEVEL["i"]} I-CTRL-04 - メイン処理の呼び出し') - warning_info = main(BUCKET_NAME, DATA_SOURCE_NAME, FILE_NAME, settings_key, DB_INFO, LOG_INFO, MODE) + print( + f'{datetime.now():%Y-%m-%d %H:%M:%S} {LOG_INFO} {LOG_LEVEL["i"]} I-CTRL-04 - メイン処理の呼び出し') + warning_info = main(BUCKET_NAME, DATA_SOURCE_NAME, + FILE_NAME, settings_key, DB_INFO, LOG_INFO, MODE) # ⑤ 終了処理を呼び出す - print(f'{datetime.now():%Y-%m-%d %H:%M:%S} {LOG_INFO} {LOG_LEVEL["i"]} I-CTRL-05 - 終了処理の呼び出し') + print( + f'{datetime.now():%Y-%m-%d %H:%M:%S} {LOG_INFO} {LOG_LEVEL["i"]} I-CTRL-05 - 終了処理の呼び出し') end(BUCKET_NAME, DATA_SOURCE_NAME, FILE_NAME, warning_info, LOG_INFO, MODE) # ⑥ データ取込処理終了ログを出力する - print(f'{datetime.now():%Y-%m-%d %H:%M:%S} {LOG_INFO} {LOG_LEVEL["i"]} I-CTRL-06 - データ取込処理を終了します') + print( + f'{datetime.now():%Y-%m-%d %H:%M:%S} {LOG_INFO} {LOG_LEVEL["i"]} I-CTRL-06 - データ取込処理を終了します') except Exception as e: - print(f'{datetime.now():%Y-%m-%d %H:%M:%S} {LOG_INFO} {LOG_LEVEL["e"]} E-CTRL-99 - エラー内容:{e}') + print( + f'{datetime.now():%Y-%m-%d %H:%M:%S} {LOG_INFO} {LOG_LEVEL["e"]} E-CTRL-99 - エラー内容:{e}') error(BUCKET_NAME, DATA_SOURCE_NAME, FILE_NAME, LOG_INFO) diff --git a/ecs/dataimport/dataimport/main.py b/ecs/dataimport/dataimport/main.py index 3a39c7d6..55ea1119 100644 --- a/ecs/dataimport/dataimport/main.py +++ b/ecs/dataimport/dataimport/main.py @@ -4,6 +4,7 @@ from datetime import datetime import boto3 import pymysql +from chk import LOCAL_UNCOMPRESSED_FILE_PATH from common import convert_quotechar, debug_log from error import error from pymysql.constants import CLIENT @@ -87,7 +88,7 @@ def main(bucket_name, target_data_source, target_file_name, settings_key, db_inf f'{datetime.now():%Y-%m-%d %H:%M:%S} {log_info} {INFO} I-MAIN-01 - メイン処理を開始します') # ② DB接続を開始する - conn = pymysql.connect(host=db_info["host"], user=db_info["user"], passwd=db_info["pass"], + conn = pymysql.connect(host=db_info["host"], port=db_info["port"], user=db_info["user"], passwd=db_info["pass"], db=db_info["name"], connect_timeout=5, client_flag=CLIENT.MULTI_STATEMENTS) print( f'{datetime.now():%Y-%m-%d %H:%M:%S} {log_info} {INFO} I-MAIN-02 - DB接続を開始しました') @@ -124,14 +125,29 @@ def main(bucket_name, target_data_source, target_file_name, settings_key, db_inf f'{datetime.now():%Y-%m-%d %H:%M:%S} {log_info} {INFO}' + f' I-MAIN-04 - {load_schema_name} をTRUNCATEしました') - # ⑤ 投入データファイルを1行ごとにループする + # ⑤-1 投入データファイルを1行ごとにループする print( f'{datetime.now():%Y-%m-%d %H:%M:%S} {log_info} {INFO} I-MAIN-05 - 投入データ {target_file_name} の読み込みを開始します') - work_key = target_data_source + DIRECTORY_WORK + target_file_name - work_response = s3_client.get_object(Bucket=bucket_name, Key=work_key) - work_data = io.TextIOWrapper(io.BytesIO(work_response["Body"].read( - )), encoding=settings_list[SETTINGS_ITEM["charCode"]], newline=LINE_FEED_CODE[settings_list[SETTINGS_ITEM["lineFeedCode"]]]) + import_data_bytes = None + compressed_flag = settings_list[SETTINGS_ITEM["compressedFlag"]] + if not compressed_flag or int(compressed_flag) == 0: + # 圧縮フラグが未設定、またはOFFの場合、S3バケット上の投入データファイルを読む + work_key = target_data_source + DIRECTORY_WORK + target_file_name + work_response = s3_client.get_object( + Bucket=bucket_name, Key=work_key) + import_data_bytes = work_response["Body"].read() + # 圧縮フラグがONの場合、チェック処理で展開済みのtmpディレクトリ内のファイルを読む + elif compressed_flag and int(compressed_flag) == 1: + print( + f'{datetime.now():%Y-%m-%d %H:%M:%S} {log_info} {INFO} I-MAIN-21 - ファイルが圧縮されていたため、展開済みのファイルを利用します') + with open(LOCAL_UNCOMPRESSED_FILE_PATH, 'rb') as f: + import_data_bytes = f.read() + else: + # nop + pass + work_data = io.TextIOWrapper(io.BytesIO(import_data_bytes), encoding=settings_list[SETTINGS_ITEM["charCode"]], + newline=LINE_FEED_CODE[settings_list[SETTINGS_ITEM["lineFeedCode"]]]) process_count = 0 # 処理件数カウンタ normal_count = 0 # 正常終了件数カウンタ warning_count = 0 # ワーニング終了件数カウンター