#!/usr/bin/env python3
# -*- coding:utf-8 -*-
"""MySQL access helpers built on a pooled SQLAlchemy engine, returning pandas DataFrames."""
import os
import sys
from urllib.parse import quote

import pandas as pd
from sqlalchemy import bindparam, create_engine, text
from sqlalchemy.dialects.mysql import pymysql
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy.orm import sessionmaker

from config import load_config

cfgs = load_config()


class Mysql(object):
    """Wrapper around a pooled SQLAlchemy engine for the configured MySQL database.

    Connection settings are read from ``cfgs['mysql']`` (keys ``host``, ``port``,
    ``user``, ``passwd``, ``db``), where ``cfgs`` comes from ``config.load_config()``.
    Query helpers return their results as ``pandas.DataFrame`` objects.
    """

    def __init__(self):
        self._host = cfgs['mysql']['host']
        self._port = cfgs['mysql']['port']
        self._user = cfgs['mysql']['user']
        self._passwd = cfgs['mysql']['passwd']
        self._dbname = cfgs['mysql']['db']
        # Create the engine backed by a connection pool.
        self.engine = create_engine(
            self._connect(self._host, self._port, self._user, self._passwd, self._dbname),
            pool_size=10,  # connections kept open in the pool
            max_overflow=20,  # extra connections allowed beyond pool_size
            pool_recycle=3600,  # recycle connections older than this many seconds
        )
        self._DBSession = sessionmaker(bind=self.engine)

    def _connect(self, host, port, user, pwd, db):
        """Build the ``mysql+pymysql`` connection URL string.

        The user name and password are percent-encoded so that characters such
        as ``@``, ``:``, ``/`` or ``#`` cannot corrupt the URL.

        Raises:
            ConnectionError: if the URL cannot be assembled (e.g. a component
                is ``None`` or not a string).
        """
        try:
            client = (
                "mysql+pymysql://"
                + quote(user, safe="")
                + ":"
                + quote(pwd, safe="")
                + "@"
                + host
                + ":"
                + str(port)
                + "/"
                + db
            )
            return client
        except Exception as e:
            raise ConnectionError(f"failed to create connection string: {e}") from e

    def create_session(self):
        """Create and return a new database session bound to this engine."""
        return self._DBSession()

    def _run_query(self, query, params):
        """Execute ``query`` with ``params`` in a fresh session and return a DataFrame.

        Column names come from the result set, so an empty result still carries
        the selected columns.
        """
        with self.create_session() as session:
            result = session.execute(query, params)
            columns = list(result.keys())
            rows = result.fetchall()
        return pd.DataFrame(rows, columns=columns)

    def _fetch_all_pages(self, fetch_page, page, page_size):
        """Call ``fetch_page(page)`` for successive pages and concatenate the results.

        Stops at the first empty page, or after a page shorter than
        ``page_size`` (nothing can follow it). The engine is disposed via
        ``closed()`` when done, whether or not an error occurred.

        Returns:
            The concatenated DataFrame (an empty ``pd.DataFrame()`` when no rows
            were found), or ``None`` if any exception was raised.
        """
        frames = []
        total_rows = 0
        try:
            while True:
                df = fetch_page(page)
                if df.empty:
                    break
                frames.append(df)
                total_rows += len(df)
                print(f"Page {page}: Retrieved {len(df)} rows, Total rows so far: {total_rows}")
                if len(df) < page_size:
                    break  # a short page means there are no further rows
                page += 1  # continue with the next page
            # Concatenate once instead of re-copying the running total every page.
            return pd.concat(frames, ignore_index=True) if frames else pd.DataFrame()
        except Exception as e:
            print(f"Error: {e}")
            return None
        finally:
            self.closed()

    def fetch_data_with_pagination(self, tablename, query_text, field_name, city_uuid, page=1, page_size=1000):
        """Fetch one page of rows from ``tablename`` where ``field_name`` equals ``city_uuid``.

        ``tablename``, ``query_text`` (the SELECT list) and ``field_name`` are
        interpolated verbatim into the SQL because identifiers cannot be bound
        as parameters; they must come from trusted code, never from user input.
        ``city_uuid``, the limit and the offset are bound parameters.

        NOTE(review): there is no ORDER BY, so MySQL does not guarantee a stable
        row order between pages; rows may repeat or be skipped across pages.
        """
        offset = (page - 1) * page_size  # number of rows to skip
        query = text(f"SELECT {query_text} FROM {tablename} WHERE {field_name} = :city_uuid LIMIT :limit OFFSET :offset")
        return self._run_query(query, {"city_uuid": city_uuid, "limit": page_size, "offset": offset})

    def load_data(self, tablename, query_text, field_name, city_uuid, page=1, page_size=1000):
        """Load every page (starting at ``page``) of ``tablename`` rows matching ``city_uuid``.

        Returns the concatenated DataFrame, or ``None`` if a query failed. The
        engine's pool is disposed afterwards via ``closed()``.
        """
        return self._fetch_all_pages(
            lambda p: self.fetch_data_with_pagination(tablename, query_text, field_name, city_uuid, p, page_size),
            page,
            page_size,
        )

    def get_product_by_id(self, city_uuid, product_id):
        """Return product rows for ``city_uuid`` and ``product_id`` as a DataFrame."""
        query = text("""
            SELECT *
            FROM tads_brandcul_product_info
            WHERE city_uuid = :city_uuid AND product_code = :product_id
        """)
        return self._run_query(query, {"city_uuid": city_uuid, "product_id": product_id})

    def get_cust_by_ids(self, city_uuid, cust_id_list):
        """Return retail-customer rows for ``city_uuid`` whose code is in ``cust_id_list``.

        The ids are bound as an expanding IN-list parameter (each converted with
        ``str()``), never spliced into the SQL text.

        Returns:
            A DataFrame of matching rows, or ``[]`` when ``cust_id_list`` is empty
            (kept for backward compatibility with existing callers).
        """
        if not cust_id_list:
            return []
        query = text("""
            SELECT *
            FROM tads_brandcul_cust_info
            WHERE BA_CITY_ORG_CODE = :city_uuid
              AND BB_RETAIL_CUSTOMER_CODE IN :cust_ids
        """).bindparams(bindparam("cust_ids", expanding=True))
        params = {"city_uuid": city_uuid, "cust_ids": [str(cust_id) for cust_id in cust_id_list]}
        return self._run_query(query, params)

    def load_mock_data(self, tablename, query_text, page=1, page_size=1000):
        """Load every page (starting at ``page``) of ``tablename`` without filtering.

        ``tablename`` and ``query_text`` are interpolated verbatim into the SQL
        and must come from trusted code. Returns the concatenated DataFrame, or
        ``None`` if a query failed; the engine's pool is disposed afterwards.
        """
        query = text(f"SELECT {query_text} FROM {tablename} LIMIT :limit OFFSET :offset")

        def fetch_page(p):
            return self._run_query(query, {"limit": page_size, "offset": (p - 1) * page_size})

        return self._fetch_all_pages(fetch_page, page, page_size)

    def closed(self):
        """Dispose the engine's connection pool to release its connections."""
        self.engine.dispose()


if __name__ == '__main__':
    client = Mysql()
    tablename = "mock_order"
    # Pagination settings
    page = 1
    page_size = 1000
    query_text = '*'
    # load_mock_data takes (tablename, query_text, page, page_size); load_data
    # additionally requires field_name and city_uuid before page/page_size.
    client.load_mock_data(tablename, query_text, page, page_size)