gomod引用
This commit is contained in:
156
mongo/mongo.go
156
mongo/mongo.go
@@ -5,6 +5,7 @@ import (
|
||||
"errors"
|
||||
"fmt"
|
||||
"strings"
|
||||
"sync"
|
||||
"time"
|
||||
|
||||
"gitee.com/red-future---jilin-g/common/consts"
|
||||
@@ -22,37 +23,166 @@ import (
|
||||
"go.mongodb.org/mongo-driver/v2/mongo/options"
|
||||
)
|
||||
|
||||
var db *mongo.Database
|
||||
var (
|
||||
db *mongo.Database
|
||||
client *mongo.Client
|
||||
isConnected bool
|
||||
mu sync.RWMutex
|
||||
mongoAddr string
|
||||
dbName string
|
||||
healthCtx context.Context
|
||||
healthCancel context.CancelFunc
|
||||
)
|
||||
|
||||
// GetDB 获取 MongoDB 数据库实例
|
||||
func GetDB() *mongo.Database {
|
||||
mu.RLock()
|
||||
defer mu.RUnlock()
|
||||
return db
|
||||
}
|
||||
|
||||
// connect 建立MongoDB连接
|
||||
func connect() error {
|
||||
mu.Lock()
|
||||
defer mu.Unlock()
|
||||
|
||||
if client != nil {
|
||||
err := client.Disconnect(context.Background())
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
|
||||
// 创建连接选项
|
||||
opt := options.Client().
|
||||
ApplyURI(mongoAddr).
|
||||
SetMaxPoolSize(100).
|
||||
SetMinPoolSize(10).
|
||||
SetMaxConnecting(10).
|
||||
SetConnectTimeout(10 * time.Second)
|
||||
|
||||
var err error
|
||||
client, err = mongo.Connect(opt)
|
||||
if err != nil {
|
||||
isConnected = false
|
||||
glog.Error(context.Background(), "MongoDB连接失败", err)
|
||||
return err
|
||||
}
|
||||
|
||||
// 测试连接
|
||||
testCtx, testCancel := context.WithTimeout(context.Background(), 5*time.Second)
|
||||
defer testCancel()
|
||||
|
||||
err = client.Ping(testCtx, nil)
|
||||
if err != nil {
|
||||
isConnected = false
|
||||
glog.Error(testCtx, "MongoDB连接测试失败", err)
|
||||
return err
|
||||
}
|
||||
|
||||
db = client.Database(dbName)
|
||||
isConnected = true
|
||||
glog.Info(context.Background(), "✅ MongoDB连接成功")
|
||||
return nil
|
||||
}
|
||||
|
||||
// healthCheck 健康检查协程
|
||||
func healthCheck() {
|
||||
ticker := time.NewTicker(30 * time.Second)
|
||||
defer ticker.Stop()
|
||||
|
||||
for {
|
||||
select {
|
||||
case <-healthCtx.Done():
|
||||
return
|
||||
case <-ticker.C:
|
||||
mu.RLock()
|
||||
currentConnected := isConnected
|
||||
currentClient := client
|
||||
mu.RUnlock()
|
||||
|
||||
if !currentConnected || currentClient == nil {
|
||||
glog.Warning(context.Background(), "MongoDB连接断开,尝试重连")
|
||||
if err := reconnect(); err != nil {
|
||||
glog.Error(context.Background(), "MongoDB重连失败", err)
|
||||
}
|
||||
continue
|
||||
}
|
||||
|
||||
// 测试连接状态
|
||||
testCtx, testCancel := context.WithTimeout(context.Background(), 5*time.Second)
|
||||
err := currentClient.Ping(testCtx, nil)
|
||||
testCancel()
|
||||
|
||||
if err != nil {
|
||||
mu.Lock()
|
||||
isConnected = false
|
||||
mu.Unlock()
|
||||
glog.Warning(context.Background(), "MongoDB连接健康检查失败", err)
|
||||
|
||||
// 尝试重连
|
||||
if err := reconnect(); err != nil {
|
||||
glog.Error(context.Background(), "MongoDB重连失败", err)
|
||||
}
|
||||
} else {
|
||||
glog.Debug(context.Background(), "MongoDB连接健康检查通过")
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// reconnect 重连函数
|
||||
func reconnect() error {
|
||||
maxRetries := 3
|
||||
retryDelay := 2 * time.Second
|
||||
|
||||
for i := 0; i < maxRetries; i++ {
|
||||
glog.Info(context.Background(), fmt.Sprintf("尝试第%d次重连MongoDB", i+1))
|
||||
|
||||
if err := connect(); err == nil {
|
||||
glog.Info(context.Background(), "MongoDB重连成功")
|
||||
return nil
|
||||
}
|
||||
|
||||
if i < maxRetries-1 {
|
||||
time.Sleep(retryDelay)
|
||||
retryDelay *= 2 // 指数退避
|
||||
}
|
||||
}
|
||||
|
||||
return gerror.New("MongoDB重连失败,已达到最大重试次数")
|
||||
}
|
||||
|
||||
// init 初始化MongoDB连接
|
||||
func init() {
|
||||
// 按需初始化:没有配置 mongo.address 则跳过
|
||||
mongoAddr := g.Cfg().MustGet(context.Background(), "mongo.address").String()
|
||||
mongoAddr = g.Cfg().MustGet(context.Background(), "mongo.address").String()
|
||||
if mongoAddr == "" {
|
||||
return
|
||||
}
|
||||
|
||||
ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
|
||||
defer cancel()
|
||||
opt := options.Client().ApplyURI(mongoAddr)
|
||||
client, err := mongo.Connect(opt)
|
||||
if err != nil {
|
||||
glog.Error(ctx, "mongodb连接失败", err)
|
||||
return
|
||||
}
|
||||
// 创建健康检查上下文
|
||||
healthCtx, healthCancel = context.WithCancel(context.Background())
|
||||
|
||||
// 从连接串中解析数据库名
|
||||
dbName := gstr.SubStr(mongoAddr, strings.LastIndex(mongoAddr, "/")+1, len(mongoAddr))
|
||||
dbName = gstr.SubStr(mongoAddr, strings.LastIndex(mongoAddr, "/")+1, len(mongoAddr))
|
||||
// 如果连接串带有参数(如 ?retryWrites=true),需要去掉参数部分
|
||||
if strings.Contains(dbName, "?") {
|
||||
dbName = gstr.SubStr(dbName, 0, strings.Index(dbName, "?"))
|
||||
}
|
||||
db = client.Database(dbName)
|
||||
glog.Info(ctx, "✅ MongoDB 初始化成功")
|
||||
|
||||
// 初始连接
|
||||
if err := connect(); err != nil {
|
||||
glog.Error(context.Background(), "MongoDB初始连接失败", err)
|
||||
return
|
||||
}
|
||||
|
||||
// 启动健康检查协程
|
||||
go healthCheck()
|
||||
|
||||
glog.Info(context.Background(), "✅ MongoDB初始化完成,连接健康检查已启动")
|
||||
}
|
||||
|
||||
func listOptionsToMap(ctx context.Context, opts ...options.Lister[options.FindOptions]) (m map[string]interface{}) {
|
||||
// 输出opts参数中的值
|
||||
m = make(map[string]interface{})
|
||||
|
||||
Reference in New Issue
Block a user