Files
customer-server/consumer/ragflow_processor.go
2026-03-14 10:02:49 +08:00

592 lines
21 KiB
Go
Raw Permalink Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
package consumer
import (
	"context"
	"errors"
	"strings"

	"gitea.com/red-future/common/db/mongo"
	"gitea.com/red-future/common/rabbitmq"
	"gitea.com/red-future/common/ragflow"
	"gitea.com/red-future/common/redis"
	"github.com/gogf/gf/v2/encoding/gjson"
	"github.com/gogf/gf/v2/frame/g"
	"github.com/gogf/gf/v2/os/gfile"
	"github.com/gogf/gf/v2/os/glog"
	"github.com/gogf/gf/v2/os/gtime"
	"github.com/gogf/gf/v2/util/gconv"
)
// processorConfig caches processor settings so they are read from configuration
// only once (in NewProcessor) instead of on every message.
type processorConfig struct {
	// Default RAGFlow chat ID, used when neither the message nor the
	// customer-service account yields a chat_id.
	chatId string

	// Response queue: answers are published here.
	responseExchange   string
	responseRoutingKey string

	// Follow-up queue (published with a delay).
	followUpExchange   string
	followUpRoutingKey string

	// Archive queue (published with a delay).
	archiveExchange   string
	archiveRoutingKey string

	// Mock mode has been removed; it is no longer used for load testing.
}
// Package-level state, assigned by NewProcessor and read while handling messages.
var (
	// cfg holds the processor settings read once from configuration.
	cfg *processorConfig

	// responsePublisher publishes answers to the response exchange (singleton).
	responsePublisher *rabbitmq.Publisher
	// followUpPublisher publishes delayed follow-up messages (singleton).
	followUpPublisher *rabbitmq.Publisher
	// archivePublisher publishes delayed archive messages (singleton).
	archivePublisher *rabbitmq.Publisher
)
// Processor consumes the message stream; it wraps a ragflow.QueueProcessor whose
// per-message handler is handleMessage (wired up in NewProcessor).
type Processor struct {
	processor *ragflow.QueueProcessor // underlying stream reader/dispatcher
}
// NewProcessor builds the stream consumer.
//
// It reads the stream.* settings for the queue reader, and fills the package-level
// cfg from the ragflow.*, rabbitmq.*, followUp.* and archive.* keys, with built-in
// defaults for the exchange and routing-key names. It then (re)creates the response,
// follow-up and archive publishers. Direction configuration is loaded elsewhere
// (the common/config package's init, per the original note).
//
// The returned error is currently always nil.
func NewProcessor(ctx context.Context) (*Processor, error) {
	conf := g.Cfg()
	str := func(key string, def ...interface{}) string {
		return conf.MustGet(ctx, key, def...).String()
	}
	num := func(key string, def interface{}) int64 {
		return conf.MustGet(ctx, key, def).Int64()
	}

	// Queue-reader settings.
	var (
		streamKey    = str("stream.streamKey")
		groupName    = str("stream.groupName")
		consumerName = str("stream.consumerName")
		batchSize    = num("stream.batchSize", 200)
		blockTimeout = num("stream.blockTimeout", 2000)
	)

	// Processor settings, kept in the package-level singleton.
	cfg = &processorConfig{
		chatId:             str("ragflow.chat_id"),
		responseExchange:   str("rabbitmq.responseExchange", "ragflow.response"),
		responseRoutingKey: str("rabbitmq.responseRoutingKey", "response"),
		followUpExchange:   str("followUp.exchange", "followup.delayed"),
		followUpRoutingKey: str("followUp.routingKey", "followup"),
		archiveExchange:    str("archive.exchange", "archive.delayed"),
		archiveRoutingKey:  str("archive.routingKey", "archive"),
	}

	responsePublisher = rabbitmq.NewPublisher(cfg.responseExchange, cfg.responseRoutingKey)
	glog.Infof(ctx, "响应发布器已初始化 - Exchange: %s, RoutingKey: %s", cfg.responseExchange, cfg.responseRoutingKey)

	followUpPublisher = rabbitmq.NewPublisher(cfg.followUpExchange, cfg.followUpRoutingKey)
	glog.Info(ctx, "追问发布器已初始化")

	archivePublisher = rabbitmq.NewPublisher(cfg.archiveExchange, cfg.archiveRoutingKey)
	glog.Info(ctx, "归档发布器已初始化")

	// Batch reads + concurrent dispatch to smooth out traffic peaks.
	queue := ragflow.NewQueueProcessor(streamKey, groupName, consumerName, blockTimeout, batchSize, handleMessage)
	return &Processor{processor: queue}, nil
}
// Start logs that consumption is beginning and hands control to the underlying
// queue processor, returning whatever error its Start returns.
func (p *Processor) Start(ctx context.Context) error {
	glog.Info(ctx, "开始消费消息...")
	qp := p.processor
	return qp.Start(ctx)
}
// Stop asks the underlying queue processor to stop consuming.
func (p *Processor) Stop() {
	qp := p.processor
	qp.Stop()
}
// getChatIdByAccountName looks up the chat_id configured for a customer-service
// account in the MongoDB "ragflow_config" collection.
//
// Only documents with isDeleted=false match. When tenantId is non-empty the lookup is
// scoped to it. The lookup tries tenantId as a string first and then, if it converts
// to a positive int, as an int, because some documents store tenantId numerically.
// It returns "" when accountName is empty, MongoDB is not initialised, or no
// document matches.
func getChatIdByAccountName(ctx context.Context, tenantId, accountName string) string {
	if accountName == "" {
		return ""
	}
	db := mongo.GetDB()
	if db == nil {
		glog.Error(ctx, "MongoDB未初始化")
		return ""
	}
	coll := db.Collection("ragflow_config")

	query := map[string]interface{}{
		"accountName": accountName,
		"isDeleted":   false,
	}
	if tenantId != "" {
		query["tenantId"] = tenantId
	}

	var doc struct {
		ChatId string `json:"chatId" bson:"chatId"`
	}
	lookup := func() error {
		return coll.FindOne(ctx, query).Decode(&doc)
	}

	err := lookup()
	if err != nil && tenantId != "" {
		// Retry with a numeric tenantId for documents that store it as an int.
		if numericTenant := gconv.Int(tenantId); numericTenant > 0 {
			query["tenantId"] = numericTenant
			err = lookup()
		}
	}
	if err != nil {
		glog.Warningf(ctx, "未找到客服账号对应的RAGFlow配置 - 账号: %s, tenantId: %s, err: %v", accountName, tenantId, err)
		return ""
	}

	glog.Infof(ctx, "使用客服账号对应的chat_id - 账号: %s, chat_id: %s", accountName, doc.ChatId)
	return doc.ChatId
}
// getChatIdByDirection 根据用户选择的咨询方向获取对应的chat_id从Consul读取
func getChatIdByDirection(ctx context.Context, userId, platform string) string {
// 从Redis获取用户状态
userState, err := redis.GetUserState(ctx, userId, platform)
if err != nil || userState.Direction == "" {
// 无方向或获取失败返回默认chat_id
if cfg != nil {
return cfg.chatId
}
return ""
}
// 直接使用accountName查询
chatId := ""
if chatId != "" {
glog.Infof(ctx, "使用咨询方向对应的chat_id - 用户: %s, 方向: %s, chat_id: %s", userId, userState.Direction, chatId)
return chatId
}
// 未找到匹配方向返回默认chat_id
glog.Warningf(ctx, "未找到方向对应的chat_id使用默认 - 用户: %s, 方向: %s", userId, userState.Direction)
if cfg != nil {
return cfg.chatId
}
return ""
}
// HandleMessageHTTP is the exported entry point for HTTP controllers. It runs the
// same handler the queue consumer uses (handleMessage) on a single message and
// returns its error unchanged.
func HandleMessageHTTP(ctx context.Context, message map[string]interface{}) error {
	err := handleMessage(ctx, message)
	return err
}
// handleMessage 处理单条消息
func handleMessage(ctx context.Context, message map[string]interface{}) (err error) {
// gconv.Map转换结构体时使用驼峰字段名而非json标签
userId := gconv.String(message["UserId"])
content := gconv.String(message["Content"])
messageId := gconv.String(message["MessageId"])
platform := gconv.String(message["Platform"])
tenantId := gconv.String(message["TenantId"])
accountId := gconv.String(message["AccountId"]) // 客服账号ID
accountName := gconv.String(message["AccountName"]) // 客服账号名称如cs_xhs_qixue
messageChatId := gconv.String(message["ChatId"]) // 消息中携带的chat_id从ragflow_config查询
if platform == "" {
platform = "xiaohongshu" // 默认平台
}
// 解析历史对话(由 customerservice 从 MongoDB 读取后携带)
var history []redis.HistoryMessage
if historyData := message["History"]; historyData != nil {
_ = gjson.New(historyData).Scan(&history)
}
glog.Infof(ctx, "处理消息 - 用户: %s, 客服账号: %s, ChatId: %s, 内容: %s, 历史轮数: %d",
userId, accountName, messageChatId, content, len(history))
var answer string
var sessionId string
startTime := gtime.Now()
// 调用RAGFlow处理消息
{
// 1. 获取chat_id优先级消息携带 > 客服账号查询 > 用户方向查询 > 默认)
var chatId string
if messageChatId != "" {
chatId = messageChatId
glog.Infof(ctx, "使用消息携带的chat_id: %s", chatId)
} else if accountName != "" {
chatId = getChatIdByAccountName(ctx, tenantId, accountName)
}
if chatId == "" {
chatId = getChatIdByDirection(ctx, userId, platform)
}
// 2. 检测chatId是否变更不再需要因为缓存key已包含chatId
// 不同chatId自动使用不同的session缓存 }
// 3. 获取或创建 Session使用正确的chat_id包含租户隔离
var isNewSession bool
sessionId, isNewSession, err = getOrCreateSession(ctx, tenantId, userId, chatId)
if err != nil {
// 检测Chat权限错误assistant/chat不属于当前API Key
if strings.Contains(err.Error(), "do not own the assistant") || strings.Contains(err.Error(), "don't own the chat") || strings.Contains(err.Error(), "doesn't exist") || strings.Contains(err.Error(), "not found") {
glog.Warningf(ctx, "创建Session时检测到Chat权限错误尝试自动重建 - 用户: %s, accountName: %s, chat_id: %s", userId, accountName, chatId)
// 调用重建逻辑
if accountName != "" {
if recreateErr := recreateChatIfNeeded(ctx, tenantId, accountName, platform); recreateErr != nil {
glog.Errorf(ctx, "自动重建Chat失败: %v", recreateErr)
return
}
// 重新查询chat_id
newChatId := getChatIdByAccountName(ctx, tenantId, accountName)
if newChatId == "" {
glog.Errorf(ctx, "重建Chat后仍无法获取chat_id")
return
}
chatId = newChatId
glog.Infof(ctx, "Chat重建成功新chat_id: %s", chatId)
// 清理旧session缓存
redis.DelSessionCache(ctx, tenantId, userId)
// 使用新chat_id重新创建session
sessionId, isNewSession, err = getOrCreateSession(ctx, tenantId, userId, chatId)
if err != nil {
glog.Errorf(ctx, "使用新Chat创建Session失败: %v", err)
return
}
} else {
glog.Errorf(ctx, "Chat权限错误但缺少accountName无法自动重建")
return
}
} else {
glog.Errorf(ctx, "获取 Session 失败: %v", err)
return
}
}
// 3. 调用 RAGFlow API
client := ragflow.GetGlobalClient()
if client == nil {
glog.Error(ctx, "RAGFlow 客户端未初始化")
return
}
// 如果是新 Session 且有历史对话,把历史拼接到问题中调用 RAGFlow
if isNewSession && len(history) > 0 {
var newSessionId string
newSessionId, answer, err = callWithHistory(ctx, client, userId, content, history, chatId)
if err != nil {
glog.Errorf(ctx, "带历史调用 RAGFlow 失败: %v", err)
return
}
// 更新 Session 缓存(包含租户隔离)
if newSessionId != "" {
redis.SetSessionCache(ctx, tenantId, userId, newSessionId)
sessionId = newSessionId
}
} else {
// 已有 Session 或无历史,使用普通 APIRAGFlow 内部维护上下文)
var res *ragflow.ChatCompletionRes
res, err = client.ChatCompletion(ctx, chatId, &ragflow.ChatCompletionReq{
Question: content,
SessionId: sessionId,
UserId: userId,
Stream: false,
})
if err != nil {
// 检测Chat不存在或权限错误RAGFlow中Chat被删除或不属于当前API Key
if strings.Contains(err.Error(), "doesn't exist") || strings.Contains(err.Error(), "not found") || strings.Contains(err.Error(), "404") || strings.Contains(err.Error(), "don't own the chat") {
glog.Warningf(ctx, "检测到Chat不存在尝试自动重建 - 用户: %s, accountName: %s, chat_id: %s", userId, accountName, chatId)
// 调用重建逻辑需要accountName
if accountName != "" {
if recreateErr := recreateChatIfNeeded(ctx, tenantId, accountName, platform); recreateErr != nil {
glog.Errorf(ctx, "自动重建Chat失败: %v", recreateErr)
return
}
// 重新查询chat_id
newChatId := getChatIdByAccountName(ctx, tenantId, accountName)
if newChatId == "" {
glog.Errorf(ctx, "重建Chat后仍无法获取chat_id")
return
}
chatId = newChatId
glog.Infof(ctx, "Chat重建成功新chat_id: %s", chatId)
// 清理旧session缓存
redis.DelSessionCache(ctx, tenantId, userId)
// 重新创建session并调用
sessionId, _, err = getOrCreateSession(ctx, tenantId, userId, chatId)
if err != nil {
glog.Errorf(ctx, "创建新Session失败: %v", err)
return
}
// 重试调用
res, err = client.ChatCompletion(ctx, chatId, &ragflow.ChatCompletionReq{
Question: content,
SessionId: sessionId,
UserId: userId,
Stream: false,
})
if err != nil {
glog.Errorf(ctx, "使用新Chat重试失败: %v", err)
return
}
} else {
glog.Errorf(ctx, "Chat不存在但缺少accountName无法自动重建")
return
}
// 检测session ownership错误session不属于当前chat_id
} else if strings.Contains(err.Error(), "don't own the session") {
glog.Warningf(ctx, "Session不属于当前chat_id清理缓存并重新创建 - 用户: %s, 旧session: %s, chat_id: %s", userId, sessionId, chatId)
// 清理缓存(包含租户隔离)
redis.DelSessionCache(ctx, tenantId, userId)
// 重新创建session并调用
sessionId, _, err = getOrCreateSession(ctx, tenantId, userId, chatId)
if err != nil {
glog.Errorf(ctx, "重新创建Session失败: %v", err)
return
}
// 重试调用
res, err = client.ChatCompletion(ctx, chatId, &ragflow.ChatCompletionReq{
Question: content,
SessionId: sessionId,
UserId: userId,
Stream: false,
})
if err != nil {
glog.Errorf(ctx, "重试调用RAGFlow失败: %v", err)
return
}
} else {
glog.Errorf(ctx, "调用 RAGFlow 失败: %v", err)
return
}
}
answer = res.Data.Answer
// 更新 Session 缓存(如果 RAGFlow 返回了新的 session_id包含租户隔离
if res.Data.SessionId != "" && res.Data.SessionId != sessionId {
redis.SetSessionCache(ctx, tenantId, userId, res.Data.SessionId)
sessionId = res.Data.SessionId
}
}
}
// 计算耗时并写入文件
endTime := gtime.Now()
elapsed := endTime.Sub(startTime)
// 截取回复前20字
answerPreview := answer
if len([]rune(answerPreview)) > 20 {
answerPreview = string([]rune(answerPreview)[:20]) + "..."
}
glog.Infof(ctx, "回复 - 用户: %s, 耗时: %s, 回复: %s", userId, elapsed, answerPreview)
// 日志格式:发送时间 | 响应时间 | 耗时 | 用户 | 问题 | 回复前20字
// 写入时间日志,失败时记录警告但不影响主流程
logContent := startTime.Format("Y-m-d H:i:s") + "\t" + endTime.Format("Y-m-d H:i:s") + "\t" + elapsed.String() + "\t" + userId + "\t" + content + "\t" + answerPreview + "\n"
if err := gfile.PutContentsAppend("timelog/ragflow_time.log", logContent); err != nil {
glog.Warningf(ctx, "写入时间日志失败: %v", err)
}
// 3. 更新用户最后活跃时间
redis.SetSessionLastActive(ctx, userId)
// 6. 原样写入 RabbitMQ 结果队列(透传 TenantId、AccountId、AccountName
responseMsg := &redis.ResponseStreamMessage{
UserId: userId,
Platform: platform,
TenantId: tenantId,
AccountId: accountId,
AccountName: accountName,
Question: content,
Content: answer,
SessionId: sessionId,
Timestamp: gtime.Now().Timestamp(),
MessageId: messageId,
}
// 读取请求中的ReplyQueue字段支持多实例独立队列
replyQueue := gconv.String(message["reply_queue"])
if replyQueue != "" && replyQueue != cfg.responseRoutingKey {
// 使用自定义响应队列(多实例场景)
// routing key使用队列名实现精确路由避免广播到所有实例
glog.Infof(ctx, "使用自定义响应队列: %s - 用户: %s", replyQueue, userId)
customPublisher := rabbitmq.NewPublisher(cfg.responseExchange, replyQueue)
if err = customPublisher.PublishWithRoutingKey(ctx, replyQueue, responseMsg); err != nil {
glog.Errorf(ctx, "写入自定义响应队列失败: %v", err)
return
}
glog.Infof(ctx, "响应已写入 RabbitMQ - 用户: %s, routingKey: %s (队列名)", userId, replyQueue)
} else {
// 使用默认响应队列(单实例或旧版本兼容)
// routing key使用tenantId.userId
routingKey := tenantId + "." + userId
if err = responsePublisher.PublishWithRoutingKey(ctx, routingKey, responseMsg); err != nil {
glog.Errorf(ctx, "写入 RabbitMQ 结果队列失败: %v", err)
return
}
glog.Infof(ctx, "响应已写入 RabbitMQ - 用户: %s, routingKey: %s (租户.用户)", userId, routingKey)
}
// 7. 发送追问消息到延时队列
sendFollowUpMessages(ctx, userId, platform)
// 8. 发送归档消息到延时队列60分钟后
sendArchiveMessage(ctx, userId, platform, sessionId, tenantId)
glog.Infof(ctx, "消息处理完成 - 用户: %s", userId)
return
}
// sendArchiveMessage publishes an archive message for the user's session through the
// archive publisher, delayed by redis.GetArchiveDelay(). A publish failure is only
// logged.
func sendArchiveMessage(ctx context.Context, userId, platform, sessionId, tenantId string) {
	payload := &redis.ArchiveMessage{
		UserId:    userId,
		Platform:  platform,
		SessionId: sessionId,
		TenantId:  tenantId,
		Timestamp: gtime.Now().Timestamp(),
	}
	delay := redis.GetArchiveDelay()
	err := archivePublisher.PublishDelayed(ctx, payload, delay)
	if err == nil {
		return
	}
	glog.Errorf(ctx, "发送归档消息失败: %v", err)
}
// callWithHistory calls RAGFlow with earlier conversation rounds as context. It is
// used when a brand-new session is opened for a user who already has history.
//
// history is read from MongoDB by customerservice and carried in the queue message.
// The rounds are prepended to the question as plain text. The chat's native
// completion API is still used, so its prompt and knowledge base apply. chatId is
// the chat resolved for this message (not necessarily the configured default).
//
// Injected history is limited to the maxHistoryRounds most recent rounds and to
// maxHistoryChars characters (runes) of question+answer text. When that budget runs
// out, the oldest rounds are the ones dropped. If no round fits, the question is
// sent without a history prefix.
//
// A fresh session (named "session_<userId>") is always created here. The function
// returns its id together with the answer. On a completion error, the session id is
// still returned along with the error.
func callWithHistory(ctx context.Context, client *ragflow.Client, userId, content string, history []redis.HistoryMessage, chatId string) (sessionId, answer string, err error) {
	// Limits on injected history, to stay within RAGFlow's input limit.
	const (
		maxHistoryRounds = 3    // most recent rounds considered
		maxHistoryChars  = 8000 // runes of question+answer text across injected rounds
	)

	question := content
	injected := 0
	if len(history) > 0 {
		recent := history
		if len(recent) > maxHistoryRounds {
			recent = recent[len(recent)-maxHistoryRounds:]
			glog.Infof(ctx, "历史对话超过%d轮截取最近%d轮 - 用户: %s", len(history), maxHistoryRounds, userId)
		}

		// Spend the budget newest-first so the most recent rounds survive. Count runes,
		// not bytes, so CJK text (3 bytes per character) is not cut ~3x too early.
		first := len(recent)
		used := 0
		for i := len(recent) - 1; i >= 0; i-- {
			size := len([]rune(recent[i].Question)) + len([]rune(recent[i].Answer))
			if used+size > maxHistoryChars {
				glog.Warningf(ctx, "历史对话超过%d字符停止追加 - 用户: %s", maxHistoryChars, userId)
				break
			}
			used += size
			first = i
		}

		if kept := recent[first:]; len(kept) > 0 {
			var b strings.Builder
			b.WriteString("[以下是之前的对话历史,请参考]\n")
			for _, h := range kept { // chronological order
				b.WriteString("用户: ")
				b.WriteString(h.Question)
				b.WriteString("\nAI: ")
				b.WriteString(h.Answer)
				b.WriteString("\n")
			}
			b.WriteString("\n[当前问题]\n")
			b.WriteString(content)
			question = b.String()
			injected = len(kept)
		}
	}
	glog.Infof(ctx, "注入 %d 轮历史对话上下文 - chat_id: %s", injected, chatId)

	// New session on the resolved chat.
	session, err := client.CreateSession(ctx, chatId, &ragflow.CreateSessionReq{
		Name:   "session_" + userId,
		UserId: userId,
	})
	if err != nil {
		glog.Errorf(ctx, "创建 Session 失败: %v", err)
		return "", "", err
	}
	sessionId = session.Id

	res, err := client.ChatCompletion(ctx, chatId, &ragflow.ChatCompletionReq{
		Question:  question,
		SessionId: sessionId,
		UserId:    userId,
		Stream:    false,
	})
	if err != nil {
		return sessionId, "", err
	}
	return sessionId, res.Data.Answer, nil
}
// sendFollowUpMessages publishes one delayed follow-up message per follow-up type,
// from redis.FollowUpType1 to redis.FollowUpType3 inclusive. Content and delay come
// from the redis package for each type, and all messages share one timestamp.
// A publish failure is logged and does not stop the remaining types.
func sendFollowUpMessages(ctx context.Context, userId, platform string) {
	sentAt := gtime.Now().Timestamp()
	for kind := redis.FollowUpType1; kind <= redis.FollowUpType3; kind++ {
		payload := &redis.FollowUpMessage{
			UserId:       userId,
			Platform:     platform,
			Content:      redis.GetFollowUpContent(kind),
			FollowUpType: kind,
			Timestamp:    sentAt,
		}
		err := followUpPublisher.PublishDelayed(ctx, payload, redis.GetFollowUpDelay(kind))
		if err == nil {
			continue
		}
		glog.Errorf(ctx, "发送追问消息失败 - 类型: %d, 错误: %v", kind, err)
	}
}
// getOrCreateSession returns the user's RAGFlow session id, scoped by tenant.
//
// If Redis holds a cached session id (looked up by tenantId + userId), it is reused
// and isNew is false. The lookup does not take chatId, so a cached session may belong
// to a different chat.
//
// Otherwise a new session named "session_<tenantId>_<userId>" is created on chatId
// and cached, and isNew is true so the caller can inject history context.
//
// An error is returned when the cache read fails, the global RAGFlow client is not
// initialised, or session creation fails.
func getOrCreateSession(ctx context.Context, tenantId, userId, chatId string) (sessionId string, isNew bool, err error) {
	if sessionId, err = redis.GetSessionCache(ctx, tenantId, userId); err != nil {
		return "", false, err
	}
	if sessionId != "" {
		glog.Infof(ctx, "使用缓存的session - 租户: %s, 用户: %s, chat_id: %s, session: %s", tenantId, userId, chatId, sessionId)
		return sessionId, false, nil
	}

	client := ragflow.GetGlobalClient()
	if client == nil {
		// Report this explicitly instead of handing callers an empty session id with
		// a nil error.
		return "", false, errors.New("ragflow client is not initialized")
	}

	glog.Infof(ctx, "创建新session - 租户: %s, 用户: %s, chat_id: %s", tenantId, userId, chatId)
	session, err := client.CreateSession(ctx, chatId, &ragflow.CreateSessionReq{
		Name:   "session_" + tenantId + "_" + userId,
		UserId: userId,
	})
	if err != nil {
		return "", false, err
	}

	sessionId = session.Id
	redis.SetSessionCache(ctx, tenantId, userId, sessionId)
	glog.Infof(ctx, "新session已创建并缓存 - 租户: %s, 用户: %s, session: %s", tenantId, userId, sessionId)
	return sessionId, true, nil
}