Files
common/middleware/circuit_breaker.go
2026-03-12 08:51:25 +08:00

420 lines
13 KiB
Go
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
package middleware
import (
"context"
"fmt"
"strings"
"sync"
"time"
"github.com/alibaba/sentinel-golang/api"
"github.com/alibaba/sentinel-golang/core/circuitbreaker"
"github.com/gogf/gf/v2/frame/g"
"github.com/gogf/gf/v2/net/ghttp"
)
// CircuitBreakerState is the string name of a circuit breaker state as it is
// stored in CircuitBreakerInfo and reported by the health-check endpoint.
type CircuitBreakerState string
const (
StateClosed CircuitBreakerState = "closed" // Closed: normal operation.
StateOpen CircuitBreakerState = "open" // Open: the breaker has tripped.
StateHalfOpen CircuitBreakerState = "half-open" // Half-open: probing for recovery.
)
// CircuitBreakerConfig holds the per-service circuit breaker settings read
// from the "circuitBreaker.<service>" configuration section.
// Duration-valued fields are kept as strings and parsed with
// time.ParseDuration where they are used.
type CircuitBreakerConfig struct {
MaxFailures int // Failure count; used as the rule's MinRequestAmount and, for the error-count strategy, its Threshold.
Timeout string // Duration string; how long the breaker stays open before a retry is allowed.
HalfOpenSuccess int // Consecutive successes required in half-open state. NOTE(review): not consumed by the code visible here.
SuccessStatusCodes []int // HTTP status codes treated as success.
SlowRequestThreshold string // Duration string; becomes MaxAllowedRtMs for the slow-request-ratio strategy.
HalfOpenRequestSampleRate float64 // Request sampling rate in half-open state. NOTE(review): not consumed by the code visible here.
Dimension string // Breaker dimension: service/ip/user. NOTE(review): not consumed by the code visible here.
EnableSlidingWindow bool // Selects the slow-request-ratio strategy instead of the error-count strategy.
SlidingWindowSize string // Duration string for the sliding window size.
FailureRateThreshold float64 // Rule Threshold when EnableSlidingWindow is set.
}
// CircuitBreakerInfo is the locally tracked snapshot of one service's circuit
// breaker. It is stored in circuitBreakers and exposed by the health check.
type CircuitBreakerInfo struct {
ResourceName string `json:"resourceName"` // Sentinel resource name, "service:<name>".
State CircuitBreakerState `json:"state"` // Current locally tracked state.
Config *CircuitBreakerConfig `json:"config"` // Configuration the breaker was created from.
FailCount int64 `json:"failCount"` // Failure count. NOTE(review): never written by the code visible here.
TotalCount int64 `json:"totalCount"` // Total request count. NOTE(review): never written by the code visible here.
LastOpenTime time.Time `json:"lastOpenTime"` // When a request was last rejected by the breaker.
NextRetryTime time.Time `json:"nextRetryTime"` // LastOpenTime plus the configured timeout.
}
var (
// circuitBreakers maps a service name to its *CircuitBreakerInfo; it backs
// the health-check endpoint.
circuitBreakers sync.Map
// enableDistributed reports whether the Redis-backed distributed breaker
// state is consulted; it is set by InitCircuitBreaker.
enableDistributed = false
// circuitBreakerConfigs caches each service's *CircuitBreakerConfig, keyed
// by service name.
circuitBreakerConfigs sync.Map
)
// InitCircuitBreaker initialises Sentinel and registers a circuit breaker for
// every known backend service.
//
// It reads "circuitBreaker.enableDistributed" from the configuration, calls
// api.InitDefault, fills the configuration cache and then creates one breaker
// per service. A failure to initialise Sentinel itself is returned (wrapped,
// so callers can inspect it with errors.Is/errors.As). A failure to set up an
// individual service is only logged so that the remaining services still get
// their breakers.
func InitCircuitBreaker() error {
	ctx := context.Background()

	// Whether the Redis-backed distributed state should be consulted.
	enableDistributed = g.Cfg().MustGet(ctx, "circuitBreaker.enableDistributed", false).Bool()

	if err := api.InitDefault(); err != nil {
		return fmt.Errorf("Sentinel初始化失败: %w", err)
	}
	g.Log().Infof(ctx, "Sentinel熔断器初始化成功分布式熔断: %v", enableDistributed)

	// Populate circuitBreakerConfigs once; the loop below reads from that
	// cache instead of parsing every service's configuration a second time.
	loadCircuitBreakerConfigs()

	services := []string{
		"customerService", "order", "assets", "cid", "oss",
		"wallet", "market", "knapsack",
	}
	for _, service := range services {
		cached, ok := circuitBreakerConfigs.Load(service)
		if !ok {
			continue
		}
		serviceConfig, ok := cached.(*CircuitBreakerConfig)
		if !ok || serviceConfig == nil {
			continue
		}
		if err := initServiceCircuitBreaker(service, serviceConfig); err != nil {
			g.Log().Errorf(ctx, "服务 %s 熔断器初始化失败: %v", service, err)
		}
	}
	return nil
}
// loadCircuitBreakerConfigs reads the circuit breaker configuration of every
// known service and caches each non-nil result in circuitBreakerConfigs under
// the service name.
func loadCircuitBreakerConfigs() {
	for _, name := range [...]string{
		"customerService", "order", "assets", "cid", "oss",
		"wallet", "market", "knapsack",
	} {
		if cfg := loadServiceCircuitBreakerConfig(name); cfg != nil {
			circuitBreakerConfigs.Store(name, cfg)
		}
	}
}
// loadServiceCircuitBreakerConfig builds the CircuitBreakerConfig of a single
// service from the "circuitBreaker.<serviceName>.*" configuration keys,
// falling back to the default passed to each MustGet call when a key is
// absent. The returned pointer is always non-nil.
func loadServiceCircuitBreakerConfig(serviceName string) *CircuitBreakerConfig {
	var (
		ctx    = context.Background()
		cfg    = g.Cfg()
		prefix = "circuitBreaker." + serviceName + "."
	)
	// The success status codes are configured as one comma-separated string
	// and converted by parseStatusCodes.
	return &CircuitBreakerConfig{
		MaxFailures:               cfg.MustGet(ctx, prefix+"maxFailures", 5).Int(),
		Timeout:                   cfg.MustGet(ctx, prefix+"timeout", "60s").String(),
		HalfOpenSuccess:           cfg.MustGet(ctx, prefix+"halfOpenSuccess", 2).Int(),
		SlowRequestThreshold:      cfg.MustGet(ctx, prefix+"slowRequestThreshold", "3s").String(),
		Dimension:                 cfg.MustGet(ctx, prefix+"dimension", "service").String(),
		EnableSlidingWindow:       cfg.MustGet(ctx, prefix+"enableSlidingWindow", false).Bool(),
		SlidingWindowSize:         cfg.MustGet(ctx, prefix+"slidingWindowSize", "60s").String(),
		FailureRateThreshold:      cfg.MustGet(ctx, prefix+"failureRateThreshold", 0.5).Float64(),
		HalfOpenRequestSampleRate: cfg.MustGet(ctx, prefix+"halfOpenRequestSampleRate", 1.0).Float64(),
		SuccessStatusCodes: parseStatusCodes(
			cfg.MustGet(ctx, prefix+"successStatusCodes", "200,201,204").String(),
		),
	}
}
// parseStatusCodes converts a comma-separated list such as "200, 201,204"
// into a slice of HTTP status codes.
//
// Surrounding whitespace is ignored. An entry is kept only when it consists
// solely of ASCII digits and its value lies in 100..999, the range net/http
// accepts for a response status. Anything else (empty entries, signs,
// trailing garbage such as "200abc", out-of-range numbers) is skipped so a
// typo in the configuration cannot silently turn into a different code.
//
// The result is never nil; it is empty when no entry is valid.
func parseStatusCodes(str string) []int {
	parts := strings.Split(str, ",")
	codes := make([]int, 0, len(parts))
	notDigit := func(r rune) bool { return r < '0' || r > '9' }
	for _, part := range parts {
		token := strings.TrimSpace(part)
		// Sscanf("%d") alone would accept a numeric prefix followed by junk,
		// so the token is checked to be all digits first.
		if token == "" || strings.IndexFunc(token, notDigit) >= 0 {
			continue
		}
		var code int
		if _, err := fmt.Sscanf(token, "%d", &code); err != nil {
			continue
		}
		if code < 100 || code > 999 {
			continue
		}
		codes = append(codes, code)
	}
	return codes
}
// initServiceCircuitBreaker registers the Sentinel circuit breaker rule for
// one service and records a fresh, closed CircuitBreakerInfo for it.
//
// The rule is attached to the resource "service:<serviceName>". When
// config.EnableSlidingWindow is set the slow-request-ratio strategy is used
// (threshold = FailureRateThreshold, max allowed RT = SlowRequestThreshold);
// otherwise the error-count strategy is used (threshold = MaxFailures).
//
// An error is returned when the configuration is nil, a required duration
// cannot be parsed or is not positive, MaxFailures is negative, or Sentinel
// rejects the rule load. In those cases nothing is stored in circuitBreakers.
func initServiceCircuitBreaker(serviceName string, config *CircuitBreakerConfig) error {
	if config == nil {
		return fmt.Errorf("服务 %s 的熔断器配置为空", serviceName)
	}

	timeout, err := time.ParseDuration(config.Timeout)
	if err != nil {
		return fmt.Errorf("解析熔断超时时间 %q 失败: %w", config.Timeout, err)
	}
	// The rule stores the timeout as unsigned milliseconds; a zero or negative
	// value would either disable the retry window or wrap around.
	if timeout.Milliseconds() <= 0 {
		return fmt.Errorf("熔断超时时间必须不小于1ms: %q", config.Timeout)
	}
	// MaxFailures is converted to uint64 below; a negative value would wrap to
	// a huge number and the breaker would never trip.
	if config.MaxFailures < 0 {
		return fmt.Errorf("连续失败次数不能为负数: %d", config.MaxFailures)
	}

	resourceName := fmt.Sprintf("service:%s", serviceName)

	// Start from the error-count rule and upgrade it to the slow-request-ratio
	// rule when the sliding window is enabled.
	rule := &circuitbreaker.Rule{
		Resource:         resourceName,
		Strategy:         circuitbreaker.ErrorCount,
		RetryTimeoutMs:   uint32(timeout.Milliseconds()),
		MinRequestAmount: uint64(config.MaxFailures),
		StatIntervalMs:   1000, // 1-second statistics window
		Threshold:        float64(config.MaxFailures),
	}
	strategy := "error_count"
	if config.EnableSlidingWindow {
		slowRequestThreshold, parseErr := time.ParseDuration(config.SlowRequestThreshold)
		if parseErr != nil {
			return fmt.Errorf("解析慢请求阈值 %q 失败: %w", config.SlowRequestThreshold, parseErr)
		}
		if slowRequestThreshold < 0 {
			return fmt.Errorf("慢请求阈值不能为负数: %q", config.SlowRequestThreshold)
		}
		// NOTE(review): config.SlidingWindowSize is not applied to the rule;
		// the statistics window stays at 1000ms split into 10 buckets.
		// Confirm whether it should drive StatIntervalMs.
		rule.Strategy = circuitbreaker.SlowRequestRatio
		rule.StatSlidingWindowBucketCount = 10
		rule.MaxAllowedRtMs = uint64(slowRequestThreshold.Milliseconds())
		rule.Threshold = config.FailureRateThreshold
		strategy = "slow_ratio"
	}

	// LoadRules replaces the complete rule set, so calling it once per service
	// would leave only the last service protected. LoadRulesOfResource only
	// replaces the rules of this resource.
	if _, err := circuitbreaker.LoadRulesOfResource(resourceName, []*circuitbreaker.Rule{rule}); err != nil {
		return fmt.Errorf("加载熔断规则失败: %w", err)
	}

	// Record the breaker so the health check can report it.
	circuitBreakers.Store(serviceName, &CircuitBreakerInfo{
		ResourceName: resourceName,
		State:        StateClosed,
		Config:       config,
	})

	g.Log().Infof(context.Background(), "服务 %s 熔断器初始化成功: resource=%s, strategy=%s, timeout=%v",
		serviceName, resourceName, strategy, timeout)
	return nil
}
// CircuitBreakerMiddleware is an HTTP middleware that protects backend
// services with Sentinel circuit breakers.
//
// The first segment of the URL path is taken as the service name and mapped
// to the Sentinel resource "service:<name>". Requests without a service
// segment (for example "/") bypass the breaker. A request is answered with
// 503 when the distributed state (if enabled) reports the breaker as open or
// when Sentinel blocks the entry. Otherwise the rest of the chain runs and a
// response status that is not a configured success code is reported to
// Sentinel as an error.
func CircuitBreakerMiddleware(r *ghttp.Request) {
	// strings.Split never returns an empty slice, so "no service" has to be
	// detected through an empty first segment rather than a length check.
	serviceName := strings.SplitN(strings.Trim(r.URL.Path, "/"), "/", 2)[0]
	if serviceName == "" {
		r.Middleware.Next()
		return
	}
	resourceName := fmt.Sprintf("service:%s", serviceName)
	ctx := r.GetCtx()

	// Distributed breaker state stored in Redis takes precedence.
	if enableDistributed && isCircuitBreakerOpenInDistributed(ctx, resourceName) {
		g.Log().Warningf(ctx, "分布式熔断触发: %s", resourceName)
		r.Response.WriteStatusExit(503, fmt.Sprintf("服务 '%s' 熔断中,请稍后再试", serviceName))
		return
	}

	entry, blockError := api.Entry(resourceName)
	if blockError != nil {
		g.Log().Warningf(ctx, "熔断触发: %s, reason: %v", resourceName, blockError)

		// Requests run concurrently, so the shared info struct is never
		// modified in place: a modified copy is stored instead.
		// NOTE(review): nothing moves the tracked state back to closed except
		// a manual reset, so the health check can report a stale "open".
		if val, ok := circuitBreakers.Load(serviceName); ok {
			current := val.(*CircuitBreakerInfo)
			updated := *current
			now := time.Now()
			updated.State = StateOpen
			updated.LastOpenTime = now
			if current.Config != nil {
				if timeout, err := time.ParseDuration(current.Config.Timeout); err == nil {
					updated.NextRetryTime = now.Add(timeout)
				}
			}
			circuitBreakers.Store(serviceName, &updated)
		}

		r.Response.WriteStatusExit(503, fmt.Sprintf("服务 '%s' 暂时不可用,请稍后再试", serviceName))
		return
	}
	// Release the Sentinel entry on every path out of this function; the
	// deferred call runs after the error (if any) has been traced below.
	defer entry.Exit()

	// Run the remaining middleware and the business handler.
	r.Middleware.Next()

	// Classify the outcome by the HTTP status written by the handler.
	if statusCode := r.Response.Status; !isSuccessStatusCode(resourceName, statusCode) {
		api.TraceError(entry, fmt.Errorf("request failed with status: %d", statusCode))
	}
}
// isSuccessStatusCode 判断HTTP状态码是否成功
func isSuccessStatusCode(resourceName string, statusCode int) bool {
serviceName := strings.TrimPrefix(resourceName, "service:")
if serviceName == "" {
// 默认只认为2xx是成功
return statusCode >= 200 && statusCode < 300
}
// 从配置中获取成功状态码列表
var serviceConfig *CircuitBreakerConfig
if val, ok := circuitBreakerConfigs.Load(serviceName); ok {
serviceConfig = val.(*CircuitBreakerConfig)
}
if serviceConfig != nil && len(serviceConfig.SuccessStatusCodes) > 0 {
for _, code := range serviceConfig.SuccessStatusCodes {
if statusCode == code {
return true
}
}
return false
}
// 默认2xx状态码为成功
return statusCode >= 200 && statusCode < 300
}
// isCircuitBreakerOpenInDistributed reports whether the Redis key
// "circuit_breaker:<resourceName>:state" currently holds the value "open".
// A Redis error or a missing key is treated as "not open".
func isCircuitBreakerOpenInDistributed(ctx context.Context, resourceName string) bool {
	stored, err := g.Redis().Get(ctx, fmt.Sprintf("circuit_breaker:%s:state", resourceName))
	if err == nil && !stored.IsNil() {
		return stored.String() == "open"
	}
	return false
}
// CircuitBreakerHealthCheckHandler writes a JSON report with one entry per
// registered circuit breaker, keyed by service name.
//
// The reported state is the locally tracked one from CircuitBreakerInfo; it
// is replaced by "unknown" when Sentinel holds no rule for the resource.
func CircuitBreakerHealthCheckHandler(r *ghttp.Request) {
	report := map[string]interface{}{}

	circuitBreakers.Range(func(k, v interface{}) bool {
		name := k.(string)
		info := v.(*CircuitBreakerInfo)

		// Fall back to "unknown" unless Sentinel still has rules for it.
		state := "unknown"
		if len(circuitbreaker.GetRulesOfResource(info.ResourceName)) > 0 {
			state = string(info.State)
		}

		report[name] = map[string]interface{}{
			"resource":      info.ResourceName,
			"state":         state,
			"config":        info.Config,
			"lastOpenTime":  info.LastOpenTime,
			"nextRetryTime": info.NextRetryTime,
		}
		return true
	})

	r.Response.WriteJsonExit(ghttp.DefaultHandlerResponse{
		Code:    200,
		Message: "熔断器状态",
		Data:    report,
	})
}
// getSentinelStateString maps a Sentinel circuit breaker state to the
// matching CircuitBreakerState string, or "unknown" for any other value.
func getSentinelStateString(state circuitbreaker.State) string {
	if state == circuitbreaker.Closed {
		return string(StateClosed)
	}
	if state == circuitbreaker.Open {
		return string(StateOpen)
	}
	if state == circuitbreaker.HalfOpen {
		return string(StateHalfOpen)
	}
	return "unknown"
}
// CircuitBreakerResetHandler manually resets the circuit breaker of one
// service (intended for the admin backend only).
//
// The service is named by the "service" request parameter; a missing
// parameter yields a 400 response. The handler drops the Sentinel rules of
// that service's resource, re-registers them from the cached configuration,
// marks the locally tracked state as closed and, when distributed breaking is
// enabled, deletes the Redis state key. Failures while clearing or reloading
// rules yield a 500 response.
func CircuitBreakerResetHandler(r *ghttp.Request) {
	serviceName := r.Get("service").String()
	if serviceName == "" {
		r.Response.WriteJsonExit(ghttp.DefaultHandlerResponse{
			Code:    400,
			Message: "缺少service参数",
		})
		return
	}
	resourceName := fmt.Sprintf("service:%s", serviceName)
	ctx := r.GetCtx()

	// Clear only this resource. Loading an empty rule list through LoadRules
	// would remove the breakers of every other service as well.
	if err := circuitbreaker.ClearRulesOfResource(resourceName); err != nil {
		r.Response.WriteJsonExit(ghttp.DefaultHandlerResponse{
			Code:    500,
			Message: fmt.Sprintf("重置熔断器失败: %v", err),
		})
		return
	}

	// Re-register the rule from the cached configuration. Other services stay
	// untouched only as long as initServiceCircuitBreaker registers rules for
	// this resource alone.
	if val, ok := circuitBreakerConfigs.Load(serviceName); ok {
		config := val.(*CircuitBreakerConfig)
		if err := initServiceCircuitBreaker(serviceName, config); err != nil {
			r.Response.WriteJsonExit(ghttp.DefaultHandlerResponse{
				Code:    500,
				Message: fmt.Sprintf("重置熔断器失败: %v", err),
			})
			return
		}
	}

	// Reset the locally tracked state. A modified copy is stored so that
	// concurrent readers never observe a half-updated struct.
	if val, ok := circuitBreakers.Load(serviceName); ok {
		updated := *val.(*CircuitBreakerInfo)
		updated.State = StateClosed
		updated.LastOpenTime = time.Time{}
		updated.NextRetryTime = time.Time{}
		circuitBreakers.Store(serviceName, &updated)
	}

	// Remove the distributed state if that feature is enabled. This is
	// best-effort: a failure is logged but does not fail the reset.
	if enableDistributed {
		key := fmt.Sprintf("circuit_breaker:%s:state", resourceName)
		if _, err := g.Redis().Del(ctx, key); err != nil {
			g.Log().Warningf(ctx, "删除分布式熔断状态失败: %s, err: %v", key, err)
		}
	}

	g.Log().Infof(ctx, "熔断器已手动重置: %s", resourceName)
	r.Response.WriteJsonExit(ghttp.DefaultHandlerResponse{
		Code:    200,
		Message: fmt.Sprintf("服务 '%s' 的熔断器已重置", serviceName),
	})
}