feat: 新增工作流执行模块

新增流程执行记录的实体、DTO、DAO、控制器和服务层,支持工作流的执行、回调及结果树状列表查询;同时更新服务名称为 ai-agent。
This commit is contained in:
2026-05-12 13:34:28 +08:00
parent 2aec7fe30f
commit 7c26914353
42 changed files with 4146 additions and 11 deletions

View File

@@ -0,0 +1,612 @@
package flow
import (
"ai-agent/workflow/consts/flow"
"ai-agent/workflow/consts/node"
flowDao "ai-agent/workflow/dao/flow"
flowDto "ai-agent/workflow/model/dto/flow"
"ai-agent/workflow/model/entity"
"context"
"fmt"
"sort"
"strconv"
"strings"
"sync"
"gitea.com/red-future/common/utils"
"github.com/cloudwego/eino/compose"
"github.com/gogf/gf/v2/frame/g"
"github.com/gogf/gf/v2/os/gtime"
"github.com/gogf/gf/v2/util/gconv"
"go.opentelemetry.io/otel/trace"
)
var FlowExecutionService = &flowExecutionService{}
type flowExecutionService struct{}
func (s *flowExecutionService) List(ctx context.Context, req *flowDto.ListFlowExecutionReq) (res *flowDto.ListFlowExecutionTreeRes, err error) {
user, err := utils.GetUserInfo(ctx)
if err != nil {
return
}
req.Creator = user.UserName
list, _, err := flowDao.FlowExecutionDao.List(ctx, req)
if err != nil {
return nil, err
}
// ===================== 核心修复:只统计【有数据】的执行记录,空的直接跳过 =====================
executionNumber := make(map[int64]int) // executionId -> 倒序编号(最新=1
// 第一次遍历:只处理【有输出参数】的记录,统计并分配编号
var validList []*entity.FlowExecution // 只存有效(非空)记录
for _, execution := range list {
if g.IsEmpty(execution.OutputParams) {
continue // 空数据直接过滤,不参与编号、不展示
}
validList = append(validList, execution)
}
// 给有效记录分配【时间倒序编号】(最新=1
totalValid := len(validList)
for idx, execution := range validList {
executionNumber[execution.Id] = totalValid - idx
}
// 2. 分组映射:日期 -> 流程节点
type flowWrap struct {
flowNode flowDto.FlowNode
createdAt *gtime.Time
}
dateMap := make(map[string]*[]flowWrap)
// 遍历【有效数据】构建结构
for _, execution := range validList {
createDate := execution.CreatedAt.Format("Y-m-d")
flowName := execution.FlowName
outputParams := execution.OutputParams
// 编号只算有效数据,不会把空的算进去
num := executionNumber[execution.Id]
displayFlowName := fmt.Sprintf("会话-%d(%s)", num, flowName)
// 3. 解析 outputParams
var tempItems []flowDto.OutputItem
for _, paramMap := range outputParams {
for tsKey, value := range paramMap {
if _, err := strconv.ParseInt(tsKey, 10, 64); err != nil {
continue
}
tempItems = append(tempItems, flowDto.OutputItem{
Timestamp: tsKey,
Content: gconv.String(value),
})
}
}
// ===================== 修复1如果解析后依然为空直接跳过不生成第二层节点 =====================
if len(tempItems) == 0 {
continue
}
// 时间戳正序
sort.Slice(tempItems, func(i, j int) bool {
t1, _ := strconv.ParseInt(tempItems[i].Timestamp, 10, 64)
t2, _ := strconv.ParseInt(tempItems[j].Timestamp, 10, 64)
return t1 < t2
})
// 标号:相同类型递增,不同重置
suffixCount := make(map[string]int)
for idx := range tempItems {
item := &tempItems[idx]
val := item.Content
suffix := "内容"
switch {
case strings.Contains(val, "img") || strings.Contains(val, "png") || strings.Contains(val, "jpg"):
suffix = "图片"
case strings.Contains(val, "html") || strings.Contains(val, "HTML"):
suffix = "HTML"
case strings.Contains(val, "txt") || len(val) > 50:
suffix = "文案"
}
suffixCount[suffix]++
item.Label = fmt.Sprintf("%s_%d", suffix, suffixCount[suffix])
}
// 组装节点
node := flowDto.FlowNode{
FlowName: displayFlowName,
FlowId: execution.FlowUserId,
SessionId: gconv.String(execution.SessionId),
Items: tempItems,
}
if dateMap[createDate] == nil {
dateMap[createDate] = &[]flowWrap{}
}
*dateMap[createDate] = append(*dateMap[createDate], flowWrap{
flowNode: node,
createdAt: execution.CreatedAt,
})
}
// 6. 构建树 + 排序
var tree []flowDto.DateNode
for date, wraps := range dateMap {
// 第二层按创建时间倒序(最新在前)
sort.Slice(*wraps, func(i, j int) bool {
return (*wraps)[i].createdAt.After((*wraps)[j].createdAt)
})
var flowNodes []flowDto.FlowNode
for _, w := range *wraps {
flowNodes = append(flowNodes, w.flowNode)
}
// ===================== 修复2日期下没有流程也过滤掉 =====================
if len(flowNodes) == 0 {
continue
}
tree = append(tree, flowDto.DateNode{
CreateDate: date,
Flows: flowNodes,
})
}
// 第一层日期倒序
sort.Slice(tree, func(i, j int) bool {
return tree[i].CreateDate > tree[j].CreateDate
})
imgPrefix, err := utils.GetFileAddressPrefix(ctx)
return &flowDto.ListFlowExecutionTreeRes{
Tree: tree,
ImgAddressPrefix: imgPrefix,
}, nil
}
// ModelCallback 模型回调接口
func (s *flowExecutionService) ModelCallback(ctx context.Context, req *flowDto.ModelCallbackReq) (err error) {
// 唤醒等待的任务
Notify(req.TaskId, req)
return nil
}
// 全局等待任务回调的工具
var (
asyncMu sync.Mutex
asyncTasks = make(map[string]chan any)
)
// Wait 阻塞等待回调结果
// 调用后会一直卡住,直到 Notify 唤醒 或 超时/取消
func Wait(ctx context.Context, taskId string) (any, error) {
asyncMu.Lock()
ch := make(chan any, 1)
asyncTasks[taskId] = ch
asyncMu.Unlock()
select {
case result := <-ch:
return result, nil
case <-ctx.Done():
asyncMu.Lock()
delete(asyncTasks, taskId)
asyncMu.Unlock()
return nil, ctx.Err()
}
}
// Notify 回调时调用,唤醒等待的任务
func Notify(taskId string, result any) {
asyncMu.Lock()
defer asyncMu.Unlock()
ch, exist := asyncTasks[taskId]
if !exist {
return
}
ch <- result
delete(asyncTasks, taskId)
}
//func (s *flowExecutionService) Cancel(ctx context.Context, req *flowDto.CancelReq) (err error) {
// res, err := flowDao.FlowExecutionDao.Get(ctx, &flowDto.GetFlowExecutionReq{
// Id: req.FlowId,
// })
// res.TraceId
// return
//}
func (s *flowExecutionService) Execute(ctx context.Context, req *flowDto.ExecuteReq) (res *flowDto.ExecuteRes, err error) {
flowInfo, err := flowDao.FlowExecutionDao.Get(ctx, &flowDto.GetFlowExecutionReq{
SessionId: req.SessionId,
})
if err != nil {
return
}
var executionId int64
if flowInfo == nil {
var r = new(flowDto.CreateFlowExecutionReq)
r.FlowUserId = req.FlowId
r.FlowName = req.FlowName
r.TriggerType = flow.FlowExecutionTriggerTypeManual.Code()
r.FlowContent = req.FlowContent
r.NodeInputParams = req.NodeInputParams
r.SessionId = req.SessionId
r.Status = flow.FlowExecutionStatusRunning.Code()
span := trace.SpanFromContext(ctx)
if span != nil && span.SpanContext().HasTraceID() {
r.TraceId = span.SpanContext().TraceID().String()
}
executionId, err = flowDao.FlowExecutionDao.Insert(ctx, r)
if err != nil {
return
}
} else {
executionId = flowInfo.Id
}
_, err = flowDao.FlowUserDao.Update(ctx, &flowDto.UpdateFlowUserReq{
Id: req.FlowId,
FlowContent: req.FlowContent,
NodeInputParams: req.NodeInputParams,
})
if err != nil {
return nil, err
}
//nodeInsert := make([]*nodeDto.CreateNodeExecutionReq, 0, len(flowInfo.NodeInputParams))
//for _, flowNode := range flowInfo.NodeInputParams {
// nodeInsert = append(nodeInsert, &nodeDto.CreateNodeExecutionReq{
// FlowExecutionId: executionId,
// NodeId: flowNode.Id,
// Status: node.NodeExecutionStatusWait.Code(),
// NodeInputParams: flowNode,
// TraceId: r.TraceId,
// })
//}
//_, err = nodeDao.NodeExecutionDao.BatchInsert(ctx, nodeInsert)
//if err != nil {
// return
//}
// =========================================================================
// ✅【第1步】给所有判断节点自动生成意图识别节点
// =========================================================================
judge2IntentNodeMap := make(map[string]string)
finalNodes := make([]entity.FlowNode, 0, len(req.FlowContent.Nodes)*2)
for _, item := range req.FlowContent.Nodes {
finalNodes = append(finalNodes, item)
// 判断节点自动加 intent 节点
if item.NodeCode == node.NodeTypeJudge {
intentNodeID := fmt.Sprintf("intent_%s", item.Id)
intentNode := entity.FlowNode{
Id: intentNodeID,
NodeCode: node.NodeTypeIntent,
Name: fmt.Sprintf("意图识别-%s", item.Name),
InputSource: item.InputSource, // ✅ 正确赋值
FormConfig: item.FormConfig, // ✅ 用户配置
ModelConfig: item.ModelConfig, // ✅ 系统配置
}
finalNodes = append(finalNodes, intentNode)
judge2IntentNodeMap[item.Id] = intentNodeID
}
}
summaryNodeID := "summary_node"
summaryNode := entity.FlowNode{
Id: summaryNodeID,
NodeCode: node.NodeTypeCustomNode, // 复用自定义节点类型,也可新增专属类型
Name: "结果汇总节点",
InputSource: []entity.FlowNodeInputSource{}, // 后续自动聚合所有节点输出
FormConfig: nil,
ModelConfig: node.ModelItem{},
}
finalNodes = append(finalNodes, summaryNode)
// 替换节点列表
req.FlowContent.Nodes = finalNodes
// =========================================================================
// ✅【第2步】构建执行图
// =========================================================================
runGraph, err := BuildGraphFromFlowContent(ctx, req.FlowContent, judge2IntentNodeMap, summaryNodeID)
if err != nil {
executionReq := flowDto.UpdateFlowExecutionReq{
Id: executionId,
}
executionReq.Status = flow.FlowExecutionStatusFailed.Code()
executionReq.ErrorMessage = err.Error()
_, err1 := flowDao.FlowExecutionDao.Update(ctx, &executionReq)
if err1 != nil {
return
}
return nil, err
}
// =========================================================================
// ✅【第3步】构建 ConfigMap
// =========================================================================
configMap := make(map[string]*entity.FlowNode)
for _, cfg := range req.NodeInputParams {
configMap[cfg.Id] = cfg
}
// 自动给意图节点复制配置
for judgeID, intentID := range judge2IntentNodeMap {
if cfg, ok := configMap[judgeID]; ok {
configMap[intentID] = cfg
}
}
// 初始化汇总节点配置
configMap[summaryNodeID] = &summaryNode
// =========================================================================
// ✅【第4步】构建全局执行入参现在 schemaMap 是有值的!)
// =========================================================================
execInput := &flowDto.FlowExecutionInput{
ExecutionId: executionId,
ConfigMap: configMap,
SessionId: req.SessionId,
Desc: req.Desc,
SkillName: req.SkillName,
FileUrl: req.FileUrl,
}
// 执行工作流
_, err = runGraph.Invoke(ctx, execInput)
if err != nil {
executionReq := flowDto.UpdateFlowExecutionReq{
Id: executionId,
}
executionReq.Status = flow.FlowExecutionStatusFailed.Code()
executionReq.ErrorMessage = err.Error()
_, err1 := flowDao.FlowExecutionDao.Update(ctx, &executionReq)
if err1 != nil {
return
}
return nil, err
}
return
}
// BuildGraphFromFlowContent 根据前端保存的工作流JSON自动构建执行图
func BuildGraphFromFlowContent(ctx context.Context, flowContent *entity.FlowInfo, judge2IntentNodeMap map[string]string, summaryNodeID string) (compose.Runnable[any, any], error) {
graph := compose.NewGraph[any, any]()
nodeMap := make(map[string]entity.FlowNode)
// 注册所有节点
for _, item := range flowContent.Nodes {
nodeMap[item.Id] = item
if item.NodeCode != node.NodeTypeJudge {
registerNodeToGraph(graph, item)
}
}
// 构建边关系
upstreamMap := make(map[string][]string)
edgeMap := make(map[string][]entity.FlowEdge)
for _, edge := range flowContent.Edges {
edgeMap[edge.From] = append(edgeMap[edge.From], edge)
upstreamMap[edge.To] = append(upstreamMap[edge.To], edge.From)
}
// 处理连线 & 分支
for fromNodeID, edges := range edgeMap {
fromNode := nodeMap[fromNodeID]
// --------------------------
// 判断节点 → 分支处理
// --------------------------
if fromNode.NodeCode == node.NodeTypeJudge {
intentNodeID, ok := judge2IntentNodeMap[fromNodeID]
if !ok {
return nil, fmt.Errorf("判断节点[%s]未生成意图节点", fromNodeID)
}
branchMap := make(map[string]bool)
for _, e := range edges {
branchMap[e.To] = true
}
judgeLambda := func(ctx context.Context, input any) (string, error) {
execInput, ok := input.(*flowDto.FlowExecutionInput)
if !ok {
return "", fmt.Errorf("入参类型错误")
}
currentConfig := execInput.ConfigMap[fromNodeID]
if currentConfig == nil {
return "", fmt.Errorf("判断节点%s无配置", fromNodeID)
}
branchIdNameMap := make(map[string]string)
var branchIDs []string
for nodeID := range branchMap {
branchIDs = append(branchIDs, nodeID)
// 从configMap获取分支节点的名称
if branchNodeCfg, ok := execInput.ConfigMap[nodeID]; ok {
branchIdNameMap[nodeID] = branchNodeCfg.Name
} else {
branchIdNameMap[nodeID] = "未命名节点" // 兜底
}
}
// 把分支ID-名称映射塞进 ModelConfig带给意图节点
m := make(map[string]interface{})
m["branch_ids"] = branchIDs
m["branch_id_name_map"] = branchIdNameMap // 传递ID-名称映射
currentConfig.Config = m
// 从意图节点取输出
if intentCfg, ok := execInput.ConfigMap[intentNodeID]; ok {
currentConfig.OutputResult = intentCfg.OutputResult
}
// 关键修改:构造 NodeExecutionInput 传入 JudgeLambda
nodeExecInput := &flowDto.NodeExecutionInput{
Config: currentConfig, // 当前判断节点配置
Global: execInput, // 全局执行入参
}
return JudgeLambda(ctx, nodeExecInput) // 传入 NodeExecutionInput 类型
}
_ = graph.AddBranch(intentNodeID, compose.NewGraphBranch(judgeLambda, branchMap))
continue
}
// --------------------------
// 普通节点连线
// --------------------------
for _, e := range edges {
toNode := nodeMap[e.To]
if toNode.NodeCode == node.NodeTypeJudge {
_ = graph.AddEdge(e.From, fmt.Sprintf("intent_%s", toNode.Id))
continue
}
_ = graph.AddEdge(e.From, e.To)
}
}
// ==================== 第四步:处理开始/结束节点 ====================
if flowContent.StartNodeId != "" {
_ = graph.AddEdge(compose.START, flowContent.StartNodeId)
}
originalEndNodes := findEndNodes(flowContent.StartNodeId, flowContent.Edges)
for _, endID := range originalEndNodes {
_ = graph.AddEdge(endID, summaryNodeID)
}
_ = graph.AddEdge(summaryNodeID, compose.END)
return graph.Compile(ctx, compose.WithGraphName("auto_build_workflow"))
}
// -------------------------- 节点自动注册器(核心分发) --------------------------
func registerNodeToGraph(graph *compose.Graph[any, any], flowNode entity.FlowNode) {
nodeID := flowNode.Id
code := flowNode.NodeCode
// 通用包装:全程入参都是 *FlowExecutionInput
wrapLambda := func(lambda func(ctx context.Context, input any) (any, error)) func(ctx context.Context, input any) (any, error) {
return func(ctx context.Context, input any) (any, error) {
// ✅ 【关键】全程入参类型永远不变
execInput, ok := input.(*flowDto.FlowExecutionInput)
if !ok {
return nil, fmt.Errorf("入参必须是 *FlowExecutionInput, 实际是 %T", input)
}
configMap := execInput.ConfigMap
currentConfig := configMap[nodeID]
if currentConfig == nil {
return nil, fmt.Errorf("节点%s无配置", nodeID)
}
// 获取入参 - 适配切片类型:遍历所有来源节点
var realInput any
if len(flowNode.InputSource) > 0 { // 改为判断切片长度
// 遍历所有指定的来源节点,聚合输出结果
for _, inputSource := range flowNode.InputSource { // 遍历切片
if sourceConfig, ok := configMap[inputSource.NodeId]; ok {
currentConfig.OutputResult = append(currentConfig.OutputResult, sourceConfig.OutputResult...)
}
}
}
// ✅ 封装节点执行入参(配置+表单架构)
realInput = &flowDto.NodeExecutionInput{
Config: currentConfig,
Global: execInput, // ✅ 把【全部节点】的对象直接塞进来
}
// 执行节点
output, err := lambda(ctx, realInput)
if err != nil {
return nil, err
}
// ✅ 自动把当前节点ID 加入已执行列表
execInput.ExecutedNodes = append(execInput.ExecutedNodes, nodeID)
// 输出存入 FlowNodeConfig
if outConfig, ok := output.(*entity.FlowNode); ok {
currentConfig.OutputResult = outConfig.OutputResult
}
// ✅ 关键:返回整个 execInput让下一个节点继续用
return execInput, nil
}
}
if nodeID == "summary_node" {
_ = graph.AddLambdaNode(nodeID, compose.InvokableLambda(wrapLambda(SummaryLambda)))
return
}
switch code {
case "__start__":
_ = graph.AddLambdaNode(nodeID, compose.InvokableLambda(wrapLambda(StartLambda)))
case node.NodeTypeTextModel:
_ = graph.AddLambdaNode(nodeID, compose.InvokableLambda(wrapLambda(TextModelLambda)))
case node.NodeTypeImageModel:
_ = graph.AddLambdaNode(nodeID, compose.InvokableLambda(wrapLambda(ImageModelLambda)))
case node.NodeTypeVideoModel:
_ = graph.AddLambdaNode(nodeID, compose.InvokableLambda(wrapLambda(VideoModelLambda)))
case node.NodeTypeAudioModel:
_ = graph.AddLambdaNode(nodeID, compose.InvokableLambda(wrapLambda(AudioModelLambda)))
case node.NodeTypeCustomNode:
_ = graph.AddLambdaNode(nodeID, compose.InvokableLambda(wrapLambda(CustomLambda)))
case node.NodeTypeForm:
_ = graph.AddLambdaNode(nodeID, compose.InvokableLambda(wrapLambda(FormLambda)))
case node.NodeTypeIntent:
_ = graph.AddLambdaNode(nodeID, compose.InvokableLambda(wrapLambda(IntentLambda)))
case node.NodeTypeMerge:
_ = graph.AddLambdaNode(nodeID, compose.InvokableLambda(wrapLambda(MergeLambda)))
}
}
// --------------------------------------------------------------------
// ✅【工具方法】找出所有没有出边的节点 → 作为结束节点连接 END
// --------------------------------------------------------------------
func findEndNodes(startNodeId string, edges []entity.FlowEdge) []string {
// 构建 节点 → 后续节点 的映射
nextMap := make(map[string][]string)
for _, e := range edges {
nextMap[e.From] = append(nextMap[e.From], e.To)
}
endNodeSet := make(map[string]struct{})
// 🚀 只从【开始节点】递归遍历(关键修复)
findLeafNodes(startNodeId, nextMap, endNodeSet)
// 转成数组返回
endNodes := make([]string, 0, len(endNodeSet))
for id := range endNodeSet {
endNodes = append(endNodes, id)
}
return endNodes
}
// --------------------------------------------------------------------
// ✅ 递归:查找以 nodeId 开头的所有叶子节点
// --------------------------------------------------------------------
func findLeafNodes(nodeId string, nextMap map[string][]string, endNodeSet map[string]struct{}) {
nextNodes := nextMap[nodeId]
// 🚩 没有下一个节点 = 真实结束节点
if len(nextNodes) == 0 {
endNodeSet[nodeId] = struct{}{}
return
}
// 递归继续找下一个
for _, nextId := range nextNodes {
findLeafNodes(nextId, nextMap, endNodeSet)
}
}

