diff --git a/rpc/rpcx.go b/rpc/rpcx.go index 3132462..1843f0d 100644 --- a/rpc/rpcx.go +++ b/rpc/rpcx.go @@ -4,6 +4,7 @@ import ( "context" "encoding/json" "errors" + "strings" "sync" "time" @@ -61,7 +62,7 @@ func checkAllConnections() { defer poolMutex.Unlock() now := time.Now() - for serviceName := range clientPool { + for serviceName, client := range clientPool { // 检查连接是否需要健康检查 if lastCheck, ok := lastHealthCheckTime[serviceName]; ok { if now.Sub(lastCheck) < time.Duration(healthCheckInterval)*time.Second { @@ -69,8 +70,19 @@ func checkAllConnections() { } } - // 从consul重新获取服务地址,检查是否发生变化 ctx := context.Background() + + // 检查连接健康状态(心跳检测) + if !isClientHealthy(ctx, client, serviceName) { + g.Log().Warningf(ctx, "检测到服务[%s]连接不健康,将从连接池移除", serviceName) + client.Close() + delete(clientPool, serviceName) + delete(lastHealthCheckTime, serviceName) + delete(serviceAddrCache, serviceName) + continue + } + + // 连接健康,检查服务地址是否发生变化 currentAddr, err := consul.GetInstanceAddr(ctx, serviceName) if err != nil { g.Log().Warningf(ctx, "健康检查时从consul获取服务[%s]地址失败: %v,保持现有连接", serviceName, err) @@ -82,10 +94,9 @@ func checkAllConnections() { if oldAddr, ok := serviceAddrCache[serviceName]; ok && oldAddr != currentAddr { g.Log().Infof(ctx, "检测到服务[%s]地址变化: %s -> %s,重建连接", serviceName, oldAddr, currentAddr) // 关闭旧连接并从连接池移除,下次请求时会创建新连接 - if client, exists := clientPool[serviceName]; exists { - client.Close() - delete(clientPool, serviceName) - } + client.Close() + delete(clientPool, serviceName) + delete(lastHealthCheckTime, serviceName) // 更新缓存的新地址 serviceAddrCache[serviceName] = currentAddr } else { @@ -93,20 +104,68 @@ func checkAllConnections() { if !ok { serviceAddrCache[serviceName] = currentAddr } - g.Log().Debugf(ctx, "服务[%s]地址未变化,保持现有连接", serviceName) + g.Log().Debugf(ctx, "服务[%s]地址未变化,连接健康", serviceName) } lastHealthCheckTime[serviceName] = now } } -// isClientHealthy 检查client是否健康(简化版) -// 实际健康检查依赖调用失败时触发重建 -func isClientHealthy(client *rpcxClient.OneClient) bool { - // rpcx有内置的重连机制,我们信任client对象的有效性 - // 只要client不为nil就认为是健康的 - // 实际的错误会在调用时暴露,触发重新创建 - return client != nil +// isClientHealthy 检查client是否健康 +// 使用心跳检测方式:尝试调用服务的心跳方法 +func isClientHealthy(ctx context.Context, client *rpcxClient.OneClient, serviceName string) bool { + if client == nil { + return false + } + + // 设置较短的超时时间,避免阻塞 + pingCtx, cancel := context.WithTimeout(ctx, 2*time.Second) + defer cancel() + + // 尝试调用健康检查方法 + // 大多数服务都会提供 Ping 或 Health 方法 + // 如果服务没有提供这些方法,会返回错误,我们认为是健康的 + // 因为连接本身是正常的,只是方法不存在 + var reply interface{} + err := client.Call(pingCtx, serviceName, "Ping", nil, &reply) + + // 如果调用成功,连接肯定健康 + if err == nil { + return true + } + + // 如果是方法不存在的错误,说明连接是健康的,只是服务没有Ping方法 + // 这种情况下我们认为是健康的 + if isMethodNotFoundError(err) || isServiceNotFoundError(err) { + return true + } + + // 其他错误(网络错误、超时等)说明连接不健康 + g.Log().Warningf(ctx, "健康检查失败,服务[%s]连接可能不健康: %v", serviceName, err) + return false +} + +// isMethodNotFoundError 判断是否是方法未找到错误 +func isMethodNotFoundError(err error) bool { + if err == nil { + return false + } + errStr := err.Error() + // rpcx 方法不存在的常见错误信息 + return strings.Contains(errStr, "not found") || + strings.Contains(errStr, "no such") || + strings.Contains(errStr, "service not found") || + strings.Contains(errStr, "method not found") +} + +// isServiceNotFoundError 判断是否是服务未找到错误 +func isServiceNotFoundError(err error) bool { + if err == nil { + return false + } + errStr := err.Error() + return strings.Contains(errStr, "no service") || + strings.Contains(errStr, "service not registered") } // getOrCreateClient 从连接池获取或创建客户端(带连接池) @@ -121,7 +180,7 @@ func getOrCreateClient(ctx context.Context, serviceName string) (*rpcxClient.One poolMutex.RUnlock() // 如果存在且健康,直接返回 - if exists && isClientHealthy(client) { + if exists && isClientHealthy(ctx, client, serviceName) { g.Log().Debugf(ctx, "从连接池获取rpcx客户端[%s]", serviceName) return client, nil } @@ -131,7 +190,7 @@ func getOrCreateClient(ctx context.Context, serviceName string) (*rpcxClient.One defer poolMutex.Unlock() // 双重检查,防止并发时重复创建 - if client, exists := clientPool[serviceName]; exists && isClientHealthy(client) { + if client, exists := clientPool[serviceName]; exists && isClientHealthy(ctx, client, serviceName) { return client, nil }