mysql_dao.py 14 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341
  1. from core import get_logger
  2. from database import MySqlDatabaseHelper
  3. from sqlalchemy import text, bindparam
  4. import pandas as pd
  5. logger = get_logger("database.dao")
  6. class MySqlDao:
  7. _instance = None
  8. def __new__(cls):
  9. if not cls._instance:
  10. cls._instance = super(MySqlDao, cls).__new__(cls)
  11. cls._instance._initialized = False
  12. return cls._instance
  13. def __init__(self):
  14. if self._initialized:
  15. return
  16. self.db_helper = MySqlDatabaseHelper()
  17. self._product_tablename = "tads_brandcul_product_info_f"
  18. self._cust_tablename = "tads_brandcul_retail_cust_label"
  19. self._order_tablename = "tads_brandcul_consumer_order"
  20. self._eval_order_name = "tads_brandcul_consumer_order_check_week"
  21. self._order_analysis_table_name = "tads_brandcul_analysis_index" # 销售指标指标分析表
  22. self._mock_order_tablename = "yunfu_mock_data"
  23. self._shopping_tablename = "tads_brandcul_cust_info_lbs_f"
  24. # self._shopping_tablename = "yunfu_shopping_mock_data"
  25. self._report_tablename = "tads_brandcul_report"
  26. self._initialized = True
  27. def load_product_data(self, city_uuid):
  28. """从数据库中读取商品信息"""
  29. logger.info(f"Loading product data for city_uuid={city_uuid}")
  30. query = f"SELECT * FROM {self._product_tablename} WHERE city_uuid = :city_uuid AND org_is_active = '是'"
  31. params = {"city_uuid": city_uuid}
  32. data = self.db_helper.load_data_with_page(query, params)
  33. return data
  34. def load_cust_data(self, city_uuid):
  35. """从数据库中读取商户信息"""
  36. logger.info(f"Loading cust data for city_uuid={city_uuid}")
  37. query = f"SELECT * FROM {self._cust_tablename} WHERE corp_uuid = :city_uuid"
  38. params = {"city_uuid": city_uuid}
  39. data = self.db_helper.load_data_with_page(query, params)
  40. return data
  41. def load_order_data(self, city_uuid):
  42. """从数据库中读取订单信息"""
  43. logger.info(f"Loading order data for city_uuid={city_uuid}")
  44. query = f"SELECT * FROM {self._order_tablename} WHERE city_uuid = :city_uuid"
  45. params = {"city_uuid": city_uuid}
  46. data = self.db_helper.load_data_with_page(query, params)
  47. data.drop('stat_month', axis=1, inplace=True)
  48. data.drop('city_uuid', axis=1, inplace=True)
  49. cust_list = self.get_cust_list(city_uuid)
  50. cust_index = cust_list.set_index("cust_code")
  51. data = data.join(cust_index, on="cust_code", how="inner")
  52. return data
  53. def load_order_analysis_index_data(self, city_uuid):
  54. """从数据库中读取销售指标评估表"""
  55. logger.info(f"Loading order analysis index data for city_uuid={city_uuid}")
  56. query = f"SELECT * FROM {self._order_analysis_table_name} WHERE city_uuid = :city_uuid"
  57. params = {"city_uuid": city_uuid}
  58. data = self.db_helper.load_data_with_page(query, params)
  59. return data
  60. def load_delivery_order_data(self, city_uuid, start_time, end_time):
  61. """从数据库中读取订单信息"""
  62. logger.info(f"Loading delivery order data for city_uuid={city_uuid}, start_time={start_time}, end_time={end_time}")
  63. query = f"SELECT * FROM {self._eval_order_name} WHERE city_uuid = :city_uuid AND cycle_begin_date = :start_time AND cycle_end_date = :end_time"
  64. params = {
  65. "city_uuid": city_uuid,
  66. "start_time": start_time,
  67. "end_time": end_time
  68. }
  69. data = self.db_helper.load_data_with_page(query, params)
  70. return data
  71. def load_mock_order_data(self):
  72. """从数据库中读取mock的订单信息"""
  73. logger.info("Loading mock order data")
  74. query = f"SELECT * FROM {self._mock_order_tablename}"
  75. data = self.db_helper.load_data_with_page(query, {})
  76. return data
  77. def load_shopping_data(self, city_uuid):
  78. """从数据库中读取商圈数据"""
  79. logger.info(f"Loading shopping data for city_uuid={city_uuid}")
  80. query = f"SELECT * FROM {self._shopping_tablename} WHERE city_uuid = :city_uuid"
  81. params = {"city_uuid": city_uuid}
  82. data = self.db_helper.load_data_with_page(query, params)
  83. return data
  84. def get_product_by_id(self, city_uuid, product_id):
  85. """根据city_uuid 和 product_id 从表中获取拼柜信息"""
  86. logger.info(f"Getting product by id for city_uuid={city_uuid}, product_id={product_id}")
  87. query = text(f"""
  88. SELECT *
  89. FROM {self._product_tablename}
  90. WHERE city_uuid = :city_uuid
  91. AND product_code = :product_id
  92. """)
  93. params = {"city_uuid": city_uuid, "product_id": product_id}
  94. result = self.db_helper.fetch_one(query, params)
  95. return pd.DataFrame([dict(result._mapping)] if result else [])
  96. def get_cust_by_ids(self, city_uuid, cust_id_list):
  97. """根据零售户列表查询其信息"""
  98. logger.info(f"Getting cust by ids for city_uuid={city_uuid}, count={len(cust_id_list) if cust_id_list else 0}")
  99. if not cust_id_list:
  100. return pd.DataFrame()
  101. query = text(f"""
  102. SELECT *
  103. FROM {self._cust_tablename}
  104. WHERE corp_uuid = :city_uuid
  105. AND cust_code IN :ids
  106. """).bindparams(bindparam("ids", expanding=True))
  107. params = {"city_uuid": city_uuid, "ids": list(cust_id_list)}
  108. data = pd.DataFrame(self.db_helper.fetch_all(query, params))
  109. return data
  110. def get_shop_by_ids(self, city_uuid, cust_id_list):
  111. """根据零售户列表查询其信息"""
  112. logger.info(f"Getting shop by ids for city_uuid={city_uuid}, count={len(cust_id_list) if cust_id_list else 0}")
  113. if not cust_id_list:
  114. return pd.DataFrame()
  115. query = text(f"""
  116. SELECT *
  117. FROM {self._shopping_tablename}
  118. WHERE city_uuid = :city_uuid
  119. AND cust_code IN :ids
  120. """).bindparams(bindparam("ids", expanding=True))
  121. params = {"city_uuid": city_uuid, "ids": list(cust_id_list)}
  122. data = pd.DataFrame(self.db_helper.fetch_all(query, params))
  123. return data
  124. def get_product_by_ids(self, city_uuid, product_id_list):
  125. """根据product_code列表查询其信息"""
  126. logger.info(f"Getting products by ids for city_uuid={city_uuid}, count={len(product_id_list) if product_id_list else 0}")
  127. if not product_id_list:
  128. return pd.DataFrame()
  129. product_id_list = list(product_id_list)
  130. batch_size = 2000
  131. batches = [product_id_list[i:i + batch_size] for i in range(0, len(product_id_list), batch_size)]
  132. results = []
  133. for batch in batches:
  134. query = text(f"""
  135. SELECT *
  136. FROM {self._product_tablename}
  137. WHERE city_uuid = :city_uuid
  138. AND product_code IN :ids
  139. ORDER BY product_code
  140. """).bindparams(bindparam("ids", expanding=True))
  141. params = {"city_uuid": city_uuid, "ids": batch}
  142. results.append(pd.DataFrame(self.db_helper.fetch_all(query, params)))
  143. return pd.concat(results, ignore_index=True) if results else pd.DataFrame()
  144. def get_order_by_product_ids(self, city_uuid, product_ids):
  145. """获取指定香烟列表的所有售卖记录"""
  146. logger.info(f"Getting orders by product ids for city_uuid={city_uuid}, count={len(product_ids) if product_ids else 0}")
  147. if not product_ids:
  148. return pd.DataFrame()
  149. product_ids = list(product_ids)
  150. batch_size = 2000
  151. batches = [product_ids[i:i + batch_size] for i in range(0, len(product_ids), batch_size)]
  152. results = []
  153. for batch in batches:
  154. query = text(f"""
  155. SELECT *
  156. FROM {self._order_tablename}
  157. WHERE city_uuid = :city_uuid
  158. AND product_code IN :ids
  159. ORDER BY cust_code, product_code
  160. """).bindparams(bindparam("ids", expanding=True))
  161. params = {"city_uuid": city_uuid, "ids": batch}
  162. results.append(pd.DataFrame(self.db_helper.fetch_all(query, params)))
  163. data = pd.concat(results, ignore_index=True) if results else pd.DataFrame()
  164. cust_list = self.get_cust_list(city_uuid)
  165. cust_index = cust_list.set_index("cust_code")
  166. data = data.join(cust_index, on="cust_code", how="inner")
  167. return data
  168. def get_order_by_product(self, city_uuid, product_id):
  169. logger.info(f"Getting orders by product for city_uuid={city_uuid}, product_id={product_id}")
  170. query = f"""
  171. SELECT *
  172. FROM {self._order_tablename}
  173. WHERE city_uuid = :city_uuid
  174. AND product_code = :product_id
  175. """
  176. params = {"city_uuid": city_uuid, "product_id": product_id}
  177. data = self.db_helper.load_data_with_page(query, params)
  178. cust_list = self.get_cust_list(city_uuid)
  179. cust_index = cust_list.set_index("cust_code")
  180. data = data.join(cust_index, on="cust_code", how="inner")
  181. return data
  182. def get_eval_order_by_product(self, city_uuid, product_id):
  183. logger.info(f"Getting eval orders by product for city_uuid={city_uuid}, product_id={product_id}")
  184. query = f"""
  185. SELECT *
  186. FROM {self._eval_order_name}
  187. WHERE city_uuid = :city_uuid
  188. AND product_code = :product_id
  189. """
  190. params = {"city_uuid": city_uuid, "product_id": product_id}
  191. data = self.db_helper.load_data_with_page(query, params)
  192. return data
  193. def get_delivery_data_by_product(self, city_uuid, product_id, start_time, end_time):
  194. """通过品规获取验证数据"""
  195. logger.info(f"Getting delivery data by product for city_uuid={city_uuid}, product_id={product_id}, start_time={start_time}, end_time={end_time}")
  196. query = f"""
  197. SELECT *
  198. FROM {self._eval_order_name}
  199. WHERE city_uuid = :city_uuid
  200. AND goods_code = :product_id
  201. AND cycle_begin_date = :start_time
  202. AND cycle_end_date = :end_time
  203. """
  204. params = {
  205. "city_uuid": city_uuid,
  206. "product_id": product_id,
  207. "start_time": start_time,
  208. "end_time": end_time,
  209. }
  210. data = self.db_helper.load_data_with_page(query, params)
  211. return data
  212. def get_order_by_cust(self, city_uuid, cust_id):
  213. logger.info(f"Getting orders by cust for city_uuid={city_uuid}, cust_id={cust_id}")
  214. query = f"""
  215. SELECT *
  216. FROM {self._order_tablename}
  217. WHERE city_uuid = :city_uuid
  218. AND cust_code = :cust_id
  219. """
  220. params = {"city_uuid": city_uuid, "cust_id": cust_id}
  221. data = self.db_helper.load_data_with_page(query, params)
  222. return data
  223. def get_order_by_cust_and_product(self, city_uuid, cust_id, product_id):
  224. logger.info(f"Getting orders by cust and product for city_uuid={city_uuid}, cust_id={cust_id}, product_id={product_id}")
  225. query = f"""
  226. SELECT *
  227. FROM {self._order_tablename}
  228. WHERE city_uuid = :city_uuid
  229. AND cust_code = :cust_id
  230. AND product_code =:product_id
  231. """
  232. params = {"city_uuid": city_uuid, "cust_id": cust_id, "product_id": product_id}
  233. data = self.db_helper.load_data_with_page(query, params)
  234. return data
  235. def get_product_from_order(self, city_uuid):
  236. logger.info(f"Getting products from order for city_uuid={city_uuid}")
  237. query = f"SELECT DISTINCT product_code FROM {self._order_tablename} WHERE city_uuid = :city_uuid ORDER BY product_code"
  238. params = {"city_uuid": city_uuid}
  239. data = self.db_helper.load_data_with_page(query, params)
  240. return data
  241. def get_cust_list(self, city_uuid):
  242. logger.info(f"Getting cust list for city_uuid={city_uuid}")
  243. query = f"SELECT DISTINCT cust_code FROM {self._cust_tablename} WHERE corp_uuid = :city_uuid ORDER BY cust_code"
  244. params = {"city_uuid": city_uuid}
  245. data = self.db_helper.load_data_with_page(query, params)
  246. return data
  247. def insert_report(self, data_dict):
  248. """向report中插入数据"""
  249. logger.info("Inserting report data")
  250. return self.db_helper.insert_data(self._report_tablename, data_dict)
  251. def update_eval_report_data(self, cultivacation_id, eval_fileid):
  252. """更新投放记录中的验证报告fileid"""
  253. logger.info(f"Updating eval report data for cultivacation_id={cultivacation_id}")
  254. update_data = {"val_table": eval_fileid}
  255. conditions = [
  256. "cultivacation_id = :cultivacation_id",
  257. ]
  258. condition_params = {
  259. 'cultivacation_id': cultivacation_id,
  260. }
  261. self.db_helper.update_data(self._report_tablename, update_data, conditions, condition_params)
  262. def get_report_file_id(self, cultivacation_id):
  263. """从report中根据cultivacation_id获取对应文件的fileid"""
  264. logger.info(f"Getting report file id for cultivacation_id={cultivacation_id}")
  265. query = f"SELECT product_info_table, relation_table, similarity_product_table, recommend_table, val_table FROM {self._report_tablename} WHERE cultivacation_id = :cultivacation_id"
  266. params = {"cultivacation_id": cultivacation_id}
  267. result = self.db_helper.fetch_one(text(query), params)
  268. if result is None:
  269. logger.warning(f"No report found for cultivacation_id={cultivacation_id}")
  270. return pd.DataFrame()
  271. return pd.DataFrame([dict(result._mapping)])
  272. if __name__ == "__main__":
  273. dao = MySqlDao()
  274. city_uuid = '00000000000000000000000011445301'
  275. data = dao.load_order_data(city_uuid)
  276. print(data)