Przeglądaj źródła

克隆dev到master分支

Sherlock1011 1 rok temu
rodzic
commit
8c4ad157e7

+ 4 - 0
.gitignore

@@ -0,0 +1,4 @@
+.idea/
+.vscode/
+__pycache__/
+*.pyc

+ 23 - 0
Dockerfile

@@ -0,0 +1,23 @@
FROM registry.cn-hangzhou.aliyuncs.com/hexiaoshi/python:3.10

# Install timezone data, cron and an editor; pin the container to Asia/Shanghai.
RUN apt-get update && apt-get -y install tzdata cron vim && ln -fs /usr/share/zoneinfo/Asia/Shanghai /etc/localtime

ENV PATH=/usr/local/sbin:/usr/local/bin:/usr/sbin:/usr/bin:/sbin:/bin

WORKDIR /app

COPY . /app/

# Register the crontab and install the Python dependencies.
# BUG FIX: the original lines ended with "\ " (backslash + trailing space),
# which breaks the line continuation and truncates each RUN command.
RUN mv /app/crontab /etc/cron.d/crontab && chmod 0644 /etc/cron.d/crontab \
        && /usr/bin/crontab /etc/cron.d/crontab \
        && pip install --upgrade pip setuptools -i https://mirrors.aliyun.com/pypi/simple \
        && pip install -r requirements.txt -i https://mirrors.aliyun.com/pypi/simple

# Strip sources: byte-compile to .pyc next to the sources (-b), then delete
# all .py files so only compiled code ships in the image.
RUN find . | grep -E "(__pycache__|Dockerfile|\.md|\.pyc|\.pyo$)" | xargs rm -rf && python3 -m compileall -b . \
        && find . -name "*.py" | xargs rm -rf && touch /var/log/cron.log

VOLUME ["/etc/cron.d"]

# Re-register the (possibly volume-mounted) crontab, start cron, tail its log.
CMD /bin/bash -c "/usr/bin/crontab /etc/cron.d/crontab && cron && tail -f /var/log/cron.log"
+

+ 95 - 0
app.py

@@ -0,0 +1,95 @@
+import argparse
+from dao import load_order_data_from_mysql
+from dao.redis_db import Redis
+from models import HotRecallModel, UserItemScore, ItemCFModel, calculate_similarity_and_save_results
+import os
+
def run_hot_recall(order_data):
    """Run the popularity ("hot") recall algorithm over *order_data*."""
    model = HotRecallModel(order_data)
    model.calculate_all_hot_score()
    print("热度召回已完成!")
+
def run_itemcf(order_data, args):
    """Run the item-based collaborative-filtering recall pipeline.

    Steps: remove stale intermediate files, compute the product->shop
    interest scores, compute the shop similarity matrix, then train ItemCF
    (which also pushes results to Redis).

    :param order_data: order DataFrame loaded from MySQL
    :param args: parsed CLI namespace (paths and ItemCF hyper-parameters)
    """
    # BUG FIX: the original used `exists(a) and exists(b)` before removing
    # both files, so when only ONE of them existed it was never deleted and
    # a stale artifact leaked into this run.  Remove each independently.
    for stale_path in (args.interst_score_path, args.similarity_matrix_path):
        if os.path.exists(stale_path):
            os.remove(stale_path)

    # Compute the user-item (product -> shop) interest scores.
    cal_interest_scores_model = UserItemScore()
    scores = cal_interest_scores_model.score(order_data)
    scores.to_csv(args.interst_score_path, index=False, encoding="utf-8")
    print("Interest Scores cal done!")

    # Compute the shop co-occurrence / similarity matrix.
    calculate_similarity_and_save_results(order_data, args.similarity_matrix_path)
    print("Shops similarity matrix cal done!")

    # Run the ItemCF recall itself.
    itemcf_model = ItemCFModel()
    itemcf_model.train(args.interst_score_path, args.similarity_matrix_path, args.n, args.k, args.top_n, args.n_jobs)
    print("协同过滤已完成!")
+
def run_itemcf_inference(product_code):
    """Read the precomputed recommendations for *product_code* from Redis.

    Returns a list of single-entry dicts [{shop_id: score}, ...] in
    descending score order (zrange with desc=True).
    """
    redis_db = Redis()
    redis_key = f"fc:{product_code}"
    ranked = redis_db.redis.zrange(redis_key, 0, -1, withscores=True, desc=True)
    return [{shop_id: float(score)} for shop_id, score in ranked]
+
def run():
    """CLI entry point: parse arguments and dispatch the requested job(s)."""
    parser = argparse.ArgumentParser()

    # Run-mode flags, checked in priority order below.
    parser.add_argument("--run_all", action='store_true')
    parser.add_argument("--run_hot", action='store_true')
    parser.add_argument("--run_itemcf", action='store_true')
    parser.add_argument("--run_itemcf_inference", action='store_true')

    # ItemCF training configuration.
    parser.add_argument("--matrix_path", type=str, default="./models/recall/itemCF/matrix")
    parser.add_argument("--n", type=int, default=100)
    parser.add_argument("--k", type=int, default=10)
    parser.add_argument("--top_n", type=int, default=200, help='default n * k')
    parser.add_argument("--n_jobs", type=int, default=4)

    # ItemCF inference configuration.
    parser.add_argument("--product_code", type=int, default=110111)

    args = parser.parse_args()

    # Derive the intermediate file locations.  exist_ok=True avoids the
    # check-then-create race of the original exists()/makedirs() pair.
    os.makedirs(args.matrix_path, exist_ok=True)
    args.interst_score_path = os.path.join(args.matrix_path, "score.csv")
    args.similarity_matrix_path = os.path.join(args.matrix_path, "similarity.csv")

    if args.run_all:
        order_data = load_order_data_from_mysql()
        run_hot_recall(order_data)
        run_itemcf(order_data, args)

    elif args.run_hot:
        run_hot_recall(load_order_data_from_mysql())

    elif args.run_itemcf:
        run_itemcf(load_order_data_from_mysql(), args)

    elif args.run_itemcf_inference:
        recomments = run_itemcf_inference(args.product_code)
        print(recomments)

