mysql_client.py 2.9 KB

1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162636465666768697071727374757677787980818283848586
  1. #!/usr/bin/env python3
  2. # -*- coding:utf-8 -*-
  3. import os
  4. from sqlalchemy import create_engine, text
  5. from sqlalchemy.dialects.mysql import pymysql
  6. from sqlalchemy.orm import sessionmaker
  7. from sqlalchemy.ext.declarative import declarative_base
  8. from config import database_config
  9. import pandas as pd
  10. import sys
  class Mysql(object):
      """Small SQLAlchemy helper for reading MySQL tables into pandas.

      Connection settings are read from ``database_config`` when the object is
      created. The engine uses a connection pool and sessions are produced by
      a ``sessionmaker`` bound to that engine.
      """

      def __init__(self):
          host = database_config.MYSQL_HOST
          port = database_config.MYSQL_PORT
          user = database_config.MYSQL_USER
          passwd = database_config.MYSQL_PASSWD
          dbname = database_config.MYSQL_DB
          # Create the engine on top of a connection pool.
          self.engine = create_engine(
              self._connect(host, port, user, passwd, dbname),
              pool_size=10,  # size of the connection pool
              max_overflow=20,  # extra connections allowed beyond pool_size
              pool_recycle=3600,  # connection recycle time
          )
          self._DBSession = sessionmaker(bind=self.engine)

      def _connect(self, host, port, user, pwd, db):
          """Build the ``mysql+pymysql://`` connection URL string.

          The user name and password are percent-encoded so that characters
          such as ``@``, ``:``, ``/`` or ``+`` inside the credentials cannot be
          mistaken for URL delimiters.

          Args:
              host: database host name.
              port: database port; converted with ``str()``.
              user: user name (string).
              pwd: password (string).
              db: database name.

          Returns:
              The connection URL as a string.

          Raises:
              ConnectionError: if the URL cannot be assembled, for example
                  because one of the components is ``None``.
          """
          # Local import keeps this method self-contained.
          from urllib.parse import quote

          try:
              # safe="" so that "/" inside a credential is escaped as well.
              credentials = quote(user, safe="") + ":" + quote(pwd, safe="")
              return (
                  "mysql+pymysql://" + credentials
                  + "@" + host + ":" + str(port) + "/" + db
              )
          except Exception as e:
              raise ConnectionError(
                  f"failed to create connection string: {e}"
              ) from e

      def create_session(self):
          """Create and return a new database session."""
          return self._DBSession()

      def fetch_data_with_pagination(self, tablename, query_text, page=1, page_size=1000):
          """Fetch a single page of rows and return it as a DataFrame.

          Args:
              tablename: table to read from. Interpolated into the SQL text
                  as-is, so it must come from a trusted source.
              query_text: column list placed after ``select`` (e.g. ``'*'``).
                  Also interpolated as-is; trusted values only.
              page: 1-based page number.
              page_size: maximum number of rows in the page.

          Returns:
              A DataFrame holding the rows of the page, with columns named
              after the keys of the query result. It is empty when the query
              returns no rows.

          Raises:
              ValueError: if ``page`` or ``page_size`` is smaller than 1.
          """
          if page < 1 or page_size < 1:
              raise ValueError(
                  f"page and page_size must be >= 1, got page={page}, page_size={page_size}"
              )
          offset = (page - 1) * page_size  # rows to skip before this page
          # NOTE(review): there is no ORDER BY, so which rows land on which
          # page depends on the order the server returns them in.
          query = text(f"select {query_text} from {tablename} LIMIT :limit OFFSET :offset")
          with self.create_session() as session:
              result = session.execute(query, {"limit": page_size, "offset": offset})
              columns = list(result.keys())
              # Plain tuples keep the DataFrame construction independent of
              # how pandas treats the driver's row objects.
              rows = [tuple(row) for row in result.fetchall()]
          return pd.DataFrame(rows, columns=columns)

      def load_data(self, tablename, query_text, page=1, page_size=1000):
          """Read consecutive pages starting at ``page`` and combine them.

          Pages are fetched until one comes back empty or shorter than
          ``page_size``. Progress is printed after every page. ``closed()`` is
          always called before returning.

          Args:
              tablename: table to read from (see ``fetch_data_with_pagination``).
              query_text: column list placed after ``select``.
              page: 1-based page number to start from.
              page_size: number of rows requested per page.

          Returns:
              A DataFrame with all retrieved rows (an empty DataFrame when
              nothing was retrieved), or ``None`` if an error occurred; the
              error is printed.
          """
          # Pages are collected and concatenated once at the end instead of
          # re-copying the accumulated frame on every iteration.
          frames = []
          total_rows = 0
          try:
              while True:
                  df = self.fetch_data_with_pagination(tablename, query_text, page, page_size)
                  if df.empty:
                      break
                  frames.append(df)
                  total_rows += len(df)
                  print(f"Page {page}: Retrieved {len(df)} rows, Total rows so far: {total_rows}")
                  if len(df) < page_size:
                      # A short page is the last one; skip the extra query.
                      break
                  page += 1  # continue with the next page
              if not frames:
                  return pd.DataFrame()
              return pd.concat(frames, ignore_index=True)
          except Exception as e:
              print(f"Error: {e}")
              return None
          finally:
              self.closed()

      def closed(self):
          """Close connections and release resources by disposing the engine."""
          self.engine.dispose()
  if __name__ == '__main__':
      # Script entry point: load every row of one table page by page.
      client = Mysql()
      tablename = "mock_order"
      # Pagination settings.
      page = 1
      page_size = 1000
      query_text = '*'
      # Pass the variable instead of repeating the table name literal.
      client.load_data(tablename, query_text, page, page_size)