mysql_client.py 3.1 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990
  1. #!/usr/bin/env python3
  2. # -*- coding:utf-8 -*-
  3. import os
  4. from sqlalchemy import create_engine, text
  5. from sqlalchemy.dialects.mysql import pymysql
  6. from sqlalchemy.orm import sessionmaker
  7. from sqlalchemy.ext.declarative import declarative_base
  8. from config import load_config
  9. import pandas as pd
  10. import sys
# Configuration is loaded once, at import time; Mysql.__init__ reads the
# 'mysql' section (keys host, port, user, passwd, db) from it.
cfgs = load_config()
  12. class Mysql(object):
  13. def __init__(self):
  14. self._host = cfgs['mysql']['host']
  15. self._port = cfgs['mysql']['port']
  16. self._user = cfgs['mysql']['user']
  17. self._passwd = cfgs['mysql']['passwd']
  18. self._dbname = cfgs['mysql']['db']
  19. # 通过连接池创建engine
  20. self.engine = create_engine(
  21. self._connect(self._host, self._port, self._user, self._passwd, self._dbname),
  22. pool_size=10, # 设置连接池大小
  23. max_overflow=20, # 超过连接池大小时的额外连接数
  24. pool_recycle=3600 # 回收连接时间
  25. )
  26. self._DBSession = sessionmaker(bind=self.engine)
  27. def _connect(self, host, port, user, pwd, db):
  28. try:
  29. client = "mysql+pymysql://" + user + ":" + pwd + "@" + host + ":" + str(port) + "/" + db
  30. return client
  31. except Exception as e:
  32. raise ConnectionError(f"failed to create connection string: {e}")
  33. def create_session(self):
  34. """创建返回一个新的数据库session"""
  35. return self._DBSession()
  36. def fetch_data_with_pagination(self, tablename, query_text, field_name, city_uuid, page=1, page_size=1000):
  37. """分页查询数据,并根据 city_uuid 进行过滤"""
  38. offset = (page - 1) * page_size # 计算偏移量
  39. query = text(f"SELECT {query_text} FROM {tablename} WHERE {field_name} = :city_uuid LIMIT :limit OFFSET :offset")
  40. with self.create_session() as session:
  41. results = session.execute(query, {"city_uuid": city_uuid, "limit": page_size, "offset": offset}).fetchall()
  42. df = pd.DataFrame(results)
  43. return df
  44. def load_data(self, tablename, query_text, field_name, city_uuid, page=1, page_size=1000):
  45. # 创建一个空的DataFrame用于存储所有数据
  46. total_df = pd.DataFrame()
  47. try:
  48. while True:
  49. df = self.fetch_data_with_pagination(tablename, query_text, field_name, city_uuid, page, page_size)
  50. if df.empty:
  51. break
  52. total_df = pd.concat([total_df, df], ignore_index=True)
  53. print(f"Page {page}: Retrieved {len(df)} rows, Total rows so far: {len(total_df)}")
  54. page += 1 # 继续下一页
  55. except Exception as e:
  56. print(f"Error: {e}")
  57. return None
  58. finally:
  59. self.closed()
  60. return total_df
  61. def closed(self):
  62. """关闭连接,回收资源"""
  63. self.engine.dispose()
  64. if __name__ == '__main__':
  65. client = Mysql()
  66. tablename = "mock_order"
  67. # 设置分页参数
  68. page = 1
  69. page_size = 1000
  70. query_text = '*'
  71. client.load_data("mock_order", query_text, page, page_size)