recommend.py 6.3 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134
  1. from database.dao.mysql_dao import MySqlDao
  2. from database.db.redis_db import RedisDatabaseHelper
  3. import os
  4. from models.item2vec.inference import Item2VecModel
  5. from models.rank.data.config import CustConfig, ProductConfig
  6. from models.rank.data.utils import sample_data_clear
  7. from models.rank import GbdtLrModel, generate_feats_map
  8. import pandas as pd
  9. class Recommend:
  10. def __init__(self, city_uuid):
  11. self._redis = RedisDatabaseHelper().redis
  12. self._dao = MySqlDao()
  13. self._load_molde(city_uuid)
  14. def _load_molde(self, city_uuid):
  15. """加载推演模型"""
  16. self._city_uuid = city_uuid
  17. gbdtlr_model_path = os.path.join("./models/rank/weights", city_uuid, "gbdtlr_model.pkl")
  18. self._gbdtlr_model = GbdtLrModel(gbdtlr_model_path)
  19. self._item2vec_model = Item2VecModel(city_uuid)
  20. def _get_itemcf_recall(self, product_id):
  21. """协同召回"""
  22. key = f"fc:{self._city_uuid}:{product_id}"
  23. recall_list = self._redis.zrevrange(key, 0, -1, withscores=False)
  24. return recall_list
  25. def _get_hot_recall(self):
  26. """热度召回"""
  27. key = f"hot:{self._city_uuid}:sale_qty"
  28. recall_list = self._redis.zrevrange(key, 0, -1, withscores=False)
  29. return recall_list
  30. def get_recal_cust(self, product_id, recall_count):
  31. """通过协同过滤和热度召回,召回待推荐商户列表"""
  32. itemcf_recall_list = self._get_itemcf_recall(product_id)
  33. hot_recall_list = self._get_hot_recall()
  34. result = list(dict.fromkeys(itemcf_recall_list))
  35. # 如果结果不足,从hot_recall中补齐
  36. if len(result) < recall_count:
  37. hot_recall_set = set(hot_recall_list) - set(result)
  38. additional_items = [item for item in hot_recall_list if item in hot_recall_set]
  39. needed = recall_count - len(result)
  40. result.extend(additional_items[:needed])
  41. return result[:recall_count]
  42. def get_recommend_list_by_gbdtlr(self, product_id, recall_count=500):
  43. """根据gbdt_lr获取商户推荐列表"""
  44. # 获取召回的商户列表
  45. recall_cust_list = self.get_recal_cust(product_id, recall_count)
  46. # 获取卷烟数据
  47. product_data = self._dao.get_product_by_id(self._city_uuid, product_id)[ProductConfig.FEATURE_COLUMNS]
  48. product_data = sample_data_clear(product_data, ProductConfig)
  49. # 获取整合商户数据
  50. cust_data = self._dao.get_cust_by_ids(self._city_uuid, recall_cust_list)[CustConfig.FEATURE_COLUMNS]
  51. # shop_data = self._dao.get_shop_by_ids(self._city_uuid, recall_cust_list)[ShopConfig.FEATURE_COLUMNS]
  52. cust_data = sample_data_clear(cust_data, CustConfig)
  53. # shop_data = sample_data_clear(shop_data, ShopConfig)
  54. # cust_feats = shop_data.set_index("cust_code")
  55. # cust_data = cust_data.join(cust_feats, on="BB_RETAIL_CUSTOMER_CODE", how="inner")
  56. # 按 recall_cust_list 顺序对齐 cust_data,确保 feats_map 行顺序与 recall_list 一致
  57. # 否则 get_recommend_list 中 zip(recall_list, scores) 会错配商户ID和分数
  58. cust_codes_in_data = set(cust_data["cust_code"].tolist())
  59. ordered_recall_list = [c for c in recall_cust_list if c in cust_codes_in_data]
  60. cust_order = {code: i for i, code in enumerate(ordered_recall_list)}
  61. cust_data = cust_data.sort_values("cust_code", key=lambda x: x.map(cust_order)).reset_index(drop=True)
  62. # 获取推理用的feats_map
  63. feats_map = generate_feats_map(product_data, cust_data)
  64. recommend_list = self._gbdtlr_model.get_recommend_list(feats_map, ordered_recall_list)
  65. # recommend_list = self.filter_recommend_list(recommend_list)
  66. return recommend_list
  67. def get_recommend_list_by_item2vec(self, product_id, recall_count=500):
  68. """根据item2vec获取商户推荐列表"""
  69. recommend_list = self._item2vec_model.get_recommend_cust_list(product_id, top=recall_count)
  70. recommend_list = recommend_list.drop(columns=["sale_qty"])
  71. recommend_list = recommend_list.to_dict(orient='records')
  72. recommend_list = recommend_list[:recall_count]
  73. # recommend_list = self.filter_recommend_list(recommend_list)
  74. return recommend_list
  75. def filter_recommend_list(self, recommend_list):
  76. """过滤掉已经歇业的商铺"""
  77. cust_set = set(self._dao.get_cust_list(self._city_uuid))
  78. filter_recommend_list = [
  79. item for item in recommend_list
  80. if item["cust_code"] in cust_set
  81. ]
  82. return filter_recommend_list
  83. def get_recommend_and_delivery(self, recommend_list, delivery_count=5000):
  84. """根据推荐列表生成投放分配"""
  85. recommend_data = pd.DataFrame(recommend_list)
  86. # 1. 计算每个商户的理论应得数量(带小数)
  87. recommend_data["delivery_float"] = (
  88. recommend_data["recommend_score"] / recommend_data["recommend_score"].sum() * delivery_count
  89. )
  90. # 2. 向下取整得到基础配额
  91. recommend_data["delivery_count"] = recommend_data["delivery_float"].astype(int)
  92. # 3. 计算余数并排序
  93. recommend_data["remainder"] = recommend_data["delivery_float"] - recommend_data["delivery_count"]
  94. recommend_data = recommend_data.sort_values(["remainder", "cust_code"], ascending=[False, True])
  95. # 4. 将剩余配额按余数从大到小分配
  96. remaining = delivery_count - recommend_data["delivery_count"].sum()
  97. recommend_data.iloc[:remaining, recommend_data.columns.get_loc("delivery_count")] += 1
  98. recommend_data = recommend_data.drop(columns=["delivery_float", "remainder"])
  99. recommend_data = recommend_data.sort_values(["recommend_score", "cust_code"], ascending=[False, True])
  100. recommend_data = recommend_data.to_dict(orient='records')
  101. return recommend_data
  102. if __name__ == "__main__":
  103. city_uuid = "00000000000000000000000011445301"
  104. product_id = '350139'
  105. recommend = Recommend(city_uuid)
  106. recommend_list = recommend.get_recommend_list_by_gbdtlr(product_id)
  107. # for i in recommend_list:
  108. # print(i)
  109. # recommend_data = recommend.get_recommend_and_delivery(recommend_list)