if __name__ == "__main__":
    run()

+ 7 - 0
config/__init__.py

@@ -0,0 +1,7 @@
+#!/usr/bin/env python3
+# -*- coding:utf-8 -*-
+from config.config import load_config 
+
+__all__ = [
+    "load_config"
+]

+ 6 - 0
config/config.py

@@ -0,0 +1,6 @@
+import yaml
+
def load_config(path='./config/database_config.yaml'):
    """Load the database configuration from a YAML file.

    :param path: YAML file to read; defaults to the project config so all
        existing ``load_config()`` callers are unaffected.
    :return: parsed configuration (dict with ``mysql`` and ``redis`` keys)
    """
    # Explicit encoding avoids depending on the container locale.
    with open(path, encoding='utf-8') as file:
        return yaml.safe_load(file)

+ 12 - 0
config/database_config.yaml

@@ -0,0 +1,12 @@
+mysql:
+  host: 'rm-t4n6rz18y4t5x47y70o.mysql.singapore.rds.aliyuncs.com'
+  port: 3036
+  db: 'brand_cultivation'
+  user: 'BrandCultivation'
+  passwd: '8BfWBc18NBXl#CMd'
+
+redis:
+  host: 'r-t4nb4n9i8je7u6ogk1pd.redis.singapore.rds.aliyuncs.com'
+  port: 5000
+  db: 10
+  passwd: 'gHmNkVBd88sZybj'

+ 4 - 0
crontab

@@ -0,0 +1,4 @@
+# START CRON JOB
+11 13 * * * cd /app && /usr/local/bin/python app.pyc --run_all >> /var/log/app.log 2>&1
+#* * * * * echo "asdf" >> /var/log/test.log
+# END CRON JOB

+ 9 - 0
dao/__init__.py

@@ -0,0 +1,9 @@
+#!/usr/bin/env python3
+# -*- coding:utf-8 -*-
+from dao.mysql_client import Mysql
+from dao.dao import load_order_data_from_mysql
+
+__all__ = [
+    "Mysql",
+    "load_order_data_from_mysql"
+]

+ 17 - 0
dao/dao.py

@@ -0,0 +1,17 @@
+from dao import Mysql
+
def load_order_data_from_mysql():
    """Load the order table from MySQL and return a cleaned DataFrame.

    Cleaning: drop the stat_month column (when present), drop duplicate
    rows and fill missing values with 0.
    """
    client = Mysql()
    tablename = "tads_brandcul_cust_order"
    query_text = "*"

    df = client.load_data(tablename, query_text)

    # BUG FIX: the original unconditional drop raised KeyError whenever the
    # snapshot did not contain stat_month; errors="ignore" makes it a no-op.
    df.drop(columns="stat_month", errors="ignore", inplace=True)
    print(df.columns)

    # De-duplicate and fill missing values.
    df.drop_duplicates(inplace=True)
    df.fillna(0, inplace=True)
    return df

+ 88 - 0
dao/mysql_client.py

@@ -0,0 +1,88 @@
+#!/usr/bin/env python3
+# -*- coding:utf-8 -*-
+import os
+from sqlalchemy import create_engine, text
+from sqlalchemy.dialects.mysql import pymysql
+from sqlalchemy.orm import sessionmaker
+from sqlalchemy.ext.declarative import declarative_base
+from config import load_config
+import pandas as pd
+import sys
+
+cfgs = load_config()
+
class Mysql(object):
    """Thin SQLAlchemy wrapper around the project MySQL database.

    Connection parameters come from config/database_config.yaml (via
    load_config()); reads are paginated to bound per-request memory.
    """

    def __init__(self):
        self._host = cfgs['mysql']['host']
        self._port = cfgs['mysql']['port']
        self._user = cfgs['mysql']['user']
        self._passwd = cfgs['mysql']['passwd']
        self._dbname = cfgs['mysql']['db']

        # Engine backed by a bounded connection pool.
        self.engine = create_engine(
            self._connect(self._host, self._port, self._user, self._passwd, self._dbname),
            pool_size=10,      # base pool size
            max_overflow=20,   # extra connections allowed beyond the pool
            pool_recycle=3600  # recycle connections after an hour
        )
        self._DBSession = sessionmaker(bind=self.engine)

    def _connect(self, host, port, user, pwd, db):
        """Build the SQLAlchemy connection URL (pymysql driver)."""
        # Plain string formatting cannot fail, so the original try/except
        # around the concatenation was dead code.
        return f"mysql+pymysql://{user}:{pwd}@{host}:{port}/{db}"

    def create_session(self):
        """Create and return a new database session."""
        return self._DBSession()

    def fetch_data_with_pagination(self, tablename, query_text, page=1, page_size=1000):
        """Fetch one page (1-based *page*) of rows as a DataFrame."""
        offset = (page - 1) * page_size  # rows to skip
        # NOTE(review): tablename/query_text are interpolated into the SQL
        # text — callers must only pass trusted identifiers.
        query = text(f"select {query_text} from {tablename} LIMIT :limit OFFSET :offset")
        with self.create_session() as session:
            results = session.execute(query, {"limit": page_size, "offset": offset}).fetchall()
            return pd.DataFrame(results)

    def load_data(self, tablename, query_text, page=1, page_size=1000):
        """Fetch every page of *tablename* and concatenate into one DataFrame.

        Returns None when a query fails part-way.  The engine is always
        disposed afterwards.
        """
        total_df = pd.DataFrame()
        try:
            while True:
                df = self.fetch_data_with_pagination(tablename, query_text, page, page_size)
                if df.empty:
                    break
                total_df = pd.concat([total_df, df], ignore_index=True)
                print(f"Page {page}: Retrieved {len(df)} rows, Total rows so far: {len(total_df)}")
                page += 1  # next page
        except Exception as e:
            print(f"Error: {e}")
            # BUG FIX: the original had `return total_df` inside `finally`,
            # which silently overrode this `return None` (and would have
            # swallowed any exception escaping the try block).
            return None
        finally:
            self.closed()
        return total_df

    def closed(self):
        """Dispose the engine, closing all pooled connections."""
        self.engine.dispose()
