// ============================================================================= // Redis 连接管理 // 负责 Redis 的连接、重连、健康检查和优雅关闭 // ============================================================================= package message import ( "context" "fmt" "sync" "time" "github.com/gogf/gf/v2/database/gredis" "github.com/gogf/gf/v2/frame/g" ) var ( muRedis sync.RWMutex redisConns map[string]*gredis.Redis redisConfigs map[string]*gredis.Config ) func init() { redisConns = make(map[string]*gredis.Redis) redisConfigs = make(map[string]*gredis.Config) } // redisConnect 建立 Redis 连接 // name: 数据源名称,如果为空则使用默认数据源 func redisConnect(ctx context.Context, name string) error { if g.Cfg().MustGet(ctx, "redis").IsEmpty() { g.Log().Errorf(ctx, "❌ Redis 配置不存在") return fmt.Errorf("redis Configuration does not exist") } // 确定数据源名称 dsName := "default" if !g.IsEmpty(name) { dsName = name } g.Log().Infof(ctx, "🔔 Redis [%s] 开始创建连接", dsName) muRedis.Lock() defer muRedis.Unlock() // 安全地关闭旧连接(仅针对该数据源) if oldRedis, exists := redisConns[dsName]; exists && oldRedis != nil { oldRedis.Close(ctx) delete(redisConns, dsName) } // 从配置文件读取 Redis 配置 redisAddr := g.Cfg().MustGet(ctx, fmt.Sprintf("redis.%s.address", dsName)).String() if g.IsEmpty(redisAddr) { g.Log().Errorf(ctx, "❌ Redis 配置错误: address 不能为空 (数据源: %s)", dsName) return fmt.Errorf("❌ Redis 配置错误: address 不能为空 (数据源: %s)", dsName) } redisDB := g.Cfg().MustGet(ctx, fmt.Sprintf("redis.%s.db", dsName)).Int() if redisDB < 0 || redisDB > 15 { g.Log().Errorf(ctx, "❌ Redis 配置错误: db 必须在 0-15 之间 (当前值: %d)", redisDB) return fmt.Errorf("❌ Redis 配置错误: db 必须在 0-15 之间 (当前值: %d)", redisDB) } idleTimeout := g.Cfg().MustGet(ctx, fmt.Sprintf("redis.%s.idleTimeout", dsName)).String() redisIdleTimeout, err := time.ParseDuration(idleTimeout) if err != nil { g.Log().Errorf(ctx, "❌ Redis idleTimeout 格式错误: %v", err) return err } maxConnLifetime := g.Cfg().MustGet(ctx, fmt.Sprintf("redis.%s.maxConnLifetime", dsName)).String() redisMaxConnLifetime, err := time.ParseDuration(maxConnLifetime) if err != nil { g.Log().Errorf(ctx, "❌ Redis maxConnLifetime 格式错误: %v", err) return err } waitTimeout := g.Cfg().MustGet(ctx, fmt.Sprintf("redis.%s.waitTimeout", dsName)).String() redisWaitTimeout, err := time.ParseDuration(waitTimeout) if err != nil { g.Log().Errorf(ctx, "❌ Redis waitTimeout 格式错误: %v", err) return err } dialTimeout := g.Cfg().MustGet(ctx, fmt.Sprintf("redis.%s.dialTimeout", dsName)).String() redisDialTimeout, err := time.ParseDuration(dialTimeout) if err != nil { g.Log().Errorf(ctx, "❌ Redis dialTimeout 格式错误: %v", err) return err } readTimeout := g.Cfg().MustGet(ctx, fmt.Sprintf("redis.%s.readTimeout", dsName)).String() redisReadTimeout, err := time.ParseDuration(readTimeout) if err != nil { g.Log().Errorf(ctx, "❌ Redis readTimeout 格式错误: %v", err) return err } writeTimeout := g.Cfg().MustGet(ctx, fmt.Sprintf("redis.%s.writeTimeout", dsName)).String() redisWriteTimeout, err := time.ParseDuration(writeTimeout) if err != nil { g.Log().Errorf(ctx, "❌ Redis writeTimeout 格式错误: %v", err) return err } maxActive := g.Cfg().MustGet(ctx, fmt.Sprintf("redis.%s.maxActive", dsName)).Int() if g.IsEmpty(maxActive) { g.Log().Errorf(ctx, "❌ Redis maxActive 配置错误: %v", maxActive) return fmt.Errorf("❌ Redis maxActive 配置错误") } // 构建 GoFrame Redis 配置 redisConfig := &gredis.Config{ Address: redisAddr, Db: redisDB, IdleTimeout: redisIdleTimeout, MaxConnLifetime: redisMaxConnLifetime, WaitTimeout: redisWaitTimeout, DialTimeout: redisDialTimeout, ReadTimeout: redisReadTimeout, WriteTimeout: redisWriteTimeout, MaxActive: maxActive, } redisConfigs[dsName] = redisConfig // 使用 GoFrame 的 Redis 连接 newRedis, err := gredis.New(redisConfig) if err != nil { g.Log().Errorf(ctx, "❌ Redis [%s] 连接失败: %v", dsName, err) return err } // 测试连接(直接调用避免死锁) _, err = newRedis.Do(ctx, "PING") if err != nil { g.Log().Errorf(ctx, "❌ Redis [%s] 连接失败: ping 失败 - %v", dsName, err) _ = newRedis.Close(ctx) return err } redisConns[dsName] = newRedis g.Log().Infof(ctx, "✅ Redis [%s] 连接成功: %s (DB: %d)", dsName, redisAddr, redisDB) return nil } // redisPing 检测 Redis 连接状态(带超时保护) func redisPing(ctx context.Context, name string) bool { // 确定数据源名称 dsName := "default" if !g.IsEmpty(name) { dsName = name } muRedis.RLock() defer muRedis.RUnlock() rc, exists := redisConns[dsName] if !exists || rc == nil { g.Log().Errorf(ctx, "❌ Redis [%s] 连接未建立", dsName) return false } // 创建带超时的子上下文,避免死锁 timeoutCtx, cancel := context.WithTimeout(ctx, 3*time.Second) defer cancel() _, err := rc.Do(timeoutCtx, "PING") if err != nil { g.Log().Errorf(ctx, "❌ Redis [%s] ping 失败: %v", dsName, err) return false } g.Log().Infof(ctx, "📊 Redis [%s] 连接正常", dsName) return true } // redisClose 关闭 Redis 连接 func redisClose(ctx context.Context, name string) error { // 确定数据源名称 dsName := "default" if !g.IsEmpty(name) { dsName = name } muRedis.Lock() defer muRedis.Unlock() if rc, exists := redisConns[dsName]; exists && rc != nil { if err := rc.Close(ctx); err != nil { g.Log().Errorf(ctx, "❌ Redis [%s] 关闭失败: %v", dsName, err) return err } delete(redisConns, dsName) } g.Log().Infof(ctx, "✅ Redis [%s] 连接已关闭", dsName) return nil } // getRedisConn 获取 Redis 连接(内部使用) func getRedisConn(name string) *gredis.Redis { dsName := "default" if !g.IsEmpty(name) { dsName = name } return redisConns[dsName] }