Files
common/ragflow/client.go

206 lines
6.0 KiB
Go
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
package ragflow
import (
"context"
"net/url"
"strings"
"sync"
"sync/atomic"
"github.com/gogf/gf/v2/encoding/gjson"
"github.com/gogf/gf/v2/errors/gerror"
"github.com/gogf/gf/v2/frame/g"
"github.com/gogf/gf/v2/net/gclient"
)
var (
// globalClient 全局 RAGFlow 客户端(单例,延迟初始化)
globalClient *Client
clientOnce sync.Once
)
// initClient 延迟初始化客户端
func initClient() {
clientOnce.Do(func() {
ctx := context.Background()
// 读取配置
endpoints, apiKey := loadConfig(ctx)
// 如果配置不完整,跳过初始化
if len(endpoints) == 0 || apiKey == "" {
g.Log().Warning(ctx, "⚠️ RAGFlow 配置未找到,请在 config.yml 中添加 ragflow.base_url 或在 Consul 中配置 ragflow.endpoints")
return
}
globalClient = &Client{
Endpoints: endpoints,
APIKey: apiKey,
}
if len(endpoints) == 1 {
g.Log().Infof(ctx, "✅ RAGFlow 客户端初始化成功: endpoint=%s", endpoints[0])
} else {
g.Log().Infof(ctx, "✅ RAGFlow 客户端初始化成功: endpoints=%v (负载均衡)", endpoints)
}
})
}
// loadConfig 从配置加载 RAGFlow 配置(支持实例级配置)
// 优先级:
// 1. Consul实例级配置 ragflow.endpoints (数组)
// 2. Consul全局配置 ragflow.endpoints (数组)
// 3. config.yml的 ragflow.base_url (单个URL向后兼容)
func loadConfig(ctx context.Context) (endpoints []string, apiKey string) {
// 尝试从Consul读取endpoints支持实例级配置
// 注意这里不能直接导入customerService/service包会造成循环依赖
// 所以只能从config.yml读取Consul配置需要在customerservice层面调用时传入
// 读取API Key
apiKey = g.Cfg().MustGet(ctx, "ragflow.api_key", "").String()
// 尝试读取endpoints数组从config.yml或Consul同步的配置
endpointsConfig := g.Cfg().MustGet(ctx, "ragflow.endpoints")
if !endpointsConfig.IsEmpty() {
endpoints = endpointsConfig.Strings()
// 去除尾部斜杠
for i := range endpoints {
endpoints[i] = strings.TrimSuffix(endpoints[i], "/")
}
return
}
// Fallback到单个base_url向后兼容
baseURL := g.Cfg().MustGet(ctx, "ragflow.base_url", "").String()
if baseURL != "" {
endpoints = []string{strings.TrimSuffix(baseURL, "/")}
}
return
}
// GetGlobalClient 获取全局客户端(延迟初始化)
func GetGlobalClient() *Client {
initClient()
return globalClient
}
// Client RAGFlow API 客户端(支持负载均衡)
type Client struct {
Endpoints []string // RAGFlow实例列表
APIKey string // API密钥
currentIndex atomic.Uint64 // 当前轮询索引(原子操作)
}
// getNextEndpoint 获取下一个endpoint轮询算法
func (c *Client) getNextEndpoint() string {
if len(c.Endpoints) == 0 {
return ""
}
if len(c.Endpoints) == 1 {
return c.Endpoints[0]
}
// 原子递增并取模,实现轮询
idx := c.currentIndex.Add(1) % uint64(len(c.Endpoints))
return c.Endpoints[idx]
}
// CommonResponse 通用响应结构
type CommonResponse struct {
Code int `json:"code"`
Message string `json:"message"`
Data interface{} `json:"data,omitempty"`
}
// IsSuccess 检查响应是否成功
func (r *CommonResponse) IsSuccess() bool {
return r.Code == 0
}
// request 发送 HTTP 请求
//
// 为什么不使用 common/http 包:
// 1. common/http/http.go:61 会用内部请求的Authorization覆盖RAGFlow API key
// Httpclient.SetHeader("Authorization", g.RequestFromCtx(ctx).GetHeader("Authorization"))
// 这会导致RAGFlow API认证失败因为内部token不是RAGFlow的API key
//
// 2. common/http/http.go:69-74 强制解析为内部API响应格式ghttp.DefaultHandlerResponse
// resultStrut := &ghttp.DefaultHandlerResponse{}
// if err = gconv.Struct(result, &resultStrut); err != nil {
// err = errors.New(resultStrut.Message)
// } else if resultStrut.Code == 200 || resultStrut.Code == 0 {
// gconv.Struct(resultStrut.Data, target)
// }
// RAGFlow API返回格式与内部API不同会导致解析失败
//
// 因此直接使用 g.Client() 调用第三方API避免上述问题
func (c *Client) request(ctx context.Context, method, path string, body interface{}, result interface{}) (err error) {
endpoint := c.getNextEndpoint()
if endpoint == "" {
return gerror.New("RAGFlow endpoints not configured")
}
fullURL := endpoint + path
// 添加详细日志:请求信息
g.Log().Infof(ctx, "RAGFlow请求: %s %s", method, fullURL)
if body != nil {
bodyJSON := g.NewVar(body).String()
g.Log().Infof(ctx, "RAGFlow请求体: %s", bodyJSON)
}
// 创建新的HTTP客户端实例避免共享状态
client := g.Client()
client.SetHeader("Authorization", "Bearer "+c.APIKey)
client.SetHeader("Content-Type", "application/json")
var response *gclient.Response
switch method {
case "GET":
response, err = client.Get(ctx, fullURL, body)
case "POST":
response, err = client.Post(ctx, fullURL, body)
case "PUT":
response, err = client.Put(ctx, fullURL, body)
case "DELETE":
response, err = client.Delete(ctx, fullURL, body)
default:
return gerror.Newf("unsupported method: %s", method)
}
if err != nil {
g.Log().Errorf(ctx, "RAGFlow HTTP请求失败: %v", err)
return gerror.Wrapf(err, "HTTP request to RAGFlow failed")
}
if response == nil {
return gerror.New("HTTP response is nil")
}
defer response.Close()
// 读取响应体
respBytes := response.ReadAll()
g.Log().Infof(ctx, "RAGFlow响应: %s", string(respBytes))
// 解析JSON到result
if err = gjson.DecodeTo(respBytes, result); err != nil {
g.Log().Errorf(ctx, "RAGFlow响应解析失败: %v, 原始响应: %s", err, string(respBytes))
return gerror.Wrapf(err, "failed to decode RAGFlow response")
}
return
}
// buildQueryString 构建查询字符串
func buildQueryString(params map[string]interface{}) string {
if len(params) == 0 {
return ""
}
parts := make([]string, 0, len(params))
for k, v := range params {
parts = append(parts, url.QueryEscape(k)+"="+url.QueryEscape(g.NewVar(v).String()))
}
return strings.Join(parts, "&")
}