+
+
if __name__ == '__main__':
    # Ad-hoc smoke test: page through the whole mock_order table.
    db = Mysql()

    # Pagination parameters for the full scan.
    first_page = 1
    rows_per_page = 1000

    db.load_data("mock_order", '*', first_page, rows_per_page)

+ 40 - 0
dao/redis_db.py

@@ -0,0 +1,40 @@
+#!/usr/bin/env python3
+# -*- coding:utf-8 -*-
+import redis
+from config import load_config
+
+cfgs = load_config()
+
+
class Redis(object):
    """Holds a StrictRedis client configured from the project YAML config."""

    def __init__(self):
        redis_cfg = cfgs['redis']
        # decode_responses=True returns str instead of bytes from Redis.
        self.redis = redis.StrictRedis(
            host=redis_cfg['host'],
            port=redis_cfg['port'],
            password=redis_cfg['passwd'],
            db=redis_cfg['db'],
            decode_responses=True,
        )
+
+
if __name__ == '__main__':
    import random

    # Ad-hoc seeding script: fill the hotkey config set with random scores.
    client = Redis().redis

    # Target sorted set and the hot-metric member names.
    zset_key = 'configs:hotkeys'
    data_list = ['ORDER_FULLORDR_RATE', 'MONTH6_SALE_QTY_YOY', 'MONTH6_SALE_QTY_MOM', 'MONTH6_SALE_QTY']

    # Start from an empty sorted set.
    client.delete(zset_key)

    for hotkey in data_list:
        # Random score in [80, 100], 4 decimal places.
        client.zadd(zset_key, {hotkey: round(random.uniform(80, 100), 4)})

    # # Read the sorted set back and print it (kept for debugging):
    # result = client.zrange(zset_key, 0, -1, withscores=True)
    # for item, score in result:
    #     print(f"元素: {item}, 分数: {score}")

+ 0 - 0
data/Readme.md


+ 12 - 0
models/__init__.py

@@ -0,0 +1,12 @@
+#!/usr/bin/env python3
+# -*- coding:utf-8 -*-
+from models.recall.hot_recall import HotRecallModel
+from models.recall.itemCF.calculate_similarity_matrix import calculate_similarity_and_save_results
+from models.recall.itemCF.user_item_score import UserItemScore
+from models.recall.itemCF.ItemCF import ItemCFModel
+__all__ = [
+    "HotRecallModel",
+    "UserItemScore",
+    "calculate_similarity_and_save_results",
+    "ItemCFModel"
+]

+ 2 - 0
models/rank/__init__.py

@@ -0,0 +1,2 @@
+#!/usr/bin/env python3
+# -*- coding:utf-8 -*-

+ 2 - 0
models/rank/gbdt_lr.py

@@ -0,0 +1,2 @@
+#!/usr/bin/env python3
+# -*- coding:utf-8 -*-

+ 88 - 0
models/recall/hot_recall.py

@@ -0,0 +1,88 @@
+#!/usr/bin/env python
+# -*- encoding: utf-8 -*-
+'''
+@filename     : hot_recall.py
+@description     : 热度召回算法   
+@time     : 2025/01/21/00
+@author     : Sherlock1011 & Min1027
+@Version     : 1.0
+'''
+import pandas as pd
+from dao.redis_db import Redis
+from dao.mysql_client import Mysql
+from tqdm import tqdm
+
class HotRecallModel:
    """Popularity ("hot") recall: score shops per hot metric, store in Redis.

    For each metric configured in the Redis sorted set ``configs:hotkeys``,
    the model averages the metric per shop, rescales so the top shop gets
    100, and writes the result to the sorted set ``hot:<metric>``.
    """

    def __init__(self, order_data):
        # order_data must contain BB_RETAIL_CUSTOMER_CODE plus one numeric
        # column per configured hot metric.
        self._redis_db = Redis()
        self._hotkeys = self.get_hotkeys()
        self._order_data = order_data

    def get_hotkeys(self):
        """Return the hot-metric names configured in Redis."""
        info = self._redis_db.redis.zrange("configs:hotkeys", 0, -1, withscores=True)
        return [item for item, _ in info]

    def _load_data_from_dataset(self):
        """Load and clean the mock order table from MySQL.

        Not used by the main flow (app.py passes the data in); kept for
        ad-hoc/standalone runs.
        """
        client = Mysql()
        df = client.load_data("mock_order", "*")

        # De-duplicate and fill missing values.
        df.drop_duplicates(inplace=True)
        df.fillna(0, inplace=True)
        return df

    def _calculate_hot_score(self, hot_name):
        """Score every shop on one hot metric.

        :param hot_name: column name of the hot metric
        :return: {"key": hot_name, "value": [{shop_code: score}, ...]} with
            scores scaled so the top shop gets 100, in descending order
        """
        means = self._order_data.groupby("BB_RETAIL_CUSTOMER_CODE")[hot_name].mean().reset_index()
        ranked = means.sort_values(by=hot_name, ascending=False).reset_index(drop=True)
        if ranked.empty:
            # BUG FIX: previously an empty frame raised KeyError on .loc[0].
            return {"key": f"{hot_name}", "value": []}

        # mock热度召回最大分数 -> normalisation reference is the top shop's mean.
        max_score = 1.0
        top_value = ranked.loc[0, hot_name] / max_score

        item_hot_score = []
        for shop_code, value in zip(ranked["BB_RETAIL_CUSTOMER_CODE"], ranked[hot_name]):
            # BUG FIX: guard the division — a zero top value previously
            # raised ZeroDivisionError for every row.
            score = (value / top_value) * 100 if top_value else 0.0
            item_hot_score.append({shop_code: score})
        return {"key": f"{hot_name}", "value": item_hot_score}

    def calculate_all_hot_score(self):
        """Compute and persist the scores for every configured hot metric."""
        for hotkey_name in tqdm(self._hotkeys, desc="hot_recall:正在计算热度分数"):
            self.to_redis(self._calculate_hot_score(hotkey_name))

    def to_redis(self, rec_content_score):
        """Write one metric's scores to the sorted set ``hot:<metric>``."""
        hotkey_name = rec_content_score["key"]
        rec_item_id = "hot:" + str(hotkey_name)
        res = {}

        # "value" is a list of single-entry {shop_code: score} dicts.
        for item in rec_content_score["value"]:
            for content, score in item.items():
                res[content] = float(score)  # redis scores must be numeric

        if res:  # zadd with an empty mapping raises in redis-py
            self._redis_db.redis.zadd(rec_item_id, res)
