# preprocess.py — builds the labeled training set for the GBDT ranking model.
import os

import numpy as np
import pandas as pd
from sklearn.preprocessing import MinMaxScaler
from sklearn.utils import shuffle

from dao.dao import load_cust_data_from_mysql, load_product_data_from_mysql, load_order_data_from_mysql
from models.rank.data.config import CustConfig, ProductConfig, OrderConfig
  8. class DataProcess():
  9. def __init__(self, city_uuid):
  10. self._save_res_path = "./models/rank/data/gbdt_data.csv"
  11. print("正在加载cust_info...")
  12. self._cust_data = load_cust_data_from_mysql(city_uuid)
  13. print("正在加载product_info...")
  14. self._product_data = load_product_data_from_mysql(city_uuid)
  15. print("正在加载order_info...")
  16. self._order_data = load_order_data_from_mysql(city_uuid)
  17. def data_process(self):
  18. """数据预处理"""
  19. if os.path.exists(self._save_res_path):
  20. os.remove(self._save_res_path)
  21. # 1. 获取指定的特征组合
  22. self._cust_data = self._cust_data[CustConfig.FEATURE_COLUMNS]
  23. self._product_data = self._product_data[ProductConfig.FEATURE_COLUMNS]
  24. self._order_data = self._order_data[OrderConfig.FEATURE_COLUMNS]
  25. # 2. 数据清洗
  26. self._clean_cust_data()
  27. self._clean_product_data()
  28. self._clean_order_data()
  29. # # 3. 将零售户信息表与卷烟信息表进行笛卡尔积连接
  30. # self._descartes()
  31. # # 4. 根据order表中的信息给数据打标签
  32. # self._labeled_data()
  33. # 3. 根据特征权重给order表中的记录打分
  34. self._calculate_score()
  35. # 4. 根据中位数打标签
  36. self.labeled_data()
  37. # 5. 选取训练样本
  38. self._generate_train_data()
  39. def _clean_cust_data(self):
  40. """用户信息表数据清洗"""
  41. # 根据配置规则清洗数据
  42. for feature, rules, in CustConfig.CLEANING_RULES.items():
  43. if rules["type"] == "num":
  44. # 先将数值型字符串转换为数值
  45. self._cust_data[feature] = pd.to_numeric(self._cust_data[feature], errors="coerce")
  46. if rules["method"] == "fillna":
  47. if rules["opt"] == "fill":
  48. self._cust_data[feature] = self._cust_data[feature].fillna(rules["value"])
  49. elif rules["opt"] == "replace":
  50. self._cust_data[feature] = self._cust_data[feature].fillna(self._cust_data[rules["value"]])
  51. elif rules["opt"] == "mean":
  52. self._cust_data[feature] = self._cust_data[feature].fillna(self._cust_data[feature].mean())
  53. self._cust_data[feature] = self._cust_data[feature].infer_objects(copy=False)
  54. def _clean_product_data(self):
  55. """卷烟信息表数据清洗"""
  56. for feature, rules, in ProductConfig.CLEANING_RULES.items():
  57. if rules["type"] == "num":
  58. self._product_data[feature] = pd.to_numeric(self._product_data[feature], errors="coerce")
  59. if rules["method"] == "fillna":
  60. if rules["opt"] == "fill":
  61. self._product_data[feature] = self._product_data[feature].fillna(rules["value"])
  62. elif rules["opt"] == "mean":
  63. self._product_data[feature] = self._product_data[feature].fillna(self._product_data[feature].mean())
  64. self._product_data[feature] = self._product_data[feature].infer_objects(copy=False)
  65. def _clean_order_data(self):
  66. pass
  67. def _calculate_score(self):
  68. """计算order记录的fens"""
  69. self._order_score = self._order_data.copy()
  70. # 对参与算分的特征值进行归一化
  71. scaler = MinMaxScaler()
  72. self._order_score[list(OrderConfig.WEIGHTS.keys())] = scaler.fit_transform(self._order_score[list(OrderConfig.WEIGHTS.keys())])
  73. # 计算加权分数
  74. self._order_score["score"] = sum(self._order_score[feat] * weight
  75. for feat, weight in OrderConfig.WEIGHTS.items())
  76. def labeled_data(self):
  77. """通过计算分数打标签"""
  78. # 按品规分组计算中位数
  79. product_medians = self._order_score.groupby("PRODUCT_CODE")["score"].median().reset_index()
  80. product_medians.columns = ["PRODUCT_CODE", "median_score"]
  81. # 合并中位数到原始订单数据
  82. self._order_score = pd.merge(self._order_score, product_medians, on="PRODUCT_CODE")
  83. # 生成标签 (1: 大于等于中位数, 0: 小于中位数)
  84. self._order_score["label"] = np.where(
  85. self._order_score["score"] >= self._order_score["median_score"], 1, 0
  86. )
  87. self._order_score = self._order_score.sort_values("score", ascending=False)
  88. self._order_score = self._order_score[["BB_RETAIL_CUSTOMER_CODE", "PRODUCT_CODE", "label"]]
  89. self._order_score.rename(columns={"PRODUCT_CODE": "product_code"}, inplace=True)
  90. def _generate_train_data(self):
  91. cust_feats = self._cust_data.set_index("BB_RETAIL_CUSTOMER_CODE")
  92. product_feats = self._product_data.set_index("product_code")
  93. self._train_data = self._order_score.copy()
  94. self._train_data = self._train_data.join(cust_feats, on="BB_RETAIL_CUSTOMER_CODE", how="left")
  95. self._train_data = self._train_data.join(product_feats, on="product_code", how="left")
  96. self._train_data = shuffle(self._train_data, random_state=42)
  97. self._train_data.to_csv(self._save_res_path, index=False)
  98. def _descartes(self):
  99. """将零售户信息与卷烟信息进行笛卡尔积连接"""
  100. self._cust_data["descartes"] = 1
  101. self._product_data["descartes"] = 1
  102. self._descartes_data = pd.merge(self._cust_data, self._product_data, on="descartes").drop("descartes", axis=1)
  103. def _labeled_data_from_descartes(self):
  104. """根据order表信息给descartes_data数据打标签"""
  105. # 获取order表中的正样本组合
  106. order_combinations = self._order_data[["BB_RETAIL_CUSTOMER_CODE", "PRODUCT_CODE"]].drop_duplicates()
  107. order_set = set(zip(order_combinations["BB_RETAIL_CUSTOMER_CODE"], order_combinations["PRODUCT_CODE"]))
  108. # 在descartes_data中打标签:正样本为1,负样本为0
  109. self._descartes_data['label'] = self._descartes_data.apply(
  110. lambda row: 1 if (row['BB_RETAIL_CUSTOMER_CODE'], row['product_code']) in order_set else 0, axis=1)
  111. def _generate_train_data_from_descartes(self):
  112. """从descartes_data中生成训练数据"""
  113. positive_samples = self._descartes_data[self._descartes_data["label"] == 1]
  114. negative_samples = self._descartes_data[self._descartes_data["label"] == 0]
  115. positive_count = len(positive_samples)
  116. negative_count = min(1 * positive_count, len(negative_samples))
  117. print(positive_count)
  118. print(negative_count)
  119. # 随机抽取2倍正样本数量的负样本
  120. negative_samples_sampled = negative_samples.sample(n=negative_count, random_state=42)
  121. # 合并正负样本
  122. self._train_data = pd.concat([positive_samples, negative_samples_sampled], axis=0)
  123. self._train_data = self._train_data.sample(frac=1, random_state=42).reset_index(drop=True)
  124. # 保存训练数据
  125. self._train_data.to_csv(self._save_res_path, index=False)
  126. if __name__ == '__main__':
  127. processor = DataProcess("00000000000000000000000011445301")
  128. processor.data_process()