package message

import (
	"context"
	"encoding/json"
	"fmt"
	"strings"
	"time"

	"github.com/gogf/gf/v2/frame/g"
	"github.com/gogf/gf/v2/os/glog"
	"github.com/gogf/gf/v2/util/gconv"
)

// redisMessageClient is the Redis-backed message client implementation.
type redisMessageClient struct {
	clientType messageClientType
}

// RedisStreamMessage is a single entry read from a Redis Stream.
type RedisStreamMessage struct {
	ID     string                 // entry ID as returned by Redis
	Values map[string]interface{} // field/value pairs of the entry
}

// streamGroup creates the consumer group for every given config.
//
// Each element of configs must be a non-nil *RedisConfig. Processing stops at
// the first invalid config or the first creation failure, and that error is
// returned.
func (q *redisMessageClient) streamGroup(ctx context.Context, configs ...interface{}) error {
	if len(configs) == 0 {
		return fmt.Errorf("配置不能为空")
	}
	for _, config := range configs {
		cfg, ok := config.(*RedisConfig)
		// A typed nil pointer passes the assertion, so reject it explicitly.
		if !ok || cfg == nil {
			return fmt.Errorf("无效的 Redis 配置类型")
		}
		if err := q.createStreamGroup(ctx, cfg); err != nil {
			return err
		}
	}
	return nil
}

// createStreamGroup creates the consumer group cfg.Group on cfg.Stream using
// the default data source, reconnecting first if it reports not connected.
//
// The stream is created if missing (MKSTREAM) and the group starts at ID "0".
// A BUSYGROUP "already exists" reply is treated as success.
func (q *redisMessageClient) createStreamGroup(ctx context.Context, cfg *RedisConfig) error {
	ds, err := GetManager().GetDefaultDataSource()
	if err != nil {
		return fmt.Errorf("获取默认数据源失败: %w", err)
	}
	// Reconnect on demand when the data source reports a lost connection.
	if !ds.IsConnected() {
		if err := ds.Reconnect(ctx); err != nil {
			return fmt.Errorf("redis重连失败: %w", err)
		}
	}

	_, err = ds.Redis().Do(ctx, "XGROUP", "CREATE", cfg.Stream, cfg.Group, "0", "MKSTREAM")
	if err != nil {
		errStr := err.Error()
		if strings.Contains(errStr, "BUSYGROUP") && strings.Contains(errStr, "already exists") {
			glog.Infof(ctx, "✅ Redis 消费者组已存在: %s", cfg.Group)
			return nil
		}
		return fmt.Errorf("初始化消费者组失败: %w", err)
	}
	glog.Infof(ctx, "✅ Redis 消费者组创建成功: %s", cfg.Group)
	return nil
}

// publish appends data to the stream named by config via XADD with an
// auto-generated ID ("*").
//
// config must be a non-nil *RedisConfig. data is flattened into field/value
// pairs with gconv.Map. The error from Redis is logged and returned as is.
func (q *redisMessageClient) publish(ctx context.Context, config interface{}, data interface{}) error {
	ds, err := GetManager().GetDefaultDataSource()
	if err != nil {
		return fmt.Errorf("获取默认数据源失败: %w", err)
	}
	if !ds.IsConnected() {
		if err := ds.Reconnect(ctx); err != nil {
			return fmt.Errorf("redis重连失败: %w", err)
		}
	}

	cfg, ok := config.(*RedisConfig)
	if !ok || cfg == nil {
		return fmt.Errorf("无效的redis配置类型")
	}

	values := gconv.Map(data)
	args := make([]interface{}, 0, len(values)*2+2)
	args = append(args, cfg.Stream, "*")
	for key, val := range values {
		args = append(args, key, val)
	}

	result, err := ds.Redis().Do(ctx, "XADD", args...)
	if err != nil {
		g.Log().Errorf(ctx, "❌ Redis 发布消息失败: topic=%s, err=%v", cfg.Stream, err)
		return err
	}
	g.Log().Infof(ctx, "✅ Redis 发布消息成功: topic=%s, messageID=%s", cfg.Stream, gconv.String(result))
	return nil
}

