Files
common/middleware/circuit_breaker.go
2026-03-12 08:51:25 +08:00

833 lines
26 KiB
Go
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
package middleware
import (
"context"
"fmt"
"strings"
"sync"
"sync/atomic"
"time"
"github.com/alibaba/sentinel-golang/api"
"github.com/alibaba/sentinel-golang/core/circuitbreaker"
"github.com/gogf/gf/v2/frame/g"
"github.com/gogf/gf/v2/net/ghttp"
)
// CircuitBreakerState is the breaker state tracked locally per service and
// reported by the health-check handler. Only "closed" and "open" are modeled.
type CircuitBreakerState string
const (
StateClosed CircuitBreakerState = "closed" // closed: normal state (set on init/reset and after a successful request)
StateOpen CircuitBreakerState = "open" // open: tripped (set by the middleware when a request is blocked)
)
// CircuitBreakerConfig is the per-service breaker configuration, read from
// circuitBreaker.<service>.* by loadServiceCircuitBreakerConfig.
type CircuitBreakerConfig struct {
Enabled bool // whether this service's breaker is active; disabled services pass straight through the middleware
MaxFailures int // failure threshold: used as the Sentinel rule's MinRequestAmount and, in error-count mode, as its Threshold within a 1s statistic window (not strictly "consecutive" failures)
Timeout string // raw duration string (e.g. "60s") for how long the breaker stays open; becomes the rule's RetryTimeoutMs
TimeoutParsed time.Duration // Timeout parsed once at load time so it is not re-parsed per request
SuccessStatusCodes []int // HTTP status codes counted as success; every other status is reported to Sentinel as an error
SlowRequestThreshold string // raw duration string; in sliding-window mode requests slower than this count as slow (MaxAllowedRtMs)
SlowRequestThresholdParsed time.Duration // SlowRequestThreshold parsed once at load time
EnableSlidingWindow bool // true: Sentinel SlowRequestRatio strategy; false: Sentinel ErrorCount strategy
FailureRateThreshold float64 // ratio in [0,1]; in sliding-window mode it is the rule Threshold of the slow-request-ratio strategy (a slow ratio, not an error ratio)
EnableFallback bool // when true and FallbackMessage is non-empty, FallbackMessage is the 503 body for rejected requests
FallbackMessage string // custom 503 body used when EnableFallback is set
RequestTimeout int // per-request context deadline in milliseconds applied by the middleware; 0 disables it
DistributedTTL int // TTL in seconds of the Redis "open" state key; 0 disables the distributed (Redis) state
AdminIPs []string // client IPs allowed to call the reset handler; isAdminIP merges the lists of all services, and an empty merged list allows everyone
}
// CircuitBreakerMetrics holds per-service counters. Every field is an atomic,
// so it can be read and updated without holding CircuitBreakerInfo.mu.
type CircuitBreakerMetrics struct {
TotalRequests atomic.Int64 // requests for this service that reached the breaker check
PassRequests atomic.Int64 // requests whose response status was classified as success
BlockRequests atomic.Int64 // requests rejected by Sentinel or by the distributed (Redis) open state
FailureRequests atomic.Int64 // requests whose response status was classified as failure
OpenCount atomic.Int64 // number of times the middleware recorded the breaker opening (see CircuitBreakerMiddleware for exactly when it increments)
LastResetTime atomic.Int64 // Unix seconds of the last (re)initialization or manual reset
}
// CircuitBreakerInfo is the in-memory record for one service's breaker, stored
// in circuitBreakers as *CircuitBreakerInfo. It embeds a mutex, so it must not
// be copied.
//
// NOTE: atomic.Int64 has no exported fields, so encoding Metrics with
// encoding/json would render each counter as {}; the health-check handler
// builds its own map instead of marshaling this struct.
type CircuitBreakerInfo struct {
ResourceName string `json:"resourceName"` // Sentinel resource name, "service:<name>"
State CircuitBreakerState `json:"state"` // locally tracked state; guarded by mu
Config *CircuitBreakerConfig `json:"config"` // config the breaker was built from
LastOpenTime time.Time `json:"lastOpenTime"` // when the middleware last recorded the breaker open; guarded by mu
NextRetryTime time.Time `json:"nextRetryTime"` // LastOpenTime plus the configured timeout; guarded by mu
Metrics *CircuitBreakerMetrics `json:"metrics"` // atomic counters; no lock needed
mu sync.RWMutex // guards State, LastOpenTime and NextRetryTime
}
// Package-level registries. sync.Map values are safe for concurrent use.
var (
// circuitBreakers maps service name -> *CircuitBreakerInfo. Read by the
// middleware and the health-check handler; replaced on (re)initialization.
circuitBreakers sync.Map
// circuitBreakerConfigs maps service name -> *CircuitBreakerConfig; also
// consulted by extractServiceName and isAdminIP.
circuitBreakerConfigs sync.Map
// distributedSyncLocks maps service name -> *sync.Mutex that serializes this
// process's Redis state writes for that service (process-local, not a
// cross-instance lock).
distributedSyncLocks sync.Map
// stateChangeListeners maps listener name -> StateChangeListener.
stateChangeListeners sync.Map
// stateChangeListenersRegistered records (key "default") that the built-in
// logging listener has already been registered.
stateChangeListenersRegistered sync.Map
)
// InitCircuitBreaker initializes Sentinel and builds a circuit breaker for every
// enabled service section under the "circuitBreaker" config key.
//
// A service's config is published to circuitBreakerConfigs only after its
// breaker has been created successfully. As a result, the middleware,
// extractServiceName and isAdminIP never see a config whose breaker failed
// validation. Per-service failures are logged and skipped. Only a Sentinel
// initialization failure is returned as an error.
func InitCircuitBreaker() error {
	ctx := context.Background()

	if err := api.InitDefault(); err != nil {
		return fmt.Errorf("sentinel初始化失败: %w", err)
	}

	// Install the built-in logging listener (idempotent).
	registerStateChangeListeners()
	g.Log().Infof(ctx, "Sentinel熔断器初始化成功")

	// Every key under "circuitBreaker" that is not a global setting is a service.
	services := g.Cfg().MustGet(ctx, "circuitBreaker").Map()
	serviceNames := filterServiceNames(services)
	if len(serviceNames) == 0 {
		g.Log().Infof(ctx, "未配置任何服务熔断器")
		return nil
	}

	enabledCount := 0
	for _, serviceName := range serviceNames {
		serviceConfig := loadServiceCircuitBreakerConfig(serviceName)
		if serviceConfig == nil || !serviceConfig.Enabled {
			g.Log().Infof(ctx, "服务 %s 熔断器未启用", serviceName)
			continue
		}
		if err := initServiceCircuitBreaker(serviceName, serviceConfig); err != nil {
			g.Log().Errorf(ctx, "服务 %s 熔断器初始化失败: %v", serviceName, err)
			continue
		}
		// Publish the config only once its breaker exists.
		circuitBreakerConfigs.Store(serviceName, serviceConfig)
		g.Log().Infof(ctx, "服务 %s 熔断器初始化成功", serviceName)
		enabledCount++
	}

	g.Log().Infof(ctx, "共初始化 %d 个服务熔断器,其中 %d 个已启用", len(serviceNames), enabledCount)
	return nil
}
// ReloadCircuitBreakerConfig 动态重新加载熔断器配置
func ReloadCircuitBreakerConfig(serviceName string) error {
ctx := context.Background()
// 重新加载配置
serviceConfig := loadServiceCircuitBreakerConfig(serviceName)
if serviceConfig == nil {
return fmt.Errorf("未找到服务 %s 的配置", serviceName)
}
// 更新配置缓存
circuitBreakerConfigs.Store(serviceName, serviceConfig)
// 重新初始化熔断器
err := initServiceCircuitBreaker(serviceName, serviceConfig)
if err != nil {
return fmt.Errorf("重新初始化熔断器失败: %v", err)
}
g.Log().Infof(ctx, "服务 %s 熔断器配置重新加载成功", serviceName)
return nil
}
// loadServiceCircuitBreakerConfig reads circuitBreaker.<serviceName>.* from the
// application config and assembles a CircuitBreakerConfig. It always returns a
// non-nil config.
//
// Defaults for missing keys:
//   - enabled=true, maxFailures=5
//   - timeout="60s", slowRequestThreshold="3s"
//   - enableSlidingWindow=false, failureRateThreshold=0.5
//   - enableFallback=false, fallbackMessage=""
//   - requestTimeout=30000 (ms), distributedTTL=300 (s)
//   - adminIPs="", successStatusCodes="200,201,204"
//
// Duration strings that fail to parse fall back to 60s / 3s with a warning;
// the raw strings are kept unchanged in Timeout / SlowRequestThreshold.
func loadServiceCircuitBreakerConfig(serviceName string) *CircuitBreakerConfig {
	ctx := context.Background()
	cfg := g.Cfg()
	prefix := fmt.Sprintf("circuitBreaker.%s", serviceName)

	// Read every setting up front, in a fixed order.
	var (
		enabled          = cfg.MustGet(ctx, prefix+".enabled", true).Bool()
		maxFailures      = cfg.MustGet(ctx, prefix+".maxFailures", 5).Int()
		rawTimeout       = cfg.MustGet(ctx, prefix+".timeout", "60s").String()
		rawSlowThreshold = cfg.MustGet(ctx, prefix+".slowRequestThreshold", "3s").String()
		slidingWindow    = cfg.MustGet(ctx, prefix+".enableSlidingWindow", false).Bool()
		failureRate      = cfg.MustGet(ctx, prefix+".failureRateThreshold", 0.5).Float64()
		fallbackEnabled  = cfg.MustGet(ctx, prefix+".enableFallback", false).Bool()
		fallbackMessage  = cfg.MustGet(ctx, prefix+".fallbackMessage", "").String()
		requestTimeoutMs = cfg.MustGet(ctx, prefix+".requestTimeout", 30000).Int()
		distributedTTL   = cfg.MustGet(ctx, prefix+".distributedTTL", 300).Int()
		rawAdminIPs      = cfg.MustGet(ctx, prefix+".adminIPs", "").String()
		rawSuccessCodes  = cfg.MustGet(ctx, prefix+".successStatusCodes", "200,201,204").String()
	)

	successCodes := parseStatusCodes(rawSuccessCodes)

	// Parse durations once here so the request path never re-parses them.
	timeoutParsed, timeoutErr := time.ParseDuration(rawTimeout)
	if timeoutErr != nil {
		timeoutParsed = 60 * time.Second
		g.Log().Warningf(ctx, "服务 %s 的 timeout 解析失败,使用默认值: %v", serviceName, timeoutErr)
	}
	slowParsed, slowErr := time.ParseDuration(rawSlowThreshold)
	if slowErr != nil {
		slowParsed = 3 * time.Second
		g.Log().Warningf(ctx, "服务 %s 的 slowRequestThreshold 解析失败,使用默认值: %v", serviceName, slowErr)
	}

	return &CircuitBreakerConfig{
		Enabled:                    enabled,
		MaxFailures:                maxFailures,
		Timeout:                    rawTimeout,
		TimeoutParsed:              timeoutParsed,
		SuccessStatusCodes:         successCodes,
		SlowRequestThreshold:       rawSlowThreshold,
		SlowRequestThresholdParsed: slowParsed,
		EnableSlidingWindow:        slidingWindow,
		FailureRateThreshold:       failureRate,
		EnableFallback:             fallbackEnabled,
		FallbackMessage:            fallbackMessage,
		RequestTimeout:             requestTimeoutMs,
		DistributedTTL:             distributedTTL,
		AdminIPs:                   parseAdminIPs(rawAdminIPs),
	}
}
// parseStatusCodes parses a comma-separated list of HTTP status codes such as
// "200,201,204".
//
// Whitespace, square brackets and quote characters around each item are
// stripped. A list-valued setting rendered as text (e.g. `[200,201]` or
// `["200","201"]`) therefore parses the same as the plain comma-separated form.
//
// Items are skipped when they:
//   - are empty;
//   - contain anything other than decimal digits (so "20O" or "+200" are
//     rejected instead of being read as a numeric prefix);
//   - fall outside the HTTP status range 100-599.
//
// The result may be empty; the caller's validation rejects an empty list.
func parseStatusCodes(str string) []int {
	// trim removes surrounding whitespace, brackets and quotes, repeatedly,
	// so forms like `[ "200"` reduce to `200`.
	trim := func(s string) string {
		for {
			t := strings.Trim(strings.TrimSpace(s), `[]"'`)
			if t == s {
				return t
			}
			s = t
		}
	}

	parts := strings.Split(str, ",")
	codes := make([]int, 0, len(parts))
	for _, part := range parts {
		token := trim(part)
		// Digit-only pre-check: fmt.Sscanf alone happily parses a numeric
		// prefix and ignores trailing junk.
		if token == "" || strings.TrimLeft(token, "0123456789") != "" {
			continue
		}
		var code int
		if _, err := fmt.Sscanf(token, "%d", &code); err != nil {
			continue // e.g. overflow on an absurdly long digit string
		}
		if code < 100 || code > 599 {
			continue
		}
		codes = append(codes, code)
	}
	return codes
}
// parseAdminIPs parses a comma-separated list of admin IP addresses.
//
// An empty input returns nil. Whitespace, square brackets and quote characters
// around each item are stripped. A list-valued setting rendered as text (e.g.
// `["10.0.0.1","10.0.0.2"]`) therefore yields bare addresses that can match
// the client IP. Empty items are dropped. Addresses are not otherwise validated.
func parseAdminIPs(str string) []string {
	if str == "" {
		return nil
	}

	// trim removes surrounding whitespace, brackets and quotes, repeatedly,
	// so forms like `[ "10.0.0.1"` reduce to `10.0.0.1`.
	trim := func(s string) string {
		for {
			t := strings.Trim(strings.TrimSpace(s), `[]"'`)
			if t == s {
				return t
			}
			s = t
		}
	}

	parts := strings.Split(str, ",")
	ips := make([]string, 0, len(parts))
	for _, part := range parts {
		if ip := trim(part); ip != "" {
			ips = append(ips, ip)
		}
	}
	return ips
}
// filterServiceNames returns the keys of the "circuitBreaker" config section
// that name services. It skips the global settings "services",
// "enableDistributed", "requestTimeout" and "distributedTTL". The order of the
// result follows map iteration and is therefore unspecified.
func filterServiceNames(services map[string]interface{}) []string {
	names := make([]string, 0, len(services))
	for name := range services {
		switch name {
		case "services", "enableDistributed", "requestTimeout", "distributedTTL":
			// Global setting, not a per-service section.
			continue
		}
		names = append(names, name)
	}
	return names
}
// initServiceCircuitBreaker validates config, (re)installs the Sentinel
// circuit-breaking rule for resource "service:<serviceName>", and replaces the
// service's CircuitBreakerInfo with a fresh one (state closed, zeroed metrics).
//
// Strategy:
//   - EnableSlidingWindow: Sentinel SlowRequestRatio over a 1s window of 10
//     buckets. Requests slower than SlowRequestThresholdParsed count as slow,
//     and the breaker trips on the slow ratio relative to FailureRateThreshold
//     once at least MaxFailures requests were seen. Errors reported via
//     api.TraceError do not drive this strategy.
//   - otherwise: Sentinel ErrorCount over a 1s window with MaxFailures as both
//     the minimum request amount and the error-count threshold. Despite the
//     field's description this is a per-window count, not a run of
//     consecutive failures.
//
// In both modes the breaker stays open for TimeoutParsed (RetryTimeoutMs).
//
// Rules are loaded per resource with circuitbreaker.LoadRulesOfResource.
// circuitbreaker.LoadRules replaces the rules of every resource, so
// initializing services one at a time with it would leave only the last
// service protected.
func initServiceCircuitBreaker(serviceName string, config *CircuitBreakerConfig) error {
	if err := validateCircuitBreakerConfig(config); err != nil {
		return fmt.Errorf("配置验证失败: %w", err)
	}

	// Durations were parsed once at load time.
	timeout := config.TimeoutParsed
	slowRequestThreshold := config.SlowRequestThresholdParsed
	resourceName := fmt.Sprintf("service:%s", serviceName)

	var rule []*circuitbreaker.Rule
	if config.EnableSlidingWindow {
		rule = []*circuitbreaker.Rule{
			{
				Resource:                     resourceName,
				Strategy:                     circuitbreaker.SlowRequestRatio,
				RetryTimeoutMs:               uint32(timeout.Milliseconds()),
				MinRequestAmount:             uint64(config.MaxFailures),
				StatIntervalMs:               1000,
				StatSlidingWindowBucketCount: 10,
				MaxAllowedRtMs:               uint64(slowRequestThreshold.Milliseconds()),
				Threshold:                    config.FailureRateThreshold,
			},
		}
	} else {
		rule = []*circuitbreaker.Rule{
			{
				Resource:         resourceName,
				Strategy:         circuitbreaker.ErrorCount,
				RetryTimeoutMs:   uint32(timeout.Milliseconds()),
				MinRequestAmount: uint64(config.MaxFailures),
				StatIntervalMs:   1000, // 1s statistic window
				Threshold:        float64(config.MaxFailures),
			},
		}
	}

	// Drop this resource's current rule first so the breaker is rebuilt from
	// scratch even when the new rule equals the old one. Best-effort: any real
	// problem surfaces from the load below.
	_, _ = circuitbreaker.LoadRulesOfResource(resourceName, []*circuitbreaker.Rule{})

	// Replace only this resource's rules; other services keep theirs.
	if _, err := circuitbreaker.LoadRulesOfResource(resourceName, rule); err != nil {
		return fmt.Errorf("加载熔断规则失败: %w", err)
	}

	cbInfo := &CircuitBreakerInfo{
		ResourceName: resourceName,
		State:        StateClosed,
		Config:       config,
		Metrics:      &CircuitBreakerMetrics{},
	}
	cbInfo.Metrics.LastResetTime.Store(time.Now().Unix())
	circuitBreakers.Store(serviceName, cbInfo)

	strategy := "error_count"
	if config.EnableSlidingWindow {
		strategy = "slow_ratio"
	}
	g.Log().Infof(context.Background(), "服务 %s 熔断器初始化成功: resource=%s, strategy=%s, timeout=%v, threshold=%.2f",
		serviceName, resourceName, strategy, timeout, rule[0].Threshold)
	return nil
}
// CircuitBreakerMiddleware protects downstream services with Alibaba Sentinel
// circuit breaking.
//
// The service is the first URL path segment (see extractServiceName).
// Requests for services without an enabled, initialized breaker pass through.
// For protected services the middleware:
//   - installs a per-request context deadline when RequestTimeout > 0;
//   - rejects the request with a fallback response when distributed state is
//     enabled (DistributedTTL > 0) and Redis reports the breaker open;
//   - enters Sentinel resource "service:<name>"; if Sentinel blocks the
//     request, rejects it and publishes the open state to Redis when enabled;
//   - otherwise runs the remaining handlers, classifies the response status
//     with isSuccessStatusCode, and reports failures via api.TraceError;
//   - always exits the Sentinel entry, even if a handler panics.
//
// Listeners are notified only on real state transitions. OpenCount counts
// transitions into the open state, not every rejected request.
func CircuitBreakerMiddleware(r *ghttp.Request) {
	startTime := time.Now()
	ctx := r.GetCtx()

	serviceName := extractServiceName(r.URL.Path)
	if serviceName == "" {
		r.Middleware.Next()
		return
	}

	val, ok := circuitBreakerConfigs.Load(serviceName)
	if !ok {
		// No breaker configured for this service.
		r.Middleware.Next()
		return
	}
	config := val.(*CircuitBreakerConfig)
	if !config.Enabled {
		r.Middleware.Next()
		return
	}

	cbInfoVal, ok := circuitBreakers.Load(serviceName)
	if !ok {
		r.Middleware.Next()
		return
	}
	cbInfo := cbInfoVal.(*CircuitBreakerInfo)
	cbInfo.Metrics.TotalRequests.Add(1)

	if config.RequestTimeout > 0 {
		timeoutCtx, cancel := context.WithTimeout(ctx, time.Duration(config.RequestTimeout)*time.Millisecond)
		defer cancel()
		r.SetCtx(timeoutCtx)
		ctx = timeoutCtx
	}

	resourceName := fmt.Sprintf("service:%s", serviceName)

	// Shared open state published by another instance short-circuits Sentinel.
	if config.DistributedTTL > 0 && isCircuitBreakerOpenInDistributed(ctx, resourceName) {
		cbInfo.Metrics.BlockRequests.Add(1)
		g.Log().Warningf(ctx, "分布式熔断触发: %s", resourceName)
		recordCircuitOpen(serviceName, cbInfo, config.TimeoutParsed)
		sendFallbackResponse(r, serviceName, config, "distributed")
		return
	}

	entry, blockError := api.Entry(resourceName)
	if blockError != nil {
		cbInfo.Metrics.BlockRequests.Add(1)
		g.Log().Warningf(ctx, "熔断触发: %s, reason: %v", resourceName, blockError)
		recordCircuitOpen(serviceName, cbInfo, config.TimeoutParsed)
		if config.DistributedTTL > 0 {
			syncCircuitBreakerStateToDistributed(ctx, resourceName, "open", config.DistributedTTL)
		}
		sendFallbackResponse(r, serviceName, config, "blocked")
		return
	}
	// Deferred so the entry is released even if a downstream handler panics.
	// TraceError below still runs before Exit.
	defer entry.Exit()

	r.Middleware.Next()

	statusCode := responseStatusCode(r)
	duration := time.Since(startTime)
	if !isSuccessStatusCode(config, statusCode) {
		cbInfo.Metrics.FailureRequests.Add(1)
		api.TraceError(entry, fmt.Errorf("request failed with status: %d", statusCode))
		g.Log().Debugf(ctx, "服务 %s 请求失败: status=%d, duration=%v", serviceName, statusCode, duration)
		return
	}

	cbInfo.Metrics.PassRequests.Add(1)
	recordCircuitClosed(serviceName, cbInfo)
}

