diff --git a/update.sql b/update.sql index 475e3d6..dcd7ba4 100644 --- a/update.sql +++ b/update.sql @@ -620,7 +620,9 @@ CREATE TABLE IF NOT EXISTS black_deacon_node_execution ( node_name VARCHAR(128) NOT NULL DEFAULT '', -- 节点名称 node_group_id VARCHAR(64) NOT NULL DEFAULT '', -- 节点分组ID input_params JSONB DEFAULT '{}', -- 节点输入参数 + input_params_path VARCHAR(256) DEFAULT '', -- 节点输入参数路径 output_params JSONB DEFAULT '{}', -- 节点输出参数 + output_params_path VARCHAR(256) DEFAULT '', prompt_tokens INTEGER NOT NULL DEFAULT 0, -- 提示词token消耗 completion_tokens INTEGER NOT NULL DEFAULT 0, -- 补全token消耗 total_tokens INTEGER NOT NULL DEFAULT 0, -- 总token消耗 @@ -650,7 +652,9 @@ COMMENT ON COLUMN black_deacon_node_execution.node_id IS '节点ID'; COMMENT ON COLUMN black_deacon_node_execution.node_name IS '节点名称'; COMMENT ON COLUMN black_deacon_node_execution.node_group_id IS '节点分组ID'; COMMENT ON COLUMN black_deacon_node_execution.input_params IS '节点输入参数'; +COMMENT ON COLUMN black_deacon_node_execution.input_params_path IS '节点输入参数路径'; COMMENT ON COLUMN black_deacon_node_execution.output_params IS '节点输出参数'; +COMMENT ON COLUMN black_deacon_node_execution.output_params_path IS '节点输出参数路径'; COMMENT ON COLUMN black_deacon_node_execution.prompt_tokens IS '提示词token消耗'; COMMENT ON COLUMN black_deacon_node_execution.completion_tokens IS '补全token消耗'; COMMENT ON COLUMN black_deacon_node_execution.total_tokens IS '总token消耗'; diff --git a/workflow/model/dto/node/node_execution_dto.go b/workflow/model/dto/node/node_execution_dto.go index 8811e65..e91ec1c 100644 --- a/workflow/model/dto/node/node_execution_dto.go +++ b/workflow/model/dto/node/node_execution_dto.go @@ -11,13 +11,16 @@ import ( // CreateNodeExecutionReq 创建节点执行记录请求 type CreateNodeExecutionReq struct { - g.Meta `path:"/create" method:"post" tags:"节点执行记录" summary:"创建节点执行记录" dc:"创建节点执行记录"` - FlowExecutionId int64 `json:"flowExecutionId" v:"required#流程执行ID不能为空"` - NodeId string `json:"nodeId" v:"required#节点ID不能为空"` - NodeName string `json:"nodeName"` - NodeGroupId string `json:"nodeGroupId"` - Status node.NodeExecutionStatus `json:"status"` - InputParams *flowDto.NodeExecutionInput `json:"inputParams"` + g.Meta `path:"/create" method:"post" tags:"节点执行记录" summary:"创建节点执行记录" dc:"创建节点执行记录"` + FlowExecutionId int64 `json:"flowExecutionId" v:"required#流程执行ID不能为空"` + NodeId string `json:"nodeId" v:"required#节点ID不能为空"` + NodeName string `json:"nodeName"` + NodeGroupId string `json:"nodeGroupId"` + Status node.NodeExecutionStatus `json:"status"` + InputParams *flowDto.NodeExecutionInput `json:"inputParams"` + InputParamsPath string + OutputParams *flowDto.NodeExecutionInput `json:"outputParams"` + OutputParamsPath string } type CreateNodeExecutionRes struct { @@ -29,12 +32,15 @@ type UpdateNodeExecutionReq struct { g.Meta `path:"/update" method:"put" tags:"节点执行记录" summary:"更新节点执行记录" dc:"更新节点执行记录状态和结果"` Id int64 `json:"id" v:"required#ID不能为空"` InputParams *flowDto.NodeExecutionInput `json:"inputParams"` - PromptTokens int `json:"promptTokens"` - CompletionTokens int `json:"completionTokens"` - TotalTokens int `json:"totalTokens"` - Status node.NodeExecutionStatus `json:"status"` - DurationMs int64 `json:"durationMs"` - ErrorMessage string `json:"errorMessage"` + InputParamsPath string + OutputParams *flowDto.NodeExecutionInput `json:"outputParams"` + OutputParamsPath string + PromptTokens int `json:"promptTokens"` + CompletionTokens int `json:"completionTokens"` + TotalTokens int `json:"totalTokens"` + Status node.NodeExecutionStatus `json:"status"` + DurationMs int64 `json:"durationMs"` + ErrorMessage string `json:"errorMessage"` } // DeleteNodeExecutionReq 删除节点执行记录请求 diff --git a/workflow/model/entity/node_execution.go b/workflow/model/entity/node_execution.go index f7db132..b8b5e08 100644 --- a/workflow/model/entity/node_execution.go +++ b/workflow/model/entity/node_execution.go @@ -16,7 +16,9 @@ type NodeExecution struct { NodeName string `orm:"node_name" json:"nodeName" description:"节点名称"` NodeGroupId string `orm:"node_group_id" json:"nodeGroupId" description:"节点组ID"` InputParams map[string]interface{} `orm:"input_params" json:"inputParams" description:"节点输入参数"` + InputParamsPath string `orm:"input_params_path" json:"inputParamsPath" description:"节点输入参数路径"` OutputParams map[string]interface{} `orm:"output_params" json:"outputParams" description:"节点输出参数"` + OutputParamsPath string `orm:"output_params_path" json:"outputParamsPath" description:"节点输出参数路径"` PromptTokens int `orm:"prompt_tokens" json:"promptTokens" description:"提示词token消耗"` CompletionTokens int `orm:"completion_tokens" json:"completionTokens" description:"补全token消耗"` TotalTokens int `orm:"total_tokens" json:"totalTokens" description:"总token消耗"` @@ -32,7 +34,9 @@ type nodeExecutionCol struct { NodeName string NodeGroupId string InputParams string + InputParamsPath string OutputParams string + OutputParamsPath string PromptTokens string CompletionTokens string TotalTokens string @@ -48,7 +52,9 @@ var NodeExecutionCol = nodeExecutionCol{ NodeName: "node_name", NodeGroupId: "node_group_id", InputParams: "input_params", + InputParamsPath: "input_params_path", OutputParams: "output_params", + OutputParamsPath: "output_params_path", PromptTokens: "prompt_tokens", CompletionTokens: "completion_tokens", TotalTokens: "total_tokens", diff --git a/workflow/service/flow/flow_execution_service.go b/workflow/service/flow/flow_execution_service.go index 6de8a0e..3bc103f 100644 --- a/workflow/service/flow/flow_execution_service.go +++ b/workflow/service/flow/flow_execution_service.go @@ -6,6 +6,7 @@ import ( fileDao "ai-agent/workflow/dao/file" flowDao "ai-agent/workflow/dao/flow" nodeDao "ai-agent/workflow/dao/node" + "ai-agent/workflow/model/dto" fileDto "ai-agent/workflow/model/dto/file" flowDto "ai-agent/workflow/model/dto/flow" nodeDto "ai-agent/workflow/model/dto/node" @@ -633,12 +634,22 @@ func registerNodeToGraph(graph *compose.Graph[any, any], flowNode entity.FlowNod } // ✅ 插入节点执行记录,初始状态为运行中 startTime := time.Now() + + // 上传OSS(每条独立上传) + ossResult, err := Upload(ctx, &dto.UploadFileBytesReq{ + FileBytes: gconv.Bytes(gconv.String(realInput)), + FileName: fmt.Sprintf("nodeInput:%v.txt", time.Now().UnixMilli()), + }) + if err != nil { + return nil, err + } + nodeExecutionId, err := nodeDao.NodeExecutionDao.Insert(ctx, &nodeDto.CreateNodeExecutionReq{ FlowExecutionId: execInput.ExecutionId, NodeId: nodeID, NodeName: flowNode.Name, NodeGroupId: execInput.NodeGroupId, - InputParams: realInput, + InputParamsPath: ossResult.FileURL, Status: node.NodeExecutionStatusRunning.Code(), }) if err != nil { @@ -653,10 +664,18 @@ func registerNodeToGraph(graph *compose.Graph[any, any], flowNode entity.FlowNod // 执行节点 _, err = lambda(ctx, realInput) durationMs := time.Since(startTime).Milliseconds() + // 上传OSS(每条独立上传) + ossResult1, err := Upload(ctx, &dto.UploadFileBytesReq{ + FileBytes: gconv.Bytes(gconv.String(realInput)), + FileName: fmt.Sprintf("nodeInput:%v.txt", time.Now().UnixMilli()), + }) + if err != nil { + return nil, err + } updateReq := &nodeDto.UpdateNodeExecutionReq{ - Id: nodeExecutionId, - InputParams: realInput, - DurationMs: durationMs, + Id: nodeExecutionId, + OutputParamsPath: ossResult1.FileURL, + DurationMs: durationMs, } if err != nil { // 执行失败,更新状态