From d5206df131db4a040109d2b4012fc8eaa4ac42fa Mon Sep 17 00:00:00 2001 From: qhd <1766646056@qq.com> Date: Fri, 15 May 2026 18:30:44 +0800 Subject: [PATCH] =?UTF-8?q?feat:=20=E6=B7=BB=E5=8A=A0=E5=AF=B9=E8=AF=9D?= =?UTF-8?q?=E5=BC=8F=E5=B7=A5=E4=BD=9C=E6=B5=81=E8=8A=82=E7=82=B9=E6=89=A7?= =?UTF-8?q?=E8=A1=8C=E4=B8=8E=E7=BB=93=E6=9E=9C=E5=90=88=E5=B9=B6=E9=80=BB?= =?UTF-8?q?=E8=BE=91?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- workflow/model/dto/flow/flow_execution_dto.go | 1 + .../service/flow/flow_execution_service.go | 618 ++++++++++++++---- workflow/service/flow/lambda_node.go | 391 ++++++----- workflow/service/flow/lambda_node_util.go | 274 ++++++++ 4 files changed, 957 insertions(+), 327 deletions(-) diff --git a/workflow/model/dto/flow/flow_execution_dto.go b/workflow/model/dto/flow/flow_execution_dto.go index 5ae5841..a74eb2f 100644 --- a/workflow/model/dto/flow/flow_execution_dto.go +++ b/workflow/model/dto/flow/flow_execution_dto.go @@ -137,6 +137,7 @@ type ExecuteReq struct { Desc string `json:"desc"` SkillName string `json:"skillName"` FileUrl []string `json:"fileUrl"` + ResultUrl string `json:"resultUrl"` } type ExecuteRes struct { diff --git a/workflow/service/flow/flow_execution_service.go b/workflow/service/flow/flow_execution_service.go index e6144d7..602fb04 100644 --- a/workflow/service/flow/flow_execution_service.go +++ b/workflow/service/flow/flow_execution_service.go @@ -5,16 +5,20 @@ import ( "ai-agent/workflow/consts/node" fileDao "ai-agent/workflow/dao/file" flowDao "ai-agent/workflow/dao/flow" + "ai-agent/workflow/model/dto" fileDto "ai-agent/workflow/model/dto/file" flowDto "ai-agent/workflow/model/dto/flow" "ai-agent/workflow/model/entity" "context" "errors" "fmt" + "os" + "regexp" "sort" "strconv" "strings" "sync" + "time" "gitea.com/red-future/common/utils" "github.com/cloudwego/eino/compose" @@ -351,134 +355,512 @@ func (s *flowExecutionService) Execute(ctx context.Context, req *flowDto.Execute } } - //_, err = flowDao.FlowUserDao.Update(ctx, &flowDto.UpdateFlowUserReq{ - // Id: req.FlowId, - // FlowContent: req.FlowContent, - // NodeInputParams: req.NodeInputParams, - //}) - //if err != nil { - // return nil, err - //} - - //nodeInsert := make([]*nodeDto.CreateNodeExecutionReq, 0, len(flowInfo.NodeInputParams)) - //for _, flowNode := range flowInfo.NodeInputParams { - // nodeInsert = append(nodeInsert, &nodeDto.CreateNodeExecutionReq{ - // FlowExecutionId: executionId, - // NodeId: flowNode.Id, - // Status: node.NodeExecutionStatusWait.Code(), - // NodeInputParams: flowNode, - // TraceId: r.TraceId, - // }) - //} - //_, err = nodeDao.NodeExecutionDao.BatchInsert(ctx, nodeInsert) - //if err != nil { - // return - //} - - // ========================================================================= - // ✅【第1步】给所有判断节点自动生成意图识别节点 - // ========================================================================= - judge2IntentNodeMap := make(map[string]string) - finalNodes := make([]entity.FlowNode, 0, len(req.FlowContent.Nodes)*2) - for _, item := range req.FlowContent.Nodes { - finalNodes = append(finalNodes, item) - // 判断节点自动加 intent 节点 - if item.NodeCode == node.NodeTypeJudge { - intentNodeID := fmt.Sprintf("intent_%s", item.Id) - intentNode := entity.FlowNode{ - Id: intentNodeID, - NodeCode: node.NodeTypeIntent, - Name: fmt.Sprintf("意图识别-%s", item.Name), - InputSource: item.InputSource, // ✅ 正确赋值 - FormConfig: item.FormConfig, // ✅ 用户配置 - ModelConfig: item.ModelConfig, // ✅ 系统配置 + if isDialogue && !g.IsEmpty(flowInfo) { + // 查询节点中是否包含结果合并节点 + var htmlUrl []string + var textNodeId string + var textModelName string + var textModelResponse map[string]any + textResultFrom := make(map[string]any) + var imgNodeId string + var imgModelName string + var imgModelResponse map[string]any + imgResultFrom := make(map[string]any) + for _, item := range flowInfo.NodeInputParams { + if item.NodeCode == node.NodeTypeMerge { + for _, outputParamsItem := range flowInfo.OutputParams { + outputParamsMap := gconv.Map(outputParamsItem) + for _, mapItem := range outputParamsMap { + if strings.HasSuffix(gconv.String(mapItem), ".html") { + htmlUrl = append(htmlUrl, gconv.String(mapItem)) + } + } + } + } + if item.NodeCode == node.NodeTypeTextModel { + textNodeId = item.Id + textModelName = item.ModelConfig.ModelName + textModelResponse = item.ModelConfig.ModelResponse + for key, modelFormItem := range item.ModelConfig.ModelForm { + textResultFrom[key] = map[string]any{ + "value": modelFormItem, + } + } + } + if item.NodeCode == node.NodeTypeImageModel { + imgNodeId = item.Id + imgModelName = item.ModelConfig.ModelName + imgModelResponse = item.ModelConfig.ModelResponse + for key, modelFormItem := range item.ModelConfig.ModelForm { + imgResultFrom[key] = map[string]any{ + "value": modelFormItem, + } + } } - finalNodes = append(finalNodes, intentNode) - judge2IntentNodeMap[item.Id] = intentNodeID } - } - - summaryNodeID := "summary_node" - summaryNode := entity.FlowNode{ - Id: summaryNodeID, - NodeCode: node.NodeTypeCustomNode, // 复用自定义节点类型,也可新增专属类型 - Name: "结果汇总节点", - InputSource: []entity.FlowNodeInputSource{}, // 后续自动聚合所有节点输出 - FormConfig: nil, - ModelConfig: node.ModelItem{}, - } - finalNodes = append(finalNodes, summaryNode) - - // 替换节点列表 - req.FlowContent.Nodes = finalNodes - - // ========================================================================= - // ✅【第2步】构建执行图 - // ========================================================================= - runGraph, err := BuildGraphFromFlowContent(execCtx, req.FlowContent, judge2IntentNodeMap, summaryNodeID) - if err != nil { - executionReq := flowDto.UpdateFlowExecutionReq{ - Id: executionId, - Status: flow.FlowExecutionStatusFailed.Code(), - ErrorMessage: err.Error(), + var url string + url, err = utils.GetFileAddressPrefix(ctx) + if err != nil { + return nil, err } - _, err1 := flowDao.FlowExecutionDao.Update(ctx, &executionReq) - if err1 != nil { - return - } - return nil, fmt.Errorf("执行工作流失败: %v", err) - } + if strings.HasSuffix(gconv.String(req.ResultUrl), ".md") { + resultUserFrom := make(map[string]any) + resultUserFrom["desc"] = req.Desc - // ========================================================================= - // ✅【第3步】构建 ConfigMap - // ========================================================================= - configMap := make(map[string]*entity.FlowNode) - for _, cfg := range req.NodeInputParams { - configMap[cfg.Id] = cfg - } - // 自动给意图节点复制配置 - for judgeID, intentID := range judge2IntentNodeMap { - if cfg, ok := configMap[judgeID]; ok { - configMap[intentID] = cfg - } - } - // 初始化汇总节点配置 - configMap[summaryNodeID] = &summaryNode + var textNode []node.NodeFormField + textNode, err = TextNode(ctx, textNodeId, req.SessionId, textModelName, req.SkillName, textResultFrom, resultUserFrom, textModelResponse, req.FileUrl) + if err != nil { + return nil, err + } + var textContent []string + var textUrl []string + for _, item := range textNode { + if strings.Contains(item.Field, "text_content") { + textContent = append(textContent, item.Value) + } + if strings.Contains(item.Field, "text_url") { + textUrl = append(textUrl, item.Value) + } + } - // ========================================================================= - // ✅【第4步】构建全局执行入参(现在 schemaMap 是有值的!) - // ========================================================================= - execInput := &flowDto.FlowExecutionInput{ - IsDialogue: isDialogue, - ExecutionId: executionId, - ConfigMap: configMap, - SessionId: req.SessionId, - Desc: req.Desc, - SkillName: req.SkillName, - FileUrl: req.FileUrl, - } - // 执行工作流 - _, err = runGraph.Invoke(execCtx, execInput) - if err != nil { - // 检测是否是取消导致的错误 - if errors.Is(execCtx.Err(), context.Canceled) { + } + content := "" + // 第二步 执行目标节点 + if content == "text" { + resultUserFrom := make(map[string]any) + resultUserFrom["desc"] = req.Desc + + var textNode []node.NodeFormField + textNode, err = TextNode(ctx, textNodeId, req.SessionId, textModelName, req.SkillName, textResultFrom, resultUserFrom, textModelResponse, req.FileUrl) + if err != nil { + return nil, err + } + var htmlTags []string + var textUrl []string + for _, item := range textNode { + if strings.Contains(item.Field, "text_content") { + htmlTags = append(htmlTags, item.Value) + } + if strings.Contains(item.Field, "text_url") { + textUrl = append(textUrl, item.Value) + } + } + + var htmlContentUrl []string + if !g.IsEmpty(htmlUrl) { + for i, item := range htmlUrl { + // 获取当前要替换的文本内容 + textContent := htmlTags[i] + // 1. 读取 HTML 文件内容 + var htmlBytes []byte + htmlBytes, err = os.ReadFile(url + item) + if err != nil { + fmt.Printf("读取文件失败 %s: %v", url+item, err) + continue + } + htmlContent := string(htmlBytes) + // 2. 构建要替换成的新 div 标签 + newTextTag := fmt.Sprintf(`
%s
`, textContent) + re := regexp.MustCompile(`.*?`) + result := re.ReplaceAllString(htmlContent, newTextTag) + + fmt.Printf("成功处理文件:%s", result) + + // 上传OSS(每条独立上传) + fileName := fmt.Sprintf("item_%d_%d.html", i, time.Now().UnixMilli()) + var ossResult *dto.UploadFileBytesRes + ossResult, err = Upload(ctx, &dto.UploadFileBytesReq{ + FileBytes: []byte(result), + FileName: fileName, + }) + if err != nil { + return nil, err + } + fmt.Printf("上传OSS成功:%s", ossResult.FileURL) + htmlContentUrl = append(htmlContentUrl, ossResult.FileURL) + } + } + var summaryResult []map[string]interface{} + if !g.IsEmpty(textUrl) { + for _, outputParamsItem := range flowInfo.OutputParams { + if !g.IsEmpty(htmlContentUrl) { + if strings.HasSuffix(gconv.String(outputParamsItem), ".html") { + continue + } + } + if strings.HasSuffix(gconv.String(outputParamsItem), ".txt") { + continue + } + timeKey := strconv.FormatInt(time.Now().UnixMilli(), 10) + item := make(map[string]interface{}) + item[timeKey] = outputParamsItem + summaryResult = append(summaryResult, item) + } + for _, textItem := range textUrl { + // 生成 毫秒时间戳 作为 KEY + timeKey := strconv.FormatInt(time.Now().UnixMilli(), 10) + item := make(map[string]interface{}) + item[timeKey] = textItem + summaryResult = append(summaryResult, item) + } + } + if !g.IsEmpty(htmlContentUrl) { + for _, outputParamsItem := range flowInfo.OutputParams { + if !g.IsEmpty(textUrl) { + if strings.HasSuffix(gconv.String(outputParamsItem), ".txt") { + continue + } + } + if strings.HasSuffix(gconv.String(outputParamsItem), ".html") { + continue + } + timeKey := strconv.FormatInt(time.Now().UnixMilli(), 10) + item := make(map[string]interface{}) + item[timeKey] = outputParamsItem + summaryResult = append(summaryResult, item) + } + for _, textItem := range htmlContentUrl { + // 生成 毫秒时间戳 作为 KEY + timeKey := strconv.FormatInt(time.Now().UnixMilli(), 10) + item := make(map[string]interface{}) + item[timeKey] = textItem + summaryResult = append(summaryResult, item) + } + + } + if !g.IsEmpty(summaryResult) { + executionReq := flowDto.UpdateFlowExecutionReq{ + Id: flowInfo.Id, + Status: flow.FlowExecutionStatusSuccess.Code(), + OutputParams: summaryResult, + } + _, err = flowDao.FlowExecutionDao.Update(ctx, &executionReq) + } + } else if content == "img" { + resultUserFrom := make(map[string]any) + resultUserFrom["desc"] = req.Desc + + var imgNode []node.NodeFormField + imgNode, err = ImgNode(ctx, imgNodeId, req.SessionId, imgModelName, req.SkillName, imgResultFrom, resultUserFrom, imgModelResponse, req.FileUrl) + if err != nil { + return nil, err + } + + var imgCount int + var imgUrl []string + var htmlBuilder strings.Builder + htmlBuilder.WriteString(`
`) + for _, item := range imgNode { + if strings.Contains(item.Field, "img_url") { + imgCount = imgCount + 1 + htmlBuilder.WriteString(fmt.Sprintf(`图片`, item.Value)) + imgUrl = append(imgUrl, item.Value) + } + } + htmlBuilder.WriteString(`
`) + var htmlContentUrl []string + if !g.IsEmpty(htmlUrl) && imgCount > 0 { + for i, item := range htmlUrl { + // 1. 读取 HTML 文件内容 + var htmlBytes []byte + htmlBytes, err = os.ReadFile(url + item) + if err != nil { + fmt.Printf("读取文件失败 %s: %v", url+item, err) + continue + } + htmlContent := string(htmlBytes) + + re := regexp.MustCompile(`
[\s\S]*?
`) + result := re.ReplaceAllString(htmlContent, htmlBuilder.String()) + + fmt.Printf("成功处理文件:%s", result) + + // 上传OSS(每条独立上传) + fileName := fmt.Sprintf("item_%d_%d.html", i, time.Now().UnixMilli()) + var ossResult *dto.UploadFileBytesRes + ossResult, err = Upload(ctx, &dto.UploadFileBytesReq{ + FileBytes: []byte(result), + FileName: fileName, + }) + if err != nil { + return nil, err + } + fmt.Printf("上传OSS成功:%s", ossResult.FileURL) + htmlContentUrl = append(htmlContentUrl, ossResult.FileURL) + } + } + var summaryResult []map[string]interface{} + if !g.IsEmpty(imgCount) { + for _, outputParamsItem := range flowInfo.OutputParams { + if !g.IsEmpty(htmlContentUrl) { + if strings.HasSuffix(gconv.String(outputParamsItem), ".html") { + continue + } + } + if strings.HasSuffix(gconv.String(outputParamsItem), ".png") { + continue + } + timeKey := strconv.FormatInt(time.Now().UnixMilli(), 10) + item := make(map[string]interface{}) + item[timeKey] = outputParamsItem + summaryResult = append(summaryResult, item) + } + for _, textItem := range imgUrl { + // 生成 毫秒时间戳 作为 KEY + timeKey := strconv.FormatInt(time.Now().UnixMilli(), 10) + item := make(map[string]interface{}) + item[timeKey] = textItem + summaryResult = append(summaryResult, item) + } + } + if !g.IsEmpty(htmlContentUrl) { + for _, outputParamsItem := range flowInfo.OutputParams { + if !g.IsEmpty(imgUrl) { + if strings.HasSuffix(gconv.String(outputParamsItem), ".png") { + continue + } + } + if strings.HasSuffix(gconv.String(outputParamsItem), ".html") { + continue + } + timeKey := strconv.FormatInt(time.Now().UnixMilli(), 10) + item := make(map[string]interface{}) + item[timeKey] = outputParamsItem + summaryResult = append(summaryResult, item) + } + for _, textItem := range htmlContentUrl { + // 生成 毫秒时间戳 作为 KEY + timeKey := strconv.FormatInt(time.Now().UnixMilli(), 10) + item := make(map[string]interface{}) + item[timeKey] = textItem + summaryResult = append(summaryResult, item) + } + } + if !g.IsEmpty(summaryResult) { + executionReq := flowDto.UpdateFlowExecutionReq{ + Id: flowInfo.Id, + Status: flow.FlowExecutionStatusSuccess.Code(), + OutputParams: summaryResult, + } + _, err = flowDao.FlowExecutionDao.Update(ctx, &executionReq) + } + } else if content == "text_img" { + //userFrom := make(map[string]any) + //userFrom["desc"] = req.Desc + // + //var textNode []node.NodeFormField + //textNode, err = TextNode(ctx, textNodeId, req.SessionId, textModelName, req.SkillName, textResultFrom, userFrom, textModelResponse, req.FileUrl) + //if err != nil { + // return nil, err + //} + // + //var htmlTags []string + //var textUrl []string + //for _, item := range textNode { + // if strings.Contains(item.Field, "text_content") { + // htmlTags = append(htmlTags, StripHtmlTags(item.Value, false)) + // } + // if strings.Contains(item.Field, "text_url") { + // textUrl = append(textUrl, item.Value) + // } + //} + // + //userFrom["prompt"] = htmlTags + //var imgNode []node.NodeFormField + //imgNode, err = ImgNode(ctx, imgNodeId, req.SessionId, imgModelName, req.SkillName, imgResultFrom, userFrom, imgModelResponse, req.FileUrl) + //if err != nil { + // return nil, err + //} + //var imgCount int + //var imgUrl []string + //var htmlBuilder strings.Builder + //htmlBuilder.WriteString(`
`) + //for _, item := range imgNode { + // if strings.Contains(item.Field, "img_url") { + // imgCount = imgCount + 1 + // htmlBuilder.WriteString(fmt.Sprintf(`图片`, item.Value)) + // imgUrl = append(imgUrl, item.Value) + // } + //} + //htmlBuilder.WriteString(`
`) + // + //var htmlContentUrl []string + //if !g.IsEmpty(htmlUrl) && imgCount > 0 { + // for i, item := range htmlUrl { + // // 获取当前要替换的文本内容 + // textContent := htmlTags[i] + // // 1. 读取 HTML 文件内容 + // var htmlBytes []byte + // htmlBytes, err = os.ReadFile(url + item) + // if err != nil { + // fmt.Printf("读取文件失败 %s: %v", url+item, err) + // continue + // } + // htmlContent := string(htmlBytes) + // + // re := regexp.MustCompile(`
[\s\S]*?
`) + // result := re.ReplaceAllString(htmlContent, htmlBuilder.String()) + // // 2. 构建要替换成的新 div 标签 + // newTextTag := fmt.Sprintf(`
%s
`, textContent) + // ret := regexp.MustCompile(`.*?`) + // result = ret.ReplaceAllString(htmlContent, newTextTag) + // fmt.Printf("成功处理文件:%s", result) + // + // // 上传OSS(每条独立上传) + // fileName := fmt.Sprintf("item_%d_%d.html", i, time.Now().UnixMilli()) + // var ossResult *dto.UploadFileBytesRes + // ossResult, err = Upload(ctx, &dto.UploadFileBytesReq{ + // FileBytes: []byte(result), + // FileName: fileName, + // }) + // if err != nil { + // return nil, err + // } + // fmt.Printf("上传OSS成功:%s", ossResult.FileURL) + // htmlContentUrl = append(htmlContentUrl, ossResult.FileURL) + // } + //} + //if !g.IsEmpty(htmlContentUrl) { + // for _, outputParamsItem := range flowInfo.OutputParams { + // if !g.IsEmpty(imgUrl) { + // if strings.HasSuffix(gconv.String(outputParamsItem), ".png") { + // continue + // } + // } + // if strings.HasSuffix(gconv.String(outputParamsItem), ".html") { + // continue + // } + // timeKey := strconv.FormatInt(time.Now().UnixMilli(), 10) + // item := make(map[string]interface{}) + // item[timeKey] = outputParamsItem + // summaryResult = append(summaryResult, item) + // } + // for _, textItem := range htmlContentUrl { + // // 生成 毫秒时间戳 作为 KEY + // timeKey := strconv.FormatInt(time.Now().UnixMilli(), 10) + // item := make(map[string]interface{}) + // item[timeKey] = textItem + // summaryResult = append(summaryResult, item) + // } + //} + //if !g.IsEmpty(summaryResult) { + // executionReq := flowDto.UpdateFlowExecutionReq{ + // Id: flowInfo.Id, + // Status: flow.FlowExecutionStatusSuccess.Code(), + // OutputParams: summaryResult, + // } + // _, err = flowDao.FlowExecutionDao.Update(ctx, &executionReq) + //} + } else { + return nil, fmt.Errorf("意图识别失败") + } + } else { + // ========================================================================= + // ✅【第1步】给所有判断节点自动生成意图识别节点 + // ========================================================================= + judge2IntentNodeMap := make(map[string]string) + finalNodes := make([]entity.FlowNode, 0, len(req.FlowContent.Nodes)*2) + for _, item := range req.FlowContent.Nodes { + finalNodes = append(finalNodes, item) + // 判断节点自动加 intent 节点 + if item.NodeCode == node.NodeTypeJudge { + intentNodeID := fmt.Sprintf("intent_%s", item.Id) + intentNode := entity.FlowNode{ + Id: intentNodeID, + NodeCode: node.NodeTypeIntent, + Name: fmt.Sprintf("意图识别-%s", item.Name), + InputSource: item.InputSource, // ✅ 正确赋值 + FormConfig: item.FormConfig, // ✅ 用户配置 + ModelConfig: item.ModelConfig, // ✅ 系统配置 + } + finalNodes = append(finalNodes, intentNode) + judge2IntentNodeMap[item.Id] = intentNodeID + } + } + + summaryNodeID := "summary_node" + summaryNode := entity.FlowNode{ + Id: summaryNodeID, + NodeCode: node.NodeTypeCustomNode, // 复用自定义节点类型,也可新增专属类型 + Name: "结果汇总节点", + InputSource: []entity.FlowNodeInputSource{}, // 后续自动聚合所有节点输出 + FormConfig: nil, + ModelConfig: node.ModelItem{}, + } + finalNodes = append(finalNodes, summaryNode) + + // 替换节点列表 + req.FlowContent.Nodes = finalNodes + + // ========================================================================= + // ✅【第2步】构建执行图 + // ========================================================================= + var runGraph compose.Runnable[any, any] + runGraph, err = BuildGraphFromFlowContent(execCtx, req.FlowContent, judge2IntentNodeMap, summaryNodeID) + if err != nil { executionReq := flowDto.UpdateFlowExecutionReq{ - Id: executionId, - Status: flow.FlowExecutionStatusCancel.Code(), + Id: executionId, + Status: flow.FlowExecutionStatusFailed.Code(), + ErrorMessage: err.Error(), } - _, _ = flowDao.FlowExecutionDao.Update(ctx, &executionReq) - return nil, fmt.Errorf("工作流已被取消: %v", err) + _, err1 := flowDao.FlowExecutionDao.Update(ctx, &executionReq) + if err1 != nil { + return + } + return nil, fmt.Errorf("执行工作流失败: %v", err) } - executionReq := flowDto.UpdateFlowExecutionReq{ - Id: executionId, - Status: flow.FlowExecutionStatusFailed.Code(), - ErrorMessage: err.Error(), + + // ========================================================================= + // ✅【第3步】构建 ConfigMap + // ========================================================================= + configMap := make(map[string]*entity.FlowNode) + for _, cfg := range req.NodeInputParams { + configMap[cfg.Id] = cfg } - _, err1 := flowDao.FlowExecutionDao.Update(ctx, &executionReq) - if err1 != nil { - return + // 自动给意图节点复制配置 + for judgeID, intentID := range judge2IntentNodeMap { + if cfg, ok := configMap[judgeID]; ok { + configMap[intentID] = cfg + } + } + // 初始化汇总节点配置 + configMap[summaryNodeID] = &summaryNode + + // ========================================================================= + // ✅【第4步】构建全局执行入参(现在 schemaMap 是有值的!) + // ========================================================================= + execInput := &flowDto.FlowExecutionInput{ + IsDialogue: isDialogue, + ExecutionId: executionId, + ConfigMap: configMap, + SessionId: req.SessionId, + Desc: req.Desc, + SkillName: req.SkillName, + FileUrl: req.FileUrl, + } + // 执行工作流 + _, err = runGraph.Invoke(execCtx, execInput) + if err != nil { + // 检测是否是取消导致的错误 + if errors.Is(execCtx.Err(), context.Canceled) { + executionReq := flowDto.UpdateFlowExecutionReq{ + Id: executionId, + Status: flow.FlowExecutionStatusCancel.Code(), + } + _, _ = flowDao.FlowExecutionDao.Update(ctx, &executionReq) + return nil, fmt.Errorf("工作流已被取消: %v", err) + } + executionReq := flowDto.UpdateFlowExecutionReq{ + Id: executionId, + Status: flow.FlowExecutionStatusFailed.Code(), + ErrorMessage: err.Error(), + } + _, err1 := flowDao.FlowExecutionDao.Update(ctx, &executionReq) + if err1 != nil { + return + } + return nil, fmt.Errorf("执行工作流失败: %v", err) } - return nil, fmt.Errorf("执行工作流失败: %v", err) } return diff --git a/workflow/service/flow/lambda_node.go b/workflow/service/flow/lambda_node.go index bb4fb86..71e2d6d 100644 --- a/workflow/service/flow/lambda_node.go +++ b/workflow/service/flow/lambda_node.go @@ -12,7 +12,6 @@ import ( "ai-agent/workflow/model/entity" "context" "fmt" - "regexp" "strconv" "strings" "time" @@ -240,7 +239,7 @@ func TextModelLambda(ctx context.Context, input any) (any, error) { if field, ok := valueAny.(node.NodeFormField); ok { if !strings.Contains(field.Field, "text_url") && !strings.Contains(field.Field, "img_url") { if strings.Contains(field.Field, "text_content") { - field.Value = stripHtmlTags(field.Value, false) + field.Value = StripHtmlTags(field.Value, false) } resultUserFrom[field.Label] = field } @@ -277,7 +276,7 @@ func TextModelLambda(ctx context.Context, input any) (any, error) { if g.IsEmpty(nodeInput.Config.SkillName) { skillName = nodeInput.Global.SkillName } - contentStr := "你是专业内容生成助手,请严格按以下规则输出内容:1、输出标准 HTML 片段,不要 Markdown,不要 ``` 符号,不要多余解释,2、整体用
包裹,3、主标题使用

,4、章节标题使用

,5、正文段落使用

,6、列表使用

,7、重点内容使用 加粗,8、段落之间清晰分隔,结构规整,9、如果生成多条文案,每条文案独立用
包裹(序号从1开始),10、每条文案内部必须在最上方添加一行固定格式:

需要配图:N 张

N 是这条文案需要的图片数量,只能是数字,不能是其他文字,11、只输出 HTML 结构,不输出任何额外文字" + contentStr := "你是专业内容生成助手,请严格按以下规则输出内容:1、输出标准 HTML 片段,不要 Markdown,不要 ``` 符号,不要多余解释,2、整体用
包裹,3、主标题使用

,4、章节标题使用

,5、正文段落使用

,6、列表使用

  • ...
,7、重点内容使用 加粗,8、段落之间清晰分隔,结构规整,9、如果生成多条文案,每条文案独立用
包裹(序号从1开始),10、每条文案内部必须在最上方添加一行固定格式:

需要配图:N 张

N 是这条文案需要的图片数量,只能是数字,不能是其他文字,11、只输出 HTML 结构,不输出任何额外文字" resultUserFrom["prompt"] = contentStr req := flowDto.ComposeMessagesReq{ @@ -325,13 +324,14 @@ func TextModelLambda(ctx context.Context, input any) (any, error) { Value: content, Label: fmt.Sprintf("文案内容_%d", i), Type: "string", - Expand: extractImageCount(content), + Expand: ExtractImageCount(content), }) // 1. 去掉 HTML 标签,生成纯文本 - plainText := stripHtmlTags(content, true) + //plainText := StripHtmlTags(content, true) + plainText := BuildText(content) // 2. 上传纯文本到 OSS - textFileName := fmt.Sprintf("ai_text_%d_%d.txt", time.Now().UnixMilli(), i) + textFileName := fmt.Sprintf("ai_text_%d_%d.inc", time.Now().UnixMilli(), i) textUrl, err := Upload(ctx, &dto.UploadFileBytesReq{ FileBytes: []byte(plainText), FileName: textFileName, @@ -341,11 +341,11 @@ func TextModelLambda(ctx context.Context, input any) (any, error) { } // 3. 把纯文本地址存入输出 outputRes = append(outputRes, node.NodeFormField{ - Field: fmt.Sprintf("%v:text_url:%d", nodeInput.Config.Id, i), + Field: fmt.Sprintf("text_url_%d", i), Value: textUrl.FileURL, Label: fmt.Sprintf("文案纯文本_txt_%d", i), Type: "string", - Expand: extractImageCount(content), + Expand: ExtractImageCount(content), }) } nodeInput.Config.OutputResult = outputRes @@ -353,64 +353,6 @@ func TextModelLambda(ctx context.Context, input any) (any, error) { return nodeInput, nil } -// 从 HTML 内容里提取图片数量(例如从

需要配图:3 张

拿到 3) -func extractImageCount(content string) int { - re := regexp.MustCompile(`

[^\d]*(\d+)[^\d]*

`) - match := re.FindStringSubmatch(content) - if len(match) >= 2 { - num, _ := strconv.Atoi(match[1]) - return num - } - return 0 -} - -// stripHtmlTags 去掉所有HTML标签,保留换行和文本结构,并删除配图标记行 -func stripHtmlTags(html string, delImageCount bool) string { - if delImageCount { - // 🔥 第一步:直接删除整个

...

标签(包含内容) - imageTagRegex := regexp.MustCompile(`

[\s\S]*?

`) - html = imageTagRegex.ReplaceAllString(html, "") - } - - // 1. 替换块级标签为换行,保证排版 - blockTags := regexp.MustCompile(`]*>`) - text := blockTags.ReplaceAllString(html, "\n") - - // 2. 去掉所有剩余的 HTML 标签 - allTags := regexp.MustCompile(`<[^>]+>`) - text = allTags.ReplaceAllString(text, "") - - // 4. 清理多余空行(多个换行只保留一个) - text = regexp.MustCompile(`\n\s*\n`).ReplaceAllString(text, "\n") - - // 5. 只去掉首尾空白,中间换行保留 - text = strings.TrimSpace(text) - - return text -} - -// SplitMultiContents 拆分模型返回的多条文案(基于HTML标签分隔) -func SplitMultiContents(htmlContent string) []string { - var contents []string - // 正则匹配
包裹的内容 - re := regexp.MustCompile(`
([\s\S]*?)
`) - matches := re.FindAllStringSubmatch(htmlContent, -1) - for _, match := range matches { - if len(match) > 1 { - // 清理空内容 - trimmed := strings.TrimSpace(match[1]) - if trimmed != "" { - contents = append(contents, trimmed) - } - } - } - // 兜底:如果没有匹配到结构化内容,按换行/分隔符拆分 - if len(contents) == 0 { - contents = strings.Split(htmlContent, "===分隔符===") // 提示词中可新增此兜底规则 - } - return contents -} - // ImageModelLambda 构建图片 func ImageModelLambda(ctx context.Context, input any) (any, error) { nodeInput, ok := input.(*flowDto.NodeExecutionInput) @@ -432,7 +374,7 @@ func ImageModelLambda(ctx context.Context, input any) (any, error) { if field, ok := valueAny.(node.NodeFormField); ok { if !strings.Contains(field.Field, "text_url") && !strings.Contains(field.Field, "img_url") { if strings.Contains(field.Field, "text_content") { - field.Value = stripHtmlTags(field.Value, false) + field.Value = StripHtmlTags(field.Value, false) } resultUserFrom[field.Label] = field } @@ -509,9 +451,8 @@ func ImageModelLambda(ctx context.Context, input any) (any, error) { for _, item := range imgs { mapItem := gconv.Map(item) for _, value := range mapItem { - values := "" - values, ok = value.(string) - if !ok { + values, imgOk := value.(string) + if !imgOk { return nil, fmt.Errorf("图片地址类型错误") } // 下载官方临时图片 @@ -577,7 +518,7 @@ func MergeLambda(ctx context.Context, input any) (any, error) { // 2. 提取所有文案:text_content_0,1,2... var contents []node.NodeFormField for i := 0; ; i++ { - key := fmt.Sprintf("text_content_%d", i) + key := fmt.Sprintf("text_url_%d", i) val, has := dataMap[key] if !has || val.Value == "" { break @@ -668,143 +609,7 @@ func MergeLambda(ctx context.Context, input any) (any, error) { // 支持任意来源:文生图、图生文、单独文、单独图、文图合并 // 生成单条HTML - var htmlBuilder strings.Builder - htmlBuilder.WriteString(` - - - - - - - - -
-
-`) - - // 写入图片(支持0张、1张、多张) - if len(item.Images) > 0 { - htmlBuilder.WriteString(`
`) - for _, imgUrl := range item.Images { - htmlBuilder.WriteString(fmt.Sprintf(`图片`, imgUrl)) - } - htmlBuilder.WriteString(`
`) - } - - // 🔥 写入文案前:删除

需要配图:X 张

- if item.Content != "" { - // 正则删除整行 - re := regexp.MustCompile(`

