385 lines
11 KiB
Go
385 lines
11 KiB
Go
|
|
package main
|
|||
|
|
|
|||
|
|
import (
|
|||
|
|
dao "cid/dao/copydata"
|
|||
|
|
taskDto "cid/model/dto/copydata"
|
|||
|
|
"cid/sync"
|
|||
|
|
"context"
|
|||
|
|
"encoding/json"
|
|||
|
|
"fmt"
|
|||
|
|
"strings"
|
|||
|
|
"time"
|
|||
|
|
|
|||
|
|
"gitea.com/red-future/common/beans"
|
|||
|
|
_ "github.com/gogf/gf/contrib/drivers/pgsql/v2"
|
|||
|
|
"github.com/gogf/gf/v2/os/gctx"
|
|||
|
|
"github.com/sirupsen/logrus"
|
|||
|
|
)
|
|||
|
|
|
|||
|
|
type CompensationScheduler struct {
|
|||
|
|
syncService *sync.SyncService
|
|||
|
|
}
|
|||
|
|
|
|||
|
|
func NewCompensationScheduler() *CompensationScheduler {
|
|||
|
|
return &CompensationScheduler{
|
|||
|
|
syncService: sync.NewSyncService(),
|
|||
|
|
}
|
|||
|
|
}
|
|||
|
|
|
|||
|
|
func (s *CompensationScheduler) RunCompensationOnce() {
|
|||
|
|
ctx := gctx.New()
|
|||
|
|
ctx = context.WithValue(ctx, "user", &beans.User{UserName: "admin"})
|
|||
|
|
|
|||
|
|
logrus.Info("=== 开始执行数据同步补偿任务 ===")
|
|||
|
|
s.processCompensation(ctx)
|
|||
|
|
logrus.Info("=== 补偿任务执行完毕 ===")
|
|||
|
|
}
|
|||
|
|
|
|||
|
|
func (s *CompensationScheduler) processCompensation(ctx context.Context) {
|
|||
|
|
logrus.Info(">>> 开始检查需要同步补偿的任务...")
|
|||
|
|
|
|||
|
|
queryReq := &taskDto.QueryFailedTasksReq{
|
|||
|
|
Status: []string{"failed", "retrying", "partial_failed"},
|
|||
|
|
MaxRetries: nil,
|
|||
|
|
Limit: 50,
|
|||
|
|
}
|
|||
|
|
|
|||
|
|
failedTasks, err := dao.SyncTaskLog.QueryFailedTasks(ctx, queryReq)
|
|||
|
|
if err != nil {
|
|||
|
|
logrus.Errorf("查询失败任务异常:%v", err)
|
|||
|
|
return
|
|||
|
|
}
|
|||
|
|
|
|||
|
|
if len(failedTasks) == 0 {
|
|||
|
|
logrus.Info("✓ 没有需要补偿的任务")
|
|||
|
|
return
|
|||
|
|
}
|
|||
|
|
|
|||
|
|
logrus.Infof("发现 %d 个需要补偿的任务", len(failedTasks))
|
|||
|
|
|
|||
|
|
successCount := 0
|
|||
|
|
failCount := 0
|
|||
|
|
partialCount := 0
|
|||
|
|
|
|||
|
|
for _, task := range failedTasks {
|
|||
|
|
if task.RetryCount >= task.MaxRetry {
|
|||
|
|
logrus.Warnf("任务 %s 已达到最大重试次数 %d,标记为需人工处理", task.TaskID, task.MaxRetry)
|
|||
|
|
|
|||
|
|
updateReq := &taskDto.UpdateSyncTaskLogReq{
|
|||
|
|
ID: task.ID,
|
|||
|
|
Status: "manual_review",
|
|||
|
|
ErrorMessage: fmt.Sprintf("已达到最大重试次数 %d 次", task.MaxRetry),
|
|||
|
|
ErrorCode: "MAX_RETRY_EXCEEDED",
|
|||
|
|
}
|
|||
|
|
dao.SyncTaskLog.Update(ctx, updateReq)
|
|||
|
|
|
|||
|
|
s.sendAlert(task)
|
|||
|
|
failCount++
|
|||
|
|
continue
|
|||
|
|
}
|
|||
|
|
|
|||
|
|
logrus.Infof(">>> 开始补偿任务:%s (类型=%s, 第 %d/%d 次重试)",
|
|||
|
|
task.TaskID, task.TaskType, task.RetryCount+1, task.MaxRetry)
|
|||
|
|
|
|||
|
|
if s.compensateTask(ctx, task) {
|
|||
|
|
successCount++
|
|||
|
|
} else {
|
|||
|
|
failCount++
|
|||
|
|
}
|
|||
|
|
|
|||
|
|
time.Sleep(1 * time.Second)
|
|||
|
|
}
|
|||
|
|
|
|||
|
|
logrus.Infof("✓ 补偿任务完成:成功=%d, 部分成功=%d, 失败=%d",
|
|||
|
|
successCount, partialCount, failCount)
|
|||
|
|
}
|
|||
|
|
|
|||
|
|
func (s *CompensationScheduler) compensateTask(ctx context.Context, task *taskDto.SyncTaskLogItem) bool {
|
|||
|
|
retryCount := task.RetryCount + 1
|
|||
|
|
|
|||
|
|
updateReq := &taskDto.UpdateSyncTaskLogReq{
|
|||
|
|
ID: task.ID,
|
|||
|
|
Status: "retrying",
|
|||
|
|
RetryCount: &retryCount,
|
|||
|
|
}
|
|||
|
|
dao.SyncTaskLog.Update(ctx, updateReq)
|
|||
|
|
|
|||
|
|
startTime := s.parseTime(task.StartTime)
|
|||
|
|
endTime := s.parseTime(task.EndTime)
|
|||
|
|
|
|||
|
|
logrus.Infof(">>> 开始补偿任务: %s (advertiser=%d, time=[%s, %s], 第 %d/%d 次重试)",
|
|||
|
|
task.TaskID, task.AdvertiserID,
|
|||
|
|
startTime.Format("2006-01-02"), endTime.Format("2006-01-02"),
|
|||
|
|
retryCount, task.MaxRetry)
|
|||
|
|
|
|||
|
|
if task.TaskType == "account_report_page" {
|
|||
|
|
return s.compensatePageTask(ctx, task, retryCount)
|
|||
|
|
}
|
|||
|
|
|
|||
|
|
if task.TaskType == "account_report" && task.Status == "partial_failed" {
|
|||
|
|
return s.compensatePartialFailedTask(ctx, task, startTime, endTime, retryCount)
|
|||
|
|
}
|
|||
|
|
|
|||
|
|
return s.compensateMainTask(ctx, task, startTime, endTime, retryCount)
|
|||
|
|
}
|
|||
|
|
|
|||
|
|
func (s *CompensationScheduler) compensatePartialFailedTask(ctx context.Context, task *taskDto.SyncTaskLogItem, startTime, endTime time.Time, retryCount int) bool {
|
|||
|
|
logrus.Infof(">>> 检测到部分失败任务 %s,开始智能补偿(只重试失败的页)", task.TaskID)
|
|||
|
|
|
|||
|
|
failedPages := s.extractFailedPages(task)
|
|||
|
|
if len(failedPages) == 0 {
|
|||
|
|
logrus.Warnf("任务 %s 标记为部分失败,但未找到失败的页信息,将重新同步所有页", task.TaskID)
|
|||
|
|
return s.compensateMainTask(ctx, task, startTime, endTime, retryCount)
|
|||
|
|
}
|
|||
|
|
|
|||
|
|
logrus.Infof("任务 %s 共有 %d 个失败的页需要补偿: %v", task.TaskID, len(failedPages), failedPages)
|
|||
|
|
|
|||
|
|
allSuccess := true
|
|||
|
|
compensatedPages := 0
|
|||
|
|
|
|||
|
|
for _, pageNumber := range failedPages {
|
|||
|
|
logrus.Infof(">>> 开始补偿第 %d 页...", pageNumber)
|
|||
|
|
|
|||
|
|
pageSuccess := s.compensateSinglePage(ctx, task, startTime, endTime, pageNumber, retryCount)
|
|||
|
|
if pageSuccess {
|
|||
|
|
compensatedPages++
|
|||
|
|
} else {
|
|||
|
|
allSuccess = false
|
|||
|
|
}
|
|||
|
|
|
|||
|
|
time.Sleep(500 * time.Millisecond)
|
|||
|
|
}
|
|||
|
|
|
|||
|
|
if allSuccess {
|
|||
|
|
logrus.Infof("✓ 部分失败任务 %s 补偿成功 - 共补偿 %d 个页", task.TaskID, compensatedPages)
|
|||
|
|
|
|||
|
|
updateReq := &taskDto.UpdateSyncTaskLogReq{
|
|||
|
|
ID: task.ID,
|
|||
|
|
Status: "success",
|
|||
|
|
}
|
|||
|
|
dao.SyncTaskLog.Update(ctx, updateReq)
|
|||
|
|
return true
|
|||
|
|
} else {
|
|||
|
|
logrus.Warnf("⚠ 部分失败任务 %s 补偿后仍有失败的页 - 成功补偿 %d/%d 个页",
|
|||
|
|
task.TaskID, compensatedPages, len(failedPages))
|
|||
|
|
|
|||
|
|
backoffMinutes := s.calculateBackoff(retryCount)
|
|||
|
|
nextRetry := time.Now().Add(time.Duration(backoffMinutes) * time.Minute)
|
|||
|
|
|
|||
|
|
updateReq := &taskDto.UpdateSyncTaskLogReq{
|
|||
|
|
ID: task.ID,
|
|||
|
|
Status: "partial_failed",
|
|||
|
|
NextRetryTime: nextRetry,
|
|||
|
|
}
|
|||
|
|
dao.SyncTaskLog.Update(ctx, updateReq)
|
|||
|
|
return false
|
|||
|
|
}
|
|||
|
|
}
|
|||
|
|
|
|||
|
|
func (s *CompensationScheduler) compensateSinglePage(ctx context.Context, task *taskDto.SyncTaskLogItem, startTime, endTime time.Time, pageNumber int, retryCount int) bool {
|
|||
|
|
req := &sync.AccountReportRequest{
|
|||
|
|
AdvertiserID: task.AdvertiserID,
|
|||
|
|
StartTime: startTime.UnixMilli(),
|
|||
|
|
EndTime: endTime.UnixMilli(),
|
|||
|
|
SelectColumns: []string{"impression", "click", "cost", "t0GMV"},
|
|||
|
|
GroupType: 1,
|
|||
|
|
QueryVersion: 1,
|
|||
|
|
PageInfo: &sync.PageInfo{
|
|||
|
|
CurrentPage: pageNumber,
|
|||
|
|
PageSize: 100,
|
|||
|
|
},
|
|||
|
|
}
|
|||
|
|
|
|||
|
|
maxRetries := 3
|
|||
|
|
pageTaskID := fmt.Sprintf("%s_page_%d", task.TaskID, pageNumber)
|
|||
|
|
result, err := s.syncService.SyncSinglePageWithTask(ctx, req, true, maxRetries, pageTaskID, pageNumber)
|
|||
|
|
|
|||
|
|
if err != nil {
|
|||
|
|
logrus.Errorf("补偿第 %d 页失败:%v", pageNumber, err)
|
|||
|
|
return false
|
|||
|
|
}
|
|||
|
|
|
|||
|
|
logrus.Infof("✓ 补偿第 %d 页成功 - 记录数=%d", pageNumber, result.DetailCount)
|
|||
|
|
return true
|
|||
|
|
}
|
|||
|
|
|
|||
|
|
func (s *CompensationScheduler) extractFailedPages(task *taskDto.SyncTaskLogItem) []int {
|
|||
|
|
if task.ResultSummary == nil {
|
|||
|
|
return nil
|
|||
|
|
}
|
|||
|
|
|
|||
|
|
summaryMap, ok := task.ResultSummary.(map[string]interface{})
|
|||
|
|
if !ok {
|
|||
|
|
return nil
|
|||
|
|
}
|
|||
|
|
|
|||
|
|
pageResultsRaw, exists := summaryMap["page_results"]
|
|||
|
|
if !exists {
|
|||
|
|
return nil
|
|||
|
|
}
|
|||
|
|
|
|||
|
|
pageResultsJSON, err := json.Marshal(pageResultsRaw)
|
|||
|
|
if err != nil {
|
|||
|
|
logrus.Errorf("序列化 page_results 失败:%v", err)
|
|||
|
|
return nil
|
|||
|
|
}
|
|||
|
|
|
|||
|
|
var pageResults []map[string]interface{}
|
|||
|
|
if err := json.Unmarshal(pageResultsJSON, &pageResults); err != nil {
|
|||
|
|
logrus.Errorf("反序列化 page_results 失败:%v", err)
|
|||
|
|
return nil
|
|||
|
|
}
|
|||
|
|
|
|||
|
|
failedPages := make([]int, 0)
|
|||
|
|
for _, pageResult := range pageResults {
|
|||
|
|
success, _ := pageResult["success"].(bool)
|
|||
|
|
pageNumberFloat, _ := pageResult["page_number"].(float64)
|
|||
|
|
pageNumber := int(pageNumberFloat)
|
|||
|
|
|
|||
|
|
if !success && pageNumber > 0 {
|
|||
|
|
failedPages = append(failedPages, pageNumber)
|
|||
|
|
}
|
|||
|
|
}
|
|||
|
|
|
|||
|
|
return failedPages
|
|||
|
|
}
|
|||
|
|
|
|||
|
|
func (s *CompensationScheduler) compensateMainTask(ctx context.Context, task *taskDto.SyncTaskLogItem, startTime, endTime time.Time, retryCount int) bool {
|
|||
|
|
req := &sync.AccountReportRequest{
|
|||
|
|
AdvertiserID: task.AdvertiserID,
|
|||
|
|
StartTime: startTime.UnixMilli(),
|
|||
|
|
EndTime: endTime.UnixMilli(),
|
|||
|
|
SelectColumns: []string{"impression", "click", "cost", "t0GMV"},
|
|||
|
|
GroupType: 1,
|
|||
|
|
QueryVersion: 1,
|
|||
|
|
}
|
|||
|
|
|
|||
|
|
maxRetries := 3
|
|||
|
|
result, err := s.syncService.SyncAccountReportWithPagination(ctx, req, true, maxRetries)
|
|||
|
|
|
|||
|
|
if err != nil {
|
|||
|
|
logrus.Errorf("补偿主任务 %s 失败(第 %d 次):%v", task.TaskID, retryCount, err)
|
|||
|
|
|
|||
|
|
backoffMinutes := s.calculateBackoff(retryCount)
|
|||
|
|
nextRetry := time.Now().Add(time.Duration(backoffMinutes) * time.Minute)
|
|||
|
|
|
|||
|
|
updateReq := &taskDto.UpdateSyncTaskLogReq{
|
|||
|
|
ID: task.ID,
|
|||
|
|
Status: "failed",
|
|||
|
|
ErrorMessage: err.Error(),
|
|||
|
|
ErrorCode: "COMPENSATION_FAILED",
|
|||
|
|
NextRetryTime: nextRetry,
|
|||
|
|
}
|
|||
|
|
dao.SyncTaskLog.Update(ctx, updateReq)
|
|||
|
|
|
|||
|
|
return false
|
|||
|
|
}
|
|||
|
|
|
|||
|
|
logrus.Infof("✓ 补偿主任务 %s 成功 - 汇总ID=%d, 明细成功=%d, 失败=%d, 页数=%d",
|
|||
|
|
task.TaskID, result.SumID, result.DetailSuccessCount, result.DetailFailCount, len(result.PageResults))
|
|||
|
|
return true
|
|||
|
|
}
|
|||
|
|
|
|||
|
|
func (s *CompensationScheduler) compensatePageTask(ctx context.Context, task *taskDto.SyncTaskLogItem, retryCount int) bool {
|
|||
|
|
logrus.Infof(">>> 补偿分页任务: %s (重试第 %d 次)", task.TaskID, retryCount)
|
|||
|
|
|
|||
|
|
parentTaskID := s.extractParentTaskID(task.TaskID)
|
|||
|
|
pageNumber := s.extractPageNumber(task.TaskID)
|
|||
|
|
|
|||
|
|
if parentTaskID == "" || pageNumber == 0 {
|
|||
|
|
logrus.Errorf("无法解析分页任务ID: %s", task.TaskID)
|
|||
|
|
return false
|
|||
|
|
}
|
|||
|
|
|
|||
|
|
startTime := s.parseTime(task.StartTime)
|
|||
|
|
endTime := s.parseTime(task.EndTime)
|
|||
|
|
|
|||
|
|
req := &sync.AccountReportRequest{
|
|||
|
|
AdvertiserID: task.AdvertiserID,
|
|||
|
|
StartTime: startTime.UnixMilli(),
|
|||
|
|
EndTime: endTime.UnixMilli(),
|
|||
|
|
SelectColumns: []string{"impression", "click", "cost", "t0GMV"},
|
|||
|
|
GroupType: 1,
|
|||
|
|
QueryVersion: 1,
|
|||
|
|
PageInfo: &sync.PageInfo{
|
|||
|
|
CurrentPage: pageNumber,
|
|||
|
|
PageSize: 100,
|
|||
|
|
},
|
|||
|
|
}
|
|||
|
|
|
|||
|
|
maxRetries := 3
|
|||
|
|
pageTaskID := fmt.Sprintf("%s_page_%d", parentTaskID, pageNumber)
|
|||
|
|
result, err := s.syncService.SyncSinglePageWithTask(ctx, req, true, maxRetries, pageTaskID, pageNumber)
|
|||
|
|
|
|||
|
|
if err != nil {
|
|||
|
|
logrus.Errorf("补偿分页任务 %s 失败(第 %d 次):%v", task.TaskID, retryCount, err)
|
|||
|
|
|
|||
|
|
backoffMinutes := s.calculateBackoff(retryCount)
|
|||
|
|
nextRetry := time.Now().Add(time.Duration(backoffMinutes) * time.Minute)
|
|||
|
|
|
|||
|
|
updateReq := &taskDto.UpdateSyncTaskLogReq{
|
|||
|
|
ID: task.ID,
|
|||
|
|
Status: "failed",
|
|||
|
|
ErrorMessage: err.Error(),
|
|||
|
|
ErrorCode: "PAGE_COMPENSATION_FAILED",
|
|||
|
|
NextRetryTime: nextRetry,
|
|||
|
|
}
|
|||
|
|
dao.SyncTaskLog.Update(ctx, updateReq)
|
|||
|
|
|
|||
|
|
return false
|
|||
|
|
}
|
|||
|
|
|
|||
|
|
logrus.Infof("✓ 补偿分页任务 %s 成功 - 记录数=%d", task.TaskID, result.DetailCount)
|
|||
|
|
return true
|
|||
|
|
}
|
|||
|
|
|
|||
|
|
func (s *CompensationScheduler) extractParentTaskID(taskID string) string {
|
|||
|
|
if idx := strings.LastIndex(taskID, "_page_"); idx > 0 {
|
|||
|
|
return taskID[:idx]
|
|||
|
|
}
|
|||
|
|
return ""
|
|||
|
|
}
|
|||
|
|
|
|||
|
|
func (s *CompensationScheduler) extractPageNumber(taskID string) int {
|
|||
|
|
if idx := strings.LastIndex(taskID, "_page_"); idx > 0 {
|
|||
|
|
var pageNum int
|
|||
|
|
fmt.Sscanf(taskID[idx+6:], "%d", &pageNum)
|
|||
|
|
return pageNum
|
|||
|
|
}
|
|||
|
|
return 0
|
|||
|
|
}
|
|||
|
|
|
|||
|
|
func (s *CompensationScheduler) calculateBackoff(retryCount int) int {
|
|||
|
|
backoffs := []int{5, 15, 30, 60, 120}
|
|||
|
|
if retryCount <= len(backoffs) {
|
|||
|
|
return backoffs[retryCount-1]
|
|||
|
|
}
|
|||
|
|
return backoffs[len(backoffs)-1]
|
|||
|
|
}
|
|||
|
|
|
|||
|
|
func (s *CompensationScheduler) parseTime(t interface{}) time.Time {
|
|||
|
|
switch v := t.(type) {
|
|||
|
|
case time.Time:
|
|||
|
|
return v
|
|||
|
|
case string:
|
|||
|
|
if parsed, err := time.Parse("2006-01-02 15:04:05", v); err == nil {
|
|||
|
|
return parsed
|
|||
|
|
}
|
|||
|
|
}
|
|||
|
|
return time.Now()
|
|||
|
|
}
|
|||
|
|
|
|||
|
|
func (s *CompensationScheduler) sendAlert(task *taskDto.SyncTaskLogItem) {
|
|||
|
|
logrus.Errorf("【告警】任务 %s 需要人工介入:广告主=%d, 类型=%s, 错误=%s",
|
|||
|
|
task.TaskID, task.AdvertiserID, task.TaskType, task.ErrorMessage)
|
|||
|
|
|
|||
|
|
// TODO: 集成钉钉/企业微信/邮件告警
|
|||
|
|
// s.sendDingTalkAlert(task)
|
|||
|
|
// s.sendEmailAlert(task)
|
|||
|
|
}
|
|||
|
|
|
|||
|
|
func main() {
|
|||
|
|
scheduler := NewCompensationScheduler()
|
|||
|
|
scheduler.RunCompensationOnce()
|
|||
|
|
}
|