From 5d72dd3aff97b362ffb478b4e2f7dfe7ac81b52d Mon Sep 17 00:00:00 2001 From: lmk <1095689763@qq.com> Date: Wed, 8 Apr 2026 17:26:00 +0800 Subject: [PATCH] =?UTF-8?q?=E6=8A=BD=E5=8F=96=E6=95=B0=E6=8D=AE=E6=B7=BB?= =?UTF-8?q?=E5=8A=A0=E5=8D=8F=E7=A8=8B=E9=80=BB=E8=BE=91=E8=B0=83=E6=95=B4?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- dao/copydata/api_account_report_sum_dao.go | 2 + scheduler/run_account_report_task.go | 41 +++-- sync/mock_generator.go | 10 +- sync/quick_sync.go | 4 +- sync/sync_service.go | 185 +++++++++++++++------ 5 files changed, 172 insertions(+), 70 deletions(-) diff --git a/dao/copydata/api_account_report_sum_dao.go b/dao/copydata/api_account_report_sum_dao.go index da336f5..8a013a9 100644 --- a/dao/copydata/api_account_report_sum_dao.go +++ b/dao/copydata/api_account_report_sum_dao.go @@ -71,6 +71,8 @@ func (d *CidAccountReportSumDao) BatchInsert(ctx context.Context, reqs []*dto.Ci OnConflict( "report_date_str", "page_number", + "campaign_id", + "creative_id", ). Save() diff --git a/scheduler/run_account_report_task.go b/scheduler/run_account_report_task.go index 8351a1b..e3fa2e8 100644 --- a/scheduler/run_account_report_task.go +++ b/scheduler/run_account_report_task.go @@ -49,39 +49,54 @@ func main() { result, err := syncService.SyncAccountReportConcurrent(ctx, req, config) if err != nil { - logrus.Errorf("定时同步任务执行完成,存在失败的页面") + logrus.Errorf("定时同步任务执行失败:%v", err) logrus.Infof("主任务日志ID:%d", result.TaskLogID) - logrus.Infof("汇总数据:成功=%v, ID=%d", result.SumSuccess, result.SumID) - logrus.Infof("明细数据:总记录数=%d, 成功页数=%d, 失败页数=%d", - result.DetailCount, result.DetailSuccessCount, result.DetailFailCount) - logrus.Infof("失败的页面已记录到 sync_task_log 表,等待补偿调度器处理") + return + } + hasFailedPages := result.DetailFailCount > 0 + if hasFailedPages { + logrus.Warnf("⚠ 定时同步任务完成,但存在失败的页面") + } else { + logrus.Infof("✓ 定时同步任务全部成功") + } + + logrus.Infof("主任务日志ID:%d", result.TaskLogID) + logrus.Infof("汇总数据:已保存=%v, ID=%d", result.SumSuccess, result.SumID) + logrus.Infof("明细统计:总记录数=%d, 成功页数=%d, 失败页数=%d", + result.DetailCount, result.DetailSuccessCount, result.DetailFailCount) + + if hasFailedPages { + logrus.Warnf("失败的页面已记录到 sync_task_log 表,等待补偿调度器处理") for _, pageResult := range result.PageResults { if !pageResult.Success { logrus.Warnf(" - 第 %d 页失败,任务日志ID=%d,错误:%s", pageResult.PageNumber, pageResult.PageTaskLogID, pageResult.ErrorMessage) } } - return } - fmt.Printf("✓ 定时同步完成:\n") - fmt.Printf(" 时间区间:%s ~ %s\n", lastHourStart.Format("2006-01-02 15:04:05"), lastHourEnd.Format("2006-01-02 15:04:05")) - fmt.Printf(" 汇总数据:成功=%v, ID=%d\n", result.SumSuccess, result.SumID) - fmt.Printf(" 明细数据:总记录数=%d, 成功页数=%d, 失败页数=%d\n", + fmt.Printf("\n=== 同步结果汇总 ===\n") + fmt.Printf("时间区间:%s ~ %s\n", lastHourStart.Format("2006-01-02 15:04:05"), lastHourEnd.Format("2006-01-02 15:04:05")) + fmt.Printf("汇总数据:已保存=%v, ID=%d\n", result.SumSuccess, result.SumID) + fmt.Printf("明细统计:总记录数=%d, 成功页数=%d, 失败页数=%d\n", result.DetailCount, result.DetailSuccessCount, result.DetailFailCount) - fmt.Printf(" 主任务日志ID:%d\n", result.TaskLogID) + fmt.Printf("主任务日志ID:%d\n", result.TaskLogID) if len(result.PageResults) > 0 { - fmt.Printf(" 分页任务详情:\n") + fmt.Printf("\n分页任务详情:\n") for _, pageResult := range result.PageResults { status := "✓ 成功" if !pageResult.Success { status = fmt.Sprintf("✗ 失败: %s", pageResult.ErrorMessage) } - fmt.Printf(" - 第 %d 页:任务ID=%d, 记录数=%d, 耗时=%dms, 状态=%s\n", + fmt.Printf(" - 第 %d 页:任务ID=%d, 记录数=%d, 耗时=%dms, 状态=%s\n", pageResult.PageNumber, pageResult.PageTaskLogID, pageResult.RecordCount, pageResult.DurationMs, status) } } + + if hasFailedPages { + fmt.Printf("\n⚠ 警告:存在 %d 个失败的页面,请检查日志并触发补偿任务\n", result.DetailFailCount) + } } diff --git a/sync/mock_generator.go b/sync/mock_generator.go index 88718b6..e3cea5d 100644 --- a/sync/mock_generator.go +++ b/sync/mock_generator.go @@ -281,11 +281,11 @@ func (m *MockDataGenerator) generateSumData() *AccountReportSum { ItemEntranceClkCnt: m.randomIntPtr(50, 500), ShowCnt: m.randomIntPtr(1000, 10000), ReportDateStr: time.Now().Format("2006-01-02"), - CampaignId: m.randomIntPtr(1, 100), + CampaignId: int64Ptr(23), CampaignName: "测试计划", - UnitId: m.randomIntPtr(1, 50), + UnitId: int64Ptr(10), UnitName: "测试单元", - CreativeId: m.randomIntPtr(1, 20), + CreativeId: int64Ptr(13), CreativeName: "测试创意", CidActualRoiAfterSubsidy: m.randomFloatPtr(1.0, 3.0), CidCouponAmount: m.randomIntPtr(100, 1000), @@ -294,6 +294,10 @@ func (m *MockDataGenerator) generateSumData() *AccountReportSum { } } +func int64Ptr(v int64) *int64 { + return &v +} + func (m *MockDataGenerator) generateDetailData(count int) []*AccountReportItem { items := make([]*AccountReportItem, count) for i := 0; i < count; i++ { diff --git a/sync/quick_sync.go b/sync/quick_sync.go index 62730a2..1f3d786 100644 --- a/sync/quick_sync.go +++ b/sync/quick_sync.go @@ -26,7 +26,7 @@ func SyncAccountReportWithMock(ctx context.Context) error { }, } - result, err := syncService.SyncAccountReport(ctx, req, true) + result, err := syncService.SyncAccountReport(ctx, req, true, true) if err != nil { logrus.Errorf("同步失败:%v", err) return err @@ -39,7 +39,7 @@ func SyncAccountReportWithMock(ctx context.Context) error { func SyncAccountReportWithRealAPI(ctx context.Context, req *AccountReportRequest) error { syncService := NewSyncService() - result, err := syncService.SyncAccountReport(ctx, req, false) + result, err := syncService.SyncAccountReport(ctx, req, false, true) if err != nil { logrus.Errorf("同步失败:%v", err) return err diff --git a/sync/sync_service.go b/sync/sync_service.go index 2ec580d..c850ec2 100644 --- a/sync/sync_service.go +++ b/sync/sync_service.go @@ -53,7 +53,7 @@ type PageSyncResult struct { RetryCount int `json:"retry_count"` } -func (s *SyncService) SyncAccountReport(ctx context.Context, req *AccountReportRequest, useMock bool) (*SyncResult, error) { +func (s *SyncService) SyncAccountReport(ctx context.Context, req *AccountReportRequest, useMock bool, saveSum bool) (*SyncResult, error) { result := &SyncResult{} var responseData *AccountReportResponse @@ -81,7 +81,7 @@ func (s *SyncService) SyncAccountReport(ctx context.Context, req *AccountReportR } } - if responseData.Data.Sum != nil { + if saveSum && responseData.Data.Sum != nil { sumItem := s.converter.ConvertToSumItem(responseData.Data.Sum, "account_report", req.PageInfo.CurrentPage) sumResult, saveErr := s.saveSumData(ctx, sumItem) if saveErr != nil { @@ -202,9 +202,10 @@ func (s *SyncService) SyncAccountReportWithPagination(ctx context.Context, req * RetryCount: 0, } - result, err := s.syncSinglePageWithTask(ctx, req, useMock, maxRetries, pageTaskID, currentPage) + result, pageLogID, err := s.syncSinglePageWithTask(ctx, req, useMock, maxRetries, pageTaskID, currentPage) pageDuration := time.Since(pageStartTime).Milliseconds() pageResult.DurationMs = pageDuration + pageResult.PageTaskLogID = pageLogID if err != nil { logrus.Errorf("第 %d 页同步失败:%v", currentPage, err) @@ -310,11 +311,7 @@ func (s *SyncService) SyncAccountReportWithPagination(ctx context.Context, req * return aggregatedResult, aggregatedResult.Error } -func (s *SyncService) SyncSinglePageWithTask(ctx context.Context, req *AccountReportRequest, useMock bool, maxRetries int, pageTaskID string, pageNumber int) (*SyncResult, error) { - return s.syncSinglePageWithTask(ctx, req, useMock, maxRetries, pageTaskID, pageNumber) -} - -func (s *SyncService) syncSinglePageWithTask(ctx context.Context, req *AccountReportRequest, useMock bool, maxRetries int, pageTaskID string, pageNumber int) (*SyncResult, error) { +func (s *SyncService) syncSinglePageWithTask(ctx context.Context, req *AccountReportRequest, useMock bool, maxRetries int, pageTaskID string, pageNumber int) (*SyncResult, int64, error) { pageStartTime := time.Now() pageLogReq := &taskDto.CreateSyncTaskLogReq{ @@ -354,7 +351,7 @@ func (s *SyncService) syncSinglePageWithTask(ctx context.Context, req *AccountRe updateReq.RetryCount = &retryCount } - if status == "success" || status == "failed" { + if status == "success" || status == "failed" || status == "partial_failed" { completedAt := time.Now() updateReq.CompletedAt = completedAt } @@ -368,60 +365,106 @@ func (s *SyncService) syncSinglePageWithTask(ctx context.Context, req *AccountRe logrus.Infof(">>> 开始同步第 %d 页数据...", pageNumber) - result, err := s.SyncWithRetry(ctx, req, useMock, maxRetries) + var lastResult *SyncResult + var lastErr error - if err != nil { - updatePageLog("failed", err.Error(), "PAGE_SYNC_FAILED", 0) - return result, err - } + for attempt := 0; attempt <= maxRetries; attempt++ { + saveSum := (attempt == 0 && pageNumber == 1) + result, err := s.SyncAccountReport(ctx, req, useMock, saveSum) + lastResult = result + lastErr = err - summary := map[string]interface{}{ - "page_number": pageNumber, - "detail_count": result.DetailCount, - "sum_saved": result.SumSuccess, - } - updatePageLog("success", "", "", 0) + if err == nil { + logrus.Infof("第 %d 页同步成功,尝试次数:%d", pageNumber, attempt+1) - logrus.Debugf("分页任务 %s 完成: %v", pageTaskID, summary) + updatePageLog("success", "", "", attempt) - return result, nil -} - -func (s *SyncService) fetchCurrentData(req *AccountReportRequest, useMock bool) *AccountReportData { - if useMock { - responseData := s.mockGen.GenerateAccountReportResponseWithPage(req.PageInfo.CurrentPage, req.PageInfo.PageSize) - if responseData != nil && responseData.Data != nil { - return responseData.Data + return result, pageLogID, nil } - return nil + + logrus.Warnf("第 %d 页同步失败,第 %d 次重试,错误:%v", pageNumber, attempt+1, err) } - respBytes, err := s.httpClient.Post(context.Background(), "/rest/openapi/gw/esp/report/accountReport", req) + updatePageLog("failed", lastErr.Error(), "PAGE_SYNC_FAILED", maxRetries) + return lastResult, pageLogID, lastErr +} + +func (s *SyncService) syncSinglePageWithTaskForConcurrent(ctx context.Context, req *AccountReportRequest, useMock bool, maxRetries int, pageTaskID string, pageNumber int, isFirstPage bool) (*SyncResult, int64, error) { + pageStartTime := time.Now() + + pageLogReq := &taskDto.CreateSyncTaskLogReq{ + TaskID: pageTaskID, + TaskType: "account_report_page", + AdvertiserID: req.AdvertiserID, + StartTime: time.UnixMilli(req.StartTime), + EndTime: time.UnixMilli(req.EndTime), + Status: "pending", + MaxRetry: maxRetries, + PageInfo: req.PageInfo, + RequestParams: map[string]interface{}{ + "page_number": pageNumber, + "page_size": req.PageInfo.PageSize, + }, + } + + pageLogID, err := dao.SyncTaskLog.Create(ctx, pageLogReq) if err != nil { - return nil + logrus.Errorf("创建分页任务日志失败:%v", err) } - responseData := &AccountReportResponse{} - if err := json.Unmarshal(respBytes, responseData); err != nil { - return nil + updatePageLog := func(status, errMsg, errorCode string, retryCount int) { + if pageLogID == 0 { + return + } + duration := time.Since(pageStartTime).Milliseconds() + updateReq := &taskDto.UpdateSyncTaskLogReq{ + ID: pageLogID, + Status: status, + ErrorMessage: errMsg, + ErrorCode: errorCode, + DurationMs: &duration, + } + + if retryCount > 0 { + updateReq.RetryCount = &retryCount + } + + if status == "success" || status == "failed" || status == "partial_failed" { + completedAt := time.Now() + updateReq.CompletedAt = completedAt + } + + if err := dao.SyncTaskLog.Update(ctx, updateReq); err != nil { + logrus.Errorf("更新分页任务日志失败:%v", err) + } } - if responseData.Code == 0 && responseData.Data != nil { - return responseData.Data + updatePageLog("running", "", "", 0) + + logrus.Infof(">>> 开始同步第 %d 页数据...", pageNumber) + + var lastResult *SyncResult + var lastErr error + + for attempt := 0; attempt <= maxRetries; attempt++ { + saveSum := isFirstPage && (attempt == 0) + result, err := s.SyncAccountReport(ctx, req, useMock, saveSum) + lastResult = result + lastErr = err + + if err == nil { + logrus.Infof("第 %d 页同步成功,尝试次数:%d", pageNumber, attempt+1) + + updatePageLog("success", "", "", attempt) + + return result, pageLogID, nil + } + + logrus.Warnf("第 %d 页同步失败,第 %d 次重试,错误:%v", pageNumber, attempt+1, err) } - return nil -} - -func (s *SyncService) saveSumData(ctx context.Context, item *dto.CidAccountReportSumItem) (*dto.CreateCidAccountReportSumRes, error) { - return copydata.CidAccountReportDetail.CreateSum(ctx, item) -} - -func (s *SyncService) saveDetailData(ctx context.Context, items []*dto.CidAccountReportDetailItem) (*dto.BatchCreateCidAccountReportDetailRes, error) { - req := &dto.BatchCreateCidAccountReportDetailReq{ - Items: items, - } - return copydata.CidAccountReportDetail.BatchCreate(ctx, req) + updatePageLog("failed", lastErr.Error(), "PAGE_SYNC_FAILED", maxRetries) + return lastResult, pageLogID, lastErr } func (s *SyncService) SyncWithRetry(ctx context.Context, req *AccountReportRequest, useMock bool, maxRetries int) (*SyncResult, error) { @@ -429,7 +472,8 @@ func (s *SyncService) SyncWithRetry(ctx context.Context, req *AccountReportReque var lastErr error for attempt := 0; attempt <= maxRetries; attempt++ { - result, err := s.SyncAccountReport(ctx, req, useMock) + saveSum := (attempt == 0 && req.PageInfo.CurrentPage == 1) + result, err := s.SyncAccountReport(ctx, req, useMock, saveSum) lastResult = result lastErr = err @@ -568,7 +612,7 @@ func (s *SyncService) SyncAccountReportConcurrent(ctx context.Context, req *Acco PageSize: pageSize, } - result, err := s.syncSinglePageWithTask(egCtx, &pageReq, config.UseMock, config.MaxRetries, pageTaskID, currentPage) + result, pageLogID, err := s.syncSinglePageWithTaskForConcurrent(egCtx, &pageReq, config.UseMock, config.MaxRetries, pageTaskID, currentPage, currentPage == 1) pageDuration := time.Since(pageStartTime).Milliseconds() pageResult := &PageSyncResult{ @@ -578,7 +622,7 @@ func (s *SyncService) SyncAccountReportConcurrent(ctx context.Context, req *Acco DurationMs: pageDuration, ErrorMessage: "", RetryCount: 0, - PageTaskLogID: 0, + PageTaskLogID: pageLogID, } if err != nil { @@ -689,3 +733,40 @@ func (s *SyncService) SyncAccountReportConcurrent(ctx context.Context, req *Acco return aggregatedResult, nil } + +func (s *SyncService) fetchCurrentData(req *AccountReportRequest, useMock bool) *AccountReportData { + if useMock { + responseData := s.mockGen.GenerateAccountReportResponseWithPage(req.PageInfo.CurrentPage, req.PageInfo.PageSize) + if responseData != nil && responseData.Data != nil { + return responseData.Data + } + return nil + } + + respBytes, err := s.httpClient.Post(context.Background(), "/rest/openapi/gw/esp/report/accountReport", req) + if err != nil { + return nil + } + + responseData := &AccountReportResponse{} + if err := json.Unmarshal(respBytes, responseData); err != nil { + return nil + } + + if responseData.Code == 0 && responseData.Data != nil { + return responseData.Data + } + + return nil +} + +func (s *SyncService) saveSumData(ctx context.Context, item *dto.CidAccountReportSumItem) (*dto.CreateCidAccountReportSumRes, error) { + return copydata.CidAccountReportDetail.CreateSum(ctx, item) +} + +func (s *SyncService) saveDetailData(ctx context.Context, items []*dto.CidAccountReportDetailItem) (*dto.BatchCreateCidAccountReportDetailRes, error) { + req := &dto.BatchCreateCidAccountReportDetailReq{ + Items: items, + } + return copydata.CidAccountReportDetail.BatchCreate(ctx, req) +}