feat: 圧縮フラグON/OFFにより使用するファイルバイトを判定するように修正。

refactor: DB接続時に任意のPORTを指定できるようにした。指定されていない場合は3306がデフォルト。
This commit is contained in:
shimoda.m@nds-tyo.co.jp 2025-05-08 13:48:40 +09:00
parent df1da34628
commit 06d741e994
2 changed files with 48 additions and 19 deletions

View File

@ -1,10 +1,11 @@
import os
from datetime import datetime
from ini import init
from chk import check
from main import main
from end import end
from error import error
from ini import init
from main import main
# 引数
BUCKET_NAME = os.environ["BUCKET_NAME"]
@ -18,7 +19,9 @@ DB_HOST = os.environ["DB_HOST"]
DB_NAME = os.environ["DB_NAME"]
DB_PASS = os.environ["DB_PASS"]
DB_USER = os.environ["DB_USER"]
DB_INFO = {"host": DB_HOST, "name": DB_NAME, "pass": DB_PASS, "user": DB_USER}
DB_PORT = int(os.environ.get("DB_PORT", 3306))
DB_INFO = {"host": DB_HOST, "port": DB_PORT,
"name": DB_NAME, "pass": DB_PASS, "user": DB_USER}
# 定数
LOG_LEVEL = {"i": 'Info'}
@ -33,26 +36,36 @@ LOG_INFO = f'{DATA_SOURCE_NAME} {FILE_NAME}'
try:
# ① データ取込処理開始ログを出力する
print(f'{datetime.now():%Y-%m-%d %H:%M:%S} {LOG_INFO} {LOG_LEVEL["i"]} I-CTRL-01 - データ取込処理を開始します')
print(
f'{datetime.now():%Y-%m-%d %H:%M:%S} {LOG_INFO} {LOG_LEVEL["i"]} I-CTRL-01 - データ取込処理を開始します')
# ② 初期処理を呼び出す
print(f'{datetime.now():%Y-%m-%d %H:%M:%S} {LOG_INFO} {LOG_LEVEL["i"]} I-CTRL-02 - 初期処理の呼び出し')
settings_key = init(BUCKET_NAME, TARGET_KEY, DATA_SOURCE_NAME, FILE_NAME, LOG_INFO, MODE)
print(
f'{datetime.now():%Y-%m-%d %H:%M:%S} {LOG_INFO} {LOG_LEVEL["i"]} I-CTRL-02 - 初期処理の呼び出し')
settings_key = init(BUCKET_NAME, TARGET_KEY,
DATA_SOURCE_NAME, FILE_NAME, LOG_INFO, MODE)
# ③ チェック処理を呼び出す
print(f'{datetime.now():%Y-%m-%d %H:%M:%S} {LOG_INFO} {LOG_LEVEL["i"]} I-CTRL-03 - チェック処理の呼び出し')
check(BUCKET_NAME, DATA_SOURCE_NAME, FILE_NAME, settings_key, LOG_INFO, MODE)
print(
f'{datetime.now():%Y-%m-%d %H:%M:%S} {LOG_INFO} {LOG_LEVEL["i"]} I-CTRL-03 - チェック処理の呼び出し')
check(BUCKET_NAME, DATA_SOURCE_NAME,
FILE_NAME, settings_key, LOG_INFO, MODE)
# ④ メイン処理を呼び出す
print(f'{datetime.now():%Y-%m-%d %H:%M:%S} {LOG_INFO} {LOG_LEVEL["i"]} I-CTRL-04 - メイン処理の呼び出し')
warning_info = main(BUCKET_NAME, DATA_SOURCE_NAME, FILE_NAME, settings_key, DB_INFO, LOG_INFO, MODE)
print(
f'{datetime.now():%Y-%m-%d %H:%M:%S} {LOG_INFO} {LOG_LEVEL["i"]} I-CTRL-04 - メイン処理の呼び出し')
warning_info = main(BUCKET_NAME, DATA_SOURCE_NAME,
FILE_NAME, settings_key, DB_INFO, LOG_INFO, MODE)
# ⑤ 終了処理を呼び出す
print(f'{datetime.now():%Y-%m-%d %H:%M:%S} {LOG_INFO} {LOG_LEVEL["i"]} I-CTRL-05 - 終了処理の呼び出し')
print(
f'{datetime.now():%Y-%m-%d %H:%M:%S} {LOG_INFO} {LOG_LEVEL["i"]} I-CTRL-05 - 終了処理の呼び出し')
end(BUCKET_NAME, DATA_SOURCE_NAME, FILE_NAME, warning_info, LOG_INFO, MODE)
# ⑥ データ取込処理終了ログを出力する
print(f'{datetime.now():%Y-%m-%d %H:%M:%S} {LOG_INFO} {LOG_LEVEL["i"]} I-CTRL-06 - データ取込処理を終了します')
print(
f'{datetime.now():%Y-%m-%d %H:%M:%S} {LOG_INFO} {LOG_LEVEL["i"]} I-CTRL-06 - データ取込処理を終了します')
except Exception as e:
print(f'{datetime.now():%Y-%m-%d %H:%M:%S} {LOG_INFO} {LOG_LEVEL["e"]} E-CTRL-99 - エラー内容:{e}')
print(
f'{datetime.now():%Y-%m-%d %H:%M:%S} {LOG_INFO} {LOG_LEVEL["e"]} E-CTRL-99 - エラー内容:{e}')
error(BUCKET_NAME, DATA_SOURCE_NAME, FILE_NAME, LOG_INFO)

