# mysql.py
from contextlib import contextmanager
from urllib.parse import quote

import pandas as pd
from sqlalchemy import create_engine, text
from sqlalchemy.exc import SQLAlchemyError
from sqlalchemy.orm import sessionmaker

from core import get_logger, settings, DatabaseException
# Module-level logger shared by everything in this file, obtained from
# core.get_logger under the name "database.mysql".
logger = get_logger("database.mysql")
  8. class MySqlDatabaseHelper:
  9. _instance = None
  10. def __new__(cls):
  11. if not cls._instance:
  12. cls._instance = super(MySqlDatabaseHelper, cls).__new__(cls)
  13. cls._instance._initialized = False
  14. return cls._instance
  15. def __init__(self):
  16. if self._initialized:
  17. return
  18. self._connect_database()
  19. self._initialized = True
  20. def _connect_database(self):
  21. try:
  22. conn_str = (
  23. f"mysql+pymysql://{settings.mysql_user}:{settings.mysql_password}"
  24. f"@{settings.mysql_host}:{settings.mysql_port}/{settings.mysql_db}"
  25. )
  26. self.engine = create_engine(
  27. conn_str,
  28. pool_size=20,
  29. max_overflow=30,
  30. pool_recycle=1800,
  31. pool_pre_ping=True,
  32. isolation_level="READ COMMITTED",
  33. )
  34. self._DBSession = sessionmaker(bind=self.engine)
  35. logger.info("MySQL connection pool created", extra={"extra_data": {"host": settings.mysql_host, "db": settings.mysql_db}})
  36. except Exception as e:
  37. logger.error("Failed to create MySQL connection", exc_info=True)
  38. raise DatabaseException(message="数据库连接失败", detail=str(e))
  39. @contextmanager
  40. def get_session(self):
  41. session = self._DBSession()
  42. try:
  43. yield session
  44. except SQLAlchemyError as e:
  45. session.rollback()
  46. logger.error("Database operation failed", exc_info=True)
  47. raise DatabaseException(message="数据库操作失败", detail=str(e))
  48. finally:
  49. session.close()
  50. def load_data_with_page(self, query, params, page_size=100000):
  51. """分页查询数据"""
  52. count_query = text(f"SELECT COUNT(*) FROM ({query}) AS _count_subq")
  53. query += " LIMIT :limit OFFSET :offset"
  54. query = text(query)
  55. result = self.fetch_one(count_query, params)
  56. total_rows = result[0] if result is not None else 0
  57. if total_rows == 0:
  58. logger.debug("Query returned 0 rows")
  59. return pd.DataFrame()
  60. logger.debug(f"Loading {total_rows} rows with page_size={page_size}")
  61. data = pd.DataFrame()
  62. page = 1
  63. while True:
  64. offset = (page - 1) * page_size
  65. page_params = dict(params)
  66. page_params["limit"] = page_size
  67. page_params["offset"] = offset
  68. df = pd.DataFrame(self.fetch_all(query, page_params))
  69. if df.empty:
  70. break
  71. data = pd.concat([data, df], ignore_index=True)
  72. page += 1
  73. logger.debug(f"Loaded {len(data)} rows in {page - 1} pages")
  74. return data
  75. def fetch_all(self, query, params=None):
  76. """执行SQL查询并返回所有结果"""
  77. with self.get_session() as session:
  78. results = session.execute(query, params or {}).fetchall()
  79. return results
  80. def fetch_one(self, query, params=None):
  81. """执行SQL查询并返回单条结果"""
  82. with self.get_session() as session:
  83. result = session.execute(query, params or {}).fetchone()
  84. return result
  85. def insert_data(self, table_name, data_dict):
  86. """插入单条数据到指定表"""
  87. if not data_dict:
  88. return 0
  89. columns = ", ".join(data_dict.keys())
  90. values = ", ".join([f":{key}" for key in data_dict.keys()])
  91. query = text(f"INSERT INTO {table_name} ({columns}) VALUES ({values})")
  92. with self.get_session() as session:
  93. result = session.execute(query, data_dict)
  94. session.commit()
  95. logger.info(f"Inserted 1 row into {table_name}")
  96. return result.rowcount
  97. def update_data(self, table_name, update_dict, conditions, condition_params=None):
  98. """更新表中符合条件的数据"""
  99. if not update_dict:
  100. return 0
  101. set_clause = ", ".join([f"{key} = :{key}" for key in update_dict.keys()])
  102. if len(conditions) == 1:
  103. where_clause = f"WHERE {conditions[0]}"
  104. elif len(conditions) > 1:
  105. where_clause = f"WHERE {' AND '.join(conditions)}"
  106. else:
  107. where_clause = ""
  108. query = text(f"UPDATE {table_name} SET {set_clause} {where_clause}")
  109. params = update_dict.copy()
  110. if condition_params:
  111. params.update(condition_params)
  112. with self.get_session() as session:
  113. result = session.execute(query, params)
  114. session.commit()
  115. logger.info(f"Updated {result.rowcount} rows in {table_name}")
  116. return result.rowcount
  117. def execute_query(self, query, params=None):
  118. """执行SQL语句"""
  119. with self.get_session() as session:
  120. session.execute(query, params or {})
  121. session.commit()
  122. def check_connection(self) -> bool:
  123. """检查数据库连接是否正常"""
  124. try:
  125. self.fetch_one(text("SELECT 1"), {})
  126. return True
  127. except Exception:
  128. return False