newdwh2021/ecs/jskult-webapp/src/services/master_mainte_service.py

263 lines
12 KiB
Python

import os
import json
import html
import csv
import pandas as pd
from fastapi import HTTPException
from io import TextIOWrapper
from src.aws.aws_api_client import AWSAPIClient
from src.aws.s3 import S3Client
from src.error.exceptions import DBException
from starlette import status
from datetime import datetime
from src.services.base_service import BaseService
from src.system_var import constants, environment
from src.repositories.base_repository import BaseRepository
from src.repositories.mst_inst_repository import MstInstRepository
from src.repositories.bu_master_cd_repository import BuMasterRepository
from src.repositories.emp_master_repository import EmpMasterRepository
from src.repositories.emp_chg_inst_repository import EmpChgInstRepository
from src.model.internal.master_mainte_csv import MasterMainteCSVItems
from src.model.internal.master_mainte_emp_chg_inst_function import NewEmpChgInstFunction
from src.model.internal.master_mainte_emp_chg_inst_function import ChangeEmpChgInstFunction
from src.model.view.inst_emp_csv_upload_view_model import InstEmpCsvUploadViewModel
from src.model.view.table_override_view_model import TableOverrideViewModel
from src.model.request.master_mainte_csvup import MasterMainteCsvUpModel
from src.model.request.master_mainte_csvdl import MasterMainteCsvDlModel
from src.logging.get_logger import get_logger
# Module-level logger used by this service; the name is passed to the project's get_logger helper.
logger = get_logger('マスターメンテ')
class MasterMainteService(BaseService):
    """Service layer for the master-maintenance screens.

    Provides three groups of operations on the employee/institution
    assignment table (``emp_chg_inst``) and its work copy:

    * CSV upload: validate an uploaded file, then register its rows.
    * Table override: copy the real table's data into the dummy table.
    * CSV download: search rows, write them to a local CSV, upload the
      file to S3 and return a presigned download URL.
    """

    # Repository classes keyed by the names that ``__init__`` looks up.
    REPOSITORIES = {
        'mst_inst_repository': MstInstRepository,
        'emp_master_repository': EmpMasterRepository,
        'bu_master_repository': BuMasterRepository,
        'emp_chginst_repository': EmpChgInstRepository,
    }

    # AWS client classes keyed by the names that ``__init__`` looks up.
    CLIENTS = {
        's3_client': S3Client
    }

    mst_inst_repository: MstInstRepository
    emp_master_repository: EmpMasterRepository
    bu_master_repository: BuMasterRepository
    emp_chginst_repository: EmpChgInstRepository
    s3_client: S3Client

    def __init__(self, repositories: dict[str, BaseRepository], clients: dict[str, AWSAPIClient]) -> None:
        """Store the injected repositories and clients on the instance.

        Args:
            repositories: Mapping that must contain every key of ``REPOSITORIES``.
            clients: Mapping that must contain every key of ``CLIENTS``.

        Raises:
            KeyError: If an expected key is missing from either mapping.
        """
        super().__init__(repositories, clients)
        self.mst_inst_repository = repositories['mst_inst_repository']
        self.emp_master_repository = repositories['emp_master_repository']
        self.bu_master_repository = repositories['bu_master_repository']
        self.emp_chginst_repository = repositories['emp_chginst_repository']
        self.s3_client = clients['s3_client']

    def prepare_mainte_csv_up_view(self,
                                   file: TextIOWrapper,
                                   csv_file_name: str,
                                   csv_upload_form: MasterMainteCsvUpModel) -> InstEmpCsvUploadViewModel:
        """Validate an uploaded CSV and build the confirmation view model.

        Args:
            file: Text stream of the uploaded CSV file.
            csv_file_name: Name of the uploaded file, echoed back to the view.
            csv_upload_form: Form holding ``select_function`` ('new' or
                'change') and ``select_table`` ('dummy' or 'real').

        Returns:
            A view model carrying the validation errors. The row list and
            its JSON form are populated only when no error was found.

        Raises:
            Exception: If ``select_function`` or ``select_table`` holds an
                unexpected value.
        """
        if csv_upload_form.select_function not in ('new', 'change'):
            raise Exception(f'機能の選択値が不正です: {csv_upload_form.select_function}')
        if csv_upload_form.select_table not in ('dummy', 'real'):
            raise Exception(f'登録テーブルの選択値が不正です: {csv_upload_form.select_table}')

        (table_name, selected_table_msg) = self.__choose_target_table(csv_upload_form.select_table)

        csv_items = MasterMainteCSVItems(
            file,
            csv_upload_form.select_function,
            table_name,
            self.mst_inst_repository,
            self.emp_master_repository,
            self.bu_master_repository,
            self.emp_chginst_repository
        )

        error_message_list = []
        # Reject a CSV that has no data rows (header row only).
        if len(csv_items.lines) == 0:
            error_message_list.append('選択されたCSVファイルの2行目以降に値が記入されておりません。')
        else:
            for row_item in csv_items:
                error_message_list.extend(row_item.validate())

        csv_upload_list = []
        json_upload_data = ''
        if not error_message_list:
            csv_upload_list: list[dict] = csv_items.to_dict()
            # JSON form of the rows, handed to the view alongside the list.
            json_upload_data = csv_items.to_json()

        mainte_csv_up = InstEmpCsvUploadViewModel(
            is_verified=True,
            error_message_list=error_message_list,
            select_function=csv_upload_form.select_function,
            select_table=csv_upload_form.select_table,
            csv_upload_list=csv_upload_list,
            json_upload_data=json_upload_data,
            csv_file_name=csv_file_name,
            select_function_message=self.__make_dialog_confirm_message(
                csv_upload_form.select_function,
                selected_table_msg)
        )
        return mainte_csv_up

    def prepare_mainte_new_inst_view(self,
                                     user_name: str,
                                     csv_upload_form: MasterMainteCsvUpModel) -> InstEmpCsvUploadViewModel:
        """Save the rows carried in the form's JSON payload and report the result.

        Args:
            user_name: Passed through to the save function object.
            csv_upload_form: Form holding ``select_function``,
                ``select_table`` and the HTML-escaped ``json_upload_data``.

        Returns:
            A view model with the result messages and error messages
            returned by the save operation.

        Raises:
            Exception: If ``select_table`` or ``select_function`` holds an
                unexpected value.
        """
        (table_name, selected_table_msg) = self.__choose_target_table(csv_upload_form.select_table)

        # The payload is unescaped by the form and then HTML-unescaped once more before parsing.
        csv_data_list = json.loads(html.unescape(csv_upload_form.unescape().json_upload_data))

        if csv_upload_form.select_function == 'new':
            emp_chg_inst = NewEmpChgInstFunction(
                csv_data_list,
                table_name,
                selected_table_msg,
                user_name,
                self.emp_chginst_repository)
        elif csv_upload_form.select_function == 'change':
            emp_chg_inst = ChangeEmpChgInstFunction(
                csv_data_list,
                table_name,
                selected_table_msg,
                user_name,
                self.emp_chginst_repository)
        else:
            raise Exception(f'機能の選択値が不正です: {csv_upload_form.select_function}')

        (result_message_list, raw_error_list) = emp_chg_inst.save()
        # Copy so the view model does not share the list returned by save().
        error_message_list = list(raw_error_list)

        mainte_csv_up = InstEmpCsvUploadViewModel(
            is_insert=True,
            result_message_list=result_message_list,
            error_message_list=error_message_list
        )
        return mainte_csv_up

    def copy_data_real_to_dummy(self) -> TableOverrideViewModel:
        """Replace the dummy table's contents with a copy of the real table.

        The delete and copy run between ``begin()`` and ``commit()``; on
        any error ``rollback()`` is called and the error is re-raised.
        The repository is always disconnected afterwards.

        Returns:
            A view model flagged with ``is_override=True``.
        """
        try:
            # NOTE(review): connect() is inside the try, so a failed connect
            # also triggers rollback() — confirm the repository tolerates that.
            self.emp_chginst_repository.connect()
            self.emp_chginst_repository.begin()
            self.emp_chginst_repository.delete_dummy_table()
            self.emp_chginst_repository.copy_real_to_dummy()
            self.emp_chginst_repository.commit()
        except Exception:
            self.emp_chginst_repository.rollback()
            raise
        finally:
            self.emp_chginst_repository.disconnect()

        # Mark the copy as completed and hand the result back to the screen.
        table_override = TableOverrideViewModel(
            is_override=True
        )
        return table_override

    def search_emp_chg_inst_data(self, csv_download_form: MasterMainteCsvDlModel) -> pd.DataFrame:
        """Search the assignment data for CSV download.

        Args:
            csv_download_form: Search form; ``unescape()`` is called on it
                before searching.

        Returns:
            The search result as a DataFrame.

        Raises:
            HTTPException: 500 with ``error='db_error'`` when the search
                raises ``DBException``.
        """
        try:
            csv_download_form.unescape()
            # Search the facility-assignee data.
            search_result_df = self.search_download_emp_chg_inst_data(csv_download_form)
        except DBException as e:
            raise HTTPException(
                status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
                detail={'error': 'db_error', 'message': e.args}
            ) from e
        return search_result_df

    def search_download_emp_chg_inst_data(self, csv_download_form: MasterMainteCsvDlModel) -> pd.DataFrame:
        """Fetch rows from the table chosen by ``csv_download_form.select_table``.

        Raises:
            Exception: If ``select_table`` is neither 'dummy' nor 'real'.
        """
        (table_name, _) = self.__choose_target_table(csv_download_form.select_table)
        search_result_df = self.emp_chginst_repository.fetch_as_data_frame(table_name, csv_download_form)
        return search_result_df

    def write_csv_file(self, data_frame: pd.DataFrame, header: list[str], download_file_name: str) -> str:
        """Write ``data_frame`` to a CSV file with ``header`` as its first record.

        Args:
            data_frame: Rows to export.
            header: Display names, paired positionally with
                ``data_frame.columns`` (pairing stops at the shorter one).
            download_file_name: File name created under
                ``constants.MENTE_CSV_TEMPORARY_FILE_DIR_PATH``.

        Returns:
            Path of the written file.
        """
        output_file_path = os.path.join(constants.MENTE_CSV_TEMPORARY_FILE_DIR_PATH, download_file_name)

        # Build a one-row DataFrame mapping each column to its display name.
        header_data = dict(zip(data_frame.columns, header))
        header_df = pd.DataFrame([header_data], index=None)
        output_df = pd.concat([header_df, data_frame])

        # The header is emitted as an ordinary record, not as a pandas header row.
        output_df.to_csv(output_file_path, encoding="utf-8_sig", quoting=csv.QUOTE_ALL, index=False, header=False)
        return output_file_path

    def upload_emp_chg_inst_data_file(self, df: pd.DataFrame, user_id: str, select_table: str) -> tuple[str, str]:
        """Export ``df`` as CSV, upload it to S3 and return a download URL.

        Args:
            df: Search result to export.
            user_id: Embedded in the generated file name.
            select_table: 'dummy' selects the dummy-table result message;
                any other value selects the real-table message.

        Returns:
            ``(result_message, download_url)``. When ``df`` has no rows no
            file is produced and the URL is an empty string.

        Raises:
            HTTPException: 500 with ``error='aws_error'`` when the upload,
                the local cleanup or the URL generation fails.
        """
        row_count = df.shape[0]
        if row_count == 0:
            return '該当データが存在しないためCSVファイルを出力しませんでした', ''

        # Timestamp (down to microseconds) used in the file name.
        current_timestamp = datetime.now()
        download_file_name = f'Result_{user_id}_{current_timestamp:%Y%m%d%H%M%S%f}.csv'

        # Write the CSV to a local file.
        local_file_path = self.__write_emp_chg_inst_data_to_file(df, download_file_name)

        # Upload the local file to S3 and obtain the download URL.
        try:
            bucket_name = environment.MASTER_MAINTENANCE_BUCKET
            file_key = f'data/{os.path.basename(local_file_path)}'
            try:
                self.s3_client.upload_file(local_file_path, bucket_name, file_key)
            finally:
                # Remove the local file whether or not the upload succeeded,
                # so a failed upload does not leave it behind.
                self.delete_local_file(local_file_path)
            # Only the base name of the path is used here, so the file
            # no longer needs to exist.
            download_file_url = self.generate_download_file_url(local_file_path)
        except Exception as e:
            logger.exception(f'S3 アクセスエラー{e}')
            raise HTTPException(
                status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
                detail={'error': 'aws_error', 'message': e.args}
            ) from e

        if select_table == 'dummy':
            result_msg = f'ダミーテーブルのデータ{row_count}件をCSVファイルに出力しました'
        else:
            result_msg = f'本番テーブルのデータ{row_count}件をCSVファイルに出力しました'
        return result_msg, download_file_url

    def generate_download_file_url(self, local_file_path: str) -> str:
        """Return a presigned URL for the S3 object matching ``local_file_path``.

        The object key is ``data/<base name of local_file_path>`` in
        ``environment.MASTER_MAINTENANCE_BUCKET``.
        """
        bucket_name = environment.MASTER_MAINTENANCE_BUCKET
        file_key = f'data/{os.path.basename(local_file_path)}'
        return self.s3_client.generate_presigned_url(bucket_name, file_key, constants.MENTE_CSV_DOWNLOAD_FILE_NAME)

    def __write_emp_chg_inst_data_to_file(self, df: pd.DataFrame, download_file_name: str) -> str:
        """Write ``df`` with the standard download header and return the file path."""
        logger.info('CSVファイルを出力する')
        local_file_path = self.write_csv_file(
            df, header=constants.MENTE_CSV_DOWNLOAD_HEADER, download_file_name=download_file_name)
        return local_file_path

    def __choose_target_table(self, select_table: str) -> tuple[str, str]:
        """Map the form's table selection to ``(table_name, display_message)``.

        Raises:
            Exception: If ``select_table`` is neither 'dummy' nor 'real'.
        """
        if select_table == 'dummy':
            table_name = 'src05.emp_chg_inst_wrk'
            selected_table_msg = constants.CSV_CHANGE_TABLE_NAME
        elif select_table == 'real':
            table_name = 'src05.emp_chg_inst'
            selected_table_msg = constants.CSV_REAL_TABLE_NAME
        else:
            raise Exception(f'登録テーブルの選択値が不正です: {select_table}')
        return (table_name, selected_table_msg)

    def __make_dialog_confirm_message(self, select_function: str, selected_table_msg: str) -> str:
        """Build the confirmation-dialog text for the chosen function and table.

        'new' selects the new-registration wording; any other value
        selects the assignee-change wording.
        """
        select_function_msg = '新規施設登録' if select_function == 'new' else '施設担当者変更'
        return f'{selected_table_msg}{select_function_msg}を行いますか?'

    def delete_local_file(self, local_file_path: str) -> None:
        """Delete the file at ``local_file_path`` (raises ``OSError`` on failure)."""
        os.remove(local_file_path)