需要配图:\d+ 张

`) - cleanContent := re.ReplaceAllString(item.Content, "") - - // 写入清理后的文案 - htmlBuilder.WriteString(fmt.Sprintf(`
%s
`, cleanContent)) - } - - htmlBuilder.WriteString(`
-
- -`) - htmlContent := htmlBuilder.String() + htmlContent := BuildHtml(item.Content, item.Images) // 上传OSS(每条独立上传) fileName := fmt.Sprintf("item_%d_%d.html", idx, time.Now().UnixMilli()) @@ -887,9 +692,9 @@ func SummaryLambda(ctx context.Context, input any) (any, error) { executionReq := flowDto.UpdateFlowExecutionReq{ Id: execInput.Global.ExecutionId, + Status: flow.FlowExecutionStatusSuccess.Code(), OutputParams: summaryResult, } - executionReq.Status = flow.FlowExecutionStatusSuccess.Code() _, err = flowDao.FlowExecutionDao.Update(ctx, &executionReq) if flowInfo != nil { @@ -939,3 +744,171 @@ func CustomLambda(ctx context.Context, input any) (any, error) { fmt.Println("CustomLambda:", input) return input, nil } + +func TextNode(ctx context.Context, nodeId, sessionId, modelName, skillName string, from, userFrom, modelResponse map[string]any, fileUrl []string) ([]node.NodeFormField, error) { + contentStr := "你是专业内容生成助手,请严格按以下规则输出内容:1、输出标准 HTML 片段,不要 Markdown,不要 ``` 符号,不要多余解释,2、整体用
包裹,3、主标题使用

