// ============================================================================= // MongoDB 业务操作封装 // 提供向后兼容的CRUD操作方法,支持多数据源 // ============================================================================= package mongo import ( "context" "errors" "fmt" "time" "gitea.com/red-future/common/log/consts" "go.mongodb.org/mongo-driver/v2/event" "gitea.com/red-future/common/beans" "gitea.com/red-future/common/log/model/entity" "gitea.com/red-future/common/utils" "github.com/gogf/gf/v2/container/gvar" "github.com/gogf/gf/v2/errors/gerror" "github.com/gogf/gf/v2/frame/g" "github.com/gogf/gf/v2/os/glog" "github.com/gogf/gf/v2/os/grpool" "github.com/gogf/gf/v2/os/gtime" "github.com/gogf/gf/v2/util/gconv" "go.mongodb.org/mongo-driver/v2/bson" "go.mongodb.org/mongo-driver/v2/mongo" "go.mongodb.org/mongo-driver/v2/mongo/options" ) // Redis 数据缓存 Key 常量 const ( CleanList = "list:tenantId-%v:collection-%s:*" // 清理列表Key CleanCount = "count:tenantId-%v:collection-%s:*" // 清理计数Key List = "list:tenantId-%v:collection-%s:filter:%s:options:%s" // 列表查询Key Count = "count:tenantId-%v:collection-%s:filter:%s" // 计数查询Key One = "one:tenantId-%v:collection-%s:filter:%s" // 单条查询Key ) // ============================================================================= // 向后兼容的MongoDB结构体 // ============================================================================= type mongoDB struct { noCache bool dataSource string // 数据源名称,默认为 "default" noTenantId bool // 是否跳过租户过滤 } func DB(cache ...bool) *mongoDB { return &mongoDB{ noCache: false, dataSource: "default", noTenantId: false, } } // commandMonitor 命令监控器 func commandMonitor() *event.CommandMonitor { return &event.CommandMonitor{ // 命令执行前触发 Started: func(ctx context.Context, evt *event.CommandStartedEvent) { // 1. 安全获取集合名:先判断字段是否存在,避免空值调用 collectionName := "无" // 默认值 collectionVal := evt.Command.Lookup("collection") if !g.IsEmpty(collectionVal) { // 先检查是否为nil collectionName = collectionVal.StringValue() } // 2. 打印标准化日志(避免字段缺失导致的格式错乱) fmt.Printf("[%s] 开始执行命令 | 数据库: %s | 集合: %s | 命令: %+v\n", time.Now().Format("2006-01-02 15:04:05"), evt.DatabaseName, collectionName, evt.Command, ) // 也可以在这里添加:参数校验、权限检查、链路追踪埋点等 // 例如:将开始时间存入ctx,供后续结束时计算耗时 ctx = context.WithValue(ctx, "cmd_start_time", time.Now()) }, // 命令执行成功后触发 Succeeded: func(ctx context.Context, evt *event.CommandSucceededEvent) { // 从ctx中获取开始时间,计算执行耗时 startTime, ok := ctx.Value("cmd_start_time").(time.Time) if ok { elapsed := time.Since(startTime) fmt.Printf("[%s] 命令执行成功 | 耗时: %s | 结果: %+v\n", time.Now().Format("2006-01-02 15:04:05"), elapsed, evt.Reply, ) } // 也可以在这里添加:日志入库、性能指标上报、结果校验等 }, // 命令执行失败后触发 Failed: func(ctx context.Context, evt *event.CommandFailedEvent) { fmt.Printf("[%s] 命令执行失败 | 错误: %s | 耗时: %s\n", time.Now().Format("2006-01-02 15:04:05"), evt.Failure, evt.Duration, ) // 也可以在这里添加:错误告警、重试逻辑、异常日志记录等 }, } } // WithDataSource 指定使用的数据源 func (m *mongoDB) WithDataSource(name string) *mongoDB { m.dataSource = name return m } // NoCache 不使用缓存 func (m *mongoDB) NoCache() *mongoDB { m.noCache = true return m } // NoTenantId 不使用租户过滤 func (m *mongoDB) NoTenantId() *mongoDB { m.noTenantId = true return m } // ============================================================================= // 向后兼容的全局变量和方法 // ============================================================================= var ( manager = GetManager() logPool *grpool.Pool serverName string LogRedisKey string ) // FieldInfo 定义字段信息结构体 type FieldInfo struct { FieldName string FieldValue interface{} } const PageSize = 20 // GetDB 获取默认数据源的数据库实例(向后兼容) func GetDB() *mongo.Database { source, err := manager.GetDataSource("default") if err != nil { return nil } return source.Database() } // ============================================================================= // MongoDB 操作方法(支持多数据源) // ============================================================================= // getDataSource 获取当前使用的数据源 func (m *mongoDB) getDataSource() (DataSource, error) { if m.dataSource == "" { m.dataSource = "default" } return manager.GetDataSource(m.dataSource) } // Count 查询总数 func (m *mongoDB) Count(ctx context.Context, filter bson.M, collection string) (count int64, err error) { source, err := m.getDataSource() if err != nil { return 0, err } db := source.Database() user, err := utils.GetUserInfo(ctx) if err != nil { return } filter["isDeleted"] = false delete(filter, "tenantId") filterKey := fmt.Sprintf("%+v", filter) redisKey := fmt.Sprintf(Count, user.TenantId, collection, filterKey) if !m.noCache { var resultStr *gvar.Var resultStr, err = g.Redis().Get(ctx, redisKey) if err != nil { return } if !g.IsEmpty(resultStr) { count = gconv.Int64(resultStr) return } } // 如果没有调用 noTenantId,则添加 tenantId 过滤 if !m.noTenantId && !g.IsEmpty(user.TenantId) { filter["tenantId"] = user.TenantId } count, err = db.Collection(collection).CountDocuments(ctx, filter) if !m.noCache { err = g.Redis().SetEX(ctx, redisKey, count, int64(time.Hour)) if err != nil { return } } return } // Find 查询多条记录 func (m *mongoDB) Find(ctx context.Context, filter bson.M, result interface{}, collection string, page *beans.Page, orderBy []beans.OrderBy) (total int64, err error) { source, err := m.getDataSource() if err != nil { return 0, err } db := source.Database() if err = utils.ValidStructPtr(result); err != nil { return } user, err := utils.GetUserInfo(ctx) if err != nil { return } if g.IsEmpty(filter["isDeleted"]) { filter["isDeleted"] = false } filterKey := fmt.Sprintf("%+v", filter) optionsKey := fmt.Sprintf("%+v%+v", page, orderBy) redisKey := fmt.Sprintf(List, user.TenantId, collection, filterKey, optionsKey) if !m.noCache { var resultStr *gvar.Var resultStr, err = g.Redis().Get(ctx, redisKey) if err != nil { return } if !resultStr.IsEmpty() { if err = resultStr.Structs(result); err != nil { return } total = int64(len(resultStr.Array())) return } } // 如果没有调用 noTenantId,则添加 tenantId 过滤 if !m.noTenantId && !g.IsEmpty(user.TenantId) { filter["tenantId"] = user.TenantId } limit := int64(PageSize) skip := int64(0) if page != nil && !g.IsEmpty(page.PageNum) && !g.IsEmpty(page.PageSize) { limit = page.PageSize if limit == -1 { skip = 0 } else { skip = (page.PageNum - 1) * limit } } opt := options.Find().SetSkip(skip) if limit != -1 { opt.SetLimit(limit) total, err = m.Count(ctx, filter, collection) if err != nil || total == 0 { return } } if orderBy == nil { opt.SetSort(bson.M{"createdAt": -1}) } else { orderBson := bson.D{} for _, v := range orderBy { if v.Order == beans.Asc { orderBson = append(orderBson, bson.E{Key: v.Field, Value: 1}) } else { orderBson = append(orderBson, bson.E{Key: v.Field, Value: -1}) } } opt.SetSort(orderBson) } cur, err := db.Collection(collection).Find(ctx, filter, opt) if err != nil { return } if limit == -1 { total = int64(cur.RemainingBatchLength()) } defer cur.Close(ctx) if err = cur.All(ctx, result); err != nil { return } if !m.noCache { err = g.Redis().SetEX(ctx, redisKey, result, int64(time.Hour)) if err != nil { return } } return } // FindOne 查询1条记录 func (m *mongoDB) FindOne(ctx context.Context, filter bson.M, result interface{}, collection string, opts ...options.Lister[options.FindOneOptions]) (err error) { source, err := m.getDataSource() if err != nil { return err } db := source.Database() if len(filter) == 0 { err = gerror.New("缺少查询条件") return } if err = utils.ValidStructPtr(result); err != nil { return } user, err := utils.GetUserInfo(ctx) if err != nil { return } filter["isDeleted"] = false filterKey := fmt.Sprintf("%+v", filter) redisKey := fmt.Sprintf(One, user.TenantId, collection, filterKey) if !m.noCache { var resultStr *gvar.Var resultStr, err = g.Redis().Get(ctx, redisKey) if err != nil { return } if !g.IsEmpty(resultStr) { err = gconv.Scan(resultStr, result) if err != nil { return err } return } } // 如果没有调用 noTenantId,则添加 tenantId 过滤 if !m.noTenantId && !g.IsEmpty(user.TenantId) { filter["tenantId"] = user.TenantId } cur := db.Collection(collection).FindOne(ctx, filter, opts...) err = cur.Decode(result) if errors.Is(err, mongo.ErrNoDocuments) { err = nil } if !m.noCache { err = g.Redis().SetEX(ctx, redisKey, result, int64(time.Hour)) if err != nil { return err } } return } // getDeletedData 获取要删除的数据 func (m *mongoDB) getDeletedData(ctx context.Context, filter bson.M, collection string) (deletedIDs []bson.ObjectID, deletedData []bson.M, err error) { // 查询要删除的数据 _, err = m.Find(ctx, filter, &deletedData, collection, nil, nil) // 从查询结果中获取 _id for _, doc := range deletedData { deletedIDs = append(deletedIDs, doc["_id"].(bson.ObjectID)) } return } func (m *mongoDB) CleanRedis(ctx context.Context, filter bson.M, tenantId interface{}, collection string) (err error) { listKeys := fmt.Sprintf(CleanList, tenantId, collection) keys, err := g.Redis().Keys(ctx, listKeys) if err != nil { return } for _, key := range keys { _, err = g.Redis().Del(ctx, key) if err != nil { return } } countKeys := fmt.Sprintf(CleanCount, tenantId, collection) keys, err = g.Redis().Keys(ctx, countKeys) if err != nil { return } for _, key := range keys { _, err = g.Redis().Del(ctx, key) if err != nil { return } } filter["isDeleted"] = false delete(filter, "tenantId") filterKey := fmt.Sprintf("%+v", filter) oneKey := fmt.Sprintf(One, tenantId, collection, filterKey) _, err = g.Redis().Del(ctx, oneKey) if err != nil { return } return } func (m *mongoDB) log(ctx context.Context, ids []bson.ObjectID, filter bson.M, collection string, data interface{}, userName, tenantId interface{}, operationType consts.OperationType) { // 提前获取 IP 地址,避免异步任务执行时请求已结束 var ipAddress string if request := g.RequestFromCtx(ctx); request != nil { ipAddress = request.GetClientIp() } if operationType != consts.OperationInsert && operationType != consts.OperationDelete { if !g.IsEmpty(filter["_id"]) { objectID := filter["_id"].(*bson.ObjectID) ids = append(ids, *objectID) } else { var err error if ids, _, err = m.getDeletedData(ctx, filter, collection); err != nil { return } } } log := &entity.OperationLog{ ServiceName: serverName, Collection: collection, CollectionID: ids, Operation: string(operationType), IPAddress: ipAddress, Data: data, } log.Creator = userName log.Updater = userName now := >ime.Now().Time log.CreatedAt = now log.UpdatedAt = now log.TenantId = tenantId // 将结构体转换为 map values := gconv.Map(log) // XADD streamKey * field1 value1 field2 value2 ... args := make([]interface{}, 0, len(values)*2+2) args = append(args, LogRedisKey, "*") // "*" 自动生成ID for key, val := range values { args = append(args, key, val) } _, err := g.Redis().Do(ctx, "XADD", args...) if err != nil { glog.Error(ctx, "mongoLog-AddToStream err: %v", err) } return } // Insert 插入多条记录 func (m *mongoDB) Insert(ctx context.Context, documents []interface{}, collection string, opts ...options.Lister[options.InsertManyOptions]) (ids []interface{}, err error) { source, err := m.getDataSource() if err != nil { return nil, err } db := source.Database() user, err := utils.GetUserInfo(ctx) if err != nil { return } docs := make([]interface{}, 0, len(documents)) for _, document := range documents { doc := gconv.Map(document) delete(doc, "id") if !g.IsEmpty(user.UserName) && g.IsEmpty(doc["creator"]) { doc["creator"] = user.UserName } if !g.IsEmpty(user.UserName) && g.IsEmpty(doc["updater"]) { doc["updater"] = user.UserName } if !g.IsEmpty(user.TenantId) && g.IsEmpty(doc["tenantId"]) { doc["tenantId"] = user.TenantId } if g.IsEmpty(doc["createdAt"]) { doc["createdAt"] = gtime.Now().Time } if g.IsEmpty(doc["updatedAt"]) { doc["updatedAt"] = gtime.Now().Time } doc["isDeleted"] = false docs = append(docs, doc) } r, err := db.Collection(collection).InsertMany(ctx, docs, opts...) if err != nil { return } ids = r.InsertedIDs err = m.CleanRedis(ctx, bson.M{}, user.TenantId, collection) //写日志 if collection != consts.OperationLogCollection { objectIds := make([]bson.ObjectID, 0) for _, id := range ids { objectIds = append(objectIds, id.(bson.ObjectID)) } m.log(ctx, objectIds, nil, collection, nil, user.UserName, user.TenantId, consts.OperationInsert) } return } // Delete 删除记录 func (m *mongoDB) Delete(ctx context.Context, filter bson.M, collection string, opts ...options.Lister[options.DeleteManyOptions]) (count int64, err error) { source, err := m.getDataSource() if err != nil { return 0, err } db := source.Database() if len(filter) == 0 { err = gerror.New("缺少查询条件") return } user, err := utils.GetUserInfo(ctx) if err != nil { return } filter["tenantId"] = user.TenantId // 获取要删除的数据 ds, ms, err := m.getDeletedData(ctx, filter, collection) if err != nil { return } // 执行删除操作 r, err := db.Collection(collection).DeleteMany(ctx, filter, opts...) if err != nil { return } count = r.DeletedCount // 清理redis err = m.CleanRedis(ctx, filter, user.TenantId, collection) // 写日志 m.log(ctx, ds, nil, collection, ms, user.UserName, user.TenantId, consts.OperationDelete) return } // DeleteSoft 假删除记录 func (m *mongoDB) DeleteSoft(ctx context.Context, filter bson.M, collection string, opts ...options.Lister[options.UpdateManyOptions]) (modifiedCount int64, err error) { update := bson.M{"$set": bson.M{"isDeleted": true}} return m.Update(ctx, filter, update, collection, opts...) } // Update 修改记录 func (m *mongoDB) Update(ctx context.Context, filter bson.M, update bson.M, collection string, opts ...options.Lister[options.UpdateManyOptions]) (modifiedCount int64, err error) { source, err := m.getDataSource() if err != nil { return 0, err } db := source.Database() if len(filter) == 0 { err = gerror.New("缺少查询条件") return } filter["isDeleted"] = false user, err := utils.GetUserInfo(ctx) if err != nil { return } if !g.IsEmpty(user.TenantId) { filter["tenantId"] = user.TenantId } // 遍历 update 中的所有操作符和字段,存放到 list 中 fieldList := make([]FieldInfo, 0) for _, doc := range update { if m, ok := doc.(bson.M); ok { for fieldName, fieldValue := range m { // 获取到字段名和字段值 fieldList = append(fieldList, FieldInfo{ FieldName: fieldName, FieldValue: fieldValue, }) } } } setDoc := bson.M{} if !g.IsEmpty(update["$set"]) { setDoc = update["$set"].(bson.M) } if !g.IsEmpty(user.UserName) { setDoc["updater"] = user.UserName } setDoc["updatedAt"] = gtime.Now().Time update["$set"] = setDoc result, err := db.Collection(collection).UpdateMany(ctx, filter, update, opts...) if err != nil { return } modifiedCount = result.ModifiedCount // 清理redis err = m.CleanRedis(ctx, filter, user.TenantId, collection) // 写日志 if !g.IsEmpty(setDoc["isDeleted"]) && gconv.Bool(setDoc["isDeleted"]) { filter["isDeleted"] = true m.log(ctx, nil, filter, collection, nil, user.UserName, user.TenantId, consts.OperationDeleteSoft) } else { m.log(ctx, nil, filter, collection, fieldList, user.UserName, user.TenantId, consts.OperationUpdate) } return } // SaveOrUpdate 批量增加或修改 func (m *mongoDB) SaveOrUpdate(ctx context.Context, filter []bson.M, update []bson.M, collection string, opts ...options.Lister[options.UpdateManyOptions]) (result *mongo.BulkWriteResult, err error) { source, err := m.getDataSource() if err != nil { return nil, err } db := source.Database() if len(filter) == 0 || len(update) == 0 { err = gerror.New("缺少查询条件或更新数据") return } if len(filter) != len(update) { err = gerror.New("查询条件和更新数据的数量必须一致") return } user, err := utils.GetUserInfo(ctx) if err != nil { return } var models []mongo.WriteModel for i := 0; i < len(filter); i++ { filter[i]["isDeleted"] = false if !g.IsEmpty(user.TenantId) { filter[i]["tenantId"] = user.TenantId } if setDoc, exists := update[i]["$set"].(bson.M); exists { if !g.IsEmpty(user.UserName) { setDoc["updater"] = user.UserName } setDoc["updatedAt"] = gtime.Now().Time } else { setDoc := bson.M{} if !g.IsEmpty(user.UserName) { setDoc["updater"] = user.UserName } setDoc["updatedAt"] = gtime.Now().Time update[i]["$set"] = setDoc } updateModel := mongo.NewUpdateOneModel() updateModel.SetFilter(filter[i]) updateModel.SetUpdate(update[i]) updateModel.SetUpsert(true) if len(opts) > 0 { for _, opt := range opts { var updateOpts options.UpdateManyOptions optFuncs := opt.List() for _, fn := range optFuncs { fn(&updateOpts) } if updateOpts.Upsert != nil { updateModel.SetUpsert(*updateOpts.Upsert) } } } models = append(models, updateModel) } bulkOpts := options.BulkWrite().SetOrdered(false) bulkResult, err := db.Collection(collection).BulkWrite(ctx, models, bulkOpts) if err != nil { return nil, err } for _, filterItem := range filter { err = m.CleanRedis(ctx, filterItem, user.TenantId, collection) if err != nil { glog.Warning(ctx, "清理Redis缓存失败:", err) } } return bulkResult, nil } func BuildUpdateData(ctx context.Context, req interface{}) (filter bson.M, err error) { _ = ctx filter = bson.M{} reqMap := gconv.Map(req) for mk, mv := range reqMap { if mk != "id" && !g.IsEmpty(mv) { filter[mk] = mv } } return }