"""Recommend retail customers for a product.

Pipeline: recall candidates (item-CF from Redis plus caller-supplied core
customers), score them with GBDT-LR (existing products) or Item2Vec (new
products), then rerank with a core-customer boost.
"""
import os

import pandas as pd

from database.dao.mysql_dao import MySqlDao
from database.db.redis_db import RedisDatabaseHelper
from models.item2vec.inference import Item2VecModel
from models.rank.data.config import CustConfig, ProductConfig
from models.rank.data.utils import sample_data_clear
from models.rank import GbdtLrModel, generate_feats_map
from core import get_logger

logger = get_logger("models.recommend")

# Blend weights used by the core-customer rerank.
# "existing" is applied to GBDT-LR results, "new" to Item2Vec (cold-start) results.
# NOTE(review): under "existing", a core customer just below low_model_threshold
# receives low_core_boost (65) while one just above receives core_boost (35).
# Example with quality 60: model 34 -> 99.9, model 35 -> 70.25. The final score is
# therefore not monotonic in the model score for core customers -- confirm this
# is intended before relying on the ordering.
CORE_RERANK_CONFIG = {
    "existing": {
        "core_model_weight": 0.75,
        "core_quality_weight": 0.15,
        "core_boost": 35,
        "low_model_threshold": 35,
        "low_model_weight": 0.85,
        "low_quality_weight": 0.10,
        "low_core_boost": 65,
        "normal_model_weight": 0.90,
    },
    "new": {
        "core_model_weight": 0.55,
        "core_quality_weight": 0.25,
        "core_boost": 50,
        "normal_model_weight": 0.90,
    },
}