,4、章节标题使用

,5、正文段落使用

,6、列表使用

  • ...
,7、重点内容使用 加粗,8、段落之间清晰分隔,结构规整,9、如果生成多条文案,每条文案独立用
包裹(序号从1开始),10、每条文案内部必须在最上方添加一行固定格式:

需要配图:N 张

N 是这条文案需要的图片数量,只能是数字,不能是其他文字,11、只输出 HTML 结构,不输出任何额外文字" + userFrom["prompt"] = contentStr + + textMsgReq := flowDto.ComposeMessagesReq{ + BuildType: 1, + ModelName: modelName, + SkillName: skillName, + Cause: "文案节点", + Form: from, + UserForm: userFrom, + UserFiles: fileUrl, + SessionId: sessionId, + } + msg, err := ComposeMessages(ctx, &textMsgReq) + if err != nil { + return nil, err + } + if g.IsEmpty(msg.Messages) { + return nil, fmt.Errorf("msg is empty") + } + + var taskResult any + taskResult, err = GatewayTask(ctx, msg.EpicycleId, modelName, msg.Messages) + if err != nil { + return nil, err + } + var getTaskResult *flowDto.TaskCallback + getTaskResult, err = GetTaskResult(ctx, taskResult) + if err != nil { + return nil, err + } + mapTaskResult := gconv.Map(getTaskResult.Text) + + resultContent := "" + for key, _ := range modelResponse { + resultContent = gconv.String(mapTaskResult[key]) + } + + // 拆分多条文案 + contentList := SplitMultiContents(resultContent) + + outputRes := make([]node.NodeFormField, 0) + for i, contentItem := range contentList { + outputRes = append(outputRes, node.NodeFormField{ + Field: fmt.Sprintf("text_content_%d", i), + Value: contentItem, + Label: fmt.Sprintf("文案内容_%d", i), + Type: "string", + Expand: ExtractImageCount(contentItem), + }) + + // 1. 去掉 HTML 标签,生成纯文本 + //plainText := StripHtmlTags(contentItem, true) + plainText := BuildHtml(contentItem, nil) + // 2. 上传纯文本到 OSS + textFileName := fmt.Sprintf("ai_text_%d_%d.md", time.Now().UnixMilli(), i) + var textUrl *dto.UploadFileBytesRes + textUrl, err = Upload(ctx, &dto.UploadFileBytesReq{ + FileBytes: []byte(plainText), + FileName: textFileName, + }) + if err != nil { + return nil, err + } + // 3. 把纯文本地址存入输出 + outputRes = append(outputRes, node.NodeFormField{ + Field: fmt.Sprintf("%v:text_url:%d", nodeId, i), + Value: textUrl.FileURL, + Label: fmt.Sprintf("文案纯文本_txt_%d", i), + Type: "string", + Expand: ExtractImageCount(contentItem), + }) + } + return outputRes, nil +} + +func ImgNode(ctx context.Context, nodeId, sessionId, modelName, skillName string, form, userForm, modelResponse map[string]any, fileUrl []string) ([]node.NodeFormField, error) { + imgMsgReq := flowDto.ComposeMessagesReq{ + BuildType: 1, + ModelName: modelName, + SkillName: skillName, + Cause: "图片节点", + Form: form, + UserForm: userForm, + UserFiles: fileUrl, + SessionId: sessionId, + } + msg, err := ComposeMessages(ctx, &imgMsgReq) + if err != nil { + return nil, err + } + if g.IsEmpty(msg.Messages) { + return nil, fmt.Errorf("msg is empty") + } + var taskResult any + taskResult, err = GatewayTask(ctx, msg.EpicycleId, modelName, msg.Messages) + if err != nil { + return nil, err + } + var getTaskResult *flowDto.TaskCallback + getTaskResult, err = GetTaskResult(ctx, taskResult) + if err != nil { + return nil, err + } + mapTaskResult := gconv.Map(getTaskResult.Text) + + var resultContent []string + for key, _ := range modelResponse { + resultContent = gconv.Strings(mapTaskResult[key]) + } + + var images []string + for _, item := range resultContent { + mapItem := gconv.Map(item) + for _, value := range mapItem { + values, ok := value.(string) + if !ok { + return nil, fmt.Errorf("图片地址类型错误") + } + // 下载官方临时图片 + var imgBytes []byte + imgBytes, _, err = GetImageBytesFromURL(values) + if err != nil { + return nil, fmt.Errorf("下载图片失败: %w", err) + } + // 构造文件名 + fileName := fmt.Sprintf("ai_image_%d.png", time.Now().UnixMilli()) + // 上传到你的OSS(你项目已有的Upload方法) + var upResp *dto.UploadFileBytesRes + upResp, err = Upload(ctx, &dto.UploadFileBytesReq{ + FileName: fileName, + FileBytes: imgBytes, + }) + if err != nil { + return nil, fmt.Errorf("上传OSS失败: %w", err) + } + images = append(images, upResp.FileURL) + } + } + + var url string + url, err = utils.GetFileAddressPrefix(ctx) + if err != nil { + return nil, err + } + outputRes := make([]node.NodeFormField, 0) + + for i, item := range images { + // 图片:image_0, image_1, image_2... + outputRes = append(outputRes, node.NodeFormField{ + Field: fmt.Sprintf("image_%d", i), + Value: fmt.Sprintf("%s%s", url, item), + Label: fmt.Sprintf("图片_%d", i), + Type: "string", + }) + // 额外存储关联关系 + outputRes = append(outputRes, node.NodeFormField{ + Field: fmt.Sprintf("%v:img_url:%d", nodeId, i), + Value: fmt.Sprintf("%s%s", url, item), + Label: fmt.Sprintf("图片_img_%d关联文案ID", i), + Type: "string", + }) + } + + return outputRes, nil +} diff --git a/workflow/service/flow/lambda_node_util.go b/workflow/service/flow/lambda_node_util.go index c9c05ab..6cae42a 100644 --- a/workflow/service/flow/lambda_node_util.go +++ b/workflow/service/flow/lambda_node_util.go @@ -9,6 +9,9 @@ import ( "io" "mime/multipart" "net/http" + "regexp" + "strconv" + "strings" commonHttp "gitea.com/red-future/common/http" "gitea.com/red-future/common/utils" @@ -160,3 +163,274 @@ func Upload(ctx context.Context, req *dto.UploadFileBytesReq) (*dto.UploadFileBy g.Log().Infof(ctx, "[Upload] success url=%s size=%d", res.FileURL, res.FileSize) return res, nil } + +func BuildText(text string) string { + // 生成单条HTML + var htmlBuilder strings.Builder + htmlBuilder.WriteString(` + + + + + + + + +
+
+`) + + //// 写入图片(支持0张、1张、多张) + //if len(images) > 0 { + // htmlBuilder.WriteString(`
`) + // for _, imgUrl := range images { + // htmlBuilder.WriteString(fmt.Sprintf(`图片`, imgUrl)) + // } + // htmlBuilder.WriteString(`
`) + //} + + // 🔥 写入文案前:删除

需要配图:X 张

+ if text != "" { + // 正则删除整行 + imageTagRegex := regexp.MustCompile(`

