feat: 支持多数据库配置与PGVector检索

This commit is contained in:
2026-04-03 17:59:05 +08:00
parent 86c2b7d66e
commit 026beea4d9
12 changed files with 304 additions and 182 deletions

View File

@@ -44,13 +44,13 @@ func (i *PGVectorIndexer) Store(ctx context.Context, docs []*schema.Document, op
// 回调 // 回调
ctx = callbacks.OnStart(ctx, &indexer.CallbackInput{Docs: docs}) ctx = callbacks.OnStart(ctx, &indexer.CallbackInput{Docs: docs})
ids, err := i.bulkStore(ctx, docs, commonOpts) rows, err = i.bulkStore(ctx, docs, commonOpts)
if err != nil { if err != nil {
callbacks.OnError(ctx, err) callbacks.OnError(ctx, err)
return return
} }
callbacks.OnEnd(ctx, &indexer.CallbackOutput{IDs: gconv.Strings(ids)}) callbacks.OnEnd(ctx, &indexer.CallbackOutput{IDs: gconv.Strings(rows)})
return return
} }

117
common/eino/retriever.go Normal file
View File

@@ -0,0 +1,117 @@
package eino
import (
"context"
"errors"
"github.com/cloudwego/eino/callbacks"
"github.com/cloudwego/eino/components/embedding"
"github.com/cloudwego/eino/components/retriever"
"github.com/cloudwego/eino/schema"
"github.com/gogf/gf/v2/util/gconv"
"github.com/pgvector/pgvector-go"
)
// PGVectorRetrieverConfig carries the settings consumed by
// NewPGVectorRetriever.
type PGVectorRetrieverConfig struct {
// Embedder turns the query text into a vector; NewPGVectorRetriever
// rejects a config in which it is nil.
Embedder embedding.Embedder
// DefaultTopK is the result limit used when the caller passes no TopK
// option; NewPGVectorRetriever substitutes 5 for values <= 0.
DefaultTopK int
// DefaultIndex is handed to the common retriever options as the default
// Index value.
DefaultIndex string
}
// PGVectorRetriever holds the defaults that Retrieve seeds into the common
// retriever options before per-call options are applied.
type PGVectorRetriever struct {
// embedder is the default embedding component for query vectorisation.
embedder embedding.Embedder
// topK is the default maximum number of documents to return.
topK int
// index is the default index name exposed through the common options.
index string
}
// NewPGVectorRetriever builds a PGVectorRetriever from config.
//
// It returns an error when config is nil or when config.Embedder is nil.
// A non-positive DefaultTopK falls back to 5. The fallback is applied to a
// local copy, so the caller's config struct is never modified.
func NewPGVectorRetriever(config *PGVectorRetrieverConfig) (*PGVectorRetriever, error) {
	if config == nil {
		return nil, errors.New("config is required")
	}
	if config.Embedder == nil {
		return nil, errors.New("embedder is required")
	}

	topK := config.DefaultTopK
	if topK <= 0 {
		topK = 5
	}

	return &PGVectorRetriever{
		embedder: config.Embedder,
		topK:     topK,
		index:    config.DefaultIndex,
	}, nil
}
// Retrieve runs a retrieval for query and returns the matching documents.
//
// The retriever's own index, topK and embedder are used as defaults and may
// be overridden by opts. Callbacks are notified when the retrieval starts,
// and then either with the error or with the resulting documents.
func (r *PGVectorRetriever) Retrieve(ctx context.Context, query string, opts ...retriever.Option) ([]*schema.Document, error) {
	// Seed the common options with this retriever's defaults, then let the
	// caller-supplied options override them.
	merged := retriever.GetCommonOptions(&retriever.Options{
		Index:     &r.index,
		TopK:      &r.topK,
		Embedding: r.embedder,
	}, opts...)

	// Announce the start of the retrieval.
	startInput := &retriever.CallbackInput{
		Query: query,
		TopK:  *merged.TopK,
	}
	ctx = callbacks.OnStart(ctx, startInput)

	// Run the actual lookup; report a failure to the callbacks before
	// handing it back to the caller.
	found, err := r.doRetrieve(ctx, query, merged)
	if err != nil {
		callbacks.OnError(ctx, err)
		return nil, err
	}

	// Announce completion together with the retrieved documents.
	callbacks.OnEnd(ctx, &retriever.CallbackOutput{Docs: found})
	return found, nil
}
// doRetrieve embeds query, runs a vector-distance search ordered by
// ascending distance, and converts the resulting rows into eino documents.
//
// opts must carry an embedder; when opts.TopK is nil the retriever's default
// topK is used. Each returned document has its ID and Content taken from the
// row, and dataset_id, document_id and distance stored in Metadata.
//
// NOTE(review): `dao` is not among this file's imports, so the query call
// below does not resolve as written — confirm the intended package and add
// the import (watch for an import cycle with the packages that use eino).
// NOTE(review): the statement is raw SQL naming `document_chunk`; confirm
// whether the configured table prefix has to be spelled out here, and
// whether soft-deleted or disabled rows should be filtered out.
// NOTE(review): `<->` is pgvector's L2 distance; an index declared with
// vector_cosine_ops would only serve `<=>` — confirm the intended metric.
func (r *PGVectorRetriever) doRetrieve(ctx context.Context, query string, opts *retriever.Options) ([]*schema.Document, error) {
	if opts == nil || opts.Embedding == nil {
		return nil, errors.New("embedder is required")
	}
	topK := r.topK
	if opts.TopK != nil {
		topK = *opts.TopK
	}

	// 1. Embed the query text.
	vectors, err := opts.Embedding.EmbedStrings(ctx, []string{query})
	if err != nil {
		return nil, err
	}
	if len(vectors) == 0 || len(vectors[0]) == 0 {
		return nil, errors.New("empty query vector")
	}
	// The embedder yields float64 components while pgvector.NewVector takes
	// []float32, so convert before building the query vector.
	queryVec := pgvector.NewVector(gconv.Float32s(vectors[0]))

	// 2. Nearest-neighbour statement; both values are bound as parameters.
	stmt := `
		SELECT id, content, dataset_id, document_id,
		vector <-> ? AS distance
		FROM document_chunk
		ORDER BY distance ASC
		LIMIT ?
	`

	// 3. Run the query.
	rows, err := dao.DocumentChunk.GetDB().GetAll(ctx, stmt, queryVec, topK)
	if err != nil {
		return nil, err
	}

	// 4. Map every row onto an eino document.
	docs := make([]*schema.Document, 0, len(rows))
	for _, row := range rows {
		docs = append(docs, &schema.Document{
			ID:      gconv.String(row["id"]),
			Content: gconv.String(row["content"]),
			Metadata: map[string]any{
				"dataset_id":  row["dataset_id"],
				"document_id": row["document_id"],
				"distance":    row["distance"],
			},
		})
	}
	return docs, nil
}

