主要变更: 1. 重构NATS、RabbitMQ和Redis连接管理模块,支持多数据源配置 2. 统一连接管理接口,增加数据源名称参数 3. 优化连接状态检查和错误处理 4. 增加连接池管理和资源清理机制 5. 改进日志输出格式和内容
168 lines
4.2 KiB
Go
168 lines
4.2 KiB
Go
package message
|
|
|
|
import (
|
|
"context"
|
|
"fmt"
|
|
"sync"
|
|
"time"
|
|
|
|
"github.com/gogf/gf/v2/frame/g"
|
|
"github.com/nats-io/nats.go"
|
|
)
|
|
|
|
var (
|
|
muNats sync.RWMutex
|
|
natsConns map[string]*nats.Conn // key: 数据源名称, value: NATS 连接
|
|
natsJS map[string]nats.JetStreamContext // key: 数据源名称, value: JetStream 上下文
|
|
)
|
|
|
|
func init() {
|
|
natsConns = make(map[string]*nats.Conn)
|
|
natsJS = make(map[string]nats.JetStreamContext)
|
|
}
|
|
|
|
// natsConnect 建立 NATS 连接
|
|
func natsConnect(ctx context.Context, name string) error {
|
|
|
|
if g.Cfg().MustGet(ctx, "nats").IsEmpty() {
|
|
g.Log().Errorf(ctx, "❌ NATS 配置不存在")
|
|
return fmt.Errorf("NATS Configuration does not exist")
|
|
}
|
|
// 确定数据源名称
|
|
dsName := "default"
|
|
if !g.IsEmpty(name) {
|
|
dsName = name
|
|
}
|
|
|
|
g.Log().Infof(ctx, "🔔 NATS [%s] 开始创建连接", dsName)
|
|
muNats.Lock()
|
|
defer muNats.Unlock()
|
|
|
|
// 安全地关闭旧连接(仅针对该数据源)
|
|
if oldConn, exists := natsConns[dsName]; exists && oldConn != nil && !oldConn.IsClosed() {
|
|
oldConn.Close()
|
|
delete(natsConns, dsName)
|
|
delete(natsJS, dsName)
|
|
}
|
|
|
|
// 从配置文件读取 NATS 地址
|
|
natsURL := g.Cfg().MustGet(ctx, fmt.Sprintf("nats.%s.url", dsName)).String()
|
|
if natsURL == "" {
|
|
// 默认使用本地地址
|
|
natsURL = nats.DefaultURL
|
|
}
|
|
|
|
// 连接选项配置
|
|
opts := []nats.Option{
|
|
nats.Name(fmt.Sprintf("goframe-nats-client-%s", dsName)),
|
|
nats.NoReconnect(),
|
|
nats.PingInterval(10 * time.Second),
|
|
nats.MaxPingsOutstanding(5),
|
|
nats.ClosedHandler(func(nc *nats.Conn) {
|
|
g.Log().Infof(ctx, "NATS [%s] 连接已关闭: %s", dsName, nc.ConnectedUrl())
|
|
}),
|
|
nats.ErrorHandler(func(nc *nats.Conn, sub *nats.Subscription, err error) {
|
|
g.Log().Errorf(ctx, "❌ NATS [%s] 错误: %v", dsName, err)
|
|
}),
|
|
}
|
|
|
|
newConn, err := nats.Connect(natsURL, opts...)
|
|
if err != nil {
|
|
g.Log().Errorf(ctx, "❌ NATS [%s] 连接失败: %v", dsName, err)
|
|
return err
|
|
}
|
|
|
|
// 等待连接就绪
|
|
if newConn.Status() != nats.CONNECTED {
|
|
select {
|
|
case <-time.After(5 * time.Second):
|
|
// 连接超时,清理资源
|
|
newConn.Close()
|
|
g.Log().Errorf(ctx, "❌ NATS [%s] 连接超时", dsName)
|
|
return fmt.Errorf("NATS 连接超时")
|
|
case <-newConn.StatusChanged(nats.CONNECTED):
|
|
// 连接成功
|
|
g.Log().Infof(ctx, "✅ NATS [%s] 连接成功: %s", dsName, newConn.ConnectedUrl())
|
|
case <-ctx.Done():
|
|
// 外部上下文被取消,清理资源
|
|
newConn.Close()
|
|
g.Log().Errorf(ctx, "NATS [%s] 连接被取消: %v", dsName, ctx.Err())
|
|
return fmt.Errorf("NATS 连接被取消: %w", ctx.Err())
|
|
}
|
|
}
|
|
|
|
// 创建 JetStream 实例
|
|
newJS, err := newConn.JetStream(nats.MaxWait(10 * time.Second))
|
|
if err != nil {
|
|
// 创建 JetStream 失败,清理连接
|
|
newConn.Close()
|
|
g.Log().Errorf(ctx, "❌ NATS [%s] 创建 JetStream 失败: %v", dsName, err)
|
|
return err
|
|
}
|
|
|
|
// 保存连接和 JetStream 上下文
|
|
natsConns[dsName] = newConn
|
|
natsJS[dsName] = newJS
|
|
|
|
return nil
|
|
}
|
|
|
|
// natsPing 检测 NATS 连接状态
|
|
func natsPing(ctx context.Context, name string) bool {
|
|
// 确定数据源名称
|
|
dsName := "default"
|
|
if !g.IsEmpty(name) {
|
|
dsName = name
|
|
}
|
|
|
|
muNats.RLock()
|
|
defer muNats.RUnlock()
|
|
|
|
nc, exists := natsConns[dsName]
|
|
if !exists || nc == nil || nc.IsClosed() || nc.Status() != nats.CONNECTED {
|
|
g.Log().Errorf(ctx, "❌ NATS [%s] 连接已关闭或不可用", dsName)
|
|
return false
|
|
}
|
|
g.Log().Infof(ctx, "📊 NATS [%s] 连接正常: %s", dsName, nc.ConnectedUrl())
|
|
return true
|
|
}
|
|
|
|
// natsClose 关闭 NATS 连接
|
|
func natsClose(ctx context.Context, name string) error {
|
|
// 确定数据源名称
|
|
dsName := "default"
|
|
if !g.IsEmpty(name) {
|
|
dsName = name
|
|
}
|
|
|
|
muNats.Lock()
|
|
defer muNats.Unlock()
|
|
|
|
if nc, exists := natsConns[dsName]; exists && nc != nil && !nc.IsClosed() {
|
|
nc.Close()
|
|
}
|
|
delete(natsConns, dsName)
|
|
delete(natsJS, dsName)
|
|
|
|
g.Log().Infof(ctx, "✅ NATS [%s] 连接已关闭", dsName)
|
|
return nil
|
|
}
|
|
|
|
// getNatsConn 获取 NATS 连接(内部使用)
|
|
func getNatsConn(name string) *nats.Conn {
|
|
dsName := "default"
|
|
if !g.IsEmpty(name) {
|
|
dsName = name
|
|
}
|
|
return natsConns[dsName]
|
|
}
|
|
|
|
// getNatsJS 获取 JetStream 上下文(内部使用)
|
|
func getNatsJS(name string) nats.JetStreamContext {
|
|
dsName := "default"
|
|
if !g.IsEmpty(name) {
|
|
dsName = name
|
|
}
|
|
return natsJS[dsName]
|
|
}
|