class Recommend:
    """Produce ranked customer recommendation lists for products in one city."""

    # Neutral quality score used when a customer has no usable quality data.
    _DEFAULT_QUALITY_SCORE = 60.0

    # (field, value -> score, weight) for categorical customer attributes.
    # Order matters only for floating-point summation and is kept stable.
    _QUALITY_FIELD_SCORES = (
        ("terminal_star_name", {
            "五星终端": 100, "四星终端": 90, "三星终端": 80, "二星终端": 70,
            "一星终端": 60, "其他": 50, "无": 40,
        }, 0.18),
        ("cooperate_codename", {
            "好": 90, "较好": 75, "一般": 60,
        }, 0.14),
        ("store_appearance_name", {
            "好": 90, "较好": 75, "一般": 60, "差": 40,
        }, 0.12),
        ("is_modern_terminalname", {
            "是": 85, "否": 55,
        }, 0.10),
        ("modern_terminal_name", {
            "直营终端": 95, "合作终端": 90, "加盟终端": 85, "一般现代终端": 75,
            "普通终端": 60, "无法识别": 50,
        }, 0.08),
        ("cooperate_type_name", {
            "品牌加盟": 90, "冠名加盟": 85, "无": 55,
        }, 0.08),
        ("creditclass_name", {
            "AAA": 95, "AA": 90, "A": 85, "C": 60, "D": 45,
        }, 0.10),
        ("counter_status_name", {
            "有": 80, "计划中": 65, "无": 50,
        }, 0.05),
        ("counter_put_type_name", {
            "独立陈列": 85, "混杂陈列": 70, "无陈列": 50,
        }, 0.05),
        ("back_counter_status_name", {
            "有": 80, "计划中": 65, "无": 50,
        }, 0.04),
        ("back_counter_put_type_name", {
            "独立陈列": 85, "混杂陈列": 70, "无陈列": 50,
        }, 0.03),
        ("back_counter_has_show_name", {
            "有": 80, "无": 50,
        }, 0.03),
    )

    # (field, weight) for numeric counter attributes scored by _counter_score.
    _QUALITY_COUNTER_FIELDS = (
        ("counter_number", 0.05),
        ("back_counter_number", 0.05),
    )

    def __init__(self, city_uuid):
        self._redis = RedisDatabaseHelper().redis
        self._dao = MySqlDao()
        self._load_molde(city_uuid)

    def _load_molde(self, city_uuid):
        """Load the inference models (GBDT-LR and Item2Vec) for ``city_uuid``."""
        self._city_uuid = city_uuid
        gbdtlr_model_path = os.path.join("./models/rank/weights", city_uuid, "gbdtlr_model.pkl")
        self._gbdtlr_model = GbdtLrModel(gbdtlr_model_path)
        self._item2vec_model = Item2VecModel(city_uuid)
        logger.info(f"Models loaded for city_uuid={city_uuid}")

    def _get_itemcf_recall(self, product_id):
        """Item-CF recall: members of the Redis sorted set ``fc:{city}:{product}``.

        Members are returned highest score first (``zrevrange``).
        """
        # NOTE(review): redis-py returns bytes members unless the client was built
        # with decode_responses=True; bytes would not match str cust codes in
        # get_recal_cust -- confirm how RedisDatabaseHelper configures the client.
        key = f"fc:{self._city_uuid}:{product_id}"
        recall_list = self._redis.zrevrange(key, 0, -1, withscores=False)
        return recall_list

    def get_recal_cust(self, product_id, cust_code_list):
        """Union of item-CF recall and the core customer list, without duplicates.

        Item-CF results come first in their ranked order, followed by core
        customers not already recalled, in the order given.
        """
        itemcf_recall_list = self._get_itemcf_recall(product_id)
        seen = set(itemcf_recall_list)
        extra = []
        for cust_code in cust_code_list or []:
            # Track additions too, so duplicates in cust_code_list are not
            # appended twice. Downstream, one DB row per customer must line up
            # with one recall entry.
            if cust_code not in seen:
                seen.add(cust_code)
                extra.append(cust_code)
        result = list(itemcf_recall_list) + extra
        logger.info(f"Recall completed: {len(result)} customers (itemcf={len(itemcf_recall_list)}, core_extra={len(extra)}) for product {product_id}")
        return result

    def get_recommend_list_by_gbdtlr(self, product_id, cust_code_list=None):
        """Rank recalled customers for an existing product with GBDT-LR.

        Args:
            product_id: product to recommend.
            cust_code_list: optional core customers; they are added to the
                recall set and boosted during the rerank.

        Returns:
            A list of recommendation dicts (as produced by the GBDT-LR model and
            the core rerank), or ``[]`` when no candidate customer has data.
        """
        logger.info(f"GBDT-LR recommend started for product {product_id}")
        if cust_code_list is None:
            cust_code_list = []
        recall_cust_list = self.get_recal_cust(product_id, cust_code_list)
        if not recall_cust_list:
            logger.info(f"GBDT-LR recommend skipped: no recall candidates for product {product_id}")
            return []

        # Product features
        product_data = self._dao.get_product_by_id(self._city_uuid, product_id)[ProductConfig.FEATURE_COLUMNS]
        product_data = sample_data_clear(product_data, ProductConfig)

        # Customer features
        cust_data = self._dao.get_cust_by_ids(self._city_uuid, recall_cust_list)[CustConfig.FEATURE_COLUMNS]
        # shop_data = self._dao.get_shop_by_ids(self._city_uuid, recall_cust_list)[ShopConfig.FEATURE_COLUMNS]
        cust_data = sample_data_clear(cust_data, CustConfig)
        # shop_data = sample_data_clear(shop_data, ShopConfig)
        # cust_feats = shop_data.set_index("cust_code")
        # cust_data = cust_data.join(cust_feats, on="BB_RETAIL_CUSTOMER_CODE", how="inner")

        cust_data, ordered_recall_list = self._align_cust_data(cust_data, recall_cust_list)
        if not ordered_recall_list:
            logger.info(f"GBDT-LR recommend skipped: no customer data for product {product_id}")
            return []

        feats_map = generate_feats_map(product_data, cust_data)
        recommend_list = self._gbdtlr_model.get_recommend_list(feats_map, ordered_recall_list)
        recommend_list = self._rerank_existing_product(recommend_list, cust_code_list)
        # recommend_list = self.filter_recommend_list(recommend_list)
        logger.info(f"GBDT-LR recommend completed: {len(recommend_list)} results")
        return recommend_list

    @staticmethod
    def _align_cust_data(cust_data, recall_cust_list):
        """Order ``cust_data`` rows to match ``recall_cust_list``, one row per code.

        The GBDT-LR model pairs ids and scores positionally
        (``zip(recall_list, scores)``). Row i of the returned frame must therefore
        describe entry i of the returned id list. Duplicate rows and rows whose
        code is not in the recall list would break that pairing, so they are
        dropped.

        Returns:
            (aligned_cust_data, ordered_recall_list)
        """
        cust_data = cust_data.drop_duplicates(subset="cust_code", keep="first")
        cust_codes_in_data = set(cust_data["cust_code"].tolist())
        ordered_recall_list = list(dict.fromkeys(
            code for code in recall_cust_list if code in cust_codes_in_data
        ))
        cust_order = {code: i for i, code in enumerate(ordered_recall_list)}
        cust_data = (
            cust_data[cust_data["cust_code"].isin(list(cust_order))]
            .sort_values("cust_code", key=lambda codes: codes.map(cust_order))
            .reset_index(drop=True)
        )
        return cust_data, ordered_recall_list

    def get_recommend_list_by_item2vec(self, product_id, cust_code_list=None):
        """Rank customers for a new product with Item2Vec.

        Core customers are passed into the candidate set and scored together with
        the other candidates, then boosted during the rerank.
        """
        if cust_code_list is None:
            cust_code_list = []
        logger.info(f"Item2Vec recommend started for product {product_id}")
        recommend_list = self._item2vec_model.get_recommend_cust_list(product_id, cust_code_list=cust_code_list)
        # errors="ignore": do not fail if the model output lacks sale_qty.
        recommend_list = recommend_list.drop(columns=["sale_qty"], errors="ignore")
        recommend_list = recommend_list.to_dict(orient='records')
        recommend_list = self._rerank_new_product(recommend_list, cust_code_list)
        # recommend_list = self.filter_recommend_list(recommend_list)
        logger.info(f"Item2Vec recommend completed: {len(recommend_list)} results")
        return recommend_list

    def _rerank_existing_product(self, recommend_list, core_cust_list):
        """Rerank GBDT-LR results with core-customer boosts, then sort by score.

        Each item keeps its raw score in ``model_score``.
        """
        return self._apply_core_rerank(
            recommend_list, core_cust_list, self._existing_final_score, "model_score", "existing"
        )

    def _rerank_new_product(self, recommend_list, core_cust_list):
        """Rerank Item2Vec cold-start results with core-customer boosts, then sort by score.

        Each item keeps its raw score in ``item2vec_score``.
        """
        return self._apply_core_rerank(
            recommend_list, core_cust_list, self._new_final_score, "item2vec_score", "new"
        )

    def _apply_core_rerank(self, recommend_list, core_cust_list, final_score_fn, raw_score_key, label):
        """Shared core-customer rerank used for both existing and new products.

        The list is mutated in place and returned. If there are no core customers
        or no items, the list is returned unchanged and unsorted. Otherwise each
        item gets ``raw_score_key``, ``is_core_cust``, ``core_quality_score`` and
        a new ``recommend_score`` capped at 100, and the list is sorted by it,
        highest first.
        """
        core_set = {str(cust_code) for cust_code in (core_cust_list or [])}
        if not core_set or not recommend_list:
            return recommend_list

        quality_score_map = self._build_quality_score_map(core_set)
        for item in recommend_list:
            cust_code = str(item["cust_code"])
            model_score = self._read_model_score(item)
            is_core = cust_code in core_set
            quality_score = quality_score_map.get(cust_code, self._DEFAULT_QUALITY_SCORE)
            final_score = final_score_fn(model_score, quality_score, is_core)

            item[raw_score_key] = model_score
            item["is_core_cust"] = is_core
            item["core_quality_score"] = quality_score if is_core else None
            item["recommend_score"] = min(float(final_score), 100.0)

        recommend_list.sort(key=lambda x: x["recommend_score"], reverse=True)
        logger.info(f"Core boost rerank completed for {label} product: core_count={len(core_set)}")
        return recommend_list

    @staticmethod
    def _read_model_score(item):
        """Read ``recommend_score`` as a float; missing, falsy or NaN values become 0.0.

        NaN must not propagate: it would make the descending sort meaningless.
        """
        score = float(item.get("recommend_score", 0) or 0)
        return 0.0 if pd.isna(score) else score

    @staticmethod
    def _existing_final_score(model_score, quality_score, is_core):
        """Blended score for GBDT-LR results (see CORE_RERANK_CONFIG["existing"])."""
        cfg = CORE_RERANK_CONFIG["existing"]
        if not is_core:
            return model_score * cfg["normal_model_weight"]
        if model_score >= cfg["low_model_threshold"]:
            return (
                model_score * cfg["core_model_weight"]
                + quality_score * cfg["core_quality_weight"]
                + cfg["core_boost"]
            )
        return (
            model_score * cfg["low_model_weight"]
            + quality_score * cfg["low_quality_weight"]
            + cfg["low_core_boost"]
        )

    @staticmethod
    def _new_final_score(model_score, quality_score, is_core):
        """Blended score for Item2Vec results (see CORE_RERANK_CONFIG["new"])."""
        cfg = CORE_RERANK_CONFIG["new"]
        if not is_core:
            return model_score * cfg["normal_model_weight"]
        return (
            model_score * cfg["core_model_weight"]
            + quality_score * cfg["core_quality_weight"]
            + cfg["core_boost"]
        )

    def _build_quality_score_map(self, cust_list):
        """Map each customer code (as str) to a 0-100 business-quality score.

        Customers with no DB row get the default quality score.
        """
        if not cust_list:
            return {}
        unique_cust_list = list(dict.fromkeys(str(cust_code) for cust_code in cust_list))
        cust_data = self._dao.get_cust_by_ids(self._city_uuid, unique_cust_list)
        if cust_data.empty:
            return {cust_code: self._DEFAULT_QUALITY_SCORE for cust_code in unique_cust_list}

        score_map = {}
        for _, row in cust_data.iterrows():
            cust_code = row.get("cust_code")
            if pd.isna(cust_code):
                continue
            score_map[str(cust_code)] = self._calculate_core_quality_score(row)

        for cust_code in unique_cust_list:
            score_map.setdefault(cust_code, self._DEFAULT_QUALITY_SCORE)
        return score_map

    def _calculate_core_quality_score(self, row):
        """Weighted average quality score over the fields present in ``row``.

        Only fields defined in CustConfig are used. Fields with no usable value
        are skipped and their weight is excluded from the average. If none are
        usable, the default score is returned.
        """
        weighted_score = 0.0
        total_weight = 0.0

        for field, score_map, weight in self._QUALITY_FIELD_SCORES:
            score = self._score_by_config_value(row, field, score_map)
            if score is None:
                continue
            weighted_score += score * weight
            total_weight += weight

        for field, weight in self._QUALITY_COUNTER_FIELDS:
            score = self._counter_score(self._get_row_value(row, field))
            if score is None:
                continue
            weighted_score += score * weight
            total_weight += weight

        if total_weight == 0:
            return self._DEFAULT_QUALITY_SCORE
        return round(weighted_score / total_weight, 4)

    def _score_by_config_value(self, row, field, score_map):
        """Score a categorical field, or None if the field or value is not recognised.

        A value accepted by CustConfig.ONEHOT_CAT but missing from ``score_map``
        scores the default.
        """
        if field not in CustConfig.FEATURE_COLUMNS:
            return None
        value = self._get_row_value(row, field)
        if pd.isna(value):
            return None
        text = str(value)
        if text not in CustConfig.ONEHOT_CAT.get(field, []):
            return None
        return float(score_map.get(text, self._DEFAULT_QUALITY_SCORE))

    @staticmethod
    def _get_row_value(row, field):
        """Return ``row[field]``, or None when the row has no such column."""
        if field not in row.index:
            return None
        return row.get(field)

    @staticmethod
    def _counter_score(value):
        """Map a counter count to a score.

        Scoring: <=0 -> 50, 1..3 -> 50 + 10*n, >=4 -> 90. Missing or non-numeric
        values return None.
        """
        if pd.isna(value):
            return None
        try:
            number = float(value)
        except (TypeError, ValueError):
            return None
        if number <= 0:
            return 50.0
        if number >= 4:
            return 90.0
        return 50.0 + number * 10.0

    def filter_recommend_list(self, recommend_list):
        """Drop items whose cust_code is not in the DAO's current customer list.

        The original note says this filters out closed shops.
        """
        cust_set = set(self._dao.get_cust_list(self._city_uuid))
        filter_recommend_list = [
            item for item in recommend_list if item["cust_code"] in cust_set
        ]
        return filter_recommend_list


if __name__ == "__main__":
    city_uuid = "00000000000000000000000011445301"
    product_id = '350139'
    recommend = Recommend(city_uuid)
    recommend_list = recommend.get_recommend_list_by_gbdtlr(product_id)
    # for i in recommend_list:
    #     print(i)