Ver código fonte

更新热度召回及协同过滤流程

yangzeyu 11 meses atrás
pai
commit
e1e9db098c

+ 3 - 2
config/__init__.py

@@ -1,7 +1,8 @@
 #!/usr/bin/env python3
 # -*- coding:utf-8 -*-
-from config.config import load_config 
+from config.config import load_config, load_model_config
 
 __all__ = [
-    "load_config"
+    "load_config",
+    "load_model_config"
 ]

+ 5 - 0
config/config.py

@@ -3,4 +3,9 @@ import yaml
 def load_config():
     with open('./config/database_config.yaml') as file:
         config = yaml.safe_load(file)
+    return config
+
+def load_model_config():
+    with open('./config/model_config.yaml') as file:
+        config = yaml.safe_load(file)
     return config

+ 2 - 0
config/model_config.yaml

@@ -0,0 +1,2 @@
+hot_recall:
+  hot_keys: ["sale_qty"]

+ 4 - 1
models/__init__.py

@@ -1,10 +1,13 @@
 #!/usr/bin/env python3
 # -*- coding:utf-8 -*-
 from models.recall.hot_recall import HotRecallModel
-from models.recall.itemCF.user_item_score import UserItemScore
+from models.recall.itemCF.score import UserItemScore
+from models.recall.itemCF.similarity_matrix import SimilarityMatrix
 from models.recall.itemCF.ItemCF import ItemCFModel
+
 __all__ = [
     "HotRecallModel",
     "UserItemScore",
+    "SimilarityMatrix",
     "ItemCFModel"
 ]

+ 77 - 77
models/recall/hot_recall.py

@@ -1,77 +1,77 @@
-#!/usr/bin/env python
-# -*- encoding: utf-8 -*-
-'''
-@filename     : hot_recall.py
-@description     : 热度召回算法   
-@time     : 2025/01/21/00
-@author     : Sherlock1011 & Min1027
-@Version     : 1.0
-'''
-import pandas as pd
-from database import RedisDatabaseHelper
-from tqdm import tqdm
-
-class HotRecallModel:
-    def __init__(self, order_data):
-        self._redis_db = RedisDatabaseHelper()
-        self._hotkeys = self.get_hotkeys()
-        self._order_data = order_data
-
-
-    def get_hotkeys(self):
-        info = self._redis_db.redis.zrange("configs:hotkeys", 0, -1, withscores=True)
-        hotkeys = []
-        for item, _ in info:
-            hotkeys.append(item)
-        return hotkeys
-        
-    def _calculate_hot_score(self, hot_name):
-        """
-        根据热度指标计算热度得分
-        :param hot_name: 热度指标A
-        :type param: string
-        :return: 所有热度指标的得分
-        :rtype: list
-        """
-        results = self._order_data.groupby("BB_RETAIL_CUSTOMER_CODE")[hot_name].mean().reset_index()
-        sorted_results = results.sort_values(by=hot_name, ascending=False).reset_index(drop=True)
-        item_hot_score = []
-        # mock热度召回最大分数
-        max_score = 1.0
-        total_score = sorted_results.loc[0, hot_name] / max_score
-        for row in sorted_results.itertuples(index=True, name="Row"):
-            item = {row[1]:(row[2]/total_score)*100}
-            item_hot_score.append(item)
-        return {"key":f"{hot_name}", "value":item_hot_score}
-
-    def calculate_all_hot_score(self, city_uuid):
-        """
-        计算所有的热度指标得分
-        """
-        # hot_datas = []
-        for hotkey_name in tqdm(self._hotkeys, desc="hot_recall:正在计算热度分数"):
-            self.to_redis(self._calculate_hot_score(hotkey_name), city_uuid)
-
-    def to_redis(self, rec_content_score, city_uuid):
-        hotkey_name = rec_content_score["key"]
-        rec_item_id = f"hot:{city_uuid}:{str(hotkey_name)}" # 修正 rec_item_id 拼接方式
-        print("自动清除历史id前数量", self._redis_db.redis.zcard(rec_item_id))
-        # 清空 sorted set 数据,确保不会影响后续的存储
-        self._redis_db.redis.delete(rec_item_id)
-        print("自动清除历史id后数量", self._redis_db.redis.zcard(rec_item_id))
-         
-        res = {}
-
-        for item in rec_content_score["value"]:  
-            for content, score in item.items():  # item 形如 {A001: 75.0}
-                res[content] = float(score)  # 确保 score 是 float 类型
-
-        if res:  # 只有当 res 不为空时才执行 zadd
-            self._redis_db.redis.zadd(rec_item_id, res)
-
-
-if __name__ == "__main__":
-    # 序列化
-    model = HotRecallModel()
-    model.calculate_all_hot_score()
-    # joblib.dump(model, "hot_recall.model")
+from sklearn.preprocessing import StandardScaler
+from config import load_model_config
+from database import RedisDatabaseHelper, MySqlDao
+from tqdm import tqdm
+
+from models.rank.data.config import OrderConfig
+import numpy as np
+
+cfgs = load_model_config()
+
+class HotRecallModel:
+    def __init__(self, city_uuid):
+        self._redis_db = RedisDatabaseHelper().redis
+        self._dao = MySqlDao()
+        self._load_data(city_uuid)
+        self._hotkeys = cfgs["hot_recall"]["hot_keys"]
+    
+    def _load_data(self, city_uuid):
+        """加载订单记录表"""
+        print("hot_recall: 正在加载order_info...")
+        self._order_data = self._dao.load_order_data(city_uuid)
+        self._order_data =self._order_data[OrderConfig.FEATURE_COLUMNS]
+        
+        # 数据清洗
+        self._order_data["sale_qty"] = self._order_data["sale_qty"].fillna(0)
+        self._order_data = self._order_data.groupby(["cust_code", "product_code"], as_index=False)["sale_qty"].sum()
+        self._order_data = self._order_data[self._order_data["sale_qty"] != 0]
+        
+        
+    def _calculate_hot_score(self, hot_name):
+        """
+        根据热度指标计算热度得分
+        :param hot_name: 热度指标A
+        :type param: string
+        :return: 所有热度指标的得分
+        :rtype: list
+        """
+        results = self._order_data.groupby("cust_code")[hot_name].mean().reset_index()
+        sorted_results = results.sort_values(by=hot_name, ascending=False).reset_index(drop=True)
+        
+        scaler = StandardScaler()
+        normalized = scaler.fit_transform(sorted_results["sale_qty"].values.reshape(-1, 1))
+        sorted_results["sale_qty"] = ((1 / (1 + np.exp(-normalized))) * 100).flatten()
+        item_hot_score = []
+        for _, row in sorted_results.iterrows():
+            item_hot_score.append({row["cust_code"]: row[hot_name]})
+            
+        return {"key":f"{hot_name}", "value":item_hot_score}
+    
+    def _to_redis(self, rec_content_score, city_uuid):
+        hotkey_name = rec_content_score["key"]
+        rec_item_id = f"hot:{city_uuid}:{str(hotkey_name)}" # 修正 rec_item_id 拼接方式
+        # 清空 sorted set 数据,确保不会影响后续的存储
+        self._redis_db.delete(rec_item_id)
+         
+        res = {}
+
+        for item in rec_content_score["value"]:  
+            for content, score in item.items():  # item 形如 {A001: 75.0}
+                res[content] = float(score)  # 确保 score 是 float 类型
+
+        if res:  # 只有当 res 不为空时才执行 zadd
+            self._redis_db.zadd(rec_item_id, res)
+    
+    def calculate_all_hot_score(self, city_uuid):
+        """
+        计算所有的热度指标得分
+        """
+        # hot_datas = []
+        for hotkey_name in tqdm(self._hotkeys, desc="hot_recall:正在计算热度分数"):
+            self._to_redis(self._calculate_hot_score(hotkey_name), city_uuid)
+            
+    
+        
+if __name__ == "__main__":
+    hot_recall = HotRecallModel("00000000000000000000000011445301")
+    hot_recall.calculate_all_hot_score("00000000000000000000000011445301")

+ 75 - 0
models/recall/hot_recall_ori.py

