354 lines
12 KiB
Go
354 lines
12 KiB
Go
|
|
// Package service - 对话服务
|
|||
|
|
// 功能:处理RAGFlow响应、批量落库、卡片触发逻辑
|
|||
|
|
package service
|
|||
|
|
|
|||
|
|
import (
|
|||
|
|
"context"
|
|||
|
|
"customer-server/dao"
|
|||
|
|
"customer-server/model/entity"
|
|||
|
|
|
|||
|
|
"gitea.com/red-future/common/jaeger"
|
|||
|
|
"gitea.com/red-future/common/rabbitmq"
|
|||
|
|
"gitea.com/red-future/common/redis"
|
|||
|
|
"github.com/gogf/gf/v2/encoding/gjson"
|
|||
|
|
"github.com/gogf/gf/v2/frame/g"
|
|||
|
|
"github.com/gogf/gf/v2/os/glog"
|
|||
|
|
"github.com/gogf/gf/v2/os/gtime"
|
|||
|
|
)
|
|||
|
|
|
|||
|
|
// conversationService 对话服务(操作 conversation 表)
|
|||
|
|
type conversationService struct{}
|
|||
|
|
|
|||
|
|
// ConversationService 对话服务单例
|
|||
|
|
var ConversationService = new(conversationService)
|
|||
|
|
|
|||
|
|
// ============== RabbitMQ 消费者(RAGFlow 响应消息)==============
|
|||
|
|
|
|||
|
|
// ResponseConsumer RabbitMQ 响应消费者
|
|||
|
|
type ResponseConsumer struct {
|
|||
|
|
queueName string
|
|||
|
|
consumer *rabbitmq.Consumer
|
|||
|
|
}
|
|||
|
|
|
|||
|
|
// NewResponseConsumer 创建响应消费者
|
|||
|
|
// 参数: ctx - 上下文
|
|||
|
|
// 返回: ResponseConsumer - 响应消费者实例
|
|||
|
|
// 功能: 创建当前实例的唯一响应队列,支持多实例部署
|
|||
|
|
func NewResponseConsumer(ctx context.Context) *ResponseConsumer {
|
|||
|
|
// 从配置读取基础队列名(用于生成唯一实例队列名)
|
|||
|
|
baseQueue := GetConfigString(ctx, "rabbitmq.responseQueue")
|
|||
|
|
|
|||
|
|
// 生成当前实例的唯一队列名:ragflow.response.queue.{hostname}.{uuid8}
|
|||
|
|
// 支持多实例部署,每个实例有独立响应队列
|
|||
|
|
queueName := rabbitmq.GetInstanceQueueName(baseQueue)
|
|||
|
|
glog.Infof(ctx, "响应队列动态生成 - 实例队列: %s", queueName)
|
|||
|
|
|
|||
|
|
return &ResponseConsumer{
|
|||
|
|
queueName: queueName,
|
|||
|
|
}
|
|||
|
|
}
|
|||
|
|
|
|||
|
|
// Start 启动消费者
|
|||
|
|
// 参数: ctx - 上下文
|
|||
|
|
// 返回: err - 错误信息
|
|||
|
|
// 功能: 声明并绑定当前实例的响应队列,开始消费RAGFlow响应消息
|
|||
|
|
func (c *ResponseConsumer) Start(ctx context.Context) (err error) {
|
|||
|
|
glog.Infof(ctx, "RabbitMQ 响应消费者启动 - Queue: %s", c.queueName)
|
|||
|
|
|
|||
|
|
// 声明当前实例的动态响应队列
|
|||
|
|
if err = rabbitmq.DeclareQueue(ctx, &rabbitmq.QueueConfig{
|
|||
|
|
Name: c.queueName,
|
|||
|
|
Durable: true,
|
|||
|
|
}); err != nil {
|
|||
|
|
glog.Errorf(ctx, "声明动态响应队列失败: %v", err)
|
|||
|
|
return err
|
|||
|
|
}
|
|||
|
|
|
|||
|
|
// 绑定队列到 Exchange(使用队列名作为routing key,实现精确路由)
|
|||
|
|
// message发送消息时会使用队列名作为routing key
|
|||
|
|
if err = rabbitmq.BindQueue(ctx, &rabbitmq.BindingConfig{
|
|||
|
|
Queue: c.queueName,
|
|||
|
|
Exchange: "ragflow.response",
|
|||
|
|
RoutingKey: c.queueName, // 使用队列名,只接收发给自己的消息
|
|||
|
|
}); err != nil {
|
|||
|
|
glog.Errorf(ctx, "绑定动态响应队列失败: %v", err)
|
|||
|
|
return err
|
|||
|
|
}
|
|||
|
|
|
|||
|
|
glog.Infof(ctx, "动态响应队列已绑定: %s -> ragflow.response (routingKey=#)", c.queueName)
|
|||
|
|
|
|||
|
|
c.consumer = rabbitmq.NewConsumer(c.queueName, handleResponse)
|
|||
|
|
return c.consumer.Start(ctx)
|
|||
|
|
}
|
|||
|
|
|
|||
|
|
// Stop 停止消费者
|
|||
|
|
// 参数: ctx - 上下文
|
|||
|
|
// 功能: 停止消费RAGFlow响应消息
|
|||
|
|
func (c *ResponseConsumer) Stop(ctx context.Context) {
|
|||
|
|
if c.consumer != nil {
|
|||
|
|
c.consumer.Stop(ctx)
|
|||
|
|
}
|
|||
|
|
}
|
|||
|
|
|
|||
|
|
// ============== 卡片触发配置(待接入小红书卡片接口后修改)==============
|
|||
|
|
//
|
|||
|
|
// 【用户状态存储】
|
|||
|
|
// 使用 Redis Hash 存储用户会话状态(阶段+对话计数),统一5分钟TTL
|
|||
|
|
// Key: ragflow:user:state:{userId}_{platform}
|
|||
|
|
// Fields: stage(阶段)、count(对话计数)
|
|||
|
|
//
|
|||
|
|
// 【状态定义】
|
|||
|
|
// 状态0:走AI模型 | 状态1:打招呼 | 状态2:业务咨询 | 状态3:发卡片
|
|||
|
|
//
|
|||
|
|
// 【卡片触发逻辑】
|
|||
|
|
// 对话轮数>=配置值时,更新状态为3,并发送卡片消息
|
|||
|
|
// 配置项:config.yml中的card.triggerCount(默认5轮)
|
|||
|
|
//
|
|||
|
|
// 【待接入小红书卡片 API 后修改位置】
|
|||
|
|
// checkAndSendCard() 函数中的 cardMessage 变量
|
|||
|
|
|
|||
|
|
const (
|
|||
|
|
// ConversationFlushDelaySeconds 对话缓存延时落库时间(秒)
|
|||
|
|
ConversationFlushDelaySeconds = 600 // 10分钟
|
|||
|
|
)
|
|||
|
|
|
|||
|
|
// ============== 延时落库消费者 ==============
|
|||
|
|
|
|||
|
|
// DelayedFlushMessage 延时落库消息(按sessionId)
|
|||
|
|
type DelayedFlushMessage struct {
|
|||
|
|
SessionId string `json:"sessionId"`
|
|||
|
|
}
|
|||
|
|
|
|||
|
|
// DelayedFlushConsumer 延时落库消费者
|
|||
|
|
type DelayedFlushConsumer struct {
|
|||
|
|
queueName string
|
|||
|
|
consumer *rabbitmq.Consumer
|
|||
|
|
}
|
|||
|
|
|
|||
|
|
// NewDelayedFlushConsumer 创建延时落库消费者
|
|||
|
|
func NewDelayedFlushConsumer(ctx context.Context) *DelayedFlushConsumer {
|
|||
|
|
return &DelayedFlushConsumer{
|
|||
|
|
queueName: "conversation.flush.queue",
|
|||
|
|
}
|
|||
|
|
}
|
|||
|
|
|
|||
|
|
// Start 启动消费者
|
|||
|
|
func (c *DelayedFlushConsumer) Start(ctx context.Context) (err error) {
|
|||
|
|
glog.Infof(ctx, "延时落库消费者启动 - Queue: %s", c.queueName)
|
|||
|
|
c.consumer = rabbitmq.NewConsumer(c.queueName, handleDelayedFlush)
|
|||
|
|
return c.consumer.Start(ctx)
|
|||
|
|
}
|
|||
|
|
|
|||
|
|
// Stop 停止消费者
|
|||
|
|
func (c *DelayedFlushConsumer) Stop(ctx context.Context) {
|
|||
|
|
if c.consumer != nil {
|
|||
|
|
c.consumer.Stop(ctx)
|
|||
|
|
}
|
|||
|
|
}
|
|||
|
|
|
|||
|
|
// handleDelayedFlush 处理延时落库消息
|
|||
|
|
func handleDelayedFlush(ctx context.Context, body []byte) error {
|
|||
|
|
var msg DelayedFlushMessage
|
|||
|
|
if err := gjson.DecodeTo(body, &msg); err != nil {
|
|||
|
|
glog.Errorf(ctx, "解析延时落库消息失败: %v", err)
|
|||
|
|
return err
|
|||
|
|
}
|
|||
|
|
|
|||
|
|
glog.Infof(ctx, "收到延时落库消息 - SessionId: %s", msg.SessionId)
|
|||
|
|
|
|||
|
|
// 检查是否有未落库的缓存
|
|||
|
|
count, err := redis.GetCachedConversationCount(ctx, msg.SessionId)
|
|||
|
|
if err != nil {
|
|||
|
|
glog.Errorf(ctx, "获取缓存数量失败: %v", err)
|
|||
|
|
return err
|
|||
|
|
}
|
|||
|
|
|
|||
|
|
if count == 0 {
|
|||
|
|
glog.Debugf(ctx, "无需落库(缓存为空或已落库)- SessionId: %s", msg.SessionId)
|
|||
|
|
return nil
|
|||
|
|
}
|
|||
|
|
|
|||
|
|
// 执行落库
|
|||
|
|
if err = flushConversationCache(ctx, msg.SessionId); err != nil {
|
|||
|
|
glog.Errorf(ctx, "延时落库失败: %v", err)
|
|||
|
|
return err
|
|||
|
|
}
|
|||
|
|
|
|||
|
|
glog.Infof(ctx, "延时落库完成 - SessionId: %s", msg.SessionId)
|
|||
|
|
return nil
|
|||
|
|
}
|
|||
|
|
|
|||
|
|
// 延时落库发布器(单例)
|
|||
|
|
var delayedFlushPublisher *rabbitmq.Publisher
|
|||
|
|
|
|||
|
|
// getDelayedFlushPublisher 获取延时落库发布器
|
|||
|
|
func getDelayedFlushPublisher() *rabbitmq.Publisher {
|
|||
|
|
if delayedFlushPublisher == nil {
|
|||
|
|
delayedFlushPublisher = rabbitmq.NewPublisher("conversation.flush.delayed", "flush")
|
|||
|
|
}
|
|||
|
|
return delayedFlushPublisher
|
|||
|
|
}
|
|||
|
|
|
|||
|
|
// sendDelayedFlushMessage 发送延时落库消息
|
|||
|
|
func sendDelayedFlushMessage(ctx context.Context, sessionId string) error {
|
|||
|
|
msg := &DelayedFlushMessage{
|
|||
|
|
SessionId: sessionId,
|
|||
|
|
}
|
|||
|
|
return getDelayedFlushPublisher().PublishDelayed(ctx, msg, ConversationFlushDelaySeconds)
|
|||
|
|
}
|
|||
|
|
|
|||
|
|
// handleResponse 处理 RabbitMQ 消息(幂等)
|
|||
|
|
// 落库逻辑:前5句缓存到Redis,第5句时批量落库MongoDB,超过5句不落库
|
|||
|
|
func handleResponse(ctx context.Context, body []byte) error {
|
|||
|
|
ctx, span := jaeger.NewSpan(ctx, "consumer.response")
|
|||
|
|
defer span.End()
|
|||
|
|
|
|||
|
|
glog.Infof(ctx, ">>> handleResponse 被调用,消息长度: %d", len(body))
|
|||
|
|
|
|||
|
|
// 解析消息到结构体
|
|||
|
|
var msg redis.ResponseStreamMessage
|
|||
|
|
if err := gjson.DecodeTo(body, &msg); err != nil {
|
|||
|
|
jaeger.RecordError(ctx, err, "解析响应消息失败")
|
|||
|
|
return err
|
|||
|
|
}
|
|||
|
|
|
|||
|
|
glog.Infof(ctx, "收到 RAGFlow 响应 - 用户: %s, MessageId: %s", msg.UserId, msg.MessageId)
|
|||
|
|
|
|||
|
|
// 1. 获取当前对话轮数
|
|||
|
|
state, err := redis.GetUserState(ctx, msg.UserId, msg.Platform)
|
|||
|
|
if err != nil {
|
|||
|
|
jaeger.RecordError(ctx, err, "获取用户状态失败")
|
|||
|
|
}
|
|||
|
|
count := state.Count
|
|||
|
|
|
|||
|
|
// 2. 根据轮数决定落库策略
|
|||
|
|
cardTriggerCount := g.Cfg().MustGet(ctx, "card.triggerCount", 5).Int64()
|
|||
|
|
if count <= cardTriggerCount {
|
|||
|
|
// 前N句:缓存到Redis(按sessionId)
|
|||
|
|
msgTime := gtime.NewFromTimeStamp(msg.Timestamp).Time
|
|||
|
|
conversation := &entity.Conversation{
|
|||
|
|
UserId: msg.UserId,
|
|||
|
|
Platform: msg.Platform,
|
|||
|
|
SessionId: msg.SessionId,
|
|||
|
|
Question: msg.Question,
|
|||
|
|
Answer: msg.Content,
|
|||
|
|
MessageId: msg.MessageId,
|
|||
|
|
MsgTime: &msgTime, // 取地址赋值给指针类型
|
|||
|
|
}
|
|||
|
|
conversation.TenantId = msg.TenantId
|
|||
|
|
|
|||
|
|
// 序列化后缓存(使用sessionId作为key)
|
|||
|
|
data, _ := gjson.Encode(conversation)
|
|||
|
|
if cacheErr := redis.CacheConversation(ctx, msg.SessionId, data); cacheErr != nil {
|
|||
|
|
jaeger.RecordError(ctx, cacheErr, "缓存对话记录失败")
|
|||
|
|
} else {
|
|||
|
|
glog.Debugf(ctx, "对话已缓存到 Redis - SessionId: %s, 第 %d 轮", msg.SessionId, count)
|
|||
|
|
}
|
|||
|
|
|
|||
|
|
// 第1句时:发送10分钟延时落库消息(兜底)
|
|||
|
|
if count == 1 {
|
|||
|
|
if delayErr := sendDelayedFlushMessage(ctx, msg.SessionId); delayErr != nil {
|
|||
|
|
glog.Warningf(ctx, "发送延时落库消息失败: %v", delayErr)
|
|||
|
|
}
|
|||
|
|
}
|
|||
|
|
|
|||
|
|
// 第N句时:立即批量落库
|
|||
|
|
if count == cardTriggerCount {
|
|||
|
|
if flushErr := flushConversationCache(ctx, msg.SessionId); flushErr != nil {
|
|||
|
|
jaeger.RecordError(ctx, flushErr, "批量落库失败")
|
|||
|
|
}
|
|||
|
|
}
|
|||
|
|
} else {
|
|||
|
|
// 超过N句:不落库(已发卡片)
|
|||
|
|
glog.Debugf(ctx, "第 %d 轮(>%d),跳过落库 - 用户: %s", count, cardTriggerCount, msg.UserId)
|
|||
|
|
}
|
|||
|
|
|
|||
|
|
// 3. 推送给 WebSocket 用户(无论是否落库都推送)
|
|||
|
|
glog.Infof(ctx, "准备推送 WebSocket - 用户: %s_%s, 内容长度: %d", msg.UserId, msg.Platform, len(msg.Content))
|
|||
|
|
if err = WebSocket.PushRAGFlowResponse(ctx, msg.TenantId, msg.UserId, msg.Platform, msg.Content); err != nil {
|
|||
|
|
jaeger.RecordError(ctx, err, "推送 WebSocket 失败")
|
|||
|
|
} else {
|
|||
|
|
glog.Infof(ctx, "WebSocket 推送成功 - 用户: %s_%s", msg.UserId, msg.Platform)
|
|||
|
|
}
|
|||
|
|
|
|||
|
|
return nil
|
|||
|
|
}
|
|||
|
|
|
|||
|
|
// flushConversationCache 将Redis缓存的对话批量落库到MongoDB
|
|||
|
|
func flushConversationCache(ctx context.Context, sessionId string) error {
|
|||
|
|
// 获取缓存的对话列表
|
|||
|
|
cached, err := redis.GetCachedConversations(ctx, sessionId)
|
|||
|
|
if err != nil {
|
|||
|
|
return err
|
|||
|
|
}
|
|||
|
|
if len(cached) == 0 {
|
|||
|
|
return nil
|
|||
|
|
}
|
|||
|
|
|
|||
|
|
// 反序列化
|
|||
|
|
list := make([]*entity.Conversation, 0, len(cached))
|
|||
|
|
for _, data := range cached {
|
|||
|
|
var conv entity.Conversation
|
|||
|
|
if decErr := gjson.DecodeTo([]byte(data), &conv); decErr != nil {
|
|||
|
|
glog.Warningf(ctx, "反序列化对话失败: %v", decErr)
|
|||
|
|
continue
|
|||
|
|
}
|
|||
|
|
list = append(list, &conv)
|
|||
|
|
}
|
|||
|
|
|
|||
|
|
// 批量插入MongoDB
|
|||
|
|
if len(list) > 0 {
|
|||
|
|
if insertErr := dao.Conversation.BatchInsert(ctx, list); insertErr != nil {
|
|||
|
|
return insertErr
|
|||
|
|
}
|
|||
|
|
glog.Infof(ctx, "批量落库成功 - SessionId: %s, 共 %d 条", sessionId, len(list))
|
|||
|
|
}
|
|||
|
|
return nil
|
|||
|
|
}
|
|||
|
|
|
|||
|
|
// checkCardBeforeProcess 检查对话轮数,达到阈值时发卡片
|
|||
|
|
// 返回 handled=true 表示已处理(发送卡片),调用方应跳过后续话术处理
|
|||
|
|
func checkCardBeforeProcess(ctx context.Context, tenantId, userId, platform string) (handled bool, err error) {
|
|||
|
|
// 获取用户当前状态
|
|||
|
|
state, err := redis.GetUserState(ctx, userId, platform)
|
|||
|
|
if err != nil {
|
|||
|
|
return
|
|||
|
|
}
|
|||
|
|
|
|||
|
|
// 状态5(未选择方向)时不计数,等用户选择方向后再开始计数
|
|||
|
|
if state.Stage == 5 {
|
|||
|
|
glog.Debugf(ctx, "用户 %s_%s 处于状态5(未选择方向),跳过计数", userId, platform)
|
|||
|
|
return false, nil
|
|||
|
|
}
|
|||
|
|
|
|||
|
|
// 增加对话计数
|
|||
|
|
count, err := redis.IncrUserCount(ctx, userId, platform)
|
|||
|
|
if err != nil {
|
|||
|
|
return
|
|||
|
|
}
|
|||
|
|
|
|||
|
|
glog.Infof(ctx, "用户 %s_%s 当前对话轮数: %d", userId, platform, count)
|
|||
|
|
|
|||
|
|
// 对话>=配置轮数,发卡片并跳过话术(从配置值开始就发卡片,不再调用AI)
|
|||
|
|
cardTriggerCount := g.Cfg().MustGet(ctx, "card.triggerCount", 5).Int64()
|
|||
|
|
if count >= cardTriggerCount {
|
|||
|
|
glog.Infof(ctx, "用户 %s_%s 对话第 %d 轮(>=%d),触发发送卡片", userId, platform, count, cardTriggerCount)
|
|||
|
|
|
|||
|
|
// 更新用户状态为3(发卡片状态)
|
|||
|
|
if updateErr := redis.SetUserStage(ctx, userId, platform, 3); updateErr != nil {
|
|||
|
|
jaeger.RecordError(ctx, updateErr, "更新用户状态为3失败")
|
|||
|
|
}
|
|||
|
|
|
|||
|
|
cardMessage := "请加一下卡片的联系方式,进行更专业的咨询" // TODO: 替换为实际卡片发送逻辑
|
|||
|
|
if pushErr := WebSocket.PushRAGFlowResponse(ctx, tenantId, userId, platform, cardMessage); pushErr != nil {
|
|||
|
|
jaeger.RecordError(ctx, pushErr, "推送卡片消息失败")
|
|||
|
|
glog.Errorf(ctx, "推送卡片失败 - 用户: %s_%s, 错误: %v", userId, platform, pushErr)
|
|||
|
|
err = pushErr
|
|||
|
|
return
|
|||
|
|
}
|
|||
|
|
glog.Infof(ctx, "卡片消息已推送 - 用户: %s_%s", userId, platform)
|
|||
|
|
handled = true
|
|||
|
|
}
|
|||
|
|
return
|
|||
|
|
}
|