package service import ( consts "assets/consts/public" dao "assets/dao/sync" dto "assets/model/dto/sync" entity "assets/model/entity/sync" "context" "gitea.com/red-future/common/utils" "github.com/gogf/gf/v2/frame/g" "github.com/gogf/gf/v2/os/grpool" "go.mongodb.org/mongo-driver/v2/bson" ) type syncService struct{} // Sync 同步服务 var Sync = new(syncService) // PlatformFactory 平台服务工厂实例 var platformFactory = NewPlatformServiceFactory() // SyncPool 同步任务协程池,限制并发数避免goroutine爆炸 var SyncPool = grpool.New(20) // CreateSyncTask 创建同步任务 func (s *syncService) CreateSyncTask(ctx context.Context, req *dto.CreateSyncTaskReq) (*bson.ObjectID, error) { task := &entity.SyncTask{ Platform: req.Platform, SyncType: req.SyncType, Status: consts.SyncStatusPending, AssetID: req.AssetID, AssetSKUID: req.AssetSKUID, StockID: req.StockID, ErrorMessage: "", ErrorCount: 0, } err := dao.SyncTask.Insert(ctx, task) if err != nil { return nil, err } return task.Id, nil } // GetSyncTask 获取同步任务详情 func (s *syncService) GetSyncTask(ctx context.Context, id *bson.ObjectID) (res *dto.GetSyncTaskRes, err error) { task, err := dao.SyncTask.GetOne(ctx, id) if err != nil { return } res = &dto.GetSyncTaskRes{} err = utils.Struct(task, &res.SyncTaskItem) return } // ListSyncTasks 获取同步任务列表 func (s *syncService) ListSyncTasks(ctx context.Context, req *dto.ListSyncTaskReq) (list []*dto.SyncTaskItem, total int64, err error) { tasks, total, err := dao.SyncTask.List(ctx, req) if err != nil { return } err = utils.Struct(tasks, &list) return } // UpdateSyncTaskStatus 更新同步任务状态 func (s *syncService) UpdateSyncTaskStatus(ctx context.Context, req *dto.UpdateSyncTaskStatusReq) error { return dao.SyncTask.UpdateStatus(ctx, req.ID, req.Status, req.ErrorMessage) } // SyncAsset 同步资产 func (s *syncService) SyncAsset(ctx context.Context, req *dto.SyncAssetReq) (*bson.ObjectID, error) { // 创建同步任务 taskReq := &dto.CreateSyncTaskReq{ Platform: req.Platform, SyncType: consts.SyncTypeIncremental, AssetID: req.AssetID, } taskID, err := s.CreateSyncTask(ctx, taskReq) if err != nil { return nil, err } // 异步执行同步任务(使用协程池限制并发) asyncCtx := context.WithoutCancel(ctx) SyncPool.Add(asyncCtx, func(ctx context.Context) { s.executeAssetSync(ctx, taskID, req.AssetID, req.Platform) }) return taskID, nil } // SyncAssetSku 同步资产SKU func (s *syncService) SyncAssetSku(ctx context.Context, req *dto.SyncAssetSkuReq) (*bson.ObjectID, error) { taskReq := &dto.CreateSyncTaskReq{ Platform: req.Platform, SyncType: consts.SyncTypeIncremental, AssetSKUID: req.AssetSKUID, } taskID, err := s.CreateSyncTask(ctx, taskReq) if err != nil { return nil, err } // 异步执行同步任务(使用协程池限制并发) asyncCtx := context.WithoutCancel(ctx) SyncPool.Add(asyncCtx, func(ctx context.Context) { s.executeAssetSkuSync(ctx, taskID, req.AssetSKUID, req.Platform) }) return taskID, nil } // SyncStock 同步库存 func (s *syncService) SyncStock(ctx context.Context, req *dto.SyncStockReq) (*bson.ObjectID, error) { taskReq := &dto.CreateSyncTaskReq{ Platform: req.Platform, SyncType: consts.SyncTypeIncremental, StockID: req.StockID, } taskID, err := s.CreateSyncTask(ctx, taskReq) if err != nil { return nil, err } // 异步执行同步任务(使用协程池限制并发) asyncCtx := context.WithoutCancel(ctx) SyncPool.Add(asyncCtx, func(ctx context.Context) { s.executeStockSync(ctx, taskID, req.StockID, req.Platform) }) return taskID, nil } // BatchSyncAssets 批量同步资产 func (s *syncService) BatchSyncAssets(ctx context.Context, req *dto.BatchSyncAssetsReq) ([]*bson.ObjectID, error) { var taskIDs []*bson.ObjectID for _, assetID := range req.AssetIDs { taskReq := &dto.CreateSyncTaskReq{ Platform: req.Platform, SyncType: consts.SyncTypeIncremental, AssetID: assetID, } taskID, err := s.CreateSyncTask(ctx, taskReq) if err != nil { return taskIDs, err } taskIDs = append(taskIDs, taskID) // 异步执行同步任务(使用协程池限制并发) asyncCtx := context.WithoutCancel(ctx) currentAssetID := assetID SyncPool.Add(asyncCtx, func(ctx context.Context) { s.executeAssetSync(ctx, taskID, currentAssetID, req.Platform) }) } return taskIDs, nil } // GetPlatformSyncStatus 获取平台同步状态 func (s *syncService) GetPlatformSyncStatus(ctx context.Context, req *dto.GetPlatformSyncStatusReq) (*dto.GetPlatformSyncStatusRes, error) { // 统计各状态任务数量 totalReq := &dto.ListSyncTaskReq{Platform: req.Platform} totalReq.PageNum = 1 totalReq.PageSize = 1 _, total, err := dao.SyncTask.List(ctx, totalReq) if err != nil { return nil, err } successReq := &dto.ListSyncTaskReq{Platform: req.Platform, Status: consts.SyncStatusSuccess} successReq.PageNum = 1 successReq.PageSize = 1 _, successCount, err := dao.SyncTask.List(ctx, successReq) if err != nil { return nil, err } failedReq := &dto.ListSyncTaskReq{Platform: req.Platform, Status: consts.SyncStatusFailed} failedReq.PageNum = 1 failedReq.PageSize = 1 _, failedCount, err := dao.SyncTask.List(ctx, failedReq) if err != nil { return nil, err } return &dto.GetPlatformSyncStatusRes{ Platform: req.Platform, IsEnabled: true, SyncCount: total, SuccessCount: successCount, FailedCount: failedCount, }, nil } // executeAssetSync 执行资产同步 func (s *syncService) executeAssetSync(ctx context.Context, taskID, assetID *bson.ObjectID, platform consts.SyncPlatform) { // 更新任务状态为同步中 dao.SyncTask.UpdateStatus(ctx, taskID, consts.SyncStatusSyncing, "") // 获取平台服务 assetService := platformFactory.CreateAssetService(platform) // 执行同步 err := assetService.SyncAsset(ctx, assetID) if err != nil { // 同步失败 g.Log().Error(ctx, "资产同步失败", g.Map{ "task_id": taskID, "asset_id": assetID, "platform": string(platform), "error": err.Error(), }) dao.SyncTask.UpdateStatus(ctx, taskID, consts.SyncStatusFailed, err.Error()) } else { // 同步成功 g.Log().Info(ctx, "资产同步成功", g.Map{ "task_id": taskID, "asset_id": assetID, "platform": string(platform), }) dao.SyncTask.UpdateStatus(ctx, taskID, consts.SyncStatusSuccess, "") } } // executeAssetSkuSync 执行资产SKU同步 func (s *syncService) executeAssetSkuSync(ctx context.Context, taskID, assetSkuID *bson.ObjectID, platform consts.SyncPlatform) { // 更新任务状态为同步中 dao.SyncTask.UpdateStatus(ctx, taskID, consts.SyncStatusSyncing, "") // 获取平台服务 assetSkuService := platformFactory.CreateAssetSkuService(platform) // 执行同步 err := assetSkuService.SyncAssetSku(ctx, assetSkuID) if err != nil { // 同步失败 g.Log().Error(ctx, "资产SKU同步失败", g.Map{ "task_id": taskID, "asset_sku_id": assetSkuID, "platform": string(platform), "error": err.Error(), }) dao.SyncTask.UpdateStatus(ctx, taskID, consts.SyncStatusFailed, err.Error()) } else { // 同步成功 g.Log().Info(ctx, "资产SKU同步成功", g.Map{ "task_id": taskID, "asset_sku_id": assetSkuID, "platform": string(platform), }) dao.SyncTask.UpdateStatus(ctx, taskID, consts.SyncStatusSuccess, "") } } // executeStockSync 执行库存同步 func (s *syncService) executeStockSync(ctx context.Context, taskID, stockID *bson.ObjectID, platform consts.SyncPlatform) { // 更新任务状态为同步中 dao.SyncTask.UpdateStatus(ctx, taskID, consts.SyncStatusSyncing, "") // 获取平台服务 stockService := platformFactory.CreateStockService(platform) // 执行同步 err := stockService.SyncStock(ctx, stockID) if err != nil { // 同步失败 g.Log().Error(ctx, "库存同步失败", g.Map{ "task_id": taskID, "stock_id": stockID, "platform": string(platform), "error": err.Error(), }) dao.SyncTask.UpdateStatus(ctx, taskID, consts.SyncStatusFailed, err.Error()) } else { // 同步成功 g.Log().Info(ctx, "库存同步成功", g.Map{ "task_id": taskID, "stock_id": stockID, "platform": string(platform), }) dao.SyncTask.UpdateStatus(ctx, taskID, consts.SyncStatusSuccess, "") } }