yidun送检功能

This commit is contained in:
2026-05-15 10:28:17 +08:00
parent 51d26aeee7
commit c8cc19e8e7
29 changed files with 5133 additions and 121 deletions

View File

@@ -0,0 +1,662 @@
package dataengine
import (
consts "cid/consts/dataengine"
dao "cid/dao/dataengine"
entity "cid/model/entity/dataengine"
yidunService "cid/service/yidun"
"context"
"encoding/json"
"fmt"
"time"
"github.com/gogf/gf/v2/frame/g"
)
// 轮询配置常量
const (
// PollBatchSize 每次轮询处理数量
PollBatchSize = 20
)
// 状态常量
const (
// 原表状态 - 与 tencent_image/tencent_video 表的 status 字段对应
StatusSubmitting = consts.CheckStatusSubmitting // 送检中
)
// MaterialVerifyService 素材校验服务
type MaterialVerifyService struct{}
// MaterialVerify 校验服务单例
var MaterialVerify = new(MaterialVerifyService)
// =============================================================================
// 校验状态转换
// =============================================================================
// SuggestionToVerifyStatus 根据易盾处置建议转换为校验状态
func SuggestionToVerifyStatus(suggestion int) string {
switch suggestion {
case consts.SuggestionPass:
return entity.VerifyStatusVerified // 通过
case consts.SuggestionReview:
return entity.VerifyStatusPending // 嫌疑,需要人工审核,暂不更新状态
case consts.SuggestionBlock:
return entity.VerifyStatusRejected // 不通过
default:
return entity.VerifyStatusPending
}
}
// =============================================================================
// 图片校验
// =============================================================================
// VerifyImageByID 根据图片ID执行校验
func (s *MaterialVerifyService) VerifyImageByID(ctx context.Context, imageID string) (*entity.MaterialVerifyLog, error) {
// 1. 获取图片数据
image, err := dao.TencentImage.GetByImageID(ctx, imageID)
if err != nil {
return nil, fmt.Errorf("查询图片数据失败: %w", err)
}
if image == nil {
return nil, fmt.Errorf("未找到图片数据, imageID=%s", imageID)
}
// 2. 创建校验日志
log := s.createVerifyLog(ctx, entity.MaterialTypeImage, imageID, consts.SourceTableTencentImage, image.Id, image.AccountID)
if log == nil {
return nil, fmt.Errorf("创建校验日志失败")
}
// 3. 执行校验
err = s.submitImageCheck(ctx, image, log)
if err != nil {
return nil, err
}
return log, nil
}
// submitImageCheck 提交图片校验
func (s *MaterialVerifyService) submitImageCheck(ctx context.Context, image *entity.TencentImage, log *entity.MaterialVerifyLog) error {
startTime := time.Now()
// 获取回调模式开关
callbackMode := g.Cfg().MustGet(ctx, "yidun.callback_mode").Bool()
// 构建请求参数
requestParams := map[string]interface{}{
"imageURL": image.PreviewURL,
"dataID": image.ImageID,
}
requestParamsJSON, _ := json.Marshal(requestParams)
var (
taskID string
duration int64
)
if callbackMode {
// 回调模式:使用异步检测,易盾处理完成后会回调
callbackURL := g.Cfg().MustGet(ctx, "yidun.image.callback_url").String()
requestParams["callbackURL"] = callbackURL
result, err := yidunService.ImageDetection.DetectImage(ctx, image.PreviewURL, image.ImageID, callbackURL)
duration = time.Since(startTime).Milliseconds()
if err != nil {
dao.MaterialVerifyLog.UpdateError(ctx, log.Id, entity.VerifyStatusPending, err.Error())
dao.MaterialVerifyLog.UpdateDuration(ctx, log.Id, duration)
g.Log().Warningf(ctx, "图片异步检测失败(保持待检验), id=%d, imageId=%s, error=%v", image.Id, image.ImageID, err)
return fmt.Errorf("图片异步检测失败: %w", err)
}
taskID = result.TaskID
// 保存任务ID和请求参数
dao.MaterialVerifyLog.UpdateTaskID(ctx, log.Id, taskID)
dao.MaterialVerifyLog.UpdateRequestParams(ctx, log.Id, string(requestParamsJSON))
// 更新原表状态为 submitting等待回调
s.updateImageStatus(ctx, image.Id, StatusSubmitting)
g.Log().Infof(ctx, "图片异步检测已提交, id=%d, imageId=%s, taskId=%s, duration=%dms",
image.Id, image.ImageID, taskID, duration)
} else {
// 轮询模式:使用同步检测,直接返回结果
syncResult, err := yidunService.ImageDetection.DetectImageSync(ctx, image.PreviewURL, image.ImageID)
duration = time.Since(startTime).Milliseconds()
if err != nil {
dao.MaterialVerifyLog.UpdateError(ctx, log.Id, entity.VerifyStatusPending, err.Error())
dao.MaterialVerifyLog.UpdateDuration(ctx, log.Id, duration)
g.Log().Warningf(ctx, "图片同步检测失败(保持待检验), id=%d, imageId=%s, error=%v", image.Id, image.ImageID, err)
return fmt.Errorf("图片同步检测失败: %w", err)
}
taskID = syncResult.TaskID
// 保存任务ID和请求参数
dao.MaterialVerifyLog.UpdateTaskID(ctx, log.Id, taskID)
dao.MaterialVerifyLog.UpdateRequestParams(ctx, log.Id, string(requestParamsJSON))
// 根据同步结果更新状态
verifyStatus := SuggestionToVerifyStatus(syncResult.Suggestion)
responseJSON, _ := json.Marshal(syncResult)
dao.MaterialVerifyLog.UpdateVerifyResult(ctx, log.Id, verifyStatus,
syncResult.Suggestion, syncResult.Label, syncResult.ResultType, string(responseJSON), syncResult.CensorTime)
s.updateImageStatus(ctx, image.Id, verifyStatus)
g.Log().Infof(ctx, "图片同步检测完成, id=%d, imageId=%s, taskId=%s, suggestion=%d, verifyStatus=%s, duration=%dms",
image.Id, image.ImageID, taskID, syncResult.Suggestion, verifyStatus, duration)
}
return nil
}
// =============================================================================
// 视频校验
// =============================================================================
// VerifyVideoByID 根据视频ID执行校验
func (s *MaterialVerifyService) VerifyVideoByID(ctx context.Context, videoID string) (*entity.MaterialVerifyLog, error) {
// 1. 获取视频数据
video, err := dao.TencentVideo.GetByVideoID(ctx, videoID)
if err != nil {
return nil, fmt.Errorf("查询视频数据失败: %w", err)
}
if video == nil {
return nil, fmt.Errorf("未找到视频数据, videoID=%s", videoID)
}
// 2. 创建校验日志
log := s.createVerifyLog(ctx, entity.MaterialTypeVideo, videoID, consts.SourceTableTencentVideo, video.Id, video.AccountID)
if log == nil {
return nil, fmt.Errorf("创建校验日志失败")
}
// 3. 执行校验
err = s.submitVideoCheck(ctx, video, log)
if err != nil {
return nil, err
}
return log, nil
}
// submitVideoCheck 提交视频校验
func (s *MaterialVerifyService) submitVideoCheck(ctx context.Context, video *entity.TencentVideo, log *entity.MaterialVerifyLog) error {
startTime := time.Now()
// 获取回调模式开关
callbackMode := g.Cfg().MustGet(ctx, "yidun.callback_mode").Bool()
// 根据开关决定回调地址
var callbackURL string
if callbackMode {
callbackURL = g.Cfg().MustGet(ctx, "yidun.video.callback_url").String()
}
// 构建请求参数
requestParams := map[string]interface{}{
"videoURL": video.PreviewURL,
"dataID": video.VideoID,
"callbackURL": callbackURL,
}
requestParamsJSON, _ := json.Marshal(requestParams)
// 调用易盾视频检测
result, err := yidunService.VideoDetection.DetectVideo(ctx, video.PreviewURL, video.VideoID, callbackURL)
duration := time.Since(startTime).Milliseconds()
if err != nil {
// 调用易盾接口失败(如额度用光、网络错误、超时等),不更新状态,保持待检验
// 只有易盾明确返回检测结果且suggestion=BLOCK时才标记为失败
dao.MaterialVerifyLog.UpdateError(ctx, log.Id, entity.VerifyStatusPending, err.Error())
dao.MaterialVerifyLog.UpdateDuration(ctx, log.Id, duration)
g.Log().Warningf(ctx, "视频校验接口调用失败(保持待检验), id=%d, videoId=%s, error=%v", video.Id, video.VideoID, err)
return fmt.Errorf("视频校验调用失败: %w", err)
}
// 保存任务ID和请求参数
dao.MaterialVerifyLog.UpdateTaskID(ctx, log.Id, result.TaskID)
dao.MaterialVerifyLog.UpdateRequestParams(ctx, log.Id, string(requestParamsJSON))
// 更新原表状态为 submitting
s.updateVideoStatus(ctx, video.Id, StatusSubmitting)
// 轮询模式(无回调):提交后立即尝试查询检测结果
if !callbackMode {
g.Log().Infof(ctx, "轮询模式:提交后立即查询结果, taskId=%s", result.TaskID)
// 等待500ms让易盾有时间处理
time.Sleep(500 * time.Millisecond)
if err := s.ProcessVideoResultByTask(ctx, result.TaskID); err != nil {
g.Log().Warningf(ctx, "提交后立即查询结果失败(不影响状态,后续轮询继续), taskId=%s, error=%v", result.TaskID, err)
}
}
g.Log().Infof(ctx, "视频校验已提交, id=%d, videoId=%s, taskId=%s, duration=%dms",
video.Id, video.VideoID, result.TaskID, duration)
return nil
}
// =============================================================================
// 回调处理
// =============================================================================
// ProcessImageCallback 处理图片校验回调
func (s *MaterialVerifyService) ProcessImageCallback(ctx context.Context, callbackData string) error {
g.Log().Infof(ctx, "处理图片校验回调, data: %s", callbackData)
var callback yidunService.ImageCallbackData
if err := json.Unmarshal([]byte(callbackData), &callback); err != nil {
g.Log().Errorf(ctx, "解析图片回调数据失败: %v", err)
return fmt.Errorf("解析回调数据失败: %w", err)
}
if callback.Antispam == nil {
return fmt.Errorf("回调数据格式错误缺少antispam字段")
}
antispam := callback.Antispam
g.Log().Infof(ctx, "处理图片校验结果 - taskId: %s, suggestion: %d, resultType: %d",
antispam.TaskId, antispam.Suggestion, antispam.ResultType)
// 根据 taskId 查找校验日志
log, err := dao.MaterialVerifyLog.GetByTaskID(ctx, antispam.TaskId)
if err != nil {
return fmt.Errorf("查找校验日志失败: %w", err)
}
if log == nil {
g.Log().Warningf(ctx, "未找到校验日志, taskId=%s", antispam.TaskId)
return nil
}
// 构建响应结果
responseResult := callbackData
// 根据 suggestion 确定校验状态
verifyStatus := SuggestionToVerifyStatus(antispam.Suggestion)
// 更新日志
err = dao.MaterialVerifyLog.UpdateVerifyResult(ctx, log.Id, verifyStatus,
antispam.Suggestion, antispam.Label, antispam.ResultType, responseResult, antispam.CensorTime)
if err != nil {
return fmt.Errorf("更新校验日志失败: %w", err)
}
// 更新原表状态(图片回调只处理图片来源)
if log.SourceTable == consts.SourceTableTencentImage {
s.updateImageStatus(ctx, log.SourceID, verifyStatus)
}
g.Log().Infof(ctx, "图片校验回调处理完成, taskId=%s, verifyStatus=%s, suggestion=%d",
antispam.TaskId, verifyStatus, antispam.Suggestion)
return nil
}
// ProcessVideoCallback 处理视频校验回调
func (s *MaterialVerifyService) ProcessVideoCallback(ctx context.Context, callbackData string) error {
g.Log().Infof(ctx, "处理视频校验回调, data: %s", callbackData)
var callback yidunService.VideoCallbackData
if err := json.Unmarshal([]byte(callbackData), &callback); err != nil {
g.Log().Errorf(ctx, "解析视频回调数据失败: %v", err)
return fmt.Errorf("解析回调数据失败: %w", err)
}
if callback.Antispam == nil {
return fmt.Errorf("视频回调数据格式错误缺少antispam字段")
}
antispam := callback.Antispam
g.Log().Infof(ctx, "处理视频校验结果 - taskId: %s, suggestion: %d, resultType: %d",
antispam.TaskID, antispam.Suggestion, antispam.ResultType)
// 根据 taskId 查找校验日志
log, err := dao.MaterialVerifyLog.GetByTaskID(ctx, antispam.TaskID)
if err != nil {
return fmt.Errorf("查找校验日志失败: %w", err)
}
if log == nil {
g.Log().Warningf(ctx, "未找到校验日志, taskId=%s", antispam.TaskID)
return nil
}
// 构建响应结果
responseResult := callbackData
// 根据 suggestion 确定校验状态
verifyStatus := SuggestionToVerifyStatus(antispam.Suggestion)
// 审核时间
checkTime := antispam.CensorTime
if checkTime == 0 {
checkTime = antispam.CheckTime
}
// 更新日志
err = dao.MaterialVerifyLog.UpdateVerifyResult(ctx, log.Id, verifyStatus,
antispam.Suggestion, antispam.Label, antispam.ResultType, responseResult, checkTime)
if err != nil {
return fmt.Errorf("更新校验日志失败: %w", err)
}
// 更新原表状态(视频回调只处理视频来源)
if log.SourceTable == consts.SourceTableTencentVideo {
s.updateVideoStatus(ctx, log.SourceID, verifyStatus)
}
g.Log().Infof(ctx, "视频校验回调处理完成, taskId=%s, verifyStatus=%s, suggestion=%d",
antispam.TaskID, verifyStatus, antispam.Suggestion)
return nil
}
// =============================================================================
// 轮询模式处理
// =============================================================================
// 易盾检测状态常量
const (
YidunStatusNotStart = 0 // 未开始
YidunStatusProcessing = 1 // 检测中
YidunStatusSuccess = 2 // 检测成功
YidunStatusFailed = 3 // 检测失败
)
// ProcessImageResultByTask 根据任务ID处理图片结果轮询模式
func (s *MaterialVerifyService) ProcessImageResultByTask(ctx context.Context, taskID string) error {
log, err := dao.MaterialVerifyLog.GetByTaskID(ctx, taskID)
if err != nil || log == nil {
return fmt.Errorf("未找到校验日志, taskId=%s", taskID)
}
result, err := yidunService.ImageDetection.GetImageResult(ctx, taskID)
if err != nil {
// 判断是否是未找到结果或仍在检测中的错误
if err == yidunService.ErrImageResultNotFound || err == yidunService.ErrImageStillProcessing {
// 未获取到结果(任务不存在或仍在处理),不更新状态,保持等待下次轮询
g.Log().Infof(ctx, "图片检测结果未就绪, taskId=%s, 保持pending状态, err=%v", taskID, err)
return nil
}
// 其他错误如额度用光、网络错误、API错误等不更新状态保持待检验
// 只有易盾明确返回suggestion=BLOCK时才标记为失败
dao.MaterialVerifyLog.UpdateError(ctx, log.Id, entity.VerifyStatusPending, err.Error())
g.Log().Warningf(ctx, "图片检测查询失败(保持待检验), taskId=%s, error=%v", taskID, err)
return nil // 返回nil避免日志被反复处理但保持pending状态
}
// 判断检测状态
if result.Status == YidunStatusProcessing || result.Status == YidunStatusNotStart {
// 检测仍在进行中保持pending状态
g.Log().Infof(ctx, "图片检测仍在进行中, taskId=%s, status=%d, 保持pending状态", taskID, result.Status)
return nil
}
if result.Status == YidunStatusFailed {
// 易盾检测失败(如额度用光、服务端错误等),不更新状态,保持待检验
// 只有易盾明确返回suggestion=BLOCK时才标记为失败
errMsg := fmt.Sprintf("易盾检测失败, status=%d", result.Status)
dao.MaterialVerifyLog.UpdateError(ctx, log.Id, entity.VerifyStatusPending, errMsg)
g.Log().Warningf(ctx, "图片检测失败(保持待检验), taskId=%s, status=%d", taskID, result.Status)
return nil
}
// status == YidunStatusSuccess检测成功根据suggestion更新状态
verifyStatus := SuggestionToVerifyStatus(result.Suggestion)
responseJSON, _ := json.Marshal(result)
dao.MaterialVerifyLog.UpdateVerifyResult(ctx, log.Id, verifyStatus,
result.Suggestion, result.Label, result.ResultType, string(responseJSON), result.CensorTime)
if log.SourceTable == consts.SourceTableTencentImage {
s.updateImageStatus(ctx, log.SourceID, verifyStatus)
}
g.Log().Infof(ctx, "图片检测结果更新成功, taskId=%s, status=%d, suggestion=%d, verifyStatus=%s",
taskID, result.Status, result.Suggestion, verifyStatus)
return nil
}
// ProcessVideoResultByTask 根据任务ID处理视频结果轮询模式
func (s *MaterialVerifyService) ProcessVideoResultByTask(ctx context.Context, taskID string) error {
log, err := dao.MaterialVerifyLog.GetByTaskID(ctx, taskID)
if err != nil || log == nil {
return fmt.Errorf("未找到校验日志, taskId=%s", taskID)
}
result, err := yidunService.VideoDetection.GetVideoResult(ctx, taskID)
if err != nil {
// 判断是否是未找到结果或仍在检测中的错误
if err == yidunService.ErrVideoResultNotFound || err == yidunService.ErrVideoStillProcessing {
// 未获取到结果(任务不存在或仍在处理),不更新状态,保持等待下次轮询
g.Log().Infof(ctx, "视频检测结果未就绪, taskId=%s, 保持pending状态, err=%v", taskID, err)
return nil
}
// 其他错误如额度用光、网络错误、API错误等不更新状态保持待检验
// 只有易盾明确返回suggestion=BLOCK时才标记为失败
dao.MaterialVerifyLog.UpdateError(ctx, log.Id, entity.VerifyStatusPending, err.Error())
g.Log().Warningf(ctx, "视频检测查询失败(保持待检验), taskId=%s, error=%v", taskID, err)
return nil // 返回nil避免日志被反复处理但保持pending状态
}
// 判断检测状态
if result.Status == YidunStatusProcessing || result.Status == YidunStatusNotStart {
// 检测仍在进行中保持pending状态
g.Log().Infof(ctx, "视频检测仍在进行中, taskId=%s, status=%d, 保持pending状态", taskID, result.Status)
return nil
}
if result.Status == YidunStatusFailed {
// 易盾检测失败(如额度用光、服务端错误等),不更新状态,保持待检验
// 只有易盾明确返回suggestion=BLOCK时才标记为失败
errMsg := fmt.Sprintf("易盾检测失败, status=%d", result.Status)
dao.MaterialVerifyLog.UpdateError(ctx, log.Id, entity.VerifyStatusPending, errMsg)
g.Log().Warningf(ctx, "视频检测失败(保持待检验), taskId=%s, status=%d", taskID, result.Status)
return nil
}
// status == YidunStatusSuccess检测成功根据suggestion更新状态
verifyStatus := SuggestionToVerifyStatus(result.Suggestion)
responseJSON, _ := json.Marshal(result)
dao.MaterialVerifyLog.UpdateVerifyResult(ctx, log.Id, verifyStatus,
result.Suggestion, result.Label, result.ResultType, string(responseJSON), result.CensorTime)
if log.SourceTable == consts.SourceTableTencentVideo {
s.updateVideoStatus(ctx, log.SourceID, verifyStatus)
}
g.Log().Infof(ctx, "视频检测结果更新成功, taskId=%s, status=%d, suggestion=%d, verifyStatus=%s",
taskID, result.Status, result.Suggestion, verifyStatus)
return nil
}
// =============================================================================
// 辅助方法
// =============================================================================
// createVerifyLog 创建校验日志
func (s *MaterialVerifyService) createVerifyLog(ctx context.Context, materialType, materialID, sourceTable string, sourceID, accountID int64) *entity.MaterialVerifyLog {
log := &entity.MaterialVerifyLog{
TenantID: 0,
MaterialType: materialType,
MaterialID: materialID,
SourceTable: sourceTable,
SourceID: sourceID,
AccountID: accountID,
VerifyStatus: entity.VerifyStatusPending,
}
id, err := dao.MaterialVerifyLog.Create(ctx, log)
if err != nil {
g.Log().Errorf(ctx, "创建校验日志失败: %v", err)
return nil
}
log.Id = id
return log
}
// updateImageStatus 更新图片状态
func (s *MaterialVerifyService) updateImageStatus(ctx context.Context, imageID int64, verifyStatus string) {
_, err := dao.TencentImage.UpdateStatus(ctx, imageID, verifyStatus)
if err != nil {
g.Log().Errorf(ctx, "更新图片状态失败: %v", err)
} else {
g.Log().Infof(ctx, "更新图片状态成功, imageID=%d, status=%s", imageID, verifyStatus)
}
}
// updateVideoStatus 更新视频状态
func (s *MaterialVerifyService) updateVideoStatus(ctx context.Context, videoID int64, verifyStatus string) {
_, err := dao.TencentVideo.UpdateStatus(ctx, videoID, verifyStatus)
if err != nil {
g.Log().Errorf(ctx, "更新视频状态失败: %v", err)
} else {
g.Log().Infof(ctx, "更新视频状态成功, videoID=%d, status=%s", videoID, verifyStatus)
}
}
// =============================================================================
// 查询接口
// =============================================================================
// GetLogByID 根据ID获取日志
func (s *MaterialVerifyService) GetLogByID(ctx context.Context, id int64) (*entity.MaterialVerifyLog, error) {
return dao.MaterialVerifyLog.GetByID(ctx, id)
}
// GetLogsByMaterialID 根据素材ID获取日志列表
func (s *MaterialVerifyService) GetLogsByMaterialID(ctx context.Context, materialID string) ([]entity.MaterialVerifyLog, error) {
return dao.MaterialVerifyLog.GetByMaterialID(ctx, materialID)
}
// GetLogsByCondition 条件查询日志
func (s *MaterialVerifyService) GetLogsByCondition(ctx context.Context, condition map[string]interface{}, page, pageSize int) ([]entity.MaterialVerifyLog, int, error) {
return dao.MaterialVerifyLog.GetByCondition(ctx, condition, page, pageSize)
}
// GetStats 获取统计信息
func (s *MaterialVerifyService) GetStats(ctx context.Context) (map[string]int, error) {
return dao.MaterialVerifyLog.GetStats(ctx)
}
// =============================================================================
// 轮询模式 - 批量查询检测结果
// =============================================================================
// PollPendingResults 轮询所有待查询结果的日志(手动触发)
// 返回处理成功的数量和错误信息
func (s *MaterialVerifyService) PollPendingResults(ctx context.Context) (int, int, error) {
// 获取待查询的日志
logs, err := dao.MaterialVerifyLog.GetPendingResults(ctx, PollBatchSize)
if err != nil {
return 0, 0, err
}
if len(logs) == 0 {
g.Log().Infof(ctx, "没有待查询结果的日志")
return 0, 0, nil
}
g.Log().Infof(ctx, "开始轮询 %d 条待处理结果", len(logs))
successCount := 0
failCount := 0
var lastErr error
for _, log := range logs {
var err error
// 根据来源表判断调用哪个接口
if log.SourceTable == consts.SourceTableTencentImage {
err = s.ProcessImageResultByTask(ctx, log.TaskID)
} else if log.SourceTable == consts.SourceTableTencentVideo {
err = s.ProcessVideoResultByTask(ctx, log.TaskID)
} else {
g.Log().Warningf(ctx, "未知的来源表: %s, logId=%d", log.SourceTable, log.Id)
continue
}
if err != nil {
failCount++
lastErr = err
g.Log().Warningf(ctx, "处理结果失败, logId=%d, taskId=%s, error=%v", log.Id, log.TaskID, err)
} else {
successCount++
g.Log().Infof(ctx, "处理结果成功, logId=%d, taskId=%s", log.Id, log.TaskID)
}
// 避免请求过快
time.Sleep(100 * time.Millisecond)
}
g.Log().Infof(ctx, "轮询完成, 成功=%d, 失败=%d", successCount, failCount)
return successCount, failCount, lastErr
}
// PollPendingResultsByType 按类型轮询待查询结果的日志
func (s *MaterialVerifyService) PollPendingResultsByType(ctx context.Context, sourceTable string) (int, int, error) {
// 获取待查询的日志
logs, err := dao.MaterialVerifyLog.GetPendingResults(ctx, PollBatchSize)
if err != nil {
return 0, 0, err
}
// 过滤指定类型
var filteredLogs []entity.MaterialVerifyLog
for _, log := range logs {
if log.SourceTable == sourceTable {
filteredLogs = append(filteredLogs, log)
}
}
if len(filteredLogs) == 0 {
g.Log().Infof(ctx, "没有待查询结果的日志, sourceTable=%s", sourceTable)
return 0, 0, nil
}
g.Log().Infof(ctx, "开始轮询 %d 条待处理结果, sourceTable=%s", len(filteredLogs), sourceTable)
successCount := 0
failCount := 0
var lastErr error
for _, log := range filteredLogs {
var err error
if sourceTable == consts.SourceTableTencentImage {
err = s.ProcessImageResultByTask(ctx, log.TaskID)
} else if sourceTable == consts.SourceTableTencentVideo {
err = s.ProcessVideoResultByTask(ctx, log.TaskID)
}
if err != nil {
failCount++
lastErr = err
} else {
successCount++
}
time.Sleep(100 * time.Millisecond)
}
g.Log().Infof(ctx, "轮询完成, sourceTable=%s, 成功=%d, 失败=%d", sourceTable, successCount, failCount)
return successCount, failCount, lastErr
}
// PollPendingImageResults 轮询图片待查询结果
func (s *MaterialVerifyService) PollPendingImageResults(ctx context.Context) (int, int, error) {
return s.PollPendingResultsByType(ctx, consts.SourceTableTencentImage)
}
// PollPendingVideoResults 轮询视频待查询结果
func (s *MaterialVerifyService) PollPendingVideoResults(ctx context.Context) (int, int, error) {
return s.PollPendingResultsByType(ctx, consts.SourceTableTencentVideo)
}
// GetPendingResultsCount 获取待查询结果的数量
func (s *MaterialVerifyService) GetPendingResultsCount(ctx context.Context) (int, error) {
return dao.MaterialVerifyLog.CountPendingResults(ctx)
}

