From ee6d3c9033962f90e0f246e4b66ea03bf06d74b0 Mon Sep 17 00:00:00 2001 From: qhd <1766646056@qq.com> Date: Sat, 31 Jan 2026 05:17:14 +0800 Subject: [PATCH] =?UTF-8?q?=E9=87=8D=E6=9E=84=E6=B6=88=E6=81=AF=E9=98=9F?= =?UTF-8?q?=E5=88=97=E6=A8=A1=E5=9D=97=EF=BC=8C=E6=96=B0=E5=A2=9ENATS?= =?UTF-8?q?=E5=92=8CRabbitMQ=E8=BF=9E=E6=8E=A5=E5=AE=9E=E7=8E=B0=EF=BC=8C?= =?UTF-8?q?=E7=A7=BB=E9=99=A4=E6=97=A7=E7=89=88=E6=B6=88=E6=81=AF=E9=98=9F?= =?UTF-8?q?=E5=88=97=E4=BB=A3=E7=A0=81?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- message/connection_nats.go | 144 +++++++++ message/connection_rabbitmq.go | 109 +++++++ message/message.go | 174 ----------- message/msg_interfaces.go | 26 ++ message/msg_plugin_manager.go | 133 +++++++++ message/msg_queue.go | 152 ---------- message/nats_client.go | 313 -------------------- message/nats_msg.go | 351 ++++++++++++++-------- message/nats_rpc.go | 74 ++--- message/rabbit.go | 351 ---------------------- message/rabbitmq_client.go | 210 -------------- message/rabbitmq_msg.go | 283 +++++++++--------- message/redis.go | 275 ------------------ message/redis_client.go | 468 ------------------------------ message/redis_msg.go | 280 +++++++++--------- middleware/module_tenant_check.go | 8 +- redis/redis.go | 6 +- 17 files changed, 966 insertions(+), 2391 deletions(-) create mode 100644 message/connection_nats.go create mode 100644 message/connection_rabbitmq.go delete mode 100644 message/message.go create mode 100644 message/msg_interfaces.go create mode 100644 message/msg_plugin_manager.go delete mode 100644 message/msg_queue.go delete mode 100644 message/nats_client.go delete mode 100644 message/rabbit.go delete mode 100644 message/rabbitmq_client.go delete mode 100644 message/redis.go delete mode 100644 message/redis_client.go diff --git a/message/connection_nats.go b/message/connection_nats.go new file mode 100644 index 0000000..e00f3f6 --- /dev/null +++ b/message/connection_nats.go @@ -0,0 +1,144 @@ +package message + +import ( + "context" + "fmt" + "sync" + "time" + + "github.com/gogf/gf/v2/frame/g" + "github.com/nats-io/nats.go" + "github.com/nats-io/nats.go/jetstream" +) + +var ( + nc *nats.Conn + js jetstream.JetStream + natsMu sync.RWMutex +) + +// natsConnect 建立 NATS 连接 +func natsConnect(ctx context.Context) error { + natsMu.Lock() + defer natsMu.Unlock() + + // 安全地关闭旧连接 + if oldConn := nc; oldConn != nil && !oldConn.IsClosed() { + oldConn.Close() + } + + // 从配置文件读取 NATS 地址 + natsURL := g.Cfg().MustGet(ctx, "nats.url").String() + if natsURL == "" { + // 默认使用本地地址 + natsURL = nats.DefaultURL + } + + // 使用独立的日志上下文,避免使用外部可能被取消的上下文 + logCtx := context.Background() + + // 连接选项配置 + opts := []nats.Option{ + nats.Name("goframe-nats-client"), + nats.ReconnectWait(2 * time.Second), + nats.MaxReconnects(-1), // 无限重连 + nats.PingInterval(10 * time.Second), + nats.MaxPingsOutstanding(5), + nats.ReconnectHandler(func(nc *nats.Conn) { + g.Log().Infof(logCtx, "✅ NATS 重连成功: %s", nc.ConnectedUrl()) + + natsMu.Lock() + defer natsMu.Unlock() + // 重新创建 JetStream 实例 + if newJS, err := jetstream.New(nc); err == nil { + js = newJS + } + }), + nats.DisconnectErrHandler(func(nc *nats.Conn, err error) { + g.Log().Warningf(logCtx, "⚠️ NATS 连接断开: %v, 准备重连...", err) + }), + nats.ClosedHandler(func(nc *nats.Conn) { + g.Log().Infof(logCtx, "NATS 连接已关闭: %s", nc.ConnectedUrl()) + }), + nats.ErrorHandler(func(nc *nats.Conn, sub *nats.Subscription, err error) { + g.Log().Errorf(logCtx, "NATS 错误: %v", err) + }), + } + + var err error + nc, err = nats.Connect(natsURL, opts...) + if err != nil { + return fmt.Errorf("NATS 连接失败: %w", err) + } + + // 等待连接就绪 + if nc.Status() != nats.CONNECTED { + select { + case <-time.After(5 * time.Second): + // 连接超时,清理资源 + if nc != nil { + nc.Close() + } + return fmt.Errorf("NATS 连接超时") + case <-nc.StatusChanged(nats.CONNECTED): + // 连接成功 + case <-ctx.Done(): + // 外部上下文被取消,清理资源 + if nc != nil { + nc.Close() + } + return fmt.Errorf("NATS 连接被取消: %w", ctx.Err()) + } + } + + // 创建 JetStream 实例 + js, err = jetstream.New(nc) + if err != nil { + // 创建 JetStream 失败,清理连接 + if nc != nil { + nc.Close() + } + return fmt.Errorf("创建 JetStream 失败: %w", err) + } + + g.Log().Infof(ctx, "✅ NATS 连接成功: %s", nc.ConnectedUrl()) + return nil +} + +// natsPing 检测 NATS 连接状态 +func natsPing() bool { + natsMu.RLock() + defer natsMu.RUnlock() + + if nc == nil || nc.IsClosed() { + return false + } + + // 使用 NATS 的状态检查 + if nc.Status() != nats.CONNECTED { + return false + } + + return true +} + +// natsReconnect 重连 NATS +func natsReconnect(ctx context.Context) error { + if err := natsConnect(ctx); err != nil { + return fmt.Errorf("nats重连失败: %w", err) + } + return nil +} + +// natsClose 关闭 NATS 连接 +func natsClose(ctx context.Context) error { + natsMu.Lock() + defer natsMu.Unlock() + + if nc == nil || nc.IsClosed() { + return nil // 连接已经关闭或不存在 + } + nc.Close() + g.Log().Infof(ctx, "✅ NATS 连接已关闭") + return nil +} diff --git a/message/connection_rabbitmq.go b/message/connection_rabbitmq.go new file mode 100644 index 0000000..c98d2fe --- /dev/null +++ b/message/connection_rabbitmq.go @@ -0,0 +1,109 @@ +package message + +import ( + "context" + "fmt" + "github.com/gogf/gf/v2/frame/g" + "github.com/gogf/gf/v2/util/gconv" + amqp "github.com/rabbitmq/amqp091-go" + "sync" + "time" +) + +var ( + conn *amqp.Connection + channel *amqp.Channel + rabbitmqMu sync.RWMutex +) + +// config RabbitMQ 配置 +type config struct { + Host string + Port int + Username string + Password string + VHost string +} + +func rabbitmqConnect(ctx context.Context) error { + rabbitmqMu.Lock() + defer rabbitmqMu.Unlock() + +LOOP: + cfg := &config{ + Host: g.Cfg().MustGet(ctx, "rabbitmq.host").String(), + Port: g.Cfg().MustGet(ctx, "rabbitmq.port").Int(), + Username: g.Cfg().MustGet(ctx, "rabbitmq.username").String(), + Password: g.Cfg().MustGet(ctx, "rabbitmq.password").String(), + VHost: g.Cfg().MustGet(ctx, "rabbitmq.vhost", "/").String(), + } + + url := "amqp://" + cfg.Username + ":" + cfg.Password + "@" + cfg.Host + ":" + gconv.String(cfg.Port) + "/" + cfg.VHost + + var err error + conn, err = amqp.Dial(url) + if err != nil { + g.Log().Errorf(ctx, "重连失败: %v", err) + + time.Sleep(2 * time.Second) + goto LOOP + } + + channel, err = conn.Channel() + if err != nil { + g.Log().Errorf(ctx, "创建 Channel 失败: %v", err) + + time.Sleep(2 * time.Second) + goto LOOP + } + + g.Log().Info(ctx, "RabbitMQ 重连成功") + return nil +} + +// rabbitmqReconnect 重新连接 +func rabbitmqReconnect(ctx context.Context) error { + if err := rabbitmqConnect(ctx); err != nil { + return fmt.Errorf("nats重连失败: %w", err) + } + return nil +} + +// rabbitmqPing 检测 RabbitMQ 连接状态 +func rabbitmqPing() bool { + rabbitmqMu.RLock() + defer rabbitmqMu.RUnlock() + + if conn == nil || conn.IsClosed() { + return false + } + + return true +} + +// rabbitmqClose 关闭连接 +func rabbitmqClose(ctx context.Context) error { + rabbitmqMu.Lock() + defer rabbitmqMu.Unlock() + + var lastErr error + + if channel != nil { + if err := channel.Close(); err != nil { + g.Log().Errorf(ctx, "关闭 RabbitMQ Channel 失败: %v", err) + lastErr = err + } + channel = nil + } + + if conn != nil { + if err := conn.Close(); err != nil { + g.Log().Errorf(ctx, "关闭 RabbitMQ 连接失败: %v", err) + lastErr = err + } + conn = nil + } + + g.Log().Info(ctx, "RabbitMQ 连接已关闭") + return lastErr +} diff --git a/message/message.go b/message/message.go deleted file mode 100644 index 321fc5d..0000000 --- a/message/message.go +++ /dev/null @@ -1,174 +0,0 @@ -package message - -import ( - "context" - "github.com/gogf/gf/v2/database/gredis" - - "github.com/gogf/gf/v2/errors/gerror" -) - -func GetRedisClientTest(name string) *gredis.Redis { - return getRedisClientTest(name) -} - -// GetLock 获取分布式锁 -func GetLock(ctx context.Context, key string, expireSeconds int64, fn func(ctx context.Context) error) (success bool, err error) { - return lock(ctx, key, expireSeconds, fn) -} - -// MessageConfig 消息配置接口 -type MessageConfig interface { - start(ctx context.Context) error - publish(ctx context.Context, data interface{}, options ...map[string]interface{}) (messageID string, err error) -} - -// RedisMessageConfig Redis Stream 消息配置 -type RedisMessageConfig struct { - StreamKey string // Stream 键名 - GroupName string // 消费者组名称 - ConsumerName string // 消费者名称 - BatchSize int64 // 最大并发数(信号量容量) - AutoAck bool // ACK确认,true自动确认,false手动确认 - HandleFunc func(ctx context.Context, message map[string]interface{}) error -} - -func (r *RedisMessageConfig) start(ctx context.Context) error { - return readFromStream(ctx, QueueMessage{ - StreamKey: r.StreamKey, - GroupName: r.GroupName, - ConsumerName: r.ConsumerName, - BatchSize: r.BatchSize, - AutoAck: r.AutoAck, - HandleFunc: r.HandleFunc, - }) -} - -func (r *RedisMessageConfig) publish(ctx context.Context, data interface{}, options ...map[string]interface{}) (messageID string, err error) { - return publishToRedis(ctx, r.StreamKey, data) -} - -// RabbitMQMessageConfig RabbitMQ 消息配置 -type RabbitMQMessageConfig struct { - Queue string // 队列名称 - Exchange string // 交换器名称 - RoutingKey string // 路由键 - PrefetchCount int // QoS: 预取数量(并发控制) - WorkerCount int // worker 数量 - ConsumerTag string // 消费者标签 - HandleFunc func(ctx context.Context, message map[string]interface{}) error -} - -func (r *RabbitMQMessageConfig) start(ctx context.Context) error { - return startRabbitMQConsumer(ctx, QueueMessage{ - Queue: r.Queue, - Exchange: r.Exchange, - RoutingKey: r.RoutingKey, - PrefetchCount: r.PrefetchCount, - WorkerCount: r.WorkerCount, - ConsumerTag: r.ConsumerTag, - AutoAck: true, - HandleFunc: r.HandleFunc, - }) -} - -func (r *RabbitMQMessageConfig) publish(ctx context.Context, data interface{}, options ...map[string]interface{}) (messageID string, err error) { - opts := make(map[string]interface{}) - if len(options) > 0 { - opts = options[0] - } - exchange := r.Exchange - routingKey := r.RoutingKey - delay := 0 - - if v, ok := opts["exchange"].(string); ok { - exchange = v - } - if v, ok := opts["routingKey"].(string); ok { - routingKey = v - } - if v, ok := opts["delay"].(int); ok { - delay = v - } - - if delay > 0 { - return publishDelayedToRabbitMQ(ctx, exchange, routingKey, data, delay) - } - return publishToRabbitMQ(ctx, exchange, routingKey, data) -} - -// QueueMessage 统一消息队列配置结构体(内部使用) -type QueueMessage struct { - // Redis Stream 配置 - StreamKey string - GroupName string - ConsumerName string - BatchSize int64 - AutoAck bool - HandleFunc func(ctx context.Context, message map[string]interface{}) error - - // RabbitMQ 配置 - Queue string - Exchange string - RoutingKey string - PrefetchCount int - WorkerCount int - ConsumerTag string -} - -// StartConsumers 启动消息消费者(统一入口) -// 支持同时启动多个消费者,包括 Redis Stream 和 RabbitMQ -func StartConsumers(ctx context.Context, configs ...MessageConfig) error { - for _, cfg := range configs { - if err := cfg.start(ctx); err != nil { - return gerror.Wrap(err, "启动消费者失败") - } - } - return nil -} - -// PublishMessage 发布消息(统一入口) -// 根据配置类型选择发布到 Redis Stream 或 RabbitMQ -func PublishMessage(ctx context.Context, cfg MessageConfig, data interface{}, options ...map[string]interface{}) (messageID string, err error) { - return cfg.publish(ctx, data, options...) -} - -// ========== Redis Stream 公共方法(方便迁移) ========== - -// AddToStream 将消息添加到 Redis Stream -//func AddToStream(ctx context.Context, streamKey string, msg interface{}) (messageID string, err error) { -// return addToStream(ctx, streamKey, msg) -//} - -// ReadFromStream 从 Redis Stream 读取消息(已废弃) -// 请使用 RedisMessageConfig.StartConsumers 启动消费者 -// 此方法保留用于向后兼容,但实际不会返回消息(异步消费模式) -func ReadFromStream(ctx context.Context, streamKey, groupName, consumerName string, count, blockMs int64) ([]StreamMessage, error) { - return nil, gerror.New("ReadFromStream 已废弃,请使用 RedisMessageConfig.StartConsumers 启动消费者") -} - -// AckMessage 确认 Redis Stream 消息 -func AckMessage(ctx context.Context, streamKey, groupName string, messageIDs ...string) error { - return ackMessage(ctx, streamKey, groupName, messageIDs...) -} - -// InitStreamGroup 初始化 Redis Stream 消费者组 -func InitStreamGroup(ctx context.Context, streamKey, groupName string) error { - return initStreamGroup(ctx, streamKey, groupName) -} - -// ========== RabbitMQ 公共方法(方便迁移) ========== - -// InitRabbitMQ 初始化 RabbitMQ 连接 -func InitRabbitMQ(ctx context.Context) error { - return initRabbitMQ(ctx) -} - -// PublishToRabbitMQ 发布消息到 RabbitMQ -//func PublishToRabbitMQ(ctx context.Context, exchange, routingKey string, message interface{}) error { -// return publishToRabbitMQ(ctx, exchange, routingKey, message) -//} - -// PublishDelayedToRabbitMQ 发布延时消息到 RabbitMQ -//func PublishDelayedToRabbitMQ(ctx context.Context, exchange, routingKey string, message interface{}, delaySeconds int) error { -// return publishDelayedToRabbitMQ(ctx, exchange, routingKey, message, delaySeconds) -//} diff --git a/message/msg_interfaces.go b/message/msg_interfaces.go new file mode 100644 index 0000000..125cf81 --- /dev/null +++ b/message/msg_interfaces.go @@ -0,0 +1,26 @@ +package message + +import "context" + +type messagePublishConfig interface { + GetPublishMsgType() +} + +type messageSubscribeConfig interface { + GetSubscribeMsgType() +} + +// messageUtil 消息队列公共配置接口 +// 只暴露核心的发布/订阅功能,配置访问器方法不需要在公共接口中 +type messageUtil interface { + // Publish 发布消息 + Publish(ctx context.Context, msg messagePublishConfig) error + // Subscribe 订阅消息 + Subscribe(ctx context.Context, msg messageSubscribeConfig) error + // Ping 检测连接状态 + ping(ctx context.Context) bool + // Reconnect 重连 + reconnect(ctx context.Context) error + // Close 关闭连接 + close(ctx context.Context) error +} diff --git a/message/msg_plugin_manager.go b/message/msg_plugin_manager.go new file mode 100644 index 0000000..84c79eb --- /dev/null +++ b/message/msg_plugin_manager.go @@ -0,0 +1,133 @@ +package message + +import ( + "context" + "fmt" + "sync" + "time" + + "github.com/gogf/gf/v2/frame/g" +) + +// MessageType 消息队列类型 +type messageType string + +const ( + // MessageRedis Redis 消息队列 + MessageRedis messageType = "redis" + // MessageRabbitMQ RabbitMQ 消息队列 + MessageRabbitMQ messageType = "rabbitmq" + // MessageNATS NATS 消息队列 + MessageNATS messageType = "nats" +) + +// configFactory 消息队列配置工厂函数类型 +type configFactory func() messageUtil + +// PluginManager 消息队列插件管理器 +type pluginManager struct { + mu sync.RWMutex + instances map[messageType]messageUtil // 已连接的插件实例 +} + +var ( + defaultPluginManager = newPluginManager() + // 不再支持默认插件类型,必须显式指定类型 +) + +// newPluginManager 创建插件管理器 +func newPluginManager() *pluginManager { + return &pluginManager{ + instances: make(map[messageType]messageUtil), + } +} + +// RegisterPlugin 注册消息队列插件 +// 所有插件必须通过此方法注册,自动进行连接检测 +// 只有连接成功的插件才会被注册,连接失败的插件不会被注册 +// 异步无限重连,只有连接成功了才注册 +func registerPlugin(msgType messageType, factory configFactory) error { + if factory == nil { + return fmt.Errorf("factory cannot be nil") + } + + // 创建实例 + instance := factory() + ctx := context.Background() + + // 开启异步连接,无限重连直到成功 + go func() { + retryInterval := 2 * time.Second + maxInterval := 30 * time.Second + + for { + select { + case <-ctx.Done(): + g.Log().Errorf(ctx, "❌ [%s] 注册被取消", msgType) + return + default: + // 尝试连接(使用Reconnect方法) + if err := instance.reconnect(ctx); err == nil { + // 连接成功,注册插件 + if err := defaultPluginManager.register(msgType, instance); err != nil { + g.Log().Errorf(ctx, "❌ [%s] 注册插件失败: %v", msgType, err) + instance.close(ctx) + } else { + g.Log().Infof(ctx, "✅ [%s] 插件注册成功", msgType) + } + return + } + + // 连接失败,记录日志并等待重试 + g.Log().Warningf(ctx, "⚠️ [%s] 连接失败,%v 后重试...", msgType, retryInterval) + + select { + case <-time.After(retryInterval): + // 增加重试间隔,但不超过最大值 + retryInterval *= 2 + if retryInterval > maxInterval { + retryInterval = maxInterval + } + case <-ctx.Done(): + g.Log().Errorf(ctx, "❌ [%s] 注册被取消", msgType) + return + } + } + } + }() + + return nil +} + +// register 注册插件(内部方法) +func (m *pluginManager) register(msgType messageType, instance messageUtil) error { + m.mu.Lock() + defer m.mu.Unlock() + m.instances[msgType] = instance + return nil +} + +// GetMsgPlugin 获取消息队列插件 +func GetMsgPlugin(msgType messageType) (messageUtil, error) { + defaultPluginManager.mu.RLock() + instance, ok := defaultPluginManager.instances[msgType] + defaultPluginManager.mu.RUnlock() + + if !ok { + return nil, fmt.Errorf("unsupported message type: %s", msgType) + } + + return instance, nil +} + +// GetSupportedTypes 获取所有已注册的插件类型 +func GetSupportedTypes() []messageType { + defaultPluginManager.mu.RLock() + defer defaultPluginManager.mu.RUnlock() + + types := make([]messageType, 0, len(defaultPluginManager.instances)) + for t := range defaultPluginManager.instances { + types = append(types, t) + } + return types +} diff --git a/message/msg_queue.go b/message/msg_queue.go deleted file mode 100644 index be1f2da..0000000 --- a/message/msg_queue.go +++ /dev/null @@ -1,152 +0,0 @@ -package message - -import ( - "context" - "fmt" -) - -type RedisConfig struct { - // Stream 名称 - Stream string - - // 消费者组名称 - Group string - - // 消费者名称 - Consumer string - - // 每次消费数量 - Count int64 - - // 是否自动 ACK - AutoAck bool - - // 处理函数 - HandleFunc func(ctx context.Context, message map[string]interface{}) error -} - -// RabbitMQConfig RabbitMQ 队列配置 -type RabbitMQConfig struct { - Mode string - Exchange string - Topic string - DelayMessage bool - - // 队列名称(必需) - Name string - - // 实际队列名(用于绑定) - Queue string - - // 是否持久化 - Durable bool - - // QoS 预取数量(每次推送的消息数量,默认10) - PrefetchCount int - - // 最大重试次数(默认3) - MaxRetry int - - // 是否自动 ACK - AutoAck bool - - // 处理函数 - HandleFunc func(ctx context.Context, message map[string]interface{}) error -} - -// NATSConfig NATS 队列配置 -type NATSConfig struct { - DelayMessage bool - // Stream 名称 - Stream string - - // 消费者名称 - Consumer string - - // 是否持久化 - Durable bool - - // 副本数 - Replicas int - // QoS 预取数量(每次推送的消息数量,默认10) - PrefetchCount int - - // 是否自动 ACK - AutoAck bool - - // 处理函数 - HandleFunc func(ctx context.Context, message map[string]interface{}) error -} - -// messageBroker 消息代理接口 -type messageBroker interface { - // StreamGroup 创建消费组(支持单个配置或批量配置) - streamGroup(ctx context.Context, configs ...interface{}) error - - // Publish 发布消息(支持单个配置或批量配置) - publish(ctx context.Context, config interface{}, data interface{}) error - - // PublishDelayed 发布延迟消息(支持单个配置或批量配置) - publishDelayed(ctx context.Context, config interface{}, data interface{}, delay int) error - - // Subscribe 订阅消息(支持单个配置或批量配置) - subscribe(ctx context.Context, configs ...interface{}) error -} - -type messageClientType string - -const ( - ClientTypeRedis messageClientType = "redis" - ClientTypeRabbitMQ messageClientType = "rabbitmq" - ClientTypeNATS messageClientType = "nats" -) - -// newMessageBroker 创建消息代理实例 -func newMessageBroker(ctx context.Context, clientType messageClientType) (messageBroker, error) { - switch clientType { - case ClientTypeRedis: - return &redisMessageClient{clientType: clientType}, nil - case ClientTypeRabbitMQ: - return &rabbitMQMessageClient{clientType: clientType}, nil - case ClientTypeNATS: - return &natsMessageClient{clientType: clientType}, nil - default: - return nil, fmt.Errorf("unknown client type: %s", clientType) - } -} - -// StreamGroup 直接创建消费组 -func StreamGroup(ctx context.Context, clientType messageClientType, configs ...interface{}) error { - broker, err := newMessageBroker(ctx, clientType) - if err != nil { - return err - } - return broker.streamGroup(ctx, configs...) -} - -// Publish 直接发布消息 -func Publish(ctx context.Context, clientType messageClientType, config interface{}, data interface{}) error { - broker, err := newMessageBroker(ctx, clientType) - if err != nil { - return err - } - return broker.publish(ctx, config, data) -} - -// PublishDelayed 直接发布延迟消息 -func PublishDelayed(ctx context.Context, clientType messageClientType, config interface{}, data interface{}, delay int) error { - broker, err := newMessageBroker(ctx, clientType) - if err != nil { - return err - } - return broker.publishDelayed(ctx, config, data, delay) -} - -// Subscribe 直接订阅消息 -func Subscribe(ctx context.Context, clientType messageClientType, configs ...interface{}) error { - broker, err := newMessageBroker(ctx, clientType) - if err != nil { - return err - } - return broker.subscribe(ctx, configs...) -} diff --git a/message/nats_client.go b/message/nats_client.go deleted file mode 100644 index 470bef7..0000000 --- a/message/nats_client.go +++ /dev/null @@ -1,313 +0,0 @@ -package message - -import ( - "context" - "fmt" - "sync" - "sync/atomic" - "time" - - "github.com/gogf/gf/v2/frame/g" - "github.com/nats-io/nats.go" - "github.com/nats-io/nats.go/jetstream" -) - -var ( - nc *nats.Conn - js jetstream.JetStream - inited bool - natsMu sync.RWMutex - natsURL string - healthCtx context.Context - healthCancel context.CancelFunc - connected bool - reconnectChan chan struct{} - - // 连接状态变化监听器 - connStateListeners []connStateListener - connListenersMu sync.RWMutex - - // 监控指标 - metrics metricsCounter -) - -// Metrics 监控指标 -type metricsCounter struct { - PublishCount atomic.Int64 - PublishError atomic.Int64 - SubscribeCount atomic.Int64 - RequestCount atomic.Int64 - RequestError atomic.Int64 - ConsumeCount atomic.Int64 - ConsumeError atomic.Int64 -} - -// ConnState 连接状态 -type connState int - -const ( - connStateDisconnected connState = iota - connStateConnecting - connStateConnected - connStateReconnecting - connStateClosed -) - -// ConnStateListener 连接状态监听器 -type connStateListener func(state connState, err error) - -// GetMetrics 获取监控指标 -func getMetrics() metricsCounter { - return metrics -} - -// registerConnStateListener 注册连接状态监听器 -func registerConnStateListener(listener connStateListener) { - connListenersMu.Lock() - defer connListenersMu.Unlock() - connStateListeners = append(connStateListeners, listener) -} - -// unregisterConnStateListener 取消注册连接状态监听器 -func unregisterConnStateListener(listener connStateListener) { - connListenersMu.Lock() - defer connListenersMu.Unlock() - for i, l := range connStateListeners { - if l != nil && &l == &listener { - connStateListeners = append(connStateListeners[:i], connStateListeners[i+1:]...) - break - } - } -} - -// notifyConnState 通知所有监听器连接状态变化 -func notifyConnState(state connState, err error) { - connListenersMu.RLock() - listeners := make([]connStateListener, len(connStateListeners)) - copy(listeners, connStateListeners) - connListenersMu.RUnlock() - - for _, listener := range listeners { - if listener != nil { - listener(state, err) - } - } -} - -// init 初始化 NATS 连接 -func init() { - // 从配置文件读取 NATS 地址 - natsURL = g.Cfg().MustGet(context.Background(), "nats.url").String() - if natsURL == "" { - // 默认使用本地地址 - natsURL = nats.DefaultURL - } - - // 创建健康检查上下文 - healthCtx, healthCancel = context.WithCancel(context.Background()) - - // 创建重连通知通道(增大缓冲区避免丢失通知) - reconnectChan = make(chan struct{}, 10) - - // 启动连接 - go initConnection() - - // 启动健康检查协程 - go healthCheck() -} - -// initConnection 初始化连接 -func initConnection() { - ctx := context.Background() - notifyConnState(connStateConnecting, nil) - if err := connect(ctx); err != nil { - g.Log().Errorf(ctx, "NATS 初始连接失败: %v", err) - notifyConnState(connStateDisconnected, err) - } -} - -// connect 建立 NATS 连接 -func connect(ctx context.Context) error { - natsMu.Lock() - defer natsMu.Unlock() - - if nc != nil && !nc.IsClosed() { - nc.Close() - } - - // 连接选项配置 - opts := []nats.Option{ - nats.Name("goframe-nats-client"), - nats.ReconnectWait(2 * time.Second), - nats.MaxReconnects(-1), // 无限重连 - nats.PingInterval(10 * time.Second), - nats.MaxPingsOutstanding(5), - nats.ReconnectHandler(func(nc *nats.Conn) { - g.Log().Infof(ctx, "✅ NATS 重连成功: %s", nc.ConnectedUrl()) - connected = true - - // 重新创建 JetStream 实例 - if newJS, err := jetstream.New(nc); err == nil { - js = newJS - } - - // 通知重连成功 - notifyConnState(connStateConnected, nil) - - // 使用非阻塞发送避免阻塞 - select { - case reconnectChan <- struct{}{}: - default: - // 通道已满,丢弃通知 - } - }), - nats.DisconnectErrHandler(func(nc *nats.Conn, err error) { - g.Log().Warningf(ctx, "⚠️ NATS 连接断开: %v, 准备重连...", err) - connected = false - notifyConnState(connStateReconnecting, err) - }), - nats.ClosedHandler(func(nc *nats.Conn) { - g.Log().Infof(ctx, "NATS 连接已关闭: %s", nc.ConnectedUrl()) - connected = false - notifyConnState(connStateClosed, nil) - }), - nats.ErrorHandler(func(nc *nats.Conn, sub *nats.Subscription, err error) { - g.Log().Errorf(ctx, "NATS 错误: %v", err) - }), - } - - var err error - nc, err = nats.Connect(natsURL, opts...) - if err != nil { - return fmt.Errorf("NATS 连接失败: %w", err) - } - - // 等待连接就绪 - if nc.Status() != nats.CONNECTED { - select { - case <-time.After(5 * time.Second): - notifyConnState(connStateDisconnected, fmt.Errorf("连接超时")) - return fmt.Errorf("NATS 连接超时") - case <-nc.StatusChanged(nats.CONNECTED): - } - } - - // 创建 JetStream 实例 - js, err = jetstream.New(nc) - if err != nil { - return fmt.Errorf("创建 JetStream 失败: %w", err) - } - - connected = true - inited = true - g.Log().Infof(ctx, "✅ NATS 连接成功: %s", nc.ConnectedUrl()) - notifyConnState(connStateConnected, nil) - return nil -} - -// healthCheck 健康检查协程(仅作为备用检查) -func healthCheck() { - ticker := time.NewTicker(30 * time.Second) - defer ticker.Stop() - - for { - select { - case <-healthCtx.Done(): - return - case <-ticker.C: - natsMu.RLock() - currentConnected := connected - currentConn := nc - natsMu.RUnlock() - - if !currentConnected || currentConn == nil || currentConn.IsClosed() { - // 仅记录日志,不尝试重连(NATS 已有自动重连机制) - g.Log().Warning(context.Background(), "NATS 连接断开,等待 NATS 自动重连...") - } - case <-reconnectChan: - // 重连成功的通知(仅记录日志) - g.Log().Info(context.Background(), "收到重连成功通知") - } - } -} - -// checkConnected 检查连接状态 -func checkConnected() bool { - natsMu.RLock() - defer natsMu.RUnlock() - return connected && nc != nil && !nc.IsClosed() -} - -// getConnState 获取当前连接状态 -func getConnState() connState { - natsMu.RLock() - defer natsMu.RUnlock() - - if nc == nil { - return connStateDisconnected - } - - if nc.IsClosed() { - return connStateClosed - } - - if connected { - return connStateConnected - } - - return connStateDisconnected -} - -// shutdown 优雅关闭:自动注销所有已注册的服务并关闭 NATS 连接 -func shutdown() error { - ctx := context.Background() - g.Log().Info(ctx, "开始优雅关闭 NATS RPC 服务...") - - // 注销所有单实例服务 - rpcServicesMu.Lock() - singleServiceCount := len(rpcServices) - for serviceName := range rpcServices { - if sub, exists := rpcSubs[serviceName]; exists { - if err := sub.Unsubscribe(); err != nil { - g.Log().Errorf(ctx, "注销服务 %s 失败: %v", serviceName, err) - } - } - delete(rpcSubs, serviceName) - delete(rpcServices, serviceName) - } - rpcServicesMu.Unlock() - - // 注销所有队列服务 - queueRPCMu.Lock() - queueServiceCount := 0 - for queueName, servicesMap := range queueRPCServices { - queueServiceCount += len(servicesMap) - for serviceName, sub := range queueRPCSubs[queueName] { - if err := sub.Unsubscribe(); err != nil { - g.Log().Errorf(ctx, "注销队列服务 %s (队列: %s) 失败: %v", serviceName, queueName, err) - } - } - delete(queueRPCSubs, queueName) - delete(queueRPCServices, queueName) - } - queueRPCMu.Unlock() - - g.Log().Infof(ctx, "已注销 %d 个单实例服务和 %d 个队列服务", singleServiceCount, queueServiceCount) - - natsMu.Lock() - defer natsMu.Unlock() - - // 停止健康检查协程 - if healthCancel != nil { - healthCancel() - } - - // 关闭连接 - if nc != nil && !nc.IsClosed() { - nc.Close() - connected = false - inited = false - } - g.Log().Info(ctx, "NATS RPC 服务已优雅关闭") - return nil -} diff --git a/message/nats_msg.go b/message/nats_msg.go index 016917f..205ac07 100644 --- a/message/nats_msg.go +++ b/message/nats_msg.go @@ -4,195 +4,274 @@ import ( "context" "encoding/json" "fmt" - "time" - "github.com/gogf/gf/v2/frame/g" + "github.com/nats-io/nats.go" "github.com/nats-io/nats.go/jetstream" + "time" ) -// natsMessageClient NATS 实现 -type natsMessageClient struct { - clientType messageClientType +type NatsPublishMsgConfig struct { + QueueName string + Durable bool + DelayTime int + Data any } -// StreamGroup 创建消费组(支持单个或批量) -func (q *natsMessageClient) streamGroup(ctx context.Context, configs ...interface{}) error { - if len(configs) == 0 { - return fmt.Errorf("配置不能为空") +type NatsSubscribeMsgConfig struct { + QueueName string + Durable bool + DelayTime int + ConsumerName string + AutoAck bool + PrefetchCount int + HandleFunc func(ctx context.Context, message map[string]interface{}) error +} + +func (*NatsPublishMsgConfig) GetPublishMsgType() { + +} + +func (*NatsSubscribeMsgConfig) GetSubscribeMsgType() { + +} + +func init() { + // 注册 Nats 插件,必须使用 RegisterPlugin 确保连接检测 + registerPlugin(MessageNATS, func() messageUtil { + return &natsMsg{} + }) +} + +type natsMsg struct{} + +// Ping 检测 NATS 连接状态 +func (c *natsMsg) ping(_ context.Context) bool { + return natsPing() +} + +// Reconnect 重连 NATS +func (c *natsMsg) reconnect(ctx context.Context) error { + return natsReconnect(ctx) +} + +// Close 关闭 NATS 连接 +func (c *natsMsg) close(ctx context.Context) error { + return natsClose(ctx) +} + +// Publish 发布消息 +func (c *natsMsg) Publish(ctx context.Context, msgConfig messagePublishConfig) error { + cfg, ok := msgConfig.(*NatsPublishMsgConfig) + if !ok { + return fmt.Errorf("无效的 NATS 配置类型") } - for _, config := range configs { - cfg, ok := config.(*NATSConfig) - if !ok { - return fmt.Errorf("无效的 NATS 配置类型") + if g.IsEmpty(cfg.QueueName) { + return fmt.Errorf("必须提供队列名称") + } + if g.IsEmpty(cfg.Data) { + return fmt.Errorf("必须提供数据") + } + return c.createPublish(ctx, cfg.QueueName, cfg.Durable, cfg.DelayTime, cfg.Data) +} + +// Publish 发布消息 +func (c *natsMsg) createPublish(ctx context.Context, subject string, durable bool, delayTime int, data any) error { + delayMsg := delayTime > 0 + if err := c.createStreamGroupInternal(ctx, subject, durable, delayMsg); err != nil { + return err + } + payload, err := json.Marshal(data) + if err != nil { + return fmt.Errorf("序列化数据失败: %w", err) + } + + msg := &nats.Msg{ + Subject: subject, + Data: payload, + } + + if delayMsg { + // 计算目标投递时间 + targetTime := time.Now().Add(time.Duration(delayTime) * time.Second) + delayNs := time.Until(targetTime).Nanoseconds() + if delayNs < 0 { + delayNs = 0 } - if err := q.createStreamGroup(ctx, cfg); err != nil { - return err + + g.Log().Infof(ctx, "📅 NATS 延迟消息配置: DelayTime=%d秒, TargetTime=%v, DelayNs=%d纳秒(%.2f秒)", + delayTime, targetTime.Format("2006-01-02 15:04:05"), delayNs, float64(delayNs)/float64(time.Second.Nanoseconds())) + + // NATS JetStream 延迟消息使用 Nats-Msg-Delay Header(纳秒数) + msg.Header = nats.Header{ + "Nats-Msg-Delay": []string{fmt.Sprintf("%d", delayNs)}, + } + g.Log().Infof(ctx, "📅 NATS 延迟消息 Header: %v", msg.Header) + + // 获取 Stream 配置验证 + streamName, _ := getStreamInfo(durable, delayMsg) + stream, err := js.Stream(ctx, streamName) + if err == nil { + info, _ := stream.Info(ctx) + g.Log().Infof(ctx, "📅 Stream 配置: AllowMsgSchedules=%v, Storage=%v", + info.Config.AllowMsgSchedules, info.Config.Storage) + if !info.Config.AllowMsgSchedules { + g.Log().Errorf(ctx, "❌ Stream 不支持延迟消息!AllowMsgSchedules=false") + } } } + + // 发布消息到 JetStream + ack, err := js.PublishMsg(ctx, msg) + if err != nil { + g.Log().Errorf(ctx, "❌ NATS 发布消息失败: err=%v", err) + return err + } + + g.Log().Infof(ctx, "✅ NATS 发布消息成功: StreamSeq=%d, Domain=%s", ack.Sequence, ack.Domain) return nil } -// createStreamGroup 内部单个创建消费组 -func (q *natsMessageClient) createStreamGroup(ctx context.Context, cfg *NATSConfig) error { - // Stream 不存在,创建新的 - storage := jetstream.FileStorage - if !cfg.Durable { - storage = jetstream.MemoryStorage - } - if g.IsEmpty(cfg.Replicas) { - cfg.Replicas = 1 +// createStreamGroup 内部创建消费组 +func (c *natsMsg) createStreamGroupInternal(ctx context.Context, subject string, durable, delayMsg bool) error { + streamName, storage := getStreamInfo(durable, delayMsg) + + // 先检查 Stream 是否存在 + stream, err := js.Stream(ctx, streamName) + if err == nil { + // Stream 已存在,检查配置是否匹配 + info, _ := stream.Info(ctx) + if info.Config.AllowMsgSchedules != delayMsg || info.Config.Storage != storage { + g.Log().Infof(ctx, "🔄 Stream 配置不匹配,正在重新创建: stream=%s, 当前AllowMsgSchedules=%v, 需要%v", + streamName, info.Config.AllowMsgSchedules, delayMsg) + // 删除旧 Stream + if err := js.DeleteStream(ctx, streamName); err != nil { + g.Log().Warningf(ctx, "删除旧 Stream 失败: %v", err) + } + } else { + g.Log().Infof(ctx, "✅ Stream 已存在且配置正确: stream=%s", streamName) + return nil + } } + // 构建流配置 jsConfig := jetstream.StreamConfig{ - Name: cfg.Stream, - Subjects: []string{fmt.Sprintf("%s.>", cfg.Stream)}, - Replicas: cfg.Replicas, - NoAck: cfg.AutoAck, - AllowMsgSchedules: cfg.DelayMessage, // 延迟消息核心开关 + Name: streamName, + Subjects: []string{subject}, + AllowMsgSchedules: delayMsg, // 延迟消息核心开关 Storage: storage, Discard: jetstream.DiscardOld, // 达到上限删除旧消息 } - // 检查流是否已存在 - stream, err := js.Stream(ctx, cfg.Stream) - if err == nil { - // 流已存在,更新配置 - _, err = js.UpdateStream(ctx, jsConfig) - if err != nil { - return fmt.Errorf("更新任务流失败: %w", err) - } - g.Log().Infof(ctx, "任务流已更新: %s", stream.CachedInfo().Config.Name) - return nil - } - // 创建新流 + stream, err = js.CreateStream(ctx, jsConfig) if err != nil { return fmt.Errorf("创建任务流失败: %w", err) } - g.Log().Infof(ctx, "✅ NATS 队列初始化成功: stream=%s, consumer=%s", cfg.Stream, cfg.Consumer) + // 获取 Stream 信息验证配置 + info, err := stream.Info(ctx) + if err == nil { + g.Log().Infof(ctx, "✅ NATS 队列初始化成功: stream=%s, AllowMsgSchedules=%v, Storage=%v", + streamName, info.Config.AllowMsgSchedules, info.Config.Storage) + } + + g.Log().Infof(ctx, "✅ NATS 队列初始化成功: stream=%s", streamName) return nil } -// Publish 发布消息(支持单个或批量) -func (q *natsMessageClient) publish(ctx context.Context, config interface{}, data interface{}) error { - cfg, ok := config.(*NATSConfig) +// Subscribe 订阅消息 +func (c *natsMsg) Subscribe(ctx context.Context, msgConfig messageSubscribeConfig) error { + cfg, ok := msgConfig.(*NatsSubscribeMsgConfig) if !ok { return fmt.Errorf("无效的 NATS 配置类型") } - err := q.createStreamGroup(ctx, cfg) - if err != nil { - return err + if g.IsEmpty(cfg.QueueName) { + return fmt.Errorf("必须提供队列名称") } - payload, err := json.Marshal(data) - if err != nil { - return fmt.Errorf("序列化数据失败: %w", err) + if g.IsEmpty(cfg.ConsumerName) { + return fmt.Errorf("必须提供消费者名称") } - - // 发布消息到 JetStream - subject := fmt.Sprintf("%s.>", cfg.Stream) - _, err = js.Publish(ctx, subject, payload) - if err != nil { - g.Log().Errorf(ctx, "❌ NATS 发布消息失败: topic=%s, err=%v", cfg.Stream, err) - return err + if g.IsEmpty(cfg.HandleFunc) { + return fmt.Errorf("必须提供处理函数") } - - g.Log().Infof(ctx, "✅ NATS 发布消息成功: topic=%s", cfg.Stream) - return nil + if g.IsEmpty(cfg.PrefetchCount) { + cfg.PrefetchCount = 1 + } + return c.createSubscribeInternal(ctx, cfg.QueueName, cfg.ConsumerName, cfg.PrefetchCount, cfg.AutoAck, cfg.Durable, cfg.DelayTime, cfg.HandleFunc) } -// PublishDelayed 发布延迟消息(支持单个或批量) -func (q *natsMessageClient) publishDelayed(ctx context.Context, config interface{}, data interface{}, delay int) error { +// createSubscribe 内部订阅消息 +func (c *natsMsg) createSubscribeInternal(ctx context.Context, subject, consumerName string, prefetchCount int, autoAck, durable bool, delayTime int, handler func(ctx context.Context, message map[string]interface{}) error) error { + g.Log().Infof(ctx, "🔔 NATS 开始订阅: QueueName=%s, ConsumerName=%s", subject, consumerName) + delayMsg := delayTime > 0 + streamName, _ := getStreamInfo(durable, delayMsg) - cfg, ok := config.(*NATSConfig) - if !ok { - return fmt.Errorf("无效的 NATS 配置类型") + // 确保 Stream 存在,如果不存在则创建 + if err := c.createStreamGroupInternal(ctx, subject, durable, delayMsg); err != nil { + g.Log().Errorf(ctx, "创建 Stream 失败: %v", err) + return fmt.Errorf("创建 Stream 失败: %w", err) } - payload, err := json.Marshal(data) - if err != nil { - return fmt.Errorf("序列化数据失败: %w", err) - } - - // 使用 goroutine 实现简单的延迟发布 - go func() { - time.Sleep(time.Duration(delay)) - subject := fmt.Sprintf("%s.>", cfg.Stream) - if err := q.publishInternal(ctx, subject, payload); err != nil { - g.Log().Errorf(ctx, "❌ NATS 延迟消息发布失败: topic=%s, delay=%v, err=%v", cfg.Stream, delay, err) - } - }() - - g.Log().Infof(ctx, "✅ NATS 延迟消息已提交: topic=%s, delay=%v", cfg.Stream, delay) - - return nil -} - -// publishInternal 内部发布消息 -func (q *natsMessageClient) publishInternal(ctx context.Context, subject string, payload []byte) error { - _, err := js.Publish(ctx, subject, payload) - return err -} - -// Subscribe 订阅消息(支持单个或批量) -func (q *natsMessageClient) subscribe(ctx context.Context, configs ...interface{}) error { - if len(configs) == 0 { - return fmt.Errorf("配置不能为空") - } - - for _, config := range configs { - cfg, ok := config.(*NATSConfig) - if !ok { - return fmt.Errorf("无效的 NATS 配置类型") - } - handler := cfg.HandleFunc - if handler == nil { - return fmt.Errorf("必须提供处理函数") - } - if err := q.createSubscribe(ctx, cfg, handler); err != nil { - return err - } - } - return nil -} - -// subscribe 内部单个订阅消息 -func (q *natsMessageClient) createSubscribe(ctx context.Context, cfg *NATSConfig, handler func(ctx context.Context, message map[string]interface{}) error) error { - g.Log().Infof(ctx, "🔔 NATS 开始订阅: stream=%s, consumer=%s", cfg.Stream, cfg.Consumer) // Stream 不存在,创建新的 ackPolicy := jetstream.AckExplicitPolicy - if cfg.AutoAck { + if autoAck { ackPolicy = jetstream.AckNonePolicy } jsConfig := jetstream.ConsumerConfig{ - Name: cfg.Consumer, - Durable: cfg.Consumer, + Name: consumerName, + Durable: consumerName, + FilterSubject: subject, AckPolicy: ackPolicy, MaxDeliver: 3, - MaxAckPending: cfg.PrefetchCount, + MaxAckPending: prefetchCount, } // 创建新消费者 - consumer, err := js.CreateOrUpdateConsumer(ctx, cfg.Stream, jsConfig) + consumer, err := js.CreateOrUpdateConsumer(ctx, streamName, jsConfig) if err != nil { - return fmt.Errorf("创建消费者失败: %w", err) + g.Log().Errorf(ctx, "创建消费者失败: %v", err) + return err } + + // 获取消费者信息验证 + if cInfo, err := consumer.Info(ctx); err == nil { + g.Log().Infof(ctx, "🔔 消费者创建成功: %s, AckPolicy=%v, MaxAckPending=%d", + cInfo.Name, cInfo.Config.AckPolicy, cInfo.Config.MaxAckPending) + } + // 创建消息处理函数 msgHandler := func(msg jetstream.Msg) { + // 记录消息接收时间 + now := time.Now() + meta, err := msg.Metadata() + if err == nil { + g.Log().Infof(ctx, "📨 收到消息: StreamSeq=%d, Published=%v, Received=%v, 距离发布=%.2f秒", + meta.Sequence.Stream, + meta.Timestamp.Format("2006-01-02 15:04:05"), + now.Format("2006-01-02 15:04:05"), + now.Sub(meta.Timestamp).Seconds()) + } + // 解析消息 var data map[string]any if err := json.Unmarshal(msg.Data(), &data); err != nil { g.Log().Errorf(ctx, "解析消息失败: %v", err) - msg.Nak() + if err := msg.Nak(); err != nil { + g.Log().Errorf(ctx, "Nak 失败: %v", err) + } return } // 处理业务逻辑 if err := handler(ctx, data); err != nil { g.Log().Errorf(ctx, "处理消息失败: %v", err) - msg.Nak() + if err := msg.Nak(); err != nil { + g.Log().Errorf(ctx, "Nak 失败: %v", err) + } return } g.Log().Infof(ctx, "处理消息成功") - if !cfg.AutoAck { - msg.Ack() + if !autoAck { + if err := msg.Ack(); err != nil { + g.Log().Errorf(ctx, "Ack 失败: %v", err) + } } } @@ -202,7 +281,25 @@ func (q *natsMessageClient) createSubscribe(ctx context.Context, cfg *NATSConfig return fmt.Errorf("开始消费失败: %w", err) } - g.Log().Infof(ctx, "✅ 开始消费消息: %s/%s", cfg.Stream, cfg.Consumer) + g.Log().Infof(ctx, "✅ NATS 订阅成功") return nil } + +func getStreamInfo(durable, delayMsg bool) (string, jetstream.StorageType) { + // Stream 不存在,创建新的 + streamName := "ordinary_msg_memory" + storage := jetstream.MemoryStorage + + // 延迟消息必须使用 FileStorage(NATS 官方要求) + if delayMsg { + streamName = "delay_msg_file" + storage = jetstream.FileStorage + } else { + if durable { + streamName = "ordinary_msg_file" + storage = jetstream.FileStorage + } + } + return streamName, storage +} diff --git a/message/nats_rpc.go b/message/nats_rpc.go index 8644734..e204743 100644 --- a/message/nats_rpc.go +++ b/message/nats_rpc.go @@ -38,11 +38,11 @@ var ( // 返回值可以是任意类型,会被自动序列化为 JSON type rpcHandler func(ctx context.Context, req []byte) (any, error) -// RegisterRPCService 注册 RPC 服务(单实例) +// registerRPCService 注册 RPC 服务(单实例) // serviceName: 服务名称,调用方通过此名称调用服务 // handler: 服务处理函数,接收请求并返回响应 func registerRPCService(serviceName string, handler rpcHandler) (err error) { - if !checkConnected() { + if !natsPing() { return fmt.Errorf("NATS 未连接") } @@ -74,18 +74,17 @@ func registerRPCService(serviceName string, handler rpcHandler) (err error) { } rpcSubs[serviceName] = sub - metrics.SubscribeCount.Add(1) g.Log().Infof(context.Background(), "✅ RPC 服务已注册: %s", serviceName) return nil } -// RegisterQueueRPCService 注册 RPC 服务(集群模式) +// registerQueueRPCService 注册 RPC 服务(集群模式) // 多个服务实例注册同一服务时,请求会自动负载均衡 // serviceName: 服务名称 // queueName: 队列组名,同一队列组的实例共享请求 // handler: 服务处理函数 func registerQueueRPCService(serviceName, queueName string, handler rpcHandler) (err error) { - if !checkConnected() { + if !natsPing() { return fmt.Errorf("NATS 未连接") } @@ -126,7 +125,6 @@ func registerQueueRPCService(serviceName, queueName string, handler rpcHandler) queueRPCSubs[queueName][serviceName] = sub queueRPCMu.Unlock() - metrics.SubscribeCount.Add(1) g.Log().Infof(context.Background(), "✅ 队列 RPC 服务已注册: %s (队列组: %s)", serviceName, queueName) return nil } @@ -138,16 +136,16 @@ func executeHandler(handler rpcHandler, msg *nats.Msg) { // 从消息头重建上下文 ctx := headersToContext(context.Background(), msg.Header) // 提取 TraceID,创建可取消的 context - ctx = createCancelContext(ctx, msg.Header.Get(TraceIDKey)) + ctx = createCancelContext(ctx, msg.Header.Get(traceIDKey)) // 检查 context 是否已取消(在调用 handler 之前) select { case <-ctx.Done(): // context 已取消,返回取消错误 - g.Log().Infof(ctx, "RPC 请求已取消,traceID: %s", msg.Header.Get(TraceIDKey)) + g.Log().Infof(ctx, "RPC 请求已取消,traceID: %s", msg.Header.Get(traceIDKey)) // 仍然需要发送响应以避免客户端超时 respData = []byte(`{"_err":"请求已取消"}`) // 清理取消映射表 - cleanupTraceCancel(msg.Header.Get(TraceIDKey)) + cleanupTraceCancel(msg.Header.Get(traceIDKey)) return default: } @@ -176,7 +174,7 @@ func executeHandler(handler rpcHandler, msg *nats.Msg) { g.Log().Errorf(ctx, "RPC 响应失败: %v", err) } // 请求结束,清理取消映射表 - cleanupTraceCancel(msg.Header.Get(TraceIDKey)) + cleanupTraceCancel(msg.Header.Get(traceIDKey)) } // createCancelContext 创建可取消的 context 并注册到取消映射表 @@ -211,7 +209,7 @@ func createCancelContext(ctx context.Context, traceID string) context.Context { // // sub, err := nats.SetupCancelListener(ctx) func setupCancelListener(ctx context.Context) (*nats.Subscription, error) { - if !checkConnected() { + if !natsPing() { return nil, fmt.Errorf("NATS 未连接") } @@ -253,7 +251,6 @@ func setupCancelListener(ctx context.Context) (*nats.Subscription, error) { return nil, fmt.Errorf("设置取消监听器失败: %w", err) } - metrics.SubscribeCount.Add(1) g.Log().Infof(ctx, "✅ 取消监听器已设置: %s", cancelSubject) return sub, nil } @@ -264,7 +261,7 @@ func setupCancelListener(ctx context.Context) (*nats.Subscription, error) { // // err := nats.publishCancel(ctx, traceID) func publishCancel(ctx context.Context, traceID string) error { - if !checkConnected() { + if !natsPing() { return fmt.Errorf("NATS 未连接") } @@ -306,12 +303,10 @@ func cleanupTraceCancel(traceID string) { // req: 请求数据 // 返回: 响应数据(任意类型)和错误 func CallRPC(ctx context.Context, serviceName string, req any, resp any) (err error) { - if !checkConnected() { + if !natsPing() { return fmt.Errorf("NATS 未连接") } - metrics.RequestCount.Add(1) - // 验证 resp 必须是指针类型 respValue := reflect.ValueOf(resp) if respValue.Kind() != reflect.Ptr { @@ -346,7 +341,6 @@ func CallRPC(ctx context.Context, serviceName string, req any, resp any) (err er // 执行本地调用 var response interface{} if response, err = localHandler(cancelCtx, reqBody); err != nil { - metrics.RequestError.Add(1) return fmt.Errorf("本地调用 RPC 服务失败 [%s]: %w", serviceName, err) } @@ -357,7 +351,6 @@ func CallRPC(ctx context.Context, serviceName string, req any, resp any) (err er var respMap map[string]any if json.Unmarshal(response.([]byte), &respMap) == nil { if errMsg, ok := respMap["_err"]; ok { - metrics.RequestError.Add(1) return fmt.Errorf("%v", errMsg) } } @@ -392,17 +385,17 @@ func CallRPC(ctx context.Context, serviceName string, req any, resp any) (err er }) } - if msg.Header.Get(TraceIDKey) != "" { + if msg.Header.Get(traceIDKey) != "" { go func() { defer closeDone() select { case <-ctx.Done(): // context 被取消时,发送取消信号给服务端 if errors.Is(ctx.Err(), context.Canceled) { - if err := publishCancel(context.Background(), msg.Header.Get(TraceIDKey)); err != nil { + if err := publishCancel(context.Background(), msg.Header.Get(traceIDKey)); err != nil { g.Log().Errorf(ctx, "发送 RPC 取消信号失败: %v", err) } else { - g.Log().Infof(ctx, "RPC 调用已取消,traceID: %s", msg.Header.Get(TraceIDKey)) + g.Log().Infof(ctx, "RPC 调用已取消,traceID: %s", msg.Header.Get(traceIDKey)) } } case <-done: @@ -419,12 +412,10 @@ func CallRPC(ctx context.Context, serviceName string, req any, resp any) (err er closeDone() if err != nil { - metrics.RequestError.Add(1) return fmt.Errorf("调用 RPC 服务失败 [%s]: %w", serviceName, err) } if responseMsg == nil { - metrics.RequestError.Add(1) return fmt.Errorf("RPC 响应为空 [%s]", serviceName) } @@ -434,7 +425,6 @@ func CallRPC(ctx context.Context, serviceName string, req any, resp any) (err er var respMap map[string]any if json.Unmarshal(responseMsg.Data, &respMap) == nil { if errMsg, ok := respMap["_err"]; ok { - metrics.RequestError.Add(1) return fmt.Errorf("%v", errMsg) } } @@ -449,7 +439,7 @@ func CallRPC(ctx context.Context, serviceName string, req any, resp any) (err er } // RegisterServiceOption 注册选项类型 -type RegisterServiceOption func(*registerServiceConfig) +type registerServiceOption func(*registerServiceConfig) type registerServiceConfig struct { queueName string // 队列组名(用于集群模式) @@ -457,14 +447,14 @@ type registerServiceConfig struct { } // WithQueueGroup 设置队列组名(集群模式) -func WithQueueGroup(queueName string) RegisterServiceOption { +func WithQueueGroup(queueName string) registerServiceOption { return func(cfg *registerServiceConfig) { cfg.queueName = queueName } } // WithExcludeMethods 排除不需要注册的方法 -func WithExcludeMethods(methods ...string) RegisterServiceOption { +func WithExcludeMethods(methods ...string) registerServiceOption { return func(cfg *registerServiceConfig) { cfg.excludeMethods = append(cfg.excludeMethods, methods...) } @@ -483,9 +473,9 @@ func WithExcludeMethods(methods ...string) RegisterServiceOption { // AutoRegisterServices(map[string]interface{}{ // "order": orderService, // }, WithQueueGroup("order-group")) -func AutoRegisterServices(ctx context.Context, serviceInstances map[string]interface{}, options ...RegisterServiceOption) error { +func AutoRegisterServices(ctx context.Context, serviceInstances map[string]interface{}, options ...registerServiceOption) error { // 先注册 RPC 服务(如果 NATS 不可用则记录警告但不阻塞启动) - if !checkConnected() { + if !natsPing() { return fmt.Errorf("NATS 未连接,RPC 服务未注册") } @@ -521,8 +511,8 @@ func AutoRegisterServices(ctx context.Context, serviceInstances map[string]inter } // registerService 注册单个服务的所有公开方法(内部函数) -func registerService(service interface{}, serviceNamePrefix string, options ...RegisterServiceOption) (err error) { - if !checkConnected() { +func registerService(service interface{}, serviceNamePrefix string, options ...registerServiceOption) (err error) { + if !natsPing() { return fmt.Errorf("NATS 未连接") } @@ -676,10 +666,10 @@ func registerService(service interface{}, serviceNamePrefix string, options ...R // ============ 上下文元数据工具函数 ============ // 以下函数用于在 context 和 NATS 消息头之间互转元数据 -// 定义常见的上下文元数据 key +// 定义常见的上下文元数据 key(私有) const ( - TraceIDKey = "trace_id" - TokenKey = "token" + traceIDKey = "trace_id" + tokenKey = "token" ) func getTraceID(ctx context.Context) (traceID string, err error) { @@ -687,7 +677,7 @@ func getTraceID(ctx context.Context) (traceID string, err error) { span := trace.SpanFromContext(ctx) if span != nil && span.SpanContext().HasTraceID() { traceID = span.SpanContext().TraceID().String() - } else if tid := ctx.Value(TraceIDKey); tid != nil { + } else if tid := ctx.Value(traceIDKey); tid != nil { traceID = fmt.Sprintf("%v", tid) } if traceID == "" { @@ -705,12 +695,12 @@ func contextToHeaders(ctx context.Context) (nats.Header, error) { if traceID, err := getTraceID(ctx); err != nil { return headers, err } else { - headers.Set(TraceIDKey, traceID) + headers.Set(traceIDKey, traceID) } // 提取 token(优先级:context value > HTTP Authorization header) token := "" - if t := ctx.Value(TokenKey); t != nil { + if t := ctx.Value(tokenKey); t != nil { token = fmt.Sprintf("%v", t) } else if r := g.RequestFromCtx(ctx); r != nil { // 从 HTTP 请求的 Authorization header 中提取 token @@ -725,7 +715,7 @@ func contextToHeaders(ctx context.Context) (nats.Header, error) { } } if token != "" { - headers.Set(TokenKey, token) + headers.Set(tokenKey, token) } return headers, nil @@ -739,13 +729,13 @@ func headersToContext(ctx context.Context, headers nats.Header) context.Context } // 恢复 trace_id - if traceID := headers.Get(TraceIDKey); traceID != "" { - ctx = context.WithValue(ctx, TraceIDKey, traceID) + if traceID := headers.Get(traceIDKey); traceID != "" { + ctx = context.WithValue(ctx, traceIDKey, traceID) } // 恢复 token - if token := headers.Get(TokenKey); token != "" { - ctx = context.WithValue(ctx, TokenKey, token) + if token := headers.Get(tokenKey); token != "" { + ctx = context.WithValue(ctx, tokenKey, token) } return ctx diff --git a/message/rabbit.go b/message/rabbit.go deleted file mode 100644 index 1ef01fb..0000000 --- a/message/rabbit.go +++ /dev/null @@ -1,351 +0,0 @@ -package message - -import ( - "context" - "sync" - "time" - - "github.com/gogf/gf/v2/encoding/gjson" - "github.com/gogf/gf/v2/errors/gerror" - "github.com/gogf/gf/v2/frame/g" - "github.com/gogf/gf/v2/util/gconv" - amqp "github.com/rabbitmq/amqp091-go" -) - -var ( - rabbitConn *amqp.Connection - rabbitChannel *amqp.Channel - rabbitOnce sync.Once - rabbitMu sync.RWMutex - rabbitCloseWatcher chan struct{} - rabbitWatcherStarted bool -) - -// Config RabbitMQ 配置 -type RabbitMQConfig1 struct { - Host string - Port int - Username string - Password string - VHost string -} - -// rabbitMQConfig 默认配置 -func getRabbitMQConfig() *RabbitMQConfig1 { - return &RabbitMQConfig1{ - Host: g.Cfg().MustGet(context.Background(), "rabbitmq.host").String(), - Port: g.Cfg().MustGet(context.Background(), "rabbitmq.port").Int(), - Username: g.Cfg().MustGet(context.Background(), "rabbitmq.username").String(), - Password: g.Cfg().MustGet(context.Background(), "rabbitmq.password").String(), - VHost: g.Cfg().MustGet(context.Background(), "rabbitmq.vhost", "/").String(), - } -} - -// initRabbitMQ 初始化 RabbitMQ 连接 -func initRabbitMQ(ctx context.Context) error { - var err error - rabbitOnce.Do(func() { - cfg := getRabbitMQConfig() - url := "amqp://" + cfg.Username + ":" + cfg.Password + "@" + cfg.Host + ":" + gconv.String(cfg.Port) + "/" + cfg.VHost - - rabbitConn, err = amqp.Dial(url) - if err != nil { - g.Log().Errorf(ctx, "RabbitMQ 连接失败: %v", err) - return - } - - rabbitChannel, err = rabbitConn.Channel() - if err != nil { - g.Log().Errorf(ctx, "创建 RabbitMQ Channel 失败: %v", err) - return - } - - rabbitCloseWatcher = make(chan struct{}) - - if !rabbitWatcherStarted { - go handleRabbitMQConnectionClose(ctx) - rabbitWatcherStarted = true - } - - g.Log().Info(ctx, "RabbitMQ 连接成功") - }) - - return err -} - -// getRabbitMQChannel 获取 RabbitMQ Channel -func getRabbitMQChannel() (*amqp.Channel, error) { - rabbitMu.RLock() - defer rabbitMu.RUnlock() - - if rabbitChannel == nil || rabbitChannel.IsClosed() { - return nil, gerror.New("RabbitMQ Channel 未初始化或已关闭") - } - - return rabbitChannel, nil -} - -// getRabbitMQConnection 获取 RabbitMQ 连接 -func getRabbitMQConnection() (*amqp.Connection, error) { - rabbitMu.RLock() - defer rabbitMu.RUnlock() - - if rabbitConn == nil || rabbitConn.IsClosed() { - return nil, gerror.New("RabbitMQ 连接未初始化或已关闭") - } - - return rabbitConn, nil -} - -// handleRabbitMQConnectionClose 监听连接关闭并重连 -func handleRabbitMQConnectionClose(ctx context.Context) { - for { - select { - case <-rabbitCloseWatcher: - g.Log().Info(ctx, "停止监听 RabbitMQ 连接状态") - return - default: - } - - rabbitMu.RLock() - currentConn := rabbitConn - rabbitMu.RUnlock() - - if currentConn == nil { - return - } - - closeErr := make(chan *amqp.Error, 1) - currentConn.NotifyClose(closeErr) - - select { - case err := <-closeErr: - if err != nil { - g.Log().Errorf(ctx, "RabbitMQ 连接关闭: %v,尝试重连...", err) - reconnectRabbitMQ(ctx) - } - case <-rabbitCloseWatcher: - g.Log().Info(ctx, "停止监听 RabbitMQ 连接状态") - return - } - } -} - -// reconnectRabbitMQ 重新连接 -func reconnectRabbitMQ(ctx context.Context) { - rabbitMu.Lock() - defer rabbitMu.Unlock() - - for i := 0; i < 10; i++ { - time.Sleep(time.Duration(i+1) * time.Second) - - cfg := getRabbitMQConfig() - url := "amqp://" + cfg.Username + ":" + cfg.Password + "@" + cfg.Host + ":" + gconv.String(cfg.Port) + "/" + cfg.VHost - - var err error - rabbitConn, err = amqp.Dial(url) - if err != nil { - g.Log().Errorf(ctx, "重连失败 (尝试 %d/10): %v", i+1, err) - continue - } - - rabbitChannel, err = rabbitConn.Channel() - if err != nil { - g.Log().Errorf(ctx, "创建 Channel 失败 (尝试 %d/10): %v", i+1, err) - continue - } - - g.Log().Info(ctx, "RabbitMQ 重连成功") - return - } - - g.Log().Fatal(ctx, "RabbitMQ 重连失败,已达到最大重试次数") -} - -// startRabbitMQConsumer 启动 RabbitMQ 消费者 -func startRabbitMQConsumer(ctx context.Context, msg QueueMessage) error { - // 初始化连接 - if err := initRabbitMQ(ctx); err != nil { - return gerror.Wrap(err, "初始化 RabbitMQ 连接失败") - } - - // 创建独立 Channel(避免并发冲突) - conn, err := getRabbitMQConnection() - if err != nil { - return gerror.Wrap(err, "获取RabbitMQ连接失败") - } - - ch, err := conn.Channel() - if err != nil { - return gerror.Wrap(err, "创建独立Channel失败") - } - - // 声明队列 - _, err = ch.QueueDeclare( - msg.Queue, // name - true, // durable - false, // autoDelete - false, // exclusive - false, // noWait - nil, // arguments - ) - if err != nil { - return gerror.Newf("声明队列失败: %v", err) - } - - // 设置 QoS(并发控制) - prefetchCount := msg.PrefetchCount - if prefetchCount == 0 { - prefetchCount = 1 - } - err = ch.Qos( - prefetchCount, // prefetchCount - 0, // prefetchSize - false, // global - ) - if err != nil { - return gerror.Newf("设置 QoS 失败: %v", err) - } - - // 开始消费 - msgs, err := ch.Consume( - msg.Queue, // queue - msg.ConsumerTag, // consumer tag - msg.AutoAck, // auto-ack - false, // exclusive - false, // no-local - false, // no-wait - nil, // args - ) - if err != nil { - return gerror.Newf("开始消费失败: %v", err) - } - - workerCount := msg.WorkerCount - if workerCount == 0 { - workerCount = 1 - } - - g.Log().Infof(ctx, "RabbitMQ 消费者已启动: queue=%s, prefetch=%d, workers=%d", - msg.Queue, prefetchCount, workerCount) - - // 启动多个 worker - for i := 0; i < workerCount; i++ { - go rabbitMQWorker(ctx, i, msgs, msg) - } - - return nil -} - -// rabbitMQWorker RabbitMQ 工作协程 -func rabbitMQWorker(ctx context.Context, workerID int, msgs <-chan amqp.Delivery, msg QueueMessage) { - g.Log().Debugf(ctx, "RabbitMQ Worker %d 已启动", workerID) - - for { - select { - case <-ctx.Done(): - g.Log().Infof(ctx, "RabbitMQ Worker %d 收到停止信号,正在退出", workerID) - return - case delivery, ok := <-msgs: - if !ok { - g.Log().Infof(ctx, "RabbitMQ Worker %d 消息通道已关闭,退出", workerID) - return - } - - // 反序列化消息 - var message map[string]interface{} - if err := gjson.DecodeTo(delivery.Body, &message); err != nil { - g.Log().Errorf(ctx, "RabbitMQ Worker %d 反序列化消息失败: %v", workerID, err) - if !msg.AutoAck { - delivery.Nack(false, false) - } - continue - } - - // 处理消息 - err := msg.HandleFunc(ctx, message) - if err != nil { - g.Log().Errorf(ctx, "RabbitMQ Worker %d 处理消息失败: %v", workerID, err) - if !msg.AutoAck { - delivery.Nack(false, false) - } - } else { - if !msg.AutoAck { - delivery.Ack(false) - } - g.Log().Debugf(ctx, "RabbitMQ Worker %d 处理消息成功", workerID) - } - } - } -} - -// publishToRabbitMQ 发布消息到 RabbitMQ -func publishToRabbitMQ(ctx context.Context, exchange, routingKey string, message interface{}) (messageID string, err error) { - ch, err := getRabbitMQChannel() - if err != nil { - return - } - - body, err := gjson.Encode(message) - if err != nil { - return "", gerror.Newf("消息序列化失败: %v", err) - } - - err = ch.PublishWithContext( - ctx, - exchange, // exchange - routingKey, // routing key - false, // mandatory - false, // immediate - amqp.Publishing{ - DeliveryMode: amqp.Persistent, - ContentType: "application/json", - Body: body, - }, - ) - - if err != nil { - g.Log().Errorf(ctx, "发布消息失败: exchange=%s, routingKey=%s, err=%v", exchange, routingKey, err) - return - } - - g.Log().Debugf(ctx, "消息发布成功: exchange=%s, routingKey=%s", exchange, routingKey) - return messageID, nil -} - -// publishDelayedToRabbitMQ 发布延时消息到 RabbitMQ -func publishDelayedToRabbitMQ(ctx context.Context, exchange, routingKey string, message interface{}, delaySeconds int) (messageID string, err error) { - ch, err := getRabbitMQChannel() - if err != nil { - return - } - - body, err := gjson.Encode(message) - if err != nil { - return "", gerror.Newf("消息序列化失败: %v", err) - } - - err = ch.PublishWithContext( - ctx, - exchange, // exchange(必须是 x-delayed-message 类型) - routingKey, // routing key - false, // mandatory - false, // immediate - amqp.Publishing{ - DeliveryMode: amqp.Persistent, - ContentType: "application/json", - Body: body, - Headers: amqp.Table{ - "x-delay": delaySeconds * 1000, // 延时(毫秒) - }, - }, - ) - - if err != nil { - g.Log().Errorf(ctx, "发布延时消息失败: exchange=%s, routingKey=%s, delay=%ds, err=%v", exchange, routingKey, delaySeconds, err) - return - } - - g.Log().Debugf(ctx, "延时消息发布成功: exchange=%s, routingKey=%s, delay=%ds", exchange, routingKey, delaySeconds) - return messageID, nil -} diff --git a/message/rabbitmq_client.go b/message/rabbitmq_client.go deleted file mode 100644 index fc225f7..0000000 --- a/message/rabbitmq_client.go +++ /dev/null @@ -1,210 +0,0 @@ -package message - -import ( - "context" - "sync" - "time" - - "github.com/gogf/gf/v2/errors/gerror" - "github.com/gogf/gf/v2/frame/g" - "github.com/gogf/gf/v2/util/gconv" - amqp "github.com/rabbitmq/amqp091-go" -) - -var ( - conn *amqp.Connection - channel *amqp.Channel - rabbitmqOnce sync.Once - rabbitmqMu sync.RWMutex - closeWatcher chan struct{} // 用于停止监听 goroutine - watcherStarted bool // 防止重复启动监听 -) - -// Config RabbitMQ 配置 -type Config struct { - Host string - Port int - Username string - Password string - VHost string -} - -// Init 初始化 RabbitMQ 连接 -func Init(ctx context.Context, cfg *Config) error { - var err error - rabbitmqOnce.Do(func() { - // 构建连接字符串 - url := "amqp://" + cfg.Username + ":" + cfg.Password + "@" + cfg.Host + ":" + gconv.String(cfg.Port) + "/" + cfg.VHost - - // 创建连接 - conn, err = amqp.Dial(url) - if err != nil { - g.Log().Errorf(ctx, "RabbitMQ 连接失败: %v", err) - return - } - - // 创建 Channel - channel, err = conn.Channel() - if err != nil { - g.Log().Errorf(ctx, "创建 RabbitMQ Channel 失败: %v", err) - return - } - - // 初始化关闭监听器 - closeWatcher = make(chan struct{}) - - // 监听连接关闭(只启动一次) - if !watcherStarted { - go handleConnectionClose(ctx) - watcherStarted = true - } - - g.Log().Info(ctx, "RabbitMQ 连接成功") - }) - - return err -} - -// InitFromConfig 从配置文件初始化 -func InitFromConfig(ctx context.Context) error { - cfg := &Config{ - Host: g.Cfg().MustGet(ctx, "rabbitmq.host").String(), - Port: g.Cfg().MustGet(ctx, "rabbitmq.port").Int(), - Username: g.Cfg().MustGet(ctx, "rabbitmq.username").String(), - Password: g.Cfg().MustGet(ctx, "rabbitmq.password").String(), - VHost: g.Cfg().MustGet(ctx, "rabbitmq.vhost", "/").String(), - } - - return Init(ctx, cfg) -} - -// GetChannel 获取 Channel -func GetChannel() (*amqp.Channel, error) { - rabbitmqMu.RLock() - defer rabbitmqMu.RUnlock() - - if channel == nil || channel.IsClosed() { - return nil, gerror.New("RabbitMQ Channel 未初始化或已关闭") - } - - return channel, nil -} - -// GetConnection 获取连接 -func GetConnection() (*amqp.Connection, error) { - rabbitmqMu.RLock() - defer rabbitmqMu.RUnlock() - - if conn == nil || conn.IsClosed() { - return nil, gerror.New("RabbitMQ 连接未初始化或已关闭") - } - - return conn, nil -} - -// handleConnectionClose 监听连接关闭并重连 -func handleConnectionClose(ctx context.Context) { - for { - // 检查是否需要停止监听 - select { - case <-closeWatcher: - g.Log().Info(ctx, "停止监听 RabbitMQ 连接状态") - return - default: - } - - rabbitmqMu.RLock() - currentConn := conn - rabbitmqMu.RUnlock() - - if currentConn == nil { - return - } - - // 创建关闭通知 channel - closeErr := make(chan *amqp.Error, 1) - currentConn.NotifyClose(closeErr) - - // 等待连接关闭或停止信号 - select { - case err := <-closeErr: - if err != nil { - g.Log().Errorf(ctx, "RabbitMQ 连接关闭: %v,尝试重连...", err) - reconnect(ctx) - } - case <-closeWatcher: - g.Log().Info(ctx, "停止监听 RabbitMQ 连接状态") - return - } - } -} - -// reconnect 重新连接 -func reconnect(ctx context.Context) { - rabbitmqMu.Lock() - defer rabbitmqMu.Unlock() - - for i := 0; i < 10; i++ { - time.Sleep(time.Duration(i+1) * time.Second) - - cfg := &Config{ - Host: g.Cfg().MustGet(ctx, "rabbitmq.host").String(), - Port: g.Cfg().MustGet(ctx, "rabbitmq.port").Int(), - Username: g.Cfg().MustGet(ctx, "rabbitmq.username").String(), - Password: g.Cfg().MustGet(ctx, "rabbitmq.password").String(), - VHost: g.Cfg().MustGet(ctx, "rabbitmq.vhost", "/").String(), - } - - url := "amqp://" + cfg.Username + ":" + cfg.Password + "@" + cfg.Host + ":" + gconv.String(cfg.Port) + "/" + cfg.VHost - - var err error - conn, err = amqp.Dial(url) - if err != nil { - g.Log().Errorf(ctx, "重连失败 (尝试 %d/10): %v", i+1, err) - continue - } - - channel, err = conn.Channel() - if err != nil { - g.Log().Errorf(ctx, "创建 Channel 失败 (尝试 %d/10): %v", i+1, err) - continue - } - - g.Log().Info(ctx, "RabbitMQ 重连成功") - // 不再重复启动监听 goroutine - return - } - - g.Log().Fatal(ctx, "RabbitMQ 重连失败,已达到最大重试次数") -} - -// Close 关闭连接 -func Close(ctx context.Context) (err error) { - rabbitmqMu.Lock() - defer rabbitmqMu.Unlock() - - // 停止监听 goroutine - if closeWatcher != nil { - close(closeWatcher) - closeWatcher = nil - } - - if channel != nil { - if err = channel.Close(); err != nil { - g.Log().Errorf(ctx, "关闭 RabbitMQ Channel 失败: %v", err) - } - channel = nil - } - - if conn != nil { - if err = conn.Close(); err != nil { - g.Log().Errorf(ctx, "关闭 RabbitMQ 连接失败: %v", err) - return - } - conn = nil - } - - watcherStarted = false - g.Log().Info(ctx, "RabbitMQ 连接已关闭") - return -} diff --git a/message/rabbitmq_msg.go b/message/rabbitmq_msg.go index 8d49663..0a65e20 100644 --- a/message/rabbitmq_msg.go +++ b/message/rabbitmq_msg.go @@ -10,61 +10,129 @@ import ( amqp "github.com/rabbitmq/amqp091-go" ) -// rabbitMQMessageClient RabbitMQ 实现 -type rabbitMQMessageClient struct { - clientType messageClientType +type RabbitMQPublishMsgConfig struct { + QueueName string + Durable bool + DelayTime int + Data any } -// StreamGroup 创建消费组(支持单个或批量) -func (q *rabbitMQMessageClient) streamGroup(ctx context.Context, configs ...interface{}) error { - if len(configs) == 0 { - return fmt.Errorf("配置不能为空") - } - for _, config := range configs { - cfg, ok := config.(*RabbitMQConfig) - if !ok { - return fmt.Errorf("无效的 RabbitMQ 配置类型") - } - if err := q.setupQueue(ctx, channel, cfg, cfg.DelayMessage); err != nil { - return err - } - } - return nil +type RabbitMQSubscribeMsgConfig struct { + QueueName string + Durable bool + DelayTime int + ConsumerName string + AutoAck bool + PrefetchCount int + HandleFunc func(ctx context.Context, message map[string]interface{}) error } -// Publish 发布消息(支持单个或批量) -func (q *rabbitMQMessageClient) publish(ctx context.Context, config interface{}, data interface{}) error { - cfg, ok := config.(*RabbitMQConfig) +func (*RabbitMQPublishMsgConfig) GetPublishMsgType() { + +} + +func (*RabbitMQSubscribeMsgConfig) GetSubscribeMsgType() { + +} + +func init() { + // 注册 RabbitMQ 插件,必须使用 RegisterPlugin 确保连接检测 + //registerPlugin(MessageRabbitMQ, func() messageUtil { + // return &rabbitMQ{} + //}) +} + +type rabbitMQ struct{} + +// Ping 检测 RabbitMQ 连接状态 +func (c *rabbitMQ) ping(ctx context.Context) bool { + return rabbitmqPing() +} + +// Reconnect 重连 RabbitMQ +func (c *rabbitMQ) reconnect(ctx context.Context) error { + return rabbitmqReconnect(ctx) +} + +// Close 关闭 RabbitMQ 连接 +func (c *rabbitMQ) close(ctx context.Context) error { + return rabbitmqClose(ctx) +} + +// Publish 发布消息 +func (c *rabbitMQ) Publish(ctx context.Context, msgConfig messagePublishConfig) error { + cfg, ok := msgConfig.(*RabbitMQPublishMsgConfig) if !ok { return fmt.Errorf("无效的 RabbitMQ 配置类型") } - if err := q.publishMessage(ctx, cfg, "work", data, 0); err != nil { - g.Log().Errorf(ctx, "❌ RabbitMQ 发布消息失败: err=%v", err) - return err + if g.IsEmpty(cfg.QueueName) { + return fmt.Errorf("队列名称不能为空") } - return nil + if cfg.Data == nil { + return fmt.Errorf("数据不能为空") + } + return c.publishMessageInternal(ctx, cfg.QueueName, cfg.Durable, cfg.DelayTime, cfg.Data) } -// PublishDelayed 发布延迟消息 -func (q *rabbitMQMessageClient) publishDelayed(ctx context.Context, config interface{}, data interface{}, delaySeconds int) error { - cfg, ok := config.(*RabbitMQConfig) - if !ok { - return fmt.Errorf("无效的 RabbitMQ 配置类型") - } - if err := q.publishMessage(ctx, cfg, "delayed", data, delaySeconds); err != nil { - g.Log().Errorf(ctx, "❌ RabbitMQ 发布延迟消息失败: err=%v", err) - return err - } - return nil -} +// publishMessage 发布消息内部实现 +func (c *rabbitMQ) publishMessageInternal(ctx context.Context, queueName string, durable bool, delayTime int, data interface{}) error { + delayMsg := delayTime > 0 -func (q *rabbitMQMessageClient) publishMessage(ctx context.Context, cfg *RabbitMQConfig, mode string, data interface{}, delaySeconds int) error { + // 1. 决定 Exchange 类型 + exchangeType := "fanout" + exchangeName := queueName + routingKey := queueName + args := amqp.Table{} + if delayMsg { + exchangeType = "x-delayed-message" + exchangeName = queueName + ".delayed" + args["x-delayed-type"] = "fanout" // 底层用 topic + } + + // 2. 声明 Exchange(只声明一次) + if err := channel.ExchangeDeclare( + queueName, // exchange 交换机名称 + exchangeType, + durable, + false, // autoDelete + false, // internal + false, // noWait + args, + ); err != nil { + return fmt.Errorf("声明 Exchange 失败: %w", err) + } + + // 3. 声明队列 + if _, err := channel.QueueDeclare( + queueName, + durable, + false, // autoDelete + false, // exclusive + false, // noWait + nil, // args + ); err != nil { + return fmt.Errorf("声明队列失败: %w", err) + } + + // 4. 绑定队列 + if err := channel.QueueBind( + queueName, + routingKey, // routingKey 路由键 + exchangeName, // exchange 交换机名称 + false, // noWait + nil, // args + ); err != nil { + return fmt.Errorf("绑定队列失败: %w", err) + } + + // 5. 序列化数据 body, err := json.Marshal(data) if err != nil { return fmt.Errorf("序列化数据失败: %w", err) } + // 6. 发布消息 deliveryMode := amqp.Transient - if cfg.Durable { + if durable { deliveryMode = amqp.Persistent } publishing := amqp.Publishing{ @@ -73,15 +141,15 @@ func (q *rabbitMQMessageClient) publishMessage(ctx context.Context, cfg *RabbitM DeliveryMode: deliveryMode, Timestamp: time.Now(), } - if delaySeconds > 0 { + if delayMsg { + duration := time.Duration(delayTime) * time.Minute publishing.Headers = amqp.Table{ - "x-delay": delaySeconds * 1000, // 延时时间(毫秒) + "x-delay": duration, // 延迟时间(毫秒) } } - exchange, routingKey := q.parseExchangeAndRoutingKey(ctx, mode, cfg) err = channel.PublishWithContext( ctx, - exchange, + exchangeName, routingKey, false, false, publishing, @@ -89,106 +157,44 @@ func (q *rabbitMQMessageClient) publishMessage(ctx context.Context, cfg *RabbitM return err } -func (q *rabbitMQMessageClient) parseExchangeAndRoutingKey(_ context.Context, mode string, cfg *RabbitMQConfig) (exchange, routingKey string) { - switch mode { - case "work", "": - exchange = "" // 默认交换机 - routingKey = cfg.Name // 队列名 - case "event", "topic": - exchange = cfg.Exchange - routingKey = cfg.Topic - case "broadcast": - exchange = cfg.Exchange - routingKey = "" // fanout忽略路由键 - case "delayed": - exchange = cfg.Exchange + ".delayed" - routingKey = cfg.Topic - default: - exchange = "" - routingKey = cfg.Name +// Subscribe 订阅消息 +func (c *rabbitMQ) Subscribe(ctx context.Context, msgConfig messageSubscribeConfig) error { + cfg, ok := msgConfig.(*RabbitMQSubscribeMsgConfig) + if !ok { + return fmt.Errorf("无效的 RabbitMQ 配置类型") } - return exchange, routingKey + if g.IsEmpty(cfg.QueueName) { + return fmt.Errorf("队列名称不能为空") + } + if g.IsEmpty(cfg.ConsumerName) { + return fmt.Errorf("消费者名称不能为空") + } + if g.IsEmpty(cfg.PrefetchCount) { + cfg.PrefetchCount = 1 + } + if g.IsEmpty(cfg.HandleFunc) { + return fmt.Errorf("必须提供处理函数") + } + return c.createSubscribeInternal(ctx, cfg.QueueName, cfg.ConsumerName, cfg.PrefetchCount, cfg.AutoAck, cfg.HandleFunc) } -// setupQueue 统一的队列设置方法(声明 Exchange、队列、绑定、延迟 Exchange) -func (q *rabbitMQMessageClient) setupQueue(ctx context.Context, ch *amqp.Channel, cfg *RabbitMQConfig, delayMessage bool) error { - exchange, routingKey := q.parseExchangeAndRoutingKey(ctx, cfg.Mode, cfg) +// createSubscribe 内部订阅消息 +func (c *rabbitMQ) createSubscribeInternal(ctx context.Context, queueName, consumerName string, prefetchCount int, autoAck bool, handler func(ctx context.Context, message map[string]interface{}) error) error { + g.Log().Infof(ctx, "🔔 RabbitMQ 开始订阅: queueName=%s, consumerName=%s", queueName, consumerName) - // 声明 Exchange - if err := ch.ExchangeDeclare(exchange, "topic", cfg.Durable, false, false, false, nil); err != nil { - return fmt.Errorf("声明 Exchange 失败: %w", err) - } - - // 声明队列 - if _, err := ch.QueueDeclare(cfg.Queue, cfg.Durable, false, false, false, nil); err != nil { - return fmt.Errorf("声明队列失败: %w", err) - } - - // 绑定队列 - if err := ch.QueueBind(cfg.Queue, routingKey, exchange, false, nil); err != nil { - return fmt.Errorf("绑定队列失败: %w", err) - } - - // 声明延迟 Exchange(如果需要) - if delayMessage { - if err := ch.ExchangeDeclare(exchange, "x-delayed-message", true, false, false, false, amqp.Table{"x-delayed-type": "direct"}); err != nil { - return fmt.Errorf("声明延迟 Exchange 失败: %w", err) - } - if err := ch.QueueBind(cfg.Name, routingKey, exchange, false, nil); err != nil { - return fmt.Errorf("绑定延迟队列失败: %w", err) - } - } - - return nil -} - -// Subscribe 订阅消息(支持单个或批量) -func (q *rabbitMQMessageClient) subscribe(ctx context.Context, configs ...interface{}) error { - if len(configs) == 0 { - return fmt.Errorf("配置不能为空") - } - - for _, config := range configs { - cfg, ok := config.(*RabbitMQConfig) - if !ok { - return fmt.Errorf("无效的 RabbitMQ 配置类型") - } - handler := cfg.HandleFunc - if handler == nil { - return fmt.Errorf("必须提供处理函数") - } - if err := q.createSubscribe(ctx, cfg, handler); err != nil { - return err - } - } - return nil -} - -// subscribe 内部单个订阅消息 -func (q *rabbitMQMessageClient) createSubscribe(ctx context.Context, cfg *RabbitMQConfig, handler func(ctx context.Context, message map[string]interface{}) error) error { - g.Log().Infof(ctx, "🔔 RabbitMQ 开始订阅: exchange=%s, queue=%s", cfg.Exchange, cfg.Queue) - - // 设置 Qos (预取数量),控制每次推送的消息数量 - // prefetchCount: 未 ACK 消息的最大数量 - // prefetchSize: 未 ACK 消息的总大小(0 表示不限制) - // global: false 表示仅应用于当前消费者 - prefetchCount := cfg.PrefetchCount - if prefetchCount <= 0 { - prefetchCount = 10 // 默认值为 10 - } if err := channel.Qos(prefetchCount, 0, false); err != nil { return fmt.Errorf("设置 Qos 失败: %w", err) } g.Log().Infof(ctx, "📊 设置 Prefetch Count: %d", prefetchCount) msg, err := channel.Consume( - cfg.Queue, // queue - cfg.Queue, // consumer - cfg.AutoAck, // auto-ack (根据配置决定) - false, // exclusive - false, // no-local - false, // no-wait - nil, // args + queueName, // queue + consumerName, // consumer + autoAck, // auto-ack (根据配置决定) + false, // exclusive + false, // no-local + false, // no-wait + nil, // args ) if err != nil { return fmt.Errorf("注册消费者失败: %w", err) @@ -207,7 +213,7 @@ func (q *rabbitMQMessageClient) createSubscribe(ctx context.Context, cfg *Rabbit for { select { case <-ctx.Done(): - g.Log().Infof(ctx, "🔕 RabbitMQ 消费者停止: queue=%s", cfg.Queue) + g.Log().Infof(ctx, "🔕 RabbitMQ 消费者停止: queueName=%s, consumerName=%s", queueName, consumerName) return case msg, ok := <-msg: if !ok { @@ -226,11 +232,11 @@ func (q *rabbitMQMessageClient) createSubscribe(ctx context.Context, cfg *Rabbit } }() - if err := q.handleMessageWithRetry(ctx, m, handler, cfg.MaxRetry); err != nil { + if err := c.handleMessageWithRetryInternal(ctx, m, handler, autoAck); err != nil { g.Log().Errorf(ctx, "❌ 消息处理失败(重试次数耗尽): %v", err) // 仅在手动 ACK 模式下拒绝消息 - if !cfg.AutoAck { + if !autoAck { // 拒绝消息不再重新入队(避免死循环) m.Nack(false, false) } @@ -238,7 +244,7 @@ func (q *rabbitMQMessageClient) createSubscribe(ctx context.Context, cfg *Rabbit } // 仅在手动 ACK 模式下确认消息 - if cfg.AutoAck { + if autoAck { if err := m.Ack(false); err != nil { g.Log().Errorf(ctx, "❌ ACK 消息失败: %v", err) } @@ -252,7 +258,7 @@ func (q *rabbitMQMessageClient) createSubscribe(ctx context.Context, cfg *Rabbit } // handleMessageWithRetry 处理消息(支持重试) -func (q *rabbitMQMessageClient) handleMessageWithRetry(ctx context.Context, msg amqp.Delivery, handler func(ctx context.Context, message map[string]interface{}) error, maxRetry int) error { +func (c *rabbitMQ) handleMessageWithRetryInternal(ctx context.Context, msg amqp.Delivery, handler func(ctx context.Context, message map[string]interface{}) error, autoAck bool) error { var data map[string]interface{} if err := json.Unmarshal(msg.Body, &data); err != nil { @@ -263,6 +269,7 @@ func (q *rabbitMQMessageClient) handleMessageWithRetry(ctx context.Context, msg } // 重试逻辑 + const maxRetry = 3 for attempt := 0; attempt <= maxRetry; attempt++ { if attempt > 0 { g.Log().Infof(ctx, "🔄 消息处理重试 (第%d次)", attempt) diff --git a/message/redis.go b/message/redis.go deleted file mode 100644 index 67e88f1..0000000 --- a/message/redis.go +++ /dev/null @@ -1,275 +0,0 @@ -package message - -import ( - "context" - "errors" - "fmt" - "strings" - "time" - - "github.com/gogf/gf/v2/database/gredis" - "github.com/gogf/gf/v2/frame/g" - "github.com/gogf/gf/v2/os/glog" - "github.com/gogf/gf/v2/util/gconv" -) - -// StreamMessage Redis Stream 消息结构 -type StreamMessage struct { - ID string // 消息ID(自动生成) - Values map[string]interface{} // 消息内容 -} - -// getClient 获取 Redis 客户端 -func getRedisClientTest(name string) *gredis.Redis { - return g.Redis(name) -} - -// getRedisClientByDB 根据DB获取Redis客户端,如果db<=0则返回默认客户端 -func getRedisClientByDB(db int) *gredis.Redis { - if db <= 0 { - return g.Redis() - } - // 创建连接到指定DB的Redis客户端 - client, err := gredis.New(&gredis.Config{ - Address: g.Cfg().MustGet(context.Background(), "redis.default.address").String(), - Db: db, - }) - if err != nil { - glog.Errorf(context.Background(), "创建Redis客户端失败: %v", err) - return g.Redis() - } - return client -} - -// lock 分布式锁 -func lock(ctx context.Context, key string, expireSeconds int64, fn func(ctx context.Context) error) (success bool, err error) { - ds, err := GetManager().GetDefaultDataSource() - if err != nil { - return false, fmt.Errorf("获取默认数据源失败: %w", err) - } - - maxRetries := 3 - for i := 0; i < maxRetries; i++ { - if val, err := ds.Redis().Set(ctx, key, true, gredis.SetOption{ - TTLOption: gredis.TTLOption{ - EX: &expireSeconds, - }, - NX: true, - }); err != nil { - return false, err - } else { - if val.Bool() { - defer func(redisClient *gredis.Redis, ctx context.Context, key string) { - if _, err = redisClient.Del(ctx, key); err != nil { - glog.Errorf(ctx, "RedisClient.Del error: %v", err) - } - }(ds.Redis(), ctx, key) - if err = fn(ctx); err != nil { - return false, err - } - return true, nil - } else { - // 检查上下文是否已取消 - if ctx.Err() != nil { - return false, ctx.Err() - } - // 非最后一次重试时才等待 - if i < maxRetries-1 { - time.Sleep(time.Second) - } - } - } - } - return false, errors.New("锁重试次数耗尽") -} - -// publishToRedis 将消息添加到 Redis Stream -func publishToRedis(ctx context.Context, streamKey string, msg interface{}) (messageID string, err error) { - ds, err := GetManager().GetDefaultDataSource() - if err != nil { - return "", fmt.Errorf("获取默认数据源失败: %w", err) - } - - if !ds.IsConnected() { - if err := ds.Reconnect(ctx); err != nil { - return "", fmt.Errorf("redis重连失败: %w", err) - } - } - - values := gconv.Map(msg) - args := make([]interface{}, 0, len(values)*2+2) - args = append(args, streamKey, "*") - for key, val := range values { - args = append(args, key, val) - } - result, err := ds.Redis().Do(ctx, "XADD", args...) - if err != nil { - return - } - messageID = result.String() - return -} - -// initStreamGroup 初始化 Stream 和消费者组 -func initStreamGroup(ctx context.Context, streamKey, groupName string) error { - ds, err := GetManager().GetDefaultDataSource() - if err != nil { - return fmt.Errorf("获取默认数据源失败: %w", err) - } - - if !ds.IsConnected() { - if err := ds.Reconnect(ctx); err != nil { - return fmt.Errorf("redis重连失败: %w", err) - } - } - - _, err = ds.Redis().Do(ctx, "XGROUP", "CREATE", streamKey, groupName, "0", "MKSTREAM") - if err != nil { - // 如果组已存在,忽略错误 - errStr := err.Error() - // 检查错误是否是 "BUSYGROUP Consumer Group name already exists" - if strings.Contains(errStr, "BUSYGROUP") || strings.Contains(errStr, "already exists") { - // 这是一个预期的情况,说明消费者组已经存在,无需处理 - return nil - } - // 这是一个真正的错误,需要记录或处理 - return err - } - return nil -} - -// readFromStream 从 Stream 读取消息 -func readFromStream(ctx context.Context, msg QueueMessage) error { - ds, err := GetManager().GetDefaultDataSource() - if err != nil { - return fmt.Errorf("获取默认数据源失败: %w", err) - } - - // 初始化 Stream 和消费者组 - if err := initStreamGroup(ctx, msg.StreamKey, msg.GroupName); err != nil { - return err - } - go func() { - RECONNECT: - for { - result, err := ds.Redis().Do(ctx, "XREADGROUP", "GROUP", msg.GroupName, msg.ConsumerName, "COUNT", msg.BatchSize, "BLOCK", 0, "STREAMS", msg.StreamKey, ">") - if err != nil { - //select { - //case <-ctx.Done(): - // return - //} - time.Sleep(time.Second) - goto RECONNECT - } - // 检查返回结果是否为空 - if result == nil || result.IsEmpty() { - continue - } - messages := make([]StreamMessage, 0, int(msg.BatchSize)) - // 尝试 map 格式(GoFrame gredis 返回) - if streamsMap, ok := result.Val().(map[interface{}]interface{}); ok { - for _, streamMsgs := range streamsMap { - msgsArray, ok := streamMsgs.([]interface{}) - if !ok { - continue - } - for _, msgData := range msgsArray { - msgArray, ok := msgData.([]interface{}) - if !ok || len(msgArray) < 2 { - continue - } - msgID := gconv.String(msgArray[0]) - fieldsArray, ok := msgArray[1].([]interface{}) - if !ok { - continue - } - values := make(map[string]interface{}, len(fieldsArray)/2) - for i := 0; i < len(fieldsArray); i += 2 { - if i+1 < len(fieldsArray) { - key := gconv.String(fieldsArray[i]) - values[key] = fieldsArray[i+1] - } - } - messages = append(messages, StreamMessage{ - ID: msgID, - Values: values, - }) - } - } - } - // 尝试数组格式(标准 Redis 返回) - if streamsArray, ok := result.Val().([]interface{}); ok && len(streamsArray) > 0 { - for _, streamData := range streamsArray { - streamArray, ok := streamData.([]interface{}) - if !ok || len(streamArray) < 2 { - continue - } - messagesArray, ok := streamArray[1].([]interface{}) - if !ok { - continue - } - for _, msgData := range messagesArray { - msgArray, ok := msgData.([]interface{}) - if !ok || len(msgArray) < 2 { - continue - } - msgID := gconv.String(msgArray[0]) - fieldsArray, ok := msgArray[1].([]interface{}) - if !ok { - continue - } - values := make(map[string]interface{}, len(fieldsArray)/2) - for i := 0; i < len(fieldsArray); i += 2 { - if i+1 < len(fieldsArray) { - key := gconv.String(fieldsArray[i]) - values[key] = fieldsArray[i+1] - } - } - messages = append(messages, StreamMessage{ - ID: msgID, - Values: values, - }) - } - } - } - // 处理消息 - for _, streamMsg := range messages { - // 业务处理 - if err := msg.HandleFunc(ctx, streamMsg.Values); err != nil { - glog.Infof(ctx, "业务处理失败-> err:%v\n", err) - continue - } - // 确认消息 - if msg.AutoAck { - err := ackMessage(ctx, msg.StreamKey, msg.GroupName, streamMsg.ID) - if err != nil { - glog.Infof(ctx, "消费者 '%s' 确认消息 ID %s 失败: %v\n", msg.ConsumerName, streamMsg.ID, err) - } - } - } - } - }() - return nil -} - -// ackMessage 确认消息已处理 -func ackMessage(ctx context.Context, streamKey, groupName string, messageIDs ...string) error { - ds, err := GetManager().GetDefaultDataSource() - if err != nil { - return fmt.Errorf("获取默认数据源失败: %w", err) - } - - if !ds.IsConnected() { - if err := ds.Reconnect(ctx); err != nil { - return fmt.Errorf("redis重连失败: %w", err) - } - } - - args := make([]interface{}, 0, len(messageIDs)+2) - args = append(args, streamKey, groupName) - for _, id := range messageIDs { - args = append(args, id) - } - _, err = ds.Redis().Do(ctx, "XACK", args...) - return err -} diff --git a/message/redis_client.go b/message/redis_client.go deleted file mode 100644 index 2db78cb..0000000 --- a/message/redis_client.go +++ /dev/null @@ -1,468 +0,0 @@ -// ============================================================================= -// Redis 数据源连接管理 -// 使用 GoFrame 框架自带的 Redis 客户端,负责数据源的连接、重连、健康检查和优雅关闭 -// ============================================================================= - -package message - -import ( - "context" - "fmt" - "os" - "os/signal" - "sync" - "sync/atomic" - "syscall" - "time" - - "github.com/gogf/gf/v2/database/gredis" - "github.com/gogf/gf/v2/frame/g" - "github.com/gogf/gf/v2/os/glog" - "github.com/gogf/gf/v2/util/gconv" -) - -// ============================================================================= -// 数据源配置结构 -// ============================================================================= - -type RedisDataSourceConfig struct { - Name string `json:"name"` // 数据源名称 - Address string `json:"address"` // Redis 地址,如: 127.0.0.1:6379 - Db int `json:"db"` // 数据库编号 - Pass string `json:"pass"` // 密码 - Timeout time.Duration `json:"timeout"` // 连接超时 - MaxIdle int `json:"maxIdle"` // 最大空闲连接数 - MaxOpen int `json:"maxOpen"` // 最大活跃连接数 -} - -// ============================================================================= -// 单个数据源接口 -// ============================================================================= - -type DataSource interface { - Name() string - Redis() *gredis.Redis - IsConnected() bool - Connect(ctx context.Context) error - Reconnect(ctx context.Context) error - Close(ctx context.Context) error -} - -// ============================================================================= -// 数据源实现 -// ============================================================================= - -type BaseDataSource struct { - config *RedisDataSourceConfig - redis *gredis.Redis - isConnected bool - mu sync.RWMutex - lastError error - lastErrorTime time.Time - metrics RedisMetrics -} - -func NewBaseDataSource(config *RedisDataSourceConfig) *BaseDataSource { - return &BaseDataSource{ - config: config, - isConnected: false, - } -} - -func (d *BaseDataSource) Name() string { - return d.config.Name -} - -func (d *BaseDataSource) Redis() *gredis.Redis { - d.mu.RLock() - defer d.mu.RUnlock() - return d.redis -} - -func (d *BaseDataSource) IsConnected() bool { - d.mu.RLock() - defer d.mu.RUnlock() - return d.isConnected && d.redis != nil -} - -func (d *BaseDataSource) Connect(ctx context.Context) error { - d.mu.Lock() - defer d.mu.Unlock() - - // 设置默认值 - config := d.config - if config.Timeout == 0 { - config.Timeout = 10 * time.Second - } - if config.MaxIdle == 0 { - config.MaxIdle = 10 - } - if config.MaxOpen == 0 { - config.MaxOpen = 100 - } - - // 构建 GoFrame Redis 配置 - redisConfig := &gredis.Config{ - Address: config.Address, - Db: config.Db, - Pass: config.Pass, - } - - // 使用 GoFrame 的 Redis 连接 - redisObj, err := gredis.New(redisConfig) - if err != nil { - d.isConnected = false - d.lastError = err - d.lastErrorTime = time.Now() - d.metrics.PingError.Add(1) - return fmt.Errorf("datasource [%s] connection failed: %w", d.config.Name, err) - } - - d.redis = redisObj - - // 测试连接 - if err := d.Ping(ctx); err != nil { - d.isConnected = false - d.lastError = err - d.lastErrorTime = time.Now() - return fmt.Errorf("datasource [%s] ping failed: %w", d.config.Name, err) - } - - d.isConnected = true - d.lastError = nil - glog.Infof(ctx, "✅ datasource [%s] connected successfully", d.config.Name) - return nil -} - -func (d *BaseDataSource) Ping(ctx context.Context) error { - defer func() { - if r := recover(); r != nil { - d.metrics.PingError.Add(1) - glog.Errorf(ctx, "❌ datasource [%s] ping panic: %v", d.config.Name, r) - } - }() - - if d.redis == nil { - d.metrics.PingError.Add(1) - return fmt.Errorf("redis client is nil") - } - - _, err := d.redis.Do(ctx, "PING") - if err != nil { - d.metrics.PingError.Add(1) - return err - } - - d.metrics.PingCount.Add(1) - return nil -} - -func (d *BaseDataSource) Reconnect(ctx context.Context) error { - glog.Infof(ctx, "🔄 reconnecting datasource [%s]", d.config.Name) - return d.Connect(ctx) -} - -func (d *BaseDataSource) Close(ctx context.Context) error { - d.mu.Lock() - defer d.mu.Unlock() - - if d.redis != nil { - if err := d.redis.Close(ctx); err != nil { - return fmt.Errorf("datasource [%s] close failed: %w", d.config.Name, err) - } - } - - d.isConnected = false - d.redis = nil - glog.Infof(ctx, "datasource [%s] closed", d.config.Name) - return nil -} - -func (d *BaseDataSource) GetMetrics() RedisMetrics { - return d.metrics -} - -// ============================================================================= -// 监控指标 -// ============================================================================= - -type RedisMetrics struct { - PingCount atomic.Int64 - PingError atomic.Int64 - CommandCount atomic.Int64 - CommandError atomic.Int64 -} - -// GetPingMetrics 获取 Ping 相关指标 -func (m *RedisMetrics) GetPingMetrics() (int64, int64) { - return m.PingCount.Load(), m.PingError.Load() -} - -// GetCommandMetrics 获取命令相关指标 -func (m *RedisMetrics) GetCommandMetrics() (int64, int64) { - return m.CommandCount.Load(), m.CommandError.Load() -} - -// ============================================================================= -// 多数据源管理器 -// ============================================================================= - -type DataSourceManager struct { - sources map[string]DataSource - mu sync.RWMutex - ctx context.Context - cancel context.CancelFunc - started bool - maxRetries int - metrics RedisMetrics -} - -var ( - manager *DataSourceManager - once sync.Once -) - -// GetManager 获取全局管理器 -func GetManager() *DataSourceManager { - once.Do(func() { - ctx, cancel := context.WithCancel(context.Background()) - manager = &DataSourceManager{ - sources: make(map[string]DataSource), - ctx: ctx, - cancel: cancel, - started: false, - maxRetries: 3, - } - }) - return manager -} - -// RegisterDataSource 注册数据源 -func (m *DataSourceManager) RegisterDataSource(config *RedisDataSourceConfig) error { - m.mu.Lock() - defer m.mu.Unlock() - - if _, exists := m.sources[config.Name]; exists { - return fmt.Errorf("datasource [%s] already exists", config.Name) - } - - source := NewBaseDataSource(config) - m.sources[config.Name] = source - return nil -} - -// GetDataSource 获取数据源 -func (m *DataSourceManager) GetDataSource(name string) (DataSource, error) { - m.mu.RLock() - defer m.mu.RUnlock() - - source, exists := m.sources[name] - if !exists { - return nil, fmt.Errorf("datasource [%s] not found", name) - } - return source, nil -} - -// GetAllDataSourceNames 获取所有数据源名称 -func (m *DataSourceManager) GetAllDataSourceNames() []string { - m.mu.RLock() - defer m.mu.RUnlock() - - names := make([]string, 0, len(m.sources)) - for name := range m.sources { - names = append(names, name) - } - return names -} - -// GetDefaultDataSource 获取默认数据源(第一个注册的数据源) -func (m *DataSourceManager) GetDefaultDataSource() (DataSource, error) { - m.mu.RLock() - defer m.mu.RUnlock() - - for _, source := range m.sources { - return source, nil - } - return nil, fmt.Errorf("no datasource available") -} - -// GetMetrics 获取全局监控指标 -func (m *DataSourceManager) GetMetrics() RedisMetrics { - return m.metrics -} - -// init 初始化多数据源 -func init() { - ctx := context.Background() - - // 从配置初始化多数据源 - if err := GetManager().InitializeFromConfig(ctx); err != nil { - glog.Errorf(ctx, "❌ Failed to initialize Redis datasources: %v", err) - } else { - glog.Infof(ctx, "✅ Redis datasources initialized: %v", GetManager().GetAllDataSourceNames()) - } - - // 启动健康检查 - GetManager().StartHealthCheck() - - // 设置优雅关闭 - setupGracefulShutdown() -} - -// InitializeFromConfig 从配置初始化数据源 -// 动态读取 config.yml 中 redis 下的所有配置项 -func (m *DataSourceManager) InitializeFromConfig(ctx context.Context) error { - var firstErr error - - // 获取 redis 配置下的所有子键 - redisConfig := g.Cfg().MustGet(ctx, "redis") - if redisConfig.IsNil() { - glog.Warningf(ctx, "no redis configuration found in config.yml") - return nil - } - - // 将配置转换为 map - configMap := redisConfig.Map() - if configMap == nil { - glog.Warningf(ctx, "redis configuration is not a map") - return nil - } - - // 遍历所有 redis 子配置 - for name, subConfig := range configMap { - // 跳过非对象类型的配置 - subMap, ok := subConfig.(map[string]interface{}) - if !ok { - continue - } - - // 检查是否有 address 配置 - address, hasAddress := subMap["address"] - if !hasAddress || gconv.String(address) == "" { - continue - } - - // 构建数据源配置 - config := &RedisDataSourceConfig{ - Name: name, - Address: gconv.String(address), - Db: gconv.Int(subMap["db"]), - Pass: gconv.String(subMap["pass"]), - } - - // 设置默认值 - if config.Db == 0 { - config.Db = 0 - } - if config.Timeout == 0 { - config.Timeout = 10 * time.Second - } - if config.MaxIdle == 0 { - config.MaxIdle = 10 - } - if config.MaxOpen == 0 { - config.MaxOpen = 100 - } - - // 注册数据源 - if err := m.RegisterDataSource(config); err != nil { - glog.Errorf(ctx, "failed to register datasource [%s]: %v", name, err) - if firstErr == nil { - firstErr = err - } - continue - } - - // 连接数据源 - source, _ := m.GetDataSource(name) - if err := source.Connect(ctx); err != nil { - glog.Errorf(ctx, "failed to initialize datasource [%s]: %v", name, err) - if firstErr == nil { - firstErr = err - } - } - } - - return firstErr -} - -// StartHealthCheck 启动健康检查 -func (m *DataSourceManager) StartHealthCheck() { - if m.started { - return - } - m.started = true - go m.healthCheckLoop() -} - -// healthCheckLoop 健康检查循环 -func (m *DataSourceManager) healthCheckLoop() { - ticker := time.NewTicker(30 * time.Second) - defer ticker.Stop() - - for { - select { - case <-m.ctx.Done(): - return - case <-ticker.C: - m.checkAndReconnect() - } - } -} - -// checkAndReconnect 检查并重新连接 -func (m *DataSourceManager) checkAndReconnect() { - m.mu.RLock() - defer m.mu.RUnlock() - - for name, source := range m.sources { - if !source.IsConnected() { - glog.Warningf(context.Background(), "datasource [%s] disconnected, attempting reconnect", name) - - reconnectCtx, cancel := context.WithTimeout(context.Background(), 30*time.Second) - defer cancel() - - if err := source.Reconnect(reconnectCtx); err != nil { - glog.Errorf(reconnectCtx, "datasource [%s] reconnect failed: %v", name, err) - } else { - glog.Infof(reconnectCtx, "✅ datasource [%s] reconnected successfully", name) - } - } - } -} - -// CloseAll 关闭所有数据源 -func (m *DataSourceManager) CloseAll(ctx context.Context) error { - m.cancel() - - m.mu.RLock() - defer m.mu.RUnlock() - - var lastErr error - for name, source := range m.sources { - if err := source.Close(ctx); err != nil { - glog.Errorf(ctx, "failed to close datasource [%s]: %v", name, err) - lastErr = err - } - } - return lastErr -} - -// setupGracefulShutdown 设置优雅关闭 -func setupGracefulShutdown() { - go func() { - sigCh := make(chan os.Signal, 1) - signal.Notify(sigCh, syscall.SIGINT, syscall.SIGTERM) - <-sigCh - - ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second) - defer cancel() - - glog.Info(ctx, "🔄 Shutting down Redis connections...") - if err := GetManager().CloseAll(ctx); err != nil { - glog.Errorf(ctx, "❌ Failed to close Redis connections: %v", err) - } else { - glog.Info(ctx, "✅ Redis connections closed successfully") - } - }() -} diff --git a/message/redis_msg.go b/message/redis_msg.go index 135a78c..38dd331 100644 --- a/message/redis_msg.go +++ b/message/redis_msg.go @@ -2,7 +2,6 @@ package message import ( "context" - "encoding/json" "fmt" "strings" "time" @@ -12,151 +11,137 @@ import ( "github.com/gogf/gf/v2/util/gconv" ) -// redisMessageClient Redis 实现 -type redisMessageClient struct { - clientType messageClientType +type RedisPublishMsgConfig struct { + QueueName string + Data any } +type RedisSubscribeMsgConfig struct { + QueueName string + ConsumerName string + AutoAck bool + PrefetchCount int + HandleFunc func(ctx context.Context, message map[string]interface{}) error +} + +func (*RedisPublishMsgConfig) GetPublishMsgType() { + +} + +func (*RedisSubscribeMsgConfig) GetSubscribeMsgType() { + +} + +func init() { + // 注册 Redis 插件(连接由 RegisterPlugin 异步处理) + registerPlugin(MessageRedis, func() messageUtil { + return &redis{} + }) +} + +type redis struct{} + // RedisStreamMessage Redis Stream 消息结构 -type RedisStreamMessage struct { +type redisStreamMessage struct { ID string Values map[string]interface{} } -// StreamGroup 创建消费组(支持单个或批量) -func (q *redisMessageClient) streamGroup(ctx context.Context, configs ...interface{}) error { - if len(configs) == 0 { - return fmt.Errorf("配置不能为空") +// Ping 检测 Redis 连接状态 +func (c *redis) ping(ctx context.Context) bool { + conn, err := getDefaultDataSource() + if err != nil { + return false } - for _, config := range configs { - cfg, ok := config.(*RedisConfig) - if !ok { - return fmt.Errorf("无效的 Redis 配置类型") - } - if err := q.createStreamGroup(ctx, cfg); err != nil { - return err - } + return conn.redisPing(ctx) +} + +// Reconnect 重连 Redis +func (c *redis) reconnect(ctx context.Context) error { + conn, err := getDefaultDataSource() + if err != nil { + return fmt.Errorf("获取默认连接失败: %w", err) } + + if err := conn.redisReconnect(ctx); err != nil { + return fmt.Errorf("redis重连失败: %w", err) + } + return nil } -// streamGroup 内部单个创建消费组 -func (q *redisMessageClient) createStreamGroup(ctx context.Context, cfg *RedisConfig) error { - // 获取默认数据源 - ds, err := GetManager().GetDefaultDataSource() +// Close 关闭 Redis 连接 +func (c *redis) close(ctx context.Context) error { + conn, err := getDefaultDataSource() if err != nil { - return fmt.Errorf("获取默认数据源失败: %w", err) + return fmt.Errorf("获取默认连接失败: %w", err) } - // 检查连接状态,未连接则自动重连 - if !ds.IsConnected() { - if err := ds.Reconnect(ctx); err != nil { - return fmt.Errorf("redis重连失败: %w", err) - } + if err := conn.redisClose(ctx); err != nil { + return fmt.Errorf("关闭redis连接失败: %w", err) } - _, err = ds.Redis().Do(ctx, "XGROUP", "CREATE", cfg.Stream, cfg.Group, "0", "MKSTREAM") - if err != nil { - errStr := err.Error() - if strings.Contains(errStr, "BUSYGROUP") && strings.Contains(errStr, "already exists") { - glog.Infof(ctx, "✅ Redis 消费者组已存在: %s", cfg.Group) - return nil - } - return fmt.Errorf("初始化消费者组失败: %w", err) - } - glog.Infof(ctx, "✅ Redis 消费者组创建成功: %s", cfg.Group) return nil } -// Publish 内部单个发布消息 -func (q *redisMessageClient) publish(ctx context.Context, config interface{}, data interface{}) error { - ds, err := GetManager().GetDefaultDataSource() - if err != nil { - return fmt.Errorf("获取默认数据源失败: %w", err) - } - - if !ds.IsConnected() { - if err := ds.Reconnect(ctx); err != nil { - return fmt.Errorf("redis重连失败: %w", err) - } - } - - cfg, ok := config.(*RedisConfig) +// Publish 发布消息 +func (c *redis) Publish(ctx context.Context, msgConfig messagePublishConfig) error { + cfg, ok := msgConfig.(*RedisPublishMsgConfig) if !ok { - return fmt.Errorf("无效的redis配置类型") + return fmt.Errorf("无效的 Redis 配置类型") } - values := gconv.Map(data) + if g.IsEmpty(cfg.QueueName) { + return fmt.Errorf("队列名称不能为空") + } + if g.IsEmpty(cfg.Data) { + return fmt.Errorf("数据不能为空") + } + conn, err := getDefaultDataSource() + if err != nil { + return fmt.Errorf("获取默认连接失败: %w", err) + } + + if !conn.getIsConnected() { + if err := conn.redisReconnect(ctx); err != nil { + return fmt.Errorf("redis重连失败: %w", err) + } + } + + values := gconv.Map(cfg.Data) args := make([]interface{}, 0, len(values)*2+2) - args = append(args, cfg.Stream, "*") + args = append(args, cfg.QueueName, "*") for key, val := range values { args = append(args, key, val) } - result, err := ds.Redis().Do(ctx, "XADD", args...) + result, err := conn.getClient().Do(ctx, "XADD", args...) if err != nil { - g.Log().Errorf(ctx, "❌ Redis 发布消息失败: topic=%s, err=%v", cfg.Stream, err) + g.Log().Errorf(ctx, "❌ Redis 发布消息失败: key=%s, err=%v", cfg.QueueName, err) return err } - g.Log().Infof(ctx, "✅ Redis 发布消息成功: topic=%s, messageID=%s", cfg.Stream, gconv.String(result)) + g.Log().Infof(ctx, "✅ Redis 发布消息成功: key=%s, messageID=%s", cfg.QueueName, gconv.String(result)) return nil } -// PublishDelayed 发布延迟消息(使用 ZSET) -func (q *redisMessageClient) publishDelayed(ctx context.Context, config interface{}, data interface{}, delay int) error { - ds, err := GetManager().GetDefaultDataSource() - if err != nil { - return fmt.Errorf("获取默认数据源失败: %w", err) - } - - if !ds.IsConnected() { - if err := ds.Reconnect(ctx); err != nil { - return fmt.Errorf("redis重连失败: %w", err) - } - } - - cfg, ok := config.(*RedisConfig) +// Subscribe 订阅消息 +func (c *redis) Subscribe(ctx context.Context, msgConfig messageSubscribeConfig) error { + cfg, ok := msgConfig.(*RedisSubscribeMsgConfig) if !ok { - return fmt.Errorf("无效的redis配置类型") + return fmt.Errorf("无效的 Redis 配置类型") } - payload, err := json.Marshal(data) - if err != nil { - return fmt.Errorf("序列化数据失败: %w", err) + if g.IsEmpty(cfg.QueueName) { + return fmt.Errorf("队列名称不能为空") } - score := float64(time.Now().Add(time.Duration(delay)).UnixMilli()) - delayedKey := fmt.Sprintf("delayed:%s", cfg.Stream) - - // ZADD delayedKey score payload - _, err = ds.Redis().Do(ctx, "ZADD", delayedKey, score, string(payload)) - if err != nil { - return err + if g.IsEmpty(cfg.ConsumerName) { + return fmt.Errorf("消费者名称不能为空") } - - g.Log().Infof(ctx, "✅ Redis 延迟消息已发布: topic=%s, delay=%v", cfg.Stream, delay) - return nil + if g.IsEmpty(cfg.HandleFunc) { + return fmt.Errorf("处理函数不能为空") + } + return c.createSubscribe(ctx, cfg.QueueName, cfg.ConsumerName, cfg.PrefetchCount, cfg.AutoAck, cfg.HandleFunc) } -// Subscribe 订阅消息(支持单个或批量) -func (q *redisMessageClient) subscribe(ctx context.Context, configs ...interface{}) error { - if len(configs) == 0 { - return fmt.Errorf("配置不能为空") - } - for _, config := range configs { - cfg, ok := config.(*RedisConfig) - if !ok { - return fmt.Errorf("无效的 Redis 配置类型") - } - handler := cfg.HandleFunc - if handler == nil { - return fmt.Errorf("必须提供处理函数") - } - if err := q.createSubscribe(ctx, cfg, handler); err != nil { - return err - } - } - return nil -} - -// subscribe 内部单个订阅消息 -func (q *redisMessageClient) createSubscribe(ctx context.Context, cfg *RedisConfig, handler func(ctx context.Context, message map[string]interface{}) error) error { +// createSubscribe 内部订阅消息 +func (c *redis) createSubscribe(ctx context.Context, key, consumerName string, prefetchCount int, autoAck bool, handler func(ctx context.Context, message map[string]interface{}) error) error { go func() { defer func() { if r := recover(); r != nil { @@ -174,10 +159,10 @@ func (q *redisMessageClient) createSubscribe(ctx context.Context, cfg *RedisConf for { select { case <-ctx.Done(): - g.Log().Infof(ctx, "🔕 Redis 消费者停止: topic=%s", cfg.Stream) + g.Log().Infof(ctx, "🔕 Redis 消费者停止: topic=%s", key) return case <-retryTicker.C: - err := q.consumeMessages(ctx, cfg, handler) + err := c.consumeMessages(ctx, key, consumerName, prefetchCount, autoAck, handler) if err != nil { // 对于超时错误,返回nil继续循环,而不是返回错误 if strings.Contains(err.Error(), "i/o timeout") || strings.Contains(err.Error(), "timeout") || @@ -216,25 +201,25 @@ func (q *redisMessageClient) createSubscribe(ctx context.Context, cfg *RedisConf } // consumeMessages 消费消息 -func (q *redisMessageClient) consumeMessages(ctx context.Context, cfg *RedisConfig, handler func(ctx context.Context, message map[string]interface{}) error) error { - ds, err := GetManager().GetDefaultDataSource() +func (c *redis) consumeMessages(ctx context.Context, key, consumerName string, prefetchCount int, autoAck bool, handler func(ctx context.Context, message map[string]interface{}) error) error { + conn, err := getDefaultDataSource() if err != nil { - return fmt.Errorf("获取默认数据源失败: %w", err) + return fmt.Errorf("获取默认连接失败: %w", err) } - if !ds.IsConnected() { - if err := ds.Reconnect(ctx); err != nil { + if !conn.getIsConnected() { + if err := conn.redisReconnect(ctx); err != nil { return fmt.Errorf("redis重连失败: %w", err) } } // 检查消费者组是否存在 - if err := q.createStreamGroup(ctx, cfg); err != nil { + if err := c.createStreamGroup(ctx, key); err != nil { return fmt.Errorf("create stream group failed: %w", err) } // 使用带重试的命令执行 - result, err := ds.Redis().Do(ctx, "XREADGROUP", "GROUP", cfg.Group, cfg.Consumer, "COUNT", cfg.Count, "BLOCK", 0, "STREAMS", cfg.Stream, ">") + result, err := conn.getClient().Do(ctx, "XREADGROUP", "GROUP", "default", consumerName, "COUNT", prefetchCount, "BLOCK", 0, "STREAMS", key, ">") if err != nil { if strings.Contains(err.Error(), "i/o timeout") || strings.Contains(err.Error(), "timeout") || strings.Contains(err.Error(), "context deadline exceeded") || strings.Contains(err.Error(), "context canceled") { @@ -242,7 +227,7 @@ func (q *redisMessageClient) consumeMessages(ctx context.Context, cfg *RedisConf } return err } - messages, err := q.parseStreamResult(result) + messages, err := c.parseStreamResult(result) if err != nil { return err } @@ -254,8 +239,8 @@ func (q *redisMessageClient) consumeMessages(ctx context.Context, cfg *RedisConf } // ACK 消息 - if cfg.AutoAck { - if err := q.ackMessage(ctx, cfg.Stream, cfg.Group, msg.ID); err != nil { + if autoAck { + if err := c.ackMessage(ctx, key, "default", msg.ID); err != nil { g.Log().Errorf(ctx, "❌ ACK 消息失败: messageID=%s, err=%v", msg.ID, err) } } @@ -264,15 +249,42 @@ func (q *redisMessageClient) consumeMessages(ctx context.Context, cfg *RedisConf return nil } -// ackMessage ACK 消息 -func (q *redisMessageClient) ackMessage(ctx context.Context, streamKey, groupName string, messageIDs ...string) error { - ds, err := GetManager().GetDefaultDataSource() +// createStreamGroup 内部单个创建消费组 +func (c *redis) createStreamGroup(ctx context.Context, key string) error { + conn, err := getDefaultDataSource() if err != nil { - return fmt.Errorf("获取默认数据源失败: %w", err) + return fmt.Errorf("获取默认连接失败: %w", err) } - if !ds.IsConnected() { - if err := ds.Reconnect(ctx); err != nil { + if !conn.getIsConnected() { + if err := conn.redisReconnect(ctx); err != nil { + return fmt.Errorf("redis重连失败: %w", err) + } + } + + groupName := "default" + _, err = conn.getClient().Do(ctx, "XGROUP", "CREATE", key, groupName, "0", "MKSTREAM") + if err != nil { + errStr := err.Error() + if strings.Contains(errStr, "BUSYGROUP") && strings.Contains(errStr, "already exists") { + glog.Infof(ctx, "✅ Redis 消费者组已存在: %s", groupName) + return nil + } + return fmt.Errorf("初始化消费者组失败: %w", err) + } + glog.Infof(ctx, "✅ Redis 消费者组创建成功: %s", groupName) + return nil +} + +// ackMessage ACK 消息 +func (c *redis) ackMessage(ctx context.Context, streamKey, groupName string, messageIDs ...string) error { + conn, err := getDefaultDataSource() + if err != nil { + return fmt.Errorf("获取默认连接失败: %w", err) + } + + if !conn.getIsConnected() { + if err := conn.redisReconnect(ctx); err != nil { return fmt.Errorf("redis重连失败: %w", err) } } @@ -282,14 +294,14 @@ func (q *redisMessageClient) ackMessage(ctx context.Context, streamKey, groupNam for _, id := range messageIDs { args = append(args, id) } - _, err = ds.Redis().Do(ctx, "XACK", args...) + _, err = conn.getClient().Do(ctx, "XACK", args...) return err } // parseStreamResult 解析 Stream 结果 -func (q *redisMessageClient) parseStreamResult(result interface{}) ([]RedisStreamMessage, error) { +func (c *redis) parseStreamResult(result interface{}) ([]redisStreamMessage, error) { if result == nil { - return []RedisStreamMessage{}, nil + return []redisStreamMessage{}, nil } var resultVal interface{} @@ -303,15 +315,15 @@ func (q *redisMessageClient) parseStreamResult(result interface{}) ([]RedisStrea // 检查是否为空 if resultVal == nil { - return []RedisStreamMessage{}, nil + return []redisStreamMessage{}, nil } // 预分配切片容量,避免多次扩容 - messages := make([]RedisStreamMessage, 0) + messages := make([]redisStreamMessage, 0) if streamsMap, ok := resultVal.(map[interface{}]interface{}); ok { - for _, streamMsg := range streamsMap { - msgArray, ok := streamMsg.([]interface{}) + for _, streamData := range streamsMap { + msgArray, ok := streamData.([]interface{}) if !ok { continue } @@ -332,7 +344,7 @@ func (q *redisMessageClient) parseStreamResult(result interface{}) ([]RedisStrea values[key] = fieldsArray[i+1] } } - messages = append(messages, RedisStreamMessage{ + messages = append(messages, redisStreamMessage{ ID: msgID, Values: values, }) diff --git a/middleware/module_tenant_check.go b/middleware/module_tenant_check.go index 1eff872..2ba3518 100644 --- a/middleware/module_tenant_check.go +++ b/middleware/module_tenant_check.go @@ -6,7 +6,7 @@ import ( "fmt" "gitee.com/red-future---jilin-g/common/beans" "gitee.com/red-future---jilin-g/common/message" - "gitee.com/red-future---jilin-g/common/nats" + "gitee.com/red-future---jilin-g/common/redis" "gitee.com/red-future---jilin-g/common/utils" "github.com/gogf/gf/v2/database/gredis" "github.com/gogf/gf/v2/frame/g" @@ -20,7 +20,7 @@ import ( func ModuleTenantCheck(r *ghttp.Request) { // 检查是否是超级管理员 isSuperAdmin := false - if err := nats.CallRPC(r.Context(), "userService.IsSuperAdmin", nil, &isSuperAdmin); err != nil { + if err := message.CallRPC(r.Context(), "userService.IsSuperAdmin", nil, &isSuperAdmin); err != nil { SetResponseInfo(r.Context(), r, http.StatusPaymentRequired, err) } // 如果是超级管理员,则不进行模块租户检查 @@ -33,7 +33,7 @@ func ModuleTenantCheck(r *ghttp.Request) { SetResponseInfo(r.Context(), r, http.StatusPaymentRequired, err) } exit := gconv.Int64(time.Minute * 1) - getEX, err := message.GetRedisClientTest("test").GetEX(r.Context(), fmt.Sprintf("module_tenant:tenantId-%v", getUserInfo.TenantId), gredis.GetEXOption{ + getEX, err := redis.GetRedisClientTest("test").GetEX(r.Context(), fmt.Sprintf("module_tenant:tenantId-%v", getUserInfo.TenantId), gredis.GetEXOption{ TTLOption: gredis.TTLOption{ EX: &exit, }, @@ -68,7 +68,7 @@ func ModuleTenantCheck(r *ghttp.Request) { ModuleKey: moduleKey, TenantId: gconv.Uint64(getUserInfo.TenantId), } - err = nats.CallRPC(r.Context(), "moduleService.Check", &checkReq, checkRes) + err = message.CallRPC(r.Context(), "moduleService.Check", &checkReq, checkRes) if err != nil { SetResponseInfo(r.Context(), r, http.StatusPaymentRequired, err) } diff --git a/redis/redis.go b/redis/redis.go index d8dd359..8bcb742 100644 --- a/redis/redis.go +++ b/redis/redis.go @@ -28,9 +28,9 @@ func getClient() *gredis.Redis { return redisClient } -// GetRedisClient 获取 Redis 客户端(供外部使用) -func GetRedisClient() *gredis.Redis { - return getClient() +// getClient 获取 Redis 客户端 临时方法 +func GetRedisClientTest(name string) *gredis.Redis { + return g.Redis(name) } // RedisClient 获取 Redis 客户端(函数式,确保单例正确初始化)