# BrandCultivation Project-Level Refactoring Implementation Plan

> **For agentic workers:** REQUIRED SUB-SKILL: Use superpowers:subagent-driven-development (recommended) or superpowers:executing-plans to implement this plan task-by-task. Steps use checkbox (`- [ ]`) syntax for tracking.

**Goal:** Add a logging system, configuration management, exception handling, and request tracing to the BrandCultivation project; fix known bugs; remove plaintext passwords.

**Architecture:** Incremental refactoring. Add a new `core/` infrastructure layer, then gradually replace `print` with a logger in the existing modules and add error handling and request tracing. The existing directory structure and business logic stay unchanged.

**Tech Stack:** Python 3.x, FastAPI, SQLAlchemy, Redis, logging (stdlib), pathlib, contextvars, uuid

---

## File Structure

### New Files

- `core/__init__.py`: public interface exports
- `core/logging.py`: JSON-formatted logging system
- `core/config.py`: configuration management (YAML + environment variables)
- `core/exceptions.py`: custom exception hierarchy
- `core/middleware.py`: request logging and request_id middleware
- `.env.example`: environment variable template

### Modified Files

- `config/config.py`: deprecated; now imports from `core/config.py`
- `config/database_config.yaml`: passwords removed
- `database/db/mysql.py`: logging, session context manager, configuration source
- `database/db/redis_db.py`: logging, configuration source, connection pool
- `database/__init__.py`: updated exports
- `database/dao/mysql_dao.py`: logging, exception handling
- `run_api.py`: middleware registration, health check, exception handlers
- `api/recommend.py`: logging, error handling
- `api/eval_report.py`: logging, error handling
- `api/report.py`: logging
- `models/rank/gbdt_lr_inference.py`: bug fix, logging
- `models/recommend.py`: logging
- `models/recall/hot_recall.py`: replace print
- `models/recall/itemCF/ItemCF.py`: replace print
- `models/item2vec/inference.py`: logging
- `models/rank/data/preprocess.py`: replace print
- `utils/file_stream.py`: logging, error handling
- `utils/report_utils.py`: logging
- `train.py`: replace print

---

### Task 1: Create `core/exceptions.py` (custom exception hierarchy)

**Files:**
- Create: `core/exceptions.py`

- [ ] **Step 1: Create the exception definitions file**

```python
class AppException(Exception):
    """Base class for application exceptions."""

    def __init__(self, code: int, message: str, detail: str = None):
        self.code = code          # HTTP status code returned to the client
        self.message = message    # user-facing message
        self.detail = detail      # optional diagnostic detail
        super().__init__(message)


class DatabaseException(AppException):
    """A database operation failed."""

    def __init__(self, message: str = "数据库操作失败", detail: str = None):
        super().__init__(code=500, message=message,
                         detail=detail)


class ModelException(AppException):
    """Model inference failed."""

    def __init__(self, message: str = "模型推理失败", detail: str = None):
        super().__init__(code=500, message=message, detail=detail)


class FileServiceException(AppException):
    """A file service operation failed."""

    def __init__(self, message: str = "文件服务操作失败", detail: str = None):
        super().__init__(code=500, message=message, detail=detail)


class ValidationException(AppException):
    """Business validation failed."""

    def __init__(self, message: str = "参数校验失败", detail: str = None):
        super().__init__(code=400, message=message, detail=detail)
```

- [ ] **Step 2: Verify the import**

Run: `cd /home/yangzeyu/project/BrandCultivation && python -c "from core.exceptions import AppException, DatabaseException, ModelException, FileServiceException, ValidationException; print('OK')"`

Expected: `OK`

- [ ] **Step 3: Commit**

```bash
git add core/exceptions.py
git commit -m "feat(core): add custom exception hierarchy"
```

---

### Task 2: Create `core/config.py` (configuration management)

**Files:**
- Create: `core/config.py`

- [ ] **Step 1: Create the configuration module**

```python
import os
from pathlib import Path

import yaml

PROJECT_ROOT = Path(__file__).resolve().parent.parent


def _get_env(key: str, default=None):
    """Read a value from the environment, falling back to `default`."""
    return os.environ.get(key, default)


def _load_yaml(filename: str) -> dict:
    """Load a YAML file from the project's config/ directory."""
    filepath = PROJECT_ROOT / "config" / filename
    with open(filepath, encoding="utf-8") as f:
        return yaml.safe_load(f) or {}


class _Settings:
    """Settings singleton; environment variables override the YAML defaults."""

    def __init__(self):
        self._db_cfg = _load_yaml("database_config.yaml")
        self._model_cfg = _load_yaml("model_config.yaml")
        self._service_cfg = _load_yaml("service_config.yaml")

    @property
    def mysql_host(self) -> str:
        return _get_env("MYSQL_HOST", self._db_cfg.get("mysql", {}).get("host", "localhost"))

    @property
    def mysql_port(self) -> int:
        return int(_get_env("MYSQL_PORT", self._db_cfg.get("mysql", {}).get("port", 3306)))

    @property
    def mysql_user(self) -> str:
        return _get_env("MYSQL_USER", self._db_cfg.get("mysql", {}).get("user", "root"))

    @property
    def mysql_password(self) -> str:
        return _get_env("MYSQL_PASSWORD", self._db_cfg.get("mysql", {}).get("passwd", ""))

    @property
    def mysql_db(self) -> str:
        return _get_env("MYSQL_DB", self._db_cfg.get("mysql", {}).get("db", ""))

    @property
    def redis_host(self) -> str:
        return _get_env("REDIS_HOST", self._db_cfg.get("redis", {}).get("host", "localhost"))

    @property
    def redis_port(self) -> int:
        return int(_get_env("REDIS_PORT", self._db_cfg.get("redis", {}).get("port", 6379)))

    @property
    def redis_password(self) -> str:
        return _get_env("REDIS_PASSWORD", self._db_cfg.get("redis", {}).get("passwd", ""))

    @property
    def redis_db(self) -> int:
        return int(_get_env("REDIS_DB", self._db_cfg.get("redis", {}).get("db", 0)))

    @property
    def log_level(self) -> str:
        return _get_env("LOG_LEVEL", "INFO").upper()

    @property
    def file_upload_url(self) -> str:
        return _get_env("FILE_UPLOAD_URL", self._service_cfg.get("aliyun", {}).get("upload_url", ""))

    @property
    def file_download_url(self) -> str:
        return _get_env("FILE_DOWNLOAD_URL", self._service_cfg.get("aliyun", {}).get("download_url", ""))

    @property
    def model_config(self) -> dict:
        return self._model_cfg


settings = _Settings()
```

- [ ] **Step 2: Verify the import**

Run: `cd /home/yangzeyu/project/BrandCultivation && python -c "from core.config import settings; print(settings.mysql_host)"`

Expected: prints the database host address

- [ ] **Step 3: Commit**

```bash
git add core/config.py
git commit -m "feat(core): add config management with env var override"
```

---

### Task 3: Create `core/logging.py` (logging system)

**Files:**
- Create: `core/logging.py`

- [ ] **Step 1: Create the logging module**

```python
import logging
import json
import sys
from contextvars import ContextVar
from datetime import datetime, timezone

# Holds the request_id of the request currently being handled ("-" outside a request).
request_id_var: ContextVar[str] = ContextVar("request_id", default="-")


class JSONFormatter(logging.Formatter):
    """Formats log records as JSON."""

    def format(self, record: logging.LogRecord) -> str:
        log_data = {
            # Use the record's creation time rather than the time of formatting.
            "timestamp": datetime.fromtimestamp(record.created, tz=timezone.utc).isoformat(),
            "level": record.levelname,
            "module":
                record.module,
            "function": record.funcName,
            "line": record.lineno,
            "message": record.getMessage(),
            "request_id": request_id_var.get("-"),
        }
        if record.exc_info and record.exc_info[0] is not None:
            log_data["exception"] = self.formatException(record.exc_info)
        # Structured fields passed via logger.xxx(..., extra={"extra_data": {...}})
        if hasattr(record, "extra_data"):
            log_data["extra"] = record.extra_data
        return json.dumps(log_data, ensure_ascii=False)


def get_logger(name: str) -> logging.Logger:
    """Return the logger with the given name, configured for JSON output."""
    # Imported here to avoid a circular import at module load time.
    from core.config import settings

    logger = logging.getLogger(name)
    if not logger.handlers:
        handler = logging.StreamHandler(sys.stdout)
        handler.setFormatter(JSONFormatter())
        logger.addHandler(handler)
        logger.setLevel(getattr(logging, settings.log_level, logging.INFO))
        logger.propagate = False
    return logger
```

- [ ] **Step 2: Verify the import and basic functionality**

Run: `cd /home/yangzeyu/project/BrandCultivation && python -c "from core.logging import get_logger; logger = get_logger('test'); logger.info('hello')"`

Expected: a JSON-formatted log line

- [ ] **Step 3: Commit**

```bash
git add core/logging.py
git commit -m "feat(core): add JSON logging system with request_id support"
```

---

### Task 4: Create `core/middleware.py` (request middleware)

**Files:**
- Create: `core/middleware.py`

- [ ] **Step 1: Create the middleware module**

```python
import time
import uuid

from starlette.middleware.base import BaseHTTPMiddleware
from starlette.requests import Request
from starlette.responses import Response

from core.logging import get_logger, request_id_var

logger = get_logger("middleware")


def get_request_id() -> str:
    """Return the request_id of the current request."""
    return request_id_var.get("-")


class RequestLoggingMiddleware(BaseHTTPMiddleware):
    """Request logging middleware: generates a request_id and logs request start/end."""

    async def dispatch(self, request: Request, call_next) -> Response:
        req_id = str(uuid.uuid4())[:8]
        request_id_var.set(req_id)
        start_time = time.time()
        client_ip = request.client.host if request.client else "unknown"
        logger.info(
            f"Request started: {request.method} {request.url.path}",
            extra={"extra_data": {"client_ip": client_ip, "method": request.method,
                                  "path": str(request.url.path)}},
        )
        try:
            response = await call_next(request)
        except Exception:
            duration_ms = (time.time() - start_time) * 1000
            logger.error(
                f"Request failed: {request.method} {request.url.path} ({duration_ms:.1f}ms)",
                exc_info=True,
            )
            raise
        duration_ms = (time.time() - start_time) * 1000
        logger.info(
            f"Request completed: {request.method} {request.url.path} -> {response.status_code} ({duration_ms:.1f}ms)",
            extra={"extra_data": {"status_code": response.status_code, "duration_ms": round(duration_ms, 1)}},
        )
        # Echo the request_id so clients can quote it when reporting problems.
        response.headers["X-Request-ID"] = req_id
        return response
```

- [ ] **Step 2: Verify the import**

Run: `cd /home/yangzeyu/project/BrandCultivation && python -c "from core.middleware import RequestLoggingMiddleware, get_request_id; print('OK')"`

Expected: `OK`

- [ ] **Step 3: Commit**

```bash
git add core/middleware.py
git commit -m "feat(core): add request logging middleware with request_id tracking"
```

---

### Task 5: Create `core/__init__.py` (public interface exports)

**Files:**
- Create: `core/__init__.py`

- [ ] **Step 1: Create the init file**

```python
from core.logging import get_logger, request_id_var
from core.config import settings
from core.exceptions import (
    AppException,
    DatabaseException,
    ModelException,
    FileServiceException,
    ValidationException,
)
from core.middleware import RequestLoggingMiddleware, get_request_id

__all__ = [
    "get_logger",
    "request_id_var",
    "settings",
    "AppException",
    "DatabaseException",
    "ModelException",
    "FileServiceException",
    "ValidationException",
    "RequestLoggingMiddleware",
    "get_request_id",
]
```

- [ ] **Step 2: Verify the full import**

Run: `cd /home/yangzeyu/project/BrandCultivation && python -c "from core import get_logger, settings, AppException, DatabaseException, RequestLoggingMiddleware, get_request_id; print('All imports OK')"`

Expected: `All imports OK`

- [ ] **Step 3: Commit**

```bash
git add core/__init__.py
git commit -m "feat(core): add public interface exports"
```

---

### Task 6: Improve `database/db/mysql.py` (logging, session management, configuration source)

**Files:**
- Modify:
`database/db/mysql.py`

- [ ] **Step 1: Rewrite mysql.py**

Replace the entire file content with:

```python
from contextlib import contextmanager
from urllib.parse import quote_plus

from core import get_logger, settings, DatabaseException
import pandas as pd
from sqlalchemy import create_engine, text
from sqlalchemy.orm import sessionmaker
from sqlalchemy.exc import SQLAlchemyError

logger = get_logger("database.mysql")


class MySqlDatabaseHelper:
    _instance = None

    def __new__(cls):
        if not cls._instance:
            cls._instance = super(MySqlDatabaseHelper, cls).__new__(cls)
            cls._instance._initialized = False
        return cls._instance

    def __init__(self):
        if self._initialized:
            return
        self._connect_database()
        self._initialized = True

    def _connect_database(self):
        try:
            # URL-encode the credentials so characters such as "@" or "/" do not break the URL.
            conn_str = (
                f"mysql+pymysql://{quote_plus(settings.mysql_user)}:{quote_plus(settings.mysql_password)}"
                f"@{settings.mysql_host}:{settings.mysql_port}/{settings.mysql_db}"
            )
            self.engine = create_engine(
                conn_str,
                pool_size=20,
                max_overflow=30,
                pool_recycle=1800,
                pool_pre_ping=True,
                isolation_level="READ COMMITTED",
            )
            self._DBSession = sessionmaker(bind=self.engine)
            logger.info("MySQL connection pool created",
                        extra={"extra_data": {"host": settings.mysql_host, "db": settings.mysql_db}})
        except Exception as e:
            logger.error("Failed to create MySQL connection", exc_info=True)
            raise DatabaseException(message="数据库连接失败", detail=str(e))

    @contextmanager
    def get_session(self):
        """Yield a session; commit on success, roll back on SQLAlchemy errors, always close."""
        session = self._DBSession()
        try:
            yield session
            session.commit()
        except SQLAlchemyError as e:
            session.rollback()
            logger.error("Database operation failed", exc_info=True)
            raise DatabaseException(message="数据库操作失败", detail=str(e))
        finally:
            session.close()

    def load_data_with_page(self, query, params, page_size=100000):
        """Load query results page by page.

        Note: LIMIT/OFFSET paging is only stable if `query` has a deterministic ORDER BY.
        """
        count_query = text(f"SELECT COUNT(*) FROM ({query}) AS _count_subq")
        query += " LIMIT :limit OFFSET :offset"
        query = text(query)
        result = self.fetch_one(count_query, params)
        total_rows = result[0] if result is not None else 0
        if total_rows == 0:
            logger.debug("Query returned 0 rows")
            return pd.DataFrame()
        logger.debug(f"Loading {total_rows} rows with page_size={page_size}")
        data = pd.DataFrame()
        page = 1
        while True:
            offset = (page - 1) * page_size
            page_params = dict(params)
            page_params["limit"] = page_size
            page_params["offset"] = offset
            df = pd.DataFrame(self.fetch_all(query, page_params))
            if df.empty:
                break
            data = pd.concat([data, df], ignore_index=True)
            page += 1
        logger.debug(f"Loaded {len(data)} rows in {page - 1} pages")
        return data

    def fetch_all(self, query, params=None):
        """Execute a SQL query and return all rows."""
        with self.get_session() as session:
            results = session.execute(query, params or {}).fetchall()
        return results

    def fetch_one(self, query, params=None):
        """Execute a SQL query and return a single row."""
        with self.get_session() as session:
            result = session.execute(query, params or {}).fetchone()
        return result

    def insert_data(self, table_name, data_dict):
        """Insert a single row into the given table."""
        if not data_dict:
            return 0
        columns = ", ".join(data_dict.keys())
        values = ", ".join([f":{key}" for key in data_dict.keys()])
        query = text(f"INSERT INTO {table_name} ({columns}) VALUES ({values})")
        with self.get_session() as session:
            rowcount = session.execute(query, data_dict).rowcount
        logger.info(f"Inserted 1 row into {table_name}")
        return rowcount

    def update_data(self, table_name, update_dict, conditions, condition_params=None):
        """Update the rows in a table that match the given conditions."""
        if not update_dict:
            return 0
        set_clause = ", ".join([f"{key} = :{key}" for key in update_dict.keys()])
        if len(conditions) == 1:
            where_clause = f"WHERE {conditions[0]}"
        elif len(conditions) > 1:
            where_clause = f"WHERE {' AND '.join(conditions)}"
        else:
            where_clause = ""
        query = text(f"UPDATE {table_name} SET {set_clause} {where_clause}")
        params = update_dict.copy()
        if condition_params:
            params.update(condition_params)
        with self.get_session() as session:
            rowcount = session.execute(query, params).rowcount
        logger.info(f"Updated {rowcount} rows in {table_name}")
        return rowcount

    def execute_query(self, query, params=None):
        """Execute a SQL statement."""
        with self.get_session() as session:
            session.execute(query, params or {})

    def check_connection(self) -> bool:
        """Check whether the database connection is healthy."""
        try:
            self.fetch_one(text("SELECT 1"), {})
            return True
        except Exception:
            return False
```

- [ ] **Step 2: Verify the import**

Run: `cd /home/yangzeyu/project/BrandCultivation && python -c "from database.db.mysql import MySqlDatabaseHelper; print('OK')"`

Expected: `OK`

- [ ] **Step 3: Commit**

```bash
git add database/db/mysql.py
git commit -m "refactor(database): add logging, session context manager, env-based config"
```

---

### Task 7: Improve `database/db/redis_db.py` (logging, configuration source, connection pool)

**Files:**
- Modify: `database/db/redis_db.py`

- [ ] **Step 1: Rewrite redis_db.py**

Replace the entire file content with:

```python
import redis

from core import get_logger, settings, DatabaseException

logger = get_logger("database.redis")


class RedisDatabaseHelper:
    _instance = None

    def __new__(cls):
        if not cls._instance:
            cls._instance = super(RedisDatabaseHelper, cls).__new__(cls)
            cls._instance._initialized = False
        return cls._instance

    def __init__(self):
        if self._initialized:
            return
        try:
            pool = redis.ConnectionPool(
                host=settings.redis_host,
                port=settings.redis_port,
                password=settings.redis_password,
                db=settings.redis_db,
                decode_responses=True,
                max_connections=50,
            )
            self.redis = redis.StrictRedis(connection_pool=pool)
            # Fail fast if the server is unreachable.
            self.redis.ping()
            logger.info("Redis connection established",
                        extra={"extra_data": {"host": settings.redis_host, "db": settings.redis_db}})
        except redis.ConnectionError as e:
            logger.error("Failed to connect to Redis", exc_info=True)
            raise DatabaseException(message="Redis连接失败", detail=str(e))
        self._initialized = True

    def check_connection(self) -> bool:
        """Check whether the Redis connection is healthy."""
        try:
            self.redis.ping()
            return True
        except Exception:
            return False
```

- [ ] **Step 2: Verify the import**

Run: `cd /home/yangzeyu/project/BrandCultivation && python -c "from database.db.redis_db import RedisDatabaseHelper; print('OK')"`

Expected: `OK`

- [ ] **Step 3: Commit**

```bash
git add database/db/redis_db.py
git commit -m "refactor(database): redis with logging, connection pool, env config"
```

---

### Task 8: Improve `database/dao/mysql_dao.py` (logging and exception handling)

**Files:**
- Modify: `database/dao/mysql_dao.py`

- [ ] **Step 1: Add logging and exception handling**

Add a logger at the top of the file and log inside every method. Fix the `None` handling in `get_report_file_id`.

Key changes:
- At the top of the file, add `from core import get_logger` and `logger = get_logger("database.dao")`
- At the entry of every public method, add `logger.info(...)` to record the call parameters
- In `get_report_file_id`, return an empty DataFrame when the result is `None` instead of raising an exception
- Remove the `data_preprocess` method (it does not belong in the DAO layer)

- [ ] **Step 2: Verify the import**

Run: `cd /home/yangzeyu/project/BrandCultivation && python -c "from database.dao.mysql_dao import MySqlDao; print('OK')"`

Expected: `OK`

- [ ] **Step 3: Commit**

```bash
git add database/dao/mysql_dao.py
git commit -m "refactor(dao): add logging, fix get_report_file_id null handling"
```

---

### Task 9: Improve `run_api.py` (middleware registration, health check, exception handlers)

**Files:**
- Modify: `run_api.py`

- [ ] **Step 1: Rewrite run_api.py**

```python
from api import recommend_router, report_router, eval_report_router
from core import get_logger, AppException, RequestLoggingMiddleware, get_request_id
from core.exceptions import DatabaseException
from database.db.mysql import MySqlDatabaseHelper
from database.db.redis_db import RedisDatabaseHelper
from fastapi import FastAPI, Request, status
from fastapi.encoders import jsonable_encoder
from fastapi.exceptions import RequestValidationError
from fastapi.responses import JSONResponse
import uvicorn

logger = get_logger("app")

app = FastAPI()
app.add_middleware(RequestLoggingMiddleware)


@app.exception_handler(RequestValidationError)
async def validation_exception_handler(request: Request, exc: RequestValidationError):
    logger.warning(f"Validation error: {exc.errors()}")
    return JSONResponse(
        status_code=status.HTTP_400_BAD_REQUEST,
        # jsonable_encoder guards against values in errors()/body that are not JSON-serializable.
        content=jsonable_encoder({
            "code": 400,
            "msg": "请求参数错误",
            "data": {"detail": exc.errors(), "body": exc.body},
            "request_id": get_request_id(),
        }),
    )


@app.exception_handler(AppException)
async def app_exception_handler(request: Request, exc: AppException):
    logger.error(f"AppException: {exc.message}", extra={"extra_data": {"detail": exc.detail}})
    return JSONResponse(
        status_code=exc.code,
        content={
            "code": exc.code,
            "msg": exc.message,
            "data": {"detail": exc.detail},
            "request_id": get_request_id(),
        },
    )


@app.exception_handler(Exception)
async def unhandled_exception_handler(request: Request, exc: Exception):
    logger.error("Unhandled exception", exc_info=True)
    return JSONResponse(
        status_code=500,
        content={
            "code": 500,
            "msg": "服务器内部错误",
            "data": None,
            "request_id": get_request_id(),
        },
    )


@app.get("/health")
async def health_check():
    """Health check endpoint."""
    mysql_ok = False
    redis_ok = False
    try:
        mysql_ok = MySqlDatabaseHelper().check_connection()
    except Exception:
        pass
    try:
        redis_ok = RedisDatabaseHelper().check_connection()
    except Exception:
        pass
    healthy = mysql_ok and redis_ok
    return {
        "status": "healthy" if healthy else "degraded",
        "mysql": "ok" if mysql_ok else "error",
        "redis": "ok" if redis_ok else "error",
    }


url_prefix = "/brandcultivation/api/v1"
app.include_router(recommend_router, prefix=url_prefix)
app.include_router(report_router, prefix=url_prefix)
app.include_router(eval_report_router, prefix=url_prefix)

if __name__ == "__main__":
    logger.info("Starting BrandCultivation API server on port 7960")
    uvicorn.run(app, host="0.0.0.0", port=7960)
```

- [ ] **Step 2: Verify the syntax**

Run: `cd /home/yangzeyu/project/BrandCultivation && python -c "import ast; ast.parse(open('run_api.py').read()); print('Syntax OK')"`

Expected: `Syntax OK`

- [ ] **Step 3: Commit**

```bash
git add run_api.py
git commit -m "refactor(api): add middleware, health check, global exception handlers"
```

---

### Task 10: Improve `api/recommend.py` (logging and error handling)

**Files:**
- Modify: `api/recommend.py`

- [ ] **Step 1: Add logging and error handling**

Key changes:
- Add `from core import get_logger` and `logger = get_logger("api.recommend")`
- Return a 404 status code when the model does not exist
- Wrap `generate_and_upload_report` in try/except with `logger.error`
- Log the key points of the recommendation flow (start, recall count, completion)

```python
from database import MySqlDao
from fastapi import APIRouter, BackgroundTasks, HTTPException, status
from .request_body import RecommendRequest
from core import get_logger
from models import Recommend
import os
from utils import FileStreamUtils, ReportUtils

logger = get_logger("api.recommend")

dao = MySqlDao()
router = APIRouter()


@router.post("/recommend")
async def recommend(request: RecommendRequest, backgroundTasks: BackgroundTasks):
    """Recommendation endpoint."""
    logger.info(f"Recommend request: city={request.city_uuid}, product={request.product_code}, recall={request.recall_cust_count}")

    gbdtlr_model_path = os.path.join("./models/rank/weights", request.city_uuid, "gbdtlr_model.pkl")
    if not os.path.exists(gbdtlr_model_path):
        logger.warning(f"Model not found: {gbdtlr_model_path}")
        raise HTTPException(
            status_code=status.HTTP_404_NOT_FOUND,
            detail="该城市的模型未训练,请先进行训练",
        )

    recommend_model = Recommend(request.city_uuid)
    products_in_order = dao.get_product_from_order(request.city_uuid)["product_code"].unique().tolist()
    # Products with order history go through GBDT-LR; new products fall back to Item2Vec.
    if request.product_code in products_in_order:
        logger.info(f"Using GBDT-LR model for existing product {request.product_code}")
        recommend_list = recommend_model.get_recommend_list_by_gbdtlr(request.product_code, recall_count=request.recall_cust_count)
    else:
        logger.info(f"Using Item2Vec model for new product {request.product_code}")
        recommend_list = recommend_model.get_recommend_list_by_item2vec(request.product_code, recall_count=request.recall_cust_count)

    recommend_data = recommend_model.get_recommend_and_delivery(recommend_list, delivery_count=request.delivery_count)
    request_data = []
    for index, data in enumerate(recommend_data):
        request_data.append(
            {
                "id": index + 1,
                "cust_code": data["cust_code"],
                "recommend_score": data["recommend_score"],
                "delivery_count": data["delivery_count"],
            }
        )
    logger.info(f"Recommend completed: {len(request_data)} customers recommended")

    backgroundTasks.add_task(generate_and_upload_report, request)
    return {"code": 200, "msg": "success", "data": {"recommendationInfo": request_data}}


def generate_and_upload_report(request: RecommendRequest):
    """Generate the reports and upload them to the Aliyun file store."""
    logger.info(f"Background task started: generating report for {request.city_uuid}/{request.product_code}")
    try:
        report_util = ReportUtils(request.city_uuid, request.product_code)
        report_util.generate_all_data(request.recall_cust_count, request.delivery_count)

        reports_dir = os.path.join("./data/reports", request.city_uuid, request.product_code)
        # Report file names (without extension) as written by ReportUtils.
        report_files = ["卷烟信息表", "品规商户特征关系表", "相似卷烟表", "商户售卖推荐表"]
        file_id_map = FileStreamUtils.upload_files(reports_dir, report_files)
        if file_id_map is None:
            logger.error(f"Report upload failed for {request.city_uuid}/{request.product_code}")
            return

        data_dict = {
            "cultivacation_id": request.cultivacation_id,
            "city_uuid": request.city_uuid,
            "limit_cycle_name": request.limit_cycle_name,
            "product_code": request.product_code,
            "product_info_table": file_id_map.get("卷烟信息表"),
            "relation_table": file_id_map.get("品规商户特征关系表"),
            "similarity_product_table": file_id_map.get("相似卷烟表"),
            "recommend_table": file_id_map.get("商户售卖推荐表"),
        }
        dao.insert_report(data_dict)
        logger.info(f"Background task completed: report uploaded for {request.city_uuid}/{request.product_code}")
    except Exception as e:
        # Background tasks run after the response is sent, so log instead of raising.
        logger.error(f"Background task failed: {e}", exc_info=True)
```

- [ ] **Step 2: Verify the syntax**

Run: `cd /home/yangzeyu/project/BrandCultivation && python -c "import ast; ast.parse(open('api/recommend.py').read()); print('Syntax OK')"`

Expected: `Syntax OK`

- [ ] **Step 3: Commit**

```bash
git add api/recommend.py
git commit -m "refactor(api): add logging and error handling to recommend endpoint"
```

---

### Task 11: Improve `api/eval_report.py` and `api/report.py` (logging)

**Files:**
- Modify: `api/eval_report.py`
- Modify: `api/report.py`

- [ ] **Step 1: Improve eval_report.py**

Add `from core import get_logger` and `logger = get_logger("api.eval_report")`, then add logging at every step.

- [ ] **Step 2: Improve report.py**

Add `from core import get_logger` and `logger = get_logger("api.report")`, then add logging at the key steps.

- [ ] **Step 3: Verify the syntax**

Run: `cd /home/yangzeyu/project/BrandCultivation && python -c "import ast; ast.parse(open('api/eval_report.py').read());
ast.parse(open('api/report.py').read()); print('OK')"`

Expected: `OK`

- [ ] **Step 4: Commit**

```bash
git add api/eval_report.py api/report.py
git commit -m "refactor(api): add logging to eval_report and report endpoints"
```

---

### Task 12: Fix `models/rank/gbdt_lr_inference.py` (bug fix + logging)

**Files:**
- Modify: `models/rank/gbdt_lr_inference.py`

- [ ] **Step 1: Fix the get_recommend_list method**

The current loop appends two entries per customer and then filters the first one back out; it also stores `recommend_score` without casting it to `float`. Change the loop in `get_recommend_list` from:

```python
recommend_list = []
for cust_id, score in zip(recall_list, scores):
    recommend_list.append({cust_id: float(score)})
    recommend_list.append({"cust_code": cust_id, "recommend_score": score})
recommend_list = sorted(
    [item for item in recommend_list if "recommend_score" in item],
    key=lambda x: x["recommend_score"],
    reverse=True
)
```

to:

```python
recommend_list = [
    {"cust_code": cust_id, "recommend_score": float(score)}
    for cust_id, score in zip(recall_list, scores)
]
recommend_list.sort(key=lambda x: x["recommend_score"], reverse=True)
```

- [ ] **Step 2: Add logging**

At the top of the file, add `from core import get_logger` and `logger = get_logger("models.rank.gbdtlr")`. In `get_recommend_list`, log the inference duration. In `generate_shap_interance`, replace print with the logger.

- [ ] **Step 3: Verify the syntax**

Run: `cd /home/yangzeyu/project/BrandCultivation && python -c "import ast; ast.parse(open('models/rank/gbdt_lr_inference.py').read()); print('OK')"`

Expected: `OK`

- [ ] **Step 4: Commit**

```bash
git add models/rank/gbdt_lr_inference.py
git commit -m "fix(models): fix double-append bug in get_recommend_list, add logging"
```

---

### Task 13: Improve `models/recommend.py` (logging)

**Files:**
- Modify: `models/recommend.py`

- [ ] **Step 1: Add logging**

Add `from core import get_logger` and `logger = get_logger("models.recommend")`. Add logging to the key methods:
- `_load_molde`: model loading finished
- `get_recal_cust`: recall count
- `get_recommend_list_by_gbdtlr`: start/finish + duration
- `get_recommend_list_by_item2vec`: start/finish + duration
- `get_recommend_and_delivery`: delivery allocation finished

- [ ] **Step 2: Verify the syntax**

Run: `cd /home/yangzeyu/project/BrandCultivation && python -c "import ast; ast.parse(open('models/recommend.py').read()); print('OK')"`

Expected: `OK`

- [ ] **Step 3: Commit**

```bash
git add models/recommend.py
git commit -m "refactor(models): add logging to recommend module"
```

---

### Task 14: Improve the recall modules (replace print with logger)

**Files:**
- Modify: `models/recall/hot_recall.py`
- Modify: `models/recall/itemCF/ItemCF.py`
- Modify: `models/item2vec/inference.py`

- [ ] **Step 1: Improve hot_recall.py**

Replace `print("hot_recall: ...")` with `logger.info(...)`. Add `from core import get_logger` and `logger = get_logger("models.recall.hot")`.

- [ ] **Step 2: Improve ItemCF.py**

Replace `print(...)` with `logger.info(...)`. Add `from core import get_logger` and `logger = get_logger("models.recall.itemcf")`.

- [ ] **Step 3: Improve inference.py (item2vec)**

Add `from core import get_logger` and `logger = get_logger("models.item2vec")`. Add logging at the key steps.

- [ ] **Step 4: Verify the syntax**

Run: `cd /home/yangzeyu/project/BrandCultivation && python -c "import ast; ast.parse(open('models/recall/hot_recall.py').read()); ast.parse(open('models/recall/itemCF/ItemCF.py').read()); ast.parse(open('models/item2vec/inference.py').read()); print('OK')"`

Expected: `OK`

- [ ] **Step 5: Commit**

```bash
git add models/recall/hot_recall.py models/recall/itemCF/ItemCF.py models/item2vec/inference.py
git commit -m "refactor(models): replace print with logger in recall modules"
```

---

### Task 15: Improve `utils/file_stream.py` (logging and error handling)

**Files:**
- Modify: `utils/file_stream.py`

- [ ] **Step 1: Add logging and improve error handling**

Add `from core import get_logger` and `logger = get_logger("utils.file_stream")`.
- On upload, log the file_id and the duration
- On download, log the HTTP status code
- On failure, log the specific error (replacing print)
- Read the URL configuration from `settings`

```python
import time
from core import get_logger, settings
from core.exceptions import FileServiceException
from io import BytesIO
import os
import pandas as pd
import requests

logger = get_logger("utils.file_stream")


class FileStreamUtils:
    upload_url = settings.file_upload_url
    download_url = settings.file_download_url
    headers = {
        "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36",
        "Accept": "*/*",
    }
    # Assumed value (not in the original code): without a timeout, requests can block indefinitely.
    timeout = 30

    @staticmethod
    def upload_files(reports_dir, files):
        """Upload `<name>.xlsx` for each name in `files`; return {name: file_id}, or None on any failure."""
        files_id = {}
        for filename in files:
            file_path = os.path.join(reports_dir, f"{filename}.xlsx")
            start_time = time.time()
            try:
                with open(file_path, "rb") as f:
                    upload_files = {"file": (os.path.basename(file_path), f)}
                    response = requests.post(
                        FileStreamUtils.upload_url,
                        headers=FileStreamUtils.headers,
                        files=upload_files,
                        verify=True,
                        timeout=FileStreamUtils.timeout,
                    )
                duration_ms = (time.time() - start_time) * 1000
                payload = response.json()
                if payload.get("success"):
                    file_id = payload["data"]["file_info"]["fileid"]
                    files_id[filename] = file_id
                    logger.info(f"File uploaded: {filename} -> {file_id} ({duration_ms:.0f}ms)")
                else:
                    logger.error(f"Upload failed for {filename}: {response.text}")
                    return None
            except requests.exceptions.RequestException as e:
                logger.error(f"Upload request error for {filename}: {e}", exc_info=True)
                return None
            except Exception as e:
                logger.error(f"Upload error for {filename}: {e}", exc_info=True)
                return None
        return files_id

    @staticmethod
    def download_file(file_id, file_type="xlsx"):
        """Download a file from the Aliyun file store by file_id; return a DataFrame, or None on failure."""
        start_time = time.time()
        try:
            response = requests.get(
                f"{FileStreamUtils.download_url}/{file_id}",
                headers=FileStreamUtils.headers,
                verify=True,
                timeout=FileStreamUtils.timeout,
            )
            duration_ms = (time.time() - start_time) * 1000
            if response.status_code == 200:
                file_content = BytesIO(response.content)
                if file_type == "xlsx":
                    data = pd.read_excel(file_content, engine="openpyxl")
                elif file_type == "csv":
                    data = pd.read_csv(file_content)
                else:
                    raise ValueError(f"不支持的文件类型:{file_type}")
                logger.info(f"File downloaded: {file_id} ({duration_ms:.0f}ms, {len(response.content)} bytes)")
                return data
            else:
                logger.error(f"Download failed: file_id={file_id}, status={response.status_code}")
                return None
        except requests.exceptions.RequestException as e:
            logger.error(f"Download request error: file_id={file_id}, error={e}", exc_info=True)
            return None
        except Exception as e:
            logger.error(f"Download error: file_id={file_id}, error={e}", exc_info=True)
            return None
```

- [ ] **Step 2: Verify the syntax**

Run: `cd /home/yangzeyu/project/BrandCultivation && python -c "import ast; ast.parse(open('utils/file_stream.py').read()); print('OK')"`

Expected: `OK`

- [ ] **Step 3: Commit**

```bash
git add utils/file_stream.py
git commit -m "refactor(utils): add logging and error details to file_stream"
```

---

### Task 16: Improve `utils/report_utils.py`, `train.py`, and `models/rank/data/preprocess.py` (logging)

**Files:**
- Modify: `utils/report_utils.py`
- Modify: `train.py`
- Modify: `models/rank/data/preprocess.py`

- [ ] **Step 1: Improve report_utils.py**

Add `from core import get_logger` and `logger = get_logger("utils.report")`. Add start/finish logging to every `generate_*` method.

- [ ] **Step 2: Improve train.py**

Replace every `print(...)` with `logger.info(...)`. Add `from core import get_logger` and `logger = get_logger("train")`.

- [ ] **Step 3: Improve preprocess.py**

Replace `print("gbdr-lr: ...")` with `logger.info(...)`. Add `from core import get_logger` and `logger = get_logger("models.rank.preprocess")`.

- [ ] **Step 4: Verify the syntax**

Run: `cd /home/yangzeyu/project/BrandCultivation && python -c "import ast; ast.parse(open('utils/report_utils.py').read()); ast.parse(open('train.py').read()); ast.parse(open('models/rank/data/preprocess.py').read()); print('OK')"`

Expected: `OK`

- [ ] **Step 5: Commit**

```bash
git add utils/report_utils.py train.py models/rank/data/preprocess.py
git commit -m "refactor: replace print with logger in report_utils, train, preprocess"
```

---

### Task 17: Configuration file changes (remove passwords, create .env.example)

**Files:**
- Modify: `config/database_config.yaml`
- Create: `.env.example`
- Modify: `config/config.py`

- [ ] **Step 1: Update database_config.yaml (remove passwords)**

```yaml
mysql:
  host: 'rm-t4n6rz18y4t5x47y70o.mysql.singapore.rds.aliyuncs.com'
  port: 3036
  db: 'brand_cultivation'
  user: 'BrandCultivation'
  # passwd moved to the MYSQL_PASSWORD environment variable
redis:
  host: 'r-t4nb4n9i8je7u6ogk1pd.redis.singapore.rds.aliyuncs.com'
  port: 5000
  db: 10
  # passwd moved to the REDIS_PASSWORD environment variable
```

- [ ] **Step 2: Create .env.example**

`core/config.py` reads `os.environ` only, so the values in `.env` must be exported into the process environment before the application starts.

```bash
# BrandCultivation environment variable configuration
# Copy this file to .env and fill in the real values

# MySQL
MYSQL_HOST=rm-t4n6rz18y4t5x47y70o.mysql.singapore.rds.aliyuncs.com
MYSQL_PORT=3036
MYSQL_USER=BrandCultivation
MYSQL_PASSWORD=your_mysql_password_here
MYSQL_DB=brand_cultivation

# Redis
REDIS_HOST=r-t4nb4n9i8je7u6ogk1pd.redis.singapore.rds.aliyuncs.com
REDIS_PORT=5000
REDIS_PASSWORD=your_redis_password_here
REDIS_DB=10

# Logging
LOG_LEVEL=INFO

# File Service
FILE_UPLOAD_URL=http://file-center.jcpt:8080/file/fileUpload
FILE_DOWNLOAD_URL=http://file-center.jcpt:8080/file/fileDownload
```

- [ ] **Step 3: Turn config/config.py into a compatibility layer**

Keep the old interface so that code that has not been migrated yet keeps working:

```python
from core.config import settings


def load_config():
    return {
        "mysql": {
            "host": settings.mysql_host,
            "port": settings.mysql_port,
            "user": settings.mysql_user,
            "passwd": settings.mysql_password,
            "db": settings.mysql_db,
        },
        "redis": {
            "host": settings.redis_host,
            "port": settings.redis_port,
            "passwd": settings.redis_password,
            "db": settings.redis_db,
        },
    }


def load_model_config():
    return settings.model_config


def load_service_config():
    return {
        "aliyun": {
            "upload_url": settings.file_upload_url,
            "download_url": settings.file_download_url,
        }
    }
```

- [ ] **Step 4: Make sure .gitignore contains .env**

Run: `cd /home/yangzeyu/project/BrandCultivation && grep -q "^\.env$" .gitignore 2>/dev/null || echo ".env" >> .gitignore`

- [ ] **Step 5: Commit**

```bash
git add config/database_config.yaml .env.example config/config.py .gitignore
git commit -m "security: remove passwords from yaml, add .env.example"
```

---

### Task 18: Final verification

**Files:** None (verification only)

- [ ] **Step 1: Verify that all modules can be imported**

Run:

```bash
cd /home/yangzeyu/project/BrandCultivation && python -c "
from core import get_logger, settings, AppException, DatabaseException, RequestLoggingMiddleware
from database.db.mysql import MySqlDatabaseHelper
from database.db.redis_db import RedisDatabaseHelper
from database.dao.mysql_dao import MySqlDao
from api.recommend import router as rec_router
from api.report import router as rep_router
from api.eval_report import router as eval_router
from models.recommend import Recommend
from utils.file_stream import FileStreamUtils
from utils.report_utils import ReportUtils
print('All imports successful')
"
```

Expected: `All imports successful`

- [ ] **Step 2: Verify that the FastAPI application can start (syntax level)**

Run: `cd /home/yangzeyu/project/BrandCultivation && python -c "import ast; ast.parse(open('run_api.py').read()); print('App syntax OK')"`

Expected: `App syntax OK`

- [ ] **Step 3: Final commit (if any files were missed)**

```bash
git status
# If there are uncommitted changes, commit them as well
```
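
The per-task syntax checks in this plan all repeat the same `ast.parse` one-liner. As a minimal sketch, assuming it is run from the project root, the helper below performs that check over a list of files in one pass. The names `find_syntax_errors` and `MODIFIED_FILES` are hypothetical and not part of the plan, and the file list is only an illustrative subset of the files named in the tasks.

```python
import ast
from pathlib import Path

# Hypothetical, illustrative subset of the files touched by this plan; extend as needed.
MODIFIED_FILES = [
    "run_api.py",
    "api/recommend.py",
    "api/eval_report.py",
    "api/report.py",
    "utils/file_stream.py",
    "train.py",
]


def find_syntax_errors(paths):
    """Return {path: error message} for every file that is missing or fails to parse."""
    errors = {}
    for path in paths:
        try:
            source = Path(path).read_text(encoding="utf-8")
            ast.parse(source, filename=str(path))
        except (OSError, SyntaxError, ValueError) as exc:
            # OSError: missing/unreadable file; ValueError: undecodable or invalid source text.
            errors[str(path)] = f"{type(exc).__name__}: {exc}"
    return errors
```

Calling `find_syntax_errors(MODIFIED_FILES)` returns an empty dict when every listed file exists and parses. Like the one-liners it replaces, this only checks syntax; it does not import the modules or contact MySQL or Redis.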