Files
customer-server/dao/archive_dao.go
2026-03-14 10:02:49 +08:00

159 lines
3.9 KiB
Go
Raw Permalink Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
package dao
import (
"context"
"customer-server/model/entity"
"time"
"gitea.com/red-future/common/db/mongo"
"github.com/gogf/gf/v2/frame/g"
"go.mongodb.org/mongo-driver/v2/bson"
)
// archive 归档 DAO
type archive struct{}
// Archive 归档 DAO 单例
var Archive = new(archive)
// CopyToTempByRange 将指定时间范围的数据复制到临时表
// startTime: 开始时间包含endTime: 结束时间(不包含)
func (d *archive) CopyToTempByRange(ctx context.Context, startTime, endTime time.Time) (count int64, err error) {
db := mongo.GetDB()
// 查询指定时间范围的数据
filter := bson.M{
"createdAt": bson.M{
"$gte": startTime,
"$lt": endTime,
},
"isDeleted": false,
}
cursor, err := db.Collection(entity.ConversationCollection).Find(ctx, filter)
if err != nil {
return
}
defer cursor.Close(ctx)
// 批量插入临时表
batchSize := g.Cfg().MustGet(ctx, "archive.mongoBatchSize", 1000).Int()
var docs []interface{}
for cursor.Next(ctx) {
var conv entity.Conversation
if err = cursor.Decode(&conv); err != nil {
return
}
// 转换为临时表结构
temp := entity.ConversationArchiveTemp{
MongoBaseDO: conv.MongoBaseDO,
UserId: conv.UserId,
Platform: conv.Platform,
SessionId: conv.SessionId,
Question: conv.Question,
Answer: conv.Answer,
MessageId: conv.MessageId,
MsgTime: conv.MsgTime,
OriginalId: conv.Id.Hex(), // 保存原始 ID
}
// 清空 ID让 MongoDB 自动生成新 ID
temp.Id = nil
docs = append(docs, temp)
// 批量插入
if len(docs) >= batchSize {
if _, err = db.Collection(entity.ConversationArchiveTempCollection).InsertMany(ctx, docs); err != nil {
return
}
count += int64(len(docs))
docs = docs[:0]
}
}
// 插入剩余数据
if len(docs) > 0 {
if _, err = db.Collection(entity.ConversationArchiveTempCollection).InsertMany(ctx, docs); err != nil {
return
}
count += int64(len(docs))
}
return
}
// DeleteByTempIds 根据临时表中的 originalId 删除原表数据
func (d *archive) DeleteByTempIds(ctx context.Context) (count int64, err error) {
db := mongo.GetDB()
// 从临时表获取所有 originalId
cursor, err := db.Collection(entity.ConversationArchiveTempCollection).Find(ctx, bson.M{})
if err != nil {
return
}
defer cursor.Close(ctx)
var ids []bson.ObjectID
for cursor.Next(ctx) {
var temp entity.ConversationArchiveTemp
if err = cursor.Decode(&temp); err != nil {
return
}
if oid, parseErr := bson.ObjectIDFromHex(temp.OriginalId); parseErr == nil {
ids = append(ids, oid)
}
// 每 1000 条批量删除一次
if len(ids) >= 1000 {
result, delErr := db.Collection(entity.ConversationCollection).DeleteMany(ctx, bson.M{
"_id": bson.M{"$in": ids},
})
if delErr != nil {
err = delErr
return
}
count += result.DeletedCount
ids = ids[:0]
}
}
// 删除剩余数据
if len(ids) > 0 {
result, delErr := db.Collection(entity.ConversationCollection).DeleteMany(ctx, bson.M{
"_id": bson.M{"$in": ids},
})
if delErr != nil {
err = delErr
return
}
count += result.DeletedCount
}
return
}
// GetTempData 获取临时表数据(用于写入 ES
func (d *archive) GetTempData(ctx context.Context) (data []*entity.ConversationArchiveTemp, err error) {
db := mongo.GetDB()
cursor, err := db.Collection(entity.ConversationArchiveTempCollection).Find(ctx, bson.M{})
if err != nil {
return
}
defer cursor.Close(ctx)
err = cursor.All(ctx, &data)
return
}
// DropTempCollection 删除临时表
func (d *archive) DropTempCollection(ctx context.Context) (err error) {
return mongo.GetDB().Collection(entity.ConversationArchiveTempCollection).Drop(ctx)
}
// CountTemp 统计临时表记录数
func (d *archive) CountTemp(ctx context.Context) (count int64, err error) {
return mongo.GetDB().Collection(entity.ConversationArchiveTempCollection).CountDocuments(ctx, bson.M{})
}