// Package consul connects the application to a Consul agent: it installs the
// Consul-backed service registry, loads the service configuration from the
// Consul KV store, hot-reloads that configuration when the key changes, and
// resolves the address of a registered service instance.
package consul

import (
	"context"
	"errors"
	"fmt"
	"net"
	"net/http"
	"sync"
	"time"

	"github.com/gogf/gf/contrib/registry/consul/v2"
	"github.com/gogf/gf/v2/frame/g"
	"github.com/gogf/gf/v2/net/gsel"
	"github.com/gogf/gf/v2/net/gsvc"
	"github.com/gogf/gf/v2/os/gcfg"
	"github.com/gogf/gf/v2/util/grand"
	"github.com/hashicorp/consul/api"
	"github.com/r3labs/diff/v2"
)

var (
	// registry is the service registry created by connectConsul.
	registry gsvc.Registry
	// consulAddr is the Consul agent address read from "consul.address" in init.
	consulAddr string
	// reconnectMutex guards registry and connected inside connectConsul.
	reconnectMutex sync.RWMutex
	// reconnectDone is not used in this file.
	// NOTE(review): presumably used by reconnect logic elsewhere in the package — confirm.
	reconnectDone chan struct{}
	// connected reports whether connectConsul has completed successfully.
	connected bool
	// httpClient is not used in this file.
	// NOTE(review): presumably used by other files in the package — confirm.
	httpClient *http.Client
)

// connectConsul creates the Consul registry for consulAddr, installs it as the
// global gsvc registry and selects the round-robin balancer builder.
//
// It is a no-op when a registry is already connected. The error returned by
// consul.New is logged and returned unchanged.
func connectConsul(ctx context.Context) error {
	reconnectMutex.Lock()
	defer reconnectMutex.Unlock()

	// Already connected: do not connect again.
	if connected && registry != nil {
		return nil
	}

	var err error
	registry, err = consul.New(consul.WithAddress(consulAddr))
	if err != nil {
		g.Log().Errorf(ctx, "❌ Consul 连接失败: %v", err)
		return err
	}

	gsvc.SetRegistry(registry)
	gsel.SetBuilder(gsel.NewBuilderRoundRobin())
	connected = true
	g.Log().Infof(ctx, "✅ Consul 初始化成功: %s", consulAddr)
	return nil
}

// init reads "consul.address" from the local configuration. When it is set,
// init connects the registry and then loads the configuration from Consul;
// when it is empty, all Consul setup is skipped.
func init() {
	ctx := context.Background()
	consulAddr = g.Cfg().MustGet(ctx, "consul.address").String()
	if consulAddr == "" {
		g.Log().Debug(ctx, "📄 [Consul] 配置文件中未设置 consul.address,跳过 Consul 初始化")
		return
	}
	// A registry failure is logged but does not prevent loading the
	// configuration from the KV store.
	if err := connectConsul(ctx); err != nil {
		g.Log().Errorf(ctx, "❌ Consul 初始化失败: %v", err)
	}
	loadConfigFromConsul()
}

// resolveConsulAddress returns the Consul agent address to use for KV access.
//
// The address captured in init is preferred: once the configuration adapter
// has been replaced by content loaded from Consul, "consul.address" may no
// longer be present in g.Cfg(), and reading it again would silently fall back
// to the default address.
func resolveConsulAddress(ctx context.Context) string {
	if consulAddr != "" {
		return consulAddr
	}
	return g.Cfg().MustGet(ctx, "consul.address", "127.0.0.1:8500").String()
}

// newConsulClient builds a Consul API client that talks to address.
func newConsulClient(address string) (*api.Client, error) {
	cfg := api.DefaultConfig()
	cfg.Address = address
	return api.NewClient(cfg)
}

