Files
customer-server/service/conversation_service.go

354 lines
12 KiB
Go
Raw Permalink Normal View History

2026-03-14 10:02:49 +08:00
// Package service - 对话服务
// 功能处理RAGFlow响应、批量落库、卡片触发逻辑
package service
import (
"context"
"customer-server/dao"
"customer-server/model/entity"
"gitea.com/red-future/common/jaeger"
"gitea.com/red-future/common/rabbitmq"
"gitea.com/red-future/common/redis"
"github.com/gogf/gf/v2/encoding/gjson"
"github.com/gogf/gf/v2/frame/g"
"github.com/gogf/gf/v2/os/glog"
"github.com/gogf/gf/v2/os/gtime"
)
// conversationService 对话服务(操作 conversation 表)
type conversationService struct{}
// ConversationService 对话服务单例
var ConversationService = new(conversationService)
// ============== RabbitMQ 消费者RAGFlow 响应消息)==============
// ResponseConsumer RabbitMQ 响应消费者
type ResponseConsumer struct {
queueName string
consumer *rabbitmq.Consumer
}
// NewResponseConsumer 创建响应消费者
// 参数: ctx - 上下文
// 返回: ResponseConsumer - 响应消费者实例
// 功能: 创建当前实例的唯一响应队列,支持多实例部署
func NewResponseConsumer(ctx context.Context) *ResponseConsumer {
// 从配置读取基础队列名(用于生成唯一实例队列名)
baseQueue := GetConfigString(ctx, "rabbitmq.responseQueue")
// 生成当前实例的唯一队列名ragflow.response.queue.{hostname}.{uuid8}
// 支持多实例部署,每个实例有独立响应队列
queueName := rabbitmq.GetInstanceQueueName(baseQueue)
glog.Infof(ctx, "响应队列动态生成 - 实例队列: %s", queueName)
return &ResponseConsumer{
queueName: queueName,
}
}
// Start 启动消费者
// 参数: ctx - 上下文
// 返回: err - 错误信息
// 功能: 声明并绑定当前实例的响应队列开始消费RAGFlow响应消息
func (c *ResponseConsumer) Start(ctx context.Context) (err error) {
glog.Infof(ctx, "RabbitMQ 响应消费者启动 - Queue: %s", c.queueName)
// 声明当前实例的动态响应队列
if err = rabbitmq.DeclareQueue(ctx, &rabbitmq.QueueConfig{
Name: c.queueName,
Durable: true,
}); err != nil {
glog.Errorf(ctx, "声明动态响应队列失败: %v", err)
return err
}
// 绑定队列到 Exchange使用队列名作为routing key实现精确路由
// message发送消息时会使用队列名作为routing key
if err = rabbitmq.BindQueue(ctx, &rabbitmq.BindingConfig{
Queue: c.queueName,
Exchange: "ragflow.response",
RoutingKey: c.queueName, // 使用队列名,只接收发给自己的消息
}); err != nil {
glog.Errorf(ctx, "绑定动态响应队列失败: %v", err)
return err
}
glog.Infof(ctx, "动态响应队列已绑定: %s -> ragflow.response (routingKey=#)", c.queueName)
c.consumer = rabbitmq.NewConsumer(c.queueName, handleResponse)
return c.consumer.Start(ctx)
}
// Stop 停止消费者
// 参数: ctx - 上下文
// 功能: 停止消费RAGFlow响应消息
func (c *ResponseConsumer) Stop(ctx context.Context) {
if c.consumer != nil {
c.consumer.Stop(ctx)
}
}
// ============== 卡片触发配置(待接入小红书卡片接口后修改)==============
//
// 【用户状态存储】
// 使用 Redis Hash 存储用户会话状态(阶段+对话计数统一5分钟TTL
// Key: ragflow:user:state:{userId}_{platform}
// Fields: stage阶段、count对话计数
//
// 【状态定义】
// 状态0走AI模型 | 状态1打招呼 | 状态2业务咨询 | 状态3发卡片
//
// 【卡片触发逻辑】
// 对话轮数>=配置值时更新状态为3并发送卡片消息
// 配置项config.yml中的card.triggerCount默认5轮
//
// 【待接入小红书卡片 API 后修改位置】
// checkAndSendCard() 函数中的 cardMessage 变量
const (
// ConversationFlushDelaySeconds 对话缓存延时落库时间(秒)
ConversationFlushDelaySeconds = 600 // 10分钟
)
// ============== 延时落库消费者 ==============
// DelayedFlushMessage 延时落库消息按sessionId
type DelayedFlushMessage struct {
SessionId string `json:"sessionId"`
}
// DelayedFlushConsumer 延时落库消费者
type DelayedFlushConsumer struct {
queueName string
consumer *rabbitmq.Consumer
}
// NewDelayedFlushConsumer 创建延时落库消费者
func NewDelayedFlushConsumer(ctx context.Context) *DelayedFlushConsumer {
return &DelayedFlushConsumer{
queueName: "conversation.flush.queue",
}
}
// Start 启动消费者
func (c *DelayedFlushConsumer) Start(ctx context.Context) (err error) {
glog.Infof(ctx, "延时落库消费者启动 - Queue: %s", c.queueName)
c.consumer = rabbitmq.NewConsumer(c.queueName, handleDelayedFlush)
return c.consumer.Start(ctx)
}
// Stop 停止消费者
func (c *DelayedFlushConsumer) Stop(ctx context.Context) {
if c.consumer != nil {
c.consumer.Stop(ctx)
}
}
// handleDelayedFlush 处理延时落库消息
func handleDelayedFlush(ctx context.Context, body []byte) error {
var msg DelayedFlushMessage
if err := gjson.DecodeTo(body, &msg); err != nil {
glog.Errorf(ctx, "解析延时落库消息失败: %v", err)
return err
}
glog.Infof(ctx, "收到延时落库消息 - SessionId: %s", msg.SessionId)
// 检查是否有未落库的缓存
count, err := redis.GetCachedConversationCount(ctx, msg.SessionId)
if err != nil {
glog.Errorf(ctx, "获取缓存数量失败: %v", err)
return err
}
if count == 0 {
glog.Debugf(ctx, "无需落库(缓存为空或已落库)- SessionId: %s", msg.SessionId)
return nil
}
// 执行落库
if err = flushConversationCache(ctx, msg.SessionId); err != nil {
glog.Errorf(ctx, "延时落库失败: %v", err)
return err
}
glog.Infof(ctx, "延时落库完成 - SessionId: %s", msg.SessionId)
return nil
}
// 延时落库发布器(单例)
var delayedFlushPublisher *rabbitmq.Publisher
// getDelayedFlushPublisher 获取延时落库发布器
func getDelayedFlushPublisher() *rabbitmq.Publisher {
if delayedFlushPublisher == nil {
delayedFlushPublisher = rabbitmq.NewPublisher("conversation.flush.delayed", "flush")
}
return delayedFlushPublisher
}
// sendDelayedFlushMessage 发送延时落库消息
func sendDelayedFlushMessage(ctx context.Context, sessionId string) error {
msg := &DelayedFlushMessage{
SessionId: sessionId,
}
return getDelayedFlushPublisher().PublishDelayed(ctx, msg, ConversationFlushDelaySeconds)
}
// handleResponse 处理 RabbitMQ 消息(幂等)
// 落库逻辑前5句缓存到Redis第5句时批量落库MongoDB超过5句不落库
func handleResponse(ctx context.Context, body []byte) error {
ctx, span := jaeger.NewSpan(ctx, "consumer.response")
defer span.End()
glog.Infof(ctx, ">>> handleResponse 被调用,消息长度: %d", len(body))
// 解析消息到结构体
var msg redis.ResponseStreamMessage
if err := gjson.DecodeTo(body, &msg); err != nil {
jaeger.RecordError(ctx, err, "解析响应消息失败")
return err
}
glog.Infof(ctx, "收到 RAGFlow 响应 - 用户: %s, MessageId: %s", msg.UserId, msg.MessageId)
// 1. 获取当前对话轮数
state, err := redis.GetUserState(ctx, msg.UserId, msg.Platform)
if err != nil {
jaeger.RecordError(ctx, err, "获取用户状态失败")
}
count := state.Count
// 2. 根据轮数决定落库策略
cardTriggerCount := g.Cfg().MustGet(ctx, "card.triggerCount", 5).Int64()
if count <= cardTriggerCount {
// 前N句缓存到Redis按sessionId
msgTime := gtime.NewFromTimeStamp(msg.Timestamp).Time
conversation := &entity.Conversation{
UserId: msg.UserId,
Platform: msg.Platform,
SessionId: msg.SessionId,
Question: msg.Question,
Answer: msg.Content,
MessageId: msg.MessageId,
MsgTime: &msgTime, // 取地址赋值给指针类型
}
conversation.TenantId = msg.TenantId
// 序列化后缓存使用sessionId作为key
data, _ := gjson.Encode(conversation)
if cacheErr := redis.CacheConversation(ctx, msg.SessionId, data); cacheErr != nil {
jaeger.RecordError(ctx, cacheErr, "缓存对话记录失败")
} else {
glog.Debugf(ctx, "对话已缓存到 Redis - SessionId: %s, 第 %d 轮", msg.SessionId, count)
}
// 第1句时发送10分钟延时落库消息兜底
if count == 1 {
if delayErr := sendDelayedFlushMessage(ctx, msg.SessionId); delayErr != nil {
glog.Warningf(ctx, "发送延时落库消息失败: %v", delayErr)
}
}
// 第N句时立即批量落库
if count == cardTriggerCount {
if flushErr := flushConversationCache(ctx, msg.SessionId); flushErr != nil {
jaeger.RecordError(ctx, flushErr, "批量落库失败")
}
}
} else {
// 超过N句不落库已发卡片
glog.Debugf(ctx, "第 %d 轮(>%d跳过落库 - 用户: %s", count, cardTriggerCount, msg.UserId)
}
// 3. 推送给 WebSocket 用户(无论是否落库都推送)
glog.Infof(ctx, "准备推送 WebSocket - 用户: %s_%s, 内容长度: %d", msg.UserId, msg.Platform, len(msg.Content))
if err = WebSocket.PushRAGFlowResponse(ctx, msg.TenantId, msg.UserId, msg.Platform, msg.Content); err != nil {
jaeger.RecordError(ctx, err, "推送 WebSocket 失败")
} else {
glog.Infof(ctx, "WebSocket 推送成功 - 用户: %s_%s", msg.UserId, msg.Platform)
}
return nil
}
// flushConversationCache 将Redis缓存的对话批量落库到MongoDB
func flushConversationCache(ctx context.Context, sessionId string) error {
// 获取缓存的对话列表
cached, err := redis.GetCachedConversations(ctx, sessionId)
if err != nil {
return err
}
if len(cached) == 0 {
return nil
}
// 反序列化
list := make([]*entity.Conversation, 0, len(cached))
for _, data := range cached {
var conv entity.Conversation
if decErr := gjson.DecodeTo([]byte(data), &conv); decErr != nil {
glog.Warningf(ctx, "反序列化对话失败: %v", decErr)
continue
}
list = append(list, &conv)
}
// 批量插入MongoDB
if len(list) > 0 {
if insertErr := dao.Conversation.BatchInsert(ctx, list); insertErr != nil {
return insertErr
}
glog.Infof(ctx, "批量落库成功 - SessionId: %s, 共 %d 条", sessionId, len(list))
}
return nil
}
// checkCardBeforeProcess 检查对话轮数,达到阈值时发卡片
// 返回 handled=true 表示已处理(发送卡片),调用方应跳过后续话术处理
func checkCardBeforeProcess(ctx context.Context, tenantId, userId, platform string) (handled bool, err error) {
// 获取用户当前状态
state, err := redis.GetUserState(ctx, userId, platform)
if err != nil {
return
}
// 状态5未选择方向时不计数等用户选择方向后再开始计数
if state.Stage == 5 {
glog.Debugf(ctx, "用户 %s_%s 处于状态5未选择方向跳过计数", userId, platform)
return false, nil
}
// 增加对话计数
count, err := redis.IncrUserCount(ctx, userId, platform)
if err != nil {
return
}
glog.Infof(ctx, "用户 %s_%s 当前对话轮数: %d", userId, platform, count)
// 对话>=配置轮数发卡片并跳过话术从配置值开始就发卡片不再调用AI
cardTriggerCount := g.Cfg().MustGet(ctx, "card.triggerCount", 5).Int64()
if count >= cardTriggerCount {
glog.Infof(ctx, "用户 %s_%s 对话第 %d 轮(>=%d触发发送卡片", userId, platform, count, cardTriggerCount)
// 更新用户状态为3发卡片状态
if updateErr := redis.SetUserStage(ctx, userId, platform, 3); updateErr != nil {
jaeger.RecordError(ctx, updateErr, "更新用户状态为3失败")
}
cardMessage := "请加一下卡片的联系方式,进行更专业的咨询" // TODO: 替换为实际卡片发送逻辑
if pushErr := WebSocket.PushRAGFlowResponse(ctx, tenantId, userId, platform, cardMessage); pushErr != nil {
jaeger.RecordError(ctx, pushErr, "推送卡片消息失败")
glog.Errorf(ctx, "推送卡片失败 - 用户: %s_%s, 错误: %v", userId, platform, pushErr)
err = pushErr
return
}
glog.Infof(ctx, "卡片消息已推送 - 用户: %s_%s", userId, platform)
handled = true
}
return
}