#!/usr/bin/env python
# -*- encoding: utf-8 -*-
'''
@filename    : hot_recall.py
@description : Hotness-based recall algorithm
@time        : 2025/01/21/00
@author      : Sherlock1011 & Min1027
@Version     : 1.0
'''
import argparse
import math

import pandas as pd
from tqdm import tqdm

from dao.redis_db import Redis
from dao.mysql_client import Mysql  # noqa: F401  (unused here; kept from the original file)


class HotRecallModel:
    """Compute per-item hot scores from order data and store them in Redis.

    For every hot metric listed in the Redis sorted set ``configs:hotkeys``,
    the model averages that metric per ``BB_RETAIL_CUSTOMER_CODE``, scales
    the averages so the highest one maps to 100, and writes the result to
    the sorted set ``hot:<city_uuid>:<metric>``.
    """

    # Column of the order data that identifies the item being scored.
    _ITEM_COLUMN = "BB_RETAIL_CUSTOMER_CODE"

    # Placeholder ("mock") maximum score; the top average is divided by it
    # before being used as the normalisation denominator.
    _MAX_SCORE = 1.0

    def __init__(self, order_data):
        """Create the model.

        :param order_data: DataFrame holding the ``BB_RETAIL_CUSTOMER_CODE``
            column plus one column per hot metric.
        """
        # The Redis client must exist before get_hotkeys() is called.
        self._redis_db = Redis()
        self._hotkeys = self.get_hotkeys()
        self._order_data = order_data

    def get_hotkeys(self):
        """Return the members of the ``configs:hotkeys`` sorted set.

        :return: metric names in the order ZRANGE returns them
        :rtype: list
        """
        info = self._redis_db.redis.zrange("configs:hotkeys", 0, -1, withscores=True)
        # redis-py hands back bytes unless the client decodes responses;
        # whether the project's Redis wrapper does so is not visible here, so
        # decode defensively. Members that are already str pass through as is.
        return [
            member.decode("utf-8") if isinstance(member, bytes) else member
            for member, _score in info
        ]

    def _calculate_hot_score(self, hot_name):
        """Compute the normalised hot score of every item for one metric.

        The metric is averaged per item and each average is expressed as a
        percentage of the highest average. Items whose average is NaN are
        left out because Redis cannot store NaN as a sorted-set score.

        :param hot_name: name of the hot metric column in the order data
        :type hot_name: string
        :return: ``{"key": <metric name>, "value": [{item: score}, ...]}``
            with the items ordered from highest to lowest score; ``value``
            is empty when no item has a usable average
        :rtype: dict
        """
        means = (
            self._order_data.groupby(self._ITEM_COLUMN)[hot_name]
            .mean()
            .dropna()
            .sort_values(ascending=False)
        )
        if means.empty:
            return {"key": f"{hot_name}", "value": []}

        total_score = float(means.iloc[0]) / self._MAX_SCORE
        if total_score == 0 or not math.isfinite(total_score):
            # Dividing by a zero / infinite top value would only produce
            # NaN or inf scores; rank every item at 0 instead.
            item_hot_score = [{code: 0.0} for code in means.index]
        else:
            item_hot_score = [
                {code: (value / total_score) * 100}
                for code, value in means.items()
            ]
        return {"key": f"{hot_name}", "value": item_hot_score}

    def calculate_all_hot_score(self, city_uuid):
        """Compute the scores of every configured metric and store them.

        :param city_uuid: city identifier used in the Redis key names
        """
        for hotkey_name in tqdm(self._hotkeys, desc="hot_recall:正在计算热度分数"):
            self.to_redis(self._calculate_hot_score(hotkey_name), city_uuid)

    def to_redis(self, rec_content_score, city_uuid):
        """Replace the sorted set ``hot:<city_uuid>:<metric>`` with new scores.

        :param rec_content_score: result of :meth:`_calculate_hot_score`
        :type rec_content_score: dict
        :param city_uuid: city identifier used in the Redis key name
        """
        hotkey_name = rec_content_score["key"]
        rec_item_id = f"hot:{city_uuid}:{str(hotkey_name)}"

        print("自动清除历史id前数量", self._redis_db.redis.zcard(rec_item_id))
        # Drop the previous sorted set so stale members do not survive.
        self._redis_db.redis.delete(rec_item_id)
        print("自动清除历史id后数量", self._redis_db.redis.zcard(rec_item_id))

        res = {}
        for item in rec_content_score["value"]:  # each item looks like {A001: 75.0}
            for content, score in item.items():
                value = float(score)  # also turns numpy scalars into plain floats
                if math.isnan(value):
                    # Redis rejects NaN scores; one bad entry must not make
                    # the whole ZADD fail.
                    continue
                res[content] = value

        if res:  # ZADD with an empty mapping is an error
            self._redis_db.redis.zadd(rec_item_id, res)


def main(argv=None):
    """Command-line entry point: score a CSV of orders for one city.

    The model needs order data and a city UUID, so both are taken from the
    command line instead of being omitted.

    :param argv: argument list; defaults to ``sys.argv[1:]``
    """
    parser = argparse.ArgumentParser(
        description="Compute hot-recall scores and store them in Redis."
    )
    parser.add_argument(
        "order_csv",
        help="CSV file with a BB_RETAIL_CUSTOMER_CODE column and one column per hot metric",
    )
    parser.add_argument("city_uuid", help="city identifier used in the Redis key names")
    args = parser.parse_args(argv)

    # Read the item codes as text so values such as "00123" keep their zeros.
    order_data = pd.read_csv(args.order_csv, dtype={HotRecallModel._ITEM_COLUMN: str})
    model = HotRecallModel(order_data)
    model.calculate_all_hot_score(args.city_uuid)


if __name__ == "__main__":
    main()