Compare commits
5 Commits
e76bf57d54
...
e6c27e2dee
| Author | SHA1 | Date | |
|---|---|---|---|
| e6c27e2dee | |||
| e79f8a6131 | |||
| 036b5cec37 | |||
| 9a40fd7e1e | |||
| ccd17903c7 |
@@ -49,6 +49,6 @@ RUN mkdir -p /app/resource/log/run \
|
||||
|
||||
USER appuser
|
||||
|
||||
EXPOSE 3001
|
||||
EXPOSE 3010
|
||||
|
||||
CMD ["./main"]
|
||||
|
||||
@@ -1,5 +1,5 @@
|
||||
server:
|
||||
address : ":3001"
|
||||
address : ":3010"
|
||||
name: "media"
|
||||
workerId: 1
|
||||
logPath: "resource/log/server"
|
||||
@@ -37,7 +37,7 @@ database:
|
||||
redis:
|
||||
# 集群模式配置方法
|
||||
default:
|
||||
address: 116.204.74.41:6379
|
||||
address: 192.168.3.30:6379
|
||||
db: 0
|
||||
idleTimeout: "60s" #连接最大空闲时间,使用时间字符串例如30s/1m/1d
|
||||
maxConnLifetime: "90s" #连接最长存活时间,使用时间字符串例如30s/1m/1d
|
||||
|
||||
@@ -8,6 +8,7 @@ import (
|
||||
service "media/service/asr"
|
||||
|
||||
"gitea.com/red-future/common/beans"
|
||||
"gitea.com/red-future/common/utils"
|
||||
"github.com/gogf/gf/v2/frame/g"
|
||||
)
|
||||
|
||||
@@ -56,10 +57,23 @@ func (c *audio) ListTasks(ctx context.Context, req *dto.ListTaskReq) (res *dto.L
|
||||
return service.AudioTask.ListTasks(ctx, req)
|
||||
}
|
||||
|
||||
// withUser 为 context 注入默认用户(无认证基础设施时使用)
|
||||
// withUser 优先从请求头/X-User-Info/Token 提取用户信息,没有则用默认 admin
|
||||
func withUser(ctx context.Context) context.Context {
|
||||
if ctx.Value("user") == nil {
|
||||
ctx = context.WithValue(ctx, "user", &beans.User{UserName: "admin", TenantId: 1})
|
||||
if ctx.Value("user") != nil {
|
||||
return ctx
|
||||
}
|
||||
|
||||
user, err := utils.GetUserInfo(ctx)
|
||||
if err == nil && user != nil && user.TenantId > 0 {
|
||||
g.Log().Infof(ctx, "[用户信息] 从请求头解析到用户: userName=%s, tenantId=%d", user.UserName, user.TenantId)
|
||||
ctx = context.WithValue(ctx, "user", user)
|
||||
return ctx
|
||||
}
|
||||
|
||||
if err != nil {
|
||||
g.Log().Debugf(ctx, "[用户信息] 解析失败(%v), 使用默认admin/tenant=1", err)
|
||||
}
|
||||
|
||||
ctx = context.WithValue(ctx, "user", &beans.User{UserName: "admin", TenantId: 1})
|
||||
return ctx
|
||||
}
|
||||
|
||||
@@ -16,6 +16,7 @@ import (
|
||||
service "media/service/video"
|
||||
|
||||
"gitea.com/red-future/common/beans"
|
||||
"gitea.com/red-future/common/utils"
|
||||
"github.com/gogf/gf/v2/frame/g"
|
||||
)
|
||||
|
||||
@@ -26,6 +27,9 @@ var Concat = new(video)
|
||||
// Concat 视频拼接(URL模式) POST /video/concat
|
||||
func (c *video) Concat(ctx context.Context, req *dto.ConcatReq) (res *dto.ConcatRes, err error) {
|
||||
ctx = withUser(ctx)
|
||||
g.Log().Infof(ctx, "[视频拼接] 收到请求 入参: method=%s, upload=%v, video_urls=%v",
|
||||
req.Method, req.Upload, req.VideoURLs)
|
||||
|
||||
if req.Method == "" {
|
||||
req.Method = "auto"
|
||||
}
|
||||
@@ -44,12 +48,33 @@ func (c *video) Concat(ctx context.Context, req *dto.ConcatReq) (res *dto.Concat
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
defer os.Remove(svcRes.OutputPath)
|
||||
return toDTORes(svcRes), nil
|
||||
}
|
||||
|
||||
// ConcatAsync 视频拼接-异步(URL模式) POST /video/concat/async
|
||||
func (c *video) ConcatAsync(ctx context.Context, req *dto.ConcatAsyncReq) (res *dto.CreateConcatTaskRes, err error) {
|
||||
ctx = withUser(ctx)
|
||||
g.Log().Infof(ctx, "[视频拼接-异步] 收到请求 入参: method=%s, upload=%v, callback=%s, video_urls=%v",
|
||||
req.Method, req.Upload, req.CallbackURL, req.VideoURLs)
|
||||
|
||||
if req.Method == "" {
|
||||
req.Method = "auto"
|
||||
}
|
||||
|
||||
taskID, taskErr := service.Concat.CreateAsyncTask(ctx, req.VideoURLs, req.Method, req.Upload, req.CallbackURL)
|
||||
if taskErr != nil {
|
||||
return nil, taskErr
|
||||
}
|
||||
return &dto.CreateConcatTaskRes{TaskID: taskID}, nil
|
||||
}
|
||||
|
||||
// ConcatUpload 视频拼接(文件上传模式) POST /video/concat/upload
|
||||
func (c *video) ConcatUpload(ctx context.Context, req *dto.ConcatUploadReq) (res *dto.ConcatRes, err error) {
|
||||
ctx = withUser(ctx)
|
||||
g.Log().Infof(ctx, "[视频拼接-上传] 收到请求 入参: method=%s, upload=%v", req.Method, req.Upload)
|
||||
|
||||
savePaths, err := common.SaveUploadedFilesFromCtx(ctx)
|
||||
if err != nil || len(savePaths) < 2 {
|
||||
return nil, fmt.Errorf("至少需要2个视频,当前%d个", len(savePaths))
|
||||
@@ -68,14 +93,63 @@ func (c *video) ConcatUpload(ctx context.Context, req *dto.ConcatUploadReq) (res
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
defer os.Remove(svcRes.OutputPath)
|
||||
return toDTORes(svcRes), nil
|
||||
}
|
||||
|
||||
// withUser 为 context 注入默认用户(无认证基础设施时使用)
|
||||
func withUser(ctx context.Context) context.Context {
|
||||
if ctx.Value("user") == nil {
|
||||
ctx = context.WithValue(ctx, "user", &beans.User{UserName: "admin", TenantId: 1})
|
||||
// ConcatUploadAsync 视频拼接-异步(文件上传模式) POST /video/concat/upload/async
|
||||
func (c *video) ConcatUploadAsync(ctx context.Context, req *dto.ConcatUploadAsyncReq) (res *dto.CreateConcatTaskRes, err error) {
|
||||
ctx = withUser(ctx)
|
||||
g.Log().Infof(ctx, "[视频拼接-上传-异步] 收到请求 入参: method=%s, upload=%v, callback=%s",
|
||||
req.Method, req.Upload, req.CallbackURL)
|
||||
|
||||
savePaths, err := common.SaveUploadedFilesFromCtx(ctx)
|
||||
if err != nil || len(savePaths) < 2 {
|
||||
return nil, fmt.Errorf("至少需要2个视频,当前%d个", len(savePaths))
|
||||
}
|
||||
defer service.CleanupConcat(savePaths)
|
||||
|
||||
if req.Method == "" {
|
||||
req.Method = "auto"
|
||||
}
|
||||
|
||||
taskID, taskErr := service.Concat.CreateAsyncTaskWithFiles(ctx, savePaths, req.Method, req.Upload, req.CallbackURL)
|
||||
if taskErr != nil {
|
||||
return nil, taskErr
|
||||
}
|
||||
return &dto.CreateConcatTaskRes{TaskID: taskID}, nil
|
||||
}
|
||||
|
||||
// GetConcatTask 查询异步拼接任务结果 GET /video/concat/task/{taskId}
|
||||
func (c *video) GetConcatTask(ctx context.Context, req *dto.GetConcatTaskReq) (res *dto.GetConcatTaskRes, err error) {
|
||||
ctx = withUser(ctx)
|
||||
return service.Concat.GetTaskResult(ctx, req.TaskID)
|
||||
}
|
||||
|
||||
// withUser 优先从请求头/X-User-Info/Token 提取用户信息,没有则用默认 admin
|
||||
func withUser(ctx context.Context) context.Context {
|
||||
if ctx.Value("user") != nil {
|
||||
return ctx
|
||||
}
|
||||
|
||||
// 调试:打印 Authorization 头
|
||||
if req := g.RequestFromCtx(ctx); req != nil {
|
||||
g.Log().Debugf(ctx, "[withUser] Authorization头=%q", req.Header.Get("Authorization"))
|
||||
}
|
||||
|
||||
user, err := utils.GetUserInfo(ctx)
|
||||
if err == nil && user != nil && user.TenantId > 0 {
|
||||
g.Log().Infof(ctx, "[用户信息] 从请求头解析到用户: userName=%s, tenantId=%d", user.UserName, user.TenantId)
|
||||
ctx = context.WithValue(ctx, "user", user)
|
||||
return ctx
|
||||
}
|
||||
|
||||
if err != nil {
|
||||
g.Log().Debugf(ctx, "[用户信息] 解析失败(%v), 使用默认admin/tenant=1", err)
|
||||
}
|
||||
|
||||
ctx = context.WithValue(ctx, "user", &beans.User{UserName: "admin", TenantId: 1})
|
||||
return ctx
|
||||
}
|
||||
|
||||
|
||||
92
controller/video/cut_controller.go
Normal file
92
controller/video/cut_controller.go
Normal file
@@ -0,0 +1,92 @@
|
||||
package video
|
||||
|
||||
import (
|
||||
"context"
|
||||
"fmt"
|
||||
"os"
|
||||
|
||||
dto "media/model/dto/video"
|
||||
service "media/service/video"
|
||||
|
||||
"github.com/gogf/gf/v2/frame/g"
|
||||
)
|
||||
|
||||
type cut struct{}
|
||||
|
||||
var Cut = new(cut)
|
||||
|
||||
// Cut 视频分镜剪切(URL模式) POST /video/cut
|
||||
func (c *cut) Cut(ctx context.Context, req *dto.CutReq) (res *dto.CutRes, err error) {
|
||||
ctx = withUser(ctx)
|
||||
g.Log().Infof(ctx, "[视频分镜剪切] 收到请求 入参: video_url=%s, total_scenes=%d, upload=%v",
|
||||
req.VideoURL, len(req.Scenes), req.Upload)
|
||||
|
||||
// 下载视频到临时目录
|
||||
tempDir := getTempDir(ctx)
|
||||
os.MkdirAll(tempDir, 0755)
|
||||
|
||||
savePath, err := downloadFromURL(ctx, req.VideoURL, tempDir)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("下载视频失败: %v", err)
|
||||
}
|
||||
defer os.Remove(savePath)
|
||||
|
||||
// 转换分镜为 service 层类型(解析时间字符串为秒)
|
||||
serviceShots, err := service.ConvertScenes(req.Scenes)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("解析分镜失败: %v", err)
|
||||
}
|
||||
if len(serviceShots) == 0 {
|
||||
return nil, fmt.Errorf("没有有效的分镜片段")
|
||||
}
|
||||
|
||||
// 调用服务层
|
||||
svcRes, err := service.Cut.Cut(ctx, &service.CutReq{
|
||||
VideoPath: savePath,
|
||||
Shots: serviceShots,
|
||||
Upload: req.Upload,
|
||||
})
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
defer os.Remove(svcRes.OutputPath)
|
||||
|
||||
return &dto.CutRes{
|
||||
OutputPath: svcRes.OutputPath,
|
||||
FileSize: svcRes.FileSize,
|
||||
Duration: svcRes.Duration,
|
||||
DurationStr: svcRes.DurationStr,
|
||||
ShotsCount: svcRes.ShotsCount,
|
||||
FileURL: svcRes.FileURL,
|
||||
}, nil
|
||||
}
|
||||
|
||||
// CutAsync 视频分镜剪切-异步(URL模式) POST /video/cut/async
|
||||
func (c *cut) CutAsync(ctx context.Context, req *dto.CutAsyncReq) (res *dto.CreateCutTaskRes, err error) {
|
||||
ctx = withUser(ctx)
|
||||
g.Log().Infof(ctx, "[视频分镜剪切-异步] 收到请求 入参: video_url=%s, total_scenes=%d, upload=%v, callback=%s",
|
||||
req.VideoURL, len(req.Scenes), req.Upload, req.CallbackURL)
|
||||
|
||||
// 转换分镜为 service 层类型(解析时间字符串为秒)
|
||||
serviceShots, err := service.ConvertScenes(req.Scenes)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("解析分镜失败: %v", err)
|
||||
}
|
||||
if len(serviceShots) == 0 {
|
||||
return nil, fmt.Errorf("没有有效的分镜片段")
|
||||
}
|
||||
|
||||
taskID, taskErr := service.Cut.CreateAsyncTask(ctx, req.VideoURL, serviceShots, req.Upload, req.CallbackURL)
|
||||
if taskErr != nil {
|
||||
return nil, taskErr
|
||||
}
|
||||
|
||||
return &dto.CreateCutTaskRes{TaskID: taskID}, nil
|
||||
}
|
||||
|
||||
// GetCutTask 查询异步剪切任务结果 GET /video/cut/task/{taskId}
|
||||
func (c *cut) GetCutTask(ctx context.Context, req *dto.GetCutTaskReq) (res *dto.GetCutTaskRes, err error) {
|
||||
ctx = withUser(ctx)
|
||||
return service.Cut.GetTaskResult(ctx, req.TaskID)
|
||||
}
|
||||
113
dao/video/concat_task_dao.go
Normal file
113
dao/video/concat_task_dao.go
Normal file
@@ -0,0 +1,113 @@
|
||||
package video
|
||||
|
||||
import (
|
||||
"context"
|
||||
"time"
|
||||
|
||||
dto "media/model/dto/video"
|
||||
entity "media/model/entity/video"
|
||||
|
||||
"gitea.com/red-future/common/db/gfdb"
|
||||
"github.com/gogf/gf/v2/frame/g"
|
||||
"github.com/gogf/gf/v2/util/gconv"
|
||||
)
|
||||
|
||||
var ConcatTask = new(concatTaskDao)
|
||||
|
||||
type concatTaskDao struct{}
|
||||
|
||||
const concatTaskTable = "concat_task"
|
||||
|
||||
// Insert 创建任务(排除 id 字段,让数据库自增)
|
||||
func (d *concatTaskDao) Insert(ctx context.Context, data *entity.ConcatTask) (id int64, err error) {
|
||||
r, err := gfdb.DB(ctx).Model(ctx, concatTaskTable).
|
||||
Data(data).
|
||||
FieldsEx(entity.ConcatTaskCols.Id).
|
||||
Insert()
|
||||
if err != nil {
|
||||
return 0, err
|
||||
}
|
||||
return r.LastInsertId()
|
||||
}
|
||||
|
||||
// GetByTaskID 根据taskId查询任务
|
||||
func (d *concatTaskDao) GetByTaskID(ctx context.Context, taskID string) (res *entity.ConcatTask, err error) {
|
||||
r, err := gfdb.DB(ctx).Model(ctx, concatTaskTable).
|
||||
Where(entity.ConcatTaskCols.TaskID, taskID).
|
||||
One()
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
if r == nil {
|
||||
return nil, nil
|
||||
}
|
||||
err = r.Struct(&res)
|
||||
return
|
||||
}
|
||||
|
||||
// UpdateRunning 更新为运行中
|
||||
func (d *concatTaskDao) UpdateRunning(ctx context.Context, taskID string) error {
|
||||
_, err := gfdb.DB(ctx).Model(ctx, concatTaskTable).
|
||||
Data(g.Map{
|
||||
entity.ConcatTaskCols.Status: "running",
|
||||
}).
|
||||
Where(entity.ConcatTaskCols.TaskID, taskID).
|
||||
Update()
|
||||
return err
|
||||
}
|
||||
|
||||
// UpdateSuccess 更新为成功
|
||||
func (d *concatTaskDao) UpdateSuccess(ctx context.Context, taskID string, fileURL string, fileSize int64, fileName, fileFormat, fileAddrPrefix, methodUsed, durationStr string) error {
|
||||
_, err := gfdb.DB(ctx).Model(ctx, concatTaskTable).
|
||||
Data(g.Map{
|
||||
entity.ConcatTaskCols.Status: "success",
|
||||
entity.ConcatTaskCols.FileURL: fileURL,
|
||||
entity.ConcatTaskCols.FileSize: fileSize,
|
||||
entity.ConcatTaskCols.FileName: fileName,
|
||||
entity.ConcatTaskCols.FileFormat: fileFormat,
|
||||
entity.ConcatTaskCols.FileAddressPrefix: fileAddrPrefix,
|
||||
entity.ConcatTaskCols.MethodUsed: methodUsed,
|
||||
entity.ConcatTaskCols.DurationStr: durationStr,
|
||||
entity.ConcatTaskCols.ErrorMessage: "",
|
||||
}).
|
||||
Where(entity.ConcatTaskCols.TaskID, taskID).
|
||||
Update()
|
||||
return err
|
||||
}
|
||||
|
||||
// UpdateError 更新为失败
|
||||
func (d *concatTaskDao) UpdateError(ctx context.Context, taskID string, errMsg string) error {
|
||||
_, err := gfdb.DB(ctx).Model(ctx, concatTaskTable).
|
||||
Data(g.Map{
|
||||
entity.ConcatTaskCols.Status: "failed",
|
||||
entity.ConcatTaskCols.ErrorMessage: errMsg,
|
||||
}).
|
||||
Where(entity.ConcatTaskCols.TaskID, taskID).
|
||||
Update()
|
||||
return err
|
||||
}
|
||||
|
||||
// EntityToTaskRes 实体转DTO
|
||||
func EntityToTaskRes(e *entity.ConcatTask) *dto.GetConcatTaskRes {
|
||||
res := &dto.GetConcatTaskRes{
|
||||
TaskID: e.TaskID,
|
||||
Status: e.Status,
|
||||
CreatedAt: gconv.Int64(e.CreatedAt.Timestamp()),
|
||||
}
|
||||
if e.CreatedAt == nil {
|
||||
res.CreatedAt = time.Now().UnixMilli()
|
||||
}
|
||||
if e.Status == "success" {
|
||||
res.FileURL = e.FileURL
|
||||
res.FileSize = e.FileSize
|
||||
res.FileName = e.FileName
|
||||
res.FileFormat = e.FileFormat
|
||||
res.FileAddressPrefix = e.FileAddressPrefix
|
||||
res.MethodUsed = e.MethodUsed
|
||||
res.DurationStr = e.DurationStr
|
||||
}
|
||||
if e.Status == "failed" {
|
||||
res.ErrorMessage = e.ErrorMessage
|
||||
}
|
||||
return res
|
||||
}
|
||||
111
dao/video/cut_task_dao.go
Normal file
111
dao/video/cut_task_dao.go
Normal file
@@ -0,0 +1,111 @@
|
||||
package video
|
||||
|
||||
import (
|
||||
"context"
|
||||
"time"
|
||||
|
||||
dto "media/model/dto/video"
|
||||
entity "media/model/entity/video"
|
||||
|
||||
"gitea.com/red-future/common/db/gfdb"
|
||||
"github.com/gogf/gf/v2/frame/g"
|
||||
"github.com/gogf/gf/v2/util/gconv"
|
||||
)
|
||||
|
||||
var CutTask = new(cutTaskDao)
|
||||
|
||||
type cutTaskDao struct{}
|
||||
|
||||
const cutTaskTable = "cut_task"
|
||||
|
||||
// Insert 创建任务(排除 id 字段,让数据库自增)
|
||||
func (d *cutTaskDao) Insert(ctx context.Context, data *entity.CutTask) (id int64, err error) {
|
||||
r, err := gfdb.DB(ctx).Model(ctx, cutTaskTable).
|
||||
Data(data).
|
||||
FieldsEx(entity.CutTaskCols.Id).
|
||||
Insert()
|
||||
if err != nil {
|
||||
return 0, err
|
||||
}
|
||||
return r.LastInsertId()
|
||||
}
|
||||
|
||||
// GetByTaskID 根据taskId查询任务
|
||||
func (d *cutTaskDao) GetByTaskID(ctx context.Context, taskID string) (res *entity.CutTask, err error) {
|
||||
r, err := gfdb.DB(ctx).Model(ctx, cutTaskTable).
|
||||
Where(entity.CutTaskCols.TaskID, taskID).
|
||||
One()
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
if r == nil {
|
||||
return nil, nil
|
||||
}
|
||||
err = r.Struct(&res)
|
||||
return
|
||||
}
|
||||
|
||||
// UpdateRunning 更新为运行中
|
||||
func (d *cutTaskDao) UpdateRunning(ctx context.Context, taskID string) error {
|
||||
_, err := gfdb.DB(ctx).Model(ctx, cutTaskTable).
|
||||
Data(g.Map{
|
||||
entity.CutTaskCols.Status: "running",
|
||||
}).
|
||||
Where(entity.CutTaskCols.TaskID, taskID).
|
||||
Update()
|
||||
return err
|
||||
}
|
||||
|
||||
// UpdateSuccess 更新为成功
|
||||
func (d *cutTaskDao) UpdateSuccess(ctx context.Context, taskID string, fileURL string, fileSize int64, fileName, fileFormat, fileAddrPrefix, durationStr string) error {
|
||||
_, err := gfdb.DB(ctx).Model(ctx, cutTaskTable).
|
||||
Data(g.Map{
|
||||
entity.CutTaskCols.Status: "success",
|
||||
entity.CutTaskCols.FileURL: fileURL,
|
||||
entity.CutTaskCols.FileSize: fileSize,
|
||||
entity.CutTaskCols.FileName: fileName,
|
||||
entity.CutTaskCols.FileFormat: fileFormat,
|
||||
entity.CutTaskCols.FileAddressPrefix: fileAddrPrefix,
|
||||
entity.CutTaskCols.DurationStr: durationStr,
|
||||
entity.CutTaskCols.ErrorMessage: "",
|
||||
}).
|
||||
Where(entity.CutTaskCols.TaskID, taskID).
|
||||
Update()
|
||||
return err
|
||||
}
|
||||
|
||||
// UpdateError 更新为失败
|
||||
func (d *cutTaskDao) UpdateError(ctx context.Context, taskID string, errMsg string) error {
|
||||
_, err := gfdb.DB(ctx).Model(ctx, cutTaskTable).
|
||||
Data(g.Map{
|
||||
entity.CutTaskCols.Status: "failed",
|
||||
entity.CutTaskCols.ErrorMessage: errMsg,
|
||||
}).
|
||||
Where(entity.CutTaskCols.TaskID, taskID).
|
||||
Update()
|
||||
return err
|
||||
}
|
||||
|
||||
// EntityToTaskRes 实体转DTO
|
||||
func CutEntityToTaskRes(e *entity.CutTask) *dto.GetCutTaskRes {
|
||||
res := &dto.GetCutTaskRes{
|
||||
TaskID: e.TaskID,
|
||||
Status: e.Status,
|
||||
CreatedAt: gconv.Int64(e.CreatedAt.Timestamp()),
|
||||
}
|
||||
if e.CreatedAt == nil {
|
||||
res.CreatedAt = time.Now().UnixMilli()
|
||||
}
|
||||
if e.Status == "success" {
|
||||
res.FileURL = e.FileURL
|
||||
res.FileSize = e.FileSize
|
||||
res.FileName = e.FileName
|
||||
res.FileFormat = e.FileFormat
|
||||
res.FileAddressPrefix = e.FileAddressPrefix
|
||||
res.DurationStr = e.DurationStr
|
||||
}
|
||||
if e.Status == "failed" {
|
||||
res.ErrorMessage = e.ErrorMessage
|
||||
}
|
||||
return res
|
||||
}
|
||||
1
main.go
1
main.go
@@ -18,6 +18,7 @@ func main() {
|
||||
http.RouteRegister([]interface{}{
|
||||
controllerAudio.AudioExtract,
|
||||
controllerVideo.Concat,
|
||||
controllerVideo.Cut,
|
||||
})
|
||||
select {}
|
||||
}
|
||||
|
||||
@@ -93,16 +93,10 @@ type ListTaskRes struct {
|
||||
|
||||
// ---------- 回调通知结构 ----------
|
||||
|
||||
// CallbackPayload 回调通知内容
|
||||
// CallbackPayload 回调通知内容(与 GetTaskRes 出参一致)
|
||||
type CallbackPayload struct {
|
||||
TaskID string `json:"taskId" dc:"任务ID"`
|
||||
Status string `json:"status" dc:"任务状态"`
|
||||
TotalFiles int `json:"totalFiles" dc:"文件总数"`
|
||||
SuccessFiles int `json:"successFiles" dc:"成功文件数"`
|
||||
FailFiles int `json:"failFiles" dc:"失败文件数"`
|
||||
Result string `json:"result,omitempty" dc:"完整的处理结果JSON"`
|
||||
ErrorMessage string `json:"errorMessage,omitempty" dc:"错误信息"`
|
||||
DetailList []TranscribeTaskDetailItem `json:"detailList" dc:"明细列表"`
|
||||
TaskInfo TranscribeTaskItem `json:"taskInfo" dc:"任务信息"`
|
||||
DetailList []TranscribeTaskDetailItem `json:"detailList" dc:"明细列表(每视频一条)"`
|
||||
}
|
||||
|
||||
// ---------- 任务处理结果结构(用于result JSONB) ----------
|
||||
|
||||
78
model/dto/video/cut_dto.go
Normal file
78
model/dto/video/cut_dto.go
Normal file
@@ -0,0 +1,78 @@
|
||||
package video
|
||||
|
||||
import "github.com/gogf/gf/v2/frame/g"
|
||||
|
||||
// CutScene 单个分镜场景
|
||||
// 同时兼容两种格式:
|
||||
// 1. 完整格式: {"sceneIndex":1,"startTimeStr":"00:00:00.000","endTimeStr":"00:00:03.115", ...}
|
||||
// 2. 简化格式: {"start": 1.5, "end": 5.0}
|
||||
type CutScene struct {
|
||||
SceneIndex int `json:"sceneIndex" dc:"场景序号"`
|
||||
StartTimeStr string `json:"startTimeStr" dc:"起始时间 HH:MM:SS.mmm"`
|
||||
EndTimeStr string `json:"endTimeStr" dc:"结束时间 HH:MM:SS.mmm"`
|
||||
Start float64 `json:"start" dc:"起始时间(秒)"`
|
||||
End float64 `json:"end" dc:"结束时间(秒)"`
|
||||
DurationStr string `json:"durationStr" dc:"时长"`
|
||||
ShotType string `json:"shotType" dc:"镜头类型"`
|
||||
Composition string `json:"composition" dc:"构图"`
|
||||
NarrativePos string `json:"narrativePos" dc:"叙事位置"`
|
||||
Description string `json:"description" dc:"描述"`
|
||||
}
|
||||
|
||||
// CutReq 视频分镜剪切请求(URL 方式)
|
||||
type CutReq struct {
|
||||
g.Meta `path:"/video/cut" method:"post" tags:"视频剪切" summary:"视频分镜剪切(URL模式)" dc:"根据分镜JSON从视频中剪切多个片段并拼接输出"`
|
||||
VideoURL string `json:"video_url" v:"required#视频URL不能为空" dc:"原始视频URL"`
|
||||
Scenes []CutScene `json:"scenes" v:"required#分镜片段不能为空" dc:"分镜片段数组"`
|
||||
TotalScenes int `json:"totalScenes" dc:"总分镜数"`
|
||||
DurationStr string `json:"durationStr" dc:"总时长"`
|
||||
Upload bool `json:"upload" dc:"是否上传到MinIO" d:"false"`
|
||||
}
|
||||
|
||||
// CutAsyncReq 视频分镜剪切-异步请求(URL方式)
|
||||
type CutAsyncReq struct {
|
||||
g.Meta `path:"/video/cut/async" method:"post" tags:"视频剪切" summary:"视频分镜剪切-异步(URL模式)" dc:"异步根据分镜JSON剪切视频,立即返回taskId,完成后通过callback_url通知结果"`
|
||||
VideoURL string `json:"video_url" v:"required#视频URL不能为空" dc:"原始视频URL"`
|
||||
Scenes []CutScene `json:"scenes" v:"required#分镜片段不能为空" dc:"分镜片段数组"`
|
||||
TotalScenes int `json:"totalScenes" dc:"总分镜数"`
|
||||
DurationStr string `json:"durationStr" dc:"总时长"`
|
||||
Upload bool `json:"upload" dc:"是否上传到MinIO" d:"false"`
|
||||
CallbackURL string `json:"callback_url" v:"required#回调地址不能为空" dc:"回调地址,剪切完成后POST结果到该地址"`
|
||||
}
|
||||
|
||||
// CutRes 视频分镜剪切响应
|
||||
type CutRes struct {
|
||||
OutputPath string `json:"outputPath" dc:"输出文件路径"`
|
||||
FileSize int64 `json:"fileSize" dc:"文件大小(字节)"`
|
||||
Duration float64 `json:"duration" dc:"总时长(秒)"`
|
||||
DurationStr string `json:"durationStr" dc:"可读时长"`
|
||||
ShotsCount int `json:"shotsCount" dc:"输出片段数"`
|
||||
FileURL string `json:"fileURL" dc:"MinIO访问地址(上传后返回)"`
|
||||
}
|
||||
|
||||
// ---------- 异步剪切任务 ----------
|
||||
|
||||
// CreateCutTaskRes 创建异步剪切任务响应
|
||||
type CreateCutTaskRes struct {
|
||||
TaskID string `json:"taskId" dc:"任务ID"`
|
||||
}
|
||||
|
||||
// GetCutTaskReq 查询异步剪切任务请求
|
||||
type GetCutTaskReq struct {
|
||||
g.Meta `path:"/video/cut/task/{taskId}" method:"get" tags:"视频剪切" summary:"查询剪切任务结果" dc:"根据taskId查询异步剪切任务的结果"`
|
||||
TaskID string `json:"taskId" dc:"任务ID"`
|
||||
}
|
||||
|
||||
// GetCutTaskRes 查询异步剪切任务响应
|
||||
type GetCutTaskRes struct {
|
||||
TaskID string `json:"taskId" dc:"任务ID"`
|
||||
Status string `json:"status" dc:"状态: pending/running/success/failed"`
|
||||
FileURL string `json:"fileURL,omitempty" dc:"MinIO文件访问路径"`
|
||||
FileSize int64 `json:"fileSize,omitempty" dc:"文件大小(字节)"`
|
||||
FileName string `json:"fileName,omitempty" dc:"文件名"`
|
||||
FileFormat string `json:"fileFormat,omitempty" dc:"文件格式"`
|
||||
FileAddressPrefix string `json:"fileAddressPrefix,omitempty" dc:"MinIO地址前缀"`
|
||||
DurationStr string `json:"durationStr,omitempty" dc:"剪切后时长"`
|
||||
ErrorMessage string `json:"errorMessage,omitempty" dc:"错误信息"`
|
||||
CreatedAt int64 `json:"createdAt" dc:"创建时间戳"`
|
||||
}
|
||||
@@ -10,6 +10,15 @@ type ConcatReq struct {
|
||||
Upload bool `json:"upload" dc:"是否上传到MinIO" d:"false"`
|
||||
}
|
||||
|
||||
// ConcatAsyncReq 视频拼接-异步请求(URL模式)
|
||||
type ConcatAsyncReq struct {
|
||||
g.Meta `path:"/concat/async" method:"post" tags:"视频拼接" summary:"视频拼接-异步(URL模式)" dc:"异步拼接视频,立即返回taskId,完成后通过callback_url通知结果"`
|
||||
VideoURLs []string `json:"video_urls" v:"required#视频URL列表不能为空" dc:"视频URL列表(按此顺序拼接)"`
|
||||
Method string `json:"method" dc:"拼接方式(auto/fast/reencode)" d:"auto"`
|
||||
Upload bool `json:"upload" dc:"是否上传到MinIO" d:"false"`
|
||||
CallbackURL string `json:"callback_url" v:"required#回调地址不能为空" dc:"回调地址,拼接完成后POST结果到该地址"`
|
||||
}
|
||||
|
||||
// ConcatUploadReq 视频拼接请求(文件上传模式)
|
||||
type ConcatUploadReq struct {
|
||||
g.Meta `path:"/concat/upload" method:"post" tags:"视频拼接" summary:"视频拼接(文件上传)" dc:"上传视频文件并拼接(至少2个视频)"`
|
||||
@@ -17,6 +26,14 @@ type ConcatUploadReq struct {
|
||||
Upload bool `json:"upload" dc:"是否上传到MinIO" d:"false"`
|
||||
}
|
||||
|
||||
// ConcatUploadAsyncReq 视频拼接-异步请求(文件上传模式)
|
||||
type ConcatUploadAsyncReq struct {
|
||||
g.Meta `path:"/concat/upload/async" method:"post" tags:"视频拼接" summary:"视频拼接-异步(文件上传)" dc:"异步拼接上传的视频,立即返回taskId,完成后通过callback_url通知结果"`
|
||||
Method string `json:"method" dc:"拼接方式(auto/fast/reencode)" d:"auto"`
|
||||
Upload bool `json:"upload" dc:"是否上传到MinIO" d:"false"`
|
||||
CallbackURL string `json:"callback_url" v:"required#回调地址不能为空" dc:"回调地址,拼接完成后POST结果到该地址"`
|
||||
}
|
||||
|
||||
// ConcatRes 视频拼接响应
|
||||
type ConcatRes struct {
|
||||
OutputPath string `json:"outputPath" dc:"输出文件路径"`
|
||||
@@ -28,6 +45,36 @@ type ConcatRes struct {
|
||||
FileURL string `json:"fileURL" dc:"MinIO访问地址(上传后返回)"`
|
||||
}
|
||||
|
||||
// ---------- 异步拼接任务 ----------
|
||||
|
||||
// CreateConcatTaskRes 创建异步拼接任务响应
|
||||
type CreateConcatTaskRes struct {
|
||||
TaskID string `json:"taskId" dc:"任务ID"`
|
||||
}
|
||||
|
||||
// GetConcatTaskReq 查询异步拼接任务请求
|
||||
type GetConcatTaskReq struct {
|
||||
g.Meta `path:"/concat/task/{taskId}" method:"get" tags:"视频拼接" summary:"查询拼接任务结果" dc:"根据taskId查询异步拼接任务的结果"`
|
||||
TaskID string `json:"taskId" dc:"任务ID"`
|
||||
}
|
||||
|
||||
// GetConcatTaskRes 查询异步拼接任务响应
|
||||
type GetConcatTaskRes struct {
|
||||
TaskID string `json:"taskId" dc:"任务ID"`
|
||||
Status string `json:"status" dc:"状态: pending/running/success/failed"`
|
||||
FileURL string `json:"fileURL,omitempty" dc:"MinIO文件访问路径"`
|
||||
FileSize int64 `json:"fileSize,omitempty" dc:"文件大小(字节)"`
|
||||
FileName string `json:"fileName,omitempty" dc:"文件名"`
|
||||
FileFormat string `json:"fileFormat,omitempty" dc:"文件格式"`
|
||||
FileAddressPrefix string `json:"fileAddressPrefix,omitempty" dc:"MinIO地址前缀"`
|
||||
MethodUsed string `json:"methodUsed,omitempty" dc:"实际使用的拼接方式"`
|
||||
DurationStr string `json:"durationStr,omitempty" dc:"拼接后时长"`
|
||||
ErrorMessage string `json:"errorMessage,omitempty" dc:"错误信息"`
|
||||
CreatedAt int64 `json:"createdAt" dc:"创建时间戳"`
|
||||
}
|
||||
|
||||
// ---------- 上传工具 ----------
|
||||
|
||||
// UploadFileBytesReq 上传文件请求(字节流)
|
||||
type UploadFileBytesReq struct {
|
||||
FileName string `json:"fileName" dc:"文件名"`
|
||||
|
||||
48
model/entity/video/concat_task.go
Normal file
48
model/entity/video/concat_task.go
Normal file
@@ -0,0 +1,48 @@
|
||||
package video
|
||||
|
||||
import "gitea.com/red-future/common/beans"
|
||||
|
||||
// ConcatTask 视频拼接异步任务实体
|
||||
type ConcatTask struct {
|
||||
beans.SQLBaseDO `orm:",inherit"`
|
||||
TaskID string `orm:"task_id" json:"taskId" description:"任务唯一标识"`
|
||||
Status string `orm:"status" json:"status" description:"任务状态:pending/running/success/failed"`
|
||||
FileURL string `orm:"file_url" json:"fileUrl" description:"MinIO文件访问路径"`
|
||||
FileSize int64 `orm:"file_size" json:"fileSize" description:"文件大小(字节)"`
|
||||
FileName string `orm:"file_name" json:"fileName" description:"文件名"`
|
||||
FileFormat string `orm:"file_format" json:"fileFormat" description:"文件格式"`
|
||||
FileAddressPrefix string `orm:"file_address_prefix" json:"fileAddressPrefix" description:"MinIO地址前缀"`
|
||||
MethodUsed string `orm:"method_used" json:"methodUsed" description:"实际使用的拼接方式"`
|
||||
DurationStr string `orm:"duration_str" json:"durationStr" description:"拼接后时长"`
|
||||
ErrorMessage string `orm:"error_message" json:"errorMessage" description:"错误信息"`
|
||||
}
|
||||
|
||||
// ConcatTaskCol 字段定义
|
||||
type ConcatTaskCol struct {
|
||||
beans.SQLBaseCol
|
||||
TaskID string
|
||||
Status string
|
||||
FileURL string
|
||||
FileSize string
|
||||
FileName string
|
||||
FileFormat string
|
||||
FileAddressPrefix string
|
||||
MethodUsed string
|
||||
DurationStr string
|
||||
ErrorMessage string
|
||||
}
|
||||
|
||||
// ConcatTaskCols 字段常量
|
||||
var ConcatTaskCols = ConcatTaskCol{
|
||||
SQLBaseCol: beans.DefSQLBaseCol,
|
||||
TaskID: "task_id",
|
||||
Status: "status",
|
||||
FileURL: "file_url",
|
||||
FileSize: "file_size",
|
||||
FileName: "file_name",
|
||||
FileFormat: "file_format",
|
||||
FileAddressPrefix: "file_address_prefix",
|
||||
MethodUsed: "method_used",
|
||||
DurationStr: "duration_str",
|
||||
ErrorMessage: "error_message",
|
||||
}
|
||||
54
model/entity/video/cut_task.go
Normal file
54
model/entity/video/cut_task.go
Normal file
@@ -0,0 +1,54 @@
|
||||
package video
|
||||
|
||||
import "gitea.com/red-future/common/beans"
|
||||
|
||||
// CutTask 视频分镜剪切异步任务实体
|
||||
type CutTask struct {
|
||||
beans.SQLBaseDO `orm:",inherit"`
|
||||
TaskID string `orm:"task_id" json:"taskId" description:"任务唯一标识"`
|
||||
VideoURL string `orm:"video_url" json:"videoUrl" description:"原始视频URL"`
|
||||
ShotsJSON string `orm:"shots_json" json:"shotsJson" description:"分镜JSON"`
|
||||
Status string `orm:"status" json:"status" description:"任务状态:pending/running/success/failed"`
|
||||
FileURL string `orm:"file_url" json:"fileUrl" description:"MinIO文件访问路径"`
|
||||
FileSize int64 `orm:"file_size" json:"fileSize" description:"文件大小(字节)"`
|
||||
FileName string `orm:"file_name" json:"fileName" description:"文件名"`
|
||||
FileFormat string `orm:"file_format" json:"fileFormat" description:"文件格式"`
|
||||
FileAddressPrefix string `orm:"file_address_prefix" json:"fileAddressPrefix" description:"MinIO地址前缀"`
|
||||
DurationStr string `orm:"duration_str" json:"durationStr" description:"剪切后时长"`
|
||||
ErrorMessage string `orm:"error_message" json:"errorMessage" description:"错误信息"`
|
||||
CallbackURL string `orm:"callback_url" json:"callbackUrl" description:"回调地址"`
|
||||
}
|
||||
|
||||
// CutTaskCol 字段定义
|
||||
type CutTaskCol struct {
|
||||
beans.SQLBaseCol
|
||||
TaskID string
|
||||
VideoURL string
|
||||
ShotsJSON string
|
||||
Status string
|
||||
FileURL string
|
||||
FileSize string
|
||||
FileName string
|
||||
FileFormat string
|
||||
FileAddressPrefix string
|
||||
DurationStr string
|
||||
ErrorMessage string
|
||||
CallbackURL string
|
||||
}
|
||||
|
||||
// CutTaskCols 字段常量
|
||||
var CutTaskCols = CutTaskCol{
|
||||
SQLBaseCol: beans.DefSQLBaseCol,
|
||||
TaskID: "task_id",
|
||||
VideoURL: "video_url",
|
||||
ShotsJSON: "shots_json",
|
||||
Status: "status",
|
||||
FileURL: "file_url",
|
||||
FileSize: "file_size",
|
||||
FileName: "file_name",
|
||||
FileFormat: "file_format",
|
||||
FileAddressPrefix: "file_address_prefix",
|
||||
DurationStr: "duration_str",
|
||||
ErrorMessage: "error_message",
|
||||
CallbackURL: "callback_url",
|
||||
}
|
||||
@@ -19,6 +19,7 @@ import (
|
||||
serviceScene "media/service/scene"
|
||||
|
||||
"gitea.com/red-future/common/beans"
|
||||
"gitea.com/red-future/common/utils"
|
||||
"github.com/gogf/gf/v2/frame/g"
|
||||
"github.com/gogf/gf/v2/util/gconv"
|
||||
"github.com/gogf/gf/v2/util/guid"
|
||||
@@ -79,16 +80,19 @@ func (s *audioTaskService) Create(ctx context.Context, params *CreateTaskParams)
|
||||
g.Log().Infof(ctx, "[创建任务 %s] 文件数=%d, 模型=%s, 语言=%s, 回调=%s",
|
||||
taskID, len(params.InputData), params.Model, params.Language, params.CallbackURL)
|
||||
|
||||
// 提取调用方用户信息,传给 goroutine
|
||||
user := getUserFromCtx(ctx)
|
||||
|
||||
// 异步处理
|
||||
go s.processTask(taskID, params.InputData, params.Model, params.Language, params.Threshold, params.CallbackURL)
|
||||
go s.processTask(user, taskID, params.InputData, params.Model, params.Language, params.Threshold, params.CallbackURL)
|
||||
|
||||
return &dto.CreateTaskRes{TaskID: taskID}, nil
|
||||
}
|
||||
|
||||
// processTask 异步处理所有URL,每个文件生成一条明细
|
||||
func (s *audioTaskService) processTask(taskID string, urls []string, model, language string, threshold float64, callbackURL string) {
|
||||
func (s *audioTaskService) processTask(user *beans.User, taskID string, urls []string, model, language string, threshold float64, callbackURL string) {
|
||||
ctx := context.Background()
|
||||
ctx = context.WithValue(ctx, "user", &beans.User{UserName: "admin", TenantId: 1})
|
||||
ctx = context.WithValue(ctx, "user", user)
|
||||
|
||||
defer func() {
|
||||
if r := recover(); r != nil {
|
||||
@@ -196,7 +200,7 @@ func (s *audioTaskService) processTask(taskID string, urls []string, model, lang
|
||||
g.Log().Infof(ctx, "[任务 %s] 全部处理流程结束", taskID)
|
||||
}
|
||||
|
||||
// callback 向回调地址 POST 任务结果
|
||||
// callback 向回调地址 POST 任务结果(与查询接口 GetTaskRes 出参一致)
|
||||
func (s *audioTaskService) callback(ctx context.Context, taskID, status, errMsg, callbackURL string) {
|
||||
if callbackURL == "" {
|
||||
return
|
||||
@@ -214,27 +218,30 @@ func (s *audioTaskService) callback(ctx context.Context, taskID, status, errMsg,
|
||||
detailItems = append(detailItems, dao.DetailEntityToItem(&detailList[i]))
|
||||
}
|
||||
|
||||
// 构建与查询接口一致的 taskInfo
|
||||
taskInfo := dao.EntityToItem(task)
|
||||
|
||||
// 与查询接口一致:从 result 中补全 scenes 等字段
|
||||
detailItems = enrichDetailsFromResult(task.Result, detailItems)
|
||||
|
||||
payload := dto.CallbackPayload{
|
||||
TaskID: taskID,
|
||||
Status: status,
|
||||
TotalFiles: task.TotalFiles,
|
||||
SuccessFiles: task.SuccessFiles,
|
||||
FailFiles: task.FailFiles,
|
||||
ErrorMessage: errMsg,
|
||||
Result: task.Result,
|
||||
DetailList: detailItems,
|
||||
TaskInfo: taskInfo,
|
||||
DetailList: detailItems,
|
||||
}
|
||||
|
||||
body, _ := json.Marshal(payload)
|
||||
g.Log().Infof(ctx, "[回调 %s] 触发回调, 状态=%s, 成功=%d 失败=%d, 错误=%s, 目标=%s",
|
||||
taskID, status, payload.SuccessFiles, payload.FailFiles, errMsg, callbackURL)
|
||||
taskID, taskInfo.Status, taskInfo.SuccessFiles, taskInfo.FailFiles, errMsg, callbackURL)
|
||||
g.Log().Debugf(ctx, "[回调 %s] 回调载荷长度=%d字节, 明细条数=%d",
|
||||
taskID, len(body), len(detailItems))
|
||||
// 透传调用方的用户信息,供回调方 GetUserInfo 从 X-User-Info 头获取
|
||||
cbUser := getUserFromCtx(ctx)
|
||||
userJSON, _ := json.Marshal(cbUser)
|
||||
g.Log().Infof(ctx, "[回调 %s] curl -X POST '%s' -H 'Content-Type: application/json' -H 'X-User-Info: %s' -d '%s'",
|
||||
taskID, callbackURL, string(userJSON), strings.ReplaceAll(string(body), "'", "'\\''"))
|
||||
|
||||
req, _ := http.NewRequest("POST", callbackURL, bytes.NewReader(body))
|
||||
req.Header.Set("Content-Type", "application/json")
|
||||
// 透传调用方的用户信息,供回调方 GetUserInfo 从 X-User-Info 头获取
|
||||
userJSON, _ := json.Marshal(beans.User{UserName: "admin", TenantId: 1})
|
||||
req.Header.Set("X-User-Info", string(userJSON))
|
||||
|
||||
resp, reqErr := http.DefaultClient.Do(req)
|
||||
@@ -314,6 +321,21 @@ func (s *audioTaskService) processSingleVideo(ctx context.Context, taskID, saveP
|
||||
}
|
||||
}
|
||||
|
||||
// getUserFromCtx 从 context 提取用户信息,没有则返回默认 admin
|
||||
func getUserFromCtx(ctx context.Context) *beans.User {
|
||||
if u := ctx.Value("user"); u != nil {
|
||||
if user, ok := u.(*beans.User); ok {
|
||||
return user
|
||||
}
|
||||
}
|
||||
// 尝试用 common 库解析
|
||||
user, err := utils.GetUserInfo(ctx)
|
||||
if err == nil && user != nil {
|
||||
return user
|
||||
}
|
||||
return &beans.User{UserName: "admin", TenantId: 1}
|
||||
}
|
||||
|
||||
// saveDetail 保存单文件明细到 transcribe_task_detail
|
||||
func (s *audioTaskService) saveDetail(ctx context.Context, taskID string, fileIndex int, fileName, text, scenes string, audioSize int64, audioDuration, model, language, errMsg string) {
|
||||
detail := &entity.TranscribeTaskDetail{
|
||||
|
||||
@@ -7,15 +7,24 @@ import (
|
||||
"fmt"
|
||||
"io"
|
||||
"mime/multipart"
|
||||
"net/http"
|
||||
"net/url"
|
||||
"os"
|
||||
"os/exec"
|
||||
"path/filepath"
|
||||
"strings"
|
||||
"time"
|
||||
|
||||
dao "media/dao/video"
|
||||
dto "media/model/dto/video"
|
||||
entity "media/model/entity/video"
|
||||
|
||||
"gitea.com/red-future/common/beans"
|
||||
commonHttp "gitea.com/red-future/common/http"
|
||||
|
||||
"github.com/gogf/gf/v2/frame/g"
|
||||
"github.com/gogf/gf/v2/os/glog"
|
||||
"github.com/gogf/gf/v2/util/guid"
|
||||
)
|
||||
|
||||
type concatService struct{}
|
||||
@@ -44,6 +53,9 @@ type ConcatRes struct {
|
||||
|
||||
// Concat 拼接多个视频为一个
|
||||
func (s *concatService) Concat(ctx context.Context, req *ConcatReq) (res *ConcatRes, err error) {
|
||||
g.Log().Infof(ctx, "[Concat] 服务层收到请求: videoPaths=%v, method=%s, upload=%v",
|
||||
req.VideoPaths, req.Method, req.Upload)
|
||||
|
||||
if len(req.VideoPaths) < 2 {
|
||||
return nil, fmt.Errorf("至少需要2个视频才能拼接")
|
||||
}
|
||||
@@ -64,7 +76,16 @@ func (s *concatService) Concat(ctx context.Context, req *ConcatReq) (res *Concat
|
||||
outputPath := req.OutputPath
|
||||
if outputPath == "" {
|
||||
outputDir := filepath.Dir(req.VideoPaths[0])
|
||||
outputPath = filepath.Join(outputDir, "concat_output.mp4")
|
||||
// 用第一个输入文件名 + 拼接数 + 时间戳,溯源更清晰
|
||||
baseName := filepath.Base(req.VideoPaths[0])
|
||||
ext := filepath.Ext(baseName)
|
||||
stem := strings.TrimSuffix(baseName, ext)
|
||||
stemRunes := []rune(stem)
|
||||
if len(stemRunes) > 20 {
|
||||
stemRunes = stemRunes[:20]
|
||||
}
|
||||
outputPath = filepath.Join(outputDir,
|
||||
fmt.Sprintf("concat_%s_x%d_%s%s", string(stemRunes), len(req.VideoPaths), time.Now().Format("150405"), ext))
|
||||
}
|
||||
|
||||
method := req.Method
|
||||
@@ -107,9 +128,10 @@ func (s *concatService) Concat(ctx context.Context, req *ConcatReq) (res *Concat
|
||||
InputFiles: len(req.VideoPaths),
|
||||
}
|
||||
|
||||
// 如果需要上传到 MinIO
|
||||
// 如果需要上传到 MinIO(用独立 context,避免 HTTP 断开后 ctx 被取消)
|
||||
if req.Upload {
|
||||
uploadRes, uploadErr := s.UploadToMinIO(ctx, outputPath)
|
||||
uploadCtx := context.Background()
|
||||
uploadRes, uploadErr := s.UploadToMinIO(uploadCtx, outputPath)
|
||||
if uploadErr != nil {
|
||||
return nil, fmt.Errorf("上传到MinIO失败: %v", uploadErr)
|
||||
}
|
||||
@@ -141,7 +163,9 @@ func (s *concatService) concatByDemuxer(ctx context.Context, ffmpegPath string,
|
||||
output,
|
||||
}
|
||||
|
||||
cmd := exec.CommandContext(ctx, ffmpegPath, args...)
|
||||
// 使用独立 context,避免 HTTP 请求超时导致 ffmpeg 被 SIGKILL
|
||||
bgCtx := context.Background()
|
||||
cmd := exec.CommandContext(bgCtx, ffmpegPath, args...)
|
||||
outputBytes, err := cmd.CombinedOutput()
|
||||
if err != nil {
|
||||
return fmt.Errorf("ffmpeg demuxer 失败: %v\n%s", err, string(outputBytes))
|
||||
@@ -180,22 +204,40 @@ func (s *concatService) concatByFilter(ctx context.Context, ffmpegPath string, i
|
||||
inputArgs = append(inputArgs, "-i", p)
|
||||
}
|
||||
|
||||
// 3. 构建 filter_complex:每个视频 scale+pad 到统一尺寸,然后 concat
|
||||
// 3. 检测每个视频是否有音频轨道及时长
|
||||
hasAudio := make([]bool, n)
|
||||
videoDuration := make([]float64, n)
|
||||
for i, p := range inputs {
|
||||
hasAudio[i] = s.hasVideoAudio(ctx, ffmpegPath, p)
|
||||
videoDuration[i], _ = s.getVideoDuration(ctx, ffmpegPath, p)
|
||||
}
|
||||
|
||||
// 4. 构建 filter_complex:每个视频 scale+pad 到统一尺寸,然后 concat
|
||||
var filterParts []string
|
||||
var concatInputs []string
|
||||
for i := 0; i < n; i++ {
|
||||
filterParts = append(filterParts, fmt.Sprintf(
|
||||
"[%d:v]scale=%d:%d:force_original_aspect_ratio=decrease,pad=%d:%d:(ow-iw)/2:(oh-ih)/2,setsar=1,fps=30[v%d]",
|
||||
i, maxW, maxH, maxW, maxH, i,
|
||||
))
|
||||
filterParts = append(filterParts, fmt.Sprintf(
|
||||
"[%d:a]aresample=44100[a%d]",
|
||||
i, i,
|
||||
))
|
||||
}
|
||||
// 收集归一化后的流
|
||||
var concatInputs []string
|
||||
for i := 0; i < n; i++ {
|
||||
concatInputs = append(concatInputs, fmt.Sprintf("[v%d][a%d]", i, i))
|
||||
if hasAudio[i] {
|
||||
filterParts = append(filterParts, fmt.Sprintf(
|
||||
"[%d:a]aresample=44100[a%d]",
|
||||
i, i,
|
||||
))
|
||||
concatInputs = append(concatInputs, fmt.Sprintf("[v%d][a%d]", i, i))
|
||||
} else {
|
||||
// 无音频轨道,生成匹配视频时长的静音音频
|
||||
dur := videoDuration[i]
|
||||
if dur <= 0 {
|
||||
dur = 30 // 保底30秒
|
||||
}
|
||||
filterParts = append(filterParts, fmt.Sprintf(
|
||||
"aevalsrc=0:n=2:s=44100:d=%.2f[a%d]",
|
||||
dur, i,
|
||||
))
|
||||
concatInputs = append(concatInputs, fmt.Sprintf("[v%d][a%d]", i, i))
|
||||
}
|
||||
}
|
||||
filterStr := fmt.Sprintf("%s;%sconcat=n=%d:v=1:a=1[outv][outa]",
|
||||
strings.Join(filterParts, ";"),
|
||||
@@ -206,8 +248,10 @@ func (s *concatService) concatByFilter(ctx context.Context, ffmpegPath string, i
|
||||
"-filter_complex", filterStr,
|
||||
"-map", "[outv]",
|
||||
"-map", "[outa]",
|
||||
"-preset", "fast",
|
||||
"-crf", "23",
|
||||
"-c:v", "h264_videotoolbox",
|
||||
"-b:v", "5M",
|
||||
"-allow_sw", "true",
|
||||
"-c:a", "aac",
|
||||
"-y",
|
||||
output,
|
||||
)
|
||||
@@ -220,7 +264,9 @@ func (s *concatService) concatByFilter(ctx context.Context, ffmpegPath string, i
|
||||
os.WriteFile(filterFile, []byte(filterStr), 0644)
|
||||
defer os.Remove(filterFile)
|
||||
|
||||
cmd := exec.CommandContext(ctx, ffmpegPath, args...)
|
||||
// 使用独立 context,避免 HTTP 请求超时导致 ffmpeg 被 SIGKILL
|
||||
bgCtx := context.Background()
|
||||
cmd := exec.CommandContext(bgCtx, ffmpegPath, args...)
|
||||
outputBytes, err := cmd.CombinedOutput()
|
||||
if err != nil {
|
||||
return fmt.Errorf("ffmpeg filter 失败: %v\n日志:\n%s", err, string(outputBytes))
|
||||
@@ -274,6 +320,28 @@ func (s *concatService) getVideoDuration(ctx context.Context, ffmpegPath, videoP
|
||||
return duration, nil
|
||||
}
|
||||
|
||||
// hasVideoAudio 检测视频文件是否有音频轨道
|
||||
func (s *concatService) hasVideoAudio(ctx context.Context, ffmpegPath, videoPath string) bool {
|
||||
ffprobePath := filepath.Join(filepath.Dir(ffmpegPath), "ffprobe")
|
||||
if _, err := os.Stat(ffprobePath); os.IsNotExist(err) {
|
||||
ffprobePath = "ffprobe"
|
||||
}
|
||||
|
||||
cmd := exec.CommandContext(ctx, ffprobePath,
|
||||
"-v", "error",
|
||||
"-select_streams", "a:0",
|
||||
"-show_entries", "stream=codec_type",
|
||||
"-of", "default=noprint_wrappers=1:nokey=1",
|
||||
videoPath,
|
||||
)
|
||||
output, err := cmd.Output()
|
||||
if err != nil || len(strings.TrimSpace(string(output))) == 0 {
|
||||
return false
|
||||
}
|
||||
// 检测视频时长,如果为0则用 aevalsrc 生成静音
|
||||
return true
|
||||
}
|
||||
|
||||
func (s *concatService) getFFmpegPath() (string, error) {
|
||||
ffmpegPath := g.Cfg().MustGet(context.Background(), "ffmpeg.path", "").String()
|
||||
if ffmpegPath != "" {
|
||||
@@ -304,9 +372,9 @@ type uploadFileRes struct {
|
||||
FileAddressPrefix string `json:"fileAddressPrefix"`
|
||||
}
|
||||
|
||||
// UploadToMinIO 通过 OSS 微服务的 multipart 文件上传接口上传到 MinIO
|
||||
// UploadToMinIO 通过 OSS 微服务的 uploadFile 接口上传到 MinIO(multipart/form-data)
|
||||
func (s *concatService) UploadToMinIO(ctx context.Context, localFilePath string) (*uploadFileRes, error) {
|
||||
// 手动构建 multipart/form-data 表单
|
||||
// 构建 multipart/form-data 表单
|
||||
var buf bytes.Buffer
|
||||
mw := multipart.NewWriter(&buf)
|
||||
|
||||
@@ -325,25 +393,40 @@ func (s *concatService) UploadToMinIO(ctx context.Context, localFilePath string)
|
||||
}
|
||||
mw.Close()
|
||||
|
||||
// 使用 commonHttp 的客户端(含 Consul 服务发现),大文件上传设置长超时
|
||||
client := commonHttp.Httpclient.Clone()
|
||||
// 必须单独设置 Transport.ResponseHeaderTimeout,SetTimeout 只设 Client.Timeout
|
||||
newTransport := http.DefaultTransport.(*http.Transport).Clone()
|
||||
newTransport.ResponseHeaderTimeout = 5 * time.Minute
|
||||
client.Transport = newTransport
|
||||
client.SetTimeout(10 * time.Minute)
|
||||
|
||||
// 透传认证 headers
|
||||
// 透传认证 headers(优先从 HTTP 请求头取)
|
||||
hasAuthHeader := false
|
||||
if r := g.RequestFromCtx(ctx); r != nil {
|
||||
for k, v := range r.Header {
|
||||
client.SetHeader(k, v[0])
|
||||
if strings.EqualFold(k, "Authorization") || strings.EqualFold(k, "X-User-Info") {
|
||||
hasAuthHeader = true
|
||||
}
|
||||
}
|
||||
}
|
||||
// 无 HTTP 请求时(异步 goroutine),从 context 的用户信息构造 header
|
||||
if !hasAuthHeader {
|
||||
uploadUser := getUserFromCtx(ctx)
|
||||
userJSON, _ := json.Marshal(uploadUser)
|
||||
client.SetHeader("X-User-Info", string(userJSON))
|
||||
}
|
||||
|
||||
// 设置 multipart Content-Type(含 boundary)
|
||||
contentType := mw.FormDataContentType()
|
||||
g.Log().Debugf(ctx, "[UploadToMinIO] Content-Type: %s", contentType)
|
||||
client.SetHeader("Content-Type", contentType)
|
||||
|
||||
// 打印请求信息
|
||||
postBytes := buf.Bytes()
|
||||
g.Log().Debugf(ctx, "[UploadToMinIO] 请求URL: oss/file/uploadFile, 文件: %s, Body大小: %d bytes, Boundary: %s",
|
||||
localFilePath, len(postBytes), mw.Boundary())
|
||||
g.Log().Debugf(ctx, "[UploadToMinIO] 请求URL: oss/file/uploadFile, 文件: %s, Body大小: %d bytes",
|
||||
localFilePath, buf.Len())
|
||||
|
||||
response, err := client.Post(ctx, "oss/file/uploadFile", postBytes)
|
||||
// 发送 multipart 请求(原始字节流)
|
||||
response, err := client.Post(ctx, "oss/file/uploadFile", buf.Bytes())
|
||||
if err != nil {
|
||||
glog.Error(ctx, err)
|
||||
return nil, fmt.Errorf("调用OSS上传服务失败: %v", err)
|
||||
@@ -379,3 +462,281 @@ func CleanupConcat(paths []string) {
|
||||
os.Remove(p)
|
||||
}
|
||||
}
|
||||
|
||||
// ---------- 异步拼接任务管理 ----------
|
||||
|
||||
// CreateAsyncTask 创建异步拼接任务(URL模式),返回 taskId,后台处理
|
||||
func (s *concatService) CreateAsyncTask(ctx context.Context, videoURLs []string, method string, upload bool, callbackURL string) (string, error) {
|
||||
if len(videoURLs) < 2 {
|
||||
return "", fmt.Errorf("至少需要2个视频才能拼接")
|
||||
}
|
||||
|
||||
taskID := "concat_" + guid.S()
|
||||
task := &entity.ConcatTask{
|
||||
TaskID: taskID,
|
||||
Status: "pending",
|
||||
MethodUsed: method,
|
||||
}
|
||||
if _, err := dao.ConcatTask.Insert(ctx, task); err != nil {
|
||||
return "", fmt.Errorf("创建任务失败: %v", err)
|
||||
}
|
||||
|
||||
// 提取调用方用户信息,传给 goroutine
|
||||
user := getUserFromCtx(ctx)
|
||||
|
||||
g.Log().Infof(ctx, "[异步拼接] 创建任务 %s, 视频数=%d, 回调=%s", taskID, len(videoURLs), callbackURL)
|
||||
|
||||
// 异步处理:先下载再拼接
|
||||
go s.processAsyncTask(user, taskID, videoURLs, method, upload, callbackURL)
|
||||
|
||||
return taskID, nil
|
||||
}
|
||||
|
||||
// CreateAsyncTaskWithFiles 创建异步拼接任务(文件上传模式),直接处理本地文件
|
||||
func (s *concatService) CreateAsyncTaskWithFiles(ctx context.Context, filePaths []string, method string, upload bool, callbackURL string) (string, error) {
|
||||
if len(filePaths) < 2 {
|
||||
return "", fmt.Errorf("至少需要2个视频才能拼接")
|
||||
}
|
||||
|
||||
taskID := "concat_" + guid.S()
|
||||
task := &entity.ConcatTask{
|
||||
TaskID: taskID,
|
||||
Status: "pending",
|
||||
MethodUsed: method,
|
||||
}
|
||||
if _, err := dao.ConcatTask.Insert(ctx, task); err != nil {
|
||||
return "", fmt.Errorf("创建任务失败: %v", err)
|
||||
}
|
||||
|
||||
// 提取调用方用户信息,传给 goroutine
|
||||
user := getUserFromCtx(ctx)
|
||||
|
||||
g.Log().Infof(ctx, "[异步拼接-文件] 创建任务 %s, 文件数=%d, 回调=%s", taskID, len(filePaths), callbackURL)
|
||||
|
||||
// 异步处理:已有本地文件,直接拼接
|
||||
go s.processAsyncTaskWithFiles(user, taskID, filePaths, method, upload, callbackURL)
|
||||
|
||||
return taskID, nil
|
||||
}
|
||||
|
||||
// getUserFromCtx 从 context 中提取用户信息,如果没有则返回默认 admin
|
||||
func getUserFromCtx(ctx context.Context) *beans.User {
|
||||
if u := ctx.Value("user"); u != nil {
|
||||
if user, ok := u.(*beans.User); ok {
|
||||
return user
|
||||
}
|
||||
}
|
||||
return &beans.User{UserName: "admin", TenantId: 1}
|
||||
}
|
||||
|
||||
// GetTaskResult 查询异步任务结果
|
||||
func (s *concatService) GetTaskResult(ctx context.Context, taskID string) (*dto.GetConcatTaskRes, error) {
|
||||
task, err := dao.ConcatTask.GetByTaskID(ctx, taskID)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("查询任务失败: %v", err)
|
||||
}
|
||||
if task == nil {
|
||||
return nil, fmt.Errorf("任务不存在: %s", taskID)
|
||||
}
|
||||
|
||||
return dao.EntityToTaskRes(task), nil
|
||||
}
|
||||
|
||||
// processAsyncTaskWithFiles 后台处理异步拼接任务(文件上传模式,文件已在本地)
|
||||
func (s *concatService) processAsyncTaskWithFiles(user *beans.User, taskID string, filePaths []string, method string, upload bool, callbackURL string) {
|
||||
bgCtx := context.Background()
|
||||
bgCtx = context.WithValue(bgCtx, "user", user)
|
||||
|
||||
dao.ConcatTask.UpdateRunning(bgCtx, taskID)
|
||||
|
||||
defer func() {
|
||||
if r := recover(); r != nil {
|
||||
errMsg := fmt.Sprintf("异步拼接异常: %v", r)
|
||||
g.Log().Errorf(bgCtx, "[异步拼接 %s] %s", taskID, errMsg)
|
||||
dao.ConcatTask.UpdateError(bgCtx, taskID, errMsg)
|
||||
s.concatCallback(bgCtx, taskID, callbackURL)
|
||||
}
|
||||
}()
|
||||
|
||||
concatErr := s.executeConcat(bgCtx, taskID, filePaths, method, upload)
|
||||
if concatErr != nil {
|
||||
dao.ConcatTask.UpdateError(bgCtx, taskID, concatErr.Error())
|
||||
s.concatCallback(bgCtx, taskID, callbackURL)
|
||||
return
|
||||
}
|
||||
|
||||
g.Log().Infof(bgCtx, "[异步拼接 %s] 完成", taskID)
|
||||
|
||||
if callbackURL != "" {
|
||||
s.concatCallback(bgCtx, taskID, callbackURL)
|
||||
}
|
||||
}
|
||||
|
||||
// processAsyncTask 后台处理异步拼接任务(URL模式,需要先下载)
|
||||
func (s *concatService) processAsyncTask(user *beans.User, taskID string, videoURLs []string, method string, upload bool, callbackURL string) {
|
||||
bgCtx := context.Background()
|
||||
bgCtx = context.WithValue(bgCtx, "user", user)
|
||||
|
||||
dao.ConcatTask.UpdateRunning(bgCtx, taskID)
|
||||
|
||||
defer func() {
|
||||
if r := recover(); r != nil {
|
||||
errMsg := fmt.Sprintf("异步拼接异常: %v", r)
|
||||
g.Log().Errorf(bgCtx, "[异步拼接 %s] %s", taskID, errMsg)
|
||||
dao.ConcatTask.UpdateError(bgCtx, taskID, errMsg)
|
||||
s.concatCallback(bgCtx, taskID, callbackURL)
|
||||
}
|
||||
}()
|
||||
|
||||
// 下载视频
|
||||
var savePaths []string
|
||||
tempDir := g.Cfg().MustGet(bgCtx, "ffmpeg.temp_dir", "resource/temp").String()
|
||||
os.MkdirAll(tempDir, 0755)
|
||||
|
||||
for _, videoURL := range videoURLs {
|
||||
savePath, dlErr := downloadFile(bgCtx, videoURL, tempDir)
|
||||
if dlErr != nil {
|
||||
g.Log().Warningf(bgCtx, "[异步拼接 %s] 下载失败 %s: %v", taskID, videoURL, dlErr)
|
||||
continue
|
||||
}
|
||||
savePaths = append(savePaths, savePath)
|
||||
}
|
||||
|
||||
if len(savePaths) < 2 {
|
||||
errMsg := fmt.Sprintf("成功下载的视频不足2个(共%d)", len(videoURLs))
|
||||
dao.ConcatTask.UpdateError(bgCtx, taskID, errMsg)
|
||||
CleanupConcat(savePaths)
|
||||
s.concatCallback(bgCtx, taskID, callbackURL)
|
||||
return
|
||||
}
|
||||
|
||||
// 执行拼接
|
||||
concatErr := s.executeConcat(bgCtx, taskID, savePaths, method, upload)
|
||||
CleanupConcat(savePaths)
|
||||
|
||||
if concatErr != nil {
|
||||
dao.ConcatTask.UpdateError(bgCtx, taskID, concatErr.Error())
|
||||
s.concatCallback(bgCtx, taskID, callbackURL)
|
||||
return
|
||||
}
|
||||
|
||||
g.Log().Infof(bgCtx, "[异步拼接 %s] 完成", taskID)
|
||||
|
||||
if callbackURL != "" {
|
||||
s.concatCallback(bgCtx, taskID, callbackURL)
|
||||
}
|
||||
}
|
||||
|
||||
// executeConcat 执行拼接并更新任务状态,返回输出路径
|
||||
func (s *concatService) executeConcat(ctx context.Context, taskID string, filePaths []string, method string, upload bool) error {
|
||||
tempDir := filepath.Dir(filePaths[0])
|
||||
outputPath := filepath.Join(tempDir,
|
||||
fmt.Sprintf("concat_%s_x%d_%s.mp4", taskID, len(filePaths), time.Now().Format("150405")))
|
||||
|
||||
res, concatErr := s.Concat(ctx, &ConcatReq{
|
||||
VideoPaths: filePaths,
|
||||
OutputPath: outputPath,
|
||||
Method: method,
|
||||
Upload: upload,
|
||||
})
|
||||
if concatErr != nil {
|
||||
os.Remove(outputPath)
|
||||
return concatErr
|
||||
}
|
||||
|
||||
// 更新数据库为成功
|
||||
fileName := filepath.Base(outputPath)
|
||||
fileFormat := ""
|
||||
if idx := strings.LastIndex(fileName, "."); idx > 0 {
|
||||
fileFormat = fileName[idx+1:]
|
||||
}
|
||||
dao.ConcatTask.UpdateSuccess(ctx, taskID,
|
||||
res.FileURL, res.FileSize, fileName, fileFormat,
|
||||
"", res.MethodUsed, res.DurationStr)
|
||||
|
||||
os.Remove(outputPath)
|
||||
return nil
|
||||
}
|
||||
|
||||
// concatCallback 回调通知(从数据库读取任务结果发送)
|
||||
func (s *concatService) concatCallback(ctx context.Context, taskID, callbackURL string) {
|
||||
if callbackURL == "" {
|
||||
return
|
||||
}
|
||||
|
||||
task, err := dao.ConcatTask.GetByTaskID(ctx, taskID)
|
||||
if err != nil || task == nil {
|
||||
g.Log().Errorf(ctx, "[异步拼接回调 %s] 查询任务失败: %v", taskID, err)
|
||||
return
|
||||
}
|
||||
|
||||
payload := map[string]interface{}{
|
||||
"taskId": task.TaskID,
|
||||
"status": task.Status,
|
||||
}
|
||||
if task.Status == "success" {
|
||||
payload["fileURL"] = task.FileURL
|
||||
payload["fileSize"] = task.FileSize
|
||||
}
|
||||
if task.Status == "failed" {
|
||||
payload["errorMessage"] = task.ErrorMessage
|
||||
}
|
||||
|
||||
body, _ := json.Marshal(payload)
|
||||
g.Log().Infof(ctx, "[异步拼接回调 %s] 状态=%s, 目标=%s", taskID, task.Status, callbackURL)
|
||||
|
||||
req, _ := http.NewRequest("POST", callbackURL, bytes.NewReader(body))
|
||||
req.Header.Set("Content-Type", "application/json")
|
||||
// 透传调用方用户信息
|
||||
cbUser := getUserFromCtx(ctx)
|
||||
userJSON, _ := json.Marshal(cbUser)
|
||||
req.Header.Set("X-User-Info", string(userJSON))
|
||||
|
||||
client := &http.Client{Timeout: 2 * time.Minute}
|
||||
resp, reqErr := client.Do(req)
|
||||
if reqErr != nil {
|
||||
g.Log().Errorf(ctx, "[异步拼接回调 %s] 请求失败: %v", taskID, reqErr)
|
||||
return
|
||||
}
|
||||
defer resp.Body.Close()
|
||||
respBody, _ := io.ReadAll(resp.Body)
|
||||
g.Log().Infof(ctx, "[异步拼接回调 %s] 响应 status=%d, body=%s", taskID, resp.StatusCode, string(respBody))
|
||||
}
|
||||
|
||||
// downloadFile 下载文件到临时目录
|
||||
func downloadFile(ctx context.Context, rawURL, tempDir string) (string, error) {
|
||||
parsedURL, err := url.Parse(rawURL)
|
||||
if err != nil {
|
||||
return "", err
|
||||
}
|
||||
segments := strings.Split(parsedURL.Path, "/")
|
||||
fileName := segments[len(segments)-1]
|
||||
if fileName == "" {
|
||||
fileName = fmt.Sprintf("video_%d.mp4", time.Now().UnixMilli())
|
||||
}
|
||||
savePath := filepath.Join(tempDir, fmt.Sprintf("%d_%s", time.Now().UnixMilli(), fileName))
|
||||
|
||||
client := &http.Client{Timeout: 10 * time.Minute}
|
||||
resp, err := client.Get(rawURL)
|
||||
if err != nil {
|
||||
return "", err
|
||||
}
|
||||
defer resp.Body.Close()
|
||||
|
||||
if resp.StatusCode != http.StatusOK {
|
||||
return "", fmt.Errorf("HTTP %d", resp.StatusCode)
|
||||
}
|
||||
|
||||
out, err := os.Create(savePath)
|
||||
if err != nil {
|
||||
return "", err
|
||||
}
|
||||
defer out.Close()
|
||||
|
||||
_, err = io.Copy(out, resp.Body)
|
||||
if err != nil {
|
||||
os.Remove(savePath)
|
||||
return "", err
|
||||
}
|
||||
return savePath, nil
|
||||
}
|
||||
|
||||
662
service/video/cut_service.go
Normal file
662
service/video/cut_service.go
Normal file
@@ -0,0 +1,662 @@
|
||||
package video
|
||||
|
||||
import (
|
||||
"bytes"
|
||||
"context"
|
||||
"encoding/json"
|
||||
"fmt"
|
||||
"io"
|
||||
"mime/multipart"
|
||||
"net/http"
|
||||
"os"
|
||||
"os/exec"
|
||||
"path/filepath"
|
||||
"strconv"
|
||||
"strings"
|
||||
"time"
|
||||
|
||||
dao "media/dao/video"
|
||||
dto "media/model/dto/video"
|
||||
entity "media/model/entity/video"
|
||||
|
||||
"gitea.com/red-future/common/beans"
|
||||
commonHttp "gitea.com/red-future/common/http"
|
||||
|
||||
"github.com/gogf/gf/v2/frame/g"
|
||||
"github.com/gogf/gf/v2/os/glog"
|
||||
"github.com/gogf/gf/v2/util/guid"
|
||||
)
|
||||
|
||||
type cutService struct{}
|
||||
|
||||
// Cut 视频分镜剪切服务单例
|
||||
var Cut = new(cutService)
|
||||
|
||||
// CutShot 单个分镜片段(转换为秒后内部存储)
|
||||
type CutShot struct {
|
||||
Start float64 `json:"start"`
|
||||
End float64 `json:"end"`
|
||||
}
|
||||
|
||||
// CutReq 视频剪切请求
|
||||
type CutReq struct {
|
||||
VideoPath string // 输入视频文件路径
|
||||
Shots []CutShot // 分镜片段列表(按此顺序剪切拼接)
|
||||
OutputPath string // 输出视频文件路径,空则自动生成
|
||||
Upload bool // 是否上传到MinIO
|
||||
}
|
||||
|
||||
// CutRes 视频剪切响应
|
||||
type CutRes struct {
|
||||
OutputPath string `json:"outputPath"` // 输出文件路径
|
||||
FileSize int64 `json:"fileSize"` // 文件大小(bytes)
|
||||
Duration float64 `json:"duration"` // 总时长(秒)
|
||||
DurationStr string `json:"durationStr"` // 可读时长
|
||||
ShotsCount int `json:"shotsCount"` // 输出片段数
|
||||
FileURL string `json:"fileURL"` // MinIO访问地址(上传后返回)
|
||||
}
|
||||
|
||||
// parseTimeStr 解析 HH:MM:SS.mmm 格式为秒
|
||||
func parseTimeStr(timeStr string) (float64, error) {
|
||||
parts := strings.Split(timeStr, ":")
|
||||
if len(parts) != 3 {
|
||||
return 0, fmt.Errorf("invalid time format: %s, expected HH:MM:SS.mmm", timeStr)
|
||||
}
|
||||
|
||||
h, err := strconv.Atoi(parts[0])
|
||||
if err != nil {
|
||||
return 0, err
|
||||
}
|
||||
|
||||
m, err := strconv.Atoi(parts[1])
|
||||
if err != nil {
|
||||
return 0, err
|
||||
}
|
||||
|
||||
// 处理秒和毫秒
|
||||
secParts := strings.SplitN(parts[2], ".", 2)
|
||||
s, err := strconv.Atoi(secParts[0])
|
||||
if err != nil {
|
||||
return 0, err
|
||||
}
|
||||
|
||||
ms := 0
|
||||
if len(secParts) > 1 {
|
||||
ms, _ = strconv.Atoi(secParts[1])
|
||||
}
|
||||
|
||||
total := float64(h)*3600 + float64(m)*60 + float64(s) + float64(ms)/1000.0
|
||||
return total, nil
|
||||
}
|
||||
|
||||
// ConvertScenes 将 DTO 的分镜片段数组转换为内部的 CutShot 列表
|
||||
// 同时兼容两种格式:
|
||||
// 1. 如果 scene.Start > 0 && scene.End > scene.Start → 直接使用
|
||||
// 2. 否则 → 解析 scene.StartTimeStr / scene.EndTimeStr
|
||||
func ConvertScenes(scenes []dto.CutScene) ([]CutShot, error) {
|
||||
var shots []CutShot
|
||||
for _, scene := range scenes {
|
||||
var (
|
||||
start float64
|
||||
end float64
|
||||
err error
|
||||
)
|
||||
|
||||
// 优先使用 start/end 直接给出的秒数
|
||||
if scene.Start > 0 && scene.End > scene.Start {
|
||||
start = scene.Start
|
||||
end = scene.End
|
||||
} else if scene.StartTimeStr != "" && scene.EndTimeStr != "" {
|
||||
// 否则解析时间字符串 HH:MM:SS.mmm
|
||||
start, err = parseTimeStr(scene.StartTimeStr)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("parse start time %s: %v", scene.StartTimeStr, err)
|
||||
}
|
||||
end, err = parseTimeStr(scene.EndTimeStr)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("parse end time %s: %v", scene.EndTimeStr, err)
|
||||
}
|
||||
} else {
|
||||
// 没有可用的时间信息,跳过
|
||||
continue
|
||||
}
|
||||
|
||||
if end > start {
|
||||
shots = append(shots, CutShot{Start: start, End: end})
|
||||
}
|
||||
}
|
||||
return shots, nil
|
||||
}
|
||||
|
||||
// Cut 根据分镜剪切多个片段并拼接输出
|
||||
func (s *cutService) Cut(ctx context.Context, req *CutReq) (res *CutRes, err error) {
|
||||
g.Log().Infof(ctx, "[Cut] 服务层收到请求: video=%s, shots=%d, upload=%v",
|
||||
req.VideoPath, len(req.Shots), req.Upload)
|
||||
|
||||
// 校验输入
|
||||
if _, err := os.Stat(req.VideoPath); os.IsNotExist(err) {
|
||||
return nil, fmt.Errorf("输入视频文件不存在: %s", req.VideoPath)
|
||||
}
|
||||
|
||||
// 过滤掉无效片段(start >= end)
|
||||
var validShots []CutShot
|
||||
for _, shot := range req.Shots {
|
||||
if shot.End > shot.Start {
|
||||
validShots = append(validShots, shot)
|
||||
}
|
||||
}
|
||||
if len(validShots) == 0 {
|
||||
return nil, fmt.Errorf("没有有效的分镜片段(所有片段 start >= end)")
|
||||
}
|
||||
|
||||
ffmpegPath, err := s.getFFmpegPath()
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
// 生成输出路径
|
||||
outputPath := req.OutputPath
|
||||
if outputPath == "" {
|
||||
outputDir := filepath.Dir(req.VideoPath)
|
||||
baseName := filepath.Base(req.VideoPath)
|
||||
ext := filepath.Ext(baseName)
|
||||
stem := strings.TrimSuffix(baseName, ext)
|
||||
stemRunes := []rune(stem)
|
||||
if len(stemRunes) > 20 {
|
||||
stemRunes = stemRunes[:20]
|
||||
}
|
||||
outputPath = filepath.Join(outputDir,
|
||||
fmt.Sprintf("cut_%s_x%d_%s%s", string(stemRunes), len(validShots), time.Now().Format("150405"), ext))
|
||||
}
|
||||
|
||||
// 执行剪切拼接
|
||||
err = s.cutByFilterComplex(ctx, ffmpegPath, req.VideoPath, validShots, outputPath)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("视频剪切失败: %v", err)
|
||||
}
|
||||
|
||||
// 获取输出文件信息
|
||||
stat, statErr := os.Stat(outputPath)
|
||||
if statErr != nil {
|
||||
return nil, fmt.Errorf("输出文件异常: %v", statErr)
|
||||
}
|
||||
|
||||
// 获取时长
|
||||
duration, _ := s.getVideoDuration(ctx, ffmpegPath, outputPath)
|
||||
|
||||
res = &CutRes{
|
||||
OutputPath: outputPath,
|
||||
FileSize: stat.Size(),
|
||||
Duration: duration,
|
||||
DurationStr: formatDuration(duration),
|
||||
ShotsCount: len(validShots),
|
||||
}
|
||||
|
||||
// 如果需要上传到 MinIO
|
||||
if req.Upload {
|
||||
uploadCtx := context.Background()
|
||||
uploadRes, uploadErr := s.UploadToMinIO(uploadCtx, outputPath)
|
||||
if uploadErr != nil {
|
||||
return nil, fmt.Errorf("上传到MinIO失败: %v", uploadErr)
|
||||
}
|
||||
res.FileURL = uploadRes.FileURL
|
||||
}
|
||||
|
||||
return
|
||||
}
|
||||
|
||||
// cutByFilterComplex 使用 filter_complex 一步完成剪切拼接
|
||||
// 对每个片段用 trim/atrim 截取,然后 concat 拼接
|
||||
func (s *cutService) cutByFilterComplex(ctx context.Context, ffmpegPath, inputPath string, shots []CutShot, outputPath string) error {
|
||||
n := len(shots)
|
||||
|
||||
// 检测视频是否有音频轨道
|
||||
hasAudio, err := s.checkVideoAudio(ctx, ffmpegPath, inputPath)
|
||||
if err != nil {
|
||||
g.Log().Debugf(ctx, "[Cut] 检测音频失败: %v", err)
|
||||
hasAudio = false
|
||||
}
|
||||
|
||||
// 获取视频总时长,用于裁剪超出的 end
|
||||
videoDuration, _ := s.getVideoDuration(ctx, ffmpegPath, inputPath)
|
||||
|
||||
// 构建 filter_complex
|
||||
var filterParts []string
|
||||
var concatInputs []string
|
||||
|
||||
for i, shot := range shots {
|
||||
// 确保 end 不超过视频时长
|
||||
end := shot.End
|
||||
if videoDuration > 0 && end > videoDuration {
|
||||
end = videoDuration
|
||||
}
|
||||
if end <= shot.Start {
|
||||
continue
|
||||
}
|
||||
|
||||
// 视频: trim 截取,重置 PTS
|
||||
filterParts = append(filterParts, fmt.Sprintf(
|
||||
"[0:v]trim=start=%.3f:end=%.3f,setpts=PTS-STARTPTS[v%d]",
|
||||
shot.Start, end, i,
|
||||
))
|
||||
|
||||
// 音频处理
|
||||
if hasAudio {
|
||||
// 有音频轨道,用 atrim 截取
|
||||
filterParts = append(filterParts, fmt.Sprintf(
|
||||
"[0:a]atrim=start=%.3f:end=%.3f,asetpts=PTS-STARTPTS[a%d]",
|
||||
shot.Start, end, i,
|
||||
))
|
||||
concatInputs = append(concatInputs, fmt.Sprintf("[v%d][a%d]", i, i))
|
||||
} else {
|
||||
// 无音频轨道,为这个片段生成静音音频
|
||||
dur := end - shot.Start
|
||||
filterParts = append(filterParts, fmt.Sprintf(
|
||||
"aevalsrc=0:n=2:s=44100:d=%.2f[a%d]",
|
||||
dur, i,
|
||||
))
|
||||
concatInputs = append(concatInputs, fmt.Sprintf("[v%d][a%d]", i, i))
|
||||
}
|
||||
}
|
||||
|
||||
// 拼接所有片段
|
||||
filterStr := fmt.Sprintf("%s;%sconcat=n=%d:v=1:a=1[outv][outa]",
|
||||
strings.Join(filterParts, ";"),
|
||||
strings.Join(concatInputs, ""), n,
|
||||
)
|
||||
|
||||
// 构造完整参数
|
||||
args := []string{
|
||||
"-i", inputPath,
|
||||
"-filter_complex", filterStr,
|
||||
"-map", "[outv]",
|
||||
"-map", "[outa]",
|
||||
"-c:v", "h264_videotoolbox",
|
||||
"-b:v", "5M",
|
||||
"-allow_sw", "true",
|
||||
"-c:a", "aac",
|
||||
"-y",
|
||||
outputPath,
|
||||
}
|
||||
|
||||
// 调试:记录完整命令
|
||||
g.Log().Debugf(ctx, "[Cut] ffmpeg 命令: %s %v", ffmpegPath, args)
|
||||
|
||||
// 保存 filter 用于调试
|
||||
outputDir := filepath.Dir(outputPath)
|
||||
filterFile := filepath.Join(outputDir, "cut_filter.txt")
|
||||
os.WriteFile(filterFile, []byte(filterStr), 0644)
|
||||
defer os.Remove(filterFile)
|
||||
|
||||
// 使用独立 context,避免 HTTP 请求超时导致 ffmpeg 被 SIGKILL
|
||||
bgCtx := context.Background()
|
||||
cmd := exec.CommandContext(bgCtx, ffmpegPath, args...)
|
||||
outputBytes, err := cmd.CombinedOutput()
|
||||
if err != nil {
|
||||
return fmt.Errorf("ffmpeg 执行失败: %v\n日志:\n%s", err, string(outputBytes))
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
// getFFmpegPath 获取 FFmpeg 路径
|
||||
func (s *cutService) getFFmpegPath() (string, error) {
|
||||
ffmpegPath := g.Cfg().MustGet(context.Background(), "ffmpeg.path", "").String()
|
||||
if ffmpegPath != "" {
|
||||
if _, err := os.Stat(ffmpegPath); err == nil {
|
||||
return ffmpegPath, nil
|
||||
}
|
||||
}
|
||||
path, err := exec.LookPath("ffmpeg")
|
||||
if err != nil {
|
||||
return "", fmt.Errorf("未找到 ffmpeg")
|
||||
}
|
||||
return path, nil
|
||||
}
|
||||
|
||||
// getVideoDuration 获取视频时长
|
||||
func (s *cutService) getVideoDuration(ctx context.Context, ffmpegPath, videoPath string) (float64, error) {
|
||||
ffprobePath := filepath.Join(filepath.Dir(ffmpegPath), "ffprobe")
|
||||
if _, err := os.Stat(ffprobePath); os.IsNotExist(err) {
|
||||
ffprobePath = "ffprobe"
|
||||
}
|
||||
|
||||
cmd := exec.CommandContext(ctx, ffprobePath,
|
||||
"-v", "error",
|
||||
"-show_entries", "format=duration",
|
||||
"-of", "default=noprint_wrappers=1:nokey=1",
|
||||
videoPath,
|
||||
)
|
||||
|
||||
output, err := cmd.Output()
|
||||
if err != nil {
|
||||
return 0, err
|
||||
}
|
||||
|
||||
var duration float64
|
||||
fmt.Sscanf(strings.TrimSpace(string(output)), "%f", &duration)
|
||||
return duration, nil
|
||||
}
|
||||
|
||||
// checkVideoAudio 检测视频文件是否有音频轨道
|
||||
func (s *cutService) checkVideoAudio(ctx context.Context, ffmpegPath, videoPath string) (bool, error) {
|
||||
ffprobePath := filepath.Join(filepath.Dir(ffmpegPath), "ffprobe")
|
||||
if _, err := os.Stat(ffprobePath); os.IsNotExist(err) {
|
||||
ffprobePath = "ffprobe"
|
||||
}
|
||||
|
||||
cmd := exec.CommandContext(ctx, ffprobePath,
|
||||
"-v", "error",
|
||||
"-select_streams", "a:0",
|
||||
"-show_entries", "stream=codec_type",
|
||||
"-of", "default=noprint_wrappers=1:nokey=1",
|
||||
videoPath,
|
||||
)
|
||||
output, err := cmd.Output()
|
||||
if err != nil || len(strings.TrimSpace(string(output))) == 0 {
|
||||
return false, err
|
||||
}
|
||||
return true, nil
|
||||
}
|
||||
|
||||
// hasVideoAudio 别名保持命名一致(从concat_service复制)
|
||||
func (s *cutService) hasVideoAudio(ctx context.Context, ffmpegPath, videoPath string) bool {
|
||||
has, _ := s.checkVideoAudio(ctx, ffmpegPath, videoPath)
|
||||
return has
|
||||
}
|
||||
|
||||
// getVideoResolution 获取视频分辨率(复用)
|
||||
func (s *cutService) getVideoResolution(ctx context.Context, ffmpegPath, videoPath string) (width, height int, err error) {
|
||||
ffprobePath := filepath.Join(filepath.Dir(ffmpegPath), "ffprobe")
|
||||
if _, err := os.Stat(ffprobePath); os.IsNotExist(err) {
|
||||
ffprobePath = "ffprobe"
|
||||
}
|
||||
|
||||
cmd := exec.CommandContext(ctx, ffprobePath,
|
||||
"-v", "error",
|
||||
"-select_streams", "v:0",
|
||||
"-show_entries", "stream=width,height",
|
||||
"-of", "csv=p=0",
|
||||
videoPath,
|
||||
)
|
||||
output, err := cmd.Output()
|
||||
if err != nil {
|
||||
return 0, 0, err
|
||||
}
|
||||
fmt.Sscanf(strings.TrimSpace(string(output)), "%d,%d", &width, &height)
|
||||
return
|
||||
}
|
||||
|
||||
// cutUploadFileRes 上传文件响应(类型别名避免重定义)
|
||||
type cutUploadFileRes struct {
|
||||
FileURL string `json:"fileURL" dc:"上传地址"`
|
||||
FileSize int `json:"fileSize" dc:"文件大小"`
|
||||
FileName string `json:"fileName" dc:"文件名称"`
|
||||
FileFormat string `json:"fileFormat" dc:"文件格式"`
|
||||
FileAddressPrefix string `json:"fileAddressPrefix" dc:"文件地址前缀"`
|
||||
}
|
||||
|
||||
// UploadToMinIO 通过 OSS 微服务的 uploadFile 接口上传到 MinIO(multipart/form-data)
|
||||
func (s *cutService) UploadToMinIO(ctx context.Context, localFilePath string) (*cutUploadFileRes, error) {
|
||||
// 构建 multipart/form-data 表单
|
||||
var buf bytes.Buffer
|
||||
mw := multipart.NewWriter(&buf)
|
||||
|
||||
file, err := os.Open(localFilePath)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("打开文件失败: %v", err)
|
||||
}
|
||||
defer file.Close()
|
||||
|
||||
fw, err := mw.CreateFormFile("file", filepath.Base(localFilePath))
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("创建表单文件字段失败: %v", err)
|
||||
}
|
||||
if _, err = io.Copy(fw, file); err != nil {
|
||||
return nil, fmt.Errorf("写入文件内容失败: %v", err)
|
||||
}
|
||||
mw.Close()
|
||||
|
||||
// 使用 commonHttp 的客户端(含 Consul 服务发现),大文件上传设置长超时
|
||||
client := commonHttp.Httpclient.Clone()
|
||||
// 必须单独设置 Transport.ResponseHeaderTimeout,SetTimeout 只设 Client.Timeout
|
||||
newTransport := http.DefaultTransport.(*http.Transport).Clone()
|
||||
newTransport.ResponseHeaderTimeout = 5 * time.Minute
|
||||
client.Transport = newTransport
|
||||
client.SetTimeout(10 * time.Minute)
|
||||
|
||||
// 透传认证 headers(优先从 HTTP 请求头取)
|
||||
hasAuthHeader := false
|
||||
if r := g.RequestFromCtx(ctx); r != nil {
|
||||
for k, v := range r.Header {
|
||||
client.SetHeader(k, v[0])
|
||||
if strings.EqualFold(k, "Authorization") || strings.EqualFold(k, "X-User-Info") {
|
||||
hasAuthHeader = true
|
||||
}
|
||||
}
|
||||
}
|
||||
// 无 HTTP 请求时(异步 goroutine),从 context 的用户信息构造 header
|
||||
if !hasAuthHeader {
|
||||
uploadUser := getUserFromCtx(ctx)
|
||||
userJSON, _ := json.Marshal(uploadUser)
|
||||
client.SetHeader("X-User-Info", string(userJSON))
|
||||
}
|
||||
|
||||
// 设置 multipart Content-Type(含 boundary)
|
||||
contentType := mw.FormDataContentType()
|
||||
client.SetHeader("Content-Type", contentType)
|
||||
|
||||
g.Log().Debugf(ctx, "[UploadToMinIO] 请求URL: oss/file/uploadFile, 文件: %s, Body大小: %d bytes",
|
||||
localFilePath, buf.Len())
|
||||
|
||||
// 发送 multipart 请求(原始字节流)
|
||||
response, err := client.Post(ctx, "oss/file/uploadFile", buf.Bytes())
|
||||
if err != nil {
|
||||
glog.Error(ctx, err)
|
||||
return nil, fmt.Errorf("调用OSS上传服务失败: %v", err)
|
||||
}
|
||||
defer response.Close()
|
||||
|
||||
body := response.ReadAll()
|
||||
|
||||
// 调试:打印原始响应
|
||||
g.Log().Debugf(ctx, "[UploadToMinIO] OSS原始响应: %s", string(body))
|
||||
|
||||
// 解析标准 GoFrame 响应格式 {code, message, data}
|
||||
var apiResp struct {
|
||||
Code int `json:"code"`
|
||||
Message string `json:"message"`
|
||||
Data *cutUploadFileRes `json:"data"`
|
||||
}
|
||||
if err = json.Unmarshal(body, &apiResp); err != nil {
|
||||
return nil, fmt.Errorf("响应解析失败: %v", err)
|
||||
}
|
||||
|
||||
if apiResp.Code != 200 && apiResp.Code != 0 {
|
||||
return nil, fmt.Errorf("OSS上传失败: %s", apiResp.Message)
|
||||
}
|
||||
|
||||
g.Log().Infof(ctx, "[UploadToMinIO] 上传成功 fileURL=%s size=%d", apiResp.Data.FileURL, apiResp.Data.FileSize)
|
||||
return apiResp.Data, nil
|
||||
}
|
||||
|
||||
// ---------- 异步任务管理 ----------
|
||||
|
||||
// CreateAsyncTask 创建异步剪切任务,返回 taskId,后台处理
|
||||
func (s *cutService) CreateAsyncTask(ctx context.Context, videoURL string, shots []CutShot, upload bool, callbackURL string) (string, error) {
|
||||
if videoURL == "" {
|
||||
return "", fmt.Errorf("视频URL不能为空")
|
||||
}
|
||||
|
||||
// 过滤无效片段
|
||||
var validShots []CutShot
|
||||
for _, shot := range shots {
|
||||
if shot.End > shot.Start {
|
||||
validShots = append(validShots, shot)
|
||||
}
|
||||
}
|
||||
if len(validShots) == 0 {
|
||||
return "", fmt.Errorf("没有有效的分镜片段")
|
||||
}
|
||||
|
||||
// 序列化 shots JSON
|
||||
shotsJSON, err := json.Marshal(shots)
|
||||
if err != nil {
|
||||
return "", fmt.Errorf("序列化分镜失败: %v", err)
|
||||
}
|
||||
|
||||
taskID := "cut_" + guid.S()
|
||||
task := &entity.CutTask{
|
||||
TaskID: taskID,
|
||||
VideoURL: videoURL,
|
||||
ShotsJSON: string(shotsJSON),
|
||||
Status: "pending",
|
||||
CallbackURL: callbackURL,
|
||||
}
|
||||
if _, err := dao.CutTask.Insert(ctx, task); err != nil {
|
||||
return "", fmt.Errorf("创建任务失败: %v", err)
|
||||
}
|
||||
|
||||
// 提取调用方用户信息,传给 goroutine
|
||||
user := getUserFromCtx(ctx)
|
||||
|
||||
g.Log().Infof(ctx, "[异步剪切] 创建任务 %s, 视频=%s, 片段数=%d, 回调=%s", taskID, videoURL, len(validShots), callbackURL)
|
||||
|
||||
// 异步处理
|
||||
go s.processAsyncTask(user, taskID, videoURL, shots, upload, callbackURL)
|
||||
|
||||
return taskID, nil
|
||||
}
|
||||
|
||||
// GetTaskResult 查询异步任务结果
|
||||
func (s *cutService) GetTaskResult(ctx context.Context, taskID string) (*dto.GetCutTaskRes, error) {
|
||||
task, err := dao.CutTask.GetByTaskID(ctx, taskID)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("查询任务失败: %v", err)
|
||||
}
|
||||
if task == nil {
|
||||
return nil, fmt.Errorf("任务不存在: %s", taskID)
|
||||
}
|
||||
|
||||
return dao.CutEntityToTaskRes(task), nil
|
||||
}
|
||||
|
||||
// processAsyncTask 后台处理异步剪切任务
|
||||
func (s *cutService) processAsyncTask(user *beans.User, taskID string, videoURL string, shots []CutShot, upload bool, callbackURL string) {
|
||||
bgCtx := context.Background()
|
||||
bgCtx = context.WithValue(bgCtx, "user", user)
|
||||
|
||||
dao.CutTask.UpdateRunning(bgCtx, taskID)
|
||||
|
||||
defer func() {
|
||||
if r := recover(); r != nil {
|
||||
errMsg := fmt.Sprintf("异步剪切异常: %v", r)
|
||||
g.Log().Errorf(bgCtx, "[异步剪切 %s] %s", taskID, errMsg)
|
||||
dao.CutTask.UpdateError(bgCtx, taskID, errMsg)
|
||||
s.cutCallback(bgCtx, taskID, callbackURL)
|
||||
}
|
||||
}()
|
||||
|
||||
// 下载视频
|
||||
tempDir := g.Cfg().MustGet(bgCtx, "ffmpeg.temp_dir", "resource/temp").String()
|
||||
os.MkdirAll(tempDir, 0755)
|
||||
|
||||
savePath, dlErr := downloadFile(bgCtx, videoURL, tempDir)
|
||||
if dlErr != nil {
|
||||
errMsg := fmt.Sprintf("下载视频失败: %v", dlErr)
|
||||
dao.CutTask.UpdateError(bgCtx, taskID, errMsg)
|
||||
s.cutCallback(bgCtx, taskID, callbackURL)
|
||||
return
|
||||
}
|
||||
defer os.Remove(savePath)
|
||||
|
||||
// 执行剪切
|
||||
cutErr := s.executeCut(bgCtx, taskID, savePath, shots, upload)
|
||||
if cutErr != nil {
|
||||
dao.CutTask.UpdateError(bgCtx, taskID, cutErr.Error())
|
||||
s.cutCallback(bgCtx, taskID, callbackURL)
|
||||
return
|
||||
}
|
||||
|
||||
g.Log().Infof(bgCtx, "[异步剪切 %s] 完成", taskID)
|
||||
|
||||
if callbackURL != "" {
|
||||
s.cutCallback(bgCtx, taskID, callbackURL)
|
||||
}
|
||||
}
|
||||
|
||||
// executeCut 执行剪切并更新任务状态
|
||||
func (s *cutService) executeCut(ctx context.Context, taskID string, videoPath string, shots []CutShot, upload bool) error {
|
||||
tempDir := filepath.Dir(videoPath)
|
||||
outputPath := filepath.Join(tempDir,
|
||||
fmt.Sprintf("cut_%s_x%d_%s.mp4", taskID, len(shots), time.Now().Format("150405")))
|
||||
|
||||
res, cutErr := s.Cut(ctx, &CutReq{
|
||||
VideoPath: videoPath,
|
||||
Shots: shots,
|
||||
OutputPath: outputPath,
|
||||
Upload: upload,
|
||||
})
|
||||
if cutErr != nil {
|
||||
os.Remove(outputPath)
|
||||
return cutErr
|
||||
}
|
||||
|
||||
// 更新数据库为成功
|
||||
fileName := filepath.Base(outputPath)
|
||||
fileFormat := ""
|
||||
if idx := strings.LastIndex(fileName, "."); idx > 0 {
|
||||
fileFormat = fileName[idx+1:]
|
||||
}
|
||||
dao.CutTask.UpdateSuccess(ctx, taskID,
|
||||
res.FileURL, res.FileSize, fileName, fileFormat,
|
||||
"", res.DurationStr)
|
||||
|
||||
os.Remove(outputPath)
|
||||
return nil
|
||||
}
|
||||
|
||||
// cutCallback 回调通知
|
||||
func (s *cutService) cutCallback(ctx context.Context, taskID, callbackURL string) {
|
||||
if callbackURL == "" {
|
||||
return
|
||||
}
|
||||
|
||||
task, err := dao.CutTask.GetByTaskID(ctx, taskID)
|
||||
if err != nil || task == nil {
|
||||
g.Log().Errorf(ctx, "[异步剪切回调 %s] 查询任务失败: %v", taskID, err)
|
||||
return
|
||||
}
|
||||
|
||||
payload := map[string]interface{}{
|
||||
"taskId": task.TaskID,
|
||||
"status": task.Status,
|
||||
}
|
||||
if task.Status == "success" {
|
||||
payload["fileURL"] = task.FileURL
|
||||
payload["fileSize"] = task.FileSize
|
||||
}
|
||||
if task.Status == "failed" {
|
||||
payload["errorMessage"] = task.ErrorMessage
|
||||
}
|
||||
|
||||
body, _ := json.Marshal(payload)
|
||||
g.Log().Infof(ctx, "[异步剪切回调 %s] 状态=%s, 目标=%s", taskID, task.Status, callbackURL)
|
||||
|
||||
req, _ := http.NewRequest("POST", callbackURL, bytes.NewReader(body))
|
||||
req.Header.Set("Content-Type", "application/json")
|
||||
// 透传调用方用户信息
|
||||
cbUser := getUserFromCtx(ctx)
|
||||
userJSON, _ := json.Marshal(cbUser)
|
||||
req.Header.Set("X-User-Info", string(userJSON))
|
||||
|
||||
client := &http.Client{Timeout: 2 * time.Minute}
|
||||
resp, reqErr := client.Do(req)
|
||||
if reqErr != nil {
|
||||
g.Log().Errorf(ctx, "[异步剪切回调 %s] 请求失败: %v", taskID, reqErr)
|
||||
return
|
||||
}
|
||||
defer resp.Body.Close()
|
||||
respBody, _ := io.ReadAll(resp.Body)
|
||||
g.Log().Infof(ctx, "[异步剪切回调 %s] 响应 status=%d, body=%s", taskID, resp.StatusCode, string(respBody))
|
||||
}
|
||||
39
sql/concat_task.sql
Normal file
39
sql/concat_task.sql
Normal file
@@ -0,0 +1,39 @@
|
||||
-- concat_task 视频拼接异步任务表
|
||||
CREATE TABLE IF NOT EXISTS concat_task (
|
||||
id BIGSERIAL NOT NULL,
|
||||
task_id VARCHAR(64) NOT NULL,
|
||||
tenant_id BIGINT NOT NULL DEFAULT 0,
|
||||
status VARCHAR(20) NOT NULL DEFAULT 'pending',
|
||||
file_url TEXT NOT NULL DEFAULT '',
|
||||
file_size BIGINT NOT NULL DEFAULT 0,
|
||||
file_name VARCHAR(255) NOT NULL DEFAULT '',
|
||||
file_format VARCHAR(32) NOT NULL DEFAULT '',
|
||||
file_address_prefix TEXT NOT NULL DEFAULT '',
|
||||
method_used VARCHAR(64) NOT NULL DEFAULT '',
|
||||
duration_str VARCHAR(32) NOT NULL DEFAULT '',
|
||||
error_message TEXT,
|
||||
created_at TIMESTAMP WITH TIME ZONE DEFAULT NOW(),
|
||||
updated_at TIMESTAMP WITH TIME ZONE DEFAULT NOW(),
|
||||
deleted_at TIMESTAMP WITH TIME ZONE,
|
||||
PRIMARY KEY (id)
|
||||
);
|
||||
|
||||
COMMENT ON TABLE concat_task IS '视频拼接异步任务表';
|
||||
COMMENT ON COLUMN concat_task.task_id IS '任务唯一标识';
|
||||
COMMENT ON COLUMN concat_task.tenant_id IS '租户ID';
|
||||
COMMENT ON COLUMN concat_task.status IS '任务状态:pending/running/success/failed';
|
||||
COMMENT ON COLUMN concat_task.file_url IS 'MinIO文件访问路径';
|
||||
COMMENT ON COLUMN concat_task.file_size IS '文件大小(字节)';
|
||||
COMMENT ON COLUMN concat_task.file_name IS '文件名';
|
||||
COMMENT ON COLUMN concat_task.file_format IS '文件格式';
|
||||
COMMENT ON COLUMN concat_task.file_address_prefix IS 'MinIO地址前缀';
|
||||
COMMENT ON COLUMN concat_task.method_used IS '实际使用的拼接方式';
|
||||
COMMENT ON COLUMN concat_task.duration_str IS '拼接后时长';
|
||||
COMMENT ON COLUMN concat_task.error_message IS '错误信息';
|
||||
COMMENT ON COLUMN concat_task.created_at IS '创建时间';
|
||||
COMMENT ON COLUMN concat_task.updated_at IS '更新时间';
|
||||
COMMENT ON COLUMN concat_task.deleted_at IS '删除时间(软删除)';
|
||||
|
||||
CREATE UNIQUE INDEX IF NOT EXISTS idx_concat_task_task_id ON concat_task(task_id);
|
||||
CREATE INDEX IF NOT EXISTS idx_concat_task_status ON concat_task(status);
|
||||
CREATE INDEX IF NOT EXISTS idx_concat_task_created_at ON concat_task(created_at);
|
||||
42
sql/cut_task.sql
Normal file
42
sql/cut_task.sql
Normal file
@@ -0,0 +1,42 @@
|
||||
-- cut_task 视频分镜剪切异步任务表
|
||||
CREATE TABLE IF NOT EXISTS cut_task (
|
||||
id BIGSERIAL NOT NULL,
|
||||
tenant_id BIGINT NOT NULL DEFAULT 0,
|
||||
task_id VARCHAR(64) NOT NULL,
|
||||
video_url TEXT NOT NULL,
|
||||
shots_json TEXT NOT NULL,
|
||||
status VARCHAR(20) NOT NULL DEFAULT 'pending',
|
||||
file_url TEXT NOT NULL DEFAULT '',
|
||||
file_size BIGINT NOT NULL DEFAULT 0,
|
||||
file_name VARCHAR(255) NOT NULL DEFAULT '',
|
||||
file_format VARCHAR(32) NOT NULL DEFAULT '',
|
||||
file_address_prefix TEXT NOT NULL DEFAULT '',
|
||||
duration_str VARCHAR(32) NOT NULL DEFAULT '',
|
||||
error_message TEXT,
|
||||
callback_url VARCHAR(500) NOT NULL DEFAULT '',
|
||||
created_at TIMESTAMP WITH TIME ZONE DEFAULT NOW(),
|
||||
updated_at TIMESTAMP WITH TIME ZONE DEFAULT NOW(),
|
||||
deleted_at TIMESTAMP WITH TIME ZONE,
|
||||
PRIMARY KEY (id)
|
||||
);
|
||||
|
||||
COMMENT ON TABLE cut_task IS '视频分镜剪切异步任务表';
|
||||
COMMENT ON COLUMN cut_task.task_id IS '任务唯一标识';
|
||||
COMMENT ON COLUMN cut_task.video_url IS '原始视频URL';
|
||||
COMMENT ON COLUMN cut_task.shots_json IS '分镜JSON数组';
|
||||
COMMENT ON COLUMN cut_task.status IS '任务状态:pending/running/success/failed';
|
||||
COMMENT ON COLUMN cut_task.file_url IS 'MinIO文件访问路径';
|
||||
COMMENT ON COLUMN cut_task.file_size IS '文件大小(字节)';
|
||||
COMMENT ON COLUMN cut_task.file_name IS '文件名';
|
||||
COMMENT ON COLUMN cut_task.file_format IS '文件格式';
|
||||
COMMENT ON COLUMN cut_task.file_address_prefix IS 'MinIO地址前缀';
|
||||
COMMENT ON COLUMN cut_task.duration_str IS '剪切后时长';
|
||||
COMMENT ON COLUMN cut_task.error_message IS '错误信息';
|
||||
COMMENT ON COLUMN cut_task.callback_url IS '回调地址';
|
||||
COMMENT ON COLUMN cut_task.created_at IS '创建时间';
|
||||
COMMENT ON COLUMN cut_task.updated_at IS '更新时间';
|
||||
COMMENT ON COLUMN cut_task.deleted_at IS '删除时间(软删除)';
|
||||
|
||||
CREATE UNIQUE INDEX IF NOT EXISTS idx_cut_task_task_id ON cut_task(task_id);
|
||||
CREATE INDEX IF NOT EXISTS idx_cut_task_status ON cut_task(status);
|
||||
CREATE INDEX IF NOT EXISTS idx_cut_task_created_at ON cut_task(created_at);
|
||||
Reference in New Issue
Block a user