2026-06-05-recommend-api-refactor.md 24 KB

Recommend API Refactor Implementation Plan

For agentic workers: REQUIRED: Use superpowers:subagent-driven-development (if subagents available) or superpowers:executing-plans to implement this plan. Steps use checkbox (- [ ]) syntax for tracking.

Goal: Remove recall_cust_count and delivery_count from the recommend API, introduce cust_code_list as a core retailer list that participates in recall/ranking for both old and new SKU paths, and remove all delivery allocation logic and report columns.

Architecture: The change touches four layers in sequence: request schema → API endpoint → core recommendation model → report utilities. The old-SKU path replaces the two-stage recall (ItemCF + hotness fallback) with a union of ItemCF and cust_code_list. The new-SKU path merges cust_code_list into the Item2Vec candidate pool so all customers are scored together via the same sales-volume normalization. Delivery allocation is removed end-to-end.

Tech Stack: Python 3, FastAPI, Pydantic, pandas, numpy, scikit-learn, Redis (ItemCF recall), MySQL (DAO layer), LightGBM + Logistic Regression (GBDT-LR), Item2Vec embeddings.


Chunk 1: Request Schema & API Endpoint

Task 1: Update RecommendRequest schema

Files:

  • Modify: api/request_body.py

  • [ ] Step 1: Open and read api/request_body.py

Confirm current fields: city_uuid, product_code, recall_cust_count, delivery_count, cultivacation_id, limit_cycle_name.

  • [ ] Step 2: Replace the schema

    from pydantic import BaseModel
    from typing import List
    
    class RecommendRequest(BaseModel):
    city_uuid: str
    product_code: str
    cust_code_list: List[str]   # 核心零售户ID列表
    cultivacation_id: str
    limit_cycle_name: str
    

Remove recall_cust_count: int and delivery_count: int. Add cust_code_list: List[str].

  • [ ] Step 3: Verify no syntax errors

    cd D:/projiect/dingsheng/BrandCultivation && python -c "from api.request_body import RecommendRequest; print('OK')"
    

Expected: OK

  • [ ] Step 4: Commit

    git add api/request_body.py
    git commit -m "refactor(api): replace recall_cust_count/delivery_count with cust_code_list in RecommendRequest"
    

Task 2: Update api/recommend.py endpoint

Files:

  • Modify: api/recommend.py

  • [ ] Step 1: Update the recommend() function signature and body

Replace the entire recommend() function (lines 15–54) with:

@router.post("/recommend")
async def recommend(request: RecommendRequest, backgroundTasks: BackgroundTasks):
    """推荐接口"""
    logger.info(f"Recommend request: city={request.city_uuid}, product={request.product_code}, core_custs={len(request.cust_code_list)}")

    gbdtlr_model_path = os.path.join("./models/rank/weights", request.city_uuid, "gbdtlr_model.pkl")
    if not os.path.exists(gbdtlr_model_path):
        logger.warning(f"Model not found: {gbdtlr_model_path}")
        raise HTTPException(
            status_code=status.HTTP_404_NOT_FOUND,
            detail="该城市的模型未训练,请先进行训练",
        )

    recommend_model = Recommend(request.city_uuid)

    products_in_order = dao.get_product_from_order(request.city_uuid)["product_code"].unique().tolist()
    if request.product_code in products_in_order:
        logger.info(f"Using GBDT-LR model for existing product {request.product_code}")
        recommend_list = recommend_model.get_recommend_list_by_gbdtlr(
            request.product_code, cust_code_list=request.cust_code_list
        )
    else:
        logger.info(f"Using Item2Vec model for new product {request.product_code}")
        recommend_list = recommend_model.get_recommend_list_by_item2vec(
            request.product_code, cust_code_list=request.cust_code_list
        )

    request_data = []
    for index, data in enumerate(recommend_list):
        request_data.append(
            {
                "id": index + 1,
                "cust_code": data["cust_code"],
                "recommend_score": data["recommend_score"],
            }
        )

    logger.info(f"Recommend completed: {len(request_data)} customers recommended")

    backgroundTasks.add_task(generate_and_upload_report, request)

    return {"code": 200, "msg": "success", "data": {"recommendationInfo": request_data}}

