diff --git a/http/http.go b/http/http.go index 0023e64..b76621d 100644 --- a/http/http.go +++ b/http/http.go @@ -72,7 +72,7 @@ func doRequest(ctx context.Context, method string, url string, target any, data } defer func() { if err = response.Close(); err != nil { - glog.Errorf(ctx, `%+v`, err) + glog.Error(ctx, err) } }() result := response.ReadAll() diff --git a/jaeger/jaeger.go b/jaeger/jaeger.go index 9acc586..0132dd3 100644 --- a/jaeger/jaeger.go +++ b/jaeger/jaeger.go @@ -12,6 +12,8 @@ import ( "github.com/gogf/gf/v2/net/ghttp" "github.com/gogf/gf/v2/net/gtrace" "go.opentelemetry.io/otel/attribute" + "go.opentelemetry.io/otel/codes" + "go.opentelemetry.io/otel/trace" ) var ( @@ -47,10 +49,75 @@ func init() { // 默认自动初始化(保持向后兼容) Init() } + +// NewSpan 创建新的链路追踪 Span +// spanName: Span 名称,用于在 Jaeger UI 中标识 +// 返回带有 Span 的 context 和 Span 对象,调用方需 defer span.End() +func NewSpan(ctx context.Context, spanName string, opts ...trace.SpanStartOption) (context.Context, *gtrace.Span) { + return gtrace.NewSpan(ctx, spanName, opts...) +} + +// RecordError 统一错误记录方法 +// 功能: +// 1. 控制台输出错误(带完整堆栈 %+v) +// 2. Jaeger 链路追踪记录错误 +// 3. 设置 Span 错误状态 +// +// 使用示例: +// +// jaeger.RecordError(ctx, err, "保存数据失败") +// +// 参数: +// - ctx: 包含 trace span 的上下文 +// - err: 错误对象(支持 gerror 堆栈) +// - msg: 可选的错误描述(用于日志和 Jaeger 显示) +func RecordError(ctx context.Context, err error, msg ...string) { + if err == nil { + return + } + // 1. 控制台输出(%+v 打印完整堆栈) + if len(msg) > 0 && msg[0] != "" { + g.Log().Errorf(ctx, "%s: %+v", msg[0], err) + } else { + g.Log().Errorf(ctx, "%+v", err) + } + // 2. Jaeger 记录(从 context 获取当前 span) + span := trace.SpanFromContext(ctx) + if span == nil || !span.IsRecording() { + return + } + // 3. 记录错误到 span + span.RecordError(err) + span.SetAttributes( + attribute.Bool("error", true), + attribute.String("error.message", err.Error()), + ) + // 4. 设置 span 状态为错误 + if len(msg) > 0 && msg[0] != "" { + span.SetAttributes(attribute.String("error.msg", msg[0])) + span.SetStatus(codes.Error, msg[0]+": "+err.Error()) + return + } + span.SetStatus(codes.Error, err.Error()) +} + +// NewTracer HTTP 请求链路追踪中间件 +// 功能: +// 1. 为每个 HTTP 请求创建 Span +// 2. 记录请求参数和响应内容 +// 3. 自动捕获错误并记录到 Jaeger +// +// 使用方式:在路由组中注册为中间件 +// +// group.Middleware(jaeger.NewTracer) func NewTracer(r *ghttp.Request) { - _, span := gtrace.NewSpan(r.Context(), r.GetServeHandler().GetMetaTag("summary")) + // 创建 Span(名称取自 controller 方法的 summary 标签) + ctx, span := gtrace.NewSpan(r.Context(), r.GetServeHandler().GetMetaTag("summary")) + r.SetCtx(ctx) defer span.End() + // 记录请求参数 span.SetAttributes(attribute.String("request", getParams(r))) + // 执行后续中间件和 handler r.Middleware.Next() // 清理响应字符串,确保 UTF-8 有效(处理二进制数据如 ZIP 文件) @@ -63,7 +130,19 @@ func NewTracer(r *ghttp.Request) { } span.SetAttributes(attribute.String("response", cleanResponse)) + span.SetAttributes(attribute.Int("http.status_code", r.Response.Status)) + + if err := r.GetError(); err != nil { + RecordError(ctx, err) + return + } + if r.Response.Status >= 500 { + span.SetAttributes(attribute.Bool("error", true)) + span.SetStatus(codes.Error, "http status "+strconv.Itoa(r.Response.Status)) + } } + +// getParams 提取请求参数(用于 Jaeger 记录) func getParams(r *ghttp.Request) string { params := map[string]interface{}{} if r.Method == "POST" { diff --git a/ragflow/client.go b/ragflow/client.go index f6e450b..7dab00a 100644 --- a/ragflow/client.go +++ b/ragflow/client.go @@ -2,13 +2,13 @@ package ragflow import ( "context" - "net" "net/http" "net/url" "strings" "sync" "time" + commonHttp "gitee.com/red-future---jilin-g/common/http" "github.com/gogf/gf/v2/encoding/gjson" "github.com/gogf/gf/v2/errors/gerror" "github.com/gogf/gf/v2/frame/g" @@ -35,34 +35,13 @@ func initClient() { return } - // 自定义 Transport(增大连接池,设置超时) - transport := &http.Transport{ - Proxy: http.ProxyFromEnvironment, - DialContext: (&net.Dialer{ - Timeout: 30 * time.Second, - KeepAlive: 30 * time.Second, - }).DialContext, - MaxIdleConns: 200, // 最大空闲连接数 - MaxIdleConnsPerHost: 100, // 每个 host 最大空闲连接数 - MaxConnsPerHost: 100, // 每个 host 最大连接数 - IdleConnTimeout: 90 * time.Second, // 空闲连接超时 - TLSHandshakeTimeout: 10 * time.Second, - ExpectContinueTimeout: 1 * time.Second, - ResponseHeaderTimeout: 180 * time.Second, // 等待响应头超时 - } - - // 初始化 gclient(不使用链式调用,避免 Transport 被重置) - httpClient := gclient.New() - httpClient.Client.Transport = transport - httpClient.Client.Timeout = 180 * time.Second - globalClient = &Client{ BaseURL: strings.TrimSuffix(baseURL, "/"), APIKey: apiKey, - HTTPClient: httpClient, + HTTPClient: commonHttp.Httpclient, } - g.Log().Infof(ctx, "✅ RAGFlow 客户端初始化成功: baseURL=%s, timeout=180s", baseURL) + g.Log().Infof(ctx, "✅ RAGFlow 客户端初始化成功: baseURL=%s", baseURL) }) } @@ -100,7 +79,7 @@ func (r *CommonResponse) IsSuccess() bool { return r.Code == 0 } -// request 发送 HTTP 请求(不使用链式调用) +// request 发送 HTTP 请求 func (c *Client) request(ctx context.Context, method, path string, body interface{}, result interface{}) (err error) { fullURL := c.BaseURL + path @@ -114,21 +93,24 @@ func (c *Client) request(ctx context.Context, method, path string, body interfac reqBody = string(jsonData) } - // 设置请求头 - c.HTTPClient.SetHeader("Authorization", "Bearer "+c.APIKey) - c.HTTPClient.SetHeader("Content-Type", "application/json") + // 设置请求头和超时 + // 注意:使用 Chain 模式,避免修改全局 Httpclient + client := c.HTTPClient.Timeout(180 * time.Second).Header(map[string]string{ + "Authorization": "Bearer " + c.APIKey, + "Content-Type": "application/json", + }) // 发送请求 var resp *gclient.Response switch method { case "GET": - resp, err = c.HTTPClient.Get(ctx, fullURL) + resp, err = client.Get(ctx, fullURL) case "POST": - resp, err = c.HTTPClient.Post(ctx, fullURL, reqBody) + resp, err = client.Post(ctx, fullURL, reqBody) case "PUT": - resp, err = c.HTTPClient.Put(ctx, fullURL, reqBody) + resp, err = client.Put(ctx, fullURL, reqBody) case "DELETE": - resp, err = c.HTTPClient.Delete(ctx, fullURL, reqBody) + resp, err = client.Delete(ctx, fullURL, reqBody) default: return gerror.Newf("unsupported method: %s", method) } diff --git a/startup/startup.go b/startup/startup.go index 2fc5882..d927e43 100644 --- a/startup/startup.go +++ b/startup/startup.go @@ -92,31 +92,13 @@ func NeedES() bool { // 如果配置文件中没有 startup 配置,则默认全部启动 func loadFromConfig() *Components { ctx := context.Background() - - // 检查是否有 startup 配置节 - startupCfg := g.Cfg().MustGet(ctx, "startup") - if startupCfg.IsEmpty() { - // 没有配置 startup,默认全部启动 - glog.Debug(ctx, "未找到 startup 配置,默认启动所有组件") - return &Components{ - Consul: true, - Jaeger: true, - Redis: true, - RabbitMQ: true, - MongoDB: true, - RAGFlow: true, - ES: true, - } - } - - // 有配置则按配置来,未配置的项默认 true return &Components{ - Consul: g.Cfg().MustGet(ctx, "startup.consul", true).Bool(), - Jaeger: g.Cfg().MustGet(ctx, "startup.jaeger", true).Bool(), - Redis: g.Cfg().MustGet(ctx, "startup.redis", true).Bool(), - RabbitMQ: g.Cfg().MustGet(ctx, "startup.rabbitmq", true).Bool(), - MongoDB: g.Cfg().MustGet(ctx, "startup.mongodb", true).Bool(), - RAGFlow: g.Cfg().MustGet(ctx, "startup.ragflow", true).Bool(), - ES: g.Cfg().MustGet(ctx, "startup.es", true).Bool(), + Consul: !g.Cfg().MustGet(ctx, "consul").IsEmpty(), + Jaeger: !g.Cfg().MustGet(ctx, "jaeger").IsEmpty(), + Redis: !g.Cfg().MustGet(ctx, "redis").IsEmpty(), + RabbitMQ: !g.Cfg().MustGet(ctx, "rabbitmq").IsEmpty(), + MongoDB: !g.Cfg().MustGet(ctx, "mongo").IsEmpty(), + RAGFlow: !g.Cfg().MustGet(ctx, "ragflow").IsEmpty(), + ES: !g.Cfg().MustGet(ctx, "elasticsearch").IsEmpty(), } }