Files
common/message/rabbit.go

352 lines
8.6 KiB
Go
Raw Normal View History

2026-01-09 10:19:31 +08:00
package message
import (
"context"
"sync"
"time"
"github.com/gogf/gf/v2/encoding/gjson"
"github.com/gogf/gf/v2/errors/gerror"
"github.com/gogf/gf/v2/frame/g"
"github.com/gogf/gf/v2/util/gconv"
amqp "github.com/rabbitmq/amqp091-go"
)
var (
rabbitConn *amqp.Connection
rabbitChannel *amqp.Channel
rabbitOnce sync.Once
rabbitMu sync.RWMutex
rabbitCloseWatcher chan struct{}
rabbitWatcherStarted bool
)
// Config RabbitMQ 配置
type RabbitMQConfig struct {
Host string
Port int
Username string
Password string
VHost string
}
// rabbitMQConfig 默认配置
func getRabbitMQConfig() *RabbitMQConfig {
return &RabbitMQConfig{
Host: g.Cfg().MustGet(context.Background(), "rabbitmq.host").String(),
Port: g.Cfg().MustGet(context.Background(), "rabbitmq.port").Int(),
Username: g.Cfg().MustGet(context.Background(), "rabbitmq.username").String(),
Password: g.Cfg().MustGet(context.Background(), "rabbitmq.password").String(),
VHost: g.Cfg().MustGet(context.Background(), "rabbitmq.vhost", "/").String(),
}
}
// initRabbitMQ 初始化 RabbitMQ 连接
func initRabbitMQ(ctx context.Context) error {
var err error
rabbitOnce.Do(func() {
cfg := getRabbitMQConfig()
url := "amqp://" + cfg.Username + ":" + cfg.Password + "@" + cfg.Host + ":" + gconv.String(cfg.Port) + "/" + cfg.VHost
rabbitConn, err = amqp.Dial(url)
if err != nil {
g.Log().Errorf(ctx, "RabbitMQ 连接失败: %v", err)
return
}
rabbitChannel, err = rabbitConn.Channel()
if err != nil {
g.Log().Errorf(ctx, "创建 RabbitMQ Channel 失败: %v", err)
return
}
rabbitCloseWatcher = make(chan struct{})
if !rabbitWatcherStarted {
go handleRabbitMQConnectionClose(ctx)
rabbitWatcherStarted = true
}
g.Log().Info(ctx, "RabbitMQ 连接成功")
})
return err
}
// getRabbitMQChannel 获取 RabbitMQ Channel
func getRabbitMQChannel() (*amqp.Channel, error) {
rabbitMu.RLock()
defer rabbitMu.RUnlock()
if rabbitChannel == nil || rabbitChannel.IsClosed() {
return nil, gerror.New("RabbitMQ Channel 未初始化或已关闭")
}
return rabbitChannel, nil
}
// getRabbitMQConnection 获取 RabbitMQ 连接
func getRabbitMQConnection() (*amqp.Connection, error) {
rabbitMu.RLock()
defer rabbitMu.RUnlock()
if rabbitConn == nil || rabbitConn.IsClosed() {
return nil, gerror.New("RabbitMQ 连接未初始化或已关闭")
}
return rabbitConn, nil
}
// handleRabbitMQConnectionClose 监听连接关闭并重连
func handleRabbitMQConnectionClose(ctx context.Context) {
for {
select {
case <-rabbitCloseWatcher:
g.Log().Info(ctx, "停止监听 RabbitMQ 连接状态")
return
default:
}
rabbitMu.RLock()
currentConn := rabbitConn
rabbitMu.RUnlock()
if currentConn == nil {
return
}
closeErr := make(chan *amqp.Error, 1)
currentConn.NotifyClose(closeErr)
select {
case err := <-closeErr:
if err != nil {
g.Log().Errorf(ctx, "RabbitMQ 连接关闭: %v尝试重连...", err)
reconnectRabbitMQ(ctx)
}
case <-rabbitCloseWatcher:
g.Log().Info(ctx, "停止监听 RabbitMQ 连接状态")
return
}
}
}
// reconnectRabbitMQ 重新连接
func reconnectRabbitMQ(ctx context.Context) {
rabbitMu.Lock()
defer rabbitMu.Unlock()
for i := 0; i < 10; i++ {
time.Sleep(time.Duration(i+1) * time.Second)
cfg := getRabbitMQConfig()
url := "amqp://" + cfg.Username + ":" + cfg.Password + "@" + cfg.Host + ":" + gconv.String(cfg.Port) + "/" + cfg.VHost
var err error
rabbitConn, err = amqp.Dial(url)
if err != nil {
g.Log().Errorf(ctx, "重连失败 (尝试 %d/10): %v", i+1, err)
continue
}
rabbitChannel, err = rabbitConn.Channel()
if err != nil {
g.Log().Errorf(ctx, "创建 Channel 失败 (尝试 %d/10): %v", i+1, err)
continue
}
g.Log().Info(ctx, "RabbitMQ 重连成功")
return
}
g.Log().Fatal(ctx, "RabbitMQ 重连失败,已达到最大重试次数")
}
// startRabbitMQConsumer 启动 RabbitMQ 消费者
func startRabbitMQConsumer(ctx context.Context, msg QueueMessage) error {
// 初始化连接
if err := initRabbitMQ(ctx); err != nil {
return gerror.Wrap(err, "初始化 RabbitMQ 连接失败")
}
// 创建独立 Channel避免并发冲突
conn, err := getRabbitMQConnection()
if err != nil {
return gerror.Wrap(err, "获取RabbitMQ连接失败")
}
ch, err := conn.Channel()
if err != nil {
return gerror.Wrap(err, "创建独立Channel失败")
}
// 声明队列
_, err = ch.QueueDeclare(
msg.Queue, // name
true, // durable
false, // autoDelete
false, // exclusive
false, // noWait
nil, // arguments
)
if err != nil {
return gerror.Newf("声明队列失败: %v", err)
}
// 设置 QoS并发控制
prefetchCount := msg.PrefetchCount
if prefetchCount == 0 {
prefetchCount = 1
}
err = ch.Qos(
prefetchCount, // prefetchCount
0, // prefetchSize
false, // global
)
if err != nil {
return gerror.Newf("设置 QoS 失败: %v", err)
}
// 开始消费
msgs, err := ch.Consume(
msg.Queue, // queue
msg.ConsumerTag, // consumer tag
msg.AutoAck, // auto-ack
false, // exclusive
false, // no-local
false, // no-wait
nil, // args
)
if err != nil {
return gerror.Newf("开始消费失败: %v", err)
}
workerCount := msg.WorkerCount
if workerCount == 0 {
workerCount = 1
}
g.Log().Infof(ctx, "RabbitMQ 消费者已启动: queue=%s, prefetch=%d, workers=%d",
msg.Queue, prefetchCount, workerCount)
// 启动多个 worker
for i := 0; i < workerCount; i++ {
go rabbitMQWorker(ctx, i, msgs, msg)
}
return nil
}
// rabbitMQWorker RabbitMQ 工作协程
func rabbitMQWorker(ctx context.Context, workerID int, msgs <-chan amqp.Delivery, msg QueueMessage) {
g.Log().Debugf(ctx, "RabbitMQ Worker %d 已启动", workerID)
for {
select {
case <-ctx.Done():
g.Log().Infof(ctx, "RabbitMQ Worker %d 收到停止信号,正在退出", workerID)
return
case delivery, ok := <-msgs:
if !ok {
g.Log().Infof(ctx, "RabbitMQ Worker %d 消息通道已关闭,退出", workerID)
return
}
// 反序列化消息
var message map[string]interface{}
if err := gjson.DecodeTo(delivery.Body, &message); err != nil {
g.Log().Errorf(ctx, "RabbitMQ Worker %d 反序列化消息失败: %v", workerID, err)
if !msg.AutoAck {
delivery.Nack(false, false)
}
continue
}
// 处理消息
err := msg.HandleFunc(ctx, message)
if err != nil {
g.Log().Errorf(ctx, "RabbitMQ Worker %d 处理消息失败: %v", workerID, err)
if !msg.AutoAck {
delivery.Nack(false, false)
}
} else {
if !msg.AutoAck {
delivery.Ack(false)
}
g.Log().Debugf(ctx, "RabbitMQ Worker %d 处理消息成功", workerID)
}
}
}
}
// publishToRabbitMQ 发布消息到 RabbitMQ
func publishToRabbitMQ(ctx context.Context, exchange, routingKey string, message interface{}) (messageID string, err error) {
ch, err := getRabbitMQChannel()
if err != nil {
return
}
body, err := gjson.Encode(message)
if err != nil {
return "", gerror.Newf("消息序列化失败: %v", err)
}
err = ch.PublishWithContext(
ctx,
exchange, // exchange
routingKey, // routing key
false, // mandatory
false, // immediate
amqp.Publishing{
DeliveryMode: amqp.Persistent,
ContentType: "application/json",
Body: body,
},
)
if err != nil {
g.Log().Errorf(ctx, "发布消息失败: exchange=%s, routingKey=%s, err=%v", exchange, routingKey, err)
return
}
g.Log().Debugf(ctx, "消息发布成功: exchange=%s, routingKey=%s", exchange, routingKey)
return messageID, nil
}
// publishDelayedToRabbitMQ 发布延时消息到 RabbitMQ
func publishDelayedToRabbitMQ(ctx context.Context, exchange, routingKey string, message interface{}, delaySeconds int) (messageID string, err error) {
ch, err := getRabbitMQChannel()
if err != nil {
return
}
body, err := gjson.Encode(message)
if err != nil {
return "", gerror.Newf("消息序列化失败: %v", err)
}
err = ch.PublishWithContext(
ctx,
exchange, // exchange必须是 x-delayed-message 类型)
routingKey, // routing key
false, // mandatory
false, // immediate
amqp.Publishing{
DeliveryMode: amqp.Persistent,
ContentType: "application/json",
Body: body,
Headers: amqp.Table{
"x-delay": delaySeconds * 1000, // 延时(毫秒)
},
},
)
if err != nil {
g.Log().Errorf(ctx, "发布延时消息失败: exchange=%s, routingKey=%s, delay=%ds, err=%v", exchange, routingKey, delaySeconds, err)
return
}
g.Log().Debugf(ctx, "延时消息发布成功: exchange=%s, routingKey=%s, delay=%ds", exchange, routingKey, delaySeconds)
return messageID, nil
}