mysql.py 7.0 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209
  1. from config import load_config
  2. import pandas as pd
  3. from sqlalchemy import create_engine, text
  4. from sqlalchemy.orm import sessionmaker
  5. from sqlalchemy.exc import SQLAlchemyError
  6. from tqdm import tqdm
# Application configuration, loaded once when this module is imported.
# MySqlDatabaseHelper.__init__ reads its connection settings from
# cfgs['mysql'] (host, port, user, passwd, db).
cfgs = load_config()
  8. class MySqlDatabaseHelper:
  9. _instance = None
  10. def __new__(cls):
  11. if not cls._instance:
  12. cls._instance = super(MySqlDatabaseHelper, cls).__new__(cls)
  13. cls._instance._initialized = False
  14. return cls._instance
  15. def __init__(self):
  16. if self._initialized:
  17. return
  18. self._host = cfgs['mysql']['host']
  19. self._port = cfgs['mysql']['port']
  20. self._user = cfgs['mysql']['user']
  21. self._passwd = cfgs['mysql']['passwd']
  22. self._dbname = cfgs['mysql']['db']
  23. self.connect_database()
  24. self._initialized = True
  25. def connect_database(self):
  26. # 创建数据库连接
  27. try:
  28. conn = "mysql+pymysql://" + self._user + ":" + self._passwd + "@" + self._host + ":" + str(self._port) + "/" + self._dbname
  29. # 通过连接池创建engine
  30. self.engine = create_engine(
  31. conn,
  32. pool_size=20, # 设置连接池大小
  33. max_overflow=30, # 超过连接池大小时的额外连接数
  34. pool_recycle=1800, # 回收连接时间
  35. pool_pre_ping=True, # 防止断开连接
  36. isolation_level="READ COMMITTED" # 降低隔离级别
  37. )
  38. except Exception as e:
  39. raise ConnectionAbortedError(f"failed to create connection: {e}")
  40. self._DBSession = sessionmaker(bind=self.engine)
  41. def load_data_with_page(self, query, params, page_size=100000):
  42. """分页查询数据"""
  43. data = pd.DataFrame()
  44. # 用子查询包裹原始查询来计数,避免字符串替换
  45. count_query = text(f"SELECT COUNT(*) FROM ({query}) AS _count_subq")
  46. query += " LIMIT :limit OFFSET :offset"
  47. query = text(query)
  48. # 获取总行数
  49. result = self.fetch_one(count_query, params)
  50. total_rows = result[0] if result is not None else 0
  51. if total_rows == 0:
  52. return data
  53. page = 1
  54. with tqdm(total=total_rows, desc="Loading data", unit="rows") as pbar:
  55. while True:
  56. offset = (page - 1) * page_size
  57. # 复制 params 避免修改调用方的字典
  58. page_params = dict(params)
  59. page_params["limit"] = page_size
  60. page_params["offset"] = offset
  61. df = pd.DataFrame(self.fetch_all(query, page_params))
  62. if df.empty:
  63. break
  64. data = pd.concat([data, df], ignore_index=True)
  65. pbar.update(len(df))
  66. page += 1
  67. return data
  68. def fetch_all(self, query, params=None):
  69. """执行SQL查询并返回所有结果"""
  70. session = self._DBSession()
  71. try:
  72. results = session.execute(query, params or {}).fetchall()
  73. return results
  74. except SQLAlchemyError as e:
  75. session.rollback()
  76. print(f"error: {e}")
  77. raise
  78. finally:
  79. session.close()
  80. def fetch_one(self, query, params=None):
  81. """执行SQL查询并返回单条结果"""
  82. session = self._DBSession()
  83. try:
  84. result = session.execute(query, params or {}).fetchone()
  85. return result
  86. except SQLAlchemyError as e:
  87. session.rollback()
  88. print(f"error: {e}")
  89. raise
  90. finally:
  91. session.close()
  92. def insert_data(self, table_name, data_dict):
  93. """插入单条数据到指定表"""
  94. if not data_dict:
  95. return 0
  96. columns = ", ".join(data_dict.keys())
  97. values = ", ".join([f":{key}" for key in data_dict.keys()])
  98. query = text(f"INSERT INTO {table_name} ({columns}) VALUES ({values})")
  99. session = self._DBSession()
  100. try:
  101. result = session.execute(query, data_dict)
  102. session.commit()
  103. return result.rowcount
  104. except SQLAlchemyError as e:
  105. session.rollback()
  106. print(f"Error inserting data: {e}")
  107. return 0
  108. finally:
  109. session.close()
  110. def update_data(self, table_name, update_dict, conditions, condition_params=None):
  111. """更新表中符合条件的数据"""
  112. if not update_dict:
  113. return 0
  114. set_clause = ", ".join([f"{key} = :{key}" for key in update_dict.keys()])
  115. if len(conditions) == 1:
  116. where_clause = f"WHERE {conditions[0]}"
  117. elif len(conditions) > 1:
  118. where_clause = f"WHERE {' AND '.join(conditions)}"
  119. else:
  120. where_clause = ""
  121. query = text(f"UPDATE {table_name} SET {set_clause} {where_clause}")
  122. params = update_dict.copy()
  123. if condition_params:
  124. params.update(condition_params)
  125. session = self._DBSession()
  126. try:
  127. result = session.execute(query, params)
  128. session.commit()
  129. return result.rowcount
  130. except SQLAlchemyError as e:
  131. session.rollback()
  132. print(f"Error updating data: {e}")
  133. return 0
  134. finally:
  135. session.close()
  136. def execute_query(self, query, params=None):
  137. """执行SQL语句 (无返回值, 如INSERT, UPDATE, DELETE)"""
  138. session = self._DBSession()
  139. try:
  140. session.execute(query, params or {})
  141. session.commit()
  142. except SQLAlchemyError as e:
  143. session.rollback()
  144. print(f"Error: {e}")
  145. finally:
  146. session.close()
  147. if __name__ == '__main__':
  148. db_helper = MySqlDatabaseHelper()
  149. table_name = 'tads_brandcul_report'
  150. data_dict = {
  151. 'cultivacation_id': 10000002,
  152. 'city_uuid': '00000000000000000000000011445301',
  153. 'limit_cycle_name': '202505W1(05.05-05.11)',
  154. 'product_code': '440298',
  155. 'product_info_table': 'D72E3FAE8DCE4270BD23C3EC015C0A35',
  156. 'relation_table': 'AD889019FD4F4EE7B887981162BA09EC',
  157. 'similarity_product_table': 'CE436AC24D96461FA0C091CB01E9BC05',
  158. 'recommend_table': 'A7C5918B8DDB4BEA9D921936955CBAF6',
  159. }
  160. # db_helper.insert_data(table_name, data_dict)
  161. update_data = {"val_table": "A7C5918B8DDB4BEA9D921936955CBAF6"}
  162. conditions = [
  163. "cultivacation_id = :cultivacation_id",
  164. "city_uuid = :city_uuid"
  165. ]
  166. condition_params = {
  167. 'cultivacation_id': 10000001,
  168. 'city_uuid': '00000000000000000000000011445301',
  169. }
  170. db_helper.update_data(table_name, update_data, conditions, condition_params)