2026-05-21-project-refactoring-design.md 7.9 KB

BrandCultivation 项目级重构设计

概述

对 BrandCultivation(卷烟品牌培育推荐系统)进行全面重构,解决安全漏洞、缺失日志、错误处理不足、代码缺陷等问题。采用渐进式重构策略,保持现有目录结构,新增基础设施层。

当前问题清单

安全漏洞

  • database_config.yaml 明文存储 MySQL/Redis 密码
  • API 无认证中间件(内网服务可接受,但需加基础防护)
  • 无请求频率限制

代码缺陷

  • gbdt_lr_inference.py:60-77get_recommend_list 每次循环 append 两种格式字典后再过滤
  • 全局使用 print() 无日志系统
  • 后台任务 generate_and_upload_report 失败时静默无通知
  • 配置加载使用相对路径 ./config/...,CWD 变化即崩溃
  • get_report_file_id 中 result 为 None 时抛 TypeError

架构问题

  • 无请求追踪(request_id)
  • 无健康检查端点
  • 模块级别实例化(cfgs = load_config() 在 import 时执行)
  • Session 管理重复 try/finally 模式

设计方案

1. 基础设施层 (core/)

core/__init__.py

导出公共接口:get_logger, settings, AppException 等。

core/logging.py — 统一日志系统

# 基于 Python 标准库 logging
# JSON 格式输出,便于日志收集系统解析
# 工厂函数 get_logger(name) 按模块名创建 logger
# 日志级别通过环境变量 LOG_LEVEL 控制(默认 INFO)
# 输出字段:时间戳、级别、模块名、函数名、行号、消息、request_id(如有)

日志级别规范:

  • DEBUG: 数据库查询参数、模型推理中间结果
  • INFO: 请求开始/结束、任务开始/完成、关键业务步骤
  • WARNING: 非致命异常(如文件下载重试)
  • ERROR: 操作失败(含完整 traceback)

core/config.py — 配置管理

  • 保留 YAML 文件作为默认值和非敏感配置
  • 敏感信息(密码)通过环境变量注入:MYSQL_PASSWORDREDIS_PASSWORD
  • 使用 pathlib 基于 __file__ 定位项目根目录,解决相对路径问题
  • 配置加载一次后缓存(模块级单例)
  • 支持 Docker 环境变量覆盖所有数据库连接参数

环境变量清单:

MYSQL_HOST, MYSQL_PORT, MYSQL_USER, MYSQL_PASSWORD, MYSQL_DB
REDIS_HOST, REDIS_PORT, REDIS_PASSWORD, REDIS_DB
LOG_LEVEL (default: INFO)
FILE_UPLOAD_URL, FILE_DOWNLOAD_URL

core/exceptions.py — 自定义异常体系

class AppException(Exception):
    """应用异常基类"""
    def __init__(self, code: int, message: str, detail: str = None): ...

class DatabaseException(AppException): ...    # 数据库操作失败
class ModelException(AppException): ...       # 模型推理失败
class FileServiceException(AppException): ... # 文件服务失败
class ValidationException(AppException): ...  # 业务校验失败

core/middleware.py — 请求中间件

  • 为每个请求生成 UUID4 作为 request_id
  • 通过 contextvars.ContextVar 传递 request_id,所有下游日志自动携带
  • 记录请求开始(method, path, client_ip)和结束(status_code, 耗时ms)
  • 异常时记录完整 traceback

2. 数据库层改进

database/db/mysql.py

  • 替换 print(f"error: {e}")logger.error("...", exc_info=True)
  • 连接字符串从 core/config.py 获取(支持环境变量覆盖)
  • load_data_with_page 中 tqdm 改为 logger.debug 输出
  • Session 管理改为 context manager:

    @contextmanager
    def get_session(self):
    session = self._DBSession()
    try:
        yield session
        session.commit()
    except SQLAlchemyError as e:
        session.rollback()
        logger.error("Database operation failed", exc_info=True)
        raise DatabaseException(500, "数据库操作失败", str(e))
    finally:
        session.close()
    

