# preprocess.py
import os
from pathlib import Path

import pandas as pd

from dao.dao import load_cust_data_from_mysql, load_product_data_from_mysql, load_order_data_from_mysql
from models.rank.data.config import CustConfig, ProductConfig
  5. class DataProcess():
  6. def __init__(self, city_uuid):
  7. self._save_res_path = "./models/rank/data/gbdt_data.csv"
  8. print("正在加载cust_info...")
  9. self._cust_data = load_cust_data_from_mysql(city_uuid)
  10. print("正在加载product_info...")
  11. self._product_data = load_product_data_from_mysql(city_uuid)
  12. print("正在加载order_info...")
  13. self._order_data = load_order_data_from_mysql(city_uuid)
  14. def data_process(self):
  15. """数据预处理"""
  16. if os.path.exists(self._save_res_path):
  17. os.remove(self._save_res_path)
  18. # 1. 获取指定的特征组合
  19. self._cust_data = self._cust_data[CustConfig.FEATURE_COLUMNS]
  20. self._product_data = self._product_data[ProductConfig.FEATURE_COLUMNS]
  21. # 2. 数据清洗
  22. self._clean_cust_data()
  23. self._clean_product_data()
  24. # 3. 将零售户信息表与卷烟信息表进行笛卡尔积连接
  25. self._descartes()
  26. # 4. 根据order表中的信息给数据打标签
  27. self._labeled_data()
  28. # 5. 选取训练样本
  29. self._generate_train_data()
  30. def _clean_cust_data(self):
  31. """用户信息表数据清洗"""
  32. # 根据配置规则清洗数据
  33. for feature, rules, in CustConfig.CLEANING_RULES.items():
  34. if rules["type"] == "num":
  35. # 先将数值型字符串转换为数值
  36. self._cust_data[feature] = pd.to_numeric(self._cust_data[feature], errors="coerce")
  37. if rules["method"] == "fillna":
  38. if rules["opt"] == "fill":
  39. self._cust_data[feature] = self._cust_data[feature].fillna(rules["value"])
  40. elif rules["opt"] == "replace":
  41. self._cust_data[feature] = self._cust_data[feature].fillna(self._cust_data[rules["value"]])
  42. elif rules["opt"] == "mean":
  43. self._cust_data[feature] = self._cust_data[feature].fillna(self._cust_data[feature].mean())
  44. def _clean_product_data(self):
  45. """卷烟信息表数据清洗"""
  46. for feature, rules, in ProductConfig.CLEANING_RULES.items():
  47. if rules["type"] == "num":
  48. self._product_data[feature] = pd.to_numeric(self._product_data[feature], errors="coerce")
  49. if rules["method"] == "fillna":
  50. if rules["opt"] == "fill":
  51. self._product_data[feature] = self._product_data[feature].fillna(rules["value"])
  52. elif rules["opt"] == "mean":
  53. self._product_data[feature] = self._product_data[feature].fillna(self._product_data[feature].mean())
  54. def _descartes(self):
  55. """将零售户信息与卷烟信息进行笛卡尔积连接"""
  56. self._cust_data["descartes"] = 1
  57. self._product_data["descartes"] = 1
  58. self._descartes_data = pd.merge(self._cust_data, self._product_data, on="descartes").drop("descartes", axis=1)
  59. def _labeled_data(self):
  60. """根据order表信息给descartes_data数据打标签"""
  61. # 获取order表中的正样本组合
  62. order_combinations = self._order_data[["BB_RETAIL_CUSTOMER_CODE", "PRODUCT_CODE"]].drop_duplicates()
  63. order_set = set(zip(order_combinations["BB_RETAIL_CUSTOMER_CODE"], order_combinations["PRODUCT_CODE"]))
  64. # 在descartes_data中打标签:正样本为1,负样本为2
  65. self._descartes_data['label'] = self._descartes_data.apply(
  66. lambda row: 1 if (row['BB_RETAIL_CUSTOMER_CODE'], row['product_code']) in order_set else 0, axis=1)
  67. def _generate_train_data(self):
  68. """从descartes_data中生成训练数据"""
  69. positive_samples = self._descartes_data[self._descartes_data["label"] == 1]
  70. negative_samples = self._descartes_data[self._descartes_data["label"] == 0]
  71. positive_count = len(positive_samples)
  72. negative_count = min(1 * positive_count, len(negative_samples))
  73. print(positive_count)
  74. print(negative_count)
  75. # 随机抽取2倍正样本数量的负样本
  76. negative_samples_sampled = negative_samples.sample(n=negative_count, random_state=42)
  77. # 合并正负样本
  78. self._train_data = pd.concat([positive_samples, negative_samples_sampled], axis=0)
  79. self._train_data = self._train_data.sample(frac=1, random_state=42).reset_index(drop=True)
  80. # 保存训练数据
  81. self._train_data.to_csv(self._save_res_path, index=False)
  82. if __name__ == '__main__':
  83. processor = DataProcess("00000000000000000000000011445301")
  84. processor.data_process()