3 Commits

8 changed files with 217 additions and 380 deletions

View File

@@ -15,17 +15,19 @@ const (
NodeNameVideoModel = "生成视频" NodeNameVideoModel = "生成视频"
NodeNameAudioModel = "生成音频" NodeNameAudioModel = "生成音频"
NodeNameBatchModel = "批量处理一起返回" NodeNameBatchModel = "批量处理一起返回"
NodeNameDataConversionModel = "参数转换"
NodeNameSenseOptimizeModel = "语义优化" NodeNameSenseOptimizeModel = "语义优化"
NodeNameStoryOptimizeModel = "分镜优化" NodeNameStoryOptimizeModel = "分镜优化"
NodeNameScriptOptimizeModel = "剧本优化" NodeNameScriptOptimizeModel = "剧本优化"
NodeNameDataConversionModel = "参数转换"
NodeNameModel = "模型" NodeNameModel = "模型"
NodeNameMerge = "结果合并" NodeNameMerge = "结果合并"
NodeNameDataMerge = "结果汇集" NodeNameDataMerge = "结果汇集"
NodeNameJudge = "条件判断" NodeNameJudge = "条件判断"
NodeNameLoop = "循环"
NodeNameForm = "表单" NodeNameForm = "表单"
NodeNameHttp = "HTTP(S)接口" NodeNameHttp = "HTTP(S)接口"
NodeNameCustomNode = "自定义节点" NodeNameCustomNode = "自定义节点"
NodeNameSystemSum = "系统-结果汇总"
) )
// 表单字段 Label // 表单字段 Label
@@ -54,20 +56,22 @@ const (
NodeTypeAudioModel NodeType = "audio_model" NodeTypeAudioModel NodeType = "audio_model"
NodeTypeBatchModel NodeType = "batch_model" NodeTypeBatchModel NodeType = "batch_model"
NodeTypeDataConversionModel NodeType = "data_conversion_model"
NodeTypeSenseOptimizeModel NodeType = "sense_optimize_model" NodeTypeSenseOptimizeModel NodeType = "sense_optimize_model"
NodeTypeStoryOptimizeModel NodeType = "story_optimize_model" NodeTypeStoryOptimizeModel NodeType = "story_optimize_model"
NodeTypeScriptOptimizeModel NodeType = "script_optimize_model" NodeTypeScriptOptimizeModel NodeType = "script_optimize_model"
// 基础 // 基础
NodeTypeDataConversionModel NodeType = "data_conversion_model" NodeTypeModel NodeType = "model"
NodeTypeModel NodeType = "model" NodeTypeMerge NodeType = "merge"
NodeTypeMerge NodeType = "merge" NodeTypeDataMerge NodeType = "data_merge"
NodeTypeDataMerge NodeType = "data_merge" NodeTypeJudge NodeType = "judge"
NodeTypeJudge NodeType = "judge" NodeTypeForm NodeType = "form"
NodeTypeForm NodeType = "form" NodeTypeIntent NodeType = "intent"
NodeTypeIntent NodeType = "intent" NodeTypeHttp NodeType = "http"
NodeTypeHttp NodeType = "http"
// 自定义 // 自定义
NodeTypeCustomNode NodeType = "custom_node" NodeTypeCustomNode NodeType = "custom_node"
// 系统
NodeTypeSystemSum NodeType = "system_sum"
) )
const ( const (

View File

@@ -8,7 +8,7 @@ import (
"fmt" "fmt"
"strings" "strings"
"gitea.com/red-future/common/db/gfdb" "gitea.redpowerfuture.com/red-future/common/db/gfdb"
"github.com/gogf/gf/v2/util/gconv" "github.com/gogf/gf/v2/util/gconv"
) )

View File

@@ -128,11 +128,12 @@ type ComposeCallbackReq struct {
type ModelCallbackReq struct { type ModelCallbackReq struct {
g.Meta `path:"/modelCallback" method:"post" tags:"提示词处理" summary:"model-gateway 回调" dc:"model-gateway 成功后 GET 回调callbackUrl/{bizName}"` g.Meta `path:"/modelCallback" method:"post" tags:"提示词处理" summary:"model-gateway 回调" dc:"model-gateway 成功后 GET 回调callbackUrl/{bizName}"`
TaskId string `p:"task_id" json:"task_id" v:"required#task_id不能为空" dc:"网关任务ID"` TaskId string `p:"task_id" json:"task_id" v:"required#task_id不能为空" dc:"网关任务ID"`
State int `p:"state" json:"state" dc:"网关任务状态"` State int `p:"state" json:"state" dc:"网关任务状态"`
OssFile string `p:"oss_file" json:"oss_file" dc:"结果文件地址"` OssFile string `p:"oss_file" json:"oss_file" dc:"结果文件地址"`
FileType string `p:"file_type" json:"file_type" dc:"结果文件类型"` FileType string `p:"file_type" json:"file_type" dc:"结果文件类型"`
ErrorMsg string `json:"error_msg"` Messages map[string]any `json:"messages"`
ErrorMsg string `json:"error_msg"`
} }
type VideoCallbackReq struct { type VideoCallbackReq struct {

View File

@@ -363,49 +363,12 @@ func (s *flowExecutionService) Execute(ctx context.Context, req *flowDto.Execute
} }
return nil, errors.New("文件格式不支持") return nil, errors.New("文件格式不支持")
} }
// =========================================================================
// ✅【第1步】给所有判断节点自动生成意图识别节点
// =========================================================================
judge2IntentNodeMap := make(map[string]string)
finalNodes := make([]entity.FlowNode, 0, len(req.FlowContent.Nodes)*2)
for _, item := range req.FlowContent.Nodes {
finalNodes = append(finalNodes, item)
// 判断节点自动加 intent 节点
if item.NodeCode == node.NodeTypeJudge {
intentNodeID := fmt.Sprintf("intent_%s", item.Id)
intentNode := entity.FlowNode{
Id: intentNodeID,
NodeCode: node.NodeTypeIntent,
Name: fmt.Sprintf("意图识别-%s", item.Name),
InputSource: item.InputSource, // ✅ 正确赋值
FormConfig: item.FormConfig, // ✅ 用户配置
ModelConfig: item.ModelConfig, // ✅ 系统配置
}
finalNodes = append(finalNodes, intentNode)
judge2IntentNodeMap[item.Id] = intentNodeID
}
}
summaryNodeID := "summary_node"
summaryNode := entity.FlowNode{
Id: summaryNodeID,
NodeCode: node.NodeTypeCustomNode, // 复用自定义节点类型,也可新增专属类型
Name: "结果汇总节点",
InputSource: []entity.FlowNodeInputSource{}, // 后续自动聚合所有节点输出
FormConfig: nil,
ModelConfig: node.ModelItem{},
}
finalNodes = append(finalNodes, summaryNode)
// 替换节点列表
req.FlowContent.Nodes = finalNodes
// ========================================================================= // =========================================================================
// ✅【第2步】构建执行图 // ✅【第2步】构建执行图
// ========================================================================= // =========================================================================
var nodeList []entity.FlowNode
var runGraph compose.Runnable[any, any] var runGraph compose.Runnable[any, any]
runGraph, err = BuildGraphFromFlowContent(execCtx, req.FlowContent, judge2IntentNodeMap, summaryNodeID) nodeList, runGraph, err = BuildGraphFromFlowContent(execCtx, req.FlowContent)
if err != nil { if err != nil {
executionReq := flowDto.UpdateFlowExecutionReq{ executionReq := flowDto.UpdateFlowExecutionReq{
Id: executionId, Id: executionId,
@@ -418,7 +381,6 @@ func (s *flowExecutionService) Execute(ctx context.Context, req *flowDto.Execute
} }
return nil, fmt.Errorf("执行工作流失败: %v", err) return nil, fmt.Errorf("执行工作流失败: %v", err)
} }
// ========================================================================= // =========================================================================
// ✅【第3步】构建 ConfigMap // ✅【第3步】构建 ConfigMap
// ========================================================================= // =========================================================================
@@ -426,15 +388,9 @@ func (s *flowExecutionService) Execute(ctx context.Context, req *flowDto.Execute
for _, cfg := range req.NodeInputParams { for _, cfg := range req.NodeInputParams {
configMap[cfg.Id] = cfg configMap[cfg.Id] = cfg
} }
// 自动给意图节点复制配置 for _, i := range nodeList {
for judgeID, intentID := range judge2IntentNodeMap { configMap[i.Id] = &i
if cfg, ok := configMap[judgeID]; ok {
configMap[intentID] = cfg
}
} }
// 初始化汇总节点配置
configMap[summaryNodeID] = &summaryNode
// ========================================================================= // =========================================================================
// ✅【第4步】构建全局执行入参现在 schemaMap 是有值的!) // ✅【第4步】构建全局执行入参现在 schemaMap 是有值的!)
// ========================================================================= // =========================================================================
@@ -475,7 +431,7 @@ func (s *flowExecutionService) Execute(ctx context.Context, req *flowDto.Execute
} }
// BuildGraphFromFlowContent 根据前端保存的工作流JSON自动构建执行图 // BuildGraphFromFlowContent 根据前端保存的工作流JSON自动构建执行图
func BuildGraphFromFlowContent(ctx context.Context, flowContent *entity.FlowInfo, judge2IntentNodeMap map[string]string, summaryNodeID string) (compose.Runnable[any, any], error) { func BuildGraphFromFlowContent(ctx context.Context, flowContent *entity.FlowInfo) ([]entity.FlowNode, compose.Runnable[any, any], error) {
// 注册自定义合并函数:处理 *flowDto.FlowExecutionInput 类型合并 // 注册自定义合并函数:处理 *flowDto.FlowExecutionInput 类型合并
// 由于 ConfigMap 是 map 引用类型,所有并行分支修改已经写入共享内存 // 由于 ConfigMap 是 map 引用类型,所有并行分支修改已经写入共享内存
// 直接返回第一个实例即可,所有修改都已经可见 // 直接返回第一个实例即可,所有修改都已经可见
@@ -488,9 +444,26 @@ func BuildGraphFromFlowContent(ctx context.Context, flowContent *entity.FlowInfo
}) })
graph := compose.NewGraph[any, any]() graph := compose.NewGraph[any, any]()
nodeMap := make(map[string]entity.FlowNode)
var nodeList []entity.FlowNode
nodeId := uuid.NewString()
originalEndNodes := findEndNodes(flowContent.StartNodeId, flowContent.Edges)
for i := range originalEndNodes {
sprintf := fmt.Sprintf("%v_%d", nodeId, i)
summaryNode := entity.FlowNode{
Id: sprintf,
NodeCode: node.NodeTypeSystemSum,
Name: node.NodeNameSystemSum,
InputSource: []entity.FlowNodeInputSource{}, // 后续自动聚合所有节点输出
FormConfig: nil,
ModelConfig: node.ModelItem{},
}
nodeList = append(nodeList, summaryNode)
flowContent.Nodes = append(flowContent.Nodes, summaryNode)
}
// 注册所有节点 // 注册所有节点
nodeMap := make(map[string]entity.FlowNode)
for _, item := range flowContent.Nodes { for _, item := range flowContent.Nodes {
nodeMap[item.Id] = item nodeMap[item.Id] = item
if item.NodeCode != node.NodeTypeJudge { if item.NodeCode != node.NodeTypeJudge {
@@ -498,6 +471,16 @@ func BuildGraphFromFlowContent(ctx context.Context, flowContent *entity.FlowInfo
} }
} }
// 注册所有边
if flowContent.StartNodeId != "" {
_ = graph.AddEdge(compose.START, flowContent.StartNodeId)
}
for i, endID := range originalEndNodes {
sprintf := fmt.Sprintf("%v_%d", nodeId, i)
_ = graph.AddEdge(endID, sprintf)
_ = graph.AddEdge(sprintf, compose.END)
}
// 构建边关系 // 构建边关系
upstreamMap := make(map[string][]string) upstreamMap := make(map[string][]string)
edgeMap := make(map[string][]entity.FlowEdge) edgeMap := make(map[string][]entity.FlowEdge)
@@ -510,15 +493,8 @@ func BuildGraphFromFlowContent(ctx context.Context, flowContent *entity.FlowInfo
for fromNodeID, edges := range edgeMap { for fromNodeID, edges := range edgeMap {
fromNode := nodeMap[fromNodeID] fromNode := nodeMap[fromNodeID]
// --------------------------
// 判断节点 → 分支处理 // 判断节点 → 分支处理
// --------------------------
if fromNode.NodeCode == node.NodeTypeJudge { if fromNode.NodeCode == node.NodeTypeJudge {
intentNodeID, ok := judge2IntentNodeMap[fromNodeID]
if !ok {
return nil, fmt.Errorf("判断节点[%s]未生成意图节点", fromNodeID)
}
branchMap := make(map[string]bool) branchMap := make(map[string]bool)
for _, e := range edges { for _, e := range edges {
branchMap[e.To] = true branchMap[e.To] = true
@@ -553,11 +529,6 @@ func BuildGraphFromFlowContent(ctx context.Context, flowContent *entity.FlowInfo
m["branch_id_name_map"] = branchIdNameMap // 传递ID-名称映射 m["branch_id_name_map"] = branchIdNameMap // 传递ID-名称映射
currentConfig.Config = m currentConfig.Config = m
// 从意图节点取输出
if intentCfg, ok := execInput.ConfigMap[intentNodeID]; ok {
currentConfig.OutputResult = intentCfg.OutputResult
}
// 关键修改:构造 NodeExecutionInput 传入 JudgeLambda // 关键修改:构造 NodeExecutionInput 传入 JudgeLambda
nodeExecInput := &flowDto.NodeExecutionInput{ nodeExecInput := &flowDto.NodeExecutionInput{
Config: currentConfig, // 当前判断节点配置 Config: currentConfig, // 当前判断节点配置
@@ -566,34 +537,21 @@ func BuildGraphFromFlowContent(ctx context.Context, flowContent *entity.FlowInfo
return JudgeLambda(ctx, nodeExecInput) // 传入 NodeExecutionInput 类型 return JudgeLambda(ctx, nodeExecInput) // 传入 NodeExecutionInput 类型
} }
_ = graph.AddBranch(intentNodeID, compose.NewGraphBranch(judgeLambda, branchMap)) _ = graph.AddBranch(upstreamMap[fromNodeID][0], compose.NewGraphBranch(judgeLambda, branchMap))
continue continue
} }
// --------------------------
// 普通节点连线 // 普通节点连线
// --------------------------
for _, e := range edges { for _, e := range edges {
toNode := nodeMap[e.To] toNode := nodeMap[e.To]
if toNode.NodeCode == node.NodeTypeJudge { if toNode.NodeCode == node.NodeTypeJudge {
_ = graph.AddEdge(e.From, fmt.Sprintf("intent_%s", toNode.Id))
continue continue
} }
_ = graph.AddEdge(e.From, e.To) _ = graph.AddEdge(e.From, e.To)
} }
} }
compile, err := graph.Compile(ctx, compose.WithGraphName("auto_build_workflow"))
// ==================== 第四步:处理开始/结束节点 ==================== return nodeList, compile, err
if flowContent.StartNodeId != "" {
_ = graph.AddEdge(compose.START, flowContent.StartNodeId)
}
originalEndNodes := findEndNodes(flowContent.StartNodeId, flowContent.Edges)
for _, endID := range originalEndNodes {
_ = graph.AddEdge(endID, summaryNodeID)
}
_ = graph.AddEdge(summaryNodeID, compose.END)
return graph.Compile(ctx, compose.WithGraphName("auto_build_workflow"), compose.WithNodeTriggerMode(compose.AllPredecessor))
} }
// -------------------------- 节点自动注册器(核心分发) -------------------------- // -------------------------- 节点自动注册器(核心分发) --------------------------
@@ -664,9 +622,18 @@ func registerNodeToGraph(graph *compose.Graph[any, any], flowNode entity.FlowNod
// 执行节点 // 执行节点
_, err = lambda(ctx, realInput) _, err = lambda(ctx, realInput)
durationMs := time.Since(startTime).Milliseconds() durationMs := time.Since(startTime).Milliseconds()
// 上传OSS每条独立上传
ossResult1, err := Upload(ctx, &dto.UploadFileBytesReq{
FileBytes: gconv.Bytes(gconv.String(realInput)),
FileName: fmt.Sprintf("nodeInput:%v.txt", time.Now().UnixMilli()),
})
if err != nil {
return nil, err
}
updateReq := &nodeDto.UpdateNodeExecutionReq{ updateReq := &nodeDto.UpdateNodeExecutionReq{
Id: nodeExecutionId, Id: nodeExecutionId,
DurationMs: durationMs, OutputParamsPath: ossResult1.FileURL,
DurationMs: durationMs,
} }
if err != nil { if err != nil {
// 执行失败,更新状态 // 执行失败,更新状态
@@ -680,15 +647,7 @@ func registerNodeToGraph(graph *compose.Graph[any, any], flowNode entity.FlowNod
}) })
return nil, err return nil, err
} }
// 上传OSS每条独立上传
ossResult1, err := Upload(ctx, &dto.UploadFileBytesReq{
FileBytes: gconv.Bytes(gconv.String(realInput)),
FileName: fmt.Sprintf("nodeInput:%v.txt", time.Now().UnixMilli()),
})
if err != nil {
return nil, err
}
updateReq.OutputParamsPath = ossResult1.FileURL
// 执行成功,更新状态 // 执行成功,更新状态
updateReq.Status = node.NodeExecutionStatusSuccess.Code() updateReq.Status = node.NodeExecutionStatusSuccess.Code()
_, _ = nodeDao.NodeExecutionDao.Update(ctx, updateReq) _, _ = nodeDao.NodeExecutionDao.Update(ctx, updateReq)
@@ -702,13 +661,11 @@ func registerNodeToGraph(graph *compose.Graph[any, any], flowNode entity.FlowNod
return execInput, nil return execInput, nil
} }
} }
if nodeID == "summary_node" {
_ = graph.AddLambdaNode(nodeID, compose.InvokableLambda(wrapLambda(SummaryLambda)))
return
}
switch code { switch code {
case "__start__": case "__start__":
_ = graph.AddLambdaNode(nodeID, compose.InvokableLambda(wrapLambda(StartLambda))) _ = graph.AddLambdaNode(nodeID, compose.InvokableLambda(wrapLambda(StartLambda)))
case node.NodeTypeSystemSum:
_ = graph.AddLambdaNode(nodeID, compose.InvokableLambda(wrapLambda(SummaryLambda)))
case node.NodeTypeTextModel: case node.NodeTypeTextModel:
_ = graph.AddLambdaNode(nodeID, compose.InvokableLambda(wrapLambda(TextModelLambda))) _ = graph.AddLambdaNode(nodeID, compose.InvokableLambda(wrapLambda(TextModelLambda)))
case node.NodeTypeImageModel: case node.NodeTypeImageModel:
@@ -721,12 +678,6 @@ func registerNodeToGraph(graph *compose.Graph[any, any], flowNode entity.FlowNod
_ = graph.AddLambdaNode(nodeID, compose.InvokableLambda(wrapLambda(BatchModelLambda))) _ = graph.AddLambdaNode(nodeID, compose.InvokableLambda(wrapLambda(BatchModelLambda)))
case node.NodeTypeDataConversionModel: case node.NodeTypeDataConversionModel:
_ = graph.AddLambdaNode(nodeID, compose.InvokableLambda(wrapLambda(DataConversionLambda))) _ = graph.AddLambdaNode(nodeID, compose.InvokableLambda(wrapLambda(DataConversionLambda)))
//case node.NodeTypeSenseOptimizeModel:
// _ = graph.AddLambdaNode(nodeID, compose.InvokableLambda(wrapLambda(SenseOptimizeModelLambda)))
//case node.NodeTypeStoryOptimizeModel:
// _ = graph.AddLambdaNode(nodeID, compose.InvokableLambda(wrapLambda(StoryOptimizeModelLambda)))
//case node.NodeTypeScriptOptimizeModel:
// _ = graph.AddLambdaNode(nodeID, compose.InvokableLambda(wrapLambda(ScriptOptimizeModelLambda)))
case node.NodeTypeCustomNode: case node.NodeTypeCustomNode:
_ = graph.AddLambdaNode(nodeID, compose.InvokableLambda(wrapLambda(CustomLambda))) _ = graph.AddLambdaNode(nodeID, compose.InvokableLambda(wrapLambda(CustomLambda)))
case node.NodeTypeForm: case node.NodeTypeForm:
@@ -736,49 +687,42 @@ func registerNodeToGraph(graph *compose.Graph[any, any], flowNode entity.FlowNod
case node.NodeTypeMerge: case node.NodeTypeMerge:
_ = graph.AddLambdaNode(nodeID, compose.InvokableLambda(wrapLambda(MergeLambda))) _ = graph.AddLambdaNode(nodeID, compose.InvokableLambda(wrapLambda(MergeLambda)))
case node.NodeTypeDataMerge: case node.NodeTypeDataMerge:
_ = graph.AddLambdaNode(nodeID, compose.InvokableLambda(wrapLambda(DataMergeLambda))) _ = graph.AddLambdaNode(nodeID, compose.InvokableLambda(wrapLambda(DataMergeLambda)), compose.WithGraphCompileOptions(compose.WithNodeTriggerMode(compose.AllPredecessor)))
case node.NodeTypeHttp: case node.NodeTypeHttp:
_ = graph.AddLambdaNode(nodeID, compose.InvokableLambda(wrapLambda(HttpLambda))) _ = graph.AddLambdaNode(nodeID, compose.InvokableLambda(wrapLambda(HttpLambda)))
} }
} }
// --------------------------------------------------------------------
// ✅【工具方法】找出所有没有出边的节点 → 作为结束节点连接 END
// --------------------------------------------------------------------
func findEndNodes(startNodeId string, edges []entity.FlowEdge) []string { func findEndNodes(startNodeId string, edges []entity.FlowEdge) []string {
// 构建 节点 → 后续节点 的映射
nextMap := make(map[string][]string) nextMap := make(map[string][]string)
for _, e := range edges { for _, e := range edges {
nextMap[e.From] = append(nextMap[e.From], e.To) nextMap[e.From] = append(nextMap[e.From], e.To)
} }
endNodeSet := make(map[string]struct{}) endNodeSet := make(map[string]struct{})
visited := make(map[string]struct{})
queue := []string{startNodeId}
// 🚀 只从【开始节点】递归遍历(关键修复) for len(queue) > 0 {
findLeafNodes(startNodeId, nextMap, endNodeSet) node := queue[0]
queue = queue[1:]
// 转成数组返回 if _, exist := visited[node]; exist {
endNodes := make([]string, 0, len(endNodeSet)) continue
for id := range endNodeSet { }
endNodes = append(endNodes, id) visited[node] = struct{}{}
}
return endNodes nextList := nextMap[node]
} if len(nextList) == 0 {
endNodeSet[node] = struct{}{}
// -------------------------------------------------------------------- continue
// ✅ 递归:查找以 nodeId 开头的所有叶子节点 }
// -------------------------------------------------------------------- queue = append(queue, nextList...)
func findLeafNodes(nodeId string, nextMap map[string][]string, endNodeSet map[string]struct{}) {
nextNodes := nextMap[nodeId]
// 🚩 没有下一个节点 = 真实结束节点
if len(nextNodes) == 0 {
endNodeSet[nodeId] = struct{}{}
return
}
// 递归继续找下一个
for _, nextId := range nextNodes {
findLeafNodes(nextId, nextMap, endNodeSet)
} }
res := make([]string, 0, len(endNodeSet))
for k := range endNodeSet {
res = append(res, k)
}
return res
} }

