package dao import ( "assets/consts/public" dto "assets/model/dto/sync" entity "assets/model/entity/sync" "context" "gitea.com/red-future/common/db/mongo" "github.com/gogf/gf/v2/frame/g" "github.com/gogf/gf/v2/os/gtime" "github.com/gogf/gf/v2/util/gconv" "go.mongodb.org/mongo-driver/v2/bson" ) var SyncTask = new(syncTask) type syncTask struct{} // Insert 插入同步任务 func (d *syncTask) Insert(ctx context.Context, task *entity.SyncTask) (err error) { _, err = mongo.DB().Insert(ctx, []interface{}{task}, task.CollectionName()) return } // GetOne 获取单个同步任务 func (d *syncTask) GetOne(ctx context.Context, id *bson.ObjectID) (task *entity.SyncTask, err error) { filter := bson.M{"_id": id} task = &entity.SyncTask{} err = mongo.DB().FindOne(ctx, filter, task, task.CollectionName()) return task, err } // List 获取同步任务列表 func (d *syncTask) List(ctx context.Context, req *dto.ListSyncTaskReq) (list []*entity.SyncTask, total int64, err error) { // 构建查询过滤条件 filter := d.buildListFilter(req) // 调用 common/db/mongo 的 Find 方法,不使用排序 total, err = mongo.DB().Find(ctx, filter, &list, "sync_task", &req.Page, nil) return } // Update 更新同步任务 func (d *syncTask) Update(ctx context.Context, id string, updateData *entity.SyncTask) (err error) { objectId, err := bson.ObjectIDFromHex(id) if err != nil { return err } filter := bson.M{"_id": objectId} if !g.IsEmpty(updateData) { // 直接使用 struct 转 map,不需要额外的转换 update := bson.M{"$set": gconv.Map(updateData)} _, err = mongo.DB().Update(ctx, filter, update, "sync_task") } return err } // UpdateStatus 更新同步任务状态 func (d *syncTask) UpdateStatus(ctx context.Context, id *bson.ObjectID, status public.SyncStatus, errorMessage string) (err error) { filter := bson.M{"_id": id} updateData := bson.M{ "status": status, "errorMessage": errorMessage, } if status == public.SyncStatusSyncing { updateData["startedAt"] = gtime.Now() } else if status == public.SyncStatusSuccess || status == public.SyncStatusFailed { updateData["finishedAt"] = gtime.Now() } update := bson.M{"$set": updateData} _, err = mongo.DB().Update(ctx, filter, update, "sync_task") return err } // UpdateErrorCount 更新错误计数 func (d *syncTask) UpdateErrorCount(ctx context.Context, id string, increment int) (err error) { objectId, err := bson.ObjectIDFromHex(id) if err != nil { return err } filter := bson.M{"_id": objectId} update := bson.M{"$inc": bson.M{"errorCount": increment}} _, err = mongo.DB().Update(ctx, filter, update, "sync_task") return err } // Delete 删除同步任务 func (d *syncTask) Delete(ctx context.Context, id string) (err error) { objectId, err := bson.ObjectIDFromHex(id) if err != nil { return err } filter := bson.M{"_id": objectId} _, err = mongo.DB().Delete(ctx, filter, "sync_task") return err } // GetPendingTasks 获取待处理的同步任务 func (d *syncTask) GetPendingTasks(ctx context.Context, limit int) (tasks []*entity.SyncTask, err error) { filter := bson.M{"status": public.SyncStatusPending} // 调用 common/db/mongo 的 Find 方法,不使用排序 _, err = mongo.DB().Find(ctx, filter, &tasks, "sync_task", nil, nil) return tasks, err } // buildListFilter 构建列表查询的过滤条件 func (d *syncTask) buildListFilter(req *dto.ListSyncTaskReq) bson.M { filter := bson.M{} if !g.IsEmpty(req.Platform) { filter["platform"] = req.Platform } if !g.IsEmpty(req.Status) { filter["status"] = req.Status } if req.StartTime != nil { filter["createdAt"] = bson.M{"$gte": req.StartTime} } if req.EndTime != nil { if existingFilter, exists := filter["createdAt"]; exists { if existingTimeRange, ok := existingFilter.(bson.M); ok { existingTimeRange["$lte"] = req.EndTime filter["createdAt"] = existingTimeRange } } else { filter["createdAt"] = bson.M{"$lte": req.EndTime} } } return filter } // SyncConfigDao 同步配置DAO var SyncConfig = new(syncConfig) type syncConfig struct{} // GetByPlatform 根据平台获取同步配置 func (d *syncConfig) GetByPlatform(ctx context.Context, platform public.SyncPlatform) (config *entity.ChannelConfig, err error) { filter := bson.M{"platform": platform} config = &entity.ChannelConfig{} err = mongo.DB().FindOne(ctx, filter, config, "sync_config") return config, err } // List 获取同步配置列表 func (d *syncConfig) List(ctx context.Context) (configs []*entity.ChannelConfig, err error) { _, err = mongo.DB().Find(ctx, bson.M{}, &configs, "sync_config", nil, nil) return configs, err } // Update 更新同步配置 func (d *syncConfig) Update(ctx context.Context, platform public.SyncPlatform, updateData *entity.ChannelConfig) (err error) { filter := bson.M{"platform": platform} if !g.IsEmpty(updateData) { // 直接使用 struct 转 map update := bson.M{"$set": gconv.Map(updateData)} _, err = mongo.DB().Update(ctx, filter, update, "sync_config") } return err }