feat: 新增主动拉取与多类型回调功能

- 新增 ActivePull 实体、DAO、DTO 及 Service,支持主动拉取任务管理
- 新增 ComposeCallback、VideoCallback、HttpNodeCallback 多类型回调接口
- FlowExecution 增加 NodeGroupId 和 TotalTokens 字段,支持节点组追踪与 Token 统计
- ExecutedNodes 结构由字符串列表改为包含执行状态的节点对象列表
- 重构回调通知机制,统一 Notify 函数调用
- 优化输出项类型判断逻辑,新增文件类型标识
This commit is contained in:
2026-06-10 14:23:55 +08:00
parent ab3a2d967e
commit 03c95c3601
33 changed files with 3207 additions and 615 deletions

View File

@@ -5,8 +5,10 @@ import (
"ai-agent/workflow/consts/node"
fileDao "ai-agent/workflow/dao/file"
flowDao "ai-agent/workflow/dao/flow"
nodeDao "ai-agent/workflow/dao/node"
fileDto "ai-agent/workflow/model/dto/file"
flowDto "ai-agent/workflow/model/dto/flow"
nodeDto "ai-agent/workflow/model/dto/node"
"ai-agent/workflow/model/entity"
"context"
"errors"
@@ -15,12 +17,14 @@ import (
"strconv"
"strings"
"sync"
"time"
"gitea.com/red-future/common/utils"
"github.com/cloudwego/eino/compose"
"github.com/gogf/gf/v2/frame/g"
"github.com/gogf/gf/v2/os/gtime"
"github.com/gogf/gf/v2/util/gconv"
"github.com/google/uuid"
"go.opentelemetry.io/otel/trace"
)
@@ -121,22 +125,30 @@ func (s *flowExecutionService) List(ctx context.Context, req *flowDto.ListFlowEx
item := &tempItems[idx]
val := item.Content
suffix := "内容"
switch {
case strings.Contains(val, "img") || strings.Contains(val, "png") || strings.Contains(val, "jpg"):
ext := ""
ext = GetFileTypeByPath(val)
if ext == "image" {
suffix = "图片"
case strings.Contains(val, "html") || strings.Contains(val, "HTML"):
suffix = "HTML"
case strings.Contains(val, "inc") || len(val) > 50:
}
if ext == "video" {
suffix = "视频"
}
if ext == "audio" {
suffix = "音频"
}
if ext == "text" {
suffix = "文案"
}
if ext == "html" {
suffix = "HTML"
}
suffixCount[suffix]++
item.Type = ext
item.Label = fmt.Sprintf("%s_%d", suffix, suffixCount[suffix])
}
// 组装节点
node := flowDto.FlowNode{
flowNode := flowDto.FlowNode{
FlowName: displayFlowName,
Id: execution.Id,
SessionId: gconv.String(execution.SessionId),
@@ -147,7 +159,7 @@ func (s *flowExecutionService) List(ctx context.Context, req *flowDto.ListFlowEx
dateMap[createDate] = &[]flowWrap{}
}
*dateMap[createDate] = append(*dateMap[createDate], flowWrap{
flowNode: node,
flowNode: flowNode,
createdAt: execution.CreatedAt,
})
}
@@ -188,6 +200,12 @@ func (s *flowExecutionService) List(ctx context.Context, req *flowDto.ListFlowEx
}, nil
}
// ComposeCallback 提示词回调接口
func (s *flowExecutionService) ComposeCallback(ctx context.Context, req *flowDto.ComposeCallbackReq) (err error) {
Notify(req.TaskId, req)
return nil
}
// ModelCallback 模型回调接口
func (s *flowExecutionService) ModelCallback(ctx context.Context, req *flowDto.ModelCallbackReq) (err error) {
// 唤醒等待的任务
@@ -195,43 +213,19 @@ func (s *flowExecutionService) ModelCallback(ctx context.Context, req *flowDto.M
return nil
}
// 全局等待任务回调的工具
var (
asyncMu sync.Mutex
asyncTasks = make(map[string]chan any)
)
// Wait 阻塞等待回调结果
// 调用后会一直卡住,直到 Notify 唤醒 或 超时/取消
func Wait(ctx context.Context, taskId string) (any, error) {
asyncMu.Lock()
ch := make(chan any, 1)
asyncTasks[taskId] = ch
asyncMu.Unlock()
select {
case result := <-ch:
return result, nil
case <-ctx.Done():
asyncMu.Lock()
delete(asyncTasks, taskId)
asyncMu.Unlock()
return nil, ctx.Err()
}
// VideoCallback 视频拼接回调接口
func (s *flowExecutionService) VideoCallback(ctx context.Context, req *flowDto.VideoCallbackReq) (err error) {
// 唤醒等待的任务
Notify(req.TaskId, req)
return nil
}
// Notify 回调时调用,唤醒等待的任务
func Notify(taskId string, result any) {
asyncMu.Lock()
defer asyncMu.Unlock()
ch, exist := asyncTasks[taskId]
if !exist {
return
}
ch <- result
delete(asyncTasks, taskId)
// HttpNodeCallback http节点回调接口
func (s *flowExecutionService) HttpNodeCallback(ctx context.Context) (err error) {
r := g.RequestFromCtx(ctx)
taskId := r.Get("task_id").String()
Notify(taskId, r)
return nil
}
// ===================== 核心改造:替换为 sync.Map 存储取消上下文 =====================
@@ -298,11 +292,13 @@ func (s *flowExecutionService) Execute(ctx context.Context, req *flowDto.Execute
}
var executionId int64
var isDialogue bool
var nodeGroupId = uuid.NewString()
if flowInfo == nil {
isDialogue = false
var r = new(flowDto.CreateFlowExecutionReq)
r.FlowUserId = req.FlowId
r.FlowName = req.FlowName
r.NodeGroupId = nodeGroupId
r.TriggerType = flow.FlowExecutionTriggerTypeManual.Code()
r.FlowContent = req.FlowContent
r.NodeInputParams = req.NodeInputParams
@@ -327,9 +323,10 @@ func (s *flowExecutionService) Execute(ctx context.Context, req *flowDto.Execute
cancelMap.Store(traceId, cancel)
}
executionReq := flowDto.UpdateFlowExecutionReq{
Id: executionId,
Status: flow.FlowExecutionStatusRunning.Code(),
TraceId: traceId,
Id: executionId,
NodeGroupId: nodeGroupId,
Status: flow.FlowExecutionStatusRunning.Code(),
TraceId: traceId,
}
_, err = flowDao.FlowExecutionDao.Update(ctx, &executionReq)
if err != nil {
@@ -352,6 +349,7 @@ func (s *flowExecutionService) Execute(ctx context.Context, req *flowDto.Execute
}
if isDialogue && !g.IsEmpty(flowInfo) && !g.IsEmpty(req.ResultUrl) {
req.NodeGroupId = nodeGroupId
if strings.HasSuffix(gconv.String(req.ResultUrl), ".inc") {
err = TextModelSingleLambda(ctx, req, flowInfo)
return
@@ -440,6 +438,7 @@ func (s *flowExecutionService) Execute(ctx context.Context, req *flowDto.Execute
// ✅【第4步】构建全局执行入参现在 schemaMap 是有值的!)
// =========================================================================
execInput := &flowDto.FlowExecutionInput{
NodeGroupId: nodeGroupId,
IsDialogue: isDialogue,
ExecutionId: executionId,
ConfigMap: configMap,
@@ -476,6 +475,17 @@ func (s *flowExecutionService) Execute(ctx context.Context, req *flowDto.Execute
// BuildGraphFromFlowContent 根据前端保存的工作流JSON自动构建执行图
func BuildGraphFromFlowContent(ctx context.Context, flowContent *entity.FlowInfo, judge2IntentNodeMap map[string]string, summaryNodeID string) (compose.Runnable[any, any], error) {
// 注册自定义合并函数:处理 *flowDto.FlowExecutionInput 类型合并
// 由于 ConfigMap 是 map 引用类型,所有并行分支修改已经写入共享内存
// 直接返回第一个实例即可,所有修改都已经可见
compose.RegisterValuesMergeFunc(func(values []*flowDto.FlowExecutionInput) (*flowDto.FlowExecutionInput, error) {
if len(values) == 0 {
return nil, nil
}
// 返回第一个实例ConfigMap 是指针,所有修改都已经写入共享数据结构
return values[0], nil
})
graph := compose.NewGraph[any, any]()
nodeMap := make(map[string]entity.FlowNode)
@@ -582,7 +592,7 @@ func BuildGraphFromFlowContent(ctx context.Context, flowContent *entity.FlowInfo
}
_ = graph.AddEdge(summaryNodeID, compose.END)
return graph.Compile(ctx, compose.WithGraphName("auto_build_workflow"))
return graph.Compile(ctx, compose.WithGraphName("auto_build_workflow"), compose.WithNodeTriggerMode(compose.AllPredecessor))
}
// -------------------------- 节点自动注册器(核心分发) --------------------------
@@ -606,7 +616,7 @@ func registerNodeToGraph(graph *compose.Graph[any, any], flowNode entity.FlowNod
}
// 获取入参 - 适配切片类型:遍历所有来源节点
var realInput any
realInput := new(flowDto.NodeExecutionInput)
if len(flowNode.InputSource) > 0 { // 改为判断切片长度
// 遍历所有指定的来源节点,聚合输出结果
for _, inputSource := range flowNode.InputSource { // 遍历切片
@@ -621,19 +631,54 @@ func registerNodeToGraph(graph *compose.Graph[any, any], flowNode entity.FlowNod
Config: currentConfig,
Global: execInput, // ✅ 把【全部节点】的对象直接塞进来
}
// 执行节点
output, err := lambda(ctx, realInput)
// ✅ 插入节点执行记录,初始状态为运行中
startTime := time.Now()
nodeExecutionId, err := nodeDao.NodeExecutionDao.Insert(ctx, &nodeDto.CreateNodeExecutionReq{
FlowExecutionId: execInput.ExecutionId,
NodeId: nodeID,
NodeName: flowNode.Name,
NodeGroupId: execInput.NodeGroupId,
InputParams: realInput,
Status: node.NodeExecutionStatusRunning.Code(),
})
if err != nil {
// 记录失败到已执行列表
execInput.ExecutedNodes = append(execInput.ExecutedNodes, flowDto.ExecutedNode{
NodeId: nodeID,
Status: node.NodeExecutionStatusFailed.Code(),
})
return nil, err
}
// ✅ 自动把当前节点ID 加入已执行列表
execInput.ExecutedNodes = append(execInput.ExecutedNodes, nodeID)
// 输出存入 FlowNodeConfig
if outConfig, ok := output.(*entity.FlowNode); ok {
currentConfig.OutputResult = outConfig.OutputResult
realInput.NodeExecutionId = nodeExecutionId
// 执行节点
_, err = lambda(ctx, realInput)
durationMs := time.Since(startTime).Milliseconds()
updateReq := &nodeDto.UpdateNodeExecutionReq{
Id: nodeExecutionId,
InputParams: realInput,
DurationMs: durationMs,
}
if err != nil {
// 执行失败,更新状态
updateReq.Status = node.NodeExecutionStatusFailed.Code()
updateReq.ErrorMessage = err.Error()
_, _ = nodeDao.NodeExecutionDao.Update(ctx, updateReq)
// 记录失败到已执行列表
execInput.ExecutedNodes = append(execInput.ExecutedNodes, flowDto.ExecutedNode{
NodeId: nodeID,
Status: node.NodeExecutionStatusFailed.Code(),
})
return nil, err
}
// 执行成功,更新状态
updateReq.Status = node.NodeExecutionStatusSuccess.Code()
_, _ = nodeDao.NodeExecutionDao.Update(ctx, updateReq)
// 记录成功到已执行列表
execInput.ExecutedNodes = append(execInput.ExecutedNodes, flowDto.ExecutedNode{
NodeId: nodeID,
Status: node.NodeExecutionStatusSuccess.Code(),
})
// ✅ 关键:返回整个 execInput让下一个节点继续用
return execInput, nil
@@ -654,6 +699,16 @@ func registerNodeToGraph(graph *compose.Graph[any, any], flowNode entity.FlowNod
_ = graph.AddLambdaNode(nodeID, compose.InvokableLambda(wrapLambda(VideoModelLambda)))
case node.NodeTypeAudioModel:
_ = graph.AddLambdaNode(nodeID, compose.InvokableLambda(wrapLambda(AudioModelLambda)))
case node.NodeTypeBatchModel:
_ = graph.AddLambdaNode(nodeID, compose.InvokableLambda(wrapLambda(BatchModelLambda)))
case node.NodeTypeDataConversionModel:
_ = graph.AddLambdaNode(nodeID, compose.InvokableLambda(wrapLambda(DataConversionLambda)))
//case node.NodeTypeSenseOptimizeModel:
// _ = graph.AddLambdaNode(nodeID, compose.InvokableLambda(wrapLambda(SenseOptimizeModelLambda)))
//case node.NodeTypeStoryOptimizeModel:
// _ = graph.AddLambdaNode(nodeID, compose.InvokableLambda(wrapLambda(StoryOptimizeModelLambda)))
//case node.NodeTypeScriptOptimizeModel:
// _ = graph.AddLambdaNode(nodeID, compose.InvokableLambda(wrapLambda(ScriptOptimizeModelLambda)))
case node.NodeTypeCustomNode:
_ = graph.AddLambdaNode(nodeID, compose.InvokableLambda(wrapLambda(CustomLambda)))
case node.NodeTypeForm:
@@ -662,6 +717,10 @@ func registerNodeToGraph(graph *compose.Graph[any, any], flowNode entity.FlowNod
_ = graph.AddLambdaNode(nodeID, compose.InvokableLambda(wrapLambda(IntentLambda)))
case node.NodeTypeMerge:
_ = graph.AddLambdaNode(nodeID, compose.InvokableLambda(wrapLambda(MergeLambda)))
case node.NodeTypeDataMerge:
_ = graph.AddLambdaNode(nodeID, compose.InvokableLambda(wrapLambda(DataMergeLambda)))
case node.NodeTypeHttp:
_ = graph.AddLambdaNode(nodeID, compose.InvokableLambda(wrapLambda(HttpLambda)))
}
}