#!/usr/bin/env python3
# -*- coding:utf-8 -*-
"""Small SQLAlchemy/pymysql helper that pages a MySQL table into pandas."""
import os
import sys
from urllib.parse import quote_plus

import pandas as pd
from sqlalchemy import create_engine, text
from sqlalchemy.dialects.mysql import pymysql
from sqlalchemy.orm import sessionmaker
from sqlalchemy.ext.declarative import declarative_base

from config import database_config


class Mysql(object):
    """MySQL client backed by a pooled SQLAlchemy engine.

    Connection settings are read from ``config.database_config``
    (``MYSQL_HOST``, ``MYSQL_PORT``, ``MYSQL_USER``, ``MYSQL_PASSWD``,
    ``MYSQL_DB``) when the instance is created.
    """

    def __init__(self):
        host = database_config.MYSQL_HOST
        port = database_config.MYSQL_PORT
        user = database_config.MYSQL_USER
        passwd = database_config.MYSQL_PASSWD
        dbname = database_config.MYSQL_DB

        # Create the engine on top of a connection pool.
        self.engine = create_engine(
            self._connect(host, port, user, passwd, dbname),
            pool_size=10,       # number of connections kept in the pool
            max_overflow=20,    # extra connections allowed beyond pool_size
            pool_recycle=3600,  # connection recycle time
        )
        self._DBSession = sessionmaker(bind=self.engine)

    def _connect(self, host, port, user, pwd, db):
        """Build the ``mysql+pymysql://`` connection URL.

        The user name and password are percent-escaped so that characters
        such as ``@``, ``:``, ``/`` or ``%`` in the credentials cannot be
        mistaken for URL delimiters. Pass the raw (unescaped) credentials.

        Args:
            host: Database host name or address.
            port: Database port (converted with ``str``).
            user: Raw user name.
            pwd: Raw password.
            db: Database (schema) name.

        Returns:
            The connection URL as a string.

        Raises:
            ConnectionError: If the URL cannot be assembled, e.g. because a
                component is not a string.
        """
        try:
            return (
                "mysql+pymysql://"
                + quote_plus(user)
                + ":"
                + quote_plus(pwd)
                + "@"
                + host
                + ":"
                + str(port)
                + "/"
                + db
            )
        except Exception as e:
            raise ConnectionError(
                f"failed to create connection string: {e}"
            ) from e

    def create_session(self):
        """Create and return a new database session."""
        return self._DBSession()

    def fetch_data_with_pagination(self, tablename, query_text, page=1,
                                   page_size=1000, order_by=None):
        """Fetch one page of rows from a table as a DataFrame.

        NOTE: ``tablename``, ``query_text`` and ``order_by`` are interpolated
        directly into the SQL text (identifiers cannot be bound as
        parameters), so they must come from trusted code, never from user
        input. Only LIMIT and OFFSET are bound parameters.

        Args:
            tablename: Table to select from.
            query_text: Column list placed after ``select`` (e.g. ``'*'``).
            page: 1-based page number.
            page_size: Maximum number of rows per page.
            order_by: Optional ORDER BY expression. Without it the row order
                is up to the database, so consecutive pages are not
                guaranteed to be disjoint or complete.

        Returns:
            A DataFrame with one row per result row and the result's column
            names; it has no rows when the page is past the end of the data.
        """
        offset = (page - 1) * page_size  # rows to skip before this page
        order_clause = f" ORDER BY {order_by}" if order_by else ""
        query = text(
            f"select {query_text} from {tablename}{order_clause}"
            " LIMIT :limit OFFSET :offset"
        )
        with self.create_session() as session:
            result = session.execute(
                query, {"limit": page_size, "offset": offset}
            )
            # Read the column names before consuming the rows so the frame
            # is labelled with them instead of positional integers.
            columns = list(result.keys())
            rows = result.fetchall()
        return pd.DataFrame(rows, columns=columns)

    def load_data(self, tablename, query_text, page=1, page_size=1000,
                  order_by=None):
        """Load a table page by page and return all rows as one DataFrame.

        Pages are fetched starting at ``page`` until an empty page is
        returned. The engine's connection pool is disposed afterwards,
        whether or not loading succeeded.

        Args:
            tablename: Table to select from.
            query_text: Column list placed after ``select``.
            page: 1-based page number to start from.
            page_size: Number of rows requested per page.
            order_by: Optional ORDER BY expression forwarded to
                ``fetch_data_with_pagination`` for stable paging.

        Returns:
            The concatenated DataFrame (an empty DataFrame when no rows were
            found), or ``None`` if any error occurred; the error is printed.
        """
        # Collect pages and concatenate once at the end; concatenating
        # inside the loop re-copies every previously loaded row per page.
        frames = []
        total_rows = 0
        try:
            while True:
                df = self.fetch_data_with_pagination(
                    tablename, query_text, page, page_size, order_by
                )
                if df.empty:
                    break
                frames.append(df)
                total_rows += len(df)
                print(f"Page {page}: Retrieved {len(df)} rows, Total rows so far: {total_rows}")
                page += 1  # move on to the next page
            if frames:
                total_df = pd.concat(frames, ignore_index=True)
            else:
                total_df = pd.DataFrame()
        except Exception as e:
            # Best-effort contract kept from the original: report and
            # signal failure with None instead of propagating.
            print(f"Error: {e}")
            return None
        finally:
            self.closed()
        return total_df

    def closed(self):
        """Close pooled connections and release their resources."""
        self.engine.dispose()


if __name__ == '__main__':
    client = Mysql()
    tablename = "mock_order"
    # Paging parameters.
    page = 1
    page_size = 1000
    query_text = '*'
    client.load_data(tablename, query_text, page, page_size)