Files
common/consul/consul.go
2026-04-01 14:11:07 +08:00

175 lines
4.1 KiB
Go
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
package consul
import (
"context"
"errors"
"fmt"
"net"
"sync"
"time"
"github.com/gogf/gf/contrib/registry/consul/v2"
"github.com/gogf/gf/v2/frame/g"
"github.com/gogf/gf/v2/net/gsel"
"github.com/gogf/gf/v2/net/gsvc"
"github.com/gogf/gf/v2/util/grand"
)
// Package-level state describing the current Consul connection.
var (
// registry is the service registry created by connectConsul via consul.New;
// it is reset to nil by the health-check loop before a reconnect attempt.
registry gsvc.Registry
// consulAddr is the Consul address, read from the "consul.address"
// configuration key in init.
consulAddr string
// reconnectMutex is held while connected and registry are read or written.
reconnectMutex sync.RWMutex
// reconnectDone is the stop channel of the health-check loop: the loop
// returns once this channel is closed.
reconnectDone chan struct{}
// connected is set to true after connectConsul succeeds and to false when
// the health-check loop decides the connection must be rebuilt.
connected bool
)
// healthCheckOnce makes sure the background health-check goroutine is
// launched at most once, however many times connectConsul succeeds.
var healthCheckOnce sync.Once

