955 lines
26 KiB
Go
955 lines
26 KiB
Go
package nats
|
||
|
||
import (
|
||
"context"
|
||
"encoding/json"
|
||
"fmt"
|
||
"reflect"
|
||
"sync"
|
||
"sync/atomic"
|
||
"time"
|
||
|
||
"github.com/gogf/gf/v2/frame/g"
|
||
"github.com/nats-io/nats.go"
|
||
"github.com/nats-io/nats.go/jetstream"
|
||
)
|
||
|
||
var (
|
||
nc *nats.Conn
|
||
js jetstream.JetStream
|
||
inited bool
|
||
mu sync.RWMutex
|
||
natsURL string
|
||
healthCtx context.Context
|
||
healthCancel context.CancelFunc
|
||
connected bool
|
||
reconnectChan chan struct{}
|
||
|
||
// 连接状态变化监听器
|
||
connStateListeners []ConnStateListener
|
||
connListenersMu sync.RWMutex
|
||
|
||
// 监控指标
|
||
metrics Metrics
|
||
)
|
||
|
||
// Metrics 监控指标
|
||
type Metrics struct {
|
||
PublishCount atomic.Int64
|
||
PublishError atomic.Int64
|
||
SubscribeCount atomic.Int64
|
||
RequestCount atomic.Int64
|
||
RequestError atomic.Int64
|
||
ConsumeCount atomic.Int64
|
||
ConsumeError atomic.Int64
|
||
}
|
||
|
||
// ConnState 连接状态
|
||
type ConnState int
|
||
|
||
const (
|
||
ConnStateDisconnected ConnState = iota
|
||
ConnStateConnecting
|
||
ConnStateConnected
|
||
ConnStateReconnecting
|
||
ConnStateClosed
|
||
)
|
||
|
||
// ConnStateListener 连接状态监听器
|
||
type ConnStateListener func(state ConnState, err error)
|
||
|
||
// GetMetrics 获取监控指标
|
||
func GetMetrics() Metrics {
|
||
return metrics
|
||
}
|
||
|
||
// RegisterConnStateListener 注册连接状态监听器
|
||
func RegisterConnStateListener(listener ConnStateListener) {
|
||
connListenersMu.Lock()
|
||
defer connListenersMu.Unlock()
|
||
connStateListeners = append(connStateListeners, listener)
|
||
}
|
||
|
||
// UnregisterConnStateListener 取消注册连接状态监听器
|
||
func UnregisterConnStateListener(listener ConnStateListener) {
|
||
connListenersMu.Lock()
|
||
defer connListenersMu.Unlock()
|
||
for i, l := range connStateListeners {
|
||
if l != nil && &l == &listener {
|
||
connStateListeners = append(connStateListeners[:i], connStateListeners[i+1:]...)
|
||
break
|
||
}
|
||
}
|
||
}
|
||
|
||
// notifyConnState 通知所有监听器连接状态变化
|
||
func notifyConnState(state ConnState, err error) {
|
||
connListenersMu.RLock()
|
||
listeners := make([]ConnStateListener, len(connStateListeners))
|
||
copy(listeners, connStateListeners)
|
||
connListenersMu.RUnlock()
|
||
|
||
for _, listener := range listeners {
|
||
if listener != nil {
|
||
listener(state, err)
|
||
}
|
||
}
|
||
}
|
||
|
||
// init 初始化 NATS 连接
|
||
func init() {
|
||
// 从配置文件读取 NATS 地址
|
||
natsURL = g.Cfg().MustGet(context.Background(), "nats.url").String()
|
||
if natsURL == "" {
|
||
// 默认使用本地地址
|
||
natsURL = nats.DefaultURL
|
||
}
|
||
|
||
// 创建健康检查上下文
|
||
healthCtx, healthCancel = context.WithCancel(context.Background())
|
||
|
||
// 创建重连通知通道(增大缓冲区避免丢失通知)
|
||
reconnectChan = make(chan struct{}, 10)
|
||
|
||
// 启动连接
|
||
go initConnection()
|
||
|
||
// 启动健康检查协程
|
||
go healthCheck()
|
||
}
|
||
|
||
// initConnection 初始化连接
|
||
func initConnection() {
|
||
ctx := context.Background()
|
||
notifyConnState(ConnStateConnecting, nil)
|
||
if err := connect(ctx); err != nil {
|
||
g.Log().Errorf(ctx, "NATS 初始连接失败: %v", err)
|
||
notifyConnState(ConnStateDisconnected, err)
|
||
}
|
||
}
|
||
|
||
// connect 建立 NATS 连接
|
||
func connect(ctx context.Context) error {
|
||
mu.Lock()
|
||
defer mu.Unlock()
|
||
|
||
if nc != nil && !nc.IsClosed() {
|
||
nc.Close()
|
||
}
|
||
|
||
// 连接选项配置
|
||
opts := []nats.Option{
|
||
nats.Name("goframe-nats-client"),
|
||
nats.ReconnectWait(2 * time.Second),
|
||
nats.MaxReconnects(-1), // 无限重连
|
||
nats.PingInterval(10 * time.Second),
|
||
nats.MaxPingsOutstanding(5),
|
||
nats.ReconnectHandler(func(nc *nats.Conn) {
|
||
g.Log().Infof(ctx, "✅ NATS 重连成功: %s", nc.ConnectedUrl())
|
||
connected = true
|
||
|
||
// 重新创建 JetStream 实例
|
||
if newJS, err := jetstream.New(nc); err == nil {
|
||
js = newJS
|
||
}
|
||
|
||
// 通知重连成功
|
||
notifyConnState(ConnStateConnected, nil)
|
||
|
||
// 使用非阻塞发送避免阻塞
|
||
select {
|
||
case reconnectChan <- struct{}{}:
|
||
default:
|
||
// 通道已满,丢弃通知
|
||
}
|
||
}),
|
||
nats.DisconnectErrHandler(func(nc *nats.Conn, err error) {
|
||
g.Log().Warningf(ctx, "⚠️ NATS 连接断开: %v, 准备重连...", err)
|
||
connected = false
|
||
notifyConnState(ConnStateReconnecting, err)
|
||
}),
|
||
nats.ClosedHandler(func(nc *nats.Conn) {
|
||
g.Log().Infof(ctx, "NATS 连接已关闭: %s", nc.ConnectedUrl())
|
||
connected = false
|
||
notifyConnState(ConnStateClosed, nil)
|
||
}),
|
||
nats.ErrorHandler(func(nc *nats.Conn, sub *nats.Subscription, err error) {
|
||
g.Log().Errorf(ctx, "NATS 错误: %v", err)
|
||
}),
|
||
}
|
||
|
||
var err error
|
||
nc, err = nats.Connect(natsURL, opts...)
|
||
if err != nil {
|
||
return fmt.Errorf("NATS 连接失败: %w", err)
|
||
}
|
||
|
||
// 等待连接就绪
|
||
if nc.Status() != nats.CONNECTED {
|
||
select {
|
||
case <-time.After(5 * time.Second):
|
||
notifyConnState(ConnStateDisconnected, fmt.Errorf("连接超时"))
|
||
return fmt.Errorf("NATS 连接超时")
|
||
case <-nc.StatusChanged(nats.CONNECTED):
|
||
}
|
||
}
|
||
|
||
// 创建 JetStream 实例
|
||
js, err = jetstream.New(nc)
|
||
if err != nil {
|
||
return fmt.Errorf("创建 JetStream 失败: %w", err)
|
||
}
|
||
|
||
connected = true
|
||
inited = true
|
||
g.Log().Infof(ctx, "✅ NATS 连接成功: %s", nc.ConnectedUrl())
|
||
notifyConnState(ConnStateConnected, nil)
|
||
return nil
|
||
}
|
||
|
||
// healthCheck 健康检查协程(仅作为备用检查)
|
||
func healthCheck() {
|
||
ticker := time.NewTicker(30 * time.Second)
|
||
defer ticker.Stop()
|
||
|
||
for {
|
||
select {
|
||
case <-healthCtx.Done():
|
||
return
|
||
case <-ticker.C:
|
||
mu.RLock()
|
||
currentConnected := connected
|
||
currentConn := nc
|
||
mu.RUnlock()
|
||
|
||
if !currentConnected || currentConn == nil || currentConn.IsClosed() {
|
||
// 仅记录日志,不尝试重连(NATS 已有自动重连机制)
|
||
g.Log().Warning(context.Background(), "NATS 连接断开,等待 NATS 自动重连...")
|
||
}
|
||
case <-reconnectChan:
|
||
// 重连成功的通知(仅记录日志)
|
||
g.Log().Info(context.Background(), "收到重连成功通知")
|
||
}
|
||
}
|
||
}
|
||
|
||
// checkConnected 检查连接状态
|
||
func checkConnected() bool {
|
||
mu.RLock()
|
||
defer mu.RUnlock()
|
||
return connected && nc != nil && !nc.IsClosed()
|
||
}
|
||
|
||
// IsConnected 检查 NATS 是否已连接
|
||
func IsConnected() bool {
|
||
return checkConnected()
|
||
}
|
||
|
||
// GetConnState 获取当前连接状态
|
||
func GetConnState() ConnState {
|
||
mu.RLock()
|
||
defer mu.RUnlock()
|
||
|
||
if nc == nil {
|
||
return ConnStateDisconnected
|
||
}
|
||
|
||
if nc.IsClosed() {
|
||
return ConnStateClosed
|
||
}
|
||
|
||
if connected {
|
||
return ConnStateConnected
|
||
}
|
||
|
||
return ConnStateDisconnected
|
||
}
|
||
|
||
// CreateTaskStream 创建任务消息队列流
|
||
// 存储策略: 文件存储
|
||
// 工作队列模式: 工作队列策略
|
||
func CreateTaskStream(ctx context.Context, streamName string, subjects []string) error {
|
||
if !checkConnected() {
|
||
return fmt.Errorf("NATS 未连接")
|
||
}
|
||
|
||
stream, err := js.Stream(ctx, streamName)
|
||
if err == nil {
|
||
// 流已存在,更新配置
|
||
_, err = js.UpdateStream(ctx, jetstream.StreamConfig{
|
||
Name: streamName,
|
||
Subjects: subjects,
|
||
Storage: jetstream.FileStorage,
|
||
Retention: jetstream.WorkQueuePolicy,
|
||
})
|
||
if err != nil {
|
||
return fmt.Errorf("更新任务流失败: %w", err)
|
||
}
|
||
g.Log().Infof(ctx, "✅ 任务消息队列流已更新: %s", stream.CachedInfo().Config.Name)
|
||
return nil
|
||
}
|
||
|
||
// 创建新流
|
||
stream, err = js.CreateStream(ctx, jetstream.StreamConfig{
|
||
Name: streamName,
|
||
Subjects: subjects,
|
||
Storage: jetstream.FileStorage,
|
||
Retention: jetstream.WorkQueuePolicy,
|
||
})
|
||
if err != nil {
|
||
return fmt.Errorf("创建任务流失败: %w", err)
|
||
}
|
||
|
||
g.Log().Infof(ctx, "✅ 任务消息队列流创建成功: %s (文件存储+工作队列策略)", stream.CachedInfo().Config.Name)
|
||
return nil
|
||
}
|
||
|
||
// CreateLogStream 创建日志流
|
||
// 存储策略: 内存存储
|
||
// 副本数: 单副本 (1)
|
||
// 消息留存: 短时留存 (1小时)
|
||
func CreateLogStream(ctx context.Context, streamName string, subjects []string) error {
|
||
if !checkConnected() {
|
||
return fmt.Errorf("NATS 未连接")
|
||
}
|
||
|
||
maxAge := 1 * time.Hour
|
||
|
||
stream, err := js.Stream(ctx, streamName)
|
||
if err == nil {
|
||
// 流已存在,更新配置
|
||
_, err = js.UpdateStream(ctx, jetstream.StreamConfig{
|
||
Name: streamName,
|
||
Subjects: subjects,
|
||
Storage: jetstream.MemoryStorage,
|
||
Replicas: 1,
|
||
MaxAge: maxAge,
|
||
})
|
||
if err != nil {
|
||
return fmt.Errorf("更新日志流失败: %w", err)
|
||
}
|
||
g.Log().Infof(ctx, "✅ 日志流已更新: %s", stream.CachedInfo().Config.Name)
|
||
return nil
|
||
}
|
||
|
||
// 创建新流
|
||
stream, err = js.CreateStream(ctx, jetstream.StreamConfig{
|
||
Name: streamName,
|
||
Subjects: subjects,
|
||
Storage: jetstream.MemoryStorage,
|
||
Replicas: 1,
|
||
MaxAge: maxAge,
|
||
})
|
||
if err != nil {
|
||
return fmt.Errorf("创建日志流失败: %w", err)
|
||
}
|
||
|
||
g.Log().Infof(ctx, "✅ 日志流创建成功: %s (内存存储+单副本+短时留存1小时)", stream.CachedInfo().Config.Name)
|
||
return nil
|
||
}
|
||
|
||
// CreateTradeStream 创建交易业务流
|
||
// 存储策略: 文件存储
|
||
// 副本数: 3副本
|
||
// 同步刷盘: 启用
|
||
func CreateTradeStream(ctx context.Context, streamName string, subjects []string) error {
|
||
if !checkConnected() {
|
||
return fmt.Errorf("NATS 未连接")
|
||
}
|
||
|
||
stream, err := js.Stream(ctx, streamName)
|
||
if err == nil {
|
||
// 流已存在,更新配置
|
||
_, err = js.UpdateStream(ctx, jetstream.StreamConfig{
|
||
Name: streamName,
|
||
Subjects: subjects,
|
||
Storage: jetstream.FileStorage,
|
||
Replicas: 3,
|
||
RePublish: nil,
|
||
Duplicates: 0,
|
||
})
|
||
if err != nil {
|
||
return fmt.Errorf("更新交易流失败: %w", err)
|
||
}
|
||
g.Log().Infof(ctx, "✅ 交易业务流已更新: %s", stream.CachedInfo().Config.Name)
|
||
return nil
|
||
}
|
||
|
||
// 创建新流
|
||
stream, err = js.CreateStream(ctx, jetstream.StreamConfig{
|
||
Name: streamName,
|
||
Subjects: subjects,
|
||
Storage: jetstream.FileStorage,
|
||
Replicas: 3,
|
||
RePublish: nil,
|
||
Duplicates: 0,
|
||
})
|
||
if err != nil {
|
||
return fmt.Errorf("创建交易流失败: %w", err)
|
||
}
|
||
|
||
g.Log().Infof(ctx, "✅ 交易业务流创建成功: %s (文件存储+3副本+同步刷盘)", stream.CachedInfo().Config.Name)
|
||
return nil
|
||
}
|
||
|
||
// Publish 发布消息到指定主题
|
||
func Publish(ctx context.Context, subject string, data []byte) error {
|
||
if !checkConnected() {
|
||
return fmt.Errorf("NATS 未连接")
|
||
}
|
||
|
||
metrics.PublishCount.Add(1)
|
||
_, err := js.Publish(ctx, subject, data)
|
||
if err != nil {
|
||
metrics.PublishError.Add(1)
|
||
return fmt.Errorf("发布消息失败: %w", err)
|
||
}
|
||
|
||
return nil
|
||
}
|
||
|
||
// GetStream 获取流信息
|
||
func GetStream(ctx context.Context, streamName string) (*jetstream.StreamInfo, error) {
|
||
if !checkConnected() {
|
||
return nil, fmt.Errorf("NATS 未连接")
|
||
}
|
||
|
||
stream, err := js.Stream(ctx, streamName)
|
||
if err != nil {
|
||
return nil, fmt.Errorf("获取流失败: %w", err)
|
||
}
|
||
|
||
info, err := stream.Info(ctx)
|
||
if err != nil {
|
||
return nil, fmt.Errorf("获取流信息失败: %w", err)
|
||
}
|
||
|
||
return info, nil
|
||
}
|
||
|
||
// ListStreams 列出所有流(简化实现)
|
||
// 注意:由于 API 限制,此方法可能需要根据实际需求进一步实现
|
||
func ListStreams(ctx context.Context) ([]string, error) {
|
||
if !checkConnected() {
|
||
return nil, fmt.Errorf("NATS 未连接")
|
||
}
|
||
|
||
// TODO: 根据实际 NATS 版本实现完整的流列表功能
|
||
return []string{}, nil
|
||
}
|
||
|
||
// DeleteStream 删除流
|
||
func DeleteStream(ctx context.Context, streamName string) error {
|
||
if !checkConnected() {
|
||
return fmt.Errorf("NATS 未连接")
|
||
}
|
||
|
||
if err := js.DeleteStream(ctx, streamName); err != nil {
|
||
return fmt.Errorf("删除流失败: %w", err)
|
||
}
|
||
|
||
g.Log().Infof(ctx, "✅ 流已删除: %s", streamName)
|
||
return nil
|
||
}
|
||
|
||
// GetConsumer 获取消费者信息
|
||
func GetConsumer(ctx context.Context, streamName, consumerName string) (*jetstream.ConsumerInfo, error) {
|
||
if !checkConnected() {
|
||
return nil, fmt.Errorf("NATS 未连接")
|
||
}
|
||
|
||
consumer, err := js.Consumer(ctx, streamName, consumerName)
|
||
if err != nil {
|
||
return nil, fmt.Errorf("获取消费者失败: %w", err)
|
||
}
|
||
|
||
info, err := consumer.Info(ctx)
|
||
if err != nil {
|
||
return nil, fmt.Errorf("获取消费者信息失败: %w", err)
|
||
}
|
||
|
||
return info, nil
|
||
}
|
||
|
||
// ListConsumers 列出指定流的所有消费者(简化实现)
|
||
// 注意:由于 API 限制,此方法可能需要根据实际需求进一步实现
|
||
func ListConsumers(ctx context.Context, streamName string) ([]string, error) {
|
||
if !checkConnected() {
|
||
return nil, fmt.Errorf("NATS 未连接")
|
||
}
|
||
|
||
// TODO: 根据实际 NATS 版本实现完整的消费者列表功能
|
||
return []string{}, nil
|
||
}
|
||
|
||
// DeleteConsumer 删除消费者
|
||
func DeleteConsumer(ctx context.Context, streamName, consumerName string) error {
|
||
if !checkConnected() {
|
||
return fmt.Errorf("NATS 未连接")
|
||
}
|
||
|
||
if err := js.DeleteConsumer(ctx, streamName, consumerName); err != nil {
|
||
return fmt.Errorf("删除消费者失败: %w", err)
|
||
}
|
||
|
||
g.Log().Infof(ctx, "✅ 消费者已删除: %s/%s", streamName, consumerName)
|
||
return nil
|
||
}
|
||
|
||
// CreateConsumer 创建消费者
|
||
func CreateConsumer(ctx context.Context, streamName, consumerName string, config jetstream.ConsumerConfig) (jetstream.Consumer, error) {
|
||
if !checkConnected() {
|
||
return nil, fmt.Errorf("NATS 未连接")
|
||
}
|
||
|
||
// 尝试获取现有消费者
|
||
consumer, err := js.Consumer(ctx, streamName, consumerName)
|
||
if err == nil {
|
||
return consumer, nil
|
||
}
|
||
|
||
// 创建新消费者
|
||
consumer, err = js.CreateConsumer(ctx, streamName, config)
|
||
if err != nil {
|
||
return nil, fmt.Errorf("创建消费者失败: %w", err)
|
||
}
|
||
|
||
return consumer, nil
|
||
}
|
||
|
||
// ============ RPC 服务封装 ============
|
||
// 以下方法提供了完全抽象的 RPC 调用接口
|
||
// 调用方和响应方完全不需要知道底层使用的是 NATS 的发布订阅模式
|
||
|
||
// RPC 服务注册表
|
||
var (
|
||
rpcServices map[string]RPCHandler
|
||
rpcSubs map[string]*nats.Subscription // 服务名 -> 订阅
|
||
rpcServicesMu sync.RWMutex
|
||
queueRPCServices map[string]map[string]RPCHandler // queueName -> subject -> handler
|
||
queueRPCSubs map[string]map[string]*nats.Subscription // queueName -> serviceName -> 订阅
|
||
queueRPCMu sync.RWMutex
|
||
)
|
||
|
||
// RPCHandler RPC 处理函数类型
|
||
// 实现方只需要关注请求参数和返回值,无需了解底层 NATS 实现
|
||
type RPCHandler func(ctx context.Context, req []byte) ([]byte, error)
|
||
|
||
// RegisterRPCService 注册 RPC 服务(单实例)
|
||
// serviceName: 服务名称,调用方通过此名称调用服务
|
||
// handler: 服务处理函数,接收请求并返回响应
|
||
func RegisterRPCService(serviceName string, handler RPCHandler) error {
|
||
if !checkConnected() {
|
||
return fmt.Errorf("NATS 未连接")
|
||
}
|
||
|
||
rpcServicesMu.Lock()
|
||
if rpcServices == nil {
|
||
rpcServices = make(map[string]RPCHandler)
|
||
}
|
||
if rpcSubs == nil {
|
||
rpcSubs = make(map[string]*nats.Subscription)
|
||
}
|
||
|
||
// 如果已存在该服务,先取消之前的订阅
|
||
if oldSub, exists := rpcSubs[serviceName]; exists {
|
||
oldSub.Unsubscribe()
|
||
}
|
||
|
||
rpcServices[serviceName] = handler
|
||
rpcServicesMu.Unlock()
|
||
|
||
// 订阅服务主题
|
||
subject := fmt.Sprintf("rpc.%s", serviceName)
|
||
sub, err := nc.Subscribe(subject, func(msg *nats.Msg) {
|
||
ctx := context.Background()
|
||
response, err := handler(ctx, msg.Data)
|
||
if err != nil {
|
||
errMsg := fmt.Sprintf("处理失败: %v", err)
|
||
if err = msg.Respond([]byte(errMsg)); err != nil {
|
||
g.Log().Errorf(ctx, "RPC 错误响应失败: %v", err)
|
||
}
|
||
return
|
||
}
|
||
|
||
if err = msg.Respond(response); err != nil {
|
||
g.Log().Errorf(ctx, "RPC 响应失败: %v", err)
|
||
}
|
||
})
|
||
|
||
if err != nil {
|
||
return fmt.Errorf("注册 RPC 服务失败: %w", err)
|
||
}
|
||
|
||
rpcSubs[serviceName] = sub
|
||
metrics.SubscribeCount.Add(1)
|
||
g.Log().Infof(context.Background(), "✅ RPC 服务已注册: %s", serviceName)
|
||
return nil
|
||
}
|
||
|
||
// RegisterQueueRPCService 注册 RPC 服务(集群模式)
|
||
// 多个服务实例注册同一服务时,请求会自动负载均衡
|
||
// serviceName: 服务名称
|
||
// queueName: 队列组名,同一队列组的实例共享请求
|
||
// handler: 服务处理函数
|
||
func RegisterQueueRPCService(serviceName, queueName string, handler RPCHandler) error {
|
||
if !checkConnected() {
|
||
return fmt.Errorf("NATS 未连接")
|
||
}
|
||
|
||
queueRPCMu.Lock()
|
||
if queueRPCServices == nil {
|
||
queueRPCServices = make(map[string]map[string]RPCHandler)
|
||
}
|
||
if queueRPCSubs == nil {
|
||
queueRPCSubs = make(map[string]map[string]*nats.Subscription)
|
||
}
|
||
if queueRPCServices[queueName] == nil {
|
||
queueRPCServices[queueName] = make(map[string]RPCHandler)
|
||
}
|
||
if queueRPCSubs[queueName] == nil {
|
||
queueRPCSubs[queueName] = make(map[string]*nats.Subscription)
|
||
}
|
||
|
||
// 如果已存在该服务,先取消之前的订阅
|
||
if oldSub, exists := queueRPCSubs[queueName][serviceName]; exists {
|
||
oldSub.Unsubscribe()
|
||
}
|
||
|
||
queueRPCServices[queueName][serviceName] = handler
|
||
queueRPCMu.Unlock()
|
||
|
||
// 订阅服务主题(队列模式)
|
||
subject := fmt.Sprintf("rpc.%s", serviceName)
|
||
sub, err := nc.QueueSubscribe(subject, queueName, func(msg *nats.Msg) {
|
||
ctx := context.Background()
|
||
response, err := handler(ctx, msg.Data)
|
||
if err != nil {
|
||
errMsg := fmt.Sprintf("处理失败: %v", err)
|
||
if err = msg.Respond([]byte(errMsg)); err != nil {
|
||
g.Log().Errorf(ctx, "RPC 错误响应失败: %v", err)
|
||
}
|
||
return
|
||
}
|
||
|
||
if err = msg.Respond(response); err != nil {
|
||
g.Log().Errorf(ctx, "RPC 响应失败: %v", err)
|
||
}
|
||
})
|
||
|
||
if err != nil {
|
||
return fmt.Errorf("注册队列 RPC 服务失败: %w", err)
|
||
}
|
||
|
||
queueRPCMu.Lock()
|
||
queueRPCSubs[queueName][serviceName] = sub
|
||
queueRPCMu.Unlock()
|
||
|
||
metrics.SubscribeCount.Add(1)
|
||
g.Log().Infof(context.Background(), "✅ 队列 RPC 服务已注册: %s (队列组: %s)", serviceName, queueName)
|
||
return nil
|
||
}
|
||
|
||
// CallRPC 调用 RPC 服务
|
||
// serviceName: 服务名称
|
||
// req: 请求数据
|
||
// timeout: 超时时间
|
||
// 返回: 响应数据和错误
|
||
func CallRPC(ctx context.Context, serviceName string, req []byte, timeout time.Duration) ([]byte, error) {
|
||
if !checkConnected() {
|
||
return nil, fmt.Errorf("NATS 未连接")
|
||
}
|
||
|
||
metrics.RequestCount.Add(1)
|
||
|
||
// 检查本地是否有注册的单实例服务,如果有则直接调用(优化性能)
|
||
rpcServicesMu.RLock()
|
||
if localHandler, exists := rpcServices[serviceName]; exists {
|
||
rpcServicesMu.RUnlock()
|
||
// 本地直接调用,避免网络开销
|
||
response, err := localHandler(ctx, req)
|
||
if err != nil {
|
||
metrics.RequestError.Add(1)
|
||
return nil, fmt.Errorf("本地调用 RPC 服务失败 [%s]: %w", serviceName, err)
|
||
}
|
||
return response, nil
|
||
}
|
||
rpcServicesMu.RUnlock()
|
||
|
||
// 通过 NATS 网络调用远程服务
|
||
timeoutCtx, cancel := context.WithTimeout(ctx, timeout)
|
||
defer cancel()
|
||
|
||
subject := fmt.Sprintf("rpc.%s", serviceName)
|
||
msg, err := nc.RequestWithContext(timeoutCtx, subject, req)
|
||
if err != nil {
|
||
metrics.RequestError.Add(1)
|
||
return nil, fmt.Errorf("调用 RPC 服务失败 [%s]: %w", serviceName, err)
|
||
}
|
||
|
||
if msg == nil {
|
||
metrics.RequestError.Add(1)
|
||
return nil, fmt.Errorf("RPC 响应为空 [%s]", serviceName)
|
||
}
|
||
|
||
return msg.Data, nil
|
||
}
|
||
|
||
// RegisterServiceOption 注册选项类型
|
||
type RegisterServiceOption func(*registerServiceConfig)
|
||
|
||
type registerServiceConfig struct {
|
||
queueName string // 队列组名(用于集群模式)
|
||
excludeMethods []string
|
||
}
|
||
|
||
// WithQueueGroup 设置队列组名(集群模式)
|
||
func WithQueueGroup(queueName string) RegisterServiceOption {
|
||
return func(cfg *registerServiceConfig) {
|
||
cfg.queueName = queueName
|
||
}
|
||
}
|
||
|
||
// WithExcludeMethods 排除不需要注册的方法
|
||
func WithExcludeMethods(methods ...string) RegisterServiceOption {
|
||
return func(cfg *registerServiceConfig) {
|
||
cfg.excludeMethods = append(cfg.excludeMethods, methods...)
|
||
}
|
||
}
|
||
|
||
// registerService 注册单个服务的所有公开方法(内部函数)
|
||
func registerService(service interface{}, serviceNamePrefix string, options ...RegisterServiceOption) error {
|
||
if !checkConnected() {
|
||
return fmt.Errorf("NATS 未连接")
|
||
}
|
||
|
||
// 应用选项
|
||
cfg := ®isterServiceConfig{}
|
||
for _, opt := range options {
|
||
opt(cfg)
|
||
}
|
||
|
||
// 创建排除方法集合
|
||
excludeSet := make(map[string]struct{})
|
||
for _, method := range cfg.excludeMethods {
|
||
excludeSet[method] = struct{}{}
|
||
}
|
||
|
||
// 获取 service 的类型
|
||
serviceType := reflect.TypeOf(service)
|
||
|
||
// 遍历所有方法
|
||
registeredCount := 0
|
||
for i := 0; i < serviceType.NumMethod(); i++ {
|
||
method := serviceType.Method(i)
|
||
|
||
// 只注册导出方法(首字母大写)
|
||
if !method.IsExported() {
|
||
continue
|
||
}
|
||
|
||
// 排除指定的方法
|
||
if _, exists := excludeSet[method.Name]; exists {
|
||
continue
|
||
}
|
||
|
||
// 检查方法签名:必须是 func(ctx context.Context, request) (response, error)
|
||
if method.Type.NumIn() < 2 {
|
||
g.Log().Warningf(context.Background(), "方法 %s 的参数数量不足,跳过注册", method.Name)
|
||
continue
|
||
}
|
||
|
||
// 第一个参数必须是 context.Context
|
||
if !method.Type.In(0).Implements(reflect.TypeOf((*context.Context)(nil)).Elem()) {
|
||
g.Log().Warningf(context.Background(), "方法 %s 的第一个参数必须是 context.Context,跳过注册", method.Name)
|
||
continue
|
||
}
|
||
|
||
// 返回值必须是 (result, error) 或 error
|
||
if method.Type.NumOut() < 1 || method.Type.NumOut() > 2 {
|
||
g.Log().Warningf(context.Background(), "方法 %s 的返回值数量不正确,跳过注册", method.Name)
|
||
continue
|
||
}
|
||
|
||
if !method.Type.Out(method.Type.NumOut() - 1).Implements(reflect.TypeOf((*error)(nil)).Elem()) {
|
||
g.Log().Warningf(context.Background(), "方法 %s 的最后一个返回值必须是 error,跳过注册", method.Name)
|
||
continue
|
||
}
|
||
|
||
// 生成服务名称:前缀.方法名(保持原始方法名)
|
||
serviceName := fmt.Sprintf("%s.%s", serviceNamePrefix, method.Name)
|
||
|
||
// 创建 RPC handler
|
||
handler := func(ctx context.Context, req []byte) ([]byte, error) {
|
||
// 准备方法调用参数
|
||
args := make([]reflect.Value, 2)
|
||
args[0] = reflect.ValueOf(ctx)
|
||
|
||
// 解析请求参数
|
||
if len(req) > 0 {
|
||
// 如果方法有第二个参数,尝试解析 JSON
|
||
if method.Type.NumIn() > 1 {
|
||
reqValuePtr := reflect.New(method.Type.In(1))
|
||
if err := json.Unmarshal(req, reqValuePtr.Interface()); err != nil {
|
||
return nil, fmt.Errorf("解析请求参数失败: %w", err)
|
||
}
|
||
args[1] = reqValuePtr.Elem()
|
||
}
|
||
} else if method.Type.NumIn() > 1 {
|
||
// 如果方法需要参数但请求为空,创建零值
|
||
args[1] = reflect.Zero(method.Type.In(1))
|
||
}
|
||
|
||
// 调用方法
|
||
results := method.Func.Call(args)
|
||
|
||
// 处理返回值
|
||
var err error
|
||
var result interface{}
|
||
|
||
if len(results) == 1 {
|
||
// 只有 error
|
||
if !results[0].IsNil() {
|
||
err = results[0].Interface().(error)
|
||
}
|
||
} else if len(results) == 2 {
|
||
// (result, error)
|
||
result = results[0].Interface()
|
||
if !results[1].IsNil() {
|
||
err = results[1].Interface().(error)
|
||
}
|
||
}
|
||
|
||
if err != nil {
|
||
return nil, err
|
||
}
|
||
|
||
// 序列化返回值
|
||
if result == nil || (reflect.ValueOf(result).Kind() == reflect.Ptr && reflect.ValueOf(result).IsNil()) {
|
||
return []byte("{}"), nil
|
||
}
|
||
|
||
return json.Marshal(result)
|
||
}
|
||
|
||
// 注册 RPC 服务
|
||
var err error
|
||
if cfg.queueName != "" {
|
||
err = RegisterQueueRPCService(serviceName, cfg.queueName, handler)
|
||
} else {
|
||
err = RegisterRPCService(serviceName, handler)
|
||
}
|
||
|
||
if err != nil {
|
||
g.Log().Errorf(context.Background(), "注册服务 %s 失败: %v", serviceName, err)
|
||
continue
|
||
}
|
||
|
||
registeredCount++
|
||
g.Log().Infof(context.Background(), "✅ 已自动注册 RPC 服务: %s -> %s", serviceName, method.Name)
|
||
}
|
||
|
||
if registeredCount == 0 {
|
||
g.Log().Warningf(context.Background(), "未注册任何方法,请检查 %v 的方法签名", serviceNamePrefix)
|
||
return fmt.Errorf("未找到可注册的方法")
|
||
}
|
||
|
||
g.Log().Infof(context.Background(), "✅ Service %v 共注册了 %d 个 RPC 方法", serviceNamePrefix, registeredCount)
|
||
return nil
|
||
}
|
||
|
||
// AutoRegisterServices 自动注册多个服务的所有公开方法
|
||
// serviceInstances: map[包名]service实例,如 map[string]interface{}{"user": userService, "order": orderService}
|
||
// options: 注册选项(可选)
|
||
// 示例:
|
||
//
|
||
// AutoRegisterServices(map[string]interface{}{
|
||
// "user": userService,
|
||
// "order": orderService,
|
||
// })
|
||
// 或
|
||
// AutoRegisterServices(map[string]interface{}{
|
||
// "order": orderService,
|
||
// }, WithQueueGroup("order-group"))
|
||
func AutoRegisterServices(serviceInstances map[string]interface{}, options ...RegisterServiceOption) error {
|
||
if len(serviceInstances) == 0 {
|
||
return fmt.Errorf("service 实例列表不能为空")
|
||
}
|
||
|
||
totalRegistered := 0
|
||
|
||
// 遍历每个 service 实例
|
||
for pkgName, serviceInstance := range serviceInstances {
|
||
// 注册服务
|
||
err := registerService(serviceInstance, pkgName, options...)
|
||
if err != nil {
|
||
g.Log().Errorf(context.Background(), "注册 %s 服务失败: %v", pkgName, err)
|
||
continue
|
||
}
|
||
|
||
totalRegistered++
|
||
g.Log().Infof(context.Background(), "✅ %s 服务已自动注册", pkgName)
|
||
}
|
||
|
||
if totalRegistered == 0 {
|
||
return fmt.Errorf("未能注册任何服务")
|
||
}
|
||
|
||
g.Log().Infof(context.Background(), "✅ 共自动注册了 %d 个服务", totalRegistered)
|
||
return nil
|
||
}
|
||
|
||
// Shutdown 优雅关闭:自动注销所有已注册的服务并关闭 NATS 连接
|
||
func Shutdown() error {
|
||
ctx := context.Background()
|
||
g.Log().Info(ctx, "开始优雅关闭 NATS RPC 服务...")
|
||
|
||
// 注销所有单实例服务
|
||
rpcServicesMu.Lock()
|
||
singleServiceCount := len(rpcServices)
|
||
for serviceName := range rpcServices {
|
||
if sub, exists := rpcSubs[serviceName]; exists {
|
||
if err := sub.Unsubscribe(); err != nil {
|
||
g.Log().Errorf(ctx, "注销服务 %s 失败: %v", serviceName, err)
|
||
}
|
||
}
|
||
delete(rpcSubs, serviceName)
|
||
delete(rpcServices, serviceName)
|
||
}
|
||
rpcServicesMu.Unlock()
|
||
|
||
// 注销所有队列服务
|
||
queueRPCMu.Lock()
|
||
queueServiceCount := 0
|
||
for queueName, servicesMap := range queueRPCServices {
|
||
queueServiceCount += len(servicesMap)
|
||
for serviceName, sub := range queueRPCSubs[queueName] {
|
||
if err := sub.Unsubscribe(); err != nil {
|
||
g.Log().Errorf(ctx, "注销队列服务 %s (队列: %s) 失败: %v", serviceName, queueName, err)
|
||
}
|
||
}
|
||
delete(queueRPCSubs, queueName)
|
||
delete(queueRPCServices, queueName)
|
||
}
|
||
queueRPCMu.Unlock()
|
||
|
||
g.Log().Infof(ctx, "已注销 %d 个单实例服务和 %d 个队列服务", singleServiceCount, queueServiceCount)
|
||
|
||
mu.Lock()
|
||
defer mu.Unlock()
|
||
|
||
// 停止健康检查协程
|
||
if healthCancel != nil {
|
||
healthCancel()
|
||
}
|
||
|
||
// 关闭连接
|
||
if nc != nil && !nc.IsClosed() {
|
||
nc.Close()
|
||
connected = false
|
||
inited = false
|
||
}
|
||
g.Log().Info(ctx, "NATS RPC 服务已优雅关闭")
|
||
return nil
|
||
}
|