Files
common/middleware/circuit_breaker.go
2026-03-12 08:51:25 +08:00

695 lines
21 KiB
Go
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
package middleware
import (
	"context"
	"fmt"
	"strconv"
	"strings"
	"sync"
	"sync/atomic"
	"time"

	"github.com/alibaba/sentinel-golang/api"
	"github.com/alibaba/sentinel-golang/core/circuitbreaker"
	"github.com/gogf/gf/v2/frame/g"
	"github.com/gogf/gf/v2/net/ghttp"
)
// CircuitBreakerState names the state a circuit breaker is recorded to be in.
// The values are plain strings so they can be logged and serialized as-is.
type CircuitBreakerState string
const (
StateClosed CircuitBreakerState = "closed" // Closed: normal operation, requests pass through.
StateOpen CircuitBreakerState = "open" // Open: the breaker has tripped and requests are rejected.
StateHalfOpen CircuitBreakerState = "half-open" // Half-open: the breaker is attempting to recover.
)
// CircuitBreakerConfig holds the per-service circuit breaker settings loaded
// from the "circuitBreaker.<service>" configuration section.
type CircuitBreakerConfig struct {
Enabled bool // Whether the circuit breaker is enabled for the service.
MaxFailures int // Failure count threshold; also used as the Sentinel rule's MinRequestAmount. Must be > 0.
Timeout string // How long the breaker stays open before retrying; a time.ParseDuration string (e.g. "60s").
SuccessStatusCodes []int // HTTP status codes that count as a successful request.
SlowRequestThreshold string // Slow-request threshold; a time.ParseDuration string (e.g. "3s").
EnableSlidingWindow bool // Selects the slow-request-ratio strategy instead of the error-count strategy.
FailureRateThreshold float64 // Ratio threshold used by the sliding-window strategy; validated to lie in [0, 1].
EnableFallback bool // Whether a custom fallback message is returned when a request is rejected.
FallbackMessage string // Fallback message body; only used when EnableFallback is true and the message is non-empty.
RequestTimeout int // Per-request timeout in milliseconds; values <= 0 disable the timeout.
DistributedTTL int // TTL passed to Redis SETEX for the shared open state; values <= 0 disable the distributed check.
}
// CircuitBreakerMetrics holds the request counters of one circuit breaker.
// All counters are atomic, so they can be updated without holding a lock.
type CircuitBreakerMetrics struct {
TotalRequests atomic.Int64 // Total number of requests seen by the middleware.
PassRequests atomic.Int64 // Number of requests that completed with a success status code.
BlockRequests atomic.Int64 // Number of requests rejected by the breaker.
FailureRequests atomic.Int64 // Number of requests that completed with a non-success status code.
OpenCount atomic.Int64 // Circuit-open counter.
}
// CircuitBreakerInfo is the in-memory record of one service's circuit breaker.
// State, LastOpenTime and NextRetryTime are guarded by mu; because the struct
// contains a mutex it must be shared by pointer and never copied.
//
// NOTE(review): Metrics consists of atomic.Int64 values, which presumably
// serialize as empty JSON objects via encoding/json — confirm before relying
// on the "metrics" field in API output.
type CircuitBreakerInfo struct {
ResourceName string `json:"resourceName"` // Sentinel resource name ("service:<name>").
State CircuitBreakerState `json:"state"` // Current recorded state.
Config *CircuitBreakerConfig `json:"config"` // Configuration the breaker was created with.
FailCount int64 `json:"failCount"` // Failure count.
TotalCount int64 `json:"totalCount"` // Total request count.
LastOpenTime time.Time `json:"lastOpenTime"` // Time the breaker was last recorded as open.
NextRetryTime time.Time `json:"nextRetryTime"` // Time at which the next retry is expected.
Metrics *CircuitBreakerMetrics `json:"metrics"` // Request counters.
mu sync.RWMutex // Guards state updates.
}
var (
// circuitBreakers maps a service name to its *CircuitBreakerInfo
// (read by the middleware and the health-check handler).
circuitBreakers sync.Map
// circuitBreakerConfigs caches the *CircuitBreakerConfig of each service,
// keyed by service name.
circuitBreakerConfigs sync.Map
// distributedSyncLock serializes writes of the breaker state to the
// distributed store.
distributedSyncLock sync.Mutex
// stateChangeListeners maps a listener name to its StateChangeListener.
stateChangeListeners sync.Map
)
// InitCircuitBreaker initializes Sentinel and creates one circuit breaker for
// every service configured under the "circuitBreaker" configuration node.
//
// Every key of that node except the reserved ones ("services",
// "enableDistributed", "requestTimeout", "distributedTTL") is treated as a
// service name. A service whose breaker fails to initialize is logged and
// skipped; only a Sentinel initialization failure is returned as an error
// (wrapped, so the cause can be inspected with errors.Is / errors.As).
func InitCircuitBreaker() error {
	ctx := context.Background()

	if err := api.InitDefault(); err != nil {
		return fmt.Errorf("Sentinel初始化失败: %w", err)
	}

	// Listeners are registered before any breaker exists so no transition is missed.
	registerStateChangeListeners()
	g.Log().Infof(ctx, "Sentinel熔断器初始化成功")

	// Collect the service names, skipping the keys that hold global settings.
	services := g.Cfg().MustGet(ctx, "circuitBreaker").Map()
	serviceNames := make([]string, 0, len(services))
	for key := range services {
		switch key {
		case "services", "enableDistributed", "requestTimeout", "distributedTTL":
			continue
		}
		serviceNames = append(serviceNames, key)
	}
	if len(serviceNames) == 0 {
		g.Log().Infof(ctx, "未配置任何服务熔断器")
		return nil
	}

	enabledCount := 0
	for _, serviceName := range serviceNames {
		serviceConfig := loadServiceCircuitBreakerConfig(serviceName)
		if serviceConfig == nil || !serviceConfig.Enabled {
			g.Log().Infof(ctx, "服务 %s 熔断器未启用", serviceName)
			continue
		}

		circuitBreakerConfigs.Store(serviceName, serviceConfig)
		if initErr := initServiceCircuitBreaker(serviceName, serviceConfig); initErr != nil {
			// One misconfigured service must not prevent the others from starting.
			g.Log().Errorf(ctx, "服务 %s 熔断器初始化失败: %v", serviceName, initErr)
			continue
		}
		g.Log().Infof(ctx, "服务 %s 熔断器初始化成功", serviceName)
		enabledCount++
	}

	g.Log().Infof(ctx, "共初始化 %d 个服务熔断器,其中 %d 个已启用", len(serviceNames), enabledCount)
	return nil
}
// ReloadCircuitBreakerConfig re-reads the configuration of serviceName,
// replaces the cached configuration and re-creates the service's breaker.
//
// It returns an error when no configuration is found for the service or when
// the breaker cannot be re-initialized; the latter error wraps the underlying
// cause so callers can inspect it with errors.Is / errors.As.
func ReloadCircuitBreakerConfig(serviceName string) error {
	ctx := context.Background()

	serviceConfig := loadServiceCircuitBreakerConfig(serviceName)
	if serviceConfig == nil {
		return fmt.Errorf("未找到服务 %s 的配置", serviceName)
	}

	// The cache is updated first so the middleware sees the new settings
	// (including Enabled=false) even if re-initialization fails below.
	circuitBreakerConfigs.Store(serviceName, serviceConfig)

	if err := initServiceCircuitBreaker(serviceName, serviceConfig); err != nil {
		return fmt.Errorf("重新初始化熔断器失败: %w", err)
	}

	g.Log().Infof(ctx, "服务 %s 熔断器配置重新加载成功", serviceName)
	return nil
}
// loadServiceCircuitBreakerConfig reads the circuit breaker settings of one
// service from the "circuitBreaker.<serviceName>" configuration section.
//
// It returns nil when serviceName is empty or the section does not exist, so
// callers can tell "not configured" apart from "configured with defaults".
// Settings missing from an existing section fall back to the defaults below.
func loadServiceCircuitBreakerConfig(serviceName string) *CircuitBreakerConfig {
	if serviceName == "" {
		return nil
	}

	ctx := context.Background()
	cfg := g.Cfg()
	key := fmt.Sprintf("circuitBreaker.%s", serviceName)

	// Without this check every lookup below would silently fall back to its
	// default and an unknown service would appear to be fully configured.
	if section := cfg.MustGet(ctx, key); section == nil || section.IsNil() {
		return nil
	}

	// Success status codes are configured as a comma-separated string.
	successCodes := cfg.MustGet(ctx, key+".successStatusCodes", "200,201,204").String()

	return &CircuitBreakerConfig{
		Enabled:              cfg.MustGet(ctx, key+".enabled", true).Bool(),
		MaxFailures:          cfg.MustGet(ctx, key+".maxFailures", 5).Int(),
		Timeout:              cfg.MustGet(ctx, key+".timeout", "60s").String(),
		SuccessStatusCodes:   parseStatusCodes(successCodes),
		SlowRequestThreshold: cfg.MustGet(ctx, key+".slowRequestThreshold", "3s").String(),
		EnableSlidingWindow:  cfg.MustGet(ctx, key+".enableSlidingWindow", false).Bool(),
		FailureRateThreshold: cfg.MustGet(ctx, key+".failureRateThreshold", 0.5).Float64(),
		EnableFallback:       cfg.MustGet(ctx, key+".enableFallback", false).Bool(),
		FallbackMessage:      cfg.MustGet(ctx, key+".fallbackMessage", "").String(),
		RequestTimeout:       cfg.MustGet(ctx, key+".requestTimeout", 30000).Int(),
		DistributedTTL:       cfg.MustGet(ctx, key+".distributedTTL", 300).Int(),
	}
}
// parseStatusCodes parses a comma-separated list of HTTP status codes such as
// "200, 201,204" into a slice of ints.
//
// Whitespace around each entry is ignored. Entries that are not a complete
// decimal integer (empty, "abc", "204x") are skipped rather than partially
// parsed. The result is never nil; it is empty when nothing could be parsed.
func parseStatusCodes(str string) []int {
	parts := strings.Split(str, ",")
	codes := make([]int, 0, len(parts))
	for _, part := range parts {
		// strconv.Atoi rejects trailing garbage, unlike fmt.Sscanf("%d"),
		// which would accept "204x" as 204.
		code, err := strconv.Atoi(strings.TrimSpace(part))
		if err != nil {
			continue
		}
		codes = append(codes, code)
	}
	return codes
}
// initServiceCircuitBreaker validates config, loads the matching Sentinel
// circuit breaking rule for the resource "service:<serviceName>" and stores a
// fresh CircuitBreakerInfo (closed state, zeroed metrics) for the service.
//
// With EnableSlidingWindow the rule uses the slow-request-ratio strategy
// (SlowRequestThreshold as the maximum allowed response time and
// FailureRateThreshold as the ratio); otherwise it uses the error-count
// strategy with MaxFailures as the threshold.
//
// It returns an error when the configuration is invalid, a duration cannot be
// parsed or is not positive, or Sentinel rejects the rule.
func initServiceCircuitBreaker(serviceName string, config *CircuitBreakerConfig) error {
	if err := validateCircuitBreakerConfig(config); err != nil {
		return fmt.Errorf("配置验证失败: %w", err)
	}

	timeout, err := time.ParseDuration(config.Timeout)
	if err != nil {
		return fmt.Errorf("解析超时时间失败: %w", err)
	}
	// A non-positive duration would wrap around in the unsigned conversion below.
	if timeout <= 0 {
		return fmt.Errorf("timeout必须大于0")
	}

	slowRequestThreshold, err := time.ParseDuration(config.SlowRequestThreshold)
	if err != nil {
		return fmt.Errorf("解析慢请求阈值失败: %w", err)
	}
	if config.EnableSlidingWindow && slowRequestThreshold <= 0 {
		return fmt.Errorf("slowRequestThreshold必须大于0")
	}

	resourceName := fmt.Sprintf("service:%s", serviceName)

	// Fields shared by both strategies.
	rule := &circuitbreaker.Rule{
		Resource:         resourceName,
		RetryTimeoutMs:   uint32(timeout.Milliseconds()),
		MinRequestAmount: uint64(config.MaxFailures),
		StatIntervalMs:   1000, // 1-second statistics window.
	}
	strategy := "error_count"
	if config.EnableSlidingWindow {
		// Sliding-window statistics: slow-request-ratio strategy.
		strategy = "slow_ratio"
		rule.Strategy = circuitbreaker.SlowRequestRatio
		rule.StatSlidingWindowBucketCount = 10
		rule.MaxAllowedRtMs = uint64(slowRequestThreshold.Milliseconds())
		rule.Threshold = config.FailureRateThreshold
	} else {
		// Failure counting: error-count strategy.
		rule.Strategy = circuitbreaker.ErrorCount
		rule.Threshold = float64(config.MaxFailures)
	}

	// Load the rule for this resource only. circuitbreaker.LoadRules replaces
	// the complete rule set, which would drop the rules of every service that
	// was initialized earlier.
	if _, err = circuitbreaker.LoadRulesOfResource(resourceName, []*circuitbreaker.Rule{rule}); err != nil {
		return fmt.Errorf("加载熔断规则失败: %w", err)
	}

	// Replace any previous record: state and metrics start from scratch.
	circuitBreakers.Store(serviceName, &CircuitBreakerInfo{
		ResourceName: resourceName,
		State:        StateClosed,
		Config:       config,
		Metrics:      &CircuitBreakerMetrics{},
	})

	g.Log().Infof(context.Background(), "服务 %s 熔断器初始化成功: resource=%s, strategy=%s, timeout=%v, threshold=%.2f",
		serviceName, resourceName, strategy, timeout, rule.Threshold)
	return nil
}
// CircuitBreakerMiddleware is a GoFrame middleware that protects downstream
// services with Alibaba Sentinel circuit breaking.
//
// The service name is the first segment of the request path. Requests for
// services without an enabled, initialized breaker pass straight through.
// Otherwise the middleware:
//  1. applies the per-service request timeout to the request context,
//  2. rejects the request if the shared (Redis) state says the breaker is open,
//  3. asks Sentinel for an entry and rejects the request if it is blocked,
//  4. runs the rest of the chain and reports the outcome to Sentinel based on
//     the response status code.
//
// Rejected requests are answered through sendFallbackResponse.
func CircuitBreakerMiddleware(r *ghttp.Request) {
	startTime := time.Now()
	ctx := r.GetCtx()

	// First path segment is the service name; "/" yields an empty name.
	serviceName, _, _ := strings.Cut(strings.Trim(r.URL.Path, "/"), "/")
	if serviceName == "" {
		r.Middleware.Next()
		return
	}

	val, ok := circuitBreakerConfigs.Load(serviceName)
	if !ok {
		// No breaker configured for this service.
		r.Middleware.Next()
		return
	}
	config := val.(*CircuitBreakerConfig)
	if !config.Enabled {
		r.Middleware.Next()
		return
	}

	cbInfoVal, ok := circuitBreakers.Load(serviceName)
	if !ok {
		// Configured but never successfully initialized.
		r.Middleware.Next()
		return
	}
	cbInfo := cbInfoVal.(*CircuitBreakerInfo)
	cbInfo.Metrics.TotalRequests.Add(1)

	// Apply the per-service timeout. Assign to the outer ctx (no ":=") so the
	// Redis calls and logs below are bounded by the same deadline.
	if config.RequestTimeout > 0 {
		var cancel context.CancelFunc
		ctx, cancel = context.WithTimeout(ctx, time.Duration(config.RequestTimeout)*time.Millisecond)
		defer cancel()
		r.SetCtx(ctx)
	}

	resourceName := fmt.Sprintf("service:%s", serviceName)

	// Distributed check: another instance may already have opened the breaker.
	if config.DistributedTTL > 0 && isCircuitBreakerOpenInDistributed(ctx, resourceName) {
		cbInfo.Metrics.BlockRequests.Add(1)
		g.Log().Warningf(ctx, "分布式熔断触发: %s", resourceName)
		// NOTE(review): this reports open -> open on every rejected request;
		// kept as-is because listeners may rely on it — confirm intent.
		notifyStateChange(serviceName, StateOpen, StateOpen)
		sendFallbackResponse(r, serviceName, config, "distributed")
		return
	}

	entry, blockError := api.Entry(resourceName)
	if blockError != nil {
		// Rejected by Sentinel.
		cbInfo.Metrics.BlockRequests.Add(1)
		g.Log().Warningf(ctx, "熔断触发: %s, reason: %v", resourceName, blockError)

		now := time.Now()
		cbInfo.mu.Lock()
		oldState := cbInfo.State
		cbInfo.State = StateOpen
		cbInfo.LastOpenTime = now
		if timeout, err := time.ParseDuration(config.Timeout); err == nil {
			cbInfo.NextRetryTime = now.Add(timeout)
		}
		cbInfo.mu.Unlock()

		// Count and announce the opening once per transition, not once per
		// rejected request.
		if oldState != StateOpen {
			cbInfo.Metrics.OpenCount.Add(1)
			notifyStateChange(serviceName, oldState, StateOpen)
		}

		// Share the open state with the other instances.
		if config.DistributedTTL > 0 {
			syncCircuitBreakerStateToDistributed(ctx, resourceName, "open", config.DistributedTTL)
		}
		sendFallbackResponse(r, serviceName, config, "blocked")
		return
	}
	// Deferred so the Sentinel entry is released on every exit path; it runs
	// after TraceError below has recorded the outcome.
	defer entry.Exit()

	// Run the remaining middleware and the handler.
	r.Middleware.Next()

	statusCode := r.Response.Status
	if statusCode == 0 {
		// Presumably no handler set a status explicitly, in which case the
		// response goes out as 200 — treat it as such instead of as a failure.
		statusCode = 200
	}
	duration := time.Since(startTime)

	if !isSuccessStatusCode(resourceName, statusCode) {
		cbInfo.Metrics.FailureRequests.Add(1)
		api.TraceError(entry, fmt.Errorf("request failed with status: %d", statusCode))
		g.Log().Debugf(ctx, "服务 %s 请求失败: status=%d, duration=%v", serviceName, statusCode, duration)
		return
	}

	cbInfo.Metrics.PassRequests.Add(1)

	// A successful request closes the recorded state.
	cbInfo.mu.Lock()
	oldState := cbInfo.State
	cbInfo.State = StateClosed
	cbInfo.mu.Unlock()

	// Notify outside the lock so a listener that inspects the breaker cannot
	// deadlock on cbInfo.mu.
	if oldState != StateClosed {
		notifyStateChange(serviceName, oldState, StateClosed)
	}
}
// sendFallbackResponse writes the degraded response for a rejected request
// and ends request processing.
//
// When the service has fallback enabled and a non-empty FallbackMessage, that
// message is sent with status 503. Otherwise the status and message depend on
// reason: "timeout" yields 504, while "blocked", "distributed" and any other
// value yield 503 with a reason-specific message.
func sendFallbackResponse(r *ghttp.Request, serviceName string, config *CircuitBreakerConfig, reason string) {
	useCustom := config.EnableFallback && config.FallbackMessage != ""
	if useCustom {
		r.Response.WriteStatusExit(503, config.FallbackMessage)
		return
	}

	// Start from the generic "unavailable" answer and specialize per reason.
	status := 503
	format := "服务 '%s' 暂时不可用,请稍后再试"
	if reason == "timeout" {
		status = 504
		format = "服务 '%s' 响应超时"
	} else if reason == "blocked" {
		format = "服务 '%s' 熔断保护中,请稍后再试"
	} else if reason == "distributed" {
		format = "服务 '%s' 分布式熔断中"
	}
	r.Response.WriteStatusExit(status, fmt.Sprintf(format, serviceName))
}
// isSuccessStatusCode 判断HTTP状态码是否成功
func isSuccessStatusCode(resourceName string, statusCode int) bool {
serviceName := strings.TrimPrefix(resourceName, "service:")
if serviceName == "" {
// 默认只认为2xx是成功
return statusCode >= 200 && statusCode < 300
}
// 从配置中获取成功状态码列表
var serviceConfig *CircuitBreakerConfig
if val, ok := circuitBreakerConfigs.Load(serviceName); ok {
serviceConfig = val.(*CircuitBreakerConfig)
}
if serviceConfig != nil && len(serviceConfig.SuccessStatusCodes) > 0 {
for _, code := range serviceConfig.SuccessStatusCodes {
if statusCode == code {
return true
}
}
return false
}
// 默认2xx状态码为成功
return statusCode >= 200 && statusCode < 300
}
// isCircuitBreakerOpenInDistributed reports whether the shared Redis state
// for resourceName is "open".
//
// The check fails open: a missing Redis client, a Redis error or a missing
// key all yield false, so requests are not rejected because Redis is down.
func isCircuitBreakerOpenInDistributed(ctx context.Context, resourceName string) bool {
	client := g.Redis()
	if client == nil {
		return false
	}

	stateKey := fmt.Sprintf("circuit_breaker:%s:state", resourceName)
	reply, err := client.Get(ctx, stateKey)
	switch {
	case err != nil:
		return false
	case reply.IsNil():
		return false
	}
	return reply.String() == "open"
}
// syncCircuitBreakerStateToDistributed publishes state for resourceName to
// Redis with SETEX, using ttl as the expiry. Failures are logged and
// otherwise ignored, so the caller's request handling is never interrupted.
//
// NOTE(review): distributedSyncLock is held for the whole Redis round trip,
// so concurrent callers are serialized behind the network call — confirm that
// this is intended.
func syncCircuitBreakerStateToDistributed(ctx context.Context, resourceName, state string, ttl int) {
	distributedSyncLock.Lock()
	defer distributedSyncLock.Unlock()

	client := g.Redis()
	if client == nil {
		g.Log().Errorf(ctx, "Redis客户端未初始化无法同步熔断状态")
		return
	}

	stateKey := fmt.Sprintf("circuit_breaker:%s:state", resourceName)
	if _, setErr := client.Do(ctx, "SETEX", stateKey, ttl, state); setErr != nil {
		g.Log().Errorf(ctx, "同步熔断状态到Redis失败: %v", setErr)
	}
}
// validateCircuitBreakerConfig checks that config can be turned into a
// circuit breaking rule.
//
// It returns an error when config is nil, MaxFailures is not positive,
// FailureRateThreshold is outside [0, 1] (NaN included) or
// SuccessStatusCodes is empty; otherwise it returns nil.
func validateCircuitBreakerConfig(config *CircuitBreakerConfig) error {
	if config == nil {
		return fmt.Errorf("熔断器配置不能为空")
	}
	if config.MaxFailures <= 0 {
		return fmt.Errorf("maxFailures必须大于0")
	}
	// Written as a negated range check on purpose: NaN fails every
	// comparison, so "x < 0 || x > 1" would let it through.
	if !(config.FailureRateThreshold >= 0 && config.FailureRateThreshold <= 1) {
		return fmt.Errorf("failureRateThreshold必须在0.0-1.0之间")
	}
	if len(config.SuccessStatusCodes) == 0 {
		return fmt.Errorf("successStatusCodes不能为空")
	}
	return nil
}
// registerStateChangeListeners 注册状态变化监听器
func registerStateChangeListeners() {
// 示例:注册默认监听器
RegisterStateChangeListener("default", func(serviceName string, fromState, toState CircuitBreakerState) {
g.Log().Infof(context.Background(), "熔断器状态变化: service=%s, %s -> %s",
serviceName, fromState, toState)
})
}
// StateChangeListener is a callback invoked when a state transition is
// reported for a service. It receives the service name, the state before the
// transition and the state after it.
type StateChangeListener func(serviceName string, fromState, toState CircuitBreakerState)
// RegisterStateChangeListener registers listener under name, replacing any
// listener previously registered under the same name.
//
// A nil listener is ignored: storing it would make the next state
// notification panic when the nil function is called.
func RegisterStateChangeListener(name string, listener StateChangeListener) {
	if listener == nil {
		return
	}
	stateChangeListeners.Store(name, listener)
}
// UnregisterStateChangeListener removes the listener registered under name.
// Removing a name that was never registered is a no-op.
func UnregisterStateChangeListener(name string) {
stateChangeListeners.Delete(name)
}
// notifyStateChange calls every registered listener with the transition
// fromState -> toState of serviceName.
//
// Listeners run synchronously on the caller's goroutine, in unspecified
// order. Entries that are not a StateChangeListener, or that hold a nil
// function, are skipped so a bad registration cannot panic the caller.
func notifyStateChange(serviceName string, fromState, toState CircuitBreakerState) {
	stateChangeListeners.Range(func(_, value interface{}) bool {
		// The type assertion succeeds for a typed nil function, so the nil
		// check is required before the call.
		if listener, ok := value.(StateChangeListener); ok && listener != nil {
			listener(serviceName, fromState, toState)
		}
		return true
	})
}
// CircuitBreakerHealthCheckHandler is the HTTP handler that reports the
// recorded state and counters of every circuit breaker.
//
// The JSON response carries a "summary" (total, open and closed service
// counts, where every non-open breaker counts as closed) and a "services"
// map keyed by service name.
func CircuitBreakerHealthCheckHandler(r *ghttp.Request) {
	var (
		services = make(map[string]interface{})
		total    int
		open     int
	)

	circuitBreakers.Range(func(k, v interface{}) bool {
		name := k.(string)
		info := v.(*CircuitBreakerInfo)
		total++

		// Snapshot the mutex-guarded fields under the read lock.
		info.mu.RLock()
		defer info.mu.RUnlock()

		if info.State == StateOpen {
			open++
		}
		metrics := info.Metrics
		services[name] = map[string]interface{}{
			"resource":        info.ResourceName,
			"state":           string(info.State),
			"lastOpenTime":    info.LastOpenTime,
			"nextRetryTime":   info.NextRetryTime,
			"totalRequests":   metrics.TotalRequests.Load(),
			"passRequests":    metrics.PassRequests.Load(),
			"blockRequests":   metrics.BlockRequests.Load(),
			"failureRequests": metrics.FailureRequests.Load(),
			"openCount":       metrics.OpenCount.Load(),
		}
		return true
	})

	payload := map[string]interface{}{
		"summary": map[string]interface{}{
			"totalServices":  total,
			"openServices":   open,
			"closedServices": total - open,
		},
		"services": services,
	}
	r.Response.WriteJsonExit(ghttp.DefaultHandlerResponse{
		Code:    200,
		Message: "熔断器状态",
		Data:    payload,
	})
}
// CircuitBreakerResetHandler is the HTTP handler that manually resets the
// circuit breaker of the service named by the "service" request parameter
// (intended for the admin backend only).
//
// It drops the service's Sentinel rules, re-creates the breaker from the
// cached configuration, marks the recorded state as closed and, when the
// distributed state is enabled, deletes the shared Redis key. It answers
// with code 400 when the parameter is missing, 500 when Sentinel rejects the
// change and 200 otherwise. A failure to delete the Redis key is logged but
// does not fail the request.
func CircuitBreakerResetHandler(r *ghttp.Request) {
	ctx := r.GetCtx()

	serviceName := r.Get("service").String()
	if serviceName == "" {
		r.Response.WriteJsonExit(ghttp.DefaultHandlerResponse{
			Code:    400,
			Message: "缺少service参数",
		})
		return
	}
	resourceName := fmt.Sprintf("service:%s", serviceName)

	// Remove only this service's rules; other services stay untouched.
	if len(circuitbreaker.GetRulesOfResource(resourceName)) > 0 {
		if _, err := circuitbreaker.LoadRulesOfResource(resourceName, []*circuitbreaker.Rule{}); err != nil {
			r.Response.WriteJsonExit(ghttp.DefaultHandlerResponse{
				Code:    500,
				Message: fmt.Sprintf("重置熔断器失败: %v", err),
			})
			return
		}
	}

	// Look the configuration up once and reuse it for both steps below.
	var config *CircuitBreakerConfig
	if val, ok := circuitBreakerConfigs.Load(serviceName); ok {
		config = val.(*CircuitBreakerConfig)
	}

	// Re-create the breaker from the cached configuration.
	if config != nil {
		if err := initServiceCircuitBreaker(serviceName, config); err != nil {
			r.Response.WriteJsonExit(ghttp.DefaultHandlerResponse{
				Code:    500,
				Message: fmt.Sprintf("重置熔断器失败: %v", err),
			})
			return
		}
	}

	// Mark the in-memory record as closed.
	if val, ok := circuitBreakers.Load(serviceName); ok {
		cbInfo := val.(*CircuitBreakerInfo)
		cbInfo.mu.Lock()
		cbInfo.State = StateClosed
		cbInfo.LastOpenTime = time.Time{}
		cbInfo.NextRetryTime = time.Time{}
		cbInfo.mu.Unlock()
	}

	// Clear the shared open flag so other instances stop rejecting requests.
	if config != nil && config.DistributedTTL > 0 {
		key := fmt.Sprintf("circuit_breaker:%s:state", resourceName)
		if redis := g.Redis(); redis != nil {
			if _, err := redis.Del(ctx, key); err != nil {
				// Best effort: the key expires on its own, but until then the
				// reset is not effective cluster-wide, so make it visible.
				g.Log().Warningf(ctx, "清除分布式熔断状态失败: key=%s, err=%v", key, err)
			}
		}
	}

	g.Log().Infof(ctx, "熔断器已手动重置: %s", resourceName)
	r.Response.WriteJsonExit(ghttp.DefaultHandlerResponse{
		Code:    200,
		Message: fmt.Sprintf("服务 '%s' 的熔断器已重置", serviceName),
	})
}
// CircuitBreakerReloadHandler is the HTTP handler that reloads circuit
// breaker configuration.
//
// With a "service" request parameter it reloads that service only and answers
// 500 on failure. Without it, every service found under the "circuitBreaker"
// configuration node is reloaded and the response (always code 200) reports
// how many reloads succeeded and failed.
func CircuitBreakerReloadHandler(r *ghttp.Request) {
	ctx := r.GetCtx()
	target := r.Get("service").String()

	// Single-service reload.
	if target != "" {
		if err := ReloadCircuitBreakerConfig(target); err != nil {
			r.Response.WriteJsonExit(ghttp.DefaultHandlerResponse{
				Code:    500,
				Message: fmt.Sprintf("重载失败: %v", err),
			})
			return
		}
		r.Response.WriteJsonExit(ghttp.DefaultHandlerResponse{
			Code:    200,
			Message: fmt.Sprintf("服务 '%s' 的熔断器配置已重载", target),
		})
		return
	}

	// Reload everything: each non-reserved key of the node is a service name.
	var succeeded, failed int
	for name := range g.Cfg().MustGet(ctx, "circuitBreaker").Map() {
		switch name {
		case "services", "enableDistributed", "requestTimeout", "distributedTTL":
			continue
		}
		if err := ReloadCircuitBreakerConfig(name); err != nil {
			g.Log().Errorf(ctx, "服务 %s 配置重载失败: %v", name, err)
			failed++
			continue
		}
		succeeded++
	}

	r.Response.WriteJsonExit(ghttp.DefaultHandlerResponse{
		Code:    200,
		Message: fmt.Sprintf("配置重载完成: 成功 %d, 失败 %d", succeeded, failed),
		Data: map[string]interface{}{
			"success": succeeded,
			"failed":  failed,
		},
	})
}