新增快手平台和对应的接口

This commit is contained in:
2026-06-01 14:08:17 +08:00
parent 15db71b7ba
commit 812693caae
14 changed files with 1529 additions and 185 deletions

View File

@@ -3,13 +3,16 @@ package sync
import (
"bytes"
"context"
"crypto/md5"
"crypto/rand"
"encoding/hex"
"encoding/json"
"fmt"
"io"
"math/big"
"net/http"
"net/url"
"sort"
"strings"
"time"
@@ -24,8 +27,9 @@ type ApiResult struct {
// ApiClient 通用 API 客户端
type ApiClient struct {
config *PlatformConfig
client *http.Client
config *PlatformConfig
client *http.Client
rateLimiter <-chan time.Time // 限流 ticker
}
// NewApiClient 创建客户端
@@ -34,10 +38,17 @@ func NewApiClient(config *PlatformConfig) *ApiClient {
if config.RequestTimeoutMs > 0 {
timeout = time.Duration(config.RequestTimeoutMs) * time.Millisecond
}
return &ApiClient{
ac := &ApiClient{
config: config,
client: &http.Client{Timeout: timeout},
}
// 初始化限流
if config.RateLimitPerMinute > 0 {
interval := time.Minute / time.Duration(config.RateLimitPerMinute)
ac.rateLimiter = time.Tick(interval)
logrus.Infof("限流已启用: %d 次/分钟, 间隔 %v", config.RateLimitPerMinute, interval)
}
return ac
}
// Get 发送 GET 请求(无参数)
@@ -85,6 +96,15 @@ func (c *ApiClient) doRequest(ctx context.Context, method, path string, body int
}
func (c *ApiClient) execute(ctx context.Context, method, path string, body interface{}, paramsInQuery bool) (*ApiResult, error) {
// 限流等待
if c.rateLimiter != nil {
select {
case <-c.rateLimiter:
case <-ctx.Done():
return nil, ctx.Err()
}
}
start := time.Now()
fullURL := c.config.GetApiUrl(path)
@@ -104,6 +124,8 @@ func (c *ApiClient) execute(ctx context.Context, method, path string, body inter
}
}
// 计算签名并追加(如快手 API 的 MD5 签名)
fullURL = c.applySignature(fullURL, body, paramsInQuery)
logrus.Infof("请求 URL: %s", fullURL)
req, err := http.NewRequestWithContext(ctx, method, fullURL, reqBody)
@@ -192,6 +214,7 @@ func (c *ApiClient) applyAuthURL(rawURL string) string {
for k, v := range eq {
val := fmt.Sprintf("%v", v)
val = strings.ReplaceAll(val, "{timestamp}", fmt.Sprintf("%d", time.Now().Unix()))
val = strings.ReplaceAll(val, "{timestamp_ms}", fmt.Sprintf("%d", time.Now().UnixMilli()))
val = strings.ReplaceAll(val, "{nonce}", generateNonce())
extraParams[k] = val
}
@@ -250,3 +273,58 @@ func generateNonce() string {
r, _ := rand.Int(rand.Reader, big.NewInt(10000))
return fmt.Sprintf("%012d%04d", nanoPart, r.Int64())
}
// applySignature 计算签名并追加到 URL支持快手等平台的 MD5 签名)
func (c *ApiClient) applySignature(rawURL string, body interface{}, paramsInQuery bool) string {
cfg := c.config.AuthConfig
if cfg == nil {
return rawURL
}
signAlgo, _ := cfg["sign_algorithm"].(string)
if signAlgo == "" {
return rawURL
}
appSecret, _ := cfg["app_secret"].(string)
if appSecret == "" && c.config.AppSecret != "" {
appSecret = c.config.AppSecret
}
if appSecret == "" {
return rawURL
}
parsed, _ := url.Parse(rawURL)
q := parsed.Query()
// 收集所有参数并按 key 排序
keys := make([]string, 0, len(q))
for k := range q {
if k == "sign" {
continue
}
keys = append(keys, k)
}
sort.Strings(keys)
var signStr string
for _, k := range keys {
signStr += k + "=" + q.Get(k) + "&"
}
signStr += "key=" + appSecret
var sign string
switch signAlgo {
case "md5":
h := md5.Sum([]byte(signStr))
sign = hex.EncodeToString(h[:])
case "md5_upper":
h := md5.Sum([]byte(signStr))
sign = strings.ToUpper(hex.EncodeToString(h[:]))
default:
return rawURL
}
q.Set("sign", sign)
parsed.RawQuery = q.Encode()
return parsed.String()
}

View File

@@ -17,6 +17,9 @@ import (
"github.com/sirupsen/logrus"
)
// syncRunningMap 防止同一个接口被并发执行同步
var syncRunningMap sync.Map
// SyncResult 同步结果
type SyncResult struct {
TableName string
@@ -37,6 +40,14 @@ type PrefetchConfig struct {
// SyncByConfig 执行同步
func SyncByConfig(ctx context.Context, platformCode, interfaceCode string, isFullSync bool) (*SyncResult, error) {
// 内存锁:防止同一个接口被并发执行(两个调度周期重叠时跳过)
lockKey := platformCode + "/" + interfaceCode
if _, loaded := syncRunningMap.LoadOrStore(lockKey, true); loaded {
logrus.Warnf("接口 [%s] 正在同步中,跳过重复请求", lockKey)
return nil, fmt.Errorf("接口 [%s] 正在同步中,跳过", lockKey)
}
defer syncRunningMap.Delete(lockKey)
start := time.Now()
pm := &PlatformManager{}
@@ -87,7 +98,7 @@ func SyncByConfig(ctx context.Context, platformCode, interfaceCode string, isFul
if prefetch != nil {
return syncWithPrefetch(ctx, api, platform, iface, ifaces, td, prefetch, isFullSync, lastSyncTime, start)
}
return syncSingleAPI(ctx, api, platform, iface, td, lastSyncTime, start)
return syncSingleAPI(ctx, api, platform, iface, td, isFullSync, lastSyncTime, start)
}
// paramsInQuery 判断参数是否应放在 URL 查询字符串中
@@ -104,24 +115,39 @@ func paramsInQuery(iface *entity.ApiInterface) bool {
}
// syncSingleAPI 单接口分页同步
func syncSingleAPI(ctx context.Context, api *ApiClient, platform *PlatformConfig, iface *entity.ApiInterface, td *TableDefinition, lastSyncTime int64, start time.Time) (*SyncResult, error) {
func syncSingleAPI(ctx context.Context, api *ApiClient, platform *PlatformConfig, iface *entity.ApiInterface, td *TableDefinition, isFullSync bool, lastSyncTime int64, start time.Time) (*SyncResult, error) {
pageSize := GetSyncPageSize(ctx)
if ps, ok := iface.RequestConfig["page_size"].(float64); ok {
pageSize = int(ps)
}
taskType := "incremental"
if isFullSync || lastSyncTime <= 0 {
taskType = "full"
}
inQuery := paramsInQuery(iface)
method := string(iface.Method)
body := buildReqBody(iface, 1, pageSize, lastSyncTime, nil)
// 游标分页首次请求需要 cursor=""(通过 extraParams 覆盖 buildReqBody 的 page=1 赋值)
firstExtra := map[string]interface{}{}
if isCursorPagination(iface) {
cp := "cursor"
if p, ok := iface.RequestConfig["page_param"].(string); ok && p != "" {
cp = p
}
firstExtra[cp] = ""
}
body := buildReqBody(iface, 1, pageSize, lastSyncTime, firstExtra)
resp, err := api.Request(ctx, method, iface.Url, body, inQuery)
if err != nil {
recordFailure(ctx, platform.PlatformCode, iface.Code, err.Error())
recordFailure(ctx, platform.PlatformCode, iface.Code, taskType, err.Error())
return nil, fmt.Errorf("获取第一页失败: %w", err)
}
rows, totalPages, maxTime, err := parseResp(resp.Body, iface.ResponseConfig)
rows, totalPages, maxTime, nextCursor, err := parseRespExt(resp.Body, iface.ResponseConfig)
if err != nil {
recordFailure(ctx, platform.PlatformCode, iface.Code, taskType, fmt.Sprintf("解析第一页响应失败: %v", err))
return nil, err
}
@@ -130,24 +156,68 @@ func syncSingleAPI(ctx context.Context, api *ApiClient, platform *PlatformConfig
result.InsertedRows += inserted
result.TotalRows += len(rows)
for page := 2; page <= totalPages; page++ {
body := buildReqBody(iface, page, pageSize, lastSyncTime, nil)
resp, err := api.Request(ctx, method, iface.Url, body, inQuery)
if err != nil {
logrus.Errorf("第 %d 页失败: %v", page, err)
continue
// 游标分页
if isCursorPagination(iface) {
for nextCursor != "" && nextCursor != "nomore" {
cp := "cursor"
if p, ok := iface.RequestConfig["page_param"].(string); ok && p != "" {
cp = p
}
body := buildReqBody(iface, 1, pageSize, lastSyncTime, map[string]interface{}{
cp: nextCursor,
})
resp, err := api.Request(ctx, method, iface.Url, body, inQuery)
if err != nil {
logrus.Errorf("游标 %s 请求失败: %v", nextCursor, err)
recordFailure(ctx, platform.PlatformCode, iface.Code, taskType, fmt.Sprintf("游标 %s 请求失败: %v", nextCursor, err))
break
}
rows, _, mt, nc, pe := parseRespExt(resp.Body, iface.ResponseConfig)
if pe != nil {
logrus.Errorf("游标 %s 解析失败: %v", nextCursor, pe)
recordFailure(ctx, platform.PlatformCode, iface.Code, taskType, fmt.Sprintf("游标 %s 解析失败: %v", nextCursor, pe))
break
}
if len(rows) == 0 {
break
}
nextCursor = nc
inserted, _ = savePage(ctx, td, rows)
result.InsertedRows += inserted
result.TotalRows += len(rows)
if mt > maxTime {
maxTime = mt
}
result.TotalPages++
time.Sleep(100 * time.Millisecond)
}
rows, _, mt, err := parseResp(resp.Body, iface.ResponseConfig)
if err != nil {
continue
} else {
// 普通分页
for page := 2; page <= totalPages; page++ {
body := buildReqBody(iface, page, pageSize, lastSyncTime, nil)
resp, err := api.Request(ctx, method, iface.Url, body, inQuery)
if err != nil {
logrus.Errorf("第 %d 页请求失败: %v", page, err)
recordFailure(ctx, platform.PlatformCode, iface.Code, taskType, fmt.Sprintf("第 %d 页请求失败: %v", page, err))
continue
}
rows, _, mt, _, pe := parseRespExt(resp.Body, iface.ResponseConfig)
if pe != nil {
logrus.Errorf("第 %d 页解析失败: %v", page, pe)
recordFailure(ctx, platform.PlatformCode, iface.Code, taskType, fmt.Sprintf("第 %d 页解析失败: %v", page, pe))
continue
}
inserted, _ = savePage(ctx, td, rows)
result.InsertedRows += inserted
result.TotalRows += len(rows)
if mt > maxTime {
maxTime = mt
}
time.Sleep(100 * time.Millisecond)
}
inserted, _ = savePage(ctx, td, rows)
result.InsertedRows += inserted
result.TotalRows += len(rows)
if mt > maxTime {
maxTime = mt
}
time.Sleep(100 * time.Millisecond)
}
if maxTime <= 0 {
@@ -160,84 +230,150 @@ func syncSingleAPI(ctx context.Context, api *ApiClient, platform *PlatformConfig
return result, nil
}
func isCursorPagination(iface *entity.ApiInterface) bool {
if iface.RequestConfig == nil {
return false
}
cp, _ := iface.RequestConfig["cursor_pagination"].(bool)
return cp
}
// collectPrefetchEntities 从 rows 中收集实体和行数据
func collectPrefetchEntities(rows []map[string]interface{}, prefetch *PrefetchConfig, allEntities *[]interface{}, allRows *[]map[string]interface{}) {
for _, item := range rows {
*allRows = append(*allRows, item)
if prefetch.ValueField == "" {
*allEntities = append(*allEntities, item)
} else if v, ok := item[prefetch.ValueField]; ok {
if f, ok := v.(float64); ok {
*allEntities = append(*allEntities, int64(f))
} else {
*allEntities = append(*allEntities, v)
}
}
}
}
// syncWithPrefetch 预取模式同步(先分页拉取全部实体列表,再并发处理每个实体)
func syncWithPrefetch(ctx context.Context, api *ApiClient, platform *PlatformConfig, iface *entity.ApiInterface, allIfaces []entity.ApiInterface, td *TableDefinition, prefetch *PrefetchConfig, isFullSync bool, lastSyncTime int64, start time.Time) (*SyncResult, error) {
logrus.Infof("预取模式: %s -> %s", prefetch.URL, iface.Url)
// 1. 查找匹配 prefetch URL 的接口配置(用于获取正确的请求参数)
taskType := "incremental"
if isFullSync || lastSyncTime <= 0 {
taskType = "full"
}
// ====== 1. 预取阶段:分页拉取全部实体列表 ======
prefetchIface := findInterfaceByURL(allIfaces, prefetch.URL)
prefetchParams := buildPrefetchParams(iface)
// 判断预取来源是否游标分页,以及分页参数名
prefetchIsCursor := false
prefetchPageParam := "page"
if prefetchIface != nil && prefetchIface.RequestConfig != nil {
// 使用 prefetch 目标接口的 request_config 重建参数(覆盖默认值)
for k, v := range prefetchIface.RequestConfig {
if k == "headers" || k == "prefetch" || k == "page_param" ||
k == "page_size_param" || k == "time_field" || k == "parameters_location" ||
k == "filtering" || k == "group_by" || k == "date_range" {
continue
}
prefetchParams[k] = v
if cp, ok := prefetchIface.RequestConfig["cursor_pagination"].(bool); ok {
prefetchIsCursor = cp
}
if p, ok := prefetchIface.RequestConfig["page_param"].(string); ok && p != "" {
prefetchPageParam = p
}
}
method := strings.ToUpper(prefetch.Method)
inQuery := paramsInQuery(iface)
prefetchMethod := strings.ToUpper(prefetch.Method)
prefetchPageSize := 100
if prefetchIface != nil && prefetchIface.RequestConfig != nil {
if ps, ok := prefetchIface.RequestConfig["pageSize"].(float64); ok {
prefetchPageSize = int(ps)
}
}
// 使用 prefetch 来源接口自己的配置判断参数位置
var prefetchInQuery bool
if prefetchIface != nil {
prefetchInQuery = paramsInQuery(prefetchIface)
} else {
prefetchInQuery = paramsInQuery(iface)
}
// prefetch 来源接口的 response_config用于正确解析列表路径
var prefetchRespCfg map[string]interface{}
if prefetchIface != nil {
prefetchRespCfg = prefetchIface.ResponseConfig
}
allEntities := make([]interface{}, 0)
allRows := make([]map[string]interface{}, 0)
prefetchPage := 1
prefetchTotalPages := 1
for prefetchPage <= prefetchTotalPages {
params := make(map[string]interface{})
for k, v := range prefetchParams {
params[k] = v
}
pageParam := "page"
if p, ok := iface.RequestConfig["page_param"].(string); ok {
pageParam = p
}
params[pageParam] = prefetchPage
// 第一页(游标分页首次 cursor=""
firstExtra := make(map[string]interface{})
if prefetchIsCursor {
firstExtra[prefetchPageParam] = ""
}
prefetchReqIface := prefetchIface
if prefetchReqIface == nil {
prefetchReqIface = iface
}
body := buildReqBody(prefetchReqIface, 1, prefetchPageSize, lastSyncTime, firstExtra)
resp, err := api.Request(ctx, prefetchMethod, prefetch.URL, body, prefetchInQuery)
if err != nil {
recordFailure(ctx, platform.PlatformCode, iface.Code, taskType, fmt.Sprintf("预取第一页请求失败: %v", err))
return nil, fmt.Errorf("预取第一页失败: %w", err)
}
resp, err := api.Request(ctx, method, prefetch.URL, params, true)
if err != nil {
return nil, fmt.Errorf("预取第 %d 页失败: %w", prefetchPage, err)
}
rows, prefetchTotalPages, _, nextCursor, err := parseRespExt(resp.Body, prefetchRespCfg)
if err != nil {
recordFailure(ctx, platform.PlatformCode, iface.Code, taskType, fmt.Sprintf("解析预取响应失败: %v", err))
return nil, fmt.Errorf("解析预取响应失败: %w", err)
}
collectPrefetchEntities(rows, prefetch, &allEntities, &allRows)
entities, _, _, err := parseResp(resp.Body, nil)
if err != nil {
return nil, fmt.Errorf("解析预取第 %d 页响应失败: %w", prefetchPage, err)
}
// 收集完整数据行(用于存库)和提取的 ID 值(用于遍历)
for _, item := range entities {
allRows = append(allRows, item)
if prefetch.ValueField == "" {
allEntities = append(allEntities, item)
} else if v, ok := item[prefetch.ValueField]; ok {
// 将 float64 转 int64避免后续 URL 参数中出现科学计数法
if f, ok := v.(float64); ok {
allEntities = append(allEntities, int64(f))
} else {
allEntities = append(allEntities, v)
}
}
}
if prefetchPage == 1 {
if tp := getTotalPages(resp.Body); tp > 0 {
prefetchTotalPages = tp
} else {
// 分页循环
if prefetchIsCursor {
// 游标分页
for nextCursor != "" && nextCursor != "nomore" {
body := buildReqBody(prefetchReqIface, 1, prefetchPageSize, lastSyncTime, map[string]interface{}{
prefetchPageParam: nextCursor,
})
resp, err := api.Request(ctx, prefetchMethod, prefetch.URL, body, prefetchInQuery)
if err != nil {
logrus.Errorf("预取游标 %s 请求失败: %v", nextCursor, err)
break
}
rows, _, _, nc, pe := parseRespExt(resp.Body, prefetchRespCfg)
if pe != nil {
logrus.Errorf("预取游标 %s 解析失败: %v", nextCursor, pe)
break
}
if len(rows) == 0 {
break
}
nextCursor = nc
collectPrefetchEntities(rows, prefetch, &allEntities, &allRows)
time.Sleep(100 * time.Millisecond)
}
} else {
// 普通分页
for page := 2; page <= prefetchTotalPages; page++ {
body := buildReqBody(prefetchReqIface, page, prefetchPageSize, lastSyncTime, nil)
resp, err := api.Request(ctx, prefetchMethod, prefetch.URL, body, prefetchInQuery)
if err != nil {
logrus.Errorf("预取第 %d 页请求失败: %v", page, err)
continue
}
rows, _, _, _, pe := parseRespExt(resp.Body, prefetchRespCfg)
if pe != nil {
logrus.Errorf("预取第 %d 页解析失败: %v", page, pe)
continue
}
collectPrefetchEntities(rows, prefetch, &allEntities, &allRows)
time.Sleep(100 * time.Millisecond)
}
prefetchPage++
time.Sleep(50 * time.Millisecond)
}
if len(allEntities) == 0 {
logrus.Warn("预取结果为空列表,跳过同步")
return &SyncResult{TableName: td.TableName, Duration: fmt.Sprintf("%.1fs", time.Since(start).Seconds())}, nil
}
logrus.Infof("预取到 %d 个实体(共 %d 页)", len(allEntities), prefetchPage-1)
logrus.Infof("预取到 %d 个实体", len(allEntities))
// 2. 将预取的数据也存入库(如账户列表存入 tencent_account_relation
if prefetchIface != nil && prefetchIface.TableDefinition != nil {
@@ -258,6 +394,7 @@ func syncWithPrefetch(ctx context.Context, api *ApiClient, platform *PlatformCon
}
dataMethod := string(iface.Method)
inQuery := paramsInQuery(iface)
concurrency := GetSyncConcurrency(ctx)
var mu sync.Mutex
@@ -346,21 +483,24 @@ func syncWithPrefetch(ctx context.Context, api *ApiClient, platform *PlatformCon
// getTotalPages 从响应中提取总页数
func getTotalPages(raw []byte) int {
var r struct {
Data map[string]interface{} `json:"data"`
}
if err := json.Unmarshal(raw, &r); err != nil {
rows, tp, _, _, err := parseRespExt(raw, nil)
if err != nil || len(rows) == 0 {
return 0
}
if r.Data == nil {
return 0
return tp
}
func toFloat64(v interface{}) (float64, bool) {
switch val := v.(type) {
case float64:
return val, true
case int:
return float64(val), true
case int64:
return float64(val), true
default:
return 0, false
}
if pi, ok := r.Data["page_info"].(map[string]interface{}); ok {
if tp, ok := pi["total_page"].(float64); ok {
return int(tp)
}
}
return 0
}
// buildPrefetchParams 构建预取接口的请求参数
@@ -382,7 +522,9 @@ func buildPrefetchParams(iface *entity.ApiInterface) map[string]interface{} {
for k, v := range iface.RequestConfig {
if k == "headers" || k == "prefetch" || k == "page_param" ||
k == "page_size_param" || k == "time_field" || k == "parameters_location" ||
k == "filtering" || k == "group_by" || k == "date_range" {
k == "filtering" || k == "group_by" || k == "date_range" ||
k == "body_wrapper_field" || k == "exclude_from_wrapper" ||
k == "cursor_pagination" || k == "time_field_mode" {
continue
}
if k == pageParam || k == psParam {
@@ -467,7 +609,10 @@ func buildReqBody(iface *entity.ApiInterface, page, pageSize int, lastSyncTime i
if iface.RequestConfig != nil {
for k, v := range iface.RequestConfig {
if k == "time_field" || k == "headers" || k == "prefetch" ||
k == "page_param" || k == "page_size_param" || k == "parameters_location" {
k == "page_param" || k == "page_size_param" || k == "parameters_location" ||
k == "cursor_pagination" || k == "time_field_mode" ||
k == "body_wrapper_field" || k == "exclude_from_wrapper" ||
k == "top_level_params" {
continue
}
body[k] = v
@@ -485,16 +630,33 @@ func buildReqBody(iface *entity.ApiInterface, page, pageSize int, lastSyncTime i
}
body[pageParam] = page
body[psParam] = pageSize
// 增量同步:将 time_field 转为 API 期望的 filtering 格式
// 如 filtering=[{"field":"last_modified_time","operator":"GREATER_EQUALS","values":["1780037982"]}]
if lastSyncTime > 0 {
if tf, ok := iface.RequestConfig["time_field"].(string); ok && tf != "" {
// 时间过滤处理:支持两种模式
// 1. "filtering" 模式(默认):生成 filtering=[{"field":"...","operator":"GREATER_EQUALS","values":["..."]}](腾讯)
// 2. "range" 模式:生成 beginTime/endTime + queryType快手
if tf, ok := iface.RequestConfig["time_field"].(string); ok && tf != "" {
timeMode := "filtering"
if tm, ok := iface.RequestConfig["time_field_mode"].(string); ok && tm != "" {
timeMode = tm
}
if timeMode == "range" {
// 快手模式beginTime/endTime毫秒时间戳
timeMs := lastSyncTime
if timeMs <= 0 {
// 全量默认90天前
timeMs = time.Now().Add(-90 * 24 * time.Hour).UnixMilli()
}
body["queryType"] = 2
body["beginTime"] = timeMs
body["endTime"] = time.Now().UnixMilli()
} else if lastSyncTime > 0 {
// 腾讯 filtering 模式(仅增量时)
timeFilter := map[string]interface{}{
"field": tf,
"operator": "GREATER_EQUALS",
"values": []interface{}{fmt.Sprintf("%d", lastSyncTime)},
}
// 合并已有的 filtering如果 request_config 中已定义其他过滤条件)
if existing, ok := body["filtering"].([]interface{}); ok {
body["filtering"] = append(existing, timeFilter)
} else {
@@ -502,64 +664,197 @@ func buildReqBody(iface *entity.ApiInterface, page, pageSize int, lastSyncTime i
}
}
}
for k, v := range extraParams {
body[k] = v
}
// body_wrapper_field 支持:将业务参数包装到指定字段(如快手 API 的 param JSON
if iface.RequestConfig != nil {
if wf, ok := iface.RequestConfig["body_wrapper_field"].(string); ok && wf != "" {
excludeSet := map[string]bool{"method": true, "version": true, "signMethod": true}
if excl, ok := iface.RequestConfig["exclude_from_wrapper"].([]interface{}); ok {
for _, v := range excl {
if s, ok := v.(string); ok {
excludeSet[s] = true
}
}
}
wrapperObj := make(map[string]interface{})
for k, v := range body {
if !excludeSet[k] && k != wf {
wrapperObj[k] = v
delete(body, k)
}
}
b, _ := json.Marshal(wrapperObj)
body[wf] = string(b)
}
}
return body
}
// parseResp 解析同步接口返回值
func parseResp(raw []byte, responseConfig map[string]interface{}) ([]map[string]interface{}, int, int64, error) {
var r struct {
Code int `json:"code"`
Message string `json:"message"`
Data map[string]interface{} `json:"data"`
// parseRespExt 解析响应,支持自定义成功判断和数据路径
func parseRespExt(raw []byte, rc map[string]interface{}) ([]map[string]interface{}, int, int64, string, error) {
var respMap map[string]interface{}
if err := json.Unmarshal(raw, &respMap); err != nil {
return nil, 0, 0, "", fmt.Errorf("JSON解析失败: %w", err)
}
if err := json.Unmarshal(raw, &r); err != nil {
return nil, 0, 0, fmt.Errorf("解析响应失败: %w", err)
}
if r.Code != 0 {
return nil, 0, 0, fmt.Errorf("API错误: code=%d, message=%s", r.Code, r.Message)
}
var rows []map[string]interface{}
totalPages := 1
maxTime := int64(0)
var listData []interface{}
if lp, ok := r.Data["list"]; ok {
listData, _ = lp.([]interface{})
} else if lp, ok := r.Data["data"]; ok {
if m, ok := lp.(map[string]interface{}); ok {
if l, ok := m["list"].([]interface{}); ok {
listData = l
successField, successVal := "code", float64(0)
msgField, listPath, cursorPath := "message", "data", ""
singleRecord := false
if rc != nil {
if sf, _ := rc["success_field"].(string); sf != "" {
successField = sf
}
if sv, ok := rc["success_value"]; ok {
if f, ok := toFloat64(sv); ok {
successVal = f
}
}
if mf, _ := rc["message_field"].(string); mf != "" {
msgField = mf
}
if lp, _ := rc["list_path"].(string); lp != "" {
listPath = lp
}
if cf, _ := rc["cursor_field"].(string); cf != "" {
cursorPath = cf
}
if sr, _ := rc["single_record"].(bool); sr {
singleRecord = true
}
}
if v, ok := respMap[successField]; ok {
actual, _ := toFloat64(v)
if actual != successVal {
msg, _ := respMap[msgField].(string)
return nil, 0, 0, "", fmt.Errorf("API错误: %s=%v, %s=%s", successField, v, msgField, msg)
}
}
// 解析 list_path支持最后一段是数组的情况如 data.orderList
var listData []interface{}
var dataContainer map[string]interface{}
if listPath != "" {
parts := strings.Split(listPath, ".")
cur := respMap
for i, p := range parts {
if i == len(parts)-1 {
// 最后一段:可能直接是数组,也可能是包含 list/orderList 的 map
listData, _ = cur[p].([]interface{})
if listData == nil {
if m, ok := cur[p].(map[string]interface{}); ok {
dataContainer = m
if l, ok := m["list"]; ok {
listData, _ = l.([]interface{})
}
if listData == nil {
if ol, ok := m["orderList"]; ok {
listData, _ = ol.([]interface{})
}
}
}
} else {
dataContainer = cur
}
} else {
next, ok := cur[p].(map[string]interface{})
if !ok {
return nil, 0, 0, "", fmt.Errorf("路径 %s 在 %s 处中断", listPath, p)
}
cur = next
}
}
}
if listData == nil {
if singleRecord && listPath != "" {
// 详情接口list_path 指向单个对象,包装为单元素数组
parts := strings.Split(listPath, ".")
cur := respMap
ok := true
for _, p := range parts {
if m, exists := cur[p].(map[string]interface{}); exists {
cur = m
} else {
ok = false
break
}
}
if ok {
listData = []interface{}{cur}
dataContainer = cur
}
}
}
if listData == nil {
// 回退到根级字段
listData, _ = respMap["list"].([]interface{})
if listData == nil {
listData, _ = respMap["orderList"].([]interface{})
}
dataContainer = respMap
}
var rows []map[string]interface{}
totalPages, maxTime := 1, int64(0)
for _, item := range listData {
if m, ok := item.(map[string]interface{}); ok {
// 展平嵌套 map将子 map 的字段合并到顶层(如 orderBaseInfo.oid → oid
flat := flattenRow(m)
j, _ := json.Marshal(m)
m["raw_data"] = string(j)
if t, ok := m["last_modified_time"].(float64); ok && int64(t) > maxTime {
maxTime = int64(t)
flat["raw_data"] = string(j)
for _, tf := range []string{"last_modified_time", "created_time", "update_time", "createTime", "updateTime", "lastModifiedTime"} {
if t, ok := flat[tf].(float64); ok && int64(t) > maxTime {
maxTime = int64(t)
}
}
if t, ok := m["created_time"].(float64); ok && int64(t) > maxTime {
maxTime = int64(t)
}
rows = append(rows, m)
rows = append(rows, flat)
}
}
if pi, ok := r.Data["page_info"].(map[string]interface{}); ok {
nextCursor := ""
if cursorPath != "" {
cp := strings.Split(cursorPath, ".")
cc := respMap
for i, p := range cp {
if i == len(cp)-1 {
if s, ok := cc[p].(string); ok {
nextCursor = s
}
} else if m, ok := cc[p].(map[string]interface{}); ok {
cc = m
}
}
}
if pi, ok := dataContainer["page_info"].(map[string]interface{}); ok {
if tp, ok := pi["total_page"].(float64); ok {
totalPages = int(tp)
} else if tp, ok := pi["total_page"].(int); ok {
totalPages = tp
}
}
return rows, totalPages, maxTime, nextCursor, nil
}
return rows, totalPages, maxTime, nil
// flattenRow 展平嵌套 map将子 map 的字段递归合并到顶层
// 数组类型的字段保持原样不展平
func flattenRow(m map[string]interface{}) map[string]interface{} {
result := make(map[string]interface{}, len(m))
for k, v := range m {
if sub, ok := v.(map[string]interface{}); ok {
// 子 map 递归展平后合并到顶层
for sk, sv := range flattenRow(sub) {
result[sk] = sv
}
} else {
result[k] = v
}
}
return result
}
// parseResp 兼容旧版保持4个返回值
func parseResp(raw []byte, responseConfig map[string]interface{}) ([]map[string]interface{}, int, int64, error) {
rows, tp, mt, _, err := parseRespExt(raw, responseConfig)
return rows, tp, mt, err
}
func savePage(ctx context.Context, td *TableDefinition, rows []map[string]interface{}) (int, error) {
@@ -631,18 +926,17 @@ func updateSyncTime(ctx context.Context, platformCode, interfaceCode string, t i
Save()
}
func recordFailure(ctx context.Context, platformCode, interfaceCode, errMsg string) {
func recordFailure(ctx context.Context, platformCode, interfaceCode, taskType, errMsg string) {
dao.SyncTaskLog.Create(ctx, &taskDto.CreateSyncTaskLogReq{
TaskID: fmt.Sprintf("%s_%s_%d", platformCode, interfaceCode, time.Now().UnixNano()),
TaskType: fmt.Sprintf("%s_%s", platformCode, interfaceCode),
TaskType: taskType,
PlatformCode: platformCode,
InterfaceCode: interfaceCode,
Status: "failed",
MaxRetry: 3,
StartTime: time.Now(),
RequestParams: map[string]interface{}{
"platform_code": platformCode,
"interface_code": interfaceCode,
"error": errMsg,
"error": errMsg,
},
})
}

View File

@@ -15,6 +15,8 @@ import (
type PlatformConfig struct {
*entity.DatasourcePlatform
AccessToken string
AppKey string
AppSecret string
}
// GetApiUrl 拼接完整 API URL
@@ -52,6 +54,15 @@ func (m *PlatformManager) GetPlatform(ctx context.Context, platformCode string)
}
case "API_KEY":
cfg.AccessToken = platform.ApiKey
case "SIGN":
if platform.AuthConfig != nil {
if ak, _ := platform.AuthConfig["app_key"].(string); ak != "" {
cfg.AppKey = ak
}
if as, _ := platform.AuthConfig["app_secret"].(string); as != "" {
cfg.AppSecret = as
}
}
default:
logrus.Warnf("平台 %s 认证类型 %s 未处理", platformCode, platform.AuthType)
}

View File

@@ -12,26 +12,15 @@ import (
"github.com/sirupsen/logrus"
)
// StartAutoSync 启动自动同步(独立 goroutine启动后自动循环执行
// StartAutoSync 启动自动同步(独立 goroutine每次完成后等待 interval 再执行下一次
func StartAutoSync(ctx context.Context) {
interval := GetSyncInterval(ctx)
logrus.Infof("自动同步调度器启动,间隔: %d 分钟", interval)
// 首次执行:根据 sync_tracker 是否有记录自动判断全量/增量
// 无记录 → 全量,有记录 → 增量
runAutoSync(ctx)
ticker := time.NewTicker(time.Duration(interval) * time.Minute)
defer ticker.Stop()
logrus.Infof("自动同步调度器启动,间隔: %d 分钟(完成一次后开始计时)", interval)
for {
select {
case <-ticker.C:
runAutoSync(ctx)
case <-ctx.Done():
logrus.Info("自动同步调度器已停止")
return
}
runAutoSync(ctx)
logrus.Infof("自动同步完成,等待 %d 分钟后执行下一次", interval)
time.Sleep(time.Duration(interval) * time.Minute)
}
}

View File

@@ -86,18 +86,29 @@ func buildCreateSQL(td *TableDefinition) string {
}
cols = append(cols, "raw_data JSONB DEFAULT '{}'")
// 添加复合唯一索引(用于 ON CONFLICT upsert,所有 conflict_keys 作为一个复合索引
// 添加复合唯一索引(用于 ON CONFLICT upsert
var constraints []string
if len(td.ConflictKeys) > 0 {
cols := strings.Join(td.ConflictKeys, ", ")
ck := strings.Join(td.ConflictKeys, ", ")
indexName := fmt.Sprintf("uq_%s_conflict", td.TableName)
constraints = append(constraints,
fmt.Sprintf("CREATE UNIQUE INDEX IF NOT EXISTS %s ON %s (%s)", indexName, td.TableName, cols))
fmt.Sprintf("CREATE UNIQUE INDEX IF NOT EXISTS %s ON %s (%s)", indexName, td.TableName, ck))
}
sql := fmt.Sprintf("CREATE TABLE IF NOT EXISTS %s (\n %s\n);\n", td.TableName, strings.Join(cols, ",\n "))
// 添加唯一索引
if len(constraints) > 0 {
sql += strings.Join(constraints, ";\n") + ";"
sql += strings.Join(constraints, ";\n") + ";\n"
}
// 添加字段注释COMMENT ON COLUMN
for _, c := range td.Columns {
if c.Comment != "" {
escaped := strings.ReplaceAll(c.Comment, "'", "''")
sql += fmt.Sprintf("COMMENT ON COLUMN %s.%s IS '%s';\n", td.TableName, c.Name, escaped)
}
}
return sql
}