From d17c242a38c2ad670177d0f5bffbf3134dd275d6 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E5=BC=A0=E6=96=8C?= <259278618@qq.com> Date: Tue, 3 Mar 2026 16:54:51 +0800 Subject: [PATCH] .gitignore --- rpc/rpcx.go | 40 +++++++++++++++++++++++++++++++++++----- 1 file changed, 35 insertions(+), 5 deletions(-) diff --git a/rpc/rpcx.go b/rpc/rpcx.go index 6412391..3132462 100644 --- a/rpc/rpcx.go +++ b/rpc/rpcx.go @@ -32,6 +32,9 @@ var ( // lastHealthCheckTime 上次健康检查时间,key为服务名 lastHealthCheckTime = make(map[string]time.Time) + + // serviceAddrCache 服务地址缓存,key为服务名,value为地址 + serviceAddrCache = make(map[string]string) ) func init() { @@ -66,12 +69,34 @@ func checkAllConnections() { } } - // 简单的健康检查策略: - // 1. 定期从consul重新获取服务地址,如果地址变化说明服务可能迁移了 - // 2. 下次调用失败时会触发重新创建连接 - // 3. 不主动断开连接,依赖实际调用的错误来触发重建 + // 从consul重新获取服务地址,检查是否发生变化 + ctx := context.Background() + currentAddr, err := consul.GetInstanceAddr(ctx, serviceName) + if err != nil { + g.Log().Warningf(ctx, "健康检查时从consul获取服务[%s]地址失败: %v,保持现有连接", serviceName, err) + lastHealthCheckTime[serviceName] = now + continue + } + + // 检查地址是否发生变化 + if oldAddr, ok := serviceAddrCache[serviceName]; ok && oldAddr != currentAddr { + g.Log().Infof(ctx, "检测到服务[%s]地址变化: %s -> %s,重建连接", serviceName, oldAddr, currentAddr) + // 关闭旧连接并从连接池移除,下次请求时会创建新连接 + if client, exists := clientPool[serviceName]; exists { + client.Close() + delete(clientPool, serviceName) + } + // 更新缓存的新地址 + serviceAddrCache[serviceName] = currentAddr + } else { + // 地址未变化,更新检查时间 + if !ok { + serviceAddrCache[serviceName] = currentAddr + } + g.Log().Debugf(ctx, "服务[%s]地址未变化,保持现有连接", serviceName) + } + lastHealthCheckTime[serviceName] = now - g.Log().Debugf(context.Background(), "服务[%s]连接健康检查完成", serviceName) } } @@ -119,6 +144,9 @@ func getOrCreateClient(ctx context.Context, serviceName string) (*rpcxClient.One g.Log().Debugf(ctx, "服务[%s]地址: %s", serviceName, addr) + // 缓存服务地址,用于健康检查时对比 + serviceAddrCache[serviceName] = addr + // 创建服务发现 discovery, err := rpcxClient.NewPeer2PeerDiscovery("tcp@"+addr, "") if err != nil { @@ -192,6 +220,7 @@ func Close(serviceName string) { client.Close() delete(clientPool, serviceName) delete(lastHealthCheckTime, serviceName) + delete(serviceAddrCache, serviceName) g.Log().Infof(context.Background(), "rpcx客户端[%s]已从连接池移除", serviceName) } } @@ -207,6 +236,7 @@ func CloseAll() { } clientPool = make(map[string]*rpcxClient.OneClient) lastHealthCheckTime = make(map[string]time.Time) + serviceAddrCache = make(map[string]string) } // TracingPlugin rpcx链路追踪插件