From 8ac1630514d64d497cdb71c8d9965d1fdf66c021 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E5=BC=A0=E6=96=8C?= <259278618@qq.com> Date: Thu, 1 Jan 2026 15:13:36 +0800 Subject: [PATCH] =?UTF-8?q?=E5=A2=9E=E5=8A=A0=E7=86=94=E6=96=AD=E7=AD=96?= =?UTF-8?q?=E7=95=A5?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- middleware/circuit_breaker.go | 183 ++++++++++++++++++++++++++++------ 1 file changed, 151 insertions(+), 32 deletions(-) diff --git a/middleware/circuit_breaker.go b/middleware/circuit_breaker.go index c39b452..71b3c2c 100644 --- a/middleware/circuit_breaker.go +++ b/middleware/circuit_breaker.go @@ -99,6 +99,126 @@ type CircuitBreakerMetrics struct { WindowFailures atomic.Int64 // 窗口内失败数 } +// 实现HalfOpenMetrics接口 +func (m *CircuitBreakerMetrics) GetHalfOpenRequests() *atomic.Int64 { + return &m.HalfOpenRequests +} + +func (m *CircuitBreakerMetrics) GetHalfOpenPassed() *atomic.Int64 { + return &m.HalfOpenPassed +} + +func (m *CircuitBreakerMetrics) GetHalfOpenFailed() *atomic.Int64 { + return &m.HalfOpenFailed +} + +func (m *CircuitBreakerMetrics) AddHalfOpenRequests(delta int64) { + m.HalfOpenRequests.Add(delta) +} + +func (m *CircuitBreakerMetrics) AddHalfOpenPassed(delta int64) { + m.HalfOpenPassed.Add(delta) +} + +func (m *CircuitBreakerMetrics) AddHalfOpenFailed(delta int64) { + m.HalfOpenFailed.Add(delta) +} + +// 半开状态指标接口,定义半开状态管理需要的原子操作 +type HalfOpenMetrics interface { + GetHalfOpenRequests() *atomic.Int64 + GetHalfOpenPassed() *atomic.Int64 + GetHalfOpenFailed() *atomic.Int64 + AddHalfOpenRequests(delta int64) + AddHalfOpenPassed(delta int64) + AddHalfOpenFailed(delta int64) +} + +// HalfOpenManager 半开状态管理器 +// 解决多个goroutine同时操作半开状态时可能出现的竞态条件和数据不一致问题 +type HalfOpenManager struct { + mu sync.RWMutex +} + +// NewHalfOpenManager 创建半开状态管理器实例 +func NewHalfOpenManager() *HalfOpenManager { + return &HalfOpenManager{} +} + +var ( + halfOpenManagerInstance *HalfOpenManager + halfOpenManagerOnce sync.Once +) + +// GetHalfOpenManager 获取半开状态管理器单例 +func GetHalfOpenManager() *HalfOpenManager { + halfOpenManagerOnce.Do(func() { + halfOpenManagerInstance = NewHalfOpenManager() + }) + return halfOpenManagerInstance +} + +// TryAcquireHalfOpenSlot 尝试获取半开状态的请求槽位 +func (m *HalfOpenManager) TryAcquireHalfOpenSlot(metrics HalfOpenMetrics, maxRequests int) (bool, int) { + if maxRequests <= 0 { + return false, 0 + } + + m.mu.Lock() + defer m.mu.Unlock() + + currentRequests := int(metrics.GetHalfOpenRequests().Load()) + if currentRequests >= maxRequests { + return false, currentRequests + } + + metrics.AddHalfOpenRequests(1) + metrics.AddHalfOpenPassed(1) + return true, currentRequests + 1 +} + +// RecordHalfOpenResult 记录半开状态请求结果,并检查是否达到成功阈值 +func (m *HalfOpenManager) RecordHalfOpenResult(metrics HalfOpenMetrics, isSuccess bool, successThreshold float64) bool { + if successThreshold < 0 || successThreshold > 1 { + successThreshold = 0.5 + } + + m.mu.Lock() + defer m.mu.Unlock() + + metrics.AddHalfOpenRequests(-1) + if isSuccess { + metrics.AddHalfOpenPassed(1) + } else { + metrics.AddHalfOpenFailed(1) + } + + return m.checkHalfOpenSuccessThreshold(metrics, successThreshold) +} + +// checkHalfOpenSuccessThreshold 检查半开状态的成功率是否达到阈值 +func (m *HalfOpenManager) checkHalfOpenSuccessThreshold(metrics HalfOpenMetrics, successThreshold float64) bool { + totalRequests := metrics.GetHalfOpenPassed().Load() + metrics.GetHalfOpenFailed().Load() + passedRequests := metrics.GetHalfOpenPassed().Load() + + if totalRequests == 0 { + return false + } + + successRate := float64(passedRequests) / float64(totalRequests) + return successRate >= successThreshold +} + +// ResetHalfOpenStats 重置半开状态统计 +func (m *HalfOpenManager) ResetHalfOpenStats(metrics HalfOpenMetrics) { + m.mu.Lock() + defer m.mu.Unlock() + + metrics.GetHalfOpenRequests().Store(0) + metrics.GetHalfOpenPassed().Store(0) + metrics.GetHalfOpenFailed().Store(0) +} + // CircuitBreakerInfo 熔断器信息 type CircuitBreakerInfo struct { ResourceName string @@ -693,16 +813,13 @@ func CircuitBreakerMiddleware(r *ghttp.Request) { return } - // 半开状态处理 - 使用原子操作确保线程安全 + // 半开状态处理 - 使用HalfOpenManager确保线程安全 currentState := cbInfo.getState() if currentState == StateHalfOpen { - // 使用原子操作安全地递增半开请求计数 - halfOpenRequests := cbInfo.Metrics.HalfOpenRequests.Add(1) + manager := GetHalfOpenManager() + acquired, _ := manager.TryAcquireHalfOpenSlot(cbInfo.Metrics, config.HalfOpenMaxRequests) - // 如果超过最大半开请求数量,回滚并触发熔断 - if halfOpenRequests > int64(config.HalfOpenMaxRequests) { - // 原子递减回滚 - cbInfo.Metrics.HalfOpenRequests.Add(-1) + if !acquired { cbInfo.Metrics.BlockRequests.Add(1) // 尝试转换为打开状态,如果成功则记录日志 @@ -785,31 +902,20 @@ func CircuitBreakerMiddleware(r *ghttp.Request) { // 重新获取当前状态 currentState := cbInfo.getState() if currentState == StateHalfOpen { - // 原子递增成功计数 - halfOpenPassed := cbInfo.Metrics.HalfOpenPassed.Add(1) - totalRequests := cbInfo.Metrics.HalfOpenRequests.Load() + manager := GetHalfOpenManager() + // 使用HalfOpenManager记录结果并检查是否达到阈值 + if manager.RecordHalfOpenResult(cbInfo.Metrics, true, config.HalfOpenSuccessThreshold) { + // 达到成功阈值,关闭熔断器 + oldState := cbInfo.setStateWithMetrics(StateClosed, true) + if oldState == StateHalfOpen { + // 重置半开统计 + manager.ResetHalfOpenStats(cbInfo.Metrics) - // 计算成功率,确保分母不为零 - if totalRequests > 0 { - successRate := float64(halfOpenPassed) / float64(totalRequests) + g.Log().Infof(ctx, "半开状态成功,恢复关闭: %s", resourceName) - // 检查是否达到成功率阈值,如果达到则关闭熔断器 - if successRate >= config.HalfOpenSuccessThreshold { - // 原子设置状态为关闭,确保只有一个goroutine能成功转换 - oldState := cbInfo.setStateWithMetrics(StateClosed, true) - if oldState == StateHalfOpen { - // 重置半开统计 - cbInfo.Metrics.HalfOpenPassed.Store(0) - cbInfo.Metrics.HalfOpenRequests.Store(0) - cbInfo.Metrics.HalfOpenFailed.Store(0) - - g.Log().Infof(ctx, "半开状态成功,恢复关闭: %s, successRate=%.2f, total=%d, passed=%d", - resourceName, successRate, totalRequests, halfOpenPassed) - - // 同步分布式状态 - if config.DistributedTTL > 0 { - syncCircuitBreakerStateToDistributed(ctx, resourceName, "closed", config.DistributedTTL) - } + // 同步分布式状态 + if config.DistributedTTL > 0 { + syncCircuitBreakerStateToDistributed(ctx, resourceName, "closed", config.DistributedTTL) } } } @@ -1197,8 +1303,21 @@ func resetSingleService(r *ghttp.Request, serviceName string) error { if ok && config.DistributedTTL > 0 { redisClient := g.Redis() if redisClient != nil { - if _, err := redisClient.Del(r.GetCtx(), "circuit_breaker:"+resourceName+":state"); err != nil { - g.Log().Warningf(r.GetCtx(), "清除分布式熔断状态失败: %s, error: %v", resourceName, err) + // 使用common/redis中的Lock方法确保分布式一致性 + lockKey := "circuit_breaker:" + resourceName + ":lock" + success, err := redis.Lock(r.GetCtx(), lockKey, 10, func(ctx context.Context) error { + _, err := redisClient.Del(ctx, "circuit_breaker:"+resourceName+":state") + if err != nil { + g.Log().Warningf(ctx, "清除分布式熔断状态失败: %s, error: %v", resourceName, err) + } + return nil + }) + + if err != nil { + g.Log().Errorf(r.GetCtx(), "获取分布式锁失败: %s, error: %v", lockKey, err) + } + if !success { + g.Log().Debugf(r.GetCtx(), "未获取到分布式锁,跳过状态清除: %s", lockKey) } } }