2026-04-08 09:03:20 +08:00
|
|
|
|
package copydata
|
|
|
|
|
|
|
|
|
|
|
|
import (
|
|
|
|
|
|
"context"
|
2026-04-30 13:45:41 +08:00
|
|
|
|
consts "dataengine/consts/public"
|
|
|
|
|
|
dto "dataengine/model/dto/copydata"
|
|
|
|
|
|
entity "dataengine/model/entity/copydata"
|
2026-04-08 09:03:20 +08:00
|
|
|
|
"time"
|
|
|
|
|
|
|
2026-06-10 15:56:02 +08:00
|
|
|
|
"gitea.redpowerfuture.com/red-future/common/db/gfdb"
|
2026-04-08 09:03:20 +08:00
|
|
|
|
"github.com/gogf/gf/v2/database/gdb"
|
|
|
|
|
|
"github.com/gogf/gf/v2/util/gconv"
|
2026-04-08 14:30:09 +08:00
|
|
|
|
"github.com/sirupsen/logrus"
|
2026-04-08 09:03:20 +08:00
|
|
|
|
)
|
|
|
|
|
|
|
|
|
|
|
|
var SyncTaskLog = new(SyncTaskLogDao)
|
|
|
|
|
|
|
|
|
|
|
|
type SyncTaskLogDao struct{}
|
|
|
|
|
|
|
2026-04-08 14:30:09 +08:00
|
|
|
|
// Create 创建任务日志(如果task_id已存在则返回现有ID)
|
2026-04-08 09:03:20 +08:00
|
|
|
|
func (d *SyncTaskLogDao) Create(ctx context.Context, req *dto.CreateSyncTaskLogReq) (int64, error) {
|
2026-04-08 14:30:09 +08:00
|
|
|
|
existingTask, err := d.GetByTaskID(ctx, req.TaskID, req.TaskType)
|
|
|
|
|
|
if err == nil && existingTask != nil {
|
|
|
|
|
|
logrus.Debugf("任务日志已存在,task_id=%s, task_type=%s, id=%d", req.TaskID, req.TaskType, existingTask.Id)
|
|
|
|
|
|
return existingTask.Id, nil
|
|
|
|
|
|
}
|
|
|
|
|
|
|
2026-04-09 13:43:15 +08:00
|
|
|
|
data := map[string]interface{}{
|
|
|
|
|
|
"task_id": req.TaskID,
|
|
|
|
|
|
"task_type": req.TaskType,
|
2026-05-29 18:39:32 +08:00
|
|
|
|
"platform_code": req.PlatformCode,
|
|
|
|
|
|
"interface_code": req.InterfaceCode,
|
2026-04-09 13:43:15 +08:00
|
|
|
|
"advertiser_id": req.AdvertiserID,
|
|
|
|
|
|
"start_time": req.StartTime,
|
|
|
|
|
|
"end_time": req.EndTime,
|
|
|
|
|
|
"status": req.Status,
|
|
|
|
|
|
"max_retry": req.MaxRetry,
|
|
|
|
|
|
"page_info": req.PageInfo,
|
|
|
|
|
|
"request_params": req.RequestParams,
|
|
|
|
|
|
"retry_count": 0,
|
|
|
|
|
|
"duration_ms": 0,
|
2026-04-08 09:03:20 +08:00
|
|
|
|
}
|
|
|
|
|
|
|
2026-04-09 13:43:15 +08:00
|
|
|
|
r, err := gfdb.DB(ctx).Model(ctx, consts.SyncTaskLogTable).Data(data).Insert()
|
2026-04-08 09:03:20 +08:00
|
|
|
|
if err != nil {
|
|
|
|
|
|
return 0, err
|
|
|
|
|
|
}
|
|
|
|
|
|
return r.LastInsertId()
|
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
// Update 更新任务日志
|
|
|
|
|
|
func (d *SyncTaskLogDao) Update(ctx context.Context, req *dto.UpdateSyncTaskLogReq) error {
|
|
|
|
|
|
data := make(gdb.Map)
|
|
|
|
|
|
|
|
|
|
|
|
if req.Status != "" {
|
|
|
|
|
|
data["status"] = req.Status
|
|
|
|
|
|
}
|
|
|
|
|
|
if req.RetryCount != nil {
|
|
|
|
|
|
data["retry_count"] = *req.RetryCount
|
|
|
|
|
|
}
|
|
|
|
|
|
if req.ErrorMessage != "" {
|
|
|
|
|
|
data["error_message"] = req.ErrorMessage
|
|
|
|
|
|
}
|
|
|
|
|
|
if req.ErrorCode != "" {
|
|
|
|
|
|
data["error_code"] = req.ErrorCode
|
|
|
|
|
|
}
|
|
|
|
|
|
if req.ResultSummary != nil {
|
|
|
|
|
|
data["result_summary"] = req.ResultSummary
|
|
|
|
|
|
}
|
|
|
|
|
|
if req.NextRetryTime != nil {
|
|
|
|
|
|
data["next_retry_time"] = req.NextRetryTime
|
|
|
|
|
|
}
|
|
|
|
|
|
if req.CompletedAt != nil {
|
|
|
|
|
|
data["completed_at"] = req.CompletedAt
|
|
|
|
|
|
}
|
|
|
|
|
|
if req.DurationMs != nil {
|
|
|
|
|
|
data["duration_ms"] = *req.DurationMs
|
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
data["updated_at"] = time.Now()
|
|
|
|
|
|
|
|
|
|
|
|
_, err := gfdb.DB(ctx).Model(ctx, consts.SyncTaskLogTable).
|
|
|
|
|
|
Data(data).
|
|
|
|
|
|
Where("id", req.ID).
|
|
|
|
|
|
Update()
|
|
|
|
|
|
|
|
|
|
|
|
return err
|
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
// QueryFailedTasks 查询需要补偿的失败任务
|
|
|
|
|
|
func (d *SyncTaskLogDao) QueryFailedTasks(ctx context.Context, req *dto.QueryFailedTasksReq) ([]*dto.SyncTaskLogItem, error) {
|
|
|
|
|
|
model := gfdb.DB(ctx).Model(ctx, consts.SyncTaskLogTable).Model
|
|
|
|
|
|
|
|
|
|
|
|
// 状态过滤
|
|
|
|
|
|
if len(req.Status) > 0 {
|
|
|
|
|
|
model = model.WhereIn("status", req.Status)
|
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
// 任务类型过滤
|
|
|
|
|
|
if req.TaskType != "" {
|
|
|
|
|
|
model = model.Where("task_type", req.TaskType)
|
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
// 只查询到达重试时间的任务(或从未设置过重试时间)
|
|
|
|
|
|
model = model.Where(
|
|
|
|
|
|
"(next_retry_time <= ? OR next_retry_time IS NULL)",
|
|
|
|
|
|
time.Now(),
|
|
|
|
|
|
)
|
|
|
|
|
|
|
|
|
|
|
|
// 限制数量
|
|
|
|
|
|
limit := req.Limit
|
|
|
|
|
|
if limit <= 0 {
|
|
|
|
|
|
limit = 100
|
|
|
|
|
|
}
|
|
|
|
|
|
model = model.Limit(limit)
|
|
|
|
|
|
|
|
|
|
|
|
var results []*entity.SyncTaskLog
|
|
|
|
|
|
if err := model.Scan(&results); err != nil {
|
|
|
|
|
|
return nil, err
|
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
items := make([]*dto.SyncTaskLogItem, len(results))
|
|
|
|
|
|
for i, r := range results {
|
|
|
|
|
|
item := &dto.SyncTaskLogItem{}
|
|
|
|
|
|
gconv.Struct(r, item)
|
|
|
|
|
|
items[i] = item
|
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
return items, nil
|
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
// GetByTaskID 根据任务ID获取日志
|
|
|
|
|
|
func (d *SyncTaskLogDao) GetByTaskID(ctx context.Context, taskID, taskType string) (*entity.SyncTaskLog, error) {
|
|
|
|
|
|
var result *entity.SyncTaskLog
|
|
|
|
|
|
err := gfdb.DB(ctx).Model(ctx, consts.SyncTaskLogTable).
|
|
|
|
|
|
Where("task_id", taskID).
|
|
|
|
|
|
Where("task_type", taskType).
|
|
|
|
|
|
Scan(&result)
|
|
|
|
|
|
|
|
|
|
|
|
if err != nil {
|
|
|
|
|
|
return nil, err
|
|
|
|
|
|
}
|
|
|
|
|
|
return result, nil
|
|
|
|
|
|
}
|
2026-04-08 14:30:09 +08:00
|
|
|
|
|
|
|
|
|
|
// QueryAllPageTasksByParentID 根据主任务ID查询所有分页任务
|
|
|
|
|
|
func (d *SyncTaskLogDao) QueryAllPageTasksByParentID(ctx context.Context, parentTaskID string, limit int) ([]*dto.SyncTaskLogItem, error) {
|
|
|
|
|
|
if limit <= 0 {
|
|
|
|
|
|
limit = 1000
|
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
model := gfdb.DB(ctx).Model(ctx, consts.SyncTaskLogTable).Model
|
|
|
|
|
|
model = model.Where("task_type", "account_report_page")
|
|
|
|
|
|
model = model.WhereLike("task_id", parentTaskID+"_page_%")
|
|
|
|
|
|
model = model.Limit(limit)
|
|
|
|
|
|
|
|
|
|
|
|
var results []*entity.SyncTaskLog
|
|
|
|
|
|
if err := model.Scan(&results); err != nil {
|
|
|
|
|
|
return nil, err
|
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
items := make([]*dto.SyncTaskLogItem, len(results))
|
|
|
|
|
|
for i, r := range results {
|
|
|
|
|
|
item := &dto.SyncTaskLogItem{}
|
|
|
|
|
|
gconv.Struct(r, item)
|
|
|
|
|
|
items[i] = item
|
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
return items, nil
|
|
|
|
|
|
}
|