Files
common/ragflow/worker_pool.go

155 lines
5.1 KiB
Go
Raw Normal View History

2025-12-04 17:39:31 +08:00
package ragflow
import (
"context"
"runtime/debug"
"strings"
2026-01-21 10:20:32 +08:00
"sync"
"time"
2025-12-04 17:39:31 +08:00
2026-02-24 15:42:36 +08:00
"gitea.com/red-future/common/redis"
2025-12-04 17:39:31 +08:00
"github.com/gogf/gf/v2/os/glog"
2025-12-26 18:11:00 +08:00
"github.com/gogf/gf/v2/os/grpool"
2025-12-04 17:39:31 +08:00
)
// defaultBatchSize is the default number of messages read from the Redis
// Stream (and dispatched for sending) per batch.
const (
	defaultBatchSize = 200
)
2025-12-09 09:20:44 +08:00
// QueueProcessor Stream 处理器,批量读取消息并发送到 RAGFlow
2025-12-04 17:39:31 +08:00
type QueueProcessor struct {
2026-01-21 10:20:32 +08:00
streamKey string // Stream 键名
groupName string // 消费者组名称
consumerName string // 消费者名称
timeout int64 // 阻塞超时时间(毫秒)
batchSize int64 // 最大并发数(协程池大小)
stopChan chan struct{} // 停止信号
pool *grpool.Pool // GoFrame协程池
handleFunc func(ctx context.Context, message map[string]interface{}) error
processingMsgs sync.Map // 正在处理的消息ID去重用
2025-12-04 17:39:31 +08:00
}
2025-12-05 11:44:07 +08:00
// NewQueueProcessor 创建 Stream 处理器
func NewQueueProcessor(streamKey, groupName, consumerName string, timeout, batchSize int64, handleFunc func(ctx context.Context, message map[string]interface{}) error) *QueueProcessor {
2025-12-26 18:11:00 +08:00
// 创建协程池固定大小避免频繁创建销毁goroutine
pool := grpool.New(int(batchSize))
2025-12-04 17:39:31 +08:00
return &QueueProcessor{
2025-12-05 11:44:07 +08:00
streamKey: streamKey,
groupName: groupName,
consumerName: consumerName,
timeout: timeout,
batchSize: batchSize,
stopChan: make(chan struct{}),
2025-12-26 18:11:00 +08:00
pool: pool, // 使用GoFrame协程池
2025-12-05 11:44:07 +08:00
handleFunc: handleFunc,
2025-12-04 17:39:31 +08:00
}
}
2025-12-05 11:44:07 +08:00
// Start 启动 Stream 处理器
// 削峰填谷:每次读取 batchSize 条消息,并发发送,发完立刻读下一批
2025-12-04 17:39:31 +08:00
func (q *QueueProcessor) Start(ctx context.Context) error {
glog.Infof(ctx, "Stream 处理器启动 - Stream: %s, 消费者组: %s, 消费者: %s, 批量大小: %d",
q.streamKey, q.groupName, q.consumerName, q.batchSize)
// 确保 Consumer Group 存在(重试直到成功)
for {
if err := redis.CreateConsumerGroup(ctx, q.streamKey, q.groupName); err != nil {
// BUSYGROUP 表示已存在,不是错误
if strings.Contains(err.Error(), "BUSYGROUP") {
glog.Debugf(ctx, "Consumer Group 已存在")
break
}
glog.Warningf(ctx, "创建 Consumer Group 失败: %v1秒后重试", err)
time.Sleep(time.Second)
continue
}
glog.Infof(ctx, "Consumer Group 创建成功")
break
}
2025-12-04 17:39:31 +08:00
for {
select {
case <-q.stopChan:
2025-12-05 11:44:07 +08:00
glog.Info(ctx, "Stream 处理器收到停止信号")
2025-12-04 17:39:31 +08:00
return nil
default:
// 1. 从 Redis Stream 读取一批消息
messages, err := redis.ReadFromStream(ctx, q.streamKey, q.groupName, q.consumerName, q.batchSize, q.timeout)
2025-12-04 17:39:31 +08:00
if err != nil {
2025-12-05 11:44:07 +08:00
glog.Errorf(ctx, "从 Stream 读取消息失败: %v", err)
2025-12-04 17:39:31 +08:00
continue
}
2025-12-05 11:44:07 +08:00
if len(messages) == 0 {
2025-12-04 17:39:31 +08:00
continue
}
glog.Infof(ctx, "✅ 从Stream读取到 %d 条消息,开始处理", len(messages))
2026-01-21 10:20:32 +08:00
// 2. 去重+立即ACK对话场景优先实时性失败不重试
for i, msg := range messages {
2025-12-26 18:11:00 +08:00
m := msg // 捕获循环变量
msgIndex := i + 1
2026-01-21 10:20:32 +08:00
// 去重:如果消息正在处理,跳过
if _, exists := q.processingMsgs.LoadOrStore(m.ID, true); exists {
glog.Debugf(ctx, "⏭️ 跳过正在处理的消息 - ID: %s", m.ID)
continue
}
// 立即ACK对话场景不需要重试避免重复消费
if err := redis.AckMessage(ctx, q.streamKey, q.groupName, m.ID); err != nil {
glog.Errorf(ctx, "确认消息失败: %v, 消息ID: %s", err, m.ID)
}
glog.Infof(ctx, "📨 准备处理第 %d/%d 条消息 - ID: %s", msgIndex, len(messages), m.ID)
2025-12-26 18:11:00 +08:00
// 提交到协程池池满时会阻塞等待空闲worker
q.pool.Add(ctx, func(ctx context.Context) {
2026-01-21 10:20:32 +08:00
defer q.processingMsgs.Delete(m.ID) // 处理完成后移除标记
q.processMessage(ctx, m)
2025-12-26 18:11:00 +08:00
})
2025-12-04 17:39:31 +08:00
}
2025-12-26 18:11:00 +08:00
// 3. 立刻读下一批(不等待,协程池自动控制并发数)
2025-12-04 17:39:31 +08:00
}
}
}
// processMessage 处理单条消息(异步执行)
func (q *QueueProcessor) processMessage(ctx context.Context, message redis.StreamMessage) {
// 捕获panic防止协程崩溃
defer func() {
if r := recover(); r != nil {
glog.Errorf(ctx, "❌ PANIC: 消息处理发生panic - 消息ID: %s, panic内容: %v\n堆栈:\n%s",
message.ID, r, debug.Stack())
}
}()
glog.Infof(ctx, "🔄 开始处理消息 - ID: %s", message.ID)
// 打印实际字段名(调试用)
var fieldNames []string
for key := range message.Values {
fieldNames = append(fieldNames, key)
}
glog.Infof(ctx, "📋 消息字段名列表: %v", fieldNames)
glog.Infof(ctx, "📦 消息完整内容: %+v", message.Values)
// 调用处理函数发送到 RAGFlow
if err := q.handleFunc(ctx, message.Values); err != nil {
glog.Errorf(ctx, "❌ 消息处理失败: %v, 消息ID: %s", err, message.ID)
} else {
glog.Infof(ctx, "✅ 消息处理成功 - ID: %s", message.ID)
}
2025-12-04 17:39:31 +08:00
2026-01-21 10:20:32 +08:00
// ACK已在读取后立即执行此处无需重复ACK
// 对话场景:失败直接丢弃,不重试(实时性优先)
2025-12-04 17:39:31 +08:00
}
// Stop 停止队列处理器
func (q *QueueProcessor) Stop() {
close(q.stopChan)
2025-12-26 18:11:00 +08:00
// 关闭协程池,等待所有任务完成
q.pool.Close()
2025-12-04 17:39:31 +08:00
}