Files
assets/service/sync/sync_service.go
2026-03-18 10:18:03 +08:00

293 lines
8.3 KiB
Go
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
package service
import (
consts "assets/consts/public"
dao "assets/dao/sync"
dto "assets/model/dto/sync"
entity "assets/model/entity/sync"
"context"
"gitea.com/red-future/common/utils"
"github.com/gogf/gf/v2/frame/g"
"github.com/gogf/gf/v2/os/grpool"
"go.mongodb.org/mongo-driver/v2/bson"
)
// syncService groups the sync-task operations; it carries no state.
type syncService struct{}

var (
	// Sync is the package-level sync service instance.
	Sync = &syncService{}

	// platformFactory creates the platform-specific services that the
	// execute* methods use to run a sync.
	platformFactory = NewPlatformServiceFactory()

	// SyncPool is the goroutine pool that sync jobs are submitted to. It is
	// created with a limit of 20 so that queued sync jobs cannot spawn an
	// unbounded number of goroutines.
	SyncPool = grpool.New(20)
)
// CreateSyncTask stores a new sync task in the pending state.
//
// Platform, sync type and the asset / asset-SKU / stock IDs are copied from
// req; the error message and error count start empty. It returns the task's
// Id field as it stands after the DAO insert, or a nil ID together with the
// insert error.
func (s *syncService) CreateSyncTask(ctx context.Context, req *dto.CreateSyncTaskReq) (*bson.ObjectID, error) {
	newTask := entity.SyncTask{
		Status:       consts.SyncStatusPending,
		ErrorMessage: "",
		ErrorCount:   0,
		Platform:     req.Platform,
		SyncType:     req.SyncType,
		AssetID:      req.AssetID,
		AssetSKUID:   req.AssetSKUID,
		StockID:      req.StockID,
	}

	if err := dao.SyncTask.Insert(ctx, &newTask); err != nil {
		return nil, err
	}
	return newTask.Id, nil
}
// GetSyncTask loads one sync task by ID and converts it into the response DTO.
//
// If the DAO lookup fails, res is nil and the lookup error is returned. If the
// struct conversion fails, the conversion error is returned alongside the
// (possibly partially filled) response.
func (s *syncService) GetSyncTask(ctx context.Context, id *bson.ObjectID) (res *dto.GetSyncTaskRes, err error) {
	found, err := dao.SyncTask.GetOne(ctx, id)
	if err != nil {
		return nil, err
	}

	res = new(dto.GetSyncTaskRes)
	err = utils.Struct(found, &res.SyncTaskItem)
	return res, err
}
// ListSyncTasks queries sync tasks through the DAO using req and converts the
// rows into DTO items.
//
// total is whatever count the DAO reported. When the DAO query fails, list is
// nil and the query error is returned; otherwise err carries the result of
// the struct conversion.
func (s *syncService) ListSyncTasks(ctx context.Context, req *dto.ListSyncTaskReq) (list []*dto.SyncTaskItem, total int64, err error) {
	records, total, err := dao.SyncTask.List(ctx, req)
	if err == nil {
		err = utils.Struct(records, &list)
	}
	return list, total, err
}
// UpdateSyncTaskStatus forwards the task ID, new status and error message
// from req to the DAO and returns the DAO's error unchanged.
func (s *syncService) UpdateSyncTaskStatus(ctx context.Context, req *dto.UpdateSyncTaskStatusReq) error {
	taskID, status, errMsg := req.ID, req.Status, req.ErrorMessage
	return dao.SyncTask.UpdateStatus(ctx, taskID, status, errMsg)
}
// SyncAsset creates an incremental sync task for a single asset and queues
// the actual sync on SyncPool.
//
// The queued job runs on a context derived with context.WithoutCancel, so it
// is not cancelled when the caller's ctx is. On success the new task's ID is
// returned. A nil ID and the error are returned if the task cannot be created
// or the job cannot be queued; in the latter case the task is marked failed
// so that it does not stay pending forever.
func (s *syncService) SyncAsset(ctx context.Context, req *dto.SyncAssetReq) (*bson.ObjectID, error) {
	taskID, err := s.CreateSyncTask(ctx, &dto.CreateSyncTaskReq{
		Platform: req.Platform,
		SyncType: consts.SyncTypeIncremental,
		AssetID:  req.AssetID,
	})
	if err != nil {
		return nil, err
	}

	// Copy what the job needs now, so the closure does not read req after
	// this method has returned.
	assetID, platform := req.AssetID, req.Platform

	asyncCtx := context.WithoutCancel(ctx)
	err = SyncPool.Add(asyncCtx, func(ctx context.Context) {
		s.executeAssetSync(ctx, taskID, assetID, platform)
	})
	if err != nil {
		// The job never entered the pool, so nothing else will ever move the
		// task out of the pending state.
		if markErr := dao.SyncTask.UpdateStatus(asyncCtx, taskID, consts.SyncStatusFailed, err.Error()); markErr != nil {
			g.Log().Error(ctx, "更新同步任务状态失败", g.Map{
				"task_id": taskID,
				"error":   markErr.Error(),
			})
		}
		return nil, err
	}
	return taskID, nil
}
// SyncAssetSku creates an incremental sync task for a single asset SKU and
// queues the actual sync on SyncPool.
//
// The queued job runs on a context derived with context.WithoutCancel, so it
// is not cancelled when the caller's ctx is. On success the new task's ID is
// returned. A nil ID and the error are returned if the task cannot be created
// or the job cannot be queued; in the latter case the task is marked failed
// so that it does not stay pending forever.
func (s *syncService) SyncAssetSku(ctx context.Context, req *dto.SyncAssetSkuReq) (*bson.ObjectID, error) {
	taskID, err := s.CreateSyncTask(ctx, &dto.CreateSyncTaskReq{
		Platform:   req.Platform,
		SyncType:   consts.SyncTypeIncremental,
		AssetSKUID: req.AssetSKUID,
	})
	if err != nil {
		return nil, err
	}

	// Copy what the job needs now, so the closure does not read req after
	// this method has returned.
	assetSkuID, platform := req.AssetSKUID, req.Platform

	asyncCtx := context.WithoutCancel(ctx)
	err = SyncPool.Add(asyncCtx, func(ctx context.Context) {
		s.executeAssetSkuSync(ctx, taskID, assetSkuID, platform)
	})
	if err != nil {
		// The job never entered the pool, so nothing else will ever move the
		// task out of the pending state.
		if markErr := dao.SyncTask.UpdateStatus(asyncCtx, taskID, consts.SyncStatusFailed, err.Error()); markErr != nil {
			g.Log().Error(ctx, "更新同步任务状态失败", g.Map{
				"task_id": taskID,
				"error":   markErr.Error(),
			})
		}
		return nil, err
	}
	return taskID, nil
}
// SyncStock creates an incremental sync task for a single stock record and
// queues the actual sync on SyncPool.
//
// The queued job runs on a context derived with context.WithoutCancel, so it
// is not cancelled when the caller's ctx is. On success the new task's ID is
// returned. A nil ID and the error are returned if the task cannot be created
// or the job cannot be queued; in the latter case the task is marked failed
// so that it does not stay pending forever.
func (s *syncService) SyncStock(ctx context.Context, req *dto.SyncStockReq) (*bson.ObjectID, error) {
	taskID, err := s.CreateSyncTask(ctx, &dto.CreateSyncTaskReq{
		Platform: req.Platform,
		SyncType: consts.SyncTypeIncremental,
		StockID:  req.StockID,
	})
	if err != nil {
		return nil, err
	}

	// Copy what the job needs now, so the closure does not read req after
	// this method has returned.
	stockID, platform := req.StockID, req.Platform

	asyncCtx := context.WithoutCancel(ctx)
	err = SyncPool.Add(asyncCtx, func(ctx context.Context) {
		s.executeStockSync(ctx, taskID, stockID, platform)
	})
	if err != nil {
		// The job never entered the pool, so nothing else will ever move the
		// task out of the pending state.
		if markErr := dao.SyncTask.UpdateStatus(asyncCtx, taskID, consts.SyncStatusFailed, err.Error()); markErr != nil {
			g.Log().Error(ctx, "更新同步任务状态失败", g.Map{
				"task_id": taskID,
				"error":   markErr.Error(),
			})
		}
		return nil, err
	}
	return taskID, nil
}
// BatchSyncAssets creates one incremental sync task per asset ID in req and
// queues each sync on SyncPool.
//
// Assets are processed in order. Processing stops at the first failure, and
// the IDs of the tasks created so far are returned together with the error;
// jobs queued before the failure keep running. If a job cannot be queued, its
// task (already included in the returned IDs) is marked failed so that it
// does not stay pending forever. Queued jobs run on a context derived with
// context.WithoutCancel, so they are not cancelled when the caller's ctx is.
func (s *syncService) BatchSyncAssets(ctx context.Context, req *dto.BatchSyncAssetsReq) ([]*bson.ObjectID, error) {
	var taskIDs []*bson.ObjectID

	// Loop-invariant: one detached context and one platform value serve all jobs.
	asyncCtx := context.WithoutCancel(ctx)
	platform := req.Platform

	for _, assetID := range req.AssetIDs {
		taskID, err := s.CreateSyncTask(ctx, &dto.CreateSyncTaskReq{
			Platform: platform,
			SyncType: consts.SyncTypeIncremental,
			AssetID:  assetID,
		})
		if err != nil {
			return taskIDs, err
		}
		taskIDs = append(taskIDs, taskID)

		// Per-iteration copy so the closure is correct on toolchains where
		// the range variable is shared across iterations.
		currentAssetID := assetID
		err = SyncPool.Add(asyncCtx, func(ctx context.Context) {
			s.executeAssetSync(ctx, taskID, currentAssetID, platform)
		})
		if err != nil {
			// The job never entered the pool, so nothing else will ever move
			// the task out of the pending state.
			if markErr := dao.SyncTask.UpdateStatus(asyncCtx, taskID, consts.SyncStatusFailed, err.Error()); markErr != nil {
				g.Log().Error(ctx, "更新同步任务状态失败", g.Map{
					"task_id": taskID,
					"error":   markErr.Error(),
				})
			}
			return taskIDs, err
		}
	}
	return taskIDs, nil
}
// GetPlatformSyncStatus reports task counts for one platform: all tasks,
// successful tasks and failed tasks.
//
// Each count comes from a separate DAO list query limited to page 1 with a
// page size of 1; only the reported total is used. The first query error
// aborts the call. IsEnabled is always reported as true.
func (s *syncService) GetPlatformSyncStatus(ctx context.Context, req *dto.GetPlatformSyncStatusReq) (*dto.GetPlatformSyncStatusRes, error) {
	// countMatching runs the list query with minimal paging and keeps only
	// the total reported by the DAO.
	countMatching := func(filter *dto.ListSyncTaskReq) (int64, error) {
		filter.PageNum = 1
		filter.PageSize = 1
		_, matched, err := dao.SyncTask.List(ctx, filter)
		return matched, err
	}

	allTasks, err := countMatching(&dto.ListSyncTaskReq{Platform: req.Platform})
	if err != nil {
		return nil, err
	}

	succeeded, err := countMatching(&dto.ListSyncTaskReq{
		Platform: req.Platform,
		Status:   consts.SyncStatusSuccess,
	})
	if err != nil {
		return nil, err
	}

	failed, err := countMatching(&dto.ListSyncTaskReq{
		Platform: req.Platform,
		Status:   consts.SyncStatusFailed,
	})
	if err != nil {
		return nil, err
	}

	status := &dto.GetPlatformSyncStatusRes{
		Platform:     req.Platform,
		IsEnabled:    true,
		SyncCount:    allTasks,
		SuccessCount: succeeded,
		FailedCount:  failed,
	}
	return status, nil
}
// executeAssetSync runs the sync of one asset for an existing task and
// records the outcome on the task.
//
// The task is first marked as syncing. The asset service for platform is then
// obtained from platformFactory and asked to sync assetID. The task ends as
// failed (with the error text) or as success, and the outcome is logged.
// Nothing is returned, so a failed status write is logged rather than
// dropped; otherwise a task could stay in a stale state with no trace.
func (s *syncService) executeAssetSync(ctx context.Context, taskID, assetID *bson.ObjectID, platform consts.SyncPlatform) {
	// reportStatusErr logs a failed status write; a nil error is a no-op.
	reportStatusErr := func(updateErr error) {
		if updateErr == nil {
			return
		}
		g.Log().Error(ctx, "更新同步任务状态失败", g.Map{
			"task_id": taskID,
			"error":   updateErr.Error(),
		})
	}

	// Best effort: the sync is still attempted if this write fails.
	reportStatusErr(dao.SyncTask.UpdateStatus(ctx, taskID, consts.SyncStatusSyncing, ""))

	// NOTE(review): the factory result is used without a check — confirm it
	// never yields a nil service for an unsupported platform.
	assetService := platformFactory.CreateAssetService(platform)

	if err := assetService.SyncAsset(ctx, assetID); err != nil {
		g.Log().Error(ctx, "资产同步失败", g.Map{
			"task_id":  taskID,
			"asset_id": assetID,
			"platform": string(platform),
			"error":    err.Error(),
		})
		reportStatusErr(dao.SyncTask.UpdateStatus(ctx, taskID, consts.SyncStatusFailed, err.Error()))
		return
	}

	g.Log().Info(ctx, "资产同步成功", g.Map{
		"task_id":  taskID,
		"asset_id": assetID,
		"platform": string(platform),
	})
	reportStatusErr(dao.SyncTask.UpdateStatus(ctx, taskID, consts.SyncStatusSuccess, ""))
}
// executeAssetSkuSync runs the sync of one asset SKU for an existing task and
// records the outcome on the task.
//
// The task is first marked as syncing. The asset-SKU service for platform is
// then obtained from platformFactory and asked to sync assetSkuID. The task
// ends as failed (with the error text) or as success, and the outcome is
// logged. Nothing is returned, so a failed status write is logged rather than
// dropped; otherwise a task could stay in a stale state with no trace.
func (s *syncService) executeAssetSkuSync(ctx context.Context, taskID, assetSkuID *bson.ObjectID, platform consts.SyncPlatform) {
	// reportStatusErr logs a failed status write; a nil error is a no-op.
	reportStatusErr := func(updateErr error) {
		if updateErr == nil {
			return
		}
		g.Log().Error(ctx, "更新同步任务状态失败", g.Map{
			"task_id": taskID,
			"error":   updateErr.Error(),
		})
	}

	// Best effort: the sync is still attempted if this write fails.
	reportStatusErr(dao.SyncTask.UpdateStatus(ctx, taskID, consts.SyncStatusSyncing, ""))

	// NOTE(review): the factory result is used without a check — confirm it
	// never yields a nil service for an unsupported platform.
	assetSkuService := platformFactory.CreateAssetSkuService(platform)

	if err := assetSkuService.SyncAssetSku(ctx, assetSkuID); err != nil {
		g.Log().Error(ctx, "资产SKU同步失败", g.Map{
			"task_id":      taskID,
			"asset_sku_id": assetSkuID,
			"platform":     string(platform),
			"error":        err.Error(),
		})
		reportStatusErr(dao.SyncTask.UpdateStatus(ctx, taskID, consts.SyncStatusFailed, err.Error()))
		return
	}

	g.Log().Info(ctx, "资产SKU同步成功", g.Map{
		"task_id":      taskID,
		"asset_sku_id": assetSkuID,
		"platform":     string(platform),
	})
	reportStatusErr(dao.SyncTask.UpdateStatus(ctx, taskID, consts.SyncStatusSuccess, ""))
}
// executeStockSync runs the sync of one stock record for an existing task and
// records the outcome on the task.
//
// The task is first marked as syncing. The stock service for platform is then
// obtained from platformFactory and asked to sync stockID. The task ends as
// failed (with the error text) or as success, and the outcome is logged.
// Nothing is returned, so a failed status write is logged rather than
// dropped; otherwise a task could stay in a stale state with no trace.
func (s *syncService) executeStockSync(ctx context.Context, taskID, stockID *bson.ObjectID, platform consts.SyncPlatform) {
	// reportStatusErr logs a failed status write; a nil error is a no-op.
	reportStatusErr := func(updateErr error) {
		if updateErr == nil {
			return
		}
		g.Log().Error(ctx, "更新同步任务状态失败", g.Map{
			"task_id": taskID,
			"error":   updateErr.Error(),
		})
	}

	// Best effort: the sync is still attempted if this write fails.
	reportStatusErr(dao.SyncTask.UpdateStatus(ctx, taskID, consts.SyncStatusSyncing, ""))

	// NOTE(review): the factory result is used without a check — confirm it
	// never yields a nil service for an unsupported platform.
	stockService := platformFactory.CreateStockService(platform)

	if err := stockService.SyncStock(ctx, stockID); err != nil {
		g.Log().Error(ctx, "库存同步失败", g.Map{
			"task_id":  taskID,
			"stock_id": stockID,
			"platform": string(platform),
			"error":    err.Error(),
		})
		reportStatusErr(dao.SyncTask.UpdateStatus(ctx, taskID, consts.SyncStatusFailed, err.Error()))
		return
	}

	g.Log().Info(ctx, "库存同步成功", g.Map{
		"task_id":  taskID,
		"stock_id": stockID,
		"platform": string(platform),
	})
	reportStatusErr(dao.SyncTask.UpdateStatus(ctx, taskID, consts.SyncStatusSuccess, ""))
}