2025-11-27 17:38:42 +08:00
|
|
|
|
package ragflow
|
|
|
|
|
|
|
|
|
|
|
|
import (
|
|
|
|
|
|
"context"
|
|
|
|
|
|
"net/url"
|
|
|
|
|
|
"strings"
|
2025-12-09 17:55:08 +08:00
|
|
|
|
"sync"
|
2026-01-09 17:57:14 +08:00
|
|
|
|
"sync/atomic"
|
2025-11-27 17:38:42 +08:00
|
|
|
|
|
2026-01-12 17:36:14 +08:00
|
|
|
|
"github.com/gogf/gf/v2/encoding/gjson"
|
2025-12-06 18:04:29 +08:00
|
|
|
|
"github.com/gogf/gf/v2/errors/gerror"
|
2025-12-03 09:59:40 +08:00
|
|
|
|
"github.com/gogf/gf/v2/frame/g"
|
2026-01-12 17:36:14 +08:00
|
|
|
|
"github.com/gogf/gf/v2/net/gclient"
|
2025-11-27 17:38:42 +08:00
|
|
|
|
)
|
|
|
|
|
|
|
2025-12-03 09:59:40 +08:00
|
|
|
|
var (
|
2025-12-09 17:55:08 +08:00
|
|
|
|
// globalClient 全局 RAGFlow 客户端(单例,延迟初始化)
|
2025-12-03 09:59:40 +08:00
|
|
|
|
globalClient *Client
|
2025-12-09 17:55:08 +08:00
|
|
|
|
clientOnce sync.Once
|
2025-12-03 09:59:40 +08:00
|
|
|
|
)
|
|
|
|
|
|
|
2025-12-09 17:55:08 +08:00
|
|
|
|
// initClient 延迟初始化客户端
|
|
|
|
|
|
func initClient() {
|
|
|
|
|
|
clientOnce.Do(func() {
|
|
|
|
|
|
ctx := context.Background()
|
2025-11-27 17:38:42 +08:00
|
|
|
|
|
2025-12-09 17:55:08 +08:00
|
|
|
|
// 读取配置
|
2026-01-09 17:57:14 +08:00
|
|
|
|
endpoints, apiKey := loadConfig(ctx)
|
2025-11-27 18:03:01 +08:00
|
|
|
|
|
2025-12-09 17:55:08 +08:00
|
|
|
|
// 如果配置不完整,跳过初始化
|
2026-01-09 17:57:14 +08:00
|
|
|
|
if len(endpoints) == 0 || apiKey == "" {
|
|
|
|
|
|
g.Log().Warning(ctx, "⚠️ RAGFlow 配置未找到,请在 config.yml 中添加 ragflow.base_url 或在 Consul 中配置 ragflow.endpoints")
|
2025-12-09 17:55:08 +08:00
|
|
|
|
return
|
|
|
|
|
|
}
|
2025-12-03 09:59:40 +08:00
|
|
|
|
|
2025-12-09 17:55:08 +08:00
|
|
|
|
globalClient = &Client{
|
2026-01-09 17:57:14 +08:00
|
|
|
|
Endpoints: endpoints,
|
|
|
|
|
|
APIKey: apiKey,
|
2025-12-09 17:55:08 +08:00
|
|
|
|
}
|
2025-12-03 09:59:40 +08:00
|
|
|
|
|
2026-01-09 17:57:14 +08:00
|
|
|
|
if len(endpoints) == 1 {
|
|
|
|
|
|
g.Log().Infof(ctx, "✅ RAGFlow 客户端初始化成功: endpoint=%s", endpoints[0])
|
|
|
|
|
|
} else {
|
|
|
|
|
|
g.Log().Infof(ctx, "✅ RAGFlow 客户端初始化成功: endpoints=%v (负载均衡)", endpoints)
|
|
|
|
|
|
}
|
2025-12-09 17:55:08 +08:00
|
|
|
|
})
|
2025-12-03 09:59:40 +08:00
|
|
|
|
}
|
|
|
|
|
|
|
2026-01-09 17:57:14 +08:00
|
|
|
|
// loadConfig 从配置加载 RAGFlow 配置(支持实例级配置)
|
|
|
|
|
|
// 优先级:
|
|
|
|
|
|
// 1. Consul实例级配置 ragflow.endpoints (数组)
|
|
|
|
|
|
// 2. Consul全局配置 ragflow.endpoints (数组)
|
|
|
|
|
|
// 3. config.yml的 ragflow.base_url (单个URL,向后兼容)
|
|
|
|
|
|
func loadConfig(ctx context.Context) (endpoints []string, apiKey string) {
|
|
|
|
|
|
// 尝试从Consul读取endpoints(支持实例级配置)
|
|
|
|
|
|
// 注意:这里不能直接导入customerService/service包,会造成循环依赖
|
|
|
|
|
|
// 所以只能从config.yml读取,Consul配置需要在customerservice层面调用时传入
|
|
|
|
|
|
|
|
|
|
|
|
// 读取API Key
|
2025-12-03 10:09:00 +08:00
|
|
|
|
apiKey = g.Cfg().MustGet(ctx, "ragflow.api_key", "").String()
|
2026-01-09 17:57:14 +08:00
|
|
|
|
|
|
|
|
|
|
// 尝试读取endpoints数组(从config.yml或Consul同步的配置)
|
|
|
|
|
|
endpointsConfig := g.Cfg().MustGet(ctx, "ragflow.endpoints")
|
|
|
|
|
|
if !endpointsConfig.IsEmpty() {
|
|
|
|
|
|
endpoints = endpointsConfig.Strings()
|
|
|
|
|
|
// 去除尾部斜杠
|
|
|
|
|
|
for i := range endpoints {
|
|
|
|
|
|
endpoints[i] = strings.TrimSuffix(endpoints[i], "/")
|
|
|
|
|
|
}
|
|
|
|
|
|
return
|
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
// Fallback到单个base_url(向后兼容)
|
|
|
|
|
|
baseURL := g.Cfg().MustGet(ctx, "ragflow.base_url", "").String()
|
|
|
|
|
|
if baseURL != "" {
|
|
|
|
|
|
endpoints = []string{strings.TrimSuffix(baseURL, "/")}
|
|
|
|
|
|
}
|
|
|
|
|
|
|
2025-12-06 10:13:38 +08:00
|
|
|
|
return
|
2025-12-03 09:59:40 +08:00
|
|
|
|
}
|
|
|
|
|
|
|
2025-12-09 17:55:08 +08:00
|
|
|
|
// GetGlobalClient 获取全局客户端(延迟初始化)
|
2025-12-03 09:59:40 +08:00
|
|
|
|
func GetGlobalClient() *Client {
|
2025-12-09 17:55:08 +08:00
|
|
|
|
initClient()
|
2025-12-03 09:59:40 +08:00
|
|
|
|
return globalClient
|
|
|
|
|
|
}
|
|
|
|
|
|
|
2026-01-09 17:57:14 +08:00
|
|
|
|
// Client RAGFlow API 客户端(支持负载均衡)
|
2025-12-03 09:59:40 +08:00
|
|
|
|
type Client struct {
|
2026-01-09 17:57:14 +08:00
|
|
|
|
Endpoints []string // RAGFlow实例列表
|
|
|
|
|
|
APIKey string // API密钥
|
|
|
|
|
|
currentIndex atomic.Uint64 // 当前轮询索引(原子操作)
|
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
// getNextEndpoint 获取下一个endpoint(轮询算法)
|
|
|
|
|
|
func (c *Client) getNextEndpoint() string {
|
|
|
|
|
|
if len(c.Endpoints) == 0 {
|
|
|
|
|
|
return ""
|
|
|
|
|
|
}
|
|
|
|
|
|
if len(c.Endpoints) == 1 {
|
|
|
|
|
|
return c.Endpoints[0]
|
|
|
|
|
|
}
|
|
|
|
|
|
// 原子递增并取模,实现轮询
|
|
|
|
|
|
idx := c.currentIndex.Add(1) % uint64(len(c.Endpoints))
|
|
|
|
|
|
return c.Endpoints[idx]
|
2025-11-27 17:38:42 +08:00
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
// CommonResponse 通用响应结构
|
|
|
|
|
|
type CommonResponse struct {
|
2025-11-27 18:03:01 +08:00
|
|
|
|
Code int `json:"code"`
|
|
|
|
|
|
Message string `json:"message"`
|
2025-11-27 17:38:42 +08:00
|
|
|
|
Data interface{} `json:"data,omitempty"`
|
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
// IsSuccess 检查响应是否成功
|
|
|
|
|
|
func (r *CommonResponse) IsSuccess() bool {
|
|
|
|
|
|
return r.Code == 0
|
|
|
|
|
|
}
|
|
|
|
|
|
|
2026-01-12 17:36:14 +08:00
|
|
|
|
// request 发送 HTTP 请求
|
|
|
|
|
|
//
|
|
|
|
|
|
// 为什么不使用 common/http 包:
|
2026-01-13 10:16:35 +08:00
|
|
|
|
//
|
2026-01-12 17:36:14 +08:00
|
|
|
|
// 1. common/http/http.go:61 会用内部请求的Authorization覆盖RAGFlow API key:
|
|
|
|
|
|
// Httpclient.SetHeader("Authorization", g.RequestFromCtx(ctx).GetHeader("Authorization"))
|
|
|
|
|
|
// 这会导致RAGFlow API认证失败,因为内部token不是RAGFlow的API key
|
|
|
|
|
|
//
|
|
|
|
|
|
// 2. common/http/http.go:69-74 强制解析为内部API响应格式(ghttp.DefaultHandlerResponse):
|
|
|
|
|
|
// resultStrut := &ghttp.DefaultHandlerResponse{}
|
|
|
|
|
|
// if err = gconv.Struct(result, &resultStrut); err != nil {
|
|
|
|
|
|
// err = errors.New(resultStrut.Message)
|
|
|
|
|
|
// } else if resultStrut.Code == 200 || resultStrut.Code == 0 {
|
|
|
|
|
|
// gconv.Struct(resultStrut.Data, target)
|
|
|
|
|
|
// }
|
|
|
|
|
|
// RAGFlow API返回格式与内部API不同,会导致解析失败
|
|
|
|
|
|
//
|
|
|
|
|
|
// 因此直接使用 g.Client() 调用第三方API,避免上述问题
|
2025-12-06 18:04:29 +08:00
|
|
|
|
func (c *Client) request(ctx context.Context, method, path string, body interface{}, result interface{}) (err error) {
|
2026-01-09 17:57:14 +08:00
|
|
|
|
endpoint := c.getNextEndpoint()
|
|
|
|
|
|
if endpoint == "" {
|
|
|
|
|
|
return gerror.New("RAGFlow endpoints not configured")
|
|
|
|
|
|
}
|
|
|
|
|
|
fullURL := endpoint + path
|
2025-11-27 18:03:01 +08:00
|
|
|
|
|
2026-01-12 17:36:14 +08:00
|
|
|
|
// 添加详细日志:请求信息
|
|
|
|
|
|
g.Log().Infof(ctx, "RAGFlow请求: %s %s", method, fullURL)
|
|
|
|
|
|
if body != nil {
|
|
|
|
|
|
bodyJSON := g.NewVar(body).String()
|
|
|
|
|
|
g.Log().Infof(ctx, "RAGFlow请求体: %s", bodyJSON)
|
2026-01-08 15:55:44 +08:00
|
|
|
|
}
|
2025-12-10 18:02:31 +08:00
|
|
|
|
|
2026-01-12 17:36:14 +08:00
|
|
|
|
// 创建新的HTTP客户端实例(避免共享状态)
|
|
|
|
|
|
client := g.Client()
|
|
|
|
|
|
client.SetHeader("Authorization", "Bearer "+c.APIKey)
|
|
|
|
|
|
client.SetHeader("Content-Type", "application/json")
|
|
|
|
|
|
|
|
|
|
|
|
var response *gclient.Response
|
2025-12-11 17:19:33 +08:00
|
|
|
|
switch method {
|
|
|
|
|
|
case "GET":
|
2026-01-12 17:36:14 +08:00
|
|
|
|
response, err = client.Get(ctx, fullURL, body)
|
2025-12-11 17:19:33 +08:00
|
|
|
|
case "POST":
|
2026-01-12 17:36:14 +08:00
|
|
|
|
response, err = client.Post(ctx, fullURL, body)
|
2025-12-11 17:19:33 +08:00
|
|
|
|
case "PUT":
|
2026-01-12 17:36:14 +08:00
|
|
|
|
response, err = client.Put(ctx, fullURL, body)
|
2025-12-11 17:19:33 +08:00
|
|
|
|
case "DELETE":
|
2026-01-13 10:16:35 +08:00
|
|
|
|
// DELETE请求需要明确使用ContentJson发送body
|
|
|
|
|
|
if body != nil {
|
|
|
|
|
|
response, err = client.ContentJson().Delete(ctx, fullURL, body)
|
|
|
|
|
|
} else {
|
|
|
|
|
|
response, err = client.Delete(ctx, fullURL)
|
|
|
|
|
|
}
|
2025-12-11 17:19:33 +08:00
|
|
|
|
default:
|
|
|
|
|
|
return gerror.Newf("unsupported method: %s", method)
|
2025-11-27 17:38:42 +08:00
|
|
|
|
}
|
2025-11-27 18:03:01 +08:00
|
|
|
|
|
2025-12-10 18:02:31 +08:00
|
|
|
|
if err != nil {
|
2026-01-12 17:36:14 +08:00
|
|
|
|
g.Log().Errorf(ctx, "RAGFlow HTTP请求失败: %v", err)
|
|
|
|
|
|
return gerror.Wrapf(err, "HTTP request to RAGFlow failed")
|
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
if response == nil {
|
|
|
|
|
|
return gerror.New("HTTP response is nil")
|
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
defer response.Close()
|
|
|
|
|
|
|
|
|
|
|
|
// 读取响应体
|
|
|
|
|
|
respBytes := response.ReadAll()
|
|
|
|
|
|
g.Log().Infof(ctx, "RAGFlow响应: %s", string(respBytes))
|
|
|
|
|
|
|
|
|
|
|
|
// 解析JSON到result
|
|
|
|
|
|
if err = gjson.DecodeTo(respBytes, result); err != nil {
|
|
|
|
|
|
g.Log().Errorf(ctx, "RAGFlow响应解析失败: %v, 原始响应: %s", err, string(respBytes))
|
|
|
|
|
|
return gerror.Wrapf(err, "failed to decode RAGFlow response")
|
2025-11-27 17:38:42 +08:00
|
|
|
|
}
|
2025-11-27 18:03:01 +08:00
|
|
|
|
|
2025-12-06 18:04:29 +08:00
|
|
|
|
return
|
2025-11-27 17:38:42 +08:00
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
// buildQueryString 构建查询字符串
|
|
|
|
|
|
func buildQueryString(params map[string]interface{}) string {
|
|
|
|
|
|
if len(params) == 0 {
|
|
|
|
|
|
return ""
|
|
|
|
|
|
}
|
2025-11-27 18:03:01 +08:00
|
|
|
|
|
2025-12-06 18:04:29 +08:00
|
|
|
|
parts := make([]string, 0, len(params))
|
2025-11-27 17:38:42 +08:00
|
|
|
|
for k, v := range params {
|
2025-12-06 18:04:29 +08:00
|
|
|
|
parts = append(parts, url.QueryEscape(k)+"="+url.QueryEscape(g.NewVar(v).String()))
|
2025-11-27 17:38:42 +08:00
|
|
|
|
}
|
|
|
|
|
|
return strings.Join(parts, "&")
|
|
|
|
|
|
}
|