96 lines
2.3 KiB
Go
96 lines
2.3 KiB
Go
|
|
// Package rabbitmq - RabbitMQ延时消息发布
|
|||
|
|
package rabbitmq
|
|||
|
|
|
|||
|
|
import (
|
|||
|
|
"context"
|
|||
|
|
"time"
|
|||
|
|
|
|||
|
|
"github.com/gogf/gf/v2/encoding/gjson"
|
|||
|
|
"github.com/gogf/gf/v2/errors/gerror"
|
|||
|
|
amqp "github.com/rabbitmq/amqp091-go"
|
|||
|
|
)
|
|||
|
|
|
|||
|
|
// PublishWithDelay 发布延时消息到RabbitMQ
|
|||
|
|
// delaySeconds: 延时秒数
|
|||
|
|
func PublishWithDelay(ctx context.Context, routingKey string, message interface{}, delaySeconds int) error {
|
|||
|
|
ch, err := GetChannel()
|
|||
|
|
if err != nil {
|
|||
|
|
return gerror.Wrap(err, "获取RabbitMQ通道失败")
|
|||
|
|
}
|
|||
|
|
if ch == nil {
|
|||
|
|
return gerror.New("RabbitMQ通道未初始化")
|
|||
|
|
}
|
|||
|
|
|
|||
|
|
// 序列化消息
|
|||
|
|
body, err := gjson.Encode(message)
|
|||
|
|
if err != nil {
|
|||
|
|
return gerror.Wrapf(err, "序列化消息失败")
|
|||
|
|
}
|
|||
|
|
|
|||
|
|
// 声明延时交换机(x-delayed-message类型)
|
|||
|
|
// 注意:需要RabbitMQ安装延时插件 rabbitmq-plugins enable rabbitmq_delayed_message_exchange
|
|||
|
|
exchangeName := "delayed.exchange"
|
|||
|
|
err = ch.ExchangeDeclare(
|
|||
|
|
exchangeName,
|
|||
|
|
"x-delayed-message", // 延时交换机类型
|
|||
|
|
true, // durable
|
|||
|
|
false, // auto-deleted
|
|||
|
|
false, // internal
|
|||
|
|
false, // no-wait
|
|||
|
|
amqp.Table{
|
|||
|
|
"x-delayed-type": "direct", // 底层交换机类型
|
|||
|
|
},
|
|||
|
|
)
|
|||
|
|
if err != nil {
|
|||
|
|
return gerror.Wrapf(err, "声明延时交换机失败")
|
|||
|
|
}
|
|||
|
|
|
|||
|
|
// 声明队列
|
|||
|
|
queue, err := ch.QueueDeclare(
|
|||
|
|
routingKey, // 队列名使用routingKey
|
|||
|
|
true, // durable
|
|||
|
|
false, // delete when unused
|
|||
|
|
false, // exclusive
|
|||
|
|
false, // no-wait
|
|||
|
|
nil,
|
|||
|
|
)
|
|||
|
|
if err != nil {
|
|||
|
|
return gerror.Wrapf(err, "声明队列失败")
|
|||
|
|
}
|
|||
|
|
|
|||
|
|
// 绑定队列到交换机
|
|||
|
|
err = ch.QueueBind(
|
|||
|
|
queue.Name, // queue name
|
|||
|
|
routingKey, // routing key
|
|||
|
|
exchangeName, // exchange
|
|||
|
|
false,
|
|||
|
|
nil,
|
|||
|
|
)
|
|||
|
|
if err != nil {
|
|||
|
|
return gerror.Wrapf(err, "绑定队列失败")
|
|||
|
|
}
|
|||
|
|
|
|||
|
|
// 发布延时消息
|
|||
|
|
err = ch.PublishWithContext(
|
|||
|
|
ctx,
|
|||
|
|
exchangeName, // exchange
|
|||
|
|
routingKey, // routing key
|
|||
|
|
false, // mandatory
|
|||
|
|
false, // immediate
|
|||
|
|
amqp.Publishing{
|
|||
|
|
ContentType: "application/json",
|
|||
|
|
Body: body,
|
|||
|
|
DeliveryMode: amqp.Persistent, // 持久化消息
|
|||
|
|
Headers: amqp.Table{
|
|||
|
|
"x-delay": delaySeconds * 1000, // 延时时间(毫秒)
|
|||
|
|
},
|
|||
|
|
Timestamp: time.Now(),
|
|||
|
|
},
|
|||
|
|
)
|
|||
|
|
if err != nil {
|
|||
|
|
return gerror.Wrapf(err, "发布延时消息失败")
|
|||
|
|
}
|
|||
|
|
|
|||
|
|
return nil
|
|||
|
|
}
|