from database import MySqlDatabaseHelper
from sqlalchemy import text


class MySqlDao:
    """Singleton data-access object for the brand-culture MySQL tables.

    Every ``MySqlDao()`` call returns the same instance; the database helper
    and the table names are set up only on the first construction.
    """

    _instance = None

    def __new__(cls):
        # Create the shared instance lazily on first use.
        if cls._instance is None:
            cls._instance = super().__new__(cls)
            cls._instance._initialized = False
        return cls._instance

    def __init__(self):
        # __init__ runs on every MySqlDao() call, so skip the set-up once the
        # shared instance has already been initialised.
        if self._initialized:
            return

        self.db_helper = MySqlDatabaseHelper()
        self._product_tablename = "tads_brandcul_product_info"
        self._cust_tablename = "tads_brandcul_cust_info"
        self._order_tablename = "tads_brandcul_cust_order"
        self._mock_order_tablename = "yunfu_mock_data"

        self._initialized = True

    def load_product_data(self, city_uuid):
        """Load the product rows whose ``city_uuid`` matches.

        Args:
            city_uuid: Value bound to the ``city_uuid`` filter.

        Returns:
            Whatever ``db_helper.load_data_with_page`` returns for the query.
        """
        query = f"SELECT * FROM {self._product_tablename} WHERE city_uuid = :city_uuid"
        params = {"city_uuid": city_uuid}
        return self.db_helper.load_data_with_page(query, params)

    def load_cust_data(self, city_uuid):
        """Load the customer rows whose ``BA_CITY_ORG_CODE`` matches.

        Args:
            city_uuid: Value bound to the ``BA_CITY_ORG_CODE`` filter.

        Returns:
            Whatever ``db_helper.load_data_with_page`` returns for the query.
        """
        query = f"SELECT * FROM {self._cust_tablename} WHERE BA_CITY_ORG_CODE = :city_uuid"
        params = {"city_uuid": city_uuid}
        return self.db_helper.load_data_with_page(query, params)

    def load_order_data(self, city_uuid):
        """Load and clean the order rows whose ``city_uuid`` matches.

        The ``stat_month`` and ``city_uuid`` columns are dropped, duplicate
        rows are removed and missing values are replaced with 0.

        Args:
            city_uuid: Value bound to the ``city_uuid`` filter.

        Returns:
            The cleaned data (presumably a pandas DataFrame, given the
            methods called on it -- confirm against the helper).
        """
        query = f"SELECT * FROM {self._order_tablename} WHERE city_uuid = :city_uuid"
        params = {"city_uuid": city_uuid}
        data = self.db_helper.load_data_with_page(query, params)

        data.drop(columns=["stat_month", "city_uuid"], inplace=True)

        # Remove duplicate rows and fill missing values.
        data.drop_duplicates(inplace=True)
        data.fillna(0, inplace=True)
        data = data.infer_objects(copy=False)

        return data

    def load_mock_order_data(self, city_uuid):
        """Load and clean the mock order rows.

        Duplicate rows are removed and missing values are replaced with 0.

        Args:
            city_uuid: Forwarded to the helper as a parameter, but the query
                has no ``WHERE`` clause, so it does not filter the rows.

        Returns:
            The cleaned data (presumably a pandas DataFrame -- confirm
            against the helper).
        """
        query = f"SELECT * FROM {self._mock_order_tablename}"
        # NOTE(review): the query has no :city_uuid placeholder; the parameter
        # is still passed as before -- confirm whether filtering was intended.
        params = {"city_uuid": city_uuid}
        data = self.db_helper.load_data_with_page(query, params)

        # Remove duplicate rows and fill missing values.
        data.drop_duplicates(inplace=True)
        data.fillna(0, inplace=True)
        data = data.infer_objects(copy=False)

        return data

    def get_cust_list(self, city_uuid):
        """Return the ``BB_RETAIL_CUSTOMER_CODE`` values for a city as a list.

        Args:
            city_uuid: Passed through to ``load_cust_data``.

        Returns:
            A list of customer codes; empty when no rows were loaded.
        """
        data = self.load_cust_data(city_uuid)
        return data["BB_RETAIL_CUSTOMER_CODE"].to_list()

    def get_product_by_id(self, city_uuid, product_id):
        """Fetch one product row by ``city_uuid`` and ``product_code``.

        Args:
            city_uuid: Value bound to the ``city_uuid`` filter.
            product_id: Value bound to the ``product_code`` filter.

        Returns:
            Whatever ``db_helper.fetch_one`` returns for the query.
        """
        query = text(f"""
            SELECT *
            FROM {self._product_tablename}
            WHERE city_uuid = :city_uuid
            AND product_code = :product_id
        """)
        params = {"city_uuid": city_uuid, "product_id": product_id}
        return self.db_helper.fetch_one(query, params)

    def get_cust_by_ids(self, city_uuid, cust_id_list):
        """Fetch the customer rows for a list of retail customer codes.

        Args:
            city_uuid: Value bound to the ``BA_CITY_ORG_CODE`` filter.
            cust_id_list: Customer codes matched against
                ``BB_RETAIL_CUSTOMER_CODE``.

        Returns:
            ``None`` when ``cust_id_list`` is empty; otherwise whatever
            ``db_helper.fetch_all`` returns for the query.
        """
        if not cust_id_list:
            return None

        # Bind each code as its own named parameter instead of interpolating
        # it into the SQL text, so the values cannot alter the statement.
        # Codes are bound as strings, matching the previous quoted literals.
        id_params = {
            f"cust_id_{index}": str(cust_id)
            for index, cust_id in enumerate(cust_id_list)
        }
        placeholders = ", ".join(f":{name}" for name in id_params)

        query = text(f"""
            SELECT *
            FROM {self._cust_tablename}
            WHERE BA_CITY_ORG_CODE = :city_uuid
            AND BB_RETAIL_CUSTOMER_CODE IN ({placeholders})
        """)
        params = {"city_uuid": city_uuid, **id_params}
        return self.db_helper.fetch_all(query, params)


if __name__ == "__main__":
    dao = MySqlDao()
    # city_uuid = "00000000000000000000000011445301"
    city_uuid = "00000000000000000000000011441801"
    cust_id_list = ["441800100006", "441800100051", "441800100811"]
    cust_list = dao.get_cust_by_ids(city_uuid, cust_id_list)
    print(len(cust_list))