diff --git a/middleware/circuit_breaker.go b/middleware/circuit_breaker.go index 4ca4f0e..913c581 100644 --- a/middleware/circuit_breaker.go +++ b/middleware/circuit_breaker.go @@ -4,6 +4,7 @@ import ( "context" "fmt" "net" + "strconv" "strings" "sync" "sync/atomic" @@ -19,130 +20,201 @@ import ( type CircuitBreakerState string const ( - StateClosed CircuitBreakerState = "closed" // 关闭:正常状态 - StateOpen CircuitBreakerState = "open" // 开启:熔断状态 + StateClosed CircuitBreakerState = "closed" + StateOpen CircuitBreakerState = "open" + StateHalfOpen CircuitBreakerState = "halfopen" ) // 熔断器状态常量(用于atomic.Int64) const ( - stateClosed int64 = 0 - stateOpen int64 = 1 + stateClosed int64 = 0 + stateOpen int64 = 1 + stateHalfOpen int64 = 2 ) -// getState 获取熔断器状态字符串 -func (cb *CircuitBreakerInfo) getState() CircuitBreakerState { - if cb.State.Load() == stateOpen { - return StateOpen - } - return StateClosed -} - -// setState 设置熔断器状态(atomic操作,无锁) -func (cb *CircuitBreakerInfo) setState(state CircuitBreakerState) CircuitBreakerState { - var newState int64 - if state == StateOpen { - newState = stateOpen - } else { - newState = stateClosed - } - oldState := cb.State.Swap(newState) - if oldState == stateOpen { - return StateOpen - } - return StateClosed -} - // CircuitBreakerConfig 熔断器配置 type CircuitBreakerConfig struct { - Enabled bool // 是否启用熔断器 - MaxFailures int // 连续失败次数 - Timeout string // 熔断超时时间 - TimeoutParsed time.Duration // 缓存的超时时间(性能优化) - SuccessStatusCodes []int // 视为成功的HTTP状态码 - SlowRequestThreshold string // 慢请求阈值 - SlowRequestThresholdParsed time.Duration // 缓存的慢请求阈值(性能优化) - EnableSlidingWindow bool // 是否启用滑动窗口 - FailureRateThreshold float64 // 失败率阈值 - EnableFallback bool // 是否启用降级 - FallbackMessage string // 降级提示消息 - RequestTimeout int // 请求超时时间(毫秒) - DistributedTTL int // 分布式熔断状态TTL(秒) - AdminIPs []string // 允许重置熔断器的管理员IP列表 - StatIntervalMs int // 统计窗口时长(毫秒),默认1000ms - MinRequestAmount int // 最小请求数量,默认与MaxFailures相同 - AdminCIDRs []string // 允许重置熔断器的管理员CIDR列表(P0:支持IP段) - // P1:预编译的CIDR网络掩码(性能优化) - CIDRNetMasks []*net.IPNet + Enabled bool + MaxFailures int + Timeout string + TimeoutParsed time.Duration + SuccessStatusCodes []int + SlowRequestThreshold string + SlowRequestThresholdParsed time.Duration + EnableSlidingWindow bool + FailureRateThreshold float64 + EnableFallback bool + FallbackMessage string + RequestTimeout int + DistributedTTL int + AdminIPs []string + StatIntervalMs int + MinRequestAmount int + AdminCIDRs []string + HalfOpenMaxRequests int + HalfOpenSuccessThreshold float64 + WarmupDuration string + WarmupDurationParsed time.Duration + EnableAdaptiveThreshold bool + AdaptiveMinThreshold float64 + AdaptiveMaxThreshold float64 + CIDRNetMasks []*net.IPNet } // CircuitBreakerMetrics 熔断器指标 type CircuitBreakerMetrics struct { - TotalRequests atomic.Int64 // 总请求数 - PassRequests atomic.Int64 // 通过请求数 - BlockRequests atomic.Int64 // 阻塞请求数 - FailureRequests atomic.Int64 // 失败请求数 - SlowRequests atomic.Int64 // 慢请求数(P2:可观测性) - OpenCount atomic.Int64 // 熔断开启次数 - LastResetTime atomic.Int64 // 上次重置时间(Unix时间戳) - // 使用atomic.Int64实现简单的时间戳存储,避免使用mutex - LastOpenTime atomic.Int64 // 上次熔断时间(Unix时间戳) - NextRetryTime atomic.Int64 // 下次重试时间(Unix时间戳) + // 请求统计 + TotalRequests atomic.Int64 + PassRequests atomic.Int64 + BlockRequests atomic.Int64 + FailureRequests atomic.Int64 + SlowRequests atomic.Int64 + + // 状态统计 + OpenCount atomic.Int64 + ClosedCount atomic.Int64 + HalfOpenCount atomic.Int64 + + // 时间戳 + LastResetTime atomic.Int64 + LastOpenTime atomic.Int64 + NextRetryTime atomic.Int64 + LastCloseTime atomic.Int64 + LastHalfOpenTime atomic.Int64 + + // 半开状态统计 + HalfOpenRequests atomic.Int64 + HalfOpenPassed atomic.Int64 + HalfOpenFailed atomic.Int64 + + // 性能指标 + TotalResponseTime atomic.Int64 // 总响应时间(纳秒) + MinResponseTime atomic.Int64 // 最小响应时间(纳秒) + MaxResponseTime atomic.Int64 // 最大响应时间(纳秒) + + // 窗口统计(用于计算成功率等) + WindowStartTime atomic.Int64 // 统计窗口开始时间 + WindowRequests atomic.Int64 // 窗口内请求总数 + WindowFailures atomic.Int64 // 窗口内失败数 } // CircuitBreakerInfo 熔断器信息 type CircuitBreakerInfo struct { - ResourceName string `json:"resourceName"` // 资源名称 - State atomic.Int64 `json:"state"` // 当前状态(0:closed, 1:open),使用atomic避免mutex - Config *CircuitBreakerConfig `json:"config"` // 配置信息 - Metrics *CircuitBreakerMetrics `json:"metrics"` // 指标统计 - // 预编译的成功状态码集合(P1:性能优化) - SuccessCodeMap map[int]bool - // P1:预编译的CIDR网络掩码(避免重复解析) - CIDRNetMasks []*net.IPNet + ResourceName string + State atomic.Int64 + Config *CircuitBreakerConfig + Metrics *CircuitBreakerMetrics + SuccessCodeMap map[int]bool + CIDRNetMasks []*net.IPNet + AdaptiveThreshold float64 + WarmupEndTime int64 } var ( - // circuitBreakers 存储所有熔断器状态(用于健康检查) - circuitBreakers sync.Map - // circuitBreakerConfigs 熔断器配置缓存 - circuitBreakerConfigs sync.Map - // distributedSyncLocks 分布式同步锁(按服务名分片) - distributedSyncLocks sync.Map - // stateChangeListeners 状态变化监听器 - stateChangeListeners sync.Map - // stateChangeListenersRegistered 默认监听器是否已注册 + circuitBreakers sync.Map + circuitBreakerConfigs sync.Map + stateChangeListeners sync.Map stateChangeListenersRegistered sync.Map - // P1:使用map代替slice优化IP查找性能 - allowedAdminIPsMap map[string]bool - // allowedAdminIPsMutex 保护白名单缓存的并发访问 - allowedAdminIPsMutex sync.RWMutex - // P1:预编译的CIDR网络掩码列表 - allowedAdminCIDRs []*net.IPNet - // allowedAdminCIDRsMutex 保护CIDR列表的并发访问 - allowedAdminCIDRsMutex sync.RWMutex - // totalServicesCount 缓存总服务数(P1:性能优化) - totalServicesCount atomic.Int64 + allowedAdminIPsMap map[string]bool + allowedAdminIPsMutex sync.RWMutex + allowedAdminCIDRs []*net.IPNet + allowedAdminCIDRsMutex sync.RWMutex + totalServicesCount atomic.Int64 + serviceNamesSlice []string + serviceNamesMutex sync.RWMutex ) +// 默认值常量 +const ( + defaultMaxFailures = 5 + defaultTimeout = "60s" + defaultSlowRequestThreshold = "3s" + defaultStatIntervalMs = 1000 + defaultRequestTimeout = 30000 + defaultDistributedTTL = 300 + defaultHalfOpenMaxRequests = 5 + defaultWarmupDuration = "10s" + defaultHalfOpenSuccessThreshold = 0.5 +) + +// getState 获取熔断器状态 +func (cb *CircuitBreakerInfo) getState() CircuitBreakerState { + switch cb.State.Load() { + case stateOpen: + return StateOpen + case stateHalfOpen: + return StateHalfOpen + default: + return StateClosed + } +} + +// setState 设置熔断器状态 +func (cb *CircuitBreakerInfo) setState(state CircuitBreakerState) CircuitBreakerState { + return cb.setStateWithMetrics(state, true) +} + +// setStateWithMetrics 设置熔断器状态并更新指标 +func (cb *CircuitBreakerInfo) setStateWithMetrics(state CircuitBreakerState, updateMetrics bool) CircuitBreakerState { + var newState int64 + switch state { + case StateOpen: + newState = stateOpen + case StateHalfOpen: + newState = stateHalfOpen + default: + newState = stateClosed + } + + oldState := cb.State.Swap(newState) + var oldStateEnum CircuitBreakerState + + switch oldState { + case stateOpen: + oldStateEnum = StateOpen + case stateHalfOpen: + oldStateEnum = StateHalfOpen + default: + oldStateEnum = StateClosed + } + + // 如果状态发生了变化且需要更新指标 + if oldStateEnum != state && updateMetrics { + now := time.Now().Unix() + + // 根据新状态更新计数器 + switch state { + case StateOpen: + cb.Metrics.OpenCount.Add(1) + cb.Metrics.LastOpenTime.Store(now) + case StateClosed: + cb.Metrics.ClosedCount.Add(1) + cb.Metrics.LastCloseTime.Store(now) + case StateHalfOpen: + cb.Metrics.HalfOpenCount.Add(1) + cb.Metrics.LastHalfOpenTime.Store(now) + } + + // 设置下一次重试时间(如果是打开状态) + if state == StateOpen { + cb.Metrics.NextRetryTime.Store(time.Now().Add(cb.Config.TimeoutParsed).Unix()) + } + } + + return oldStateEnum +} + // InitCircuitBreaker 初始化Sentinel熔断器 func InitCircuitBreaker() error { ctx := context.Background() - - // 初始化Sentinel - err := api.InitDefault() - if err != nil { + if err := api.InitDefault(); err != nil { return fmt.Errorf("sentinel初始化失败: %v", err) } - // 注册熔断器状态变化监听器 registerStateChangeListeners() - g.Log().Infof(ctx, "Sentinel熔断器初始化成功") - // 扫描配置文件中所有配置了熔断器的服务 services := g.Cfg().MustGet(ctx, "circuitBreaker").Map() - - // 过滤掉非服务配置的key serviceNames := filterServiceNames(services) if len(serviceNames) == 0 { @@ -150,402 +222,423 @@ func InitCircuitBreaker() error { return nil } - // P1:缓存总服务数 totalServicesCount.Store(int64(len(serviceNames))) + serviceNamesMutex.Lock() + serviceNamesSlice = serviceNames + serviceNamesMutex.Unlock() - // 为每个服务创建熔断器 enabledCount := 0 for _, serviceName := range serviceNames { - serviceConfig := loadServiceCircuitBreakerConfig(serviceName) - if serviceConfig != nil && serviceConfig.Enabled { - circuitBreakerConfigs.Store(serviceName, serviceConfig) - initErr := initServiceCircuitBreaker(serviceName, serviceConfig) - if initErr != nil { - g.Log().Errorf(ctx, "服务 %s 熔断器初始化失败: %v", serviceName, initErr) + config := loadServiceCircuitBreakerConfig(serviceName) + if config != nil && config.Enabled { + circuitBreakerConfigs.Store(serviceName, config) + if err := initServiceCircuitBreaker(serviceName, config); err != nil { + g.Log().Errorf(ctx, "服务 %s 熔断器初始化失败: %v", serviceName, err) } else { g.Log().Infof(ctx, "服务 %s 熔断器初始化成功", serviceName) enabledCount++ } - } else { - g.Log().Infof(ctx, "服务 %s 熔断器未启用", serviceName) } } - // P1:更新管理员IP白名单缓存(在所有服务配置加载完成后) updateAdminIPsCache() - g.Log().Infof(ctx, "共初始化 %d 个服务熔断器,其中 %d 个已启用", len(serviceNames), enabledCount) return nil } // ReloadCircuitBreakerConfig 动态重新加载熔断器配置 func ReloadCircuitBreakerConfig(serviceName string) error { - ctx := context.Background() - - // 重新加载配置 - serviceConfig := loadServiceCircuitBreakerConfig(serviceName) - if serviceConfig == nil { + config := loadServiceCircuitBreakerConfig(serviceName) + if config == nil { return fmt.Errorf("未找到服务 %s 的配置", serviceName) } + if err := validateCircuitBreakerConfig(config); err != nil { + return fmt.Errorf("配置验证失败: %v", err) + } - // 更新配置缓存 - circuitBreakerConfigs.Store(serviceName, serviceConfig) + oldConfig, _ := circuitBreakerConfigs.Load(serviceName) + circuitBreakerConfigs.Store(serviceName, config) - // 重新初始化熔断器 - err := initServiceCircuitBreaker(serviceName, serviceConfig) - if err != nil { + if err := initServiceCircuitBreaker(serviceName, config); err != nil { + if oldConfig != nil { + circuitBreakerConfigs.Store(serviceName, oldConfig) + } return fmt.Errorf("重新初始化熔断器失败: %v", err) } - g.Log().Infof(ctx, "服务 %s 熔断器配置重新加载成功", serviceName) + g.Log().Infof(context.Background(), "服务 %s 熔断器配置重新加载成功", serviceName) return nil } -// loadServiceCircuitBreakerConfig 加载单个服务的熔断器配置 +// loadServiceCircuitBreakerConfig 加载配置 func loadServiceCircuitBreakerConfig(serviceName string) *CircuitBreakerConfig { ctx := context.Background() - key := fmt.Sprintf("circuitBreaker.%s", serviceName) + key := "circuitBreaker." + serviceName - enabled := g.Cfg().MustGet(ctx, key+".enabled", true).Bool() - maxFailures := g.Cfg().MustGet(ctx, key+".maxFailures", 5).Int() - timeout := g.Cfg().MustGet(ctx, key+".timeout", "60s").String() - slowRequestThreshold := g.Cfg().MustGet(ctx, key+".slowRequestThreshold", "3s").String() - enableSlidingWindow := g.Cfg().MustGet(ctx, key+".enableSlidingWindow", false).Bool() - failureRateThreshold := g.Cfg().MustGet(ctx, key+".failureRateThreshold", 0.5).Float64() - enableFallback := g.Cfg().MustGet(ctx, key+".enableFallback", false).Bool() - fallbackMessage := g.Cfg().MustGet(ctx, key+".fallbackMessage", "").String() - requestTimeout := g.Cfg().MustGet(ctx, key+".requestTimeout", 30000).Int() - distributedTTL := g.Cfg().MustGet(ctx, key+".distributedTTL", 300).Int() - adminIPs := g.Cfg().MustGet(ctx, key+".adminIPs", "").String() - adminCIDRs := g.Cfg().MustGet(ctx, key+".adminCIDRs", "").String() // P0:支持CIDR - statIntervalMs := g.Cfg().MustGet(ctx, key+".statIntervalMs", 1000).Int() - minRequestAmount := g.Cfg().MustGet(ctx, key+".minRequestAmount", 0).Int() + config := &CircuitBreakerConfig{ + Enabled: g.Cfg().MustGet(ctx, key+".enabled", true).Bool(), + MaxFailures: g.Cfg().MustGet(ctx, key+".maxFailures", defaultMaxFailures).Int(), + Timeout: g.Cfg().MustGet(ctx, key+".timeout", defaultTimeout).String(), + SlowRequestThreshold: g.Cfg().MustGet(ctx, key+".slowRequestThreshold", defaultSlowRequestThreshold).String(), + EnableSlidingWindow: g.Cfg().MustGet(ctx, key+".enableSlidingWindow", false).Bool(), + FailureRateThreshold: g.Cfg().MustGet(ctx, key+".failureRateThreshold", 0.5).Float64(), + EnableFallback: g.Cfg().MustGet(ctx, key+".enableFallback", false).Bool(), + FallbackMessage: g.Cfg().MustGet(ctx, key+".fallbackMessage", "").String(), + RequestTimeout: g.Cfg().MustGet(ctx, key+".requestTimeout", defaultRequestTimeout).Int(), + DistributedTTL: g.Cfg().MustGet(ctx, key+".distributedTTL", defaultDistributedTTL).Int(), + StatIntervalMs: g.Cfg().MustGet(ctx, key+".statIntervalMs", defaultStatIntervalMs).Int(), + HalfOpenMaxRequests: g.Cfg().MustGet(ctx, key+".halfOpenMaxRequests", defaultHalfOpenMaxRequests).Int(), + HalfOpenSuccessThreshold: g.Cfg().MustGet(ctx, key+".halfOpenSuccessThreshold", defaultHalfOpenSuccessThreshold).Float64(), + WarmupDuration: g.Cfg().MustGet(ctx, key+".warmupDuration", defaultWarmupDuration).String(), + EnableAdaptiveThreshold: g.Cfg().MustGet(ctx, key+".enableAdaptiveThreshold", false).Bool(), + AdaptiveMinThreshold: g.Cfg().MustGet(ctx, key+".adaptiveMinThreshold", 0.3).Float64(), + AdaptiveMaxThreshold: g.Cfg().MustGet(ctx, key+".adaptiveMaxThreshold", 0.7).Float64(), + } - // 解析成功状态码(使用map用于快速查找) + config.MinRequestAmount = g.Cfg().MustGet(ctx, key+".minRequestAmount", 0).Int() + if config.MinRequestAmount == 0 { + config.MinRequestAmount = config.MaxFailures + } + + // 解析时间 - 使用默认值处理解析错误 + timeoutParsed, err := time.ParseDuration(config.Timeout) + if err != nil { + g.Log().Warningf(ctx, "解析timeout失败: %s, 使用默认值 %s, error: %v", config.Timeout, defaultTimeout, err) + timeoutParsed, _ = time.ParseDuration(defaultTimeout) + config.Timeout = defaultTimeout + } + config.TimeoutParsed = timeoutParsed + + slowThresholdParsed, err := time.ParseDuration(config.SlowRequestThreshold) + if err != nil { + g.Log().Warningf(ctx, "解析slowRequestThreshold失败: %s, 使用默认值 %s, error: %v", config.SlowRequestThreshold, defaultSlowRequestThreshold, err) + slowThresholdParsed, _ = time.ParseDuration(defaultSlowRequestThreshold) + config.SlowRequestThreshold = defaultSlowRequestThreshold + } + config.SlowRequestThresholdParsed = slowThresholdParsed + + warmupParsed, err := time.ParseDuration(config.WarmupDuration) + if err != nil { + g.Log().Warningf(ctx, "解析warmupDuration失败: %s, 使用默认值 %s, error: %v", config.WarmupDuration, defaultWarmupDuration, err) + warmupParsed, _ = time.ParseDuration(defaultWarmupDuration) + config.WarmupDuration = defaultWarmupDuration + } + config.WarmupDurationParsed = warmupParsed + + // 解析状态码 successCodes := g.Cfg().MustGet(ctx, key+".successStatusCodes", "200,201,204").String() - statusCodes := parseStatusCodesSlice(successCodes) + config.SuccessStatusCodes = parseIntSlice(successCodes) - // 解析时间(缓存结果,性能优化) - timeoutParsed, err := time.ParseDuration(timeout) - if err != nil { - timeoutParsed = 60 * time.Second - g.Log().Warningf(ctx, "服务 %s 的 timeout 解析失败,使用默认值: %v", serviceName, err) - } + // 解析IP和CIDR + config.AdminIPs = parseStrings(g.Cfg().MustGet(ctx, key+".adminIPs", "").String()) + config.AdminCIDRs = parseStrings(g.Cfg().MustGet(ctx, key+".adminCIDRs", "").String()) + config.CIDRNetMasks, _ = parseCIDRs(config.AdminCIDRs) - slowRequestThresholdParsed, err := time.ParseDuration(slowRequestThreshold) - if err != nil { - slowRequestThresholdParsed = 3 * time.Second - g.Log().Warningf(ctx, "服务 %s 的 slowRequestThreshold 解析失败,使用默认值: %v", serviceName, err) - } - - // 解析管理员IP列表 - adminIPList := parseAdminIPs(adminIPs) - - // P1:预编译CIDR为net.IPNet(支持IPv4和IPv6) - cidrNets, err := parseAdminCIDRs(adminCIDRs) - if err != nil { - g.Log().Warningf(ctx, "服务 %s 的 adminCIDRs 解析失败: %v", serviceName, err) - cidrNets = nil - } - - // 如果minRequestAmount未配置,则使用maxFailures作为默认值 - if minRequestAmount == 0 { - minRequestAmount = maxFailures - } - - return &CircuitBreakerConfig{ - Enabled: enabled, - MaxFailures: maxFailures, - Timeout: timeout, - TimeoutParsed: timeoutParsed, - SuccessStatusCodes: statusCodes, - SlowRequestThreshold: slowRequestThreshold, - SlowRequestThresholdParsed: slowRequestThresholdParsed, - EnableSlidingWindow: enableSlidingWindow, - FailureRateThreshold: failureRateThreshold, - EnableFallback: enableFallback, - FallbackMessage: fallbackMessage, - RequestTimeout: requestTimeout, - DistributedTTL: distributedTTL, - AdminIPs: adminIPList, - CIDRNetMasks: cidrNets, // P1:预编译的CIDR网络掩码 - StatIntervalMs: statIntervalMs, - MinRequestAmount: minRequestAmount, - } + return config } -// parseStatusCodes 解析HTTP状态码(返回map用于快速查找) -func parseStatusCodes(str string) map[int]bool { +// parseIntSlice 解析整数切片 +func parseIntSlice(str string) []int { parts := strings.Split(str, ",") - codeMap := make(map[int]bool, len(parts)) + result := make([]int, 0, len(parts)) for _, part := range parts { - var code int - if _, err := fmt.Sscanf(strings.TrimSpace(part), "%d", &code); err == nil { - codeMap[code] = true + if val, err := strconv.Atoi(strings.TrimSpace(part)); err == nil { + result = append(result, val) } } - return codeMap + return result } -// parseStatusCodesSlice 解析HTTP状态码(返回切片用于配置) -func parseStatusCodesSlice(str string) []int { - parts := strings.Split(str, ",") - codes := make([]int, 0, len(parts)) - for _, part := range parts { - var code int - if _, err := fmt.Sscanf(strings.TrimSpace(part), "%d", &code); err == nil { - codes = append(codes, code) - } - } - return codes -} - -// parseAdminCIDRs 解析管理员CIDR列表(P1:预编译为net.IPNet,P0:支持IPv6) -func parseAdminCIDRs(str string) ([]*net.IPNet, error) { +// parseStrings 解析字符串切片 +func parseStrings(str string) []string { if str == "" { - return nil, nil + return nil } parts := strings.Split(str, ",") - nets := make([]*net.IPNet, 0, len(parts)) + result := make([]string, 0, len(parts)) for _, part := range parts { - cidr := strings.TrimSpace(part) - if cidr != "" { - // 使用net.ParseCIDR解析CIDR(支持IPv4和IPv6) - _, ipNet, err := net.ParseCIDR(cidr) - if err != nil { - return nil, fmt.Errorf("解析CIDR失败: %s, error: %v", cidr, err) + if trimmed := strings.TrimSpace(part); trimmed != "" { + result = append(result, trimmed) + } + } + return result +} + +// parseCIDRs 解析CIDR列表 +func parseCIDRs(strs []string) ([]*net.IPNet, error) { + nets := make([]*net.IPNet, 0, len(strs)) + for _, s := range strs { + if s == "*" { + if _, ipv4Net, err := net.ParseCIDR("0.0.0.0/0"); err == nil { + nets = append(nets, ipv4Net) } + if _, ipv6Net, err := net.ParseCIDR("::/0"); err == nil { + nets = append(nets, ipv6Net) + } + continue + } + if _, ipNet, err := net.ParseCIDR(s); err == nil { nets = append(nets, ipNet) } } return nets, nil } -// updateAdminIPsCache 更新管理员IP白名单缓存(P1:使用map优化性能,P1:预编译CIDR) -func updateAdminIPsCache() { - ipMap := make(map[string]bool) - cidrNets := make([]*net.IPNet, 0) - - // 收集所有服务的adminIPs配置 - circuitBreakerConfigs.Range(func(key, value interface{}) bool { - config := value.(*CircuitBreakerConfig) - if len(config.AdminIPs) > 0 { - for _, ip := range config.AdminIPs { - if !ipMap[ip] { - ipMap[ip] = true - } - } - } - // P1:使用预编译的CIDR网络掩码 - if len(config.CIDRNetMasks) > 0 { - for _, cidrNet := range config.CIDRNetMasks { - cidrNets = append(cidrNets, cidrNet) - } - } - return true - }) - - // 更新缓存 - allowedAdminIPsMutex.Lock() - allowedAdminIPsMap = ipMap - allowedAdminIPsMutex.Unlock() - - allowedAdminCIDRsMutex.Lock() - allowedAdminCIDRs = cidrNets - allowedAdminCIDRsMutex.Unlock() +// newCircuitBreakerMetrics 创建并初始化熔断器指标 +func newCircuitBreakerMetrics() *CircuitBreakerMetrics { + metrics := &CircuitBreakerMetrics{ + MinResponseTime: atomic.Int64{}, + MaxResponseTime: atomic.Int64{}, + } + metrics.MinResponseTime.Store(1<<63 - 1) // 最大int64值作为初始最小值 + return metrics } -// isIPInCIDR 检查IP是否在CIDR范围内(P1:使用预编译的net.IPNet,P0:支持IPv6) -func isIPInCIDR(ipStr string, cidrNet *net.IPNet) bool { - ip := net.ParseIP(ipStr) - if ip == nil { - return false - } - return cidrNet.Contains(ip) -} +// updateWindowStats 更新窗口统计信息 +func (cb *CircuitBreakerInfo) updateWindowStats(isSuccess bool, ctx context.Context) { + now := time.Now().Unix() + windowStart := cb.Metrics.WindowStartTime.Load() -// parseAdminIPs 解析管理员IP列表 -func parseAdminIPs(str string) []string { - if str == "" { - return nil + // 默认窗口大小为60秒 + windowSize := int64(60) + + // 如果超过窗口大小,重置统计 + if now-windowStart >= windowSize { + // 使用原子操作重置窗口 + if cb.Metrics.WindowStartTime.CompareAndSwap(windowStart, now) { + cb.Metrics.WindowRequests.Store(0) + cb.Metrics.WindowFailures.Store(0) + } + // 重新获取最新的windowStart + windowStart = cb.Metrics.WindowStartTime.Load() } - parts := strings.Split(str, ",") - ips := make([]string, 0, len(parts)) - for _, part := range parts { - ip := strings.TrimSpace(part) - if ip != "" { - ips = append(ips, ip) + + // 原子更新窗口内请求总数 + cb.Metrics.WindowRequests.Add(1) + if !isSuccess { + cb.Metrics.WindowFailures.Add(1) + } + + // 计算当前窗口内的成功率 + total := cb.Metrics.WindowRequests.Load() + failures := cb.Metrics.WindowFailures.Load() + if total > 0 { + successRate := float64(total-failures) / float64(total) + if successRate < 0.5 && total >= 10 { // 如果成功率低于50%且有足够样本 + g.Log().Warningf(ctx, "熔断器 %s 窗口内成功率较低: %.2f%%, total=%d, failures=%d", + cb.ResourceName, successRate*100, total, failures) } } - return ips } -// filterServiceNames 过滤服务名(排除非服务配置的key) -func filterServiceNames(services map[string]interface{}) []string { - excludeKeys := map[string]bool{ - "services": true, - "enableDistributed": true, - "requestTimeout": true, - "distributedTTL": true, +// validateCircuitBreakerConfig 验证配置 +func validateCircuitBreakerConfig(config *CircuitBreakerConfig) error { + if config.MaxFailures <= 0 { + return fmt.Errorf("maxFailures必须大于0") } - - serviceNames := make([]string, 0, len(services)) - for key := range services { - if !excludeKeys[key] { - serviceNames = append(serviceNames, key) + if config.FailureRateThreshold < 0 || config.FailureRateThreshold > 1 { + return fmt.Errorf("failureRateThreshold必须在0.0-1.0之间") + } + if len(config.SuccessStatusCodes) == 0 { + return fmt.Errorf("successStatusCodes不能为空") + } + if config.RequestTimeout < 0 || config.RequestTimeout > 300000 { + return fmt.Errorf("requestTimeout必须在0-300000毫秒之间") + } + if config.DistributedTTL < 0 || config.DistributedTTL > 3600 { + return fmt.Errorf("distributedTTL必须在0-3600秒之间") + } + if config.StatIntervalMs < 100 || config.StatIntervalMs > 60000 { + return fmt.Errorf("statIntervalMs必须在100-60000毫秒之间") + } + if config.MinRequestAmount < 1 || config.MinRequestAmount > 10000 { + return fmt.Errorf("minRequestAmount必须在1-10000之间") + } + if config.HalfOpenMaxRequests < 1 || config.HalfOpenMaxRequests > 100 { + return fmt.Errorf("halfOpenMaxRequests必须在1-100之间") + } + if config.HalfOpenSuccessThreshold < 0 || config.HalfOpenSuccessThreshold > 1 { + return fmt.Errorf("halfOpenSuccessThreshold必须在0.0-1.0之间") + } + if config.EnableAdaptiveThreshold { + if config.AdaptiveMinThreshold < 0 || config.AdaptiveMinThreshold > 1 { + return fmt.Errorf("adaptiveMinThreshold必须在0.0-1.0之间") + } + if config.AdaptiveMaxThreshold < 0 || config.AdaptiveMaxThreshold > 1 { + return fmt.Errorf("adaptiveMaxThreshold必须在0.0-1.0之间") + } + if config.AdaptiveMinThreshold >= config.AdaptiveMaxThreshold { + return fmt.Errorf("adaptiveMinThreshold必须小于adaptiveMaxThreshold") } } - return serviceNames + return nil } // initServiceCircuitBreaker 初始化服务熔断器 func initServiceCircuitBreaker(serviceName string, config *CircuitBreakerConfig) error { - // 验证配置参数 if err := validateCircuitBreakerConfig(config); err != nil { - return fmt.Errorf("配置验证失败: %v", err) + return err } - // 使用缓存的时间值(性能优化) - timeout := config.TimeoutParsed - slowRequestThreshold := config.SlowRequestThresholdParsed - - resourceName := fmt.Sprintf("service:%s", serviceName) + resourceName := "service:" + serviceName + threshold := config.FailureRateThreshold + if config.EnableAdaptiveThreshold { + threshold = (config.AdaptiveMinThreshold + config.AdaptiveMaxThreshold) / 2 + } var rule []*circuitbreaker.Rule if config.EnableSlidingWindow { - // 使用滑动窗口统计(更精确)- 慢调用比例策略 - rule = []*circuitbreaker.Rule{ - { - Resource: resourceName, - Strategy: circuitbreaker.SlowRequestRatio, - RetryTimeoutMs: uint32(timeout.Milliseconds()), - MinRequestAmount: uint64(config.MinRequestAmount), - StatIntervalMs: uint32(config.StatIntervalMs), - StatSlidingWindowBucketCount: 10, - MaxAllowedRtMs: uint64(slowRequestThreshold.Milliseconds()), - Threshold: config.FailureRateThreshold, - }, - } + rule = []*circuitbreaker.Rule{{ + Resource: resourceName, + Strategy: circuitbreaker.SlowRequestRatio, + RetryTimeoutMs: uint32(config.TimeoutParsed.Milliseconds()), + MinRequestAmount: uint64(config.MinRequestAmount), + StatIntervalMs: uint32(config.StatIntervalMs), + StatSlidingWindowBucketCount: 10, + MaxAllowedRtMs: uint64(config.SlowRequestThresholdParsed.Milliseconds()), + Threshold: threshold, + }} } else { - // 使用连续失败计数(更简单快速)- 异常数策略 - rule = []*circuitbreaker.Rule{ - { - Resource: resourceName, - Strategy: circuitbreaker.ErrorCount, - RetryTimeoutMs: uint32(timeout.Milliseconds()), - MinRequestAmount: uint64(config.MinRequestAmount), - StatIntervalMs: uint32(config.StatIntervalMs), - Threshold: float64(config.MaxFailures), - }, - } + rule = []*circuitbreaker.Rule{{ + Resource: resourceName, + Strategy: circuitbreaker.ErrorCount, + RetryTimeoutMs: uint32(config.TimeoutParsed.Milliseconds()), + MinRequestAmount: uint64(config.MinRequestAmount), + StatIntervalMs: uint32(config.StatIntervalMs), + Threshold: float64(config.MaxFailures), + }} } - // 先清理旧规则(健壮性改进) - _, _ = circuitbreaker.LoadRulesOfResource(resourceName, []*circuitbreaker.Rule{}) - - // 加载新规则到Sentinel - _, err := circuitbreaker.LoadRules(rule) - if err != nil { + if _, err := circuitbreaker.LoadRulesOfResource(resourceName, []*circuitbreaker.Rule{}); err != nil { + return fmt.Errorf("清空熔断规则失败: %v", err) + } + if _, err := circuitbreaker.LoadRules(rule); err != nil { return fmt.Errorf("加载熔断规则失败: %v", err) } - // 初始化熔断器信息(P1:直接从slice构建map,避免重复解析) successCodeMap := make(map[int]bool, len(config.SuccessStatusCodes)) for _, code := range config.SuccessStatusCodes { successCodeMap[code] = true } cbInfo := &CircuitBreakerInfo{ - ResourceName: resourceName, - Config: config, - Metrics: &CircuitBreakerMetrics{}, - SuccessCodeMap: successCodeMap, - CIDRNetMasks: config.CIDRNetMasks, + ResourceName: resourceName, + Config: config, + Metrics: newCircuitBreakerMetrics(), + SuccessCodeMap: successCodeMap, + CIDRNetMasks: config.CIDRNetMasks, + AdaptiveThreshold: threshold, + WarmupEndTime: time.Now().Add(config.WarmupDurationParsed).Unix(), } cbInfo.State.Store(stateClosed) cbInfo.Metrics.LastResetTime.Store(time.Now().Unix()) + cbInfo.Metrics.LastCloseTime.Store(time.Now().Unix()) + cbInfo.Metrics.WindowStartTime.Store(time.Now().Unix()) + + // 初始化响应时间统计 + cbInfo.Metrics.MinResponseTime.Store(1<<63 - 1) // 最大int64值作为初始最小值 circuitBreakers.Store(serviceName, cbInfo) strategy := "error_count" if config.EnableSlidingWindow { strategy = "slow_ratio" } - g.Log().Infof(context.Background(), "服务 %s 熔断器初始化成功: resource=%s, strategy=%s, timeout=%v, threshold=%.2f", - serviceName, resourceName, strategy, timeout, rule[0].Threshold) + serviceName, resourceName, strategy, config.TimeoutParsed, rule[0].Threshold) return nil } -// CircuitBreakerMiddleware 熔断降级中间件(使用阿里Sentinel) +// CircuitBreakerMiddleware 熔断降级中间件 func CircuitBreakerMiddleware(r *ghttp.Request) { startTime := time.Now() ctx := r.GetCtx() - // 从URL路径提取服务名并获取配置(P1:合并重复验证) serviceName := extractServiceName(r.URL.Path) if serviceName == "" { r.Middleware.Next() return } - // 获取熔断器信息(包含配置) cbInfoVal, ok := circuitBreakers.Load(serviceName) if !ok { - // 未配置熔断器,直接放行 r.Middleware.Next() return } - cbInfo := cbInfoVal.(*CircuitBreakerInfo) + cbInfo, ok := cbInfoVal.(*CircuitBreakerInfo) + if !ok { + r.Middleware.Next() + return + } config := cbInfo.Config if !config.Enabled { - // 熔断器未启用,直接放行 r.Middleware.Next() return } cbInfo.Metrics.TotalRequests.Add(1) - // 提前构造resourceName(性能优化) - resourceName := cbInfo.ResourceName - - // 设置请求超时(使用服务独立配置) - if config.RequestTimeout > 0 { - ctx, cancel := context.WithTimeout(ctx, time.Duration(config.RequestTimeout)*time.Millisecond) - r.SetCtx(ctx) - defer cancel() + // 预热期检查 + if time.Now().Unix() < cbInfo.WarmupEndTime { + r.Middleware.Next() + return } - // 检查是否启用分布式熔断 - if config.DistributedTTL > 0 { - if isCircuitBreakerOpenInDistributed(ctx, resourceName) { + resourceName := cbInfo.ResourceName + if config.RequestTimeout > 0 { + var ctxCancel context.CancelFunc + ctx, ctxCancel = context.WithTimeout(ctx, time.Duration(config.RequestTimeout)*time.Millisecond) + r.SetCtx(ctx) + defer ctxCancel() + } + + // 分布式熔断检查 + if config.DistributedTTL > 0 && isCircuitBreakerOpenInDistributed(ctx, resourceName) { + cbInfo.Metrics.BlockRequests.Add(1) + g.Log().Warningf(ctx, "分布式熔断触发: %s", resourceName) + sendFallbackResponse(r, serviceName, config, "distributed") + return + } + + // 半开状态处理 - 使用原子操作确保线程安全 + currentState := cbInfo.getState() + if currentState == StateHalfOpen { + // 使用原子操作安全地递增半开请求计数 + halfOpenRequests := cbInfo.Metrics.HalfOpenRequests.Add(1) + + // 如果超过最大半开请求数量,回滚并触发熔断 + if halfOpenRequests > int64(config.HalfOpenMaxRequests) { + // 原子递减回滚 + cbInfo.Metrics.HalfOpenRequests.Add(-1) cbInfo.Metrics.BlockRequests.Add(1) - g.Log().Warningf(ctx, "分布式熔断触发: %s", resourceName) - sendFallbackResponse(r, serviceName, config, "distributed") + + // 尝试转换为打开状态,如果成功则记录日志 + oldState := cbInfo.setState(StateOpen) + if oldState != StateOpen { + g.Log().Warningf(ctx, "半开状态试探请求超限,恢复熔断: %s", resourceName) + if config.DistributedTTL > 0 { + syncCircuitBreakerStateToDistributed(ctx, resourceName, "open", config.DistributedTTL) + } + } + + sendFallbackResponse(r, serviceName, config, "halfopen_limit") return } } - // 使用Sentinel进行熔断保护 entry, blockError := api.Entry(resourceName) if blockError != nil { - // 被熔断拦截 - cbInfo.Metrics.BlockRequests.Add(1) - cbInfo.Metrics.OpenCount.Add(1) - g.Log().Warningf(ctx, "熔断触发: %s, reason: %v", resourceName, blockError) - - // 使用atomic更新状态(无锁) - oldStateStr := cbInfo.setState(StateOpen) - now := time.Now() - cbInfo.Metrics.LastOpenTime.Store(now.Unix()) - cbInfo.Metrics.NextRetryTime.Store(now.Add(config.TimeoutParsed).Unix()) - - // 通知状态变化(如果状态改变) - if oldStateStr != StateOpen { - notifyStateChange(serviceName, oldStateStr, StateOpen) + if entry != nil { + entry.Exit() + } + + cbInfo.Metrics.BlockRequests.Add(1) + oldState := cbInfo.setStateWithMetrics(StateOpen, true) + + if oldState != StateOpen { + notifyStateChange(serviceName, oldState, StateOpen) } - // 同步到分布式存储 if config.DistributedTTL > 0 { syncCircuitBreakerStateToDistributed(ctx, resourceName, "open", config.DistributedTTL) } @@ -554,72 +647,149 @@ func CircuitBreakerMiddleware(r *ghttp.Request) { return } - // 执行后续中间件和业务逻辑 + if entry != nil { + defer entry.Exit() + } + r.Middleware.Next() - // 记录请求结果(基于HTTP状态码) statusCode := r.Response.Status + if statusCode < 100 || statusCode > 599 { + return + } duration := time.Since(startTime) - // 判断是否为慢请求(P2:可观测性) + // 记录响应时间统计 + durationNs := duration.Nanoseconds() + cbInfo.Metrics.TotalResponseTime.Add(durationNs) + + // 原子更新最小响应时间 + for { + currentMin := cbInfo.Metrics.MinResponseTime.Load() + if durationNs >= currentMin { + break + } + if cbInfo.Metrics.MinResponseTime.CompareAndSwap(currentMin, durationNs) { + break + } + } + + // 原子更新最大响应时间 + for { + currentMax := cbInfo.Metrics.MaxResponseTime.Load() + if durationNs <= currentMax { + break + } + if cbInfo.Metrics.MaxResponseTime.CompareAndSwap(currentMax, durationNs) { + break + } + } + if duration > config.SlowRequestThresholdParsed { cbInfo.Metrics.SlowRequests.Add(1) } - // 使用cbInfo.SuccessCodeMap判断状态码(性能优化) - if !isSuccessStatusCode(cbInfo, statusCode) { - // 记录异常 + isSuccess := isSuccessStatusCode(cbInfo, statusCode) + + // 更新窗口统计 + cbInfo.updateWindowStats(isSuccess, ctx) + + if !isSuccess { cbInfo.Metrics.FailureRequests.Add(1) - api.TraceError(entry, fmt.Errorf("request failed with status: %d", statusCode)) + if entry != nil { + api.TraceError(entry, fmt.Errorf("request failed with status: %d", statusCode)) + } g.Log().Debugf(ctx, "服务 %s 请求失败: status=%d, duration=%v", serviceName, statusCode, duration) + + // 重新获取当前状态,避免使用过期状态 + currentState := cbInfo.getState() + if currentState == StateHalfOpen { + cbInfo.Metrics.HalfOpenFailed.Add(1) + oldState := cbInfo.setStateWithMetrics(StateOpen, true) + if oldState == StateHalfOpen { + g.Log().Warningf(ctx, "半开状态请求失败,恢复熔断: %s", resourceName) + if config.DistributedTTL > 0 { + syncCircuitBreakerStateToDistributed(ctx, resourceName, "open", config.DistributedTTL) + } + } + } } else { cbInfo.Metrics.PassRequests.Add(1) - // 更新状态为关闭(如果之前是开启状态,使用atomic操作) - if cbInfo.getState() != StateClosed { - oldStateStr := cbInfo.setState(StateClosed) - if oldStateStr != StateClosed { - notifyStateChange(serviceName, oldStateStr, StateClosed) + + // 重新获取当前状态 + currentState := cbInfo.getState() + if currentState == StateHalfOpen { + // 原子递增成功计数 + halfOpenPassed := cbInfo.Metrics.HalfOpenPassed.Add(1) + totalRequests := cbInfo.Metrics.HalfOpenRequests.Load() + + // 计算成功率,确保分母不为零 + if totalRequests > 0 { + successRate := float64(halfOpenPassed) / float64(totalRequests) + + // 检查是否达到成功率阈值,如果达到则关闭熔断器 + if successRate >= config.HalfOpenSuccessThreshold { + // 原子设置状态为关闭,确保只有一个goroutine能成功转换 + oldState := cbInfo.setStateWithMetrics(StateClosed, true) + if oldState == StateHalfOpen { + // 重置半开统计 + cbInfo.Metrics.HalfOpenPassed.Store(0) + cbInfo.Metrics.HalfOpenRequests.Store(0) + cbInfo.Metrics.HalfOpenFailed.Store(0) + + g.Log().Infof(ctx, "半开状态成功,恢复关闭: %s, successRate=%.2f, total=%d, passed=%d", + resourceName, successRate, totalRequests, halfOpenPassed) + + // 同步分布式状态 + if config.DistributedTTL > 0 { + syncCircuitBreakerStateToDistributed(ctx, resourceName, "closed", config.DistributedTTL) + } + } + } + } + } else if currentState != StateClosed { + // 如果状态不是关闭但也不是半开,尝试重置为关闭状态 + oldState := cbInfo.setStateWithMetrics(StateClosed, true) + if oldState != StateClosed { + notifyStateChange(serviceName, oldState, StateClosed) } } } - - // 退出Sentinel资源 - entry.Exit() } -// sendFallbackResponse 发送降级响应(P0:添加日志记录) +// sendFallbackResponse 发送降级响应 func sendFallbackResponse(r *ghttp.Request, serviceName string, config *CircuitBreakerConfig, reason string) { - // P0:记录降级日志,便于问题排查 g.Log().Warningf(r.GetCtx(), "熔断器降级: service=%s, reason=%s, clientIP=%s", serviceName, reason, r.GetClientIp()) if config.EnableFallback && config.FallbackMessage != "" { - // 自定义降级消息 r.Response.WriteStatusExit(503, config.FallbackMessage) - } else { - // 根据原因返回不同的状态码和消息 - switch reason { - case "blocked": - r.Response.WriteStatusExit(503, fmt.Sprintf("服务 '%s' 熔断保护中,请稍后再试", serviceName)) - case "distributed": - r.Response.WriteStatusExit(503, fmt.Sprintf("服务 '%s' 分布式熔断中", serviceName)) - default: - r.Response.WriteStatusExit(503, fmt.Sprintf("服务 '%s' 暂时不可用,请稍后再试", serviceName)) - } + return + } + + switch reason { + case "blocked": + r.Response.WriteStatusExit(503, fmt.Sprintf("服务 '%s' 熔断保护中,请稍后再试", serviceName)) + case "distributed": + r.Response.WriteStatusExit(503, fmt.Sprintf("服务 '%s' 分布式熔断中", serviceName)) + default: + r.Response.WriteStatusExit(503, fmt.Sprintf("服务 '%s' 暂时不可用,请稍后再试", serviceName)) } } -// isSuccessStatusCode 判断HTTP状态码是否成功(使用cbInfo.SuccessCodeMap优化性能) +// isSuccessStatusCode 判断HTTP状态码是否成功 func isSuccessStatusCode(cbInfo *CircuitBreakerInfo, statusCode int) bool { - if cbInfo.SuccessCodeMap != nil && len(cbInfo.SuccessCodeMap) > 0 { + // 验证状态码范围 + if statusCode < 100 || statusCode > 599 { + return false + } + if len(cbInfo.SuccessCodeMap) > 0 { return cbInfo.SuccessCodeMap[statusCode] } - // 默认:2xx状态码为成功 return statusCode >= 200 && statusCode < 300 } -// extractServiceName 从URL路径提取服务名(P0:添加URL编码处理) +// extractServiceName 从URL路径提取服务名 func extractServiceName(path string) string { - // 去除首尾斜杠并分割 path = strings.Trim(path, "/") if path == "" { return "" @@ -630,55 +800,38 @@ func extractServiceName(path string) string { } serviceName := parts[0] - // P0:处理URL编码,将 %2F 等转义字符还原 - // 注意:在goframe的网关中间件中,路径通常已经被框架处理过 - // 但为了安全性,这里对包含%的情况进行简单处理 if strings.Contains(serviceName, "%") { - // 尝试解码URL编码的字符串 - // 使用path.Unescape而不是url.QueryUnescape,因为我们处理的是路径片段 - decoded, err := pathUnescape(serviceName) - if err == nil { + if decoded, err := pathUnescape(serviceName); err == nil { serviceName = decoded } - // 如果解码失败,继续使用原始serviceName } - // 验证服务名是否在已配置的熔断器中 if _, ok := circuitBreakerConfigs.Load(serviceName); ok { return serviceName } return "" } -// pathUnescape 路径片段的URL解码(P0:安全性改进) -// 注意:Go 1.8+ 可以使用 path.Unescape,这里提供兼容实现 +// pathUnescape 路径片段的URL解码 func pathUnescape(s string) (string, error) { - // 使用strings.Builder优化性能 var builder strings.Builder builder.Grow(len(s)) for i := 0; i < len(s); i++ { switch s[i] { case '%': - // 处理百分号编码 if i+2 >= len(s) { - // 不完整的编码,保留原样 builder.WriteByte(s[i]) continue } - // 解析十六进制数字 high := hexDigit(s[i+1]) low := hexDigit(s[i+2]) if high == 0xFF || low == 0xFF { - // 无效的十六进制,保留原样 builder.WriteByte(s[i]) - continue + } else { + builder.WriteByte((high << 4) | low) + i += 2 } - builder.WriteByte((high << 4) | low) - i += 2 - case '+': - // 路径片段中的+通常不需要解码为空格 - builder.WriteByte('+') default: builder.WriteByte(s[i]) } @@ -686,7 +839,6 @@ func pathUnescape(s string) (string, error) { return builder.String(), nil } -// hexDigit 将十六进制字符转换为对应的数值 func hexDigit(c byte) byte { switch { case '0' <= c && c <= '9': @@ -696,188 +848,199 @@ func hexDigit(c byte) byte { case 'A' <= c && c <= 'F': return c - 'A' + 10 default: - return 0xFF // 无效字符 + return 0xFF } } +// updateAdminIPsCache 更新管理员IP白名单缓存 +func updateAdminIPsCache() { + ipMap := make(map[string]bool) + cidrNets := make([]*net.IPNet, 0) + + circuitBreakerConfigs.Range(func(_, value interface{}) bool { + config, ok := value.(*CircuitBreakerConfig) + if !ok { + return true + } + for _, ip := range config.AdminIPs { + if !ipMap[ip] { + ipMap[ip] = true + } + } + cidrNets = append(cidrNets, config.CIDRNetMasks...) + return true + }) + + allowedAdminIPsMutex.Lock() + allowedAdminIPsMap = ipMap + allowedAdminIPsMutex.Unlock() + + allowedAdminCIDRsMutex.Lock() + allowedAdminCIDRs = cidrNets + allowedAdminCIDRsMutex.Unlock() +} + +// filterServiceNames 过滤服务名 +func filterServiceNames(services map[string]interface{}) []string { + excludeKeys := map[string]bool{"services": true, "enableDistributed": true, "requestTimeout": true, "distributedTTL": true} + result := make([]string, 0, len(services)) + for key := range services { + if !excludeKeys[key] { + result = append(result, key) + } + } + return result +} + // isCircuitBreakerOpenInDistributed 检查分布式熔断状态 func isCircuitBreakerOpenInDistributed(ctx context.Context, resourceName string) bool { - key := fmt.Sprintf("circuit_breaker:%s:state", resourceName) - + key := "circuit_breaker:" + resourceName + ":state" redis := g.Redis() if redis == nil { return false } - value, err := redis.Get(ctx, key) if err != nil || value.IsNil() { return false } - state := value.String() - return state == "open" + return value.String() == "open" } -// getDistributedLock 获取分布式锁(按服务名分片) -func getDistributedLock(serviceName string) *sync.Mutex { - lock, _ := distributedSyncLocks.LoadOrStore(serviceName, &sync.Mutex{}) - return lock.(*sync.Mutex) -} - -// syncCircuitBreakerStateToDistributed 同步熔断器状态到分布式存储 +// syncCircuitBreakerStateToDistributed 同步熔断器状态到Redis func syncCircuitBreakerStateToDistributed(ctx context.Context, resourceName, state string, ttl int) { - // 提取服务名用于锁分片 - serviceName := strings.TrimPrefix(resourceName, "service:") - lock := getDistributedLock(serviceName) - lock.Lock() - defer lock.Unlock() - - key := fmt.Sprintf("circuit_breaker:%s:state", resourceName) - + key := "circuit_breaker:" + resourceName + ":state" + lockKey := "circuit_breaker:" + resourceName + ":lock" redis := g.Redis() if redis == nil { - g.Log().Errorf(ctx, "Redis客户端未初始化,无法同步熔断状态") + g.Log().Warningf(ctx, "Redis未初始化,无法同步分布式熔断状态: %s", resourceName) return } - _, err := redis.Do(ctx, "SETEX", key, ttl, state) + lockValue := fmt.Sprintf("%d", time.Now().UnixNano()) + + // 获取分布式锁 + locked, err := redis.Do(ctx, "SET", lockKey, lockValue, "NX", "EX", 10) if err != nil { - g.Log().Errorf(ctx, "同步熔断状态到Redis失败: %v", err) + g.Log().Errorf(ctx, "获取分布式锁失败: %s, error: %v", lockKey, err) + return } -} -// validateCircuitBreakerConfig 验证熔断器配置 -func validateCircuitBreakerConfig(config *CircuitBreakerConfig) error { - if config.MaxFailures <= 0 { - return fmt.Errorf("maxFailures必须大于0,当前值: %d", config.MaxFailures) - } - if config.FailureRateThreshold < 0 || config.FailureRateThreshold > 1 { - return fmt.Errorf("failureRateThreshold必须在0.0-1.0之间,当前值: %.2f", config.FailureRateThreshold) - } - if len(config.SuccessStatusCodes) == 0 { - return fmt.Errorf("successStatusCodes不能为空") - } - if config.RequestTimeout < 0 || config.RequestTimeout > 300000 { - return fmt.Errorf("requestTimeout必须在0-300000毫秒之间,当前值: %d", config.RequestTimeout) - } - if config.DistributedTTL < 0 || config.DistributedTTL > 3600 { - return fmt.Errorf("distributedTTL必须在0-3600秒之间,当前值: %d", config.DistributedTTL) - } - if config.StatIntervalMs < 100 || config.StatIntervalMs > 60000 { - return fmt.Errorf("statIntervalMs必须在100-60000毫秒之间,当前值: %d", config.StatIntervalMs) - } - if config.MinRequestAmount < 1 || config.MinRequestAmount > 10000 { - return fmt.Errorf("minRequestAmount必须在1-10000之间,当前值: %d", config.MinRequestAmount) - } - // 验证时间字符串格式(如果缓存为空,说明解析失败) - if config.TimeoutParsed == 0 { - return fmt.Errorf("timeout格式错误,应为有效的时间字符串(如30s, 1m),当前值: %s", config.Timeout) - } - if config.SlowRequestThresholdParsed == 0 { - return fmt.Errorf("slowRequestThreshold格式错误,应为有效的时间字符串(如3s, 1m),当前值: %s", config.SlowRequestThreshold) - } + // 检查是否获取到锁 + var isLocked bool + if locked != nil && !locked.IsNil() { + isLocked = true + } else { + // 检查锁是否已过期 + currentLock, err := redis.Get(ctx, lockKey) + if err == nil && !currentLock.IsNil() { + lockTime, _ := strconv.ParseInt(currentLock.String(), 10, 64) + // 如果锁已经存在超过10秒(超时),强制获取 + if time.Now().UnixNano()-lockTime > 10*1e9 { + // 使用SETNX方式获取锁,使用Lua脚本保证原子性 + luaAcquire := ` +local current = redis.call("get", KEYS[1]) +if current and tonumber(current) then + local lockTime = tonumber(current) + if redis.call("TIME")[1] * 1000000000 + redis.call("TIME")[2] - lockTime > 10000000000 then + redis.call("del", KEYS[1]) + return redis.call("set", KEYS[1], ARGV[1], "EX", 10) + end return nil -} - -// registerStateChangeListeners 注册状态变化监听器 -func registerStateChangeListeners() { - // 检查是否已注册,防止重复注册(健壮性改进) - if _, exists := stateChangeListenersRegistered.LoadOrStore("default", true); exists { - return +else + return redis.call("set", KEYS[1], ARGV[1], "EX", 10) +end` + locked, err = redis.Do(ctx, "EVAL", luaAcquire, 1, lockKey, lockValue) + if err != nil { + g.Log().Errorf(ctx, "强制获取分布式锁失败: %s, error: %v", lockKey, err) + return + } + if locked != nil && !locked.IsNil() { + isLocked = true + } + } + } } - // 注册默认监听器(区分日志级别) - RegisterStateChangeListener("default", func(serviceName string, fromState, toState CircuitBreakerState) { - // Open状态使用Warning级别,Closed状态使用Info级别 - if toState == StateOpen { - g.Log().Warningf(context.Background(), "熔断器状态变化: service=%s, %s -> %s", - serviceName, fromState, toState) + if isLocked { + defer func() { + // 使用Lua脚本原子性地删除锁,只删除自己创建的锁 + luaScript := `if redis.call("get", KEYS[1]) == ARGV[1] then return redis.call("del", KEYS[1]) else return 0 end` + _, err := redis.Do(ctx, "EVAL", luaScript, 1, lockKey, lockValue) + if err != nil { + g.Log().Warningf(ctx, "释放分布式锁失败: %s, error: %v", lockKey, err) + } + }() + + // 设置状态 + _, err := redis.Do(ctx, "SETEX", key, ttl, state) + if err != nil { + g.Log().Errorf(ctx, "设置分布式熔断状态失败: %s=%s, error: %v", key, state, err) } else { - g.Log().Infof(context.Background(), "熔断器状态变化: service=%s, %s -> %s", - serviceName, fromState, toState) + g.Log().Debugf(ctx, "分布式熔断状态已同步: %s=%s (TTL: %d)", key, state, ttl) } - }) + } else { + g.Log().Debugf(ctx, "未获取到分布式锁,跳过状态同步: %s", lockKey) + } } -// StateChangeListener 状态变化监听器类型 -type StateChangeListener func(serviceName string, fromState, toState CircuitBreakerState) - -// RegisterStateChangeListener 注册状态变化监听器 -func RegisterStateChangeListener(name string, listener StateChangeListener) { - stateChangeListeners.Store(name, listener) -} - -// UnregisterStateChangeListener 取消注册状态变化监听器 -func UnregisterStateChangeListener(name string) { - stateChangeListeners.Delete(name) -} - -// notifyStateChange 通知所有监听器状态变化 -func notifyStateChange(serviceName string, fromState, toState CircuitBreakerState) { - stateChangeListeners.Range(func(key, value interface{}) bool { - if listener, ok := value.(StateChangeListener); ok { - listener(serviceName, fromState, toState) - } - return true - }) -} - -// CircuitBreakerHealthCheckHandler 熔断器健康检查接口(P0:添加IP白名单验证,P1:添加分页支持,P1:优化性能) +// CircuitBreakerHealthCheckHandler 健康检查接口 func CircuitBreakerHealthCheckHandler(r *ghttp.Request) { - // P0:权限验证 if !isAdminIP(r) { - r.Response.WriteJsonExit(ghttp.DefaultHandlerResponse{ - Code: 403, - Message: "权限不足,禁止访问", - }) + r.Response.WriteJsonExit(ghttp.DefaultHandlerResponse{Code: 403, Message: "权限不足,禁止访问"}) return } - status := make(map[string]interface{}) - totalServices := 0 - openServices := 0 - - // P1:分页参数 page := r.Get("page").Int() size := r.Get("size").Int() if page < 0 { page = 0 } if size <= 0 || size > 100 { - size = 20 // 默认20条,最多100条 + size = 20 } - // P1:使用缓存的totalServicesCount避免每次遍历 - total := int(totalServicesCount.Load()) - start := page * size + serviceNamesMutex.RLock() + slice := serviceNamesSlice + serviceNamesMutex.RUnlock() + + total := len(slice) + start := page * size + if start >= total { + r.Response.WriteJsonExit(ghttp.DefaultHandlerResponse{Code: 200, Message: "熔断器状态", + Data: map[string]interface{}{ + "summary": map[string]interface{}{"totalServices": 0, "openServices": 0, "closedServices": 0, "halfOpenServices": 0}, + "services": map[string]interface{}{}, "page": page, "size": size, "total": total}}) + return + } - // P1:只遍历分页范围内的服务(通过计数跳过) end := start + size if end > total { end = total } - current := 0 - circuitBreakers.Range(func(key, value interface{}) bool { - // 跳过前面的页 - if current < start { - current++ - return true - } - // 只处理当前页 - if current >= end { - return false - } - - serviceName := key.(string) - cbInfo := value.(*CircuitBreakerInfo) + status := make(map[string]interface{}) + totalServices := 0 + openServices := 0 + halfOpenServices := 0 + for i := start; i < end; i++ { + serviceName := slice[i] + cbInfoVal, ok := circuitBreakers.Load(serviceName) + if !ok { + continue + } + cbInfo, ok := cbInfoVal.(*CircuitBreakerInfo) + if !ok { + continue + } totalServices++ - isOpen := cbInfo.getState() == StateOpen - if isOpen { + state := cbInfo.getState() + if state == StateOpen { openServices++ + } else if state == StateHalfOpen { + halfOpenServices++ } - // 从Metrics中读取数据(使用atomic) lastResetTime := cbInfo.Metrics.LastResetTime.Load() var lastResetTimeStr string if lastResetTime > 0 { @@ -897,70 +1060,52 @@ func CircuitBreakerHealthCheckHandler(r *ghttp.Request) { } status[serviceName] = map[string]interface{}{ - "resource": cbInfo.ResourceName, - "state": string(cbInfo.getState()), - "lastOpenTime": lastOpenTimeStr, - "nextRetryTime": nextRetryTimeStr, - "totalRequests": cbInfo.Metrics.TotalRequests.Load(), - "passRequests": cbInfo.Metrics.PassRequests.Load(), - "blockRequests": cbInfo.Metrics.BlockRequests.Load(), - "failureRequests": cbInfo.Metrics.FailureRequests.Load(), - "slowRequests": cbInfo.Metrics.SlowRequests.Load(), - "openCount": cbInfo.Metrics.OpenCount.Load(), - "lastResetTime": lastResetTimeStr, + "resource": cbInfo.ResourceName, + "state": string(state), + "lastOpenTime": lastOpenTimeStr, + "nextRetryTime": nextRetryTimeStr, + "totalRequests": cbInfo.Metrics.TotalRequests.Load(), + "passRequests": cbInfo.Metrics.PassRequests.Load(), + "blockRequests": cbInfo.Metrics.BlockRequests.Load(), + "failureRequests": cbInfo.Metrics.FailureRequests.Load(), + "slowRequests": cbInfo.Metrics.SlowRequests.Load(), + "openCount": cbInfo.Metrics.OpenCount.Load(), + "lastResetTime": lastResetTimeStr, + "halfOpenRequests": cbInfo.Metrics.HalfOpenRequests.Load(), + "halfOpenPassed": cbInfo.Metrics.HalfOpenPassed.Load(), } - current++ - return true - }) - - summary := map[string]interface{}{ - "totalServices": totalServices, - "openServices": openServices, - "closedServices": totalServices - openServices, } - r.Response.WriteJsonExit(ghttp.DefaultHandlerResponse{ - Code: 200, - Message: "熔断器状态", + r.Response.WriteJsonExit(ghttp.DefaultHandlerResponse{Code: 200, Message: "熔断器状态", Data: map[string]interface{}{ - "summary": summary, - "services": status, - "page": page, - "size": size, - "total": total, - }, - }) + "summary": map[string]interface{}{"totalServices": totalServices, "openServices": openServices, "closedServices": totalServices - openServices - halfOpenServices, "halfOpenServices": halfOpenServices}, + "services": status, "page": page, "size": size, "total": total}}) } -// isAdminIP 检查请求IP是否在管理员白名单中(P1:使用map优化性能,P0:支持IPv6 CIDR) +// isAdminIP 检查IP是否在白名单中 func isAdminIP(r *ghttp.Request) bool { clientIP := r.GetClientIp() if clientIP == "" { return false } - // 读取缓存的IP白名单(P1:使用map实现O(1)查找) allowedAdminIPsMutex.RLock() allowedIPs := allowedAdminIPsMap allowedAdminIPsMutex.RUnlock() - // 如果没有配置白名单,允许所有IP(向后兼容) if len(allowedIPs) == 0 { allowedAdminCIDRsMutex.RLock() hasCIDRs := len(allowedAdminCIDRs) > 0 allowedAdminCIDRsMutex.RUnlock() - // 如果也没有CIDR,则允许所有IP if !hasCIDRs { return true } } - // 精确IP匹配(P1:map查找O(1)) if allowedIPs[clientIP] { return true } - // P1:使用预编译的CIDR网络掩码匹配(支持IPv4和IPv6) allowedAdminCIDRsMutex.RLock() cidrNets := allowedAdminCIDRs allowedAdminCIDRsMutex.RUnlock() @@ -980,151 +1125,168 @@ func isAdminIP(r *ghttp.Request) bool { return false } -// CircuitBreakerResetHandler 熔断器手动重置接口(仅限管理后台调用) +// CircuitBreakerResetHandler 重置熔断器 func CircuitBreakerResetHandler(r *ghttp.Request) { serviceName := r.Get("service").String() - if serviceName == "" { - r.Response.WriteJsonExit(ghttp.DefaultHandlerResponse{ - Code: 400, - Message: "缺少service参数", - }) - return - } - // 权限验证:检查IP是否在白名单中 if !isAdminIP(r) { - r.Response.WriteJsonExit(ghttp.DefaultHandlerResponse{ - Code: 403, - Message: "权限不足,禁止访问", - }) + r.Response.WriteJsonExit(ghttp.DefaultHandlerResponse{Code: 403, Message: "权限不足,禁止访问"}) return } - resourceName := fmt.Sprintf("service:%s", serviceName) - - // 获取当前服务的所有规则 - currentRules := circuitbreaker.GetRulesOfResource(resourceName) - - // 只删除当前服务的规则 - if len(currentRules) > 0 { - _, err := circuitbreaker.LoadRulesOfResource(resourceName, []*circuitbreaker.Rule{}) - if err != nil { - r.Response.WriteJsonExit(ghttp.DefaultHandlerResponse{ - Code: 500, - Message: fmt.Sprintf("重置熔断器失败: %v", err), - }) - return - } - } - - // 重新加载该服务的规则 - if val, ok := circuitBreakerConfigs.Load(serviceName); ok { - config := val.(*CircuitBreakerConfig) - err := initServiceCircuitBreaker(serviceName, config) - if err != nil { - r.Response.WriteJsonExit(ghttp.DefaultHandlerResponse{ - Code: 500, - Message: fmt.Sprintf("重置熔断器失败: %v", err), - }) - return - } - } - - // 更新内存状态并重置指标(使用atomic操作) - if val, ok := circuitBreakers.Load(serviceName); ok { - cbInfo := val.(*CircuitBreakerInfo) - cbInfo.State.Store(stateClosed) - cbInfo.Metrics.LastOpenTime.Store(0) - cbInfo.Metrics.NextRetryTime.Store(0) - // 重置指标 - cbInfo.Metrics.TotalRequests.Store(0) - cbInfo.Metrics.PassRequests.Store(0) - cbInfo.Metrics.BlockRequests.Store(0) - cbInfo.Metrics.FailureRequests.Store(0) - cbInfo.Metrics.SlowRequests.Store(0) - cbInfo.Metrics.OpenCount.Store(0) - cbInfo.Metrics.LastResetTime.Store(time.Now().Unix()) - } - - // 重置分布式状态(如果启用) - if val, ok := circuitBreakerConfigs.Load(serviceName); ok { - config := val.(*CircuitBreakerConfig) - if config.DistributedTTL > 0 { - key := fmt.Sprintf("circuit_breaker:%s:state", resourceName) - redis := g.Redis() - if redis != nil { - _, _ = redis.Del(r.GetCtx(), key) - } - } - } - - g.Log().Infof(r.GetCtx(), "熔断器已手动重置: %s", resourceName) - - r.Response.WriteJsonExit(ghttp.DefaultHandlerResponse{ - Code: 200, - Message: fmt.Sprintf("服务 '%s' 的熔断器已重置", serviceName), - }) -} - -// CircuitBreakerReloadHandler 熔断器配置重载接口 -func CircuitBreakerReloadHandler(r *ghttp.Request) { - serviceName := r.Get("service").String() - - // 权限验证:检查IP是否在白名单中(P0级别安全问题) - if !isAdminIP(r) { - r.Response.WriteJsonExit(ghttp.DefaultHandlerResponse{ - Code: 403, - Message: "权限不足,禁止访问", - }) - return - } - - if serviceName == "" { - // 重载所有服务 - 扫描配置文件中所有服务 - services := g.Cfg().MustGet(r.GetCtx(), "circuitBreaker").Map() - - // 过滤出服务名 - serviceNames := filterServiceNames(services) + if serviceName == "" || serviceName == "*" { + serviceNamesMutex.RLock() + slice := serviceNamesSlice + serviceNamesMutex.RUnlock() successCount := 0 failCount := 0 - for _, service := range serviceNames { - err := ReloadCircuitBreakerConfig(service) - if err != nil { - g.Log().Errorf(r.GetCtx(), "服务 %s 配置重载失败: %v", service, err) + for _, name := range slice { + if err := resetSingleService(r, name); err != nil { + g.Log().Errorf(r.GetCtx(), "服务 %s 熔断器重置失败: %v", name, err) failCount++ } else { successCount++ } } - // 更新管理员IP白名单缓存 + g.Log().Infof(r.GetCtx(), "批量重置熔断器完成: 成功 %d, 失败 %d", successCount, failCount) + r.Response.WriteJsonExit(ghttp.DefaultHandlerResponse{Code: 200, Message: fmt.Sprintf("批量重置完成: 成功 %d, 失败 %d", successCount, failCount), + Data: map[string]interface{}{"success": successCount, "failed": failCount}}) + return + } + + if err := resetSingleService(r, serviceName); err != nil { + r.Response.WriteJsonExit(ghttp.DefaultHandlerResponse{Code: 500, Message: fmt.Sprintf("重置熔断器失败: %v", err)}) + return + } + + r.Response.WriteJsonExit(ghttp.DefaultHandlerResponse{Code: 200, Message: fmt.Sprintf("服务 '%s' 的熔断器已重置", serviceName)}) +} + +// resetSingleService 重置单个服务 +func resetSingleService(r *ghttp.Request, serviceName string) error { + resourceName := "service:" + serviceName + + if rules := circuitbreaker.GetRulesOfResource(resourceName); len(rules) > 0 { + if _, err := circuitbreaker.LoadRulesOfResource(resourceName, []*circuitbreaker.Rule{}); err != nil { + return err + } + } + + if configVal, ok := circuitBreakerConfigs.Load(serviceName); ok { + if err := initServiceCircuitBreaker(serviceName, configVal.(*CircuitBreakerConfig)); err != nil { + return err + } + } + + if cbInfoVal, ok := circuitBreakers.Load(serviceName); ok { + cbInfo := cbInfoVal.(*CircuitBreakerInfo) + cbInfo.State.Store(stateClosed) + cbInfo.Metrics.LastOpenTime.Store(0) + cbInfo.Metrics.NextRetryTime.Store(0) + cbInfo.Metrics.TotalRequests.Store(0) + cbInfo.Metrics.PassRequests.Store(0) + cbInfo.Metrics.BlockRequests.Store(0) + cbInfo.Metrics.FailureRequests.Store(0) + cbInfo.Metrics.SlowRequests.Store(0) + cbInfo.Metrics.OpenCount.Store(0) + cbInfo.Metrics.HalfOpenRequests.Store(0) + cbInfo.Metrics.HalfOpenPassed.Store(0) + cbInfo.WarmupEndTime = time.Now().Add(cbInfo.Config.WarmupDurationParsed).Unix() + cbInfo.Metrics.LastResetTime.Store(time.Now().Unix()) + } + + if configVal, ok := circuitBreakerConfigs.Load(serviceName); ok { + config, ok := configVal.(*CircuitBreakerConfig) + if ok && config.DistributedTTL > 0 { + redis := g.Redis() + if redis != nil { + if _, err := redis.Del(r.GetCtx(), "circuit_breaker:"+resourceName+":state"); err != nil { + g.Log().Warningf(r.GetCtx(), "清除分布式熔断状态失败: %s, error: %v", resourceName, err) + } + } + } + } + + g.Log().Infof(r.GetCtx(), "熔断器已手动重置: %s", resourceName) + return nil +} + +// CircuitBreakerReloadHandler 配置重载接口 +func CircuitBreakerReloadHandler(r *ghttp.Request) { + serviceName := r.Get("service").String() + + if !isAdminIP(r) { + r.Response.WriteJsonExit(ghttp.DefaultHandlerResponse{Code: 403, Message: "权限不足,禁止访问"}) + return + } + + if serviceName == "" || serviceName == "*" { + serviceNamesMutex.RLock() + slice := serviceNamesSlice + serviceNamesMutex.RUnlock() + + successCount := 0 + failCount := 0 + failures := make(map[string]string) + + for _, service := range slice { + if err := ReloadCircuitBreakerConfig(service); err != nil { + g.Log().Errorf(r.GetCtx(), "服务 %s 配置重载失败: %v", service, err) + failCount++ + failures[service] = err.Error() + } else { + successCount++ + } + } + updateAdminIPsCache() - - r.Response.WriteJsonExit(ghttp.DefaultHandlerResponse{ - Code: 200, - Message: fmt.Sprintf("配置重载完成: 成功 %d, 失败 %d", successCount, failCount), - Data: map[string]interface{}{ - "success": successCount, - "failed": failCount, - }, - }) + r.Response.WriteJsonExit(ghttp.DefaultHandlerResponse{Code: 200, Message: fmt.Sprintf("配置重载完成: 成功 %d, 失败 %d", successCount, failCount), + Data: map[string]interface{}{"success": successCount, "failed": failCount, "failures": failures}}) return } - // 重载单个服务 - err := ReloadCircuitBreakerConfig(serviceName) - if err != nil { - r.Response.WriteJsonExit(ghttp.DefaultHandlerResponse{ - Code: 500, - Message: fmt.Sprintf("重载失败: %v", err), - }) + if err := ReloadCircuitBreakerConfig(serviceName); err != nil { + r.Response.WriteJsonExit(ghttp.DefaultHandlerResponse{Code: 500, Message: fmt.Sprintf("重载失败: %v", err)}) return } - r.Response.WriteJsonExit(ghttp.DefaultHandlerResponse{ - Code: 200, - Message: fmt.Sprintf("服务 '%s' 的熔断器配置已重载", serviceName), + updateAdminIPsCache() + r.Response.WriteJsonExit(ghttp.DefaultHandlerResponse{Code: 200, Message: fmt.Sprintf("服务 '%s' 的熔断器配置已重载", serviceName)}) +} + +// StateChangeListener 状态变化监听器类型 +type StateChangeListener func(serviceName string, fromState, toState CircuitBreakerState) + +// RegisterStateChangeListener 注册监听器 +func RegisterStateChangeListener(name string, listener StateChangeListener) { + stateChangeListeners.Store(name, listener) +} + +// notifyStateChange 通知监听器 +func notifyStateChange(serviceName string, fromState, toState CircuitBreakerState) { + stateChangeListeners.Range(func(_, value interface{}) bool { + listener, ok := value.(StateChangeListener) + if ok { + listener(serviceName, fromState, toState) + } + return true + }) +} + +// registerStateChangeListeners 注册默认监听器 +func registerStateChangeListeners() { + if _, exists := stateChangeListenersRegistered.LoadOrStore("default", true); exists { + return + } + + RegisterStateChangeListener("default", func(serviceName string, fromState, toState CircuitBreakerState) { + level := "Info" + if toState == StateOpen { + level = "Warning" + } + g.Log().Print(context.Background(), level, fmt.Sprintf("熔断器状态变化: service=%s, %s -> %s", serviceName, fromState, toState)) }) }