Files
common/rpc/rpcx.go
2026-03-12 08:51:59 +08:00

276 lines
7.9 KiB
Go
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
package rpc
import (
"context"
"encoding/json"
"errors"
"sync"
"time"
"gitea.com/red-future/common/consul"
"gitea.com/red-future/common/jaeger"
"github.com/gogf/gf/v2/frame/g"
rpcxClient "github.com/smallnest/rpcx/client"
"go.opentelemetry.io/otel/attribute"
"go.opentelemetry.io/otel/codes"
"go.opentelemetry.io/otel/trace"
)
var (
// pluginsContainer is the rpcx plugin container shared by every client
// created in this package; init() registers the tracing plugin on it.
pluginsContainer = rpcxClient.NewPluginContainer()
// clientPool caches one rpcx client per service, keyed by service name.
clientPool = make(map[string]*rpcxClient.OneClient)
// poolMutex guards clientPool and lastHealthCheckTime.
poolMutex sync.RWMutex
// healthCheckInterval is the period of the background health-check loop,
// in seconds.
healthCheckInterval = 30
// lastHealthCheckTime records, per service name, when the entry was last
// visited by the health-check loop (or when its client was created).
lastHealthCheckTime = make(map[string]time.Time)
)
func init() {
// 全局设置链路追踪插件所有client共用
pluginsContainer.Add(&TracingPlugin{})
// 启动后台健康检查协程
go healthCheckLoop()
}
// healthCheckLoop runs checkAllConnections once per healthCheckInterval
// seconds. It never returns; it is meant to be started as a goroutine.
func healthCheckLoop() {
	period := time.Duration(healthCheckInterval) * time.Second
	ticker := time.NewTicker(period)
	defer ticker.Stop()

	for {
		<-ticker.C
		checkAllConnections()
	}
}
// checkAllConnections walks every pooled service and refreshes its
// last-checked timestamp when the previous check is at least
// healthCheckInterval seconds old (or was never recorded).
//
// No connection is actually probed or closed here: the pass only updates the
// bookkeeping and emits a debug log line per service it visits.
func checkAllConnections() {
	poolMutex.Lock()
	defer poolMutex.Unlock()

	interval := time.Duration(healthCheckInterval) * time.Second
	now := time.Now()

	for name := range clientPool {
		last, seen := lastHealthCheckTime[name]
		if seen && now.Sub(last) < interval {
			// Checked recently enough; leave the timestamp alone.
			continue
		}

		lastHealthCheckTime[name] = now
		g.Log().Debugf(context.Background(), "服务[%s]连接健康检查完成", name)
	}
}
// isClientHealthy reports whether client can be handed out from the pool.
// The check is deliberately minimal: any non-nil client counts as healthy and
// no network probe is performed.
func isClientHealthy(client *rpcxClient.OneClient) bool {
	if client == nil {
		return false
	}
	return true
}
// getOrCreateClient returns the pooled rpcx client for serviceName, building
// and caching a new one when the pool holds no usable entry.
//
// A read-locked lookup serves the common case. On a miss the write lock is
// taken and held for the rest of the call: the pool is consulted once more,
// then the service address is resolved through consul and a peer-to-peer
// client is constructed with the shared plugin container attached.
//
// An error is returned when serviceName is empty, when the consul lookup
// fails, or when the discovery object cannot be created.
func getOrCreateClient(ctx context.Context, serviceName string) (*rpcxClient.OneClient, error) {
	if g.IsEmpty(serviceName) {
		return nil, errors.New("服务名称不能为空")
	}

	// Fast path: shared lock only.
	poolMutex.RLock()
	cached, found := clientPool[serviceName]
	poolMutex.RUnlock()
	if found && isClientHealthy(cached) {
		g.Log().Debugf(ctx, "从连接池获取rpcx客户端[%s]", serviceName)
		return cached, nil
	}

	// Slow path: exclusive lock until the function returns.
	poolMutex.Lock()
	defer poolMutex.Unlock()

	// Another goroutine may have filled the slot between the two locks.
	previous, hadPrevious := clientPool[serviceName]
	if hadPrevious && isClientHealthy(previous) {
		return previous, nil
	}

	// Resolve the current instance address for the service.
	addr, err := consul.GetInstanceAddr(ctx, serviceName)
	if err != nil {
		g.Log().Errorf(ctx, "从consul获取服务[%s]地址失败: %v", serviceName, err)
		return nil, err
	}
	g.Log().Debugf(ctx, "服务[%s]地址: %s", serviceName, addr)

	// The client is bound to that single address.
	discovery, err := rpcxClient.NewPeer2PeerDiscovery("tcp@"+addr, "")
	if err != nil {
		g.Log().Errorf(ctx, "创建服务发现失败: %v", err)
		return nil, err
	}

	fresh := rpcxClient.NewOneClient(
		rpcxClient.Failtry,
		rpcxClient.RandomSelect,
		discovery,
		rpcxClient.DefaultOption,
	)
	fresh.SetPlugins(pluginsContainer)

	// Release whatever client still occupies the slot before replacing it.
	if hadPrevious && previous != nil {
		previous.Close()
	}

	clientPool[serviceName] = fresh
	lastHealthCheckTime[serviceName] = time.Now()
	g.Log().Infof(ctx, "rpcx客户端[%s]创建并加入连接池", serviceName)
	return fresh, nil
}
// Call invokes serviceMethod on the rpcx service registered as serviceName.
//
// Parameters:
//   - ctx: caller context; the call itself runs under a 30-second timeout
//     derived from it.
//   - serviceName: name of the target service (also the pool key).
//   - serviceMethod: method to invoke on the service.
//   - args: request payload.
//   - reply: pointer that receives the response.
//
// The client comes from the package pool and is not closed after a successful
// call. When the call fails, the pooled client is evicted and closed so that
// the next call re-resolves the service address and builds a fresh client.
// Two kinds of failure do not trigger eviction because they say nothing about
// the connection: an error reported by the remote service (ServiceError) and
// a call whose caller context was already cancelled or expired.
//
// The error from obtaining the client or from the call is returned unchanged.
func Call(ctx context.Context, serviceName, serviceMethod string, args interface{}, reply interface{}) error {
	client, err := getOrCreateClient(ctx, serviceName)
	if err != nil {
		g.Log().Errorf(ctx, "获取rpcx客户端失败: %v", err)
		return err
	}

	callCtx, cancel := context.WithTimeout(ctx, 30*time.Second)
	defer cancel()

	err = client.Call(callCtx, serviceName, serviceMethod, args, reply)
	if err == nil {
		return nil
	}
	g.Log().Errorf(ctx, "调用服务[%s]方法[%s]失败: %v", serviceName, serviceMethod, err)

	// The connection is fine if the server answered with an application error
	// or the caller gave up; keep the pooled client in those cases.
	var svcErr rpcxClient.ServiceError
	if errors.As(err, &svcErr) || ctx.Err() != nil {
		return err
	}

	// Evict only if the pool still holds the very client that failed; another
	// goroutine may already have replaced it.
	poolMutex.Lock()
	evicted := false
	if pooled, ok := clientPool[serviceName]; ok && pooled == client {
		delete(clientPool, serviceName)
		delete(lastHealthCheckTime, serviceName)
		evicted = true
	}
	poolMutex.Unlock()

	// Close outside the lock so a slow shutdown does not block other services.
	if evicted {
		if closeErr := client.Close(); closeErr != nil {
			g.Log().Errorf(ctx, "关闭失效的rpcx客户端[%s]失败: %v", serviceName, closeErr)
		}
	}
	return err
}
// Close shuts down the pooled client for serviceName and removes the service
// from the pool and from the health-check bookkeeping. It does nothing when
// the service has no pooled client.
func Close(serviceName string) {
	poolMutex.Lock()
	defer poolMutex.Unlock()

	pooled, present := clientPool[serviceName]
	if !present {
		return
	}

	pooled.Close()
	delete(clientPool, serviceName)
	delete(lastHealthCheckTime, serviceName)
	g.Log().Infof(context.Background(), "rpcx客户端[%s]已从连接池移除", serviceName)
}
// CloseAll closes every pooled client and then resets both the pool and the
// health-check bookkeeping to empty maps. Intended for graceful shutdown.
func CloseAll() {
	poolMutex.Lock()
	defer poolMutex.Unlock()

	bg := context.Background()
	for name, pooled := range clientPool {
		pooled.Close()
		g.Log().Infof(bg, "rpcx客户端[%s]已关闭", name)
	}

	// Start over with fresh maps rather than deleting entry by entry.
	clientPool = make(map[string]*rpcxClient.OneClient)
	lastHealthCheckTime = make(map[string]time.Time)
}
// TracingPlugin is the rpcx client plugin that wraps every outgoing call in a
// tracing span. It provides the PreCall and PostCall hooks.
type TracingPlugin struct{}

