Files
customer-server/service/archive_service.go
2026-03-14 10:02:49 +08:00

208 lines
6.2 KiB
Go
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
// Package service - 归档服务
// 功能对话记录从MongoDB归档到Elasticsearch定时任务+手动触发
package service
import (
"context"
"customer-server/dao"
"customer-server/model/entity"
"gitea.com/red-future/common/elasticsearch"
"gitea.com/red-future/common/jaeger"
"gitea.com/red-future/common/redis"
"github.com/gogf/gf/v2/os/gcron"
"github.com/gogf/gf/v2/os/glog"
"github.com/gogf/gf/v2/os/gtime"
"github.com/gogf/gf/v2/util/gconv"
)
// archiveService 归档服务
type archiveService struct{}
// ArchiveService 归档服务单例
var ArchiveService = new(archiveService)
// 归档锁的键名和过期时间
const (
archiveLockKey = "archive:monthly:lock"
archiveLockExpire = 3600 // 1 小时
)
// MonthlyArchive 月度归档主流程
// 参数: ctx - 上下文
// 返回: err - 错误信息
// 功能: 将MongoDB对话记录归档到Elasticsearch流程1.复制到临时表 2.删除原表 3.写入ES 4.删临时表
// 注意: 使用分布式锁确保只有一个节点执行
func (s *archiveService) MonthlyArchive(ctx context.Context) (err error) {
// 获取分布式锁,确保只有一个节点执行归档
if !redis.TryLock(ctx, archiveLockKey, archiveLockExpire) {
glog.Info(ctx, "其他节点正在执行归档,本节点跳过")
return
}
defer redis.Unlock(ctx, archiveLockKey)
beginTime := gtime.Now()
glog.Info(ctx, "========== 开始月度归档 ==========")
// 计算归档时间范围
now := gtime.Now()
var archiveStart, archiveEnd *gtime.Time
testMode := GetConfigBool(ctx, "archive.testMode")
if testMode {
// 测试模式:归档最近 7 天
archiveStart = now.AddDate(0, 0, -7)
archiveEnd = now
glog.Infof(ctx, "[测试模式] 归档时间范围: %s 至 %s最近 7 天)",
archiveStart.Format("Y-m-d H:i:s"),
archiveEnd.Format("Y-m-d H:i:s"))
} else {
// 生产模式:归档上个月整月数据
// 本月第一天 00:00:00
archiveEnd = gtime.NewFromStr(now.Format("Y-m") + "-01 00:00:00")
// 上个月第一天 00:00:00
archiveStart = archiveEnd.AddDate(0, -1, 0)
// 计算上个月天数
daysInLastMonth := archiveEnd.AddDate(0, 0, -1).Day()
glog.Infof(ctx, "归档时间范围: %s 至 %s共 %d 天)",
archiveStart.Format("Y-m-d H:i:s"),
archiveEnd.AddDate(0, 0, -1).Format("Y-m-d 23:59:59"),
daysInLastMonth)
}
// Step 1: 复制数据到临时表
glog.Info(ctx, "[Step 1/4] 复制数据到临时表...")
copyCount, err := dao.Archive.CopyToTempByRange(ctx, archiveStart.Time, archiveEnd.Time)
if err != nil {
jaeger.RecordError(ctx, err, "复制数据到临时表失败")
return
}
if copyCount == 0 {
glog.Info(ctx, "没有需要归档的数据,跳过")
return
}
glog.Infof(ctx, "复制完成,共 %d 条记录", copyCount)
// Step 2: 删除原表数据
glog.Info(ctx, "[Step 2/4] 删除原表数据...")
deleteCount, err := dao.Archive.DeleteByTempIds(ctx)
if err != nil {
jaeger.RecordError(ctx, err, "删除原表数据失败")
// 不返回错误,继续尝试后续步骤
}
glog.Infof(ctx, "删除完成,共 %d 条记录", deleteCount)
// Step 3: 写入 ES
glog.Info(ctx, "[Step 3/4] 写入 ES...")
if err = s.writeToES(ctx); err != nil {
jaeger.RecordError(ctx, err, "写入 ES 失败")
// 不返回错误,临时表数据保留,下次可以重试
return
}
// Step 4: 删除临时表
glog.Info(ctx, "[Step 4/4] 删除临时表...")
if err = dao.Archive.DropTempCollection(ctx); err != nil {
jaeger.RecordError(ctx, err, "删除临时表失败")
return
}
elapsed := gtime.Now().Sub(beginTime)
glog.Infof(ctx, "========== 月度归档完成,耗时: %s ==========", elapsed)
return
}
// writeToES 将临时表数据写入 ES
func (s *archiveService) writeToES(ctx context.Context) (err error) {
// 确保索引存在
if err = elasticsearch.CreateIndexIfNotExists(ctx, entity.ConversationESIndex, entity.ConversationESMapping); err != nil {
return
}
// 获取临时表数据
tempData, err := dao.Archive.GetTempData(ctx)
if err != nil {
return
}
if len(tempData) == 0 {
glog.Info(ctx, "临时表无数据")
return
}
// 转换为 ES 文档格式
now := gtime.Now().Time
docs := make([]interface{}, 0, len(tempData))
for _, temp := range tempData {
doc := entity.ConversationES{
Id: temp.OriginalId,
UserId: temp.UserId,
Platform: temp.Platform,
SessionId: temp.SessionId,
Question: temp.Question,
Answer: temp.Answer,
MessageId: temp.MessageId,
MsgTime: *temp.MsgTime, // 解引用指针类型
TenantId: gconv.String(temp.TenantId),
CreatedAt: *temp.CreatedAt, // 解引用指针类型
UpdatedAt: *temp.UpdatedAt, // 解引用指针类型
ArchivedAt: now,
}
docs = append(docs, doc)
}
// 批量写入 ES
batchSize := GetConfigInt(ctx, "archive.esBatchSize")
for i := 0; i < len(docs); i += batchSize {
end := i + batchSize
if end > len(docs) {
end = len(docs)
}
if err = elasticsearch.BulkIndex(ctx, entity.ConversationESIndex, docs[i:end]); err != nil {
return
}
}
glog.Infof(ctx, "ES 写入完成,共 %d 条记录", len(docs))
return
}
// ============== 定时任务(月度归档)==============
// StartArchiveCron 启动归档定时任务
// 默认每月 1 号凌晨 3 点执行
func (s *archiveService) StartCron(ctx context.Context) {
cronExpr := GetConfigString(ctx, "archive.cron")
enabled := GetConfigBool(ctx, "archive.enabled")
if !enabled {
glog.Info(ctx, "月度归档定时任务已禁用")
return
}
ctx, span := jaeger.NewSpan(ctx, "cron.archive.register")
defer span.End()
_, err := gcron.Add(ctx, cronExpr, func(ctx context.Context) {
ctx, span := jaeger.NewSpan(ctx, "cron.archive.monthly")
defer span.End()
glog.Info(ctx, "月度归档定时任务开始执行")
if err := s.MonthlyArchive(ctx); err != nil {
jaeger.RecordError(ctx, err, "月度归档执行失败")
}
}, "monthly-archive")
if err != nil {
jaeger.RecordError(ctx, err, "注册月度归档定时任务失败")
return
}
glog.Infof(ctx, "月度归档定时任务已启动 - Cron: %s", cronExpr)
}
// RunNow 立即执行归档(用于手动触发或测试)
func (s *archiveService) RunNow(ctx context.Context) error {
return s.MonthlyArchive(ctx)
}