package message import ( "context" "encoding/json" "fmt" "github.com/gogf/gf/v2/frame/g" "github.com/nats-io/nats.go" "time" ) type NatsPublishMsgConfig struct { QueueName string Durable bool Data any } type NatsPublishDelayMsgConfig struct { QueueName string Durable bool DelayTime int Data any } type NatsSubscribeMsgConfig struct { QueueName string ConsumerName string Durable bool DelayTime int AutoAck bool PrefetchCount int HandleFunc func(ctx context.Context, message map[string]interface{}) error } func (*NatsPublishMsgConfig) GetPublishMsgType() { } func (*NatsPublishDelayMsgConfig) GetPublishDelayMsgType() { } func (*NatsSubscribeMsgConfig) GetSubscribeMsgType() { } type natsMsg struct { name string // 数据源名称 } func init() { // 注册 Nats 插件(默认数据源) RegisterPlugin(context.Background(), "default", MessageNATS, func() messageUtil { return &natsMsg{name: "default"} }) } // Connect 连接 NATS func (c *natsMsg) Connect(ctx context.Context) error { return natsConnect(ctx, c.name) } // Ping 检测 NATS 连接状态 func (c *natsMsg) Ping(ctx context.Context) bool { return natsPing(ctx, c.name) } // Close 关闭 NATS 连接 func (c *natsMsg) Close(ctx context.Context) error { return natsClose(ctx, c.name) } // Publish 发布消息 func (c *natsMsg) Publish(ctx context.Context, msgConfig messagePublishConfig) error { cfg, ok := msgConfig.(*NatsPublishMsgConfig) if !ok { return fmt.Errorf("无效的 NATS 配置类型") } if g.IsEmpty(cfg.QueueName) { return fmt.Errorf("必须提供队列名称") } if g.IsEmpty(cfg.Data) { return fmt.Errorf("必须提供数据") } return c.createPublish(ctx, cfg.QueueName, cfg.Durable, 0, cfg.Data) } // PublishDelay 发布延迟消息 func (c *natsMsg) PublishDelay(ctx context.Context, msgConfig messagePublishDelayConfig) error { cfg, ok := msgConfig.(*NatsPublishDelayMsgConfig) if !ok { return fmt.Errorf("无效的 NATS 配置类型") } if g.IsEmpty(cfg.QueueName) { return fmt.Errorf("必须提供队列名称") } if g.IsEmpty(cfg.DelayTime) { return fmt.Errorf("延迟时间必须大于 0") } if g.IsEmpty(cfg.Data) { return fmt.Errorf("必须提供数据") } return c.createPublish(ctx, cfg.QueueName, cfg.Durable, cfg.DelayTime, cfg.Data) } // Publish 发布消息 func (c *natsMsg) createPublish(ctx context.Context, subject string, durable bool, delayTime int, data any) error { delayMsg := delayTime > 0 if err := c.createStream(ctx, subject, durable, delayMsg); err != nil { return err } payload, err := json.Marshal(data) if err != nil { return fmt.Errorf("序列化数据失败: %w", err) } m := nats.NewMsg(subject) m.Data = payload // 所有消息都需要设置数据 if delayMsg { // 使用 @at 指定具体延迟时间,而不是 @every 重复执行 futureTime := time.Now().Add(time.Duration(delayTime) * time.Second).Format(time.RFC3339Nano) m.Header.Set("Nats-Schedule", fmt.Sprintf("@at %s", futureTime)) m.Subject = subject + ".schedule" m.Header.Set("Nats-Schedule-Target", subject) g.Log().Infof(ctx, "📅 NATS 延迟消息配置: DelayTime=%ds, Schedule=@at %s, Header=%s", delayTime, futureTime, m.Header) } // 发布消息到 JetStream js := getNatsJS(c.name) if js == nil { g.Log().Errorf(ctx, "❌ NATS [%s] JetStream 不存在", c.name) return fmt.Errorf("NATS JetStream 不存在") } ack, err := js.PublishMsg(m) if err != nil { g.Log().Errorf(ctx, "❌ NATS 发布消息失败: err=%v, Subject=%s", err, m.Subject) return err } g.Log().Infof(ctx, "✅ NATS 发布消息成功: Stream=%v, StreamSeq=%d", ack.Stream, ack.Sequence) return nil } // Subscribe 订阅消息 func (c *natsMsg) Subscribe(ctx context.Context, msgConfig messageSubscribeConfig) error { cfg, ok := msgConfig.(*NatsSubscribeMsgConfig) if !ok { return fmt.Errorf("无效的 NATS 配置类型") } if g.IsEmpty(cfg.QueueName) { return fmt.Errorf("必须提供队列名称") } if g.IsEmpty(cfg.ConsumerName) { return fmt.Errorf("必须提供消费者名称") } if g.IsEmpty(cfg.HandleFunc) { return fmt.Errorf("必须提供处理函数") } if g.IsEmpty(cfg.PrefetchCount) { cfg.PrefetchCount = 1 } return c.createSubscribe(ctx, cfg.QueueName, cfg.ConsumerName, cfg.PrefetchCount, cfg.DelayTime, cfg.AutoAck, cfg.Durable, cfg.HandleFunc) } // createSubscribe 内部订阅消息 func (c *natsMsg) createSubscribe(ctx context.Context, subject, consumerName string, prefetchCount, delayTime int, autoAck, durable bool, handler func(ctx context.Context, message map[string]any) error) error { g.Log().Infof(ctx, "🔔 NATS 开始订阅: QueueName=%s, ConsumerName=%s", subject, consumerName) // 创建推送订阅的回调函数 msgHandler := func(msg *nats.Msg) { var data map[string]any if err := json.Unmarshal(msg.Data, &data); err != nil { g.Log().Errorf(ctx, "❌ 解析消息失败: %v", err) return } g.Log().Infof(ctx, "📨 收到消息: Subject=%s, Data=%v", msg.Subject, data) // 处理业务逻辑 if err := handler(ctx, data); err != nil { g.Log().Errorf(ctx, "❌ 处理消息失败: %v", err) if !autoAck { if err := msg.Nak(); err != nil { g.Log().Errorf(ctx, "❌ Nak 失败: %v", err) return } return } } else { g.Log().Infof(ctx, "✅ 处理消息成功") } if err := msg.Ack(); err != nil { g.Log().Errorf(ctx, "❌ Ack 失败: %v", err) } } delayMsg := delayTime > 0 // 创建流 if err := c.createStream(ctx, subject, durable, delayMsg); err != nil { return err } // 获取 JetStream 上下文 js := getNatsJS(c.name) if js == nil { g.Log().Errorf(ctx, "❌ NATS [%s] JetStream 不存在", c.name) return fmt.Errorf("NATS JetStream 不存在") } // 创建推送订阅 var sub *nats.Subscription var err error // 配置订阅选项 - 使用 DeliverSubject 创建 Push Consumer subOpts := []nats.SubOpt{ nats.Durable(consumerName), nats.MaxAckPending(prefetchCount), nats.DeliverSubject(consumerName), } if !autoAck { subOpts = append(subOpts, nats.ManualAck()) } // 使用 Subscribe 创建推送订阅 sub, err = js.Subscribe(subject, msgHandler, subOpts...) if err != nil { g.Log().Errorf(ctx, "创建推送订阅失败: %v", err) return err } g.Log().Infof(ctx, "✅ NATS 推送订阅成功: Consumer=%s", consumerName) // 启动后台 goroutine 监听上下文取消,用于清理订阅 go func() { <-ctx.Done() g.Log().Infof(ctx, "订阅上下文取消,取消订阅") if err := sub.Unsubscribe(); err != nil { return } }() return nil } // createStream 内部创建消费组 func (c *natsMsg) createStream(ctx context.Context, subject string, durable, delayMsg bool) error { streamName, storage := getStreamInfo(durable, delayMsg) // 构建流配置 // 如果是延迟消息,需要包含两个 subjects: // 1. subject.schedule - 用于发送调度消息 // 2. subject - 用于实际投递目标 subjects := []string{subject} if delayMsg { subjects = []string{subject, subject + ".schedule"} } jsConfig := &StreamConfig{ Name: streamName, Subjects: subjects, AllowMsgSchedules: delayMsg, // 延迟消息核心开关 Storage: storage, Discard: DiscardNew, // 达到上限删除旧消息 } nc := getNatsConn(c.name) if !c.Ping(ctx) { // 使用统一的重连函数 if err := commonConnect(ctx, MessageNATS, c.name, func(ctx context.Context) error { return c.Connect(ctx) }, func(ctx context.Context) error { return c.Close(ctx) }); err != nil { g.Log().Errorf(ctx, "❌ [%s][%s] 连接失败: %v", MessageNATS, c.name, err) return err } } if nc == nil { g.Log().Errorf(ctx, "❌ NATS [%s] 连接不存在", c.name) return fmt.Errorf("NATS 连接不存在") } err := jsStreamCreate(nc, jsConfig) if err != nil { g.Log().Errorf(ctx, "❌ 创建 Stream 失败: err=%v", err) return err } g.Log().Infof(ctx, "✅ 创建 Stream 成功: stream=%s, subjects=%v, allowSchedules=%v", streamName, subjects, delayMsg) return nil } func getStreamInfo(durable, delayMsg bool) (string, StorageType) { // Stream 不存在,创建新的 streamName := "ordinary_msg_memory" storage := MemoryStorage // 延迟消息必须使用 FileStorage(NATS 官方要求) if delayMsg { if durable { streamName = "delay_msg_file" storage = FileStorage } else { streamName = "delay_msg_memory" storage = MemoryStorage } } else { if durable { streamName = "ordinary_msg_file" storage = FileStorage } } return streamName, storage } const ( // JSApiStreamCreateT is the endpoint to create new streams. // Will return JSON response. JSApiStreamCreateT = "$JS.API.STREAM.CREATE.%s" // JSApiStreamUpdateT is the endpoint to update existing streams. // Will return JSON response. JSApiStreamUpdateT = "$JS.API.STREAM.UPDATE.%s" ) // jsStreamCreate is for sending a stream create for fields that nats.go does not know about yet. func jsStreamCreate(nc *nats.Conn, cfg *StreamConfig) error { j, err := json.Marshal(cfg) if err != nil { return err } msg, err := nc.Request(fmt.Sprintf(JSApiStreamCreateT, cfg.Name), j, time.Second*3) if err != nil { return err } // 检查 API 响应中的错误 var resp struct { Error *struct { Code int `json:"code"` ErrCode int `json:"err_code"` Description string `json:"description"` } `json:"error,omitempty"` } if err := json.Unmarshal(msg.Data, &resp); err != nil { return err } if resp.Error != nil { // 如果 Stream 已存在,尝试更新 if resp.Error.ErrCode == 10058 { // JSStreamNameExistErr return jsStreamUpdate(nc, cfg) } return fmt.Errorf("JS API error: %s", resp.Error.Description) } return nil } // jsStreamUpdate is for sending a stream create for fields that nats.go does not know about yet. func jsStreamUpdate(nc *nats.Conn, cfg *StreamConfig) error { j, err := json.Marshal(cfg) if err != nil { return err } msg, err := nc.Request(fmt.Sprintf(JSApiStreamUpdateT, cfg.Name), j, time.Second*3) if err != nil { return err } // 检查 API 响应中的错误 var resp struct { Error *struct { Code int `json:"code"` ErrCode int `json:"err_code"` Description string `json:"description"` } `json:"error,omitempty"` } if err := json.Unmarshal(msg.Data, &resp); err != nil { return err } if resp.Error != nil { return fmt.Errorf("JS API error: %s", resp.Error.Description) } return nil }