Files
customer-server/service/product_service.go

892 lines
27 KiB
Go
Raw Normal View History

2026-03-14 10:02:49 +08:00
// Package service - 产品服务
// 功能产品的增删改查、ZIP导入/导出、绑定/解绑客服账号、同步到RAGFlow、重试消费者
package service
import (
"archive/zip"
"bytes"
"context"
"customer-server/dao"
"customer-server/model/dto"
"customer-server/model/entity"
"customer-server/util"
"encoding/json"
"fmt"
"io"
"mime/multipart"
"strings"
"unicode/utf8"
"gitea.com/red-future/common/jaeger"
"gitea.com/red-future/common/rabbitmq"
"gitea.com/red-future/common/ragflow"
"gitea.com/red-future/common/utils"
"github.com/gogf/gf/v2/errors/gerror"
"github.com/gogf/gf/v2/frame/g"
"github.com/gogf/gf/v2/os/glog"
"github.com/gogf/gf/v2/os/grpool"
"github.com/gogf/gf/v2/os/gtime"
"github.com/gogf/gf/v2/util/gconv"
"go.mongodb.org/mongo-driver/v2/bson"
)
var (
Product = new(product)
productGrpool = grpool.New(50) // 文档解析协程池最大50并发
)
type product struct{}
// Add 添加产品
// 参数: ctx - 上下文req - 添加产品请求(包含产品名称、描述、价格等)
// 返回: res - 添加成功后的产品ID和RAGFlow文档IDerr - 错误信息
// 功能: 创建产品记录并自动上传到RAGFlow产品知识库独立于话术知识库
func (s *product) Add(ctx context.Context, req *dto.AddProductReq) (res *dto.AddProductRes, err error) {
// 校验产品名称长度
if utf8.RuneCountInString(req.Name) > 64 {
return nil, gerror.New("产品名称必须在64字以内")
}
// 校验产品详情长度
if utf8.RuneCountInString(req.Description) > 8192 {
return nil, gerror.New("产品详情必须在8192字以内")
}
// 去重检查:同一租户下名称唯一
existing, err := dao.Product.FindByName(ctx, req.Name)
if err != nil {
return nil, gerror.Wrap(err, "检查产品重复失败")
}
if existing != nil {
return nil, gerror.Newf("产品名称已存在name=%s, id=%s", req.Name, existing.Id.Hex())
}
data := &entity.Product{}
if err = utils.Struct(req, data); err != nil {
return
}
// 先从token获取租户信息在Insert之前
user, err := util.GetTenantInfo(ctx)
if err != nil {
return nil, gerror.Wrap(err, "获取租户信息失败")
}
tenantId := gconv.String(user.TenantId)
if tenantId == "" {
return nil, gerror.New("租户ID为空")
}
// 确保产品知识库存在(检查话术知识库是否存在,作为租户是否初始化的标志)
speechcraftDatasetId, err := dao.RAGFlowConfig.FindDatasetIdByTenant(ctx, tenantId)
if err != nil || speechcraftDatasetId == "" {
return nil, gerror.Newf("租户知识库不存在,请先创建客服账号: tenant_id=%s", tenantId)
}
// 设置基础字段
now := gtime.Now().Time
data.CreatedAt = &now // 取地址赋值给指针类型
data.UpdatedAt = &now // 取地址赋值给指针类型
data.IsDeleted = false
// 统一使用string类型存储tenantId到MongoDB
data.TenantId = tenantId
// 插入产品到MongoDB
_, err = dao.Product.Insert(ctx, data)
if err != nil {
return nil, err
}
// 确保租户知识库存在(产品和话术共享租户知识库)
// 使用dataset_service提供的统一方法自动处理创建、查找、保存等逻辑
datasetId, err := EnsureTenantDataset(ctx, tenantId)
if err != nil {
g.Log().Errorf(ctx, "确保租户知识库失败: %v", err)
return nil, gerror.Wrap(err, "获取租户知识库失败")
}
g.Log().Infof(ctx, "租户%s的知识库ID: %s", tenantId, datasetId)
// 同步上传到RAGFlow
ragflowClient := ragflow.GetGlobalClient()
g.Log().Infof(ctx, "准备上传产品到RAGFlow: product_id=%s, dataset_id=%s, name=%s", data.Id.Hex(), datasetId, data.Name)
filename := fmt.Sprintf("%s.txt", data.Name)
documentId, err := ragflowClient.UploadDocumentFromText(ctx, datasetId, data.Description, filename)
if err != nil {
// 回滚:删除刚插入的产品
dao.MongoDAO.Delete(ctx, bson.M{"_id": data.Id}, entity.ProductCollection)
g.Log().Errorf(ctx, "产品上传RAGFlow失败: product_id=%s, dataset_id=%s, error=%v", data.Id.Hex(), datasetId, err)
jaeger.RecordError(ctx, err, "产品上传RAGFlow失败")
return nil, gerror.Wrap(err, "文档上传到知识库失败")
}
// 异步触发解析grpool自动管理goroutine生命周期WithoutCancel保留追踪避免取消
productGrpool.Add(ctx, func(ctx context.Context) {
parseCtx := context.WithoutCancel(ctx)
if err := ragflowClient.ParseDocuments(parseCtx, datasetId, []string{documentId}); err != nil {
g.Log().Errorf(parseCtx, "文档解析失败: document_id=%s, error=%v", documentId, err)
} else {
g.Log().Infof(parseCtx, "文档解析成功: document_id=%s", documentId)
}
})
// 更新MongoDB的RagSyncRecords数组使用空accountName表示租户级文档
syncTime := gtime.Now().Format("Y-m-d H:i:s")
record := entity.RagSyncRecord{
AccountName: "", // 空表示租户级文档
RagDocumentId: documentId,
RagSyncStatus: "synced",
SyncTime: syncTime,
RetryCount: 0,
}
filter := bson.M{"_id": data.Id}
update := bson.M{
"$set": bson.M{
"ragSyncRecords": []entity.RagSyncRecord{record},
"ragLastSyncTime": syncTime,
"updatedAt": gtime.Now().Time,
},
}
if _, _, err = dao.MongoDAO.UpdateOne(ctx, filter, update, entity.ProductCollection); err != nil {
g.Log().Errorf(ctx, "更新产品RagSyncRecords失败: %v", err)
// 不回滚,文档已上传成功
}
g.Log().Infof(ctx, "产品添加成功并上传到知识库: product_id=%s, document_id=%s", data.Id.Hex(), documentId)
res = &dto.AddProductRes{Id: data.Id.Hex()}
return
}
// Update 更新产品
// 参数: ctx - 上下文req - 更新产品请求包含产品ID和待更新字段
// 返回: err - 错误信息
// 功能: 更新产品信息并同步到RAGFlow支持文档删除重建
func (s *product) Update(ctx context.Context, req *dto.UpdateProductReq) (err error) {
// 如果更新了产品名称,校验长度
if req.Name != "" && utf8.RuneCountInString(req.Name) > 64 {
return gerror.New("产品名称必须在64字以内")
}
// 如果更新了产品详情,校验长度
if req.Description != "" && utf8.RuneCountInString(req.Description) > 8192 {
return gerror.New("产品详情必须在8192字以内")
}
// 去重检查:如果修改名称,检查是否与其他产品重复
if req.Name != "" {
existing, err := dao.Product.FindByName(ctx, req.Name)
if err != nil {
return gerror.Wrap(err, "检查产品重复失败")
}
if existing != nil && existing.Id.Hex() != req.Id {
return gerror.Newf("产品名称已存在name=%s, id=%s", req.Name, existing.Id.Hex())
}
}
return dao.Product.Update(ctx, req)
}
// Delete 删除产品
// 参数: ctx - 上下文req - 删除产品请求包含产品ID
// 返回: err - 错误信息
// 功能: 逻辑删除产品记录并从RAGFlow移除对应文档
func (s *product) Delete(ctx context.Context, req *dto.DeleteProductReq) (err error) {
g.Log().Infof(ctx, "[Delete] 开始删除产品 - productId: %s", req.Id)
// 1. 查询产品获取RAGFlow同步记录使用原生查询避免租户过滤
objectId, err := bson.ObjectIDFromHex(req.Id)
if err != nil {
return gerror.Wrap(err, "无效的产品ID")
}
var product entity.Product
filter := bson.M{"_id": objectId, "isDeleted": false}
err = dao.MongoDAO.FindOne(ctx, filter, &product, entity.ProductCollection)
if err != nil {
if err.Error() == "mongo: no documents in result" {
return gerror.New("产品不存在")
}
return gerror.Wrap(err, "查询产品失败")
}
g.Log().Infof(ctx, "[Delete] 查询到产品 - name: %s, ragSyncRecords数量: %d", product.Name, len(product.RagSyncRecords))
// 2. 删除RAGFlow中的文档
if len(product.RagSyncRecords) > 0 {
ragflowClient := ragflow.GetGlobalClient()
if ragflowClient != nil {
tenantId := gconv.String(product.TenantId)
// 查询租户的dataset_id
datasetId, err := dao.RAGFlowConfig.FindDatasetIdByTenant(ctx, tenantId)
if err != nil {
g.Log().Warningf(ctx, "查询租户知识库ID失败: %v", err)
} else if datasetId != "" {
// 收集所有需要删除的document_id
var documentIds []string
for _, record := range product.RagSyncRecords {
if record.RagDocumentId != "" {
documentIds = append(documentIds, record.RagDocumentId)
}
}
// 批量删除RAGFlow文档
if len(documentIds) > 0 {
if err := ragflowClient.DeleteDocument(ctx, datasetId, documentIds); err != nil {
g.Log().Errorf(ctx, "删除RAGFlow文档失败: %v, document_ids: %v", err, documentIds)
// 不阻断删除流程,记录错误后继续
} else {
g.Log().Infof(ctx, "成功删除RAGFlow文档: count=%d", len(documentIds))
}
}
}
}
}
// 3. 软删除MongoDB记录
return dao.Product.Delete(ctx, req)
}
// List 获取产品列表
// 参数: ctx - 上下文req - 列表查询请求(支持分页、关键词搜索)
// 返回: res - 产品列表及分页信息err - 错误信息
// 功能: 分页查询产品记录,支持按名称、描述模糊搜索
func (s *product) List(ctx context.Context, req *dto.ListProductReq) (res *dto.ListProductRes, err error) {
list, total, err := dao.Product.List(ctx, req)
if err != nil {
return
}
res = &dto.ListProductRes{
List: list,
Total: int(total),
}
return
}
// Export 导出产品为ZIP文件
// 参数: ctx - 上下文req - 导出请求(包含筛选条件)
// 返回: zipData - ZIP文件字节数组filename - 文件名err - 错误信息
// 功能: 将产品数据导出为ZIP文件包含JSON格式的产品列表
func (s *product) Export(ctx context.Context, req *dto.ExportProductReq) (zipData []byte, filename string, err error) {
// 清理输入参数,防止非法 UTF-8 字符
cleanName := strings.ToValidUTF8(req.Name, "")
// 1. 查询所有符合条件的产品
products, err := dao.Product.FindAllForExport(ctx, cleanName)
if err != nil {
return nil, "", err
}
if len(products) == 0 {
return nil, "", gerror.New("没有可导出的产品")
}
// 清理所有产品数据,确保 UTF-8 有效(防止数据库中的脏数据)
for i := range products {
products[i].Name = strings.ToValidUTF8(products[i].Name, "")
products[i].Description = strings.ToValidUTF8(products[i].Description, "")
// RagDocumentId字段在RagSyncRecords中不在Product主体
}
// 2. 创建 ZIP 文件(内存中)
var buf bytes.Buffer
zipWriter := zip.NewWriter(&buf)
defer zipWriter.Close()
// 3. 为每个产品生成 TXT 文件并添加到 ZIP
for _, product := range products {
// 生成 TXT 内容(产品详情)
txtContent := s.generateTxt(product)
// 文件名就是产品名称(清理特殊字符)
cleanName := strings.ToValidUTF8(product.Name, "未命名")
safeFilename := s.sanitizeFilename(cleanName)
if safeFilename == "" {
safeFilename = "product"
}
txtFilename := safeFilename + ".txt"
// 添加文件到 ZIP
writer, err := zipWriter.Create(txtFilename)
if err != nil {
return nil, "", gerror.Newf("创建ZIP文件失败: %v", err)
}
if _, err := writer.Write([]byte(txtContent)); err != nil {
return nil, "", gerror.Newf("写入ZIP文件失败: %v", err)
}
}
// 5. 生成下载文件名
timestamp := gtime.Now().Format("Ymd_His")
filename = "products_export_" + timestamp + ".zip"
return buf.Bytes(), filename, nil
}
// generateTxt 将产品转换为 TXT 格式
// 新格式:文件名=产品名称,内容=产品详情
func (s *product) generateTxt(product *entity.Product) string {
// 清理产品详情,确保 UTF-8 有效
cleanDescription := strings.ToValidUTF8(product.Description, "")
// 直接返回产品详情
if cleanDescription != "" {
return cleanDescription
}
// 如果没有详情,返回空字符串
return ""
}
// sanitizeFilename 清理文件名中的特殊字符
func (s *product) sanitizeFilename(name string) string {
// 替换不安全的文件名字符
replacer := map[rune]rune{
'/': '_',
'\\': '_',
':': '_',
'*': '_',
'?': '_',
'"': '_',
'<': '_',
'>': '_',
'|': '_',
}
// 预分配容量,避免循环中动态扩容
result := make([]rune, 0, len(name))
for _, char := range name {
if newChar, exists := replacer[char]; exists {
result = append(result, newChar)
} else {
result = append(result, char)
}
}
filename := string(result)
// 限制文件名长度
if utf8.RuneCountInString(filename) > 50 {
runes := []rune(filename)
filename = string(runes[:50])
}
return filename
}
// Import 从ZIP文件导入产品
// 参数: ctx - 上下文file - 上传的ZIP文件
// 返回: res - 导入结果成功和失败数量err - 错误信息
// 功能: 从ZIP文件批量导入产品数据并同步到RAGFlow失败记录加入重试队列
func (s *product) Import(ctx context.Context, file *multipart.FileHeader) (res *dto.ImportProductRes, err error) {
res = &dto.ImportProductRes{
SuccessCount: 0,
FailCount: 0,
FailReasons: []string{},
}
// 1. 获取租户信息
user, err := util.GetTenantInfo(ctx)
if err != nil {
return nil, gerror.Wrap(err, "获取租户信息失败")
}
tenantId := gconv.String(user.TenantId)
if tenantId == "" {
return nil, gerror.New("租户ID为空")
}
// 2. 打开上传的文件
uploadedFile, err := file.Open()
if err != nil {
return nil, gerror.Newf("无法打开上传的文件: %v", err)
}
defer uploadedFile.Close()
// 3. 读取文件内容到内存
fileData, err := io.ReadAll(uploadedFile)
if err != nil {
return nil, gerror.Newf("读取文件失败: %v", err)
}
// 4. 解析 ZIP 文件
zipReader, err := zip.NewReader(bytes.NewReader(fileData), int64(len(fileData)))
if err != nil {
return nil, gerror.Newf("无法解析ZIP文件: %v", err)
}
// 4. 遍历 ZIP 中的所有文件
for _, zipFile := range zipReader.File {
// 只处理 .txt 文件
if !strings.HasSuffix(strings.ToLower(zipFile.Name), ".txt") {
continue
}
// 读取 TXT 文件内容(产品详情)
txtContent, err := s.readZipFile(zipFile)
if err != nil {
res.FailCount++
res.FailReasons = append(res.FailReasons, "文件 "+zipFile.Name+" 读取失败: "+err.Error())
continue
}
// 从文件名提取产品名称(移除 .txt 后缀)
productName := strings.TrimSuffix(zipFile.Name, ".txt")
productName = strings.TrimSpace(productName)
// 创建产品数据
productData, err := s.parseSimpleTxt(productName, txtContent)
if err != nil {
res.FailCount++
res.FailReasons = append(res.FailReasons, "文件 "+zipFile.Name+" 解析失败: "+err.Error())
continue
}
// 校验产品名称和详情长度
if utf8.RuneCountInString(productData.Name) > 64 {
res.FailCount++
res.FailReasons = append(res.FailReasons, "文件 "+zipFile.Name+": 产品名称超过64字")
continue
}
if utf8.RuneCountInString(productData.Description) > 8192 {
res.FailCount++
res.FailReasons = append(res.FailReasons, "文件 "+zipFile.Name+": 产品详情超过8192字")
continue
}
// 设置基础字段
now := gtime.Now().Time
productData.CreatedAt = &now // 取地址赋值给指针类型
productData.UpdatedAt = &now // 取地址赋值给指针类型
productData.IsDeleted = false
// 统一使用string类型存储tenantId到MongoDB
productData.TenantId = tenantId
// 插入数据库
_, err = dao.Product.Insert(ctx, productData)
if err != nil {
res.FailCount++
res.FailReasons = append(res.FailReasons, "文件 "+zipFile.Name+" 数据库插入失败: "+err.Error())
continue
}
// 同步上传到RAGFlow产品知识库使用外层已声明的tenantId变量
if tenantId != "" {
datasetId := fmt.Sprintf("dataset_product_tenant_%s", tenantId)
ragflowClient := ragflow.GetGlobalClient()
if ragflowClient != nil {
filename := fmt.Sprintf("%s.txt", productData.Name)
documentId, uploadErr := ragflowClient.UploadDocumentFromText(ctx, datasetId, productData.Description, filename)
if uploadErr != nil {
// 上传失败回滚删除MongoDB记录
dao.MongoDAO.Delete(ctx, bson.M{"_id": productData.Id}, entity.ProductCollection)
res.FailCount++
res.FailReasons = append(res.FailReasons, "文件 "+zipFile.Name+" 上传知识库失败: "+uploadErr.Error())
continue
}
// 更新ragDocumentId
filter := bson.M{"_id": productData.Id}
update := bson.M{"$set": bson.M{"ragDocumentId": documentId}}
dao.MongoDAO.UpdateOne(ctx, filter, update, entity.ProductCollection)
g.Log().Infof(ctx, "ZIP产品上传成功: name=%s, document_id=%s", productData.Name, documentId)
}
}
res.SuccessCount++
}
return res, nil
}
// readZipFile 读取 ZIP 文件中的单个文件内容
func (s *product) readZipFile(file *zip.File) (string, error) {
reader, err := file.Open()
if err != nil {
return "", err
}
defer reader.Close()
content, err := io.ReadAll(reader)
if err != nil {
return "", err
}
return string(content), nil
}
// parseSimpleTxt 解析简化格式的 TXT 文件
// 文件名=产品名称,内容=产品详情
func (s *product) parseSimpleTxt(productName string, description string) (*entity.Product, error) {
product := &entity.Product{}
// 清理并验证产品名称
product.Name = strings.TrimSpace(strings.ToValidUTF8(productName, ""))
if product.Name == "" {
return nil, gerror.New("产品名称不能为空")
}
// 清理产品详情
product.Description = strings.TrimSpace(strings.ToValidUTF8(description, ""))
return product, nil
}
// parseTxt 解析 TXT 文件内容为产品实体(旧格式兼容)
func (s *product) parseTxt(content string) (*entity.Product, error) {
product := &entity.Product{}
lines := strings.Split(content, "\n")
var inDescription bool
var descriptionLines []string
for _, line := range lines {
line = strings.TrimSpace(line)
// 跳过空行
if line == "" {
continue
}
// 解析标题
if strings.HasPrefix(line, "=== ") && strings.HasSuffix(line, " ===") {
product.Name = strings.TrimSpace(line[4 : len(line)-4])
continue
}
// 跳过基本信息标记
if line == "【基本信息】" {
inDescription = false
continue
}
// 产品详情开始
if line == "【产品详情】" {
inDescription = true
continue
}
// 解析基本信息字段
if !inDescription {
// 跳过系统生成的字段产品ID、创建时间等
if strings.HasPrefix(line, "产品ID:") ||
strings.HasPrefix(line, "创建时间:") ||
strings.HasPrefix(line, "更新时间:") ||
strings.HasPrefix(line, "导出时间:") {
continue
}
// 解析 RAGFlow 文档 ID
if strings.HasPrefix(line, "RAGFlow文档ID:") {
// RAGFlow文档ID存储在RagSyncRecords中
continue
}
}
// 收集产品详情
if inDescription {
// 跳过分隔线和导出时间
if strings.Contains(line, "==================") || strings.HasPrefix(line, "导出时间:") {
continue
}
if line != "暂无产品详情" {
descriptionLines = append(descriptionLines, line)
}
}
}
// 拼接产品详情
product.Description = strings.Join(descriptionLines, "\n")
// 校验必填字段
if product.Name == "" {
return nil, gerror.New("产品名称不能为空")
}
return product, nil
}
// BindToCustomerServices 绑定产品到多个客服账号
func (p *product) BindToCustomerServices(ctx context.Context, req *dto.BindProductReq) (res *dto.BindProductRes, err error) {
res = &dto.BindProductRes{}
// 1. 查询产品
product, err := dao.Product.GetById(ctx, req.ProductId)
if err != nil {
return nil, gerror.Wrapf(err, "查询产品失败")
}
if product == nil {
return nil, gerror.New("产品不存在")
}
// 2. 构建已存在的绑定map去重
existingMap := make(map[string]bool)
for _, csId := range product.AccountNames {
existingMap[csId] = true
}
// 3. 过滤并添加新绑定
var newBindings []string
var failedIds []string
for _, csId := range req.AccountNames {
// 检查去重customer_service_id是否已存在
if existingMap[csId] {
failedIds = append(failedIds, csId)
g.Log().Warningf(ctx, "客服账号 %s 已绑定该产品,跳过", csId)
continue
}
// 验证客服账号是否存在
csAccount, err := dao.CustomerServiceAccount.FindByAccountName(ctx, csId)
if err != nil || csAccount == nil {
failedIds = append(failedIds, csId)
g.Log().Warningf(ctx, "客服账号 %s 不存在或已删除,跳过", csId)
continue
}
newBindings = append(newBindings, csId)
}
// 4. 如果没有新的绑定,直接返回
if len(newBindings) == 0 {
res.SuccessCount = 0
res.FailedIds = failedIds
res.Message = "所有客服账号均已绑定或不存在"
return res, nil
}
// 5. 更新产品绑定
product.AccountNames = append(product.AccountNames, newBindings...)
if err = dao.Product.UpdateEntity(ctx, product); err != nil {
return nil, gerror.Wrapf(err, "更新产品绑定失败")
}
// 6. 同步到RAGFlow自动创建知识库
for _, csId := range newBindings {
// 获取客服账号信息以获取tenant_id
csAccount, err := dao.CustomerServiceAccount.FindByAccountName(ctx, csId)
if err != nil || csAccount == nil {
g.Log().Errorf(ctx, "获取客服账号信息失败: %s", csId)
continue
}
// 同步到RAGFlow会自动创建知识库
tenantId := gconv.String(csAccount.TenantId)
_, err = p.SyncToRAGFlow(ctx, req.ProductId, csId, tenantId)
if err != nil {
g.Log().Errorf(ctx, "同步到RAGFlow失败: product_id=%s, cs_id=%s, error=%v", req.ProductId, csId, err)
// 不阻断绑定流程,失败会进入重试队列
}
}
res.SuccessCount = len(newBindings)
res.FailedIds = failedIds
res.Message = "绑定成功"
return
}
// UnbindFromCustomerService 从客服账号解绑产品
func (p *product) UnbindFromCustomerService(ctx context.Context, req *dto.UnbindProductReq) (res *dto.UnbindProductRes, err error) {
res = &dto.UnbindProductRes{}
product, err := dao.Product.GetById(ctx, req.ProductId)
if err != nil {
return nil, gerror.Wrapf(err, "查询产品失败")
}
if product == nil {
return nil, gerror.New("产品不存在")
}
// 查找并移除绑定
var newBindings []string
found := false
for _, csId := range product.AccountNames {
if csId == req.AccountName {
found = true
continue
}
newBindings = append(newBindings, csId)
}
if !found {
res.Success = false
res.Message = "未找到该绑定关系"
return res, nil
}
product.AccountNames = newBindings
if err = dao.Product.UpdateEntity(ctx, product); err != nil {
return nil, gerror.Wrapf(err, "解绑失败")
}
res.Success = true
res.Message = "解绑成功"
return
}
// SyncToRAGFlow 同步产品到RAGFlow租户级知识库
func (p *product) SyncToRAGFlow(ctx context.Context, productId, accountName, tenantId string) (documentId string, err error) {
// 1. 查询产品
product, err := dao.Product.GetById(ctx, productId)
if err != nil {
return "", gerror.Wrapf(err, "查询产品失败")
}
if product == nil {
return "", gerror.New("产品不存在")
}
// 2. 获取租户的产品知识库ID
datasetId := fmt.Sprintf("dataset_product_tenant_%s", tenantId)
// 2.1 确保知识库存在,不存在则自动创建
if err := p.ensureDatasetExists(ctx, datasetId, tenantId, "产品"); err != nil {
return "", gerror.Wrapf(err, "确保知识库存在失败")
}
// 3. 调用RAGFlow上传文档
ragflowClient := ragflow.GetGlobalClient()
filename := fmt.Sprintf("%s_%s.txt", product.Name, accountName)
documentId, err = ragflowClient.UploadDocumentFromText(ctx, datasetId, product.Description, filename)
if err != nil {
jaeger.RecordError(ctx, err, "产品上传RAGFlow失败")
p.sendToRetryQueue(ctx, productId, accountName, tenantId, 0)
return "", err
}
// 4. 更新MongoDB的RagSyncRecord
now := gtime.Now().Format("Y-m-d H:i:s")
updated := false
for i := range product.RagSyncRecords {
record := &product.RagSyncRecords[i]
if record.AccountName == accountName {
record.RagDocumentId = documentId
record.RagSyncStatus = "synced"
record.SyncTime = now
record.RetryCount = 0
updated = true
break
}
}
if !updated {
product.RagSyncRecords = append(product.RagSyncRecords, entity.RagSyncRecord{
AccountName: accountName,
RagDocumentId: documentId,
RagSyncStatus: "synced",
SyncTime: now,
RetryCount: 0,
})
}
product.RagLastSyncTime = now
if err = dao.Product.UpdateEntity(ctx, product); err != nil {
return "", gerror.Wrapf(err, "更新产品同步状态失败")
}
glog.Infof(ctx, "产品同步成功: product_id=%s, account_name=%s, document_id=%s", productId, accountName, documentId)
return documentId, nil
}
// ensureDatasetExists 已废弃,改用公共方法 EnsureTenantDataset
// 保留此方法仅为兼容性,直接调用公共方法
func (p *product) ensureDatasetExists(ctx context.Context, datasetId, tenantId, datasetType string) error {
_, err := EnsureTenantDataset(ctx, tenantId)
return err
}
// sendToRetryQueue 发送到重试队列
func (p *product) sendToRetryQueue(ctx context.Context, productId, accountName, tenantId string, retryCount int) {
msg := dto.RAGFlowSyncRetryMsg{
Type: "product",
Id: productId,
AccountName: accountName,
TenantId: tenantId,
RetryCount: retryCount,
}
var delay int
switch retryCount {
case 0:
delay = 5 * 60
case 1:
delay = 15 * 60
case 2:
delay = 60 * 60
default:
glog.Warningf(ctx, "产品同步重试次数超限,标记为失败: %s", productId)
p.markSyncFailed(ctx, productId, accountName)
return
}
if err := rabbitmq.PublishWithDelay(ctx, "ragflow.sync.retry.product", msg, delay); err != nil {
jaeger.RecordError(ctx, err, "发送RAGFlow重试消息失败")
}
}
// markSyncFailed 标记同步失败
func (p *product) markSyncFailed(ctx context.Context, productId, accountName string) {
product, err := dao.Product.GetById(ctx, productId)
if err != nil {
return
}
for i := range product.RagSyncRecords {
record := &product.RagSyncRecords[i]
if record.AccountName == accountName {
record.RagSyncStatus = "failed"
record.SyncTime = gtime.Now().Format("Y-m-d H:i:s")
break
}
}
dao.Product.UpdateEntity(ctx, product)
}
// HandleRAGFlowSyncRetry RAGFlow同步重试消费者
func (p *product) HandleRAGFlowSyncRetry(ctx context.Context, msg dto.RAGFlowSyncRetryMsg) error {
glog.Infof(ctx, "处理RAGFlow同步重试: type=%s, id=%s, retry=%d", msg.Type, msg.Id, msg.RetryCount)
if msg.Type != "product" {
return nil
}
_, err := p.SyncToRAGFlow(ctx, msg.Id, msg.AccountName, msg.TenantId)
if err != nil {
p.sendToRetryQueue(ctx, msg.Id, msg.AccountName, msg.TenantId, msg.RetryCount+1)
return err
}
return nil
}
// ProductRetryConsumer 产品RAGFlow重试消费者
type ProductRetryConsumer struct {
queueName string
consumer *rabbitmq.Consumer
}
// NewProductRetryConsumer 创建产品RAGFlow重试消费者
func NewProductRetryConsumer(ctx context.Context) *ProductRetryConsumer {
return &ProductRetryConsumer{
queueName: "ragflow.sync.retry.product",
}
}
// Start 启动消费者
func (c *ProductRetryConsumer) Start(ctx context.Context) error {
c.consumer = rabbitmq.NewConsumer(c.queueName, func(ctx context.Context, body []byte) error {
var msg dto.RAGFlowSyncRetryMsg
if err := json.Unmarshal(body, &msg); err != nil {
return err
}
return Product.HandleRAGFlowSyncRetry(ctx, msg)
})
return c.consumer.Start(ctx)
}
// Stop 停止消费者
func (c *ProductRetryConsumer) Stop(ctx context.Context) {
if c.consumer != nil {
c.consumer.Stop(ctx)
}
}