2025-12-04 17:39:31 +08:00
|
|
|
|
package ragflow
|
|
|
|
|
|
|
|
|
|
|
|
import (
	"context"
	"sync"
	"time"

	"gitee.com/red-future---jilin-g/common/redis"
	"github.com/gogf/gf/v2/errors/gerror"
	"github.com/gogf/gf/v2/os/glog"
	"github.com/gogf/gf/v2/os/grpool"
)
|
|
|
|
|
|
|
|
|
|
|
|
// WorkerPool RAGFlow 请求处理协程池
|
|
|
|
|
|
type WorkerPool struct {
|
|
|
|
|
|
pool *grpool.Pool
|
|
|
|
|
|
size int
|
|
|
|
|
|
}
|
|
|
|
|
|
|
2025-12-06 12:02:34 +08:00
|
|
|
|
// 单例模式相关变量
|
|
|
|
|
|
var (
|
|
|
|
|
|
workerPoolInstance *WorkerPool
|
|
|
|
|
|
workerPoolOnce sync.Once
|
|
|
|
|
|
)
|
|
|
|
|
|
|
|
|
|
|
|
// GetWorkerPoolWithSize 获取指定大小的协程池单例
|
|
|
|
|
|
// 使用 sync.Once 确保只创建一次,size 仅首次调用生效
|
|
|
|
|
|
func GetWorkerPoolWithSize(size int) *WorkerPool {
|
|
|
|
|
|
workerPoolOnce.Do(func() {
|
|
|
|
|
|
if size <= 0 {
|
|
|
|
|
|
size = 200 // 默认大小
|
|
|
|
|
|
}
|
|
|
|
|
|
workerPoolInstance = &WorkerPool{
|
|
|
|
|
|
pool: grpool.New(size),
|
|
|
|
|
|
size: size,
|
|
|
|
|
|
}
|
|
|
|
|
|
})
|
|
|
|
|
|
return workerPoolInstance
|
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
// GetWorkerPool 获取协程池单例(使用默认大小 200)
|
|
|
|
|
|
func GetWorkerPool() *WorkerPool {
|
|
|
|
|
|
return GetWorkerPoolWithSize(200)
|
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
// NewWorkerPool 创建协程池(兼容旧代码,内部使用单例)
|
2025-12-04 17:39:31 +08:00
|
|
|
|
// 参数:
|
2025-12-06 12:02:34 +08:00
|
|
|
|
// - size: 协程池大小,仅首次调用生效
|
2025-12-04 17:39:31 +08:00
|
|
|
|
//
|
|
|
|
|
|
// 返回:
|
2025-12-06 12:02:34 +08:00
|
|
|
|
// - *WorkerPool: 协程池单例实例
|
2025-12-04 17:39:31 +08:00
|
|
|
|
// - error: 创建失败时返回错误
|
|
|
|
|
|
func NewWorkerPool(size int) (*WorkerPool, error) {
|
|
|
|
|
|
if size <= 0 {
|
|
|
|
|
|
return nil, gerror.New("协程池大小必须大于0")
|
|
|
|
|
|
}
|
2025-12-06 12:02:34 +08:00
|
|
|
|
return GetWorkerPoolWithSize(size), nil
|
2025-12-04 17:39:31 +08:00
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
// Submit 提交任务到协程池
|
|
|
|
|
|
// 参数:
|
|
|
|
|
|
// - ctx: 上下文
|
|
|
|
|
|
// - task: 要执行的任务函数
|
|
|
|
|
|
//
|
|
|
|
|
|
// 返回:error 提交失败时返回错误
|
|
|
|
|
|
func (w *WorkerPool) Submit(ctx context.Context, task func(ctx context.Context)) error {
|
|
|
|
|
|
return w.pool.Add(ctx, func(ctx context.Context) {
|
|
|
|
|
|
defer func() {
|
|
|
|
|
|
if r := recover(); r != nil {
|
|
|
|
|
|
glog.Errorf(ctx, "协程池任务执行 panic: %v", r)
|
|
|
|
|
|
}
|
|
|
|
|
|
}()
|
|
|
|
|
|
|
|
|
|
|
|
task(ctx)
|
|
|
|
|
|
})
|
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
// Size 获取协程池大小
|
|
|
|
|
|
func (w *WorkerPool) Size() int {
|
|
|
|
|
|
return w.size
|
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
// Jobs 获取当前等待执行的任务数量
|
|
|
|
|
|
func (w *WorkerPool) Jobs() int {
|
|
|
|
|
|
return w.pool.Jobs()
|
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
// Close 关闭协程池
|
|
|
|
|
|
func (w *WorkerPool) Close() {
|
|
|
|
|
|
w.pool.Close()
|
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
// WorkerStats is a point-in-time snapshot of a WorkerPool's counters.
type WorkerStats struct {
	// PoolSize is the size the pool was created with.
	PoolSize int
	// Jobs is the job count reported by the pool when the snapshot was taken.
	Jobs int
}
|
|
|
|
|
|
|
|
|
|
|
|
// Stats 获取协程池统计信息
|
|
|
|
|
|
func (w *WorkerPool) Stats() WorkerStats {
|
|
|
|
|
|
return WorkerStats{
|
|
|
|
|
|
PoolSize: w.size,
|
|
|
|
|
|
Jobs: w.pool.Jobs(),
|
|
|
|
|
|
}
|
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
// PrintStats 打印协程池统计信息
|
|
|
|
|
|
func (w *WorkerPool) PrintStats(ctx context.Context) {
|
|
|
|
|
|
stats := w.Stats()
|
|
|
|
|
|
glog.Infof(ctx, "协程池统计 - 池大小: %d, 等待任务: %d", stats.PoolSize, stats.Jobs)
|
|
|
|
|
|
}
|
|
|
|
|
|
|
2025-12-05 11:44:07 +08:00
|
|
|
|
// QueueProcessor Stream 处理器,从 Redis Stream 中取出任务并提交到协程池
|
2025-12-04 17:39:31 +08:00
|
|
|
|
type QueueProcessor struct {
|
2025-12-05 11:44:07 +08:00
|
|
|
|
pool *WorkerPool
|
|
|
|
|
|
streamKey string // Stream 键名
|
|
|
|
|
|
groupName string // 消费者组名称
|
|
|
|
|
|
consumerName string // 消费者名称
|
|
|
|
|
|
timeout int64 // 阻塞超时时间(毫秒)
|
|
|
|
|
|
batchSize int64 // 每次读取的消息数量
|
|
|
|
|
|
stopChan chan struct{}
|
|
|
|
|
|
handleFunc func(ctx context.Context, message map[string]interface{}) error
|
2025-12-04 17:39:31 +08:00
|
|
|
|
}
|
|
|
|
|
|
|
2025-12-05 11:44:07 +08:00
|
|
|
|
// NewQueueProcessor 创建 Stream 处理器
|
2025-12-04 17:39:31 +08:00
|
|
|
|
// 参数:
|
|
|
|
|
|
// - pool: 协程池
|
2025-12-05 11:44:07 +08:00
|
|
|
|
// - streamKey: Redis Stream 键名
|
|
|
|
|
|
// - groupName: 消费者组名称
|
|
|
|
|
|
// - consumerName: 消费者名称(唯一标识)
|
|
|
|
|
|
// - timeout: 从 Stream 取消息的超时时间(毫秒)
|
|
|
|
|
|
// - batchSize: 每次读取的消息数量
|
2025-12-04 17:39:31 +08:00
|
|
|
|
// - handleFunc: 消息处理函数
|
2025-12-05 11:44:07 +08:00
|
|
|
|
func NewQueueProcessor(pool *WorkerPool, streamKey, groupName, consumerName string, timeout int64, batchSize int64, handleFunc func(ctx context.Context, message map[string]interface{}) error) *QueueProcessor {
|
2025-12-04 17:39:31 +08:00
|
|
|
|
return &QueueProcessor{
|
2025-12-05 11:44:07 +08:00
|
|
|
|
pool: pool,
|
|
|
|
|
|
streamKey: streamKey,
|
|
|
|
|
|
groupName: groupName,
|
|
|
|
|
|
consumerName: consumerName,
|
|
|
|
|
|
timeout: timeout,
|
|
|
|
|
|
batchSize: batchSize,
|
|
|
|
|
|
stopChan: make(chan struct{}),
|
|
|
|
|
|
handleFunc: handleFunc,
|
2025-12-04 17:39:31 +08:00
|
|
|
|
}
|
|
|
|
|
|
}
|
|
|
|
|
|
|
2025-12-05 11:44:07 +08:00
|
|
|
|
// Start 启动 Stream 处理器
|
|
|
|
|
|
// 会阻塞运行,持续从 Redis Stream 中取出消息并提交到协程池处理
|
2025-12-04 17:39:31 +08:00
|
|
|
|
func (q *QueueProcessor) Start(ctx context.Context) error {
|
2025-12-05 11:44:07 +08:00
|
|
|
|
glog.Infof(ctx, "Stream 处理器启动 - Stream: %s, 消费者组: %s, 消费者: %s, 超时: %dms",
|
|
|
|
|
|
q.streamKey, q.groupName, q.consumerName, q.timeout)
|
2025-12-04 17:39:31 +08:00
|
|
|
|
|
2025-12-06 18:04:29 +08:00
|
|
|
|
loopCount := 0
|
2025-12-04 17:39:31 +08:00
|
|
|
|
for {
|
|
|
|
|
|
select {
|
|
|
|
|
|
case <-q.stopChan:
|
2025-12-05 11:44:07 +08:00
|
|
|
|
glog.Info(ctx, "Stream 处理器收到停止信号")
|
2025-12-04 17:39:31 +08:00
|
|
|
|
return nil
|
|
|
|
|
|
default:
|
2025-12-06 18:04:29 +08:00
|
|
|
|
loopCount++
|
|
|
|
|
|
if loopCount%10 == 1 {
|
|
|
|
|
|
glog.Debugf(ctx, "[DEBUG] 第 %d 次循环,准备读取消息...", loopCount)
|
|
|
|
|
|
}
|
|
|
|
|
|
|
2025-12-05 11:44:07 +08:00
|
|
|
|
// 从 Redis Stream 中读取消息
|
|
|
|
|
|
messages, err := q.fetchMessages(ctx)
|
2025-12-04 17:39:31 +08:00
|
|
|
|
if err != nil {
|
2025-12-05 11:44:07 +08:00
|
|
|
|
glog.Errorf(ctx, "从 Stream 读取消息失败: %v", err)
|
2025-12-04 17:39:31 +08:00
|
|
|
|
continue
|
|
|
|
|
|
}
|
|
|
|
|
|
|
2025-12-05 11:44:07 +08:00
|
|
|
|
// 没有新消息,继续等待
|
|
|
|
|
|
if len(messages) == 0 {
|
2025-12-06 18:04:29 +08:00
|
|
|
|
if loopCount%10 == 1 {
|
|
|
|
|
|
glog.Debugf(ctx, "[DEBUG] 第 %d 次循环,无新消息", loopCount)
|
|
|
|
|
|
}
|
2025-12-04 17:39:31 +08:00
|
|
|
|
continue
|
|
|
|
|
|
}
|
|
|
|
|
|
|
2025-12-06 18:04:29 +08:00
|
|
|
|
glog.Infof(ctx, "[DEBUG] 收到 %d 条消息", len(messages))
|
|
|
|
|
|
|
2025-12-05 11:44:07 +08:00
|
|
|
|
// 处理每条消息
|
|
|
|
|
|
for _, msg := range messages {
|
2025-12-06 18:04:29 +08:00
|
|
|
|
glog.Infof(ctx, "[DEBUG] 处理消息 ID: %s, Values: %+v", msg.ID, msg.Values)
|
2025-12-05 11:44:07 +08:00
|
|
|
|
// 提交到协程池处理
|
|
|
|
|
|
if err := q.submitTask(ctx, msg); err != nil {
|
|
|
|
|
|
glog.Errorf(ctx, "提交任务到协程池失败: %v, 消息ID: %s", err, msg.ID)
|
|
|
|
|
|
}
|
2025-12-04 17:39:31 +08:00
|
|
|
|
}
|
|
|
|
|
|
}
|
|
|
|
|
|
}
|
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
// Stop 停止队列处理器
|
|
|
|
|
|
func (q *QueueProcessor) Stop() {
|
|
|
|
|
|
close(q.stopChan)
|
|
|
|
|
|
}
|
|
|
|
|
|
|
2025-12-05 11:44:07 +08:00
|
|
|
|
// fetchMessages 从 Redis Stream 中读取消息
|
|
|
|
|
|
func (q *QueueProcessor) fetchMessages(ctx context.Context) ([]redis.StreamMessage, error) {
|
|
|
|
|
|
// 从消费者组读取消息
|
|
|
|
|
|
return redis.ReadFromStream(ctx, q.streamKey, q.groupName, q.consumerName, q.batchSize, q.timeout)
|
2025-12-04 17:39:31 +08:00
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
// submitTask 将消息处理任务提交到协程池
|
2025-12-05 11:44:07 +08:00
|
|
|
|
func (q *QueueProcessor) submitTask(ctx context.Context, message redis.StreamMessage) error {
|
2025-12-04 17:39:31 +08:00
|
|
|
|
return q.pool.Submit(ctx, func(ctx context.Context) {
|
2025-12-05 11:44:07 +08:00
|
|
|
|
// 处理消息
|
|
|
|
|
|
if err := q.handleFunc(ctx, message.Values); err != nil {
|
|
|
|
|
|
glog.Errorf(ctx, "处理消息失败: %v, 消息ID: %s", err, message.ID)
|
|
|
|
|
|
return
|
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
// 处理成功后确认消息
|
|
|
|
|
|
if err := redis.AckMessage(ctx, q.streamKey, q.groupName, message.ID); err != nil {
|
|
|
|
|
|
glog.Errorf(ctx, "确认消息失败: %v, 消息ID: %s", err, message.ID)
|
|
|
|
|
|
} else {
|
|
|
|
|
|
glog.Debugf(ctx, "消息处理完成并已确认: %s", message.ID)
|
2025-12-04 17:39:31 +08:00
|
|
|
|
}
|
|
|
|
|
|
})
|
|
|
|
|
|
}
|