from database import MySqlDao from models.rank.data.config import CustConfig, ProductConfig, OrderConfig, ShopConfig import os import pandas as pd from sklearn.preprocessing import MinMaxScaler from sklearn.utils import shuffle import numpy as np class DataProcess(): def __init__(self, city_uuid, save_dir): self._mysql_dao = MySqlDao() self.save_dir = save_dir print("gbdr-lr: 正在加载cust_info...") self._cust_data = self._mysql_dao.load_cust_data(city_uuid) print("gbdr-lr: 正在加载product_info...") self._product_data = self._mysql_dao.load_product_data(city_uuid) print("gbdr-lr: 正在加载order_info...") self._order_data = self._mysql_dao.load_order_data(city_uuid) # self._order_data = self._mysql_dao.load_mock_order_data() print("gbdr-lr: 正在加载shopping_info...") self._shopping_data = self._mysql_dao.load_shopping_data(city_uuid) def data_process(self): """数据预处理""" train_data_save_path = os.path.join(self.save_dir, "train_data.csv") if os.path.exists(train_data_save_path): os.remove(train_data_save_path) # 1. 获取指定的特征组合 self._cust_data = self._cust_data[CustConfig.FEATURE_COLUMNS] self._product_data = self._product_data[ProductConfig.FEATURE_COLUMNS] self._order_data = self._order_data[OrderConfig.FEATURE_COLUMNS] self._shopping_data = self._shopping_data[ShopConfig.FEATURE_COLUMNS] # 2. 数据清洗 self._clean_cust_data() self._clean_product_data() self._clean_order_data() self._clean_shopping_data() # 3. 生成训练数据集 train_data = self._generate_train_data() train_data.to_csv(train_data_save_path, index=False, encoding="utf-8") def _clean_cust_data(self): """用户信息表数据清洗""" self._cust_data["BB_RETAIL_CUSTOMER_CODE"] = self._cust_data["BB_RETAIL_CUSTOMER_CODE"].astype(str) # 根据配置规则清洗数据 for feature, rules, in CustConfig.CLEANING_RULES.items(): if rules["type"] == "num": # 先将数值型字符串转换为数值 self._cust_data[feature] = pd.to_numeric(self._cust_data[feature], errors="coerce") if rules["method"] == "fillna": if rules["opt"] == "fill": self._cust_data[feature] = self._cust_data[feature].fillna(rules["value"]).infer_objects(copy=False) elif rules["opt"] == "replace": self._cust_data[feature] = self._cust_data[feature].fillna(self._cust_data[rules["value"]]).infer_objects(copy=False) elif rules["opt"] == "mean": self._cust_data[feature] = self._cust_data[feature].fillna(self._cust_data[feature].mean()).infer_objects(copy=False) self._cust_data[feature] = self._cust_data[feature].infer_objects(copy=False) def _clean_product_data(self): """卷烟信息表数据清洗""" self._product_data["product_code"] = self._product_data["product_code"].astype(str) for feature, rules, in ProductConfig.CLEANING_RULES.items(): if rules["type"] == "num": self._product_data[feature] = pd.to_numeric(self._product_data[feature], errors="coerce") if rules["method"] == "fillna": if rules["opt"] == "fill": self._product_data[feature] = self._product_data[feature].fillna(rules["value"]).infer_objects(copy=False) elif rules["opt"] == "mean": self._product_data[feature] = self._product_data[feature].fillna(self._product_data[feature].mean()).infer_objects(copy=False) self._product_data[feature] = self._product_data[feature].infer_objects(copy=False) def _clean_order_data(self): self._order_data["cust_code"] = self._order_data["cust_code"].astype(str) self._order_data["product_code"] = self._order_data["product_code"].astype(str) # self._order_data[order_cols.drop(col_all_missing)] = self._order_data[order_cols.drop(col_all_missing)].fillna(0) self._order_data["sale_qty"] = self._order_data["sale_qty"].fillna(0) self._order_data = self._order_data.infer_objects(copy=False) # 将销售量进行分组求月平均销售额 self._order_data = self._order_data.groupby(["cust_code", "product_code"], as_index=False)["sale_qty"].mean() def _clean_shopping_data(self): """处理商圈数据缺省值""" self._shopping_data["cust_code"] = self._shopping_data["cust_code"].astype(str) # 根据配置规则清洗数据 for feature, rules, in ShopConfig.CLEANING_RULES.items(): if rules["type"] == "num": # 先将数值型字符串转换为数值 self._shopping_data[feature] = pd.to_numeric(self._shopping_data[feature], errors="coerce") if rules["method"] == "fillna": if rules["opt"] == "fill": self._shopping_data[feature] = self._shopping_data[feature].fillna(rules["value"]).infer_objects(copy=False) elif rules["opt"] == "replace": self._shopping_data[feature] = self._shopping_data[feature].fillna(self._shopping_data[rules["value"]]).infer_objects(copy=False) elif rules["opt"] == "mean": self._shopping_data[feature] = self._shopping_data[feature].fillna(self._shopping_data[feature].mean()).infer_objects(copy=False) self._shopping_data[feature] = self._shopping_data[feature].infer_objects(copy=False) def _generate_train_data(self): """生成训练数据""" # 将商户表与商圈表进行连接 cust_feats = self._shopping_data.set_index("cust_code") self._cust_data = self._cust_data.join(cust_feats, on="BB_RETAIL_CUSTOMER_CODE", how="inner") union_data = self._union_order_cust_product() train_data = self._labeled_data(union_data) return train_data def _union_order_cust_product(self): """联合order表、商户表、卷烟表""" # 使用merge进行连接 union_data = self._order_data.merge(self._product_data, on="product_code", how="inner") union_data = union_data.merge(self._cust_data, left_on='cust_code', right_on='BB_RETAIL_CUSTOMER_CODE', how="inner") union_data = union_data.drop(columns=['BB_RETAIL_CUSTOMER_CODE']) return union_data def _labeled_data(self, union_data): union_data['label'] = union_data['sale_qty'].apply(lambda x: 0 if x == 0 else 1) train_data = union_data.drop(columns=['sale_qty']) train_data = shuffle(train_data, random_state=42) return train_data if __name__ == '__main__': city_uuid = "00000000000000000000000011445301" # city_uuid = "00000000000000000000000011441801" save_dir = "./data" processor = DataProcess(city_uuid, save_dir) processor.data_process()