feat: 支持多租户多模型对话及文档去重优化

This commit is contained in:
2026-04-16 15:47:37 +08:00
parent 4ead3f82cf
commit 27b1dd3c27
34 changed files with 2188 additions and 315 deletions

View File

@@ -7,6 +7,7 @@ import (
"rag/common/eino"
"rag/consts/document"
"rag/consts/keyword"
"rag/consts/model"
"rag/consts/public"
"rag/consts/task"
"rag/dao"
@@ -22,10 +23,8 @@ import (
"github.com/bjang03/gmq/mq"
"github.com/bjang03/gmq/types"
"github.com/cloudwego/eino/schema"
"github.com/gogf/gf/v2/container/gvar"
"github.com/gogf/gf/v2/crypto/gmd5"
"github.com/gogf/gf/v2/database/gdb"
"github.com/gogf/gf/v2/database/gredis"
"github.com/gogf/gf/v2/frame/g"
"github.com/gogf/gf/v2/os/grpool"
"github.com/gogf/gf/v2/util/gconv"
@@ -37,7 +36,35 @@ type documentService struct{}
// Create 创建文件
func (s *documentService) Create(ctx context.Context, req *dto.CreateDocumentReq) (res *dto.CreateDocumentRes, err error) {
err = gfdb.DB(ctx).Transaction(ctx, func(ctx context.Context, tx gdb.TX) (err error) {
err = gfdb.DB(ctx, public.DbNameKnowledge).Transaction(ctx, func(ctx context.Context, tx gdb.TX) (err error) {
doc, err := dao.Document.Get(ctx, &dto.GetDocumentReq{
DatasetId: req.DatasetId,
Title: req.Title,
})
if err != nil {
return
}
if !g.IsEmpty(doc) && doc.Id > 0 {
_, err = dao.Keyword.Delete(ctx, &dto.DeleteKeywordReq{
DocumentId: doc.Id,
})
if err != nil {
return err
}
_, err = dao.DocumentVector.Delete(ctx, &dto.DeleteDocumentVectorReq{
DocumentId: doc.Id,
})
if err != nil {
return err
}
_, err = dao.Document.Delete(ctx, &dto.DeleteDocumentReq{
Id: doc.Id,
})
if err != nil {
return err
}
}
var id int64
id, err = dao.Document.Insert(ctx, req)
if err != nil {
@@ -74,11 +101,11 @@ func (s *documentService) Update(ctx context.Context, req *dto.UpdateDocumentReq
// Delete 删除文件
func (s *documentService) Delete(ctx context.Context, req *dto.DeleteDocumentReq) (err error) {
docs, err := dao.Document.GetByID(ctx, &dto.GetDocumentReq{Id: req.Id})
docs, err := dao.Document.Get(ctx, &dto.GetDocumentReq{Id: req.Id})
if err != nil {
return
}
err = gfdb.DB(ctx).Transaction(ctx, func(ctx context.Context, tx gdb.TX) (err error) {
err = gfdb.DB(ctx, public.DbNameKnowledge).Transaction(ctx, func(ctx context.Context, tx gdb.TX) (err error) {
datasetReq := &dto.UpdateDatasetReq{
Id: docs.DatasetId,
DocumentCount: -1,
@@ -92,6 +119,18 @@ func (s *documentService) Delete(ctx context.Context, req *dto.DeleteDocumentReq
return
}
if _, err = dao.Keyword.Delete(ctx, &dto.DeleteKeywordReq{
DocumentId: docs.Id,
}); err != nil {
return err
}
if _, err = dao.DocumentVector.Delete(ctx, &dto.DeleteDocumentVectorReq{
DocumentId: docs.Id,
}); err != nil {
return err
}
if _, err = dao.Task.DeleteByTaskId(ctx, &dto.DeleteTaskByTaskIdReq{
TaskId: docs.Id,
}); err != nil {
@@ -106,7 +145,7 @@ func (s *documentService) Delete(ctx context.Context, req *dto.DeleteDocumentReq
// Get 获取文件详情
func (s *documentService) Get(ctx context.Context, req *dto.GetDocumentReq) (res *dto.GetDocumentRes, err error) {
r, err := dao.Document.GetByID(ctx, req)
r, err := dao.Document.Get(ctx, req)
if err != nil {
return
}
@@ -136,7 +175,7 @@ func (s *documentService) List(ctx context.Context, req *dto.ListDocumentReq) (r
func (s *documentService) Vector(ctx context.Context, req *dto.DocumentVectorReq) (err error) {
// 1. 查询文件信息
documentReq := dto.GetDocumentReq{Id: req.Id}
doc, err := dao.Document.GetByID(ctx, &documentReq)
doc, err := dao.Document.Get(ctx, &documentReq)
if err != nil {
return err
}
@@ -172,16 +211,13 @@ func (s *documentService) Vector(ctx context.Context, req *dto.DocumentVectorReq
if err != nil {
return err
}
// ======================
// 核心grpool + g.Try 最佳实践
// ======================
// 使用带超时的background context避免HTTP请求完成后context被取消
taskCtx, cancel := context.WithTimeout(context.Background(), 30*time.Minute)
taskCtx = context.WithValue(taskCtx, "user", user)
// 任务1: SQL 切分文档
// 任务1: 语义 切分文档
grpool.Add(taskCtx, func(ctx context.Context) {
g.TryCatch(ctx, func(ctx context.Context) {
if innerErr := s.sqlSplitDocument(ctx, doc); innerErr != nil {
if innerErr := s.semanticSplitDocument(ctx, doc); innerErr != nil {
cancel()
}
}, func(ctx context.Context, err error) {
@@ -189,10 +225,10 @@ func (s *documentService) Vector(ctx context.Context, req *dto.DocumentVectorReq
})
})
// 任务2: ES 切分文档
// 任务2: 递归 切分文档
grpool.Add(taskCtx, func(ctx context.Context) {
g.TryCatch(ctx, func(ctx context.Context) {
if innerErr := s.esSplitDocument(ctx, doc); innerErr != nil {
if innerErr := s.recursiveSplitDocument(ctx, doc); innerErr != nil {
cancel()
}
}, func(ctx context.Context, err error) {
@@ -327,8 +363,8 @@ func (s *documentService) extractDocument(ctx context.Context, doc *entity.Docum
return
}
// sqlSplitDocument SQL切分支持取消
func (s *documentService) sqlSplitDocument(ctx context.Context, doc *entity.Document) (err error) {
// semanticSplitDocument 语义切分
func (s *documentService) semanticSplitDocument(ctx context.Context, doc *entity.Document) (err error) {
// ========== 取消检查 1方法入口 ==========
if ctx.Err() != nil {
err = Task.WriteTaskProgress(ctx, &dto.WriteTaskProgressReq{
@@ -354,7 +390,7 @@ func (s *documentService) sqlSplitDocument(ctx context.Context, doc *entity.Docu
}
// 2. 语义切分文件
docsSplit, err := eino.SemanticSplitDocument(ctx, docs)
docsSplit, err := eino.SemanticSplitDocument(ctx, docs, model.ModelConfigTypeVectorDashScope.Code()) //TODO 后续替换成本地模型
if err != nil {
// 写入任务进度失败 任务类型为sql存储
err = Task.WriteTaskProgress(ctx, &dto.WriteTaskProgressReq{
@@ -394,8 +430,8 @@ func (s *documentService) sqlSplitDocument(ctx context.Context, doc *entity.Docu
}
contentHash := gmd5.MustEncryptString(t.Content)
var success bool
success, err = s.checkRepeat(ctx, public.KnowledgeContentHashSqlKey, contentHash)
var isNew, needCopy bool
isNew, needCopy, err = s.checkRepeatWithDocId(ctx, public.KnowledgeContentHashSqlKey, contentHash, doc.Id)
if err != nil {
// 写入任务进度失败 任务类型为sql存储
err = Task.WriteTaskProgress(ctx, &dto.WriteTaskProgressReq{
@@ -406,7 +442,7 @@ func (s *documentService) sqlSplitDocument(ctx context.Context, doc *entity.Docu
})
return
}
if !success {
if !isNew && !needCopy {
continue
}
var metaData = make(map[string]any)
@@ -415,7 +451,13 @@ func (s *documentService) sqlSplitDocument(ctx context.Context, doc *entity.Docu
metaData[entity.DocumentCol.DatasetId] = doc.DatasetId
metaData[entity.DocumentVectorCol.DocumentId] = doc.Id
metaData[entity.DocumentVectorCol.ContentHash] = contentHash
metaData[entity.DocumentVectorCol.ChunkIndex] = gconv.Int64(i)
metaData[entity.DocumentVectorCol.ChunkIndex] = gconv.Int64(i + 1)
if isNew {
metaData["isNew"] = true
}
if needCopy {
metaData["isNew"] = false
}
t.MetaData = metaData
docsChunk = append(docsChunk, t)
}
@@ -468,8 +510,8 @@ func (s *documentService) sqlSplitDocument(ctx context.Context, doc *entity.Docu
return
}
// esSplitDocument ES切分支持取消
func (s *documentService) esSplitDocument(ctx context.Context, doc *entity.Document) (err error) {
// recursiveSplitDocument 递归切分
func (s *documentService) recursiveSplitDocument(ctx context.Context, doc *entity.Document) (err error) {
// ========== 取消检查 1方法入口 ==========
if ctx.Err() != nil {
err = Task.WriteTaskProgress(ctx, &dto.WriteTaskProgressReq{
@@ -535,8 +577,8 @@ func (s *documentService) esSplitDocument(ctx context.Context, doc *entity.Docum
}
contentHash := gmd5.MustEncryptString(t.Content)
var success bool
success, err = s.checkRepeat(ctx, public.KnowledgeContentHashEsKey, contentHash)
var isNew, needCopy bool
isNew, needCopy, err = s.checkRepeatWithDocId(ctx, public.KnowledgeContentHashEsKey, contentHash, doc.Id)
if err != nil {
// 写入任务进度失败 任务类型为es存储
err = Task.WriteTaskProgress(ctx, &dto.WriteTaskProgressReq{
@@ -547,7 +589,7 @@ func (s *documentService) esSplitDocument(ctx context.Context, doc *entity.Docum
})
return
}
if !success {
if !isNew && !needCopy {
continue
}
meiliDocs = append(meiliDocs, map[string]interface{}{
@@ -556,7 +598,7 @@ func (s *documentService) esSplitDocument(ctx context.Context, doc *entity.Docum
entity.DocumentVectorCol.DocumentId: doc.Id,
entity.DocumentVectorCol.Content: t.Content,
entity.DocumentVectorCol.ContentHash: contentHash,
entity.DocumentVectorCol.ChunkIndex: i,
entity.DocumentVectorCol.ChunkIndex: i + 1,
})
}
@@ -632,6 +674,7 @@ func (s *documentService) getHistoryData(ctx context.Context, doc *entity.Docume
// 3. Redis 无数据:根据 contentKey 类型选择查询方式
var dictData = make([]*dto.DocumentVectorRPC, 0)
if public.KnowledgeContentHashSqlKey == contentKey {
// SQL 方式:调用 HTTP 接口查询
dictData, err = s.getHistoryDataFromHttp(ctx, doc)
@@ -643,20 +686,16 @@ func (s *documentService) getHistoryData(ctx context.Context, doc *entity.Docume
return err
}
// 4. 把查询到的数据写入 Redis600s过期
for _, item := range dictData {
// 去除可能的 JSON 引号
contentHash := strings.Trim(item.ContentHash, `"`)
key := fmt.Sprintf(contentKey, contentHash)
_, err = g.Redis().Set(ctx, key, true, gredis.SetOption{
TTLOption: gredis.TTLOption{
EX: gconv.PtrInt64(600),
},
NX: true,
})
// SAdd把文档ID加入集合自动去重可存多个
_, err = g.Redis().SAdd(ctx, key, item.DocumentId)
if err != nil {
return err
}
// 设置过期时间
_, _ = g.Redis().Expire(ctx, key, 600)
}
return nil
@@ -672,8 +711,10 @@ func (s *documentService) getHistoryDataFromHttp(ctx context.Context, doc *entit
// 调用接口获取数据
res, _, err := dao.DocumentVector.List(ctx, &dto.ListDocumentVectorReq{
DatasetId: doc.DatasetId,
Status: gconv.PtrInt8(1),
})
if err != nil {
return
}
err = gconv.Struct(res, &dictData)
return
}
@@ -705,17 +746,39 @@ func (s *documentService) getHistoryDataFromMeilisearch(ctx context.Context, doc
return
}
// checkRepeat 检查是否重复
func (s *documentService) checkRepeat(ctx context.Context, contentKey, contentHash string) (success bool, err error) {
var val *gvar.Var
if val, err = g.Redis().Set(ctx, fmt.Sprintf(contentKey, contentHash), true, gredis.SetOption{
TTLOption: gredis.TTLOption{
EX: gconv.PtrInt64(600),
},
NX: true,
}); err != nil {
return
// checkRepeatWithDocId 正确版:检查当前文档是否已存在该分片
// 返回isNew(是否需要生成向量)、isCrossDoc(是否跨文档需拷贝)、err
func (s *documentService) checkRepeatWithDocId(ctx context.Context, contentKey string, contentHash string, currentDocId int64) (isNew bool, needCopy bool, err error) {
key := fmt.Sprintf(contentKey, contentHash)
// 1. 检查当前文档ID是否在集合中
exists, err := g.Redis().SIsMember(ctx, key, currentDocId)
if err != nil {
return false, false, err
}
success = val.Bool()
return
// 情况1当前文档已存在 → 完全跳过,不生成、不拷贝
if !g.IsEmpty(exists) {
return false, false, nil
}
// 2. 检查 key 是否存在(是否有任何文档拥有该分片)
keyExists, err := g.Redis().Exists(ctx, key)
if err != nil {
return false, false, err
}
// 情况2key 不存在 = 全新数据 → 需要生成向量
if g.IsEmpty(keyExists) {
// 把当前文档ID加入集合
_, err = g.Redis().SAdd(ctx, key, currentDocId)
_, _ = g.Redis().Expire(ctx, key, 600)
return true, false, err
}
// 情况3key 存在,但当前文档不在集合中 = 跨文档重复 → 不生成,需拷贝
// 把当前文档ID加入集合记录归属关系
_, err = g.Redis().SAdd(ctx, key, currentDocId)
_, _ = g.Redis().Expire(ctx, key, 600)
return false, true, err
}