服务器准备
This commit is contained in:
171
rabbitmq/consumer_manager.go
Normal file
171
rabbitmq/consumer_manager.go
Normal file
@@ -0,0 +1,171 @@
|
||||
// Package rabbitmq 提供 RabbitMQ 消费者管理功能
|
||||
//
|
||||
// 本文件实现消费者统一管理,简化业务层的启动逻辑
|
||||
package rabbitmq
|
||||
|
||||
import (
|
||||
"context"
|
||||
"sync"
|
||||
|
||||
"github.com/gogf/gf/v2/frame/g"
|
||||
"github.com/gogf/gf/v2/os/glog"
|
||||
)
|
||||
|
||||
// ManagedConsumer 托管消费者(包含启动和停止函数)
|
||||
type ManagedConsumer struct {
|
||||
Name string // 消费者名称
|
||||
Start func(ctx context.Context) error // 启动函数
|
||||
Stop func(ctx context.Context) // 停止函数
|
||||
}
|
||||
|
||||
// ConsumerManager RabbitMQ 消费者管理器
|
||||
//
|
||||
// 职责:
|
||||
// 1. 统一管理所有 RabbitMQ 消费者的生命周期
|
||||
// 2. 初始化 RabbitMQ 连接和队列
|
||||
// 3. 启动/停止所有消费者
|
||||
// 4. 协调消费者的优雅退出
|
||||
//
|
||||
// 使用示例:
|
||||
//
|
||||
// mgr := rabbitmq.NewConsumerManager(ctx)
|
||||
// mgr.Register("响应消费者", responseConsumer.Start, responseConsumer.Stop)
|
||||
// mgr.Init()
|
||||
// defer mgr.Stop()
|
||||
type ConsumerManager struct {
|
||||
ctx context.Context // 全局上下文
|
||||
consumers []*ManagedConsumer // 消费者列表
|
||||
wg sync.WaitGroup // 等待所有消费者协程退出
|
||||
}
|
||||
|
||||
// NewConsumerManager 创建消费者管理器
|
||||
//
|
||||
// 参数:
|
||||
//
|
||||
// ctx: 上下文
|
||||
//
|
||||
// 返回:
|
||||
//
|
||||
// *ConsumerManager: 消费者管理器实例
|
||||
func NewConsumerManager(ctx context.Context) *ConsumerManager {
|
||||
return &ConsumerManager{
|
||||
ctx: ctx,
|
||||
consumers: make([]*ManagedConsumer, 0),
|
||||
}
|
||||
}
|
||||
|
||||
// Register 注册消费者
|
||||
//
|
||||
// 参数:
|
||||
//
|
||||
// name: 消费者名称(用于日志)
|
||||
// startFunc: 启动函数
|
||||
// stopFunc: 停止函数
|
||||
//
|
||||
// 使用示例:
|
||||
//
|
||||
// consumer := service.NewResponseConsumer(ctx)
|
||||
// mgr.Register("响应消费者", consumer.Start, consumer.Stop)
|
||||
func (cm *ConsumerManager) Register(name string, startFunc func(ctx context.Context) error, stopFunc func(ctx context.Context)) {
|
||||
cm.consumers = append(cm.consumers, &ManagedConsumer{
|
||||
Name: name,
|
||||
Start: startFunc,
|
||||
Stop: stopFunc,
|
||||
})
|
||||
}
|
||||
|
||||
// Init 初始化并启动所有消费者
|
||||
//
|
||||
// 执行流程:
|
||||
// 1. 检查 RabbitMQ 配置(未配置则跳过)
|
||||
// 2. 初始化 RabbitMQ 连接
|
||||
// 3. 声明并绑定队列(响应队列、延时落库队列)
|
||||
// 4. 异步启动所有已注册的消费者
|
||||
//
|
||||
// 返回:
|
||||
//
|
||||
// err: 错误信息,成功返回 nil
|
||||
//
|
||||
// 注意:
|
||||
// - 如果 RabbitMQ 未配置,不会报错,只是跳过初始化
|
||||
// - 响应队列初始化失败会导致 Fatal 退出
|
||||
// - 延时落库队列失败只会 Warning,不影响主流程
|
||||
func (cm *ConsumerManager) Init() (err error) {
|
||||
// 检查配置文件中是否配置了 RabbitMQ
|
||||
if g.Cfg().MustGet(cm.ctx, "rabbitmq").IsEmpty() {
|
||||
glog.Info(cm.ctx, "RabbitMQ未配置,跳过消费者初始化")
|
||||
return
|
||||
}
|
||||
|
||||
// 初始化 RabbitMQ 连接(从 config.yml 读取配置)
|
||||
if err = InitFromConfig(cm.ctx); err != nil {
|
||||
glog.Fatalf(cm.ctx, "初始化 RabbitMQ 失败: %v", err)
|
||||
return
|
||||
}
|
||||
glog.Info(cm.ctx, "RabbitMQ 连接已初始化")
|
||||
|
||||
// 设置响应队列(RAGFlow 响应消息)
|
||||
if err = SetupResponseQueue(cm.ctx); err != nil {
|
||||
glog.Fatalf(cm.ctx, "设置响应队列失败: %v", err)
|
||||
return
|
||||
}
|
||||
|
||||
// 设置延时落库队列(对话缓存兜底机制)
|
||||
// 失败不影响主流程,只记录 Warning
|
||||
if err = SetupDelayedFlushQueue(cm.ctx); err != nil {
|
||||
glog.Warningf(cm.ctx, "设置延时落库队列失败: %v", err)
|
||||
}
|
||||
|
||||
// 异步启动所有已注册的消费者
|
||||
cm.startConsumers()
|
||||
return
|
||||
}
|
||||
|
||||
// startConsumers 启动所有消费者(内部方法)
|
||||
//
|
||||
// 实现:
|
||||
// 1. 遍历已注册的消费者
|
||||
// 2. 每个消费者在独立的 goroutine 中运行
|
||||
// 3. 使用 WaitGroup 追踪所有消费者协程
|
||||
func (cm *ConsumerManager) startConsumers() {
|
||||
for _, c := range cm.consumers {
|
||||
cm.wg.Add(1)
|
||||
go func(consumer *ManagedConsumer) {
|
||||
defer cm.wg.Done()
|
||||
if err := consumer.Start(cm.ctx); err != nil {
|
||||
glog.Errorf(cm.ctx, "%s启动失败: %v", consumer.Name, err)
|
||||
}
|
||||
}(c)
|
||||
glog.Infof(cm.ctx, "%s已启动", c.Name)
|
||||
}
|
||||
}
|
||||
|
||||
// Stop 停止所有消费者(优雅退出)
|
||||
//
|
||||
// 执行流程:
|
||||
// 1. 依次停止所有消费者(调用各自的 Stop 方法)
|
||||
// 2. 等待所有消费者协程退出(WaitGroup.Wait)
|
||||
// 3. 关闭 RabbitMQ 连接
|
||||
//
|
||||
// 使用场景:
|
||||
// - 收到 SIGINT/SIGTERM 信号时
|
||||
// - 程序正常退出时
|
||||
// - defer mgr.Stop()
|
||||
//
|
||||
// 注意:
|
||||
// - Stop 方法会阻塞直到所有消费者完全退出
|
||||
// - 确保消费者能正确响应 Stop 信号
|
||||
func (cm *ConsumerManager) Stop() {
|
||||
// 依次停止所有消费者
|
||||
for _, c := range cm.consumers {
|
||||
c.Stop(cm.ctx)
|
||||
glog.Infof(cm.ctx, "%s已停止", c.Name)
|
||||
}
|
||||
|
||||
// 等待所有消费者协程退出
|
||||
cm.wg.Wait()
|
||||
|
||||
// 关闭 RabbitMQ 连接
|
||||
Close(cm.ctx)
|
||||
glog.Info(cm.ctx, "所有消费者已停止,RabbitMQ连接已关闭")
|
||||
}
|
||||
Reference in New Issue
Block a user