preprocess.py 6.7 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145
  1. from dao.dao import load_cust_data_from_mysql, load_product_data_from_mysql, load_order_data_from_mysql
  2. from models.rank.data.config import CustConfig, ProductConfig, OrderConfig
  3. import os
  4. import pandas as pd
  5. from sklearn.preprocessing import MinMaxScaler
  6. import numpy as np
  7. class DataProcess():
  8. def __init__(self, city_uuid):
  9. self._save_res_path = "./models/rank/data/gbdt_data.csv"
  10. print("正在加载cust_info...")
  11. self._cust_data = load_cust_data_from_mysql(city_uuid)
  12. print("正在加载product_info...")
  13. self._product_data = load_product_data_from_mysql(city_uuid)
  14. print("正在加载order_info...")
  15. self._order_data = load_order_data_from_mysql(city_uuid)
  16. def data_process(self):
  17. """数据预处理"""
  18. if os.path.exists(self._save_res_path):
  19. os.remove(self._save_res_path)
  20. # 1. 获取指定的特征组合
  21. self._cust_data = self._cust_data[CustConfig.FEATURE_COLUMNS]
  22. self._product_data = self._product_data[ProductConfig.FEATURE_COLUMNS]
  23. self._order_data = self._order_data[OrderConfig.FEATURE_COLUMNS]
  24. # 2. 数据清洗
  25. self._clean_cust_data()
  26. self._clean_product_data()
  27. self._clean_order_data()
  28. # # 3. 将零售户信息表与卷烟信息表进行笛卡尔积连接
  29. # self._descartes()
  30. # # 4. 根据order表中的信息给数据打标签
  31. # self._labeled_data()
  32. # 3. 根据特征权重给order表中的记录打分
  33. self._calculate_score()
  34. # 4. 根据中位数打标签
  35. self.labeled_data_by_score()
  36. # 5. 选取训练样本
  37. self._generate_train_data()
  38. def _clean_cust_data(self):
  39. """用户信息表数据清洗"""
  40. # 根据配置规则清洗数据
  41. for feature, rules, in CustConfig.CLEANING_RULES.items():
  42. if rules["type"] == "num":
  43. # 先将数值型字符串转换为数值
  44. self._cust_data[feature] = pd.to_numeric(self._cust_data[feature], errors="coerce")
  45. if rules["method"] == "fillna":
  46. if rules["opt"] == "fill":
  47. self._cust_data[feature] = self._cust_data[feature].fillna(rules["value"])
  48. elif rules["opt"] == "replace":
  49. self._cust_data[feature] = self._cust_data[feature].fillna(self._cust_data[rules["value"]])
  50. elif rules["opt"] == "mean":
  51. self._cust_data[feature] = self._cust_data[feature].fillna(self._cust_data[feature].mean())
  52. def _clean_product_data(self):
  53. """卷烟信息表数据清洗"""
  54. for feature, rules, in ProductConfig.CLEANING_RULES.items():
  55. if rules["type"] == "num":
  56. self._product_data[feature] = pd.to_numeric(self._product_data[feature], errors="coerce")
  57. if rules["method"] == "fillna":
  58. if rules["opt"] == "fill":
  59. self._product_data[feature] = self._product_data[feature].fillna(rules["value"])
  60. elif rules["opt"] == "mean":
  61. self._product_data[feature] = self._product_data[feature].fillna(self._product_data[feature].mean())
  62. def _clean_order_data(self):
  63. pass
  64. def _calculate_score(self):
  65. """计算order记录的fens"""
  66. self._order_score = self._order_data.copy()
  67. # 对参与算分的特征值进行归一化
  68. scaler = MinMaxScaler()
  69. self._order_score[list(OrderConfig.WEIGHTS.keys())] = scaler.fit_transform(self._order_score[list(OrderConfig.WEIGHTS.keys())])
  70. # 计算加权分数
  71. self._order_score["score"] = sum(self._order_score[feat] * weight
  72. for feat, weight in OrderConfig.WEIGHTS.items())
  73. def labeled_data_by_score(self):
  74. """通过计算分数打标签"""
  75. # 按品规分组计算中位数
  76. product_medians = self._order_score.groupby("PRODUCT_CODE")["score"].median().reset_index()
  77. product_medians.columns = ["PRODUCT_CODE", "median_score"]
  78. # 合并中位数到原始订单数据
  79. self._order_score = pd.merge(self._order_score, product_medians, on="PRODUCT_CODE")
  80. # 生成标签 (1: 大于等于中位数, 0: 小于中位数)
  81. self._order_score["label"] = np.where(
  82. self._order_score["score"] >= self._order_score["median_score"], 1, 0
  83. )
  84. self._order_score = self._order_score.sort_values("score", ascending=False)
  85. self._order_score = self._order_score[["BB_RETAIL_CUSTOMER_CODE", "PRODUCT_CODE", "label"]]
  86. self._order_score.to_csv("./models/rank/data/train.csv")
  87. def _descartes(self):
  88. """将零售户信息与卷烟信息进行笛卡尔积连接"""
  89. self._cust_data["descartes"] = 1
  90. self._product_data["descartes"] = 1
  91. self._descartes_data = pd.merge(self._cust_data, self._product_data, on="descartes").drop("descartes", axis=1)
  92. def _labeled_data(self):
  93. """根据order表信息给descartes_data数据打标签"""
  94. # 获取order表中的正样本组合
  95. order_combinations = self._order_data[["BB_RETAIL_CUSTOMER_CODE", "PRODUCT_CODE"]].drop_duplicates()
  96. order_set = set(zip(order_combinations["BB_RETAIL_CUSTOMER_CODE"], order_combinations["PRODUCT_CODE"]))
  97. # 在descartes_data中打标签:正样本为1,负样本为0
  98. self._descartes_data['label'] = self._descartes_data.apply(
  99. lambda row: 1 if (row['BB_RETAIL_CUSTOMER_CODE'], row['product_code']) in order_set else 0, axis=1)
  100. def _generate_train_data(self):
  101. """从descartes_data中生成训练数据"""
  102. positive_samples = self._descartes_data[self._descartes_data["label"] == 1]
  103. negative_samples = self._descartes_data[self._descartes_data["label"] == 0]
  104. positive_count = len(positive_samples)
  105. negative_count = min(1 * positive_count, len(negative_samples))
  106. print(positive_count)
  107. print(negative_count)
  108. # 随机抽取2倍正样本数量的负样本
  109. negative_samples_sampled = negative_samples.sample(n=negative_count, random_state=42)
  110. # 合并正负样本
  111. self._train_data = pd.concat([positive_samples, negative_samples_sampled], axis=0)
  112. self._train_data = self._train_data.sample(frac=1, random_state=42).reset_index(drop=True)
  113. # 保存训练数据
  114. self._train_data.to_csv(self._save_res_path, index=False)
  115. if __name__ == '__main__':
  116. processor = DataProcess("00000000000000000000000011445301")
  117. processor.data_process()