Files
common/ragflow/client.go

166 lines
4.4 KiB
Go
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
package ragflow
import (
"context"
"net"
"net/http"
"net/url"
"strings"
"sync"
"time"
"github.com/gogf/gf/v2/encoding/gjson"
"github.com/gogf/gf/v2/errors/gerror"
"github.com/gogf/gf/v2/frame/g"
"github.com/gogf/gf/v2/net/gclient"
)
var (
// globalClient is the package-wide RAGFlow client (singleton, lazily
// initialized by initClient). It stays nil if initialization was skipped.
globalClient *Client
// clientOnce ensures the initialization closure in initClient runs only once.
clientOnce sync.Once
)
// initClient lazily builds the package-level RAGFlow client. The work runs
// under clientOnce, so only the first call does anything.
//
// When ragflow.base_url or ragflow.api_key is empty, a warning is logged and
// globalClient is left nil. That attempt still consumes clientOnce, so the
// configuration is not read again on later calls.
func initClient() {
	clientOnce.Do(func() {
		// Single source for the overall request timeout and the
		// response-header timeout, which share the same value.
		const requestTimeout = 180 * time.Second

		ctx := context.Background()

		// Read the endpoint and credentials; bail out if either is missing.
		endpoint, token := loadConfig(ctx)
		if endpoint == "" || token == "" {
			g.Log().Warning(ctx, "⚠️ RAGFlow 配置未找到,请在项目 config.yml 中添加 ragflow.base_url 和 ragflow.api_key")
			return
		}

		// Custom transport: larger connection pool plus explicit timeouts.
		dialer := &net.Dialer{
			Timeout:   30 * time.Second,
			KeepAlive: 30 * time.Second,
		}
		pooled := &http.Transport{
			Proxy:                 http.ProxyFromEnvironment,
			DialContext:           dialer.DialContext,
			MaxIdleConns:          200,              // idle connections kept across all hosts
			MaxIdleConnsPerHost:   100,              // idle connections kept per host
			MaxConnsPerHost:       100,              // total connections allowed per host
			IdleConnTimeout:       90 * time.Second, // how long an idle connection is kept
			TLSHandshakeTimeout:   10 * time.Second,
			ExpectContinueTimeout: 1 * time.Second,
			ResponseHeaderTimeout: requestTimeout, // wait limit for response headers
		}

		// Fields are assigned directly instead of through gclient's chained
		// setters; per the original author's note this keeps the custom
		// Transport from being reset.
		hc := gclient.New()
		hc.Client.Transport = pooled
		hc.Client.Timeout = requestTimeout

		globalClient = &Client{
			BaseURL:    strings.TrimSuffix(endpoint, "/"),
			APIKey:     token,
			HTTPClient: hc,
		}

		g.Log().Infof(ctx, "✅ RAGFlow 客户端初始化成功: baseURL=%s, timeout=180s", endpoint)
	})
}
// loadConfig reads the RAGFlow settings from GoFrame's global configuration
// (the project's config.yml).
//
// It returns the values of ragflow.base_url and ragflow.api_key; each lookup
// is given "" as its default value.
// NOTE(review): MustGet presumably panics if the configuration cannot be
// read at all — confirm against the GoFrame gcfg documentation.
func loadConfig(ctx context.Context) (baseURL, apiKey string) {
	cfg := g.Cfg()
	baseURL = cfg.MustGet(ctx, "ragflow.base_url", "").String()
	apiKey = cfg.MustGet(ctx, "ragflow.api_key", "").String()
	return baseURL, apiKey
}
// GetGlobalClient returns the shared RAGFlow client, initializing it on the
// first call.
//
// It returns nil when initClient skipped initialization because
// ragflow.base_url or ragflow.api_key was empty, so callers should check the
// result before using it.
//
// Example: client := ragflow.GetGlobalClient()
func GetGlobalClient() *Client {
initClient()
return globalClient
}
// Client is a RAGFlow API client.
type Client struct {
// BaseURL is the API root; request builds each URL as BaseURL + path.
// initClient stores it with one trailing "/" removed.
BaseURL string
// APIKey is sent by request as "Authorization: Bearer <APIKey>".
APIKey string
HTTPClient *gclient.Client // HTTP client used to send the requests
}
// CommonResponse is the generic response envelope: a numeric code, a
// message, and an optional payload.
type CommonResponse struct {
	// Code is the status code; IsSuccess treats 0 as success.
	Code int `json:"code"`
	// Message is the text accompanying the code.
	Message string `json:"message"`
	// Data is the payload; it is omitted from encoded JSON when nil.
	Data interface{} `json:"data,omitempty"`
}

