"""MySQL access helpers built on SQLAlchemy.

Provides ``replace_select_content`` (turns a SELECT into a COUNT query) and
``MySqlDatabaseHelper``, a process-wide singleton wrapping a pooled engine
with small fetch / insert / update / paging helpers.
"""
import re
from urllib.parse import quote, unquote

import pandas as pd
from sqlalchemy import create_engine, text
from sqlalchemy.exc import SQLAlchemyError
from sqlalchemy.orm import sessionmaker
from tqdm import tqdm

from config import laod_database_config

cfgs = laod_database_config()

# Select list of a SELECT ... FROM. DOTALL lets the list span several lines;
# the word boundaries stop identifiers such as "fromage" matching as FROM.
_SELECT_LIST_RE = re.compile(
    r'\bSELECT\s+(.*?)\s+FROM\b', re.IGNORECASE | re.DOTALL
)
_DISTINCT_OR_ALL_RE = re.compile(r'^(DISTINCT|ALL)\s+', re.IGNORECASE)


def replace_select_content(sql):
    """Rewrite the leading ``SELECT <list> FROM`` of *sql* into a COUNT.

    A select list starting with ``DISTINCT``/``ALL`` becomes
    ``SELECT COUNT(<list>) FROM``; any other list becomes
    ``SELECT COUNT(*) FROM``. Only the first ``SELECT ... FROM`` is rewritten,
    so subqueries later in the statement are left untouched. If no
    ``SELECT ... FROM`` is found, *sql* is returned unchanged.

    NOTE: this is a textual rewrite, not a SQL parser. A scalar subquery
    inside the select list, ``GROUP BY``, or ``UNION`` will not produce a
    meaningful row count.
    """

    def _to_count(match):
        select_list = match.group(1)
        if _DISTINCT_OR_ALL_RE.match(select_list):
            # Keep the DISTINCT/ALL keyword inside COUNT(...).
            return f'SELECT COUNT({select_list}) FROM'
        return 'SELECT COUNT(*) FROM'

    return _SELECT_LIST_RE.sub(_to_count, sql, count=1)


