日期:2026-03-15
范围:database/db/mysql.py、database/dao/mysql_dao.py
方案:外科手术式 bug 修复,不改架构
connect_database:try/except 只包了连接字符串拼接,create_engine 在 try 块外,真正的连接失败不会被捕获。load_data_with_page:用 query.replace("SELECT *", "SELECT COUNT(*)") 构建 count 查询,查询不以 SELECT * 开头时会出错。另外会直接修改调用方传入的 params 字典(添加 limit、offset 键),产生副作用。fetch_all / fetch_one:捕获 SQLAlchemyError 后只打印错误、静默返回 None,调用方无法感知失败。get_cust_by_ids、get_shop_by_ids、get_product_by_ids、get_order_by_product_ids:IN (...) 子句通过字符串拼接构建,存在 SQL 注入风险。get_product_by_id:查询单条记录却走了分页逻辑,有不必要的开销。get_cust_list、get_product_from_order:直接调用 fetch_all(text(query)),与其他方法统一使用 load_data_with_page 的风格不一致。依赖修复 2 先完成。修复 1 — connect_database
将 create_engine(...) 移入 try 块,确保连接失败时被捕获并抛出 ConnectionAbortedError。
修复 2 — load_data_with_page
用子查询包裹原始查询来构建 count 查询,不再依赖字符串替换:
SELECT COUNT(*) FROM (<原始查询>) AS _count_subq
无论原始查询的 SELECT 子句是什么都能正确计数。同时在修改 params 前先复制,避免副作用:
params = dict(params) # 避免修改调用方的字典
修复 3 — fetch_all / fetch_one
在 except 块打印错误后加 raise,让异常向上传播。
注意:这是一个行为破坏性变更。当前调用方在失败时收到 None,修复后会收到未处理的异常。这是有意为之——显式异常比静默失败更容易排查问题。
修复 4 — IN 子句 SQL 注入
load_data_with_page 将 query 参数作为普通字符串处理(字符串拼接后再包装成 text()),与预构建的 text() 对象不兼容。
因此,受影响的四个方法改为直接调用 fetch_all,配合 bindparam(expanding=True) 参数化:
from sqlalchemy import bindparam, text
query = text("""
SELECT * FROM table
WHERE city_uuid = :city_uuid
AND col IN :ids
""").bindparams(bindparam("ids", expanding=True))
params = {"city_uuid": city_uuid, "ids": list(id_list)}
data = pd.DataFrame(self.db_helper.fetch_all(query, params))
这些方法的结果集大小由输入列表决定,跳过分页是安全的。
修复 5 — get_product_by_id
改用 fetch_one 查单条,返回单行 DataFrame 与其他方法保持一致:
result = self.db_helper.fetch_one(text(query), params)
return pd.DataFrame([dict(result._mapping)] if result else [])
修复 6 — get_cust_list / get_product_from_order
将直接调用 fetch_all(text(query)) 改为 load_data_with_page(query, params),统一接口风格。需要修复 2 先完成(子查询方式能正确处理 SELECT DISTINCT)。
由于存在依赖关系,按以下顺序执行: