From ddc4e0be6395ff42dbc98d22c4ff195ebd519c9e Mon Sep 17 00:00:00 2001 From: qhd <1766646056@qq.com> Date: Thu, 11 Jun 2026 16:49:11 +0800 Subject: [PATCH] =?UTF-8?q?fix:=20=E7=A7=BB=E9=99=A4=E5=B7=B2=E6=B6=88?= =?UTF-8?q?=E8=B4=B9=E7=9A=84=E6=96=87=E4=BB=B6URL=E5=B9=B6=E4=BC=98?= =?UTF-8?q?=E5=8C=96=E9=80=92=E5=BD=92=E6=94=B6=E9=9B=86=E9=80=BB=E8=BE=91?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- workflow/service/flow/lambda_node_util.go | 44 +++++++++++++++-------- 1 file changed, 30 insertions(+), 14 deletions(-) diff --git a/workflow/service/flow/lambda_node_util.go b/workflow/service/flow/lambda_node_util.go index 9d72e9b..63020a6 100644 --- a/workflow/service/flow/lambda_node_util.go +++ b/workflow/service/flow/lambda_node_util.go @@ -105,19 +105,27 @@ func GetComposeResult(ctx context.Context, buildType int, modelName, promptConte } var callbackUrl = utils.GetCallbackURL(ctx, "/flow/execution/composeCallBack") var consult = make([]flowDto.Consult, 0) - var collectFileUrls func(val any) - collectFileUrls = func(val any) { + var collectFileUrls func(val any) (fullyConsumed bool) + collectFileUrls = func(val any) (fullyConsumed bool) { switch { case g.NewVar(val).IsSlice(): slice := gconv.SliceAny(val) + allConsumed := false for _, item := range slice { - collectFileUrls(item) + if collectFileUrls(item) { + allConsumed = true + } } + return allConsumed case g.NewVar(val).IsMap(): m := gconv.Map(val) + allConsumed := false for _, item := range m { - collectFileUrls(item) + if collectFileUrls(item) { + allConsumed = true + } } + return allConsumed default: s := gconv.String(val) if s != "" { @@ -127,13 +135,21 @@ func GetComposeResult(ctx context.Context, buildType int, modelName, promptConte Type: getFileTypeByPath, Url: s, }) + return true } } + return false } } + var newUserForm []map[string]any for _, m := range userForm { - for _, v := range gconv.Map(m) { - collectFileUrls(v) + for k, v := range m { + if collectFileUrls(v) { + delete(m, k) + } + } + if len(m) > 0 { + newUserForm = append(newUserForm, m) } } for _, v := range fileUrl { @@ -152,7 +168,7 @@ func GetComposeResult(ctx context.Context, buildType int, modelName, promptConte CallbackUrl: callbackUrl, Cause: cause, Form: form, - UserForm: userForm, + UserForm: newUserForm, Consult: consult, SessionId: sessionId, NodeId: nodeId, @@ -236,17 +252,21 @@ func waitGatewayResult(ctx context.Context, taskId string) (map[string]any, erro } task := new(flowDto.ModelCallbackReq) - if err := gconv.Struct(waitRes, task); err != nil { + if err = gconv.Struct(waitRes, task); err != nil { return nil, err } - if task.State == 3 || !g.IsEmpty(task.ErrorMsg) { return nil, fmt.Errorf("模型执行失败:%s", task.ErrorMsg) } if g.IsEmpty(task.Messages) { return nil, fmt.Errorf("模型返回结果为空") } - + // 获取远程文件内容 + //file, err := GetFileBytesFromURL(ctx, task.OssFile) + //if err != nil { + // return nil, err + //} + //task.Messages = gconv.Map(file) return task.Messages, nil } @@ -307,7 +327,6 @@ func GetModelResult(ctx context.Context, sessionId string, nodeInput *flowDto.No return nil, fmt.Errorf("模型返回结果为空") } - // Update taskResultMap for next round (used by VideoModel) if nodeInput.Config.NodeCode == node.NodeTypeVideoModel { ext := GetFileTypeByPath(gconv.String(taskResult[modelInfo.Model.ResponseBody])) if ext == "image" { @@ -334,15 +353,12 @@ func GetModelResult(ctx context.Context, sessionId string, nodeInput *flowDto.No taskIdList[idx] = taskId } - // Step 2: Wait for all tasks in parallel var wg sync.WaitGroup errChan := make(chan error, len(taskIdList)) for idx, taskId := range taskIdList { wg.Add(1) - // Pass idx and taskId as parameters to avoid loop variable capture bug - // This guarantees results are stored in the correct order matching original requests go func(idx int, taskId string) { defer wg.Done()