@@ -0,0 +1,75 @@
+#!/usr/bin/env python
+# -*- encoding: utf-8 -*-
+'''
+@filename     : hot_recall.py
+@description     : 热度召回算法   
+@time     : 2025/01/21/00
+@author     : Sherlock1011 & Min1027
+@Version     : 1.0
+'''
+import pandas as pd
+from database import RedisDatabaseHelper
+from tqdm import tqdm
+
+class HotRecallModel:
+    def __init__(self, order_data):
+        self._redis_db = RedisDatabaseHelper()
+        self._hotkeys = self.get_hotkeys()
+        self._order_data = order_data
+
+
+    def get_hotkeys(self):
+        info = self._redis_db.redis.zrange("configs:hotkeys", 0, -1, withscores=True)
+        hotkeys = []
+        for item, _ in info:
+            hotkeys.append(item)
+        return hotkeys
+        
+    def _calculate_hot_score(self, hot_name):
+        """
+        根据热度指标计算热度得分
+        :param hot_name: 热度指标A
+        :type param: string
+        :return: 所有热度指标的得分
+        :rtype: list
+        """
+        results = self._order_data.groupby("BB_RETAIL_CUSTOMER_CODE")[hot_name].mean().reset_index()
+        sorted_results = results.sort_values(by=hot_name, ascending=False).reset_index(drop=True)
+        item_hot_score = []
+        # mock热度召回最大分数
+        max_score = 1.0
+        total_score = sorted_results.loc[0, hot_name] / max_score
+        for row in sorted_results.itertuples(index=True, name="Row"):
+            item = {row[1]:(row[2]/total_score)*100}
+            item_hot_score.append(item)
+        return {"key":f"{hot_name}", "value":item_hot_score}
+
+    def calculate_all_hot_score(self, city_uuid):
+        """
+        计算所有的热度指标得分
+        """
+        # hot_datas = []
+        for hotkey_name in tqdm(self._hotkeys, desc="hot_recall:正在计算热度分数"):
+            self.to_redis(self._calculate_hot_score(hotkey_name), city_uuid)
+
+    def to_redis(self, rec_content_score, city_uuid):
+        hotkey_name = rec_content_score["key"]
+        rec_item_id = f"hot:{city_uuid}:{str(hotkey_name)}" # 修正 rec_item_id 拼接方式
+        # 清空 sorted set 数据,确保不会影响后续的存储
+        self._redis_db.redis.delete(rec_item_id)
+         
+        res = {}
+
+        for item in rec_content_score["value"]:  
+            for content, score in item.items():  # item 形如 {A001: 75.0}
+                res[content] = float(score)  # 确保 score 是 float 类型
+
+        if res:  # 只有当 res 不为空时才执行 zadd
+            self._redis_db.redis.zadd(rec_item_id, res)
+
+
+if __name__ == "__main__":
+    # 序列化
+    model = HotRecallModel()
+    model.calculate_all_hot_score()
+    # joblib.dump(model, "hot_recall.model")

+ 27 - 29
models/recall/itemCF/ItemCF.py

@@ -1,48 +1,56 @@
-from database import RedisDatabaseHelper
+from database import RedisDatabaseHelper, MySqlDao
 import pandas as pd
+from models import UserItemScore, SimilarityMatrix
 import numpy as np
 from tqdm import tqdm
 from scipy.sparse import csr_matrix
 from joblib import Parallel, delayed
-import joblib
 
 class ItemCFModel:
     def __init__(self):
         self._recommendations = {}
+        self._dao = MySqlDao()
         
