nats连接与执行方法分离

This commit is contained in:
2026-01-13 14:15:45 +08:00
parent 4d6aa1f384
commit 86cb177625
4 changed files with 329 additions and 317 deletions

318
nats/connection.go Normal file
View File

@@ -0,0 +1,318 @@
package nats
import (
"context"
"fmt"
"sync"
"sync/atomic"
"time"
"github.com/gogf/gf/v2/frame/g"
"github.com/nats-io/nats.go"
"github.com/nats-io/nats.go/jetstream"
)
var (
nc *nats.Conn
js jetstream.JetStream
inited bool
mu sync.RWMutex
natsURL string
healthCtx context.Context
healthCancel context.CancelFunc
connected bool
reconnectChan chan struct{}
// 连接状态变化监听器
connStateListeners []ConnStateListener
connListenersMu sync.RWMutex
// 监控指标
metrics Metrics
)
// Metrics 监控指标
type Metrics struct {
PublishCount atomic.Int64
PublishError atomic.Int64
SubscribeCount atomic.Int64
RequestCount atomic.Int64
RequestError atomic.Int64
ConsumeCount atomic.Int64
ConsumeError atomic.Int64
}
// ConnState 连接状态
type ConnState int
const (
ConnStateDisconnected ConnState = iota
ConnStateConnecting
ConnStateConnected
ConnStateReconnecting
ConnStateClosed
)
// ConnStateListener 连接状态监听器
type ConnStateListener func(state ConnState, err error)
// GetMetrics 获取监控指标
func GetMetrics() Metrics {
return metrics
}
// RegisterConnStateListener 注册连接状态监听器
func RegisterConnStateListener(listener ConnStateListener) {
connListenersMu.Lock()
defer connListenersMu.Unlock()
connStateListeners = append(connStateListeners, listener)
}
// UnregisterConnStateListener 取消注册连接状态监听器
func UnregisterConnStateListener(listener ConnStateListener) {
connListenersMu.Lock()
defer connListenersMu.Unlock()
for i, l := range connStateListeners {
if l != nil && &l == &listener {
connStateListeners = append(connStateListeners[:i], connStateListeners[i+1:]...)
break
}
}
}
// notifyConnState 通知所有监听器连接状态变化
func notifyConnState(state ConnState, err error) {
connListenersMu.RLock()
listeners := make([]ConnStateListener, len(connStateListeners))
copy(listeners, connStateListeners)
connListenersMu.RUnlock()
for _, listener := range listeners {
if listener != nil {
listener(state, err)
}
}
}
// init 初始化 NATS 连接
func init() {
// 从配置文件读取 NATS 地址
natsURL = g.Cfg().MustGet(context.Background(), "nats.url").String()
if natsURL == "" {
// 默认使用本地地址
natsURL = nats.DefaultURL
}
// 创建健康检查上下文
healthCtx, healthCancel = context.WithCancel(context.Background())
// 创建重连通知通道(增大缓冲区避免丢失通知)
reconnectChan = make(chan struct{}, 10)
// 启动连接
go initConnection()
// 启动健康检查协程
go healthCheck()
}
// initConnection 初始化连接
func initConnection() {
ctx := context.Background()
notifyConnState(ConnStateConnecting, nil)
if err := connect(ctx); err != nil {
g.Log().Errorf(ctx, "NATS 初始连接失败: %v", err)
notifyConnState(ConnStateDisconnected, err)
}
}
// connect 建立 NATS 连接
func connect(ctx context.Context) error {
mu.Lock()
defer mu.Unlock()
if nc != nil && !nc.IsClosed() {
nc.Close()
}
// 连接选项配置
opts := []nats.Option{
nats.Name("goframe-nats-client"),
nats.ReconnectWait(2 * time.Second),
nats.MaxReconnects(-1), // 无限重连
nats.PingInterval(10 * time.Second),
nats.MaxPingsOutstanding(5),
nats.ReconnectHandler(func(nc *nats.Conn) {
g.Log().Infof(ctx, "✅ NATS 重连成功: %s", nc.ConnectedUrl())
connected = true
// 重新创建 JetStream 实例
if newJS, err := jetstream.New(nc); err == nil {
js = newJS
}
// 通知重连成功
notifyConnState(ConnStateConnected, nil)
// 使用非阻塞发送避免阻塞
select {
case reconnectChan <- struct{}{}:
default:
// 通道已满,丢弃通知
}
}),
nats.DisconnectErrHandler(func(nc *nats.Conn, err error) {
g.Log().Warningf(ctx, "⚠️ NATS 连接断开: %v, 准备重连...", err)
connected = false
notifyConnState(ConnStateReconnecting, err)
}),
nats.ClosedHandler(func(nc *nats.Conn) {
g.Log().Infof(ctx, "NATS 连接已关闭: %s", nc.ConnectedUrl())
connected = false
notifyConnState(ConnStateClosed, nil)
}),
nats.ErrorHandler(func(nc *nats.Conn, sub *nats.Subscription, err error) {
g.Log().Errorf(ctx, "NATS 错误: %v", err)
}),
}
var err error
nc, err = nats.Connect(natsURL, opts...)
if err != nil {
return fmt.Errorf("NATS 连接失败: %w", err)
}
// 等待连接就绪
if nc.Status() != nats.CONNECTED {
select {
case <-time.After(5 * time.Second):
notifyConnState(ConnStateDisconnected, fmt.Errorf("连接超时"))
return fmt.Errorf("NATS 连接超时")
case <-nc.StatusChanged(nats.CONNECTED):
}
}
// 创建 JetStream 实例
js, err = jetstream.New(nc)
if err != nil {
return fmt.Errorf("创建 JetStream 失败: %w", err)
}
connected = true
inited = true
g.Log().Infof(ctx, "✅ NATS 连接成功: %s", nc.ConnectedUrl())
notifyConnState(ConnStateConnected, nil)
return nil
}
// healthCheck 健康检查协程(仅作为备用检查)
func healthCheck() {
ticker := time.NewTicker(30 * time.Second)
defer ticker.Stop()
for {
select {
case <-healthCtx.Done():
return
case <-ticker.C:
mu.RLock()
currentConnected := connected
currentConn := nc
mu.RUnlock()
if !currentConnected || currentConn == nil || currentConn.IsClosed() {
// 仅记录日志不尝试重连NATS 已有自动重连机制)
g.Log().Warning(context.Background(), "NATS 连接断开,等待 NATS 自动重连...")
}
case <-reconnectChan:
// 重连成功的通知(仅记录日志)
g.Log().Info(context.Background(), "收到重连成功通知")
}
}
}
// checkConnected 检查连接状态
func checkConnected() bool {
mu.RLock()
defer mu.RUnlock()
return connected && nc != nil && !nc.IsClosed()
}
// IsConnected 检查 NATS 是否已连接
func IsConnected() bool {
return checkConnected()
}
// GetConnState 获取当前连接状态
func GetConnState() ConnState {
mu.RLock()
defer mu.RUnlock()
if nc == nil {
return ConnStateDisconnected
}
if nc.IsClosed() {
return ConnStateClosed
}
if connected {
return ConnStateConnected
}
return ConnStateDisconnected
}
// Shutdown 优雅关闭:自动注销所有已注册的服务并关闭 NATS 连接
func Shutdown() error {
ctx := context.Background()
g.Log().Info(ctx, "开始优雅关闭 NATS RPC 服务...")
// 注销所有单实例服务
rpcServicesMu.Lock()
singleServiceCount := len(rpcServices)
for serviceName := range rpcServices {
if sub, exists := rpcSubs[serviceName]; exists {
if err := sub.Unsubscribe(); err != nil {
g.Log().Errorf(ctx, "注销服务 %s 失败: %v", serviceName, err)
}
}
delete(rpcSubs, serviceName)
delete(rpcServices, serviceName)
}
rpcServicesMu.Unlock()
// 注销所有队列服务
queueRPCMu.Lock()
queueServiceCount := 0
for queueName, servicesMap := range queueRPCServices {
queueServiceCount += len(servicesMap)
for serviceName, sub := range queueRPCSubs[queueName] {
if err := sub.Unsubscribe(); err != nil {
g.Log().Errorf(ctx, "注销队列服务 %s (队列: %s) 失败: %v", serviceName, queueName, err)
}
}
delete(queueRPCSubs, queueName)
delete(queueRPCServices, queueName)
}
queueRPCMu.Unlock()
g.Log().Infof(ctx, "已注销 %d 个单实例服务和 %d 个队列服务", singleServiceCount, queueServiceCount)
mu.Lock()
defer mu.Unlock()
// 停止健康检查协程
if healthCancel != nil {
healthCancel()
}
// 关闭连接
if nc != nil && !nc.IsClosed() {
nc.Close()
connected = false
inited = false
}
g.Log().Info(ctx, "NATS RPC 服务已优雅关闭")
return nil
}

