# mysql.py
from urllib.parse import quote

import pandas as pd
from sqlalchemy import create_engine, text
from sqlalchemy.orm import sessionmaker
from sqlalchemy.exc import SQLAlchemyError
from tqdm import tqdm

from config import load_config
# Configuration loaded once, at import time. MySqlDatabaseHelper.__init__
# reads its connection settings from cfgs['mysql'].
cfgs = load_config()
  8. class MySqlDatabaseHelper:
  9. _instance = None
  10. def __new__(cls):
  11. if not cls._instance:
  12. cls._instance = super(MySqlDatabaseHelper, cls).__new__(cls)
  13. cls._instance._initialized = False
  14. return cls._instance
  15. def __init__(self):
  16. if self._initialized:
  17. return
  18. self._host = cfgs['mysql']['host']
  19. self._port = cfgs['mysql']['port']
  20. self._user = cfgs['mysql']['user']
  21. self._passwd = cfgs['mysql']['passwd']
  22. self._dbname = cfgs['mysql']['db']
  23. self.connect_database()
  24. self._initialized = True
  25. def connect_database(self):
  26. # 创建数据库连接
  27. try:
  28. conn = "mysql+pymysql://" + self._user + ":" + self._passwd + "@" + self._host + ":" + str(self._port) + "/" + self._dbname
  29. except Exception as e:
  30. raise ConnectionAbortedError(f"failed to create connection string: {e}")
  31. # 通过连接池创建engine
  32. self.engine = create_engine(
  33. conn,
  34. pool_size=10, # 设置连接池大小
  35. max_overflow=20, # 超过连接池大小时的额外连接数
  36. pool_recycle=3600 # 回收连接时间
  37. )
  38. self._DBSession = sessionmaker(bind=self.engine)
  39. def load_data_with_page(self, query, params, page_size=1000):
  40. """分页查询数据"""
  41. data = pd.DataFrame()
  42. count_query = text(query.replace("SELECT *", "SELECT COUNT(*)"))
  43. query += " LIMIT :limit OFFSET :offset"
  44. query = text(query)
  45. # 获取总行数
  46. total_rows = self.fetch_one(count_query, params)[0]
  47. page = 1
  48. with tqdm(total=total_rows, desc="Loading data", unit="rows") as pbar: # 初始化进度条
  49. while True:
  50. offset = (page - 1) * page_size # 计算偏移量
  51. params["limit"] = page_size
  52. params["offset"] = offset
  53. df = pd.DataFrame(self.fetch_all(query, params))
  54. if df.empty:
  55. break
  56. data = pd.concat([data, df], ignore_index=True)
  57. # 更新进度条
  58. pbar.update(len(df)) # 更新进度条的行数
  59. page += 1
  60. return data
  61. def fetch_all(self, query, params=None):
  62. """执行SQL查询并返回所有结果"""
  63. session = self._DBSession()
  64. try:
  65. results = session.execute(query, params or {}).fetchall()
  66. return results
  67. except SQLAlchemyError as e:
  68. session.rollback()
  69. print(f"error: {e}")
  70. finally:
  71. session.close()
  72. def fetch_one(self, query, params=None):
  73. """执行SQL查询并返回单条结果"""
  74. session = self._DBSession()
  75. try:
  76. result = session.execute(query, params or {}).fetchone()
  77. return result
  78. except SQLAlchemyError as e:
  79. session.rollback()
  80. print(f"error: {e}")
  81. finally:
  82. session.close()
  83. def execute_query(self, query, params=None):
  84. """执行SQL语句 (无返回值, 如INSERT, UPDATE, DELETE)"""
  85. session = self._DBSession()
  86. try:
  87. session.execute(query, params or {})
  88. session.commit()
  89. except SQLAlchemyError as e:
  90. session.rollback()
  91. print(f"Error: {e}")
  92. finally:
  93. session.close()