Files
common/message/nats_msg.go
qhd 55a6ec0374 重构消息队列连接管理,支持多数据源配置
主要变更:
1. 重构NATS、RabbitMQ和Redis连接管理模块,支持多数据源配置
2. 统一连接管理接口,增加数据源名称参数
3. 优化连接状态检查和错误处理
4. 增加连接池管理和资源清理机制
5. 改进日志输出格式和内容
2026-03-12 08:51:45 +08:00

374 lines
10 KiB
Go
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
package message
import (
"context"
"encoding/json"
"fmt"
"github.com/gogf/gf/v2/frame/g"
"github.com/nats-io/nats.go"
"time"
)
type NatsPublishMsgConfig struct {
QueueName string
Durable bool
Data any
}
type NatsPublishDelayMsgConfig struct {
QueueName string
Durable bool
DelayTime int
Data any
}
type NatsSubscribeMsgConfig struct {
QueueName string
ConsumerName string
Durable bool
DelayTime int
AutoAck bool
PrefetchCount int
HandleFunc func(ctx context.Context, message map[string]interface{}) error
}
func (*NatsPublishMsgConfig) GetPublishMsgType() {
}
func (*NatsPublishDelayMsgConfig) GetPublishDelayMsgType() {
}
func (*NatsSubscribeMsgConfig) GetSubscribeMsgType() {
}
type natsMsg struct {
name string // 数据源名称
}
func init() {
// 注册 Nats 插件(默认数据源)
RegisterPlugin(context.Background(), "default", MessageNATS, func() messageUtil {
return &natsMsg{name: "default"}
})
}
// Connect 连接 NATS
func (c *natsMsg) Connect(ctx context.Context) error {
return natsConnect(ctx, c.name)
}
// Ping 检测 NATS 连接状态
func (c *natsMsg) Ping(ctx context.Context) bool {
return natsPing(ctx, c.name)
}
// Close 关闭 NATS 连接
func (c *natsMsg) Close(ctx context.Context) error {
return natsClose(ctx, c.name)
}
// Publish 发布消息
func (c *natsMsg) Publish(ctx context.Context, msgConfig messagePublishConfig) error {
cfg, ok := msgConfig.(*NatsPublishMsgConfig)
if !ok {
return fmt.Errorf("无效的 NATS 配置类型")
}
if g.IsEmpty(cfg.QueueName) {
return fmt.Errorf("必须提供队列名称")
}
if g.IsEmpty(cfg.Data) {
return fmt.Errorf("必须提供数据")
}
return c.createPublish(ctx, cfg.QueueName, cfg.Durable, 0, cfg.Data)
}
// PublishDelay 发布延迟消息
func (c *natsMsg) PublishDelay(ctx context.Context, msgConfig messagePublishDelayConfig) error {
cfg, ok := msgConfig.(*NatsPublishDelayMsgConfig)
if !ok {
return fmt.Errorf("无效的 NATS 配置类型")
}
if g.IsEmpty(cfg.QueueName) {
return fmt.Errorf("必须提供队列名称")
}
if g.IsEmpty(cfg.DelayTime) {
return fmt.Errorf("延迟时间必须大于 0")
}
if g.IsEmpty(cfg.Data) {
return fmt.Errorf("必须提供数据")
}
return c.createPublish(ctx, cfg.QueueName, cfg.Durable, cfg.DelayTime, cfg.Data)
}
// Publish 发布消息
func (c *natsMsg) createPublish(ctx context.Context, subject string, durable bool, delayTime int, data any) error {
delayMsg := delayTime > 0
if err := c.createStream(ctx, subject, durable, delayMsg); err != nil {
return err
}
payload, err := json.Marshal(data)
if err != nil {
return fmt.Errorf("序列化数据失败: %w", err)
}
m := nats.NewMsg(subject)
m.Data = payload // 所有消息都需要设置数据
if delayMsg {
// 使用 @at 指定具体延迟时间,而不是 @every 重复执行
futureTime := time.Now().Add(time.Duration(delayTime) * time.Second).Format(time.RFC3339Nano)
m.Header.Set("Nats-Schedule", fmt.Sprintf("@at %s", futureTime))
m.Subject = subject + ".schedule"
m.Header.Set("Nats-Schedule-Target", subject)
g.Log().Infof(ctx, "📅 NATS 延迟消息配置: DelayTime=%ds, Schedule=@at %s, Header=%s", delayTime, futureTime, m.Header)
}
// 发布消息到 JetStream
js := getNatsJS(c.name)
if js == nil {
g.Log().Errorf(ctx, "❌ NATS [%s] JetStream 不存在", c.name)
return fmt.Errorf("NATS JetStream 不存在")
}
ack, err := js.PublishMsg(m)
if err != nil {
g.Log().Errorf(ctx, "❌ NATS 发布消息失败: err=%v, Subject=%s", err, m.Subject)
return err
}
g.Log().Infof(ctx, "✅ NATS 发布消息成功: Stream=%v, StreamSeq=%d", ack.Stream, ack.Sequence)
return nil
}
// Subscribe 订阅消息
func (c *natsMsg) Subscribe(ctx context.Context, msgConfig messageSubscribeConfig) error {
cfg, ok := msgConfig.(*NatsSubscribeMsgConfig)
if !ok {
return fmt.Errorf("无效的 NATS 配置类型")
}
if g.IsEmpty(cfg.QueueName) {
return fmt.Errorf("必须提供队列名称")
}
if g.IsEmpty(cfg.ConsumerName) {
return fmt.Errorf("必须提供消费者名称")
}
if g.IsEmpty(cfg.HandleFunc) {
return fmt.Errorf("必须提供处理函数")
}
if g.IsEmpty(cfg.PrefetchCount) {
cfg.PrefetchCount = 1
}
return c.createSubscribe(ctx, cfg.QueueName, cfg.ConsumerName, cfg.PrefetchCount, cfg.DelayTime, cfg.AutoAck, cfg.Durable, cfg.HandleFunc)
}
// createSubscribe 内部订阅消息
func (c *natsMsg) createSubscribe(ctx context.Context, subject, consumerName string, prefetchCount, delayTime int, autoAck, durable bool, handler func(ctx context.Context, message map[string]any) error) error {
g.Log().Infof(ctx, "🔔 NATS 开始订阅: QueueName=%s, ConsumerName=%s", subject, consumerName)
// 创建推送订阅的回调函数
msgHandler := func(msg *nats.Msg) {
var data map[string]any
if err := json.Unmarshal(msg.Data, &data); err != nil {
g.Log().Errorf(ctx, "❌ 解析消息失败: %v", err)
return
}
g.Log().Infof(ctx, "📨 收到消息: Subject=%s, Data=%v", msg.Subject, data)
// 处理业务逻辑
if err := handler(ctx, data); err != nil {
g.Log().Errorf(ctx, "❌ 处理消息失败: %v", err)
if !autoAck {
if err := msg.Nak(); err != nil {
g.Log().Errorf(ctx, "❌ Nak 失败: %v", err)
return
}
return
}
} else {
g.Log().Infof(ctx, "✅ 处理消息成功")
}
if err := msg.Ack(); err != nil {
g.Log().Errorf(ctx, "❌ Ack 失败: %v", err)
}
}
delayMsg := delayTime > 0
// 创建流
if err := c.createStream(ctx, subject, durable, delayMsg); err != nil {
return err
}
// 获取 JetStream 上下文
js := getNatsJS(c.name)
if js == nil {
g.Log().Errorf(ctx, "❌ NATS [%s] JetStream 不存在", c.name)
return fmt.Errorf("NATS JetStream 不存在")
}
// 创建推送订阅
var sub *nats.Subscription
var err error
// 配置订阅选项 - 使用 DeliverSubject 创建 Push Consumer
subOpts := []nats.SubOpt{
nats.Durable(consumerName),
nats.MaxAckPending(prefetchCount),
nats.DeliverSubject(consumerName),
}
if !autoAck {
subOpts = append(subOpts, nats.ManualAck())
}
// 使用 Subscribe 创建推送订阅
sub, err = js.Subscribe(subject, msgHandler, subOpts...)
if err != nil {
g.Log().Errorf(ctx, "创建推送订阅失败: %v", err)
return err
}
g.Log().Infof(ctx, "✅ NATS 推送订阅成功: Consumer=%s", consumerName)
// 启动后台 goroutine 监听上下文取消,用于清理订阅
go func() {
<-ctx.Done()
g.Log().Infof(ctx, "订阅上下文取消,取消订阅")
if err := sub.Unsubscribe(); err != nil {
return
}
}()
return nil
}
// createStream 内部创建消费组
func (c *natsMsg) createStream(ctx context.Context, subject string, durable, delayMsg bool) error {
streamName, storage := getStreamInfo(durable, delayMsg)
// 构建流配置
// 如果是延迟消息,需要包含两个 subjects:
// 1. subject.schedule - 用于发送调度消息
// 2. subject - 用于实际投递目标
subjects := []string{subject}
if delayMsg {
subjects = []string{subject, subject + ".schedule"}
}
jsConfig := &StreamConfig{
Name: streamName,
Subjects: subjects,
AllowMsgSchedules: delayMsg, // 延迟消息核心开关
Storage: storage,
Discard: DiscardNew, // 达到上限删除旧消息
}
nc := getNatsConn(c.name)
if !c.Ping(ctx) {
// 使用统一的重连函数
if err := commonConnect(ctx, MessageNATS, c.name, func(ctx context.Context) error {
return c.Connect(ctx)
}, func(ctx context.Context) error {
return c.Close(ctx)
}); err != nil {
g.Log().Errorf(ctx, "❌ [%s][%s] 连接失败: %v", MessageNATS, c.name, err)
return err
}
}
if nc == nil {
g.Log().Errorf(ctx, "❌ NATS [%s] 连接不存在", c.name)
return fmt.Errorf("NATS 连接不存在")
}
err := jsStreamCreate(nc, jsConfig)
if err != nil {
g.Log().Errorf(ctx, "❌ 创建 Stream 失败: err=%v", err)
return err
}
g.Log().Infof(ctx, "✅ 创建 Stream 成功: stream=%s, subjects=%v, allowSchedules=%v", streamName, subjects, delayMsg)
return nil
}
func getStreamInfo(durable, delayMsg bool) (string, StorageType) {
// Stream 不存在,创建新的
streamName := "ordinary_msg_memory"
storage := MemoryStorage
// 延迟消息必须使用 FileStorageNATS 官方要求)
if delayMsg {
if durable {
streamName = "delay_msg_file"
storage = FileStorage
} else {
streamName = "delay_msg_memory"
storage = MemoryStorage
}
} else {
if durable {
streamName = "ordinary_msg_file"
storage = FileStorage
}
}
return streamName, storage
}
const (
// JSApiStreamCreateT is the endpoint to create new streams.
// Will return JSON response.
JSApiStreamCreateT = "$JS.API.STREAM.CREATE.%s"
// JSApiStreamUpdateT is the endpoint to update existing streams.
// Will return JSON response.
JSApiStreamUpdateT = "$JS.API.STREAM.UPDATE.%s"
)
// jsStreamCreate is for sending a stream create for fields that nats.go does not know about yet.
func jsStreamCreate(nc *nats.Conn, cfg *StreamConfig) error {
j, err := json.Marshal(cfg)
if err != nil {
return err
}
msg, err := nc.Request(fmt.Sprintf(JSApiStreamCreateT, cfg.Name), j, time.Second*3)
if err != nil {
return err
}
// 检查 API 响应中的错误
var resp struct {
Error *struct {
Code int `json:"code"`
ErrCode int `json:"err_code"`
Description string `json:"description"`
} `json:"error,omitempty"`
}
if err := json.Unmarshal(msg.Data, &resp); err != nil {
return err
}
if resp.Error != nil {
// 如果 Stream 已存在,尝试更新
if resp.Error.ErrCode == 10058 { // JSStreamNameExistErr
return jsStreamUpdate(nc, cfg)
}
return fmt.Errorf("JS API error: %s", resp.Error.Description)
}
return nil
}
// jsStreamUpdate is for sending a stream create for fields that nats.go does not know about yet.
func jsStreamUpdate(nc *nats.Conn, cfg *StreamConfig) error {
j, err := json.Marshal(cfg)
if err != nil {
return err
}
msg, err := nc.Request(fmt.Sprintf(JSApiStreamUpdateT, cfg.Name), j, time.Second*3)
if err != nil {
return err
}
// 检查 API 响应中的错误
var resp struct {
Error *struct {
Code int `json:"code"`
ErrCode int `json:"err_code"`
Description string `json:"description"`
} `json:"error,omitempty"`
}
if err := json.Unmarshal(msg.Data, &resp); err != nil {
return err
}
if resp.Error != nil {
return fmt.Errorf("JS API error: %s", resp.Error.Description)
}
return nil
}