// Package rabbitmq manages one process-wide RabbitMQ connection and channel
// and re-establishes them when the broker drops the connection unexpectedly.
package rabbitmq

import (
	"context"
	"sync"
	"time"

	"github.com/gogf/gf/v2/errors/gerror"
	"github.com/gogf/gf/v2/frame/g"
	"github.com/gogf/gf/v2/util/gconv"
	amqp "github.com/rabbitmq/amqp091-go"
)

// maxReconnectAttempts bounds the number of dial attempts made after an
// unexpected disconnect before the process is terminated.
const maxReconnectAttempts = 10

var (
	// rabbitmqMu guards every other variable in this block.
	rabbitmqMu sync.RWMutex

	conn    *amqp.Connection
	channel *amqp.Channel

	closeWatcher   chan struct{} // closed by Close to stop the watcher goroutine
	watcherStarted bool          // true while a watcher goroutine owns the connection lifecycle
	activeCfg      *Config       // copy of the settings passed to the last successful Init
)

// Config holds the RabbitMQ connection settings.
type Config struct {
	Host     string
	Port     int
	Username string
	Password string
	VHost    string
}

// buildURL assembles the AMQP URI for cfg.
//
// The fields are concatenated verbatim, so credentials or a vhost containing
// URI-reserved characters must already be percent-encoded in the configuration.
func buildURL(cfg *Config) string {
	return "amqp://" + cfg.Username + ":" + cfg.Password + "@" + cfg.Host + ":" + gconv.String(cfg.Port) + "/" + cfg.VHost
}

// loadConfig reads the connection settings from the "rabbitmq.*" keys of the
// GoFrame configuration. The vhost defaults to "/" when the key is absent.
func loadConfig(ctx context.Context) *Config {
	return &Config{
		Host:     g.Cfg().MustGet(ctx, "rabbitmq.host").String(),
		Port:     g.Cfg().MustGet(ctx, "rabbitmq.port").Int(),
		Username: g.Cfg().MustGet(ctx, "rabbitmq.username").String(),
		Password: g.Cfg().MustGet(ctx, "rabbitmq.password").String(),
		VHost:    g.Cfg().MustGet(ctx, "rabbitmq.vhost", "/").String(),
	}
}

// Init opens the RabbitMQ connection and channel described by cfg and starts
// the goroutine that watches for unexpected disconnects.
//
// Calling Init while the package is already initialised is a no-op that
// returns nil. Unlike a sync.Once based guard, a failed Init reports its error
// and may be retried, and Init may be called again after Close.
//
// ctx is used for logging only, including by the watcher goroutine.
func Init(ctx context.Context, cfg *Config) error {
	if cfg == nil {
		return gerror.New("RabbitMQ 配置不能为空")
	}

	rabbitmqMu.Lock()
	defer rabbitmqMu.Unlock()

	// A running watcher means a previous Init succeeded and the connection is
	// either healthy or currently being re-established by that watcher.
	if watcherStarted {
		return nil
	}

	newConn, err := amqp.Dial(buildURL(cfg))
	if err != nil {
		g.Log().Errorf(ctx, "RabbitMQ 连接失败: %v", err)
		return err
	}

	newChannel, err := newConn.Channel()
	if err != nil {
		g.Log().Errorf(ctx, "创建 RabbitMQ Channel 失败: %v", err)
		// Best effort: the connection is useless without a channel, and the
		// channel error is the one worth reporting.
		_ = newConn.Close()
		return err
	}

	// Publish the new state only once both handles are usable.
	cfgCopy := *cfg
	activeCfg = &cfgCopy
	conn, channel = newConn, newChannel

	// Each watcher gets its own stop channel so that a watcher left over from
	// an earlier Init/Close cycle can never be confused with the new one.
	closeWatcher = make(chan struct{})
	watcherStarted = true
	go handleConnectionClose(ctx, closeWatcher)

	g.Log().Info(ctx, "RabbitMQ 连接成功")
	return nil
}

// InitFromConfig initialises the package from the GoFrame configuration file.
func InitFromConfig(ctx context.Context) error {
	return Init(ctx, loadConfig(ctx))
}

// GetChannel returns the shared channel, or an error if it has not been
// initialised or is closed.
func GetChannel() (*amqp.Channel, error) {
	rabbitmqMu.RLock()
	defer rabbitmqMu.RUnlock()

	if channel == nil || channel.IsClosed() {
		return nil, gerror.New("RabbitMQ Channel 未初始化或已关闭")
	}
	return channel, nil
}

// GetConnection returns the shared connection, or an error if it has not been
// initialised or is closed.
func GetConnection() (*amqp.Connection, error) {
	rabbitmqMu.RLock()
	defer rabbitmqMu.RUnlock()

	if conn == nil || conn.IsClosed() {
		return nil, gerror.New("RabbitMQ 连接未初始化或已关闭")
	}
	return conn, nil
}

