From a88ed6e4f48ae208fe5b62d8fe2fdcc14a474eca Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E5=BC=A0=E6=96=8C?= <259278618@qq.com> Date: Wed, 4 Feb 2026 10:35:20 +0800 Subject: [PATCH] .gitignore --- mongo/connection.go | 5 +-- mongo/mongo.go | 84 +++++++++++++++++++++++++++++++++++---------- 2 files changed, 69 insertions(+), 20 deletions(-) diff --git a/mongo/connection.go b/mongo/connection.go index a46a559..b58727b 100644 --- a/mongo/connection.go +++ b/mongo/connection.go @@ -8,7 +8,6 @@ package mongo import ( "context" "fmt" - "gitee.com/red-future---jilin-g/common/log/consts" "os" "os/signal" "strings" @@ -16,6 +15,8 @@ import ( "syscall" "time" + "gitee.com/red-future---jilin-g/common/log/consts" + "github.com/gogf/gf/v2/frame/g" "github.com/gogf/gf/v2/os/glog" "github.com/gogf/gf/v2/os/grpool" @@ -120,7 +121,7 @@ func (d *BaseDataSource) Connect(ctx context.Context) error { SetHeartbeatInterval(10 * time.Second). SetMaxConnIdleTime(60 * time.Second). SetRetryWrites(true). - SetRetryReads(true) + SetRetryReads(true).SetMonitor(commandMonitor()) var err error d.client, err = mongo.Connect(opt) diff --git a/mongo/mongo.go b/mongo/mongo.go index b0426d7..a7f30f8 100644 --- a/mongo/mongo.go +++ b/mongo/mongo.go @@ -12,6 +12,7 @@ import ( "time" "gitee.com/red-future---jilin-g/common/log/consts" + "go.mongodb.org/mongo-driver/v2/event" "gitee.com/red-future---jilin-g/common/beans" "gitee.com/red-future---jilin-g/common/log/model/entity" @@ -33,34 +34,81 @@ import ( // 向后兼容的MongoDB结构体 // ============================================================================= -type MongoDB struct { +type mongoDB struct { noCache bool dataSource string // 数据源名称,默认为 "default" noTenantId bool // 是否跳过租户过滤 } -func DB(cache ...bool) *MongoDB { - return &MongoDB{ +func DB(cache ...bool) *mongoDB { + return &mongoDB{ noCache: false, dataSource: "default", noTenantId: false, } } +// commandMonitor 命令监控器 +func commandMonitor() *event.CommandMonitor { + return &event.CommandMonitor{ + // 命令执行前触发 + Started: func(ctx context.Context, evt *event.CommandStartedEvent) { + // 执行前的处理逻辑示例:记录开始时间、打印执行的命令 + fmt.Printf("[%s] 开始执行命令 | 数据库: %s | 集合: %s | 命令: %+v\n", + time.Now().Format("2006-01-02 15:04:05"), + evt.DatabaseName, + evt.Command.Lookup("collection").StringValue(), // 获取集合名 + evt.Command, + ) + + // 也可以在这里添加:参数校验、权限检查、链路追踪埋点等 + // 例如:将开始时间存入ctx,供后续结束时计算耗时 + ctx = context.WithValue(ctx, "cmd_start_time", time.Now()) + }, + + // 命令执行成功后触发 + Succeeded: func(ctx context.Context, evt *event.CommandSucceededEvent) { + // 从ctx中获取开始时间,计算执行耗时 + startTime, ok := ctx.Value("cmd_start_time").(time.Time) + if ok { + elapsed := time.Since(startTime) + fmt.Printf("[%s] 命令执行成功 | 耗时: %s | 结果: %+v\n", + time.Now().Format("2006-01-02 15:04:05"), + elapsed, + evt.Reply, + ) + } + + // 也可以在这里添加:日志入库、性能指标上报、结果校验等 + }, + + // 命令执行失败后触发 + Failed: func(ctx context.Context, evt *event.CommandFailedEvent) { + fmt.Printf("[%s] 命令执行失败 | 错误: %s | 耗时: %s\n", + time.Now().Format("2006-01-02 15:04:05"), + evt.Failure, + evt.Duration, + ) + + // 也可以在这里添加:错误告警、重试逻辑、异常日志记录等 + }, + } +} + // WithDataSource 指定使用的数据源 -func (m *MongoDB) WithDataSource(name string) *MongoDB { +func (m *mongoDB) WithDataSource(name string) *mongoDB { m.dataSource = name return m } // NoCache 不使用缓存 -func (m *MongoDB) NoCache() *MongoDB { +func (m *mongoDB) NoCache() *mongoDB { m.noCache = true return m } // NoTenantId 不使用租户过滤 -func (m *MongoDB) NoTenantId() *MongoDB { +func (m *mongoDB) NoTenantId() *mongoDB { m.noTenantId = true return m } @@ -98,7 +146,7 @@ func GetDB() *mongo.Database { // ============================================================================= // getDataSource 获取当前使用的数据源 -func (m *MongoDB) getDataSource() (DataSource, error) { +func (m *mongoDB) getDataSource() (DataSource, error) { if m.dataSource == "" { m.dataSource = "default" } @@ -106,7 +154,7 @@ func (m *MongoDB) getDataSource() (DataSource, error) { } // Count 查询总数 -func (m *MongoDB) Count(ctx context.Context, filter bson.M, collection string) (count int64, err error) { +func (m *mongoDB) Count(ctx context.Context, filter bson.M, collection string) (count int64, err error) { source, err := m.getDataSource() if err != nil { return 0, err @@ -147,7 +195,7 @@ func (m *MongoDB) Count(ctx context.Context, filter bson.M, collection string) ( } // Find 查询多条记录 -func (m *MongoDB) Find(ctx context.Context, filter bson.M, result interface{}, collection string, page *beans.Page, orderBy []beans.OrderBy) (total int64, err error) { +func (m *mongoDB) Find(ctx context.Context, filter bson.M, result interface{}, collection string, page *beans.Page, orderBy []beans.OrderBy) (total int64, err error) { source, err := m.getDataSource() if err != nil { return 0, err @@ -238,7 +286,7 @@ func (m *MongoDB) Find(ctx context.Context, filter bson.M, result interface{}, c } // FindOne 查询1条记录 -func (m *MongoDB) FindOne(ctx context.Context, filter bson.M, result interface{}, collection string, opts ...options.Lister[options.FindOneOptions]) (err error) { +func (m *mongoDB) FindOne(ctx context.Context, filter bson.M, result interface{}, collection string, opts ...options.Lister[options.FindOneOptions]) (err error) { source, err := m.getDataSource() if err != nil { return err @@ -292,7 +340,7 @@ func (m *MongoDB) FindOne(ctx context.Context, filter bson.M, result interface{} } // getDeletedData 获取要删除的数据 -func (m *MongoDB) getDeletedData(ctx context.Context, filter bson.M, collection string) (deletedIDs []bson.ObjectID, deletedData []bson.M, err error) { +func (m *mongoDB) getDeletedData(ctx context.Context, filter bson.M, collection string) (deletedIDs []bson.ObjectID, deletedData []bson.M, err error) { // 查询要删除的数据 _, err = m.Find(ctx, filter, &deletedData, collection, nil, nil) // 从查询结果中获取 _id @@ -302,7 +350,7 @@ func (m *MongoDB) getDeletedData(ctx context.Context, filter bson.M, collection return } -func (m *MongoDB) CleanRedis(ctx context.Context, filter bson.M, tenantId interface{}, collection string) (err error) { +func (m *mongoDB) CleanRedis(ctx context.Context, filter bson.M, tenantId interface{}, collection string) (err error) { listKeys := fmt.Sprintf(redis.CleanList, tenantId, collection) keys, err := redis.RedisClient().Keys(ctx, listKeys) if err != nil { @@ -336,7 +384,7 @@ func (m *MongoDB) CleanRedis(ctx context.Context, filter bson.M, tenantId interf return } -func (m *MongoDB) log(ctx context.Context, ids []bson.ObjectID, filter bson.M, collection string, data interface{}, userName, tenantId interface{}, operationType consts.OperationType) { +func (m *mongoDB) log(ctx context.Context, ids []bson.ObjectID, filter bson.M, collection string, data interface{}, userName, tenantId interface{}, operationType consts.OperationType) { // 提前获取 IP 地址,避免异步任务执行时请求已结束 var ipAddress string if request := g.RequestFromCtx(ctx); request != nil { @@ -375,7 +423,7 @@ func (m *MongoDB) log(ctx context.Context, ids []bson.ObjectID, filter bson.M, c } // Insert 插入多条记录 -func (m *MongoDB) Insert(ctx context.Context, documents []interface{}, collection string, opts ...options.Lister[options.InsertManyOptions]) (ids []interface{}, err error) { +func (m *mongoDB) Insert(ctx context.Context, documents []interface{}, collection string, opts ...options.Lister[options.InsertManyOptions]) (ids []interface{}, err error) { source, err := m.getDataSource() if err != nil { return nil, err @@ -426,7 +474,7 @@ func (m *MongoDB) Insert(ctx context.Context, documents []interface{}, collectio } // Delete 删除记录 -func (m *MongoDB) Delete(ctx context.Context, filter bson.M, collection string, opts ...options.Lister[options.DeleteManyOptions]) (count int64, err error) { +func (m *mongoDB) Delete(ctx context.Context, filter bson.M, collection string, opts ...options.Lister[options.DeleteManyOptions]) (count int64, err error) { source, err := m.getDataSource() if err != nil { return 0, err @@ -461,13 +509,13 @@ func (m *MongoDB) Delete(ctx context.Context, filter bson.M, collection string, } // DeleteSoft 假删除记录 -func (m *MongoDB) DeleteSoft(ctx context.Context, filter bson.M, collection string, opts ...options.Lister[options.UpdateManyOptions]) (modifiedCount int64, err error) { +func (m *mongoDB) DeleteSoft(ctx context.Context, filter bson.M, collection string, opts ...options.Lister[options.UpdateManyOptions]) (modifiedCount int64, err error) { update := bson.M{"$set": bson.M{"isDeleted": true}} return m.Update(ctx, filter, update, collection, opts...) } // Update 修改记录 -func (m *MongoDB) Update(ctx context.Context, filter bson.M, update bson.M, collection string, opts ...options.Lister[options.UpdateManyOptions]) (modifiedCount int64, err error) { +func (m *mongoDB) Update(ctx context.Context, filter bson.M, update bson.M, collection string, opts ...options.Lister[options.UpdateManyOptions]) (modifiedCount int64, err error) { source, err := m.getDataSource() if err != nil { return 0, err @@ -526,7 +574,7 @@ func (m *MongoDB) Update(ctx context.Context, filter bson.M, update bson.M, coll } // SaveOrUpdate 批量增加或修改 -func (m *MongoDB) SaveOrUpdate(ctx context.Context, filter []bson.M, update []bson.M, collection string, opts ...options.Lister[options.UpdateManyOptions]) (result *mongo.BulkWriteResult, err error) { +func (m *mongoDB) SaveOrUpdate(ctx context.Context, filter []bson.M, update []bson.M, collection string, opts ...options.Lister[options.UpdateManyOptions]) (result *mongo.BulkWriteResult, err error) { source, err := m.getDataSource() if err != nil { return nil, err