from database import MySqlDao from models.rank.data.config import CustConfig, ProductConfig, OrderConfig import os import pandas as pd from sklearn.preprocessing import MinMaxScaler from sklearn.utils import shuffle import numpy as np class DataProcess(): def __init__(self, city_uuid, save_dir): self._mysql_dao = MySqlDao() self.save_dir = save_dir print("gbdr-lr: 正在加载cust_info...") self._cust_data = self._mysql_dao.load_cust_data(city_uuid) print("gbdr-lr: 正在加载product_info...") self._product_data = self._mysql_dao.load_product_data(city_uuid) print("gbdr-lr: 正在加载order_info...") self._order_data = self._mysql_dao.load_order_data(city_uuid) # self._order_data = self._mysql_dao.load_mock_order_data() # print("gbdr-lr: 正在加载shopping_info...") # self._shopping_data = self._mysql_dao.load_shopping_data(city_uuid) def data_process(self): """数据预处理""" train_data_save_path = os.path.join(self.save_dir, "train_data.csv") if os.path.exists(train_data_save_path): os.remove(train_data_save_path) # 1. 获取指定的特征组合 self._cust_data = self._cust_data[CustConfig.FEATURE_COLUMNS] self._product_data = self._product_data[ProductConfig.FEATURE_COLUMNS] self._order_data = self._order_data[OrderConfig.FEATURE_COLUMNS] # self._shopping_data = self._shopping_data[ShopConfig.FEATURE_COLUMNS] # 2. 数据清洗 self._clean_cust_data() self._clean_product_data() self._clean_order_data() # self._clean_shopping_data() # 3. 生成训练数据集 train_data = self._generate_train_data() train_data.to_csv(train_data_save_path, index=False, encoding="utf-8") def _clean_cust_data(self): """用户信息表数据清洗""" self._cust_data["cust_code"] = self._cust_data["cust_code"].astype(str) # 根据配置规则清洗数据 for feature, rules, in CustConfig.CLEANING_RULES.items(): if rules["type"] == "num": # 先将数值型字符串转换为数值 self._cust_data[feature] = pd.to_numeric(self._cust_data[feature], errors="coerce") if rules["method"] == "fillna": if rules["opt"] == "fill": self._cust_data[feature] = self._cust_data[feature].fillna(rules["value"]).infer_objects(copy=False) elif rules["opt"] == "replace": self._cust_data[feature] = self._cust_data[feature].fillna(self._cust_data[rules["value"]]).infer_objects(copy=False) elif rules["opt"] == "mean": self._cust_data[feature] = self._cust_data[feature].fillna(self._cust_data[feature].mean()).infer_objects(copy=False) self._cust_data[feature] = self._cust_data[feature].infer_objects(copy=False) def _clean_product_data(self): """卷烟信息表数据清洗""" self._product_data["product_code"] = self._product_data["product_code"].astype(str) for feature, rules, in ProductConfig.CLEANING_RULES.items(): if rules["type"] == "num": self._product_data[feature] = pd.to_numeric(self._product_data[feature], errors="coerce") if rules["method"] == "fillna": if rules["opt"] == "fill": self._product_data[feature] = self._product_data[feature].fillna(rules["value"]).infer_objects(copy=False) elif rules["opt"] == "mean": self._product_data[feature] = self._product_data[feature].fillna(self._product_data[feature].mean()).infer_objects(copy=False) self._product_data[feature] = self._product_data[feature].infer_objects(copy=False) def _clean_order_data(self): self._order_data["cust_code"] = self._order_data["cust_code"].astype(str) self._order_data["product_code"] = self._order_data["product_code"].astype(str) # self._order_data[order_cols.drop(col_all_missing)] = self._order_data[order_cols.drop(col_all_missing)].fillna(0) self._order_data["sale_qty"] = self._order_data["sale_qty"].fillna(0) self._order_data = self._order_data.infer_objects(copy=False) # 将销售量进行分组求月平均销售额 self._order_data = self._order_data.groupby(["cust_code", "product_code"], as_index=False)["sale_qty"].mean() # def _clean_shopping_data(self): # """处理商圈数据缺省值""" # self._shopping_data["cust_code"] = self._shopping_data["cust_code"].astype(str) # # 根据配置规则清洗数据 # for feature, rules, in ShopConfig.CLEANING_RULES.items(): # if rules["type"] == "num": # # 先将数值型字符串转换为数值 # self._shopping_data[feature] = pd.to_numeric(self._shopping_data[feature], errors="coerce") # if rules["method"] == "fillna": # if rules["opt"] == "fill": # self._shopping_data[feature] = self._shopping_data[feature].fillna(rules["value"]).infer_objects(copy=False) # elif rules["opt"] == "replace": # self._shopping_data[feature] = self._shopping_data[feature].fillna(self._shopping_data[rules["value"]]).infer_objects(copy=False) # elif rules["opt"] == "mean": # self._shopping_data[feature] = self._shopping_data[feature].fillna(self._shopping_data[feature].mean()).infer_objects(copy=False) # self._shopping_data[feature] = self._shopping_data[feature].infer_objects(copy=False) def _generate_train_data(self): """生成训练数据""" # # 将商户表与商圈表进行连接 # cust_feats = self._shopping_data.set_index("cust_code") # self._cust_data = self._cust_data.join(cust_feats, on="BB_RETAIL_CUSTOMER_CODE", how="inner") union_data = self._union_order_cust_product() train_data = self._labeled_data(union_data) return train_data def _union_order_cust_product(self): """联合order表、商户表、卷烟表""" # 使用merge进行连接 union_data = self._order_data.merge(self._product_data, on="product_code", how="inner") union_data = union_data.merge(self._cust_data, left_on='cust_code', right_on='cust_code', how="inner") # union_data = union_data.drop(columns=['cust_code']) return union_data def _labeled_data(self, union_data): union_data['label'] = union_data['sale_qty'].apply(lambda x: 0 if x == 0 else 1) train_data = union_data.drop(columns=['sale_qty']) train_data = shuffle(train_data, random_state=42) return train_data if __name__ == '__main__': city_uuid = "00000000000000000000000011445301" # city_uuid = "00000000000000000000000011441801" save_dir = "./data" processor = DataProcess(city_uuid, save_dir) processor.data_process()