Files
cid/service/dataengine/material_verify_service.go

797 lines
29 KiB
Go
Raw Normal View History

2026-05-15 10:28:17 +08:00
package dataengine
import (
consts "cid/consts/dataengine"
dao "cid/dao/dataengine"
entity "cid/model/entity/dataengine"
yidunService "cid/service/yidun"
"context"
"encoding/json"
"fmt"
"time"
"github.com/gogf/gf/v2/frame/g"
)
// 轮询配置常量
const (
// PollBatchSize 每次轮询处理数量
PollBatchSize = 20
)
// 状态常量
const (
// 原表状态 - 与 tencent_image/tencent_video 表的 status 字段对应
StatusSubmitting = consts.CheckStatusSubmitting // 送检中
)
// MaterialVerifyService 素材校验服务
type MaterialVerifyService struct{}
// MaterialVerify 校验服务单例
var MaterialVerify = new(MaterialVerifyService)
// =============================================================================
// 校验状态转换
// =============================================================================
// SuggestionToVerifyStatus 根据易盾处置建议转换为校验状态
func SuggestionToVerifyStatus(suggestion int) string {
switch suggestion {
case consts.SuggestionPass:
return entity.VerifyStatusVerified // 通过
case consts.SuggestionReview:
return entity.VerifyStatusPending // 嫌疑,需要人工审核,暂不更新状态
case consts.SuggestionBlock:
return entity.VerifyStatusRejected // 不通过
default:
return entity.VerifyStatusPending
}
}
// =============================================================================
// 图片校验
// =============================================================================
// VerifyImageByID 根据图片ID执行校验
func (s *MaterialVerifyService) VerifyImageByID(ctx context.Context, imageID string) (*entity.MaterialVerifyLog, error) {
// 1. 获取图片数据
image, err := dao.TencentImage.GetByImageID(ctx, imageID)
if err != nil {
return nil, fmt.Errorf("查询图片数据失败: %w", err)
}
if image == nil {
return nil, fmt.Errorf("未找到图片数据, imageID=%s", imageID)
}
// 2. 创建校验日志
log := s.createVerifyLog(ctx, entity.MaterialTypeImage, imageID, consts.SourceTableTencentImage, image.Id, image.AccountID)
if log == nil {
return nil, fmt.Errorf("创建校验日志失败")
}
// 3. 执行校验
err = s.submitImageCheck(ctx, image, log)
if err != nil {
return nil, err
}
return log, nil
}
// submitImageCheck 提交图片校验
func (s *MaterialVerifyService) submitImageCheck(ctx context.Context, image *entity.TencentImage, log *entity.MaterialVerifyLog) error {
startTime := time.Now()
// 获取回调模式开关
callbackMode := g.Cfg().MustGet(ctx, "yidun.callback_mode").Bool()
// 构建请求参数
requestParams := map[string]interface{}{
"imageURL": image.PreviewURL,
"dataID": image.ImageID,
}
requestParamsJSON, _ := json.Marshal(requestParams)
var (
taskID string
duration int64
)
if callbackMode {
// 回调模式:使用异步检测,易盾处理完成后会回调
callbackURL := g.Cfg().MustGet(ctx, "yidun.image.callback_url").String()
requestParams["callbackURL"] = callbackURL
result, err := yidunService.ImageDetection.DetectImage(ctx, image.PreviewURL, image.ImageID, callbackURL)
duration = time.Since(startTime).Milliseconds()
if err != nil {
dao.MaterialVerifyLog.UpdateError(ctx, log.Id, entity.VerifyStatusPending, err.Error())
dao.MaterialVerifyLog.UpdateDuration(ctx, log.Id, duration)
g.Log().Warningf(ctx, "图片异步检测失败(保持待检验), id=%d, imageId=%s, error=%v", image.Id, image.ImageID, err)
return fmt.Errorf("图片异步检测失败: %w", err)
}
taskID = result.TaskID
// 保存任务ID和请求参数
dao.MaterialVerifyLog.UpdateTaskID(ctx, log.Id, taskID)
dao.MaterialVerifyLog.UpdateRequestParams(ctx, log.Id, string(requestParamsJSON))
// 更新原表状态为 submitting等待回调
s.updateImageStatus(ctx, image.Id, StatusSubmitting)
g.Log().Infof(ctx, "图片异步检测已提交, id=%d, imageId=%s, taskId=%s, duration=%dms",
image.Id, image.ImageID, taskID, duration)
} else {
// 轮询模式:使用同步检测,直接返回结果
syncResult, err := yidunService.ImageDetection.DetectImageSync(ctx, image.PreviewURL, image.ImageID)
duration = time.Since(startTime).Milliseconds()
if err != nil {
dao.MaterialVerifyLog.UpdateError(ctx, log.Id, entity.VerifyStatusPending, err.Error())
dao.MaterialVerifyLog.UpdateDuration(ctx, log.Id, duration)
g.Log().Warningf(ctx, "图片同步检测失败(保持待检验), id=%d, imageId=%s, error=%v", image.Id, image.ImageID, err)
return fmt.Errorf("图片同步检测失败: %w", err)
}
taskID = syncResult.TaskID
// 保存任务ID和请求参数
dao.MaterialVerifyLog.UpdateTaskID(ctx, log.Id, taskID)
dao.MaterialVerifyLog.UpdateRequestParams(ctx, log.Id, string(requestParamsJSON))
// 根据同步结果更新状态
verifyStatus := SuggestionToVerifyStatus(syncResult.Suggestion)
responseJSON, _ := json.Marshal(syncResult)
dao.MaterialVerifyLog.UpdateVerifyResult(ctx, log.Id, verifyStatus,
syncResult.Suggestion, syncResult.Label, syncResult.ResultType, string(responseJSON), syncResult.CensorTime)
s.updateImageStatus(ctx, image.Id, verifyStatus)
g.Log().Infof(ctx, "图片同步检测完成, id=%d, imageId=%s, taskId=%s, suggestion=%d, verifyStatus=%s, duration=%dms",
image.Id, image.ImageID, taskID, syncResult.Suggestion, verifyStatus, duration)
}
return nil
}
// =============================================================================
// 视频校验
// =============================================================================
// VerifyVideoByID 根据视频ID执行校验
func (s *MaterialVerifyService) VerifyVideoByID(ctx context.Context, videoID string) (*entity.MaterialVerifyLog, error) {
// 1. 获取视频数据
video, err := dao.TencentVideo.GetByVideoID(ctx, videoID)
if err != nil {
return nil, fmt.Errorf("查询视频数据失败: %w", err)
}
if video == nil {
return nil, fmt.Errorf("未找到视频数据, videoID=%s", videoID)
}
// 2. 创建校验日志
log := s.createVerifyLog(ctx, entity.MaterialTypeVideo, videoID, consts.SourceTableTencentVideo, video.Id, video.AccountID)
if log == nil {
return nil, fmt.Errorf("创建校验日志失败")
}
// 3. 执行校验
err = s.submitVideoCheck(ctx, video, log)
if err != nil {
return nil, err
}
return log, nil
}
// submitVideoCheck 提交视频校验
func (s *MaterialVerifyService) submitVideoCheck(ctx context.Context, video *entity.TencentVideo, log *entity.MaterialVerifyLog) error {
startTime := time.Now()
// 获取回调模式开关
callbackMode := g.Cfg().MustGet(ctx, "yidun.callback_mode").Bool()
// 根据开关决定回调地址
var callbackURL string
if callbackMode {
callbackURL = g.Cfg().MustGet(ctx, "yidun.video.callback_url").String()
}
// 构建请求参数
requestParams := map[string]interface{}{
"videoURL": video.PreviewURL,
"dataID": video.VideoID,
"callbackURL": callbackURL,
}
requestParamsJSON, _ := json.Marshal(requestParams)
// 调用易盾视频检测
result, err := yidunService.VideoDetection.DetectVideo(ctx, video.PreviewURL, video.VideoID, callbackURL)
duration := time.Since(startTime).Milliseconds()
if err != nil {
// 调用易盾接口失败(如额度用光、网络错误、超时等),不更新状态,保持待检验
// 只有易盾明确返回检测结果且suggestion=BLOCK时才标记为失败
dao.MaterialVerifyLog.UpdateError(ctx, log.Id, entity.VerifyStatusPending, err.Error())
dao.MaterialVerifyLog.UpdateDuration(ctx, log.Id, duration)
g.Log().Warningf(ctx, "视频校验接口调用失败(保持待检验), id=%d, videoId=%s, error=%v", video.Id, video.VideoID, err)
return fmt.Errorf("视频校验调用失败: %w", err)
}
// 保存任务ID和请求参数
dao.MaterialVerifyLog.UpdateTaskID(ctx, log.Id, result.TaskID)
dao.MaterialVerifyLog.UpdateRequestParams(ctx, log.Id, string(requestParamsJSON))
// 更新原表状态为 submitting
s.updateVideoStatus(ctx, video.Id, StatusSubmitting)
// 轮询模式(无回调):提交后立即尝试查询检测结果
if !callbackMode {
g.Log().Infof(ctx, "轮询模式:提交后立即查询结果, taskId=%s", result.TaskID)
// 等待500ms让易盾有时间处理
time.Sleep(500 * time.Millisecond)
if err := s.ProcessVideoResultByTask(ctx, result.TaskID); err != nil {
g.Log().Warningf(ctx, "提交后立即查询结果失败(不影响状态,后续轮询继续), taskId=%s, error=%v", result.TaskID, err)
}
}
g.Log().Infof(ctx, "视频校验已提交, id=%d, videoId=%s, taskId=%s, duration=%dms",
video.Id, video.VideoID, result.TaskID, duration)
return nil
}
// =============================================================================
// 回调处理
// =============================================================================
// ProcessImageCallback 处理图片校验回调
func (s *MaterialVerifyService) ProcessImageCallback(ctx context.Context, callbackData string) error {
g.Log().Infof(ctx, "处理图片校验回调, data: %s", callbackData)
var callback yidunService.ImageCallbackData
if err := json.Unmarshal([]byte(callbackData), &callback); err != nil {
g.Log().Errorf(ctx, "解析图片回调数据失败: %v", err)
return fmt.Errorf("解析回调数据失败: %w", err)
}
if callback.Antispam == nil {
return fmt.Errorf("回调数据格式错误缺少antispam字段")
}
antispam := callback.Antispam
g.Log().Infof(ctx, "处理图片校验结果 - taskId: %s, suggestion: %d, resultType: %d",
antispam.TaskId, antispam.Suggestion, antispam.ResultType)
// 根据 taskId 查找校验日志
log, err := dao.MaterialVerifyLog.GetByTaskID(ctx, antispam.TaskId)
if err != nil {
return fmt.Errorf("查找校验日志失败: %w", err)
}
if log == nil {
g.Log().Warningf(ctx, "未找到校验日志, taskId=%s", antispam.TaskId)
return nil
}
// 构建响应结果
responseResult := callbackData
// 根据 suggestion 确定校验状态
verifyStatus := SuggestionToVerifyStatus(antispam.Suggestion)
// 更新日志
err = dao.MaterialVerifyLog.UpdateVerifyResult(ctx, log.Id, verifyStatus,
antispam.Suggestion, antispam.Label, antispam.ResultType, responseResult, antispam.CensorTime)
if err != nil {
return fmt.Errorf("更新校验日志失败: %w", err)
}
// 更新原表状态(图片回调只处理图片来源)
if log.SourceTable == consts.SourceTableTencentImage {
s.updateImageStatus(ctx, log.SourceID, verifyStatus)
}
g.Log().Infof(ctx, "图片校验回调处理完成, taskId=%s, verifyStatus=%s, suggestion=%d",
antispam.TaskId, verifyStatus, antispam.Suggestion)
return nil
}
// ProcessVideoCallback 处理视频校验回调
func (s *MaterialVerifyService) ProcessVideoCallback(ctx context.Context, callbackData string) error {
g.Log().Infof(ctx, "处理视频校验回调, data: %s", callbackData)
var callback yidunService.VideoCallbackData
if err := json.Unmarshal([]byte(callbackData), &callback); err != nil {
g.Log().Errorf(ctx, "解析视频回调数据失败: %v", err)
return fmt.Errorf("解析回调数据失败: %w", err)
}
if callback.Antispam == nil {
return fmt.Errorf("视频回调数据格式错误缺少antispam字段")
}
antispam := callback.Antispam
g.Log().Infof(ctx, "处理视频校验结果 - taskId: %s, suggestion: %d, resultType: %d",
antispam.TaskID, antispam.Suggestion, antispam.ResultType)
// 根据 taskId 查找校验日志
log, err := dao.MaterialVerifyLog.GetByTaskID(ctx, antispam.TaskID)
if err != nil {
return fmt.Errorf("查找校验日志失败: %w", err)
}
if log == nil {
g.Log().Warningf(ctx, "未找到校验日志, taskId=%s", antispam.TaskID)
return nil
}
// 构建响应结果
responseResult := callbackData
// 根据 suggestion 确定校验状态
verifyStatus := SuggestionToVerifyStatus(antispam.Suggestion)
// 审核时间
checkTime := antispam.CensorTime
if checkTime == 0 {
checkTime = antispam.CheckTime
}
// 更新日志
err = dao.MaterialVerifyLog.UpdateVerifyResult(ctx, log.Id, verifyStatus,
antispam.Suggestion, antispam.Label, antispam.ResultType, responseResult, checkTime)
if err != nil {
return fmt.Errorf("更新校验日志失败: %w", err)
}
// 更新原表状态(视频回调只处理视频来源)
if log.SourceTable == consts.SourceTableTencentVideo {
s.updateVideoStatus(ctx, log.SourceID, verifyStatus)
}
g.Log().Infof(ctx, "视频校验回调处理完成, taskId=%s, verifyStatus=%s, suggestion=%d",
antispam.TaskID, verifyStatus, antispam.Suggestion)
return nil
}
// =============================================================================
// 轮询模式处理
// =============================================================================
// 易盾检测状态常量
const (
YidunStatusNotStart = 0 // 未开始
YidunStatusProcessing = 1 // 检测中
YidunStatusSuccess = 2 // 检测成功
YidunStatusFailed = 3 // 检测失败
)
// ProcessImageResultByTask 根据任务ID处理图片结果轮询模式
func (s *MaterialVerifyService) ProcessImageResultByTask(ctx context.Context, taskID string) error {
log, err := dao.MaterialVerifyLog.GetByTaskID(ctx, taskID)
if err != nil || log == nil {
return fmt.Errorf("未找到校验日志, taskId=%s", taskID)
}
result, err := yidunService.ImageDetection.GetImageResult(ctx, taskID)
if err != nil {
// 判断是否是未找到结果或仍在检测中的错误
if err == yidunService.ErrImageResultNotFound || err == yidunService.ErrImageStillProcessing {
// 未获取到结果(任务不存在或仍在处理),不更新状态,保持等待下次轮询
g.Log().Infof(ctx, "图片检测结果未就绪, taskId=%s, 保持pending状态, err=%v", taskID, err)
return nil
}
// 其他错误如额度用光、网络错误、API错误等不更新状态保持待检验
// 只有易盾明确返回suggestion=BLOCK时才标记为失败
dao.MaterialVerifyLog.UpdateError(ctx, log.Id, entity.VerifyStatusPending, err.Error())
g.Log().Warningf(ctx, "图片检测查询失败(保持待检验), taskId=%s, error=%v", taskID, err)
return nil // 返回nil避免日志被反复处理但保持pending状态
}
// 判断检测状态
if result.Status == YidunStatusProcessing || result.Status == YidunStatusNotStart {
// 检测仍在进行中保持pending状态
g.Log().Infof(ctx, "图片检测仍在进行中, taskId=%s, status=%d, 保持pending状态", taskID, result.Status)
return nil
}
if result.Status == YidunStatusFailed {
// 易盾检测失败(如额度用光、服务端错误等),不更新状态,保持待检验
// 只有易盾明确返回suggestion=BLOCK时才标记为失败
errMsg := fmt.Sprintf("易盾检测失败, status=%d", result.Status)
dao.MaterialVerifyLog.UpdateError(ctx, log.Id, entity.VerifyStatusPending, errMsg)
g.Log().Warningf(ctx, "图片检测失败(保持待检验), taskId=%s, status=%d", taskID, result.Status)
return nil
}
// status == YidunStatusSuccess检测成功根据suggestion更新状态
verifyStatus := SuggestionToVerifyStatus(result.Suggestion)
responseJSON, _ := json.Marshal(result)
dao.MaterialVerifyLog.UpdateVerifyResult(ctx, log.Id, verifyStatus,
result.Suggestion, result.Label, result.ResultType, string(responseJSON), result.CensorTime)
if log.SourceTable == consts.SourceTableTencentImage {
s.updateImageStatus(ctx, log.SourceID, verifyStatus)
}
g.Log().Infof(ctx, "图片检测结果更新成功, taskId=%s, status=%d, suggestion=%d, verifyStatus=%s",
taskID, result.Status, result.Suggestion, verifyStatus)
return nil
}
// ProcessVideoResultByTask 根据任务ID处理视频结果轮询模式
func (s *MaterialVerifyService) ProcessVideoResultByTask(ctx context.Context, taskID string) error {
log, err := dao.MaterialVerifyLog.GetByTaskID(ctx, taskID)
if err != nil || log == nil {
return fmt.Errorf("未找到校验日志, taskId=%s", taskID)
}
result, err := yidunService.VideoDetection.GetVideoResult(ctx, taskID)
if err != nil {
// 判断是否是未找到结果或仍在检测中的错误
if err == yidunService.ErrVideoResultNotFound || err == yidunService.ErrVideoStillProcessing {
// 未获取到结果(任务不存在或仍在处理),不更新状态,保持等待下次轮询
g.Log().Infof(ctx, "视频检测结果未就绪, taskId=%s, 保持pending状态, err=%v", taskID, err)
return nil
}
// 其他错误如额度用光、网络错误、API错误等不更新状态保持待检验
// 只有易盾明确返回suggestion=BLOCK时才标记为失败
dao.MaterialVerifyLog.UpdateError(ctx, log.Id, entity.VerifyStatusPending, err.Error())
g.Log().Warningf(ctx, "视频检测查询失败(保持待检验), taskId=%s, error=%v", taskID, err)
return nil // 返回nil避免日志被反复处理但保持pending状态
}
// 判断检测状态
if result.Status == YidunStatusProcessing || result.Status == YidunStatusNotStart {
// 检测仍在进行中保持pending状态
g.Log().Infof(ctx, "视频检测仍在进行中, taskId=%s, status=%d, 保持pending状态", taskID, result.Status)
return nil
}
if result.Status == YidunStatusFailed {
// 易盾检测失败(如额度用光、服务端错误等),不更新状态,保持待检验
// 只有易盾明确返回suggestion=BLOCK时才标记为失败
errMsg := fmt.Sprintf("易盾检测失败, status=%d", result.Status)
dao.MaterialVerifyLog.UpdateError(ctx, log.Id, entity.VerifyStatusPending, errMsg)
g.Log().Warningf(ctx, "视频检测失败(保持待检验), taskId=%s, status=%d", taskID, result.Status)
return nil
}
// status == YidunStatusSuccess检测成功根据suggestion更新状态
verifyStatus := SuggestionToVerifyStatus(result.Suggestion)
responseJSON, _ := json.Marshal(result)
dao.MaterialVerifyLog.UpdateVerifyResult(ctx, log.Id, verifyStatus,
result.Suggestion, result.Label, result.ResultType, string(responseJSON), result.CensorTime)
if log.SourceTable == consts.SourceTableTencentVideo {
s.updateVideoStatus(ctx, log.SourceID, verifyStatus)
}
g.Log().Infof(ctx, "视频检测结果更新成功, taskId=%s, status=%d, suggestion=%d, verifyStatus=%s",
taskID, result.Status, result.Suggestion, verifyStatus)
return nil
}
// =============================================================================
// 辅助方法
// =============================================================================
// createVerifyLog 创建校验日志
func (s *MaterialVerifyService) createVerifyLog(ctx context.Context, materialType, materialID, sourceTable string, sourceID, accountID int64) *entity.MaterialVerifyLog {
log := &entity.MaterialVerifyLog{
TenantID: 0,
MaterialType: materialType,
MaterialID: materialID,
SourceTable: sourceTable,
SourceID: sourceID,
AccountID: accountID,
VerifyStatus: entity.VerifyStatusPending,
}
id, err := dao.MaterialVerifyLog.Create(ctx, log)
if err != nil {
g.Log().Errorf(ctx, "创建校验日志失败: %v", err)
return nil
}
log.Id = id
return log
}
// updateImageStatus 更新图片状态
func (s *MaterialVerifyService) updateImageStatus(ctx context.Context, imageID int64, verifyStatus string) {
_, err := dao.TencentImage.UpdateStatus(ctx, imageID, verifyStatus)
if err != nil {
g.Log().Errorf(ctx, "更新图片状态失败: %v", err)
} else {
g.Log().Infof(ctx, "更新图片状态成功, imageID=%d, status=%s", imageID, verifyStatus)
}
}
// updateVideoStatus 更新视频状态
func (s *MaterialVerifyService) updateVideoStatus(ctx context.Context, videoID int64, verifyStatus string) {
_, err := dao.TencentVideo.UpdateStatus(ctx, videoID, verifyStatus)
if err != nil {
g.Log().Errorf(ctx, "更新视频状态失败: %v", err)
} else {
g.Log().Infof(ctx, "更新视频状态成功, videoID=%d, status=%s", videoID, verifyStatus)
}
}
// =============================================================================
// 查询接口
// =============================================================================
// GetLogByID 根据ID获取日志
func (s *MaterialVerifyService) GetLogByID(ctx context.Context, id int64) (*entity.MaterialVerifyLog, error) {
return dao.MaterialVerifyLog.GetByID(ctx, id)
}
// GetLogsByMaterialID 根据素材ID获取日志列表
func (s *MaterialVerifyService) GetLogsByMaterialID(ctx context.Context, materialID string) ([]entity.MaterialVerifyLog, error) {
return dao.MaterialVerifyLog.GetByMaterialID(ctx, materialID)
}
// GetLogsByCondition 条件查询日志
func (s *MaterialVerifyService) GetLogsByCondition(ctx context.Context, condition map[string]interface{}, page, pageSize int) ([]entity.MaterialVerifyLog, int, error) {
return dao.MaterialVerifyLog.GetByCondition(ctx, condition, page, pageSize)
}
// GetStats 获取统计信息
func (s *MaterialVerifyService) GetStats(ctx context.Context) (map[string]int, error) {
return dao.MaterialVerifyLog.GetStats(ctx)
}
// =============================================================================
// 轮询模式 - 批量查询检测结果
// =============================================================================
// PollPendingResults 轮询所有待查询结果的日志(手动触发)
// 返回处理成功的数量和错误信息
func (s *MaterialVerifyService) PollPendingResults(ctx context.Context) (int, int, error) {
// 获取待查询的日志
logs, err := dao.MaterialVerifyLog.GetPendingResults(ctx, PollBatchSize)
if err != nil {
return 0, 0, err
}
if len(logs) == 0 {
g.Log().Infof(ctx, "没有待查询结果的日志")
return 0, 0, nil
}
g.Log().Infof(ctx, "开始轮询 %d 条待处理结果", len(logs))
successCount := 0
failCount := 0
var lastErr error
for _, log := range logs {
var err error
// 根据来源表判断调用哪个接口
if log.SourceTable == consts.SourceTableTencentImage {
err = s.ProcessImageResultByTask(ctx, log.TaskID)
} else if log.SourceTable == consts.SourceTableTencentVideo {
err = s.ProcessVideoResultByTask(ctx, log.TaskID)
} else {
g.Log().Warningf(ctx, "未知的来源表: %s, logId=%d", log.SourceTable, log.Id)
continue
}
if err != nil {
failCount++
lastErr = err
g.Log().Warningf(ctx, "处理结果失败, logId=%d, taskId=%s, error=%v", log.Id, log.TaskID, err)
} else {
successCount++
g.Log().Infof(ctx, "处理结果成功, logId=%d, taskId=%s", log.Id, log.TaskID)
}
// 避免请求过快
time.Sleep(100 * time.Millisecond)
}
g.Log().Infof(ctx, "轮询完成, 成功=%d, 失败=%d", successCount, failCount)
return successCount, failCount, lastErr
}
// PollPendingResultsByType 按类型轮询待查询结果的日志
func (s *MaterialVerifyService) PollPendingResultsByType(ctx context.Context, sourceTable string) (int, int, error) {
// 获取待查询的日志
logs, err := dao.MaterialVerifyLog.GetPendingResults(ctx, PollBatchSize)
if err != nil {
return 0, 0, err
}
// 过滤指定类型
var filteredLogs []entity.MaterialVerifyLog
for _, log := range logs {
if log.SourceTable == sourceTable {
filteredLogs = append(filteredLogs, log)
}
}
if len(filteredLogs) == 0 {
g.Log().Infof(ctx, "没有待查询结果的日志, sourceTable=%s", sourceTable)
return 0, 0, nil
}
g.Log().Infof(ctx, "开始轮询 %d 条待处理结果, sourceTable=%s", len(filteredLogs), sourceTable)
successCount := 0
failCount := 0
var lastErr error
for _, log := range filteredLogs {
var err error
if sourceTable == consts.SourceTableTencentImage {
err = s.ProcessImageResultByTask(ctx, log.TaskID)
} else if sourceTable == consts.SourceTableTencentVideo {
err = s.ProcessVideoResultByTask(ctx, log.TaskID)
}
if err != nil {
failCount++
lastErr = err
} else {
successCount++
}
time.Sleep(100 * time.Millisecond)
}
g.Log().Infof(ctx, "轮询完成, sourceTable=%s, 成功=%d, 失败=%d", sourceTable, successCount, failCount)
return successCount, failCount, lastErr
}
// PollPendingImageResults 轮询图片待查询结果
func (s *MaterialVerifyService) PollPendingImageResults(ctx context.Context) (int, int, error) {
return s.PollPendingResultsByType(ctx, consts.SourceTableTencentImage)
}
// PollPendingVideoResults 轮询视频待查询结果
func (s *MaterialVerifyService) PollPendingVideoResults(ctx context.Context) (int, int, error) {
return s.PollPendingResultsByType(ctx, consts.SourceTableTencentVideo)
}
// =============================================================================
// 导出服务 - 不通过数据导出
// =============================================================================
// ExportRejectedItem 导出的不通过数据项
type ExportRejectedItem struct {
ID int64 `json:"id"` // 素材表主键ID
MaterialID string `json:"materialId"` // 素材IDimageId/videoId
AccountID int64 `json:"accountId"` // 账户ID
CorporationName string `json:"corporationName"` // 公司名称
PreviewURL string `json:"previewUrl"` // 预览URL
Description string `json:"description"` // 描述
ErrorMsg string `json:"errorMsg"` // 失败原因最后一条失败日志的error_msg
MaterialType string `json:"materialType"` // 素材类型 IMAGE/VIDEO
ImageUsage string `json:"imageUsage"` // 图片用途(仅图片)
CreatedAt string `json:"createdAt"` // 检测时间(日志创建时间)
}
// getFailureReason 获取失败原因
func getFailureReason(log *entity.MaterialVerifyLog) string {
if log == nil {
return "无校验日志"
}
// 优先使用 error_msg
if log.ErrorMsg != "" {
return log.ErrorMsg
}
// 根据 suggestion 和 label 生成原因
reasonMap := map[int]string{
0: "内容检测通过",
1: "内容嫌疑(需人工审核)",
2: "内容不通过",
}
suggestionText := reasonMap[log.Suggestion]
if suggestionText == "" {
suggestionText = fmt.Sprintf("未知(suggestion=%d)", log.Suggestion)
}
// 如果有 response_result尝试提取更多信息
if log.ResponseResult != "" {
var resultMap map[string]interface{}
if err := json.Unmarshal([]byte(log.ResponseResult), &resultMap); err == nil {
if labels, ok := resultMap["labels"]; ok {
return fmt.Sprintf("%s (labels: %v)", suggestionText, labels)
}
}
return suggestionText
}
return suggestionText
}
// ExportRejectedData 导出不通过数据
func (s *MaterialVerifyService) ExportRejectedData(ctx context.Context, materialType string) ([]ExportRejectedItem, error) {
var items []ExportRejectedItem
// 加载账户名称映射
accountMap := make(map[int64]string)
if accounts, err := dao.TencentAccountRelation.GetAll(ctx); err == nil {
for _, acc := range accounts {
if acc.CorporationName != "" {
accountMap[acc.AccountID] = acc.CorporationName
}
}
}
// 处理图片
if materialType == "" || materialType == entity.MaterialTypeImage {
condition := map[string]interface{}{
entity.TencentImageCols.VerifyStatus: entity.VerifyStatusRejected,
}
images, total, err := dao.TencentImage.GetByCondition(ctx, condition, 1, 100000)
if err != nil {
g.Log().Errorf(ctx, "查询不通过图片失败: %v", err)
return nil, fmt.Errorf("查询不通过图片失败: %w", err)
}
g.Log().Infof(ctx, "导出不通过图片: total=%d, got=%d", total, len(images))
for _, img := range images {
// 查询最后一条失败的校验日志
log, _ := dao.MaterialVerifyLog.GetLastRejectedLogByMaterialID(ctx, img.ImageID, entity.VerifyStatusRejected)
var createdAtStr string
if log != nil && log.CreatedAt != nil {
createdAtStr = log.CreatedAt.Format("Y-m-d H:i:s")
}
items = append(items, ExportRejectedItem{
ID: img.Id,
MaterialID: img.ImageID,
AccountID: img.AccountID,
CorporationName: accountMap[img.AccountID],
PreviewURL: img.PreviewURL,
Description: img.Description,
ErrorMsg: getFailureReason(log),
MaterialType: entity.MaterialTypeImage,
ImageUsage: img.ImageUsage,
CreatedAt: createdAtStr,
})
}
}
// 处理视频
if materialType == "" || materialType == entity.MaterialTypeVideo {
condition := map[string]interface{}{
entity.TencentVideoCols.VerifyStatus: entity.VerifyStatusRejected,
}
videos, total, err := dao.TencentVideo.GetByCondition(ctx, condition, 1, 100000)
if err != nil {
g.Log().Errorf(ctx, "查询不通过视频失败: %v", err)
return nil, fmt.Errorf("查询不通过视频失败: %w", err)
}
g.Log().Infof(ctx, "导出不通过视频: total=%d, got=%d", total, len(videos))
for _, vid := range videos {
// 查询最后一条失败的校验日志
log, _ := dao.MaterialVerifyLog.GetLastRejectedLogByMaterialID(ctx, vid.VideoID, entity.VerifyStatusRejected)
var createdAtStr string
if log != nil && log.CreatedAt != nil {
createdAtStr = log.CreatedAt.Format("Y-m-d H:i:s")
}
items = append(items, ExportRejectedItem{
ID: vid.Id,
MaterialID: vid.VideoID,
AccountID: vid.AccountID,
CorporationName: accountMap[vid.AccountID],
PreviewURL: vid.PreviewURL,
Description: vid.Description,
ErrorMsg: getFailureReason(log),
MaterialType: entity.MaterialTypeVideo,
CreatedAt: createdAtStr,
})
}
}
return items, nil
}
2026-05-15 10:28:17 +08:00
// GetPendingResultsCount 获取待查询结果的数量
func (s *MaterialVerifyService) GetPendingResultsCount(ctx context.Context) (int, error) {
return dao.MaterialVerifyLog.CountPendingResults(ctx)
}