fix(task): 修复任务状态更新和超时处理问题

This commit is contained in:
2026-06-11 11:27:14 +08:00
parent 196d2069ac
commit afd60caf56
4 changed files with 131 additions and 99 deletions

View File

@@ -21,8 +21,10 @@ import (
"model-gateway/service/gateway"
"model-gateway/service/queue"
"gitea.redpowerfuture.com/red-future/common/beans"
"github.com/gogf/gf/v2/encoding/gjson"
"github.com/gogf/gf/v2/frame/g"
"github.com/gogf/gf/v2/os/gtime"
"github.com/gogf/gf/v2/util/gconv"
)
@@ -92,7 +94,10 @@ func (w *asyncWorker) handleOne(ctx context.Context, task *entity.AsynchTask, mo
if err == nil && tmpPath != "" {
task.TmpFile = tmpPath
task.Phase = 1
_ = dao.Task.UpdateTmpAfterModelGlobal(ctx, task.Id, tmpPath)
_, err = dao.Task.Update(ctx, task)
if err != nil {
g.Log().Errorf(ctx, "[执行任务][失败] 更新数据库失败 taskId=%s err=%v", task.TaskID, err)
}
}
// 4) 解析校验 + 响应映射(可重试,失败重新调模型)
@@ -116,7 +121,14 @@ func (w *asyncWorker) handleOne(ctx context.Context, task *entity.AsynchTask, mo
g.Log().Errorf(ctx, "[执行任务][失败] OSS上传失败 taskId=%s attempt=%d/%d err=%v",
task.TaskID, attempt, maxRetry, err)
if attempt == maxRetry {
_ = dao.Task.UpdateFailedKeepTmpGlobal(ctx, task.Id, err.Error())
task.State = 3
task.ErrorMsg = err.Error()
task.FinishedAt = gtime.Now()
task.Phase = 1
_, err = dao.Task.Update(ctx, task)
if err != nil {
g.Log().Errorf(ctx, "[执行任务][失败] 更新数据库失败 taskId=%s err=%v", task.TaskID, err)
}
w.failTask(ctx, task, startTime, fmt.Sprintf("OSS上传重试耗尽: %v", err))
return
}
@@ -130,7 +142,7 @@ func (w *asyncWorker) handleOne(ctx context.Context, task *entity.AsynchTask, mo
task.TextResult = body
task.FileSize = int64(oss.FileSize)
if err = dao.Task.UpdateSuccessGlobal(ctx, task); err != nil {
if _, err = dao.Task.Update(ctx, task); err != nil {
g.Log().Errorf(ctx, "[执行任务][失败] 更新数据库失败 taskId=%s err=%v", task.TaskID, err)
return
}
@@ -170,7 +182,10 @@ func (w *asyncWorker) callModelStream(ctx context.Context, task *entity.AsynchTa
if tmpErr == nil && tmpPath != "" {
task.TmpFile = tmpPath
task.Phase = 1
_ = dao.Task.UpdateTmpAfterModelGlobal(ctx, task.Id, tmpPath)
_, err = dao.Task.Update(ctx, task)
if err != nil {
g.Log().Errorf(ctx, "[执行任务][失败] 临时文件保存失败 taskId=%s err=%v", task.TaskID, tmpErr)
}
}
}
@@ -258,7 +273,10 @@ func (w *asyncWorker) callModel(ctx context.Context, task *entity.AsynchTask, mo
if tmpErr == nil && tmpPath != "" {
task.TmpFile = tmpPath
task.Phase = 1
_ = dao.Task.UpdateTmpAfterModelGlobal(ctx, task.Id, tmpPath)
_, err = dao.Task.Update(ctx, task)
if err != nil {
g.Log().Errorf(ctx, "[执行任务][失败] 临时文件保存失败 taskId=%s err=%v", task.TaskID, tmpErr)
}
}
}
@@ -296,10 +314,11 @@ func (w *asyncWorker) parseAndRetry(ctx context.Context, body map[string]any, ta
}
// 2) 先存 token 到数据库,防止后续失败丢失
if tokens, ok := mapped[model.ResponseTokenField]; ok {
task.ExpendTokens = gconv.Int64(tokens)
_ = dao.Task.UpdateColumns(ctx, task.Id, entity.AsynchTask{
ExpendTokens: gconv.Int64(body[model.ResponseTokenField]),
if _, ok := mapped[model.ResponseTokenField]; ok {
task.ExpendTokens = gconv.Int64(mapped[model.ResponseTokenField])
_, err = dao.Task.Update(ctx, &entity.AsynchTask{
SQLBaseDO: beans.SQLBaseDO{Id: task.Id},
ExpendTokens: task.ExpendTokens,
})
}
@@ -483,7 +502,10 @@ func (w *asyncWorker) failTask(ctx context.Context, t *entity.AsynchTask, startT
t.State = 3
t.ErrorMsg = errMsg
t.DurationSeconds = int64(time.Since(startTime).Seconds())
_ = dao.Task.UpdateFailedGlobal(ctx, t)
_, err := dao.Task.Update(ctx, t)
if err != nil {
g.Log().Warningf(ctx, "[执行任务][更新数据库失败] taskId=%s err=%v", t.TaskID, err)
}
queue.ReleaseQueueSlot(ctx, t.ModelName, t.TaskID)
go gateway.TriggerCallback(context.WithoutCancel(ctx), t)
}