"""Thin helper around PyMongo exposing a few query shortcuts for one database."""

from urllib.parse import quote_plus

import pandas as pd
from pymongo import MongoClient
from pymongo.errors import ConnectionFailure

from config import load_config


class MongoClientHelper:
    """Connect to the MongoDB database named in the project config and query it.

    The connection settings are read from the ``'mongodb'`` section returned by
    ``load_config()``; the keys used are ``host``, ``port``, ``username``,
    ``password`` and ``database``.
    """

    def __init__(self):
        """Load the MongoDB settings and try to connect immediately."""
        self.cfgs = load_config()['mongodb']
        self.client = None
        self.db = None
        self.connect()

    def connect(self):
        """Open a client for the configured database and verify it is reachable.

        On success ``self.client`` and ``self.db`` are set. If the server
        cannot be reached, the failure is printed and both attributes are left
        as ``None`` (best-effort, no exception is raised for
        ``ConnectionFailure``). Other PyMongo errors raised by the ping, such
        as a rejected login, are not caught here.
        """
        host = self.cfgs['host']
        port = self.cfgs['port']
        # Credentials must be percent-encoded before being embedded in a URI.
        username = quote_plus(self.cfgs['username'])
        password = quote_plus(self.cfgs['password'])
        database = self.cfgs['database']
        uri = f"mongodb://{username}:{password}@{host}:{port}/{database}?authSource=admin"

        client = None
        try:
            client = MongoClient(uri)
            # MongoClient connects lazily, so constructing it never reports an
            # unreachable server; a cheap ping forces a real round trip.
            client.admin.command('ping')
        except ConnectionFailure as exc:
            print(f"MongoDB数据库连接失败: {exc}")
            if client is not None:
                # Stop the client's background monitoring for the dead server.
                client.close()
            self.client = None
            self.db = None
            return

        self.client = client
        self.db = client[database]

    def _collection(self, collection_name):
        """Return the named collection, reconnecting once if not connected.

        Raises:
            ConnectionFailure: if there is still no database handle after the
                reconnect attempt.
        """
        # Database objects do not support truth-value testing, so compare
        # against None explicitly.
        if self.db is None:
            self.connect()
        if self.db is None:
            raise ConnectionFailure(
                "MongoDB is not connected; cannot access collection "
                f"{collection_name!r}"
            )
        return self.db[collection_name]

    def find_one(self, collection_name, query):
        """Return a single document matching ``query``, or ``None`` if no match."""
        return self._collection(collection_name).find_one(query)

    def find_many(self, collection_name, query):
        """Return a cursor over every document matching ``query``."""
        return self._collection(collection_name).find(query)

    def find_all(self, collection_name):
        """Return a cursor over all documents in the given collection."""
        return self._collection(collection_name).find()

    def find_fields(self, collection_name, fields):
        """Return a cursor over all documents, projected onto ``fields``.

        Args:
            collection_name: Name of the collection to read.
            fields: Iterable of field names to include. A single field name
                given as a plain string is accepted as well.

        ``_id`` is always excluded from the result, even if listed in
        ``fields``.
        """
        if isinstance(fields, str):
            # Iterating a bare string would project onto its characters.
            fields = [fields]
        projection = {field: 1 for field in fields}
        projection["_id"] = 0
        return self._collection(collection_name).find({}, projection)


def main():
    """Demo: load the ``nick`` field of every document into a DataFrame."""
    db_client = MongoClientHelper()
    collection_name = 'obrand-ec'
    fields = ["nick"]
    cursor = db_client.find_fields(collection_name, fields)
    data = pd.DataFrame(list(cursor))
    print(data)


if __name__ == "__main__":
    main()