-    def train(self, score_path, similatity_path, city_uuid, n=100, k=10, top_n=100, n_jobs=4):
-        self._score_df = pd.read_csv(score_path)
-        self._similarity_df = pd.read_csv(similatity_path, index_col=0)
-        self._similarity_matrix = csr_matrix(self._similarity_df.values)
-        self._shop_index = {shop: idx for idx, shop in enumerate(self._similarity_df.index)}
-        self._index_shop = {idx: shop for idx, shop in enumerate(self._similarity_df.index)}
+    def train(self, city_uuid, n=300, k=100, top_n=300, n_jobs=4):
+        # self._score_df = pd.read_csv(score_path)
+        # self._similarity_df = pd.read_csv(similatity_path, index_col=0)
+        print("itemcf: 正在加载order_info...")
+        self._order_data = self._dao.load_order_data(city_uuid)
+        print("正在计算品规培育分数...")
+        self._score_df = UserItemScore(self._order_data).generate_product_scores()
+        print("正在计算商户相似度矩阵...")
+        self._similarity_df = SimilarityMatrix(self._order_data).generate_similarity_matrix()
+        
+        similarity_matrix = csr_matrix(self._similarity_df.values)
+        shop_index = {shop: idx for idx, shop in enumerate(self._similarity_df.index)}
+        index_shop = {idx: shop for idx, shop in enumerate(self._similarity_df.index)}
         
         def process_product(product_code, scores):
             # 获取热度最高的n个商户
             top_n_shops = scores.nlargest(n, "score")["cust_code"].values
-            top_n_indices = [self._shop_index[shop] for shop in top_n_shops]
+            top_n_indices = [shop_index[shop] for shop in top_n_shops]
             
             # 找到每个商户最相似的k个商户
             similar_shops = {}
             for shop_idx in top_n_indices:
-                similarities = self._similarity_matrix[shop_idx].toarray().flatten()
+                similarities = similarity_matrix[shop_idx].toarray().flatten()
                 similar_indices = np.argpartition(similarities, -k-1)[-k-1:]
                 similar_indices = similar_indices[similar_indices != shop_idx][:k]
-                similar_shops[self._index_shop[shop_idx]] = [self._index_shop[idx] for idx in similar_indices]
+                similar_shops[index_shop[shop_idx]] = [index_shop[idx] for idx in similar_indices]
             
             # 生成候选商户列表
-            candidate_shops = list(set([m for sublist in similar_shops.values() for m in sublist]))
-            candidate_indices = [self._shop_index[shop] for shop in candidate_shops]
+            candidate_shops = list(set(top_n_shops).union(set([m for sublist in similar_shops.values() for m in sublist])))
+            candidate_indices = [shop_index[shop] for shop in candidate_shops]
             
             # 计算每个候选商户的兴趣得分
             interest_scores = {}
             for candidate_idx in candidate_indices:
                 interest_score = 0
                 for shop_idx in top_n_indices:
