Files
common/ragflow/client.go

187 lines
5.5 KiB
Go
Raw Normal View History

2025-11-27 17:38:42 +08:00
package ragflow
import (
"context"
"net/url"
"strings"
2025-12-09 17:55:08 +08:00
"sync"
2026-01-09 17:57:14 +08:00
"sync/atomic"
2025-11-27 17:38:42 +08:00
2026-01-13 11:08:17 +08:00
"gitee.com/red-future---jilin-g/common/http"
"github.com/gogf/gf/v2/errors/gerror"
2025-12-03 09:59:40 +08:00
"github.com/gogf/gf/v2/frame/g"
2025-11-27 17:38:42 +08:00
)
2025-12-03 09:59:40 +08:00
var (
2025-12-09 17:55:08 +08:00
// globalClient 全局 RAGFlow 客户端(单例,延迟初始化)
2025-12-03 09:59:40 +08:00
globalClient *Client
2025-12-09 17:55:08 +08:00
clientOnce sync.Once
2025-12-03 09:59:40 +08:00
)
2025-12-09 17:55:08 +08:00
// initClient 延迟初始化客户端
func initClient() {
clientOnce.Do(func() {
ctx := context.Background()
2025-11-27 17:38:42 +08:00
2025-12-09 17:55:08 +08:00
// 读取配置
2026-01-09 17:57:14 +08:00
endpoints, apiKey := loadConfig(ctx)
2025-11-27 18:03:01 +08:00
2025-12-09 17:55:08 +08:00
// 如果配置不完整,跳过初始化
2026-01-09 17:57:14 +08:00
if len(endpoints) == 0 || apiKey == "" {
g.Log().Warning(ctx, "⚠️ RAGFlow 配置未找到,请在 config.yml 中添加 ragflow.base_url 或在 Consul 中配置 ragflow.endpoints")
2025-12-09 17:55:08 +08:00
return
}
2025-12-03 09:59:40 +08:00
2025-12-09 17:55:08 +08:00
globalClient = &Client{
2026-01-09 17:57:14 +08:00
Endpoints: endpoints,
APIKey: apiKey,
2025-12-09 17:55:08 +08:00
}
2025-12-03 09:59:40 +08:00
2026-01-09 17:57:14 +08:00
if len(endpoints) == 1 {
g.Log().Infof(ctx, "✅ RAGFlow 客户端初始化成功: endpoint=%s", endpoints[0])
} else {
g.Log().Infof(ctx, "✅ RAGFlow 客户端初始化成功: endpoints=%v (负载均衡)", endpoints)
}
2025-12-09 17:55:08 +08:00
})
2025-12-03 09:59:40 +08:00
}
2026-01-09 17:57:14 +08:00
// loadConfig 从配置加载 RAGFlow 配置(支持实例级配置)
// 优先级:
// 1. Consul实例级配置 ragflow.endpoints (数组)
// 2. Consul全局配置 ragflow.endpoints (数组)
// 3. config.yml的 ragflow.base_url (单个URL向后兼容)
func loadConfig(ctx context.Context) (endpoints []string, apiKey string) {
// 尝试从Consul读取endpoints支持实例级配置
// 注意这里不能直接导入customerService/service包会造成循环依赖
// 所以只能从config.yml读取Consul配置需要在customerservice层面调用时传入
// 读取API Key
2025-12-03 10:09:00 +08:00
apiKey = g.Cfg().MustGet(ctx, "ragflow.api_key", "").String()
2026-01-09 17:57:14 +08:00
// 尝试读取endpoints数组从config.yml或Consul同步的配置
endpointsConfig := g.Cfg().MustGet(ctx, "ragflow.endpoints")
if !endpointsConfig.IsEmpty() {
endpoints = endpointsConfig.Strings()
// 去除尾部斜杠
for i := range endpoints {
endpoints[i] = strings.TrimSuffix(endpoints[i], "/")
}
return
}
// Fallback到单个base_url向后兼容
baseURL := g.Cfg().MustGet(ctx, "ragflow.base_url", "").String()
if baseURL != "" {
endpoints = []string{strings.TrimSuffix(baseURL, "/")}
}
return
2025-12-03 09:59:40 +08:00
}
2025-12-09 17:55:08 +08:00
// GetGlobalClient 获取全局客户端(延迟初始化)
2025-12-03 09:59:40 +08:00
func GetGlobalClient() *Client {
2025-12-09 17:55:08 +08:00
initClient()
2025-12-03 09:59:40 +08:00
return globalClient
}
2026-01-09 17:57:14 +08:00
// Client RAGFlow API 客户端(支持负载均衡)
2025-12-03 09:59:40 +08:00
type Client struct {
2026-01-09 17:57:14 +08:00
Endpoints []string // RAGFlow实例列表
APIKey string // API密钥
currentIndex atomic.Uint64 // 当前轮询索引(原子操作)
}
// getNextEndpoint 获取下一个endpoint轮询算法
func (c *Client) getNextEndpoint() string {
if len(c.Endpoints) == 0 {
return ""
}
if len(c.Endpoints) == 1 {
return c.Endpoints[0]
}
// 原子递增并取模,实现轮询
idx := c.currentIndex.Add(1) % uint64(len(c.Endpoints))
return c.Endpoints[idx]
2025-11-27 17:38:42 +08:00
}
// CommonResponse 通用响应结构
type CommonResponse struct {
2025-11-27 18:03:01 +08:00
Code int `json:"code"`
Message string `json:"message"`
2025-11-27 17:38:42 +08:00
Data interface{} `json:"data,omitempty"`
}
// IsSuccess 检查响应是否成功
func (r *CommonResponse) IsSuccess() bool {
return r.Code == 0
}
// request 发送 HTTP 请求
//
// 为什么不使用 common/http 包:
2026-01-13 10:16:35 +08:00
//
// 1. common/http/http.go:61 会用内部请求的Authorization覆盖RAGFlow API key
// Httpclient.SetHeader("Authorization", g.RequestFromCtx(ctx).GetHeader("Authorization"))
// 这会导致RAGFlow API认证失败因为内部token不是RAGFlow的API key
//
// 2. common/http/http.go:69-74 强制解析为内部API响应格式ghttp.DefaultHandlerResponse
// resultStrut := &ghttp.DefaultHandlerResponse{}
// if err = gconv.Struct(result, &resultStrut); err != nil {
// err = errors.New(resultStrut.Message)
// } else if resultStrut.Code == 200 || resultStrut.Code == 0 {
// gconv.Struct(resultStrut.Data, target)
// }
// RAGFlow API返回格式与内部API不同会导致解析失败
//
// 因此直接使用 g.Client() 调用第三方API避免上述问题
func (c *Client) request(ctx context.Context, method, path string, body interface{}, result interface{}) (err error) {
2026-01-09 17:57:14 +08:00
endpoint := c.getNextEndpoint()
if endpoint == "" {
return gerror.New("RAGFlow endpoints not configured")
}
2026-01-13 11:20:14 +08:00
2026-01-09 17:57:14 +08:00
fullURL := endpoint + path
2025-11-27 18:03:01 +08:00
// 添加详细日志:请求信息
g.Log().Infof(ctx, "RAGFlow请求: %s %s", method, fullURL)
if body != nil {
bodyJSON := g.NewVar(body).String()
g.Log().Infof(ctx, "RAGFlow请求体: %s", bodyJSON)
2026-01-08 15:55:44 +08:00
}
2026-01-17 17:58:13 +08:00
// 使用common/http包
2026-01-13 11:08:17 +08:00
var headers = make(map[string]string)
headers["Authorization"] = "Bearer " + c.APIKey
headers["Content-Type"] = "application/json"
switch method {
case "GET":
2026-01-13 11:08:17 +08:00
err = http.Get(ctx, fullURL, headers, result, body)
case "POST":
2026-01-13 11:08:17 +08:00
err = http.Post(ctx, fullURL, headers, result, body)
case "PUT":
2026-01-13 11:08:17 +08:00
err = http.Put(ctx, fullURL, headers, result, body)
case "DELETE":
2026-01-13 10:16:35 +08:00
if body != nil {
2026-01-13 11:08:17 +08:00
err = http.Delete(ctx, fullURL, headers, result, body)
2026-01-13 10:16:35 +08:00
} else {
2026-01-13 11:08:17 +08:00
err = http.Delete(ctx, fullURL, headers, result)
2026-01-13 10:16:35 +08:00
}
default:
return gerror.Newf("unsupported method: %s", method)
2025-11-27 17:38:42 +08:00
}
return
2025-11-27 17:38:42 +08:00
}
// buildQueryString 构建查询字符串
func buildQueryString(params map[string]interface{}) string {
if len(params) == 0 {
return ""
}
2025-11-27 18:03:01 +08:00
parts := make([]string, 0, len(params))
2025-11-27 17:38:42 +08:00
for k, v := range params {
parts = append(parts, url.QueryEscape(k)+"="+url.QueryEscape(g.NewVar(v).String()))
2025-11-27 17:38:42 +08:00
}
return strings.Join(parts, "&")
}