// =============================================================================
// MongoDB data source connection management
// Handles connecting, reconnecting, health checks, and graceful shutdown of
// data sources.
// =============================================================================

package mongo

import (
	"context"
	"fmt"
	"net/url"
	"os"
	"os/signal"
	"strings"
	"sync"
	"syscall"
	"time"

	"gitea.com/red-future/common/log/consts"
	"github.com/gogf/gf/v2/frame/g"
	"github.com/gogf/gf/v2/os/glog"
	"github.com/gogf/gf/v2/os/grpool"
	"github.com/gogf/gf/v2/util/gconv"
	"go.mongodb.org/mongo-driver/v2/mongo"
	"go.mongodb.org/mongo-driver/v2/mongo/options"
)

// =============================================================================
// Data source configuration
// =============================================================================

// DataSourceConfig holds the connection settings for one MongoDB data source.
type DataSourceConfig struct {
	Name           string        `json:"name"`
	Address        string        `json:"address"`
	Database       string        `json:"database"`
	Username       string        `json:"username"`
	Password       string        `json:"password"`
	MaxPoolSize    int32         `json:"maxPoolSize"`
	MinPoolSize    int32         `json:"minPoolSize"`
	ConnectTimeout time.Duration `json:"connectTimeout"`
}

// =============================================================================
// Single data source interface
// =============================================================================

// DataSource is the contract every managed data source implements.
type DataSource interface {
	Name() string
	Database() *mongo.Database
	Client() *mongo.Client
	IsConnected() bool
	Connect(ctx context.Context) error
	Reconnect(ctx context.Context) error
	Close(ctx context.Context) error
}

// =============================================================================
// Data source implementation
// =============================================================================

// BaseDataSource is the default DataSource implementation. All mutable fields
// are guarded by mu.
type BaseDataSource struct {
	config        *DataSourceConfig
	client        *mongo.Client
	database      *mongo.Database
	isConnected   bool
	mu            sync.RWMutex
	lastError     error
	lastErrorTime time.Time
}

// NewBaseDataSource creates a data source that is not yet connected.
func NewBaseDataSource(config *DataSourceConfig) *BaseDataSource {
	return &BaseDataSource{
		config:      config,
		isConnected: false,
	}
}

func (d *BaseDataSource) Name() string {
	return d.config.Name
}

func (d *BaseDataSource) Database() *mongo.Database {
	d.mu.RLock()
	defer d.mu.RUnlock()
	return d.database
}

func (d *BaseDataSource) Client() *mongo.Client {
	d.mu.RLock()
	defer d.mu.RUnlock()
	return d.client
}

func (d *BaseDataSource) IsConnected() bool {
	d.mu.RLock()
	defer d.mu.RUnlock()
	return d.isConnected && d.client != nil
}

// escapeUserInfo percent-encodes a username or password for use in the
// userinfo part of a MongoDB URI. url.QueryEscape encodes a space as "+",
// which is only valid in query strings, so spaces are rewritten as "%20".
// Any literal "+" has already been encoded as "%2B" at that point.
func escapeUserInfo(s string) string {
	return strings.ReplaceAll(url.QueryEscape(s), "+", "%20")
}

// markFailed records a connection failure. The caller must hold d.mu.
func (d *BaseDataSource) markFailed(err error) {
	d.isConnected = false
	d.lastError = err
	d.lastErrorTime = time.Now()
}

// Connect creates a new client, pings the server, and selects the database.
func (d *BaseDataSource) Connect(ctx context.Context) error {
	d.mu.Lock()
	defer d.mu.Unlock()

	// Release any previous client before creating a new one. The error is
	// ignored on purpose: the old client may already be disconnected.
	if d.client != nil {
		_ = d.client.Disconnect(ctx)
	}

	// Strip any "?options" suffix from the configured database name.
	dbName, _, _ := strings.Cut(d.config.Database, "?")

	// Build the connection URI, adding credentials when a username is set.
	connectionURI := fmt.Sprintf("mongodb://%s", d.config.Address)
	if d.config.Username != "" {
		connectionURI = fmt.Sprintf("mongodb://%s:%s@%s",
			escapeUserInfo(d.config.Username),
			escapeUserInfo(d.config.Password),
			d.config.Address)
	}

	// Build the client options.
	opt := options.Client().
		ApplyURI(connectionURI).
		SetMaxPoolSize(uint64(d.config.MaxPoolSize)).
		SetMinPoolSize(uint64(d.config.MinPoolSize)).
		SetConnectTimeout(d.config.ConnectTimeout).
		SetMaxConnecting(10).
		SetServerSelectionTimeout(10 * time.Second).
		SetHeartbeatInterval(10 * time.Second).
		SetMaxConnIdleTime(60 * time.Second).
		SetRetryWrites(true).
		SetRetryReads(true).
		SetMonitor(commandMonitor())

	var err error
	d.client, err = mongo.Connect(opt)
	if err != nil {
		d.markFailed(err)
		return fmt.Errorf("datasource [%s] connection failed: %w", d.config.Name, err)
	}

	// Verify the connection with a bounded ping.
	pingCtx, cancel := context.WithTimeout(ctx, 5*time.Second)
	defer cancel()
	if err = d.client.Ping(pingCtx, nil); err != nil {
		d.markFailed(err)
		return fmt.Errorf("datasource [%s] ping failed: %w", d.config.Name, err)
	}

	d.database = d.client.Database(dbName)
	d.isConnected = true
	d.lastError = nil
	glog.Infof(ctx, "✅ datasource [%s] connected successfully", d.config.Name)
	return nil
}

