|
@@ -52,28 +52,30 @@ class MySqlDatabaseHelper:
|
|
|
def load_data_with_page(self, query, params, page_size=100000):
    """Run ``query`` page by page and return all rows as one DataFrame.

    A ``SELECT COUNT(*)`` over the query (wrapped as a derived table) is
    executed first, only to size the progress bar. The query is then
    re-executed with ``LIMIT :limit OFFSET :offset`` appended until a page
    comes back empty or shorter than ``page_size``.

    Args:
        query: SQL SELECT statement as a plain string. It must not already
            end with its own LIMIT/OFFSET clause, because one is appended
            here. A trailing ``;`` is tolerated and stripped.
        params: Mapping of bind parameters for ``query``, or ``None``. The
            mapping is never mutated; ``limit`` and ``offset`` are added to
            a per-page copy.
        page_size: Maximum number of rows fetched per round trip.

    Returns:
        pandas.DataFrame with the rows of every page under a fresh
        0..n-1 index; an empty DataFrame when the query yields no rows.

    NOTE(review): without a deterministic ORDER BY in ``query`` the
    database is free to return rows in a different order on each page
    request, so pages may overlap or skip rows -- confirm that callers
    always pass an ordered query.
    """
    # A trailing semicolon would break both the derived-table wrapper
    # and the appended LIMIT/OFFSET clause.
    base_sql = query.strip().rstrip(";").rstrip()

    # Count through a derived table instead of rewriting the SELECT list,
    # so the count works for any projection. The alias is mandatory for
    # derived tables in MySQL.
    count_query = text(f"SELECT COUNT(*) FROM ({base_sql}) AS _count_subq")
    paged_query = text(base_sql + " LIMIT :limit OFFSET :offset")

    # Work on a copy so the caller's dict is left untouched; also accept
    # params=None for queries that have no bind parameters.
    base_params = dict(params) if params else {}

    # Total row count; used only as the progress-bar total.
    # Presumably fetch_one returns an indexable row -- verify against helper.
    total_rows = self.fetch_one(count_query, params)[0]

    frames = []
    offset = 0
    with tqdm(total=total_rows, desc="Loading data", unit="rows") as pbar:
        while True:
            # Fresh dict per page so nothing downstream can observe a
            # later mutation of an earlier page's parameters.
            page_params = {**base_params, "limit": page_size, "offset": offset}

            df = pd.DataFrame(self.fetch_all(paged_query, page_params))
            if df.empty:
                break

            frames.append(df)
            pbar.update(len(df))

            # A short page means the result set is exhausted; skip the
            # extra round trip that would only return an empty page.
            if len(df) < page_size:
                break
            offset += page_size

    if not frames:
        return pd.DataFrame()
    # Concatenate once at the end: concatenating inside the loop copies
    # all previously loaded rows again for every page.
    return pd.concat(frames, ignore_index=True)
|
|
|
|
|
|