|
|
@@ -0,0 +1,1316 @@
|
|
|
+# BrandCultivation 项目级重构实现计划
|
|
|
+
|
|
|
+> **For agentic workers:** REQUIRED SUB-SKILL: Use superpowers:subagent-driven-development (recommended) or superpowers:executing-plans to implement this plan task-by-task. Steps use checkbox (`- [ ]`) syntax for tracking.
|
|
|
+
|
|
|
+**Goal:** 为 BrandCultivation 项目添加日志系统、配置管理、异常处理、请求追踪,修复已知 bug,移除明文密码。
|
|
|
+
|
|
|
+**Architecture:** 渐进式重构 — 新增 `core/` 基础设施层,在现有模块中逐步替换 print 为 logger,添加错误处理和请求追踪。保持现有目录结构和业务逻辑不变。
|
|
|
+
|
|
|
+**Tech Stack:** Python 3.x, FastAPI, SQLAlchemy, Redis, logging (stdlib), pathlib, contextvars, uuid
|
|
|
+
|
|
|
+---
|
|
|
+
|
|
|
+## File Structure
|
|
|
+
|
|
|
+### New Files
|
|
|
+- `core/__init__.py` — 公共接口导出
|
|
|
+- `core/logging.py` — JSON 格式日志系统
|
|
|
+- `core/config.py` — 配置管理(YAML + 环境变量)
|
|
|
+- `core/exceptions.py` — 自定义异常体系
|
|
|
+- `core/middleware.py` — 请求日志和 request_id 中间件
|
|
|
+- `.env.example` — 环境变量模板
|
|
|
+
|
|
|
+### Modified Files
|
|
|
+- `config/config.py` — 废弃,改为从 core/config.py 导入
|
|
|
+- `config/database_config.yaml` — 移除密码
|
|
|
+- `database/db/mysql.py` — 日志、session context manager、配置来源
|
|
|
+- `database/db/redis_db.py` — 日志、配置来源、连接池
|
|
|
+- `database/__init__.py` — 更新导出
|
|
|
+- `database/dao/mysql_dao.py` — 日志、异常处理
|
|
|
+- `run_api.py` — 注册中间件、健康检查、异常处理器
|
|
|
+- `api/recommend.py` — 日志、错误处理
|
|
|
+- `api/eval_report.py` — 日志、错误处理
|
|
|
+- `api/report.py` — 日志
|
|
|
+- `models/rank/gbdt_lr_inference.py` — bug 修复、日志
|
|
|
+- `models/recommend.py` — 日志
|
|
|
+- `models/recall/hot_recall.py` — 替换 print
|
|
|
+- `models/recall/itemCF/ItemCF.py` — 替换 print
|
|
|
+- `models/item2vec/inference.py` — 日志
|
|
|
+- `models/rank/data/preprocess.py` — 替换 print
|
|
|
+- `utils/file_stream.py` — 日志、错误处理
|
|
|
+- `utils/report_utils.py` — 日志
|
|
|
+- `train.py` — 替换 print
|
|
|
+
|
|
|
+---
|
|
|
+
|
|
|
+### Task 1: 创建 `core/exceptions.py` — 自定义异常体系
|
|
|
+
|
|
|
+**Files:**
|
|
|
+- Create: `core/exceptions.py`
|
|
|
+
|
|
|
+- [ ] **Step 1: 创建异常定义文件**
|
|
|
+
|
|
|
+```python
|
|
|
+class AppException(Exception):
|
|
|
+ """应用异常基类"""
|
|
|
+ def __init__(self, code: int, message: str, detail: str = None):
|
|
|
+ self.code = code
|
|
|
+ self.message = message
|
|
|
+ self.detail = detail
|
|
|
+ super().__init__(message)
|
|
|
+
|
|
|
+
|
|
|
+class DatabaseException(AppException):
|
|
|
+ """数据库操作失败"""
|
|
|
+ def __init__(self, message: str = "数据库操作失败", detail: str = None):
|
|
|
+ super().__init__(code=500, message=message, detail=detail)
|
|
|
+
|
|
|
+
|
|
|
+class ModelException(AppException):
|
|
|
+ """模型推理失败"""
|
|
|
+ def __init__(self, message: str = "模型推理失败", detail: str = None):
|
|
|
+ super().__init__(code=500, message=message, detail=detail)
|
|
|
+
|
|
|
+
|
|
|
+class FileServiceException(AppException):
|
|
|
+ """文件服务失败"""
|
|
|
+ def __init__(self, message: str = "文件服务操作失败", detail: str = None):
|
|
|
+ super().__init__(code=500, message=message, detail=detail)
|
|
|
+
|
|
|
+
|
|
|
+class ValidationException(AppException):
|
|
|
+ """业务校验失败"""
|
|
|
+ def __init__(self, message: str = "参数校验失败", detail: str = None):
|
|
|
+ super().__init__(code=400, message=message, detail=detail)
|
|
|
+```
|
|
|
+
|
|
|
+- [ ] **Step 2: 验证导入**
|
|
|
+
|
|
|
+Run: `cd /home/yangzeyu/project/BrandCultivation && python -c "from core.exceptions import AppException, DatabaseException, ModelException, FileServiceException, ValidationException; print('OK')"`
|
|
|
+Expected: `OK`
|
|
|
+
|
|
|
+- [ ] **Step 3: Commit**
|
|
|
+
|
|
|
+```bash
|
|
|
+git add core/exceptions.py
|
|
|
+git commit -m "feat(core): add custom exception hierarchy"
|
|
|
+```
|
|
|
+
|
|
|
+---
|
|
|
+
|
|
|
+### Task 2: 创建 `core/config.py` — 配置管理
|
|
|
+
|
|
|
+**Files:**
|
|
|
+- Create: `core/config.py`
|
|
|
+
|
|
|
+- [ ] **Step 1: 创建配置管理模块**
|
|
|
+
|
|
|
+```python
|
|
|
+import os
|
|
|
+from pathlib import Path
|
|
|
+import yaml
|
|
|
+
|
|
|
+PROJECT_ROOT = Path(__file__).resolve().parent.parent
|
|
|
+
|
|
|
+
|
|
|
+def _get_env(key: str, default=None):
|
|
|
+ """从环境变量获取值"""
|
|
|
+ return os.environ.get(key, default)
|
|
|
+
|
|
|
+
|
|
|
+def _load_yaml(filename: str) -> dict:
|
|
|
+ """加载 YAML 配置文件"""
|
|
|
+ filepath = PROJECT_ROOT / "config" / filename
|
|
|
+ with open(filepath, encoding="utf-8") as f:
|
|
|
+ return yaml.safe_load(f) or {}
|
|
|
+
|
|
|
+
|
|
|
+class _Settings:
|
|
|
+ """配置单例,支持环境变量覆盖 YAML 默认值"""
|
|
|
+
|
|
|
+ def __init__(self):
|
|
|
+ self._db_cfg = _load_yaml("database_config.yaml")
|
|
|
+ self._model_cfg = _load_yaml("model_config.yaml")
|
|
|
+ self._service_cfg = _load_yaml("service_config.yaml")
|
|
|
+
|
|
|
+ @property
|
|
|
+ def mysql_host(self) -> str:
|
|
|
+ return _get_env("MYSQL_HOST", self._db_cfg.get("mysql", {}).get("host", "localhost"))
|
|
|
+
|
|
|
+ @property
|
|
|
+ def mysql_port(self) -> int:
|
|
|
+ return int(_get_env("MYSQL_PORT", self._db_cfg.get("mysql", {}).get("port", 3306)))
|
|
|
+
|
|
|
+ @property
|
|
|
+ def mysql_user(self) -> str:
|
|
|
+ return _get_env("MYSQL_USER", self._db_cfg.get("mysql", {}).get("user", "root"))
|
|
|
+
|
|
|
+ @property
|
|
|
+ def mysql_password(self) -> str:
|
|
|
+ return _get_env("MYSQL_PASSWORD", self._db_cfg.get("mysql", {}).get("passwd", ""))
|
|
|
+
|
|
|
+ @property
|
|
|
+ def mysql_db(self) -> str:
|
|
|
+ return _get_env("MYSQL_DB", self._db_cfg.get("mysql", {}).get("db", ""))
|
|
|
+
|
|
|
+ @property
|
|
|
+ def redis_host(self) -> str:
|
|
|
+ return _get_env("REDIS_HOST", self._db_cfg.get("redis", {}).get("host", "localhost"))
|
|
|
+
|
|
|
+ @property
|
|
|
+ def redis_port(self) -> int:
|
|
|
+ return int(_get_env("REDIS_PORT", self._db_cfg.get("redis", {}).get("port", 6379)))
|
|
|
+
|
|
|
+ @property
|
|
|
+ def redis_password(self) -> str:
|
|
|
+ return _get_env("REDIS_PASSWORD", self._db_cfg.get("redis", {}).get("passwd", ""))
|
|
|
+
|
|
|
+ @property
|
|
|
+ def redis_db(self) -> int:
|
|
|
+ return int(_get_env("REDIS_DB", self._db_cfg.get("redis", {}).get("db", 0)))
|
|
|
+
|
|
|
+ @property
|
|
|
+ def log_level(self) -> str:
|
|
|
+ return _get_env("LOG_LEVEL", "INFO").upper()
|
|
|
+
|
|
|
+ @property
|
|
|
+ def file_upload_url(self) -> str:
|
|
|
+ return _get_env("FILE_UPLOAD_URL", self._service_cfg.get("aliyun", {}).get("upload_url", ""))
|
|
|
+
|
|
|
+ @property
|
|
|
+ def file_download_url(self) -> str:
|
|
|
+ return _get_env("FILE_DOWNLOAD_URL", self._service_cfg.get("aliyun", {}).get("download_url", ""))
|
|
|
+
|
|
|
+ @property
|
|
|
+ def model_config(self) -> dict:
|
|
|
+ return self._model_cfg
|
|
|
+
|
|
|
+
|
|
|
+settings = _Settings()
|
|
|
+```
|
|
|
+
|
|
|
+- [ ] **Step 2: 验证导入**
|
|
|
+
|
|
|
+Run: `cd /home/yangzeyu/project/BrandCultivation && python -c "from core.config import settings; print(settings.mysql_host)"`
|
|
|
+Expected: 输出数据库 host 地址
|
|
|
+
|
|
|
+- [ ] **Step 3: Commit**
|
|
|
+
|
|
|
+```bash
|
|
|
+git add core/config.py
|
|
|
+git commit -m "feat(core): add config management with env var override"
|
|
|
+```
|
|
|
+
|
|
|
+---
|
|
|
+
|
|
|
+### Task 3: 创建 `core/logging.py` — 日志系统
|
|
|
+
|
|
|
+**Files:**
|
|
|
+- Create: `core/logging.py`
|
|
|
+
|
|
|
+- [ ] **Step 1: 创建日志模块**
|
|
|
+
|
|
|
+```python
|
|
|
+import logging
|
|
|
+import json
|
|
|
+import sys
|
|
|
+from contextvars import ContextVar
|
|
|
+from datetime import datetime, timezone
|
|
|
+
|
|
|
+request_id_var: ContextVar[str] = ContextVar("request_id", default="-")
|
|
|
+
|
|
|
+
|
|
|
+class JSONFormatter(logging.Formatter):
|
|
|
+ """JSON 格式日志输出"""
|
|
|
+
|
|
|
+ def format(self, record: logging.LogRecord) -> str:
|
|
|
+ log_data = {
|
|
|
+ "timestamp": datetime.now(timezone.utc).isoformat(),
|
|
|
+ "level": record.levelname,
|
|
|
+ "module": record.module,
|
|
|
+ "function": record.funcName,
|
|
|
+ "line": record.lineno,
|
|
|
+ "message": record.getMessage(),
|
|
|
+ "request_id": request_id_var.get("-"),
|
|
|
+ }
|
|
|
+ if record.exc_info and record.exc_info[0] is not None:
|
|
|
+ log_data["exception"] = self.formatException(record.exc_info)
|
|
|
+ if hasattr(record, "extra_data"):
|
|
|
+ log_data["extra"] = record.extra_data
|
|
|
+ return json.dumps(log_data, ensure_ascii=False)
|
|
|
+
|
|
|
+
|
|
|
+def get_logger(name: str) -> logging.Logger:
|
|
|
+ """获取指定名称的 logger"""
|
|
|
+ from core.config import settings
|
|
|
+
|
|
|
+ logger = logging.getLogger(name)
|
|
|
+ if not logger.handlers:
|
|
|
+ handler = logging.StreamHandler(sys.stdout)
|
|
|
+ handler.setFormatter(JSONFormatter())
|
|
|
+ logger.addHandler(handler)
|
|
|
+ logger.setLevel(getattr(logging, settings.log_level, logging.INFO))
|
|
|
+ logger.propagate = False
|
|
|
+ return logger
|
|
|
+```
|
|
|
+
|
|
|
+- [ ] **Step 2: 验证导入和基本功能**
|
|
|
+
|
|
|
+Run: `cd /home/yangzeyu/project/BrandCultivation && python -c "from core.logging import get_logger; logger = get_logger('test'); logger.info('hello')"`
|
|
|
+Expected: JSON 格式日志输出
|
|
|
+
|
|
|
+- [ ] **Step 3: Commit**
|
|
|
+
|
|
|
+```bash
|
|
|
+git add core/logging.py
|
|
|
+git commit -m "feat(core): add JSON logging system with request_id support"
|
|
|
+```
|
|
|
+
|
|
|
+---
|
|
|
+
|
|
|
+### Task 4: 创建 `core/middleware.py` — 请求中间件
|
|
|
+
|
|
|
+**Files:**
|
|
|
+- Create: `core/middleware.py`
|
|
|
+
|
|
|
+- [ ] **Step 1: 创建中间件模块**
|
|
|
+
|
|
|
+```python
|
|
|
+import time
|
|
|
+import uuid
|
|
|
+from starlette.middleware.base import BaseHTTPMiddleware
|
|
|
+from starlette.requests import Request
|
|
|
+from starlette.responses import Response
|
|
|
+from core.logging import get_logger, request_id_var
|
|
|
+
|
|
|
+logger = get_logger("middleware")
|
|
|
+
|
|
|
+
|
|
|
+def get_request_id() -> str:
|
|
|
+ """获取当前请求的 request_id"""
|
|
|
+ return request_id_var.get("-")
|
|
|
+
|
|
|
+
|
|
|
+class RequestLoggingMiddleware(BaseHTTPMiddleware):
|
|
|
+ """请求日志中间件:生成 request_id,记录请求开始/结束"""
|
|
|
+
|
|
|
+ async def dispatch(self, request: Request, call_next) -> Response:
|
|
|
+ req_id = str(uuid.uuid4())[:8]
|
|
|
+ request_id_var.set(req_id)
|
|
|
+
|
|
|
+ start_time = time.time()
|
|
|
+ client_ip = request.client.host if request.client else "unknown"
|
|
|
+
|
|
|
+ logger.info(
|
|
|
+ f"Request started: {request.method} {request.url.path}",
|
|
|
+ extra={"extra_data": {"client_ip": client_ip, "method": request.method, "path": str(request.url.path)}},
|
|
|
+ )
|
|
|
+
|
|
|
+ try:
|
|
|
+ response = await call_next(request)
|
|
|
+ except Exception as e:
|
|
|
+ duration_ms = (time.time() - start_time) * 1000
|
|
|
+ logger.error(
|
|
|
+ f"Request failed: {request.method} {request.url.path} ({duration_ms:.1f}ms)",
|
|
|
+ exc_info=True,
|
|
|
+ )
|
|
|
+ raise
|
|
|
+
|
|
|
+ duration_ms = (time.time() - start_time) * 1000
|
|
|
+ logger.info(
|
|
|
+ f"Request completed: {request.method} {request.url.path} -> {response.status_code} ({duration_ms:.1f}ms)",
|
|
|
+ extra={"extra_data": {"status_code": response.status_code, "duration_ms": round(duration_ms, 1)}},
|
|
|
+ )
|
|
|
+
|
|
|
+ response.headers["X-Request-ID"] = req_id
|
|
|
+ return response
|
|
|
+```
|
|
|
+
|
|
|
+- [ ] **Step 2: 验证导入**
|
|
|
+
|
|
|
+Run: `cd /home/yangzeyu/project/BrandCultivation && python -c "from core.middleware import RequestLoggingMiddleware, get_request_id; print('OK')"`
|
|
|
+Expected: `OK`
|
|
|
+
|
|
|
+- [ ] **Step 3: Commit**
|
|
|
+
|
|
|
+```bash
|
|
|
+git add core/middleware.py
|
|
|
+git commit -m "feat(core): add request logging middleware with request_id tracking"
|
|
|
+```
|
|
|
+
|
|
|
+---
|
|
|
+
|
|
|
+### Task 5: 创建 `core/__init__.py` — 公共接口导出
|
|
|
+
|
|
|
+**Files:**
|
|
|
+- Create: `core/__init__.py`
|
|
|
+
|
|
|
+- [ ] **Step 1: 创建 init 文件**
|
|
|
+
|
|
|
+```python
|
|
|
+from core.logging import get_logger, request_id_var
|
|
|
+from core.config import settings
|
|
|
+from core.exceptions import (
|
|
|
+ AppException,
|
|
|
+ DatabaseException,
|
|
|
+ ModelException,
|
|
|
+ FileServiceException,
|
|
|
+ ValidationException,
|
|
|
+)
|
|
|
+from core.middleware import RequestLoggingMiddleware, get_request_id
|
|
|
+
|
|
|
+__all__ = [
|
|
|
+ "get_logger",
|
|
|
+ "request_id_var",
|
|
|
+ "settings",
|
|
|
+ "AppException",
|
|
|
+ "DatabaseException",
|
|
|
+ "ModelException",
|
|
|
+ "FileServiceException",
|
|
|
+ "ValidationException",
|
|
|
+ "RequestLoggingMiddleware",
|
|
|
+ "get_request_id",
|
|
|
+]
|
|
|
+```
|
|
|
+
|
|
|
+- [ ] **Step 2: 验证完整导入**
|
|
|
+
|
|
|
+Run: `cd /home/yangzeyu/project/BrandCultivation && python -c "from core import get_logger, settings, AppException, DatabaseException, RequestLoggingMiddleware, get_request_id; print('All imports OK')"`
|
|
|
+Expected: `All imports OK`
|
|
|
+
|
|
|
+- [ ] **Step 3: Commit**
|
|
|
+
|
|
|
+```bash
|
|
|
+git add core/__init__.py
|
|
|
+git commit -m "feat(core): add public interface exports"
|
|
|
+```
|
|
|
+
|
|
|
+---
|
|
|
+
|
|
|
+### Task 6: 改进 `database/db/mysql.py` — 日志、session 管理、配置来源
|
|
|
+
|
|
|
+**Files:**
|
|
|
+- Modify: `database/db/mysql.py`
|
|
|
+
|
|
|
+- [ ] **Step 1: 重写 mysql.py**
|
|
|
+
|
|
|
+替换整个文件内容为:
|
|
|
+
|
|
|
+```python
|
|
|
+from contextlib import contextmanager
|
|
|
+from core import get_logger, settings, DatabaseException
|
|
|
+import pandas as pd
|
|
|
+from sqlalchemy import create_engine, text
|
|
|
+from sqlalchemy.orm import sessionmaker
|
|
|
+from sqlalchemy.exc import SQLAlchemyError
|
|
|
+
|
|
|
+logger = get_logger("database.mysql")
|
|
|
+
|
|
|
+
|
|
|
+class MySqlDatabaseHelper:
|
|
|
+ _instance = None
|
|
|
+
|
|
|
+ def __new__(cls):
|
|
|
+ if not cls._instance:
|
|
|
+ cls._instance = super(MySqlDatabaseHelper, cls).__new__(cls)
|
|
|
+ cls._instance._initialized = False
|
|
|
+ return cls._instance
|
|
|
+
|
|
|
+ def __init__(self):
|
|
|
+ if self._initialized:
|
|
|
+ return
|
|
|
+ self._connect_database()
|
|
|
+ self._initialized = True
|
|
|
+
|
|
|
+ def _connect_database(self):
|
|
|
+ try:
|
|
|
+ conn_str = (
|
|
|
+ f"mysql+pymysql://{settings.mysql_user}:{settings.mysql_password}"
|
|
|
+ f"@{settings.mysql_host}:{settings.mysql_port}/{settings.mysql_db}"
|
|
|
+ )
|
|
|
+ self.engine = create_engine(
|
|
|
+ conn_str,
|
|
|
+ pool_size=20,
|
|
|
+ max_overflow=30,
|
|
|
+ pool_recycle=1800,
|
|
|
+ pool_pre_ping=True,
|
|
|
+ isolation_level="READ COMMITTED",
|
|
|
+ )
|
|
|
+ self._DBSession = sessionmaker(bind=self.engine)
|
|
|
+ logger.info("MySQL connection pool created", extra={"extra_data": {"host": settings.mysql_host, "db": settings.mysql_db}})
|
|
|
+ except Exception as e:
|
|
|
+ logger.error("Failed to create MySQL connection", exc_info=True)
|
|
|
+ raise DatabaseException(message="数据库连接失败", detail=str(e))
|
|
|
+
|
|
|
+ @contextmanager
|
|
|
+ def get_session(self):
|
|
|
+ session = self._DBSession()
|
|
|
+ try:
|
|
|
+ yield session
|
|
|
+ session.commit()
|
|
|
+ except SQLAlchemyError as e:
|
|
|
+ session.rollback()
|
|
|
+ logger.error("Database operation failed", exc_info=True)
|
|
|
+ raise DatabaseException(message="数据库操作失败", detail=str(e))
|
|
|
+ finally:
|
|
|
+ session.close()
|
|
|
+
|
|
|
+ def load_data_with_page(self, query, params, page_size=100000):
|
|
|
+ """分页查询数据"""
|
|
|
+ count_query = text(f"SELECT COUNT(*) FROM ({query}) AS _count_subq")
|
|
|
+ query += " LIMIT :limit OFFSET :offset"
|
|
|
+ query = text(query)
|
|
|
+
|
|
|
+ result = self.fetch_one(count_query, params)
|
|
|
+ total_rows = result[0] if result is not None else 0
|
|
|
+
|
|
|
+ if total_rows == 0:
|
|
|
+ logger.debug("Query returned 0 rows")
|
|
|
+ return pd.DataFrame()
|
|
|
+
|
|
|
+ logger.debug(f"Loading {total_rows} rows with page_size={page_size}")
|
|
|
+ data = pd.DataFrame()
|
|
|
+ page = 1
|
|
|
+ while True:
|
|
|
+ offset = (page - 1) * page_size
|
|
|
+ page_params = dict(params)
|
|
|
+ page_params["limit"] = page_size
|
|
|
+ page_params["offset"] = offset
|
|
|
+
|
|
|
+ df = pd.DataFrame(self.fetch_all(query, page_params))
|
|
|
+ if df.empty:
|
|
|
+ break
|
|
|
+ data = pd.concat([data, df], ignore_index=True)
|
|
|
+ page += 1
|
|
|
+
|
|
|
+ logger.debug(f"Loaded {len(data)} rows in {page - 1} pages")
|
|
|
+ return data
|
|
|
+
|
|
|
+ def fetch_all(self, query, params=None):
|
|
|
+ """执行SQL查询并返回所有结果"""
|
|
|
+ with self.get_session() as session:
|
|
|
+ results = session.execute(query, params or {}).fetchall()
|
|
|
+ return results
|
|
|
+
|
|
|
+ def fetch_one(self, query, params=None):
|
|
|
+ """执行SQL查询并返回单条结果"""
|
|
|
+ with self.get_session() as session:
|
|
|
+ result = session.execute(query, params or {}).fetchone()
|
|
|
+ return result
|
|
|
+
|
|
|
+ def insert_data(self, table_name, data_dict):
|
|
|
+ """插入单条数据到指定表"""
|
|
|
+ if not data_dict:
|
|
|
+ return 0
|
|
|
+
|
|
|
+ columns = ", ".join(data_dict.keys())
|
|
|
+ values = ", ".join([f":{key}" for key in data_dict.keys()])
|
|
|
+ query = text(f"INSERT INTO {table_name} ({columns}) VALUES ({values})")
|
|
|
+
|
|
|
+ with self.get_session() as session:
|
|
|
+ result = session.execute(query, data_dict)
|
|
|
+ logger.info(f"Inserted 1 row into {table_name}")
|
|
|
+ return result.rowcount
|
|
|
+
|
|
|
+ def update_data(self, table_name, update_dict, conditions, condition_params=None):
|
|
|
+ """更新表中符合条件的数据"""
|
|
|
+ if not update_dict:
|
|
|
+ return 0
|
|
|
+
|
|
|
+ set_clause = ", ".join([f"{key} = :{key}" for key in update_dict.keys()])
|
|
|
+
|
|
|
+ if len(conditions) == 1:
|
|
|
+ where_clause = f"WHERE {conditions[0]}"
|
|
|
+ elif len(conditions) > 1:
|
|
|
+ where_clause = f"WHERE {' AND '.join(conditions)}"
|
|
|
+ else:
|
|
|
+ where_clause = ""
|
|
|
+
|
|
|
+ query = text(f"UPDATE {table_name} SET {set_clause} {where_clause}")
|
|
|
+
|
|
|
+ params = update_dict.copy()
|
|
|
+ if condition_params:
|
|
|
+ params.update(condition_params)
|
|
|
+
|
|
|
+ with self.get_session() as session:
|
|
|
+ result = session.execute(query, params)
|
|
|
+ logger.info(f"Updated {result.rowcount} rows in {table_name}")
|
|
|
+ return result.rowcount
|
|
|
+
|
|
|
+ def execute_query(self, query, params=None):
|
|
|
+ """执行SQL语句"""
|
|
|
+ with self.get_session() as session:
|
|
|
+ session.execute(query, params or {})
|
|
|
+
|
|
|
+ def check_connection(self) -> bool:
|
|
|
+ """检查数据库连接是否正常"""
|
|
|
+ try:
|
|
|
+ self.fetch_one(text("SELECT 1"), {})
|
|
|
+ return True
|
|
|
+ except Exception:
|
|
|
+ return False
|
|
|
+```
|
|
|
+
|
|
|
+- [ ] **Step 2: 验证导入**
|
|
|
+
|
|
|
+Run: `cd /home/yangzeyu/project/BrandCultivation && python -c "from database.db.mysql import MySqlDatabaseHelper; print('OK')"`
|
|
|
+Expected: `OK`
|
|
|
+
|
|
|
+- [ ] **Step 3: Commit**
|
|
|
+
|
|
|
+```bash
|
|
|
+git add database/db/mysql.py
|
|
|
+git commit -m "refactor(database): add logging, session context manager, env-based config"
|
|
|
+```
|
|
|
+
|
|
|
+---
|
|
|
+
|
|
|
+### Task 7: 改进 `database/db/redis_db.py` — 日志、配置来源、连接池
|
|
|
+
|
|
|
+**Files:**
|
|
|
+- Modify: `database/db/redis_db.py`
|
|
|
+
|
|
|
+- [ ] **Step 1: 重写 redis_db.py**
|
|
|
+
|
|
|
+替换整个文件内容为:
|
|
|
+
|
|
|
+```python
|
|
|
+import redis
|
|
|
+from core import get_logger, settings, DatabaseException
|
|
|
+
|
|
|
+logger = get_logger("database.redis")
|
|
|
+
|
|
|
+
|
|
|
+class RedisDatabaseHelper:
|
|
|
+ _instance = None
|
|
|
+
|
|
|
+ def __new__(cls):
|
|
|
+ if not cls._instance:
|
|
|
+ cls._instance = super(RedisDatabaseHelper, cls).__new__(cls)
|
|
|
+ cls._instance._initialized = False
|
|
|
+ return cls._instance
|
|
|
+
|
|
|
+ def __init__(self):
|
|
|
+ if self._initialized:
|
|
|
+ return
|
|
|
+ try:
|
|
|
+ pool = redis.ConnectionPool(
|
|
|
+ host=settings.redis_host,
|
|
|
+ port=settings.redis_port,
|
|
|
+ password=settings.redis_password,
|
|
|
+ db=settings.redis_db,
|
|
|
+ decode_responses=True,
|
|
|
+ max_connections=50,
|
|
|
+ )
|
|
|
+ self.redis = redis.StrictRedis(connection_pool=pool)
|
|
|
+ self.redis.ping()
|
|
|
+ logger.info("Redis connection established", extra={"extra_data": {"host": settings.redis_host, "db": settings.redis_db}})
|
|
|
+ except redis.ConnectionError as e:
|
|
|
+ logger.error("Failed to connect to Redis", exc_info=True)
|
|
|
+ raise DatabaseException(message="Redis连接失败", detail=str(e))
|
|
|
+ self._initialized = True
|
|
|
+
|
|
|
+ def check_connection(self) -> bool:
|
|
|
+ """检查 Redis 连接是否正常"""
|
|
|
+ try:
|
|
|
+ self.redis.ping()
|
|
|
+ return True
|
|
|
+ except Exception:
|
|
|
+ return False
|
|
|
+```
|
|
|
+
|
|
|
+- [ ] **Step 2: 验证导入**
|
|
|
+
|
|
|
+Run: `cd /home/yangzeyu/project/BrandCultivation && python -c "from database.db.redis_db import RedisDatabaseHelper; print('OK')"`
|
|
|
+Expected: `OK`
|
|
|
+
|
|
|
+- [ ] **Step 3: Commit**
|
|
|
+
|
|
|
+```bash
|
|
|
+git add database/db/redis_db.py
|
|
|
+git commit -m "refactor(database): redis with logging, connection pool, env config"
|
|
|
+```
|
|
|
+
|
|
|
+---
|
|
|
+
|
|
|
+### Task 8: 改进 `database/dao/mysql_dao.py` — 日志和异常处理
|
|
|
+
|
|
|
+**Files:**
|
|
|
+- Modify: `database/dao/mysql_dao.py`
|
|
|
+
|
|
|
+- [ ] **Step 1: 添加日志和异常处理**
|
|
|
+
|
|
|
+在文件顶部添加 logger,在每个方法中添加日志记录。修复 `get_report_file_id` 的 None 处理。
|
|
|
+
|
|
|
+关键改动:
|
|
|
+- 文件顶部添加: `from core import get_logger` 和 `logger = get_logger("database.dao")`
|
|
|
+- 每个 public 方法入口添加 `logger.info(...)` 记录调用参数
|
|
|
+- `get_report_file_id` 中 result 为 None 时返回空 DataFrame 而非抛异常
|
|
|
+- 移除 `data_preprocess` 方法(不属于 DAO 层)
|
|
|
+
|
|
|
+- [ ] **Step 2: 验证导入**
|
|
|
+
|
|
|
+Run: `cd /home/yangzeyu/project/BrandCultivation && python -c "from database.dao.mysql_dao import MySqlDao; print('OK')"`
|
|
|
+Expected: `OK`
|
|
|
+
|
|
|
+- [ ] **Step 3: Commit**
|
|
|
+
|
|
|
+```bash
|
|
|
+git add database/dao/mysql_dao.py
|
|
|
+git commit -m "refactor(dao): add logging, fix get_report_file_id null handling"
|
|
|
+```
|
|
|
+
|
|
|
+---
|
|
|
+
|
|
|
+### Task 9: 改进 `run_api.py` — 注册中间件、健康检查、异常处理器
|
|
|
+
|
|
|
+**Files:**
|
|
|
+- Modify: `run_api.py`
|
|
|
+
|
|
|
+- [ ] **Step 1: 重写 run_api.py**
|
|
|
+
|
|
|
+```python
|
|
|
+from api import recommend_router, report_router, eval_report_router
|
|
|
+from core import get_logger, AppException, RequestLoggingMiddleware, get_request_id
|
|
|
+from core.exceptions import DatabaseException
|
|
|
+from database.db.mysql import MySqlDatabaseHelper
|
|
|
+from database.db.redis_db import RedisDatabaseHelper
|
|
|
+from fastapi import FastAPI, Request, status
|
|
|
+from fastapi.exceptions import RequestValidationError
|
|
|
+from fastapi.responses import JSONResponse
|
|
|
+
|
|
|
+import uvicorn
|
|
|
+
|
|
|
+logger = get_logger("app")
|
|
|
+
|
|
|
+app = FastAPI()
|
|
|
+
|
|
|
+app.add_middleware(RequestLoggingMiddleware)
|
|
|
+
|
|
|
+
|
|
|
+@app.exception_handler(RequestValidationError)
|
|
|
+async def validation_exception_handler(request: Request, exc: RequestValidationError):
|
|
|
+ logger.warning(f"Validation error: {exc.errors()}")
|
|
|
+ return JSONResponse(
|
|
|
+ status_code=status.HTTP_400_BAD_REQUEST,
|
|
|
+ content={
|
|
|
+ "code": 400,
|
|
|
+ "msg": "请求参数错误",
|
|
|
+ "data": {"detail": exc.errors(), "body": exc.body},
|
|
|
+ "request_id": get_request_id(),
|
|
|
+ },
|
|
|
+ )
|
|
|
+
|
|
|
+
|
|
|
+@app.exception_handler(AppException)
|
|
|
+async def app_exception_handler(request: Request, exc: AppException):
|
|
|
+ logger.error(f"AppException: {exc.message}", extra={"extra_data": {"detail": exc.detail}})
|
|
|
+ return JSONResponse(
|
|
|
+ status_code=exc.code,
|
|
|
+ content={
|
|
|
+ "code": exc.code,
|
|
|
+ "msg": exc.message,
|
|
|
+ "data": {"detail": exc.detail},
|
|
|
+ "request_id": get_request_id(),
|
|
|
+ },
|
|
|
+ )
|
|
|
+
|
|
|
+
|
|
|
+@app.exception_handler(Exception)
|
|
|
+async def unhandled_exception_handler(request: Request, exc: Exception):
|
|
|
+ logger.error("Unhandled exception", exc_info=True)
|
|
|
+ return JSONResponse(
|
|
|
+ status_code=500,
|
|
|
+ content={
|
|
|
+ "code": 500,
|
|
|
+ "msg": "服务器内部错误",
|
|
|
+ "data": None,
|
|
|
+ "request_id": get_request_id(),
|
|
|
+ },
|
|
|
+ )
|
|
|
+
|
|
|
+
|
|
|
+@app.get("/health")
|
|
|
+async def health_check():
|
|
|
+ """健康检查端点"""
|
|
|
+ mysql_ok = False
|
|
|
+ redis_ok = False
|
|
|
+ try:
|
|
|
+ mysql_ok = MySqlDatabaseHelper().check_connection()
|
|
|
+ except Exception:
|
|
|
+ pass
|
|
|
+ try:
|
|
|
+ redis_ok = RedisDatabaseHelper().check_connection()
|
|
|
+ except Exception:
|
|
|
+ pass
|
|
|
+
|
|
|
+ healthy = mysql_ok and redis_ok
|
|
|
+ return {
|
|
|
+ "status": "healthy" if healthy else "degraded",
|
|
|
+ "mysql": "ok" if mysql_ok else "error",
|
|
|
+ "redis": "ok" if redis_ok else "error",
|
|
|
+ }
|
|
|
+
|
|
|
+
|
|
|
+url_prefix = "/brandcultivation/api/v1"
|
|
|
+
|
|
|
+app.include_router(recommend_router, prefix=url_prefix)
|
|
|
+app.include_router(report_router, prefix=url_prefix)
|
|
|
+app.include_router(eval_report_router, prefix=url_prefix)
|
|
|
+
|
|
|
+if __name__ == "__main__":
|
|
|
+ logger.info("Starting BrandCultivation API server on port 7960")
|
|
|
+ uvicorn.run(app, host="0.0.0.0", port=7960)
|
|
|
+```
|
|
|
+
|
|
|
+- [ ] **Step 2: 验证语法**
|
|
|
+
|
|
|
+Run: `cd /home/yangzeyu/project/BrandCultivation && python -c "import ast; ast.parse(open('run_api.py').read()); print('Syntax OK')"`
|
|
|
+Expected: `Syntax OK`
|
|
|
+
|
|
|
+- [ ] **Step 3: Commit**
|
|
|
+
|
|
|
+```bash
|
|
|
+git add run_api.py
|
|
|
+git commit -m "refactor(api): add middleware, health check, global exception handlers"
|
|
|
+```
|
|
|
+
|
|
|
+---
|
|
|
+
|
|
|
+### Task 10: 改进 `api/recommend.py` — 日志和错误处理
|
|
|
+
|
|
|
+**Files:**
|
|
|
+- Modify: `api/recommend.py`
|
|
|
+
|
|
|
+- [ ] **Step 1: 添加日志和错误处理**
|
|
|
+
|
|
|
+关键改动:
|
|
|
+- 添加 `from core import get_logger` 和 `logger = get_logger("api.recommend")`
|
|
|
+- 模型不存在时返回 404 状态码
|
|
|
+- `generate_and_upload_report` 加入 try/except + logger.error
|
|
|
+- 推荐过程添加关键日志(开始、召回数量、完成)
|
|
|
+
|
|
|
+```python
|
|
|
+from database import MySqlDao
|
|
|
+from fastapi import APIRouter, BackgroundTasks, HTTPException, status
|
|
|
+from .request_body import RecommendRequest
|
|
|
+from core import get_logger
|
|
|
+
|
|
|
+from models import Recommend
|
|
|
+import os
|
|
|
+from utils import FileStreamUtils, ReportUtils
|
|
|
+
|
|
|
+logger = get_logger("api.recommend")
|
|
|
+dao = MySqlDao()
|
|
|
+router = APIRouter()
|
|
|
+
|
|
|
+
|
|
|
+@router.post("/recommend")
|
|
|
+async def recommend(request: RecommendRequest, backgroundTasks: BackgroundTasks):
|
|
|
+ """推荐接口"""
|
|
|
+ logger.info(f"Recommend request: city={request.city_uuid}, product={request.product_code}, recall={request.recall_cust_count}")
|
|
|
+
|
|
|
+ gbdtlr_model_path = os.path.join("./models/rank/weights", request.city_uuid, "gbdtlr_model.pkl")
|
|
|
+ if not os.path.exists(gbdtlr_model_path):
|
|
|
+ logger.warning(f"Model not found: {gbdtlr_model_path}")
|
|
|
+ raise HTTPException(
|
|
|
+ status_code=status.HTTP_404_NOT_FOUND,
|
|
|
+ detail="该城市的模型未训练,请先进行训练",
|
|
|
+ )
|
|
|
+
|
|
|
+ recommend_model = Recommend(request.city_uuid)
|
|
|
+
|
|
|
+ products_in_order = dao.get_product_from_order(request.city_uuid)["product_code"].unique().tolist()
|
|
|
+ if request.product_code in products_in_order:
|
|
|
+ logger.info(f"Using GBDT-LR model for existing product {request.product_code}")
|
|
|
+ recommend_list = recommend_model.get_recommend_list_by_gbdtlr(request.product_code, recall_count=request.recall_cust_count)
|
|
|
+ else:
|
|
|
+ logger.info(f"Using Item2Vec model for new product {request.product_code}")
|
|
|
+ recommend_list = recommend_model.get_recommend_list_by_item2vec(request.product_code, recall_count=request.recall_cust_count)
|
|
|
+
|
|
|
+ recommend_data = recommend_model.get_recommend_and_delivery(recommend_list, delivery_count=request.delivery_count)
|
|
|
+ request_data = []
|
|
|
+ for index, data in enumerate(recommend_data):
|
|
|
+ request_data.append(
|
|
|
+ {
|
|
|
+ "id": index + 1,
|
|
|
+ "cust_code": data["cust_code"],
|
|
|
+ "recommend_score": data["recommend_score"],
|
|
|
+ "delivery_count": data["delivery_count"],
|
|
|
+ }
|
|
|
+ )
|
|
|
+
|
|
|
+ logger.info(f"Recommend completed: {len(request_data)} customers recommended")
|
|
|
+
|
|
|
+ backgroundTasks.add_task(generate_and_upload_report, request)
|
|
|
+
|
|
|
+ return {"code": 200, "msg": "success", "data": {"recommendationInfo": request_data}}
|
|
|
+
|
|
|
+
|
|
|
+def generate_and_upload_report(request: RecommendRequest):
|
|
|
+ """生成并上传报告到阿里云文件数据库"""
|
|
|
+ logger.info(f"Background task started: generating report for {request.city_uuid}/{request.product_code}")
|
|
|
+ try:
|
|
|
+ report_util = ReportUtils(request.city_uuid, request.product_code)
|
|
|
+ report_util.generate_all_data(request.recall_cust_count, request.delivery_count)
|
|
|
+
|
|
|
+ reports_dir = os.path.join("./data/reports", request.city_uuid, request.product_code)
|
|
|
+ report_files = ["卷烟信息表", "品规商户特征关系表", "相似卷烟表", "商户售卖推荐表"]
|
|
|
+ file_id_map = FileStreamUtils.upload_files(reports_dir, report_files)
|
|
|
+
|
|
|
+ if file_id_map is None:
|
|
|
+ logger.error(f"Report upload failed for {request.city_uuid}/{request.product_code}")
|
|
|
+ return
|
|
|
+
|
|
|
+ data_dict = {
|
|
|
+ "cultivacation_id": request.cultivacation_id,
|
|
|
+ "city_uuid": request.city_uuid,
|
|
|
+ "limit_cycle_name": request.limit_cycle_name,
|
|
|
+ "product_code": request.product_code,
|
|
|
+ "product_info_table": file_id_map.get("卷烟信息表"),
|
|
|
+ "relation_table": file_id_map.get("品规商户特征关系表"),
|
|
|
+ "similarity_product_table": file_id_map.get("相似卷烟表"),
|
|
|
+ "recommend_table": file_id_map.get("商户售卖推荐表"),
|
|
|
+ }
|
|
|
+ dao.insert_report(data_dict)
|
|
|
+ logger.info(f"Background task completed: report uploaded for {request.city_uuid}/{request.product_code}")
|
|
|
+ except Exception as e:
|
|
|
+ logger.error(f"Background task failed: {e}", exc_info=True)
|
|
|
+```
|
|
|
+
|
|
|
+- [ ] **Step 2: 验证语法**
|
|
|
+
|
|
|
+Run: `cd /home/yangzeyu/project/BrandCultivation && python -c "import ast; ast.parse(open('api/recommend.py').read()); print('Syntax OK')"`
|
|
|
+Expected: `Syntax OK`
|
|
|
+
|
|
|
+- [ ] **Step 3: Commit**
|
|
|
+
|
|
|
+```bash
|
|
|
+git add api/recommend.py
|
|
|
+git commit -m "refactor(api): add logging and error handling to recommend endpoint"
|
|
|
+```
|
|
|
+
|
|
|
+---
|
|
|
+
|
|
|
+### Task 11: 改进 `api/eval_report.py` 和 `api/report.py` — 日志
|
|
|
+
|
|
|
+**Files:**
|
|
|
+- Modify: `api/eval_report.py`
|
|
|
+- Modify: `api/report.py`
|
|
|
+
|
|
|
+- [ ] **Step 1: 改进 eval_report.py**
|
|
|
+
|
|
|
+添加 `from core import get_logger` 和 `logger = get_logger("api.eval_report")`,在每个步骤添加日志。
|
|
|
+
|
|
|
+- [ ] **Step 2: 改进 report.py**
|
|
|
+
|
|
|
+添加 `from core import get_logger` 和 `logger = get_logger("api.report")`,在关键步骤添加日志。
|
|
|
+
|
|
|
+- [ ] **Step 3: 验证语法**
|
|
|
+
|
|
|
+Run: `cd /home/yangzeyu/project/BrandCultivation && python -c "import ast; ast.parse(open('api/eval_report.py').read()); ast.parse(open('api/report.py').read()); print('OK')"`
|
|
|
+Expected: `OK`
|
|
|
+
|
|
|
+- [ ] **Step 4: Commit**
|
|
|
+
|
|
|
+```bash
|
|
|
+git add api/eval_report.py api/report.py
|
|
|
+git commit -m "refactor(api): add logging to eval_report and report endpoints"
|
|
|
+```
|
|
|
+
|
|
|
+---
|
|
|
+
|
|
|
+### Task 12: 修复 `models/rank/gbdt_lr_inference.py` — Bug 修复 + 日志
|
|
|
+
|
|
|
+**Files:**
|
|
|
+- Modify: `models/rank/gbdt_lr_inference.py`
|
|
|
+
|
|
|
+- [ ] **Step 1: 修复 get_recommend_list 方法**
|
|
|
+
|
|
|
+将 `get_recommend_list` 方法中的循环从:
|
|
|
+
|
|
|
+```python
|
|
|
+recommend_list = []
|
|
|
+for cust_id, score in zip(recall_list, scores):
|
|
|
+ recommend_list.append({cust_id: float(score)})
|
|
|
+ recommend_list.append({"cust_code": cust_id, "recommend_score": score})
|
|
|
+
|
|
|
+recommend_list = sorted(
|
|
|
+ [item for item in recommend_list if "recommend_score" in item],
|
|
|
+ key=lambda x: x["recommend_score"],
|
|
|
+ reverse=True
|
|
|
+)
|
|
|
+```
|
|
|
+
|
|
|
+修改为:
|
|
|
+
|
|
|
+```python
|
|
|
+recommend_list = [
|
|
|
+ {"cust_code": cust_id, "recommend_score": float(score)}
|
|
|
+ for cust_id, score in zip(recall_list, scores)
|
|
|
+]
|
|
|
+recommend_list.sort(key=lambda x: x["recommend_score"], reverse=True)
|
|
|
+```
|
|
|
+
|
|
|
+- [ ] **Step 2: 添加日志**
|
|
|
+
|
|
|
+在文件顶部添加 `from core import get_logger` 和 `logger = get_logger("models.rank.gbdtlr")`。
|
|
|
+在 `get_recommend_list` 中添加推理耗时日志。
|
|
|
+在 `generate_shap_interance` 中替换 print 为 logger。
|
|
|
+
|
|
|
+- [ ] **Step 3: 验证语法**
|
|
|
+
|
|
|
+Run: `cd /home/yangzeyu/project/BrandCultivation && python -c "import ast; ast.parse(open('models/rank/gbdt_lr_inference.py').read()); print('OK')"`
|
|
|
+Expected: `OK`
|
|
|
+
|
|
|
+- [ ] **Step 4: Commit**
|
|
|
+
|
|
|
+```bash
|
|
|
+git add models/rank/gbdt_lr_inference.py
|
|
|
+git commit -m "fix(models): fix double-append bug in get_recommend_list, add logging"
|
|
|
+```
|
|
|
+
|
|
|
+---
|
|
|
+
|
|
|
+### Task 13: 改进 `models/recommend.py` — 日志
|
|
|
+
|
|
|
+**Files:**
|
|
|
+- Modify: `models/recommend.py`
|
|
|
+
|
|
|
+- [ ] **Step 1: 添加日志**
|
|
|
+
|
|
|
+添加 `from core import get_logger` 和 `logger = get_logger("models.recommend")`。
|
|
|
+在关键方法中添加日志:
|
|
|
+- `_load_molde`: 模型加载完成
|
|
|
+- `get_recal_cust`: 召回数量
|
|
|
+- `get_recommend_list_by_gbdtlr`: 开始/完成 + 耗时
|
|
|
+- `get_recommend_list_by_item2vec`: 开始/完成 + 耗时
|
|
|
+- `get_recommend_and_delivery`: 投放分配完成
|
|
|
+
|
|
|
+- [ ] **Step 2: 验证语法**
|
|
|
+
|
|
|
+Run: `cd /home/yangzeyu/project/BrandCultivation && python -c "import ast; ast.parse(open('models/recommend.py').read()); print('OK')"`
|
|
|
+Expected: `OK`
|
|
|
+
|
|
|
+- [ ] **Step 3: Commit**
|
|
|
+
|
|
|
+```bash
|
|
|
+git add models/recommend.py
|
|
|
+git commit -m "refactor(models): add logging to recommend module"
|
|
|
+```
|
|
|
+
|
|
|
+---
|
|
|
+
|
|
|
+### Task 14: 改进召回模块 — 替换 print 为 logger
|
|
|
+
|
|
|
+**Files:**
|
|
|
+- Modify: `models/recall/hot_recall.py`
|
|
|
+- Modify: `models/recall/itemCF/ItemCF.py`
|
|
|
+- Modify: `models/item2vec/inference.py`
|
|
|
+
|
|
|
+- [ ] **Step 1: 改进 hot_recall.py**
|
|
|
+
|
|
|
+替换 `print("hot_recall: ...")` 为 `logger.info(...)`。添加 `from core import get_logger` 和 `logger = get_logger("models.recall.hot")`。
|
|
|
+
|
|
|
+- [ ] **Step 2: 改进 ItemCF.py**
|
|
|
+
|
|
|
+替换 `print(...)` 为 `logger.info(...)`。添加 `from core import get_logger` 和 `logger = get_logger("models.recall.itemcf")`。
|
|
|
+
|
|
|
+- [ ] **Step 3: 改进 inference.py (item2vec)**
|
|
|
+
|
|
|
+添加 `from core import get_logger` 和 `logger = get_logger("models.item2vec")`。在关键步骤添加日志。
|
|
|
+
|
|
|
+- [ ] **Step 4: 验证语法**
|
|
|
+
|
|
|
+Run: `cd /home/yangzeyu/project/BrandCultivation && python -c "import ast; ast.parse(open('models/recall/hot_recall.py').read()); ast.parse(open('models/recall/itemCF/ItemCF.py').read()); ast.parse(open('models/item2vec/inference.py').read()); print('OK')"`
|
|
|
+Expected: `OK`
|
|
|
+
|
|
|
+- [ ] **Step 5: Commit**
|
|
|
+
|
|
|
+```bash
|
|
|
+git add models/recall/hot_recall.py models/recall/itemCF/ItemCF.py models/item2vec/inference.py
|
|
|
+git commit -m "refactor(models): replace print with logger in recall modules"
|
|
|
+```
|
|
|
+
|
|
|
+---
|
|
|
+
|
|
|
+### Task 15: 改进 `utils/file_stream.py` — 日志和错误处理
|
|
|
+
|
|
|
+**Files:**
|
|
|
+- Modify: `utils/file_stream.py`
|
|
|
+
|
|
|
+- [ ] **Step 1: 添加日志和改进错误处理**
|
|
|
+
|
|
|
+添加 `from core import get_logger` 和 `logger = get_logger("utils.file_stream")`。
|
|
|
+- 上传时记录 file_id 和耗时
|
|
|
+- 下载时记录 HTTP 状态码
|
|
|
+- 失败时记录具体错误信息(替换 print)
|
|
|
+- 使用 `settings` 获取 URL 配置
|
|
|
+
|
|
|
+```python
|
|
|
+import time
|
|
|
+from core import get_logger, settings
|
|
|
+from core.exceptions import FileServiceException
|
|
|
+from io import BytesIO
|
|
|
+import os
|
|
|
+import pandas as pd
|
|
|
+import requests
|
|
|
+
|
|
|
+logger = get_logger("utils.file_stream")
|
|
|
+
|
|
|
+
|
|
|
+class FileStreamUtils:
|
|
|
+ upload_url = settings.file_upload_url
|
|
|
+ download_url = settings.file_download_url
|
|
|
+ headers = {
|
|
|
+ "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36",
|
|
|
+ "Accept": "*/*",
|
|
|
+ }
|
|
|
+
|
|
|
+ @staticmethod
|
|
|
+ def upload_files(reports_dir, files):
|
|
|
+ files_id = {}
|
|
|
+ for filename in files:
|
|
|
+ file_path = os.path.join(reports_dir, f"{filename}.xlsx")
|
|
|
+ start_time = time.time()
|
|
|
+ try:
|
|
|
+ with open(file_path, "rb") as f:
|
|
|
+ upload_files = {"file": (os.path.basename(file_path), f)}
|
|
|
+ response = requests.post(
|
|
|
+ FileStreamUtils.upload_url,
|
|
|
+ headers=FileStreamUtils.headers,
|
|
|
+ files=upload_files,
|
|
|
+ verify=True,
|
|
|
+ )
|
|
|
+ duration_ms = (time.time() - start_time) * 1000
|
|
|
+ if response.json().get("success"):
|
|
|
+ file_id = response.json()["data"]["file_info"]["fileid"]
|
|
|
+ files_id[filename] = file_id
|
|
|
+ logger.info(f"File uploaded: {filename} -> {file_id} ({duration_ms:.0f}ms)")
|
|
|
+ else:
|
|
|
+ logger.error(f"Upload failed for {filename}: {response.text}")
|
|
|
+ return None
|
|
|
+ except requests.exceptions.RequestException as e:
|
|
|
+ logger.error(f"Upload request error for {filename}: {e}", exc_info=True)
|
|
|
+ return None
|
|
|
+ except Exception as e:
|
|
|
+ logger.error(f"Upload error for {filename}: {e}", exc_info=True)
|
|
|
+ return None
|
|
|
+ return files_id
|
|
|
+
|
|
|
+ @staticmethod
|
|
|
+ def download_file(file_id, file_type="xlsx"):
|
|
|
+ """通过file_id从阿里云文件数据库下载文件"""
|
|
|
+ start_time = time.time()
|
|
|
+ try:
|
|
|
+ response = requests.get(
|
|
|
+ f"{FileStreamUtils.download_url}/{file_id}",
|
|
|
+ headers=FileStreamUtils.headers,
|
|
|
+ verify=True,
|
|
|
+ )
|
|
|
+ duration_ms = (time.time() - start_time) * 1000
|
|
|
+
|
|
|
+ if response.status_code == 200:
|
|
|
+ file_content = BytesIO(response.content)
|
|
|
+ if file_type == "xlsx":
|
|
|
+ data = pd.read_excel(file_content, engine="openpyxl")
|
|
|
+ elif file_type == "csv":
|
|
|
+ data = pd.read_csv(file_content)
|
|
|
+ else:
|
|
|
+ raise ValueError(f"不支持的文件类型:{file_type}")
|
|
|
+ logger.info(f"File downloaded: {file_id} ({duration_ms:.0f}ms, {len(response.content)} bytes)")
|
|
|
+ return data
|
|
|
+ else:
|
|
|
+ logger.error(f"Download failed: file_id={file_id}, status={response.status_code}")
|
|
|
+ return None
|
|
|
+ except requests.exceptions.RequestException as e:
|
|
|
+ logger.error(f"Download request error: file_id={file_id}, error={e}", exc_info=True)
|
|
|
+ return None
|
|
|
+ except Exception as e:
|
|
|
+ logger.error(f"Download error: file_id={file_id}, error={e}", exc_info=True)
|
|
|
+ return None
|
|
|
+```
|
|
|
+
|
|
|
+- [ ] **Step 2: 验证语法**
|
|
|
+
|
|
|
+Run: `cd /home/yangzeyu/project/BrandCultivation && python -c "import ast; ast.parse(open('utils/file_stream.py').read()); print('OK')"`
|
|
|
+Expected: `OK`
|
|
|
+
|
|
|
+- [ ] **Step 3: Commit**
|
|
|
+
|
|
|
+```bash
|
|
|
+git add utils/file_stream.py
|
|
|
+git commit -m "refactor(utils): add logging and error details to file_stream"
|
|
|
+```
|
|
|
+
|
|
|
+---
|
|
|
+
|
|
|
+### Task 16: 改进 `utils/report_utils.py` 和 `train.py` — 日志
|
|
|
+
|
|
|
+**Files:**
|
|
|
+- Modify: `utils/report_utils.py`
|
|
|
+- Modify: `train.py`
|
|
|
+- Modify: `models/rank/data/preprocess.py`
|
|
|
+
|
|
|
+- [ ] **Step 1: 改进 report_utils.py**
|
|
|
+
|
|
|
+添加 `from core import get_logger` 和 `logger = get_logger("utils.report")`。
|
|
|
+每个 `generate_*` 方法添加开始/完成日志。
|
|
|
+
|
|
|
+- [ ] **Step 2: 改进 train.py**
|
|
|
+
|
|
|
+替换所有 `print(...)` 为 `logger.info(...)`。添加 `from core import get_logger` 和 `logger = get_logger("train")`。
|
|
|
+
|
|
|
+- [ ] **Step 3: 改进 preprocess.py**
|
|
|
+
|
|
|
+替换 `print("gbdr-lr: ...")` 为 `logger.info(...)`。添加 `from core import get_logger` 和 `logger = get_logger("models.rank.preprocess")`。
|
|
|
+
|
|
|
+- [ ] **Step 4: 验证语法**
|
|
|
+
|
|
|
+Run: `cd /home/yangzeyu/project/BrandCultivation && python -c "import ast; ast.parse(open('utils/report_utils.py').read()); ast.parse(open('train.py').read()); ast.parse(open('models/rank/data/preprocess.py').read()); print('OK')"`
|
|
|
+Expected: `OK`
|
|
|
+
|
|
|
+- [ ] **Step 5: Commit**
|
|
|
+
|
|
|
+```bash
|
|
|
+git add utils/report_utils.py train.py models/rank/data/preprocess.py
|
|
|
+git commit -m "refactor: replace print with logger in report_utils, train, preprocess"
|
|
|
+```
|
|
|
+
|
|
|
+---
|
|
|
+
|
|
|
+### Task 17: 配置文件变更 — 移除密码、创建 .env.example
|
|
|
+
|
|
|
+**Files:**
|
|
|
+- Modify: `config/database_config.yaml`
|
|
|
+- Create: `.env.example`
|
|
|
+- Modify: `config/config.py`
|
|
|
+
|
|
|
+- [ ] **Step 1: 更新 database_config.yaml(移除密码)**
|
|
|
+
|
|
|
+```yaml
|
|
|
+mysql:
|
|
|
+ host: 'rm-t4n6rz18y4t5x47y70o.mysql.singapore.rds.aliyuncs.com'
|
|
|
+ port: 3036
|
|
|
+ db: 'brand_cultivation'
|
|
|
+ user: 'BrandCultivation'
|
|
|
+ # passwd 已移至环境变量 MYSQL_PASSWORD
|
|
|
+
|
|
|
+redis:
|
|
|
+ host: 'r-t4nb4n9i8je7u6ogk1pd.redis.singapore.rds.aliyuncs.com'
|
|
|
+ port: 5000
|
|
|
+ db: 10
|
|
|
+ # passwd 已移至环境变量 REDIS_PASSWORD
|
|
|
+```
|
|
|
+
|
|
|
+- [ ] **Step 2: 创建 .env.example**
|
|
|
+
|
|
|
+```bash
|
|
|
+# BrandCultivation 环境变量配置
|
|
|
+# 复制此文件为 .env 并填入实际值
|
|
|
+
|
|
|
+# MySQL
|
|
|
+MYSQL_HOST=rm-t4n6rz18y4t5x47y70o.mysql.singapore.rds.aliyuncs.com
|
|
|
+MYSQL_PORT=3036
|
|
|
+MYSQL_USER=BrandCultivation
|
|
|
+MYSQL_PASSWORD=your_mysql_password_here
|
|
|
+MYSQL_DB=brand_cultivation
|
|
|
+
|
|
|
+# Redis
|
|
|
+REDIS_HOST=r-t4nb4n9i8je7u6ogk1pd.redis.singapore.rds.aliyuncs.com
|
|
|
+REDIS_PORT=5000
|
|
|
+REDIS_PASSWORD=your_redis_password_here
|
|
|
+REDIS_DB=10
|
|
|
+
|
|
|
+# Logging
|
|
|
+LOG_LEVEL=INFO
|
|
|
+
|
|
|
+# File Service
|
|
|
+FILE_UPLOAD_URL=http://file-center.jcpt:8080/file/fileUpload
|
|
|
+FILE_DOWNLOAD_URL=http://file-center.jcpt:8080/file/fileDownload
|
|
|
+```
|
|
|
+
|
|
|
+- [ ] **Step 3: 更新 config/config.py 为兼容层**
|
|
|
+
|
|
|
+保留旧接口以兼容未迁移的代码:
|
|
|
+
|
|
|
+```python
|
|
|
+from core.config import settings
|
|
|
+
|
|
|
+def load_config():
|
|
|
+ return {
|
|
|
+ "mysql": {
|
|
|
+ "host": settings.mysql_host,
|
|
|
+ "port": settings.mysql_port,
|
|
|
+ "user": settings.mysql_user,
|
|
|
+ "passwd": settings.mysql_password,
|
|
|
+ "db": settings.mysql_db,
|
|
|
+ },
|
|
|
+ "redis": {
|
|
|
+ "host": settings.redis_host,
|
|
|
+ "port": settings.redis_port,
|
|
|
+ "passwd": settings.redis_password,
|
|
|
+ "db": settings.redis_db,
|
|
|
+ },
|
|
|
+ }
|
|
|
+
|
|
|
+def load_model_config():
|
|
|
+ return settings.model_config
|
|
|
+
|
|
|
+def load_service_config():
|
|
|
+ return {
|
|
|
+ "aliyun": {
|
|
|
+ "upload_url": settings.file_upload_url,
|
|
|
+ "download_url": settings.file_download_url,
|
|
|
+ }
|
|
|
+ }
|
|
|
+```
|
|
|
+
|
|
|
+- [ ] **Step 4: 确保 .gitignore 包含 .env**
|
|
|
+
|
|
|
+Run: `cd /home/yangzeyu/project/BrandCultivation && grep -q "^\.env$" .gitignore 2>/dev/null || echo ".env" >> .gitignore`
|
|
|
+
|
|
|
+- [ ] **Step 5: Commit**
|
|
|
+
|
|
|
+```bash
|
|
|
+git add config/database_config.yaml .env.example config/config.py .gitignore
|
|
|
+git commit -m "security: remove passwords from yaml, add .env.example"
|
|
|
+```
|
|
|
+
|
|
|
+---
|
|
|
+
|
|
|
+### Task 18: 最终验证
|
|
|
+
|
|
|
+**Files:** None (verification only)
|
|
|
+
|
|
|
+- [ ] **Step 1: 验证所有模块可导入**
|
|
|
+
|
|
|
+Run:
|
|
|
+```bash
|
|
|
+cd /home/yangzeyu/project/BrandCultivation && python -c "
|
|
|
+from core import get_logger, settings, AppException, DatabaseException, RequestLoggingMiddleware
|
|
|
+from database.db.mysql import MySqlDatabaseHelper
|
|
|
+from database.db.redis_db import RedisDatabaseHelper
|
|
|
+from database.dao.mysql_dao import MySqlDao
|
|
|
+from api.recommend import router as rec_router
|
|
|
+from api.report import router as rep_router
|
|
|
+from api.eval_report import router as eval_router
|
|
|
+from models.recommend import Recommend
|
|
|
+from utils.file_stream import FileStreamUtils
|
|
|
+from utils.report_utils import ReportUtils
|
|
|
+print('All imports successful')
|
|
|
+"
|
|
|
+```
|
|
|
+Expected: `All imports successful`
|
|
|
+
|
|
|
+- [ ] **Step 2: 验证 FastAPI 应用可启动(语法级别)**
|
|
|
+
|
|
|
+Run: `cd /home/yangzeyu/project/BrandCultivation && python -c "import ast; ast.parse(open('run_api.py').read()); print('App syntax OK')"`
|
|
|
+Expected: `App syntax OK`
|
|
|
+
|
|
|
+- [ ] **Step 3: 最终 Commit(如有遗漏文件)**
|
|
|
+
|
|
|
+```bash
|
|
|
+git status
|
|
|
+# 如有未提交的改动,补充提交
|
|
|
+```
|