Bladeren bron

完成数据库构建即读取操作

yangzeyu 3 maanden geleden
bovenliggende
commit
b3e5f07ab7
9 gewijzigde bestanden met toevoegingen van 435 en 0 verwijderingen
  1. 4 0
      .gitignore
  2. 5 0
      config/__init__.py
  3. 6 0
      config/config.py
  4. 6 0
      config/database_config.yaml
  5. 5 0
      database/__init__.py
  6. 65 0
      database/dao/mysql_dao.py
  7. 191 0
      database/db/mysql.py
  8. 118 0
      database/feature.py
  9. 35 0
      tets.py

+ 4 - 0
.gitignore

@@ -0,0 +1,4 @@
+.idea/
+.vscode/
+__pycache__/
+*.pyc

+ 5 - 0
config/__init__.py

@@ -0,0 +1,5 @@
+from config.config import laod_database_config
+
+__all__ = [
+    "laod_database_config"
+]

+ 6 - 0
config/config.py

@@ -0,0 +1,6 @@
+import yaml
+
+def laod_database_config():
+    with open('./config/database_config.yaml', encoding='utf-8') as file:
+        config = yaml.safe_load(file)
+    return config

+ 6 - 0
config/database_config.yaml

@@ -0,0 +1,6 @@
+mysql:
+  host: 'rm-t4n6rz18y4t5x47y70o.mysql.singapore.rds.aliyuncs.com'
+  port: 3036
+  db: 'brand_cultivation'
+  user: 'BrandCultivation'
+  passwd: '8BfWBc18NBXl#CMd'

+ 5 - 0
database/__init__.py

@@ -0,0 +1,5 @@
+from .db.mysql import MySqlDatabaseHelper
+
+__all__ = [
+    "MySqlDatabaseHelper"
+]

+ 65 - 0
database/dao/mysql_dao.py

@@ -0,0 +1,65 @@
+from database import  MySqlDatabaseHelper
+from sqlalchemy import text
+import pandas as pd
+
+class MySqlDao:
+    _instance = None
+    
+    def __new__(cls):
+        if not cls._instance:
+            cls._instance = super(MySqlDao, cls).__new__(cls)
+            cls._instance._initialized = False
+        return cls._instance
+    
+    def __init__(self):
+        if self._initialized:
+            return
+        
+        self.db_helper = MySqlDatabaseHelper()
+        
+        self._cust_table_name = "tads_brandcul_retail_cust_label"              # 商户信息表
+        self._analysis_table_name = "tads_brandcul_analysis_index"             # 指标分析表
+        
+        self.cust_table_dao = self.CustTableDao(self.db_helper, self._cust_table_name)
+        
+        self._initialized = True
+        
+    class CustTableDao:
+        """商户数据操作类"""
+        def __init__(self, db_helper: MySqlDatabaseHelper, tablename):
+            self.db_helper = db_helper
+            self.tablename = tablename
+            
+        def load_data(self, features, corp_uuid):
+            """读取商户数据"""
+            features_column = ",".join(features)
+            
+            query = f"SELECT {features_column} FROM {self.tablename} WHERE corp_uuid = :corp_uuid"
+            params = {"corp_uuid": corp_uuid}
+            
+            data = self.db_helper.load_data_with_page(query, params)
+            
+            return data
+        
+        def get_column_unique_value(self, column, corp_uuid):
+            """获取指定列的唯一值列表"""
+            query = f"SELECT {column} FROM {self.tablename} WHERE corp_uuid = :corp_uuid"
+            params = {"corp_uuid", corp_uuid}
+            
+            data = self.db_helper.load_data_with_page(query, params)
+            
+            return data
+            
+            
+            
+        
+        
+if __name__ == '__main__':
+    features = ["cust_code", "cust_name", "busi_place_area_section"]
+    
+    dao = MySqlDao()
+    corp_uuid = "00000000000000000000000011440101"
+    
+    data = dao.cust_table_dao.get_column_unique_value("busi_place_area_section", corp_uuid)
+    print(data)
+    

+ 191 - 0
database/db/mysql.py