[\s\S]*?

`) + //re := regexp.MustCompile(`

需要配图:\d+ 张

`) + cleanContent := imageTagRegex.ReplaceAllString(text, "") + + // 写入清理后的文案 + htmlBuilder.WriteString(fmt.Sprintf(`
%s
`, cleanContent)) + } + + htmlBuilder.WriteString(`
+
+ +`) + + return htmlBuilder.String() +} + +func BuildHtml(text string, images []string) string { + var htmlBuilder strings.Builder + htmlBuilder.WriteString(` + + + + + + + +
+`) + // 写入图片(支持0张、1张、多张) + if len(images) > 0 { + htmlBuilder.WriteString(`
`) + for _, imgUrl := range images { + htmlBuilder.WriteString(fmt.Sprintf(`图片`, imgUrl)) + } + htmlBuilder.WriteString(`
`) + } + + htmlBuilder.WriteString(` +
加载中...
+
+ + + +`) + + return htmlBuilder.String() +} + +// ExtractImageCount 从 HTML 内容里提取图片数量(例如从

需要配图:3 张

拿到 3) +func ExtractImageCount(content string) int { + re := regexp.MustCompile(`

[^\d]*(\d+)[^\d]*

`) + match := re.FindStringSubmatch(content) + if len(match) >= 2 { + num, _ := strconv.Atoi(match[1]) + return num + } + return 0 +} + +// StripHtmlTags 去掉所有HTML标签,保留换行和文本结构,并删除配图标记行 +func StripHtmlTags(html string, delImageCount bool) string { + if delImageCount { + // 🔥 第一步:直接删除整个

...

标签(包含内容) + imageTagRegex := regexp.MustCompile(`

