Files

195 lines
6.4 KiB
Go
Raw Permalink Normal View History

2026-04-29 15:54:14 +08:00
package service
import (
"context"
"fmt"
"math"
"model-gateway/consts/public"
"model-gateway/model/entity"
2026-04-29 15:54:14 +08:00
"gitea.com/red-future/common/db/gfdb"
"github.com/gogf/gf/v2/frame/g"
)
// AutoTuneResult is the outcome of one auto-tuning pass for a single model,
// keyed by model_name.
type AutoTuneResult struct {
	// ModelName is the model name (asynch_models.model_name).
	ModelName string `json:"modelName"`
	// Samples is the number of tasks used for the statistics: tasks in the
	// window with state 2/3 and non-null started_at/finished_at.
	Samples int `json:"samples"`
	// P90Exec is the P90 execution time in seconds, measured as
	// finished_at - started_at.
	P90Exec float64 `json:"p90ExecSeconds"`

	// CapMaxConcurrency is the configured upper bound
	// (asynch_models.max_concurrency); auto-tuning never overwrites it.
	CapMaxConcurrency int `json:"capMaxConcurrency"`
	// OldMaxConcurrency is the runtime value before tuning (read from Redis;
	// equals the cap when no runtime value exists).
	OldMaxConcurrency int `json:"oldMaxConcurrency"`
	// NewMaxConcurrency is the runtime value computed in this pass (written
	// to Redis; each change is limited to ±50% and never exceeds the cap).
	NewMaxConcurrency int `json:"newMaxConcurrency"`

	// CapQueueLimit is the configured upper bound (asynch_models.queue_limit);
	// auto-tuning never overwrites it.
	CapQueueLimit int `json:"capQueueLimit"`
	// OldQueueLimit is the runtime value before tuning (read from Redis;
	// equals the cap when no runtime value exists).
	OldQueueLimit int `json:"oldQueueLimit"`
	// NewQueueLimit is the runtime value computed in this pass (written to
	// Redis; each change is limited to ±50% and never exceeds the cap).
	NewQueueLimit int `json:"newQueueLimit"`

	// ExpectedSeconds is the model's expected execution time
	// (asynch_models.expected_seconds); queue_limit is derived from it.
	ExpectedSeconds int `json:"expectedSeconds"`
}
// AutoTune recomputes the runtime max_concurrency and queue_limit of every
// enabled model. It is intended to be triggered periodically by an upstream
// scheduler through an API endpoint.
//
// For each distinct model_name it:
//   - computes the P90 execution time (finished_at - started_at) over tasks in
//     state 2 or 3 that finished within the last windowSeconds seconds;
//   - estimates max_concurrency from throughput (completions / windowSeconds)
//     and that P90 as ceil(rate * p90 / utilization), never exceeding the cap;
//   - ties queue_limit to expected_seconds (allowed queueing time =
//     expected_seconds * 2), never exceeding the cap;
//   - limits each adjustment to ±50% of the current runtime value and stores
//     the new runtime values through setRuntimeInt.
//
// windowSeconds <= 0 falls back to 3600 (one hour). Models with no usable
// samples in the window are reported with unchanged values and nothing is
// written for them. The order of the returned slice is unspecified.
func AutoTune(ctx context.Context, windowSeconds int) ([]AutoTuneResult, error) {
	if windowSeconds <= 0 {
		windowSeconds = 3600
	}

	// 1) Load enabled model configs (the caps). Rows sharing a model_name
	// (e.g. duplicated per tenant) are merged, keeping the largest values.
	var modelRows []*entity.AsynchModel
	if err := gfdb.DB(ctx).Model(ctx, public.TableNameModel).
		Where("deleted_at IS NULL").
		Where(entity.AsynchModelCol.Enabled, 1).
		Scan(&modelRows); err != nil {
		return nil, fmt.Errorf("auto_tune: load models: %w", err)
	}
	modelMap := make(map[string]*entity.AsynchModel, len(modelRows))
	for _, m := range modelRows {
		if m == nil || m.ModelName == "" {
			continue
		}
		cur, ok := modelMap[m.ModelName]
		if !ok {
			modelMap[m.ModelName] = m
			continue
		}
		if m.MaxConcurrency > cur.MaxConcurrency {
			cur.MaxConcurrency = m.MaxConcurrency
		}
		if m.QueueLimit > cur.QueueLimit {
			cur.QueueLimit = m.QueueLimit
		}
		if m.ExpectedSeconds > cur.ExpectedSeconds {
			cur.ExpectedSeconds = m.ExpectedSeconds
		}
	}
	if len(modelMap) == 0 {
		return []AutoTuneResult{}, nil
	}

	// 2) Per model_name: number of finished tasks and P90 execution time
	// (seconds) within the window.
	type statRow struct {
		ModelName string
		Cnt       int
		P90Exec   float64
	}
	query := fmt.Sprintf(`
SELECT model_name,
COUNT(1) AS cnt,
COALESCE(percentile_cont(0.9) WITHIN GROUP (ORDER BY EXTRACT(EPOCH FROM (finished_at - started_at))), 0) AS p90_exec
FROM %s
WHERE deleted_at IS NULL
AND state IN (2,3)
AND started_at IS NOT NULL
AND finished_at IS NOT NULL
AND finished_at >= (NOW() - (? || ' seconds')::interval)
GROUP BY model_name`, public.TableNameTask)
	r, err := gfdb.DB(ctx).GetAll(ctx, query, windowSeconds)
	if err != nil {
		return nil, fmt.Errorf("auto_tune: query task stats: %w", err)
	}
	var stats []statRow
	if err := r.Structs(&stats); err != nil {
		// Best-effort: without stats every model is reported unchanged, but
		// the decode failure must not go unnoticed.
		g.Log().Warningf(ctx, "[auto_tune] decode task stats failed: %v", err)
	}
	statMap := make(map[string]statRow, len(stats))
	for _, s := range stats {
		statMap[s.ModelName] = s
	}

	// 3) Compute new runtime values.
	const (
		utilization     = 0.8 // target utilization of the concurrency slots
		maxChangeRatio  = 0.5 // at most ±50% change per run
		queueFactor     = 2.0 // W_target = expected_seconds * queueFactor
		defaultExpected = 60  // expected_seconds fallback when unset
	)
	windowLen := float64(windowSeconds)
	out := make([]AutoTuneResult, 0, len(modelMap))
	for modelName, m := range modelMap {
		s := statMap[modelName]
		capMax := m.MaxConcurrency
		capQueue := m.QueueLimit
		oldMax := GetRuntimeMaxConcurrency(ctx, modelName, capMax)
		oldQueue := GetRuntimeQueueLimit(ctx, modelName, capQueue)

		// Start from "unchanged"; fields are overwritten as values get tuned.
		res := AutoTuneResult{
			ModelName:         modelName,
			Samples:           s.Cnt,
			P90Exec:           s.P90Exec,
			CapMaxConcurrency: capMax,
			OldMaxConcurrency: oldMax,
			NewMaxConcurrency: oldMax,
			CapQueueLimit:     capQueue,
			OldQueueLimit:     oldQueue,
			NewQueueLimit:     oldQueue,
			ExpectedSeconds:   m.ExpectedSeconds,
		}
		// No samples: leave the runtime values untouched.
		if s.Cnt <= 0 || s.P90Exec <= 0 {
			out = append(out, res)
			continue
		}

		// Throughput over the actual statistics window (tasks per second).
		arrivalRate := float64(s.Cnt) / windowLen

		// max_concurrency: desired = ceil(rate * p90 / utilization), limited
		// to ±maxChangeRatio of the current runtime value and to the cap.
		desiredMax := int(math.Ceil(arrivalRate * s.P90Exec / utilization))
		if desiredMax < 1 {
			desiredMax = 1
		}
		minMax := int(math.Floor(float64(oldMax) * (1 - maxChangeRatio)))
		maxMax := int(math.Ceil(float64(oldMax) * (1 + maxChangeRatio)))
		if minMax < 1 {
			minMax = 1
		}
		newMax := clampInt(desiredMax, minMax, maxMax)
		if capMax > 0 {
			newMax = clampInt(newMax, 1, capMax)
		}
		setRuntimeInt(ctx, runtimeMaxConcurrencyKey(modelName), newMax)
		res.NewMaxConcurrency = newMax

		// queue_limit: enough room for rate * W_target queued tasks plus the
		// running ones. Only tuned when a cap is configured.
		if capQueue > 0 {
			exp := m.ExpectedSeconds
			if exp <= 0 {
				exp = defaultExpected
			}
			wTarget := float64(exp) * queueFactor
			desiredQueue := int(math.Ceil(arrivalRate*wTarget)) + newMax

			minQ := int(math.Floor(float64(oldQueue) * (1 - maxChangeRatio)))
			maxQ := int(math.Ceil(float64(oldQueue) * (1 + maxChangeRatio)))
			if minQ < newMax {
				minQ = newMax
			}
			if maxQ < minQ {
				maxQ = minQ
			}
			newQueue := clampInt(desiredQueue, minQ, maxQ)

			// The cap always wins: when newMax exceeds capQueue the lower bound
			// is lowered to the cap so the result never goes above it.
			floorQ := newMax
			if floorQ > capQueue {
				floorQ = capQueue
			}
			newQueue = clampInt(newQueue, floorQ, capQueue)
			setRuntimeInt(ctx, runtimeQueueLimitKey(modelName), newQueue)
			res.NewQueueLimit = newQueue
		}

		out = append(out, res)
	}
	g.Log().Infof(ctx, "[auto_tune] done models=%d windowSeconds=%d", len(out), windowSeconds)
	return out, nil
}