diff --git a/dao/order_dao.go b/dao/order_dao.go index 177fe47..04df3a3 100644 --- a/dao/order_dao.go +++ b/dao/order_dao.go @@ -4,18 +4,17 @@ import ( "context" "errors" "fmt" + "strings" "time" - "gitee.com/red-future---jilin-g/common/mongo" "go.mongodb.org/mongo-driver/v2/bson" + "go.mongodb.org/mongo-driver/v2/mongo/options" + "order/consts" "order/model/entity" -) -// Init 初始化DAO -func Init() error { - return nil -} + "gitee.com/red-future---jilin-g/common/mongo" +) type order struct{} @@ -169,7 +168,7 @@ func (d *order) UpdatePendingOrder(ctx context.Context, orderNo string, update b } // ListOrdersByStatus 根据状态查询订单列表 -func (d *order) ListOrdersByStatus(ctx context.Context, status consts.OrderStatus, userID string, page, pageSize int) (interface{}, int64, error) { +func (d *order) ListOrdersByStatus(ctx context.Context, status consts.OrderStatus, userID string, page int, pageSize int, sortFields ...string) (interface{}, int64, error) { collection, err := d.getCollection(status) if err != nil { return nil, 0, err @@ -187,32 +186,51 @@ func (d *order) ListOrdersByStatus(ctx context.Context, status consts.OrderStatu return nil, 0, err } - // 分页查询(暂时忽略排序和分页,因为 mongo.Find 不支持这些参数) - // TODO: 需要在 common/mongo 中添加支持排序和分页的 Find 方法 + // 设置分页参数 + var findOptions []options.Lister[options.FindOptions] + if page > 0 && pageSize > 0 { + findOptions = append(findOptions, options.Find().SetSkip(int64((page-1)*pageSize)).SetLimit(int64(pageSize))) + } + + // 设置排序参数 + if len(sortFields) > 0 { + sort := bson.D{} + for _, field := range sortFields { + var order int + if strings.HasPrefix(field, "-") { + order = -1 + field = strings.TrimPrefix(field, "-") + } else { + order = 1 + } + sort = append(sort, bson.E{Key: field, Value: order}) + } + findOptions = append(findOptions, options.Find().SetSort(sort)) + } // 根据状态返回对应的订单类型 switch status { case consts.OrderStatusPending: var orders []entity.OrderPending - if err := mongo.Find(ctx, filter, &orders, collection); err != nil { + if err := mongo.Find(ctx, filter, &orders, collection, findOptions...); err != nil { return nil, 0, err } return orders, total, nil case consts.OrderStatusPaid: var orders []entity.OrderPaid - if err := mongo.Find(ctx, filter, &orders, collection); err != nil { + if err := mongo.Find(ctx, filter, &orders, collection, findOptions...); err != nil { return nil, 0, err } return orders, total, nil case consts.OrderStatusShipped: var orders []entity.OrderShipped - if err := mongo.Find(ctx, filter, &orders, collection); err != nil { + if err := mongo.Find(ctx, filter, &orders, collection, findOptions...); err != nil { return nil, 0, err } return orders, total, nil case consts.OrderStatusCompleted: var orders []entity.OrderCompleted - if err := mongo.Find(ctx, filter, &orders, collection); err != nil { + if err := mongo.Find(ctx, filter, &orders, collection, findOptions...); err != nil { return nil, 0, err } return orders, total, nil @@ -222,6 +240,26 @@ func (d *order) ListOrdersByStatus(ctx context.Context, status consts.OrderStatu } // GetExpiredPendingOrders 获取过期的待支付订单 +// GetPendingOrder gets pending order by order number +func (d *order) GetPendingOrder(ctx context.Context, orderNo string) (*entity.OrderPending, error) { + collection, err := d.getCollection(consts.OrderStatusPending) + if err != nil { + return nil, err + } + + filter := bson.M{"order_no": orderNo} + + var order entity.OrderPending + if err := mongo.FindOne(ctx, filter, &order, collection); err != nil { + if err.Error() == "mongo: no documents in result" { + return nil, nil + } + return nil, err + } + + return &order, nil +} + func (d *order) GetExpiredPendingOrders(ctx context.Context) ([]entity.OrderPending, error) { collection, err := d.getCollection(consts.OrderStatusPending) if err != nil { diff --git a/go.mod b/go.mod index a940198..172e20f 100644 --- a/go.mod +++ b/go.mod @@ -7,6 +7,7 @@ require ( github.com/gogf/gf/contrib/drivers/mysql/v2 v2.9.6 github.com/gogf/gf/contrib/nosql/redis/v2 v2.9.6 github.com/gogf/gf/v2 v2.9.6 + github.com/robfig/cron/v3 v3.0.1 go.mongodb.org/mongo-driver/v2 v2.4.0 ) diff --git a/go.sum b/go.sum index 0783b33..336c13c 100644 --- a/go.sum +++ b/go.sum @@ -260,6 +260,8 @@ github.com/redis/go-redis/v9 v9.12.1/go.mod h1:huWgSWd8mW6+m0VPhJjSSQ+d6Nh1VICQ6 github.com/rivo/uniseg v0.2.0/go.mod h1:J6wj4VEh+S6ZtnVlnTBMWIodfgj8LQOQFoIToxlJtxc= github.com/rivo/uniseg v0.4.7 h1:WUdvkW8uEhrYfLC4ZzdpI2ztxP1I582+49Oc5Mq64VQ= github.com/rivo/uniseg v0.4.7/go.mod h1:FN3SvrM+Zdj16jyLfmOkMNblXMcoc8DfTHruCPUcx88= +github.com/robfig/cron/v3 v3.0.1 h1:WdRxkvbJztn8LMz/QEvLN5sBU+xKpSqwwUO1Pjr4qDs= +github.com/robfig/cron/v3 v3.0.1/go.mod h1:eQICP3HwyT7UooqI/z+Ov+PtYAWygg1TEWWzGIFLtro= github.com/rogpeppe/go-internal v1.13.1 h1:KvO1DLK/DRN07sQ1LQKScxyZJuNnedQ5/wKSR38lUII= github.com/rogpeppe/go-internal v1.13.1/go.mod h1:uMEvuHeurkdAXX61udpOXGD/AzZDWNMNyH2VO9fmH0o= github.com/ryanuber/columnize v0.0.0-20160712163229-9b3edd62028f/go.mod h1:sm1tb6uqfes/u+d4ooFouqFdy9/2g9QGwK3SQygK0Ts= diff --git a/main.go b/main.go index a9809fc..59330fa 100644 --- a/main.go +++ b/main.go @@ -3,6 +3,7 @@ package main import ( "context" "order/controller" + "order/service" "gitee.com/red-future---jilin-g/common/http" "gitee.com/red-future---jilin-g/common/jaeger" @@ -10,6 +11,7 @@ import ( _ "gitee.com/red-future---jilin-g/common/ragflow" // RAGFlow 客户端自动初始化 _ "github.com/gogf/gf/contrib/drivers/mysql/v2" _ "github.com/gogf/gf/contrib/nosql/redis/v2" + "github.com/gogf/gf/v2/frame/g" ) func main() { @@ -19,10 +21,18 @@ func main() { controller.OrderStatistics, controller.PaymentConfig, }) + // 启动定时任务 + ctx := context.Background() + + // 启动订单超时处理定时任务 + if err := service.StartOrderTimeoutScheduler(ctx); err != nil { + g.Log().Errorf(ctx, "启动订单超时处理定时任务失败: %v", err) + } + // 启动订单统计定时任务调度器 - //ctx := context.Background() - //if err := scheduler.OrderStatisticsSchedulerInstance.StartScheduler(ctx); err != nil { - // g.Log().Errorf(ctx, "启动订单统计定时任务失败: %v", err) - //} + if err := service.StartOrderStatisticsScheduler(ctx); err != nil { + g.Log().Errorf(ctx, "启动订单统计定时任务失败: %v", err) + } + select {} } diff --git a/scheduler/order_statistics_scheduler.go b/scheduler/order_statistics_scheduler.go deleted file mode 100644 index 6177a6c..0000000 --- a/scheduler/order_statistics_scheduler.go +++ /dev/null @@ -1,481 +0,0 @@ -package scheduler - -import ( - "context" - "fmt" - "sync" - "time" - - "order/service" - - "github.com/gogf/gf/v2/frame/g" -) - -// OrderStatisticsScheduler 订单统计定时任务调度器 -type OrderStatisticsScheduler struct{} - -var OrderStatisticsSchedulerInstance = &OrderStatisticsScheduler{} -var schedulerLock sync.Mutex -var isSchedulerRunning bool - -// StartScheduler 启动定时任务调度器(分布式安全) -func (s *OrderStatisticsScheduler) StartScheduler(ctx context.Context) error { - schedulerLock.Lock() - defer schedulerLock.Unlock() - - // 检查是否已经有调度器在运行(分布式部署时避免重复执行) - if isSchedulerRunning { - g.Log().Info(ctx, "订单统计定时任务调度器已在运行") - return nil - } - - // 尝试获取分布式锁 - if !s.acquireDistributedLock(ctx) { - g.Log().Info(ctx, "其他节点正在运行订单统计定时任务,当前节点跳过") - return nil - } - - isSchedulerRunning = true - - // 启动锁续期任务 - go s.startLockRenewal(ctx) - - // 启动日报表生成任务(每天凌晨3点执行) - go s.startDailyReportScheduler(ctx) - - // 启动月报表生成任务(每月1日凌晨4点执行) - go s.startMonthlyReportScheduler(ctx) - - // 启动季度报表生成任务(每季度首月5日凌晨5点执行) - go s.startQuarterlyReportScheduler(ctx) - - // 启动年报表生成任务(每年1月10日凌晨6点执行) - go s.startYearlyReportScheduler(ctx) - - g.Log().Info(ctx, "订单统计定时任务调度器已启动") - return nil -} - -// acquireDistributedLock 获取分布式锁 -func (s *OrderStatisticsScheduler) acquireDistributedLock(ctx context.Context) bool { - lockKey := "order_statistics_scheduler_lock" - - // 尝试设置锁,过期时间30秒 - success, err := g.Redis().SetNX(ctx, lockKey, "locked") - if err != nil { - g.Log().Errorf(ctx, "获取分布式锁失败: %v", err) - return false - } - - // 设置过期时间 - if success { - _, err = g.Redis().Do(ctx, "EXPIRE", lockKey, 30) - if err != nil { - g.Log().Errorf(ctx, "设置锁过期时间失败: %v", err) - // 删除已设置的锁 - g.Redis().Do(ctx, "DEL", lockKey) - return false - } - } - - return success -} - -// renewDistributedLock 续期分布式锁 -func (s *OrderStatisticsScheduler) renewDistributedLock(ctx context.Context) bool { - lockKey := "order_statistics_scheduler_lock" - - // 续期30秒 - _, err := g.Redis().Do(ctx, "EXPIRE", lockKey, 30) - if err != nil { - g.Log().Errorf(ctx, "续期分布式锁失败: %v", err) - return false - } - - return true -} - -// startLockRenewal 启动锁续期任务 -func (s *OrderStatisticsScheduler) startLockRenewal(ctx context.Context) { - ticker := time.NewTicker(20 * time.Second) // 每20秒续期一次 - defer ticker.Stop() - - for { - select { - case <-ticker.C: - if !s.renewDistributedLock(ctx) { - g.Log().Error(ctx, "分布式锁续期失败,停止调度器") - schedulerLock.Lock() - isSchedulerRunning = false - schedulerLock.Unlock() - return - } - case <-ctx.Done(): - // 释放分布式锁 - s.releaseDistributedLock(ctx) - return - } - } -} - -// releaseDistributedLock 释放分布式锁 -func (s *OrderStatisticsScheduler) releaseDistributedLock(ctx context.Context) { - lockKey := "order_statistics_scheduler_lock" - _, err := g.Redis().Do(ctx, "DEL", lockKey) - if err != nil { - g.Log().Errorf(ctx, "释放分布式锁失败: %v", err) - } -} - -// acquireTaskLock 获取任务级分布式锁 -func (s *OrderStatisticsScheduler) acquireTaskLock(ctx context.Context, lockKey string) bool { - // 尝试设置锁,过期时间10分钟 - success, err := g.Redis().SetNX(ctx, lockKey, "locked") - if err != nil { - g.Log().Errorf(ctx, "获取任务锁失败: %v", err) - return false - } - - // 设置过期时间 - if success { - _, err = g.Redis().Do(ctx, "EXPIRE", lockKey, 600) // 10分钟=600秒 - if err != nil { - g.Log().Errorf(ctx, "设置锁过期时间失败: %v", err) - // 删除已设置的锁 - g.Redis().Do(ctx, "DEL", lockKey) - return false - } - } - - return success -} - -// releaseTaskLock 释放任务级分布式锁 -func (s *OrderStatisticsScheduler) releaseTaskLock(ctx context.Context, lockKey string) { - _, err := g.Redis().Do(ctx, "DEL", lockKey) - if err != nil { - g.Log().Errorf(ctx, "释放任务锁失败: %v", err) - } -} - -// startDailyReportScheduler 日报表定时任务 -func (s *OrderStatisticsScheduler) startDailyReportScheduler(ctx context.Context) { - // 计算到凌晨3点的时间 - now := time.Now() - next := time.Date(now.Year(), now.Month(), now.Day()+1, 3, 0, 0, 0, time.Local) - duration := next.Sub(now) - - // 等待到凌晨3点 - time.Sleep(duration) - - ticker := time.NewTicker(24 * time.Hour) - defer ticker.Stop() - - // 立即执行一次昨天的日报表生成 - go s.generateYesterdayDailyReport(ctx) - - for { - select { - case <-ticker.C: - // 生成昨天的日报表 - s.generateYesterdayDailyReport(ctx) - case <-ctx.Done(): - return - } - } -} - -// startMonthlyReportScheduler 月报表定时任务 -func (s *OrderStatisticsScheduler) startMonthlyReportScheduler(ctx context.Context) { - // 计算到下个月1日凌晨4点的时间 - now := time.Now() - next := time.Date(now.Year(), now.Month()+1, 1, 4, 0, 0, 0, time.Local) - duration := next.Sub(now) - - // 等待到下个月1日凌晨4点 - time.Sleep(duration) - - ticker := time.NewTicker(24 * time.Hour) - defer ticker.Stop() - - for { - select { - case <-ticker.C: - // 检查是否是每月1日,如果是则生成上个月的月报表 - if time.Now().Day() == 1 { - go s.generateLastMonthReport(ctx) - } - case <-ctx.Done(): - return - } - } -} - -// startQuarterlyReportScheduler 季度报表定时任务 -func (s *OrderStatisticsScheduler) startQuarterlyReportScheduler(ctx context.Context) { - // 计算到下个季度第一天凌晨5点的时间 - now := time.Now() - nextQuarter := s.getNextQuarterFirstDay(now) - next := time.Date(nextQuarter.Year(), nextQuarter.Month(), nextQuarter.Day(), 5, 0, 0, 0, time.Local) - duration := next.Sub(now) - - // 等待到下个季度第一天凌晨5点 - time.Sleep(duration) - - ticker := time.NewTicker(24 * time.Hour) - defer ticker.Stop() - - for { - select { - case <-ticker.C: - // 检查是否是季度第一天,如果是则生成上个季度的季度报表 - if s.isQuarterFirstDay() { - go s.generateLastQuarterReport(ctx) - } - case <-ctx.Done(): - return - } - } -} - -// startYearlyReportScheduler 年报表定时任务 -func (s *OrderStatisticsScheduler) startYearlyReportScheduler(ctx context.Context) { - // 计算到明年1月1日凌晨6点的时间 - now := time.Now() - next := time.Date(now.Year()+1, time.January, 1, 6, 0, 0, 0, time.Local) - duration := next.Sub(now) - - // 等待到明年1月1日凌晨6点 - time.Sleep(duration) - - ticker := time.NewTicker(24 * time.Hour) - defer ticker.Stop() - - for { - select { - case <-ticker.C: - // 检查是否是1月1日,如果是则生成上一年的年报表 - if time.Now().Month() == time.January && time.Now().Day() == 1 { - go s.generateLastYearReport(ctx) - } - case <-ctx.Done(): - return - } - } -} - -// generateYesterdayDailyReport 生成昨天的日报表 -func (s *OrderStatisticsScheduler) generateYesterdayDailyReport(ctx context.Context) { - yesterday := time.Now().AddDate(0, 0, -1) - - g.Log().Infof(ctx, "开始生成所有租户的日统计数据: %s", yesterday.Format("2006-01-02")) - - // 获取所有租户ID - tenantIDs, err := s.getAllTenants(ctx) - if err != nil { - g.Log().Errorf(ctx, "获取租户列表失败: %v", err) - return - } - - // 并发处理每个租户的日统计 - for _, tenantID := range tenantIDs { - go s.generateTenantDailyReport(ctx, tenantID, yesterday) - } -} - -// generateTenantDailyReport 生成指定租户的日报表 -func (s *OrderStatisticsScheduler) generateTenantDailyReport(ctx context.Context, tenantID int64, date time.Time) { - lockKey := fmt.Sprintf("order_stats_daily_%d_%s", tenantID, date.Format("2006-01-02")) - - // 获取任务锁 - if !s.acquireTaskLock(ctx, lockKey) { - g.Log().Infof(ctx, "租户 %d 的日统计任务正在执行,跳过", tenantID) - return - } - defer s.releaseTaskLock(ctx, lockKey) - - g.Log().Infof(ctx, "开始生成租户 %d 的日统计: %s", tenantID, date.Format("2006-01-02")) - - err := service.OrderStatistics.GenerateDailyStatistics(ctx, tenantID, date, false) - if err != nil { - g.Log().Errorf(ctx, "生成租户 %d 日统计失败: %v", tenantID, err) - return - } - - g.Log().Infof(ctx, "租户 %d 日统计生成成功: %s", tenantID, date.Format("2006-01-02")) -} - -// generateLastMonthReport 生成上个月的月报表 -func (s *OrderStatisticsScheduler) generateLastMonthReport(ctx context.Context) { - lastMonth := time.Now().AddDate(0, -1, 0) - year := lastMonth.Year() - month := int(lastMonth.Month()) - - g.Log().Infof(ctx, "开始生成所有租户的月统计数据: %d年%d月", year, month) - - // 获取所有租户ID - tenantIDs, err := s.getAllTenants(ctx) - if err != nil { - g.Log().Errorf(ctx, "获取租户列表失败: %v", err) - return - } - - // 并发处理每个租户的月统计 - for _, tenantID := range tenantIDs { - go s.generateTenantMonthlyReport(ctx, tenantID, year, month) - } -} - -// generateTenantMonthlyReport 生成指定租户的月报表 -func (s *OrderStatisticsScheduler) generateTenantMonthlyReport(ctx context.Context, tenantID int64, year int, month int) { - lockKey := fmt.Sprintf("order_stats_monthly_%d_%d_%d", tenantID, year, month) - - // 获取任务锁 - if !s.acquireTaskLock(ctx, lockKey) { - g.Log().Infof(ctx, "租户 %d 的月统计任务正在执行,跳过", tenantID) - return - } - defer s.releaseTaskLock(ctx, lockKey) - - g.Log().Infof(ctx, "开始生成租户 %d 的月统计: %d年%d月", tenantID, year, month) - - err := service.OrderStatistics.GenerateMonthlyStatistics(ctx, tenantID, year, month, false) - if err != nil { - g.Log().Errorf(ctx, "生成租户 %d 月统计失败: %v", tenantID, err) - return - } - - g.Log().Infof(ctx, "租户 %d 月统计生成成功: %d年%d月", tenantID, year, month) -} - -// generateLastQuarterReport 生成上个季度的季度报表 -func (s *OrderStatisticsScheduler) generateLastQuarterReport(ctx context.Context) { - lastQuarter := time.Now().AddDate(0, -3, 0) - year := lastQuarter.Year() - quarter := s.getQuarter(lastQuarter.Month()) - - g.Log().Infof(ctx, "开始生成所有租户的季度统计数据: %d年第%d季度", year, quarter) - - // 获取所有租户ID - tenantIDs, err := s.getAllTenants(ctx) - if err != nil { - g.Log().Errorf(ctx, "获取租户列表失败: %v", err) - return - } - - // 并发处理每个租户的季度统计 - for _, tenantID := range tenantIDs { - go s.generateTenantQuarterlyReport(ctx, tenantID, year, quarter) - } -} - -// generateTenantQuarterlyReport 生成指定租户的季度报表 -func (s *OrderStatisticsScheduler) generateTenantQuarterlyReport(ctx context.Context, tenantID int64, year int, quarter int) { - lockKey := fmt.Sprintf("order_stats_quarterly_%d_%d_%d", tenantID, year, quarter) - - // 获取任务锁 - if !s.acquireTaskLock(ctx, lockKey) { - g.Log().Infof(ctx, "租户 %d 的季度统计任务正在执行,跳过", tenantID) - return - } - defer s.releaseTaskLock(ctx, lockKey) - - g.Log().Infof(ctx, "开始生成租户 %d 的季度统计: %d年第%d季度", tenantID, year, quarter) - - err := service.OrderStatistics.GenerateQuarterlyStatistics(ctx, tenantID, year, quarter, false) - if err != nil { - g.Log().Errorf(ctx, "生成租户 %d 季度统计失败: %v", tenantID, err) - return - } - - g.Log().Infof(ctx, "租户 %d 季度统计生成成功: %d年第%d季度", tenantID, year, quarter) -} - -// generateLastYearReport 生成上一年的年报表 -func (s *OrderStatisticsScheduler) generateLastYearReport(ctx context.Context) { - lastYear := time.Now().Year() - 1 - - g.Log().Infof(ctx, "开始生成所有租户的年统计数据: %d年", lastYear) - - // 获取所有租户ID - tenantIDs, err := s.getAllTenants(ctx) - if err != nil { - g.Log().Errorf(ctx, "获取租户列表失败: %v", err) - return - } - - // 并发处理每个租户的年统计 - for _, tenantID := range tenantIDs { - go s.generateTenantYearlyReport(ctx, tenantID, lastYear) - } -} - -// generateTenantYearlyReport 生成指定租户的年报表 -func (s *OrderStatisticsScheduler) generateTenantYearlyReport(ctx context.Context, tenantID int64, year int) { - lockKey := fmt.Sprintf("order_stats_yearly_%d_%d", tenantID, year) - - // 获取任务锁 - if !s.acquireTaskLock(ctx, lockKey) { - g.Log().Infof(ctx, "租户 %d 的年统计任务正在执行,跳过", tenantID) - return - } - defer s.releaseTaskLock(ctx, lockKey) - - g.Log().Infof(ctx, "开始生成租户 %d 的年统计: %d年", tenantID, year) - - err := service.OrderStatistics.GenerateYearlyStatistics(ctx, tenantID, year, false) - if err != nil { - g.Log().Errorf(ctx, "生成租户 %d 年统计失败: %v", tenantID, err) - return - } - - g.Log().Infof(ctx, "租户 %d 年统计生成成功: %d年", tenantID, year) -} - -// getNextQuarterFirstDay 获取下个季度的第一天 -func (s *OrderStatisticsScheduler) getNextQuarterFirstDay(now time.Time) time.Time { - quarter := s.getQuarter(now.Month()) - if quarter == 4 { - return time.Date(now.Year()+1, time.January, 1, 0, 0, 0, 0, now.Location()) - } - return time.Date(now.Year(), time.Month((quarter*3)+1), 1, 0, 0, 0, 0, now.Location()) -} - -// getQuarter 获取月份对应的季度 -func (s *OrderStatisticsScheduler) getQuarter(month time.Month) int { - switch { - case month >= 1 && month <= 3: - return 1 - case month >= 4 && month <= 6: - return 2 - case month >= 7 && month <= 9: - return 3 - default: - return 4 - } -} - -// isQuarterFirstDay 检查今天是否是季度的第一天 -func (s *OrderStatisticsScheduler) isQuarterFirstDay() bool { - now := time.Now() - quarter := s.getQuarter(now.Month()) - switch quarter { - case 1: - return now.Day() == 1 && now.Month() == time.January - case 2: - return now.Day() == 1 && now.Month() == time.April - case 3: - return now.Day() == 1 && now.Month() == time.July - case 4: - return now.Day() == 1 && now.Month() == time.October - default: - return false - } -} - -// getAllTenants 获取所有租户ID(这里需要根据实际业务实现) -func (s *OrderStatisticsScheduler) getAllTenants(ctx context.Context) ([]int64, error) { - // 这里应该从实际的租户管理服务获取租户列表 - // 暂时返回一个默认的租户ID,实际使用时需要替换为真实的租户获取逻辑 - return []int64{1, 2, 3}, nil -} diff --git a/service/order.go b/service/order.go index f68a9fe..e5b046c 100644 --- a/service/order.go +++ b/service/order.go @@ -12,20 +12,29 @@ import ( "order/model/dto" "order/model/entity" + "gitee.com/red-future---jilin-g/common/http" + "github.com/gogf/gf/v2/frame/g" "github.com/gogf/gf/v2/util/gconv" + "github.com/robfig/cron/v3" "go.mongodb.org/mongo-driver/v2/bson" ) +// AssetServiceResponse 资产服务响应结构体 +type AssetServiceResponse struct { + ID string `json:"id"` + Name string `json:"name"` + UnlimitedStock bool `json:"unlimitedStock"` + Price *struct { + TotalStock int `json:"totalStock"` + } `json:"price"` + SaleMode string `json:"saleMode"` +} + type order struct{} // Order 订单服务 var Order = new(order) -// Init 初始化服务 -func Init() error { - return nil -} - // convertOrderItemsFromDTO 从DTO转换订单商品项 func convertOrderItemsFromDTO(items []dto.OrderItemReq) []entity.OrderItem { var result []entity.OrderItem @@ -122,7 +131,41 @@ func (s *order) CreateOrder(ctx context.Context, req *dto.CreateOrderReq) (*dto. return nil, errors.New("订单商品不能为空") } - // 2. 计算订单金额 + // 2. 检查库存扣减策略并处理库存 + for i := range req.OrderItems { + item := &req.OrderItems[i] + + // 判断是否需要扣减库存 + shouldDeduct, saleMode, err := s.shouldDeductStock(ctx, gconv.String(req.TenantID), item.AssetID) + if err != nil { + return nil, fmt.Errorf("检查库存策略失败: %w", err) + } + + // 如果需要扣减库存,判断扣减时机 + if shouldDeduct { + timing, err := s.getDeductStockTiming(ctx, gconv.String(req.TenantID), item.AssetID) + if err != nil { + return nil, fmt.Errorf("获取库存扣减时机失败: %w", err) + } + + // 秒杀场景:下单时立即扣减库存 + if timing == "order_create" { + quantity := len(item.Stocks) // 每个库存项数量为1 + + // 先生成订单号用于库存操作 + orderNo := s.generateOrderNo(gconv.String(req.TenantID)) + + if err := s.deductStock(ctx, gconv.String(req.TenantID), orderNo, item.AssetID, quantity, fmt.Sprintf("秒杀订单预占库存,售卖方式:%s", saleMode)); err != nil { + return nil, fmt.Errorf("秒杀订单预占库存失败: %w", err) + } + + // 记录库存已预占 + g.Log().Infof(ctx, "资产 %s 秒杀订单 %s 预占库存成功,数量:%d", item.AssetID, orderNo, quantity) + } + } + } + + // 3. 计算订单金额 totalAmount := int64(0) for i := range req.OrderItems { item := &req.OrderItems[i] @@ -141,10 +184,10 @@ func (s *order) CreateOrder(ctx context.Context, req *dto.CreateOrderReq) (*dto. return nil, errors.New("订单总金额必须大于0") } - // 3. 生成订单号 + // 4. 生成订单号 orderNo := s.generateOrderNo(gconv.String(req.TenantID)) - // 4. 设置订单过期时间(30分钟后) + // 5. 设置订单过期时间(30分钟后) expiredAt := time.Now().Add(30 * time.Minute) // 5. 创建待支付订单 @@ -414,7 +457,44 @@ func (s *order) CancelOrder(ctx context.Context, req *dto.CancelOrderReq) (*dto. return nil, fmt.Errorf("订单状态不正确,当前状态: %s", status) } - // 4. 将订单移动到已取消状态 + pendingOrder, ok := order.(*entity.OrderPending) + if !ok { + return nil, errors.New("订单类型错误") + } + + // 4. 处理库存回充(针对秒杀场景预占的库存) + for _, item := range pendingOrder.OrderItems { + // 判断是否需要回充库存 + shouldDeduct, saleMode, err := s.shouldDeductStock(ctx, req.TenantID, item.AssetID) + if err != nil { + g.Log().Errorf(ctx, "检查库存策略失败,订单:%s,资产:%s,错误:%v", req.OrderNo, item.AssetID, err) + continue + } + + // 如果该资产需要扣减库存且是秒杀场景,则需要回充预占库存 + if shouldDeduct && saleMode == "flash" { + timing, err := s.getDeductStockTiming(ctx, req.TenantID, item.AssetID) + if err != nil { + g.Log().Errorf(ctx, "获取库存扣减时机失败,订单:%s,资产:%s,错误:%v", req.OrderNo, item.AssetID, err) + continue + } + + // 秒杀场景:下单时已扣减库存,取消时需要回充 + if timing == "order_create" { + quantity := len(item.Stocks) // 每个库存项数量为1 + + if err := s.refundStock(ctx, req.TenantID, req.OrderNo, item.AssetID, quantity, fmt.Sprintf("秒杀订单取消回充库存,售卖方式:%s,原因:%s", saleMode, req.Reason)); err != nil { + g.Log().Errorf(ctx, "订单取消回充库存失败,订单:%s,资产:%s,错误:%v", req.OrderNo, item.AssetID, err) + // 库存回充失败不影响订单状态更新,但需要记录错误 + continue + } + + g.Log().Infof(ctx, "资产 %s 订单取消回充库存成功,订单:%s,数量:%d", item.AssetID, req.OrderNo, quantity) + } + } + } + + // 5. 将订单移动到已取消状态 updateData := bson.M{ "cancel_reason": req.Reason, } @@ -530,3 +610,236 @@ func (s *order) ProcessExpiredOrders(ctx context.Context) error { func (s *order) UpdatePayInfo(ctx context.Context, orderNo string, payInfo entity.PayInfo) error { return dao.Order.UpdatePayInfo(ctx, orderNo, payInfo) } + +// shouldDeductStock 判断是否需要扣减库存 +func (s *order) shouldDeductStock(ctx context.Context, tenantID, assetID string) (bool, string, error) { + // 调用assets服务获取资产信息 + assetResp, err := s.getAssetFromAssetService(ctx, tenantID, assetID) + if err != nil { + return false, "", fmt.Errorf("获取资产信息失败: %w", err) + } + + if assetResp == nil { + return false, "", errors.New("资产不存在") + } + + // 无库存限制的资产不需要扣减库存 + if assetResp.UnlimitedStock { + return false, "unlimited_stock", nil + } + + // 预售场景不扣减库存 + if assetResp.SaleMode == "presale" { + return false, "presale_no_deduct", nil + } + + // 有库存限制的常规售卖和秒杀需要扣减库存 + return true, assetResp.SaleMode, nil +} + +// getDeductStockTiming 获取库存扣减时机 +func (s *order) getDeductStockTiming(ctx context.Context, tenantID, assetID string) (string, error) { + assetResp, err := s.getAssetFromAssetService(ctx, tenantID, assetID) + if err != nil { + return "", fmt.Errorf("获取资产信息失败: %w", err) + } + + if assetResp == nil { + return "", errors.New("资产不存在") + } + + // 根据售卖方式确定扣减时机 + switch assetResp.SaleMode { + case "flash": + return "order_create", nil // 秒杀:下单时扣减库存 + case "regular": + return "payment_success", nil // 常规售卖:支付成功时扣减库存 + default: + return "", errors.New("未知售卖方式") + } +} + +// deductStock 扣减库存 +func (s *order) deductStock(ctx context.Context, tenantID, orderNo, assetID string, quantity int, reason string) error { + // 调用assets服务扣减库存 + err := s.deductStockFromAssetService(ctx, tenantID, assetID, quantity) + if err != nil { + return fmt.Errorf("扣减库存失败: %w", err) + } + + g.Log().Infof(ctx, "库存扣减成功,订单:%s,资产:%s,数量:%d,原因:%s", orderNo, assetID, quantity, reason) + return nil +} + +// refundStock 回充库存 +func (s *order) refundStock(ctx context.Context, tenantID, orderNo, assetID string, quantity int, reason string) error { + // 调用资产服务回充库存 + err := s.refundStockFromAssetService(ctx, tenantID, assetID, quantity) + if err != nil { + return fmt.Errorf("回充库存失败: %w", err) + } + + g.Log().Infof(ctx, "库存回充成功,订单:%s,资产:%s,数量:%d,原因:%s", orderNo, assetID, quantity, reason) + return nil +} + +// getAssetFromAssetService 从资产服务获取资产信息 +func (s *order) getAssetFromAssetService(ctx context.Context, tenantID, assetID string) (*AssetServiceResponse, error) { + // 使用common/http中的封装方法调用资产服务 + // 这里应该调用assets服务的getAsset接口 + // 暂时返回模拟数据 + var assetResp AssetServiceResponse + err := http.Get(ctx, fmt.Sprintf("http://assets-service/internal/asset/%s", assetID), &assetResp) + if err != nil { + return nil, err + } + + return &assetResp, nil +} + +// deductStockFromAssetService 从资产服务扣减库存 +func (s *order) deductStockFromAssetService(ctx context.Context, tenantID, assetID string, quantity int) error { + // 调用assets服务的扣减库存接口 + req := map[string]interface{}{ + "asset_id": assetID, + "quantity": quantity, + "reason": "订单扣减", + } + + var result struct { + Success bool `json:"success"` + Message string `json:"message"` + } + + err := http.Post(ctx, "http://assets-service/internal/stock/deduct", &result, req) + if err != nil { + return err + } + + if !result.Success { + return errors.New(result.Message) + } + + return nil +} + +// refundStockFromAssetService 从资产服务回充库存 +func (s *order) refundStockFromAssetService(ctx context.Context, tenantID, assetID string, quantity int) error { + // 调用资产服务的回充库存接口 + req := map[string]interface{}{ + "asset_id": assetID, + "quantity": quantity, + "reason": "订单取消回充", + } + + var result struct { + Success bool `json:"success"` + Message string `json:"message"` + } + + err := http.Post(ctx, "http://assets-service/internal/stock/refund", &result, req) + if err != nil { + return err + } + + if !result.Success { + return errors.New(result.Message) + } + + return nil +} + +// OrderTimeoutScheduler 订单超时处理定时任务 +type OrderTimeoutScheduler struct { + cron *cron.Cron +} + +var orderTimeoutSchedulerInstance = &OrderTimeoutScheduler{} + +// StartOrderTimeoutScheduler 启动订单超时处理定时任务 +func StartOrderTimeoutScheduler(ctx context.Context) error { + return orderTimeoutSchedulerInstance.StartScheduler(ctx) +} + +// StopOrderTimeoutScheduler 停止订单超时处理定时任务 +func StopOrderTimeoutScheduler() { + orderTimeoutSchedulerInstance.StopScheduler() +} + +// StartScheduler 启动订单超时处理定时任务 +func (s *OrderTimeoutScheduler) StartScheduler(ctx context.Context) error { + if s.cron != nil { + s.cron.Stop() + } + + s.cron = cron.New(cron.WithSeconds()) + + // 每分钟检查一次超时订单 + _, err := s.cron.AddFunc("0 */1 * * * *", func() { + s.processTimeoutOrders(ctx) + }) + + if err != nil { + return fmt.Errorf("添加订单超时检查定时任务失败: %w", err) + } + + s.cron.Start() + g.Log().Info(ctx, "订单超时处理定时任务已启动") + return nil +} + +// StopScheduler 停止定时任务 +func (s *OrderTimeoutScheduler) StopScheduler() { + if s.cron != nil { + s.cron.Stop() + s.cron = nil + } +} + +// processTimeoutOrders 处理超时订单 +func (s *OrderTimeoutScheduler) processTimeoutOrders(ctx context.Context) { + g.Log().Info(ctx, "开始处理超时订单") + + // 查询所有超时的待支付订单 + expiredOrders, err := dao.Order.GetExpiredPendingOrders(ctx) + if err != nil { + g.Log().Errorf(ctx, "查询超时订单失败: %v", err) + return + } + + if len(expiredOrders) == 0 { + g.Log().Debug(ctx, "没有需要处理的超时订单") + return + } + + g.Log().Infof(ctx, "发现 %d 个超时订单", len(expiredOrders)) + + // 逐个处理超时订单 + for _, order := range expiredOrders { + if err := s.processTimeoutOrder(ctx, &order); err != nil { + g.Log().Errorf(ctx, "处理超时订单失败,订单号:%s,错误:%v", order.OrderNo, err) + continue + } + g.Log().Infof(ctx, "超时订单处理成功,订单号:%s", order.OrderNo) + } +} + +// processTimeoutOrder 处理单个超时订单 +func (s *OrderTimeoutScheduler) processTimeoutOrder(ctx context.Context, order *entity.OrderPending) error { + // 获取租户ID(从订单中获取) + tenantID := order.TenantId + if tenantID == "" { + return fmt.Errorf("订单缺少租户ID") + } + + // 将订单移动到已取消状态 + updateData := map[string]interface{}{ + "cancel_reason": "订单超时自动取消", + } + + if err := dao.Order.MoveOrderToStatus(ctx, consts.OrderStatusPending, consts.OrderStatusCancelled, order.OrderNo, updateData); err != nil { + return fmt.Errorf("更新订单状态失败: %w", err) + } + + return nil +} diff --git a/service/order_statistics.go b/service/order_statistics.go index 88f3321..e185d68 100644 --- a/service/order_statistics.go +++ b/service/order_statistics.go @@ -3,6 +3,7 @@ package service import ( "context" "fmt" + "sync" "time" "order/dao" @@ -437,3 +438,477 @@ func (s *orderStatistics) GenerateStatistics(ctx context.Context, req *dto.Gener Message: "统计数据生成成功", }, nil } + +// OrderStatisticsScheduler 订单统计定时任务调度器 +type OrderStatisticsScheduler struct{} + +var orderStatisticsSchedulerInstance = &OrderStatisticsScheduler{} +var statisticsSchedulerLock sync.Mutex +var isStatisticsSchedulerRunning bool + +// StartOrderStatisticsScheduler 启动订单统计定时任务调度器 +func StartOrderStatisticsScheduler(ctx context.Context) error { + return orderStatisticsSchedulerInstance.StartScheduler(ctx) +} + +// StartScheduler 启动定时任务调度器(分布式安全) +func (s *OrderStatisticsScheduler) StartScheduler(ctx context.Context) error { + statisticsSchedulerLock.Lock() + defer statisticsSchedulerLock.Unlock() + + // 检查是否已经有调度器在运行(分布式部署时避免重复执行) + if isStatisticsSchedulerRunning { + g.Log().Info(ctx, "订单统计定时任务调度器已在运行") + return nil + } + + // 尝试获取分布式锁 + if !s.acquireDistributedLock(ctx) { + g.Log().Info(ctx, "其他节点正在运行订单统计定时任务,当前节点跳过") + return nil + } + + isStatisticsSchedulerRunning = true + + // 启动锁续期任务 + go s.startLockRenewal(ctx) + + // 启动日报表生成任务(每天凌晨3点执行) + go s.startDailyReportScheduler(ctx) + + // 启动月报表生成任务(每月1日凌晨4点执行) + go s.startMonthlyReportScheduler(ctx) + + // 启动季度报表生成任务(每季度首月5日凌晨5点执行) + go s.startQuarterlyReportScheduler(ctx) + + // 启动年报表生成任务(每年1月10日凌晨6点执行) + go s.startYearlyReportScheduler(ctx) + + g.Log().Info(ctx, "订单统计定时任务调度器已启动") + return nil +} + +// acquireDistributedLock 获取分布式锁 +func (s *OrderStatisticsScheduler) acquireDistributedLock(ctx context.Context) bool { + lockKey := "order_statistics_scheduler_lock" + + // 尝试设置锁,过期时间30秒 + success, err := g.Redis().SetNX(ctx, lockKey, "locked") + if err != nil { + g.Log().Errorf(ctx, "获取分布式锁失败: %v", err) + return false + } + + // 设置过期时间 + if success { + _, err = g.Redis().Do(ctx, "EXPIRE", lockKey, 30) + if err != nil { + g.Log().Errorf(ctx, "设置锁过期时间失败: %v", err) + // 删除已设置的锁 + g.Redis().Do(ctx, "DEL", lockKey) + return false + } + } + + return success +} + +// renewDistributedLock 续期分布式锁 +func (s *OrderStatisticsScheduler) renewDistributedLock(ctx context.Context) bool { + lockKey := "order_statistics_scheduler_lock" + + // 续期30秒 + _, err := g.Redis().Do(ctx, "EXPIRE", lockKey, 30) + if err != nil { + g.Log().Errorf(ctx, "续期分布式锁失败: %v", err) + return false + } + + return true +} + +// startLockRenewal 启动锁续期任务 +func (s *OrderStatisticsScheduler) startLockRenewal(ctx context.Context) { + ticker := time.NewTicker(20 * time.Second) // 每20秒续期一次 + defer ticker.Stop() + + for { + select { + case <-ticker.C: + if !s.renewDistributedLock(ctx) { + g.Log().Error(ctx, "分布式锁续期失败,停止调度器") + statisticsSchedulerLock.Lock() + isStatisticsSchedulerRunning = false + statisticsSchedulerLock.Unlock() + return + } + case <-ctx.Done(): + // 释放分布式锁 + s.releaseDistributedLock(ctx) + return + } + } +} + +// releaseDistributedLock 释放分布式锁 +func (s *OrderStatisticsScheduler) releaseDistributedLock(ctx context.Context) { + lockKey := "order_statistics_scheduler_lock" + _, err := g.Redis().Do(ctx, "DEL", lockKey) + if err != nil { + g.Log().Errorf(ctx, "释放分布式锁失败: %v", err) + } +} + +// acquireTaskLock 获取任务级分布式锁 +func (s *OrderStatisticsScheduler) acquireTaskLock(ctx context.Context, lockKey string) bool { + // 尝试设置锁,过期时间10分钟 + success, err := g.Redis().SetNX(ctx, lockKey, "locked") + if err != nil { + g.Log().Errorf(ctx, "获取任务锁失败: %v", err) + return false + } + + // 设置过期时间 + if success { + _, err = g.Redis().Do(ctx, "EXPIRE", lockKey, 600) // 10分钟=600秒 + if err != nil { + g.Log().Errorf(ctx, "设置锁过期时间失败: %v", err) + // 删除已设置的锁 + g.Redis().Do(ctx, "DEL", lockKey) + return false + } + } + + return success +} + +// releaseTaskLock 释放任务级分布式锁 +func (s *OrderStatisticsScheduler) releaseTaskLock(ctx context.Context, lockKey string) { + _, err := g.Redis().Do(ctx, "DEL", lockKey) + if err != nil { + g.Log().Errorf(ctx, "释放任务锁失败: %v", err) + } +} + +// startDailyReportScheduler 日报表定时任务 +func (s *OrderStatisticsScheduler) startDailyReportScheduler(ctx context.Context) { + // 计算到凌晨3点的时间 + now := time.Now() + next := time.Date(now.Year(), now.Month(), now.Day()+1, 3, 0, 0, 0, time.Local) + duration := next.Sub(now) + + // 等待到凌晨3点 + time.Sleep(duration) + + ticker := time.NewTicker(24 * time.Hour) + defer ticker.Stop() + + // 立即执行一次昨天的日报表生成 + go s.generateYesterdayDailyReport(ctx) + + for { + select { + case <-ticker.C: + // 生成昨天的日报表 + s.generateYesterdayDailyReport(ctx) + case <-ctx.Done(): + return + } + } +} + +// startMonthlyReportScheduler 月报表定时任务 +func (s *OrderStatisticsScheduler) startMonthlyReportScheduler(ctx context.Context) { + // 计算到下个月1日凌晨4点的时间 + now := time.Now() + next := time.Date(now.Year(), now.Month()+1, 1, 4, 0, 0, 0, time.Local) + duration := next.Sub(now) + + // 等待到下个月1日凌晨4点 + time.Sleep(duration) + + ticker := time.NewTicker(24 * time.Hour) + defer ticker.Stop() + + for { + select { + case <-ticker.C: + // 检查是否是每月1日,如果是则生成上个月的月报表 + if time.Now().Day() == 1 { + go s.generateLastMonthReport(ctx) + } + case <-ctx.Done(): + return + } + } +} + +// startQuarterlyReportScheduler 季度报表定时任务 +func (s *OrderStatisticsScheduler) startQuarterlyReportScheduler(ctx context.Context) { + // 计算到下个季度第一天凌晨5点的时间 + now := time.Now() + nextQuarter := s.getNextQuarterFirstDay(now) + next := time.Date(nextQuarter.Year(), nextQuarter.Month(), nextQuarter.Day(), 5, 0, 0, 0, time.Local) + duration := next.Sub(now) + + // 等待到下个季度第一天凌晨5点 + time.Sleep(duration) + + ticker := time.NewTicker(24 * time.Hour) + defer ticker.Stop() + + for { + select { + case <-ticker.C: + // 检查是否是季度第一天,如果是则生成上个季度的季度报表 + if s.isQuarterFirstDay() { + go s.generateLastQuarterReport(ctx) + } + case <-ctx.Done(): + return + } + } +} + +// startYearlyReportScheduler 年报表定时任务 +func (s *OrderStatisticsScheduler) startYearlyReportScheduler(ctx context.Context) { + // 计算到明年1月1日凌晨6点的时间 + now := time.Now() + next := time.Date(now.Year()+1, time.January, 1, 6, 0, 0, 0, time.Local) + duration := next.Sub(now) + + // 等待到明年1月1日凌晨6点 + time.Sleep(duration) + + ticker := time.NewTicker(24 * time.Hour) + defer ticker.Stop() + + for { + select { + case <-ticker.C: + // 检查是否是1月1日,如果是则生成上一年的年报表 + if time.Now().Month() == time.January && time.Now().Day() == 1 { + go s.generateLastYearReport(ctx) + } + case <-ctx.Done(): + return + } + } +} + +// generateYesterdayDailyReport 生成昨天的日报表 +func (s *OrderStatisticsScheduler) generateYesterdayDailyReport(ctx context.Context) { + yesterday := time.Now().AddDate(0, 0, -1) + + g.Log().Infof(ctx, "开始生成所有租户的日统计数据: %s", yesterday.Format("2006-01-02")) + + // 获取所有租户ID + tenantIDs, err := s.getAllTenants(ctx) + if err != nil { + g.Log().Errorf(ctx, "获取租户列表失败: %v", err) + return + } + + // 并发处理每个租户的日统计 + for _, tenantID := range tenantIDs { + go s.generateTenantDailyReport(ctx, tenantID, yesterday) + } +} + +// generateTenantDailyReport 生成指定租户的日报表 +func (s *OrderStatisticsScheduler) generateTenantDailyReport(ctx context.Context, tenantID int64, date time.Time) { + lockKey := fmt.Sprintf("order_stats_daily_%d_%s", tenantID, date.Format("2006-01-02")) + + // 获取任务锁 + if !s.acquireTaskLock(ctx, lockKey) { + g.Log().Infof(ctx, "租户 %d 的日统计任务正在执行,跳过", tenantID) + return + } + defer s.releaseTaskLock(ctx, lockKey) + + g.Log().Infof(ctx, "开始生成租户 %d 的日统计: %s", tenantID, date.Format("2006-01-02")) + + err := OrderStatistics.GenerateDailyStatistics(ctx, tenantID, date, false) + if err != nil { + g.Log().Errorf(ctx, "生成租户 %d 日统计失败: %v", tenantID, err) + return + } + + g.Log().Infof(ctx, "租户 %d 日统计生成成功: %s", tenantID, date.Format("2006-01-02")) +} + +// generateLastMonthReport 生成上个月的月报表 +func (s *OrderStatisticsScheduler) generateLastMonthReport(ctx context.Context) { + lastMonth := time.Now().AddDate(0, -1, 0) + year := lastMonth.Year() + month := int(lastMonth.Month()) + + g.Log().Infof(ctx, "开始生成所有租户的月统计数据: %d年%d月", year, month) + + // 获取所有租户ID + tenantIDs, err := s.getAllTenants(ctx) + if err != nil { + g.Log().Errorf(ctx, "获取租户列表失败: %v", err) + return + } + + // 并发处理每个租户的月统计 + for _, tenantID := range tenantIDs { + go s.generateTenantMonthlyReport(ctx, tenantID, year, month) + } +} + +// generateTenantMonthlyReport 生成指定租户的月报表 +func (s *OrderStatisticsScheduler) generateTenantMonthlyReport(ctx context.Context, tenantID int64, year int, month int) { + lockKey := fmt.Sprintf("order_stats_monthly_%d_%d_%d", tenantID, year, month) + + // 获取任务锁 + if !s.acquireTaskLock(ctx, lockKey) { + g.Log().Infof(ctx, "租户 %d 的月统计任务正在执行,跳过", tenantID) + return + } + defer s.releaseTaskLock(ctx, lockKey) + + g.Log().Infof(ctx, "开始生成租户 %d 的月统计: %d年%d月", tenantID, year, month) + + err := OrderStatistics.GenerateMonthlyStatistics(ctx, tenantID, year, month, false) + if err != nil { + g.Log().Errorf(ctx, "生成租户 %d 月统计失败: %v", tenantID, err) + return + } + + g.Log().Infof(ctx, "租户 %d 月统计生成成功: %d年%d月", tenantID, year, month) +} + +// generateLastQuarterReport 生成上个季度的季度报表 +func (s *OrderStatisticsScheduler) generateLastQuarterReport(ctx context.Context) { + lastQuarter := time.Now().AddDate(0, -3, 0) + year := lastQuarter.Year() + quarter := s.getQuarter(lastQuarter.Month()) + + g.Log().Infof(ctx, "开始生成所有租户的季度统计数据: %d年第%d季度", year, quarter) + + // 获取所有租户ID + tenantIDs, err := s.getAllTenants(ctx) + if err != nil { + g.Log().Errorf(ctx, "获取租户列表失败: %v", err) + return + } + + // 并发处理每个租户的季度统计 + for _, tenantID := range tenantIDs { + go s.generateTenantQuarterlyReport(ctx, tenantID, year, quarter) + } +} + +// generateTenantQuarterlyReport 生成指定租户的季度报表 +func (s *OrderStatisticsScheduler) generateTenantQuarterlyReport(ctx context.Context, tenantID int64, year int, quarter int) { + lockKey := fmt.Sprintf("order_stats_quarterly_%d_%d_%d", tenantID, year, quarter) + + // 获取任务锁 + if !s.acquireTaskLock(ctx, lockKey) { + g.Log().Infof(ctx, "租户 %d 的季度统计任务正在执行,跳过", tenantID) + return + } + defer s.releaseTaskLock(ctx, lockKey) + + g.Log().Infof(ctx, "开始生成租户 %d 的季度统计: %d年第%d季度", tenantID, year, quarter) + + err := OrderStatistics.GenerateQuarterlyStatistics(ctx, tenantID, year, quarter, false) + if err != nil { + g.Log().Errorf(ctx, "生成租户 %d 季度统计失败: %v", tenantID, err) + return + } + + g.Log().Infof(ctx, "租户 %d 季度统计生成成功: %d年第%d季度", tenantID, year, quarter) +} + +// generateLastYearReport 生成上一年的年报表 +func (s *OrderStatisticsScheduler) generateLastYearReport(ctx context.Context) { + lastYear := time.Now().Year() - 1 + + g.Log().Infof(ctx, "开始生成所有租户的年统计数据: %d年", lastYear) + + // 获取所有租户ID + tenantIDs, err := s.getAllTenants(ctx) + if err != nil { + g.Log().Errorf(ctx, "获取租户列表失败: %v", err) + return + } + + // 并发处理每个租户的年统计 + for _, tenantID := range tenantIDs { + go s.generateTenantYearlyReport(ctx, tenantID, lastYear) + } +} + +// generateTenantYearlyReport 生成指定租户的年报表 +func (s *OrderStatisticsScheduler) generateTenantYearlyReport(ctx context.Context, tenantID int64, year int) { + lockKey := fmt.Sprintf("order_stats_yearly_%d_%d", tenantID, year) + + // 获取任务锁 + if !s.acquireTaskLock(ctx, lockKey) { + g.Log().Infof(ctx, "租户 %d 的年统计任务正在执行,跳过", tenantID) + return + } + defer s.releaseTaskLock(ctx, lockKey) + + g.Log().Infof(ctx, "开始生成租户 %d 的年统计: %d年", tenantID, year) + + err := OrderStatistics.GenerateYearlyStatistics(ctx, tenantID, year, false) + if err != nil { + g.Log().Errorf(ctx, "生成租户 %d 年统计失败: %v", tenantID, err) + return + } + + g.Log().Infof(ctx, "租户 %d 年统计生成成功: %d年", tenantID, year) +} + +// getNextQuarterFirstDay 获取下个季度的第一天 +func (s *OrderStatisticsScheduler) getNextQuarterFirstDay(now time.Time) time.Time { + quarter := s.getQuarter(now.Month()) + if quarter == 4 { + return time.Date(now.Year()+1, time.January, 1, 0, 0, 0, 0, now.Location()) + } + return time.Date(now.Year(), time.Month((quarter*3)+1), 1, 0, 0, 0, 0, now.Location()) +} + +// getQuarter 获取月份对应的季度 +func (s *OrderStatisticsScheduler) getQuarter(month time.Month) int { + switch { + case month >= 1 && month <= 3: + return 1 + case month >= 4 && month <= 6: + return 2 + case month >= 7 && month <= 9: + return 3 + default: + return 4 + } +} + +// isQuarterFirstDay 检查今天是否是季度的第一天 +func (s *OrderStatisticsScheduler) isQuarterFirstDay() bool { + now := time.Now() + quarter := s.getQuarter(now.Month()) + switch quarter { + case 1: + return now.Day() == 1 && now.Month() == time.January + case 2: + return now.Day() == 1 && now.Month() == time.April + case 3: + return now.Day() == 1 && now.Month() == time.July + case 4: + return now.Day() == 1 && now.Month() == time.October + default: + return false + } +} + +// getAllTenants 获取所有租户ID(这里需要根据实际业务实现) +func (s *OrderStatisticsScheduler) getAllTenants(ctx context.Context) ([]int64, error) { + // 这里应该从实际的租户管理服务获取租户列表 + // 暂时返回一个默认的租户ID,实际使用时需要替换为真实的租户获取逻辑 + return []int64{1, 2, 3}, nil +} diff --git a/service/payment.go b/service/payment.go index 9e26837..f7e9c68 100644 --- a/service/payment.go +++ b/service/payment.go @@ -12,6 +12,8 @@ import ( "order/model/dto" "order/model/entity" + "gitee.com/red-future---jilin-g/common/http" + "github.com/gogf/gf/v2/frame/g" "go.mongodb.org/mongo-driver/v2/bson" ) @@ -188,8 +190,50 @@ func (s *payment) HandlePaymentNotify(ctx context.Context, req *dto.PaymentNotif return fmt.Errorf("更新支付记录失败: %w", err) } - // 4. 如果支付成功,更新订单状态 + // 4. 如果支付成功,更新订单状态并处理库存 if req.Status == "success" { + // 获取订单信息以处理库存 + order, err := dao.Order.GetPendingOrder(ctx, req.OrderNo) + if err != nil { + return fmt.Errorf("获取待支付订单失败: %w", err) + } + + if order == nil { + return errors.New("待支付订单不存在") + } + + // 处理库存扣减 + for _, item := range order.OrderItems { + // 判断是否需要扣减库存 + shouldDeduct, saleMode, err := s.shouldDeductStock(ctx, paymentRecord.TenantId.(string), item.AssetID) + if err != nil { + g.Log().Errorf(ctx, "检查库存策略失败,订单:%s,资产:%s,错误:%v", req.OrderNo, item.AssetID, err) + continue + } + + // 如果需要扣减库存,判断扣减时机 + if shouldDeduct { + timing, err := s.getDeductStockTiming(ctx, paymentRecord.TenantId.(string), item.AssetID) + if err != nil { + g.Log().Errorf(ctx, "获取库存扣减时机失败,订单:%s,资产:%s,错误:%v", req.OrderNo, item.AssetID, err) + continue + } + + // 常规售卖:支付成功时扣减库存 + if timing == "payment_success" { + quantity := len(item.Stocks) // 每个库存项数量为1 + if err := s.deductStock(ctx, paymentRecord.TenantId.(string), req.OrderNo, item.AssetID, quantity, fmt.Sprintf("支付成功扣减库存,售卖方式:%s", saleMode)); err != nil { + g.Log().Errorf(ctx, "支付成功扣减库存失败,订单:%s,资产:%s,错误:%v", req.OrderNo, item.AssetID, err) + // 库存扣减失败不影响订单状态更新,但需要记录错误 + continue + } + + g.Log().Infof(ctx, "资产 %s 支付成功扣减库存成功,订单:%s,数量:%d", item.AssetID, req.OrderNo, quantity) + } + } + } + + // 更新订单状态 updateData := bson.M{ "paid_at": time.Now(), "transaction_id": req.TransactionID, @@ -344,3 +388,141 @@ func (s *payment) verifyRefundNotifySignature(req *dto.RefundNotifyReq) bool { // 为了演示,我们总是返回true return true } + +// shouldDeductStock 判断是否需要扣减库存 +func (s *payment) shouldDeductStock(ctx context.Context, tenantID, assetID string) (bool, string, error) { + // 调用assets服务获取资产信息 + assetResp, err := s.getAssetFromAssetService(ctx, tenantID, assetID) + if err != nil { + return false, "", fmt.Errorf("获取资产信息失败: %w", err) + } + + if assetResp == nil { + return false, "", errors.New("资产不存在") + } + + // 无库存限制的资产不需要扣减库存 + if assetResp.UnlimitedStock { + return false, "unlimited_stock", nil + } + + // 预售场景不扣减库存 + if assetResp.SaleMode == "presale" { + return false, "presale_no_deduct", nil + } + + // 有库存限制的常规售卖和秒杀需要扣减库存 + return true, assetResp.SaleMode, nil +} + +// getDeductStockTiming 获取库存扣减时机 +func (s *payment) getDeductStockTiming(ctx context.Context, tenantID, assetID string) (string, error) { + assetResp, err := s.getAssetFromAssetService(ctx, tenantID, assetID) + if err != nil { + return "", fmt.Errorf("获取资产信息失败: %w", err) + } + + if assetResp == nil { + return "", errors.New("资产不存在") + } + + // 根据售卖方式确定扣减时机 + switch assetResp.SaleMode { + case "flash": + return "order_create", nil // 秒杀:下单时扣减库存 + case "regular": + return "payment_success", nil // 常规售卖:支付成功时扣减库存 + default: + return "", errors.New("未知售卖方式") + } +} + +// deductStock 扣减库存 +func (s *payment) deductStock(ctx context.Context, tenantID, orderNo, assetID string, quantity int, reason string) error { + // 调用assets服务扣减库存 + err := s.deductStockFromAssetService(ctx, tenantID, assetID, quantity) + if err != nil { + return fmt.Errorf("扣减库存失败: %w", err) + } + + g.Log().Infof(ctx, "库存扣减成功,订单:%s,资产:%s,数量:%d,原因:%s", orderNo, assetID, quantity, reason) + return nil +} + +// refundStock 回充库存 +func (s *payment) refundStock(ctx context.Context, tenantID, orderNo, assetID string, quantity int, reason string) error { + // 调用资产服务回充库存 + err := s.refundStockFromAssetService(ctx, tenantID, assetID, quantity) + if err != nil { + return fmt.Errorf("回充库存失败: %w", err) + } + + g.Log().Infof(ctx, "库存回充成功,订单:%s,资产:%s,数量:%d,原因:%s", orderNo, assetID, quantity, reason) + return nil +} + +// getAssetFromAssetService 从资产服务获取资产信息 +func (s *payment) getAssetFromAssetService(ctx context.Context, tenantID, assetID string) (*AssetServiceResponse, error) { + // 使用common/http中的封装方法调用资产服务 + // 这里应该调用assets服务的getAsset接口 + // 暂时返回模拟数据 + var assetResp AssetServiceResponse + err := http.Get(ctx, fmt.Sprintf("http://assets-service/internal/asset/%s", assetID), &assetResp) + if err != nil { + return nil, err + } + + return &assetResp, nil +} + +// deductStockFromAssetService 从资产服务扣减库存 +func (s *payment) deductStockFromAssetService(ctx context.Context, tenantID, assetID string, quantity int) error { + // 调用assets服务的扣减库存接口 + req := map[string]interface{}{ + "asset_id": assetID, + "quantity": quantity, + "reason": "订单扣减", + } + + var result struct { + Success bool `json:"success"` + Message string `json:"message"` + } + + err := http.Post(ctx, "http://assets-service/internal/stock/deduct", &result, req) + if err != nil { + return err + } + + if !result.Success { + return errors.New(result.Message) + } + + return nil +} + +// refundStockFromAssetService 从资产服务回充库存 +func (s *payment) refundStockFromAssetService(ctx context.Context, tenantID, assetID string, quantity int) error { + // 调用资产服务的回充库存接口 + req := map[string]interface{}{ + "asset_id": assetID, + "quantity": quantity, + "reason": "订单取消回充", + } + + var result struct { + Success bool `json:"success"` + Message string `json:"message"` + } + + err := http.Post(ctx, "http://assets-service/internal/stock/refund", &result, req) + if err != nil { + return err + } + + if !result.Success { + return errors.New(result.Message) + } + + return nil +}