2026-01-29 13:55:32 +08:00
|
|
|
|
package message
|
2026-01-13 14:15:45 +08:00
|
|
|
|
|
|
|
|
|
|
import (
	"context"
	"fmt"
	"reflect"
	"sync"
	"sync/atomic"
	"time"

	"github.com/gogf/gf/v2/frame/g"
	"github.com/nats-io/nats.go"
	"github.com/nats-io/nats.go/jetstream"
)
|
|
|
|
|
|
|
|
|
|
|
|
var (
|
|
|
|
|
|
nc *nats.Conn
|
|
|
|
|
|
js jetstream.JetStream
|
|
|
|
|
|
inited bool
|
2026-01-29 13:55:32 +08:00
|
|
|
|
natsMu sync.RWMutex
|
2026-01-13 14:15:45 +08:00
|
|
|
|
natsURL string
|
|
|
|
|
|
healthCtx context.Context
|
|
|
|
|
|
healthCancel context.CancelFunc
|
|
|
|
|
|
connected bool
|
|
|
|
|
|
reconnectChan chan struct{}
|
|
|
|
|
|
|
|
|
|
|
|
// 连接状态变化监听器
|
2026-01-29 13:55:32 +08:00
|
|
|
|
connStateListeners []connStateListener
|
2026-01-13 14:15:45 +08:00
|
|
|
|
connListenersMu sync.RWMutex
|
|
|
|
|
|
|
|
|
|
|
|
// 监控指标
|
2026-01-29 13:55:32 +08:00
|
|
|
|
metrics metricsCounter
|
2026-01-13 14:15:45 +08:00
|
|
|
|
)
|
|
|
|
|
|
|
|
|
|
|
|
// metricsCounter groups the package's monitoring counters. Every field is an
// atomic.Int64, so each counter can be updated and read without extra
// locking. The struct must not be copied with a plain assignment once it is
// in use.
type metricsCounter struct {
	// Publish-side counters (presumably total attempts and failures).
	PublishCount, PublishError atomic.Int64

	// Subscription counter.
	SubscribeCount atomic.Int64

	// Request/reply counters (presumably total attempts and failures).
	RequestCount, RequestError atomic.Int64

	// Consumer-side counters (presumably total deliveries and failures).
	ConsumeCount, ConsumeError atomic.Int64
}
|
|
|
|
|
|
|
|
|
|
|
|
// connState identifies a phase of the NATS connection lifecycle as reported
// to connection-state listeners.
type connState int

// Connection lifecycle states, numbered consecutively from zero.
const (
	connStateDisconnected = connState(iota) // no usable connection
	connStateConnecting                     // initial dial in progress
	connStateConnected                      // connection established
	connStateReconnecting                   // connection lost, client is retrying
	connStateClosed                         // connection closed
)

