# Recommend API Refactor Implementation Plan > **For agentic workers:** REQUIRED: Use superpowers:subagent-driven-development (if subagents available) or superpowers:executing-plans to implement this plan. Steps use checkbox (`- [ ]`) syntax for tracking. **Goal:** Remove `recall_cust_count` and `delivery_count` from the recommend API, introduce `cust_code_list` as a core retailer list that participates in recall/ranking for both old and new SKU paths, and remove all delivery allocation logic and report columns. **Architecture:** The change touches four layers in sequence: request schema → API endpoint → core recommendation model → report utilities. The old-SKU path replaces the two-stage recall (ItemCF + hotness fallback) with a union of ItemCF and `cust_code_list`. The new-SKU path merges `cust_code_list` into the Item2Vec candidate pool so all customers are scored together via the same sales-volume normalization. Delivery allocation is removed end-to-end. **Tech Stack:** Python 3, FastAPI, Pydantic, pandas, numpy, scikit-learn, Redis (ItemCF recall), MySQL (DAO layer), LightGBM + Logistic Regression (GBDT-LR), Item2Vec embeddings. --- ## Chunk 1: Request Schema & API Endpoint ### Task 1: Update `RecommendRequest` schema **Files:** - Modify: `api/request_body.py` - [ ] **Step 1: Open and read `api/request_body.py`** Confirm current fields: `city_uuid`, `product_code`, `recall_cust_count`, `delivery_count`, `cultivacation_id`, `limit_cycle_name`. - [ ] **Step 2: Replace the schema** ```python from pydantic import BaseModel from typing import List class RecommendRequest(BaseModel): city_uuid: str product_code: str cust_code_list: List[str] # 核心零售户ID列表 cultivacation_id: str limit_cycle_name: str ``` Remove `recall_cust_count: int` and `delivery_count: int`. Add `cust_code_list: List[str]`. - [ ] **Step 3: Verify no syntax errors** ```bash cd D:/projiect/dingsheng/BrandCultivation && python -c "from api.request_body import RecommendRequest; print('OK')" ``` Expected: `OK` - [ ] **Step 4: Commit** ```bash git add api/request_body.py git commit -m "refactor(api): replace recall_cust_count/delivery_count with cust_code_list in RecommendRequest" ``` --- ### Task 2: Update `api/recommend.py` endpoint **Files:** - Modify: `api/recommend.py` - [ ] **Step 1: Update the `recommend()` function signature and body** Replace the entire `recommend()` function (lines 15–54) with: ```python @router.post("/recommend") async def recommend(request: RecommendRequest, backgroundTasks: BackgroundTasks): """推荐接口""" logger.info(f"Recommend request: city={request.city_uuid}, product={request.product_code}, core_custs={len(request.cust_code_list)}") gbdtlr_model_path = os.path.join("./models/rank/weights", request.city_uuid, "gbdtlr_model.pkl") if not os.path.exists(gbdtlr_model_path): logger.warning(f"Model not found: {gbdtlr_model_path}") raise HTTPException( status_code=status.HTTP_404_NOT_FOUND, detail="该城市的模型未训练,请先进行训练", ) recommend_model = Recommend(request.city_uuid) products_in_order = dao.get_product_from_order(request.city_uuid)["product_code"].unique().tolist() if request.product_code in products_in_order: logger.info(f"Using GBDT-LR model for existing product {request.product_code}") recommend_list = recommend_model.get_recommend_list_by_gbdtlr( request.product_code, cust_code_list=request.cust_code_list ) else: logger.info(f"Using Item2Vec model for new product {request.product_code}") recommend_list = recommend_model.get_recommend_list_by_item2vec( request.product_code, cust_code_list=request.cust_code_list ) request_data = [] for index, data in enumerate(recommend_list): request_data.append( { "id": index + 1, "cust_code": data["cust_code"], "recommend_score": data["recommend_score"], } ) logger.info(f"Recommend completed: {len(request_data)} customers recommended") backgroundTasks.add_task(generate_and_upload_report, request) return {"code": 200, "msg": "success", "data": {"recommendationInfo": request_data}} ``` Key changes: - Remove `recall_count` and `delivery_count` from all calls - Pass `cust_code_list=request.cust_code_list` to both model methods - Remove `get_recommend_and_delivery()` call - Remove `delivery_count` from response items - [ ] **Step 2: Update `generate_and_upload_report()` background task** Replace lines 57–85 with: ```python def generate_and_upload_report(request: RecommendRequest): """生成并上传报告到阿里云文件数据库""" logger.info(f"Background task started: generating report for {request.city_uuid}/{request.product_code}") try: report_util = ReportUtils(request.city_uuid, request.product_code) report_util.generate_all_data(request.cust_code_list) reports_dir = os.path.join("./data/reports", request.city_uuid, request.product_code) report_files = ["卷烟信息表", "品规商户特征关系表", "相似卷烟表", "商户售卖推荐表"] file_id_map = FileStreamUtils.upload_files(reports_dir, report_files) if file_id_map is None: logger.error(f"Report upload failed for {request.city_uuid}/{request.product_code}") return data_dict = { "cultivacation_id": request.cultivacation_id, "city_uuid": request.city_uuid, "limit_cycle_name": request.limit_cycle_name, "product_code": request.product_code, "product_info_table": file_id_map.get("卷烟信息表"), "relation_table": file_id_map.get("品规商户特征关系表"), "similarity_product_table": file_id_map.get("相似卷烟表"), "recommend_table": file_id_map.get("商户售卖推荐表"), } dao.insert_report(data_dict) logger.info(f"Background task completed: report uploaded for {request.city_uuid}/{request.product_code}") except Exception as e: logger.error(f"Background task failed: {e}", exc_info=True) ``` Key change: `generate_all_data(request.recall_cust_count, request.delivery_count)` → `generate_all_data(request.cust_code_list)`. - [ ] **Step 3: Verify imports still valid** ```bash cd D:/projiect/dingsheng/BrandCultivation && python -c "from api.recommend import router; print('OK')" ``` Expected: `OK` - [ ] **Step 4: Commit** ```bash git add api/recommend.py git commit -m "refactor(api): remove delivery allocation, pass cust_code_list to recommend models" ``` --- ## Chunk 2: Core Recommendation Model — Old SKU Path (GBDT-LR) ### Task 3: Refactor recall logic in `models/recommend.py` **Files:** - Modify: `models/recommend.py` The goal: replace `get_recal_cust(product_id, recall_count)` with a new method that takes `cust_code_list` and returns the union of ItemCF recall and `cust_code_list`, with no hotness fallback and no count cap. - [ ] **Step 1: Remove `_get_hot_recall()` method** Delete the entire method (lines 35–39): ```python def _get_hot_recall(self): """热度召回""" key = f"hot:{self._city_uuid}:sale_qty" recall_list = self._redis.zrevrange(key, 0, -1, withscores=False) return recall_list ``` - [ ] **Step 2: Rewrite `get_recal_cust()` to accept `cust_code_list`** Replace the old `get_recal_cust(self, product_id, recall_count)` (lines 41–55) with: ```python def get_recal_cust(self, product_id, cust_code_list): """通过协同过滤召回与核心零售户列表取并集,得到待推荐商户列表""" itemcf_recall_list = self._get_itemcf_recall(product_id) # 并集:保留 itemcf 顺序,追加 cust_code_list 中不重复的部分 seen = set(itemcf_recall_list) extra = [c for c in cust_code_list if c not in seen] result = list(itemcf_recall_list) + extra logger.info(f"Recall completed: {len(result)} customers (itemcf={len(itemcf_recall_list)}, core_extra={len(extra)}) for product {product_id}") return result ``` - [ ] **Step 3: Update `get_recommend_list_by_gbdtlr()` signature and recall call** Replace the method signature and the first recall call (lines 57–61): Old: ```python def get_recommend_list_by_gbdtlr(self, product_id, recall_count=500): ... recall_cust_list = self.get_recal_cust(product_id, recall_count) ``` New: ```python def get_recommend_list_by_gbdtlr(self, product_id, cust_code_list=None): """根据gbdt_lr获取商户推荐列表""" if cust_code_list is None: cust_code_list = [] logger.info(f"GBDT-LR recommend started for product {product_id}") recall_cust_list = self.get_recal_cust(product_id, cust_code_list) ``` No other changes to the GBDT-LR method body are needed — it already handles the full `recall_cust_list` without a count cap in its feature fetching and scoring logic. - [ ] **Step 4: Remove `get_recommend_and_delivery()` method** Delete the entire method (lines 109–133): ```python def get_recommend_and_delivery(self, recommend_list, delivery_count=5000): ... ``` - [ ] **Step 5: Verify syntax** ```bash cd D:/projiect/dingsheng/BrandCultivation && python -c "from models.recommend import Recommend; print('OK')" ``` Expected: `OK` - [ ] **Step 6: Commit** ```bash git add models/recommend.py git commit -m "refactor(models): remove hotness fallback, merge cust_code_list into ItemCF recall, remove delivery allocation" ``` --- ## Chunk 3: Core Recommendation Model — New SKU Path (Item2Vec) ### Task 4: Refactor `get_recommend_list_by_item2vec()` in `models/recommend.py` **Files:** - Modify: `models/recommend.py` - Modify: `models/item2vec/inference.py` The goal: `cust_code_list` customers are added into the Item2Vec candidate pool and scored together with the rest via the same sales-volume normalization pipeline. - [ ] **Step 1: Update `get_recommend_list_by_item2vec()` in `models/recommend.py`** Replace the old method (lines 89–98): Old: ```python def get_recommend_list_by_item2vec(self, product_id, recall_count=500): logger.info(f"Item2Vec recommend started for product {product_id}") recommend_list = self._item2vec_model.get_recommend_cust_list(product_id, top=recall_count) recommend_list = recommend_list.drop(columns=["sale_qty"]) recommend_list = recommend_list.to_dict(orient='records') recommend_list = recommend_list[:recall_count] logger.info(f"Item2Vec recommend completed: {len(recommend_list)} results") return recommend_list ``` New: ```python def get_recommend_list_by_item2vec(self, product_id, cust_code_list=None): """根据item2vec获取商户推荐列表,核心商户并入候选集统一评分""" if cust_code_list is None: cust_code_list = [] logger.info(f"Item2Vec recommend started for product {product_id}") recommend_list = self._item2vec_model.get_recommend_cust_list(product_id, cust_code_list=cust_code_list) recommend_list = recommend_list.drop(columns=["sale_qty"]) recommend_list = recommend_list.to_dict(orient='records') logger.info(f"Item2Vec recommend completed: {len(recommend_list)} results") return recommend_list ``` Key changes: - Replace `recall_count` with `cust_code_list` - Pass `cust_code_list` down to `get_recommend_cust_list()` - Remove the `[:recall_count]` slice — return all scored customers - [ ] **Step 2: Update `get_recommend_cust_list()` in `models/item2vec/inference.py`** The method currently builds a candidate pool from similar-product order history and scores them. We need to: 1. Accept `cust_code_list` parameter 2. After building `recommend_cust` from order data, union in any `cust_code_list` members not yet present (with `sale_qty=0` so they enter the normalization pipeline naturally) 3. Run the existing log1p + StandardScaler + sigmoid normalization on the full merged set Replace the `get_recommend_cust_list()` method (lines 38–70): ```python def get_recommend_cust_list(self, product_code, top=100, cust_code_list=None): """获取推荐的商户列表,核心商户并入候选集统一评分""" if cust_code_list is None: cust_code_list = [] logger.info(f"Getting recommend list for product {product_code}, top={top}") product_list = self.get_similarity_list(product_code) order_data = self._dao.get_order_by_product_ids(self._city_uuid, product_list)[OrderConfig.FEATURE_COLUMNS] order_data["sale_qty"] = order_data["sale_qty"].fillna(0) order_data = order_data.groupby(["cust_code", "product_code"], as_index=False)["sale_qty"].mean() # 按照卷烟分组,取每款卷烟售卖最好的前50个商户 order_data = ( order_data .sort_values(["product_code", "sale_qty", "cust_code"], ascending=[True, False, True]) .groupby("product_code") .head(top) ) recommend_cust = ( order_data.groupby(["cust_code"], as_index=False)["sale_qty"].sum() .query("sale_qty > 0") .sort_values(["sale_qty", "cust_code"], ascending=[False, True]) ) # 将 cust_code_list 中不在候选集的商户补入,sale_qty=0 参与归一化 existing_custs = set(recommend_cust["cust_code"].tolist()) extra_rows = [{"cust_code": c, "sale_qty": 0} for c in cust_code_list if c not in existing_custs] if extra_rows: extra_df = pd.DataFrame(extra_rows) recommend_cust = pd.concat([recommend_cust, extra_df], ignore_index=True) # log1p + StandardScaler + sigmoid 归一化(对全部候选集统一做) log_qty = np.log1p(recommend_cust["sale_qty"].values).reshape(-1, 1) scaler = StandardScaler() normalized = scaler.fit_transform(log_qty) recommend_cust["recommend_score"] = ((1 / (1 + np.exp(-normalized))) * 100).flatten() # 按分数降序返回 recommend_cust = recommend_cust.sort_values(["recommend_score", "cust_code"], ascending=[False, True]).reset_index(drop=True) return recommend_cust ``` Key changes: - Accept `cust_code_list` parameter (default `[]`) - After building `recommend_cust` from order history, concat any missing `cust_code_list` members with `sale_qty=0` - Run the normalization on the merged set — customers with `sale_qty=0` will naturally score low but are present - Sort by score descending and return all (no count cap) - Remove the old `top` cap at the end (it was `recommend_list[:recall_count]` in the caller, which is also removed) - [ ] **Step 3: Verify syntax for both files** ```bash cd D:/projiect/dingsheng/BrandCultivation && python -c "from models.recommend import Recommend; from models.item2vec.inference import Item2VecModel; print('OK')" ``` Expected: `OK` - [ ] **Step 4: Commit** ```bash git add models/recommend.py models/item2vec/inference.py git commit -m "refactor(models): merge cust_code_list into Item2Vec candidate pool for unified scoring" ``` --- ## Chunk 4: Report Utilities ### Task 5: Remove delivery columns from report generation **Files:** - Modify: `utils/report_utils.py` - Modify: `utils/reports_process.py` - [ ] **Step 1: Update `generate_recommend_report()` in `utils/report_utils.py`** The method currently calls `calculate_delivery_by_recommend_data(..., delivery_count)`. We remove `delivery_count` and update the call. Replace `generate_recommend_report()` (lines 91–100): ```python def generate_recommend_report(self, recall_count): """生成推荐报告""" logger.info("Generating recommend report") recommend_data = self._get_recommend_data(recall_count) recommend_list = list(map(lambda x: x["cust_code"], recommend_data)) recommend_cust_infos = self._dao.get_cust_by_ids(self._city_uuid, recommend_list) report = build_recommend_report(recommend_data, recommend_cust_infos) report.to_excel(os.path.join(self._save_dir, "商户售卖推荐表.xlsx"), index=False) logger.info("Recommend report saved") ``` Note: rename helper function call from `calculate_delivery_by_recommend_data` to `build_recommend_report` (defined in next step). - [ ] **Step 2: Update `generate_all_data()` in `utils/report_utils.py`** Replace `generate_all_data()` (lines 160–167): ```python def generate_all_data(self, cust_code_list): logger.info("Generating all reports") self.generate_feats_ralation_report(cust_code_list) self.generate_product_report() self.generate_recommend_report(cust_code_list) self.generate_similarity_product_report() logger.info("All reports generated") ``` - [ ] **Step 3: Update `_get_recommend_data()` in `utils/report_utils.py`** The method currently accepts `recall_count` and passes it to the model methods. Replace with `cust_code_list`: Replace `_get_recommend_data()` (lines 26–39): ```python def _get_recommend_data(self, cust_code_list): """获取推荐商户列表""" products_in_order = self._dao.get_product_from_order(self._city_uuid)["product_code"].unique().tolist() if self._product_id in products_in_order: recommend_data = self._recommend_model.get_recommend_list_by_gbdtlr( self._product_id, cust_code_list=cust_code_list ) else: recommend_data = self._recommend_model.get_recommend_list_by_item2vec( self._product_id, cust_code_list=cust_code_list ) return recommend_data ``` - [ ] **Step 4: Update `generate_feats_ralation_report()` signature in `utils/report_utils.py`** It calls `_generate_feats_map(recall_count)` internally. Update both: Replace `generate_feats_ralation_report()` (lines 69–79): ```python def generate_feats_ralation_report(self, cust_code_list): """生成特征相关性分析报告""" logger.info("Generating feature relation report") feats_map = self._generate_feats_map(cust_code_list) product_content = self._get_product_content() shap_result = self._recommend_model._gbdtlr_model.generate_shap_interance(feats_map) report = feats_relation_process(shap_result, product_content) report.to_excel(os.path.join(self._save_dir, "品规商户特征关系表.xlsx"), index=False) logger.info("Feature relation report saved") ``` Replace `_generate_feats_map()` (lines 41–61): ```python def _generate_feats_map(self, cust_code_list): """根据召回的推荐列表生成品规-商户features_map""" recommend_data = self._get_recommend_data(cust_code_list) recommend_list = list(map(lambda x: x["cust_code"], recommend_data)) product_data = self._product_data.copy() cust_data = self._dao.get_cust_by_ids(self._city_uuid, recommend_list)[CustConfig.FEATURE_COLUMNS] product_data = sample_data_clear(product_data, ProductConfig) cust_data = sample_data_clear(cust_data, CustConfig) feats_map = generate_feats_map(product_data, cust_data) return feats_map ``` - [ ] **Step 5: Update `calculate_delivery_by_recommend_data()` → `build_recommend_report()` in `utils/reports_process.py`** Replace `calculate_delivery_by_recommend_data()` (lines 33–73) with a new function that drops all delivery allocation logic: ```python def build_recommend_report(recommend_data, recommend_cust_infos): """根据推荐数据生成推荐商户报告(不含投放量)""" recommend_data = pd.DataFrame(recommend_data) cust_ids = recommend_cust_infos.set_index("cust_code") recommend_data = recommend_data.join(cust_ids, on="cust_code", how="inner") recommend_data = recommend_data[["cust_code", "cust_name", "recommend_score"]] recommend_data = recommend_data.reset_index(drop=True) recommend_data.index = recommend_data.index + 1 recommend_data = recommend_data.reset_index() recommend_data = recommend_data.rename( columns={ "index": "推荐序号", "cust_code": "商户编号", "cust_name": "商户名称", "recommend_score": "推荐系数", } ) return recommend_data ``` - [ ] **Step 6: Update import in `utils/report_utils.py`** The import on line 10 currently imports `calculate_delivery_by_recommend_data`. Update it: ```python from utils.reports_process import feats_relation_process, build_recommend_report, eval_report_process_pre, eval_report_process ``` - [ ] **Step 7: Check `generate_eval_data()` still compiles — it references `"建议投放量(条)"` column** In `utils/report_utils.py` line 153: ```python recommend_data = recommend_data.drop(columns=["建议投放量(条)"]) ``` This column no longer exists in the new report. Remove that line: ```python def generate_eval_data(self, start_time, end_time, recommend_data): """根据推荐列表生成验证报告""" logger.info("Generating eval report") if self._product_id == '350139': eval_product_id = "350355" else: eval_product_id = self._product_id delivery_data = self._dao.get_delivery_data_by_product(self._city_uuid, eval_product_id, start_time, end_time) delivery_data = delivery_data[DeliveryConfig.FEATURE_COLUMNS] delivery_data = sample_data_clear(delivery_data, DeliveryConfig) report = eval_report_process(delivery_data, recommend_data) report.to_excel(os.path.join(self._save_dir, "投放验证报告.xlsx"), index=False) logger.info("Eval report saved") ``` - [ ] **Step 8: Verify imports and syntax** ```bash cd D:/projiect/dingsheng/BrandCultivation && python -c "from utils.report_utils import ReportUtils; from utils.reports_process import build_recommend_report; print('OK')" ``` Expected: `OK` - [ ] **Step 9: Commit** ```bash git add utils/report_utils.py utils/reports_process.py git commit -m "refactor(utils): remove delivery_count from recommend report, pass cust_code_list through report pipeline" ``` --- ## Chunk 5: End-to-End Smoke Test ### Task 6: Verify the full pipeline loads without errors **Files:** - Read: all modified files (no new changes) - [ ] **Step 1: Import all changed modules** ```bash cd D:/projiect/dingsheng/BrandCultivation && python -c " from api.request_body import RecommendRequest from api.recommend import router from models.recommend import Recommend from models.item2vec.inference import Item2VecModel from utils.report_utils import ReportUtils from utils.reports_process import build_recommend_report print('All imports OK') " ``` Expected: `All imports OK` - [ ] **Step 2: Validate `RecommendRequest` schema with sample data** ```bash cd D:/projiect/dingsheng/BrandCultivation && python -c " from api.request_body import RecommendRequest r = RecommendRequest( city_uuid='test_city', product_code='350139', cust_code_list=['C001', 'C002'], cultivacation_id='CULT_001', limit_cycle_name='2026-W01' ) print('cust_code_list:', r.cust_code_list) assert r.cust_code_list == ['C001', 'C002'] print('Schema validation OK') " ``` Expected: `Schema validation OK` - [ ] **Step 3: Confirm old fields are gone** ```bash cd D:/projiect/dingsheng/BrandCultivation && python -c " from api.request_body import RecommendRequest import inspect fields = RecommendRequest.model_fields assert 'recall_cust_count' not in fields, 'recall_cust_count still present' assert 'delivery_count' not in fields, 'delivery_count still present' assert 'cust_code_list' in fields, 'cust_code_list missing' print('Field removal verified OK') " ``` Expected: `Field removal verified OK` - [ ] **Step 4: Confirm `get_recommend_and_delivery` is gone from `Recommend`** ```bash cd D:/projiect/dingsheng/BrandCultivation && python -c " from models.recommend import Recommend assert not hasattr(Recommend, 'get_recommend_and_delivery'), 'method still exists' assert not hasattr(Recommend, '_get_hot_recall'), 'hot recall still exists' print('Method removal verified OK') " ``` Expected: `Method removal verified OK` - [ ] **Step 5: Final commit** ```bash git add -p # verify nothing unintended is staged git commit -m "test: smoke-test all changed modules for recommend API refactor" --allow-empty ``` (Use `--allow-empty` only if there are no file changes at this step — this is a verification-only task.)