class MySqlDatabaseHelper:
    """Singleton helper around a pooled SQLAlchemy engine for MySQL.

    Connection settings are read from ``cfgs['mysql']`` the first time the
    class is instantiated; later instantiations return the same object.
    """

    _instance = None

    # Prefix for bind names generated for SET values in update_data, so they
    # cannot collide with caller-supplied condition parameter names.
    _SET_BIND_PREFIX = "set__"

    def __new__(cls):
        if cls._instance is None:
            cls._instance = super().__new__(cls)
            cls._instance._initialized = False
        return cls._instance

    def __init__(self):
        # __init__ runs on every MySqlDatabaseHelper() call; connect only once.
        if self._initialized:
            return
        mysql_cfg = cfgs['mysql']
        self._host = mysql_cfg['host']
        self._port = mysql_cfg['port']
        self._user = mysql_cfg['user']
        self._passwd = mysql_cfg['passwd']
        self._dbname = mysql_cfg['db']
        self.connect_database()
        self._initialized = True

    @staticmethod
    def _escape_credential(value):
        """Percent-encode *value* for use in the user/password part of a URL.

        The value is unquoted first so that a credential already stored in
        percent-encoded form is not encoded twice; SQLAlchemy unquotes the
        URL components when it parses them, so the decoded credential is the
        same as before for values that used to work.
        """
        return quote(unquote(value), safe="")

    def connect_database(self):
        """Create the pooled engine and the session factory.

        Raises:
            ConnectionAbortedError: if the connection URL cannot be built
                from the configured values (e.g. a non-string field).
        """
        try:
            user = self._escape_credential(self._user)
            passwd = self._escape_credential(self._passwd)
            conn = (
                "mysql+pymysql://" + user + ":" + passwd
                + "@" + self._host + ":" + str(self._port)
                + "/" + self._dbname
            )
        except (TypeError, AttributeError) as e:
            raise ConnectionAbortedError(
                f"failed to create connection string: {e}"
            ) from e

        # Create the engine backed by a connection pool.
        self.engine = create_engine(
            conn,
            pool_size=20,  # connection pool size
            max_overflow=30,  # extra connections allowed beyond pool_size
            pool_recycle=1800,  # connection recycle time
            pool_pre_ping=True,  # guard against dropped connections
            isolation_level="READ COMMITTED",  # lower isolation level
        )
        self._DBSession = sessionmaker(bind=self.engine)

    @staticmethod
    def _as_statement(query):
        """Wrap a plain SQL string in ``text()``; pass other objects through."""
        return text(query) if isinstance(query, str) else query

    def load_data_with_page(self, query, params, page_size=100000):
        """Load the full result of *query* page by page into one DataFrame.

        Args:
            query: SQL string without LIMIT/OFFSET; ``LIMIT :limit OFFSET
                :offset`` is appended to it.
            params: bind parameters for *query* (may be ``None``). The dict
                is copied, so the caller's object is not modified.
            page_size: number of rows requested per page.

        Returns:
            A DataFrame with all fetched rows (empty if nothing was fetched).

        Paging stops at the first empty or short page. ``fetch_all`` prints
        database errors and returns ``None``, which also ends paging, so the
        rows loaded so far are returned in that case.
        """
        # Copy so that limit/offset never leak into the caller's dict.
        page_params = dict(params or {})
        # A trailing ';' would make the appended LIMIT clause a syntax error.
        base_query = query.rstrip().rstrip(";")
        count_query = text(replace_select_content(base_query))
        page_query = text(base_query + " LIMIT :limit OFFSET :offset")
        print(count_query)

        # Total row count, used only to size the progress bar.
        count_row = self.fetch_one(count_query, page_params)
        total_rows = None
        if count_row is not None and isinstance(count_row[0], int):
            total_rows = count_row[0]

        frames = []
        offset = 0
        with tqdm(total=total_rows, desc="Loading data", unit="rows") as pbar:
            while True:
                page_params["limit"] = page_size
                page_params["offset"] = offset
                df = pd.DataFrame(self.fetch_all(page_query, page_params))
                if df.empty:
                    break
                frames.append(df)
                pbar.update(len(df))
                # A short page is the last one; skip the extra empty query.
                if len(df) < page_size:
                    break
                offset += page_size

        if not frames:
            return pd.DataFrame()
        # Concatenate once instead of re-copying the data on every page.
        return pd.concat(frames, ignore_index=True)

    def fetch_all(self, query, params=None):
        """Execute *query* and return all rows.

        Returns ``None`` if a ``SQLAlchemyError`` occurs; the error is
        printed and the session rolled back.
        """
        session = self._DBSession()
        try:
            result = session.execute(self._as_statement(query), params or {})
            return result.fetchall()
        except SQLAlchemyError as e:
            session.rollback()
            print(f"error: {e}")
            return None
        finally:
            session.close()

    def fetch_one(self, query, params=None):
        """Execute *query* and return the first row.

        Returns ``None`` if there is no row or if a ``SQLAlchemyError``
        occurs; the error is printed and the session rolled back.
        """
        session = self._DBSession()
        try:
            result = session.execute(self._as_statement(query), params or {})
            return result.fetchone()
        except SQLAlchemyError as e:
            session.rollback()
            print(f"error: {e}")
            return None
        finally:
            session.close()

    def insert_data(self, table_name, data_dict):
        """Insert a single row into *table_name*.

        Args:
            table_name: target table. Interpolated into the SQL text as-is,
                so it must come from trusted code, not user input.
            data_dict: mapping of column name to value. Column names are
                interpolated as-is; values are passed as bind parameters.

        Returns:
            The affected row count, or 0 if *data_dict* is empty or a
            ``SQLAlchemyError`` occurs (the error is printed).
        """
        if not data_dict:
            return 0

        columns = ", ".join(data_dict)
        values = ", ".join(f":{key}" for key in data_dict)
        query = text(f"INSERT INTO {table_name} ({columns}) VALUES ({values})")

        session = self._DBSession()
        try:
            result = session.execute(query, data_dict)
            session.commit()
            return result.rowcount
        except SQLAlchemyError as e:
            session.rollback()
            print(f"Error inserting data: {e}")
            return 0
        finally:
            session.close()

    def update_data(self, table_name, update_dict, conditions,
                    condition_params=None):
        """Update the rows of *table_name* that satisfy *conditions*.

        Args:
            table_name: target table. Interpolated into the SQL text as-is,
                so it must come from trusted code, not user input.
            update_dict: mapping of column name to new value.
            conditions: sequence of SQL condition strings, joined with
                ``AND``. An empty sequence produces no WHERE clause, so every
                row is updated.
            condition_params: bind parameters referenced by *conditions*.

        Returns:
            The affected row count, or 0 if *update_dict* is empty or a
            ``SQLAlchemyError`` occurs (the error is printed).
        """
        if not update_dict:
            return 0

        prefix = self._SET_BIND_PREFIX
        # SET values get their own bind names so that a condition parameter
        # with the same key as an updated column cannot overwrite the value.
        set_clause = ", ".join(
            f"{key} = :{prefix}{key}" for key in update_dict
        )
        if len(conditions) > 0:
            where_clause = f"WHERE {' AND '.join(conditions)}"
        else:
            where_clause = ""
        query = text(f"UPDATE {table_name} SET {set_clause} {where_clause}")

        # The original names stay bound (condition_params winning, as
        # before) for conditions that refer to an update_dict key by name.
        params = dict(update_dict)
        if condition_params:
            params.update(condition_params)
        params.update(
            {f"{prefix}{key}": value for key, value in update_dict.items()}
        )

        session = self._DBSession()
        try:
            result = session.execute(query, params)
            session.commit()
            return result.rowcount
        except SQLAlchemyError as e:
            session.rollback()
            print(f"Error updating data: {e}")
            return 0
        finally:
            session.close()

    def execute_query(self, query, params=None):
        """Execute a statement with no result set (INSERT, UPDATE, DELETE).

        Commits on success. On ``SQLAlchemyError`` the session is rolled
        back and the error is printed; nothing is raised or returned.
        """
        session = self._DBSession()
        try:
            session.execute(self._as_statement(query), params or {})
            session.commit()
        except SQLAlchemyError as e:
            session.rollback()
            print(f"Error: {e}")
        finally:
            session.close()