View File

@@ -0,0 +1,55 @@
package flow
import (
flowDao "ai-agent/workflow/dao/flow"
flowDto "ai-agent/workflow/model/dto/flow"
"context"
"github.com/gogf/gf/v2/util/gconv"
)
var FlowTemplateService = &flowTemplateService{}
type flowTemplateService struct{}
func (s *flowTemplateService) Create(ctx context.Context, req *flowDto.CreateFlowTemplateReq) (res *flowDto.CreateFlowTemplateRes, err error) {
req.NodeInputParams = ExtractFlowNodeFrom(req.FlowContent)
id, err := flowDao.FlowTemplateDao.Insert(ctx, req)
if err != nil {
return
}
return &flowDto.CreateFlowTemplateRes{Id: id}, nil
}
func (s *flowTemplateService) Update(ctx context.Context, req *flowDto.UpdateFlowTemplateReq) (err error) {
req.NodeInputParams = ExtractFlowNodeFrom(req.FlowContent)
_, err = flowDao.FlowTemplateDao.Update(ctx, req)
return
}
func (s *flowTemplateService) Delete(ctx context.Context, req *flowDto.DeleteFlowTemplateReq) (err error) {
_, err = flowDao.FlowTemplateDao.Delete(ctx, req)
return
}
func (s *flowTemplateService) Get(ctx context.Context, req *flowDto.GetFlowTemplateReq) (res *flowDto.FlowTemplateVO, err error) {
flowInfo, err := flowDao.FlowTemplateDao.Get(ctx, req)
if err != nil {
return
}
res = new(flowDto.FlowTemplateVO)
err = gconv.Struct(flowInfo, res)
return
}
func (s *flowTemplateService) List(ctx context.Context, req *flowDto.ListFlowTemplateReq) (res *flowDto.ListFlowTemplateRes, err error) {
list, total, err := flowDao.FlowTemplateDao.List(ctx, req)
if err != nil {
return
}
res = &flowDto.ListFlowTemplateRes{
Total: total,
}
err = gconv.Struct(list, &res.List)
return
}

