177 lines
5.2 KiB
Go
177 lines
5.2 KiB
Go
|
|
package service
|
|||
|
|
|
|||
|
|
import (
|
|||
|
|
"context"
|
|||
|
|
"database/sql"
|
|||
|
|
"errors"
|
|||
|
|
"fmt"
|
|||
|
|
"rag/consts/document"
|
|||
|
|
"rag/consts/public"
|
|||
|
|
"rag/dao"
|
|||
|
|
"rag/model/dto"
|
|||
|
|
"rag/model/entity"
|
|||
|
|
|
|||
|
|
"gitea.com/red-future/common/beans"
|
|||
|
|
"gitea.com/red-future/common/rag/eino"
|
|||
|
|
gmq "github.com/bjang03/gmq/core/gmq"
|
|||
|
|
"github.com/bjang03/gmq/mq"
|
|||
|
|
"github.com/bjang03/gmq/types"
|
|||
|
|
"github.com/gogf/gf/v2/frame/g"
|
|||
|
|
"github.com/gogf/gf/v2/util/gconv"
|
|||
|
|
"github.com/pgvector/pgvector-go"
|
|||
|
|
)
|
|||
|
|
|
|||
|
|
// documentChunkService groups the document-chunk operations defined in this
// file. It has no fields.
type documentChunkService struct{}

// DocumentChunk is the package-level instance through which the
// documentChunkService methods are called.
var DocumentChunk = &documentChunkService{}

