diff --git a/rabbitmq/rabbitmq_client.go b/rabbitmq/rabbitmq_client.go new file mode 100644 index 0000000..d6a4585 --- /dev/null +++ b/rabbitmq/rabbitmq_client.go @@ -0,0 +1,210 @@ +package rabbitmq + +import ( + "context" + "sync" + "time" + + "github.com/gogf/gf/v2/errors/gerror" + "github.com/gogf/gf/v2/frame/g" + "github.com/gogf/gf/v2/util/gconv" + amqp "github.com/rabbitmq/amqp091-go" +) + +var ( + conn *amqp.Connection + channel *amqp.Channel + rabbitmqOnce sync.Once + rabbitmqMu sync.RWMutex + closeWatcher chan struct{} // 用于停止监听 goroutine + watcherStarted bool // 防止重复启动监听 +) + +// Config RabbitMQ 配置 +type Config struct { + Host string + Port int + Username string + Password string + VHost string +} + +// Init 初始化 RabbitMQ 连接 +func Init(ctx context.Context, cfg *Config) error { + var err error + rabbitmqOnce.Do(func() { + // 构建连接字符串 + url := "amqp://" + cfg.Username + ":" + cfg.Password + "@" + cfg.Host + ":" + gconv.String(cfg.Port) + "/" + cfg.VHost + + // 创建连接 + conn, err = amqp.Dial(url) + if err != nil { + g.Log().Errorf(ctx, "RabbitMQ 连接失败: %v", err) + return + } + + // 创建 Channel + channel, err = conn.Channel() + if err != nil { + g.Log().Errorf(ctx, "创建 RabbitMQ Channel 失败: %v", err) + return + } + + // 初始化关闭监听器 + closeWatcher = make(chan struct{}) + + // 监听连接关闭(只启动一次) + if !watcherStarted { + go handleConnectionClose(ctx) + watcherStarted = true + } + + g.Log().Info(ctx, "RabbitMQ 连接成功") + }) + + return err +} + +// InitFromConfig 从配置文件初始化 +func InitFromConfig(ctx context.Context) error { + cfg := &Config{ + Host: g.Cfg().MustGet(ctx, "rabbitmq.host").String(), + Port: g.Cfg().MustGet(ctx, "rabbitmq.port").Int(), + Username: g.Cfg().MustGet(ctx, "rabbitmq.username").String(), + Password: g.Cfg().MustGet(ctx, "rabbitmq.password").String(), + VHost: g.Cfg().MustGet(ctx, "rabbitmq.vhost", "/").String(), + } + + return Init(ctx, cfg) +} + +// GetChannel 获取 Channel +func GetChannel() (*amqp.Channel, error) { + rabbitmqMu.RLock() + defer rabbitmqMu.RUnlock() + + if channel == nil || channel.IsClosed() { + return nil, gerror.New("RabbitMQ Channel 未初始化或已关闭") + } + + return channel, nil +} + +// GetConnection 获取连接 +func GetConnection() (*amqp.Connection, error) { + rabbitmqMu.RLock() + defer rabbitmqMu.RUnlock() + + if conn == nil || conn.IsClosed() { + return nil, gerror.New("RabbitMQ 连接未初始化或已关闭") + } + + return conn, nil +} + +// handleConnectionClose 监听连接关闭并重连 +func handleConnectionClose(ctx context.Context) { + for { + // 检查是否需要停止监听 + select { + case <-closeWatcher: + g.Log().Info(ctx, "停止监听 RabbitMQ 连接状态") + return + default: + } + + rabbitmqMu.RLock() + currentConn := conn + rabbitmqMu.RUnlock() + + if currentConn == nil { + return + } + + // 创建关闭通知 channel + closeErr := make(chan *amqp.Error, 1) + currentConn.NotifyClose(closeErr) + + // 等待连接关闭或停止信号 + select { + case err := <-closeErr: + if err != nil { + g.Log().Errorf(ctx, "RabbitMQ 连接关闭: %v,尝试重连...", err) + reconnect(ctx) + } + case <-closeWatcher: + g.Log().Info(ctx, "停止监听 RabbitMQ 连接状态") + return + } + } +} + +// reconnect 重新连接 +func reconnect(ctx context.Context) { + rabbitmqMu.Lock() + defer rabbitmqMu.Unlock() + + for i := 0; i < 10; i++ { + time.Sleep(time.Duration(i+1) * time.Second) + + cfg := &Config{ + Host: g.Cfg().MustGet(ctx, "rabbitmq.host").String(), + Port: g.Cfg().MustGet(ctx, "rabbitmq.port").Int(), + Username: g.Cfg().MustGet(ctx, "rabbitmq.username").String(), + Password: g.Cfg().MustGet(ctx, "rabbitmq.password").String(), + VHost: g.Cfg().MustGet(ctx, "rabbitmq.vhost", "/").String(), + } + + url := "amqp://" + cfg.Username + ":" + cfg.Password + "@" + cfg.Host + ":" + gconv.String(cfg.Port) + "/" + cfg.VHost + + var err error + conn, err = amqp.Dial(url) + if err != nil { + g.Log().Errorf(ctx, "重连失败 (尝试 %d/10): %v", i+1, err) + continue + } + + channel, err = conn.Channel() + if err != nil { + g.Log().Errorf(ctx, "创建 Channel 失败 (尝试 %d/10): %v", i+1, err) + continue + } + + g.Log().Info(ctx, "RabbitMQ 重连成功") + // 不再重复启动监听 goroutine + return + } + + g.Log().Fatal(ctx, "RabbitMQ 重连失败,已达到最大重试次数") +} + +// Close 关闭连接 +func Close(ctx context.Context) (err error) { + rabbitmqMu.Lock() + defer rabbitmqMu.Unlock() + + // 停止监听 goroutine + if closeWatcher != nil { + close(closeWatcher) + closeWatcher = nil + } + + if channel != nil { + if err = channel.Close(); err != nil { + g.Log().Errorf(ctx, "关闭 RabbitMQ Channel 失败: %v", err) + } + channel = nil + } + + if conn != nil { + if err = conn.Close(); err != nil { + g.Log().Errorf(ctx, "关闭 RabbitMQ 连接失败: %v", err) + return + } + conn = nil + } + + watcherStarted = false + g.Log().Info(ctx, "RabbitMQ 连接已关闭") + return +}