This commit is contained in:
2026-04-01 13:38:57 +08:00

View File

@@ -6,6 +6,7 @@ import (
"fmt" "fmt"
"net" "net"
"sync" "sync"
"time"
"github.com/gogf/gf/contrib/registry/consul/v2" "github.com/gogf/gf/contrib/registry/consul/v2"
"github.com/gogf/gf/v2/frame/g" "github.com/gogf/gf/v2/frame/g"
@@ -14,30 +15,96 @@ import (
"github.com/gogf/gf/v2/util/grand" "github.com/gogf/gf/v2/util/grand"
) )
var initOnce sync.Once var (
registry gsvc.Registry
consulAddr string
reconnectMutex sync.RWMutex
reconnectDone chan struct{}
)
// Init 初始化 Consul 注册中心(延迟初始化,首次调用时执行) // connectConsul 连接 Consul
func Init() { func connectConsul(ctx context.Context) error {
initOnce.Do(func() { reconnectMutex.Lock()
consulAddr := g.Cfg().MustGet(context.Background(), "consul.address").String() defer reconnectMutex.Unlock()
if consulAddr == "" {
g.Log().Warning(context.Background(), "⚠️ Consul 配置未找到,跳过初始化") var err error
registry, err = consul.New(consul.WithAddress(consulAddr))
if err != nil {
g.Log().Errorf(ctx, "❌ Consul 连接失败: %v", err)
return err
}
gsvc.SetRegistry(registry)
gsel.SetBuilder(gsel.NewBuilderRoundRobin())
g.Log().Infof(ctx, "✅ Consul 初始化成功: %s", consulAddr)
// 启动健康检查和自动重连
go startHealthCheckAndReconnect()
return nil
}
// startHealthCheckAndReconnect 启动健康检查和自动重连
func startHealthCheckAndReconnect() {
if reconnectDone != nil {
close(reconnectDone)
}
reconnectDone = make(chan struct{})
ticker := time.NewTicker(5 * time.Second)
defer ticker.Stop()
ctx := context.Background()
for {
select {
case <-ticker.C:
// 检查服务发现是否正常工作
if checkConsulHealth(ctx) {
continue
}
g.Log().Warning(ctx, "⚠️ Consul 连接异常,尝试重新连接...")
if err := connectConsul(ctx); err != nil {
g.Log().Errorf(ctx, "❌ Consul 重连失败: %v,5秒后重试...", err)
}
case <-reconnectDone:
g.Log().Info(ctx, "🛑 Consul 健康检查已停止")
return return
} }
registry, err := consul.New(consul.WithAddress(consulAddr)) }
if err != nil { }
g.Log().Errorf(context.Background(), "Consul 初始化失败: %v", err)
return // checkConsulHealth 检查 Consul 健康状态
} func checkConsulHealth(ctx context.Context) bool {
gsvc.SetRegistry(registry) reconnectMutex.RLock()
gsel.SetBuilder(gsel.NewBuilderRoundRobin()) defer reconnectMutex.RUnlock()
g.Log().Infof(context.Background(), "✅ Consul 初始化成功: %s", consulAddr)
}) if registry == nil {
return false
}
// 尝试获取服务列表来检测连接是否正常
services, err := registry.Search(ctx, gsvc.SearchInput{})
if err != nil {
g.Log().Debugf(ctx, "Consul 健康检查失败: %v", err)
return false
}
g.Log().Debugf(ctx, "✅ Consul 健康检查通过,发现 %d 个服务", len(services))
return true
} }
func init() { func init() {
// 默认自动初始化(保持向后兼容) consulAddr = g.Cfg().MustGet(context.Background(), "consul.address").String()
Init() if consulAddr == "" {
g.Log().Warning(context.Background(), "⚠️ Consul 配置未找到,跳过初始化")
return
}
if err := connectConsul(context.Background()); err != nil {
g.Log().Errorf(context.Background(), "❌ Consul 初始化失败: %v", err)
}
} }
func getLocalIP() (string, error) { func getLocalIP() (string, error) {
// 获取本机所有网络接口 // 获取本机所有网络接口
@@ -69,9 +136,14 @@ func getInstanceAddrByIp(ctx context.Context, ip string, services []gsvc.Service
} }
func GetInstanceAddr(ctx context.Context, name string) (addr string, err error) { func GetInstanceAddr(ctx context.Context, name string) (addr string, err error) {
watch, err := gsvc.GetRegistry().Watch(ctx, name) watch, err := gsvc.GetRegistry().Watch(ctx, name)
if err != nil {
err = errors.New("获取服务监听器失败")
return
}
service, err := watch.Proceed() service, err := watch.Proceed()
if err != nil || service == nil { if err != nil || service == nil {
err = errors.New("获取customerService服务实例失败") err = errors.New("获取服务实例失败")
return return
} }
//优先使用客户端IP获取实例(前后端在同一台机器调试) //优先使用客户端IP获取实例(前后端在同一台机器调试)