From 4380527eff141129b3da671622c9af5bde7a7212 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E5=BC=A0=E6=96=8C?= <259278618@qq.com> Date: Fri, 19 Dec 2025 12:42:37 +0800 Subject: [PATCH] =?UTF-8?q?gomod=E5=BC=95=E7=94=A8?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- mongo/mongo.go | 156 ++++++++++++++++++++++++++++++++++++++++++++----- 1 file changed, 143 insertions(+), 13 deletions(-) diff --git a/mongo/mongo.go b/mongo/mongo.go index 394cec0..8bb52c8 100644 --- a/mongo/mongo.go +++ b/mongo/mongo.go @@ -5,6 +5,7 @@ import ( "errors" "fmt" "strings" + "sync" "time" "gitee.com/red-future---jilin-g/common/consts" @@ -22,37 +23,166 @@ import ( "go.mongodb.org/mongo-driver/v2/mongo/options" ) -var db *mongo.Database +var ( + db *mongo.Database + client *mongo.Client + isConnected bool + mu sync.RWMutex + mongoAddr string + dbName string + healthCtx context.Context + healthCancel context.CancelFunc +) // GetDB 获取 MongoDB 数据库实例 func GetDB() *mongo.Database { + mu.RLock() + defer mu.RUnlock() return db } +// connect 建立MongoDB连接 +func connect() error { + mu.Lock() + defer mu.Unlock() + + if client != nil { + err := client.Disconnect(context.Background()) + if err != nil { + return err + } + } + + // 创建连接选项 + opt := options.Client(). + ApplyURI(mongoAddr). + SetMaxPoolSize(100). + SetMinPoolSize(10). + SetMaxConnecting(10). + SetConnectTimeout(10 * time.Second) + + var err error + client, err = mongo.Connect(opt) + if err != nil { + isConnected = false + glog.Error(context.Background(), "MongoDB连接失败", err) + return err + } + + // 测试连接 + testCtx, testCancel := context.WithTimeout(context.Background(), 5*time.Second) + defer testCancel() + + err = client.Ping(testCtx, nil) + if err != nil { + isConnected = false + glog.Error(testCtx, "MongoDB连接测试失败", err) + return err + } + + db = client.Database(dbName) + isConnected = true + glog.Info(context.Background(), "✅ MongoDB连接成功") + return nil +} + +// healthCheck 健康检查协程 +func healthCheck() { + ticker := time.NewTicker(30 * time.Second) + defer ticker.Stop() + + for { + select { + case <-healthCtx.Done(): + return + case <-ticker.C: + mu.RLock() + currentConnected := isConnected + currentClient := client + mu.RUnlock() + + if !currentConnected || currentClient == nil { + glog.Warning(context.Background(), "MongoDB连接断开,尝试重连") + if err := reconnect(); err != nil { + glog.Error(context.Background(), "MongoDB重连失败", err) + } + continue + } + + // 测试连接状态 + testCtx, testCancel := context.WithTimeout(context.Background(), 5*time.Second) + err := currentClient.Ping(testCtx, nil) + testCancel() + + if err != nil { + mu.Lock() + isConnected = false + mu.Unlock() + glog.Warning(context.Background(), "MongoDB连接健康检查失败", err) + + // 尝试重连 + if err := reconnect(); err != nil { + glog.Error(context.Background(), "MongoDB重连失败", err) + } + } else { + glog.Debug(context.Background(), "MongoDB连接健康检查通过") + } + } + } +} + +// reconnect 重连函数 +func reconnect() error { + maxRetries := 3 + retryDelay := 2 * time.Second + + for i := 0; i < maxRetries; i++ { + glog.Info(context.Background(), fmt.Sprintf("尝试第%d次重连MongoDB", i+1)) + + if err := connect(); err == nil { + glog.Info(context.Background(), "MongoDB重连成功") + return nil + } + + if i < maxRetries-1 { + time.Sleep(retryDelay) + retryDelay *= 2 // 指数退避 + } + } + + return gerror.New("MongoDB重连失败,已达到最大重试次数") +} + +// init 初始化MongoDB连接 func init() { // 按需初始化:没有配置 mongo.address 则跳过 - mongoAddr := g.Cfg().MustGet(context.Background(), "mongo.address").String() + mongoAddr = g.Cfg().MustGet(context.Background(), "mongo.address").String() if mongoAddr == "" { return } - ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second) - defer cancel() - opt := options.Client().ApplyURI(mongoAddr) - client, err := mongo.Connect(opt) - if err != nil { - glog.Error(ctx, "mongodb连接失败", err) - return - } + // 创建健康检查上下文 + healthCtx, healthCancel = context.WithCancel(context.Background()) + // 从连接串中解析数据库名 - dbName := gstr.SubStr(mongoAddr, strings.LastIndex(mongoAddr, "/")+1, len(mongoAddr)) + dbName = gstr.SubStr(mongoAddr, strings.LastIndex(mongoAddr, "/")+1, len(mongoAddr)) // 如果连接串带有参数(如 ?retryWrites=true),需要去掉参数部分 if strings.Contains(dbName, "?") { dbName = gstr.SubStr(dbName, 0, strings.Index(dbName, "?")) } - db = client.Database(dbName) - glog.Info(ctx, "✅ MongoDB 初始化成功") + + // 初始连接 + if err := connect(); err != nil { + glog.Error(context.Background(), "MongoDB初始连接失败", err) + return + } + + // 启动健康检查协程 + go healthCheck() + + glog.Info(context.Background(), "✅ MongoDB初始化完成,连接健康检查已启动") } + func listOptionsToMap(ctx context.Context, opts ...options.Lister[options.FindOptions]) (m map[string]interface{}) { // 输出opts参数中的值 m = make(map[string]interface{})