From 17e7ae0437bc907da5fad9dc500b026b8c4f8b34 Mon Sep 17 00:00:00 2001 From: qhd <1766646056@qq.com> Date: Tue, 30 Dec 2025 18:45:37 +0800 Subject: [PATCH] =?UTF-8?q?oss=E6=96=87=E4=BB=B6=E5=AD=98=E5=82=A8?= =?UTF-8?q?=E6=9C=8D=E5=8A=A1-=E7=A7=9F=E6=88=B7=E5=AD=98=E5=82=A8?= =?UTF-8?q?=E7=A9=BA=E9=97=B4=E4=BF=A1=E6=81=AF=E5=90=8C=E6=AD=A5=E6=8E=A5?= =?UTF-8?q?=E5=8F=A3=E4=BC=98=E5=8C=96?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- config.yml | 4 +- dao/mongo_dao.go | 95 ++++++++++++++++++++++++++++++++ dao/tenant_oss_total.go | 11 +++- main.go | 2 +- model/dto/file_dto.go | 7 +++ model/entity/file.go | 4 +- model/entity/tenant_oss_total.go | 4 +- service/file_service.go | 36 +++++++----- 8 files changed, 140 insertions(+), 23 deletions(-) create mode 100644 dao/mongo_dao.go diff --git a/config.yml b/config.yml index 56198f0..9f37e8e 100644 --- a/config.yml +++ b/config.yml @@ -34,9 +34,7 @@ minio: secretKey: "infini_rag_flow" # 秘密密钥(本地默认) secure: false # 本地 MinIO 关闭 SSL(生产按需改为 true) region: "us-east-1" # 与 MinIO 服务端 REGION 一致(默认 us-east-1) - bucketName: "my-dev-bucket" # 默认桶名(可选) - presignedExpire: "5m" # 预签名URL过期时间(≥1秒,支持m/h/d等单位) # 文件存储初始化容量大小配置 oss: - capacitySize: 500 \ No newline at end of file + capacitySize: 10 #文件存储初始化容量(单位G) \ No newline at end of file diff --git a/dao/mongo_dao.go b/dao/mongo_dao.go new file mode 100644 index 0000000..7417f92 --- /dev/null +++ b/dao/mongo_dao.go @@ -0,0 +1,95 @@ +package dao + +import ( + "context" + commonMongo "gitee.com/red-future---jilin-g/common/mongo" + "github.com/gogf/gf/v2/errors/gerror" + "github.com/gogf/gf/v2/frame/g" + "github.com/gogf/gf/v2/os/glog" + "github.com/gogf/gf/v2/os/gtime" + "github.com/gogf/gf/v2/util/gconv" + "go.mongodb.org/mongo-driver/v2/bson" + "go.mongodb.org/mongo-driver/v2/mongo" + "go.mongodb.org/mongo-driver/v2/mongo/options" +) + +// MongoDAO MongoDB原生查询(不需要token验证) +var MongoDAO = &mongoDAO{} + +type mongoDAO struct{} + +// SaveOrUpdate 原生批量增加或修改 +func (d *mongoDAO) SaveOrUpdate(ctx context.Context, filter []bson.M, update []bson.M, collection string, opts ...options.Lister[options.UpdateManyOptions]) (result *mongo.BulkWriteResult, err error) { + db := commonMongo.GetDB() + if len(filter) == 0 || len(update) == 0 { + err = gerror.New("缺少查询条件或更新数据") + return + } + if len(filter) != len(update) { + err = gerror.New("查询条件和更新数据的数量必须一致") + return + } + // 构建批量操作模型 + var models []mongo.WriteModel + for i := 0; i < len(filter); i++ { + if g.IsEmpty(filter[i]["tenantId"]) && g.IsEmpty(gconv.Map(update[i]["$set"])["updater"]) { + return nil, gerror.New("tenantId不能为空") + } + if g.IsEmpty(filter[i]["updater"]) && g.IsEmpty(gconv.Map(update[i]["$set"])["updater"]) { + return nil, gerror.New("updater不能为空") + } + // 处理过滤器 + filter[i]["isDeleted"] = false + // 处理更新数据 + if setDoc, exists := update[i]["$set"].(bson.M); exists { + setDoc["updater"] = gconv.Map(update[i]["$set"])["updater"] + setDoc["updatedAt"] = gtime.Now().Time + } else { + // 如果没有$set字段,则创建一个 + setDoc := bson.M{} + setDoc["updater"] = gconv.Map(update[i]["$set"])["updater"] + setDoc["updatedAt"] = gtime.Now().Time + update[i]["$set"] = setDoc + } + // 创建更新操作模型 + updateModel := mongo.NewUpdateOneModel() + updateModel.SetFilter(filter[i]) + updateModel.SetUpdate(update[i]) + updateModel.SetUpsert(true) // 默认不插入新文档 + // 处理选项参数 + if len(opts) > 0 { + for _, opt := range opts { + var updateOpts options.UpdateManyOptions + optFuncs := opt.List() + for _, fn := range optFuncs { + fn(&updateOpts) + } + if updateOpts.Upsert != nil { + updateModel.SetUpsert(*updateOpts.Upsert) + } + } + } + models = append(models, updateModel) + } + // 执行批量操作,无序执行提高性能 + bulkOpts := options.BulkWrite().SetOrdered(false) + bulkResult, err := db.Collection(collection).BulkWrite(ctx, models, bulkOpts) + if err != nil { + return nil, err + } + // 清理相关缓存 + for i := 0; i < len(filter); i++ { + var tenantId any + if g.IsEmpty(filter[i]["tenantId"]) { + tenantId = filter[i]["tenantId"] + } + if g.IsEmpty(gconv.Map(update[i]["$set"])["tenantId"]) { + tenantId = gconv.Map(update[i]["$set"])["tenantId"] + } + err = commonMongo.DB().CleanRedis(ctx, filter[i], tenantId, collection) + if err != nil { + glog.Warning(ctx, "清理Redis缓存失败:", err) + } + } + return bulkResult, nil +} diff --git a/dao/tenant_oss_total.go b/dao/tenant_oss_total.go index 9d7835f..9dbdedf 100644 --- a/dao/tenant_oss_total.go +++ b/dao/tenant_oss_total.go @@ -26,10 +26,17 @@ func (d *tenantOssTotal) SaveOrUpdate(ctx context.Context, updateData []*entity. if !g.IsEmpty(updateData) { var filter, update []bson.M for _, v := range updateData { + bsonm, err := mongo.EntityToBSONM(v) + if err != nil { + return err + } filter = append(filter, bson.M{"tenantId": v.TenantId}) - update = append(update, bson.M{"$set": bson.M{"usedOssSize": v.UsedOssSize, "totalOssSize": v.TotalOssSize}}) + update = append(update, bson.M{"$set": bsonm}) + } + _, err = MongoDAO.SaveOrUpdate(ctx, filter, update, consts.TenantOssTotalCollection) + if err != nil { + return err } - _, err = mongo.DB().SaveOrUpdate(ctx, filter, update, consts.TenantOssTotalCollection) } return } diff --git a/main.go b/main.go index 3e7e14e..6bc49bf 100644 --- a/main.go +++ b/main.go @@ -23,7 +23,7 @@ func main() { controller.File, }) - gtimer.AddSingleton(ctx, time.Minute*5, func(ctx context.Context) { + gtimer.AddSingleton(ctx, time.Minute*1, func(ctx context.Context) { err := service.TenantOssTotal.UpdateUsedOssSize(ctx) if err != nil { glog.Error(ctx, "UpdateUsedOssSize err: %v", err) diff --git a/model/dto/file_dto.go b/model/dto/file_dto.go index a367fde..f9c495c 100644 --- a/model/dto/file_dto.go +++ b/model/dto/file_dto.go @@ -16,3 +16,10 @@ type UploadFileRes struct { FileURL string `json:"fileURL" dc:"上传地址"` FileAddressPrefix string `json:"fileAddressPrefix"` } + +type TenantOssTotal struct { + TenantId string `json:"tenantId"` + UsedOssSize int `json:"usedOssSize"` + TotalOssSize int `json:"totalOssSize"` + Updater string `json:"updater"` +} diff --git a/model/entity/file.go b/model/entity/file.go index d9f8bf0..cf0cc10 100644 --- a/model/entity/file.go +++ b/model/entity/file.go @@ -9,8 +9,8 @@ import ( type File struct { do.MongoBaseDO `bson:",inline"` // 嵌入基础字段:Id, Creator, CreatedAt, Updater, UpdatedAt, TenantId, IsDeleted // 基础信息 - FileURL string `bson:"fileURL" json:"fileURL"` // 图URL - FileSize byte `bson:"fileSize" json:"fileSize"` + FileURL string `bson:"fileURL" json:"fileURL"` // 图URL + FileSize int `bson:"fileSize" json:"fileSize"` } // CollectionName 存储集合名称 diff --git a/model/entity/tenant_oss_total.go b/model/entity/tenant_oss_total.go index c7fc67e..7790e41 100644 --- a/model/entity/tenant_oss_total.go +++ b/model/entity/tenant_oss_total.go @@ -9,8 +9,8 @@ import ( type TenantOssTotal struct { do.MongoBaseDO `bson:",inline"` // 嵌入基础字段:Id, Creator, CreatedAt, Updater, UpdatedAt, TenantId, IsDeleted // 基础信息 - UsedOssSize int64 `bson:"usedOssSize" json:"usedOssSize"` - TotalOssSize int64 `bson:"totalOssSize" json:"totalOssSize"` + UsedOssSize int `bson:"usedOssSize" json:"usedOssSize"` + TotalOssSize int `bson:"totalOssSize" json:"totalOssSize"` } // CollectionName 租户储存服务总计集合名称 diff --git a/service/file_service.go b/service/file_service.go index 1b4aef4..a74c1e1 100644 --- a/service/file_service.go +++ b/service/file_service.go @@ -3,6 +3,7 @@ package service import ( "context" "fmt" + "github.com/gogf/gf/v2/os/glog" "oss/consts" "oss/dao" "oss/model/dto" @@ -23,11 +24,12 @@ type file struct{} var File = new(file) func (f *file) UploadFile(ctx context.Context, req *dto.UploadFileReq) (res *dto.UploadFileRes, err error) { - fileSize := gconv.Byte(req.File.Size) - totalFileSize := int64(0) + fileSize := gconv.Int(req.File.Size) + totalFileSize := 0 // 获取租户id user, err := utils.GetUserInfo(ctx) if err != nil { + glog.Errorf(ctx, "获取用户信息失败: %v", err) return } tenantId := gconv.String(user.TenantId) @@ -40,6 +42,7 @@ func (f *file) UploadFile(ctx context.Context, req *dto.UploadFileReq) (res *dto // 获取redis-租户存储容量总数 get, err := redis.RedisClient.Get(ctx, tenantOssTotalKey) if err != nil { + glog.Errorf(ctx, "获取redis-租户存储容量总数失败: %v", err) return err } tenantOssTotalEntity := &entity.TenantOssTotal{} @@ -50,33 +53,39 @@ func (f *file) UploadFile(ctx context.Context, req *dto.UploadFileReq) (res *dto } tenantOssTotal, err := TenantOssTotal.GetOneByTenantId(ctx, getByTenantIdReq) if err != nil { + glog.Errorf(ctx, "查询数据库-获取租户存储容量总数失败: %v", err) return err } if tenantOssTotal.Id.IsZero() { tenantOssTotalEntity.TenantId = user.TenantId - tenantOssTotalEntity.UsedOssSize = int64(0) - tenantOssTotalEntity.TotalOssSize = g.Cfg().MustGet(ctx, "oss.capacitySize").Int64() + tenantOssTotalEntity.UsedOssSize = 0 + tenantOssTotalEntity.TotalOssSize = g.Cfg().MustGet(ctx, "oss.capacitySize").Int() * 1024 * 1024 * 1024 } else { tenantOssTotalEntity = tenantOssTotal.TenantOssTotal } } else { // 反序列化-redis获取租户存储容量总数 - err = gconv.Struct(get, tenantOssTotalEntity) - if err != nil { + if err = gconv.Struct(get, tenantOssTotalEntity); err != nil { + glog.Errorf(ctx, "反序列化-redis获取租户存储容量总数失败: %v", err) return err } } tenantId = gconv.String(tenantOssTotalEntity.TenantId) - fileSize = gconv.Byte(tenantOssTotalEntity.UsedOssSize) + fileSize + fileSize = tenantOssTotalEntity.UsedOssSize + fileSize totalFileSize = tenantOssTotalEntity.TotalOssSize // 设置redis-租户存储容量总数 - tenantOssTotalKeyMap := map[string]interface{}{"tenantId": tenantId, "UsedOssSize": fileSize, "TotalOssSize": totalFileSize} + tenantOssTotalKeyMap := dto.TenantOssTotal{ + TenantId: tenantId, + UsedOssSize: fileSize, + TotalOssSize: totalFileSize, + Updater: gconv.String(user.UserName), + } // 修改redis-租户存储容量总数 超时时间10分钟 - err = redis.RedisClient.SetEX(ctx, tenantOssTotalKey, tenantOssTotalKeyMap, gconv.Int64(time.Minute*10)) - if err != nil { + if err = redis.RedisClient.SetEX(ctx, tenantOssTotalKey, tenantOssTotalKeyMap, gconv.Int64(time.Minute*10)); err != nil { + glog.Errorf(ctx, "修改redis-租户存储容量总数 超时时间10分钟失败: %v", err) return err } - if fileSize > gconv.Byte(totalFileSize) { + if fileSize > totalFileSize { return gerror.New("存储服务内存不足") } return nil @@ -88,8 +97,9 @@ func (f *file) UploadFile(ctx context.Context, req *dto.UploadFileReq) (res *dto return nil, gerror.New("存储服务内存不足") } // 上传图片 - fileURL, err := minio.UploadImage(ctx, req.File) + fileURL, err := minio.UploadFile(ctx, req.File) if err != nil { + glog.Errorf(ctx, "上传图片失败: %v", err) return nil, err } // 插入数据库 @@ -104,6 +114,6 @@ func (f *file) UploadFile(ctx context.Context, req *dto.UploadFileReq) (res *dto // 返回图片url return &dto.UploadFileRes{ FileURL: fileURL, - FileAddressPrefix: minio.GetImgAddressPrefix(ctx), + FileAddressPrefix: minio.GetIFileAddressPrefix(ctx), }, err }