// Source file: ai-agent/workflow/service/flow/flow_execution_service.go (729 lines, 23 KiB, Go)

package flow
import (
"ai-agent/workflow/consts/flow"
"ai-agent/workflow/consts/node"
fileDao "ai-agent/workflow/dao/file"
flowDao "ai-agent/workflow/dao/flow"
nodeDao "ai-agent/workflow/dao/node"
"ai-agent/workflow/model/dto"
fileDto "ai-agent/workflow/model/dto/file"
flowDto "ai-agent/workflow/model/dto/flow"
nodeDto "ai-agent/workflow/model/dto/node"
"ai-agent/workflow/model/entity"
"context"
"errors"
"fmt"
"sort"
"strconv"
"strings"
"sync"
"time"
"gitea.redpowerfuture.com/red-future/common/utils"
"github.com/cloudwego/eino/compose"
"github.com/gogf/gf/v2/frame/g"
"github.com/gogf/gf/v2/os/gtime"
"github.com/gogf/gf/v2/util/gconv"
"github.com/google/uuid"
"go.opentelemetry.io/otel/trace"
)
// flowExecutionService groups the flow-execution operations defined in this
// file. The struct has no fields, so it carries no state of its own.
type flowExecutionService struct{}

// FlowExecutionService is the package-level instance of the service.
var FlowExecutionService = new(flowExecutionService)
// Get loads one flow execution through the DAO and converts it into the view
// object, attaching the file address prefix resolved from ctx.
//
// A DAO or prefix-lookup error is returned with a nil result. A conversion
// error is returned together with the (possibly partially filled) result.
func (s *flowExecutionService) Get(ctx context.Context, req *flowDto.GetFlowExecutionReq) (res *flowDto.VOFlowExecution, err error) {
	record, err := flowDao.FlowExecutionDao.Get(ctx, req)
	if err != nil {
		return nil, err
	}

	res = &flowDto.VOFlowExecution{}
	if res.ImgAddressPrefix, err = utils.GetFileAddressPrefix(ctx); err != nil {
		return nil, err
	}

	// Copy the DAO record's fields onto the view object.
	err = gconv.Struct(record, &res)
	return res, err
}
// List returns the current user's flow executions as a two-level tree:
// creation date -> executions -> parsed output items.
//
// Processing steps:
//  1. req.Creator is overwritten with the user name resolved from ctx.
//  2. Executions with empty OutputParams are dropped before numbering.
//  3. Each remaining execution at index idx gets the number len-idx, so the
//     last record of the DAO result is number 1.
//     NOTE(review): the original comment states "newest = 1"; that holds only
//     if the DAO returns records oldest-first — confirm against the DAO query.
//  4. Only OutputParams keys that parse as base-10 integers (timestamps) are
//     kept; executions that yield no items are skipped (their number is still
//     consumed).
//  5. Items are sorted by timestamp ascending and labelled "<kind>_<n>", where
//     n counts per kind within one execution.
//  6. Executions inside a date are sorted newest first; dates are sorted
//     descending.
//
// Any error from the user lookup, the DAO, or the file address prefix lookup
// is returned with a nil result.
func (s *flowExecutionService) List(ctx context.Context, req *flowDto.ListFlowExecutionReq) (res *flowDto.ListFlowExecutionTreeRes, err error) {
	user, err := utils.GetUserInfo(ctx)
	if err != nil {
		return nil, err
	}
	req.Creator = user.UserName

	list, _, err := flowDao.FlowExecutionDao.List(ctx, req)
	if err != nil {
		return nil, err
	}

	// Keep only executions that produced output; empty ones are neither
	// numbered nor displayed.
	validList := make([]*entity.FlowExecution, 0, len(list))
	for _, execution := range list {
		if g.IsEmpty(execution.OutputParams) {
			continue
		}
		validList = append(validList, execution)
	}

	// executionId -> display number (see step 3 above).
	executionNumber := make(map[int64]int, len(validList))
	for idx, execution := range validList {
		executionNumber[execution.Id] = len(validList) - idx
	}

	// flowWrap keeps the creation time next to the node so the second tree
	// level can be sorted after grouping.
	type flowWrap struct {
		flowNode  flowDto.FlowNode
		createdAt *gtime.Time
	}
	dateMap := make(map[string][]flowWrap)

	for _, execution := range validList {
		createDate := execution.CreatedAt.Format("Y-m-d")
		displayFlowName := fmt.Sprintf("会话-%d(%s)", executionNumber[execution.Id], execution.FlowName)

		// Collect every entry whose key is a numeric timestamp.
		var tempItems []flowDto.OutputItem
		for _, paramMap := range execution.OutputParams {
			for tsKey, value := range paramMap {
				if _, parseErr := strconv.ParseInt(tsKey, 10, 64); parseErr != nil {
					continue
				}
				tempItems = append(tempItems, flowDto.OutputItem{
					Timestamp: tsKey,
					Content:   gconv.String(value),
				})
			}
		}
		if len(tempItems) == 0 {
			// Nothing displayable: do not create a second-level node.
			continue
		}

		// Timestamps ascending. Parse errors cannot occur here because every
		// key was validated above.
		sort.Slice(tempItems, func(i, j int) bool {
			t1, _ := strconv.ParseInt(tempItems[i].Timestamp, 10, 64)
			t2, _ := strconv.ParseInt(tempItems[j].Timestamp, 10, 64)
			return t1 < t2
		})

		// Label each item by its detected file kind with a per-kind counter.
		suffixCount := make(map[string]int)
		for idx := range tempItems {
			item := &tempItems[idx]
			ext := GetFileTypeByPath(item.Content)

			suffix := "内容"
			switch ext {
			case "image":
				suffix = "图片"
			case "video":
				suffix = "视频"
			case "audio":
				suffix = "音频"
			case "text":
				suffix = "文案"
			case "html":
				suffix = "HTML"
			}

			suffixCount[suffix]++
			item.Type = ext
			item.Label = fmt.Sprintf("%s_%d", suffix, suffixCount[suffix])
		}

		dateMap[createDate] = append(dateMap[createDate], flowWrap{
			flowNode: flowDto.FlowNode{
				FlowName:  displayFlowName,
				Id:        execution.Id,
				SessionId: gconv.String(execution.SessionId),
				Items:     tempItems,
			},
			createdAt: execution.CreatedAt,
		})
	}

	// Build the tree. A date only enters dateMap together with at least one
	// flow, so no date node can be empty.
	var tree []flowDto.DateNode
	for date, wraps := range dateMap {
		// Second level: newest execution first.
		sort.Slice(wraps, func(i, j int) bool {
			return wraps[i].createdAt.After(wraps[j].createdAt)
		})
		flowNodes := make([]flowDto.FlowNode, 0, len(wraps))
		for _, w := range wraps {
			flowNodes = append(flowNodes, w.flowNode)
		}
		tree = append(tree, flowDto.DateNode{
			CreateDate: date,
			Flows:      flowNodes,
		})
	}
	// First level: most recent date first ("Y-m-d" strings sort chronologically).
	sort.Slice(tree, func(i, j int) bool {
		return tree[i].CreateDate > tree[j].CreateDate
	})

	// Previously this error was assigned and then dropped by "return ..., nil".
	imgPrefix, err := utils.GetFileAddressPrefix(ctx)
	if err != nil {
		return nil, err
	}
	return &flowDto.ListFlowExecutionTreeRes{
		Tree:             tree,
		ImgAddressPrefix: imgPrefix,
	}, nil
}
// ComposeCallback is the prompt (compose) callback endpoint.
// It passes the request to Notify under req.TaskId and always returns nil.
// ctx is not used.
func (s *flowExecutionService) ComposeCallback(ctx context.Context, req *flowDto.ComposeCallbackReq) (err error) {
Notify(req.TaskId, req)
return nil
}
// ModelCallback is the model callback endpoint.
// It passes the request to Notify under req.TaskId and always returns nil.
// ctx is not used.
func (s *flowExecutionService) ModelCallback(ctx context.Context, req *flowDto.ModelCallbackReq) (err error) {
// Wake up the task that is waiting on this task ID.
Notify(req.TaskId, req)
return nil
}
// VideoCallback is the video-concatenation callback endpoint.
// It passes the request to Notify under req.TaskId and always returns nil.
// ctx is not used.
func (s *flowExecutionService) VideoCallback(ctx context.Context, req *flowDto.VideoCallbackReq) (err error) {
// Wake up the task that is waiting on this task ID.
Notify(req.TaskId, req)
return nil
}
// HttpNodeCallback is the callback endpoint for HTTP nodes.
// It takes the HTTP request attached to ctx, reads its "task_id" parameter,
// and passes the whole request to Notify under that task ID. It always
// returns nil.
// NOTE(review): assumes ctx always carries a request; a missing request would
// make the parameter lookup dereference nil — confirm with callers.
func (s *flowExecutionService) HttpNodeCallback(ctx context.Context) (err error) {
	request := g.RequestFromCtx(ctx)
	Notify(request.Get("task_id").String(), request)
	return nil
}
// Cancellation registry: cancel functions of running executions are kept in a sync.Map.
var (
// cancelMap maps a trace ID to the context.CancelFunc registered for the
// execution running under that trace ID.
cancelMap sync.Map
)
// Cancel stops the running execution that belongs to req.SessionId.
//
// It looks the execution up by session ID, fetches the cancel function
// registered in cancelMap under the execution's trace ID, invokes it, removes
// the registry entry, and finally sets the execution's status to cancelled.
//
// Errors are returned when the DAO lookup fails, the session does not exist,
// no cancel function is registered for the trace ID (never started or already
// finished), the stored value has an unexpected type, or the status update
// fails. The status-update error is wrapped with %w so callers can inspect
// the underlying cause.
func (s *flowExecutionService) Cancel(ctx context.Context, req *flowDto.CancelReq) (err error) {
	getRes, err := flowDao.FlowExecutionDao.Get(ctx, &flowDto.GetFlowExecutionReq{
		SessionId: req.SessionId,
	})
	if err != nil {
		return err
	}
	if g.IsEmpty(getRes) {
		return fmt.Errorf("会话[%s] 不存在", req.SessionId)
	}

	cancelVal, exist := cancelMap.Load(getRes.TraceId)
	if !exist {
		return fmt.Errorf("traceID[%s] 不存在或已执行完成", getRes.TraceId)
	}
	cancel, ok := cancelVal.(context.CancelFunc)
	if !ok {
		return fmt.Errorf("traceID[%s] 对应的取消函数类型错误", getRes.TraceId)
	}

	cancel()
	// Drop the entry right away; Execute also removes it when it returns.
	cancelMap.Delete(getRes.TraceId)

	// Persist the cancelled status.
	_, err = flowDao.FlowExecutionDao.Update(ctx, &flowDto.UpdateFlowExecutionReq{
		Id:     getRes.Id,
		Status: flow.FlowExecutionStatusCancel.Code(),
	})
	if err != nil {
		return fmt.Errorf("更新取消状态失败: %w", err)
	}
	return nil
}
// Execute runs a workflow for req.SessionId.
//
// Flow:
//  1. A cancellable context is derived from ctx. When ctx carries a trace ID,
//     the cancel function is registered in cancelMap under it so Cancel can
//     stop the run; the entry is removed when Execute returns.
//  2. If no execution exists for the session, a new record is inserted with
//     status running. Otherwise the call is treated as a dialogue turn and the
//     existing record is updated (new node group, status running, trace ID).
//  3. Any req.FileUrl entries are stored as temp files bound to the session.
//  4. Dialogue turn with a non-empty req.ResultUrl: the URL suffix selects a
//     single-model handler (".inc", ".png", ".html") and Execute returns its
//     result; any other suffix is rejected.
//  5. Otherwise the graph is built from req.FlowContent, the node config map
//     is assembled (request nodes plus the generated summary nodes), and the
//     graph is invoked with the cancellable context.
//
// On build or run failure the execution is marked failed (best effort) and a
// wrapped error is returned. If the run failed because the cancellable context
// was cancelled, the execution is marked cancelled instead. The result value
// is always nil; callers should rely on the error only.
func (s *flowExecutionService) Execute(ctx context.Context, req *flowDto.ExecuteReq) (res *flowDto.ExecuteRes, err error) {
	execCtx, cancel := context.WithCancel(ctx)
	traceId := ""
	defer func() {
		// Whatever the outcome, unregister and release the context.
		if traceId != "" {
			cancelMap.Delete(traceId)
		}
		cancel()
	}()

	flowInfo, err := flowDao.FlowExecutionDao.Get(ctx, &flowDto.GetFlowExecutionReq{
		SessionId: req.SessionId,
	})
	if err != nil {
		return nil, err
	}

	// Register the cancel function under the request's trace ID, if any.
	if span := trace.SpanFromContext(ctx); span != nil && span.SpanContext().HasTraceID() {
		traceId = span.SpanContext().TraceID().String()
		cancelMap.Store(traceId, cancel)
	}

	var executionId int64
	nodeGroupId := uuid.NewString()
	isDialogue := flowInfo != nil

	if !isDialogue {
		createReq := new(flowDto.CreateFlowExecutionReq)
		createReq.FlowUserId = req.FlowId
		createReq.FlowName = req.FlowName
		createReq.NodeGroupId = nodeGroupId
		createReq.TriggerType = flow.FlowExecutionTriggerTypeManual.Code()
		createReq.FlowContent = req.FlowContent
		createReq.NodeInputParams = req.NodeInputParams
		createReq.SessionId = req.SessionId
		createReq.Status = flow.FlowExecutionStatusRunning.Code()
		createReq.TraceId = traceId
		executionId, err = flowDao.FlowExecutionDao.Insert(ctx, createReq)
		if err != nil {
			return nil, err
		}
	} else {
		executionId = flowInfo.Id
		_, err = flowDao.FlowExecutionDao.Update(ctx, &flowDto.UpdateFlowExecutionReq{
			Id:          executionId,
			NodeGroupId: nodeGroupId,
			Status:      flow.FlowExecutionStatusRunning.Code(),
			TraceId:     traceId,
		})
		if err != nil {
			return nil, err
		}
	}

	if !g.IsEmpty(req.FileUrl) {
		createFileTempReq := make([]*fileDto.CreateFileTempReq, 0, len(req.FileUrl))
		for _, fileUrl := range req.FileUrl {
			createReq := new(fileDto.CreateFileTempReq)
			createReq.BusinessId = req.SessionId
			createReq.FileUrl = fileUrl
			createFileTempReq = append(createFileTempReq, createReq)
		}
		if _, err = fileDao.FileTempDao.BatchInsert(ctx, createFileTempReq); err != nil {
			return nil, err
		}
	}

	// Dialogue turn on an existing result: dispatch by result file suffix.
	if isDialogue && !g.IsEmpty(flowInfo) && !g.IsEmpty(req.ResultUrl) {
		req.NodeGroupId = nodeGroupId
		resultUrl := gconv.String(req.ResultUrl)
		switch {
		case strings.HasSuffix(resultUrl, ".inc"):
			return nil, TextModelSingleLambda(ctx, req, flowInfo)
		case strings.HasSuffix(resultUrl, ".png"):
			return nil, ImgModelSingleLambda(ctx, req, flowInfo)
		case strings.HasSuffix(resultUrl, ".html"):
			return nil, TextImgModelSingleLambda(ctx, req, flowInfo)
		}
		return nil, errors.New("文件格式不支持")
	}

	// failExecution marks the execution as failed and returns the wrapped
	// cause. The status update is best effort: its own error is dropped so the
	// caller always sees the original failure.
	failExecution := func(cause error) error {
		_, _ = flowDao.FlowExecutionDao.Update(ctx, &flowDto.UpdateFlowExecutionReq{
			Id:           executionId,
			Status:       flow.FlowExecutionStatusFailed.Code(),
			ErrorMessage: cause.Error(),
		})
		return fmt.Errorf("执行工作流失败: %w", cause)
	}

	// Step 2: build the execution graph.
	nodeList, runGraph, err := BuildGraphFromFlowContent(execCtx, req.FlowContent)
	if err != nil {
		return nil, failExecution(err)
	}

	// Step 3: build the node config map.
	configMap := make(map[string]*entity.FlowNode, len(req.NodeInputParams)+len(nodeList))
	for _, cfg := range req.NodeInputParams {
		configMap[cfg.Id] = cfg
	}
	// Index into the slice instead of taking the range variable's address:
	// before Go 1.22 "&i" would make every entry alias the last node.
	for idx := range nodeList {
		configMap[nodeList[idx].Id] = &nodeList[idx]
	}

	// Step 4: build the global execution input shared by all nodes.
	execInput := &flowDto.FlowExecutionInput{
		NodeGroupId: nodeGroupId,
		IsDialogue:  isDialogue,
		ExecutionId: executionId,
		ConfigMap:   configMap,
		SessionId:   req.SessionId,
		Desc:        req.Desc,
		SkillName:   req.SkillName,
		FileUrl:     req.FileUrl,
	}

	if _, err = runGraph.Invoke(execCtx, execInput); err != nil {
		// A cancelled execution context means the failure was caused by Cancel
		// (or by the caller's ctx being cancelled).
		if errors.Is(execCtx.Err(), context.Canceled) {
			// Best effort: Cancel performs the same status update.
			_, _ = flowDao.FlowExecutionDao.Update(ctx, &flowDto.UpdateFlowExecutionReq{
				Id:     executionId,
				Status: flow.FlowExecutionStatusCancel.Code(),
			})
			return nil, fmt.Errorf("工作流已被取消: %w", err)
		}
		return nil, failExecution(err)
	}
	return nil, nil
}
// registerExecInputMergeOnce makes sure the merge function for
// *flowDto.FlowExecutionInput is registered with eino only once per process,
// instead of on every graph build.
var registerExecInputMergeOnce sync.Once

