import redis

from core import get_logger, settings, DatabaseException

logger = get_logger("database.redis")


class RedisDatabaseHelper:
    """Singleton wrapper around a pooled Redis client.

    Every call to ``RedisDatabaseHelper()`` returns the same instance. The
    connection pool is built and verified with a ``PING`` the first time the
    constructor completes successfully; later constructions are no-ops. If
    the first attempt fails, the instance stays uninitialized, so the next
    construction retries the connection.

    Attributes:
        redis: The connected ``redis.StrictRedis`` client. Only set once a
            ``PING`` has succeeded.
    """

    _instance = None

    def __new__(cls):
        """Return the shared instance, creating it on first use."""
        if cls._instance is None:
            instance = super().__new__(cls)
            # __init__ runs on every construction; this flag lets it skip
            # reconnecting once the client has been set up.
            instance._initialized = False
            cls._instance = instance
        return cls._instance

    def __init__(self):
        """Create the connection pool and verify the server is reachable.

        Connection parameters are read from ``settings`` (``redis_host``,
        ``redis_port``, ``redis_password``, ``redis_db``).

        Raises:
            DatabaseException: If the server cannot be reached, including
                when the connection attempt times out. The underlying redis
                error is attached as ``__cause__``.
        """
        if self._initialized:
            return

        try:
            pool = redis.ConnectionPool(
                host=settings.redis_host,
                port=settings.redis_port,
                password=settings.redis_password,
                db=settings.redis_db,
                decode_responses=True,
                max_connections=50,
            )
            client = redis.StrictRedis(connection_pool=pool)
            # The pool connects lazily, so force a round trip now to surface
            # connection problems at startup instead of on first use.
            client.ping()
        except (redis.ConnectionError, redis.TimeoutError) as e:
            # redis.TimeoutError is not a ConnectionError subclass, but a
            # connect timeout is still a failure to connect and must be
            # reported the same way.
            logger.error("Failed to connect to Redis", exc_info=True)
            raise DatabaseException(message="Redis连接失败", detail=str(e)) from e

        # Publish the client only after a successful ping so the shared
        # instance never exposes a client that failed to connect.
        self.redis = client
        logger.info(
            "Redis connection established",
            extra={"extra_data": {"host": settings.redis_host, "db": settings.redis_db}},
        )
        self._initialized = True

    def check_connection(self) -> bool:
        """Check whether the Redis connection is healthy.

        Returns:
            True if a ``PING`` succeeds, False if it raises for any reason.
        """
        try:
            self.redis.ping()
        except Exception:
            # Health checks must never raise, but record why they failed so
            # the outage can be diagnosed.
            logger.warning("Redis health check failed", exc_info=True)
            return False
        return True