Key changes:

  • Remove recall_count and delivery_count from all calls
  • Pass cust_code_list=request.cust_code_list to both model methods
  • Remove get_recommend_and_delivery() call
  • Remove delivery_count from response items

  • [ ] Step 2: Update generate_and_upload_report() background task

Replace lines 57–85 with:

def generate_and_upload_report(request: RecommendRequest):
    """生成并上传报告到阿里云文件数据库"""
    logger.info(f"Background task started: generating report for {request.city_uuid}/{request.product_code}")
    try:
        report_util = ReportUtils(request.city_uuid, request.product_code)
        report_util.generate_all_data(request.cust_code_list)

        reports_dir = os.path.join("./data/reports", request.city_uuid, request.product_code)
        report_files = ["卷烟信息表", "品规商户特征关系表", "相似卷烟表", "商户售卖推荐表"]
        file_id_map = FileStreamUtils.upload_files(reports_dir, report_files)

        if file_id_map is None:
            logger.error(f"Report upload failed for {request.city_uuid}/{request.product_code}")
            return

        data_dict = {
            "cultivacation_id": request.cultivacation_id,
            "city_uuid": request.city_uuid,
            "limit_cycle_name": request.limit_cycle_name,
            "product_code": request.product_code,
            "product_info_table": file_id_map.get("卷烟信息表"),
            "relation_table": file_id_map.get("品规商户特征关系表"),
            "similarity_product_table": file_id_map.get("相似卷烟表"),
            "recommend_table": file_id_map.get("商户售卖推荐表"),
        }
        dao.insert_report(data_dict)
        logger.info(f"Background task completed: report uploaded for {request.city_uuid}/{request.product_code}")
    except Exception as e:
        logger.error(f"Background task failed: {e}", exc_info=True)

Key change: generate_all_data(request.recall_cust_count, request.delivery_count)generate_all_data(request.cust_code_list).

  • [ ] Step 3: Verify imports still valid

    cd D:/projiect/dingsheng/BrandCultivation && python -c "from api.recommend import router; print('OK')"
    

Expected: OK

  • [ ] Step 4: Commit

    git add api/recommend.py
    git commit -m "refactor(api): remove delivery allocation, pass cust_code_list to recommend models"
    

Chunk 2: Core Recommendation Model — Old SKU Path (GBDT-LR)

Task 3: Refactor recall logic in models/recommend.py

Files:

  • Modify: models/recommend.py

The goal: replace get_recal_cust(product_id, recall_count) with a new method that takes cust_code_list and returns the union of ItemCF recall and cust_code_list, with no hotness fallback and no count cap.

  • Step 1: Remove _get_hot_recall() method

Delete the entire method (lines 35–39):

def _get_hot_recall(self):
    """热度召回"""
    key = f"hot:{self._city_uuid}:sale_qty"
    recall_list = self._redis.zrevrange(key, 0, -1, withscores=False)
    return recall_list
  • Step 2: Rewrite get_recal_cust() to accept cust_code_list

Replace the old get_recal_cust(self, product_id, recall_count) (lines 41–55) with:

def get_recal_cust(self, product_id, cust_code_list):
    """通过协同过滤召回与核心零售户列表取并集,得到待推荐商户列表"""
    itemcf_recall_list = self._get_itemcf_recall(product_id)
    # 并集:保留 itemcf 顺序,追加 cust_code_list 中不重复的部分
    seen = set(itemcf_recall_list)
    extra = [c for c in cust_code_list if c not in seen]
    result = list(itemcf_recall_list) + extra
    logger.info(f"Recall completed: {len(result)} customers (itemcf={len(itemcf_recall_list)}, core_extra={len(extra)}) for product {product_id}")
    return result
  • Step 3: Update get_recommend_list_by_gbdtlr() signature and recall call

Replace the method signature and the first recall call (lines 57–61):

Old:

def get_recommend_list_by_gbdtlr(self, product_id, recall_count=500):
    ...
    recall_cust_list = self.get_recal_cust(product_id, recall_count)

New:

def get_recommend_list_by_gbdtlr(self, product_id, cust_code_list=None):
    """根据gbdt_lr获取商户推荐列表"""
    if cust_code_list is None:
        cust_code_list = []
    logger.info(f"GBDT-LR recommend started for product {product_id}")
    recall_cust_list = self.get_recal_cust(product_id, cust_code_list)

No other changes to the GBDT-LR method body are needed — it already handles the full recall_cust_list without a count cap in its feature fetching and scoring logic.

  • Step 4: Remove get_recommend_and_delivery() method

Delete the entire method (lines 109–133):

def get_recommend_and_delivery(self, recommend_list, delivery_count=5000):
    ...
  • [ ] Step 5: Verify syntax

    cd D:/projiect/dingsheng/BrandCultivation && python -c "from models.recommend import Recommend; print('OK')"
    

Expected: OK

  • [ ] Step 6: Commit

    git add models/recommend.py
    git commit -m "refactor(models): remove hotness fallback, merge cust_code_list into ItemCF recall, remove delivery allocation"
    

Chunk 3: Core Recommendation Model — New SKU Path (Item2Vec)

Task 4: Refactor get_recommend_list_by_item2vec() in models/recommend.py

Files:

  • Modify: models/recommend.py
  • Modify: models/item2vec/inference.py

The goal: cust_code_list customers are added into the Item2Vec candidate pool and scored together with the rest via the same sales-volume normalization pipeline.

  • Step 1: Update get_recommend_list_by_item2vec() in models/recommend.py

Replace the old method (lines 89–98):

Old:

def get_recommend_list_by_item2vec(self, product_id, recall_count=500):
    logger.info(f"Item2Vec recommend started for product {product_id}")
    recommend_list = self._item2vec_model.get_recommend_cust_list(product_id, top=recall_count)
    recommend_list = recommend_list.drop(columns=["sale_qty"])
    recommend_list = recommend_list.to_dict(orient='records')
    recommend_list = recommend_list[:recall_count]
    logger.info(f"Item2Vec recommend completed: {len(recommend_list)} results")
    return recommend_list

New:

def get_recommend_list_by_item2vec(self, product_id, cust_code_list=None):
    """根据item2vec获取商户推荐列表,核心商户并入候选集统一评分"""
    if cust_code_list is None:
        cust_code_list = []
    logger.info(f"Item2Vec recommend started for product {product_id}")
    recommend_list = self._item2vec_model.get_recommend_cust_list(product_id, cust_code_list=cust_code_list)
    recommend_list = recommend_list.drop(columns=["sale_qty"])
    recommend_list = recommend_list.to_dict(orient='records')
    logger.info(f"Item2Vec recommend completed: {len(recommend_list)} results")
    return recommend_list

Key changes:

  • Replace recall_count with cust_code_list
  • Pass cust_code_list down to get_recommend_cust_list()
  • Remove the [:recall_count] slice — return all scored customers

  • [ ] Step 2: Update get_recommend_cust_list() in models/item2vec/inference.py

The method currently builds a candidate pool from similar-product order history and scores them. We need to:

  1. Accept cust_code_list parameter
  2. After building recommend_cust from order data, union in any cust_code_list members not yet present (with sale_qty=0 so they enter the normalization pipeline naturally)
  3. Run the existing log1p + StandardScaler + sigmoid normalization on the full merged set

Replace the get_recommend_cust_list() method (lines 38–70):