// BuildGraphFromFlowContent builds and compiles an eino graph from the
// workflow definition saved by the front end.
//
// For every end node reachable from flowContent.StartNodeId a system summary
// node is generated, appended to flowContent.Nodes (the argument is mutated),
// and wired as endNode -> summary -> END. The generated summary nodes are
// returned so the caller can add them to its node config map.
//
// Judge nodes are not registered as graph nodes. Each one becomes a branch
// attached to its first upstream node; edges pointing at a judge node are
// skipped. NOTE(review): with several upstream nodes only the first gets the
// branch — confirm that is intended.
//
// Returns an error when flowContent is nil, when a judge node has no upstream
// node, or when graph compilation fails.
func BuildGraphFromFlowContent(ctx context.Context, flowContent *entity.FlowInfo) ([]entity.FlowNode, compose.Runnable[any, any], error) {
	if flowContent == nil {
		return nil, nil, errors.New("工作流内容为空")
	}

	// Fan-in merge for *flowDto.FlowExecutionInput: return the first value.
	// Per the original author's note, every branch works on the same shared
	// ConfigMap, so each value already carries all modifications.
	registerExecInputMergeOnce.Do(func() {
		compose.RegisterValuesMergeFunc(func(values []*flowDto.FlowExecutionInput) (*flowDto.FlowExecutionInput, error) {
			if len(values) == 0 {
				return nil, nil
			}
			return values[0], nil
		})
	})

	graph := compose.NewGraph[any, any]()

	// One generated summary node per end node; IDs share a random prefix.
	var nodeList []entity.FlowNode
	summaryPrefix := uuid.NewString()
	originalEndNodes := findEndNodes(flowContent.StartNodeId, flowContent.Edges)
	summaryIDs := make([]string, len(originalEndNodes))
	for i := range originalEndNodes {
		summaryIDs[i] = fmt.Sprintf("%v_%d", summaryPrefix, i)
		summaryNode := entity.FlowNode{
			Id:          summaryIDs[i],
			NodeCode:    node.NodeTypeSystemSum,
			Name:        node.NodeNameSystemSum,
			InputSource: []entity.FlowNodeInputSource{}, // filled later by aggregating node outputs
			FormConfig:  nil,
			ModelConfig: node.ModelItem{},
		}
		nodeList = append(nodeList, summaryNode)
		flowContent.Nodes = append(flowContent.Nodes, summaryNode)
	}

	// Register every node except judge nodes, which become branches below.
	nodeMap := make(map[string]entity.FlowNode, len(flowContent.Nodes))
	for _, item := range flowContent.Nodes {
		nodeMap[item.Id] = item
		if item.NodeCode != node.NodeTypeJudge {
			registerNodeToGraph(graph, item)
		}
	}

	// Edge/branch errors are deliberately not checked here.
	// NOTE(review): presumably the graph records them and Compile reports
	// them — confirm against the eino graph implementation.
	if flowContent.StartNodeId != "" {
		_ = graph.AddEdge(compose.START, flowContent.StartNodeId)
	}
	for i, endID := range originalEndNodes {
		_ = graph.AddEdge(endID, summaryIDs[i])
		_ = graph.AddEdge(summaryIDs[i], compose.END)
	}

	// Index edges by source node and remember each node's upstream nodes.
	upstreamMap := make(map[string][]string)
	edgeMap := make(map[string][]entity.FlowEdge)
	for _, edge := range flowContent.Edges {
		edgeMap[edge.From] = append(edgeMap[edge.From], edge)
		upstreamMap[edge.To] = append(upstreamMap[edge.To], edge.From)
	}

	for fromNodeID, edges := range edgeMap {
		if nodeMap[fromNodeID].NodeCode == node.NodeTypeJudge {
			upstream := upstreamMap[fromNodeID]
			if len(upstream) == 0 {
				// Without an upstream node there is nothing to attach the
				// branch to; indexing upstream[0] would panic.
				return nil, nil, fmt.Errorf("判断节点%s没有上游节点", fromNodeID)
			}

			branchMap := make(map[string]bool, len(edges))
			for _, e := range edges {
				branchMap[e.To] = true
			}

			// Per-iteration copy: before Go 1.22 the range variable is shared
			// by all closures and would end up naming the wrong judge node.
			judgeNodeID := fromNodeID
			judgeLambda := func(ctx context.Context, input any) (string, error) {
				execInput, ok := input.(*flowDto.FlowExecutionInput)
				if !ok {
					return "", fmt.Errorf("入参类型错误")
				}
				currentConfig := execInput.ConfigMap[judgeNodeID]
				if currentConfig == nil {
					return "", fmt.Errorf("判断节点%s无配置", judgeNodeID)
				}

				// Collect candidate branch IDs and their display names.
				branchIdNameMap := make(map[string]string, len(branchMap))
				branchIDs := make([]string, 0, len(branchMap))
				for branchID := range branchMap {
					branchIDs = append(branchIDs, branchID)
					if branchNodeCfg, ok := execInput.ConfigMap[branchID]; ok {
						branchIdNameMap[branchID] = branchNodeCfg.Name
					} else {
						branchIdNameMap[branchID] = "未命名节点" // fallback name
					}
				}

				// Hand the candidates to the judge implementation through the
				// node's Config.
				currentConfig.Config = map[string]any{
					"branch_ids":         branchIDs,
					"branch_id_name_map": branchIdNameMap,
				}
				return JudgeLambda(ctx, &flowDto.NodeExecutionInput{
					Config: currentConfig, // the judge node's own config
					Global: execInput,     // global execution input
				})
			}
			_ = graph.AddBranch(upstream[0], compose.NewGraphBranch(judgeLambda, branchMap))
			continue
		}

		// Plain edges; edges into a judge node are replaced by its branch.
		for _, e := range edges {
			if nodeMap[e.To].NodeCode == node.NodeTypeJudge {
				continue
			}
			_ = graph.AddEdge(e.From, e.To)
		}
	}

	compiled, err := graph.Compile(ctx, compose.WithGraphName("auto_build_workflow"))
	return nodeList, compiled, err
}
// registerNodeToGraph adds flowNode to graph as a lambda node, choosing the
// implementation by flowNode.NodeCode. Unknown node codes are ignored and the
// error returned by AddLambdaNode is not checked.
//
// Every implementation is wrapped so that the value flowing along graph edges
// is always the shared *flowDto.FlowExecutionInput. The wrapper:
//  1. looks up the node's config in execInput.ConfigMap;
//  2. appends the OutputResult of each configured input source to the node's
//     own OutputResult;
//  3. uploads a snapshot of the node input and inserts a node-execution record
//     with status running;
//  4. runs the implementation, then uploads a snapshot again and updates the
//     record with duration, output path and final status;
//  5. appends the outcome to execInput.ExecutedNodes and returns execInput.
//
// NOTE(review): ExecutedNodes and OutputResult are appended without
// synchronization; if the graph runs branches concurrently this may race —
// confirm.
func registerNodeToGraph(graph *compose.Graph[any, any], flowNode entity.FlowNode) {
	nodeID := flowNode.Id
	code := flowNode.NodeCode

	wrapLambda := func(lambda func(ctx context.Context, input any) (any, error)) func(ctx context.Context, input any) (any, error) {
		return func(ctx context.Context, input any) (any, error) {
			execInput, ok := input.(*flowDto.FlowExecutionInput)
			if !ok {
				return nil, fmt.Errorf("入参必须是 *FlowExecutionInput, 实际是 %T", input)
			}
			configMap := execInput.ConfigMap
			currentConfig := configMap[nodeID]
			if currentConfig == nil {
				return nil, fmt.Errorf("节点%s无配置", nodeID)
			}

			// Aggregate the outputs of all configured source nodes.
			for _, inputSource := range flowNode.InputSource {
				if sourceConfig, ok := configMap[inputSource.NodeId]; ok {
					currentConfig.OutputResult = append(currentConfig.OutputResult, sourceConfig.OutputResult...)
				}
			}

			realInput := &flowDto.NodeExecutionInput{
				Config: currentConfig,
				Global: execInput, // gives the node access to every node's config
			}

			startTime := time.Now()
			// Snapshot of the node input, uploaded as its own object.
			ossResult, err := Upload(ctx, &dto.UploadFileBytesReq{
				FileBytes: gconv.Bytes(gconv.String(realInput)),
				FileName:  fmt.Sprintf("nodeInput:%v.txt", time.Now().UnixMilli()),
			})
			if err != nil {
				return nil, err
			}

			nodeExecutionId, err := nodeDao.NodeExecutionDao.Insert(ctx, &nodeDto.CreateNodeExecutionReq{
				FlowExecutionId: execInput.ExecutionId,
				NodeId:          nodeID,
				NodeName:        flowNode.Name,
				NodeGroupId:     execInput.NodeGroupId,
				InputParamsPath: ossResult.FileURL,
				Status:          node.NodeExecutionStatusRunning.Code(),
			})
			if err != nil {
				execInput.ExecutedNodes = append(execInput.ExecutedNodes, flowDto.ExecutedNode{
					NodeId: nodeID,
					Status: node.NodeExecutionStatusFailed.Code(),
				})
				return nil, err
			}
			realInput.NodeExecutionId = nodeExecutionId

			// Keep the node's own error in a dedicated variable: the upload
			// below must not overwrite it, otherwise a failed node would be
			// recorded as successful.
			_, execErr := lambda(ctx, realInput)
			durationMs := time.Since(startTime).Milliseconds()

			updateReq := &nodeDto.UpdateNodeExecutionReq{
				Id:         nodeExecutionId,
				DurationMs: durationMs,
			}

			// Snapshot after execution (realInput now carries the node's
			// results).
			// NOTE(review): the file name still says "nodeInput"; kept as is
			// in case something depends on it.
			ossResult1, uploadErr := Upload(ctx, &dto.UploadFileBytesReq{
				FileBytes: gconv.Bytes(gconv.String(realInput)),
				FileName:  fmt.Sprintf("nodeInput:%v.txt", time.Now().UnixMilli()),
			})
			if uploadErr == nil {
				updateReq.OutputParamsPath = ossResult1.FileURL
			}

			// The node's own failure takes precedence over an upload failure.
			failure := execErr
			if failure == nil {
				failure = uploadErr
			}
			if failure != nil {
				updateReq.Status = node.NodeExecutionStatusFailed.Code()
				updateReq.ErrorMessage = failure.Error()
				// Best effort: the failure itself is what the caller needs.
				_, _ = nodeDao.NodeExecutionDao.Update(ctx, updateReq)
				execInput.ExecutedNodes = append(execInput.ExecutedNodes, flowDto.ExecutedNode{
					NodeId: nodeID,
					Status: node.NodeExecutionStatusFailed.Code(),
				})
				return nil, failure
			}

			updateReq.Status = node.NodeExecutionStatusSuccess.Code()
			// Best effort: a bookkeeping failure does not fail the node.
			_, _ = nodeDao.NodeExecutionDao.Update(ctx, updateReq)
			execInput.ExecutedNodes = append(execInput.ExecutedNodes, flowDto.ExecutedNode{
				NodeId: nodeID,
				Status: node.NodeExecutionStatusSuccess.Code(),
			})
			// Pass the shared execution input on to the next node.
			return execInput, nil
		}
	}

	switch code {
	case "__start__":
		_ = graph.AddLambdaNode(nodeID, compose.InvokableLambda(wrapLambda(StartLambda)))
	case node.NodeTypeSystemSum:
		_ = graph.AddLambdaNode(nodeID, compose.InvokableLambda(wrapLambda(SummaryLambda)))
	case node.NodeTypeTextModel:
		_ = graph.AddLambdaNode(nodeID, compose.InvokableLambda(wrapLambda(TextModelLambda)))
	case node.NodeTypeImageModel:
		_ = graph.AddLambdaNode(nodeID, compose.InvokableLambda(wrapLambda(ImageModelLambda)))
	case node.NodeTypeVideoModel:
		_ = graph.AddLambdaNode(nodeID, compose.InvokableLambda(wrapLambda(VideoModelLambda)))
	case node.NodeTypeAudioModel:
		_ = graph.AddLambdaNode(nodeID, compose.InvokableLambda(wrapLambda(AudioModelLambda)))
	case node.NodeTypeBatchModel:
		_ = graph.AddLambdaNode(nodeID, compose.InvokableLambda(wrapLambda(BatchModelLambda)))
	case node.NodeTypeDataConversionModel:
		_ = graph.AddLambdaNode(nodeID, compose.InvokableLambda(wrapLambda(DataConversionLambda)))
	case node.NodeTypeCustomNode:
		_ = graph.AddLambdaNode(nodeID, compose.InvokableLambda(wrapLambda(CustomLambda)))
	case node.NodeTypeForm:
		_ = graph.AddLambdaNode(nodeID, compose.InvokableLambda(wrapLambda(FormLambda)))
	case node.NodeTypeIntent:
		_ = graph.AddLambdaNode(nodeID, compose.InvokableLambda(wrapLambda(IntentLambda)))
	case node.NodeTypeMerge:
		_ = graph.AddLambdaNode(nodeID, compose.InvokableLambda(wrapLambda(MergeLambda)))
	case node.NodeTypeDataMerge:
		// Data-merge nodes are registered with the AllPredecessor trigger mode.
		_ = graph.AddLambdaNode(nodeID, compose.InvokableLambda(wrapLambda(DataMergeLambda)), compose.WithGraphCompileOptions(compose.WithNodeTriggerMode(compose.AllPredecessor)))
	case node.NodeTypeHttp:
		_ = graph.AddLambdaNode(nodeID, compose.InvokableLambda(wrapLambda(HttpLambda)))
	}
}
// findEndNodes returns the IDs of all nodes reachable from startNodeId that
// have no outgoing edge, in breadth-first discovery order.
//
// The traversal follows edges From -> To and visits each node once, so cycles
// terminate and the result contains no duplicates. The order is deterministic
// for a given edge order. An empty startNodeId yields no end nodes. A start
// node without outgoing edges is its own end node.
func findEndNodes(startNodeId string, edges []entity.FlowEdge) []string {
	if startNodeId == "" {
		return nil
	}

	successors := make(map[string][]string, len(edges))
	for _, e := range edges {
		successors[e.From] = append(successors[e.From], e.To)
	}

	var endNodes []string
	visited := make(map[string]struct{})
	queue := []string{startNodeId}
	for len(queue) > 0 {
		current := queue[0]
		queue = queue[1:]
		if _, seen := visited[current]; seen {
			continue
		}
		visited[current] = struct{}{}

		next := successors[current]
		if len(next) == 0 {
			// No outgoing edge: this is a terminal node.
			endNodes = append(endNodes, current)
			continue
		}
		queue = append(queue, next...)
	}
	return endNodes
}