| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141 |
- from database import MySqlDao
- from models.rank.data.config import CustConfig, ProductConfig, OrderConfig, ShopConfig
- import os
- import pandas as pd
- from sklearn.preprocessing import MinMaxScaler
- from sklearn.utils import shuffle
- import numpy as np
- class DataProcess():
- def __init__(self, city_uuid, save_dir):
- self._mysql_dao = MySqlDao()
- self.save_dir = save_dir
- print("gbdr-lr: 正在加载cust_info...")
- self._cust_data = self._mysql_dao.load_cust_data(city_uuid)
- print("gbdr-lr: 正在加载product_info...")
- self._product_data = self._mysql_dao.load_product_data(city_uuid)
- print("gbdr-lr: 正在加载order_info...")
- self._order_data = self._mysql_dao.load_order_data(city_uuid)
- # self._order_data = self._mysql_dao.load_mock_order_data()
- print("gbdr-lr: 正在加载shopping_info...")
- self._shopping_data = self._mysql_dao.load_shopping_data(city_uuid)
-
- def data_process(self):
- """数据预处理"""
- train_data_save_path = os.path.join(self.save_dir, "train_data.csv")
- if os.path.exists(train_data_save_path):
- os.remove(train_data_save_path)
-
- # 1. 获取指定的特征组合
- self._cust_data = self._cust_data[CustConfig.FEATURE_COLUMNS]
- self._product_data = self._product_data[ProductConfig.FEATURE_COLUMNS]
- self._order_data = self._order_data[OrderConfig.FEATURE_COLUMNS]
- self._shopping_data = self._shopping_data[ShopConfig.FEATURE_COLUMNS]
-
- # 2. 数据清洗
- self._clean_cust_data()
- self._clean_product_data()
- self._clean_order_data()
- self._clean_shopping_data()
-
- # 3. 生成训练数据集
- train_data = self._generate_train_data()
- train_data.to_csv(train_data_save_path, index=False, encoding="utf-8")
-
- def _clean_cust_data(self):
- """用户信息表数据清洗"""
- self._cust_data["BB_RETAIL_CUSTOMER_CODE"] = self._cust_data["BB_RETAIL_CUSTOMER_CODE"].astype(str)
- # 根据配置规则清洗数据
- for feature, rules, in CustConfig.CLEANING_RULES.items():
- if rules["type"] == "num":
- # 先将数值型字符串转换为数值
- self._cust_data[feature] = pd.to_numeric(self._cust_data[feature], errors="coerce")
-
- if rules["method"] == "fillna":
- if rules["opt"] == "fill":
- self._cust_data[feature] = self._cust_data[feature].fillna(rules["value"]).infer_objects(copy=False)
- elif rules["opt"] == "replace":
- self._cust_data[feature] = self._cust_data[feature].fillna(self._cust_data[rules["value"]]).infer_objects(copy=False)
- elif rules["opt"] == "mean":
- self._cust_data[feature] = self._cust_data[feature].fillna(self._cust_data[feature].mean()).infer_objects(copy=False)
- self._cust_data[feature] = self._cust_data[feature].infer_objects(copy=False)
-
- def _clean_product_data(self):
- """卷烟信息表数据清洗"""
- self._product_data["product_code"] = self._product_data["product_code"].astype(str)
- for feature, rules, in ProductConfig.CLEANING_RULES.items():
- if rules["type"] == "num":
- self._product_data[feature] = pd.to_numeric(self._product_data[feature], errors="coerce")
-
- if rules["method"] == "fillna":
- if rules["opt"] == "fill":
- self._product_data[feature] = self._product_data[feature].fillna(rules["value"]).infer_objects(copy=False)
- elif rules["opt"] == "mean":
- self._product_data[feature] = self._product_data[feature].fillna(self._product_data[feature].mean()).infer_objects(copy=False)
- self._product_data[feature] = self._product_data[feature].infer_objects(copy=False)
-
- def _clean_order_data(self):
- self._order_data["cust_code"] = self._order_data["cust_code"].astype(str)
- self._order_data["product_code"] = self._order_data["product_code"].astype(str)
-
- # self._order_data[order_cols.drop(col_all_missing)] = self._order_data[order_cols.drop(col_all_missing)].fillna(0)
- self._order_data["sale_qty"] = self._order_data["sale_qty"].fillna(0)
- self._order_data = self._order_data.infer_objects(copy=False)
-
- # 将销售量进行分组求和
- self._order_data = self._order_data.groupby(["cust_code", "product_code"], as_index=False)["sale_qty"].sum()
-
-
- def _clean_shopping_data(self):
- """处理商圈数据缺省值"""
- self._shopping_data["cust_code"] = self._shopping_data["cust_code"].astype(str)
- # 根据配置规则清洗数据
- for feature, rules, in ShopConfig.CLEANING_RULES.items():
- if rules["type"] == "num":
- # 先将数值型字符串转换为数值
- self._shopping_data[feature] = pd.to_numeric(self._shopping_data[feature], errors="coerce")
-
- if rules["method"] == "fillna":
- if rules["opt"] == "fill":
- self._shopping_data[feature] = self._shopping_data[feature].fillna(rules["value"]).infer_objects(copy=False)
- elif rules["opt"] == "replace":
- self._shopping_data[feature] = self._shopping_data[feature].fillna(self._shopping_data[rules["value"]]).infer_objects(copy=False)
- elif rules["opt"] == "mean":
- self._shopping_data[feature] = self._shopping_data[feature].fillna(self._shopping_data[feature].mean()).infer_objects(copy=False)
- self._shopping_data[feature] = self._shopping_data[feature].infer_objects(copy=False)
-
- def _generate_train_data(self):
- """生成训练数据"""
- # 将商户表与商圈表进行连接
- cust_feats = self._shopping_data.set_index("cust_code")
- self._cust_data = self._cust_data.join(cust_feats, on="BB_RETAIL_CUSTOMER_CODE", how="inner")
-
- union_data = self._union_order_cust_product()
-
- train_data = self._labeled_data(union_data)
-
- return train_data
-
- def _union_order_cust_product(self):
- """联合order表、商户表、卷烟表"""
- # 使用merge进行连接
- union_data = self._order_data.merge(self._product_data, on="product_code", how="inner")
- union_data = union_data.merge(self._cust_data, left_on='cust_code', right_on='BB_RETAIL_CUSTOMER_CODE', how="inner")
- union_data = union_data.drop(columns=['BB_RETAIL_CUSTOMER_CODE'])
-
- return union_data
-
- def _labeled_data(self, union_data):
- union_data['label'] = union_data['sale_qty'].apply(lambda x: 0 if x == 0 else 1)
- train_data = union_data.drop(columns=['sale_qty'])
- train_data = shuffle(train_data, random_state=42)
-
- return train_data
-
- if __name__ == '__main__':
- city_uuid = "00000000000000000000000011445301"
- # city_uuid = "00000000000000000000000011441801"
- save_dir = "./data"
- processor = DataProcess(city_uuid, save_dir)
- processor.data_process()
|