From 122e52a03df62562887ebcbdaab8f59cc9b16950 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E5=BC=A0=E6=96=8C?= <259278618@qq.com> Date: Mon, 5 Jan 2026 15:59:51 +0800 Subject: [PATCH] =?UTF-8?q?=E4=BC=98=E5=8C=96=E9=99=90=E6=B5=81=E5=8A=9F?= =?UTF-8?q?=E8=83=BD?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- middleware/circuit_breaker.go | 662 +++++++++++++--------------------- 1 file changed, 251 insertions(+), 411 deletions(-) diff --git a/middleware/circuit_breaker.go b/middleware/circuit_breaker.go index 71b3c2c..43378af 100644 --- a/middleware/circuit_breaker.go +++ b/middleware/circuit_breaker.go @@ -3,7 +3,6 @@ package middleware import ( "context" "fmt" - "net" "strconv" "strings" "sync" @@ -48,10 +47,8 @@ type CircuitBreakerConfig struct { FallbackMessage string RequestTimeout int DistributedTTL int - AdminIPs []string StatIntervalMs int MinRequestAmount int - AdminCIDRs []string HalfOpenMaxRequests int HalfOpenSuccessThreshold float64 WarmupDuration string @@ -59,7 +56,6 @@ type CircuitBreakerConfig struct { EnableAdaptiveThreshold bool AdaptiveMinThreshold float64 AdaptiveMaxThreshold float64 - CIDRNetMasks []*net.IPNet } // CircuitBreakerMetrics 熔断器指标 @@ -226,23 +222,14 @@ type CircuitBreakerInfo struct { Config *CircuitBreakerConfig Metrics *CircuitBreakerMetrics SuccessCodeMap map[int]bool - CIDRNetMasks []*net.IPNet AdaptiveThreshold float64 WarmupEndTime int64 } var ( circuitBreakers sync.Map - circuitBreakerConfigs sync.Map stateChangeListeners sync.Map stateChangeListenersRegistered sync.Map - allowedAdminIPsMap map[string]bool - allowedAdminIPsMutex sync.RWMutex - allowedAdminCIDRs []*net.IPNet - allowedAdminCIDRsMutex sync.RWMutex - totalServicesCount atomic.Int64 - serviceNamesSlice []string - serviceNamesMutex sync.RWMutex ) // 默认值常量 @@ -341,19 +328,29 @@ func (cb *CircuitBreakerInfo) updateStateMetrics(state CircuitBreakerState) { } } -// getCircuitBreakerInfoAndConfig 获取熔断器信息和配置 -func getCircuitBreakerInfoAndConfig(serviceName string) (*CircuitBreakerInfo, *CircuitBreakerConfig) { - cbInfoVal, ok := circuitBreakers.Load(serviceName) - if !ok { - return nil, nil +// getCircuitBreakerInfoByResource 根据资源名获取熔断器信息 +// 支持精确匹配和前缀匹配 +func getCircuitBreakerInfoByResource(resourceName string) (*CircuitBreakerInfo, *CircuitBreakerConfig) { + // 先尝试精确匹配 + if cbInfoVal, ok := circuitBreakers.Load(resourceName); ok { + cbInfo, ok := cbInfoVal.(*CircuitBreakerInfo) + if ok { + return cbInfo, cbInfo.Config + } } - cbInfo, ok := cbInfoVal.(*CircuitBreakerInfo) - if !ok { - return nil, nil + // 尝试前缀匹配:去掉查询参数部分 + if idx := strings.Index(resourceName, "?"); idx > 0 { + prefix := resourceName[:idx] + if cbInfoVal, ok := circuitBreakers.Load(prefix); ok { + cbInfo, ok := cbInfoVal.(*CircuitBreakerInfo) + if ok { + return cbInfo, cbInfo.Config + } + } } - return cbInfo, cbInfo.Config + return nil, nil } // updateResponseTimeStats 更新响应时间统计 @@ -388,88 +385,63 @@ func InitCircuitBreaker() error { registerStateChangeListeners() g.Log().Infof(ctx, "Sentinel熔断器初始化成功") - services := g.Cfg().MustGet(ctx, "circuitBreaker").Map() - serviceNames := filterServiceNames(services) - - if len(serviceNames) == 0 { - g.Log().Infof(ctx, "未配置任何服务熔断器") + // 加载接口级别的熔断器配置 + configs := g.Cfg().MustGet(ctx, "circuitBreaker.interfaces").Map() + if len(configs) == 0 { + g.Log().Infof(ctx, "未配置任何接口熔断器") return nil } - totalServicesCount.Store(int64(len(serviceNames))) - serviceNamesMutex.Lock() - serviceNamesSlice = serviceNames - serviceNamesMutex.Unlock() - enabledCount := 0 - for _, serviceName := range serviceNames { - config := loadServiceCircuitBreakerConfig(serviceName) + for resourcePattern, configData := range configs { + config, err := loadInterfaceCircuitBreakerConfig(ctx, resourcePattern, configData) + if err != nil { + g.Log().Errorf(ctx, "加载接口 %s 熔断器配置失败: %v", resourcePattern, err) + continue + } if config != nil && config.Enabled { - circuitBreakerConfigs.Store(serviceName, config) - if err := initServiceCircuitBreaker(serviceName, config); err != nil { - g.Log().Errorf(ctx, "服务 %s 熔断器初始化失败: %v", serviceName, err) + if err := initInterfaceCircuitBreaker(resourcePattern, config); err != nil { + g.Log().Errorf(ctx, "接口 %s 熔断器初始化失败: %v", resourcePattern, err) } else { - g.Log().Infof(ctx, "服务 %s 熔断器初始化成功", serviceName) + g.Log().Infof(ctx, "接口 %s 熔断器初始化成功", resourcePattern) enabledCount++ } } } - updateAdminIPsCache() - g.Log().Infof(ctx, "共初始化 %d 个服务熔断器,其中 %d 个已启用", len(serviceNames), enabledCount) + g.Log().Infof(ctx, "共初始化 %d 个接口熔断器,其中 %d 个已启用", len(configs), enabledCount) return nil } // ReloadCircuitBreakerConfig 动态重新加载熔断器配置 -func ReloadCircuitBreakerConfig(serviceName string) error { - config := loadServiceCircuitBreakerConfig(serviceName) - if config == nil { - return fmt.Errorf("未找到服务 %s 的配置", serviceName) +// loadInterfaceCircuitBreakerConfig 加载接口级别的熔断器配置 +func loadInterfaceCircuitBreakerConfig(ctx context.Context, resourcePattern string, configData interface{}) (*CircuitBreakerConfig, error) { + configMap, ok := configData.(map[string]interface{}) + if !ok { + return nil, fmt.Errorf("接口 %s 配置格式错误: %v", resourcePattern, configData) } - if err := validateCircuitBreakerConfig(config); err != nil { - return fmt.Errorf("配置验证失败: %v", err) - } - - oldConfig, _ := circuitBreakerConfigs.Load(serviceName) - circuitBreakerConfigs.Store(serviceName, config) - - if err := initServiceCircuitBreaker(serviceName, config); err != nil { - if oldConfig != nil { - circuitBreakerConfigs.Store(serviceName, oldConfig) - } - return fmt.Errorf("重新初始化熔断器失败: %v", err) - } - - g.Log().Infof(context.Background(), "服务 %s 熔断器配置重新加载成功", serviceName) - return nil -} - -// loadServiceCircuitBreakerConfig 加载配置 -func loadServiceCircuitBreakerConfig(serviceName string) *CircuitBreakerConfig { - ctx := context.Background() - key := "circuitBreaker." + serviceName config := &CircuitBreakerConfig{ - Enabled: g.Cfg().MustGet(ctx, key+".enabled", true).Bool(), - MaxFailures: g.Cfg().MustGet(ctx, key+".maxFailures", defaultMaxFailures).Int(), - Timeout: g.Cfg().MustGet(ctx, key+".timeout", defaultTimeout).String(), - SlowRequestThreshold: g.Cfg().MustGet(ctx, key+".slowRequestThreshold", defaultSlowRequestThreshold).String(), - EnableSlidingWindow: g.Cfg().MustGet(ctx, key+".enableSlidingWindow", false).Bool(), - FailureRateThreshold: g.Cfg().MustGet(ctx, key+".failureRateThreshold", 0.5).Float64(), - EnableFallback: g.Cfg().MustGet(ctx, key+".enableFallback", false).Bool(), - FallbackMessage: g.Cfg().MustGet(ctx, key+".fallbackMessage", "").String(), - RequestTimeout: g.Cfg().MustGet(ctx, key+".requestTimeout", defaultRequestTimeout).Int(), - DistributedTTL: g.Cfg().MustGet(ctx, key+".distributedTTL", defaultDistributedTTL).Int(), - StatIntervalMs: g.Cfg().MustGet(ctx, key+".statIntervalMs", defaultStatIntervalMs).Int(), - HalfOpenMaxRequests: g.Cfg().MustGet(ctx, key+".halfOpenMaxRequests", defaultHalfOpenMaxRequests).Int(), - HalfOpenSuccessThreshold: g.Cfg().MustGet(ctx, key+".halfOpenSuccessThreshold", defaultHalfOpenSuccessThreshold).Float64(), - WarmupDuration: g.Cfg().MustGet(ctx, key+".warmupDuration", defaultWarmupDuration).String(), - EnableAdaptiveThreshold: g.Cfg().MustGet(ctx, key+".enableAdaptiveThreshold", false).Bool(), - AdaptiveMinThreshold: g.Cfg().MustGet(ctx, key+".adaptiveMinThreshold", 0.3).Float64(), - AdaptiveMaxThreshold: g.Cfg().MustGet(ctx, key+".adaptiveMaxThreshold", 0.7).Float64(), + Enabled: getBoolFromMap(configMap, "enabled", true), + MaxFailures: getIntFromMap(configMap, "maxFailures", defaultMaxFailures), + Timeout: getStringFromMap(configMap, "timeout", defaultTimeout), + SlowRequestThreshold: getStringFromMap(configMap, "slowRequestThreshold", defaultSlowRequestThreshold), + EnableSlidingWindow: getBoolFromMap(configMap, "enableSlidingWindow", false), + FailureRateThreshold: getFloatFromMap(configMap, "failureRateThreshold", 0.5), + EnableFallback: getBoolFromMap(configMap, "enableFallback", false), + FallbackMessage: getStringFromMap(configMap, "fallbackMessage", ""), + RequestTimeout: getIntFromMap(configMap, "requestTimeout", defaultRequestTimeout), + DistributedTTL: getIntFromMap(configMap, "distributedTTL", defaultDistributedTTL), + StatIntervalMs: getIntFromMap(configMap, "statIntervalMs", defaultStatIntervalMs), + HalfOpenMaxRequests: getIntFromMap(configMap, "halfOpenMaxRequests", defaultHalfOpenMaxRequests), + HalfOpenSuccessThreshold: getFloatFromMap(configMap, "halfOpenSuccessThreshold", defaultHalfOpenSuccessThreshold), + WarmupDuration: getStringFromMap(configMap, "warmupDuration", defaultWarmupDuration), + EnableAdaptiveThreshold: getBoolFromMap(configMap, "enableAdaptiveThreshold", false), + AdaptiveMinThreshold: getFloatFromMap(configMap, "adaptiveMinThreshold", 0.3), + AdaptiveMaxThreshold: getFloatFromMap(configMap, "adaptiveMaxThreshold", 0.7), } - config.MinRequestAmount = g.Cfg().MustGet(ctx, key+".minRequestAmount", 0).Int() + config.MinRequestAmount = getIntFromMap(configMap, "minRequestAmount", 0) if config.MinRequestAmount == 0 { config.MinRequestAmount = config.MaxFailures } @@ -480,15 +452,125 @@ func loadServiceCircuitBreakerConfig(serviceName string) *CircuitBreakerConfig { config.WarmupDurationParsed, config.WarmupDuration = parseDurationWithDefault(ctx, config.WarmupDuration, defaultWarmupDuration, "warmupDuration") // 解析状态码 - successCodes := g.Cfg().MustGet(ctx, key+".successStatusCodes", "200,201,204").String() + successCodes := getStringFromMap(configMap, "successStatusCodes", "200,201,204") config.SuccessStatusCodes = parseIntSlice(successCodes) - // 解析IP和CIDR - config.AdminIPs = parseStrings(g.Cfg().MustGet(ctx, key+".adminIPs", "").String()) - config.AdminCIDRs = parseStrings(g.Cfg().MustGet(ctx, key+".adminCIDRs", "").String()) - config.CIDRNetMasks, _ = parseCIDRs(config.AdminCIDRs) + return config, nil +} - return config +// 辅助函数:从map中获取值 +func getBoolFromMap(m map[string]interface{}, key string, defaultValue bool) bool { + if val, ok := m[key]; ok { + if b, ok := val.(bool); ok { + return b + } + } + return defaultValue +} + +func getIntFromMap(m map[string]interface{}, key string, defaultValue int) int { + if val, ok := m[key]; ok { + switch v := val.(type) { + case int: + return v + case float64: + return int(v) + case string: + if i, err := strconv.Atoi(v); err == nil { + return i + } + } + } + return defaultValue +} + +func getFloatFromMap(m map[string]interface{}, key string, defaultValue float64) float64 { + if val, ok := m[key]; ok { + switch v := val.(type) { + case float64: + return v + case int: + return float64(v) + case string: + if f, err := strconv.ParseFloat(v, 64); err == nil { + return f + } + } + } + return defaultValue +} + +func getStringFromMap(m map[string]interface{}, key string, defaultValue string) string { + if val, ok := m[key]; ok { + if s, ok := val.(string); ok { + return s + } + } + return defaultValue +} + +// initInterfaceCircuitBreaker 初始化接口级别的熔断器 +func initInterfaceCircuitBreaker(resourcePattern string, config *CircuitBreakerConfig) error { + if err := validateCircuitBreakerConfig(config); err != nil { + return err + } + + threshold := config.FailureRateThreshold + if config.EnableAdaptiveThreshold { + threshold = (config.AdaptiveMinThreshold + config.AdaptiveMaxThreshold) / 2 + } + + var rule []*circuitbreaker.Rule + baseRule := &circuitbreaker.Rule{ + Resource: resourcePattern, + RetryTimeoutMs: uint32(config.TimeoutParsed.Milliseconds()), + MinRequestAmount: uint64(config.MinRequestAmount), + StatIntervalMs: uint32(config.StatIntervalMs), + } + + if config.EnableSlidingWindow { + baseRule.Strategy = circuitbreaker.SlowRequestRatio + baseRule.StatSlidingWindowBucketCount = 10 + baseRule.MaxAllowedRtMs = uint64(config.SlowRequestThresholdParsed.Milliseconds()) + baseRule.Threshold = threshold + } else { + baseRule.Strategy = circuitbreaker.ErrorCount + baseRule.Threshold = float64(config.MaxFailures) + } + + rule = []*circuitbreaker.Rule{baseRule} + + if _, err := circuitbreaker.LoadRulesOfResource(resourcePattern, []*circuitbreaker.Rule{}); err != nil { + return fmt.Errorf("清空熔断规则失败: %v", err) + } + if _, err := circuitbreaker.LoadRules(rule); err != nil { + return fmt.Errorf("加载熔断规则失败: %v", err) + } + + successCodeMap := make(map[int]bool, len(config.SuccessStatusCodes)) + for _, code := range config.SuccessStatusCodes { + successCodeMap[code] = true + } + + cbInfo := &CircuitBreakerInfo{ + ResourceName: resourcePattern, + Config: config, + Metrics: newCircuitBreakerMetrics(), + SuccessCodeMap: successCodeMap, + AdaptiveThreshold: threshold, + WarmupEndTime: time.Now().Add(config.WarmupDurationParsed).Unix(), + } + cbInfo.init() + circuitBreakers.Store(resourcePattern, cbInfo) + + strategy := "error_count" + if config.EnableSlidingWindow { + strategy = "slow_ratio" + } + g.Log().Infof(context.Background(), "接口 %s 熔断器初始化成功: resource=%s, strategy=%s, timeout=%v, threshold=%.2f", + resourcePattern, resourcePattern, strategy, config.TimeoutParsed, rule[0].Threshold) + + return nil } // parseIntSlice 解析整数切片 @@ -503,21 +585,6 @@ func parseIntSlice(str string) []int { return result } -// parseStrings 解析字符串切片 -func parseStrings(str string) []string { - if str == "" { - return nil - } - parts := strings.Split(str, ",") - result := make([]string, 0, len(parts)) - for _, part := range parts { - if trimmed := strings.TrimSpace(part); trimmed != "" { - result = append(result, trimmed) - } - } - return result -} - // parseDurationWithDefault 解析持续时间,失败时使用默认值 func parseDurationWithDefault(ctx context.Context, durationStr, defaultStr, fieldName string) (time.Duration, string) { durationParsed, err := time.ParseDuration(durationStr) @@ -555,16 +622,6 @@ func atomicUpdateMax(maxValue *atomic.Int64, newValue int64) { } } -// getAllowedIPsAndCIDRs 获取允许的IP和CIDR列表(带锁保护) -func getAllowedIPsAndCIDRs() (map[string]bool, []*net.IPNet) { - allowedAdminIPsMutex.RLock() - allowedAdminCIDRsMutex.RLock() - defer allowedAdminIPsMutex.RUnlock() - defer allowedAdminCIDRsMutex.RUnlock() - - return allowedAdminIPsMap, allowedAdminCIDRs -} - // reset 重置所有指标到初始状态 func (m *CircuitBreakerMetrics) reset() { m.TotalRequests.Store(0) @@ -584,26 +641,6 @@ func (m *CircuitBreakerMetrics) reset() { // 时间戳相关字段不重置,LastResetTime在调用时单独设置 } -// parseCIDRs 解析CIDR列表 -func parseCIDRs(strs []string) ([]*net.IPNet, error) { - nets := make([]*net.IPNet, 0, len(strs)) - for _, s := range strs { - if s == "*" { - if _, ipv4Net, err := net.ParseCIDR("0.0.0.0/0"); err == nil { - nets = append(nets, ipv4Net) - } - if _, ipv6Net, err := net.ParseCIDR("::/0"); err == nil { - nets = append(nets, ipv6Net) - } - continue - } - if _, ipNet, err := net.ParseCIDR(s); err == nil { - nets = append(nets, ipNet) - } - } - return nets, nil -} - // newCircuitBreakerMetrics 创建并初始化熔断器指标 func newCircuitBreakerMetrics() *CircuitBreakerMetrics { metrics := &CircuitBreakerMetrics{} @@ -707,84 +744,20 @@ func validateCircuitBreakerConfig(config *CircuitBreakerConfig) error { return nil } -// initServiceCircuitBreaker 初始化服务熔断器 -func initServiceCircuitBreaker(serviceName string, config *CircuitBreakerConfig) error { - if err := validateCircuitBreakerConfig(config); err != nil { - return err - } - - resourceName := "service:" + serviceName - threshold := config.FailureRateThreshold - if config.EnableAdaptiveThreshold { - threshold = (config.AdaptiveMinThreshold + config.AdaptiveMaxThreshold) / 2 - } - - var rule []*circuitbreaker.Rule - baseRule := &circuitbreaker.Rule{ - Resource: resourceName, - RetryTimeoutMs: uint32(config.TimeoutParsed.Milliseconds()), - MinRequestAmount: uint64(config.MinRequestAmount), - StatIntervalMs: uint32(config.StatIntervalMs), - } - - if config.EnableSlidingWindow { - baseRule.Strategy = circuitbreaker.SlowRequestRatio - baseRule.StatSlidingWindowBucketCount = 10 - baseRule.MaxAllowedRtMs = uint64(config.SlowRequestThresholdParsed.Milliseconds()) - baseRule.Threshold = threshold - } else { - baseRule.Strategy = circuitbreaker.ErrorCount - baseRule.Threshold = float64(config.MaxFailures) - } - - rule = []*circuitbreaker.Rule{baseRule} - - if _, err := circuitbreaker.LoadRulesOfResource(resourceName, []*circuitbreaker.Rule{}); err != nil { - return fmt.Errorf("清空熔断规则失败: %v", err) - } - if _, err := circuitbreaker.LoadRules(rule); err != nil { - return fmt.Errorf("加载熔断规则失败: %v", err) - } - - successCodeMap := make(map[int]bool, len(config.SuccessStatusCodes)) - for _, code := range config.SuccessStatusCodes { - successCodeMap[code] = true - } - - cbInfo := &CircuitBreakerInfo{ - ResourceName: resourceName, - Config: config, - Metrics: newCircuitBreakerMetrics(), - SuccessCodeMap: successCodeMap, - CIDRNetMasks: config.CIDRNetMasks, - AdaptiveThreshold: threshold, - WarmupEndTime: time.Now().Add(config.WarmupDurationParsed).Unix(), - } - cbInfo.init() - circuitBreakers.Store(serviceName, cbInfo) - - strategy := "error_count" - if config.EnableSlidingWindow { - strategy = "slow_ratio" - } - g.Log().Infof(context.Background(), "服务 %s 熔断器初始化成功: resource=%s, strategy=%s, timeout=%v, threshold=%.2f", - serviceName, resourceName, strategy, config.TimeoutParsed, rule[0].Threshold) - - return nil -} - // CircuitBreakerMiddleware 熔断降级中间件 func CircuitBreakerMiddleware(r *ghttp.Request) { startTime := time.Now() ctx := r.GetCtx() - serviceName := extractServiceName(r.URL.Path) - if serviceName == "" { + // 基于接口地址+请求参数生成熔断资源名 + resourceName := generateResourceName(r) + if resourceName == "" { r.Middleware.Next() return } - cbInfo, config := getCircuitBreakerInfoAndConfig(serviceName) + // 检查是否有该资源的熔断配置 + cbInfo, config := getCircuitBreakerInfoByResource(resourceName) if cbInfo == nil || config == nil || !config.Enabled { r.Middleware.Next() return @@ -797,7 +770,6 @@ func CircuitBreakerMiddleware(r *ghttp.Request) { return } - resourceName := cbInfo.ResourceName if config.RequestTimeout > 0 { var ctxCancel context.CancelFunc ctx, ctxCancel = context.WithTimeout(ctx, time.Duration(config.RequestTimeout)*time.Millisecond) @@ -809,7 +781,7 @@ func CircuitBreakerMiddleware(r *ghttp.Request) { if config.DistributedTTL > 0 && isCircuitBreakerOpenInDistributed(ctx, resourceName) { cbInfo.Metrics.BlockRequests.Add(1) g.Log().Warningf(ctx, "分布式熔断触发: %s", resourceName) - sendFallbackResponse(r, serviceName, config, "distributed") + sendFallbackResponse(r, resourceName, config, "distributed") return } @@ -831,7 +803,7 @@ func CircuitBreakerMiddleware(r *ghttp.Request) { } } - sendFallbackResponse(r, serviceName, config, "halfopen_limit") + sendFallbackResponse(r, resourceName, config, "halfopen_limit") return } } @@ -846,14 +818,14 @@ func CircuitBreakerMiddleware(r *ghttp.Request) { oldState := cbInfo.setStateWithMetrics(StateOpen, true) if oldState != StateOpen { - notifyStateChange(serviceName, oldState, StateOpen) + notifyStateChange(resourceName, oldState, StateOpen) } if config.DistributedTTL > 0 { syncCircuitBreakerStateToDistributed(ctx, resourceName, "open", config.DistributedTTL) } - sendFallbackResponse(r, serviceName, config, "blocked") + sendFallbackResponse(r, resourceName, config, "blocked") return } @@ -882,7 +854,7 @@ func CircuitBreakerMiddleware(r *ghttp.Request) { if entry != nil { api.TraceError(entry, fmt.Errorf("request failed with status: %d", statusCode)) } - g.Log().Debugf(ctx, "服务 %s 请求失败: status=%d, duration=%v", serviceName, statusCode, duration) + g.Log().Debugf(ctx, "接口 %s 请求失败: status=%d, duration=%v", resourceName, statusCode, duration) // 重新获取当前状态,避免使用过期状态 currentState := cbInfo.getState() @@ -923,27 +895,27 @@ func CircuitBreakerMiddleware(r *ghttp.Request) { // 如果状态不是关闭但也不是半开,尝试重置为关闭状态 oldState := cbInfo.setStateWithMetrics(StateClosed, true) if oldState != StateClosed { - notifyStateChange(serviceName, oldState, StateClosed) + notifyStateChange(resourceName, oldState, StateClosed) } } } } // sendFallbackResponse 发送降级响应 -func sendFallbackResponse(r *ghttp.Request, serviceName string, config *CircuitBreakerConfig, reason string) { - g.Log().Warningf(r.GetCtx(), "熔断器降级: service=%s, reason=%s, clientIP=%s", serviceName, reason, r.GetClientIp()) +func sendFallbackResponse(r *ghttp.Request, resourceName string, config *CircuitBreakerConfig, reason string) { + g.Log().Warningf(r.GetCtx(), "熔断器降级: resource=%s, reason=%s", resourceName, reason) if config.EnableFallback && config.FallbackMessage != "" { r.Response.WriteStatusExit(503, config.FallbackMessage) return } - msg := fmt.Sprintf("服务 '%s' 暂时不可用,请稍后再试", serviceName) + msg := fmt.Sprintf("接口 '%s' 暂时不可用,请稍后再试", resourceName) switch reason { case "blocked": - msg = fmt.Sprintf("服务 '%s' 熔断保护中,请稍后再试", serviceName) + msg = fmt.Sprintf("接口 '%s' 熔断保护中,请稍后再试", resourceName) case "distributed": - msg = fmt.Sprintf("服务 '%s' 分布式熔断中", serviceName) + msg = fmt.Sprintf("接口 '%s' 分布式熔断中", resourceName) } r.Response.WriteStatusExit(503, msg) } @@ -960,101 +932,45 @@ func isSuccessStatusCode(cbInfo *CircuitBreakerInfo, statusCode int) bool { return statusCode >= 200 && statusCode < 300 } -// extractServiceName 从URL路径提取服务名 -func extractServiceName(path string) string { - path = strings.Trim(path, "/") - if path == "" { +// generateResourceName 基于接口地址+请求参数生成熔断资源名 +func generateResourceName(r *ghttp.Request) string { + method := r.Method + path := r.URL.Path + query := r.URL.Query().Encode() + + // 生成资源名:方法:路径?查询参数 + // 示例: GET:/api/users?userId=123 + resourceName := method + ":" + path + if query != "" { + // 对查询参数进行排序以确保相同的参数顺序生成相同的资源名 + sortedQuery := sortQueryString(query) + resourceName += "?" + sortedQuery + } + + return resourceName +} + +// sortQueryString 对查询字符串进行排序 +func sortQueryString(query string) string { + if query == "" { return "" } - // 获取第一个路径段 - if idx := strings.Index(path, "/"); idx > 0 { - path = path[:idx] + params := strings.Split(query, "&") + if len(params) == 0 { + return query } - // 解码URL编码(简化版) - serviceName := path - if strings.Contains(serviceName, "%") { - serviceName = urlDecode(serviceName) - } - - if _, ok := circuitBreakerConfigs.Load(serviceName); ok { - return serviceName - } - return "" -} - -// urlDecode 简单的URL解码 -func urlDecode(s string) string { - result := make([]byte, 0, len(s)) - - for i := 0; i < len(s); i++ { - if s[i] == '%' && i+2 < len(s) { - if high := hexDigit(s[i+1]); high != 0xFF { - if low := hexDigit(s[i+2]); low != 0xFF { - result = append(result, (high<<4)|low) - i += 2 - continue - } + // 简单的字符串排序 + for i := 0; i < len(params)-1; i++ { + for j := i + 1; j < len(params); j++ { + if params[i] > params[j] { + params[i], params[j] = params[j], params[i] } } - result = append(result, s[i]) } - return string(result) -} - -func hexDigit(c byte) byte { - switch { - case '0' <= c && c <= '9': - return c - '0' - case 'a' <= c && c <= 'f': - return c - 'a' + 10 - case 'A' <= c && c <= 'F': - return c - 'A' + 10 - default: - return 0xFF - } -} - -// updateAdminIPsCache 更新管理员IP白名单缓存 -func updateAdminIPsCache() { - ipMap := make(map[string]bool) - cidrNets := make([]*net.IPNet, 0) - - circuitBreakerConfigs.Range(func(_, value interface{}) bool { - config, ok := value.(*CircuitBreakerConfig) - if !ok { - return true - } - for _, ip := range config.AdminIPs { - if !ipMap[ip] { - ipMap[ip] = true - } - } - cidrNets = append(cidrNets, config.CIDRNetMasks...) - return true - }) - - allowedAdminIPsMutex.Lock() - allowedAdminIPsMap = ipMap - allowedAdminIPsMutex.Unlock() - - allowedAdminCIDRsMutex.Lock() - allowedAdminCIDRs = cidrNets - allowedAdminCIDRsMutex.Unlock() -} - -// filterServiceNames 过滤服务名 -func filterServiceNames(services map[string]interface{}) []string { - excludeKeys := map[string]bool{"services": true, "enableDistributed": true, "requestTimeout": true, "distributedTTL": true} - result := make([]string, 0, len(services)) - for key := range services { - if !excludeKeys[key] { - result = append(result, key) - } - } - return result + return strings.Join(params, "&") } // isCircuitBreakerOpenInDistributed 检查分布式熔断状态 @@ -1105,11 +1021,6 @@ func syncCircuitBreakerStateToDistributed(ctx context.Context, resourceName, sta // CircuitBreakerHealthCheckHandler 健康检查接口 func CircuitBreakerHealthCheckHandler(r *ghttp.Request) { - if !isAdminIP(r) { - r.Response.WriteJsonExit(ghttp.DefaultHandlerResponse{Code: 403, Message: "权限不足,禁止访问"}) - return - } - page := r.Get("page").Int() size := r.Get("size").Int() if page < 0 { @@ -1119,11 +1030,14 @@ func CircuitBreakerHealthCheckHandler(r *ghttp.Request) { size = 20 } - serviceNamesMutex.RLock() - slice := serviceNamesSlice - serviceNamesMutex.RUnlock() + // 获取所有熔断器资源 + var resources []string + circuitBreakers.Range(func(key, value interface{}) bool { + resources = append(resources, key.(string)) + return true + }) - total := len(slice) + total := len(resources) start := page * size if start >= total { r.Response.WriteJsonExit(ghttp.DefaultHandlerResponse{Code: 200, Message: "熔断器状态", @@ -1144,8 +1058,8 @@ func CircuitBreakerHealthCheckHandler(r *ghttp.Request) { halfOpenServices := 0 for i := start; i < end; i++ { - serviceName := slice[i] - cbInfoVal, ok := circuitBreakers.Load(serviceName) + resourceName := resources[i] + cbInfoVal, ok := circuitBreakers.Load(resourceName) if !ok { continue } @@ -1166,7 +1080,7 @@ func CircuitBreakerHealthCheckHandler(r *ghttp.Request) { lastOpenTimeStr := formatUnixTime(cbInfo.Metrics.LastOpenTime.Load()) nextRetryTimeStr := formatUnixTime(cbInfo.Metrics.NextRetryTime.Load()) - status[serviceName] = map[string]interface{}{ + status[resourceName] = map[string]interface{}{ "resource": cbInfo.ResourceName, "state": string(state), "lastOpenTime": lastOpenTimeStr, @@ -1189,74 +1103,34 @@ func CircuitBreakerHealthCheckHandler(r *ghttp.Request) { "services": status, "page": page, "size": size, "total": total}}) } -// isAdminIP 检查IP是否在白名单中 -func isAdminIP(r *ghttp.Request) bool { - clientIP := r.GetClientIp() - if clientIP == "" { - return false - } - - // 一次性获取IP和CIDR列表,减少锁操作 - allowedIPs, allowedCIDRs := getAllowedIPsAndCIDRs() - - // 如果没有任何限制,允许访问 - if len(allowedIPs) == 0 && len(allowedCIDRs) == 0 { - return true - } - - // 检查IP白名单 - if allowedIPs[clientIP] { - return true - } - - // 检查CIDR白名单 - if clientNetIP := net.ParseIP(clientIP); clientNetIP != nil { - for _, cidrNet := range allowedCIDRs { - if cidrNet.Contains(clientNetIP) { - return true - } - } - } - - g.Log().Warningf(r.GetCtx(), "熔断器操作请求被拒绝,IP不在白名单中: %s", clientIP) - return false -} - -// batchProcessServices 批量处理服务 -func batchProcessServices(r *ghttp.Request, processFunc func(serviceName string) error) (int, int, map[string]string) { +// batchProcessResources 批量处理资源 +func batchProcessResources(r *ghttp.Request, processFunc func(resourceName string) error) (int, int, map[string]string) { successCount := 0 failCount := 0 failures := make(map[string]string) - serviceNamesMutex.RLock() - slice := serviceNamesSlice - serviceNamesMutex.RUnlock() - - for _, serviceName := range slice { - if err := processFunc(serviceName); err != nil { - g.Log().Errorf(r.GetCtx(), "服务 %s 处理失败: %v", serviceName, err) + circuitBreakers.Range(func(key, value interface{}) bool { + resourceName := key.(string) + if err := processFunc(resourceName); err != nil { + g.Log().Errorf(r.GetCtx(), "资源 %s 处理失败: %v", resourceName, err) failCount++ - failures[serviceName] = err.Error() + failures[resourceName] = err.Error() } else { successCount++ } - } + return true + }) return successCount, failCount, failures } // CircuitBreakerResetHandler 重置熔断器 func CircuitBreakerResetHandler(r *ghttp.Request) { - serviceName := r.Get("service").String() + resourceName := r.Get("resource").String() - if !isAdminIP(r) { - r.Response.WriteJsonExit(ghttp.DefaultHandlerResponse{Code: 403, Message: "权限不足,禁止访问"}) - return - } - - if serviceName == "" || serviceName == "*" { - successCount, failCount, failures := batchProcessServices(r, func(name string) error { - return resetSingleService(r, name) + if resourceName == "" || resourceName == "*" { + successCount, failCount, failures := batchProcessResources(r, func(name string) error { + return resetSingleResource(r, name) }) g.Log().Infof(r.GetCtx(), "批量重置熔断器完成: 成功 %d, 失败 %d", successCount, failCount) @@ -1265,45 +1139,35 @@ func CircuitBreakerResetHandler(r *ghttp.Request) { return } - if err := resetSingleService(r, serviceName); err != nil { + if err := resetSingleResource(r, resourceName); err != nil { r.Response.WriteJsonExit(ghttp.DefaultHandlerResponse{Code: 500, Message: fmt.Sprintf("重置熔断器失败: %v", err)}) return } - r.Response.WriteJsonExit(ghttp.DefaultHandlerResponse{Code: 200, Message: fmt.Sprintf("服务 '%s' 的熔断器已重置", serviceName)}) + r.Response.WriteJsonExit(ghttp.DefaultHandlerResponse{Code: 200, Message: fmt.Sprintf("资源 '%s' 的熔断器已重置", resourceName)}) } -// resetSingleService 重置单个服务 -func resetSingleService(r *ghttp.Request, serviceName string) error { - resourceName := "service:" + serviceName - +// resetSingleResource 重置单个资源 +func resetSingleResource(r *ghttp.Request, resourceName string) error { if rules := circuitbreaker.GetRulesOfResource(resourceName); len(rules) > 0 { if _, err := circuitbreaker.LoadRulesOfResource(resourceName, []*circuitbreaker.Rule{}); err != nil { return err } } - if configVal, ok := circuitBreakerConfigs.Load(serviceName); ok { - if err := initServiceCircuitBreaker(serviceName, configVal.(*CircuitBreakerConfig)); err != nil { - return err - } - } - - if cbInfoVal, ok := circuitBreakers.Load(serviceName); ok { + if cbInfoVal, ok := circuitBreakers.Load(resourceName); ok { cbInfo := cbInfoVal.(*CircuitBreakerInfo) + config := cbInfo.Config cbInfo.State.Store(stateClosed) // 重置指标 cbInfo.Metrics.reset() - cbInfo.WarmupEndTime = time.Now().Add(cbInfo.Config.WarmupDurationParsed).Unix() + cbInfo.WarmupEndTime = time.Now().Add(config.WarmupDurationParsed).Unix() cbInfo.Metrics.LastResetTime.Store(time.Now().Unix()) - } - if configVal, ok := circuitBreakerConfigs.Load(serviceName); ok { - config, ok := configVal.(*CircuitBreakerConfig) - if ok && config.DistributedTTL > 0 { + // 清除分布式状态 + if config.DistributedTTL > 0 { redisClient := g.Redis() if redisClient != nil { - // 使用common/redis中的Lock方法确保分布式一致性 lockKey := "circuit_breaker:" + resourceName + ":lock" success, err := redis.Lock(r.GetCtx(), lockKey, 10, func(ctx context.Context) error { _, err := redisClient.Del(ctx, "circuit_breaker:"+resourceName+":state") @@ -1329,31 +1193,7 @@ func resetSingleService(r *ghttp.Request, serviceName string) error { // CircuitBreakerReloadHandler 配置重载接口 func CircuitBreakerReloadHandler(r *ghttp.Request) { - serviceName := r.Get("service").String() - - if !isAdminIP(r) { - r.Response.WriteJsonExit(ghttp.DefaultHandlerResponse{Code: 403, Message: "权限不足,禁止访问"}) - return - } - - if serviceName == "" || serviceName == "*" { - successCount, failCount, failures := batchProcessServices(r, func(serviceName string) error { - return ReloadCircuitBreakerConfig(serviceName) - }) - - updateAdminIPsCache() - r.Response.WriteJsonExit(ghttp.DefaultHandlerResponse{Code: 200, Message: fmt.Sprintf("配置重载完成: 成功 %d, 失败 %d", successCount, failCount), - Data: map[string]interface{}{"success": successCount, "failed": failCount, "failures": failures}}) - return - } - - if err := ReloadCircuitBreakerConfig(serviceName); err != nil { - r.Response.WriteJsonExit(ghttp.DefaultHandlerResponse{Code: 500, Message: fmt.Sprintf("重载失败: %v", err)}) - return - } - - updateAdminIPsCache() - r.Response.WriteJsonExit(ghttp.DefaultHandlerResponse{Code: 200, Message: fmt.Sprintf("服务 '%s' 的熔断器配置已重载", serviceName)}) + r.Response.WriteJsonExit(ghttp.DefaultHandlerResponse{Code: 501, Message: "基于接口的熔断器暂不支持配置重载"}) } // StateChangeListener 状态变化监听器类型