View File

@@ -0,0 +1,185 @@
package flow
import (
"ai-agent/workflow/consts/flow"
flowDao "ai-agent/workflow/dao/flow"
flowDto "ai-agent/workflow/model/dto/flow"
"ai-agent/workflow/model/entity"
"context"
commonHttp "gitea.com/red-future/common/http"
"github.com/gogf/gf/v2/frame/g"
"github.com/gogf/gf/v2/util/gconv"
)
var FlowUserService = &flowUserService{}
type flowUserService struct{}
// IsAdmin 调用admin-go服务检查是否是管理员
func IsAdmin(ctx context.Context) (res bool, err error) {
headers := make(map[string]string)
if r := g.RequestFromCtx(ctx); r != nil {
for k, v := range r.Request.Header {
if len(v) > 0 {
headers[k] = v[0]
}
}
}
var r = make(map[string]bool)
if err = commonHttp.Get(ctx, "admin-go/api/v1/system/user/checkIsSuperAdmin", headers, &r); err != nil {
return false, err
}
return r["isSuperAdmin"], err
}
func (s *flowUserService) Create(ctx context.Context, req *flowDto.CreateFlowUserReq) (res *flowDto.CreateFlowUserRes, err error) {
admin, err := IsAdmin(ctx)
if err != nil {
return
}
req.NodeInputParams = ExtractFlowNodeFrom(req.FlowContent)
var id int64
if admin {
id, err = flowDao.FlowTemplateDao.Insert(ctx, &flowDto.CreateFlowTemplateReq{
FlowTemplateName: req.FlowName,
Description: req.Description,
FlowContent: req.FlowContent,
NodeInputParams: req.NodeInputParams,
Status: flow.FlowTemplateStatusEnable.Code(),
})
} else {
id, err = flowDao.FlowUserDao.Insert(ctx, req)
}
return &flowDto.CreateFlowUserRes{Id: id}, err
}
func (s *flowUserService) Update(ctx context.Context, req *flowDto.UpdateFlowUserReq) (err error) {
admin, err := IsAdmin(ctx)
if err != nil {
return
}
req.NodeInputParams = ExtractFlowNodeFrom(req.FlowContent)
get, err := flowDao.FlowTemplateDao.Get(ctx, &flowDto.GetFlowTemplateReq{
Id: req.Id,
})
if err != nil {
return err
}
if !g.IsEmpty(get) && !admin {
_, err = flowDao.FlowUserDao.Insert(ctx, &flowDto.CreateFlowUserReq{
FlowName: req.FlowName,
Description: req.Description,
FlowContent: req.FlowContent,
NodeInputParams: req.NodeInputParams,
SourceFlowTemplateId: get.Id,
})
if err != nil {
return
}
}
if admin {
_, err = flowDao.FlowTemplateDao.Update(ctx, &flowDto.UpdateFlowTemplateReq{
Id: req.Id,
FlowTemplateName: req.FlowName,
Description: req.Description,
FlowContent: req.FlowContent,
NodeInputParams: req.NodeInputParams,
Status: flow.FlowTemplateStatusEnable.Code(),
})
} else {
_, err = flowDao.FlowUserDao.Update(ctx, req)
}
return
}
func ExtractFlowNodeFrom(flowContent *entity.FlowInfo) []*entity.FlowNode {
var flowNodes []*entity.FlowNode
for _, item := range flowContent.Nodes {
flowNodes = append(flowNodes, &item)
}
return flowNodes
}
func (s *flowUserService) Delete(ctx context.Context, req *flowDto.DeleteFlowUserReq) (err error) {
admin, err := IsAdmin(ctx)
if err != nil {
return
}
if admin {
_, err = flowDao.FlowTemplateDao.Delete(ctx, &flowDto.DeleteFlowTemplateReq{
Id: req.Id,
})
} else {
_, err = flowDao.FlowUserDao.Delete(ctx, req)
}
return
}
func (s *flowUserService) Get(ctx context.Context, req *flowDto.GetFlowUserReq) (res *flowDto.FlowUserVO, err error) {
var flowInfo *entity.FlowTemplate
flowInfo, err = flowDao.FlowTemplateDao.Get(ctx, &flowDto.GetFlowTemplateReq{
Id: req.Id,
})
if err != nil {
return
}
if flowInfo != nil {
res = new(flowDto.FlowUserVO)
res.FlowName = flowInfo.FlowTemplateName
err = gconv.Struct(flowInfo, res)
return
}
var flowUserInfo *entity.FlowUser
flowUserInfo, err = flowDao.FlowUserDao.Get(ctx, req)
if err != nil {
return
}
res = new(flowDto.FlowUserVO)
err = gconv.Struct(flowUserInfo, res)
return
}
func (s *flowUserService) List(ctx context.Context, req *flowDto.ListFlowUserReq) (res *flowDto.ListFlowRes, err error) {
l, t, err := flowDao.FlowTemplateDao.List(ctx, &flowDto.ListFlowTemplateReq{
Keyword: req.Keyword,
Page: req.Page,
})
if err != nil {
return
}
r := &flowDto.ListFlowTemplateRes{
Total: t,
}
err = gconv.Struct(l, &r.List)
if err != nil {
return
}
list, total, err := flowDao.FlowUserDao.List(ctx, req)
if err != nil {
return
}
re := &flowDto.ListFlowUserRes{
Total: total,
}
err = gconv.Struct(list, &re.List)
if err != nil {
return
}
admin, err := IsAdmin(ctx)
if err != nil {
return
}
res = &flowDto.ListFlowRes{
ListFlowUserRes: re,
ListFlowTemplateRes: r,
IsAdmin: admin,
}
return
}