View File

@@ -0,0 +1,190 @@
package dataengine
import (
consts "cid/consts/dataengine"
dao "cid/dao/dataengine"
entity "cid/model/entity/dataengine"
yidunService "cid/service/yidun"
"context"
"encoding/json"
"fmt"
"github.com/gogf/gf/v2/frame/g"
)
// TencentContentCallbackService 腾讯内容检测回调处理服务
type TencentContentCallbackService struct{}
// TencentContentCallback 回调处理服务单例
var TencentContentCallback = new(TencentContentCallbackService)
// ProcessImageCallback 处理图片检测回调
func (s *TencentContentCallbackService) ProcessImageCallback(ctx context.Context, callbackData string) error {
g.Log().Infof(ctx, "处理图片检测回调, data: %s", callbackData)
var callback yidunService.ImageCallbackData
if err := json.Unmarshal([]byte(callbackData), &callback); err != nil {
g.Log().Errorf(ctx, "解析图片回调数据失败: %v", err)
return fmt.Errorf("解析回调数据失败: %w", err)
}
if callback.Antispam == nil {
return fmt.Errorf("回调数据格式错误缺少antispam字段")
}
antispam := callback.Antispam
g.Log().Infof(ctx, "处理图片检测结果 - taskId: %s, suggestion: %d, resultType: %d",
antispam.TaskId, antispam.Suggestion, antispam.ResultType)
// 根据 taskId 查找送检日志
log, err := dao.TencentContentCheckLog.GetByTaskID(ctx, antispam.TaskId)
if err != nil {
g.Log().Errorf(ctx, "查找送检日志失败, taskId=%s: %v", antispam.TaskId, err)
return fmt.Errorf("查找送检日志失败: %w", err)
}
if log == nil {
g.Log().Warningf(ctx, "未找到送检日志, taskId=%s", antispam.TaskId)
return nil
}
// 更新送检日志
checkTime := antispam.CensorTime
err = dao.TencentContentCheckLog.UpdateCheckResult(ctx, log.Id,
antispam.Suggestion, antispam.Label, antispam.ResultType, checkTime)
if err != nil {
g.Log().Errorf(ctx, "更新送检日志检测结果失败: %v", err)
return err
}
g.Log().Infof(ctx, "图片检测回调处理完成, taskId=%s, suggestion=%d", antispam.TaskId, antispam.Suggestion)
return nil
}
// ProcessVideoCallback 处理视频检测回调
func (s *TencentContentCallbackService) ProcessVideoCallback(ctx context.Context, callbackData string) error {
g.Log().Infof(ctx, "处理视频检测回调, data: %s", callbackData)
var callback yidunService.VideoCallbackData
if err := json.Unmarshal([]byte(callbackData), &callback); err != nil {
g.Log().Errorf(ctx, "解析视频回调数据失败: %v", err)
return fmt.Errorf("解析回调数据失败: %w", err)
}
if callback.Antispam == nil {
return fmt.Errorf("回调数据格式错误缺少antispam字段")
}
antispam := callback.Antispam
g.Log().Infof(ctx, "处理视频检测结果 - taskId: %s, suggestion: %d, resultType: %d, censorSource: %d",
antispam.TaskID, antispam.Suggestion, antispam.ResultType, antispam.CensorSource)
// 根据 taskId 查找送检日志
log, err := dao.TencentContentCheckLog.GetByTaskID(ctx, antispam.TaskID)
if err != nil {
g.Log().Errorf(ctx, "查找送检日志失败, taskId=%s: %v", antispam.TaskID, err)
return fmt.Errorf("查找送检日志失败: %w", err)
}
if log == nil {
g.Log().Warningf(ctx, "未找到送检日志, taskId=%s", antispam.TaskID)
return nil
}
// 更新送检日志
checkTime := antispam.CensorTime
if checkTime == 0 {
checkTime = antispam.CheckTime
}
err = dao.TencentContentCheckLog.UpdateCheckResult(ctx, log.Id,
antispam.Suggestion, antispam.Label, antispam.ResultType, checkTime)
if err != nil {
g.Log().Errorf(ctx, "更新送检日志检测结果失败: %v", err)
return err
}
g.Log().Infof(ctx, "视频检测回调处理完成, taskId=%s, suggestion=%d", antispam.TaskID, antispam.Suggestion)
return nil
}
// ProcessImageResult 手动处理图片检测结果(轮询模式)
func (s *TencentContentCallbackService) ProcessImageResult(ctx context.Context, taskID string) error {
g.Log().Infof(ctx, "查询图片检测结果, taskId: %s", taskID)
// 查找送检日志
log, err := dao.TencentContentCheckLog.GetByTaskID(ctx, taskID)
if err != nil || log == nil {
return fmt.Errorf("未找到送检日志, taskId=%s", taskID)
}
// 调用易盾查询结果
result, err := yidunService.ImageDetection.GetImageResult(ctx, taskID)
if err != nil {
g.Log().Errorf(ctx, "查询图片检测结果失败: %v", err)
return err
}
// 更新日志
err = dao.TencentContentCheckLog.UpdateCheckResult(ctx, log.Id,
result.Suggestion, result.Label, result.ResultType, result.CensorTime)
if err != nil {
g.Log().Errorf(ctx, "更新送检日志检测结果失败: %v", err)
return err
}
g.Log().Infof(ctx, "图片检测结果处理完成, taskId=%s, suggestion=%d", taskID, result.Suggestion)
return nil
}
// ProcessVideoResult 手动处理视频检测结果(轮询模式)
func (s *TencentContentCallbackService) ProcessVideoResult(ctx context.Context, taskID string) error {
g.Log().Infof(ctx, "查询视频检测结果, taskId: %s", taskID)
// 查找送检日志
log, err := dao.TencentContentCheckLog.GetByTaskID(ctx, taskID)
if err != nil || log == nil {
return fmt.Errorf("未找到送检日志, taskId=%s", taskID)
}
// 调用易盾查询结果
result, err := yidunService.VideoDetection.GetVideoResult(ctx, taskID)
if err != nil {
g.Log().Errorf(ctx, "查询视频检测结果失败: %v", err)
return err
}
// 更新日志
err = dao.TencentContentCheckLog.UpdateCheckResult(ctx, log.Id,
result.Suggestion, result.Label, result.ResultType, result.CensorTime)
if err != nil {
g.Log().Errorf(ctx, "更新送检日志检测结果失败: %v", err)
return err
}
g.Log().Infof(ctx, "视频检测结果处理完成, taskId=%s, suggestion=%d", taskID, result.Suggestion)
return nil
}
// GetCheckLogsByImageID 根据图片ID获取送检日志
func (s *TencentContentCallbackService) GetCheckLogsByImageID(ctx context.Context, imageID string) ([]entity.TencentContentCheckLog, error) {
// 先获取图片数据
image, err := dao.TencentImage.GetByImageID(ctx, imageID)
if err != nil || image == nil {
return nil, fmt.Errorf("未找到图片数据")
}
return dao.TencentContentCheckLog.GetBySourceID(ctx, consts.SourceTableTencentImage, image.Id)
}
// GetCheckLogsByVideoID 根据视频ID获取送检日志
func (s *TencentContentCallbackService) GetCheckLogsByVideoID(ctx context.Context, videoID string) ([]entity.TencentContentCheckLog, error) {
// 先获取视频数据
video, err := dao.TencentVideo.GetByVideoID(ctx, videoID)
if err != nil || video == nil {
return nil, fmt.Errorf("未找到视频数据")
}
return dao.TencentContentCheckLog.GetBySourceID(ctx, consts.SourceTableTencentVideo, video.Id)
}