// Reconnect discards the current client and connects again.
func (d *BaseDataSource) Reconnect(ctx context.Context) error {
	glog.Infof(ctx, "🔄 reconnecting datasource [%s]", d.config.Name)
	return d.Connect(ctx)
}

// Close disconnects the client, waiting at most 5 seconds.
func (d *BaseDataSource) Close(ctx context.Context) error {
	d.mu.Lock()
	defer d.mu.Unlock()

	if d.client != nil {
		disconnectCtx, cancel := context.WithTimeout(ctx, 5*time.Second)
		defer cancel()
		if err := d.client.Disconnect(disconnectCtx); err != nil {
			return fmt.Errorf("datasource [%s] close failed: %w", d.config.Name, err)
		}
	}
	d.isConnected = false
	glog.Infof(ctx, "datasource [%s] closed", d.config.Name)
	return nil
}

// =============================================================================
// Multi-data-source manager
// =============================================================================

// DataSourceManager owns every registered data source and the health-check
// loop that reconnects them.
type DataSourceManager struct {
	sources    map[string]DataSource
	mu         sync.RWMutex
	ctx        context.Context
	cancel     context.CancelFunc
	started    bool
	maxRetries int
}

var (
	globalManager *DataSourceManager
	managerOnce   sync.Once
)

// GetManager returns the global manager, creating it on first use.
func GetManager() *DataSourceManager {
	managerOnce.Do(func() {
		ctx, cancel := context.WithCancel(context.Background())
		globalManager = &DataSourceManager{
			sources:    make(map[string]DataSource),
			ctx:        ctx,
			cancel:     cancel,
			started:    false,
			maxRetries: 3,
		}
	})
	return globalManager
}

// RegisterDataSource registers a data source under config.Name. It returns an
// error if that name is already taken.
func (m *DataSourceManager) RegisterDataSource(config *DataSourceConfig) error {
	m.mu.Lock()
	defer m.mu.Unlock()

	if _, exists := m.sources[config.Name]; exists {
		return fmt.Errorf("datasource [%s] already exists", config.Name)
	}
	m.sources[config.Name] = NewBaseDataSource(config)
	return nil
}

// GetDataSource looks up a data source by name.
func (m *DataSourceManager) GetDataSource(name string) (DataSource, error) {
	m.mu.RLock()
	defer m.mu.RUnlock()

	source, exists := m.sources[name]
	if !exists {
		return nil, fmt.Errorf("datasource [%s] not found", name)
	}
	return source, nil
}

// GetAllDataSourceNames returns the names of all registered data sources.
func (m *DataSourceManager) GetAllDataSourceNames() []string {
	m.mu.RLock()
	defer m.mu.RUnlock()

	names := make([]string, 0, len(m.sources))
	for name := range m.sources {
		names = append(names, name)
	}
	return names
}

// init initializes the data sources at package load time.
//
// manager, logPool, serverName, and LogRedisKey are not declared in this file;
// they are assumed to be package-level variables declared elsewhere in this
// package.
func init() {
	ctx := context.Background()

	logPool = grpool.New(1)
	serverName = g.Cfg().MustGet(ctx, "server.name").String()
	LogRedisKey = fmt.Sprintf(consts.StreamKey, serverName)

	// Initialize the data sources from configuration.
	if err := manager.InitializeFromConfig(ctx); err != nil {
		glog.Errorf(ctx, "❌ Failed to initialize MongoDB datasources: %v", err)
	} else {
		glog.Infof(ctx, "✅ MongoDB datasources initialized: %v", manager.GetAllDataSourceNames())
	}

	// Start the health check.
	manager.StartHealthCheck()

	// Set up graceful shutdown.
	setupGracefulShutdown()
}