File diff suppressed because it is too large Load Diff

View File

@@ -0,0 +1,245 @@
package flow
import (
"ai-agent/workflow/model/dto"
flowDto "ai-agent/workflow/model/dto/flow"
"bytes"
"context"
"fmt"
"io"
"mime/multipart"
"net/http"
"strings"
commonHttp "gitea.com/red-future/common/http"
"gitea.com/red-future/common/utils"
"github.com/gogf/gf/v2/frame/g"
"github.com/gogf/gf/v2/util/gconv"
)
func GetIsChatModel(ctx context.Context) (*flowDto.GetIsChatModelRes, error) {
headers := make(map[string]string)
if r := g.RequestFromCtx(ctx); r != nil {
for k, v := range r.Request.Header {
if len(v) > 0 {
headers[k] = v[0]
}
}
}
res := new(flowDto.GetIsChatModelRes)
err := commonHttp.Get(ctx, "model-gateway/model/getIsChatModel", headers, res, nil)
if err != nil {
return nil, err
}
return res, nil
}
func CreateGatewayTask(ctx context.Context, req *flowDto.CreateTaskReq) (string, error) {
headers := make(map[string]string)
if r := g.RequestFromCtx(ctx); r != nil {
for k, v := range r.Request.Header {
if len(v) > 0 {
headers[k] = v[0]
}
}
}
res := new(flowDto.CreateTaskRes)
err := commonHttp.Post(ctx, "model-gateway/task/createTask", headers, res, &req)
if err != nil {
return "", err
}
return res.TaskId, nil
}
func ComposeMessages(ctx context.Context, req *flowDto.ComposeMessagesReq) (*flowDto.ComposeMessagesRes, error) {
headers := make(map[string]string)
if r := g.RequestFromCtx(ctx); r != nil {
for k, v := range r.Request.Header {
if len(v) > 0 {
headers[k] = v[0]
}
}
}
res := new(flowDto.ComposeMessagesRes)
err := commonHttp.Post(ctx, "prompts-core/prompt/composeMessages", headers, res, &req)
if err != nil {
return nil, err
}
return res, nil
}
func GatewayTask(ctx context.Context, epicycleId int64, model string, content map[string]any) (any, error) {
modelTaskId, err := CreateGatewayTask(ctx, &flowDto.CreateTaskReq{
ModelName: model,
BizName: g.Cfg().MustGet(ctx, "server.name").String(),
CallbackUrl: "/flow/execution/modelCallback",
RequestPayload: content,
EpicycleId: epicycleId,
})
if err != nil {
return nil, err
}
return Wait(ctx, modelTaskId)
}
func GetTaskResult(ctx context.Context, result any) (*flowDto.TaskCallback, error) {
task := new(flowDto.TaskCallback)
if err := gconv.Struct(result, task); err != nil {
return nil, err
}
url, err := utils.GetFileAddressPrefix(ctx)
if err != nil {
return nil, err
}
// 获取远程文件内容
file, err := FetchRemoteJsonFile(ctx, url+task.OssFile)
if err != nil {
return nil, err
}
task.Text = gconv.String(file)
return task, nil
}
func FetchRemoteJsonFile(ctx context.Context, fileUrl string) ([]byte, error) {
// 1. 下载文件
resp, err := g.Client().Get(ctx, fileUrl)
if err != nil {
return nil, fmt.Errorf("get file failed: %w", err)
}
defer resp.Close()
if resp.StatusCode != http.StatusOK {
return nil, fmt.Errorf("http status error: %d", resp.StatusCode)
}
return io.ReadAll(resp.Body)
}
func GetImageBytesFromURL(url string) (all []byte, contentType string, err error) {
resp, err := http.Get(url)
if err != nil {
return
}
defer resp.Body.Close()
all, err = io.ReadAll(resp.Body)
if err != nil {
return
}
contentType = resp.Header.Get("Content-Type")
return
}
func Upload(ctx context.Context, req *dto.UploadFileBytesReq) (*dto.UploadFileBytesRes, error) {
body := &bytes.Buffer{}
writer := multipart.NewWriter(body)
part, err := writer.CreateFormFile("file", req.FileName)
if err != nil {
return nil, err
}
if _, err = part.Write(req.FileBytes); err != nil {
return nil, err
}
if err = writer.Close(); err != nil {
return nil, err
}
headers := make(map[string]string)
headers["Content-Type"] = writer.FormDataContentType()
if r := g.RequestFromCtx(ctx); r != nil {
if auth := r.Header.Get("Authorization"); auth != "" {
headers["Authorization"] = auth
}
}
// 发起上传请求
res := &dto.UploadFileBytesRes{}
url := "oss/file/uploadFile"
if err = commonHttp.Post(ctx, url, headers, res, body.Bytes()); err != nil {
return nil, err
}
g.Log().Infof(ctx, "[Upload] success url=%s size=%d", res.FileURL, res.FileSize)
return res, nil
}
func buildMergeHtml(texts []string, images []string) string {
html := strings.Builder{}
html.WriteString(`
<!DOCTYPE html>
<html lang="zh-CN">
<head>
<meta charset="UTF-8">
<meta name="viewport" content="width=device-width, initial-scale=1.0">
<style>
* { margin: 0; padding: 0; box-sizing: border-box; }
body {
font-family: "Microsoft YaHei", Arial, sans-serif;
background: #fff;
color: #333;
}
.container {
max-width: 960px;
margin: 0 auto;
}
/* 图片:完全贴边,无额外间距 */
.image-block {
width: 100%;
margin: 0;
padding: 0;
}
.image-block img {
width: 100%;
height: auto;
display: block;
border-radius: 0;
}
/* 文案:极致紧凑 */
.text-block {
margin: 0;
padding: 16px; /* 仅保留内边距,不设外边距 */
line-height: 1.6;
font-size: 14px;
color: #444;
white-space: pre-wrap;
}
/* 分割线:完全去掉,改用内边距自然分隔 */
</style>
</head>
<body>
<div class="container">
`)
// 1. 先渲染图片(无任何上下边距,占满宽度)
if len(images) > 0 {
html.WriteString(`<div class="image-block">`)
for _, img := range images {
html.WriteString(fmt.Sprintf(`<img src="%s" alt="" />`, img))
}
html.WriteString(`</div>`)
}
// 2. 渲染文案(紧贴图片下方,仅用内边距留白)
if len(texts) > 0 {
html.WriteString(`<div class="text-block">`)
// 段落之间用 <br> 而不是 <br><br>,减少空行
html.WriteString(strings.Join(texts, "<br>"))
html.WriteString(`</div>`)
}
html.WriteString(`
</div>
</body>
</html>
`)
return html.String()
}

