from database import MySqlDao
from fastapi import APIRouter, BackgroundTasks, HTTPException, status
from fastapi.concurrency import run_in_threadpool
from .request_body import RecommendRequest
from core import get_logger
from models import Recommend
import os
from utils import FileStreamUtils, ReportUtils

logger = get_logger("api.recommend")
dao = MySqlDao()
router = APIRouter()

# Base directories that the per-city / per-product sub-paths are joined onto.
_MODEL_WEIGHTS_DIR = "./models/rank/weights"
_REPORTS_DIR = "./data/reports"

# Report row field -> name of the report file whose uploaded id is stored in it.
# Insertion order is the order in which the files are handed to the uploader.
_REPORT_FIELD_TO_FILE = {
    "product_info_table": "卷烟信息表",
    "relation_table": "品规商户特征关系表",
    "similarity_product_table": "相似卷烟表",
    "recommend_table": "商户售卖推荐表",
}


def _build_recommendation_rows(request: RecommendRequest) -> list:
    """Compute the recommendation rows for one request (blocking).

    Chooses the GBDT-LR ranking when ``request.product_code`` is among the
    product codes returned by ``dao.get_product_from_order`` for the city, and
    the Item2Vec ranking otherwise.

    Args:
        request: The validated request body of the ``/recommend`` endpoint.

    Returns:
        A list of dicts with keys ``id`` (1-based position in the model's
        result), ``cust_code`` and ``recommend_score``.
    """
    recommend_model = Recommend(request.city_uuid)
    products_in_order = (
        dao.get_product_from_order(request.city_uuid)["product_code"].unique().tolist()
    )

    if request.product_code in products_in_order:
        logger.info(f"Using GBDT-LR model for existing product {request.product_code}")
        recommend_list = recommend_model.get_recommend_list_by_gbdtlr(
            request.product_code, cust_code_list=request.cust_code_list
        )
    else:
        logger.info(f"Using Item2Vec model for new product {request.product_code}")
        recommend_list = recommend_model.get_recommend_list_by_item2vec(
            request.product_code, cust_code_list=request.cust_code_list
        )

    return [
        {
            "id": position,
            "cust_code": item["cust_code"],
            "recommend_score": item["recommend_score"],
        }
        for position, item in enumerate(recommend_list, start=1)
    ]


@router.post("/recommend")
async def recommend(request: RecommendRequest, backgroundTasks: BackgroundTasks):
    """Recommendation endpoint.

    Returns the recommended customers for a product in a city and schedules
    report generation as a background task.

    Raises:
        HTTPException: 404 when no GBDT-LR model file exists for the city.
    """
    logger.info(
        f"Recommend request: city={request.city_uuid}, product={request.product_code}, "
        f"core_custs={len(request.cust_code_list)}"
    )

    # NOTE(review): city_uuid (and product_code in the report task) are joined
    # into filesystem paths; confirm RecommendRequest restricts them to safe
    # path components.
    gbdtlr_model_path = os.path.join(_MODEL_WEIGHTS_DIR, request.city_uuid, "gbdtlr_model.pkl")
    if not os.path.exists(gbdtlr_model_path):
        logger.warning(f"Model not found: {gbdtlr_model_path}")
        raise HTTPException(
            status_code=status.HTTP_404_NOT_FOUND,
            detail="该城市的模型未训练,请先进行训练",
        )

    # The DAO query and the model scoring are synchronous calls; running them
    # directly inside this coroutine would stall the event loop (and therefore
    # every other in-flight request) until they finish.
    request_data = await run_in_threadpool(_build_recommendation_rows, request)

    logger.info(f"Recommend completed: {len(request_data)} customers recommended")
    backgroundTasks.add_task(generate_and_upload_report, request)
    return {"code": 200, "msg": "success", "data": {"recommendationInfo": request_data}}


def generate_and_upload_report(request: RecommendRequest) -> None:
    """Generate the report files and upload them to the Alibaba Cloud file store.

    Intended to run as a background task: any exception is logged with its
    traceback and swallowed rather than propagated. When the upload helper
    returns ``None`` the failure is logged and no report row is inserted.

    Args:
        request: The request body that triggered the recommendation.
    """
    logger.info(
        f"Background task started: generating report for {request.city_uuid}/{request.product_code}"
    )
    try:
        report_util = ReportUtils(request.city_uuid, request.product_code)
        report_util.generate_all_data(request.cust_code_list)

        reports_dir = os.path.join(_REPORTS_DIR, request.city_uuid, request.product_code)
        report_files = list(_REPORT_FIELD_TO_FILE.values())
        file_id_map = FileStreamUtils.upload_files(reports_dir, report_files)
        if file_id_map is None:
            logger.error(f"Report upload failed for {request.city_uuid}/{request.product_code}")
            return

        data_dict = {
            "cultivacation_id": request.cultivacation_id,
            "city_uuid": request.city_uuid,
            "limit_cycle_name": request.limit_cycle_name,
            "product_code": request.product_code,
        }
        # A file missing from the upload result is stored as None (dict.get).
        for field, file_name in _REPORT_FIELD_TO_FILE.items():
            data_dict[field] = file_id_map.get(file_name)

        dao.insert_report(data_dict)
        logger.info(
            f"Background task completed: report uploaded for {request.city_uuid}/{request.product_code}"
        )
    except Exception as e:
        # Top-level boundary of the background task: log and stop here.
        logger.error(f"Background task failed: {e}", exc_info=True)