From e6c27e2dee6c5ffc3a800c07846e23a1c3b395dd Mon Sep 17 00:00:00 2001 From: lmk <1095689763@qq.com> Date: Mon, 25 May 2026 15:08:47 +0800 Subject: [PATCH] =?UTF-8?q?=E9=80=9A=E8=BF=87=E5=88=86=E9=95=9Cjson?= =?UTF-8?q?=E5=89=AA=E8=BE=91=E8=A7=86=E9=A2=91?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- config.yml | 2 +- controller/video/concat_controller.go | 5 + controller/video/cut_controller.go | 92 ++++ dao/video/cut_task_dao.go | 111 +++++ main.go | 1 + model/dto/video/cut_dto.go | 78 +++ model/entity/video/cut_task.go | 54 +++ service/video/cut_service.go | 662 ++++++++++++++++++++++++++ sql/cut_task.sql | 42 ++ 9 files changed, 1046 insertions(+), 1 deletion(-) create mode 100644 controller/video/cut_controller.go create mode 100644 dao/video/cut_task_dao.go create mode 100644 model/dto/video/cut_dto.go create mode 100644 model/entity/video/cut_task.go create mode 100644 service/video/cut_service.go create mode 100644 sql/cut_task.sql diff --git a/config.yml b/config.yml index 2b3439e..072548d 100644 --- a/config.yml +++ b/config.yml @@ -37,7 +37,7 @@ database: redis: # 集群模式配置方法 default: - address: 116.204.74.41:6379 + address: 192.168.3.30:6379 db: 0 idleTimeout: "60s" #连接最大空闲时间,使用时间字符串例如30s/1m/1d maxConnLifetime: "90s" #连接最长存活时间,使用时间字符串例如30s/1m/1d diff --git a/controller/video/concat_controller.go b/controller/video/concat_controller.go index f58e336..5b52ead 100644 --- a/controller/video/concat_controller.go +++ b/controller/video/concat_controller.go @@ -133,6 +133,11 @@ func withUser(ctx context.Context) context.Context { return ctx } + // 调试:打印 Authorization 头 + if req := g.RequestFromCtx(ctx); req != nil { + g.Log().Debugf(ctx, "[withUser] Authorization头=%q", req.Header.Get("Authorization")) + } + user, err := utils.GetUserInfo(ctx) if err == nil && user != nil && user.TenantId > 0 { g.Log().Infof(ctx, "[用户信息] 从请求头解析到用户: userName=%s, tenantId=%d", user.UserName, user.TenantId) diff --git 
a/controller/video/cut_controller.go b/controller/video/cut_controller.go new file mode 100644 index 0000000..c2034f7 --- /dev/null +++ b/controller/video/cut_controller.go @@ -0,0 +1,92 @@ +package video + +import ( + "context" + "fmt" + "os" + + dto "media/model/dto/video" + service "media/service/video" + + "github.com/gogf/gf/v2/frame/g" +) + +type cut struct{} + +var Cut = new(cut) + +// Cut 视频分镜剪切(URL模式) POST /video/cut +func (c *cut) Cut(ctx context.Context, req *dto.CutReq) (res *dto.CutRes, err error) { + ctx = withUser(ctx) + g.Log().Infof(ctx, "[视频分镜剪切] 收到请求 入参: video_url=%s, total_scenes=%d, upload=%v", + req.VideoURL, len(req.Scenes), req.Upload) + + // 下载视频到临时目录 + tempDir := getTempDir(ctx) + os.MkdirAll(tempDir, 0755) + + savePath, err := downloadFromURL(ctx, req.VideoURL, tempDir) + if err != nil { + return nil, fmt.Errorf("下载视频失败: %v", err) + } + defer os.Remove(savePath) + + // 转换分镜为 service 层类型(解析时间字符串为秒) + serviceShots, err := service.ConvertScenes(req.Scenes) + if err != nil { + return nil, fmt.Errorf("解析分镜失败: %v", err) + } + if len(serviceShots) == 0 { + return nil, fmt.Errorf("没有有效的分镜片段") + } + + // 调用服务层 + svcRes, err := service.Cut.Cut(ctx, &service.CutReq{ + VideoPath: savePath, + Shots: serviceShots, + Upload: req.Upload, + }) + if err != nil { + return nil, err + } + + defer os.Remove(svcRes.OutputPath) + + return &dto.CutRes{ + OutputPath: svcRes.OutputPath, + FileSize: svcRes.FileSize, + Duration: svcRes.Duration, + DurationStr: svcRes.DurationStr, + ShotsCount: svcRes.ShotsCount, + FileURL: svcRes.FileURL, + }, nil +} + +// CutAsync 视频分镜剪切-异步(URL模式) POST /video/cut/async +func (c *cut) CutAsync(ctx context.Context, req *dto.CutAsyncReq) (res *dto.CreateCutTaskRes, err error) { + ctx = withUser(ctx) + g.Log().Infof(ctx, "[视频分镜剪切-异步] 收到请求 入参: video_url=%s, total_scenes=%d, upload=%v, callback=%s", + req.VideoURL, len(req.Scenes), req.Upload, req.CallbackURL) + + // 转换分镜为 service 层类型(解析时间字符串为秒) + serviceShots, err := 
service.ConvertScenes(req.Scenes) + if err != nil { + return nil, fmt.Errorf("解析分镜失败: %v", err) + } + if len(serviceShots) == 0 { + return nil, fmt.Errorf("没有有效的分镜片段") + } + + taskID, taskErr := service.Cut.CreateAsyncTask(ctx, req.VideoURL, serviceShots, req.Upload, req.CallbackURL) + if taskErr != nil { + return nil, taskErr + } + + return &dto.CreateCutTaskRes{TaskID: taskID}, nil +} + +// GetCutTask 查询异步剪切任务结果 GET /video/cut/task/{taskId} +func (c *cut) GetCutTask(ctx context.Context, req *dto.GetCutTaskReq) (res *dto.GetCutTaskRes, err error) { + ctx = withUser(ctx) + return service.Cut.GetTaskResult(ctx, req.TaskID) +} diff --git a/dao/video/cut_task_dao.go b/dao/video/cut_task_dao.go new file mode 100644 index 0000000..0320de2 --- /dev/null +++ b/dao/video/cut_task_dao.go @@ -0,0 +1,111 @@ +package video + +import ( + "context" + "time" + + dto "media/model/dto/video" + entity "media/model/entity/video" + + "gitea.com/red-future/common/db/gfdb" + "github.com/gogf/gf/v2/frame/g" + "github.com/gogf/gf/v2/util/gconv" +) + +var CutTask = new(cutTaskDao) + +type cutTaskDao struct{} + +const cutTaskTable = "cut_task" + +// Insert 创建任务(排除 id 字段,让数据库自增) +func (d *cutTaskDao) Insert(ctx context.Context, data *entity.CutTask) (id int64, err error) { + r, err := gfdb.DB(ctx).Model(ctx, cutTaskTable). + Data(data). + FieldsEx(entity.CutTaskCols.Id). + Insert() + if err != nil { + return 0, err + } + return r.LastInsertId() +} + +// GetByTaskID 根据taskId查询任务 +func (d *cutTaskDao) GetByTaskID(ctx context.Context, taskID string) (res *entity.CutTask, err error) { + r, err := gfdb.DB(ctx).Model(ctx, cutTaskTable). + Where(entity.CutTaskCols.TaskID, taskID). + One() + if err != nil { + return nil, err + } + if r == nil { + return nil, nil + } + err = r.Struct(&res) + return +} + +// UpdateRunning 更新为运行中 +func (d *cutTaskDao) UpdateRunning(ctx context.Context, taskID string) error { + _, err := gfdb.DB(ctx).Model(ctx, cutTaskTable). 
+ Data(g.Map{ + entity.CutTaskCols.Status: "running", + }). + Where(entity.CutTaskCols.TaskID, taskID). + Update() + return err +} + +// UpdateSuccess 更新为成功 +func (d *cutTaskDao) UpdateSuccess(ctx context.Context, taskID string, fileURL string, fileSize int64, fileName, fileFormat, fileAddrPrefix, durationStr string) error { + _, err := gfdb.DB(ctx).Model(ctx, cutTaskTable). + Data(g.Map{ + entity.CutTaskCols.Status: "success", + entity.CutTaskCols.FileURL: fileURL, + entity.CutTaskCols.FileSize: fileSize, + entity.CutTaskCols.FileName: fileName, + entity.CutTaskCols.FileFormat: fileFormat, + entity.CutTaskCols.FileAddressPrefix: fileAddrPrefix, + entity.CutTaskCols.DurationStr: durationStr, + entity.CutTaskCols.ErrorMessage: "", + }). + Where(entity.CutTaskCols.TaskID, taskID). + Update() + return err +} + +// UpdateError 更新为失败 +func (d *cutTaskDao) UpdateError(ctx context.Context, taskID string, errMsg string) error { + _, err := gfdb.DB(ctx).Model(ctx, cutTaskTable). + Data(g.Map{ + entity.CutTaskCols.Status: "failed", + entity.CutTaskCols.ErrorMessage: errMsg, + }). + Where(entity.CutTaskCols.TaskID, taskID). 
+ Update() + return err +} + +// EntityToTaskRes 实体转DTO +func CutEntityToTaskRes(e *entity.CutTask) *dto.GetCutTaskRes { + res := &dto.GetCutTaskRes{ + TaskID: e.TaskID, + Status: e.Status, + CreatedAt: gconv.Int64(e.CreatedAt.Timestamp()), + } + if e.CreatedAt == nil { + res.CreatedAt = time.Now().UnixMilli() + } + if e.Status == "success" { + res.FileURL = e.FileURL + res.FileSize = e.FileSize + res.FileName = e.FileName + res.FileFormat = e.FileFormat + res.FileAddressPrefix = e.FileAddressPrefix + res.DurationStr = e.DurationStr + } + if e.Status == "failed" { + res.ErrorMessage = e.ErrorMessage + } + return res +} diff --git a/main.go b/main.go index 617eb65..4c1715f 100644 --- a/main.go +++ b/main.go @@ -18,6 +18,7 @@ func main() { http.RouteRegister([]interface{}{ controllerAudio.AudioExtract, controllerVideo.Concat, + controllerVideo.Cut, }) select {} } diff --git a/model/dto/video/cut_dto.go b/model/dto/video/cut_dto.go new file mode 100644 index 0000000..652f91a --- /dev/null +++ b/model/dto/video/cut_dto.go @@ -0,0 +1,78 @@ +package video + +import "github.com/gogf/gf/v2/frame/g" + +// CutScene 单个分镜场景 +// 同时兼容两种格式: +// 1. 完整格式: {"sceneIndex":1,"startTimeStr":"00:00:00.000","endTimeStr":"00:00:03.115", ...} +// 2. 
// CutScene describes a single storyboard scene in a cut request.
//
// Two input shapes are accepted:
//  1. Full format:   {"sceneIndex":1,"startTimeStr":"00:00:00.000","endTimeStr":"00:00:03.115", ...}
//  2. Simple format: {"start": 1.5, "end": 5.0}
//
// The scene's time range can therefore be given either as the numeric
// Start/End pair (seconds) or as the StartTimeStr/EndTimeStr pair
// (HH:MM:SS.mmm). The remaining fields are descriptive metadata carried with
// the scene.
type CutScene struct {
	// SceneIndex is the scene's sequence number.
	SceneIndex int `json:"sceneIndex" dc:"场景序号"`
	// StartTimeStr is the scene start as HH:MM:SS.mmm.
	StartTimeStr string `json:"startTimeStr" dc:"起始时间 HH:MM:SS.mmm"`
	// EndTimeStr is the scene end as HH:MM:SS.mmm.
	EndTimeStr string `json:"endTimeStr" dc:"结束时间 HH:MM:SS.mmm"`
	// Start is the scene start in seconds.
	Start float64 `json:"start" dc:"起始时间(秒)"`
	// End is the scene end in seconds.
	End float64 `json:"end" dc:"结束时间(秒)"`
	// DurationStr is the scene duration as text.
	DurationStr string `json:"durationStr" dc:"时长"`
	// ShotType is the kind of camera shot.
	ShotType string `json:"shotType" dc:"镜头类型"`
	// Composition describes the framing of the shot.
	Composition string `json:"composition" dc:"构图"`
	// NarrativePos is the scene's position in the narrative.
	NarrativePos string `json:"narrativePos" dc:"叙事位置"`
	// Description is a free-text description of the scene.
	Description string `json:"description" dc:"描述"`
}
// GetCutTaskRes is the response of the asynchronous cut-task query.
//
// All file-related fields and ErrorMessage carry `omitempty`, so they are
// left out of the JSON when they hold their zero value.
type GetCutTaskRes struct {
	// TaskID is the task identifier.
	TaskID string `json:"taskId" dc:"任务ID"`
	// Status is one of pending / running / success / failed.
	Status string `json:"status" dc:"状态: pending/running/success/failed"`
	// FileURL is the MinIO access path of the result file.
	FileURL string `json:"fileURL,omitempty" dc:"MinIO文件访问路径"`
	// FileSize is the result file size in bytes.
	FileSize int64 `json:"fileSize,omitempty" dc:"文件大小(字节)"`
	// FileName is the result file name.
	FileName string `json:"fileName,omitempty" dc:"文件名"`
	// FileFormat is the result file format.
	FileFormat string `json:"fileFormat,omitempty" dc:"文件格式"`
	// FileAddressPrefix is the MinIO address prefix.
	FileAddressPrefix string `json:"fileAddressPrefix,omitempty" dc:"MinIO地址前缀"`
	// DurationStr is the duration of the cut result as text.
	DurationStr string `json:"durationStr,omitempty" dc:"剪切后时长"`
	// ErrorMessage is the failure description.
	ErrorMessage string `json:"errorMessage,omitempty" dc:"错误信息"`
	// CreatedAt is the task creation timestamp.
	CreatedAt int64 `json:"createdAt" dc:"创建时间戳"`
}
// CutShot is a single segment to cut, stored internally with its boundaries
// already converted to seconds.
type CutShot struct {
	// Start is the segment start, in seconds.
	Start float64 `json:"start"`
	// End is the segment end, in seconds.
	End float64 `json:"end"`
}
// parseTimeStr converts a clock string of the form HH:MM:SS or
// HH:MM:SS.fff into a number of seconds.
//
// The fractional part is interpreted as a decimal fraction of a second, so
// "00:00:01.5" is 1.5 s and "00:00:03.115" is 3.115 s regardless of how many
// digits follow the dot. (Treating the digits as a raw millisecond count
// would turn ".5" into 0.005 s and ".1234" into 1.234 s.)
//
// An error is returned when the string does not have exactly three
// colon-separated fields, when a field is not an integer, when a field is
// negative, or when the fractional part contains anything other than digits.
func parseTimeStr(timeStr string) (float64, error) {
	parts := strings.Split(timeStr, ":")
	if len(parts) != 3 {
		return 0, fmt.Errorf("invalid time format: %s, expected HH:MM:SS.mmm", timeStr)
	}

	h, err := strconv.Atoi(parts[0])
	if err != nil {
		return 0, err
	}

	m, err := strconv.Atoi(parts[1])
	if err != nil {
		return 0, err
	}

	// Split the last field into whole seconds and the optional fraction.
	secParts := strings.SplitN(parts[2], ".", 2)
	s, err := strconv.Atoi(secParts[0])
	if err != nil {
		return 0, err
	}

	if h < 0 || m < 0 || s < 0 {
		return 0, fmt.Errorf("invalid time format: %s, negative component", timeStr)
	}

	var frac float64
	if len(secParts) > 1 && secParts[1] != "" {
		// Only plain digits are allowed; this also keeps ParseFloat from
		// accepting exponents or other float syntax in the fraction.
		for _, r := range secParts[1] {
			if r < '0' || r > '9' {
				return 0, fmt.Errorf("invalid time format: %s, bad fractional seconds %q", timeStr, secParts[1])
			}
		}
		frac, err = strconv.ParseFloat("0."+secParts[1], 64)
		if err != nil {
			return 0, err
		}
	}

	return float64(h)*3600 + float64(m)*60 + float64(s) + frac, nil
}
否则 → 解析 scene.StartTimeStr / scene.EndTimeStr +func ConvertScenes(scenes []dto.CutScene) ([]CutShot, error) { + var shots []CutShot + for _, scene := range scenes { + var ( + start float64 + end float64 + err error + ) + + // 优先使用 start/end 直接给出的秒数 + if scene.Start > 0 && scene.End > scene.Start { + start = scene.Start + end = scene.End + } else if scene.StartTimeStr != "" && scene.EndTimeStr != "" { + // 否则解析时间字符串 HH:MM:SS.mmm + start, err = parseTimeStr(scene.StartTimeStr) + if err != nil { + return nil, fmt.Errorf("parse start time %s: %v", scene.StartTimeStr, err) + } + end, err = parseTimeStr(scene.EndTimeStr) + if err != nil { + return nil, fmt.Errorf("parse end time %s: %v", scene.EndTimeStr, err) + } + } else { + // 没有可用的时间信息,跳过 + continue + } + + if end > start { + shots = append(shots, CutShot{Start: start, End: end}) + } + } + return shots, nil +} + +// Cut 根据分镜剪切多个片段并拼接输出 +func (s *cutService) Cut(ctx context.Context, req *CutReq) (res *CutRes, err error) { + g.Log().Infof(ctx, "[Cut] 服务层收到请求: video=%s, shots=%d, upload=%v", + req.VideoPath, len(req.Shots), req.Upload) + + // 校验输入 + if _, err := os.Stat(req.VideoPath); os.IsNotExist(err) { + return nil, fmt.Errorf("输入视频文件不存在: %s", req.VideoPath) + } + + // 过滤掉无效片段(start >= end) + var validShots []CutShot + for _, shot := range req.Shots { + if shot.End > shot.Start { + validShots = append(validShots, shot) + } + } + if len(validShots) == 0 { + return nil, fmt.Errorf("没有有效的分镜片段(所有片段 start >= end)") + } + + ffmpegPath, err := s.getFFmpegPath() + if err != nil { + return nil, err + } + + // 生成输出路径 + outputPath := req.OutputPath + if outputPath == "" { + outputDir := filepath.Dir(req.VideoPath) + baseName := filepath.Base(req.VideoPath) + ext := filepath.Ext(baseName) + stem := strings.TrimSuffix(baseName, ext) + stemRunes := []rune(stem) + if len(stemRunes) > 20 { + stemRunes = stemRunes[:20] + } + outputPath = filepath.Join(outputDir, + fmt.Sprintf("cut_%s_x%d_%s%s", string(stemRunes), len(validShots), 
time.Now().Format("150405"), ext)) + } + + // 执行剪切拼接 + err = s.cutByFilterComplex(ctx, ffmpegPath, req.VideoPath, validShots, outputPath) + if err != nil { + return nil, fmt.Errorf("视频剪切失败: %v", err) + } + + // 获取输出文件信息 + stat, statErr := os.Stat(outputPath) + if statErr != nil { + return nil, fmt.Errorf("输出文件异常: %v", statErr) + } + + // 获取时长 + duration, _ := s.getVideoDuration(ctx, ffmpegPath, outputPath) + + res = &CutRes{ + OutputPath: outputPath, + FileSize: stat.Size(), + Duration: duration, + DurationStr: formatDuration(duration), + ShotsCount: len(validShots), + } + + // 如果需要上传到 MinIO + if req.Upload { + uploadCtx := context.Background() + uploadRes, uploadErr := s.UploadToMinIO(uploadCtx, outputPath) + if uploadErr != nil { + return nil, fmt.Errorf("上传到MinIO失败: %v", uploadErr) + } + res.FileURL = uploadRes.FileURL + } + + return +} + +// cutByFilterComplex 使用 filter_complex 一步完成剪切拼接 +// 对每个片段用 trim/atrim 截取,然后 concat 拼接 +func (s *cutService) cutByFilterComplex(ctx context.Context, ffmpegPath, inputPath string, shots []CutShot, outputPath string) error { + n := len(shots) + + // 检测视频是否有音频轨道 + hasAudio, err := s.checkVideoAudio(ctx, ffmpegPath, inputPath) + if err != nil { + g.Log().Debugf(ctx, "[Cut] 检测音频失败: %v", err) + hasAudio = false + } + + // 获取视频总时长,用于裁剪超出的 end + videoDuration, _ := s.getVideoDuration(ctx, ffmpegPath, inputPath) + + // 构建 filter_complex + var filterParts []string + var concatInputs []string + + for i, shot := range shots { + // 确保 end 不超过视频时长 + end := shot.End + if videoDuration > 0 && end > videoDuration { + end = videoDuration + } + if end <= shot.Start { + continue + } + + // 视频: trim 截取,重置 PTS + filterParts = append(filterParts, fmt.Sprintf( + "[0:v]trim=start=%.3f:end=%.3f,setpts=PTS-STARTPTS[v%d]", + shot.Start, end, i, + )) + + // 音频处理 + if hasAudio { + // 有音频轨道,用 atrim 截取 + filterParts = append(filterParts, fmt.Sprintf( + "[0:a]atrim=start=%.3f:end=%.3f,asetpts=PTS-STARTPTS[a%d]", + shot.Start, end, i, + )) + concatInputs = 
append(concatInputs, fmt.Sprintf("[v%d][a%d]", i, i)) + } else { + // 无音频轨道,为这个片段生成静音音频 + dur := end - shot.Start + filterParts = append(filterParts, fmt.Sprintf( + "aevalsrc=0:n=2:s=44100:d=%.2f[a%d]", + dur, i, + )) + concatInputs = append(concatInputs, fmt.Sprintf("[v%d][a%d]", i, i)) + } + } + + // 拼接所有片段 + filterStr := fmt.Sprintf("%s;%sconcat=n=%d:v=1:a=1[outv][outa]", + strings.Join(filterParts, ";"), + strings.Join(concatInputs, ""), n, + ) + + // 构造完整参数 + args := []string{ + "-i", inputPath, + "-filter_complex", filterStr, + "-map", "[outv]", + "-map", "[outa]", + "-c:v", "h264_videotoolbox", + "-b:v", "5M", + "-allow_sw", "true", + "-c:a", "aac", + "-y", + outputPath, + } + + // 调试:记录完整命令 + g.Log().Debugf(ctx, "[Cut] ffmpeg 命令: %s %v", ffmpegPath, args) + + // 保存 filter 用于调试 + outputDir := filepath.Dir(outputPath) + filterFile := filepath.Join(outputDir, "cut_filter.txt") + os.WriteFile(filterFile, []byte(filterStr), 0644) + defer os.Remove(filterFile) + + // 使用独立 context,避免 HTTP 请求超时导致 ffmpeg 被 SIGKILL + bgCtx := context.Background() + cmd := exec.CommandContext(bgCtx, ffmpegPath, args...) 
+ outputBytes, err := cmd.CombinedOutput() + if err != nil { + return fmt.Errorf("ffmpeg 执行失败: %v\n日志:\n%s", err, string(outputBytes)) + } + + return nil +} + +// getFFmpegPath 获取 FFmpeg 路径 +func (s *cutService) getFFmpegPath() (string, error) { + ffmpegPath := g.Cfg().MustGet(context.Background(), "ffmpeg.path", "").String() + if ffmpegPath != "" { + if _, err := os.Stat(ffmpegPath); err == nil { + return ffmpegPath, nil + } + } + path, err := exec.LookPath("ffmpeg") + if err != nil { + return "", fmt.Errorf("未找到 ffmpeg") + } + return path, nil +} + +// getVideoDuration 获取视频时长 +func (s *cutService) getVideoDuration(ctx context.Context, ffmpegPath, videoPath string) (float64, error) { + ffprobePath := filepath.Join(filepath.Dir(ffmpegPath), "ffprobe") + if _, err := os.Stat(ffprobePath); os.IsNotExist(err) { + ffprobePath = "ffprobe" + } + + cmd := exec.CommandContext(ctx, ffprobePath, + "-v", "error", + "-show_entries", "format=duration", + "-of", "default=noprint_wrappers=1:nokey=1", + videoPath, + ) + + output, err := cmd.Output() + if err != nil { + return 0, err + } + + var duration float64 + fmt.Sscanf(strings.TrimSpace(string(output)), "%f", &duration) + return duration, nil +} + +// checkVideoAudio 检测视频文件是否有音频轨道 +func (s *cutService) checkVideoAudio(ctx context.Context, ffmpegPath, videoPath string) (bool, error) { + ffprobePath := filepath.Join(filepath.Dir(ffmpegPath), "ffprobe") + if _, err := os.Stat(ffprobePath); os.IsNotExist(err) { + ffprobePath = "ffprobe" + } + + cmd := exec.CommandContext(ctx, ffprobePath, + "-v", "error", + "-select_streams", "a:0", + "-show_entries", "stream=codec_type", + "-of", "default=noprint_wrappers=1:nokey=1", + videoPath, + ) + output, err := cmd.Output() + if err != nil || len(strings.TrimSpace(string(output))) == 0 { + return false, err + } + return true, nil +} + +// hasVideoAudio 别名保持命名一致(从concat_service复制) +func (s *cutService) hasVideoAudio(ctx context.Context, ffmpegPath, videoPath string) bool { + has, _ := 
s.checkVideoAudio(ctx, ffmpegPath, videoPath) + return has +} + +// getVideoResolution 获取视频分辨率(复用) +func (s *cutService) getVideoResolution(ctx context.Context, ffmpegPath, videoPath string) (width, height int, err error) { + ffprobePath := filepath.Join(filepath.Dir(ffmpegPath), "ffprobe") + if _, err := os.Stat(ffprobePath); os.IsNotExist(err) { + ffprobePath = "ffprobe" + } + + cmd := exec.CommandContext(ctx, ffprobePath, + "-v", "error", + "-select_streams", "v:0", + "-show_entries", "stream=width,height", + "-of", "csv=p=0", + videoPath, + ) + output, err := cmd.Output() + if err != nil { + return 0, 0, err + } + fmt.Sscanf(strings.TrimSpace(string(output)), "%d,%d", &width, &height) + return +} + +// cutUploadFileRes 上传文件响应(类型别名避免重定义) +type cutUploadFileRes struct { + FileURL string `json:"fileURL" dc:"上传地址"` + FileSize int `json:"fileSize" dc:"文件大小"` + FileName string `json:"fileName" dc:"文件名称"` + FileFormat string `json:"fileFormat" dc:"文件格式"` + FileAddressPrefix string `json:"fileAddressPrefix" dc:"文件地址前缀"` +} + +// UploadToMinIO 通过 OSS 微服务的 uploadFile 接口上传到 MinIO(multipart/form-data) +func (s *cutService) UploadToMinIO(ctx context.Context, localFilePath string) (*cutUploadFileRes, error) { + // 构建 multipart/form-data 表单 + var buf bytes.Buffer + mw := multipart.NewWriter(&buf) + + file, err := os.Open(localFilePath) + if err != nil { + return nil, fmt.Errorf("打开文件失败: %v", err) + } + defer file.Close() + + fw, err := mw.CreateFormFile("file", filepath.Base(localFilePath)) + if err != nil { + return nil, fmt.Errorf("创建表单文件字段失败: %v", err) + } + if _, err = io.Copy(fw, file); err != nil { + return nil, fmt.Errorf("写入文件内容失败: %v", err) + } + mw.Close() + + // 使用 commonHttp 的客户端(含 Consul 服务发现),大文件上传设置长超时 + client := commonHttp.Httpclient.Clone() + // 必须单独设置 Transport.ResponseHeaderTimeout,SetTimeout 只设 Client.Timeout + newTransport := http.DefaultTransport.(*http.Transport).Clone() + newTransport.ResponseHeaderTimeout = 5 * time.Minute + client.Transport = 
newTransport + client.SetTimeout(10 * time.Minute) + + // 透传认证 headers(优先从 HTTP 请求头取) + hasAuthHeader := false + if r := g.RequestFromCtx(ctx); r != nil { + for k, v := range r.Header { + client.SetHeader(k, v[0]) + if strings.EqualFold(k, "Authorization") || strings.EqualFold(k, "X-User-Info") { + hasAuthHeader = true + } + } + } + // 无 HTTP 请求时(异步 goroutine),从 context 的用户信息构造 header + if !hasAuthHeader { + uploadUser := getUserFromCtx(ctx) + userJSON, _ := json.Marshal(uploadUser) + client.SetHeader("X-User-Info", string(userJSON)) + } + + // 设置 multipart Content-Type(含 boundary) + contentType := mw.FormDataContentType() + client.SetHeader("Content-Type", contentType) + + g.Log().Debugf(ctx, "[UploadToMinIO] 请求URL: oss/file/uploadFile, 文件: %s, Body大小: %d bytes", + localFilePath, buf.Len()) + + // 发送 multipart 请求(原始字节流) + response, err := client.Post(ctx, "oss/file/uploadFile", buf.Bytes()) + if err != nil { + glog.Error(ctx, err) + return nil, fmt.Errorf("调用OSS上传服务失败: %v", err) + } + defer response.Close() + + body := response.ReadAll() + + // 调试:打印原始响应 + g.Log().Debugf(ctx, "[UploadToMinIO] OSS原始响应: %s", string(body)) + + // 解析标准 GoFrame 响应格式 {code, message, data} + var apiResp struct { + Code int `json:"code"` + Message string `json:"message"` + Data *cutUploadFileRes `json:"data"` + } + if err = json.Unmarshal(body, &apiResp); err != nil { + return nil, fmt.Errorf("响应解析失败: %v", err) + } + + if apiResp.Code != 200 && apiResp.Code != 0 { + return nil, fmt.Errorf("OSS上传失败: %s", apiResp.Message) + } + + g.Log().Infof(ctx, "[UploadToMinIO] 上传成功 fileURL=%s size=%d", apiResp.Data.FileURL, apiResp.Data.FileSize) + return apiResp.Data, nil +} + +// ---------- 异步任务管理 ---------- + +// CreateAsyncTask 创建异步剪切任务,返回 taskId,后台处理 +func (s *cutService) CreateAsyncTask(ctx context.Context, videoURL string, shots []CutShot, upload bool, callbackURL string) (string, error) { + if videoURL == "" { + return "", fmt.Errorf("视频URL不能为空") + } + + // 过滤无效片段 + var validShots []CutShot + 
for _, shot := range shots { + if shot.End > shot.Start { + validShots = append(validShots, shot) + } + } + if len(validShots) == 0 { + return "", fmt.Errorf("没有有效的分镜片段") + } + + // 序列化 shots JSON + shotsJSON, err := json.Marshal(shots) + if err != nil { + return "", fmt.Errorf("序列化分镜失败: %v", err) + } + + taskID := "cut_" + guid.S() + task := &entity.CutTask{ + TaskID: taskID, + VideoURL: videoURL, + ShotsJSON: string(shotsJSON), + Status: "pending", + CallbackURL: callbackURL, + } + if _, err := dao.CutTask.Insert(ctx, task); err != nil { + return "", fmt.Errorf("创建任务失败: %v", err) + } + + // 提取调用方用户信息,传给 goroutine + user := getUserFromCtx(ctx) + + g.Log().Infof(ctx, "[异步剪切] 创建任务 %s, 视频=%s, 片段数=%d, 回调=%s", taskID, videoURL, len(validShots), callbackURL) + + // 异步处理 + go s.processAsyncTask(user, taskID, videoURL, shots, upload, callbackURL) + + return taskID, nil +} + +// GetTaskResult 查询异步任务结果 +func (s *cutService) GetTaskResult(ctx context.Context, taskID string) (*dto.GetCutTaskRes, error) { + task, err := dao.CutTask.GetByTaskID(ctx, taskID) + if err != nil { + return nil, fmt.Errorf("查询任务失败: %v", err) + } + if task == nil { + return nil, fmt.Errorf("任务不存在: %s", taskID) + } + + return dao.CutEntityToTaskRes(task), nil +} + +// processAsyncTask 后台处理异步剪切任务 +func (s *cutService) processAsyncTask(user *beans.User, taskID string, videoURL string, shots []CutShot, upload bool, callbackURL string) { + bgCtx := context.Background() + bgCtx = context.WithValue(bgCtx, "user", user) + + dao.CutTask.UpdateRunning(bgCtx, taskID) + + defer func() { + if r := recover(); r != nil { + errMsg := fmt.Sprintf("异步剪切异常: %v", r) + g.Log().Errorf(bgCtx, "[异步剪切 %s] %s", taskID, errMsg) + dao.CutTask.UpdateError(bgCtx, taskID, errMsg) + s.cutCallback(bgCtx, taskID, callbackURL) + } + }() + + // 下载视频 + tempDir := g.Cfg().MustGet(bgCtx, "ffmpeg.temp_dir", "resource/temp").String() + os.MkdirAll(tempDir, 0755) + + savePath, dlErr := downloadFile(bgCtx, videoURL, tempDir) + if dlErr != nil { 
+ errMsg := fmt.Sprintf("下载视频失败: %v", dlErr) + dao.CutTask.UpdateError(bgCtx, taskID, errMsg) + s.cutCallback(bgCtx, taskID, callbackURL) + return + } + defer os.Remove(savePath) + + // 执行剪切 + cutErr := s.executeCut(bgCtx, taskID, savePath, shots, upload) + if cutErr != nil { + dao.CutTask.UpdateError(bgCtx, taskID, cutErr.Error()) + s.cutCallback(bgCtx, taskID, callbackURL) + return + } + + g.Log().Infof(bgCtx, "[异步剪切 %s] 完成", taskID) + + if callbackURL != "" { + s.cutCallback(bgCtx, taskID, callbackURL) + } +} + +// executeCut 执行剪切并更新任务状态 +func (s *cutService) executeCut(ctx context.Context, taskID string, videoPath string, shots []CutShot, upload bool) error { + tempDir := filepath.Dir(videoPath) + outputPath := filepath.Join(tempDir, + fmt.Sprintf("cut_%s_x%d_%s.mp4", taskID, len(shots), time.Now().Format("150405"))) + + res, cutErr := s.Cut(ctx, &CutReq{ + VideoPath: videoPath, + Shots: shots, + OutputPath: outputPath, + Upload: upload, + }) + if cutErr != nil { + os.Remove(outputPath) + return cutErr + } + + // 更新数据库为成功 + fileName := filepath.Base(outputPath) + fileFormat := "" + if idx := strings.LastIndex(fileName, "."); idx > 0 { + fileFormat = fileName[idx+1:] + } + dao.CutTask.UpdateSuccess(ctx, taskID, + res.FileURL, res.FileSize, fileName, fileFormat, + "", res.DurationStr) + + os.Remove(outputPath) + return nil +} + +// cutCallback 回调通知 +func (s *cutService) cutCallback(ctx context.Context, taskID, callbackURL string) { + if callbackURL == "" { + return + } + + task, err := dao.CutTask.GetByTaskID(ctx, taskID) + if err != nil || task == nil { + g.Log().Errorf(ctx, "[异步剪切回调 %s] 查询任务失败: %v", taskID, err) + return + } + + payload := map[string]interface{}{ + "taskId": task.TaskID, + "status": task.Status, + } + if task.Status == "success" { + payload["fileURL"] = task.FileURL + payload["fileSize"] = task.FileSize + } + if task.Status == "failed" { + payload["errorMessage"] = task.ErrorMessage + } + + body, _ := json.Marshal(payload) + g.Log().Infof(ctx, 
"[异步剪切回调 %s] 状态=%s, 目标=%s", taskID, task.Status, callbackURL) + + req, _ := http.NewRequest("POST", callbackURL, bytes.NewReader(body)) + req.Header.Set("Content-Type", "application/json") + // 透传调用方用户信息 + cbUser := getUserFromCtx(ctx) + userJSON, _ := json.Marshal(cbUser) + req.Header.Set("X-User-Info", string(userJSON)) + + client := &http.Client{Timeout: 2 * time.Minute} + resp, reqErr := client.Do(req) + if reqErr != nil { + g.Log().Errorf(ctx, "[异步剪切回调 %s] 请求失败: %v", taskID, reqErr) + return + } + defer resp.Body.Close() + respBody, _ := io.ReadAll(resp.Body) + g.Log().Infof(ctx, "[异步剪切回调 %s] 响应 status=%d, body=%s", taskID, resp.StatusCode, string(respBody)) +} diff --git a/sql/cut_task.sql b/sql/cut_task.sql new file mode 100644 index 0000000..80d35b7 --- /dev/null +++ b/sql/cut_task.sql @@ -0,0 +1,42 @@ +-- cut_task 视频分镜剪切异步任务表 +CREATE TABLE IF NOT EXISTS cut_task ( + id BIGSERIAL NOT NULL, + tenant_id BIGINT NOT NULL DEFAULT 0, + task_id VARCHAR(64) NOT NULL, + video_url TEXT NOT NULL, + shots_json TEXT NOT NULL, + status VARCHAR(20) NOT NULL DEFAULT 'pending', + file_url TEXT NOT NULL DEFAULT '', + file_size BIGINT NOT NULL DEFAULT 0, + file_name VARCHAR(255) NOT NULL DEFAULT '', + file_format VARCHAR(32) NOT NULL DEFAULT '', + file_address_prefix TEXT NOT NULL DEFAULT '', + duration_str VARCHAR(32) NOT NULL DEFAULT '', + error_message TEXT, + callback_url VARCHAR(500) NOT NULL DEFAULT '', + created_at TIMESTAMP WITH TIME ZONE DEFAULT NOW(), + updated_at TIMESTAMP WITH TIME ZONE DEFAULT NOW(), + deleted_at TIMESTAMP WITH TIME ZONE, + PRIMARY KEY (id) +); + +COMMENT ON TABLE cut_task IS '视频分镜剪切异步任务表'; +COMMENT ON COLUMN cut_task.task_id IS '任务唯一标识'; +COMMENT ON COLUMN cut_task.video_url IS '原始视频URL'; +COMMENT ON COLUMN cut_task.shots_json IS '分镜JSON数组'; +COMMENT ON COLUMN cut_task.status IS '任务状态:pending/running/success/failed'; +COMMENT ON COLUMN cut_task.file_url IS 'MinIO文件访问路径'; +COMMENT ON COLUMN cut_task.file_size IS '文件大小(字节)'; +COMMENT ON COLUMN 
cut_task.file_name IS '文件名'; +COMMENT ON COLUMN cut_task.file_format IS '文件格式'; +COMMENT ON COLUMN cut_task.file_address_prefix IS 'MinIO地址前缀'; +COMMENT ON COLUMN cut_task.duration_str IS '剪切后时长'; +COMMENT ON COLUMN cut_task.error_message IS '错误信息'; +COMMENT ON COLUMN cut_task.callback_url IS '回调地址'; +COMMENT ON COLUMN cut_task.created_at IS '创建时间'; +COMMENT ON COLUMN cut_task.updated_at IS '更新时间'; +COMMENT ON COLUMN cut_task.deleted_at IS '删除时间(软删除)'; + +CREATE UNIQUE INDEX IF NOT EXISTS idx_cut_task_task_id ON cut_task(task_id); +CREATE INDEX IF NOT EXISTS idx_cut_task_status ON cut_task(status); +CREATE INDEX IF NOT EXISTS idx_cut_task_created_at ON cut_task(created_at);