// responseStatusCode returns the status code to classify for r.
//
// NOTE(review): GoFrame appears to leave Response.Status at 0 when a handler
// writes a body without setting a status, and fills in the final status only
// after the whole handler chain returns. Confirm this for the GoFrame version
// in use. A 0 is mapped to 500 when the request carries an error and to 200
// otherwise, so such responses are not counted as failures.
func responseStatusCode(r *ghttp.Request) int {
	if status := r.Response.Status; status != 0 {
		return status
	}
	if r.GetError() != nil {
		return 500
	}
	return 200
}

// recordCircuitOpen marks cbInfo open and stamps LastOpenTime and
// NextRetryTime (now + retryAfter). OpenCount is incremented, and listeners
// notified, only when the breaker was not already open. Listeners run after
// the lock is released, so a listener that inspects breaker state cannot
// deadlock on cbInfo.mu.
func recordCircuitOpen(serviceName string, cbInfo *CircuitBreakerInfo, retryAfter time.Duration) {
	now := time.Now()

	cbInfo.mu.Lock()
	fromState := cbInfo.State
	cbInfo.State = StateOpen
	cbInfo.LastOpenTime = now
	cbInfo.NextRetryTime = now.Add(retryAfter)
	cbInfo.mu.Unlock()

	if fromState != StateOpen {
		cbInfo.Metrics.OpenCount.Add(1)
		notifyStateChange(serviceName, fromState, StateOpen)
	}
}