def get_recommend_cust_list(self, product_code, top=100, cust_code_list=None):
    """获取推荐的商户列表,核心商户并入候选集统一评分"""
    if cust_code_list is None:
        cust_code_list = []
    logger.info(f"Getting recommend list for product {product_code}, top={top}")
    product_list = self.get_similarity_list(product_code)
    order_data = self._dao.get_order_by_product_ids(self._city_uuid, product_list)[OrderConfig.FEATURE_COLUMNS]
    order_data["sale_qty"] = order_data["sale_qty"].fillna(0)
    order_data = order_data.groupby(["cust_code", "product_code"], as_index=False)["sale_qty"].mean()

    # 按照卷烟分组,取每款卷烟售卖最好的前50个商户
    order_data = (
        order_data
        .sort_values(["product_code", "sale_qty", "cust_code"], ascending=[True, False, True])
        .groupby("product_code")
        .head(top)
    )

    recommend_cust = (
        order_data.groupby(["cust_code"], as_index=False)["sale_qty"].sum()
        .query("sale_qty > 0")
        .sort_values(["sale_qty", "cust_code"], ascending=[False, True])
    )

    # 将 cust_code_list 中不在候选集的商户补入,sale_qty=0 参与归一化
    existing_custs = set(recommend_cust["cust_code"].tolist())
    extra_rows = [{"cust_code": c, "sale_qty": 0} for c in cust_code_list if c not in existing_custs]
    if extra_rows:
        extra_df = pd.DataFrame(extra_rows)
        recommend_cust = pd.concat([recommend_cust, extra_df], ignore_index=True)

    # log1p + StandardScaler + sigmoid 归一化(对全部候选集统一做)
    log_qty = np.log1p(recommend_cust["sale_qty"].values).reshape(-1, 1)
    scaler = StandardScaler()
    normalized = scaler.fit_transform(log_qty)
    recommend_cust["recommend_score"] = ((1 / (1 + np.exp(-normalized))) * 100).flatten()

    # 按分数降序返回
    recommend_cust = recommend_cust.sort_values(["recommend_score", "cust_code"], ascending=[False, True]).reset_index(drop=True)

    return recommend_cust

Key changes:

  • Accept cust_code_list parameter (default [])
  • After building recommend_cust from order history, concat any missing cust_code_list members with sale_qty=0
  • Run the normalization on the merged set — customers with sale_qty=0 will naturally score low but are present
  • Sort by score descending and return all (no count cap)
  • Remove the old top cap at the end (it was recommend_list[:recall_count] in the caller, which is also removed)

  • [ ] Step 3: Verify syntax for both files

    cd D:/projiect/dingsheng/BrandCultivation && python -c "from models.recommend import Recommend; from models.item2vec.inference import Item2VecModel; print('OK')"
    

Expected: OK

  • [ ] Step 4: Commit

    git add models/recommend.py models/item2vec/inference.py
    git commit -m "refactor(models): merge cust_code_list into Item2Vec candidate pool for unified scoring"
    

Chunk 4: Report Utilities

Task 5: Remove delivery columns from report generation

Files:

  • Modify: utils/report_utils.py
  • Modify: utils/reports_process.py

  • [ ] Step 1: Update generate_recommend_report() in utils/report_utils.py

The method currently calls calculate_delivery_by_recommend_data(..., delivery_count). We remove delivery_count and update the call.

Replace generate_recommend_report() (lines 91–100):

def generate_recommend_report(self, recall_count):
    """生成推荐报告"""
    logger.info("Generating recommend report")
    recommend_data = self._get_recommend_data(recall_count)
    recommend_list = list(map(lambda x: x["cust_code"], recommend_data))
    recommend_cust_infos = self._dao.get_cust_by_ids(self._city_uuid, recommend_list)
    report = build_recommend_report(recommend_data, recommend_cust_infos)

    report.to_excel(os.path.join(self._save_dir, "商户售卖推荐表.xlsx"), index=False)
    logger.info("Recommend report saved")

