Files
customer-server/service/speechcraft_service.go
2026-03-14 10:02:49 +08:00

654 lines
24 KiB
Go
Raw Permalink Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
// Package service - 话术服务
// 功能:话术的增删改查、绑定/解绑客服账号、同步到RAGFlow、重试消费者
package service
import (
"context"
"customer-server/dao"
"customer-server/model/dto"
"customer-server/model/entity"
"customer-server/util"
"fmt"
"gitea.com/red-future/common/db/mongo"
"gitea.com/red-future/common/jaeger"
"gitea.com/red-future/common/rabbitmq"
"gitea.com/red-future/common/ragflow"
"gitea.com/red-future/common/redis"
"gitea.com/red-future/common/utils"
"github.com/gogf/gf/v2/errors/gerror"
"github.com/gogf/gf/v2/frame/g"
"github.com/gogf/gf/v2/os/glog"
"github.com/gogf/gf/v2/os/grpool"
"github.com/gogf/gf/v2/os/gtime"
"github.com/gogf/gf/v2/text/gstr"
"github.com/gogf/gf/v2/util/gconv"
"go.mongodb.org/mongo-driver/v2/bson"
)
var (
Speechcraft = new(speechcraft)
speechcraftGrpool = grpool.New(50) // 文档解析协程池最大50并发
)
type speechcraft struct{}
// Add 添加话术
// 参数: ctx - 上下文req - 添加话术请求(包含标题、内容、方向、标签等)
// 返回: res - 添加成功后的话术ID和RAGFlow文档IDerr - 错误信息
// 功能: 创建话术记录并自动上传到RAGFlow租户知识库支持去重检查
func (s *speechcraft) Add(ctx context.Context, req *dto.AddSpeechcraftReq) (res *dto.AddSpeechcraftRes, err error) {
// 去重检查同一租户下tag唯一
if req.Tag != "" {
existing, err := dao.Speechcraft.FindByTag(ctx, req.Tag)
if err != nil {
return nil, gerror.Wrap(err, "检查话术重复失败")
}
if existing != nil {
return nil, gerror.Newf("话术tag已存在tag=%s, id=%s", req.Tag, existing.Id.Hex())
}
}
// 先从token或请求获取租户信息在Insert之前
var tenantId string
if req.TenantId != nil {
tenantId = gconv.String(req.TenantId)
g.Log().Debugf(ctx, "使用请求中的TenantId: %v", req.TenantId)
} else {
user, err := util.GetTenantInfo(ctx)
if err != nil {
return nil, gerror.Wrap(err, "获取租户信息失败")
}
tenantId = gconv.String(user.TenantId)
if tenantId == "" {
return nil, gerror.New("租户ID为空")
}
}
// 查询租户知识库ID
datasetId, err := dao.RAGFlowConfig.FindDatasetIdByTenant(ctx, tenantId)
if err != nil || datasetId == "" {
return nil, gerror.Newf("租户知识库不存在,请先创建客服账号: tenant_id=%s", tenantId)
}
data := &entity.Speechcraft{}
if err = utils.Struct(req, data); err != nil {
return
}
// 设置基础字段
now := gtime.Now().Time
data.CreatedAt = &now // 取地址赋值给指针类型
data.UpdatedAt = &now // 取地址赋值给指针类型
data.IsDeleted = false
// 统一使用string类型存储tenantId到MongoDB
data.TenantId = tenantId
// 使用DAO封装的Insert方法
id, err := dao.Speechcraft.Insert(ctx, data)
if err != nil {
return nil, gerror.Wrap(err, "插入话术失败")
}
data.Id = &id // 取地址赋值给指针类型
// 同步上传到RAGFlow
ragflowClient := ragflow.GetGlobalClient()
if ragflowClient == nil {
// 回滚:删除刚插入的话术
dao.MongoDAO.Delete(ctx, bson.M{"_id": data.Id}, entity.SpeechcraftCollection)
return nil, gerror.New("RAGFlow客户端未初始化请检查配置")
}
g.Log().Infof(ctx, "准备上传话术到RAGFlow: speechcraft_id=%s, dataset_id=%s, direction=%s, tag=%s",
data.Id.Hex(), datasetId, data.Direction, data.Tag)
filename := fmt.Sprintf("%s_%s.txt", data.Direction, data.Tag)
documentId, err := ragflowClient.UploadDocumentFromText(ctx, datasetId, data.Content, filename)
if err != nil {
// 回滚:删除刚插入的话术
dao.MongoDAO.Delete(ctx, bson.M{"_id": data.Id}, entity.SpeechcraftCollection)
g.Log().Errorf(ctx, "话术上传RAGFlow失败: speechcraft_id=%s, dataset_id=%s, error=%v", data.Id.Hex(), datasetId, err)
jaeger.RecordError(ctx, err, "话术上传RAGFlow失败")
return nil, gerror.Wrap(err, "文档上传到知识库失败")
}
// 异步触发解析grpool自动管理goroutine生命周期WithoutCancel保留追踪避免取消
speechcraftGrpool.Add(ctx, func(ctx context.Context) {
parseCtx := context.WithoutCancel(ctx)
if err := ragflowClient.ParseDocuments(parseCtx, datasetId, []string{documentId}); err != nil {
g.Log().Errorf(parseCtx, "文档解析失败: document_id=%s, error=%v", documentId, err)
} else {
g.Log().Infof(parseCtx, "文档解析成功: document_id=%s", documentId)
}
})
// 更新MongoDB的RagSyncRecords数组使用空accountName表示租户级文档
syncTime := gtime.Now().Format("Y-m-d H:i:s")
record := entity.RagSyncRecord{
AccountName: "", // 空表示租户级文档
RagDocumentId: documentId,
RagSyncStatus: "synced",
SyncTime: syncTime,
RetryCount: 0,
}
filter := bson.M{"_id": data.Id}
update := bson.M{
"$set": bson.M{
"ragSyncRecords": []entity.RagSyncRecord{record},
"ragLastSyncTime": syncTime,
"updatedAt": gtime.Now().Time,
},
}
if _, _, err = dao.MongoDAO.UpdateOne(ctx, filter, update, entity.SpeechcraftCollection); err != nil {
g.Log().Errorf(ctx, "更新话术RagSyncRecords失败: %v", err)
// 不回滚,文档已上传成功
}
g.Log().Infof(ctx, "话术添加成功并上传到知识库: speechcraft_id=%s, document_id=%s", data.Id.Hex(), documentId)
res = &dto.AddSpeechcraftRes{Id: data.Id.Hex()}
return
}
// Update 更新话术
// 参数: ctx - 上下文req - 更新话术请求包含话术ID和待更新字段
// 返回: err - 错误信息
// 功能: 更新话术内容并同步到RAGFlow支持文档删除重建
func (s *speechcraft) Update(ctx context.Context, req *dto.UpdateSpeechcraftReq) (err error) {
return dao.Speechcraft.Update(ctx, req)
}
// Delete 删除话术
// 参数: ctx - 上下文req - 删除话术请求包含话术ID
// 返回: err - 错误信息
// 功能: 逻辑删除话术记录并从RAGFlow移除对应文档
func (s *speechcraft) Delete(ctx context.Context, req *dto.DeleteSpeechcraftReq) (err error) {
g.Log().Infof(ctx, "[Delete] 开始删除话术 - speechcraftId: %s", req.Id)
// 1. 查询话术获取RAGFlow同步记录使用原生查询避免租户过滤
objectId, err := bson.ObjectIDFromHex(req.Id)
if err != nil {
return gerror.Wrap(err, "无效的话术ID")
}
var speechcraft entity.Speechcraft
filter := bson.M{"_id": objectId, "isDeleted": false}
err = dao.MongoDAO.FindOne(ctx, filter, &speechcraft, entity.SpeechcraftCollection)
if err != nil {
if err.Error() == "mongo: no documents in result" {
return gerror.New("话术不存在")
}
return gerror.Wrap(err, "查询话术失败")
}
g.Log().Infof(ctx, "[Delete] 查询到话术 - tag: %s, ragSyncRecords数量: %d", speechcraft.Tag, len(speechcraft.RagSyncRecords))
// 2. 删除RAGFlow中的文档
if len(speechcraft.RagSyncRecords) > 0 {
ragflowClient := ragflow.GetGlobalClient()
if ragflowClient != nil {
tenantId := gconv.String(speechcraft.TenantId)
// 查询租户的dataset_id
datasetId, err := dao.RAGFlowConfig.FindDatasetIdByTenant(ctx, tenantId)
if err != nil {
g.Log().Warningf(ctx, "查询租户知识库ID失败: %v", err)
} else if datasetId != "" {
// 收集所有需要删除的document_id
var documentIds []string
for _, record := range speechcraft.RagSyncRecords {
if record.RagDocumentId != "" {
documentIds = append(documentIds, record.RagDocumentId)
}
}
// 批量删除RAGFlow文档
if len(documentIds) > 0 {
if err := ragflowClient.DeleteDocument(ctx, datasetId, documentIds); err != nil {
g.Log().Errorf(ctx, "删除RAGFlow文档失败: %v, document_ids: %v", err, documentIds)
// 不阻断删除流程,记录错误后继续
} else {
g.Log().Infof(ctx, "成功删除RAGFlow文档: count=%d", len(documentIds))
}
}
}
}
}
// 3. 软删除MongoDB记录
return dao.Speechcraft.Delete(ctx, req)
}
// List 获取话术列表
// 参数: ctx - 上下文req - 列表查询请求(支持分页、关键词搜索、平台筛选)
// 返回: res - 话术列表及分页信息err - 错误信息
// 功能: 分页查询话术记录,支持按标题、内容模糊搜索和平台筛选
func (s *speechcraft) List(ctx context.Context, req *dto.ListSpeechcraftReq) (res *dto.ListSpeechcraftRes, err error) {
list, total, err := dao.Speechcraft.List(ctx, req)
if err != nil {
return
}
res = &dto.ListSpeechcraftRes{
List: list,
Total: int(total),
}
return
}
// Match 话术匹配(核心方法)
// 根据用户当前阶段、行为、输入内容匹配话术
//
// func (s *speechcraft) Match(ctx context.Context, userId, platform, content, status string) (answer string, nextStage int, err error) {
// // 1. 获取用户当前阶段
// state, err := dao.UserStage.GetOrCreate(ctx, userId, platform)
func (s *speechcraft) Match(ctx context.Context, userId, platform, tenantId, content, status string) (answer string, nextStage int, err error) {
// 1. 获取用户当前状态Redis5分钟过期
userState, err := redis.GetUserState(ctx, userId, platform)
if err != nil {
jaeger.RecordError(ctx, err, "获取用户状态失败")
return
}
glog.Infof(ctx, "话术匹配 - 用户: %s, 当前阶段: %d, 行为: %s, 内容: %s", userId, userState.Stage, status, content)
// 2. 状态3发卡片状态持续提示用户添加联系方式
if userState.Stage == 3 {
answer = "请加一下卡片的联系方式,进行更专业的咨询" // TODO: 替换为实际卡片内容
nextStage = 3 // 保持状态3
glog.Infof(ctx, "用户处于发卡片状态 - 用户: %s", userId)
return
}
// 4. 检测用户是否想要联系方式5次内立即发卡
if s.isRequestingContact(content) {
glog.Infof(ctx, "检测到联系方式请求关键词 - 用户: %s, 内容: %s", userId, content)
return s.handleCardRequest(ctx, userId, platform)
}
// 5. 所有其他消息直接走RAGFlow话术已上传到知识库
// 后端只负责开场白WebSocket连接时发送+ 发卡片(上面的逻辑)
glog.Infof(ctx, "无话术匹配转发到RAGFlow - 用户: %s, 内容: %s", userId, content)
nextStage = 0
if updateErr := redis.SetUserStage(ctx, userId, platform, 0); updateErr != nil {
jaeger.RecordError(ctx, updateErr, "更新用户阶段为0失败")
}
// answer为空调用方会走RAGFlow
return
}
// ProcessAndPublish 处理用户消息并推送到Redis Stream
// 参数: ctx - 上下文userId - 用户IDplatform - 平台tenantId - 租户IDcontent - 消息内容status - 用户行为状态accountName - 客服账号名
// 返回: isPushed - 是否成功推送话术匹配结果err - 错误信息
// 功能: 尝试匹配话术匹配成功则直接推送话术内容失败则转发到Redis Stream由RAGFlow处理
func (s *speechcraft) ProcessAndPublish(ctx context.Context, userId, platform, tenantId, content, status, accountName string) (isPushed bool, err error) {
// 1. 话术匹配
answer, _, err := s.Match(ctx, userId, platform, tenantId, content, status)
if err != nil {
return
}
// 2. 未匹配到话术直接推送到Redis Stream
if answer == "" {
glog.Infof(ctx, "话术未匹配,转发到 AI 模型 - 用户: %s, 客服账号: %s", userId, accountName)
// 获取当前实例的动态响应队列名(自动生成,支持多实例部署)
baseQueue := GetConfigString(ctx, "rabbitmq.responseQueue")
replyQueue := rabbitmq.GetInstanceQueueName(baseQueue)
messageId := userId + "_" + gconv.String(gtime.Now().TimestampNano())
// 构造Stream消息
msg := &redis.SendStreamMessage{
UserId: userId,
Platform: platform,
TenantId: tenantId,
AccountName: accountName,
Content: content,
Timestamp: gtime.Now().Timestamp(),
MessageId: messageId,
ReplyQueue: replyQueue,
}
// 检查是否有session缓存无缓存说明已归档需要读取历史
if sessionId, _ := redis.GetSessionCache(ctx, tenantId, userId); sessionId == "" {
if history, histErr := dao.Conversation.GetRecentHistory(ctx, userId, redis.GetHistoryContextLimit()); histErr == nil && len(history) > 0 {
msg.History = history
glog.Infof(ctx, "用户已归档,读取 %d 轮历史对话 - 用户: %s", len(history), userId)
}
}
// 写入Redis Stream
var streamMsgId string
streamMsgId, err = redis.AddToStream(ctx, redis.RAGFlowRequestStreamKey, msg)
if err != nil {
jaeger.RecordError(ctx, err, "写入Stream失败")
return
}
glog.Infof(ctx, "消息已写入Stream - StreamID: %s, 用户: %s", streamMsgId, userId)
isPushed = false // 未匹配话术消息转发到AI处理
return
}
// 3. 匹配到话术,直接推送 WebSocket无需经过 RabbitMQ
if err = WebSocket.PushRAGFlowResponse(ctx, tenantId, userId, platform, answer); err != nil {
jaeger.RecordError(ctx, err, "推送话术响应失败")
return
}
glog.Infof(ctx, "话术响应已推送 - 用户: %s, 话术长度: %d", userId, len(answer))
isPushed = true // 已直接推送响应
return
}
// ResetUserStage 重置用户阶段
func (s *speechcraft) ResetUserStage(ctx context.Context, userId, platform string) (err error) {
return dao.UserStage.Reset(ctx, userId, platform)
}
// isRequestingContact 检测用户是否想要联系方式(触发立即发卡)
func (s *speechcraft) isRequestingContact(content string) bool {
// 联系方式相关关键词
contactKeywords := []string{
"联系方式", "联系你", "联系",
"微信", "VX", "vx", "wx", "WX",
"电话", "手机号", "电话号码",
"怎么找你", "如何联系", "加你",
"私信", "私聊",
}
// 清理内容(去除空格、标点)
cleanContent := gstr.Trim(content)
cleanContent = gstr.ToLower(cleanContent)
for _, keyword := range contactKeywords {
if gstr.Contains(cleanContent, gstr.ToLower(keyword)) {
return true
}
}
return false
}
// handleCardRequest 处理用户请求联系方式(立即发卡片)
func (s *speechcraft) handleCardRequest(ctx context.Context, userId, platform string) (answer string, nextStage int, err error) {
// 更新用户状态为3发卡片状态
if err = redis.SetUserStage(ctx, userId, platform, 3); err != nil {
jaeger.RecordError(ctx, err, "更新用户状态为3失败")
return
}
// 返回卡片话术
answer = "请加一下卡片的联系方式,进行更专业的咨询" // TODO: 替换为实际卡片内容
nextStage = 3
glog.Infof(ctx, "用户请求联系方式,立即发送卡片 - 用户: %s", userId)
return
}
// BindToCustomerServices 绑定话术到客服账号
// 参数: ctx - 上下文req - 绑定请求包含话术ID和客服账号名列表
// 返回: res - 绑定结果成功和失败的账号列表err - 错误信息
// 功能: 将话术同步到指定客服账号的RAGFlow知识库批量处理并返回每个账号的结果
func (s *speechcraft) BindToCustomerServices(ctx context.Context, req *dto.BindSpeechcraftReq) (res *dto.BindSpeechcraftRes, err error) {
res = &dto.BindSpeechcraftRes{}
// 0. 参数验证
if len(req.AccountNames) == 0 {
return nil, gerror.New("客服账号ID列表不能为空")
}
// 1. 查询话术(验证存在性和获取租户信息)
r := g.RequestFromCtx(ctx)
if r != nil {
r.SetParam("accountName", req.AccountNames[0])
}
speechcraft, err := dao.Speechcraft.GetById(ctx, req.SpeechcraftId)
if err != nil {
return nil, gerror.Wrapf(err, "查询话术失败")
}
if speechcraft == nil {
return nil, gerror.New("话术不存在")
}
speechcraftTenantId := gconv.String(speechcraft.TenantId)
// 2. 遍历客服账号更新每个账号的speechcraft_ids
var newBindings []string
var alreadyBound []string
var notFound []string
for _, csId := range req.AccountNames {
// 查询客服账号
csAccount, err := dao.CustomerServiceAccount.FindByAccountName(ctx, csId)
if err != nil || csAccount == nil {
notFound = append(notFound, csId)
g.Log().Warningf(ctx, "客服账号 %s 不存在或已删除,跳过", csId)
continue
}
// 租户隔离校验
accountTenantId := gconv.String(csAccount.TenantId)
if speechcraftTenantId != accountTenantId {
g.Log().Warningf(ctx, "话术和客服账号不属于同一租户,跳过: speechcraft_tenant=%s, account_tenant=%s",
speechcraftTenantId, accountTenantId)
notFound = append(notFound, csId)
continue
}
// 检查是否已绑定
alreadyExists := false
for _, existingId := range csAccount.SpeechcraftIds {
if existingId == req.SpeechcraftId {
alreadyExists = true
break
}
}
if alreadyExists {
alreadyBound = append(alreadyBound, csId)
g.Log().Warningf(ctx, "客服账号 %s 已绑定该话术,跳过", csId)
continue
}
// 添加到speechcraft_ids列表
csAccount.SpeechcraftIds = append(csAccount.SpeechcraftIds, req.SpeechcraftId)
// 更新数据库
filter := bson.M{"_id": csAccount.Id, "isDeleted": false}
update := bson.M{"$set": bson.M{"speechcraftIds": csAccount.SpeechcraftIds}}
if _, err := mongo.DB().Update(ctx, filter, update, entity.CustomerServiceAccountCollection); err != nil {
g.Log().Errorf(ctx, "更新客服账号绑定失败: %s, error=%v", csId, err)
notFound = append(notFound, csId)
continue
}
newBindings = append(newBindings, csId)
}
// 3. 如果没有新的绑定,直接返回
if len(newBindings) == 0 {
res.SuccessCount = 0
res.AlreadyBound = alreadyBound
res.NotFound = notFound
if len(alreadyBound) > 0 && len(notFound) > 0 {
res.Message = "部分客服账号已绑定,部分不存在"
} else if len(alreadyBound) > 0 {
res.Message = "所有客服账号已绑定,无需重复绑定"
} else if len(notFound) > 0 {
res.Message = "所有客服账号都不存在或租户不匹配"
}
return res, nil
}
// 6. 同步到RAGFlow自动创建知识库
for _, csId := range newBindings {
// 获取客服账号信息以获取tenant_id
csAccount, err := dao.CustomerServiceAccount.FindByAccountName(ctx, csId)
if err != nil || csAccount == nil {
g.Log().Errorf(ctx, "获取客服账号信息失败: %s", csId)
continue
}
// 同步到RAGFlow会自动创建知识库
tenantId := gconv.String(csAccount.TenantId)
g.Log().Infof(ctx, "客服账号租户信息: cs_id=%s, tenant_id=%v, tenant_id_type=%T", csId, csAccount.TenantId, csAccount.TenantId)
_, err = s.SyncToRAGFlow(ctx, req.SpeechcraftId, csId, tenantId)
if err != nil {
g.Log().Errorf(ctx, "同步到RAGFlow失败: speechcraft_id=%s, cs_id=%s, error=%v", req.SpeechcraftId, csId, err)
// 不阻断绑定流程,失败会进入重试队列
}
}
res.SuccessCount = len(newBindings)
res.AlreadyBound = alreadyBound
res.NotFound = notFound
// 生成详细的响应消息
if len(alreadyBound) > 0 || len(notFound) > 0 {
res.Message = fmt.Sprintf("成功绑定%d个", len(newBindings))
if len(alreadyBound) > 0 {
res.Message += fmt.Sprintf("%d个已绑定", len(alreadyBound))
}
if len(notFound) > 0 {
res.Message += fmt.Sprintf("%d个不存在", len(notFound))
}
} else {
res.Message = fmt.Sprintf("成功绑定%d个客服账号", len(newBindings))
}
return
}
// UnbindFromCustomerService 从客服账号解绑话术
// 参数: ctx - 上下文req - 解绑请求包含话术ID和客服账号名
// 返回: res - 解绑结果信息err - 错误信息
// 功能: 从客服账号的RAGFlow知识库中删除话术文档
func (s *speechcraft) UnbindFromCustomerService(ctx context.Context, req *dto.UnbindSpeechcraftReq) (res *dto.UnbindSpeechcraftRes, err error) {
res = &dto.UnbindSpeechcraftRes{}
// 1. 验证话术存在
speechcraft, err := dao.Speechcraft.GetById(ctx, req.SpeechcraftId)
if err != nil {
return nil, gerror.Wrapf(err, "查询话术失败")
}
if speechcraft == nil {
return nil, gerror.New("话术不存在")
}
// 2. 查询客服账号
csAccount, err := dao.CustomerServiceAccount.FindByAccountName(ctx, req.AccountName)
if err != nil || csAccount == nil {
res.Success = false
res.Message = "客服账号不存在"
return res, nil
}
// 3. 从 speechcraft_ids 中移除话术ID
var newSpeechcraftIds []string
found := false
for _, scId := range csAccount.SpeechcraftIds {
if scId == req.SpeechcraftId {
found = true
continue
}
newSpeechcraftIds = append(newSpeechcraftIds, scId)
}
if !found {
res.Success = false
res.Message = "未找到该绑定关系"
return res, nil
}
// 4. 更新数据库
filter := bson.M{"_id": csAccount.Id, "isDeleted": false}
update := bson.M{"$set": bson.M{"speechcraftIds": newSpeechcraftIds}}
if _, err := mongo.DB().Update(ctx, filter, update, entity.CustomerServiceAccountCollection); err != nil {
return nil, gerror.Wrapf(err, "解绑失败")
}
res.Success = true
res.Message = "解绑成功"
return
}
// SyncToRAGFlow 同步话术到RAGFlow
// 参数: ctx - 上下文speechcraftId - 话术IDaccountName - 客服账号名tenantId - 租户ID
// 返回: documentId - RAGFlow文档IDerr - 错误信息
// 功能: 将话术上传到指定客服账号的RAGFlow知识库失败时自动加入重试队列
func (s *speechcraft) SyncToRAGFlow(ctx context.Context, speechcraftId, accountName, tenantId string) (documentId string, err error) {
// 1. 查询话术
speechcraft, err := dao.Speechcraft.GetById(ctx, speechcraftId)
if err != nil {
return "", gerror.Wrapf(err, "查询话术失败")
}
if speechcraft == nil {
return "", gerror.New("话术不存在")
}
// 2. 确保知识库存在获取真实的datasetId
datasetId, err := s.ensureDatasetExists(ctx, tenantId, "话术")
if err != nil {
return "", gerror.Wrapf(err, "确保知识库存在失败")
}
// 3. 调用RAGFlow上传文档
ragflowClient := ragflow.GetGlobalClient()
filename := fmt.Sprintf("%s_%s_%s.txt", speechcraft.Direction, speechcraft.Tag, accountName)
documentId, err = ragflowClient.UploadDocumentFromText(ctx, datasetId, speechcraft.Content, filename)
if err != nil {
jaeger.RecordError(ctx, err, "话术上传RAGFlow失败")
return "", gerror.Wrap(err, "话术上传RAGFlow失败")
}
// 3.1 上传成功后立即调用解析接口
g.Log().Infof(ctx, "文档上传成功,开始解析: document_id=%s", documentId)
if err = ragflowClient.ParseDocuments(ctx, datasetId, []string{documentId}); err != nil {
// 解析失败只记录日志,不影响绑定流程(文档已上传,可以手动重试解析)
g.Log().Errorf(ctx, "文档解析失败: document_id=%s, error=%v", documentId, err)
jaeger.RecordError(ctx, err, "文档解析失败")
} else {
g.Log().Infof(ctx, "文档解析请求已发送: document_id=%s", documentId)
}
// 4. 更新MongoDB的RagSyncRecord
now := gtime.Now().Format("Y-m-d H:i:s")
updated := false
for i := range speechcraft.RagSyncRecords {
record := &speechcraft.RagSyncRecords[i]
if record.AccountName == accountName {
record.RagDocumentId = documentId
record.RagSyncStatus = "synced"
record.SyncTime = now
record.RetryCount = 0
updated = true
break
}
}
// 如果没有找到记录,新增
if !updated {
speechcraft.RagSyncRecords = append(speechcraft.RagSyncRecords, entity.RagSyncRecord{
AccountName: accountName,
RagDocumentId: documentId,
RagSyncStatus: "synced",
SyncTime: now,
RetryCount: 0,
})
}
if err = dao.Speechcraft.UpdateEntity(ctx, speechcraft); err != nil {
return "", gerror.Wrapf(err, "更新话术同步状态失败")
}
// 注意不再更新Chat的datasetIds因为创建Chat时已经绑定了知识库
// 话术文档上传到知识库后Chat会自动使用该知识库的内容
glog.Infof(ctx, "话术同步成功: speechcraft_id=%s, account_name=%s, document_id=%s", speechcraftId, accountName, documentId)
return documentId, nil
}
// ensureDatasetExists 已废弃,改用公共方法 EnsureTenantDataset
// 保留此方法仅为兼容性,直接调用公共方法
func (s *speechcraft) ensureDatasetExists(ctx context.Context, tenantId, datasetType string) (datasetId string, err error) {
return EnsureTenantDataset(ctx, tenantId)
}