package video import ( "bytes" "context" "encoding/json" "fmt" "io" "mime/multipart" "net/http" "os" "os/exec" "path/filepath" "strconv" "strings" "time" dao "media/dao/video" dto "media/model/dto/video" entity "media/model/entity/video" "gitea.redpowerfuture.com/red-future/common/beans" commonHttp "gitea.redpowerfuture.com/red-future/common/http" "github.com/gogf/gf/v2/frame/g" "github.com/gogf/gf/v2/os/glog" "github.com/gogf/gf/v2/util/guid" ) type cutService struct{} // Cut 视频分镜剪切服务单例 var Cut = new(cutService) // CutShot 单个分镜片段(转换为秒后内部存储) type CutShot struct { Start float64 `json:"start"` End float64 `json:"end"` } // CutReq 视频剪切请求 type CutReq struct { VideoPath string // 输入视频文件路径 Shots []CutShot // 分镜片段列表(按此顺序剪切拼接) OutputPath string // 输出视频文件路径,空则自动生成 Upload bool // 是否上传到MinIO } // CutRes 视频剪切响应 type CutRes struct { OutputPath string `json:"outputPath"` // 输出文件路径 FileSize int64 `json:"fileSize"` // 文件大小(bytes) Duration float64 `json:"duration"` // 总时长(秒) DurationStr string `json:"durationStr"` // 可读时长 ShotsCount int `json:"shotsCount"` // 输出片段数 FileURL string `json:"fileURL"` // MinIO访问地址(上传后返回) } // parseTimeStr 解析 HH:MM:SS.mmm 格式为秒 func parseTimeStr(timeStr string) (float64, error) { parts := strings.Split(timeStr, ":") if len(parts) != 3 { return 0, fmt.Errorf("invalid time format: %s, expected HH:MM:SS.mmm", timeStr) } h, err := strconv.Atoi(parts[0]) if err != nil { return 0, err } m, err := strconv.Atoi(parts[1]) if err != nil { return 0, err } // 处理秒和毫秒 secParts := strings.SplitN(parts[2], ".", 2) s, err := strconv.Atoi(secParts[0]) if err != nil { return 0, err } ms := 0 if len(secParts) > 1 { ms, _ = strconv.Atoi(secParts[1]) } total := float64(h)*3600 + float64(m)*60 + float64(s) + float64(ms)/1000.0 return total, nil } // ConvertScenes 将 DTO 的分镜片段数组转换为内部的 CutShot 列表 // 同时兼容两种格式: // 1. 如果 scene.Start > 0 && scene.End > scene.Start → 直接使用 // 2. 否则 → 解析 scene.StartTimeStr / scene.EndTimeStr func ConvertScenes(scenes []dto.CutScene) ([]CutShot, error) { var shots []CutShot for _, scene := range scenes { var ( start float64 end float64 err error ) // 优先使用 start/end 直接给出的秒数 if scene.Start > 0 && scene.End > scene.Start { start = scene.Start end = scene.End } else if scene.StartTimeStr != "" && scene.EndTimeStr != "" { // 否则解析时间字符串 HH:MM:SS.mmm start, err = parseTimeStr(scene.StartTimeStr) if err != nil { return nil, fmt.Errorf("parse start time %s: %v", scene.StartTimeStr, err) } end, err = parseTimeStr(scene.EndTimeStr) if err != nil { return nil, fmt.Errorf("parse end time %s: %v", scene.EndTimeStr, err) } } else { // 没有可用的时间信息,跳过 continue } if end > start { shots = append(shots, CutShot{Start: start, End: end}) } } return shots, nil } // Cut 根据分镜剪切多个片段并拼接输出 func (s *cutService) Cut(ctx context.Context, req *CutReq) (res *CutRes, err error) { g.Log().Infof(ctx, "[Cut] 服务层收到请求: video=%s, shots=%d, upload=%v", req.VideoPath, len(req.Shots), req.Upload) // 校验输入 if _, err := os.Stat(req.VideoPath); os.IsNotExist(err) { return nil, fmt.Errorf("输入视频文件不存在: %s", req.VideoPath) } // 过滤掉无效片段(start >= end) var validShots []CutShot for _, shot := range req.Shots { if shot.End > shot.Start { validShots = append(validShots, shot) } } if len(validShots) == 0 { return nil, fmt.Errorf("没有有效的分镜片段(所有片段 start >= end)") } ffmpegPath, err := s.getFFmpegPath() if err != nil { return nil, err } // 生成输出路径 outputPath := req.OutputPath if outputPath == "" { outputDir := filepath.Dir(req.VideoPath) baseName := filepath.Base(req.VideoPath) ext := filepath.Ext(baseName) stem := strings.TrimSuffix(baseName, ext) stemRunes := []rune(stem) if len(stemRunes) > 20 { stemRunes = stemRunes[:20] } outputPath = filepath.Join(outputDir, fmt.Sprintf("cut_%s_x%d_%s%s", string(stemRunes), len(validShots), time.Now().Format("150405"), ext)) } // 执行剪切拼接 err = s.cutByFilterComplex(ctx, ffmpegPath, req.VideoPath, validShots, outputPath) if err != nil { return nil, fmt.Errorf("视频剪切失败: %v", err) } // 获取输出文件信息 stat, statErr := os.Stat(outputPath) if statErr != nil { return nil, fmt.Errorf("输出文件异常: %v", statErr) } // 获取时长 duration, _ := s.getVideoDuration(ctx, ffmpegPath, outputPath) res = &CutRes{ OutputPath: outputPath, FileSize: stat.Size(), Duration: duration, DurationStr: formatDuration(duration), ShotsCount: len(validShots), } // 如果需要上传到 MinIO(用独立 context,避免 HTTP 断开后 ctx 被取消,同时保留用户信息) if req.Upload { uploadCtx := context.WithValue(context.Background(), "user", getUserFromCtx(ctx)) uploadRes, uploadErr := s.UploadToMinIO(uploadCtx, outputPath) if uploadErr != nil { return nil, fmt.Errorf("上传到MinIO失败: %v", uploadErr) } res.FileURL = uploadRes.FileURL } return } // cutByFilterComplex 使用 filter_complex 一步完成剪切拼接 // 对每个片段用 trim/atrim 截取,然后 concat 拼接 func (s *cutService) cutByFilterComplex(ctx context.Context, ffmpegPath, inputPath string, shots []CutShot, outputPath string) error { n := len(shots) // 检测视频是否有音频轨道 hasAudio, err := s.checkVideoAudio(ctx, ffmpegPath, inputPath) if err != nil { g.Log().Debugf(ctx, "[Cut] 检测音频失败: %v", err) hasAudio = false } // 获取视频总时长,用于裁剪超出的 end videoDuration, _ := s.getVideoDuration(ctx, ffmpegPath, inputPath) // 构建 filter_complex var filterParts []string var concatInputs []string for i, shot := range shots { // 确保 end 不超过视频时长 end := shot.End if videoDuration > 0 && end > videoDuration { end = videoDuration } if end <= shot.Start { continue } // 视频: trim 截取,重置 PTS filterParts = append(filterParts, fmt.Sprintf( "[0:v]trim=start=%.3f:end=%.3f,setpts=PTS-STARTPTS[v%d]", shot.Start, end, i, )) // 音频处理 if hasAudio { // 有音频轨道,用 atrim 截取 filterParts = append(filterParts, fmt.Sprintf( "[0:a]atrim=start=%.3f:end=%.3f,asetpts=PTS-STARTPTS[a%d]", shot.Start, end, i, )) concatInputs = append(concatInputs, fmt.Sprintf("[v%d][a%d]", i, i)) } else { // 无音频轨道,为这个片段生成静音音频 dur := end - shot.Start filterParts = append(filterParts, fmt.Sprintf( "aevalsrc=0:n=2:s=44100:d=%.2f[a%d]", dur, i, )) concatInputs = append(concatInputs, fmt.Sprintf("[v%d][a%d]", i, i)) } } // 拼接所有片段 filterStr := fmt.Sprintf("%s;%sconcat=n=%d:v=1:a=1[outv][outa]", strings.Join(filterParts, ";"), strings.Join(concatInputs, ""), n, ) // 构造完整参数 args := []string{ "-i", inputPath, "-filter_complex", filterStr, "-map", "[outv]", "-map", "[outa]", "-c:v", "h264_videotoolbox", "-b:v", "5M", "-allow_sw", "true", "-c:a", "aac", "-y", outputPath, } // 调试:记录完整命令 g.Log().Debugf(ctx, "[Cut] ffmpeg 命令: %s %v", ffmpegPath, args) // 保存 filter 用于调试 outputDir := filepath.Dir(outputPath) filterFile := filepath.Join(outputDir, "cut_filter.txt") os.WriteFile(filterFile, []byte(filterStr), 0644) defer os.Remove(filterFile) // 使用独立 context,避免 HTTP 请求超时导致 ffmpeg 被 SIGKILL bgCtx := context.Background() cmd := exec.CommandContext(bgCtx, ffmpegPath, args...) outputBytes, err := cmd.CombinedOutput() if err != nil { return fmt.Errorf("ffmpeg 执行失败: %v\n日志:\n%s", err, string(outputBytes)) } return nil } // getFFmpegPath 获取 FFmpeg 路径 func (s *cutService) getFFmpegPath() (string, error) { ffmpegPath := g.Cfg().MustGet(context.Background(), "ffmpeg.path", "").String() if ffmpegPath != "" { if _, err := os.Stat(ffmpegPath); err == nil { return ffmpegPath, nil } } path, err := exec.LookPath("ffmpeg") if err != nil { return "", fmt.Errorf("未找到 ffmpeg") } return path, nil } // getVideoDuration 获取视频时长 func (s *cutService) getVideoDuration(ctx context.Context, ffmpegPath, videoPath string) (float64, error) { ffprobePath := filepath.Join(filepath.Dir(ffmpegPath), "ffprobe") if _, err := os.Stat(ffprobePath); os.IsNotExist(err) { ffprobePath = "ffprobe" } cmd := exec.CommandContext(ctx, ffprobePath, "-v", "error", "-show_entries", "format=duration", "-of", "default=noprint_wrappers=1:nokey=1", videoPath, ) output, err := cmd.Output() if err != nil { return 0, err } var duration float64 fmt.Sscanf(strings.TrimSpace(string(output)), "%f", &duration) return duration, nil } // checkVideoAudio 检测视频文件是否有音频轨道 func (s *cutService) checkVideoAudio(ctx context.Context, ffmpegPath, videoPath string) (bool, error) { ffprobePath := filepath.Join(filepath.Dir(ffmpegPath), "ffprobe") if _, err := os.Stat(ffprobePath); os.IsNotExist(err) { ffprobePath = "ffprobe" } cmd := exec.CommandContext(ctx, ffprobePath, "-v", "error", "-select_streams", "a:0", "-show_entries", "stream=codec_type", "-of", "default=noprint_wrappers=1:nokey=1", videoPath, ) output, err := cmd.Output() if err != nil || len(strings.TrimSpace(string(output))) == 0 { return false, err } return true, nil } // hasVideoAudio 别名保持命名一致(从concat_service复制) func (s *cutService) hasVideoAudio(ctx context.Context, ffmpegPath, videoPath string) bool { has, _ := s.checkVideoAudio(ctx, ffmpegPath, videoPath) return has } // getVideoResolution 获取视频分辨率(复用) func (s *cutService) getVideoResolution(ctx context.Context, ffmpegPath, videoPath string) (width, height int, err error) { ffprobePath := filepath.Join(filepath.Dir(ffmpegPath), "ffprobe") if _, err := os.Stat(ffprobePath); os.IsNotExist(err) { ffprobePath = "ffprobe" } cmd := exec.CommandContext(ctx, ffprobePath, "-v", "error", "-select_streams", "v:0", "-show_entries", "stream=width,height", "-of", "csv=p=0", videoPath, ) output, err := cmd.Output() if err != nil { return 0, 0, err } fmt.Sscanf(strings.TrimSpace(string(output)), "%d,%d", &width, &height) return } // cutUploadFileRes 上传文件响应(类型别名避免重定义) type cutUploadFileRes struct { FileURL string `json:"fileURL" dc:"上传地址"` FileSize int `json:"fileSize" dc:"文件大小"` FileName string `json:"fileName" dc:"文件名称"` FileFormat string `json:"fileFormat" dc:"文件格式"` FileAddressPrefix string `json:"fileAddressPrefix" dc:"文件地址前缀"` } // UploadToMinIO 通过 OSS 微服务的 uploadFile 接口上传到 MinIO(multipart/form-data) func (s *cutService) UploadToMinIO(ctx context.Context, localFilePath string) (*cutUploadFileRes, error) { // 构建 multipart/form-data 表单 var buf bytes.Buffer mw := multipart.NewWriter(&buf) file, err := os.Open(localFilePath) if err != nil { return nil, fmt.Errorf("打开文件失败: %v", err) } defer file.Close() fw, err := mw.CreateFormFile("file", filepath.Base(localFilePath)) if err != nil { return nil, fmt.Errorf("创建表单文件字段失败: %v", err) } if _, err = io.Copy(fw, file); err != nil { return nil, fmt.Errorf("写入文件内容失败: %v", err) } mw.Close() // 使用 commonHttp 的客户端(含 Consul 服务发现),大文件上传设置长超时 client := commonHttp.Httpclient.Clone() // 必须单独设置 Transport.ResponseHeaderTimeout,SetTimeout 只设 Client.Timeout newTransport := http.DefaultTransport.(*http.Transport).Clone() newTransport.ResponseHeaderTimeout = 5 * time.Minute client.Transport = newTransport client.SetTimeout(10 * time.Minute) // 透传认证 headers(优先从 HTTP 请求头取) hasAuthHeader := false if r := g.RequestFromCtx(ctx); r != nil { for k, v := range r.Header { client.SetHeader(k, v[0]) if strings.EqualFold(k, "Authorization") || strings.EqualFold(k, "X-User-Info") { hasAuthHeader = true } } } // 无 HTTP 请求时(异步 goroutine),从 context 的用户信息构造 header if !hasAuthHeader { uploadUser := getUserFromCtx(ctx) userJSON, _ := json.Marshal(uploadUser) client.SetHeader("X-User-Info", string(userJSON)) } // 设置 multipart Content-Type(含 boundary) contentType := mw.FormDataContentType() client.SetHeader("Content-Type", contentType) g.Log().Debugf(ctx, "[UploadToMinIO] 请求URL: oss/file/uploadFile, 文件: %s, Body大小: %d bytes", localFilePath, buf.Len()) // 发送 multipart 请求(原始字节流) response, err := client.Post(ctx, "oss/file/uploadFile", buf.Bytes()) if err != nil { glog.Error(ctx, err) return nil, fmt.Errorf("调用OSS上传服务失败: %v", err) } defer response.Close() body := response.ReadAll() // 调试:打印原始响应 g.Log().Debugf(ctx, "[UploadToMinIO] OSS原始响应: %s", string(body)) // 解析标准 GoFrame 响应格式 {code, message, data} var apiResp struct { Code int `json:"code"` Message string `json:"message"` Data *cutUploadFileRes `json:"data"` } if err = json.Unmarshal(body, &apiResp); err != nil { return nil, fmt.Errorf("响应解析失败: %v", err) } if apiResp.Code != 200 && apiResp.Code != 0 { return nil, fmt.Errorf("OSS上传失败: %s", apiResp.Message) } g.Log().Infof(ctx, "[UploadToMinIO] 上传成功 fileURL=%s size=%d", apiResp.Data.FileURL, apiResp.Data.FileSize) return apiResp.Data, nil } // ---------- 异步任务管理 ---------- // CreateAsyncTask 创建异步剪切任务,返回 taskId,后台处理 func (s *cutService) CreateAsyncTask(ctx context.Context, videoURL string, shots []CutShot, upload bool, callbackURL string) (string, error) { if videoURL == "" { return "", fmt.Errorf("视频URL不能为空") } // 过滤无效片段 var validShots []CutShot for _, shot := range shots { if shot.End > shot.Start { validShots = append(validShots, shot) } } if len(validShots) == 0 { return "", fmt.Errorf("没有有效的分镜片段") } // 序列化 shots JSON shotsJSON, err := json.Marshal(shots) if err != nil { return "", fmt.Errorf("序列化分镜失败: %v", err) } taskID := "cut_" + guid.S() task := &entity.CutTask{ TaskID: taskID, VideoURL: videoURL, ShotsJSON: string(shotsJSON), Status: "pending", CallbackURL: callbackURL, } if _, err := dao.CutTask.Insert(ctx, task); err != nil { return "", fmt.Errorf("创建任务失败: %v", err) } // 提取调用方用户信息,传给 goroutine user := getUserFromCtx(ctx) g.Log().Infof(ctx, "[异步剪切] 创建任务 %s, 视频=%s, 片段数=%d, 回调=%s", taskID, videoURL, len(validShots), callbackURL) // 异步处理 go s.processAsyncTask(user, taskID, videoURL, shots, upload, callbackURL) return taskID, nil } // GetTaskResult 查询异步任务结果 func (s *cutService) GetTaskResult(ctx context.Context, taskID string) (*dto.GetCutTaskRes, error) { task, err := dao.CutTask.GetByTaskID(ctx, taskID) if err != nil { return nil, fmt.Errorf("查询任务失败: %v", err) } if task == nil { return nil, fmt.Errorf("任务不存在: %s", taskID) } return dao.CutEntityToTaskRes(task), nil } // processAsyncTask 后台处理异步剪切任务 func (s *cutService) processAsyncTask(user *beans.User, taskID string, videoURL string, shots []CutShot, upload bool, callbackURL string) { bgCtx := context.Background() bgCtx = context.WithValue(bgCtx, "user", user) dao.CutTask.UpdateRunning(bgCtx, taskID) defer func() { if r := recover(); r != nil { errMsg := fmt.Sprintf("异步剪切异常: %v", r) g.Log().Errorf(bgCtx, "[异步剪切 %s] %s", taskID, errMsg) dao.CutTask.UpdateError(bgCtx, taskID, errMsg) s.cutCallback(bgCtx, taskID, callbackURL) } }() // 下载视频 tempDir := g.Cfg().MustGet(bgCtx, "ffmpeg.temp_dir", "resource/temp").String() os.MkdirAll(tempDir, 0755) savePath, dlErr := downloadFile(bgCtx, videoURL, tempDir) if dlErr != nil { errMsg := fmt.Sprintf("下载视频失败: %v", dlErr) dao.CutTask.UpdateError(bgCtx, taskID, errMsg) s.cutCallback(bgCtx, taskID, callbackURL) return } defer os.Remove(savePath) // 执行剪切 cutErr := s.executeCut(bgCtx, taskID, savePath, shots, upload) if cutErr != nil { dao.CutTask.UpdateError(bgCtx, taskID, cutErr.Error()) s.cutCallback(bgCtx, taskID, callbackURL) return } g.Log().Infof(bgCtx, "[异步剪切 %s] 完成", taskID) if callbackURL != "" { s.cutCallback(bgCtx, taskID, callbackURL) } } // executeCut 执行剪切并更新任务状态 func (s *cutService) executeCut(ctx context.Context, taskID string, videoPath string, shots []CutShot, upload bool) error { tempDir := filepath.Dir(videoPath) outputPath := filepath.Join(tempDir, fmt.Sprintf("cut_%s_x%d_%s.mp4", taskID, len(shots), time.Now().Format("150405"))) res, cutErr := s.Cut(ctx, &CutReq{ VideoPath: videoPath, Shots: shots, OutputPath: outputPath, Upload: upload, }) if cutErr != nil { os.Remove(outputPath) return cutErr } // 更新数据库为成功 fileName := filepath.Base(outputPath) fileFormat := "" if idx := strings.LastIndex(fileName, "."); idx > 0 { fileFormat = fileName[idx+1:] } dao.CutTask.UpdateSuccess(ctx, taskID, res.FileURL, res.FileSize, fileName, fileFormat, "", res.DurationStr) os.Remove(outputPath) return nil } // cutCallback 回调通知 func (s *cutService) cutCallback(ctx context.Context, taskID, callbackURL string) { if callbackURL == "" { return } task, err := dao.CutTask.GetByTaskID(ctx, taskID) if err != nil || task == nil { g.Log().Errorf(ctx, "[异步剪切回调 %s] 查询任务失败: %v", taskID, err) return } payload := map[string]interface{}{ "taskId": task.TaskID, "status": task.Status, } if task.Status == "success" { payload["fileURL"] = task.FileURL payload["fileSize"] = task.FileSize } if task.Status == "failed" { payload["errorMessage"] = task.ErrorMessage } body, _ := json.Marshal(payload) g.Log().Infof(ctx, "[异步剪切回调 %s] 状态=%s, 目标=%s", taskID, task.Status, callbackURL) req, _ := http.NewRequest("POST", callbackURL, bytes.NewReader(body)) req.Header.Set("Content-Type", "application/json") // 透传调用方用户信息 cbUser := getUserFromCtx(ctx) userJSON, _ := json.Marshal(cbUser) req.Header.Set("X-User-Info", string(userJSON)) client := &http.Client{Timeout: 2 * time.Minute} resp, reqErr := client.Do(req) if reqErr != nil { g.Log().Errorf(ctx, "[异步剪切回调 %s] 请求失败: %v", taskID, reqErr) return } defer resp.Body.Close() respBody, _ := io.ReadAll(resp.Body) g.Log().Infof(ctx, "[异步剪切回调 %s] 响应 status=%d, body=%s", taskID, resp.StatusCode, string(respBody)) }