Files
common/jaeger/jaeger.go
2026-06-23 18:17:41 +08:00

214 lines
6.0 KiB
Go
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
package jaeger
import (
	"context"
	"encoding/json"
	"strconv"
	"strings"
	"sync"
	"time"

	"github.com/gogf/gf/v2/frame/g"
	"github.com/gogf/gf/v2/net/ghttp"
	"github.com/gogf/gf/v2/net/gtrace"
	"go.opentelemetry.io/otel"
	"go.opentelemetry.io/otel/attribute"
	"go.opentelemetry.io/otel/codes"
	"go.opentelemetry.io/otel/exporters/otlp/otlptrace"
	"go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracehttp"
	"go.opentelemetry.io/otel/propagation"
	"go.opentelemetry.io/otel/sdk/resource"
	sdktrace "go.opentelemetry.io/otel/sdk/trace"
	semconv "go.opentelemetry.io/otel/semconv/v1.24.0"
	"go.opentelemetry.io/otel/trace"
)
var (
	// initOnce guards the body of Init so the tracing setup runs only once,
	// no matter how many times Init is called.
	initOnce sync.Once

	// ShutDown stops the tracing pipeline. It is nil until Init has run;
	// Init assigns either a real shutdown routine or a no-op when tracing
	// could not be (or was not) set up.
	ShutDown func(ctx context.Context)
)
// Init sets up OpenTelemetry tracing and exports spans to the Jaeger endpoint
// over OTLP/HTTP.
//
// The setup body is wrapped in initOnce, so it executes at most once; later
// calls are no-ops. The endpoint is read from the "jaeger.addr" config key and
// the service name from "server.name". When the endpoint is empty, or when the
// exporter or resource cannot be created, tracing is skipped and ShutDown is
// left as a no-op so callers can always invoke it safely.
//
// On success it installs a global TracerProvider and a composite
// TraceContext + Baggage propagator, and points ShutDown at a function that
// shuts the provider down with a bounded timeout.
func Init() {
	initOnce.Do(func() {
		ctx := context.Background()

		// Start from a no-op so every early return below leaves ShutDown callable.
		ShutDown = func(context.Context) {}

		jaegerAgent := g.Cfg().MustGet(ctx, "jaeger.addr").String()
		serverName := g.Cfg().MustGet(ctx, "server.name").String()
		if jaegerAgent == "" {
			g.Log().Warning(ctx, "⚠️ Jaeger 配置未找到,跳过初始化")
			return
		}

		traceExp, err := otlptrace.New(ctx, otlptracehttp.NewClient(
			otlptracehttp.WithEndpoint(jaegerAgent),
			otlptracehttp.WithURLPath("/v1/traces"),
			otlptracehttp.WithInsecure(),
			otlptracehttp.WithCompression(otlptracehttp.GzipCompression),
		))
		if err != nil {
			g.Log().Errorf(ctx, "OTLP exporter 创建失败: %v", err)
			return
		}

		res, err := resource.New(ctx,
			resource.WithFromEnv(),
			resource.WithProcess(),
			resource.WithTelemetrySDK(),
			resource.WithHost(),
			resource.WithAttributes(
				semconv.ServiceNameKey.String(serverName),
			),
		)
		if err != nil {
			g.Log().Errorf(ctx, "Resource 创建失败: %v", err)
			// The exporter was already created; release it (best effort)
			// because no TracerProvider will take ownership of it.
			_ = traceExp.Shutdown(ctx)
			return
		}

		// Build the TracerProvider around an allowlist span processor: only
		// spans from the ghttp.Server and gtrace instrumentation scopes are
		// forwarded to the OTLP exporter. Spans from other scopes (gdb,
		// gredis, ghttp.Client, ...) are dropped in the export pipeline.
		tp := sdktrace.NewTracerProvider(
			sdktrace.WithSampler(sdktrace.AlwaysSample()),
			sdktrace.WithResource(res),
			sdktrace.WithSpanProcessor(&allowlistProcessor{
				next: sdktrace.NewBatchSpanProcessor(traceExp),
				allowed: map[string]bool{
					"github.com/gogf/gf/v2/net/ghttp.Server": true,
					"github.com/gogf/gf/v2/net/gtrace":       true,
				},
			}),
		)
		otel.SetTracerProvider(tp)
		otel.SetTextMapPropagator(propagation.NewCompositeTextMapPropagator(
			propagation.TraceContext{},
			propagation.Baggage{},
		))

		// context.WithTimeout takes a time.Duration, which counts nanoseconds.
		// The seconds value must be scaled by time.Second, otherwise the
		// deadline expires immediately and the batch processor cannot flush.
		const shutdownTimeoutSec = 1
		ShutDown = func(ctx context.Context) {
			shutdownCtx, cancel := context.WithTimeout(ctx, shutdownTimeoutSec*time.Second)
			defer cancel()
			// err is scoped to the if statement so it does not clash with
			// the err used during initialisation above.
			if err := tp.Shutdown(shutdownCtx); err != nil {
				g.Log().Errorf(ctx, "Shutdown tracerProvider failed err:%+v", err)
				return
			}
			g.Log().Debug(ctx, "Shutdown tracerProvider success")
		}

		g.Log().Infof(ctx, "✅ Jaeger 初始化成功: %s仅 HTTP Server 链路)", jaegerAgent)
	})
}
// init runs the tracing setup when the package is loaded, so importing the
// package is enough to enable tracing.
func init() { Init() }
// allowlistProcessor is a span processor wrapper that forwards finished spans
// to the wrapped processor only when their instrumentation scope name is
// allowlisted.
type allowlistProcessor struct {
	// allowed is keyed by instrumentation scope name; a true value lets the
	// span through.
	allowed map[string]bool
	// next is the wrapped processor that receives the spans that pass.
	next sdktrace.SpanProcessor
}
// OnStart passes every span start to the wrapped processor unchanged;
// filtering only happens in OnEnd.
func (p *allowlistProcessor) OnStart(parentCtx context.Context, span sdktrace.ReadWriteSpan) {
	p.next.OnStart(parentCtx, span)
}
// OnEnd forwards a finished span to the wrapped processor when its
// instrumentation scope name is allowlisted, and drops it otherwise.
func (p *allowlistProcessor) OnEnd(s sdktrace.ReadOnlySpan) {
	scopeName := s.InstrumentationScope().Name
	if !p.allowed[scopeName] {
		// Not allowlisted (gdb, gredis, ghttp.Client, ...): drop the span.
		return
	}
	p.next.OnEnd(s)
}
// Shutdown delegates to the wrapped processor and returns its error.
func (p *allowlistProcessor) Shutdown(ctx context.Context) error {
	return p.next.Shutdown(ctx)
}
// ForceFlush delegates to the wrapped processor and returns its error.
func (p *allowlistProcessor) ForceFlush(ctx context.Context) error {
	return p.next.ForceFlush(ctx)
}
// NewSpan starts a span named spanName on the gtrace tracer and returns the
// derived context together with the span wrapped as a *gtrace.Span. Any opts
// are passed through to the tracer's Start call.
func NewSpan(ctx context.Context, spanName string, opts ...trace.SpanStartOption) (context.Context, *gtrace.Span) {
	tracer := otel.Tracer("github.com/gogf/gf/v2/net/gtrace")
	spanCtx, started := tracer.Start(ctx, spanName, opts...)
	return spanCtx, &gtrace.Span{Span: started}
}
// RecordError is the shared error-reporting helper: it logs err and, when the
// span carried by ctx is recording, marks that span as failed.
//
// A nil err is ignored. Only the first element of msg is used, and only when
// it is non-empty; it prefixes the log line and the span status and is stored
// in the "error.msg" attribute.
func RecordError(ctx context.Context, err error, msg ...string) {
	if err == nil {
		return
	}

	// Resolve the optional message once; an empty string means "no message".
	note := ""
	if len(msg) > 0 {
		note = msg[0]
	}
	hasNote := note != ""

	if hasNote {
		g.Log().Errorf(ctx, "%s: %+v", note, err)
	} else {
		g.Log().Errorf(ctx, "%+v", err)
	}

	span := trace.SpanFromContext(ctx)
	if span == nil || !span.IsRecording() {
		return
	}

	span.RecordError(err)
	span.SetAttributes(
		attribute.Bool("error", true),
		attribute.String("error.message", err.Error()),
	)

	if !hasNote {
		span.SetStatus(codes.Error, err.Error())
		return
	}
	span.SetAttributes(attribute.String("error.msg", note))
	span.SetStatus(codes.Error, note+": "+err.Error())
}
// NewTracer is an HTTP middleware that wraps the rest of the handler chain in
// a tracing span.
//
// The span is named after the "summary" meta tag of the serving handler. The
// request parameters are attached before the chain runs; the response body
// (sanitised to valid UTF-8 and capped in size) and the HTTP status code are
// attached afterwards. A request error is reported through RecordError;
// otherwise a status of 500 or above marks the span as failed.
func NewTracer(r *ghttp.Request) {
	// Upper bound, in bytes, on the response text copied into the span.
	const maxResponseBytes = 1000

	ctx, span := NewSpan(r.Context(), r.GetServeHandler().GetMetaTag("summary"))
	r.SetCtx(ctx)
	defer span.End()

	span.SetAttributes(attribute.String("request", getParams(r)))

	r.Middleware.Next()

	cleanResponse := strings.ToValidUTF8(r.Response.BufferString(), "")
	if len(cleanResponse) > maxResponseBytes {
		// A fixed byte offset can land in the middle of a multi-byte rune.
		// Sanitising the prefix again drops such a dangling fragment, so the
		// attribute stays valid UTF-8 after truncation as well.
		cleanResponse = strings.ToValidUTF8(cleanResponse[:maxResponseBytes], "") + "... (truncated)"
	}
	span.SetAttributes(
		attribute.String("response", cleanResponse),
		attribute.Int("http.status_code", r.Response.Status),
	)

	if err := r.GetError(); err != nil {
		RecordError(ctx, err)
		return
	}
	if r.Response.Status >= 500 {
		span.SetAttributes(attribute.Bool("error", true))
		span.SetStatus(codes.Error, "http status "+strconv.Itoa(r.Response.Status))
	}
}
// getParams renders the request parameters as a JSON string for the trace.
//
// For POST the body is decoded as JSON into the parameter map. For GET the
// first value of every form field is taken, stored as an int when it parses
// as one and as a string otherwise. Other methods yield an empty object.
// Decoding and encoding are best effort: their errors are deliberately
// discarded so that tracing never interferes with request handling.
func getParams(r *ghttp.Request) string {
	collected := map[string]interface{}{}

	switch r.Method {
	case "POST":
		// Best effort: on a decode error the map keeps whatever was decoded.
		_ = json.Unmarshal(r.GetBody(), &collected)
	case "GET":
		// Best effort: on a parse error the form holds whatever was parsed.
		_ = r.Request.ParseForm()
		for key, values := range r.Form {
			first := values[0]
			if number, convErr := strconv.Atoi(first); convErr == nil {
				collected[key] = number
				continue
			}
			collected[key] = first
		}
	}

	// On a marshal error the result is the empty string.
	encoded, _ := json.Marshal(&collected)
	return string(encoded)
}