diff --git a/config.yml b/config.yml index ef1c857..8da4bef 100644 --- a/config.yml +++ b/config.yml @@ -28,10 +28,10 @@ database: timeMaintainDisabled: false # (可选)是否完全关闭时间更新特性,为true时CreatedAt/UpdatedAt/DeletedAt都将失效 model_gateway: - type: "pgsql" - host: "116.204.74.41" - port: "15432" + host: "192.168.3.30" + port: "5432" user: "postgres" - pass: "Bjang09@686^*^" + pass: "123456" name: "model-gateway" prefix: "" role: "master" diff --git a/service/task/worker.go b/service/task/worker.go index b18f399..7bad2ec 100644 --- a/service/task/worker.go +++ b/service/task/worker.go @@ -7,7 +7,6 @@ import ( "fmt" "io" "net/http" - "os" "strings" "sync" "time" @@ -55,7 +54,7 @@ func (w *asyncWorker) handleOne(ctx context.Context, task *entity.ModelGatewayTa switch { case model.CallMode != nil && *model.CallMode == public.CallModeStream: - rawBytes, err = w.callModelStream(ctx, task, model, body) + rawBytes, err = InvokeModel(ctx, model, body) if err == nil { result, err = util.ParseStreamResponse(rawBytes, model.StreamConfig) } @@ -136,37 +135,6 @@ func (w *asyncWorker) handleOne(ctx context.Context, task *entity.ModelGatewayTa task.TaskID, task.DurationSeconds, oss.FileFormat) } -// callModelStream 调用模型,返回原始字节(不做响应映射,用于流式输出) -func (w *asyncWorker) callModelStream(ctx context.Context, task *entity.ModelGatewayTask, model *entity.ModelGatewayModel, body map[string]any) ([]byte, error) { - var data []byte - var err error - - if task.Phase == 1 && strings.TrimSpace(task.TmpFile) != "" { - data, err = os.ReadFile(task.TmpFile) - if err != nil || len(data) == 0 { - data = nil - } - } - - if data == nil { - data, err = InvokeModel(ctx, model, body) - if err != nil { - return nil, err - } - tmpPath, tmpErr := util.SaveTmpResult(task.TaskID, data, "") - if tmpErr == nil && tmpPath != "" { - task.TmpFile = tmpPath - task.Phase = 1 - _, err = dao.ModelGatewayTask.Update(ctx, task) - if err != nil { - g.Log().Errorf(ctx, "[执行任务][失败] 临时文件保存失败 taskId=%s err=%v", task.TaskID, tmpErr) - } - } - } - - return data, nil -} - // asyncResult 异步任务结果 type asyncResult struct { result map[string]any @@ -219,54 +187,23 @@ func NotifyAsyncResult(taskID string, result map[string]any, err error) { } } -// callModel 调用模型 + 检测文件类型 + 保存临时文件 -// 返回: 解析后的响应体, error +// callModel 调用模型 + 提取文本结果 func (w *asyncWorker) callModel(ctx context.Context, task *entity.ModelGatewayTask, model *entity.ModelGatewayModel, body map[string]any) (map[string]any, error) { - var data []byte - var err error - - // 1) 如果已有临时文件且 phase=1,直接读取 - if task.Phase == 1 && strings.TrimSpace(task.TmpFile) != "" { - data, err = os.ReadFile(task.TmpFile) - if err != nil || len(data) == 0 { - g.Log().Warningf(ctx, "[callModel] 读取临时文件失败,重新调用模型 taskId=%s err=%v", task.TaskID, err) - data = nil - } + data, err := InvokeModel(ctx, model, body) + if err != nil { + return nil, err } - // 2) 没有可用数据,调用模型 - if data == nil { - data, err = InvokeModel(ctx, model, body) - if err != nil { - return nil, err - } - - // 3) 检测文件类型,保存临时文件 - _, ext := util.DetectFileType(data) - tmpPath, tmpErr := util.SaveTmpResult(task.TaskID, data, ext) - if tmpErr == nil && tmpPath != "" { - task.TmpFile = tmpPath - task.Phase = 1 - _, err = dao.ModelGatewayTask.Update(ctx, task) - if err != nil { - g.Log().Errorf(ctx, "[执行任务][失败] 临时文件保存失败 taskId=%s err=%v", task.TaskID, tmpErr) - } - } - } - - // 4) 检测文件类型,提取文本结果 contentType, _ := util.DetectFileType(data) var textResult string if utf8.Valid(data) && (strings.HasPrefix(contentType, "text/") || contentType == "application/json") { textResult = string(data) } - // 5) 非文本内容,返回错误 if textResult == "" { return nil, fmt.Errorf("模型返回非文本内容,contentType=%s", contentType) } - // 6) 解析并返回 return gjson.New(textResult).Map(), nil }