| 12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697 |
- from database import RedisDatabaseHelper, MySqlDao
- import pandas as pd
- from models import UserItemScore, SimilarityMatrix
- import numpy as np
- from tqdm import tqdm
- from scipy.sparse import csr_matrix
- from joblib import Parallel, delayed
class ItemCFModel:
    """Item-based collaborative filtering over (product, shop) scores.

    For every product, the top-n highest-scoring shops are expanded through a
    shop-shop similarity matrix to a candidate set; each candidate's interest
    score is sum(score(s) * sim(s, c)) over the top shops s whose k-nearest
    neighbours include c. Ranked candidates are persisted to Redis ZSETs.
    """

    def __init__(self):
        # product_code -> list of {shop_id: interest_score} dicts, descending.
        self._recommendations = {}
        self._dao = MySqlDao()

    def train(self, city_uuid, n=300, k=100, top_n=300, n_jobs=4):
        """Compute and store recommendations for every product in a city.

        Args:
            city_uuid: city identifier used to load orders and key Redis.
            n: number of top-scoring shops considered per product.
            k: number of nearest-neighbour shops expanded per top shop.
            top_n: number of ranked candidates kept per product.
            n_jobs: joblib worker count for the per-product loop.
        """
        print("itemcf: 正在加载order_info...")
        self._order_data = self._dao.load_order_data(city_uuid)
        print("正在计算品规培育分数...")
        self._score_df = UserItemScore(self._order_data).generate_product_scores()
        print("正在计算商户相似度矩阵...")
        self._similarity_df = SimilarityMatrix(self._order_data).generate_similarity_matrix()

        similarity_matrix = csr_matrix(self._similarity_df.values)
        shops = list(self._similarity_df.index)
        shop_index = {shop: idx for idx, shop in enumerate(shops)}
        index_shop = dict(enumerate(shops))
        n_shops = len(shops)

        def process_product(product_code, scores):
            # Top-n most popular shops for this product.
            top_n_shops = scores.nlargest(n, "score")["cust_code"].values
            top_n_indices = [shop_index[shop] for shop in top_n_shops]
            # cust_code -> score lookup; the previous version re-filtered the
            # DataFrame inside the candidate loop (a full scan per pair).
            score_by_shop = dict(zip(scores["cust_code"].values,
                                     scores["score"].values))

            # k most similar shops for each top shop, excluding the shop itself.
            similar_shops = {}
            for shop_idx in top_n_indices:
                similarities = similarity_matrix[shop_idx].toarray().flatten()
                # Mask self before selection: the old argpartition(-k-1) trick
                # crashed when k+1 > n_shops and could drop a real neighbour
                # when the shop itself was not among the k+1 largest values.
                similarities[shop_idx] = -np.inf
                kk = min(k, n_shops - 1)
                if kk <= 0:
                    similar_shops[index_shop[shop_idx]] = set()
                    continue
                neighbour_idx = np.argpartition(similarities, -kk)[-kk:]
                # Store as a set: membership is tested per candidate below.
                similar_shops[index_shop[shop_idx]] = {index_shop[i] for i in neighbour_idx}

            # Candidate pool: the top shops plus all of their neighbours.
            candidate_shops = set(top_n_shops)
            for neighbours in similar_shops.values():
                candidate_shops |= neighbours

            # Interest score per candidate c: sum of score(s) * sim(s, c) over
            # every top shop s whose neighbour set contains c.
            interest_scores = {}
            for candidate in candidate_shops:
                candidate_idx = shop_index[candidate]
                total = 0.0
                for shop_idx in top_n_indices:
                    shop = index_shop[shop_idx]
                    if candidate in similar_shops[shop]:
                        total += score_by_shop[shop] * similarity_matrix[shop_idx, candidate_idx]
                interest_scores[candidate] = total

            # Keep the top_n candidates as [{shop_id: score}, ...], descending.
            sorted_candidates = sorted(
                ({shop_id: s} for shop_id, s in interest_scores.items()),
                key=lambda d: next(iter(d.values())),
                reverse=True,
            )[:top_n]

            return product_code, sorted_candidates

        # Process each product in parallel.
        results = Parallel(n_jobs=n_jobs)(
            delayed(process_product)(product_code, scores)
            for product_code, scores in tqdm(self._score_df.groupby("product_code"),
                                             desc="train:正在计算候选得分"))

        # Store results keyed by product and flush them to Redis.
        self._recommendations = dict(results)
        self.to_redis_zset(city_uuid)

    def to_redis_zset(self, city_uuid):
        """Persist self._recommendations to Redis sorted sets (ZSET).

        Keys are "fc:{city_uuid}:{product_code}"; members are shop ids and the
        ZSET score is the interest score. All existing keys for the city are
        deleted first so products that disappeared do not leave stale entries.
        """
        redis_db = RedisDatabaseHelper()

        # Remove the previous run's keys for this city before writing.
        pattern = f"fc:{city_uuid}:*"
        keys_to_delete = redis_db.redis.keys(pattern)
        if keys_to_delete:
            redis_db.redis.delete(*keys_to_delete)

        for product_code, recommendations in tqdm(self._recommendations.items(), desc="train:正在存储推荐结果"):
            redis_key = f"fc:{city_uuid}:{product_code}"
            zset_data = {}
            for rec in recommendations:
                for shop_id, score in rec.items():
                    try:
                        # TypeError (e.g. None) previously escaped without the
                        # diagnostic message; catch it alongside ValueError.
                        zset_data[shop_id] = float(score)
                    except (TypeError, ValueError):
                        print(f"Error converting score to float for shop_id {shop_id}: {score}")
                        raise
            # redis-py raises DataError on an empty mapping; skip products
            # that ended up with no candidates.
            if zset_data:
                redis_db.redis.zadd(redis_key, zset_data)
-
if __name__ == "__main__":
    # Script entry point: train recommendations for the fixed demo city.
    model = ItemCFModel()
    model.train("00000000000000000000000011445301", n_jobs=4)
|