Compare commits

..

9 Commits

Author SHA1 Message Date
e5a27c00ed 测试服务器IP配置变更 2026-04-08 22:14:22 +08:00
qhd
b6896f3fb4 feat: test 2026-04-03 18:26:20 +08:00
qhd
026beea4d9 feat: 支持多数据库配置与PGVector检索 2026-04-03 17:59:05 +08:00
86c2b7d66e 删除 .idea/vcs.xml 2026-04-03 03:17:32 +00:00
722fbe0cc3 删除 .idea/rag.iml 2026-04-03 03:17:29 +00:00
6d68b468a6 删除 .idea/modules.xml 2026-04-03 03:17:26 +00:00
b33d50944a 删除 .idea/.gitignore 2026-04-03 03:17:22 +00:00
6f2df61bc5 1 2026-04-03 11:17:08 +08:00
qhd
b00d544fb7 feat: rag初始版 2026-04-03 11:14:44 +08:00
33 changed files with 1641 additions and 238 deletions

24
Dockerfile Normal file
View File

@@ -0,0 +1,24 @@
# 最小化Docker镜像
FROM busybox:uclibc
WORKDIR /app
# 复制时区数据
COPY timezone/localtime /etc/localtime
COPY timezone/timezone /etc/timezone
COPY timezone/Shanghai /usr/share/zoneinfo/Asia/Shanghai
# 复制预构建的二进制文件和配置文件
COPY rag_binary ./main
COPY config.yml ./
# 创建日志目录
RUN mkdir -p /logs /app/resource/log/run /app/resource/log/server
# 添加执行权限
RUN chmod +x /app/main
EXPOSE 3008
# 使用root用户运行
CMD ["./main"]

166
common/eino/a.go Normal file
View File

@@ -0,0 +1,166 @@
package eino
import (
"context"
"errors"
"fmt"
"io"
"log"
"os"
"github.com/cloudwego/eino/components/prompt"
"github.com/cloudwego/eino/components/retriever"
"github.com/cloudwego/eino/schema"
"github.com/cloudwego/eino-ext/components/model/ark"
)
func main() {
ctx := context.Background()
// ==========================================
// 1. 初始化三大组件
// ==========================================
// 1.1 向量检索(从知识库查客服知识)
ragRetriever := NewPGVectorRetriever()
// 1.2 提示词模板(客服角色 + 历史 + 知识库 + 用户问题)
chatTpl := newCustomerServiceTemplate()
// 1.3 大模型ARK
chatModel, err := ark.NewChatModel(ctx, &ark.ChatModelConfig{
APIKey: os.Getenv("ARK_API_KEY"),
Model: os.Getenv("ARK_MODEL_ID"),
})
if err != nil {
log.Fatal(err)
}
// ==========================================
// 2. 模拟会话:从 DB 读取历史对话
// ==========================================
sessionHistory := []*schema.Message{
{Role: schema.User, Content: "你们发什么快递?"},
{Role: schema.Assistant, Content: "默认发中通快递"},
{Role: schema.User, Content: "可以发顺丰吗?"},
}
// 当前用户问题
userQuery := "那顺丰需要加钱吗?"
// ==========================================
// 3. RAG 检索知识库
// ==========================================
docs, err := ragRetriever.Retrieve(ctx, userQuery)
if err != nil {
log.Fatal(err)
}
// 拼接参考知识
knowledge := ""
for i, doc := range docs {
knowledge += fmt.Sprintf("[参考%d] %s\n", i+1, doc.Content)
}
// ==========================================
// 4. 模板格式化:系统提示 + 历史 + 知识 + 当前问题
// ==========================================
msgs, err := chatTpl.Format(ctx, map[string]any{
"history": sessionHistory,
"knowledge": knowledge,
"question": userQuery,
})
if err != nil {
log.Fatal(err)
}
// ==========================================
// 5. 流式调用大模型生成客服回答
// ==========================================
fmt.Println("\n=== 客服回复 ===")
stream, err := chatModel.Stream(ctx, msgs)
if err != nil {
log.Fatal(err)
}
fullReply := make([]*schema.Message, 0, 100)
for {
chunk, err := stream.Recv()
if errors.Is(err, io.EOF) {
break
}
if err != nil {
log.Fatal(err)
}
fmt.Print(chunk.Content)
fullReply = append(fullReply, chunk)
}
// ==========================================
// 6. 拼接完整回复,存入 DB 作为新历史
// ==========================================
replyMsg, _ := schema.ConcatMessages(fullReply)
sessionHistory = append(sessionHistory,
&schema.Message{Role: schema.User, Content: userQuery},
replyMsg,
)
// 接下来把 sessionHistory 存回你的 MySQL/Redis 即可
}
// ==========================================
// 本地客服提示词模板(不需要 MCP
// ==========================================
func newCustomerServiceTemplate() prompt.ChatTemplate {
// 系统提示 + 多轮对话 + 知识库 + 用户问题
return prompt.FromMessages(schema.Messages{
{
Role: schema.System,
Content: `你是电商智能客服,语气友好简洁。
请严格根据参考知识回答,不知道就说“抱歉,这个问题我需要帮你转接人工”。
参考知识:
{{.knowledge}}`,
},
// 历史对话会自动渲染在这里
{{range .history}}{{.}},{{end}},
// 当前用户问题
{Role: schema.User, Content: "{{.question}}"},
})
}
// ==========================================
// PGVector 检索器(简化可直接用)
// ==========================================
type PGVectorRetriever struct {
topK int
}
func NewPGVectorRetriever() retriever.Retriever {
return &PGVectorRetriever{topK: 3}
}
func (r *PGVectorRetriever) Retrieve(
ctx context.Context,
query string,
opts ...retriever.Option,
) ([]*schema.Document, error) {
options := retriever.GetCommonOptions(nil, opts...)
topK := r.topK
if options.TopK != nil {
topK = *options.TopK
}
// ===== 这里替换成你真实的 PG 向量检索 SQL =====
// 模拟知识库
return []*schema.Document{
{
ID: "1",
Content: "顺丰快递需要补10元运费差价",
},
{
ID: "2",
Content: "订单满99元可免费升级顺丰",
},
}, nil
}

107
common/eino/b.go Normal file
View File

@@ -0,0 +1,107 @@
package eino
import (
"context"
"fmt"
"github.com/cloudwego/eino/schema"
"github.com/elastic/go-elasticsearch/v8"
"github.com/cloudwego/eino-ext/components/indexer/es8"
)
const (
indexName = "eino_example"
fieldContent = "content"
fieldContentVector = "content_vector"
fieldExtraLocation = "location"
docExtraLocation = "location"
)
func TestIndexer() {
ctx := context.Background()
// 1. 创建 ES 客户端
client, err := elasticsearch.NewClient(elasticsearch.Config{
Addresses: []string{"http://localhost:9200"},
})
if err != nil {
fmt.Printf("create client error: %v\n", err)
return
}
// 2. 定义 Index Spec选填如果索引不存在将自动创建
indexSpec := &es8.IndexSpec{
Settings: map[string]any{
"number_of_shards": 1,
"number_of_replicas": 0,
},
Mappings: map[string]any{
"properties": map[string]any{
fieldContentVector: map[string]any{
"type": "dense_vector",
"dims": 1024,
"index": true,
"similarity": "l2_norm",
},
},
},
}
// 4. 准备文档
// 文档通常包含 ID 和 Content
// 也可以包含额外的 Metadata 用于过滤或其他用途
docs := []*schema.Document{
{
ID: "1",
Content: "Eiffel Tower: Located in Paris, France.",
MetaData: map[string]any{
docExtraLocation: "France",
},
},
{
ID: "2",
Content: "The Great Wall: Located in China.",
MetaData: map[string]any{
docExtraLocation: "China",
},
},
}
// 5. 创建 ES 索引器组件
indexer, err := es8.NewIndexer(ctx, &es8.IndexerConfig{
Client: client,
Index: indexName,
IndexSpec: indexSpec, // 添加此项以启用自动索引创建
BatchSize: 10,
// DocumentToFields 指定如何将文档字段映射到 ES 字段
DocumentToFields: func(ctx context.Context, doc *schema.Document) (field2Value map[string]es8.FieldValue, err error) {
return map[string]es8.FieldValue{
fieldContent: {
Value: doc.Content,
EmbedKey: fieldContentVector, // 对文档内容进行向量化并保存到 "content_vector" 字段
},
fieldExtraLocation: {
// 额外的 metadata 字段
Value: doc.MetaData[docExtraLocation],
},
}, nil
},
// 提供 embedding 组件用于向量化
Embedding: EmbedderDashscope,
})
if err != nil {
fmt.Printf("create indexer error: %v\n", err)
return
}
// 6. 索引文档
ids, err := indexer.Store(ctx, docs)
if err != nil {
fmt.Printf("index error: %v\n", err)
return
}
fmt.Println("indexed ids:", ids)
}

49
common/eino/base_task.go Normal file
View File

@@ -0,0 +1,49 @@
package eino
import (
"time"
"gitea.com/red-future/common/beans"
)
// BaseTask 任务基类 - MongoDB版本
type BaseTask struct {
beans.MongoBaseDO `bson:",inline"`
// 任务信息
TaskType TaskType `bson:"taskType" json:"taskType"`
Status TaskStatus `bson:"status" json:"status"`
Priority TaskPriority `bson:"priority,omitempty" json:"priority,omitempty"`
// 进度
TotalItems int64 `bson:"totalItems" json:"totalItems"`
ProcessedItems int64 `bson:"processedItems" json:"processedItems"`
Progress float64 `bson:"progress" json:"progress"`
// 结果
StartTime *time.Time `bson:"startTime" json:"startTime"`
EndTime *time.Time `bson:"endTime,omitempty" json:"endTime,omitempty"`
Duration int64 `bson:"duration,omitempty" json:"duration,omitempty"`
SuccessCount int64 `bson:"successCount" json:"successCount"`
FailCount int64 `bson:"failCount" json:"failCount"`
// 其他
Executor string `bson:"executor,omitempty" json:"executor,omitempty"`
}
// SQLBaseTask 任务基类 - SQL版本
type SQLBaseTask struct {
beans.SQLBaseDO
// 任务信息
TaskType TaskType `json:"taskType"`
Status TaskStatus `json:"status"`
Priority TaskPriority `json:"priority,omitempty"`
// 进度
TotalItems int64 `json:"totalItems"`
ProcessedItems int64 `json:"processedItems"`
Progress float64 `json:"progress"`
// 结果
StartTime *time.Time `json:"startTime"`
EndTime *time.Time `json:"endTime,omitempty"`
Duration int64 `json:"duration,omitempty"`
SuccessCount int64 `json:"successCount"`
FailCount int64 `json:"failCount"`
// 其他
Executor string `json:"executor,omitempty"`
}

94
common/eino/c.go Normal file
View File

