2026-04-09 10:48:30 +08:00
|
|
|
|
package syncdata
|
2026-04-07 09:51:32 +08:00
|
|
|
|
|
|
|
|
|
|
import (
|
2026-04-08 09:03:20 +08:00
|
|
|
|
dao "cid/dao/copydata"
|
2026-04-07 09:51:32 +08:00
|
|
|
|
dto "cid/model/dto/copydata"
|
2026-04-08 09:03:20 +08:00
|
|
|
|
taskDto "cid/model/dto/copydata"
|
2026-04-07 09:51:32 +08:00
|
|
|
|
"cid/service/copydata"
|
|
|
|
|
|
"context"
|
|
|
|
|
|
"encoding/json"
|
|
|
|
|
|
"fmt"
|
2026-04-08 16:00:54 +08:00
|
|
|
|
"sync"
|
|
|
|
|
|
"sync/atomic"
|
2026-04-07 09:51:32 +08:00
|
|
|
|
"time"
|
|
|
|
|
|
|
|
|
|
|
|
"github.com/sirupsen/logrus"
|
2026-04-08 16:00:54 +08:00
|
|
|
|
"golang.org/x/sync/errgroup"
|
|
|
|
|
|
"golang.org/x/sync/semaphore"
|
2026-04-07 09:51:32 +08:00
|
|
|
|
)
|
|
|
|
|
|
|
|
|
|
|
|
type SyncService struct {
|
|
|
|
|
|
httpClient *HttpClient
|
|
|
|
|
|
converter *DataConverter
|
|
|
|
|
|
mockGen *MockDataGenerator
|
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
func NewSyncService() *SyncService {
|
|
|
|
|
|
return &SyncService{
|
|
|
|
|
|
httpClient: NewHttpClient("https://ad.e.kuaishou.com", 0),
|
|
|
|
|
|
converter: NewDataConverter(),
|
|
|
|
|
|
mockGen: NewMockDataGenerator(),
|
|
|
|
|
|
}
|
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
type SyncResult struct {
|
2026-04-08 09:03:20 +08:00
|
|
|
|
SumSuccess bool `json:"sum_success"`
|
|
|
|
|
|
SumID int64 `json:"sum_id"`
|
|
|
|
|
|
DetailSuccess bool `json:"detail_success"`
|
|
|
|
|
|
DetailCount int `json:"detail_count"`
|
|
|
|
|
|
DetailSuccessCount int64 `json:"detail_success_count"`
|
|
|
|
|
|
DetailFailCount int64 `json:"detail_fail_count"`
|
|
|
|
|
|
Error error `json:"error"`
|
|
|
|
|
|
TaskLogID int64 `json:"task_log_id"`
|
|
|
|
|
|
PageResults []*PageSyncResult `json:"page_results,omitempty"`
|
2026-04-07 09:51:32 +08:00
|
|
|
|
}
|
|
|
|
|
|
|
2026-04-08 09:03:20 +08:00
|
|
|
|
// PageSyncResult is the outcome of syncing one page within a multi-page run.
type PageSyncResult struct {
	// PageNumber is the 1-based page index that was requested.
	PageNumber int `json:"page_number"`
	// PageTaskLogID is the ID of the task-log row created for this page.
	PageTaskLogID int64 `json:"page_task_log_id"`
	// Success is true when the page's detail rows were stored.
	Success bool `json:"success"`
	// RecordCount is the number of detail rows stored for this page.
	RecordCount int `json:"record_count"`
	// DurationMs is the wall-clock time spent on this page, in milliseconds.
	DurationMs int64 `json:"duration_ms"`
	// ErrorMessage carries the failure text when the page could not be synced.
	ErrorMessage string `json:"error_message,omitempty"`
	// RetryCount is reserved for the number of retries spent on this page.
	RetryCount int `json:"retry_count"`
}
|
|
|
|
|
|
|
2026-04-08 17:26:00 +08:00
|
|
|
|
func (s *SyncService) SyncAccountReport(ctx context.Context, req *AccountReportRequest, useMock bool, saveSum bool) (*SyncResult, error) {
|
2026-04-07 09:51:32 +08:00
|
|
|
|
result := &SyncResult{}
|
|
|
|
|
|
|
2026-04-08 09:03:20 +08:00
|
|
|
|
var responseData *AccountReportResponse
|
2026-04-07 09:51:32 +08:00
|
|
|
|
|
|
|
|
|
|
if useMock {
|
2026-04-09 09:48:22 +08:00
|
|
|
|
logrus.Infof("使用 Mock 数据同步快手广告账户报表 - page=%d, pageSize=%d",
|
|
|
|
|
|
req.PageInfo.CurrentPage, req.PageInfo.PageSize)
|
2026-04-08 16:00:54 +08:00
|
|
|
|
responseData = s.mockGen.GenerateAccountReportResponseWithPage(req.PageInfo.CurrentPage, req.PageInfo.PageSize)
|
2026-04-09 09:48:22 +08:00
|
|
|
|
|
|
|
|
|
|
logrus.Infof("✓ Mock 数据生成完成 - TotalCount=%d, Detail数组长度=%d",
|
|
|
|
|
|
responseData.Data.TotalCount, len(responseData.Data.Detail))
|
|
|
|
|
|
|
|
|
|
|
|
if len(responseData.Data.Detail) > 0 {
|
|
|
|
|
|
firstItem := responseData.Data.Detail[0]
|
|
|
|
|
|
lastItem := responseData.Data.Detail[len(responseData.Data.Detail)-1]
|
|
|
|
|
|
logrus.Infof(" 第一条: campaignId=%v, creativeId=%v", firstItem.CampaignId, firstItem.CreativeId)
|
|
|
|
|
|
logrus.Infof(" 最后一条: campaignId=%v, creativeId=%v", lastItem.CampaignId, lastItem.CreativeId)
|
|
|
|
|
|
}
|
2026-04-07 09:51:32 +08:00
|
|
|
|
} else {
|
2026-04-08 09:03:20 +08:00
|
|
|
|
logrus.Info("从真实 API 同步快手广告账户报表")
|
|
|
|
|
|
respBytes, err := s.httpClient.Post(ctx, "/rest/openapi/gw/esp/report/accountReport", req)
|
2026-04-07 09:51:32 +08:00
|
|
|
|
if err != nil {
|
|
|
|
|
|
result.Error = fmt.Errorf("调用 API 失败:%w", err)
|
|
|
|
|
|
return result, result.Error
|
|
|
|
|
|
}
|
|
|
|
|
|
|
2026-04-08 09:03:20 +08:00
|
|
|
|
responseData = &AccountReportResponse{}
|
2026-04-07 09:51:32 +08:00
|
|
|
|
if err := json.Unmarshal(respBytes, responseData); err != nil {
|
|
|
|
|
|
result.Error = fmt.Errorf("解析响应失败:%w", err)
|
|
|
|
|
|
return result, result.Error
|
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
if responseData.Code != 0 {
|
|
|
|
|
|
result.Error = fmt.Errorf("API 返回错误:code=%d, message=%s", responseData.Code, responseData.Message)
|
|
|
|
|
|
return result, result.Error
|
|
|
|
|
|
}
|
|
|
|
|
|
}
|
|
|
|
|
|
|
2026-04-08 17:26:00 +08:00
|
|
|
|
if saveSum && responseData.Data.Sum != nil {
|
2026-04-08 09:03:20 +08:00
|
|
|
|
sumItem := s.converter.ConvertToSumItem(responseData.Data.Sum, "account_report", req.PageInfo.CurrentPage)
|
2026-04-07 09:51:32 +08:00
|
|
|
|
sumResult, saveErr := s.saveSumData(ctx, sumItem)
|
|
|
|
|
|
if saveErr != nil {
|
|
|
|
|
|
logrus.Errorf("保存汇总数据失败:%v", saveErr)
|
|
|
|
|
|
result.Error = fmt.Errorf("保存汇总数据失败:%w", saveErr)
|
|
|
|
|
|
} else {
|
|
|
|
|
|
result.SumSuccess = true
|
|
|
|
|
|
result.SumID = sumResult.Id
|
|
|
|
|
|
logrus.Infof("成功保存汇总数据,ID=%d", sumResult.Id)
|
|
|
|
|
|
}
|
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
if len(responseData.Data.Detail) > 0 {
|
2026-04-08 09:03:20 +08:00
|
|
|
|
detailItems := s.converter.ConvertToDetailItems(responseData.Data.Detail, "account_report", req.PageInfo.CurrentPage)
|
2026-04-09 09:48:22 +08:00
|
|
|
|
|
|
|
|
|
|
logrus.Infof("▶ 准备插入 %d 条明细数据(page=%d)", len(detailItems), req.PageInfo.CurrentPage)
|
|
|
|
|
|
|
2026-04-07 09:51:32 +08:00
|
|
|
|
detailResult, saveErr := s.saveDetailData(ctx, detailItems)
|
|
|
|
|
|
if saveErr != nil {
|
|
|
|
|
|
logrus.Errorf("保存明细数据失败:%v", saveErr)
|
|
|
|
|
|
result.Error = fmt.Errorf("保存明细数据失败:%w", saveErr)
|
|
|
|
|
|
} else {
|
|
|
|
|
|
result.DetailSuccess = true
|
|
|
|
|
|
result.DetailCount = len(detailItems)
|
|
|
|
|
|
result.DetailSuccessCount = detailResult.SuccessCount
|
|
|
|
|
|
result.DetailFailCount = detailResult.FailCount
|
2026-04-09 09:48:22 +08:00
|
|
|
|
logrus.Infof("✓ 成功保存 %d 条明细数据(成功=%d, 失败=%d)", len(detailItems), detailResult.SuccessCount, detailResult.FailCount)
|
2026-04-07 09:51:32 +08:00
|
|
|
|
}
|
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
return result, result.Error
|
|
|
|
|
|
}
|
|
|
|
|
|
|
2026-04-08 09:03:20 +08:00
|
|
|
|
// SyncAccountReportWithPagination 带分页处理的同步方法(支持全量数据抽取)
|
|
|
|
|
|
func (s *SyncService) SyncAccountReportWithPagination(ctx context.Context, req *AccountReportRequest, useMock bool, maxRetries int) (*SyncResult, error) {
|
|
|
|
|
|
startTime := time.Now()
|
|
|
|
|
|
parentTaskID := fmt.Sprintf("%d_%d_account", req.AdvertiserID, req.StartTime)
|
|
|
|
|
|
|
|
|
|
|
|
logReq := &taskDto.CreateSyncTaskLogReq{
|
|
|
|
|
|
TaskID: parentTaskID,
|
|
|
|
|
|
TaskType: "account_report",
|
|
|
|
|
|
AdvertiserID: req.AdvertiserID,
|
|
|
|
|
|
StartTime: time.UnixMilli(req.StartTime),
|
|
|
|
|
|
EndTime: time.UnixMilli(req.EndTime),
|
|
|
|
|
|
Status: "pending",
|
|
|
|
|
|
MaxRetry: maxRetries,
|
|
|
|
|
|
RequestParams: req,
|
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
parentLogID, err := dao.SyncTaskLog.Create(ctx, logReq)
|
|
|
|
|
|
if err != nil {
|
|
|
|
|
|
logrus.Errorf("创建主任务日志失败:%v", err)
|
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
updateParentLog := func(status, errMsg, errorCode string, summary interface{}) {
|
|
|
|
|
|
if parentLogID == 0 {
|
|
|
|
|
|
return
|
|
|
|
|
|
}
|
|
|
|
|
|
duration := time.Since(startTime).Milliseconds()
|
|
|
|
|
|
updateReq := &taskDto.UpdateSyncTaskLogReq{
|
|
|
|
|
|
ID: parentLogID,
|
|
|
|
|
|
Status: status,
|
|
|
|
|
|
ErrorMessage: errMsg,
|
|
|
|
|
|
ErrorCode: errorCode,
|
|
|
|
|
|
DurationMs: &duration,
|
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
if status == "success" || status == "manual_review" {
|
|
|
|
|
|
completedAt := time.Now()
|
|
|
|
|
|
updateReq.CompletedAt = completedAt
|
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
if summary != nil {
|
|
|
|
|
|
updateReq.ResultSummary = summary
|
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
if err := dao.SyncTaskLog.Update(ctx, updateReq); err != nil {
|
|
|
|
|
|
logrus.Errorf("更新主任务日志失败:%v", err)
|
|
|
|
|
|
}
|
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
updateParentLog("running", "", "", nil)
|
|
|
|
|
|
|
2026-04-07 09:51:32 +08:00
|
|
|
|
aggregatedResult := &SyncResult{
|
2026-04-08 09:03:20 +08:00
|
|
|
|
SumSuccess: false,
|
|
|
|
|
|
SumID: 0,
|
|
|
|
|
|
TaskLogID: parentLogID,
|
|
|
|
|
|
PageResults: make([]*PageSyncResult, 0),
|
2026-04-07 09:51:32 +08:00
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
totalCount := 0
|
|
|
|
|
|
currentPage := 1
|
2026-04-08 16:00:54 +08:00
|
|
|
|
pageSize := 10
|
2026-04-07 09:51:32 +08:00
|
|
|
|
|
|
|
|
|
|
if req.PageInfo == nil {
|
|
|
|
|
|
req.PageInfo = &PageInfo{}
|
|
|
|
|
|
}
|
|
|
|
|
|
|
2026-04-08 16:00:54 +08:00
|
|
|
|
if req.PageInfo.PageSize > 0 {
|
|
|
|
|
|
pageSize = req.PageInfo.PageSize
|
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
successPages := 0
|
|
|
|
|
|
failedPages := 0
|
|
|
|
|
|
|
2026-04-08 09:03:20 +08:00
|
|
|
|
var totalPages int
|
|
|
|
|
|
|
2026-04-07 09:51:32 +08:00
|
|
|
|
for {
|
|
|
|
|
|
logrus.Infof(">>> 正在同步第 %d 页数据...", currentPage)
|
|
|
|
|
|
|
|
|
|
|
|
req.PageInfo.CurrentPage = currentPage
|
|
|
|
|
|
req.PageInfo.PageSize = pageSize
|
|
|
|
|
|
|
2026-04-08 09:03:20 +08:00
|
|
|
|
pageTaskID := fmt.Sprintf("%s_page_%d", parentTaskID, currentPage)
|
|
|
|
|
|
pageStartTime := time.Now()
|
|
|
|
|
|
|
|
|
|
|
|
pageResult := &PageSyncResult{
|
|
|
|
|
|
PageNumber: currentPage,
|
|
|
|
|
|
Success: false,
|
|
|
|
|
|
RecordCount: 0,
|
|
|
|
|
|
RetryCount: 0,
|
|
|
|
|
|
}
|
|
|
|
|
|
|
2026-04-09 09:48:22 +08:00
|
|
|
|
result, pageLogID, err := s.SyncSinglePageWithTask(ctx, req, useMock, maxRetries, pageTaskID, currentPage)
|
2026-04-08 09:03:20 +08:00
|
|
|
|
pageDuration := time.Since(pageStartTime).Milliseconds()
|
|
|
|
|
|
pageResult.DurationMs = pageDuration
|
2026-04-08 17:26:00 +08:00
|
|
|
|
pageResult.PageTaskLogID = pageLogID
|
2026-04-08 09:03:20 +08:00
|
|
|
|
|
2026-04-07 09:51:32 +08:00
|
|
|
|
if err != nil {
|
|
|
|
|
|
logrus.Errorf("第 %d 页同步失败:%v", currentPage, err)
|
2026-04-08 09:03:20 +08:00
|
|
|
|
pageResult.ErrorMessage = err.Error()
|
|
|
|
|
|
failedPages++
|
|
|
|
|
|
|
|
|
|
|
|
aggregatedResult.PageResults = append(aggregatedResult.PageResults, pageResult)
|
|
|
|
|
|
|
|
|
|
|
|
if failedPages > maxRetries {
|
|
|
|
|
|
logrus.Warnf("失败页数超过阈值 %d,终止同步", maxRetries)
|
|
|
|
|
|
break
|
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
currentPage++
|
|
|
|
|
|
time.Sleep(500 * time.Millisecond)
|
|
|
|
|
|
continue
|
2026-04-07 09:51:32 +08:00
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
if result.SumSuccess && aggregatedResult.SumID == 0 {
|
|
|
|
|
|
aggregatedResult.SumSuccess = true
|
|
|
|
|
|
aggregatedResult.SumID = result.SumID
|
|
|
|
|
|
logrus.Infof("✓ 汇总数据已保存,ID=%d", result.SumID)
|
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
if result.DetailSuccess && result.DetailCount > 0 {
|
2026-04-08 09:03:20 +08:00
|
|
|
|
totalCount += result.DetailCount
|
|
|
|
|
|
pageResult.Success = true
|
|
|
|
|
|
pageResult.RecordCount = result.DetailCount
|
|
|
|
|
|
successPages++
|
|
|
|
|
|
logrus.Infof("✓ 第 %d 页获取到 %d 条明细数据,累计 %d 条", currentPage, result.DetailCount, totalCount)
|
2026-04-07 09:51:32 +08:00
|
|
|
|
}
|
|
|
|
|
|
|
2026-04-08 09:03:20 +08:00
|
|
|
|
aggregatedResult.PageResults = append(aggregatedResult.PageResults, pageResult)
|
|
|
|
|
|
|
2026-04-07 09:51:32 +08:00
|
|
|
|
currentData := s.fetchCurrentData(req, useMock)
|
|
|
|
|
|
if currentData != nil && currentData.TotalCount > 0 {
|
2026-04-08 09:03:20 +08:00
|
|
|
|
totalPages = (currentData.TotalCount + pageSize - 1) / pageSize
|
2026-04-07 09:51:32 +08:00
|
|
|
|
logrus.Infof("总记录数:%d, 总页数:%d, 当前页:%d/%d",
|
|
|
|
|
|
currentData.TotalCount, totalPages, currentPage, totalPages)
|
|
|
|
|
|
|
|
|
|
|
|
if currentPage >= totalPages {
|
|
|
|
|
|
logrus.Infof("✓ 已同步所有页面数据,共 %d 页,%d 条记录", totalPages, currentData.TotalCount)
|
|
|
|
|
|
break
|
|
|
|
|
|
}
|
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
if result.DetailCount < pageSize {
|
|
|
|
|
|
logrus.Infof("✓ 当前页数据不足 %d 条,已到达最后一页", pageSize)
|
|
|
|
|
|
break
|
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
currentPage++
|
|
|
|
|
|
time.Sleep(300 * time.Millisecond)
|
|
|
|
|
|
}
|
|
|
|
|
|
|
2026-04-08 09:03:20 +08:00
|
|
|
|
logrus.Infof("分页同步完成 - 成功:%d页, 失败:%d页, 总明细:%d条",
|
|
|
|
|
|
successPages, failedPages, totalCount)
|
|
|
|
|
|
|
|
|
|
|
|
// 统计所有子任务的结果
|
|
|
|
|
|
totalDetailCount := 0
|
|
|
|
|
|
var totalSuccessCount int64
|
|
|
|
|
|
var totalFailCount int64
|
|
|
|
|
|
|
|
|
|
|
|
for _, pageResult := range aggregatedResult.PageResults {
|
|
|
|
|
|
if pageResult.Success {
|
|
|
|
|
|
totalDetailCount += pageResult.RecordCount
|
|
|
|
|
|
totalSuccessCount++
|
2026-04-07 09:51:32 +08:00
|
|
|
|
} else {
|
2026-04-08 09:03:20 +08:00
|
|
|
|
totalFailCount++
|
|
|
|
|
|
}
|
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
aggregatedResult.DetailCount = totalDetailCount
|
|
|
|
|
|
aggregatedResult.DetailSuccessCount = totalSuccessCount
|
|
|
|
|
|
aggregatedResult.DetailFailCount = totalFailCount
|
|
|
|
|
|
|
|
|
|
|
|
if failedPages > 0 {
|
|
|
|
|
|
logrus.Warnf("存在 %d 个失败的页面,主任务标记为部分失败", failedPages)
|
|
|
|
|
|
|
|
|
|
|
|
summary := map[string]interface{}{
|
|
|
|
|
|
"sum_id": aggregatedResult.SumID,
|
|
|
|
|
|
"detail_count": totalDetailCount,
|
|
|
|
|
|
"total_pages": totalPages,
|
|
|
|
|
|
"success_pages": successPages,
|
|
|
|
|
|
"failed_pages": failedPages,
|
|
|
|
|
|
"page_results": aggregatedResult.PageResults,
|
2026-04-07 09:51:32 +08:00
|
|
|
|
}
|
2026-04-08 09:03:20 +08:00
|
|
|
|
updateParentLog("partial_failed", fmt.Sprintf("%d 个页面同步失败", failedPages), "PAGE_SYNC_FAILED", summary)
|
2026-04-07 09:51:32 +08:00
|
|
|
|
} else {
|
2026-04-08 09:03:20 +08:00
|
|
|
|
logrus.Info("✓ 所有页面同步成功")
|
|
|
|
|
|
|
|
|
|
|
|
summary := map[string]interface{}{
|
|
|
|
|
|
"sum_id": aggregatedResult.SumID,
|
|
|
|
|
|
"detail_count": totalDetailCount,
|
|
|
|
|
|
"total_pages": totalPages,
|
|
|
|
|
|
"success_pages": successPages,
|
|
|
|
|
|
"failed_pages": 0,
|
|
|
|
|
|
"page_results": aggregatedResult.PageResults,
|
|
|
|
|
|
}
|
|
|
|
|
|
updateParentLog("success", "", "", summary)
|
2026-04-07 09:51:32 +08:00
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
return aggregatedResult, aggregatedResult.Error
|
|
|
|
|
|
}
|
|
|
|
|
|
|
2026-04-09 09:48:22 +08:00
|
|
|
|
func (s *SyncService) SyncSinglePageWithTask(ctx context.Context, req *AccountReportRequest, useMock bool, maxRetries int, pageTaskID string, pageNumber int) (*SyncResult, int64, error) {
|
2026-04-08 09:03:20 +08:00
|
|
|
|
pageStartTime := time.Now()
|
|
|
|
|
|
|
|
|
|
|
|
pageLogReq := &taskDto.CreateSyncTaskLogReq{
|
|
|
|
|
|
TaskID: pageTaskID,
|
|
|
|
|
|
TaskType: "account_report_page",
|
|
|
|
|
|
AdvertiserID: req.AdvertiserID,
|
|
|
|
|
|
StartTime: time.UnixMilli(req.StartTime),
|
|
|
|
|
|
EndTime: time.UnixMilli(req.EndTime),
|
|
|
|
|
|
Status: "pending",
|
|
|
|
|
|
MaxRetry: maxRetries,
|
|
|
|
|
|
PageInfo: req.PageInfo,
|
|
|
|
|
|
RequestParams: map[string]interface{}{
|
|
|
|
|
|
"page_number": pageNumber,
|
|
|
|
|
|
"page_size": req.PageInfo.PageSize,
|
|
|
|
|
|
},
|
2026-04-07 09:51:32 +08:00
|
|
|
|
}
|
|
|
|
|
|
|
2026-04-08 09:03:20 +08:00
|
|
|
|
pageLogID, err := dao.SyncTaskLog.Create(ctx, pageLogReq)
|
2026-04-07 09:51:32 +08:00
|
|
|
|
if err != nil {
|
2026-04-08 09:03:20 +08:00
|
|
|
|
logrus.Errorf("创建分页任务日志失败:%v", err)
|
2026-04-07 09:51:32 +08:00
|
|
|
|
}
|
|
|
|
|
|
|
2026-04-08 09:03:20 +08:00
|
|
|
|
updatePageLog := func(status, errMsg, errorCode string, retryCount int) {
|
|
|
|
|
|
if pageLogID == 0 {
|
|
|
|
|
|
return
|
|
|
|
|
|
}
|
|
|
|
|
|
duration := time.Since(pageStartTime).Milliseconds()
|
|
|
|
|
|
updateReq := &taskDto.UpdateSyncTaskLogReq{
|
|
|
|
|
|
ID: pageLogID,
|
|
|
|
|
|
Status: status,
|
|
|
|
|
|
ErrorMessage: errMsg,
|
|
|
|
|
|
ErrorCode: errorCode,
|
|
|
|
|
|
DurationMs: &duration,
|
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
if retryCount > 0 {
|
|
|
|
|
|
updateReq.RetryCount = &retryCount
|
|
|
|
|
|
}
|
|
|
|
|
|
|
2026-04-08 17:26:00 +08:00
|
|
|
|
if status == "success" || status == "failed" || status == "partial_failed" {
|
2026-04-08 09:03:20 +08:00
|
|
|
|
completedAt := time.Now()
|
|
|
|
|
|
updateReq.CompletedAt = completedAt
|
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
if err := dao.SyncTaskLog.Update(ctx, updateReq); err != nil {
|
|
|
|
|
|
logrus.Errorf("更新分页任务日志失败:%v", err)
|
|
|
|
|
|
}
|
2026-04-07 09:51:32 +08:00
|
|
|
|
}
|
|
|
|
|
|
|
2026-04-08 09:03:20 +08:00
|
|
|
|
updatePageLog("running", "", "", 0)
|
|
|
|
|
|
|
|
|
|
|
|
logrus.Infof(">>> 开始同步第 %d 页数据...", pageNumber)
|
|
|
|
|
|
|
2026-04-08 17:26:00 +08:00
|
|
|
|
var lastResult *SyncResult
|
|
|
|
|
|
var lastErr error
|
2026-04-08 09:03:20 +08:00
|
|
|
|
|
2026-04-08 17:26:00 +08:00
|
|
|
|
for attempt := 0; attempt <= maxRetries; attempt++ {
|
|
|
|
|
|
saveSum := (attempt == 0 && pageNumber == 1)
|
|
|
|
|
|
result, err := s.SyncAccountReport(ctx, req, useMock, saveSum)
|
|
|
|
|
|
lastResult = result
|
|
|
|
|
|
lastErr = err
|
2026-04-07 09:51:32 +08:00
|
|
|
|
|
2026-04-08 17:26:00 +08:00
|
|
|
|
if err == nil {
|
|
|
|
|
|
logrus.Infof("第 %d 页同步成功,尝试次数:%d", pageNumber, attempt+1)
|
|
|
|
|
|
|
|
|
|
|
|
updatePageLog("success", "", "", attempt)
|
|
|
|
|
|
|
|
|
|
|
|
return result, pageLogID, nil
|
|
|
|
|
|
}
|
2026-04-08 09:03:20 +08:00
|
|
|
|
|
2026-04-08 17:26:00 +08:00
|
|
|
|
logrus.Warnf("第 %d 页同步失败,第 %d 次重试,错误:%v", pageNumber, attempt+1, err)
|
|
|
|
|
|
}
|
2026-04-08 09:03:20 +08:00
|
|
|
|
|
2026-04-08 17:26:00 +08:00
|
|
|
|
updatePageLog("failed", lastErr.Error(), "PAGE_SYNC_FAILED", maxRetries)
|
|
|
|
|
|
return lastResult, pageLogID, lastErr
|
2026-04-07 09:51:32 +08:00
|
|
|
|
}
|
|
|
|
|
|
|
2026-04-08 17:26:00 +08:00
|
|
|
|
func (s *SyncService) syncSinglePageWithTaskForConcurrent(ctx context.Context, req *AccountReportRequest, useMock bool, maxRetries int, pageTaskID string, pageNumber int, isFirstPage bool) (*SyncResult, int64, error) {
|
|
|
|
|
|
pageStartTime := time.Now()
|
|
|
|
|
|
|
|
|
|
|
|
pageLogReq := &taskDto.CreateSyncTaskLogReq{
|
|
|
|
|
|
TaskID: pageTaskID,
|
|
|
|
|
|
TaskType: "account_report_page",
|
|
|
|
|
|
AdvertiserID: req.AdvertiserID,
|
|
|
|
|
|
StartTime: time.UnixMilli(req.StartTime),
|
|
|
|
|
|
EndTime: time.UnixMilli(req.EndTime),
|
|
|
|
|
|
Status: "pending",
|
|
|
|
|
|
MaxRetry: maxRetries,
|
|
|
|
|
|
PageInfo: req.PageInfo,
|
|
|
|
|
|
RequestParams: map[string]interface{}{
|
|
|
|
|
|
"page_number": pageNumber,
|
|
|
|
|
|
"page_size": req.PageInfo.PageSize,
|
|
|
|
|
|
},
|
2026-04-07 09:51:32 +08:00
|
|
|
|
}
|
|
|
|
|
|
|
2026-04-08 17:26:00 +08:00
|
|
|
|
pageLogID, err := dao.SyncTaskLog.Create(ctx, pageLogReq)
|
2026-04-07 09:51:32 +08:00
|
|
|
|
if err != nil {
|
2026-04-09 13:43:15 +08:00
|
|
|
|
logrus.Errorf("创建分页任务日志失败(page=%d):%v", pageNumber, err)
|
|
|
|
|
|
return nil, 0, fmt.Errorf("创建分页任务日志失败:%w", err)
|
2026-04-07 09:51:32 +08:00
|
|
|
|
}
|
|
|
|
|
|
|
2026-04-08 17:26:00 +08:00
|
|
|
|
updatePageLog := func(status, errMsg, errorCode string, retryCount int) {
|
|
|
|
|
|
if pageLogID == 0 {
|
|
|
|
|
|
return
|
|
|
|
|
|
}
|
|
|
|
|
|
duration := time.Since(pageStartTime).Milliseconds()
|
|
|
|
|
|
updateReq := &taskDto.UpdateSyncTaskLogReq{
|
|
|
|
|
|
ID: pageLogID,
|
|
|
|
|
|
Status: status,
|
|
|
|
|
|
ErrorMessage: errMsg,
|
|
|
|
|
|
ErrorCode: errorCode,
|
|
|
|
|
|
DurationMs: &duration,
|
|
|
|
|
|
}
|
2026-04-07 09:51:32 +08:00
|
|
|
|
|
2026-04-08 17:26:00 +08:00
|
|
|
|
if retryCount > 0 {
|
|
|
|
|
|
updateReq.RetryCount = &retryCount
|
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
if status == "success" || status == "failed" || status == "partial_failed" {
|
|
|
|
|
|
completedAt := time.Now()
|
|
|
|
|
|
updateReq.CompletedAt = completedAt
|
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
if err := dao.SyncTaskLog.Update(ctx, updateReq); err != nil {
|
|
|
|
|
|
logrus.Errorf("更新分页任务日志失败:%v", err)
|
|
|
|
|
|
}
|
2026-04-07 09:51:32 +08:00
|
|
|
|
}
|
|
|
|
|
|
|
2026-04-08 17:26:00 +08:00
|
|
|
|
updatePageLog("running", "", "", 0)
|
2026-04-07 09:51:32 +08:00
|
|
|
|
|
2026-04-08 17:26:00 +08:00
|
|
|
|
logrus.Infof(">>> 开始同步第 %d 页数据...", pageNumber)
|
2026-04-07 09:51:32 +08:00
|
|
|
|
|
2026-04-08 17:26:00 +08:00
|
|
|
|
var lastResult *SyncResult
|
|
|
|
|
|
var lastErr error
|
|
|
|
|
|
|
|
|
|
|
|
for attempt := 0; attempt <= maxRetries; attempt++ {
|
|
|
|
|
|
saveSum := isFirstPage && (attempt == 0)
|
|
|
|
|
|
result, err := s.SyncAccountReport(ctx, req, useMock, saveSum)
|
|
|
|
|
|
lastResult = result
|
|
|
|
|
|
lastErr = err
|
|
|
|
|
|
|
|
|
|
|
|
if err == nil {
|
|
|
|
|
|
logrus.Infof("第 %d 页同步成功,尝试次数:%d", pageNumber, attempt+1)
|
|
|
|
|
|
|
|
|
|
|
|
updatePageLog("success", "", "", attempt)
|
|
|
|
|
|
|
|
|
|
|
|
return result, pageLogID, nil
|
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
logrus.Warnf("第 %d 页同步失败,第 %d 次重试,错误:%v", pageNumber, attempt+1, err)
|
2026-04-07 09:51:32 +08:00
|
|
|
|
}
|
2026-04-08 17:26:00 +08:00
|
|
|
|
|
|
|
|
|
|
updatePageLog("failed", lastErr.Error(), "PAGE_SYNC_FAILED", maxRetries)
|
|
|
|
|
|
return lastResult, pageLogID, lastErr
|
2026-04-07 09:51:32 +08:00
|
|
|
|
}
|
|
|
|
|
|
|
2026-04-08 09:03:20 +08:00
|
|
|
|
func (s *SyncService) SyncWithRetry(ctx context.Context, req *AccountReportRequest, useMock bool, maxRetries int) (*SyncResult, error) {
|
2026-04-07 09:51:32 +08:00
|
|
|
|
var lastResult *SyncResult
|
|
|
|
|
|
var lastErr error
|
|
|
|
|
|
|
|
|
|
|
|
for attempt := 0; attempt <= maxRetries; attempt++ {
|
2026-04-08 17:26:00 +08:00
|
|
|
|
saveSum := (attempt == 0 && req.PageInfo.CurrentPage == 1)
|
|
|
|
|
|
result, err := s.SyncAccountReport(ctx, req, useMock, saveSum)
|
2026-04-07 09:51:32 +08:00
|
|
|
|
lastResult = result
|
|
|
|
|
|
lastErr = err
|
|
|
|
|
|
|
|
|
|
|
|
if err == nil {
|
|
|
|
|
|
logrus.Infof("同步成功,尝试次数:%d", attempt+1)
|
|
|
|
|
|
return result, nil
|
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
logrus.Warnf("同步失败,第 %d 次重试,错误:%v", attempt+1, err)
|
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
return lastResult, lastErr
|
|
|
|
|
|
}
|
2026-04-08 16:00:54 +08:00
|
|
|
|
|
|
|
|
|
|
// ConcurrentSyncConfig holds the settings for SyncAccountReportConcurrent.
type ConcurrentSyncConfig struct {
	// MaxConcurrency is the number of pages that may be synced at the same time.
	MaxConcurrency int
	// UseMock selects the mock data generator instead of the real API.
	UseMock bool
	// MaxRetries is the retry budget passed to each page sync.
	MaxRetries int
}
|
|
|
|
|
|
|
|
|
|
|
|
func (s *SyncService) SyncAccountReportConcurrent(ctx context.Context, req *AccountReportRequest, config ConcurrentSyncConfig) (*SyncResult, error) {
|
|
|
|
|
|
startTime := time.Now()
|
2026-04-09 13:43:15 +08:00
|
|
|
|
batchID := startTime.UnixNano()
|
|
|
|
|
|
parentTaskID := fmt.Sprintf("%d_%d_%d_account", req.AdvertiserID, req.StartTime, batchID)
|
2026-04-08 16:00:54 +08:00
|
|
|
|
|
|
|
|
|
|
logReq := &taskDto.CreateSyncTaskLogReq{
|
|
|
|
|
|
TaskID: parentTaskID,
|
|
|
|
|
|
TaskType: "account_report",
|
|
|
|
|
|
AdvertiserID: req.AdvertiserID,
|
|
|
|
|
|
StartTime: time.UnixMilli(req.StartTime),
|
|
|
|
|
|
EndTime: time.UnixMilli(req.EndTime),
|
|
|
|
|
|
Status: "pending",
|
|
|
|
|
|
MaxRetry: config.MaxRetries,
|
|
|
|
|
|
RequestParams: req,
|
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
parentLogID, err := dao.SyncTaskLog.Create(ctx, logReq)
|
|
|
|
|
|
if err != nil {
|
|
|
|
|
|
logrus.Errorf("创建主任务日志失败:%v", err)
|
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
updateParentLog := func(status, errMsg, errorCode string, summary interface{}) {
|
|
|
|
|
|
if parentLogID == 0 {
|
|
|
|
|
|
return
|
|
|
|
|
|
}
|
|
|
|
|
|
duration := time.Since(startTime).Milliseconds()
|
|
|
|
|
|
updateReq := &taskDto.UpdateSyncTaskLogReq{
|
|
|
|
|
|
ID: parentLogID,
|
|
|
|
|
|
Status: status,
|
|
|
|
|
|
ErrorMessage: errMsg,
|
|
|
|
|
|
ErrorCode: errorCode,
|
|
|
|
|
|
DurationMs: &duration,
|
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
if status == "success" || status == "manual_review" {
|
|
|
|
|
|
completedAt := time.Now()
|
|
|
|
|
|
updateReq.CompletedAt = completedAt
|
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
if summary != nil {
|
|
|
|
|
|
updateReq.ResultSummary = summary
|
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
if err := dao.SyncTaskLog.Update(ctx, updateReq); err != nil {
|
|
|
|
|
|
logrus.Errorf("更新主任务日志失败:%v", err)
|
|
|
|
|
|
}
|
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
updateParentLog("running", "", "", nil)
|
|
|
|
|
|
|
|
|
|
|
|
if req.PageInfo == nil {
|
|
|
|
|
|
req.PageInfo = &PageInfo{}
|
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
pageSize := 10
|
|
|
|
|
|
if req.PageInfo.PageSize > 0 {
|
|
|
|
|
|
pageSize = req.PageInfo.PageSize
|
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
firstPageReq := *req
|
|
|
|
|
|
firstPageReq.PageInfo.CurrentPage = 1
|
|
|
|
|
|
firstPageReq.PageInfo.PageSize = pageSize
|
|
|
|
|
|
|
|
|
|
|
|
currentData := s.fetchCurrentData(&firstPageReq, config.UseMock)
|
|
|
|
|
|
if currentData == nil || currentData.TotalCount == 0 {
|
|
|
|
|
|
logrus.Warn("未获取到总记录数,降级为串行同步")
|
|
|
|
|
|
return s.SyncAccountReportWithPagination(ctx, req, config.UseMock, config.MaxRetries)
|
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
totalPages := (currentData.TotalCount + pageSize - 1) / pageSize
|
2026-04-09 13:43:15 +08:00
|
|
|
|
logrus.Infof("开始并发同步 - 批次ID:%d, 总记录数:%d, 总页数:%d, 每页大小:%d, 并发数:%d",
|
|
|
|
|
|
batchID, currentData.TotalCount, totalPages, pageSize, config.MaxConcurrency)
|
2026-04-08 16:00:54 +08:00
|
|
|
|
|
|
|
|
|
|
updateParentLog("running", "", "", map[string]interface{}{
|
|
|
|
|
|
"total_pages": totalPages,
|
|
|
|
|
|
"total_records": currentData.TotalCount,
|
|
|
|
|
|
"page_size": pageSize,
|
|
|
|
|
|
"concurrency": config.MaxConcurrency,
|
2026-04-09 13:43:15 +08:00
|
|
|
|
"batch_id": batchID,
|
2026-04-08 16:00:54 +08:00
|
|
|
|
})
|
|
|
|
|
|
|
|
|
|
|
|
pageResults := make([]*PageSyncResult, totalPages)
|
|
|
|
|
|
var pageResultsMu sync.Mutex
|
|
|
|
|
|
|
|
|
|
|
|
var sumSuccess bool
|
|
|
|
|
|
var sumID int64
|
|
|
|
|
|
var sumMu sync.Mutex
|
|
|
|
|
|
|
|
|
|
|
|
var totalDetailCount int32
|
|
|
|
|
|
var successPages int64
|
|
|
|
|
|
var failedPages int64
|
|
|
|
|
|
|
|
|
|
|
|
sem := semaphore.NewWeighted(int64(config.MaxConcurrency))
|
|
|
|
|
|
eg, egCtx := errgroup.WithContext(ctx)
|
|
|
|
|
|
|
|
|
|
|
|
for pageNum := 1; pageNum <= totalPages; pageNum++ {
|
2026-04-09 13:43:15 +08:00
|
|
|
|
if err := sem.Acquire(ctx, 1); err != nil {
|
2026-04-08 16:00:54 +08:00
|
|
|
|
logrus.Errorf("获取信号量失败:%v", err)
|
|
|
|
|
|
break
|
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
currentPage := pageNum
|
|
|
|
|
|
eg.Go(func() error {
|
|
|
|
|
|
defer sem.Release(1)
|
|
|
|
|
|
|
|
|
|
|
|
pageTaskID := fmt.Sprintf("%s_page_%d", parentTaskID, currentPage)
|
|
|
|
|
|
pageStartTime := time.Now()
|
|
|
|
|
|
|
|
|
|
|
|
pageReq := *req
|
|
|
|
|
|
pageReq.PageInfo = &PageInfo{
|
|
|
|
|
|
CurrentPage: currentPage,
|
|
|
|
|
|
PageSize: pageSize,
|
|
|
|
|
|
}
|
|
|
|
|
|
|
2026-04-08 17:26:00 +08:00
|
|
|
|
result, pageLogID, err := s.syncSinglePageWithTaskForConcurrent(egCtx, &pageReq, config.UseMock, config.MaxRetries, pageTaskID, currentPage, currentPage == 1)
|
2026-04-08 16:00:54 +08:00
|
|
|
|
pageDuration := time.Since(pageStartTime).Milliseconds()
|
|
|
|
|
|
|
|
|
|
|
|
pageResult := &PageSyncResult{
|
|
|
|
|
|
PageNumber: currentPage,
|
|
|
|
|
|
Success: false,
|
|
|
|
|
|
RecordCount: 0,
|
|
|
|
|
|
DurationMs: pageDuration,
|
|
|
|
|
|
ErrorMessage: "",
|
|
|
|
|
|
RetryCount: 0,
|
2026-04-08 17:26:00 +08:00
|
|
|
|
PageTaskLogID: pageLogID,
|
2026-04-08 16:00:54 +08:00
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
if err != nil {
|
|
|
|
|
|
logrus.Errorf("第 %d 页同步失败:%v", currentPage, err)
|
|
|
|
|
|
pageResult.ErrorMessage = err.Error()
|
|
|
|
|
|
|
|
|
|
|
|
pageResultsMu.Lock()
|
|
|
|
|
|
pageResults[currentPage-1] = pageResult
|
|
|
|
|
|
pageResultsMu.Unlock()
|
|
|
|
|
|
|
|
|
|
|
|
newFailedCount := atomic.AddInt64(&failedPages, 1)
|
|
|
|
|
|
if int(newFailedCount) > config.MaxRetries {
|
|
|
|
|
|
logrus.Warnf("失败页数超过阈值 %d", config.MaxRetries)
|
|
|
|
|
|
}
|
|
|
|
|
|
return nil
|
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
if result.SumSuccess {
|
|
|
|
|
|
sumMu.Lock()
|
|
|
|
|
|
if !sumSuccess {
|
|
|
|
|
|
sumSuccess = true
|
|
|
|
|
|
sumID = result.SumID
|
|
|
|
|
|
logrus.Infof("✓ 汇总数据已保存,ID=%d(来自第 %d 页)", result.SumID, currentPage)
|
|
|
|
|
|
}
|
|
|
|
|
|
sumMu.Unlock()
|
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
if result.DetailSuccess && result.DetailCount > 0 {
|
|
|
|
|
|
pageResult.Success = true
|
|
|
|
|
|
pageResult.RecordCount = result.DetailCount
|
|
|
|
|
|
|
|
|
|
|
|
atomic.AddInt32(&totalDetailCount, int32(result.DetailCount))
|
|
|
|
|
|
atomic.AddInt64(&successPages, 1)
|
|
|
|
|
|
|
|
|
|
|
|
logrus.Debugf("✓ 第 %d 页完成:%d 条明细,耗时 %dms", currentPage, result.DetailCount, pageDuration)
|
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
pageResultsMu.Lock()
|
|
|
|
|
|
pageResults[currentPage-1] = pageResult
|
|
|
|
|
|
pageResultsMu.Unlock()
|
|
|
|
|
|
|
|
|
|
|
|
return nil
|
|
|
|
|
|
})
|
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
if err := eg.Wait(); err != nil {
|
|
|
|
|
|
logrus.Errorf("协程组执行出错:%v", err)
|
|
|
|
|
|
updateParentLog("failed", err.Error(), "CONCURRENT_SYNC_FAILED", nil)
|
|
|
|
|
|
return &SyncResult{
|
|
|
|
|
|
SumSuccess: sumSuccess,
|
|
|
|
|
|
SumID: sumID,
|
|
|
|
|
|
TaskLogID: parentLogID,
|
|
|
|
|
|
PageResults: pageResults,
|
|
|
|
|
|
Error: err,
|
|
|
|
|
|
}, err
|
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
actualResults := make([]*PageSyncResult, 0, len(pageResults))
|
|
|
|
|
|
for _, pr := range pageResults {
|
|
|
|
|
|
if pr != nil {
|
|
|
|
|
|
actualResults = append(actualResults, pr)
|
|
|
|
|
|
}
|
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
finalDetailCount := atomic.LoadInt32(&totalDetailCount)
|
|
|
|
|
|
finalSuccessPages := atomic.LoadInt64(&successPages)
|
|
|
|
|
|
finalFailedPages := atomic.LoadInt64(&failedPages)
|
|
|
|
|
|
|
|
|
|
|
|
logrus.Infof("并发同步完成 - 成功:%d页, 失败:%d页, 总明细:%d条",
|
|
|
|
|
|
finalSuccessPages, finalFailedPages, finalDetailCount)
|
|
|
|
|
|
|
|
|
|
|
|
aggregatedResult := &SyncResult{
|
|
|
|
|
|
SumSuccess: sumSuccess,
|
|
|
|
|
|
SumID: sumID,
|
|
|
|
|
|
TaskLogID: parentLogID,
|
|
|
|
|
|
DetailCount: int(finalDetailCount),
|
|
|
|
|
|
DetailSuccessCount: finalSuccessPages,
|
|
|
|
|
|
DetailFailCount: finalFailedPages,
|
|
|
|
|
|
PageResults: actualResults,
|
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
if finalFailedPages > 0 {
|
|
|
|
|
|
logrus.Warnf("存在 %d 个失败的页面,主任务标记为部分失败", finalFailedPages)
|
|
|
|
|
|
|
|
|
|
|
|
summary := map[string]interface{}{
|
|
|
|
|
|
"sum_id": sumID,
|
|
|
|
|
|
"detail_count": finalDetailCount,
|
|
|
|
|
|
"total_pages": totalPages,
|
|
|
|
|
|
"success_pages": finalSuccessPages,
|
|
|
|
|
|
"failed_pages": finalFailedPages,
|
|
|
|
|
|
"page_results": actualResults,
|
|
|
|
|
|
}
|
|
|
|
|
|
updateParentLog("partial_failed", fmt.Sprintf("%d 个页面同步失败", finalFailedPages), "PAGE_SYNC_FAILED", summary)
|
|
|
|
|
|
} else {
|
|
|
|
|
|
logrus.Info("✓ 所有页面同步成功")
|
|
|
|
|
|
|
|
|
|
|
|
summary := map[string]interface{}{
|
|
|
|
|
|
"sum_id": sumID,
|
|
|
|
|
|
"detail_count": finalDetailCount,
|
|
|
|
|
|
"total_pages": totalPages,
|
|
|
|
|
|
"success_pages": finalSuccessPages,
|
|
|
|
|
|
"failed_pages": 0,
|
|
|
|
|
|
"page_results": actualResults,
|
|
|
|
|
|
}
|
|
|
|
|
|
updateParentLog("success", "", "", summary)
|
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
return aggregatedResult, nil
|
|
|
|
|
|
}
|
2026-04-08 17:26:00 +08:00
|
|
|
|
|
|
|
|
|
|
func (s *SyncService) fetchCurrentData(req *AccountReportRequest, useMock bool) *AccountReportData {
|
|
|
|
|
|
if useMock {
|
|
|
|
|
|
responseData := s.mockGen.GenerateAccountReportResponseWithPage(req.PageInfo.CurrentPage, req.PageInfo.PageSize)
|
|
|
|
|
|
if responseData != nil && responseData.Data != nil {
|
|
|
|
|
|
return responseData.Data
|
|
|
|
|
|
}
|
|
|
|
|
|
return nil
|
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
respBytes, err := s.httpClient.Post(context.Background(), "/rest/openapi/gw/esp/report/accountReport", req)
|
|
|
|
|
|
if err != nil {
|
|
|
|
|
|
return nil
|
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
responseData := &AccountReportResponse{}
|
|
|
|
|
|
if err := json.Unmarshal(respBytes, responseData); err != nil {
|
|
|
|
|
|
return nil
|
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
if responseData.Code == 0 && responseData.Data != nil {
|
|
|
|
|
|
return responseData.Data
|
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
return nil
|
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
func (s *SyncService) saveSumData(ctx context.Context, item *dto.CidAccountReportSumItem) (*dto.CreateCidAccountReportSumRes, error) {
|
|
|
|
|
|
return copydata.CidAccountReportDetail.CreateSum(ctx, item)
|
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
func (s *SyncService) saveDetailData(ctx context.Context, items []*dto.CidAccountReportDetailItem) (*dto.BatchCreateCidAccountReportDetailRes, error) {
|
|
|
|
|
|
req := &dto.BatchCreateCidAccountReportDetailReq{
|
|
|
|
|
|
Items: items,
|
|
|
|
|
|
}
|
|
|
|
|
|
return copydata.CidAccountReportDetail.BatchCreate(ctx, req)
|
|
|
|
|
|
}
|