> **For agentic workers:** REQUIRED SUB-SKILL: Use superpowers:subagent-driven-development (recommended) or superpowers:executing-plans to implement this plan task-by-task. Steps use checkbox (`- [ ]`) syntax for tracking.
Goal: 为 BrandCultivation 项目添加日志系统、配置管理、异常处理、请求追踪,修复已知 bug,移除明文密码。
Architecture: 渐进式重构 — 新增 core/ 基础设施层,在现有模块中逐步替换 print 为 logger,添加错误处理和请求追踪。保持现有目录结构和业务逻辑不变。
Tech Stack: Python 3.x, FastAPI, SQLAlchemy, Redis, logging (stdlib), pathlib, contextvars, uuid
**File Structure:**
- core/__init__.py — 公共接口导出
- core/logging.py — JSON 格式日志系统
- core/config.py — 配置管理(YAML + 环境变量)
- core/exceptions.py — 自定义异常体系
- core/middleware.py — 请求日志和 request_id 中间件
- .env.example — 环境变量模板
- config/config.py — 废弃,改为从 core/config.py 导入(保留为兼容层)
- config/database_config.yaml — 移除密码
- database/db/mysql.py — 日志、session context manager、配置来源
- database/db/redis_db.py — 日志、配置来源、连接池
- database/__init__.py — 更新导出(注意:下文没有对应的任务步骤,需补充或从清单中移除)
- database/dao/mysql_dao.py — 日志、异常处理
- run_api.py — 注册中间件、健康检查、异常处理器
- api/recommend.py — 日志、错误处理
- api/eval_report.py — 日志、错误处理
- api/report.py — 日志
- models/rank/gbdt_lr_inference.py — bug 修复、日志
- models/recommend.py — 日志
- models/recall/hot_recall.py — 替换 print
- models/recall/itemCF/ItemCF.py — 替换 print
- models/item2vec/inference.py — 日志
- models/rank/data/preprocess.py — 替换 print
- utils/file_stream.py — 日志、错误处理
- utils/report_utils.py — 日志
- train.py — 替换 print

### core/exceptions.py — 自定义异常体系

**Files:**
Create: core/exceptions.py
[ ] Step 1: 创建异常定义文件
from typing import Optional


class AppException(Exception):
    """Base class for application errors that are turned into JSON error responses.

    Attributes:
        code: Status code carried with the error (the API layer uses it as the HTTP status).
        message: Short human-readable message; also what ``str(exc)`` returns.
        detail: Optional extra information, e.g. the text of an underlying error.
    """

    def __init__(self, code: int, message: str, detail: Optional[str] = None):
        self.code = code
        self.message = message
        self.detail = detail
        super().__init__(message)


class DatabaseException(AppException):
    """A database operation failed (code 500)."""

    def __init__(self, message: str = "数据库操作失败", detail: Optional[str] = None):
        super().__init__(code=500, message=message, detail=detail)


class ModelException(AppException):
    """Model inference failed (code 500)."""

    def __init__(self, message: str = "模型推理失败", detail: Optional[str] = None):
        super().__init__(code=500, message=message, detail=detail)


class FileServiceException(AppException):
    """A file-service operation failed (code 500)."""

    def __init__(self, message: str = "文件服务操作失败", detail: Optional[str] = None):
        super().__init__(code=500, message=message, detail=detail)


class ValidationException(AppException):
    """A business-rule validation failed (code 400)."""

    def __init__(self, message: str = "参数校验失败", detail: Optional[str] = None):
        super().__init__(code=400, message=message, detail=detail)
[ ] Step 2: 验证导入
Run: cd /home/yangzeyu/project/BrandCultivation && python -c "from core.exceptions import AppException, DatabaseException, ModelException, FileServiceException, ValidationException; print('OK')"
Expected: OK
[ ] Step 3: Commit
git add core/exceptions.py
git commit -m "feat(core): add custom exception hierarchy"
### core/config.py — 配置管理

**Files:**
Create: core/config.py
[ ] Step 1: 创建配置管理模块
import os
from pathlib import Path
import yaml
PROJECT_ROOT = Path(__file__).resolve().parent.parent
def _get_env(key: str, default=None):
"""从环境变量获取值"""
return os.environ.get(key, default)
def _load_yaml(filename: str) -> dict:
"""加载 YAML 配置文件"""
filepath = PROJECT_ROOT / "config" / filename
with open(filepath, encoding="utf-8") as f:
return yaml.safe_load(f) or {}
class _Settings:
    """Application settings in which environment variables override YAML values.

    The three YAML files under ``config/`` are parsed once, when the instance is built;
    each property re-checks the environment on every access.
    """

    def __init__(self):
        self._db_cfg = _load_yaml("database_config.yaml")
        self._model_cfg = _load_yaml("model_config.yaml")
        self._service_cfg = _load_yaml("service_config.yaml")

    # ---- private YAML lookups -------------------------------------------------

    def _db_value(self, section: str, key: str, fallback):
        """Return ``database_config.yaml[section][key]``, or ``fallback`` if absent."""
        section_cfg = self._db_cfg.get(section, {})
        return section_cfg.get(key, fallback)

    def _aliyun_value(self, key: str):
        """Return ``service_config.yaml['aliyun'][key]``, or ``""`` if absent."""
        aliyun_cfg = self._service_cfg.get("aliyun", {})
        return aliyun_cfg.get(key, "")

    # ---- MySQL ----------------------------------------------------------------

    @property
    def mysql_host(self) -> str:
        yaml_value = self._db_value("mysql", "host", "localhost")
        return _get_env("MYSQL_HOST", yaml_value)

    @property
    def mysql_port(self) -> int:
        yaml_value = self._db_value("mysql", "port", 3306)
        raw_port = _get_env("MYSQL_PORT", yaml_value)
        return int(raw_port)

    @property
    def mysql_user(self) -> str:
        yaml_value = self._db_value("mysql", "user", "root")
        return _get_env("MYSQL_USER", yaml_value)

    @property
    def mysql_password(self) -> str:
        yaml_value = self._db_value("mysql", "passwd", "")
        return _get_env("MYSQL_PASSWORD", yaml_value)

    @property
    def mysql_db(self) -> str:
        yaml_value = self._db_value("mysql", "db", "")
        return _get_env("MYSQL_DB", yaml_value)

    # ---- Redis ----------------------------------------------------------------

    @property
    def redis_host(self) -> str:
        yaml_value = self._db_value("redis", "host", "localhost")
        return _get_env("REDIS_HOST", yaml_value)

    @property
    def redis_port(self) -> int:
        yaml_value = self._db_value("redis", "port", 6379)
        raw_port = _get_env("REDIS_PORT", yaml_value)
        return int(raw_port)

    @property
    def redis_password(self) -> str:
        yaml_value = self._db_value("redis", "passwd", "")
        return _get_env("REDIS_PASSWORD", yaml_value)

    @property
    def redis_db(self) -> int:
        yaml_value = self._db_value("redis", "db", 0)
        raw_db = _get_env("REDIS_DB", yaml_value)
        return int(raw_db)

    # ---- logging, file service, model -----------------------------------------

    @property
    def log_level(self) -> str:
        level_name = _get_env("LOG_LEVEL", "INFO")
        return level_name.upper()

    @property
    def file_upload_url(self) -> str:
        yaml_value = self._aliyun_value("upload_url")
        return _get_env("FILE_UPLOAD_URL", yaml_value)

    @property
    def file_download_url(self) -> str:
        yaml_value = self._aliyun_value("download_url")
        return _get_env("FILE_DOWNLOAD_URL", yaml_value)

    @property
    def model_config(self) -> dict:
        return self._model_cfg


# Shared instance used throughout the project.
settings = _Settings()
[ ] Step 2: 验证导入
Run: cd /home/yangzeyu/project/BrandCultivation && python -c "from core.config import settings; print(settings.mysql_host)"
Expected: 输出数据库 host 地址
[ ] Step 3: Commit
git add core/config.py
git commit -m "feat(core): add config management with env var override"
### core/logging.py — 日志系统

**Files:**
Create: core/logging.py
[ ] Step 1: 创建日志模块
import logging
import json
import sys
from contextvars import ContextVar
from datetime import datetime, timezone
# request_id of the request currently being handled; "-" when outside a request.
request_id_var: ContextVar[str] = ContextVar("request_id", default="-")


class JSONFormatter(logging.Formatter):
    """Render each log record as one single-line JSON object."""

    def format(self, record: logging.LogRecord) -> str:
        log_data = {
            # record.created is when the event was logged, not when it is being
            # formatted, so the timestamp stays accurate even if handlers lag.
            "timestamp": datetime.fromtimestamp(record.created, tz=timezone.utc).isoformat(),
            "level": record.levelname,
            "logger": record.name,
            "module": record.module,
            "function": record.funcName,
            "line": record.lineno,
            "message": record.getMessage(),
            "request_id": request_id_var.get("-"),
        }
        if record.exc_info and record.exc_info[0] is not None:
            log_data["exception"] = self.formatException(record.exc_info)
        # Callers pass structured context via extra={"extra_data": {...}}.
        if hasattr(record, "extra_data"):
            log_data["extra"] = record.extra_data
        # default=str: a non-JSON value in extra_data (numpy scalar, datetime,
        # Decimal, ...) is stringified instead of making json.dumps raise and
        # the whole log record being lost.
        return json.dumps(log_data, ensure_ascii=False, default=str)
def get_logger(name: str) -> logging.Logger:
    """Return logger ``name`` set up to write JSON lines to stdout.

    If the logger has no handlers yet, it gets a stdout JSON handler, a level taken
    from ``settings.log_level``, and ``propagate = False``. A logger that already
    has handlers is returned untouched.
    """
    # Imported inside the function rather than at module top.
    from core.config import settings

    named_logger = logging.getLogger(name)
    if named_logger.handlers:
        return named_logger

    stdout_handler = logging.StreamHandler(sys.stdout)
    stdout_handler.setFormatter(JSONFormatter())
    named_logger.addHandler(stdout_handler)
    # Unknown level names fall back to INFO instead of raising.
    named_logger.setLevel(getattr(logging, settings.log_level, logging.INFO))
    # Keep records away from any root-logger handlers.
    named_logger.propagate = False
    return named_logger
[ ] Step 2: 验证导入和基本功能
Run: cd /home/yangzeyu/project/BrandCultivation && python -c "from core.logging import get_logger; logger = get_logger('test'); logger.info('hello')"
Expected: JSON 格式日志输出
[ ] Step 3: Commit
git add core/logging.py
git commit -m "feat(core): add JSON logging system with request_id support"
### core/middleware.py — 请求中间件

**Files:**
Create: core/middleware.py
[ ] Step 1: 创建中间件模块
import time
import uuid
from starlette.middleware.base import BaseHTTPMiddleware
from starlette.requests import Request
from starlette.responses import Response
from core.logging import get_logger, request_id_var
logger = get_logger("middleware")

# Longest client-supplied X-Request-ID that will be reused as-is.
_MAX_INCOMING_REQUEST_ID_LEN = 64


def get_request_id() -> str:
    """Return the request_id bound to the current context, or "-" outside a request."""
    return request_id_var.get("-")


def _incoming_request_id(request: Request):
    """Return the caller's X-Request-ID if it is safe to reuse, otherwise ``None``.

    Reusing an upstream id lets a single request be traced across services. Only
    short ASCII values made of letters, digits, '-', '_' and '.' are accepted, so a
    client cannot inject arbitrary text into log lines or the response header.
    """
    candidate = request.headers.get("x-request-id", "").strip()
    if not candidate or len(candidate) > _MAX_INCOMING_REQUEST_ID_LEN:
        return None
    if all(ch.isascii() and (ch.isalnum() or ch in "-_.") for ch in candidate):
        return candidate
    return None


class RequestLoggingMiddleware(BaseHTTPMiddleware):
    """Assign a request_id to every request and log its start, completion or failure.

    The id is echoed back in the ``X-Request-ID`` response header on successful
    responses.
    """

    async def dispatch(self, request: Request, call_next) -> Response:
        req_id = _incoming_request_id(request) or str(uuid.uuid4())[:8]
        # NOTE(review): the id is set but never reset here — presumably so exception
        # handlers that run after this method exits can still read it through
        # get_request_id(); confirm before adding a reset.
        request_id_var.set(req_id)
        # perf_counter is monotonic: durations cannot go negative or jump when the
        # wall clock is adjusted, unlike time.time().
        start = time.perf_counter()
        client_ip = request.client.host if request.client else "unknown"
        logger.info(
            f"Request started: {request.method} {request.url.path}",
            extra={"extra_data": {"client_ip": client_ip, "method": request.method, "path": str(request.url.path)}},
        )
        try:
            response = await call_next(request)
        except Exception:
            duration_ms = (time.perf_counter() - start) * 1000
            logger.error(
                f"Request failed: {request.method} {request.url.path} ({duration_ms:.1f}ms)",
                exc_info=True,
                extra={"extra_data": {"duration_ms": round(duration_ms, 1)}},
            )
            raise
        duration_ms = (time.perf_counter() - start) * 1000
        logger.info(
            f"Request completed: {request.method} {request.url.path} -> {response.status_code} ({duration_ms:.1f}ms)",
            extra={"extra_data": {"status_code": response.status_code, "duration_ms": round(duration_ms, 1)}},
        )
        response.headers["X-Request-ID"] = req_id
        return response
[ ] Step 2: 验证导入
Run: cd /home/yangzeyu/project/BrandCultivation && python -c "from core.middleware import RequestLoggingMiddleware, get_request_id; print('OK')"
Expected: OK
[ ] Step 3: Commit
git add core/middleware.py
git commit -m "feat(core): add request logging middleware with request_id tracking"
### core/__init__.py — 公共接口导出

**Files:**
Create: core/__init__.py
[ ] Step 1: 创建 init 文件
from core.logging import get_logger, request_id_var
from core.config import settings
from core.exceptions import (
AppException,
DatabaseException,
ModelException,
FileServiceException,
ValidationException,
)
from core.middleware import RequestLoggingMiddleware, get_request_id
# Public API of the ``core`` package; modules elsewhere import these names from ``core``.
__all__ = [
    # logging
    "get_logger",
    "request_id_var",
    # configuration
    "settings",
    # exception hierarchy
    "AppException",
    "DatabaseException",
    "ModelException",
    "FileServiceException",
    "ValidationException",
    # request tracing middleware
    "RequestLoggingMiddleware",
    "get_request_id",
]
[ ] Step 2: 验证完整导入
Run: cd /home/yangzeyu/project/BrandCultivation && python -c "from core import get_logger, settings, AppException, DatabaseException, RequestLoggingMiddleware, get_request_id; print('All imports OK')"
Expected: All imports OK
[ ] Step 3: Commit
git add core/__init__.py
git commit -m "feat(core): add public interface exports"
### database/db/mysql.py — 日志、session 管理、配置来源

**Files:**
Modify: database/db/mysql.py
[ ] Step 1: 重写 mysql.py
替换整个文件内容为:
from contextlib import contextmanager
from core import get_logger, settings, DatabaseException
import pandas as pd
from sqlalchemy import create_engine, text
from sqlalchemy.orm import sessionmaker
from sqlalchemy.exc import SQLAlchemyError
logger = get_logger("database.mysql")


class MySqlDatabaseHelper:
    """Process-wide MySQL helper wrapping a pooled SQLAlchemy engine.

    ``__new__`` always returns the same instance. The engine is created by the first
    successful ``__init__`` and reused afterwards. Every query goes through
    ``get_session``, which commits on success and wraps SQLAlchemy errors in
    ``DatabaseException``.

    NOTE: table and column names passed to ``insert_data``/``update_data`` are
    interpolated into SQL text; only values are bound parameters. Never pass
    user-controlled identifiers.
    """

    _instance = None

    def __new__(cls):
        if not cls._instance:
            cls._instance = super(MySqlDatabaseHelper, cls).__new__(cls)
            cls._instance._initialized = False
        return cls._instance

    def __init__(self):
        if self._initialized:
            return
        self._connect_database()
        # Flagged only after success, so a failed first attempt is retried next time.
        self._initialized = True

    def _connect_database(self):
        """Create the engine and session factory from ``settings``.

        Raises:
            DatabaseException: if the engine cannot be created.
        """
        from urllib.parse import quote

        try:
            # Percent-encode the credentials: an '@', ':', '/' or '%' in the password
            # would otherwise be parsed as URL structure. str() also tolerates a
            # numeric password parsed from YAML.
            user = quote(str(settings.mysql_user), safe="")
            password = quote(str(settings.mysql_password), safe="")
            conn_str = (
                f"mysql+pymysql://{user}:{password}"
                f"@{settings.mysql_host}:{settings.mysql_port}/{settings.mysql_db}"
            )
            self.engine = create_engine(
                conn_str,
                pool_size=20,
                max_overflow=30,
                pool_recycle=1800,
                pool_pre_ping=True,
                isolation_level="READ COMMITTED",
            )
            self._DBSession = sessionmaker(bind=self.engine)
            logger.info("MySQL connection pool created", extra={"extra_data": {"host": settings.mysql_host, "db": settings.mysql_db}})
        except Exception as e:
            logger.error("Failed to create MySQL connection", exc_info=True)
            raise DatabaseException(message="数据库连接失败", detail=str(e)) from e

    @contextmanager
    def get_session(self):
        """Yield a session that is committed on success and always closed.

        Raises:
            DatabaseException: wrapping any ``SQLAlchemyError`` (after rollback).
        """
        session = self._DBSession()
        try:
            yield session
            session.commit()
        except SQLAlchemyError as e:
            session.rollback()
            logger.error("Database operation failed", exc_info=True)
            raise DatabaseException(message="数据库操作失败", detail=str(e)) from e
        except Exception:
            # A non-database error raised inside the caller's ``with`` body: undo
            # pending work explicitly, then let the original error propagate.
            session.rollback()
            raise
        finally:
            session.close()

    def load_data_with_page(self, query, params, page_size=100000):
        """Run ``query`` page by page and return all rows as a single DataFrame.

        Args:
            query: SQL string without a trailing ``LIMIT``/``OFFSET`` or ``;`` — it is
                wrapped in a ``COUNT(*)`` subquery and has ``LIMIT``/``OFFSET`` appended.
            params: Bind parameters for ``query``; ``None`` is treated as ``{}``.
            page_size: Rows fetched per round trip; must be positive.

        Returns:
            pandas.DataFrame with every row, or an empty DataFrame if nothing matches.

        Raises:
            ValueError: if ``page_size`` is not positive.
        """
        if page_size <= 0:
            raise ValueError(f"page_size must be positive, got {page_size}")
        base_params = dict(params or {})
        count_query = text(f"SELECT COUNT(*) FROM ({query}) AS _count_subq")
        page_query = text(f"{query} LIMIT :limit OFFSET :offset")

        result = self.fetch_one(count_query, base_params)
        total_rows = result[0] if result is not None else 0
        if total_rows == 0:
            logger.debug("Query returned 0 rows")
            return pd.DataFrame()
        logger.debug(f"Loading {total_rows} rows with page_size={page_size}")

        # NOTE: without a deterministic ORDER BY in ``query``, MySQL may return
        # overlapping or missing rows across pages.
        frames = []
        for offset in range(0, total_rows, page_size):
            page_params = {**base_params, "limit": page_size, "offset": offset}
            df = pd.DataFrame(self.fetch_all(page_query, page_params))
            if df.empty:
                break
            frames.append(df)
        if not frames:
            return pd.DataFrame()
        # Concatenate once at the end; concatenating inside the loop re-copies every
        # previously loaded row on each page (quadratic in the number of pages).
        data = pd.concat(frames, ignore_index=True)
        logger.debug(f"Loaded {len(data)} rows in {len(frames)} pages")
        return data

    def fetch_all(self, query, params=None):
        """Execute ``query`` and return all result rows."""
        with self.get_session() as session:
            results = session.execute(query, params or {}).fetchall()
        return results

    def fetch_one(self, query, params=None):
        """Execute ``query`` and return the first row, or ``None`` if there is none."""
        with self.get_session() as session:
            result = session.execute(query, params or {}).fetchone()
        return result

    def insert_data(self, table_name, data_dict):
        """Insert one row into ``table_name``; returns the affected row count (0 if empty)."""
        if not data_dict:
            return 0
        columns = ", ".join(data_dict.keys())
        values = ", ".join([f":{key}" for key in data_dict.keys()])
        query = text(f"INSERT INTO {table_name} ({columns}) VALUES ({values})")
        with self.get_session() as session:
            result = session.execute(query, data_dict)
            logger.info(f"Inserted 1 row into {table_name}")
            return result.rowcount

    def update_data(self, table_name, update_dict, conditions, condition_params=None):
        """Update rows of ``table_name`` matching all ``conditions`` (ANDed together).

        Args:
            table_name: Target table (trusted identifier).
            update_dict: Column -> new value.
            conditions: SQL condition strings, e.g. ``["id = :id"]``; an empty list
                updates every row.
            condition_params: Bind values referenced by ``conditions``.

        Returns:
            The number of affected rows (0 if ``update_dict`` is empty).
        """
        if not update_dict:
            return 0
        # SET values are bound under a private prefix so a condition parameter of the
        # same name (e.g. "SET status = ... WHERE status = :status") cannot silently
        # replace the new value.
        set_clause = ", ".join(f"{key} = :_set_{key}" for key in update_dict)
        if len(conditions) > 0:
            where_clause = f"WHERE {' AND '.join(conditions)}"
        else:
            where_clause = ""
            logger.warning(f"update_data on {table_name} without conditions updates every row")
        query = text(f"UPDATE {table_name} SET {set_clause} {where_clause}")
        # Unprefixed update values remain bindable from ``conditions``, as before.
        params = dict(update_dict)
        if condition_params:
            params.update(condition_params)
        params.update({f"_set_{key}": value for key, value in update_dict.items()})
        with self.get_session() as session:
            result = session.execute(query, params)
            logger.info(f"Updated {result.rowcount} rows in {table_name}")
            return result.rowcount

    def execute_query(self, query, params=None):
        """Execute a SQL statement and commit; returns nothing."""
        with self.get_session() as session:
            session.execute(query, params or {})

    def check_connection(self) -> bool:
        """Return True if ``SELECT 1`` succeeds, False otherwise (failure is logged)."""
        try:
            self.fetch_one(text("SELECT 1"), {})
            return True
        except Exception:
            logger.warning("MySQL health check failed", exc_info=True)
            return False
[ ] Step 2: 验证导入
Run: cd /home/yangzeyu/project/BrandCultivation && python -c "from database.db.mysql import MySqlDatabaseHelper; print('OK')"
Expected: OK
[ ] Step 3: Commit
git add database/db/mysql.py
git commit -m "refactor(database): add logging, session context manager, env-based config"
### database/db/redis_db.py — 日志、配置来源、连接池

**Files:**
Modify: database/db/redis_db.py
[ ] Step 1: 重写 redis_db.py
替换整个文件内容为:
import redis
from core import get_logger, settings, DatabaseException
logger = get_logger("database.redis")


class RedisDatabaseHelper:
    """Process-wide Redis client built on one shared connection pool.

    ``__new__`` always returns the same instance; ``__init__`` connects until the
    first successful attempt and is a no-op afterwards. The client is ``self.redis``.
    """

    _instance = None

    def __new__(cls):
        if not cls._instance:
            cls._instance = super(RedisDatabaseHelper, cls).__new__(cls)
            cls._instance._initialized = False
        return cls._instance

    def __init__(self):
        if self._initialized:
            return
        try:
            pool = redis.ConnectionPool(
                host=settings.redis_host,
                port=settings.redis_port,
                password=settings.redis_password,
                db=settings.redis_db,
                decode_responses=True,
                max_connections=50,
                # Bound the TCP connect so an unreachable server fails within seconds
                # (e.g. during /health) instead of after the OS connect timeout.
                socket_connect_timeout=5,
            )
            self.redis = redis.StrictRedis(connection_pool=pool)
            # Force a round trip so misconfiguration surfaces here, not on first use.
            self.redis.ping()
            logger.info("Redis connection established", extra={"extra_data": {"host": settings.redis_host, "db": settings.redis_db}})
        except redis.RedisError as e:
            # RedisError is redis-py's base exception: it also covers TimeoutError,
            # which is not a ConnectionError subclass, so every connection failure
            # is reported as a DatabaseException.
            logger.error("Failed to connect to Redis", exc_info=True)
            raise DatabaseException(message="Redis连接失败", detail=str(e)) from e
        self._initialized = True

    def check_connection(self) -> bool:
        """Return True if Redis answers PING, False otherwise (failure is logged)."""
        try:
            self.redis.ping()
            return True
        except Exception:
            logger.warning("Redis health check failed", exc_info=True)
            return False
[ ] Step 2: 验证导入
Run: cd /home/yangzeyu/project/BrandCultivation && python -c "from database.db.redis_db import RedisDatabaseHelper; print('OK')"
Expected: OK
[ ] Step 3: Commit
git add database/db/redis_db.py
git commit -m "refactor(database): redis with logging, connection pool, env config"
### database/dao/mysql_dao.py — 日志和异常处理

**Files:**
Modify: database/dao/mysql_dao.py
[ ] Step 1: 添加日志和异常处理
在文件顶部添加 logger,在每个方法中添加日志记录。修复 get_report_file_id 的 None 处理。
关键改动:
- 添加 from core import get_logger 和 logger = get_logger("database.dao")
- 使用 logger.info(...) 记录调用参数
- get_report_file_id 中 result 为 None 时返回空 DataFrame 而非抛异常
- 移除 data_preprocess 方法(不属于 DAO 层;移除前先确认没有其他调用方)
[ ] Step 2: 验证导入
Run: cd /home/yangzeyu/project/BrandCultivation && python -c "from database.dao.mysql_dao import MySqlDao; print('OK')"
Expected: OK
[ ] Step 3: Commit
git add database/dao/mysql_dao.py
git commit -m "refactor(dao): add logging, fix get_report_file_id null handling"
### run_api.py — 注册中间件、健康检查、异常处理器

**Files:**
Modify: run_api.py
[ ] Step 1: 重写 run_api.py
from api import recommend_router, report_router, eval_report_router
from core import get_logger, AppException, RequestLoggingMiddleware, get_request_id
from core.exceptions import DatabaseException
from database.db.mysql import MySqlDatabaseHelper
from database.db.redis_db import RedisDatabaseHelper
from fastapi import FastAPI, Request, status
from fastapi.exceptions import RequestValidationError
from fastapi.responses import JSONResponse
import uvicorn
logger = get_logger("app")
app = FastAPI()
# RequestLoggingMiddleware assigns a request_id to every request; the exception
# handlers in this module include it in their error payloads via get_request_id().
app.add_middleware(RequestLoggingMiddleware)
@app.exception_handler(RequestValidationError)
async def validation_exception_handler(request: Request, exc: RequestValidationError):
    """Convert request-validation failures into the project's 400 JSON envelope."""
    # Imported locally so this handler is self-contained. jsonable_encoder converts
    # values JSONResponse cannot serialize directly (e.g. exception objects inside
    # Pydantic error ``ctx``, or non-dict request bodies); without it, building
    # the response itself would fail and the client would get a 500 instead of a 400.
    from fastapi.encoders import jsonable_encoder

    errors = exc.errors()
    logger.warning(f"Validation error: {errors}")
    return JSONResponse(
        status_code=status.HTTP_400_BAD_REQUEST,
        content={
            "code": 400,
            "msg": "请求参数错误",
            "data": jsonable_encoder({"detail": errors, "body": exc.body}),
            "request_id": get_request_id(),
        },
    )
@app.exception_handler(AppException)
async def app_exception_handler(request: Request, exc: AppException):
    """Render an AppException as the project's JSON error envelope."""
    # AppException.code doubles as the HTTP status. A value outside 400-599 (e.g. a
    # business-specific code) would be an invalid error status, so fall back to 500
    # while still reporting the original code in the body.
    if isinstance(exc.code, int) and 400 <= exc.code <= 599:
        http_status = exc.code
    else:
        http_status = 500
    # Client errors (4xx, e.g. ValidationException) are not server faults.
    log = logger.warning if http_status < 500 else logger.error
    log(f"AppException: {exc.message}", extra={"extra_data": {"detail": exc.detail}})
    return JSONResponse(
        status_code=http_status,
        content={
            "code": exc.code,
            "msg": exc.message,
            "data": {"detail": exc.detail},
            "request_id": get_request_id(),
        },
    )
@app.exception_handler(Exception)
async def unhandled_exception_handler(request: Request, exc: Exception):
    """Last-resort handler: log the traceback and return a generic 500 envelope."""
    logger.error("Unhandled exception", exc_info=True)
    payload = {
        "code": 500,
        "msg": "服务器内部错误",
        "data": None,
        "request_id": get_request_id(),
    }
    return JSONResponse(status_code=500, content=payload)
@app.get("/health")
def health_check():
    """Health-check endpoint reporting MySQL and Redis connectivity.

    Declared with plain ``def`` on purpose: both probes make blocking network calls,
    and FastAPI runs non-async endpoints in its threadpool so they do not stall the
    event loop. The HTTP status is always 200; ``status`` is "healthy" only when both
    backends respond, otherwise "degraded".
    """
    mysql_ok = False
    redis_ok = False
    try:
        mysql_ok = MySqlDatabaseHelper().check_connection()
    except Exception:
        # Constructing the helper may raise (e.g. DatabaseException); report "error"
        # but keep the cause in the logs instead of discarding it.
        logger.warning("Health check: MySQL unavailable", exc_info=True)
    try:
        redis_ok = RedisDatabaseHelper().check_connection()
    except Exception:
        logger.warning("Health check: Redis unavailable", exc_info=True)
    healthy = mysql_ok and redis_ok
    return {
        "status": "healthy" if healthy else "degraded",
        "mysql": "ok" if mysql_ok else "error",
        "redis": "ok" if redis_ok else "error",
    }
# All business routers are mounted under this versioned prefix; /health stays at the root.
url_prefix = "/brandcultivation/api/v1"
app.include_router(recommend_router, prefix=url_prefix)
app.include_router(report_router, prefix=url_prefix)
app.include_router(eval_report_router, prefix=url_prefix)
# Direct execution starts uvicorn on all interfaces, port 7960.
if __name__ == "__main__":
    logger.info("Starting BrandCultivation API server on port 7960")
    uvicorn.run(app, host="0.0.0.0", port=7960)
[ ] Step 2: 验证语法
Run: cd /home/yangzeyu/project/BrandCultivation && python -c "import ast; ast.parse(open('run_api.py').read()); print('Syntax OK')"
Expected: Syntax OK
[ ] Step 3: Commit
git add run_api.py
git commit -m "refactor(api): add middleware, health check, global exception handlers"
### api/recommend.py — 日志和错误处理

**Files:**
Modify: api/recommend.py
[ ] Step 1: 添加日志和错误处理
关键改动:
- 添加 from core import get_logger 和 logger = get_logger("api.recommend")
- generate_and_upload_report 加入 try/except + logger.error
- 推荐过程添加关键日志(开始、召回数量、完成)
from database import MySqlDao
from fastapi import APIRouter, BackgroundTasks, HTTPException, status
from .request_body import RecommendRequest
from core import get_logger
from models import Recommend
import os
from utils import FileStreamUtils, ReportUtils
logger = get_logger("api.recommend")
# NOTE(review): MySqlDao() is constructed at import time, so importing this module
# presumably touches the database layer — confirm against MySqlDao.__init__.
dao = MySqlDao()
router = APIRouter()
def _is_safe_path_segment(value) -> bool:
    """Return True if ``value`` can be used as exactly one directory name.

    Rejects "", "." and "..", anything containing a path separator, and NUL bytes
    (which make ``os.path`` calls raise).
    """
    text_value = str(value)
    return (
        text_value not in ("", ".", "..")
        and "\x00" not in text_value
        and os.path.basename(text_value) == text_value
    )


@router.post("/recommend")
def recommend(request: RecommendRequest, backgroundTasks: BackgroundTasks):
    """Recommendation endpoint: score customers for a product and assign delivery counts.

    Declared with plain ``def`` (not ``async def``) because every step below — DAO
    queries and model inference — is a blocking call; FastAPI runs plain-``def``
    endpoints in its threadpool instead of on the event loop.

    Raises:
        HTTPException: 400 if city_uuid/product_code is not a single path segment;
            404 if no trained GBDT-LR model exists for the city.
    """
    logger.info(f"Recommend request: city={request.city_uuid}, product={request.product_code}, recall={request.recall_cust_count}")
    # city_uuid and product_code come from the client and are joined into filesystem
    # paths (model weights here, report output in generate_and_upload_report);
    # reject anything that could escape the intended directory.
    if not (_is_safe_path_segment(request.city_uuid) and _is_safe_path_segment(request.product_code)):
        logger.warning(f"Rejected unsafe path segment: city={request.city_uuid!r}, product={request.product_code!r}")
        raise HTTPException(
            status_code=status.HTTP_400_BAD_REQUEST,
            detail="请求参数非法",
        )
    gbdtlr_model_path = os.path.join("./models/rank/weights", request.city_uuid, "gbdtlr_model.pkl")
    if not os.path.exists(gbdtlr_model_path):
        logger.warning(f"Model not found: {gbdtlr_model_path}")
        raise HTTPException(
            status_code=status.HTTP_404_NOT_FOUND,
            detail="该城市的模型未训练,请先进行训练",
        )

    recommend_model = Recommend(request.city_uuid)
    order_products = dao.get_product_from_order(request.city_uuid)
    # Guard against a result without a product_code column (e.g. an empty frame):
    # treat it as "no order history" instead of failing with KeyError.
    has_order_history = (
        "product_code" in order_products.columns
        and request.product_code in order_products["product_code"].unique().tolist()
    )
    if has_order_history:
        logger.info(f"Using GBDT-LR model for existing product {request.product_code}")
        recommend_list = recommend_model.get_recommend_list_by_gbdtlr(request.product_code, recall_count=request.recall_cust_count)
    else:
        logger.info(f"Using Item2Vec model for new product {request.product_code}")
        recommend_list = recommend_model.get_recommend_list_by_item2vec(request.product_code, recall_count=request.recall_cust_count)

    recommend_data = recommend_model.get_recommend_and_delivery(recommend_list, delivery_count=request.delivery_count)
    request_data = [
        {
            "id": position,
            "cust_code": item["cust_code"],
            "recommend_score": item["recommend_score"],
            "delivery_count": item["delivery_count"],
        }
        for position, item in enumerate(recommend_data, start=1)
    ]
    logger.info(f"Recommend completed: {len(request_data)} customers recommended")
    backgroundTasks.add_task(generate_and_upload_report, request)
    return {"code": 200, "msg": "success", "data": {"recommendationInfo": request_data}}
def generate_and_upload_report(request: RecommendRequest):
    """Background job: build the report workbooks, upload them, then record the file ids.

    Any exception is logged with its traceback and swallowed, because nothing is
    waiting on a background task's result.
    """
    city, product = request.city_uuid, request.product_code
    logger.info(f"Background task started: generating report for {city}/{product}")
    try:
        reporter = ReportUtils(city, product)
        reporter.generate_all_data(request.recall_cust_count, request.delivery_count)

        report_names = ["卷烟信息表", "品规商户特征关系表", "相似卷烟表", "商户售卖推荐表"]
        uploaded_ids = FileStreamUtils.upload_files(
            os.path.join("./data/reports", city, product),
            report_names,
        )
        if uploaded_ids is None:
            logger.error(f"Report upload failed for {city}/{product}")
            return

        dao.insert_report(
            {
                "cultivacation_id": request.cultivacation_id,
                "city_uuid": city,
                "limit_cycle_name": request.limit_cycle_name,
                "product_code": product,
                "product_info_table": uploaded_ids.get("卷烟信息表"),
                "relation_table": uploaded_ids.get("品规商户特征关系表"),
                "similarity_product_table": uploaded_ids.get("相似卷烟表"),
                "recommend_table": uploaded_ids.get("商户售卖推荐表"),
            }
        )
        logger.info(f"Background task completed: report uploaded for {city}/{product}")
    except Exception as e:
        logger.exception(f"Background task failed: {e}")
[ ] Step 2: 验证语法
Run: cd /home/yangzeyu/project/BrandCultivation && python -c "import ast; ast.parse(open('api/recommend.py').read()); print('Syntax OK')"
Expected: Syntax OK
[ ] Step 3: Commit
git add api/recommend.py
git commit -m "refactor(api): add logging and error handling to recommend endpoint"
### api/eval_report.py 和 api/report.py — 日志

**Files:**
- Modify: api/eval_report.py
- Modify: api/report.py
[ ] Step 1: 改进 eval_report.py
添加 from core import get_logger 和 logger = get_logger("api.eval_report"),在每个步骤添加日志。
[ ] Step 2: 改进 report.py
添加 from core import get_logger 和 logger = get_logger("api.report"),在关键步骤添加日志。
[ ] Step 3: 验证语法
Run: cd /home/yangzeyu/project/BrandCultivation && python -c "import ast; ast.parse(open('api/eval_report.py').read()); ast.parse(open('api/report.py').read()); print('OK')"
Expected: OK
[ ] Step 4: Commit
git add api/eval_report.py api/report.py
git commit -m "refactor(api): add logging to eval_report and report endpoints"
### models/rank/gbdt_lr_inference.py — Bug 修复 + 日志

**Files:**
Modify: models/rank/gbdt_lr_inference.py
[ ] Step 1: 修复 get_recommend_list 方法
将 get_recommend_list 方法中的循环从:
recommend_list = []
for cust_id, score in zip(recall_list, scores):
    # BUG: two dicts are appended per customer. The {cust_id: score} entry has no
    # "recommend_score" key and has to be filtered out again below.
    recommend_list.append({cust_id: float(score)})
    recommend_list.append({"cust_code": cust_id, "recommend_score": score})
# The surviving entries carry the raw, unconverted score value.
recommend_list = sorted(
    [item for item in recommend_list if "recommend_score" in item],
    key=lambda x: x["recommend_score"],
    reverse=True
)
修改为:
# One entry per recalled customer, with the score converted to a plain float,
# ordered from highest to lowest score (ties keep their recall order).
recommend_list = sorted(
    (
        {"cust_code": cust_id, "recommend_score": float(score)}
        for cust_id, score in zip(recall_list, scores)
    ),
    key=lambda entry: entry["recommend_score"],
    reverse=True,
)
[ ] Step 2: 添加日志
在文件顶部添加 from core import get_logger 和 logger = get_logger("models.rank.gbdtlr")。
在 get_recommend_list 中添加推理耗时日志。
在 generate_shap_interance 中替换 print 为 logger。
[ ] Step 3: 验证语法
Run: cd /home/yangzeyu/project/BrandCultivation && python -c "import ast; ast.parse(open('models/rank/gbdt_lr_inference.py').read()); print('OK')"
Expected: OK
[ ] Step 4: Commit
git add models/rank/gbdt_lr_inference.py
git commit -m "fix(models): fix double-append bug in get_recommend_list, add logging"
### models/recommend.py — 日志

**Files:**
Modify: models/recommend.py
[ ] Step 1: 添加日志
添加 from core import get_logger 和 logger = get_logger("models.recommend")。
在关键方法中添加日志:
- _load_molde: 模型加载完成
- get_recal_cust: 召回数量
- get_recommend_list_by_gbdtlr: 开始/完成 + 耗时
- get_recommend_list_by_item2vec: 开始/完成 + 耗时
- get_recommend_and_delivery: 投放分配完成
[ ] Step 2: 验证语法
Run: cd /home/yangzeyu/project/BrandCultivation && python -c "import ast; ast.parse(open('models/recommend.py').read()); print('OK')"
Expected: OK
[ ] Step 3: Commit
git add models/recommend.py
git commit -m "refactor(models): add logging to recommend module"
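### models/recall/hot_recall.py、models/recall/itemCF/ItemCF.py、models/item2vec/inference.py — 替换 print 为 logger

**Files:**
- Modify: models/recall/hot_recall.py
- Modify: models/recall/itemCF/ItemCF.py
- Modify: models/item2vec/inference.py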
[ ] Step 1: 改进 hot_recall.py
替换 print("hot_recall: ...") 为 logger.info(...)。添加 from core import get_logger 和 logger = get_logger("models.recall.hot")。
[ ] Step 2: 改进 ItemCF.py
替换 print(...) 为 logger.info(...)。添加 from core import get_logger 和 logger = get_logger("models.recall.itemcf")。
[ ] Step 3: 改进 models/item2vec/inference.py
添加 from core import get_logger 和 logger = get_logger("models.item2vec")。在关键步骤添加日志。
[ ] Step 4: 验证语法
Run: cd /home/yangzeyu/project/BrandCultivation && python -c "import ast; ast.parse(open('models/recall/hot_recall.py').read()); ast.parse(open('models/recall/itemCF/ItemCF.py').read()); ast.parse(open('models/item2vec/inference.py').read()); print('OK')"
Expected: OK
[ ] Step 5: Commit
git add models/recall/hot_recall.py models/recall/itemCF/ItemCF.py models/item2vec/inference.py
git commit -m "refactor(models): replace print with logger in recall modules"
### utils/file_stream.py — 日志和错误处理

**Files:**
Modify: utils/file_stream.py
[ ] Step 1: 添加日志和改进错误处理
添加 from core import get_logger 和 logger = get_logger("utils.file_stream")。
使用 settings 获取 URL 配置
import time
from core import get_logger, settings
from core.exceptions import FileServiceException
from io import BytesIO
import os
import pandas as pd
import requests
logger = get_logger("utils.file_stream")


class FileStreamUtils:
    """Upload and download report files through the file service configured in ``settings``."""

    upload_url = settings.file_upload_url
    download_url = settings.file_download_url
    headers = {
        "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36",
        "Accept": "*/*",
    }
    # (connect, read) timeout in seconds for every file-service call. Without a
    # timeout, ``requests`` can block forever on an unresponsive server, which would
    # hang the background report task.
    timeout = (10, 120)
    # File types download_file can parse.
    supported_download_types = ("xlsx", "csv")

    @staticmethod
    def upload_files(reports_dir, files):
        """Upload ``<reports_dir>/<name>.xlsx`` for each name in ``files``.

        Returns:
            dict mapping each name to the file id returned by the service, or ``None``
            as soon as any single upload fails (the failure is logged).
        """
        files_id = {}
        for filename in files:
            file_path = os.path.join(reports_dir, f"{filename}.xlsx")
            start_time = time.perf_counter()
            try:
                with open(file_path, "rb") as f:
                    response = requests.post(
                        FileStreamUtils.upload_url,
                        headers=FileStreamUtils.headers,
                        files={"file": (os.path.basename(file_path), f)},
                        verify=True,
                        timeout=FileStreamUtils.timeout,
                    )
                duration_ms = (time.perf_counter() - start_time) * 1000
                payload = response.json()
                if not payload.get("success"):
                    logger.error(f"Upload failed for {filename}: {response.text}")
                    return None
                file_id = payload["data"]["file_info"]["fileid"]
                files_id[filename] = file_id
                logger.info(f"File uploaded: {filename} -> {file_id} ({duration_ms:.0f}ms)")
            except requests.exceptions.RequestException as e:
                logger.error(f"Upload request error for {filename}: {e}", exc_info=True)
                return None
            except Exception as e:
                logger.error(f"Upload error for {filename}: {e}", exc_info=True)
                return None
        return files_id

    @staticmethod
    def download_file(file_id, file_type="xlsx"):
        """Download a file by id from the file service and parse it into a DataFrame.

        Args:
            file_id: Id returned by the service at upload time.
            file_type: "xlsx" or "csv".

        Returns:
            pandas.DataFrame, or ``None`` on any failure, including an unsupported
            ``file_type`` (the failure is logged).
        """
        if file_type not in FileStreamUtils.supported_download_types:
            # Checked before any network call: the file could not be parsed anyway.
            logger.error(f"Download error: file_id={file_id}, error=不支持的文件类型:{file_type}")
            return None
        start_time = time.perf_counter()
        try:
            response = requests.get(
                f"{FileStreamUtils.download_url}/{file_id}",
                headers=FileStreamUtils.headers,
                verify=True,
                timeout=FileStreamUtils.timeout,
            )
            duration_ms = (time.perf_counter() - start_time) * 1000
            if response.status_code != 200:
                logger.error(f"Download failed: file_id={file_id}, status={response.status_code}")
                return None
            file_content = BytesIO(response.content)
            if file_type == "xlsx":
                data = pd.read_excel(file_content, engine="openpyxl")
            else:
                data = pd.read_csv(file_content)
            logger.info(f"File downloaded: {file_id} ({duration_ms:.0f}ms, {len(response.content)} bytes)")
            return data
        except requests.exceptions.RequestException as e:
            logger.error(f"Download request error: file_id={file_id}, error={e}", exc_info=True)
            return None
        except Exception as e:
            logger.error(f"Download error: file_id={file_id}, error={e}", exc_info=True)
            return None
[ ] Step 2: 验证语法
Run: cd /home/yangzeyu/project/BrandCultivation && python -c "import ast; ast.parse(open('utils/file_stream.py').read()); print('OK')"
Expected: OK
[ ] Step 3: Commit
git add utils/file_stream.py
git commit -m "refactor(utils): add logging and error details to file_stream"
### utils/report_utils.py、train.py 和 models/rank/data/preprocess.py — 日志

**Files:**
- Modify: utils/report_utils.py
- Modify: train.py
- Modify: models/rank/data/preprocess.py
[ ] Step 1: 改进 report_utils.py
添加 from core import get_logger 和 logger = get_logger("utils.report")。
每个 generate_* 方法添加开始/完成日志。
[ ] Step 2: 改进 train.py
替换所有 print(...) 为 logger.info(...)。添加 from core import get_logger 和 logger = get_logger("train")。
[ ] Step 3: 改进 models/rank/data/preprocess.py
替换 print("gbdr-lr: ...") 为 logger.info(...)。添加 from core import get_logger 和 logger = get_logger("models.rank.preprocess")。
[ ] Step 4: 验证语法
Run: cd /home/yangzeyu/project/BrandCultivation && python -c "import ast; ast.parse(open('utils/report_utils.py').read()); ast.parse(open('train.py').read()); ast.parse(open('models/rank/data/preprocess.py').read()); print('OK')"
Expected: OK
[ ] Step 5: Commit
git add utils/report_utils.py train.py models/rank/data/preprocess.py
git commit -m "refactor: replace print with logger in report_utils, train, preprocess"
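### 移除明文密码 — config/database_config.yaml、.env.example、config/config.py

**Files:**
- Modify: config/database_config.yaml
- Create: .env.example
- Modify: config/config.py
- Modify: .gitignore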
[ ] Step 1: 更新 database_config.yaml(移除密码)
注意:旧密码已经存在于 git 历史中,仅删除文件中的内容并不能消除泄露;完成本任务后应立即轮换 MySQL 和 Redis 的密码,新密码只通过环境变量(或不提交到仓库的 .env)提供。
mysql:
host: 'rm-t4n6rz18y4t5x47y70o.mysql.singapore.rds.aliyuncs.com'
port: 3036
db: 'brand_cultivation'
user: 'BrandCultivation'
# passwd 已移至环境变量 MYSQL_PASSWORD
redis:
host: 'r-t4nb4n9i8je7u6ogk1pd.redis.singapore.rds.aliyuncs.com'
port: 5000
db: 10
# passwd 已移至环境变量 REDIS_PASSWORD
[ ] Step 2: 创建 .env.example
# BrandCultivation 环境变量配置
# 复制此文件为 .env 并填入实际值
# MySQL
MYSQL_HOST=rm-t4n6rz18y4t5x47y70o.mysql.singapore.rds.aliyuncs.com
MYSQL_PORT=3036
MYSQL_USER=BrandCultivation
MYSQL_PASSWORD=your_mysql_password_here
MYSQL_DB=brand_cultivation
# Redis
REDIS_HOST=r-t4nb4n9i8je7u6ogk1pd.redis.singapore.rds.aliyuncs.com
REDIS_PORT=5000
REDIS_PASSWORD=your_redis_password_here
REDIS_DB=10
# Logging
LOG_LEVEL=INFO
# File Service
FILE_UPLOAD_URL=http://file-center.jcpt:8080/file/fileUpload
FILE_DOWNLOAD_URL=http://file-center.jcpt:8080/file/fileDownload
[ ] Step 3: 更新 config/config.py 为兼容层
保留旧接口以兼容未迁移的代码:
from core.config import settings
def load_config():
    """Legacy accessor: database settings in the old ``database_config.yaml`` layout.

    Values come from ``core.config.settings``, so environment variables override YAML.
    A new dict is built on every call.
    """
    return {
        "mysql": {
            "host": settings.mysql_host,
            "port": settings.mysql_port,
            "user": settings.mysql_user,
            "passwd": settings.mysql_password,
            "db": settings.mysql_db,
        },
        "redis": {
            "host": settings.redis_host,
            "port": settings.redis_port,
            "passwd": settings.redis_password,
            "db": settings.redis_db,
        },
    }


def load_model_config():
    """Legacy accessor for the parsed ``model_config.yaml``.

    Returns a deep copy: ``settings.model_config`` is a single shared dict, and a
    legacy caller mutating the result must not change it for the rest of the process.
    """
    import copy

    return copy.deepcopy(settings.model_config)


def load_service_config():
    """Legacy accessor: file-service URLs in the old ``service_config.yaml`` layout."""
    return {
        "aliyun": {
            "upload_url": settings.file_upload_url,
            "download_url": settings.file_download_url,
        }
    }
[ ] Step 4: 将 .env 加入 .gitignore
Run: cd /home/yangzeyu/project/BrandCultivation && grep -q "^\.env$" .gitignore 2>/dev/null || echo ".env" >> .gitignore
[ ] Step 5: Commit
git add config/database_config.yaml .env.example config/config.py .gitignore
git commit -m "security: remove passwords from yaml, add .env.example"
### 最终验证

**Files:** None (verification only)

[ ] Step 1: 验证所有模块可导入
Run:
cd /home/yangzeyu/project/BrandCultivation && python -c "
from core import get_logger, settings, AppException, DatabaseException, RequestLoggingMiddleware
from database.db.mysql import MySqlDatabaseHelper
from database.db.redis_db import RedisDatabaseHelper
from database.dao.mysql_dao import MySqlDao
from api.recommend import router as rec_router
from api.report import router as rep_router
from api.eval_report import router as eval_router
from models.recommend import Recommend
from utils.file_stream import FileStreamUtils
from utils.report_utils import ReportUtils
print('All imports successful')
"
Expected: All imports successful
[ ] Step 2: 验证 run_api.py 语法
Run: cd /home/yangzeyu/project/BrandCultivation && python -c "import ast; ast.parse(open('run_api.py').read()); print('App syntax OK')"
Expected: App syntax OK
[ ] Step 3: 最终 Commit(如有遗漏文件)
git status
# 如有未提交的改动,补充提交