抽取数据添加协程逻辑调整

This commit is contained in:
2026-04-08 17:26:00 +08:00
parent 000ea03420
commit 5d72dd3aff
5 changed files with 172 additions and 70 deletions

View File

@@ -281,11 +281,11 @@ func (m *MockDataGenerator) generateSumData() *AccountReportSum {
ItemEntranceClkCnt: m.randomIntPtr(50, 500),
ShowCnt: m.randomIntPtr(1000, 10000),
ReportDateStr: time.Now().Format("2006-01-02"),
CampaignId: m.randomIntPtr(1, 100),
CampaignId: int64Ptr(23),
CampaignName: "测试计划",
UnitId: m.randomIntPtr(1, 50),
UnitId: int64Ptr(10),
UnitName: "测试单元",
CreativeId: m.randomIntPtr(1, 20),
CreativeId: int64Ptr(13),
CreativeName: "测试创意",
CidActualRoiAfterSubsidy: m.randomFloatPtr(1.0, 3.0),
CidCouponAmount: m.randomIntPtr(100, 1000),
@@ -294,6 +294,10 @@ func (m *MockDataGenerator) generateSumData() *AccountReportSum {
}
}
func int64Ptr(v int64) *int64 {
return &v
}
func (m *MockDataGenerator) generateDetailData(count int) []*AccountReportItem {
items := make([]*AccountReportItem, count)
for i := 0; i < count; i++ {

View File

@@ -26,7 +26,7 @@ func SyncAccountReportWithMock(ctx context.Context) error {
},
}
result, err := syncService.SyncAccountReport(ctx, req, true)
result, err := syncService.SyncAccountReport(ctx, req, true, true)
if err != nil {
logrus.Errorf("同步失败:%v", err)
return err
@@ -39,7 +39,7 @@ func SyncAccountReportWithMock(ctx context.Context) error {
func SyncAccountReportWithRealAPI(ctx context.Context, req *AccountReportRequest) error {
syncService := NewSyncService()
result, err := syncService.SyncAccountReport(ctx, req, false)
result, err := syncService.SyncAccountReport(ctx, req, false, true)
if err != nil {
logrus.Errorf("同步失败:%v", err)
return err

View File

@@ -53,7 +53,7 @@ type PageSyncResult struct {
RetryCount int `json:"retry_count"`
}
func (s *SyncService) SyncAccountReport(ctx context.Context, req *AccountReportRequest, useMock bool) (*SyncResult, error) {
func (s *SyncService) SyncAccountReport(ctx context.Context, req *AccountReportRequest, useMock bool, saveSum bool) (*SyncResult, error) {
result := &SyncResult{}
var responseData *AccountReportResponse
@@ -81,7 +81,7 @@ func (s *SyncService) SyncAccountReport(ctx context.Context, req *AccountReportR
}
}
if responseData.Data.Sum != nil {
if saveSum && responseData.Data.Sum != nil {
sumItem := s.converter.ConvertToSumItem(responseData.Data.Sum, "account_report", req.PageInfo.CurrentPage)
sumResult, saveErr := s.saveSumData(ctx, sumItem)
if saveErr != nil {
@@ -202,9 +202,10 @@ func (s *SyncService) SyncAccountReportWithPagination(ctx context.Context, req *
RetryCount: 0,
}
result, err := s.syncSinglePageWithTask(ctx, req, useMock, maxRetries, pageTaskID, currentPage)
result, pageLogID, err := s.syncSinglePageWithTask(ctx, req, useMock, maxRetries, pageTaskID, currentPage)
pageDuration := time.Since(pageStartTime).Milliseconds()
pageResult.DurationMs = pageDuration
pageResult.PageTaskLogID = pageLogID
if err != nil {
logrus.Errorf("第 %d 页同步失败:%v", currentPage, err)
@@ -310,11 +311,7 @@ func (s *SyncService) SyncAccountReportWithPagination(ctx context.Context, req *
return aggregatedResult, aggregatedResult.Error
}
func (s *SyncService) SyncSinglePageWithTask(ctx context.Context, req *AccountReportRequest, useMock bool, maxRetries int, pageTaskID string, pageNumber int) (*SyncResult, error) {
return s.syncSinglePageWithTask(ctx, req, useMock, maxRetries, pageTaskID, pageNumber)
}
func (s *SyncService) syncSinglePageWithTask(ctx context.Context, req *AccountReportRequest, useMock bool, maxRetries int, pageTaskID string, pageNumber int) (*SyncResult, error) {
func (s *SyncService) syncSinglePageWithTask(ctx context.Context, req *AccountReportRequest, useMock bool, maxRetries int, pageTaskID string, pageNumber int) (*SyncResult, int64, error) {
pageStartTime := time.Now()
pageLogReq := &taskDto.CreateSyncTaskLogReq{
@@ -354,7 +351,7 @@ func (s *SyncService) syncSinglePageWithTask(ctx context.Context, req *AccountRe
updateReq.RetryCount = &retryCount
}
if status == "success" || status == "failed" {
if status == "success" || status == "failed" || status == "partial_failed" {
completedAt := time.Now()
updateReq.CompletedAt = completedAt
}
@@ -368,60 +365,106 @@ func (s *SyncService) syncSinglePageWithTask(ctx context.Context, req *AccountRe
logrus.Infof(">>> 开始同步第 %d 页数据...", pageNumber)
result, err := s.SyncWithRetry(ctx, req, useMock, maxRetries)
var lastResult *SyncResult
var lastErr error
if err != nil {
updatePageLog("failed", err.Error(), "PAGE_SYNC_FAILED", 0)
return result, err
}
for attempt := 0; attempt <= maxRetries; attempt++ {
saveSum := (attempt == 0 && pageNumber == 1)
result, err := s.SyncAccountReport(ctx, req, useMock, saveSum)
lastResult = result
lastErr = err
summary := map[string]interface{}{
"page_number": pageNumber,
"detail_count": result.DetailCount,
"sum_saved": result.SumSuccess,
}
updatePageLog("success", "", "", 0)
if err == nil {
logrus.Infof("第 %d 页同步成功,尝试次数:%d", pageNumber, attempt+1)
logrus.Debugf("分页任务 %s 完成: %v", pageTaskID, summary)
updatePageLog("success", "", "", attempt)
return result, nil
}
func (s *SyncService) fetchCurrentData(req *AccountReportRequest, useMock bool) *AccountReportData {
if useMock {
responseData := s.mockGen.GenerateAccountReportResponseWithPage(req.PageInfo.CurrentPage, req.PageInfo.PageSize)
if responseData != nil && responseData.Data != nil {
return responseData.Data
return result, pageLogID, nil
}
return nil
logrus.Warnf("第 %d 页同步失败,第 %d 次重试,错误:%v", pageNumber, attempt+1, err)
}
respBytes, err := s.httpClient.Post(context.Background(), "/rest/openapi/gw/esp/report/accountReport", req)
updatePageLog("failed", lastErr.Error(), "PAGE_SYNC_FAILED", maxRetries)
return lastResult, pageLogID, lastErr
}
func (s *SyncService) syncSinglePageWithTaskForConcurrent(ctx context.Context, req *AccountReportRequest, useMock bool, maxRetries int, pageTaskID string, pageNumber int, isFirstPage bool) (*SyncResult, int64, error) {
pageStartTime := time.Now()
pageLogReq := &taskDto.CreateSyncTaskLogReq{
TaskID: pageTaskID,
TaskType: "account_report_page",
AdvertiserID: req.AdvertiserID,
StartTime: time.UnixMilli(req.StartTime),
EndTime: time.UnixMilli(req.EndTime),
Status: "pending",
MaxRetry: maxRetries,
PageInfo: req.PageInfo,
RequestParams: map[string]interface{}{
"page_number": pageNumber,
"page_size": req.PageInfo.PageSize,
},
}
pageLogID, err := dao.SyncTaskLog.Create(ctx, pageLogReq)
if err != nil {
return nil
logrus.Errorf("创建分页任务日志失败:%v", err)
}
responseData := &AccountReportResponse{}
if err := json.Unmarshal(respBytes, responseData); err != nil {
return nil
updatePageLog := func(status, errMsg, errorCode string, retryCount int) {
if pageLogID == 0 {
return
}
duration := time.Since(pageStartTime).Milliseconds()
updateReq := &taskDto.UpdateSyncTaskLogReq{
ID: pageLogID,
Status: status,
ErrorMessage: errMsg,
ErrorCode: errorCode,
DurationMs: &duration,
}
if retryCount > 0 {
updateReq.RetryCount = &retryCount
}
if status == "success" || status == "failed" || status == "partial_failed" {
completedAt := time.Now()
updateReq.CompletedAt = completedAt
}
if err := dao.SyncTaskLog.Update(ctx, updateReq); err != nil {
logrus.Errorf("更新分页任务日志失败:%v", err)
}
}
if responseData.Code == 0 && responseData.Data != nil {
return responseData.Data
updatePageLog("running", "", "", 0)
logrus.Infof(">>> 开始同步第 %d 页数据...", pageNumber)
var lastResult *SyncResult
var lastErr error
for attempt := 0; attempt <= maxRetries; attempt++ {
saveSum := isFirstPage && (attempt == 0)
result, err := s.SyncAccountReport(ctx, req, useMock, saveSum)
lastResult = result
lastErr = err
if err == nil {
logrus.Infof("第 %d 页同步成功,尝试次数:%d", pageNumber, attempt+1)
updatePageLog("success", "", "", attempt)
return result, pageLogID, nil
}
logrus.Warnf("第 %d 页同步失败,第 %d 次重试,错误:%v", pageNumber, attempt+1, err)
}
return nil
}
func (s *SyncService) saveSumData(ctx context.Context, item *dto.CidAccountReportSumItem) (*dto.CreateCidAccountReportSumRes, error) {
return copydata.CidAccountReportDetail.CreateSum(ctx, item)
}
func (s *SyncService) saveDetailData(ctx context.Context, items []*dto.CidAccountReportDetailItem) (*dto.BatchCreateCidAccountReportDetailRes, error) {
req := &dto.BatchCreateCidAccountReportDetailReq{
Items: items,
}
return copydata.CidAccountReportDetail.BatchCreate(ctx, req)
updatePageLog("failed", lastErr.Error(), "PAGE_SYNC_FAILED", maxRetries)
return lastResult, pageLogID, lastErr
}
func (s *SyncService) SyncWithRetry(ctx context.Context, req *AccountReportRequest, useMock bool, maxRetries int) (*SyncResult, error) {
@@ -429,7 +472,8 @@ func (s *SyncService) SyncWithRetry(ctx context.Context, req *AccountReportReque
var lastErr error
for attempt := 0; attempt <= maxRetries; attempt++ {
result, err := s.SyncAccountReport(ctx, req, useMock)
saveSum := (attempt == 0 && req.PageInfo.CurrentPage == 1)
result, err := s.SyncAccountReport(ctx, req, useMock, saveSum)
lastResult = result
lastErr = err
@@ -568,7 +612,7 @@ func (s *SyncService) SyncAccountReportConcurrent(ctx context.Context, req *Acco
PageSize: pageSize,
}
result, err := s.syncSinglePageWithTask(egCtx, &pageReq, config.UseMock, config.MaxRetries, pageTaskID, currentPage)
result, pageLogID, err := s.syncSinglePageWithTaskForConcurrent(egCtx, &pageReq, config.UseMock, config.MaxRetries, pageTaskID, currentPage, currentPage == 1)
pageDuration := time.Since(pageStartTime).Milliseconds()
pageResult := &PageSyncResult{
@@ -578,7 +622,7 @@ func (s *SyncService) SyncAccountReportConcurrent(ctx context.Context, req *Acco
DurationMs: pageDuration,
ErrorMessage: "",
RetryCount: 0,
PageTaskLogID: 0,
PageTaskLogID: pageLogID,
}
if err != nil {
@@ -689,3 +733,40 @@ func (s *SyncService) SyncAccountReportConcurrent(ctx context.Context, req *Acco
return aggregatedResult, nil
}
func (s *SyncService) fetchCurrentData(req *AccountReportRequest, useMock bool) *AccountReportData {
if useMock {
responseData := s.mockGen.GenerateAccountReportResponseWithPage(req.PageInfo.CurrentPage, req.PageInfo.PageSize)
if responseData != nil && responseData.Data != nil {
return responseData.Data
}
return nil
}
respBytes, err := s.httpClient.Post(context.Background(), "/rest/openapi/gw/esp/report/accountReport", req)
if err != nil {
return nil
}
responseData := &AccountReportResponse{}
if err := json.Unmarshal(respBytes, responseData); err != nil {
return nil
}
if responseData.Code == 0 && responseData.Data != nil {
return responseData.Data
}
return nil
}
func (s *SyncService) saveSumData(ctx context.Context, item *dto.CidAccountReportSumItem) (*dto.CreateCidAccountReportSumRes, error) {
return copydata.CidAccountReportDetail.CreateSum(ctx, item)
}
func (s *SyncService) saveDetailData(ctx context.Context, items []*dto.CidAccountReportDetailItem) (*dto.BatchCreateCidAccountReportDetailRes, error) {
req := &dto.BatchCreateCidAccountReportDetailReq{
Items: items,
}
return copydata.CidAccountReportDetail.BatchCreate(ctx, req)
}