+
+
if __name__ == "__main__":
    # BUG FIX: the original called HotRecallModel() with no argument even
    # though __init__ requires order_data (TypeError).  Load and clean the
    # mock order table first, the same way dao.dao does.
    client = Mysql()
    order_data = client.load_data("mock_order", "*")
    order_data.drop_duplicates(inplace=True)
    order_data.fillna(0, inplace=True)

    model = HotRecallModel(order_data)
    model.calculate_all_hot_score()
    # joblib.dump(model, "hot_recall.model")

+ 120 - 0
models/recall/item2vec.py

@@ -0,0 +1,120 @@
+#!/usr/bin/env python3
+# -*- coding:utf-8 -*-
+import gensim
+from dao.mysql_client import Mysql
+
class Item2Vec(object):
    """Wrapper that opens a MySQL session for item2vec jobs."""

    def __init__(self):
        db = Mysql()
        # Keep an open session for later queries.
        self.session = db.create_session()
def load_item_sequences_from_mysql():
    """Yield (user_id, [item, ...]) pairs from the item_sequences table.

    NOTE(review): `mysql.connector` is never imported in this module, so
    calling this raises NameError as written; the connection values are
    placeholders — fix both before use.
    """
    try:
        conn = mysql.connector.connect(
            host='localhost',
            user='your_username',
            password='your_password',
            database='your_database'
        )
        cursor = conn.cursor()
        query = "SELECT user_id, sequence FROM item_sequences"
        cursor.execute(query)
        for row in cursor:
            user_id, sequence_str = row
            # Sequences are stored as comma-separated item strings.
            sequence = sequence_str.split(',')
            yield user_id, sequence
        cursor.close()
        conn.close()
    except mysql.connector.Error as err:
        print(f"数据库连接或查询出错: {err}")
+
+
def load_item_attributes_from_mysql():
    """Return a dict mapping item -> [attribute, ...] from item_attributes.

    NOTE(review): `mysql.connector` is never imported in this module, so
    calling this raises NameError as written; the connection values are
    placeholders — fix both before use.
    """
    try:
        conn = mysql.connector.connect(
            host='localhost',
            user='your_username',
            password='your_password',
            database='your_database'
        )
        cursor = conn.cursor()
        query = "SELECT item, attributes FROM item_attributes"
        cursor.execute(query)
        item_attributes = {}
        for item, attributes_str in cursor:
            # Attributes are stored as comma-separated strings.
            attributes = attributes_str.split(',')
            item_attributes[item] = attributes
        cursor.close()
        conn.close()
        return item_attributes
    except mysql.connector.Error as err:
        print(f"数据库连接或查询出错: {err}")
+
+
def load_user_attributes_from_mysql():
    """Yield (user_id, [attribute, ...]) pairs from user_attributes,
    keeping only truthy attribute values.

    NOTE(review): `mysql.connector` is never imported in this module, so
    calling this raises NameError as written; the connection values are
    placeholders — fix both before use.
    """
    try:
        conn = mysql.connector.connect(
            host='localhost',
            user='your_username',
            password='your_password',
            database='your_database'
        )
        cursor = conn.cursor()
        query = "SELECT user_id, taste, cigarette_length, cigarette_type, packaging_color FROM user_attributes"
        cursor.execute(query)
        for row in cursor:
            user_id, taste, cigarette_length, cigarette_type, packaging_color = row
            # Drop empty/None attribute values.
            user_attrs = [attr for attr in [taste, cigarette_length, cigarette_type, packaging_color] if attr]
            yield user_id, user_attrs
        cursor.close()
        conn.close()
    except mysql.connector.Error as err:
        print(f"数据库连接或查询出错: {err}")
+
+
def combine_user_item_attributes(item_sequences, item_attributes, user_attributes=None):
    """Interleave user attributes, items and item attributes into sentences.

    :param item_sequences: iterable of (user_id, [item, ...])
    :param item_attributes: mapping item -> [attribute, ...]
    :param user_attributes: optional mapping user_id -> [attribute, ...].
        When None (the default, preserving the original behavior) the user
        attributes are loaded from MySQL.
    :return: generator of combined token lists, one per input sequence
    """
    if user_attributes is None:
        user_attributes = {user_id: attrs for user_id, attrs in load_user_attributes_from_mysql()}
    for user_id, sequence in item_sequences:
        # Start each sentence with the user's own attributes.
        combined_sequence = list(user_attributes.get(user_id, []))
        for item in sequence:
            combined_sequence.append(item)
            combined_sequence.extend(item_attributes.get(item, []))
        yield combined_sequence
+
+
def train_item2vec(item_sequences, vector_size=100, window=5, min_count=10, workers=4):
    """Train a Word2Vec model over the combined item/attribute sentences."""
    return gensim.models.Word2Vec(
        sentences=item_sequences,
        vector_size=vector_size,
        window=window,
        min_count=min_count,
        workers=workers,
    )
+
+
def get_item_vector(item, model):
    """Return the embedding vector for *item*, or None when the item is
    not in the model's vocabulary."""
    try:
        vector = model.wv[item]
    except KeyError:
        print(f"物品 {item} 未在模型中找到。")
        return None
    return vector
+
+
def find_similar_items(item, model, topn=5):
    """Return up to *topn* (item, score) pairs most similar to *item*,
    with attribute/user pseudo-tokens filtered out; None when unknown."""
    try:
        neighbours = model.wv.most_similar(item, topn=topn)
    except KeyError:
        print(f"物品 {item} 未在模型中找到。")
        return None
    # Drop attribute and user tokens mixed into the training sentences.
    return [(candidate, score)
            for candidate, score in neighbours
            if not candidate.startswith(('attr', 'user_'))]