// DatasetIndexStatusReady is the string "ready".
// NOTE(review): nothing in this file references it; presumably it names a
// dataset-index status used elsewhere in the package — confirm against callers.
const DatasetIndexStatusReady = "ready"
|
|||
|
|
|
|||
|
|
// Update 更新文件块
|
|||
|
|
func (s *documentChunkService) Update(ctx context.Context, req *dto.UpdateDocumentChunkReq) (err error) {
|
|||
|
|
_, err = dao.DocumentChunk.Update(ctx, req)
|
|||
|
|
return
|
|||
|
|
}
|
|||
|
|
|
|||
|
|
// List 获取文件块列表
|
|||
|
|
func (s *documentChunkService) List(ctx context.Context, req *dto.ListDocumentChunkReq) (res *dto.ListDocumentChunkRes, err error) {
|
|||
|
|
list, total, err := dao.DocumentChunk.List(ctx, req)
|
|||
|
|
if err != nil {
|
|||
|
|
return
|
|||
|
|
}
|
|||
|
|
res = &dto.ListDocumentChunkRes{
|
|||
|
|
Total: total,
|
|||
|
|
}
|
|||
|
|
err = gconv.Struct(list, &res.List)
|
|||
|
|
return
|
|||
|
|
}
|
|||
|
|
|
|||
|
|
func (s *documentChunkService) DocsChunkMsg(ctx context.Context, msg any) (err error) {
|
|||
|
|
var req = make([]*dto.VectorDocumentChunkMsg, 0)
|
|||
|
|
msgMap := gconv.Map(msg)
|
|||
|
|
if err = gconv.Structs(msgMap["data"], &req); err != nil {
|
|||
|
|
g.Log().Error(ctx, "DocsChunkMsg err:", err)
|
|||
|
|
return
|
|||
|
|
}
|
|||
|
|
if len(req) == 0 {
|
|||
|
|
g.Log().Error(ctx, "DocsChunkMsg err:", "msg is empty")
|
|||
|
|
return
|
|||
|
|
}
|
|||
|
|
|
|||
|
|
ctx = context.WithValue(ctx, "user", &beans.User{
|
|||
|
|
TenantId: req[0].TenantId,
|
|||
|
|
UserName: req[0].Creator,
|
|||
|
|
})
|
|||
|
|
|
|||
|
|
// 调用eino接口获取向量
|
|||
|
|
var vectorDocsStr = make([]string, 0, len(req))
|
|||
|
|
for _, t := range req {
|
|||
|
|
vectorDocsStr = append(vectorDocsStr, t.Content)
|
|||
|
|
}
|
|||
|
|
embeddings, err := eino.EmbedStrings(ctx, vectorDocsStr)
|
|||
|
|
if err != nil {
|
|||
|
|
g.Log().Error(ctx, "DocsChunkMsg err:", err)
|
|||
|
|
err = s.publishKnowledgeDocumentMsg(ctx, req[0].TenantId, req[0].Creator, req[0].DocumentId, document.VectorStatusFailed.Code())
|
|||
|
|
return
|
|||
|
|
}
|
|||
|
|
|
|||
|
|
// 获取向量维度
|
|||
|
|
dimension := 0
|
|||
|
|
if len(embeddings) > 0 {
|
|||
|
|
dimension = len(embeddings[0])
|
|||
|
|
}
|
|||
|
|
|
|||
|
|
// 创建或更新DatasetIndex
|
|||
|
|
err = s.createOrUpdateDatasetIndex(ctx, req[0].DatasetId, dimension, int64(len(req)))
|
|||
|
|
if err != nil {
|
|||
|
|
g.Log().Error(ctx, "CreateOrUpdateDatasetIndex err:", err)
|
|||
|
|
err = s.publishKnowledgeDocumentMsg(ctx, req[0].TenantId, req[0].Creator, req[0].DocumentId, document.VectorStatusFailed.Code())
|
|||
|
|
return
|
|||
|
|
}
|
|||
|
|
|
|||
|
|
// 更新向量文档
|
|||
|
|
for i, embedding := range embeddings {
|
|||
|
|
req[i].Vector = pgvector.NewVector(gconv.Float32s(embedding))
|
|||
|
|
req[i].VectorStatus = document.VectorStatusCompleted.Code()
|
|||
|
|
req[i].Status = document.StatusEnable.Code()
|
|||
|
|
}
|
|||
|
|
_, err = dao.DocumentChunk.BatchInsert(ctx, req)
|
|||
|
|
if err != nil {
|
|||
|
|
g.Log().Error(ctx, "DocsChunkMsg err:", err)
|
|||
|
|
err = s.publishKnowledgeDocumentMsg(ctx, req[0].TenantId, req[0].Creator, req[0].DocumentId, document.VectorStatusFailed.Code())
|
|||
|
|
return
|
|||
|
|
}
|
|||
|
|
|
|||
|
|
err = s.publishKnowledgeDocumentMsg(ctx, req[0].TenantId, req[0].Creator, req[0].DocumentId, document.VectorStatusCompleted.Code())
|
|||
|
|
|
|||
|
|
return
|
|||
|
|
}
|
|||
|
|
|
|||
|
|
// createOrUpdateDatasetIndex 创建或更新数据集索引
|
|||
|
|
func (s *documentChunkService) createOrUpdateDatasetIndex(ctx context.Context, datasetId int64, dimension int, vectorCount int64) (err error) {
|
|||
|
|
// 查询数据集是否已有索引
|
|||
|
|
existIndex, err := dao.DatasetIndex.GetByDatasetId(ctx, datasetId)
|
|||
|
|
if err != nil && !errors.Is(err, sql.ErrNoRows) {
|
|||
|
|
return err
|
|||
|
|
}
|
|||
|
|
|
|||
|
|
// 已有索引 → 只更新数量
|
|||
|
|
if existIndex != nil {
|
|||
|
|
_ = dao.DatasetIndex.IncVectorCount(ctx, existIndex.Id, vectorCount)
|
|||
|
|
return nil
|
|||
|
|
}
|
|||
|
|
|
|||
|
|
// ====================== 创建新索引 ======================
|
|||
|
|
indexName := fmt.Sprintf("idx_dataset_%d_vector", datasetId) // 真实PG索引名
|
|||
|
|
// 1. 插入索引配置
|
|||
|
|
index := &entity.DatasetIndex{
|
|||
|
|
DatasetId: datasetId,
|
|||
|
|
Name: indexName,
|
|||
|
|
Dimension: dimension,
|
|||
|
|
FieldType: "float",
|
|||
|
|
MetricType: "COSINE",
|
|||
|
|
Status: gconv.PtrInt8(1),
|
|||
|
|
VectorCount: vectorCount,
|
|||
|
|
Description: fmt.Sprintf("数据集%d向量索引", datasetId),
|
|||
|
|
}
|
|||
|
|
_, err = dao.DatasetIndex.Insert(ctx, index)
|
|||
|
|
if err != nil {
|
|||
|
|
return err
|
|||
|
|
}
|
|||
|
|
|
|||
|
|
// 2. 真正创建 PGVector 索引(唯一真实索引!)
|
|||
|
|
err = s.createRealPGVectorIndex(ctx, indexName)
|
|||
|
|
return err
|
|||
|
|
}
|
|||
|
|
|
|||
|
|
// createRealPGVectorIndex 真正在PostgreSQL创建向量索引(真实可用)
|
|||
|
|
func (s *documentChunkService) createRealPGVectorIndex(ctx context.Context, indexName string) error {
|
|||
|
|
// 执行真实建索引语句
|
|||
|
|
err := dao.DatasetIndex.InsertIndex(ctx, indexName)
|
|||
|
|
if err != nil {
|
|||
|
|
g.Log().Error(ctx, "创建向量索引失败:", err)
|
|||
|
|
return err
|
|||
|
|
}
|
|||
|
|
g.Log().Info(ctx, "PGVector真实索引创建成功:"+indexName)
|
|||
|
|
return nil
|
|||
|
|
}
|
|||
|
|
|
|||
|
|
// publishKnowledgeDocumentMsg 发布消息
|
|||
|
|
func (s *documentChunkService) publishKnowledgeDocumentMsg(ctx context.Context, tenantId uint64, creator string, documentId int64, vectorStatus document.VectorStatus) (err error) {
|
|||
|
|
knowledgeDocumentMsg := dto.KnowledgeDocumentMsg{
|
|||
|
|
TenantId: tenantId,
|
|||
|
|
Creator: creator,
|
|||
|
|
Id: documentId,
|
|||
|
|
VectorStatus: vectorStatus,
|
|||
|
|
}
|
|||
|
|
err = gmq.GetGmq("primary").GmqPublish(ctx, &mq.RedisPubMessage{
|
|||
|
|
PubMessage: types.PubMessage{
|
|||
|
|
Topic: public.KnowledgeDocumentVectorStatusTopic,
|
|||
|
|
Data: knowledgeDocumentMsg,
|
|||
|
|
},
|
|||
|
|
})
|
|||
|
|
return
|
|||
|
|
}
|