Files
customer-server/service/archive_service.go

208 lines
6.2 KiB
Go
Raw Permalink Normal View History

2026-03-14 10:02:49 +08:00
// Package service - 归档服务
// 功能对话记录从MongoDB归档到Elasticsearch定时任务+手动触发
package service
import (
"context"
"customer-server/dao"
"customer-server/model/entity"
"gitea.com/red-future/common/elasticsearch"
"gitea.com/red-future/common/jaeger"
"gitea.com/red-future/common/redis"
"github.com/gogf/gf/v2/os/gcron"
"github.com/gogf/gf/v2/os/glog"
"github.com/gogf/gf/v2/os/gtime"
"github.com/gogf/gf/v2/util/gconv"
)
// archiveService 归档服务
type archiveService struct{}
// ArchiveService 归档服务单例
var ArchiveService = new(archiveService)
// 归档锁的键名和过期时间
const (
archiveLockKey = "archive:monthly:lock"
archiveLockExpire = 3600 // 1 小时
)
// MonthlyArchive 月度归档主流程
// 参数: ctx - 上下文
// 返回: err - 错误信息
// 功能: 将MongoDB对话记录归档到Elasticsearch流程1.复制到临时表 2.删除原表 3.写入ES 4.删临时表
// 注意: 使用分布式锁确保只有一个节点执行
func (s *archiveService) MonthlyArchive(ctx context.Context) (err error) {
// 获取分布式锁,确保只有一个节点执行归档
if !redis.TryLock(ctx, archiveLockKey, archiveLockExpire) {
glog.Info(ctx, "其他节点正在执行归档,本节点跳过")
return
}
defer redis.Unlock(ctx, archiveLockKey)
beginTime := gtime.Now()
glog.Info(ctx, "========== 开始月度归档 ==========")
// 计算归档时间范围
now := gtime.Now()
var archiveStart, archiveEnd *gtime.Time
testMode := GetConfigBool(ctx, "archive.testMode")
if testMode {
// 测试模式:归档最近 7 天
archiveStart = now.AddDate(0, 0, -7)
archiveEnd = now
glog.Infof(ctx, "[测试模式] 归档时间范围: %s 至 %s最近 7 天)",
archiveStart.Format("Y-m-d H:i:s"),
archiveEnd.Format("Y-m-d H:i:s"))
} else {
// 生产模式:归档上个月整月数据
// 本月第一天 00:00:00
archiveEnd = gtime.NewFromStr(now.Format("Y-m") + "-01 00:00:00")
// 上个月第一天 00:00:00
archiveStart = archiveEnd.AddDate(0, -1, 0)
// 计算上个月天数
daysInLastMonth := archiveEnd.AddDate(0, 0, -1).Day()
glog.Infof(ctx, "归档时间范围: %s 至 %s共 %d 天)",
archiveStart.Format("Y-m-d H:i:s"),
archiveEnd.AddDate(0, 0, -1).Format("Y-m-d 23:59:59"),
daysInLastMonth)
}
// Step 1: 复制数据到临时表
glog.Info(ctx, "[Step 1/4] 复制数据到临时表...")
copyCount, err := dao.Archive.CopyToTempByRange(ctx, archiveStart.Time, archiveEnd.Time)
if err != nil {
jaeger.RecordError(ctx, err, "复制数据到临时表失败")
return
}
if copyCount == 0 {
glog.Info(ctx, "没有需要归档的数据,跳过")
return
}
glog.Infof(ctx, "复制完成,共 %d 条记录", copyCount)
// Step 2: 删除原表数据
glog.Info(ctx, "[Step 2/4] 删除原表数据...")
deleteCount, err := dao.Archive.DeleteByTempIds(ctx)
if err != nil {
jaeger.RecordError(ctx, err, "删除原表数据失败")
// 不返回错误,继续尝试后续步骤
}
glog.Infof(ctx, "删除完成,共 %d 条记录", deleteCount)
// Step 3: 写入 ES
glog.Info(ctx, "[Step 3/4] 写入 ES...")
if err = s.writeToES(ctx); err != nil {
jaeger.RecordError(ctx, err, "写入 ES 失败")
// 不返回错误,临时表数据保留,下次可以重试
return
}
// Step 4: 删除临时表
glog.Info(ctx, "[Step 4/4] 删除临时表...")
if err = dao.Archive.DropTempCollection(ctx); err != nil {
jaeger.RecordError(ctx, err, "删除临时表失败")
return
}
elapsed := gtime.Now().Sub(beginTime)
glog.Infof(ctx, "========== 月度归档完成,耗时: %s ==========", elapsed)
return
}
// writeToES 将临时表数据写入 ES
func (s *archiveService) writeToES(ctx context.Context) (err error) {
// 确保索引存在
if err = elasticsearch.CreateIndexIfNotExists(ctx, entity.ConversationESIndex, entity.ConversationESMapping); err != nil {
return
}
// 获取临时表数据
tempData, err := dao.Archive.GetTempData(ctx)
if err != nil {
return
}
if len(tempData) == 0 {
glog.Info(ctx, "临时表无数据")
return
}
// 转换为 ES 文档格式
now := gtime.Now().Time
docs := make([]interface{}, 0, len(tempData))
for _, temp := range tempData {
doc := entity.ConversationES{
Id: temp.OriginalId,
UserId: temp.UserId,
Platform: temp.Platform,
SessionId: temp.SessionId,
Question: temp.Question,
Answer: temp.Answer,
MessageId: temp.MessageId,
MsgTime: *temp.MsgTime, // 解引用指针类型
TenantId: gconv.String(temp.TenantId),
CreatedAt: *temp.CreatedAt, // 解引用指针类型
UpdatedAt: *temp.UpdatedAt, // 解引用指针类型
ArchivedAt: now,
}
docs = append(docs, doc)
}
// 批量写入 ES
batchSize := GetConfigInt(ctx, "archive.esBatchSize")
for i := 0; i < len(docs); i += batchSize {
end := i + batchSize
if end > len(docs) {
end = len(docs)
}
if err = elasticsearch.BulkIndex(ctx, entity.ConversationESIndex, docs[i:end]); err != nil {
return
}
}
glog.Infof(ctx, "ES 写入完成,共 %d 条记录", len(docs))
return
}
// ============== 定时任务(月度归档)==============
// StartArchiveCron 启动归档定时任务
// 默认每月 1 号凌晨 3 点执行
func (s *archiveService) StartCron(ctx context.Context) {
cronExpr := GetConfigString(ctx, "archive.cron")
enabled := GetConfigBool(ctx, "archive.enabled")
if !enabled {
glog.Info(ctx, "月度归档定时任务已禁用")
return
}
ctx, span := jaeger.NewSpan(ctx, "cron.archive.register")
defer span.End()
_, err := gcron.Add(ctx, cronExpr, func(ctx context.Context) {
ctx, span := jaeger.NewSpan(ctx, "cron.archive.monthly")
defer span.End()
glog.Info(ctx, "月度归档定时任务开始执行")
if err := s.MonthlyArchive(ctx); err != nil {
jaeger.RecordError(ctx, err, "月度归档执行失败")
}
}, "monthly-archive")
if err != nil {
jaeger.RecordError(ctx, err, "注册月度归档定时任务失败")
return
}
glog.Infof(ctx, "月度归档定时任务已启动 - Cron: %s", cronExpr)
}
// RunNow 立即执行归档(用于手动触发或测试)
func (s *archiveService) RunNow(ctx context.Context) error {
return s.MonthlyArchive(ctx)
}