Files
cid/service/stat_report_scheduler.go
2025-12-09 16:10:45 +08:00

635 lines
19 KiB
Go
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
package service
import (
"context"
"fmt"
"sync"
"time"
"cid/dao"
"cid/model/entity"
"github.com/gogf/gf/v2/frame/g"
"github.com/gogf/gf/v2/util/gconv"
)
// StatReportScheduler 统计报表定时任务调度器
type StatReportScheduler struct{}
var StatReportSchedulerInstance = &StatReportScheduler{}
var schedulerLock sync.Mutex
var isSchedulerRunning bool
// StartScheduler 启动定时任务调度器(分布式安全)
func (s *StatReportScheduler) StartScheduler(ctx context.Context) error {
schedulerLock.Lock()
defer schedulerLock.Unlock()
// 检查是否已经有调度器在运行(分布式部署时避免重复执行)
if isSchedulerRunning {
g.Log().Info(ctx, "统计报表定时任务调度器已在运行")
return nil
}
// 尝试获取分布式锁
if !s.acquireDistributedLock(ctx) {
g.Log().Info(ctx, "其他节点正在运行统计报表定时任务,当前节点跳过")
return nil
}
isSchedulerRunning = true
// 启动锁续期任务
go s.startLockRenewal(ctx)
// 启动日报表生成任务每天凌晨3点执行
go s.startDailyReportScheduler(ctx)
// 启动月报表生成任务每月1日凌晨4点执行
go s.startMonthlyReportScheduler(ctx)
// 启动季度报表生成任务每季度第一天凌晨5点执行
go s.startQuarterlyReportScheduler(ctx)
// 启动年报表生成任务每年1月1日凌晨6点执行
go s.startYearlyReportScheduler(ctx)
g.Log().Info(ctx, "统计报表定时任务调度器已启动")
return nil
}
// acquireDistributedLock 获取分布式锁基于Redis
func (s *StatReportScheduler) acquireDistributedLock(ctx context.Context) bool {
// 使用Redis实现分布式锁
// 锁的有效期为1小时避免死锁
lockKey := "stat_report_scheduler_lock"
lockValue := fmt.Sprintf("%d", time.Now().Unix())
// 尝试获取锁
result, err := g.Redis().Do(ctx, "SET", lockKey, lockValue, "NX", "EX", 3600)
if err != nil {
g.Log().Errorf(ctx, "获取分布式锁失败: %v", err)
return false
}
return result != nil
}
// renewDistributedLock 续期分布式锁
func (s *StatReportScheduler) renewDistributedLock(ctx context.Context) bool {
lockKey := "stat_report_scheduler_lock"
// 检查锁是否存在
exists, err := g.Redis().Do(ctx, "EXISTS", lockKey)
if err != nil || exists == nil {
return false
}
// 检查锁是否存在EXISTS返回1表示存在0表示不存在
existsInt := exists.Int64()
if existsInt == 0 {
return false
}
// 续期锁延长1小时
_, err = g.Redis().Do(ctx, "EXPIRE", lockKey, 3600)
if err != nil {
g.Log().Errorf(ctx, "续期分布式锁失败: %v", err)
return false
}
return true
}
// startLockRenewal 启动锁续期任务
func (s *StatReportScheduler) startLockRenewal(ctx context.Context) {
ticker := time.NewTicker(30 * time.Minute) // 每30分钟续期一次
defer ticker.Stop()
for {
select {
case <-ticker.C:
if !s.renewDistributedLock(ctx) {
g.Log().Error(ctx, "锁续期失败,调度器将停止运行")
// 锁丢失,停止调度器
schedulerLock.Lock()
isSchedulerRunning = false
schedulerLock.Unlock()
return
}
case <-ctx.Done():
return
}
}
}
// acquireTaskLock 获取任务级分布式锁
func (s *StatReportScheduler) acquireTaskLock(ctx context.Context, lockKey string) bool {
lockValue := fmt.Sprintf("%d", time.Now().Unix())
// 尝试获取任务锁有效期为2小时
result, err := g.Redis().Do(ctx, "SET", lockKey, lockValue, "NX", "EX", 7200)
if err != nil {
g.Log().Errorf(ctx, "获取任务锁失败: %v", err)
return false
}
return result != nil
}
// releaseTaskLock 释放任务级分布式锁
func (s *StatReportScheduler) releaseTaskLock(ctx context.Context, lockKey string) {
_, err := g.Redis().Do(ctx, "DEL", lockKey)
if err != nil {
g.Log().Errorf(ctx, "释放任务锁失败: %v", err)
}
}
// startDailyReportScheduler 日报表定时任务
func (s *StatReportScheduler) startDailyReportScheduler(ctx context.Context) {
// 计算到凌晨3点的时间
now := time.Now()
next := time.Date(now.Year(), now.Month(), now.Day()+1, 3, 0, 0, 0, time.Local)
duration := next.Sub(now)
// 等待到凌晨3点
time.Sleep(duration)
ticker := time.NewTicker(24 * time.Hour)
defer ticker.Stop()
// 立即执行一次昨天的日报表生成
go s.generateYesterdayDailyReport(ctx)
for {
select {
case <-ticker.C:
// 生成昨天的日报表
s.generateYesterdayDailyReport(ctx)
case <-ctx.Done():
return
}
}
}
// startMonthlyReportScheduler 月报表定时任务
func (s *StatReportScheduler) startMonthlyReportScheduler(ctx context.Context) {
// 计算到下个月1日凌晨4点的时间
now := time.Now()
next := time.Date(now.Year(), now.Month()+1, 1, 4, 0, 0, 0, time.Local)
duration := next.Sub(now)
// 等待到下个月1日凌晨4点
time.Sleep(duration)
ticker := time.NewTicker(24 * time.Hour)
defer ticker.Stop()
for {
select {
case <-ticker.C:
// 检查是否是每月1日如果是则生成上个月的月报表
if time.Now().Day() == 1 {
go s.generateLastMonthReport(ctx)
}
case <-ctx.Done():
return
}
}
}
// startQuarterlyReportScheduler 季度报表定时任务
func (s *StatReportScheduler) startQuarterlyReportScheduler(ctx context.Context) {
// 计算到下个季度第一天凌晨5点的时间
now := time.Now()
nextQuarter := s.getNextQuarterFirstDay(now)
next := time.Date(nextQuarter.Year(), nextQuarter.Month(), nextQuarter.Day(), 5, 0, 0, 0, time.Local)
duration := next.Sub(now)
// 等待到下个季度第一天凌晨5点
time.Sleep(duration)
ticker := time.NewTicker(24 * time.Hour)
defer ticker.Stop()
for {
select {
case <-ticker.C:
// 检查是否是季度第一天,如果是则生成上个季度的季度报表
if s.isQuarterFirstDay() {
go s.generateLastQuarterReport(ctx)
}
case <-ctx.Done():
return
}
}
}
// startYearlyReportScheduler 年报表定时任务
func (s *StatReportScheduler) startYearlyReportScheduler(ctx context.Context) {
// 计算到明年1月1日凌晨6点的时间
now := time.Now()
next := time.Date(now.Year()+1, time.January, 1, 6, 0, 0, 0, time.Local)
duration := next.Sub(now)
// 等待到明年1月1日凌晨6点
time.Sleep(duration)
ticker := time.NewTicker(24 * time.Hour)
defer ticker.Stop()
for {
select {
case <-ticker.C:
// 检查是否是1月1日如果是则生成去年的年报表
if time.Now().Month() == time.January && time.Now().Day() == 1 {
go s.generateLastYearReport(ctx)
}
case <-ctx.Done():
return
}
}
}
// generateYesterdayDailyReport 生成昨天的日报表
func (s *StatReportScheduler) generateYesterdayDailyReport(ctx context.Context) error {
yesterday := time.Now().AddDate(0, 0, -1)
return s.generateDailyReportForDate(ctx, yesterday)
}
// generateLastMonthReport 生成上个月的月报表
func (s *StatReportScheduler) generateLastMonthReport(ctx context.Context) error {
lastMonth := time.Now().AddDate(0, -1, 0)
return s.generateMonthlyReportFromDaily(ctx, lastMonth)
}
// generateLastQuarterReport 生成上个季度的季度报表
func (s *StatReportScheduler) generateLastQuarterReport(ctx context.Context) error {
lastQuarter := time.Now().AddDate(0, -3, 0)
return s.generateQuarterlyReportFromMonthly(ctx, lastQuarter)
}
// generateLastYearReport 生成去年的年报表
func (s *StatReportScheduler) generateLastYearReport(ctx context.Context) error {
lastYear := time.Now().AddDate(-1, 0, 0)
return s.generateYearlyReportFromQuarterly(ctx, lastYear)
}
// generateDailyReportForDate 为指定日期生成日报表
func (s *StatReportScheduler) generateDailyReportForDate(ctx context.Context, date time.Time) error {
// 获取日报表任务分布式锁
dailyLockKey := fmt.Sprintf("daily_report_lock_%s", date.Format("2006-01-02"))
if !s.acquireTaskLock(ctx, dailyLockKey) {
g.Log().Info(ctx, "其他节点正在生成日报表,日期: %s", date.Format("2006-01-02"))
return nil
}
defer s.releaseTaskLock(ctx, dailyLockKey)
// 获取所有租户
tenants, err := s.getAllTenants(ctx)
if err != nil {
return err
}
for _, tenantID := range tenants {
// 检查是否已生成该日期的报表
if s.isReportGenerated(ctx, tenantID, "daily", date.Format("2006-01-02")) {
continue
}
// 生成日报表数据(从流水数据统计)
reportData, err := s.generateReportDataFromRawData(ctx, tenantID, 0, "daily", date)
if err != nil {
g.Log().Errorf(ctx, "生成租户%d日报表失败: %v", tenantID, err)
continue
}
// 保存日报表
report := &entity.StatReport{
TenantId: tenantID,
AppID: 0, // 0表示所有应用
ReportType: "daily",
ReportDate: date,
ReportData: gconv.String(reportData),
GeneratedAt: time.Now(),
Status: "completed",
}
_, err = dao.StatReport.Create(ctx, report)
if err != nil {
g.Log().Errorf(ctx, "保存租户%d日报表失败: %v", tenantID, err)
continue
}
g.Log().Infof(ctx, "成功生成租户%d的日报表日期: %s", tenantID, date.Format("2006-01-02"))
}
return nil
}
// generateMonthlyReportFromDaily 从日报表生成月报表
func (s *StatReportScheduler) generateMonthlyReportFromDaily(ctx context.Context, date time.Time) error {
// 获取月报表任务分布式锁
monthlyLockKey := fmt.Sprintf("monthly_report_lock_%s", date.Format("2006-01"))
if !s.acquireTaskLock(ctx, monthlyLockKey) {
g.Log().Info(ctx, "其他节点正在生成月报表,日期: %s", date.Format("2006-01"))
return nil
}
defer s.releaseTaskLock(ctx, monthlyLockKey)
tenants, err := s.getAllTenants(ctx)
if err != nil {
return err
}
for _, tenantID := range tenants {
if s.isReportGenerated(ctx, tenantID, "monthly", date.Format("2006-01")) {
continue
}
// 获取该月的所有日报表数据
dailyReports, err := s.getDailyReportsForMonth(ctx, tenantID, date)
if err != nil {
g.Log().Errorf(ctx, "获取租户%d月报数据失败: %v", tenantID, err)
continue
}
// 聚合日报表数据生成月报表
reportData := s.aggregateDailyReportsToMonthly(dailyReports)
report := &entity.StatReport{
TenantId: tenantID,
AppID: 0,
ReportType: "monthly",
ReportDate: date,
ReportData: gconv.String(reportData),
GeneratedAt: time.Now(),
Status: "completed",
}
_, err = dao.StatReport.Create(ctx, report)
if err != nil {
g.Log().Errorf(ctx, "保存租户%d月报表失败: %v", tenantID, err)
continue
}
g.Log().Infof(ctx, "成功生成租户%d的月报表日期: %s", tenantID, date.Format("2006-01"))
}
return nil
}
// generateQuarterlyReportFromMonthly 从月报表生成季度报表
func (s *StatReportScheduler) generateQuarterlyReportFromMonthly(ctx context.Context, date time.Time) error {
// 获取季度报表任务分布式锁
quarter := fmt.Sprintf("Q%d", (date.Month()-1)/3+1)
quarterlyLockKey := fmt.Sprintf("quarterly_report_lock_%d-%s", date.Year(), quarter)
if !s.acquireTaskLock(ctx, quarterlyLockKey) {
g.Log().Info(ctx, "其他节点正在生成季度报表,日期: %d-%s", date.Year(), quarter)
return nil
}
defer s.releaseTaskLock(ctx, quarterlyLockKey)
tenants, err := s.getAllTenants(ctx)
if err != nil {
return err
}
for _, tenantID := range tenants {
reportDate := fmt.Sprintf("%d-%s", date.Year(), quarter)
if s.isReportGenerated(ctx, tenantID, "quarterly", reportDate) {
continue
}
// 获取该季度的所有月报表数据
monthlyReports, err := s.getMonthlyReportsForQuarter(ctx, tenantID, date)
if err != nil {
g.Log().Errorf(ctx, "获取租户%d季报数据失败: %v", tenantID, err)
continue
}
// 聚合月报表数据生成季度报表
reportData := s.aggregateMonthlyReportsToQuarterly(monthlyReports)
report := &entity.StatReport{
TenantId: tenantID,
AppID: 0,
ReportType: "quarterly",
ReportDate: date,
ReportData: gconv.String(reportData),
GeneratedAt: time.Now(),
Status: "completed",
}
_, err = dao.StatReport.Create(ctx, report)
if err != nil {
g.Log().Errorf(ctx, "保存租户%d季度报表失败: %v", tenantID, err)
continue
}
g.Log().Infof(ctx, "成功生成租户%d的季度报表日期: %s", tenantID, reportDate)
}
return nil
}
// generateYearlyReportFromQuarterly 从季度报表生成年报表
func (s *StatReportScheduler) generateYearlyReportFromQuarterly(ctx context.Context, date time.Time) error {
// 获取年报表任务分布式锁
yearlyLockKey := fmt.Sprintf("yearly_report_lock_%d", date.Year())
if !s.acquireTaskLock(ctx, yearlyLockKey) {
g.Log().Info(ctx, "其他节点正在生成年报表,日期: %d", date.Year())
return nil
}
defer s.releaseTaskLock(ctx, yearlyLockKey)
tenants, err := s.getAllTenants(ctx)
if err != nil {
return err
}
for _, tenantID := range tenants {
reportDate := fmt.Sprintf("%d", date.Year())
if s.isReportGenerated(ctx, tenantID, "yearly", reportDate) {
continue
}
// 获取该年度的所有季度报表数据
quarterlyReports, err := s.getQuarterlyReportsForYear(ctx, tenantID, date)
if err != nil {
g.Log().Errorf(ctx, "获取租户%d年报数据失败: %v", tenantID, err)
continue
}
// 聚合季度报表数据生成年报表
reportData := s.aggregateQuarterlyReportsToYearly(quarterlyReports)
report := &entity.StatReport{
TenantId: tenantID,
AppID: 0,
ReportType: "yearly",
ReportDate: date,
ReportData: gconv.String(reportData),
GeneratedAt: time.Now(),
Status: "completed",
}
_, err = dao.StatReport.Create(ctx, report)
if err != nil {
g.Log().Errorf(ctx, "保存租户%d年报表失败: %v", tenantID, err)
continue
}
g.Log().Infof(ctx, "成功生成租户%d的年报表日期: %s", tenantID, reportDate)
}
return nil
}
// generateReportDataFromRawData 从原始流水数据生成报表数据
func (s *StatReportScheduler) generateReportDataFromRawData(ctx context.Context, tenantID, appID int64, reportType string, reportDate time.Time) (map[string]interface{}, error) {
// 使用现有的报表生成逻辑
return StatReport.generateReportData(ctx, tenantID, appID, reportType, reportDate)
}
// getDailyReportsForMonth 获取某个月的所有日报表
func (s *StatReportScheduler) getDailyReportsForMonth(ctx context.Context, tenantID int64, date time.Time) ([]map[string]interface{}, error) {
startDate := time.Date(date.Year(), date.Month(), 1, 0, 0, 0, 0, time.Local)
endDate := startDate.AddDate(0, 1, -1)
reports, _, err := dao.StatReport.List(ctx, tenantID, 0, "daily", startDate.Format("2006-01-02"), endDate.Format("2006-01-02"), 1, 31)
if err != nil {
return nil, err
}
var dailyData []map[string]interface{}
for _, report := range reports {
var data map[string]interface{}
if err := gconv.Struct(report.ReportData, &data); err != nil {
continue
}
dailyData = append(dailyData, data)
}
return dailyData, nil
}
// getMonthlyReportsForQuarter 获取某个季度的所有月报表
func (s *StatReportScheduler) getMonthlyReportsForQuarter(ctx context.Context, tenantID int64, date time.Time) ([]map[string]interface{}, error) {
quarterStartMonth := time.Month(((date.Month()-1)/3)*3 + 1)
reports := make([]map[string]interface{}, 0)
for i := 0; i < 3; i++ {
monthDate := time.Date(date.Year(), quarterStartMonth+time.Month(i), 1, 0, 0, 0, 0, time.Local)
reportDate := monthDate.Format("2006-01")
report, err := dao.StatReport.GetByTenantAndDate(ctx, tenantID, "monthly", reportDate)
if err != nil || report == nil {
continue
}
var data map[string]interface{}
if err := gconv.Struct(report.ReportData, &data); err != nil {
continue
}
reports = append(reports, data)
}
return reports, nil
}
// getQuarterlyReportsForYear 获取某年的所有季度报表
func (s *StatReportScheduler) getQuarterlyReportsForYear(ctx context.Context, tenantID int64, date time.Time) ([]map[string]interface{}, error) {
reports := make([]map[string]interface{}, 0)
for quarter := 1; quarter <= 4; quarter++ {
reportDate := fmt.Sprintf("%d-Q%d", date.Year(), quarter)
report, err := dao.StatReport.GetByTenantAndDate(ctx, tenantID, "quarterly", reportDate)
if err != nil || report == nil {
continue
}
var data map[string]interface{}
if err := gconv.Struct(report.ReportData, &data); err != nil {
continue
}
reports = append(reports, data)
}
return reports, nil
}
// aggregateDailyReportsToMonthly 聚合日报表数据生成月报表
func (s *StatReportScheduler) aggregateDailyReportsToMonthly(dailyReports []map[string]interface{}) map[string]interface{} {
// 实现聚合逻辑,这里简化处理
return map[string]interface{}{
"type": "monthly",
"data": dailyReports,
"summary": "聚合后的月报数据",
}
}
// aggregateMonthlyReportsToQuarterly 聚合月报表数据生成季度报表
func (s *StatReportScheduler) aggregateMonthlyReportsToQuarterly(monthlyReports []map[string]interface{}) map[string]interface{} {
// 实现聚合逻辑,这里简化处理
return map[string]interface{}{
"type": "quarterly",
"data": monthlyReports,
"summary": "聚合后的季报数据",
}
}
// aggregateQuarterlyReportsToYearly 聚合季度报表数据生成年报表
func (s *StatReportScheduler) aggregateQuarterlyReportsToYearly(quarterlyReports []map[string]interface{}) map[string]interface{} {
// 实现聚合逻辑,这里简化处理
return map[string]interface{}{
"type": "yearly",
"data": quarterlyReports,
"summary": "聚合后的年报数据",
}
}
// getAllTenants 获取所有租户ID
func (s *StatReportScheduler) getAllTenants(ctx context.Context) ([]int64, error) {
// 这里应该从数据库查询所有租户ID
// 暂时返回示例数据
return []int64{1, 2, 3}, nil
}
// isReportGenerated 检查报表是否已生成
func (s *StatReportScheduler) isReportGenerated(ctx context.Context, tenantID int64, reportType, date string) bool {
report, err := dao.StatReport.GetByTenantAndDate(ctx, tenantID, reportType, date)
if err != nil {
return false
}
return report != nil
}
// isQuarterFirstDay 检查是否是季度第一天
func (s *StatReportScheduler) isQuarterFirstDay() bool {
now := time.Now()
month := now.Month()
day := now.Day()
// 季度第一天1月1日、4月1日、7月1日、10月1日
return (month == time.January && day == 1) ||
(month == time.April && day == 1) ||
(month == time.July && day == 1) ||
(month == time.October && day == 1)
}
// getNextQuarterFirstDay 获取下个季度第一天
func (s *StatReportScheduler) getNextQuarterFirstDay(now time.Time) time.Time {
currentQuarter := (now.Month()-1)/3 + 1
nextQuarter := currentQuarter + 1
if nextQuarter > 4 {
nextQuarter = 1
now = now.AddDate(1, 0, 0)
}
nextQuarterMonth := time.Month((nextQuarter-1)*3 + 1)
return time.Date(now.Year(), nextQuarterMonth, 1, 0, 0, 0, 0, time.Local)
}