From 9b900c1159f49f58d5ee39213835f6316dfcdadc Mon Sep 17 00:00:00 2001 From: Cold <16419454+cold502@user.noreply.gitee.com> Date: Fri, 9 Jan 2026 17:57:14 +0800 Subject: [PATCH] =?UTF-8?q?=E6=9B=B4=E6=96=B0consul=E9=83=A8=E5=88=86?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- ragflow/client.go | 77 ++++++++++++++++++++++++++++++++++++--------- ragflow/document.go | 8 +++-- redis/types.go | 18 ++++++----- 3 files changed, 79 insertions(+), 24 deletions(-) diff --git a/ragflow/client.go b/ragflow/client.go index c0ee6c2..d5958bf 100644 --- a/ragflow/client.go +++ b/ragflow/client.go @@ -5,6 +5,7 @@ import ( "net/url" "strings" "sync" + "sync/atomic" commonHttp "gitee.com/red-future---jilin-g/common/http" "github.com/gogf/gf/v2/errors/gerror" @@ -23,27 +24,57 @@ func initClient() { ctx := context.Background() // 读取配置 - baseURL, apiKey := loadConfig(ctx) + endpoints, apiKey := loadConfig(ctx) // 如果配置不完整,跳过初始化 - if baseURL == "" || apiKey == "" { - g.Log().Warning(ctx, "⚠️ RAGFlow 配置未找到,请在项目 config.yml 中添加 ragflow.base_url 和 ragflow.api_key") + if len(endpoints) == 0 || apiKey == "" { + g.Log().Warning(ctx, "⚠️ RAGFlow 配置未找到,请在 config.yml 中添加 ragflow.base_url 或在 Consul 中配置 ragflow.endpoints") return } globalClient = &Client{ - BaseURL: strings.TrimSuffix(baseURL, "/"), - APIKey: apiKey, + Endpoints: endpoints, + APIKey: apiKey, } - g.Log().Infof(ctx, "✅ RAGFlow 客户端初始化成功: baseURL=%s", baseURL) + if len(endpoints) == 1 { + g.Log().Infof(ctx, "✅ RAGFlow 客户端初始化成功: endpoint=%s", endpoints[0]) + } else { + g.Log().Infof(ctx, "✅ RAGFlow 客户端初始化成功: endpoints=%v (负载均衡)", endpoints) + } }) } -// loadConfig 从配置文件加载 RAGFlow 配置 -func loadConfig(ctx context.Context) (baseURL, apiKey string) { - baseURL = g.Cfg().MustGet(ctx, "ragflow.base_url", "").String() +// loadConfig 从配置加载 RAGFlow 配置(支持实例级配置) +// 优先级: +// 1. Consul实例级配置 ragflow.endpoints (数组) +// 2. Consul全局配置 ragflow.endpoints (数组) +// 3. config.yml的 ragflow.base_url (单个URL,向后兼容) +func loadConfig(ctx context.Context) (endpoints []string, apiKey string) { + // 尝试从Consul读取endpoints(支持实例级配置) + // 注意:这里不能直接导入customerService/service包,会造成循环依赖 + // 所以只能从config.yml读取,Consul配置需要在customerservice层面调用时传入 + + // 读取API Key apiKey = g.Cfg().MustGet(ctx, "ragflow.api_key", "").String() + + // 尝试读取endpoints数组(从config.yml或Consul同步的配置) + endpointsConfig := g.Cfg().MustGet(ctx, "ragflow.endpoints") + if !endpointsConfig.IsEmpty() { + endpoints = endpointsConfig.Strings() + // 去除尾部斜杠 + for i := range endpoints { + endpoints[i] = strings.TrimSuffix(endpoints[i], "/") + } + return + } + + // Fallback到单个base_url(向后兼容) + baseURL := g.Cfg().MustGet(ctx, "ragflow.base_url", "").String() + if baseURL != "" { + endpoints = []string{strings.TrimSuffix(baseURL, "/")} + } + return } @@ -53,10 +84,24 @@ func GetGlobalClient() *Client { return globalClient } -// Client RAGFlow API 客户端 +// Client RAGFlow API 客户端(支持负载均衡) type Client struct { - BaseURL string - APIKey string + Endpoints []string // RAGFlow实例列表 + APIKey string // API密钥 + currentIndex atomic.Uint64 // 当前轮询索引(原子操作) +} + +// getNextEndpoint 获取下一个endpoint(轮询算法) +func (c *Client) getNextEndpoint() string { + if len(c.Endpoints) == 0 { + return "" + } + if len(c.Endpoints) == 1 { + return c.Endpoints[0] + } + // 原子递增并取模,实现轮询 + idx := c.currentIndex.Add(1) % uint64(len(c.Endpoints)) + return c.Endpoints[idx] } // CommonResponse 通用响应结构 @@ -71,9 +116,13 @@ func (r *CommonResponse) IsSuccess() bool { return r.Code == 0 } -// request 发送 HTTP 请求(使用统一的common/http包) +// request 发送 HTTP 请求(使用统一的common/http包,支持负载均衡) func (c *Client) request(ctx context.Context, method, path string, body interface{}, result interface{}) (err error) { - fullURL := c.BaseURL + path + endpoint := c.getNextEndpoint() + if endpoint == "" { + return gerror.New("RAGFlow endpoints not configured") + } + fullURL := endpoint + path headers := map[string]string{ "Authorization": "Bearer " + c.APIKey, diff --git a/ragflow/document.go b/ragflow/document.go index f263c9b..c6e39e0 100644 --- a/ragflow/document.go +++ b/ragflow/document.go @@ -162,8 +162,12 @@ func (c *Client) UploadDocumentFromText(ctx context.Context, datasetId, content, filename = "document.txt" } - // 构造URL - url := c.BaseURL + "/api/v1/datasets/" + datasetId + "/documents" + // 构造URL(使用负载均衡) + endpoint := c.getNextEndpoint() + if endpoint == "" { + return "", gerror.New("RAGFlow endpoints not configured") + } + url := endpoint + "/api/v1/datasets/" + datasetId + "/documents" // 创建multipart writer body := &bytes.Buffer{} diff --git a/redis/types.go b/redis/types.go index f76d97d..2ccfbde 100644 --- a/redis/types.go +++ b/redis/types.go @@ -38,14 +38,16 @@ type BatchStreamMessage struct { // ResponseStreamMessage RAGFlow 响应消息结构(MQ 消息) type ResponseStreamMessage struct { - UserId string `json:"userId"` // 用户ID - Platform string `json:"platform"` // 平台标识 - TenantId string `json:"tenantId"` // 租户ID - Question string `json:"question"` // 用户问题 - Content string `json:"content"` // RAGFlow 回复内容 - SessionId string `json:"sessionId"` // RAGFlow Session ID - Timestamp int64 `json:"timestamp"` // 时间戳(秒) - MessageId string `json:"messageId"` // 原始消息ID + UserId string `json:"userId"` // 用户ID + Platform string `json:"platform"` // 平台标识 + TenantId string `json:"tenantId"` // 租户ID + AccountId string `json:"accountId,omitempty"` // 账号ID + AccountName string `json:"accountName,omitempty"` // 客服账号名称 + Question string `json:"question"` // 用户问题 + Content string `json:"content"` // RAGFlow 回复内容 + SessionId string `json:"sessionId"` // RAGFlow Session ID + Timestamp int64 `json:"timestamp"` // 时间戳(秒) + MessageId string `json:"messageId"` // 原始消息ID } // FollowUpMessage 追问消息结构(RabbitMQ 延时队列)