import gc
import os

import joblib
import numpy as np
import pandas as pd
import shap
from sklearn.preprocessing import StandardScaler
from tqdm import tqdm

# from dao import Redis, get_product_by_id, get_custs_by_ids, load_cust_data_from_mysql
from database import RedisDatabaseHelper, MySqlDao
from models.rank.data import DataLoader, ProductConfig, CustConfig, ShopConfig, ImportanceFeaturesMap
from models.rank.data.utils import one_hot_embedding, sample_data_clear


def generate_feats_map(product_data, cust_data):
    """Build the combined product x customer feature matrix."""
    # Cross join via a constant key (Cartesian product)
    cust_data["descartes"] = 1
    product_data["descartes"] = 1
    feats_map = pd.merge(cust_data, product_data, on="descartes").drop("descartes", axis=1)
    # recall_cust_list = feats_map["BB_RETAIL_CUSTOMER_CODE"].to_list()
    feats_map.drop(["BB_RETAIL_CUSTOMER_CODE", "product_code"], axis=1, inplace=True)

    # One-hot encode categorical features
    onehot_feats = {**CustConfig.ONEHOT_CAT, **ProductConfig.ONEHOT_CAT, **ShopConfig.ONEHOT_CAT}
    onehot_columns = list(onehot_feats.keys())
    numeric_columns = feats_map.drop(onehot_columns, axis=1).columns
    feats_map = one_hot_embedding(feats_map, onehot_feats)

    # Standardize numeric features
    if len(numeric_columns) != 0:
        scaler = StandardScaler()
        feats_map[numeric_columns] = scaler.fit_transform(feats_map[numeric_columns])

    return feats_map
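
# Usage sketch (hypothetical inputs; in production the frames come from
# MySqlDao via get_cust_and_product_data below and must already contain the
# configured feature columns -- the column names here are illustrative only):
#   product_df = pd.DataFrame([{"product_code": "110102", "pack_price": 26.0}])
#   cust_df = pd.DataFrame([{"BB_RETAIL_CUSTOMER_CODE": "C001", "order_cnt": 12}])
#   feats = generate_feats_map(product_df, cust_df)  # one row per (cust, product) pair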


class GbdtLrModel:
    def __init__(self, model_path):
        self.load_model(model_path)
        self.redis = RedisDatabaseHelper().redis
        self._mysql_dao = MySqlDao()
        self._explainer = None

    def load_model(self, model_path):
        models = joblib.load(model_path)
        self.gbdt_model, self.lr_model, self.onehot_encoder = (
            models["lgbm_model"], models["lr_model"], models["onehot_encoder"]
        )

    def get_cust_and_product_data(self, city_uuid, product_id):
        """Load the product's and the given city's customer feature data from MySQL."""
        self.product_data = self._mysql_dao.get_product_by_id(city_uuid, product_id)[ProductConfig.FEATURE_COLUMNS]
        self.custs_data = self._mysql_dao.load_cust_data(city_uuid)[CustConfig.FEATURE_COLUMNS]

    def get_recommend_list(self, recommend_sample, recall_list):
        """Score recalled customers and return them sorted by predicted probability."""
        # GBDT leaf indices -> one-hot encoding -> LR probability
        gbdt_preds = self.gbdt_model.predict(recommend_sample, pred_leaf=True)
        gbdt_feats_encoded = self.onehot_encoder.transform(gbdt_preds)
        scores = self.lr_model.predict_proba(gbdt_feats_encoded)[:, 1]

        # One record per recalled customer, sorted by score (descending)
        recommend_list = [
            {"cust_code": cust_id, "recommend_score": float(score)}
            for cust_id, score in zip(recall_list, scores)
        ]
        recommend_list.sort(key=lambda x: x["recommend_score"], reverse=True)

        return recommend_list
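
    # get_recommend_list returns, e.g. (ids and scores illustrative only):
    #   [{"cust_code": "C007", "recommend_score": 0.91},
    #    {"cust_code": "C001", "recommend_score": 0.42}, ...]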

    def generate_feats_importance(self):
        """Aggregate GBDT feature importances back onto the original features."""
        # Importances and feature names from the GBDT model
        feats_importance = self.gbdt_model.feature_importances_
        feats_names = self.gbdt_model.feature_name_
        importance_dict = dict(zip(feats_names, feats_importance))

        onehot_feats = {**CustConfig.ONEHOT_CAT, **ShopConfig.ONEHOT_CAT, **ProductConfig.ONEHOT_CAT}
        for feat, categories in onehot_feats.items():
            related_columns = [f"{feat}_{item}" for item in categories]

            if related_columns:
                # Sum the importances of the one-hot category columns
                combined_importance = sum(importance_dict[col] for col in related_columns)
                # Remove the per-category columns
                for col in related_columns:
                    del importance_dict[col]
                # Keep a single entry for the original feature
                importance_dict[feat] = combined_importance

        # Sort by importance, descending
        sorted_importance = sorted(importance_dict.items(), key=lambda x: x[1], reverse=True)

        # Split into customer-side and product-side importances
        cust_features_importance = []
        product_features_importance = []

        for feat, importance in sorted_importance:
            if feat in ImportanceFeaturesMap.CUSTOM_FEATURES_MAP:
                cust_features_importance.append({ImportanceFeaturesMap.CUSTOM_FEATURES_MAP[feat]: float(importance)})
            if feat in ImportanceFeaturesMap.SHOPING_FEATURES_MAP:
                cust_features_importance.append({ImportanceFeaturesMap.SHOPING_FEATURES_MAP[feat]: float(importance)})
            if feat in ImportanceFeaturesMap.PRODUCT_FEATRUES_MAP:
                product_features_importance.append({ImportanceFeaturesMap.PRODUCT_FEATRUES_MAP[feat]: float(importance)})

        return cust_features_importance, product_features_importance
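
    # generate_feats_importance returns two lists of single-entry dicts, e.g.
    # (display names come from ImportanceFeaturesMap; values are raw LightGBM
    # importance scores -- the numbers here are illustrative):
    #   cust_features_importance    == [{"Monthly order count": 241.0}, ...]
    #   product_features_importance == [{"Price band": 87.0}, ...]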

    def generate_shap_interaction(self, data):
        """Compute mean product x customer SHAP interaction values, in batches."""
        # Lazily initialize the SHAP explainer
        if self._explainer is None:
            self._explainer = shap.TreeExplainer(self.gbdt_model)

        # Basic shape information
        n_samples = len(data)
        n_features = len(data.columns)
        batch_size = 500  # tune to available memory

        # Stage batches in a memory-mapped temp file rather than in RAM
        # temp_dir = tempfile.mkdtemp()
        temp_dir = "./data/tmp"
        temp_file = os.path.join(temp_dir, "shap_interaction_temp.dat")
        os.makedirs(temp_dir, exist_ok=True)
        if os.path.exists(temp_file):
            os.remove(temp_file)

        try:
            # Pre-create the memory-mapped array
            fp_shape = (n_samples, n_features, n_features)
            fp = np.memmap(temp_file, dtype=np.float32, mode='w+', shape=fp_shape)

            # Compute and store SHAP interaction values batch by batch
            for i in tqdm(range(0, n_samples, batch_size), desc="Computing SHAP interaction values..."):
                batch_data = data.iloc[i:i + batch_size]
                batch_interaction = self._explainer.shap_interaction_values(batch_data)
                fp[i:i + len(batch_interaction)] = batch_interaction.astype(np.float32)
                fp.flush()  # make sure each batch is written to disk

            # Accumulate the mean, also in batches
            mean_interaction = np.zeros((n_features, n_features), dtype=np.float32)
            for i in tqdm(range(0, n_samples, batch_size), desc="Computing mean..."):
                batch = fp[i:i + batch_size]
                mean_interaction += batch.sum(axis=0)  # accumulate per batch

            mean_interaction /= n_samples  # final mean over all samples

            # Interaction matrix as a labeled DataFrame
            interaction_df = pd.DataFrame(
                mean_interaction,
                index=data.columns,
                columns=data.columns
            )

            # Separate product (cigarette) and customer feature columns
            product_feats = [
                f"{feat}_{item}"
                for feat, categories in ProductConfig.ONEHOT_CAT.items()
                for item in categories
            ]

            cust_feats = [
                f"{feat}_{item}"
                for feat, categories in {**CustConfig.ONEHOT_CAT, **ShopConfig.ONEHOT_CAT}.items()
                for item in categories
            ]

            # Extract the product x customer cross block
            cross_matrix = interaction_df.loc[product_feats, cust_feats]

            # Convert to long format
            stacked = cross_matrix.stack().reset_index()
            stacked.columns = ['product_feat', 'cust_feat', 'relation']

            # Drop near-zero and NaN pairs
            filtered = stacked[
                (stacked['relation'].abs() > 1e-6) &  # exclude negligible values
                (~stacked['relation'].isna())         # exclude NaN
            ].copy()

            # Sort by interaction strength
            results = (
                filtered
                .sort_values('relation', ascending=False)
                .to_dict('records')
            )

            # Map encoded column names back to display names
            feats_name_map = {
                **ImportanceFeaturesMap.CUSTOM_FEATURES_MAP,
                **ImportanceFeaturesMap.SHOPING_FEATURES_MAP,
                **ImportanceFeaturesMap.PRODUCT_FEATRUES_MAP
            }

            for item in results:
                # Product feature: "<feat>_<category>" -> "<display name>(<category>)"
                product_infos = item["product_feat"].split("_")
                item["product_feat"] = f"{feats_name_map['_'.join(product_infos[:-1])]}({product_infos[-1]})"

                # Customer feature, same scheme
                cust_infos = item["cust_feat"].split("_")
                item["cust_feat"] = f"{feats_name_map['_'.join(cust_infos[:-1])]}({cust_infos[-1]})"

            return pd.DataFrame(results, columns=['product_feat', 'cust_feat', 'relation'])

        finally:
            # Clean up the temp memmap file
            try:
                del fp  # the memmap object must be released first
                gc.collect()
                os.remove(temp_file)
                os.rmdir(temp_dir)
            except Exception as e:
                print(f"Failed to clean up temp files: {e}")


if __name__ == "__main__":
    model_path = "./models/rank/weights/00000000000000000000000011445301/gbdtlr_model.pkl"
    city_uuid = "00000000000000000000000011445301"
    product_id = "110102"
    gbdt_sort = GbdtLrModel(model_path)
    # gbdt_sort.sort(city_uuid, product_id)

    # cust_features_importance, product_features_importance = gbdt_sort.generate_feats_importance()
    # cust_df = pd.DataFrame([
    #     {"Features": list(item.keys())[0], "Importance": list(item.values())[0]}
    #     for item in cust_features_importance
    # ])
    # cust_df.to_csv("./data/cust_feats.csv", index=False)

    # product_df = pd.DataFrame([
    #     {"Features": list(item.keys())[0], "Importance": list(item.values())[0]}
    #     for item in product_features_importance
    # ])
    # product_df.to_csv("./data/product_feats.csv", index=False)

    data, _ = DataLoader("./data/gbdt/train_data.csv").split_dataset()
    data = data["data"].sample(n=300, replace=True, random_state=42)
    data.to_csv("./data/data.csv", index=False)
    # data = data["data"]
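
    # Example (a sketch): compute the product x customer SHAP interactions on
    # the sampled rows and persist them; assumes `data` carries the model's
    # training feature columns and the output path is illustrative. Uncomment to run:
    # interaction_df = gbdt_sort.generate_shap_interaction(data)
    # interaction_df.to_csv("./data/shap_interactions.csv", index=False)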