Files
common/rabbitmq/consumer.go

217 lines
5.4 KiB
Go
Raw Normal View History

2025-12-03 09:59:40 +08:00
package rabbitmq
import (
"context"
"github.com/gogf/gf/v2/encoding/gjson"
"github.com/gogf/gf/v2/errors/gerror"
2025-12-03 09:59:40 +08:00
"github.com/gogf/gf/v2/frame/g"
amqp "github.com/rabbitmq/amqp091-go"
)
// MessageHandler 消息处理函数
type MessageHandler func(ctx context.Context, body []byte) error
// Consumer 消费者
type Consumer struct {
queue string
consumerTag string
prefetchCount int // QoS: 预取数量(并发控制)
autoAck bool // 是否自动确认
handler MessageHandler
2025-12-03 15:33:32 +08:00
workerCount int // worker 数量
cancel context.CancelFunc // 用于停止 worker
channel *amqp.Channel // 独立Channel避免并发冲突
2025-12-03 09:59:40 +08:00
}
// ConsumerOption 消费者配置选项
type ConsumerOption func(*Consumer)
// WithPrefetchCount 设置预取数量(并发控制)
func WithPrefetchCount(count int) ConsumerOption {
return func(c *Consumer) {
c.prefetchCount = count
}
}
// WithAutoAck 设置自动确认
func WithAutoAck(autoAck bool) ConsumerOption {
return func(c *Consumer) {
c.autoAck = autoAck
}
}
// WithWorkerCount 设置 worker 数量
func WithWorkerCount(count int) ConsumerOption {
return func(c *Consumer) {
c.workerCount = count
}
}
// WithConsumerTag 设置消费者标签
func WithConsumerTag(tag string) ConsumerOption {
return func(c *Consumer) {
c.consumerTag = tag
}
}
// NewConsumer 创建消费者
func NewConsumer(queue string, handler MessageHandler, opts ...ConsumerOption) *Consumer {
c := &Consumer{
queue: queue,
consumerTag: "",
prefetchCount: 1, // 默认 1 个
autoAck: false, // 默认手动确认
handler: handler,
workerCount: 1, // 默认 1 个 worker
}
// 应用选项
for _, opt := range opts {
opt(c)
}
return c
}
// Start 启动消费者
func (c *Consumer) Start(ctx context.Context) (err error) {
2025-12-03 15:33:32 +08:00
// 创建可取消的 context
workerCtx, cancel := context.WithCancel(ctx)
c.cancel = cancel
// 为每个消费者创建独立Channel避免并发冲突
conn, err := GetConnection()
if err != nil {
return gerror.Wrap(err, "获取RabbitMQ连接失败")
}
c.channel, err = conn.Channel()
2025-12-03 09:59:40 +08:00
if err != nil {
return gerror.Wrap(err, "创建独立Channel失败")
2025-12-03 09:59:40 +08:00
}
ch := c.channel
2025-12-03 09:59:40 +08:00
// 声明队列(如果不存在则创建)
2025-12-19 15:23:20 +08:00
// 注意Queue到Exchange的绑定应由message服务在发送响应时动态创建或通过运维工具提前配置
_, err = ch.QueueDeclare(
c.queue, // name
true, // durable持久化
false, // autoDelete不自动删除
false, // exclusive非独占
false, // noWait
nil, // arguments
)
if err != nil {
return gerror.Newf("声明队列失败: %v", err)
}
2025-12-03 09:59:40 +08:00
// 设置 QoS并发控制
err = ch.Qos(
c.prefetchCount, // prefetchCount: 每个 consumer 最多同时处理的消息数
0, // prefetchSize: 0 表示不限制
false, // global: false 表示仅应用于当前 channel
)
if err != nil {
return gerror.Newf("设置 QoS 失败: %v", err)
2025-12-03 09:59:40 +08:00
}
// 开始消费
msgs, err := ch.Consume(
c.queue, // queue
c.consumerTag, // consumer tag
c.autoAck, // auto-ack
false, // exclusive
false, // no-local
false, // no-wait
nil, // args
)
if err != nil {
return gerror.Newf("开始消费失败: %v", err)
2025-12-03 09:59:40 +08:00
}
g.Log().Infof(ctx, "消费者已启动: queue=%s, prefetch=%d, workers=%d",
c.queue, c.prefetchCount, c.workerCount)
// 启动多个 worker
for i := 0; i < c.workerCount; i++ {
2025-12-03 15:33:32 +08:00
go c.worker(workerCtx, i, msgs)
2025-12-03 09:59:40 +08:00
}
return
2025-12-03 09:59:40 +08:00
}
// worker 工作协程
func (c *Consumer) worker(ctx context.Context, workerID int, msgs <-chan amqp.Delivery) {
g.Log().Debugf(ctx, "Worker %d 已启动", workerID)
2025-12-03 15:33:32 +08:00
for {
select {
case <-ctx.Done():
// Context 取消,退出
g.Log().Infof(ctx, "Worker %d 收到停止信号,正在退出", workerID)
return
case msg, ok := <-msgs:
if !ok {
// Channel 关闭,退出
g.Log().Infof(ctx, "Worker %d 消息通道已关闭,退出", workerID)
return
}
2025-12-03 09:59:40 +08:00
2025-12-03 15:33:32 +08:00
// 处理消息
err := c.handler(ctx, msg.Body)
2025-12-03 09:59:40 +08:00
2025-12-03 15:33:32 +08:00
if err != nil {
g.Log().Errorf(ctx, "Worker %d 处理消息失败: %v", workerID, err)
2025-12-03 09:59:40 +08:00
2025-12-03 15:33:32 +08:00
// 如果不是自动确认,需要手动 Nack
if !c.autoAck {
// requeue=false: 不重新入队,进入死信队列
msg.Nack(false, false)
}
} else {
// 处理成功,手动确认
if !c.autoAck {
msg.Ack(false)
}
g.Log().Debugf(ctx, "Worker %d 处理消息成功", workerID)
}
2025-12-03 09:59:40 +08:00
}
}
}
// StartTypedConsumer 启动类型化消费者(自动反序列化)
func StartTypedConsumer[T any](
ctx context.Context,
queue string,
handler func(ctx context.Context, msg *T) error,
opts ...ConsumerOption,
) error {
// 包装处理函数
wrappedHandler := func(ctx context.Context, body []byte) error {
var msg T
if err := gjson.DecodeTo(body, &msg); err != nil {
return gerror.Newf("反序列化消息失败: %v", err)
2025-12-03 09:59:40 +08:00
}
return handler(ctx, &msg)
}
consumer := NewConsumer(queue, wrappedHandler, opts...)
return consumer.Start(ctx)
}
2025-12-03 15:33:32 +08:00
// Stop 停止消费者
func (c *Consumer) Stop(ctx context.Context) {
if c.cancel != nil {
c.cancel()
}
// 关闭独立Channel
if c.channel != nil && !c.channel.IsClosed() {
c.channel.Close()
g.Log().Debugf(ctx, "消费者Channel已关闭: queue=%s", c.queue)
}
g.Log().Infof(ctx, "正在停止消费者: queue=%s", c.queue)
c.cancel = nil
2025-12-03 15:33:32 +08:00
}