Files
customer-server/service/speechcraft_service.go

654 lines
24 KiB
Go
Raw Normal View History

2026-03-14 10:02:49 +08:00
// Package service - 话术服务
// 功能:话术的增删改查、绑定/解绑客服账号、同步到RAGFlow、重试消费者
package service
import (
"context"
"customer-server/dao"
"customer-server/model/dto"
"customer-server/model/entity"
"customer-server/util"
"fmt"
"gitea.com/red-future/common/db/mongo"
"gitea.com/red-future/common/jaeger"
"gitea.com/red-future/common/rabbitmq"
"gitea.com/red-future/common/ragflow"
"gitea.com/red-future/common/redis"
"gitea.com/red-future/common/utils"
"github.com/gogf/gf/v2/errors/gerror"
"github.com/gogf/gf/v2/frame/g"
"github.com/gogf/gf/v2/os/glog"
"github.com/gogf/gf/v2/os/grpool"
"github.com/gogf/gf/v2/os/gtime"
"github.com/gogf/gf/v2/text/gstr"
"github.com/gogf/gf/v2/util/gconv"
"go.mongodb.org/mongo-driver/v2/bson"
)
var (
Speechcraft = new(speechcraft)
speechcraftGrpool = grpool.New(50) // 文档解析协程池最大50并发
)
type speechcraft struct{}
// Add 添加话术
// 参数: ctx - 上下文req - 添加话术请求(包含标题、内容、方向、标签等)
// 返回: res - 添加成功后的话术ID和RAGFlow文档IDerr - 错误信息
// 功能: 创建话术记录并自动上传到RAGFlow租户知识库支持去重检查
func (s *speechcraft) Add(ctx context.Context, req *dto.AddSpeechcraftReq) (res *dto.AddSpeechcraftRes, err error) {
// 去重检查同一租户下tag唯一
if req.Tag != "" {
existing, err := dao.Speechcraft.FindByTag(ctx, req.Tag)
if err != nil {
return nil, gerror.Wrap(err, "检查话术重复失败")
}
if existing != nil {
return nil, gerror.Newf("话术tag已存在tag=%s, id=%s", req.Tag, existing.Id.Hex())
}
}
// 先从token或请求获取租户信息在Insert之前
var tenantId string
if req.TenantId != nil {
tenantId = gconv.String(req.TenantId)
g.Log().Debugf(ctx, "使用请求中的TenantId: %v", req.TenantId)
} else {
user, err := util.GetTenantInfo(ctx)
if err != nil {
return nil, gerror.Wrap(err, "获取租户信息失败")
}
tenantId = gconv.String(user.TenantId)
if tenantId == "" {
return nil, gerror.New("租户ID为空")
}
}
// 查询租户知识库ID
datasetId, err := dao.RAGFlowConfig.FindDatasetIdByTenant(ctx, tenantId)
if err != nil || datasetId == "" {
return nil, gerror.Newf("租户知识库不存在,请先创建客服账号: tenant_id=%s", tenantId)
}
data := &entity.Speechcraft{}
if err = utils.Struct(req, data); err != nil {
return
}
// 设置基础字段
now := gtime.Now().Time
data.CreatedAt = &now // 取地址赋值给指针类型
data.UpdatedAt = &now // 取地址赋值给指针类型
data.IsDeleted = false
// 统一使用string类型存储tenantId到MongoDB
data.TenantId = tenantId
// 使用DAO封装的Insert方法
id, err := dao.Speechcraft.Insert(ctx, data)
if err != nil {
return nil, gerror.Wrap(err, "插入话术失败")
}
data.Id = &id // 取地址赋值给指针类型
// 同步上传到RAGFlow
ragflowClient := ragflow.GetGlobalClient()
if ragflowClient == nil {
// 回滚:删除刚插入的话术
dao.MongoDAO.Delete(ctx, bson.M{"_id": data.Id}, entity.SpeechcraftCollection)
return nil, gerror.New("RAGFlow客户端未初始化请检查配置")
}
g.Log().Infof(ctx, "准备上传话术到RAGFlow: speechcraft_id=%s, dataset_id=%s, direction=%s, tag=%s",
data.Id.Hex(), datasetId, data.Direction, data.Tag)
filename := fmt.Sprintf("%s_%s.txt", data.Direction, data.Tag)
documentId, err := ragflowClient.UploadDocumentFromText(ctx, datasetId, data.Content, filename)
if err != nil {
// 回滚:删除刚插入的话术
dao.MongoDAO.Delete(ctx, bson.M{"_id": data.Id}, entity.SpeechcraftCollection)
g.Log().Errorf(ctx, "话术上传RAGFlow失败: speechcraft_id=%s, dataset_id=%s, error=%v", data.Id.Hex(), datasetId, err)
jaeger.RecordError(ctx, err, "话术上传RAGFlow失败")
return nil, gerror.Wrap(err, "文档上传到知识库失败")
}
// 异步触发解析grpool自动管理goroutine生命周期WithoutCancel保留追踪避免取消
speechcraftGrpool.Add(ctx, func(ctx context.Context) {
parseCtx := context.WithoutCancel(ctx)
if err := ragflowClient.ParseDocuments(parseCtx, datasetId, []string{documentId}); err != nil {
g.Log().Errorf(parseCtx, "文档解析失败: document_id=%s, error=%v", documentId, err)
} else {
g.Log().Infof(parseCtx, "文档解析成功: document_id=%s", documentId)
}
})
// 更新MongoDB的RagSyncRecords数组使用空accountName表示租户级文档
syncTime := gtime.Now().Format("Y-m-d H:i:s")
record := entity.RagSyncRecord{
AccountName: "", // 空表示租户级文档
RagDocumentId: documentId,
RagSyncStatus: "synced",
SyncTime: syncTime,
RetryCount: 0,
}
filter := bson.M{"_id": data.Id}
update := bson.M{
"$set": bson.M{
"ragSyncRecords": []entity.RagSyncRecord{record},
"ragLastSyncTime": syncTime,
"updatedAt": gtime.Now().Time,
},
}
if _, _, err = dao.MongoDAO.UpdateOne(ctx, filter, update, entity.SpeechcraftCollection); err != nil {
g.Log().Errorf(ctx, "更新话术RagSyncRecords失败: %v", err)
// 不回滚,文档已上传成功
}
g.Log().Infof(ctx, "话术添加成功并上传到知识库: speechcraft_id=%s, document_id=%s", data.Id.Hex(), documentId)
res = &dto.AddSpeechcraftRes{Id: data.Id.Hex()}
return
}
// Update 更新话术
// 参数: ctx - 上下文req - 更新话术请求包含话术ID和待更新字段
// 返回: err - 错误信息
// 功能: 更新话术内容并同步到RAGFlow支持文档删除重建
func (s *speechcraft) Update(ctx context.Context, req *dto.UpdateSpeechcraftReq) (err error) {
return dao.Speechcraft.Update(ctx, req)
}
// Delete 删除话术
// 参数: ctx - 上下文req - 删除话术请求包含话术ID
// 返回: err - 错误信息
// 功能: 逻辑删除话术记录并从RAGFlow移除对应文档
func (s *speechcraft) Delete(ctx context.Context, req *dto.DeleteSpeechcraftReq) (err error) {
g.Log().Infof(ctx, "[Delete] 开始删除话术 - speechcraftId: %s", req.Id)
// 1. 查询话术获取RAGFlow同步记录使用原生查询避免租户过滤
objectId, err := bson.ObjectIDFromHex(req.Id)
if err != nil {
return gerror.Wrap(err, "无效的话术ID")
}
var speechcraft entity.Speechcraft
filter := bson.M{"_id": objectId, "isDeleted": false}
err = dao.MongoDAO.FindOne(ctx, filter, &speechcraft, entity.SpeechcraftCollection)
if err != nil {
if err.Error() == "mongo: no documents in result" {
return gerror.New("话术不存在")
}
return gerror.Wrap(err, "查询话术失败")
}
g.Log().Infof(ctx, "[Delete] 查询到话术 - tag: %s, ragSyncRecords数量: %d", speechcraft.Tag, len(speechcraft.RagSyncRecords))
// 2. 删除RAGFlow中的文档
if len(speechcraft.RagSyncRecords) > 0 {
ragflowClient := ragflow.GetGlobalClient()
if ragflowClient != nil {
tenantId := gconv.String(speechcraft.TenantId)
// 查询租户的dataset_id
datasetId, err := dao.RAGFlowConfig.FindDatasetIdByTenant(ctx, tenantId)
if err != nil {
g.Log().Warningf(ctx, "查询租户知识库ID失败: %v", err)
} else if datasetId != "" {
// 收集所有需要删除的document_id
var documentIds []string
for _, record := range speechcraft.RagSyncRecords {
if record.RagDocumentId != "" {
documentIds = append(documentIds, record.RagDocumentId)
}
}
// 批量删除RAGFlow文档
if len(documentIds) > 0 {
if err := ragflowClient.DeleteDocument(ctx, datasetId, documentIds); err != nil {
g.Log().Errorf(ctx, "删除RAGFlow文档失败: %v, document_ids: %v", err, documentIds)
// 不阻断删除流程,记录错误后继续
} else {
g.Log().Infof(ctx, "成功删除RAGFlow文档: count=%d", len(documentIds))
}
}
}
}
}
// 3. 软删除MongoDB记录
return dao.Speechcraft.Delete(ctx, req)
}
// List 获取话术列表
// 参数: ctx - 上下文req - 列表查询请求(支持分页、关键词搜索、平台筛选)
// 返回: res - 话术列表及分页信息err - 错误信息
// 功能: 分页查询话术记录,支持按标题、内容模糊搜索和平台筛选
func (s *speechcraft) List(ctx context.Context, req *dto.ListSpeechcraftReq) (res *dto.ListSpeechcraftRes, err error) {
list, total, err := dao.Speechcraft.List(ctx, req)
if err != nil {
return
}
res = &dto.ListSpeechcraftRes{
List: list,
Total: int(total),
}
return
}
// Match 话术匹配(核心方法)
// 根据用户当前阶段、行为、输入内容匹配话术
//
// func (s *speechcraft) Match(ctx context.Context, userId, platform, content, status string) (answer string, nextStage int, err error) {
// // 1. 获取用户当前阶段
// state, err := dao.UserStage.GetOrCreate(ctx, userId, platform)
func (s *speechcraft) Match(ctx context.Context, userId, platform, tenantId, content, status string) (answer string, nextStage int, err error) {
// 1. 获取用户当前状态Redis5分钟过期
userState, err := redis.GetUserState(ctx, userId, platform)
if err != nil {
jaeger.RecordError(ctx, err, "获取用户状态失败")
return
}
glog.Infof(ctx, "话术匹配 - 用户: %s, 当前阶段: %d, 行为: %s, 内容: %s", userId, userState.Stage, status, content)
// 2. 状态3发卡片状态持续提示用户添加联系方式
if userState.Stage == 3 {
answer = "请加一下卡片的联系方式,进行更专业的咨询" // TODO: 替换为实际卡片内容
nextStage = 3 // 保持状态3
glog.Infof(ctx, "用户处于发卡片状态 - 用户: %s", userId)
return
}
// 4. 检测用户是否想要联系方式5次内立即发卡
if s.isRequestingContact(content) {
glog.Infof(ctx, "检测到联系方式请求关键词 - 用户: %s, 内容: %s", userId, content)
return s.handleCardRequest(ctx, userId, platform)
}
// 5. 所有其他消息直接走RAGFlow话术已上传到知识库
// 后端只负责开场白WebSocket连接时发送+ 发卡片(上面的逻辑)
glog.Infof(ctx, "无话术匹配转发到RAGFlow - 用户: %s, 内容: %s", userId, content)
nextStage = 0
if updateErr := redis.SetUserStage(ctx, userId, platform, 0); updateErr != nil {
jaeger.RecordError(ctx, updateErr, "更新用户阶段为0失败")
}
// answer为空调用方会走RAGFlow
return
}
// ProcessAndPublish 处理用户消息并推送到Redis Stream
// 参数: ctx - 上下文userId - 用户IDplatform - 平台tenantId - 租户IDcontent - 消息内容status - 用户行为状态accountName - 客服账号名
// 返回: isPushed - 是否成功推送话术匹配结果err - 错误信息
// 功能: 尝试匹配话术匹配成功则直接推送话术内容失败则转发到Redis Stream由RAGFlow处理
func (s *speechcraft) ProcessAndPublish(ctx context.Context, userId, platform, tenantId, content, status, accountName string) (isPushed bool, err error) {
// 1. 话术匹配
answer, _, err := s.Match(ctx, userId, platform, tenantId, content, status)
if err != nil {
return
}
// 2. 未匹配到话术直接推送到Redis Stream
if answer == "" {
glog.Infof(ctx, "话术未匹配,转发到 AI 模型 - 用户: %s, 客服账号: %s", userId, accountName)
// 获取当前实例的动态响应队列名(自动生成,支持多实例部署)
baseQueue := GetConfigString(ctx, "rabbitmq.responseQueue")
replyQueue := rabbitmq.GetInstanceQueueName(baseQueue)
messageId := userId + "_" + gconv.String(gtime.Now().TimestampNano())
// 构造Stream消息
msg := &redis.SendStreamMessage{
UserId: userId,
Platform: platform,
TenantId: tenantId,
AccountName: accountName,
Content: content,
Timestamp: gtime.Now().Timestamp(),
MessageId: messageId,
ReplyQueue: replyQueue,
}
// 检查是否有session缓存无缓存说明已归档需要读取历史
if sessionId, _ := redis.GetSessionCache(ctx, tenantId, userId); sessionId == "" {
if history, histErr := dao.Conversation.GetRecentHistory(ctx, userId, redis.GetHistoryContextLimit()); histErr == nil && len(history) > 0 {
msg.History = history
glog.Infof(ctx, "用户已归档,读取 %d 轮历史对话 - 用户: %s", len(history), userId)
}
}
// 写入Redis Stream
var streamMsgId string
streamMsgId, err = redis.AddToStream(ctx, redis.RAGFlowRequestStreamKey, msg)
if err != nil {
jaeger.RecordError(ctx, err, "写入Stream失败")
return
}
glog.Infof(ctx, "消息已写入Stream - StreamID: %s, 用户: %s", streamMsgId, userId)
isPushed = false // 未匹配话术消息转发到AI处理
return
}
// 3. 匹配到话术,直接推送 WebSocket无需经过 RabbitMQ
if err = WebSocket.PushRAGFlowResponse(ctx, tenantId, userId, platform, answer); err != nil {
jaeger.RecordError(ctx, err, "推送话术响应失败")
return
}
glog.Infof(ctx, "话术响应已推送 - 用户: %s, 话术长度: %d", userId, len(answer))
isPushed = true // 已直接推送响应
return
}
// ResetUserStage 重置用户阶段
func (s *speechcraft) ResetUserStage(ctx context.Context, userId, platform string) (err error) {
return dao.UserStage.Reset(ctx, userId, platform)
}
// isRequestingContact 检测用户是否想要联系方式(触发立即发卡)
func (s *speechcraft) isRequestingContact(content string) bool {
// 联系方式相关关键词
contactKeywords := []string{
"联系方式", "联系你", "联系",
"微信", "VX", "vx", "wx", "WX",
"电话", "手机号", "电话号码",
"怎么找你", "如何联系", "加你",
"私信", "私聊",
}
// 清理内容(去除空格、标点)
cleanContent := gstr.Trim(content)
cleanContent = gstr.ToLower(cleanContent)
for _, keyword := range contactKeywords {
if gstr.Contains(cleanContent, gstr.ToLower(keyword)) {
return true
}
}
return false
}
// handleCardRequest 处理用户请求联系方式(立即发卡片)
func (s *speechcraft) handleCardRequest(ctx context.Context, userId, platform string) (answer string, nextStage int, err error) {
// 更新用户状态为3发卡片状态
if err = redis.SetUserStage(ctx, userId, platform, 3); err != nil {
jaeger.RecordError(ctx, err, "更新用户状态为3失败")
return
}
// 返回卡片话术
answer = "请加一下卡片的联系方式,进行更专业的咨询" // TODO: 替换为实际卡片内容
nextStage = 3
glog.Infof(ctx, "用户请求联系方式,立即发送卡片 - 用户: %s", userId)
return
}
// BindToCustomerServices 绑定话术到客服账号
// 参数: ctx - 上下文req - 绑定请求包含话术ID和客服账号名列表
// 返回: res - 绑定结果成功和失败的账号列表err - 错误信息
// 功能: 将话术同步到指定客服账号的RAGFlow知识库批量处理并返回每个账号的结果
func (s *speechcraft) BindToCustomerServices(ctx context.Context, req *dto.BindSpeechcraftReq) (res *dto.BindSpeechcraftRes, err error) {
res = &dto.BindSpeechcraftRes{}
// 0. 参数验证
if len(req.AccountNames) == 0 {
return nil, gerror.New("客服账号ID列表不能为空")
}
// 1. 查询话术(验证存在性和获取租户信息)
r := g.RequestFromCtx(ctx)
if r != nil {
r.SetParam("accountName", req.AccountNames[0])
}
speechcraft, err := dao.Speechcraft.GetById(ctx, req.SpeechcraftId)
if err != nil {
return nil, gerror.Wrapf(err, "查询话术失败")
}
if speechcraft == nil {
return nil, gerror.New("话术不存在")
}
speechcraftTenantId := gconv.String(speechcraft.TenantId)
// 2. 遍历客服账号更新每个账号的speechcraft_ids
var newBindings []string
var alreadyBound []string
var notFound []string
for _, csId := range req.AccountNames {
// 查询客服账号
csAccount, err := dao.CustomerServiceAccount.FindByAccountName(ctx, csId)
if err != nil || csAccount == nil {
notFound = append(notFound, csId)
g.Log().Warningf(ctx, "客服账号 %s 不存在或已删除,跳过", csId)
continue
}
// 租户隔离校验
accountTenantId := gconv.String(csAccount.TenantId)
if speechcraftTenantId != accountTenantId {
g.Log().Warningf(ctx, "话术和客服账号不属于同一租户,跳过: speechcraft_tenant=%s, account_tenant=%s",
speechcraftTenantId, accountTenantId)
notFound = append(notFound, csId)
continue
}
// 检查是否已绑定
alreadyExists := false
for _, existingId := range csAccount.SpeechcraftIds {
if existingId == req.SpeechcraftId {
alreadyExists = true
break
}
}
if alreadyExists {
alreadyBound = append(alreadyBound, csId)
g.Log().Warningf(ctx, "客服账号 %s 已绑定该话术,跳过", csId)
continue
}
// 添加到speechcraft_ids列表
csAccount.SpeechcraftIds = append(csAccount.SpeechcraftIds, req.SpeechcraftId)
// 更新数据库
filter := bson.M{"_id": csAccount.Id, "isDeleted": false}
update := bson.M{"$set": bson.M{"speechcraftIds": csAccount.SpeechcraftIds}}
if _, err := mongo.DB().Update(ctx, filter, update, entity.CustomerServiceAccountCollection); err != nil {
g.Log().Errorf(ctx, "更新客服账号绑定失败: %s, error=%v", csId, err)
notFound = append(notFound, csId)
continue
}
newBindings = append(newBindings, csId)
}
// 3. 如果没有新的绑定,直接返回
if len(newBindings) == 0 {
res.SuccessCount = 0
res.AlreadyBound = alreadyBound
res.NotFound = notFound
if len(alreadyBound) > 0 && len(notFound) > 0 {
res.Message = "部分客服账号已绑定,部分不存在"
} else if len(alreadyBound) > 0 {
res.Message = "所有客服账号已绑定,无需重复绑定"
} else if len(notFound) > 0 {
res.Message = "所有客服账号都不存在或租户不匹配"
}
return res, nil
}
// 6. 同步到RAGFlow自动创建知识库
for _, csId := range newBindings {
// 获取客服账号信息以获取tenant_id
csAccount, err := dao.CustomerServiceAccount.FindByAccountName(ctx, csId)
if err != nil || csAccount == nil {
g.Log().Errorf(ctx, "获取客服账号信息失败: %s", csId)
continue
}
// 同步到RAGFlow会自动创建知识库
tenantId := gconv.String(csAccount.TenantId)
g.Log().Infof(ctx, "客服账号租户信息: cs_id=%s, tenant_id=%v, tenant_id_type=%T", csId, csAccount.TenantId, csAccount.TenantId)
_, err = s.SyncToRAGFlow(ctx, req.SpeechcraftId, csId, tenantId)
if err != nil {
g.Log().Errorf(ctx, "同步到RAGFlow失败: speechcraft_id=%s, cs_id=%s, error=%v", req.SpeechcraftId, csId, err)
// 不阻断绑定流程,失败会进入重试队列
}
}
res.SuccessCount = len(newBindings)
res.AlreadyBound = alreadyBound
res.NotFound = notFound
// 生成详细的响应消息
if len(alreadyBound) > 0 || len(notFound) > 0 {
res.Message = fmt.Sprintf("成功绑定%d个", len(newBindings))
if len(alreadyBound) > 0 {
res.Message += fmt.Sprintf("%d个已绑定", len(alreadyBound))
}
if len(notFound) > 0 {
res.Message += fmt.Sprintf("%d个不存在", len(notFound))
}
} else {
res.Message = fmt.Sprintf("成功绑定%d个客服账号", len(newBindings))
}
return
}
// UnbindFromCustomerService 从客服账号解绑话术
// 参数: ctx - 上下文req - 解绑请求包含话术ID和客服账号名
// 返回: res - 解绑结果信息err - 错误信息
// 功能: 从客服账号的RAGFlow知识库中删除话术文档
func (s *speechcraft) UnbindFromCustomerService(ctx context.Context, req *dto.UnbindSpeechcraftReq) (res *dto.UnbindSpeechcraftRes, err error) {
res = &dto.UnbindSpeechcraftRes{}
// 1. 验证话术存在
speechcraft, err := dao.Speechcraft.GetById(ctx, req.SpeechcraftId)
if err != nil {
return nil, gerror.Wrapf(err, "查询话术失败")
}
if speechcraft == nil {
return nil, gerror.New("话术不存在")
}
// 2. 查询客服账号
csAccount, err := dao.CustomerServiceAccount.FindByAccountName(ctx, req.AccountName)
if err != nil || csAccount == nil {
res.Success = false
res.Message = "客服账号不存在"
return res, nil
}
// 3. 从 speechcraft_ids 中移除话术ID
var newSpeechcraftIds []string
found := false
for _, scId := range csAccount.SpeechcraftIds {
if scId == req.SpeechcraftId {
found = true
continue
}
newSpeechcraftIds = append(newSpeechcraftIds, scId)
}
if !found {
res.Success = false
res.Message = "未找到该绑定关系"
return res, nil
}
// 4. 更新数据库
filter := bson.M{"_id": csAccount.Id, "isDeleted": false}
update := bson.M{"$set": bson.M{"speechcraftIds": newSpeechcraftIds}}
if _, err := mongo.DB().Update(ctx, filter, update, entity.CustomerServiceAccountCollection); err != nil {
return nil, gerror.Wrapf(err, "解绑失败")
}
res.Success = true
res.Message = "解绑成功"
return
}
// SyncToRAGFlow 同步话术到RAGFlow
// 参数: ctx - 上下文speechcraftId - 话术IDaccountName - 客服账号名tenantId - 租户ID
// 返回: documentId - RAGFlow文档IDerr - 错误信息
// 功能: 将话术上传到指定客服账号的RAGFlow知识库失败时自动加入重试队列
func (s *speechcraft) SyncToRAGFlow(ctx context.Context, speechcraftId, accountName, tenantId string) (documentId string, err error) {
// 1. 查询话术
speechcraft, err := dao.Speechcraft.GetById(ctx, speechcraftId)
if err != nil {
return "", gerror.Wrapf(err, "查询话术失败")
}
if speechcraft == nil {
return "", gerror.New("话术不存在")
}
// 2. 确保知识库存在获取真实的datasetId
datasetId, err := s.ensureDatasetExists(ctx, tenantId, "话术")
if err != nil {
return "", gerror.Wrapf(err, "确保知识库存在失败")
}
// 3. 调用RAGFlow上传文档
ragflowClient := ragflow.GetGlobalClient()
filename := fmt.Sprintf("%s_%s_%s.txt", speechcraft.Direction, speechcraft.Tag, accountName)
documentId, err = ragflowClient.UploadDocumentFromText(ctx, datasetId, speechcraft.Content, filename)
if err != nil {
jaeger.RecordError(ctx, err, "话术上传RAGFlow失败")
return "", gerror.Wrap(err, "话术上传RAGFlow失败")
}
// 3.1 上传成功后立即调用解析接口
g.Log().Infof(ctx, "文档上传成功,开始解析: document_id=%s", documentId)
if err = ragflowClient.ParseDocuments(ctx, datasetId, []string{documentId}); err != nil {
// 解析失败只记录日志,不影响绑定流程(文档已上传,可以手动重试解析)
g.Log().Errorf(ctx, "文档解析失败: document_id=%s, error=%v", documentId, err)
jaeger.RecordError(ctx, err, "文档解析失败")
} else {
g.Log().Infof(ctx, "文档解析请求已发送: document_id=%s", documentId)
}
// 4. 更新MongoDB的RagSyncRecord
now := gtime.Now().Format("Y-m-d H:i:s")
updated := false
for i := range speechcraft.RagSyncRecords {
record := &speechcraft.RagSyncRecords[i]
if record.AccountName == accountName {
record.RagDocumentId = documentId
record.RagSyncStatus = "synced"
record.SyncTime = now
record.RetryCount = 0
updated = true
break
}
}
// 如果没有找到记录,新增
if !updated {
speechcraft.RagSyncRecords = append(speechcraft.RagSyncRecords, entity.RagSyncRecord{
AccountName: accountName,
RagDocumentId: documentId,
RagSyncStatus: "synced",
SyncTime: now,
RetryCount: 0,
})
}
if err = dao.Speechcraft.UpdateEntity(ctx, speechcraft); err != nil {
return "", gerror.Wrapf(err, "更新话术同步状态失败")
}
// 注意不再更新Chat的datasetIds因为创建Chat时已经绑定了知识库
// 话术文档上传到知识库后Chat会自动使用该知识库的内容
glog.Infof(ctx, "话术同步成功: speechcraft_id=%s, account_name=%s, document_id=%s", speechcraftId, accountName, documentId)
return documentId, nil
}
// ensureDatasetExists 已废弃,改用公共方法 EnsureTenantDataset
// 保留此方法仅为兼容性,直接调用公共方法
func (s *speechcraft) ensureDatasetExists(ctx context.Context, tenantId, datasetType string) (datasetId string, err error) {
return EnsureTenantDataset(ctx, tenantId)
}