package dataengine

import (
	"context"
	"encoding/json"
	"fmt"
	"sync/atomic"
	"time"

	consts "cid/consts/dataengine"
	dao "cid/dao/dataengine"
	entity "cid/model/entity/dataengine"
	yidunService "cid/service/yidun"

	"gitea.redpowerfuture.com/red-future/common/beans"
	"github.com/gogf/gf/v2/frame/g"
)

// ContentCheckConfig holds the settings for the content check scheduler.
type ContentCheckConfig struct {
	// BatchSize is the number of items submitted per batch.
	BatchSize int `json:"batch_size"`
	// ImageEnabled turns image checking on or off.
	ImageEnabled bool `json:"image_enabled"`
	// VideoEnabled turns video checking on or off.
	VideoEnabled bool `json:"video_enabled"`
	// IntervalSeconds is the scheduler interval, in seconds.
	IntervalSeconds int `json:"interval_seconds"`
}

// DefaultConfig is the default configuration.
var DefaultConfig = ContentCheckConfig{
	BatchSize:       10,
	ImageEnabled:    true,
	VideoEnabled:    true,
	IntervalSeconds: 30,
}

// TencentContentCheckService submits Tencent images and videos to Yidun
// for content checking.
type TencentContentCheckService struct {
	config ContentCheckConfig
	// isRunning is read by the scheduler goroutine and written by Start/Stop,
	// so it is atomic (atomic.Bool requires Go 1.19+).
	isRunning atomic.Bool
}

// TencentContentCheck is the service singleton.
var TencentContentCheck = &TencentContentCheckService{
	config: DefaultConfig,
}

// SetConfig replaces the configuration. Call it before Start: config is not
// synchronized, and the ticker interval is read only when the scheduler starts.
func (s *TencentContentCheckService) SetConfig(config ContentCheckConfig) {
	s.config = config
}

// Start launches the scheduler. It is a no-op if the service is already running.
func (s *TencentContentCheckService) Start(ctx context.Context) error {
	// CompareAndSwap makes the check-and-set atomic, so concurrent Start calls
	// cannot launch two schedulers.
	if !s.isRunning.CompareAndSwap(false, true) {
		g.Log().Info(ctx, "content check service is already running; skipping start")
		return nil
	}

	g.Log().Infof(ctx, "starting content check service, config: batch_size=%d, interval=%ds, image=%v, video=%v",
		s.config.BatchSize, s.config.IntervalSeconds, s.config.ImageEnabled, s.config.VideoEnabled)

	go s.runScheduler(ctx)
	return nil
}

// Stop asks the scheduler to stop. The scheduler goroutine exits at its next
// tick or when its context is cancelled, whichever comes first.
func (s *TencentContentCheckService) Stop(ctx context.Context) {
	s.isRunning.Store(false)
	g.Log().Info(ctx, "stopping content check service")
}

// runScheduler runs processAll once immediately and then on every tick.
func (s *TencentContentCheckService) runScheduler(ctx context.Context) {
	interval := s.config.IntervalSeconds
	if interval <= 0 {
		// time.NewTicker panics on a non-positive duration.
		interval = DefaultConfig.IntervalSeconds
	}
	ticker := time.NewTicker(time.Duration(interval) * time.Second)
	defer ticker.Stop()

	// Run once at startup.
	s.processAll(ctx)

	for s.isRunning.Load() {
		select {
		case <-ticker.C:
			s.processAll(ctx)
		case <-ctx.Done():
			s.isRunning.Store(false)
			return
		}
	}
}

