Files
common/ragflow/worker_pool.go

113 lines
3.7 KiB
Go
Raw Normal View History

2025-12-04 17:39:31 +08:00
package ragflow
import (
"context"
"strings"
"time"
2025-12-04 17:39:31 +08:00
"gitee.com/red-future---jilin-g/common/redis"
"github.com/gogf/gf/v2/os/glog"
)
// defaultBatchSize is the batch size used when NewQueueProcessor receives a
// non-positive batchSize. The batch size is both the number of messages read
// from Redis per call and the maximum number of messages handled concurrently.
const defaultBatchSize = 200

// QueueProcessor consumes a Redis Stream through a consumer group. It reads
// messages in batches and hands each one to handleFunc (which sends it to
// RAGFlow) on its own goroutine, bounding concurrency with a semaphore.
type QueueProcessor struct {
	streamKey    string        // Redis Stream key
	groupName    string        // consumer group name
	consumerName string        // consumer name within the group
	timeout      int64         // blocking read timeout, in milliseconds
	batchSize    int64         // messages requested per read; also the semaphore capacity (max concurrent handlers)
	stopChan     chan struct{} // closed by Stop to end Start's loop
	semaphore    chan struct{} // counting semaphore: one slot per message currently being handled

	// handleFunc processes the field values of a single stream message.
	handleFunc func(ctx context.Context, message map[string]interface{}) error
}

// NewQueueProcessor creates a stream processor.
//
// Parameters:
//   - streamKey, groupName, consumerName: the Redis Stream key, the consumer
//     group to read through, and this consumer's name within that group.
//   - timeout: blocking read timeout in milliseconds, passed through to the
//     stream read performed by Start.
//   - batchSize: messages per read and maximum concurrent handlers. Values
//     <= 0 are replaced by defaultBatchSize. Zero would create an unbuffered
//     semaphore that Start blocks on forever, and a negative capacity makes
//     make() panic.
//   - handleFunc: invoked once per message with its field values; must be
//     non-nil.
func NewQueueProcessor(streamKey, groupName, consumerName string, timeout, batchSize int64, handleFunc func(ctx context.Context, message map[string]interface{}) error) *QueueProcessor {
	if batchSize <= 0 {
		batchSize = defaultBatchSize
	}
	return &QueueProcessor{
		streamKey:    streamKey,
		groupName:    groupName,
		consumerName: consumerName,
		timeout:      timeout,
		batchSize:    batchSize,
		stopChan:     make(chan struct{}),
		semaphore:    make(chan struct{}, batchSize), // capacity = max concurrency
		handleFunc:   handleFunc,
	}
}
2025-12-05 11:44:07 +08:00
// Start 启动 Stream 处理器
// 削峰填谷:每次读取 batchSize 条消息,并发发送,发完立刻读下一批
2025-12-04 17:39:31 +08:00
func (q *QueueProcessor) Start(ctx context.Context) error {
glog.Infof(ctx, "Stream 处理器启动 - Stream: %s, 消费者组: %s, 消费者: %s, 批量大小: %d",
q.streamKey, q.groupName, q.consumerName, q.batchSize)
// 确保 Consumer Group 存在(重试直到成功)
for {
if err := redis.CreateConsumerGroup(ctx, q.streamKey, q.groupName); err != nil {
// BUSYGROUP 表示已存在,不是错误
if strings.Contains(err.Error(), "BUSYGROUP") {
glog.Debugf(ctx, "Consumer Group 已存在")
break
}
glog.Warningf(ctx, "创建 Consumer Group 失败: %v1秒后重试", err)
time.Sleep(time.Second)
continue
}
glog.Infof(ctx, "Consumer Group 创建成功")
break
}
2025-12-04 17:39:31 +08:00
for {
select {
case <-q.stopChan:
2025-12-05 11:44:07 +08:00
glog.Info(ctx, "Stream 处理器收到停止信号")
2025-12-04 17:39:31 +08:00
return nil
default:
// 1. 从 Redis Stream 读取一批消息
messages, err := redis.ReadFromStream(ctx, q.streamKey, q.groupName, q.consumerName, q.batchSize, q.timeout)
2025-12-04 17:39:31 +08:00
if err != nil {
2025-12-05 11:44:07 +08:00
glog.Errorf(ctx, "从 Stream 读取消息失败: %v", err)
2025-12-04 17:39:31 +08:00
continue
}
2025-12-05 11:44:07 +08:00
if len(messages) == 0 {
2025-12-04 17:39:31 +08:00
continue
}
glog.Debugf(ctx, "读取 %d 条消息,开始发送", len(messages))
// 2. 用信号量控制并发:获取信号量后发送,完成后释放
2025-12-05 11:44:07 +08:00
for _, msg := range messages {
// 获取信号量(阻塞直到有空位)
q.semaphore <- struct{}{}
go func(m redis.StreamMessage) {
defer func() { <-q.semaphore }() // 完成后释放信号量
q.processMessage(ctx, m)
}(msg)
2025-12-04 17:39:31 +08:00
}
// 3. 立刻读下一批(不等待,信号量自动控制并发数)
2025-12-04 17:39:31 +08:00
}
}
}
// processMessage 处理单条消息(异步执行)
func (q *QueueProcessor) processMessage(ctx context.Context, message redis.StreamMessage) {
// 调用处理函数发送到 RAGFlow
if err := q.handleFunc(ctx, message.Values); err != nil {
glog.Errorf(ctx, "消息处理失败: %v, 消息ID: %s", err, message.ID)
}
2025-12-04 17:39:31 +08:00
// 无论成功失败都 ACK避免重复消费
if err := redis.AckMessage(ctx, q.streamKey, q.groupName, message.ID); err != nil {
glog.Errorf(ctx, "确认消息失败: %v, 消息ID: %s", err, message.ID)
}
2025-12-04 17:39:31 +08:00
}
// Stop 停止队列处理器
func (q *QueueProcessor) Stop() {
close(q.stopChan)
2025-12-04 17:39:31 +08:00
}