from database import MySqlDatabaseHelper from sqlalchemy import text, bindparam import pandas as pd class MySqlDao: _instance = None def __new__(cls): if not cls._instance: cls._instance = super(MySqlDao, cls).__new__(cls) cls._instance._initialized = False return cls._instance def __init__(self): if self._initialized: return self.db_helper = MySqlDatabaseHelper() self._product_tablename = "tads_brandcul_product_info_f" self._cust_tablename = "tads_brandcul_cust_info_f" self._order_tablename = "tads_brandcul_consumer_order" self._eval_order_name = "tads_brandcul_consumer_order_check_week" self._mock_order_tablename = "yunfu_mock_data" self._shopping_tablename = "tads_brandcul_cust_info_lbs_f" # self._shopping_tablename = "yunfu_shopping_mock_data" self._report_tablename = "tads_brandcul_report" self._initialized = True def load_product_data(self, city_uuid): """从数据库中读取商品信息""" query = f"SELECT * FROM {self._product_tablename} WHERE city_uuid = :city_uuid" params = {"city_uuid": city_uuid} data = self.db_helper.load_data_with_page(query, params) return data def load_cust_data(self, city_uuid): """从数据库中读取商户信息""" query = f"SELECT * FROM {self._cust_tablename} WHERE BA_CITY_ORG_CODE = :city_uuid" params = {"city_uuid": city_uuid} data = self.db_helper.load_data_with_page(query, params) return data def load_order_data(self, city_uuid): """从数据库中读取订单信息""" query = f"SELECT * FROM {self._order_tablename} WHERE city_uuid = :city_uuid" params = {"city_uuid": city_uuid} data = self.db_helper.load_data_with_page(query, params) data.drop('stat_month', axis=1, inplace=True) data.drop('city_uuid', axis=1, inplace=True) cust_list = self.get_cust_list(city_uuid) cust_index = cust_list.set_index("BB_RETAIL_CUSTOMER_CODE") data = data.join(cust_index, on="cust_code", how="inner") return data def load_delivery_order_data(self, city_uuid, start_time, end_time): """从数据库中读取订单信息""" query = f"SELECT * FROM {self._eval_order_name} WHERE city_uuid = :city_uuid AND cycle_begin_date = :start_time AND cycle_end_date = :end_time" params = { "city_uuid": city_uuid, "start_time": start_time, "end_time": end_time } data = self.db_helper.load_data_with_page(query, params) return data def load_mock_order_data(self): """从数据库中读取mock的订单信息""" query = f"SELECT * FROM {self._mock_order_tablename}" data = self.db_helper.load_data_with_page(query, {}) return data def load_shopping_data(self, city_uuid): """从数据库中读取商圈数据""" query = f"SELECT * FROM {self._shopping_tablename} WHERE city_uuid = :city_uuid" params = {"city_uuid": city_uuid} data = self.db_helper.load_data_with_page(query, params) return data def get_product_by_id(self, city_uuid, product_id): """根据city_uuid 和 product_id 从表中获取拼柜信息""" query = f""" SELECT * FROM {self._product_tablename} WHERE city_uuid = :city_uuid AND product_code = :product_id """ params = {"city_uuid": city_uuid, "product_id": product_id} data = self.db_helper.load_data_with_page(query, params) return data def get_cust_by_ids(self, city_uuid, cust_id_list): """根据零售户列表查询其信息""" if not cust_id_list: return pd.DataFrame() query = text(f""" SELECT * FROM {self._cust_tablename} WHERE BA_CITY_ORG_CODE = :city_uuid AND BB_RETAIL_CUSTOMER_CODE IN :ids """).bindparams(bindparam("ids", expanding=True)) params = {"city_uuid": city_uuid, "ids": list(cust_id_list)} data = pd.DataFrame(self.db_helper.fetch_all(query, params)) return data def get_shop_by_ids(self, city_uuid, cust_id_list): """根据零售户列表查询其信息""" if not cust_id_list: return pd.DataFrame() query = text(f""" SELECT * FROM {self._shopping_tablename} WHERE city_uuid = :city_uuid AND cust_code IN :ids """).bindparams(bindparam("ids", expanding=True)) params = {"city_uuid": city_uuid, "ids": list(cust_id_list)} data = pd.DataFrame(self.db_helper.fetch_all(query, params)) return data def get_product_by_ids(self, city_uuid, product_id_list): """根据product_code列表查询其信息""" if not product_id_list: return pd.DataFrame() query = text(f""" SELECT * FROM {self._product_tablename} WHERE city_uuid = :city_uuid AND product_code IN :ids """).bindparams(bindparam("ids", expanding=True)) params = {"city_uuid": city_uuid, "ids": list(product_id_list)} data = pd.DataFrame(self.db_helper.fetch_all(query, params)) return data def get_order_by_product_ids(self, city_uuid, product_ids): """获取指定香烟列表的所有售卖记录""" if not product_ids: return pd.DataFrame() query = text(f""" SELECT * FROM {self._order_tablename} WHERE city_uuid = :city_uuid AND product_code IN :ids """).bindparams(bindparam("ids", expanding=True)) params = {"city_uuid": city_uuid, "ids": list(product_ids)} data = pd.DataFrame(self.db_helper.fetch_all(query, params)) cust_list = self.get_cust_list(city_uuid) cust_index = cust_list.set_index("BB_RETAIL_CUSTOMER_CODE") data = data.join(cust_index, on="cust_code", how="inner") return data def get_order_by_product(self, city_uuid, product_id): query = f""" SELECT * FROM {self._order_tablename} WHERE city_uuid = :city_uuid AND product_code = :product_id """ params = {"city_uuid": city_uuid, "product_id": product_id} data = self.db_helper.load_data_with_page(query, params) cust_list = self.get_cust_list(city_uuid) cust_index = cust_list.set_index("BB_RETAIL_CUSTOMER_CODE") data = data.join(cust_index, on="cust_code", how="inner") return data def get_eval_order_by_product(self, city_uuid, product_id): query = f""" SELECT * FROM {self._eval_order_name} WHERE city_uuid = :city_uuid AND product_code = :product_id """ params = {"city_uuid": city_uuid, "product_id": product_id} data = self.db_helper.load_data_with_page(query, params) return data def get_delivery_data_by_product(self, city_uuid, product_id, start_time, end_time): """通过品规获取验证数据""" query = f""" SELECT * FROM {self._eval_order_name} WHERE city_uuid = :city_uuid AND goods_code = :product_id AND cycle_begin_date = :start_time AND cycle_end_date = :end_time """ params = { "city_uuid": city_uuid, "product_id": product_id, "start_time": start_time, "end_time": end_time, } data = self.db_helper.load_data_with_page(query, params) return data def get_order_by_cust(self, city_uuid, cust_id): query = f""" SELECT * FROM {self._order_tablename} WHERE city_uuid = :city_uuid AND cust_code = :cust_id """ params = {"city_uuid": city_uuid, "cust_id": cust_id} data = self.db_helper.load_data_with_page(query, params) return data def get_order_by_cust_and_product(self, city_uuid, cust_id, product_id): query = f""" SELECT * FROM {self._order_tablename} WHERE city_uuid = :city_uuid AND cust_code = :cust_id AND product_code =:product_id """ params = {"city_uuid": city_uuid, "cust_id": cust_id, "product_id": product_id} data = self.db_helper.load_data_with_page(query, params) return data def get_product_from_order(self, city_uuid): query = f"SELECT DISTINCT product_code FROM {self._order_tablename} WHERE city_uuid = :city_uuid" params = {"city_uuid": city_uuid} data = self.db_helper.load_data_with_page(query, params) return data def get_cust_list(self, city_uuid): query = f"SELECT DISTINCT BB_RETAIL_CUSTOMER_CODE FROM {self._cust_tablename} WHERE BA_CITY_ORG_CODE = :city_uuid" params = {"city_uuid": city_uuid} data = self.db_helper.load_data_with_page(query, params) return data def data_preprocess(self, data: pd.DataFrame): """数据预处理""" data.drop(["cust_uuid", "longitude", "latitude", "range_radius"], axis=1, inplace=True) remaining_cols = data.columns.drop(["city_uuid", "cust_code"]) col_with_missing = remaining_cols[data[remaining_cols].isnull().any()].tolist() # 判断有缺失的字段 col_all_missing = remaining_cols[data[remaining_cols].isnull().all()].to_list() # 全部缺失的字段 col_partial_missing = list(set(col_with_missing) - set(col_all_missing)) # 部分缺失的字段 for col in col_partial_missing: data[col] = data[col].fillna(data[col].mean()) for col in col_all_missing: data[col] = data[col].fillna(0).infer_objects(copy=False) def insert_report(self, data_dict): """向report中插入数据""" return self.db_helper.insert_data(self._report_tablename, data_dict) def update_eval_report_data(self, cultivacation_id, eval_fileid): """更新投放记录中的验证报告fileid""" update_data = {"val_table": eval_fileid} conditions = [ "cultivacation_id = :cultivacation_id", ] condition_params = { 'cultivacation_id': cultivacation_id, } self.db_helper.update_data(self._report_tablename, update_data, conditions, condition_params) def get_report_file_id(self, cultivacation_id): """从report中根据cultivacation_id获取对应文件的fileid""" query = f"SELECT product_info_table, relation_table, similarity_product_table, recommend_table, val_table FROM {self._report_tablename} WHERE cultivacation_id = :cultivacation_id" params = {"cultivacation_id": cultivacation_id} result = self.db_helper.fetch_one(text(query), params) data = pd.DataFrame([dict(result._mapping)] if result else None) return data if __name__ == "__main__": dao = MySqlDao() cultivacation_id = '10000001' data = dao.get_report_file_id(cultivacation_id) print(data)