同步音频和图片

This commit is contained in:
2026-05-06 16:19:22 +08:00
parent 3814c95047
commit 162bab15e6
22 changed files with 5970 additions and 2 deletions

View File

@@ -0,0 +1,118 @@
package tencent
import (
"context"
consts "dataengine/consts/public"
entity "dataengine/model/entity/tencent"
"time"
"gitea.com/red-future/common/db/gfdb"
"github.com/gogf/gf/v2/frame/g"
"github.com/sirupsen/logrus"
)
type accountRelationDao struct{}
var AccountRelation = new(accountRelationDao)
// Upsert 插入或更新账户关系根据account_id判断
func (d *accountRelationDao) Upsert(ctx context.Context, item *entity.AccountRelation) error {
now := time.Now()
// 检查是否已存在
var existing entity.AccountRelation
err := gfdb.DB(ctx).Model(ctx, consts.TencentAccountRelationTable).
Where("tenant_id", item.TenantId).
Where(entity.AccountRelationCols.AccountID, item.AccountID).
WhereNull(entity.AccountRelationCols.DeletedAt).
Scan(&existing)
// Scan找不到记录时err不为nil但这是正常情况需要继续执行插入
if err != nil && existing.Id == 0 {
// 记录不存在,执行插入
item.CreatedAt = &now
item.UpdatedAt = &now
_, err = gfdb.DB(ctx).Model(ctx, consts.TencentAccountRelationTable).
Data(item).
Insert()
return err
}
// 记录存在,执行更新
item.UpdatedAt = &now
_, err = gfdb.DB(ctx).Model(ctx, consts.TencentAccountRelationTable).
Where("id", existing.Id).
Data(g.Map{
entity.AccountRelationCols.CorporationName: item.CorporationName,
entity.AccountRelationCols.CommentDataList: item.CommentDataList,
entity.AccountRelationCols.IsAdx: item.IsAdx,
entity.AccountRelationCols.IsBid: item.IsBid,
entity.AccountRelationCols.IsMp: item.IsMp,
entity.AccountRelationCols.UpdatedAt: now,
}).
Update()
return err
}
// BatchUpsert 批量插入或更新(使用 OnConflict 实现 Upsert
func (d *accountRelationDao) BatchUpsert(ctx context.Context, items []*entity.AccountRelation) (successCount int, err error) {
if len(items) == 0 {
return 0, nil
}
logrus.Infof("开始批量Upsert: %d 条记录", len(items))
// 分批处理每批100条
batchSize := 100
successCount = 0
for i := 0; i < len(items); i += batchSize {
end := i + batchSize
if end > len(items) {
end = len(items)
}
batch := items[i:end]
logrus.Infof("处理第 %d-%d 条记录", i+1, end)
// 执行批量插入,使用 OnConflict 实现 Upsert
result, err := gfdb.DB(ctx).Model(ctx, consts.TencentAccountRelationTable).
Data(batch).
OnConflict(entity.AccountRelationCols.AccountID).
Save()
if err != nil {
logrus.Errorf("批量Upsert失败: %v尝试逐条处理", err)
// 批量失败,逐条处理
for _, item := range batch {
if upsertErr := d.Upsert(ctx, item); upsertErr != nil {
logrus.Errorf("逐条Upsert失败: account_id=%d, err=%v", item.AccountID, upsertErr)
continue
}
successCount++
}
} else {
affected, _ := result.RowsAffected()
successCount += int(affected)
logrus.Infof("批量Upsert成功: 影响 %d 条记录", affected)
}
}
logrus.Infof("批量Upsert完成: 成功 %d 条", successCount)
return successCount, nil
}
// ListAll 获取所有账户关系
func (d *accountRelationDao) ListAll(ctx context.Context) ([]entity.AccountRelation, error) {
var list []entity.AccountRelation
err := gfdb.DB(ctx).Model(ctx, consts.TencentAccountRelationTable).
WhereNull(entity.AccountRelationCols.DeletedAt).
OrderAsc(entity.AccountRelationCols.AccountID).
Scan(&list)
return list, err
}

111
dao/tencent/audio_dao.go Normal file
View File

