| 1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162636465666768697071727374757677787980818283848586878889909192 |
- from database import MySqlDao, RedisDatabaseHelper
- from fastapi import APIRouter, BackgroundTasks, HTTPException, status
- from .request_body import RecommendRequest
- from core import get_logger
- from models import Recommend
- import os
- from utils import FileStreamUtils, ReportUtils
# Module-level logger for this API module, registered under "api.recommend".
logger = get_logger("api.recommend")
# Shared MySQL data-access object; used below to persist report metadata.
dao = MySqlDao()
# Shared Redis client; used below to check whether an item-CF key exists.
redis_client = RedisDatabaseHelper().redis
# Router on which the /recommend endpoint in this module is registered.
router = APIRouter()
- def _get_itemcf_key(city_uuid, product_code):
- return f"fc:{city_uuid}:{product_code}"
@router.post("/recommend")
async def recommend(request: RecommendRequest, backgroundTasks: BackgroundTasks):
    """Recommendation endpoint.

    Checks that a trained GBDT-LR weight file exists for the requested city,
    picks a ranking strategy depending on whether the item-CF key for the
    city/product pair is present in Redis, and returns the ranked customers.
    Report generation is scheduled as a background task before returning.

    Raises:
        HTTPException: 404 when the city's model weight file is missing.
    """
    logger.info(f"Recommend request: city={request.city_uuid}, product={request.product_code}, core_custs={len(request.cust_code_list)}")

    # Guard: refuse to serve a city whose model weights are not on disk.
    gbdtlr_model_path = os.path.join("./models/rank/weights", request.city_uuid, "gbdtlr_model.pkl")
    model_is_trained = os.path.exists(gbdtlr_model_path)
    if not model_is_trained:
        logger.warning(f"Model not found: {gbdtlr_model_path}")
        raise HTTPException(
            status_code=status.HTTP_404_NOT_FOUND,
            detail="该城市的模型未训练,请先进行训练",
        )

    recommend_model = Recommend(request.city_uuid)
    itemcf_key = _get_itemcf_key(request.city_uuid, request.product_code)

    # Presence of the item-CF key selects GBDT-LR; otherwise fall back to Item2Vec.
    if redis_client.exists(itemcf_key):
        logger.info(f"Using GBDT-LR model for product {request.product_code}, itemcf_key={itemcf_key}")
        rank_customers = recommend_model.get_recommend_list_by_gbdtlr
    else:
        logger.info(f"Using Item2Vec model for product {request.product_code}, itemcf_key not found: {itemcf_key}")
        rank_customers = recommend_model.get_recommend_list_by_item2vec
    recommend_list = rank_customers(
        request.product_code, cust_code_list=request.cust_code_list
    )

    # Shape each result into the response row format with 1-based ids.
    request_data = [
        {
            "id": position,
            "cust_code": entry["cust_code"],
            "recommend_score": entry["recommend_score"],
        }
        for position, entry in enumerate(recommend_list, start=1)
    ]
    logger.info(f"Recommend completed: {len(request_data)} customers recommended")

    backgroundTasks.add_task(generate_and_upload_report, request)

    payload = {"recommendationInfo": request_data}
    return {"code": 200, "msg": "success", "data": payload}
def generate_and_upload_report(request: RecommendRequest):
    """Generate the report files, upload them, and record the file ids.

    Per the original docstring, the upload target is the Alibaba Cloud file
    database. Runs as a background task: any exception is logged with its
    traceback and swallowed, and a failed upload (``None`` result) is logged
    and ends the task without writing a database row.
    """
    logger.info(f"Background task started: generating report for {request.city_uuid}/{request.product_code}")
    # Database column -> report file name; order matches the upload list order.
    column_to_report = {
        "product_info_table": "卷烟信息表",
        "relation_table": "品规商户特征关系表",
        "similarity_product_table": "相似卷烟表",
        "recommend_table": "商户售卖推荐表",
    }
    try:
        report_util = ReportUtils(request.city_uuid, request.product_code)
        report_util.generate_all_data(request.cust_code_list)

        reports_dir = os.path.join("./data/reports", request.city_uuid, request.product_code)
        report_files = list(column_to_report.values())
        file_id_map = FileStreamUtils.upload_files(reports_dir, report_files)

        if file_id_map is not None:
            data_dict = {
                "cultivacation_id": request.cultivacation_id,
                "city_uuid": request.city_uuid,
                "limit_cycle_name": request.limit_cycle_name,
                "product_code": request.product_code,
            }
            # Missing entries become None, exactly as dict.get would return.
            for column, report_name in column_to_report.items():
                data_dict[column] = file_id_map.get(report_name)
            dao.insert_report(data_dict)
            logger.info(f"Background task completed: report uploaded for {request.city_uuid}/{request.product_code}")
        else:
            logger.error(f"Report upload failed for {request.city_uuid}/{request.product_code}")
    except Exception as e:
        logger.error(f"Background task failed: {e}", exc_info=True)
|