View File

@@ -41,6 +41,20 @@ func JudgeLambda(ctx context.Context, input any) (string, error) {
if !ok { if !ok {
return "", fmt.Errorf("入参类型错误,期望 *flowDto.NodeExecutionInput实际 %T", input) return "", fmt.Errorf("入参类型错误,期望 *flowDto.NodeExecutionInput实际 %T", input)
} }
//inputMap, outputMap, modelMap := GetNodeContextContent(nodeInput.Global, nodeInput.Config)
//fmt.Printf("JudgeLambda路由输入=%s\n", gjson.MustEncode(inputMap))
//fmt.Printf("JudgeLambda路由输出=%s\n", gjson.MustEncode(outputMap))
//fmt.Printf("JudgeLambda路由模型=%s\n", gjson.MustEncode(modelMap))
//configMap := gconv.Map(nodeInput.Config.Config)
//ids := gconv.Strings(configMap["branch_ids"])
//fmt.Printf("JudgeLambda路由目标节点ID=%s\n", gconv.String(ids))
//
//m := map[string]bool{
// "80000a50-81e1-4c15-adae-aab6c0d781ad": true,
// "59a6ffa2-3252-4535-b6ed-d3e49cdf6c55": true,
//}
//
//return m, nil
// 1. 直接用你原来的方法(返回两个 map // 1. 直接用你原来的方法(返回两个 map
inputMap, outputMap, modelMap := GetNodeContextContent(nodeInput.Global, nodeInput.Config) inputMap, outputMap, modelMap := GetNodeContextContent(nodeInput.Global, nodeInput.Config)
var outputResult []node.NodeFormField var outputResult []node.NodeFormField
@@ -125,75 +139,62 @@ func BatchModelLambda(ctx context.Context, input any) (any, error) {
} }
} }
} }
// 结果按索引存放,切片不同下标并发写无竞争,不用锁 // 结果按索引存放,保证顺序
res := make([][]node.NodeFormField, len(reqMap)) res := make([][]node.NodeFormField, len(reqMap))
var wg sync.WaitGroup var wg sync.WaitGroup
// 用一个通道标记是否完成
done := make(chan struct{})
// 错误只存一个
var execErr error
subCtx, cancel := context.WithCancel(ctx) // 并发执行
defer cancel()
// 缓冲1错误通道仅接收第一个错误
errCh := make(chan error, 1)
// 并发执行任务
for idx, item := range reqMap { for idx, item := range reqMap {
wg.Add(1) wg.Add(1)
go func(idx int, userItem map[string]any) { go func(idx int, userItem map[string]any) {
defer wg.Done() defer wg.Done()
// 上下文已取消则直接退出
select {
case <-subCtx.Done():
return
default:
}
singleUserFrom := []map[string]any{userItem} singleUserFrom := []map[string]any{userItem}
output, err := TextNode(subCtx, nodeInput, skillName, from, singleUserFrom) output, err := TextNode(ctx, nodeInput, skillName, from, singleUserFrom)
if err != nil { if err != nil {
// 仅第一个错误写入通道 // 并发安全赋值错误
select { if execErr == nil {
case errCh <- err: execErr = err
cancel() // 触发全局取消,其他协程快速退出
default:
} }
return return
} }
// 直接按原索引写,顺序绝对正确
res[idx] = output res[idx] = output
}(idx, item) }(idx, item)
} }
// 任务全部结束后关闭错误通道 // 后台等待所有协程完成,然后关闭 done 通道
go func() { go func() {
wg.Wait() wg.Wait()
close(errCh) close(done)
}() }()
// ========== 修正后的等待逻辑 ========== // 等待全部完成
var execErr error <-done
select {
// 优先捕获业务错误 // 如果有错误,直接返回
case execErr = <-errCh: if execErr != nil {
if execErr != nil { return nil, execErr
// 收到真实业务错误,等待剩余协程收尾后返回
wg.Wait()
return nil, execErr
}
// execErr == nil 代表通道关闭、无任何错误,走到下方返回完整结果
case <-subCtx.Done():
// 上下文被取消阻塞读完errCh确认是否存在业务错误
execErr = <-errCh
} }
// 拼接输出结果 // 全局自增 i
var globalIndex int var globalIndex int
var outputRes []node.NodeFormField var outputRes []node.NodeFormField
for _, items := range res { for _, items := range res {
for _, item := range items { for _, item := range items {
// 1. 拿到原来的 Field例如 "text_content:2:0"
oldField := item.Field oldField := item.Field
// 2. 找到最后一个 : 的位置
if idx := strings.LastIndex(oldField, ":"); idx != -1 { if idx := strings.LastIndex(oldField, ":"); idx != -1 {
// 3. 截断前面部分,拼接上新的 globalIndex
item.Field = oldField[:idx+1] + fmt.Sprint(globalIndex) item.Field = oldField[:idx+1] + fmt.Sprint(globalIndex)
} }
// Label 同理
oldLabel := item.Label oldLabel := item.Label
if idx := strings.LastIndex(oldLabel, ":"); idx != -1 { if idx := strings.LastIndex(oldLabel, ":"); idx != -1 {
item.Label = oldLabel[:idx+1] + fmt.Sprint(globalIndex) item.Label = oldLabel[:idx+1] + fmt.Sprint(globalIndex)
@@ -219,6 +220,8 @@ func TextModelLambda(ctx context.Context, input any) (any, error) {
return nil, err return nil, err
} }
nodeInput.Config.OutputResult = outputRes nodeInput.Config.OutputResult = outputRes
//}
return nodeInput, nil return nodeInput, nil
} }
@@ -296,14 +299,13 @@ func VideoModelLambda(ctx context.Context, input any) (any, error) {
Label: fmt.Sprintf("video_oss_url:content:%d", 0), Label: fmt.Sprintf("video_oss_url:content:%d", 0),
Type: "string", Type: "string",
}) })
} else {
outputRes = append(outputRes, node.NodeFormField{
Field: fmt.Sprintf("concat_video_url:content:%d", 0),
Value: urlPrefix + msg.FileURL,
Label: fmt.Sprintf("concat_video_url:content:%d", 0),
Type: "string",
})
} }
outputRes = append(outputRes, node.NodeFormField{
Field: fmt.Sprintf("concat_video_url:content:%d", 0),
Value: urlPrefix + msg.FileURL,
Label: fmt.Sprintf("视频内容:content:%d", 0),
Type: "string",
})
nodeInput.Config.OutputResult = outputRes nodeInput.Config.OutputResult = outputRes
return nodeInput, nil return nodeInput, nil
@@ -318,6 +320,64 @@ func HttpLambda(ctx context.Context, input any) (any, error) {
outputRes := make([]node.NodeFormField, 0) outputRes := make([]node.NodeFormField, 0)
var err error var err error
outputRes, err = HttpNode(ctx, nodeInput) outputRes, err = HttpNode(ctx, nodeInput)
//if nodeInput.Config.Name == "生成视频" {
// outputRes, err = HttpNode(ctx, nodeInput)
//} else {
// a := []map[string]any{
// {
// "timeline": "0.0-2.1",
// "url": "https://ark-auto-2127201628-cn-beijing-default.tos-cn-beijing.volces.com/%E8%A7%86%E9%A2%91/1.mp4",
// },
// {
// "timeline": "2.1-4.5",
// "url": "https://ark-auto-2127201628-cn-beijing-default.tos-cn-beijing.volces.com/%E8%A7%86%E9%A2%91/2.mp4",
// },
// {
// "timeline": "4.5-12.2",
// "url": "https://ark-auto-2127201628-cn-beijing-default.tos-cn-beijing.volces.com/%E8%A7%86%E9%A2%91/3.mp4",
// },
// {
// "timeline": "12.2-13.6",
// "url": "https://ark-auto-2127201628-cn-beijing-default.tos-cn-beijing.volces.com/%E8%A7%86%E9%A2%91/4.mp4",
// },
// {
// "timeline": "13.6-17.7",
// "url": "https://ark-auto-2127201628-cn-beijing-default.tos-cn-beijing.volces.com/%E8%A7%86%E9%A2%91/5.mp4model-gateway",
// },
// {
// "timeline": "17.7-31.0",
// "url": "https://ark-auto-2127201628-cn-beijing-default.tos-cn-beijing.volces.com/%E8%A7%86%E9%A2%91/6.mp4",
// },
// {
// "timeline": "31.0-33.2",
// "url": "https://ark-auto-2127201628-cn-beijing-default.tos-cn-beijing.volces.com/%E8%A7%86%E9%A2%91/7.mp4",
// },
// {
// "timeline": "33.2-37.4",
// "url": "https://ark-auto-2127201628-cn-beijing-default.tos-cn-beijing.volces.com/%E8%A7%86%E9%A2%91/8.mp4",
// },
// {
// "timeline": "37.4-38.9",
// "url": "https://ark-auto-2127201628-cn-beijing-default.tos-cn-beijing.volces.com/%E8%A7%86%E9%A2%91/9.mp4",
// },
// {
// "timeline": "38.9-57.9",
// "url": "https://ark-auto-2127201628-cn-beijing-default.tos-cn-beijing.volces.com/%E8%A7%86%E9%A2%91/10.mp4",
// },
// }
// outputRes = append(outputRes, node.NodeFormField{
// Field: fmt.Sprintf("segments"),
// Value: a,
// Label: fmt.Sprintf("segments"),
// Type: "string",
// })
// outputRes = append(outputRes, node.NodeFormField{
// Field: fmt.Sprintf("audioUrl"),
// Value: "http://116.204.74.41:9000/tenantid-94/2026-06-11/9915351c-55b9-46d8-b783-3815126b.m4a",
// Label: fmt.Sprintf("audioUrl"),
// Type: "string",
// })
//}
if err != nil { if err != nil {
return nil, err return nil, err
} }
@@ -452,12 +512,6 @@ func MergeLambda(ctx context.Context, input any) (res any, err error) {
// 1. 把所有节点输出拍平成 字段名->内容 的map // 1. 把所有节点输出拍平成 字段名->内容 的map
dataMap := make(map[string]node.NodeFormField) dataMap := make(map[string]node.NodeFormField)
_, outputMap, _ := GetNodeContextContent(nodeInput.Global, nodeInput.Config) _, outputMap, _ := GetNodeContextContent(nodeInput.Global, nodeInput.Config)
//for _, valueAny := range outputMap {
// field := node.NodeFormField{}
// if field, ok = valueAny.(node.NodeFormField); ok {
// dataMap[field.Field] = field
// }
//}
for _, field := range outputMap { for _, field := range outputMap {
dataMap[field.Field] = field dataMap[field.Field] = field
} }

View File

@@ -583,7 +583,7 @@ func ImgNode(ctx context.Context, nodeInput *flowDto.NodeExecutionInput, skillNa
outputRes = append(outputRes, node.NodeFormField{ outputRes = append(outputRes, node.NodeFormField{
Field: fmt.Sprintf("img_url:%v:%d", k, i), Field: fmt.Sprintf("img_url:%v:%d", k, i),
Value: v, Value: v,
Label: fmt.Sprintf("img_url%v:%d", k, i), Label: fmt.Sprintf("图片内容%v:%d", k, i),
Type: "string", Type: "string",
}) })
} }
@@ -674,7 +674,7 @@ func AudioOptimizeNode(ctx context.Context, nodeInput *flowDto.NodeExecutionInpu
outputRes = append(outputRes, node.NodeFormField{ outputRes = append(outputRes, node.NodeFormField{
Field: fmt.Sprintf("audio_url:%v:%d", k, i), Field: fmt.Sprintf("audio_url:%v:%d", k, i),
Value: v, Value: v,
Label: fmt.Sprintf("audio_url:%v:%d", k, i), Label: fmt.Sprintf("音频内容:%v:%d", k, i),
Type: "string", Type: "string",
}) })
} }
@@ -1024,146 +1024,3 @@ func GetNodeContextContent(execInput *flowDto.FlowExecutionInput, nodeEntity *en
} }
return input, output, model return input, output, model
} }
//func BuildParam(nodeInput *flowDto.NodeExecutionInput) (skillName string, resultFrom []map[string]any, resultUserFrom []map[string]any) {
// inputMap, outputMap, modelMap := GetNodeContextContent(nodeInput.Global, nodeInput.Config)
// var outputResult []node.NodeFormField
// for _, valueAny := range inputMap {
// if field, ok := valueAny.(node.NodeFormField); ok {
// outputResult = append(outputResult, field)
// }
// }
//
// resultUserFrom = []map[string]any{}
// for _, valueAny := range outputMap {
// if field, ok := valueAny.(node.NodeFormField); ok {
// if !strings.Contains(field.Field, "text_url") && !strings.Contains(field.Field, "img_url") {
// if strings.Contains(field.Field, "text_content") {
// field.Value = StripHtmlTags(gconv.String(field.Value))
// }
// resultUserFrom = append(resultUserFrom, map[string]any{
// field.Label: field.Value,
// })
// }
// }
// }
// for _, valueAny := range modelMap {
// if field, ok := valueAny.(node.NodeFormField); ok {
// outputResult = append(outputResult, field)
// }
// }
// //if !nodeInput.Global.IsDialogue {
// for _, item := range outputResult {
// resultUserFrom = append(resultUserFrom, map[string]any{
// item.Label: item.Value,
// })
// }
// for _, item := range nodeInput.Config.FormConfig {
// resultUserFrom = append(resultUserFrom, map[string]any{
// item.Label: item.Value,
// })
// }
// //}
// if !g.IsEmpty(nodeInput.Global.Desc) {
// resultUserFrom = append(resultUserFrom, map[string]any{
// "desc": nodeInput.Global.Desc,
// })
// }
//
// resultFrom = []map[string]any{}
// for _, item := range nodeInput.Config.ModelConfig.ModelForm {
// if g.IsEmpty(item.Value) {
// continue
// }
// resultFrom = append(resultFrom, map[string]any{
// item.Label: item.Value,
// })
// }
// skillName = nodeInput.Config.SkillName
// if g.IsEmpty(nodeInput.Config.SkillName) {
// skillName = nodeInput.Global.SkillName
// }
//
// return skillName, resultFrom, resultUserFrom
//}
//
//func GetNodeContextContent(execInput *flowDto.FlowExecutionInput, nodeEntity *entity.FlowNode) (map[string]any, map[string]any, map[string]any) {
// input := make(map[string]any)
// output := make(map[string]any)
// model := make(map[string]any)
// // 1. 有引用 → 取引用节点的字段值
// if len(nodeEntity.InputSource) > 0 {
// for _, source := range nodeEntity.InputSource {
// refNodeID := source.NodeId
// fields := source.Field
//
// refNode, ok := execInput.ConfigMap[refNodeID]
// if !ok {
// continue
// }
//
// inputMap := buildInputMap(refNode)
// outputMap := mergeOutput(refNode.OutputResult)
// modelMap := mergeModel(refNode.ModelConfig)
// if len(fields) > 0 {
// // 取指定字段
// for _, f := range fields {
// if v, ok := inputMap[f]; ok {
// input[f] = v
// }
// if v, ok := modelMap[f]; ok {
// model[f] = v
// }
// for k, v := range outputMap {
// if strings.Contains(k, f) {
// model[k] = v
// }
// }
// }
// } else {
// // 取全部
// if refNode.NodeCode != node.NodeTypeHttp {
// for k, v := range inputMap {
// input[k] = v
// }
// }
// for k, v := range modelMap {
// model[k] = v
// }
// }
// }
// }
// return input, output, model
//}
//
//// buildInputMap 从 FormConfig 构造输入map
//func buildInputMap(node *entity.FlowNode) map[string]any {
// m := make(map[string]any)
// for _, item := range node.FormConfig {
// m[item.Label] = item
// }
// return m
//}
//
//// mergeOutput 合并节点输出 []map → 单map
//func mergeOutput(output []node.NodeFormField) map[string]any {
// m := make(map[string]any)
// for _, item := range output {
// m[item.Label] = item
// }
// return m
//}
//
//// mergeOutput 合并节点输出 []map → 单map
//func mergeModel(output node.ModelItem) map[string]any {
// m := make(map[string]any)
// // 遍历 output.ModelForm 里的每一个 key 和原始值
// for _, rawValue := range output.ModelForm {
// if g.IsEmpty(rawValue.Value) {
// continue
// }
// // 包装成 { "value": 原始值 }
// m[rawValue.Label] = rawValue.Value
// }
// return m
//}