View File

@@ -6,7 +6,6 @@ import (
"fmt"
"reflect"
"sync"
"sync/atomic"
"time"
"github.com/gogf/gf/v2/frame/g"
@@ -14,256 +13,19 @@ import (
"github.com/nats-io/nats.go/jetstream"
)
// RPC 服务注册表
var (
nc *nats.Conn
js jetstream.JetStream
inited bool
mu sync.RWMutex
natsURL string
healthCtx context.Context
healthCancel context.CancelFunc
connected bool
reconnectChan chan struct{}
// 连接状态变化监听器
connStateListeners []ConnStateListener
connListenersMu sync.RWMutex
// 监控指标
metrics Metrics
rpcServices map[string]RPCHandler
rpcSubs map[string]*nats.Subscription // 服务名 -> 订阅
rpcServicesMu sync.RWMutex
queueRPCServices map[string]map[string]RPCHandler // queueName -> subject -> handler
queueRPCSubs map[string]map[string]*nats.Subscription // queueName -> serviceName -> 订阅
queueRPCMu sync.RWMutex
)
// Metrics 监控指标
type Metrics struct {
PublishCount atomic.Int64
PublishError atomic.Int64
SubscribeCount atomic.Int64
RequestCount atomic.Int64
RequestError atomic.Int64
ConsumeCount atomic.Int64
ConsumeError atomic.Int64
}
// ConnState 连接状态
type ConnState int
const (
ConnStateDisconnected ConnState = iota
ConnStateConnecting
ConnStateConnected
ConnStateReconnecting
ConnStateClosed
)
// ConnStateListener 连接状态监听器
type ConnStateListener func(state ConnState, err error)
// GetMetrics 获取监控指标
func GetMetrics() Metrics {
return metrics
}
// RegisterConnStateListener 注册连接状态监听器
func RegisterConnStateListener(listener ConnStateListener) {
connListenersMu.Lock()
defer connListenersMu.Unlock()
connStateListeners = append(connStateListeners, listener)
}
// UnregisterConnStateListener 取消注册连接状态监听器
func UnregisterConnStateListener(listener ConnStateListener) {
connListenersMu.Lock()
defer connListenersMu.Unlock()
for i, l := range connStateListeners {
if l != nil && &l == &listener {
connStateListeners = append(connStateListeners[:i], connStateListeners[i+1:]...)
break
}
}
}
// notifyConnState 通知所有监听器连接状态变化
func notifyConnState(state ConnState, err error) {
connListenersMu.RLock()
listeners := make([]ConnStateListener, len(connStateListeners))
copy(listeners, connStateListeners)
connListenersMu.RUnlock()
for _, listener := range listeners {
if listener != nil {
listener(state, err)
}
}
}
// init 初始化 NATS 连接
func init() {
// 从配置文件读取 NATS 地址
natsURL = g.Cfg().MustGet(context.Background(), "nats.url").String()
if natsURL == "" {
// 默认使用本地地址
natsURL = nats.DefaultURL
}
// 创建健康检查上下文
healthCtx, healthCancel = context.WithCancel(context.Background())
// 创建重连通知通道(增大缓冲区避免丢失通知)
reconnectChan = make(chan struct{}, 10)
// 启动连接
go initConnection()
// 启动健康检查协程
go healthCheck()
}
// initConnection 初始化连接
func initConnection() {
ctx := context.Background()
notifyConnState(ConnStateConnecting, nil)
if err := connect(ctx); err != nil {
g.Log().Errorf(ctx, "NATS 初始连接失败: %v", err)
notifyConnState(ConnStateDisconnected, err)
}
}
// connect 建立 NATS 连接
func connect(ctx context.Context) error {
mu.Lock()
defer mu.Unlock()
if nc != nil && !nc.IsClosed() {
nc.Close()
}
// 连接选项配置
opts := []nats.Option{
nats.Name("goframe-nats-client"),
nats.ReconnectWait(2 * time.Second),
nats.MaxReconnects(-1), // 无限重连
nats.PingInterval(10 * time.Second),
nats.MaxPingsOutstanding(5),
nats.ReconnectHandler(func(nc *nats.Conn) {
g.Log().Infof(ctx, "✅ NATS 重连成功: %s", nc.ConnectedUrl())
connected = true
// 重新创建 JetStream 实例
if newJS, err := jetstream.New(nc); err == nil {
js = newJS
}
// 通知重连成功
notifyConnState(ConnStateConnected, nil)
// 使用非阻塞发送避免阻塞
select {
case reconnectChan <- struct{}{}:
default:
// 通道已满,丢弃通知
}
}),
nats.DisconnectErrHandler(func(nc *nats.Conn, err error) {
g.Log().Warningf(ctx, "⚠️ NATS 连接断开: %v, 准备重连...", err)
connected = false
notifyConnState(ConnStateReconnecting, err)
}),
nats.ClosedHandler(func(nc *nats.Conn) {
g.Log().Infof(ctx, "NATS 连接已关闭: %s", nc.ConnectedUrl())
connected = false
notifyConnState(ConnStateClosed, nil)
}),
nats.ErrorHandler(func(nc *nats.Conn, sub *nats.Subscription, err error) {
g.Log().Errorf(ctx, "NATS 错误: %v", err)
}),
}
var err error
nc, err = nats.Connect(natsURL, opts...)
if err != nil {
return fmt.Errorf("NATS 连接失败: %w", err)
}
// 等待连接就绪
if nc.Status() != nats.CONNECTED {
select {
case <-time.After(5 * time.Second):
notifyConnState(ConnStateDisconnected, fmt.Errorf("连接超时"))
return fmt.Errorf("NATS 连接超时")
case <-nc.StatusChanged(nats.CONNECTED):
}
}
// 创建 JetStream 实例
js, err = jetstream.New(nc)
if err != nil {
return fmt.Errorf("创建 JetStream 失败: %w", err)
}
connected = true
inited = true
g.Log().Infof(ctx, "✅ NATS 连接成功: %s", nc.ConnectedUrl())
notifyConnState(ConnStateConnected, nil)
return nil
}
// healthCheck 健康检查协程(仅作为备用检查)
func healthCheck() {
ticker := time.NewTicker(30 * time.Second)
defer ticker.Stop()
for {
select {
case <-healthCtx.Done():
return
case <-ticker.C:
mu.RLock()
currentConnected := connected
currentConn := nc
mu.RUnlock()
if !currentConnected || currentConn == nil || currentConn.IsClosed() {
// 仅记录日志不尝试重连NATS 已有自动重连机制)
g.Log().Warning(context.Background(), "NATS 连接断开,等待 NATS 自动重连...")
}
case <-reconnectChan:
// 重连成功的通知(仅记录日志)
g.Log().Info(context.Background(), "收到重连成功通知")
}
}
}
// checkConnected 检查连接状态
func checkConnected() bool {
mu.RLock()
defer mu.RUnlock()
return connected && nc != nil && !nc.IsClosed()
}
// IsConnected 检查 NATS 是否已连接
func IsConnected() bool {
return checkConnected()
}
// GetConnState 获取当前连接状态
func GetConnState() ConnState {
mu.RLock()
defer mu.RUnlock()
if nc == nil {
return ConnStateDisconnected
}
if nc.IsClosed() {
return ConnStateClosed
}
if connected {
return ConnStateConnected
}
return ConnStateDisconnected
}
// RPCHandler RPC 处理函数类型
// 实现方只需要关注请求参数和返回值,无需了解底层 NATS 实现
type RPCHandler func(ctx context.Context, req []byte) ([]byte, error)
// CreateTaskStream 创建任务消息队列流
// 存储策略: 文件存储
@@ -521,20 +283,6 @@ func CreateConsumer(ctx context.Context, streamName, consumerName string, config
// 以下方法提供了完全抽象的 RPC 调用接口
// 调用方和响应方完全不需要知道底层使用的是 NATS 的发布订阅模式
// RPC 服务注册表
var (
rpcServices map[string]RPCHandler
rpcSubs map[string]*nats.Subscription // 服务名 -> 订阅
rpcServicesMu sync.RWMutex
queueRPCServices map[string]map[string]RPCHandler // queueName -> subject -> handler
queueRPCSubs map[string]map[string]*nats.Subscription // queueName -> serviceName -> 订阅
queueRPCMu sync.RWMutex
)
// RPCHandler RPC 处理函数类型
// 实现方只需要关注请求参数和返回值,无需了解底层 NATS 实现
type RPCHandler func(ctx context.Context, req []byte) ([]byte, error)
// RegisterRPCService 注册 RPC 服务(单实例)
// serviceName: 服务名称,调用方通过此名称调用服务
// handler: 服务处理函数,接收请求并返回响应
@@ -898,57 +646,3 @@ func AutoRegisterServices(serviceInstances map[string]interface{}, options ...Re
g.Log().Infof(context.Background(), "✅ 共自动注册了 %d 个服务", totalRegistered)
return nil
}
// Shutdown 优雅关闭:自动注销所有已注册的服务并关闭 NATS 连接
func Shutdown() error {
ctx := context.Background()
g.Log().Info(ctx, "开始优雅关闭 NATS RPC 服务...")
// 注销所有单实例服务
rpcServicesMu.Lock()
singleServiceCount := len(rpcServices)
for serviceName := range rpcServices {
if sub, exists := rpcSubs[serviceName]; exists {
if err := sub.Unsubscribe(); err != nil {
g.Log().Errorf(ctx, "注销服务 %s 失败: %v", serviceName, err)
}
}
delete(rpcSubs, serviceName)
delete(rpcServices, serviceName)
}
rpcServicesMu.Unlock()
// 注销所有队列服务
queueRPCMu.Lock()
queueServiceCount := 0
for queueName, servicesMap := range queueRPCServices {
queueServiceCount += len(servicesMap)
for serviceName, sub := range queueRPCSubs[queueName] {
if err := sub.Unsubscribe(); err != nil {
g.Log().Errorf(ctx, "注销队列服务 %s (队列: %s) 失败: %v", serviceName, queueName, err)
}
}
delete(queueRPCSubs, queueName)
delete(queueRPCServices, queueName)
}
queueRPCMu.Unlock()
g.Log().Infof(ctx, "已注销 %d 个单实例服务和 %d 个队列服务", singleServiceCount, queueServiceCount)
mu.Lock()
defer mu.Unlock()
// 停止健康检查协程
if healthCancel != nil {
healthCancel()
}
// 关闭连接
if nc != nil && !nc.IsClosed() {
nc.Close()
connected = false
inited = false
}
g.Log().Info(ctx, "NATS RPC 服务已优雅关闭")
return nil
}

View File

@@ -131,7 +131,7 @@ func TestNatsPublishRequest(t *testing.T) {
}
// RPC 请求
response, err := Request(ctx, "test.request", []byte("request"), 5*time.Second)
response, err := CallRPC(ctx, "test.request", []byte("request"), 5*time.Second)
if err != nil {
t.Logf("RPC 请求失败: %v", err)
} else {