日志模块 主要用于记录程序是否执行的

张开发
2026/4/13 19:36:45 15 分钟阅读

分享文章

日志模块 主要用于记录程序是否执行的
# -*- coding: utf-8 -*- import time import sys import copy import pandas as pd from pathlib import Path from loguru import logger from functools import wraps from datetime import datetime from multiprocessing import Queue # 多进程安全日志队列仅多进程时使用 QhLogProcessQueue None # 默认为空单进程不走队列 # 系统路径配置优化版 QhRootPath Path(__file__).parent.parent.parent.resolve() # 防错路径不存在直接退出 if not QhRootPath.exists(): logger.critical(f工程根路径不存在程序退出{QhRootPath}, QueHui!) sys.exit(1) # 防错避免重复添加路径 if str(QhRootPath) not in sys.path: sys.path.append(str(QhRootPath)) logger.success(f工程根路径配置完成【{QhRootPath}】, QueHui!) # 内部依赖导入带异常捕获 try: from QhConfing import QhBasePath from QhLib.QhDbJson import * from QhLib.QhTool import * logger.info(内部依赖库导入完成, QueHui!) except Exception as e: logger.exception(f内部依赖库导入失败, QueHui!) sys.exit(1) def QhReadLogCsvIsExecuted(QhScodeNone, # 流数据代码字段 QhChengXuNameNone, # 需要执行的程序名 流程序需要拼接到文件名 QhIsLogTrue, # 是否需要日志判断 QhTradeTimeNone, # 交易日期 ): 【功能】日志CSV执行状态判断 【作用】读取日志CSV判断任务是否已执行成功 【返回】True 已执行成功跳过 | False 未执行/执行失败执行 【作者】阙辉 # 开关判断不校验日志 if not QhIsLog: logger.info(f【日志判断】关闭日志校验直接执行任务QueHui!) return False # 读取配置 QhJsonConfing QHLOGDBJSON if QhScode in [None, ]: QhJsonKey QhLogList else: QhJsonKey QhLogFlowList logger.debug(f【日志判断】加载配置{QhJsonKey}QueHui!) QhConfig QhJsonConfing.get(QhJsonKey) if not QhConfig: logger.error(f【日志判断】ERROR未找到 {QhJsonKey} 配置QueHui!) return False # 解析配置 QhInterfaceName QhConfig.get(name, 未命名接口) logger.info(f【{QhInterfaceName}】开始校验任务执行状态QueHui!) # 交易日期 try: QhJiaoYiDate QhTradeTime.replace(-, ) except Exception as e: logger.error(f【{QhInterfaceName}】ERROR获取交易日期失败原因{str(e)}QueHui!) return False # 拼接文件名 try: if QhScode in [None, ]: QhWenJianNameAdd str(QhJiaoYiDate) else: QhWenJianNameAdd [str(QhJiaoYiDate), QhChengXuName] except Exception as e: logger.error(f【{QhInterfaceName}】ERROR日志文件名拼接失败原因{str(e)}QueHui!) return False # 读取日志DF try: QhLogDf QhReadTongYongDf( QhJsonKeyQhJsonKey, QhJsonNameQhJsonConfing, QhWenJianNameAddQhWenJianNameAdd, QhEncodingutf-8-sig ) except Exception as e: logger.error(f【{QhInterfaceName}】ERROR读取日志文件失败原因{str(e)}QueHui!) return False # 读取结果安全判断 if QhLogDf is None or not isinstance(QhLogDf, pd.DataFrame) or QhLogDf.empty: logger.warning(f【{QhInterfaceName}】日志文件读取为空任务未执行QueHui!) return False logger.success(f【{QhInterfaceName}】日志读取完成共 {len(QhLogDf)} 条记录QueHui!) # 筛选任务 try: if QhJsonKey QhLogList: QhLogDf00 QhLogDf[QhLogDf[QhTaskId] QhChengXuName] logger.info(f【{QhInterfaceName}】筛选任务ID{QhChengXuName}QueHui!) elif QhJsonKey QhLogFlowList: QhLogDf00 QhLogDf[(QhLogDf[QhTaskId] QhChengXuName) (QhLogDf[QhCode] QhScode)] logger.info(f【{QhInterfaceName}】筛选任务ID{QhChengXuName} | 代码{QhScode}QueHui!) # 未匹配到任务判断 if QhLogDf00 is None or not isinstance(QhLogDf00, pd.DataFrame) or QhLogDf00.empty: logger.info(f【{QhInterfaceName}】未找到任务记录准备执行QueHui!) return False logger.success(f【{QhInterfaceName}】匹配到任务记录共 {len(QhLogDf00)} 条QueHui!) # 取执行状态 QhIsExecuted QhLogDf00[QhIsExecuted].values[0] QhStatus QhLogDf00[QhStatus].values[0] logger.info(f【{QhInterfaceName}】任务状态执行{QhIsExecuted} | 结果{QhStatus}QueHui!) # 核心判断 if QhIsExecuted Y and QhStatus S: logger.success(f【{QhInterfaceName}】任务已执行成功自动跳过QueHui!) return True else: logger.info(f【{QhInterfaceName}】任务未执行/执行失败准备执行QueHui!) return False except Exception as e: logger.error(f【{QhInterfaceName}】ERROR任务状态解析异常原因{str(e)}QueHui!) return False def QhSaveLogCsvIsExecuted(QhScodeNone, # 流数据代码字段 QhChengXuNameNone, # 需要执行的程序名 流程序需要拼接到文件名 QhIsLogTrue, # 是否需要日志判断 QhMsgNone, # 传入日志消息 QhTradeTimeNone, # 交易日期 ): 【功能】日志CSV数据保存 【作用】根据配置写入任务执行日志到CSV文件 【参数】QhScode 流数据代码 QhChengXuName 程序名称 QhIsLog 是否开启日志 QhMsg 日志数据 【返回】无返回 【作者】阙辉 # 关闭日志直接退出 if not QhIsLog: logger.info(f【日志保存】关闭日志写入QueHui!) return # 读取配置 QhJsonConfing QHLOGDBJSON if QhScode in [None, ]: QhJsonKey QhLogList else: QhJsonKey QhLogFlowList logger.debug(f【日志保存】加载配置{QhJsonKey}QueHui!) QhConfig QhJsonConfing.get(QhJsonKey) if not QhConfig: logger.error(f【日志保存】ERROR未找到 {QhJsonKey} 配置QueHui!) return # 解析配置 QhInterfaceName QhConfig.get(name, 未命名接口) QhUniqueValue QhConfig.get(QhUniqueValue) QhWenJianPathList QhConfig.get(QhWenJianPath) QhWenJianName QhConfig.get(QhWenJianName) QhFiledCofig QhConfig.get(fielddic) logger.info(f【{QhInterfaceName}】开始写入任务执行日志QueHui!) # 交易日期 try: QhJiaoYiDate QhTradeTime.replace(-, ) except Exception as e: logger.error(f【{QhInterfaceName}】ERROR获取交易日期失败原因{str(e)}QueHui!) return # 拼接文件名 try: if QhScode in [None, ]: QhWenJianName00 copy.deepcopy(QhWenJianName).format(QhJiaoYiDate) else: QhWenJianName00 copy.deepcopy(QhWenJianName).format(QhJiaoYiDate, QhChengXuName) except Exception as e: logger.error(f【{QhInterfaceName}】ERROR日志文件名拼接失败原因{str(e)}QueHui!) return # 路径拼接 try: QhWenJianPath QhPathJoin(QhBasePath, QhWenJianPathList) logger.info(f【{QhInterfaceName}】日志目录{QhWenJianPath}QueHui!) QhFileFullPath QhWenJianPath / QhWenJianName00 logger.info(f【{QhInterfaceName}】日志文件{QhFileFullPath}QueHui!) except Exception as e: logger.error(f【{QhInterfaceName}】ERROR文件路径拼接失败原因{str(e)}QueHui!) return # 字段配置处理 if isinstance(QhFiledCofig, dict): QhFiledKeyList list(QhFiledCofig.keys()) QhFiledValueList list(QhFiledCofig.values()) elif isinstance(QhFiledCofig, list): QhFiledKeyList [] QhFiledValueList QhFiledCofig else: QhFiledKeyList [] QhFiledValueList [] logger.warning(f【{QhInterfaceName}】字段配置格式异常已清空字段QueHui!) # 安全处理防止None/非字典报错 QhFiledKeyList QhFiledKeyList or [] QhMsg QhMsg if isinstance(QhMsg, dict) else {} QhMsgList [] for QhRow in QhFiledKeyList: QhMsgList.append(QhMsg.get(QhRow, )) # 构建DF try: QhLogDf pd.DataFrame([QhMsgList], columnsQhFiledValueList) logger.info(f【{QhInterfaceName}】日志数据构建完成条数{len(QhLogDf)}QueHui!) except Exception as e: logger.error(f【{QhInterfaceName}】ERRORDataFrame构建失败原因{str(e)}QueHui!) return # 保存文件 logger.info(f【{QhInterfaceName}】开始保存文件...QueHui!) try: QhDfUpsertToCsv( QhWenJianPathQhFileFullPath, QhNewJieGuoDfQhLogDf, QhUniqueColumnsQhUniqueValue, QhSortColumnsNone, QhDateFormat%Y-%m-%d, QhEncodingutf-8-sig ) logger.success(f【{QhInterfaceName}】文件保存成功QueHui!) return except Exception as e: logger.error(f【{QhInterfaceName}】ERROR文件保存失败原因{str(e)}QueHui!) return def QhTaskLogDecorator(QhFunc): 日志装饰器 功能判断程序是否执行并存储过数据 成功则不再执行 多进程时自动走队列单进程直接写CSV 作者阙辉 wraps(QhFunc) def QhWrapper(*QhArgs, **QhKwargs): QhJiaoYiDate QhGetXGDateToJson()[2] # 交易时间获取 全局 # ---------------------- 提取装饰器专用参数 ---------------------- QhScode QhKwargs.get(QhScode, None) QhChengXuName QhKwargs.get(QhChengXuName, 未命名任务) QhIsLog QhKwargs.get(QhIsLog, True) QhRealFuncName QhFunc.__name__ # ---------------------- 日志判断是否跳过 ---------------------- if QhIsLog: try: QhIsSkip QhReadLogCsvIsExecuted( QhScodeQhScode, QhChengXuNameQhChengXuName, QhIsLogQhIsLog, QhTradeTimeQhJiaoYiDate ) if QhIsSkip: logger.info(f【{QhChengXuName}】任务已执行成功自动跳过QueHui!) return except Exception as QhEx: logger.error(f【{QhChengXuName}】日志校验失败{str(QhEx)}QueHui!) return # 开始计时 QhMsgDic {} QhMsgDic[QhTaskId] QhChengXuName QhMsgDic[QhFuncticName] QhRealFuncName if QhScode not in [None, ]: QhMsgDic[QhCode] QhScode QhMsgDic[QhTradeTime] QhJiaoYiDate QhStartTime time.time() QhBeginTime datetime.now().strftime(%Y-%m-%d %H:%M:%S) QhMsgDic[QhBeginTime] QhBeginTime logger.info(f【{QhChengXuName}】任务开始执行{QhBeginTime}QueHui!) # ---------------------- 执行业务函数 ---------------------- try: QhResult QhFunc(*QhArgs, **QhKwargs) QhMsgDic[QhIsExecuted] Y except Exception as QhEx: QhMsgDic[QhIsExecuted] N QhMsgDic[QhStatus] F QhMsgDic[QhMsg] f程序执行异常{str(QhEx)} logger.error(f【{QhChengXuName}】执行失败{str(QhEx)}QueHui!) # 多进程走队列单进程直接写 if QhIsLog: if QhLogProcessQueue is not None: QhLogProcessQueue.put((QhScode, QhChengXuName, QhIsLog, QhMsgDic, QhJiaoYiDate)) else: QhSaveLogCsvIsExecuted( QhScodeQhScode, QhChengXuNameQhChengXuName, QhIsLogQhIsLog, QhMsgQhMsgDic, QhTradeTimeQhJiaoYiDate ) raise # 结束计时 QhRunSec time.time() - QhStartTime QhRunHour round(QhRunSec / 3600, 6) QhMsgDic[QhHaoShi] QhRunHour QhEndTime datetime.now().strftime(%Y-%m-%d %H:%M:%S) QhMsgDic[QhEndTime] QhEndTime logger.info(f【{QhChengXuName}】任务执行完成{QhEndTime}QueHui!) logger.info(f【{QhChengXuName}】耗时{QhRunSec:.6f} 秒 ≈ {QhRunHour} 小时QueHui!) # 解析返回值 if isinstance(QhResult, tuple) and len(QhResult) 2: QhFirstData QhResult[0] QhMsgDic[QhMsg] QhResult[-1] QhReturnData QhResult[:-1] if len(QhReturnData) 1: QhReturnData QhReturnData[0] else: QhReturnData QhResult QhFirstData QhReturnData # 判断成功/失败 QhSuccessFlag False if isinstance(QhFirstData, pd.DataFrame): if QhFirstData is not None and not QhFirstData.empty: QhSuccessFlag True elif isinstance(QhFirstData, (bool, int, str)): if QhFirstData in (True, 1, S, success, 成功): QhSuccessFlag True else: if QhFirstData in [None, , [], {}, ()]: QhSuccessFlag False QhMsgDic[QhStatus] S if QhSuccessFlag else F # ---------------------- 写日志多进程入队单进程直接写 ---------------------- if QhIsLog: try: if QhLogProcessQueue is not None: # 多进程只放入队列 QhLogProcessQueue.put((QhScode, QhChengXuName, QhIsLog, QhMsgDic, QhJiaoYiDate)) else: # 单进程直接写CSV QhSaveLogCsvIsExecuted( QhScodeQhScode, QhChengXuNameQhChengXuName, QhIsLogQhIsLog, QhMsgQhMsgDic, QhTradeTimeQhJiaoYiDate ) except Exception as QhEx: logger.error(f【{QhChengXuName}】日志写入失败{str(QhEx)}QueHui!) return QhReturnData return QhWrapper # ------------------------------------------------------------------------------ # 【多进程专用】初始化队列 # ------------------------------------------------------------------------------ def QhInitLogQueue(): 多进程开始前调用一次初始化日志队列 global QhLogProcessQueue QhLogProcessQueue Queue() logger.success(【日志系统】多进程队列已初始化QueHui!) # ------------------------------------------------------------------------------ # 【多进程专用】所有子进程结束后统一刷入CSV # ------------------------------------------------------------------------------ def QhFlushLogQueueToCSV(): 所有多进程任务跑完后调用 把队列里的日志一次性写入CSV避免多进程写冲突 if QhLogProcessQueue is None: logger.info(【日志系统】当前非多进程模式无需刷盘QueHui!) return logger.info( * 60) logger.info(【日志系统】开始批量写入队列日志到 CSV...QueHui!) logger.info( * 60) QhCount 0 while not QhLogProcessQueue.empty(): try: QhScode, QhChengXu, QhIsLog, QhMsg, QhTradeDate QhLogProcessQueue.get(timeout1) # 执行保存 QhSaveLogCsvIsExecuted( QhScodeQhScode, QhChengXuNameQhChengXu, QhIsLogQhIsLog, QhMsgQhMsg, QhTradeTimeQhTradeDate ) QhCount 1 # 输出单条保存日志 if QhScode in [None, ]: logger.debug(f【日志刷盘】第 {QhCount} 条 | 任务{QhChengXu} | 日期{QhTradeDate}保存成功QueHui!) else: logger.debug(f【日志刷盘】第 {QhCount} 条 | 任务{QhChengXu} | 代码{QhScode} | 日期{QhTradeDate}保存成功QueHui!) except Exception as e: logger.error(f【日志刷盘】写入异常终止刷盘{str(e)}QueHui!) break logger.info( * 60) logger.info(f【日志系统】日志刷盘全部完成共写入 {QhCount} 条记录QueHui!) logger.info( * 60)

更多文章