+
+
if __name__ == "__main__":
    # End-to-end demo: load sequences/attributes, train item2vec, query it.
    # NOTE(review): the load_* helpers reference an un-imported
    # `mysql.connector` and placeholder credentials, so this block cannot
    # run as-is — fix the import and credentials first.
    item_sequences = load_item_sequences_from_mysql()
    item_attributes = load_item_attributes_from_mysql()
    combined_sequences = combine_user_item_attributes(item_sequences, item_attributes)
    item2vec_model = train_item2vec(combined_sequences)
    item_vector = get_item_vector('item1', item2vec_model)
    if item_vector is not None:
        print(f"物品 'item1' 的向量表示: {item_vector}")
    similar_items = find_similar_items('item1', item2vec_model, topn=3)
    if similar_items is not None:
        print(f"与物品 'item1' 最相似的 3 个物品: {similar_items}")

+ 96 - 0
models/recall/itemCF/ItemCF.py

@@ -0,0 +1,96 @@
+from dao.redis_db import Redis
+import pandas as pd
+import numpy as np
+from tqdm import tqdm
+from scipy.sparse import csr_matrix
+from joblib import Parallel, delayed
+import joblib
+
class ItemCFModel:
    """Item-based collaborative filtering over shops.

    In CF terms the "users" are product codes and the "items" are shops:
    for each product, the top shops by interest score vote for their most
    similar shops, and the weighted candidates are written to Redis sorted
    sets keyed ``fc:<product_code>``.
    """

    def __init__(self):
        # product_code -> ranked list of {shop_id: score} dicts
        self._recommendations = {}
        
    def train(self, score_path, similatity_path, n=100, k=10, top_n=100, n_jobs=4):
        """Compute recommendations for every product and push them to Redis.

        :param score_path: CSV of PRODUCT_CODE / BB_RETAIL_CUSTOMER_CODE / SCORE
        :param similatity_path: CSV of the shop similarity matrix (shops as index)
        :param n: number of top-scoring shops considered per product
        :param k: number of nearest neighbours taken per top shop
        :param top_n: number of candidates kept per product
        :param n_jobs: joblib worker count
        """
        self._score_df = pd.read_csv(score_path)
        self._similarity_df = pd.read_csv(similatity_path, index_col=0)
        # Sparse view of the similarity matrix plus shop<->row-index maps.
        self._similarity_matrix = csr_matrix(self._similarity_df.values)
        self._shop_index = {shop: idx for idx, shop in enumerate(self._similarity_df.index)}
        self._index_shop = {idx: shop for idx, shop in enumerate(self._similarity_df.index)}
        
        def process_product(product_code, scores):
            # Take the n highest-scoring shops for this product.
            top_n_shops = scores.nlargest(n, "SCORE")["BB_RETAIL_CUSTOMER_CODE"].values
            top_n_indices = [self._shop_index[shop] for shop in top_n_shops]
            
            # For each top shop, find its k most similar shops.
            similar_shops = {}
            for shop_idx in top_n_indices:
                similarities = self._similarity_matrix[shop_idx].toarray().flatten()
                # Take k+1 so the shop itself can be filtered out below.
                similar_indices = np.argpartition(similarities, -k-1)[-k-1:]
                similar_indices = similar_indices[similar_indices != shop_idx][:k]
                similar_shops[self._index_shop[shop_idx]] = [self._index_shop[idx] for idx in similar_indices]
            
            # Union of all neighbours = candidate shops for this product.
            candidate_shops = list(set([m for sublist in similar_shops.values() for m in sublist]))
            candidate_indices = [self._shop_index[shop] for shop in candidate_shops]
            
            # Interest score of a candidate = sum over the top shops that
            # voted for it of (top shop's score * similarity to candidate).
            interest_scores = {}
            for candidate_idx in candidate_indices:
                interest_score = 0
                for shop_idx in top_n_indices:
                    if self._index_shop[candidate_idx] in similar_shops[self._index_shop[shop_idx]]:
                        shop_score = scores[scores["BB_RETAIL_CUSTOMER_CODE"]==self._index_shop[shop_idx]]["SCORE"].values[0]
                        interest_score += shop_score * self._similarity_matrix[shop_idx, candidate_idx]
                interest_scores[self._index_shop[candidate_idx]] = interest_score
            
            # Rank candidates by score (descending) and keep the top_n.
            sorted_candidates = sorted([{shop_id: s} for shop_id, s in interest_scores.items()],
                                       key=lambda x: list(x.values())[0], reverse=True)[:top_n]
            
            return product_code, sorted_candidates
        
        # Process every product group in parallel.
        results = Parallel(n_jobs=n_jobs)(delayed(process_product)(product_code, scores) 
                                          for product_code, scores in tqdm(self._score_df.groupby("PRODUCT_CODE"), desc="train:正在计算候选得分"))
        print(len(results))
        # Keep the ranked candidates and persist them to Redis.
        self._recommendations = {product_code: sorted_candidates for product_code, sorted_candidates in results}
        self.to_redis_zset()
    
    def to_redis_zset(self):
        """
        Persist self._recommendations to Redis sorted sets (ZSET).

        The key format is ``fc:<product_code>``; members are shop ids and
        the ZSET score is the computed interest score.
        """
        redis_db = Redis()
        for product_code, recommendations in tqdm(self._recommendations.items(), desc="train:正在存储推荐结果"):
            redis_key = f"fc:{product_code}"
            zset_data = {}
            for rec in recommendations:
                for shop_id, score in rec.items():
                    try:
                        zset_data[shop_id] = float(score)
                    except ValueError as e:
                        print(f"Error converting score to float for shop_id {shop_id}: {score}")
                        raise e
            
            redis_db.redis.zadd(redis_key, zset_data)
