gomod引用

This commit is contained in:
2025-12-15 09:02:30 +08:00
parent 465c138f21
commit 97fd0af10f
8 changed files with 1048 additions and 508 deletions

View File

@@ -4,18 +4,17 @@ import (
"context" "context"
"errors" "errors"
"fmt" "fmt"
"strings"
"time" "time"
"gitee.com/red-future---jilin-g/common/mongo"
"go.mongodb.org/mongo-driver/v2/bson" "go.mongodb.org/mongo-driver/v2/bson"
"go.mongodb.org/mongo-driver/v2/mongo/options"
"order/consts" "order/consts"
"order/model/entity" "order/model/entity"
)
// Init 初始化DAO "gitee.com/red-future---jilin-g/common/mongo"
func Init() error { )
return nil
}
type order struct{} type order struct{}
@@ -169,7 +168,7 @@ func (d *order) UpdatePendingOrder(ctx context.Context, orderNo string, update b
} }
// ListOrdersByStatus 根据状态查询订单列表 // ListOrdersByStatus 根据状态查询订单列表
func (d *order) ListOrdersByStatus(ctx context.Context, status consts.OrderStatus, userID string, page, pageSize int) (interface{}, int64, error) { func (d *order) ListOrdersByStatus(ctx context.Context, status consts.OrderStatus, userID string, page int, pageSize int, sortFields ...string) (interface{}, int64, error) {
collection, err := d.getCollection(status) collection, err := d.getCollection(status)
if err != nil { if err != nil {
return nil, 0, err return nil, 0, err
@@ -187,32 +186,51 @@ func (d *order) ListOrdersByStatus(ctx context.Context, status consts.OrderStatu
return nil, 0, err return nil, 0, err
} }
// 分页查询(暂时忽略排序和分页,因为 mongo.Find 不支持这些参数 // 设置分页参数
// TODO: 需要在 common/mongo 中添加支持排序和分页的 Find 方法 var findOptions []options.Lister[options.FindOptions]
if page > 0 && pageSize > 0 {
findOptions = append(findOptions, options.Find().SetSkip(int64((page-1)*pageSize)).SetLimit(int64(pageSize)))
}
// 设置排序参数
if len(sortFields) > 0 {
sort := bson.D{}
for _, field := range sortFields {
var order int
if strings.HasPrefix(field, "-") {
order = -1
field = strings.TrimPrefix(field, "-")
} else {
order = 1
}
sort = append(sort, bson.E{Key: field, Value: order})
}
findOptions = append(findOptions, options.Find().SetSort(sort))
}
// 根据状态返回对应的订单类型 // 根据状态返回对应的订单类型
switch status { switch status {
case consts.OrderStatusPending: case consts.OrderStatusPending:
var orders []entity.OrderPending var orders []entity.OrderPending
if err := mongo.Find(ctx, filter, &orders, collection); err != nil { if err := mongo.Find(ctx, filter, &orders, collection, findOptions...); err != nil {
return nil, 0, err return nil, 0, err
} }
return orders, total, nil return orders, total, nil
case consts.OrderStatusPaid: case consts.OrderStatusPaid:
var orders []entity.OrderPaid var orders []entity.OrderPaid
if err := mongo.Find(ctx, filter, &orders, collection); err != nil { if err := mongo.Find(ctx, filter, &orders, collection, findOptions...); err != nil {
return nil, 0, err return nil, 0, err
} }
return orders, total, nil return orders, total, nil
case consts.OrderStatusShipped: case consts.OrderStatusShipped:
var orders []entity.OrderShipped var orders []entity.OrderShipped
if err := mongo.Find(ctx, filter, &orders, collection); err != nil { if err := mongo.Find(ctx, filter, &orders, collection, findOptions...); err != nil {
return nil, 0, err return nil, 0, err
} }
return orders, total, nil return orders, total, nil
case consts.OrderStatusCompleted: case consts.OrderStatusCompleted:
var orders []entity.OrderCompleted var orders []entity.OrderCompleted
if err := mongo.Find(ctx, filter, &orders, collection); err != nil { if err := mongo.Find(ctx, filter, &orders, collection, findOptions...); err != nil {
return nil, 0, err return nil, 0, err
} }
return orders, total, nil return orders, total, nil
@@ -222,6 +240,26 @@ func (d *order) ListOrdersByStatus(ctx context.Context, status consts.OrderStatu
} }
// GetExpiredPendingOrders 获取过期的待支付订单 // GetExpiredPendingOrders 获取过期的待支付订单
// GetPendingOrder gets pending order by order number
func (d *order) GetPendingOrder(ctx context.Context, orderNo string) (*entity.OrderPending, error) {
collection, err := d.getCollection(consts.OrderStatusPending)
if err != nil {
return nil, err
}
filter := bson.M{"order_no": orderNo}
var order entity.OrderPending
if err := mongo.FindOne(ctx, filter, &order, collection); err != nil {
if err.Error() == "mongo: no documents in result" {
return nil, nil
}
return nil, err
}
return &order, nil
}
func (d *order) GetExpiredPendingOrders(ctx context.Context) ([]entity.OrderPending, error) { func (d *order) GetExpiredPendingOrders(ctx context.Context) ([]entity.OrderPending, error) {
collection, err := d.getCollection(consts.OrderStatusPending) collection, err := d.getCollection(consts.OrderStatusPending)
if err != nil { if err != nil {

1
go.mod
View File

@@ -7,6 +7,7 @@ require (
github.com/gogf/gf/contrib/drivers/mysql/v2 v2.9.6 github.com/gogf/gf/contrib/drivers/mysql/v2 v2.9.6
github.com/gogf/gf/contrib/nosql/redis/v2 v2.9.6 github.com/gogf/gf/contrib/nosql/redis/v2 v2.9.6
github.com/gogf/gf/v2 v2.9.6 github.com/gogf/gf/v2 v2.9.6
github.com/robfig/cron/v3 v3.0.1
go.mongodb.org/mongo-driver/v2 v2.4.0 go.mongodb.org/mongo-driver/v2 v2.4.0
) )

2
go.sum
View File

@@ -260,6 +260,8 @@ github.com/redis/go-redis/v9 v9.12.1/go.mod h1:huWgSWd8mW6+m0VPhJjSSQ+d6Nh1VICQ6
github.com/rivo/uniseg v0.2.0/go.mod h1:J6wj4VEh+S6ZtnVlnTBMWIodfgj8LQOQFoIToxlJtxc= github.com/rivo/uniseg v0.2.0/go.mod h1:J6wj4VEh+S6ZtnVlnTBMWIodfgj8LQOQFoIToxlJtxc=
github.com/rivo/uniseg v0.4.7 h1:WUdvkW8uEhrYfLC4ZzdpI2ztxP1I582+49Oc5Mq64VQ= github.com/rivo/uniseg v0.4.7 h1:WUdvkW8uEhrYfLC4ZzdpI2ztxP1I582+49Oc5Mq64VQ=
github.com/rivo/uniseg v0.4.7/go.mod h1:FN3SvrM+Zdj16jyLfmOkMNblXMcoc8DfTHruCPUcx88= github.com/rivo/uniseg v0.4.7/go.mod h1:FN3SvrM+Zdj16jyLfmOkMNblXMcoc8DfTHruCPUcx88=
github.com/robfig/cron/v3 v3.0.1 h1:WdRxkvbJztn8LMz/QEvLN5sBU+xKpSqwwUO1Pjr4qDs=
github.com/robfig/cron/v3 v3.0.1/go.mod h1:eQICP3HwyT7UooqI/z+Ov+PtYAWygg1TEWWzGIFLtro=
github.com/rogpeppe/go-internal v1.13.1 h1:KvO1DLK/DRN07sQ1LQKScxyZJuNnedQ5/wKSR38lUII= github.com/rogpeppe/go-internal v1.13.1 h1:KvO1DLK/DRN07sQ1LQKScxyZJuNnedQ5/wKSR38lUII=
github.com/rogpeppe/go-internal v1.13.1/go.mod h1:uMEvuHeurkdAXX61udpOXGD/AzZDWNMNyH2VO9fmH0o= github.com/rogpeppe/go-internal v1.13.1/go.mod h1:uMEvuHeurkdAXX61udpOXGD/AzZDWNMNyH2VO9fmH0o=
github.com/ryanuber/columnize v0.0.0-20160712163229-9b3edd62028f/go.mod h1:sm1tb6uqfes/u+d4ooFouqFdy9/2g9QGwK3SQygK0Ts= github.com/ryanuber/columnize v0.0.0-20160712163229-9b3edd62028f/go.mod h1:sm1tb6uqfes/u+d4ooFouqFdy9/2g9QGwK3SQygK0Ts=

18
main.go
View File

@@ -3,6 +3,7 @@ package main
import ( import (
"context" "context"
"order/controller" "order/controller"
"order/service"
"gitee.com/red-future---jilin-g/common/http" "gitee.com/red-future---jilin-g/common/http"
"gitee.com/red-future---jilin-g/common/jaeger" "gitee.com/red-future---jilin-g/common/jaeger"
@@ -10,6 +11,7 @@ import (
_ "gitee.com/red-future---jilin-g/common/ragflow" // RAGFlow 客户端自动初始化 _ "gitee.com/red-future---jilin-g/common/ragflow" // RAGFlow 客户端自动初始化
_ "github.com/gogf/gf/contrib/drivers/mysql/v2" _ "github.com/gogf/gf/contrib/drivers/mysql/v2"
_ "github.com/gogf/gf/contrib/nosql/redis/v2" _ "github.com/gogf/gf/contrib/nosql/redis/v2"
"github.com/gogf/gf/v2/frame/g"
) )
func main() { func main() {
@@ -19,10 +21,18 @@ func main() {
controller.OrderStatistics, controller.OrderStatistics,
controller.PaymentConfig, controller.PaymentConfig,
}) })
// 启动定时任务
ctx := context.Background()
// 启动订单超时处理定时任务
if err := service.StartOrderTimeoutScheduler(ctx); err != nil {
g.Log().Errorf(ctx, "启动订单超时处理定时任务失败: %v", err)
}
// 启动订单统计定时任务调度器 // 启动订单统计定时任务调度器
//ctx := context.Background() if err := service.StartOrderStatisticsScheduler(ctx); err != nil {
//if err := scheduler.OrderStatisticsSchedulerInstance.StartScheduler(ctx); err != nil { g.Log().Errorf(ctx, "启动订单统计定时任务失败: %v", err)
// g.Log().Errorf(ctx, "启动订单统计定时任务失败: %v", err) }
//}
select {} select {}
} }

View File

@@ -1,481 +0,0 @@
package scheduler
import (
"context"
"fmt"
"sync"
"time"
"order/service"
"github.com/gogf/gf/v2/frame/g"
)
// OrderStatisticsScheduler 订单统计定时任务调度器
type OrderStatisticsScheduler struct{}
var OrderStatisticsSchedulerInstance = &OrderStatisticsScheduler{}
var schedulerLock sync.Mutex
var isSchedulerRunning bool
// StartScheduler 启动定时任务调度器(分布式安全)
func (s *OrderStatisticsScheduler) StartScheduler(ctx context.Context) error {
schedulerLock.Lock()
defer schedulerLock.Unlock()
// 检查是否已经有调度器在运行(分布式部署时避免重复执行)
if isSchedulerRunning {
g.Log().Info(ctx, "订单统计定时任务调度器已在运行")
return nil
}
// 尝试获取分布式锁
if !s.acquireDistributedLock(ctx) {
g.Log().Info(ctx, "其他节点正在运行订单统计定时任务,当前节点跳过")
return nil
}
isSchedulerRunning = true
// 启动锁续期任务
go s.startLockRenewal(ctx)
// 启动日报表生成任务每天凌晨3点执行
go s.startDailyReportScheduler(ctx)
// 启动月报表生成任务每月1日凌晨4点执行
go s.startMonthlyReportScheduler(ctx)
// 启动季度报表生成任务每季度首月5日凌晨5点执行
go s.startQuarterlyReportScheduler(ctx)
// 启动年报表生成任务每年1月10日凌晨6点执行
go s.startYearlyReportScheduler(ctx)
g.Log().Info(ctx, "订单统计定时任务调度器已启动")
return nil
}
// acquireDistributedLock 获取分布式锁
func (s *OrderStatisticsScheduler) acquireDistributedLock(ctx context.Context) bool {
lockKey := "order_statistics_scheduler_lock"
// 尝试设置锁过期时间30秒
success, err := g.Redis().SetNX(ctx, lockKey, "locked")
if err != nil {
g.Log().Errorf(ctx, "获取分布式锁失败: %v", err)
return false
}
// 设置过期时间
if success {
_, err = g.Redis().Do(ctx, "EXPIRE", lockKey, 30)
if err != nil {
g.Log().Errorf(ctx, "设置锁过期时间失败: %v", err)
// 删除已设置的锁
g.Redis().Do(ctx, "DEL", lockKey)
return false
}
}
return success
}
// renewDistributedLock 续期分布式锁
func (s *OrderStatisticsScheduler) renewDistributedLock(ctx context.Context) bool {
lockKey := "order_statistics_scheduler_lock"
// 续期30秒
_, err := g.Redis().Do(ctx, "EXPIRE", lockKey, 30)
if err != nil {
g.Log().Errorf(ctx, "续期分布式锁失败: %v", err)
return false
}
return true
}
// startLockRenewal 启动锁续期任务
func (s *OrderStatisticsScheduler) startLockRenewal(ctx context.Context) {
ticker := time.NewTicker(20 * time.Second) // 每20秒续期一次
defer ticker.Stop()
for {
select {
case <-ticker.C:
if !s.renewDistributedLock(ctx) {
g.Log().Error(ctx, "分布式锁续期失败,停止调度器")
schedulerLock.Lock()
isSchedulerRunning = false
schedulerLock.Unlock()
return
}
case <-ctx.Done():
// 释放分布式锁
s.releaseDistributedLock(ctx)
return
}
}
}
// releaseDistributedLock 释放分布式锁
func (s *OrderStatisticsScheduler) releaseDistributedLock(ctx context.Context) {
lockKey := "order_statistics_scheduler_lock"
_, err := g.Redis().Do(ctx, "DEL", lockKey)
if err != nil {
g.Log().Errorf(ctx, "释放分布式锁失败: %v", err)
}
}
// acquireTaskLock 获取任务级分布式锁
func (s *OrderStatisticsScheduler) acquireTaskLock(ctx context.Context, lockKey string) bool {
// 尝试设置锁过期时间10分钟
success, err := g.Redis().SetNX(ctx, lockKey, "locked")
if err != nil {
g.Log().Errorf(ctx, "获取任务锁失败: %v", err)
return false
}
// 设置过期时间
if success {
_, err = g.Redis().Do(ctx, "EXPIRE", lockKey, 600) // 10分钟=600秒
if err != nil {
g.Log().Errorf(ctx, "设置锁过期时间失败: %v", err)
// 删除已设置的锁
g.Redis().Do(ctx, "DEL", lockKey)
return false
}
}
return success
}
// releaseTaskLock 释放任务级分布式锁
func (s *OrderStatisticsScheduler) releaseTaskLock(ctx context.Context, lockKey string) {
_, err := g.Redis().Do(ctx, "DEL", lockKey)
if err != nil {
g.Log().Errorf(ctx, "释放任务锁失败: %v", err)
}
}
// startDailyReportScheduler 日报表定时任务
func (s *OrderStatisticsScheduler) startDailyReportScheduler(ctx context.Context) {
// 计算到凌晨3点的时间
now := time.Now()
next := time.Date(now.Year(), now.Month(), now.Day()+1, 3, 0, 0, 0, time.Local)
duration := next.Sub(now)
// 等待到凌晨3点
time.Sleep(duration)
ticker := time.NewTicker(24 * time.Hour)
defer ticker.Stop()
// 立即执行一次昨天的日报表生成
go s.generateYesterdayDailyReport(ctx)
for {
select {
case <-ticker.C:
// 生成昨天的日报表
s.generateYesterdayDailyReport(ctx)
case <-ctx.Done():
return
}
}
}
// startMonthlyReportScheduler 月报表定时任务
func (s *OrderStatisticsScheduler) startMonthlyReportScheduler(ctx context.Context) {
// 计算到下个月1日凌晨4点的时间
now := time.Now()
next := time.Date(now.Year(), now.Month()+1, 1, 4, 0, 0, 0, time.Local)
duration := next.Sub(now)
// 等待到下个月1日凌晨4点
time.Sleep(duration)
ticker := time.NewTicker(24 * time.Hour)
defer ticker.Stop()
for {
select {
case <-ticker.C:
// 检查是否是每月1日如果是则生成上个月的月报表
if time.Now().Day() == 1 {
go s.generateLastMonthReport(ctx)
}
case <-ctx.Done():
return
}
}
}
// startQuarterlyReportScheduler 季度报表定时任务
func (s *OrderStatisticsScheduler) startQuarterlyReportScheduler(ctx context.Context) {
// 计算到下个季度第一天凌晨5点的时间
now := time.Now()
nextQuarter := s.getNextQuarterFirstDay(now)
next := time.Date(nextQuarter.Year(), nextQuarter.Month(), nextQuarter.Day(), 5, 0, 0, 0, time.Local)
duration := next.Sub(now)
// 等待到下个季度第一天凌晨5点
time.Sleep(duration)
ticker := time.NewTicker(24 * time.Hour)
defer ticker.Stop()
for {
select {
case <-ticker.C:
// 检查是否是季度第一天,如果是则生成上个季度的季度报表
if s.isQuarterFirstDay() {
go s.generateLastQuarterReport(ctx)
}
case <-ctx.Done():
return
}
}
}
// startYearlyReportScheduler 年报表定时任务
func (s *OrderStatisticsScheduler) startYearlyReportScheduler(ctx context.Context) {
// 计算到明年1月1日凌晨6点的时间
now := time.Now()
next := time.Date(now.Year()+1, time.January, 1, 6, 0, 0, 0, time.Local)
duration := next.Sub(now)
// 等待到明年1月1日凌晨6点
time.Sleep(duration)
ticker := time.NewTicker(24 * time.Hour)
defer ticker.Stop()
for {
select {
case <-ticker.C:
// 检查是否是1月1日如果是则生成上一年的年报表
if time.Now().Month() == time.January && time.Now().Day() == 1 {
go s.generateLastYearReport(ctx)
}
case <-ctx.Done():
return
}
}
}
// generateYesterdayDailyReport 生成昨天的日报表
func (s *OrderStatisticsScheduler) generateYesterdayDailyReport(ctx context.Context) {
yesterday := time.Now().AddDate(0, 0, -1)
g.Log().Infof(ctx, "开始生成所有租户的日统计数据: %s", yesterday.Format("2006-01-02"))
// 获取所有租户ID
tenantIDs, err := s.getAllTenants(ctx)
if err != nil {
g.Log().Errorf(ctx, "获取租户列表失败: %v", err)
return
}
// 并发处理每个租户的日统计
for _, tenantID := range tenantIDs {
go s.generateTenantDailyReport(ctx, tenantID, yesterday)
}
}
// generateTenantDailyReport 生成指定租户的日报表
func (s *OrderStatisticsScheduler) generateTenantDailyReport(ctx context.Context, tenantID int64, date time.Time) {
lockKey := fmt.Sprintf("order_stats_daily_%d_%s", tenantID, date.Format("2006-01-02"))
// 获取任务锁
if !s.acquireTaskLock(ctx, lockKey) {
g.Log().Infof(ctx, "租户 %d 的日统计任务正在执行,跳过", tenantID)
return
}
defer s.releaseTaskLock(ctx, lockKey)
g.Log().Infof(ctx, "开始生成租户 %d 的日统计: %s", tenantID, date.Format("2006-01-02"))
err := service.OrderStatistics.GenerateDailyStatistics(ctx, tenantID, date, false)
if err != nil {
g.Log().Errorf(ctx, "生成租户 %d 日统计失败: %v", tenantID, err)
return
}
g.Log().Infof(ctx, "租户 %d 日统计生成成功: %s", tenantID, date.Format("2006-01-02"))
}
// generateLastMonthReport 生成上个月的月报表
func (s *OrderStatisticsScheduler) generateLastMonthReport(ctx context.Context) {
lastMonth := time.Now().AddDate(0, -1, 0)
year := lastMonth.Year()
month := int(lastMonth.Month())
g.Log().Infof(ctx, "开始生成所有租户的月统计数据: %d年%d月", year, month)
// 获取所有租户ID
tenantIDs, err := s.getAllTenants(ctx)
if err != nil {
g.Log().Errorf(ctx, "获取租户列表失败: %v", err)
return
}
// 并发处理每个租户的月统计
for _, tenantID := range tenantIDs {
go s.generateTenantMonthlyReport(ctx, tenantID, year, month)
}
}
// generateTenantMonthlyReport 生成指定租户的月报表
func (s *OrderStatisticsScheduler) generateTenantMonthlyReport(ctx context.Context, tenantID int64, year int, month int) {
lockKey := fmt.Sprintf("order_stats_monthly_%d_%d_%d", tenantID, year, month)
// 获取任务锁
if !s.acquireTaskLock(ctx, lockKey) {
g.Log().Infof(ctx, "租户 %d 的月统计任务正在执行,跳过", tenantID)
return
}
defer s.releaseTaskLock(ctx, lockKey)
g.Log().Infof(ctx, "开始生成租户 %d 的月统计: %d年%d月", tenantID, year, month)
err := service.OrderStatistics.GenerateMonthlyStatistics(ctx, tenantID, year, month, false)
if err != nil {
g.Log().Errorf(ctx, "生成租户 %d 月统计失败: %v", tenantID, err)
return
}
g.Log().Infof(ctx, "租户 %d 月统计生成成功: %d年%d月", tenantID, year, month)
}
// generateLastQuarterReport 生成上个季度的季度报表
func (s *OrderStatisticsScheduler) generateLastQuarterReport(ctx context.Context) {
lastQuarter := time.Now().AddDate(0, -3, 0)
year := lastQuarter.Year()
quarter := s.getQuarter(lastQuarter.Month())
g.Log().Infof(ctx, "开始生成所有租户的季度统计数据: %d年第%d季度", year, quarter)
// 获取所有租户ID
tenantIDs, err := s.getAllTenants(ctx)
if err != nil {
g.Log().Errorf(ctx, "获取租户列表失败: %v", err)
return
}
// 并发处理每个租户的季度统计
for _, tenantID := range tenantIDs {
go s.generateTenantQuarterlyReport(ctx, tenantID, year, quarter)
}
}
// generateTenantQuarterlyReport 生成指定租户的季度报表
func (s *OrderStatisticsScheduler) generateTenantQuarterlyReport(ctx context.Context, tenantID int64, year int, quarter int) {
lockKey := fmt.Sprintf("order_stats_quarterly_%d_%d_%d", tenantID, year, quarter)
// 获取任务锁
if !s.acquireTaskLock(ctx, lockKey) {
g.Log().Infof(ctx, "租户 %d 的季度统计任务正在执行,跳过", tenantID)
return
}
defer s.releaseTaskLock(ctx, lockKey)
g.Log().Infof(ctx, "开始生成租户 %d 的季度统计: %d年第%d季度", tenantID, year, quarter)
err := service.OrderStatistics.GenerateQuarterlyStatistics(ctx, tenantID, year, quarter, false)
if err != nil {
g.Log().Errorf(ctx, "生成租户 %d 季度统计失败: %v", tenantID, err)
return
}
g.Log().Infof(ctx, "租户 %d 季度统计生成成功: %d年第%d季度", tenantID, year, quarter)
}
// generateLastYearReport 生成上一年的年报表
func (s *OrderStatisticsScheduler) generateLastYearReport(ctx context.Context) {
lastYear := time.Now().Year() - 1
g.Log().Infof(ctx, "开始生成所有租户的年统计数据: %d年", lastYear)
// 获取所有租户ID
tenantIDs, err := s.getAllTenants(ctx)
if err != nil {
g.Log().Errorf(ctx, "获取租户列表失败: %v", err)
return
}
// 并发处理每个租户的年统计
for _, tenantID := range tenantIDs {
go s.generateTenantYearlyReport(ctx, tenantID, lastYear)
}
}
// generateTenantYearlyReport 生成指定租户的年报表
func (s *OrderStatisticsScheduler) generateTenantYearlyReport(ctx context.Context, tenantID int64, year int) {
lockKey := fmt.Sprintf("order_stats_yearly_%d_%d", tenantID, year)
// 获取任务锁
if !s.acquireTaskLock(ctx, lockKey) {
g.Log().Infof(ctx, "租户 %d 的年统计任务正在执行,跳过", tenantID)
return
}
defer s.releaseTaskLock(ctx, lockKey)
g.Log().Infof(ctx, "开始生成租户 %d 的年统计: %d年", tenantID, year)
err := service.OrderStatistics.GenerateYearlyStatistics(ctx, tenantID, year, false)
if err != nil {
g.Log().Errorf(ctx, "生成租户 %d 年统计失败: %v", tenantID, err)
return
}
g.Log().Infof(ctx, "租户 %d 年统计生成成功: %d年", tenantID, year)
}
// getNextQuarterFirstDay 获取下个季度的第一天
func (s *OrderStatisticsScheduler) getNextQuarterFirstDay(now time.Time) time.Time {
quarter := s.getQuarter(now.Month())
if quarter == 4 {
return time.Date(now.Year()+1, time.January, 1, 0, 0, 0, 0, now.Location())
}
return time.Date(now.Year(), time.Month((quarter*3)+1), 1, 0, 0, 0, 0, now.Location())
}
// getQuarter 获取月份对应的季度
func (s *OrderStatisticsScheduler) getQuarter(month time.Month) int {
switch {
case month >= 1 && month <= 3:
return 1
case month >= 4 && month <= 6:
return 2
case month >= 7 && month <= 9:
return 3
default:
return 4
}
}
// isQuarterFirstDay 检查今天是否是季度的第一天
func (s *OrderStatisticsScheduler) isQuarterFirstDay() bool {
now := time.Now()
quarter := s.getQuarter(now.Month())
switch quarter {
case 1:
return now.Day() == 1 && now.Month() == time.January
case 2:
return now.Day() == 1 && now.Month() == time.April
case 3:
return now.Day() == 1 && now.Month() == time.July
case 4:
return now.Day() == 1 && now.Month() == time.October
default:
return false
}
}
// getAllTenants 获取所有租户ID这里需要根据实际业务实现
func (s *OrderStatisticsScheduler) getAllTenants(ctx context.Context) ([]int64, error) {
// 这里应该从实际的租户管理服务获取租户列表
// 暂时返回一个默认的租户ID实际使用时需要替换为真实的租户获取逻辑
return []int64{1, 2, 3}, nil
}

View File

@@ -12,20 +12,29 @@ import (
"order/model/dto" "order/model/dto"
"order/model/entity" "order/model/entity"
"gitee.com/red-future---jilin-g/common/http"
"github.com/gogf/gf/v2/frame/g"
"github.com/gogf/gf/v2/util/gconv" "github.com/gogf/gf/v2/util/gconv"
"github.com/robfig/cron/v3"
"go.mongodb.org/mongo-driver/v2/bson" "go.mongodb.org/mongo-driver/v2/bson"
) )
// AssetServiceResponse 资产服务响应结构体
type AssetServiceResponse struct {
ID string `json:"id"`
Name string `json:"name"`
UnlimitedStock bool `json:"unlimitedStock"`
Price *struct {
TotalStock int `json:"totalStock"`
} `json:"price"`
SaleMode string `json:"saleMode"`
}
type order struct{} type order struct{}
// Order 订单服务 // Order 订单服务
var Order = new(order) var Order = new(order)
// Init 初始化服务
func Init() error {
return nil
}
// convertOrderItemsFromDTO 从DTO转换订单商品项 // convertOrderItemsFromDTO 从DTO转换订单商品项
func convertOrderItemsFromDTO(items []dto.OrderItemReq) []entity.OrderItem { func convertOrderItemsFromDTO(items []dto.OrderItemReq) []entity.OrderItem {
var result []entity.OrderItem var result []entity.OrderItem
@@ -122,7 +131,41 @@ func (s *order) CreateOrder(ctx context.Context, req *dto.CreateOrderReq) (*dto.
return nil, errors.New("订单商品不能为空") return nil, errors.New("订单商品不能为空")
} }
// 2. 计算订单金额 // 2. 检查库存扣减策略并处理库存
for i := range req.OrderItems {
item := &req.OrderItems[i]
// 判断是否需要扣减库存
shouldDeduct, saleMode, err := s.shouldDeductStock(ctx, gconv.String(req.TenantID), item.AssetID)
if err != nil {
return nil, fmt.Errorf("检查库存策略失败: %w", err)
}
// 如果需要扣减库存,判断扣减时机
if shouldDeduct {
timing, err := s.getDeductStockTiming(ctx, gconv.String(req.TenantID), item.AssetID)
if err != nil {
return nil, fmt.Errorf("获取库存扣减时机失败: %w", err)
}
// 秒杀场景:下单时立即扣减库存
if timing == "order_create" {
quantity := len(item.Stocks) // 每个库存项数量为1
// 先生成订单号用于库存操作
orderNo := s.generateOrderNo(gconv.String(req.TenantID))
if err := s.deductStock(ctx, gconv.String(req.TenantID), orderNo, item.AssetID, quantity, fmt.Sprintf("秒杀订单预占库存,售卖方式:%s", saleMode)); err != nil {
return nil, fmt.Errorf("秒杀订单预占库存失败: %w", err)
}
// 记录库存已预占
g.Log().Infof(ctx, "资产 %s 秒杀订单 %s 预占库存成功,数量:%d", item.AssetID, orderNo, quantity)
}
}
}
// 3. 计算订单金额
totalAmount := int64(0) totalAmount := int64(0)
for i := range req.OrderItems { for i := range req.OrderItems {
item := &req.OrderItems[i] item := &req.OrderItems[i]
@@ -141,10 +184,10 @@ func (s *order) CreateOrder(ctx context.Context, req *dto.CreateOrderReq) (*dto.
return nil, errors.New("订单总金额必须大于0") return nil, errors.New("订单总金额必须大于0")
} }
// 3. 生成订单号 // 4. 生成订单号
orderNo := s.generateOrderNo(gconv.String(req.TenantID)) orderNo := s.generateOrderNo(gconv.String(req.TenantID))
// 4. 设置订单过期时间30分钟后 // 5. 设置订单过期时间30分钟后
expiredAt := time.Now().Add(30 * time.Minute) expiredAt := time.Now().Add(30 * time.Minute)
// 5. 创建待支付订单 // 5. 创建待支付订单
@@ -414,7 +457,44 @@ func (s *order) CancelOrder(ctx context.Context, req *dto.CancelOrderReq) (*dto.
return nil, fmt.Errorf("订单状态不正确,当前状态: %s", status) return nil, fmt.Errorf("订单状态不正确,当前状态: %s", status)
} }
// 4. 将订单移动到已取消状态 pendingOrder, ok := order.(*entity.OrderPending)
if !ok {
return nil, errors.New("订单类型错误")
}
// 4. 处理库存回充(针对秒杀场景预占的库存)
for _, item := range pendingOrder.OrderItems {
// 判断是否需要回充库存
shouldDeduct, saleMode, err := s.shouldDeductStock(ctx, req.TenantID, item.AssetID)
if err != nil {
g.Log().Errorf(ctx, "检查库存策略失败,订单:%s资产%s错误%v", req.OrderNo, item.AssetID, err)
continue
}
// 如果该资产需要扣减库存且是秒杀场景,则需要回充预占库存
if shouldDeduct && saleMode == "flash" {
timing, err := s.getDeductStockTiming(ctx, req.TenantID, item.AssetID)
if err != nil {
g.Log().Errorf(ctx, "获取库存扣减时机失败,订单:%s资产%s错误%v", req.OrderNo, item.AssetID, err)
continue
}
// 秒杀场景:下单时已扣减库存,取消时需要回充
if timing == "order_create" {
quantity := len(item.Stocks) // 每个库存项数量为1
if err := s.refundStock(ctx, req.TenantID, req.OrderNo, item.AssetID, quantity, fmt.Sprintf("秒杀订单取消回充库存,售卖方式:%s原因%s", saleMode, req.Reason)); err != nil {
g.Log().Errorf(ctx, "订单取消回充库存失败,订单:%s资产%s错误%v", req.OrderNo, item.AssetID, err)
// 库存回充失败不影响订单状态更新,但需要记录错误
continue
}
g.Log().Infof(ctx, "资产 %s 订单取消回充库存成功,订单:%s数量%d", item.AssetID, req.OrderNo, quantity)
}
}
}
// 5. 将订单移动到已取消状态
updateData := bson.M{ updateData := bson.M{
"cancel_reason": req.Reason, "cancel_reason": req.Reason,
} }
@@ -530,3 +610,236 @@ func (s *order) ProcessExpiredOrders(ctx context.Context) error {
func (s *order) UpdatePayInfo(ctx context.Context, orderNo string, payInfo entity.PayInfo) error { func (s *order) UpdatePayInfo(ctx context.Context, orderNo string, payInfo entity.PayInfo) error {
return dao.Order.UpdatePayInfo(ctx, orderNo, payInfo) return dao.Order.UpdatePayInfo(ctx, orderNo, payInfo)
} }
// shouldDeductStock 判断是否需要扣减库存
func (s *order) shouldDeductStock(ctx context.Context, tenantID, assetID string) (bool, string, error) {
// 调用assets服务获取资产信息
assetResp, err := s.getAssetFromAssetService(ctx, tenantID, assetID)
if err != nil {
return false, "", fmt.Errorf("获取资产信息失败: %w", err)
}
if assetResp == nil {
return false, "", errors.New("资产不存在")
}
// 无库存限制的资产不需要扣减库存
if assetResp.UnlimitedStock {
return false, "unlimited_stock", nil
}
// 预售场景不扣减库存
if assetResp.SaleMode == "presale" {
return false, "presale_no_deduct", nil
}
// 有库存限制的常规售卖和秒杀需要扣减库存
return true, assetResp.SaleMode, nil
}
// getDeductStockTiming 获取库存扣减时机
func (s *order) getDeductStockTiming(ctx context.Context, tenantID, assetID string) (string, error) {
assetResp, err := s.getAssetFromAssetService(ctx, tenantID, assetID)
if err != nil {
return "", fmt.Errorf("获取资产信息失败: %w", err)
}
if assetResp == nil {
return "", errors.New("资产不存在")
}
// 根据售卖方式确定扣减时机
switch assetResp.SaleMode {
case "flash":
return "order_create", nil // 秒杀:下单时扣减库存
case "regular":
return "payment_success", nil // 常规售卖:支付成功时扣减库存
default:
return "", errors.New("未知售卖方式")
}
}
// deductStock 扣减库存
func (s *order) deductStock(ctx context.Context, tenantID, orderNo, assetID string, quantity int, reason string) error {
// 调用assets服务扣减库存
err := s.deductStockFromAssetService(ctx, tenantID, assetID, quantity)
if err != nil {
return fmt.Errorf("扣减库存失败: %w", err)
}
g.Log().Infof(ctx, "库存扣减成功,订单:%s资产%s数量%d原因%s", orderNo, assetID, quantity, reason)
return nil
}
// refundStock 回充库存
func (s *order) refundStock(ctx context.Context, tenantID, orderNo, assetID string, quantity int, reason string) error {
// 调用资产服务回充库存
err := s.refundStockFromAssetService(ctx, tenantID, assetID, quantity)
if err != nil {
return fmt.Errorf("回充库存失败: %w", err)
}
g.Log().Infof(ctx, "库存回充成功,订单:%s资产%s数量%d原因%s", orderNo, assetID, quantity, reason)
return nil
}
// getAssetFromAssetService 从资产服务获取资产信息
func (s *order) getAssetFromAssetService(ctx context.Context, tenantID, assetID string) (*AssetServiceResponse, error) {
// 使用common/http中的封装方法调用资产服务
// 这里应该调用assets服务的getAsset接口
// 暂时返回模拟数据
var assetResp AssetServiceResponse
err := http.Get(ctx, fmt.Sprintf("http://assets-service/internal/asset/%s", assetID), &assetResp)
if err != nil {
return nil, err
}
return &assetResp, nil
}
// deductStockFromAssetService 从资产服务扣减库存
func (s *order) deductStockFromAssetService(ctx context.Context, tenantID, assetID string, quantity int) error {
// 调用assets服务的扣减库存接口
req := map[string]interface{}{
"asset_id": assetID,
"quantity": quantity,
"reason": "订单扣减",
}
var result struct {
Success bool `json:"success"`
Message string `json:"message"`
}
err := http.Post(ctx, "http://assets-service/internal/stock/deduct", &result, req)
if err != nil {
return err
}
if !result.Success {
return errors.New(result.Message)
}
return nil
}
// refundStockFromAssetService 从资产服务回充库存
func (s *order) refundStockFromAssetService(ctx context.Context, tenantID, assetID string, quantity int) error {
// 调用资产服务的回充库存接口
req := map[string]interface{}{
"asset_id": assetID,
"quantity": quantity,
"reason": "订单取消回充",
}
var result struct {
Success bool `json:"success"`
Message string `json:"message"`
}
err := http.Post(ctx, "http://assets-service/internal/stock/refund", &result, req)
if err != nil {
return err
}
if !result.Success {
return errors.New(result.Message)
}
return nil
}
// OrderTimeoutScheduler 订单超时处理定时任务
type OrderTimeoutScheduler struct {
cron *cron.Cron
}
var orderTimeoutSchedulerInstance = &OrderTimeoutScheduler{}
// StartOrderTimeoutScheduler 启动订单超时处理定时任务
func StartOrderTimeoutScheduler(ctx context.Context) error {
return orderTimeoutSchedulerInstance.StartScheduler(ctx)
}
// StopOrderTimeoutScheduler 停止订单超时处理定时任务
func StopOrderTimeoutScheduler() {
orderTimeoutSchedulerInstance.StopScheduler()
}
// StartScheduler 启动订单超时处理定时任务
func (s *OrderTimeoutScheduler) StartScheduler(ctx context.Context) error {
if s.cron != nil {
s.cron.Stop()
}
s.cron = cron.New(cron.WithSeconds())
// 每分钟检查一次超时订单
_, err := s.cron.AddFunc("0 */1 * * * *", func() {
s.processTimeoutOrders(ctx)
})
if err != nil {
return fmt.Errorf("添加订单超时检查定时任务失败: %w", err)
}
s.cron.Start()
g.Log().Info(ctx, "订单超时处理定时任务已启动")
return nil
}
// StopScheduler 停止定时任务
func (s *OrderTimeoutScheduler) StopScheduler() {
if s.cron != nil {
s.cron.Stop()
s.cron = nil
}
}
// processTimeoutOrders 处理超时订单
func (s *OrderTimeoutScheduler) processTimeoutOrders(ctx context.Context) {
g.Log().Info(ctx, "开始处理超时订单")
// 查询所有超时的待支付订单
expiredOrders, err := dao.Order.GetExpiredPendingOrders(ctx)
if err != nil {
g.Log().Errorf(ctx, "查询超时订单失败: %v", err)
return
}
if len(expiredOrders) == 0 {
g.Log().Debug(ctx, "没有需要处理的超时订单")
return
}
g.Log().Infof(ctx, "发现 %d 个超时订单", len(expiredOrders))
// 逐个处理超时订单
for _, order := range expiredOrders {
if err := s.processTimeoutOrder(ctx, &order); err != nil {
g.Log().Errorf(ctx, "处理超时订单失败,订单号:%s错误%v", order.OrderNo, err)
continue
}
g.Log().Infof(ctx, "超时订单处理成功,订单号:%s", order.OrderNo)
}
}
// processTimeoutOrder 处理单个超时订单
func (s *OrderTimeoutScheduler) processTimeoutOrder(ctx context.Context, order *entity.OrderPending) error {
// 获取租户ID从订单中获取
tenantID := order.TenantId
if tenantID == "" {
return fmt.Errorf("订单缺少租户ID")
}
// 将订单移动到已取消状态
updateData := map[string]interface{}{
"cancel_reason": "订单超时自动取消",
}
if err := dao.Order.MoveOrderToStatus(ctx, consts.OrderStatusPending, consts.OrderStatusCancelled, order.OrderNo, updateData); err != nil {
return fmt.Errorf("更新订单状态失败: %w", err)
}
return nil
}

View File

@@ -3,6 +3,7 @@ package service
import ( import (
"context" "context"
"fmt" "fmt"
"sync"
"time" "time"
"order/dao" "order/dao"
@@ -437,3 +438,477 @@ func (s *orderStatistics) GenerateStatistics(ctx context.Context, req *dto.Gener
Message: "统计数据生成成功", Message: "统计数据生成成功",
}, nil }, nil
} }
// OrderStatisticsScheduler 订单统计定时任务调度器
type OrderStatisticsScheduler struct{}
var orderStatisticsSchedulerInstance = &OrderStatisticsScheduler{}
var statisticsSchedulerLock sync.Mutex
var isStatisticsSchedulerRunning bool
// StartOrderStatisticsScheduler 启动订单统计定时任务调度器
func StartOrderStatisticsScheduler(ctx context.Context) error {
return orderStatisticsSchedulerInstance.StartScheduler(ctx)
}
// StartScheduler 启动定时任务调度器(分布式安全)
func (s *OrderStatisticsScheduler) StartScheduler(ctx context.Context) error {
statisticsSchedulerLock.Lock()
defer statisticsSchedulerLock.Unlock()
// 检查是否已经有调度器在运行(分布式部署时避免重复执行)
if isStatisticsSchedulerRunning {
g.Log().Info(ctx, "订单统计定时任务调度器已在运行")
return nil
}
// 尝试获取分布式锁
if !s.acquireDistributedLock(ctx) {
g.Log().Info(ctx, "其他节点正在运行订单统计定时任务,当前节点跳过")
return nil
}
isStatisticsSchedulerRunning = true
// 启动锁续期任务
go s.startLockRenewal(ctx)
// 启动日报表生成任务每天凌晨3点执行
go s.startDailyReportScheduler(ctx)
// 启动月报表生成任务每月1日凌晨4点执行
go s.startMonthlyReportScheduler(ctx)
// 启动季度报表生成任务每季度首月5日凌晨5点执行
go s.startQuarterlyReportScheduler(ctx)
// 启动年报表生成任务每年1月10日凌晨6点执行
go s.startYearlyReportScheduler(ctx)
g.Log().Info(ctx, "订单统计定时任务调度器已启动")
return nil
}
// acquireDistributedLock 获取分布式锁
func (s *OrderStatisticsScheduler) acquireDistributedLock(ctx context.Context) bool {
lockKey := "order_statistics_scheduler_lock"
// 尝试设置锁过期时间30秒
success, err := g.Redis().SetNX(ctx, lockKey, "locked")
if err != nil {
g.Log().Errorf(ctx, "获取分布式锁失败: %v", err)
return false
}
// 设置过期时间
if success {
_, err = g.Redis().Do(ctx, "EXPIRE", lockKey, 30)
if err != nil {
g.Log().Errorf(ctx, "设置锁过期时间失败: %v", err)
// 删除已设置的锁
g.Redis().Do(ctx, "DEL", lockKey)
return false
}
}
return success
}
// renewDistributedLock 续期分布式锁
func (s *OrderStatisticsScheduler) renewDistributedLock(ctx context.Context) bool {
lockKey := "order_statistics_scheduler_lock"
// 续期30秒
_, err := g.Redis().Do(ctx, "EXPIRE", lockKey, 30)
if err != nil {
g.Log().Errorf(ctx, "续期分布式锁失败: %v", err)
return false
}
return true
}
// startLockRenewal 启动锁续期任务
func (s *OrderStatisticsScheduler) startLockRenewal(ctx context.Context) {
ticker := time.NewTicker(20 * time.Second) // 每20秒续期一次
defer ticker.Stop()
for {
select {
case <-ticker.C:
if !s.renewDistributedLock(ctx) {
g.Log().Error(ctx, "分布式锁续期失败,停止调度器")
statisticsSchedulerLock.Lock()
isStatisticsSchedulerRunning = false
statisticsSchedulerLock.Unlock()
return
}
case <-ctx.Done():
// 释放分布式锁
s.releaseDistributedLock(ctx)
return
}
}
}
// releaseDistributedLock 释放分布式锁
func (s *OrderStatisticsScheduler) releaseDistributedLock(ctx context.Context) {
lockKey := "order_statistics_scheduler_lock"
_, err := g.Redis().Do(ctx, "DEL", lockKey)
if err != nil {
g.Log().Errorf(ctx, "释放分布式锁失败: %v", err)
}
}
// acquireTaskLock 获取任务级分布式锁
func (s *OrderStatisticsScheduler) acquireTaskLock(ctx context.Context, lockKey string) bool {
// 尝试设置锁过期时间10分钟
success, err := g.Redis().SetNX(ctx, lockKey, "locked")
if err != nil {
g.Log().Errorf(ctx, "获取任务锁失败: %v", err)
return false
}
// 设置过期时间
if success {
_, err = g.Redis().Do(ctx, "EXPIRE", lockKey, 600) // 10分钟=600秒
if err != nil {
g.Log().Errorf(ctx, "设置锁过期时间失败: %v", err)
// 删除已设置的锁
g.Redis().Do(ctx, "DEL", lockKey)
return false
}
}
return success
}
// releaseTaskLock 释放任务级分布式锁
func (s *OrderStatisticsScheduler) releaseTaskLock(ctx context.Context, lockKey string) {
_, err := g.Redis().Do(ctx, "DEL", lockKey)
if err != nil {
g.Log().Errorf(ctx, "释放任务锁失败: %v", err)
}
}
// startDailyReportScheduler 日报表定时任务
func (s *OrderStatisticsScheduler) startDailyReportScheduler(ctx context.Context) {
// 计算到凌晨3点的时间
now := time.Now()
next := time.Date(now.Year(), now.Month(), now.Day()+1, 3, 0, 0, 0, time.Local)
duration := next.Sub(now)
// 等待到凌晨3点
time.Sleep(duration)
ticker := time.NewTicker(24 * time.Hour)
defer ticker.Stop()
// 立即执行一次昨天的日报表生成
go s.generateYesterdayDailyReport(ctx)
for {
select {
case <-ticker.C:
// 生成昨天的日报表
s.generateYesterdayDailyReport(ctx)
case <-ctx.Done():
return
}
}
}
// startMonthlyReportScheduler 月报表定时任务
func (s *OrderStatisticsScheduler) startMonthlyReportScheduler(ctx context.Context) {
// 计算到下个月1日凌晨4点的时间
now := time.Now()
next := time.Date(now.Year(), now.Month()+1, 1, 4, 0, 0, 0, time.Local)
duration := next.Sub(now)
// 等待到下个月1日凌晨4点
time.Sleep(duration)
ticker := time.NewTicker(24 * time.Hour)
defer ticker.Stop()
for {
select {
case <-ticker.C:
// 检查是否是每月1日如果是则生成上个月的月报表
if time.Now().Day() == 1 {
go s.generateLastMonthReport(ctx)
}
case <-ctx.Done():
return
}
}
}
// startQuarterlyReportScheduler 季度报表定时任务
func (s *OrderStatisticsScheduler) startQuarterlyReportScheduler(ctx context.Context) {
// 计算到下个季度第一天凌晨5点的时间
now := time.Now()
nextQuarter := s.getNextQuarterFirstDay(now)
next := time.Date(nextQuarter.Year(), nextQuarter.Month(), nextQuarter.Day(), 5, 0, 0, 0, time.Local)
duration := next.Sub(now)
// 等待到下个季度第一天凌晨5点
time.Sleep(duration)
ticker := time.NewTicker(24 * time.Hour)
defer ticker.Stop()
for {
select {
case <-ticker.C:
// 检查是否是季度第一天,如果是则生成上个季度的季度报表
if s.isQuarterFirstDay() {
go s.generateLastQuarterReport(ctx)
}
case <-ctx.Done():
return
}
}
}
// startYearlyReportScheduler 年报表定时任务
func (s *OrderStatisticsScheduler) startYearlyReportScheduler(ctx context.Context) {
// 计算到明年1月1日凌晨6点的时间
now := time.Now()
next := time.Date(now.Year()+1, time.January, 1, 6, 0, 0, 0, time.Local)
duration := next.Sub(now)
// 等待到明年1月1日凌晨6点
time.Sleep(duration)
ticker := time.NewTicker(24 * time.Hour)
defer ticker.Stop()
for {
select {
case <-ticker.C:
// 检查是否是1月1日如果是则生成上一年的年报表
if time.Now().Month() == time.January && time.Now().Day() == 1 {
go s.generateLastYearReport(ctx)
}
case <-ctx.Done():
return
}
}
}
// generateYesterdayDailyReport 生成昨天的日报表
func (s *OrderStatisticsScheduler) generateYesterdayDailyReport(ctx context.Context) {
yesterday := time.Now().AddDate(0, 0, -1)
g.Log().Infof(ctx, "开始生成所有租户的日统计数据: %s", yesterday.Format("2006-01-02"))
// 获取所有租户ID
tenantIDs, err := s.getAllTenants(ctx)
if err != nil {
g.Log().Errorf(ctx, "获取租户列表失败: %v", err)
return
}
// 并发处理每个租户的日统计
for _, tenantID := range tenantIDs {
go s.generateTenantDailyReport(ctx, tenantID, yesterday)
}
}
// generateTenantDailyReport 生成指定租户的日报表
func (s *OrderStatisticsScheduler) generateTenantDailyReport(ctx context.Context, tenantID int64, date time.Time) {
lockKey := fmt.Sprintf("order_stats_daily_%d_%s", tenantID, date.Format("2006-01-02"))
// 获取任务锁
if !s.acquireTaskLock(ctx, lockKey) {
g.Log().Infof(ctx, "租户 %d 的日统计任务正在执行,跳过", tenantID)
return
}
defer s.releaseTaskLock(ctx, lockKey)
g.Log().Infof(ctx, "开始生成租户 %d 的日统计: %s", tenantID, date.Format("2006-01-02"))
err := OrderStatistics.GenerateDailyStatistics(ctx, tenantID, date, false)
if err != nil {
g.Log().Errorf(ctx, "生成租户 %d 日统计失败: %v", tenantID, err)
return
}
g.Log().Infof(ctx, "租户 %d 日统计生成成功: %s", tenantID, date.Format("2006-01-02"))
}
// generateLastMonthReport 生成上个月的月报表
func (s *OrderStatisticsScheduler) generateLastMonthReport(ctx context.Context) {
lastMonth := time.Now().AddDate(0, -1, 0)
year := lastMonth.Year()
month := int(lastMonth.Month())
g.Log().Infof(ctx, "开始生成所有租户的月统计数据: %d年%d月", year, month)
// 获取所有租户ID
tenantIDs, err := s.getAllTenants(ctx)
if err != nil {
g.Log().Errorf(ctx, "获取租户列表失败: %v", err)
return
}
// 并发处理每个租户的月统计
for _, tenantID := range tenantIDs {
go s.generateTenantMonthlyReport(ctx, tenantID, year, month)
}
}
// generateTenantMonthlyReport 生成指定租户的月报表
func (s *OrderStatisticsScheduler) generateTenantMonthlyReport(ctx context.Context, tenantID int64, year int, month int) {
lockKey := fmt.Sprintf("order_stats_monthly_%d_%d_%d", tenantID, year, month)
// 获取任务锁
if !s.acquireTaskLock(ctx, lockKey) {
g.Log().Infof(ctx, "租户 %d 的月统计任务正在执行,跳过", tenantID)
return
}
defer s.releaseTaskLock(ctx, lockKey)
g.Log().Infof(ctx, "开始生成租户 %d 的月统计: %d年%d月", tenantID, year, month)
err := OrderStatistics.GenerateMonthlyStatistics(ctx, tenantID, year, month, false)
if err != nil {
g.Log().Errorf(ctx, "生成租户 %d 月统计失败: %v", tenantID, err)
return
}
g.Log().Infof(ctx, "租户 %d 月统计生成成功: %d年%d月", tenantID, year, month)
}
// generateLastQuarterReport 生成上个季度的季度报表
func (s *OrderStatisticsScheduler) generateLastQuarterReport(ctx context.Context) {
lastQuarter := time.Now().AddDate(0, -3, 0)
year := lastQuarter.Year()
quarter := s.getQuarter(lastQuarter.Month())
g.Log().Infof(ctx, "开始生成所有租户的季度统计数据: %d年第%d季度", year, quarter)
// 获取所有租户ID
tenantIDs, err := s.getAllTenants(ctx)
if err != nil {
g.Log().Errorf(ctx, "获取租户列表失败: %v", err)
return
}
// 并发处理每个租户的季度统计
for _, tenantID := range tenantIDs {
go s.generateTenantQuarterlyReport(ctx, tenantID, year, quarter)
}
}
// generateTenantQuarterlyReport 生成指定租户的季度报表
func (s *OrderStatisticsScheduler) generateTenantQuarterlyReport(ctx context.Context, tenantID int64, year int, quarter int) {
lockKey := fmt.Sprintf("order_stats_quarterly_%d_%d_%d", tenantID, year, quarter)
// 获取任务锁
if !s.acquireTaskLock(ctx, lockKey) {
g.Log().Infof(ctx, "租户 %d 的季度统计任务正在执行,跳过", tenantID)
return
}
defer s.releaseTaskLock(ctx, lockKey)
g.Log().Infof(ctx, "开始生成租户 %d 的季度统计: %d年第%d季度", tenantID, year, quarter)
err := OrderStatistics.GenerateQuarterlyStatistics(ctx, tenantID, year, quarter, false)
if err != nil {
g.Log().Errorf(ctx, "生成租户 %d 季度统计失败: %v", tenantID, err)
return
}
g.Log().Infof(ctx, "租户 %d 季度统计生成成功: %d年第%d季度", tenantID, year, quarter)
}
// generateLastYearReport 生成上一年的年报表
func (s *OrderStatisticsScheduler) generateLastYearReport(ctx context.Context) {
lastYear := time.Now().Year() - 1
g.Log().Infof(ctx, "开始生成所有租户的年统计数据: %d年", lastYear)
// 获取所有租户ID
tenantIDs, err := s.getAllTenants(ctx)
if err != nil {
g.Log().Errorf(ctx, "获取租户列表失败: %v", err)
return
}
// 并发处理每个租户的年统计
for _, tenantID := range tenantIDs {
go s.generateTenantYearlyReport(ctx, tenantID, lastYear)
}
}
// generateTenantYearlyReport 生成指定租户的年报表
func (s *OrderStatisticsScheduler) generateTenantYearlyReport(ctx context.Context, tenantID int64, year int) {
lockKey := fmt.Sprintf("order_stats_yearly_%d_%d", tenantID, year)
// 获取任务锁
if !s.acquireTaskLock(ctx, lockKey) {
g.Log().Infof(ctx, "租户 %d 的年统计任务正在执行,跳过", tenantID)
return
}
defer s.releaseTaskLock(ctx, lockKey)
g.Log().Infof(ctx, "开始生成租户 %d 的年统计: %d年", tenantID, year)
err := OrderStatistics.GenerateYearlyStatistics(ctx, tenantID, year, false)
if err != nil {
g.Log().Errorf(ctx, "生成租户 %d 年统计失败: %v", tenantID, err)
return
}
g.Log().Infof(ctx, "租户 %d 年统计生成成功: %d年", tenantID, year)
}
// getNextQuarterFirstDay 获取下个季度的第一天
func (s *OrderStatisticsScheduler) getNextQuarterFirstDay(now time.Time) time.Time {
quarter := s.getQuarter(now.Month())
if quarter == 4 {
return time.Date(now.Year()+1, time.January, 1, 0, 0, 0, 0, now.Location())
}
return time.Date(now.Year(), time.Month((quarter*3)+1), 1, 0, 0, 0, 0, now.Location())
}
// getQuarter 获取月份对应的季度
func (s *OrderStatisticsScheduler) getQuarter(month time.Month) int {
switch {
case month >= 1 && month <= 3:
return 1
case month >= 4 && month <= 6:
return 2
case month >= 7 && month <= 9:
return 3
default:
return 4
}
}
// isQuarterFirstDay 检查今天是否是季度的第一天
func (s *OrderStatisticsScheduler) isQuarterFirstDay() bool {
now := time.Now()
quarter := s.getQuarter(now.Month())
switch quarter {
case 1:
return now.Day() == 1 && now.Month() == time.January
case 2:
return now.Day() == 1 && now.Month() == time.April
case 3:
return now.Day() == 1 && now.Month() == time.July
case 4:
return now.Day() == 1 && now.Month() == time.October
default:
return false
}
}
// getAllTenants 获取所有租户ID这里需要根据实际业务实现
func (s *OrderStatisticsScheduler) getAllTenants(ctx context.Context) ([]int64, error) {
// 这里应该从实际的租户管理服务获取租户列表
// 暂时返回一个默认的租户ID实际使用时需要替换为真实的租户获取逻辑
return []int64{1, 2, 3}, nil
}

View File

@@ -12,6 +12,8 @@ import (
"order/model/dto" "order/model/dto"
"order/model/entity" "order/model/entity"
"gitee.com/red-future---jilin-g/common/http"
"github.com/gogf/gf/v2/frame/g"
"go.mongodb.org/mongo-driver/v2/bson" "go.mongodb.org/mongo-driver/v2/bson"
) )
@@ -188,8 +190,50 @@ func (s *payment) HandlePaymentNotify(ctx context.Context, req *dto.PaymentNotif
return fmt.Errorf("更新支付记录失败: %w", err) return fmt.Errorf("更新支付记录失败: %w", err)
} }
// 4. 如果支付成功,更新订单状态 // 4. 如果支付成功,更新订单状态并处理库存
if req.Status == "success" { if req.Status == "success" {
// 获取订单信息以处理库存
order, err := dao.Order.GetPendingOrder(ctx, req.OrderNo)
if err != nil {
return fmt.Errorf("获取待支付订单失败: %w", err)
}
if order == nil {
return errors.New("待支付订单不存在")
}
// 处理库存扣减
for _, item := range order.OrderItems {
// 判断是否需要扣减库存
shouldDeduct, saleMode, err := s.shouldDeductStock(ctx, paymentRecord.TenantId.(string), item.AssetID)
if err != nil {
g.Log().Errorf(ctx, "检查库存策略失败,订单:%s资产%s错误%v", req.OrderNo, item.AssetID, err)
continue
}
// 如果需要扣减库存,判断扣减时机
if shouldDeduct {
timing, err := s.getDeductStockTiming(ctx, paymentRecord.TenantId.(string), item.AssetID)
if err != nil {
g.Log().Errorf(ctx, "获取库存扣减时机失败,订单:%s资产%s错误%v", req.OrderNo, item.AssetID, err)
continue
}
// 常规售卖:支付成功时扣减库存
if timing == "payment_success" {
quantity := len(item.Stocks) // 每个库存项数量为1
if err := s.deductStock(ctx, paymentRecord.TenantId.(string), req.OrderNo, item.AssetID, quantity, fmt.Sprintf("支付成功扣减库存,售卖方式:%s", saleMode)); err != nil {
g.Log().Errorf(ctx, "支付成功扣减库存失败,订单:%s资产%s错误%v", req.OrderNo, item.AssetID, err)
// 库存扣减失败不影响订单状态更新,但需要记录错误
continue
}
g.Log().Infof(ctx, "资产 %s 支付成功扣减库存成功,订单:%s数量%d", item.AssetID, req.OrderNo, quantity)
}
}
}
// 更新订单状态
updateData := bson.M{ updateData := bson.M{
"paid_at": time.Now(), "paid_at": time.Now(),
"transaction_id": req.TransactionID, "transaction_id": req.TransactionID,
@@ -344,3 +388,141 @@ func (s *payment) verifyRefundNotifySignature(req *dto.RefundNotifyReq) bool {
// 为了演示我们总是返回true // 为了演示我们总是返回true
return true return true
} }
// shouldDeductStock 判断是否需要扣减库存
func (s *payment) shouldDeductStock(ctx context.Context, tenantID, assetID string) (bool, string, error) {
// 调用assets服务获取资产信息
assetResp, err := s.getAssetFromAssetService(ctx, tenantID, assetID)
if err != nil {
return false, "", fmt.Errorf("获取资产信息失败: %w", err)
}
if assetResp == nil {
return false, "", errors.New("资产不存在")
}
// 无库存限制的资产不需要扣减库存
if assetResp.UnlimitedStock {
return false, "unlimited_stock", nil
}
// 预售场景不扣减库存
if assetResp.SaleMode == "presale" {
return false, "presale_no_deduct", nil
}
// 有库存限制的常规售卖和秒杀需要扣减库存
return true, assetResp.SaleMode, nil
}
// getDeductStockTiming 获取库存扣减时机
func (s *payment) getDeductStockTiming(ctx context.Context, tenantID, assetID string) (string, error) {
assetResp, err := s.getAssetFromAssetService(ctx, tenantID, assetID)
if err != nil {
return "", fmt.Errorf("获取资产信息失败: %w", err)
}
if assetResp == nil {
return "", errors.New("资产不存在")
}
// 根据售卖方式确定扣减时机
switch assetResp.SaleMode {
case "flash":
return "order_create", nil // 秒杀:下单时扣减库存
case "regular":
return "payment_success", nil // 常规售卖:支付成功时扣减库存
default:
return "", errors.New("未知售卖方式")
}
}
// deductStock 扣减库存
func (s *payment) deductStock(ctx context.Context, tenantID, orderNo, assetID string, quantity int, reason string) error {
// 调用assets服务扣减库存
err := s.deductStockFromAssetService(ctx, tenantID, assetID, quantity)
if err != nil {
return fmt.Errorf("扣减库存失败: %w", err)
}
g.Log().Infof(ctx, "库存扣减成功,订单:%s资产%s数量%d原因%s", orderNo, assetID, quantity, reason)
return nil
}
// refundStock 回充库存
func (s *payment) refundStock(ctx context.Context, tenantID, orderNo, assetID string, quantity int, reason string) error {
// 调用资产服务回充库存
err := s.refundStockFromAssetService(ctx, tenantID, assetID, quantity)
if err != nil {
return fmt.Errorf("回充库存失败: %w", err)
}
g.Log().Infof(ctx, "库存回充成功,订单:%s资产%s数量%d原因%s", orderNo, assetID, quantity, reason)
return nil
}
// getAssetFromAssetService 从资产服务获取资产信息
func (s *payment) getAssetFromAssetService(ctx context.Context, tenantID, assetID string) (*AssetServiceResponse, error) {
// 使用common/http中的封装方法调用资产服务
// 这里应该调用assets服务的getAsset接口
// 暂时返回模拟数据
var assetResp AssetServiceResponse
err := http.Get(ctx, fmt.Sprintf("http://assets-service/internal/asset/%s", assetID), &assetResp)
if err != nil {
return nil, err
}
return &assetResp, nil
}
// deductStockFromAssetService 从资产服务扣减库存
func (s *payment) deductStockFromAssetService(ctx context.Context, tenantID, assetID string, quantity int) error {
// 调用assets服务的扣减库存接口
req := map[string]interface{}{
"asset_id": assetID,
"quantity": quantity,
"reason": "订单扣减",
}
var result struct {
Success bool `json:"success"`
Message string `json:"message"`
}
err := http.Post(ctx, "http://assets-service/internal/stock/deduct", &result, req)
if err != nil {
return err
}
if !result.Success {
return errors.New(result.Message)
}
return nil
}
// refundStockFromAssetService 从资产服务回充库存
func (s *payment) refundStockFromAssetService(ctx context.Context, tenantID, assetID string, quantity int) error {
// 调用资产服务的回充库存接口
req := map[string]interface{}{
"asset_id": assetID,
"quantity": quantity,
"reason": "订单取消回充",
}
var result struct {
Success bool `json:"success"`
Message string `json:"message"`
}
err := http.Post(ctx, "http://assets-service/internal/stock/refund", &result, req)
if err != nil {
return err
}
if !result.Success {
return errors.New(result.Message)
}
return nil
}