// tracingSpanKey is the context key under which PreCall stores the span it
// started, so that PostCall finishes that same span.
type tracingSpanKey struct{}

// maxSpanPayloadLen caps how many bytes of a JSON payload are copied into a
// span attribute.
const maxSpanPayloadLen = 2000

// spanPayload renders v as JSON for use as a span attribute value.
//
// It returns ok=false when v is nil or cannot be marshalled; tracing is
// best-effort, so the marshal error is intentionally not propagated. Payloads
// longer than maxSpanPayloadLen bytes are cut and suffixed with
// "... (truncated)". The cut is moved back to a rune boundary so the result
// stays valid UTF-8.
func spanPayload(v interface{}) (payload string, ok bool) {
	if v == nil {
		return "", false
	}
	data, err := json.Marshal(v)
	if err != nil {
		return "", false
	}
	s := string(data)
	if len(s) <= maxSpanPayloadLen {
		return s, true
	}
	cut := maxSpanPayloadLen
	// Bytes of the form 10xxxxxx are UTF-8 continuation bytes; back up until
	// the first excluded byte starts a rune.
	for cut > 0 && s[cut]&0xC0 == 0x80 {
		cut--
	}
	return s[:cut] + "... (truncated)", true
}

// PreCall runs before the RPC is sent. It starts a span named
// "<serviceName>.<serviceMethod>", tags it with the service, method and the
// JSON-encoded request, and hands the span over to PostCall.
//
// It always returns nil: a request that cannot be serialized for tracing must
// not be reported as a call failure.
func (p *TracingPlugin) PreCall(ctx context.Context, serviceName, serviceMethod string, args interface{}) error {
	spanCtx, span := jaeger.NewSpan(ctx, serviceName+"."+serviceMethod)

	span.SetAttributes(
		attribute.String("rpc.service", serviceName),
		attribute.String("rpc.method", serviceMethod),
		attribute.String("rpc.system", "rpcx"),
	)
	if payload, ok := spanPayload(args); ok {
		span.SetAttributes(attribute.String("rpc.request", payload))
	}

	// spanCtx is local to this hook, so the span has to travel to PostCall
	// through the context object both hooks receive.
	// NOTE(review): relies on rpcx passing plugins a context that exposes
	// SetValue (its share.Context) — confirm for the rpcx version in use.
	// When it does not, PostCall falls back to the span found in ctx.
	if setter, ok := ctx.(interface{ SetValue(key, val interface{}) }); ok {
		setter.SetValue(tracingSpanKey{}, span)
	}

	g.Log().Debugf(spanCtx, "[rpcx] 调用开始: %s.%s", serviceName, serviceMethod)
	return nil
}

