瀏覽代碼

增加商圈的数据处理

yangzeyu 1 年之前
父節點
當前提交
024dc62715

+ 31 - 7
database/dao/mysql_dao.py

@@ -1,5 +1,6 @@
 from database import MySqlDatabaseHelper
 from sqlalchemy import text
+import pandas as pd
 
 class MySqlDao:
     _instance = None
@@ -20,6 +21,7 @@ class MySqlDao:
         self._cust_tablename = "tads_brandcul_cust_info"
         self._order_tablename = "tads_brandcul_cust_order"
         self._mock_order_tablename = "yunfu_mock_data"
+        self._shopping_tablename = "tads_brandcul_cust_info_lbs"
         
         self._initialized = True
         
@@ -55,12 +57,11 @@ class MySqlDao:
         data = data.infer_objects(copy=False)
         return data
     
-    def load_mock_order_data(self, city_uuid):
+    def load_mock_order_data(self):
         """从数据库中读取mock的订单信息"""
         query = f"SELECT * FROM {self._mock_order_tablename}"
-        params = {"city_uuid": city_uuid}
         
-        data = self.db_helper.load_data_with_page(query, params)
+        data = self.db_helper.load_data_with_page(query, {})
         
         # 去除重复值和填补缺失值
         data.drop_duplicates(inplace=True)
@@ -69,6 +70,15 @@ class MySqlDao:
         
         return data
     
+    def load_shopping_data(self, city_uuid):
+        """Load shopping-district rows for the given city from the database."""
+        query = f"SELECT * FROM {self._shopping_tablename} WHERE city_uuid = :city_uuid"  # table name is interpolated; city_uuid stays a bound parameter
+        params = {"city_uuid": city_uuid}
+        
+        data = self.db_helper.load_data_with_page(query, params)  # paged read; the result is returned without further cleaning
+        
+        return data
+    
     def get_cust_list(self, city_uuid):
         """获取商户列表"""
         data = self.load_cust_data(city_uuid)
@@ -107,11 +117,25 @@ class MySqlDao:
         data = self.db_helper.fetch_all(query, params)
         
         return data
+    
+    def data_preprocess(self, data: pd.DataFrame):
+        """Fill missing values in `data` in place; there is no return statement, so callers get None."""
+        data.drop(["cust_uuid", "longitude", "latitude", "range_radius"], axis=1, inplace=True)  # these four columns are removed from the caller's frame
+        remaining_cols = data.columns.drop(["city_uuid", "cust_code"])  # every remaining column except city_uuid and cust_code
+        col_with_missing = remaining_cols[data[remaining_cols].isnull().any()].tolist() # columns with at least one missing value
+        col_all_missing = remaining_cols[data[remaining_cols].isnull().all()].to_list() # columns where every value is missing
+        col_partial_missing = list(set(col_with_missing) - set(col_all_missing)) # columns with some, but not all, values missing
+        
+        for col in col_partial_missing:
+            data[col] = data[col].fillna(data[col].mean())  # fill gaps with the column mean; NOTE(review): assumes a numeric dtype - confirm
+        
+        for col in col_all_missing:
+            data[col] = data[col].fillna(0).infer_objects(copy=False)  # an entirely missing column becomes all zeros
+        
         
 if __name__ == "__main__":
     dao = MySqlDao()
-    # city_uuid = "00000000000000000000000011445301"
-    city_uuid = "00000000000000000000000011441801"
+    city_uuid = "00000000000000000000000011445301"
+    # city_uuid = "00000000000000000000000011441801"
     cust_id_list = ["441800100006", "441800100051", "441800100811"]
-    cust_list = dao.get_cust_by_ids(city_uuid, cust_id_list)
-    print(len(cust_list))
+    cust_list = dao.load_mock_order_data()

+ 19 - 11
database/db/mysql.py

@@ -3,6 +3,7 @@ import pandas as pd
 from sqlalchemy import create_engine, text
 from sqlalchemy.orm import sessionmaker
 from sqlalchemy.exc import SQLAlchemyError
+from tqdm import tqdm
 
 cfgs = load_config()
 
@@ -49,22 +50,29 @@ class MySqlDatabaseHelper:
     def load_data_with_page(self, query, params, page_size=1000):
         """Run the query page by page and return all rows as one DataFrame."""
         data = pd.DataFrame()
+        count_query = text(query.replace("SELECT *", "SELECT COUNT(*)"))  # NOTE(review): only rewrites queries containing the literal "SELECT *"; otherwise the query is used unchanged
         query += " LIMIT :limit OFFSET :offset"
         query = text(query)
-        
+    
+        # Fetch the total row count (used as the progress-bar total)
+        total_rows = self.fetch_one(count_query, params)[0]
+
         page = 1
-        while True:
-            offset = (page - 1) * page_size # compute the offset
-            params["limit"] = page_size
-            params["offset"] = offset
+        with tqdm(total=total_rows, desc="Loading data", unit="rows") as pbar:  # create the progress bar
+            while True:
+                offset = (page - 1) * page_size  # compute the offset
+                params["limit"] = page_size  # written into the caller's params dict
+                params["offset"] = offset
 
-            df = pd.DataFrame(self.fetch_all(query, params))
-            if df.empty:
-                break
-            data = pd.concat([data, df], ignore_index=True)
-            print(f"Page {page}: Retrieved {len(df)} rows, Total rows so far: {len(data)}")
+                df = pd.DataFrame(self.fetch_all(query, params))
+                if df.empty:
+                    break
+                data = pd.concat([data, df], ignore_index=True)
+            
+                # Update the progress bar
+                pbar.update(len(df))  # advance by the number of rows in this page
             
-            page += 1
+                page += 1
         return data
         
         

+ 22 - 2
models/rank/data/preprocess.py

