database.database のソースコード

# coding: utf-8

from sqlalchemy      import create_engine
from sqlalchemy.pool import QueuePool

from halumain.haluconf          import HaluConf
from commons.specialcharconvert import SpecialCharConvert
from database.databasecommon    import DatabaseCommon

[ドキュメント] class Database(): """ Database に接続し、SQLを実行するクラス。 Attributes ---------- hconf : HaluConf 設定情報読み込み用オブジェクト。クラス変数として保持。 engine : dict エンジンの格納場所 connect : dict コネクションの格納場所 trans : dict トランザクションの格納場所 """ # クラス変数を定義 hconf = HaluConf() def __init__(self, dlog, dlogname): self.dlog = dlog self.dlogname = dlogname self.dlog.debug(self.dlogname, 'Database init start') self.specialcharconvert = SpecialCharConvert() # エンジン・コネクション・トランザクションの格納場所を初期化 self.engine = {} self.connect = {} self.trans = {} self.dlog.debug(self.dlogname, 'Database init end\n')
[ドキュメント] def create_engine(self, dbname): """ DB接続要求 (self.engineはインスタンス変数であるが、sqlalchemy内部でプーリングされているもよう) Parameters ---------- dbname : str 接続するデータベース名 """ try: if dbname not in self.engine: self.dlog.debug(self.dlogname, f'Database データベース接続処理 start : dbname = {dbname}') def_db = Database.hconf.def_database[dbname] # PostgreSQLの接続 if def_db['dbdriver'] == 'postgresql': str_db = def_db['dbdriver'] + '://' \ + def_db['username'] + ':' \ + def_db['password'] + '@' \ + def_db['hostname'] + ':' \ + def_db['portno'] + '/' \ + def_db['database'] self.engine[dbname] = create_engine(str_db, pool_size=5, max_overflow=0, poolclass=QueuePool) self.dlog.debug(self.dlogname, 'Database データベース接続処理 end') # SQLite create_engine('sqlite:///file.db', poolclass=QueuePool) # MySQL create_engine("mysql://user:pass@host/dbname", poolclass=QueuePool) except Exception as e: self.dlog.error(self.dlogname, f'Database create_engine exception message : {e}')
[ドキュメント] def connection(self, dbname): """ コネクション取得 Parameters ---------- dbname : str 接続するデータベース名 """ if dbname not in self.connect: self.dlog.debug(self.dlogname, f'Database コネクション取得処理 start : dbname = {dbname}') self.connect[dbname] = self.engine[dbname].connect() self.dlog.debug(self.dlogname, 'Database コネクション取得処理 end')
[ドキュメント] def begin(self, dbname): """ トランザクション開始 Parameters ---------- dbname : str 接続するデータベース名 """ if dbname not in self.trans: self.dlog.debug(self.dlogname, f'Database トランザクション開始処理 start : dbname = {dbname}') self.trans[dbname] = self.connect[dbname].begin() self.dlog.debug(self.dlogname, 'Database トランザクション開始処理 end')
[ドキュメント] def doSql(self, dbname, sql_info): """ SQL文の実行 Parameters ---------- sql_info : dict sqldict['sqls'] の1要素 """ # パラメータ設定の場合リターン sql_source = sql_info['sql'] if sql_source['type'] == 'param': return try: # DB接続要求、コネクション取得、トランザクション開始 self.create_engine(dbname) self.connection(dbname) self.begin(dbname) # SQL実行 self.dlog.debug(self.dlogname, f'Database SQL文の実行 start : dbname = {dbname}') self.doGenerateSql(dbname, sql_info) except Exception as e: self.dlog.error(self.dlogname, f'Database SQL文の実行 exception message : {e}') finally: self.dlog.debug(self.dlogname, f'Database SQL文の実行 end : dbname = {dbname}')
[ドキュメント] def doGenerateSql(self, dbname, sql_info): """ SQL文の実行 Parameters ---------- sql_info : dict sqldict['sqls'] の1要素 """ databasecommon = DatabaseCommon(self.dlog, self.dlogname, sql_info) # 入力レコードが定義されているか確認 if 'input' in sql_info: input_record = sql_info['input']['record'] max_line = databasecommon.getMaxLine() else: max_line = 1 # 入力レコードの件数分、繰り返す(入力レコードが無い時は一回) idx = 0 for _ in range(max_line): before_sql = databasecommon.createSql() self.dlog.debug(self.dlogname, f'Database バインド変数、置換前のSQL : {before_sql}') if 'input' in sql_info: after_sql = databasecommon.replaceSql(before_sql, input_record, idx) else: after_sql = before_sql # バインド変数置換後の文字列の特殊文字を元に戻す after_sql = self.specialcharconvert.editSqlDataToNormalChar(after_sql) self.dlog.debug(self.dlogname, f'Database バインド変数、置換後のSQL : {after_sql}') # SQL文を実行する(select 以外) if sql_info['sql']['type'] == 'doRun': #self.connect[dbname].execute(after_sql) self.execute(dbname, after_sql) idx += 1 continue if sql_info['sql']['type'] != 'select': # DBアクセスログを出力する(割愛) #self.connect[dbname].execute(after_sql) self.execute(dbname, after_sql) idx += 1 continue # SQL文を実行する(select) self.execute_select(dbname, after_sql, sql_info) idx += 1
[ドキュメント] def execute(self, dbname, str_sql): """ SQL実行 Parameters ---------- dbname : str 接続するデータベース名 str_sql : str 実行するSQL文 """ self.dlog.debug(self.dlogname, f'Database 実行するSQL文 : {str_sql}') result = self.connect[dbname].execute(str_sql) #self.dlog.debug(self.dlogname, f'Database 実行結果 : {result}') return result
[ドキュメント] def execute_select(self, dbname, str_sql, sql_info): """ Select SQL実行 """ self.dlog.debug(self.dlogname, f'Database 実行するSQL文 : {str_sql}') output_record = sql_info['output']['record'] self.dlog.debug(self.dlogname, f'Database SQL実行前の出力レコード : {output_record}') # 取得行毎にループ result = self.connect[dbname].execute(str_sql) for i, row in enumerate(result): self.dlog.debug(self.dlogname, f'Database 取得行毎にループ : i = {i}, row = {row}') for key, value in row.items(): if key in output_record: if value is None: value = '' if i == 0: output_record[key]['value'][0] = value else: output_record[key]['value'].append(value) # 取得データ無しの時の処理(割愛) self.dlog.debug(self.dlogname, f'Database SQL実行後の出力レコード : {output_record}')
[ドキュメント] def commit(self): """ コミット実行 """ before_key = "" for key in self.trans: if before_key == key: continue before_key = key self.dlog.debug(self.dlogname, f'Database コミット処理 start key : {key}') self.trans[key].commit() self.dlog.debug(self.dlogname, 'Database コミット処理 end')
[ドキュメント] def rollback(self): """ ロールバック """ for key in self.trans: self.dlog.debug(self.dlogname, f'Database ロールバック処理 start key : {key}') self.trans[key].rollback() self.dlog.debug(self.dlogname, 'Database ロールバック処理 end')
[ドキュメント] def close(self): """ コネクションをプールに戻す """ for key in self.connect: self.dlog.debug(self.dlogname, f'Database クローズ処理 start key : {key}') self.connect[key].close() self.dlog.debug(self.dlogname, 'Database クローズ処理 end\n') self.engine = {} self.connect = {} self.trans = {}
[ドキュメント] def recordCheck(self, sql_info, sqldict): """ sql.json の'check' が設定されている場合、出力レコードの存在チェックを行う check の設定値 : 'not found error'/ 'found error' Parameters ---------- sql_info : dict sqldict['sqls'] の1要素 sql_data : dict sqldict """ try: if 'check' not in sql_info['sql']: return check = sql_info['sql']['check'] if check == '': return errormsg = sql_info['sql']['errormsg'] output_record = sql_info['output']['record'] self.dlog.debug(self.dlogname, f'Database recordCheck チェック処理 : {check}') for _, value in output_record.items(): size = len(value['value']) if check == 'not found error': if size == 1 and value['value'][0] == '': self.dlog.debug(self.dlogname, 'Database recordCheck チェック処理エラー 有り') sqldict['message']['status'] = 'ERROR' sqldict['message']['msg'] = errormsg return if check == 'found error': if size >= 1 and value['value'][0] != '': self.dlog.debug(self.dlogname, 'Database recordCheck チェック処理エラー 有り') sqldict['message']['status'] = 'ERROR' sqldict['message']['msg'] = errormsg return except Exception as e: self.dlog.error(self.dlogname, f'Database recordCheck exception message : {e}')