// recordCircuitClosed marks cbInfo closed after a successful request and
// notifies listeners (outside the lock) if it was not already closed. The
// common already-closed case only takes the read lock.
func recordCircuitClosed(serviceName string, cbInfo *CircuitBreakerInfo) {
	cbInfo.mu.RLock()
	alreadyClosed := cbInfo.State == StateClosed
	cbInfo.mu.RUnlock()
	if alreadyClosed {
		return
	}

	cbInfo.mu.Lock()
	fromState := cbInfo.State
	cbInfo.State = StateClosed
	cbInfo.mu.Unlock()

	if fromState != StateClosed {
		notifyStateChange(serviceName, fromState, StateClosed)
	}
}
// sendFallbackResponse ends the request with HTTP 503 via WriteStatusExit.
//
// The body is chosen as follows:
//   - config.FallbackMessage, when EnableFallback is set and the message is
//     non-empty;
//   - otherwise a built-in message selected by reason: "blocked" (Sentinel
//     rejected the request), "distributed" (Redis reports the breaker open),
//     or a generic "temporarily unavailable" message for anything else.
func sendFallbackResponse(r *ghttp.Request, serviceName string, config *CircuitBreakerConfig, reason string) {
	var body string
	switch {
	case config.EnableFallback && config.FallbackMessage != "":
		body = config.FallbackMessage
	case reason == "blocked":
		body = fmt.Sprintf("服务 '%s' 熔断保护中,请稍后再试", serviceName)
	case reason == "distributed":
		body = fmt.Sprintf("服务 '%s' 分布式熔断中", serviceName)
	default:
		body = fmt.Sprintf("服务 '%s' 暂时不可用,请稍后再试", serviceName)
	}
	r.Response.WriteStatusExit(503, body)
}
// isSuccessStatusCode reports whether statusCode counts as a success for the
// breaker. When config lists SuccessStatusCodes, only those codes succeed.
// Otherwise any 2xx status succeeds.
func isSuccessStatusCode(config *CircuitBreakerConfig, statusCode int) bool {
	if len(config.SuccessStatusCodes) == 0 {
		return statusCode >= 200 && statusCode < 300
	}
	for _, allowed := range config.SuccessStatusCodes {
		if allowed == statusCode {
			return true
		}
	}
	return false
}
// extractServiceName returns the first segment of the URL path (e.g. "orders"
// for "/orders/list"), but only if a breaker config is registered under that
// name in circuitBreakerConfigs. Otherwise it returns "".
func extractServiceName(path string) string {
	trimmed := strings.Trim(path, "/")
	if trimmed == "" {
		return ""
	}
	first, _, _ := strings.Cut(trimmed, "/")
	if _, configured := circuitBreakerConfigs.Load(first); !configured {
		return ""
	}
	return first
}
// isCircuitBreakerOpenInDistributed reports whether Redis key
// "circuit_breaker:<resourceName>:state" currently holds "open".
//
// It fails open: a nil client, a Redis error or a missing key all yield false.
// NOTE(review): depending on the GoFrame version, g.Redis() may panic rather
// than return nil when no Redis group is configured; confirm.
func isCircuitBreakerOpenInDistributed(ctx context.Context, resourceName string) bool {
	client := g.Redis()
	if client == nil {
		return false
	}
	reply, err := client.Get(ctx, fmt.Sprintf("circuit_breaker:%s:state", resourceName))
	switch {
	case err != nil, reply.IsNil():
		return false
	default:
		return reply.String() == "open"
	}
}
// getDistributedLock returns the mutex that serializes this process's Redis
// state writes for serviceName, creating it on first use. Despite the name it
// is an in-process sync.Mutex, not a lock shared between instances.
func getDistributedLock(serviceName string) *sync.Mutex {
	actual, _ := distributedSyncLocks.LoadOrStore(serviceName, new(sync.Mutex))
	mu := actual.(*sync.Mutex)
	return mu
}
// syncCircuitBreakerStateToDistributed writes state to Redis key
// "circuit_breaker:<resourceName>:state" with a TTL of ttl seconds (SETEX).
// Writes for the same service are serialized by the per-service mutex from
// getDistributedLock. Failures are logged, not returned, so publishing shared
// state is best-effort.
func syncCircuitBreakerStateToDistributed(ctx context.Context, resourceName, state string, ttl int) {
	mu := getDistributedLock(strings.TrimPrefix(resourceName, "service:"))
	mu.Lock()
	defer mu.Unlock()

	key := fmt.Sprintf("circuit_breaker:%s:state", resourceName)
	client := g.Redis()
	if client == nil {
		g.Log().Errorf(ctx, "Redis客户端未初始化无法同步熔断状态")
		return
	}
	if _, err := client.Do(ctx, "SETEX", key, ttl, state); err != nil {
		g.Log().Errorf(ctx, "同步熔断状态到Redis失败: %v", err)
	}
}
// validateCircuitBreakerConfig checks that config can be turned into a Sentinel
// rule and returns the first problem found, or nil.
//
// Constraints:
//   - config is non-nil and maxFailures > 0;
//   - failureRateThreshold is in [0, 1];
//   - successStatusCodes is non-empty;
//   - requestTimeout is in [0, 300000] ms;
//   - distributedTTL is in [0, 3600] s;
//   - timeout and slowRequestThreshold are strictly positive;
//   - timeout fits in the uint32 millisecond field the rule uses
//     (about 49.7 days).
//
// Negative or oversized durations would otherwise wrap silently when
// converted to Sentinel's unsigned rule fields.
func validateCircuitBreakerConfig(config *CircuitBreakerConfig) error {
	// Largest timeout representable as uint32 milliseconds (RetryTimeoutMs).
	const maxTimeout = time.Duration(1<<32-1) * time.Millisecond

	if config == nil {
		return fmt.Errorf("配置不能为空")
	}
	if config.MaxFailures <= 0 {
		return fmt.Errorf("maxFailures必须大于0")
	}
	if config.FailureRateThreshold < 0 || config.FailureRateThreshold > 1 {
		return fmt.Errorf("failureRateThreshold必须在0.0-1.0之间")
	}
	if len(config.SuccessStatusCodes) == 0 {
		return fmt.Errorf("successStatusCodes不能为空")
	}
	if config.RequestTimeout < 0 || config.RequestTimeout > 300000 {
		return fmt.Errorf("requestTimeout必须在0-300000毫秒之间")
	}
	if config.DistributedTTL < 0 || config.DistributedTTL > 3600 {
		return fmt.Errorf("distributedTTL必须在0-3600秒之间")
	}
	// A zero parsed value means the configured string was a zero duration.
	if config.TimeoutParsed == 0 {
		return fmt.Errorf("timeout格式错误应为有效的时间字符串如30s, 1m")
	}
	if config.TimeoutParsed < 0 {
		return fmt.Errorf("timeout必须大于0")
	}
	if config.TimeoutParsed > maxTimeout {
		return fmt.Errorf("timeout不能超过%v", maxTimeout)
	}
	if config.SlowRequestThresholdParsed == 0 {
		return fmt.Errorf("slowRequestThreshold格式错误应为有效的时间字符串如3s, 1m")
	}
	if config.SlowRequestThresholdParsed < 0 {
		return fmt.Errorf("slowRequestThreshold必须大于0")
	}
	return nil
}
// registerStateChangeListeners installs the built-in "default" listener, which
// logs every state change: Warning when the new state is open, Info otherwise.
// The stateChangeListenersRegistered flag makes repeated calls no-ops.
func registerStateChangeListeners() {
	if _, already := stateChangeListenersRegistered.LoadOrStore("default", true); already {
		return
	}

	logTransition := func(serviceName string, fromState, toState CircuitBreakerState) {
		ctx := context.Background()
		if toState != StateOpen {
			g.Log().Infof(ctx, "熔断器状态变化: service=%s, %s -> %s",
				serviceName, fromState, toState)
			return
		}
		g.Log().Warningf(ctx, "熔断器状态变化: service=%s, %s -> %s",
			serviceName, fromState, toState)
	}
	RegisterStateChangeListener("default", logTransition)
}
// StateChangeListener is called when a service's breaker changes state. It
// runs synchronously on the goroutine that detected the change (for the
// middleware, the request goroutine), so it should return quickly.
type StateChangeListener func(serviceName string, fromState, toState CircuitBreakerState)

