import gc
import os
import re

import joblib
import numpy as np
import pandas as pd
import shap
from sklearn.preprocessing import StandardScaler
from tqdm import tqdm

# from dao import Redis, get_product_by_id, get_custs_by_ids, load_cust_data_from_mysql
from database import RedisDatabaseHelper, MySqlDao
from models.rank.data import DataLoader
from models.rank.data import ProductConfig, CustConfig, ImportanceFeaturesMap
from models.rank.data.utils import one_hot_embedding, sample_data_clear


def clean_column_name(col):
    """Strip special characters from a column name, consistent with one_hot_embedding."""
    return (re.sub(r'[",\\\n\r\t\b\f]', '_', col)
            .replace(' ', '_'))


def generate_feats_map(product_data, cust_data):
    """Build the combined product (cigarette) x customer (retailer) feature matrix."""
    # Cartesian-product join via a shared constant key.
    cust_data["descartes"] = 1
    product_data["descartes"] = 1
    feats_map = pd.merge(cust_data, product_data, on="descartes").drop("descartes", axis=1)
    # recall_cust_list = feats_map["BB_RETAIL_CUSTOMER_CODE"].to_list()
    feats_map.drop('cust_code', axis=1, inplace=True)
    feats_map.drop('product_code', axis=1, inplace=True)
    # One-hot encode the categorical features.
    onehot_feats = {**CustConfig.ONEHOT_CAT, **ProductConfig.ONEHOT_CAT}
    onehot_columns = list(onehot_feats.keys())
    numeric_columns = feats_map.drop(onehot_columns, axis=1).columns
    feats_map = one_hot_embedding(feats_map, onehot_feats)
    # Standardize the numeric features.
    if len(numeric_columns) != 0:
        scaler = StandardScaler()
        feats_map[numeric_columns] = scaler.fit_transform(feats_map[numeric_columns])
    return feats_map
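
# A minimal, self-contained sketch of the constant-key cross join used in
# generate_feats_map above. The frames and column names here are toy
# assumptions, not the real CustConfig/ProductConfig schemas.
def _demo_cross_join():
    cust = pd.DataFrame({"cust_code": ["c1", "c2"], "cust_feat": [0.1, 0.2]})
    product = pd.DataFrame({"product_code": ["p1"], "product_feat": [3.0]})
    # A constant key shared by both frames turns an inner merge into a
    # Cartesian product: every customer row is paired with every product row.
    cust["descartes"] = 1
    product["descartes"] = 1
    pairs = pd.merge(cust, product, on="descartes").drop("descartes", axis=1)
    # On pandas >= 1.2 the same result is available directly via
    # pd.merge(cust, product, how="cross").
    return pairs  # 2 customers x 1 product -> 2 rows
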
class GbdtLrModel:
    def __init__(self, model_path):
        self.load_model(model_path)
        self.redis = RedisDatabaseHelper().redis
        self._mysql_dao = MySqlDao()
        self._explainer = None

    def load_model(self, model_path):
        """Load the trained LightGBM model, logistic regression, and one-hot encoder bundle."""
        models = joblib.load(model_path)
        self.gbdt_model = models["lgbm_model"]
        self.lr_model = models["lr_model"]
        self.onehot_encoder = models["onehot_encoder"]

    def get_cust_and_product_data(self, city_uuid, product_id):
        """Load the product's and the city's customer feature frames from MySQL."""
        self.product_data = self._mysql_dao.get_product_by_id(city_uuid, product_id)[ProductConfig.FEATURE_COLUMNS]
        self.custs_data = self._mysql_dao.load_cust_data(city_uuid)[CustConfig.FEATURE_COLUMNS]

    def get_recommend_list(self, recommend_sample, recall_list):
        """Score each recalled customer with the GBDT+LR cascade, sorted by score descending."""
        gbdt_preds = self.gbdt_model.predict(recommend_sample, pred_leaf=True)
        gbdt_feats_encoded = self.onehot_encoder.transform(gbdt_preds)
        scores = self.lr_model.predict_proba(gbdt_feats_encoded)[:, 1] * 100
        recommend_list = [
            {"cust_code": cust_id, "recommend_score": float(score)}
            for cust_id, score in zip(recall_list, scores)
        ]
        recommend_list.sort(key=lambda x: x["recommend_score"], reverse=True)
        return recommend_list

    def generate_feats_importance(self):
        """Aggregate GBDT feature importances, merging one-hot columns back into their source feature."""
        # Feature importances and names from the GBDT model.
        feats_importance = self.gbdt_model.feature_importances_
        feats_names = self.gbdt_model.feature_name_
        importance_dict = dict(zip(feats_names, feats_importance))
        onehot_feats = {**CustConfig.ONEHOT_CAT, **ProductConfig.ONEHOT_CAT}
        for feat, categories in onehot_feats.items():
            # Column names must be cleaned the same way one_hot_embedding cleans them.
            related_columns = [clean_column_name(f"{feat}_{item}") for item in categories]
            related_columns = [col for col in related_columns if col in importance_dict]
            if related_columns:
                # Merge the per-category importances into one value.
                combined_importance = sum(importance_dict[col] for col in related_columns)
                # Drop the one-hot category columns.
                for col in related_columns:
                    del importance_dict[col]
                # Record the merged importance under the original feature name.
                importance_dict[feat] = combined_importance
        # Sort by importance, descending.
        sorted_importance = sorted(importance_dict.items(), key=lambda x: x[1], reverse=True)
        # Split into customer and product feature importances.
        cust_features_importance = []
        product_features_importance = []
        for feat, importance in sorted_importance:
            if feat in ImportanceFeaturesMap.CUSTOM_FEATURES_MAP:
                cust_features_importance.append({ImportanceFeaturesMap.CUSTOM_FEATURES_MAP[feat]: float(importance)})
            if feat in ImportanceFeaturesMap.PRODUCT_FEATRUES_MAP:
                product_features_importance.append({ImportanceFeaturesMap.PRODUCT_FEATRUES_MAP[feat]: float(importance)})
        return cust_features_importance, product_features_importance

    def generate_shap_interance(self, data):
        """Compute mean SHAP interaction values between product and customer one-hot features."""
        # Lazily initialize the SHAP explainer.
        if self._explainer is None:
            self._explainer = shap.TreeExplainer(self.gbdt_model)
        n_samples = len(data)
        n_features = len(data.columns)
        batch_size = 200  # Tune to available memory.
        # Back the (n_samples, n_features, n_features) interaction tensor with a
        # temporary memory-mapped file so it never has to fit in RAM.
        # temp_dir = tempfile.mkdtemp()
        temp_dir = "./data/tmp"
        temp_file = os.path.join(temp_dir, "shap_interaction_temp.dat")
        os.makedirs(temp_dir, exist_ok=True)
        if os.path.exists(temp_file):
            os.remove(temp_file)
        try:
            fp_shape = (n_samples, n_features, n_features)
            fp = np.memmap(temp_file, dtype=np.float32, mode='w+', shape=fp_shape)
            # Compute SHAP interaction values batch by batch and store them.
            for i in tqdm(range(0, n_samples, batch_size), desc="Computing SHAP interaction values"):
                batch_data = data.iloc[i:i+batch_size]
                batch_interaction = self._explainer.shap_interaction_values(batch_data)
                fp[i:i+len(batch_interaction)] = batch_interaction.astype(np.float32)
                fp.flush()  # Make sure the batch is written to disk.
            # Average the interaction tensor over samples, again in batches.
            mean_interaction = np.zeros((n_features, n_features), dtype=np.float32)
            for i in tqdm(range(0, n_samples, batch_size), desc="Computing mean"):
                batch = fp[i:i+batch_size]  # Read one batch back from the memmap.
                mean_interaction += batch.sum(axis=0)  # Accumulate per batch.
            mean_interaction /= n_samples  # Final mean.
            # Wrap the mean interaction matrix as a labeled DataFrame.
            interaction_df = pd.DataFrame(
                mean_interaction,
                index=data.columns,
                columns=data.columns
            )
            # Separate product and customer features (with column-name cleaning applied).
            product_feats = [
                clean_column_name(f"{feat}_{item}")
                for feat, categories in ProductConfig.ONEHOT_CAT.items()
                for item in categories
            ]
            cust_feats = [
                clean_column_name(f"{feat}_{item}")
                for feat, categories in CustConfig.ONEHOT_CAT.items()
                for item in categories
            ]
            # Extract the product x customer cross block.
            cross_matrix = interaction_df.loc[product_feats, cust_feats]
            # Reshape to long format.
            stacked = cross_matrix.stack().reset_index()
            stacked.columns = ['product_feat', 'cust_feat', 'relation']
            # Drop pairs that are (near) zero or NaN.
            filtered = stacked[
                (stacked['relation'].abs() > 1e-6) &  # Exclude near-zero values.
                (~stacked['relation'].isna())  # Exclude NaN.
            ].copy()
            # Sort the remaining pairs.
            results = (
                filtered
                .sort_values('relation', ascending=False)
                .to_dict('records')
            )
            # Map raw feature names to their display names.
            feats_name_map = {
                **ImportanceFeaturesMap.CUSTOM_FEATURES_MAP,
                **ImportanceFeaturesMap.PRODUCT_FEATRUES_MAP
            }
            for item in results:
                # Rewrite the product feature name.
                product_f = item["product_feat"]
                product_feat_name = None
                product_feat_value = None
                for key in feats_name_map.keys():
                    if product_f.startswith(key + "_"):
                        product_feat_name = feats_name_map[key]
                        product_feat_value = product_f[len(key) + 1:]
                        break
                if product_feat_name:
                    item["product_feat"] = f"{product_feat_name}({product_feat_value})"
                # Rewrite the customer feature name.
                cust_f = item["cust_feat"]
                cust_feat_name = None
                cust_feat_value = None
                for key in feats_name_map.keys():
                    if cust_f.startswith(key + "_"):
                        cust_feat_name = feats_name_map[key]
                        cust_feat_value = cust_f[len(key) + 1:]
                        break
                if cust_feat_name:
                    item["cust_feat"] = f"{cust_feat_name}({cust_feat_value})"
            return pd.DataFrame(results, columns=['product_feat', 'cust_feat', 'relation'])
        finally:
            # Clean up the temporary memmap file.
            try:
                del fp  # The memmap object must be released first.
                gc.collect()
                os.remove(temp_file)
                os.rmdir(temp_dir)
            except Exception as e:
                print(f"Failed to clean up temporary files: {e}")
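
# A minimal, self-contained sketch of the GBDT+LR cascade GbdtLrModel wraps:
# LightGBM leaf indices are one-hot encoded and fed to a logistic regression,
# whose positive-class probability becomes the ranking score. The data and
# hyperparameters below are illustrative assumptions, not the trained values
# loaded by load_model.
def _demo_gbdt_lr():
    from lightgbm import LGBMClassifier
    from sklearn.datasets import make_classification
    from sklearn.linear_model import LogisticRegression
    from sklearn.preprocessing import OneHotEncoder

    X, y = make_classification(n_samples=500, n_features=10, random_state=42)
    gbdt = LGBMClassifier(n_estimators=20, random_state=42).fit(X, y)
    # pred_leaf=True returns, per sample, the index of the leaf reached in
    # each tree: an array of shape (n_samples, n_trees).
    leaves = gbdt.predict(X, pred_leaf=True)
    encoder = OneHotEncoder(handle_unknown="ignore").fit(leaves)
    lr = LogisticRegression(max_iter=1000).fit(encoder.transform(leaves), y)
    # Same scoring path as get_recommend_list: P(class = 1) scaled to 0-100.
    return lr.predict_proba(encoder.transform(leaves))[:, 1] * 100
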
if __name__ == "__main__":
    model_path = "./models/rank/weights/00000000000000000000000011445301/gbdtlr_model.pkl"
    city_uuid = "00000000000000000000000011445301"
    product_id = "110102"
    gbdt_sort = GbdtLrModel(model_path)
    # gbdt_sort.sort(city_uuid, product_id)
    # cust_features_importance, product_features_importance = gbdt_sort.generate_feats_importance()
    # cust_df = pd.DataFrame([
    #     {"Features": list(item.keys())[0], "Importance": list(item.values())[0]}
    #     for item in cust_features_importance
    # ])
    # cust_df.to_csv("./data/cust_feats.csv", index=False)
    # product_df = pd.DataFrame([
    #     {"Features": list(item.keys())[0], "Importance": list(item.values())[0]}
    #     for item in product_features_importance
    # ])
    # product_df.to_csv("./data/product_feats.csv", index=False)
    data, _ = DataLoader("./data/gbdt/train_data.csv").split_dataset()
    data = data["data"].sample(n=300, replace=True, random_state=42)
    data.to_csv("./data/data.csv", index=False)
    # data = data["data"]
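    # A hypothetical follow-up (output path assumed): run the SHAP interaction
    # analysis on the sampled rows and persist it. This presumes `data` has
    # exactly the encoded feature layout the GBDT model was trained on.
    # interaction_df = gbdt_sort.generate_shap_interance(data)
    # interaction_df.to_csv("./data/shap_interactions.csv", index=False)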