View File

@ -4,6 +4,7 @@ from datetime import datetime
import boto3
import pymysql
from chk import LOCAL_UNCOMPRESSED_FILE_PATH
from common import convert_quotechar, debug_log
from error import error
from pymysql.constants import CLIENT
@ -87,7 +88,7 @@ def main(bucket_name, target_data_source, target_file_name, settings_key, db_inf
f'{datetime.now():%Y-%m-%d %H:%M:%S} {log_info} {INFO} I-MAIN-01 - メイン処理を開始します')
# ② DB接続を開始する
conn = pymysql.connect(host=db_info["host"], user=db_info["user"], passwd=db_info["pass"],
conn = pymysql.connect(host=db_info["host"], port=db_info["port"], user=db_info["user"], passwd=db_info["pass"],
db=db_info["name"], connect_timeout=5, client_flag=CLIENT.MULTI_STATEMENTS)
print(
f'{datetime.now():%Y-%m-%d %H:%M:%S} {log_info} {INFO} I-MAIN-02 - DB接続を開始しました')
@ -124,14 +125,29 @@ def main(bucket_name, target_data_source, target_file_name, settings_key, db_inf
f'{datetime.now():%Y-%m-%d %H:%M:%S} {log_info} {INFO}' +
f' I-MAIN-04 - {load_schema_name} をTRUNCATEしました')
# ⑤ 投入データファイルを1行ごとにループする
# ⑤-1 投入データファイルを1行ごとにループする
print(
f'{datetime.now():%Y-%m-%d %H:%M:%S} {log_info} {INFO} I-MAIN-05 - 投入データ {target_file_name} の読み込みを開始します')
work_key = target_data_source + DIRECTORY_WORK + target_file_name
work_response = s3_client.get_object(Bucket=bucket_name, Key=work_key)
work_data = io.TextIOWrapper(io.BytesIO(work_response["Body"].read(
)), encoding=settings_list[SETTINGS_ITEM["charCode"]], newline=LINE_FEED_CODE[settings_list[SETTINGS_ITEM["lineFeedCode"]]])
import_data_bytes = None
compressed_flag = settings_list[SETTINGS_ITEM["compressedFlag"]]
if not compressed_flag or int(compressed_flag) == 0:
# 圧縮フラグが未設定、またはOFFの場合、S3バケット上の投入データファイルを読む
work_key = target_data_source + DIRECTORY_WORK + target_file_name
work_response = s3_client.get_object(
Bucket=bucket_name, Key=work_key)
import_data_bytes = work_response["Body"].read()
# 圧縮フラグがONの場合、チェック処理で展開済みのtmpディレクトリ内のファイルを読む
elif compressed_flag and int(compressed_flag) == 1:
print(
f'{datetime.now():%Y-%m-%d %H:%M:%S} {log_info} {INFO} I-MAIN-21 - ファイルが圧縮されていたため、展開済みのファイルを利用します')
with open(LOCAL_UNCOMPRESSED_FILE_PATH, 'rb') as f:
import_data_bytes = f.read()
else:
# nop
pass
work_data = io.TextIOWrapper(io.BytesIO(import_data_bytes), encoding=settings_list[SETTINGS_ITEM["charCode"]],
newline=LINE_FEED_CODE[settings_list[SETTINGS_ITEM["lineFeedCode"]]])
process_count = 0 # 処理件数カウンタ
normal_count = 0 # 正常終了件数カウンタ
warning_count = 0 # ワーニング終了件数カウンター