mysql_client.py 1.8 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758
  1. #!/usr/bin/env python3
  2. # -*- coding:utf-8 -*-
  3. from sqlalchemy import create_engine, text
  4. from sqlalchemy.dialects.mysql import pymysql
  5. from sqlalchemy.orm import sessionmaker
  6. from sqlalchemy.ext.declarative import declarative_base
  7. from config import database_config
  8. import pandas as pd
  9. class Mysql(object):
  10. def __init__(self):
  11. host = database_config.MYSQL_HOST
  12. port = database_config.MYSQL_PORT
  13. user = database_config.MYSQL_USER
  14. passwd = database_config.MYSQL_PASSWD
  15. dbname = database_config.MYSQL_DB
  16. # 通过连接池创建engine
  17. self.engine = create_engine(
  18. self._connect(host, port, user, passwd, dbname),
  19. pool_size=10, # 设置连接池大小
  20. max_overflow=20, # 超过连接池大小时的额外连接数
  21. pool_recycle=3600 # 回收连接时间
  22. )
  23. self._DBSession = sessionmaker(bind=self.engine)
  24. def _connect(self, host, port, user, pwd, db):
  25. try:
  26. client = "mysql+pymysql://" + user + ":" + pwd + "@" + host + ":" + str(port) + "/" + db
  27. return client
  28. except Exception as e:
  29. raise ConnectionError(f"failed to create connection string: {e}")
  30. def create_session(self):
  31. """创建返回一个新的数据库session"""
  32. return self._DBSession()
  33. def closed(self):
  34. """关闭连接,回收资源"""
  35. self.engine.dispose()
  36. if __name__ == '__main__':
  37. client = Mysql()
  38. # 创建会话
  39. session = client.create_session()
  40. # 使用 session 执行查询等操作
  41. try:
  42. results = session.execute(
  43. text("select * from tads_brandcul_cust_order")
  44. ).mappings().all()
  45. # 将结果转换为DataFrame
  46. df = pd.DataFrame(results) # 提取列名
  47. print(df)
  48. finally:
  49. session.close()