-                    if self._index_shop[candidate_idx] in similar_shops[self._index_shop[shop_idx]]:
-                        shop_score = scores[scores["cust_code"]==self._index_shop[shop_idx]]["score"].values[0]
-                        interest_score += shop_score * self._similarity_matrix[shop_idx, candidate_idx]
-                interest_scores[self._index_shop[candidate_idx]] = interest_score
+                    if index_shop[candidate_idx] in similar_shops[index_shop[shop_idx]]:
+                        shop_score = scores[scores["cust_code"]==index_shop[shop_idx]]["score"].values[0]
+                        interest_score += shop_score * similarity_matrix[shop_idx, candidate_idx]
+                interest_scores[index_shop[candidate_idx]] = interest_score
             
             # 将候选商户的兴趣得分转换为字典列表,并按照从大到小排列
             sorted_candidates = sorted([{shop_id: s} for shop_id, s in interest_scores.items()],
@@ -53,7 +61,7 @@ class ItemCFModel:
         # 并行处理每个品规
         results = Parallel(n_jobs=n_jobs)(delayed(process_product)(product_code, scores) 
                                           for product_code, scores in tqdm(self._score_df.groupby("product_code"), desc="train:正在计算候选得分"))
-        print(len(results))
+        
         # 存储结果
         self._recommendations = {product_code: sorted_candidates for product_code, sorted_candidates in results}
         self.to_redis_zset(city_uuid)
@@ -88,14 +96,4 @@ if __name__ == "__main__":
     score_path = "./data/itemcf/scores.csv"
     similarity_path = "./data/itemcf/similarity.csv"
     itemcf_model = ItemCFModel()
-    itemcf_model.train(score_path, similarity_path, "00000000000000000000000011445301", n_jobs=4)
-    # recommend_list = itemcf_model.inference(110111)
-    # itemcf_model.to_redis_zset()
-    # print(len(recommend_list))
-    # print(recommend_list)
-    # joblib.dump(itemcf_model, "itemCF.model")
-    
-    # model = joblib.load("./itemCF.model")
-    # recommend_list = model.inference(110102)
-    # print(len(recommend_list))
-    # print(recommend_list)
+    itemcf_model.train("00000000000000000000000011445301", n_jobs=4)

+ 7 - 8
models/recall/itemCF/score.py

@@ -3,14 +3,12 @@ from models.rank.data.config import OrderConfig
 import numpy as np
 from sklearn.preprocessing import StandardScaler
 class UserItemScore:
-    def __init__(self, city_uuid):
-        self._dao = MySqlDao()
-        self._load_data(city_uuid)
+    def __init__(self, data):
+        self._order_data = data.copy()
+        self._load_data()
         
-    def _load_data(self, city_uuid):
+    def _load_data(self):
         """加载订单记录表"""
-        print("item-cf: 正在加载order_info...")
-        self._order_data = self._dao.load_order_data(city_uuid)
         self._order_data =self._order_data[OrderConfig.FEATURE_COLUMNS]
         
         # 数据清洗
@@ -24,11 +22,12 @@ class UserItemScore:
         self._order_data["sale_qty"] = ((1 / (1 + np.exp(-normalized))) * 100).flatten()
     
         
-    def generate_product_scores(self, save_path):
+    def generate_product_scores(self):
         self._order_data = self._order_data.rename(columns={'sale_qty': 'score'})
         self._order_data = self._order_data.sort_values(['product_code', 'score'], ascending=[True, False])
         self._score_data = self._order_data[['product_code', 'cust_code', 'score']]
-        self._score_data.to_csv(save_path, index=False, encoding="utf-8")
+        # self._score_data.to_csv(save_path, index=False, encoding="utf-8")
+        return self._score_data
         
         
 if __name__ == "__main__":

+ 8 - 9
models/recall/itemCF/similarity_matrix.py

@@ -7,15 +7,13 @@ from tqdm import tqdm
 
 
 class SimilarityMatrix:
-    def __init__(self, city_uuid):
-        self._dao = MySqlDao()
-        self._load_data(city_uuid)
+    def __init__(self, data):
+        self._order_data = data.copy()
+        self._load_data()
         self._build_co_occurace_matrix()
         
-    def _load_data(self, city_uuid):
+    def _load_data(self):
         """加载订单记录表"""
-        print("item-cf: 正在加载order_info...")
-        self._order_data = self._dao.load_order_data(city_uuid)
         self._order_data =self._order_data[OrderConfig.FEATURE_COLUMNS]
         
         # 数据清洗
@@ -49,7 +47,7 @@ class SimilarityMatrix:
                 self._co_occurrence_matrix[idx1, idx2] += 1
                 self._co_occurrence_matrix[idx2, idx1] += 1
                 
-    def calculate_similarity_matrix(self, save_path):
+    def generate_similarity_matrix(self):
         """使用向量计算商铺之间的相似度矩阵"""
         # 计算每个商铺售卖品规的总次数
         shop_counts = self._order_data.groupby("cust_code").size()
@@ -68,9 +66,10 @@ class SimilarityMatrix:
         
         # 保存结果
         self._similarity_matrix = pd.DataFrame(self._similarity_matrix, index=self._shops, columns=self._shops)
-        self._similarity_matrix.to_csv(save_path, index=True, encoding="utf-8")
+        # self._similarity_matrix.to_csv(save_path, index=True, encoding="utf-8")
+        return self._similarity_matrix
         
 if __name__ == "__main__":
     similarity_matrix_save_path = "./data/itemcf/similarity.csv"
     similarity_matrix = SimilarityMatrix("00000000000000000000000011445301")
-    similarity_matrix.calculate_similarity_matrix(similarity_matrix_save_path)
+    similarity_matrix.generate_similarity_matrix(similarity_matrix_save_path)