View File

@@ -0,0 +1,147 @@
package node
import (
"ai-agent/workflow/consts/node"
nodeDto "ai-agent/workflow/model/dto/node"
"context"
"fmt"
"gitea.com/red-future/common/beans"
"github.com/gogf/gf/v2/encoding/gjson"
"github.com/gogf/gf/v2/frame/g"
)
var NodeLibraryService = &nodeLibraryService{}
type nodeLibraryService struct{}
func (s *nodeLibraryService) GetNodeLibrary(ctx context.Context, req *nodeDto.WorkflowNodeTreeReq) (*nodeDto.WorkflowNodeTreeRes, error) {
WorkflowNodeGroups := []node.NodeGroupItem{
{
Group: node.NodeGroupComponent,
Label: node.NodeGroupNameComponent,
Items: []node.NodeItem{
{
NodeCode: node.NodeTypeTextModel,
NodeName: node.NodeNameTextModel,
SkillOption: true,
FormConfig: []node.NodeFormField{}, // 技能下拉
ModelConfig: []node.ModelItem{},
},
{
NodeCode: node.NodeTypeImageModel,
NodeName: node.NodeNameImageModel,
SkillOption: true,
FormConfig: []node.NodeFormField{}, // 技能下拉
ModelConfig: []node.ModelItem{},
},
},
},
{
Group: node.NodeGroupBase,
Label: node.NodeGroupNameBase,
Items: []node.NodeItem{
{
NodeCode: node.NodeTypeMerge,
NodeName: node.NodeNameMerge,
SkillOption: false,
FormConfig: []node.NodeFormField{},
ModelConfig: []node.ModelItem{},
},
{
NodeCode: node.NodeTypeJudge,
NodeName: node.NodeNameJudge,
SkillOption: false,
FormConfig: []node.NodeFormField{
{Field: "condition", Label: node.FormLabelCondition, Type: "input", Required: true},
},
ModelConfig: []node.ModelItem{},
},
{
NodeCode: node.NodeTypeForm,
NodeName: node.NodeNameForm,
SkillOption: false,
FormConfig: []node.NodeFormField{},
ModelConfig: []node.ModelItem{},
},
//{
// NodeCode: node.NodeTypeModel,
// NodeName: node.NodeNameModel,
// SkillOption: true,
// FormConfig: []node.NodeFormField{},
// ModelConfig: []node.ModelItem{},
//},
},
},
{
Group: node.NodeGroupCustom,
Label: node.NodeGroupNameCustom,
Items: []node.NodeItem{
{
NodeCode: node.NodeTypeCustomNode,
NodeName: node.NodeNameCustomNode,
SkillOption: true,
FormConfig: []node.NodeFormField{
{Field: "nodeName", Label: node.FormLabelApiKey, Type: "input", Required: true},
{Field: "nodeType", Label: node.FormLabelModel, Type: "input", Required: true},
},
ModelConfig: []node.ModelItem{},
},
},
},
}
tree := &nodeDto.WorkflowNodeTreeRes{
Groups: WorkflowNodeGroups,
}
// 3. 遍历分组,根据 typeId=1 给【文本模型节点】追加固定表单
for gIdx := range tree.Groups {
group := &tree.Groups[gIdx]
// 遍历分组下的每个节点
for itemIdx := range group.Items {
item := &group.Items[itemIdx]
if item.NodeCode == node.NodeTypeTextModel {
item.ModelConfig = append(item.ModelConfig, node.ModelItem{
ModelName: "自定义",
})
}
if item.NodeCode == node.NodeTypeImageModel {
item.ModelConfig = append(item.ModelConfig, node.ModelItem{
ModelName: "自定义",
})
}
}
}
return tree, nil
}
// SetUserInfo 设置用户信息
func (s *nodeLibraryService) SetUserInfo(ctx context.Context, creator string, tenantId uint64) (headers map[string]string, err error) {
// 创建完整的用户信息
userInfo := &beans.User{
UserName: creator,
TenantId: tenantId,
}
ctx = context.WithValue(ctx, "user", *userInfo)
// 提取并保存请求头(在连接升级前)
headers = make(map[string]string)
// 提取其他headers
if r := g.RequestFromCtx(ctx); r != nil {
for k, v := range r.Request.Header {
if len(v) > 0 {
headers[k] = v[0]
}
}
}
// 将完整用户信息序列化为JSON放到X-User-Info请求头
userInfoJson, err := gjson.Encode(userInfo)
if err != nil {
return nil, fmt.Errorf("用户信息序列化失败: %w", err)
}
headers["X-User-Info"] = string(userInfoJson)
return
}