// handleConnectionClose watches the current connection and reconnects when it
// is closed with an error. It returns when stop is closed, when the connection
// is closed gracefully, or when there is no connection to watch.
//
// stop is passed in rather than read from the package variable so the
// goroutine never observes Close resetting that variable.
func handleConnectionClose(ctx context.Context, stop chan struct{}) {
	defer func() {
		rabbitmqMu.Lock()
		defer rabbitmqMu.Unlock()
		// Clear the flag only if this goroutine is still the registered
		// watcher; Close (and a following Init) may already have replaced it.
		if closeWatcher == stop {
			watcherStarted = false
		}
	}()

	for {
		// Stop promptly if Close ran while we were reconnecting.
		select {
		case <-stop:
			g.Log().Info(ctx, "停止监听 RabbitMQ 连接状态")
			return
		default:
		}

		rabbitmqMu.RLock()
		current := conn
		rabbitmqMu.RUnlock()
		if current == nil {
			return
		}

		// Buffered so the library's notification send cannot block on us.
		closed := current.NotifyClose(make(chan *amqp.Error, 1))

		select {
		case amqpErr := <-closed:
			if amqpErr == nil {
				// Graceful close: the notification channel was closed without
				// an error. Registering again on the closed connection would
				// hand back an already-closed channel and make this loop spin,
				// so stop watching instead.
				g.Log().Info(ctx, "停止监听 RabbitMQ 连接状态")
				return
			}
			g.Log().Errorf(ctx, "RabbitMQ 连接关闭: %v,尝试重连...", amqpErr)
			if !reconnect(ctx, stop) {
				g.Log().Info(ctx, "停止监听 RabbitMQ 连接状态")
				return
			}
		case <-stop:
			g.Log().Info(ctx, "停止监听 RabbitMQ 连接状态")
			return
		}
	}
}

// reconnect tries to re-establish the connection and channel, waiting
// 1s, 2s, ... between attempts. It reports true once a new connection has been
// installed and false if stop was closed first.
//
// The settings given to Init are reused. The mutex is held only while the new
// handles are installed, so GetChannel, GetConnection and Close stay
// responsive during the back-off. When every attempt fails the failure is
// logged at Fatal level, which terminates the process.
func reconnect(ctx context.Context, stop chan struct{}) bool {
	rabbitmqMu.RLock()
	cfg := activeCfg
	rabbitmqMu.RUnlock()
	if cfg == nil {
		// Not expected after a successful Init; fall back to the config file.
		cfg = loadConfig(ctx)
	}
	dsn := buildURL(cfg)

	for attempt := 1; attempt <= maxReconnectAttempts; attempt++ {
		backoff := time.NewTimer(time.Duration(attempt) * time.Second)
		select {
		case <-backoff.C:
		case <-stop:
			backoff.Stop()
			return false
		}

		newConn, err := amqp.Dial(dsn)
		if err != nil {
			g.Log().Errorf(ctx, "重连失败 (尝试 %d/%d): %v", attempt, maxReconnectAttempts, err)
			continue
		}

		newChannel, err := newConn.Channel()
		if err != nil {
			g.Log().Errorf(ctx, "创建 Channel 失败 (尝试 %d/%d): %v", attempt, maxReconnectAttempts, err)
			// Best effort: do not leak the connection opened for this attempt.
			_ = newConn.Close()
			continue
		}

		rabbitmqMu.Lock()
		select {
		case <-stop:
			// Close ran while we were dialling; discard the new connection
			// instead of resurrecting state the caller just tore down.
			rabbitmqMu.Unlock()
			_ = newConn.Close()
			return false
		default:
		}
		conn, channel = newConn, newChannel
		rabbitmqMu.Unlock()

		g.Log().Info(ctx, "RabbitMQ 重连成功")
		return true
	}

	g.Log().Fatal(ctx, "RabbitMQ 重连失败,已达到最大重试次数")
	return false
}

// Close stops the watcher goroutine and closes the channel and connection.
//
// The package state is always reset, even when closing fails, so Init can be
// called again afterwards. Handles that are already closed are skipped. The
// returned error is the connection close error; a channel close error is
// returned only when no open connection remained to be closed.
func Close(ctx context.Context) (err error) {
	rabbitmqMu.Lock()
	defer rabbitmqMu.Unlock()

	// Signal the watcher before touching the connection so that it does not
	// treat the shutdown as an unexpected disconnect.
	if closeWatcher != nil {
		close(closeWatcher)
		closeWatcher = nil
	}
	watcherStarted = false

	oldConn, oldChannel := conn, channel
	conn, channel = nil, nil
	activeCfg = nil

	if oldChannel != nil && !oldChannel.IsClosed() {
		if err = oldChannel.Close(); err != nil {
			g.Log().Errorf(ctx, "关闭 RabbitMQ Channel 失败: %v", err)
		}
	}

	if oldConn != nil && !oldConn.IsClosed() {
		if err = oldConn.Close(); err != nil {
			g.Log().Errorf(ctx, "关闭 RabbitMQ 连接失败: %v", err)
			return err
		}
	}

	g.Log().Info(ctx, "RabbitMQ 连接已关闭")
	return err
}