feat: 将节点输入输出参数上传至OSS存储

This commit is contained in:
2026-06-12 11:02:36 +08:00
parent ddc4e0be63
commit fba7d032ae
4 changed files with 52 additions and 17 deletions

View File

@@ -620,7 +620,9 @@ CREATE TABLE IF NOT EXISTS black_deacon_node_execution (
node_name VARCHAR(128) NOT NULL DEFAULT '', -- 节点名称 node_name VARCHAR(128) NOT NULL DEFAULT '', -- 节点名称
node_group_id VARCHAR(64) NOT NULL DEFAULT '', -- 节点分组ID node_group_id VARCHAR(64) NOT NULL DEFAULT '', -- 节点分组ID
input_params JSONB DEFAULT '{}', -- 节点输入参数 input_params JSONB DEFAULT '{}', -- 节点输入参数
input_params_path VARCHAR(256) DEFAULT '', -- 节点输入参数路径
output_params JSONB DEFAULT '{}', -- 节点输出参数 output_params JSONB DEFAULT '{}', -- 节点输出参数
output_params_path VARCHAR(256) DEFAULT '',
prompt_tokens INTEGER NOT NULL DEFAULT 0, -- 提示词token消耗 prompt_tokens INTEGER NOT NULL DEFAULT 0, -- 提示词token消耗
completion_tokens INTEGER NOT NULL DEFAULT 0, -- 补全token消耗 completion_tokens INTEGER NOT NULL DEFAULT 0, -- 补全token消耗
total_tokens INTEGER NOT NULL DEFAULT 0, -- 总token消耗 total_tokens INTEGER NOT NULL DEFAULT 0, -- 总token消耗
@@ -650,7 +652,9 @@ COMMENT ON COLUMN black_deacon_node_execution.node_id IS '节点ID';
COMMENT ON COLUMN black_deacon_node_execution.node_name IS '节点名称'; COMMENT ON COLUMN black_deacon_node_execution.node_name IS '节点名称';
COMMENT ON COLUMN black_deacon_node_execution.node_group_id IS '节点分组ID'; COMMENT ON COLUMN black_deacon_node_execution.node_group_id IS '节点分组ID';
COMMENT ON COLUMN black_deacon_node_execution.input_params IS '节点输入参数'; COMMENT ON COLUMN black_deacon_node_execution.input_params IS '节点输入参数';
COMMENT ON COLUMN black_deacon_node_execution.input_params_path IS '节点输入参数路径';
COMMENT ON COLUMN black_deacon_node_execution.output_params IS '节点输出参数'; COMMENT ON COLUMN black_deacon_node_execution.output_params IS '节点输出参数';
COMMENT ON COLUMN black_deacon_node_execution.output_params_path IS '节点输出参数路径';
COMMENT ON COLUMN black_deacon_node_execution.prompt_tokens IS '提示词token消耗'; COMMENT ON COLUMN black_deacon_node_execution.prompt_tokens IS '提示词token消耗';
COMMENT ON COLUMN black_deacon_node_execution.completion_tokens IS '补全token消耗'; COMMENT ON COLUMN black_deacon_node_execution.completion_tokens IS '补全token消耗';
COMMENT ON COLUMN black_deacon_node_execution.total_tokens IS '总token消耗'; COMMENT ON COLUMN black_deacon_node_execution.total_tokens IS '总token消耗';

View File

@@ -11,13 +11,16 @@ import (
// CreateNodeExecutionReq 创建节点执行记录请求 // CreateNodeExecutionReq 创建节点执行记录请求
type CreateNodeExecutionReq struct { type CreateNodeExecutionReq struct {
g.Meta `path:"/create" method:"post" tags:"节点执行记录" summary:"创建节点执行记录" dc:"创建节点执行记录"` g.Meta `path:"/create" method:"post" tags:"节点执行记录" summary:"创建节点执行记录" dc:"创建节点执行记录"`
FlowExecutionId int64 `json:"flowExecutionId" v:"required#流程执行ID不能为空"` FlowExecutionId int64 `json:"flowExecutionId" v:"required#流程执行ID不能为空"`
NodeId string `json:"nodeId" v:"required#节点ID不能为空"` NodeId string `json:"nodeId" v:"required#节点ID不能为空"`
NodeName string `json:"nodeName"` NodeName string `json:"nodeName"`
NodeGroupId string `json:"nodeGroupId"` NodeGroupId string `json:"nodeGroupId"`
Status node.NodeExecutionStatus `json:"status"` Status node.NodeExecutionStatus `json:"status"`
InputParams *flowDto.NodeExecutionInput `json:"inputParams"` InputParams *flowDto.NodeExecutionInput `json:"inputParams"`
InputParamsPath string
OutputParams *flowDto.NodeExecutionInput `json:"outputParams"`
OutputParamsPath string
} }
type CreateNodeExecutionRes struct { type CreateNodeExecutionRes struct {
@@ -29,12 +32,15 @@ type UpdateNodeExecutionReq struct {
g.Meta `path:"/update" method:"put" tags:"节点执行记录" summary:"更新节点执行记录" dc:"更新节点执行记录状态和结果"` g.Meta `path:"/update" method:"put" tags:"节点执行记录" summary:"更新节点执行记录" dc:"更新节点执行记录状态和结果"`
Id int64 `json:"id" v:"required#ID不能为空"` Id int64 `json:"id" v:"required#ID不能为空"`
InputParams *flowDto.NodeExecutionInput `json:"inputParams"` InputParams *flowDto.NodeExecutionInput `json:"inputParams"`
PromptTokens int `json:"promptTokens"` InputParamsPath string
CompletionTokens int `json:"completionTokens"` OutputParams *flowDto.NodeExecutionInput `json:"outputParams"`
TotalTokens int `json:"totalTokens"` OutputParamsPath string
Status node.NodeExecutionStatus `json:"status"` PromptTokens int `json:"promptTokens"`
DurationMs int64 `json:"durationMs"` CompletionTokens int `json:"completionTokens"`
ErrorMessage string `json:"errorMessage"` TotalTokens int `json:"totalTokens"`
Status node.NodeExecutionStatus `json:"status"`
DurationMs int64 `json:"durationMs"`
ErrorMessage string `json:"errorMessage"`
} }
// DeleteNodeExecutionReq 删除节点执行记录请求 // DeleteNodeExecutionReq 删除节点执行记录请求

View File

@@ -16,7 +16,9 @@ type NodeExecution struct {
NodeName string `orm:"node_name" json:"nodeName" description:"节点名称"` NodeName string `orm:"node_name" json:"nodeName" description:"节点名称"`
NodeGroupId string `orm:"node_group_id" json:"nodeGroupId" description:"节点组ID"` NodeGroupId string `orm:"node_group_id" json:"nodeGroupId" description:"节点组ID"`
InputParams map[string]interface{} `orm:"input_params" json:"inputParams" description:"节点输入参数"` InputParams map[string]interface{} `orm:"input_params" json:"inputParams" description:"节点输入参数"`
InputParamsPath string `orm:"input_params_path" json:"inputParamsPath" description:"节点输入参数路径"`
OutputParams map[string]interface{} `orm:"output_params" json:"outputParams" description:"节点输出参数"` OutputParams map[string]interface{} `orm:"output_params" json:"outputParams" description:"节点输出参数"`
OutputParamsPath string `orm:"output_params_path" json:"outputParamsPath" description:"节点输出参数路径"`
PromptTokens int `orm:"prompt_tokens" json:"promptTokens" description:"提示词token消耗"` PromptTokens int `orm:"prompt_tokens" json:"promptTokens" description:"提示词token消耗"`
CompletionTokens int `orm:"completion_tokens" json:"completionTokens" description:"补全token消耗"` CompletionTokens int `orm:"completion_tokens" json:"completionTokens" description:"补全token消耗"`
TotalTokens int `orm:"total_tokens" json:"totalTokens" description:"总token消耗"` TotalTokens int `orm:"total_tokens" json:"totalTokens" description:"总token消耗"`
@@ -32,7 +34,9 @@ type nodeExecutionCol struct {
NodeName string NodeName string
NodeGroupId string NodeGroupId string
InputParams string InputParams string
InputParamsPath string
OutputParams string OutputParams string
OutputParamsPath string
PromptTokens string PromptTokens string
CompletionTokens string CompletionTokens string
TotalTokens string TotalTokens string
@@ -48,7 +52,9 @@ var NodeExecutionCol = nodeExecutionCol{
NodeName: "node_name", NodeName: "node_name",
NodeGroupId: "node_group_id", NodeGroupId: "node_group_id",
InputParams: "input_params", InputParams: "input_params",
InputParamsPath: "input_params_path",
OutputParams: "output_params", OutputParams: "output_params",
OutputParamsPath: "output_params_path",
PromptTokens: "prompt_tokens", PromptTokens: "prompt_tokens",
CompletionTokens: "completion_tokens", CompletionTokens: "completion_tokens",
TotalTokens: "total_tokens", TotalTokens: "total_tokens",

View File

@@ -6,6 +6,7 @@ import (
fileDao "ai-agent/workflow/dao/file" fileDao "ai-agent/workflow/dao/file"
flowDao "ai-agent/workflow/dao/flow" flowDao "ai-agent/workflow/dao/flow"
nodeDao "ai-agent/workflow/dao/node" nodeDao "ai-agent/workflow/dao/node"
"ai-agent/workflow/model/dto"
fileDto "ai-agent/workflow/model/dto/file" fileDto "ai-agent/workflow/model/dto/file"
flowDto "ai-agent/workflow/model/dto/flow" flowDto "ai-agent/workflow/model/dto/flow"
nodeDto "ai-agent/workflow/model/dto/node" nodeDto "ai-agent/workflow/model/dto/node"
@@ -633,12 +634,22 @@ func registerNodeToGraph(graph *compose.Graph[any, any], flowNode entity.FlowNod
} }
// ✅ 插入节点执行记录,初始状态为运行中 // ✅ 插入节点执行记录,初始状态为运行中
startTime := time.Now() startTime := time.Now()
// 上传OSS每条独立上传
ossResult, err := Upload(ctx, &dto.UploadFileBytesReq{
FileBytes: gconv.Bytes(gconv.String(realInput)),
FileName: fmt.Sprintf("nodeInput:%v.txt", time.Now().UnixMilli()),
})
if err != nil {
return nil, err
}
nodeExecutionId, err := nodeDao.NodeExecutionDao.Insert(ctx, &nodeDto.CreateNodeExecutionReq{ nodeExecutionId, err := nodeDao.NodeExecutionDao.Insert(ctx, &nodeDto.CreateNodeExecutionReq{
FlowExecutionId: execInput.ExecutionId, FlowExecutionId: execInput.ExecutionId,
NodeId: nodeID, NodeId: nodeID,
NodeName: flowNode.Name, NodeName: flowNode.Name,
NodeGroupId: execInput.NodeGroupId, NodeGroupId: execInput.NodeGroupId,
InputParams: realInput, InputParamsPath: ossResult.FileURL,
Status: node.NodeExecutionStatusRunning.Code(), Status: node.NodeExecutionStatusRunning.Code(),
}) })
if err != nil { if err != nil {
@@ -653,10 +664,18 @@ func registerNodeToGraph(graph *compose.Graph[any, any], flowNode entity.FlowNod
// 执行节点 // 执行节点
_, err = lambda(ctx, realInput) _, err = lambda(ctx, realInput)
durationMs := time.Since(startTime).Milliseconds() durationMs := time.Since(startTime).Milliseconds()
// 上传OSS每条独立上传
ossResult1, err := Upload(ctx, &dto.UploadFileBytesReq{
FileBytes: gconv.Bytes(gconv.String(realInput)),
FileName: fmt.Sprintf("nodeInput:%v.txt", time.Now().UnixMilli()),
})
if err != nil {
return nil, err
}
updateReq := &nodeDto.UpdateNodeExecutionReq{ updateReq := &nodeDto.UpdateNodeExecutionReq{
Id: nodeExecutionId, Id: nodeExecutionId,
InputParams: realInput, OutputParamsPath: ossResult1.FileURL,
DurationMs: durationMs, DurationMs: durationMs,
} }
if err != nil { if err != nil {
// 执行失败,更新状态 // 执行失败,更新状态