+    
if __name__ == "__main__":
    # Ad-hoc debug entry: earlier training/inference experiments are kept
    # commented out; currently this only loads and prints the similarity
    # matrix CSV.
    score_path = "./models/recall/itemCF/matrix/score.csv"
    similarity_path = "./models/recall/itemCF/matrix/similarity.csv"
    # itemcf_model = ItemCFModel()
    # itemcf_model.train(score_path, similarity_path, n_jobs=4)
    # recommend_list = itemcf_model.inference(110111)
    # itemcf_model.to_redis_zset()
    # print(len(recommend_list))
    # print(recommend_list)
    # joblib.dump(itemcf_model, "itemCF.model")
    
    # model = joblib.load("./itemCF.model")
    # recommend_list = model.inference(110102)
    # print(len(recommend_list))
    # print(recommend_list)
    data = pd.read_csv(similarity_path, index_col=0)
    print(data)

+ 80 - 0
models/recall/itemCF/calculate_similarity_matrix.py

@@ -0,0 +1,80 @@
+from dao import load_order_data_from_mysql
+import pandas as pd
+import numpy as np
+
+from itertools import combinations
+from dao.mysql_client import Mysql
+from tqdm import tqdm
+
+
def build_co_occurence_matrix(order_data):
    """Build the symmetric shop co-occurrence matrix.

    Two shops co-occur once for every product they both appear under in
    *order_data* (one count per unordered pair within each product group).

    :return: (matrix, unique shop array, shop -> row index mapping)
    """
    unique_shops = order_data["BB_RETAIL_CUSTOMER_CODE"].unique()
    shop_count = len(unique_shops)

    # Row/column index per shop, in first-appearance order.
    shops_to_index = {shop: i for i, shop in enumerate(unique_shops)}
    co_occurrence = np.zeros((shop_count, shop_count), dtype=int)

    # List of shops per product code.
    per_product = order_data.groupby("PRODUCT_CODE")["BB_RETAIL_CUSTOMER_CODE"].apply(list)

    for shop_list in per_product:
        # Count every unordered pair once, mirrored into both triangles.
        for first, second in combinations(shop_list, 2):
            i, j = shops_to_index[first], shops_to_index[second]
            co_occurrence[i, j] += 1
            co_occurrence[j, i] += 1
    return co_occurrence, unique_shops, shops_to_index
+
def calculate_similarity_matrix(co_occurrence_matrix, order_data, shops_to_index):
    """Turn the co-occurrence counts into cosine-style similarities.

    similarity[i, j] = co_occurrence[i, j] / sqrt(count_i * count_j), where
    count_x is the number of order rows for shop x; the diagonal is forced
    to 1.
    """
    # Order rows per shop, aligned with the matrix row order.
    rows_per_shop = order_data.groupby("BB_RETAIL_CUSTOMER_CODE").size()
    counts = np.array([rows_per_shop[shop] for shop in shops_to_index.keys()])

    # Pairwise sqrt(count_i * count_j) via an outer product.
    norm = np.sqrt(np.outer(counts, counts))
    similarity = co_occurrence_matrix / norm

    # A shop is always fully similar to itself.
    np.fill_diagonal(similarity, 1.0)
    return similarity
+
def save_matrix(matrix, shops, save_path):
    """Persist *matrix* as CSV with the shop codes as both index and columns."""
    frame = pd.DataFrame(matrix, index=shops, columns=shops)
    frame.to_csv(save_path, index=True, encoding="utf-8")
+    
def calculate_similarity_and_save_results(order_data, similarity_matrix_save_path):
    """Compute the shop similarity matrix from *order_data* and save it as CSV."""
    co_matrix, shops, shop_index = build_co_occurence_matrix(order_data)
    similarity = calculate_similarity_matrix(co_matrix, order_data, shop_index)
    save_matrix(similarity, shops, similarity_matrix_save_path)
+    
if __name__ == "__main__":
    # Output locations for ad-hoc runs (only the similarity matrix is written).
    co_occurrence_save_path = "./models/recall/itemCF/matrix/occurrence.csv"
    similarity_matrix_save_path = "./models/recall/itemCF/matrix/similarity.csv"
    # Load the order data from the database.
    order_data = load_order_data_from_mysql()
    
    calculate_similarity_and_save_results(order_data, similarity_matrix_save_path)
    
    

+ 80 - 0
models/recall/itemCF/user_item_score.py

@@ -0,0 +1,80 @@
+#!/usr/bin/env python
+# -*- encoding: utf-8 -*-
+'''
+@filename     : ShopScore.py
+@description     : 品规-商户-评分矩阵:品规(用户)对商铺(物品)的评分矩阵,将结果保存在score.csv文件中
+@time     : 2025/01/31/02
+@author     : Sherlock1011 & Min1027
+@Version     : 1.0
+'''
+
+
+from dao import load_order_data_from_mysql
+from decimal import Decimal
+
+# 算法封装成一个类
class UserItemScore:
    """Compute the product -> shop interest-score matrix.

    For every product (the "user" in CF terms) each shop (the "item") gets
    a weighted score over 12 min-max-normalised sales/operations metrics,
    scaled to 0-100.
    TODO: 1. persist the result to Redis as well.
    """
    def __init__(self):
        # Metric weights (they sum to 0.90).  BUG FIX: the original built
        # each Decimal from a float literal (Decimal(0.1)), which bakes the
        # binary-float representation error into every score; Decimal from
        # a string literal is exact.
        self.weights = {
            "MONTH6_SALE_QTY": Decimal("0.1"),
            "MONTH6_SALE_AMT": Decimal("0.1"),
            "MONTH6_GROSS_PROFIT_RATE": Decimal("0.03"),
            "MONTH6_SALE_QTY_YOY": Decimal("0.1"),
            "MONTH6_SALE_QTY_MOM": Decimal("0.1"),
            "MONTH6_SALE_AMT_YOY": Decimal("0.1"),
            "MONTH6_SALE_AMT_MOM": Decimal("0.1"),
            "ORDER_FULLORDR_RATE": Decimal("0.1"),
            "NEW_PRODUCT_ORDER_QTY_OCC": Decimal("0.03"),
            "LISTING_RATE": Decimal("0.1"),
            "OUT_STOCK_DAYS": Decimal("0.02"),
            "RETAIL_PRICE_INDEX": Decimal("0.02")
        }

    def standardize_column(self, column):
        """Min-max normalise *column* to [0, 1].

        Constant columns collapse to a scalar: 0 when every value is 0,
        otherwise 1 (pandas broadcasts the scalar on assignment).
        """
        col_min, col_max = column.min(), column.max()
        if col_max == col_min:
            return 0 if col_max == 0 else 1
        return (column - col_min) / (col_max - col_min)

    def calculate_heart_per_product(self, group):
        """Normalise one product group and append its weighted SCORE column."""
        for column in self.weights.keys():
            if column == "OUT_STOCK_DAYS":
                # More out-of-stock days is worse, so invert the scale.
                group[column] = 1 - self.standardize_column(group[column])
            else:
                group[column] = self.standardize_column(group[column])
        group["SCORE"] = group.apply(
            lambda row: sum(Decimal(row[col]) * weight for col, weight in self.weights.items()) * 100, axis=1
        )
        return group

    def score(self, order_data):
        """Return the PRODUCT_CODE / BB_RETAIL_CUSTOMER_CODE / SCORE frame,
        sorted by product then descending score."""
        # Normalise and score within each product group.
        df_result = order_data.groupby("PRODUCT_CODE").apply(self.calculate_heart_per_product).reset_index(drop=True)
        df_result = df_result.sort_values(by=["PRODUCT_CODE", "SCORE"], ascending=[True, False])

        # Keep only the columns the downstream ItemCF training reads.
        return df_result[['PRODUCT_CODE', 'BB_RETAIL_CUSTOMER_CODE', 'SCORE']]
