272 lines
7.6 KiB
Go
272 lines
7.6 KiB
Go
package service
|
||
|
||
import (
|
||
"context"
|
||
"errors"
|
||
"fmt"
|
||
"model-gateway/common/util"
|
||
"model-gateway/model/dto"
|
||
"model-gateway/service/gateway"
|
||
"os"
|
||
"path/filepath"
|
||
"strings"
|
||
"time"
|
||
"unicode/utf8"
|
||
|
||
"model-gateway/dao"
|
||
"model-gateway/model/entity"
|
||
|
||
"github.com/gogf/gf/v2/frame/g"
|
||
"github.com/gogf/gf/v2/os/grpool"
|
||
"github.com/tidwall/gjson"
|
||
)
|
||
|
||
// asyncWorker carries no state of its own; it only groups the task-processing
// methods of the asynchronous worker.
type asyncWorker struct{}

// AsyncWorker is the shared package-level worker instance.
var AsyncWorker = new(asyncWorker)
|
||
|
||
// RunOnce 由上层定时任务触发:一次性抢占并处理一批任务
|
||
// - batchSize: 本次抢占数量
|
||
// - goroutines: 本次并发数(协程池大小)
|
||
func (w *asyncWorker) RunOnce(ctx context.Context, req *dto.RunWorkReq) (res *dto.RunWorkRes, err error) {
|
||
if req.BatchSize <= 0 {
|
||
req.BatchSize = 10
|
||
}
|
||
if req.Goroutines <= 0 {
|
||
req.Goroutines = 1
|
||
}
|
||
tasks, err := dao.Task.ClaimPendingGlobal(ctx, req.BatchSize)
|
||
if err != nil {
|
||
return nil, err
|
||
}
|
||
if len(tasks) == 0 {
|
||
return nil, errors.New("no task to run")
|
||
}
|
||
pool := grpool.New(req.Goroutines)
|
||
defer pool.Close()
|
||
claimed := len(tasks)
|
||
done := make(chan struct{}, claimed)
|
||
for _, t := range tasks {
|
||
task := t
|
||
_ = pool.AddWithRecover(ctx, func(ctx context.Context) {
|
||
w.handleOne(ctx, task, 0)
|
||
done <- struct{}{}
|
||
}, func(ctx context.Context, e error) {
|
||
if e != nil {
|
||
_ = dao.Task.UpdateFailedGlobal(ctx, task.Id, fmt.Sprintf("worker panic: %v", e))
|
||
ReleaseQueueSlot(ctx, task.ModelName, task.TaskID)
|
||
}
|
||
done <- struct{}{}
|
||
})
|
||
}
|
||
for i := 0; i < claimed; i++ {
|
||
<-done
|
||
}
|
||
return &dto.RunWorkRes{
|
||
Claimed: claimed,
|
||
}, nil
|
||
}
|
||
|
||
// RunByTaskID 创建任务后立即异步尝试执行当前任务:
|
||
// - 只定向抢占当前 taskId 对应的 pending 任务
|
||
// - 若任务已被其它 worker 抢走/已不在 pending,则直接返回
|
||
func (w *asyncWorker) RunByTaskID(ctx context.Context, taskID string, epicycleId int64) error {
|
||
task, err := dao.Task.ClaimPendingByTaskIDGlobal(ctx, taskID)
|
||
if err != nil {
|
||
return err
|
||
}
|
||
if task == nil {
|
||
return nil
|
||
}
|
||
w.handleOne(ctx, task, epicycleId)
|
||
return nil
|
||
}
|
||
|
||
func (w *asyncWorker) handleOne(ctx context.Context, t *entity.AsynchTask, epicycleId int64) {
|
||
// 从任务入库的 request_payload 里恢复 payload + headers
|
||
payload, headers := util.ParseStoredPayload(t.RequestPayload)
|
||
if len(headers) > 0 {
|
||
ctx = util.SetTaskHeadersToCtx(ctx, headers)
|
||
}
|
||
|
||
// 1) 拉取模型配置
|
||
m, err := dao.Model.GetByModelNameForTenant(ctx, t.TenantId, t.ModelName)
|
||
if err != nil {
|
||
_ = dao.Task.UpdateFailedGlobal(ctx, t.Id, err.Error())
|
||
ReleaseQueueSlot(ctx, t.ModelName, t.TaskID)
|
||
// ============ 失败回调 ============
|
||
t.State = 3
|
||
t.ErrorMsg = err.Error()
|
||
go gateway.TriggerCallback(context.WithoutCancel(ctx), t)
|
||
// ================================
|
||
return
|
||
}
|
||
if m == nil || (m.Enabled != nil && *m.Enabled != 1) {
|
||
errMsg := "模型不存在或未启用"
|
||
_ = dao.Task.UpdateFailedGlobal(ctx, t.Id, errMsg)
|
||
ReleaseQueueSlot(ctx, t.ModelName, t.TaskID)
|
||
// ============ 失败回调 ============
|
||
t.State = 3
|
||
t.ErrorMsg = errMsg
|
||
go gateway.TriggerCallback(context.WithoutCancel(ctx), t)
|
||
// ================================
|
||
return
|
||
}
|
||
|
||
// 2) 分布式并发限制
|
||
semKey := fmt.Sprintf("asynch:sem:%s", t.ModelName)
|
||
leaseSeconds := int64(3600)
|
||
maxC := GetRuntimeMaxConcurrency(ctx, t.ModelName, m.MaxConcurrency)
|
||
acquired, err := acquireSemaphore(ctx, semKey, maxC, leaseSeconds)
|
||
if err != nil {
|
||
_ = dao.Task.UpdateFailedGlobal(ctx, t.Id, err.Error())
|
||
ReleaseQueueSlot(ctx, t.ModelName, t.TaskID)
|
||
// ============ 失败回调 ============
|
||
t.State = 3
|
||
t.ErrorMsg = err.Error()
|
||
go gateway.TriggerCallback(context.WithoutCancel(ctx), t)
|
||
// ================================
|
||
return
|
||
}
|
||
if !acquired {
|
||
// 并发满了:放回排队,不回调(不是失败)
|
||
_ = w.rollbackToPending(ctx, t.Id)
|
||
return
|
||
}
|
||
defer func() {
|
||
_ = releaseSemaphore(ctx, semKey)
|
||
}()
|
||
|
||
// 3) 调用模型服务
|
||
if payload == nil {
|
||
payload = map[string]any{
|
||
"taskId": t.TaskID,
|
||
"inputRef": t.InputRef,
|
||
}
|
||
}
|
||
var (
|
||
data []byte
|
||
contentType string
|
||
ext string
|
||
textResult string
|
||
)
|
||
|
||
// phase=1 表示模型已成功但 OSS 上传失败:优先从临时文件加载
|
||
if t.Phase == 1 && strings.TrimSpace(t.TmpFile) != "" {
|
||
data, err = os.ReadFile(t.TmpFile)
|
||
if err == nil && len(data) > 0 {
|
||
contentType, ext = util.DetectFileType(data)
|
||
} else {
|
||
data = nil
|
||
}
|
||
}
|
||
if data == nil {
|
||
// 统计
|
||
_ = dao.Stat.IncRequestCount(ctx, time.Now(), int64(t.TenantId), t.Creator, t.ModelName)
|
||
// 核心调用
|
||
data, err = InvokeModel(ctx, m, payload, t.ModelKey)
|
||
if err != nil {
|
||
_ = dao.Task.UpdateFailedGlobal(ctx, t.Id, err.Error())
|
||
ReleaseQueueSlot(ctx, t.ModelName, t.TaskID)
|
||
// ============ 失败回调 ============
|
||
t.State = 3
|
||
t.ErrorMsg = err.Error()
|
||
go gateway.TriggerCallback(context.WithoutCancel(ctx), t)
|
||
// ================================
|
||
return
|
||
}
|
||
contentType, ext = util.DetectFileType(data)
|
||
if utf8.Valid(data) && (strings.HasPrefix(contentType, "text/") || contentType == "application/json") {
|
||
textResult = string(data)
|
||
}
|
||
tmpPath, err := saveTmpResult(t.TaskID, data, ext)
|
||
if err == nil && tmpPath != "" {
|
||
t.TmpFile = tmpPath
|
||
t.Phase = 1
|
||
_ = dao.Task.UpdateTmpAfterModelGlobal(ctx, t.Id, tmpPath)
|
||
}
|
||
}
|
||
|
||
// 4) 存储 OSS
|
||
ossURL, err := gateway.UploadByTask(ctx, t, data, ext, contentType)
|
||
if err != nil {
|
||
// OSS 阶段失败:保留临时文件,下一轮仅重试 OSS
|
||
_ = dao.Task.UpdateFailedKeepTmpGlobal(ctx, t.Id, err.Error())
|
||
ReleaseQueueSlot(ctx, t.ModelName, t.TaskID)
|
||
// ============ OSS失败不回调(还会重试) ============
|
||
// 注意:OSS失败保留临时文件,下次重试,所以这里不触发最终回调
|
||
// 如果已经重试多次还没成功,需要在任务超时或超过最大重试次数时才回调失败
|
||
return
|
||
}
|
||
|
||
// 5) 更新任务状态成功
|
||
fileType := strings.TrimPrefix(ext, ".")
|
||
if fileType == "" {
|
||
fileType = contentType
|
||
}
|
||
if err = dao.Task.UpdateSuccessGlobal(
|
||
ctx,
|
||
t.Id,
|
||
ossURL,
|
||
fileType,
|
||
textResult,
|
||
int64(len(data)),
|
||
nil,
|
||
GetExpendTokens(m.ResponseTokenField, textResult),
|
||
); err != nil {
|
||
g.Log().Errorf(ctx, "[worker] update success failed: %v", err)
|
||
return
|
||
}
|
||
|
||
// 成功/失败均不再占用 queue_limit
|
||
ReleaseQueueSlot(ctx, t.ModelName, t.TaskID)
|
||
|
||
// 6) 成功回调
|
||
t.State = 2
|
||
t.OssFile = ossURL
|
||
t.FileType = fileType
|
||
t.TextResult = textResult
|
||
g.Log().Infof(ctx, "[CALLBACK][DISPATCH] taskId=%s bizName=%s callbackUrl=%s", t.TaskID, t.BizName, t.CallbackURL)
|
||
go gateway.TriggerCallback(context.WithoutCancel(ctx), t)
|
||
// ============ 如果有 epicycleId,也触发业务回调 ============
|
||
if epicycleId != 0 {
|
||
go gateway.TriggerPromptsCallback(context.WithoutCancel(ctx), t, epicycleId)
|
||
}
|
||
|
||
// 成功后清理临时文件
|
||
_ = os.Remove(t.TmpFile)
|
||
}
|
||
|
||
// saveTmpResult writes the model output to a temporary file so that a failed
// OSS upload can later be retried without invoking the model again.
//
// The file is written to <os.TempDir()>/model-asynch/<taskID><ext>. An empty
// ext becomes ".bin" and a leading dot is added when ext lacks one.
//
// taskID must be a plain file name: an empty value, "." / "..", or a value
// containing a path separator is rejected with an error, so the file can never
// be placed outside the model-asynch directory.
//
// It returns the path of the written file, or an error when the directory
// cannot be created or the file cannot be written.
func saveTmpResult(taskID string, data []byte, ext string) (string, error) {
	if taskID == "" || taskID == "." || taskID == ".." || strings.ContainsAny(taskID, `/\`) {
		return "", fmt.Errorf("invalid task id %q for tmp file name", taskID)
	}

	dir := filepath.Join(os.TempDir(), "model-asynch")
	if err := os.MkdirAll(dir, 0o755); err != nil {
		return "", err
	}

	switch {
	case ext == "":
		ext = ".bin"
	case !strings.HasPrefix(ext, "."):
		ext = "." + ext
	}

	path := filepath.Join(dir, taskID+ext)
	if err := os.WriteFile(path, data, 0o644); err != nil {
		// Best effort: do not leave a partially written result behind.
		_ = os.Remove(path)
		return "", err
	}
	return path, nil
}
|
||
|
||
func (w *asyncWorker) rollbackToPending(ctx context.Context, id int64) error {
|
||
return dao.Task.RollbackToPendingGlobal(ctx, id)
|
||
}
|
||
|
||
// GetExpendTokens 根据映射路径从 textResult 中提取消耗 token 值
|
||
func GetExpendTokens(tokenMapping string, textResult string) int {
|
||
value := gjson.Get(textResult, tokenMapping)
|
||
if value.Exists() {
|
||
return int(value.Int())
|
||
}
|
||
return len(textResult)
|
||
}
|