#!/usr/bin/env python
# -*- encoding: utf-8 -*-
"""
@filename    : itemCF.py
@description : Item-based collaborative filtering algorithm.
               The visible implementation ranks customers per product by a
               weighted sum of min-max normalized order metrics.
@time        : 2025/01/21/00
@author      : Sherlock1011 & Min1027
@Version     : 1.0
"""
from decimal import Decimal

import joblib
import numpy as np
import pandas as pd
from sqlalchemy import create_engine, text

from dao.mysql_client import Mysql


class ItemCFModel:
    """Score customers for each product from weighted, normalized metrics.

    TODO: 1. Persist the results to a Redis database.
    """

    def __init__(self):
        # Build the weights from string literals: Decimal(0.1) would capture
        # the binary floating-point error of 0.1 (0.1000000000000000055...),
        # which then leaks into every score.
        self.weights = {
            "MONTH6_SALE_QTY": Decimal("0.1"),
            "MONTH6_SALE_AMT": Decimal("0.1"),
            "MONTH6_GROSS_PROFIT_RATE": Decimal("0.03"),
            "MONTH6_SALE_QTY_YOY": Decimal("0.1"),
            "MONTH6_SALE_QTY_MOM": Decimal("0.1"),
            "MONTH6_SALE_AMT_YOY": Decimal("0.1"),
            "MONTH6_SALE_AMT_MOM": Decimal("0.1"),
            "ORDER_FULLORDR_RATE": Decimal("0.1"),
            "NEW_PRODUCT_ORDER_QTY_OCC": Decimal("0.03"),
            "LISTING_RATE": Decimal("0.1"),
            "OUT_STOCK_DAYS": Decimal("0.02"),
            "RETAIL_PRICE_INDEX": Decimal("0.02"),
        }

    @staticmethod
    def _to_decimal(value):
        """Convert a cell value to ``Decimal`` without float artifacts.

        ``Decimal`` instances are returned unchanged. Everything else goes
        through ``str`` so that NumPy scalars (which ``Decimal`` may reject)
        and floats (whose exact binary expansion is noise) convert cleanly.
        """
        if isinstance(value, Decimal):
            return value
        return Decimal(str(value))

    def standardize_column(self, column):
        """Min-max normalize ``column`` into the range [0, 1].

        Args:
            column: A pandas Series of numeric values.

        Returns:
            The scalar 0 when every value is 0, the scalar 1 when every value
            equals the same non-zero constant, otherwise a Series computed as
            ``(column - min) / (max - min)``.
        """
        lowest = column.min()
        highest = column.max()
        if highest == lowest:
            # A constant column has zero range; avoid dividing by zero.
            return 0 if highest == 0 else 1
        return (column - lowest) / (highest - lowest)

    def _score_row(self, row):
        """Return the weighted sum of one row's normalized metrics, times 100."""
        weighted_total = sum(
            self._to_decimal(row[col]) * weight
            for col, weight in self.weights.items()
        )
        return weighted_total * 100

    def calculate_heart_per_product(self, group):
        """Normalize the weighted columns of one product group and score it.

        Every column named in ``self.weights`` is overwritten in ``group``
        with its min-max normalized values, and an ``FC_SCORE`` column is
        added.

        Args:
            group: DataFrame holding the rows of a single product. It is
                modified in place.

        Returns:
            The same ``group`` object.
        """
        for column in self.weights:
            normalized = self.standardize_column(group[column])
            if column == "OUT_STOCK_DAYS":
                # This metric is inverted before weighting, so the smallest
                # value in the group contributes the most to the score.
                normalized = 1 - normalized
            group[column] = normalized

        group["FC_SCORE"] = group.apply(self._score_row, axis=1)
        return group

    def recommend(self, order_data):
        """Compute, for each product, its customers ranked by ``FC_SCORE``.

        Args:
            order_data: DataFrame containing ``PRODUCT_CODE``,
                ``BB_RETAIL_CUSTOMER_CODE`` and every column named in
                ``self.weights``. It is not modified.

        Returns:
            A list with one entry per product code, in ascending product-code
            order. Each entry is
            ``{"keys": product_code, "value": [{customer_code: score}, ...]}``
            with the customers sorted by descending score. An empty input
            yields an empty list.
        """
        # Clean a copy (drop duplicate rows, fill missing values with 0) so
        # the caller's DataFrame is left untouched.
        cleaned = order_data.drop_duplicates().fillna(0)
        if cleaned.empty:
            return []

        recommendations = []
        # Iterating the groups directly (rather than groupby().apply()) keeps
        # PRODUCT_CODE available inside each group on every pandas version;
        # groupby yields the groups in ascending key order by default.
        for product_code, group in cleaned.groupby("PRODUCT_CODE"):
            scored = self.calculate_heart_per_product(group.copy())
            ranked = scored.sort_values(by="FC_SCORE", ascending=False)
            customer_scores = [
                {customer_code: score}
                for customer_code, score in zip(
                    ranked["BB_RETAIL_CUSTOMER_CODE"], ranked["FC_SCORE"]
                )
            ]
            recommendations.append(
                {"keys": product_code, "value": customer_scores}
            )
        return recommendations


def load_data_from_dataset():
    """Load the ``tads_brandcul_cust_order`` table into a DataFrame.

    Returns:
        A DataFrame built from every row of the table, with the
        ``stat_month`` column removed.
    """
    client = Mysql()
    # Create a session.
    session = client.create_session()
    # Run the query through the session; the session is always closed.
    try:
        results = session.execute(
            text("select * from tads_brandcul_cust_order")
        ).all()
        # Convert the result rows into a DataFrame.
        # NOTE(review): assumes pandas derives the column names from the
        # returned row objects - confirm against the SQLAlchemy version used.
        df = pd.DataFrame(results).drop(columns=['stat_month'])
    finally:
        session.close()
    return df


if __name__ == "__main__":
    # Create an instance of the model.
    item_cf_algorithm = ItemCFModel()

    # Load the data.
    # order_data = pd.read_csv('order.csv')
    order_data = load_data_from_dataset()

    # Run the algorithm.
    recommendations = item_cf_algorithm.recommend(order_data)
    print(recommendations)

    # # Serialize the model.
    # joblib.dump(item_cf_algorithm, "item_cf.model")