"""GBDT + LR ranking model: customer scoring, feature importance, and SHAP
interaction analysis for (city, product) recommendation."""
import os
import tempfile

import joblib
import numpy as np
import pandas as pd
import shap
from sklearn.preprocessing import StandardScaler
from tqdm import tqdm

from database import RedisDatabaseHelper, MySqlDao
from models.rank.data import DataLoader
from models.rank.data import ProductConfig, CustConfig, ShopConfig, ImportanceFeaturesMap
from models.rank.data.utils import one_hot_embedding, sample_data_clear
from utils import split_relation_subtable


class GbdtLrModel:
    """Two-stage GBDT + LR ranking model.

    Loads a pre-trained bundle (GBDT, LR, leaf one-hot encoder) and provides:
    per-customer recommendation scores for a product, aggregated feature
    importances, and pairwise SHAP interaction analysis between product-side
    and customer-side features.
    """

    def __init__(self, model_path):
        """Load the model bundle and open database connections.

        Args:
            model_path: Path to a joblib file holding the trained models.
        """
        self.load_model(model_path)
        self.redis = RedisDatabaseHelper().redis
        self._mysql_dao = MySqlDao()
        # SHAP TreeExplainer, built lazily on first use in generate_shap_interance.
        self._explainer = None

    def load_model(self, model_path):
        """Load the GBDT model, LR model and leaf one-hot encoder from disk."""
        models = joblib.load(model_path)
        self.gbdt_model = models["gbdt_model"]
        self.lr_model = models["lr_model"]
        self.onehot_encoder = models["onehot_encoder"]

    def get_cust_and_product_data(self, city_uuid, product_id):
        """Fetch the product's feature row and all customers' features for a city.

        Side effects: sets self.product_data and self.custs_data, each reduced
        to the configured feature columns.
        """
        self.product_data = self._mysql_dao.get_product_by_id(city_uuid, product_id)[ProductConfig.FEATURE_COLUMNS]
        self.custs_data = self._mysql_dao.load_cust_data(city_uuid)[CustConfig.FEATURE_COLUMNS]

    def generate_feats_map(self, city_uuid, product_id):
        """Build the combined customer x product feature matrix.

        Cleans both tables, cross-joins them so every customer is paired with
        the product, one-hot encodes categorical features and standardizes the
        numeric ones.

        Side effects: sets self.feats_map (model input matrix) and
        self.recall_cust_list (customer codes aligned row-for-row with it).
        """
        self.get_cust_and_product_data(city_uuid, product_id)
        # Data cleaning.
        self.product_data = sample_data_clear(self.product_data, ProductConfig)
        self.custs_data = sample_data_clear(self.custs_data, CustConfig)
        # Cartesian product via a constant join key (cross join).
        self.custs_data["descartes"] = 1
        self.product_data["descartes"] = 1
        self.feats_map = pd.merge(self.custs_data, self.product_data, on="descartes").drop("descartes", axis=1)
        # Remember the customer codes in row order, then drop the id columns.
        self.recall_cust_list = self.feats_map["BB_RETAIL_CUSTOMER_CODE"].to_list()
        self.feats_map.drop('BB_RETAIL_CUSTOMER_CODE', axis=1, inplace=True)
        self.feats_map.drop('product_code', axis=1, inplace=True)
        # One-hot encoding of categorical features.
        onehot_feats = {**CustConfig.ONEHOT_CAT, **ProductConfig.ONEHOT_CAT}
        onehot_columns = list(onehot_feats.keys())
        # Capture the numeric columns BEFORE encoding replaces the categorical ones.
        numeric_columns = self.feats_map.drop(onehot_columns, axis=1).columns
        self.feats_map = one_hot_embedding(self.feats_map, onehot_feats)
        # Standardize numeric features.
        scaler = StandardScaler()
        self.feats_map[numeric_columns] = scaler.fit_transform(self.feats_map[numeric_columns])

    def recommend_sort(self, city_uuid, product_id):
        """Score every customer for the product and return them best-first.

        Returns:
            list[dict]: one {customer_code: probability} per customer, sorted
            by probability descending.
        """
        self.generate_feats_map(city_uuid, product_id)
        # GBDT leaf indices -> one-hot leaf encoding -> LR positive-class probability.
        gbdt_preds = self.gbdt_model.apply(self.feats_map)[:, :, 0]
        gbdt_feats_encoded = self.onehot_encoder.transform(gbdt_preds)
        scores = self.lr_model.predict_proba(gbdt_feats_encoded)[:, 1]
        # Sort (customer, score) pairs once, then emit the single-entry-dict format.
        ranked = sorted(zip(self.recall_cust_list, scores), key=lambda pair: pair[1], reverse=True)
        self.recommend_list = [{cust_id: float(score)} for cust_id, score in ranked]
        return self.recommend_list

    def generate_feats_importance(self):
        """Aggregate GBDT feature importances back to their original features.

        One-hot expanded columns ("feat_category") are summed into a single
        importance for "feat". Features are then mapped to display names and
        split into customer-side and product-side lists.

        Returns:
            tuple[list, list]: (customer importances, product importances),
            each a list of single-entry {display_name: importance} dicts in
            descending importance order.
        """
        feats_importance = self.gbdt_model.feature_importances_
        feats_names = self.gbdt_model.feature_names_in_
        importance_dict = dict(zip(feats_names, feats_importance))
        # NOTE(review): ShopConfig is included here but not in
        # generate_feats_map — confirm whether shop features are model inputs.
        onehot_feats = {**CustConfig.ONEHOT_CAT, **ShopConfig.ONEHOT_CAT, **ProductConfig.ONEHOT_CAT}
        for feat, categories in onehot_feats.items():
            related_columns = [f"{feat}_{item}" for item in categories]
            if related_columns:
                # Fold the per-category importances into one value per feature.
                combined_importance = sum(importance_dict[col] for col in related_columns)
                # Remove the expanded one-hot columns.
                for col in related_columns:
                    del importance_dict[col]
                importance_dict[feat] = combined_importance
        sorted_importance = sorted(importance_dict.items(), key=lambda x: x[1], reverse=True)
        cust_features_importance = []
        product_features_importance = []
        for feat, importance in sorted_importance:
            if feat in ImportanceFeaturesMap.CUSTOM_FEATURES_MAP:
                cust_features_importance.append({ImportanceFeaturesMap.CUSTOM_FEATURES_MAP[feat]: float(importance)})
            # NOTE(review): shop-mapped features are appended to the customer
            # list — confirm this grouping is intentional.
            if feat in ImportanceFeaturesMap.SHOPING_FEATURES_MAP:
                cust_features_importance.append({ImportanceFeaturesMap.SHOPING_FEATURES_MAP[feat]: float(importance)})
            if feat in ImportanceFeaturesMap.PRODUCT_FEATRUES_MAP:
                product_features_importance.append({ImportanceFeaturesMap.PRODUCT_FEATRUES_MAP[feat]: float(importance)})
        return cust_features_importance, product_features_importance

    def generate_shap_interance(self, data):
        """Compute mean |SHAP interaction| between product and customer features.

        SHAP interaction values for all rows of `data` are computed in batches
        and staged in a disk-backed memmap to bound peak memory, averaged over
        samples, and the product-feature x customer-feature cross block is
        returned in long format with human-readable names.

        Args:
            data: DataFrame whose columns match the GBDT model's inputs.

        Returns:
            pd.DataFrame with columns ['product_feat', 'cust_feat', 'relation'],
            sorted by relation descending; near-zero and NaN pairs are dropped.
        """
        if self._explainer is None:
            self._explainer = shap.TreeExplainer(self.gbdt_model)
        n_samples = len(data)
        n_features = len(data.columns)
        batch_size = 500  # tune to available memory
        # Disk-backed staging area for the (n_samples, f, f) interaction tensor.
        temp_dir = tempfile.mkdtemp()
        temp_file = os.path.join(temp_dir, "shap_interaction_temp.dat")
        fp = None  # guard so cleanup is safe even if memmap creation fails
        try:
            fp = np.memmap(temp_file, dtype=np.float32, mode='w+',
                           shape=(n_samples, n_features, n_features))
            # Batched SHAP interaction computation.
            # NOTE(review): assumes shap_interaction_values returns a single
            # ndarray (binary/regression GBDT); for multiclass it returns a list.
            for i in tqdm(range(0, n_samples, batch_size), desc="计算SHAP交互值..."):
                batch_data = data.iloc[i:i + batch_size]
                batch_interaction = self._explainer.shap_interaction_values(batch_data)
                fp[i:i + len(batch_interaction)] = batch_interaction.astype(np.float32)
            fp.flush()  # make sure everything hits the backing file
            print("SHAP交互值计算并存储完成")
            # Batched mean of absolute interaction values over all samples.
            mean_interaction = np.zeros((n_features, n_features), dtype=np.float32)
            for i in tqdm(range(0, n_samples, batch_size), desc="计算均值..."):
                mean_interaction += np.abs(fp[i:i + batch_size]).sum(axis=0)
            mean_interaction /= n_samples
            print("均值计算完成")
            interaction_df = pd.DataFrame(mean_interaction, index=data.columns, columns=data.columns)
            print("交互矩阵构建完成")
            # Split the one-hot column names into product-side and customer-side.
            product_feats = [
                f"{feat}_{item}"
                for feat, categories in ProductConfig.ONEHOT_CAT.items()
                for item in categories
            ]
            cust_feats = [
                f"{feat}_{item}"
                for feat, categories in {**CustConfig.ONEHOT_CAT, **ShopConfig.ONEHOT_CAT}.items()
                for item in categories
            ]
            print("特征分离完成")
            # Cross block: rows = product features, columns = customer features.
            cross_matrix = interaction_df.loc[product_feats, cust_feats]
            print("交叉区块提取完成")
            # Long format: one (product_feat, cust_feat, relation) row per pair.
            stacked = cross_matrix.stack().reset_index()
            stacked.columns = ['product_feat', 'cust_feat', 'relation']
            print("转换为长格式完成")
            filtered = stacked[
                (stacked['relation'].abs() > 1e-6) &  # drop near-zero interactions
                (~stacked['relation'].isna())         # drop NaN
            ].copy()
            print("过滤完成")
            results = (
                filtered
                .sort_values('relation', ascending=False)
                .to_dict('records')
            )
            print("排序完成")
            # Map raw feature names to display names, keeping the category suffix:
            # "feat_name_category" -> "DisplayName(category)".
            feats_name_map = {
                **ImportanceFeaturesMap.CUSTOM_FEATURES_MAP,
                **ImportanceFeaturesMap.SHOPING_FEATURES_MAP,
                **ImportanceFeaturesMap.PRODUCT_FEATRUES_MAP
            }
            for item in results:
                product_infos = item["product_feat"].split("_")
                item["product_feat"] = f"{feats_name_map['_'.join(product_infos[:-1])]}({product_infos[-1]})"
                cust_infos = item["cust_feat"].split("_")
                item["cust_feat"] = f"{feats_name_map['_'.join(cust_infos[:-1])]}({cust_infos[-1]})"
            print("名称替换完成")
            return pd.DataFrame(results, columns=['product_feat', 'cust_feat', 'relation'])
        finally:
            # Best-effort cleanup of the memmap backing file and its directory.
            try:
                if fp is not None:
                    del fp  # release the memmap before removing its file
                os.remove(temp_file)
                os.rmdir(temp_dir)
            except Exception as e:
                print(f"清理临时文件时出错: {e}")


if __name__ == "__main__":
    model_path = "./models/rank/weights/00000000000000000000000011445301/gbdtlr_model.pkl"
    # Example inputs for recommend_sort(city_uuid, product_id).
    city_uuid = "00000000000000000000000011445301"
    product_id = "110102"
    gbdt_sort = GbdtLrModel(model_path)
    data, _ = DataLoader("./data/gbdt/train_data.csv").split_dataset()
    data = data["data"]
    result = gbdt_sort.generate_shap_interance(data)
    print("保存结果")
    result.to_csv("./data/feats_interaction.csv", index=False, encoding='utf-8-sig')
    split_relation_subtable(result, "./data")