2026-05-21-project-refactoring.md 41 KB

BrandCultivation Project-Level Refactoring Implementation Plan

For agentic workers: REQUIRED SUB-SKILL: Use superpowers:subagent-driven-development (recommended) or superpowers:executing-plans to implement this plan task-by-task. Steps use checkbox (- [ ]) syntax for tracking.

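Goal: Add a logging system, configuration management, exception handling, and request tracing to the BrandCultivation project; fix known bugs; remove plaintext passwords.

Architecture: Incremental refactoring. Add a core/ infrastructure layer, then progressively replace print with logger in the existing modules and add error handling and request tracing. The existing directory structure and business logic stay unchanged.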

Tech Stack: Python 3.x, FastAPI, SQLAlchemy, Redis, logging (stdlib), pathlib, contextvars, uuid


File Structure

New Files

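  • core/__init__.py: public interface exports
  • core/logging.py: JSON-formatted logging system
  • core/config.py: configuration management (YAML + environment variables)
  • core/exceptions.py: custom exception hierarchy
  • core/middleware.py: request logging and request_id middleware
  • .env.example: environment variable template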

Modified Files

  • config/config.py: deprecated; import from core/config.py instead
  • config/database_config.yaml: remove passwords
  • database/db/mysql.py: logging, session context manager, config source
  • database/db/redis_db.py: logging, config source, connection pool
  • database/__init__.py: update exports
  • database/dao/mysql_dao.py: logging, exception handling
  • run_api.py: register middleware, health check, exception handlers
  • api/recommend.py: logging, error handling
  • api/eval_report.py: logging, error handling
  • api/report.py: logging
  • models/rank/gbdt_lr_inference.py: bug fix, logging
  • models/recommend.py: logging
  • models/recall/hot_recall.py: replace print
  • models/recall/itemCF/ItemCF.py: replace print
  • models/item2vec/inference.py: logging
  • models/rank/data/preprocess.py: replace print
  • utils/file_stream.py: logging, error handling
  • utils/report_utils.py: logging
  • train.py: replace print

Task 1: Create core/exceptions.py (custom exception hierarchy)

Files:

  • Create: core/exceptions.py

  • [ ] Step 1: Create the exception definitions file

    from typing import Optional


    class AppException(Exception):
        """Base class for application exceptions."""

        def __init__(self, code: int, message: str, detail: Optional[str] = None):
            self.code = code  # also used as the HTTP status code in run_api.py
            self.message = message
            self.detail = detail
            super().__init__(message)


    class DatabaseException(AppException):
        """A database operation failed."""

        def __init__(self, message: str = "数据库操作失败", detail: Optional[str] = None):
            super().__init__(code=500, message=message, detail=detail)


    class ModelException(AppException):
        """Model inference failed."""

        def __init__(self, message: str = "模型推理失败", detail: Optional[str] = None):
            super().__init__(code=500, message=message, detail=detail)


    class FileServiceException(AppException):
        """A file service operation failed."""

        def __init__(self, message: str = "文件服务操作失败", detail: Optional[str] = None):
            super().__init__(code=500, message=message, detail=detail)


    class ValidationException(AppException):
        """A business validation check failed."""

        def __init__(self, message: str = "参数校验失败", detail: Optional[str] = None):
            super().__init__(code=400, message=message, detail=detail)
    
  • [ ] Step 2: Verify the import

Run: cd /home/yangzeyu/project/BrandCultivation && python -c "from core.exceptions import AppException, DatabaseException, ModelException, FileServiceException, ValidationException; print('OK')"

Expected: OK
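
As a minimal sketch of how callers might use this hierarchy, the snippet below re-declares two of the classes so it runs on its own and shows that catching AppException also catches its subclasses while keeping code and detail. The to_payload helper and the English default message are illustrative assumptions, not part of the plan.

```python
from typing import Optional


class AppException(Exception):
    def __init__(self, code: int, message: str, detail: Optional[str] = None):
        self.code = code
        self.message = message
        self.detail = detail
        super().__init__(message)


class ValidationException(AppException):
    def __init__(self, message: str = "validation failed", detail: Optional[str] = None):
        super().__init__(code=400, message=message, detail=detail)


def to_payload(exc: AppException) -> dict:
    """Hypothetical helper: flatten an AppException into a response body."""
    return {"code": exc.code, "msg": exc.message, "data": {"detail": exc.detail}}


def run() -> dict:
    try:
        raise ValidationException(detail="city_uuid is empty")
    except AppException as exc:  # the base class catches every subclass
        return to_payload(exc)


payload = run()
```

Because every subclass fixes its own code, a single handler for the base class is enough to produce a consistent response shape.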

  • [ ] Step 3: Commit

    git add core/exceptions.py
    git commit -m "feat(core): add custom exception hierarchy"
    

Task 2: Create core/config.py (configuration management)

Files:

  • Create: core/config.py

  • [ ] Step 1: Create the configuration management module

    import os
    from pathlib import Path

    import yaml

    PROJECT_ROOT = Path(__file__).resolve().parent.parent


    def _get_env(key: str, default=None):
        """Read a value from the environment."""
        return os.environ.get(key, default)


    def _load_yaml(filename: str) -> dict:
        """Load a YAML config file from the config/ directory."""
        filepath = PROJECT_ROOT / "config" / filename
        with open(filepath, encoding="utf-8") as f:
            return yaml.safe_load(f) or {}


    class _Settings:
        """Settings singleton; environment variables override the YAML defaults."""

        def __init__(self):
            self._db_cfg = _load_yaml("database_config.yaml")
            self._model_cfg = _load_yaml("model_config.yaml")
            self._service_cfg = _load_yaml("service_config.yaml")

        @property
        def mysql_host(self) -> str:
            return _get_env("MYSQL_HOST", self._db_cfg.get("mysql", {}).get("host", "localhost"))

        @property
        def mysql_port(self) -> int:
            return int(_get_env("MYSQL_PORT", self._db_cfg.get("mysql", {}).get("port", 3306)))

        @property
        def mysql_user(self) -> str:
            return _get_env("MYSQL_USER", self._db_cfg.get("mysql", {}).get("user", "root"))

        @property
        def mysql_password(self) -> str:
            return _get_env("MYSQL_PASSWORD", self._db_cfg.get("mysql", {}).get("passwd", ""))

        @property
        def mysql_db(self) -> str:
            return _get_env("MYSQL_DB", self._db_cfg.get("mysql", {}).get("db", ""))

        @property
        def redis_host(self) -> str:
            return _get_env("REDIS_HOST", self._db_cfg.get("redis", {}).get("host", "localhost"))

        @property
        def redis_port(self) -> int:
            return int(_get_env("REDIS_PORT", self._db_cfg.get("redis", {}).get("port", 6379)))

        @property
        def redis_password(self) -> str:
            return _get_env("REDIS_PASSWORD", self._db_cfg.get("redis", {}).get("passwd", ""))

        @property
        def redis_db(self) -> int:
            return int(_get_env("REDIS_DB", self._db_cfg.get("redis", {}).get("db", 0)))

        @property
        def log_level(self) -> str:
            return _get_env("LOG_LEVEL", "INFO").upper()

        @property
        def file_upload_url(self) -> str:
            return _get_env("FILE_UPLOAD_URL", self._service_cfg.get("aliyun", {}).get("upload_url", ""))

        @property
        def file_download_url(self) -> str:
            return _get_env("FILE_DOWNLOAD_URL", self._service_cfg.get("aliyun", {}).get("download_url", ""))

        @property
        def model_config(self) -> dict:
            return self._model_cfg


    settings = _Settings()
    
  • [ ] Step 2: Verify the import

Run: cd /home/yangzeyu/project/BrandCultivation && python -c "from core.config import settings; print(settings.mysql_host)"

Expected: prints the database host address
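
The lookup order used by each property (environment variable first, then the YAML value, then a hard-coded default) can be sketched in isolation as follows. The resolve function and the in-memory yaml_cfg dictionary are stand-ins invented for this example; the real module reads config/database_config.yaml.

```python
import os


def resolve(env_key: str, yaml_section: dict, yaml_key: str, default):
    """Return the env var if set, else the YAML value, else the default."""
    return os.environ.get(env_key, yaml_section.get(yaml_key, default))


# Stand-in for the parsed contents of database_config.yaml (assumed values).
yaml_cfg = {"mysql": {"host": "db.internal", "port": 3306}}
mysql_section = yaml_cfg.get("mysql", {})

os.environ.pop("MYSQL_HOST", None)
from_yaml = resolve("MYSQL_HOST", mysql_section, "host", "localhost")

os.environ["MYSQL_HOST"] = "10.0.0.5"
from_env = resolve("MYSQL_HOST", mysql_section, "host", "localhost")

# Environment values are always strings, which is why the port
# properties wrap the result in int().
os.environ["MYSQL_PORT"] = "3307"
port = int(resolve("MYSQL_PORT", mysql_section, "port", 3306))
```

With this order, secrets such as MYSQL_PASSWORD can be supplied through the environment only, leaving the YAML file free of credentials.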

  • [ ] Step 3: Commit

    git add core/config.py
    git commit -m "feat(core): add config management with env var override"
    

Task 3: Create core/logging.py (logging system)

Files:

  • Create: core/logging.py

  • [ ] Step 1: Create the logging module

    import logging
    import json
    import sys
    from contextvars import ContextVar
    from datetime import datetime, timezone

    request_id_var: ContextVar[str] = ContextVar("request_id", default="-")


    class JSONFormatter(logging.Formatter):
        """Format log records as JSON."""

        def format(self, record: logging.LogRecord) -> str:
            log_data = {
                # Use the record's creation time, not the time of formatting.
                "timestamp": datetime.fromtimestamp(record.created, tz=timezone.utc).isoformat(),
                "level": record.levelname,
                "module": record.module,
                "function": record.funcName,
                "line": record.lineno,
                "message": record.getMessage(),
                "request_id": request_id_var.get("-"),
            }
            if record.exc_info and record.exc_info[0] is not None:
                log_data["exception"] = self.formatException(record.exc_info)
            if hasattr(record, "extra_data"):
                log_data["extra"] = record.extra_data
            return json.dumps(log_data, ensure_ascii=False)


    def get_logger(name: str) -> logging.Logger:
        """Return the logger with the given name, configuring it on first use."""
        from core.config import settings

        logger = logging.getLogger(name)
        if not logger.handlers:
            handler = logging.StreamHandler(sys.stdout)
            handler.setFormatter(JSONFormatter())
            logger.addHandler(handler)
            logger.setLevel(getattr(logging, settings.log_level, logging.INFO))
            logger.propagate = False
        return logger
    
  • [ ] Step 2: Verify the import and basic functionality

Run: cd /home/yangzeyu/project/BrandCultivation && python -c "from core.logging import get_logger; logger = get_logger('test'); logger.info('hello')"

Expected: a JSON-formatted log line
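
To illustrate how the formatter picks up request_id and extra_data, here is a reduced, self-contained sketch that writes to an in-memory buffer instead of stdout. MiniJSONFormatter and the logger name demo.json are made up for this example and carry only a subset of the fields above.

```python
import io
import json
import logging
from contextvars import ContextVar

request_id_var: ContextVar[str] = ContextVar("request_id", default="-")


class MiniJSONFormatter(logging.Formatter):
    """Reduced version of JSONFormatter with three fixed fields."""

    def format(self, record: logging.LogRecord) -> str:
        data = {
            "level": record.levelname,
            "message": record.getMessage(),
            "request_id": request_id_var.get(),
        }
        # Keys passed via extra={...} become attributes on the record.
        if hasattr(record, "extra_data"):
            data["extra"] = record.extra_data
        return json.dumps(data, ensure_ascii=False)


buffer = io.StringIO()
handler = logging.StreamHandler(buffer)
handler.setFormatter(MiniJSONFormatter())

demo_logger = logging.getLogger("demo.json")
demo_logger.addHandler(handler)
demo_logger.setLevel(logging.INFO)
demo_logger.propagate = False

request_id_var.set("ab12cd34")
demo_logger.info("hello", extra={"extra_data": {"rows": 3}})

entry = json.loads(buffer.getvalue())
```

Each log call emits exactly one JSON object per line, which is what log collectors typically expect.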

  • [ ] Step 3: Commit

    git add core/logging.py
    git commit -m "feat(core): add JSON logging system with request_id support"
    

Task 4: Create core/middleware.py (request middleware)

Files:

  • Create: core/middleware.py

  • [ ] Step 1: Create the middleware module

    import time
    import uuid
    from starlette.middleware.base import BaseHTTPMiddleware
    from starlette.requests import Request
    from starlette.responses import Response
    from core.logging import get_logger, request_id_var

    logger = get_logger("middleware")


    def get_request_id() -> str:
        """Return the request_id of the current request."""
        return request_id_var.get("-")


    class RequestLoggingMiddleware(BaseHTTPMiddleware):
        """Request logging middleware: generates a request_id and logs request start and end."""

        async def dispatch(self, request: Request, call_next) -> Response:
            # A short 8-character ID keeps log lines compact.
            req_id = str(uuid.uuid4())[:8]
            request_id_var.set(req_id)

            # perf_counter is monotonic, so durations are immune to clock adjustments.
            start_time = time.perf_counter()
            client_ip = request.client.host if request.client else "unknown"

            logger.info(
                f"Request started: {request.method} {request.url.path}",
                extra={"extra_data": {"client_ip": client_ip, "method": request.method, "path": str(request.url.path)}},
            )

            try:
                response = await call_next(request)
            except Exception:
                duration_ms = (time.perf_counter() - start_time) * 1000
                logger.error(
                    f"Request failed: {request.method} {request.url.path} ({duration_ms:.1f}ms)",
                    exc_info=True,
                )
                raise

            duration_ms = (time.perf_counter() - start_time) * 1000
            logger.info(
                f"Request completed: {request.method} {request.url.path} -> {response.status_code} ({duration_ms:.1f}ms)",
                extra={"extra_data": {"status_code": response.status_code, "duration_ms": round(duration_ms, 1)}},
            )

            response.headers["X-Request-ID"] = req_id
            return response
    
  • [ ] Step 2: Verify the import

Run: cd /home/yangzeyu/project/BrandCultivation && python -c "from core.middleware import RequestLoggingMiddleware, get_request_id; print('OK')"

Expected: OK
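
The middleware relies on ContextVar values being isolated per asyncio task, so concurrent requests do not overwrite each other's request_id. The sketch below shows that property with plain asyncio and no web framework; the handle coroutine is a hypothetical stand-in for a request handler.

```python
import asyncio
from contextvars import ContextVar

request_id_var: ContextVar[str] = ContextVar("request_id", default="-")


async def handle(req_id: str) -> str:
    request_id_var.set(req_id)   # what the middleware does once per request
    await asyncio.sleep(0)       # yield so the two tasks interleave
    return request_id_var.get()  # still this task's own value


async def main() -> list:
    # gather() wraps each coroutine in its own task, and each task
    # runs in a copy of the current context.
    return await asyncio.gather(handle("aaaa1111"), handle("bbbb2222"))


seen = asyncio.run(main())
outside = request_id_var.get()  # values set inside tasks do not leak out
```

This is why a module-level ContextVar is safe here, where a plain global variable would not be.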

  • [ ] Step 3: Commit

    git add core/middleware.py
    git commit -m "feat(core): add request logging middleware with request_id tracking"
    

Task 5: Create core/__init__.py (public interface exports)

Files:

  • Create: core/__init__.py

  • [ ] Step 1: Create the __init__ file

    from core.logging import get_logger, request_id_var
    from core.config import settings
    from core.exceptions import (
        AppException,
        DatabaseException,
        ModelException,
        FileServiceException,
        ValidationException,
    )
    from core.middleware import RequestLoggingMiddleware, get_request_id

    __all__ = [
        "get_logger",
        "request_id_var",
        "settings",
        "AppException",
        "DatabaseException",
        "ModelException",
        "FileServiceException",
        "ValidationException",
        "RequestLoggingMiddleware",
        "get_request_id",
    ]
    
  • [ ] Step 2: Verify the full import

Run: cd /home/yangzeyu/project/BrandCultivation && python -c "from core import get_logger, settings, AppException, DatabaseException, RequestLoggingMiddleware, get_request_id; print('All imports OK')"

Expected: All imports OK

  • [ ] Step 3: Commit

    git add core/__init__.py
    git commit -m "feat(core): add public interface exports"
    

Task 6: Improve database/db/mysql.py (logging, session management, config source)

Files:

  • Modify: database/db/mysql.py

  • [ ] Step 1: Rewrite mysql.py

Replace the entire file contents with:

from contextlib import contextmanager
from core import get_logger, settings, DatabaseException
import pandas as pd
from sqlalchemy import create_engine, text
from sqlalchemy.orm import sessionmaker
from sqlalchemy.exc import SQLAlchemyError

logger = get_logger("database.mysql")


class MySqlDatabaseHelper:
    _instance = None

    def __new__(cls):
        if not cls._instance:
            cls._instance = super(MySqlDatabaseHelper, cls).__new__(cls)
            cls._instance._initialized = False
        return cls._instance

    def __init__(self):
        if self._initialized:
            return
        self._connect_database()
        self._initialized = True

    def _connect_database(self):
        try:
            conn_str = (
                f"mysql+pymysql://{settings.mysql_user}:{settings.mysql_password}"
                f"@{settings.mysql_host}:{settings.mysql_port}/{settings.mysql_db}"
            )
            self.engine = create_engine(
                conn_str,
                pool_size=20,
                max_overflow=30,
                pool_recycle=1800,
                pool_pre_ping=True,
                isolation_level="READ COMMITTED",
            )
            self._DBSession = sessionmaker(bind=self.engine)
            logger.info("MySQL connection pool created", extra={"extra_data": {"host": settings.mysql_host, "db": settings.mysql_db}})
        except Exception as e:
            logger.error("Failed to create MySQL connection", exc_info=True)
            raise DatabaseException(message="数据库连接失败", detail=str(e))

    @contextmanager
    def get_session(self):
        session = self._DBSession()
        try:
            yield session
            session.commit()
        except SQLAlchemyError as e:
            session.rollback()
            logger.error("Database operation failed", exc_info=True)
            raise DatabaseException(message="数据库操作失败", detail=str(e))
        finally:
            session.close()

    def load_data_with_page(self, query, params, page_size=100000):
        """Load the full result of a query page by page."""
        count_query = text(f"SELECT COUNT(*) FROM ({query}) AS _count_subq")
        query += " LIMIT :limit OFFSET :offset"
        query = text(query)

        result = self.fetch_one(count_query, params)
        total_rows = result[0] if result is not None else 0

        if total_rows == 0:
            logger.debug("Query returned 0 rows")
            return pd.DataFrame()

        logger.debug(f"Loading {total_rows} rows with page_size={page_size}")
        frames = []  # collect pages and concatenate once, avoiding repeated copies
        page = 1
        while True:
            offset = (page - 1) * page_size
            page_params = dict(params)
            page_params["limit"] = page_size
            page_params["offset"] = offset

            df = pd.DataFrame(self.fetch_all(query, page_params))
            if df.empty:
                break
            frames.append(df)
            page += 1

        data = pd.concat(frames, ignore_index=True) if frames else pd.DataFrame()
        logger.debug(f"Loaded {len(data)} rows in {page - 1} pages")
        return data

    def fetch_all(self, query, params=None):
        """Execute a SQL query and return all rows."""
        with self.get_session() as session:
            results = session.execute(query, params or {}).fetchall()
            return results

    def fetch_one(self, query, params=None):
        """Execute a SQL query and return a single row."""
        with self.get_session() as session:
            result = session.execute(query, params or {}).fetchone()
            return result

    def insert_data(self, table_name, data_dict):
        """Insert a single row into the given table."""
        if not data_dict:
            return 0

        columns = ", ".join(data_dict.keys())
        values = ", ".join([f":{key}" for key in data_dict.keys()])
        query = text(f"INSERT INTO {table_name} ({columns}) VALUES ({values})")

        with self.get_session() as session:
            result = session.execute(query, data_dict)
            logger.info(f"Inserted 1 row into {table_name}")
            return result.rowcount

    def update_data(self, table_name, update_dict, conditions, condition_params=None):
        """Update the rows of a table that match the given conditions."""
        if not update_dict:
            return 0

        set_clause = ", ".join([f"{key} = :{key}" for key in update_dict.keys()])

        if len(conditions) == 1:
            where_clause = f"WHERE {conditions[0]}"
        elif len(conditions) > 1:
            where_clause = f"WHERE {' AND '.join(conditions)}"
        else:
            where_clause = ""

        query = text(f"UPDATE {table_name} SET {set_clause} {where_clause}")

        params = update_dict.copy()
        if condition_params:
            params.update(condition_params)

        with self.get_session() as session:
            result = session.execute(query, params)
            logger.info(f"Updated {result.rowcount} rows in {table_name}")
            return result.rowcount

    def execute_query(self, query, params=None):
        """Execute a SQL statement."""
        with self.get_session() as session:
            session.execute(query, params or {})

    def check_connection(self) -> bool:
        """Check whether the database connection is working."""
        try:
            self.fetch_one(text("SELECT 1"), {})
            return True
        except Exception:
            return False

  • [ ] Step 2: Verify the import

Run: cd /home/yangzeyu/project/BrandCultivation && python -c "from database.db.mysql import MySqlDatabaseHelper; print('OK')"

Expected: OK
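
The commit-on-success, rollback-on-error behaviour of get_session can be tried out without a MySQL server. The sketch below applies the same pattern to the standard library's sqlite3 module; it is an analogy under that assumption, not the SQLAlchemy code itself, and the table t is invented for the demonstration.

```python
import sqlite3
from contextlib import contextmanager


@contextmanager
def get_session(conn: sqlite3.Connection):
    """Commit on success; roll back on database errors, then re-raise."""
    try:
        yield conn
        conn.commit()
    except sqlite3.Error:
        conn.rollback()
        raise


conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE t (id INTEGER PRIMARY KEY)")
conn.commit()

# Successful block: the insert is committed.
with get_session(conn) as s:
    s.execute("INSERT INTO t (id) VALUES (1)")

# Failing block: the second insert violates the primary key,
# so the first insert of this block is rolled back as well.
failed = False
try:
    with get_session(conn) as s:
        s.execute("INSERT INTO t (id) VALUES (2)")
        s.execute("INSERT INTO t (id) VALUES (1)")
except sqlite3.IntegrityError:
    failed = True

ids = [row[0] for row in conn.execute("SELECT id FROM t ORDER BY id")]
```

Callers never call commit or rollback themselves, which is the main benefit of routing every query through the context manager.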

  • [ ] Step 3: Commit

    git add database/db/mysql.py
    git commit -m "refactor(database): add logging, session context manager, env-based config"
    

Task 7: Improve database/db/redis_db.py (logging, config source, connection pool)

Files:

  • Modify: database/db/redis_db.py

  • [ ] Step 1: Rewrite redis_db.py

Replace the entire file contents with:

import redis
from core import get_logger, settings, DatabaseException

logger = get_logger("database.redis")


class RedisDatabaseHelper:
    _instance = None

    def __new__(cls):
        if not cls._instance:
            cls._instance = super(RedisDatabaseHelper, cls).__new__(cls)
            cls._instance._initialized = False
        return cls._instance

    def __init__(self):
        if self._initialized:
            return
        try:
            pool = redis.ConnectionPool(
                host=settings.redis_host,
                port=settings.redis_port,
                password=settings.redis_password,
                db=settings.redis_db,
                decode_responses=True,
                max_connections=50,
            )
            self.redis = redis.StrictRedis(connection_pool=pool)
            self.redis.ping()
            logger.info("Redis connection established", extra={"extra_data": {"host": settings.redis_host, "db": settings.redis_db}})
        except redis.ConnectionError as e:
            logger.error("Failed to connect to Redis", exc_info=True)
            raise DatabaseException(message="Redis连接失败", detail=str(e))
        self._initialized = True

    def check_connection(self) -> bool:
        """Check whether the Redis connection is working."""
        try:
            self.redis.ping()
            return True
        except Exception:
            return False

  • [ ] Step 2: Verify the import

Run: cd /home/yangzeyu/project/BrandCultivation && python -c "from database.db.redis_db import RedisDatabaseHelper; print('OK')"

Expected: OK
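
Both database helpers use the same singleton idiom: __new__ returns a shared instance and an _initialized flag keeps __init__ from reconnecting. A stripped-down sketch, with a hypothetical Helper class and a counter in place of the real connection setup:

```python
class Helper:
    _instance = None
    init_calls = 0  # counts how often the expensive setup actually ran

    def __new__(cls):
        if cls._instance is None:
            cls._instance = super().__new__(cls)
            cls._instance._initialized = False
        return cls._instance

    def __init__(self):
        # Python calls __init__ on every Helper() call, even when
        # __new__ returned the existing instance, hence the guard.
        if self._initialized:
            return
        Helper.init_calls += 1  # stand-in for creating the connection pool
        self._initialized = True


a = Helper()
b = Helper()
```

Note that the flag is only set after setup succeeds, so a failed connection attempt is retried on the next call.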

  • [ ] Step 3: Commit

    git add database/db/redis_db.py
    git commit -m "refactor(database): redis with logging, connection pool, env config"
    

Task 8: Improve database/dao/mysql_dao.py (logging and exception handling)

Files:

  • Modify: database/dao/mysql_dao.py

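  • [ ] Step 1: Add logging and exception handling

Add a logger at the top of the file and add log calls to every method. Fix the None handling in get_report_file_id.

Key changes:

  • Add at the top of the file: from core import get_logger; logger = get_logger("database.dao")
  • Add a logger.info(...) call at the entry of every public method to record the call arguments
  • In get_report_file_id, return an empty DataFrame when result is None instead of raising an exception
  • Remove the data_preprocess method (it does not belong in the DAO layer)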
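
The plan calls for a logger.info(...) line at the top of each public method. An alternative that avoids repeating that line is a decorator; the sketch below shows this swapped-in technique, not what the plan prescribes. log_call, get_product_by_id, and ListHandler are all hypothetical names, and the standard logging module stands in for core.get_logger.

```python
import functools
import logging

logger = logging.getLogger("database.dao.demo")


def log_call(func):
    """Log the function name and its arguments on entry (hypothetical helper)."""
    @functools.wraps(func)
    def wrapper(*args, **kwargs):
        logger.info("%s called with args=%r kwargs=%r", func.__name__, args, kwargs)
        return func(*args, **kwargs)
    return wrapper


@log_call
def get_product_by_id(city_uuid: str, product_id: str) -> dict:
    # Stand-in for a DAO query; returns a fixed record for illustration.
    return {"city_uuid": city_uuid, "product_id": product_id}


records = []


class ListHandler(logging.Handler):
    """Collect formatted messages in a list so they can be inspected."""

    def emit(self, record: logging.LogRecord) -> None:
        records.append(record.getMessage())


logger.addHandler(ListHandler())
logger.setLevel(logging.INFO)

result = get_product_by_id("city-1", product_id="p-9")
```

Explicit per-method log lines, as the plan describes, remain the better choice when different methods need different messages or must hide sensitive arguments.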

  • [ ] Step 2: Verify the import

Run: cd /home/yangzeyu/project/BrandCultivation && python -c "from database.dao.mysql_dao import MySqlDao; print('OK')"

Expected: OK

  • [ ] Step 3: Commit

    git add database/dao/mysql_dao.py
    git commit -m "refactor(dao): add logging, fix get_report_file_id null handling"
    

Task 9: Improve run_api.py (register middleware, health check, exception handlers)

Files:

  • Modify: run_api.py

  • [ ] Step 1: Rewrite run_api.py

    from api import recommend_router, report_router, eval_report_router
    from core import get_logger, AppException, RequestLoggingMiddleware, get_request_id
    from database.db.mysql import MySqlDatabaseHelper
    from database.db.redis_db import RedisDatabaseHelper
    from fastapi import FastAPI, Request, status
    from fastapi.encoders import jsonable_encoder
    from fastapi.exceptions import RequestValidationError
    from fastapi.responses import JSONResponse

    import uvicorn

    logger = get_logger("app")

    app = FastAPI()

    app.add_middleware(RequestLoggingMiddleware)


    @app.exception_handler(RequestValidationError)
    async def validation_exception_handler(request: Request, exc: RequestValidationError):
        logger.warning(f"Validation error: {exc.errors()}")
        return JSONResponse(
            status_code=status.HTTP_400_BAD_REQUEST,
            # jsonable_encoder guards against values in errors()/body that json cannot serialize.
            content=jsonable_encoder(
                {
                    "code": 400,
                    "msg": "请求参数错误",
                    "data": {"detail": exc.errors(), "body": exc.body},
                    "request_id": get_request_id(),
                }
            ),
        )


    @app.exception_handler(AppException)
    async def app_exception_handler(request: Request, exc: AppException):
        logger.error(f"AppException: {exc.message}", extra={"extra_data": {"detail": exc.detail}})
        return JSONResponse(
            status_code=exc.code,
            content={
                "code": exc.code,
                "msg": exc.message,
                "data": {"detail": exc.detail},
                "request_id": get_request_id(),
            },
        )


    @app.exception_handler(Exception)
    async def unhandled_exception_handler(request: Request, exc: Exception):
        # Pass the exception explicitly so the traceback is logged regardless of call context.
        logger.error("Unhandled exception", exc_info=exc)
        return JSONResponse(
            status_code=500,
            content={
                "code": 500,
                "msg": "服务器内部错误",
                "data": None,
                "request_id": get_request_id(),
            },
        )


    @app.get("/health")
    async def health_check():
        """Health check endpoint."""
        mysql_ok = False
        redis_ok = False
        try:
            mysql_ok = MySqlDatabaseHelper().check_connection()
        except Exception:
            pass
        try:
            redis_ok = RedisDatabaseHelper().check_connection()
        except Exception:
            pass

        healthy = mysql_ok and redis_ok
        return {
            "status": "healthy" if healthy else "degraded",
            "mysql": "ok" if mysql_ok else "error",
            "redis": "ok" if redis_ok else "error",
        }


    url_prefix = "/brandcultivation/api/v1"

    app.include_router(recommend_router, prefix=url_prefix)
    app.include_router(report_router, prefix=url_prefix)
    app.include_router(eval_report_router, prefix=url_prefix)

    if __name__ == "__main__":
        logger.info("Starting BrandCultivation API server on port 7960")
        uvicorn.run(app, host="0.0.0.0", port=7960)
    
  • [ ] Step 2: Verify syntax

Run: cd /home/yangzeyu/project/BrandCultivation && python -c "import ast; ast.parse(open('run_api.py').read()); print('Syntax OK')"

Expected: Syntax OK
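
The aggregation logic of the /health endpoint (any failing or raising dependency turns the overall status into degraded) can be sketched as a pure function that is easy to test without FastAPI. safe_check, build_health, and broken_redis are names invented for this example.

```python
from typing import Callable, Dict


def safe_check(check: Callable[[], bool]) -> bool:
    """Run a dependency check; treat any exception as a failure."""
    try:
        return bool(check())
    except Exception:
        return False


def build_health(checks: Dict[str, Callable[[], bool]]) -> dict:
    results = {name: safe_check(fn) for name, fn in checks.items()}
    body = {"status": "healthy" if all(results.values()) else "degraded"}
    body.update({name: "ok" if ok else "error" for name, ok in results.items()})
    return body


def broken_redis() -> bool:
    raise ConnectionError("redis unreachable")  # simulated outage


report = build_health({"mysql": lambda: True, "redis": broken_redis})
```

Keeping the checks in a dictionary means a new dependency can be added to the report without touching the aggregation code.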

  • [ ] Step 3: Commit

    git add run_api.py
    git commit -m "refactor(api): add middleware, health check, global exception handlers"
    

Task 10: Improve api/recommend.py (logging and error handling)

Files:

  • Modify: api/recommend.py

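  • [ ] Step 1: Add logging and error handling

Key changes:

  • Add from core import get_logger; logger = get_logger("api.recommend")
  • Return a 404 status code when the model does not exist
  • Wrap generate_and_upload_report in try/except with logger.error
  • Log the key points of the recommendation flow (start, recall count, completion)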

    from database import MySqlDao
    from fastapi import APIRouter, BackgroundTasks, HTTPException, status
    from .request_body import RecommendRequest
    from core import get_logger

    from models import Recommend
    import os
    from utils import FileStreamUtils, ReportUtils

    logger = get_logger("api.recommend")
    dao = MySqlDao()
    router = APIRouter()


    @router.post("/recommend")
    async def recommend(request: RecommendRequest, backgroundTasks: BackgroundTasks):
        """Recommendation endpoint."""
        logger.info(f"Recommend request: city={request.city_uuid}, product={request.product_code}, recall={request.recall_cust_count}")

        gbdtlr_model_path = os.path.join("./models/rank/weights", request.city_uuid, "gbdtlr_model.pkl")
        if not os.path.exists(gbdtlr_model_path):
            logger.warning(f"Model not found: {gbdtlr_model_path}")
            raise HTTPException(
                status_code=status.HTTP_404_NOT_FOUND,
                detail="该城市的模型未训练,请先进行训练",
            )

        recommend_model = Recommend(request.city_uuid)

        # Products with order history use GBDT-LR; new products fall back to Item2Vec.
        products_in_order = dao.get_product_from_order(request.city_uuid)["product_code"].unique().tolist()
        if request.product_code in products_in_order:
            logger.info(f"Using GBDT-LR model for existing product {request.product_code}")
            recommend_list = recommend_model.get_recommend_list_by_gbdtlr(request.product_code, recall_count=request.recall_cust_count)
        else:
            logger.info(f"Using Item2Vec model for new product {request.product_code}")
            recommend_list = recommend_model.get_recommend_list_by_item2vec(request.product_code, recall_count=request.recall_cust_count)

        recommend_data = recommend_model.get_recommend_and_delivery(recommend_list, delivery_count=request.delivery_count)
        request_data = []
        for index, data in enumerate(recommend_data):
            request_data.append(
                {
                    "id": index + 1,
                    "cust_code": data["cust_code"],
                    "recommend_score": data["recommend_score"],
                    "delivery_count": data["delivery_count"],
                }
            )

        logger.info(f"Recommend completed: {len(request_data)} customers recommended")

        backgroundTasks.add_task(generate_and_upload_report, request)

        return {"code": 200, "msg": "success", "data": {"recommendationInfo": request_data}}


    def generate_and_upload_report(request: RecommendRequest):
        """Generate the reports and upload them to the Aliyun file store."""
        logger.info(f"Background task started: generating report for {request.city_uuid}/{request.product_code}")
        try:
            report_util = ReportUtils(request.city_uuid, request.product_code)
            report_util.generate_all_data(request.recall_cust_count, request.delivery_count)

            reports_dir = os.path.join("./data/reports", request.city_uuid, request.product_code)
            report_files = ["卷烟信息表", "品规商户特征关系表", "相似卷烟表", "商户售卖推荐表"]
            file_id_map = FileStreamUtils.upload_files(reports_dir, report_files)

            if file_id_map is None:
                logger.error(f"Report upload failed for {request.city_uuid}/{request.product_code}")
                return

            data_dict = {
                "cultivacation_id": request.cultivacation_id,
                "city_uuid": request.city_uuid,
                "limit_cycle_name": request.limit_cycle_name,
                "product_code": request.product_code,
                "product_info_table": file_id_map.get("卷烟信息表"),
                "relation_table": file_id_map.get("品规商户特征关系表"),
                "similarity_product_table": file_id_map.get("相似卷烟表"),
                "recommend_table": file_id_map.get("商户售卖推荐表"),
            }
            dao.insert_report(data_dict)
            logger.info(f"Background task completed: report uploaded for {request.city_uuid}/{request.product_code}")
        except Exception as e:
            # Background tasks run after the response is sent, so log instead of raising.
            logger.error(f"Background task failed: {e}", exc_info=True)
    
  • [ ] Step 2: Verify syntax

Run: cd /home/yangzeyu/project/BrandCultivation && python -c "import ast; ast.parse(open('api/recommend.py').read()); print('Syntax OK')"

Expected: Syntax OK

  • [ ] Step 3: Commit

    git add api/recommend.py
    git commit -m "refactor(api): add logging and error handling to recommend endpoint"
    

Task 11: Improve api/eval_report.py and api/report.py (logging)

Files:

  • Modify: api/eval_report.py
  • Modify: api/report.py

  • [ ] Step 1: Improve eval_report.py

Add from core import get_logger; logger = get_logger("api.eval_report"), and add a log call at every step.

  • [ ] Step 2: Improve report.py

Add from core import get_logger; logger = get_logger("api.report"), and add log calls at the key steps.

  • [ ] Step 3: Verify syntax

Run: cd /home/yangzeyu/project/BrandCultivation && python -c "import ast; ast.parse(open('api/eval_report.py').read()); ast.parse(open('api/report.py').read()); print('OK')"

Expected: OK

  • [ ] Step 4: Commit

    git add api/eval_report.py api/report.py
    git commit -m "refactor(api): add logging to eval_report and report endpoints"
    

Task 12: Fix models/rank/gbdt_lr_inference.py (bug fix + logging)

Files:

  • Modify: models/rank/gbdt_lr_inference.py

  • [ ] Step 1: Fix the get_recommend_list method

In get_recommend_list, change the loop from:

recommend_list = []
for cust_id, score in zip(recall_list, scores):
    recommend_list.append({cust_id: float(score)})
    recommend_list.append({"cust_code": cust_id, "recommend_score": score})

recommend_list = sorted(
    [item for item in recommend_list if "recommend_score" in item],
    key=lambda x: x["recommend_score"],
    reverse=True
)

to:

recommend_list = [
    {"cust_code": cust_id, "recommend_score": float(score)}
    for cust_id, score in zip(recall_list, scores)
]
recommend_list.sort(key=lambda x: x["recommend_score"], reverse=True)
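
To make the effect of this change concrete, the following self-contained comparison runs both versions on made-up sample data (the customer codes and scores are invented). The old loop appends two dictionaries per customer, one of which has to be filtered out again, and leaves the kept score uncast; the new version builds one well-formed dictionary per customer.

```python
recall_list = ["c1", "c2", "c3"]
scores = [0.2, 0.9, 0.5]

# Old loop: two dicts per customer, one of them without "recommend_score".
old = []
for cust_id, score in zip(recall_list, scores):
    old.append({cust_id: float(score)})
    old.append({"cust_code": cust_id, "recommend_score": score})
old_filtered = sorted(
    [item for item in old if "recommend_score" in item],
    key=lambda x: x["recommend_score"],
    reverse=True,
)

# New version: one dict per customer, score cast to float, sorted in place.
new = [
    {"cust_code": cust_id, "recommend_score": float(score)}
    for cust_id, score in zip(recall_list, scores)
]
new.sort(key=lambda x: x["recommend_score"], reverse=True)

order = [item["cust_code"] for item in new]
```

Both versions end up with the same ranking, but the old one builds twice as many objects and only works because of the filter.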

  • [ ] Step 2: Add logging

Add from core import get_logger; logger = get_logger("models.rank.gbdtlr") at the top of the file. In get_recommend_list, log the inference duration. In generate_shap_interance, replace print with logger.

  • [ ] Step 3: Verify syntax

Run: cd /home/yangzeyu/project/BrandCultivation && python -c "import ast; ast.parse(open('models/rank/gbdt_lr_inference.py').read()); print('OK')"

Expected: OK

  • [ ] Step 4: Commit

    git add models/rank/gbdt_lr_inference.py
    git commit -m "fix(models): fix double-append bug in get_recommend_list, add logging"
    

Task 13: Improve models/recommend.py (logging)

Files:

  • Modify: models/recommend.py

  • [ ] Step 1: Add logging

Add from core import get_logger; logger = get_logger("models.recommend"). Add log calls to the key methods:

  • _load_molde: model loading finished
  • get_recal_cust: number of recalled customers
  • get_recommend_list_by_gbdtlr: start/finish + duration
  • get_recommend_list_by_item2vec: start/finish + duration
  • get_recommend_and_delivery: delivery allocation finished
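
For the start/finish + duration entries, one possible approach is a small context manager that wraps the timed section. This is a sketch under assumptions: log_duration and ListHandler are hypothetical helpers, and the standard logging module stands in for core.get_logger.

```python
import logging
import time
from contextlib import contextmanager

logger = logging.getLogger("models.recommend.demo")


@contextmanager
def log_duration(name: str):
    """Log the start and the end of a step together with its duration."""
    logger.info("%s started", name)
    start = time.perf_counter()
    try:
        yield
    finally:
        # The finally block runs even if the wrapped code raises.
        duration_ms = (time.perf_counter() - start) * 1000
        logger.info("%s finished in %.1fms", name, duration_ms)


messages = []


class ListHandler(logging.Handler):
    """Collect formatted messages in a list so they can be inspected."""

    def emit(self, record: logging.LogRecord) -> None:
        messages.append(record.getMessage())


logger.addHandler(ListHandler())
logger.setLevel(logging.INFO)

with log_duration("get_recommend_list_by_gbdtlr"):
    total = sum(range(1000))  # stand-in for model inference
```

Inside a method the same effect is achieved by recording time.perf_counter() before and after the call and logging the difference.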

  • [ ] Step 2: Verify syntax

Run: cd /home/yangzeyu/project/BrandCultivation && python -c "import ast; ast.parse(open('models/recommend.py').read()); print('OK')"

Expected: OK

  • [ ] Step 3: Commit

    git add models/recommend.py
    git commit -m "refactor(models): add logging to recommend module"
    

Task 14: Improve the recall modules (replace print with logger)

Files:

  • Modify: models/recall/hot_recall.py
  • Modify: models/recall/itemCF/ItemCF.py
  • Modify: models/item2vec/inference.py

  • [ ] Step 1: Improve hot_recall.py

Replace print("hot_recall: ...") with logger.info(...). Add from core import get_logger; logger = get_logger("models.recall.hot")

  • [ ] Step 2: Improve ItemCF.py

Replace print(...) with logger.info(...). Add from core import get_logger; logger = get_logger("models.recall.itemcf")

  • [ ] Step 3: Improve inference.py (item2vec)

Add from core import get_logger; logger = get_logger("models.item2vec"). Add log calls at the key steps.

  • [ ] Step 4: Verify syntax

Run: cd /home/yangzeyu/project/BrandCultivation && python -c "import ast; ast.parse(open('models/recall/hot_recall.py').read()); ast.parse(open('models/recall/itemCF/ItemCF.py').read()); ast.parse(open('models/item2vec/inference.py').read()); print('OK')"

Expected: OK

  • [ ] Step 5: Commit

    git add models/recall/hot_recall.py models/recall/itemCF/ItemCF.py models/item2vec/inference.py
    git commit -m "refactor(models): replace print with logger in recall modules"
    

Task 15: Improve utils/file_stream.py (logging and error handling)

Files:

  • Modify: utils/file_stream.py

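  • [ ] Step 1: Add logging and improve error handling

Add from core import get_logger; logger = get_logger("utils.file_stream")

  • On upload, log the file_id and the duration
  • On download, log the HTTP status code
  • On failure, log the specific error message (replacing print)
  • Read the URL configuration from settings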

    import os
    import time
    from io import BytesIO
    
    import pandas as pd
    import requests
    
    from core import get_logger, settings
    from core.exceptions import FileServiceException
    
    logger = get_logger("utils.file_stream")
    
    
    class FileStreamUtils:
        upload_url = settings.file_upload_url
        download_url = settings.file_download_url
        headers = {
            "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36",
            "Accept": "*/*",
        }
    
        @staticmethod
        def upload_files(reports_dir, files):
            """Upload <name>.xlsx for each name; return {name: file_id}, or None on the first failure."""
            files_id = {}
            for filename in files:
                file_path = os.path.join(reports_dir, f"{filename}.xlsx")
                start_time = time.time()
                try:
                    with open(file_path, "rb") as f:
                        upload_files = {"file": (os.path.basename(file_path), f)}
                        response = requests.post(
                            FileStreamUtils.upload_url,
                            headers=FileStreamUtils.headers,
                            files=upload_files,
                            verify=True,
                        )
                        duration_ms = (time.time() - start_time) * 1000
                        payload = response.json()  # parse the response body once
                        if payload.get("success"):
                            file_id = payload["data"]["file_info"]["fileid"]
                            files_id[filename] = file_id
                            logger.info(f"File uploaded: {filename} -> {file_id} ({duration_ms:.0f}ms)")
                        else:
                            logger.error(f"Upload failed for {filename}: {response.text}")
                            return None
                except requests.exceptions.RequestException as e:
                    # Network-level failures: connection errors, timeouts, invalid responses
                    logger.error(f"Upload request error for {filename}: {e}", exc_info=True)
                    return None
                except Exception as e:
                    # Anything else: missing file, unexpected response shape, and so on
                    logger.error(f"Upload error for {filename}: {e}", exc_info=True)
                    return None
            return files_id
    
        @staticmethod
        def download_file(file_id, file_type="xlsx"):
            """Download a file from the Aliyun file store by file_id; return a DataFrame, or None on failure."""
            start_time = time.time()
            try:
                response = requests.get(
                    f"{FileStreamUtils.download_url}/{file_id}",
                    headers=FileStreamUtils.headers,
                    verify=True,
                )
                duration_ms = (time.time() - start_time) * 1000
    
                if response.status_code == 200:
                    file_content = BytesIO(response.content)
                    if file_type == "xlsx":
                        data = pd.read_excel(file_content, engine="openpyxl")
                    elif file_type == "csv":
                        data = pd.read_csv(file_content)
                    else:
                        raise ValueError(f"Unsupported file type: {file_type}")
                    logger.info(f"File downloaded: {file_id} ({duration_ms:.0f}ms, {len(response.content)} bytes)")
                    return data
                else:
                    logger.error(f"Download failed: file_id={file_id}, status={response.status_code}")
                    return None
            except requests.exceptions.RequestException as e:
                logger.error(f"Download request error: file_id={file_id}, error={e}", exc_info=True)
                return None
            except Exception as e:
                # Includes the ValueError raised above for unsupported file types
                logger.error(f"Download error: file_id={file_id}, error={e}", exc_info=True)
                return None
    
  • [ ] Step 2: Verify syntax

Run: cd /home/yangzeyu/project/BrandCultivation && python -c "import ast; ast.parse(open('utils/file_stream.py').read()); print('OK')"

Expected: OK

  • [ ] Step 3: Commit

    git add utils/file_stream.py
    git commit -m "refactor(utils): add logging and error details to file_stream"
    

Task 16: Improve utils/report_utils.py and train.py - logging

Files:

  • Modify: utils/report_utils.py
  • Modify: train.py
  • Modify: models/rank/data/preprocess.py

  • [ ] Step 1: Improve report_utils.py

Add the import from core import get_logger and the assignment logger = get_logger("utils.report"). Add a start log and a completion log to every generate_* method.
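If the generate_* methods all need the same start and completion messages, a decorator is one way to avoid repeating the two logger calls. The sketch below is an assumption about how that could look: log_start_finish, ReportUtilsSketch, and generate_summary are hypothetical names, and the logger is a standard-library stand-in for core.get_logger.

```python
import functools
import logging
import time

logger = logging.getLogger("utils.report")  # stand-in for core.get_logger


def log_start_finish(func):
    """Log when the wrapped function starts and finishes (hypothetical decorator)."""
    @functools.wraps(func)
    def wrapper(*args, **kwargs):
        logger.info(f"{func.__name__}: started")
        start = time.perf_counter()
        result = func(*args, **kwargs)
        duration_ms = (time.perf_counter() - start) * 1000
        logger.info(f"{func.__name__}: finished ({duration_ms:.0f}ms)")
        return result
    return wrapper


class ReportUtilsSketch:
    """Illustrative stand-in; not the project's ReportUtils."""

    @staticmethod
    @log_start_finish
    def generate_summary(rows):
        return {"row_count": len(rows)}
```

Because functools.wraps preserves the function name, the log lines identify which generate_* method ran.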

  • [ ] Step 2: Improve train.py

Replace every print(...) call with logger.info(...). At the top of the module, add the import from core import get_logger and the assignment logger = get_logger("train").
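One detail worth watching during this replacement: print joins multiple arguments with spaces, while logger.info treats extra positional arguments as values for %-style placeholders in the first argument. The sketch below shows a conversion of a multi-argument print; log_epoch and its parameters are hypothetical, and the logger is a standard-library stand-in for core.get_logger.

```python
import logging

logger = logging.getLogger("train")  # stand-in for core.get_logger


def log_epoch(epoch: int, loss: float) -> None:
    """Hypothetical training progress message."""
    # Before: print("epoch", epoch, "loss", loss)
    # A mechanical rename to logger.info("epoch", epoch, "loss", loss) would
    # fail at format time, because "epoch" contains no placeholders.
    logger.info("epoch %d loss %.4f", epoch, loss)
```

An f-string, as used elsewhere in this plan, works equally well; the point is only that a multi-argument print cannot be renamed to logger.info unchanged.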

  • [ ] Step 3: Improve preprocess.py

Replace each print("gbdr-lr: ...") call with logger.info(...). At the top of the module, add the import from core import get_logger and the assignment logger = get_logger("models.rank.preprocess").

  • [ ] Step 4: Verify syntax

Run: cd /home/yangzeyu/project/BrandCultivation && python -c "import ast; ast.parse(open('utils/report_utils.py').read()); ast.parse(open('train.py').read()); ast.parse(open('models/rank/data/preprocess.py').read()); print('OK')"

Expected: OK

  • [ ] Step 5: Commit

    git add utils/report_utils.py train.py models/rank/data/preprocess.py
    git commit -m "refactor: replace print with logger in report_utils, train, preprocess"
    

Task 17: Configuration file changes - remove passwords, create .env.example

Files:

  • Modify: config/database_config.yaml
  • Create: .env.example
  • Modify: config/config.py

  • [ ] Step 1: Update database_config.yaml (remove passwords)

    mysql:
      host: 'rm-t4n6rz18y4t5x47y70o.mysql.singapore.rds.aliyuncs.com'
      port: 3036
      db: 'brand_cultivation'
      user: 'BrandCultivation'
      # passwd moved to the MYSQL_PASSWORD environment variable
    
    redis:
      host: 'r-t4nb4n9i8je7u6ogk1pd.redis.singapore.rds.aliyuncs.com'
      port: 5000
      db: 10
      # passwd moved to the REDIS_PASSWORD environment variable
    
  • [ ] Step 2: Create .env.example

    # BrandCultivation environment variables
    # Copy this file to .env and fill in the real values
    
    # MySQL
    MYSQL_HOST=rm-t4n6rz18y4t5x47y70o.mysql.singapore.rds.aliyuncs.com
    MYSQL_PORT=3036
    MYSQL_USER=BrandCultivation
    MYSQL_PASSWORD=your_mysql_password_here
    MYSQL_DB=brand_cultivation
    
    # Redis
    REDIS_HOST=r-t4nb4n9i8je7u6ogk1pd.redis.singapore.rds.aliyuncs.com
    REDIS_PORT=5000
    REDIS_PASSWORD=your_redis_password_here
    REDIS_DB=10
    
    # Logging
    LOG_LEVEL=INFO
    
    # File Service
    FILE_UPLOAD_URL=http://file-center.jcpt:8080/file/fileUpload
    FILE_DOWNLOAD_URL=http://file-center.jcpt:8080/file/fileDownload
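
Python does not read a .env file on its own, so something has to export these values before the settings object reads the environment. How core/config.py does this is not shown in this section; the sketch below is a minimal standard-library loader under that assumption. load_env_file is a hypothetical helper, and it handles only plain KEY=VALUE lines (no quoting or variable expansion).

```python
import os
from pathlib import Path


def load_env_file(path: str = ".env", override: bool = False) -> dict:
    """Parse KEY=VALUE lines into os.environ (hypothetical helper).

    Blank lines and lines starting with '#' are skipped. Variables already
    present in the environment win unless override is True.
    """
    loaded = {}
    env_path = Path(path)
    if not env_path.is_file():
        return loaded
    for raw_line in env_path.read_text(encoding="utf-8").splitlines():
        line = raw_line.strip()
        if not line or line.startswith("#") or "=" not in line:
            continue
        key, _, value = line.partition("=")
        key, value = key.strip(), value.strip()
        if override or key not in os.environ:
            os.environ[key] = value
        loaded[key] = value
    return loaded
```

Letting existing environment variables take precedence means values injected by the deployment environment are not overwritten by a local .env file.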
    
  • [ ] Step 3: Turn config/config.py into a compatibility layer

Keep the old interface so that code not yet migrated continues to work:

from core.config import settings

def load_config():
    return {
        "mysql": {
            "host": settings.mysql_host,
            "port": settings.mysql_port,
            "user": settings.mysql_user,
            "passwd": settings.mysql_password,
            "db": settings.mysql_db,
        },
        "redis": {
            "host": settings.redis_host,
            "port": settings.redis_port,
            "passwd": settings.redis_password,
            "db": settings.redis_db,
        },
    }

def load_model_config():
    return settings.model_config

def load_service_config():
    return {
        "aliyun": {
            "upload_url": settings.file_upload_url,
            "download_url": settings.file_download_url,
        }
    }
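
To illustrate why the compatibility layer keeps the old dictionary shape, the sketch below shows a caller that has not been migrated and still indexes the legacy keys. Everything here is a stand-in: SettingsStub replaces core.config.settings and reads only two fields, the "localhost" default is an assumption, and legacy_connect_args is a hypothetical caller.

```python
import os
from dataclasses import dataclass, field


@dataclass
class SettingsStub:
    """Stand-in for core.config.settings; reads only the fields used here."""
    mysql_host: str = field(
        default_factory=lambda: os.environ.get("MYSQL_HOST", "localhost")  # assumed default
    )
    mysql_password: str = field(
        default_factory=lambda: os.environ.get("MYSQL_PASSWORD", "")
    )


def load_config(settings: SettingsStub) -> dict:
    """Same dictionary shape as the legacy loader, trimmed to two keys."""
    return {"mysql": {"host": settings.mysql_host, "passwd": settings.mysql_password}}


def legacy_connect_args() -> dict:
    """Hypothetical not-yet-migrated caller that still uses the old keys."""
    cfg = load_config(SettingsStub())["mysql"]
    return {"host": cfg["host"], "password": cfg["passwd"]}
```

The caller keeps working unchanged, while the password now comes from the environment instead of the YAML file.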
  • [ ] Step 4: Make sure .gitignore contains .env

Run: cd /home/yangzeyu/project/BrandCultivation && grep -q "^\.env$" .gitignore 2>/dev/null || echo ".env" >> .gitignore

  • [ ] Step 5: Commit

    git add config/database_config.yaml .env.example config/config.py .gitignore
    git commit -m "security: remove passwords from yaml, add .env.example"
    

Task 18: Final verification

Files: None (verification only)

  • [ ] Step 1: Verify that all modules can be imported

Run:

cd /home/yangzeyu/project/BrandCultivation && python -c "
from core import get_logger, settings, AppException, DatabaseException, RequestLoggingMiddleware
from database.db.mysql import MySqlDatabaseHelper
from database.db.redis_db import RedisDatabaseHelper
from database.dao.mysql_dao import MySqlDao
from api.recommend import router as rec_router
from api.report import router as rep_router
from api.eval_report import router as eval_router
from models.recommend import Recommend
from utils.file_stream import FileStreamUtils
from utils.report_utils import ReportUtils
print('All imports successful')
"

Expected: All imports successful
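
The one-liner above stops at the first failing import. If it helps to see every failing module in a single run, a small helper along these lines could be used instead. This is a sketch: check_imports is a hypothetical helper, and PROJECT_MODULES simply repeats the module names from the command above.

```python
import importlib

# Module names taken from the verification command in this step.
PROJECT_MODULES = [
    "core",
    "database.db.mysql",
    "database.db.redis_db",
    "database.dao.mysql_dao",
    "api.recommend",
    "api.report",
    "api.eval_report",
    "models.recommend",
    "utils.file_stream",
    "utils.report_utils",
]


def check_imports(module_names):
    """Try to import each module; return {module_name: error} for the failures."""
    failures = {}
    for name in module_names:
        try:
            importlib.import_module(name)
        except Exception as exc:  # report any import-time error, not only ImportError
            failures[name] = f"{type(exc).__name__}: {exc}"
    return failures
```

Running check_imports(PROJECT_MODULES) from the project root returns an empty dictionary when every module imports cleanly.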

  • [ ] Step 2: Verify the FastAPI application at the syntax level

Run: cd /home/yangzeyu/project/BrandCultivation && python -c "import ast; ast.parse(open('run_api.py').read()); print('App syntax OK')"

Expected: App syntax OK
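
Note that ast.parse only confirms that a file is syntactically valid; it does not resolve imports or start the application. The same check recurs in several tasks of this plan, so a reusable helper might look like the sketch below. syntax_errors is a hypothetical name, and the file list is supplied by the caller.

```python
import ast
from pathlib import Path


def syntax_errors(paths):
    """Return {path: message} for files that are missing or fail to parse."""
    errors = {}
    for path in paths:
        try:
            source = Path(path).read_text(encoding="utf-8")
            ast.parse(source, filename=str(path))
        except (OSError, SyntaxError) as exc:
            errors[str(path)] = f"{type(exc).__name__}: {exc}"
    return errors
```

For example, syntax_errors(["run_api.py", "train.py"]) run from the project root returns an empty dictionary when both files parse.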

  • [ ] Step 3: Final commit (if any files were missed)

    git status
    # If any changes remain uncommitted, commit them