176 lines
4.8 KiB
Go
176 lines
4.8 KiB
Go
// Package rabbitmq 提供 RabbitMQ 消费者管理功能
|
||
//
|
||
// 本文件实现消费者统一管理,简化业务层的启动逻辑
|
||
package rabbitmq
|
||
|
||
import (
|
||
"context"
|
||
"sync"
|
||
|
||
"github.com/gogf/gf/v2/frame/g"
|
||
"github.com/gogf/gf/v2/os/glog"
|
||
)
|
||
|
||
// ManagedConsumer 托管消费者(包含启动和停止函数)
|
||
type ManagedConsumer struct {
|
||
Name string // 消费者名称
|
||
Start func(ctx context.Context) error // 启动函数
|
||
Stop func(ctx context.Context) // 停止函数
|
||
}
|
||
|
||
// ConsumerManager RabbitMQ 消费者管理器
|
||
//
|
||
// 职责:
|
||
// 1. 统一管理所有 RabbitMQ 消费者的生命周期
|
||
// 2. 初始化 RabbitMQ 连接和队列
|
||
// 3. 启动/停止所有消费者
|
||
// 4. 协调消费者的优雅退出
|
||
//
|
||
// 使用示例:
|
||
//
|
||
// mgr := rabbitmq.NewConsumerManager(ctx)
|
||
// mgr.Register("响应消费者", responseConsumer.Start, responseConsumer.Stop)
|
||
// mgr.Init()
|
||
// defer mgr.Stop()
|
||
type ConsumerManager struct {
|
||
ctx context.Context // 全局上下文
|
||
consumers []*ManagedConsumer // 消费者列表
|
||
wg sync.WaitGroup // 等待所有消费者协程退出
|
||
}
|
||
|
||
// NewConsumerManager 创建消费者管理器
|
||
//
|
||
// 参数:
|
||
//
|
||
// ctx: 上下文
|
||
//
|
||
// 返回:
|
||
//
|
||
// *ConsumerManager: 消费者管理器实例
|
||
func NewConsumerManager(ctx context.Context) *ConsumerManager {
|
||
return &ConsumerManager{
|
||
ctx: ctx,
|
||
consumers: make([]*ManagedConsumer, 0),
|
||
}
|
||
}
|
||
|
||
// Register 注册消费者
|
||
//
|
||
// 参数:
|
||
//
|
||
// name: 消费者名称(用于日志)
|
||
// startFunc: 启动函数
|
||
// stopFunc: 停止函数
|
||
//
|
||
// 使用示例:
|
||
//
|
||
// consumer := service.NewResponseConsumer(ctx)
|
||
// mgr.Register("响应消费者", consumer.Start, consumer.Stop)
|
||
func (cm *ConsumerManager) Register(name string, startFunc func(ctx context.Context) error, stopFunc func(ctx context.Context)) {
|
||
cm.consumers = append(cm.consumers, &ManagedConsumer{
|
||
Name: name,
|
||
Start: startFunc,
|
||
Stop: stopFunc,
|
||
})
|
||
}
|
||
|
||
// Init 初始化并启动所有消费者
|
||
//
|
||
// 执行流程:
|
||
// 1. 检查 RabbitMQ 配置(未配置则跳过)
|
||
// 2. 初始化 RabbitMQ 连接
|
||
// 3. 声明并绑定队列(响应队列、延时落库队列)
|
||
// 4. 异步启动所有已注册的消费者
|
||
//
|
||
// 返回:
|
||
//
|
||
// err: 错误信息,成功返回 nil
|
||
//
|
||
// 注意:
|
||
// - 如果 RabbitMQ 未配置,不会报错,只是跳过初始化
|
||
// - 响应队列初始化失败会导致 Fatal 退出
|
||
// - 延时落库队列失败只会 Warning,不影响主流程
|
||
func (cm *ConsumerManager) Init() (err error) {
|
||
// 检查配置文件中是否配置了 RabbitMQ
|
||
if g.Cfg().MustGet(cm.ctx, "rabbitmq").IsEmpty() {
|
||
glog.Info(cm.ctx, "RabbitMQ未配置,跳过消费者初始化")
|
||
return
|
||
}
|
||
|
||
// 初始化 RabbitMQ 连接(从 config.yml 读取配置)
|
||
if err = InitFromConfig(cm.ctx); err != nil {
|
||
glog.Fatalf(cm.ctx, "初始化 RabbitMQ 失败: %v", err)
|
||
return
|
||
}
|
||
glog.Info(cm.ctx, "RabbitMQ 连接已初始化")
|
||
|
||
// 声明响应Exchange(队列由各消费者自己声明和绑定)
|
||
if err = DeclareExchange(cm.ctx, &ExchangeConfig{
|
||
Name: "ragflow.response",
|
||
Type: "topic",
|
||
Durable: true,
|
||
}); err != nil {
|
||
glog.Fatalf(cm.ctx, "声明响应Exchange失败: %v", err)
|
||
return
|
||
}
|
||
|
||
// 设置延时落库队列(对话缓存兜底机制)
|
||
// 失败不影响主流程,只记录 Warning
|
||
if err = SetupDelayedFlushQueue(cm.ctx); err != nil {
|
||
glog.Warningf(cm.ctx, "设置延时落库队列失败: %v", err)
|
||
}
|
||
|
||
// 异步启动所有已注册的消费者
|
||
cm.startConsumers()
|
||
return
|
||
}
|
||
|
||
// startConsumers 启动所有消费者(内部方法)
|
||
//
|
||
// 实现:
|
||
// 1. 遍历已注册的消费者
|
||
// 2. 每个消费者在独立的 goroutine 中运行
|
||
// 3. 使用 WaitGroup 追踪所有消费者协程
|
||
func (cm *ConsumerManager) startConsumers() {
|
||
for _, c := range cm.consumers {
|
||
cm.wg.Add(1)
|
||
go func(consumer *ManagedConsumer) {
|
||
defer cm.wg.Done()
|
||
if err := consumer.Start(cm.ctx); err != nil {
|
||
glog.Errorf(cm.ctx, "%s启动失败: %v", consumer.Name, err)
|
||
}
|
||
}(c)
|
||
glog.Infof(cm.ctx, "%s已启动", c.Name)
|
||
}
|
||
}
|
||
|
||
// Stop 停止所有消费者(优雅退出)
|
||
//
|
||
// 执行流程:
|
||
// 1. 依次停止所有消费者(调用各自的 Stop 方法)
|
||
// 2. 等待所有消费者协程退出(WaitGroup.Wait)
|
||
// 3. 关闭 RabbitMQ 连接
|
||
//
|
||
// 使用场景:
|
||
// - 收到 SIGINT/SIGTERM 信号时
|
||
// - 程序正常退出时
|
||
// - defer mgr.Stop()
|
||
//
|
||
// 注意:
|
||
// - Stop 方法会阻塞直到所有消费者完全退出
|
||
// - 确保消费者能正确响应 Stop 信号
|
||
func (cm *ConsumerManager) Stop() {
|
||
// 依次停止所有消费者
|
||
for _, c := range cm.consumers {
|
||
c.Stop(cm.ctx)
|
||
glog.Infof(cm.ctx, "%s已停止", c.Name)
|
||
}
|
||
|
||
// 等待所有消费者协程退出
|
||
cm.wg.Wait()
|
||
|
||
// 关闭 RabbitMQ 连接
|
||
Close(cm.ctx)
|
||
glog.Info(cm.ctx, "所有消费者已停止,RabbitMQ连接已关闭")
|
||
}
|