Files
data-engine/sync/sync_service.go

437 lines
13 KiB
Go
Raw Normal View History

2026-04-07 09:51:32 +08:00
package sync
import (
2026-04-08 09:03:20 +08:00
dao "cid/dao/copydata"
2026-04-07 09:51:32 +08:00
dto "cid/model/dto/copydata"
2026-04-08 09:03:20 +08:00
taskDto "cid/model/dto/copydata"
2026-04-07 09:51:32 +08:00
"cid/service/copydata"
"context"
"encoding/json"
"fmt"
"time"
"github.com/sirupsen/logrus"
)
type SyncService struct {
httpClient *HttpClient
converter *DataConverter
mockGen *MockDataGenerator
}
func NewSyncService() *SyncService {
return &SyncService{
httpClient: NewHttpClient("https://ad.e.kuaishou.com", 0),
converter: NewDataConverter(),
mockGen: NewMockDataGenerator(),
}
}
type SyncResult struct {
2026-04-08 09:03:20 +08:00
SumSuccess bool `json:"sum_success"`
SumID int64 `json:"sum_id"`
DetailSuccess bool `json:"detail_success"`
DetailCount int `json:"detail_count"`
DetailSuccessCount int64 `json:"detail_success_count"`
DetailFailCount int64 `json:"detail_fail_count"`
Error error `json:"error"`
TaskLogID int64 `json:"task_log_id"`
PageResults []*PageSyncResult `json:"page_results,omitempty"`
2026-04-07 09:51:32 +08:00
}
2026-04-08 09:03:20 +08:00
type PageSyncResult struct {
PageNumber int `json:"page_number"`
PageTaskLogID int64 `json:"page_task_log_id"`
Success bool `json:"success"`
RecordCount int `json:"record_count"`
DurationMs int64 `json:"duration_ms"`
ErrorMessage string `json:"error_message,omitempty"`
RetryCount int `json:"retry_count"`
}
func (s *SyncService) SyncAccountReport(ctx context.Context, req *AccountReportRequest, useMock bool) (*SyncResult, error) {
2026-04-07 09:51:32 +08:00
result := &SyncResult{}
2026-04-08 09:03:20 +08:00
var responseData *AccountReportResponse
2026-04-07 09:51:32 +08:00
if useMock {
2026-04-08 09:03:20 +08:00
logrus.Info("使用 Mock 数据同步快手广告账户报表")
responseData = s.mockGen.GenerateAccountReportResponse()
2026-04-07 09:51:32 +08:00
} else {
2026-04-08 09:03:20 +08:00
logrus.Info("从真实 API 同步快手广告账户报表")
respBytes, err := s.httpClient.Post(ctx, "/rest/openapi/gw/esp/report/accountReport", req)
2026-04-07 09:51:32 +08:00
if err != nil {
result.Error = fmt.Errorf("调用 API 失败:%w", err)
return result, result.Error
}
2026-04-08 09:03:20 +08:00
responseData = &AccountReportResponse{}
2026-04-07 09:51:32 +08:00
if err := json.Unmarshal(respBytes, responseData); err != nil {
result.Error = fmt.Errorf("解析响应失败:%w", err)
return result, result.Error
}
if responseData.Code != 0 {
result.Error = fmt.Errorf("API 返回错误code=%d, message=%s", responseData.Code, responseData.Message)
return result, result.Error
}
}
if responseData.Data.Sum != nil {
2026-04-08 09:03:20 +08:00
sumItem := s.converter.ConvertToSumItem(responseData.Data.Sum, "account_report", req.PageInfo.CurrentPage)
2026-04-07 09:51:32 +08:00
sumResult, saveErr := s.saveSumData(ctx, sumItem)
if saveErr != nil {
logrus.Errorf("保存汇总数据失败:%v", saveErr)
result.Error = fmt.Errorf("保存汇总数据失败:%w", saveErr)
} else {
result.SumSuccess = true
result.SumID = sumResult.Id
logrus.Infof("成功保存汇总数据ID=%d", sumResult.Id)
}
}
if len(responseData.Data.Detail) > 0 {
2026-04-08 09:03:20 +08:00
detailItems := s.converter.ConvertToDetailItems(responseData.Data.Detail, "account_report", req.PageInfo.CurrentPage)
2026-04-07 09:51:32 +08:00
detailResult, saveErr := s.saveDetailData(ctx, detailItems)
if saveErr != nil {
logrus.Errorf("保存明细数据失败:%v", saveErr)
result.Error = fmt.Errorf("保存明细数据失败:%w", saveErr)
} else {
result.DetailSuccess = true
result.DetailCount = len(detailItems)
result.DetailSuccessCount = detailResult.SuccessCount
result.DetailFailCount = detailResult.FailCount
2026-04-08 09:03:20 +08:00
logrus.Infof("成功保存 %d 条明细数据(成功=%d, 失败=%d", len(detailItems), detailResult.SuccessCount, detailResult.FailCount)
2026-04-07 09:51:32 +08:00
}
}
return result, result.Error
}
2026-04-08 09:03:20 +08:00
// SyncAccountReportWithPagination 带分页处理的同步方法(支持全量数据抽取)
func (s *SyncService) SyncAccountReportWithPagination(ctx context.Context, req *AccountReportRequest, useMock bool, maxRetries int) (*SyncResult, error) {
startTime := time.Now()
parentTaskID := fmt.Sprintf("%d_%d_account", req.AdvertiserID, req.StartTime)
logReq := &taskDto.CreateSyncTaskLogReq{
TaskID: parentTaskID,
TaskType: "account_report",
AdvertiserID: req.AdvertiserID,
StartTime: time.UnixMilli(req.StartTime),
EndTime: time.UnixMilli(req.EndTime),
Status: "pending",
MaxRetry: maxRetries,
RequestParams: req,
}
parentLogID, err := dao.SyncTaskLog.Create(ctx, logReq)
if err != nil {
logrus.Errorf("创建主任务日志失败:%v", err)
}
updateParentLog := func(status, errMsg, errorCode string, summary interface{}) {
if parentLogID == 0 {
return
}
duration := time.Since(startTime).Milliseconds()
updateReq := &taskDto.UpdateSyncTaskLogReq{
ID: parentLogID,
Status: status,
ErrorMessage: errMsg,
ErrorCode: errorCode,
DurationMs: &duration,
}
if status == "success" || status == "manual_review" {
completedAt := time.Now()
updateReq.CompletedAt = completedAt
}
if summary != nil {
updateReq.ResultSummary = summary
}
if err := dao.SyncTaskLog.Update(ctx, updateReq); err != nil {
logrus.Errorf("更新主任务日志失败:%v", err)
}
}
updateParentLog("running", "", "", nil)
2026-04-07 09:51:32 +08:00
aggregatedResult := &SyncResult{
2026-04-08 09:03:20 +08:00
SumSuccess: false,
SumID: 0,
TaskLogID: parentLogID,
PageResults: make([]*PageSyncResult, 0),
2026-04-07 09:51:32 +08:00
}
totalCount := 0
currentPage := 1
pageSize := 100
2026-04-08 09:03:20 +08:00
successPages := 0
failedPages := 0
2026-04-07 09:51:32 +08:00
if req.PageInfo == nil {
req.PageInfo = &PageInfo{}
}
2026-04-08 09:03:20 +08:00
var totalPages int
2026-04-07 09:51:32 +08:00
for {
logrus.Infof(">>> 正在同步第 %d 页数据...", currentPage)
req.PageInfo.CurrentPage = currentPage
req.PageInfo.PageSize = pageSize
2026-04-08 09:03:20 +08:00
pageTaskID := fmt.Sprintf("%s_page_%d", parentTaskID, currentPage)
pageStartTime := time.Now()
pageResult := &PageSyncResult{
PageNumber: currentPage,
Success: false,
RecordCount: 0,
RetryCount: 0,
}
result, err := s.syncSinglePageWithTask(ctx, req, useMock, maxRetries, pageTaskID, currentPage)
pageDuration := time.Since(pageStartTime).Milliseconds()
pageResult.DurationMs = pageDuration
2026-04-07 09:51:32 +08:00
if err != nil {
logrus.Errorf("第 %d 页同步失败:%v", currentPage, err)
2026-04-08 09:03:20 +08:00
pageResult.ErrorMessage = err.Error()
failedPages++
aggregatedResult.PageResults = append(aggregatedResult.PageResults, pageResult)
if failedPages > maxRetries {
logrus.Warnf("失败页数超过阈值 %d终止同步", maxRetries)
break
}
currentPage++
time.Sleep(500 * time.Millisecond)
continue
2026-04-07 09:51:32 +08:00
}
if result.SumSuccess && aggregatedResult.SumID == 0 {
aggregatedResult.SumSuccess = true
aggregatedResult.SumID = result.SumID
logrus.Infof("✓ 汇总数据已保存ID=%d", result.SumID)
}
if result.DetailSuccess && result.DetailCount > 0 {
2026-04-08 09:03:20 +08:00
totalCount += result.DetailCount
pageResult.Success = true
pageResult.RecordCount = result.DetailCount
successPages++
logrus.Infof("✓ 第 %d 页获取到 %d 条明细数据,累计 %d 条", currentPage, result.DetailCount, totalCount)
2026-04-07 09:51:32 +08:00
}
2026-04-08 09:03:20 +08:00
aggregatedResult.PageResults = append(aggregatedResult.PageResults, pageResult)
2026-04-07 09:51:32 +08:00
currentData := s.fetchCurrentData(req, useMock)
if currentData != nil && currentData.TotalCount > 0 {
2026-04-08 09:03:20 +08:00
totalPages = (currentData.TotalCount + pageSize - 1) / pageSize
2026-04-07 09:51:32 +08:00
logrus.Infof("总记录数:%d, 总页数:%d, 当前页:%d/%d",
currentData.TotalCount, totalPages, currentPage, totalPages)
if currentPage >= totalPages {
logrus.Infof("✓ 已同步所有页面数据,共 %d 页,%d 条记录", totalPages, currentData.TotalCount)
break
}
}
if result.DetailCount < pageSize {
logrus.Infof("✓ 当前页数据不足 %d 条,已到达最后一页", pageSize)
break
}
currentPage++
time.Sleep(300 * time.Millisecond)
}
2026-04-08 09:03:20 +08:00
logrus.Infof("分页同步完成 - 成功:%d页, 失败:%d页, 总明细:%d条",
successPages, failedPages, totalCount)
// 统计所有子任务的结果
totalDetailCount := 0
var totalSuccessCount int64
var totalFailCount int64
for _, pageResult := range aggregatedResult.PageResults {
if pageResult.Success {
totalDetailCount += pageResult.RecordCount
totalSuccessCount++
2026-04-07 09:51:32 +08:00
} else {
2026-04-08 09:03:20 +08:00
totalFailCount++
}
}
aggregatedResult.DetailCount = totalDetailCount
aggregatedResult.DetailSuccessCount = totalSuccessCount
aggregatedResult.DetailFailCount = totalFailCount
if failedPages > 0 {
logrus.Warnf("存在 %d 个失败的页面,主任务标记为部分失败", failedPages)
summary := map[string]interface{}{
"sum_id": aggregatedResult.SumID,
"detail_count": totalDetailCount,
"total_pages": totalPages,
"success_pages": successPages,
"failed_pages": failedPages,
"page_results": aggregatedResult.PageResults,
2026-04-07 09:51:32 +08:00
}
2026-04-08 09:03:20 +08:00
updateParentLog("partial_failed", fmt.Sprintf("%d 个页面同步失败", failedPages), "PAGE_SYNC_FAILED", summary)
2026-04-07 09:51:32 +08:00
} else {
2026-04-08 09:03:20 +08:00
logrus.Info("✓ 所有页面同步成功")
summary := map[string]interface{}{
"sum_id": aggregatedResult.SumID,
"detail_count": totalDetailCount,
"total_pages": totalPages,
"success_pages": successPages,
"failed_pages": 0,
"page_results": aggregatedResult.PageResults,
}
updateParentLog("success", "", "", summary)
2026-04-07 09:51:32 +08:00
}
return aggregatedResult, aggregatedResult.Error
}
2026-04-08 09:03:20 +08:00
func (s *SyncService) SyncSinglePageWithTask(ctx context.Context, req *AccountReportRequest, useMock bool, maxRetries int, pageTaskID string, pageNumber int) (*SyncResult, error) {
return s.syncSinglePageWithTask(ctx, req, useMock, maxRetries, pageTaskID, pageNumber)
}
func (s *SyncService) syncSinglePageWithTask(ctx context.Context, req *AccountReportRequest, useMock bool, maxRetries int, pageTaskID string, pageNumber int) (*SyncResult, error) {
pageStartTime := time.Now()
pageLogReq := &taskDto.CreateSyncTaskLogReq{
TaskID: pageTaskID,
TaskType: "account_report_page",
AdvertiserID: req.AdvertiserID,
StartTime: time.UnixMilli(req.StartTime),
EndTime: time.UnixMilli(req.EndTime),
Status: "pending",
MaxRetry: maxRetries,
PageInfo: req.PageInfo,
RequestParams: map[string]interface{}{
"page_number": pageNumber,
"page_size": req.PageInfo.PageSize,
},
2026-04-07 09:51:32 +08:00
}
2026-04-08 09:03:20 +08:00
pageLogID, err := dao.SyncTaskLog.Create(ctx, pageLogReq)
2026-04-07 09:51:32 +08:00
if err != nil {
2026-04-08 09:03:20 +08:00
logrus.Errorf("创建分页任务日志失败:%v", err)
2026-04-07 09:51:32 +08:00
}
2026-04-08 09:03:20 +08:00
updatePageLog := func(status, errMsg, errorCode string, retryCount int) {
if pageLogID == 0 {
return
}
duration := time.Since(pageStartTime).Milliseconds()
updateReq := &taskDto.UpdateSyncTaskLogReq{
ID: pageLogID,
Status: status,
ErrorMessage: errMsg,
ErrorCode: errorCode,
DurationMs: &duration,
}
if retryCount > 0 {
updateReq.RetryCount = &retryCount
}
if status == "success" || status == "failed" {
completedAt := time.Now()
updateReq.CompletedAt = completedAt
}
if err := dao.SyncTaskLog.Update(ctx, updateReq); err != nil {
logrus.Errorf("更新分页任务日志失败:%v", err)
}
2026-04-07 09:51:32 +08:00
}
2026-04-08 09:03:20 +08:00
updatePageLog("running", "", "", 0)
logrus.Infof(">>> 开始同步第 %d 页数据...", pageNumber)
result, err := s.SyncWithRetry(ctx, req, useMock, maxRetries)
if err != nil {
updatePageLog("failed", err.Error(), "PAGE_SYNC_FAILED", 0)
return result, err
2026-04-07 09:51:32 +08:00
}
2026-04-08 09:03:20 +08:00
summary := map[string]interface{}{
"page_number": pageNumber,
"detail_count": result.DetailCount,
"sum_saved": result.SumSuccess,
}
updatePageLog("success", "", "", 0)
logrus.Debugf("分页任务 %s 完成: %v", pageTaskID, summary)
return result, nil
2026-04-07 09:51:32 +08:00
}
2026-04-08 09:03:20 +08:00
func (s *SyncService) fetchCurrentData(req *AccountReportRequest, useMock bool) *AccountReportData {
2026-04-07 09:51:32 +08:00
if useMock {
2026-04-08 09:03:20 +08:00
responseData := s.mockGen.GenerateAccountReportResponse()
2026-04-07 09:51:32 +08:00
if responseData != nil && responseData.Data != nil {
return responseData.Data
}
return nil
}
2026-04-08 09:03:20 +08:00
respBytes, err := s.httpClient.Post(context.Background(), "/rest/openapi/gw/esp/report/accountReport", req)
2026-04-07 09:51:32 +08:00
if err != nil {
return nil
}
2026-04-08 09:03:20 +08:00
responseData := &AccountReportResponse{}
2026-04-07 09:51:32 +08:00
if err := json.Unmarshal(respBytes, responseData); err != nil {
return nil
}
if responseData.Code == 0 && responseData.Data != nil {
return responseData.Data
}
return nil
}
func (s *SyncService) saveSumData(ctx context.Context, item *dto.CidAccountReportSumItem) (*dto.CreateCidAccountReportSumRes, error) {
return copydata.CidAccountReportDetail.CreateSum(ctx, item)
}
func (s *SyncService) saveDetailData(ctx context.Context, items []*dto.CidAccountReportDetailItem) (*dto.BatchCreateCidAccountReportDetailRes, error) {
req := &dto.BatchCreateCidAccountReportDetailReq{
Items: items,
}
return copydata.CidAccountReportDetail.BatchCreate(ctx, req)
}
2026-04-08 09:03:20 +08:00
func (s *SyncService) SyncWithRetry(ctx context.Context, req *AccountReportRequest, useMock bool, maxRetries int) (*SyncResult, error) {
2026-04-07 09:51:32 +08:00
var lastResult *SyncResult
var lastErr error
for attempt := 0; attempt <= maxRetries; attempt++ {
2026-04-08 09:03:20 +08:00
result, err := s.SyncAccountReport(ctx, req, useMock)
2026-04-07 09:51:32 +08:00
lastResult = result
lastErr = err
if err == nil {
logrus.Infof("同步成功,尝试次数:%d", attempt+1)
return result, nil
}
logrus.Warnf("同步失败,第 %d 次重试,错误:%v", attempt+1, err)
}
return lastResult, lastErr
}