package ragflow import ( "context" "runtime/debug" "strings" "sync" "time" "gitea.com/red-future/common/redis" "github.com/gogf/gf/v2/os/glog" "github.com/gogf/gf/v2/os/grpool" ) // 默认批量大小(每次从 Redis 读取并发送的消息数) const defaultBatchSize = 200 // QueueProcessor Stream 处理器,批量读取消息并发送到 RAGFlow type QueueProcessor struct { streamKey string // Stream 键名 groupName string // 消费者组名称 consumerName string // 消费者名称 timeout int64 // 阻塞超时时间(毫秒) batchSize int64 // 最大并发数(协程池大小) stopChan chan struct{} // 停止信号 pool *grpool.Pool // GoFrame协程池 handleFunc func(ctx context.Context, message map[string]interface{}) error processingMsgs sync.Map // 正在处理的消息ID(去重用) } // NewQueueProcessor 创建 Stream 处理器 func NewQueueProcessor(streamKey, groupName, consumerName string, timeout, batchSize int64, handleFunc func(ctx context.Context, message map[string]interface{}) error) *QueueProcessor { // 创建协程池:固定大小,避免频繁创建销毁goroutine pool := grpool.New(int(batchSize)) return &QueueProcessor{ streamKey: streamKey, groupName: groupName, consumerName: consumerName, timeout: timeout, batchSize: batchSize, stopChan: make(chan struct{}), pool: pool, // 使用GoFrame协程池 handleFunc: handleFunc, } } // Start 启动 Stream 处理器 // 削峰填谷:每次读取 batchSize 条消息,并发发送,发完立刻读下一批 func (q *QueueProcessor) Start(ctx context.Context) error { glog.Infof(ctx, "Stream 处理器启动 - Stream: %s, 消费者组: %s, 消费者: %s, 批量大小: %d", q.streamKey, q.groupName, q.consumerName, q.batchSize) // 确保 Consumer Group 存在(重试直到成功) for { if err := redis.CreateConsumerGroup(ctx, q.streamKey, q.groupName); err != nil { // BUSYGROUP 表示已存在,不是错误 if strings.Contains(err.Error(), "BUSYGROUP") { glog.Debugf(ctx, "Consumer Group 已存在") break } glog.Warningf(ctx, "创建 Consumer Group 失败: %v,1秒后重试", err) time.Sleep(time.Second) continue } glog.Infof(ctx, "Consumer Group 创建成功") break } for { select { case <-q.stopChan: glog.Info(ctx, "Stream 处理器收到停止信号") return nil default: // 1. 从 Redis Stream 读取一批消息 messages, err := redis.ReadFromStream(ctx, q.streamKey, q.groupName, q.consumerName, q.batchSize, q.timeout) if err != nil { glog.Errorf(ctx, "从 Stream 读取消息失败: %v", err) continue } if len(messages) == 0 { continue } glog.Infof(ctx, "✅ 从Stream读取到 %d 条消息,开始处理", len(messages)) // 2. 去重+立即ACK:对话场景优先实时性,失败不重试 for i, msg := range messages { m := msg // 捕获循环变量 msgIndex := i + 1 // 去重:如果消息正在处理,跳过 if _, exists := q.processingMsgs.LoadOrStore(m.ID, true); exists { glog.Debugf(ctx, "⏭️ 跳过正在处理的消息 - ID: %s", m.ID) continue } // 立即ACK:对话场景不需要重试,避免重复消费 if err := redis.AckMessage(ctx, q.streamKey, q.groupName, m.ID); err != nil { glog.Errorf(ctx, "确认消息失败: %v, 消息ID: %s", err, m.ID) } glog.Infof(ctx, "📨 准备处理第 %d/%d 条消息 - ID: %s", msgIndex, len(messages), m.ID) // 提交到协程池,池满时会阻塞等待空闲worker q.pool.Add(ctx, func(ctx context.Context) { defer q.processingMsgs.Delete(m.ID) // 处理完成后移除标记 q.processMessage(ctx, m) }) } // 3. 立刻读下一批(不等待,协程池自动控制并发数) } } } // processMessage 处理单条消息(异步执行) func (q *QueueProcessor) processMessage(ctx context.Context, message redis.StreamMessage) { // 捕获panic,防止协程崩溃 defer func() { if r := recover(); r != nil { glog.Errorf(ctx, "❌ PANIC: 消息处理发生panic - 消息ID: %s, panic内容: %v\n堆栈:\n%s", message.ID, r, debug.Stack()) } }() glog.Infof(ctx, "🔄 开始处理消息 - ID: %s", message.ID) // 打印实际字段名(调试用) var fieldNames []string for key := range message.Values { fieldNames = append(fieldNames, key) } glog.Infof(ctx, "📋 消息字段名列表: %v", fieldNames) glog.Infof(ctx, "📦 消息完整内容: %+v", message.Values) // 调用处理函数发送到 RAGFlow if err := q.handleFunc(ctx, message.Values); err != nil { glog.Errorf(ctx, "❌ 消息处理失败: %v, 消息ID: %s", err, message.ID) } else { glog.Infof(ctx, "✅ 消息处理成功 - ID: %s", message.ID) } // ACK已在读取后立即执行,此处无需重复ACK // 对话场景:失败直接丢弃,不重试(实时性优先) } // Stop 停止队列处理器 func (q *QueueProcessor) Stop() { close(q.stopChan) // 关闭协程池,等待所有任务完成 q.pool.Close() }