fix: 移除已消费的文件URL并优化递归收集逻辑

This commit is contained in:
2026-06-11 16:49:11 +08:00
parent 73dd18baf4
commit ddc4e0be63

View File

@@ -105,19 +105,27 @@ func GetComposeResult(ctx context.Context, buildType int, modelName, promptConte
} }
var callbackUrl = utils.GetCallbackURL(ctx, "/flow/execution/composeCallBack") var callbackUrl = utils.GetCallbackURL(ctx, "/flow/execution/composeCallBack")
var consult = make([]flowDto.Consult, 0) var consult = make([]flowDto.Consult, 0)
var collectFileUrls func(val any) var collectFileUrls func(val any) (fullyConsumed bool)
collectFileUrls = func(val any) { collectFileUrls = func(val any) (fullyConsumed bool) {
switch { switch {
case g.NewVar(val).IsSlice(): case g.NewVar(val).IsSlice():
slice := gconv.SliceAny(val) slice := gconv.SliceAny(val)
allConsumed := false
for _, item := range slice { for _, item := range slice {
collectFileUrls(item) if collectFileUrls(item) {
allConsumed = true
}
} }
return allConsumed
case g.NewVar(val).IsMap(): case g.NewVar(val).IsMap():
m := gconv.Map(val) m := gconv.Map(val)
allConsumed := false
for _, item := range m { for _, item := range m {
collectFileUrls(item) if collectFileUrls(item) {
allConsumed = true
}
} }
return allConsumed
default: default:
s := gconv.String(val) s := gconv.String(val)
if s != "" { if s != "" {
@@ -127,13 +135,21 @@ func GetComposeResult(ctx context.Context, buildType int, modelName, promptConte
Type: getFileTypeByPath, Type: getFileTypeByPath,
Url: s, Url: s,
}) })
return true
} }
} }
return false
} }
} }
var newUserForm []map[string]any
for _, m := range userForm { for _, m := range userForm {
for _, v := range gconv.Map(m) { for k, v := range m {
collectFileUrls(v) if collectFileUrls(v) {
delete(m, k)
}
}
if len(m) > 0 {
newUserForm = append(newUserForm, m)
} }
} }
for _, v := range fileUrl { for _, v := range fileUrl {
@@ -152,7 +168,7 @@ func GetComposeResult(ctx context.Context, buildType int, modelName, promptConte
CallbackUrl: callbackUrl, CallbackUrl: callbackUrl,
Cause: cause, Cause: cause,
Form: form, Form: form,
UserForm: userForm, UserForm: newUserForm,
Consult: consult, Consult: consult,
SessionId: sessionId, SessionId: sessionId,
NodeId: nodeId, NodeId: nodeId,
@@ -236,17 +252,21 @@ func waitGatewayResult(ctx context.Context, taskId string) (map[string]any, erro
} }
task := new(flowDto.ModelCallbackReq) task := new(flowDto.ModelCallbackReq)
if err := gconv.Struct(waitRes, task); err != nil { if err = gconv.Struct(waitRes, task); err != nil {
return nil, err return nil, err
} }
if task.State == 3 || !g.IsEmpty(task.ErrorMsg) { if task.State == 3 || !g.IsEmpty(task.ErrorMsg) {
return nil, fmt.Errorf("模型执行失败:%s", task.ErrorMsg) return nil, fmt.Errorf("模型执行失败:%s", task.ErrorMsg)
} }
if g.IsEmpty(task.Messages) { if g.IsEmpty(task.Messages) {
return nil, fmt.Errorf("模型返回结果为空") return nil, fmt.Errorf("模型返回结果为空")
} }
// 获取远程文件内容
//file, err := GetFileBytesFromURL(ctx, task.OssFile)
//if err != nil {
// return nil, err
//}
//task.Messages = gconv.Map(file)
return task.Messages, nil return task.Messages, nil
} }
@@ -307,7 +327,6 @@ func GetModelResult(ctx context.Context, sessionId string, nodeInput *flowDto.No
return nil, fmt.Errorf("模型返回结果为空") return nil, fmt.Errorf("模型返回结果为空")
} }
// Update taskResultMap for next round (used by VideoModel)
if nodeInput.Config.NodeCode == node.NodeTypeVideoModel { if nodeInput.Config.NodeCode == node.NodeTypeVideoModel {
ext := GetFileTypeByPath(gconv.String(taskResult[modelInfo.Model.ResponseBody])) ext := GetFileTypeByPath(gconv.String(taskResult[modelInfo.Model.ResponseBody]))
if ext == "image" { if ext == "image" {
@@ -334,15 +353,12 @@ func GetModelResult(ctx context.Context, sessionId string, nodeInput *flowDto.No
taskIdList[idx] = taskId taskIdList[idx] = taskId
} }
// Step 2: Wait for all tasks in parallel
var wg sync.WaitGroup var wg sync.WaitGroup
errChan := make(chan error, len(taskIdList)) errChan := make(chan error, len(taskIdList))
for idx, taskId := range taskIdList { for idx, taskId := range taskIdList {
wg.Add(1) wg.Add(1)
// Pass idx and taskId as parameters to avoid loop variable capture bug
// This guarantees results are stored in the correct order matching original requests
go func(idx int, taskId string) { go func(idx int, taskId string) {
defer wg.Done() defer wg.Done()