diff --git a/message/nats_msg.go b/message/nats_msg.go new file mode 100644 index 0000000..519de96 --- /dev/null +++ b/message/nats_msg.go @@ -0,0 +1,204 @@ +package message + +import ( + "context" + "encoding/json" + "fmt" + "time" + + "github.com/gogf/gf/v2/frame/g" + "github.com/nats-io/nats.go/jetstream" +) + +// natsMessageClient NATS 实现 +type natsMessageClient struct { + clientType messageClientType +} + +// StreamGroup 创建消费组(支持单个或批量) +func (q *natsMessageClient) streamGroup(ctx context.Context, configs ...interface{}) error { + if len(configs) == 0 { + return fmt.Errorf("配置不能为空") + } + for _, config := range configs { + cfg, ok := config.(*NATSConfig) + if !ok { + return fmt.Errorf("无效的 NATS 配置类型") + } + if err := q.createStreamGroup(ctx, cfg); err != nil { + return err + } + } + return nil +} + +// createStreamGroup 内部单个创建消费组 +func (q *natsMessageClient) createStreamGroup(ctx context.Context, cfg *NATSConfig) error { + // Stream 不存在,创建新的 + storage := jetstream.FileStorage + if !cfg.Durable { + storage = jetstream.MemoryStorage + } + if g.IsEmpty(cfg.Replicas) { + cfg.Replicas = 1 + } + // 构建流配置 + jsConfig := jetstream.StreamConfig{ + Name: cfg.Stream, + Subjects: []string{fmt.Sprintf("%s.>", cfg.Stream)}, + Replicas: cfg.Replicas, + NoAck: cfg.AutoAck, + AllowMsgSchedules: cfg.DelayMessage, // 延迟消息核心开关 + Storage: storage, + Discard: jetstream.DiscardOld, // 达到上限删除旧消息 + } + // 检查流是否已存在 + stream, err := js.Stream(ctx, cfg.Stream) + if err == nil { + // 流已存在,更新配置 + _, err = js.UpdateStream(ctx, jsConfig) + if err != nil { + return fmt.Errorf("更新任务流失败: %w", err) + } + g.Log().Infof(ctx, "任务流已更新: %s", stream.CachedInfo().Config.Name) + return nil + } + // 创建新流 + stream, err = js.CreateStream(ctx, jsConfig) + if err != nil { + return fmt.Errorf("创建任务流失败: %w", err) + } + + g.Log().Infof(ctx, "✅ NATS 队列初始化成功: stream=%s, consumer=%s", cfg.Stream, cfg.Consumer) + return nil +} + +// Publish 发布消息(支持单个或批量) +func (q *natsMessageClient) publish(ctx context.Context, config interface{}, data interface{}) error { + cfg, ok := config.(*NATSConfig) + if !ok { + return fmt.Errorf("无效的 NATS 配置类型") + } + payload, err := json.Marshal(data) + if err != nil { + return fmt.Errorf("序列化数据失败: %w", err) + } + + // 发布消息到 JetStream + subject := fmt.Sprintf("%s.>", cfg.Stream) + _, err = js.Publish(ctx, subject, payload) + if err != nil { + g.Log().Errorf(ctx, "❌ NATS 发布消息失败: topic=%s, err=%v", cfg.Stream, err) + return err + } + + g.Log().Infof(ctx, "✅ NATS 发布消息成功: topic=%s", cfg.Stream) + return nil +} + +// PublishDelayed 发布延迟消息(支持单个或批量) +func (q *natsMessageClient) publishDelayed(ctx context.Context, config interface{}, data interface{}, delay int) error { + + cfg, ok := config.(*NATSConfig) + if !ok { + return fmt.Errorf("无效的 NATS 配置类型") + } + + payload, err := json.Marshal(data) + if err != nil { + return fmt.Errorf("序列化数据失败: %w", err) + } + + // 使用 goroutine 实现简单的延迟发布 + go func() { + time.Sleep(time.Duration(delay)) + subject := fmt.Sprintf("%s.>", cfg.Stream) + if err := q.publishInternal(ctx, subject, payload); err != nil { + g.Log().Errorf(ctx, "❌ NATS 延迟消息发布失败: topic=%s, delay=%v, err=%v", cfg.Stream, delay, err) + } + }() + + g.Log().Infof(ctx, "✅ NATS 延迟消息已提交: topic=%s, delay=%v", cfg.Stream, delay) + + return nil +} + +// publishInternal 内部发布消息 +func (q *natsMessageClient) publishInternal(ctx context.Context, subject string, payload []byte) error { + _, err := js.Publish(ctx, subject, payload) + return err +} + +// Subscribe 订阅消息(支持单个或批量) +func (q *natsMessageClient) subscribe(ctx context.Context, configs ...interface{}) error { + if len(configs) == 0 { + return fmt.Errorf("配置不能为空") + } + + for _, config := range configs { + cfg, ok := config.(*NATSConfig) + if !ok { + return fmt.Errorf("无效的 NATS 配置类型") + } + handler := cfg.HandleFunc + if handler == nil { + return fmt.Errorf("必须提供处理函数") + } + if err := q.createSubscribe(ctx, cfg, handler); err != nil { + return err + } + } + return nil +} + +// subscribe 内部单个订阅消息 +func (q *natsMessageClient) createSubscribe(ctx context.Context, cfg *NATSConfig, handler func(ctx context.Context, message map[string]interface{}) error) error { + g.Log().Infof(ctx, "🔔 NATS 开始订阅: stream=%s, consumer=%s", cfg.Stream, cfg.Consumer) + // Stream 不存在,创建新的 + ackPolicy := jetstream.AckExplicitPolicy + if cfg.AutoAck { + ackPolicy = jetstream.AckNonePolicy + } + jsConfig := jetstream.ConsumerConfig{ + Name: cfg.Consumer, + Durable: cfg.Consumer, + AckPolicy: ackPolicy, + MaxDeliver: 3, + MaxAckPending: cfg.PrefetchCount, + } + // 创建新消费者 + consumer, err := js.CreateOrUpdateConsumer(ctx, cfg.Stream, jsConfig) + if err != nil { + return fmt.Errorf("创建消费者失败: %w", err) + } + // 创建消息处理函数 + msgHandler := func(msg jetstream.Msg) { + // 解析消息 + var data map[string]any + if err := json.Unmarshal(msg.Data(), &data); err != nil { + g.Log().Errorf(ctx, "解析消息失败: %v", err) + msg.Nak() + return + } + // 处理业务逻辑 + if err := handler(ctx, data); err != nil { + g.Log().Errorf(ctx, "处理消息失败: %v", err) + msg.Nak() + return + } + g.Log().Infof(ctx, "处理消息成功") + if !cfg.AutoAck { + msg.Ack() + } + } + + // 开始消费 + _, err = consumer.Consume(msgHandler) + if err != nil { + return fmt.Errorf("开始消费失败: %w", err) + } + + g.Log().Infof(ctx, "✅ 开始消费消息: %s/%s", cfg.Stream, cfg.Consumer) + + return nil +} diff --git a/message/rabbitmq_msg.go b/message/rabbitmq_msg.go new file mode 100644 index 0000000..8d49663 --- /dev/null +++ b/message/rabbitmq_msg.go @@ -0,0 +1,286 @@ +package message + +import ( + "context" + "encoding/json" + "fmt" + "time" + + "github.com/gogf/gf/v2/frame/g" + amqp "github.com/rabbitmq/amqp091-go" +) + +// rabbitMQMessageClient RabbitMQ 实现 +type rabbitMQMessageClient struct { + clientType messageClientType +} + +// StreamGroup 创建消费组(支持单个或批量) +func (q *rabbitMQMessageClient) streamGroup(ctx context.Context, configs ...interface{}) error { + if len(configs) == 0 { + return fmt.Errorf("配置不能为空") + } + for _, config := range configs { + cfg, ok := config.(*RabbitMQConfig) + if !ok { + return fmt.Errorf("无效的 RabbitMQ 配置类型") + } + if err := q.setupQueue(ctx, channel, cfg, cfg.DelayMessage); err != nil { + return err + } + } + return nil +} + +// Publish 发布消息(支持单个或批量) +func (q *rabbitMQMessageClient) publish(ctx context.Context, config interface{}, data interface{}) error { + cfg, ok := config.(*RabbitMQConfig) + if !ok { + return fmt.Errorf("无效的 RabbitMQ 配置类型") + } + if err := q.publishMessage(ctx, cfg, "work", data, 0); err != nil { + g.Log().Errorf(ctx, "❌ RabbitMQ 发布消息失败: err=%v", err) + return err + } + return nil +} + +// PublishDelayed 发布延迟消息 +func (q *rabbitMQMessageClient) publishDelayed(ctx context.Context, config interface{}, data interface{}, delaySeconds int) error { + cfg, ok := config.(*RabbitMQConfig) + if !ok { + return fmt.Errorf("无效的 RabbitMQ 配置类型") + } + if err := q.publishMessage(ctx, cfg, "delayed", data, delaySeconds); err != nil { + g.Log().Errorf(ctx, "❌ RabbitMQ 发布延迟消息失败: err=%v", err) + return err + } + return nil +} + +func (q *rabbitMQMessageClient) publishMessage(ctx context.Context, cfg *RabbitMQConfig, mode string, data interface{}, delaySeconds int) error { + body, err := json.Marshal(data) + if err != nil { + return fmt.Errorf("序列化数据失败: %w", err) + } + deliveryMode := amqp.Transient + if cfg.Durable { + deliveryMode = amqp.Persistent + } + publishing := amqp.Publishing{ + ContentType: "application/json", + Body: body, + DeliveryMode: deliveryMode, + Timestamp: time.Now(), + } + if delaySeconds > 0 { + publishing.Headers = amqp.Table{ + "x-delay": delaySeconds * 1000, // 延时时间(毫秒) + } + } + exchange, routingKey := q.parseExchangeAndRoutingKey(ctx, mode, cfg) + err = channel.PublishWithContext( + ctx, + exchange, + routingKey, + false, false, + publishing, + ) + return err +} + +func (q *rabbitMQMessageClient) parseExchangeAndRoutingKey(_ context.Context, mode string, cfg *RabbitMQConfig) (exchange, routingKey string) { + switch mode { + case "work", "": + exchange = "" // 默认交换机 + routingKey = cfg.Name // 队列名 + case "event", "topic": + exchange = cfg.Exchange + routingKey = cfg.Topic + case "broadcast": + exchange = cfg.Exchange + routingKey = "" // fanout忽略路由键 + case "delayed": + exchange = cfg.Exchange + ".delayed" + routingKey = cfg.Topic + default: + exchange = "" + routingKey = cfg.Name + } + return exchange, routingKey +} + +// setupQueue 统一的队列设置方法(声明 Exchange、队列、绑定、延迟 Exchange) +func (q *rabbitMQMessageClient) setupQueue(ctx context.Context, ch *amqp.Channel, cfg *RabbitMQConfig, delayMessage bool) error { + exchange, routingKey := q.parseExchangeAndRoutingKey(ctx, cfg.Mode, cfg) + + // 声明 Exchange + if err := ch.ExchangeDeclare(exchange, "topic", cfg.Durable, false, false, false, nil); err != nil { + return fmt.Errorf("声明 Exchange 失败: %w", err) + } + + // 声明队列 + if _, err := ch.QueueDeclare(cfg.Queue, cfg.Durable, false, false, false, nil); err != nil { + return fmt.Errorf("声明队列失败: %w", err) + } + + // 绑定队列 + if err := ch.QueueBind(cfg.Queue, routingKey, exchange, false, nil); err != nil { + return fmt.Errorf("绑定队列失败: %w", err) + } + + // 声明延迟 Exchange(如果需要) + if delayMessage { + if err := ch.ExchangeDeclare(exchange, "x-delayed-message", true, false, false, false, amqp.Table{"x-delayed-type": "direct"}); err != nil { + return fmt.Errorf("声明延迟 Exchange 失败: %w", err) + } + if err := ch.QueueBind(cfg.Name, routingKey, exchange, false, nil); err != nil { + return fmt.Errorf("绑定延迟队列失败: %w", err) + } + } + + return nil +} + +// Subscribe 订阅消息(支持单个或批量) +func (q *rabbitMQMessageClient) subscribe(ctx context.Context, configs ...interface{}) error { + if len(configs) == 0 { + return fmt.Errorf("配置不能为空") + } + + for _, config := range configs { + cfg, ok := config.(*RabbitMQConfig) + if !ok { + return fmt.Errorf("无效的 RabbitMQ 配置类型") + } + handler := cfg.HandleFunc + if handler == nil { + return fmt.Errorf("必须提供处理函数") + } + if err := q.createSubscribe(ctx, cfg, handler); err != nil { + return err + } + } + return nil +} + +// subscribe 内部单个订阅消息 +func (q *rabbitMQMessageClient) createSubscribe(ctx context.Context, cfg *RabbitMQConfig, handler func(ctx context.Context, message map[string]interface{}) error) error { + g.Log().Infof(ctx, "🔔 RabbitMQ 开始订阅: exchange=%s, queue=%s", cfg.Exchange, cfg.Queue) + + // 设置 Qos (预取数量),控制每次推送的消息数量 + // prefetchCount: 未 ACK 消息的最大数量 + // prefetchSize: 未 ACK 消息的总大小(0 表示不限制) + // global: false 表示仅应用于当前消费者 + prefetchCount := cfg.PrefetchCount + if prefetchCount <= 0 { + prefetchCount = 10 // 默认值为 10 + } + if err := channel.Qos(prefetchCount, 0, false); err != nil { + return fmt.Errorf("设置 Qos 失败: %w", err) + } + g.Log().Infof(ctx, "📊 设置 Prefetch Count: %d", prefetchCount) + + msg, err := channel.Consume( + cfg.Queue, // queue + cfg.Queue, // consumer + cfg.AutoAck, // auto-ack (根据配置决定) + false, // exclusive + false, // no-local + false, // no-wait + nil, // args + ) + if err != nil { + return fmt.Errorf("注册消费者失败: %w", err) + } + + go func() { + defer func() { + if r := recover(); r != nil { + g.Log().Errorf(ctx, "❌ RabbitMQ 消费者 panic: %v", r) + } + }() + + // 并发控制信号量 + semaphore := make(chan struct{}, 10) // 限制最大并发数为 10 + + for { + select { + case <-ctx.Done(): + g.Log().Infof(ctx, "🔕 RabbitMQ 消费者停止: queue=%s", cfg.Queue) + return + case msg, ok := <-msg: + if !ok { + g.Log().Warningf(ctx, "⚠️ RabbitMQ 消息通道关闭") + return + } + + // 获取并发控制槽位 + semaphore <- struct{}{} + + go func(m amqp.Delivery) { + defer func() { + <-semaphore // 释放槽位 + if r := recover(); r != nil { + g.Log().Errorf(ctx, "❌ 消息处理 panic: %v", r) + } + }() + + if err := q.handleMessageWithRetry(ctx, m, handler, cfg.MaxRetry); err != nil { + g.Log().Errorf(ctx, "❌ 消息处理失败(重试次数耗尽): %v", err) + + // 仅在手动 ACK 模式下拒绝消息 + if !cfg.AutoAck { + // 拒绝消息不再重新入队(避免死循环) + m.Nack(false, false) + } + return + } + + // 仅在手动 ACK 模式下确认消息 + if cfg.AutoAck { + if err := m.Ack(false); err != nil { + g.Log().Errorf(ctx, "❌ ACK 消息失败: %v", err) + } + } + }(msg) + } + } + }() + + return nil +} + +// handleMessageWithRetry 处理消息(支持重试) +func (q *rabbitMQMessageClient) handleMessageWithRetry(ctx context.Context, msg amqp.Delivery, handler func(ctx context.Context, message map[string]interface{}) error, maxRetry int) error { + var data map[string]interface{} + + if err := json.Unmarshal(msg.Body, &data); err != nil { + // 如果不是 JSON,直接使用原始内容 + data = map[string]interface{}{ + "data": string(msg.Body), + } + } + + // 重试逻辑 + for attempt := 0; attempt <= maxRetry; attempt++ { + if attempt > 0 { + g.Log().Infof(ctx, "🔄 消息处理重试 (第%d次)", attempt) + // 指数退避 + time.Sleep(time.Duration(attempt) * time.Second) + } + + err := handler(ctx, data) + if err == nil { + return nil // 成功 + } + + g.Log().Warningf(ctx, "⚠️ 消息处理失败 (第%d次): %v", attempt+1, err) + + if attempt == maxRetry { + return fmt.Errorf("达到最大重试次数 %d: %w", maxRetry, err) + } + } + + return nil +} diff --git a/message/redis_msg.go b/message/redis_msg.go new file mode 100644 index 0000000..135a78c --- /dev/null +++ b/message/redis_msg.go @@ -0,0 +1,344 @@ +package message + +import ( + "context" + "encoding/json" + "fmt" + "strings" + "time" + + "github.com/gogf/gf/v2/frame/g" + "github.com/gogf/gf/v2/os/glog" + "github.com/gogf/gf/v2/util/gconv" +) + +// redisMessageClient Redis 实现 +type redisMessageClient struct { + clientType messageClientType +} + +// RedisStreamMessage Redis Stream 消息结构 +type RedisStreamMessage struct { + ID string + Values map[string]interface{} +} + +// StreamGroup 创建消费组(支持单个或批量) +func (q *redisMessageClient) streamGroup(ctx context.Context, configs ...interface{}) error { + if len(configs) == 0 { + return fmt.Errorf("配置不能为空") + } + for _, config := range configs { + cfg, ok := config.(*RedisConfig) + if !ok { + return fmt.Errorf("无效的 Redis 配置类型") + } + if err := q.createStreamGroup(ctx, cfg); err != nil { + return err + } + } + return nil +} + +// streamGroup 内部单个创建消费组 +func (q *redisMessageClient) createStreamGroup(ctx context.Context, cfg *RedisConfig) error { + // 获取默认数据源 + ds, err := GetManager().GetDefaultDataSource() + if err != nil { + return fmt.Errorf("获取默认数据源失败: %w", err) + } + + // 检查连接状态,未连接则自动重连 + if !ds.IsConnected() { + if err := ds.Reconnect(ctx); err != nil { + return fmt.Errorf("redis重连失败: %w", err) + } + } + + _, err = ds.Redis().Do(ctx, "XGROUP", "CREATE", cfg.Stream, cfg.Group, "0", "MKSTREAM") + if err != nil { + errStr := err.Error() + if strings.Contains(errStr, "BUSYGROUP") && strings.Contains(errStr, "already exists") { + glog.Infof(ctx, "✅ Redis 消费者组已存在: %s", cfg.Group) + return nil + } + return fmt.Errorf("初始化消费者组失败: %w", err) + } + glog.Infof(ctx, "✅ Redis 消费者组创建成功: %s", cfg.Group) + return nil +} + +// Publish 内部单个发布消息 +func (q *redisMessageClient) publish(ctx context.Context, config interface{}, data interface{}) error { + ds, err := GetManager().GetDefaultDataSource() + if err != nil { + return fmt.Errorf("获取默认数据源失败: %w", err) + } + + if !ds.IsConnected() { + if err := ds.Reconnect(ctx); err != nil { + return fmt.Errorf("redis重连失败: %w", err) + } + } + + cfg, ok := config.(*RedisConfig) + if !ok { + return fmt.Errorf("无效的redis配置类型") + } + values := gconv.Map(data) + args := make([]interface{}, 0, len(values)*2+2) + args = append(args, cfg.Stream, "*") + for key, val := range values { + args = append(args, key, val) + } + result, err := ds.Redis().Do(ctx, "XADD", args...) + if err != nil { + g.Log().Errorf(ctx, "❌ Redis 发布消息失败: topic=%s, err=%v", cfg.Stream, err) + return err + } + g.Log().Infof(ctx, "✅ Redis 发布消息成功: topic=%s, messageID=%s", cfg.Stream, gconv.String(result)) + return nil +} + +// PublishDelayed 发布延迟消息(使用 ZSET) +func (q *redisMessageClient) publishDelayed(ctx context.Context, config interface{}, data interface{}, delay int) error { + ds, err := GetManager().GetDefaultDataSource() + if err != nil { + return fmt.Errorf("获取默认数据源失败: %w", err) + } + + if !ds.IsConnected() { + if err := ds.Reconnect(ctx); err != nil { + return fmt.Errorf("redis重连失败: %w", err) + } + } + + cfg, ok := config.(*RedisConfig) + if !ok { + return fmt.Errorf("无效的redis配置类型") + } + payload, err := json.Marshal(data) + if err != nil { + return fmt.Errorf("序列化数据失败: %w", err) + } + score := float64(time.Now().Add(time.Duration(delay)).UnixMilli()) + delayedKey := fmt.Sprintf("delayed:%s", cfg.Stream) + + // ZADD delayedKey score payload + _, err = ds.Redis().Do(ctx, "ZADD", delayedKey, score, string(payload)) + if err != nil { + return err + } + + g.Log().Infof(ctx, "✅ Redis 延迟消息已发布: topic=%s, delay=%v", cfg.Stream, delay) + return nil +} + +// Subscribe 订阅消息(支持单个或批量) +func (q *redisMessageClient) subscribe(ctx context.Context, configs ...interface{}) error { + if len(configs) == 0 { + return fmt.Errorf("配置不能为空") + } + for _, config := range configs { + cfg, ok := config.(*RedisConfig) + if !ok { + return fmt.Errorf("无效的 Redis 配置类型") + } + handler := cfg.HandleFunc + if handler == nil { + return fmt.Errorf("必须提供处理函数") + } + if err := q.createSubscribe(ctx, cfg, handler); err != nil { + return err + } + } + return nil +} + +// subscribe 内部单个订阅消息 +func (q *redisMessageClient) createSubscribe(ctx context.Context, cfg *RedisConfig, handler func(ctx context.Context, message map[string]interface{}) error) error { + go func() { + defer func() { + if r := recover(); r != nil { + g.Log().Errorf(ctx, "❌ Redis 消费者 panic: %v", r) + } + }() + + retryTicker := time.NewTicker(time.Second) + defer retryTicker.Stop() + + // 重试计数器 + var consecutiveErrors int + const maxConsecutiveErrors = 3 + + for { + select { + case <-ctx.Done(): + g.Log().Infof(ctx, "🔕 Redis 消费者停止: topic=%s", cfg.Stream) + return + case <-retryTicker.C: + err := q.consumeMessages(ctx, cfg, handler) + if err != nil { + // 对于超时错误,返回nil继续循环,而不是返回错误 + if strings.Contains(err.Error(), "i/o timeout") || strings.Contains(err.Error(), "timeout") || + strings.Contains(err.Error(), "context deadline exceeded") || strings.Contains(err.Error(), "context canceled") { + + consecutiveErrors++ + if consecutiveErrors > maxConsecutiveErrors { + g.Log().Errorf(ctx, "Max retries exceeded, giving up") + return + } + backoffTime := 5 * time.Second + g.Log().Warningf(ctx, "⚠️ 等待 %v 后重试...", backoffTime) + + time.Sleep(backoffTime) + } else { + // 非超时错误(严重错误) + consecutiveErrors = 0 // 重置计数 + g.Log().Errorf(ctx, "严重错误,立即重试: %v", err) + + // 短暂等待后重试 + select { + case <-ctx.Done(): + return + case <-time.After(time.Second): + // 继续循环 + } + } + } else { + // 成功时重置错误计数器 + consecutiveErrors = 0 + } + } + } + }() + return nil +} + +// consumeMessages 消费消息 +func (q *redisMessageClient) consumeMessages(ctx context.Context, cfg *RedisConfig, handler func(ctx context.Context, message map[string]interface{}) error) error { + ds, err := GetManager().GetDefaultDataSource() + if err != nil { + return fmt.Errorf("获取默认数据源失败: %w", err) + } + + if !ds.IsConnected() { + if err := ds.Reconnect(ctx); err != nil { + return fmt.Errorf("redis重连失败: %w", err) + } + } + + // 检查消费者组是否存在 + if err := q.createStreamGroup(ctx, cfg); err != nil { + return fmt.Errorf("create stream group failed: %w", err) + } + + // 使用带重试的命令执行 + result, err := ds.Redis().Do(ctx, "XREADGROUP", "GROUP", cfg.Group, cfg.Consumer, "COUNT", cfg.Count, "BLOCK", 0, "STREAMS", cfg.Stream, ">") + if err != nil { + if strings.Contains(err.Error(), "i/o timeout") || strings.Contains(err.Error(), "timeout") || + strings.Contains(err.Error(), "context deadline exceeded") || strings.Contains(err.Error(), "context canceled") { + + } + return err + } + messages, err := q.parseStreamResult(result) + if err != nil { + return err + } + for _, msg := range messages { + // 处理消息 + if err := handler(ctx, msg.Values); err != nil { + g.Log().Errorf(ctx, "❌ 消息处理失败: messageID=%s, err=%v", msg.ID, err) + continue + } + + // ACK 消息 + if cfg.AutoAck { + if err := q.ackMessage(ctx, cfg.Stream, cfg.Group, msg.ID); err != nil { + g.Log().Errorf(ctx, "❌ ACK 消息失败: messageID=%s, err=%v", msg.ID, err) + } + } + } + + return nil +} + +// ackMessage ACK 消息 +func (q *redisMessageClient) ackMessage(ctx context.Context, streamKey, groupName string, messageIDs ...string) error { + ds, err := GetManager().GetDefaultDataSource() + if err != nil { + return fmt.Errorf("获取默认数据源失败: %w", err) + } + + if !ds.IsConnected() { + if err := ds.Reconnect(ctx); err != nil { + return fmt.Errorf("redis重连失败: %w", err) + } + } + + args := make([]interface{}, 0, len(messageIDs)+2) + args = append(args, streamKey, groupName) + for _, id := range messageIDs { + args = append(args, id) + } + _, err = ds.Redis().Do(ctx, "XACK", args...) + return err +} + +// parseStreamResult 解析 Stream 结果 +func (q *redisMessageClient) parseStreamResult(result interface{}) ([]RedisStreamMessage, error) { + if result == nil { + return []RedisStreamMessage{}, nil + } + + var resultVal interface{} + + // 尝试获取 Val() 方法 + if valuer, ok := result.(interface{ Val() interface{} }); ok { + resultVal = valuer.Val() + } else { + resultVal = result + } + + // 检查是否为空 + if resultVal == nil { + return []RedisStreamMessage{}, nil + } + + // 预分配切片容量,避免多次扩容 + messages := make([]RedisStreamMessage, 0) + + if streamsMap, ok := resultVal.(map[interface{}]interface{}); ok { + for _, streamMsg := range streamsMap { + msgArray, ok := streamMsg.([]interface{}) + if !ok { + continue + } + for _, msgData := range msgArray { + msgArray, ok := msgData.([]interface{}) + if !ok || len(msgArray) < 2 { + continue + } + msgID := gconv.String(msgArray[0]) + fieldsArray, ok := msgArray[1].([]interface{}) + if !ok { + continue + } + values := make(map[string]interface{}, len(fieldsArray)/2) + for i := 0; i < len(fieldsArray); i += 2 { + if i+1 < len(fieldsArray) { + key := gconv.String(fieldsArray[i]) + values[key] = fieldsArray[i+1] + } + } + messages = append(messages, RedisStreamMessage{ + ID: msgID, + Values: values, + }) + } + } + } + + return messages, nil +}