2026-05-12 13:34:28 +08:00
|
|
|
|
package flow
|
|
|
|
|
|
|
|
|
|
|
|
import (
|
|
|
|
|
|
"ai-agent/workflow/consts/flow"
|
|
|
|
|
|
"ai-agent/workflow/consts/node"
|
2026-05-15 09:37:23 +08:00
|
|
|
|
fileDao "ai-agent/workflow/dao/file"
|
2026-05-12 13:34:28 +08:00
|
|
|
|
flowDao "ai-agent/workflow/dao/flow"
|
2026-05-15 09:37:23 +08:00
|
|
|
|
fileDto "ai-agent/workflow/model/dto/file"
|
2026-05-12 13:34:28 +08:00
|
|
|
|
flowDto "ai-agent/workflow/model/dto/flow"
|
|
|
|
|
|
"ai-agent/workflow/model/entity"
|
|
|
|
|
|
"context"
|
2026-05-15 09:37:23 +08:00
|
|
|
|
"errors"
|
2026-05-12 13:34:28 +08:00
|
|
|
|
"fmt"
|
|
|
|
|
|
"sort"
|
|
|
|
|
|
"strconv"
|
|
|
|
|
|
"strings"
|
|
|
|
|
|
"sync"
|
|
|
|
|
|
|
|
|
|
|
|
"gitea.com/red-future/common/utils"
|
|
|
|
|
|
"github.com/cloudwego/eino/compose"
|
|
|
|
|
|
"github.com/gogf/gf/v2/frame/g"
|
|
|
|
|
|
"github.com/gogf/gf/v2/os/gtime"
|
|
|
|
|
|
"github.com/gogf/gf/v2/util/gconv"
|
|
|
|
|
|
"go.opentelemetry.io/otel/trace"
|
|
|
|
|
|
)
|
|
|
|
|
|
|
|
|
|
|
|
var FlowExecutionService = &flowExecutionService{}
|
|
|
|
|
|
|
|
|
|
|
|
type flowExecutionService struct{}
|
|
|
|
|
|
|
2026-05-12 14:31:51 +08:00
|
|
|
|
func (s *flowExecutionService) Get(ctx context.Context, req *flowDto.GetFlowExecutionReq) (res *flowDto.VOFlowExecution, err error) {
|
|
|
|
|
|
r, err := flowDao.FlowExecutionDao.Get(ctx, req)
|
|
|
|
|
|
if err != nil {
|
|
|
|
|
|
return nil, err
|
|
|
|
|
|
}
|
|
|
|
|
|
res = new(flowDto.VOFlowExecution)
|
2026-05-15 09:37:23 +08:00
|
|
|
|
res.ImgAddressPrefix, err = utils.GetFileAddressPrefix(ctx)
|
|
|
|
|
|
if err != nil {
|
|
|
|
|
|
return nil, err
|
|
|
|
|
|
}
|
2026-05-12 14:31:51 +08:00
|
|
|
|
err = gconv.Struct(r, &res)
|
|
|
|
|
|
return res, err
|
|
|
|
|
|
}
|
|
|
|
|
|
|
2026-05-12 13:34:28 +08:00
|
|
|
|
func (s *flowExecutionService) List(ctx context.Context, req *flowDto.ListFlowExecutionReq) (res *flowDto.ListFlowExecutionTreeRes, err error) {
|
|
|
|
|
|
user, err := utils.GetUserInfo(ctx)
|
|
|
|
|
|
if err != nil {
|
|
|
|
|
|
return
|
|
|
|
|
|
}
|
|
|
|
|
|
req.Creator = user.UserName
|
|
|
|
|
|
list, _, err := flowDao.FlowExecutionDao.List(ctx, req)
|
|
|
|
|
|
if err != nil {
|
|
|
|
|
|
return nil, err
|
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
// ===================== 核心修复:只统计【有数据】的执行记录,空的直接跳过 =====================
|
|
|
|
|
|
|
|
|
|
|
|
executionNumber := make(map[int64]int) // executionId -> 倒序编号(最新=1)
|
|
|
|
|
|
|
|
|
|
|
|
// 第一次遍历:只处理【有输出参数】的记录,统计并分配编号
|
|
|
|
|
|
var validList []*entity.FlowExecution // 只存有效(非空)记录
|
|
|
|
|
|
for _, execution := range list {
|
|
|
|
|
|
if g.IsEmpty(execution.OutputParams) {
|
|
|
|
|
|
continue // 空数据直接过滤,不参与编号、不展示
|
|
|
|
|
|
}
|
|
|
|
|
|
validList = append(validList, execution)
|
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
// 给有效记录分配【时间倒序编号】(最新=1)
|
|
|
|
|
|
totalValid := len(validList)
|
|
|
|
|
|
for idx, execution := range validList {
|
|
|
|
|
|
executionNumber[execution.Id] = totalValid - idx
|
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
// 2. 分组映射:日期 -> 流程节点
|
|
|
|
|
|
type flowWrap struct {
|
|
|
|
|
|
flowNode flowDto.FlowNode
|
|
|
|
|
|
createdAt *gtime.Time
|
|
|
|
|
|
}
|
|
|
|
|
|
dateMap := make(map[string]*[]flowWrap)
|
|
|
|
|
|
|
|
|
|
|
|
// 遍历【有效数据】构建结构
|
|
|
|
|
|
for _, execution := range validList {
|
|
|
|
|
|
createDate := execution.CreatedAt.Format("Y-m-d")
|
|
|
|
|
|
flowName := execution.FlowName
|
|
|
|
|
|
outputParams := execution.OutputParams
|
|
|
|
|
|
|
|
|
|
|
|
// 编号只算有效数据,不会把空的算进去
|
|
|
|
|
|
num := executionNumber[execution.Id]
|
|
|
|
|
|
displayFlowName := fmt.Sprintf("会话-%d(%s)", num, flowName)
|
|
|
|
|
|
|
|
|
|
|
|
// 3. 解析 outputParams
|
|
|
|
|
|
var tempItems []flowDto.OutputItem
|
|
|
|
|
|
for _, paramMap := range outputParams {
|
|
|
|
|
|
for tsKey, value := range paramMap {
|
|
|
|
|
|
if _, err := strconv.ParseInt(tsKey, 10, 64); err != nil {
|
|
|
|
|
|
continue
|
|
|
|
|
|
}
|
|
|
|
|
|
tempItems = append(tempItems, flowDto.OutputItem{
|
|
|
|
|
|
Timestamp: tsKey,
|
|
|
|
|
|
Content: gconv.String(value),
|
|
|
|
|
|
})
|
|
|
|
|
|
}
|
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
// ===================== 修复1:如果解析后依然为空,直接跳过,不生成第二层节点 =====================
|
|
|
|
|
|
if len(tempItems) == 0 {
|
|
|
|
|
|
continue
|
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
// 时间戳正序
|
|
|
|
|
|
sort.Slice(tempItems, func(i, j int) bool {
|
|
|
|
|
|
t1, _ := strconv.ParseInt(tempItems[i].Timestamp, 10, 64)
|
|
|
|
|
|
t2, _ := strconv.ParseInt(tempItems[j].Timestamp, 10, 64)
|
|
|
|
|
|
return t1 < t2
|
|
|
|
|
|
})
|
|
|
|
|
|
|
|
|
|
|
|
// 标号:相同类型递增,不同重置
|
|
|
|
|
|
suffixCount := make(map[string]int)
|
|
|
|
|
|
for idx := range tempItems {
|
|
|
|
|
|
item := &tempItems[idx]
|
|
|
|
|
|
val := item.Content
|
|
|
|
|
|
suffix := "内容"
|
|
|
|
|
|
|
|
|
|
|
|
switch {
|
|
|
|
|
|
case strings.Contains(val, "img") || strings.Contains(val, "png") || strings.Contains(val, "jpg"):
|
|
|
|
|
|
suffix = "图片"
|
|
|
|
|
|
case strings.Contains(val, "html") || strings.Contains(val, "HTML"):
|
|
|
|
|
|
suffix = "HTML"
|
|
|
|
|
|
case strings.Contains(val, "txt") || len(val) > 50:
|
|
|
|
|
|
suffix = "文案"
|
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
suffixCount[suffix]++
|
|
|
|
|
|
item.Label = fmt.Sprintf("%s_%d", suffix, suffixCount[suffix])
|
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
// 组装节点
|
|
|
|
|
|
node := flowDto.FlowNode{
|
|
|
|
|
|
FlowName: displayFlowName,
|
2026-05-12 14:31:51 +08:00
|
|
|
|
Id: execution.Id,
|
2026-05-12 13:34:28 +08:00
|
|
|
|
SessionId: gconv.String(execution.SessionId),
|
|
|
|
|
|
Items: tempItems,
|
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
if dateMap[createDate] == nil {
|
|
|
|
|
|
dateMap[createDate] = &[]flowWrap{}
|
|
|
|
|
|
}
|
|
|
|
|
|
*dateMap[createDate] = append(*dateMap[createDate], flowWrap{
|
|
|
|
|
|
flowNode: node,
|
|
|
|
|
|
createdAt: execution.CreatedAt,
|
|
|
|
|
|
})
|
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
// 6. 构建树 + 排序
|
|
|
|
|
|
var tree []flowDto.DateNode
|
|
|
|
|
|
for date, wraps := range dateMap {
|
|
|
|
|
|
// 第二层按创建时间倒序(最新在前)
|
|
|
|
|
|
sort.Slice(*wraps, func(i, j int) bool {
|
|
|
|
|
|
return (*wraps)[i].createdAt.After((*wraps)[j].createdAt)
|
|
|
|
|
|
})
|
|
|
|
|
|
|
|
|
|
|
|
var flowNodes []flowDto.FlowNode
|
|
|
|
|
|
for _, w := range *wraps {
|
|
|
|
|
|
flowNodes = append(flowNodes, w.flowNode)
|
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
// ===================== 修复2:日期下没有流程,也过滤掉 =====================
|
|
|
|
|
|
if len(flowNodes) == 0 {
|
|
|
|
|
|
continue
|
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
tree = append(tree, flowDto.DateNode{
|
|
|
|
|
|
CreateDate: date,
|
|
|
|
|
|
Flows: flowNodes,
|
|
|
|
|
|
})
|
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
// 第一层日期倒序
|
|
|
|
|
|
sort.Slice(tree, func(i, j int) bool {
|
|
|
|
|
|
return tree[i].CreateDate > tree[j].CreateDate
|
|
|
|
|
|
})
|
|
|
|
|
|
|
|
|
|
|
|
imgPrefix, err := utils.GetFileAddressPrefix(ctx)
|
|
|
|
|
|
return &flowDto.ListFlowExecutionTreeRes{
|
|
|
|
|
|
Tree: tree,
|
|
|
|
|
|
ImgAddressPrefix: imgPrefix,
|
|
|
|
|
|
}, nil
|
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
// ModelCallback 模型回调接口
|
|
|
|
|
|
func (s *flowExecutionService) ModelCallback(ctx context.Context, req *flowDto.ModelCallbackReq) (err error) {
|
|
|
|
|
|
// 唤醒等待的任务
|
|
|
|
|
|
Notify(req.TaskId, req)
|
|
|
|
|
|
return nil
|
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
// 全局等待任务回调的工具
|
|
|
|
|
|
var (
|
|
|
|
|
|
asyncMu sync.Mutex
|
|
|
|
|
|
asyncTasks = make(map[string]chan any)
|
|
|
|
|
|
)
|
|
|
|
|
|
|
|
|
|
|
|
// Wait 阻塞等待回调结果
|
|
|
|
|
|
// 调用后会一直卡住,直到 Notify 唤醒 或 超时/取消
|
|
|
|
|
|
func Wait(ctx context.Context, taskId string) (any, error) {
|
|
|
|
|
|
asyncMu.Lock()
|
|
|
|
|
|
ch := make(chan any, 1)
|
|
|
|
|
|
asyncTasks[taskId] = ch
|
|
|
|
|
|
asyncMu.Unlock()
|
|
|
|
|
|
|
|
|
|
|
|
select {
|
|
|
|
|
|
case result := <-ch:
|
|
|
|
|
|
return result, nil
|
|
|
|
|
|
case <-ctx.Done():
|
|
|
|
|
|
asyncMu.Lock()
|
|
|
|
|
|
delete(asyncTasks, taskId)
|
|
|
|
|
|
asyncMu.Unlock()
|
|
|
|
|
|
return nil, ctx.Err()
|
|
|
|
|
|
}
|
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
// Notify 回调时调用,唤醒等待的任务
|
|
|
|
|
|
func Notify(taskId string, result any) {
|
|
|
|
|
|
asyncMu.Lock()
|
|
|
|
|
|
defer asyncMu.Unlock()
|
|
|
|
|
|
|
|
|
|
|
|
ch, exist := asyncTasks[taskId]
|
|
|
|
|
|
if !exist {
|
|
|
|
|
|
return
|
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
ch <- result
|
|
|
|
|
|
delete(asyncTasks, taskId)
|
|
|
|
|
|
}
|
|
|
|
|
|
|
2026-05-15 09:37:23 +08:00
|
|
|
|
// ===================== 核心改造:替换为 sync.Map 存储取消上下文 =====================
|
|
|
|
|
|
var (
|
|
|
|
|
|
// cancelMap: traceID -> context.CancelFunc
|
|
|
|
|
|
cancelMap sync.Map
|
|
|
|
|
|
)
|
|
|
|
|
|
|
|
|
|
|
|
func (s *flowExecutionService) Cancel(ctx context.Context, req *flowDto.CancelReq) (err error) {
|
|
|
|
|
|
getRes, err := flowDao.FlowExecutionDao.Get(ctx, &flowDto.GetFlowExecutionReq{
|
|
|
|
|
|
SessionId: req.SessionId,
|
|
|
|
|
|
})
|
|
|
|
|
|
if err != nil {
|
|
|
|
|
|
return err
|
|
|
|
|
|
}
|
|
|
|
|
|
if g.IsEmpty(getRes) {
|
|
|
|
|
|
return fmt.Errorf("会话[%s] 不存在", req.SessionId)
|
|
|
|
|
|
}
|
|
|
|
|
|
// 从 sync.Map 获取取消函数
|
|
|
|
|
|
cancelVal, exist := cancelMap.Load(getRes.TraceId)
|
|
|
|
|
|
if !exist {
|
|
|
|
|
|
return fmt.Errorf("traceID[%s] 不存在或已执行完成", getRes.TraceId)
|
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
// 执行取消
|
|
|
|
|
|
cancel, ok := cancelVal.(context.CancelFunc)
|
|
|
|
|
|
if !ok {
|
|
|
|
|
|
return fmt.Errorf("traceID[%s] 对应的取消函数类型错误", getRes.TraceId)
|
|
|
|
|
|
}
|
|
|
|
|
|
cancel()
|
|
|
|
|
|
|
|
|
|
|
|
// 取消后清理(可选:也可以在流程结束时统一清理)
|
|
|
|
|
|
cancelMap.Delete(getRes.TraceId)
|
|
|
|
|
|
|
|
|
|
|
|
// 同步更新流程执行状态为已取消
|
|
|
|
|
|
_, err = flowDao.FlowExecutionDao.Update(ctx, &flowDto.UpdateFlowExecutionReq{
|
|
|
|
|
|
Id: getRes.Id,
|
|
|
|
|
|
Status: flow.FlowExecutionStatusCancel.Code(),
|
|
|
|
|
|
})
|
|
|
|
|
|
if err != nil {
|
|
|
|
|
|
return fmt.Errorf("更新取消状态失败: %v", err)
|
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
return nil
|
|
|
|
|
|
}
|
2026-05-12 13:34:28 +08:00
|
|
|
|
|
|
|
|
|
|
func (s *flowExecutionService) Execute(ctx context.Context, req *flowDto.ExecuteReq) (res *flowDto.ExecuteRes, err error) {
|
2026-05-15 09:37:23 +08:00
|
|
|
|
// ===================== 核心改造1:创建可取消的上下文 =====================
|
|
|
|
|
|
execCtx, cancel := context.WithCancel(ctx)
|
|
|
|
|
|
traceId := ""
|
|
|
|
|
|
defer func() {
|
|
|
|
|
|
// 流程结束(成功/失败)时清理 cancelMap
|
|
|
|
|
|
if traceId != "" {
|
|
|
|
|
|
cancelMap.Delete(traceId)
|
|
|
|
|
|
}
|
|
|
|
|
|
cancel()
|
|
|
|
|
|
}()
|
2026-05-12 13:34:28 +08:00
|
|
|
|
|
|
|
|
|
|
flowInfo, err := flowDao.FlowExecutionDao.Get(ctx, &flowDto.GetFlowExecutionReq{
|
|
|
|
|
|
SessionId: req.SessionId,
|
|
|
|
|
|
})
|
|
|
|
|
|
if err != nil {
|
|
|
|
|
|
return
|
|
|
|
|
|
}
|
|
|
|
|
|
var executionId int64
|
2026-05-15 09:37:23 +08:00
|
|
|
|
var isDialogue bool
|
2026-05-12 13:34:28 +08:00
|
|
|
|
if flowInfo == nil {
|
2026-05-15 09:37:23 +08:00
|
|
|
|
isDialogue = false
|
2026-05-12 13:34:28 +08:00
|
|
|
|
var r = new(flowDto.CreateFlowExecutionReq)
|
|
|
|
|
|
r.FlowUserId = req.FlowId
|
|
|
|
|
|
r.FlowName = req.FlowName
|
|
|
|
|
|
r.TriggerType = flow.FlowExecutionTriggerTypeManual.Code()
|
|
|
|
|
|
r.FlowContent = req.FlowContent
|
|
|
|
|
|
r.NodeInputParams = req.NodeInputParams
|
|
|
|
|
|
r.SessionId = req.SessionId
|
|
|
|
|
|
r.Status = flow.FlowExecutionStatusRunning.Code()
|
|
|
|
|
|
span := trace.SpanFromContext(ctx)
|
|
|
|
|
|
if span != nil && span.SpanContext().HasTraceID() {
|
|
|
|
|
|
r.TraceId = span.SpanContext().TraceID().String()
|
2026-05-15 09:37:23 +08:00
|
|
|
|
traceId = r.TraceId
|
|
|
|
|
|
cancelMap.Store(traceId, cancel)
|
2026-05-12 13:34:28 +08:00
|
|
|
|
}
|
|
|
|
|
|
executionId, err = flowDao.FlowExecutionDao.Insert(ctx, r)
|
|
|
|
|
|
if err != nil {
|
|
|
|
|
|
return
|
|
|
|
|
|
}
|
|
|
|
|
|
} else {
|
2026-05-15 09:37:23 +08:00
|
|
|
|
isDialogue = true
|
2026-05-12 13:34:28 +08:00
|
|
|
|
executionId = flowInfo.Id
|
2026-05-15 09:37:23 +08:00
|
|
|
|
span := trace.SpanFromContext(ctx)
|
|
|
|
|
|
if span != nil && span.SpanContext().HasTraceID() {
|
|
|
|
|
|
traceId = span.SpanContext().TraceID().String()
|
|
|
|
|
|
cancelMap.Store(traceId, cancel)
|
|
|
|
|
|
}
|
|
|
|
|
|
executionReq := flowDto.UpdateFlowExecutionReq{
|
|
|
|
|
|
Id: executionId,
|
|
|
|
|
|
Status: flow.FlowExecutionStatusRunning.Code(),
|
|
|
|
|
|
TraceId: traceId,
|
|
|
|
|
|
}
|
|
|
|
|
|
_, err = flowDao.FlowExecutionDao.Update(ctx, &executionReq)
|
|
|
|
|
|
if err != nil {
|
|
|
|
|
|
return
|
|
|
|
|
|
}
|
2026-05-12 13:34:28 +08:00
|
|
|
|
}
|
|
|
|
|
|
|
2026-05-15 09:37:23 +08:00
|
|
|
|
if !g.IsEmpty(req.FileUrl) {
|
|
|
|
|
|
createFileTempReq := make([]*fileDto.CreateFileTempReq, 0, len(req.FileUrl))
|
|
|
|
|
|
for _, fileUrl := range req.FileUrl {
|
|
|
|
|
|
var createReq = new(fileDto.CreateFileTempReq)
|
|
|
|
|
|
createReq.BusinessId = req.SessionId
|
|
|
|
|
|
createReq.FileUrl = fileUrl
|
|
|
|
|
|
createFileTempReq = append(createFileTempReq, createReq)
|
|
|
|
|
|
}
|
|
|
|
|
|
_, err = fileDao.FileTempDao.BatchInsert(ctx, createFileTempReq)
|
|
|
|
|
|
if err != nil {
|
|
|
|
|
|
return nil, err
|
|
|
|
|
|
}
|
2026-05-12 13:34:28 +08:00
|
|
|
|
}
|
|
|
|
|
|
|
2026-05-15 09:37:23 +08:00
|
|
|
|
//_, err = flowDao.FlowUserDao.Update(ctx, &flowDto.UpdateFlowUserReq{
|
|
|
|
|
|
// Id: req.FlowId,
|
|
|
|
|
|
// FlowContent: req.FlowContent,
|
|
|
|
|
|
// NodeInputParams: req.NodeInputParams,
|
|
|
|
|
|
//})
|
|
|
|
|
|
//if err != nil {
|
|
|
|
|
|
// return nil, err
|
|
|
|
|
|
//}
|
|
|
|
|
|
|
2026-05-12 13:34:28 +08:00
|
|
|
|
//nodeInsert := make([]*nodeDto.CreateNodeExecutionReq, 0, len(flowInfo.NodeInputParams))
|
|
|
|
|
|
//for _, flowNode := range flowInfo.NodeInputParams {
|
|
|
|
|
|
// nodeInsert = append(nodeInsert, &nodeDto.CreateNodeExecutionReq{
|
|
|
|
|
|
// FlowExecutionId: executionId,
|
|
|
|
|
|
// NodeId: flowNode.Id,
|
|
|
|
|
|
// Status: node.NodeExecutionStatusWait.Code(),
|
|
|
|
|
|
// NodeInputParams: flowNode,
|
|
|
|
|
|
// TraceId: r.TraceId,
|
|
|
|
|
|
// })
|
|
|
|
|
|
//}
|
|
|
|
|
|
//_, err = nodeDao.NodeExecutionDao.BatchInsert(ctx, nodeInsert)
|
|
|
|
|
|
//if err != nil {
|
|
|
|
|
|
// return
|
|
|
|
|
|
//}
|
|
|
|
|
|
|
|
|
|
|
|
// =========================================================================
|
|
|
|
|
|
// ✅【第1步】给所有判断节点自动生成意图识别节点
|
|
|
|
|
|
// =========================================================================
|
|
|
|
|
|
judge2IntentNodeMap := make(map[string]string)
|
|
|
|
|
|
finalNodes := make([]entity.FlowNode, 0, len(req.FlowContent.Nodes)*2)
|
|
|
|
|
|
for _, item := range req.FlowContent.Nodes {
|
|
|
|
|
|
finalNodes = append(finalNodes, item)
|
|
|
|
|
|
// 判断节点自动加 intent 节点
|
|
|
|
|
|
if item.NodeCode == node.NodeTypeJudge {
|
|
|
|
|
|
intentNodeID := fmt.Sprintf("intent_%s", item.Id)
|
|
|
|
|
|
intentNode := entity.FlowNode{
|
|
|
|
|
|
Id: intentNodeID,
|
|
|
|
|
|
NodeCode: node.NodeTypeIntent,
|
|
|
|
|
|
Name: fmt.Sprintf("意图识别-%s", item.Name),
|
|
|
|
|
|
InputSource: item.InputSource, // ✅ 正确赋值
|
|
|
|
|
|
FormConfig: item.FormConfig, // ✅ 用户配置
|
|
|
|
|
|
ModelConfig: item.ModelConfig, // ✅ 系统配置
|
|
|
|
|
|
}
|
|
|
|
|
|
finalNodes = append(finalNodes, intentNode)
|
|
|
|
|
|
judge2IntentNodeMap[item.Id] = intentNodeID
|
|
|
|
|
|
}
|
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
summaryNodeID := "summary_node"
|
|
|
|
|
|
summaryNode := entity.FlowNode{
|
|
|
|
|
|
Id: summaryNodeID,
|
|
|
|
|
|
NodeCode: node.NodeTypeCustomNode, // 复用自定义节点类型,也可新增专属类型
|
|
|
|
|
|
Name: "结果汇总节点",
|
|
|
|
|
|
InputSource: []entity.FlowNodeInputSource{}, // 后续自动聚合所有节点输出
|
|
|
|
|
|
FormConfig: nil,
|
|
|
|
|
|
ModelConfig: node.ModelItem{},
|
|
|
|
|
|
}
|
|
|
|
|
|
finalNodes = append(finalNodes, summaryNode)
|
|
|
|
|
|
|
|
|
|
|
|
// 替换节点列表
|
|
|
|
|
|
req.FlowContent.Nodes = finalNodes
|
|
|
|
|
|
|
|
|
|
|
|
// =========================================================================
|
|
|
|
|
|
// ✅【第2步】构建执行图
|
|
|
|
|
|
// =========================================================================
|
2026-05-15 09:37:23 +08:00
|
|
|
|
runGraph, err := BuildGraphFromFlowContent(execCtx, req.FlowContent, judge2IntentNodeMap, summaryNodeID)
|
2026-05-12 13:34:28 +08:00
|
|
|
|
if err != nil {
|
|
|
|
|
|
executionReq := flowDto.UpdateFlowExecutionReq{
|
2026-05-15 09:37:23 +08:00
|
|
|
|
Id: executionId,
|
|
|
|
|
|
Status: flow.FlowExecutionStatusFailed.Code(),
|
|
|
|
|
|
ErrorMessage: err.Error(),
|
2026-05-12 13:34:28 +08:00
|
|
|
|
}
|
|
|
|
|
|
_, err1 := flowDao.FlowExecutionDao.Update(ctx, &executionReq)
|
|
|
|
|
|
if err1 != nil {
|
|
|
|
|
|
return
|
|
|
|
|
|
}
|
2026-05-15 09:37:23 +08:00
|
|
|
|
return nil, fmt.Errorf("执行工作流失败: %v", err)
|
2026-05-12 13:34:28 +08:00
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
// =========================================================================
|
|
|
|
|
|
// ✅【第3步】构建 ConfigMap
|
|
|
|
|
|
// =========================================================================
|
|
|
|
|
|
configMap := make(map[string]*entity.FlowNode)
|
|
|
|
|
|
for _, cfg := range req.NodeInputParams {
|
|
|
|
|
|
configMap[cfg.Id] = cfg
|
|
|
|
|
|
}
|
|
|
|
|
|
// 自动给意图节点复制配置
|
|
|
|
|
|
for judgeID, intentID := range judge2IntentNodeMap {
|
|
|
|
|
|
if cfg, ok := configMap[judgeID]; ok {
|
|
|
|
|
|
configMap[intentID] = cfg
|
|
|
|
|
|
}
|
|
|
|
|
|
}
|
|
|
|
|
|
// 初始化汇总节点配置
|
|
|
|
|
|
configMap[summaryNodeID] = &summaryNode
|
|
|
|
|
|
|
|
|
|
|
|
// =========================================================================
|
|
|
|
|
|
// ✅【第4步】构建全局执行入参(现在 schemaMap 是有值的!)
|
|
|
|
|
|
// =========================================================================
|
|
|
|
|
|
execInput := &flowDto.FlowExecutionInput{
|
2026-05-15 09:37:23 +08:00
|
|
|
|
IsDialogue: isDialogue,
|
2026-05-12 13:34:28 +08:00
|
|
|
|
ExecutionId: executionId,
|
|
|
|
|
|
ConfigMap: configMap,
|
|
|
|
|
|
SessionId: req.SessionId,
|
|
|
|
|
|
Desc: req.Desc,
|
|
|
|
|
|
SkillName: req.SkillName,
|
|
|
|
|
|
FileUrl: req.FileUrl,
|
|
|
|
|
|
}
|
|
|
|
|
|
// 执行工作流
|
2026-05-15 09:37:23 +08:00
|
|
|
|
_, err = runGraph.Invoke(execCtx, execInput)
|
2026-05-12 13:34:28 +08:00
|
|
|
|
if err != nil {
|
2026-05-15 09:37:23 +08:00
|
|
|
|
// 检测是否是取消导致的错误
|
|
|
|
|
|
if errors.Is(execCtx.Err(), context.Canceled) {
|
|
|
|
|
|
executionReq := flowDto.UpdateFlowExecutionReq{
|
|
|
|
|
|
Id: executionId,
|
|
|
|
|
|
Status: flow.FlowExecutionStatusCancel.Code(),
|
|
|
|
|
|
}
|
|
|
|
|
|
_, _ = flowDao.FlowExecutionDao.Update(ctx, &executionReq)
|
|
|
|
|
|
return nil, fmt.Errorf("工作流已被取消: %v", err)
|
|
|
|
|
|
}
|
2026-05-12 13:34:28 +08:00
|
|
|
|
executionReq := flowDto.UpdateFlowExecutionReq{
|
2026-05-15 09:37:23 +08:00
|
|
|
|
Id: executionId,
|
|
|
|
|
|
Status: flow.FlowExecutionStatusFailed.Code(),
|
|
|
|
|
|
ErrorMessage: err.Error(),
|
2026-05-12 13:34:28 +08:00
|
|
|
|
}
|
|
|
|
|
|
_, err1 := flowDao.FlowExecutionDao.Update(ctx, &executionReq)
|
|
|
|
|
|
if err1 != nil {
|
|
|
|
|
|
return
|
|
|
|
|
|
}
|
2026-05-15 09:37:23 +08:00
|
|
|
|
return nil, fmt.Errorf("执行工作流失败: %v", err)
|
2026-05-12 13:34:28 +08:00
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
return
|
|
|
|
|
|
|
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
// BuildGraphFromFlowContent 根据前端保存的工作流JSON,自动构建执行图
|
|
|
|
|
|
func BuildGraphFromFlowContent(ctx context.Context, flowContent *entity.FlowInfo, judge2IntentNodeMap map[string]string, summaryNodeID string) (compose.Runnable[any, any], error) {
|
|
|
|
|
|
graph := compose.NewGraph[any, any]()
|
|
|
|
|
|
nodeMap := make(map[string]entity.FlowNode)
|
|
|
|
|
|
|
|
|
|
|
|
// 注册所有节点
|
|
|
|
|
|
for _, item := range flowContent.Nodes {
|
|
|
|
|
|
nodeMap[item.Id] = item
|
|
|
|
|
|
if item.NodeCode != node.NodeTypeJudge {
|
|
|
|
|
|
registerNodeToGraph(graph, item)
|
|
|
|
|
|
}
|
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
// 构建边关系
|
|
|
|
|
|
upstreamMap := make(map[string][]string)
|
|
|
|
|
|
edgeMap := make(map[string][]entity.FlowEdge)
|
|
|
|
|
|
for _, edge := range flowContent.Edges {
|
|
|
|
|
|
edgeMap[edge.From] = append(edgeMap[edge.From], edge)
|
|
|
|
|
|
upstreamMap[edge.To] = append(upstreamMap[edge.To], edge.From)
|
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
// 处理连线 & 分支
|
|
|
|
|
|
for fromNodeID, edges := range edgeMap {
|
|
|
|
|
|
fromNode := nodeMap[fromNodeID]
|
|
|
|
|
|
|
|
|
|
|
|
// --------------------------
|
|
|
|
|
|
// 判断节点 → 分支处理
|
|
|
|
|
|
// --------------------------
|
|
|
|
|
|
if fromNode.NodeCode == node.NodeTypeJudge {
|
|
|
|
|
|
intentNodeID, ok := judge2IntentNodeMap[fromNodeID]
|
|
|
|
|
|
if !ok {
|
|
|
|
|
|
return nil, fmt.Errorf("判断节点[%s]未生成意图节点", fromNodeID)
|
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
branchMap := make(map[string]bool)
|
|
|
|
|
|
for _, e := range edges {
|
|
|
|
|
|
branchMap[e.To] = true
|
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
judgeLambda := func(ctx context.Context, input any) (string, error) {
|
|
|
|
|
|
execInput, ok := input.(*flowDto.FlowExecutionInput)
|
|
|
|
|
|
if !ok {
|
|
|
|
|
|
return "", fmt.Errorf("入参类型错误")
|
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
currentConfig := execInput.ConfigMap[fromNodeID]
|
|
|
|
|
|
if currentConfig == nil {
|
|
|
|
|
|
return "", fmt.Errorf("判断节点%s无配置", fromNodeID)
|
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
branchIdNameMap := make(map[string]string)
|
|
|
|
|
|
var branchIDs []string
|
|
|
|
|
|
for nodeID := range branchMap {
|
|
|
|
|
|
branchIDs = append(branchIDs, nodeID)
|
|
|
|
|
|
// 从configMap获取分支节点的名称
|
|
|
|
|
|
if branchNodeCfg, ok := execInput.ConfigMap[nodeID]; ok {
|
|
|
|
|
|
branchIdNameMap[nodeID] = branchNodeCfg.Name
|
|
|
|
|
|
} else {
|
|
|
|
|
|
branchIdNameMap[nodeID] = "未命名节点" // 兜底
|
|
|
|
|
|
}
|
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
// 把分支ID-名称映射塞进 ModelConfig,带给意图节点
|
|
|
|
|
|
m := make(map[string]interface{})
|
|
|
|
|
|
m["branch_ids"] = branchIDs
|
|
|
|
|
|
m["branch_id_name_map"] = branchIdNameMap // 传递ID-名称映射
|
|
|
|
|
|
currentConfig.Config = m
|
|
|
|
|
|
|
|
|
|
|
|
// 从意图节点取输出
|
|
|
|
|
|
if intentCfg, ok := execInput.ConfigMap[intentNodeID]; ok {
|
|
|
|
|
|
currentConfig.OutputResult = intentCfg.OutputResult
|
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
// 关键修改:构造 NodeExecutionInput 传入 JudgeLambda
|
|
|
|
|
|
nodeExecInput := &flowDto.NodeExecutionInput{
|
|
|
|
|
|
Config: currentConfig, // 当前判断节点配置
|
|
|
|
|
|
Global: execInput, // 全局执行入参
|
|
|
|
|
|
}
|
|
|
|
|
|
return JudgeLambda(ctx, nodeExecInput) // 传入 NodeExecutionInput 类型
|
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
_ = graph.AddBranch(intentNodeID, compose.NewGraphBranch(judgeLambda, branchMap))
|
|
|
|
|
|
continue
|
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
// --------------------------
|
|
|
|
|
|
// 普通节点连线
|
|
|
|
|
|
// --------------------------
|
|
|
|
|
|
for _, e := range edges {
|
|
|
|
|
|
toNode := nodeMap[e.To]
|
|
|
|
|
|
if toNode.NodeCode == node.NodeTypeJudge {
|
|
|
|
|
|
_ = graph.AddEdge(e.From, fmt.Sprintf("intent_%s", toNode.Id))
|
|
|
|
|
|
continue
|
|
|
|
|
|
}
|
|
|
|
|
|
_ = graph.AddEdge(e.From, e.To)
|
|
|
|
|
|
}
|
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
// ==================== 第四步:处理开始/结束节点 ====================
|
|
|
|
|
|
if flowContent.StartNodeId != "" {
|
|
|
|
|
|
_ = graph.AddEdge(compose.START, flowContent.StartNodeId)
|
|
|
|
|
|
}
|
|
|
|
|
|
originalEndNodes := findEndNodes(flowContent.StartNodeId, flowContent.Edges)
|
|
|
|
|
|
for _, endID := range originalEndNodes {
|
|
|
|
|
|
_ = graph.AddEdge(endID, summaryNodeID)
|
|
|
|
|
|
}
|
|
|
|
|
|
_ = graph.AddEdge(summaryNodeID, compose.END)
|
|
|
|
|
|
|
|
|
|
|
|
return graph.Compile(ctx, compose.WithGraphName("auto_build_workflow"))
|
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
// -------------------------- 节点自动注册器(核心分发) --------------------------
|
|
|
|
|
|
func registerNodeToGraph(graph *compose.Graph[any, any], flowNode entity.FlowNode) {
|
|
|
|
|
|
nodeID := flowNode.Id
|
|
|
|
|
|
code := flowNode.NodeCode
|
|
|
|
|
|
|
|
|
|
|
|
// 通用包装:全程入参都是 *FlowExecutionInput
|
|
|
|
|
|
wrapLambda := func(lambda func(ctx context.Context, input any) (any, error)) func(ctx context.Context, input any) (any, error) {
|
|
|
|
|
|
return func(ctx context.Context, input any) (any, error) {
|
|
|
|
|
|
// ✅ 【关键】全程入参类型永远不变
|
|
|
|
|
|
execInput, ok := input.(*flowDto.FlowExecutionInput)
|
|
|
|
|
|
if !ok {
|
|
|
|
|
|
return nil, fmt.Errorf("入参必须是 *FlowExecutionInput, 实际是 %T", input)
|
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
configMap := execInput.ConfigMap
|
|
|
|
|
|
currentConfig := configMap[nodeID]
|
|
|
|
|
|
if currentConfig == nil {
|
|
|
|
|
|
return nil, fmt.Errorf("节点%s无配置", nodeID)
|
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
// 获取入参 - 适配切片类型:遍历所有来源节点
|
|
|
|
|
|
var realInput any
|
|
|
|
|
|
if len(flowNode.InputSource) > 0 { // 改为判断切片长度
|
|
|
|
|
|
// 遍历所有指定的来源节点,聚合输出结果
|
|
|
|
|
|
for _, inputSource := range flowNode.InputSource { // 遍历切片
|
|
|
|
|
|
if sourceConfig, ok := configMap[inputSource.NodeId]; ok {
|
|
|
|
|
|
currentConfig.OutputResult = append(currentConfig.OutputResult, sourceConfig.OutputResult...)
|
|
|
|
|
|
}
|
|
|
|
|
|
}
|
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
// ✅ 封装节点执行入参(配置+表单架构)
|
|
|
|
|
|
realInput = &flowDto.NodeExecutionInput{
|
|
|
|
|
|
Config: currentConfig,
|
|
|
|
|
|
Global: execInput, // ✅ 把【全部节点】的对象直接塞进来
|
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
// 执行节点
|
|
|
|
|
|
output, err := lambda(ctx, realInput)
|
|
|
|
|
|
if err != nil {
|
|
|
|
|
|
return nil, err
|
|
|
|
|
|
}
|
|
|
|
|
|
// ✅ 自动把当前节点ID 加入已执行列表
|
|
|
|
|
|
execInput.ExecutedNodes = append(execInput.ExecutedNodes, nodeID)
|
|
|
|
|
|
|
|
|
|
|
|
// 输出存入 FlowNodeConfig
|
|
|
|
|
|
if outConfig, ok := output.(*entity.FlowNode); ok {
|
|
|
|
|
|
currentConfig.OutputResult = outConfig.OutputResult
|
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
// ✅ 关键:返回整个 execInput,让下一个节点继续用!
|
|
|
|
|
|
return execInput, nil
|
|
|
|
|
|
}
|
|
|
|
|
|
}
|
|
|
|
|
|
if nodeID == "summary_node" {
|
|
|
|
|
|
_ = graph.AddLambdaNode(nodeID, compose.InvokableLambda(wrapLambda(SummaryLambda)))
|
|
|
|
|
|
return
|
|
|
|
|
|
}
|
|
|
|
|
|
switch code {
|
|
|
|
|
|
case "__start__":
|
|
|
|
|
|
_ = graph.AddLambdaNode(nodeID, compose.InvokableLambda(wrapLambda(StartLambda)))
|
|
|
|
|
|
case node.NodeTypeTextModel:
|
|
|
|
|
|
_ = graph.AddLambdaNode(nodeID, compose.InvokableLambda(wrapLambda(TextModelLambda)))
|
|
|
|
|
|
case node.NodeTypeImageModel:
|
|
|
|
|
|
_ = graph.AddLambdaNode(nodeID, compose.InvokableLambda(wrapLambda(ImageModelLambda)))
|
|
|
|
|
|
case node.NodeTypeVideoModel:
|
|
|
|
|
|
_ = graph.AddLambdaNode(nodeID, compose.InvokableLambda(wrapLambda(VideoModelLambda)))
|
|
|
|
|
|
case node.NodeTypeAudioModel:
|
|
|
|
|
|
_ = graph.AddLambdaNode(nodeID, compose.InvokableLambda(wrapLambda(AudioModelLambda)))
|
|
|
|
|
|
case node.NodeTypeCustomNode:
|
|
|
|
|
|
_ = graph.AddLambdaNode(nodeID, compose.InvokableLambda(wrapLambda(CustomLambda)))
|
|
|
|
|
|
case node.NodeTypeForm:
|
|
|
|
|
|
_ = graph.AddLambdaNode(nodeID, compose.InvokableLambda(wrapLambda(FormLambda)))
|
|
|
|
|
|
case node.NodeTypeIntent:
|
|
|
|
|
|
_ = graph.AddLambdaNode(nodeID, compose.InvokableLambda(wrapLambda(IntentLambda)))
|
|
|
|
|
|
case node.NodeTypeMerge:
|
|
|
|
|
|
_ = graph.AddLambdaNode(nodeID, compose.InvokableLambda(wrapLambda(MergeLambda)))
|
|
|
|
|
|
}
|
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
// --------------------------------------------------------------------
|
|
|
|
|
|
// ✅【工具方法】找出所有没有出边的节点 → 作为结束节点连接 END
|
|
|
|
|
|
// --------------------------------------------------------------------
|
|
|
|
|
|
func findEndNodes(startNodeId string, edges []entity.FlowEdge) []string {
|
|
|
|
|
|
// 构建 节点 → 后续节点 的映射
|
|
|
|
|
|
nextMap := make(map[string][]string)
|
|
|
|
|
|
for _, e := range edges {
|
|
|
|
|
|
nextMap[e.From] = append(nextMap[e.From], e.To)
|
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
endNodeSet := make(map[string]struct{})
|
|
|
|
|
|
|
|
|
|
|
|
// 🚀 只从【开始节点】递归遍历(关键修复)
|
|
|
|
|
|
findLeafNodes(startNodeId, nextMap, endNodeSet)
|
|
|
|
|
|
|
|
|
|
|
|
// 转成数组返回
|
|
|
|
|
|
endNodes := make([]string, 0, len(endNodeSet))
|
|
|
|
|
|
for id := range endNodeSet {
|
|
|
|
|
|
endNodes = append(endNodes, id)
|
|
|
|
|
|
}
|
|
|
|
|
|
return endNodes
|
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
// --------------------------------------------------------------------
|
|
|
|
|
|
// ✅ 递归:查找以 nodeId 开头的所有叶子节点
|
|
|
|
|
|
// --------------------------------------------------------------------
|
|
|
|
|
|
func findLeafNodes(nodeId string, nextMap map[string][]string, endNodeSet map[string]struct{}) {
|
|
|
|
|
|
nextNodes := nextMap[nodeId]
|
|
|
|
|
|
|
|
|
|
|
|
// 🚩 没有下一个节点 = 真实结束节点
|
|
|
|
|
|
if len(nextNodes) == 0 {
|
|
|
|
|
|
endNodeSet[nodeId] = struct{}{}
|
|
|
|
|
|
return
|
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
// 递归继续找下一个
|
|
|
|
|
|
for _, nextId := range nextNodes {
|
|
|
|
|
|
findLeafNodes(nextId, nextMap, endNodeSet)
|
|
|
|
|
|
}
|
|
|
|
|
|
}
|