View File

@@ -258,15 +258,16 @@ func waitGatewayResult(ctx context.Context, taskId string) (map[string]any, erro
if task.State == 3 || !g.IsEmpty(task.ErrorMsg) { if task.State == 3 || !g.IsEmpty(task.ErrorMsg) {
return nil, fmt.Errorf("模型执行失败:%s", task.ErrorMsg) return nil, fmt.Errorf("模型执行失败:%s", task.ErrorMsg)
} }
if g.IsEmpty(task.OssFile) { if g.IsEmpty(task.Messages) {
return nil, fmt.Errorf("模型返回结果为空") return nil, fmt.Errorf("模型返回结果为空")
} }
// 获取远程文件内容 // 获取远程文件内容
file, err := GetFileBytesFromURL(ctx, task.OssFile) //file, err := GetFileBytesFromURL(ctx, task.OssFile)
if err != nil { //if err != nil {
return nil, err // return nil, err
} //}
return gconv.Map(file), nil //task.Messages = gconv.Map(file)
return task.Messages, nil
} }
// updateTokenCount updates the token count in node execution // updateTokenCount updates the token count in node execution
@@ -295,9 +296,6 @@ func GetModelResult(ctx context.Context, sessionId string, nodeInput *flowDto.No
if err != nil { if err != nil {
return nil, err return nil, err
} }
if composeResult.Status != "success" {
return nil, fmt.Errorf("模型提示词构建错误")
}
modelInfo, err := GetModelInfo(ctx, &flowDto.GetModelInfoReq{ModelName: nodeInput.Config.ModelConfig.ModelName}) modelInfo, err := GetModelInfo(ctx, &flowDto.GetModelInfoReq{ModelName: nodeInput.Config.ModelConfig.ModelName})
if err != nil { if err != nil {
@@ -347,41 +345,31 @@ func GetModelResult(ctx context.Context, sessionId string, nodeInput *flowDto.No
taskIdList := make([]string, len(composeResult.Messages.Rounds)) taskIdList := make([]string, len(composeResult.Messages.Rounds))
for idx, item := range composeResult.Messages.Rounds { for idx, item := range composeResult.Messages.Rounds {
taskId, err := createGatewayTaskOnly(ctx, composeResult.EpicycleId, nodeInput.Config.ModelConfig.ModelName, item) var taskId string
taskId, err = createGatewayTaskOnly(ctx, composeResult.EpicycleId, nodeInput.Config.ModelConfig.ModelName, item)
if err != nil { if err != nil {
return nil, err return nil, err
} }
taskIdList[idx] = taskId taskIdList[idx] = taskId
} }
// 全局共享子上下文,实现一处报错全部终止
subCtx, globalCancel := context.WithCancel(ctx)
defer globalCancel() // 函数退出兜底释放
var wg sync.WaitGroup var wg sync.WaitGroup
errChan := make(chan error, len(taskIdList)) errChan := make(chan error, len(taskIdList))
// 加互斥锁保护结果map
var mu sync.Mutex
for idx, taskId := range taskIdList { for idx, taskId := range taskIdList {
wg.Add(1) wg.Add(1)
go func(idx int, taskId string) { go func(idx int, taskId string) {
defer wg.Done() defer wg.Done()
taskResult, err := waitGatewayResult(subCtx, taskId) var taskResult map[string]any
taskResult, err = waitGatewayResult(ctx, taskId)
if err != nil { if err != nil {
errChan <- err errChan <- err
globalCancel() // 全局取消所有协程收到ctx取消信号快速退出
return return
} }
// 加锁写入map解决并发竞态
mu.Lock()
mapTaskResult[idx] = taskResult mapTaskResult[idx] = taskResult
mu.Unlock()
updateTokenCount(ctx, nodeInput.NodeExecutionId, modelInfo.Model.ResponseTokenField, taskResult) updateTokenCount(ctx, nodeInput.NodeExecutionId, modelInfo.Model.ResponseTokenField, taskResult)
}(idx, taskId) }(idx, taskId)
} }
@@ -389,15 +377,8 @@ func GetModelResult(ctx context.Context, sessionId string, nodeInput *flowDto.No
wg.Wait() wg.Wait()
close(errChan) close(errChan)
// 收集全部错误,而非只读一条 if len(errChan) > 0 {
var errs []error return nil, <-errChan
for len(errChan) > 0 {
errs = append(errs, <-errChan)
}
if len(errs) > 0 {
// 返回第一个错误;如需汇总所有错误可拼接
return nil, errs[0]
} }
} }
} else { } else {
@@ -509,17 +490,16 @@ func VideoConcat(ctx context.Context, videoUrls []string) (r any, err error) {
} }
func GetFileBytesFromURL(ctx context.Context, fileUrl string) ([]byte, error) { func GetFileBytesFromURL(ctx context.Context, fileUrl string) ([]byte, error) {
newS := strings.ReplaceAll(fileUrl, "http://cdn.redpowerfuture.com", g.Cfg().MustGet(ctx, "filePrefix").String())
// 使用 GoFrame 客户端(自带超时、追踪、日志等能力) // 使用 GoFrame 客户端(自带超时、追踪、日志等能力)
resp, err := g.Client().Get(ctx, newS) resp, err := g.Client().Get(ctx, fileUrl)
if err != nil { if err != nil {
return nil, gerror.Wrapf(err, "failed to request url: %s", newS) return nil, gerror.Wrapf(err, "failed to request url: %s", fileUrl)
} }
defer resp.Close() defer resp.Close()
// 校验状态码 // 校验状态码
if resp.StatusCode != http.StatusOK { if resp.StatusCode != http.StatusOK {
return nil, gerror.Newf("request failed with status code: %d, url: %s", resp.StatusCode, newS) return nil, gerror.Newf("request failed with status code: %d, url: %s", resp.StatusCode, fileUrl)
} }
// 读取全部内容 // 读取全部内容

View File

@@ -359,10 +359,7 @@ func (s *nodeLibraryService) GetNodeLibrary(ctx context.Context, req *nodeDto.Wo
item.NodeCode == node.NodeTypeVideoModel || item.NodeCode == node.NodeTypeVideoModel ||
item.NodeCode == node.NodeTypeAudioModel || item.NodeCode == node.NodeTypeAudioModel ||
item.NodeCode == node.NodeTypeBatchModel || item.NodeCode == node.NodeTypeBatchModel ||
item.NodeCode == node.NodeTypeDataConversionModel || item.NodeCode == node.NodeTypeDataConversionModel {
item.NodeCode == node.NodeTypeSenseOptimizeModel ||
item.NodeCode == node.NodeTypeStoryOptimizeModel ||
item.NodeCode == node.NodeTypeScriptOptimizeModel {
item.ModelConfig = append(item.ModelConfig, node.ModelItem{ item.ModelConfig = append(item.ModelConfig, node.ModelItem{
ModelName: "自定义", ModelName: "自定义",
}) })