Files
data-engine/dao/tencent/account_relation_dao.go
2026-06-10 15:56:02 +08:00

119 lines
3.4 KiB
Go
Raw Permalink Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
package tencent
import (
"context"
consts "dataengine/consts/public"
entity "dataengine/model/entity/tencent"
"time"
"gitea.redpowerfuture.com/red-future/common/db/gfdb"
"github.com/gogf/gf/v2/frame/g"
"github.com/sirupsen/logrus"
)
type accountRelationDao struct{}
var AccountRelation = new(accountRelationDao)
// Upsert 插入或更新账户关系根据account_id判断
func (d *accountRelationDao) Upsert(ctx context.Context, item *entity.AccountRelation) error {
now := time.Now()
// 检查是否已存在
var existing entity.AccountRelation
err := gfdb.DB(ctx).Model(ctx, consts.TencentAccountRelationTable).
Where("tenant_id", item.TenantId).
Where(entity.AccountRelationCols.AccountID, item.AccountID).
WhereNull(entity.AccountRelationCols.DeletedAt).
Scan(&existing)
// Scan找不到记录时err不为nil但这是正常情况需要继续执行插入
if err != nil && existing.Id == 0 {
// 记录不存在,执行插入
item.CreatedAt = &now
item.UpdatedAt = &now
_, err = gfdb.DB(ctx).Model(ctx, consts.TencentAccountRelationTable).
Data(item).
Insert()
return err
}
// 记录存在,执行更新
item.UpdatedAt = &now
_, err = gfdb.DB(ctx).Model(ctx, consts.TencentAccountRelationTable).
Where("id", existing.Id).
Data(g.Map{
entity.AccountRelationCols.CorporationName: item.CorporationName,
entity.AccountRelationCols.CommentDataList: item.CommentDataList,
entity.AccountRelationCols.IsAdx: item.IsAdx,
entity.AccountRelationCols.IsBid: item.IsBid,
entity.AccountRelationCols.IsMp: item.IsMp,
entity.AccountRelationCols.UpdatedAt: now,
}).
Update()
return err
}
// BatchUpsert 批量插入或更新(使用 OnConflict 实现 Upsert
func (d *accountRelationDao) BatchUpsert(ctx context.Context, items []*entity.AccountRelation) (successCount int, err error) {
if len(items) == 0 {
return 0, nil
}
logrus.Infof("开始批量Upsert: %d 条记录", len(items))
// 分批处理每批100条
batchSize := 100
successCount = 0
for i := 0; i < len(items); i += batchSize {
end := i + batchSize
if end > len(items) {
end = len(items)
}
batch := items[i:end]
logrus.Infof("处理第 %d-%d 条记录", i+1, end)
// 执行批量插入,使用 OnConflict 实现 Upsert
result, err := gfdb.DB(ctx).Model(ctx, consts.TencentAccountRelationTable).
Data(batch).
OnConflict(entity.AccountRelationCols.AccountID).
Save()
if err != nil {
logrus.Errorf("批量Upsert失败: %v尝试逐条处理", err)
// 批量失败,逐条处理
for _, item := range batch {
if upsertErr := d.Upsert(ctx, item); upsertErr != nil {
logrus.Errorf("逐条Upsert失败: account_id=%d, err=%v", item.AccountID, upsertErr)
continue
}
successCount++
}
} else {
affected, _ := result.RowsAffected()
successCount += int(affected)
logrus.Infof("批量Upsert成功: 影响 %d 条记录", affected)
}
}
logrus.Infof("批量Upsert完成: 成功 %d 条", successCount)
return successCount, nil
}
// ListAll 获取所有账户关系
func (d *accountRelationDao) ListAll(ctx context.Context) ([]entity.AccountRelation, error) {
var list []entity.AccountRelation
err := gfdb.DB(ctx).Model(ctx, consts.TencentAccountRelationTable).
WhereNull(entity.AccountRelationCols.DeletedAt).
OrderAsc(entity.AccountRelationCols.AccountID).
Scan(&list)
return list, err
}