import os

import numpy as np
import pandas as pd
from sklearn.preprocessing import MinMaxScaler
from sklearn.utils import shuffle

from database import MySqlDao
from models.rank.data.config import CustConfig, ProductConfig, OrderConfig


class DataProcess:
    def __init__(self, city_uuid, save_dir):
        self._mysql_dao = MySqlDao()
        self.save_dir = save_dir
        print("Loading cust_info...")
        self._cust_data = self._mysql_dao.load_cust_data(city_uuid)
        print("Loading product_info...")
        self._product_data = self._mysql_dao.load_product_data(city_uuid)
        print("Loading order_info...")
        self._order_data = self._mysql_dao.load_order_data(city_uuid)
        # self._order_data = self._mysql_dao.load_mock_order_data()
        print("Loading shopping_info...")
        self._shopping_data = self._mysql_dao.load_shopping_data(city_uuid)

    def data_process(self):
        """Run the full preprocessing pipeline and write the training sets."""
        ori_train_data_save_path = os.path.join(self.save_dir, "original_train_data.csv")
        pos_train_data_save_path = os.path.join(self.save_dir, "pos_train_data.csv")
        shopping_train_data_save_path = os.path.join(self.save_dir, "shopping_train_data.csv")
        for path in (ori_train_data_save_path, pos_train_data_save_path, shopping_train_data_save_path):
            if os.path.exists(path):
                os.remove(path)
        # 1. Keep only the configured feature columns
        self._cust_data = self._cust_data[CustConfig.FEATURE_COLUMNS]
        self._product_data = self._product_data[ProductConfig.FEATURE_COLUMNS]
        self._order_data = self._order_data[OrderConfig.FEATURE_COLUMNS]
        # 2. Clean each table
        self._clean_cust_data()
        self._clean_product_data()
        self._clean_order_data()
        self._clean_shopping_data()
        # 3. Generate the training datasets
        ori_train_data = self._generate_original_train_data(is_pos=False)
        shopping_train_data = self._generate_shopping_train_data()
        pos_train_data = self._generate_pos_train_data()
        ori_train_data.to_csv(ori_train_data_save_path, index=False)
        shopping_train_data.to_csv(shopping_train_data_save_path, index=False)
        pos_train_data.to_csv(pos_train_data_save_path, index=False)

    def _clean_cust_data(self):
        """Clean the customer info table according to the configured rules."""
        for feature, rules in CustConfig.CLEANING_RULES.items():
            if rules["type"] == "num":
                # Coerce numeric-looking strings to numbers first
                self._cust_data[feature] = pd.to_numeric(self._cust_data[feature], errors="coerce")
            if rules["method"] == "fillna":
                if rules["opt"] == "fill":
                    self._cust_data[feature] = self._cust_data[feature].fillna(rules["value"])
                elif rules["opt"] == "replace":
                    # Fill from another column, named by rules["value"]
                    self._cust_data[feature] = self._cust_data[feature].fillna(self._cust_data[rules["value"]])
                elif rules["opt"] == "mean":
                    self._cust_data[feature] = self._cust_data[feature].fillna(self._cust_data[feature].mean())
                self._cust_data[feature] = self._cust_data[feature].infer_objects(copy=False)

    def _clean_product_data(self):
        """Clean the cigarette (product) info table according to the configured rules."""
        for feature, rules in ProductConfig.CLEANING_RULES.items():
            if rules["type"] == "num":
                self._product_data[feature] = pd.to_numeric(self._product_data[feature], errors="coerce")
            if rules["method"] == "fillna":
                if rules["opt"] == "fill":
                    self._product_data[feature] = self._product_data[feature].fillna(rules["value"])
                elif rules["opt"] == "mean":
                    self._product_data[feature] = self._product_data[feature].fillna(self._product_data[feature].mean())
                self._product_data[feature] = self._product_data[feature].infer_objects(copy=False)
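    # For reference, the two cleaners above expect CLEANING_RULES entries keyed
    # by feature name with "type", "method", "opt" and (for some opts) "value"
    # fields, matching the branches above. A hypothetical sketch only — the real
    # definitions live in models.rank.data.config, and the feature names below
    # are invented for illustration:
    #
    #   CLEANING_RULES = {
    #       "monthly_sales": {"type": "num", "method": "fillna", "opt": "mean"},
    #       "shop_level": {"type": "str", "method": "fillna", "opt": "fill", "value": "unknown"},
    #       "order_amount": {"type": "num", "method": "fillna", "opt": "replace", "value": "demand_amount"},
    #   }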
    def _clean_order_data(self):
        """Clean the order table; POS columns are left untouched at this stage."""
        remaining_cols = self._order_data.columns.drop(OrderConfig.POSFEATURES)
        # Drop the non-POS columns whose values are entirely missing
        col_all_missing = remaining_cols[self._order_data[remaining_cols].isnull().all()].tolist()
        self._order_data = self._order_data.drop(columns=col_all_missing)
        # Drop duplicate rows, then fill the remaining missing values with 0
        self._order_data.drop_duplicates(inplace=True)
        fill_cols = remaining_cols.drop(col_all_missing)
        self._order_data[fill_cols] = self._order_data[fill_cols].fillna(0)
        self._order_data = self._order_data.infer_objects(copy=False)

    def _clean_shopping_data(self):
        """Fill missing values in the trade-area (shopping) data."""
        self._shopping_data.drop(["cust_uuid", "longitude", "latitude", "range_radius"], axis=1, inplace=True)
        remaining_cols = self._shopping_data.columns.drop(["city_uuid", "cust_code"])
        col_with_missing = remaining_cols[self._shopping_data[remaining_cols].isnull().any()].tolist()  # columns with any missing values
        col_all_missing = remaining_cols[self._shopping_data[remaining_cols].isnull().all()].tolist()  # columns entirely missing
        col_partial_missing = list(set(col_with_missing) - set(col_all_missing))  # columns partially missing
        for col in col_partial_missing:
            self._shopping_data[col] = self._shopping_data[col].fillna(self._shopping_data[col].mean())
        for col in col_all_missing:
            self._shopping_data[col] = self._shopping_data[col].fillna(0).infer_objects(copy=False)

    def _generate_original_train_data(self, is_pos):
        """Join, score, and label the data to build the base training set."""
        union_data = self._union_order_cust_product(is_pos)
        scored_data = self._calculate_score(union_data)
        labeled_data = self._labeled_data(scored_data)
        return labeled_data

    def _generate_pos_train_data(self):
        pos_data = self._generate_original_train_data(is_pos=True)
        # Keep only rows that actually have POS turnover data
        pos_data.dropna(subset=["YLT_TURNOVER_RATE"], inplace=True)
        return pos_data

    def _generate_shopping_train_data(self):
        original_data = self._generate_original_train_data(is_pos=False)
        cust_feats = self._shopping_data.set_index("cust_code")
        shopping_train_data = original_data.join(cust_feats, on="BB_RETAIL_CUSTOMER_CODE", how="inner")
        return shopping_train_data

    def _union_order_cust_product(self, is_pos):
        """Join the order table with the customer and product tables."""
        union_data = self._order_data.copy()
        if not is_pos:
            union_data.drop(OrderConfig.POSFEATURES, axis=1, inplace=True)  # drop the POS feature columns
        union_data.rename(columns={"PRODUCT_CODE": "product_code"}, inplace=True)
        cust_feats = self._cust_data.set_index("BB_RETAIL_CUSTOMER_CODE")
        product_feats = self._product_data.set_index("product_code")
        union_data = union_data.join(cust_feats, on="BB_RETAIL_CUSTOMER_CODE", how="inner")
        union_data = union_data.join(product_feats, on="product_code", how="inner")
        return union_data

    def _calculate_score(self, union_data):
        """Compute a weighted score for each joined record."""
        # Min-max normalize the features that contribute to the score
        weight_cols = list(OrderConfig.WEIGHTS.keys())
        scaler = MinMaxScaler()
        union_data[weight_cols] = scaler.fit_transform(union_data[weight_cols])
        # Weighted sum of the normalized features
        union_data["score"] = sum(union_data[feat] * weight for feat, weight in OrderConfig.WEIGHTS.items())
        return union_data

    def _labeled_data(self, scored_data):
        """Label each record by comparing its score to the per-product median."""
        # transform("median") keeps the result aligned with scored_data's index;
        # merging the medians into a separate frame would reset the index and
        # mis-align (or fail) the comparison below.
        median_scores = scored_data.groupby("product_code")["score"].transform("median")
        # Label: 1 if score >= the product's median, else 0
        scored_data["label"] = np.where(scored_data["score"] >= median_scores, 1, 0)
        scored_data = scored_data.sort_values("score", ascending=False)
        scored_data = shuffle(scored_data, random_state=42)
        return scored_data
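    # Minimal illustration (not used by the pipeline): a sketch of the
    # score-and-label logic from _calculate_score/_labeled_data above, run on a
    # toy frame. The feature names and weights are hypothetical stand-ins for
    # OrderConfig.WEIGHTS, not the real configuration.
    @staticmethod
    def _demo_score_and_label():
        weights = {"qty": 0.6, "amount": 0.4}  # hypothetical weights
        demo = pd.DataFrame({
            "product_code": ["A", "A", "B", "B"],
            "qty": [1.0, 3.0, 2.0, 4.0],
            "amount": [10.0, 30.0, 20.0, 40.0],
        })
        # Min-max normalize the weighted features, then take the weighted sum
        demo[list(weights)] = MinMaxScaler().fit_transform(demo[list(weights)])
        demo["score"] = sum(demo[feat] * w for feat, w in weights.items())
        # Per-product median labeling: 1 if score >= the product's median
        medians = demo.groupby("product_code")["score"].transform("median")
        demo["label"] = np.where(demo["score"] >= medians, 1, 0)
        return demo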
    # def _descartes(self):
    #     """Cartesian-product join of retail customer info and cigarette info."""
    #     self._cust_data["descartes"] = 1
    #     self._product_data["descartes"] = 1
    #     self._descartes_data = pd.merge(self._cust_data, self._product_data, on="descartes").drop("descartes", axis=1)

    # def _labeled_data_from_descartes(self):
    #     """Label descartes_data using the order table."""
    #     # Positive-sample combinations found in the order table
    #     order_combinations = self._order_data[["BB_RETAIL_CUSTOMER_CODE", "PRODUCT_CODE"]].drop_duplicates()
    #     order_set = set(zip(order_combinations["BB_RETAIL_CUSTOMER_CODE"], order_combinations["PRODUCT_CODE"]))
    #     # Label descartes_data: 1 for positive samples, 0 for negatives
    #     self._descartes_data["label"] = self._descartes_data.apply(
    #         lambda row: 1 if (row["BB_RETAIL_CUSTOMER_CODE"], row["product_code"]) in order_set else 0, axis=1)

    # def _generate_train_data_from_descartes(self):
    #     """Generate training data from descartes_data."""
    #     positive_samples = self._descartes_data[self._descartes_data["label"] == 1]
    #     negative_samples = self._descartes_data[self._descartes_data["label"] == 0]
    #     positive_count = len(positive_samples)
    #     negative_count = min(1 * positive_count, len(negative_samples))
    #     print(positive_count)
    #     print(negative_count)
    #     # Randomly sample as many negatives as positives (capped by availability)
    #     negative_samples_sampled = negative_samples.sample(n=negative_count, random_state=42)
    #     # Concatenate positive and negative samples, then shuffle
    #     self._train_data = pd.concat([positive_samples, negative_samples_sampled], axis=0)
    #     self._train_data = self._train_data.sample(frac=1, random_state=42).reset_index(drop=True)
    #     # Save the training data
    #     self._train_data.to_csv(self._save_res_path, index=False)


if __name__ == "__main__":
    city_uuid = "00000000000000000000000011445301"
    # city_uuid = "00000000000000000000000011441801"
    save_dir = "./data"
    processor = DataProcess(city_uuid, save_dir)
    processor.data_process()