preprocess.py 11 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224
  1. from database import MySqlDao
  2. from models.rank.data.config import CustConfig, ProductConfig, OrderConfig
  3. import os
  4. import pandas as pd
  5. from sklearn.preprocessing import MinMaxScaler
  6. from sklearn.utils import shuffle
  7. import numpy as np
class DataProcess():
    """Loads one city's customer, product, order and shopping-district tables
    from MySQL and preprocesses them into training datasets (see data_process)."""

    def __init__(self, city_uuid: str, save_dir: str) -> None:
        """Fetch all source tables for `city_uuid`.

        Args:
            city_uuid: city identifier used to filter every source table.
            save_dir: directory where the generated CSV files will be written.
        """
        self._mysql_dao = MySqlDao()
        self.save_dir = save_dir
        # Each load_* call pulls one table for the city from MySQL; the prints
        # are user-facing progress messages (Chinese: "loading ...").
        print("正在加载cust_info...")
        self._cust_data = self._mysql_dao.load_cust_data(city_uuid)
        print("正在加载product_info...")
        self._product_data = self._mysql_dao.load_product_data(city_uuid)
        print("正在加载order_info...")
        self._order_data = self._mysql_dao.load_order_data(city_uuid)
        # self._order_data = self._mysql_dao.load_mock_order_data()
        print("正在加载shopping_info...")
        self._shopping_data = self._mysql_dao.load_shopping_data(city_uuid)
  21. def data_process(self):
  22. """数据预处理"""
  23. ori_train_data_save_path = os.path.join(self.save_dir, "original_train_data.csv")
  24. pos_train_data_save_path = os.path.join(self.save_dir, "pos_train_data.csv")
  25. shopping_train_data_save_path = os.path.join(self.save_dir, "shopping_train_data.csv")
  26. if os.path.exists(ori_train_data_save_path):
  27. os.remove(ori_train_data_save_path)
  28. if os.path.exists(pos_train_data_save_path):
  29. os.remove(pos_train_data_save_path)
  30. if os.path.exists(shopping_train_data_save_path):
  31. os.remove(shopping_train_data_save_path)
  32. # 1. 获取指定的特征组合
  33. self._cust_data = self._cust_data[CustConfig.FEATURE_COLUMNS]
  34. self._product_data = self._product_data[ProductConfig.FEATURE_COLUMNS]
  35. self._order_data = self._order_data[OrderConfig.FEATURE_COLUMNS]
  36. # 2. 数据清洗
  37. self._clean_cust_data()
  38. self._clean_product_data()
  39. self._clean_order_data()
  40. self._clean_shopping_data()
  41. # 3. 生成训练数据集
  42. ori_train_data = self._generate_original_train_data(is_pos=False)
  43. shopping_train_data = self._generate_shopping_train_data()
  44. pos_train_data = self._generate_pos_train_data()
  45. ori_train_data.to_csv(ori_train_data_save_path, index=False)
  46. shopping_train_data.to_csv(shopping_train_data_save_path, index=False)
  47. pos_train_data.to_csv(pos_train_data_save_path, index=False)
  48. def _clean_cust_data(self):
  49. """用户信息表数据清洗"""
  50. self._cust_data["BB_RETAIL_CUSTOMER_CODE"] = self._cust_data["BB_RETAIL_CUSTOMER_CODE"].astype(str)
  51. # 根据配置规则清洗数据
  52. for feature, rules, in CustConfig.CLEANING_RULES.items():
  53. if rules["type"] == "num":
  54. # 先将数值型字符串转换为数值
  55. self._cust_data[feature] = pd.to_numeric(self._cust_data[feature], errors="coerce")
  56. if rules["method"] == "fillna":
  57. if rules["opt"] == "fill":
  58. self._cust_data[feature] = self._cust_data[feature].fillna(rules["value"]).infer_objects(copy=False)
  59. elif rules["opt"] == "replace":
  60. self._cust_data[feature] = self._cust_data[feature].fillna(self._cust_data[rules["value"]]).infer_objects(copy=False)
  61. elif rules["opt"] == "mean":
  62. self._cust_data[feature] = self._cust_data[feature].fillna(self._cust_data[feature].mean()).infer_objects(copy=False)
  63. self._cust_data[feature] = self._cust_data[feature].infer_objects(copy=False)
  64. def _clean_product_data(self):
  65. """卷烟信息表数据清洗"""
  66. self._product_data["product_code"] = self._product_data["product_code"].astype(str)
  67. for feature, rules, in ProductConfig.CLEANING_RULES.items():
  68. if rules["type"] == "num":
  69. self._product_data[feature] = pd.to_numeric(self._product_data[feature], errors="coerce")
  70. if rules["method"] == "fillna":
  71. if rules["opt"] == "fill":
  72. self._product_data[feature] = self._product_data[feature].fillna(rules["value"]).infer_objects(copy=False)
  73. elif rules["opt"] == "mean":
  74. self._product_data[feature] = self._product_data[feature].fillna(self._product_data[feature].mean()).infer_objects(copy=False)
  75. self._product_data[feature] = self._product_data[feature].infer_objects(copy=False)
  76. def _clean_order_data(self):
  77. remaining_cols = self._order_data.columns.drop(OrderConfig.POSFEATURES) # 数据清洗时先不对pos数据做处理
  78. col_all_missing = remaining_cols[self._order_data[remaining_cols].isnull().all()].to_list()
  79. self._order_data.drop(columns=col_all_missing, inplace=True)
  80. # 去除重复值和填补缺失值
  81. self._order_data.drop_duplicates(inplace=True)
  82. self._order_data[remaining_cols.drop(col_all_missing)] = self._order_data[remaining_cols.drop(col_all_missing)].fillna(0)
  83. self._order_data = self._order_data.infer_objects(copy=False)
  84. def _clean_shopping_data(self):
  85. """处理商圈数据缺省值"""
  86. self._shopping_data.drop(columns=["cust_uuid", "longitude", "latitude", "range_radius"], axis=1, inplace=True)
  87. remaining_cols = self._shopping_data.columns.drop(["city_uuid", "cust_code"])
  88. col_with_missing = remaining_cols[self._shopping_data[remaining_cols].isnull().any()].tolist() # 判断有缺失的字段
  89. col_all_missing = remaining_cols[self._shopping_data[remaining_cols].isnull().all()].to_list() # 全部缺失的字段
  90. col_partial_missing = list(set(col_with_missing) - set(col_all_missing)) # 部分缺失的字段
  91. for col in col_partial_missing:
  92. self._shopping_data[col] = self._shopping_data[col].fillna(self._shopping_data[col].mean())
  93. for col in col_all_missing:
  94. self._shopping_data[col] = self._shopping_data[col].fillna(0).infer_objects(copy=False)
  95. def _generate_original_train_data(self, is_pos):
  96. union_data = self._union_order_cust_product(is_pos)
  97. scored_data = self._calculate_score(union_data)
  98. labeled_data = self._labeled_data(scored_data)
  99. # labeled_data.to_csv(save_path, index=False)
  100. return labeled_data
  101. def _generate_pos_train_data(self):
  102. pos_data = self._generate_original_train_data(is_pos=True)
  103. pos_data.dropna(subset=['YLT_TURNOVER_RATE'], inplace=True)
  104. pos_data[OrderConfig.POSFEATURES] = pos_data[OrderConfig.POSFEATURES].fillna(0)
  105. pos_data = pos_data.infer_objects(copy=False)
  106. return pos_data
  107. def _generate_shopping_train_data(self):
  108. orignal_data = self._generate_original_train_data(is_pos=False)
  109. cust_feats = self._shopping_data.set_index("cust_code")
  110. shopping_train_data = orignal_data.join(cust_feats, on="BB_RETAIL_CUSTOMER_CODE", how="inner")
  111. return shopping_train_data
    def _union_order_cust_product(self, is_pos):
        """Join the order table with the customer and product tables.

        Args:
            is_pos: when False, the POS feature columns are dropped from the
                order data before joining; when True they are kept.
        Returns:
            The inner-joined DataFrame (orders x customers x products).
        """
        union_data = self._order_data.copy()
        if not is_pos:
            union_data.drop(OrderConfig.POSFEATURES, axis=1, inplace=True)
        # Align the order table's product key with the product table's index
        # name; the join on "product_code" below relies on this in both branches.
        union_data.rename(columns={"PRODUCT_CODE": "product_code"}, inplace=True)
        # union_data = union_data.drop(OrderConfig.POSFEATURES) # remove POS feature columns (old variant)
        cust_feats = self._cust_data.set_index("BB_RETAIL_CUSTOMER_CODE")
        product_feats = self._product_data.set_index("product_code")
        union_data = union_data.join(cust_feats, on="BB_RETAIL_CUSTOMER_CODE", how="inner")
        union_data = union_data.join(product_feats, on="product_code", how="inner")
        return union_data
        # self._train_data = shuffle(self._train_data, random_state=42)
  125. def _calculate_score(self, union_data):
  126. """计算联合数据记录的分数"""
  127. # 对参与算分的特征值进行归一化
  128. scaler = MinMaxScaler()
  129. union_data[list(OrderConfig.WEIGHTS.keys())] = scaler.fit_transform(union_data[list(OrderConfig.WEIGHTS.keys())])
  130. # 计算加权分数
  131. union_data["score"] = sum(union_data[feat] * weight
  132. for feat, weight in OrderConfig.WEIGHTS.items())
  133. return union_data
  134. def _labeled_data(self, scored_data):
  135. """通过计算分数打标签"""
  136. # 按品规分组计算中位数
  137. product_medians = scored_data.groupby("product_code")["score"].median().reset_index()
  138. product_medians.columns = ["product_code", "median_score"]
  139. # 合并中位数到原始订单数据
  140. temp_data = pd.merge(scored_data, product_medians, on="product_code", how="left")
  141. # 生成标签 (1: 大于等于中位数, 0: 小于中位数)
  142. temp_data["label"] = np.where(
  143. temp_data["score"] >= temp_data["median_score"], 1, 0
  144. )
  145. temp_data = temp_data.sort_values("score", ascending=False)
  146. temp_data.drop(columns=["median_score", "score"], inplace=True)
  147. scored_data = shuffle(temp_data, random_state=42)
  148. return scored_data
  149. # def _descartes(self):
  150. # """将零售户信息与卷烟信息进行笛卡尔积连接"""
  151. # self._cust_data["descartes"] = 1
  152. # self._product_data["descartes"] = 1
  153. # self._descartes_data = pd.merge(self._cust_data, self._product_data, on="descartes").drop("descartes", axis=1)
  154. # def _labeled_data_from_descartes(self):
  155. # """根据order表信息给descartes_data数据打标签"""
  156. # # 获取order表中的正样本组合
  157. # order_combinations = self._order_data[["BB_RETAIL_CUSTOMER_CODE", "PRODUCT_CODE"]].drop_duplicates()
  158. # order_set = set(zip(order_combinations["BB_RETAIL_CUSTOMER_CODE"], order_combinations["PRODUCT_CODE"]))
  159. # # 在descartes_data中打标签:正样本为1,负样本为0
  160. # self._descartes_data['label'] = self._descartes_data.apply(
  161. # lambda row: 1 if (row['BB_RETAIL_CUSTOMER_CODE'], row['product_code']) in order_set else 0, axis=1)
  162. # def _generate_train_data_from_descartes(self):
  163. # """从descartes_data中生成训练数据"""
  164. # positive_samples = self._descartes_data[self._descartes_data["label"] == 1]
  165. # negative_samples = self._descartes_data[self._descartes_data["label"] == 0]
  166. # positive_count = len(positive_samples)
  167. # negative_count = min(1 * positive_count, len(negative_samples))
  168. # print(positive_count)
  169. # print(negative_count)
  170. # # 随机抽取2倍正样本数量的负样本
  171. # negative_samples_sampled = negative_samples.sample(n=negative_count, random_state=42)
  172. # # 合并正负样本
  173. # self._train_data = pd.concat([positive_samples, negative_samples_sampled], axis=0)
  174. # self._train_data = self._train_data.sample(frac=1, random_state=42).reset_index(drop=True)
  175. # # 保存训练数据
  176. # self._train_data.to_csv(self._save_res_path, index=False)
  177. if __name__ == '__main__':
  178. city_uuid = "00000000000000000000000011445301"
  179. # city_uuid = "00000000000000000000000011441801"
  180. save_dir = "./data"
  181. processor = DataProcess(city_uuid, save_dir)
  182. processor.data_process()