package message import ( "context" "encoding/json" "fmt" "time" "github.com/gogf/gf/v2/frame/g" amqp "github.com/rabbitmq/amqp091-go" ) type RabbitMQPublishMsgConfig struct { QueueName string Durable bool Data any } type RabbitMQPublishDelayMsgConfig struct { QueueName string Durable bool DelayTime int Data any } type RabbitMQSubscribeMsgConfig struct { QueueName string ConsumerName string AutoAck bool PrefetchCount int HandleFunc func(ctx context.Context, message map[string]interface{}) error } func (*RabbitMQPublishMsgConfig) GetPublishMsgType() { } func (*RabbitMQPublishDelayMsgConfig) GetPublishDelayMsgType() {} func (*RabbitMQSubscribeMsgConfig) GetSubscribeMsgType() { } type rabbitMQ struct { name string // 数据源名称 } func init() { // 注册 RabbitMQ 插件(默认数据源) RegisterPlugin(context.Background(), "default", MessageRabbitMQ, func() messageUtil { return &rabbitMQ{name: "default"} }) } // Connect 连接 RabbitMQ func (c *rabbitMQ) Connect(ctx context.Context) error { return rabbitmqConnect(ctx, c.name) } // Ping 检测 RabbitMQ 连接状态 func (c *rabbitMQ) Ping(ctx context.Context) bool { return rabbitmqPing(ctx, c.name) } // Close 关闭 RabbitMQ 连接 func (c *rabbitMQ) Close(ctx context.Context) error { return rabbitmqClose(ctx, c.name) } // Publish 发布消息 func (c *rabbitMQ) Publish(ctx context.Context, msgConfig messagePublishConfig) error { cfg, ok := msgConfig.(*RabbitMQPublishMsgConfig) if !ok { return fmt.Errorf("无效的 RabbitMQ 配置类型") } if g.IsEmpty(cfg.QueueName) { return fmt.Errorf("队列名称不能为空") } if cfg.Data == nil { return fmt.Errorf("数据不能为空") } return c.publishMessageInternal(ctx, cfg.QueueName, cfg.Durable, 0, cfg.Data) } // PublishDelay 发布延迟消息 func (c *rabbitMQ) PublishDelay(ctx context.Context, msgConfig messagePublishDelayConfig) error { cfg, ok := msgConfig.(*RabbitMQPublishDelayMsgConfig) if !ok { return fmt.Errorf("无效的 RabbitMQ 配置类型") } if g.IsEmpty(cfg.QueueName) { return fmt.Errorf("队列名称不能为空") } if cfg.Data == nil { return fmt.Errorf("数据不能为空") } return c.publishMessageInternal(ctx, cfg.QueueName, cfg.Durable, cfg.DelayTime, cfg.Data) } // publishMessage 发布消息内部实现 func (c *rabbitMQ) publishMessageInternal(ctx context.Context, queueName string, durable bool, delayTime int, data interface{}) error { if !c.Ping(ctx) { if err := commonConnect(ctx, MessageRabbitMQ, c.name, func(ctx context.Context) error { return c.Connect(ctx) }, func(ctx context.Context) error { return c.Close(ctx) }); err != nil { g.Log().Errorf(ctx, "❌ [%s][%s] 连接失败: %v", MessageRabbitMQ, c.name, err) return err } } channel := getRabbitMQChannel(c.name) if channel == nil || channel.IsClosed() { g.Log().Errorf(ctx, "❌ RabbitMQ [%s] Channel 不存在或已关闭", c.name) return fmt.Errorf("RabbitMQ Channel 不存在或已关闭") } delayMsg := delayTime > 0 // 1. 决定 Exchange 类型 exchangeType := "fanout" exchangeName := queueName routingKey := queueName args := amqp.Table{} if delayMsg { exchangeType = "x-delayed-message" exchangeName = queueName + ".delayed" args["x-delayed-type"] = "fanout" } // 2. 声明 Exchange(使用 exchangeName 而不是 queueName) if err := channel.ExchangeDeclare( exchangeName, // 修复:使用正确的交换机名称 exchangeType, durable, false, // autoDelete false, // internal false, // noWait args, ); err != nil { g.Log().Errorf(ctx, "❌ 声明 Exchange 失败: %v", err) return err } // 3. 声明队列 if _, err := channel.QueueDeclare( queueName, durable, false, // autoDelete false, // exclusive false, // noWait nil, // args ); err != nil { g.Log().Errorf(ctx, "❌ 声明队列失败: %v", err) return err } // 4. 绑定队列 if err := channel.QueueBind( queueName, routingKey, // routingKey 路由键 exchangeName, // exchange 交换机名称 false, // noWait nil, // args ); err != nil { g.Log().Errorf(ctx, "❌ 绑定队列失败: %v", err) return err } // 5. 序列化数据 body, err := json.Marshal(data) if err != nil { g.Log().Errorf(ctx, "❌ 序列化数据失败: %v", err) return err } // 6. 发布消息 deliveryMode := amqp.Transient if durable { deliveryMode = amqp.Persistent } publishing := amqp.Publishing{ ContentType: "application/json", Body: body, DeliveryMode: deliveryMode, Timestamp: time.Now(), } if delayMsg { duration := delayTime * 1000 // 延迟时间(毫秒)= 秒 * 1000 publishing.Headers = amqp.Table{ "x-delay": duration, } } err = channel.PublishWithContext( ctx, exchangeName, routingKey, false, false, publishing, ) if err != nil { g.Log().Errorf(ctx, "❌ 发布消息失败: %v", err) return err } g.Log().Infof(ctx, "📨 发布消息成功: queueName=%s, data=%v", queueName, data) return err } // Subscribe 订阅消息 func (c *rabbitMQ) Subscribe(ctx context.Context, msgConfig messageSubscribeConfig) error { cfg, ok := msgConfig.(*RabbitMQSubscribeMsgConfig) if !ok { return fmt.Errorf("无效的 RabbitMQ 配置类型") } if g.IsEmpty(cfg.QueueName) { return fmt.Errorf("队列名称不能为空") } if g.IsEmpty(cfg.ConsumerName) { return fmt.Errorf("消费者名称不能为空") } if g.IsEmpty(cfg.PrefetchCount) { cfg.PrefetchCount = 1 } if g.IsEmpty(cfg.HandleFunc) { return fmt.Errorf("必须提供处理函数") } return c.createSubscribeInternal(ctx, cfg.QueueName, cfg.ConsumerName, cfg.PrefetchCount, cfg.AutoAck, cfg.HandleFunc) } // createSubscribe 内部订阅消息 func (c *rabbitMQ) createSubscribeInternal(ctx context.Context, queueName, consumerName string, prefetchCount int, autoAck bool, handler func(ctx context.Context, message map[string]interface{}) error) error { g.Log().Infof(ctx, "🔔 RabbitMQ [%s] 开始订阅: queueName=%s, consumerName=%s", c.name, queueName, consumerName) if !c.Ping(ctx) { if err := commonConnect(ctx, MessageRabbitMQ, c.name, func(ctx context.Context) error { return c.Connect(ctx) }, func(ctx context.Context) error { return c.Close(ctx) }); err != nil { g.Log().Errorf(ctx, "❌ [%s][%s] 连接失败: %v", MessageRabbitMQ, c.name, err) return err } } channel := getRabbitMQChannel(c.name) if channel == nil || channel.IsClosed() { g.Log().Errorf(ctx, "❌ RabbitMQ [%s] Channel 不存在或已关闭", c.name) return fmt.Errorf("RabbitMQ Channel 不存在或已关闭") } if err := channel.Qos(prefetchCount, 0, false); err != nil { g.Log().Errorf(ctx, "❌ 设置 Qos 失败: %v", err) return err } g.Log().Infof(ctx, "📊 设置 Prefetch Count: %d", prefetchCount) msg, err := channel.Consume( queueName, // queue consumerName, // consumer autoAck, // auto-ack (根据配置决定) false, // exclusive false, // no-local false, // no-wait nil, // args ) if err != nil { g.Log().Errorf(ctx, "❌ 消费消息失败: %v", err) return err } g.Log().Infof(ctx, "👀 开始监听消息") for { select { case <-ctx.Done(): // Context 取消,退出 g.Log().Infof(ctx, "context cancel 监听消息退出") return nil case m, ok := <-msg: if !ok { // Channel 关闭,退出 g.Log().Infof(ctx, "channel close 监听消息退出") return nil } g.Log().Infof(ctx, "📨 收到消息: %s", string(m.Body)) var data map[string]interface{} if err := json.Unmarshal(m.Body, &data); err != nil { // 如果不是 JSON,直接使用原始内容 data = map[string]interface{}{ "data": string(m.Body), } } err := handler(ctx, data) if err != nil { g.Log().Errorf(ctx, "❌ 消息处理失败: %v", err) // 仅在手动 ACK 模式下拒绝消息 if !autoAck { // 拒绝消息不再重新入队(避免死循环) m.Nack(false, false) continue } } g.Log().Infof(ctx, "✅ 消息处理成功: %v", err) // 仅在手动 ACK 模式下确认消息 if err := m.Ack(false); err != nil { g.Log().Errorf(ctx, "❌ AUTO ACK 消息失败: %v", err) } else { g.Log().Infof(ctx, "✅ AUTO ACK 消息成功") } } } }