feat: 0バイトファイルの時に拡張SQLを実行するかどうかのフラグを追加

This commit is contained in:
shimoda.m@nds-tyo.co.jp 2025-06-16 16:13:13 +09:00
parent ced1d110c5
commit 2885b56c0c
4 changed files with 77 additions and 19 deletions

View File

@ -5,14 +5,13 @@ import sys
from datetime import datetime from datetime import datetime
import boto3 import boto3
from common import (ERROR, INFO, LINE_FEED_CODE, SETTINGS_ITEM, import pymysql
convert_quotechar, debug_log, uncompress_gzip, from common import (DIRECTORY_SETTINGS, DIRECTORY_WORK, ERROR, INFO,
uncompress_zip) LINE_FEED_CODE, SETTINGS_ITEM, WARNING, convert_quotechar,
debug_log, uncompress_gzip, uncompress_zip)
from end import end from end import end
from error import error from error import error
from pymysql.constants import CLIENT
# 定数
DIRECTORY_WORK = '/work/'
# クラス変数 # クラス変数
s3_client = boto3.client('s3') s3_client = boto3.client('s3')
@ -23,13 +22,14 @@ class CheckError(Exception):
pass pass
def check(bucket_name, target_data_source, target_file_name, settings_key, log_info, mode): def check(bucket_name, target_data_source, target_file_name, settings_key, db_info, log_info, mode):
"""チェック処理 """チェック処理
Args: Args:
bucket_name : バケット名 bucket_name : バケット名
target_data_source : 投入データのディレクトリ名よりデータソースに該当する部分 target_data_source : 投入データのディレクトリ名よりデータソースに該当する部分
target_file_name : 投入データのファイル名 target_file_name : 投入データのファイル名
settings_key : 投入データに該当する個別設定ファイルのフルパス settings_key : 投入データに該当する個別設定ファイルのフルパス
db_info : データベース情報
log_info : ログに記載するデータソース名とファイル名 log_info : ログに記載するデータソース名とファイル名
mode : 処理モード mode : 処理モード
Raises: Raises:
@ -91,6 +91,12 @@ def check(bucket_name, target_data_source, target_file_name, settings_key, log_i
print( print(
f'{datetime.now():%Y-%m-%d %H:%M:%S} {log_info} {INFO} I-CHK-02 - C-0のチェックを開始します') f'{datetime.now():%Y-%m-%d %H:%M:%S} {log_info} {INFO} I-CHK-02 - C-0のチェックを開始します')
if is_empty_file(work_csv_row, settings_list): if is_empty_file(work_csv_row, settings_list):
# 拡張SQL実行フラグがONになっている場合は拡張SQLを実行して処理終了する。
if settings_list[SETTINGS_ITEM["executeExSqlIfFileEmptyFlag"]] == '1':
print(
f'{datetime.now():%Y-%m-%d %H:%M:%S} {log_info} {INFO} I-CHK-15 - 投入ファイルがバイトでしたが、拡張SQLを実行します。')
execute_ex_sql(bucket_name, target_data_source,
settings_list, db_info, log_info)
print( print(
f'{datetime.now():%Y-%m-%d %H:%M:%S} {log_info} {INFO} I-CHK-03 - 投入ファイルが0バイトのため処理を終了します') f'{datetime.now():%Y-%m-%d %H:%M:%S} {log_info} {INFO} I-CHK-03 - 投入ファイルが0バイトのため処理を終了します')
end(bucket_name, target_data_source, end(bucket_name, target_data_source,
@ -260,6 +266,57 @@ def reverse_readline_stream(f: io.BytesIO, line_feed: str, chunk_size=4096):
yield buffer yield buffer
def execute_ex_sql(bucket_name, target_data_source, settings_list, db_info, log_info):
# 個別設定ファイルに拡張SQLファイル名が設定されているかチェック
print(
f'{datetime.now():%Y-%m-%d %H:%M:%S} {log_info} {INFO} I-CHK-16 - 拡張SQL設定が存在するかチェックします')
ex_sql_file_name = settings_list[SETTINGS_ITEM["exSqlFileName"]]
if ex_sql_file_name:
try:
print(
f'{datetime.now():%Y-%m-%d %H:%M:%S} {log_info} {INFO} I-CHK-17 - 拡張SQL設定の存在を確認しました')
print(
f'{datetime.now():%Y-%m-%d %H:%M:%S} {log_info} {INFO} I-CHK-18 - 拡張SQLファイル名{ex_sql_file_name} の存在チェック')
ex_sql_key = target_data_source + DIRECTORY_SETTINGS + ex_sql_file_name
s3_client.head_object(Bucket=bucket_name, Key=ex_sql_key)
ex_sql_file_exists = True
print(
f'{datetime.now():%Y-%m-%d %H:%M:%S} {log_info} {INFO} I-CHK-19 - 拡張SQLファイル名の存在を確認しました')
except Exception:
ex_sql_file_exists = False
print(
f'{datetime.now():%Y-%m-%d %H:%M:%S} {log_info} {WARNING} W-CHK-02 - 拡張SQLファイルが存在しません')
try:
if ex_sql_file_exists:
# 拡張SQLファイルからSQL文生成
ex_sql_obj_response = s3_client.get_object(
Bucket=bucket_name, Key=ex_sql_key)
ex_sql = ''
for line in io.TextIOWrapper(io.BytesIO(ex_sql_obj_response["Body"].read()), encoding='utf-8'):
ex_sql = f'{ex_sql} {line.rstrip()}'
# DB接続を開始する
conn = pymysql.connect(host=db_info["host"], port=db_info["port"], user=db_info["user"], passwd=db_info["pass"],
db=db_info["name"], connect_timeout=5, client_flag=CLIENT.MULTI_STATEMENTS, local_infile=True)
# トランザクション開始
print(
f'{datetime.now():%Y-%m-%d %H:%M:%S} {log_info} {INFO} I-CHK-20 - 拡張SQL{ex_sql_file_name} のトランザクションを開始します')
with conn.cursor() as cur:
cur.execute(ex_sql)
conn.commit()
print(
f'{datetime.now():%Y-%m-%d %H:%M:%S} {log_info} {INFO} I-CHK-21 - 拡張SQL{ex_sql_file_name} のCOMMIT処理が正常終了しました')
conn.close()
except Exception as e:
print(
f'{datetime.now():%Y-%m-%d %H:%M:%S} {log_info} {WARNING} W-CHK-03 - 拡張SQLにエラーが発生しました{e}')
else:
print(
f'{datetime.now():%Y-%m-%d %H:%M:%S} {log_info} {INFO} I-CHK-22 - 拡張SQL設定の存在はありませんでした')
# ローカル実行用コード # ローカル実行用コード
# 値はよしなに変えてください # 値はよしなに変えてください
if __name__ == '__main__': if __name__ == '__main__':

View File

@ -56,16 +56,18 @@ SETTINGS_ITEM = {
'bulkImportFlag': 14, 'bulkImportFlag': 14,
'compressedFlag': 15, 'compressedFlag': 15,
'compression': 16, 'compression': 16,
'reserved1': 17, 'executeExSqlIfFileEmptyFlag': 17,
'reserved2': 18, 'reserved1': 18,
'reserved3': 19, 'reserved2': 19,
'reserved4': 20, 'reserved3': 20,
'reserved5': 21, 'reserved4': 21,
'reserved6': 22, 'reserved5': 22,
'reserved7': 23, 'reserved6': 23,
'reserved8': 24 'reserved7': 24
} }
DIRECTORY_WORK = '/work/'
DIRECTORY_SETTINGS = '/settings/'
LOCAL_DIRECTORY_TMP = '/tmp' LOCAL_DIRECTORY_TMP = '/tmp'
# チェック処理で解凍した圧縮ファイルの中身を格納するフォルダ # チェック処理で解凍した圧縮ファイルの中身を格納するフォルダ
LOCAL_TEMPORARY_FILE_PATH = f'{LOCAL_DIRECTORY_TMP}/temporary_file.dat' LOCAL_TEMPORARY_FILE_PATH = f'{LOCAL_DIRECTORY_TMP}/temporary_file.dat'

View File

@ -49,7 +49,7 @@ try:
print( print(
f'{datetime.now():%Y-%m-%d %H:%M:%S} {LOG_INFO} {LOG_LEVEL["i"]} I-CTRL-03 - チェック処理の呼び出し') f'{datetime.now():%Y-%m-%d %H:%M:%S} {LOG_INFO} {LOG_LEVEL["i"]} I-CTRL-03 - チェック処理の呼び出し')
check(BUCKET_NAME, DATA_SOURCE_NAME, check(BUCKET_NAME, DATA_SOURCE_NAME,
FILE_NAME, settings_key, LOG_INFO, MODE) FILE_NAME, settings_key, DB_INFO, LOG_INFO, MODE)
# ④ メイン処理を呼び出す # ④ メイン処理を呼び出す
print( print(

View File

@ -4,16 +4,15 @@ from datetime import datetime
import boto3 import boto3
import pymysql import pymysql
from common import (ERROR, INFO, LINE_FEED_CODE, LOCAL_TEMPORARY_FILE_PATH, from common import (DIRECTORY_SETTINGS, DIRECTORY_WORK, ERROR, INFO,
LINE_FEED_CODE, LOCAL_TEMPORARY_FILE_PATH,
MYSQL_CHARSET_CODE, SETTINGS_ITEM, WARNING, MYSQL_CHARSET_CODE, SETTINGS_ITEM, WARNING,
convert_quotechar, debug_log) convert_quotechar, debug_log)
from error import error from error import error
from pymysql.constants import CLIENT from pymysql.constants import CLIENT
# 定数 # 定数
DIRECTORY_WORK = '/work/'
DIRECTORY_SETTINGS = '/settings/'
TRUNCATE_SRC_TABLE_SYMBOL = 'truncate_src_table:' TRUNCATE_SRC_TABLE_SYMBOL = 'truncate_src_table:'
TRUNCATE_SRC_TABLE_IDENTIFY_SYMBOL_FORMAT = f'{TRUNCATE_SRC_TABLE_SYMBOL}[蓄積スキーマのテーブル名]' TRUNCATE_SRC_TABLE_IDENTIFY_SYMBOL_FORMAT = f'{TRUNCATE_SRC_TABLE_SYMBOL}[蓄積スキーマのテーブル名]'
INVALID_CONFIG_EXCEPTION_MESSAGE = f'個別設定ファイルのインポート方法に不備がありました。 インポート方法は "{TRUNCATE_SRC_TABLE_IDENTIFY_SYMBOL_FORMAT}" のように設定してください' INVALID_CONFIG_EXCEPTION_MESSAGE = f'個別設定ファイルのインポート方法に不備がありました。 インポート方法は "{TRUNCATE_SRC_TABLE_IDENTIFY_SYMBOL_FORMAT}" のように設定してください'