from dao.redis_db import Redis
import pandas as pd
import numpy as np
from tqdm import tqdm
from scipy.sparse import csr_matrix
from joblib import Parallel, delayed
import joblib


class ItemCFModel:
    """Item-based collaborative-filtering recall model.

    From a per-product shop popularity table and a shop-shop similarity
    matrix, builds a top-N candidate shop list for every product and
    persists the results to Redis sorted sets (``fc:{product_code}``).
    """

    def __init__(self):
        # product_code -> [{shop_id: interest_score}, ...], sorted high-to-low.
        self._recommendations = {}

    def train(self, score_path, similatity_path, n=100, k=10, top_n=100, n_jobs=4):
        """Compute candidate shops for every product and store them in Redis.

        Args:
            score_path: CSV with columns PRODUCT_CODE, BB_RETAIL_CUSTOMER_CODE,
                SCORE (per-product shop popularity).
            similatity_path: CSV holding the square shop-shop similarity
                matrix; row index / columns are shop codes.
                NOTE(review): parameter name keeps the original (misspelled)
                spelling so existing keyword callers are not broken.
            n: number of hottest shops per product used as the seed set.
            k: number of most-similar shops collected per seed shop.
            top_n: number of candidates kept per product.
            n_jobs: joblib worker count for the per-product parallel loop.
        """
        self._score_df = pd.read_csv(score_path)
        self._similarity_df = pd.read_csv(similatity_path, index_col=0)
        self._similarity_matrix = csr_matrix(self._similarity_df.values)
        # Bidirectional shop-code <-> row-index maps for the similarity matrix.
        self._shop_index = {shop: idx for idx, shop in enumerate(self._similarity_df.index)}
        self._index_shop = {idx: shop for idx, shop in enumerate(self._similarity_df.index)}

        def process_product(product_code, scores):
            # Seed set: the n hottest shops for this product.
            top_n_shops = scores.nlargest(n, "SCORE")["BB_RETAIL_CUSTOMER_CODE"].values
            top_n_indices = [self._shop_index[shop] for shop in top_n_shops]
            # Hoist per-shop scores into a dict once, instead of re-filtering
            # the DataFrame inside the nested loop below (was O(n * candidates)
            # boolean masks per product).
            score_by_shop = dict(zip(scores["BB_RETAIL_CUSTOMER_CODE"], scores["SCORE"]))

            # For each seed shop, find its k most-similar shops (self excluded).
            similar_shops = {}
            for shop_idx in top_n_indices:
                similarities = self._similarity_matrix[shop_idx].toarray().flatten()
                # argpartition yields the top k+1 indices UNORDERED; sort them
                # by similarity (descending) so that removing the self-entry
                # and truncating to k keeps the genuinely most-similar shops.
                # (The original truncated the unordered partition, dropping an
                # arbitrary member when the self-index was not among them.)
                part = np.argpartition(similarities, -k - 1)[-k - 1:]
                part = part[np.argsort(similarities[part])[::-1]]
                similar_indices = [idx for idx in part if idx != shop_idx][:k]
                similar_shops[self._index_shop[shop_idx]] = [
                    self._index_shop[idx] for idx in similar_indices
                ]

            # Candidate pool: union of all seed shops' similar-shop lists.
            candidate_shops = {m for sublist in similar_shops.values() for m in sublist}

            # Interest score of a candidate = sum over the seed shops that list
            # it of (seed popularity * similarity(seed, candidate)).
            interest_scores = {}
            for candidate in candidate_shops:
                candidate_idx = self._shop_index[candidate]
                interest_score = 0
                for shop_idx in top_n_indices:
                    seed_shop = self._index_shop[shop_idx]
                    if candidate in similar_shops[seed_shop]:
                        interest_score += (
                            score_by_shop[seed_shop]
                            * self._similarity_matrix[shop_idx, candidate_idx]
                        )
                interest_scores[candidate] = interest_score

            # Keep the legacy output format consumed by to_redis_zset: a list
            # of single-entry dicts, sorted by score descending, top_n kept.
            sorted_candidates = sorted(
                ({shop_id: s} for shop_id, s in interest_scores.items()),
                key=lambda item: next(iter(item.values())),
                reverse=True,
            )[:top_n]
            return product_code, sorted_candidates

        # Score candidates for every product in parallel.
        results = Parallel(n_jobs=n_jobs)(
            delayed(process_product)(product_code, scores)
            for product_code, scores in tqdm(
                self._score_df.groupby("PRODUCT_CODE"),
                desc="train:正在计算候选得分",
            )
        )
        print(len(results))

        # Store the results and push them to Redis.
        self._recommendations = dict(results)
        self.to_redis_zset()

    def to_redis_zset(self):
        """Persist self._recommendations to Redis sorted sets (ZSET).

        Key format is ``fc:{product_code}``; shop IDs are members and their
        interest scores are the ZSET scores.
        """
        redis_db = Redis()
        for product_code, recommendations in tqdm(
            self._recommendations.items(), desc="train:正在存储推荐结果"
        ):
            redis_key = f"fc:{product_code}"
            zset_data = {}
            for rec in recommendations:
                for shop_id, score in rec.items():
                    try:
                        zset_data[shop_id] = float(score)
                    except ValueError as e:
                        print(f"Error converting score to float for shop_id {shop_id}: {score}")
                        raise e
            redis_db.redis.zadd(redis_key, zset_data)


if __name__ == "__main__":
    score_path = "./models/recall/itemCF/matrix/score.csv"
    similarity_path = "./models/recall/itemCF/matrix/similarity.csv"
    # itemcf_model = ItemCFModel()
    # itemcf_model.train(score_path, similarity_path, n_jobs=4)
    # recommend_list = itemcf_model.inference(110111)
    # itemcf_model.to_redis_zset()
    # print(len(recommend_list))
    # print(recommend_list)
    # joblib.dump(itemcf_model, "itemCF.model")
    # model = joblib.load("./itemCF.model")
    # recommend_list = model.inference(110102)
    # print(len(recommend_list))
    # print(recommend_list)
    data = pd.read_csv(similarity_path, index_col=0)
    print(data)