from core import get_logger
from database import MySqlDatabaseHelper
from sqlalchemy import text, bindparam
import pandas as pd

logger = get_logger("database.dao")


class MySqlDao:
    """Process-wide singleton data-access object for the brand-cultivation tables.

    Every call to ``MySqlDao()`` returns the same instance; the table names and
    the ``MySqlDatabaseHelper`` are set up only on the first construction.
    All queries pass user-supplied values as bound parameters; only the
    table names (fixed in ``__init__``) are interpolated into the SQL text.
    """

    _instance = None

    # Maximum number of ids bound into a single expanding ``IN (...)`` clause
    # by the batched lookups.
    _IN_BATCH_SIZE = 2000

    def __new__(cls):
        # Create the shared instance once and mark it as not yet initialised
        # so that __init__ runs its body exactly one time.
        if not cls._instance:
            cls._instance = super(MySqlDao, cls).__new__(cls)
            cls._instance._initialized = False
        return cls._instance

    def __init__(self):
        # __init__ is invoked on every MySqlDao() call; skip re-initialisation.
        if self._initialized:
            return

        self.db_helper = MySqlDatabaseHelper()
        self._product_tablename = "tads_brandcul_product_info_f"
        self._cust_tablename = "tads_brandcul_retail_cust_label"
        self._order_tablename = "tads_brandcul_consumer_order"
        self._eval_order_name = "tads_brandcul_consumer_order_check_week"
        # Sales-indicator analysis table.
        self._order_analysis_table_name = "tads_brandcul_analysis_index"
        self._mock_order_tablename = "yunfu_mock_data"
        self._shopping_tablename = "tads_brandcul_cust_info_lbs_f"
        # self._shopping_tablename = "yunfu_shopping_mock_data"
        self._report_tablename = "tads_brandcul_report"

        self._initialized = True

    # ------------------------------------------------------------------
    # Private helpers
    # ------------------------------------------------------------------

    def _keep_known_custs(self, data, city_uuid):
        """Inner-join ``data`` on ``cust_code`` against the city's customer list.

        ``get_cust_list`` selects DISTINCT cust_code only, so the join adds no
        columns and does not duplicate rows; it just drops orders whose
        ``cust_code`` is absent from the customer table.

        An empty frame that has no ``cust_code`` column is returned unchanged,
        because joining on a missing column raises ``KeyError``.
        NOTE(review): this presumes ``fetch_all`` yields an empty sequence when
        nothing matches, producing a column-less frame - confirm.
        """
        if data.empty and "cust_code" not in data.columns:
            return data
        cust_index = self.get_cust_list(city_uuid).set_index("cust_code")
        return data.join(cust_index, on="cust_code", how="inner")

    def _fetch_in_batches(self, query, city_uuid, ids):
        """Run ``query`` once per ``_IN_BATCH_SIZE`` slice of ``ids`` and concatenate.

        ``query`` must declare ``:city_uuid`` and an expanding ``:ids`` bind
        parameter. ``ids`` must be a list (it is sliced).
        """
        size = self._IN_BATCH_SIZE
        frames = [
            pd.DataFrame(
                self.db_helper.fetch_all(
                    query,
                    {"city_uuid": city_uuid, "ids": ids[start:start + size]},
                )
            )
            for start in range(0, len(ids), size)
        ]
        return pd.concat(frames, ignore_index=True) if frames else pd.DataFrame()

    # ------------------------------------------------------------------
    # Bulk loaders
    # ------------------------------------------------------------------

    def load_product_data(self, city_uuid):
        """Load all product rows for a city whose ``org_is_active`` equals '是'."""
        logger.info(f"Loading product data for city_uuid={city_uuid}")
        query = f"SELECT * FROM {self._product_tablename} WHERE city_uuid = :city_uuid AND org_is_active = '是'"
        params = {"city_uuid": city_uuid}
        return self.db_helper.load_data_with_page(query, params)

    def load_cust_data(self, city_uuid):
        """Load all customer (retailer) rows whose ``corp_uuid`` equals ``city_uuid``."""
        logger.info(f"Loading cust data for city_uuid={city_uuid}")
        query = f"SELECT * FROM {self._cust_tablename} WHERE corp_uuid = :city_uuid"
        params = {"city_uuid": city_uuid}
        return self.db_helper.load_data_with_page(query, params)

    def load_order_data(self, city_uuid):
        """Load a city's orders, restricted to customers present in the customer table.

        The ``stat_month`` and ``city_uuid`` columns are dropped from the result.
        """
        logger.info(f"Loading order data for city_uuid={city_uuid}")
        query = f"SELECT * FROM {self._order_tablename} WHERE city_uuid = :city_uuid"
        params = {"city_uuid": city_uuid}
        data = self.db_helper.load_data_with_page(query, params)
        data = data.drop(columns=["stat_month", "city_uuid"])
        return self._keep_known_custs(data, city_uuid)

    def load_order_analysis_index_data(self, city_uuid):
        """Load the sales-indicator analysis rows for a city."""
        logger.info(f"Loading order analysis index data for city_uuid={city_uuid}")
        query = f"SELECT * FROM {self._order_analysis_table_name} WHERE city_uuid = :city_uuid"
        params = {"city_uuid": city_uuid}
        return self.db_helper.load_data_with_page(query, params)

    def load_delivery_order_data(self, city_uuid, start_time, end_time):
        """Load evaluation-order rows for a city whose cycle matches exactly.

        Rows are matched on ``cycle_begin_date = start_time`` and
        ``cycle_end_date = end_time`` (equality, not a range).
        """
        logger.info(f"Loading delivery order data for city_uuid={city_uuid}, start_time={start_time}, end_time={end_time}")
        query = f"SELECT * FROM {self._eval_order_name} WHERE city_uuid = :city_uuid AND cycle_begin_date = :start_time AND cycle_end_date = :end_time"
        params = {
            "city_uuid": city_uuid,
            "start_time": start_time,
            "end_time": end_time,
        }
        return self.db_helper.load_data_with_page(query, params)

    def load_mock_order_data(self):
        """Load every row of the mock order table."""
        logger.info("Loading mock order data")
        query = f"SELECT * FROM {self._mock_order_tablename}"
        return self.db_helper.load_data_with_page(query, {})

    def load_shopping_data(self, city_uuid):
        """Load the shopping-district rows for a city."""
        logger.info(f"Loading shopping data for city_uuid={city_uuid}")
        query = f"SELECT * FROM {self._shopping_tablename} WHERE city_uuid = :city_uuid"
        params = {"city_uuid": city_uuid}
        return self.db_helper.load_data_with_page(query, params)

    # ------------------------------------------------------------------
    # Lookups by id / id list
    # ------------------------------------------------------------------

    def get_product_by_id(self, city_uuid, product_id):
        """Fetch one product row by ``city_uuid`` and ``product_code``.

        Returns a one-row DataFrame, or an empty DataFrame when no row matches.
        """
        logger.info(f"Getting product by id for city_uuid={city_uuid}, product_id={product_id}")
        query = text(f"""
            SELECT *
            FROM {self._product_tablename}
            WHERE city_uuid = :city_uuid
            AND product_code = :product_id
        """)
        params = {"city_uuid": city_uuid, "product_id": product_id}
        result = self.db_helper.fetch_one(query, params)
        if result is None:
            return pd.DataFrame()
        return pd.DataFrame([dict(result._mapping)])

    def get_cust_by_ids(self, city_uuid, cust_id_list):
        """Fetch customer rows for the given ``cust_code`` values.

        Returns an empty DataFrame when ``cust_id_list`` is empty or None.
        """
        logger.info(f"Getting cust by ids for city_uuid={city_uuid}, count={len(cust_id_list) if cust_id_list else 0}")
        if not cust_id_list:
            return pd.DataFrame()

        query = text(f"""
            SELECT *
            FROM {self._cust_tablename}
            WHERE corp_uuid = :city_uuid
            AND cust_code IN :ids
        """).bindparams(bindparam("ids", expanding=True))
        params = {"city_uuid": city_uuid, "ids": list(cust_id_list)}
        return pd.DataFrame(self.db_helper.fetch_all(query, params))

    def get_shop_by_ids(self, city_uuid, cust_id_list):
        """Fetch shopping-district rows for the given ``cust_code`` values.

        Returns an empty DataFrame when ``cust_id_list`` is empty or None.
        """
        logger.info(f"Getting shop by ids for city_uuid={city_uuid}, count={len(cust_id_list) if cust_id_list else 0}")
        if not cust_id_list:
            return pd.DataFrame()

        query = text(f"""
            SELECT *
            FROM {self._shopping_tablename}
            WHERE city_uuid = :city_uuid
            AND cust_code IN :ids
        """).bindparams(bindparam("ids", expanding=True))
        params = {"city_uuid": city_uuid, "ids": list(cust_id_list)}
        return pd.DataFrame(self.db_helper.fetch_all(query, params))

    def get_product_by_ids(self, city_uuid, product_id_list):
        """Fetch product rows for the given ``product_code`` values, in batches.

        Each batch is ordered by ``product_code``; batches are concatenated in
        input order, so the overall result is not globally sorted.
        Returns an empty DataFrame when ``product_id_list`` is empty or None.
        """
        logger.info(f"Getting products by ids for city_uuid={city_uuid}, count={len(product_id_list) if product_id_list else 0}")
        if not product_id_list:
            return pd.DataFrame()

        # The statement does not depend on the batch, so build it once.
        query = text(f"""
            SELECT *
            FROM {self._product_tablename}
            WHERE city_uuid = :city_uuid
            AND product_code IN :ids
            ORDER BY product_code
        """).bindparams(bindparam("ids", expanding=True))
        return self._fetch_in_batches(query, city_uuid, list(product_id_list))

    def get_order_by_product_ids(self, city_uuid, product_ids):
        """Fetch all order rows for the given products, in batches.

        The result is restricted to customers present in the customer table.
        Returns an empty DataFrame when ``product_ids`` is empty or None.
        """
        logger.info(f"Getting orders by product ids for city_uuid={city_uuid}, count={len(product_ids) if product_ids else 0}")
        if not product_ids:
            return pd.DataFrame()

        # The statement does not depend on the batch, so build it once.
        query = text(f"""
            SELECT *
            FROM {self._order_tablename}
            WHERE city_uuid = :city_uuid
            AND product_code IN :ids
            ORDER BY cust_code, product_code
        """).bindparams(bindparam("ids", expanding=True))
        data = self._fetch_in_batches(query, city_uuid, list(product_ids))
        return self._keep_known_custs(data, city_uuid)

    def get_order_by_cust_ids_and_product_ids(self, city_uuid, cust_id_list, product_ids):
        """Fetch ``cust_code, product_code, sale_qty`` for the given customers and products.

        Returns an empty DataFrame when either list is empty or None.
        """
        # Guard the len() calls: this log line runs before the emptiness check
        # below, and None must yield an empty frame rather than a TypeError.
        logger.info(f"Getting orders by cust ids and product ids for city_uuid={city_uuid}, custs={len(cust_id_list) if cust_id_list else 0}, products={len(product_ids) if product_ids else 0}")
        if not cust_id_list or not product_ids:
            return pd.DataFrame()

        query = text(f"""
            SELECT cust_code, product_code, sale_qty
            FROM {self._order_tablename}
            WHERE city_uuid = :city_uuid
            AND cust_code IN :cust_ids
            AND product_code IN :product_ids
        """).bindparams(
            bindparam("cust_ids", expanding=True),
            bindparam("product_ids", expanding=True),
        )
        params = {
            "city_uuid": city_uuid,
            "cust_ids": list(cust_id_list),
            "product_ids": list(product_ids),
        }
        return pd.DataFrame(self.db_helper.fetch_all(query, params))

    def get_order_by_product(self, city_uuid, product_id):
        """Fetch a city's order rows for one product, restricted to known customers."""
        logger.info(f"Getting orders by product for city_uuid={city_uuid}, product_id={product_id}")
        query = f"""
            SELECT *
            FROM {self._order_tablename}
            WHERE city_uuid = :city_uuid
            AND product_code = :product_id
        """
        params = {"city_uuid": city_uuid, "product_id": product_id}
        data = self.db_helper.load_data_with_page(query, params)
        return self._keep_known_custs(data, city_uuid)

    def get_eval_order_by_product(self, city_uuid, product_id):
        """Fetch a city's evaluation-order rows for one ``product_code``."""
        logger.info(f"Getting eval orders by product for city_uuid={city_uuid}, product_id={product_id}")
        query = f"""
            SELECT *
            FROM {self._eval_order_name}
            WHERE city_uuid = :city_uuid
            AND product_code = :product_id
        """
        params = {"city_uuid": city_uuid, "product_id": product_id}
        return self.db_helper.load_data_with_page(query, params)

    def get_delivery_data_by_product(self, city_uuid, product_id, start_time, end_time):
        """Fetch evaluation-order rows for one product and an exact cycle.

        Rows are matched on ``cycle_begin_date = start_time`` and
        ``cycle_end_date = end_time`` (equality, not a range).
        """
        # NOTE(review): this filters on goods_code, whereas
        # get_eval_order_by_product filters the same table on product_code -
        # confirm which column is intended.
        logger.info(f"Getting delivery data by product for city_uuid={city_uuid}, product_id={product_id}, start_time={start_time}, end_time={end_time}")
        query = f"""
            SELECT *
            FROM {self._eval_order_name}
            WHERE city_uuid = :city_uuid
            AND goods_code = :product_id
            AND cycle_begin_date = :start_time
            AND cycle_end_date = :end_time
        """
        params = {
            "city_uuid": city_uuid,
            "product_id": product_id,
            "start_time": start_time,
            "end_time": end_time,
        }
        return self.db_helper.load_data_with_page(query, params)

    def get_order_by_cust(self, city_uuid, cust_id):
        """Fetch a city's order rows for one customer."""
        logger.info(f"Getting orders by cust for city_uuid={city_uuid}, cust_id={cust_id}")
        query = f"""
            SELECT *
            FROM {self._order_tablename}
            WHERE city_uuid = :city_uuid
            AND cust_code = :cust_id
        """
        params = {"city_uuid": city_uuid, "cust_id": cust_id}
        return self.db_helper.load_data_with_page(query, params)

    def get_order_by_cust_and_product(self, city_uuid, cust_id, product_id):
        """Fetch a city's order rows for one customer and one product."""
        logger.info(f"Getting orders by cust and product for city_uuid={city_uuid}, cust_id={cust_id}, product_id={product_id}")
        query = f"""
            SELECT *
            FROM {self._order_tablename}
            WHERE city_uuid = :city_uuid
            AND cust_code = :cust_id
            AND product_code =:product_id
        """
        params = {"city_uuid": city_uuid, "cust_id": cust_id, "product_id": product_id}
        return self.db_helper.load_data_with_page(query, params)

    def get_product_from_order(self, city_uuid):
        """Return the distinct ``product_code`` values in a city's orders, sorted."""
        logger.info(f"Getting products from order for city_uuid={city_uuid}")
        query = f"SELECT DISTINCT product_code FROM {self._order_tablename} WHERE city_uuid = :city_uuid ORDER BY product_code"
        params = {"city_uuid": city_uuid}
        return self.db_helper.load_data_with_page(query, params)

    def get_cust_list(self, city_uuid):
        """Return the distinct ``cust_code`` values in the customer table for a city, sorted."""
        logger.info(f"Getting cust list for city_uuid={city_uuid}")
        query = f"SELECT DISTINCT cust_code FROM {self._cust_tablename} WHERE corp_uuid = :city_uuid ORDER BY cust_code"
        params = {"city_uuid": city_uuid}
        return self.db_helper.load_data_with_page(query, params)

    # ------------------------------------------------------------------
    # Report table
    # ------------------------------------------------------------------

    def insert_report(self, data_dict):
        """Insert ``data_dict`` into the report table and return the helper's result."""
        logger.info("Inserting report data")
        return self.db_helper.insert_data(self._report_tablename, data_dict)

    def update_eval_report_data(self, cultivacation_id, eval_fileid):
        """Set ``val_table`` to ``eval_fileid`` on the report row(s) with this ``cultivacation_id``."""
        logger.info(f"Updating eval report data for cultivacation_id={cultivacation_id}")
        update_data = {"val_table": eval_fileid}
        conditions = [
            "cultivacation_id = :cultivacation_id",
        ]
        condition_params = {
            'cultivacation_id': cultivacation_id,
        }
        self.db_helper.update_data(self._report_tablename, update_data, conditions, condition_params)

    def get_report_file_id(self, cultivacation_id):
        """Fetch the report file-id columns for a ``cultivacation_id``.

        Returns a one-row DataFrame with ``product_info_table``,
        ``relation_table``, ``similarity_product_table``, ``recommend_table``
        and ``val_table``; an empty DataFrame (and a warning log) when no
        report row exists.
        """
        logger.info(f"Getting report file id for cultivacation_id={cultivacation_id}")
        query = f"SELECT product_info_table, relation_table, similarity_product_table, recommend_table, val_table FROM {self._report_tablename} WHERE cultivacation_id = :cultivacation_id"
        params = {"cultivacation_id": cultivacation_id}
        result = self.db_helper.fetch_one(text(query), params)
        if result is None:
            logger.warning(f"No report found for cultivacation_id={cultivacation_id}")
            return pd.DataFrame()
        return pd.DataFrame([dict(result._mapping)])


if __name__ == "__main__":
    # Manual smoke test: load and print the orders of one hard-coded city.
    dao = MySqlDao()
    city_uuid = '00000000000000000000000011445301'
    data = dao.load_order_data(city_uuid)
    print(data)