View File

@@ -12,8 +12,9 @@ database:
user: "postgres" user: "postgres"
pass: "Bjang09@686^*^" pass: "Bjang09@686^*^"
name: "rag" name: "rag"
prefix: "rag_knowledge_" # (可选)表名前缀
role: "master" # (可选)数据库主从角色(master/slave)默认为master。如果不使用应用主从机制请不配置或留空即可。 role: "master" # (可选)数据库主从角色(master/slave)默认为master。如果不使用应用主从机制请不配置或留空即可。
debug: false # (可选)开启调试模式 debug: true # (可选)开启调试模式
dryRun: false # (可选)ORM空跑(只读不写) dryRun: false # (可选)ORM空跑(只读不写)
charset: "utf8" # (可选)数据库编码(如: utf8mb4/utf8/gbk/gb2312)一般设置为utf8mb4。默认为utf8。 charset: "utf8" # (可选)数据库编码(如: utf8mb4/utf8/gbk/gb2312)一般设置为utf8mb4。默认为utf8。
timezone: "Asia/Shanghai" # (可选)时区配置,例如:Local timezone: "Asia/Shanghai" # (可选)时区配置,例如:Local
@@ -30,7 +31,8 @@ database:
port: "15432" port: "15432"
user: "postgres" user: "postgres"
pass: "Bjang09@686^*^" pass: "Bjang09@686^*^"
name: "rag" name: "tenant-1"
prefix: "rag_knowledge_" # (可选)表名前缀
role: "slave" # (可选)数据库主从角色(master/slave)默认为master。如果不使用应用主从机制请不配置或留空即可。 role: "slave" # (可选)数据库主从角色(master/slave)默认为master。如果不使用应用主从机制请不配置或留空即可。
debug: false # (可选)开启调试模式 debug: false # (可选)开启调试模式
dryRun: false # (可选)ORM空跑(只读不写) dryRun: false # (可选)ORM空跑(只读不写)
@@ -44,15 +46,36 @@ database:
updatedAt: "updated_at" # (可选)自动更新时间字段名称 updatedAt: "updated_at" # (可选)自动更新时间字段名称
deletedAt: "deleted_at" # (可选)软删除时间字段名称 deletedAt: "deleted_at" # (可选)软删除时间字段名称
timeMaintainDisabled: false # (可选)是否完全关闭时间更新特性为true时CreatedAt/UpdatedAt/DeletedAt都将失效 timeMaintainDisabled: false # (可选)是否完全关闭时间更新特性为true时CreatedAt/UpdatedAt/DeletedAt都将失效
tenant-1: rag_knowledge:
- type: "pgsql" - type: "pgsql"
host: "localhost" host: "localhost"
port: "5432" port: "5432"
user: "postgres" user: "postgres"
pass: "123456" pass: "123456"
name: "tenant" name: "tenant-1"
prefix: "rag_knowledge_" # (可选)表名前缀
role: "master"
debug: true # (可选)开启调试模式
dryRun: false # (可选)ORM空跑(只读不写)
charset: "utf8" # (可选)数据库编码(如: utf8mb4/utf8/gbk/gb2312)一般设置为utf8mb4。默认为utf8。
timezone: "Asia/Shanghai" # (可选)时区配置,例如:Local
maxIdle: 5 # (可选)连接池最大闲置的连接数(默认10)
maxOpen: 20 # (可选)连接池最大打开的连接数(默认无限制)
maxLifetime: "30s" # (可选)连接对象可重复使用的时间长度(默认30秒)
maxIdleConnTime: "30s" # (可选v2.10新增)连接池中空闲连接的最大生存时间(默认30秒)。可以通过配置文件或SetConnMaxIdleTime方法设置避免长时间空闲连接占用资源。
createdAt: "created_at" # (可选)自动创建时间字段名称
updatedAt: "updated_at" # (可选)自动更新时间字段名称
deletedAt: "deleted_at" # (可选)软删除时间字段名称
timeMaintainDisabled: false # (可选)是否完全关闭时间更新特性为true时CreatedAt/UpdatedAt/DeletedAt都将失效
rag_vector:
- type: "pgsql"
host: "localhost"
port: "5432"
user: "postgres"
pass: "123456"
name: "tenant-1"
prefix: "rag_vector_" # (可选)表名前缀
role: "master" role: "master"
prefix: "rag_" # (可选)表名前缀
debug: true # (可选)开启调试模式 debug: true # (可选)开启调试模式
dryRun: false # (可选)ORM空跑(只读不写) dryRun: false # (可选)ORM空跑(只读不写)
charset: "utf8" # (可选)数据库编码(如: utf8mb4/utf8/gbk/gb2312)一般设置为utf8mb4。默认为utf8。 charset: "utf8" # (可选)数据库编码(如: utf8mb4/utf8/gbk/gb2312)一般设置为utf8mb4。默认为utf8。

View File

