from dao.dao import load_cust_data_from_mysql, load_product_data_from_mysql, load_order_data_from_mysql from models.rank.data.config import CustConfig, ProductConfig import pandas as pd class DataProcess(): def __init__(self, city_uuid): print("正在加载cust_info...") self._cust_data = load_cust_data_from_mysql(city_uuid) print("正在加载product_info...") self._product_data = load_product_data_from_mysql(city_uuid) print("正在加载order_info...") self._order_data = load_order_data_from_mysql(city_uuid) def data_process(self): """数据预处理""" # 1. 获取指定的特征组合 self._cust_data = self._cust_data[CustConfig.FEATURE_COLUMNS] self._product_data = self._product_data[ProductConfig.FEATURE_COLUMNS] # 2. 数据清洗 self._clean_cust_data() self._clean_product_data() # 3. 将零售户信息表与卷烟信息表进行笛卡尔积连接 self._descartes() # 4. 根据order表中的信息给数据打标签 self._labeled_data() # 5. 选取训练样本 self._generate_train_data() def _clean_cust_data(self): """用户信息表数据清洗""" # 根据配置规则清洗数据 for feature, rules, in CustConfig.CLEANING_RULES.items(): if rules["type"] == "num": # 先将数值型字符串转换为数值 self._cust_data[feature] = pd.to_numeric(self._cust_data[feature], errors="coerce") if rules["method"] == "fillna": if rules["opt"] == "fill": self._cust_data[feature] = self._cust_data[feature].fillna(rules["value"]) elif rules["opt"] == "replace": self._cust_data[feature] = self._cust_data[feature].fillna(self._cust_data[rules["value"]]) elif rules["opt"] == "mean": self._cust_data[feature] = self._cust_data[feature].fillna(self._cust_data[feature].mean()) def _clean_product_data(self): """卷烟信息表数据清洗""" for feature, rules, in ProductConfig.CLEANING_RULES.items(): if rules["type"] == "num": self._product_data[feature] = pd.to_numeric(self._product_data[feature], errors="coerce") if rules["method"] == "fillna": if rules["opt"] == "fill": self._product_data[feature] = self._product_data[feature].fillna(rules["value"]) elif rules["opt"] == "mean": self._product_data[feature] = self._product_data[feature].fillna(self._product_data[feature].mean()) def _descartes(self): """将零售户信息与卷烟信息进行笛卡尔积连接""" self._cust_data["descartes"] = 1 self._product_data["descartes"] = 1 self._descartes_data = pd.merge(self._cust_data, self._product_data, on="descartes").drop("descartes", axis=1) def _labeled_data(self): """根据order表信息给descartes_data数据打标签""" # 获取order表中的正样本组合 order_combinations = self._order_data[["BB_RETAIL_CUSTOMER_CODE", "PRODUCT_CODE"]].drop_duplicates() order_set = set(zip(order_combinations["BB_RETAIL_CUSTOMER_CODE"], order_combinations["PRODUCT_CODE"])) # 在descartes_data中打标签:正样本为1,负样本为2 self._descartes_data['label'] = self._descartes_data.apply( lambda row: 1 if (row['BB_RETAIL_CUSTOMER_CODE'], row['product_code']) in order_set else 0, axis=1) def _generate_train_data(self): """从descartes_data中生成训练数据""" positive_samples = self._descartes_data[self._descartes_data["label"] == 1] negative_samples = self._descartes_data[self._descartes_data["label"] == 0] positive_count = len(positive_samples) negative_count = min(2 * positive_count, len(negative_samples)) print(positive_count) print(negative_count) # 随机抽取2倍正样本数量的负样本 negative_samples_sampled = negative_samples.sample(n=negative_count, random_state=42) # 合并正负样本 self._train_data = pd.concat([positive_samples, negative_samples_sampled], axis=0) self._train_data = self._train_data.sample(frac=1, random_state=42).reset_index(drop=True) # 保存训练数据 self._train_data.to_csv("./models/rank/data/gbdt_data.csv", index=False) if __name__ == '__main__': processor = DataProcess("00000000000000000000000011445301") processor.data_process()