Files
common/middleware/circuit_breaker.go

741 lines
23 KiB
Go
Raw Normal View History

2025-12-31 23:38:33 +08:00
package middleware
import (
	"context"
	"fmt"
	"strconv"
	"strings"
	"sync"
	"sync/atomic"
	"time"

	"github.com/alibaba/sentinel-golang/api"
	"github.com/alibaba/sentinel-golang/core/circuitbreaker"
	"github.com/gogf/gf/v2/frame/g"
	"github.com/gogf/gf/v2/net/ghttp"
)
2026-01-01 01:33:59 +08:00
// CircuitBreakerState is the locally tracked state of a service's circuit
// breaker as reported by this middleware.
type CircuitBreakerState string

const (
	// StateClosed means requests are passed through to the service.
	StateClosed CircuitBreakerState = "closed"
	// StateOpen means the breaker has tripped and requests are rejected.
	StateOpen CircuitBreakerState = "open"
)
2026-01-01 01:33:59 +08:00
// CircuitBreakerConfig 熔断器配置
2025-12-31 23:38:33 +08:00
type CircuitBreakerConfig struct {
2026-01-01 10:48:47 +08:00
Enabled bool // 是否启用熔断器
MaxFailures int // 连续失败次数
Timeout string // 熔断超时时间
TimeoutParsed time.Duration // 缓存的超时时间(性能优化)
SuccessStatusCodes []int // 视为成功的HTTP状态码
SlowRequestThreshold string // 慢请求阈值
SlowRequestThresholdParsed time.Duration // 缓存的慢请求阈值(性能优化)
EnableSlidingWindow bool // 是否启用滑动窗口
FailureRateThreshold float64 // 失败率阈值
EnableFallback bool // 是否启用降级
FallbackMessage string // 降级提示消息
RequestTimeout int // 请求超时时间(毫秒)
DistributedTTL int // 分布式熔断状态TTL
2026-01-01 07:38:00 +08:00
}
// CircuitBreakerMetrics collects per-service request counters. All fields are
// atomic so they can be updated from concurrent requests without a lock.
type CircuitBreakerMetrics struct {
	TotalRequests   atomic.Int64 // requests that reached an enabled breaker
	PassRequests    atomic.Int64 // requests that completed with a success status
	BlockRequests   atomic.Int64 // requests rejected by the breaker
	FailureRequests atomic.Int64 // requests that completed with a non-success status
	OpenCount       atomic.Int64 // incremented each time Sentinel blocks a request
}
2026-01-01 01:33:59 +08:00
// CircuitBreakerInfo 熔断器信息
type CircuitBreakerInfo struct {
2026-01-01 07:38:00 +08:00
ResourceName string `json:"resourceName"` // 资源名称
State CircuitBreakerState `json:"state"` // 当前状态
Config *CircuitBreakerConfig `json:"config"` // 配置信息
LastOpenTime time.Time `json:"lastOpenTime"` // 上次熔断时间
NextRetryTime time.Time `json:"nextRetryTime"` // 下次重试时间
Metrics *CircuitBreakerMetrics `json:"metrics"` // 指标统计
mu sync.RWMutex // 保护状态更新
2025-12-31 23:38:33 +08:00
}
var (
	// circuitBreakers maps service name -> *CircuitBreakerInfo (used by the
	// middleware and the health-check handler).
	circuitBreakers sync.Map
	// circuitBreakerConfigs maps service name -> *CircuitBreakerConfig.
	circuitBreakerConfigs sync.Map
	// distributedSyncLocks maps service name -> *sync.Mutex used to serialise
	// writes of the shared breaker state from this process.
	distributedSyncLocks sync.Map
	// stateChangeListeners maps listener name -> StateChangeListener.
	stateChangeListeners sync.Map
	// stateChangeListenersRegistered records that the default listener has
	// already been registered.
	stateChangeListenersRegistered sync.Map
)
2026-01-01 01:33:59 +08:00
// InitCircuitBreaker 初始化Sentinel熔断器
func InitCircuitBreaker() error {
ctx := context.Background()
2025-12-31 23:38:33 +08:00
2026-01-01 01:33:59 +08:00
// 初始化Sentinel
err := api.InitDefault()
if err != nil {
2026-01-01 10:48:47 +08:00
return fmt.Errorf("sentinel初始化失败: %v", err)
2025-12-31 23:38:33 +08:00
}
2026-01-01 10:37:01 +08:00
// 注册熔断器状态变化监听器
registerStateChangeListeners()
2025-12-31 23:38:33 +08:00
2026-01-01 10:37:01 +08:00
g.Log().Infof(ctx, "Sentinel熔断器初始化成功")
2026-01-01 07:38:00 +08:00
2026-01-01 10:37:01 +08:00
// 扫描配置文件中所有配置了熔断器的服务
services := g.Cfg().MustGet(ctx, "circuitBreaker").Map()
// 过滤掉非服务配置的key
2026-01-01 10:48:47 +08:00
serviceNames := filterServiceNames(services)
2026-01-01 10:37:01 +08:00
if len(serviceNames) == 0 {
g.Log().Infof(ctx, "未配置任何服务熔断器")
return nil
2025-12-31 23:38:33 +08:00
}
2026-01-01 07:38:00 +08:00
// 为每个服务创建熔断器
2026-01-01 10:37:01 +08:00
enabledCount := 0
for _, serviceName := range serviceNames {
serviceConfig := loadServiceCircuitBreakerConfig(serviceName)
if serviceConfig != nil && serviceConfig.Enabled {
circuitBreakerConfigs.Store(serviceName, serviceConfig)
initErr := initServiceCircuitBreaker(serviceName, serviceConfig)
2026-01-01 01:33:59 +08:00
if initErr != nil {
2026-01-01 10:37:01 +08:00
g.Log().Errorf(ctx, "服务 %s 熔断器初始化失败: %v", serviceName, initErr)
2026-01-01 07:38:00 +08:00
} else {
2026-01-01 10:37:01 +08:00
g.Log().Infof(ctx, "服务 %s 熔断器初始化成功", serviceName)
enabledCount++
2025-12-31 23:38:33 +08:00
}
2026-01-01 10:37:01 +08:00
} else {
g.Log().Infof(ctx, "服务 %s 熔断器未启用", serviceName)
2025-12-31 23:38:33 +08:00
}
}
2026-01-01 10:37:01 +08:00
g.Log().Infof(ctx, "共初始化 %d 个服务熔断器,其中 %d 个已启用", len(serviceNames), enabledCount)
2026-01-01 01:33:59 +08:00
return nil
2025-12-31 23:38:33 +08:00
}
2026-01-01 07:38:00 +08:00
// ReloadCircuitBreakerConfig 动态重新加载熔断器配置
func ReloadCircuitBreakerConfig(serviceName string) error {
ctx := context.Background()
// 重新加载配置
serviceConfig := loadServiceCircuitBreakerConfig(serviceName)
if serviceConfig == nil {
return fmt.Errorf("未找到服务 %s 的配置", serviceName)
2025-12-31 23:38:33 +08:00
}
2026-01-01 07:38:00 +08:00
// 更新配置缓存
circuitBreakerConfigs.Store(serviceName, serviceConfig)
// 重新初始化熔断器
err := initServiceCircuitBreaker(serviceName, serviceConfig)
if err != nil {
return fmt.Errorf("重新初始化熔断器失败: %v", err)
2025-12-31 23:38:33 +08:00
}
2026-01-01 07:38:00 +08:00
g.Log().Infof(ctx, "服务 %s 熔断器配置重新加载成功", serviceName)
return nil
2025-12-31 23:38:33 +08:00
}
2026-01-01 01:33:59 +08:00
// loadServiceCircuitBreakerConfig 加载单个服务的熔断器配置
func loadServiceCircuitBreakerConfig(serviceName string) *CircuitBreakerConfig {
ctx := context.Background()
key := fmt.Sprintf("circuitBreaker.%s", serviceName)
2026-01-01 10:37:01 +08:00
enabled := g.Cfg().MustGet(ctx, key+".enabled", true).Bool()
2026-01-01 01:33:59 +08:00
maxFailures := g.Cfg().MustGet(ctx, key+".maxFailures", 5).Int()
timeout := g.Cfg().MustGet(ctx, key+".timeout", "60s").String()
slowRequestThreshold := g.Cfg().MustGet(ctx, key+".slowRequestThreshold", "3s").String()
enableSlidingWindow := g.Cfg().MustGet(ctx, key+".enableSlidingWindow", false).Bool()
failureRateThreshold := g.Cfg().MustGet(ctx, key+".failureRateThreshold", 0.5).Float64()
2026-01-01 07:38:00 +08:00
enableFallback := g.Cfg().MustGet(ctx, key+".enableFallback", false).Bool()
fallbackMessage := g.Cfg().MustGet(ctx, key+".fallbackMessage", "").String()
2026-01-01 10:37:01 +08:00
requestTimeout := g.Cfg().MustGet(ctx, key+".requestTimeout", 30000).Int()
distributedTTL := g.Cfg().MustGet(ctx, key+".distributedTTL", 300).Int()
2026-01-01 01:33:59 +08:00
// 解析成功状态码
successCodes := g.Cfg().MustGet(ctx, key+".successStatusCodes", "200,201,204").String()
statusCodes := parseStatusCodes(successCodes)
2026-01-01 10:48:47 +08:00
// 解析时间(缓存结果,性能优化)
timeoutParsed, err := time.ParseDuration(timeout)
if err != nil {
timeoutParsed = 60 * time.Second
g.Log().Warningf(ctx, "服务 %s 的 timeout 解析失败,使用默认值: %v", serviceName, err)
}
slowRequestThresholdParsed, err := time.ParseDuration(slowRequestThreshold)
if err != nil {
slowRequestThresholdParsed = 3 * time.Second
g.Log().Warningf(ctx, "服务 %s 的 slowRequestThreshold 解析失败,使用默认值: %v", serviceName, err)
}
2026-01-01 01:33:59 +08:00
return &CircuitBreakerConfig{
2026-01-01 10:48:47 +08:00
Enabled: enabled,
MaxFailures: maxFailures,
Timeout: timeout,
TimeoutParsed: timeoutParsed,
SuccessStatusCodes: statusCodes,
SlowRequestThreshold: slowRequestThreshold,
SlowRequestThresholdParsed: slowRequestThresholdParsed,
EnableSlidingWindow: enableSlidingWindow,
FailureRateThreshold: failureRateThreshold,
EnableFallback: enableFallback,
FallbackMessage: fallbackMessage,
RequestTimeout: requestTimeout,
DistributedTTL: distributedTTL,
2025-12-31 23:38:33 +08:00
}
}
2026-01-01 01:33:59 +08:00
// parseStatusCodes parses a comma-separated list of HTTP status codes such as
// "200, 201,204". Surrounding whitespace is ignored. Tokens that are not a
// complete integer (empty, "abc", "20x") are skipped rather than partially
// parsed. The result is never nil.
func parseStatusCodes(str string) []int {
	parts := strings.Split(str, ",")
	codes := make([]int, 0, len(parts))
	for _, part := range parts {
		// Atoi rejects trailing garbage, unlike Sscanf("%d") which would
		// accept the numeric prefix of "20x" as 20.
		code, err := strconv.Atoi(strings.TrimSpace(part))
		if err != nil {
			continue
		}
		codes = append(codes, code)
	}
	return codes
}
2025-12-31 23:38:33 +08:00
2026-01-01 10:48:47 +08:00
// filterServiceNames returns the keys of services that name a service, i.e.
// every key except the global option keys "services", "enableDistributed",
// "requestTimeout" and "distributedTTL". The order of the result follows map
// iteration and is therefore unspecified. The result is never nil.
func filterServiceNames(services map[string]interface{}) []string {
	names := make([]string, 0, len(services))
	for name := range services {
		switch name {
		case "services", "enableDistributed", "requestTimeout", "distributedTTL":
			// Global option, not a service section.
			continue
		}
		names = append(names, name)
	}
	return names
}
2026-01-01 01:33:59 +08:00
// initServiceCircuitBreaker 初始化服务熔断器
func initServiceCircuitBreaker(serviceName string, config *CircuitBreakerConfig) error {
2026-01-01 10:37:01 +08:00
// 验证配置参数
if err := validateCircuitBreakerConfig(config); err != nil {
return fmt.Errorf("配置验证失败: %v", err)
}
2026-01-01 10:48:47 +08:00
// 使用缓存的时间值(性能优化)
timeout := config.TimeoutParsed
slowRequestThreshold := config.SlowRequestThresholdParsed
2026-01-01 01:33:59 +08:00
resourceName := fmt.Sprintf("service:%s", serviceName)
var rule []*circuitbreaker.Rule
if config.EnableSlidingWindow {
// 使用滑动窗口统计(更精确)- 慢调用比例策略
rule = []*circuitbreaker.Rule{
{
Resource: resourceName,
Strategy: circuitbreaker.SlowRequestRatio,
RetryTimeoutMs: uint32(timeout.Milliseconds()),
MinRequestAmount: uint64(config.MaxFailures),
StatIntervalMs: 1000,
StatSlidingWindowBucketCount: 10,
MaxAllowedRtMs: uint64(slowRequestThreshold.Milliseconds()),
Threshold: config.FailureRateThreshold,
},
}
} else {
// 使用连续失败计数(更简单快速)- 异常数策略
rule = []*circuitbreaker.Rule{
{
Resource: resourceName,
Strategy: circuitbreaker.ErrorCount,
RetryTimeoutMs: uint32(timeout.Milliseconds()),
MinRequestAmount: uint64(config.MaxFailures),
StatIntervalMs: 1000, // 1秒统计窗口
Threshold: float64(config.MaxFailures),
},
}
2025-12-31 23:38:33 +08:00
}
2026-01-01 10:48:47 +08:00
// 先清理旧规则(健壮性改进)
_, _ = circuitbreaker.LoadRulesOfResource(resourceName, []*circuitbreaker.Rule{})
// 加载新规则到Sentinel
_, err := circuitbreaker.LoadRules(rule)
2025-12-31 23:38:33 +08:00
if err != nil {
2026-01-01 01:33:59 +08:00
return fmt.Errorf("加载熔断规则失败: %v", err)
2025-12-31 23:38:33 +08:00
}
2026-01-01 01:33:59 +08:00
// 初始化熔断器信息
cbInfo := &CircuitBreakerInfo{
ResourceName: resourceName,
State: StateClosed,
Config: config,
2026-01-01 07:38:00 +08:00
Metrics: &CircuitBreakerMetrics{},
}
2026-01-01 01:33:59 +08:00
circuitBreakers.Store(serviceName, cbInfo)
2026-01-01 01:33:59 +08:00
strategy := "error_count"
if config.EnableSlidingWindow {
strategy = "slow_ratio"
2025-12-31 23:38:33 +08:00
}
2026-01-01 10:37:01 +08:00
g.Log().Infof(context.Background(), "服务 %s 熔断器初始化成功: resource=%s, strategy=%s, timeout=%v, threshold=%.2f",
serviceName, resourceName, strategy, timeout, rule[0].Threshold)
2025-12-31 23:38:33 +08:00
2026-01-01 01:33:59 +08:00
return nil
2025-12-31 23:38:33 +08:00
}
2026-01-01 01:33:59 +08:00
// CircuitBreakerMiddleware 熔断降级中间件使用阿里Sentinel
2025-12-31 23:38:33 +08:00
func CircuitBreakerMiddleware(r *ghttp.Request) {
2026-01-01 07:38:00 +08:00
startTime := time.Now()
2026-01-01 10:37:01 +08:00
ctx := r.GetCtx()
2026-01-01 07:38:00 +08:00
2026-01-01 01:33:59 +08:00
// 从URL路径提取服务名
2025-12-31 23:38:33 +08:00
pathParts := strings.Split(strings.Trim(r.URL.Path, "/"), "/")
if len(pathParts) == 0 {
r.Middleware.Next()
return
}
serviceName := pathParts[0]
2026-01-01 01:33:59 +08:00
2026-01-01 10:37:01 +08:00
// 获取熔断器配置
val, ok := circuitBreakerConfigs.Load(serviceName)
2026-01-01 07:38:00 +08:00
if !ok {
// 未配置熔断器,直接放行
r.Middleware.Next()
return
}
2026-01-01 10:37:01 +08:00
config := val.(*CircuitBreakerConfig)
if !config.Enabled {
// 熔断器未启用,直接放行
r.Middleware.Next()
return
}
// 获取熔断器信息
cbInfoVal, ok := circuitBreakers.Load(serviceName)
if !ok {
r.Middleware.Next()
return
}
cbInfo := cbInfoVal.(*CircuitBreakerInfo)
2026-01-01 07:38:00 +08:00
cbInfo.Metrics.TotalRequests.Add(1)
2026-01-01 10:37:01 +08:00
// 设置请求超时(使用服务独立配置)
if config.RequestTimeout > 0 {
ctx, cancel := context.WithTimeout(ctx, time.Duration(config.RequestTimeout)*time.Millisecond)
r.SetCtx(ctx)
defer cancel()
}
resourceName := fmt.Sprintf("service:%s", serviceName)
2026-01-01 01:33:59 +08:00
// 检查是否启用分布式熔断
2026-01-01 10:37:01 +08:00
if config.DistributedTTL > 0 {
if isCircuitBreakerOpenInDistributed(ctx, resourceName) {
2026-01-01 07:38:00 +08:00
cbInfo.Metrics.BlockRequests.Add(1)
2026-01-01 10:37:01 +08:00
g.Log().Warningf(ctx, "分布式熔断触发: %s", resourceName)
notifyStateChange(serviceName, StateOpen, StateOpen)
sendFallbackResponse(r, serviceName, config, "distributed")
2026-01-01 01:33:59 +08:00
return
}
2025-12-31 23:38:33 +08:00
}
2026-01-01 01:33:59 +08:00
// 使用Sentinel进行熔断保护
entry, blockError := api.Entry(resourceName)
if blockError != nil {
// 被熔断拦截
2026-01-01 07:38:00 +08:00
cbInfo.Metrics.BlockRequests.Add(1)
cbInfo.Metrics.OpenCount.Add(1)
2026-01-01 10:37:01 +08:00
g.Log().Warningf(ctx, "熔断触发: %s, reason: %v", resourceName, blockError)
2026-01-01 01:33:59 +08:00
// 更新熔断器状态
2026-01-01 07:38:00 +08:00
cbInfo.mu.Lock()
2026-01-01 10:37:01 +08:00
oldState := cbInfo.State
2026-01-01 07:38:00 +08:00
cbInfo.State = StateOpen
cbInfo.LastOpenTime = time.Now()
2026-01-01 10:48:47 +08:00
// 使用缓存的时间值(性能优化)
cbInfo.NextRetryTime = time.Now().Add(config.TimeoutParsed)
2026-01-01 07:38:00 +08:00
cbInfo.mu.Unlock()
2025-12-31 23:38:33 +08:00
2026-01-01 10:37:01 +08:00
// 通知状态变化(如果状态改变)
if oldState != StateOpen {
notifyStateChange(serviceName, oldState, StateOpen)
}
2026-01-01 07:38:00 +08:00
// 同步到分布式存储
2026-01-01 10:37:01 +08:00
if config.DistributedTTL > 0 {
syncCircuitBreakerStateToDistributed(ctx, resourceName, "open", config.DistributedTTL)
2026-01-01 07:38:00 +08:00
}
2026-01-01 10:37:01 +08:00
sendFallbackResponse(r, serviceName, config, "blocked")
2026-01-01 01:33:59 +08:00
return
}
2025-12-31 23:38:33 +08:00
2026-01-01 01:33:59 +08:00
// 执行后续中间件和业务逻辑
r.Middleware.Next()
2025-12-31 23:38:33 +08:00
2026-01-01 01:33:59 +08:00
// 记录请求结果基于HTTP状态码
statusCode := r.Response.Status
2026-01-01 07:38:00 +08:00
duration := time.Since(startTime)
2026-01-01 01:33:59 +08:00
if !isSuccessStatusCode(resourceName, statusCode) {
// 记录异常
2026-01-01 07:38:00 +08:00
cbInfo.Metrics.FailureRequests.Add(1)
2026-01-01 01:33:59 +08:00
api.TraceError(entry, fmt.Errorf("request failed with status: %d", statusCode))
2026-01-01 10:37:01 +08:00
g.Log().Debugf(ctx, "服务 %s 请求失败: status=%d, duration=%v", serviceName, statusCode, duration)
2026-01-01 07:38:00 +08:00
} else {
cbInfo.Metrics.PassRequests.Add(1)
2026-01-01 10:37:01 +08:00
// 更新状态为关闭(如果之前是开启状态)
cbInfo.mu.Lock()
oldState := cbInfo.State
if cbInfo.State != StateClosed {
cbInfo.State = StateClosed
notifyStateChange(serviceName, oldState, StateClosed)
}
cbInfo.mu.Unlock()
2026-01-01 01:33:59 +08:00
}
2025-12-31 23:38:33 +08:00
2026-01-01 01:33:59 +08:00
// 退出Sentinel资源
entry.Exit()
}
2025-12-31 23:38:33 +08:00
2026-01-01 07:38:00 +08:00
// sendFallbackResponse 发送降级响应
2026-01-01 10:37:01 +08:00
func sendFallbackResponse(r *ghttp.Request, serviceName string, config *CircuitBreakerConfig, reason string) {
2026-01-01 07:38:00 +08:00
if config.EnableFallback && config.FallbackMessage != "" {
// 自定义降级消息
r.Response.WriteStatusExit(503, config.FallbackMessage)
} else {
2026-01-01 10:37:01 +08:00
// 根据原因返回不同的状态码和消息
switch reason {
case "blocked":
r.Response.WriteStatusExit(503, fmt.Sprintf("服务 '%s' 熔断保护中,请稍后再试", serviceName))
case "distributed":
r.Response.WriteStatusExit(503, fmt.Sprintf("服务 '%s' 分布式熔断中", serviceName))
default:
r.Response.WriteStatusExit(503, fmt.Sprintf("服务 '%s' 暂时不可用,请稍后再试", serviceName))
}
2026-01-01 07:38:00 +08:00
}
}
2026-01-01 01:33:59 +08:00
// isSuccessStatusCode 判断HTTP状态码是否成功
func isSuccessStatusCode(resourceName string, statusCode int) bool {
serviceName := strings.TrimPrefix(resourceName, "service:")
if serviceName == "" {
// 默认只认为2xx是成功
return statusCode >= 200 && statusCode < 300
2025-12-31 23:38:33 +08:00
}
2026-01-01 01:33:59 +08:00
// 从配置中获取成功状态码列表
var serviceConfig *CircuitBreakerConfig
if val, ok := circuitBreakerConfigs.Load(serviceName); ok {
serviceConfig = val.(*CircuitBreakerConfig)
}
2025-12-31 23:38:33 +08:00
2026-01-01 01:33:59 +08:00
if serviceConfig != nil && len(serviceConfig.SuccessStatusCodes) > 0 {
for _, code := range serviceConfig.SuccessStatusCodes {
if statusCode == code {
return true
}
}
return false
2025-12-31 23:38:33 +08:00
}
2026-01-01 01:33:59 +08:00
// 默认2xx状态码为成功
return statusCode >= 200 && statusCode < 300
2025-12-31 23:38:33 +08:00
}
2026-01-01 01:33:59 +08:00
// isCircuitBreakerOpenInDistributed 检查分布式熔断状态
func isCircuitBreakerOpenInDistributed(ctx context.Context, resourceName string) bool {
key := fmt.Sprintf("circuit_breaker:%s:state", resourceName)
2026-01-01 10:37:01 +08:00
redis := g.Redis()
if redis == nil {
return false
}
value, err := redis.Get(ctx, key)
2026-01-01 01:33:59 +08:00
if err != nil || value.IsNil() {
return false
2025-12-31 23:38:33 +08:00
}
2026-01-01 01:33:59 +08:00
state := value.String()
return state == "open"
2025-12-31 23:38:33 +08:00
}
2026-01-01 10:48:47 +08:00
// getDistributedLock returns the mutex associated with serviceName, creating
// it on first use. The mutex lives in this process only; it serialises the
// state-sync writes issued from this instance.
func getDistributedLock(serviceName string) *sync.Mutex {
	candidate := &sync.Mutex{}
	// LoadOrStore returns the already stored mutex if there is one.
	stored, _ := distributedSyncLocks.LoadOrStore(serviceName, candidate)
	mu := stored.(*sync.Mutex)
	return mu
}
2026-01-01 07:38:00 +08:00
// syncCircuitBreakerStateToDistributed 同步熔断器状态到分布式存储
2026-01-01 10:37:01 +08:00
func syncCircuitBreakerStateToDistributed(ctx context.Context, resourceName, state string, ttl int) {
2026-01-01 10:48:47 +08:00
// 提取服务名用于锁分片
serviceName := strings.TrimPrefix(resourceName, "service:")
lock := getDistributedLock(serviceName)
lock.Lock()
defer lock.Unlock()
2026-01-01 07:38:00 +08:00
key := fmt.Sprintf("circuit_breaker:%s:state", resourceName)
2026-01-01 10:37:01 +08:00
redis := g.Redis()
if redis == nil {
g.Log().Errorf(ctx, "Redis客户端未初始化无法同步熔断状态")
return
}
_, err := redis.Do(ctx, "SETEX", key, ttl, state)
2026-01-01 07:38:00 +08:00
if err != nil {
g.Log().Errorf(ctx, "同步熔断状态到Redis失败: %v", err)
}
}
2026-01-01 10:37:01 +08:00
// validateCircuitBreakerConfig 验证熔断器配置
func validateCircuitBreakerConfig(config *CircuitBreakerConfig) error {
if config.MaxFailures <= 0 {
return fmt.Errorf("maxFailures必须大于0")
}
if config.FailureRateThreshold < 0 || config.FailureRateThreshold > 1 {
return fmt.Errorf("failureRateThreshold必须在0.0-1.0之间")
}
if len(config.SuccessStatusCodes) == 0 {
return fmt.Errorf("successStatusCodes不能为空")
}
2026-01-01 10:48:47 +08:00
if config.RequestTimeout < 0 || config.RequestTimeout > 300000 {
return fmt.Errorf("requestTimeout必须在0-300000毫秒之间")
}
if config.DistributedTTL < 0 || config.DistributedTTL > 3600 {
return fmt.Errorf("distributedTTL必须在0-3600秒之间")
}
// 验证时间字符串格式(如果缓存为空,说明解析失败)
if config.TimeoutParsed == 0 {
return fmt.Errorf("timeout格式错误应为有效的时间字符串如30s, 1m")
}
if config.SlowRequestThresholdParsed == 0 {
return fmt.Errorf("slowRequestThreshold格式错误应为有效的时间字符串如3s, 1m")
}
2026-01-01 10:37:01 +08:00
return nil
}
// registerStateChangeListeners 注册状态变化监听器
func registerStateChangeListeners() {
2026-01-01 10:48:47 +08:00
// 检查是否已注册,防止重复注册(健壮性改进)
if _, exists := stateChangeListenersRegistered.LoadOrStore("default", true); exists {
return
}
// 注册默认监听器
2026-01-01 10:37:01 +08:00
RegisterStateChangeListener("default", func(serviceName string, fromState, toState CircuitBreakerState) {
g.Log().Infof(context.Background(), "熔断器状态变化: service=%s, %s -> %s",
serviceName, fromState, toState)
})
}
// StateChangeListener is a callback invoked with the service name and the
// previous and new breaker state.
type StateChangeListener func(serviceName string, fromState, toState CircuitBreakerState)
// RegisterStateChangeListener stores listener under name, replacing any
// listener previously stored under the same name.
func RegisterStateChangeListener(name string, listener StateChangeListener) {
stateChangeListeners.Store(name, listener)
}
// UnregisterStateChangeListener removes the listener stored under name, if any.
func UnregisterStateChangeListener(name string) {
stateChangeListeners.Delete(name)
}
// notifyStateChange calls every registered listener synchronously with the
// given transition. Entries that are not a StateChangeListener, or that hold
// a nil listener, are skipped.
func notifyStateChange(serviceName string, fromState, toState CircuitBreakerState) {
	stateChangeListeners.Range(func(_, value interface{}) bool {
		// A nil func still satisfies the type assertion; calling it would panic.
		if listener, ok := value.(StateChangeListener); ok && listener != nil {
			listener(serviceName, fromState, toState)
		}
		return true
	})
}
2025-12-31 23:38:33 +08:00
// CircuitBreakerHealthCheckHandler 熔断器健康检查接口
func CircuitBreakerHealthCheckHandler(r *ghttp.Request) {
2026-01-01 01:33:59 +08:00
status := make(map[string]interface{})
2026-01-01 07:38:00 +08:00
totalServices := 0
openServices := 0
2026-01-01 01:33:59 +08:00
// 遍历所有熔断器
circuitBreakers.Range(func(key, value interface{}) bool {
serviceName := key.(string)
cbInfo := value.(*CircuitBreakerInfo)
2026-01-01 07:38:00 +08:00
totalServices++
cbInfo.mu.RLock()
isOpen := cbInfo.State == StateOpen
if isOpen {
openServices++
2026-01-01 01:33:59 +08:00
}
2026-01-01 10:48:47 +08:00
// 从Metrics中读取数据修复数据准确性问题
2026-01-01 01:33:59 +08:00
status[serviceName] = map[string]interface{}{
2026-01-01 07:38:00 +08:00
"resource": cbInfo.ResourceName,
"state": string(cbInfo.State),
"lastOpenTime": cbInfo.LastOpenTime,
"nextRetryTime": cbInfo.NextRetryTime,
"totalRequests": cbInfo.Metrics.TotalRequests.Load(),
"passRequests": cbInfo.Metrics.PassRequests.Load(),
"blockRequests": cbInfo.Metrics.BlockRequests.Load(),
"failureRequests": cbInfo.Metrics.FailureRequests.Load(),
"openCount": cbInfo.Metrics.OpenCount.Load(),
2026-01-01 01:33:59 +08:00
}
2026-01-01 07:38:00 +08:00
cbInfo.mu.RUnlock()
2026-01-01 01:33:59 +08:00
return true
})
2025-12-31 23:38:33 +08:00
2026-01-01 07:38:00 +08:00
summary := map[string]interface{}{
"totalServices": totalServices,
"openServices": openServices,
"closedServices": totalServices - openServices,
}
2025-12-31 23:38:33 +08:00
r.Response.WriteJsonExit(ghttp.DefaultHandlerResponse{
Code: 200,
2026-01-01 01:33:59 +08:00
Message: "熔断器状态",
2026-01-01 07:38:00 +08:00
Data: map[string]interface{}{
"summary": summary,
"services": status,
},
2025-12-31 23:38:33 +08:00
})
}
2026-01-01 01:33:59 +08:00
// CircuitBreakerResetHandler 熔断器手动重置接口(仅限管理后台调用)
2025-12-31 23:38:33 +08:00
func CircuitBreakerResetHandler(r *ghttp.Request) {
2026-01-01 01:33:59 +08:00
serviceName := r.Get("service").String()
2025-12-31 23:38:33 +08:00
if serviceName == "" {
r.Response.WriteJsonExit(ghttp.DefaultHandlerResponse{
Code: 400,
2026-01-01 01:33:59 +08:00
Message: "缺少service参数",
2025-12-31 23:38:33 +08:00
})
return
}
2026-01-01 01:33:59 +08:00
resourceName := fmt.Sprintf("service:%s", serviceName)
2025-12-31 23:38:33 +08:00
2026-01-01 07:38:00 +08:00
// 获取当前服务的所有规则
currentRules := circuitbreaker.GetRulesOfResource(resourceName)
// 只删除当前服务的规则
if len(currentRules) > 0 {
_, err := circuitbreaker.LoadRulesOfResource(resourceName, []*circuitbreaker.Rule{})
if err != nil {
r.Response.WriteJsonExit(ghttp.DefaultHandlerResponse{
Code: 500,
Message: fmt.Sprintf("重置熔断器失败: %v", err),
})
return
}
2025-12-31 23:38:33 +08:00
}
2026-01-01 07:38:00 +08:00
// 重新加载该服务的规则
2026-01-01 01:33:59 +08:00
if val, ok := circuitBreakerConfigs.Load(serviceName); ok {
config := val.(*CircuitBreakerConfig)
2026-01-01 07:38:00 +08:00
err := initServiceCircuitBreaker(serviceName, config)
2026-01-01 01:33:59 +08:00
if err != nil {
r.Response.WriteJsonExit(ghttp.DefaultHandlerResponse{
Code: 500,
Message: fmt.Sprintf("重置熔断器失败: %v", err),
})
return
}
}
// 更新内存状态
if val, ok := circuitBreakers.Load(serviceName); ok {
cbInfo := val.(*CircuitBreakerInfo)
2026-01-01 07:38:00 +08:00
cbInfo.mu.Lock()
2026-01-01 01:33:59 +08:00
cbInfo.State = StateClosed
cbInfo.LastOpenTime = time.Time{}
cbInfo.NextRetryTime = time.Time{}
2026-01-01 07:38:00 +08:00
cbInfo.mu.Unlock()
2026-01-01 01:33:59 +08:00
}
// 重置分布式状态(如果启用)
2026-01-01 10:37:01 +08:00
if val, ok := circuitBreakerConfigs.Load(serviceName); ok {
config := val.(*CircuitBreakerConfig)
if config.DistributedTTL > 0 {
key := fmt.Sprintf("circuit_breaker:%s:state", resourceName)
redis := g.Redis()
if redis != nil {
_, _ = redis.Del(r.GetCtx(), key)
}
}
2026-01-01 01:33:59 +08:00
}
g.Log().Infof(r.GetCtx(), "熔断器已手动重置: %s", resourceName)
2025-12-31 23:38:33 +08:00
r.Response.WriteJsonExit(ghttp.DefaultHandlerResponse{
Code: 200,
2026-01-01 01:33:59 +08:00
Message: fmt.Sprintf("服务 '%s' 的熔断器已重置", serviceName),
2025-12-31 23:38:33 +08:00
})
}
2026-01-01 07:38:00 +08:00
// CircuitBreakerReloadHandler 熔断器配置重载接口
func CircuitBreakerReloadHandler(r *ghttp.Request) {
serviceName := r.Get("service").String()
if serviceName == "" {
2026-01-01 10:37:01 +08:00
// 重载所有服务 - 扫描配置文件中所有服务
services := g.Cfg().MustGet(r.GetCtx(), "circuitBreaker").Map()
// 过滤出服务名
2026-01-01 10:48:47 +08:00
serviceNames := filterServiceNames(services)
2026-01-01 10:37:01 +08:00
2026-01-01 07:38:00 +08:00
successCount := 0
failCount := 0
2026-01-01 10:37:01 +08:00
for _, service := range serviceNames {
2026-01-01 07:38:00 +08:00
err := ReloadCircuitBreakerConfig(service)
if err != nil {
g.Log().Errorf(r.GetCtx(), "服务 %s 配置重载失败: %v", service, err)
failCount++
} else {
successCount++
}
}
r.Response.WriteJsonExit(ghttp.DefaultHandlerResponse{
Code: 200,
Message: fmt.Sprintf("配置重载完成: 成功 %d, 失败 %d", successCount, failCount),
Data: map[string]interface{}{
"success": successCount,
"failed": failCount,
},
})
return
}
// 重载单个服务
err := ReloadCircuitBreakerConfig(serviceName)
if err != nil {
r.Response.WriteJsonExit(ghttp.DefaultHandlerResponse{
Code: 500,
Message: fmt.Sprintf("重载失败: %v", err),
})
return
}
r.Response.WriteJsonExit(ghttp.DefaultHandlerResponse{
Code: 200,
Message: fmt.Sprintf("服务 '%s' 的熔断器配置已重载", serviceName),
})
}