| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354 |
- from database.dao.mysql_dao import MySqlDao
- from database.db.redis_db import RedisDatabaseHelper
- import os
- from models.item2vec.inference import Item2VecModel
- from models.rank.data.config import CustConfig, ProductConfig
- from models.rank.data.utils import sample_data_clear
- from models.rank import GbdtLrModel, generate_feats_map
- import pandas as pd
- from core import get_logger
- logger = get_logger("models.recommend")
- CORE_RERANK_CONFIG = {
- "existing": {
- "core_model_weight": 0.75,
- "core_quality_weight": 0.15,
- "core_boost": 35,
- "low_model_threshold": 35,
- "low_model_weight": 0.85,
- "low_quality_weight": 0.10,
- "low_core_boost": 65,
- "normal_model_weight": 0.90,
- },
- "new": {
- "core_model_weight": 0.55,
- "core_quality_weight": 0.25,
- "core_boost": 50,
- "normal_model_weight": 0.90,
- },
- }
- class Recommend:
- def __init__(self, city_uuid):
- self._redis = RedisDatabaseHelper().redis
- self._dao = MySqlDao()
-
- self._load_molde(city_uuid)
-
- def _load_molde(self, city_uuid):
- """加载推演模型"""
- self._city_uuid = city_uuid
- gbdtlr_model_path = os.path.join("./models/rank/weights", city_uuid, "gbdtlr_model.pkl")
- self._gbdtlr_model = GbdtLrModel(gbdtlr_model_path)
- self._item2vec_model = Item2VecModel(city_uuid)
- logger.info(f"Models loaded for city_uuid={city_uuid}")
-
- def _get_itemcf_recall(self, product_id):
- """协同召回"""
- key = f"fc:{self._city_uuid}:{product_id}"
- recall_list = self._redis.zrevrange(key, 0, -1, withscores=False)
- return recall_list
-
- def get_recal_cust(self, product_id, cust_code_list):
- """通过协同过滤召回与核心零售户列表取并集,得到待推荐商户列表"""
- itemcf_recall_list = self._get_itemcf_recall(product_id)
- seen = set(itemcf_recall_list)
- extra = [c for c in cust_code_list if c not in seen]
- result = list(itemcf_recall_list) + extra
- logger.info(f"Recall completed: {len(result)} customers (itemcf={len(itemcf_recall_list)}, core_extra={len(extra)}) for product {product_id}")
- return result
- def get_recommend_list_by_gbdtlr(self, product_id, cust_code_list=None):
- """根据gbdt_lr获取商户推荐列表"""
- logger.info(f"GBDT-LR recommend started for product {product_id}")
- # 获取召回的商户列表
- if cust_code_list is None:
- cust_code_list = []
- recall_cust_list = self.get_recal_cust(product_id, cust_code_list)
- # 获取卷烟数据
- product_data = self._dao.get_product_by_id(self._city_uuid, product_id)[ProductConfig.FEATURE_COLUMNS]
- product_data = sample_data_clear(product_data, ProductConfig)
-
- # 获取整合商户数据
- cust_data = self._dao.get_cust_by_ids(self._city_uuid, recall_cust_list)[CustConfig.FEATURE_COLUMNS]
- # shop_data = self._dao.get_shop_by_ids(self._city_uuid, recall_cust_list)[ShopConfig.FEATURE_COLUMNS]
- cust_data = sample_data_clear(cust_data, CustConfig)
- # shop_data = sample_data_clear(shop_data, ShopConfig)
- # cust_feats = shop_data.set_index("cust_code")
- # cust_data = cust_data.join(cust_feats, on="BB_RETAIL_CUSTOMER_CODE", how="inner")
- # 按 recall_cust_list 顺序对齐 cust_data,确保 feats_map 行顺序与 recall_list 一致
- # 否则 get_recommend_list 中 zip(recall_list, scores) 会错配商户ID和分数
- cust_codes_in_data = set(cust_data["cust_code"].tolist())
- ordered_recall_list = [c for c in recall_cust_list if c in cust_codes_in_data]
- cust_order = {code: i for i, code in enumerate(ordered_recall_list)}
- cust_data = cust_data.sort_values("cust_code", key=lambda x: x.map(cust_order)).reset_index(drop=True)
- # 获取推理用的feats_map
- feats_map = generate_feats_map(product_data, cust_data)
- recommend_list = self._gbdtlr_model.get_recommend_list(feats_map, ordered_recall_list)
- recommend_list = self._rerank_existing_product(recommend_list, cust_code_list)
- # recommend_list = self.filter_recommend_list(recommend_list)
- logger.info(f"GBDT-LR recommend completed: {len(recommend_list)} results")
- return recommend_list
-
- def get_recommend_list_by_item2vec(self, product_id, cust_code_list=None):
- """根据item2vec获取商户推荐列表,核心商户并入候选集统一评分"""
- if cust_code_list is None:
- cust_code_list = []
- logger.info(f"Item2Vec recommend started for product {product_id}")
- recommend_list = self._item2vec_model.get_recommend_cust_list(product_id, cust_code_list=cust_code_list)
- recommend_list = recommend_list.drop(columns=["sale_qty"])
- recommend_list = recommend_list.to_dict(orient='records')
- recommend_list = self._rerank_new_product(recommend_list, cust_code_list)
- # recommend_list = self.filter_recommend_list(recommend_list)
- logger.info(f"Item2Vec recommend completed: {len(recommend_list)} results")
- return recommend_list
- def _rerank_existing_product(self, recommend_list, core_cust_list):
- """Rerank existing-product results with core-customer boosts, then sort by score."""
- core_set = {str(cust_code) for cust_code in (core_cust_list or [])}
- if not core_set or not recommend_list:
- return recommend_list
- quality_score_map = self._build_quality_score_map(core_set)
- cfg = CORE_RERANK_CONFIG["existing"]
- for item in recommend_list:
- cust_code = str(item["cust_code"])
- model_score = float(item.get("recommend_score", 0) or 0)
- is_core = cust_code in core_set
- quality_score = quality_score_map.get(cust_code, 60.0)
- if is_core:
- if model_score >= cfg["low_model_threshold"]:
- final_score = (
- model_score * cfg["core_model_weight"]
- + quality_score * cfg["core_quality_weight"]
- + cfg["core_boost"]
- )
- else:
- final_score = (
- model_score * cfg["low_model_weight"]
- + quality_score * cfg["low_quality_weight"]
- + cfg["low_core_boost"]
- )
- else:
- final_score = model_score * cfg["normal_model_weight"]
- item["model_score"] = model_score
- item["is_core_cust"] = is_core
- item["core_quality_score"] = quality_score if is_core else None
- item["recommend_score"] = min(float(final_score), 100.0)
- recommend_list.sort(key=lambda x: x["recommend_score"], reverse=True)
- logger.info(f"Core boost rerank completed for existing product: core_count={len(core_set)}")
- return recommend_list
- def _rerank_new_product(self, recommend_list, core_cust_list):
- """Rerank Item2Vec cold-start results with core-customer boosts, then sort by score."""
- core_set = {str(cust_code) for cust_code in (core_cust_list or [])}
- if not core_set or not recommend_list:
- return recommend_list
- quality_score_map = self._build_quality_score_map(core_set)
- cfg = CORE_RERANK_CONFIG["new"]
- for item in recommend_list:
- cust_code = str(item["cust_code"])
- model_score = float(item.get("recommend_score", 0) or 0)
- is_core = cust_code in core_set
- quality_score = quality_score_map.get(cust_code, 60.0)
- if is_core:
- final_score = (
- model_score * cfg["core_model_weight"]
- + quality_score * cfg["core_quality_weight"]
- + cfg["core_boost"]
- )
- else:
- final_score = model_score * cfg["normal_model_weight"]
- item["item2vec_score"] = model_score
- item["is_core_cust"] = is_core
- item["core_quality_score"] = quality_score if is_core else None
- item["recommend_score"] = min(float(final_score), 100.0)
- recommend_list.sort(key=lambda x: x["recommend_score"], reverse=True)
- logger.info(f"Core boost rerank completed for new product: core_count={len(core_set)}")
- return recommend_list
- def _build_quality_score_map(self, cust_list):
- """Build a 0-100 business-quality score for candidate customers."""
- if not cust_list:
- return {}
- unique_cust_list = list(dict.fromkeys(str(cust_code) for cust_code in cust_list))
- cust_data = self._dao.get_cust_by_ids(self._city_uuid, unique_cust_list)
- if cust_data.empty:
- return {cust_code: 60.0 for cust_code in unique_cust_list}
- score_map = {}
- for _, row in cust_data.iterrows():
- cust_code = row.get("cust_code")
- if pd.isna(cust_code):
- continue
- score_map[str(cust_code)] = self._calculate_core_quality_score(row)
- for cust_code in unique_cust_list:
- score_map.setdefault(cust_code, 60.0)
- return score_map
- def _calculate_core_quality_score(self, row):
- """Calculate a 0-100 quality score using only fields defined in CustConfig."""
- field_scores = [
- ("terminal_star_name", {
- "五星终端": 100,
- "四星终端": 90,
- "三星终端": 80,
- "二星终端": 70,
- "一星终端": 60,
- "其他": 50,
- "无": 40,
- }, 0.18),
- ("cooperate_codename", {
- "好": 90,
- "较好": 75,
- "一般": 60,
- }, 0.14),
- ("store_appearance_name", {
- "好": 90,
- "较好": 75,
- "一般": 60,
- "差": 40,
- }, 0.12),
- ("is_modern_terminalname", {
- "是": 85,
- "否": 55,
- }, 0.10),
- ("modern_terminal_name", {
- "直营终端": 95,
- "合作终端": 90,
- "加盟终端": 85,
- "一般现代终端": 75,
- "普通终端": 60,
- "无法识别": 50,
- }, 0.08),
- ("cooperate_type_name", {
- "品牌加盟": 90,
- "冠名加盟": 85,
- "无": 55,
- }, 0.08),
- ("creditclass_name", {
- "AAA": 95,
- "AA": 90,
- "A": 85,
- "C": 60,
- "D": 45,
- }, 0.10),
- ("counter_status_name", {
- "有": 80,
- "计划中": 65,
- "无": 50,
- }, 0.05),
- ("counter_put_type_name", {
- "独立陈列": 85,
- "混杂陈列": 70,
- "无陈列": 50,
- }, 0.05),
- ("back_counter_status_name", {
- "有": 80,
- "计划中": 65,
- "无": 50,
- }, 0.04),
- ("back_counter_put_type_name", {
- "独立陈列": 85,
- "混杂陈列": 70,
- "无陈列": 50,
- }, 0.03),
- ("back_counter_has_show_name", {
- "有": 80,
- "无": 50,
- }, 0.03),
- ]
- weighted_score = 0.0
- total_weight = 0.0
- for field, score_map, weight in field_scores:
- score = self._score_by_config_value(row, field, score_map)
- if score is None:
- continue
- weighted_score += score * weight
- total_weight += weight
- for field, weight in [("counter_number", 0.05), ("back_counter_number", 0.05)]:
- score = self._counter_score(self._get_row_value(row, field))
- if score is None:
- continue
- weighted_score += score * weight
- total_weight += weight
- if total_weight == 0:
- return 60.0
- return round(weighted_score / total_weight, 4)
- def _score_by_config_value(self, row, field, score_map):
- if field not in CustConfig.FEATURE_COLUMNS:
- return None
- value = self._get_row_value(row, field)
- if pd.isna(value):
- return None
- text = str(value)
- if text not in CustConfig.ONEHOT_CAT.get(field, []):
- return None
- return float(score_map.get(text, 60.0))
- @staticmethod
- def _get_row_value(row, field):
- if field not in row.index:
- return None
- return row.get(field)
- @staticmethod
- def _counter_score(value):
- if pd.isna(value):
- return None
- try:
- number = float(value)
- except (TypeError, ValueError):
- return None
- if number <= 0:
- return 50.0
- if number >= 4:
- return 90.0
- return 50.0 + number * 10.0
-
- def filter_recommend_list(self, recommend_list):
- """过滤掉已经歇业的商铺"""
- cust_set = set(self._dao.get_cust_list(self._city_uuid))
- filter_recommend_list = [
- item for item in recommend_list
- if item["cust_code"] in cust_set
- ]
- return filter_recommend_list
- if __name__ == "__main__":
- city_uuid = "00000000000000000000000011445301"
- product_id = '350139'
- recommend = Recommend(city_uuid)
- recommend_list = recommend.get_recommend_list_by_gbdtlr(product_id)
- # for i in recommend_list:
- # print(i)
|