// connectConsul creates the Consul registry for consulAddr and installs it as
// the global gsvc registry, together with a round-robin selector builder.
//
// It is a no-op when a registry is already connected. On failure the error is
// logged and returned, and the package state is left untouched. The first
// successful call also starts the background health-check loop; later calls
// (for example reconnects issued by that loop) do not start another one.
func connectConsul(ctx context.Context) error {
	reconnectMutex.Lock()
	defer reconnectMutex.Unlock()

	// Already connected: do not connect again.
	if connected && registry != nil {
		return nil
	}

	// Build into a local first so a failed attempt never publishes a
	// half-initialised value through the global.
	newRegistry, err := consul.New(consul.WithAddress(consulAddr))
	if err != nil {
		g.Log().Errorf(ctx, "❌ Consul 连接失败: %v", err)
		return err
	}

	registry = newRegistry
	gsvc.SetRegistry(registry)
	gsel.SetBuilder(gsel.NewBuilderRoundRobin())
	connected = true
	g.Log().Infof(ctx, "✅ Consul 初始化成功: %s", consulAddr)

	// Start health checking and automatic reconnection exactly once. Without
	// the Once, every reconnect performed by the loop would spawn a new loop.
	healthCheckOnce.Do(func() {
		go startHealthCheckAndReconnect()
	})
	return nil
}
// startHealthCheckAndReconnect 启动健康检查和自动重连
func startHealthCheckAndReconnect() {
if reconnectDone != nil {
close(reconnectDone)
}
reconnectDone = make(chan struct{})
ticker := time.NewTicker(5 * time.Second)
defer ticker.Stop()
ctx := context.Background()
for {
select {
case <-ticker.C:
// 检查服务发现是否正常工作
if checkConsulHealth(ctx) {
continue
}
g.Log().Warning(ctx, "⚠️ Consul 连接异常,尝试重新连接...")
// 重置连接状态并重连
reconnectMutex.Lock()
connected = false
registry = nil
reconnectMutex.Unlock()
if err := connectConsul(ctx); err != nil {
g.Log().Errorf(ctx, "❌ Consul 重连失败: %v,5秒后重试...", err)
}
case <-reconnectDone:
g.Log().Info(ctx, "🛑 Consul 健康检查已停止")
return
}
}
}
// checkConsulHealth reports whether the package currently holds a connected
// registry. It only inspects the local state under a read lock; it
// deliberately performs no Search call against Consul, to avoid errors caused
// by calling it too frequently.
func checkConsulHealth(ctx context.Context) bool {
	reconnectMutex.RLock()
	healthy := connected && registry != nil
	reconnectMutex.RUnlock()
	return healthy
}
// init reads the Consul address from the "consul.address" configuration key
// and connects to it. When the key is empty a warning is logged and the
// connection is skipped; a connection failure is logged and otherwise ignored.
func init() {
	ctx := context.Background()

	consulAddr = g.Cfg().MustGet(ctx, "consul.address").String()
	if len(consulAddr) == 0 {
		g.Log().Warning(ctx, "⚠️ Consul 配置未找到,跳过初始化")
		return
	}

	err := connectConsul(ctx)
	if err != nil {
		g.Log().Errorf(ctx, "❌ Consul 初始化失败: %v", err)
	}
}
// getLocalIP returns the first non-loopback IPv4 address reported by
// net.InterfaceAddrs, in the order the system lists them. It returns the
// lookup error unchanged if the addresses cannot be enumerated, and a
// dedicated error when no suitable address exists.
func getLocalIP() (string, error) {
	ifaceAddrs, err := net.InterfaceAddrs()
	if err != nil {
		return "", err
	}

	for _, candidate := range ifaceAddrs {
		// Only IP network entries carry an address we can use.
		ipNet, isIPNet := candidate.(*net.IPNet)
		if !isIPNet {
			continue
		}
		// Skip loopback addresses and anything that is not IPv4.
		if ipNet.IP.IsLoopback() || ipNet.IP.To4() == nil {
			continue
		}
		return ipNet.IP.String(), nil
	}

	return "", fmt.Errorf("无法找到本地IP地址")
}
// getInstanceAddrByIp returns the address of the first service whose first
// endpoint has the given host ip, formatted with the endpoint's String method.
// It returns an empty string when no service matches.
//
// Services that expose no endpoints are skipped rather than indexed, so a
// malformed registry entry cannot cause an out-of-range panic. ctx is accepted
// for signature compatibility and is not used.
func getInstanceAddrByIp(ctx context.Context, ip string, services []gsvc.Service) (addr string) {
	for _, s := range services {
		endpoints := s.GetEndpoints()
		if len(endpoints) == 0 {
			continue
		}
		// Only the first endpoint of each service is considered.
		if first := endpoints[0]; first.Host() == ip {
			return first.String()
		}
	}
	return ""
}
// GetInstanceAddr resolves the address of one instance of the service called
// name, using the globally configured gsvc registry.
//
// Instances are chosen in this order of preference:
//  1. an instance on the same IP as the requesting client (front end and back
//     end being debugged on one machine);
//  2. an instance on the same IP as this process (front end and back end on
//     different machines);
//  3. a randomly selected instance.
//
// An error is returned when no registry is configured, when the watcher cannot
// be created, or when no usable instance is found. Steps 1 and 2 are
// best-effort: a context that carries no HTTP request, or a failure to
// determine the local IP, simply moves on to the next step.
func GetInstanceAddr(ctx context.Context, name string) (addr string, err error) {
	// The registry is nil when Consul was never initialised successfully.
	reg := gsvc.GetRegistry()
	if reg == nil {
		return "", errors.New("获取服务监听器失败")
	}

	watch, err := reg.Watch(ctx, name)
	if err != nil {
		return "", errors.New("获取服务监听器失败")
	}
	// Release the watcher once the instances have been read; the close error
	// carries no information the caller could act on.
	defer func() { _ = watch.Close() }()

	service, err := watch.Proceed()
	// len() also rejects an empty non-nil slice, which would otherwise make
	// the random pick below index out of range.
	if err != nil || len(service) == 0 {
		return "", errors.New("获取服务实例失败")
	}

	// Prefer the instance that shares the client's IP. RequestFromCtx returns
	// nil when ctx does not originate from an HTTP request.
	if req := g.RequestFromCtx(ctx); req != nil {
		if addr = getInstanceAddrByIp(ctx, req.GetClientIp(), service); !g.IsEmpty(addr) {
			return addr, nil
		}
	}

	// Next prefer the instance that shares this process's IP. Not being able
	// to determine the local IP only disables this preference.
	if localIP, ipErr := getLocalIP(); ipErr == nil {
		if addr = getInstanceAddrByIp(ctx, localIP, service); !g.IsEmpty(addr) {
			return addr, nil
		}
	}

	// Fall back to a random instance.
	endpoints := service[grand.N(0, len(service)-1)].GetEndpoints()
	if len(endpoints) == 0 {
		return "", errors.New("获取服务实例失败")
	}
	return endpoints[0].String(), nil
}