// Source file: common/message/connection_rabbitmq.go

package message
import (
"context"
"fmt"
"github.com/gogf/gf/v2/frame/g"
"github.com/gogf/gf/v2/util/gconv"
amqp "github.com/rabbitmq/amqp091-go"
"sync"
)
// Package-level RabbitMQ state, keyed by data source name.
var (
	// muRabbitMQ guards rabbitmqConns and rabbitmqChannels.
	muRabbitMQ sync.RWMutex
	// rabbitmqConns holds the connection of each data source.
	rabbitmqConns = map[string]*amqp.Connection{}
	// rabbitmqChannels holds the channel opened on each data source's connection.
	rabbitmqChannels = map[string]*amqp.Channel{}
)
// rabbitmqConnect establishes (or re-establishes) the RabbitMQ connection and
// channel for the named data source and stores them in the package-level maps.
//
// An empty name selects the "default" data source. Settings are read from the
// configuration keys rabbitmq.<name>.host, .port, .username, .password and
// .vhost (vhost falls back to "/" when not configured).
//
// Any channel and connection already registered for the data source are closed
// and removed before the new ones are created. An error is returned when the
// "rabbitmq" configuration section is missing, when a required setting is
// empty, or when dialing / opening the channel fails.
func rabbitmqConnect(ctx context.Context, name string) error {
	if g.Cfg().MustGet(ctx, "rabbitmq").IsEmpty() {
		g.Log().Errorf(ctx, "❌ RabbitMQ 配置不存在")
		return fmt.Errorf("RabbitMQ Configuration does not exist")
	}

	// Resolve the data source name.
	dsName := "default"
	if !g.IsEmpty(name) {
		dsName = name
	}
	g.Log().Infof(ctx, "🔔 RabbitMQ [%s] 开始创建连接", dsName)

	muRabbitMQ.Lock()
	defer muRabbitMQ.Unlock()

	// Tear down the previous resources of this data source only. The channel is
	// closed before its connection (same order as rabbitmqClose); failures are
	// logged but do not prevent the reconnect.
	if oldChannel := rabbitmqChannels[dsName]; oldChannel != nil && !oldChannel.IsClosed() {
		if err := oldChannel.Close(); err != nil {
			g.Log().Errorf(ctx, "❌ RabbitMQ [%s] 关闭 Channel 失败: %v", dsName, err)
		}
	}
	if oldConn := rabbitmqConns[dsName]; oldConn != nil && !oldConn.IsClosed() {
		if err := oldConn.Close(); err != nil {
			g.Log().Errorf(ctx, "❌ RabbitMQ [%s] 关闭连接失败: %v", dsName, err)
		}
	}
	delete(rabbitmqChannels, dsName)
	delete(rabbitmqConns, dsName)

	// Load the connection settings from the configuration file.
	host := g.Cfg().MustGet(ctx, fmt.Sprintf("rabbitmq.%s.host", dsName)).String()
	port := g.Cfg().MustGet(ctx, fmt.Sprintf("rabbitmq.%s.port", dsName)).Int()
	username := g.Cfg().MustGet(ctx, fmt.Sprintf("rabbitmq.%s.username", dsName)).String()
	password := g.Cfg().MustGet(ctx, fmt.Sprintf("rabbitmq.%s.password", dsName)).String()
	vHost := g.Cfg().MustGet(ctx, fmt.Sprintf("rabbitmq.%s.vhost", dsName), "/").String()

	if g.IsEmpty(host) {
		return fmt.Errorf("❌ RabbitMQ 配置错误: host 不能为空 (数据源: %s)", dsName)
	}
	if g.IsEmpty(port) {
		return fmt.Errorf("❌ RabbitMQ 配置错误: port 不能为空 (数据源: %s)", dsName)
	}
	if g.IsEmpty(username) {
		return fmt.Errorf("❌ RabbitMQ 配置错误: username 不能为空 (数据源: %s)", dsName)
	}
	if g.IsEmpty(password) {
		return fmt.Errorf("❌ RabbitMQ 配置错误: password 不能为空 (数据源: %s)", dsName)
	}

	// Build the connection URL through amqp.URI so that credentials and the
	// vhost are percent-encoded; plain concatenation breaks as soon as one of
	// them contains a reserved character such as '@', ':', '/' or '#'.
	uri := amqp.URI{
		Scheme:   "amqp",
		Host:     host,
		Port:     port,
		Username: username,
		Password: password,
		Vhost:    vHost,
	}

	// Open the connection.
	newConn, err := amqp.Dial(uri.String())
	if err != nil {
		g.Log().Errorf(ctx, "❌ RabbitMQ [%s] 连接失败: %v", dsName, err)
		return err
	}

	// Open a channel on the new connection; release the connection if that fails.
	newChannel, err := newConn.Channel()
	if err != nil {
		g.Log().Errorf(ctx, "❌ RabbitMQ [%s] 创建 Channel 失败: %v", dsName, err)
		newConn.Close()
		return err
	}

	// Register the new connection and channel.
	rabbitmqConns[dsName] = newConn
	rabbitmqChannels[dsName] = newChannel
	g.Log().Infof(ctx, "✅ RabbitMQ [%s] 连接成功", dsName)
	return nil
}
// rabbitmqPing reports whether the named data source currently has both an
// open connection and an open channel registered. An empty name selects the
// "default" data source. The outcome is logged either way.
func rabbitmqPing(ctx context.Context, name string) bool {
	// Fall back to the default data source when no name is given.
	dsName := name
	if g.IsEmpty(dsName) {
		dsName = "default"
	}

	muRabbitMQ.RLock()
	defer muRabbitMQ.RUnlock()

	conn, hasConn := rabbitmqConns[dsName]
	channel, hasChannel := rabbitmqChannels[dsName]

	connAlive := hasConn && conn != nil && !conn.IsClosed()
	if connAlive && hasChannel && channel != nil && !channel.IsClosed() {
		g.Log().Infof(ctx, "📊 RabbitMQ [%s] 连接正常", dsName)
		return true
	}

	g.Log().Errorf(ctx, "❌ RabbitMQ [%s] 连接已关闭或不可用", dsName)
	return false
}
// rabbitmqClose closes the channel and then the connection of the named data
// source and removes both from the package-level maps. An empty name selects
// the "default" data source.
//
// Close failures are logged. The returned error is the most recent failure
// (the connection error wins over the channel error), or nil if both steps
// succeeded or there was nothing open to close.
func rabbitmqClose(ctx context.Context, name string) error {
	// Fall back to the default data source when no name is given.
	dsName := name
	if g.IsEmpty(dsName) {
		dsName = "default"
	}

	muRabbitMQ.Lock()
	defer muRabbitMQ.Unlock()

	var closeErr error

	// A missing map entry yields nil, so the nil check also covers "not registered".
	channel := rabbitmqChannels[dsName]
	if channel != nil && !channel.IsClosed() {
		err := channel.Close()
		if err != nil {
			g.Log().Errorf(ctx, "❌ RabbitMQ [%s] 关闭 Channel 失败: %v", dsName, err)
			closeErr = err
		}
	}
	delete(rabbitmqChannels, dsName)

	conn := rabbitmqConns[dsName]
	if conn != nil && !conn.IsClosed() {
		err := conn.Close()
		if err != nil {
			g.Log().Errorf(ctx, "❌ RabbitMQ [%s] 关闭连接失败: %v", dsName, err)
			closeErr = err
		}
	}
	delete(rabbitmqConns, dsName)

	g.Log().Infof(ctx, "✅ RabbitMQ [%s] 连接已关闭", dsName)
	return closeErr
}
// getRabbitMQConn returns the connection registered for the named data source
// (internal use). An empty name selects the "default" data source. It returns
// nil when no connection is registered.
//
// The map is read under muRabbitMQ's read lock because rabbitmqConnect and
// rabbitmqClose modify it under the write lock; callers must therefore not
// already hold muRabbitMQ when calling this function.
func getRabbitMQConn(name string) *amqp.Connection {
	dsName := "default"
	if !g.IsEmpty(name) {
		dsName = name
	}

	muRabbitMQ.RLock()
	defer muRabbitMQ.RUnlock()
	return rabbitmqConns[dsName]
}
// getRabbitMQChannel returns the channel registered for the named data source
// (internal use). An empty name selects the "default" data source. It returns
// nil when no channel is registered.
//
// The map is read under muRabbitMQ's read lock because rabbitmqConnect and
// rabbitmqClose modify it under the write lock; callers must therefore not
// already hold muRabbitMQ when calling this function.
func getRabbitMQChannel(name string) *amqp.Channel {
	dsName := "default"
	if !g.IsEmpty(name) {
		dsName = name
	}

	muRabbitMQ.RLock()
	defer muRabbitMQ.RUnlock()
	return rabbitmqChannels[dsName]
}