@@ -0,0 +1,94 @@
package eino
import (
"context"
"encoding/json"
"fmt"
"github.com/cloudwego/eino/schema"
"github.com/elastic/go-elasticsearch/v8"
"github.com/elastic/go-elasticsearch/v8/typedapi/types"
"github.com/cloudwego/eino-ext/components/retriever/es8"
"github.com/cloudwego/eino-ext/components/retriever/es8/search_mode"
)
func TestRetriever() {
ctx := context.Background()
client, _ := elasticsearch.NewClient(elasticsearch.Config{
Addresses: []string{"http://localhost:9200"},
})
// 创建 retriever 组件
retriever, _ := es8.NewRetriever(ctx, &es8.RetrieverConfig{
Client: client,
Index: indexName,
TopK: 5,
SearchMode: search_mode.SearchModeApproximate(&search_mode.ApproximateConfig{
QueryFieldName: fieldContent,
VectorFieldName: fieldContentVector,
Hybrid: false,
// RRF 仅在特定许可证下可用
// 参见: https://www.elastic.co/subscriptions
RRF: false,
RRFRankConstant: nil,
RRFWindowSize: nil,
}),
ResultParser: func(ctx context.Context, hit types.Hit) (doc *schema.Document, err error) {
doc = &schema.Document{
ID: *hit.Id_,
Content: "",
MetaData: map[string]any{},
}
var src map[string]any
if err = json.Unmarshal(hit.Source_, &src); err != nil {
return nil, err
}
for field, val := range src {
switch field {
case fieldContent:
doc.Content = val.(string)
case fieldContentVector:
var v []float64
for _, item := range val.([]interface{}) {
v = append(v, item.(float64))
}
doc.WithDenseVector(v)
case fieldExtraLocation:
doc.MetaData[docExtraLocation] = val.(string)
}
}
if hit.Score_ != nil {
doc.WithScore(float64(*hit.Score_))
}
return doc, nil
},
Embedding: EmbedderDashscope,
})
// 不带过滤器的搜索
docs, _ := retriever.Retrieve(ctx, "tourist attraction")
// 带过滤器的搜索
docs, _ = retriever.Retrieve(ctx, "tourist attraction",
es8.WithFilters([]types.Query{{
Term: map[string]types.TermQuery{
fieldExtraLocation: {
CaseInsensitive: of(true),
Value: "China",
},
},
}}),
)
fmt.Printf("retrieved docs: %+v\n", docs)
}
func of[T any](v T) *T {
return &v
}

8
common/eino/consts.go Normal file
View File

@@ -0,0 +1,8 @@
package eino
const (
providerArk = "ark"
providerOpenai = "openai"
providerQianfan = "qianfan"
providerDashscope = "dashscope"
)

View File

@@ -0,0 +1,51 @@
package eino
import (
"context"
"fmt"
"gitea.com/red-future/common/utils"
"github.com/cloudwego/eino-ext/components/document/loader/url"
"github.com/cloudwego/eino-ext/components/document/parser/docx"
"github.com/cloudwego/eino-ext/components/document/parser/pdf"
"github.com/cloudwego/eino-ext/components/document/parser/xlsx"
"github.com/cloudwego/eino/components/document"
"github.com/cloudwego/eino/components/document/parser"
"github.com/cloudwego/eino/schema"
)
// LoadDocument 业务函数:加载文件
func LoadDocument(ctx context.Context, filePath, fileFormat string) (docs []*schema.Document, err error) {
p, err := docsParser(ctx, fileFormat)
if err != nil {
return
}
loader, err := url.NewLoader(ctx, &url.LoaderConfig{
Parser: p,
})
imageUrl, err := utils.GetFileAddressPrefix(ctx)
if err != nil {
return
}
docs, err = loader.Load(context.Background(), document.Source{
URI: fmt.Sprintf("%s%s", imageUrl, filePath),
})
return
}
func docsParser(ctx context.Context, fileFormat string) (p parser.Parser, err error) {
switch fileFormat {
case "docx":
p, err = docx.NewDocxParser(ctx, &docx.Config{
ToSections: true,
IncludeHeaders: true,
IncludeFooters: true,
IncludeTables: true,
})
case "pdf":
p, err = pdf.NewPDFParser(ctx, &pdf.Config{})
case "xlsx":
p, err = xlsx.NewXlsxParser(ctx, &xlsx.Config{})
}
return
}

View File

@@ -0,0 +1,64 @@
package eino
import (
"context"
"github.com/cloudwego/eino-ext/components/document/transformer/splitter/recursive"
"github.com/cloudwego/eino-ext/components/document/transformer/splitter/semantic"
"github.com/cloudwego/eino/schema"
"github.com/gogf/gf/v2/frame/g"
)
// SemanticSplitDocument 语义分割文档
func SemanticSplitDocument(ctx context.Context, docs []*schema.Document) (res []*schema.Document, err error) {
// 默认分隔符(支持中英文)
separators := []string{"\n\n", "\n", "。", "", "", "", ".", "!", "?", ";"}
// 读取配置,使用合理的默认值
bufferSize := g.Cfg().MustGet(ctx, "eino.splitter.bufferSize").Int()
minChunkSize := g.Cfg().MustGet(ctx, "eino.splitter.minChunkSize").Int()
percentile := g.Cfg().MustGet(ctx, "eino.splitter.percentile").Float64()
batchSize := g.Cfg().MustGet(ctx, "eino.splitter.batchSize").Int()
if batchSize <= 0 {
batchSize = 10 // doubao-embedding-vision 限制每批最多 10 个
}
// 使用批量包装器
var batchEmbedder *BatchEmbedder
provider := g.Cfg().MustGet(ctx, "eino.embedding.provider").String()
switch provider {
case providerArk:
batchEmbedder = NewBatchEmbedder(EmbedderArk, batchSize)
case providerOpenai:
batchEmbedder = NewBatchEmbedder(EmbedderOpenAI, batchSize)
case providerDashscope:
batchEmbedder = NewBatchEmbedder(EmbedderDashscope, batchSize)
}
splitter, err := semantic.NewSplitter(ctx, &semantic.Config{
Embedding: batchEmbedder,
BufferSize: bufferSize,
MinChunkSize: minChunkSize,
Percentile: percentile,
Separators: separators,
})
if err != nil {
return
}
return splitter.Transform(ctx, docs)
}
// RecursiveSplitDocument 递归分割文档
func RecursiveSplitDocument(ctx context.Context, docs []*schema.Document) (res []*schema.Document, err error) {
// 默认分隔符(支持中英文)
separators := []string{"\n\n", "\n", "。", "", "", "", ".", "!", "?", ";"}
splitter, err := recursive.NewSplitter(ctx, &recursive.Config{
ChunkSize: 512,
OverlapSize: 100,
KeepType: recursive.KeepTypeNone,
Separators: separators,
})
if err != nil {
return
}
return splitter.Transform(ctx, docs)
}

69
common/eino/embedding.go Normal file
View File

@@ -0,0 +1,69 @@
package eino
import (
"context"
"fmt"
"github.com/cloudwego/eino-ext/components/embedding/ark"
"github.com/cloudwego/eino-ext/components/embedding/dashscope"
"github.com/cloudwego/eino-ext/components/embedding/openai"
"github.com/gogf/gf/v2/frame/g"
"github.com/golang/glog"
)
// 全局只初始化一次
var (
EmbedderArk *ark.Embedder
EmbedderDashscope *dashscope.Embedder
EmbedderOpenAI *openai.Embedder
)
func init() {
ctx := context.Background()
if !g.Cfg().MustGet(ctx, "eino.embedding").IsEmpty() {
var err error
provider := g.Cfg().MustGet(ctx, "eino.embedding.provider").String()
switch provider {
case providerArk:
cfg := &ark.EmbeddingConfig{
APIKey: g.Cfg().MustGet(ctx, "eino.embedding.apiKey").String(),
Model: g.Cfg().MustGet(ctx, "eino.embedding.model").String(),
}
if apiType := g.Cfg().MustGet(ctx, "eino.embedding.apiType").String(); apiType != "" {
apiTypeVal := ark.APIType(apiType)
cfg.APIType = &apiTypeVal
}
EmbedderArk, err = ark.NewEmbedder(ctx, cfg)
case providerOpenai:
chatModelConfig := &openai.EmbeddingConfig{
APIKey: g.Cfg().MustGet(ctx, "eino.embedding.apiKey").String(),
Model: g.Cfg().MustGet(ctx, "eino.embedding.model").String(),
}
EmbedderOpenAI, err = openai.NewEmbedder(ctx, chatModelConfig)
case providerDashscope:
cfg := &dashscope.EmbeddingConfig{
APIKey: g.Cfg().MustGet(ctx, "eino.embedding.apiKey").String(),
Model: g.Cfg().MustGet(ctx, "eino.embedding.model").String(),
}
EmbedderDashscope, err = dashscope.NewEmbedder(ctx, cfg)
}
if err != nil {
glog.Fatalf("NewEmbedder of %v error: %v", provider, err)
}
}
return
}
func EmbedStrings(ctx context.Context, texts []string) (embeddings [][]float64, err error) {
provider := g.Cfg().MustGet(ctx, "eino.embedding.provider").String()
switch provider {
case providerArk:
return EmbedderArk.EmbedStrings(ctx, texts)
case providerOpenai:
return EmbedderOpenAI.EmbedStrings(ctx, texts)
case providerDashscope:
return EmbedderDashscope.EmbedStrings(ctx, texts)
}
return nil, fmt.Errorf("unsupported provider: %v", provider)
}

View File

@@ -0,0 +1,47 @@
package eino
import (
"context"
"github.com/cloudwego/eino/components/embedding"
)
// BatchEmbedder 包装器,支持批量限制
type BatchEmbedder struct {
embedder embedding.Embedder
batchSize int
}
// NewBatchEmbedder 创建支持批量限制的 embedding 包装器
func NewBatchEmbedder(embedder embedding.Embedder, batchSize int) *BatchEmbedder {
if batchSize <= 0 {
batchSize = 10 // 默认每批 10 个
}
return &BatchEmbedder{
embedder: embedder,
batchSize: batchSize,
}
}
// EmbedStrings 分批调用 embedding
func (b *BatchEmbedder) EmbedStrings(ctx context.Context, texts []string, opts ...embedding.Option) ([][]float64, error) {
if len(texts) <= b.batchSize {
return b.embedder.EmbedStrings(ctx, texts, opts...)
}
var allEmbeddings [][]float64
for i := 0; i < len(texts); i += b.batchSize {
end := i + b.batchSize
if end > len(texts) {
end = len(texts)
}
batch := texts[i:end]
embeddings, err := b.embedder.EmbedStrings(ctx, batch, opts...)
if err != nil {
return nil, err
}
allEmbeddings = append(allEmbeddings, embeddings...)
}
return allEmbeddings, nil
}

View File

