Files
assets/dao/sync/sync_dao.go
2026-03-18 10:18:03 +08:00

170 lines
4.9 KiB
Go
Raw Permalink Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
package dao
import (
"assets/consts/public"
dto "assets/model/dto/sync"
entity "assets/model/entity/sync"
"context"
"gitea.com/red-future/common/db/mongo"
"github.com/gogf/gf/v2/frame/g"
"github.com/gogf/gf/v2/os/gtime"
"github.com/gogf/gf/v2/util/gconv"
"go.mongodb.org/mongo-driver/v2/bson"
)
// syncTask is the receiver type for the sync-task data-access methods in this
// file. It has no fields.
type syncTask struct{}

// SyncTask is the package-level instance through which the sync-task
// data-access methods are called.
var SyncTask = &syncTask{}
// Insert stores a single sync task in the collection named by
// task.CollectionName().
//
// The task is wrapped in a one-element slice because the shared mongo helper
// is called with a slice of documents. Any error from the helper is returned
// unchanged.
func (d *syncTask) Insert(ctx context.Context, task *entity.SyncTask) (err error) {
	docs := []interface{}{task}
	if _, err = mongo.DB().Insert(ctx, docs, task.CollectionName()); err != nil {
		return err
	}
	return nil
}
// GetOne looks up one sync task by its _id and decodes it into a freshly
// allocated entity.SyncTask.
//
// The returned task pointer is always non-nil, even when err is non-nil, so
// callers must check err before trusting the task's contents.
func (d *syncTask) GetOne(ctx context.Context, id *bson.ObjectID) (task *entity.SyncTask, err error) {
	task = new(entity.SyncTask)
	err = mongo.DB().FindOne(ctx, bson.M{"_id": id}, task, task.CollectionName())
	return
}
// List queries the "sync_task" collection using the filter derived from req
// and decodes the matches into list.
//
// req.Page is handed to the mongo helper by pointer (presumably pagination
// settings; confirm against the helper). The final argument is nil, so no
// sort order is requested. total is whatever count the helper's Find reports.
func (d *syncTask) List(ctx context.Context, req *dto.ListSyncTaskReq) (list []*entity.SyncTask, total int64, err error) {
	// Translate the request's optional criteria into a query document.
	conditions := d.buildListFilter(req)

	total, err = mongo.DB().Find(ctx, conditions, &list, "sync_task", &req.Page, nil)
	return list, total, err
}
// Update applies updateData to the sync task whose _id is the hex string id,
// using a "$set" update on the "sync_task" collection.
//
// It returns the parse error when id is not a valid ObjectID hex string. When
// g.IsEmpty reports updateData as empty, nothing is written and nil is
// returned.
//
// NOTE(review): the "$set" document is produced by gconv.Map, so its keys
// follow gconv's struct-to-map naming rules; confirm they match the field
// names stored in MongoDB.
func (d *syncTask) Update(ctx context.Context, id string, updateData *entity.SyncTask) (err error) {
	oid, err := bson.ObjectIDFromHex(id)
	if err != nil {
		return err
	}

	// Nothing to change: skip the round trip to the database.
	if g.IsEmpty(updateData) {
		return nil
	}

	selector := bson.M{"_id": oid}
	changes := bson.M{"$set": gconv.Map(updateData)}
	_, err = mongo.DB().Update(ctx, selector, changes, "sync_task")
	return err
}
// UpdateStatus sets the status and errorMessage fields of the sync task
// identified by id in the "sync_task" collection.
//
// A timestamp is written alongside, depending on the new status:
//   - public.SyncStatusSyncing sets startedAt to the current time;
//   - public.SyncStatusSuccess and public.SyncStatusFailed set finishedAt to
//     the current time.
//
// Any other status writes no timestamp. errorMessage is always written, so
// passing an empty string overwrites a previously stored message.
func (d *syncTask) UpdateStatus(ctx context.Context, id *bson.ObjectID, status public.SyncStatus, errorMessage string) (err error) {
	fields := bson.M{
		"status":       status,
		"errorMessage": errorMessage,
	}

	switch status {
	case public.SyncStatusSyncing:
		fields["startedAt"] = gtime.Now()
	case public.SyncStatusSuccess, public.SyncStatusFailed:
		fields["finishedAt"] = gtime.Now()
	}

	_, err = mongo.DB().Update(ctx, bson.M{"_id": id}, bson.M{"$set": fields}, "sync_task")
	return err
}
// UpdateErrorCount adds increment to the errorCount field of the sync task
// whose _id is the hex string id, using a "$inc" update on the "sync_task"
// collection.
//
// It returns the parse error when id is not a valid ObjectID hex string,
// otherwise whatever error the mongo helper reports.
func (d *syncTask) UpdateErrorCount(ctx context.Context, id string, increment int) (err error) {
	oid, parseErr := bson.ObjectIDFromHex(id)
	if parseErr != nil {
		return parseErr
	}

	delta := bson.M{"errorCount": increment}
	_, err = mongo.DB().Update(ctx, bson.M{"_id": oid}, bson.M{"$inc": delta}, "sync_task")
	return err
}
// Delete removes the sync task whose _id is the hex string id from the
// "sync_task" collection.
//
// It returns the parse error when id is not a valid ObjectID hex string,
// otherwise whatever error the mongo helper reports.
func (d *syncTask) Delete(ctx context.Context, id string) (err error) {
	oid, parseErr := bson.ObjectIDFromHex(id)
	if parseErr != nil {
		return parseErr
	}

	_, err = mongo.DB().Delete(ctx, bson.M{"_id": oid}, "sync_task")
	return err
}
// GetPendingTasks returns sync tasks whose status is public.SyncStatusPending,
// read from the "sync_task" collection.
//
// limit caps the number of tasks returned. A limit of zero or less means no
// cap, which matches how this method behaved before limit was honoured.
//
// The query itself passes nil for both the paging and sort arguments of the
// mongo helper, so the cap is applied to the decoded result rather than in
// the database, and which tasks survive the cap depends on the order the
// helper returns them in.
// TODO(review): push the limit into the query once the helper's paging type
// is confirmed, to avoid fetching documents that are then discarded.
func (d *syncTask) GetPendingTasks(ctx context.Context, limit int) (tasks []*entity.SyncTask, err error) {
	filter := bson.M{"status": public.SyncStatusPending}

	_, err = mongo.DB().Find(ctx, filter, &tasks, "sync_task", nil, nil)
	if err != nil {
		return tasks, err
	}

	// Honour the caller's limit; previously the parameter was silently ignored.
	if limit > 0 && len(tasks) > limit {
		tasks = tasks[:limit]
	}
	return tasks, nil
}
// buildListFilter turns the optional criteria in req into a MongoDB query
// document.
//
// Conditions are added only for criteria that are set:
//   - "platform" and "status" when g.IsEmpty reports them as non-empty;
//   - "createdAt" with "$gte" when req.StartTime is non-nil and/or "$lte" when
//     req.EndTime is non-nil.
//
// When no criteria are set the returned document is empty.
func (d *syncTask) buildListFilter(req *dto.ListSyncTaskReq) bson.M {
	conditions := bson.M{}

	if !g.IsEmpty(req.Platform) {
		conditions["platform"] = req.Platform
	}
	if !g.IsEmpty(req.Status) {
		conditions["status"] = req.Status
	}

	// Collect both bounds in one range document, then attach it only if at
	// least one bound was supplied.
	createdRange := bson.M{}
	if req.StartTime != nil {
		createdRange["$gte"] = req.StartTime
	}
	if req.EndTime != nil {
		createdRange["$lte"] = req.EndTime
	}
	if len(createdRange) > 0 {
		conditions["createdAt"] = createdRange
	}

	return conditions
}
// syncConfig is the receiver type for the sync-configuration data-access
// methods in this file. It has no fields.
type syncConfig struct{}