// publishDelayed stores data as a delayed message in the sorted set
// "delayed:<stream>", scored by its due time in Unix milliseconds.
//
// config must be a non-nil *RedisConfig. data is JSON-encoded and used as the
// sorted-set member. Nothing in this file moves due members into the stream.
func (q *redisMessageClient) publishDelayed(ctx context.Context, config interface{}, data interface{}, delay int) error {
	ds, err := GetManager().GetDefaultDataSource()
	if err != nil {
		return fmt.Errorf("获取默认数据源失败: %w", err)
	}
	if !ds.IsConnected() {
		if err := ds.Reconnect(ctx); err != nil {
			return fmt.Errorf("redis重连失败: %w", err)
		}
	}

	cfg, ok := config.(*RedisConfig)
	if !ok || cfg == nil {
		return fmt.Errorf("无效的redis配置类型")
	}

	payload, err := json.Marshal(data)
	if err != nil {
		return fmt.Errorf("序列化数据失败: %w", err)
	}

	// NOTE(review): time.Duration(delay) interprets delay as NANOSECONDS, so
	// any realistic small value adds well under a millisecond to the score.
	// The intended unit (seconds? milliseconds?) is not visible here — confirm
	// with callers and multiply by the matching time unit.
	score := float64(time.Now().Add(time.Duration(delay)).UnixMilli())
	delayedKey := fmt.Sprintf("delayed:%s", cfg.Stream)

	// ZADD <delayedKey> <score> <payload>
	if _, err = ds.Redis().Do(ctx, "ZADD", delayedKey, score, string(payload)); err != nil {
		return err
	}
	g.Log().Infof(ctx, "✅ Redis 延迟消息已发布: topic=%s, delay=%v", cfg.Stream, delay)
	return nil
}

// subscribe starts one background consumer per config.
//
// Each element of configs must be a non-nil *RedisConfig with a HandleFunc.
// All configs are validated before any consumer is started, so a bad entry
// never leaves a partially started set of consumers behind.
func (q *redisMessageClient) subscribe(ctx context.Context, configs ...interface{}) error {
	if len(configs) == 0 {
		return fmt.Errorf("配置不能为空")
	}

	validated := make([]*RedisConfig, 0, len(configs))
	for _, config := range configs {
		cfg, ok := config.(*RedisConfig)
		if !ok || cfg == nil {
			return fmt.Errorf("无效的 Redis 配置类型")
		}
		if cfg.HandleFunc == nil {
			return fmt.Errorf("必须提供处理函数")
		}
		validated = append(validated, cfg)
	}

	for _, cfg := range validated {
		if err := q.createSubscribe(ctx, cfg, cfg.HandleFunc); err != nil {
			return err
		}
	}
	return nil
}

// createSubscribe launches the consumer goroutine for a single config and
// returns immediately (always nil).
//
// The goroutine calls consumeMessages at most once per second until ctx is
// cancelled. Timeout-like failures back off for 5s and stop the consumer
// after more than 3 consecutive occurrences; any other failure is retried
// after 1s. A panic is recovered and logged, which also ends the consumer.
func (q *redisMessageClient) createSubscribe(ctx context.Context, cfg *RedisConfig, handler func(ctx context.Context, message map[string]interface{}) error) error {
	go func() {
		defer func() {
			if r := recover(); r != nil {
				g.Log().Errorf(ctx, "❌ Redis 消费者 panic: %v", r)
			}
		}()

		const (
			maxConsecutiveErrors = 3
			timeoutBackoff       = 5 * time.Second
			errorBackoff         = time.Second
		)

		retryTicker := time.NewTicker(time.Second)
		defer retryTicker.Stop()

		logStopped := func() {
			g.Log().Infof(ctx, "🔕 Redis 消费者停止: topic=%s", cfg.Stream)
		}
		// wait sleeps for d but returns false early when ctx is cancelled, so
		// shutdown is never delayed by a back-off.
		wait := func(d time.Duration) bool {
			timer := time.NewTimer(d)
			defer timer.Stop()
			select {
			case <-ctx.Done():
				return false
			case <-timer.C:
				return true
			}
		}

		var consecutiveErrors int
		for {
			select {
			case <-ctx.Done():
				logStopped()
				return
			case <-retryTicker.C:
			}

			err := q.consumeMessages(ctx, cfg, handler)
			if err == nil {
				consecutiveErrors = 0
				continue
			}
			// A failure caused by our own cancellation is a shutdown, not a
			// retryable error.
			if ctx.Err() != nil {
				logStopped()
				return
			}

			backoff := errorBackoff
			msg := err.Error()
			// "timeout" also covers "i/o timeout".
			timeoutLike := strings.Contains(msg, "timeout") ||
				strings.Contains(msg, "context deadline exceeded") ||
				strings.Contains(msg, "context canceled")
			if timeoutLike {
				consecutiveErrors++
				if consecutiveErrors > maxConsecutiveErrors {
					g.Log().Errorf(ctx, "Max retries exceeded, giving up")
					return
				}
				backoff = timeoutBackoff
				g.Log().Warningf(ctx, "⚠️ 等待 %v 后重试...", backoff)
			} else {
				consecutiveErrors = 0
				g.Log().Errorf(ctx, "严重错误,立即重试: %v", err)
			}

			if !wait(backoff) {
				logStopped()
				return
			}
		}
	}()
	return nil
}

