oss文件存储服务-租户存储空间信息同步接口优化
This commit is contained in:
@@ -34,9 +34,7 @@ minio:
|
|||||||
secretKey: "infini_rag_flow" # 秘密密钥(本地默认)
|
secretKey: "infini_rag_flow" # 秘密密钥(本地默认)
|
||||||
secure: false # 本地 MinIO 关闭 SSL(生产按需改为 true)
|
secure: false # 本地 MinIO 关闭 SSL(生产按需改为 true)
|
||||||
region: "us-east-1" # 与 MinIO 服务端 REGION 一致(默认 us-east-1)
|
region: "us-east-1" # 与 MinIO 服务端 REGION 一致(默认 us-east-1)
|
||||||
bucketName: "my-dev-bucket" # 默认桶名(可选)
|
|
||||||
presignedExpire: "5m" # 预签名URL过期时间(≥1秒,支持m/h/d等单位)
|
|
||||||
|
|
||||||
# 文件存储初始化容量大小配置
|
# 文件存储初始化容量大小配置
|
||||||
oss:
|
oss:
|
||||||
capacitySize: 500
|
capacitySize: 10 #文件存储初始化容量(单位G)
|
||||||
95
dao/mongo_dao.go
Normal file
95
dao/mongo_dao.go
Normal file
@@ -0,0 +1,95 @@
|
|||||||
|
package dao
|
||||||
|
|
||||||
|
import (
|
||||||
|
"context"
|
||||||
|
commonMongo "gitee.com/red-future---jilin-g/common/mongo"
|
||||||
|
"github.com/gogf/gf/v2/errors/gerror"
|
||||||
|
"github.com/gogf/gf/v2/frame/g"
|
||||||
|
"github.com/gogf/gf/v2/os/glog"
|
||||||
|
"github.com/gogf/gf/v2/os/gtime"
|
||||||
|
"github.com/gogf/gf/v2/util/gconv"
|
||||||
|
"go.mongodb.org/mongo-driver/v2/bson"
|
||||||
|
"go.mongodb.org/mongo-driver/v2/mongo"
|
||||||
|
"go.mongodb.org/mongo-driver/v2/mongo/options"
|
||||||
|
)
|
||||||
|
|
||||||
|
// MongoDAO MongoDB原生查询(不需要token验证)
|
||||||
|
var MongoDAO = &mongoDAO{}
|
||||||
|
|
||||||
|
type mongoDAO struct{}
|
||||||
|
|
||||||
|
// SaveOrUpdate 原生批量增加或修改
|
||||||
|
func (d *mongoDAO) SaveOrUpdate(ctx context.Context, filter []bson.M, update []bson.M, collection string, opts ...options.Lister[options.UpdateManyOptions]) (result *mongo.BulkWriteResult, err error) {
|
||||||
|
db := commonMongo.GetDB()
|
||||||
|
if len(filter) == 0 || len(update) == 0 {
|
||||||
|
err = gerror.New("缺少查询条件或更新数据")
|
||||||
|
return
|
||||||
|
}
|
||||||
|
if len(filter) != len(update) {
|
||||||
|
err = gerror.New("查询条件和更新数据的数量必须一致")
|
||||||
|
return
|
||||||
|
}
|
||||||
|
// 构建批量操作模型
|
||||||
|
var models []mongo.WriteModel
|
||||||
|
for i := 0; i < len(filter); i++ {
|
||||||
|
if g.IsEmpty(filter[i]["tenantId"]) && g.IsEmpty(gconv.Map(update[i]["$set"])["updater"]) {
|
||||||
|
return nil, gerror.New("tenantId不能为空")
|
||||||
|
}
|
||||||
|
if g.IsEmpty(filter[i]["updater"]) && g.IsEmpty(gconv.Map(update[i]["$set"])["updater"]) {
|
||||||
|
return nil, gerror.New("updater不能为空")
|
||||||
|
}
|
||||||
|
// 处理过滤器
|
||||||
|
filter[i]["isDeleted"] = false
|
||||||
|
// 处理更新数据
|
||||||
|
if setDoc, exists := update[i]["$set"].(bson.M); exists {
|
||||||
|
setDoc["updater"] = gconv.Map(update[i]["$set"])["updater"]
|
||||||
|
setDoc["updatedAt"] = gtime.Now().Time
|
||||||
|
} else {
|
||||||
|
// 如果没有$set字段,则创建一个
|
||||||
|
setDoc := bson.M{}
|
||||||
|
setDoc["updater"] = gconv.Map(update[i]["$set"])["updater"]
|
||||||
|
setDoc["updatedAt"] = gtime.Now().Time
|
||||||
|
update[i]["$set"] = setDoc
|
||||||
|
}
|
||||||
|
// 创建更新操作模型
|
||||||
|
updateModel := mongo.NewUpdateOneModel()
|
||||||
|
updateModel.SetFilter(filter[i])
|
||||||
|
updateModel.SetUpdate(update[i])
|
||||||
|
updateModel.SetUpsert(true) // 默认不插入新文档
|
||||||
|
// 处理选项参数
|
||||||
|
if len(opts) > 0 {
|
||||||
|
for _, opt := range opts {
|
||||||
|
var updateOpts options.UpdateManyOptions
|
||||||
|
optFuncs := opt.List()
|
||||||
|
for _, fn := range optFuncs {
|
||||||
|
fn(&updateOpts)
|
||||||
|
}
|
||||||
|
if updateOpts.Upsert != nil {
|
||||||
|
updateModel.SetUpsert(*updateOpts.Upsert)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
models = append(models, updateModel)
|
||||||
|
}
|
||||||
|
// 执行批量操作,无序执行提高性能
|
||||||
|
bulkOpts := options.BulkWrite().SetOrdered(false)
|
||||||
|
bulkResult, err := db.Collection(collection).BulkWrite(ctx, models, bulkOpts)
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
// 清理相关缓存
|
||||||
|
for i := 0; i < len(filter); i++ {
|
||||||
|
var tenantId any
|
||||||
|
if g.IsEmpty(filter[i]["tenantId"]) {
|
||||||
|
tenantId = filter[i]["tenantId"]
|
||||||
|
}
|
||||||
|
if g.IsEmpty(gconv.Map(update[i]["$set"])["tenantId"]) {
|
||||||
|
tenantId = gconv.Map(update[i]["$set"])["tenantId"]
|
||||||
|
}
|
||||||
|
err = commonMongo.DB().CleanRedis(ctx, filter[i], tenantId, collection)
|
||||||
|
if err != nil {
|
||||||
|
glog.Warning(ctx, "清理Redis缓存失败:", err)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return bulkResult, nil
|
||||||
|
}
|
||||||
@@ -26,10 +26,17 @@ func (d *tenantOssTotal) SaveOrUpdate(ctx context.Context, updateData []*entity.
|
|||||||
if !g.IsEmpty(updateData) {
|
if !g.IsEmpty(updateData) {
|
||||||
var filter, update []bson.M
|
var filter, update []bson.M
|
||||||
for _, v := range updateData {
|
for _, v := range updateData {
|
||||||
|
bsonm, err := mongo.EntityToBSONM(v)
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
filter = append(filter, bson.M{"tenantId": v.TenantId})
|
filter = append(filter, bson.M{"tenantId": v.TenantId})
|
||||||
update = append(update, bson.M{"$set": bson.M{"usedOssSize": v.UsedOssSize, "totalOssSize": v.TotalOssSize}})
|
update = append(update, bson.M{"$set": bsonm})
|
||||||
|
}
|
||||||
|
_, err = MongoDAO.SaveOrUpdate(ctx, filter, update, consts.TenantOssTotalCollection)
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
}
|
}
|
||||||
_, err = mongo.DB().SaveOrUpdate(ctx, filter, update, consts.TenantOssTotalCollection)
|
|
||||||
}
|
}
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|||||||
2
main.go
2
main.go
@@ -23,7 +23,7 @@ func main() {
|
|||||||
controller.File,
|
controller.File,
|
||||||
})
|
})
|
||||||
|
|
||||||
gtimer.AddSingleton(ctx, time.Minute*5, func(ctx context.Context) {
|
gtimer.AddSingleton(ctx, time.Minute*1, func(ctx context.Context) {
|
||||||
err := service.TenantOssTotal.UpdateUsedOssSize(ctx)
|
err := service.TenantOssTotal.UpdateUsedOssSize(ctx)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
glog.Error(ctx, "UpdateUsedOssSize err: %v", err)
|
glog.Error(ctx, "UpdateUsedOssSize err: %v", err)
|
||||||
|
|||||||
@@ -16,3 +16,10 @@ type UploadFileRes struct {
|
|||||||
FileURL string `json:"fileURL" dc:"上传地址"`
|
FileURL string `json:"fileURL" dc:"上传地址"`
|
||||||
FileAddressPrefix string `json:"fileAddressPrefix"`
|
FileAddressPrefix string `json:"fileAddressPrefix"`
|
||||||
}
|
}
|
||||||
|
|
||||||
|
type TenantOssTotal struct {
|
||||||
|
TenantId string `json:"tenantId"`
|
||||||
|
UsedOssSize int `json:"usedOssSize"`
|
||||||
|
TotalOssSize int `json:"totalOssSize"`
|
||||||
|
Updater string `json:"updater"`
|
||||||
|
}
|
||||||
|
|||||||
@@ -9,8 +9,8 @@ import (
|
|||||||
type File struct {
|
type File struct {
|
||||||
do.MongoBaseDO `bson:",inline"` // 嵌入基础字段:Id, Creator, CreatedAt, Updater, UpdatedAt, TenantId, IsDeleted
|
do.MongoBaseDO `bson:",inline"` // 嵌入基础字段:Id, Creator, CreatedAt, Updater, UpdatedAt, TenantId, IsDeleted
|
||||||
// 基础信息
|
// 基础信息
|
||||||
FileURL string `bson:"fileURL" json:"fileURL"` // 图URL
|
FileURL string `bson:"fileURL" json:"fileURL"` // 图URL
|
||||||
FileSize byte `bson:"fileSize" json:"fileSize"`
|
FileSize int `bson:"fileSize" json:"fileSize"`
|
||||||
}
|
}
|
||||||
|
|
||||||
// CollectionName 存储集合名称
|
// CollectionName 存储集合名称
|
||||||
|
|||||||
@@ -9,8 +9,8 @@ import (
|
|||||||
type TenantOssTotal struct {
|
type TenantOssTotal struct {
|
||||||
do.MongoBaseDO `bson:",inline"` // 嵌入基础字段:Id, Creator, CreatedAt, Updater, UpdatedAt, TenantId, IsDeleted
|
do.MongoBaseDO `bson:",inline"` // 嵌入基础字段:Id, Creator, CreatedAt, Updater, UpdatedAt, TenantId, IsDeleted
|
||||||
// 基础信息
|
// 基础信息
|
||||||
UsedOssSize int64 `bson:"usedOssSize" json:"usedOssSize"`
|
UsedOssSize int `bson:"usedOssSize" json:"usedOssSize"`
|
||||||
TotalOssSize int64 `bson:"totalOssSize" json:"totalOssSize"`
|
TotalOssSize int `bson:"totalOssSize" json:"totalOssSize"`
|
||||||
}
|
}
|
||||||
|
|
||||||
// CollectionName 租户储存服务总计集合名称
|
// CollectionName 租户储存服务总计集合名称
|
||||||
|
|||||||
@@ -3,6 +3,7 @@ package service
|
|||||||
import (
|
import (
|
||||||
"context"
|
"context"
|
||||||
"fmt"
|
"fmt"
|
||||||
|
"github.com/gogf/gf/v2/os/glog"
|
||||||
"oss/consts"
|
"oss/consts"
|
||||||
"oss/dao"
|
"oss/dao"
|
||||||
"oss/model/dto"
|
"oss/model/dto"
|
||||||
@@ -23,11 +24,12 @@ type file struct{}
|
|||||||
var File = new(file)
|
var File = new(file)
|
||||||
|
|
||||||
func (f *file) UploadFile(ctx context.Context, req *dto.UploadFileReq) (res *dto.UploadFileRes, err error) {
|
func (f *file) UploadFile(ctx context.Context, req *dto.UploadFileReq) (res *dto.UploadFileRes, err error) {
|
||||||
fileSize := gconv.Byte(req.File.Size)
|
fileSize := gconv.Int(req.File.Size)
|
||||||
totalFileSize := int64(0)
|
totalFileSize := 0
|
||||||
// 获取租户id
|
// 获取租户id
|
||||||
user, err := utils.GetUserInfo(ctx)
|
user, err := utils.GetUserInfo(ctx)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
|
glog.Errorf(ctx, "获取用户信息失败: %v", err)
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
tenantId := gconv.String(user.TenantId)
|
tenantId := gconv.String(user.TenantId)
|
||||||
@@ -40,6 +42,7 @@ func (f *file) UploadFile(ctx context.Context, req *dto.UploadFileReq) (res *dto
|
|||||||
// 获取redis-租户存储容量总数
|
// 获取redis-租户存储容量总数
|
||||||
get, err := redis.RedisClient.Get(ctx, tenantOssTotalKey)
|
get, err := redis.RedisClient.Get(ctx, tenantOssTotalKey)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
|
glog.Errorf(ctx, "获取redis-租户存储容量总数失败: %v", err)
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
tenantOssTotalEntity := &entity.TenantOssTotal{}
|
tenantOssTotalEntity := &entity.TenantOssTotal{}
|
||||||
@@ -50,33 +53,39 @@ func (f *file) UploadFile(ctx context.Context, req *dto.UploadFileReq) (res *dto
|
|||||||
}
|
}
|
||||||
tenantOssTotal, err := TenantOssTotal.GetOneByTenantId(ctx, getByTenantIdReq)
|
tenantOssTotal, err := TenantOssTotal.GetOneByTenantId(ctx, getByTenantIdReq)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
|
glog.Errorf(ctx, "查询数据库-获取租户存储容量总数失败: %v", err)
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
if tenantOssTotal.Id.IsZero() {
|
if tenantOssTotal.Id.IsZero() {
|
||||||
tenantOssTotalEntity.TenantId = user.TenantId
|
tenantOssTotalEntity.TenantId = user.TenantId
|
||||||
tenantOssTotalEntity.UsedOssSize = int64(0)
|
tenantOssTotalEntity.UsedOssSize = 0
|
||||||
tenantOssTotalEntity.TotalOssSize = g.Cfg().MustGet(ctx, "oss.capacitySize").Int64()
|
tenantOssTotalEntity.TotalOssSize = g.Cfg().MustGet(ctx, "oss.capacitySize").Int() * 1024 * 1024 * 1024
|
||||||
} else {
|
} else {
|
||||||
tenantOssTotalEntity = tenantOssTotal.TenantOssTotal
|
tenantOssTotalEntity = tenantOssTotal.TenantOssTotal
|
||||||
}
|
}
|
||||||
} else {
|
} else {
|
||||||
// 反序列化-redis获取租户存储容量总数
|
// 反序列化-redis获取租户存储容量总数
|
||||||
err = gconv.Struct(get, tenantOssTotalEntity)
|
if err = gconv.Struct(get, tenantOssTotalEntity); err != nil {
|
||||||
if err != nil {
|
glog.Errorf(ctx, "反序列化-redis获取租户存储容量总数失败: %v", err)
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
tenantId = gconv.String(tenantOssTotalEntity.TenantId)
|
tenantId = gconv.String(tenantOssTotalEntity.TenantId)
|
||||||
fileSize = gconv.Byte(tenantOssTotalEntity.UsedOssSize) + fileSize
|
fileSize = tenantOssTotalEntity.UsedOssSize + fileSize
|
||||||
totalFileSize = tenantOssTotalEntity.TotalOssSize
|
totalFileSize = tenantOssTotalEntity.TotalOssSize
|
||||||
// 设置redis-租户存储容量总数
|
// 设置redis-租户存储容量总数
|
||||||
tenantOssTotalKeyMap := map[string]interface{}{"tenantId": tenantId, "UsedOssSize": fileSize, "TotalOssSize": totalFileSize}
|
tenantOssTotalKeyMap := dto.TenantOssTotal{
|
||||||
|
TenantId: tenantId,
|
||||||
|
UsedOssSize: fileSize,
|
||||||
|
TotalOssSize: totalFileSize,
|
||||||
|
Updater: gconv.String(user.UserName),
|
||||||
|
}
|
||||||
// 修改redis-租户存储容量总数 超时时间10分钟
|
// 修改redis-租户存储容量总数 超时时间10分钟
|
||||||
err = redis.RedisClient.SetEX(ctx, tenantOssTotalKey, tenantOssTotalKeyMap, gconv.Int64(time.Minute*10))
|
if err = redis.RedisClient.SetEX(ctx, tenantOssTotalKey, tenantOssTotalKeyMap, gconv.Int64(time.Minute*10)); err != nil {
|
||||||
if err != nil {
|
glog.Errorf(ctx, "修改redis-租户存储容量总数 超时时间10分钟失败: %v", err)
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
if fileSize > gconv.Byte(totalFileSize) {
|
if fileSize > totalFileSize {
|
||||||
return gerror.New("存储服务内存不足")
|
return gerror.New("存储服务内存不足")
|
||||||
}
|
}
|
||||||
return nil
|
return nil
|
||||||
@@ -88,8 +97,9 @@ func (f *file) UploadFile(ctx context.Context, req *dto.UploadFileReq) (res *dto
|
|||||||
return nil, gerror.New("存储服务内存不足")
|
return nil, gerror.New("存储服务内存不足")
|
||||||
}
|
}
|
||||||
// 上传图片
|
// 上传图片
|
||||||
fileURL, err := minio.UploadImage(ctx, req.File)
|
fileURL, err := minio.UploadFile(ctx, req.File)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
|
glog.Errorf(ctx, "上传图片失败: %v", err)
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
// 插入数据库
|
// 插入数据库
|
||||||
@@ -104,6 +114,6 @@ func (f *file) UploadFile(ctx context.Context, req *dto.UploadFileReq) (res *dto
|
|||||||
// 返回图片url
|
// 返回图片url
|
||||||
return &dto.UploadFileRes{
|
return &dto.UploadFileRes{
|
||||||
FileURL: fileURL,
|
FileURL: fileURL,
|
||||||
FileAddressPrefix: minio.GetImgAddressPrefix(ctx),
|
FileAddressPrefix: minio.GetIFileAddressPrefix(ctx),
|
||||||
}, err
|
}, err
|
||||||
}
|
}
|
||||||
|
|||||||
Reference in New Issue
Block a user