2025-12-03 09:59:40 +08:00
|
|
|
|
package rabbitmq
|
|
|
|
|
|
|
|
|
|
|
|
import (
|
|
|
|
|
|
"context"
|
|
|
|
|
|
|
2025-12-06 18:04:29 +08:00
|
|
|
|
"github.com/gogf/gf/v2/encoding/gjson"
|
|
|
|
|
|
"github.com/gogf/gf/v2/errors/gerror"
|
2025-12-03 09:59:40 +08:00
|
|
|
|
"github.com/gogf/gf/v2/frame/g"
|
|
|
|
|
|
amqp "github.com/rabbitmq/amqp091-go"
|
|
|
|
|
|
)
|
|
|
|
|
|
|
|
|
|
|
|
// MessageHandler 消息处理函数
|
|
|
|
|
|
type MessageHandler func(ctx context.Context, body []byte) error
|
|
|
|
|
|
|
|
|
|
|
|
// Consumer 消费者
|
|
|
|
|
|
type Consumer struct {
|
|
|
|
|
|
queue string
|
|
|
|
|
|
consumerTag string
|
|
|
|
|
|
prefetchCount int // QoS: 预取数量(并发控制)
|
|
|
|
|
|
autoAck bool // 是否自动确认
|
|
|
|
|
|
handler MessageHandler
|
2025-12-03 15:33:32 +08:00
|
|
|
|
workerCount int // worker 数量
|
|
|
|
|
|
cancel context.CancelFunc // 用于停止 worker
|
2025-12-03 09:59:40 +08:00
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
// ConsumerOption 消费者配置选项
|
|
|
|
|
|
type ConsumerOption func(*Consumer)
|
|
|
|
|
|
|
|
|
|
|
|
// WithPrefetchCount 设置预取数量(并发控制)
|
|
|
|
|
|
func WithPrefetchCount(count int) ConsumerOption {
|
|
|
|
|
|
return func(c *Consumer) {
|
|
|
|
|
|
c.prefetchCount = count
|
|
|
|
|
|
}
|
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
// WithAutoAck 设置自动确认
|
|
|
|
|
|
func WithAutoAck(autoAck bool) ConsumerOption {
|
|
|
|
|
|
return func(c *Consumer) {
|
|
|
|
|
|
c.autoAck = autoAck
|
|
|
|
|
|
}
|
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
// WithWorkerCount 设置 worker 数量
|
|
|
|
|
|
func WithWorkerCount(count int) ConsumerOption {
|
|
|
|
|
|
return func(c *Consumer) {
|
|
|
|
|
|
c.workerCount = count
|
|
|
|
|
|
}
|
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
// WithConsumerTag 设置消费者标签
|
|
|
|
|
|
func WithConsumerTag(tag string) ConsumerOption {
|
|
|
|
|
|
return func(c *Consumer) {
|
|
|
|
|
|
c.consumerTag = tag
|
|
|
|
|
|
}
|
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
// NewConsumer 创建消费者
|
|
|
|
|
|
func NewConsumer(queue string, handler MessageHandler, opts ...ConsumerOption) *Consumer {
|
|
|
|
|
|
c := &Consumer{
|
|
|
|
|
|
queue: queue,
|
|
|
|
|
|
consumerTag: "",
|
|
|
|
|
|
prefetchCount: 1, // 默认 1 个
|
|
|
|
|
|
autoAck: false, // 默认手动确认
|
|
|
|
|
|
handler: handler,
|
|
|
|
|
|
workerCount: 1, // 默认 1 个 worker
|
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
// 应用选项
|
|
|
|
|
|
for _, opt := range opts {
|
|
|
|
|
|
opt(c)
|
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
return c
|
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
// Start 启动消费者
|
2025-12-06 18:04:29 +08:00
|
|
|
|
func (c *Consumer) Start(ctx context.Context) (err error) {
|
2025-12-03 15:33:32 +08:00
|
|
|
|
// 创建可取消的 context
|
|
|
|
|
|
workerCtx, cancel := context.WithCancel(ctx)
|
|
|
|
|
|
c.cancel = cancel
|
2025-12-03 09:59:40 +08:00
|
|
|
|
ch, err := GetChannel()
|
|
|
|
|
|
if err != nil {
|
|
|
|
|
|
return err
|
|
|
|
|
|
}
|
|
|
|
|
|
|
2025-12-19 15:02:05 +08:00
|
|
|
|
// 声明队列(如果不存在则创建)
|
2025-12-19 15:23:20 +08:00
|
|
|
|
// 注意:Queue到Exchange的绑定应由message服务在发送响应时动态创建,或通过运维工具提前配置
|
2025-12-19 15:02:05 +08:00
|
|
|
|
_, err = ch.QueueDeclare(
|
|
|
|
|
|
c.queue, // name
|
|
|
|
|
|
true, // durable(持久化)
|
|
|
|
|
|
false, // autoDelete(不自动删除)
|
|
|
|
|
|
false, // exclusive(非独占)
|
|
|
|
|
|
false, // noWait
|
|
|
|
|
|
nil, // arguments
|
|
|
|
|
|
)
|
|
|
|
|
|
if err != nil {
|
|
|
|
|
|
return gerror.Newf("声明队列失败: %v", err)
|
|
|
|
|
|
}
|
|
|
|
|
|
|
2025-12-03 09:59:40 +08:00
|
|
|
|
// 设置 QoS(并发控制)
|
|
|
|
|
|
err = ch.Qos(
|
|
|
|
|
|
c.prefetchCount, // prefetchCount: 每个 consumer 最多同时处理的消息数
|
|
|
|
|
|
0, // prefetchSize: 0 表示不限制
|
|
|
|
|
|
false, // global: false 表示仅应用于当前 channel
|
|
|
|
|
|
)
|
|
|
|
|
|
if err != nil {
|
2025-12-06 18:04:29 +08:00
|
|
|
|
return gerror.Newf("设置 QoS 失败: %v", err)
|
2025-12-03 09:59:40 +08:00
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
// 开始消费
|
|
|
|
|
|
msgs, err := ch.Consume(
|
|
|
|
|
|
c.queue, // queue
|
|
|
|
|
|
c.consumerTag, // consumer tag
|
|
|
|
|
|
c.autoAck, // auto-ack
|
|
|
|
|
|
false, // exclusive
|
|
|
|
|
|
false, // no-local
|
|
|
|
|
|
false, // no-wait
|
|
|
|
|
|
nil, // args
|
|
|
|
|
|
)
|
|
|
|
|
|
if err != nil {
|
2025-12-06 18:04:29 +08:00
|
|
|
|
return gerror.Newf("开始消费失败: %v", err)
|
2025-12-03 09:59:40 +08:00
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
g.Log().Infof(ctx, "消费者已启动: queue=%s, prefetch=%d, workers=%d",
|
|
|
|
|
|
c.queue, c.prefetchCount, c.workerCount)
|
|
|
|
|
|
|
|
|
|
|
|
// 启动多个 worker
|
|
|
|
|
|
for i := 0; i < c.workerCount; i++ {
|
2025-12-03 15:33:32 +08:00
|
|
|
|
go c.worker(workerCtx, i, msgs)
|
2025-12-03 09:59:40 +08:00
|
|
|
|
}
|
|
|
|
|
|
|
2025-12-06 18:04:29 +08:00
|
|
|
|
return
|
2025-12-03 09:59:40 +08:00
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
// worker 工作协程
|
|
|
|
|
|
func (c *Consumer) worker(ctx context.Context, workerID int, msgs <-chan amqp.Delivery) {
|
|
|
|
|
|
g.Log().Debugf(ctx, "Worker %d 已启动", workerID)
|
|
|
|
|
|
|
2025-12-03 15:33:32 +08:00
|
|
|
|
for {
|
|
|
|
|
|
select {
|
|
|
|
|
|
case <-ctx.Done():
|
|
|
|
|
|
// Context 取消,退出
|
|
|
|
|
|
g.Log().Infof(ctx, "Worker %d 收到停止信号,正在退出", workerID)
|
|
|
|
|
|
return
|
|
|
|
|
|
case msg, ok := <-msgs:
|
|
|
|
|
|
if !ok {
|
|
|
|
|
|
// Channel 关闭,退出
|
|
|
|
|
|
g.Log().Infof(ctx, "Worker %d 消息通道已关闭,退出", workerID)
|
|
|
|
|
|
return
|
|
|
|
|
|
}
|
2025-12-03 09:59:40 +08:00
|
|
|
|
|
2025-12-03 15:33:32 +08:00
|
|
|
|
// 处理消息
|
|
|
|
|
|
err := c.handler(ctx, msg.Body)
|
2025-12-03 09:59:40 +08:00
|
|
|
|
|
2025-12-03 15:33:32 +08:00
|
|
|
|
if err != nil {
|
|
|
|
|
|
g.Log().Errorf(ctx, "Worker %d 处理消息失败: %v", workerID, err)
|
2025-12-03 09:59:40 +08:00
|
|
|
|
|
2025-12-03 15:33:32 +08:00
|
|
|
|
// 如果不是自动确认,需要手动 Nack
|
|
|
|
|
|
if !c.autoAck {
|
|
|
|
|
|
// requeue=false: 不重新入队,进入死信队列
|
|
|
|
|
|
msg.Nack(false, false)
|
|
|
|
|
|
}
|
|
|
|
|
|
} else {
|
|
|
|
|
|
// 处理成功,手动确认
|
|
|
|
|
|
if !c.autoAck {
|
|
|
|
|
|
msg.Ack(false)
|
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
g.Log().Debugf(ctx, "Worker %d 处理消息成功", workerID)
|
|
|
|
|
|
}
|
2025-12-03 09:59:40 +08:00
|
|
|
|
}
|
|
|
|
|
|
}
|
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
// StartTypedConsumer 启动类型化消费者(自动反序列化)
|
|
|
|
|
|
func StartTypedConsumer[T any](
|
|
|
|
|
|
ctx context.Context,
|
|
|
|
|
|
queue string,
|
|
|
|
|
|
handler func(ctx context.Context, msg *T) error,
|
|
|
|
|
|
opts ...ConsumerOption,
|
|
|
|
|
|
) error {
|
|
|
|
|
|
// 包装处理函数
|
|
|
|
|
|
wrappedHandler := func(ctx context.Context, body []byte) error {
|
|
|
|
|
|
var msg T
|
2025-12-06 18:04:29 +08:00
|
|
|
|
if err := gjson.DecodeTo(body, &msg); err != nil {
|
|
|
|
|
|
return gerror.Newf("反序列化消息失败: %v", err)
|
2025-12-03 09:59:40 +08:00
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
return handler(ctx, &msg)
|
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
consumer := NewConsumer(queue, wrappedHandler, opts...)
|
|
|
|
|
|
return consumer.Start(ctx)
|
|
|
|
|
|
}
|
2025-12-03 15:33:32 +08:00
|
|
|
|
|
|
|
|
|
|
// Stop 停止消费者
|
|
|
|
|
|
func (c *Consumer) Stop(ctx context.Context) {
|
|
|
|
|
|
if c.cancel != nil {
|
|
|
|
|
|
g.Log().Infof(ctx, "正在停止消费者: queue=%s", c.queue)
|
|
|
|
|
|
c.cancel()
|
|
|
|
|
|
c.cancel = nil
|
|
|
|
|
|
}
|
|
|
|
|
|
}
|