复用http.go里面的 g.client(),更新config.yml的注册方式, 更新jaeger链路追踪
This commit is contained in:
@@ -72,7 +72,7 @@ func doRequest(ctx context.Context, method string, url string, target any, data
|
|||||||
}
|
}
|
||||||
defer func() {
|
defer func() {
|
||||||
if err = response.Close(); err != nil {
|
if err = response.Close(); err != nil {
|
||||||
glog.Errorf(ctx, `%+v`, err)
|
glog.Error(ctx, err)
|
||||||
}
|
}
|
||||||
}()
|
}()
|
||||||
result := response.ReadAll()
|
result := response.ReadAll()
|
||||||
|
|||||||
@@ -12,6 +12,8 @@ import (
|
|||||||
"github.com/gogf/gf/v2/net/ghttp"
|
"github.com/gogf/gf/v2/net/ghttp"
|
||||||
"github.com/gogf/gf/v2/net/gtrace"
|
"github.com/gogf/gf/v2/net/gtrace"
|
||||||
"go.opentelemetry.io/otel/attribute"
|
"go.opentelemetry.io/otel/attribute"
|
||||||
|
"go.opentelemetry.io/otel/codes"
|
||||||
|
"go.opentelemetry.io/otel/trace"
|
||||||
)
|
)
|
||||||
|
|
||||||
var (
|
var (
|
||||||
@@ -47,10 +49,75 @@ func init() {
|
|||||||
// 默认自动初始化(保持向后兼容)
|
// 默认自动初始化(保持向后兼容)
|
||||||
Init()
|
Init()
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// NewSpan 创建新的链路追踪 Span
|
||||||
|
// spanName: Span 名称,用于在 Jaeger UI 中标识
|
||||||
|
// 返回带有 Span 的 context 和 Span 对象,调用方需 defer span.End()
|
||||||
|
func NewSpan(ctx context.Context, spanName string, opts ...trace.SpanStartOption) (context.Context, *gtrace.Span) {
|
||||||
|
return gtrace.NewSpan(ctx, spanName, opts...)
|
||||||
|
}
|
||||||
|
|
||||||
|
// RecordError 统一错误记录方法
|
||||||
|
// 功能:
|
||||||
|
// 1. 控制台输出错误(带完整堆栈 %+v)
|
||||||
|
// 2. Jaeger 链路追踪记录错误
|
||||||
|
// 3. 设置 Span 错误状态
|
||||||
|
//
|
||||||
|
// 使用示例:
|
||||||
|
//
|
||||||
|
// jaeger.RecordError(ctx, err, "保存数据失败")
|
||||||
|
//
|
||||||
|
// 参数:
|
||||||
|
// - ctx: 包含 trace span 的上下文
|
||||||
|
// - err: 错误对象(支持 gerror 堆栈)
|
||||||
|
// - msg: 可选的错误描述(用于日志和 Jaeger 显示)
|
||||||
|
func RecordError(ctx context.Context, err error, msg ...string) {
|
||||||
|
if err == nil {
|
||||||
|
return
|
||||||
|
}
|
||||||
|
// 1. 控制台输出(%+v 打印完整堆栈)
|
||||||
|
if len(msg) > 0 && msg[0] != "" {
|
||||||
|
g.Log().Errorf(ctx, "%s: %+v", msg[0], err)
|
||||||
|
} else {
|
||||||
|
g.Log().Errorf(ctx, "%+v", err)
|
||||||
|
}
|
||||||
|
// 2. Jaeger 记录(从 context 获取当前 span)
|
||||||
|
span := trace.SpanFromContext(ctx)
|
||||||
|
if span == nil || !span.IsRecording() {
|
||||||
|
return
|
||||||
|
}
|
||||||
|
// 3. 记录错误到 span
|
||||||
|
span.RecordError(err)
|
||||||
|
span.SetAttributes(
|
||||||
|
attribute.Bool("error", true),
|
||||||
|
attribute.String("error.message", err.Error()),
|
||||||
|
)
|
||||||
|
// 4. 设置 span 状态为错误
|
||||||
|
if len(msg) > 0 && msg[0] != "" {
|
||||||
|
span.SetAttributes(attribute.String("error.msg", msg[0]))
|
||||||
|
span.SetStatus(codes.Error, msg[0]+": "+err.Error())
|
||||||
|
return
|
||||||
|
}
|
||||||
|
span.SetStatus(codes.Error, err.Error())
|
||||||
|
}
|
||||||
|
|
||||||
|
// NewTracer HTTP 请求链路追踪中间件
|
||||||
|
// 功能:
|
||||||
|
// 1. 为每个 HTTP 请求创建 Span
|
||||||
|
// 2. 记录请求参数和响应内容
|
||||||
|
// 3. 自动捕获错误并记录到 Jaeger
|
||||||
|
//
|
||||||
|
// 使用方式:在路由组中注册为中间件
|
||||||
|
//
|
||||||
|
// group.Middleware(jaeger.NewTracer)
|
||||||
func NewTracer(r *ghttp.Request) {
|
func NewTracer(r *ghttp.Request) {
|
||||||
_, span := gtrace.NewSpan(r.Context(), r.GetServeHandler().GetMetaTag("summary"))
|
// 创建 Span(名称取自 controller 方法的 summary 标签)
|
||||||
|
ctx, span := gtrace.NewSpan(r.Context(), r.GetServeHandler().GetMetaTag("summary"))
|
||||||
|
r.SetCtx(ctx)
|
||||||
defer span.End()
|
defer span.End()
|
||||||
|
// 记录请求参数
|
||||||
span.SetAttributes(attribute.String("request", getParams(r)))
|
span.SetAttributes(attribute.String("request", getParams(r)))
|
||||||
|
// 执行后续中间件和 handler
|
||||||
r.Middleware.Next()
|
r.Middleware.Next()
|
||||||
|
|
||||||
// 清理响应字符串,确保 UTF-8 有效(处理二进制数据如 ZIP 文件)
|
// 清理响应字符串,确保 UTF-8 有效(处理二进制数据如 ZIP 文件)
|
||||||
@@ -63,7 +130,19 @@ func NewTracer(r *ghttp.Request) {
|
|||||||
}
|
}
|
||||||
|
|
||||||
span.SetAttributes(attribute.String("response", cleanResponse))
|
span.SetAttributes(attribute.String("response", cleanResponse))
|
||||||
|
span.SetAttributes(attribute.Int("http.status_code", r.Response.Status))
|
||||||
|
|
||||||
|
if err := r.GetError(); err != nil {
|
||||||
|
RecordError(ctx, err)
|
||||||
|
return
|
||||||
}
|
}
|
||||||
|
if r.Response.Status >= 500 {
|
||||||
|
span.SetAttributes(attribute.Bool("error", true))
|
||||||
|
span.SetStatus(codes.Error, "http status "+strconv.Itoa(r.Response.Status))
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// getParams 提取请求参数(用于 Jaeger 记录)
|
||||||
func getParams(r *ghttp.Request) string {
|
func getParams(r *ghttp.Request) string {
|
||||||
params := map[string]interface{}{}
|
params := map[string]interface{}{}
|
||||||
if r.Method == "POST" {
|
if r.Method == "POST" {
|
||||||
|
|||||||
@@ -2,13 +2,13 @@ package ragflow
|
|||||||
|
|
||||||
import (
|
import (
|
||||||
"context"
|
"context"
|
||||||
"net"
|
|
||||||
"net/http"
|
"net/http"
|
||||||
"net/url"
|
"net/url"
|
||||||
"strings"
|
"strings"
|
||||||
"sync"
|
"sync"
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
|
commonHttp "gitee.com/red-future---jilin-g/common/http"
|
||||||
"github.com/gogf/gf/v2/encoding/gjson"
|
"github.com/gogf/gf/v2/encoding/gjson"
|
||||||
"github.com/gogf/gf/v2/errors/gerror"
|
"github.com/gogf/gf/v2/errors/gerror"
|
||||||
"github.com/gogf/gf/v2/frame/g"
|
"github.com/gogf/gf/v2/frame/g"
|
||||||
@@ -35,34 +35,13 @@ func initClient() {
|
|||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
// 自定义 Transport(增大连接池,设置超时)
|
|
||||||
transport := &http.Transport{
|
|
||||||
Proxy: http.ProxyFromEnvironment,
|
|
||||||
DialContext: (&net.Dialer{
|
|
||||||
Timeout: 30 * time.Second,
|
|
||||||
KeepAlive: 30 * time.Second,
|
|
||||||
}).DialContext,
|
|
||||||
MaxIdleConns: 200, // 最大空闲连接数
|
|
||||||
MaxIdleConnsPerHost: 100, // 每个 host 最大空闲连接数
|
|
||||||
MaxConnsPerHost: 100, // 每个 host 最大连接数
|
|
||||||
IdleConnTimeout: 90 * time.Second, // 空闲连接超时
|
|
||||||
TLSHandshakeTimeout: 10 * time.Second,
|
|
||||||
ExpectContinueTimeout: 1 * time.Second,
|
|
||||||
ResponseHeaderTimeout: 180 * time.Second, // 等待响应头超时
|
|
||||||
}
|
|
||||||
|
|
||||||
// 初始化 gclient(不使用链式调用,避免 Transport 被重置)
|
|
||||||
httpClient := gclient.New()
|
|
||||||
httpClient.Client.Transport = transport
|
|
||||||
httpClient.Client.Timeout = 180 * time.Second
|
|
||||||
|
|
||||||
globalClient = &Client{
|
globalClient = &Client{
|
||||||
BaseURL: strings.TrimSuffix(baseURL, "/"),
|
BaseURL: strings.TrimSuffix(baseURL, "/"),
|
||||||
APIKey: apiKey,
|
APIKey: apiKey,
|
||||||
HTTPClient: httpClient,
|
HTTPClient: commonHttp.Httpclient,
|
||||||
}
|
}
|
||||||
|
|
||||||
g.Log().Infof(ctx, "✅ RAGFlow 客户端初始化成功: baseURL=%s, timeout=180s", baseURL)
|
g.Log().Infof(ctx, "✅ RAGFlow 客户端初始化成功: baseURL=%s", baseURL)
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -100,7 +79,7 @@ func (r *CommonResponse) IsSuccess() bool {
|
|||||||
return r.Code == 0
|
return r.Code == 0
|
||||||
}
|
}
|
||||||
|
|
||||||
// request 发送 HTTP 请求(不使用链式调用)
|
// request 发送 HTTP 请求
|
||||||
func (c *Client) request(ctx context.Context, method, path string, body interface{}, result interface{}) (err error) {
|
func (c *Client) request(ctx context.Context, method, path string, body interface{}, result interface{}) (err error) {
|
||||||
fullURL := c.BaseURL + path
|
fullURL := c.BaseURL + path
|
||||||
|
|
||||||
@@ -114,21 +93,24 @@ func (c *Client) request(ctx context.Context, method, path string, body interfac
|
|||||||
reqBody = string(jsonData)
|
reqBody = string(jsonData)
|
||||||
}
|
}
|
||||||
|
|
||||||
// 设置请求头
|
// 设置请求头和超时
|
||||||
c.HTTPClient.SetHeader("Authorization", "Bearer "+c.APIKey)
|
// 注意:使用 Chain 模式,避免修改全局 Httpclient
|
||||||
c.HTTPClient.SetHeader("Content-Type", "application/json")
|
client := c.HTTPClient.Timeout(180 * time.Second).Header(map[string]string{
|
||||||
|
"Authorization": "Bearer " + c.APIKey,
|
||||||
|
"Content-Type": "application/json",
|
||||||
|
})
|
||||||
|
|
||||||
// 发送请求
|
// 发送请求
|
||||||
var resp *gclient.Response
|
var resp *gclient.Response
|
||||||
switch method {
|
switch method {
|
||||||
case "GET":
|
case "GET":
|
||||||
resp, err = c.HTTPClient.Get(ctx, fullURL)
|
resp, err = client.Get(ctx, fullURL)
|
||||||
case "POST":
|
case "POST":
|
||||||
resp, err = c.HTTPClient.Post(ctx, fullURL, reqBody)
|
resp, err = client.Post(ctx, fullURL, reqBody)
|
||||||
case "PUT":
|
case "PUT":
|
||||||
resp, err = c.HTTPClient.Put(ctx, fullURL, reqBody)
|
resp, err = client.Put(ctx, fullURL, reqBody)
|
||||||
case "DELETE":
|
case "DELETE":
|
||||||
resp, err = c.HTTPClient.Delete(ctx, fullURL, reqBody)
|
resp, err = client.Delete(ctx, fullURL, reqBody)
|
||||||
default:
|
default:
|
||||||
return gerror.Newf("unsupported method: %s", method)
|
return gerror.Newf("unsupported method: %s", method)
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -92,31 +92,13 @@ func NeedES() bool {
|
|||||||
// 如果配置文件中没有 startup 配置,则默认全部启动
|
// 如果配置文件中没有 startup 配置,则默认全部启动
|
||||||
func loadFromConfig() *Components {
|
func loadFromConfig() *Components {
|
||||||
ctx := context.Background()
|
ctx := context.Background()
|
||||||
|
|
||||||
// 检查是否有 startup 配置节
|
|
||||||
startupCfg := g.Cfg().MustGet(ctx, "startup")
|
|
||||||
if startupCfg.IsEmpty() {
|
|
||||||
// 没有配置 startup,默认全部启动
|
|
||||||
glog.Debug(ctx, "未找到 startup 配置,默认启动所有组件")
|
|
||||||
return &Components{
|
return &Components{
|
||||||
Consul: true,
|
Consul: !g.Cfg().MustGet(ctx, "consul").IsEmpty(),
|
||||||
Jaeger: true,
|
Jaeger: !g.Cfg().MustGet(ctx, "jaeger").IsEmpty(),
|
||||||
Redis: true,
|
Redis: !g.Cfg().MustGet(ctx, "redis").IsEmpty(),
|
||||||
RabbitMQ: true,
|
RabbitMQ: !g.Cfg().MustGet(ctx, "rabbitmq").IsEmpty(),
|
||||||
MongoDB: true,
|
MongoDB: !g.Cfg().MustGet(ctx, "mongo").IsEmpty(),
|
||||||
RAGFlow: true,
|
RAGFlow: !g.Cfg().MustGet(ctx, "ragflow").IsEmpty(),
|
||||||
ES: true,
|
ES: !g.Cfg().MustGet(ctx, "elasticsearch").IsEmpty(),
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
// 有配置则按配置来,未配置的项默认 true
|
|
||||||
return &Components{
|
|
||||||
Consul: g.Cfg().MustGet(ctx, "startup.consul", true).Bool(),
|
|
||||||
Jaeger: g.Cfg().MustGet(ctx, "startup.jaeger", true).Bool(),
|
|
||||||
Redis: g.Cfg().MustGet(ctx, "startup.redis", true).Bool(),
|
|
||||||
RabbitMQ: g.Cfg().MustGet(ctx, "startup.rabbitmq", true).Bool(),
|
|
||||||
MongoDB: g.Cfg().MustGet(ctx, "startup.mongodb", true).Bool(),
|
|
||||||
RAGFlow: g.Cfg().MustGet(ctx, "startup.ragflow", true).Bool(),
|
|
||||||
ES: g.Cfg().MustGet(ctx, "startup.es", true).Bool(),
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|||||||
Reference in New Issue
Block a user