// loadConfigFromConsul loads the configuration stored under
// "config/<server.name>/<server.name>" in the Consul KV store, installs it as
// the global configuration adapter and starts a goroutine that watches the key
// for changes.
//
// When the key cannot be read or is empty, the local configuration file stays
// in effect and no watcher is started. When the content cannot be parsed, the
// local configuration stays in effect but the watcher is still started so that
// a later, corrected value is applied.
func loadConfigFromConsul() {
	ctx := context.Background()

	serviceName := g.Cfg().MustGet(ctx, "server.name", "admin-go").String()
	fmt.Printf("服务名称: %s\n", serviceName)

	consulKey := fmt.Sprintf("config/%s/%s", serviceName, serviceName)
	fmt.Printf("从 Consul 读取配置键: %s\n", consulKey)

	consulData, lastIndex, err := loadFromConsul(consulKey)
	if err != nil || len(consulData) == 0 {
		fmt.Printf("从 Consul 获取配置失败,使用本地配置文件\n")
		return
	}

	adapter, err := gcfg.NewAdapterContent()
	if err != nil {
		fmt.Printf("创建配置适配器失败: %v\n", err)
		return
	}
	// Only replace the adapter once the content is known to parse; otherwise
	// an empty adapter would silently discard the local configuration.
	if err := adapter.SetContent(string(consulData)); err != nil {
		fmt.Printf("解析 Consul 配置内容失败,使用本地配置文件: %v\n", err)
	} else {
		g.Cfg().SetAdapter(adapter)
		fmt.Printf("已从 Consul 成功加载初始配置\n")
	}

	// The watcher runs for the lifetime of the process.
	go watchConsulConfig(consulKey, lastIndex)
}

// loadFromConsul reads consulKey from the Consul KV store.
//
// It returns the raw value and the index reported by the query. When the key
// does not exist it returns (nil, 0, nil). Client creation and query errors are
// printed and returned.
func loadFromConsul(consulKey string) ([]byte, uint64, error) {
	ctx := context.Background()
	address := resolveConsulAddress(ctx)
	fmt.Printf("Consul 地址: %s\n", address)

	client, err := newConsulClient(address)
	if err != nil {
		fmt.Printf("创建 Consul 客户端失败: %v\n", err)
		return nil, 0, err
	}

	opts := &api.QueryOptions{
		WaitIndex: 0,
		WaitTime:  60 * time.Second,
	}
	pair, meta, err := client.KV().Get(consulKey, opts)
	if err != nil {
		fmt.Printf("从 Consul 读取配置失败: %v\n", err)
		return nil, 0, err
	}
	if pair == nil {
		fmt.Printf("Consul 中未找到配置键: %s\n", consulKey)
		return nil, 0, nil
	}

	fmt.Printf("已从 Consul 加载配置, Index: %d\n", meta.LastIndex)
	return pair.Value, meta.LastIndex, nil
}

// watchConsulConfig polls consulKey with blocking queries, starting from
// lastIndex, and applies every new value through updateLocalConfig.
//
// It returns only when the Consul client cannot be created; otherwise it loops
// forever, sleeping 5 seconds after a failed query.
func watchConsulConfig(consulKey string, lastIndex uint64) {
	ctx := context.Background()

	client, err := newConsulClient(resolveConsulAddress(ctx))
	if err != nil {
		fmt.Printf("创建 Consul 监听客户端失败: %v\n", err)
		return
	}
	kv := client.KV()

	for {
		opts := &api.QueryOptions{
			WaitIndex: lastIndex,
			WaitTime:  60 * time.Second,
		}
		pair, meta, err := kv.Get(consulKey, opts)
		if err != nil {
			fmt.Printf("Consul 监听出错: %v, 5秒后重试...\n", err)
			time.Sleep(5 * time.Second)
			continue
		}

		// Same index: the query returned without a change.
		if meta.LastIndex == lastIndex {
			continue
		}
		// The index went backwards. Restart from 0 so the next query returns
		// the current value immediately instead of waiting on a stale index.
		if meta.LastIndex < lastIndex {
			fmt.Printf("Consul 索引回退 (%d -> %d),重置监听索引\n", lastIndex, meta.LastIndex)
			lastIndex = 0
			continue
		}

		if pair == nil {
			fmt.Printf("Consul 配置被删除: %s\n", consulKey)
			lastIndex = meta.LastIndex
			continue
		}

		fmt.Printf("检测到 Consul 配置变更: %s, New Index: %d\n", consulKey, meta.LastIndex)
		updateLocalConfig(pair.Value)
		lastIndex = meta.LastIndex
	}
}

