package tencent import ( "context" dao "dataengine/dao/tencent" dto "dataengine/model/dto/tencent" entity "dataengine/model/entity/tencent" "encoding/json" "fmt" "io" "net/http" "time" "github.com/gogf/gf/v2/frame/g" "github.com/sirupsen/logrus" ) type accountRelationService struct{} var AccountRelationService = new(accountRelationService) // API响应结构 type accountRelationResponse struct { Code int `json:"code"` Message string `json:"message"` Data struct { List []struct { AccountID int64 `json:"account_id"` CorporationName string `json:"corporation_name"` CommentDataList json.RawMessage `json:"comment_data_list"` IsAdx bool `json:"is_adx"` IsBid bool `json:"is_bid"` IsMp bool `json:"is_mp"` } `json:"list"` PageInfo struct { Page int `json:"page"` PageSize int `json:"page_size"` TotalNumber int `json:"total_number"` TotalPage int `json:"total_page"` } `json:"page_info"` } `json:"data"` TraceID string `json:"trace_id"` } // SyncAll 同步所有账户关系数据(自动分页) func (s *accountRelationService) SyncAll(ctx context.Context, req *dto.SyncAccountRelationReq) (res *dto.SyncAccountRelationRes, err error) { // 获取access_token accessToken := req.AccessToken if accessToken == "" { accessToken = g.Cfg().MustGet(ctx, "tencent.oauth.access_token").String() } if accessToken == "" { return nil, fmt.Errorf("access_token不能为空") } res = &dto.SyncAccountRelationRes{} totalSynced := 0 // 先获取第一页,得到总页数 firstPageData, err := s.fetchPage(ctx, accessToken, 1, 100) if err != nil { return nil, fmt.Errorf("获取第一页数据失败: %w", err) } totalPage := firstPageData.Data.PageInfo.TotalPage res.TotalNumber = firstPageData.Data.PageInfo.TotalNumber res.TotalPage = totalPage logrus.Infof("开始同步腾讯广告账户关系 - 总页数: %d, 总记录数: %d", totalPage, res.TotalNumber) // 处理第一页数据 synced, err := s.savePageData(ctx, firstPageData) if err != nil { logrus.Errorf("保存第一页数据失败: %v", err) } totalSynced += synced // 循环获取剩余页 for page := 2; page <= totalPage; page++ { logrus.Infof("正在获取第 %d/%d 页...", page, totalPage) pageData, err := s.fetchPage(ctx, accessToken, page, 100) if err != nil { logrus.Errorf("获取第 %d 页失败: %v,继续下一页", page, err) continue } synced, err := s.savePageData(ctx, pageData) if err != nil { logrus.Errorf("保存第 %d 页数据失败: %v", page, err) continue } totalSynced += synced // 避免请求过快,休眠100ms time.Sleep(100 * time.Millisecond) } res.SyncedCount = totalSynced res.Message = fmt.Sprintf("同步完成,共处理 %d 条记录", totalSynced) logrus.Infof("同步完成 - 总页数: %d, 总记录数: %d, 成功同步: %d", totalPage, res.TotalNumber, totalSynced) return res, nil } // fetchPage 获取单页数据 func (s *accountRelationService) fetchPage(ctx context.Context, accessToken string, page, pageSize int) (*accountRelationResponse, error) { timestamp := time.Now().Unix() // 使用时间戳+随机数生成唯一的nonce nonce := fmt.Sprintf("%d_%d", timestamp, time.Now().UnixNano()) url := fmt.Sprintf("https://api.e.qq.com/v3.0/organization_account_relation/get?"+ "access_token=%s×tamp=%d&nonce=%s&pagination_mode=PAGINATION_MODE_NORMAL&page=%d&page_size=%d", accessToken, timestamp, nonce, page, pageSize) httpReq, err := http.NewRequestWithContext(ctx, "GET", url, nil) if err != nil { return nil, fmt.Errorf("创建请求失败: %w", err) } client := &http.Client{Timeout: 30 * time.Second} resp, err := client.Do(httpReq) if err != nil { return nil, fmt.Errorf("请求失败: %w", err) } defer resp.Body.Close() body, err := io.ReadAll(resp.Body) if err != nil { return nil, fmt.Errorf("读取响应失败: %w", err) } var result accountRelationResponse if err := json.Unmarshal(body, &result); err != nil { return nil, fmt.Errorf("解析响应失败: %w", err) } if result.Code != 0 { return nil, fmt.Errorf("API错误: code=%d, message=%s", result.Code, result.Message) } return &result, nil } // savePageData 保存单页数据到数据库 func (s *accountRelationService) savePageData(ctx context.Context, data *accountRelationResponse) (int, error) { if len(data.Data.List) == 0 { return 0, nil } logrus.Infof("准备保存 %d 条账户关系数据", len(data.Data.List)) var items []*entity.AccountRelation for _, item := range data.Data.List { commentJSON := "{}" if len(item.CommentDataList) > 0 { commentJSON = string(item.CommentDataList) } accountRelation := &entity.AccountRelation{ AccountID: item.AccountID, CorporationName: item.CorporationName, CommentDataList: commentJSON, IsAdx: item.IsAdx, IsBid: item.IsBid, IsMp: item.IsMp, } // 设置 TenantID(框架将0视为空值,所以使用1) accountRelation.TenantId = 1 items = append(items, accountRelation) } logrus.Infof("调用 BatchUpsert...") successCount, err := dao.AccountRelation.BatchUpsert(ctx, items) logrus.Infof("BatchUpsert 返回: successCount=%d, err=%v", successCount, err) return successCount, err } // ListAll 获取所有账户关系 func (s *accountRelationService) ListAll(ctx context.Context) ([]entity.AccountRelation, error) { return dao.AccountRelation.ListAll(ctx) }