@@ -0,0 +1,273 @@
/*
* Copyright 2024 Red Future Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package eino
import (
"context"
"fmt"
"net/http"
"time"
"github.com/cloudwego/eino/callbacks"
"github.com/cloudwego/eino/components"
"github.com/cloudwego/eino/components/embedding"
"github.com/gogf/gf/v2/frame/g"
"github.com/gogf/gf/v2/net/gclient"
"github.com/gogf/gf/v2/util/gconv"
)
var (
// 千问API默认配置
defaultBaseURL = "https://dashscope.aliyuncs.com/api/v1/services/embeddings/text-embedding"
defaultTimeout = 10 * time.Minute
defaultRetryTimes = 2
)
type QwenEmbeddingConfig struct {
// Timeout specifies the maximum duration to wait for API responses
// Optional. Default: 10 minutes
Timeout *time.Duration `json:"timeout"`
// HTTPClient specifies the client to send HTTP requests.
// Optional. Default &http.Client{Timeout: Timeout}
HTTPClient *http.Client `json:"http_client"`
// RetryTimes specifies the number of retry attempts for failed API calls
// Optional. Default: 2
RetryTimes *int `json:"retry_times"`
// BaseURL specifies the base URL for Qwen DashScope service
// Optional. Default: "https://dashscope.aliyuncs.com/api/v1/services/embeddings/text-embedding"
BaseURL string `json:"base_url"`
// APIKey specifies the API Key for authentication
// Required
APIKey string `json:"api_key"`
// Model specifies the model name for Qwen embedding
// Required. Examples: "text-embedding-v2", "text-embedding-v3"
Model string `json:"model"`
// TextType specifies the type of text: "document" or "query"
// Optional. Default: "document"
TextType string `json:"text_type"`
// MaxConcurrentRequests specifies the maximum number of concurrent requests allowed
// Optional. Default: 5
MaxConcurrentRequests *int `json:"max_concurrent_requests"`
}
type QwenEmbedder struct {
client *gclient.Client
conf *QwenEmbeddingConfig
}
// EmbeddingRequest 千问embedding请求结构
type EmbeddingRequest struct {
Model string `json:"model"`
Input struct {
Texts []string `json:"texts"`
} `json:"input"`
Parameters struct {
TextType string `json:"text_type,omitempty"`
} `json:"parameters,omitempty"`
}
// EmbeddingResponse 千问embedding响应结构
type EmbeddingResponse struct {
Output struct {
Embeddings []struct {
TextIndex int `json:"text_index"`
Embedding []float64 `json:"embedding"`
} `json:"embeddings"`
} `json:"output"`
Usage struct {
TotalTokens int `json:"total_tokens"`
} `json:"usage"`
RequestID string `json:"request_id"`
}
type APIError struct {
Code string `json:"code"`
Message string `json:"message"`
RequestID string `json:"request_id"`
}
func (e *APIError) Error() string {
return fmt.Sprintf("API Error: %s - %s (RequestID: %s)", e.Code, e.Message, e.RequestID)
}
func buildQwenClient(config *QwenEmbeddingConfig) *gclient.Client {
if len(config.BaseURL) == 0 {
config.BaseURL = defaultBaseURL
}
if config.Timeout == nil {
config.Timeout = &defaultTimeout
}
if config.RetryTimes == nil {
defaultRetryTimes := 2
config.RetryTimes = &defaultRetryTimes
}
if len(config.TextType) == 0 {
config.TextType = "document"
}
if config.MaxConcurrentRequests == nil {
defaultMaxConcurrentRequests := 5
config.MaxConcurrentRequests = &defaultMaxConcurrentRequests
}
client := g.Client()
client.SetTimeout(*config.Timeout)
return client
}
func NewQwenEmbedder(ctx context.Context, config *QwenEmbeddingConfig) (*QwenEmbedder, error) {
if len(config.APIKey) == 0 {
return nil, fmt.Errorf("[Qwen] APIKey is required")
}
if len(config.Model) == 0 {
return nil, fmt.Errorf("[Qwen] Model is required")
}
client := buildQwenClient(config)
return &QwenEmbedder{
client: client,
conf: config,
}, nil
}
func (e *QwenEmbedder) EmbedStrings(ctx context.Context, texts []string, opts ...embedding.Option) (
[][]float64, error) {
if len(texts) == 0 {
return nil, fmt.Errorf("[Qwen] texts cannot be empty")
}
options := embedding.GetCommonOptions(&embedding.Options{
Model: &e.conf.Model,
}, opts...)
conf := &embedding.Config{
Model: dereferenceOrZero(options.Model),
}
ctx = callbacks.EnsureRunInfo(ctx, e.GetType(), components.ComponentOfEmbedding)
ctx = callbacks.OnStart(ctx, &embedding.CallbackInput{
Texts: texts,
Config: conf,
})
defer func() {
if err := recover(); err != nil {
callbacks.OnError(ctx, fmt.Errorf("[Qwen] panic: %v", err))
}
}()
var usage *embedding.TokenUsage
var embeddings [][]float64
var err error
// 调用千问API获取embedding
embeddings, usage, err = e.callEmbeddingAPI(ctx, texts)
if err != nil {
callbacks.OnError(ctx, err)
return nil, err
}
callbacks.OnEnd(ctx, &embedding.CallbackOutput{
Embeddings: embeddings,
Config: conf,
TokenUsage: usage,
})
return embeddings, nil
}
func (e *QwenEmbedder) callEmbeddingAPI(ctx context.Context, texts []string) ([][]float64, *embedding.TokenUsage, error) {
// 构建请求
var req EmbeddingRequest
req.Model = e.conf.Model
req.Input.Texts = texts
req.Parameters.TextType = e.conf.TextType
// 调用API
client := e.client.Clone()
client.SetHeader("Authorization", "Bearer "+e.conf.APIKey)
client.SetHeader("Content-Type", "application/json")
client.SetTimeout(*e.conf.Timeout)
resp, err := client.Post(ctx, e.conf.BaseURL, req)
if err != nil {
return nil, nil, fmt.Errorf("[Qwen] HTTP request error: %w", err)
}
defer resp.Close()
// 检查状态码
if resp.StatusCode != http.StatusOK {
var errResp APIError
result := resp.ReadAll()
if err = gconv.Struct(result, &errResp); err == nil && errResp.Code != "" {
return nil, nil, &errResp
}
return nil, nil, fmt.Errorf("[Qwen] HTTP status error: %d", resp.StatusCode)
}
// 解析响应
var apiResp EmbeddingResponse
result := resp.ReadAll()
if err = gconv.Struct(result, &apiResp); err != nil {
return nil, nil, fmt.Errorf("[Qwen] parse response error: %w", err)
}
// 解析响应结果
embeddings := make([][]float64, len(texts))
for _, emb := range apiResp.Output.Embeddings {
if emb.TextIndex >= 0 && emb.TextIndex < len(embeddings) {
embeddings[emb.TextIndex] = emb.Embedding
}
}
usage := &embedding.TokenUsage{
TotalTokens: apiResp.Usage.TotalTokens,
}
g.Log().Debugf(ctx, "[Qwen] Embedding success: request_id=%s, total_tokens=%d", apiResp.RequestID, usage.TotalTokens)
return embeddings, usage, nil
}
func (e *QwenEmbedder) GetType() string {
return getType()
}
func (e *QwenEmbedder) IsCallbacksEnabled() bool {
return true
}
func getType() string {
return "Qwen"
}
func dereferenceOrZero[T any](v *T) T {
if v == nil {
var t T
return t
}
return *v
}

177
common/eino/indexer.go Normal file
View File

@@ -0,0 +1,177 @@
package eino
import (
"context"
"database/sql"
"errors"
"fmt"
"rag/dao"
"rag/model/dto"
"rag/model/entity"
"gitea.com/red-future/common/beans"
"github.com/cloudwego/eino/callbacks"
"github.com/cloudwego/eino/components/indexer"
"github.com/cloudwego/eino/schema"
"github.com/gogf/gf/v2/os/glog"
"github.com/gogf/gf/v2/util/gconv"
"github.com/pgvector/pgvector-go"
)
type PGVectorIndexerOptions struct {
BatchSize int // 每批处理多少条
}
type PGVectorIndexer struct {
opts *PGVectorIndexerOptions
}
func NewPGVectorIndexer(opts *PGVectorIndexerOptions) *PGVectorIndexer {
// 默认值
if opts.BatchSize <= 0 {
opts.BatchSize = 5
}
return &PGVectorIndexer{opts: opts}
}
func (i *PGVectorIndexer) Store(ctx context.Context, docs []*schema.Document, opts ...indexer.Option) (rows int64, err error) {
commonOpts := indexer.GetCommonOptions(&indexer.Options{}, opts...)
if commonOpts.Embedding == nil {
return 0, errors.New("embedding model not set")
}
// 回调
ctx = callbacks.OnStart(ctx, &indexer.CallbackInput{Docs: docs})
rows, err = i.bulkStore(ctx, docs, commonOpts)
if err != nil {
callbacks.OnError(ctx, err)
return
}
callbacks.OnEnd(ctx, &indexer.CallbackOutput{IDs: gconv.Strings(rows)})
return
}
func (i *PGVectorIndexer) bulkStore(ctx context.Context, docs []*schema.Document, opts *indexer.Options) (rows int64, err error) {
var batchDocs []*schema.Document
// 官方ES同款逻辑满 BatchSize 就处理一批
for _, doc := range docs {
batchDocs = append(batchDocs, doc)
// 满了 → 处理
if len(batchDocs) >= i.opts.BatchSize {
var r int64
r, err = i.doStore(ctx, batchDocs, opts)
if err != nil {
return
}
rows = rows + r
batchDocs = nil
}
}
// 最后一批
if len(batchDocs) > 0 {
var r int64
r, err = i.doStore(ctx, batchDocs, opts)
if err != nil {
return
}
rows = rows + r
}
return
}
func (i *PGVectorIndexer) doStore(ctx context.Context, docs []*schema.Document, opts *indexer.Options) (rows int64, err error) {
texts := make([]string, len(docs))
for i, d := range docs {
texts[i] = d.Content
}
// 向量化官方ES也没有重试
vectors, err := opts.Embedding.EmbedStrings(ctx, texts)
if err != nil {
return
}
// 转成业务实体
var chunks []*dto.VectorDocumentChunkMsg
for idx, doc := range docs {
ck := new(dto.VectorDocumentChunkMsg)
err = gconv.Struct(doc.MetaData, ck)
if err != nil {
glog.Errorf(ctx, "doStore err: %v", err)
continue
}
ck.Content = doc.Content
ck.Vector = pgvector.NewVector(gconv.Float32s(vectors[idx]))
ck.VectorStatus = gconv.PtrInt8(1)
ck.Status = gconv.PtrInt8(1)
chunks = append(chunks, ck)
}
if len(chunks) == 0 {
return
}
ctx = context.WithValue(ctx, "user", &beans.User{
TenantId: chunks[0].TenantId,
UserName: chunks[0].Creator,
})
// 创建索引
if err = i.createOrUpdateDatasetIndex(ctx, chunks[0].DatasetId, len(vectors[0]), int64(len(chunks))); err != nil {
return
}
// 入库
rows, err = dao.DocumentChunk.BatchInsert(ctx, chunks)
return
}
func (i *PGVectorIndexer) createOrUpdateDatasetIndex(ctx context.Context, datasetId int64, dimension int, vectorCount int64) error {
exist, err := dao.DatasetIndex.GetByDatasetId(ctx, datasetId)
if err != nil && !errors.Is(err, sql.ErrNoRows) {
return err
}
if exist != nil {
_ = dao.DatasetIndex.IncVectorCount(ctx, exist.Id, vectorCount)
return nil
}
indexName := fmt.Sprintf("idx_dataset_%d_vector", datasetId)
idx := &entity.DatasetIndex{
DatasetId: datasetId,
Name: indexName,
Dimension: dimension,
FieldType: "float",
MetricType: "COSINE",
Status: gconv.PtrInt8(1),
VectorCount: vectorCount,
Description: fmt.Sprintf("数据集%d向量索引", datasetId),
}
_, err = dao.DatasetIndex.Insert(ctx, idx)
if err != nil {
return err
}
return i.createRealPGVectorIndex(ctx, indexName)
}
func (i *PGVectorIndexer) createRealPGVectorIndex(ctx context.Context, indexName string) error {
if err := dao.DatasetIndex.InsertIndex(ctx, indexName); err != nil {
glog.Errorf(ctx, "create vector index failed: %v", err)
return err
}
glog.Infof(ctx, "created pgvector index: %s", indexName)
return nil
}
func (i *PGVectorIndexer) GetType() string {
return "pgvector_indexer"
}
func (i *PGVectorIndexer) IsCallbacksEnabled() bool {
return true
}

View File

@@ -0,0 +1,11 @@
package eino
// TaskPriority 任务优先级
type TaskPriority string
const (
TaskPriorityLow TaskPriority = "low" // 低优先级
TaskPriorityMedium TaskPriority = "medium" // 中优先级
TaskPriorityHigh TaskPriority = "high" // 高优先级
TaskPriorityUrgent TaskPriority = "urgent" // 紧急
)

117
common/eino/retriever.go Normal file
View File

@@ -0,0 +1,117 @@
package eino
import (
"context"
"errors"
"github.com/cloudwego/eino/callbacks"
"github.com/cloudwego/eino/components/embedding"
"github.com/cloudwego/eino/components/retriever"
"github.com/cloudwego/eino/schema"
"github.com/gogf/gf/v2/util/gconv"
"github.com/pgvector/pgvector-go"
)
type PGVectorRetrieverConfig struct {
Embedder embedding.Embedder
DefaultTopK int
DefaultIndex string
}
type PGVectorRetriever struct {
embedder embedding.Embedder
topK int
index string
}
func NewPGVectorRetriever(config *PGVectorRetrieverConfig) (*PGVectorRetriever, error) {
if config.Embedder == nil {
return nil, errors.New("embedder is required")
}
if config.DefaultTopK <= 0 {
config.DefaultTopK = 5
}
return &PGVectorRetriever{
embedder: config.Embedder,
topK: config.DefaultTopK,
index: config.DefaultIndex,
}, nil
}
func (r *PGVectorRetriever) Retrieve(ctx context.Context, query string, opts ...retriever.Option) ([]*schema.Document, error) {
// 1. 处理公共 Option官方标准写法
options := &retriever.Options{
Index: &r.index,
TopK: &r.topK,
Embedding: r.embedder,
}
options = retriever.GetCommonOptions(options, opts...)
// 2. 回调(官方标准)
ctx = callbacks.OnStart(ctx, &retriever.CallbackInput{
Query: query,
TopK: *options.TopK,
})
// 3. 执行检索
docs, err := r.doRetrieve(ctx, query, options)
if err != nil {
callbacks.OnError(ctx, err)
return nil, err
}
// 4. 完成回调
callbacks.OnEnd(ctx, &retriever.CallbackOutput{
Docs: docs,
})
return docs, nil
}
func (r *PGVectorRetriever) doRetrieve(ctx context.Context, query string, opts *retriever.Options) ([]*schema.Document, error) {
// 1. 生成向量
vectors, err := opts.Embedding.EmbedStrings(ctx, []string{query})
if err != nil {
return nil, err
}
if len(vectors) == 0 {
return nil, errors.New("empty query vector")
}
queryVec := pgvector.NewVector(vectors[0])
topK := *opts.TopK
// 2. PG 向量相似度检索 SQL
sql := `
SELECT id, content, dataset_id, document_id,
vector <-> ? AS distance
FROM document_chunk
ORDER BY distance ASC
LIMIT ?
`
// 3. 查询
rows, err := dao.DocumentChunk.GetDB().GetAll(ctx, sql, queryVec, topK)
if err != nil {
return nil, err
}
// 4. 转为 Eino Document
docs := make([]*schema.Document, 0, len(rows))
for _, row := range rows {
docs = append(docs, &schema.Document{
ID: gconv.String(row["id"]),
Content: gconv.String(row["content"]),
Metadata: map[string]any{
"dataset_id": row["dataset_id"],
"document_id": row["document_id"],
"distance": row["distance"],
},
})
}
return docs, nil
}

View File

@@ -0,0 +1,12 @@
package eino
// TaskStatus 任务状态
type TaskStatus string
const (
TaskStatusPending TaskStatus = "pending" // 待处理
TaskStatusRunning TaskStatus = "running" // 运行中
TaskStatusCompleted TaskStatus = "completed" // 已完成
TaskStatusFailed TaskStatus = "failed" // 失败
TaskStatusCancelled TaskStatus = "cancelled" // 已取消
)

14
common/eino/task_type.go Normal file
View File

@@ -0,0 +1,14 @@
package eino
// TaskType 任务类型
type TaskType string
const (
TaskTypeDocumentIngestion TaskType = "document_ingestion" // 文档摄入任务
TaskTypeVectorIngestion TaskType = "vector_ingestion" // 向量摄入任务
TaskTypeIndexCreation TaskType = "index_creation" // 索引创建任务
TaskTypeQAProcessing TaskType = "qa_processing" // 问答处理任务
TaskTypeKnowledgeConstruction TaskType = "knowledge_construction" // 知识库构建任务
TaskTypeGraphBuilding TaskType = "graph_building" // 图谱构建任务
TaskTypeKnowledgeSync TaskType = "knowledge_sync" // 知识同步任务
)

114
common/gse/utils.go Normal file
View File

@@ -0,0 +1,114 @@
package gse
import (
"context"
"sort"
"github.com/go-ego/gse"
"github.com/go-ego/gse/hmm/extracker"
"github.com/go-ego/gse/hmm/segment"
"github.com/gogf/gf/v2/os/glog"
)
var GseTool *gseTool
// 初始化函数:程序启动时执行一次
func init() {
var err error
GseTool, err = newGseTool()
if err != nil {
glog.Error(context.Background(), err)
}
}
// gseTool 关键词提取工具gse v1.0.2 标准)
type gseTool struct {
seg gse.Segmenter
tfidf *extracker.TagExtracter
tr *extracker.TextRanker
}
// newGseTool 初始化工具(内置词典 + 停用词)
func newGseTool() (tool *gseTool, err error) {
// 1. 初始化分词器
var seg gse.Segmenter
// 内置词典(无外部文件)
err = seg.LoadDictEmbed()
if err != nil {
return
}
// 内置停用词v1.0.2 标准)
err = seg.LoadStopEmbed()
if err != nil {
return
}
// 2. 初始化 TF-IDF 提取器
tfidf := &extracker.TagExtracter{}
tfidf.WithGse(seg)
err = tfidf.LoadIdf()
if err != nil {
return
}
// 3. 初始化 TextRank 提取器
tr := &extracker.TextRanker{}
tr.WithGse(seg)
tool = &gseTool{
seg: seg,
tfidf: tfidf,
tr: tr,
}
return
}
// Cut 分词(关键词提取唯一正确模式:精确模式 + HMM
func (k *gseTool) Cut(text string) []string {
return k.seg.Cut(text, true)
}
// Keyword 最终输出:关键词 + 权重
type Keyword struct {
Word string `json:"word"`
Score float64 `json:"score"`
}
func (k *gseTool) Extract(text string, topN int) []Keyword {
// 1. 提取 TF-IDF
tfTags := k.extractTFIDF(text, topN)
// 2. 提取 TextRank
trTags := k.extractTextRank(text, topN)
// 3. 合并成最终关键词(业务最常用)
scoreMap := make(map[string]float64)
for _, tag := range tfTags {
scoreMap[tag.Text] = tag.Weight
}
for _, tag := range trTags {
scoreMap[tag.Text] = tag.Weight
}
// 转成切片并排序(高分在前)
res := make([]Keyword, 0, len(scoreMap))
for word, score := range scoreMap {
res = append(res, Keyword{Word: word, Score: score})
}
sort.Slice(res, func(i, j int) bool {
return res[i].Score > res[j].Score
})
return res
}
// ExtractTFIDF TF-IDF 关键词带权重90% 业务:文章标签、搜索、关键词
func (k *gseTool) extractTFIDF(text string, topN int) segment.Segments {
return k.tfidf.ExtractTags(text, topN)
}
// ExtractTextRank TextRank 关键词(带权重)长文本、摘要、语义理解
func (k *gseTool) extractTextRank(text string, topN int) segment.Segments {
return k.tr.TextRank(text, topN)
}

View File

@@ -12,8 +12,9 @@ database:
user: "postgres"
pass: "Bjang09@686^*^"
name: "rag"
prefix: "rag_knowledge_" # (可选)表名前缀
role: "master" # (可选)数据库主从角色(master/slave)默认为master。如果不使用应用主从机制请不配置或留空即可。
debug: false # (可选)开启调试模式
debug: true # (可选)开启调试模式
dryRun: false # (可选)ORM空跑(只读不写)
charset: "utf8" # (可选)数据库编码(如: utf8mb4/utf8/gbk/gb2312)一般设置为utf8mb4。默认为utf8。
timezone: "Asia/Shanghai" # (可选)时区配置,例如:Local
@@ -30,7 +31,8 @@ database:
port: "15432"
user: "postgres"
pass: "Bjang09@686^*^"
name: "rag"
name: "tenant-1"
prefix: "rag_knowledge_" # (可选)表名前缀
role: "slave" # (可选)数据库主从角色(master/slave)默认为master。如果不使用应用主从机制请不配置或留空即可。
debug: false # (可选)开启调试模式
dryRun: false # (可选)ORM空跑(只读不写)
@@ -44,15 +46,36 @@ database:
updatedAt: "updated_at" # (可选)自动更新时间字段名称
deletedAt: "deleted_at" # (可选)软删除时间字段名称
timeMaintainDisabled: false # (可选)是否完全关闭时间更新特性为true时CreatedAt/UpdatedAt/DeletedAt都将失效
tenant-1:
rag_knowledge:
- type: "pgsql"
host: "localhost"
port: "5432"
user: "postgres"
pass: "123456"
name: "tenant"
name: "tenant-1"
prefix: "rag_knowledge_" # (可选)表名前缀
role: "master"
debug: true # (可选)开启调试模式
dryRun: false # (可选)ORM空跑(只读不写)
charset: "utf8" # (可选)数据库编码(如: utf8mb4/utf8/gbk/gb2312)一般设置为utf8mb4。默认为utf8。
timezone: "Asia/Shanghai" # (可选)时区配置,例如:Local
maxIdle: 5 # (可选)连接池最大闲置的连接数(默认10)
maxOpen: 20 # (可选)连接池最大打开的连接数(默认无限制)
maxLifetime: "30s" # (可选)连接对象可重复使用的时间长度(默认30秒)
maxIdleConnTime: "30s" # (可选v2.10新增)连接池中空闲连接的最大生存时间(默认30秒)。可以通过配置文件或SetConnMaxIdleTime方法设置避免长时间空闲连接占用资源。
createdAt: "created_at" # (可选)自动创建时间字段名称
updatedAt: "updated_at" # (可选)自动更新时间字段名称
deletedAt: "deleted_at" # (可选)软删除时间字段名称
timeMaintainDisabled: false # (可选)是否完全关闭时间更新特性为true时CreatedAt/UpdatedAt/DeletedAt都将失效
rag_vector:
- type: "pgsql"
host: "localhost"
port: "5432"
user: "postgres"
pass: "123456"
name: "tenant-1"
prefix: "rag_vector_" # (可选)表名前缀
role: "master"
prefix: "rag_" # (可选)表名前缀
debug: true # (可选)开启调试模式
dryRun: false # (可选)ORM空跑(只读不写)
charset: "utf8" # (可选)数据库编码(如: utf8mb4/utf8/gbk/gb2312)一般设置为utf8mb4。默认为utf8。

View File

@@ -1,5 +1,10 @@
package public
const (
DbNameKnowledge = "rag_knowledge"
DbNameVector = "rag_vector"
)
// sql 数据库表名
const (
TableNameDocument = "document"

View File

@@ -1,5 +0,0 @@
package controller
type datasetIndex struct{}
var DatasetIndex = new(datasetIndex)

View File

@@ -22,7 +22,7 @@ func (d *datasetDao) Insert(ctx context.Context, req *dto.CreateDatasetReq) (id
if err = gconv.Struct(req, &res); err != nil {
return
}
r, err := gfdb.DB(ctx).Model(ctx, public.TableNameDataset).Data(&res).Insert()
r, err := gfdb.DB(ctx, public.DbNameKnowledge).Model(ctx, public.TableNameDataset).Data(&res).Insert()
if err != nil {
return
}
@@ -31,7 +31,7 @@ func (d *datasetDao) Insert(ctx context.Context, req *dto.CreateDatasetReq) (id
// Update 更新数据集
func (d *datasetDao) Update(ctx context.Context, req *dto.UpdateDatasetReq) (rows int64, err error) {
model := gfdb.DB(ctx).Model(ctx, public.TableNameDataset).OmitEmpty()
model := gfdb.DB(ctx, public.DbNameKnowledge).Model(ctx, public.TableNameDataset).OmitEmpty()
if !g.IsEmpty(req.DocumentCount) {
model.Data(entity.DatasetCol.DocumentCount, &gdb.Counter{
Field: entity.DatasetCol.DocumentCount,
@@ -53,7 +53,7 @@ func (d *datasetDao) Update(ctx context.Context, req *dto.UpdateDatasetReq) (row
// Delete 删除数据集
func (d *datasetDao) Delete(ctx context.Context, req *dto.DeleteDatasetReq) (rows int64, err error) {
r, err := gfdb.DB(ctx).Model(ctx, public.TableNameDataset).Where(entity.DatasetCol.Id, req.Id).Delete()
r, err := gfdb.DB(ctx, public.DbNameKnowledge).Model(ctx, public.TableNameDataset).Where(entity.DatasetCol.Id, req.Id).Delete()
if err != nil {
return
}
@@ -61,7 +61,7 @@ func (d *datasetDao) Delete(ctx context.Context, req *dto.DeleteDatasetReq) (row
}
func (d *datasetDao) GetByID(ctx context.Context, req *dto.GetDatasetReq, fields ...string) (res *entity.Dataset, err error) {
r, err := gfdb.DB(ctx).Model(ctx, public.TableNameDataset).Where(entity.DatasetCol.Id, req.Id).Fields(fields).One()
r, err := gfdb.DB(ctx, public.DbNameKnowledge).Model(ctx, public.TableNameDataset).Where(entity.DatasetCol.Id, req.Id).Fields(fields).One()
if err != nil {
return
}
@@ -71,7 +71,7 @@ func (d *datasetDao) GetByID(ctx context.Context, req *dto.GetDatasetReq, fields
// List 获取数据集列表
func (d *datasetDao) List(ctx context.Context, req *dto.ListDatasetReq, fields ...string) (res []*entity.Dataset, total int, err error) {
model := gfdb.DB(ctx).Model(ctx, public.TableNameDataset).Fields(fields).OmitEmpty()
model := gfdb.DB(ctx, public.DbNameKnowledge).Model(ctx, public.TableNameDataset).Fields(fields).OmitEmpty()
if !g.IsEmpty(req.Keyword) {
model.WhereLike(entity.DatasetCol.Name, "%"+req.Keyword+"%")
}

View File

@@ -16,7 +16,7 @@ type datasetIndexDao struct{}
// Insert 插入数据集索引
func (d *datasetIndexDao) Insert(ctx context.Context, index *entity.DatasetIndex) (id int64, err error) {
_, err = gfdb.DB(ctx).Model(ctx, public.TableNameDatasetIndex).Data(index).Insert()
_, err = gfdb.DB(ctx, public.DbNameVector).Model(ctx, public.TableNameDatasetIndex).Data(index).Insert()
if err != nil {
return
}
@@ -25,7 +25,7 @@ func (d *datasetIndexDao) Insert(ctx context.Context, index *entity.DatasetIndex
// GetByDatasetId 根据数据集ID获取索引
func (d *datasetIndexDao) GetByDatasetId(ctx context.Context, datasetId int64) (result *entity.DatasetIndex, err error) {
err = gfdb.DB(ctx).Model(ctx, public.TableNameDatasetIndex).Where(entity.DatasetIndexCol.DatasetId, datasetId).Scan(&result)
err = gfdb.DB(ctx, public.DbNameVector).Model(ctx, public.TableNameDatasetIndex).Where(entity.DatasetIndexCol.DatasetId, datasetId).Scan(&result)
if err != nil {
if err == sql.ErrNoRows {
return nil, nil
@@ -37,23 +37,20 @@ func (d *datasetIndexDao) GetByDatasetId(ctx context.Context, datasetId int64) (
// IncVectorCount 增加或减少向量数量
func (d *datasetIndexDao) IncVectorCount(ctx context.Context, id int64, delta int64) (err error) {
_, err = gfdb.DB(ctx).Model(ctx, public.TableNameDatasetIndex).
_, err = gfdb.DB(ctx, public.DbNameVector).Model(ctx, public.TableNameDatasetIndex).
Where(entity.DatasetIndexCol.Id, id).
Increment(entity.DatasetIndexCol.VectorCount, delta)
return
}
func (d *datasetIndexDao) InsertIndex(ctx context.Context, indexName string) (err error) {
prefix, err := gfdb.GetTablePrefix(ctx)
if err != nil {
return
}
db := gfdb.DB(ctx, public.DbNameVector)
sqlStr := fmt.Sprintf(`
CREATE INDEX IF NOT EXISTS %s
ON %s
USING ivfflat (vector vector_cosine_ops)
WHERE vector IS NOT NULL;
`, indexName, prefix+public.TableNameDocumentChunk)
_, err = gfdb.DB(ctx).Exec(ctx, sqlStr)
`, indexName, gfdb.TablePrefix+public.TableNameDocumentChunk)
_, err = db.Exec(ctx, sqlStr)
return
}

View File

@@ -22,7 +22,7 @@ func (d *documentDao) Insert(ctx context.Context, req *dto.CreateDocumentReq) (i
if err = gconv.Struct(req, &res); err != nil {
return
}
r, err := gfdb.DB(ctx).Model(ctx, public.TableNameDocument).Data(&res).Insert()
r, err := gfdb.DB(ctx, public.DbNameKnowledge).Model(ctx, public.TableNameDocument).Data(&res).Insert()
if err != nil {
return
}
@@ -31,7 +31,7 @@ func (d *documentDao) Insert(ctx context.Context, req *dto.CreateDocumentReq) (i
// Update 更新文件
func (d *documentDao) Update(ctx context.Context, req *dto.UpdateDocumentReq) (rows int64, err error) {
model := gfdb.DB(ctx).Model(ctx, public.TableNameDocument).OmitEmpty()
model := gfdb.DB(ctx, public.DbNameKnowledge).Model(ctx, public.TableNameDocument).OmitEmpty()
if !g.IsEmpty(req.ChunkCount) {
model.Data(entity.DocumentCol.ChunkCount, &gdb.Counter{
Field: entity.DocumentCol.ChunkCount,
@@ -48,7 +48,7 @@ func (d *documentDao) Update(ctx context.Context, req *dto.UpdateDocumentReq) (r
// Delete 删除文件
func (d *documentDao) Delete(ctx context.Context, req *dto.DeleteDocumentReq) (rows int64, err error) {
r, err := gfdb.DB(ctx).Model(ctx, public.TableNameDocument).Where(entity.DocumentCol.Id, req.Id).Delete()
r, err := gfdb.DB(ctx, public.DbNameKnowledge).Model(ctx, public.TableNameDocument).Where(entity.DocumentCol.Id, req.Id).Delete()
if err != nil {
return
}
@@ -57,7 +57,7 @@ func (d *documentDao) Delete(ctx context.Context, req *dto.DeleteDocumentReq) (r
// GetByID 根据ID获取文件
func (d *documentDao) GetByID(ctx context.Context, req *dto.GetDocumentReq, fields ...string) (res *entity.Document, err error) {
r, err := gfdb.DB(ctx).Model(ctx, public.TableNameDocument).Where(entity.DocumentCol.Id, req.Id).Fields(fields).One()
r, err := gfdb.DB(ctx, public.DbNameKnowledge).Model(ctx, public.TableNameDocument).Where(entity.DocumentCol.Id, req.Id).Fields(fields).One()
if err != nil {
return
}
@@ -67,7 +67,7 @@ func (d *documentDao) GetByID(ctx context.Context, req *dto.GetDocumentReq, fiel
// List 获取文件列表
func (d *documentDao) List(ctx context.Context, req *dto.ListDocumentReq, fields ...string) (res []*entity.Document, total int, err error) {
model := gfdb.DB(ctx).Model(ctx, public.TableNameDocument).OmitEmpty()
model := gfdb.DB(ctx, public.DbNameKnowledge).Model(ctx, public.TableNameDocument).OmitEmpty()
if !g.IsEmpty(req.Keyword) {
model.WhereLike(entity.DocumentCol.Title, "%"+req.Keyword+"%")
}

View File

@@ -20,7 +20,7 @@ func (d *documentChunkDao) BatchInsert(ctx context.Context, req []*dto.VectorDoc
if err = gconv.Structs(req, &res); err != nil {
return
}
r, err := gfdb.DB(ctx).Model(ctx, public.TableNameDocumentChunk).Data(&res).Insert()
r, err := gfdb.DB(ctx, public.DbNameVector).Model(ctx, public.TableNameDocumentChunk).Data(&res).Insert()
if err != nil {
return
}
@@ -29,7 +29,7 @@ func (d *documentChunkDao) BatchInsert(ctx context.Context, req []*dto.VectorDoc
// Update 更新文件块
func (d *documentChunkDao) Update(ctx context.Context, req *dto.UpdateDocumentChunkReq) (rows int64, err error) {
model := gfdb.DB(ctx).Model(ctx, public.TableNameDocumentChunk)
model := gfdb.DB(ctx, public.DbNameVector).Model(ctx, public.TableNameDocumentChunk)
r, err := model.Data(&req).Where(entity.DocumentChunkCol.Id, req.Id).Update()
if err != nil {
return
@@ -39,7 +39,7 @@ func (d *documentChunkDao) Update(ctx context.Context, req *dto.UpdateDocumentCh
// List 文件块列表
func (d *documentChunkDao) List(ctx context.Context, req *dto.ListDocumentChunkReq, fields ...string) (res []*entity.DocumentChunk, total int, err error) {
model := gfdb.DB(ctx).Model(ctx, public.TableNameDocumentChunk).Fields(fields).OmitEmpty().
model := gfdb.DB(ctx, public.DbNameVector).Model(ctx, public.TableNameDocumentChunk).Fields(fields).OmitEmpty().
Where(entity.DocumentChunkCol.DatasetId, req.DatasetId).
Where(entity.DocumentChunkCol.DocumentId, req.DocumentId).
Where(entity.DocumentChunkCol.Status, req.Status).
@@ -55,50 +55,3 @@ func (d *documentChunkDao) List(ctx context.Context, req *dto.ListDocumentChunkR
err = r.Structs(&res)
return
}
//// Insert 插入向量文档
//func (d *vectorDocumentDao) Insert(ctx context.Context, docs []*entity.DocumentChunk) (ids []interface{}, err error) {
// if len(docs) == 0 {
// return
// }
// interfaces := make([]interface{}, len(docs))
// for i := range docs {
// interfaces[i] = docs[i]
// }
// return mongoDB.Insert(ctx, interfaces, CollectionVectorDoc)
//}
//
//// DeleteByIDs 根据ID删除向量文档
//func (d *vectorDocumentDao) DeleteByIDs(ctx context.Context, ids []string) (err error) {
// if len(ids) == 0 {
// return
// }
// objectIDs := make([]bson.ObjectID, len(ids))
// for i, id := range ids {
// objectIDs[i], err = bson.ObjectIDFromHex(id)
// if err != nil {
// return err
// }
// }
// filter := bson.M{"_id": bson.M{"$in": objectIDs}}
// _, err = mongoDB.Delete(ctx, filter, CollectionVectorDoc)
// return
//}
//
//// GetByIndexID 根据索引ID获取向量文档
//func (d *vectorDocumentDao) GetByIndexID(ctx context.Context, indexID string, limit int) (result []*entity.DocumentChunk, err error) {
// filter := bson.M{"indexId": indexID}
// page := &beans.Page{PageNum: 1, PageSize: int64(limit)}
// _, err = mongoDB.Find(ctx, filter, &result, CollectionVectorDoc, page, nil)
// return
//}
//
//// GetByVectorIDs 根据向量ID获取向量文档
//func (d *vectorDocumentDao) GetByVectorIDs(ctx context.Context, vectorIDs []string) (result []*entity.DocumentChunk, err error) {
// if len(vectorIDs) == 0 {
// return
// }
// filter := bson.M{"vectorId": bson.M{"$in": vectorIDs}}
// _, err = mongoDB.Find(ctx, filter, &result, CollectionVectorDoc, &beans.Page{PageSize: -1}, nil)
// return
//}

View File

@@ -20,7 +20,7 @@ func (d *keywordDao) Insert(ctx context.Context, req *dto.CreateKeywordReq) (id
if err = gconv.Struct(req, &res); err != nil {
return
}
r, err := gfdb.DB(ctx).Model(ctx, public.TableNameKeyword).Data(&res).Insert()
r, err := gfdb.DB(ctx, public.DbNameKnowledge).Model(ctx, public.TableNameKeyword).Data(&res).Insert()
if err != nil {
return
}
@@ -32,7 +32,7 @@ func (d *keywordDao) BatchSaveOrUpdate(ctx context.Context, req []*dto.CreateKey
if err = gconv.Structs(req, &res); err != nil {
return
}
r, err := gfdb.DB(ctx).Model(ctx, public.TableNameKeyword).Data(&res).OnConflict(
r, err := gfdb.DB(ctx, public.DbNameKnowledge).Model(ctx, public.TableNameKeyword).Data(&res).OnConflict(
entity.KeywordCol.TenantId,
entity.KeywordCol.DatasetId,
entity.KeywordCol.DocumentId,
@@ -44,7 +44,7 @@ func (d *keywordDao) BatchSaveOrUpdate(ctx context.Context, req []*dto.CreateKey
}
func (d *keywordDao) Update(ctx context.Context, req *dto.UpdateKeywordReq) (rows int64, err error) {
model := gfdb.DB(ctx).Model(ctx, public.TableNameKeyword)
model := gfdb.DB(ctx, public.DbNameKnowledge).Model(ctx, public.TableNameKeyword)
r, err := model.Data(&req).Where(entity.KeywordCol.Id, req.Id).Update()
if err != nil {
return
@@ -53,7 +53,7 @@ func (d *keywordDao) Update(ctx context.Context, req *dto.UpdateKeywordReq) (row
}
func (d *keywordDao) Delete(ctx context.Context, req *dto.DeleteKeywordReq) (rows int64, err error) {
r, err := gfdb.DB(ctx).Model(ctx, public.TableNameKeyword).Where(entity.KeywordCol.Id, req.Id).Delete()
r, err := gfdb.DB(ctx, public.DbNameKnowledge).Model(ctx, public.TableNameKeyword).Where(entity.KeywordCol.Id, req.Id).Delete()
if err != nil {
return
}
@@ -61,7 +61,7 @@ func (d *keywordDao) Delete(ctx context.Context, req *dto.DeleteKeywordReq) (row
}
func (d *keywordDao) Count(ctx context.Context, req *dto.ListKeywordReq) (count int, err error) {
count, err = gfdb.DB(ctx).Model(ctx, public.TableNameKeyword).OmitEmpty().
count, err = gfdb.DB(ctx, public.DbNameKnowledge).Model(ctx, public.TableNameKeyword).OmitEmpty().
Where(entity.KeywordCol.DatasetId, req.DatasetId).
Where(entity.KeywordCol.DocumentId, req.DocumentId).
Where(entity.KeywordCol.Word, req.Word).Count()
@@ -69,7 +69,7 @@ func (d *keywordDao) Count(ctx context.Context, req *dto.ListKeywordReq) (count
}
func (d *keywordDao) GetByID(ctx context.Context, req *dto.GetKeywordReq, fields ...string) (res *entity.Document, err error) {
r, err := gfdb.DB(ctx).Model(ctx, public.TableNameKeyword).Where(entity.KeywordCol.Id, req.Id).Fields(fields).One()
r, err := gfdb.DB(ctx, public.DbNameKnowledge).Model(ctx, public.TableNameKeyword).Where(entity.KeywordCol.Id, req.Id).Fields(fields).One()
if err != nil {
return
}
@@ -78,7 +78,7 @@ func (d *keywordDao) GetByID(ctx context.Context, req *dto.GetKeywordReq, fields
}
func (d *keywordDao) List(ctx context.Context, req *dto.ListKeywordReq, fields ...string) (res []*entity.Keyword, total int, err error) {
model := gfdb.DB(ctx).Model(ctx, public.TableNameKeyword).Fields(fields).OmitEmpty()
model := gfdb.DB(ctx, public.DbNameKnowledge).Model(ctx, public.TableNameKeyword).Fields(fields).OmitEmpty()
if !g.IsEmpty(req.Keyword) {
model.WhereLike(entity.KeywordCol.Word, "%"+req.Keyword+"%")
}

35
go.mod
View File

@@ -3,15 +3,30 @@ module rag
go 1.26.0
require (
gitea.com/red-future/common v0.0.6
gitea.com/red-future/common v0.0.11
github.com/bjang03/gmq v0.0.0-00010101000000-000000000000
github.com/cloudwego/eino v0.8.6
github.com/cloudwego/eino-ext/components/document/loader/url v0.0.0-20260323112355-f061db7e8419
github.com/cloudwego/eino-ext/components/document/parser/docx v0.0.0-20260323112355-f061db7e8419
github.com/cloudwego/eino-ext/components/document/parser/pdf v0.0.0-20260323112355-f061db7e8419
github.com/cloudwego/eino-ext/components/document/parser/xlsx v0.0.0-20260323112355-f061db7e8419
github.com/cloudwego/eino-ext/components/document/transformer/splitter/recursive v0.0.0-20260323112355-f061db7e8419
github.com/cloudwego/eino-ext/components/document/transformer/splitter/semantic v0.0.0-20260323112355-f061db7e8419
github.com/cloudwego/eino-ext/components/embedding/ark v0.1.1
github.com/cloudwego/eino-ext/components/embedding/dashscope v0.0.0-20260323112355-f061db7e8419
github.com/cloudwego/eino-ext/components/embedding/openai v0.0.0-20260323112355-f061db7e8419
github.com/cloudwego/eino-ext/components/indexer/es8 v0.0.0-20260331071634-4f359694d2d9
github.com/cloudwego/eino-ext/components/model/ark v0.1.65
github.com/cloudwego/eino-ext/components/retriever/es8 v0.0.0-20260331071634-4f359694d2d9
github.com/elastic/go-elasticsearch/v8 v8.16.0
github.com/go-ego/gse v1.0.2
github.com/gogf/gf/contrib/drivers/pgsql/v2 v2.10.0
github.com/gogf/gf/v2 v2.10.0
github.com/golang/glog v1.2.5
github.com/pgvector/pgvector-go v0.3.0
)
replace gitea.com/red-future/common v0.0.6 => ../common
//replace gitea.com/red-future/common v0.0.11 => ../common
replace github.com/bjang03/gmq => ../gmq
@@ -35,18 +50,7 @@ require (
github.com/clipperhouse/displaywidth v0.11.0 // indirect
github.com/clipperhouse/uax29/v2 v2.7.0 // indirect
github.com/cloudwego/base64x v0.1.6 // indirect
github.com/cloudwego/eino-ext/components/document/loader/url v0.0.0-20260323112355-f061db7e8419 // indirect
github.com/cloudwego/eino-ext/components/document/parser/docx v0.0.0-20260323112355-f061db7e8419 // indirect
github.com/cloudwego/eino-ext/components/document/parser/html v0.0.0-20241224063832-9fbcc0e56c28 // indirect
github.com/cloudwego/eino-ext/components/document/parser/pdf v0.0.0-20260323112355-f061db7e8419 // indirect
github.com/cloudwego/eino-ext/components/document/parser/xlsx v0.0.0-20260323112355-f061db7e8419 // indirect
github.com/cloudwego/eino-ext/components/document/transformer/splitter/recursive v0.0.0-20260323112355-f061db7e8419 // indirect
github.com/cloudwego/eino-ext/components/document/transformer/splitter/semantic v0.0.0-20260323112355-f061db7e8419 // indirect
github.com/cloudwego/eino-ext/components/embedding/ark v0.1.1 // indirect
github.com/cloudwego/eino-ext/components/embedding/dashscope v0.0.0-20260323112355-f061db7e8419 // indirect
github.com/cloudwego/eino-ext/components/embedding/openai v0.0.0-20260323112355-f061db7e8419 // indirect
github.com/cloudwego/eino-ext/components/indexer/es8 v0.0.0-20260331071634-4f359694d2d9 // indirect
github.com/cloudwego/eino-ext/components/retriever/es8 v0.0.0-20260331071634-4f359694d2d9 // indirect
github.com/cloudwego/eino-ext/libs/acl/openai v0.1.14 // indirect
github.com/dgraph-io/badger/v4 v4.2.0 // indirect
github.com/dgraph-io/ristretto v0.1.1 // indirect
@@ -56,13 +60,11 @@ require (
github.com/eino-contrib/docx2md v0.0.1 // indirect
github.com/eino-contrib/jsonschema v1.0.3 // indirect
github.com/elastic/elastic-transport-go/v8 v8.10.0 // indirect
github.com/elastic/go-elasticsearch/v8 v8.16.0 // indirect
github.com/emirpasic/gods/v2 v2.0.0-alpha // indirect
github.com/evanphx/json-patch v0.5.2 // indirect
github.com/fatih/color v1.19.0 // indirect
github.com/fsnotify/fsnotify v1.9.0 // indirect
github.com/gabriel-vasile/mimetype v1.4.13 // indirect
github.com/go-ego/gse v1.0.2 // indirect
github.com/go-logr/logr v1.4.3 // indirect
github.com/go-logr/stdr v1.2.2 // indirect
github.com/go-playground/locales v0.14.1 // indirect
@@ -74,7 +76,6 @@ require (
github.com/gogf/gf/contrib/trace/otlphttp/v2 v2.9.5 // indirect
github.com/gogo/protobuf v1.3.2 // indirect
github.com/golang-jwt/jwt/v5 v5.3.1 // indirect
github.com/golang/glog v1.2.5 // indirect
github.com/golang/groupcache v0.0.0-20210331224755-41bb18bfe9da // indirect
github.com/golang/protobuf v1.5.4 // indirect
github.com/golang/snappy v1.0.0 // indirect
@@ -134,7 +135,7 @@ require (
github.com/twitchyliquid64/golang-asm v0.15.1 // indirect
github.com/vcaesar/cedar v0.30.0 // indirect
github.com/volcengine/volc-sdk-golang v1.0.199 // indirect
github.com/volcengine/volcengine-go-sdk v1.0.181 // indirect
github.com/volcengine/volcengine-go-sdk v1.2.9 // indirect
github.com/wk8/go-ordered-map/v2 v2.1.8 // indirect
github.com/xuri/efp v0.0.0-20240408161823-9ad904a10d6d // indirect
github.com/xuri/excelize/v2 v2.9.0 // indirect

9
go.sum
View File

@@ -33,6 +33,8 @@ cloud.google.com/go/storage v1.10.0/go.mod h1:FLPqc6j+Ki4BU591ie1oL6qBQGu2Bl/tZ9
dmitri.shuralyov.com/gpu/mtl v0.0.0-20190408044501-666a987793e9/go.mod h1:H6x//7gZCb22OMCxBHrMx7a5I7Hp++hsVxbQ4BYO7hU=
entgo.io/ent v0.14.3 h1:wokAV/kIlH9TeklJWGGS7AYJdVckr0DloWjIcO9iIIQ=
entgo.io/ent v0.14.3/go.mod h1:aDPE/OziPEu8+OWbzy4UlvWmD2/kbRuWfK2A40hcxJM=
gitea.com/red-future/common v0.0.11 h1:AV7W3G0uZ8aPpHHSHd4ZHmLWe5+2STPKe/AYPoPCWVc=
gitea.com/red-future/common v0.0.11/go.mod h1:B8syUI4XbLCDQSeRHURYxEwnWw8mEFgmqCxjC+lM+NU=
github.com/BurntSushi/toml v0.3.1/go.mod h1:xHWCNGjB5oqiDr8zfno3MHue2Ht5sIBksp03qcyfWMU=
github.com/BurntSushi/toml v1.1.0/go.mod h1:CxXYINrC8qIiEnFrOxCa7Jy5BFHlXnUU2pbicEuybxQ=
github.com/BurntSushi/toml v1.6.0 h1:dRaEfpa2VI55EwlIW72hMRHdWouJeRF7TPYhI+AUQjk=
@@ -156,6 +158,8 @@ github.com/cloudwego/eino-ext/components/embedding/openai v0.0.0-20260323112355-
github.com/cloudwego/eino-ext/components/embedding/openai v0.0.0-20260323112355-f061db7e8419/go.mod h1:SajSFFRIXJXIbxadAAlSUIS5KTY8R/jzJg9RNSOXCCI=
github.com/cloudwego/eino-ext/components/indexer/es8 v0.0.0-20260331071634-4f359694d2d9 h1:vZ3dL8xwo2sy73aBVKs4AJiO5OCHRxMOJUwIYkp0CWs=
github.com/cloudwego/eino-ext/components/indexer/es8 v0.0.0-20260331071634-4f359694d2d9/go.mod h1:+oI0sr0rA0OHCxaQJ0rzMYld3LAODHhPKzBx5JYCya0=
github.com/cloudwego/eino-ext/components/model/ark v0.1.65 h1:52ukXVU9ntToTa36SwI8be81qskGkpUEZraIFOf0wqk=
github.com/cloudwego/eino-ext/components/model/ark v0.1.65/go.mod h1:aabMR15RTXBSi9Eu13CWavzE+no5BQO4FJUEEdqImbg=
github.com/cloudwego/eino-ext/components/retriever/es8 v0.0.0-20260331071634-4f359694d2d9 h1:Sl6giB1SJlA+ZlO0gzPH05IsUORtdYYPN6GiyH1B9MA=
github.com/cloudwego/eino-ext/components/retriever/es8 v0.0.0-20260331071634-4f359694d2d9/go.mod h1:H4kNmiTe2irnvipVNIP4q8yqXf2fZ6v24krvQYBtYb8=
github.com/cloudwego/eino-ext/libs/acl/openai v0.1.14 h1:yOZII6VYaL00CVZYba+HUixFygsW0Xz/1QjQ5htj1Ls=
@@ -733,8 +737,8 @@ github.com/vmihailenco/tagparser/v2 v2.0.0/go.mod h1:Wri+At7QHww0WTrCBeu4J6bNtoV
github.com/volcengine/volc-sdk-golang v1.0.23/go.mod h1:AfG/PZRUkHJ9inETvbjNifTDgut25Wbkm2QoYBTbvyU=
github.com/volcengine/volc-sdk-golang v1.0.199 h1:zv9QOqTl/IsLwtfC37GlJtcz6vMAHi+pjq8ILWjLYUc=
github.com/volcengine/volc-sdk-golang v1.0.199/go.mod h1:stZX+EPgv1vF4nZwOlEe8iGcriUPRBKX8zA19gXycOQ=
github.com/volcengine/volcengine-go-sdk v1.0.181 h1:/3PB4M1N4fjMqiSKTJwX43EZ5Nn1HUOtQrSCk+22+wI=
github.com/volcengine/volcengine-go-sdk v1.0.181/go.mod h1:gfEDc1s7SYaGoY+WH2dRrS3qiuDJMkwqyfXWCa7+7oA=
github.com/volcengine/volcengine-go-sdk v1.2.9 h1:du2gnImtyWXKkQFnJW/GXCs+UBibGGOXIbP1Ams2pB8=
github.com/volcengine/volcengine-go-sdk v1.2.9/go.mod h1:oxoVo+A17kvkwPkIeIHPVLjSw7EQAm+l/Vau1YGHN+A=
github.com/wk8/go-ordered-map/v2 v2.1.8 h1:5h/BUHu93oj4gIdvHHHGsScSTMijfx5PeYkE/fJgbpc=
github.com/wk8/go-ordered-map/v2 v2.1.8/go.mod h1:5nJHM5DyteebpVlHnWMV0rPz6Zp7+xBAnxjb1X5vnTw=
github.com/x-cray/logrus-prefixed-formatter v0.5.2 h1:00txxvfBM9muc0jiLIEAkAcIMJzfthRT6usrui8uGmg=
@@ -1191,6 +1195,7 @@ google.golang.org/protobuf v1.26.0-rc.1/go.mod h1:jlhhOSvTdKEhbULTjvd4ARK9grFBp0
google.golang.org/protobuf v1.26.0/go.mod h1:9q0QmTI4eRPtz6boOQmLYwt+qCgq0jsYwAQnmE0givc=
google.golang.org/protobuf v1.27.1/go.mod h1:9q0QmTI4eRPtz6boOQmLYwt+qCgq0jsYwAQnmE0givc=
google.golang.org/protobuf v1.28.0/go.mod h1:HV8QOd/L58Z+nl8r43ehVNZIU/HEI6OcFqwMG9pJV4I=
google.golang.org/protobuf v1.31.0/go.mod h1:HV8QOd/L58Z+nl8r43ehVNZIU/HEI6OcFqwMG9pJV4I=
google.golang.org/protobuf v1.36.11 h1:fV6ZwhNocDyBLK0dj+fg8ektcVegBBuEolpbTQyBNVE=
google.golang.org/protobuf v1.36.11/go.mod h1:HTf+CrKn2C3g5S8VImy6tdcUvCska2kB7j23XfzDpco=
gopkg.in/alecthomas/kingpin.v2 v2.2.6/go.mod h1:FMv+mEhP44yOT+4EoQTLFTRgOQ1FBLkstjWtayDeSgw=

View File

@@ -1 +0,0 @@
package dto

View File

@@ -1,5 +0,0 @@
package service
var DatasetIndex = new(datasetIndexService)
type datasetIndexService struct{}

View File

@@ -2,7 +2,10 @@ package service
import (
"context"
"errors"
"fmt"
"rag/common/eino"
"rag/common/gse"
"rag/consts/document"
"rag/consts/public"
"rag/dao"
@@ -16,8 +19,6 @@ import (
"gitea.com/red-future/common/db/gfdb"
"gitea.com/red-future/common/full-text-search/meilisearch"
"gitea.com/red-future/common/http"
"gitea.com/red-future/common/rag/eino"
"gitea.com/red-future/common/rag/gse"
"gitea.com/red-future/common/utils"
gmq "github.com/bjang03/gmq/core/gmq"
"github.com/bjang03/gmq/mq"
@@ -123,6 +124,9 @@ func (s *documentService) Process(ctx context.Context, req *dto.ProcessDocumentR
if err != nil {
return nil, err
}
if g.IsEmpty(doc) {
return nil, errors.New("document not found")
}
// 2. 使用eino框架进行文件切分并发执行
var vectorDocsCount, chunks int64
@@ -251,7 +255,7 @@ func (s *documentService) sqlSplitDocument(ctx context.Context, doc *entity.Docu
return
}
// 3. 组装向量文档
var vectorDocs = make([]dto.VectorDocumentChunkMsg, 0)
var docsChunk = make([]*schema.Document, 0)
for i, t := range docsSplit {
contentHash := gmd5.MustEncryptString(t.Content)
// 检查是否重复
@@ -263,27 +267,26 @@ func (s *documentService) sqlSplitDocument(ctx context.Context, doc *entity.Docu
if !success {
continue
}
vectorDocs = append(vectorDocs, dto.VectorDocumentChunkMsg{
TenantId: doc.TenantId,
Creator: doc.Creator,
DatasetId: doc.DatasetId,
DocumentId: doc.Id,
Content: t.Content,
ContentHash: contentHash,
ChunkIndex: gconv.Int64(i),
})
var metaData = make(map[string]any)
metaData[entity.DocumentCol.TenantId] = doc.TenantId
metaData[entity.DocumentCol.Creator] = doc.Creator
metaData[entity.DocumentCol.DatasetId] = doc.DatasetId
metaData[entity.DocumentChunkCol.DocumentId] = doc.Id
metaData[entity.DocumentChunkCol.ContentHash] = contentHash
metaData[entity.DocumentChunkCol.ChunkIndex] = gconv.Int64(i)
t.MetaData = metaData
docsChunk = append(docsChunk, t)
}
// 4. 发送消息到队列
if len(vectorDocs) > 0 {
if len(docsChunk) > 0 {
err = gmq.GetGmq("primary").GmqPublish(ctx, &mq.RedisPubMessage{
PubMessage: types.PubMessage{
Topic: public.KnowledgeDocumentChunkTopic,
Data: vectorDocs,
Data: docsChunk,
},
})
}
vectorDocsCount = gconv.Int64(len(vectorDocs))
vectorDocsCount = gconv.Int64(len(docsChunk))
return
}
@@ -318,12 +321,12 @@ func (s *documentService) esSplitDocument(ctx context.Context, doc *entity.Docum
}
// 构建Meilisearch文档
meiliDocs = append(meiliDocs, map[string]interface{}{
"id": contentHash,
"datasetId": doc.DatasetId,
"documentId": doc.Id,
"content": t.Content,
"contentHash": contentHash,
"chunkIndex": i,
entity.DocumentChunkCol.Id: contentHash,
entity.DocumentChunkCol.DatasetId: doc.DatasetId,
entity.DocumentChunkCol.DocumentId: doc.Id,
entity.DocumentChunkCol.Content: t.Content,
entity.DocumentChunkCol.ContentHash: contentHash,
entity.DocumentChunkCol.ChunkIndex: i,
})
}
// 4. 写入到meilisearch数据库中

View File

@@ -2,23 +2,20 @@ package service
import (
"context"
"database/sql"
"errors"
"fmt"
"rag/common/eino"
"rag/consts/document"
"rag/consts/public"
"rag/dao"
"rag/model/dto"
"rag/model/entity"
"gitea.com/red-future/common/beans"
"gitea.com/red-future/common/rag/eino"
gmq "github.com/bjang03/gmq/core/gmq"
"github.com/bjang03/gmq/mq"
"github.com/bjang03/gmq/types"
"github.com/cloudwego/eino/components/indexer"
"github.com/cloudwego/eino/schema"
"github.com/gogf/gf/v2/frame/g"
"github.com/gogf/gf/v2/util/gconv"
"github.com/pgvector/pgvector-go"
)
var DocumentChunk = new(documentChunkService)
@@ -49,115 +46,33 @@ func (s *documentChunkService) List(ctx context.Context, req *dto.ListDocumentCh
}
func (s *documentChunkService) DocsChunkMsg(ctx context.Context, msg any) (err error) {
var req = make([]*dto.VectorDocumentChunkMsg, 0)
var docs = make([]*schema.Document, 0)
msgMap := gconv.Map(msg)
if err = gconv.Structs(msgMap["data"], &req); err != nil {
if err = gconv.Structs(msgMap["data"], &docs); err != nil {
g.Log().Error(ctx, "DocsChunkMsg err:", err)
return
}
if len(req) == 0 {
if len(docs) == 0 {
g.Log().Error(ctx, "DocsChunkMsg err:", "msg is empty")
return
}
ctx = context.WithValue(ctx, "user", &beans.User{
TenantId: req[0].TenantId,
UserName: req[0].Creator,
idx := eino.NewPGVectorIndexer(&eino.PGVectorIndexerOptions{
BatchSize: 10,
})
// 调用eino接口获取向量
var vectorDocsStr = make([]string, 0, len(req))
for _, t := range req {
vectorDocsStr = append(vectorDocsStr, t.Content)
}
embeddings, err := eino.EmbedStrings(ctx, vectorDocsStr)
if err != nil {
g.Log().Error(ctx, "DocsChunkMsg err:", err)
err = s.publishKnowledgeDocumentMsg(ctx, req[0].TenantId, req[0].Creator, req[0].DocumentId, document.VectorStatusFailed.Code())
rows, err := idx.Store(ctx, docs, indexer.WithEmbedding(eino.EmbedderDashscope))
if err != nil || rows == 0 {
g.Log().Error(ctx, "DocsChunkMsg rows: , err:", rows, err)
return
}
// 获取向量维度
dimension := 0
if len(embeddings) > 0 {
dimension = len(embeddings[0])
}
// 创建或更新DatasetIndex
err = s.createOrUpdateDatasetIndex(ctx, req[0].DatasetId, dimension, int64(len(req)))
if err != nil {
g.Log().Error(ctx, "CreateOrUpdateDatasetIndex err:", err)
err = s.publishKnowledgeDocumentMsg(ctx, req[0].TenantId, req[0].Creator, req[0].DocumentId, document.VectorStatusFailed.Code())
return
}
// 更新向量文档
for i, embedding := range embeddings {
req[i].Vector = pgvector.NewVector(gconv.Float32s(embedding))
req[i].VectorStatus = document.VectorStatusCompleted.Code()
req[i].Status = document.StatusEnable.Code()
}
_, err = dao.DocumentChunk.BatchInsert(ctx, req)
if err != nil {
g.Log().Error(ctx, "DocsChunkMsg err:", err)
err = s.publishKnowledgeDocumentMsg(ctx, req[0].TenantId, req[0].Creator, req[0].DocumentId, document.VectorStatusFailed.Code())
return
}
err = s.publishKnowledgeDocumentMsg(ctx, req[0].TenantId, req[0].Creator, req[0].DocumentId, document.VectorStatusCompleted.Code())
tenantId := gconv.Uint64(docs[0].MetaData[entity.DocumentChunkCol.TenantId])
creator := gconv.String(docs[0].MetaData[entity.DocumentChunkCol.Creator])
documentId := gconv.Int64(docs[0].MetaData[entity.DocumentChunkCol.DocumentId])
err = s.publishKnowledgeDocumentMsg(ctx, tenantId, creator, documentId, document.VectorStatusCompleted.Code())
return
}
// createOrUpdateDatasetIndex 创建或更新数据集索引
func (s *documentChunkService) createOrUpdateDatasetIndex(ctx context.Context, datasetId int64, dimension int, vectorCount int64) (err error) {
// 查询数据集是否已有索引
existIndex, err := dao.DatasetIndex.GetByDatasetId(ctx, datasetId)
if err != nil && !errors.Is(err, sql.ErrNoRows) {
return err
}
// 已有索引 → 只更新数量
if existIndex != nil {
_ = dao.DatasetIndex.IncVectorCount(ctx, existIndex.Id, vectorCount)
return nil
}
// ====================== 创建新索引 ======================
indexName := fmt.Sprintf("idx_dataset_%d_vector", datasetId) // 真实PG索引名
// 1. 插入索引配置
index := &entity.DatasetIndex{
DatasetId: datasetId,
Name: indexName,
Dimension: dimension,
FieldType: "float",
MetricType: "COSINE",
Status: gconv.PtrInt8(1),
VectorCount: vectorCount,
Description: fmt.Sprintf("数据集%d向量索引", datasetId),
}
_, err = dao.DatasetIndex.Insert(ctx, index)
if err != nil {
return err
}
// 2. 真正创建 PGVector 索引(唯一真实索引!)
err = s.createRealPGVectorIndex(ctx, indexName)
return err
}
// createRealPGVectorIndex 真正在PostgreSQL创建向量索引真实可用
func (s *documentChunkService) createRealPGVectorIndex(ctx context.Context, indexName string) error {
// 执行真实建索引语句
err := dao.DatasetIndex.InsertIndex(ctx, indexName)
if err != nil {
g.Log().Error(ctx, "创建向量索引失败:", err)
return err
}
g.Log().Info(ctx, "PGVector真实索引创建成功"+indexName)
return nil
}
// publishKnowledgeDocumentMsg 发布消息
func (s *documentChunkService) publishKnowledgeDocumentMsg(ctx context.Context, tenantId uint64, creator string, documentId int64, vectorStatus document.VectorStatus) (err error) {
knowledgeDocumentMsg := dto.KnowledgeDocumentMsg{

View File

@@ -134,9 +134,9 @@ CREATE TABLE IF NOT EXISTS rag_knowledge_keyword (
);
-- 唯一索引:保证 租户 + 数据集 + 文档 + 关键词 全局唯一
CREATE UNIQUE INDEX uk_rag_knowledge_keyword_tenant_dataset_doc_word
ON rag_knowledge_keyword(tenant_id, dataset_id, document_id, word)
WHERE deleted_at IS NULL;
-- CREATE UNIQUE INDEX uk_rag_knowledge_keyword_tenant_dataset_doc_word
-- ON rag_knowledge_keyword(tenant_id, dataset_id, document_id, word)
-- WHERE deleted_at IS NULL;
-- 索引(按业务高频查询)
CREATE INDEX idx_keyword_tenant_id ON rag_knowledge_keyword(tenant_id);
@@ -160,3 +160,118 @@ COMMENT ON COLUMN rag_knowledge_keyword.word IS '关键词';
COMMENT ON COLUMN rag_knowledge_keyword.weight IS '权重';
--------------------pgsql创建rag_knowledge_keyword表语句---------------------------
--------------------pgsql创建rag_vector_dataset_index表语句---------------------------
-- 向量数据集索引表
CREATE TABLE IF NOT EXISTS rag_vector_dataset_index (
-- 基础字段
id BIGINT PRIMARY KEY, -- 主键ID非自增
tenant_id BIGINT NOT NULL DEFAULT 0, -- 租户ID int8
creator VARCHAR(64) NOT NULL,
created_at TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP,
updater VARCHAR(64) NOT NULL,
updated_at TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP,
deleted_at timestamp(6),
-- 核心字段
dataset_id INT8 NOT NULL,
name VARCHAR(255) NOT NULL,
collection VARCHAR(255) NOT NULL,
dimension INT NOT NULL,
field_type VARCHAR(50) NOT NULL,
metric_type VARCHAR(50) NOT NULL,
status SMALLINT NOT NULL DEFAULT 1, -- 状态1启用/0停用
vector_count INT8 NOT NULL DEFAULT 0,
description TEXT
);
-- 唯一约束
ALTER TABLE rag_vector_dataset_index ADD CONSTRAINT uk_dataset_id_name UNIQUE (dataset_id, name);
-- 索引
CREATE INDEX idx_dataset_index_tenant_id ON rag_vector_dataset_index(tenant_id);
CREATE INDEX idx_dataset_index_dataset_id ON rag_vector_dataset_index(dataset_id);
CREATE INDEX idx_dataset_index_status ON rag_vector_dataset_index(status);
-- 注释
COMMENT ON TABLE rag_vector_dataset_index IS '向量数据集索引表';
COMMENT ON COLUMN rag_vector_dataset_index.id IS '主键ID非自增';
COMMENT ON COLUMN rag_vector_dataset_index.tenant_id IS '租户ID';
COMMENT ON COLUMN rag_vector_dataset_index.creator IS '创建人';
COMMENT ON COLUMN rag_vector_dataset_index.created_at IS '创建时间';
COMMENT ON COLUMN rag_vector_dataset_index.updater IS '更新人';
COMMENT ON COLUMN rag_vector_dataset_index.updated_at IS '更新时间';
COMMENT ON COLUMN rag_vector_dataset_index.deleted_at IS '删除时间(软删)';
COMMENT ON COLUMN rag_vector_dataset_index.dataset_id IS '数据集ID';
COMMENT ON COLUMN rag_vector_dataset_index.name IS '索引名称';
COMMENT ON COLUMN rag_vector_dataset_index.collection IS '向量集合名称';
COMMENT ON COLUMN rag_vector_dataset_index.dimension IS '向量维度';
COMMENT ON COLUMN rag_vector_dataset_index.field_type IS '字段类型';
COMMENT ON COLUMN rag_vector_dataset_index.metric_type IS '度量类型';
COMMENT ON COLUMN rag_vector_dataset_index.status IS '状态';
COMMENT ON COLUMN rag_vector_dataset_index.vector_count IS '向量数量';
COMMENT ON COLUMN rag_vector_dataset_index.description IS '描述';
--------------------pgsql创建rag_vector_dataset_index表语句---------------------------
--------------------pgsql创建rag_vector_document_chunk表语句---------------------------
CREATE EXTENSION IF NOT EXISTS vector;
-- 文档分块向量表
CREATE TABLE IF NOT EXISTS rag_vector_document_chunk (
-- 基础字段
id BIGINT PRIMARY KEY, -- 主键ID非自增
tenant_id BIGINT NOT NULL DEFAULT 0, -- 租户ID int8
creator VARCHAR(64) NOT NULL,
created_at TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP,
updater VARCHAR(64) NOT NULL,
updated_at TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP,
deleted_at timestamp(6),
-- 核心字段
status SMALLINT NOT NULL DEFAULT 1, -- 状态1启用/0停用
vector_status SMALLINT NOT NULL DEFAULT 1, -- 向量化状态: 1pending, 2processing, 3completed, 4failed,5partCompleted
dataset_id INT8 NOT NULL,
document_id INT8 NOT NULL,
content TEXT NOT NULL,
content_hash VARCHAR(128) NOT NULL,
chunk_index INT8 NOT NULL,
-- 向量字段pgvector
vector vector(1024) NOT NULL,
-- 扩展信息
metadata JSONB
);
-- 索引
CREATE INDEX idx_chunk_tenant_id ON rag_vector_document_chunk(tenant_id);
CREATE INDEX idx_chunk_dataset_id ON rag_vector_document_chunk(dataset_id);
CREATE INDEX idx_chunk_document_id ON rag_vector_document_chunk(document_id);
CREATE INDEX idx_chunk_content_hash ON rag_vector_document_chunk(content_hash);
CREATE INDEX idx_chunk_status ON rag_vector_document_chunk(status);
CREATE INDEX idx_chunk_vector_status ON rag_vector_document_chunk(vector_status);
-- 注释
COMMENT ON TABLE rag_vector_document_chunk IS '文档分块向量表';
COMMENT ON COLUMN rag_vector_document_chunk.id IS '主键ID非自增';
COMMENT ON COLUMN rag_vector_document_chunk.tenant_id IS '租户ID';
COMMENT ON COLUMN rag_vector_document_chunk.creator IS '创建人';
COMMENT ON COLUMN rag_vector_document_chunk.created_at IS '创建时间';
COMMENT ON COLUMN rag_vector_document_chunk.updater IS '更新人';
COMMENT ON COLUMN rag_vector_document_chunk.updated_at IS '更新时间';
COMMENT ON COLUMN rag_vector_document_chunk.deleted_at IS '删除时间(软删)';
COMMENT ON COLUMN rag_vector_document_chunk.status IS '状态';
COMMENT ON COLUMN rag_vector_document_chunk.vector_status IS '向量生成状态';
COMMENT ON COLUMN rag_vector_document_chunk.dataset_id IS '数据集ID';
COMMENT ON COLUMN rag_vector_document_chunk.document_id IS '文档ID';
COMMENT ON COLUMN rag_vector_document_chunk.content IS '分块内容';
COMMENT ON COLUMN rag_vector_document_chunk.content_hash IS '内容哈希';
COMMENT ON COLUMN rag_vector_document_chunk.chunk_index IS '分块序号';
COMMENT ON COLUMN rag_vector_document_chunk.vector IS '向量数据';
COMMENT ON COLUMN rag_vector_document_chunk.metadata IS '扩展元数据';
--------------------pgsql创建rag_vector_document_chunk表语句---------------------------