@@ -1,5 +1,10 @@
package public package public
const (
DbNameKnowledge = "rag_knowledge"
DbNameVector = "rag_vector"
)
// sql 数据库表名 // sql 数据库表名
const ( const (
TableNameDocument = "document" TableNameDocument = "document"

View File

@@ -22,7 +22,7 @@ func (d *datasetDao) Insert(ctx context.Context, req *dto.CreateDatasetReq) (id
if err = gconv.Struct(req, &res); err != nil { if err = gconv.Struct(req, &res); err != nil {
return return
} }
r, err := gfdb.DB(ctx).Model(ctx, public.TableNameDataset).Data(&res).Insert() r, err := gfdb.DB(ctx, public.DbNameKnowledge).Model(ctx, public.TableNameDataset).Data(&res).Insert()
if err != nil { if err != nil {
return return
} }
@@ -31,7 +31,7 @@ func (d *datasetDao) Insert(ctx context.Context, req *dto.CreateDatasetReq) (id
// Update 更新数据集 // Update 更新数据集
func (d *datasetDao) Update(ctx context.Context, req *dto.UpdateDatasetReq) (rows int64, err error) { func (d *datasetDao) Update(ctx context.Context, req *dto.UpdateDatasetReq) (rows int64, err error) {
model := gfdb.DB(ctx).Model(ctx, public.TableNameDataset).OmitEmpty() model := gfdb.DB(ctx, public.DbNameKnowledge).Model(ctx, public.TableNameDataset).OmitEmpty()
if !g.IsEmpty(req.DocumentCount) { if !g.IsEmpty(req.DocumentCount) {
model.Data(entity.DatasetCol.DocumentCount, &gdb.Counter{ model.Data(entity.DatasetCol.DocumentCount, &gdb.Counter{
Field: entity.DatasetCol.DocumentCount, Field: entity.DatasetCol.DocumentCount,
@@ -53,7 +53,7 @@ func (d *datasetDao) Update(ctx context.Context, req *dto.UpdateDatasetReq) (row
// Delete 删除数据集 // Delete 删除数据集
func (d *datasetDao) Delete(ctx context.Context, req *dto.DeleteDatasetReq) (rows int64, err error) { func (d *datasetDao) Delete(ctx context.Context, req *dto.DeleteDatasetReq) (rows int64, err error) {
r, err := gfdb.DB(ctx).Model(ctx, public.TableNameDataset).Where(entity.DatasetCol.Id, req.Id).Delete() r, err := gfdb.DB(ctx, public.DbNameKnowledge).Model(ctx, public.TableNameDataset).Where(entity.DatasetCol.Id, req.Id).Delete()
if err != nil { if err != nil {
return return
} }
@@ -61,7 +61,7 @@ func (d *datasetDao) Delete(ctx context.Context, req *dto.DeleteDatasetReq) (row
} }
func (d *datasetDao) GetByID(ctx context.Context, req *dto.GetDatasetReq, fields ...string) (res *entity.Dataset, err error) { func (d *datasetDao) GetByID(ctx context.Context, req *dto.GetDatasetReq, fields ...string) (res *entity.Dataset, err error) {
r, err := gfdb.DB(ctx).Model(ctx, public.TableNameDataset).Where(entity.DatasetCol.Id, req.Id).Fields(fields).One() r, err := gfdb.DB(ctx, public.DbNameKnowledge).Model(ctx, public.TableNameDataset).Where(entity.DatasetCol.Id, req.Id).Fields(fields).One()
if err != nil { if err != nil {
return return
} }
@@ -71,7 +71,7 @@ func (d *datasetDao) GetByID(ctx context.Context, req *dto.GetDatasetReq, fields
// List 获取数据集列表 // List 获取数据集列表
func (d *datasetDao) List(ctx context.Context, req *dto.ListDatasetReq, fields ...string) (res []*entity.Dataset, total int, err error) { func (d *datasetDao) List(ctx context.Context, req *dto.ListDatasetReq, fields ...string) (res []*entity.Dataset, total int, err error) {
model := gfdb.DB(ctx).Model(ctx, public.TableNameDataset).Fields(fields).OmitEmpty() model := gfdb.DB(ctx, public.DbNameKnowledge).Model(ctx, public.TableNameDataset).Fields(fields).OmitEmpty()
if !g.IsEmpty(req.Keyword) { if !g.IsEmpty(req.Keyword) {
model.WhereLike(entity.DatasetCol.Name, "%"+req.Keyword+"%") model.WhereLike(entity.DatasetCol.Name, "%"+req.Keyword+"%")
} }

View File

@@ -16,7 +16,7 @@ type datasetIndexDao struct{}
// Insert 插入数据集索引 // Insert 插入数据集索引
func (d *datasetIndexDao) Insert(ctx context.Context, index *entity.DatasetIndex) (id int64, err error) { func (d *datasetIndexDao) Insert(ctx context.Context, index *entity.DatasetIndex) (id int64, err error) {
_, err = gfdb.DB(ctx).Model(ctx, public.TableNameDatasetIndex).Data(index).Insert() _, err = gfdb.DB(ctx, public.DbNameVector).Model(ctx, public.TableNameDatasetIndex).Data(index).Insert()
if err != nil { if err != nil {
return return
} }
@@ -25,7 +25,7 @@ func (d *datasetIndexDao) Insert(ctx context.Context, index *entity.DatasetIndex
// GetByDatasetId 根据数据集ID获取索引 // GetByDatasetId 根据数据集ID获取索引
func (d *datasetIndexDao) GetByDatasetId(ctx context.Context, datasetId int64) (result *entity.DatasetIndex, err error) { func (d *datasetIndexDao) GetByDatasetId(ctx context.Context, datasetId int64) (result *entity.DatasetIndex, err error) {
err = gfdb.DB(ctx).Model(ctx, public.TableNameDatasetIndex).Where(entity.DatasetIndexCol.DatasetId, datasetId).Scan(&result) err = gfdb.DB(ctx, public.DbNameVector).Model(ctx, public.TableNameDatasetIndex).Where(entity.DatasetIndexCol.DatasetId, datasetId).Scan(&result)
if err != nil { if err != nil {
if err == sql.ErrNoRows { if err == sql.ErrNoRows {
return nil, nil return nil, nil
@@ -37,23 +37,20 @@ func (d *datasetIndexDao) GetByDatasetId(ctx context.Context, datasetId int64) (
// IncVectorCount 增加或减少向量数量 // IncVectorCount 增加或减少向量数量
func (d *datasetIndexDao) IncVectorCount(ctx context.Context, id int64, delta int64) (err error) { func (d *datasetIndexDao) IncVectorCount(ctx context.Context, id int64, delta int64) (err error) {
_, err = gfdb.DB(ctx).Model(ctx, public.TableNameDatasetIndex). _, err = gfdb.DB(ctx, public.DbNameVector).Model(ctx, public.TableNameDatasetIndex).
Where(entity.DatasetIndexCol.Id, id). Where(entity.DatasetIndexCol.Id, id).
Increment(entity.DatasetIndexCol.VectorCount, delta) Increment(entity.DatasetIndexCol.VectorCount, delta)
return return
} }
func (d *datasetIndexDao) InsertIndex(ctx context.Context, indexName string) (err error) { func (d *datasetIndexDao) InsertIndex(ctx context.Context, indexName string) (err error) {
prefix, err := gfdb.GetTablePrefix(ctx) db := gfdb.DB(ctx, public.DbNameVector)
if err != nil {
return
}
sqlStr := fmt.Sprintf(` sqlStr := fmt.Sprintf(`
CREATE INDEX IF NOT EXISTS %s CREATE INDEX IF NOT EXISTS %s
ON %s ON %s
USING ivfflat (vector vector_cosine_ops) USING ivfflat (vector vector_cosine_ops)
WHERE vector IS NOT NULL; WHERE vector IS NOT NULL;
`, indexName, prefix+public.TableNameDocumentChunk) `, indexName, gfdb.TablePrefix+public.TableNameDocumentChunk)
_, err = gfdb.DB(ctx).Exec(ctx, sqlStr) _, err = db.Exec(ctx, sqlStr)
return return
} }

View File

@@ -22,7 +22,7 @@ func (d *documentDao) Insert(ctx context.Context, req *dto.CreateDocumentReq) (i
if err = gconv.Struct(req, &res); err != nil { if err = gconv.Struct(req, &res); err != nil {
return return
} }
r, err := gfdb.DB(ctx).Model(ctx, public.TableNameDocument).Data(&res).Insert() r, err := gfdb.DB(ctx, public.DbNameKnowledge).Model(ctx, public.TableNameDocument).Data(&res).Insert()
if err != nil { if err != nil {
return return
} }
@@ -31,7 +31,7 @@ func (d *documentDao) Insert(ctx context.Context, req *dto.CreateDocumentReq) (i
// Update 更新文件 // Update 更新文件
func (d *documentDao) Update(ctx context.Context, req *dto.UpdateDocumentReq) (rows int64, err error) { func (d *documentDao) Update(ctx context.Context, req *dto.UpdateDocumentReq) (rows int64, err error) {
model := gfdb.DB(ctx).Model(ctx, public.TableNameDocument).OmitEmpty() model := gfdb.DB(ctx, public.DbNameKnowledge).Model(ctx, public.TableNameDocument).OmitEmpty()
if !g.IsEmpty(req.ChunkCount) { if !g.IsEmpty(req.ChunkCount) {
model.Data(entity.DocumentCol.ChunkCount, &gdb.Counter{ model.Data(entity.DocumentCol.ChunkCount, &gdb.Counter{
Field: entity.DocumentCol.ChunkCount, Field: entity.DocumentCol.ChunkCount,
@@ -48,7 +48,7 @@ func (d *documentDao) Update(ctx context.Context, req *dto.UpdateDocumentReq) (r
// Delete 删除文件 // Delete 删除文件
func (d *documentDao) Delete(ctx context.Context, req *dto.DeleteDocumentReq) (rows int64, err error) { func (d *documentDao) Delete(ctx context.Context, req *dto.DeleteDocumentReq) (rows int64, err error) {
r, err := gfdb.DB(ctx).Model(ctx, public.TableNameDocument).Where(entity.DocumentCol.Id, req.Id).Delete() r, err := gfdb.DB(ctx, public.DbNameKnowledge).Model(ctx, public.TableNameDocument).Where(entity.DocumentCol.Id, req.Id).Delete()
if err != nil { if err != nil {
return return
} }
@@ -57,7 +57,7 @@ func (d *documentDao) Delete(ctx context.Context, req *dto.DeleteDocumentReq) (r
// GetByID 根据ID获取文件 // GetByID 根据ID获取文件
func (d *documentDao) GetByID(ctx context.Context, req *dto.GetDocumentReq, fields ...string) (res *entity.Document, err error) { func (d *documentDao) GetByID(ctx context.Context, req *dto.GetDocumentReq, fields ...string) (res *entity.Document, err error) {
r, err := gfdb.DB(ctx).Model(ctx, public.TableNameDocument).Where(entity.DocumentCol.Id, req.Id).Fields(fields).One() r, err := gfdb.DB(ctx, public.DbNameKnowledge).Model(ctx, public.TableNameDocument).Where(entity.DocumentCol.Id, req.Id).Fields(fields).One()
if err != nil { if err != nil {
return return
} }
@@ -67,7 +67,7 @@ func (d *documentDao) GetByID(ctx context.Context, req *dto.GetDocumentReq, fiel
// List 获取文件列表 // List 获取文件列表
func (d *documentDao) List(ctx context.Context, req *dto.ListDocumentReq, fields ...string) (res []*entity.Document, total int, err error) { func (d *documentDao) List(ctx context.Context, req *dto.ListDocumentReq, fields ...string) (res []*entity.Document, total int, err error) {
model := gfdb.DB(ctx).Model(ctx, public.TableNameDocument).OmitEmpty() model := gfdb.DB(ctx, public.DbNameKnowledge).Model(ctx, public.TableNameDocument).OmitEmpty()
if !g.IsEmpty(req.Keyword) { if !g.IsEmpty(req.Keyword) {
model.WhereLike(entity.DocumentCol.Title, "%"+req.Keyword+"%") model.WhereLike(entity.DocumentCol.Title, "%"+req.Keyword+"%")
} }

View File

@@ -20,7 +20,7 @@ func (d *documentChunkDao) BatchInsert(ctx context.Context, req []*dto.VectorDoc
if err = gconv.Structs(req, &res); err != nil { if err = gconv.Structs(req, &res); err != nil {
return return
} }
r, err := gfdb.DB(ctx).Model(ctx, public.TableNameDocumentChunk).Data(&res).Insert() r, err := gfdb.DB(ctx, public.DbNameVector).Model(ctx, public.TableNameDocumentChunk).Data(&res).Insert()
if err != nil { if err != nil {
return return
} }
@@ -29,7 +29,7 @@ func (d *documentChunkDao) BatchInsert(ctx context.Context, req []*dto.VectorDoc
// Update 更新文件块 // Update 更新文件块
func (d *documentChunkDao) Update(ctx context.Context, req *dto.UpdateDocumentChunkReq) (rows int64, err error) { func (d *documentChunkDao) Update(ctx context.Context, req *dto.UpdateDocumentChunkReq) (rows int64, err error) {
model := gfdb.DB(ctx).Model(ctx, public.TableNameDocumentChunk) model := gfdb.DB(ctx, public.DbNameVector).Model(ctx, public.TableNameDocumentChunk)
r, err := model.Data(&req).Where(entity.DocumentChunkCol.Id, req.Id).Update() r, err := model.Data(&req).Where(entity.DocumentChunkCol.Id, req.Id).Update()
if err != nil { if err != nil {
return return
@@ -39,7 +39,7 @@ func (d *documentChunkDao) Update(ctx context.Context, req *dto.UpdateDocumentCh
// List 文件块列表 // List 文件块列表
func (d *documentChunkDao) List(ctx context.Context, req *dto.ListDocumentChunkReq, fields ...string) (res []*entity.DocumentChunk, total int, err error) { func (d *documentChunkDao) List(ctx context.Context, req *dto.ListDocumentChunkReq, fields ...string) (res []*entity.DocumentChunk, total int, err error) {
model := gfdb.DB(ctx).Model(ctx, public.TableNameDocumentChunk).Fields(fields).OmitEmpty(). model := gfdb.DB(ctx, public.DbNameVector).Model(ctx, public.TableNameDocumentChunk).Fields(fields).OmitEmpty().
Where(entity.DocumentChunkCol.DatasetId, req.DatasetId). Where(entity.DocumentChunkCol.DatasetId, req.DatasetId).
Where(entity.DocumentChunkCol.DocumentId, req.DocumentId). Where(entity.DocumentChunkCol.DocumentId, req.DocumentId).
Where(entity.DocumentChunkCol.Status, req.Status). Where(entity.DocumentChunkCol.Status, req.Status).
@@ -55,50 +55,3 @@ func (d *documentChunkDao) List(ctx context.Context, req *dto.ListDocumentChunkR
err = r.Structs(&res) err = r.Structs(&res)
return return
} }
//// Insert 插入向量文档
//func (d *vectorDocumentDao) Insert(ctx context.Context, docs []*entity.DocumentChunk) (ids []interface{}, err error) {
// if len(docs) == 0 {
// return
// }
// interfaces := make([]interface{}, len(docs))
// for i := range docs {
// interfaces[i] = docs[i]
// }
// return mongoDB.Insert(ctx, interfaces, CollectionVectorDoc)
//}
//
//// DeleteByIDs 根据ID删除向量文档
//func (d *vectorDocumentDao) DeleteByIDs(ctx context.Context, ids []string) (err error) {
// if len(ids) == 0 {
// return
// }
// objectIDs := make([]bson.ObjectID, len(ids))
// for i, id := range ids {
// objectIDs[i], err = bson.ObjectIDFromHex(id)
// if err != nil {
// return err
// }
// }
// filter := bson.M{"_id": bson.M{"$in": objectIDs}}
// _, err = mongoDB.Delete(ctx, filter, CollectionVectorDoc)
// return
//}
//
//// GetByIndexID 根据索引ID获取向量文档
//func (d *vectorDocumentDao) GetByIndexID(ctx context.Context, indexID string, limit int) (result []*entity.DocumentChunk, err error) {
// filter := bson.M{"indexId": indexID}
// page := &beans.Page{PageNum: 1, PageSize: int64(limit)}
// _, err = mongoDB.Find(ctx, filter, &result, CollectionVectorDoc, page, nil)
// return
//}
//
//// GetByVectorIDs 根据向量ID获取向量文档
//func (d *vectorDocumentDao) GetByVectorIDs(ctx context.Context, vectorIDs []string) (result []*entity.DocumentChunk, err error) {
// if len(vectorIDs) == 0 {
// return
// }
// filter := bson.M{"vectorId": bson.M{"$in": vectorIDs}}
// _, err = mongoDB.Find(ctx, filter, &result, CollectionVectorDoc, &beans.Page{PageSize: -1}, nil)
// return
//}

View File

@@ -20,7 +20,7 @@ func (d *keywordDao) Insert(ctx context.Context, req *dto.CreateKeywordReq) (id
if err = gconv.Struct(req, &res); err != nil { if err = gconv.Struct(req, &res); err != nil {
return return
} }
r, err := gfdb.DB(ctx).Model(ctx, public.TableNameKeyword).Data(&res).Insert() r, err := gfdb.DB(ctx, public.DbNameKnowledge).Model(ctx, public.TableNameKeyword).Data(&res).Insert()
if err != nil { if err != nil {
return return
} }
@@ -32,7 +32,7 @@ func (d *keywordDao) BatchSaveOrUpdate(ctx context.Context, req []*dto.CreateKey
if err = gconv.Structs(req, &res); err != nil { if err = gconv.Structs(req, &res); err != nil {
return return
} }
r, err := gfdb.DB(ctx).Model(ctx, public.TableNameKeyword).Data(&res).OnConflict( r, err := gfdb.DB(ctx, public.DbNameKnowledge).Model(ctx, public.TableNameKeyword).Data(&res).OnConflict(
entity.KeywordCol.TenantId, entity.KeywordCol.TenantId,
entity.KeywordCol.DatasetId, entity.KeywordCol.DatasetId,
entity.KeywordCol.DocumentId, entity.KeywordCol.DocumentId,
@@ -44,7 +44,7 @@ func (d *keywordDao) BatchSaveOrUpdate(ctx context.Context, req []*dto.CreateKey
} }
func (d *keywordDao) Update(ctx context.Context, req *dto.UpdateKeywordReq) (rows int64, err error) { func (d *keywordDao) Update(ctx context.Context, req *dto.UpdateKeywordReq) (rows int64, err error) {
model := gfdb.DB(ctx).Model(ctx, public.TableNameKeyword) model := gfdb.DB(ctx, public.DbNameKnowledge).Model(ctx, public.TableNameKeyword)
r, err := model.Data(&req).Where(entity.KeywordCol.Id, req.Id).Update() r, err := model.Data(&req).Where(entity.KeywordCol.Id, req.Id).Update()
if err != nil { if err != nil {
return return
@@ -53,7 +53,7 @@ func (d *keywordDao) Update(ctx context.Context, req *dto.UpdateKeywordReq) (row
} }
func (d *keywordDao) Delete(ctx context.Context, req *dto.DeleteKeywordReq) (rows int64, err error) { func (d *keywordDao) Delete(ctx context.Context, req *dto.DeleteKeywordReq) (rows int64, err error) {
r, err := gfdb.DB(ctx).Model(ctx, public.TableNameKeyword).Where(entity.KeywordCol.Id, req.Id).Delete() r, err := gfdb.DB(ctx, public.DbNameKnowledge).Model(ctx, public.TableNameKeyword).Where(entity.KeywordCol.Id, req.Id).Delete()
if err != nil { if err != nil {
return return
} }
@@ -61,7 +61,7 @@ func (d *keywordDao) Delete(ctx context.Context, req *dto.DeleteKeywordReq) (row
} }
func (d *keywordDao) Count(ctx context.Context, req *dto.ListKeywordReq) (count int, err error) { func (d *keywordDao) Count(ctx context.Context, req *dto.ListKeywordReq) (count int, err error) {
count, err = gfdb.DB(ctx).Model(ctx, public.TableNameKeyword).OmitEmpty(). count, err = gfdb.DB(ctx, public.DbNameKnowledge).Model(ctx, public.TableNameKeyword).OmitEmpty().
Where(entity.KeywordCol.DatasetId, req.DatasetId). Where(entity.KeywordCol.DatasetId, req.DatasetId).
Where(entity.KeywordCol.DocumentId, req.DocumentId). Where(entity.KeywordCol.DocumentId, req.DocumentId).
Where(entity.KeywordCol.Word, req.Word).Count() Where(entity.KeywordCol.Word, req.Word).Count()
@@ -69,7 +69,7 @@ func (d *keywordDao) Count(ctx context.Context, req *dto.ListKeywordReq) (count
} }
func (d *keywordDao) GetByID(ctx context.Context, req *dto.GetKeywordReq, fields ...string) (res *entity.Document, err error) { func (d *keywordDao) GetByID(ctx context.Context, req *dto.GetKeywordReq, fields ...string) (res *entity.Document, err error) {
r, err := gfdb.DB(ctx).Model(ctx, public.TableNameKeyword).Where(entity.KeywordCol.Id, req.Id).Fields(fields).One() r, err := gfdb.DB(ctx, public.DbNameKnowledge).Model(ctx, public.TableNameKeyword).Where(entity.KeywordCol.Id, req.Id).Fields(fields).One()
if err != nil { if err != nil {
return return
} }
@@ -78,7 +78,7 @@ func (d *keywordDao) GetByID(ctx context.Context, req *dto.GetKeywordReq, fields
} }
func (d *keywordDao) List(ctx context.Context, req *dto.ListKeywordReq, fields ...string) (res []*entity.Keyword, total int, err error) { func (d *keywordDao) List(ctx context.Context, req *dto.ListKeywordReq, fields ...string) (res []*entity.Keyword, total int, err error) {
model := gfdb.DB(ctx).Model(ctx, public.TableNameKeyword).Fields(fields).OmitEmpty() model := gfdb.DB(ctx, public.DbNameKnowledge).Model(ctx, public.TableNameKeyword).Fields(fields).OmitEmpty()
if !g.IsEmpty(req.Keyword) { if !g.IsEmpty(req.Keyword) {
model.WhereLike(entity.KeywordCol.Word, "%"+req.Keyword+"%") model.WhereLike(entity.KeywordCol.Word, "%"+req.Keyword+"%")
} }

View File

@@ -2,6 +2,7 @@ package service
import ( import (
"context" "context"
"errors"
"fmt" "fmt"
"rag/common/eino" "rag/common/eino"
"rag/common/gse" "rag/common/gse"
@@ -123,6 +124,9 @@ func (s *documentService) Process(ctx context.Context, req *dto.ProcessDocumentR
if err != nil { if err != nil {
return nil, err return nil, err
} }
if g.IsEmpty(doc) {
return nil, errors.New("document not found")
}
// 2. 使用eino框架进行文件切分并发执行 // 2. 使用eino框架进行文件切分并发执行
var vectorDocsCount, chunks int64 var vectorDocsCount, chunks int64

View File

@@ -57,49 +57,6 @@ func (s *documentChunkService) DocsChunkMsg(ctx context.Context, msg any) (err e
return return
} }
//ctx = context.WithValue(ctx, "user", &beans.User{
// TenantId: req[0].TenantId,
// UserName: req[0].Creator,
//})
// 调用eino接口获取向量
//var vectorDocsStr = make([]string, 0, len(req))
//for _, t := range req {
// vectorDocsStr = append(vectorDocsStr, t.Content)
//}
//embeddings, err := eino.EmbedStrings(ctx, vectorDocsStr)
//if err != nil {
// g.Log().Error(ctx, "DocsChunkMsg err:", err)
// err = s.publishKnowledgeDocumentMsg(ctx, req[0].TenantId, req[0].Creator, req[0].DocumentId, document.VectorStatusFailed.Code())
// return
//}
// 获取向量维度
//dimension := 0
//if len(embeddings) > 0 {
// dimension = len(embeddings[0])
//}
// 创建或更新DatasetIndex
//err = s.createOrUpdateDatasetIndex(ctx, req[0].DatasetId, dimension, int64(len(req)))
//if err != nil {
// g.Log().Error(ctx, "CreateOrUpdateDatasetIndex err:", err)
// err = s.publishKnowledgeDocumentMsg(ctx, req[0].TenantId, req[0].Creator, req[0].DocumentId, document.VectorStatusFailed.Code())
// return
//}
// 更新向量文档
//for i, embedding := range embeddings {
// req[i].Vector = pgvector.NewVector(gconv.Float32s(embedding))
// req[i].VectorStatus = document.VectorStatusCompleted.Code()
// req[i].Status = document.StatusEnable.Code()
//}
//_, err = dao.DocumentChunk.BatchInsert(ctx, req)
//if err != nil {
// g.Log().Error(ctx, "DocsChunkMsg err:", err)
// err = s.publishKnowledgeDocumentMsg(ctx, req[0].TenantId, req[0].Creator, req[0].DocumentId, document.VectorStatusFailed.Code())
// return
//}
idx := eino.NewPGVectorIndexer(&eino.PGVectorIndexerOptions{ idx := eino.NewPGVectorIndexer(&eino.PGVectorIndexerOptions{
BatchSize: 10, BatchSize: 10,
}) })
@@ -108,63 +65,14 @@ func (s *documentChunkService) DocsChunkMsg(ctx context.Context, msg any) (err e
g.Log().Error(ctx, "DocsChunkMsg rows: , err:", rows, err) g.Log().Error(ctx, "DocsChunkMsg rows: , err:", rows, err)
return return
} }
tenantId := docs[0].MetaData[entity.DocumentChunkCol.TenantId].(uint64) tenantId := gconv.Uint64(docs[0].MetaData[entity.DocumentChunkCol.TenantId])
creator := docs[0].MetaData[entity.DocumentChunkCol.Creator].(string) creator := gconv.String(docs[0].MetaData[entity.DocumentChunkCol.Creator])
documentId := docs[0].MetaData[entity.DocumentChunkCol.DocumentId].(int64) documentId := gconv.Int64(docs[0].MetaData[entity.DocumentChunkCol.DocumentId])
err = s.publishKnowledgeDocumentMsg(ctx, tenantId, creator, documentId, document.VectorStatusCompleted.Code()) err = s.publishKnowledgeDocumentMsg(ctx, tenantId, creator, documentId, document.VectorStatusCompleted.Code())
return return
} }
//// createOrUpdateDatasetIndex 创建或更新数据集索引
//func (s *documentChunkService) createOrUpdateDatasetIndex(ctx context.Context, datasetId int64, dimension int, vectorCount int64) (err error) {
// // 查询数据集是否已有索引
// existIndex, err := dao.DatasetIndex.GetByDatasetId(ctx, datasetId)
// if err != nil && !errors.Is(err, sql.ErrNoRows) {
// return err
// }
//
// // 已有索引 → 只更新数量
// if existIndex != nil {
// _ = dao.DatasetIndex.IncVectorCount(ctx, existIndex.Id, vectorCount)
// return nil
// }
//
// // ====================== 创建新索引 ======================
// indexName := fmt.Sprintf("idx_dataset_%d_vector", datasetId) // 真实PG索引名
// // 1. 插入索引配置
// index := &entity.DatasetIndex{
// DatasetId: datasetId,
// Name: indexName,
// Dimension: dimension,
// FieldType: "float",
// MetricType: "COSINE",
// Status: gconv.PtrInt8(1),
// VectorCount: vectorCount,
// Description: fmt.Sprintf("数据集%d向量索引", datasetId),
// }
// _, err = dao.DatasetIndex.Insert(ctx, index)
// if err != nil {
// return err
// }
//
// // 2. 真正创建 PGVector 索引(唯一真实索引!)
// err = s.createRealPGVectorIndex(ctx, indexName)
// return err
//}
//
//// createRealPGVectorIndex 真正在PostgreSQL创建向量索引真实可用
//func (s *documentChunkService) createRealPGVectorIndex(ctx context.Context, indexName string) error {
// // 执行真实建索引语句
// err := dao.DatasetIndex.InsertIndex(ctx, indexName)
// if err != nil {
// g.Log().Error(ctx, "创建向量索引失败:", err)
// return err
// }
// g.Log().Info(ctx, "PGVector真实索引创建成功"+indexName)
// return nil
//}
// publishKnowledgeDocumentMsg 发布消息 // publishKnowledgeDocumentMsg 发布消息
func (s *documentChunkService) publishKnowledgeDocumentMsg(ctx context.Context, tenantId uint64, creator string, documentId int64, vectorStatus document.VectorStatus) (err error) { func (s *documentChunkService) publishKnowledgeDocumentMsg(ctx context.Context, tenantId uint64, creator string, documentId int64, vectorStatus document.VectorStatus) (err error) {
knowledgeDocumentMsg := dto.KnowledgeDocumentMsg{ knowledgeDocumentMsg := dto.KnowledgeDocumentMsg{

View File

@@ -134,9 +134,9 @@ CREATE TABLE IF NOT EXISTS rag_knowledge_keyword (
); );
-- 唯一索引:保证 租户 + 数据集 + 文档 + 关键词 全局唯一 -- 唯一索引:保证 租户 + 数据集 + 文档 + 关键词 全局唯一
CREATE UNIQUE INDEX uk_rag_knowledge_keyword_tenant_dataset_doc_word -- CREATE UNIQUE INDEX uk_rag_knowledge_keyword_tenant_dataset_doc_word
ON rag_knowledge_keyword(tenant_id, dataset_id, document_id, word) -- ON rag_knowledge_keyword(tenant_id, dataset_id, document_id, word)
WHERE deleted_at IS NULL; -- WHERE deleted_at IS NULL;
-- 索引(按业务高频查询) -- 索引(按业务高频查询)
CREATE INDEX idx_keyword_tenant_id ON rag_knowledge_keyword(tenant_id); CREATE INDEX idx_keyword_tenant_id ON rag_knowledge_keyword(tenant_id);
@@ -160,3 +160,118 @@ COMMENT ON COLUMN rag_knowledge_keyword.word IS '关键词';
COMMENT ON COLUMN rag_knowledge_keyword.weight IS '权重'; COMMENT ON COLUMN rag_knowledge_keyword.weight IS '权重';
--------------------pgsql创建rag_knowledge_keyword表语句--------------------------- --------------------pgsql创建rag_knowledge_keyword表语句---------------------------
--------------------pgsql创建rag_vector_dataset_index表语句---------------------------
-- 向量数据集索引表
CREATE TABLE IF NOT EXISTS rag_vector_dataset_index (
-- 基础字段
id BIGINT PRIMARY KEY, -- 主键ID非自增
tenant_id BIGINT NOT NULL DEFAULT 0, -- 租户ID int8
creator VARCHAR(64) NOT NULL,
created_at TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP,
updater VARCHAR(64) NOT NULL,
updated_at TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP,
deleted_at timestamp(6),
-- 核心字段
dataset_id INT8 NOT NULL,
name VARCHAR(255) NOT NULL,
collection VARCHAR(255) NOT NULL,
dimension INT NOT NULL,
field_type VARCHAR(50) NOT NULL,
metric_type VARCHAR(50) NOT NULL,
status SMALLINT NOT NULL DEFAULT 1, -- 状态1启用/0停用
vector_count INT8 NOT NULL DEFAULT 0,
description TEXT
);
-- 唯一约束
ALTER TABLE rag_vector_dataset_index ADD CONSTRAINT uk_dataset_id_name UNIQUE (dataset_id, name);
-- 索引
CREATE INDEX idx_dataset_index_tenant_id ON rag_vector_dataset_index(tenant_id);
CREATE INDEX idx_dataset_index_dataset_id ON rag_vector_dataset_index(dataset_id);
CREATE INDEX idx_dataset_index_status ON rag_vector_dataset_index(status);
-- 注释
COMMENT ON TABLE rag_vector_dataset_index IS '向量数据集索引表';
COMMENT ON COLUMN rag_vector_dataset_index.id IS '主键ID非自增';
COMMENT ON COLUMN rag_vector_dataset_index.tenant_id IS '租户ID';
COMMENT ON COLUMN rag_vector_dataset_index.creator IS '创建人';
COMMENT ON COLUMN rag_vector_dataset_index.created_at IS '创建时间';
COMMENT ON COLUMN rag_vector_dataset_index.updater IS '更新人';
COMMENT ON COLUMN rag_vector_dataset_index.updated_at IS '更新时间';
COMMENT ON COLUMN rag_vector_dataset_index.deleted_at IS '删除时间(软删)';
COMMENT ON COLUMN rag_vector_dataset_index.dataset_id IS '数据集ID';
COMMENT ON COLUMN rag_vector_dataset_index.name IS '索引名称';
COMMENT ON COLUMN rag_vector_dataset_index.collection IS '向量集合名称';
COMMENT ON COLUMN rag_vector_dataset_index.dimension IS '向量维度';
COMMENT ON COLUMN rag_vector_dataset_index.field_type IS '字段类型';
COMMENT ON COLUMN rag_vector_dataset_index.metric_type IS '度量类型';
COMMENT ON COLUMN rag_vector_dataset_index.status IS '状态';
COMMENT ON COLUMN rag_vector_dataset_index.vector_count IS '向量数量';
COMMENT ON COLUMN rag_vector_dataset_index.description IS '描述';
--------------------pgsql创建rag_vector_dataset_index表语句---------------------------
--------------------pgsql创建rag_vector_document_chunk表语句---------------------------
CREATE EXTENSION IF NOT EXISTS vector;
-- 文档分块向量表
CREATE TABLE IF NOT EXISTS rag_vector_document_chunk (
-- 基础字段
id BIGINT PRIMARY KEY, -- 主键ID非自增
tenant_id BIGINT NOT NULL DEFAULT 0, -- 租户ID int8
creator VARCHAR(64) NOT NULL,
created_at TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP,
updater VARCHAR(64) NOT NULL,
updated_at TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP,
deleted_at timestamp(6),
-- 核心字段
status SMALLINT NOT NULL DEFAULT 1, -- 状态1启用/0停用
vector_status SMALLINT NOT NULL DEFAULT 1, -- 向量化状态: 1pending, 2processing, 3completed, 4failed,5partCompleted
dataset_id INT8 NOT NULL,
document_id INT8 NOT NULL,
content TEXT NOT NULL,
content_hash VARCHAR(128) NOT NULL,
chunk_index INT8 NOT NULL,
-- 向量字段pgvector
vector vector(1024) NOT NULL,
-- 扩展信息
metadata JSONB
);
-- 索引
CREATE INDEX idx_chunk_tenant_id ON rag_vector_document_chunk(tenant_id);
CREATE INDEX idx_chunk_dataset_id ON rag_vector_document_chunk(dataset_id);
CREATE INDEX idx_chunk_document_id ON rag_vector_document_chunk(document_id);
CREATE INDEX idx_chunk_content_hash ON rag_vector_document_chunk(content_hash);
CREATE INDEX idx_chunk_status ON rag_vector_document_chunk(status);
CREATE INDEX idx_chunk_vector_status ON rag_vector_document_chunk(vector_status);
-- 注释
COMMENT ON TABLE rag_vector_document_chunk IS '文档分块向量表';
COMMENT ON COLUMN rag_vector_document_chunk.id IS '主键ID非自增';
COMMENT ON COLUMN rag_vector_document_chunk.tenant_id IS '租户ID';
COMMENT ON COLUMN rag_vector_document_chunk.creator IS '创建人';
COMMENT ON COLUMN rag_vector_document_chunk.created_at IS '创建时间';
COMMENT ON COLUMN rag_vector_document_chunk.updater IS '更新人';
COMMENT ON COLUMN rag_vector_document_chunk.updated_at IS '更新时间';
COMMENT ON COLUMN rag_vector_document_chunk.deleted_at IS '删除时间(软删)';
COMMENT ON COLUMN rag_vector_document_chunk.status IS '状态';
COMMENT ON COLUMN rag_vector_document_chunk.vector_status IS '向量生成状态';
COMMENT ON COLUMN rag_vector_document_chunk.dataset_id IS '数据集ID';
COMMENT ON COLUMN rag_vector_document_chunk.document_id IS '文档ID';
COMMENT ON COLUMN rag_vector_document_chunk.content IS '分块内容';
COMMENT ON COLUMN rag_vector_document_chunk.content_hash IS '内容哈希';
COMMENT ON COLUMN rag_vector_document_chunk.chunk_index IS '分块序号';
COMMENT ON COLUMN rag_vector_document_chunk.vector IS '向量数据';
COMMENT ON COLUMN rag_vector_document_chunk.metadata IS '扩展元数据';
--------------------pgsql创建rag_vector_document_chunk表语句---------------------------