feat: データ登録処理 メイン処理内のresourceインタフェースをクライアントインタフェースに変更
This commit is contained in:
parent
7a21b22861
commit
b8975ad453
@ -5,10 +5,9 @@ from datetime import datetime
|
|||||||
|
|
||||||
import boto3
|
import boto3
|
||||||
import pymysql
|
import pymysql
|
||||||
from pymysql.constants import CLIENT
|
|
||||||
|
|
||||||
from common import convert_quotechar, debug_log
|
from common import convert_quotechar, debug_log
|
||||||
from error import error
|
from error import error
|
||||||
|
from pymysql.constants import CLIENT
|
||||||
|
|
||||||
# 定数
|
# 定数
|
||||||
DIRECTORY_WORK = '/work/'
|
DIRECTORY_WORK = '/work/'
|
||||||
@ -47,7 +46,6 @@ INVALID_CONFIG_EXCEPTION_MESSAGE = f'個別設定ファイルのインポート
|
|||||||
|
|
||||||
# クラス変数
|
# クラス変数
|
||||||
s3_client = boto3.client('s3')
|
s3_client = boto3.client('s3')
|
||||||
s3_resource = boto3.resource('s3')
|
|
||||||
|
|
||||||
|
|
||||||
def main(bucket_name, target_data_source, target_file_name, settings_key, db_info, log_info, mode):
|
def main(bucket_name, target_data_source, target_file_name, settings_key, db_info, log_info, mode):
|
||||||
@ -91,8 +89,7 @@ def main(bucket_name, target_data_source, target_file_name, settings_key, db_inf
|
|||||||
|
|
||||||
# ④ 個別設定ファイルのロードスキーマのテーブル名に記載されているテーブルをTRUNCATEする
|
# ④ 個別設定ファイルのロードスキーマのテーブル名に記載されているテーブルをTRUNCATEする
|
||||||
# 個別設定ファイルの読み込み
|
# 個別設定ファイルの読み込み
|
||||||
settings_obj = s3_resource.Object(bucket_name, settings_key)
|
settings_response = s3_client.get_object(Bucket=bucket_name, Key=settings_key)
|
||||||
settings_response = settings_obj.get()
|
|
||||||
settings_list = []
|
settings_list = []
|
||||||
for line in io.TextIOWrapper(io.BytesIO(settings_response["Body"].read()), encoding='utf-8'):
|
for line in io.TextIOWrapper(io.BytesIO(settings_response["Body"].read()), encoding='utf-8'):
|
||||||
settings_list.append(line.rstrip('\n'))
|
settings_list.append(line.rstrip('\n'))
|
||||||
@ -110,8 +107,7 @@ def main(bucket_name, target_data_source, target_file_name, settings_key, db_inf
|
|||||||
# ⑤ 投入データファイルを1行ごとにループする
|
# ⑤ 投入データファイルを1行ごとにループする
|
||||||
print(f'{datetime.now():%Y-%m-%d %H:%M:%S} {log_info} {LOG_LEVEL["i"]} I-MAIN-05 - 投入データ {target_file_name} の読み込みを開始します')
|
print(f'{datetime.now():%Y-%m-%d %H:%M:%S} {log_info} {LOG_LEVEL["i"]} I-MAIN-05 - 投入データ {target_file_name} の読み込みを開始します')
|
||||||
work_key = target_data_source + DIRECTORY_WORK + target_file_name
|
work_key = target_data_source + DIRECTORY_WORK + target_file_name
|
||||||
work_obj = s3_resource.Object(bucket_name, work_key)
|
work_response = s3_client.get_object(Bucket=bucket_name, Key=work_key)
|
||||||
work_response = work_obj.get()
|
|
||||||
work_data = io.TextIOWrapper(io.BytesIO(work_response["Body"].read()), encoding=settings_list[SETTINGS_ITEM["charCode"]], newline=LINE_FEED_CODE[settings_list[SETTINGS_ITEM["lineFeedCode"]]])
|
work_data = io.TextIOWrapper(io.BytesIO(work_response["Body"].read()), encoding=settings_list[SETTINGS_ITEM["charCode"]], newline=LINE_FEED_CODE[settings_list[SETTINGS_ITEM["lineFeedCode"]]])
|
||||||
|
|
||||||
process_count = 0 # 処理件数カウンタ
|
process_count = 0 # 処理件数カウンタ
|
||||||
@ -261,10 +257,9 @@ def main(bucket_name, target_data_source, target_file_name, settings_key, db_inf
|
|||||||
try:
|
try:
|
||||||
if ex_sql_file_exists:
|
if ex_sql_file_exists:
|
||||||
# 拡張SQLファイルからSQL文生成
|
# 拡張SQLファイルからSQL文生成
|
||||||
ex_sqls_obj = s3_resource.Object(bucket_name, ex_sql_key)
|
ex_sql_obj_response = s3_client.get_object(Bucket=bucket_name, Key=ex_sql_key)
|
||||||
ex_sql_response = ex_sqls_obj.get()
|
|
||||||
ex_sql = ''
|
ex_sql = ''
|
||||||
for line in io.TextIOWrapper(io.BytesIO(ex_sql_response["Body"].read()), encoding='utf-8'):
|
for line in io.TextIOWrapper(io.BytesIO(ex_sql_obj_response["Body"].read()), encoding='utf-8'):
|
||||||
ex_sql = f'{ex_sql} {line.rstrip()}'
|
ex_sql = f'{ex_sql} {line.rstrip()}'
|
||||||
|
|
||||||
# トランザクション開始
|
# トランザクション開始
|
||||||
@ -358,3 +353,18 @@ def truncate_judge(settings_list):
|
|||||||
|
|
||||||
class InvalidConfigException(Exception):
|
class InvalidConfigException(Exception):
|
||||||
pass
|
pass
|
||||||
|
|
||||||
|
|
||||||
|
# ローカル実行用コード
|
||||||
|
# 値はよしなに変えてください
|
||||||
|
# if __name__ == '__main__':
|
||||||
|
# DB_INFO = {"host": '127.0.0.1', "name": 'org02', "pass": 'user', "user": 'user'}
|
||||||
|
# main(
|
||||||
|
# bucket_name='バケット名',
|
||||||
|
# target_data_source='投入データのディレクトリ名よりデータソースに該当する部分',
|
||||||
|
# target_file_name='投入データのファイル名',
|
||||||
|
# settings_key='投入データに該当する個別設定ファイルのフルパス',
|
||||||
|
# db_info=DB_INFO,
|
||||||
|
# log_info='info',
|
||||||
|
# mode='i'
|
||||||
|
# )
|
||||||
|
|||||||
Loading…
x
Reference in New Issue
Block a user