@@ -0,0 +1,191 @@
+from config import laod_database_config
+import pandas as pd
+import re
+from sqlalchemy import create_engine, text
+from sqlalchemy.orm import sessionmaker
+from sqlalchemy.exc import SQLAlchemyError
+from tqdm import tqdm
+
+cfgs = laod_database_config()
+
+def replace_select_content(sql):
+    """
+    将SELECT和FROM之间的内容替换为SELECT COUNT(原内容) FROM
+    """
+    pattern = r'SELECT\s+(.*?)\s+FROM'
+    
+    def replace_func(match):
+        content = match.group(1)  # 获取SELECT和FROM之间的内容
+        # 如果是DISTINCT或ALL等特殊关键字,需要特殊处理
+        if re.match(r'^(DISTINCT|ALL)\s+', content, re.IGNORECASE):
+            # 保留DISTINCT/ALL关键字
+            return f'SELECT COUNT({content}) FROM'
+        else:
+            return f'SELECT COUNT(*) FROM'
+    
+    # 使用re.sub进行替换,re.IGNORECASE忽略大小写
+    result = re.sub(pattern, replace_func, sql, flags=re.IGNORECASE)
+    return result
+
+class MySqlDatabaseHelper:
+    _instance = None
+    
+    def __new__(cls):
+        if not cls._instance:
+            cls._instance = super(MySqlDatabaseHelper, cls).__new__(cls)
+            cls._instance._initialized = False
+        return cls._instance
+        
+    def __init__(self):
+        if self._initialized:
+            return
+        
+        self._host = cfgs['mysql']['host']
+        self._port = cfgs['mysql']['port']
+        self._user = cfgs['mysql']['user']
+        self._passwd = cfgs['mysql']['passwd']
+        self._dbname = cfgs['mysql']['db']
+        
+        self.connect_database()
+        self._initialized = True
+        
+    def connect_database(self):
+        # 创建数据库连接
+        try:
+            conn = "mysql+pymysql://" + self._user + ":" + self._passwd + "@" + self._host + ":" + str(self._port) + "/" + self._dbname
+        except Exception as e:
+            raise ConnectionAbortedError(f"failed to create connection string: {e}")
+        
+        # 通过连接池创建engine
+        self.engine = create_engine(
+            conn,
+            pool_size=20, # 设置连接池大小
+            max_overflow=30, # 超过连接池大小时的额外连接数
+            pool_recycle=1800, # 回收连接时间
+            pool_pre_ping=True, # 防止断开连接
+            isolation_level="READ COMMITTED" # 降低隔离级别
+        )
+        
+        self._DBSession = sessionmaker(bind=self.engine)
+    
+    def load_data_with_page(self, query, params, page_size=100000):
+        """分页查询数据"""
+        data = pd.DataFrame()
+        count_query = text(replace_select_content(query))
+        query += " LIMIT :limit OFFSET :offset"
+        query = text(query)
+        print(count_query)
+        # 获取总行数
+        total_rows = self.fetch_one(count_query, params)[0]
+
+        page = 1
+        with tqdm(total=total_rows, desc="Loading data", unit="rows") as pbar:  # 初始化进度条
+            while True:
+                offset = (page - 1) * page_size  # 计算偏移量
+                params["limit"] = page_size
+                params["offset"] = offset
+
+                df = pd.DataFrame(self.fetch_all(query, params))
+                if df.empty:
+                    break
+                data = pd.concat([data, df], ignore_index=True)
+            
+                # 更新进度条
+                pbar.update(len(df))  # 更新进度条的行数
+            
+                page += 1
+        return data
+        
+        
+    def fetch_all(self, query, params=None):
+        """执行SQL查询并返回所有结果"""
+        session = self._DBSession()
+        try:
+            results = session.execute(query, params or {}).fetchall()
+            return results
+        except SQLAlchemyError as e:
+            session.rollback()
+            print(f"error: {e}")
+        finally:
+            session.close()
+            
+    def fetch_one(self, query, params=None):
+        """执行SQL查询并返回单条结果"""
+        session = self._DBSession()
+        try:
+            result = session.execute(query, params or {}).fetchone()
+            return result
+            
+        except SQLAlchemyError as e:
+            session.rollback()
+            print(f"error: {e}")
+        finally:
+            session.close()
+            
+    def insert_data(self, table_name, data_dict):
+        """插入单条数据到指定表"""
+        if not data_dict:
+            return 0
+        
+        columns = ", ".join(data_dict.keys())
+        values = ", ".join([f":{key}" for key in data_dict.keys()])
+        query = text(f"INSERT INTO {table_name} ({columns}) VALUES ({values})")
+        
+        session = self._DBSession()
+        
+        try:
+            result = session.execute(query, data_dict)
+            session.commit()
+            return result.rowcount
+        
+        except SQLAlchemyError as e:
+            session.rollback()
+            print(f"Error inserting data: {e}")
+            return 0
+        finally:
+            session.close()
+            
+    def update_data(self, table_name, update_dict, conditions, condition_params=None):
+        """更新表中符合条件的数据"""
+        if not update_dict:
+            return 0
+        
+        set_clause = ", ".join([f"{key} = :{key}" for key in update_dict.keys()])
+        
+        if len(conditions) == 1:
+            where_clause = f"WHERE {conditions[0]}"
+        elif len(conditions) > 1:
+            where_clause = f"WHERE {' AND '.join(conditions)}"
+        else:
+            where_clause = ""
+        
+        query = text(f"UPDATE {table_name} SET {set_clause} {where_clause}")
+        
+        params = update_dict.copy()
+        if condition_params:
+            params.update(condition_params)
+            
+        session = self._DBSession()
+        try:
+            result = session.execute(query, params)
+            session.commit()
+            return result.rowcount
+        except SQLAlchemyError as e:
+            session.rollback()
+            print(f"Error updating data: {e}")
+            return 0
+        
+        finally:
+            session.close()
+    
+    def execute_query(self, query, params=None):
+        """执行SQL语句 (无返回值, 如INSERT, UPDATE, DELETE)"""
+        session = self._DBSession()
+        try:
+            session.execute(query, params or {})
+            session.commit()
+        except SQLAlchemyError as e:
+            session.rollback()
+            print(f"Error: {e}")
+        finally:
+            session.close()