View File

@@ -0,0 +1,390 @@
package dataengine
import (
consts "cid/consts/dataengine"
dao "cid/dao/dataengine"
entity "cid/model/entity/dataengine"
yidunService "cid/service/yidun"
"context"
"encoding/json"
"fmt"
"time"
"gitea.com/red-future/common/beans"
"github.com/gogf/gf/v2/frame/g"
)
// ContentCheckConfig 送检配置
type ContentCheckConfig struct {
// 每批处理数量
BatchSize int `json:"batch_size"`
// 图片检测启用
ImageEnabled bool `json:"image_enabled"`
// 视频检测启用
VideoEnabled bool `json:"video_enabled"`
// 定时任务间隔(秒)
IntervalSeconds int `json:"interval_seconds"`
}
// DefaultConfig 默认配置
var DefaultConfig = ContentCheckConfig{
BatchSize: 10,
ImageEnabled: true,
VideoEnabled: true,
IntervalSeconds: 30,
}
// TencentContentCheckService 腾讯内容送检服务
type TencentContentCheckService struct {
config ContentCheckConfig
isRunning bool
}
// TencentContentCheck 送检服务单例
var TencentContentCheck = &TencentContentCheckService{
config: DefaultConfig,
}
// SetConfig 设置配置
func (s *TencentContentCheckService) SetConfig(config ContentCheckConfig) {
s.config = config
}
// Start 启动定时任务
func (s *TencentContentCheckService) Start(ctx context.Context) error {
if s.isRunning {
g.Log().Info(ctx, "送检服务已在运行中,跳过启动")
return nil
}
s.isRunning = true
g.Log().Infof(ctx, "启动内容送检服务,配置: batch_size=%d, interval=%ds, image=%v, video=%v",
s.config.BatchSize, s.config.IntervalSeconds, s.config.ImageEnabled, s.config.VideoEnabled)
go s.runScheduler(ctx)
return nil
}
// Stop 停止定时任务
func (s *TencentContentCheckService) Stop(ctx context.Context) {
s.isRunning = false
g.Log().Info(ctx, "停止内容送检服务")
}
// runScheduler 定时调度器
func (s *TencentContentCheckService) runScheduler(ctx context.Context) {
ticker := time.NewTicker(time.Duration(s.config.IntervalSeconds) * time.Second)
defer ticker.Stop()
// 启动时先执行一次
s.processAll(ctx)
for s.isRunning {
select {
case <-ticker.C:
s.processAll(ctx)
case <-ctx.Done():
s.isRunning = false
return
}
}
}
// processAll 处理所有待送检数据
func (s *TencentContentCheckService) processAll(ctx context.Context) {
// 添加系统用户上下文绕过gfdb租户验证
ctx = context.WithValue(ctx, "user", &beans.User{UserName: "system", TenantId: 1})
startTime := time.Now()
g.Log().Info(ctx, "开始处理待送检数据...")
var totalProcessed int
// 处理图片
if s.config.ImageEnabled {
imageCount, _ := dao.TencentImage.CountPending(ctx)
if imageCount > 0 {
count, _ := s.processImages(ctx)
totalProcessed += count
}
}
// 处理视频
if s.config.VideoEnabled {
videoCount, _ := dao.TencentVideo.CountPending(ctx)
if videoCount > 0 {
count, _ := s.processVideos(ctx)
totalProcessed += count
}
}
duration := time.Since(startTime).Milliseconds()
g.Log().Infof(ctx, "处理完成,共处理 %d 条数据,耗时 %dms", totalProcessed, duration)
}
// processImages 处理图片送检
func (s *TencentContentCheckService) processImages(ctx context.Context) (int, error) {
// 获取待送检图片
images, err := dao.TencentImage.GetPendingList(ctx, s.config.BatchSize)
if err != nil {
g.Log().Errorf(ctx, "获取待送检图片失败: %v", err)
return 0, err
}
if len(images) == 0 {
return 0, nil
}
g.Log().Infof(ctx, "开始送检 %d 张图片", len(images))
successCount := 0
failedCount := 0
for _, img := range images {
// 创建送检日志
log := s.createCheckLog(ctx, consts.SourceTableTencentImage, img.Id, img.ImageID, img.PreviewURL)
// 提交送检
err := s.submitImageCheck(ctx, &img, log)
if err != nil {
failedCount++
// 更新日志为失败
if log != nil {
dao.TencentContentCheckLog.UpdateStatus(ctx, log.Id, consts.CheckStatusFailed, "", err.Error())
}
} else {
successCount++
}
// 避免请求过快
time.Sleep(100 * time.Millisecond)
}
g.Log().Infof(ctx, "图片送检完成,成功: %d失败: %d", successCount, failedCount)
return len(images), nil
}
// processVideos 处理视频送检
func (s *TencentContentCheckService) processVideos(ctx context.Context) (int, error) {
// 获取待送检视频
videos, err := dao.TencentVideo.GetPendingList(ctx, s.config.BatchSize)
if err != nil {
g.Log().Errorf(ctx, "获取待送检视频失败: %v", err)
return 0, err
}
if len(videos) == 0 {
return 0, nil
}
g.Log().Infof(ctx, "开始送检 %d 个视频", len(videos))
successCount := 0
failedCount := 0
for _, video := range videos {
// 创建送检日志
log := s.createCheckLog(ctx, consts.SourceTableTencentVideo, video.Id, video.VideoID, video.PreviewURL)
// 提交送检
err := s.submitVideoCheck(ctx, &video, log)
if err != nil {
failedCount++
// 更新日志为失败
if log != nil {
dao.TencentContentCheckLog.UpdateStatus(ctx, log.Id, consts.CheckStatusFailed, "", err.Error())
}
} else {
successCount++
}
// 避免请求过快
time.Sleep(100 * time.Millisecond)
}
g.Log().Infof(ctx, "视频送检完成,成功: %d失败: %d", successCount, failedCount)
return len(videos), nil
}
// createCheckLog 创建送检日志
func (s *TencentContentCheckService) createCheckLog(ctx context.Context, sourceTable string, sourceID int64, mediaID string, mediaURL string) *entity.TencentContentCheckLog {
requestParam := map[string]interface{}{
"media_id": mediaID,
"url": mediaURL,
}
requestParamJSON, _ := json.Marshal(requestParam)
log := &entity.TencentContentCheckLog{
SourceTable: sourceTable,
SourceID: sourceID,
RequestURL: "易盾内容安全检测接口",
RequestParam: string(requestParamJSON),
Status: consts.CheckStatusPending,
CheckTime: time.Now().UnixMilli(),
}
id, err := dao.TencentContentCheckLog.Create(ctx, log)
if err != nil {
g.Log().Errorf(ctx, "创建送检日志失败: %v", err)
return nil
}
log.Id = id
g.Log().Debugf(ctx, "创建送检日志成功, id=%d, sourceTable=%s, sourceID=%d", id, sourceTable, sourceID)
return log
}
// submitImageCheck 提交图片送检
func (s *TencentContentCheckService) submitImageCheck(ctx context.Context, image *entity.TencentImage, log *entity.TencentContentCheckLog) error {
startTime := time.Now()
// 更新日志状态为送检中
if log != nil {
dao.TencentContentCheckLog.UpdateStatus(ctx, log.Id, consts.CheckStatusSubmitting, "", "")
}
// 获取回调地址
callbackURL := g.Cfg().MustGet(ctx, "yidun.image.callback_url").String()
// 调用易盾图片检测
result, err := yidunService.ImageDetection.DetectImage(ctx, image.PreviewURL, image.ImageID, callbackURL)
duration := time.Since(startTime).Milliseconds()
// 更新日志
if log != nil {
if err != nil {
dao.TencentContentCheckLog.UpdateDuration(ctx, log.Id, duration)
dao.TencentContentCheckLog.UpdateStatus(ctx, log.Id, consts.CheckStatusFailed, "", err.Error())
g.Log().Errorf(ctx, "图片送检失败, id=%d, url=%s, error=%v", image.Id, image.PreviewURL, err)
return err
}
// 更新日志和图片状态
responseData, _ := json.Marshal(result)
dao.TencentContentCheckLog.UpdateStatus(ctx, log.Id, consts.CheckStatusSuccess, string(responseData), "")
dao.TencentContentCheckLog.UpdateTaskID(ctx, log.Id, result.TaskID)
dao.TencentContentCheckLog.UpdateDuration(ctx, log.Id, duration)
}
g.Log().Infof(ctx, "图片送检成功, id=%d, imageId=%s, taskId=%s", image.Id, image.ImageID, result.TaskID)
return nil
}
// submitVideoCheck 提交视频送检
func (s *TencentContentCheckService) submitVideoCheck(ctx context.Context, video *entity.TencentVideo, log *entity.TencentContentCheckLog) error {
startTime := time.Now()
// 更新日志状态为送检中
if log != nil {
dao.TencentContentCheckLog.UpdateStatus(ctx, log.Id, consts.CheckStatusSubmitting, "", "")
}
// 获取回调地址
callbackURL := g.Cfg().MustGet(ctx, "yidun.video.callback_url").String()
// 调用易盾视频检测
result, err := yidunService.VideoDetection.DetectVideo(ctx, video.PreviewURL, video.VideoID, callbackURL)
duration := time.Since(startTime).Milliseconds()
// 更新日志
if log != nil {
if err != nil {
dao.TencentContentCheckLog.UpdateDuration(ctx, log.Id, duration)
dao.TencentContentCheckLog.UpdateStatus(ctx, log.Id, consts.CheckStatusFailed, "", err.Error())
g.Log().Errorf(ctx, "视频送检失败, id=%d, url=%s, error=%v", video.Id, video.PreviewURL, err)
return err
}
// 更新日志和视频状态
responseData, _ := json.Marshal(result)
dao.TencentContentCheckLog.UpdateStatus(ctx, log.Id, consts.CheckStatusSuccess, string(responseData), "")
dao.TencentContentCheckLog.UpdateTaskID(ctx, log.Id, result.TaskID)
dao.TencentContentCheckLog.UpdateDuration(ctx, log.Id, duration)
}
g.Log().Infof(ctx, "视频送检成功, id=%d, videoId=%s, taskId=%s", video.Id, video.VideoID, result.TaskID)
return nil
}
// SubmitImageByID 根据图片ID手动提交送检
func (s *TencentContentCheckService) SubmitImageByID(ctx context.Context, imageID string) (*yidunService.ImageSubmitResult, error) {
// 根据图片ID获取数据
image, err := dao.TencentImage.GetByImageID(ctx, imageID)
if err != nil {
return nil, fmt.Errorf("查询图片数据失败: %w", err)
}
if image == nil {
return nil, fmt.Errorf("未找到图片数据, imageID=%s", imageID)
}
// 创建送检日志
log := s.createCheckLog(ctx, consts.SourceTableTencentImage, image.Id, image.ImageID, image.PreviewURL)
if log == nil {
return nil, fmt.Errorf("创建送检日志失败")
}
// 提交送检
err = s.submitImageCheck(ctx, image, log)
if err != nil {
return nil, err
}
// 获取送检结果
return dao.TencentContentCheckLog.GetImageSubmitResult(ctx, log.Id)
}
// SubmitVideoByID 根据视频ID手动提交送检
func (s *TencentContentCheckService) SubmitVideoByID(ctx context.Context, videoID string) (*yidunService.VideoSubmitResult, error) {
// 根据视频ID获取数据
video, err := dao.TencentVideo.GetByVideoID(ctx, videoID)
if err != nil {
return nil, fmt.Errorf("查询视频数据失败: %w", err)
}
if video == nil {
return nil, fmt.Errorf("未找到视频数据, videoID=%s", videoID)
}
// 创建送检日志
log := s.createCheckLog(ctx, consts.SourceTableTencentVideo, video.Id, video.VideoID, video.PreviewURL)
if log == nil {
return nil, fmt.Errorf("创建送检日志失败")
}
// 提交送检
err = s.submitVideoCheck(ctx, video, log)
if err != nil {
return nil, err
}
// 获取送检结果
return dao.TencentContentCheckLog.GetVideoSubmitResult(ctx, log.Id)
}
// GetPendingStats 获取待送检统计
func (s *TencentContentCheckService) GetPendingStats(ctx context.Context) map[string]int {
stats := make(map[string]int)
if s.config.ImageEnabled {
count, _ := dao.TencentImage.CountPending(ctx)
stats["image_pending"] = count
}
if s.config.VideoEnabled {
count, _ := dao.TencentVideo.CountPending(ctx)
stats["video_pending"] = count
}
return stats
}
// IsRunning 获取运行状态
func (s *TencentContentCheckService) IsRunning() bool {
return s.isRunning
}
// GetConfig 获取当前配置
func (s *TencentContentCheckService) GetConfig() ContentCheckConfig {
return s.config
}