Note: rename helper function call from calculate_delivery_by_recommend_data to build_recommend_report (defined in next step).

  • Step 2: Update generate_all_data() in utils/report_utils.py

Replace generate_all_data() (lines 160–167):

def generate_all_data(self, cust_code_list):
    logger.info("Generating all reports")
    self.generate_feats_ralation_report(cust_code_list)
    self.generate_product_report()
    self.generate_recommend_report(cust_code_list)
    self.generate_similarity_product_report()
    logger.info("All reports generated")
  • Step 3: Update _get_recommend_data() in utils/report_utils.py

The method currently accepts recall_count and passes it to the model methods. Replace with cust_code_list:

Replace _get_recommend_data() (lines 26–39):

def _get_recommend_data(self, cust_code_list):
    """获取推荐商户列表"""
    products_in_order = self._dao.get_product_from_order(self._city_uuid)["product_code"].unique().tolist()
    if self._product_id in products_in_order:
        recommend_data = self._recommend_model.get_recommend_list_by_gbdtlr(
            self._product_id, cust_code_list=cust_code_list
        )
    else:
        recommend_data = self._recommend_model.get_recommend_list_by_item2vec(
            self._product_id, cust_code_list=cust_code_list
        )
    return recommend_data
  • Step 4: Update generate_feats_ralation_report() signature in utils/report_utils.py

It calls _generate_feats_map(recall_count) internally. Update both:

Replace generate_feats_ralation_report() (lines 69–79):

def generate_feats_ralation_report(self, cust_code_list):
    """生成特征相关性分析报告"""
    logger.info("Generating feature relation report")
    feats_map = self._generate_feats_map(cust_code_list)
    product_content = self._get_product_content()
    shap_result = self._recommend_model._gbdtlr_model.generate_shap_interance(feats_map)
    report = feats_relation_process(shap_result, product_content)
    report.to_excel(os.path.join(self._save_dir, "品规商户特征关系表.xlsx"), index=False)
    logger.info("Feature relation report saved")

Replace _generate_feats_map() (lines 41–61):

def _generate_feats_map(self, cust_code_list):
    """根据召回的推荐列表生成品规-商户features_map"""
    recommend_data = self._get_recommend_data(cust_code_list)
    recommend_list = list(map(lambda x: x["cust_code"], recommend_data))
    product_data = self._product_data.copy()
    cust_data = self._dao.get_cust_by_ids(self._city_uuid, recommend_list)[CustConfig.FEATURE_COLUMNS]
    product_data = sample_data_clear(product_data, ProductConfig)
    cust_data = sample_data_clear(cust_data, CustConfig)
    feats_map = generate_feats_map(product_data, cust_data)
    return feats_map
  • Step 5: Update calculate_delivery_by_recommend_data()build_recommend_report() in utils/reports_process.py

Replace calculate_delivery_by_recommend_data() (lines 33–73) with a new function that drops all delivery allocation logic:

def build_recommend_report(recommend_data, recommend_cust_infos):
    """根据推荐数据生成推荐商户报告(不含投放量)"""
    recommend_data = pd.DataFrame(recommend_data)

    cust_ids = recommend_cust_infos.set_index("cust_code")
    recommend_data = recommend_data.join(cust_ids, on="cust_code", how="inner")
    recommend_data = recommend_data[["cust_code", "cust_name", "recommend_score"]]
    recommend_data = recommend_data.reset_index(drop=True)
    recommend_data.index = recommend_data.index + 1
    recommend_data = recommend_data.reset_index()
    recommend_data = recommend_data.rename(
        columns={
            "index": "推荐序号",
            "cust_code": "商户编号",
            "cust_name": "商户名称",
            "recommend_score": "推荐系数",
        }
    )
    return recommend_data
  • Step 6: Update import in utils/report_utils.py

The import on line 10 currently imports calculate_delivery_by_recommend_data. Update it:

