package consul import ( "context" "errors" "fmt" "net" "net/http" "sync" "time" "github.com/gogf/gf/contrib/registry/consul/v2" "github.com/gogf/gf/v2/frame/g" "github.com/gogf/gf/v2/net/gsel" "github.com/gogf/gf/v2/net/gsvc" "github.com/gogf/gf/v2/util/grand" ) var ( registry gsvc.Registry consulAddr string reconnectMutex sync.RWMutex reconnectDone chan struct{} connected bool httpClient *http.Client ) // connectConsul 连接 Consul func connectConsul(ctx context.Context) error { reconnectMutex.Lock() defer reconnectMutex.Unlock() // 如果已经连接,不再重复连接 if connected && registry != nil { return nil } var err error registry, err = consul.New(consul.WithAddress(consulAddr)) if err != nil { g.Log().Errorf(ctx, "❌ Consul 连接失败: %v", err) return err } gsvc.SetRegistry(registry) gsel.SetBuilder(gsel.NewBuilderRoundRobin()) connected = true g.Log().Infof(ctx, "✅ Consul 初始化成功: %s", consulAddr) return nil } // startHealthCheckAndReconnect 启动健康检查和自动重连 func startHealthCheckAndReconnect() { if reconnectDone != nil { close(reconnectDone) } reconnectDone = make(chan struct{}) ticker := time.NewTicker(30 * time.Second) defer ticker.Stop() ctx := context.Background() // 初始化HTTP客户端用于健康检查 httpClient = &http.Client{ Timeout: 5 * time.Second, } for { select { case <-ticker.C: // 检查服务发现是否正常工作 if checkConsulHealth(ctx) { continue } g.Log().Warning(ctx, "⚠️ Consul 连接异常,尝试重新连接...") // 重置连接状态并重连 reconnectMutex.Lock() connected = false registry = nil reconnectMutex.Unlock() if err := connectConsul(ctx); err != nil { g.Log().Errorf(ctx, "❌ Consul 重连失败: %v,30秒后重试...", err) } case <-reconnectDone: g.Log().Info(ctx, "🛑 Consul 健康检查已停止") return } } } // checkConsulHealth 检查 Consul 健康状态 func checkConsulHealth(ctx context.Context) bool { reconnectMutex.RLock() defer reconnectMutex.RUnlock() if registry == nil || !connected { return false } // 使用consul原生API进行健康检查 // 调用 /v1/agent/self 接口检测连接状态 url := fmt.Sprintf("http://%s/v1/agent/self", consulAddr) resp, err := httpClient.Get(url) if err != nil { g.Log().Debugf(ctx, "Consul 健康检查失败: %v", err) return false } defer resp.Body.Close() if resp.StatusCode != http.StatusOK { g.Log().Debugf(ctx, "Consul 健康检查失败,状态码: %d", resp.StatusCode) return false } //g.Log().Debugf(ctx, "✅ Consul 健康检查通过") return true } func init() { consulAddr = g.Cfg().MustGet(context.Background(), "consul.address").String() if consulAddr == "" { g.Log().Warning(context.Background(), "⚠️ Consul 配置未找到,跳过初始化") return } if err := connectConsul(context.Background()); err != nil { g.Log().Errorf(context.Background(), "❌ Consul 初始化失败: %v", err) } else { // 连接成功后启动健康检查和自动重连 go startHealthCheckAndReconnect() } } func getLocalIP() (string, error) { // 获取本机所有网络接口 addrs, err := net.InterfaceAddrs() if err != nil { return "", err } for _, addr := range addrs { // 检查是否是IP地址 if ipNet, ok := addr.(*net.IPNet); ok && !ipNet.IP.IsLoopback() { if ipNet.IP.To4() != nil { // 返回第一个非回环的IPv4地址 return ipNet.IP.String(), nil } } } return "", fmt.Errorf("无法找到本地IP地址") } func getInstanceAddrByIp(ctx context.Context, ip string, services []gsvc.Service) (addr string) { for _, s := range services { if s.GetEndpoints()[0].Host() == ip { addr = s.GetEndpoints()[0].String() return } } return } func GetInstanceAddr(ctx context.Context, name string) (addr string, err error) { watch, err := gsvc.GetRegistry().Watch(ctx, name) if err != nil { err = errors.New("获取服务监听器失败") return } service, err := watch.Proceed() if err != nil || service == nil { err = errors.New("获取服务实例失败") return } //优先使用客户端IP获取实例(前后端在同一台机器调试) addr = getInstanceAddrByIp(ctx, g.RequestFromCtx(ctx).GetClientIp(), service) if !g.IsEmpty(addr) { return } //优先使用gateway同IP的服务实例(前后端不同机器调试) addr, err = getLocalIP() if err != nil { return } addr = getInstanceAddrByIp(ctx, addr, service) if !g.IsEmpty(addr) { return } //随机获取一个服务实例 maxService := grand.N(0, len(service)-1) addr = service[maxService].GetEndpoints()[0].String() return }