// processAll processes one batch of every enabled media type.
func (s *TencentContentCheckService) processAll(ctx context.Context) {
	// Attach a system user to the context to bypass gfdb tenant validation.
	// The plain string key "user" is kept as in the original code.
	ctx = context.WithValue(ctx, "user", &beans.User{UserName: "system", TenantId: 1})

	startTime := time.Now()
	g.Log().Info(ctx, "processing pending content checks...")

	var totalProcessed int

	// Images.
	if s.config.ImageEnabled {
		imageCount, err := dao.TencentImage.CountPending(ctx)
		if err != nil {
			g.Log().Errorf(ctx, "failed to count pending images: %v", err)
		} else if imageCount > 0 {
			// processImages logs its own errors and returns 0 on failure.
			count, _ := s.processImages(ctx)
			totalProcessed += count
		}
	}

	// Videos.
	if s.config.VideoEnabled {
		videoCount, err := dao.TencentVideo.CountPending(ctx)
		if err != nil {
			g.Log().Errorf(ctx, "failed to count pending videos: %v", err)
		} else if videoCount > 0 {
			// processVideos logs its own errors and returns 0 on failure.
			count, _ := s.processVideos(ctx)
			totalProcessed += count
		}
	}

	duration := time.Since(startTime).Milliseconds()
	g.Log().Infof(ctx, "processing finished: %d items in %dms", totalProcessed, duration)
}

// processImages submits one batch of pending images and returns the number
// of images attempted.
func (s *TencentContentCheckService) processImages(ctx context.Context) (int, error) {
	// Fetch pending images.
	images, err := dao.TencentImage.GetPendingList(ctx, s.config.BatchSize)
	if err != nil {
		g.Log().Errorf(ctx, "failed to fetch pending images: %v", err)
		return 0, err
	}
	if len(images) == 0 {
		return 0, nil
	}

	g.Log().Infof(ctx, "submitting %d images for checking", len(images))

	successCount := 0
	failedCount := 0

	// Index into the slice rather than taking the address of the loop variable.
	for i := range images {
		img := &images[i]

		// Create the check log entry (nil if creation failed).
		log := s.createCheckLog(ctx, consts.SourceTableTencentImage, img.Id, img.ImageID, img.PreviewURL)

		// Submit; submitImageCheck already marks the log entry as failed on error.
		if err := s.submitImageCheck(ctx, img, log); err != nil {
			failedCount++
		} else {
			successCount++
		}

		// Throttle requests.
		time.Sleep(100 * time.Millisecond)
	}

	g.Log().Infof(ctx, "image submission finished, succeeded: %d, failed: %d", successCount, failedCount)
	return len(images), nil
}

// processVideos submits one batch of pending videos and returns the number
// of videos attempted.
func (s *TencentContentCheckService) processVideos(ctx context.Context) (int, error) {
	// Fetch pending videos.
	videos, err := dao.TencentVideo.GetPendingList(ctx, s.config.BatchSize)
	if err != nil {
		g.Log().Errorf(ctx, "failed to fetch pending videos: %v", err)
		return 0, err
	}
	if len(videos) == 0 {
		return 0, nil
	}

	g.Log().Infof(ctx, "submitting %d videos for checking", len(videos))

	successCount := 0
	failedCount := 0

	// Index into the slice rather than taking the address of the loop variable.
	for i := range videos {
		video := &videos[i]

		// Create the check log entry (nil if creation failed).
		log := s.createCheckLog(ctx, consts.SourceTableTencentVideo, video.Id, video.VideoID, video.PreviewURL)

		// Submit; submitVideoCheck already marks the log entry as failed on error.
		if err := s.submitVideoCheck(ctx, video, log); err != nil {
			failedCount++
		} else {
			successCount++
		}

		// Throttle requests.
		time.Sleep(100 * time.Millisecond)
	}

	g.Log().Infof(ctx, "video submission finished, succeeded: %d, failed: %d", successCount, failedCount)
	return len(videos), nil
}

