From 976e4c6f2c77a8ff6588c57c912d929255da0c4f Mon Sep 17 00:00:00 2001 From: Cold <16419454+cold502@user.noreply.gitee.com> Date: Thu, 11 Dec 2025 17:19:33 +0800 Subject: [PATCH] =?UTF-8?q?=E6=9B=B4=E6=94=B9client,=20=E6=9B=B4=E6=94=B9?= =?UTF-8?q?=E4=B8=BAgclient=E6=96=B9=E6=B3=95,=E5=BC=83=E7=94=A8=E9=93=BE?= =?UTF-8?q?=E5=BC=8F=E8=B0=83=E7=94=A8=E6=96=B9=E5=BC=8F?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .gitignore | 2 ++ ragflow/client.go | 89 ++++++++++++++++++----------------------------- 2 files changed, 36 insertions(+), 55 deletions(-) diff --git a/.gitignore b/.gitignore index 5e12a7a..591b05d 100644 --- a/.gitignore +++ b/.gitignore @@ -4,3 +4,5 @@ ragflow/agent文档.md ragflow/README_GLOBAL.md redis/stream使用示例.md ragflow/client_backup.go.bak +ragflow/为什么不能使用gclient.md +ragflow/agent文档.md diff --git a/ragflow/client.go b/ragflow/client.go index a5f64b2..f6e450b 100644 --- a/ragflow/client.go +++ b/ragflow/client.go @@ -1,9 +1,7 @@ package ragflow import ( - "bytes" "context" - "io" "net" "net/http" "net/url" @@ -14,13 +12,9 @@ import ( "github.com/gogf/gf/v2/encoding/gjson" "github.com/gogf/gf/v2/errors/gerror" "github.com/gogf/gf/v2/frame/g" + "github.com/gogf/gf/v2/net/gclient" ) -// gclient 完全不能用! -// 1. New() 默认 ResponseHeaderTimeout=30s -// 2. Clone() 内部调用 New(),链式调用会重置 Transport -// 3. 必须用原生 http.Client - var ( // globalClient 全局 RAGFlow 客户端(单例,延迟初始化) globalClient *Client @@ -41,7 +35,7 @@ func initClient() { return } - // 自定义 Transport,增大连接池(解决并发连接不足导致的超时) + // 自定义 Transport(增大连接池,设置超时) transport := &http.Transport{ Proxy: http.ProxyFromEnvironment, DialContext: (&net.Dialer{ @@ -49,23 +43,18 @@ func initClient() { KeepAlive: 30 * time.Second, }).DialContext, MaxIdleConns: 200, // 最大空闲连接数 - MaxIdleConnsPerHost: 100, // 每个 host 最大空闲连接数(关键!默认只有 2) + MaxIdleConnsPerHost: 100, // 每个 host 最大空闲连接数 MaxConnsPerHost: 100, // 每个 host 最大连接数 IdleConnTimeout: 90 * time.Second, // 空闲连接超时 TLSHandshakeTimeout: 10 * time.Second, ExpectContinueTimeout: 1 * time.Second, - ResponseHeaderTimeout: 180 * time.Second, // 等待响应头超时(关键!) + ResponseHeaderTimeout: 180 * time.Second, // 等待响应头超时 } - // 使用原生 http.Client(gclient 完全不能用,Clone() 内部调用 New() 会重置 Transport) - httpClient := &http.Client{ - Transport: transport, - Timeout: 0, // 不设置全局超时,由 context 控制 - } - - // 验证 Transport 设置 - g.Log().Infof(ctx, "✅ Transport 配置: ResponseHeaderTimeout=%v, MaxIdleConnsPerHost=%d, DisableKeepAlives=%v", - transport.ResponseHeaderTimeout, transport.MaxIdleConnsPerHost, transport.DisableKeepAlives) + // 初始化 gclient(不使用链式调用,避免 Transport 被重置) + httpClient := gclient.New() + httpClient.Client.Transport = transport + httpClient.Client.Timeout = 180 * time.Second globalClient = &Client{ BaseURL: strings.TrimSuffix(baseURL, "/"), @@ -73,7 +62,7 @@ func initClient() { HTTPClient: httpClient, } - g.Log().Infof(ctx, "✅ RAGFlow 全局客户端初始化成功: baseURL=%s", baseURL) + g.Log().Infof(ctx, "✅ RAGFlow 客户端初始化成功: baseURL=%s, timeout=180s", baseURL) }) } @@ -96,7 +85,7 @@ func GetGlobalClient() *Client { type Client struct { BaseURL string APIKey string - HTTPClient *http.Client // 原生 HTTP 客户端(gclient 不能用) + HTTPClient *gclient.Client // HTTP 客户端 } // CommonResponse 通用响应结构 @@ -111,61 +100,51 @@ func (r *CommonResponse) IsSuccess() bool { return r.Code == 0 } -// request 发送 HTTP 请求 +// request 发送 HTTP 请求(不使用链式调用) func (c *Client) request(ctx context.Context, method, path string, body interface{}, result interface{}) (err error) { fullURL := c.BaseURL + path + // 序列化请求体 var reqBody string if body != nil { - jsonData, err := gjson.Encode(body) - if err != nil { - return gerror.Newf("marshal request body failed: %v", err) + jsonData, jsonErr := gjson.Encode(body) + if jsonErr != nil { + return gerror.Newf("marshal request body failed: %v", jsonErr) } reqBody = string(jsonData) } - // 使用独立的 context 设置 300 秒超时(RAGFlow 高并发时响应较慢) - reqCtx, cancel := context.WithTimeout(context.Background(), 300*time.Second) - defer cancel() - startTime := time.Now() - - // 创建请求 - req, err := http.NewRequestWithContext(reqCtx, method, fullURL, bytes.NewReader([]byte(reqBody))) - if err != nil { - return gerror.Newf("create request failed: %v", err) - } - // 设置请求头 - req.Header.Set("Authorization", "Bearer "+c.APIKey) - req.Header.Set("Content-Type", "application/json") + c.HTTPClient.SetHeader("Authorization", "Bearer "+c.APIKey) + c.HTTPClient.SetHeader("Content-Type", "application/json") // 发送请求 - g.Log().Infof(ctx, "[RAGFlow HTTP] 发送请求: method=%s, url=%s", method, fullURL) - resp, err := c.HTTPClient.Do(req) - elapsed := time.Since(startTime) + var resp *gclient.Response + switch method { + case "GET": + resp, err = c.HTTPClient.Get(ctx, fullURL) + case "POST": + resp, err = c.HTTPClient.Post(ctx, fullURL, reqBody) + case "PUT": + resp, err = c.HTTPClient.Put(ctx, fullURL, reqBody) + case "DELETE": + resp, err = c.HTTPClient.Delete(ctx, fullURL, reqBody) + default: + return gerror.Newf("unsupported method: %s", method) + } + if err != nil { - g.Log().Errorf(ctx, "[RAGFlow HTTP] 请求失败(耗时 %v): method=%s, url=%s, error=%v", elapsed, method, fullURL, err) return gerror.Newf("request failed: %v", err) } - g.Log().Infof(ctx, "[RAGFlow HTTP] 收到响应(耗时 %v): status=%d, url=%s", elapsed, resp.StatusCode, fullURL) - defer resp.Body.Close() + defer resp.Close() - // 读取响应 - respBody, err := io.ReadAll(resp.Body) - if err != nil { - return gerror.Newf("read response failed: %v", err) - } - - // 打印响应详情 - g.Log().Debugf(ctx, "[RAGFlow HTTP] 响应: status=%d, body=%s", resp.StatusCode, respBody) + respBody := resp.ReadAll() if resp.StatusCode != http.StatusOK { - g.Log().Errorf(ctx, "[RAGFlow HTTP] 非200响应: status=%d, body=%s", resp.StatusCode, respBody) - return gerror.Newf("http status %d: %s", resp.StatusCode, respBody) + return gerror.Newf("http status %d: %s", resp.StatusCode, string(respBody)) } if err = gjson.DecodeTo(respBody, result); err != nil { - g.Log().Errorf(ctx, "[RAGFlow HTTP] 解析响应失败: body=%s, error=%v", string(respBody), err) return gerror.Newf("unmarshal response failed: %v", err) }