// RegisterStateChangeListener registers listener under name, replacing any
// listener previously registered under that name. A nil listener is treated
// as UnregisterStateChangeListener(name). Storing a typed-nil func would pass
// the type assertion in notifyStateChange and panic when called.
func RegisterStateChangeListener(name string, listener StateChangeListener) {
	if listener == nil {
		stateChangeListeners.Delete(name)
		return
	}
	stateChangeListeners.Store(name, listener)
}

// UnregisterStateChangeListener removes the listener registered under name, if any.
func UnregisterStateChangeListener(name string) {
	stateChangeListeners.Delete(name)
}

// notifyStateChange calls every registered listener with the transition
// fromState -> toState. The call order is sync.Map iteration order, which is
// unspecified. Nil or foreign values are skipped defensively.
func notifyStateChange(serviceName string, fromState, toState CircuitBreakerState) {
	stateChangeListeners.Range(func(_, value interface{}) bool {
		if listener, ok := value.(StateChangeListener); ok && listener != nil {
			listener(serviceName, fromState, toState)
		}
		return true
	})
}
// CircuitBreakerHealthCheckHandler reports every registered breaker as JSON.
//
// The response always carries Code 200 and contains:
//   - summary: total, open and closed service counts;
//   - services: per service, the resource name, state, open/retry timestamps,
//     the counters, and lastResetTime. lastResetTime uses the
//     "2006-01-02 15:04:05" layout in server-local time and is empty if
//     never set.
func CircuitBreakerHealthCheckHandler(r *ghttp.Request) {
	services := make(map[string]interface{})
	total, openCount := 0, 0

	circuitBreakers.Range(func(key, value interface{}) bool {
		name := key.(string)
		info := value.(*CircuitBreakerInfo)
		total++

		info.mu.RLock()
		defer info.mu.RUnlock()

		if info.State == StateOpen {
			openCount++
		}

		resetAt := ""
		if unix := info.Metrics.LastResetTime.Load(); unix > 0 {
			resetAt = time.Unix(unix, 0).Format("2006-01-02 15:04:05")
		}

		m := info.Metrics
		services[name] = map[string]interface{}{
			"resource":        info.ResourceName,
			"state":           string(info.State),
			"lastOpenTime":    info.LastOpenTime,
			"nextRetryTime":   info.NextRetryTime,
			"totalRequests":   m.TotalRequests.Load(),
			"passRequests":    m.PassRequests.Load(),
			"blockRequests":   m.BlockRequests.Load(),
			"failureRequests": m.FailureRequests.Load(),
			"openCount":       m.OpenCount.Load(),
			"lastResetTime":   resetAt,
		}
		return true
	})

	r.Response.WriteJsonExit(ghttp.DefaultHandlerResponse{
		Code:    200,
		Message: "熔断器状态",
		Data: map[string]interface{}{
			"summary": map[string]interface{}{
				"totalServices":  total,
				"openServices":   openCount,
				"closedServices": total - openCount,
			},
			"services": services,
		},
	})
}
// isAdminIP reports whether the request may call admin endpoints.
//
// The whitelist is the union of AdminIPs across all loaded service configs.
// An IP listed for any service is accepted for every service. If no service
// configures AdminIPs, every caller is allowed (kept for backward
// compatibility). A rejected caller is logged as a warning.
//
// NOTE(review): GoFrame's GetClientIp may derive the address from
// client-supplied forwarding headers. Confirm that a trusted proxy sets them
// in this deployment; otherwise the whitelist can be spoofed.
func isAdminIP(r *ghttp.Request) bool {
	clientIP := r.GetClientIp()
	if clientIP == "" {
		return false
	}

	whitelistConfigured, matched := false, false
	circuitBreakerConfigs.Range(func(_, value interface{}) bool {
		cfg := value.(*CircuitBreakerConfig)
		for _, allowed := range cfg.AdminIPs {
			whitelistConfigured = true
			if allowed == clientIP {
				matched = true
				return false // stop scanning: caller is whitelisted
			}
		}
		return true
	})

	if matched || !whitelistConfigured {
		return true
	}
	g.Log().Warningf(r.GetCtx(), "熔断器重置请求被拒绝IP不在白名单中: %s", clientIP)
	return false
}
// CircuitBreakerResetHandler manually resets one service's breaker and is
// intended for admin tooling only.
//
// Parameter:
//   - service: the service name (required).
//
// Responses (JSON, ghttp.DefaultHandlerResponse):
//   - 400: service is missing;
//   - 403: the caller fails isAdminIP;
//   - 404: the service has no loaded breaker config;
//   - 500: the Sentinel rule cannot be cleared or re-created;
//   - 200: success.
//
// On success:
//   - the Sentinel rule is rebuilt, dropping Sentinel's current breaker state;
//   - the in-memory state is set to closed with zeroed metrics;
//   - listeners are told if the discarded state was not closed;
//   - the Redis state key is deleted when distributed state is enabled.
func CircuitBreakerResetHandler(r *ghttp.Request) {
	ctx := r.GetCtx()

	serviceName := r.Get("service").String()
	if serviceName == "" {
		r.Response.WriteJsonExit(ghttp.DefaultHandlerResponse{
			Code:    400,
			Message: "缺少service参数",
		})
		return
	}

	if !isAdminIP(r) {
		r.Response.WriteJsonExit(ghttp.DefaultHandlerResponse{
			Code:    403,
			Message: "权限不足,禁止访问",
		})
		return
	}

	val, ok := circuitBreakerConfigs.Load(serviceName)
	if !ok {
		// Report unknown services instead of claiming a successful reset.
		r.Response.WriteJsonExit(ghttp.DefaultHandlerResponse{
			Code:    404,
			Message: fmt.Sprintf("未找到服务 '%s' 的熔断器配置", serviceName),
		})
		return
	}
	config := val.(*CircuitBreakerConfig)
	resourceName := fmt.Sprintf("service:%s", serviceName)

	// Remember the state being discarded so listeners can be told about it.
	previousState := StateClosed
	if cur, ok := circuitBreakers.Load(serviceName); ok {
		info := cur.(*CircuitBreakerInfo)
		info.mu.RLock()
		previousState = info.State
		info.mu.RUnlock()
	}

	// Clear this resource's rule first. If re-initialization below then fails,
	// the service is left without a Sentinel rule.
	if len(circuitbreaker.GetRulesOfResource(resourceName)) > 0 {
		if _, err := circuitbreaker.LoadRulesOfResource(resourceName, []*circuitbreaker.Rule{}); err != nil {
			r.Response.WriteJsonExit(ghttp.DefaultHandlerResponse{
				Code:    500,
				Message: fmt.Sprintf("重置熔断器失败: %v", err),
			})
			return
		}
	}

	if err := initServiceCircuitBreaker(serviceName, config); err != nil {
		r.Response.WriteJsonExit(ghttp.DefaultHandlerResponse{
			Code:    500,
			Message: fmt.Sprintf("重置熔断器失败: %v", err),
		})
		return
	}

	// Reset the in-memory state and counters.
	if cur, ok := circuitBreakers.Load(serviceName); ok {
		info := cur.(*CircuitBreakerInfo)
		info.mu.Lock()
		info.State = StateClosed
		info.LastOpenTime = time.Time{}
		info.NextRetryTime = time.Time{}
		info.Metrics.TotalRequests.Store(0)
		info.Metrics.PassRequests.Store(0)
		info.Metrics.BlockRequests.Store(0)
		info.Metrics.FailureRequests.Store(0)
		info.Metrics.OpenCount.Store(0)
		info.Metrics.LastResetTime.Store(time.Now().Unix())
		info.mu.Unlock()
	}
	if previousState != StateClosed {
		notifyStateChange(serviceName, previousState, StateClosed)
	}

	// Clear the shared open state so other instances stop rejecting too.
	if config.DistributedTTL > 0 {
		if redis := g.Redis(); redis != nil {
			key := fmt.Sprintf("circuit_breaker:%s:state", resourceName)
			if _, err := redis.Del(ctx, key); err != nil {
				g.Log().Warningf(ctx, "清除分布式熔断状态失败: %s, err: %v", resourceName, err)
			}
		}
	}

	g.Log().Infof(ctx, "熔断器已手动重置: %s", resourceName)
	r.Response.WriteJsonExit(ghttp.DefaultHandlerResponse{
		Code:    200,
		Message: fmt.Sprintf("服务 '%s' 的熔断器已重置", serviceName),
	})
}
// CircuitBreakerReloadHandler reloads breaker configuration from the
// application config.
//
// With a "service" parameter, only that service is reloaded: 500 on failure,
// 200 on success. Without it, every service under "circuitBreaker" is
// reloaded and the response reports success and failure counts.
//
// Reloading rebuilds breakers, which closes open breakers and zeroes their
// metrics. That is as powerful as a manual reset, so the endpoint uses the
// same isAdminIP check as CircuitBreakerResetHandler (403 when it fails).
func CircuitBreakerReloadHandler(r *ghttp.Request) {
	if !isAdminIP(r) {
		r.Response.WriteJsonExit(ghttp.DefaultHandlerResponse{
			Code:    403,
			Message: "权限不足,禁止访问",
		})
		return
	}

	serviceName := r.Get("service").String()
	if serviceName == "" {
		// Reload every service section found in the config.
		services := g.Cfg().MustGet(r.GetCtx(), "circuitBreaker").Map()
		serviceNames := filterServiceNames(services)

		successCount, failCount := 0, 0
		for _, service := range serviceNames {
			if err := ReloadCircuitBreakerConfig(service); err != nil {
				g.Log().Errorf(r.GetCtx(), "服务 %s 配置重载失败: %v", service, err)
				failCount++
				continue
			}
			successCount++
		}

		r.Response.WriteJsonExit(ghttp.DefaultHandlerResponse{
			Code:    200,
			Message: fmt.Sprintf("配置重载完成: 成功 %d, 失败 %d", successCount, failCount),
			Data: map[string]interface{}{
				"success": successCount,
				"failed":  failCount,
			},
		})
		return
	}

	// Reload a single service.
	if err := ReloadCircuitBreakerConfig(serviceName); err != nil {
		r.Response.WriteJsonExit(ghttp.DefaultHandlerResponse{
			Code:    500,
			Message: fmt.Sprintf("重载失败: %v", err),
		})
		return
	}

	r.Response.WriteJsonExit(ghttp.DefaultHandlerResponse{
		Code:    200,
		Message: fmt.Sprintf("服务 '%s' 的熔断器配置已重载", serviceName),
	})
}