2026-03-03 16:43:36 +08:00
|
|
|
|
package rpc
|
|
|
|
|
|
|
|
|
|
|
|
import (
|
|
|
|
|
|
"context"
|
|
|
|
|
|
"encoding/json"
|
|
|
|
|
|
"errors"
|
2026-03-03 16:58:08 +08:00
|
|
|
|
"strings"
|
2026-03-03 16:51:01 +08:00
|
|
|
|
"sync"
|
2026-03-03 16:43:36 +08:00
|
|
|
|
"time"
|
|
|
|
|
|
|
|
|
|
|
|
"gitea.com/red-future/common/consul"
|
|
|
|
|
|
"gitea.com/red-future/common/jaeger"
|
|
|
|
|
|
"github.com/gogf/gf/v2/frame/g"
|
|
|
|
|
|
rpcxClient "github.com/smallnest/rpcx/client"
|
|
|
|
|
|
"go.opentelemetry.io/otel/attribute"
|
|
|
|
|
|
"go.opentelemetry.io/otel/codes"
|
2026-03-03 16:51:01 +08:00
|
|
|
|
"go.opentelemetry.io/otel/trace"
|
2026-03-03 16:43:36 +08:00
|
|
|
|
)
|
|
|
|
|
|
|
|
|
|
|
|
var (
|
|
|
|
|
|
// pluginsContainer rpcx插件容器(全局统一设置)
|
|
|
|
|
|
// init()中添加链路追踪插件,所有client共用此容器
|
|
|
|
|
|
pluginsContainer = rpcxClient.NewPluginContainer()
|
2026-03-03 16:51:01 +08:00
|
|
|
|
|
|
|
|
|
|
// clientPool 连接池缓存,key为服务名,value为客户端实例
|
|
|
|
|
|
clientPool = make(map[string]*rpcxClient.OneClient)
|
|
|
|
|
|
|
|
|
|
|
|
// poolMutex 连接池锁
|
|
|
|
|
|
poolMutex sync.RWMutex
|
|
|
|
|
|
|
|
|
|
|
|
// healthCheckInterval 健康检查间隔(秒)
|
|
|
|
|
|
healthCheckInterval = 30
|
|
|
|
|
|
|
|
|
|
|
|
// lastHealthCheckTime 上次健康检查时间,key为服务名
|
|
|
|
|
|
lastHealthCheckTime = make(map[string]time.Time)
|
2026-03-03 16:54:51 +08:00
|
|
|
|
|
|
|
|
|
|
// serviceAddrCache 服务地址缓存,key为服务名,value为地址
|
|
|
|
|
|
serviceAddrCache = make(map[string]string)
|
2026-03-03 16:43:36 +08:00
|
|
|
|
)
|
|
|
|
|
|
|
|
|
|
|
|
func init() {
|
|
|
|
|
|
// 全局设置链路追踪插件,所有client共用
|
|
|
|
|
|
pluginsContainer.Add(&TracingPlugin{})
|
2026-03-03 16:51:01 +08:00
|
|
|
|
|
|
|
|
|
|
// 启动后台健康检查协程
|
|
|
|
|
|
go healthCheckLoop()
|
2026-03-03 16:43:36 +08:00
|
|
|
|
}
|
|
|
|
|
|
|
2026-03-03 16:51:01 +08:00
|
|
|
|
// healthCheckLoop 后台健康检查循环
|
|
|
|
|
|
func healthCheckLoop() {
|
|
|
|
|
|
ticker := time.NewTicker(time.Duration(healthCheckInterval) * time.Second)
|
|
|
|
|
|
defer ticker.Stop()
|
|
|
|
|
|
|
|
|
|
|
|
for range ticker.C {
|
|
|
|
|
|
checkAllConnections()
|
|
|
|
|
|
}
|
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
// checkAllConnections 检查所有缓存连接的健康状态
|
|
|
|
|
|
func checkAllConnections() {
|
|
|
|
|
|
poolMutex.Lock()
|
|
|
|
|
|
defer poolMutex.Unlock()
|
|
|
|
|
|
|
|
|
|
|
|
now := time.Now()
|
2026-03-03 16:58:08 +08:00
|
|
|
|
for serviceName, client := range clientPool {
|
2026-03-03 16:51:01 +08:00
|
|
|
|
// 检查连接是否需要健康检查
|
|
|
|
|
|
if lastCheck, ok := lastHealthCheckTime[serviceName]; ok {
|
|
|
|
|
|
if now.Sub(lastCheck) < time.Duration(healthCheckInterval)*time.Second {
|
|
|
|
|
|
continue
|
|
|
|
|
|
}
|
|
|
|
|
|
}
|
|
|
|
|
|
|
2026-03-03 16:54:51 +08:00
|
|
|
|
ctx := context.Background()
|
2026-03-03 16:58:08 +08:00
|
|
|
|
|
|
|
|
|
|
// 检查连接健康状态(心跳检测)
|
|
|
|
|
|
if !isClientHealthy(ctx, client, serviceName) {
|
|
|
|
|
|
g.Log().Warningf(ctx, "检测到服务[%s]连接不健康,将从连接池移除", serviceName)
|
|
|
|
|
|
client.Close()
|
|
|
|
|
|
delete(clientPool, serviceName)
|
|
|
|
|
|
delete(lastHealthCheckTime, serviceName)
|
|
|
|
|
|
delete(serviceAddrCache, serviceName)
|
|
|
|
|
|
continue
|
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
// 连接健康,检查服务地址是否发生变化
|
2026-03-03 16:54:51 +08:00
|
|
|
|
currentAddr, err := consul.GetInstanceAddr(ctx, serviceName)
|
|
|
|
|
|
if err != nil {
|
|
|
|
|
|
g.Log().Warningf(ctx, "健康检查时从consul获取服务[%s]地址失败: %v,保持现有连接", serviceName, err)
|
|
|
|
|
|
lastHealthCheckTime[serviceName] = now
|
|
|
|
|
|
continue
|
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
// 检查地址是否发生变化
|
|
|
|
|
|
if oldAddr, ok := serviceAddrCache[serviceName]; ok && oldAddr != currentAddr {
|
|
|
|
|
|
g.Log().Infof(ctx, "检测到服务[%s]地址变化: %s -> %s,重建连接", serviceName, oldAddr, currentAddr)
|
|
|
|
|
|
// 关闭旧连接并从连接池移除,下次请求时会创建新连接
|
2026-03-03 16:58:08 +08:00
|
|
|
|
client.Close()
|
|
|
|
|
|
delete(clientPool, serviceName)
|
|
|
|
|
|
delete(lastHealthCheckTime, serviceName)
|
2026-03-03 16:54:51 +08:00
|
|
|
|
// 更新缓存的新地址
|
|
|
|
|
|
serviceAddrCache[serviceName] = currentAddr
|
|
|
|
|
|
} else {
|
|
|
|
|
|
// 地址未变化,更新检查时间
|
|
|
|
|
|
if !ok {
|
|
|
|
|
|
serviceAddrCache[serviceName] = currentAddr
|
|
|
|
|
|
}
|
2026-03-03 16:58:08 +08:00
|
|
|
|
g.Log().Debugf(ctx, "服务[%s]地址未变化,连接健康", serviceName)
|
2026-03-03 16:54:51 +08:00
|
|
|
|
}
|
|
|
|
|
|
|
2026-03-03 16:51:01 +08:00
|
|
|
|
lastHealthCheckTime[serviceName] = now
|
|
|
|
|
|
}
|
|
|
|
|
|
}
|
|
|
|
|
|
|
2026-03-03 16:58:08 +08:00
|
|
|
|
// isClientHealthy 检查client是否健康
|
|
|
|
|
|
// 使用心跳检测方式:尝试调用服务的心跳方法
|
|
|
|
|
|
func isClientHealthy(ctx context.Context, client *rpcxClient.OneClient, serviceName string) bool {
|
|
|
|
|
|
if client == nil {
|
|
|
|
|
|
return false
|
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
// 设置较短的超时时间,避免阻塞
|
|
|
|
|
|
pingCtx, cancel := context.WithTimeout(ctx, 2*time.Second)
|
|
|
|
|
|
defer cancel()
|
|
|
|
|
|
|
|
|
|
|
|
// 尝试调用健康检查方法
|
|
|
|
|
|
// 大多数服务都会提供 Ping 或 Health 方法
|
|
|
|
|
|
// 如果服务没有提供这些方法,会返回错误,我们认为是健康的
|
|
|
|
|
|
// 因为连接本身是正常的,只是方法不存在
|
|
|
|
|
|
var reply interface{}
|
|
|
|
|
|
err := client.Call(pingCtx, serviceName, "Ping", nil, &reply)
|
|
|
|
|
|
|
|
|
|
|
|
// 如果调用成功,连接肯定健康
|
|
|
|
|
|
if err == nil {
|
|
|
|
|
|
return true
|
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
// 如果是方法不存在的错误,说明连接是健康的,只是服务没有Ping方法
|
|
|
|
|
|
// 这种情况下我们认为是健康的
|
|
|
|
|
|
if isMethodNotFoundError(err) || isServiceNotFoundError(err) {
|
|
|
|
|
|
return true
|
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
// 其他错误(网络错误、超时等)说明连接不健康
|
|
|
|
|
|
g.Log().Warningf(ctx, "健康检查失败,服务[%s]连接可能不健康: %v", serviceName, err)
|
|
|
|
|
|
return false
|
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
// isMethodNotFoundError reports whether err looks like a "method does not
// exist" response, judged by substrings of err.Error(). A nil err yields false.
//
// The match is textual and deliberately broad: any message containing
// "not found" qualifies, which also covers "service not found" and
// "method not found".
func isMethodNotFoundError(err error) bool {
	if err == nil {
		return false
	}

	msg := err.Error()
	markers := []string{
		"not found",
		"no such",
		// NOTE(review): wording believed to be used by the rpcx server for an
		// unknown method ("rpcx: can't find method <name>") — confirm against
		// the rpcx version in use.
		"can't find method",
	}
	for _, marker := range markers {
		if strings.Contains(msg, marker) {
			return true
		}
	}
	return false
}
|
|
|
|
|
|
|
|
|
|
|
|
// isServiceNotFoundError reports whether err looks like a "service does not
// exist" response, judged by substrings of err.Error(). A nil err yields false.
func isServiceNotFoundError(err error) bool {
	if err == nil {
		return false
	}

	msg := err.Error()
	markers := []string{
		"no service",
		"service not registered",
		// NOTE(review): wording believed to be used by the rpcx server for an
		// unknown service ("rpcx: can't find service <name>") — confirm against
		// the rpcx version in use.
		"can't find service",
	}
	for _, marker := range markers {
		if strings.Contains(msg, marker) {
			return true
		}
	}
	return false
}
|
|
|
|
|
|
|
|
|
|
|
|
// getOrCreateClient 从连接池获取或创建客户端(带连接池)
|
|
|
|
|
|
func getOrCreateClient(ctx context.Context, serviceName string) (*rpcxClient.OneClient, error) {
|
2026-03-03 16:43:36 +08:00
|
|
|
|
if g.IsEmpty(serviceName) {
|
|
|
|
|
|
return nil, errors.New("服务名称不能为空")
|
|
|
|
|
|
}
|
|
|
|
|
|
|
2026-03-03 16:51:01 +08:00
|
|
|
|
// 先尝试从连接池获取
|
|
|
|
|
|
poolMutex.RLock()
|
|
|
|
|
|
client, exists := clientPool[serviceName]
|
|
|
|
|
|
poolMutex.RUnlock()
|
|
|
|
|
|
|
|
|
|
|
|
// 如果存在且健康,直接返回
|
2026-03-03 16:58:08 +08:00
|
|
|
|
if exists && isClientHealthy(ctx, client, serviceName) {
|
2026-03-03 16:51:01 +08:00
|
|
|
|
g.Log().Debugf(ctx, "从连接池获取rpcx客户端[%s]", serviceName)
|
|
|
|
|
|
return client, nil
|
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
// 不存在或不健康,重新创建
|
|
|
|
|
|
poolMutex.Lock()
|
|
|
|
|
|
defer poolMutex.Unlock()
|
|
|
|
|
|
|
|
|
|
|
|
// 双重检查,防止并发时重复创建
|
2026-03-03 16:58:08 +08:00
|
|
|
|
if client, exists := clientPool[serviceName]; exists && isClientHealthy(ctx, client, serviceName) {
|
2026-03-03 16:51:01 +08:00
|
|
|
|
return client, nil
|
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
// 获取服务实例地址
|
2026-03-03 16:43:36 +08:00
|
|
|
|
addr, err := consul.GetInstanceAddr(ctx, serviceName)
|
|
|
|
|
|
if err != nil {
|
|
|
|
|
|
g.Log().Errorf(ctx, "从consul获取服务[%s]地址失败: %v", serviceName, err)
|
|
|
|
|
|
return nil, err
|
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
g.Log().Debugf(ctx, "服务[%s]地址: %s", serviceName, addr)
|
|
|
|
|
|
|
2026-03-03 16:54:51 +08:00
|
|
|
|
// 缓存服务地址,用于健康检查时对比
|
|
|
|
|
|
serviceAddrCache[serviceName] = addr
|
|
|
|
|
|
|
2026-03-03 16:51:01 +08:00
|
|
|
|
// 创建服务发现
|
2026-03-03 16:43:36 +08:00
|
|
|
|
discovery, err := rpcxClient.NewPeer2PeerDiscovery("tcp@"+addr, "")
|
|
|
|
|
|
if err != nil {
|
|
|
|
|
|
g.Log().Errorf(ctx, "创建服务发现失败: %v", err)
|
|
|
|
|
|
return nil, err
|
|
|
|
|
|
}
|
|
|
|
|
|
|
2026-03-03 16:51:01 +08:00
|
|
|
|
// 创建新客户端
|
|
|
|
|
|
newClient := rpcxClient.NewOneClient(
|
2026-03-03 16:43:36 +08:00
|
|
|
|
rpcxClient.Failtry,
|
|
|
|
|
|
rpcxClient.RandomSelect,
|
|
|
|
|
|
discovery,
|
|
|
|
|
|
rpcxClient.DefaultOption,
|
|
|
|
|
|
)
|
2026-03-03 16:51:01 +08:00
|
|
|
|
newClient.SetPlugins(pluginsContainer)
|
2026-03-03 16:43:36 +08:00
|
|
|
|
|
2026-03-03 16:51:01 +08:00
|
|
|
|
// 更新连接池
|
|
|
|
|
|
if oldClient, ok := clientPool[serviceName]; ok && oldClient != nil {
|
|
|
|
|
|
oldClient.Close()
|
|
|
|
|
|
}
|
|
|
|
|
|
clientPool[serviceName] = newClient
|
|
|
|
|
|
lastHealthCheckTime[serviceName] = time.Now()
|
2026-03-03 16:43:36 +08:00
|
|
|
|
|
2026-03-03 16:51:01 +08:00
|
|
|
|
g.Log().Infof(ctx, "rpcx客户端[%s]创建并加入连接池", serviceName)
|
2026-03-03 16:43:36 +08:00
|
|
|
|
|
2026-03-03 16:51:01 +08:00
|
|
|
|
return newClient, nil
|
2026-03-03 16:43:36 +08:00
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
// Call 调用rpcx服务方法
|
|
|
|
|
|
// serviceName: 服务名称
|
|
|
|
|
|
// serviceMethod: 服务方法
|
|
|
|
|
|
// args: 请求参数
|
|
|
|
|
|
// reply: 响应结果
|
|
|
|
|
|
func Call(ctx context.Context, serviceName, serviceMethod string, args interface{}, reply interface{}) error {
|
2026-03-03 16:51:01 +08:00
|
|
|
|
// 从连接池获取客户端(不再关闭连接)
|
|
|
|
|
|
client, err := getOrCreateClient(ctx, serviceName)
|
2026-03-03 16:43:36 +08:00
|
|
|
|
if err != nil {
|
|
|
|
|
|
g.Log().Errorf(ctx, "获取rpcx客户端失败: %v", err)
|
|
|
|
|
|
return err
|
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
// 设置超时
|
|
|
|
|
|
callCtx, cancel := context.WithTimeout(ctx, 30*time.Second)
|
|
|
|
|
|
defer cancel()
|
|
|
|
|
|
|
2026-03-03 16:51:01 +08:00
|
|
|
|
// 调用服务方法
|
2026-03-03 16:43:36 +08:00
|
|
|
|
err = client.Call(callCtx, serviceName, serviceMethod, args, reply)
|
|
|
|
|
|
if err != nil {
|
|
|
|
|
|
g.Log().Errorf(ctx, "调用服务[%s]方法[%s]失败: %v", serviceName, serviceMethod, err)
|
2026-03-03 16:51:01 +08:00
|
|
|
|
|
|
|
|
|
|
// 如果调用失败,检查连接是否需要重新创建
|
|
|
|
|
|
poolMutex.Lock()
|
|
|
|
|
|
if pooledClient, ok := clientPool[serviceName]; ok && pooledClient == client {
|
|
|
|
|
|
// 标记为不健康,下次请求时会重新创建
|
|
|
|
|
|
delete(lastHealthCheckTime, serviceName)
|
|
|
|
|
|
}
|
|
|
|
|
|
poolMutex.Unlock()
|
|
|
|
|
|
|
2026-03-03 16:43:36 +08:00
|
|
|
|
return err
|
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
return nil
|
|
|
|
|
|
}
|
|
|
|
|
|
|
2026-03-03 16:51:01 +08:00
|
|
|
|
// Close 关闭指定服务的连接(用于清理连接池)
|
|
|
|
|
|
func Close(serviceName string) {
|
|
|
|
|
|
poolMutex.Lock()
|
|
|
|
|
|
defer poolMutex.Unlock()
|
|
|
|
|
|
|
|
|
|
|
|
if client, ok := clientPool[serviceName]; ok {
|
|
|
|
|
|
client.Close()
|
|
|
|
|
|
delete(clientPool, serviceName)
|
|
|
|
|
|
delete(lastHealthCheckTime, serviceName)
|
2026-03-03 16:54:51 +08:00
|
|
|
|
delete(serviceAddrCache, serviceName)
|
2026-03-03 16:51:01 +08:00
|
|
|
|
g.Log().Infof(context.Background(), "rpcx客户端[%s]已从连接池移除", serviceName)
|
|
|
|
|
|
}
|
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
// CloseAll 关闭所有连接(用于优雅停机)
|
|
|
|
|
|
func CloseAll() {
|
|
|
|
|
|
poolMutex.Lock()
|
|
|
|
|
|
defer poolMutex.Unlock()
|
|
|
|
|
|
|
|
|
|
|
|
for serviceName, client := range clientPool {
|
|
|
|
|
|
client.Close()
|
|
|
|
|
|
g.Log().Infof(context.Background(), "rpcx客户端[%s]已关闭", serviceName)
|
|
|
|
|
|
}
|
|
|
|
|
|
clientPool = make(map[string]*rpcxClient.OneClient)
|
|
|
|
|
|
lastHealthCheckTime = make(map[string]time.Time)
|
2026-03-03 16:54:51 +08:00
|
|
|
|
serviceAddrCache = make(map[string]string)
|
2026-03-03 16:51:01 +08:00
|
|
|
|
}
|
|
|
|
|
|
|
2026-03-03 16:43:36 +08:00
|
|
|
|
// TracingPlugin is the rpcx client plugin that adds tracing to outgoing calls.
// It carries no state; its PreCall and PostCall methods are the hooks that
// rpcx invokes before and after each call.
type TracingPlugin struct{}
|
|
|
|
|
|
|
|
|
|
|
|
// PreCall 调用前拦截 - 创建jaeger span
|
|
|
|
|
|
func (p *TracingPlugin) PreCall(ctx context.Context, serviceName, serviceMethod string, args interface{}) (err error) {
|
|
|
|
|
|
// 创建span,名称格式: ServiceName.Method
|
|
|
|
|
|
spanName := serviceName + "." + serviceMethod
|
|
|
|
|
|
ctx, span := jaeger.NewSpan(ctx, spanName)
|
|
|
|
|
|
|
|
|
|
|
|
// 记录服务和方法信息
|
|
|
|
|
|
span.SetAttributes(
|
|
|
|
|
|
attribute.String("rpc.service", serviceName),
|
|
|
|
|
|
attribute.String("rpc.method", serviceMethod),
|
|
|
|
|
|
attribute.String("rpc.system", "rpcx"),
|
|
|
|
|
|
)
|
|
|
|
|
|
var data []byte
|
|
|
|
|
|
// 记录请求参数(序列化为JSON)
|
|
|
|
|
|
if args != nil {
|
|
|
|
|
|
if data, err = json.Marshal(args); err == nil {
|
|
|
|
|
|
argsStr := string(data)
|
|
|
|
|
|
// 限制长度,避免过大
|
|
|
|
|
|
if len(argsStr) > 2000 {
|
|
|
|
|
|
argsStr = argsStr[:2000] + "... (truncated)"
|
|
|
|
|
|
}
|
|
|
|
|
|
span.SetAttributes(attribute.String("rpc.request", argsStr))
|
|
|
|
|
|
}
|
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
g.Log().Debugf(ctx, "[rpcx] 调用开始: %s.%s", serviceName, serviceMethod)
|
|
|
|
|
|
|
|
|
|
|
|
return
|
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
// PostCall 调用后拦截 - 记录结果和错误
|
|
|
|
|
|
func (p *TracingPlugin) PostCall(ctx context.Context, serviceName, serviceMethod string, args interface{}, reply interface{}, err error) error {
|
|
|
|
|
|
span := trace.SpanFromContext(ctx)
|
|
|
|
|
|
if span != nil && span.IsRecording() {
|
|
|
|
|
|
defer span.End()
|
|
|
|
|
|
|
|
|
|
|
|
// 记录响应结果
|
|
|
|
|
|
if reply != nil {
|
|
|
|
|
|
if data, err := json.Marshal(reply); err == nil {
|
|
|
|
|
|
replyStr := string(data)
|
|
|
|
|
|
// 限制长度,避免过大
|
|
|
|
|
|
if len(replyStr) > 2000 {
|
|
|
|
|
|
replyStr = replyStr[:2000] + "... (truncated)"
|
|
|
|
|
|
}
|
|
|
|
|
|
span.SetAttributes(attribute.String("rpc.response", replyStr))
|
|
|
|
|
|
}
|
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
// 处理错误
|
|
|
|
|
|
if err != nil {
|
|
|
|
|
|
jaeger.RecordError(ctx, err, "rpcx调用失败")
|
|
|
|
|
|
span.SetStatus(codes.Error, err.Error())
|
|
|
|
|
|
g.Log().Errorf(ctx, "[rpcx] 调用失败: %s.%s, 错误: %v", serviceName, serviceMethod, err)
|
|
|
|
|
|
} else {
|
|
|
|
|
|
g.Log().Debugf(ctx, "[rpcx] 调用成功: %s.%s", serviceName, serviceMethod)
|
|
|
|
|
|
}
|
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
return nil
|
|
|
|
|
|
}
|