@@ -0,0 +1,111 @@
package tencent
import (
"context"
consts "dataengine/consts/public"
entity "dataengine/model/entity/tencent"
"gitea.com/red-future/common/db/gfdb"
"github.com/gogf/gf/v2/frame/g"
"github.com/sirupsen/logrus"
)
type audioDao struct{}
var Audio = new(audioDao)
// BatchUpsert 批量插入或更新(使用 OnConflict 实现 Upsert
func (d *audioDao) BatchUpsert(ctx context.Context, items []*entity.Audio) (successCount int, err error) {
if len(items) == 0 {
return 0, nil
}
logrus.Infof("开始批量Upsert音乐素材: %d 条记录", len(items))
// 分批处理每批100条
batchSize := 100
successCount = 0
for i := 0; i < len(items); i += batchSize {
end := i + batchSize
if end > len(items) {
end = len(items)
}
batch := items[i:end]
logrus.Infof("处理第 %d-%d 条音乐素材记录", i+1, end)
// 执行批量插入,使用 OnConflict 实现 Upsert
result, err := gfdb.DB(ctx).Model(ctx, consts.TencentAudioTable).
Data(batch).
OnConflict(entity.AudioCols.AudioId).
Save()
if err != nil {
logrus.Errorf("批量Upsert音乐素材失败: %v尝试逐条处理", err)
// 批量失败,逐条处理
for _, item := range batch {
if upsertErr := d.upsertSingle(ctx, item); upsertErr != nil {
logrus.Errorf("逐条Upsert音乐素材失败: audio_id=%s, err=%v", item.AudioId, upsertErr)
continue
}
successCount++
}
} else {
affected, _ := result.RowsAffected()
successCount += int(affected)
logrus.Infof("批量Upsert音乐素材成功: 影响 %d 条记录", affected)
}
}
logrus.Infof("批量Upsert音乐素材完成: 成功 %d 条", successCount)
return successCount, nil
}
// upsertSingle 单条插入或更新
func (d *audioDao) upsertSingle(ctx context.Context, item *entity.Audio) error {
var existing entity.Audio
err := gfdb.DB(ctx).Model(ctx, consts.TencentAudioTable).
Where("tenant_id", item.TenantId).
Where(entity.AudioCols.AudioId, item.AudioId).
WhereNull(entity.AudioCols.DeletedAt).
Scan(&existing)
if err != nil && existing.Id == 0 {
// 记录不存在,执行插入
_, err = gfdb.DB(ctx).Model(ctx, consts.TencentAudioTable).
Data(item).
Insert()
return err
}
// 记录存在,执行更新
_, err = gfdb.DB(ctx).Model(ctx, consts.TencentAudioTable).
Where("id", existing.Id).
Data(g.Map{
entity.AudioCols.CoverImageUrl: item.CoverImageUrl,
entity.AudioCols.AudioName: item.AudioName,
entity.AudioCols.Author: item.Author,
entity.AudioCols.Duration: item.Duration,
entity.AudioCols.ExpireTime: item.ExpireTime,
entity.AudioCols.FeelTags: item.FeelTags,
entity.AudioCols.GenreTags: item.GenreTags,
entity.AudioCols.Updater: item.Updater,
entity.AudioCols.UpdatedAt: item.UpdatedAt,
}).
Update()
return err
}
// ListAll 获取所有音乐素材
func (d *audioDao) ListAll(ctx context.Context) ([]entity.Audio, error) {
var list []entity.Audio
err := gfdb.DB(ctx).Model(ctx, consts.TencentAudioTable).
WhereNull(entity.AudioCols.DeletedAt).
OrderAsc(entity.AudioCols.AudioId).
Scan(&list)
return list, err
}

159
dao/tencent/image_dao.go Normal file
View File