from utils.reports_process import feats_relation_process, build_recommend_report, eval_report_process_pre, eval_report_process
  • Step 7: Check generate_eval_data() still compiles — it references "建议投放量(条)" column

In utils/report_utils.py line 153:

recommend_data = recommend_data.drop(columns=["建议投放量(条)"])

This column no longer exists in the new report. Remove that line:

def generate_eval_data(self, start_time, end_time, recommend_data):
    """根据推荐列表生成验证报告"""
    logger.info("Generating eval report")
    if self._product_id == '350139':
        eval_product_id = "350355"
    else:
        eval_product_id = self._product_id
    delivery_data = self._dao.get_delivery_data_by_product(self._city_uuid, eval_product_id, start_time, end_time)
    delivery_data = delivery_data[DeliveryConfig.FEATURE_COLUMNS]
    delivery_data = sample_data_clear(delivery_data, DeliveryConfig)

    report = eval_report_process(delivery_data, recommend_data)

    report.to_excel(os.path.join(self._save_dir, "投放验证报告.xlsx"), index=False)
    logger.info("Eval report saved")
  • [ ] Step 8: Verify imports and syntax

    cd D:/projiect/dingsheng/BrandCultivation && python -c "from utils.report_utils import ReportUtils; from utils.reports_process import build_recommend_report; print('OK')"
    

Expected: OK

  • [ ] Step 9: Commit

    git add utils/report_utils.py utils/reports_process.py
    git commit -m "refactor(utils): remove delivery_count from recommend report, pass cust_code_list through report pipeline"
    

Chunk 5: End-to-End Smoke Test

Task 6: Verify the full pipeline loads without errors

Files:

  • Read: all modified files (no new changes)

  • [ ] Step 1: Import all changed modules

    cd D:/projiect/dingsheng/BrandCultivation && python -c "
    from api.request_body import RecommendRequest
    from api.recommend import router
    from models.recommend import Recommend
    from models.item2vec.inference import Item2VecModel
    from utils.report_utils import ReportUtils
    from utils.reports_process import build_recommend_report
    print('All imports OK')
    "
    

Expected: All imports OK

  • [ ] Step 2: Validate RecommendRequest schema with sample data

    cd D:/projiect/dingsheng/BrandCultivation && python -c "
    from api.request_body import RecommendRequest
    r = RecommendRequest(
    city_uuid='test_city',
    product_code='350139',
    cust_code_list=['C001', 'C002'],
    cultivacation_id='CULT_001',
    limit_cycle_name='2026-W01'
    )
    print('cust_code_list:', r.cust_code_list)
    assert r.cust_code_list == ['C001', 'C002']
    print('Schema validation OK')
    "
    

Expected: Schema validation OK

  • [ ] Step 3: Confirm old fields are gone

    cd D:/projiect/dingsheng/BrandCultivation && python -c "
    from api.request_body import RecommendRequest
    import inspect
    fields = RecommendRequest.model_fields
    assert 'recall_cust_count' not in fields, 'recall_cust_count still present'
    assert 'delivery_count' not in fields, 'delivery_count still present'
    assert 'cust_code_list' in fields, 'cust_code_list missing'
    print('Field removal verified OK')
    "
    

Expected: Field removal verified OK

  • [ ] Step 4: Confirm get_recommend_and_delivery is gone from Recommend

    cd D:/projiect/dingsheng/BrandCultivation && python -c "
    from models.recommend import Recommend
    assert not hasattr(Recommend, 'get_recommend_and_delivery'), 'method still exists'
    assert not hasattr(Recommend, '_get_hot_recall'), 'hot recall still exists'
    print('Method removal verified OK')
    "
    

Expected: Method removal verified OK

  • [ ] Step 5: Final commit

    git add -p  # verify nothing unintended is staged
    git commit -m "test: smoke-test all changed modules for recommend API refactor" --allow-empty
    

(Use --allow-empty only if there are no file changes at this step — this is a verification-only task.)