database/db/redis_db.py

  • core/config.py 获取配置
  • 添加连接池配置(max_connections=50
  • 连接失败时记录日志并抛出 DatabaseException

database/dao/mysql_dao.py

  • 每个方法入口记录 logger.info("Loading product data", extra={"city_uuid": city_uuid})
  • 异常时抛出 DatabaseException 而非静默返回空 DataFrame
  • 修复 get_report_file_id 中 result 为 None 时的处理

3. API 层改进

全局异常处理器

@app.exception_handler(AppException)
async def app_exception_handler(request, exc):
    return JSONResponse(
        status_code=exc.code,
        content={"code": exc.code, "msg": exc.message, "data": {"detail": exc.detail}, "request_id": get_request_id()}
    )

@app.exception_handler(Exception)
async def unhandled_exception_handler(request, exc):
    logger.error("Unhandled exception", exc_info=True)
    return JSONResponse(
        status_code=500,
        content={"code": 500, "msg": "服务器内部错误", "data": None, "request_id": get_request_id()}
    )

健康检查端点

@app.get("/health")
async def health_check():
    # 检查 MySQL 和 Redis 连接状态
    return {"status": "healthy", "mysql": "ok", "redis": "ok"}

api/recommend.py

  • 后台任务加入 try/except + logger.error
  • 模型不存在时返回 HTTP 404(而非 200 + 错误消息)
  • 添加推荐过程的关键日志

api/eval_report.py

  • 每个步骤添加日志
  • 文件下载失败时记录具体错误

4. Bug 修复

models/rank/gbdt_lr_inference.py:60-77

修复前(每次循环 append 两个字典,然后过滤):

for cust_id, score in zip(recall_list, scores):
    recommend_list.append({cust_id: float(score)})
    recommend_list.append({"cust_code": cust_id, "recommend_score": score})

recommend_list = sorted(
    [item for item in recommend_list if "recommend_score" in item],
    key=lambda x: x["recommend_score"], reverse=True
)

修复后:

recommend_list = [
    {"cust_code": cust_id, "recommend_score": float(score)}
    for cust_id, score in zip(recall_list, scores)
]
recommend_list.sort(key=lambda x: x["recommend_score"], reverse=True)

5. 模型层和工具层

模型层

  • models/recommend.py — 记录召回数量、排序结果数量、各步骤耗时
  • models/recall/hot_recall.py — 替换 print 为 logger
  • models/recall/itemCF/ItemCF.py — 替换 print 为 logger,记录 Redis 写入状态
  • models/rank/gbdt_lr_inference.py — 添加推理耗时日志
  • models/item2vec/inference.py — 添加相似度计算日志

工具层

  • utils/file_stream.py — 记录上传/下载的 file_id、耗时、HTTP 状态码
  • utils/report_utils.py — 每个报告生成步骤记录开始/完成/耗时

训练脚本

  • train.py — 替换 print 为 logger,记录训练全流程

6. 配置文件变更

  • database_config.yaml — 移除密码,改为占位符 passwd: "${MYSQL_PASSWORD}"
  • 新增 .env.example — 列出所有环境变量及说明
  • 新增 core/__init__.py — 导出公共接口

目录结构变更

BrandCultivation/
├── core/                    # 新增:基础设施层
│   ├── __init__.py
│   ├── logging.py           # 日志系统
│   ├── config.py            # 配置管理
│   ├── exceptions.py        # 异常定义
│   └── middleware.py        # 请求中间件
├── api/                     # 改进:添加日志和错误处理
├── database/                # 改进:日志、异常、session 管理
├── models/                  # 改进:日志、bug 修复
├── utils/                   # 改进:日志
├── config/                  # 改进:移除敏感信息
├── .env.example             # 新增:环境变量模板
└── run_api.py               # 改进:注册中间件和健康检查

不在本次范围内

  • API 认证/鉴权(内网服务)
  • 数据库 ORM 模型定义(当前 raw SQL + pandas 模式保持不变)
  • ML 模型算法调整
  • 单元测试(可作为后续迭代)
  • CI/CD 配置