Merge pull request #5 feature-NEWDWH2021-191 into develop

This commit is contained in:
朝倉 明日香 2021-10-28 13:40:41 +09:00
commit 0b98a89448
8 changed files with 232 additions and 135 deletions

View File

@ -3,6 +3,7 @@ import boto3
import io import io
import csv import csv
from error import error from error import error
from common import debug_log
# 定数 # 定数
LOG_LEVEL = {'i': 'Info', 'e': 'Error'} LOG_LEVEL = {'i': 'Info', 'e': 'Error'}
@ -35,7 +36,7 @@ class CheckError(Exception):
pass pass
def check(bucket_name, target_key, target_data_source, target_file_name, settings_key, log_info): def check(bucket_name, target_key, target_data_source, target_file_name, settings_key, log_info, mode):
"""チェック処理 """チェック処理
Args: Args:
bucket_name : バケット名 bucket_name : バケット名
@ -44,19 +45,29 @@ def check(bucket_name, target_key, target_data_source, target_file_name, setting
target_file_name : 投入データのファイル名 target_file_name : 投入データのファイル名
settings_key : 投入データに該当する個別設定ファイルのフルパス settings_key : 投入データに該当する個別設定ファイルのフルパス
log_info : ログに記載するデータソース名とファイル名 log_info : ログに記載するデータソース名とファイル名
mode : 処理モード
Raises: Raises:
CheckError : チェックでエラーがあった場合に発生する例外 CheckError : チェックでエラーがあった場合に発生する例外
""" """
# ① チェック処理開始ログを出力する
print(f'{str(datetime.now())} {log_info} {LOG_LEVEL["i"]} I-CHK-01 - チェック処理を開始します')
try: try:
debug_log(f'引数 bucket_name : {bucket_name}', log_info, mode)
debug_log(f'引数 target_key : {target_key}', log_info, mode)
debug_log(f'引数 target_data_source : {target_data_source}', log_info, mode)
debug_log(f'引数 target_file_name : {target_file_name}', log_info, mode)
debug_log(f'引数 settings_key : {settings_key}', log_info, mode)
debug_log(f'引数 log_info : {log_info}', log_info, mode)
debug_log(f'引数 mode : {mode}', log_info, mode)
# ① チェック処理開始ログを出力する
print(f'{str(datetime.now())} {log_info} {LOG_LEVEL["i"]} I-CHK-01 - チェック処理を開始します')
# データ読込 # データ読込
settings_obj = s3_resource.Object(bucket_name, settings_key) settings_obj = s3_resource.Object(bucket_name, settings_key)
settings_response = settings_obj.get() settings_response = settings_obj.get()
settings_list = [] settings_list = []
for line in io.TextIOWrapper(io.BytesIO(settings_response["Body"].read()), encoding='utf-8'): for line in io.TextIOWrapper(io.BytesIO(settings_response["Body"].read()), encoding='utf-8'):
settings_list.append(line.rstrip()) settings_list.append(line.rstrip('\n'))
target_obj = s3_resource.Object(bucket_name, target_key) target_obj = s3_resource.Object(bucket_name, target_key)
target_response = target_obj.get() target_response = target_obj.get()
@ -82,6 +93,8 @@ def check(bucket_name, target_key, target_data_source, target_file_name, setting
raise CheckError(f'E-CHK-02 - 項目順序が一致しません {i}番目の項目 個別設定ファイル項目:{settings_header_list[i]} 投入データ項目:{target_header_list[i]}') raise CheckError(f'E-CHK-02 - 項目順序が一致しません {i}番目の項目 個別設定ファイル項目:{settings_header_list[i]} 投入データ項目:{target_header_list[i]}')
print(f'{str(datetime.now())} {log_info} {LOG_LEVEL["i"]} I-CHK-05 - C-2正常終了') print(f'{str(datetime.now())} {log_info} {LOG_LEVEL["i"]} I-CHK-05 - C-2正常終了')
# ④ チェック処理終了ログを出力する
print(f'{str(datetime.now())} {log_info} {LOG_LEVEL["i"]} I-CHK-06 - チェック処理を終了します')
except CheckError as e: except CheckError as e:
print(f'{str(datetime.now())} {log_info} {LOG_LEVEL["e"]} {e}') print(f'{str(datetime.now())} {log_info} {LOG_LEVEL["e"]} {e}')
error(bucket_name, target_data_source, target_file_name, log_info) error(bucket_name, target_data_source, target_file_name, log_info)
@ -89,6 +102,3 @@ def check(bucket_name, target_key, target_data_source, target_file_name, setting
except Exception as e: except Exception as e:
print(f'{str(datetime.now())} {log_info} {LOG_LEVEL["e"]} E-CHK-99 - エラー内容:{e}') print(f'{str(datetime.now())} {log_info} {LOG_LEVEL["e"]} E-CHK-99 - エラー内容:{e}')
error(bucket_name, target_data_source, target_file_name, log_info) error(bucket_name, target_data_source, target_file_name, log_info)
# ④ チェック処理終了ログを出力する
print(f'{str(datetime.now())} {log_info} {LOG_LEVEL["i"]} I-CHK-06 - チェック処理を終了します')

View File

@ -0,0 +1,11 @@
from datetime import datetime
MODE_TYPE = {
'n': 'normal',
'd': 'debug',
}
def debug_log(log, log_info, mode):
if MODE_TYPE['d'] == mode:
print(f'{str(datetime.now())} {log_info} Debug {log}')

View File

@ -4,12 +4,14 @@ from ini import init
from chk import check from chk import check
from main import main from main import main
from end import end from end import end
from error import error
# 引数 # 引数
BUCKET_NAME = os.environ["BUCKET_NAME"] BUCKET_NAME = os.environ["BUCKET_NAME"]
TARGET_KEY = os.environ["TARGET_KEY"] TARGET_KEY = os.environ["TARGET_KEY"]
DATA_SOURCE_NAME = os.environ["DATA_SOURCE_NAME"] DATA_SOURCE_NAME = os.environ["DATA_SOURCE_NAME"]
FILE_NAME = os.environ["FILE_NAME"] FILE_NAME = os.environ["FILE_NAME"]
MODE = os.environ["MODE"]
# 環境変数 # 環境変数
DB_HOST = os.environ["DB_HOST"] DB_HOST = os.environ["DB_HOST"]
@ -29,25 +31,28 @@ LOG_INFO = f' {DATA_SOURCE_NAME} {FILE_NAME} '
""" """
try:
# ① データ取込処理開始ログを出力する
print(f'{str(datetime.now())} {LOG_INFO} {LOG_LEVEL["i"]} I-CTRL-01 - データ取込処理を開始します')
# ① データ取込処理開始ログを出力する # ② 初期処理を呼び出す
print(f'{str(datetime.now())} {LOG_INFO} {LOG_LEVEL["i"]} I-CTRL-01 - データ取込処理を開始します') print(f'{str(datetime.now())} {LOG_INFO} {LOG_LEVEL["i"]} I-CTRL-02 - 初期処理の呼び出し')
settings_key = init(BUCKET_NAME, TARGET_KEY, DATA_SOURCE_NAME, FILE_NAME, LOG_INFO, MODE)
# ② 初期処理を呼び出す # ③ チェック処理を呼び出す
print(f'{str(datetime.now())} {LOG_INFO} {LOG_LEVEL["i"]} I-CTRL-02 - 初期処理の呼び出し') print(f'{str(datetime.now())} {LOG_INFO} {LOG_LEVEL["i"]} I-CTRL-03 - チェック処理の呼び出し')
settings_key = init(BUCKET_NAME, TARGET_KEY, DATA_SOURCE_NAME, FILE_NAME, LOG_INFO) check(BUCKET_NAME, TARGET_KEY, DATA_SOURCE_NAME, FILE_NAME, settings_key, LOG_INFO, MODE)
# ③ チェック処理を呼び出す # ④ メイン処理を呼び出す
print(f'{str(datetime.now())} {LOG_INFO} {LOG_LEVEL["i"]} I-CTRL-03 - チェック処理の呼び出し') print(f'{str(datetime.now())} {LOG_INFO} {LOG_LEVEL["i"]} I-CTRL-04 - メイン処理の呼び出し')
check(BUCKET_NAME, TARGET_KEY, DATA_SOURCE_NAME, FILE_NAME, settings_key, LOG_INFO) warning_info = main(BUCKET_NAME, TARGET_KEY, DATA_SOURCE_NAME, FILE_NAME, settings_key, DB_INFO, LOG_INFO, MODE)
# ④ メイン処理を呼び出す # ⑤ 終了処理を呼び出す
print(f'{str(datetime.now())} {LOG_INFO} {LOG_LEVEL["i"]} I-CTRL-04 - メイン処理の呼び出し') print(f'{str(datetime.now())} {LOG_INFO} {LOG_LEVEL["i"]} I-CTRL-05 - 終了処理の呼び出し')
warning_info = main(BUCKET_NAME, TARGET_KEY, DATA_SOURCE_NAME, FILE_NAME, settings_key, DB_INFO, LOG_INFO) end(BUCKET_NAME, DATA_SOURCE_NAME, FILE_NAME, warning_info, LOG_INFO, MODE)
# ⑤ 終了処理を呼び出す # ⑥ データ取込処理終了ログを出力する
print(f'{str(datetime.now())} {LOG_INFO} {LOG_LEVEL["i"]} I-CTRL-05 - 終了処理の呼び出し') print(f'{str(datetime.now())} {LOG_INFO} {LOG_LEVEL["i"]} I-CTRL-06 - データ取込処理を終了します')
end(BUCKET_NAME, DATA_SOURCE_NAME, FILE_NAME, warning_info, LOG_INFO) except Exception as e:
print(f'{str(datetime.now())} {LOG_INFO} {LOG_LEVEL["e"]} E-CTRL-99 - エラー内容:{e}')
# ⑥ データ取込処理終了ログを出力する error(BUCKET_NAME, DATA_SOURCE_NAME, FILE_NAME, LOG_INFO)
print(f'{str(datetime.now())} {LOG_INFO} {LOG_LEVEL["i"]} I-CTRL-06 - データ取込処理を終了します')

View File

@ -1,6 +1,7 @@
from datetime import datetime from datetime import datetime
import boto3 import boto3
from error import error from error import error
from common import debug_log
# 定数 # 定数
LOG_LEVEL = {'i': 'Info', 'e': 'Error'} LOG_LEVEL = {'i': 'Info', 'e': 'Error'}
@ -14,7 +15,7 @@ s3_client = boto3.client('s3')
s3_resource = boto3.resource('s3') s3_resource = boto3.resource('s3')
def end(bucket_name, target_data_source, target_file_name, warning_info, log_info): def end(bucket_name, target_data_source, target_file_name, warning_info, log_info, mode):
"""終了処理 """終了処理
Args: Args:
bucket_name : バケット名 bucket_name : バケット名
@ -22,11 +23,20 @@ def end(bucket_name, target_data_source, target_file_name, warning_info, log_inf
target_file_name : 投入データのファイル名 target_file_name : 投入データのファイル名
warning_info : Warning情報 warning_info : Warning情報
log_info : ログに記載するデータソース名とファイル名 log_info : ログに記載するデータソース名とファイル名
mode : 処理モード
""" """
# ① 終了処理開始ログを出力する
print(f'{str(datetime.now())} {log_info} {LOG_LEVEL["i"]} I-END-01 - 終了処理を開始します')
try: try:
debug_log(f'引数 bucket_name : {bucket_name}', log_info, mode)
debug_log(f'引数 target_data_source : {target_data_source}', log_info, mode)
debug_log(f'引数 target_file_name : {target_file_name}', log_info, mode)
debug_log(f'引数 warning_info : {warning_info}', log_info, mode)
debug_log(f'引数 log_info : {log_info}', log_info, mode)
debug_log(f'引数 mode : {mode}', log_info, mode)
# ① 終了処理開始ログを出力する
print(f'{str(datetime.now())} {log_info} {LOG_LEVEL["i"]} I-END-01 - 終了処理を開始します')
# ② 投入データファイルをS3バケット内のworkディレクトリから、以下ファイル名でdoneディレクトリに移動(コピー削除)する # ② 投入データファイルをS3バケット内のworkディレクトリから、以下ファイル名でdoneディレクトリに移動(コピー削除)する
work_key = target_data_source + DIRECTORY_WORK + target_file_name work_key = target_data_source + DIRECTORY_WORK + target_file_name
work_obj = s3_resource.Object(bucket_name, work_key) work_obj = s3_resource.Object(bucket_name, work_key)
@ -71,9 +81,9 @@ def end(bucket_name, target_data_source, target_file_name, warning_info, log_inf
result_done_obj = s3_resource.Object(bucket_name, result_done_key) result_done_obj = s3_resource.Object(bucket_name, result_done_key)
result_done_obj.put(Body='') result_done_obj.put(Body='')
print(f'{str(datetime.now())} {log_info} {LOG_LEVEL["i"]} I-END-09 - targetディレクトリに {result_done_file_name} を作成しました') print(f'{str(datetime.now())} {log_info} {LOG_LEVEL["i"]} I-END-09 - targetディレクトリに {result_done_file_name} を作成しました')
# ⑤ 終了処理終了ログを出力する
print(f'{str(datetime.now())} {log_info} {LOG_LEVEL["i"]} I-END-10 - 終了処理を終了します')
except Exception as e: except Exception as e:
print(f'{str(datetime.now())} {log_info} {LOG_LEVEL["e"]} E-END-99 - エラー内容:{e}') print(f'{str(datetime.now())} {log_info} {LOG_LEVEL["e"]} E-END-99 - エラー内容:{e}')
error(bucket_name, target_data_source, target_file_name, log_info) error(bucket_name, target_data_source, target_file_name, log_info)
# ⑤ 終了処理終了ログを出力する
print(f'{str(datetime.now())} {log_info} {LOG_LEVEL["i"]} I-END-10 - 終了処理を終了します')

View File

@ -22,10 +22,10 @@ def error(bucket_name, target_data_source, target_file_name, log_info):
log_info : ログに記載するデータソース名とファイル名 log_info : ログに記載するデータソース名とファイル名
""" """
# ① エラー処理開始ログを出力する
print(f'{str(datetime.now())} {log_info} {LOG_LEVEL["i"]} I-ERR-01 - エラー処理を開始します')
try: try:
# ① エラー処理開始ログを出力する
print(f'{str(datetime.now())} {log_info} {LOG_LEVEL["i"]} I-ERR-01 - エラー処理を開始します')
# ② 投入データファイルをS3バケット内のworkディレクトリから、以下ファイル名でerrorディレクトリに移動(コピー削除)する # ② 投入データファイルをS3バケット内のworkディレクトリから、以下ファイル名でerrorディレクトリに移動(コピー削除)する
work_key = target_data_source + DIRECTORY_WORK + target_file_name work_key = target_data_source + DIRECTORY_WORK + target_file_name
work_obj = s3_resource.Object(bucket_name, work_key) work_obj = s3_resource.Object(bucket_name, work_key)

View File

@ -5,6 +5,7 @@ import csv
import re import re
import sys import sys
from error import error from error import error
from common import debug_log
# 定数 # 定数
LOG_LEVEL = {"i": 'Info', "e": 'Error'} LOG_LEVEL = {"i": 'Info', "e": 'Error'}
@ -18,7 +19,7 @@ s3_client = boto3.client('s3')
s3_resource = boto3.resource('s3') s3_resource = boto3.resource('s3')
def init(bucket_name, target_key, target_data_source, target_file_name, log_info): def init(bucket_name, target_key, target_data_source, target_file_name, log_info, mode):
"""初期処理 """初期処理
Args: Args:
bucket_name : バケット名 bucket_name : バケット名
@ -26,16 +27,27 @@ def init(bucket_name, target_key, target_data_source, target_file_name, log_info
target_data_source : 投入データのディレクトリ名よりデータソースに該当する部分 target_data_source : 投入データのディレクトリ名よりデータソースに該当する部分
target_file_name : 投入データのファイル名 target_file_name : 投入データのファイル名
log_info : ログに記載するデータソース名とファイル名 log_info : ログに記載するデータソース名とファイル名
mode : 処理モード
Returns: Returns:
settings_key : 投入データに該当する個別設定ファイルのフルパス settings_key : 投入データに該当する個別設定ファイルのフルパス
""" """
# ① 初期処理開始ログを出力する
print(f'{str(datetime.now())} {log_info} {LOG_LEVEL["i"]} I-INI-01 - 初期処理を開始します')
# ② S3バケット内のtargetディレクトリに「投入データファイル名.doing」ファイルが存在するかチェックする
try: try:
debug_log(f'引数 bucket_name : {bucket_name}', log_info, mode)
debug_log(f'引数 target_key : {target_key}', log_info, mode)
debug_log(f'引数 target_data_source : {target_data_source}', log_info, mode)
debug_log(f'引数 target_file_name : {target_file_name}', log_info, mode)
debug_log(f'引数 log_info : {log_info}', log_info, mode)
debug_log(f'引数 mode : {mode}', log_info, mode)
# ① 初期処理開始ログを出力する
print(f'{str(datetime.now())} {log_info} {LOG_LEVEL["i"]} I-INI-01 - 初期処理を開始します')
except Exception as e:
print(f'{str(datetime.now())} {log_info} {LOG_LEVEL["e"]} E-INI-99 - エラー内容:{e}')
error(bucket_name, target_data_source, target_file_name, log_info)
try:
# ② S3バケット内のtargetディレクトリに「投入データファイル名.doing」ファイルが存在するかチェックする
doing_file_name = target_file_name + '.doing' doing_file_name = target_file_name + '.doing'
doing_key = target_data_source + DIRECTORY_TARGET + doing_file_name doing_key = target_data_source + DIRECTORY_TARGET + doing_file_name
print(f'{str(datetime.now())} {log_info} {LOG_LEVEL["i"]} I-INI-02 - doingファイル{doing_file_name} の存在チェック') print(f'{str(datetime.now())} {log_info} {LOG_LEVEL["i"]} I-INI-02 - doingファイル{doing_file_name} の存在チェック')
@ -64,8 +76,8 @@ def init(bucket_name, target_key, target_data_source, target_file_name, log_info
print(f'{str(datetime.now())} {log_info} {LOG_LEVEL["e"]} E-INI-99 - エラー内容:{e}') print(f'{str(datetime.now())} {log_info} {LOG_LEVEL["e"]} E-INI-99 - エラー内容:{e}')
error(bucket_name, target_data_source, target_file_name, log_info) error(bucket_name, target_data_source, target_file_name, log_info)
# ⑤ S3バケット内のtargetディレクトリの以下ファイル群を削除(前回分の削除)する
try: try:
# ⑤ S3バケット内のtargetディレクトリの以下ファイル群を削除(前回分の削除)する
# doneファイルの存在確認 # doneファイルの存在確認
print(f'{str(datetime.now())} {log_info} {LOG_LEVEL["i"]} I-INI-06 - doneファイル{target_file_name}.done の存在チェック') print(f'{str(datetime.now())} {log_info} {LOG_LEVEL["i"]} I-INI-06 - doneファイル{target_file_name}.done の存在チェック')
result_done_key = target_data_source + DIRECTORY_TARGET + target_file_name + '.done' result_done_key = target_data_source + DIRECTORY_TARGET + target_file_name + '.done'
@ -95,8 +107,8 @@ def init(bucket_name, target_key, target_data_source, target_file_name, log_info
except Exception as e: except Exception as e:
print(f'{str(datetime.now())} {log_info} {LOG_LEVEL["i"]} I-INI-14 - errorファイルは存在しませんでした') print(f'{str(datetime.now())} {log_info} {LOG_LEVEL["i"]} I-INI-14 - errorファイルは存在しませんでした')
# ⑥ 個別設定マッピングリストが存在するかチェックする
try: try:
# ⑥ 個別設定マッピングリストが存在するかチェックする
print(f'{str(datetime.now())} {log_info} {LOG_LEVEL["i"]} I-INI-15 - 個別設定マッピングリスト:{MAPPING_FILE_NAME} の存在チェック') print(f'{str(datetime.now())} {log_info} {LOG_LEVEL["i"]} I-INI-15 - 個別設定マッピングリスト:{MAPPING_FILE_NAME} の存在チェック')
mapping_key = target_data_source + DIRECTORY_SETTINGS + MAPPING_FILE_NAME mapping_key = target_data_source + DIRECTORY_SETTINGS + MAPPING_FILE_NAME
s3_client.head_object(Bucket=bucket_name, Key=mapping_key) s3_client.head_object(Bucket=bucket_name, Key=mapping_key)
@ -105,8 +117,8 @@ def init(bucket_name, target_key, target_data_source, target_file_name, log_info
print(f'{str(datetime.now())} {log_info} {LOG_LEVEL["e"]} E-INI-02 - 個別設定マッピングリストが存在しません') print(f'{str(datetime.now())} {log_info} {LOG_LEVEL["e"]} E-INI-02 - 個別設定マッピングリストが存在しません')
error(bucket_name, target_data_source, target_file_name, log_info) error(bucket_name, target_data_source, target_file_name, log_info)
# ⑦ 個別設定ファイルを特定する
try: try:
# ⑦ 個別設定ファイルを特定する
print(f'{str(datetime.now())} {log_info} {LOG_LEVEL["i"]} I-INI-17 - 個別設定ファイルを検索します') print(f'{str(datetime.now())} {log_info} {LOG_LEVEL["i"]} I-INI-17 - 個別設定ファイルを検索します')
mapping_obj = s3_resource.Object(bucket_name, mapping_key) mapping_obj = s3_resource.Object(bucket_name, mapping_key)
mapping_response = mapping_obj.get() mapping_response = mapping_obj.get()
@ -126,8 +138,8 @@ def init(bucket_name, target_key, target_data_source, target_file_name, log_info
print(f'{str(datetime.now())} {log_info} {LOG_LEVEL["e"]} E-INI-99 - エラー内容:{e}') print(f'{str(datetime.now())} {log_info} {LOG_LEVEL["e"]} E-INI-99 - エラー内容:{e}')
error(bucket_name, target_data_source, target_file_name, log_info) error(bucket_name, target_data_source, target_file_name, log_info)
# ⑧ ⑦の個別設定ファイルが存在するかチェックする
try: try:
# ⑧ ⑦の個別設定ファイルが存在するかチェックする
print(f'{str(datetime.now())} {log_info} {LOG_LEVEL["i"]} I-INI-19 - 個別設定ファイル:{settings_file_name} の存在チェック') print(f'{str(datetime.now())} {log_info} {LOG_LEVEL["i"]} I-INI-19 - 個別設定ファイル:{settings_file_name} の存在チェック')
settings_key = target_data_source + DIRECTORY_SETTINGS + settings_file_name settings_key = target_data_source + DIRECTORY_SETTINGS + settings_file_name
s3_client.head_object(Bucket=bucket_name, Key=settings_key) s3_client.head_object(Bucket=bucket_name, Key=settings_key)
@ -136,6 +148,11 @@ def init(bucket_name, target_key, target_data_source, target_file_name, log_info
print(f'{str(datetime.now())} {log_info} {LOG_LEVEL["e"]} E-INI-04 - 個別設定ファイルが存在しません') print(f'{str(datetime.now())} {log_info} {LOG_LEVEL["e"]} E-INI-04 - 個別設定ファイルが存在しません')
error(bucket_name, target_data_source, target_file_name, log_info) error(bucket_name, target_data_source, target_file_name, log_info)
# ⑨ 初期処理終了ログを出力する try:
print(f'{str(datetime.now())} {log_info} {LOG_LEVEL["i"]} I-INI-21 - 初期処理を終了します') # ⑨ 初期処理終了ログを出力する
print(f'{str(datetime.now())} {log_info} {LOG_LEVEL["i"]} I-INI-21 - 初期処理を終了します')
except Exception as e:
print(f'{str(datetime.now())} {log_info} {LOG_LEVEL["e"]} E-INI-99 - エラー内容:{e}')
error(bucket_name, target_data_source, target_file_name, log_info)
return settings_key return settings_key

View File

@ -5,6 +5,7 @@ from pymysql.constants import CLIENT
import io import io
import csv import csv
from error import error from error import error
from common import debug_log
# 定数 # 定数
LOG_LEVEL = {"i": 'Info', "e": 'Error', "w": 'Warning'} LOG_LEVEL = {"i": 'Info', "e": 'Error', "w": 'Warning'}
@ -34,7 +35,7 @@ s3_client = boto3.client('s3')
s3_resource = boto3.resource('s3') s3_resource = boto3.resource('s3')
def main(bucket_name, target_key, target_data_source, target_file_name, settings_key, db_info, log_info): def main(bucket_name, target_key, target_data_source, target_file_name, settings_key, db_info, log_info, mode):
"""メイン処理 """メイン処理
Args: Args:
bucket_name : バケット名 bucket_name : バケット名
@ -44,30 +45,44 @@ def main(bucket_name, target_key, target_data_source, target_file_name, settings
settings_key : 投入データに該当する個別設定ファイルのフルパス settings_key : 投入データに該当する個別設定ファイルのフルパス
db_info : データベース情報 db_info : データベース情報
log_info : ログに記載するデータソース名とファイル名 log_info : ログに記載するデータソース名とファイル名
mode : 処理モード
Returns: Returns:
warning_info : Warning情報 warning_info : Warning情報
""" """
# ① メイン処理開始ログを出力する
print(f'{str(datetime.now())} {log_info} {LOG_LEVEL["i"]} I-MAIN-01 - メイン処理を開始します')
try: try:
debug_log(f'引数 bucket_name : {bucket_name}', log_info, mode)
debug_log(f'引数 target_key : {target_key}', log_info, mode)
debug_log(f'引数 target_data_source : {target_data_source}', log_info, mode)
debug_log(f'引数 target_file_name : {target_file_name}', log_info, mode)
debug_log(f'引数 settings_key : {settings_key}', log_info, mode)
debug_log(f'引数 db_info : {db_info}', log_info, mode)
debug_log(f'引数 log_info : {log_info}', log_info, mode)
debug_log(f'引数 mode : {mode}', log_info, mode)
# ① メイン処理開始ログを出力する
print(f'{str(datetime.now())} {log_info} {LOG_LEVEL["i"]} I-MAIN-01 - メイン処理を開始します')
# ② DB接続を開始する # ② DB接続を開始する
conn = pymysql.connect(host=db_info["host"], user=db_info["user"], passwd=db_info["pass"], db=db_info["name"], connect_timeout=5, client_flag=CLIENT.MULTI_STATEMENTS) conn = pymysql.connect(host=db_info["host"], user=db_info["user"], passwd=db_info["pass"], db=db_info["name"], connect_timeout=5, client_flag=CLIENT.MULTI_STATEMENTS)
print(f'{str(datetime.now())} {log_info} {LOG_LEVEL["i"]} I-MAIN-02 - DB接続を開始しました') print(f'{str(datetime.now())} {log_info} {LOG_LEVEL["i"]} I-MAIN-02 - DB接続を開始しました')
except Exception as e:
print(f'{str(datetime.now())} {log_info} {LOG_LEVEL["e"]} E-MAIN-99 - エラー内容:{e}')
error(bucket_name, target_data_source, target_file_name, log_info)
try:
# ③ タイムゾーンを変更する # ③ タイムゾーンを変更する
with conn.cursor() as cur: with conn.cursor() as cur:
cur.execute(f'SET time_zone = "+9:00"') cur.execute(f'SET time_zone = "+9:00"')
result = cur.fetchall() result = cur.fetchall()
print(f'{str(datetime.now())} {log_info} {LOG_LEVEL["i"]} I-MAIN-03 - タイムゾーンを変更しました:{result}') print(f'{str(datetime.now())} {log_info} {LOG_LEVEL["i"]} I-MAIN-03 - タイムゾーンを変更しました:{result}')
# ③ 個別設定ファイルのロードスキーマのテーブル名に記載されているテーブルをTRUNCATEする # 個別設定ファイルのロードスキーマのテーブル名に記載されているテーブルをTRUNCATEする
settings_obj = s3_resource.Object(bucket_name, settings_key) settings_obj = s3_resource.Object(bucket_name, settings_key)
settings_response = settings_obj.get() settings_response = settings_obj.get()
settings_list = [] settings_list = []
for line in io.TextIOWrapper(io.BytesIO(settings_response["Body"].read()), encoding='utf-8'): for line in io.TextIOWrapper(io.BytesIO(settings_response["Body"].read()), encoding='utf-8'):
settings_list.append(line.rstrip()) settings_list.append(line.rstrip('\n'))
with conn.cursor() as cur: with conn.cursor() as cur:
sql_truncate = f'TRUNCATE table {settings_list[SETTINGS_ITEM["loadSchemaName"]]}' sql_truncate = f'TRUNCATE table {settings_list[SETTINGS_ITEM["loadSchemaName"]]}'
@ -75,9 +90,10 @@ def main(bucket_name, target_key, target_data_source, target_file_name, settings
print(f'{str(datetime.now())} {log_info} {LOG_LEVEL["i"]} I-MAIN-04 - {settings_list[SETTINGS_ITEM["loadSchemaName"]]} をTRUNCATEしました') print(f'{str(datetime.now())} {log_info} {LOG_LEVEL["i"]} I-MAIN-04 - {settings_list[SETTINGS_ITEM["loadSchemaName"]]} をTRUNCATEしました')
except Exception as e: except Exception as e:
print(f'{str(datetime.now())} {log_info} {LOG_LEVEL["e"]} E-MAIN-99 - エラー内容:{e}') print(f'{str(datetime.now())} {log_info} {LOG_LEVEL["e"]} E-MAIN-99 - エラー内容:{e}')
connection_close(conn, bucket_name, target_data_source, target_file_name, log_info)
error(bucket_name, target_data_source, target_file_name, log_info) error(bucket_name, target_data_source, target_file_name, log_info)
# 投入データファイルを1行ごとにループする # 投入データファイルを1行ごとにループする
try: try:
print(f'{str(datetime.now())} {log_info} {LOG_LEVEL["i"]} I-MAIN-05 - 投入データ {target_file_name} の読み込みを開始します') print(f'{str(datetime.now())} {log_info} {LOG_LEVEL["i"]} I-MAIN-05 - 投入データ {target_file_name} の読み込みを開始します')
target_obj = s3_resource.Object(bucket_name, target_key) target_obj = s3_resource.Object(bucket_name, target_key)
@ -85,53 +101,61 @@ def main(bucket_name, target_key, target_data_source, target_file_name, settings
target_data = io.TextIOWrapper(io.BytesIO(target_response["Body"].read()), encoding=settings_list[SETTINGS_ITEM["charCode"]], newline=LINE_FEED_CODE[settings_list[SETTINGS_ITEM["lineFeedCode"]]]) target_data = io.TextIOWrapper(io.BytesIO(target_response["Body"].read()), encoding=settings_list[SETTINGS_ITEM["charCode"]], newline=LINE_FEED_CODE[settings_list[SETTINGS_ITEM["lineFeedCode"]]])
except Exception as e: except Exception as e:
print(f'{str(datetime.now())} {log_info} {LOG_LEVEL["e"]} E-MAIN-99 - エラー内容:{e}') print(f'{str(datetime.now())} {log_info} {LOG_LEVEL["e"]} E-MAIN-99 - エラー内容:{e}')
connection_close(conn, bucket_name, target_data_source, target_file_name, log_info)
error(bucket_name, target_data_source, target_file_name, log_info) error(bucket_name, target_data_source, target_file_name, log_info)
process_count = 0 # 処理件数カウンタ
normal_count = 0 # 正常終了件数カウンタ
warning_count = 0 # ワーニング終了件数カウンター
warning_info = '' # ワーニング情報
index = 0 # ループインデックス
for line in csv.reader(target_data, quotechar=settings_list[SETTINGS_ITEM["quotechar"]], delimiter=settings_list[SETTINGS_ITEM["delimiter"]]):
try:
if settings_list[SETTINGS_ITEM["headerFlag"]] and index == 0:
index += 1
print(f'{str(datetime.now())} {log_info} {LOG_LEVEL["i"]} I-MAIN-06 - ヘッダー行をスキップします')
continue
# 処理件数カウント
process_count += 1
# SQL文生成
sql = f'INSERT INTO {settings_list[SETTINGS_ITEM["loadSchemaName"]]} VALUES ('
for i in range(len(line)):
sql = f'{sql} "{line[i]}",'
sql = f'{sql} "{target_file_name}",' # システム項目:取込ファイル名
sql = f'{sql} "{index}",' # システム項目:取込ファイル行番号
sql = f'{sql} "0",' # システム項目:論理削除フラグ
sql = f'{sql} CURRENT_USER(),' # システム項目:登録者
sql = f'{sql} CURRENT_TIMESTAMP(),' # システム項目:登録日時
sql = f'{sql} NULL,' # システム項目:更新者
sql = f'{sql} NULL)' # システム項目:更新日時
# ロードスキーマのトランザクション開始
with conn.cursor() as cur:
cur.execute(sql)
conn.commit()
normal_count += 1
index += 1
except Exception as e:
warning_info = f'{warning_info} {index} ロードスキーマ登録時にエラーが発生しました {line} {e}\n'
print(f'{str(datetime.now())} {log_info} {LOG_LEVEL["w"]} W-MAIN-01 {index} ロードスキーマ登録時にエラーが発生しました {line} {e}')
# ⑤ ④の処理結果件数をログ出力する
print(f'{str(datetime.now())} {log_info} {LOG_LEVEL["i"]} I-MAIN-07 - 投入データ件数:{process_count} 正常終了件数:{normal_count} Warning終了件数{warning_count}')
# ⑥ ロードスキーマのデータを蓄積スキーマにUPSERTする
try: try:
process_count = 0 # 処理件数カウンタ
normal_count = 0 # 正常終了件数カウンタ
warning_count = 0 # ワーニング終了件数カウンター
warning_info = '' # ワーニング情報
index = 0 # ループインデックス
for line in csv.reader(target_data, quotechar=settings_list[SETTINGS_ITEM["quotechar"]], delimiter=settings_list[SETTINGS_ITEM["delimiter"]]):
try:
if settings_list[SETTINGS_ITEM["headerFlag"]] and index == 0:
index += 1
print(f'{str(datetime.now())} {log_info} {LOG_LEVEL["i"]} I-MAIN-06 - ヘッダー行をスキップします')
continue
# 処理件数カウント
process_count += 1
# SQL文生成
sql = f'INSERT INTO {settings_list[SETTINGS_ITEM["loadSchemaName"]]} VALUES ('
for i in range(len(line)):
sql = f'{sql} "{line[i]}",'
sql = f'{sql} "{target_file_name}",' # システム項目:取込ファイル名
sql = f'{sql} "{index}",' # システム項目:取込ファイル行番号
sql = f'{sql} "0",' # システム項目:論理削除フラグ
sql = f'{sql} CURRENT_USER(),' # システム項目:登録者
sql = f'{sql} CURRENT_TIMESTAMP(),' # システム項目:登録日時
sql = f'{sql} NULL,' # システム項目:更新者
sql = f'{sql} NULL)' # システム項目:更新日時
# ロードスキーマのトランザクション開始
with conn.cursor() as cur:
cur.execute(sql)
conn.commit()
normal_count += 1
index += 1
except Exception as e:
warning_info = f'{warning_info} {index} ロードスキーマ登録時にエラーが発生しました {line} {e}\n'
print(f'{str(datetime.now())} {log_info} {LOG_LEVEL["w"]} W-MAIN-01 {index} ロードスキーマ登録時にエラーが発生しました {line} {e}')
except Exception as e:
print(f'{str(datetime.now())} {log_info} {LOG_LEVEL["e"]} E-MAIN-99 - エラー内容:{e}')
connection_close(conn, bucket_name, target_data_source, target_file_name, log_info)
error(bucket_name, target_data_source, target_file_name, log_info)
try:
# ⑥ ⑤の処理結果件数をログ出力する
print(f'{str(datetime.now())} {log_info} {LOG_LEVEL["i"]} I-MAIN-07 - 投入データ件数:{process_count} 正常終了件数:{normal_count}')
if warning_info:
print(f'{str(datetime.now())} {log_info} {LOG_LEVEL["w"]} W-MAIN-02 - Warning終了件数{warning_count}')
# ⑦ ロードスキーマのデータを蓄積スキーマにUPSERTする
print(f'{str(datetime.now())} {log_info} {LOG_LEVEL["i"]} I-MAIN-08 - ロードスキーマ({settings_list[SETTINGS_ITEM["loadSchemaName"]]})のデータを蓄積スキーマ({settings_list[SETTINGS_ITEM["storageSchemaName"]]})に登録します') print(f'{str(datetime.now())} {log_info} {LOG_LEVEL["i"]} I-MAIN-08 - ロードスキーマ({settings_list[SETTINGS_ITEM["loadSchemaName"]]})のデータを蓄積スキーマ({settings_list[SETTINGS_ITEM["storageSchemaName"]]})に登録します')
# SQL文生成 # SQL文生成
@ -155,53 +179,71 @@ def main(bucket_name, target_key, target_data_source, target_file_name, settings
print(f'{str(datetime.now())} {log_info} {LOG_LEVEL["i"]} I-MAIN-10 - 標準SQL{settings_list[SETTINGS_ITEM["storageSchemaName"]]} のCOMIIT処理が正常終了しました') print(f'{str(datetime.now())} {log_info} {LOG_LEVEL["i"]} I-MAIN-10 - 標準SQL{settings_list[SETTINGS_ITEM["storageSchemaName"]]} のCOMIIT処理が正常終了しました')
except Exception as e: except Exception as e:
print(f'{str(datetime.now())} {log_info} {LOG_LEVEL["e"]} E-MAIN-99 - エラー内容:{e}') print(f'{str(datetime.now())} {log_info} {LOG_LEVEL["e"]} E-MAIN-99 - エラー内容:{e}')
connection_close(conn, bucket_name, target_data_source, target_file_name, log_info)
error(bucket_name, target_data_source, target_file_name, log_info) error(bucket_name, target_data_source, target_file_name, log_info)
# ⑦ 個別設定ファイルに拡張SQLファイル名が設定されているかチェック try:
print(f'{str(datetime.now())} {log_info} {LOG_LEVEL["i"]} I-MAIN-11 - 拡張SQL設定が存在するかチェックします') # ⑧ 個別設定ファイルに拡張SQLファイル名が設定されているかチェック
if settings_list[SETTINGS_ITEM["exSqlFileName"]]: print(f'{str(datetime.now())} {log_info} {LOG_LEVEL["i"]} I-MAIN-11 - 拡張SQL設定が存在するかチェックします')
try: if settings_list[SETTINGS_ITEM["exSqlFileName"]]:
print(f'{str(datetime.now())} {log_info} {LOG_LEVEL["i"]} I-MAIN-12 - 拡張SQL設定の存在を確認しました') try:
print(f'{str(datetime.now())} {log_info} {LOG_LEVEL["i"]} I-MAIN-13 - 拡張SQLファイル名{settings_list[SETTINGS_ITEM["exSqlFileName"]]} の存在チェック') print(f'{str(datetime.now())} {log_info} {LOG_LEVEL["i"]} I-MAIN-12 - 拡張SQL設定の存在を確認しました')
ex_sql_key = target_data_source + DIRECTORY_SETTINGS + settings_list[SETTINGS_ITEM["exSqlFileName"]] print(f'{str(datetime.now())} {log_info} {LOG_LEVEL["i"]} I-MAIN-13 - 拡張SQLファイル名{settings_list[SETTINGS_ITEM["exSqlFileName"]]} の存在チェック')
s3_client.head_object(Bucket=bucket_name, Key=ex_sql_key) ex_sql_key = target_data_source + DIRECTORY_SETTINGS + settings_list[SETTINGS_ITEM["exSqlFileName"]]
ex_sql_file_exists = True s3_client.head_object(Bucket=bucket_name, Key=ex_sql_key)
print(f'{str(datetime.now())} {log_info} {LOG_LEVEL["i"]} I-MAIN-14 - 拡張SQLファイル名の存在を確認しました') ex_sql_file_exists = True
except Exception as e: print(f'{str(datetime.now())} {log_info} {LOG_LEVEL["i"]} I-MAIN-14 - 拡張SQLファイル名の存在を確認しました')
warning_info = f'{warning_info} - 拡張SQLファイルが存在しません\n' except Exception as e:
ex_sql_file_exists = False warning_info = f'{warning_info} - 拡張SQLファイルが存在しません\n'
print(f'{str(datetime.now())} {log_info} {LOG_LEVEL["w"]} W-MAIN-02 - 拡張SQLファイルが存在しません') ex_sql_file_exists = False
print(f'{str(datetime.now())} {log_info} {LOG_LEVEL["w"]} W-MAIN-03 - 拡張SQLファイルが存在しません')
try: try:
if ex_sql_file_exists: if ex_sql_file_exists:
# 拡張SQLファイルからSQL文生成 # 拡張SQLファイルからSQL文生成
ex_sqls_obj = s3_resource.Object(bucket_name, ex_sql_key) ex_sqls_obj = s3_resource.Object(bucket_name, ex_sql_key)
ex_sql_response = ex_sqls_obj.get() ex_sql_response = ex_sqls_obj.get()
ex_sql = '' ex_sql = ''
for line in io.TextIOWrapper(io.BytesIO(ex_sql_response["Body"].read()), encoding='utf-8'): for line in io.TextIOWrapper(io.BytesIO(ex_sql_response["Body"].read()), encoding='utf-8'):
ex_sql = f'{ex_sql} {line.rstrip()}' ex_sql = f'{ex_sql} {line.rstrip()}'
# トランザクション開始 # トランザクション開始
print(f'{str(datetime.now())} {log_info} {LOG_LEVEL["i"]} I-MAIN-15 - 拡張SQL{settings_list[SETTINGS_ITEM["storageSchemaName"]]} のトランザクションを開始します') print(f'{str(datetime.now())} {log_info} {LOG_LEVEL["i"]} I-MAIN-15 - 拡張SQL{settings_list[SETTINGS_ITEM["storageSchemaName"]]} のトランザクションを開始します')
with conn.cursor() as cur: with conn.cursor() as cur:
cur.execute(ex_sql) cur.execute(ex_sql)
conn.commit() conn.commit()
print(f'{str(datetime.now())} {log_info} {LOG_LEVEL["i"]} I-MAIN-16 - 拡張SQL{settings_list[SETTINGS_ITEM["storageSchemaName"]]} のCOMIIT処理が正常終了しました') print(f'{str(datetime.now())} {log_info} {LOG_LEVEL["i"]} I-MAIN-16 - 拡張SQL{settings_list[SETTINGS_ITEM["storageSchemaName"]]} のCOMIIT処理が正常終了しました')
except Exception as e: except Exception as e:
warning_info = f'{warning_info} - 拡張SQLにエラーが発生しました{e}\n' warning_info = f'{warning_info} - 拡張SQLにエラーが発生しました{e}\n'
print(f'{str(datetime.now())} {log_info} {LOG_LEVEL["w"]} W-MAIN-03 - 拡張SQLにエラーが発生しました{e}') print(f'{str(datetime.now())} {log_info} {LOG_LEVEL["w"]} W-MAIN-04 - 拡張SQLにエラーが発生しました{e}')
else: else:
print(f'{str(datetime.now())} {log_info} {LOG_LEVEL["i"]} I-MAIN-17 - 拡張SQL設定の存在はありませんでした') print(f'{str(datetime.now())} {log_info} {LOG_LEVEL["i"]} I-MAIN-17 - 拡張SQL設定の存在はありませんでした')
# ⑧ DB接続を終了する # ⑨ DB接続を終了する
connection_close(conn, bucket_name, target_data_source, target_file_name, log_info)
# ⑩ メイン処理終了ログを出力する
print(f'{str(datetime.now())} {log_info} {LOG_LEVEL["i"]} I-MAIN-19 - メイン処理を終了します')
except Exception as e:
print(f'{str(datetime.now())} {log_info} {LOG_LEVEL["e"]} E-MAIN-99 - エラー内容:{e}')
connection_close(conn, bucket_name, target_data_source, target_file_name, log_info)
error(bucket_name, target_data_source, target_file_name, log_info)
return warning_info
def connection_close(conn, bucket_name, target_data_source, target_file_name, log_info):
"""DB接続切断処理
Args:
conn : DBコネクション
bucket_name : バケット名
target_data_source : 投入データのディレクトリ名よりデータソースに該当する部分
target_file_name : 投入データのファイル名
log_info : ログに記載するデータソース名とファイル名
"""
try: try:
conn.close() conn.close()
print(f'{str(datetime.now())} {log_info} {LOG_LEVEL["i"]} I-MAIN-18 - DB接続を終了しました') print(f'{str(datetime.now())} {log_info} {LOG_LEVEL["i"]} I-MAIN-18 - DB接続を終了しました')
except Exception as e: except Exception as e:
print(f'{str(datetime.now())} {log_info} {LOG_LEVEL["e"]} E-MAIN-99 - エラー内容:{e}') print(f'{str(datetime.now())} {log_info} {LOG_LEVEL["e"]} E-MAIN-99 - エラー内容:{e}')
error(bucket_name, target_data_source, target_file_name, log_info) error(bucket_name, target_data_source, target_file_name, log_info)
# ⑨ メイン処理終了ログを出力する
print(f'{str(datetime.now())} {log_info} {LOG_LEVEL["i"]} I-MAIN-19 - メイン処理を終了します')
return warning_info

View File

@ -12,6 +12,7 @@ SECURITY_GROUP_ID_ECRAPI = os.environ["SECURITY_GROUP_ID_ECRAPI"]
SECURITY_GROUP_ID_ECRDKR = os.environ["SECURITY_GROUP_ID_ECRDKR"] SECURITY_GROUP_ID_ECRDKR = os.environ["SECURITY_GROUP_ID_ECRDKR"]
SECURITY_GROUP_ID_LOGS = os.environ["SECURITY_GROUP_ID_LOGS"] SECURITY_GROUP_ID_LOGS = os.environ["SECURITY_GROUP_ID_LOGS"]
SECURITY_GROUP_ID_RDS = os.environ["SECURITY_GROUP_ID_RDS"] SECURITY_GROUP_ID_RDS = os.environ["SECURITY_GROUP_ID_RDS"]
MODE = os.environ["MODE"]
# クラス変数 # クラス変数
ecs_client = boto3.client('ecs') ecs_client = boto3.client('ecs')
@ -59,6 +60,7 @@ def lambda_handler(event, context):
{"name": 'TARGET_KEY', "value": event_object_key}, {"name": 'TARGET_KEY', "value": event_object_key},
{"name": 'DATA_SOURCE_NAME', "value": event_data_source_name}, {"name": 'DATA_SOURCE_NAME', "value": event_data_source_name},
{"name": 'FILE_NAME', "value": event_file_name}, {"name": 'FILE_NAME', "value": event_file_name},
{"name": 'MODE', "value": MODE},
], ],
}, },
], ],