package rpc import ( "context" "encoding/json" "errors" "time" "gitea.com/red-future/common/consul" "gitea.com/red-future/common/jaeger" "github.com/gogf/gf/v2/frame/g" rpcxClient "github.com/smallnest/rpcx/client" "go.opentelemetry.io/otel/attribute" "go.opentelemetry.io/otel/codes" trace "go.opentelemetry.io/otel/trace" ) var ( // pluginsContainer rpcx插件容器(全局统一设置) // init()中添加链路追踪插件,所有client共用此容器 pluginsContainer = rpcxClient.NewPluginContainer() ) func init() { // 全局设置链路追踪插件,所有client共用 pluginsContainer.Add(&TracingPlugin{}) } // NewXClient 创建rpcx客户端 // serviceName: 服务名称 // 通过consul发现服务实例,并返回rpcx客户端 func newXClient(ctx context.Context, serviceName string) (*rpcxClient.OneClient, error) { if g.IsEmpty(serviceName) { return nil, errors.New("服务名称不能为空") } // 使用consul.GetInstanceAddr获取服务实例地址 // 每次都重新获取,确保使用最新的服务地址(支持服务重启、迁移等场景) addr, err := consul.GetInstanceAddr(ctx, serviceName) if err != nil { g.Log().Errorf(ctx, "从consul获取服务[%s]地址失败: %v", serviceName, err) return nil, err } g.Log().Debugf(ctx, "服务[%s]地址: %s", serviceName, addr) // 使用Peer2PeerDiscovery直接连接指定服务 discovery, err := rpcxClient.NewPeer2PeerDiscovery("tcp@"+addr, "") if err != nil { g.Log().Errorf(ctx, "创建服务发现失败: %v", err) return nil, err } // 使用OneClient,因为是单点连接 client := rpcxClient.NewOneClient( rpcxClient.Failtry, rpcxClient.RandomSelect, discovery, rpcxClient.DefaultOption, ) // 设置插件(使用全局统一的pluginsContainer) client.SetPlugins(pluginsContainer) g.Log().Infof(ctx, "rpcx客户端[%s]创建成功", serviceName) return client, nil } // Call 调用rpcx服务方法 // serviceName: 服务名称 // serviceMethod: 服务方法 // args: 请求参数 // reply: 响应结果 func Call(ctx context.Context, serviceName, serviceMethod string, args interface{}, reply interface{}) error { client, err := newXClient(ctx, serviceName) if err != nil { g.Log().Errorf(ctx, "获取rpcx客户端失败: %v", err) return err } defer client.Close() // 设置超时 callCtx, cancel := context.WithTimeout(ctx, 30*time.Second) defer cancel() err = client.Call(callCtx, serviceName, serviceMethod, args, reply) if err != nil { g.Log().Errorf(ctx, "调用服务[%s]方法[%s]失败: %v", serviceName, serviceMethod, err) return err } return nil } // TracingPlugin rpcx链路追踪插件 // 实现 rpcx 的 PreCallPlugin 和 PostCallPlugin 接口 type TracingPlugin struct{} // PreCall 调用前拦截 - 创建jaeger span func (p *TracingPlugin) PreCall(ctx context.Context, serviceName, serviceMethod string, args interface{}) (err error) { // 创建span,名称格式: ServiceName.Method spanName := serviceName + "." + serviceMethod ctx, span := jaeger.NewSpan(ctx, spanName) // 记录服务和方法信息 span.SetAttributes( attribute.String("rpc.service", serviceName), attribute.String("rpc.method", serviceMethod), attribute.String("rpc.system", "rpcx"), ) var data []byte // 记录请求参数(序列化为JSON) if args != nil { if data, err = json.Marshal(args); err == nil { argsStr := string(data) // 限制长度,避免过大 if len(argsStr) > 2000 { argsStr = argsStr[:2000] + "... (truncated)" } span.SetAttributes(attribute.String("rpc.request", argsStr)) } } g.Log().Debugf(ctx, "[rpcx] 调用开始: %s.%s", serviceName, serviceMethod) return } // PostCall 调用后拦截 - 记录结果和错误 func (p *TracingPlugin) PostCall(ctx context.Context, serviceName, serviceMethod string, args interface{}, reply interface{}, err error) error { span := trace.SpanFromContext(ctx) if span != nil && span.IsRecording() { defer span.End() // 记录响应结果 if reply != nil { if data, err := json.Marshal(reply); err == nil { replyStr := string(data) // 限制长度,避免过大 if len(replyStr) > 2000 { replyStr = replyStr[:2000] + "... (truncated)" } span.SetAttributes(attribute.String("rpc.response", replyStr)) } } // 处理错误 if err != nil { jaeger.RecordError(ctx, err, "rpcx调用失败") span.SetStatus(codes.Error, err.Error()) g.Log().Errorf(ctx, "[rpcx] 调用失败: %s.%s, 错误: %v", serviceName, serviceMethod, err) } else { g.Log().Debugf(ctx, "[rpcx] 调用成功: %s.%s", serviceName, serviceMethod) } } return nil }