| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107 |
- #!/usr/bin/env python
- # -*- encoding: utf-8 -*-
- '''
- @filename : itemCF.py
- @description : 基于物品的协同过滤算法
- @time : 2025/01/21/00
- @author : Sherlock1011 & Min1027
- @Version : 1.0
- '''
- import joblib
- import pandas as pd
- import numpy as np
- from sqlalchemy import create_engine, text
- from dao.mysql_client import Mysql
- from decimal import Decimal
- # 算法封装成一个类
class ItemCFModel:
    """Rank retail customers for each product by a weighted metric score.

    Every metric named in ``self.weights`` is min-max normalized within a
    product group, the normalized values are combined into a weighted sum
    (``FC_SCORE``, scaled by 100), and customers are listed per product from
    the highest score to the lowest.

    TODO 1: save the results to the Redis database.
    """

    def __init__(self):
        # Weights are built from strings so they are exact decimals;
        # Decimal(0.1) would capture the binary floating-point error of 0.1.
        # NOTE(review): the weights add up to 0.9, so the highest reachable
        # FC_SCORE is 90 rather than 100 -- confirm this is intended.
        self.weights = {
            "MONTH6_SALE_QTY": Decimal("0.1"),
            "MONTH6_SALE_AMT": Decimal("0.1"),
            "MONTH6_GROSS_PROFIT_RATE": Decimal("0.03"),
            "MONTH6_SALE_QTY_YOY": Decimal("0.1"),
            "MONTH6_SALE_QTY_MOM": Decimal("0.1"),
            "MONTH6_SALE_AMT_YOY": Decimal("0.1"),
            "MONTH6_SALE_AMT_MOM": Decimal("0.1"),
            "ORDER_FULLORDR_RATE": Decimal("0.1"),
            "NEW_PRODUCT_ORDER_QTY_OCC": Decimal("0.03"),
            "LISTING_RATE": Decimal("0.1"),
            "OUT_STOCK_DAYS": Decimal("0.02"),
            "RETAIL_PRICE_INDEX": Decimal("0.02"),
        }

    @staticmethod
    def _to_decimal(value):
        """Convert a cell value to ``Decimal`` without changing its value."""
        if isinstance(value, Decimal):
            return value
        if isinstance(value, (int, float)):
            return Decimal(value)
        # numpy scalars such as numpy.int64 are rejected by Decimal(); unwrap
        # them to the equivalent plain Python number first.
        if hasattr(value, "item"):
            return Decimal(value.item())
        return Decimal(value)

    def standardize_column(self, column):
        """Min-max normalize ``column`` into the range [0, 1].

        Args:
            column: A pandas Series holding one metric of one product group.

        Returns:
            The scalar 0 when every value is 0, the scalar 1 when every value
            is the same non-zero number, otherwise a Series computed as
            ``(column - min) / (max - min)``.
        """
        low, high = column.min(), column.max()
        if high == low:
            # A constant column has no spread; avoid dividing by zero.
            return 0 if high == 0 else 1
        return (column - low) / (high - low)

    def calculate_heart_per_product(self, group):
        """Normalize the weighted metrics of one product group and score it.

        The metric columns of ``group`` are overwritten in place with their
        normalized values and an ``FC_SCORE`` column is added.

        Args:
            group: DataFrame with the rows of a single product; it must
                contain every column named in ``self.weights``.

        Returns:
            The same ``group`` object, modified as described above.
        """
        for column in self.weights:
            normalized = self.standardize_column(group[column])
            if column == "OUT_STOCK_DAYS":
                # Inverted: a larger OUT_STOCK_DAYS value lowers the score.
                normalized = 1 - normalized
            group[column] = normalized

        # Weighted sum in Decimal arithmetic, accumulated column by column in
        # the order of self.weights, then scaled by 100.
        weighted_terms = (
            group[column].map(self._to_decimal) * weight
            for column, weight in self.weights.items()
        )
        group["FC_SCORE"] = sum(weighted_terms) * 100
        return group

    def recommend(self, order_data):
        """Compute the per-product customer ranking.

        Duplicate rows are dropped and missing values are replaced by 0 on a
        copy, so the caller's DataFrame is left untouched.

        Args:
            order_data: DataFrame with ``PRODUCT_CODE``,
                ``BB_RETAIL_CUSTOMER_CODE`` and every column named in
                ``self.weights``.

        Returns:
            A list with one entry per product, ordered by product code:
            ``{"keys": product_code, "value": [{customer_code: score}, ...]}``
            where the inner list is sorted by score, highest first. An empty
            list is returned for empty or ``None`` input.
        """
        if order_data is None or order_data.empty:
            return []

        cleaned = order_data.drop_duplicates().fillna(0)

        # Iterate the groups explicitly instead of using groupby().apply():
        # this keeps PRODUCT_CODE in each group on every pandas version.
        scored_groups = [
            self.calculate_heart_per_product(group.copy())
            for _, group in cleaned.groupby("PRODUCT_CODE")
        ]
        if not scored_groups:
            return []

        scored = pd.concat(scored_groups, ignore_index=True)
        scored = scored.sort_values(
            by=["PRODUCT_CODE", "FC_SCORE"], ascending=[True, False]
        )

        recommendations = []
        for product_code, group in scored.groupby("PRODUCT_CODE"):
            customer_scores = [
                {customer: score}
                for customer, score in zip(
                    group["BB_RETAIL_CUSTOMER_CODE"], group["FC_SCORE"]
                )
            ]
            recommendations.append({"keys": product_code, "value": customer_scores})

        return recommendations
def load_data_from_dataset():
    """Load the ``tads_brandcul_cust_order`` table into a DataFrame.

    Opens a session through the project's ``Mysql`` client, reads every row
    of the table, and always closes the session afterwards.

    Returns:
        A pandas DataFrame with one row per table row and the ``stat_month``
        column removed.

    Raises:
        KeyError: If the query result has no ``stat_month`` column.
    """
    client = Mysql()
    # Create the session.
    session = client.create_session()

    try:
        # presumably a SQLAlchemy session whose execute() returns a Result
        # (the original code already relied on Result.all()) -- TODO confirm.
        result = session.execute(
            text("select * from tads_brandcul_cust_order")
        )
        # Take the column names from the result metadata rather than from the
        # rows themselves: an empty table then still yields a DataFrame with
        # the right columns, so dropping 'stat_month' below does not fail.
        columns = list(result.keys())
        rows = result.all()
    finally:
        session.close()

    # Convert the rows to a DataFrame and drop the statistics-month column.
    return pd.DataFrame(rows, columns=columns).drop(columns=['stat_month'])
-
if __name__ == "__main__":
    # Script entry point: build the model, load the order rows from the
    # database, compute the recommendations, and print them.
    model = ItemCFModel()

    # For an offline run the data could instead be read from a CSV export,
    # e.g. pd.read_csv('order.csv').
    orders = load_data_from_dataset()

    recommendations = model.recommend(orders)
    print(recommendations)

    # Serializing the model (joblib.dump(model, "item_cf.model")) is
    # currently disabled.
|