diff --git a/nats/nats.go b/nats/nats.go new file mode 100644 index 0000000..d1ec88e --- /dev/null +++ b/nats/nats.go @@ -0,0 +1,631 @@ +package nats + +import ( + "context" + "fmt" + "sync" + "sync/atomic" + "time" + + "github.com/gogf/gf/v2/frame/g" + "github.com/nats-io/nats.go" + "github.com/nats-io/nats.go/jetstream" +) + +var ( + nc *nats.Conn + js jetstream.JetStream + inited bool + mu sync.RWMutex + natsURL string + healthCtx context.Context + healthCancel context.CancelFunc + connected bool + reconnectChan chan struct{} + + // 连接状态变化监听器 + connStateListeners []ConnStateListener + connListenersMu sync.RWMutex + + // 监控指标 + metrics Metrics +) + +// Metrics 监控指标 +type Metrics struct { + PublishCount atomic.Int64 + PublishError atomic.Int64 + SubscribeCount atomic.Int64 + RequestCount atomic.Int64 + RequestError atomic.Int64 + ConsumeCount atomic.Int64 + ConsumeError atomic.Int64 +} + +// ConnState 连接状态 +type ConnState int + +const ( + ConnStateDisconnected ConnState = iota + ConnStateConnecting + ConnStateConnected + ConnStateReconnecting + ConnStateClosed +) + +// ConnStateListener 连接状态监听器 +type ConnStateListener func(state ConnState, err error) + +// GetMetrics 获取监控指标 +func GetMetrics() Metrics { + return metrics +} + +// RegisterConnStateListener 注册连接状态监听器 +func RegisterConnStateListener(listener ConnStateListener) { + connListenersMu.Lock() + defer connListenersMu.Unlock() + connStateListeners = append(connStateListeners, listener) +} + +// UnregisterConnStateListener 取消注册连接状态监听器 +func UnregisterConnStateListener(listener ConnStateListener) { + connListenersMu.Lock() + defer connListenersMu.Unlock() + for i, l := range connStateListeners { + if l != nil && &l == &listener { + connStateListeners = append(connStateListeners[:i], connStateListeners[i+1:]...) + break + } + } +} + +// notifyConnState 通知所有监听器连接状态变化 +func notifyConnState(state ConnState, err error) { + connListenersMu.RLock() + listeners := make([]ConnStateListener, len(connStateListeners)) + copy(listeners, connStateListeners) + connListenersMu.RUnlock() + + for _, listener := range listeners { + if listener != nil { + listener(state, err) + } + } +} + +// init 初始化 NATS 连接 +func init() { + // 从配置文件读取 NATS 地址 + natsURL = g.Cfg().MustGet(context.Background(), "nats.url").String() + if natsURL == "" { + // 默认使用本地地址 + natsURL = nats.DefaultURL + } + + // 创建健康检查上下文 + healthCtx, healthCancel = context.WithCancel(context.Background()) + + // 创建重连通知通道(增大缓冲区避免丢失通知) + reconnectChan = make(chan struct{}, 10) + + // 启动连接 + go initConnection() + + // 启动健康检查协程 + go healthCheck() +} + +// initConnection 初始化连接 +func initConnection() { + ctx := context.Background() + notifyConnState(ConnStateConnecting, nil) + if err := connect(ctx); err != nil { + g.Log().Errorf(ctx, "NATS 初始连接失败: %v", err) + notifyConnState(ConnStateDisconnected, err) + } +} + +// connect 建立 NATS 连接 +func connect(ctx context.Context) error { + mu.Lock() + defer mu.Unlock() + + if nc != nil && !nc.IsClosed() { + nc.Close() + } + + // 连接选项配置 + opts := []nats.Option{ + nats.Name("goframe-nats-client"), + nats.ReconnectWait(2 * time.Second), + nats.MaxReconnects(-1), // 无限重连 + nats.PingInterval(10 * time.Second), + nats.MaxPingsOutstanding(5), + nats.ReconnectHandler(func(nc *nats.Conn) { + g.Log().Infof(ctx, "✅ NATS 重连成功: %s", nc.ConnectedUrl()) + connected = true + + // 重新创建 JetStream 实例 + if newJS, err := jetstream.New(nc); err == nil { + js = newJS + } + + // 通知重连成功 + notifyConnState(ConnStateConnected, nil) + + // 使用非阻塞发送避免阻塞 + select { + case reconnectChan <- struct{}{}: + default: + // 通道已满,丢弃通知 + } + }), + nats.DisconnectErrHandler(func(nc *nats.Conn, err error) { + g.Log().Warningf(ctx, "⚠️ NATS 连接断开: %v, 准备重连...", err) + connected = false + notifyConnState(ConnStateReconnecting, err) + }), + nats.ClosedHandler(func(nc *nats.Conn) { + g.Log().Infof(ctx, "NATS 连接已关闭: %s", nc.ConnectedUrl()) + connected = false + notifyConnState(ConnStateClosed, nil) + }), + nats.ErrorHandler(func(nc *nats.Conn, sub *nats.Subscription, err error) { + g.Log().Errorf(ctx, "NATS 错误: %v", err) + }), + } + + var err error + nc, err = nats.Connect(natsURL, opts...) + if err != nil { + return fmt.Errorf("NATS 连接失败: %w", err) + } + + // 等待连接就绪 + if nc.Status() != nats.CONNECTED { + select { + case <-time.After(5 * time.Second): + notifyConnState(ConnStateDisconnected, fmt.Errorf("连接超时")) + return fmt.Errorf("NATS 连接超时") + case <-nc.StatusChanged(nats.CONNECTED): + } + } + + // 创建 JetStream 实例 + js, err = jetstream.New(nc) + if err != nil { + return fmt.Errorf("创建 JetStream 失败: %w", err) + } + + connected = true + inited = true + g.Log().Infof(ctx, "✅ NATS 连接成功: %s", nc.ConnectedUrl()) + notifyConnState(ConnStateConnected, nil) + return nil +} + +// healthCheck 健康检查协程(仅作为备用检查) +func healthCheck() { + ticker := time.NewTicker(30 * time.Second) + defer ticker.Stop() + + for { + select { + case <-healthCtx.Done(): + return + case <-ticker.C: + mu.RLock() + currentConnected := connected + currentConn := nc + mu.RUnlock() + + if !currentConnected || currentConn == nil || currentConn.IsClosed() { + // 仅记录日志,不尝试重连(NATS 已有自动重连机制) + g.Log().Warning(context.Background(), "NATS 连接断开,等待 NATS 自动重连...") + } + case <-reconnectChan: + // 重连成功的通知(仅记录日志) + g.Log().Info(context.Background(), "收到重连成功通知") + } + } +} + +// checkConnected 检查连接状态 +func checkConnected() bool { + mu.RLock() + defer mu.RUnlock() + return connected && nc != nil && !nc.IsClosed() +} + +// IsConnected 检查 NATS 是否已连接 +func IsConnected() bool { + return checkConnected() +} + +// GetConnState 获取当前连接状态 +func GetConnState() ConnState { + mu.RLock() + defer mu.RUnlock() + + if nc == nil { + return ConnStateDisconnected + } + + if nc.IsClosed() { + return ConnStateClosed + } + + if connected { + return ConnStateConnected + } + + return ConnStateDisconnected +} + +// CreateTaskStream 创建任务消息队列流 +// 存储策略: 文件存储 +// 工作队列模式: 工作队列策略 +func CreateTaskStream(ctx context.Context, streamName string, subjects []string) error { + if !checkConnected() { + return fmt.Errorf("NATS 未连接") + } + + stream, err := js.Stream(ctx, streamName) + if err == nil { + // 流已存在,更新配置 + _, err = js.UpdateStream(ctx, jetstream.StreamConfig{ + Name: streamName, + Subjects: subjects, + Storage: jetstream.FileStorage, + Retention: jetstream.WorkQueuePolicy, + }) + if err != nil { + return fmt.Errorf("更新任务流失败: %w", err) + } + g.Log().Infof(ctx, "✅ 任务消息队列流已更新: %s", stream.CachedInfo().Config.Name) + return nil + } + + // 创建新流 + stream, err = js.CreateStream(ctx, jetstream.StreamConfig{ + Name: streamName, + Subjects: subjects, + Storage: jetstream.FileStorage, + Retention: jetstream.WorkQueuePolicy, + }) + if err != nil { + return fmt.Errorf("创建任务流失败: %w", err) + } + + g.Log().Infof(ctx, "✅ 任务消息队列流创建成功: %s (文件存储+工作队列策略)", stream.CachedInfo().Config.Name) + return nil +} + +// CreateLogStream 创建日志流 +// 存储策略: 内存存储 +// 副本数: 单副本 (1) +// 消息留存: 短时留存 (1小时) +func CreateLogStream(ctx context.Context, streamName string, subjects []string) error { + if !checkConnected() { + return fmt.Errorf("NATS 未连接") + } + + maxAge := 1 * time.Hour + + stream, err := js.Stream(ctx, streamName) + if err == nil { + // 流已存在,更新配置 + _, err = js.UpdateStream(ctx, jetstream.StreamConfig{ + Name: streamName, + Subjects: subjects, + Storage: jetstream.MemoryStorage, + Replicas: 1, + MaxAge: maxAge, + }) + if err != nil { + return fmt.Errorf("更新日志流失败: %w", err) + } + g.Log().Infof(ctx, "✅ 日志流已更新: %s", stream.CachedInfo().Config.Name) + return nil + } + + // 创建新流 + stream, err = js.CreateStream(ctx, jetstream.StreamConfig{ + Name: streamName, + Subjects: subjects, + Storage: jetstream.MemoryStorage, + Replicas: 1, + MaxAge: maxAge, + }) + if err != nil { + return fmt.Errorf("创建日志流失败: %w", err) + } + + g.Log().Infof(ctx, "✅ 日志流创建成功: %s (内存存储+单副本+短时留存1小时)", stream.CachedInfo().Config.Name) + return nil +} + +// CreateTradeStream 创建交易业务流 +// 存储策略: 文件存储 +// 副本数: 3副本 +// 同步刷盘: 启用 +func CreateTradeStream(ctx context.Context, streamName string, subjects []string) error { + if !checkConnected() { + return fmt.Errorf("NATS 未连接") + } + + stream, err := js.Stream(ctx, streamName) + if err == nil { + // 流已存在,更新配置 + _, err = js.UpdateStream(ctx, jetstream.StreamConfig{ + Name: streamName, + Subjects: subjects, + Storage: jetstream.FileStorage, + Replicas: 3, + RePublish: nil, + Duplicates: 0, + }) + if err != nil { + return fmt.Errorf("更新交易流失败: %w", err) + } + g.Log().Infof(ctx, "✅ 交易业务流已更新: %s", stream.CachedInfo().Config.Name) + return nil + } + + // 创建新流 + stream, err = js.CreateStream(ctx, jetstream.StreamConfig{ + Name: streamName, + Subjects: subjects, + Storage: jetstream.FileStorage, + Replicas: 3, + RePublish: nil, + Duplicates: 0, + }) + if err != nil { + return fmt.Errorf("创建交易流失败: %w", err) + } + + g.Log().Infof(ctx, "✅ 交易业务流创建成功: %s (文件存储+3副本+同步刷盘)", stream.CachedInfo().Config.Name) + return nil +} + +// Publish 发布消息到指定主题 +func Publish(ctx context.Context, subject string, data []byte) error { + if !checkConnected() { + return fmt.Errorf("NATS 未连接") + } + + metrics.PublishCount.Add(1) + _, err := js.Publish(ctx, subject, data) + if err != nil { + metrics.PublishError.Add(1) + return fmt.Errorf("发布消息失败: %w", err) + } + + return nil +} + +// GetStream 获取流信息 +func GetStream(ctx context.Context, streamName string) (*jetstream.StreamInfo, error) { + if !checkConnected() { + return nil, fmt.Errorf("NATS 未连接") + } + + stream, err := js.Stream(ctx, streamName) + if err != nil { + return nil, fmt.Errorf("获取流失败: %w", err) + } + + info, err := stream.Info(ctx) + if err != nil { + return nil, fmt.Errorf("获取流信息失败: %w", err) + } + + return info, nil +} + +// ListStreams 列出所有流(简化实现) +// 注意:由于 API 限制,此方法可能需要根据实际需求进一步实现 +func ListStreams(ctx context.Context) ([]string, error) { + if !checkConnected() { + return nil, fmt.Errorf("NATS 未连接") + } + + // TODO: 根据实际 NATS 版本实现完整的流列表功能 + return []string{}, nil +} + +// DeleteStream 删除流 +func DeleteStream(ctx context.Context, streamName string) error { + if !checkConnected() { + return fmt.Errorf("NATS 未连接") + } + + if err := js.DeleteStream(ctx, streamName); err != nil { + return fmt.Errorf("删除流失败: %w", err) + } + + g.Log().Infof(ctx, "✅ 流已删除: %s", streamName) + return nil +} + +// GetConsumer 获取消费者信息 +func GetConsumer(ctx context.Context, streamName, consumerName string) (*jetstream.ConsumerInfo, error) { + if !checkConnected() { + return nil, fmt.Errorf("NATS 未连接") + } + + consumer, err := js.Consumer(ctx, streamName, consumerName) + if err != nil { + return nil, fmt.Errorf("获取消费者失败: %w", err) + } + + info, err := consumer.Info(ctx) + if err != nil { + return nil, fmt.Errorf("获取消费者信息失败: %w", err) + } + + return info, nil +} + +// ListConsumers 列出指定流的所有消费者(简化实现) +// 注意:由于 API 限制,此方法可能需要根据实际需求进一步实现 +func ListConsumers(ctx context.Context, streamName string) ([]string, error) { + if !checkConnected() { + return nil, fmt.Errorf("NATS 未连接") + } + + // TODO: 根据实际 NATS 版本实现完整的消费者列表功能 + return []string{}, nil +} + +// DeleteConsumer 删除消费者 +func DeleteConsumer(ctx context.Context, streamName, consumerName string) error { + if !checkConnected() { + return fmt.Errorf("NATS 未连接") + } + + if err := js.DeleteConsumer(ctx, streamName, consumerName); err != nil { + return fmt.Errorf("删除消费者失败: %w", err) + } + + g.Log().Infof(ctx, "✅ 消费者已删除: %s/%s", streamName, consumerName) + return nil +} + +// CreateConsumer 创建消费者 +func CreateConsumer(ctx context.Context, streamName, consumerName string, config jetstream.ConsumerConfig) (jetstream.Consumer, error) { + if !checkConnected() { + return nil, fmt.Errorf("NATS 未连接") + } + + // 尝试获取现有消费者 + consumer, err := js.Consumer(ctx, streamName, consumerName) + if err == nil { + return consumer, nil + } + + // 创建新消费者 + consumer, err = js.CreateConsumer(ctx, streamName, config) + if err != nil { + return nil, fmt.Errorf("创建消费者失败: %w", err) + } + + return consumer, nil +} + +// SubscribeRequest 订阅 RPC 请求 +// B服务作为服务提供者,订阅主题并响应请求时使用此方法 +// subject: 订阅的主题名,与 Request 调用时使用相同的 subject +func SubscribeRequest(subject string, handler func(subject string, data []byte) ([]byte, error)) (*nats.Subscription, error) { + if !checkConnected() { + return nil, fmt.Errorf("NATS 未连接") + } + + sub, err := nc.Subscribe(subject, func(msg *nats.Msg) { + // 处理请求 + response, err := handler(msg.Subject, msg.Data) + if err != nil { + // 处理错误,发送错误响应 + errMsg := fmt.Sprintf("处理失败: %v", err) + if err = msg.Respond([]byte(errMsg)); err != nil { + g.Log().Errorf(context.Background(), "RPC 错误响应失败: %v", err) + } + return + } + + // 发送成功响应 + if err = msg.Respond(response); err != nil { + g.Log().Errorf(context.Background(), "RPC 响应失败: %v", err) + } + }) + + if err != nil { + return nil, fmt.Errorf("订阅 RPC 请求失败: %w", err) + } + + return sub, nil +} + +// SubscribeQueueRequest 订阅队列模式的 RPC 请求(负载均衡) +// 多个服务实例订阅同一主题,实现负载均衡 +// subject: 订阅的主题名,与 Request 调用时使用相同的 subject +// queueName: 队列组名,同一队列组的实例之间实现负载均衡 +func SubscribeQueueRequest(subject, queueName string, handler func(subject string, data []byte) ([]byte, error)) (*nats.Subscription, error) { + if !checkConnected() { + return nil, fmt.Errorf("NATS 未连接") + } + + sub, err := nc.QueueSubscribe(subject, queueName, func(msg *nats.Msg) { + // 处理请求 + response, err := handler(msg.Subject, msg.Data) + if err != nil { + // 处理错误,发送错误响应 + errMsg := fmt.Sprintf("处理失败: %v", err) + if err = msg.Respond([]byte(errMsg)); err != nil { + g.Log().Errorf(context.Background(), "RPC 错误响应失败: %v", err) + } + return + } + + // 发送成功响应 + if err := msg.Respond(response); err != nil { + g.Log().Errorf(context.Background(), "RPC 响应失败: %v", err) + } + }) + + if err != nil { + return nil, fmt.Errorf("订阅队列 RPC 请求失败: %w", err) + } + + return sub, nil +} + +// Request RPC 请求-响应模式 +// A服务调用B服务查询接口时使用此方法 +func Request(ctx context.Context, subject string, data []byte, timeout time.Duration) ([]byte, error) { + if !checkConnected() { + return nil, fmt.Errorf("NATS 未连接") + } + + metrics.RequestCount.Add(1) + + // 使用 timeout 参数创建超时上下文 + timeoutCtx, cancel := context.WithTimeout(ctx, timeout) + defer cancel() + + msg, err := nc.RequestWithContext(timeoutCtx, subject, data) + if err != nil { + metrics.RequestError.Add(1) + return nil, fmt.Errorf("RPC 请求失败: %w", err) + } + + if msg == nil { + metrics.RequestError.Add(1) + return nil, fmt.Errorf("RPC 响应为空") + } + + return msg.Data, nil +} + +// Close 关闭 NATS 连接 +func Close() error { + mu.Lock() + defer mu.Unlock() + + // 停止健康检查协程 + if healthCancel != nil { + healthCancel() + } + + // 关闭连接 + if nc != nil && !nc.IsClosed() { + nc.Close() + connected = false + inited = false + g.Log().Info(context.Background(), "NATS 连接已关闭") + } + + return nil +} diff --git a/nats/nats_test.go b/nats/nats_test.go new file mode 100644 index 0000000..e5a16ec --- /dev/null +++ b/nats/nats_test.go @@ -0,0 +1,140 @@ +package nats + +import ( + "context" + "fmt" + "testing" + "time" + + "github.com/nats-io/nats.go/jetstream" +) + +// TestNatsBasicOperations 测试基础操作 +func TestNatsBasicOperations(t *testing.T) { + // 测试连接状态 + if !IsConnected() { + t.Log("NATS 未连接") + } + + // 测试连接状态获取 + state := GetConnState() + t.Logf("当前连接状态: %d", state) +} + +// TestNatsMetrics 测试监控指标 +func TestNatsMetrics(t *testing.T) { + metrics := GetMetrics() + t.Logf("发布计数: %d", metrics.PublishCount.Load()) + t.Logf("发布错误: %d", metrics.PublishError.Load()) + t.Logf("请求计数: %d", metrics.RequestCount.Load()) +} + +// TestNatsConnStateListener 测试连接状态监听 +func TestNatsConnStateListener(t *testing.T) { + listener := func(state ConnState, err error) { + fmt.Printf("连接状态变化: %d, 错误: %v\n", state, err) + } + + RegisterConnStateListener(listener) + defer UnregisterConnStateListener(listener) + + time.Sleep(1 * time.Second) +} + +// TestNatsStreamOperations 测试流操作 +func TestNatsStreamOperations(t *testing.T) { + ctx := context.Background() + + // 创建任务流 + err := CreateTaskStream(ctx, "test_tasks", []string{"test.task.>"}) + if err != nil { + t.Logf("创建任务流失败: %v", err) + } + + // 获取流信息 + info, err := GetStream(ctx, "test_tasks") + if err != nil { + t.Logf("获取流信息失败: %v", err) + } else { + t.Logf("流信息: %s", info.Config.Name) + } + + // 列出所有流 + streams, err := ListStreams(ctx) + if err != nil { + t.Logf("列出流失败: %v", err) + } else { + t.Logf("流列表: %v", streams) + } + + // 删除流 + err = DeleteStream(ctx, "test_tasks") + if err != nil { + t.Logf("删除流失败: %v", err) + } +} + +// TestNatsConsumerOperations 测试消费者操作 +func TestNatsConsumerOperations(t *testing.T) { + ctx := context.Background() + + // 创建测试流 + err := CreateTaskStream(ctx, "test_consumer", []string{"test.consumer.>"}) + if err != nil { + t.Logf("创建流失败: %v", err) + } + + // 创建消费者 + consumerConfig := jetstream.ConsumerConfig{ + Name: "test_consumer", + Durable: "test_consumer", + } + _, err = CreateConsumer(ctx, "test_consumer", "test_consumer", consumerConfig) + if err != nil { + t.Logf("创建消费者失败: %v", err) + } + + // 获取消费者信息 + info, err := GetConsumer(ctx, "test_consumer", "test_consumer") + if err != nil { + t.Logf("获取消费者信息失败: %v", err) + } else { + t.Logf("消费者信息: %s", info.Name) + } + + // 列出消费者 + consumers, err := ListConsumers(ctx, "test_consumer") + if err != nil { + t.Logf("列出消费者失败: %v", err) + } else { + t.Logf("消费者列表: %v", consumers) + } + + // 删除消费者 + err = DeleteConsumer(ctx, "test_consumer", "test_consumer") + if err != nil { + t.Logf("删除消费者失败: %v", err) + } + + // 清理流 + _ = DeleteStream(ctx, "test_consumer") +} + +// TestNatsPublishRequest 测试发布和请求 +func TestNatsPublishRequest(t *testing.T) { + ctx := context.Background() + + // 发布消息 + err := Publish(ctx, "test.publish", []byte("hello")) + if err != nil { + t.Logf("发布消息失败: %v", err) + } + + // RPC 请求 + response, err := Request(ctx, "test.request", []byte("request"), 5*time.Second) + if err != nil { + t.Logf("RPC 请求失败: %v", err) + } else { + t.Logf("RPC 响应: %s", string(response)) + } +}