// connStateListener is a callback invoked on every connection state change.
// err carries the cause of the transition when there is one and is nil
// otherwise.
type connStateListener func(state connState, err error)
|
2026-01-13 14:15:45 +08:00
|
|
|
|
|
|
|
|
|
|
// GetMetrics 获取监控指标
|
2026-01-29 13:55:32 +08:00
|
|
|
|
func getMetrics() metricsCounter {
|
2026-01-13 14:15:45 +08:00
|
|
|
|
return metrics
|
|
|
|
|
|
}
|
|
|
|
|
|
|
2026-01-29 13:55:32 +08:00
|
|
|
|
// registerConnStateListener 注册连接状态监听器
|
|
|
|
|
|
func registerConnStateListener(listener connStateListener) {
|
2026-01-13 14:15:45 +08:00
|
|
|
|
connListenersMu.Lock()
|
|
|
|
|
|
defer connListenersMu.Unlock()
|
|
|
|
|
|
connStateListeners = append(connStateListeners, listener)
|
|
|
|
|
|
}
|
|
|
|
|
|
|
2026-01-29 13:55:32 +08:00
|
|
|
|
// unregisterConnStateListener 取消注册连接状态监听器
|
|
|
|
|
|
func unregisterConnStateListener(listener connStateListener) {
|
2026-01-13 14:15:45 +08:00
|
|
|
|
connListenersMu.Lock()
|
|
|
|
|
|
defer connListenersMu.Unlock()
|
|
|
|
|
|
for i, l := range connStateListeners {
|
|
|
|
|
|
if l != nil && &l == &listener {
|
|
|
|
|
|
connStateListeners = append(connStateListeners[:i], connStateListeners[i+1:]...)
|
|
|
|
|
|
break
|
|
|
|
|
|
}
|
|
|
|
|
|
}
|
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
// notifyConnState 通知所有监听器连接状态变化
|
2026-01-29 13:55:32 +08:00
|
|
|
|
func notifyConnState(state connState, err error) {
|
2026-01-13 14:15:45 +08:00
|
|
|
|
connListenersMu.RLock()
|
2026-01-29 13:55:32 +08:00
|
|
|
|
listeners := make([]connStateListener, len(connStateListeners))
|
2026-01-13 14:15:45 +08:00
|
|
|
|
copy(listeners, connStateListeners)
|
|
|
|
|
|
connListenersMu.RUnlock()
|
|
|
|
|
|
|
|
|
|
|
|
for _, listener := range listeners {
|
|
|
|
|
|
if listener != nil {
|
|
|
|
|
|
listener(state, err)
|
|
|
|
|
|
}
|
|
|
|
|
|
}
|
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
// init 初始化 NATS 连接
|
|
|
|
|
|
func init() {
|
|
|
|
|
|
// 从配置文件读取 NATS 地址
|
|
|
|
|
|
natsURL = g.Cfg().MustGet(context.Background(), "nats.url").String()
|
|
|
|
|
|
if natsURL == "" {
|
|
|
|
|
|
// 默认使用本地地址
|
|
|
|
|
|
natsURL = nats.DefaultURL
|
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
// 创建健康检查上下文
|
|
|
|
|
|
healthCtx, healthCancel = context.WithCancel(context.Background())
|
|
|
|
|
|
|
|
|
|
|
|
// 创建重连通知通道(增大缓冲区避免丢失通知)
|
|
|
|
|
|
reconnectChan = make(chan struct{}, 10)
|
|
|
|
|
|
|
|
|
|
|
|
// 启动连接
|
|
|
|
|
|
go initConnection()
|
|
|
|
|
|
|
|
|
|
|
|
// 启动健康检查协程
|
|
|
|
|
|
go healthCheck()
|
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
// initConnection 初始化连接
|
|
|
|
|
|
func initConnection() {
|
|
|
|
|
|
ctx := context.Background()
|
2026-01-29 13:55:32 +08:00
|
|
|
|
notifyConnState(connStateConnecting, nil)
|
2026-01-13 14:15:45 +08:00
|
|
|
|
if err := connect(ctx); err != nil {
|
|
|
|
|
|
g.Log().Errorf(ctx, "NATS 初始连接失败: %v", err)
|
2026-01-29 13:55:32 +08:00
|
|
|
|
notifyConnState(connStateDisconnected, err)
|
2026-01-13 14:15:45 +08:00
|
|
|
|
}
|
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
// connect 建立 NATS 连接
|
|
|
|
|
|
func connect(ctx context.Context) error {
|
2026-01-29 13:55:32 +08:00
|
|
|
|
natsMu.Lock()
|
|
|
|
|
|
defer natsMu.Unlock()
|
2026-01-13 14:15:45 +08:00
|
|
|
|
|
|
|
|
|
|
if nc != nil && !nc.IsClosed() {
|
|
|
|
|
|
nc.Close()
|
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
// 连接选项配置
|
|
|
|
|
|
opts := []nats.Option{
|
|
|
|
|
|
nats.Name("goframe-nats-client"),
|
|
|
|
|
|
nats.ReconnectWait(2 * time.Second),
|
|
|
|
|
|
nats.MaxReconnects(-1), // 无限重连
|
|
|
|
|
|
nats.PingInterval(10 * time.Second),
|
|
|
|
|
|
nats.MaxPingsOutstanding(5),
|
|
|
|
|
|
nats.ReconnectHandler(func(nc *nats.Conn) {
|
|
|
|
|
|
g.Log().Infof(ctx, "✅ NATS 重连成功: %s", nc.ConnectedUrl())
|
|
|
|
|
|
connected = true
|
|
|
|
|
|
|
|
|
|
|
|
// 重新创建 JetStream 实例
|
|
|
|
|
|
if newJS, err := jetstream.New(nc); err == nil {
|
|
|
|
|
|
js = newJS
|
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
// 通知重连成功
|
2026-01-29 13:55:32 +08:00
|
|
|
|
notifyConnState(connStateConnected, nil)
|
2026-01-13 14:15:45 +08:00
|
|
|
|
|
|
|
|
|
|
// 使用非阻塞发送避免阻塞
|
|
|
|
|
|
select {
|
|
|
|
|
|
case reconnectChan <- struct{}{}:
|
|
|
|
|
|
default:
|
|
|
|
|
|
// 通道已满,丢弃通知
|
|
|
|
|
|
}
|
|
|
|
|
|
}),
|
|
|
|
|
|
nats.DisconnectErrHandler(func(nc *nats.Conn, err error) {
|
|
|
|
|
|
g.Log().Warningf(ctx, "⚠️ NATS 连接断开: %v, 准备重连...", err)
|
|
|
|
|
|
connected = false
|
2026-01-29 13:55:32 +08:00
|
|
|
|
notifyConnState(connStateReconnecting, err)
|
2026-01-13 14:15:45 +08:00
|
|
|
|
}),
|
|
|
|
|
|
nats.ClosedHandler(func(nc *nats.Conn) {
|
|
|
|
|
|
g.Log().Infof(ctx, "NATS 连接已关闭: %s", nc.ConnectedUrl())
|
|
|
|
|
|
connected = false
|
2026-01-29 13:55:32 +08:00
|
|
|
|
notifyConnState(connStateClosed, nil)
|
2026-01-13 14:15:45 +08:00
|
|
|
|
}),
|
|
|
|
|
|
nats.ErrorHandler(func(nc *nats.Conn, sub *nats.Subscription, err error) {
|
|
|
|
|
|
g.Log().Errorf(ctx, "NATS 错误: %v", err)
|
|
|
|
|
|
}),
|
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
var err error
|
|
|
|
|
|
nc, err = nats.Connect(natsURL, opts...)
|
|
|
|
|
|
if err != nil {
|
|
|
|
|
|
return fmt.Errorf("NATS 连接失败: %w", err)
|
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
// 等待连接就绪
|
|
|
|
|
|
if nc.Status() != nats.CONNECTED {
|
|
|
|
|
|
select {
|
|
|
|
|
|
case <-time.After(5 * time.Second):
|
2026-01-29 13:55:32 +08:00
|
|
|
|
notifyConnState(connStateDisconnected, fmt.Errorf("连接超时"))
|
2026-01-13 14:15:45 +08:00
|
|
|
|
return fmt.Errorf("NATS 连接超时")
|
|
|
|
|
|
case <-nc.StatusChanged(nats.CONNECTED):
|
|
|
|
|
|
}
|
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
// 创建 JetStream 实例
|
|
|
|
|
|
js, err = jetstream.New(nc)
|
|
|
|
|
|
if err != nil {
|
|
|
|
|
|
return fmt.Errorf("创建 JetStream 失败: %w", err)
|
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
connected = true
|
|
|
|
|
|
inited = true
|
|
|
|
|
|
g.Log().Infof(ctx, "✅ NATS 连接成功: %s", nc.ConnectedUrl())
|
2026-01-29 13:55:32 +08:00
|
|
|
|
notifyConnState(connStateConnected, nil)
|
2026-01-13 14:15:45 +08:00
|
|
|
|
return nil
|
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
// healthCheck 健康检查协程(仅作为备用检查)
|
|
|
|
|
|
func healthCheck() {
|
|
|
|
|
|
ticker := time.NewTicker(30 * time.Second)
|
|
|
|
|
|
defer ticker.Stop()
|
|
|
|
|
|
|
|
|
|
|
|
for {
|
|
|
|
|
|
select {
|
|
|
|
|
|
case <-healthCtx.Done():
|
|
|
|
|
|
return
|
|
|
|
|
|
case <-ticker.C:
|
2026-01-29 13:55:32 +08:00
|
|
|
|
natsMu.RLock()
|
2026-01-13 14:15:45 +08:00
|
|
|
|
currentConnected := connected
|
|
|
|
|
|
currentConn := nc
|
2026-01-29 13:55:32 +08:00
|
|
|
|
natsMu.RUnlock()
|
2026-01-13 14:15:45 +08:00
|
|
|
|
|
|
|
|
|
|
if !currentConnected || currentConn == nil || currentConn.IsClosed() {
|
|
|
|
|
|
// 仅记录日志,不尝试重连(NATS 已有自动重连机制)
|
|
|
|
|
|
g.Log().Warning(context.Background(), "NATS 连接断开,等待 NATS 自动重连...")
|
|
|
|
|
|
}
|
|
|
|
|
|
case <-reconnectChan:
|
|
|
|
|
|
// 重连成功的通知(仅记录日志)
|
|
|
|
|
|
g.Log().Info(context.Background(), "收到重连成功通知")
|
|
|
|
|
|
}
|
|
|
|
|
|
}
|
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
// checkConnected 检查连接状态
|
|
|
|
|
|
func checkConnected() bool {
|
2026-01-29 13:55:32 +08:00
|
|
|
|
natsMu.RLock()
|
|
|
|
|
|
defer natsMu.RUnlock()
|
2026-01-13 14:15:45 +08:00
|
|
|
|
return connected && nc != nil && !nc.IsClosed()
|
|
|
|
|
|
}
|
|
|
|
|
|
|
2026-01-29 13:55:32 +08:00
|
|
|
|
// getConnState 获取当前连接状态
|
|
|
|
|
|
func getConnState() connState {
|
|
|
|
|
|
natsMu.RLock()
|
|
|
|
|
|
defer natsMu.RUnlock()
|
2026-01-13 14:15:45 +08:00
|
|
|
|
|
|
|
|
|
|
if nc == nil {
|
2026-01-29 13:55:32 +08:00
|
|
|
|
return connStateDisconnected
|
2026-01-13 14:15:45 +08:00
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
if nc.IsClosed() {
|
2026-01-29 13:55:32 +08:00
|
|
|
|
return connStateClosed
|
2026-01-13 14:15:45 +08:00
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
if connected {
|
2026-01-29 13:55:32 +08:00
|
|
|
|
return connStateConnected
|
2026-01-13 14:15:45 +08:00
|
|
|
|
}
|
|
|
|
|
|
|
2026-01-29 13:55:32 +08:00
|
|
|
|
return connStateDisconnected
|
2026-01-13 14:15:45 +08:00
|
|
|
|
}
|
|
|
|
|
|
|
2026-01-29 13:55:32 +08:00
|
|
|
|
// shutdown 优雅关闭:自动注销所有已注册的服务并关闭 NATS 连接
|
|
|
|
|
|
func shutdown() error {
|
2026-01-13 14:15:45 +08:00
|
|
|
|
ctx := context.Background()
|
|
|
|
|
|
g.Log().Info(ctx, "开始优雅关闭 NATS RPC 服务...")
|
|
|
|
|
|
|
|
|
|
|
|
// 注销所有单实例服务
|
|
|
|
|
|
rpcServicesMu.Lock()
|
|
|
|
|
|
singleServiceCount := len(rpcServices)
|
|
|
|
|
|
for serviceName := range rpcServices {
|
|
|
|
|
|
if sub, exists := rpcSubs[serviceName]; exists {
|
|
|
|
|
|
if err := sub.Unsubscribe(); err != nil {
|
|
|
|
|
|
g.Log().Errorf(ctx, "注销服务 %s 失败: %v", serviceName, err)
|
|
|
|
|
|
}
|
|
|
|
|
|
}
|
|
|
|
|
|
delete(rpcSubs, serviceName)
|
|
|
|
|
|
delete(rpcServices, serviceName)
|
|
|
|
|
|
}
|
|
|
|
|
|
rpcServicesMu.Unlock()
|
|
|
|
|
|
|
|
|
|
|
|
// 注销所有队列服务
|
|
|
|
|
|
queueRPCMu.Lock()
|
|
|
|
|
|
queueServiceCount := 0
|
|
|
|
|
|
for queueName, servicesMap := range queueRPCServices {
|
|
|
|
|
|
queueServiceCount += len(servicesMap)
|
|
|
|
|
|
for serviceName, sub := range queueRPCSubs[queueName] {
|
|
|
|
|
|
if err := sub.Unsubscribe(); err != nil {
|
|
|
|
|
|
g.Log().Errorf(ctx, "注销队列服务 %s (队列: %s) 失败: %v", serviceName, queueName, err)
|
|
|
|
|
|
}
|
|
|
|
|
|
}
|
|
|
|
|
|
delete(queueRPCSubs, queueName)
|
|
|
|
|
|
delete(queueRPCServices, queueName)
|
|
|
|
|
|
}
|
|
|
|
|
|
queueRPCMu.Unlock()
|
|
|
|
|
|
|
|
|
|
|
|
g.Log().Infof(ctx, "已注销 %d 个单实例服务和 %d 个队列服务", singleServiceCount, queueServiceCount)
|
|
|
|
|
|
|
2026-01-29 13:55:32 +08:00
|
|
|
|
natsMu.Lock()
|
|
|
|
|
|
defer natsMu.Unlock()
|
2026-01-13 14:15:45 +08:00
|
|
|
|
|
|
|
|
|
|
// 停止健康检查协程
|
|
|
|
|
|
if healthCancel != nil {
|
|
|
|
|
|
healthCancel()
|
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
// 关闭连接
|
|
|
|
|
|
if nc != nil && !nc.IsClosed() {
|
|
|
|
|
|
nc.Close()
|
|
|
|
|
|
connected = false
|
|
|
|
|
|
inited = false
|
|
|
|
|
|
}
|
|
|
|
|
|
g.Log().Info(ctx, "NATS RPC 服务已优雅关闭")
|
|
|
|
|
|
return nil
|
|
|
|
|
|
}
|