+ 118 - 0
database/feature.py

@@ -0,0 +1,118 @@
+class CustConfig:
+    FEATURES_COLUMNS = [
+        "cust_uuid",                    # 客户唯一标识
+        "cust_code",                    # 零售户编码
+        "busi_place_area_section",      # 营业面积区间
+        "rent_section",                 # 租金区间
+        "rent_price_section",           # 租金单价区间
+        "busi_open",                    # 营业开始时间
+        "busi_close",                   # 营业结束时间
+        "is_chain_storename",           # 是否连锁
+        "criterion_codename",           # 守法经营情况
+        "market_info_codename",         # 市场采集点情况
+        "tag_codename",                 # 卷烟价格执行情况
+        "cooperate_codename",           # 配合程度
+        "store_appearance_name",        # 店面形象
+        "position_codename",            # 商圈名称
+        "sub_position_codename",        # 次级商圈名称
+        "zone_appraise_name",           # 地段评价
+        "choose_road_name",             # 路段评价
+        "choose_address_name",          # 选址
+        "area_position_type_name",      # 区域位置划分
+        "area_func_type_name",          # 区域功能划分
+        "community_func_type_name",     # 社区功能划分
+        "rate_pay_type_name",           # 纳税性质
+        "order_cycle_type_name",        # 订货周期
+        "is_modern_terminalname",       # 是否现代终端
+        "modern_terminal_name",         # 现代终端类型
+        "cooperate_type_name",          # 加盟类型
+        "terminal_star_name",           # 终端星级
+        "star_terminal_name",           # 星级终端类型
+        "appearance_span_section",      # 门面跨度区间
+        "upholster_name",               # 店内装潢名称
+        "shop_feature_name",            # 门店特色名称
+        "shop_char_type_name",          # 经营特色名称
+        "has_taste_name",               # 是否卷烟品吸区
+        "show_area_section",            # 卷烟成列面积区间
+        "sign_status_name",             # 是否有店招门头(店招门头状态)
+        "shopsunny_vi_name",            # 现代终端VI门头名称
+        "header_name",                  # 门头标识
+        "counter_status_name",          # 地柜状态
+        "counter_number",               # 地柜个数
+        "counter_put_type_name",        # 地柜陈列样式
+        "back_counter_status_name",     # 背柜状态
+        "back_counter_put_type_name",   # 背柜陈列样式
+        "back_counter_style_name",      # 背柜样式
+        "back_counter_number",          # 背柜个数
+        "back_counter_has_show_name",   # 背柜条烟陈列区状态
+        "legal_person_gender",          # 法人性别
+        "legal_education_name",         # 法人文化程度
+        "legal_is_cpc_member",          # 法人是否为党员
+        "operator_person_gender",       # 经营者性别
+        "operator_education_name",      # 经营者文化程度
+        "operator_is_cpc_member",       # 经营者是否为党员
+        "market_type_name",             # 市场类型名称
+        "busi_place_codename",          # 经营业态名称
+        "sub_busi_codename",            # 业态细分名称
+        "sub_market_type_name",         # 城乡分类名称
+        "creditclass_name",             # 信用等级名称
+    ]
+    
+    FEATURES_MAP = {
+        "cust_uuid":                    "客户唯一标识",
+        "cust_code":                    "零售户编码",
+        "busi_place_area_section":      "营业面积区间",
+        "rent_section":                 "租金区间",
+        "rent_price_section":           "租金单价区间",
+        "busi_open":                    "营业开始时间",
+        "busi_close":                   "营业结束时间",
+        "is_chain_storename":           "是否连锁",
+        "criterion_codename":           "守法经营情况",
+        "market_info_codename":         "市场采集点情况",
+        "tag_codename":                 "卷烟价格执行情况",
+        "cooperate_codename":           "配合程度",
+        "store_appearance_name":        "店面形象",
+        "position_codename":            "商圈名称",
+        "sub_position_codename":        "次级商圈名称",
+        "zone_appraise_name":           "地段评价",
+        "choose_road_name":             "路段评价",
+        "choose_address_name":          "选址",
+        "area_position_type_name":      "区域位置划分",
+        "area_func_type_name":          "区域功能划分",
+        "community_func_type_name":     "社区功能划分",
+        "rate_pay_type_name":           "纳税性质",
+        "order_cycle_type_name":        "订货周期",
+        "is_modern_terminalname":       "是否现代终端",
+        "modern_terminal_name":         "现代终端类型",
+        "cooperate_type_name":          "加盟类型",
+        "terminal_star_name":           "终端星级",
+        "star_terminal_name":           "星级终端类型",
+        "appearance_span_section":      "门面跨度区间",
+        "upholster_name":               "店内装潢名称",
+        "shop_feature_name":            "门店特色名称",
+        "shop_char_type_name":          "经营特色名称",
+        "has_taste_name":               "是否卷烟品吸区",
+        "show_area_section":            "卷烟成列面积区间",
+        "sign_status_name":             "是否有店招门头(店招门头状态)",
+        "shopsunny_vi_name":            "现代终端VI门头名称",
+        "header_name":                  "门头标识",
+        "counter_status_name":          "地柜状态",
+        "counter_number":               "地柜个数",
+        "counter_put_type_name":        "地柜陈列样式",
+        "back_counter_status_name":     "背柜状态",
+        "back_counter_put_type_name":   "背柜陈列样式",
+        "back_counter_style_name":      "背柜样式",
+        "back_counter_number":          "背柜个数",
+        "back_counter_has_show_name":   "背柜条烟陈列区状态",
+        "legal_person_gender":          "法人性别",
+        "legal_education_name":         "法人文化程度",
+        "legal_is_cpc_member":          "法人是否为党员",
+        "operator_person_gender":       "经营者性别",
+        "operator_education_name":      "经营者文化程度",
+        "operator_is_cpc_member":       "经营者是否为党员",
+        "market_type_name":             "市场类型名称",
+        "busi_place_codename":          "经营业态名称",
+        "sub_busi_codename":            "业态细分名称",
+        "sub_market_type_name":         "城乡分类名称",
+        "creditclass_name":             "信用等级名称"
+    }

