| 12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758 |
- #!/usr/bin/env python3
- # -*- coding:utf-8 -*-
- from sqlalchemy import create_engine, text
- from sqlalchemy.dialects.mysql import pymysql
- from sqlalchemy.orm import sessionmaker
- from sqlalchemy.ext.declarative import declarative_base
- from config import database_config
- import pandas as pd
- class Mysql(object):
- def __init__(self):
- host = database_config.MYSQL_HOST
- port = database_config.MYSQL_PORT
- user = database_config.MYSQL_USER
- passwd = database_config.MYSQL_PASSWD
- dbname = database_config.MYSQL_DB
-
- # 通过连接池创建engine
- self.engine = create_engine(
- self._connect(host, port, user, passwd, dbname),
- pool_size=10, # 设置连接池大小
- max_overflow=20, # 超过连接池大小时的额外连接数
- pool_recycle=3600 # 回收连接时间
- )
- self._DBSession = sessionmaker(bind=self.engine)
- def _connect(self, host, port, user, pwd, db):
- try:
- client = "mysql+pymysql://" + user + ":" + pwd + "@" + host + ":" + str(port) + "/" + db
- return client
- except Exception as e:
- raise ConnectionError(f"failed to create connection string: {e}")
-
- def create_session(self):
- """创建返回一个新的数据库session"""
- return self._DBSession()
- def closed(self):
- """关闭连接,回收资源"""
- self.engine.dispose()
- if __name__ == '__main__':
- client = Mysql()
- # 创建会话
- session = client.create_session()
-
- # 使用 session 执行查询等操作
- try:
- results = session.execute(
- text("select * from tads_brandcul_cust_order")
- ).mappings().all()
- # 将结果转换为DataFrame
- df = pd.DataFrame(results) # 提取列名
- print(df)
- finally:
- session.close()
|