// consumeMessages performs one XREADGROUP read for cfg and dispatches every
// returned entry to handler.
//
// The consumer group is (re)created before each read. The read uses BLOCK 0
// and the ">" ID, i.e. it waits for entries not yet delivered to the group.
// A handler error is logged and the entry is left unacknowledged; entries are
// acknowledged here only when cfg.AutoAck is set. Handler and ACK failures do
// not fail the call.
func (q *redisMessageClient) consumeMessages(ctx context.Context, cfg *RedisConfig, handler func(ctx context.Context, message map[string]interface{}) error) error {
	ds, err := GetManager().GetDefaultDataSource()
	if err != nil {
		return fmt.Errorf("获取默认数据源失败: %w", err)
	}
	if !ds.IsConnected() {
		if err := ds.Reconnect(ctx); err != nil {
			return fmt.Errorf("redis重连失败: %w", err)
		}
	}

	// Make sure the consumer group exists before reading from it.
	if err := q.createStreamGroup(ctx, cfg); err != nil {
		return fmt.Errorf("create stream group failed: %w", err)
	}

	result, err := ds.Redis().Do(ctx, "XREADGROUP", "GROUP", cfg.Group, cfg.Consumer, "COUNT", cfg.Count, "BLOCK", 0, "STREAMS", cfg.Stream, ">")
	if err != nil {
		// Returned unwrapped: the caller classifies it by its message text.
		return err
	}

	messages, err := q.parseStreamResult(result)
	if err != nil {
		return err
	}

	for _, msg := range messages {
		if err := handler(ctx, msg.Values); err != nil {
			g.Log().Errorf(ctx, "❌ 消息处理失败: messageID=%s, err=%v", msg.ID, err)
			continue
		}
		if !cfg.AutoAck {
			continue
		}
		if err := q.ackMessage(ctx, cfg.Stream, cfg.Group, msg.ID); err != nil {
			g.Log().Errorf(ctx, "❌ ACK 消息失败: messageID=%s, err=%v", msg.ID, err)
		}
	}
	return nil
}

// ackMessage acknowledges messageIDs for groupName on streamKey via XACK.
//
// It is a no-op when no IDs are given, because XACK without IDs is rejected
// by Redis as an arity error.
func (q *redisMessageClient) ackMessage(ctx context.Context, streamKey, groupName string, messageIDs ...string) error {
	if len(messageIDs) == 0 {
		return nil
	}

	ds, err := GetManager().GetDefaultDataSource()
	if err != nil {
		return fmt.Errorf("获取默认数据源失败: %w", err)
	}
	if !ds.IsConnected() {
		if err := ds.Reconnect(ctx); err != nil {
			return fmt.Errorf("redis重连失败: %w", err)
		}
	}

	args := make([]interface{}, 0, len(messageIDs)+2)
	args = append(args, streamKey, groupName)
	for _, id := range messageIDs {
		args = append(args, id)
	}
	_, err = ds.Redis().Do(ctx, "XACK", args...)
	return err
}

// parseStreamResult converts a raw XREADGROUP reply into stream messages.
//
// result may be the reply itself or any value exposing Val() interface{}.
// Two reply shapes are understood:
//   - a map of stream name -> entries, and
//   - an array of [stream name, entries] pairs (the RESP2 reply shape).
//
// Each entry is [id, [field1, value1, ...]]. Malformed entries and a trailing
// field without a value are skipped. The returned slice is never nil and the
// error is always nil.
func (q *redisMessageClient) parseStreamResult(result interface{}) ([]RedisStreamMessage, error) {
	messages := make([]RedisStreamMessage, 0)
	if result == nil {
		return messages, nil
	}

	// Unwrap result containers that expose the payload through Val().
	resultVal := result
	if valuer, ok := result.(interface{ Val() interface{} }); ok {
		resultVal = valuer.Val()
	}
	if resultVal == nil {
		return messages, nil
	}

	// collect appends every well-formed entry of one stream to messages.
	collect := func(rawEntries interface{}) {
		entries, ok := rawEntries.([]interface{})
		if !ok {
			return
		}
		for _, rawEntry := range entries {
			entry, ok := rawEntry.([]interface{})
			if !ok || len(entry) < 2 {
				continue
			}
			fields, ok := entry[1].([]interface{})
			if !ok {
				continue
			}
			values := make(map[string]interface{}, len(fields)/2)
			for i := 0; i+1 < len(fields); i += 2 {
				values[gconv.String(fields[i])] = fields[i+1]
			}
			messages = append(messages, RedisStreamMessage{
				ID:     gconv.String(entry[0]),
				Values: values,
			})
		}
	}

	switch streams := resultVal.(type) {
	case map[interface{}]interface{}:
		for _, rawEntries := range streams {
			collect(rawEntries)
		}
	case []interface{}:
		for _, rawStream := range streams {
			pair, ok := rawStream.([]interface{})
			if !ok || len(pair) < 2 {
				continue
			}
			collect(pair[1])
		}
	}
	return messages, nil
}