对 BrandCultivation(卷烟品牌培育推荐系统)进行全面重构,解决安全漏洞、缺失日志、错误处理不足、代码缺陷等问题。采用渐进式重构策略,保持现有目录结构,新增基础设施层。
database_config.yaml 明文存储 MySQL/Redis 密码gbdt_lr_inference.py:60-77 — get_recommend_list 每次循环 append 两种格式字典后再过滤print() 无日志系统generate_and_upload_report 失败时静默无通知./config/...,CWD 变化即崩溃get_report_file_id 中 result 为 None 时抛 TypeErrorcfgs = load_config() 在 import 时执行)core/)core/__init__.py导出公共接口:get_logger, settings, AppException 等。
core/logging.py — 统一日志系统# 基于 Python 标准库 logging
# JSON 格式输出,便于日志收集系统解析
# 工厂函数 get_logger(name) 按模块名创建 logger
# 日志级别通过环境变量 LOG_LEVEL 控制(默认 INFO)
# 输出字段:时间戳、级别、模块名、函数名、行号、消息、request_id(如有)
日志级别规范:
core/config.py — 配置管理MYSQL_PASSWORD、REDIS_PASSWORDpathlib 基于 __file__ 定位项目根目录,解决相对路径问题环境变量清单:
MYSQL_HOST, MYSQL_PORT, MYSQL_USER, MYSQL_PASSWORD, MYSQL_DB
REDIS_HOST, REDIS_PORT, REDIS_PASSWORD, REDIS_DB
LOG_LEVEL (default: INFO)
FILE_UPLOAD_URL, FILE_DOWNLOAD_URL
core/exceptions.py — 自定义异常体系class AppException(Exception):
"""应用异常基类"""
def __init__(self, code: int, message: str, detail: str = None): ...
class DatabaseException(AppException): ... # 数据库操作失败
class ModelException(AppException): ... # 模型推理失败
class FileServiceException(AppException): ... # 文件服务失败
class ValidationException(AppException): ... # 业务校验失败
core/middleware.py — 请求中间件contextvars.ContextVar 传递 request_id,所有下游日志自动携带database/db/mysql.pyprint(f"error: {e}") 为 logger.error("...", exc_info=True)core/config.py 获取(支持环境变量覆盖)load_data_with_page 中 tqdm 改为 logger.debug 输出Session 管理改为 context manager:
@contextmanager
def get_session(self):
session = self._DBSession()
try:
yield session
session.commit()
except SQLAlchemyError as e:
session.rollback()
logger.error("Database operation failed", exc_info=True)
raise DatabaseException(500, "数据库操作失败", str(e))
finally:
session.close()
database/db/redis_db.pycore/config.py 获取配置max_connections=50)DatabaseExceptiondatabase/dao/mysql_dao.pylogger.info("Loading product data", extra={"city_uuid": city_uuid})DatabaseException 而非静默返回空 DataFrameget_report_file_id 中 result 为 None 时的处理@app.exception_handler(AppException)
async def app_exception_handler(request, exc):
return JSONResponse(
status_code=exc.code,
content={"code": exc.code, "msg": exc.message, "data": {"detail": exc.detail}, "request_id": get_request_id()}
)
@app.exception_handler(Exception)
async def unhandled_exception_handler(request, exc):
logger.error("Unhandled exception", exc_info=True)
return JSONResponse(
status_code=500,
content={"code": 500, "msg": "服务器内部错误", "data": None, "request_id": get_request_id()}
)
@app.get("/health")
async def health_check():
# 检查 MySQL 和 Redis 连接状态
return {"status": "healthy", "mysql": "ok", "redis": "ok"}
api/recommend.pyapi/eval_report.pymodels/rank/gbdt_lr_inference.py:60-77修复前(每次循环 append 两个字典,然后过滤):
for cust_id, score in zip(recall_list, scores):
recommend_list.append({cust_id: float(score)})
recommend_list.append({"cust_code": cust_id, "recommend_score": score})
recommend_list = sorted(
[item for item in recommend_list if "recommend_score" in item],
key=lambda x: x["recommend_score"], reverse=True
)
修复后:
recommend_list = [
{"cust_code": cust_id, "recommend_score": float(score)}
for cust_id, score in zip(recall_list, scores)
]
recommend_list.sort(key=lambda x: x["recommend_score"], reverse=True)
models/recommend.py — 记录召回数量、排序结果数量、各步骤耗时models/recall/hot_recall.py — 替换 print 为 loggermodels/recall/itemCF/ItemCF.py — 替换 print 为 logger,记录 Redis 写入状态models/rank/gbdt_lr_inference.py — 添加推理耗时日志models/item2vec/inference.py — 添加相似度计算日志utils/file_stream.py — 记录上传/下载的 file_id、耗时、HTTP 状态码utils/report_utils.py — 每个报告生成步骤记录开始/完成/耗时train.py — 替换 print 为 logger,记录训练全流程database_config.yaml — 移除密码,改为占位符 passwd: "${MYSQL_PASSWORD}".env.example — 列出所有环境变量及说明core/__init__.py — 导出公共接口BrandCultivation/
├── core/ # 新增:基础设施层
│ ├── __init__.py
│ ├── logging.py # 日志系统
│ ├── config.py # 配置管理
│ ├── exceptions.py # 异常定义
│ └── middleware.py # 请求中间件
├── api/ # 改进:添加日志和错误处理
├── database/ # 改进:日志、异常、session 管理
├── models/ # 改进:日志、bug 修复
├── utils/ # 改进:日志
├── config/ # 改进:移除敏感信息
├── .env.example # 新增:环境变量模板
└── run_api.py # 改进:注册中间件和健康检查