// InitializeFromConfig registers and connects every data source found under
// the "mongo" key of config.yml. It keeps going after a failure and returns
// the first error encountered.
func (m *DataSourceManager) InitializeFromConfig(ctx context.Context) error {
	var firstErr error

	// Read everything under the "mongo" key.
	mongoConfig := g.Cfg().MustGet(ctx, "mongo")
	if mongoConfig.IsNil() {
		glog.Warningf(ctx, "no mongo configuration found in config.yml")
		return nil
	}

	// Convert the configuration to a map.
	configMap := mongoConfig.Map()
	if configMap == nil {
		glog.Warningf(ctx, "mongo configuration is not a map")
		return nil
	}

	// Walk every child entry of "mongo".
	for name, subConfig := range configMap {
		// Skip entries that are not objects.
		subMap, ok := subConfig.(map[string]interface{})
		if !ok {
			continue
		}

		// Skip entries without an address.
		address, hasAddress := subMap["address"]
		if !hasAddress || gconv.String(address) == "" {
			continue
		}

		// Build the data source configuration.
		config := &DataSourceConfig{
			Name:           name,
			Address:        gconv.String(address),
			Database:       gconv.String(subMap["database"]),
			Username:       gconv.String(subMap["username"]),
			Password:       gconv.String(subMap["password"]),
			MaxPoolSize:    int32(gconv.Int(subMap["maxPoolSize"])),
			MinPoolSize:    int32(gconv.Int(subMap["minPoolSize"])),
			ConnectTimeout: gconv.Duration(subMap["connectTimeout"]),
		}

		// Apply defaults for unset values.
		if config.MaxPoolSize == 0 {
			config.MaxPoolSize = 100
		}
		if config.MinPoolSize == 0 {
			config.MinPoolSize = 10
		}
		if config.ConnectTimeout == 0 {
			config.ConnectTimeout = 10 * time.Second
		}

		// Register the data source.
		if err := m.RegisterDataSource(config); err != nil {
			glog.Errorf(ctx, "failed to register datasource [%s]: %v", name, err)
			if firstErr == nil {
				firstErr = err
			}
			continue
		}

		// Connect the data source. The lookup cannot fail here because the
		// source was registered just above.
		source, _ := m.GetDataSource(name)
		if err := source.Connect(ctx); err != nil {
			glog.Errorf(ctx, "failed to initialize datasource [%s]: %v", name, err)
			if firstErr == nil {
				firstErr = err
			}
		}
	}

	return firstErr
}

// StartHealthCheck starts the health-check loop. Calling it more than once has
// no further effect.
func (m *DataSourceManager) StartHealthCheck() {
	// Guard the started flag so concurrent callers cannot start two loops.
	m.mu.Lock()
	defer m.mu.Unlock()

	if m.started {
		return
	}
	m.started = true
	go m.healthCheckLoop()
}

// healthCheckLoop checks all data sources every 30 seconds until the manager
// context is cancelled.
func (m *DataSourceManager) healthCheckLoop() {
	ticker := time.NewTicker(30 * time.Second)
	defer ticker.Stop()

	for {
		select {
		case <-m.ctx.Done():
			return
		case <-ticker.C:
			m.checkAndReconnect()
		}
	}
}

// checkAndReconnect reconnects every data source that reports itself as
// disconnected.
func (m *DataSourceManager) checkAndReconnect() {
	// Snapshot the sources so the manager lock is not held while reconnecting,
	// which can take up to 30 seconds per data source.
	m.mu.RLock()
	sources := make(map[string]DataSource, len(m.sources))
	for name, source := range m.sources {
		sources[name] = source
	}
	m.mu.RUnlock()

	for name, source := range sources {
		if source.IsConnected() {
			continue
		}
		m.reconnect(name, source)
	}
}

// reconnect makes one reconnect attempt with a 30-second timeout. It is a
// separate function so the deferred cancel runs after each attempt instead of
// piling up until the whole check finishes.
func (m *DataSourceManager) reconnect(name string, source DataSource) {
	logCtx := context.Background()
	glog.Warningf(logCtx, "datasource [%s] disconnected, attempting reconnect", name)

	reconnectCtx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
	defer cancel()

	if err := source.Reconnect(reconnectCtx); err != nil {
		glog.Errorf(logCtx, "datasource [%s] reconnect failed: %v", name, err)
		return
	}
	glog.Infof(logCtx, "✅ datasource [%s] reconnected successfully", name)
}

// CloseAll stops the health check and closes every data source. It returns the
// last close error, if any.
func (m *DataSourceManager) CloseAll(ctx context.Context) error {
	m.cancel()

	m.mu.RLock()
	defer m.mu.RUnlock()

	var lastErr error
	for name, source := range m.sources {
		if err := source.Close(ctx); err != nil {
			glog.Errorf(ctx, "failed to close datasource [%s]: %v", name, err)
			lastErr = err
		}
	}
	return lastErr
}

// setupGracefulShutdown closes all data sources when the process receives
// SIGINT or SIGTERM, allowing at most 10 seconds for the shutdown.
func setupGracefulShutdown() {
	go func() {
		sigCh := make(chan os.Signal, 1)
		signal.Notify(sigCh, syscall.SIGINT, syscall.SIGTERM)
		<-sigCh

		ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
		defer cancel()

		glog.Info(ctx, "🔄 Shutting down MongoDB connections...")
		if err := manager.CloseAll(ctx); err != nil {
			glog.Errorf(ctx, "❌ Failed to close MongoDB connections: %v", err)
		} else {
			glog.Info(ctx, "✅ MongoDB connections closed successfully")
		}
	}()
}