# BrandCultivation 项目级重构设计 ## 概述 对 BrandCultivation(卷烟品牌培育推荐系统)进行全面重构,解决安全漏洞、缺失日志、错误处理不足、代码缺陷等问题。采用渐进式重构策略,保持现有目录结构,新增基础设施层。 ## 当前问题清单 ### 安全漏洞 - `database_config.yaml` 明文存储 MySQL/Redis 密码 - API 无认证中间件(内网服务可接受,但需加基础防护) - 无请求频率限制 ### 代码缺陷 - `gbdt_lr_inference.py:60-77` — `get_recommend_list` 每次循环 append 两种格式字典后再过滤 - 全局使用 `print()` 无日志系统 - 后台任务 `generate_and_upload_report` 失败时静默无通知 - 配置加载使用相对路径 `./config/...`,CWD 变化即崩溃 - `get_report_file_id` 中 result 为 None 时抛 TypeError ### 架构问题 - 无请求追踪(request_id) - 无健康检查端点 - 模块级别实例化(`cfgs = load_config()` 在 import 时执行) - Session 管理重复 try/finally 模式 ## 设计方案 ### 1. 基础设施层 (`core/`) #### `core/__init__.py` 导出公共接口:`get_logger`, `settings`, `AppException` 等。 #### `core/logging.py` — 统一日志系统 ```python # 基于 Python 标准库 logging # JSON 格式输出,便于日志收集系统解析 # 工厂函数 get_logger(name) 按模块名创建 logger # 日志级别通过环境变量 LOG_LEVEL 控制(默认 INFO) # 输出字段:时间戳、级别、模块名、函数名、行号、消息、request_id(如有) ``` 日志级别规范: - DEBUG: 数据库查询参数、模型推理中间结果 - INFO: 请求开始/结束、任务开始/完成、关键业务步骤 - WARNING: 非致命异常(如文件下载重试) - ERROR: 操作失败(含完整 traceback) #### `core/config.py` — 配置管理 - 保留 YAML 文件作为默认值和非敏感配置 - 敏感信息(密码)通过环境变量注入:`MYSQL_PASSWORD`、`REDIS_PASSWORD` - 使用 `pathlib` 基于 `__file__` 定位项目根目录,解决相对路径问题 - 配置加载一次后缓存(模块级单例) - 支持 Docker 环境变量覆盖所有数据库连接参数 环境变量清单: ``` MYSQL_HOST, MYSQL_PORT, MYSQL_USER, MYSQL_PASSWORD, MYSQL_DB REDIS_HOST, REDIS_PORT, REDIS_PASSWORD, REDIS_DB LOG_LEVEL (default: INFO) FILE_UPLOAD_URL, FILE_DOWNLOAD_URL ``` #### `core/exceptions.py` — 自定义异常体系 ```python class AppException(Exception): """应用异常基类""" def __init__(self, code: int, message: str, detail: str = None): ... class DatabaseException(AppException): ... # 数据库操作失败 class ModelException(AppException): ... # 模型推理失败 class FileServiceException(AppException): ... # 文件服务失败 class ValidationException(AppException): ... # 业务校验失败 ``` #### `core/middleware.py` — 请求中间件 - 为每个请求生成 UUID4 作为 request_id - 通过 `contextvars.ContextVar` 传递 request_id,所有下游日志自动携带 - 记录请求开始(method, path, client_ip)和结束(status_code, 耗时ms) - 异常时记录完整 traceback ### 2. 数据库层改进 #### `database/db/mysql.py` - 替换 `print(f"error: {e}")` 为 `logger.error("...", exc_info=True)` - 连接字符串从 `core/config.py` 获取(支持环境变量覆盖) - `load_data_with_page` 中 tqdm 改为 `logger.debug` 输出 - Session 管理改为 context manager: ```python @contextmanager def get_session(self): session = self._DBSession() try: yield session session.commit() except SQLAlchemyError as e: session.rollback() logger.error("Database operation failed", exc_info=True) raise DatabaseException(500, "数据库操作失败", str(e)) finally: session.close() ``` #### `database/db/redis_db.py` - 从 `core/config.py` 获取配置 - 添加连接池配置(`max_connections=50`) - 连接失败时记录日志并抛出 `DatabaseException` #### `database/dao/mysql_dao.py` - 每个方法入口记录 `logger.info("Loading product data", extra={"city_uuid": city_uuid})` - 异常时抛出 `DatabaseException` 而非静默返回空 DataFrame - 修复 `get_report_file_id` 中 result 为 None 时的处理 ### 3. API 层改进 #### 全局异常处理器 ```python @app.exception_handler(AppException) async def app_exception_handler(request, exc): return JSONResponse( status_code=exc.code, content={"code": exc.code, "msg": exc.message, "data": {"detail": exc.detail}, "request_id": get_request_id()} ) @app.exception_handler(Exception) async def unhandled_exception_handler(request, exc): logger.error("Unhandled exception", exc_info=True) return JSONResponse( status_code=500, content={"code": 500, "msg": "服务器内部错误", "data": None, "request_id": get_request_id()} ) ``` #### 健康检查端点 ```python @app.get("/health") async def health_check(): # 检查 MySQL 和 Redis 连接状态 return {"status": "healthy", "mysql": "ok", "redis": "ok"} ``` #### `api/recommend.py` - 后台任务加入 try/except + logger.error - 模型不存在时返回 HTTP 404(而非 200 + 错误消息) - 添加推荐过程的关键日志 #### `api/eval_report.py` - 每个步骤添加日志 - 文件下载失败时记录具体错误 ### 4. Bug 修复 #### `models/rank/gbdt_lr_inference.py:60-77` 修复前(每次循环 append 两个字典,然后过滤): ```python for cust_id, score in zip(recall_list, scores): recommend_list.append({cust_id: float(score)}) recommend_list.append({"cust_code": cust_id, "recommend_score": score}) recommend_list = sorted( [item for item in recommend_list if "recommend_score" in item], key=lambda x: x["recommend_score"], reverse=True ) ``` 修复后: ```python recommend_list = [ {"cust_code": cust_id, "recommend_score": float(score)} for cust_id, score in zip(recall_list, scores) ] recommend_list.sort(key=lambda x: x["recommend_score"], reverse=True) ``` ### 5. 模型层和工具层 #### 模型层 - `models/recommend.py` — 记录召回数量、排序结果数量、各步骤耗时 - `models/recall/hot_recall.py` — 替换 print 为 logger - `models/recall/itemCF/ItemCF.py` — 替换 print 为 logger,记录 Redis 写入状态 - `models/rank/gbdt_lr_inference.py` — 添加推理耗时日志 - `models/item2vec/inference.py` — 添加相似度计算日志 #### 工具层 - `utils/file_stream.py` — 记录上传/下载的 file_id、耗时、HTTP 状态码 - `utils/report_utils.py` — 每个报告生成步骤记录开始/完成/耗时 #### 训练脚本 - `train.py` — 替换 print 为 logger,记录训练全流程 ### 6. 配置文件变更 - `database_config.yaml` — 移除密码,改为占位符 `passwd: "${MYSQL_PASSWORD}"` - 新增 `.env.example` — 列出所有环境变量及说明 - 新增 `core/__init__.py` — 导出公共接口 ## 目录结构变更 ``` BrandCultivation/ ├── core/ # 新增:基础设施层 │ ├── __init__.py │ ├── logging.py # 日志系统 │ ├── config.py # 配置管理 │ ├── exceptions.py # 异常定义 │ └── middleware.py # 请求中间件 ├── api/ # 改进:添加日志和错误处理 ├── database/ # 改进:日志、异常、session 管理 ├── models/ # 改进:日志、bug 修复 ├── utils/ # 改进:日志 ├── config/ # 改进:移除敏感信息 ├── .env.example # 新增:环境变量模板 └── run_api.py # 改进:注册中间件和健康检查 ``` ## 不在本次范围内 - API 认证/鉴权(内网服务) - 数据库 ORM 模型定义(当前 raw SQL + pandas 模式保持不变) - ML 模型算法调整 - 单元测试(可作为后续迭代) - CI/CD 配置