Merge pull request #471 feature-NEWDWH2021-1857 into develop

This commit is contained in:
下田雅人 2025-05-16 16:53:42 +09:00
commit dcf438297c
3 changed files with 630 additions and 179 deletions

View File

@ -1,6 +1,8 @@
import csv
import io
import os
import sys
import zipfile
from datetime import datetime
import boto3
@ -10,7 +12,10 @@ from error import error
# 定数
DIRECTORY_WORK = '/work/'
LOG_LEVEL = {'i': 'Info', 'e': 'Error'}
LOCAL_DIRECTORY_TMP = '/tmp'
# チェック処理で解凍した圧縮ファイルの中身を格納するフォルダ
LOCAL_TEMPORARY_FILE_PATH = f'{LOCAL_DIRECTORY_TMP}/temporary_file.dat'
LOG_LEVEL = {'i': 'Info', 'e': 'Error', 'w': "Warning"}
SETTINGS_ITEM = {
'dataSource': 0,
'delimiter': 1,
@ -26,19 +31,29 @@ SETTINGS_ITEM = {
'exSqlFileName': 11,
'commaReplaceColumns': 12,
'importManner': 13,
'reserved1': 14,
'reserved2': 15,
'reserved3': 16,
'reserved4': 17,
'reserved5': 18,
'reserved6': 19
'bulkImportFlag': 14,
'compressedFlag': 15,
'compression': 16,
'reserved1': 17,
'reserved2': 18,
'reserved3': 19,
'reserved4': 20,
'reserved5': 21,
'reserved6': 22,
'reserved7': 23,
'reserved8': 24
}
LINE_FEED_CODE = {
'CR': '\r',
'LF': '\n',
'CRLF': '\r\n',
}
INFO = LOG_LEVEL["i"]
ERROR = LOG_LEVEL["e"]
WARNING = LOG_LEVEL["w"]
# クラス変数
s3_client = boto3.client('s3')
@ -63,26 +78,48 @@ def check(bucket_name, target_data_source, target_file_name, settings_key, log_i
try:
debug_log(f'引数 bucket_name : {bucket_name}', log_info, mode)
debug_log(f'引数 target_data_source : {target_data_source}', log_info, mode)
debug_log(
f'引数 target_data_source : {target_data_source}', log_info, mode)
debug_log(f'引数 target_file_name : {target_file_name}', log_info, mode)
debug_log(f'引数 settings_key : {settings_key}', log_info, mode)
debug_log(f'引数 log_info : {log_info}', log_info, mode)
debug_log(f'引数 mode : {mode}', log_info, mode)
# ① チェック処理開始ログを出力する
print(f'{datetime.now():%Y-%m-%d %H:%M:%S} {log_info} {LOG_LEVEL["i"]} I-CHK-01 - チェック処理を開始します')
print(
f'{datetime.now():%Y-%m-%d %H:%M:%S} {log_info} {INFO} I-CHK-01 - チェック処理を開始します')
# データ読込
settings_obj_response = s3_client.get_object(Bucket=bucket_name, Key=settings_key)
settings_obj_response = s3_client.get_object(
Bucket=bucket_name, Key=settings_key)
settings_list = []
for line in io.TextIOWrapper(io.BytesIO(settings_obj_response["Body"].read()), encoding='utf-8'):
settings_list.append(line.rstrip('\n'))
# 設定ファイルに記載のない行を空文字として扱い、予約行とする
for _ in range(len(SETTINGS_ITEM) - len(settings_list)):
settings_list.append('')
work_key = target_data_source + DIRECTORY_WORK + target_file_name
work_obj_response = s3_client.get_object(Bucket=bucket_name, Key=work_key)
work_data = io.TextIOWrapper(io.BytesIO(work_obj_response["Body"].read()), encoding=settings_list[SETTINGS_ITEM["charCode"]], newline=LINE_FEED_CODE[settings_list[SETTINGS_ITEM["lineFeedCode"]]])
work_obj_response = s3_client.get_object(
Bucket=bucket_name, Key=work_key)
work_obj_file = work_obj_response["Body"].read()
work_data_bytes = io.BytesIO(work_obj_file)
compressed_flag = settings_list[SETTINGS_ITEM["compressedFlag"]]
# ② ファイル圧縮フラグがONの場合、チェック対象ファイルの展開処理を行う。
if compressed_flag and compressed_flag == '1':
work_data_bytes = io.BytesIO(uncompress_file(
work_data_bytes, settings_list, log_info))
encoding = settings_list[SETTINGS_ITEM["charCode"]]
line_feed = LINE_FEED_CODE[settings_list[SETTINGS_ITEM["lineFeedCode"]]]
delimiter = settings_list[SETTINGS_ITEM["delimiter"]]
work_data = io.TextIOWrapper(work_data_bytes, encoding=encoding,
newline=line_feed)
work_csv_row = []
for i, line in enumerate(csv.reader(work_data, quotechar=convert_quotechar(settings_list[SETTINGS_ITEM["quotechar"]]), delimiter=settings_list[SETTINGS_ITEM["delimiter"]])):
for i, line in enumerate(csv.reader(work_data, quotechar=convert_quotechar(settings_list[SETTINGS_ITEM["quotechar"]]),
delimiter=delimiter)):
# ヘッダあり、かつ、1行目の場合
if int(settings_list[SETTINGS_ITEM["headerFlag"]]) == 1 and i == 0:
work_csv_row.append(line)
@ -90,41 +127,79 @@ def check(bucket_name, target_data_source, target_file_name, settings_key, log_i
work_csv_row.append(line)
break
# ② C-0のデータ件数チェックを開始する
print(f'{datetime.now():%Y-%m-%d %H:%M:%S} {log_info} {LOG_LEVEL["i"]} I-CHK-02 - C-0のチェックを開始します')
# ③ C-0のデータ件数チェックを開始する
print(
f'{datetime.now():%Y-%m-%d %H:%M:%S} {log_info} {INFO} I-CHK-02 - C-0のチェックを開始します')
if is_empty_file(work_csv_row, settings_list):
print(f'{datetime.now():%Y-%m-%d %H:%M:%S} {log_info} {LOG_LEVEL["i"]} I-CHK-03 - 投入ファイルが0バイトのため処理を終了します')
end(bucket_name, target_data_source, target_file_name, '', log_info, mode)
print(f'{datetime.now():%Y-%m-%d %H:%M:%S} {log_info} {LOG_LEVEL["i"]} I-CHK-04 - 終了処理完了')
print(
f'{datetime.now():%Y-%m-%d %H:%M:%S} {log_info} {INFO} I-CHK-03 - 投入ファイルが0バイトのため処理を終了します')
end(bucket_name, target_data_source,
target_file_name, '', log_info, mode)
print(
f'{datetime.now():%Y-%m-%d %H:%M:%S} {log_info} {INFO} I-CHK-04 - 終了処理完了')
sys.exit()
print(f'{datetime.now():%Y-%m-%d %H:%M:%S} {log_info} {LOG_LEVEL["i"]} I-CHK-05 - C-0正常終了')
print(
f'{datetime.now():%Y-%m-%d %H:%M:%S} {log_info} {INFO} I-CHK-05 - C-0正常終了')
# ③ C-1の項目数チェックを開始する
print(f'{datetime.now():%Y-%m-%d %H:%M:%S} {log_info} {LOG_LEVEL["i"]} I-CHK-06 - C-1のチェックを開始します')
# ④ C-1の項目数チェックを開始する
print(
f'{datetime.now():%Y-%m-%d %H:%M:%S} {log_info} {INFO} I-CHK-06 - C-1 項目数チェックを開始します')
work_csv_row_item_len = len(work_csv_row[0])
if work_csv_row_item_len == int(settings_list[SETTINGS_ITEM["csvNumItems"]]):
print(f'{datetime.now():%Y-%m-%d %H:%M:%S} {log_info} {LOG_LEVEL["i"]} I-CHK-07 - C-1正常終了')
print(
f'{datetime.now():%Y-%m-%d %H:%M:%S} {log_info} {INFO} I-CHK-07 - C-1 項目数チェック:正常終了')
else:
raise CheckError(f'E-CHK-01 - 項目数が一致しません 個別設定ファイル項目数:{settings_list[SETTINGS_ITEM["csvNumItems"]]} 投入データ項目数:{work_csv_row_item_len}')
raise CheckError(
f'E-CHK-01 - 項目数が一致しません 個別設定ファイル項目数:{settings_list[SETTINGS_ITEM["csvNumItems"]]} 投入データ項目数:{work_csv_row_item_len}')
# ④ C-2の項目並び順チェック開始する
if int(settings_list[SETTINGS_ITEM["headerFlag"]]) == True:
print(f'{datetime.now():%Y-%m-%d %H:%M:%S} {log_info} {LOG_LEVEL["i"]} I-CHK-08 - C-2のチェックを開始します')
settings_header_list = settings_list[SETTINGS_ITEM["csvNameItems"]].rstrip().split(',')
# ⑤ C-2の項目並び順チェック開始する
if int(settings_list[SETTINGS_ITEM["headerFlag"]]) == 1:
print(
f'{datetime.now():%Y-%m-%d %H:%M:%S} {log_info} {INFO} I-CHK-08 - C-2 項目並び順チェックを開始します')
settings_header_list = settings_list[SETTINGS_ITEM["csvNameItems"]].rstrip(
).split(',')
work_header_list = work_csv_row[0]
for i in range(len(settings_header_list)):
if not settings_header_list[i] == work_header_list[i]:
raise CheckError(f'E-CHK-02 - 項目順序が一致しません {i + 1}番目の項目 個別設定ファイル項目:{settings_header_list[i]} 投入データ項目:{work_header_list[i]}')
print(f'{datetime.now():%Y-%m-%d %H:%M:%S} {log_info} {LOG_LEVEL["i"]} I-CHK-09 - C-2正常終了')
raise CheckError(
f'E-CHK-02 - 項目順序が一致しません {i + 1}番目の項目 個別設定ファイル項目:{settings_header_list[i]} 投入データ項目:{work_header_list[i]}')
print(
f'{datetime.now():%Y-%m-%d %H:%M:%S} {log_info} {INFO} I-CHK-09 - C-2 項目並び順チェック:正常終了')
# ⑤ チェック処理終了ログを出力する
print(f'{datetime.now():%Y-%m-%d %H:%M:%S} {log_info} {LOG_LEVEL["i"]} I-CHK-10 - チェック処理を終了します')
# ⑥ C-3の末尾行項目数チェック開始する
if settings_list[SETTINGS_ITEM["bulkImportFlag"]] and int(settings_list[SETTINGS_ITEM["bulkImportFlag"]]) == 1:
print(
f'{datetime.now():%Y-%m-%d %H:%M:%S} {log_info} {INFO} I-CHK-13 - C-3 末尾行項目数チェックを開始します')
# ファイルの末尾行を取得し、ファイル項目数と比較する
last_line_work_data_bytes = next(reverse_readline_stream(
work_data_bytes, LINE_FEED_CODE[settings_list[SETTINGS_ITEM["lineFeedCode"]]]))
# 区切り文字例えばカンマを文字とデリミタで区別できるように、csvリーダーで読む
last_line_separated_data = [
row for row in csv.reader(
io.TextIOWrapper(io.BytesIO(last_line_work_data_bytes)),
quotechar=convert_quotechar(
settings_list[SETTINGS_ITEM["quotechar"]]),
delimiter=delimiter)
]
last_line_count = len(last_line_separated_data[0])
if last_line_count != int(settings_list[SETTINGS_ITEM["csvNumItems"]]):
raise CheckError(
f'E-CHK-03 - 投入データ末尾の項目数が一致しません 個別設定ファイル項目数:{settings_list[SETTINGS_ITEM["csvNumItems"]]} 投入データ項目数:{last_line_count}')
print(
f'{datetime.now():%Y-%m-%d %H:%M:%S} {log_info} {INFO} I-CHK-09 - C-3 末尾行項目数チェック 正常終了')
# ⑦ チェック処理終了ログを出力する
print(
f'{datetime.now():%Y-%m-%d %H:%M:%S} {log_info} {INFO} I-CHK-10 - チェック処理を終了します')
except CheckError as e:
print(f'{datetime.now():%Y-%m-%d %H:%M:%S} {log_info} {LOG_LEVEL["e"]} {e}')
print(
f'{datetime.now():%Y-%m-%d %H:%M:%S} {log_info} {ERROR} {e}')
error(bucket_name, target_data_source, target_file_name, log_info)
except Exception as e:
print(f'{datetime.now():%Y-%m-%d %H:%M:%S} {log_info} {LOG_LEVEL["e"]} E-CHK-99 - エラー内容:{e}')
print(
f'{datetime.now():%Y-%m-%d %H:%M:%S} {log_info} {ERROR} E-CHK-99 - エラー内容:{e}')
error(bucket_name, target_data_source, target_file_name, log_info)
@ -147,6 +222,113 @@ def is_empty_file(work_csv_row: list, settings_list: list):
return len(work_csv_row) == 0
def uncompress_file(work_data_file: bytes, settings_list: list, log_info) -> bytes:
"""指定された形式で圧縮ファイルを展開し、ローカルに書き出す。
Args:
work_data_file (bytes): S3から読み込んだ登録
settings_list (list): 個別設定ファイルのリスト
log_info (str): ログに記載するデータソース名とファイル名
Returns:
bytes: 解凍後のファイルのバイト配列
"""
compression = settings_list[SETTINGS_ITEM['compression']]
print(
f'{datetime.now():%Y-%m-%d %H:%M:%S} {log_info} {INFO} I-CHK-11 - 投入ファイルが圧縮されているため、展開処理を行います。圧縮形式:{compression}')
if compression == 'zip':
file_bytes = None
with zipfile.ZipFile(work_data_file, 'r') as zip_ref:
# 昇順でソートする。
file_list: list[str] = sorted(zip_ref.namelist())
if len(file_list) > 1:
# 圧縮ファイル内に複数ファイルが存在する場合、warningログを出力する。
print(
f'{datetime.now():%Y-%m-%d %H:%M:%S} {log_info} {WARNING} W-CHK-01 - 圧縮データ内に複数ファイルが存在したため、{file_list[0]}のみ登録を行います。')
target_file_name = file_list[0]
# 末尾が「/」で終わるのはフォルダ
if target_file_name.endswith('/'):
# 圧縮ファイル内の先頭がフォルダの場合、エラー処理を行う。
raise Exception(
f'展開したデータはファイルではありません。ファイルパス: {target_file_name}')
# zipファイル内には1ファイルのみ
with zip_ref.open(target_file_name) as file:
file_bytes = file.read()
# ファイルを一時ディレクトリに書き出す。
encoding = settings_list[SETTINGS_ITEM["charCode"]]
line_feed = LINE_FEED_CODE[settings_list[SETTINGS_ITEM["lineFeedCode"]]]
delimiter = settings_list[SETTINGS_ITEM["delimiter"]]
quote_char = convert_quotechar(
settings_list[SETTINGS_ITEM["quotechar"]])
uncompressed_file = io.TextIOWrapper(io.BytesIO(file_bytes), encoding=encoding,
newline=line_feed)
csv_reader = csv.reader(uncompressed_file, quotechar=convert_quotechar(settings_list[SETTINGS_ITEM["quotechar"]]),
delimiter=delimiter)
with open(LOCAL_TEMPORARY_FILE_PATH, 'w', encoding=encoding, newline='') as csvfile:
csv_writer = csv.writer(csvfile, quotechar=quote_char,
delimiter=delimiter)
for row in csv_reader:
csv_writer.writerow(row)
print(
f'{datetime.now():%Y-%m-%d %H:%M:%S} {log_info} {INFO} I-CHK-12 - 投入ファイルの展開が正常終了しました')
return file_bytes
# MEMO: zip以外の圧縮形式に対応する際に追記すること
else:
raise ValueError("圧縮形式が一致しません")
def reverse_readline_stream(f: io.BytesIO, line_feed: str, chunk_size=4096):
"""バイトモードのファイルストリームを後ろから読み、1行ずつ返すジェネレータ。
指定されたバイト単位でファイルを読み込み1行ずつにして返している
Args:
f (io.BytesIO): バイトモードのファイルストリーム
line_feed (str): 改行コード
chunk_size (int, optional): 読み込むファイルチャンク数(byte) Defaults to 4096.
Yields:
bytes: 末尾から1行分のレコード
"""
# ファイルのポインタを末尾に移動させてからポインタを取得する
f.seek(0, os.SEEK_END)
pointer = f.tell()
buffer = b''
yielded_any = False
while pointer > 0:
read_size = min(chunk_size, pointer)
# チャンク数分ファイルを読み込むために、ポインタをずらす
pointer -= read_size
f.seek(pointer)
chunk = f.read(read_size)
# 読み込んだチャンク + バッファ(途中行)
buffer = chunk + buffer
# 改行文字で区切る
parts = buffer.split(line_feed.encode())
# parts[0] は先頭途中行 → 次ループへ残す
buffer = parts[0]
# parts[1:] が「完全に揃った行」のリスト
# 改行文字で区切っているため、行途中のparts[1:] は空文字になるため、以下のループは素通りする。
for line in reversed(parts[1:]):
if not yielded_any and line == b'':
# 最初にだけ出てくる空行を飛ばす
continue
# 一度でも結果を返している場合にフラグを立てる
yielded_any = True
yield line
# ポインターがファイル先頭まで来たあとの残り1行
if not (not yielded_any and buffer == b''):
# 一度でも結果を返しているかつ、バッファにデータが残っている場合、先頭行のデータを返す
yield buffer
# ローカル実行用コード
# 値はよしなに変えてください
# if __name__ == '__main__':

View File

@ -1,10 +1,11 @@
import os
from datetime import datetime
from ini import init
from chk import check
from main import main
from end import end
from error import error
from ini import init
from main import main
# 引数
BUCKET_NAME = os.environ["BUCKET_NAME"]
@ -18,7 +19,9 @@ DB_HOST = os.environ["DB_HOST"]
DB_NAME = os.environ["DB_NAME"]
DB_PASS = os.environ["DB_PASS"]
DB_USER = os.environ["DB_USER"]
DB_INFO = {"host": DB_HOST, "name": DB_NAME, "pass": DB_PASS, "user": DB_USER}
DB_PORT = int(os.environ.get("DB_PORT", 3306))
DB_INFO = {"host": DB_HOST, "port": DB_PORT,
"name": DB_NAME, "pass": DB_PASS, "user": DB_USER}
# 定数
LOG_LEVEL = {"i": 'Info'}
@ -33,26 +36,36 @@ LOG_INFO = f'{DATA_SOURCE_NAME} {FILE_NAME}'
try:
# ① データ取込処理開始ログを出力する
print(f'{datetime.now():%Y-%m-%d %H:%M:%S} {LOG_INFO} {LOG_LEVEL["i"]} I-CTRL-01 - データ取込処理を開始します')
print(
f'{datetime.now():%Y-%m-%d %H:%M:%S} {LOG_INFO} {LOG_LEVEL["i"]} I-CTRL-01 - データ取込処理を開始します')
# ② 初期処理を呼び出す
print(f'{datetime.now():%Y-%m-%d %H:%M:%S} {LOG_INFO} {LOG_LEVEL["i"]} I-CTRL-02 - 初期処理の呼び出し')
settings_key = init(BUCKET_NAME, TARGET_KEY, DATA_SOURCE_NAME, FILE_NAME, LOG_INFO, MODE)
print(
f'{datetime.now():%Y-%m-%d %H:%M:%S} {LOG_INFO} {LOG_LEVEL["i"]} I-CTRL-02 - 初期処理の呼び出し')
settings_key = init(BUCKET_NAME, TARGET_KEY,
DATA_SOURCE_NAME, FILE_NAME, LOG_INFO, MODE)
# ③ チェック処理を呼び出す
print(f'{datetime.now():%Y-%m-%d %H:%M:%S} {LOG_INFO} {LOG_LEVEL["i"]} I-CTRL-03 - チェック処理の呼び出し')
check(BUCKET_NAME, DATA_SOURCE_NAME, FILE_NAME, settings_key, LOG_INFO, MODE)
print(
f'{datetime.now():%Y-%m-%d %H:%M:%S} {LOG_INFO} {LOG_LEVEL["i"]} I-CTRL-03 - チェック処理の呼び出し')
check(BUCKET_NAME, DATA_SOURCE_NAME,
FILE_NAME, settings_key, LOG_INFO, MODE)
# ④ メイン処理を呼び出す
print(f'{datetime.now():%Y-%m-%d %H:%M:%S} {LOG_INFO} {LOG_LEVEL["i"]} I-CTRL-04 - メイン処理の呼び出し')
warning_info = main(BUCKET_NAME, DATA_SOURCE_NAME, FILE_NAME, settings_key, DB_INFO, LOG_INFO, MODE)
print(
f'{datetime.now():%Y-%m-%d %H:%M:%S} {LOG_INFO} {LOG_LEVEL["i"]} I-CTRL-04 - メイン処理の呼び出し')
warning_info = main(BUCKET_NAME, DATA_SOURCE_NAME,
FILE_NAME, settings_key, DB_INFO, LOG_INFO, MODE)
# ⑤ 終了処理を呼び出す
print(f'{datetime.now():%Y-%m-%d %H:%M:%S} {LOG_INFO} {LOG_LEVEL["i"]} I-CTRL-05 - 終了処理の呼び出し')
print(
f'{datetime.now():%Y-%m-%d %H:%M:%S} {LOG_INFO} {LOG_LEVEL["i"]} I-CTRL-05 - 終了処理の呼び出し')
end(BUCKET_NAME, DATA_SOURCE_NAME, FILE_NAME, warning_info, LOG_INFO, MODE)
# ⑥ データ取込処理終了ログを出力する
print(f'{datetime.now():%Y-%m-%d %H:%M:%S} {LOG_INFO} {LOG_LEVEL["i"]} I-CTRL-06 - データ取込処理を終了します')
print(
f'{datetime.now():%Y-%m-%d %H:%M:%S} {LOG_INFO} {LOG_LEVEL["i"]} I-CTRL-06 - データ取込処理を終了します')
except Exception as e:
print(f'{datetime.now():%Y-%m-%d %H:%M:%S} {LOG_INFO} {LOG_LEVEL["e"]} E-CTRL-99 - エラー内容:{e}')
print(
f'{datetime.now():%Y-%m-%d %H:%M:%S} {LOG_INFO} {LOG_LEVEL["e"]} E-CTRL-99 - エラー内容:{e}')
error(BUCKET_NAME, DATA_SOURCE_NAME, FILE_NAME, LOG_INFO)

View File

@ -1,10 +1,10 @@
import csv
import io
import re
from datetime import datetime
import boto3
import pymysql
from chk import LOCAL_TEMPORARY_FILE_PATH
from common import convert_quotechar, debug_log
from error import error
from pymysql.constants import CLIENT
@ -27,23 +27,43 @@ SETTINGS_ITEM = {
'exSqlFileName': 11,
'commaReplaceColumns': 12,
'importManner': 13,
'reserved1': 14,
'reserved2': 15,
'reserved3': 16,
'reserved4': 17,
'reserved5': 18,
'reserved6': 19
'bulkImportFlag': 14,
'compressedFlag': 15,
'compression': 16,
'reserved1': 17,
'reserved2': 18,
'reserved3': 19,
'reserved4': 20,
'reserved5': 21,
'reserved6': 22,
'reserved7': 23,
'reserved8': 24
}
LINE_FEED_CODE = {
'CR': '\r',
'LF': '\n',
'CRLF': '\r\n',
}
# LOAD DATA文で文字コードを指定するために、個別設定ファイルの文字コード指定をMySQLの文字コード表記に変換する
MYSQL_CHARSET_CODE = {
'utf-8': 'utf8mb4',
'utf8': 'utf8mb4',
'utf-8-sig': 'utf8mb4',
'shift_jis': 'cp932',
'cp932': 'cp932',
}
DIRECTORY_SETTINGS = '/settings/'
TRUNCATE_SRC_TABLE_SYMBOL = 'truncate_src_table:'
TRUNCATE_SRC_TABLE_IDENTIFY_SYMBOL_FORMAT = f'{TRUNCATE_SRC_TABLE_SYMBOL}[蓄積スキーマのテーブル名]'
INVALID_CONFIG_EXCEPTION_MESSAGE = f'個別設定ファイルのインポート方法に不備がありました。 インポート方法は "{TRUNCATE_SRC_TABLE_IDENTIFY_SYMBOL_FORMAT}" のように設定してください'
INFO = LOG_LEVEL['i']
WARNING = LOG_LEVEL['w']
ERROR = LOG_LEVEL['e']
# クラス変数
s3_client = boto3.client('s3')
@ -64,7 +84,8 @@ def main(bucket_name, target_data_source, target_file_name, settings_key, db_inf
try:
debug_log(f'引数 bucket_name : {bucket_name}', log_info, mode)
debug_log(f'引数 target_data_source : {target_data_source}', log_info, mode)
debug_log(
f'引数 target_data_source : {target_data_source}', log_info, mode)
debug_log(f'引数 target_file_name : {target_file_name}', log_info, mode)
debug_log(f'引数 settings_key : {settings_key}', log_info, mode)
debug_log(f'引数 db_info : {db_info}', log_info, mode)
@ -72,24 +93,30 @@ def main(bucket_name, target_data_source, target_file_name, settings_key, db_inf
debug_log(f'引数 mode : {mode}', log_info, mode)
# ① メイン処理開始ログを出力する
print(f'{datetime.now():%Y-%m-%d %H:%M:%S} {log_info} {LOG_LEVEL["i"]} I-MAIN-01 - メイン処理を開始します')
print(
f'{datetime.now():%Y-%m-%d %H:%M:%S} {log_info} {INFO} I-MAIN-01 - メイン処理を開始します')
# ② DB接続を開始する
conn = pymysql.connect(host=db_info["host"], user=db_info["user"], passwd=db_info["pass"], db=db_info["name"], connect_timeout=5, client_flag=CLIENT.MULTI_STATEMENTS)
print(f'{datetime.now():%Y-%m-%d %H:%M:%S} {log_info} {LOG_LEVEL["i"]} I-MAIN-02 - DB接続を開始しました')
conn = pymysql.connect(host=db_info["host"], port=db_info["port"], user=db_info["user"], passwd=db_info["pass"],
db=db_info["name"], connect_timeout=5, client_flag=CLIENT.MULTI_STATEMENTS, local_infile=True)
print(
f'{datetime.now():%Y-%m-%d %H:%M:%S} {log_info} {INFO} I-MAIN-02 - DB接続を開始しました')
except Exception as e:
print(f'{datetime.now():%Y-%m-%d %H:%M:%S} {log_info} {LOG_LEVEL["e"]} E-MAIN-99 - エラー内容:{e}')
print(
f'{datetime.now():%Y-%m-%d %H:%M:%S} {log_info} {ERROR} E-MAIN-99 - エラー内容:{e}')
error(bucket_name, target_data_source, target_file_name, log_info)
try:
# ③ タイムゾーンを変更する
with conn.cursor() as cur:
cur.execute(f'SET time_zone = "+9:00"')
print(f'{datetime.now():%Y-%m-%d %H:%M:%S} {log_info} {LOG_LEVEL["i"]} I-MAIN-03 - タイムゾーンを変更しました')
print(
f'{datetime.now():%Y-%m-%d %H:%M:%S} {log_info} {INFO} I-MAIN-03 - タイムゾーンを変更しました')
# ④ 個別設定ファイルのロードスキーマのテーブル名に記載されているテーブルをTRUNCATEする
# 個別設定ファイルの読み込み
settings_response = s3_client.get_object(Bucket=bucket_name, Key=settings_key)
settings_response = s3_client.get_object(
Bucket=bucket_name, Key=settings_key)
settings_list = []
for line in io.TextIOWrapper(io.BytesIO(settings_response["Body"].read()), encoding='utf-8'):
settings_list.append(line.rstrip('\n'))
@ -99,106 +126,70 @@ def main(bucket_name, target_data_source, target_file_name, settings_key, db_inf
settings_list.append('')
# ロードスキーマのTRUNCATE
load_schema_name = settings_list[SETTINGS_ITEM["loadSchemaName"]]
with conn.cursor() as cur:
sql_truncate = f'TRUNCATE table {settings_list[SETTINGS_ITEM["loadSchemaName"]]}'
sql_truncate = f'TRUNCATE table {load_schema_name}'
cur.execute(sql_truncate)
print(f'{datetime.now():%Y-%m-%d %H:%M:%S} {log_info} {LOG_LEVEL["i"]} I-MAIN-04 - {settings_list[SETTINGS_ITEM["loadSchemaName"]]} をTRUNCATEしました')
print(
f'{datetime.now():%Y-%m-%d %H:%M:%S} {log_info} {INFO}' +
f' I-MAIN-04 - {load_schema_name} をTRUNCATEしました')
# ⑤ 投入データファイルを1行ごとにループする
print(f'{datetime.now():%Y-%m-%d %H:%M:%S} {log_info} {LOG_LEVEL["i"]} I-MAIN-05 - 投入データ {target_file_name} の読み込みを開始します')
work_key = target_data_source + DIRECTORY_WORK + target_file_name
work_response = s3_client.get_object(Bucket=bucket_name, Key=work_key)
work_data = io.TextIOWrapper(io.BytesIO(work_response["Body"].read()), encoding=settings_list[SETTINGS_ITEM["charCode"]], newline=LINE_FEED_CODE[settings_list[SETTINGS_ITEM["lineFeedCode"]]])
process_count = 0 # 処理件数カウンタ
normal_count = 0 # 正常終了件数カウンタ
warning_count = 0 # ワーニング終了件数カウンター
warning_info = '' # ワーニング情報
index = 0 # ループインデックス
settings_db_columu_list = settings_list[SETTINGS_ITEM["dbColumuName"]].rstrip().split(',')
settings_replace_comma_list = settings_list[SETTINGS_ITEM["commaReplaceColumns"]].rstrip().split(',')
for line in csv.reader(work_data, quotechar=convert_quotechar(settings_list[SETTINGS_ITEM["quotechar"]]), delimiter=settings_list[SETTINGS_ITEM["delimiter"]]):
try:
if int(settings_list[SETTINGS_ITEM["headerFlag"]]) == True and index == 0:
index += 1
print(f'{datetime.now():%Y-%m-%d %H:%M:%S} {log_info} {LOG_LEVEL["i"]} I-MAIN-06 - ヘッダー行をスキップします')
continue
# 処理件数カウント
process_count += 1
# SQL文生成
query_parameter_list = []
sql = f'INSERT INTO {settings_list[SETTINGS_ITEM["loadSchemaName"]]} ('
for db_column in settings_db_columu_list:
sql = f'{sql} {db_column},'
sql = f'{sql} file_name,' # システム項目:取込ファイル名
sql = f'{sql} file_row_cnt,' # システム項目:取込ファイル行番号
sql = f'{sql} delete_flg,' # システム項目:論理削除フラグ
sql = f'{sql} ins_user,' # システム項目:登録者
sql = f'{sql} ins_date,' # システム項目:登録日時
sql = f'{sql} upd_user,' # システム項目:更新者
sql = f'{sql} upd_date)' # システム項目:更新日時
sql = f'{sql} VALUES ('
for i in range(len(line)):
# データ項目値が0桁より大きいかチェックする
if len(line[i]) == 0:
# 0桁の場合
sql = f'{sql} NULL,'
continue
# データ項目値の変換処理(カンマ除去)
org_column_value = line[i]
current_settings_db_column_name = settings_db_columu_list[i]
column_value = convert_column_value(org_column_value, current_settings_db_column_name, settings_replace_comma_list)
# INSERT文のパラメータとそれに対応するプレースホルダーを設定する
query_parameter_list.append(column_value)
sql = f'{sql} %s,'
sql = f'{sql} "{target_file_name}",' # システム項目:取込ファイル名
sql = f'{sql} "{index + 1}",' # システム項目:取込ファイル行番号
sql = f'{sql} "0",' # システム項目:論理削除フラグ
sql = f'{sql} CURRENT_USER(),' # システム項目:登録者
sql = f'{sql} CURRENT_TIMESTAMP(),' # システム項目:登録日時
sql = f'{sql} NULL,' # システム項目:更新者
sql = f'{sql} NULL)' # システム項目:更新日時
index += 1
debug_log(sql, log_info, mode)
# ロードスキーマのトランザクション開始
with conn.cursor() as cur:
cur.execute(sql, query_parameter_list)
conn.commit()
normal_count += 1
except Exception as e:
warning_count += 1
warning_info = f'{warning_info}{index} ロードスキーマ登録時にエラーが発生しました {line} {e}\n'
print(f'{datetime.now():%Y-%m-%d %H:%M:%S} {log_info} {LOG_LEVEL["w"]} W-MAIN-01 {index} ロードスキーマ登録時にエラーが発生しました {line} {e}')
org_import_process_result = None
bulk_import_flag = settings_list[SETTINGS_ITEM["bulkImportFlag"]]
if not bulk_import_flag or int(bulk_import_flag) == 0:
# ⑤-1 一括登録フラグが未設定、またはOFFの場合、行コミットモードでロードスキーマに登録を行う
org_import_process_result = import_data_with_commit_per_row(bucket_name, target_data_source, target_file_name, log_info, mode,
settings_list, conn)
elif bulk_import_flag and int(bulk_import_flag) == 1:
# ⑤-2 一括登録フラグがONの場合、一括登録モードでロードスキーマに登録を行う
print(
f'{datetime.now():%Y-%m-%d %H:%M:%S} {log_info} {INFO} I-MAIN-22 - 一括登録モードが有効のため、一括INSERTを実行します')
org_import_process_result = import_data_with_bulk(bucket_name, target_data_source, target_file_name, log_info, mode,
settings_list, conn)
else:
# nop
pass
# 処理件数カウンタ
process_count = org_import_process_result['counts']['process']
# 正常終了件数カウンタ
normal_count = org_import_process_result['counts']['normal']
# ワーニング終了件数カウンター
warning_count = org_import_process_result['counts']['warning']
# ワーニング情報
warning_info = org_import_process_result['warning_info']
# ⑥ ⑤の処理結果件数をログ出力する
print(f'{datetime.now():%Y-%m-%d %H:%M:%S} {log_info} {LOG_LEVEL["i"]} I-MAIN-07 - 投入データ件数:{process_count} 正常終了件数:{normal_count}')
print(
f'{datetime.now():%Y-%m-%d %H:%M:%S} {log_info} {INFO} I-MAIN-07 - 投入データ件数:{process_count} 正常終了件数:{normal_count}')
if warning_info:
print(f'{datetime.now():%Y-%m-%d %H:%M:%S} {log_info} {LOG_LEVEL["w"]} W-MAIN-02 - Warning終了件数{warning_count}')
print(
f'{datetime.now():%Y-%m-%d %H:%M:%S} {log_info} {WARNING} W-MAIN-02 - Warning終了件数{warning_count}')
# ⑦ ロードスキーマのデータを蓄積スキーマにUPSERTする
print(f'{datetime.now():%Y-%m-%d %H:%M:%S} {log_info} {LOG_LEVEL["i"]} I-MAIN-08 - ロードスキーマ({settings_list[SETTINGS_ITEM["loadSchemaName"]]})のデータを蓄積スキーマ({settings_list[SETTINGS_ITEM["storageSchemaName"]]})に登録します')
storage_schema_name = settings_list[SETTINGS_ITEM["storageSchemaName"]]
print(
f'{datetime.now():%Y-%m-%d %H:%M:%S} {log_info} {INFO} I-MAIN-08 - ' +
f'ロードスキーマ({load_schema_name})のデータを蓄積スキーマ({storage_schema_name})に登録します')
# インポート方法判断
try:
if truncate_judge(settings_list):
with conn.cursor() as cur:
sql_truncate = f'TRUNCATE table {settings_list[SETTINGS_ITEM["storageSchemaName"]]}'
sql_truncate = f'TRUNCATE table {storage_schema_name}'
cur.execute(sql_truncate)
print(f'{datetime.now():%Y-%m-%d %H:%M:%S} {log_info} {LOG_LEVEL["i"]} I-MAIN-20 - {settings_list[SETTINGS_ITEM["storageSchemaName"]]} をTRUNCATEしました')
print(
f'{datetime.now():%Y-%m-%d %H:%M:%S} {log_info} {INFO}' +
f' I-MAIN-20 - {storage_schema_name} をTRUNCATEしました')
except InvalidConfigException as e:
print(f'{datetime.now():%Y-%m-%d %H:%M:%S} {log_info} {LOG_LEVEL["e"]} E-MAIN-01 - エラー内容:{e}')
print(
f'{datetime.now():%Y-%m-%d %H:%M:%S} {log_info} {ERROR} E-MAIN-01 - エラー内容:{e}')
error(bucket_name, target_data_source, target_file_name, log_info)
# SQL文生成
sql = f'INSERT INTO {settings_list[SETTINGS_ITEM["storageSchemaName"]]} ('
settings_db_columu_list = settings_list[SETTINGS_ITEM["dbColumuName"]].rstrip(
).split(',')
sql = f'INSERT INTO {storage_schema_name} ('
for i in range(len(settings_db_columu_list)):
sql = f'{sql} {settings_db_columu_list[i]},'
sql = f'{sql} file_name,' # システム項目:取込ファイル名
@ -218,79 +209,340 @@ def main(bucket_name, target_data_source, target_file_name, settings_key, db_inf
sql = f'{sql} t.ins_date,' # システム項目:登録日時
sql = f'{sql} t.upd_user,' # システム項目:更新者
sql = f'{sql} t.upd_date' # システム項目:更新日時
sql = f'{sql} FROM {settings_list[SETTINGS_ITEM["loadSchemaName"]]} as t'
sql = f'{sql} FROM {load_schema_name} as t'
sql = f'{sql} ON DUPLICATE KEY UPDATE'
for i in range(len(settings_db_columu_list)):
sql = f'{sql} {settings_db_columu_list[i]}=t.{settings_db_columu_list[i]},'
sql = f'{sql} file_name=t.file_name,' # システム項目:取込ファイル名
sql = f'{sql} file_row_cnt=t.file_row_cnt,' # システム項目:取込ファイル行番号
sql = f'{sql} delete_flg={settings_list[SETTINGS_ITEM["storageSchemaName"]]}.delete_flg,' # システム項目:論理削除フラグ
sql = f'{sql} ins_user={settings_list[SETTINGS_ITEM["storageSchemaName"]]}.ins_user,' # システム項目:登録者
sql = f'{sql} ins_date={settings_list[SETTINGS_ITEM["storageSchemaName"]]}.ins_date,' # システム項目:登録日時
sql = f'{sql} upd_user=t.ins_user,' # システム項目:更新者
sql = f'{sql} upd_date=t.ins_date' # システム項目:更新日時
# システム項目:取込ファイル名
sql = f'{sql} file_name=t.file_name,'
# システム項目:取込ファイル行番号
sql = f'{sql} file_row_cnt=t.file_row_cnt,'
# システム項目:論理削除フラグ
sql = f'{sql} delete_flg={storage_schema_name}.delete_flg,'
# システム項目:登録者
sql = f'{sql} ins_user={storage_schema_name}.ins_user,'
# システム項目:登録日時
sql = f'{sql} ins_date={storage_schema_name}.ins_date,'
# システム項目:更新者
sql = f'{sql} upd_user=t.ins_user,'
# システム項目:更新日時
sql = f'{sql} upd_date=t.ins_date'
debug_log(sql, log_info, mode)
# トランザクション開始
print(f'{datetime.now():%Y-%m-%d %H:%M:%S} {log_info} {LOG_LEVEL["i"]} I-MAIN-09 - 標準SQL{settings_list[SETTINGS_ITEM["storageSchemaName"]]} のトランザクションを開始します')
print(
f'{datetime.now():%Y-%m-%d %H:%M:%S} {log_info} {INFO} I-MAIN-09 - ' +
f'標準SQL{storage_schema_name} のトランザクションを開始します')
with conn.cursor() as cur:
cur.execute(sql)
conn.commit()
print(f'{datetime.now():%Y-%m-%d %H:%M:%S} {log_info} {LOG_LEVEL["i"]} I-MAIN-10 - 標準SQL{settings_list[SETTINGS_ITEM["storageSchemaName"]]} のCOMIIT処理が正常終了しました')
print(
f'{datetime.now():%Y-%m-%d %H:%M:%S} {log_info} {INFO} I-MAIN-10 - ' +
f'標準SQL{storage_schema_name} のCOMMIT処理が正常終了しました')
# ⑧ 個別設定ファイルに拡張SQLファイル名が設定されているかチェック
print(f'{datetime.now():%Y-%m-%d %H:%M:%S} {log_info} {LOG_LEVEL["i"]} I-MAIN-11 - 拡張SQL設定が存在するかチェックします')
if settings_list[SETTINGS_ITEM["exSqlFileName"]]:
print(
f'{datetime.now():%Y-%m-%d %H:%M:%S} {log_info} {INFO} I-MAIN-11 - 拡張SQL設定が存在するかチェックします')
ex_sql_file_name = settings_list[SETTINGS_ITEM["exSqlFileName"]]
if ex_sql_file_name:
try:
print(f'{datetime.now():%Y-%m-%d %H:%M:%S} {log_info} {LOG_LEVEL["i"]} I-MAIN-12 - 拡張SQL設定の存在を確認しました')
print(f'{datetime.now():%Y-%m-%d %H:%M:%S} {log_info} {LOG_LEVEL["i"]} I-MAIN-13 - 拡張SQLファイル名{settings_list[SETTINGS_ITEM["exSqlFileName"]]} の存在チェック')
ex_sql_key = target_data_source + DIRECTORY_SETTINGS + settings_list[SETTINGS_ITEM["exSqlFileName"]]
print(
f'{datetime.now():%Y-%m-%d %H:%M:%S} {log_info} {INFO} I-MAIN-12 - 拡張SQL設定の存在を確認しました')
print(
f'{datetime.now():%Y-%m-%d %H:%M:%S} {log_info} {INFO} I-MAIN-13 - 拡張SQLファイル名{ex_sql_file_name} の存在チェック')
ex_sql_key = target_data_source + DIRECTORY_SETTINGS + ex_sql_file_name
s3_client.head_object(Bucket=bucket_name, Key=ex_sql_key)
ex_sql_file_exists = True
print(f'{datetime.now():%Y-%m-%d %H:%M:%S} {log_info} {LOG_LEVEL["i"]} I-MAIN-14 - 拡張SQLファイル名の存在を確認しました')
except Exception as e:
print(
f'{datetime.now():%Y-%m-%d %H:%M:%S} {log_info} {INFO} I-MAIN-14 - 拡張SQLファイル名の存在を確認しました')
except Exception:
warning_info = f'{warning_info}- 拡張SQLファイルが存在しません\n'
ex_sql_file_exists = False
print(f'{datetime.now():%Y-%m-%d %H:%M:%S} {log_info} {LOG_LEVEL["w"]} W-MAIN-03 - 拡張SQLファイルが存在しません')
print(
f'{datetime.now():%Y-%m-%d %H:%M:%S} {log_info} {WARNING} W-MAIN-03 - 拡張SQLファイルが存在しません')
try:
if ex_sql_file_exists:
# 拡張SQLファイルからSQL文生成
ex_sql_obj_response = s3_client.get_object(Bucket=bucket_name, Key=ex_sql_key)
ex_sql_obj_response = s3_client.get_object(
Bucket=bucket_name, Key=ex_sql_key)
ex_sql = ''
for line in io.TextIOWrapper(io.BytesIO(ex_sql_obj_response["Body"].read()), encoding='utf-8'):
ex_sql = f'{ex_sql} {line.rstrip()}'
# トランザクション開始
print(f'{datetime.now():%Y-%m-%d %H:%M:%S} {log_info} {LOG_LEVEL["i"]} I-MAIN-15 - 拡張SQL{settings_list[SETTINGS_ITEM["storageSchemaName"]]} のトランザクションを開始します')
print(
f'{datetime.now():%Y-%m-%d %H:%M:%S} {log_info} {INFO} I-MAIN-15 - 拡張SQL{storage_schema_name} のトランザクションを開始します')
with conn.cursor() as cur:
cur.execute(ex_sql)
conn.commit()
print(f'{datetime.now():%Y-%m-%d %H:%M:%S} {log_info} {LOG_LEVEL["i"]} I-MAIN-16 - 拡張SQL{settings_list[SETTINGS_ITEM["storageSchemaName"]]} のCOMIIT処理が正常終了しました')
print(
f'{datetime.now():%Y-%m-%d %H:%M:%S} {log_info} {INFO} I-MAIN-16 - 拡張SQL{storage_schema_name} のCOMMIT処理が正常終了しました')
except Exception as e:
warning_info = f'{warning_info}- 拡張SQLにエラーが発生しました{e}\n'
print(f'{datetime.now():%Y-%m-%d %H:%M:%S} {log_info} {LOG_LEVEL["w"]} W-MAIN-04 - 拡張SQLにエラーが発生しました{e}')
print(
f'{datetime.now():%Y-%m-%d %H:%M:%S} {log_info} {WARNING} W-MAIN-04 - 拡張SQLにエラーが発生しました{e}')
else:
print(f'{datetime.now():%Y-%m-%d %H:%M:%S} {log_info} {LOG_LEVEL["i"]} I-MAIN-17 - 拡張SQL設定の存在はありませんでした')
print(
f'{datetime.now():%Y-%m-%d %H:%M:%S} {log_info} {INFO} I-MAIN-17 - 拡張SQL設定の存在はありませんでした')
# ⑨ DB接続を終了する
connection_close(conn, bucket_name, target_data_source, target_file_name, log_info)
connection_close(conn, bucket_name, target_data_source,
target_file_name, log_info)
except Exception as e:
print(f'{datetime.now():%Y-%m-%d %H:%M:%S} {log_info} {LOG_LEVEL["e"]} E-MAIN-99 - エラー内容:{e}')
connection_close(conn, bucket_name, target_data_source, target_file_name, log_info)
print(
f'{datetime.now():%Y-%m-%d %H:%M:%S} {log_info} {ERROR} E-MAIN-99 - エラー内容:{e}')
connection_close(conn, bucket_name, target_data_source,
target_file_name, log_info)
error(bucket_name, target_data_source, target_file_name, log_info)
try:
# ⑩ メイン処理終了ログを出力する
print(f'{datetime.now():%Y-%m-%d %H:%M:%S} {log_info} {LOG_LEVEL["i"]} I-MAIN-19 - メイン処理を終了します')
print(
f'{datetime.now():%Y-%m-%d %H:%M:%S} {log_info} {INFO} I-MAIN-19 - メイン処理を終了します')
return warning_info
except Exception as e:
print(f'{datetime.now():%Y-%m-%d %H:%M:%S} {log_info} {LOG_LEVEL["e"]} E-MAIN-99 - エラー内容:{e}')
print(
f'{datetime.now():%Y-%m-%d %H:%M:%S} {log_info} {ERROR} E-MAIN-99 - エラー内容:{e}')
error(bucket_name, target_data_source, target_file_name, log_info)
def import_data_with_commit_per_row(
bucket_name: str,
target_data_source: str,
target_file_name: str,
log_info: str,
mode: str,
settings_list: list[dict],
conn: pymysql.Connection) -> dict:
"""一行コミットモードでロードスキーマに登録を行います
Args:
bucket_name (str): バケット名
target_data_source (str): 投入データのディレクトリ名よりデータソースに該当する部分
target_file_name (str): 投入データのファイル名
log_info (str): ログに記載するデータソース名とファイル名
mode (str): 処理モード
settings_list (list[dict]): 設定ファイル
conn (pymysql.Connection): DB接続
Returns:
dict: 処理件数投入データ件数正常終了件数ワーニング件数とワーニング情報の辞書オブジェクト
"""
# ⑤-1 投入データファイルを1行ごとにループする
print(
f'{datetime.now():%Y-%m-%d %H:%M:%S} {log_info} {INFO} I-MAIN-05 - 投入データ {target_file_name} の読み込みを開始します')
import_data_bytes = None
compressed_flag = settings_list[SETTINGS_ITEM["compressedFlag"]]
if not compressed_flag or int(compressed_flag) == 0:
# 圧縮フラグが未設定、またはOFFの場合、S3バケット上の投入データファイルを読む
work_key = target_data_source + DIRECTORY_WORK + target_file_name
work_response = s3_client.get_object(
Bucket=bucket_name, Key=work_key)
import_data_bytes = work_response["Body"].read()
# 圧縮フラグがONの場合、チェック処理で展開済みのtmpディレクトリ内のファイルを読む
elif compressed_flag and int(compressed_flag) == 1:
print(
f'{datetime.now():%Y-%m-%d %H:%M:%S} {log_info} {INFO} I-MAIN-21 - ファイルが圧縮されていたため、展開済みのファイルを利用します')
with open(LOCAL_TEMPORARY_FILE_PATH, 'rb') as f:
import_data_bytes = f.read()
else:
# nop
pass
work_data = io.TextIOWrapper(io.BytesIO(import_data_bytes), encoding=settings_list[SETTINGS_ITEM["charCode"]],
newline=LINE_FEED_CODE[settings_list[SETTINGS_ITEM["lineFeedCode"]]])
process_count = 0 # 処理件数カウンタ
normal_count = 0 # 正常終了件数カウンタ
warning_count = 0 # ワーニング終了件数カウンター
warning_info = '' # ワーニング情報
index = 0 # ループインデックス
settings_db_columu_list = settings_list[SETTINGS_ITEM["dbColumuName"]].rstrip(
).split(',')
settings_replace_comma_list = settings_list[SETTINGS_ITEM["commaReplaceColumns"]].rstrip(
).split(',')
for line in csv.reader(work_data, quotechar=convert_quotechar(settings_list[SETTINGS_ITEM["quotechar"]]),
delimiter=settings_list[SETTINGS_ITEM["delimiter"]]):
try:
if int(settings_list[SETTINGS_ITEM["headerFlag"]]) == 1 and index == 0:
index += 1
print(
f'{datetime.now():%Y-%m-%d %H:%M:%S} {log_info} {INFO} I-MAIN-06 - ヘッダー行をスキップします')
continue
# 処理件数カウント
process_count += 1
# SQL文生成
query_parameter_list = []
sql = f'INSERT INTO {settings_list[SETTINGS_ITEM["loadSchemaName"]]} ('
for db_column in settings_db_columu_list:
sql = f'{sql} {db_column},'
sql = f'{sql} file_name,' # システム項目:取込ファイル名
sql = f'{sql} file_row_cnt,' # システム項目:取込ファイル行番号
sql = f'{sql} delete_flg,' # システム項目:論理削除フラグ
sql = f'{sql} ins_user,' # システム項目:登録者
sql = f'{sql} ins_date,' # システム項目:登録日時
sql = f'{sql} upd_user,' # システム項目:更新者
sql = f'{sql} upd_date)' # システム項目:更新日時
sql = f'{sql} VALUES ('
for i in range(len(line)):
# データ項目値が0桁より大きいかチェックする
if len(line[i]) == 0:
# 0桁の場合
sql = f'{sql} NULL,'
continue
# データ項目値の変換処理(カンマ除去)
org_column_value = line[i]
current_settings_db_column_name = settings_db_columu_list[i]
column_value = convert_column_value(
org_column_value, current_settings_db_column_name, settings_replace_comma_list)
# INSERT文のパラメータとそれに対応するプレースホルダーを設定する
query_parameter_list.append(column_value)
sql = f'{sql} %s,'
sql = f'{sql} "{target_file_name}",' # システム項目:取込ファイル名
sql = f'{sql} "{index + 1}",' # システム項目:取込ファイル行番号
sql = f'{sql} "0",' # システム項目:論理削除フラグ
sql = f'{sql} CURRENT_USER(),' # システム項目:登録者
sql = f'{sql} CURRENT_TIMESTAMP(),' # システム項目:登録日時
sql = f'{sql} NULL,' # システム項目:更新者
sql = f'{sql} NULL)' # システム項目:更新日時
index += 1
debug_log(sql, log_info, mode)
# ロードスキーマのトランザクション開始
with conn.cursor() as cur:
cur.execute(sql, query_parameter_list)
conn.commit()
normal_count += 1
except Exception as e:
warning_count += 1
warning_info = f'{warning_info}{index} ロードスキーマ登録時にエラーが発生しました {line} {e}\n'
print(
f'{datetime.now():%Y-%m-%d %H:%M:%S} {log_info} {WARNING} W-MAIN-01 {index} ロードスキーマ登録時にエラーが発生しました {line} {e}')
return {
"counts": {
"process": process_count,
"normal": normal_count,
"warning": warning_count
},
"warning_info": warning_info
}
def import_data_with_bulk(
bucket_name: str,
target_data_source: str,
target_file_name: str,
log_info: str,
mode: str,
settings_list: list[dict],
conn: pymysql.Connection) -> dict:
"""一括登録モードでロードスキーマに登録を行います
Args:
bucket_name (str): バケット名
target_data_source (str): 投入データのディレクトリ名よりデータソースに該当する部分
target_file_name (str): 投入データのファイル名
log_info (str): ログに記載するデータソース名とファイル名
mode (str): 処理モード
settings_list (list[dict]): 設定ファイル
conn (pymysql.Connection): DB接続
Returns:
dict: 処理件数投入データ件数正常終了件数ワーニング件数とワーニング情報の辞書オブジェクト
"""
# ⑤-2 一括登録を行う
compressed_flag = settings_list[SETTINGS_ITEM["compressedFlag"]]
if not compressed_flag or int(compressed_flag) == 0:
# 圧縮フラグがOFFの場合、投入データをtmpディレクトリにダウンロードする
work_key = target_data_source + DIRECTORY_WORK + target_file_name
s3_client.download_file(
Bucket=bucket_name, Key=work_key, Filename=LOCAL_TEMPORARY_FILE_PATH)
elif compressed_flag and int(compressed_flag) == 1:
# 圧縮フラグがONの場合、チェック処理で展開済みのtmpディレクトリ内のファイルを読むため、何もしない。
pass
else:
# nop
pass
process_count = 0 # 処理件数カウンタ
normal_count = 0 # 正常終了件数カウンタ
load_schema_name = settings_list[SETTINGS_ITEM["loadSchemaName"]]
has_header = settings_list[SETTINGS_ITEM["headerFlag"]] or int(
settings_list[SETTINGS_ITEM["headerFlag"]]) == 1
settings_db_columu_list = settings_list[SETTINGS_ITEM["dbColumuName"]].rstrip(
).split(',')
enclosed_by = convert_quotechar(settings_list[SETTINGS_ITEM["quotechar"]])
sql = f"""
SET @file_row_cnt = 1;
LOAD DATA LOCAL
INFILE %s
REPLACE
INTO TABLE {load_schema_name}
CHARACTER SET {MYSQL_CHARSET_CODE[settings_list[SETTINGS_ITEM["charCode"]]]}
FIELDS TERMINATED BY '{settings_list[SETTINGS_ITEM["delimiter"]]}'
ENCLOSED BY '{enclosed_by if enclosed_by != "'" else "\\'"}'
LINES TERMINATED BY '{LINE_FEED_CODE[settings_list[SETTINGS_ITEM["lineFeedCode"]]]}'
{'IGNORE 1 LINES' if has_header else ''}
({','.join([column for column in settings_db_columu_list])})
SET
-- 取込ファイル名
file_name = %s,
-- 取込ファイル行番号
file_row_cnt = (@file_row_cnt := @file_row_cnt + 1),
-- 論理削除フラグ
delete_flg = 0,
-- 登録者
ins_user = CURRENT_USER(),
-- 登録日時
ins_date = CURRENT_TIMESTAMP(),
-- 更新者
upd_user = NULL,
-- 更新日時
upd_date = NULL
;
"""
debug_log(sql, log_info, mode)
try:
# ロードスキーマのトランザクション開始
with conn.cursor() as cur:
cur.execute(sql, [LOCAL_TEMPORARY_FILE_PATH, target_file_name])
# 一括登録モードの場合、LOAD文の成功行数を取得してprocess_countにする
cur.execute("SELECT ROW_COUNT()")
process_count = cur.fetchone()[0]
conn.commit()
except Exception as e:
print(
f'{datetime.now():%Y-%m-%d %H:%M:%S} {log_info} {ERROR} 一括登録モードのSQL実行に失敗しました。エラー内容 {e}')
error(bucket_name, target_data_source, target_file_name, log_info)
# 一括登録の場合、クエリ実行に成功したら、処理件数と成功件数は同じにする
normal_count = process_count
return {
"counts": {
"process": process_count,
"normal": normal_count,
"warning": 0 # 一括登録時はワーニングにならずエラーになるため、必ず0件
},
"warning_info": '' # 一括登録時はワーニングにならずエラーになるため、空文字を返却
}
def connection_close(conn, bucket_name, target_data_source, target_file_name, log_info):
"""DB接続切断処理
Args:
@ -302,11 +554,14 @@ def connection_close(conn, bucket_name, target_data_source, target_file_name, lo
"""
try:
conn.close()
print(f'{datetime.now():%Y-%m-%d %H:%M:%S} {log_info} {LOG_LEVEL["i"]} I-MAIN-18 - DB接続を終了しました')
print(
f'{datetime.now():%Y-%m-%d %H:%M:%S} {log_info} {INFO} I-MAIN-18 - DB接続を終了しました')
except Exception as e:
print(f'{datetime.now():%Y-%m-%d %H:%M:%S} {log_info} {LOG_LEVEL["e"]} E-MAIN-99 - エラー内容:{e}')
print(
f'{datetime.now():%Y-%m-%d %H:%M:%S} {log_info} {ERROR} E-MAIN-99 - エラー内容:{e}')
error(bucket_name, target_data_source, target_file_name, log_info)
def convert_column_value(org_column_value, current_settings_db_column_name, settings_replace_comma_list):
"""データ項目値変換処理
- 数値内のカンマ除去処理
@ -320,8 +575,8 @@ def convert_column_value(org_column_value, current_settings_db_column_name, sett
"""
# 投入データのDB物理カラム名が設定ファイルの数値型のDBカラム物理名に含まれている場合、データ項目値の「,」を取り除く
converted_column_value = org_column_value
if current_settings_db_column_name in settings_replace_comma_list:
converted_column_value = converted_column_value.replace(',', '')
if current_settings_db_column_name in settings_replace_comma_list:
converted_column_value = converted_column_value.replace(',', '')
return converted_column_value
@ -335,15 +590,16 @@ def truncate_judge(settings_list):
Returns:
Bool: Truncate対象の場合TrueTruncate対象でない場合False
"""
import_manner = settings_list[SETTINGS_ITEM["importManner"]]
# upsert判定
if not settings_list[SETTINGS_ITEM["importManner"]]:
if not import_manner:
return False
# インポート方法設定チェック
if not settings_list[SETTINGS_ITEM["importManner"]].startswith(TRUNCATE_SRC_TABLE_SYMBOL):
if not import_manner.startswith(TRUNCATE_SRC_TABLE_SYMBOL):
raise InvalidConfigException(INVALID_CONFIG_EXCEPTION_MESSAGE)
import_manner_splitted_list = settings_list[SETTINGS_ITEM["importManner"]].split(':')
import_manner_splitted_list = import_manner.split(
':')
if len(import_manner_splitted_list) != 2:
raise InvalidConfigException(INVALID_CONFIG_EXCEPTION_MESSAGE)
if import_manner_splitted_list[1] != settings_list[SETTINGS_ITEM["storageSchemaName"]]: