package video

import (
	"bytes"
	"context"
	"encoding/json"
	"fmt"
	"io"
	"mime/multipart"
	"net/http"
	"net/url"
	"os"
	"os/exec"
	"path/filepath"
	"strings"
	"time"

	dao "media/dao/video"
	dto "media/model/dto/video"
	entity "media/model/entity/video"

	"gitea.com/red-future/common/beans"
	commonHttp "gitea.com/red-future/common/http"
	"github.com/gogf/gf/v2/frame/g"
	"github.com/gogf/gf/v2/os/glog"
	"github.com/gogf/gf/v2/util/guid"
)

// concatService implements video concatenation by shelling out to the
// ffmpeg/ffprobe command line tools.
type concatService struct{}

// Concat is the package-level singleton of the video concatenation service.
var Concat = new(concatService)

// ConcatReq describes a video concatenation request.
type ConcatReq struct {
	VideoPaths []string // local video file paths, concatenated in this order
	OutputPath string   // output file path; generated automatically when empty
	Method     string   // concatenation method: auto/fast/reencode (default auto)
	Upload     bool     // whether to upload the result to MinIO
}

// ConcatRes is the result of a video concatenation.
type ConcatRes struct {
	OutputPath  string  `json:"outputPath"`  // output file path
	FileSize    int64   `json:"fileSize"`    // file size in bytes
	Duration    float64 `json:"duration"`    // total duration of the result in seconds
	DurationStr string  `json:"durationStr"` // human readable duration (HH:MM:SS)
	MethodUsed  string  `json:"methodUsed"`  // concatenation method actually used
	InputFiles  int     `json:"inputFiles"`  // number of input files
	FileURL     string  `json:"fileURL"`     // MinIO URL (only set after an upload)
}

// Concat joins several local videos into a single output file.
//
// At least two input paths are required and every path must exist. When
// req.OutputPath is empty the output is placed next to the first input.
// Method "fast" uses the concat demuxer (stream copy); every other value,
// including "" and "auto", re-encodes through the concat filter. When
// req.Upload is set the result is uploaded and the URL is returned in
// ConcatRes.FileURL; an upload failure is reported as an error.
func (s *concatService) Concat(ctx context.Context, req *ConcatReq) (res *ConcatRes, err error) {
	if req == nil {
		return nil, fmt.Errorf("拼接请求不能为空")
	}
	g.Log().Infof(ctx, "[Concat] 服务层收到请求: videoPaths=%v, method=%s, upload=%v", req.VideoPaths, req.Method, req.Upload)

	if len(req.VideoPaths) < 2 {
		return nil, fmt.Errorf("至少需要2个视频才能拼接")
	}
	// Every input file must exist before ffmpeg is started.
	for i, p := range req.VideoPaths {
		if _, statErr := os.Stat(p); os.IsNotExist(statErr) {
			return nil, fmt.Errorf("第%d个视频文件不存在: %s", i+1, p)
		}
	}

	ffmpegPath, err := s.getFFmpegPath()
	if err != nil {
		return nil, err
	}

	outputPath := req.OutputPath
	if outputPath == "" {
		outputPath = concatDefaultOutputPath(req.VideoPaths)
	}

	var methodUsed string
	switch req.Method {
	case "fast":
		// Lossless concat: fast, but requires identical codec parameters.
		err = s.concatByDemuxer(ctx, ffmpegPath, req.VideoPaths, outputPath)
		methodUsed = "concat demuxer (无损)"
	default:
		// Re-encode: normalizes resolution and audio so any mix of inputs works.
		err = s.concatByFilter(ctx, ffmpegPath, req.VideoPaths, outputPath)
		methodUsed = "concat filter (重编码)"
	}
	if err != nil {
		return nil, fmt.Errorf("视频拼接失败: %w", err)
	}

	stat, statErr := os.Stat(outputPath)
	if statErr != nil {
		return nil, fmt.Errorf("输出文件异常: %w", statErr)
	}

	// The duration is informational only, so a probe failure is logged and
	// the result is still returned with a zero duration.
	duration, durErr := s.getVideoDuration(ctx, ffmpegPath, outputPath)
	if durErr != nil {
		g.Log().Warningf(ctx, "[Concat] 获取输出时长失败: %v", durErr)
	}

	res = &ConcatRes{
		OutputPath:  outputPath,
		FileSize:    stat.Size(),
		Duration:    duration,
		DurationStr: formatDuration(duration),
		MethodUsed:  methodUsed,
		InputFiles:  len(req.VideoPaths),
	}

	if req.Upload {
		// Detach from the request context so a closed HTTP connection does not
		// cancel the upload, but carry the caller's user over; a bare
		// Background context would make the upload fall back to the default user.
		uploadCtx := context.WithValue(context.Background(), "user", getUserFromCtx(ctx))
		uploadRes, uploadErr := s.UploadToMinIO(uploadCtx, outputPath)
		if uploadErr != nil {
			return nil, fmt.Errorf("上传到MinIO失败: %w", uploadErr)
		}
		res.FileURL = uploadRes.FileURL
	}
	return res, nil
}

// concatDefaultOutputPath builds an output path next to the first input,
// named after that input (stem truncated to 20 runes), the number of inputs
// and the current time of day.
func concatDefaultOutputPath(inputs []string) string {
	first := inputs[0]
	baseName := filepath.Base(first)
	ext := filepath.Ext(baseName)
	// Truncate by rune so multi-byte file names are never cut mid-character.
	stem := []rune(strings.TrimSuffix(baseName, ext))
	if len(stem) > 20 {
		stem = stem[:20]
	}
	name := fmt.Sprintf("concat_%s_x%d_%s%s", string(stem), len(inputs), time.Now().Format("150405"), ext)
	return filepath.Join(filepath.Dir(first), name)
}

// concatByDemuxer concatenates inputs losslessly with ffmpeg's concat demuxer
// (stream copy, so all inputs must share the same codec parameters).
//
// ctx is intentionally not used to run ffmpeg: the process runs on a
// background context so an HTTP request timeout does not kill it.
func (s *concatService) concatByDemuxer(ctx context.Context, ffmpegPath string, inputs []string, output string) error {
	var list strings.Builder
	for _, p := range inputs {
		// ffmpeg resolves relative entries against the directory of the list
		// file, not the working directory, so always write absolute paths.
		abs, err := filepath.Abs(p)
		if err != nil {
			return fmt.Errorf("解析文件路径失败: %w", err)
		}
		// Inside single quotes nothing can be escaped, so a literal quote is
		// written by closing the quote, adding \' and reopening it.
		list.WriteString("file '" + strings.ReplaceAll(abs, "'", `'\''`) + "'\n")
	}

	// A unique list file keeps concurrent tasks that share a directory from
	// overwriting or deleting each other's list.
	listFile, err := os.CreateTemp(filepath.Dir(output), "concat_list_*.txt")
	if err != nil {
		return fmt.Errorf("创建文件列表失败: %w", err)
	}
	listPath := listFile.Name()
	defer os.Remove(listPath)

	_, writeErr := listFile.WriteString(list.String())
	closeErr := listFile.Close()
	if writeErr != nil {
		return fmt.Errorf("创建文件列表失败: %w", writeErr)
	}
	if closeErr != nil {
		return fmt.Errorf("创建文件列表失败: %w", closeErr)
	}

	args := []string{
		"-f", "concat",
		"-safe", "0",
		"-i", listPath,
		"-c", "copy", // copy streams without re-encoding
		"-y", output,
	}

	cmd := exec.CommandContext(context.Background(), ffmpegPath, args...)
	outputBytes, err := cmd.CombinedOutput()
	if err != nil {
		return fmt.Errorf("ffmpeg demuxer 失败: %w\n%s", err, string(outputBytes))
	}
	return nil
}

// concatByFilter concatenates inputs by re-encoding through ffmpeg's concat
// filter. Every video is scaled and padded to the largest width and height
// found among the inputs (1920x1080 when probing yields nothing), forced to
// 30 fps, and inputs without an audio stream get generated silence.
//
// ctx is used for the ffprobe calls only; ffmpeg itself runs on a background
// context so an HTTP request timeout does not kill it.
func (s *concatService) concatByFilter(ctx context.Context, ffmpegPath string, inputs []string, output string) error {
	n := len(inputs)

	// 1. Pick the unified output size. Probe errors are ignored on purpose:
	// a failed probe reports 0x0 and the fallback below covers the case
	// where no input could be probed.
	maxW, maxH := 0, 0
	for _, p := range inputs {
		w, h, _ := s.getVideoResolution(ctx, ffmpegPath, p)
		if w > maxW {
			maxW = w
		}
		if h > maxH {
			maxH = h
		}
	}
	if maxW == 0 {
		maxW = 1920
	}
	if maxH == 0 {
		maxH = 1080
	}

	// 2. Build the filter graph: one video chain and one audio chain per input.
	filterParts := make([]string, 0, 2*n)
	var concatInputs strings.Builder
	for i, p := range inputs {
		filterParts = append(filterParts, fmt.Sprintf(
			"[%d:v]scale=%d:%d:force_original_aspect_ratio=decrease,pad=%d:%d:(ow-iw)/2:(oh-ih)/2,setsar=1,fps=30[v%d]",
			i, maxW, maxH, maxW, maxH, i,
		))
		if s.hasVideoAudio(ctx, ffmpegPath, p) {
			filterParts = append(filterParts, fmt.Sprintf("[%d:a]aresample=44100[a%d]", i, i))
		} else {
			// No audio stream: generate silence matching the video duration,
			// or 30 seconds when the duration cannot be determined.
			dur, _ := s.getVideoDuration(ctx, ffmpegPath, p)
			if dur <= 0 {
				dur = 30
			}
			// NOTE(review): in aevalsrc "n" appears to be nb_samples (samples
			// per output frame), not a channel count — confirm that n=2 is
			// really intended here.
			filterParts = append(filterParts, fmt.Sprintf("aevalsrc=0:n=2:s=44100:d=%.2f[a%d]", dur, i))
		}
		concatInputs.WriteString(fmt.Sprintf("[v%d][a%d]", i, i))
	}
	filterStr := fmt.Sprintf("%s;%sconcat=n=%d:v=1:a=1[outv][outa]",
		strings.Join(filterParts, ";"), concatInputs.String(), n)

	// 3. Assemble the command line: inputs first, then filter and encoder options.
	args := make([]string, 0, 2*n+16)
	for _, p := range inputs {
		args = append(args, "-i", p)
	}
	// NOTE(review): h264_videotoolbox looks like the Apple VideoToolbox
	// encoder — confirm it is available on every deployment target.
	args = append(args,
		"-filter_complex", filterStr,
		"-map", "[outv]",
		"-map", "[outa]",
		"-c:v", "h264_videotoolbox",
		"-b:v", "5M",
		"-allow_sw", "true",
		"-c:a", "aac",
		"-y", output,
	)
	g.Log().Debugf(ctx, "concat filter 命令: %s %v", ffmpegPath, args)

	// Keep the filter graph on disk while ffmpeg runs, for debugging only.
	// The name is derived from the output so concurrent tasks do not share it.
	filterFile := output + ".filter.txt"
	if writeErr := os.WriteFile(filterFile, []byte(filterStr), 0644); writeErr != nil {
		g.Log().Debugf(ctx, "concat filter 调试文件写入失败: %v", writeErr)
	} else {
		defer os.Remove(filterFile)
	}

	cmd := exec.CommandContext(context.Background(), ffmpegPath, args...)
	outputBytes, err := cmd.CombinedOutput()
	if err != nil {
		return fmt.Errorf("ffmpeg filter 失败: %w\n日志:\n%s", err, string(outputBytes))
	}
	return nil
}

// ffprobePath returns the ffprobe binary located next to ffmpegPath, or the
// bare name "ffprobe" (resolved through PATH at exec time) when no such file
// exists.
func (s *concatService) ffprobePath(ffmpegPath string) string {
	candidate := filepath.Join(filepath.Dir(ffmpegPath), "ffprobe")
	if _, err := os.Stat(candidate); os.IsNotExist(err) {
		return "ffprobe"
	}
	return candidate
}

// getVideoResolution returns the width and height of the first video stream
// of videoPath as reported by ffprobe. It returns an error when ffprobe fails
// or its output cannot be parsed.
func (s *concatService) getVideoResolution(ctx context.Context, ffmpegPath, videoPath string) (width, height int, err error) {
	cmd := exec.CommandContext(ctx, s.ffprobePath(ffmpegPath),
		"-v", "error",
		"-select_streams", "v:0",
		"-show_entries", "stream=width,height",
		"-of", "csv=p=0",
		videoPath,
	)
	output, err := cmd.Output()
	if err != nil {
		return 0, 0, err
	}
	var w, h int
	if _, scanErr := fmt.Sscanf(strings.TrimSpace(string(output)), "%d,%d", &w, &h); scanErr != nil {
		return 0, 0, fmt.Errorf("解析分辨率失败 %q: %w", strings.TrimSpace(string(output)), scanErr)
	}
	return w, h, nil
}

// getVideoDuration returns the container duration of videoPath in seconds as
// reported by ffprobe. It returns an error when ffprobe fails or prints a
// value that is not a number.
func (s *concatService) getVideoDuration(ctx context.Context, ffmpegPath, videoPath string) (float64, error) {
	cmd := exec.CommandContext(ctx, s.ffprobePath(ffmpegPath),
		"-v", "error",
		"-show_entries", "format=duration",
		"-of", "default=noprint_wrappers=1:nokey=1",
		videoPath,
	)
	output, err := cmd.Output()
	if err != nil {
		return 0, err
	}
	var duration float64
	if _, scanErr := fmt.Sscanf(strings.TrimSpace(string(output)), "%f", &duration); scanErr != nil {
		return 0, fmt.Errorf("解析时长失败 %q: %w", strings.TrimSpace(string(output)), scanErr)
	}
	return duration, nil
}

// hasVideoAudio reports whether videoPath contains at least one audio stream.
// An ffprobe failure is treated as "no audio".
func (s *concatService) hasVideoAudio(ctx context.Context, ffmpegPath, videoPath string) bool {
	cmd := exec.CommandContext(ctx, s.ffprobePath(ffmpegPath),
		"-v", "error",
		"-select_streams", "a:0",
		"-show_entries", "stream=codec_type",
		"-of", "default=noprint_wrappers=1:nokey=1",
		videoPath,
	)
	output, err := cmd.Output()
	return err == nil && len(strings.TrimSpace(string(output))) > 0
}

// getFFmpegPath locates the ffmpeg binary: the configured "ffmpeg.path" is
// used when it points to an existing file, otherwise ffmpeg is looked up in
// PATH.
func (s *concatService) getFFmpegPath() (string, error) {
	configured := g.Cfg().MustGet(context.Background(), "ffmpeg.path", "").String()
	if configured != "" {
		if _, err := os.Stat(configured); err == nil {
			return configured, nil
		}
	}
	found, err := exec.LookPath("ffmpeg")
	if err != nil {
		return "", fmt.Errorf("未找到 ffmpeg")
	}
	return found, nil
}

// formatDuration renders a duration given in seconds as HH:MM:SS. Fractions
// are truncated and negative input is rendered as 00:00:00.
func formatDuration(seconds float64) string {
	total := int(seconds)
	if total < 0 {
		total = 0
	}
	return fmt.Sprintf("%02d:%02d:%02d", total/3600, (total%3600)/60, total%60)
}

// uploadFileRes is the payload returned by the OSS upload endpoint.
type uploadFileRes struct {
	FileURL           string `json:"fileURL" dc:"上传地址"`
	FileSize          int    `json:"fileSize" dc:"文件大小"`
	FileName          string `json:"fileName" dc:"文件名称"`
	FileFormat        string `json:"fileFormat" dc:"文件格式"`
	FileAddressPrefix string `json:"fileAddressPrefix"`
}

// UploadToMinIO uploads a local file by POSTing it as multipart/form-data to
// the OSS service endpoint "oss/file/uploadFile".
//
// When ctx carries an incoming HTTP request its headers are forwarded; when
// none of them is Authorization or X-User-Info, an X-User-Info header is
// built from the user stored in ctx. The whole file is buffered in memory
// before it is sent.
func (s *concatService) UploadToMinIO(ctx context.Context, localFilePath string) (*uploadFileRes, error) {
	file, err := os.Open(localFilePath)
	if err != nil {
		return nil, fmt.Errorf("打开文件失败: %w", err)
	}
	defer file.Close()

	// Build the multipart/form-data body.
	var buf bytes.Buffer
	mw := multipart.NewWriter(&buf)
	fw, err := mw.CreateFormFile("file", filepath.Base(localFilePath))
	if err != nil {
		return nil, fmt.Errorf("创建表单文件字段失败: %w", err)
	}
	if _, err = io.Copy(fw, file); err != nil {
		return nil, fmt.Errorf("写入文件内容失败: %w", err)
	}
	// Close writes the terminating boundary; without it the body is truncated.
	if err = mw.Close(); err != nil {
		return nil, fmt.Errorf("写入文件内容失败: %w", err)
	}

	// Clone the shared client and give it long timeouts for large uploads.
	// SetTimeout only sets Client.Timeout, so the transport's
	// ResponseHeaderTimeout has to be raised separately.
	client := commonHttp.Httpclient.Clone()
	if base, ok := http.DefaultTransport.(*http.Transport); ok {
		transport := base.Clone()
		transport.ResponseHeaderTimeout = 5 * time.Minute
		client.Transport = transport
	}
	client.SetTimeout(10 * time.Minute)

	// Forward the caller's headers, preferring the incoming HTTP request.
	hasAuthHeader := false
	if r := g.RequestFromCtx(ctx); r != nil {
		for k, v := range r.Header {
			if len(v) == 0 {
				continue
			}
			switch http.CanonicalHeaderKey(k) {
			case "Content-Length", "Content-Type", "Accept-Encoding", "Connection", "Transfer-Encoding":
				// These describe the incoming request's own body and
				// connection; forwarding them would misdescribe this request
				// (a forwarded Accept-Encoding also turns off the transport's
				// transparent gzip decoding).
				continue
			}
			client.SetHeader(k, v[0])
			if strings.EqualFold(k, "Authorization") || strings.EqualFold(k, "X-User-Info") {
				hasAuthHeader = true
			}
		}
	}
	// Without an HTTP request (e.g. in a background goroutine) build the
	// header from the user stored in the context.
	if !hasAuthHeader {
		userJSON, err := json.Marshal(getUserFromCtx(ctx))
		if err != nil {
			return nil, fmt.Errorf("序列化用户信息失败: %w", err)
		}
		client.SetHeader("X-User-Info", string(userJSON))
	}

	// The multipart Content-Type carries the boundary and must be set last.
	client.SetHeader("Content-Type", mw.FormDataContentType())

	g.Log().Debugf(ctx, "[UploadToMinIO] 请求URL: oss/file/uploadFile, 文件: %s, Body大小: %d bytes", localFilePath, buf.Len())

	response, err := client.Post(ctx, "oss/file/uploadFile", buf.Bytes())
	if err != nil {
		glog.Error(ctx, err)
		return nil, fmt.Errorf("调用OSS上传服务失败: %w", err)
	}
	defer response.Close()

	body := response.ReadAll()
	g.Log().Debugf(ctx, "[UploadToMinIO] OSS原始响应: %s", string(body))

	// The response is expected in the {code, message, data} envelope.
	var apiResp struct {
		Code    int            `json:"code"`
		Message string         `json:"message"`
		Data    *uploadFileRes `json:"data"`
	}
	if err = json.Unmarshal(body, &apiResp); err != nil {
		return nil, fmt.Errorf("响应解析失败: %w", err)
	}
	if apiResp.Code != 200 && apiResp.Code != 0 {
		return nil, fmt.Errorf("OSS上传失败: %s", apiResp.Message)
	}
	// A success code without data would otherwise be dereferenced below.
	if apiResp.Data == nil {
		return nil, fmt.Errorf("OSS上传失败: 响应缺少data")
	}

	g.Log().Infof(ctx, "[UploadToMinIO] 上传成功 fileURL=%s size=%d", apiResp.Data.FileURL, apiResp.Data.FileSize)
	return apiResp.Data, nil
}

// CleanupConcat removes the given files. Removal is best effort: errors
// (for example files that are already gone) are ignored.
func CleanupConcat(paths []string) {
	for _, p := range paths {
		_ = os.Remove(p)
	}
}

// ---------- Asynchronous concat task management ----------

// createPendingTask inserts a new task row in "pending" state and returns its
// generated task ID.
func (s *concatService) createPendingTask(ctx context.Context, method string) (string, error) {
	taskID := "concat_" + guid.S()
	task := &entity.ConcatTask{
		TaskID:     taskID,
		Status:     "pending",
		MethodUsed: method,
	}
	if _, err := dao.ConcatTask.Insert(ctx, task); err != nil {
		return "", fmt.Errorf("创建任务失败: %w", err)
	}
	return taskID, nil
}

// CreateAsyncTask creates an asynchronous concat task from video URLs and
// returns its task ID. The videos are downloaded and concatenated in a
// background goroutine; when callbackURL is not empty it is notified once the
// task has finished.
func (s *concatService) CreateAsyncTask(ctx context.Context, videoURLs []string, method string, upload bool, callbackURL string) (string, error) {
	if len(videoURLs) < 2 {
		return "", fmt.Errorf("至少需要2个视频才能拼接")
	}
	taskID, err := s.createPendingTask(ctx, method)
	if err != nil {
		return "", err
	}

	// The request context does not outlive the request, so the caller's user
	// is extracted here and handed to the goroutine explicitly.
	user := getUserFromCtx(ctx)
	g.Log().Infof(ctx, "[异步拼接] 创建任务 %s, 视频数=%d, 回调=%s", taskID, len(videoURLs), callbackURL)

	go s.processAsyncTask(user, taskID, videoURLs, method, upload, callbackURL)
	return taskID, nil
}

// CreateAsyncTaskWithFiles creates an asynchronous concat task from files
// that are already on local disk and returns its task ID. The concatenation
// runs in a background goroutine; when callbackURL is not empty it is
// notified once the task has finished.
func (s *concatService) CreateAsyncTaskWithFiles(ctx context.Context, filePaths []string, method string, upload bool, callbackURL string) (string, error) {
	if len(filePaths) < 2 {
		return "", fmt.Errorf("至少需要2个视频才能拼接")
	}
	taskID, err := s.createPendingTask(ctx, method)
	if err != nil {
		return "", err
	}

	// The request context does not outlive the request, so the caller's user
	// is extracted here and handed to the goroutine explicitly.
	user := getUserFromCtx(ctx)
	g.Log().Infof(ctx, "[异步拼接-文件] 创建任务 %s, 文件数=%d, 回调=%s", taskID, len(filePaths), callbackURL)

	go s.processAsyncTaskWithFiles(user, taskID, filePaths, method, upload, callbackURL)
	return taskID, nil
}

// getUserFromCtx returns the user stored in ctx under the "user" key, or a
// default admin user (tenant 1) when ctx holds no usable user.
func getUserFromCtx(ctx context.Context) *beans.User {
	// The nil check also rejects a typed-nil pointer stored in the context.
	if user, ok := ctx.Value("user").(*beans.User); ok && user != nil {
		return user
	}
	return &beans.User{UserName: "admin", TenantId: 1}
}

// GetTaskResult looks up an asynchronous task by ID and returns its current
// state. An unknown task ID is reported as an error.
func (s *concatService) GetTaskResult(ctx context.Context, taskID string) (*dto.GetConcatTaskRes, error) {
	task, err := dao.ConcatTask.GetByTaskID(ctx, taskID)
	if err != nil {
		return nil, fmt.Errorf("查询任务失败: %w", err)
	}
	if task == nil {
		return nil, fmt.Errorf("任务不存在: %s", taskID)
	}
	return dao.EntityToTaskRes(task), nil
}

// recoverAsyncTask must be invoked directly by defer. When the goroutine
// panicked it logs the panic, marks the task as failed and sends the callback.
func (s *concatService) recoverAsyncTask(ctx context.Context, taskID, callbackURL string) {
	r := recover()
	if r == nil {
		return
	}
	errMsg := fmt.Sprintf("异步拼接异常: %v", r)
	g.Log().Errorf(ctx, "[异步拼接 %s] %s", taskID, errMsg)
	dao.ConcatTask.UpdateError(ctx, taskID, errMsg)
	s.concatCallback(ctx, taskID, callbackURL)
}

// processAsyncTaskWithFiles runs an asynchronous concat task whose input
// files are already on local disk. It updates the task state and sends the
// callback on both success and failure.
func (s *concatService) processAsyncTaskWithFiles(user *beans.User, taskID string, filePaths []string, method string, upload bool, callbackURL string) {
	bgCtx := context.WithValue(context.Background(), "user", user)
	// Installed before any other work so a panic anywhere in the task is
	// turned into a failed task instead of crashing the process.
	defer s.recoverAsyncTask(bgCtx, taskID, callbackURL)

	dao.ConcatTask.UpdateRunning(bgCtx, taskID)

	if err := s.executeConcat(bgCtx, taskID, filePaths, method, upload); err != nil {
		dao.ConcatTask.UpdateError(bgCtx, taskID, err.Error())
		s.concatCallback(bgCtx, taskID, callbackURL)
		return
	}

	g.Log().Infof(bgCtx, "[异步拼接 %s] 完成", taskID)
	s.concatCallback(bgCtx, taskID, callbackURL)
}

// processAsyncTask runs an asynchronous concat task in URL mode: the videos
// are downloaded first, then concatenated. It updates the task state and
// sends the callback on both success and failure.
func (s *concatService) processAsyncTask(user *beans.User, taskID string, videoURLs []string, method string, upload bool, callbackURL string) {
	bgCtx := context.WithValue(context.Background(), "user", user)
	// Installed before any other work so a panic anywhere in the task is
	// turned into a failed task instead of crashing the process.
	defer s.recoverAsyncTask(bgCtx, taskID, callbackURL)

	dao.ConcatTask.UpdateRunning(bgCtx, taskID)

	if err := s.downloadAndConcat(bgCtx, taskID, videoURLs, method, upload); err != nil {
		dao.ConcatTask.UpdateError(bgCtx, taskID, err.Error())
		s.concatCallback(bgCtx, taskID, callbackURL)
		return
	}

	g.Log().Infof(bgCtx, "[异步拼接 %s] 完成", taskID)
	s.concatCallback(bgCtx, taskID, callbackURL)
}

// downloadAndConcat downloads the videos into the configured temp directory
// ("ffmpeg.temp_dir", default "resource/temp") and concatenates them. URLs
// that fail to download are skipped with a warning; at least two successful
// downloads are required. Downloaded files are removed before returning, also
// when the concatenation panics.
func (s *concatService) downloadAndConcat(ctx context.Context, taskID string, videoURLs []string, method string, upload bool) error {
	tempDir := g.Cfg().MustGet(ctx, "ffmpeg.temp_dir", "resource/temp").String()
	if err := os.MkdirAll(tempDir, 0755); err != nil {
		return fmt.Errorf("创建临时目录失败: %w", err)
	}

	savePaths := make([]string, 0, len(videoURLs))
	// The closure reads savePaths at return time, so every file downloaded so
	// far is removed no matter how this function exits.
	defer func() { CleanupConcat(savePaths) }()

	for _, videoURL := range videoURLs {
		savePath, dlErr := downloadFile(ctx, videoURL, tempDir)
		if dlErr != nil {
			g.Log().Warningf(ctx, "[异步拼接 %s] 下载失败 %s: %v", taskID, videoURL, dlErr)
			continue
		}
		savePaths = append(savePaths, savePath)
	}
	if len(savePaths) < 2 {
		return fmt.Errorf("成功下载的视频不足2个(共%d)", len(videoURLs))
	}
	return s.executeConcat(ctx, taskID, savePaths, method, upload)
}

// executeConcat concatenates filePaths into a temporary .mp4 next to the
// first input and, on success, stores the result in the task row. The local
// output file is always removed afterwards, so the result is only reachable
// through the uploaded file URL.
func (s *concatService) executeConcat(ctx context.Context, taskID string, filePaths []string, method string, upload bool) error {
	if len(filePaths) == 0 {
		return fmt.Errorf("至少需要2个视频才能拼接")
	}
	outputName := fmt.Sprintf("concat_%s_x%d_%s.mp4", taskID, len(filePaths), time.Now().Format("150405"))
	outputPath := filepath.Join(filepath.Dir(filePaths[0]), outputName)
	defer os.Remove(outputPath)

	res, err := s.Concat(ctx, &ConcatReq{
		VideoPaths: filePaths,
		OutputPath: outputPath,
		Method:     method,
		Upload:     upload,
	})
	if err != nil {
		return err
	}

	// Mark the task as successful in the database.
	fileName := filepath.Base(outputPath)
	fileFormat := strings.TrimPrefix(filepath.Ext(fileName), ".")
	dao.ConcatTask.UpdateSuccess(ctx, taskID, res.FileURL, res.FileSize, fileName, fileFormat, "", res.MethodUsed, res.DurationStr)
	return nil
}

// concatCallback POSTs the current state of the task, read from the
// database, as JSON to callbackURL. It does nothing when callbackURL is
// empty. Failures are logged and never returned, because the callback is a
// best-effort notification.
func (s *concatService) concatCallback(ctx context.Context, taskID, callbackURL string) {
	if callbackURL == "" {
		return
	}

	task, err := dao.ConcatTask.GetByTaskID(ctx, taskID)
	if err != nil || task == nil {
		g.Log().Errorf(ctx, "[异步拼接回调 %s] 查询任务失败: %v", taskID, err)
		return
	}

	payload := map[string]interface{}{
		"taskId": task.TaskID,
		"status": task.Status,
	}
	switch task.Status {
	case "success":
		payload["fileURL"] = task.FileURL
		payload["fileSize"] = task.FileSize
	case "failed":
		payload["errorMessage"] = task.ErrorMessage
	}
	body, err := json.Marshal(payload)
	if err != nil {
		g.Log().Errorf(ctx, "[异步拼接回调 %s] 请求失败: %v", taskID, err)
		return
	}

	g.Log().Infof(ctx, "[异步拼接回调 %s] 状态=%s, 目标=%s", taskID, task.Status, callbackURL)

	// A malformed callback URL makes request construction fail; using the
	// request without this check would dereference a nil pointer.
	req, err := http.NewRequestWithContext(ctx, http.MethodPost, callbackURL, bytes.NewReader(body))
	if err != nil {
		g.Log().Errorf(ctx, "[异步拼接回调 %s] 请求失败: %v", taskID, err)
		return
	}
	req.Header.Set("Content-Type", "application/json")

	// Pass the caller's user information along with the callback.
	userJSON, err := json.Marshal(getUserFromCtx(ctx))
	if err != nil {
		g.Log().Errorf(ctx, "[异步拼接回调 %s] 请求失败: %v", taskID, err)
		return
	}
	req.Header.Set("X-User-Info", string(userJSON))

	client := &http.Client{Timeout: 2 * time.Minute}
	resp, err := client.Do(req)
	if err != nil {
		g.Log().Errorf(ctx, "[异步拼接回调 %s] 请求失败: %v", taskID, err)
		return
	}
	defer resp.Body.Close()

	// The body is only logged, so cap how much of it is read.
	respBody, _ := io.ReadAll(io.LimitReader(resp.Body, 64<<10))
	g.Log().Infof(ctx, "[异步拼接回调 %s] 响应 status=%d, body=%s", taskID, resp.StatusCode, string(respBody))
}

// downloadFile downloads rawURL into a new, uniquely named file inside
// tempDir and returns the path of that file. The file name ends with the last
// path segment of the URL. Any status other than 200 OK is an error, and a
// partially written file is removed on failure.
func downloadFile(ctx context.Context, rawURL, tempDir string) (string, error) {
	parsedURL, err := url.Parse(rawURL)
	if err != nil {
		return "", err
	}
	segments := strings.Split(parsedURL.Path, "/")
	fileName := segments[len(segments)-1]
	if fileName == "" {
		fileName = fmt.Sprintf("video_%d.mp4", time.Now().UnixMilli())
	}

	req, err := http.NewRequestWithContext(ctx, http.MethodGet, rawURL, nil)
	if err != nil {
		return "", err
	}
	client := &http.Client{Timeout: 10 * time.Minute}
	resp, err := client.Do(req)
	if err != nil {
		return "", err
	}
	defer resp.Body.Close()

	if resp.StatusCode != http.StatusOK {
		return "", fmt.Errorf("HTTP %d", resp.StatusCode)
	}

	// A random prefix keeps concurrent downloads of the same file name from
	// writing to the same path.
	out, err := os.CreateTemp(tempDir, "*_"+fileName)
	if err != nil {
		return "", err
	}
	savePath := out.Name()

	_, copyErr := io.Copy(out, resp.Body)
	// Close can report a failed flush, so its error matters as well.
	closeErr := out.Close()
	if copyErr != nil {
		os.Remove(savePath)
		return "", copyErr
	}
	if closeErr != nil {
		os.Remove(savePath)
		return "", closeErr
	}
	return savePath, nil
}