// createCheckLog creates a check log entry. It returns nil if the entry
// could not be created.
func (s *TencentContentCheckService) createCheckLog(ctx context.Context, sourceTable string, sourceID int64, mediaID string, mediaURL string) *entity.TencentContentCheckLog {
	requestParam := map[string]interface{}{
		"media_id": mediaID,
		"url":      mediaURL,
	}
	// Marshalling a map of strings cannot fail, so the error is ignored.
	requestParamJSON, _ := json.Marshal(requestParam)

	log := &entity.TencentContentCheckLog{
		SourceTable: sourceTable,
		SourceID:    sourceID,
		// Descriptive label, not a real URL ("Yidun content safety detection API").
		// The stored value is kept unchanged.
		RequestURL:   "易盾内容安全检测接口",
		RequestParam: string(requestParamJSON),
		Status:       consts.CheckStatusPending,
		CheckTime:    time.Now().UnixMilli(),
	}

	id, err := dao.TencentContentCheckLog.Create(ctx, log)
	if err != nil {
		g.Log().Errorf(ctx, "failed to create check log: %v", err)
		return nil
	}
	log.Id = id
	g.Log().Debugf(ctx, "created check log, id=%d, sourceTable=%s, sourceID=%d", id, sourceTable, sourceID)
	return log
}

// submitImageCheck submits one image to Yidun. log may be nil, in which case
// no log entry is updated.
func (s *TencentContentCheckService) submitImageCheck(ctx context.Context, image *entity.TencentImage, log *entity.TencentContentCheckLog) error {
	startTime := time.Now()

	// Mark the log entry as submitting.
	if log != nil {
		dao.TencentContentCheckLog.UpdateStatus(ctx, log.Id, consts.CheckStatusSubmitting, "", "")
	}

	// Read the callback URL from configuration.
	callbackURL := g.Cfg().MustGet(ctx, "yidun.image.callback_url").String()

	// Call Yidun image detection.
	result, err := yidunService.ImageDetection.DetectImage(ctx, image.PreviewURL, image.ImageID, callbackURL)
	duration := time.Since(startTime).Milliseconds()

	// Check the error before touching result, whether or not a log entry exists.
	// The original only checked it when log was non-nil, so a failed call with
	// a nil log fell through to result.TaskID.
	if err != nil {
		if log != nil {
			dao.TencentContentCheckLog.UpdateDuration(ctx, log.Id, duration)
			dao.TencentContentCheckLog.UpdateStatus(ctx, log.Id, consts.CheckStatusFailed, "", err.Error())
		}
		g.Log().Errorf(ctx, "image submission failed, id=%d, url=%s, error=%v", image.Id, image.PreviewURL, err)
		return err
	}

	// Record the response, task ID and duration on the log entry.
	if log != nil {
		responseData, _ := json.Marshal(result)
		dao.TencentContentCheckLog.UpdateStatus(ctx, log.Id, consts.CheckStatusSuccess, string(responseData), "")
		dao.TencentContentCheckLog.UpdateTaskID(ctx, log.Id, result.TaskID)
		dao.TencentContentCheckLog.UpdateDuration(ctx, log.Id, duration)
	}

	g.Log().Infof(ctx, "image submitted, id=%d, imageId=%s, taskId=%s", image.Id, image.ImageID, result.TaskID)
	return nil
}