// PostCall runs after the RPC completes. It finishes the span started by
// PreCall, attaching the JSON-encoded reply and, when err is non-nil, the
// error and an error status. Nothing is recorded or logged when the span is
// not recording. It always returns nil.
func (p *TracingPlugin) PostCall(ctx context.Context, serviceName, serviceMethod string, args interface{}, reply interface{}, err error) error {
	// Prefer the span PreCall stored; otherwise use whatever span ctx carries.
	span, ok := ctx.Value(tracingSpanKey{}).(trace.Span)
	if !ok {
		span = trace.SpanFromContext(ctx)
	}
	if span == nil || !span.IsRecording() {
		return nil
	}
	defer span.End()

	// Make the span visible to helpers that look it up from a context.
	spanCtx := trace.ContextWithSpan(ctx, span)

	if payload, ok := spanPayload(reply); ok {
		span.SetAttributes(attribute.String("rpc.response", payload))
	}

	if err != nil {
		jaeger.RecordError(spanCtx, err, "rpcx调用失败")
		span.SetStatus(codes.Error, err.Error())
		g.Log().Errorf(spanCtx, "[rpcx] 调用失败: %s.%s, 错误: %v", serviceName, serviceMethod, err)
		return nil
	}
	g.Log().Debugf(spanCtx, "[rpcx] 调用成功: %s.%s", serviceName, serviceMethod)
	return nil
}