
Encapsulate the deployment workflow into a single, integrated pipeline

Sherlock, 11 months ago
parent
commit a1451c087b
8 changed files with 119 additions and 386 deletions
  1. app.py (+0, -105)
  2. data/Readme.md (+0, -0)
  3. gbdt_lr.py (+0, -158)
  4. gbdt_lr_api.py (+0, -97)
  5. models/recall/hot_recall.py (+9, -8)
  6. models/recall/itemCF/ItemCF.py (+0, -2)
  7. test.py (+19, -16)
  8. train.py (+91, -0)
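
Judging from the argparse flags defined in the new train.py (last hunk below), the consolidated recall-plus-ranking pipeline is presumably launched in a single step, e.g. python train.py --run_train --city_uuid 00000000000000000000000011445301, where the UUID is simply the script's default.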

+ 0 - 105
app.py

@@ -1,105 +0,0 @@
-import argparse
-from dao import load_order_data_from_mysql
-from dao.redis_db import Redis
-from models import HotRecallModel, UserItemScore, ItemCFModel, calculate_similarity_and_save_results
-import os
-
-def run_hot_recall(order_data, city_uuid):
-    """运行热度召回算法"""
-    hot_model = HotRecallModel(order_data)
-    hot_model.calculate_all_hot_score(city_uuid)
-    print("热度召回已完成!")
-
-def run_itemcf(order_data, args):
-    # """运行协同过滤算法"""
-    if os.path.exists(args.interst_score_path) and os.path.exists(args.similarity_matrix_path):
-        os.remove(args.interst_score_path)
-        os.remove(args.similarity_matrix_path)
-    
-    # 计算user-score-item数据
-    cal_interest_scores_model = UserItemScore()
-    scores = cal_interest_scores_model.score(order_data)
-    scores.to_csv(args.interst_score_path, index=False, encoding="utf-8")
-    print("Interest Scores cal done!")
-    
-    # 计算商户共现矩阵及相似度矩阵
-    calculate_similarity_and_save_results(order_data, args.similarity_matrix_path)
-    print("Shops similarity matrix cal done!")
-    
-    # 运行协同过滤召回
-    itemcf_model = ItemCFModel()
-    itemcf_model.train(args.interst_score_path, args.similarity_matrix_path, args.city_uuid, args.n, args.k, args.top_n, args.n_jobs)
-    print("协同过滤已完成!")
-
-def run_itemcf_inference(product_code):
-        """
-        从 Redis 中读取推荐结果,并返回 {shop_id: score} 的列表
-        """
-        redis_db = Redis()
-        redis_key = f"fc:{product_code}"
-        recommendations = redis_db.redis.zrange(redis_key, 0, -1, withscores=True, desc=True)
-        
-        # 将推荐结果转换为 {shop_id: score} 的字典列表
-        result = [{shop_id: float(score)} for shop_id, score in recommendations]
-        
-        return result
-
-def run():
-    parser = argparse.ArgumentParser()
-    
-    # 运行方式
-    parser.add_argument("--run_all", action='store_true')
-    parser.add_argument("--run_hot", action='store_true')
-    parser.add_argument("--run_itemcf", action='store_true')
-    parser.add_argument("--run_itemcf_inference", action='store_true')
-    
-    # 协同过滤相关配置
-    parser.add_argument("--matrix_path", type=str, default="./models/recall/itemCF/matrix")
-    # parser.add_argument("--interst_score_path", type=str, default="./models/recall/itemCF/matrix/score.csv")
-    # parser.add_argument("--similarity_matrix_path", type=str, default="./models/recall/itemCF/matrix/similarity.csv")
-    parser.add_argument("--n", type=int, default=100)
-    parser.add_argument("--k", type=int, default=20)
-    parser.add_argument("--top_n", type=int, default=2000, help='default n * k')
-    parser.add_argument("--n_jobs", type=int, default=4)
-    parser.add_argument("--city_uuid", type=str, default='00000000000000000000000011445301', help="City UUID for filtering data")
-    
-    # 协同过滤推理配置
-    parser.add_argument("--product_code", type=int, default=110111)
-    
-    args = parser.parse_args()
-    
-    # 初始化文件保存相关配置
-    if not os.path.exists(args.matrix_path):
-        os.makedirs(args.matrix_path)
-    args.interst_score_path = os.path.join(args.matrix_path, "score.csv")
-    args.similarity_matrix_path = os.path.join(args.matrix_path, "similarity.csv")
-    
-    
-    if args.run_all:
-        order_data = load_order_data_from_mysql(args.city_uuid)
-        if order_data is not None:
-            run_hot_recall(order_data, args.city_uuid)
-            run_itemcf(order_data, args)
-        else:
-            print("数据库中暂无数据")
-        
-    elif args.run_hot:
-        order_data = load_order_data_from_mysql(args.city_uuid)
-        if order_data is not None:
-            run_hot_recall(order_data, args.city_uuid)
-        else:
-            print("数据库中暂无数据")
-        
-    elif args.run_itemcf:
-        order_data = load_order_data_from_mysql(args.city_uuid)
-        if order_data is not None:
-            run_itemcf(order_data, args)
-        else:
-            print("数据库中暂无数据")  
-        
-    elif args.run_itemcf_inference:
-        recomments = run_itemcf_inference(args.product_code)
-        print(recomments)
-    
-if __name__ == "__main__":
-    run()

+ 0 - 0
data/Readme.md


+ 0 - 158
gbdt_lr.py

@@ -1,158 +0,0 @@
-import argparse
-import os
-from models.rank import DataProcess, Trainer, GbdtLrModel
-import time
-import pandas as pd
-
-# train_data_path = "./moldes/rank/data/gbdt_data.csv"
-# model_path = "./models/rank/weights"
-
-def train(args):
-    model_dir = os.path.join(args.model_path, args.city_uuid)
-    train_data_dir = args.train_data_dir
-    if not os.path.exists(model_dir):
-        os.makedirs(model_dir)
-    
-    if not os.path.exists(train_data_dir):
-        os.makedirs(train_data_dir)
-    
-    # 准备数据集  
-    print("正在整合训练数据...")
-    processor = DataProcess(args.city_uuid, args.train_data_dir)
-    processor.data_process()
-    print("训练数据整合完成!")
-    
-    # 进行训练
-    print("开始训练gbdt-lr模型")
-    trainer(args, os.path.join(args.train_data_dir, "train_data.csv"), model_dir, "gbdtlr_model.pkl")
-
-def trainer(args, train_data_path, model_dir, model_name):
-    trainer = Trainer(train_data_path)
-    
-    start_time = time.time()
-    trainer.train()
-    end_time = time.time()
-    
-    training_time_hours = (end_time - start_time) / 3600
-    print(f"训练时间: {training_time_hours:.4f} 小时")
-    
-    eval_metrics = trainer.evaluate()
-    
-    # 输出评估结果
-    print("GBDT-LR Evaluation Metrics:")
-    for metric, value in eval_metrics.items():
-        print(f"{metric}: {value:.4f}")
-        
-    # 保存模型
-    trainer.save_model(os.path.join(model_dir, model_name))
-
-def recommend_by_product(args):
-    model_dir = os.path.join(args.model_path, args.city_uuid)
-    if not os.path.exists(model_dir):
-        print("暂无该城市的模型,请先进行模型训练")
-        return
-    
-    # 加载模型
-    model = GbdtLrModel(os.path.join(model_dir, args.model_name))
-    recommend_list = model.sort(args.city_uuid, args.product_id)
-    for item in recommend_list[:min(args.last_n, len(recommend_list))]:
-        print(item)
-
-def get_features_importance(args):
-    model_dir = os.path.join(args.model_path, args.city_uuid)
-    if not os.path.exists(model_dir):
-        print("暂无该城市的模型,请先进行模型训练")
-        return
-    
-    # # 加载模型
-    # model = GbdtLrModel(os.path.join(model_dir, args.model_name))
-    # cust_features_importance, product_features_importance = model.generate_feats_importance()
-    
-    # # 将字典列表转换为 DataFrame
-    # cust_df = pd.DataFrame([
-    #     {"Features": list(item.keys())[0], "Importance": list(item.values())[0]}
-    #     for item in cust_features_importance
-    # ])
-    
-    # product_df = pd.DataFrame([
-    #     {"Features": list(item.keys())[0], "Importance": list(item.values())[0]}
-    #     for item in product_features_importance
-    # ])
-    
-    # cust_file_path = os.path.join(model_dir, "cust_features_importance.csv")
-    # product_file_path = os.path.join(model_dir, "product_features_importance.csv")
-    # cust_df.to_csv(cust_file_path, index=False, encoding='utf-8')
-    # product_df.to_csv(product_file_path, index=False, encoding='utf-8')
-    
-    get_features_importance_by_model(model_dir, "ori_model")
-    get_features_importance_by_model(model_dir, "pos_model")
-    get_features_importance_by_model(model_dir, "shopping_model")
-    
-def get_features_importance_by_model(model_dir, modelname):
-    model = GbdtLrModel(os.path.join(model_dir, f"{modelname}.pkl"))
-    cust_features_importance, product_features_importance, order_features_importance = model.generate_feats_importance()
-    
-    # 将字典列表转换为 DataFrame
-    cust_df = pd.DataFrame([
-        {"Features": list(item.keys())[0], "Importance": list(item.values())[0]}
-        for item in cust_features_importance
-    ])
-    
-    product_df = pd.DataFrame([
-        {"Features": list(item.keys())[0], "Importance": list(item.values())[0]}
-        for item in product_features_importance
-    ])
-    
-    order_df = pd.DataFrame([
-        {"Features": list(item.keys())[0], "Importance": list(item.values())[0]}
-        for item in order_features_importance
-    ])
-    
-    importance_dir = os.path.join(model_dir, "importance")
-    if modelname == 'ori_model':
-        importance_dir = os.path.join(importance_dir, "ori")
-    elif modelname == 'pos_model':
-        importance_dir = os.path.join(importance_dir, "pos")
-    elif modelname == 'shopping_model':
-        importance_dir = os.path.join(importance_dir, "shopping")
-    
-    if not os.path.exists(importance_dir):
-        os.makedirs(importance_dir)
-        
-    cust_file_path = os.path.join(importance_dir, "cust_features_importance.csv")
-    product_file_path = os.path.join(importance_dir, "product_features_importance.csv")
-    order_file_path = os.path.join(importance_dir, "order_features_importance.csv")
-    
-    cust_df.to_csv(cust_file_path, index=False, encoding='utf-8')
-    product_df.to_csv(product_file_path, index=False, encoding='utf-8')
-    order_df.to_csv(order_file_path, index=False, encoding='utf-8')
-        
-def run():
-    parser = argparse.ArgumentParser()
-    
-    parser.add_argument("--run_train", action='store_true')
-    parser.add_argument("--recommend", action='store_true')
-    parser.add_argument("--importance", action='store_true')
-    
-    parser.add_argument("--train_data_dir", type=str, default="./data/gbdt")
-    parser.add_argument("--model_path", type=str, default="./models/rank/weights")
-    parser.add_argument("--model_name", type=str, default='model.pkl')
-    parser.add_argument("--last_n", type=int, default=200)
-    
-    parser.add_argument("--city_uuid", type=str, default='00000000000000000000000011445301')
-    parser.add_argument("--product_id", type=str, default='110102')
-    
-    
-    args = parser.parse_args()
-    
-    if args.run_train:
-        train(args)
-        
-    if args.recommend:
-        recommend_by_product(args)
-        
-    if args.importance:
-        get_features_importance(args)
-        
-if __name__ == "__main__":
-    run()

+ 0 - 97
gbdt_lr_api.py

@@ -1,97 +0,0 @@
-import argparse
-import os
-from models.rank import DataProcess, Trainer, GbdtLrModel
-import time
-import pandas as pd
-from fastapi import FastAPI, HTTPException
-from pydantic import BaseModel
-
-app = FastAPI()
-
-model_path = "./models/rank/weights"
-model_name = "model.pkl"
-
-# 定义请求体
-class TrainRequest(BaseModel):
-    city_uuid: str
-    train_data_path: str = "./models/rank/train_data/gbdt_data.csv"
-    model_path: str = model_path
-    model_name: str = model_name
-    
-class RecommendRequest(BaseModel):
-    city_uuid: str
-    product_id: str
-    last_n: int = 200
-    model_path: str = model_path
-    model_name: str = model_name
-    
-class ImportanceRequest(BaseModel):
-    city_uuid: str
-    model_path: str = model_path
-    model_name: str = model_name
-    
-@app.post("/train")
-def train(request: TrainRequest):
-    model_dir = os.path.join(request.model_path, request.city_uuid)
-    train_data_dir = os.path.dirname(request.train_data_path)
-    if not os.path.exists(model_dir):
-        os.makedirs(model_dir)
-    
-    if not os.path.exists(train_data_dir):
-        os.makedirs(train_data_dir)
-        
-    # 准备数据集  
-    print("正在整合训练数据...")
-    processor = DataProcess(request.city_uuid, request.train_data_path)
-    processor.data_process()
-    print("训练数据整合完成!")
-    
-    # 进行训练
-    trainer = Trainer(request.train_data_path)
-    
-    start_time = time.time()
-    trainer.train()
-    end_time = time.time()
-    
-    training_time_hours = (end_time - start_time) / 3600
-    print(f"训练时间: {training_time_hours:.4f} 小时")
-    
-    eval_metrics = trainer.evaluate()
-    
-    # 保存模型
-    trainer.save_model(os.path.join(model_dir, request.model_name))
-    
-    # 输出评估结果
-    print("GBDT-LR Evaluation Metrics:")
-    for metric, value in eval_metrics.items():
-        print(f"{metric}: {value:.4f}")
-    
-    return {"message": "训练完成!"}
-
-@app.post("/recommend")
-def recommend(request: RecommendRequest):
-    model_dir = os.path.join(request.model_path, request.city_uuid)
-    if not os.path.exists(model_dir):
-        raise HTTPException(status_code=404, detail="暂无该城市的模型,请先进行模型训练")
-    
-    # 加载模型
-    model = GbdtLrModel(os.path.join(model_dir, request.model_name))
-    recommend_list = model.sort(request.city_uuid, request.product_id)
-    
-    return {"recommendations": recommend_list[:min(request.last_n, len(recommend_list))]}
-
-@app.post("/importance")
-def importance(request: ImportanceRequest):
-    model_dir = os.path.join(request.model_path, request.city_uuid)
-    if not os.path.exists(model_dir):
-        raise HTTPException(status_code=404, detail="暂无该城市的模型,请先进行模型训练")
-    
-    # 加载模型
-    model = GbdtLrModel(os.path.join(model_dir, request.model_name))
-    cust_features_importance, product_features_importance = model.generate_feats_importance()
-    
-    return {"cust_features_importance": cust_features_importance, "product_features_importance": product_features_importance}
-
-if __name__ == "__main__":
-    import uvicorn
-    uvicorn.run(app, host="0.0.0.0", port=8000)

+ 9 - 8
models/recall/hot_recall.py

@@ -10,15 +10,16 @@ cfgs = load_model_config()
 
 class HotRecallModel:
     def __init__(self, city_uuid):
+        self._city_uuid = city_uuid
         self._redis_db = RedisDatabaseHelper().redis
         self._dao = MySqlDao()
-        self._load_data(city_uuid)
+        self._load_data()
         self._hotkeys = cfgs["hot_recall"]["hot_keys"]
     
-    def _load_data(self, city_uuid):
+    def _load_data(self):
         """加载订单记录表"""
         print("hot_recall: 正在加载order_info...")
-        self._order_data = self._dao.load_order_data(city_uuid)
+        self._order_data = self._dao.load_order_data(self._city_uuid)
         self._order_data =self._order_data[OrderConfig.FEATURE_COLUMNS] 
         
         # 数据清洗
@@ -47,9 +48,9 @@ class HotRecallModel:
             
         return {"key":f"{hot_name}", "value":item_hot_score}
     
-    def _to_redis(self, rec_content_score, city_uuid):
+    def _to_redis(self, rec_content_score):
         hotkey_name = rec_content_score["key"]
-        rec_item_id = f"hot:{city_uuid}:{str(hotkey_name)}" # 修正 rec_item_id 拼接方式
+        rec_item_id = f"hot:{self._city_uuid}:{str(hotkey_name)}" # 修正 rec_item_id 拼接方式
         # 清空 sorted set 数据,确保不会影响后续的存储
         self._redis_db.delete(rec_item_id)
          
@@ -62,16 +63,16 @@ class HotRecallModel:
         if res:  # 只有当 res 不为空时才执行 zadd
             self._redis_db.zadd(rec_item_id, res)
     
-    def calculate_all_hot_score(self, city_uuid):
+    def calculate_all_hot_score(self):
         """
         计算所有的热度指标得分
         """
         # hot_datas = []
         for hotkey_name in tqdm(self._hotkeys, desc="hot_recall:正在计算热度分数"):
-            self._to_redis(self._calculate_hot_score(hotkey_name), city_uuid)
+            self._to_redis(self._calculate_hot_score(hotkey_name))
             
     
         
 if __name__ == "__main__":
     hot_recall = HotRecallModel("00000000000000000000000011445301")
-    hot_recall.calculate_all_hot_score("00000000000000000000000011445301")
+    hot_recall.calculate_all_hot_score()

+ 0 - 2
models/recall/itemCF/ItemCF.py

@@ -93,7 +93,5 @@ class ItemCFModel:
             redis_db.redis.zadd(redis_key, zset_data)
     
 if __name__ == "__main__":
-    score_path = "./data/itemcf/scores.csv"
-    similarity_path = "./data/itemcf/similarity.csv"
     itemcf_model = ItemCFModel()
     itemcf_model.train("00000000000000000000000011445301", n_jobs=4)

+ 19 - 16
test.py

@@ -6,24 +6,27 @@ from models.rank.data.utils import sample_data_clear
 dao = MySqlDao()
 city_uuid = "00000000000000000000000011445301"
     
-order_data = dao.load_order_data(city_uuid)
-order_data["sale_qty"] = order_data["sale_qty"].fillna(0)
-print(order_data.columns.to_list())
-order_data = order_data.infer_objects(copy=False)
+# order_data = dao.load_order_data(city_uuid)
+# order_data["sale_qty"] = order_data["sale_qty"].fillna(0)
+# print(order_data.columns.to_list())
+# order_data = order_data.infer_objects(copy=False)
         
-# 将销售量进行分组求和
-order_data = order_data.groupby(["stat_month", "cust_code", "product_code"], as_index=False)["sale_qty"].sum()
+# # 将销售量进行分组求和
+# order_data = order_data.groupby(["stat_month", "cust_code", "product_code"], as_index=False)["sale_qty"].sum()
     
-cust_data = dao.load_cust_data(city_uuid)
-cust_data = cust_data[["BB_RETAIL_CUSTOMER_CODE", "BB_RETAIL_CUSTOMER_NAME"]]
+# cust_data = dao.load_cust_data(city_uuid)
+# cust_data = cust_data[["BB_RETAIL_CUSTOMER_CODE", "BB_RETAIL_CUSTOMER_NAME"]]
 
-product_data = dao.load_product_data(city_uuid)
-product_data = product_data[ProductConfig.FEATURE_COLUMNS]
-product_data = sample_data_clear(product_data, ProductConfig)
+# product_data = dao.load_product_data(city_uuid)
+# product_data = product_data[ProductConfig.FEATURE_COLUMNS]
+# product_data = sample_data_clear(product_data, ProductConfig)
 
 
-sale_data = order_data.merge(cust_data, left_on='cust_code', right_on='BB_RETAIL_CUSTOMER_CODE', how="inner")
-sale_data = sale_data.merge(product_data, left_on='product_code', right_on='product_code', how="inner")
-sale_data = sale_data[["cust_code", "BB_RETAIL_CUSTOMER_NAME"] + ProductConfig.FEATURE_COLUMNS + ["sale_qty", "stat_month"]]
-sale_data = sale_data.rename(columns=ImportanceFeaturesMap.PRODUCT_FEATRUES_MAP)
-sale_data.to_csv("./data/sale_month.csv", index=False)
+# sale_data = order_data.merge(cust_data, left_on='cust_code', right_on='BB_RETAIL_CUSTOMER_CODE', how="inner")
+# sale_data = sale_data.merge(product_data, left_on='product_code', right_on='product_code', how="inner")
+# sale_data = sale_data[["cust_code", "BB_RETAIL_CUSTOMER_NAME"] + ProductConfig.FEATURE_COLUMNS + ["sale_qty", "stat_month"]]
+# sale_data = sale_data.rename(columns=ImportanceFeaturesMap.PRODUCT_FEATRUES_MAP)
+# sale_data.to_csv("./data/sale_month.csv", index=False)
+
+product = dao.get_product_from_order(city_uuid)
+print(len(product))

+ 91 - 0
train.py

@@ -0,0 +1,91 @@
+import argparse
+import os
+from models.rank import DataProcess, Trainer, GbdtLrModel
+from models import ItemCFModel, HotRecallModel
+import time
+import pandas as pd
+
+# train_data_path = "./moldes/rank/data/gbdt_data.csv"
+# model_path = "./models/rank/weights"
+
+def gbdtlr_train(args):
+    model_dir = os.path.join(args.model_path, args.city_uuid)
+    train_data_dir = args.train_data_dir
+    if not os.path.exists(model_dir):
+        os.makedirs(model_dir)
+    
+    if not os.path.exists(train_data_dir):
+        os.makedirs(train_data_dir)
+    
+    # Prepare the training dataset
+    print("正在整合训练数据...")
+    processor = DataProcess(args.city_uuid, args.train_data_dir)
+    processor.data_process()
+    print("训练数据整合完成!")
+    
+    # Run training
+    print("开始训练gbdt-lr模型")
+    gbdtlr_trainer(os.path.join(args.train_data_dir, "train_data.csv"), model_dir, "gbdtlr_model.pkl")
+
+def gbdtlr_trainer(train_data_path, model_dir, model_name):
+    trainer = Trainer(train_data_path)
+    
+    start_time = time.time()
+    trainer.train()
+    end_time = time.time()
+    
+    training_time_hours = (end_time - start_time) / 3600
+    print(f"训练时间: {training_time_hours:.4f} 小时")
+    
+    eval_metrics = trainer.evaluate()
+    
+    # Print evaluation metrics
+    print("GBDT-LR Evaluation Metrics:")
+    for metric, value in eval_metrics.items():
+        print(f"{metric}: {value:.4f}")
+        
+    # Save the model
+    trainer.save_model(os.path.join(model_dir, model_name))
+
+def itemCF(args):
+    itemcf_model = ItemCFModel()
+    itemcf_model.train(city_uuid=args.city_uuid, n=args.largest_n, k=args.similarity_k, top_n=args.top_n, n_jobs=args.n_jobs)
+
+def hot_recall(args):
+    hot_recall = HotRecallModel(args.city_uuid)
+    hot_recall.calculate_all_hot_score()
+
+    
+def run():
+    parser = argparse.ArgumentParser()
+    # Global arguments
+    parser.add_argument("--run_train", action='store_true')
+    
+    parser.add_argument("--city_uuid", type=str, default='00000000000000000000000011445301')
+    
+    # GBDT-LR training arguments
+    parser.add_argument("--train_data_dir", type=str, default="./data/gbdt")
+    parser.add_argument("--model_path", type=str, default="./models/rank/weights")
+    
+    # Collaborative filtering (itemCF) arguments
+    parser.add_argument("--largest_n", type=int, default=300)
+    parser.add_argument("--similarity_k", type=int, default=100)
+    parser.add_argument("--top_n", type=int, default=1500)
+    parser.add_argument("--n_jobs", type=int, default=4)
+    
+    
+    args = parser.parse_args()
+    
+    if args.run_train:
+        print("正在计算协同过滤...")
+        itemCF(args)
+        
+        print("正在计算热度召回...")
+        hot_recall(args)
+        
+        print("正在进行gbdt_lr训练...")
+        gbdtlr_train(args)
+        
+        
+if __name__ == "__main__":
+    run()
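
With run_itemcf_inference removed from app.py, recommendation results now have to be read back from Redis by whichever service consumes them. A minimal reader sketch follows, assuming a plain redis-py client and the key layouts used in the code above (fc:<product_code> for itemCF, hot:<city_uuid>:<hot_key> for hot recall); the connection settings are placeholders, and within this repo such reads would normally go through the Redis / RedisDatabaseHelper wrappers rather than a raw client.

import redis  # assumption: redis-py, configured like the project's own wrapper

# Placeholder connection settings; point these at the instance the recall jobs write to.
r = redis.Redis(host="localhost", port=6379, decode_responses=True)

def read_itemcf_recall(product_code):
    # itemCF stores one sorted set per source product under fc:<product_code>;
    # return [{shop_id: score}, ...] in descending score order, as the old app.py did.
    key = f"fc:{product_code}"
    return [{shop_id: float(score)}
            for shop_id, score in r.zrange(key, 0, -1, desc=True, withscores=True)]

def read_hot_recall(city_uuid, hot_key):
    # Hot recall stores one sorted set per hot metric under hot:<city_uuid>:<hot_key>.
    key = f"hot:{city_uuid}:{hot_key}"
    return r.zrange(key, 0, -1, desc=True, withscores=True)

if __name__ == "__main__":
    # 110111 was the default product_code in the removed app.py.
    print(read_itemcf_recall(110111))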