// updateLocalConfig replaces the global configuration with content and prints
// the differences between the previous and the new configuration.
//
// Content that cannot be parsed is rejected and the current configuration is
// kept.
func updateLocalConfig(content []byte) {
	ctx := context.Background()

	oldConfig, err := g.Cfg().Data(ctx)
	if err != nil {
		// The snapshot is only used for the change report, so keep going.
		fmt.Printf("读取当前配置失败: %v\n", err)
	}

	adapter, err := gcfg.NewAdapterContent()
	if err != nil {
		fmt.Printf("创建新配置适配器失败: %v\n", err)
		return
	}
	// Validate before swapping adapters so a malformed update cannot wipe the
	// running configuration.
	if err := adapter.SetContent(string(content)); err != nil {
		fmt.Printf("解析新配置内容失败,保留当前配置: %v\n", err)
		return
	}
	g.Cfg().SetAdapter(adapter)

	newConfig, err := g.Cfg().Data(ctx)
	if err != nil {
		fmt.Printf("读取新配置失败: %v\n", err)
		return
	}

	changelog, err := diff.Diff(oldConfig, newConfig)
	if err != nil {
		fmt.Printf("配置对比失败: %v\n", err)
		return
	}
	if len(changelog) == 0 {
		fmt.Printf("配置已热更新成功(内容无实质变化)\n")
		return
	}

	fmt.Printf("=== 检测到配置变更 (%d 项) ===\n", len(changelog))
	for _, change := range changelog {
		switch change.Type {
		case "create":
			fmt.Printf("[+] 新增: %s = %v\n", change.Path, change.To)
		case "update":
			fmt.Printf("[~] 修改: %s: %v -> %v\n", change.Path, change.From, change.To)
		case "delete":
			fmt.Printf("[-] 删除: %s (原值: %v)\n", change.Path, change.From)
		}
	}
	fmt.Printf("=================================\n")
}

// getLocalIP returns the first non-loopback IPv4 address among the host's
// interface addresses, or an error when none is found.
func getLocalIP() (string, error) {
	addrs, err := net.InterfaceAddrs()
	if err != nil {
		return "", err
	}
	for _, addr := range addrs {
		ipNet, ok := addr.(*net.IPNet)
		if !ok || ipNet.IP.IsLoopback() {
			continue
		}
		if ipNet.IP.To4() != nil {
			return ipNet.IP.String(), nil
		}
	}
	return "", fmt.Errorf("无法找到本地IP地址")
}

// getInstanceAddrByIp returns the first endpoint (as a string) of the first
// service whose first endpoint host equals ip. It returns "" when no service
// matches. Services without endpoints are skipped.
func getInstanceAddrByIp(ctx context.Context, ip string, services []gsvc.Service) (addr string) {
	for _, s := range services {
		if s == nil {
			continue
		}
		endpoints := s.GetEndpoints()
		if len(endpoints) == 0 {
			continue
		}
		if endpoints[0].Host() == ip {
			return endpoints[0].String()
		}
	}
	return ""
}

// servicesWithEndpoints returns the services that expose at least one endpoint.
func servicesWithEndpoints(services []gsvc.Service) []gsvc.Service {
	usable := make([]gsvc.Service, 0, len(services))
	for _, s := range services {
		if s != nil && len(s.GetEndpoints()) > 0 {
			usable = append(usable, s)
		}
	}
	return usable
}

// GetInstanceAddr resolves the address of one instance of the service name.
//
// Instances are chosen in this order:
//  1. an instance whose host equals the client IP of the request carried by
//     ctx (frontend and backend debugged on the same machine);
//  2. an instance whose host equals this machine's IP (frontend and backend
//     debugged on different machines);
//  3. a random instance.
//
// It returns an error when no registry is set, when the watcher cannot be
// created or read, when no instance with an endpoint exists, or when the local
// IP cannot be determined.
func GetInstanceAddr(ctx context.Context, name string) (addr string, err error) {
	reg := gsvc.GetRegistry()
	if reg == nil {
		// No registry is installed, e.g. because consul.address is not set.
		return "", errors.New("获取服务监听器失败")
	}

	watch, err := reg.Watch(ctx, name)
	if err != nil {
		return "", fmt.Errorf("获取服务监听器失败: %w", err)
	}
	defer watch.Close()

	services, err := watch.Proceed()
	if err != nil {
		return "", fmt.Errorf("获取服务实例失败: %w", err)
	}
	candidates := servicesWithEndpoints(services)
	if len(candidates) == 0 {
		return "", errors.New("获取服务实例失败")
	}

	// Prefer the instance on the client's IP. ctx may not carry an HTTP
	// request, in which case this step is skipped.
	if req := g.RequestFromCtx(ctx); req != nil {
		if addr = getInstanceAddrByIp(ctx, req.GetClientIp(), candidates); addr != "" {
			return addr, nil
		}
	}

	// Then prefer the instance on the same IP as this process.
	localIP, err := getLocalIP()
	if err != nil {
		return "", err
	}
	if addr = getInstanceAddrByIp(ctx, localIP, candidates); addr != "" {
		return addr, nil
	}

	// Otherwise pick a random instance.
	picked := candidates[grand.N(0, len(candidates)-1)]
	return picked.GetEndpoints()[0].String(), nil
}