Files
common/rabbitmq/consumer_manager.go

172 lines
4.7 KiB
Go
Raw Normal View History

2025-12-18 18:01:21 +08:00
// Package rabbitmq 提供 RabbitMQ 消费者管理功能
//
// 本文件实现消费者统一管理,简化业务层的启动逻辑
package rabbitmq
import (
"context"
"sync"
"github.com/gogf/gf/v2/frame/g"
"github.com/gogf/gf/v2/os/glog"
)
// ManagedConsumer 托管消费者(包含启动和停止函数)
type ManagedConsumer struct {
Name string // 消费者名称
Start func(ctx context.Context) error // 启动函数
Stop func(ctx context.Context) // 停止函数
}
// ConsumerManager RabbitMQ 消费者管理器
//
// 职责:
// 1. 统一管理所有 RabbitMQ 消费者的生命周期
// 2. 初始化 RabbitMQ 连接和队列
// 3. 启动/停止所有消费者
// 4. 协调消费者的优雅退出
//
// 使用示例:
//
// mgr := rabbitmq.NewConsumerManager(ctx)
// mgr.Register("响应消费者", responseConsumer.Start, responseConsumer.Stop)
// mgr.Init()
// defer mgr.Stop()
type ConsumerManager struct {
ctx context.Context // 全局上下文
consumers []*ManagedConsumer // 消费者列表
wg sync.WaitGroup // 等待所有消费者协程退出
}
// NewConsumerManager 创建消费者管理器
//
// 参数:
//
// ctx: 上下文
//
// 返回:
//
// *ConsumerManager: 消费者管理器实例
func NewConsumerManager(ctx context.Context) *ConsumerManager {
return &ConsumerManager{
ctx: ctx,
consumers: make([]*ManagedConsumer, 0),
}
}
// Register 注册消费者
//
// 参数:
//
// name: 消费者名称(用于日志)
// startFunc: 启动函数
// stopFunc: 停止函数
//
// 使用示例:
//
// consumer := service.NewResponseConsumer(ctx)
// mgr.Register("响应消费者", consumer.Start, consumer.Stop)
func (cm *ConsumerManager) Register(name string, startFunc func(ctx context.Context) error, stopFunc func(ctx context.Context)) {
cm.consumers = append(cm.consumers, &ManagedConsumer{
Name: name,
Start: startFunc,
Stop: stopFunc,
})
}
// Init 初始化并启动所有消费者
//
// 执行流程:
// 1. 检查 RabbitMQ 配置(未配置则跳过)
// 2. 初始化 RabbitMQ 连接
// 3. 声明并绑定队列(响应队列、延时落库队列)
// 4. 异步启动所有已注册的消费者
//
// 返回:
//
// err: 错误信息,成功返回 nil
//
// 注意:
// - 如果 RabbitMQ 未配置,不会报错,只是跳过初始化
// - 响应队列初始化失败会导致 Fatal 退出
// - 延时落库队列失败只会 Warning不影响主流程
func (cm *ConsumerManager) Init() (err error) {
// 检查配置文件中是否配置了 RabbitMQ
if g.Cfg().MustGet(cm.ctx, "rabbitmq").IsEmpty() {
glog.Info(cm.ctx, "RabbitMQ未配置跳过消费者初始化")
return
}
// 初始化 RabbitMQ 连接(从 config.yml 读取配置)
if err = InitFromConfig(cm.ctx); err != nil {
glog.Fatalf(cm.ctx, "初始化 RabbitMQ 失败: %v", err)
return
}
glog.Info(cm.ctx, "RabbitMQ 连接已初始化")
// 设置响应队列RAGFlow 响应消息)
if err = SetupResponseQueue(cm.ctx); err != nil {
glog.Fatalf(cm.ctx, "设置响应队列失败: %v", err)
return
}
// 设置延时落库队列(对话缓存兜底机制)
// 失败不影响主流程,只记录 Warning
if err = SetupDelayedFlushQueue(cm.ctx); err != nil {
glog.Warningf(cm.ctx, "设置延时落库队列失败: %v", err)
}
// 异步启动所有已注册的消费者
cm.startConsumers()
return
}
// startConsumers 启动所有消费者(内部方法)
//
// 实现:
// 1. 遍历已注册的消费者
// 2. 每个消费者在独立的 goroutine 中运行
// 3. 使用 WaitGroup 追踪所有消费者协程
func (cm *ConsumerManager) startConsumers() {
for _, c := range cm.consumers {
cm.wg.Add(1)
go func(consumer *ManagedConsumer) {
defer cm.wg.Done()
if err := consumer.Start(cm.ctx); err != nil {
glog.Errorf(cm.ctx, "%s启动失败: %v", consumer.Name, err)
}
}(c)
glog.Infof(cm.ctx, "%s已启动", c.Name)
}
}
// Stop 停止所有消费者(优雅退出)
//
// 执行流程:
// 1. 依次停止所有消费者(调用各自的 Stop 方法)
// 2. 等待所有消费者协程退出WaitGroup.Wait
// 3. 关闭 RabbitMQ 连接
//
// 使用场景:
// - 收到 SIGINT/SIGTERM 信号时
// - 程序正常退出时
// - defer mgr.Stop()
//
// 注意:
// - Stop 方法会阻塞直到所有消费者完全退出
// - 确保消费者能正确响应 Stop 信号
func (cm *ConsumerManager) Stop() {
// 依次停止所有消费者
for _, c := range cm.consumers {
c.Stop(cm.ctx)
glog.Infof(cm.ctx, "%s已停止", c.Name)
}
// 等待所有消费者协程退出
cm.wg.Wait()
// 关闭 RabbitMQ 连接
Close(cm.ctx)
glog.Info(cm.ctx, "所有消费者已停止RabbitMQ连接已关闭")
}