// submitVideoCheck submits one video to Yidun. log may be nil, in which case
// no log entry is updated.
func (s *TencentContentCheckService) submitVideoCheck(ctx context.Context, video *entity.TencentVideo, log *entity.TencentContentCheckLog) error {
	startTime := time.Now()

	// Mark the log entry as submitting.
	if log != nil {
		dao.TencentContentCheckLog.UpdateStatus(ctx, log.Id, consts.CheckStatusSubmitting, "", "")
	}

	// Read the callback URL from configuration.
	callbackURL := g.Cfg().MustGet(ctx, "yidun.video.callback_url").String()

	// Call Yidun video detection.
	result, err := yidunService.VideoDetection.DetectVideo(ctx, video.PreviewURL, video.VideoID, callbackURL)
	duration := time.Since(startTime).Milliseconds()

	// Check the error before touching result, whether or not a log entry exists.
	if err != nil {
		if log != nil {
			dao.TencentContentCheckLog.UpdateDuration(ctx, log.Id, duration)
			dao.TencentContentCheckLog.UpdateStatus(ctx, log.Id, consts.CheckStatusFailed, "", err.Error())
		}
		g.Log().Errorf(ctx, "video submission failed, id=%d, url=%s, error=%v", video.Id, video.PreviewURL, err)
		return err
	}

	// Record the response, task ID and duration on the log entry.
	if log != nil {
		responseData, _ := json.Marshal(result)
		dao.TencentContentCheckLog.UpdateStatus(ctx, log.Id, consts.CheckStatusSuccess, string(responseData), "")
		dao.TencentContentCheckLog.UpdateTaskID(ctx, log.Id, result.TaskID)
		dao.TencentContentCheckLog.UpdateDuration(ctx, log.Id, duration)
	}

	g.Log().Infof(ctx, "video submitted, id=%d, videoId=%s, taskId=%s", video.Id, video.VideoID, result.TaskID)
	return nil
}

// SubmitImageByID manually submits the image with the given image ID.
func (s *TencentContentCheckService) SubmitImageByID(ctx context.Context, imageID string) (*yidunService.ImageSubmitResult, error) {
	// Look up the image by its image ID.
	image, err := dao.TencentImage.GetByImageID(ctx, imageID)
	if err != nil {
		return nil, fmt.Errorf("failed to query image: %w", err)
	}
	if image == nil {
		return nil, fmt.Errorf("image not found, imageID=%s", imageID)
	}

	// Create the check log entry.
	log := s.createCheckLog(ctx, consts.SourceTableTencentImage, image.Id, image.ImageID, image.PreviewURL)
	if log == nil {
		return nil, fmt.Errorf("failed to create check log")
	}

	// Submit.
	if err = s.submitImageCheck(ctx, image, log); err != nil {
		return nil, err
	}

	// Return the submission result.
	return dao.TencentContentCheckLog.GetImageSubmitResult(ctx, log.Id)
}

// SubmitVideoByID manually submits the video with the given video ID.
func (s *TencentContentCheckService) SubmitVideoByID(ctx context.Context, videoID string) (*yidunService.VideoSubmitResult, error) {
	// Look up the video by its video ID.
	video, err := dao.TencentVideo.GetByVideoID(ctx, videoID)
	if err != nil {
		return nil, fmt.Errorf("failed to query video: %w", err)
	}
	if video == nil {
		return nil, fmt.Errorf("video not found, videoID=%s", videoID)
	}

	// Create the check log entry.
	log := s.createCheckLog(ctx, consts.SourceTableTencentVideo, video.Id, video.VideoID, video.PreviewURL)
	if log == nil {
		return nil, fmt.Errorf("failed to create check log")
	}

	// Submit.
	if err = s.submitVideoCheck(ctx, video, log); err != nil {
		return nil, err
	}

	// Return the submission result.
	return dao.TencentContentCheckLog.GetVideoSubmitResult(ctx, log.Id)
}

// GetPendingStats returns the pending counts for each enabled media type.
// Count errors are ignored, so a failed query reports whatever count the DAO
// returned alongside the error.
func (s *TencentContentCheckService) GetPendingStats(ctx context.Context) map[string]int {
	stats := make(map[string]int)

	if s.config.ImageEnabled {
		count, _ := dao.TencentImage.CountPending(ctx)
		stats["image_pending"] = count
	}

	if s.config.VideoEnabled {
		count, _ := dao.TencentVideo.CountPending(ctx)
		stats["video_pending"] = count
	}

	return stats
}

// IsRunning reports whether the scheduler is running.
func (s *TencentContentCheckService) IsRunning() bool {
	return s.isRunning.Load()
}

// GetConfig returns the current configuration.
func (s *TencentContentCheckService) GetConfig() ContentCheckConfig {
	return s.config
}