Przeglądaj źródła

协同过滤算法实现

Sherlock1011 1 rok temu
rodzic
commit
d4ba741db3
1 zmienionych plików z 105 dodań i 0 usunięć
  1. 105 0
      models/recall/itemCF/ItemCF.py

+ 105 - 0
models/recall/itemCF/ItemCF.py

@@ -0,0 +1,105 @@
+from dao.redis_db import Redis
+import pandas as pd
+import numpy as np
+from tqdm import tqdm
+from scipy.sparse import csr_matrix
+from joblib import Parallel, delayed
+import joblib
+
+class ItemCF:
+    def __init__(self, score_path, similatity_path):
+        self._recommendations = {}
+        self._score_df = pd.read_csv(score_path)
+        self._similarity_df = pd.read_csv(similatity_path, index_col=0)
+        self._similarity_matrix = csr_matrix(self._similarity_df.values)
+        self._shop_index = {shop: idx for idx, shop in enumerate(self._similarity_df.index)}
+        self._index_shop = {idx: shop for idx, shop in enumerate(self._similarity_df.index)}
+        
+    def train(self, n=100, k=10, top_n=100, n_jobs=4):
+        def process_product(product_code, scores):
+            # 获取热度最高的n个商户
+            top_n_shops = scores.nlargest(n, "SCORE")["BB_RETAIL_CUSTOMER_CODE"].values
+            top_n_indices = [self._shop_index[shop] for shop in top_n_shops]
+            
+            # 找到每个商户最相似的k个商户
+            similar_shops = {}
+            for shop_idx in top_n_indices:
+                similarities = self._similarity_matrix[shop_idx].toarray().flatten()
+                similar_indices = np.argpartition(similarities, -k-1)[-k-1:]
+                similar_indices = similar_indices[similar_indices != shop_idx][:k]
+                similar_shops[self._index_shop[shop_idx]] = [self._index_shop[idx] for idx in similar_indices]
+            
+            # 生成候选商户列表
+            candidate_shops = list(set([m for sublist in similar_shops.values() for m in sublist]))
+            candidate_indices = [self._shop_index[shop] for shop in candidate_shops]
+            
+            # 计算每个候选商户的兴趣得分
+            interest_scores = {}
+            for candidate_idx in candidate_indices:
+                interest_score = 0
+                for shop_idx in top_n_indices:
+                    if self._index_shop[candidate_idx] in similar_shops[self._index_shop[shop_idx]]:
+                        shop_score = scores[scores["BB_RETAIL_CUSTOMER_CODE"]==self._index_shop[shop_idx]]["SCORE"].values[0]
+                        interest_score += shop_score * self._similarity_matrix[shop_idx, candidate_idx]
+                interest_scores[self._index_shop[candidate_idx]] = interest_score
+            
+            # 将候选商户的兴趣得分转换为字典列表,并按照从大到小排列
+            sorted_candidates = sorted([{shop_id: s} for shop_id, s in interest_scores.items()],
+                                       key=lambda x: list(x.values())[0], reverse=True)[:top_n]
+            
+            return product_code, sorted_candidates
+        
+        # 并行处理每个品规
+        results = Parallel(n_jobs=n_jobs)(delayed(process_product)(product_code, scores) 
+                                          for product_code, scores in tqdm(self._score_df.groupby("PRODUCT_CODE"), desc="正在计算候选得分..."))
+        
+        # 存储结果
+        self._recommendations = {product_code: sorted_candidates for product_code, sorted_candidates in results}
+    
+    def to_redis_zset(self):
+        """
+        将 self._recommendations 中的数据保存到 Redis 的 Sorted Set (ZSET) 中
+        存储格式为 fc:product_code,其中商户 ID 作为成员,得分作为分数
+        """
+        redis_db = Redis()
+        for product_code, recommendations in self._recommendations.items():
+            redis_key = f"fc:{product_code}"
+            zset_data = {}
+            for rec in recommendations:
+                for shop_id, score in rec.items():
+                    try:
+                        zset_data[shop_id] = float(score)
+                    except ValueError as e:
+                        print(f"Error converting score to float for shop_id {shop_id}: {score}")
+                        raise e
+            
+            redis_db.redis.zadd(redis_key, zset_data)
+    
+    def inference(self, product_code):
+        """
+        从 Redis 中读取推荐结果,并返回 {shop_id: score} 的列表
+        """
+        redis_db = Redis()
+        redis_key = f"fc:{product_code}"
+        recommendations = redis_db.redis.zrange(redis_key, 0, -1, withscores=True, desc=True)
+        
+        # 将推荐结果转换为 {shop_id: score} 的字典列表
+        result = [{shop_id: float(score)} for shop_id, score in recommendations]
+        
+        return result
+    
+if __name__ == "__main__":
+    score_path = "./models/recall/itemCF/matrix/score.csv"
+    similarity_path = "./models/recall/itemCF/matrix/similarity.csv"
+    itemcf_model = ItemCF(score_path, similarity_path)
+    # itemcf_model.train(n_jobs=4)
+    # recommend_list = itemcf_model.inference(110102)
+    # itemcf_model.to_redis_zset()
+    # print(len(recommend_list))
+    # print(recommend_list)
+    joblib.dump(itemcf_model, "itemCF.model")
+    
+    # model = joblib.load("./itemCF.model")
+    # recommend_list = model.inference(110102)
+    # print(len(recommend_list))
+    # print(recommend_list)