[\s\S]*?

`) + html = imageTagRegex.ReplaceAllString(html, "") + } + + // 1. 替换块级标签为换行,保证排版 + blockTags := regexp.MustCompile(`]*>`) + text := blockTags.ReplaceAllString(html, "\n") + + // 2. 去掉所有剩余的 HTML 标签 + allTags := regexp.MustCompile(`<[^>]+>`) + text = allTags.ReplaceAllString(text, "") + + // 4. 清理多余空行(多个换行只保留一个) + text = regexp.MustCompile(`\n\s*\n`).ReplaceAllString(text, "\n") + + // 5. 只去掉首尾空白,中间换行保留 + text = strings.TrimSpace(text) + + return text +} + +// SplitMultiContents 拆分模型返回的多条文案(基于HTML标签分隔) +func SplitMultiContents(htmlContent string) []string { + var contents []string + // 正则匹配
包裹的内容 + re := regexp.MustCompile(`
([\s\S]*?)
`) + matches := re.FindAllStringSubmatch(htmlContent, -1) + for _, match := range matches { + if len(match) > 1 { + // 清理空内容 + trimmed := strings.TrimSpace(match[1]) + if trimmed != "" { + contents = append(contents, trimmed) + } + } + } + // 兜底:如果没有匹配到结构化内容,按换行/分隔符拆分 + if len(contents) == 0 { + contents = strings.Split(htmlContent, "===分隔符===") // 提示词中可新增此兜底规则 + } + return contents +}