Files
data-engine/service/tencent/account_relation_service.go
2026-05-06 16:19:22 +08:00

187 lines
5.5 KiB
Go
Raw Permalink Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
package tencent
import (
"context"
dao "dataengine/dao/tencent"
dto "dataengine/model/dto/tencent"
entity "dataengine/model/entity/tencent"
"encoding/json"
"fmt"
"io"
"net/http"
"time"
"github.com/gogf/gf/v2/frame/g"
"github.com/sirupsen/logrus"
)
type accountRelationService struct{}
var AccountRelationService = new(accountRelationService)
// API响应结构
type accountRelationResponse struct {
Code int `json:"code"`
Message string `json:"message"`
Data struct {
List []struct {
AccountID int64 `json:"account_id"`
CorporationName string `json:"corporation_name"`
CommentDataList json.RawMessage `json:"comment_data_list"`
IsAdx bool `json:"is_adx"`
IsBid bool `json:"is_bid"`
IsMp bool `json:"is_mp"`
} `json:"list"`
PageInfo struct {
Page int `json:"page"`
PageSize int `json:"page_size"`
TotalNumber int `json:"total_number"`
TotalPage int `json:"total_page"`
} `json:"page_info"`
} `json:"data"`
TraceID string `json:"trace_id"`
}
// SyncAll 同步所有账户关系数据(自动分页)
func (s *accountRelationService) SyncAll(ctx context.Context, req *dto.SyncAccountRelationReq) (res *dto.SyncAccountRelationRes, err error) {
// 获取access_token
accessToken := req.AccessToken
if accessToken == "" {
accessToken = g.Cfg().MustGet(ctx, "tencent.oauth.access_token").String()
}
if accessToken == "" {
return nil, fmt.Errorf("access_token不能为空")
}
res = &dto.SyncAccountRelationRes{}
totalSynced := 0
// 先获取第一页,得到总页数
firstPageData, err := s.fetchPage(ctx, accessToken, 1, 100)
if err != nil {
return nil, fmt.Errorf("获取第一页数据失败: %w", err)
}
totalPage := firstPageData.Data.PageInfo.TotalPage
res.TotalNumber = firstPageData.Data.PageInfo.TotalNumber
res.TotalPage = totalPage
logrus.Infof("开始同步腾讯广告账户关系 - 总页数: %d, 总记录数: %d", totalPage, res.TotalNumber)
// 处理第一页数据
synced, err := s.savePageData(ctx, firstPageData)
if err != nil {
logrus.Errorf("保存第一页数据失败: %v", err)
}
totalSynced += synced
// 循环获取剩余页
for page := 2; page <= totalPage; page++ {
logrus.Infof("正在获取第 %d/%d 页...", page, totalPage)
pageData, err := s.fetchPage(ctx, accessToken, page, 100)
if err != nil {
logrus.Errorf("获取第 %d 页失败: %v继续下一页", page, err)
continue
}
synced, err := s.savePageData(ctx, pageData)
if err != nil {
logrus.Errorf("保存第 %d 页数据失败: %v", page, err)
continue
}
totalSynced += synced
// 避免请求过快休眠100ms
time.Sleep(100 * time.Millisecond)
}
res.SyncedCount = totalSynced
res.Message = fmt.Sprintf("同步完成,共处理 %d 条记录", totalSynced)
logrus.Infof("同步完成 - 总页数: %d, 总记录数: %d, 成功同步: %d", totalPage, res.TotalNumber, totalSynced)
return res, nil
}
// fetchPage 获取单页数据
func (s *accountRelationService) fetchPage(ctx context.Context, accessToken string, page, pageSize int) (*accountRelationResponse, error) {
timestamp := time.Now().Unix()
// 使用时间戳+随机数生成唯一的nonce
nonce := fmt.Sprintf("%d_%d", timestamp, time.Now().UnixNano())
url := fmt.Sprintf("https://api.e.qq.com/v3.0/organization_account_relation/get?"+
"access_token=%s&timestamp=%d&nonce=%s&pagination_mode=PAGINATION_MODE_NORMAL&page=%d&page_size=%d",
accessToken, timestamp, nonce, page, pageSize)
httpReq, err := http.NewRequestWithContext(ctx, "GET", url, nil)
if err != nil {
return nil, fmt.Errorf("创建请求失败: %w", err)
}
client := &http.Client{Timeout: 30 * time.Second}
resp, err := client.Do(httpReq)
if err != nil {
return nil, fmt.Errorf("请求失败: %w", err)
}
defer resp.Body.Close()
body, err := io.ReadAll(resp.Body)
if err != nil {
return nil, fmt.Errorf("读取响应失败: %w", err)
}
var result accountRelationResponse
if err := json.Unmarshal(body, &result); err != nil {
return nil, fmt.Errorf("解析响应失败: %w", err)
}
if result.Code != 0 {
return nil, fmt.Errorf("API错误: code=%d, message=%s", result.Code, result.Message)
}
return &result, nil
}
// savePageData 保存单页数据到数据库
func (s *accountRelationService) savePageData(ctx context.Context, data *accountRelationResponse) (int, error) {
if len(data.Data.List) == 0 {
return 0, nil
}
logrus.Infof("准备保存 %d 条账户关系数据", len(data.Data.List))
var items []*entity.AccountRelation
for _, item := range data.Data.List {
commentJSON := "{}"
if len(item.CommentDataList) > 0 {
commentJSON = string(item.CommentDataList)
}
accountRelation := &entity.AccountRelation{
AccountID: item.AccountID,
CorporationName: item.CorporationName,
CommentDataList: commentJSON,
IsAdx: item.IsAdx,
IsBid: item.IsBid,
IsMp: item.IsMp,
}
// 设置 TenantID框架将0视为空值所以使用1
accountRelation.TenantId = 1
items = append(items, accountRelation)
}
logrus.Infof("调用 BatchUpsert...")
successCount, err := dao.AccountRelation.BatchUpsert(ctx, items)
logrus.Infof("BatchUpsert 返回: successCount=%d, err=%v", successCount, err)
return successCount, err
}
// ListAll 获取所有账户关系
func (s *accountRelationService) ListAll(ctx context.Context) ([]entity.AccountRelation, error) {
return dao.AccountRelation.ListAll(ctx)
}