diff --git a/rpc/rpcx.go b/rpc/rpcx.go new file mode 100644 index 0000000..226a8df --- /dev/null +++ b/rpc/rpcx.go @@ -0,0 +1,159 @@ +package rpc + +import ( + "context" + "encoding/json" + "errors" + "time" + + "gitea.com/red-future/common/consul" + "gitea.com/red-future/common/jaeger" + "github.com/gogf/gf/v2/frame/g" + rpcxClient "github.com/smallnest/rpcx/client" + "go.opentelemetry.io/otel/attribute" + "go.opentelemetry.io/otel/codes" + trace "go.opentelemetry.io/otel/trace" +) + +var ( + // pluginsContainer rpcx插件容器(全局统一设置) + // init()中添加链路追踪插件,所有client共用此容器 + pluginsContainer = rpcxClient.NewPluginContainer() +) + +func init() { + // 全局设置链路追踪插件,所有client共用 + pluginsContainer.Add(&TracingPlugin{}) +} + +// NewXClient 创建rpcx客户端 +// serviceName: 服务名称 +// 通过consul发现服务实例,并返回rpcx客户端 +func newXClient(ctx context.Context, serviceName string) (*rpcxClient.OneClient, error) { + if g.IsEmpty(serviceName) { + return nil, errors.New("服务名称不能为空") + } + + // 使用consul.GetInstanceAddr获取服务实例地址 + // 每次都重新获取,确保使用最新的服务地址(支持服务重启、迁移等场景) + addr, err := consul.GetInstanceAddr(ctx, serviceName) + if err != nil { + g.Log().Errorf(ctx, "从consul获取服务[%s]地址失败: %v", serviceName, err) + return nil, err + } + + g.Log().Debugf(ctx, "服务[%s]地址: %s", serviceName, addr) + + // 使用Peer2PeerDiscovery直接连接指定服务 + discovery, err := rpcxClient.NewPeer2PeerDiscovery("tcp@"+addr, "") + if err != nil { + g.Log().Errorf(ctx, "创建服务发现失败: %v", err) + return nil, err + } + + // 使用OneClient,因为是单点连接 + client := rpcxClient.NewOneClient( + rpcxClient.Failtry, + rpcxClient.RandomSelect, + discovery, + rpcxClient.DefaultOption, + ) + + // 设置插件(使用全局统一的pluginsContainer) + client.SetPlugins(pluginsContainer) + + g.Log().Infof(ctx, "rpcx客户端[%s]创建成功", serviceName) + + return client, nil +} + +// Call 调用rpcx服务方法 +// serviceName: 服务名称 +// serviceMethod: 服务方法 +// args: 请求参数 +// reply: 响应结果 +func Call(ctx context.Context, serviceName, serviceMethod string, args interface{}, reply interface{}) error { + client, err := newXClient(ctx, serviceName) + if err != nil { + g.Log().Errorf(ctx, "获取rpcx客户端失败: %v", err) + return err + } + defer client.Close() + + // 设置超时 + callCtx, cancel := context.WithTimeout(ctx, 30*time.Second) + defer cancel() + + err = client.Call(callCtx, serviceName, serviceMethod, args, reply) + if err != nil { + g.Log().Errorf(ctx, "调用服务[%s]方法[%s]失败: %v", serviceName, serviceMethod, err) + return err + } + + return nil +} + +// TracingPlugin rpcx链路追踪插件 +// 实现 rpcx 的 PreCallPlugin 和 PostCallPlugin 接口 +type TracingPlugin struct{} + +// PreCall 调用前拦截 - 创建jaeger span +func (p *TracingPlugin) PreCall(ctx context.Context, serviceName, serviceMethod string, args interface{}) (err error) { + // 创建span,名称格式: ServiceName.Method + spanName := serviceName + "." + serviceMethod + ctx, span := jaeger.NewSpan(ctx, spanName) + + // 记录服务和方法信息 + span.SetAttributes( + attribute.String("rpc.service", serviceName), + attribute.String("rpc.method", serviceMethod), + attribute.String("rpc.system", "rpcx"), + ) + var data []byte + // 记录请求参数(序列化为JSON) + if args != nil { + if data, err = json.Marshal(args); err == nil { + argsStr := string(data) + // 限制长度,避免过大 + if len(argsStr) > 2000 { + argsStr = argsStr[:2000] + "... (truncated)" + } + span.SetAttributes(attribute.String("rpc.request", argsStr)) + } + } + + g.Log().Debugf(ctx, "[rpcx] 调用开始: %s.%s", serviceName, serviceMethod) + + return +} + +// PostCall 调用后拦截 - 记录结果和错误 +func (p *TracingPlugin) PostCall(ctx context.Context, serviceName, serviceMethod string, args interface{}, reply interface{}, err error) error { + span := trace.SpanFromContext(ctx) + if span != nil && span.IsRecording() { + defer span.End() + + // 记录响应结果 + if reply != nil { + if data, err := json.Marshal(reply); err == nil { + replyStr := string(data) + // 限制长度,避免过大 + if len(replyStr) > 2000 { + replyStr = replyStr[:2000] + "... (truncated)" + } + span.SetAttributes(attribute.String("rpc.response", replyStr)) + } + } + + // 处理错误 + if err != nil { + jaeger.RecordError(ctx, err, "rpcx调用失败") + span.SetStatus(codes.Error, err.Error()) + g.Log().Errorf(ctx, "[rpcx] 调用失败: %s.%s, 错误: %v", serviceName, serviceMethod, err) + } else { + g.Log().Debugf(ctx, "[rpcx] 调用成功: %s.%s", serviceName, serviceMethod) + } + } + + return nil +}