@@ -16,7 +16,9 @@ class DataProcess():
         self._product_data = self._mysql_dao.load_product_data(city_uuid)
         print("正在加载order_info...")
         # self._order_data = self._mysql_dao.load_cust_data(city_uuid)
-        self._order_data = self._mysql_dao.load_mock_order_data(city_uuid)
+        self._order_data = self._mysql_dao.load_mock_order_data()
+        print("正在加载shopping_info...")
+        self._shopping_data = self._mysql_dao.load_shopping_data(city_uuid)
         
     def data_process(self):
         """数据预处理"""
@@ -32,6 +34,7 @@ class DataProcess():
         self._clean_cust_data()
         self._clean_product_data()
         self._clean_order_data()
+        self._clean_shopping_data()
         
         # # 3. 将零售户信息表与卷烟信息表进行笛卡尔积连接
         # self._descartes()
@@ -80,7 +83,24 @@ class DataProcess():
                 self._product_data[feature] = self._product_data[feature].infer_objects(copy=False)
                     
     def _clean_order_data(self):
-        pass
+        # Drop duplicate rows and fill missing values with 0
+        self._order_data.drop_duplicates(inplace=True)
+        self._order_data.fillna(0, inplace=True)
+        self._order_data = self._order_data.infer_objects(copy=False)  # re-infer dtypes of object columns after the fill
+        
+    def _clean_shopping_data(self):
+        """Fill missing values in the shopping-district data held in self._shopping_data."""
+        self._shopping_data.drop(["cust_uuid", "longitude", "latitude", "range_radius"], axis=1, inplace=True)  # these four columns are removed in place
+        remaining_cols = self._shopping_data.columns.drop(["city_uuid", "cust_code"])  # every remaining column except city_uuid and cust_code
+        col_with_missing = remaining_cols[self._shopping_data[remaining_cols].isnull().any()].tolist() # columns with at least one missing value
+        col_all_missing = remaining_cols[self._shopping_data[remaining_cols].isnull().all()].to_list() # columns where every value is missing
+        col_partial_missing = list(set(col_with_missing) - set(col_all_missing)) # columns with some, but not all, values missing
+        
+        for col in col_partial_missing:
+            self._shopping_data[col] = self._shopping_data[col].fillna(self._shopping_data[col].mean())  # fill gaps with the column mean; NOTE(review): assumes a numeric dtype - confirm
+        
+        for col in col_all_missing:
+            self._shopping_data[col] = self._shopping_data[col].fillna(0).infer_objects(copy=False)  # an entirely missing column becomes all zeros
     
     def _calculate_score(self):
         """计算order记录的fens"""

+ 2 - 3
models/recall/hot_recall.py

@@ -8,13 +8,12 @@
 @Version     : 1.0
 '''
 import pandas as pd
-from dao.redis_db import Redis
-from dao.mysql_client import Mysql
+from database import RedisDatabaseHelper
 from tqdm import tqdm
 
 class HotRecallModel:
     def __init__(self, order_data):
-        self._redis_db = Redis()
+        self._redis_db = RedisDatabaseHelper()
         self._hotkeys = self.get_hotkeys()
         self._order_data = order_data
 

+ 2 - 2
models/recall/itemCF/ItemCF.py

@@ -1,4 +1,4 @@
-from dao.redis_db import Redis
+from database import RedisDatabaseHelper
 import pandas as pd
 import numpy as np
 from tqdm import tqdm
@@ -63,7 +63,7 @@ class ItemCFModel:
         将 self._recommendations 中的数据保存到 Redis 的 Sorted Set (ZSET) 中
         存储格式为 fc:product_code,其中商户 ID 作为成员,得分作为分数
         """
-        redis_db = Redis()
+        redis_db = RedisDatabaseHelper()
         
         # 存redis之前,先进行删除操作
         pattern = f"fc:{city_uuid}:*"

+ 3 - 4
models/recall/itemCF/calculate_similarity_matrix.py

@@ -1,12 +1,11 @@
-from dao import load_order_data_from_mysql
+from database import MySqlDao
 import pandas as pd
 import numpy as np
 
 from itertools import combinations
-from dao.mysql_client import Mysql
 from tqdm import tqdm
 
-
+dao = MySqlDao()
 def build_co_occurence_matrix(order_data):
     """
     构建商户共现矩阵
@@ -73,7 +72,7 @@ if __name__ == "__main__":
     co_occurrence_save_path = "./models/recall/itemCF/matrix/occurrence.csv"
     similarity_matrix_save_path = "./models/recall/itemCF/matrix/similarity.csv"
     # 从数据库中读取订单数据
-    order_data = load_order_data_from_mysql()
+    order_data = dao.load_order_data()
     
     calculate_similarity_and_save_results(order_data, similarity_matrix_save_path)
     

+ 4 - 3
models/recall/itemCF/user_item_score.py

@@ -9,7 +9,7 @@
 '''
 
 
-from dao import load_order_data_from_mysql
+from database import MySqlDao
 from decimal import Decimal
 
 # 算法封装成一个类
@@ -31,6 +31,7 @@ class UserItemScore:
             "OUT_STOCK_DAYS": Decimal(0.02),
             "RETAIL_PRICE_INDEX": Decimal(0.02)
         }
+        self.dao = MySqlDao()
 
     # 均值方差归一化函数
     def standardize_column(self, column):
@@ -67,9 +68,9 @@ class UserItemScore:
 if __name__ == "__main__":
     # 创建一个 ItemCF 类的实例
     item_cf_algorithm = UserItemScore()
-    
+    dao = MySqlDao()
     # 读取数据
-    order_data = load_order_data_from_mysql()
+    order_data = dao.load_order_data()
 
     # 调用算法
     scores = item_cf_algorithm.score(order_data)