| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108 |
- from database.dao.mysql_dao import MySqlDao
- from database.db.redis_db import RedisDatabaseHelper
- import os
- from models.item2vec.inference import Item2VecModel
- from models.rank.data.config import CustConfig, ProductConfig
- from models.rank.data.utils import sample_data_clear
- from models.rank import GbdtLrModel, generate_feats_map
- import pandas as pd
- from core import get_logger
- logger = get_logger("models.recommend")
- class Recommend:
- def __init__(self, city_uuid):
- self._redis = RedisDatabaseHelper().redis
- self._dao = MySqlDao()
-
- self._load_molde(city_uuid)
-
- def _load_molde(self, city_uuid):
- """加载推演模型"""
- self._city_uuid = city_uuid
- gbdtlr_model_path = os.path.join("./models/rank/weights", city_uuid, "gbdtlr_model.pkl")
- self._gbdtlr_model = GbdtLrModel(gbdtlr_model_path)
- self._item2vec_model = Item2VecModel(city_uuid)
- logger.info(f"Models loaded for city_uuid={city_uuid}")
-
- def _get_itemcf_recall(self, product_id):
- """协同召回"""
- key = f"fc:{self._city_uuid}:{product_id}"
- recall_list = self._redis.zrevrange(key, 0, -1, withscores=False)
- return recall_list
-
- def get_recal_cust(self, product_id, cust_code_list):
- """通过协同过滤召回与核心零售户列表取并集,得到待推荐商户列表"""
- itemcf_recall_list = self._get_itemcf_recall(product_id)
- seen = set(itemcf_recall_list)
- extra = [c for c in cust_code_list if c not in seen]
- result = list(itemcf_recall_list) + extra
- logger.info(f"Recall completed: {len(result)} customers (itemcf={len(itemcf_recall_list)}, core_extra={len(extra)}) for product {product_id}")
- return result
- def get_recommend_list_by_gbdtlr(self, product_id, cust_code_list=None):
- """根据gbdt_lr获取商户推荐列表"""
- logger.info(f"GBDT-LR recommend started for product {product_id}")
- # 获取召回的商户列表
- if cust_code_list is None:
- cust_code_list = []
- recall_cust_list = self.get_recal_cust(product_id, cust_code_list)
- # 获取卷烟数据
- product_data = self._dao.get_product_by_id(self._city_uuid, product_id)[ProductConfig.FEATURE_COLUMNS]
- product_data = sample_data_clear(product_data, ProductConfig)
-
- # 获取整合商户数据
- cust_data = self._dao.get_cust_by_ids(self._city_uuid, recall_cust_list)[CustConfig.FEATURE_COLUMNS]
- # shop_data = self._dao.get_shop_by_ids(self._city_uuid, recall_cust_list)[ShopConfig.FEATURE_COLUMNS]
- cust_data = sample_data_clear(cust_data, CustConfig)
- # shop_data = sample_data_clear(shop_data, ShopConfig)
- # cust_feats = shop_data.set_index("cust_code")
- # cust_data = cust_data.join(cust_feats, on="BB_RETAIL_CUSTOMER_CODE", how="inner")
- # 按 recall_cust_list 顺序对齐 cust_data,确保 feats_map 行顺序与 recall_list 一致
- # 否则 get_recommend_list 中 zip(recall_list, scores) 会错配商户ID和分数
- cust_codes_in_data = set(cust_data["cust_code"].tolist())
- ordered_recall_list = [c for c in recall_cust_list if c in cust_codes_in_data]
- cust_order = {code: i for i, code in enumerate(ordered_recall_list)}
- cust_data = cust_data.sort_values("cust_code", key=lambda x: x.map(cust_order)).reset_index(drop=True)
- # 获取推理用的feats_map
- feats_map = generate_feats_map(product_data, cust_data)
- recommend_list = self._gbdtlr_model.get_recommend_list(feats_map, ordered_recall_list)
- # recommend_list = self.filter_recommend_list(recommend_list)
- logger.info(f"GBDT-LR recommend completed: {len(recommend_list)} results")
- return recommend_list
-
- def get_recommend_list_by_item2vec(self, product_id, cust_code_list=None):
- """根据item2vec获取商户推荐列表,核心商户并入候选集统一评分"""
- if cust_code_list is None:
- cust_code_list = []
- logger.info(f"Item2Vec recommend started for product {product_id}")
- recommend_list = self._item2vec_model.get_recommend_cust_list(product_id, cust_code_list=cust_code_list)
- recommend_list = recommend_list.drop(columns=["sale_qty"])
- recommend_list = recommend_list.to_dict(orient='records')
- # recommend_list = self.filter_recommend_list(recommend_list)
- logger.info(f"Item2Vec recommend completed: {len(recommend_list)} results")
- return recommend_list
-
- def filter_recommend_list(self, recommend_list):
- """过滤掉已经歇业的商铺"""
- cust_set = set(self._dao.get_cust_list(self._city_uuid))
- filter_recommend_list = [
- item for item in recommend_list
- if item["cust_code"] in cust_set
- ]
- return filter_recommend_list
- if __name__ == "__main__":
- city_uuid = "00000000000000000000000011445301"
- product_id = '350139'
- recommend = Recommend(city_uuid)
- recommend_list = recommend.get_recommend_list_by_gbdtlr(product_id)
- # for i in recommend_list:
- # print(i)
- # recommend_data = recommend.get_recommend_and_delivery(recommend_list)
-
|