For agentic workers: REQUIRED: Use superpowers:subagent-driven-development (if subagents available) or superpowers:executing-plans to implement this plan. Steps use checkbox (- [ ]) syntax for tracking.
Goal: Remove recall_cust_count and delivery_count from the recommend API, introduce cust_code_list as a core retailer list that participates in recall/ranking for both old and new SKU paths, and remove all delivery allocation logic and report columns.
Architecture: The change touches four layers in sequence: request schema → API endpoint → core recommendation model → report utilities. The old-SKU path replaces the two-stage recall (ItemCF + hotness fallback) with a union of ItemCF and cust_code_list. The new-SKU path merges cust_code_list into the Item2Vec candidate pool so all customers are scored together via the same sales-volume normalization. Delivery allocation is removed end-to-end.
Tech Stack: Python 3, FastAPI, Pydantic, pandas, numpy, scikit-learn, Redis (ItemCF recall), MySQL (DAO layer), LightGBM + Logistic Regression (GBDT-LR), Item2Vec embeddings.
RecommendRequest schema
Files:
Modify: api/request_body.py
- [ ] Step 1: Open and read api/request_body.py
Confirm current fields: city_uuid, product_code, recall_cust_count, delivery_count, cultivacation_id, limit_cycle_name.
- [ ] Step 2: Replace the schema
```python
from typing import List

from pydantic import BaseModel


class RecommendRequest(BaseModel):
    city_uuid: str
    product_code: str
    cust_code_list: List[str]  # core retailer ID list
    cultivacation_id: str
    limit_cycle_name: str
```
Remove recall_cust_count: int and delivery_count: int. Add cust_code_list: List[str].
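For callers, the request body changes shape accordingly. The following is a minimal sketch of a payload for the new schema, using only the standard library. Every field value is a made-up placeholder, and `build_recommend_payload` is a hypothetical helper that does not exist in the codebase.

```python
import json


def build_recommend_payload(city_uuid, product_code, cust_code_list,
                            cultivacation_id, limit_cycle_name):
    """Hypothetical helper: assemble a body matching the new RecommendRequest."""
    return {
        "city_uuid": city_uuid,
        "product_code": product_code,
        # Replaces the old recall_cust_count / delivery_count integers.
        "cust_code_list": list(cust_code_list),
        "cultivacation_id": cultivacation_id,
        "limit_cycle_name": limit_cycle_name,
    }


# Placeholder values, for illustration only.
payload = build_recommend_payload(
    "test_city", "350139", ["C001", "C002"], "CULT_001", "2026-W01"
)
print(json.dumps(payload, indent=2))
```

Because cust_code_list is declared as List[str] with no default, Pydantic treats it as required. A caller with no core retailers would therefore send an empty list rather than omit the field.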
- [ ] Step 3: Verify no syntax errors
```bash
cd D:/projiect/dingsheng/BrandCultivation && python -c "from api.request_body import RecommendRequest; print('OK')"
```
Expected: OK
- [ ] Step 4: Commit
```bash
git add api/request_body.py
git commit -m "refactor(api): replace recall_cust_count/delivery_count with cust_code_list in RecommendRequest"
```
api/recommend.py endpoint
Files:
Modify: api/recommend.py
- [ ] Step 1: Update the recommend() function signature and body
Replace the entire recommend() function (lines 15–54) with:
```python
@router.post("/recommend")
async def recommend(request: RecommendRequest, backgroundTasks: BackgroundTasks):
    """Recommendation endpoint."""
    logger.info(f"Recommend request: city={request.city_uuid}, product={request.product_code}, core_custs={len(request.cust_code_list)}")
    gbdtlr_model_path = os.path.join("./models/rank/weights", request.city_uuid, "gbdtlr_model.pkl")
    if not os.path.exists(gbdtlr_model_path):
        logger.warning(f"Model not found: {gbdtlr_model_path}")
        raise HTTPException(
            status_code=status.HTTP_404_NOT_FOUND,
            # "No model has been trained for this city; please train one first"
            detail="该城市的模型未训练,请先进行训练",
        )
    recommend_model = Recommend(request.city_uuid)
    products_in_order = dao.get_product_from_order(request.city_uuid)["product_code"].unique().tolist()
    if request.product_code in products_in_order:
        # Existing product: ItemCF recall + GBDT-LR ranking
        logger.info(f"Using GBDT-LR model for existing product {request.product_code}")
        recommend_list = recommend_model.get_recommend_list_by_gbdtlr(
            request.product_code, cust_code_list=request.cust_code_list
        )
    else:
        # New product with no order history: Item2Vec path
        logger.info(f"Using Item2Vec model for new product {request.product_code}")
        recommend_list = recommend_model.get_recommend_list_by_item2vec(
            request.product_code, cust_code_list=request.cust_code_list
        )
    request_data = []
    for index, data in enumerate(recommend_list):
        request_data.append(
            {
                "id": index + 1,
                "cust_code": data["cust_code"],
                "recommend_score": data["recommend_score"],
            }
        )
    logger.info(f"Recommend completed: {len(request_data)} customers recommended")
    backgroundTasks.add_task(generate_and_upload_report, request)
    return {"code": 200, "msg": "success", "data": {"recommendationInfo": request_data}}
```
Key changes:
- Remove recall_count and delivery_count from all calls
- Pass cust_code_list=request.cust_code_list to both model methods
- Remove the get_recommend_and_delivery() call
- Remove delivery_count from response items
- [ ] Step 2: Update generate_and_upload_report() background task
Replace lines 57–85 with:
```python
def generate_and_upload_report(request: RecommendRequest):
    """Generate the reports and upload them to the Alibaba Cloud file store."""
    logger.info(f"Background task started: generating report for {request.city_uuid}/{request.product_code}")
    try:
        report_util = ReportUtils(request.city_uuid, request.product_code)
        report_util.generate_all_data(request.cust_code_list)
        reports_dir = os.path.join("./data/reports", request.city_uuid, request.product_code)
        # Report file names (kept as-is): product info, product-customer feature
        # relation, similar products, customer sales recommendation
        report_files = ["卷烟信息表", "品规商户特征关系表", "相似卷烟表", "商户售卖推荐表"]
        file_id_map = FileStreamUtils.upload_files(reports_dir, report_files)
        if file_id_map is None:
            logger.error(f"Report upload failed for {request.city_uuid}/{request.product_code}")
            return
        data_dict = {
            "cultivacation_id": request.cultivacation_id,
            "city_uuid": request.city_uuid,
            "limit_cycle_name": request.limit_cycle_name,
            "product_code": request.product_code,
            "product_info_table": file_id_map.get("卷烟信息表"),
            "relation_table": file_id_map.get("品规商户特征关系表"),
            "similarity_product_table": file_id_map.get("相似卷烟表"),
            "recommend_table": file_id_map.get("商户售卖推荐表"),
        }
        dao.insert_report(data_dict)
        logger.info(f"Background task completed: report uploaded for {request.city_uuid}/{request.product_code}")
    except Exception as e:
        logger.error(f"Background task failed: {e}", exc_info=True)
```
Key change: generate_all_data(request.recall_cust_count, request.delivery_count) → generate_all_data(request.cust_code_list).
- [ ] Step 3: Verify imports still valid
```bash
cd D:/projiect/dingsheng/BrandCultivation && python -c "from api.recommend import router; print('OK')"
```
Expected: OK
- [ ] Step 4: Commit
```bash
git add api/recommend.py
git commit -m "refactor(api): remove delivery allocation, pass cust_code_list to recommend models"
```
models/recommend.py
Files:
Modify: models/recommend.py
The goal: replace get_recal_cust(product_id, recall_count) with a version that takes cust_code_list and returns the union of the ItemCF recall and cust_code_list, with no hotness fallback and no count cap.
- [ ] Step 1: Remove the _get_hot_recall() method
Delete the entire method (lines 35–39):
```python
def _get_hot_recall(self):
    """热度召回"""  # "hotness-based recall"
    key = f"hot:{self._city_uuid}:sale_qty"
    recall_list = self._redis.zrevrange(key, 0, -1, withscores=False)
    return recall_list
```
- [ ] Step 2: Update get_recal_cust() to accept cust_code_list
Replace the old get_recal_cust(self, product_id, recall_count) (lines 41–55) with:
```python
def get_recal_cust(self, product_id, cust_code_list):
    """Union the ItemCF recall with the core retailer list to get the candidate customers."""
    itemcf_recall_list = self._get_itemcf_recall(product_id)
    # Union: keep the ItemCF order, then append cust_code_list entries not already recalled.
    # dict.fromkeys drops duplicates inside cust_code_list while preserving order.
    seen = set(itemcf_recall_list)
    extra = [c for c in dict.fromkeys(cust_code_list) if c not in seen]
    result = list(itemcf_recall_list) + extra
    logger.info(f"Recall completed: {len(result)} customers (itemcf={len(itemcf_recall_list)}, core_extra={len(extra)}) for product {product_id}")
    return result
```
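The union logic can be exercised in isolation. The sketch below is a standalone stand-in rather than the method itself: `union_preserving_order` is a hypothetical name, and the bytes handling rests on an assumption the plan does not confirm, namely that the Redis client may return bytes when it was created without decode_responses=True.

```python
def union_preserving_order(itemcf_recall, cust_code_list):
    """Return the ItemCF recall followed by core customers not already recalled."""
    # Assumption: Redis members may arrive as bytes, so normalize everything
    # to str before comparing against the str codes in cust_code_list.
    def as_str(code):
        return code.decode("utf-8") if isinstance(code, bytes) else code

    result = []
    seen = set()
    for code in list(itemcf_recall) + list(cust_code_list):
        code = as_str(code)
        if code not in seen:
            seen.add(code)
            result.append(code)
    return result


# C001 appears in both inputs and C002 is duplicated in the core list.
merged = union_preserving_order([b"C003", "C001"], ["C001", "C002", "C002"])
print(merged)  # ['C003', 'C001', 'C002']
```

If the recall list really does contain bytes, comparing it directly with str codes would never match, and core customers already recalled by ItemCF would be appended a second time. Checking the type of one recalled element is a cheap way to rule this out.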
- [ ] Step 3: Update the get_recommend_list_by_gbdtlr() signature and recall call
Replace the method signature and the first recall call (lines 57–61):
Old:
```python
def get_recommend_list_by_gbdtlr(self, product_id, recall_count=500):
    ...
    recall_cust_list = self.get_recal_cust(product_id, recall_count)
```
New:
```python
def get_recommend_list_by_gbdtlr(self, product_id, cust_code_list=None):
    """Get the customer recommendation list via GBDT-LR."""
    if cust_code_list is None:
        cust_code_list = []
    logger.info(f"GBDT-LR recommend started for product {product_id}")
    recall_cust_list = self.get_recal_cust(product_id, cust_code_list)
```
No other changes to the GBDT-LR method body are needed — it already handles the full recall_cust_list without a count cap in its feature fetching and scoring logic.
- [ ] Step 4: Remove the get_recommend_and_delivery() method
Delete the entire method (lines 109–133):
```python
def get_recommend_and_delivery(self, recommend_list, delivery_count=5000):
    ...
```
- [ ] Step 5: Verify syntax
```bash
cd D:/projiect/dingsheng/BrandCultivation && python -c "from models.recommend import Recommend; print('OK')"
```
Expected: OK
- [ ] Step 6: Commit
```bash
git add models/recommend.py
git commit -m "refactor(models): remove hotness fallback, merge cust_code_list into ItemCF recall, remove delivery allocation"
```
get_recommend_list_by_item2vec() in models/recommend.py
Files:
Modify: models/recommend.py
Modify: models/item2vec/inference.py
The goal: customers in cust_code_list are added to the Item2Vec candidate pool and scored together with the rest via the same sales-volume normalization pipeline.
- [ ] Step 1: Update get_recommend_list_by_item2vec() in models/recommend.py
Replace the old method (lines 89–98):
Old:
```python
def get_recommend_list_by_item2vec(self, product_id, recall_count=500):
    logger.info(f"Item2Vec recommend started for product {product_id}")
    recommend_list = self._item2vec_model.get_recommend_cust_list(product_id, top=recall_count)
    recommend_list = recommend_list.drop(columns=["sale_qty"])
    recommend_list = recommend_list.to_dict(orient='records')
    recommend_list = recommend_list[:recall_count]
    logger.info(f"Item2Vec recommend completed: {len(recommend_list)} results")
    return recommend_list
```
New:
```python
def get_recommend_list_by_item2vec(self, product_id, cust_code_list=None):
    """Get the customer recommendation list via Item2Vec; core customers join the candidate pool and are scored together."""
    if cust_code_list is None:
        cust_code_list = []
    logger.info(f"Item2Vec recommend started for product {product_id}")
    recommend_list = self._item2vec_model.get_recommend_cust_list(product_id, cust_code_list=cust_code_list)
    recommend_list = recommend_list.drop(columns=["sale_qty"])
    recommend_list = recommend_list.to_dict(orient='records')
    logger.info(f"Item2Vec recommend completed: {len(recommend_list)} results")
    return recommend_list
```
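Key changes:
- Replace recall_count with cust_code_list
- Pass cust_code_list down to get_recommend_cust_list()
- Remove the [:recall_count] slice and return all scored customers
- top is no longer passed, so get_recommend_cust_list() falls back to its default per-product cap (top=100) instead of the former top=recall_count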
- [ ] Step 2: Update get_recommend_cust_list() in models/item2vec/inference.py
The method currently builds a candidate pool from similar-product order history and scores them. We need to:
- Add a cust_code_list parameter
- After building recommend_cust from order data, union in any cust_code_list members not yet present (with sale_qty=0 so they enter the normalization pipeline naturally)

Replace the get_recommend_cust_list() method (lines 38–70):
```python
def get_recommend_cust_list(self, product_code, top=100, cust_code_list=None):
    """Get the recommended customers; core customers join the candidate pool and are scored together."""
    if cust_code_list is None:
        cust_code_list = []
    logger.info(f"Getting recommend list for product {product_code}, top={top}")
    product_list = self.get_similarity_list(product_code)
    order_data = self._dao.get_order_by_product_ids(self._city_uuid, product_list)[OrderConfig.FEATURE_COLUMNS]
    order_data["sale_qty"] = order_data["sale_qty"].fillna(0)
    order_data = order_data.groupby(["cust_code", "product_code"], as_index=False)["sale_qty"].mean()
    # Group by product and keep the `top` best-selling customers for each product
    order_data = (
        order_data
        .sort_values(["product_code", "sale_qty", "cust_code"], ascending=[True, False, True])
        .groupby("product_code")
        .head(top)
    )
    recommend_cust = (
        order_data.groupby(["cust_code"], as_index=False)["sale_qty"].sum()
        .query("sale_qty > 0")
        .sort_values(["sale_qty", "cust_code"], ascending=[False, True])
    )
    # Add cust_code_list members missing from the pool with sale_qty=0 so they are
    # normalized with everyone else; dict.fromkeys drops duplicate codes in the input.
    existing_custs = set(recommend_cust["cust_code"].tolist())
    extra_rows = [{"cust_code": c, "sale_qty": 0} for c in dict.fromkeys(cust_code_list) if c not in existing_custs]
    if extra_rows:
        extra_df = pd.DataFrame(extra_rows)
        recommend_cust = pd.concat([recommend_cust, extra_df], ignore_index=True)
    # log1p + StandardScaler + sigmoid normalization, applied to the whole candidate pool
    log_qty = np.log1p(recommend_cust["sale_qty"].values).reshape(-1, 1)
    scaler = StandardScaler()
    normalized = scaler.fit_transform(log_qty)
    recommend_cust["recommend_score"] = ((1 / (1 + np.exp(-normalized))) * 100).flatten()
    # Return in descending score order
    recommend_cust = recommend_cust.sort_values(["recommend_score", "cust_code"], ascending=[False, True]).reset_index(drop=True)
    return recommend_cust
```
Key changes:
- Add a cust_code_list parameter (default None, treated as an empty list)
- After building recommend_cust from order history, concat any missing cust_code_list members with sale_qty=0
- Core customers with sale_qty=0 will naturally score low but are present
- Remove the old cap at the end (it was recommend_list[:recall_count] in the caller, which is also removed)
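To see how zero-sale core customers fare under this normalization, the scoring step can be approximated without numpy or scikit-learn. This is a rough pure-Python sketch, not the production code: `score_sale_qty` is a hypothetical name, and the sample quantities are invented.

```python
import math
import statistics


def score_sale_qty(sale_qty):
    """Map raw sale quantities to 0-100 scores: log1p -> z-score -> sigmoid."""
    logs = [math.log1p(q) for q in sale_qty]
    mean = statistics.fmean(logs)
    # StandardScaler uses the population standard deviation (ddof=0) and
    # leaves zero-variance columns unscaled; `or 1.0` mirrors that here.
    std = statistics.pstdev(logs) or 1.0
    return [100 / (1 + math.exp(-(x - mean) / std)) for x in logs]


# Two customers with order history plus one core customer added with sale_qty=0.
scores = score_sale_qty([120.0, 15.0, 0.0])
print([round(s, 1) for s in scores])
```

Because the mean and standard deviation are computed over the whole pool, the scores are relative. Adding core customers with sale_qty=0 can therefore shift the scores of customers that were already in the pool, and if every candidate has the same quantity, all of them land on 50.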
- [ ] Step 3: Verify syntax for both files
```bash
cd D:/projiect/dingsheng/BrandCultivation && python -c "from models.recommend import Recommend; from models.item2vec.inference import Item2VecModel; print('OK')"
```
Expected: OK
- [ ] Step 4: Commit
```bash
git add models/recommend.py models/item2vec/inference.py
git commit -m "refactor(models): merge cust_code_list into Item2Vec candidate pool for unified scoring"
```
Files:
Modify: utils/report_utils.py
Modify: utils/reports_process.py
- [ ] Step 1: Update generate_recommend_report() in utils/report_utils.py
The method currently calls calculate_delivery_by_recommend_data(..., delivery_count). Remove delivery_count, switch the call to build_recommend_report(), and take cust_code_list in place of recall_count so the signature matches what generate_all_data() passes in.
Replace generate_recommend_report() (lines 91–100):
```python
def generate_recommend_report(self, cust_code_list):
    """Generate the recommendation report."""
    logger.info("Generating recommend report")
    recommend_data = self._get_recommend_data(cust_code_list)
    recommend_list = list(map(lambda x: x["cust_code"], recommend_data))
    recommend_cust_infos = self._dao.get_cust_by_ids(self._city_uuid, recommend_list)
    report = build_recommend_report(recommend_data, recommend_cust_infos)
    report.to_excel(os.path.join(self._save_dir, "商户售卖推荐表.xlsx"), index=False)
    logger.info("Recommend report saved")
```
Note: rename the helper function call from calculate_delivery_by_recommend_data to build_recommend_report (defined in Step 5).
- [ ] Step 2: Update generate_all_data() in utils/report_utils.py
Replace generate_all_data() (lines 160–167):
```python
def generate_all_data(self, cust_code_list):
    logger.info("Generating all reports")
    self.generate_feats_ralation_report(cust_code_list)
    self.generate_product_report()
    self.generate_recommend_report(cust_code_list)
    self.generate_similarity_product_report()
    logger.info("All reports generated")
```
- [ ] Step 3: Update _get_recommend_data() in utils/report_utils.py
The method currently accepts recall_count and passes it to the model methods. Replace it with cust_code_list:
Replace _get_recommend_data() (lines 26–39):
```python
def _get_recommend_data(self, cust_code_list):
    """Get the recommended customer list."""
    products_in_order = self._dao.get_product_from_order(self._city_uuid)["product_code"].unique().tolist()
    if self._product_id in products_in_order:
        recommend_data = self._recommend_model.get_recommend_list_by_gbdtlr(
            self._product_id, cust_code_list=cust_code_list
        )
    else:
        recommend_data = self._recommend_model.get_recommend_list_by_item2vec(
            self._product_id, cust_code_list=cust_code_list
        )
    return recommend_data
```
- [ ] Step 4: Update the generate_feats_ralation_report() signature in utils/report_utils.py
It calls _generate_feats_map(recall_count) internally. Update both:
Replace generate_feats_ralation_report() (lines 69–79):
```python
def generate_feats_ralation_report(self, cust_code_list):
    """Generate the feature relation analysis report."""
    logger.info("Generating feature relation report")
    feats_map = self._generate_feats_map(cust_code_list)
    product_content = self._get_product_content()
    shap_result = self._recommend_model._gbdtlr_model.generate_shap_interance(feats_map)
    report = feats_relation_process(shap_result, product_content)
    report.to_excel(os.path.join(self._save_dir, "品规商户特征关系表.xlsx"), index=False)
    logger.info("Feature relation report saved")
```
Replace _generate_feats_map() (lines 41–61):
```python
def _generate_feats_map(self, cust_code_list):
    """Build the product-customer features map from the recalled recommendation list."""
    recommend_data = self._get_recommend_data(cust_code_list)
    recommend_list = list(map(lambda x: x["cust_code"], recommend_data))
    product_data = self._product_data.copy()
    cust_data = self._dao.get_cust_by_ids(self._city_uuid, recommend_list)[CustConfig.FEATURE_COLUMNS]
    product_data = sample_data_clear(product_data, ProductConfig)
    cust_data = sample_data_clear(cust_data, CustConfig)
    feats_map = generate_feats_map(product_data, cust_data)
    return feats_map
```
- [ ] Step 5: Replace calculate_delivery_by_recommend_data() → build_recommend_report() in utils/reports_process.py
Replace calculate_delivery_by_recommend_data() (lines 33–73) with a new function that drops all delivery allocation logic:
```python
def build_recommend_report(recommend_data, recommend_cust_infos):
    """Build the recommended-customer report from the recommendation data (no delivery quantity)."""
    recommend_data = pd.DataFrame(recommend_data)
    cust_ids = recommend_cust_infos.set_index("cust_code")
    # Inner join: customers without a row in recommend_cust_infos are dropped
    recommend_data = recommend_data.join(cust_ids, on="cust_code", how="inner")
    recommend_data = recommend_data[["cust_code", "cust_name", "recommend_score"]]
    # Renumber from 1 and expose the index as the rank column
    recommend_data = recommend_data.reset_index(drop=True)
    recommend_data.index = recommend_data.index + 1
    recommend_data = recommend_data.reset_index()
    recommend_data = recommend_data.rename(
        columns={
            "index": "推荐序号",  # recommendation rank
            "cust_code": "商户编号",  # customer code
            "cust_name": "商户名称",  # customer name
            "recommend_score": "推荐系数",  # recommendation score
        }
    )
    return recommend_data
```
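One consequence of how="inner" in the join is that a recommended customer with no row in recommend_cust_infos disappears from the report, and the ranks are numbered after that filtering. The sketch below mimics this with plain dictionaries. The sample data is invented, and `inner_join_report` is a hypothetical stand-in for the pandas code, not a replacement for it.

```python
def inner_join_report(recommend_data, cust_infos):
    """Hypothetical stand-in for the inner join: keep only customers with a known name."""
    names = {row["cust_code"]: row["cust_name"] for row in cust_infos}
    report = []
    for rec in recommend_data:
        if rec["cust_code"] not in names:
            continue  # inner join: unmatched customers are dropped
        report.append({
            "rank": len(report) + 1,  # numbered after filtering, like the 1-based index
            "cust_code": rec["cust_code"],
            "cust_name": names[rec["cust_code"]],
            "recommend_score": rec["recommend_score"],
        })
    return report


recommend_data = [
    {"cust_code": "C001", "recommend_score": 91.2},
    {"cust_code": "C999", "recommend_score": 74.0},  # not in the customer table
    {"cust_code": "C002", "recommend_score": 12.5},
]
cust_infos = [
    {"cust_code": "C001", "cust_name": "Shop A"},
    {"cust_code": "C002", "cust_name": "Shop B"},
]
report = inner_join_report(recommend_data, cust_infos)
for row in report:
    print(row)
```

This matters for cust_code_list in particular: a core retailer code that the customer table does not know would be scored by the API but missing from the Excel report. Whether such codes should instead be kept with an empty name is a product decision the plan does not state.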
- [ ] Step 6: Update the import in utils/report_utils.py
The import on line 10 currently imports calculate_delivery_by_recommend_data. Update it:
```python
from utils.reports_process import feats_relation_process, build_recommend_report, eval_report_process_pre, eval_report_process
```
- [ ] Step 7: Verify generate_eval_data() still runs (it references the "建议投放量(条)" column)
In utils/report_utils.py line 153:
```python
recommend_data = recommend_data.drop(columns=["建议投放量(条)"])
```
This column (the suggested delivery quantity) no longer exists in the new report, so the drop would raise a KeyError at runtime. Remove that line:
```python
def generate_eval_data(self, start_time, end_time, recommend_data):
    """Generate the validation report from the recommendation list."""
    logger.info("Generating eval report")
    if self._product_id == '350139':
        eval_product_id = "350355"
    else:
        eval_product_id = self._product_id
    delivery_data = self._dao.get_delivery_data_by_product(self._city_uuid, eval_product_id, start_time, end_time)
    delivery_data = delivery_data[DeliveryConfig.FEATURE_COLUMNS]
    delivery_data = sample_data_clear(delivery_data, DeliveryConfig)
    report = eval_report_process(delivery_data, recommend_data)
    report.to_excel(os.path.join(self._save_dir, "投放验证报告.xlsx"), index=False)
    logger.info("Eval report saved")
```
- [ ] Step 8: Verify imports and syntax
```bash
cd D:/projiect/dingsheng/BrandCultivation && python -c "from utils.report_utils import ReportUtils; from utils.reports_process import build_recommend_report; print('OK')"
```
Expected: OK
- [ ] Step 9: Commit
```bash
git add utils/report_utils.py utils/reports_process.py
git commit -m "refactor(utils): remove delivery_count from recommend report, pass cust_code_list through report pipeline"
```
Files:
Read: all modified files (no new changes)
- [ ] Step 1: Import all changed modules
```bash
cd D:/projiect/dingsheng/BrandCultivation && python -c "
from api.request_body import RecommendRequest
from api.recommend import router
from models.recommend import Recommend
from models.item2vec.inference import Item2VecModel
from utils.report_utils import ReportUtils
from utils.reports_process import build_recommend_report
print('All imports OK')
"
```
Expected: All imports OK
- [ ] Step 2: Validate RecommendRequest schema with sample data
```bash
cd D:/projiect/dingsheng/BrandCultivation && python -c "
from api.request_body import RecommendRequest
r = RecommendRequest(
    city_uuid='test_city',
    product_code='350139',
    cust_code_list=['C001', 'C002'],
    cultivacation_id='CULT_001',
    limit_cycle_name='2026-W01'
)
print('cust_code_list:', r.cust_code_list)
assert r.cust_code_list == ['C001', 'C002']
print('Schema validation OK')
"
```
Expected: Schema validation OK
- [ ] Step 3: Confirm old fields are gone
```bash
cd D:/projiect/dingsheng/BrandCultivation && python -c "
from api.request_body import RecommendRequest
# model_fields is the Pydantic v2 API; Pydantic v1 exposes __fields__ instead
fields = RecommendRequest.model_fields
assert 'recall_cust_count' not in fields, 'recall_cust_count still present'
assert 'delivery_count' not in fields, 'delivery_count still present'
assert 'cust_code_list' in fields, 'cust_code_list missing'
print('Field removal verified OK')
"
```
Expected: Field removal verified OK
- [ ] Step 4: Confirm get_recommend_and_delivery is gone from Recommend
```bash
cd D:/projiect/dingsheng/BrandCultivation && python -c "
from models.recommend import Recommend
assert not hasattr(Recommend, 'get_recommend_and_delivery'), 'method still exists'
assert not hasattr(Recommend, '_get_hot_recall'), 'hot recall still exists'
print('Method removal verified OK')
"
```
Expected: Method removal verified OK
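The hasattr checks confirm that methods were deleted, but not that the surviving methods took on the new parameter. One way to cover that, sketched here with the standard library only, is to inspect the signatures. `assert_param_swap` is a hypothetical helper, and the function below it is a stand-in so the snippet runs outside the repository.

```python
import inspect


def assert_param_swap(func, added, removed):
    """Check that `added` is a parameter of func and that none of `removed` are."""
    params = inspect.signature(func).parameters
    assert added in params, f"{func.__name__} is missing {added}"
    for name in removed:
        assert name not in params, f"{func.__name__} still accepts {name}"


# Stand-in with the post-refactor signature; in the repo this would be
# Recommend.get_recommend_list_by_gbdtlr and its Item2Vec counterpart.
def get_recommend_list_by_gbdtlr(self, product_id, cust_code_list=None):
    return []


assert_param_swap(get_recommend_list_by_gbdtlr, "cust_code_list", ["recall_count"])
print("Signature check OK")
```

In the project itself, the same helper could be pointed at Recommend.get_recommend_list_by_gbdtlr, Recommend.get_recommend_list_by_item2vec and Recommend.get_recal_cust after importing the class.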
- [ ] Step 5: Final commit
```bash
git add -p  # verify nothing unintended is staged
git commit -m "test: smoke-test all changed modules for recommend API refactor" --allow-empty
```
(Use --allow-empty only if there are no file changes at this step — this is a verification-only task.)