+ 
if __name__ == "__main__":
    # Ad-hoc run: compute the interest-score matrix and write it to CSV.
    item_cf_algorithm = UserItemScore()
    
    # Load the order data from MySQL.
    order_data = load_order_data_from_mysql()

    # Compute the product -> shop scores.
    scores = item_cf_algorithm.score(order_data)
    
    scores_path = "./models/recall/itemCF/matrix/score.csv"
    
    # Persist the score matrix for the ItemCF training step.
    scores.to_csv(scores_path, index=False, encoding="utf-8")
    

+ 46 - 0
requirements.txt

@@ -0,0 +1,46 @@
+asttokens==3.0.0
+async-timeout==5.0.1
+comm==0.2.2
+debugpy==1.8.12
+decorator==5.1.1
+et_xmlfile==2.0.0
+exceptiongroup==1.2.2
+executing==2.2.0
+filelock==3.17.0
+greenlet==3.1.1
+ipykernel==6.29.5
+ipython==8.31.0
+jedi==0.19.2
+joblib==1.4.2
+jupyter_client==8.6.3
+jupyter_core==5.7.2
+matplotlib-inline==0.1.7
+nest-asyncio==1.6.0
+numpy==2.2.2
+openpyxl==3.1.5
+packaging==24.2
+pandas==2.2.3
+parso==0.8.4
+pexpect==4.9.0
+platformdirs==4.3.6
+prompt_toolkit==3.0.50
+psutil==6.1.1
+ptyprocess==0.7.0
+pure_eval==0.2.3
+Pygments==2.19.1
+PyMySQL==1.1.1
+python-dateutil==2.9.0.post0
+pytz==2024.2
+PyYAML==6.0.2
+pyzmq==26.2.1
+redis==5.2.1
+scipy==1.15.1
+six==1.17.0
+SQLAlchemy==2.0.37
+stack-data==0.6.3
+tornado==6.4.2
+tqdm==4.67.1
+traitlets==5.14.3
+typing_extensions==4.12.2
+tzdata==2025.1
+wcwidth==0.2.13

+ 2 - 0
utils/__init__.py

@@ -0,0 +1,2 @@
+#!/usr/bin/env python3
+# -*- coding:utf-8 -*-

+ 83 - 0
utils/mock_data_to_database.py

@@ -0,0 +1,83 @@
+#!/usr/bin/env python
+# -*- encoding: utf-8 -*-
+'''
+@filename     : mock_data_to_database.py
+@description     : 将mock数据写入到数据库中
+@time     : 2025/01/31/00
+@author     : Sherlock1011 & Min1027
+@Version     : 1.0
+'''
+from dao.mysql_client import Mysql
+
+import pandas as pd
+from sqlalchemy import Column, Integer, VARCHAR, Float, DECIMAL
+from sqlalchemy.ext.declarative import declarative_base
+
# Database table schema definitions.
Base = declarative_base()

class MockOrder(Base):
    """ORM mapping for the mock_order table of mocked order metrics."""
    __tablename__ = "mock_order"
    id = Column(Integer, primary_key=True, autoincrement=True)  # surrogate primary key
    BB_RETAIL_CUSTOMER_CODE = Column(VARCHAR(50))  # shop (retail customer) code
    PRODUCT_CODE = Column(VARCHAR(50))  # product (SKU spec) code
    MONTH6_SALE_QTY = Column(DECIMAL(18, 6))
    MONTH6_SALE_AMT = Column(DECIMAL(18, 6))
    MONTH6_GROSS_PROFIT_RATE = Column(DECIMAL(18, 6))
    MONTH6_SALE_QTY_YOY = Column(DECIMAL(18, 6))
    MONTH6_SALE_QTY_MOM = Column(DECIMAL(18, 6))
    MONTH6_SALE_AMT_YOY = Column(DECIMAL(18, 6))
    MONTH6_SALE_AMT_MOM = Column(DECIMAL(18, 6))
    ORDER_FULLORDR_RATE = Column(DECIMAL(18, 6))
    CUSTOMER_REPURCHASE_RATE = Column(DECIMAL(18, 6))
    NEW_PRODUCT_ORDER_QTY_OCC = Column(DECIMAL(18, 6))
    LISTING_RATE = Column(DECIMAL(18, 6))
    OUT_STOCK_DAYS = Column(DECIMAL(18, 6))
    RETAIL_PRICE_INDEX = Column(DECIMAL(18, 6))
    