// SyncConfig is the package-level instance through which the
// sync-configuration data-access methods are called.
var SyncConfig = &syncConfig{}
// GetByPlatform looks up the sync configuration whose platform field equals
// platform in the "sync_config" collection and decodes it into a freshly
// allocated entity.ChannelConfig.
//
// The returned config pointer is always non-nil, even when err is non-nil, so
// callers must check err before trusting the config's contents.
func (d *syncConfig) GetByPlatform(ctx context.Context, platform public.SyncPlatform) (config *entity.ChannelConfig, err error) {
	config = new(entity.ChannelConfig)
	err = mongo.DB().FindOne(ctx, bson.M{"platform": platform}, config, "sync_config")
	return
}
// List reads the "sync_config" collection with an empty filter and decodes
// the results into configs.
//
// nil is passed for both the paging and sort arguments of the mongo helper,
// and the count the helper returns is discarded.
func (d *syncConfig) List(ctx context.Context) (configs []*entity.ChannelConfig, err error) {
	matchAll := bson.M{}
	_, err = mongo.DB().Find(ctx, matchAll, &configs, "sync_config", nil, nil)
	return
}
// Update applies updateData to the sync configuration whose platform field
// equals platform, using a "$set" update on the "sync_config" collection.
//
// When g.IsEmpty reports updateData as empty, nothing is written and nil is
// returned.
//
// NOTE(review): the "$set" document is produced by gconv.Map, so its keys
// follow gconv's struct-to-map naming rules; confirm they match the field
// names stored in MongoDB.
func (d *syncConfig) Update(ctx context.Context, platform public.SyncPlatform, updateData *entity.ChannelConfig) (err error) {
	// Nothing to change: skip the round trip to the database.
	if g.IsEmpty(updateData) {
		return nil
	}

	selector := bson.M{"platform": platform}
	changes := bson.M{"$set": gconv.Map(updateData)}
	_, err = mongo.DB().Update(ctx, selector, changes, "sync_config")
	return err
}