Files
common/message/rabbitmq_msg.go

312 lines
8.2 KiB
Go
Raw Normal View History

package message
import (
"context"
"encoding/json"
"fmt"
"time"
"github.com/gogf/gf/v2/frame/g"
amqp "github.com/rabbitmq/amqp091-go"
)
type RabbitMQPublishMsgConfig struct {
QueueName string
Durable bool
Data any
}
type RabbitMQPublishDelayMsgConfig struct {
QueueName string
Durable bool
DelayTime int
Data any
}
type RabbitMQSubscribeMsgConfig struct {
QueueName string
ConsumerName string
AutoAck bool
PrefetchCount int
HandleFunc func(ctx context.Context, message map[string]interface{}) error
}
func (*RabbitMQPublishMsgConfig) GetPublishMsgType() {
}
func (*RabbitMQPublishDelayMsgConfig) GetPublishDelayMsgType() {}
func (*RabbitMQSubscribeMsgConfig) GetSubscribeMsgType() {
}
type rabbitMQ struct {
name string // 数据源名称
}
func init() {
// 注册 RabbitMQ 插件(默认数据源)
RegisterPlugin(context.Background(), "default", MessageRabbitMQ, func() messageUtil {
return &rabbitMQ{name: "default"}
})
}
// Connect 连接 RabbitMQ
func (c *rabbitMQ) Connect(ctx context.Context) error {
return rabbitmqConnect(ctx, c.name)
}
// Ping 检测 RabbitMQ 连接状态
func (c *rabbitMQ) Ping(ctx context.Context) bool {
return rabbitmqPing(ctx, c.name)
}
// Close 关闭 RabbitMQ 连接
func (c *rabbitMQ) Close(ctx context.Context) error {
return rabbitmqClose(ctx, c.name)
}
// Publish 发布消息
func (c *rabbitMQ) Publish(ctx context.Context, msgConfig messagePublishConfig) error {
cfg, ok := msgConfig.(*RabbitMQPublishMsgConfig)
if !ok {
return fmt.Errorf("无效的 RabbitMQ 配置类型")
}
if g.IsEmpty(cfg.QueueName) {
return fmt.Errorf("队列名称不能为空")
}
if cfg.Data == nil {
return fmt.Errorf("数据不能为空")
}
return c.publishMessageInternal(ctx, cfg.QueueName, cfg.Durable, 0, cfg.Data)
}
// PublishDelay 发布延迟消息
func (c *rabbitMQ) PublishDelay(ctx context.Context, msgConfig messagePublishDelayConfig) error {
cfg, ok := msgConfig.(*RabbitMQPublishDelayMsgConfig)
if !ok {
return fmt.Errorf("无效的 RabbitMQ 配置类型")
}
if g.IsEmpty(cfg.QueueName) {
return fmt.Errorf("队列名称不能为空")
}
if cfg.Data == nil {
return fmt.Errorf("数据不能为空")
}
return c.publishMessageInternal(ctx, cfg.QueueName, cfg.Durable, cfg.DelayTime, cfg.Data)
}
// publishMessage 发布消息内部实现
func (c *rabbitMQ) publishMessageInternal(ctx context.Context, queueName string, durable bool, delayTime int, data interface{}) error {
if !c.Ping(ctx) {
if err := commonConnect(ctx, MessageRabbitMQ, c.name, func(ctx context.Context) error {
return c.Connect(ctx)
}, func(ctx context.Context) error {
return c.Close(ctx)
}); err != nil {
g.Log().Errorf(ctx, "❌ [%s][%s] 连接失败: %v", MessageRabbitMQ, c.name, err)
return err
}
}
channel := getRabbitMQChannel(c.name)
if channel == nil || channel.IsClosed() {
g.Log().Errorf(ctx, "❌ RabbitMQ [%s] Channel 不存在或已关闭", c.name)
return fmt.Errorf("RabbitMQ Channel 不存在或已关闭")
}
delayMsg := delayTime > 0
// 1. 决定 Exchange 类型
exchangeType := "fanout"
exchangeName := queueName
routingKey := queueName
args := amqp.Table{}
if delayMsg {
exchangeType = "x-delayed-message"
exchangeName = queueName + ".delayed"
args["x-delayed-type"] = "fanout"
}
// 2. 声明 Exchange使用 exchangeName 而不是 queueName
if err := channel.ExchangeDeclare(
exchangeName, // 修复:使用正确的交换机名称
exchangeType,
durable,
false, // autoDelete
false, // internal
false, // noWait
args,
); err != nil {
g.Log().Errorf(ctx, "❌ 声明 Exchange 失败: %v", err)
return err
}
// 3. 声明队列
if _, err := channel.QueueDeclare(
queueName,
durable,
false, // autoDelete
false, // exclusive
false, // noWait
nil, // args
); err != nil {
g.Log().Errorf(ctx, "❌ 声明队列失败: %v", err)
return err
}
// 4. 绑定队列
if err := channel.QueueBind(
queueName,
routingKey, // routingKey 路由键
exchangeName, // exchange 交换机名称
false, // noWait
nil, // args
); err != nil {
g.Log().Errorf(ctx, "❌ 绑定队列失败: %v", err)
return err
}
// 5. 序列化数据
body, err := json.Marshal(data)
if err != nil {
g.Log().Errorf(ctx, "❌ 序列化数据失败: %v", err)
return err
}
// 6. 发布消息
deliveryMode := amqp.Transient
if durable {
deliveryMode = amqp.Persistent
}
publishing := amqp.Publishing{
ContentType: "application/json",
Body: body,
DeliveryMode: deliveryMode,
Timestamp: time.Now(),
}
if delayMsg {
duration := delayTime * 1000 // 延迟时间(毫秒)= 秒 * 1000
publishing.Headers = amqp.Table{
"x-delay": duration,
}
}
err = channel.PublishWithContext(
ctx,
exchangeName,
routingKey,
false, false,
publishing,
)
if err != nil {
g.Log().Errorf(ctx, "❌ 发布消息失败: %v", err)
return err
}
g.Log().Infof(ctx, "📨 发布消息成功: queueName=%s, data=%v", queueName, data)
return err
}
// Subscribe 订阅消息
func (c *rabbitMQ) Subscribe(ctx context.Context, msgConfig messageSubscribeConfig) error {
cfg, ok := msgConfig.(*RabbitMQSubscribeMsgConfig)
if !ok {
return fmt.Errorf("无效的 RabbitMQ 配置类型")
}
if g.IsEmpty(cfg.QueueName) {
return fmt.Errorf("队列名称不能为空")
}
if g.IsEmpty(cfg.ConsumerName) {
return fmt.Errorf("消费者名称不能为空")
}
if g.IsEmpty(cfg.PrefetchCount) {
cfg.PrefetchCount = 1
}
if g.IsEmpty(cfg.HandleFunc) {
return fmt.Errorf("必须提供处理函数")
}
return c.createSubscribeInternal(ctx, cfg.QueueName, cfg.ConsumerName, cfg.PrefetchCount, cfg.AutoAck, cfg.HandleFunc)
}
// createSubscribe 内部订阅消息
func (c *rabbitMQ) createSubscribeInternal(ctx context.Context, queueName, consumerName string, prefetchCount int, autoAck bool, handler func(ctx context.Context, message map[string]interface{}) error) error {
g.Log().Infof(ctx, "🔔 RabbitMQ [%s] 开始订阅: queueName=%s, consumerName=%s", c.name, queueName, consumerName)
if !c.Ping(ctx) {
if err := commonConnect(ctx, MessageRabbitMQ, c.name, func(ctx context.Context) error {
return c.Connect(ctx)
}, func(ctx context.Context) error {
return c.Close(ctx)
}); err != nil {
g.Log().Errorf(ctx, "❌ [%s][%s] 连接失败: %v", MessageRabbitMQ, c.name, err)
return err
}
}
channel := getRabbitMQChannel(c.name)
if channel == nil || channel.IsClosed() {
g.Log().Errorf(ctx, "❌ RabbitMQ [%s] Channel 不存在或已关闭", c.name)
return fmt.Errorf("RabbitMQ Channel 不存在或已关闭")
}
if err := channel.Qos(prefetchCount, 0, false); err != nil {
g.Log().Errorf(ctx, "❌ 设置 Qos 失败: %v", err)
return err
}
g.Log().Infof(ctx, "📊 设置 Prefetch Count: %d", prefetchCount)
msg, err := channel.Consume(
queueName, // queue
consumerName, // consumer
autoAck, // auto-ack (根据配置决定)
false, // exclusive
false, // no-local
false, // no-wait
nil, // args
)
if err != nil {
g.Log().Errorf(ctx, "❌ 消费消息失败: %v", err)
return err
}
g.Log().Infof(ctx, "👀 开始监听消息")
for {
select {
case <-ctx.Done():
// Context 取消,退出
g.Log().Infof(ctx, "context cancel 监听消息退出")
return nil
case m, ok := <-msg:
if !ok {
// Channel 关闭,退出
g.Log().Infof(ctx, "channel close 监听消息退出")
return nil
}
g.Log().Infof(ctx, "📨 收到消息: %s", string(m.Body))
var data map[string]interface{}
if err := json.Unmarshal(m.Body, &data); err != nil {
// 如果不是 JSON直接使用原始内容
data = map[string]interface{}{
"data": string(m.Body),
}
}
err := handler(ctx, data)
if err != nil {
g.Log().Errorf(ctx, "❌ 消息处理失败: %v", err)
// 仅在手动 ACK 模式下拒绝消息
if !autoAck {
// 拒绝消息不再重新入队(避免死循环)
m.Nack(false, false)
continue
}
}
g.Log().Infof(ctx, "✅ 消息处理成功: %v", err)
// 仅在手动 ACK 模式下确认消息
if err := m.Ack(false); err != nil {
g.Log().Errorf(ctx, "❌ AUTO ACK 消息失败: %v", err)
} else {
g.Log().Infof(ctx, "✅ AUTO ACK 消息成功")
}
}
}
}