Files
data-engine/service/sync/dynamic_sync.go
2026-05-29 18:39:32 +08:00

659 lines
19 KiB
Go
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
package sync
import (
"context"
"encoding/json"
"fmt"
"strings"
"sync"
"time"
consts "dataengine/consts/public"
dao "dataengine/dao/copydata"
taskDto "dataengine/model/dto/copydata"
entity "dataengine/model/entity/dict"
"gitea.com/red-future/common/db/gfdb"
"github.com/sirupsen/logrus"
)
// SyncResult summarizes the outcome of one synchronization run.
type SyncResult struct {
// TableName is the destination table name, copied from the table definition.
TableName string
// TotalPages is the page count reported by the first response in the
// single-interface path; the prefetch path leaves it at zero.
TotalPages int
// TotalRows is the number of rows parsed from all successfully fetched pages.
TotalRows int
// InsertedRows is the sum of the counts returned by savePage.
InsertedRows int
// Duration is the elapsed time formatted with "%.1fs".
Duration string
}
// PrefetchConfig describes the "prefetch" section of an interface's request
// config: a list endpoint that is fetched first, whose items then drive one
// data request each.
type PrefetchConfig struct {
// URL is the prefetch endpoint; parsePrefetchConfig rejects configs without it.
URL string `json:"url"`
// Method is the HTTP method for the prefetch request; parsePrefetchConfig
// falls back to "GET" when it is not configured.
Method string `json:"method"`
// ResponsePath is read from "response_path".
// NOTE(review): no code visible in this file consumes it — confirm usage.
ResponsePath string `json:"response_path"`
// TargetParam is the request parameter name that receives each prefetched
// value; the same key is also written into every row fetched for that value.
TargetParam string `json:"target_param"`
// ValueField is the key read from each prefetched item; when empty the
// whole item is used as the value.
ValueField string `json:"value_field"`
}
// SyncByConfig runs one synchronization for the interface identified by
// platformCode and interfaceCode.
//
// It loads the platform together with its interfaces, picks the interface
// whose Code equals interfaceCode, parses its table definition and makes sure
// the destination table exists. The tracker row is then marked "running" and
// the work is handed to syncWithPrefetch when the request config contains a
// usable "prefetch" section, or to syncSingleAPI otherwise.
//
// The incremental starting point is the stored last sync time, except that it
// is zero when isFullSync is true or when the previous status is still
// "running" (the earlier run is treated as interrupted).
func SyncByConfig(ctx context.Context, platformCode, interfaceCode string, isFullSync bool) (*SyncResult, error) {
	startedAt := time.Now()

	manager := &PlatformManager{}
	platform, interfaces, err := manager.GetPlatformWithInterfaces(ctx, platformCode)
	if err != nil {
		return nil, fmt.Errorf("读取平台配置失败: %w", err)
	}

	// Locate the first interface carrying the requested code.
	var target *entity.ApiInterface
	for idx := 0; idx < len(interfaces) && target == nil; idx++ {
		if interfaces[idx].Code == interfaceCode {
			target = &interfaces[idx]
		}
	}
	if target == nil {
		return nil, fmt.Errorf("未找到接口 [%s]", interfaceCode)
	}
	// len of a nil value is zero, so this also covers the nil case.
	if len(target.TableDefinition) == 0 {
		return nil, fmt.Errorf("接口 [%s] 未配置 table_definition", interfaceCode)
	}

	tableDef, err := ParseTableDefinition(target.TableDefinition)
	if err != nil {
		return nil, fmt.Errorf("解析表结构失败: %w", err)
	}
	if err = EnsureTable(ctx, tableDef); err != nil {
		return nil, fmt.Errorf("建表失败: %w", err)
	}

	// Read the previous status before it is overwritten with "running".
	previousStatus := getSyncStatus(ctx, platformCode, interfaceCode)
	var since int64
	if !isFullSync {
		since = getLastSyncTime(ctx, platformCode, interfaceCode)
	}
	if previousStatus == "running" {
		logrus.Warnf("检测到上次同步异常中断 [%s/%s],将重新全量同步", platformCode, interfaceCode)
		since = 0
	}

	// Flag the run as started; the tracker row stores the chosen starting point.
	markSyncRunning(ctx, platformCode, interfaceCode, since)

	client := NewApiClient(platform)
	if prefetchCfg := parsePrefetchConfig(target.RequestConfig); prefetchCfg != nil {
		return syncWithPrefetch(ctx, client, platform, target, interfaces, tableDef, prefetchCfg, isFullSync, since, startedAt)
	}
	return syncSingleAPI(ctx, client, platform, target, tableDef, since, startedAt)
}
// paramsInQuery reports whether request parameters belong in the URL query
// string: always for GET interfaces, and for any other method when the
// request config sets "parameters_location" to "query".
func paramsInQuery(iface *entity.ApiInterface) bool {
	if iface.Method == "GET" {
		return true
	}
	if iface.RequestConfig == nil {
		return false
	}
	location, _ := iface.RequestConfig["parameters_location"].(string)
	return location == "query"
}
// syncSingleAPI synchronizes one paginated interface into td's table.
//
// The first page is mandatory: a request failure or an unparsable/err-coded
// response is recorded through recordFailure and returned as an error. The
// total page count comes from that first response. Pages 2..N are best
// effort: a failed request or parse is logged and the page is skipped.
// Write errors reported by savePage are logged and do not abort the run.
//
// On completion the tracker is updated with the largest timestamp seen in
// the parsed rows, or with the current Unix time when none was found.
//
// Parameters:
//   - lastSyncTime: incremental starting point passed to buildReqBody
//     (0 disables the time filter).
//   - start: time the overall sync began, used only for the reported duration.
func syncSingleAPI(ctx context.Context, api *ApiClient, platform *PlatformConfig, iface *entity.ApiInterface, td *TableDefinition, lastSyncTime int64, start time.Time) (*SyncResult, error) {
	pageSize := GetSyncPageSize(ctx)
	// JSON numbers decode as float64, hence the assertion type.
	if ps, ok := iface.RequestConfig["page_size"].(float64); ok {
		pageSize = int(ps)
	}
	inQuery := paramsInQuery(iface)
	method := string(iface.Method)

	body := buildReqBody(iface, 1, pageSize, lastSyncTime, nil)
	resp, err := api.Request(ctx, method, iface.Url, body, inQuery)
	if err != nil {
		recordFailure(ctx, platform.PlatformCode, iface.Code, err.Error())
		return nil, fmt.Errorf("获取第一页失败: %w", err)
	}
	rows, totalPages, maxTime, err := parseResp(resp.Body, iface.ResponseConfig)
	if err != nil {
		// Record this the same way as a failed first-page request.
		recordFailure(ctx, platform.PlatformCode, iface.Code, err.Error())
		return nil, err
	}

	result := &SyncResult{TableName: td.TableName, TotalPages: totalPages}
	inserted, saveErr := savePage(ctx, td, rows)
	if saveErr != nil {
		logrus.Errorf("第 %d 页写入失败: %v", 1, saveErr)
	}
	result.InsertedRows += inserted
	result.TotalRows += len(rows)

	for page := 2; page <= totalPages; page++ {
		body := buildReqBody(iface, page, pageSize, lastSyncTime, nil)
		resp, err := api.Request(ctx, method, iface.Url, body, inQuery)
		if err != nil {
			logrus.Errorf("第 %d 页失败: %v", page, err)
			continue
		}
		rows, _, mt, err := parseResp(resp.Body, iface.ResponseConfig)
		if err != nil {
			logrus.Errorf("第 %d 页解析失败: %v", page, err)
			continue
		}
		inserted, saveErr := savePage(ctx, td, rows)
		if saveErr != nil {
			logrus.Errorf("第 %d 页写入失败: %v", page, saveErr)
		}
		result.InsertedRows += inserted
		result.TotalRows += len(rows)
		if mt > maxTime {
			maxTime = mt
		}
		// Brief pause between successful pages.
		time.Sleep(100 * time.Millisecond)
	}

	// No timestamp found in any row: fall back to "now".
	if maxTime <= 0 {
		maxTime = time.Now().Unix()
	}
	updateSyncTime(ctx, platform.PlatformCode, iface.Code, maxTime)

	result.Duration = fmt.Sprintf("%.1fs", time.Since(start).Seconds())
	logrus.Infof("同步完成 - 表:%s, %d条, 写入%d条, 耗时%s", td.TableName, result.TotalRows, result.InsertedRows, result.Duration)
	return result, nil
}
// syncWithPrefetch synchronizes an interface that depends on a prefetched
// entity list.
//
// Phase 1 pages through prefetch.URL and collects every returned item. The
// request parameters start from buildPrefetchParams(iface) and are overridden
// by the request config of the interface whose Url equals prefetch.URL, if
// one exists. A failed prefetch request or parse aborts the sync with an
// error. An empty entity list returns an empty result without touching the
// tracker.
//
// Phase 2 stores the prefetched rows in the prefetch interface's own table
// when it has a table definition. This is best effort: failures are logged.
//
// Phase 3 fetches the data interface once per entity, paginated, with at most
// GetSyncConcurrency(ctx) entities in flight. Each fetched row is stamped
// with prefetch.TargetParam = entity value before saving. Per-page request,
// parse and write failures are logged and skipped.
//
// Finally the tracker is updated with the largest timestamp seen (never lower
// than lastSyncTime), or the current Unix time if that is not positive.
//
// isFullSync is accepted for signature compatibility and is not read here.
func syncWithPrefetch(ctx context.Context, api *ApiClient, platform *PlatformConfig, iface *entity.ApiInterface, allIfaces []entity.ApiInterface, td *TableDefinition, prefetch *PrefetchConfig, isFullSync bool, lastSyncTime int64, start time.Time) (*SyncResult, error) {
	logrus.Infof("预取模式: %s -> %s", prefetch.URL, iface.Url)

	// 1. Find the interface configured for the prefetch URL so its request
	// parameters can override the defaults.
	prefetchIface := findInterfaceByURL(allIfaces, prefetch.URL)
	prefetchParams := buildPrefetchParams(iface)
	if prefetchIface != nil && prefetchIface.RequestConfig != nil {
		for k, v := range prefetchIface.RequestConfig {
			switch k {
			case "headers", "prefetch", "page_param", "page_size_param", "time_field",
				"parameters_location", "filtering", "group_by", "date_range":
				// Control keys, not request parameters.
				continue
			}
			prefetchParams[k] = v
		}
	}

	method := strings.ToUpper(prefetch.Method)
	inQuery := paramsInQuery(iface)

	// The page parameter name does not change between pages.
	pageParam := "page"
	if p, ok := iface.RequestConfig["page_param"].(string); ok {
		pageParam = p
	}

	allEntities := make([]interface{}, 0)
	allRows := make([]map[string]interface{}, 0)
	fetchedPages := 0
	prefetchTotalPages := 1
	for prefetchPage := 1; prefetchPage <= prefetchTotalPages; prefetchPage++ {
		params := make(map[string]interface{}, len(prefetchParams)+1)
		for k, v := range prefetchParams {
			params[k] = v
		}
		params[pageParam] = prefetchPage

		resp, err := api.Request(ctx, method, prefetch.URL, params, true)
		if err != nil {
			return nil, fmt.Errorf("预取第 %d 页失败: %w", prefetchPage, err)
		}
		entities, _, _, err := parseResp(resp.Body, nil)
		if err != nil {
			return nil, fmt.Errorf("解析预取第 %d 页响应失败: %w", prefetchPage, err)
		}
		fetchedPages++

		// Keep the full rows (for storage) and the extracted values (for iteration).
		for _, item := range entities {
			allRows = append(allRows, item)
			if prefetch.ValueField == "" {
				allEntities = append(allEntities, item)
				continue
			}
			v, ok := item[prefetch.ValueField]
			if !ok {
				continue
			}
			// Convert float64 to int64 so the value is not rendered in
			// scientific notation when used as a request parameter.
			if f, isFloat := v.(float64); isFloat {
				allEntities = append(allEntities, int64(f))
			} else {
				allEntities = append(allEntities, v)
			}
		}

		if prefetchPage == 1 {
			tp := getTotalPages(resp.Body)
			if tp <= 0 {
				// No page info: treat the response as the only page.
				break
			}
			prefetchTotalPages = tp
		}
		time.Sleep(50 * time.Millisecond)
	}

	if len(allEntities) == 0 {
		logrus.Warn("预取结果为空列表,跳过同步")
		return &SyncResult{TableName: td.TableName, Duration: fmt.Sprintf("%.1fs", time.Since(start).Seconds())}, nil
	}
	logrus.Infof("预取到 %d 个实体(共 %d 页)", len(allEntities), fetchedPages)

	// 2. Store the prefetched rows as well when the prefetch interface has
	// its own table definition. Best effort: failures are only logged.
	if prefetchIface != nil && prefetchIface.TableDefinition != nil {
		prefetchTd, err := ParseTableDefinition(prefetchIface.TableDefinition)
		if err != nil {
			logrus.Warnf("解析预取表结构失败: %v", err)
		} else if ensureErr := EnsureTable(ctx, prefetchTd); ensureErr != nil {
			logrus.Warnf("预取表建表失败: %v", ensureErr)
		} else {
			saved, saveErr := savePage(ctx, prefetchTd, allRows)
			if saveErr != nil {
				logrus.Warnf("预取数据存库失败: %s, %v", prefetchTd.TableName, saveErr)
			}
			logrus.Infof("预取数据已存库: %s, %d 条", prefetchTd.TableName, saved)
		}
	}

	// 3. Fetch the data interface for every entity with bounded concurrency.
	result := &SyncResult{TableName: td.TableName}
	pageSize := GetSyncPageSize(ctx)
	if ps, ok := iface.RequestConfig["page_size"].(float64); ok {
		pageSize = int(ps)
	}
	dataMethod := string(iface.Method)

	concurrency := GetSyncConcurrency(ctx)
	if concurrency < 1 {
		// A zero-capacity semaphore would block the first send forever and a
		// negative capacity panics in make.
		concurrency = 1
	}

	var mu sync.Mutex // guards result counters and globalMaxTime
	var wg sync.WaitGroup
	sem := make(chan struct{}, concurrency)
	globalMaxTime := lastSyncTime

	for idx, entityVal := range allEntities {
		wg.Add(1)
		sem <- struct{}{} // blocks while `concurrency` workers are busy
		go func(idx int, val interface{}) {
			defer wg.Done()
			defer func() { <-sem }()

			logrus.Infof(" 处理实体 [%d/%d]: %v", idx+1, len(allEntities), val)
			page := 1
			totalPages := 1
			entityMaxTime := int64(0)
			for page <= totalPages {
				// Stop instead of retrying every remaining page once the
				// context has been cancelled.
				if ctxErr := ctx.Err(); ctxErr != nil {
					logrus.Warnf(" 实体 %v 同步已取消: %v", val, ctxErr)
					break
				}
				body := buildReqBody(iface, page, pageSize, lastSyncTime, map[string]interface{}{
					prefetch.TargetParam: val,
				})
				resp, err := api.Request(ctx, dataMethod, iface.Url, body, inQuery)
				if err != nil {
					logrus.Errorf(" 实体 %v 第 %d 页失败: %v", val, page, err)
					page++
					time.Sleep(200 * time.Millisecond)
					continue
				}
				rows, tp, mt, parseErr := parseResp(resp.Body, iface.ResponseConfig)
				if parseErr != nil {
					logrus.Errorf(" 解析响应失败: %v", parseErr)
					page++
					continue
				}
				// Only the first page's total is trusted.
				if page == 1 {
					totalPages = tp
				}
				// Stamp each row with the entity it belongs to.
				for i := range rows {
					rows[i][prefetch.TargetParam] = val
				}
				inserted, saveErr := savePage(ctx, td, rows)
				if saveErr != nil {
					logrus.Errorf(" 实体 %v 第 %d 页写入失败: %v", val, page, saveErr)
				}
				mu.Lock()
				result.InsertedRows += inserted
				result.TotalRows += len(rows)
				mu.Unlock()
				if mt > entityMaxTime {
					entityMaxTime = mt
				}
				page++
				time.Sleep(100 * time.Millisecond)
			}
			if entityMaxTime > 0 {
				mu.Lock()
				if entityMaxTime > globalMaxTime {
					globalMaxTime = entityMaxTime
				}
				mu.Unlock()
			}
		}(idx, entityVal)
	}
	wg.Wait()

	if globalMaxTime <= 0 {
		globalMaxTime = time.Now().Unix()
	}
	updateSyncTime(ctx, platform.PlatformCode, iface.Code, globalMaxTime)

	result.Duration = fmt.Sprintf("%.1fs", time.Since(start).Seconds())
	logrus.Infof("同步完成 - 表:%s, %d条, 写入%d条, 耗时%s", td.TableName, result.TotalRows, result.InsertedRows, result.Duration)
	return result, nil
}
// getTotalPages reads data.page_info.total_page from a raw JSON response.
// It returns 0 when the payload is not valid JSON, when "data" is absent or
// not an object, or when the page info or a numeric total_page is missing.
func getTotalPages(raw []byte) int {
	var envelope struct {
		Data map[string]interface{} `json:"data"`
	}
	if json.Unmarshal(raw, &envelope) != nil {
		return 0
	}
	// Indexing a nil map yields the zero value, so a missing "data" simply
	// fails the assertion below.
	pageInfo, isObject := envelope.Data["page_info"].(map[string]interface{})
	if !isObject {
		return 0
	}
	total, isNumber := pageInfo["total_page"].(float64)
	if !isNumber {
		return 0
	}
	return int(total)
}
// buildPrefetchParams derives the base request parameters for the prefetch
// call from iface's request config. The page parameter is set to 1 and the
// page-size parameter to 100 (names default to "page" / "page_size" and can
// be overridden via "page_param" / "page_size_param"). All remaining config
// entries are copied except control keys and the two paging keys. A nil
// request config yields an empty map.
func buildPrefetchParams(iface *entity.ApiInterface) map[string]interface{} {
	out := make(map[string]interface{})
	cfg := iface.RequestConfig
	if cfg == nil {
		return out
	}

	pageKey, sizeKey := "page", "page_size"
	if name, ok := cfg["page_param"].(string); ok {
		pageKey = name
	}
	if name, ok := cfg["page_size_param"].(string); ok {
		sizeKey = name
	}
	out[pageKey] = 1
	out[sizeKey] = 100

	for key, value := range cfg {
		switch key {
		case "headers", "prefetch", "page_param", "page_size_param", "time_field",
			"parameters_location", "filtering", "group_by", "date_range":
			// Control keys are never forwarded as parameters.
			continue
		}
		// Paging values set above must not be overwritten by the config.
		if key == pageKey || key == sizeKey {
			continue
		}
		out[key] = value
	}
	return out
}
// parsePrefetchConfig extracts the "prefetch" section from a request config.
// It returns nil when the config is nil, the section is missing or not an
// object, or the section has no non-empty string "url". The method defaults
// to "GET"; the remaining fields default to the empty string.
func parsePrefetchConfig(requestConfig map[string]interface{}) *PrefetchConfig {
	// Reading from a nil map and asserting on a nil value both fail softly,
	// so a single assertion covers every "no usable section" case.
	section, isObject := requestConfig["prefetch"].(map[string]interface{})
	if !isObject {
		return nil
	}

	url, _ := section["url"].(string)
	if url == "" {
		return nil
	}
	method, _ := section["method"].(string)
	if method == "" {
		method = "GET"
	}
	responsePath, _ := section["response_path"].(string)
	targetParam, _ := section["target_param"].(string)
	valueField, _ := section["value_field"].(string)

	return &PrefetchConfig{
		URL:          url,
		Method:       method,
		ResponsePath: responsePath,
		TargetParam:  targetParam,
		ValueField:   valueField,
	}
}
// extractValues decodes raw as a JSON object, follows the dot-separated path
// to an array and returns its elements. With an empty valueField the
// elements are returned as-is; otherwise only object elements that contain
// valueField contribute, and the field's value is returned for each.
//
// An error is returned when raw is not a JSON object, when an intermediate
// path segment is not an object, or when the final segment is not an array.
func extractValues(raw []byte, path, valueField string) ([]interface{}, error) {
	var root map[string]interface{}
	if err := json.Unmarshal(raw, &root); err != nil {
		return nil, fmt.Errorf("JSON解析失败: %w", err)
	}

	// strings.Split always yields at least one segment; the last one names
	// the array, everything before it must be nested objects.
	segments := strings.Split(path, ".")
	lastIdx := len(segments) - 1

	node := root
	for _, seg := range segments[:lastIdx] {
		child, isObject := node[seg].(map[string]interface{})
		if !isObject {
			return nil, fmt.Errorf("路径 %s 在 %s 处中断", path, seg)
		}
		node = child
	}

	items, isArray := node[segments[lastIdx]].([]interface{})
	if !isArray {
		return nil, fmt.Errorf("路径 %s 不是数组", path)
	}

	var picked []interface{}
	for _, item := range items {
		if valueField == "" {
			picked = append(picked, item)
			continue
		}
		obj, isObject := item.(map[string]interface{})
		if !isObject {
			continue
		}
		if v, found := obj[valueField]; found {
			picked = append(picked, v)
		}
	}
	return picked, nil
}
// buildReqBody assembles the request parameters for one page.
//
// Every request-config entry is copied except the control keys
// ("time_field", "headers", "prefetch", "page_param", "page_size_param",
// "parameters_location"). The page and page-size parameters are then set
// under their configured names (default "page" / "page_size").
//
// When lastSyncTime is positive and the config names a non-empty
// "time_field", a filter of the form
//
//	{"field": <time_field>, "operator": "GREATER_EQUALS", "values": ["<lastSyncTime>"]}
//
// is appended to "filtering". Filters already present in the config are
// preserved; the merged list is built in a fresh slice so the slice stored in
// the shared request config is never written to (this function is called
// concurrently for the same iface).
//
// extraParams are applied last and override any earlier key.
func buildReqBody(iface *entity.ApiInterface, page, pageSize int, lastSyncTime int64, extraParams map[string]interface{}) map[string]interface{} {
	body := make(map[string]interface{})
	pageParam := "page"
	psParam := "page_size"

	if iface.RequestConfig != nil {
		for k, v := range iface.RequestConfig {
			switch k {
			case "time_field", "headers", "prefetch",
				"page_param", "page_size_param", "parameters_location":
				continue
			}
			body[k] = v
		}
		if p, ok := iface.RequestConfig["page_param"].(string); ok {
			pageParam = p
		}
		if p, ok := iface.RequestConfig["page_size_param"].(string); ok {
			psParam = p
		}
	}
	body[pageParam] = page
	body[psParam] = pageSize

	// Incremental sync: express time_field as a filtering condition.
	if lastSyncTime > 0 {
		if tf, ok := iface.RequestConfig["time_field"].(string); ok && tf != "" {
			timeFilter := map[string]interface{}{
				"field":    tf,
				"operator": "GREATER_EQUALS",
				"values":   []interface{}{fmt.Sprintf("%d", lastSyncTime)},
			}
			existing, _ := body["filtering"].([]interface{})
			// Copy before appending: appending directly could write into
			// spare capacity of the config's own backing array.
			merged := make([]interface{}, 0, len(existing)+1)
			merged = append(merged, existing...)
			body["filtering"] = append(merged, timeFilter)
		}
	}

	for k, v := range extraParams {
		body[k] = v
	}
	return body
}
// parseResp decodes a sync API response of the shape
// {"code": ..., "message": ..., "data": {...}}.
//
// Returns, in order:
//   - the object elements of data.list (or of data.data.list when data has
//     no "list" key), each extended with a "raw_data" key holding the
//     element's JSON encoding;
//   - the total page count from data.page_info.total_page, defaulting to 1;
//   - the largest "last_modified_time" / "created_time" numeric value found
//     in the rows, or 0 when none is present;
//   - an error when raw is not valid JSON or when code is non-zero.
//
// responseConfig is currently not consulted.
func parseResp(raw []byte, responseConfig map[string]interface{}) ([]map[string]interface{}, int, int64, error) {
	var r struct {
		Code    int                    `json:"code"`
		Message string                 `json:"message"`
		Data    map[string]interface{} `json:"data"`
	}
	if err := json.Unmarshal(raw, &r); err != nil {
		return nil, 0, 0, fmt.Errorf("解析响应失败: %w", err)
	}
	if r.Code != 0 {
		return nil, 0, 0, fmt.Errorf("API错误: code=%d, message=%s", r.Code, r.Message)
	}

	// A present "list" key wins even when its value is not an array; the
	// nested location is only tried when the key is absent.
	var listData []interface{}
	if lp, ok := r.Data["list"]; ok {
		listData, _ = lp.([]interface{})
	} else if nested, ok := r.Data["data"].(map[string]interface{}); ok {
		listData, _ = nested["list"].([]interface{})
	}

	var rows []map[string]interface{}
	maxTime := int64(0)
	for _, item := range listData {
		m, ok := item.(map[string]interface{})
		if !ok {
			continue
		}
		// Marshal cannot fail here: m holds only values produced by
		// json.Unmarshal. The snapshot is taken before raw_data is added.
		j, _ := json.Marshal(m)
		m["raw_data"] = string(j)
		for _, key := range [...]string{"last_modified_time", "created_time"} {
			if t, ok := m[key].(float64); ok && int64(t) > maxTime {
				maxTime = int64(t)
			}
		}
		rows = append(rows, m)
	}

	totalPages := 1
	if pi, ok := r.Data["page_info"].(map[string]interface{}); ok {
		// JSON numbers decoded into interface{} are always float64.
		if tp, ok := pi["total_page"].(float64); ok {
			totalPages = int(tp)
		}
	}
	return rows, totalPages, maxTime, nil
}
// savePage writes one page of rows into td's table through InsertRows and
// returns whatever InsertRows returns. Each row is first reduced to the keys
// that match a column name in td.Columns; "raw_data" is always carried over
// when the row has it. An empty page is a no-op returning (0, nil).
func savePage(ctx context.Context, td *TableDefinition, rows []map[string]interface{}) (int, error) {
	if len(rows) == 0 {
		return 0, nil
	}

	known := make(map[string]struct{}, len(td.Columns))
	for _, col := range td.Columns {
		known[col.Name] = struct{}{}
	}

	filtered := make([]map[string]interface{}, 0, len(rows))
	for _, src := range rows {
		dst := make(map[string]interface{})
		for key, value := range src {
			if _, keep := known[key]; keep {
				dst[key] = value
			}
		}
		if rawData, present := src["raw_data"]; present {
			dst["raw_data"] = rawData
		}
		filtered = append(filtered, dst)
	}
	return InsertRows(ctx, td.TableName, td.ConflictKeys, filtered)
}
// getLastSyncTime reads last_sync_time from the sync tracker table for the
// given platform/interface pair. The result of Scan is not inspected, so the
// zero value is returned when nothing is scanned.
func getLastSyncTime(ctx context.Context, platformCode, interfaceCode string) int64 {
	var lastSync int64
	query := gfdb.DB(ctx).Model(ctx, consts.SyncTrackerTable).
		Fields("last_sync_time").
		Where("platform_code", platformCode).
		Where("interface_code", interfaceCode)
	query.Scan(&lastSync)
	return lastSync
}
// getSyncStatus reads sync_status from the sync tracker table for the given
// platform/interface pair. The result of Scan is not inspected, so the empty
// string is returned when nothing is scanned.
func getSyncStatus(ctx context.Context, platformCode, interfaceCode string) string {
	var status string
	query := gfdb.DB(ctx).Model(ctx, consts.SyncTrackerTable).
		Fields("sync_status").
		Where("platform_code", platformCode).
		Where("interface_code", interfaceCode)
	query.Scan(&status)
	return status
}
// markSyncRunning saves the tracker row for the platform/interface pair with
// sync_status "running" and the given last_sync_time, using
// (platform_code, interface_code) as the conflict key. The result of Save is
// not inspected.
func markSyncRunning(ctx context.Context, platformCode, interfaceCode string, lastSyncTime int64) {
	record := map[string]interface{}{
		"platform_code":  platformCode,
		"interface_code": interfaceCode,
		"last_sync_time": lastSyncTime,
		"sync_status":    "running",
	}
	gfdb.DB(ctx).Model(ctx, consts.SyncTrackerTable).
		Data(record).
		OnConflict("platform_code", "interface_code").
		Save()
}
// updateSyncTime saves the tracker row for the platform/interface pair with
// sync_status "success", last_sync_time set to t and last_sync_at set to the
// current time, using (platform_code, interface_code) as the conflict key.
// The result of Save is not inspected.
func updateSyncTime(ctx context.Context, platformCode, interfaceCode string, t int64) {
	record := map[string]interface{}{
		"platform_code":  platformCode,
		"interface_code": interfaceCode,
		"last_sync_time": t,
		"last_sync_at":   time.Now(),
		"sync_status":    "success",
	}
	gfdb.DB(ctx).Model(ctx, consts.SyncTrackerTable).
		Data(record).
		OnConflict("platform_code", "interface_code").
		Save()
}
func recordFailure(ctx context.Context, platformCode, interfaceCode, errMsg string) {
dao.SyncTaskLog.Create(ctx, &taskDto.CreateSyncTaskLogReq{
TaskID: fmt.Sprintf("%s_%s_%d", platformCode, interfaceCode, time.Now().UnixNano()),
TaskType: fmt.Sprintf("%s_%s", platformCode, interfaceCode),
PlatformCode: platformCode,
InterfaceCode: interfaceCode,
Status: "failed",
MaxRetry: 3,
RequestParams: map[string]interface{}{
"platform_code": platformCode,
"interface_code": interfaceCode,
"error": errMsg,
},
})
}
// findInterfaceByURL returns a pointer to the first element of ifaces whose
// Url equals url, or nil when there is no match. The pointer refers into the
// slice itself, not to a copy.
func findInterfaceByURL(ifaces []entity.ApiInterface, url string) *entity.ApiInterface {
	for idx := 0; idx < len(ifaces); idx++ {
		candidate := &ifaces[idx]
		if candidate.Url == url {
			return candidate
		}
	}
	return nil
}