Files
assets/dao/sync/sync_dao.go

170 lines
4.9 KiB
Go
Raw Normal View History

2026-03-18 10:18:03 +08:00
package dao
import (
"assets/consts/public"
dto "assets/model/dto/sync"
entity "assets/model/entity/sync"
"context"
"gitea.com/red-future/common/db/mongo"
"github.com/gogf/gf/v2/frame/g"
"github.com/gogf/gf/v2/os/gtime"
"github.com/gogf/gf/v2/util/gconv"
"go.mongodb.org/mongo-driver/v2/bson"
)
var SyncTask = new(syncTask)
type syncTask struct{}
// Insert 插入同步任务
func (d *syncTask) Insert(ctx context.Context, task *entity.SyncTask) (err error) {
_, err = mongo.DB().Insert(ctx, []interface{}{task}, task.CollectionName())
return
}
// GetOne 获取单个同步任务
func (d *syncTask) GetOne(ctx context.Context, id *bson.ObjectID) (task *entity.SyncTask, err error) {
filter := bson.M{"_id": id}
task = &entity.SyncTask{}
err = mongo.DB().FindOne(ctx, filter, task, task.CollectionName())
return task, err
}
// List 获取同步任务列表
func (d *syncTask) List(ctx context.Context, req *dto.ListSyncTaskReq) (list []*entity.SyncTask, total int64, err error) {
// 构建查询过滤条件
filter := d.buildListFilter(req)
// 调用 common/db/mongo 的 Find 方法,不使用排序
total, err = mongo.DB().Find(ctx, filter, &list, "sync_task", &req.Page, nil)
return
}
// Update 更新同步任务
func (d *syncTask) Update(ctx context.Context, id string, updateData *entity.SyncTask) (err error) {
objectId, err := bson.ObjectIDFromHex(id)
if err != nil {
return err
}
filter := bson.M{"_id": objectId}
if !g.IsEmpty(updateData) {
// 直接使用 struct 转 map不需要额外的转换
update := bson.M{"$set": gconv.Map(updateData)}
_, err = mongo.DB().Update(ctx, filter, update, "sync_task")
}
return err
}
// UpdateStatus 更新同步任务状态
func (d *syncTask) UpdateStatus(ctx context.Context, id *bson.ObjectID, status public.SyncStatus, errorMessage string) (err error) {
filter := bson.M{"_id": id}
updateData := bson.M{
"status": status,
"errorMessage": errorMessage,
}
if status == public.SyncStatusSyncing {
updateData["startedAt"] = gtime.Now()
} else if status == public.SyncStatusSuccess || status == public.SyncStatusFailed {
updateData["finishedAt"] = gtime.Now()
}
update := bson.M{"$set": updateData}
_, err = mongo.DB().Update(ctx, filter, update, "sync_task")
return err
}
// UpdateErrorCount 更新错误计数
func (d *syncTask) UpdateErrorCount(ctx context.Context, id string, increment int) (err error) {
objectId, err := bson.ObjectIDFromHex(id)
if err != nil {
return err
}
filter := bson.M{"_id": objectId}
update := bson.M{"$inc": bson.M{"errorCount": increment}}
_, err = mongo.DB().Update(ctx, filter, update, "sync_task")
return err
}
// Delete 删除同步任务
func (d *syncTask) Delete(ctx context.Context, id string) (err error) {
objectId, err := bson.ObjectIDFromHex(id)
if err != nil {
return err
}
filter := bson.M{"_id": objectId}
_, err = mongo.DB().Delete(ctx, filter, "sync_task")
return err
}
// GetPendingTasks 获取待处理的同步任务
func (d *syncTask) GetPendingTasks(ctx context.Context, limit int) (tasks []*entity.SyncTask, err error) {
filter := bson.M{"status": public.SyncStatusPending}
// 调用 common/db/mongo 的 Find 方法,不使用排序
_, err = mongo.DB().Find(ctx, filter, &tasks, "sync_task", nil, nil)
return tasks, err
}
// buildListFilter 构建列表查询的过滤条件
func (d *syncTask) buildListFilter(req *dto.ListSyncTaskReq) bson.M {
filter := bson.M{}
if !g.IsEmpty(req.Platform) {
filter["platform"] = req.Platform
}
if !g.IsEmpty(req.Status) {
filter["status"] = req.Status
}
if req.StartTime != nil {
filter["createdAt"] = bson.M{"$gte": req.StartTime}
}
if req.EndTime != nil {
if existingFilter, exists := filter["createdAt"]; exists {
if existingTimeRange, ok := existingFilter.(bson.M); ok {
existingTimeRange["$lte"] = req.EndTime
filter["createdAt"] = existingTimeRange
}
} else {
filter["createdAt"] = bson.M{"$lte": req.EndTime}
}
}
return filter
}
// SyncConfigDao 同步配置DAO
var SyncConfig = new(syncConfig)
type syncConfig struct{}
// GetByPlatform 根据平台获取同步配置
func (d *syncConfig) GetByPlatform(ctx context.Context, platform public.SyncPlatform) (config *entity.ChannelConfig, err error) {
filter := bson.M{"platform": platform}
config = &entity.ChannelConfig{}
err = mongo.DB().FindOne(ctx, filter, config, "sync_config")
return config, err
}
// List 获取同步配置列表
func (d *syncConfig) List(ctx context.Context) (configs []*entity.ChannelConfig, err error) {
_, err = mongo.DB().Find(ctx, bson.M{}, &configs, "sync_config", nil, nil)
return configs, err
}
// Update 更新同步配置
func (d *syncConfig) Update(ctx context.Context, platform public.SyncPlatform, updateData *entity.ChannelConfig) (err error) {
filter := bson.M{"platform": platform}
if !g.IsEmpty(updateData) {
// 直接使用 struct 转 map
update := bson.M{"$set": gconv.Map(updateData)}
_, err = mongo.DB().Update(ctx, filter, update, "sync_config")
}
return err
}