Files
data-engine/service/tencent/video_service.go
2026-05-07 09:07:20 +08:00

420 lines
15 KiB
Go
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
package tencent
import (
	"context"
	"encoding/json"
	"fmt"
	"io"
	"math/rand"
	"net/http"
	"net/url"
	"strings"
	"time"

	dao "dataengine/dao/tencent"
	dto "dataengine/model/dto/tencent"
	entity "dataengine/model/entity/tencent"

	"gitea.com/red-future/common/db/gfdb"
	"github.com/gogf/gf/v2/frame/g"
	"github.com/sirupsen/logrus"
)
// videoService is the stateless receiver type for the video-asset methods
// defined in this file.
type videoService struct{}

// VideoService is the package-level *videoService instance.
var VideoService = &videoService{}
// videoResponse mirrors the JSON body that fetchPage decodes from the
// videos/get endpoint. Every field is populated purely through its json tag.
type videoResponse struct {
// Code is the API status code; fetchPage treats any non-zero value as an error.
Code int `json:"code"`
// Message is the API-supplied text that fetchPage includes in its error when Code is non-zero.
Message string `json:"message"`
Data struct {
// List holds the per-video entries that savePageData converts into entity.Video rows.
List []struct {
VideoId int64 `json:"video_id"`
Width int `json:"width"`
Height int `json:"height"`
VideoFrames int `json:"video_frames"`
VideoFps int `json:"video_fps"`
VideoCodec string `json:"video_codec"`
VideoBitRate int64 `json:"video_bit_rate"`
AudioCodec string `json:"audio_codec"`
AudioBitRate int64 `json:"audio_bit_rate"`
FileSize int64 `json:"file_size"`
Type string `json:"type"`
Signature string `json:"signature"`
SystemStatus string `json:"system_status"`
Description string `json:"description"`
PreviewUrl string `json:"preview_url"`
KeyFrameImageUrl string `json:"key_frame_image_url"`
CreatedTime int64 `json:"created_time"`
LastModifiedTime int64 `json:"last_modified_time"`
VideoProfileName string `json:"video_profile_name"`
AudioSampleRate int `json:"audio_sample_rate"`
MaxKeyframeInterval int `json:"max_keyframe_interval"`
MinKeyframeInterval int `json:"min_keyframe_interval"`
SampleAspectRatio string `json:"sample_aspect_ratio"`
AudioProfileName string `json:"audio_profile_name"`
ScanType string `json:"scan_type"`
// The two duration fields use shortened Go names; the JSON keys end in "_millisecond".
ImageDurationMs int64 `json:"image_duration_millisecond"`
AudioDurationMs int64 `json:"audio_duration_millisecond"`
SourceType string `json:"source_type"`
ProductCatalogId string `json:"product_catalog_id"`
ProductOuterId string `json:"product_outer_id"`
SourceReferenceId string `json:"source_reference_id"`
OwnerAccountId string `json:"owner_account_id"`
Status string `json:"status"`
SourceMaterialId string `json:"source_material_id"`
NewSourceType string `json:"new_source_type"`
AigcType int `json:"aigc_type"`
FirstPublicationStatus string `json:"first_publication_status"`
QualityStatus string `json:"quality_status"`
CoverId string `json:"cover_id"`
SimilarityStatus string `json:"similarity_status"`
UserAigcStatus string `json:"user_aigc_status"`
SystemAigcStatus string `json:"system_aigc_status"`
AigcSource string `json:"aigc_source"`
AigcFlag string `json:"aigc_flag"`
MuseAigcVersion int `json:"muse_aigc_version"`
} `json:"list"`
// PageInfo carries the paging metadata; syncAccountVideos reads TotalPage
// and TotalNumber from it to decide how many pages to request.
PageInfo struct {
Page int `json:"page"`
PageSize int `json:"page_size"`
TotalNumber int `json:"total_number"`
TotalPage int `json:"total_page"`
} `json:"page_info"`
} `json:"data"`
// TraceId is decoded from "trace_id"; nothing in the visible code reads it.
TraceId string `json:"trace_id"`
}
// SyncAll synchronizes video assets for every account returned by
// getAccountList, delegating the per-account paging to syncAccountVideos.
//
// The access token is taken from req.AccessToken, falling back to the
// "tencent.oauth.access_token" configuration key; an error is returned when
// neither yields a token. A nil req is treated as a request without a token.
//
// A failure on a single account is logged and the loop moves on to the next
// account. If the internal 30-minute context expires or is cancelled, the
// sync stops and an error wrapping the context error is returned instead of
// reporting a completed sync.
func (s *videoService) SyncAll(ctx context.Context, req *dto.SyncVideoReq) (res *dto.SyncVideoRes, err error) {
	// Run on a context detached from the incoming one so that the HTTP request
	// timing out does not cancel the sync; 30 minutes bounds the whole run.
	independentCtx, cancel := context.WithTimeout(context.Background(), 30*time.Minute)
	defer cancel()

	// Carry over the "user" value; per the original note the database
	// middleware reads it from the context.
	if user := ctx.Value("user"); user != nil {
		independentCtx = context.WithValue(independentCtx, "user", user)
	}

	// Prefer the token supplied by the caller, then fall back to configuration.
	var accessToken string
	if req != nil {
		accessToken = req.AccessToken
	}
	if accessToken == "" {
		accessToken = g.Cfg().MustGet(independentCtx, "tencent.oauth.access_token").String()
	}
	if accessToken == "" {
		return nil, fmt.Errorf("access_token不能为空")
	}

	accounts, err := s.getAccountList(independentCtx)
	if err != nil {
		return nil, fmt.Errorf("获取账户列表失败: %w", err)
	}

	res = &dto.SyncVideoRes{}
	res.TotalAccounts = len(accounts)
	logrus.Infof("开始同步腾讯广告视频素材 - 账户数: %d", len(accounts))

	// syncAccountVideos only reports the number of saved records, so a single
	// counter feeds both TotalVideos and SyncedCount.
	synced := 0

	// interrupted builds the error returned once the sync context is done.
	interrupted := func(processed int, cause error) error {
		return fmt.Errorf("同步中断, 已处理 %d/%d 个账户, 已同步 %d 条视频记录: %w",
			processed, len(accounts), synced, cause)
	}

	for i, account := range accounts {
		if i > 0 {
			// Pause 200ms between accounts to avoid hammering the API, but
			// wake up immediately if the context ends.
			select {
			case <-independentCtx.Done():
			case <-time.After(200 * time.Millisecond):
			}
		}
		// Once the context is done every further request would fail, so stop
		// here rather than logging a failure for each remaining account.
		if ctxErr := independentCtx.Err(); ctxErr != nil {
			return nil, interrupted(i, ctxErr)
		}

		logrus.Infof("========== 开始处理账户: %d (%s) ==========", account.AccountID, account.CorporationName)

		accountVideos, syncErr := s.syncAccountVideos(independentCtx, accessToken, account.AccountID)
		if syncErr != nil {
			logrus.Errorf("账户 %d 同步失败: %v, 继续下一个账户", account.AccountID, syncErr)
			continue
		}
		synced += accountVideos
	}

	// The context may have ended while the last account was being processed.
	if ctxErr := independentCtx.Err(); ctxErr != nil {
		return nil, interrupted(len(accounts), ctxErr)
	}

	res.TotalVideos = synced
	res.SyncedCount = synced
	res.Message = fmt.Sprintf("同步完成,共处理 %d 个账户,%d 条视频记录", res.TotalAccounts, synced)
	logrus.Infof("同步完成 - 账户数: %d, 总视频数: %d, 成功同步: %d", res.TotalAccounts, synced, synced)
	return res, nil
}
// getAccountList scans the rows of "tencent_account_relation" whose
// deleted_at column is NULL into a slice of entity.AccountRelation.
// The scanned slice and the Scan error are returned together, unchanged.
func (s *videoService) getAccountList(ctx context.Context) ([]entity.AccountRelation, error) {
	query := gfdb.DB(ctx).Model(ctx, "tencent_account_relation").WhereNull("deleted_at")

	var rows []entity.AccountRelation
	scanErr := query.Scan(&rows)
	return rows, scanErr
}
// syncAccountVideos fetches every page of videos for one account and saves
// each page through savePageData.
//
// It requests page 1 first to learn the total page count, then walks pages
// 2..TotalPage. A page that fails to fetch or save is logged and skipped.
//
// Returns the sum of the counts reported by savePageData. An error is
// returned when the first page cannot be fetched (the underlying cause is
// wrapped) or when ctx is cancelled or expires between pages.
func (s *videoService) syncAccountVideos(ctx context.Context, accessToken string, accountId int64) (int, error) {
	// Page size used for every request of this account.
	const pageSize = 100

	totalSynced := 0

	// Fetch the first page to obtain the paging metadata.
	firstPageData, err := s.fetchPage(ctx, accessToken, accountId, 1, pageSize)
	if err != nil {
		// Request failures and API errors get the friendlier prefix, but the
		// cause stays attached so token, quota or network problems remain visible.
		errMsg := err.Error()
		if contains(errMsg, "请求失败") || contains(errMsg, "API错误") {
			return 0, fmt.Errorf("该账户没有视频或无法访问: %w", err)
		}
		return 0, fmt.Errorf("获取第一页数据失败: %w", err)
	}

	totalPage := firstPageData.Data.PageInfo.TotalPage
	logrus.Infof("账户 %d - 总页数: %d, 总记录数: %d", accountId, totalPage, firstPageData.Data.PageInfo.TotalNumber)

	// Nothing to do when the API reports no data.
	if totalPage == 0 || firstPageData.Data.PageInfo.TotalNumber == 0 {
		logrus.Infof("账户 %d - 没有视频数据", accountId)
		return 0, nil
	}

	// Save the first page; a failure here is logged and the remaining pages
	// are still attempted.
	synced, err := s.savePageData(ctx, firstPageData, accountId)
	if err != nil {
		logrus.Errorf("账户 %d - 保存第 1 页数据失败: %v", accountId, err)
	}
	totalSynced += synced

	for page := 2; page <= totalPage; page++ {
		// Wait 100ms before every follow-up request (also after a failed one)
		// and give up as soon as the context is done, since every further
		// request would fail anyway.
		select {
		case <-ctx.Done():
			return 0, fmt.Errorf("账户 %d 同步中断, 已同步 %d 条记录: %w", accountId, totalSynced, ctx.Err())
		case <-time.After(100 * time.Millisecond):
		}

		logrus.Infof("账户 %d - 正在获取第 %d/%d 页...", accountId, page, totalPage)

		pageData, err := s.fetchPage(ctx, accessToken, accountId, page, pageSize)
		if err != nil {
			logrus.Errorf("账户 %d - 获取第 %d 页失败: %v, 继续下一页", accountId, page, err)
			continue
		}

		synced, err := s.savePageData(ctx, pageData, accountId)
		if err != nil {
			logrus.Errorf("账户 %d - 保存第 %d 页数据失败: %v", accountId, page, err)
			continue
		}
		totalSynced += synced
	}

	logrus.Infof("账户 %d - 同步完成,共 %d 条记录", accountId, totalSynced)
	return totalSynced, nil
}
// fetchPage requests a single page of videos for accountId from the
// videos/get endpoint and decodes the JSON body into a videoResponse.
//
// The request is filtered to videos whose status equals ADSTATUS_NORMAL.
// All query parameters, including the access token, are URL-escaped.
//
// Errors are returned when the request cannot be built or sent, the body
// cannot be read or parsed, the HTTP status is not 200, or the decoded
// response carries a non-zero code.
func (s *videoService) fetchPage(ctx context.Context, accessToken string, accountId int64, page, pageSize int) (*videoResponse, error) {
	const endpoint = "https://api.e.qq.com/v3.0/videos/get"
	// Filter expression: status must equal ADSTATUS_NORMAL.
	const filtering = `[{"field":"status","operator":"EQUALS","values":["ADSTATUS_NORMAL"]}]`

	// Take the timestamp right before sending so it is not stale, and derive
	// the nonce from the same instant: seconds + last six digits of the
	// nanosecond clock + a random number below 1000 (at most 19 characters,
	// within the 32-character limit noted by the original author).
	now := time.Now()
	timestamp := now.Unix()
	nonce := fmt.Sprintf("%d%06d%d", timestamp, now.UnixNano()%1000000, rand.Intn(1000))

	query := url.Values{}
	query.Set("nonce", nonce)
	query.Set("timestamp", fmt.Sprint(timestamp))
	query.Set("account_id", fmt.Sprint(accountId))
	query.Set("filtering", filtering)
	query.Set("page", fmt.Sprint(page))
	query.Set("page_size", fmt.Sprint(pageSize))

	// Log the request before the token is added so the credential never
	// reaches the logs.
	logrus.Debugf("请求URL: %s?%s", endpoint, query.Encode())

	query.Set("access_token", accessToken)
	urlStr := endpoint + "?" + query.Encode()

	httpReq, err := http.NewRequestWithContext(ctx, http.MethodGet, urlStr, nil)
	if err != nil {
		return nil, fmt.Errorf("创建请求失败: %w", err)
	}

	client := &http.Client{Timeout: 30 * time.Second}
	resp, err := client.Do(httpReq)
	if err != nil {
		return nil, fmt.Errorf("请求失败: %w", err)
	}
	defer resp.Body.Close()

	body, err := io.ReadAll(resp.Body)
	if err != nil {
		return nil, fmt.Errorf("读取响应失败: %w", err)
	}
	logrus.Debugf("API响应: %s", string(body))

	var result videoResponse
	if err := json.Unmarshal(body, &result); err != nil {
		// A non-200 reply with a non-JSON body (e.g. a gateway error page) is
		// a request failure, not a parsing problem.
		if resp.StatusCode != http.StatusOK {
			return nil, fmt.Errorf("请求失败: HTTP状态码 %d", resp.StatusCode)
		}
		return nil, fmt.Errorf("解析响应失败: %w", err)
	}

	// Prefer the API's own error details when the body carries them.
	if result.Code != 0 {
		return nil, fmt.Errorf("API错误: code=%d, message=%s", result.Code, result.Message)
	}
	if resp.StatusCode != http.StatusOK {
		return nil, fmt.Errorf("请求失败: HTTP状态码 %d", resp.StatusCode)
	}

	return &result, nil
}
// savePageData converts every entry of one decoded API page into an
// entity.Video and hands the batch to dao.Video.BatchUpsert.
//
// An empty page returns (0, nil) without calling the DAO. Otherwise the
// success count and error reported by BatchUpsert are returned as-is.
func (s *videoService) savePageData(ctx context.Context, data *videoResponse, accountId int64) (int, error) {
	entries := data.Data.List
	count := len(entries)
	if count == 0 {
		return 0, nil
	}

	logrus.Infof("准备保存 %d 条视频素材数据", count)

	batch := make([]*entity.Video, 0, count)
	for i := range entries {
		src := &entries[i]

		row := &entity.Video{
			VideoId:                  fmt.Sprintf("%d", src.VideoId),
			AccountId:                accountId,
			Width:                    src.Width,
			Height:                   src.Height,
			VideoFrames:              src.VideoFrames,
			VideoFps:                 src.VideoFps,
			VideoCodec:               src.VideoCodec,
			VideoBitRate:             src.VideoBitRate,
			AudioCodec:               src.AudioCodec,
			AudioBitRate:             src.AudioBitRate,
			FileSize:                 src.FileSize,
			Type:                     src.Type,
			Signature:                src.Signature,
			SystemStatus:             src.SystemStatus,
			Description:              src.Description,
			PreviewUrl:               src.PreviewUrl,
			KeyFrameImageUrl:         src.KeyFrameImageUrl,
			CreatedTime:              src.CreatedTime,
			LastModifiedTime:         src.LastModifiedTime,
			VideoProfileName:         src.VideoProfileName,
			AudioSampleRate:          src.AudioSampleRate,
			MaxKeyframeInterval:      src.MaxKeyframeInterval,
			MinKeyframeInterval:      src.MinKeyframeInterval,
			SampleAspectRatio:        src.SampleAspectRatio,
			AudioProfileName:         src.AudioProfileName,
			ScanType:                 src.ScanType,
			ImageDurationMillisecond: src.ImageDurationMs,
			AudioDurationMillisecond: src.AudioDurationMs,
			SourceType:               src.SourceType,
			ProductCatalogId:         src.ProductCatalogId,
			ProductOuterId:           src.ProductOuterId,
			SourceReferenceId:        src.SourceReferenceId,
			OwnerAccountId:           src.OwnerAccountId,
			Status:                   src.Status,
			SourceMaterialId:         src.SourceMaterialId,
			NewSourceType:            src.NewSourceType,
			AigcType:                 src.AigcType,
			FirstPublicationStatus:   src.FirstPublicationStatus,
			QualityStatus:            src.QualityStatus,
			CoverId:                  src.CoverId,
			SimilarityStatus:         src.SimilarityStatus,
			UserAigcStatus:           src.UserAigcStatus,
			SystemAigcStatus:         src.SystemAigcStatus,
			AigcSource:               src.AigcSource,
			AigcFlag:                 src.AigcFlag,
			MuseAigcVersion:          src.MuseAigcVersion,
		}
		// Tenant is fixed to 1; per the original note the framework treats 0
		// as an empty value.
		row.TenantId = 1
		// Every synced row starts in the pending-verification state.
		row.VerifyStatus = "PENDING"

		batch = append(batch, row)
	}

	logrus.Infof("调用 BatchUpsert...")
	saved, upsertErr := dao.Video.BatchUpsert(ctx, batch)
	logrus.Infof("BatchUpsert 返回: successCount=%d, err=%v", saved, upsertErr)
	return saved, upsertErr
}
// ListAll returns whatever dao.Video.ListAll yields for ctx, passing both
// the slice and the error through unchanged.
func (s *videoService) ListAll(ctx context.Context) ([]entity.Video, error) {
	videos, err := dao.Video.ListAll(ctx)
	return videos, err
}
// ListWithPage queries one page of video assets and converts the rows into
// dto.VideoItem values.
//
// Page defaults to 1 when it is not positive. PageSize defaults to 20 when it
// is not positive and is capped at 100. The account, start/end time and
// status values from req are passed through to dao.Video.ListWithPage. A nil
// req is treated as an empty query.
//
// Returns the page of items together with the total count and the derived
// number of pages, or a wrapped error when the DAO query fails.
func (s *videoService) ListWithPage(ctx context.Context, req *dto.ListVideoQueryReq) (*dto.ListVideoRes, error) {
	// Guard against a nil request so the defaults below still apply.
	if req == nil {
		req = &dto.ListVideoQueryReq{}
	}

	page := req.Page
	if page <= 0 {
		page = 1
	}

	pageSize := req.PageSize
	switch {
	case pageSize <= 0:
		pageSize = 20
	case pageSize > 100:
		pageSize = 100 // upper bound on rows per page
	}

	list, total, err := dao.Video.ListWithPage(ctx, page, pageSize, req.AccountId, req.StartTime, req.EndTime, req.Status)
	if err != nil {
		return nil, fmt.Errorf("查询视频素材失败: %w", err)
	}

	// Ceiling division. pageSize is always within 1..100 here, so any
	// positive total already yields at least one page.
	totalPages := (total + pageSize - 1) / pageSize

	items := make([]dto.VideoItem, 0, len(list))
	for _, item := range list {
		items = append(items, dto.VideoItem{
			Id:               item.Id,
			VideoId:          item.VideoId,
			AccountId:        item.AccountId,
			Width:            item.Width,
			Height:           item.Height,
			VideoFrames:      item.VideoFrames,
			VideoFps:         item.VideoFps,
			FileSize:         item.FileSize,
			Type:             item.Type,
			Description:      item.Description,
			PreviewUrl:       item.PreviewUrl,
			KeyFrameImageUrl: item.KeyFrameImageUrl,
			Status:           item.Status,
			CreatedTime:      item.CreatedTime,
			LastModifiedTime: item.LastModifiedTime,
			CreatedAt:        item.CreatedAt.Format("2006-01-02 15:04:05"),
			UpdatedAt:        item.UpdatedAt.Format("2006-01-02 15:04:05"),
		})
	}

	res := &dto.ListVideoRes{
		List:       items,
		Total:      total,
		Page:       page,
		PageSize:   pageSize,
		TotalPages: totalPages,
	}
	logrus.Infof("查询视频素材 - 页码: %d, 每页: %d, 总数: %d, 总页数: %d", page, pageSize, total, totalPages)
	return res, nil
}
// contains reports whether substr occurs anywhere within s.
// An empty substr is contained in every string, including the empty string.
func contains(s, substr string) bool {
	return strings.Contains(s, substr)
}
// findSubstring reports whether substr occurs anywhere within s.
// It returns false when substr is longer than s and true when substr is empty.
func findSubstring(s, substr string) bool {
	return strings.Contains(s, substr)
}