+    
def insert_data(db, data_path):
    """Bulk insert the Excel order sheet at *data_path* into mock_order.

    Commits on success; rolls back and logs on any failure.  The session
    is always closed.
    """
    frame = pd.read_excel(data_path)
    session = db.create_session()
    column_names = ['BB_RETAIL_CUSTOMER_CODE', 
                    'PRODUCT_CODE', 
                    'MONTH6_SALE_QTY', 
                    'MONTH6_SALE_AMT', 
                    'MONTH6_GROSS_PROFIT_RATE',
                    'MONTH6_SALE_QTY_YOY', 
                    'MONTH6_SALE_QTY_MOM', 
                    'MONTH6_SALE_AMT_YOY', 
                    'MONTH6_SALE_AMT_MOM', 
                    'ORDER_FULLORDR_RATE',
                    'CUSTOMER_REPURCHASE_RATE', 
                    'NEW_PRODUCT_ORDER_QTY_OCC', 
                    'LISTING_RATE', 
                    'OUT_STOCK_DAYS', 
                    'RETAIL_PRICE_INDEX',
                    ]
    try:
        # Rename positionally to the ORM attribute names; the sheet's
        # column order must match the list above.
        frame.columns = column_names
        session.bulk_insert_mappings(MockOrder, frame.to_dict(orient='records'))
        session.commit()
        print("数据成功插入数据库")

    except Exception as e:
        session.rollback()
        print(f"插入数据时出错: {e}")

    finally:
        session.close()
+    
+        
if __name__ == "__main__":
    data_path = "./data/order.xlsx"
    # Open the database connection.
    db = Mysql()
    

    # Create the table if it does not exist yet.
    Base.metadata.create_all(db.engine)
    insert_data(db, data_path)
    
    # Dispose the engine / pooled connections.
    db.closed()
    
    
    

+ 144 - 0
烟草模型部署文档.md

@@ -0,0 +1,144 @@
+# 烟草推荐模型部署文档
+
+## 1、配置文件说明:
+
+- ### database_config.yaml  这个是数据配置文件
+    
+
+```
+mysql:
+  host: 'rm-t4n6rz18y4t5x47y70o.mysql.singapore.rds.aliyuncs.com'
+  port: 3036
+  db: 'brand_cultivation'
+  user: 'xxxxx'
+  passwd: 'xxxxx'
+
+redis:
+  host: 'r-t4nb4n9i8je7u6ogk1pd.redis.singapore.rds.aliyuncs.com'
+  port: 5000
+  db: 10
+  passwd: 'xxxxx'
+```
+
+- ### crontab 定时任务配置文件
+    
+
+```
+# START CRON JOB
+1 2 * * * python /app/app.pyc --run_all
+# END CRON JOB
+```
+
+ 
+
+## 2、模型启动配置说明:
+
+### app.py
+
+```
+    parser.add_argument("--run_all", action='store_true')
+    parser.add_argument("--run_hot", action='store_true')
+    parser.add_argument("--run_itemcf", action='store_true')
+    parser.add_argument("--run_itemcf_inference", action='store_true')
+```
+
+### 总共有4种启动模式分别是:
+
+1. 启动热度召回和协同过滤
+2. 启动热度召回
+3. 启动协同过滤
+4. 启动协同过滤推理
+
+## 3、模型docker运行配置说明:
+
+### docker镜像是:registry.cn-hangzhou.aliyuncs.com/hexiaoshi/brandcultivation:0.0.1
+
+```yaml
+docker run --name BrandCultivation -d -v /export/brandcultivation/crontab:/etc/cron.d/crontab -v /export/brandcultivation/database_config.yaml:/app/config/database_config.yaml  registry.cn-hangzhou.aliyuncs.com/hexiaoshi/brandcultivation:0.0.1
+```
+
+## 4、模型kubernetes运行配置说明
+
+yaml文件如下:
+
+```yaml
+apiVersion: apps/v1
+kind: Deployment
+metadata:
+  name: brandcultivation
+  namespace: default
+  labels:
+    app: brandcultivation
+spec:
+  selector:
+    matchLabels:
+      app: brandcultivation
+  replicas: 1
+  strategy:
+    rollingUpdate:
+      maxSurge: 25%
+      maxUnavailable: 25%
+    type: RollingUpdate
+  template:
+    metadata:
+      labels:
+        app: brandcultivation
+    spec:
+      containers:
+      - name: brandcultivation
+        image: registry.cn-hangzhou.aliyuncs.com/hexiaoshi/brandcultivation:0.0.1
+        imagePullPolicy: IfNotPresent
+        resources:
+          requests:
+            cpu: 4000m
+            memory: 4096Mi
+            ephemeral-storage: 20Gi             
+          limits:
+            cpu: 4000m
+            memory: 4096Mi
+            ephemeral-storage: 20Gi            
+        ports:
+        - containerPort:  80
+          name: brandcultivation
+        volumeMounts:
+        - name: localtime
+          mountPath: /etc/localtime
+        - name: config
+          mountPath: /app/config/database_config.yaml
+          subPath: database_config.yaml
+        - name: config
+          mountPath: /etc/cron.d/crontab
+          subPath: crontab          
+      volumes:
+      - name: localtime
+        hostPath:
+          path: /usr/share/zoneinfo/Asia/Shanghai
+      - name: config
+        configMap:
+          name: brandcultivation
+      restartPolicy: Always
+---
+kind: ConfigMap
+apiVersion: v1
+metadata:
+  name: brandcultivation
+  namespace: default
+data:
+  database_config.yaml: |
+    mysql:
+      host: 'rm-t4n6rz18y4t5x47y70o.mysql.singapore.rds.aliyuncs.com'
+      port: 3036
+      db: 'brand_cultivation'
+      user: 'BrandCultivation'
+      passwd: '8BfWBc18NBXl#CMd'
+
+    redis:
+      host: 'r-t4nb4n9i8je7u6ogk1pd.redis.singapore.rds.aliyuncs.com'
+      port: 5000
+      db: 10
+      passwd: 'gHmNkVBd88sZybj'
+  crontab: |
+    # START CRON JOB
+    1 2 * * * python /app/app.pyc
+    # END CRON JOB
+
+```