添加消息队列实现:NATS、RabbitMQ 和 Redis 的 Stream 支持
This commit is contained in:
204
message/nats_msg.go
Normal file
204
message/nats_msg.go
Normal file
@@ -0,0 +1,204 @@
|
||||
package message
|
||||
|
||||
import (
|
||||
"context"
|
||||
"encoding/json"
|
||||
"fmt"
|
||||
"time"
|
||||
|
||||
"github.com/gogf/gf/v2/frame/g"
|
||||
"github.com/nats-io/nats.go/jetstream"
|
||||
)
|
||||
|
||||
// natsMessageClient NATS 实现
|
||||
type natsMessageClient struct {
|
||||
clientType messageClientType
|
||||
}
|
||||
|
||||
// StreamGroup 创建消费组(支持单个或批量)
|
||||
func (q *natsMessageClient) streamGroup(ctx context.Context, configs ...interface{}) error {
|
||||
if len(configs) == 0 {
|
||||
return fmt.Errorf("配置不能为空")
|
||||
}
|
||||
for _, config := range configs {
|
||||
cfg, ok := config.(*NATSConfig)
|
||||
if !ok {
|
||||
return fmt.Errorf("无效的 NATS 配置类型")
|
||||
}
|
||||
if err := q.createStreamGroup(ctx, cfg); err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
// createStreamGroup 内部单个创建消费组
|
||||
func (q *natsMessageClient) createStreamGroup(ctx context.Context, cfg *NATSConfig) error {
|
||||
// Stream 不存在,创建新的
|
||||
storage := jetstream.FileStorage
|
||||
if !cfg.Durable {
|
||||
storage = jetstream.MemoryStorage
|
||||
}
|
||||
if g.IsEmpty(cfg.Replicas) {
|
||||
cfg.Replicas = 1
|
||||
}
|
||||
// 构建流配置
|
||||
jsConfig := jetstream.StreamConfig{
|
||||
Name: cfg.Stream,
|
||||
Subjects: []string{fmt.Sprintf("%s.>", cfg.Stream)},
|
||||
Replicas: cfg.Replicas,
|
||||
NoAck: cfg.AutoAck,
|
||||
AllowMsgSchedules: cfg.DelayMessage, // 延迟消息核心开关
|
||||
Storage: storage,
|
||||
Discard: jetstream.DiscardOld, // 达到上限删除旧消息
|
||||
}
|
||||
// 检查流是否已存在
|
||||
stream, err := js.Stream(ctx, cfg.Stream)
|
||||
if err == nil {
|
||||
// 流已存在,更新配置
|
||||
_, err = js.UpdateStream(ctx, jsConfig)
|
||||
if err != nil {
|
||||
return fmt.Errorf("更新任务流失败: %w", err)
|
||||
}
|
||||
g.Log().Infof(ctx, "任务流已更新: %s", stream.CachedInfo().Config.Name)
|
||||
return nil
|
||||
}
|
||||
// 创建新流
|
||||
stream, err = js.CreateStream(ctx, jsConfig)
|
||||
if err != nil {
|
||||
return fmt.Errorf("创建任务流失败: %w", err)
|
||||
}
|
||||
|
||||
g.Log().Infof(ctx, "✅ NATS 队列初始化成功: stream=%s, consumer=%s", cfg.Stream, cfg.Consumer)
|
||||
return nil
|
||||
}
|
||||
|
||||
// Publish 发布消息(支持单个或批量)
|
||||
func (q *natsMessageClient) publish(ctx context.Context, config interface{}, data interface{}) error {
|
||||
cfg, ok := config.(*NATSConfig)
|
||||
if !ok {
|
||||
return fmt.Errorf("无效的 NATS 配置类型")
|
||||
}
|
||||
payload, err := json.Marshal(data)
|
||||
if err != nil {
|
||||
return fmt.Errorf("序列化数据失败: %w", err)
|
||||
}
|
||||
|
||||
// 发布消息到 JetStream
|
||||
subject := fmt.Sprintf("%s.>", cfg.Stream)
|
||||
_, err = js.Publish(ctx, subject, payload)
|
||||
if err != nil {
|
||||
g.Log().Errorf(ctx, "❌ NATS 发布消息失败: topic=%s, err=%v", cfg.Stream, err)
|
||||
return err
|
||||
}
|
||||
|
||||
g.Log().Infof(ctx, "✅ NATS 发布消息成功: topic=%s", cfg.Stream)
|
||||
return nil
|
||||
}
|
||||
|
||||
// PublishDelayed 发布延迟消息(支持单个或批量)
|
||||
func (q *natsMessageClient) publishDelayed(ctx context.Context, config interface{}, data interface{}, delay int) error {
|
||||
|
||||
cfg, ok := config.(*NATSConfig)
|
||||
if !ok {
|
||||
return fmt.Errorf("无效的 NATS 配置类型")
|
||||
}
|
||||
|
||||
payload, err := json.Marshal(data)
|
||||
if err != nil {
|
||||
return fmt.Errorf("序列化数据失败: %w", err)
|
||||
}
|
||||
|
||||
// 使用 goroutine 实现简单的延迟发布
|
||||
go func() {
|
||||
time.Sleep(time.Duration(delay))
|
||||
subject := fmt.Sprintf("%s.>", cfg.Stream)
|
||||
if err := q.publishInternal(ctx, subject, payload); err != nil {
|
||||
g.Log().Errorf(ctx, "❌ NATS 延迟消息发布失败: topic=%s, delay=%v, err=%v", cfg.Stream, delay, err)
|
||||
}
|
||||
}()
|
||||
|
||||
g.Log().Infof(ctx, "✅ NATS 延迟消息已提交: topic=%s, delay=%v", cfg.Stream, delay)
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
// publishInternal 内部发布消息
|
||||
func (q *natsMessageClient) publishInternal(ctx context.Context, subject string, payload []byte) error {
|
||||
_, err := js.Publish(ctx, subject, payload)
|
||||
return err
|
||||
}
|
||||
|
||||
// Subscribe 订阅消息(支持单个或批量)
|
||||
func (q *natsMessageClient) subscribe(ctx context.Context, configs ...interface{}) error {
|
||||
if len(configs) == 0 {
|
||||
return fmt.Errorf("配置不能为空")
|
||||
}
|
||||
|
||||
for _, config := range configs {
|
||||
cfg, ok := config.(*NATSConfig)
|
||||
if !ok {
|
||||
return fmt.Errorf("无效的 NATS 配置类型")
|
||||
}
|
||||
handler := cfg.HandleFunc
|
||||
if handler == nil {
|
||||
return fmt.Errorf("必须提供处理函数")
|
||||
}
|
||||
if err := q.createSubscribe(ctx, cfg, handler); err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
// subscribe 内部单个订阅消息
|
||||
func (q *natsMessageClient) createSubscribe(ctx context.Context, cfg *NATSConfig, handler func(ctx context.Context, message map[string]interface{}) error) error {
|
||||
g.Log().Infof(ctx, "🔔 NATS 开始订阅: stream=%s, consumer=%s", cfg.Stream, cfg.Consumer)
|
||||
// Stream 不存在,创建新的
|
||||
ackPolicy := jetstream.AckExplicitPolicy
|
||||
if cfg.AutoAck {
|
||||
ackPolicy = jetstream.AckNonePolicy
|
||||
}
|
||||
jsConfig := jetstream.ConsumerConfig{
|
||||
Name: cfg.Consumer,
|
||||
Durable: cfg.Consumer,
|
||||
AckPolicy: ackPolicy,
|
||||
MaxDeliver: 3,
|
||||
MaxAckPending: cfg.PrefetchCount,
|
||||
}
|
||||
// 创建新消费者
|
||||
consumer, err := js.CreateOrUpdateConsumer(ctx, cfg.Stream, jsConfig)
|
||||
if err != nil {
|
||||
return fmt.Errorf("创建消费者失败: %w", err)
|
||||
}
|
||||
// 创建消息处理函数
|
||||
msgHandler := func(msg jetstream.Msg) {
|
||||
// 解析消息
|
||||
var data map[string]any
|
||||
if err := json.Unmarshal(msg.Data(), &data); err != nil {
|
||||
g.Log().Errorf(ctx, "解析消息失败: %v", err)
|
||||
msg.Nak()
|
||||
return
|
||||
}
|
||||
// 处理业务逻辑
|
||||
if err := handler(ctx, data); err != nil {
|
||||
g.Log().Errorf(ctx, "处理消息失败: %v", err)
|
||||
msg.Nak()
|
||||
return
|
||||
}
|
||||
g.Log().Infof(ctx, "处理消息成功")
|
||||
if !cfg.AutoAck {
|
||||
msg.Ack()
|
||||
}
|
||||
}
|
||||
|
||||
// 开始消费
|
||||
_, err = consumer.Consume(msgHandler)
|
||||
if err != nil {
|
||||
return fmt.Errorf("开始消费失败: %w", err)
|
||||
}
|
||||
|
||||
g.Log().Infof(ctx, "✅ 开始消费消息: %s/%s", cfg.Stream, cfg.Consumer)
|
||||
|
||||
return nil
|
||||
}
|
||||
286
message/rabbitmq_msg.go
Normal file
286
message/rabbitmq_msg.go
Normal file
@@ -0,0 +1,286 @@
|
||||
package message
|
||||
|
||||
import (
|
||||
"context"
|
||||
"encoding/json"
|
||||
"fmt"
|
||||
"time"
|
||||
|
||||
"github.com/gogf/gf/v2/frame/g"
|
||||
amqp "github.com/rabbitmq/amqp091-go"
|
||||
)
|
||||
|
||||
// rabbitMQMessageClient RabbitMQ 实现
|
||||
type rabbitMQMessageClient struct {
|
||||
clientType messageClientType
|
||||
}
|
||||
|
||||
// StreamGroup 创建消费组(支持单个或批量)
|
||||
func (q *rabbitMQMessageClient) streamGroup(ctx context.Context, configs ...interface{}) error {
|
||||
if len(configs) == 0 {
|
||||
return fmt.Errorf("配置不能为空")
|
||||
}
|
||||
for _, config := range configs {
|
||||
cfg, ok := config.(*RabbitMQConfig)
|
||||
if !ok {
|
||||
return fmt.Errorf("无效的 RabbitMQ 配置类型")
|
||||
}
|
||||
if err := q.setupQueue(ctx, channel, cfg, cfg.DelayMessage); err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
// Publish 发布消息(支持单个或批量)
|
||||
func (q *rabbitMQMessageClient) publish(ctx context.Context, config interface{}, data interface{}) error {
|
||||
cfg, ok := config.(*RabbitMQConfig)
|
||||
if !ok {
|
||||
return fmt.Errorf("无效的 RabbitMQ 配置类型")
|
||||
}
|
||||
if err := q.publishMessage(ctx, cfg, "work", data, 0); err != nil {
|
||||
g.Log().Errorf(ctx, "❌ RabbitMQ 发布消息失败: err=%v", err)
|
||||
return err
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
// PublishDelayed 发布延迟消息
|
||||
func (q *rabbitMQMessageClient) publishDelayed(ctx context.Context, config interface{}, data interface{}, delaySeconds int) error {
|
||||
cfg, ok := config.(*RabbitMQConfig)
|
||||
if !ok {
|
||||
return fmt.Errorf("无效的 RabbitMQ 配置类型")
|
||||
}
|
||||
if err := q.publishMessage(ctx, cfg, "delayed", data, delaySeconds); err != nil {
|
||||
g.Log().Errorf(ctx, "❌ RabbitMQ 发布延迟消息失败: err=%v", err)
|
||||
return err
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
func (q *rabbitMQMessageClient) publishMessage(ctx context.Context, cfg *RabbitMQConfig, mode string, data interface{}, delaySeconds int) error {
|
||||
body, err := json.Marshal(data)
|
||||
if err != nil {
|
||||
return fmt.Errorf("序列化数据失败: %w", err)
|
||||
}
|
||||
deliveryMode := amqp.Transient
|
||||
if cfg.Durable {
|
||||
deliveryMode = amqp.Persistent
|
||||
}
|
||||
publishing := amqp.Publishing{
|
||||
ContentType: "application/json",
|
||||
Body: body,
|
||||
DeliveryMode: deliveryMode,
|
||||
Timestamp: time.Now(),
|
||||
}
|
||||
if delaySeconds > 0 {
|
||||
publishing.Headers = amqp.Table{
|
||||
"x-delay": delaySeconds * 1000, // 延时时间(毫秒)
|
||||
}
|
||||
}
|
||||
exchange, routingKey := q.parseExchangeAndRoutingKey(ctx, mode, cfg)
|
||||
err = channel.PublishWithContext(
|
||||
ctx,
|
||||
exchange,
|
||||
routingKey,
|
||||
false, false,
|
||||
publishing,
|
||||
)
|
||||
return err
|
||||
}
|
||||
|
||||
func (q *rabbitMQMessageClient) parseExchangeAndRoutingKey(_ context.Context, mode string, cfg *RabbitMQConfig) (exchange, routingKey string) {
|
||||
switch mode {
|
||||
case "work", "":
|
||||
exchange = "" // 默认交换机
|
||||
routingKey = cfg.Name // 队列名
|
||||
case "event", "topic":
|
||||
exchange = cfg.Exchange
|
||||
routingKey = cfg.Topic
|
||||
case "broadcast":
|
||||
exchange = cfg.Exchange
|
||||
routingKey = "" // fanout忽略路由键
|
||||
case "delayed":
|
||||
exchange = cfg.Exchange + ".delayed"
|
||||
routingKey = cfg.Topic
|
||||
default:
|
||||
exchange = ""
|
||||
routingKey = cfg.Name
|
||||
}
|
||||
return exchange, routingKey
|
||||
}
|
||||
|
||||
// setupQueue 统一的队列设置方法(声明 Exchange、队列、绑定、延迟 Exchange)
|
||||
func (q *rabbitMQMessageClient) setupQueue(ctx context.Context, ch *amqp.Channel, cfg *RabbitMQConfig, delayMessage bool) error {
|
||||
exchange, routingKey := q.parseExchangeAndRoutingKey(ctx, cfg.Mode, cfg)
|
||||
|
||||
// 声明 Exchange
|
||||
if err := ch.ExchangeDeclare(exchange, "topic", cfg.Durable, false, false, false, nil); err != nil {
|
||||
return fmt.Errorf("声明 Exchange 失败: %w", err)
|
||||
}
|
||||
|
||||
// 声明队列
|
||||
if _, err := ch.QueueDeclare(cfg.Queue, cfg.Durable, false, false, false, nil); err != nil {
|
||||
return fmt.Errorf("声明队列失败: %w", err)
|
||||
}
|
||||
|
||||
// 绑定队列
|
||||
if err := ch.QueueBind(cfg.Queue, routingKey, exchange, false, nil); err != nil {
|
||||
return fmt.Errorf("绑定队列失败: %w", err)
|
||||
}
|
||||
|
||||
// 声明延迟 Exchange(如果需要)
|
||||
if delayMessage {
|
||||
if err := ch.ExchangeDeclare(exchange, "x-delayed-message", true, false, false, false, amqp.Table{"x-delayed-type": "direct"}); err != nil {
|
||||
return fmt.Errorf("声明延迟 Exchange 失败: %w", err)
|
||||
}
|
||||
if err := ch.QueueBind(cfg.Name, routingKey, exchange, false, nil); err != nil {
|
||||
return fmt.Errorf("绑定延迟队列失败: %w", err)
|
||||
}
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
// Subscribe 订阅消息(支持单个或批量)
|
||||
func (q *rabbitMQMessageClient) subscribe(ctx context.Context, configs ...interface{}) error {
|
||||
if len(configs) == 0 {
|
||||
return fmt.Errorf("配置不能为空")
|
||||
}
|
||||
|
||||
for _, config := range configs {
|
||||
cfg, ok := config.(*RabbitMQConfig)
|
||||
if !ok {
|
||||
return fmt.Errorf("无效的 RabbitMQ 配置类型")
|
||||
}
|
||||
handler := cfg.HandleFunc
|
||||
if handler == nil {
|
||||
return fmt.Errorf("必须提供处理函数")
|
||||
}
|
||||
if err := q.createSubscribe(ctx, cfg, handler); err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
// subscribe 内部单个订阅消息
|
||||
func (q *rabbitMQMessageClient) createSubscribe(ctx context.Context, cfg *RabbitMQConfig, handler func(ctx context.Context, message map[string]interface{}) error) error {
|
||||
g.Log().Infof(ctx, "🔔 RabbitMQ 开始订阅: exchange=%s, queue=%s", cfg.Exchange, cfg.Queue)
|
||||
|
||||
// 设置 Qos (预取数量),控制每次推送的消息数量
|
||||
// prefetchCount: 未 ACK 消息的最大数量
|
||||
// prefetchSize: 未 ACK 消息的总大小(0 表示不限制)
|
||||
// global: false 表示仅应用于当前消费者
|
||||
prefetchCount := cfg.PrefetchCount
|
||||
if prefetchCount <= 0 {
|
||||
prefetchCount = 10 // 默认值为 10
|
||||
}
|
||||
if err := channel.Qos(prefetchCount, 0, false); err != nil {
|
||||
return fmt.Errorf("设置 Qos 失败: %w", err)
|
||||
}
|
||||
g.Log().Infof(ctx, "📊 设置 Prefetch Count: %d", prefetchCount)
|
||||
|
||||
msg, err := channel.Consume(
|
||||
cfg.Queue, // queue
|
||||
cfg.Queue, // consumer
|
||||
cfg.AutoAck, // auto-ack (根据配置决定)
|
||||
false, // exclusive
|
||||
false, // no-local
|
||||
false, // no-wait
|
||||
nil, // args
|
||||
)
|
||||
if err != nil {
|
||||
return fmt.Errorf("注册消费者失败: %w", err)
|
||||
}
|
||||
|
||||
go func() {
|
||||
defer func() {
|
||||
if r := recover(); r != nil {
|
||||
g.Log().Errorf(ctx, "❌ RabbitMQ 消费者 panic: %v", r)
|
||||
}
|
||||
}()
|
||||
|
||||
// 并发控制信号量
|
||||
semaphore := make(chan struct{}, 10) // 限制最大并发数为 10
|
||||
|
||||
for {
|
||||
select {
|
||||
case <-ctx.Done():
|
||||
g.Log().Infof(ctx, "🔕 RabbitMQ 消费者停止: queue=%s", cfg.Queue)
|
||||
return
|
||||
case msg, ok := <-msg:
|
||||
if !ok {
|
||||
g.Log().Warningf(ctx, "⚠️ RabbitMQ 消息通道关闭")
|
||||
return
|
||||
}
|
||||
|
||||
// 获取并发控制槽位
|
||||
semaphore <- struct{}{}
|
||||
|
||||
go func(m amqp.Delivery) {
|
||||
defer func() {
|
||||
<-semaphore // 释放槽位
|
||||
if r := recover(); r != nil {
|
||||
g.Log().Errorf(ctx, "❌ 消息处理 panic: %v", r)
|
||||
}
|
||||
}()
|
||||
|
||||
if err := q.handleMessageWithRetry(ctx, m, handler, cfg.MaxRetry); err != nil {
|
||||
g.Log().Errorf(ctx, "❌ 消息处理失败(重试次数耗尽): %v", err)
|
||||
|
||||
// 仅在手动 ACK 模式下拒绝消息
|
||||
if !cfg.AutoAck {
|
||||
// 拒绝消息不再重新入队(避免死循环)
|
||||
m.Nack(false, false)
|
||||
}
|
||||
return
|
||||
}
|
||||
|
||||
// 仅在手动 ACK 模式下确认消息
|
||||
if cfg.AutoAck {
|
||||
if err := m.Ack(false); err != nil {
|
||||
g.Log().Errorf(ctx, "❌ ACK 消息失败: %v", err)
|
||||
}
|
||||
}
|
||||
}(msg)
|
||||
}
|
||||
}
|
||||
}()
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
// handleMessageWithRetry 处理消息(支持重试)
|
||||
func (q *rabbitMQMessageClient) handleMessageWithRetry(ctx context.Context, msg amqp.Delivery, handler func(ctx context.Context, message map[string]interface{}) error, maxRetry int) error {
|
||||
var data map[string]interface{}
|
||||
|
||||
if err := json.Unmarshal(msg.Body, &data); err != nil {
|
||||
// 如果不是 JSON,直接使用原始内容
|
||||
data = map[string]interface{}{
|
||||
"data": string(msg.Body),
|
||||
}
|
||||
}
|
||||
|
||||
// 重试逻辑
|
||||
for attempt := 0; attempt <= maxRetry; attempt++ {
|
||||
if attempt > 0 {
|
||||
g.Log().Infof(ctx, "🔄 消息处理重试 (第%d次)", attempt)
|
||||
// 指数退避
|
||||
time.Sleep(time.Duration(attempt) * time.Second)
|
||||
}
|
||||
|
||||
err := handler(ctx, data)
|
||||
if err == nil {
|
||||
return nil // 成功
|
||||
}
|
||||
|
||||
g.Log().Warningf(ctx, "⚠️ 消息处理失败 (第%d次): %v", attempt+1, err)
|
||||
|
||||
if attempt == maxRetry {
|
||||
return fmt.Errorf("达到最大重试次数 %d: %w", maxRetry, err)
|
||||
}
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
344
message/redis_msg.go
Normal file
344
message/redis_msg.go
Normal file
@@ -0,0 +1,344 @@
|
||||
package message
|
||||
|
||||
import (
|
||||
"context"
|
||||
"encoding/json"
|
||||
"fmt"
|
||||
"strings"
|
||||
"time"
|
||||
|
||||
"github.com/gogf/gf/v2/frame/g"
|
||||
"github.com/gogf/gf/v2/os/glog"
|
||||
"github.com/gogf/gf/v2/util/gconv"
|
||||
)
|
||||
|
||||
// redisMessageClient Redis 实现
|
||||
type redisMessageClient struct {
|
||||
clientType messageClientType
|
||||
}
|
||||
|
||||
// RedisStreamMessage Redis Stream 消息结构
|
||||
type RedisStreamMessage struct {
|
||||
ID string
|
||||
Values map[string]interface{}
|
||||
}
|
||||
|
||||
// StreamGroup 创建消费组(支持单个或批量)
|
||||
func (q *redisMessageClient) streamGroup(ctx context.Context, configs ...interface{}) error {
|
||||
if len(configs) == 0 {
|
||||
return fmt.Errorf("配置不能为空")
|
||||
}
|
||||
for _, config := range configs {
|
||||
cfg, ok := config.(*RedisConfig)
|
||||
if !ok {
|
||||
return fmt.Errorf("无效的 Redis 配置类型")
|
||||
}
|
||||
if err := q.createStreamGroup(ctx, cfg); err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
// streamGroup 内部单个创建消费组
|
||||
func (q *redisMessageClient) createStreamGroup(ctx context.Context, cfg *RedisConfig) error {
|
||||
// 获取默认数据源
|
||||
ds, err := GetManager().GetDefaultDataSource()
|
||||
if err != nil {
|
||||
return fmt.Errorf("获取默认数据源失败: %w", err)
|
||||
}
|
||||
|
||||
// 检查连接状态,未连接则自动重连
|
||||
if !ds.IsConnected() {
|
||||
if err := ds.Reconnect(ctx); err != nil {
|
||||
return fmt.Errorf("redis重连失败: %w", err)
|
||||
}
|
||||
}
|
||||
|
||||
_, err = ds.Redis().Do(ctx, "XGROUP", "CREATE", cfg.Stream, cfg.Group, "0", "MKSTREAM")
|
||||
if err != nil {
|
||||
errStr := err.Error()
|
||||
if strings.Contains(errStr, "BUSYGROUP") && strings.Contains(errStr, "already exists") {
|
||||
glog.Infof(ctx, "✅ Redis 消费者组已存在: %s", cfg.Group)
|
||||
return nil
|
||||
}
|
||||
return fmt.Errorf("初始化消费者组失败: %w", err)
|
||||
}
|
||||
glog.Infof(ctx, "✅ Redis 消费者组创建成功: %s", cfg.Group)
|
||||
return nil
|
||||
}
|
||||
|
||||
// Publish 内部单个发布消息
|
||||
func (q *redisMessageClient) publish(ctx context.Context, config interface{}, data interface{}) error {
|
||||
ds, err := GetManager().GetDefaultDataSource()
|
||||
if err != nil {
|
||||
return fmt.Errorf("获取默认数据源失败: %w", err)
|
||||
}
|
||||
|
||||
if !ds.IsConnected() {
|
||||
if err := ds.Reconnect(ctx); err != nil {
|
||||
return fmt.Errorf("redis重连失败: %w", err)
|
||||
}
|
||||
}
|
||||
|
||||
cfg, ok := config.(*RedisConfig)
|
||||
if !ok {
|
||||
return fmt.Errorf("无效的redis配置类型")
|
||||
}
|
||||
values := gconv.Map(data)
|
||||
args := make([]interface{}, 0, len(values)*2+2)
|
||||
args = append(args, cfg.Stream, "*")
|
||||
for key, val := range values {
|
||||
args = append(args, key, val)
|
||||
}
|
||||
result, err := ds.Redis().Do(ctx, "XADD", args...)
|
||||
if err != nil {
|
||||
g.Log().Errorf(ctx, "❌ Redis 发布消息失败: topic=%s, err=%v", cfg.Stream, err)
|
||||
return err
|
||||
}
|
||||
g.Log().Infof(ctx, "✅ Redis 发布消息成功: topic=%s, messageID=%s", cfg.Stream, gconv.String(result))
|
||||
return nil
|
||||
}
|
||||
|
||||
// PublishDelayed 发布延迟消息(使用 ZSET)
|
||||
func (q *redisMessageClient) publishDelayed(ctx context.Context, config interface{}, data interface{}, delay int) error {
|
||||
ds, err := GetManager().GetDefaultDataSource()
|
||||
if err != nil {
|
||||
return fmt.Errorf("获取默认数据源失败: %w", err)
|
||||
}
|
||||
|
||||
if !ds.IsConnected() {
|
||||
if err := ds.Reconnect(ctx); err != nil {
|
||||
return fmt.Errorf("redis重连失败: %w", err)
|
||||
}
|
||||
}
|
||||
|
||||
cfg, ok := config.(*RedisConfig)
|
||||
if !ok {
|
||||
return fmt.Errorf("无效的redis配置类型")
|
||||
}
|
||||
payload, err := json.Marshal(data)
|
||||
if err != nil {
|
||||
return fmt.Errorf("序列化数据失败: %w", err)
|
||||
}
|
||||
score := float64(time.Now().Add(time.Duration(delay)).UnixMilli())
|
||||
delayedKey := fmt.Sprintf("delayed:%s", cfg.Stream)
|
||||
|
||||
// ZADD delayedKey score payload
|
||||
_, err = ds.Redis().Do(ctx, "ZADD", delayedKey, score, string(payload))
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
g.Log().Infof(ctx, "✅ Redis 延迟消息已发布: topic=%s, delay=%v", cfg.Stream, delay)
|
||||
return nil
|
||||
}
|
||||
|
||||
// Subscribe 订阅消息(支持单个或批量)
|
||||
func (q *redisMessageClient) subscribe(ctx context.Context, configs ...interface{}) error {
|
||||
if len(configs) == 0 {
|
||||
return fmt.Errorf("配置不能为空")
|
||||
}
|
||||
for _, config := range configs {
|
||||
cfg, ok := config.(*RedisConfig)
|
||||
if !ok {
|
||||
return fmt.Errorf("无效的 Redis 配置类型")
|
||||
}
|
||||
handler := cfg.HandleFunc
|
||||
if handler == nil {
|
||||
return fmt.Errorf("必须提供处理函数")
|
||||
}
|
||||
if err := q.createSubscribe(ctx, cfg, handler); err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
// subscribe 内部单个订阅消息
|
||||
func (q *redisMessageClient) createSubscribe(ctx context.Context, cfg *RedisConfig, handler func(ctx context.Context, message map[string]interface{}) error) error {
|
||||
go func() {
|
||||
defer func() {
|
||||
if r := recover(); r != nil {
|
||||
g.Log().Errorf(ctx, "❌ Redis 消费者 panic: %v", r)
|
||||
}
|
||||
}()
|
||||
|
||||
retryTicker := time.NewTicker(time.Second)
|
||||
defer retryTicker.Stop()
|
||||
|
||||
// 重试计数器
|
||||
var consecutiveErrors int
|
||||
const maxConsecutiveErrors = 3
|
||||
|
||||
for {
|
||||
select {
|
||||
case <-ctx.Done():
|
||||
g.Log().Infof(ctx, "🔕 Redis 消费者停止: topic=%s", cfg.Stream)
|
||||
return
|
||||
case <-retryTicker.C:
|
||||
err := q.consumeMessages(ctx, cfg, handler)
|
||||
if err != nil {
|
||||
// 对于超时错误,返回nil继续循环,而不是返回错误
|
||||
if strings.Contains(err.Error(), "i/o timeout") || strings.Contains(err.Error(), "timeout") ||
|
||||
strings.Contains(err.Error(), "context deadline exceeded") || strings.Contains(err.Error(), "context canceled") {
|
||||
|
||||
consecutiveErrors++
|
||||
if consecutiveErrors > maxConsecutiveErrors {
|
||||
g.Log().Errorf(ctx, "Max retries exceeded, giving up")
|
||||
return
|
||||
}
|
||||
backoffTime := 5 * time.Second
|
||||
g.Log().Warningf(ctx, "⚠️ 等待 %v 后重试...", backoffTime)
|
||||
|
||||
time.Sleep(backoffTime)
|
||||
} else {
|
||||
// 非超时错误(严重错误)
|
||||
consecutiveErrors = 0 // 重置计数
|
||||
g.Log().Errorf(ctx, "严重错误,立即重试: %v", err)
|
||||
|
||||
// 短暂等待后重试
|
||||
select {
|
||||
case <-ctx.Done():
|
||||
return
|
||||
case <-time.After(time.Second):
|
||||
// 继续循环
|
||||
}
|
||||
}
|
||||
} else {
|
||||
// 成功时重置错误计数器
|
||||
consecutiveErrors = 0
|
||||
}
|
||||
}
|
||||
}
|
||||
}()
|
||||
return nil
|
||||
}
|
||||
|
||||
// consumeMessages 消费消息
|
||||
func (q *redisMessageClient) consumeMessages(ctx context.Context, cfg *RedisConfig, handler func(ctx context.Context, message map[string]interface{}) error) error {
|
||||
ds, err := GetManager().GetDefaultDataSource()
|
||||
if err != nil {
|
||||
return fmt.Errorf("获取默认数据源失败: %w", err)
|
||||
}
|
||||
|
||||
if !ds.IsConnected() {
|
||||
if err := ds.Reconnect(ctx); err != nil {
|
||||
return fmt.Errorf("redis重连失败: %w", err)
|
||||
}
|
||||
}
|
||||
|
||||
// 检查消费者组是否存在
|
||||
if err := q.createStreamGroup(ctx, cfg); err != nil {
|
||||
return fmt.Errorf("create stream group failed: %w", err)
|
||||
}
|
||||
|
||||
// 使用带重试的命令执行
|
||||
result, err := ds.Redis().Do(ctx, "XREADGROUP", "GROUP", cfg.Group, cfg.Consumer, "COUNT", cfg.Count, "BLOCK", 0, "STREAMS", cfg.Stream, ">")
|
||||
if err != nil {
|
||||
if strings.Contains(err.Error(), "i/o timeout") || strings.Contains(err.Error(), "timeout") ||
|
||||
strings.Contains(err.Error(), "context deadline exceeded") || strings.Contains(err.Error(), "context canceled") {
|
||||
|
||||
}
|
||||
return err
|
||||
}
|
||||
messages, err := q.parseStreamResult(result)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
for _, msg := range messages {
|
||||
// 处理消息
|
||||
if err := handler(ctx, msg.Values); err != nil {
|
||||
g.Log().Errorf(ctx, "❌ 消息处理失败: messageID=%s, err=%v", msg.ID, err)
|
||||
continue
|
||||
}
|
||||
|
||||
// ACK 消息
|
||||
if cfg.AutoAck {
|
||||
if err := q.ackMessage(ctx, cfg.Stream, cfg.Group, msg.ID); err != nil {
|
||||
g.Log().Errorf(ctx, "❌ ACK 消息失败: messageID=%s, err=%v", msg.ID, err)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
// ackMessage ACK 消息
|
||||
func (q *redisMessageClient) ackMessage(ctx context.Context, streamKey, groupName string, messageIDs ...string) error {
|
||||
ds, err := GetManager().GetDefaultDataSource()
|
||||
if err != nil {
|
||||
return fmt.Errorf("获取默认数据源失败: %w", err)
|
||||
}
|
||||
|
||||
if !ds.IsConnected() {
|
||||
if err := ds.Reconnect(ctx); err != nil {
|
||||
return fmt.Errorf("redis重连失败: %w", err)
|
||||
}
|
||||
}
|
||||
|
||||
args := make([]interface{}, 0, len(messageIDs)+2)
|
||||
args = append(args, streamKey, groupName)
|
||||
for _, id := range messageIDs {
|
||||
args = append(args, id)
|
||||
}
|
||||
_, err = ds.Redis().Do(ctx, "XACK", args...)
|
||||
return err
|
||||
}
|
||||
|
||||
// parseStreamResult 解析 Stream 结果
|
||||
func (q *redisMessageClient) parseStreamResult(result interface{}) ([]RedisStreamMessage, error) {
|
||||
if result == nil {
|
||||
return []RedisStreamMessage{}, nil
|
||||
}
|
||||
|
||||
var resultVal interface{}
|
||||
|
||||
// 尝试获取 Val() 方法
|
||||
if valuer, ok := result.(interface{ Val() interface{} }); ok {
|
||||
resultVal = valuer.Val()
|
||||
} else {
|
||||
resultVal = result
|
||||
}
|
||||
|
||||
// 检查是否为空
|
||||
if resultVal == nil {
|
||||
return []RedisStreamMessage{}, nil
|
||||
}
|
||||
|
||||
// 预分配切片容量,避免多次扩容
|
||||
messages := make([]RedisStreamMessage, 0)
|
||||
|
||||
if streamsMap, ok := resultVal.(map[interface{}]interface{}); ok {
|
||||
for _, streamMsg := range streamsMap {
|
||||
msgArray, ok := streamMsg.([]interface{})
|
||||
if !ok {
|
||||
continue
|
||||
}
|
||||
for _, msgData := range msgArray {
|
||||
msgArray, ok := msgData.([]interface{})
|
||||
if !ok || len(msgArray) < 2 {
|
||||
continue
|
||||
}
|
||||
msgID := gconv.String(msgArray[0])
|
||||
fieldsArray, ok := msgArray[1].([]interface{})
|
||||
if !ok {
|
||||
continue
|
||||
}
|
||||
values := make(map[string]interface{}, len(fieldsArray)/2)
|
||||
for i := 0; i < len(fieldsArray); i += 2 {
|
||||
if i+1 < len(fieldsArray) {
|
||||
key := gconv.String(fieldsArray[i])
|
||||
values[key] = fieldsArray[i+1]
|
||||
}
|
||||
}
|
||||
messages = append(messages, RedisStreamMessage{
|
||||
ID: msgID,
|
||||
Values: values,
|
||||
})
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
return messages, nil
|
||||
}
|
||||
Reference in New Issue
Block a user