View File

@@ -0,0 +1,38 @@
package skill
import (
skillDao "ai-agent/workflow/dao/skill"
skillDto "ai-agent/workflow/model/dto/skill"
"context"
"github.com/gogf/gf/v2/util/gconv"
)
var SkillTemplateService = &skillTemplateService{}
type skillTemplateService struct{}
func (s *skillTemplateService) Create(ctx context.Context, req *skillDto.CreateSkillTemplateReq) (res *skillDto.CreateSkillTemplateRes, err error) {
id, err := skillDao.SkillTemplateDao.Insert(ctx, req)
if err != nil {
return
}
return &skillDto.CreateSkillTemplateRes{Id: id}, nil
}
func (s *skillTemplateService) Delete(ctx context.Context, req *skillDto.DeleteSkillTemplateReq) (err error) {
_, err = skillDao.SkillTemplateDao.Delete(ctx, req)
return
}
func (s *skillTemplateService) List(ctx context.Context, req *skillDto.ListSkillTemplateReq) (res *skillDto.ListSkillTemplateRes, err error) {
list, total, err := skillDao.SkillTemplateDao.List(ctx, req)
if err != nil {
return nil, err
}
res = &skillDto.ListSkillTemplateRes{
Total: total,
}
err = gconv.Struct(list, &res.List)
return
}