@@ -0,0 +1,159 @@
package tencent
import (
"context"
consts "dataengine/consts/public"
entity "dataengine/model/entity/tencent"
"gitea.com/red-future/common/db/gfdb"
"github.com/gogf/gf/v2/frame/g"
"github.com/sirupsen/logrus"
)
type imageDao struct{}
var Image = new(imageDao)
// BatchUpsert 批量插入或更新(使用 OnConflict 实现 Upsert
func (d *imageDao) BatchUpsert(ctx context.Context, items []*entity.Image) (successCount int, err error) {
if len(items) == 0 {
return 0, nil
}
logrus.Infof("开始批量Upsert图片素材: %d 条记录", len(items))
// 分批处理每批100条
batchSize := 100
successCount = 0
for i := 0; i < len(items); i += batchSize {
end := i + batchSize
if end > len(items) {
end = len(items)
}
batch := items[i:end]
logrus.Infof("处理第 %d-%d 条图片素材记录", i+1, end)
// 执行批量插入,使用 OnConflict 实现 Upsert
result, err := gfdb.DB(ctx).Model(ctx, consts.TencentImageTable).
Data(batch).
OnConflict("(image_id, account_id)").
Save()
if err != nil {
logrus.Errorf("批量Upsert图片素材失败: %v尝试逐条处理", err)
// 批量失败,逐条处理
for _, item := range batch {
if upsertErr := d.upsertSingle(ctx, item); upsertErr != nil {
logrus.Errorf("逐条Upsert图片素材失败: image_id=%s, account_id=%d, err=%v", item.ImageId, item.AccountId, upsertErr)
} else {
successCount++
}
}
} else {
affected, _ := result.RowsAffected()
successCount += int(affected)
logrus.Infof("批量Upsert图片素材成功: 影响 %d 条记录", affected)
}
}
logrus.Infof("批量Upsert图片素材完成: 成功 %d 条", successCount)
return successCount, nil
}
// upsertSingle 单条插入或更新
func (d *imageDao) upsertSingle(ctx context.Context, item *entity.Image) error {
var existing entity.Image
err := gfdb.DB(ctx).Model(ctx, consts.TencentImageTable).
Where(entity.ImageCols.ImageId, item.ImageId).
Where(entity.ImageCols.AccountId, item.AccountId).
WhereNull("deleted_at").
Scan(&existing)
if err != nil && existing.Id == 0 {
// 记录不存在,执行插入
_, err = gfdb.DB(ctx).Model(ctx, consts.TencentImageTable).
Data(item).
Insert()
return err
}
// 记录存在,执行更新
_, err = gfdb.DB(ctx).Model(ctx, consts.TencentImageTable).
Where("id", existing.Id).
Data(g.Map{
entity.ImageCols.Width: item.Width,
entity.ImageCols.Height: item.Height,
entity.ImageCols.FileSize: item.FileSize,
entity.ImageCols.Type: item.Type,
entity.ImageCols.Signature: item.Signature,
entity.ImageCols.Description: item.Description,
entity.ImageCols.PreviewUrl: item.PreviewUrl,
entity.ImageCols.ThumbPreviewUrl: item.ThumbPreviewUrl,
entity.ImageCols.Status: item.Status,
entity.ImageCols.LastModifiedTime: item.LastModifiedTime,
}).
Update()
return err
}
// ListAll 获取所有图片素材
func (d *imageDao) ListAll(ctx context.Context) ([]entity.Image, error) {
var list []entity.Image
err := gfdb.DB(ctx).Model(ctx, consts.TencentImageTable).
WhereNull("deleted_at").
OrderAsc(entity.ImageCols.ImageId).
Scan(&list)
return list, err
}
// ListWithPage 分页查询图片素材(支持时间过滤)
func (d *imageDao) ListWithPage(ctx context.Context, page, pageSize int, accountId *int64, startTime, endTime *int64, status string) ([]entity.Image, int, error) {
model := gfdb.DB(ctx).Model(ctx, consts.TencentImageTable).
WhereNull("deleted_at")
// 账户ID过滤
if accountId != nil && *accountId > 0 {
model = model.Where(entity.ImageCols.AccountId, *accountId)
}
// 状态过滤
if status != "" {
model = model.Where(entity.ImageCols.Status, status)
}
// 时间范围过滤(根据 last_modified_time
if startTime != nil && *startTime > 0 {
model = model.WhereGTE(entity.ImageCols.LastModifiedTime, *startTime)
}
if endTime != nil && *endTime > 0 {
model = model.WhereLTE(entity.ImageCols.LastModifiedTime, *endTime)
}
// 设置排序(按最后修改时间降序)
model = model.OrderDesc(entity.ImageCols.LastModifiedTime)
// 获取总数
total, err := model.Count()
if err != nil {
return nil, 0, err
}
// 分页查询
var list []entity.Image
if page > 0 && pageSize > 0 {
err = model.Page(page, pageSize).Scan(&list)
} else {
err = model.Scan(&list)
}
if err != nil {
return nil, 0, err
}
return list, total, nil
}