ItemCF.py 5.0 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106
  1. from dao.redis_db import Redis
  2. import pandas as pd
  3. import numpy as np
  4. from tqdm import tqdm
  5. from scipy.sparse import csr_matrix
  6. from joblib import Parallel, delayed
  7. import joblib
  8. class ItemCF:
  9. def __init__(self):
  10. self._recommendations = {}
  11. def train(self, score_path, similatity_path, n=100, k=10, top_n=100, n_jobs=4):
  12. self._score_df = pd.read_csv(score_path)
  13. self._similarity_df = pd.read_csv(similatity_path, index_col=0)
  14. self._similarity_matrix = csr_matrix(self._similarity_df.values)
  15. self._shop_index = {shop: idx for idx, shop in enumerate(self._similarity_df.index)}
  16. self._index_shop = {idx: shop for idx, shop in enumerate(self._similarity_df.index)}
  17. def process_product(product_code, scores):
  18. # 获取热度最高的n个商户
  19. top_n_shops = scores.nlargest(n, "SCORE")["BB_RETAIL_CUSTOMER_CODE"].values
  20. top_n_indices = [self._shop_index[shop] for shop in top_n_shops]
  21. # 找到每个商户最相似的k个商户
  22. similar_shops = {}
  23. for shop_idx in top_n_indices:
  24. similarities = self._similarity_matrix[shop_idx].toarray().flatten()
  25. similar_indices = np.argpartition(similarities, -k-1)[-k-1:]
  26. similar_indices = similar_indices[similar_indices != shop_idx][:k]
  27. similar_shops[self._index_shop[shop_idx]] = [self._index_shop[idx] for idx in similar_indices]
  28. # 生成候选商户列表
  29. candidate_shops = list(set([m for sublist in similar_shops.values() for m in sublist]))
  30. candidate_indices = [self._shop_index[shop] for shop in candidate_shops]
  31. # 计算每个候选商户的兴趣得分
  32. interest_scores = {}
  33. for candidate_idx in candidate_indices:
  34. interest_score = 0
  35. for shop_idx in top_n_indices:
  36. if self._index_shop[candidate_idx] in similar_shops[self._index_shop[shop_idx]]:
  37. shop_score = scores[scores["BB_RETAIL_CUSTOMER_CODE"]==self._index_shop[shop_idx]]["SCORE"].values[0]
  38. interest_score += shop_score * self._similarity_matrix[shop_idx, candidate_idx]
  39. interest_scores[self._index_shop[candidate_idx]] = interest_score
  40. # 将候选商户的兴趣得分转换为字典列表,并按照从大到小排列
  41. sorted_candidates = sorted([{shop_id: s} for shop_id, s in interest_scores.items()],
  42. key=lambda x: list(x.values())[0], reverse=True)[:top_n]
  43. return product_code, sorted_candidates
  44. # 并行处理每个品规
  45. results = Parallel(n_jobs=n_jobs)(delayed(process_product)(product_code, scores)
  46. for product_code, scores in tqdm(self._score_df.groupby("PRODUCT_CODE"), desc="train:正在计算候选得分"))
  47. # 存储结果
  48. self._recommendations = {product_code: sorted_candidates for product_code, sorted_candidates in results}
  49. def to_redis_zset(self):
  50. """
  51. 将 self._recommendations 中的数据保存到 Redis 的 Sorted Set (ZSET) 中
  52. 存储格式为 fc:product_code,其中商户 ID 作为成员,得分作为分数
  53. """
  54. redis_db = Redis()
  55. for product_code, recommendations in tqdm(self._recommendations.items(), desc="train:正在存储推荐结果"):
  56. redis_key = f"fc:{product_code}"
  57. zset_data = {}
  58. for rec in recommendations:
  59. for shop_id, score in rec.items():
  60. try:
  61. zset_data[shop_id] = float(score)
  62. except ValueError as e:
  63. print(f"Error converting score to float for shop_id {shop_id}: {score}")
  64. raise e
  65. redis_db.redis.zadd(redis_key, zset_data)
  66. def inference(self, product_code):
  67. """
  68. 从 Redis 中读取推荐结果,并返回 {shop_id: score} 的列表
  69. """
  70. redis_db = Redis()
  71. redis_key = f"fc:{product_code}"
  72. recommendations = redis_db.redis.zrange(redis_key, 0, -1, withscores=True, desc=True)
  73. # 将推荐结果转换为 {shop_id: score} 的字典列表
  74. result = [{shop_id: float(score)} for shop_id, score in recommendations]
  75. return result
  76. if __name__ == "__main__":
  77. score_path = "./models/recall/itemCF/matrix/score.csv"
  78. similarity_path = "./models/recall/itemCF/matrix/similarity.csv"
  79. itemcf_model = ItemCF()
  80. # itemcf_model.train(score_path, similarity_path, n_jobs=4)
  81. recommend_list = itemcf_model.inference(110111)
  82. # itemcf_model.to_redis_zset()
  83. # print(len(recommend_list))
  84. print(recommend_list)
  85. # joblib.dump(itemcf_model, "itemCF.model")
  86. # model = joblib.load("./itemCF.model")
  87. # recommend_list = model.inference(110102)
  88. # print(len(recommend_list))
  89. # print(recommend_list)