View File

@@ -0,0 +1,130 @@
package skill
import (
skillDao "ai-agent/workflow/dao/skill"
skillDto "ai-agent/workflow/model/dto/skill"
"ai-agent/workflow/model/entity"
"context"
commonHttp "gitea.com/red-future/common/http"
"gitea.com/red-future/common/utils"
"github.com/gogf/gf/v2/frame/g"
"github.com/gogf/gf/v2/util/gconv"
)
var SkillUserService = &skillUserService{}
type skillUserService struct{}
// IsAdmin 调用admin-go服务检查是否是管理员
func IsAdmin(ctx context.Context) (res bool, err error) {
headers := make(map[string]string)
if r := g.RequestFromCtx(ctx); r != nil {
for k, v := range r.Request.Header {
if len(v) > 0 {
headers[k] = v[0]
}
}
}
var r = make(map[string]bool)
if err = commonHttp.Get(ctx, "admin-go/api/v1/system/user/checkIsSuperAdmin", headers, &r); err != nil {
return false, err
}
return r["isSuperAdmin"], err
}
func (s *skillUserService) Create(ctx context.Context, req *skillDto.CreateSkillUserReq) (res *skillDto.CreateSkillUserRes, err error) {
admin, err := IsAdmin(ctx)
if err != nil {
return
}
var id int64
if admin {
id, err = skillDao.SkillTemplateDao.Insert(ctx, &skillDto.CreateSkillTemplateReq{
Name: req.Name,
Description: req.Description,
Category: req.Category,
FileName: req.FileName,
FileUrl: req.FileUrl,
})
} else {
id, err = skillDao.SkillUserDao.Insert(ctx, req)
}
return &skillDto.CreateSkillUserRes{Id: id}, err
}
func (s *skillUserService) Delete(ctx context.Context, req *skillDto.DeleteSkillUserReq) (err error) {
admin, err := IsAdmin(ctx)
if err != nil {
return
}
if admin {
_, err = skillDao.SkillTemplateDao.Delete(ctx, &skillDto.DeleteSkillTemplateReq{
Id: req.Id,
})
} else {
_, err = skillDao.SkillUserDao.Delete(ctx, req)
}
return
}
func (s *skillUserService) List(ctx context.Context, req *skillDto.ListSkillReq) (res *skillDto.ListSkillUserRes, err error) {
admin, err := IsAdmin(ctx)
if err != nil {
return
}
if admin {
var total int
var list []*entity.SkillTemplate
list, total, err = skillDao.SkillTemplateDao.List(ctx, &skillDto.ListSkillTemplateReq{
Keyword: req.Keyword,
Page: req.Page,
})
if err != nil {
return nil, err
}
res = &skillDto.ListSkillUserRes{
Total: total,
}
err = gconv.Struct(list, &res.List)
return
}
user, err := utils.GetUserInfo(ctx)
if err != nil {
return
}
req.Creator = user.UserName
list, total, err := skillDao.SkillUserDao.List(ctx, &skillDto.ListSkillUserReq{
Keyword: req.Keyword,
Creator: req.Creator,
Page: req.Page,
})
if err != nil {
return nil, err
}
res = &skillDto.ListSkillUserRes{
Total: total,
}
err = gconv.Struct(list, &res.List)
return
}
func (s *skillUserService) ListUser(ctx context.Context, req *skillDto.ListSkillUserReq) (res *skillDto.ListSkillUserRes, err error) {
user, err := utils.GetUserInfo(ctx)
if err != nil {
return
}
req.Creator = user.UserName
list, total, err := skillDao.SkillUserDao.List(ctx, req)
if err != nil {
return nil, err
}
res = &skillDto.ListSkillUserRes{
Total: total,
}
err = gconv.Struct(list, &res.List)
return
}