mysql_client.py 1.9 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061
  1. #!/usr/bin/env python3
  2. # -*- coding:utf-8 -*-
  3. import os
  4. from sqlalchemy import create_engine, text
  5. from sqlalchemy.dialects.mysql import pymysql
  6. from sqlalchemy.orm import sessionmaker
  7. from sqlalchemy.ext.declarative import declarative_base
  8. from config import database_config
  9. import pandas as pd
  10. import sys
  11. class Mysql(object):
  12. def __init__(self):
  13. host = database_config.MYSQL_HOST
  14. port = database_config.MYSQL_PORT
  15. user = database_config.MYSQL_USER
  16. passwd = database_config.MYSQL_PASSWD
  17. dbname = database_config.MYSQL_DB
  18. # 通过连接池创建engine
  19. self.engine = create_engine(
  20. self._connect(host, port, user, passwd, dbname),
  21. pool_size=10, # 设置连接池大小
  22. max_overflow=20, # 超过连接池大小时的额外连接数
  23. pool_recycle=3600 # 回收连接时间
  24. )
  25. self._DBSession = sessionmaker(bind=self.engine)
  26. def _connect(self, host, port, user, pwd, db):
  27. try:
  28. client = "mysql+pymysql://" + user + ":" + pwd + "@" + host + ":" + str(port) + "/" + db
  29. return client
  30. except Exception as e:
  31. raise ConnectionError(f"failed to create connection string: {e}")
  32. def create_session(self):
  33. """创建返回一个新的数据库session"""
  34. return self._DBSession()
  35. def closed(self):
  36. """关闭连接,回收资源"""
  37. self.engine.dispose()
  38. if __name__ == '__main__':
  39. client = Mysql()
  40. # 创建会话
  41. session = client.create_session()
  42. # 使用 session 执行查询等操作
  43. try:
  44. results = session.execute(
  45. text("select * from tads_brandcul_cust_order")
  46. ).all()
  47. # 将结果转换为DataFrame
  48. df = pd.DataFrame(results).drop(columns=['stat_month']) # 提取列名
  49. # 获取c
  50. print(len(df.columns))
  51. finally:
  52. session.close()