+ 35 - 0
tets.py

@@ -0,0 +1,35 @@
+import re
+
+def replace_select_content(sql):
+    """
+    将SELECT和FROM之间的内容替换为SELECT COUNT(原内容) FROM
+    """
+    pattern = r'SELECT\s+(.*?)\s+FROM'
+    
+    def replace_func(match):
+        content = match.group(1)  # 获取SELECT和FROM之间的内容
+        # 如果是DISTINCT或ALL等特殊关键字,需要特殊处理
+        if re.match(r'^(DISTINCT|ALL)\s+', content, re.IGNORECASE):
+            # 保留DISTINCT/ALL关键字
+            return f'SELECT COUNT({content}) FROM'
+        else:
+            return f'SELECT COUNT({content}) FROM'
+    
+    # 使用re.sub进行替换,re.IGNORECASE忽略大小写
+    result = re.sub(pattern, replace_func, sql, flags=re.IGNORECASE)
+    return result
+
+# 测试用例
+test_sqls = [
+    "SELECT name FROM users",
+    "SELECT name, age, high FROM users WHERE age > 20",
+    "SELECT DISTINCT name FROM users",
+    "SELECT * FROM employees",
+    "select id, name from students where grade = 'A'",
+    "SELECT name FROM (SELECT * FROM users) as t",
+]
+
+for sql in test_sqls:
+    result = replace_select_content(sql)
+    print(f"原始SQL: {sql}")
+    print(f"替换后: {result}\n")