Files
model-gateway/service/queue_gate.go

108 lines
3.2 KiB
Go
Raw Normal View History

2026-04-29 15:54:14 +08:00
package service
import (
"context"
"fmt"
"math"
"time"
"github.com/gogf/gf/v2/frame/g"
"github.com/gogf/gf/v2/util/gconv"
)
// ===== 严格 queue_limitRedis 原子闸门 =====
//
// 背景:原来的 queue_limit 通过“Count + Insert”做近似控制分布式并发创建时会短暂超限。
// 目标:以 Redis Lua 脚本实现原子校验 + 入队占位,做到严格不超限。
//
// 计数口径与原逻辑保持一致:只统计 state=0/1排队中/执行中)。
// - CreateTask 成功入库后占用 1 个 slot
// - 任务成功/失败state->2/3释放 slot
// - 失败任务重试state 3->0需要再次占用 slot若占位失败则暂不重试留在 state=3下次 cleaner 再尝试)
//
// 说明:为避免极端情况下“占位泄漏”导致永久占满,采用 ZSET + 过期时间的方式自动回收。
// 只要任务实际生命周期远小于 gateTTLSeconds就可保持严格。
const (
queueGateKeyPrefix = "asynch:qgate:" // asynch:qgate:{modelName}
)
// Lua清理过期 slot然后按 limit 做原子判定并占位
var queueGateAcquireLua = `
local key = KEYS[1]
local now = tonumber(ARGV[1])
local limit = tonumber(ARGV[2])
local expireAt = tonumber(ARGV[3])
local member = ARGV[4]
local keyTTL = tonumber(ARGV[5])
-- 先清理过期的占位
redis.call("ZREMRANGEBYSCORE", key, "-inf", now)
local current = tonumber(redis.call("ZCARD", key) or "0")
if current >= limit then
return 0
end
redis.call("ZADD", key, expireAt, member)
redis.call("EXPIRE", key, keyTTL)
return 1
`
// Lua释放 slot幂等
var queueGateReleaseLua = `
local key = KEYS[1]
local member = ARGV[1]
redis.call("ZREM", key, member)
return 1
`
func queueGateKey(modelName string) string {
return fmt.Sprintf("%s%s", queueGateKeyPrefix, modelName)
}
// calcGateTTLSeconds 计算闸门占位的“自动回收 TTL”
// 取 expectedSeconds 的倍数并做上下限,避免任务异常导致永久占位。
func calcGateTTLSeconds(expectedSeconds int) int {
// 默认至少 1 小时;最多 24 小时
minTTL := 3600
maxTTL := 24 * 3600
if expectedSeconds <= 0 {
return minTTL
}
ttl := int(math.Ceil(float64(expectedSeconds) * 10)) // 预计耗时 * 10 做兜底
if ttl < minTTL {
ttl = minTTL
}
if ttl > maxTTL {
ttl = maxTTL
}
return ttl
}
// AcquireQueueSlot 严格入队:原子占位(成功返回 true
func AcquireQueueSlot(ctx context.Context, modelName, taskId string, limit int, expectedSeconds int) (bool, error) {
if limit <= 0 {
return true, nil
}
key := queueGateKey(modelName)
now := time.Now().Unix()
ttl := calcGateTTLSeconds(expectedSeconds)
expireAt := now + int64(ttl)
// keyTTL 要略大于 member TTL避免 key 先过期导致计数丢失
keyTTL := ttl + 60
r, err := g.Redis().Do(ctx, "EVAL", queueGateAcquireLua, 1, key, now, limit, expireAt, taskId, keyTTL)
if err != nil {
return false, fmt.Errorf("queue gate acquire failed: %w", err)
}
return gconv.Int(r) == 1, nil
}
// ReleaseQueueSlot 释放占位(幂等)
func ReleaseQueueSlot(ctx context.Context, modelName, taskId string) {
if taskId == "" || modelName == "" {
return
}
key := queueGateKey(modelName)
_, _ = g.Redis().Do(ctx, "EVAL", queueGateReleaseLua, 1, key, taskId)
}