// IsSuccess reports whether the response carries the success code (0).
func (r *CommonResponse) IsSuccess() bool {
	const codeOK = 0
	return r.Code == codeOK
}
// request sends one JSON request to the RAGFlow API and decodes the reply.
//
// Parameters:
//   - ctx: passed to the underlying HTTP call.
//   - method: "GET", "POST", "PUT" or "DELETE"; any other value is rejected.
//   - path: appended verbatim to c.BaseURL to form the request URL.
//   - body: when non-nil, JSON-encoded and sent as the request body
//     (GET requests are sent without a body).
//   - result: decode target for the response body; pass nil to ignore the body.
//
// It returns an error when the client is not initialized, the body cannot be
// encoded, the method is unsupported, the HTTP call fails, the status is not
// 200, or the response cannot be decoded into result. Underlying errors are
// wrapped rather than flattened, so the cause stays reachable through the
// error chain.
func (c *Client) request(ctx context.Context, method, path string, body interface{}, result interface{}) (err error) {
	// GetGlobalClient returns nil when the configuration is incomplete;
	// report that as an error instead of a nil-pointer panic.
	if c == nil || c.HTTPClient == nil {
		return gerror.New("ragflow client is not initialized")
	}

	fullURL := c.BaseURL + path

	// Serialize the request body.
	var reqBody string
	if body != nil {
		jsonData, jsonErr := gjson.Encode(body)
		if jsonErr != nil {
			return gerror.Wrap(jsonErr, "marshal request body failed")
		}
		reqBody = string(jsonData)
	}

	// Set the request headers on the client (no chained calls).
	// NOTE(review): this mutates state on the shared HTTPClient on every
	// request; confirm gclient's SetHeader is safe under concurrent callers,
	// or set these headers once where the client is constructed.
	c.HTTPClient.SetHeader("Authorization", "Bearer "+c.APIKey)
	c.HTTPClient.SetHeader("Content-Type", "application/json")

	// Dispatch on the HTTP method.
	var resp *gclient.Response
	switch method {
	case http.MethodGet:
		resp, err = c.HTTPClient.Get(ctx, fullURL)
	case http.MethodPost:
		resp, err = c.HTTPClient.Post(ctx, fullURL, reqBody)
	case http.MethodPut:
		resp, err = c.HTTPClient.Put(ctx, fullURL, reqBody)
	case http.MethodDelete:
		resp, err = c.HTTPClient.Delete(ctx, fullURL, reqBody)
	default:
		return gerror.Newf("unsupported method: %s", method)
	}
	if err != nil {
		return gerror.Wrap(err, "request failed")
	}
	defer resp.Close()

	// Read the body before the status check so it can be included in the
	// error message for non-200 replies.
	respBody := resp.ReadAll()
	if resp.StatusCode != http.StatusOK {
		return gerror.Newf("http status %d: %s", resp.StatusCode, string(respBody))
	}

	// The caller does not want the payload; skip decoding.
	if result == nil {
		return nil
	}
	if err = gjson.DecodeTo(respBody, result); err != nil {
		return gerror.Wrap(err, "unmarshal response failed")
	}
	return nil
}
// buildQueryString encodes params as a URL query string of the form
// "k1=v1&k2=v2", without a leading "?".
//
// Each value is converted to its string form with g.NewVar, and both keys and
// values are query-escaped. Pairs are emitted in ascending key order, so the
// same map always produces the same string instead of following Go's random
// map iteration order.
//
// It returns "" when params is nil or empty.
func buildQueryString(params map[string]interface{}) string {
	if len(params) == 0 {
		return ""
	}
	query := make(url.Values, len(params))
	for key, val := range params {
		query.Set(key, g.NewVar(val).String())
	}
	// Encode sorts by key and applies url.QueryEscape to keys and values.
	return query.Encode()
}