#!/usr/bin/env python3
# -*- coding:utf-8 -*-
"""Small helper around a pooled SQLAlchemy engine for a MySQL database.

Connection settings are read from ``config.database_config``. Running the
module as a script dumps one table into a pandas DataFrame and prints it.
"""
from urllib.parse import quote

import pandas as pd
from sqlalchemy import create_engine, text
from sqlalchemy.dialects.mysql import pymysql
from sqlalchemy.orm import sessionmaker
from sqlalchemy.ext.declarative import declarative_base

from config import database_config


class Mysql(object):
    """Owns a pooled SQLAlchemy engine and hands out ORM sessions bound to it."""

    def __init__(self):
        """Build the engine and session factory from ``database_config``."""
        host = database_config.MYSQL_HOST
        port = database_config.MYSQL_PORT
        user = database_config.MYSQL_USER
        passwd = database_config.MYSQL_PASSWD
        dbname = database_config.MYSQL_DB

        # Create the engine backed by a connection pool.
        self.engine = create_engine(
            self._connect(host, port, user, passwd, dbname),
            pool_size=10,  # number of connections kept in the pool
            max_overflow=20,  # extra connections allowed beyond pool_size
            pool_recycle=3600,  # recycle connections older than this (seconds)
        )
        self._DBSession = sessionmaker(bind=self.engine)

    def _connect(self, host, port, user, pwd, db):
        """Return the ``mysql+pymysql://`` URL string for the given settings.

        The user name and password are percent-encoded so that characters
        with a special meaning inside a URL (``@``, ``:``, ``/``, ``%`` ...)
        cannot corrupt the URL. Credentials made only of unreserved
        characters are emitted unchanged.

        Raises:
            ConnectionError: if the URL cannot be assembled (for example a
                setting is ``None`` instead of a string).
        """
        try:
            # safe="" also encodes "/", which quote() leaves alone by default.
            credentials = quote(user, safe="") + ":" + quote(pwd, safe="")
            return (
                "mysql+pymysql://" + credentials
                + "@" + host + ":" + str(port) + "/" + db
            )
        except Exception as e:
            # Keep the exception type callers may already catch, but chain
            # the original error so the root cause stays in the traceback.
            raise ConnectionError(
                f"failed to create connection string: {e}"
            ) from e

    def create_session(self):
        """Create and return a new database session."""
        return self._DBSession()

    def closed(self):
        """Dispose of the engine, releasing its pooled connections."""
        self.engine.dispose()


def main():
    """Query one table, load the rows into a DataFrame and print it."""
    client = Mysql()
    try:
        # Open a session and use it to run the query.
        session = client.create_session()
        try:
            results = session.execute(
                text("select * from tads_brandcul_cust_order")
            ).mappings().all()

            # Convert the row mappings into a DataFrame.
            df = pd.DataFrame(results)
            print(df)
        finally:
            session.close()
    finally:
        # Dispose of the engine as well so the pool does not outlive the script.
        client.closed()


if __name__ == '__main__':
    main()