重构消息队列模块,新增NATS和RabbitMQ连接实现,移除旧版消息队列代码

This commit is contained in:
2026-01-31 05:17:14 +08:00
committed by 张斌
parent 6acdbb6e88
commit ee6d3c9033
17 changed files with 966 additions and 2391 deletions

144
message/connection_nats.go Normal file
View File

@@ -0,0 +1,144 @@
package message
import (
"context"
"fmt"
"sync"
"time"
"github.com/gogf/gf/v2/frame/g"
"github.com/nats-io/nats.go"
"github.com/nats-io/nats.go/jetstream"
)
var (
nc *nats.Conn
js jetstream.JetStream
natsMu sync.RWMutex
)
// natsConnect 建立 NATS 连接
func natsConnect(ctx context.Context) error {
natsMu.Lock()
defer natsMu.Unlock()
// 安全地关闭旧连接
if oldConn := nc; oldConn != nil && !oldConn.IsClosed() {
oldConn.Close()
}
// 从配置文件读取 NATS 地址
natsURL := g.Cfg().MustGet(ctx, "nats.url").String()
if natsURL == "" {
// 默认使用本地地址
natsURL = nats.DefaultURL
}
// 使用独立的日志上下文,避免使用外部可能被取消的上下文
logCtx := context.Background()
// 连接选项配置
opts := []nats.Option{
nats.Name("goframe-nats-client"),
nats.ReconnectWait(2 * time.Second),
nats.MaxReconnects(-1), // 无限重连
nats.PingInterval(10 * time.Second),
nats.MaxPingsOutstanding(5),
nats.ReconnectHandler(func(nc *nats.Conn) {
g.Log().Infof(logCtx, "✅ NATS 重连成功: %s", nc.ConnectedUrl())
natsMu.Lock()
defer natsMu.Unlock()
// 重新创建 JetStream 实例
if newJS, err := jetstream.New(nc); err == nil {
js = newJS
}
}),
nats.DisconnectErrHandler(func(nc *nats.Conn, err error) {
g.Log().Warningf(logCtx, "⚠️ NATS 连接断开: %v, 准备重连...", err)
}),
nats.ClosedHandler(func(nc *nats.Conn) {
g.Log().Infof(logCtx, "NATS 连接已关闭: %s", nc.ConnectedUrl())
}),
nats.ErrorHandler(func(nc *nats.Conn, sub *nats.Subscription, err error) {
g.Log().Errorf(logCtx, "NATS 错误: %v", err)
}),
}
var err error
nc, err = nats.Connect(natsURL, opts...)
if err != nil {
return fmt.Errorf("NATS 连接失败: %w", err)
}
// 等待连接就绪
if nc.Status() != nats.CONNECTED {
select {
case <-time.After(5 * time.Second):
// 连接超时,清理资源
if nc != nil {
nc.Close()
}
return fmt.Errorf("NATS 连接超时")
case <-nc.StatusChanged(nats.CONNECTED):
// 连接成功
case <-ctx.Done():
// 外部上下文被取消,清理资源
if nc != nil {
nc.Close()
}
return fmt.Errorf("NATS 连接被取消: %w", ctx.Err())
}
}
// 创建 JetStream 实例
js, err = jetstream.New(nc)
if err != nil {
// 创建 JetStream 失败,清理连接
if nc != nil {
nc.Close()
}
return fmt.Errorf("创建 JetStream 失败: %w", err)
}
g.Log().Infof(ctx, "✅ NATS 连接成功: %s", nc.ConnectedUrl())
return nil
}
// natsPing 检测 NATS 连接状态
func natsPing() bool {
natsMu.RLock()
defer natsMu.RUnlock()
if nc == nil || nc.IsClosed() {
return false
}
// 使用 NATS 的状态检查
if nc.Status() != nats.CONNECTED {
return false
}
return true
}
// natsReconnect 重连 NATS
func natsReconnect(ctx context.Context) error {
if err := natsConnect(ctx); err != nil {
return fmt.Errorf("nats重连失败: %w", err)
}
return nil
}
// natsClose 关闭 NATS 连接
func natsClose(ctx context.Context) error {
natsMu.Lock()
defer natsMu.Unlock()
if nc == nil || nc.IsClosed() {
return nil // 连接已经关闭或不存在
}
nc.Close()
g.Log().Infof(ctx, "✅ NATS 连接已关闭")
return nil
}

View File

@@ -0,0 +1,109 @@
package message
import (
"context"
"fmt"
"github.com/gogf/gf/v2/frame/g"
"github.com/gogf/gf/v2/util/gconv"
amqp "github.com/rabbitmq/amqp091-go"
"sync"
"time"
)
var (
conn *amqp.Connection
channel *amqp.Channel
rabbitmqMu sync.RWMutex
)
// config RabbitMQ 配置
type config struct {
Host string
Port int
Username string
Password string
VHost string
}
func rabbitmqConnect(ctx context.Context) error {
rabbitmqMu.Lock()
defer rabbitmqMu.Unlock()
LOOP:
cfg := &config{
Host: g.Cfg().MustGet(ctx, "rabbitmq.host").String(),
Port: g.Cfg().MustGet(ctx, "rabbitmq.port").Int(),
Username: g.Cfg().MustGet(ctx, "rabbitmq.username").String(),
Password: g.Cfg().MustGet(ctx, "rabbitmq.password").String(),
VHost: g.Cfg().MustGet(ctx, "rabbitmq.vhost", "/").String(),
}
url := "amqp://" + cfg.Username + ":" + cfg.Password + "@" + cfg.Host + ":" + gconv.String(cfg.Port) + "/" + cfg.VHost
var err error
conn, err = amqp.Dial(url)
if err != nil {
g.Log().Errorf(ctx, "重连失败: %v", err)
time.Sleep(2 * time.Second)
goto LOOP
}
channel, err = conn.Channel()
if err != nil {
g.Log().Errorf(ctx, "创建 Channel 失败: %v", err)
time.Sleep(2 * time.Second)
goto LOOP
}
g.Log().Info(ctx, "RabbitMQ 重连成功")
return nil
}
// rabbitmqReconnect 重新连接
func rabbitmqReconnect(ctx context.Context) error {
if err := rabbitmqConnect(ctx); err != nil {
return fmt.Errorf("nats重连失败: %w", err)
}
return nil
}
// rabbitmqPing 检测 RabbitMQ 连接状态
func rabbitmqPing() bool {
rabbitmqMu.RLock()
defer rabbitmqMu.RUnlock()
if conn == nil || conn.IsClosed() {
return false
}
return true
}
// rabbitmqClose 关闭连接
func rabbitmqClose(ctx context.Context) error {
rabbitmqMu.Lock()
defer rabbitmqMu.Unlock()
var lastErr error
if channel != nil {
if err := channel.Close(); err != nil {
g.Log().Errorf(ctx, "关闭 RabbitMQ Channel 失败: %v", err)
lastErr = err
}
channel = nil
}
if conn != nil {
if err := conn.Close(); err != nil {
g.Log().Errorf(ctx, "关闭 RabbitMQ 连接失败: %v", err)
lastErr = err
}
conn = nil
}
g.Log().Info(ctx, "RabbitMQ 连接已关闭")
return lastErr
}

View File

@@ -1,174 +0,0 @@
package message
import (
"context"
"github.com/gogf/gf/v2/database/gredis"
"github.com/gogf/gf/v2/errors/gerror"
)
func GetRedisClientTest(name string) *gredis.Redis {
return getRedisClientTest(name)
}
// GetLock 获取分布式锁
func GetLock(ctx context.Context, key string, expireSeconds int64, fn func(ctx context.Context) error) (success bool, err error) {
return lock(ctx, key, expireSeconds, fn)
}
// MessageConfig 消息配置接口
type MessageConfig interface {
start(ctx context.Context) error
publish(ctx context.Context, data interface{}, options ...map[string]interface{}) (messageID string, err error)
}
// RedisMessageConfig Redis Stream 消息配置
type RedisMessageConfig struct {
StreamKey string // Stream 键名
GroupName string // 消费者组名称
ConsumerName string // 消费者名称
BatchSize int64 // 最大并发数(信号量容量)
AutoAck bool // ACK确认,true自动确认,false手动确认
HandleFunc func(ctx context.Context, message map[string]interface{}) error
}
func (r *RedisMessageConfig) start(ctx context.Context) error {
return readFromStream(ctx, QueueMessage{
StreamKey: r.StreamKey,
GroupName: r.GroupName,
ConsumerName: r.ConsumerName,
BatchSize: r.BatchSize,
AutoAck: r.AutoAck,
HandleFunc: r.HandleFunc,
})
}
func (r *RedisMessageConfig) publish(ctx context.Context, data interface{}, options ...map[string]interface{}) (messageID string, err error) {
return publishToRedis(ctx, r.StreamKey, data)
}
// RabbitMQMessageConfig RabbitMQ 消息配置
type RabbitMQMessageConfig struct {
Queue string // 队列名称
Exchange string // 交换器名称
RoutingKey string // 路由键
PrefetchCount int // QoS: 预取数量(并发控制)
WorkerCount int // worker 数量
ConsumerTag string // 消费者标签
HandleFunc func(ctx context.Context, message map[string]interface{}) error
}
func (r *RabbitMQMessageConfig) start(ctx context.Context) error {
return startRabbitMQConsumer(ctx, QueueMessage{
Queue: r.Queue,
Exchange: r.Exchange,
RoutingKey: r.RoutingKey,
PrefetchCount: r.PrefetchCount,
WorkerCount: r.WorkerCount,
ConsumerTag: r.ConsumerTag,
AutoAck: true,
HandleFunc: r.HandleFunc,
})
}
func (r *RabbitMQMessageConfig) publish(ctx context.Context, data interface{}, options ...map[string]interface{}) (messageID string, err error) {
opts := make(map[string]interface{})
if len(options) > 0 {
opts = options[0]
}
exchange := r.Exchange
routingKey := r.RoutingKey
delay := 0
if v, ok := opts["exchange"].(string); ok {
exchange = v
}
if v, ok := opts["routingKey"].(string); ok {
routingKey = v
}
if v, ok := opts["delay"].(int); ok {
delay = v
}
if delay > 0 {
return publishDelayedToRabbitMQ(ctx, exchange, routingKey, data, delay)
}
return publishToRabbitMQ(ctx, exchange, routingKey, data)
}
// QueueMessage 统一消息队列配置结构体(内部使用)
type QueueMessage struct {
// Redis Stream 配置
StreamKey string
GroupName string
ConsumerName string
BatchSize int64
AutoAck bool
HandleFunc func(ctx context.Context, message map[string]interface{}) error
// RabbitMQ 配置
Queue string
Exchange string
RoutingKey string
PrefetchCount int
WorkerCount int
ConsumerTag string
}
// StartConsumers 启动消息消费者(统一入口)
// 支持同时启动多个消费者,包括 Redis Stream 和 RabbitMQ
func StartConsumers(ctx context.Context, configs ...MessageConfig) error {
for _, cfg := range configs {
if err := cfg.start(ctx); err != nil {
return gerror.Wrap(err, "启动消费者失败")
}
}
return nil
}
// PublishMessage 发布消息(统一入口)
// 根据配置类型选择发布到 Redis Stream 或 RabbitMQ
func PublishMessage(ctx context.Context, cfg MessageConfig, data interface{}, options ...map[string]interface{}) (messageID string, err error) {
return cfg.publish(ctx, data, options...)
}
// ========== Redis Stream 公共方法(方便迁移) ==========
// AddToStream 将消息添加到 Redis Stream
//func AddToStream(ctx context.Context, streamKey string, msg interface{}) (messageID string, err error) {
// return addToStream(ctx, streamKey, msg)
//}
// ReadFromStream 从 Redis Stream 读取消息(已废弃)
// 请使用 RedisMessageConfig.StartConsumers 启动消费者
// 此方法保留用于向后兼容,但实际不会返回消息(异步消费模式)
func ReadFromStream(ctx context.Context, streamKey, groupName, consumerName string, count, blockMs int64) ([]StreamMessage, error) {
return nil, gerror.New("ReadFromStream 已废弃,请使用 RedisMessageConfig.StartConsumers 启动消费者")
}
// AckMessage 确认 Redis Stream 消息
func AckMessage(ctx context.Context, streamKey, groupName string, messageIDs ...string) error {
return ackMessage(ctx, streamKey, groupName, messageIDs...)
}
// InitStreamGroup 初始化 Redis Stream 消费者组
func InitStreamGroup(ctx context.Context, streamKey, groupName string) error {
return initStreamGroup(ctx, streamKey, groupName)
}
// ========== RabbitMQ 公共方法(方便迁移) ==========
// InitRabbitMQ 初始化 RabbitMQ 连接
func InitRabbitMQ(ctx context.Context) error {
return initRabbitMQ(ctx)
}
// PublishToRabbitMQ 发布消息到 RabbitMQ
//func PublishToRabbitMQ(ctx context.Context, exchange, routingKey string, message interface{}) error {
// return publishToRabbitMQ(ctx, exchange, routingKey, message)
//}
// PublishDelayedToRabbitMQ 发布延时消息到 RabbitMQ
//func PublishDelayedToRabbitMQ(ctx context.Context, exchange, routingKey string, message interface{}, delaySeconds int) error {
// return publishDelayedToRabbitMQ(ctx, exchange, routingKey, message, delaySeconds)
//}

26
message/msg_interfaces.go Normal file
View File

@@ -0,0 +1,26 @@
package message
import "context"
type messagePublishConfig interface {
GetPublishMsgType()
}
type messageSubscribeConfig interface {
GetSubscribeMsgType()
}
// messageUtil 消息队列公共配置接口
// 只暴露核心的发布/订阅功能,配置访问器方法不需要在公共接口中
type messageUtil interface {
// Publish 发布消息
Publish(ctx context.Context, msg messagePublishConfig) error
// Subscribe 订阅消息
Subscribe(ctx context.Context, msg messageSubscribeConfig) error
// Ping 检测连接状态
ping(ctx context.Context) bool
// Reconnect 重连
reconnect(ctx context.Context) error
// Close 关闭连接
close(ctx context.Context) error
}

View File

@@ -0,0 +1,133 @@
package message
import (
"context"
"fmt"
"sync"
"time"
"github.com/gogf/gf/v2/frame/g"
)
// MessageType 消息队列类型
type messageType string
const (
// MessageRedis Redis 消息队列
MessageRedis messageType = "redis"
// MessageRabbitMQ RabbitMQ 消息队列
MessageRabbitMQ messageType = "rabbitmq"
// MessageNATS NATS 消息队列
MessageNATS messageType = "nats"
)
// configFactory 消息队列配置工厂函数类型
type configFactory func() messageUtil
// PluginManager 消息队列插件管理器
type pluginManager struct {
mu sync.RWMutex
instances map[messageType]messageUtil // 已连接的插件实例
}
var (
defaultPluginManager = newPluginManager()
// 不再支持默认插件类型,必须显式指定类型
)
// newPluginManager 创建插件管理器
func newPluginManager() *pluginManager {
return &pluginManager{
instances: make(map[messageType]messageUtil),
}
}
// RegisterPlugin 注册消息队列插件
// 所有插件必须通过此方法注册,自动进行连接检测
// 只有连接成功的插件才会被注册,连接失败的插件不会被注册
// 异步无限重连,只有连接成功了才注册
func registerPlugin(msgType messageType, factory configFactory) error {
if factory == nil {
return fmt.Errorf("factory cannot be nil")
}
// 创建实例
instance := factory()
ctx := context.Background()
// 开启异步连接,无限重连直到成功
go func() {
retryInterval := 2 * time.Second
maxInterval := 30 * time.Second
for {
select {
case <-ctx.Done():
g.Log().Errorf(ctx, "❌ [%s] 注册被取消", msgType)
return
default:
// 尝试连接使用Reconnect方法
if err := instance.reconnect(ctx); err == nil {
// 连接成功,注册插件
if err := defaultPluginManager.register(msgType, instance); err != nil {
g.Log().Errorf(ctx, "❌ [%s] 注册插件失败: %v", msgType, err)
instance.close(ctx)
} else {
g.Log().Infof(ctx, "✅ [%s] 插件注册成功", msgType)
}
return
}
// 连接失败,记录日志并等待重试
g.Log().Warningf(ctx, "⚠️ [%s] 连接失败,%v 后重试...", msgType, retryInterval)
select {
case <-time.After(retryInterval):
// 增加重试间隔,但不超过最大值
retryInterval *= 2
if retryInterval > maxInterval {
retryInterval = maxInterval
}
case <-ctx.Done():
g.Log().Errorf(ctx, "❌ [%s] 注册被取消", msgType)
return
}
}
}
}()
return nil
}
// register 注册插件(内部方法)
func (m *pluginManager) register(msgType messageType, instance messageUtil) error {
m.mu.Lock()
defer m.mu.Unlock()
m.instances[msgType] = instance
return nil
}
// GetMsgPlugin 获取消息队列插件
func GetMsgPlugin(msgType messageType) (messageUtil, error) {
defaultPluginManager.mu.RLock()
instance, ok := defaultPluginManager.instances[msgType]
defaultPluginManager.mu.RUnlock()
if !ok {
return nil, fmt.Errorf("unsupported message type: %s", msgType)
}
return instance, nil
}
// GetSupportedTypes 获取所有已注册的插件类型
func GetSupportedTypes() []messageType {
defaultPluginManager.mu.RLock()
defer defaultPluginManager.mu.RUnlock()
types := make([]messageType, 0, len(defaultPluginManager.instances))
for t := range defaultPluginManager.instances {
types = append(types, t)
}
return types
}

View File

@@ -1,152 +0,0 @@
package message
import (
"context"
"fmt"
)
type RedisConfig struct {
// Stream 名称
Stream string
// 消费者组名称
Group string
// 消费者名称
Consumer string
// 每次消费数量
Count int64
// 是否自动 ACK
AutoAck bool
// 处理函数
HandleFunc func(ctx context.Context, message map[string]interface{}) error
}
// RabbitMQConfig RabbitMQ 队列配置
type RabbitMQConfig struct {
Mode string
Exchange string
Topic string
DelayMessage bool
// 队列名称(必需)
Name string
// 实际队列名(用于绑定)
Queue string
// 是否持久化
Durable bool
// QoS 预取数量每次推送的消息数量默认10
PrefetchCount int
// 最大重试次数默认3
MaxRetry int
// 是否自动 ACK
AutoAck bool
// 处理函数
HandleFunc func(ctx context.Context, message map[string]interface{}) error
}
// NATSConfig NATS 队列配置
type NATSConfig struct {
DelayMessage bool
// Stream 名称
Stream string
// 消费者名称
Consumer string
// 是否持久化
Durable bool
// 副本数
Replicas int
// QoS 预取数量每次推送的消息数量默认10
PrefetchCount int
// 是否自动 ACK
AutoAck bool
// 处理函数
HandleFunc func(ctx context.Context, message map[string]interface{}) error
}
// messageBroker 消息代理接口
type messageBroker interface {
// StreamGroup 创建消费组(支持单个配置或批量配置)
streamGroup(ctx context.Context, configs ...interface{}) error
// Publish 发布消息(支持单个配置或批量配置)
publish(ctx context.Context, config interface{}, data interface{}) error
// PublishDelayed 发布延迟消息(支持单个配置或批量配置)
publishDelayed(ctx context.Context, config interface{}, data interface{}, delay int) error
// Subscribe 订阅消息(支持单个配置或批量配置)
subscribe(ctx context.Context, configs ...interface{}) error
}
type messageClientType string
const (
ClientTypeRedis messageClientType = "redis"
ClientTypeRabbitMQ messageClientType = "rabbitmq"
ClientTypeNATS messageClientType = "nats"
)
// newMessageBroker 创建消息代理实例
func newMessageBroker(ctx context.Context, clientType messageClientType) (messageBroker, error) {
switch clientType {
case ClientTypeRedis:
return &redisMessageClient{clientType: clientType}, nil
case ClientTypeRabbitMQ:
return &rabbitMQMessageClient{clientType: clientType}, nil
case ClientTypeNATS:
return &natsMessageClient{clientType: clientType}, nil
default:
return nil, fmt.Errorf("unknown client type: %s", clientType)
}
}
// StreamGroup 直接创建消费组
func StreamGroup(ctx context.Context, clientType messageClientType, configs ...interface{}) error {
broker, err := newMessageBroker(ctx, clientType)
if err != nil {
return err
}
return broker.streamGroup(ctx, configs...)
}
// Publish 直接发布消息
func Publish(ctx context.Context, clientType messageClientType, config interface{}, data interface{}) error {
broker, err := newMessageBroker(ctx, clientType)
if err != nil {
return err
}
return broker.publish(ctx, config, data)
}
// PublishDelayed 直接发布延迟消息
func PublishDelayed(ctx context.Context, clientType messageClientType, config interface{}, data interface{}, delay int) error {
broker, err := newMessageBroker(ctx, clientType)
if err != nil {
return err
}
return broker.publishDelayed(ctx, config, data, delay)
}
// Subscribe 直接订阅消息
func Subscribe(ctx context.Context, clientType messageClientType, configs ...interface{}) error {
broker, err := newMessageBroker(ctx, clientType)
if err != nil {
return err
}
return broker.subscribe(ctx, configs...)
}

View File

@@ -1,313 +0,0 @@
package message
import (
"context"
"fmt"
"sync"
"sync/atomic"
"time"
"github.com/gogf/gf/v2/frame/g"
"github.com/nats-io/nats.go"
"github.com/nats-io/nats.go/jetstream"
)
var (
nc *nats.Conn
js jetstream.JetStream
inited bool
natsMu sync.RWMutex
natsURL string
healthCtx context.Context
healthCancel context.CancelFunc
connected bool
reconnectChan chan struct{}
// 连接状态变化监听器
connStateListeners []connStateListener
connListenersMu sync.RWMutex
// 监控指标
metrics metricsCounter
)
// Metrics 监控指标
type metricsCounter struct {
PublishCount atomic.Int64
PublishError atomic.Int64
SubscribeCount atomic.Int64
RequestCount atomic.Int64
RequestError atomic.Int64
ConsumeCount atomic.Int64
ConsumeError atomic.Int64
}
// ConnState 连接状态
type connState int
const (
connStateDisconnected connState = iota
connStateConnecting
connStateConnected
connStateReconnecting
connStateClosed
)
// ConnStateListener 连接状态监听器
type connStateListener func(state connState, err error)
// GetMetrics 获取监控指标
func getMetrics() metricsCounter {
return metrics
}
// registerConnStateListener 注册连接状态监听器
func registerConnStateListener(listener connStateListener) {
connListenersMu.Lock()
defer connListenersMu.Unlock()
connStateListeners = append(connStateListeners, listener)
}
// unregisterConnStateListener 取消注册连接状态监听器
func unregisterConnStateListener(listener connStateListener) {
connListenersMu.Lock()
defer connListenersMu.Unlock()
for i, l := range connStateListeners {
if l != nil && &l == &listener {
connStateListeners = append(connStateListeners[:i], connStateListeners[i+1:]...)
break
}
}
}
// notifyConnState 通知所有监听器连接状态变化
func notifyConnState(state connState, err error) {
connListenersMu.RLock()
listeners := make([]connStateListener, len(connStateListeners))
copy(listeners, connStateListeners)
connListenersMu.RUnlock()
for _, listener := range listeners {
if listener != nil {
listener(state, err)
}
}
}
// init 初始化 NATS 连接
func init() {
// 从配置文件读取 NATS 地址
natsURL = g.Cfg().MustGet(context.Background(), "nats.url").String()
if natsURL == "" {
// 默认使用本地地址
natsURL = nats.DefaultURL
}
// 创建健康检查上下文
healthCtx, healthCancel = context.WithCancel(context.Background())
// 创建重连通知通道(增大缓冲区避免丢失通知)
reconnectChan = make(chan struct{}, 10)
// 启动连接
go initConnection()
// 启动健康检查协程
go healthCheck()
}
// initConnection 初始化连接
func initConnection() {
ctx := context.Background()
notifyConnState(connStateConnecting, nil)
if err := connect(ctx); err != nil {
g.Log().Errorf(ctx, "NATS 初始连接失败: %v", err)
notifyConnState(connStateDisconnected, err)
}
}
// connect 建立 NATS 连接
func connect(ctx context.Context) error {
natsMu.Lock()
defer natsMu.Unlock()
if nc != nil && !nc.IsClosed() {
nc.Close()
}
// 连接选项配置
opts := []nats.Option{
nats.Name("goframe-nats-client"),
nats.ReconnectWait(2 * time.Second),
nats.MaxReconnects(-1), // 无限重连
nats.PingInterval(10 * time.Second),
nats.MaxPingsOutstanding(5),
nats.ReconnectHandler(func(nc *nats.Conn) {
g.Log().Infof(ctx, "✅ NATS 重连成功: %s", nc.ConnectedUrl())
connected = true
// 重新创建 JetStream 实例
if newJS, err := jetstream.New(nc); err == nil {
js = newJS
}
// 通知重连成功
notifyConnState(connStateConnected, nil)
// 使用非阻塞发送避免阻塞
select {
case reconnectChan <- struct{}{}:
default:
// 通道已满,丢弃通知
}
}),
nats.DisconnectErrHandler(func(nc *nats.Conn, err error) {
g.Log().Warningf(ctx, "⚠️ NATS 连接断开: %v, 准备重连...", err)
connected = false
notifyConnState(connStateReconnecting, err)
}),
nats.ClosedHandler(func(nc *nats.Conn) {
g.Log().Infof(ctx, "NATS 连接已关闭: %s", nc.ConnectedUrl())
connected = false
notifyConnState(connStateClosed, nil)
}),
nats.ErrorHandler(func(nc *nats.Conn, sub *nats.Subscription, err error) {
g.Log().Errorf(ctx, "NATS 错误: %v", err)
}),
}
var err error
nc, err = nats.Connect(natsURL, opts...)
if err != nil {
return fmt.Errorf("NATS 连接失败: %w", err)
}
// 等待连接就绪
if nc.Status() != nats.CONNECTED {
select {
case <-time.After(5 * time.Second):
notifyConnState(connStateDisconnected, fmt.Errorf("连接超时"))
return fmt.Errorf("NATS 连接超时")
case <-nc.StatusChanged(nats.CONNECTED):
}
}
// 创建 JetStream 实例
js, err = jetstream.New(nc)
if err != nil {
return fmt.Errorf("创建 JetStream 失败: %w", err)
}
connected = true
inited = true
g.Log().Infof(ctx, "✅ NATS 连接成功: %s", nc.ConnectedUrl())
notifyConnState(connStateConnected, nil)
return nil
}
// healthCheck 健康检查协程(仅作为备用检查)
func healthCheck() {
ticker := time.NewTicker(30 * time.Second)
defer ticker.Stop()
for {
select {
case <-healthCtx.Done():
return
case <-ticker.C:
natsMu.RLock()
currentConnected := connected
currentConn := nc
natsMu.RUnlock()
if !currentConnected || currentConn == nil || currentConn.IsClosed() {
// 仅记录日志不尝试重连NATS 已有自动重连机制)
g.Log().Warning(context.Background(), "NATS 连接断开,等待 NATS 自动重连...")
}
case <-reconnectChan:
// 重连成功的通知(仅记录日志)
g.Log().Info(context.Background(), "收到重连成功通知")
}
}
}
// checkConnected 检查连接状态
func checkConnected() bool {
natsMu.RLock()
defer natsMu.RUnlock()
return connected && nc != nil && !nc.IsClosed()
}
// getConnState 获取当前连接状态
func getConnState() connState {
natsMu.RLock()
defer natsMu.RUnlock()
if nc == nil {
return connStateDisconnected
}
if nc.IsClosed() {
return connStateClosed
}
if connected {
return connStateConnected
}
return connStateDisconnected
}
// shutdown 优雅关闭:自动注销所有已注册的服务并关闭 NATS 连接
func shutdown() error {
ctx := context.Background()
g.Log().Info(ctx, "开始优雅关闭 NATS RPC 服务...")
// 注销所有单实例服务
rpcServicesMu.Lock()
singleServiceCount := len(rpcServices)
for serviceName := range rpcServices {
if sub, exists := rpcSubs[serviceName]; exists {
if err := sub.Unsubscribe(); err != nil {
g.Log().Errorf(ctx, "注销服务 %s 失败: %v", serviceName, err)
}
}
delete(rpcSubs, serviceName)
delete(rpcServices, serviceName)
}
rpcServicesMu.Unlock()
// 注销所有队列服务
queueRPCMu.Lock()
queueServiceCount := 0
for queueName, servicesMap := range queueRPCServices {
queueServiceCount += len(servicesMap)
for serviceName, sub := range queueRPCSubs[queueName] {
if err := sub.Unsubscribe(); err != nil {
g.Log().Errorf(ctx, "注销队列服务 %s (队列: %s) 失败: %v", serviceName, queueName, err)
}
}
delete(queueRPCSubs, queueName)
delete(queueRPCServices, queueName)
}
queueRPCMu.Unlock()
g.Log().Infof(ctx, "已注销 %d 个单实例服务和 %d 个队列服务", singleServiceCount, queueServiceCount)
natsMu.Lock()
defer natsMu.Unlock()
// 停止健康检查协程
if healthCancel != nil {
healthCancel()
}
// 关闭连接
if nc != nil && !nc.IsClosed() {
nc.Close()
connected = false
inited = false
}
g.Log().Info(ctx, "NATS RPC 服务已优雅关闭")
return nil
}

View File

@@ -4,195 +4,274 @@ import (
"context"
"encoding/json"
"fmt"
"time"
"github.com/gogf/gf/v2/frame/g"
"github.com/nats-io/nats.go"
"github.com/nats-io/nats.go/jetstream"
"time"
)
// natsMessageClient NATS 实现
type natsMessageClient struct {
clientType messageClientType
type NatsPublishMsgConfig struct {
QueueName string
Durable bool
DelayTime int
Data any
}
// StreamGroup 创建消费组(支持单个或批量)
func (q *natsMessageClient) streamGroup(ctx context.Context, configs ...interface{}) error {
if len(configs) == 0 {
return fmt.Errorf("配置不能为空")
type NatsSubscribeMsgConfig struct {
QueueName string
Durable bool
DelayTime int
ConsumerName string
AutoAck bool
PrefetchCount int
HandleFunc func(ctx context.Context, message map[string]interface{}) error
}
for _, config := range configs {
cfg, ok := config.(*NATSConfig)
func (*NatsPublishMsgConfig) GetPublishMsgType() {
}
func (*NatsSubscribeMsgConfig) GetSubscribeMsgType() {
}
func init() {
// 注册 Nats 插件,必须使用 RegisterPlugin 确保连接检测
registerPlugin(MessageNATS, func() messageUtil {
return &natsMsg{}
})
}
type natsMsg struct{}
// Ping 检测 NATS 连接状态
func (c *natsMsg) ping(_ context.Context) bool {
return natsPing()
}
// Reconnect 重连 NATS
func (c *natsMsg) reconnect(ctx context.Context) error {
return natsReconnect(ctx)
}
// Close 关闭 NATS 连接
func (c *natsMsg) close(ctx context.Context) error {
return natsClose(ctx)
}
// Publish 发布消息
func (c *natsMsg) Publish(ctx context.Context, msgConfig messagePublishConfig) error {
cfg, ok := msgConfig.(*NatsPublishMsgConfig)
if !ok {
return fmt.Errorf("无效的 NATS 配置类型")
}
if err := q.createStreamGroup(ctx, cfg); err != nil {
if g.IsEmpty(cfg.QueueName) {
return fmt.Errorf("必须提供队列名称")
}
if g.IsEmpty(cfg.Data) {
return fmt.Errorf("必须提供数据")
}
return c.createPublish(ctx, cfg.QueueName, cfg.Durable, cfg.DelayTime, cfg.Data)
}
// Publish 发布消息
func (c *natsMsg) createPublish(ctx context.Context, subject string, durable bool, delayTime int, data any) error {
delayMsg := delayTime > 0
if err := c.createStreamGroupInternal(ctx, subject, durable, delayMsg); err != nil {
return err
}
payload, err := json.Marshal(data)
if err != nil {
return fmt.Errorf("序列化数据失败: %w", err)
}
msg := &nats.Msg{
Subject: subject,
Data: payload,
}
if delayMsg {
// 计算目标投递时间
targetTime := time.Now().Add(time.Duration(delayTime) * time.Second)
delayNs := time.Until(targetTime).Nanoseconds()
if delayNs < 0 {
delayNs = 0
}
g.Log().Infof(ctx, "📅 NATS 延迟消息配置: DelayTime=%d秒, TargetTime=%v, DelayNs=%d纳秒(%.2f秒)",
delayTime, targetTime.Format("2006-01-02 15:04:05"), delayNs, float64(delayNs)/float64(time.Second.Nanoseconds()))
// NATS JetStream 延迟消息使用 Nats-Msg-Delay Header纳秒数
msg.Header = nats.Header{
"Nats-Msg-Delay": []string{fmt.Sprintf("%d", delayNs)},
}
g.Log().Infof(ctx, "📅 NATS 延迟消息 Header: %v", msg.Header)
// 获取 Stream 配置验证
streamName, _ := getStreamInfo(durable, delayMsg)
stream, err := js.Stream(ctx, streamName)
if err == nil {
info, _ := stream.Info(ctx)
g.Log().Infof(ctx, "📅 Stream 配置: AllowMsgSchedules=%v, Storage=%v",
info.Config.AllowMsgSchedules, info.Config.Storage)
if !info.Config.AllowMsgSchedules {
g.Log().Errorf(ctx, "❌ Stream 不支持延迟消息AllowMsgSchedules=false")
}
}
}
// 发布消息到 JetStream
ack, err := js.PublishMsg(ctx, msg)
if err != nil {
g.Log().Errorf(ctx, "❌ NATS 发布消息失败: err=%v", err)
return err
}
g.Log().Infof(ctx, "✅ NATS 发布消息成功: StreamSeq=%d, Domain=%s", ack.Sequence, ack.Domain)
return nil
}
// createStreamGroup 内部单个创建消费组
func (q *natsMessageClient) createStreamGroup(ctx context.Context, cfg *NATSConfig) error {
// Stream 不存在,创建新的
storage := jetstream.FileStorage
if !cfg.Durable {
storage = jetstream.MemoryStorage
// createStreamGroup 内部创建消费组
func (c *natsMsg) createStreamGroupInternal(ctx context.Context, subject string, durable, delayMsg bool) error {
streamName, storage := getStreamInfo(durable, delayMsg)
// 先检查 Stream 是否存在
stream, err := js.Stream(ctx, streamName)
if err == nil {
// Stream 已存在,检查配置是否匹配
info, _ := stream.Info(ctx)
if info.Config.AllowMsgSchedules != delayMsg || info.Config.Storage != storage {
g.Log().Infof(ctx, "🔄 Stream 配置不匹配,正在重新创建: stream=%s, 当前AllowMsgSchedules=%v, 需要%v",
streamName, info.Config.AllowMsgSchedules, delayMsg)
// 删除旧 Stream
if err := js.DeleteStream(ctx, streamName); err != nil {
g.Log().Warningf(ctx, "删除旧 Stream 失败: %v", err)
}
if g.IsEmpty(cfg.Replicas) {
cfg.Replicas = 1
} else {
g.Log().Infof(ctx, "✅ Stream 已存在且配置正确: stream=%s", streamName)
return nil
}
}
// 构建流配置
jsConfig := jetstream.StreamConfig{
Name: cfg.Stream,
Subjects: []string{fmt.Sprintf("%s.>", cfg.Stream)},
Replicas: cfg.Replicas,
NoAck: cfg.AutoAck,
AllowMsgSchedules: cfg.DelayMessage, // 延迟消息核心开关
Name: streamName,
Subjects: []string{subject},
AllowMsgSchedules: delayMsg, // 延迟消息核心开关
Storage: storage,
Discard: jetstream.DiscardOld, // 达到上限删除旧消息
}
// 检查流是否已存在
stream, err := js.Stream(ctx, cfg.Stream)
if err == nil {
// 流已存在,更新配置
_, err = js.UpdateStream(ctx, jsConfig)
if err != nil {
return fmt.Errorf("更新任务流失败: %w", err)
}
g.Log().Infof(ctx, "任务流已更新: %s", stream.CachedInfo().Config.Name)
return nil
}
// 创建新流
stream, err = js.CreateStream(ctx, jsConfig)
if err != nil {
return fmt.Errorf("创建任务流失败: %w", err)
}
g.Log().Infof(ctx, "✅ NATS 队列初始化成功: stream=%s, consumer=%s", cfg.Stream, cfg.Consumer)
// 获取 Stream 信息验证配置
info, err := stream.Info(ctx)
if err == nil {
g.Log().Infof(ctx, "✅ NATS 队列初始化成功: stream=%s, AllowMsgSchedules=%v, Storage=%v",
streamName, info.Config.AllowMsgSchedules, info.Config.Storage)
}
g.Log().Infof(ctx, "✅ NATS 队列初始化成功: stream=%s", streamName)
return nil
}
// Publish 发布消息(支持单个或批量)
func (q *natsMessageClient) publish(ctx context.Context, config interface{}, data interface{}) error {
cfg, ok := config.(*NATSConfig)
// Subscribe 订阅消息
func (c *natsMsg) Subscribe(ctx context.Context, msgConfig messageSubscribeConfig) error {
cfg, ok := msgConfig.(*NatsSubscribeMsgConfig)
if !ok {
return fmt.Errorf("无效的 NATS 配置类型")
}
err := q.createStreamGroup(ctx, cfg)
if err != nil {
return err
if g.IsEmpty(cfg.QueueName) {
return fmt.Errorf("必须提供队列名称")
}
payload, err := json.Marshal(data)
if err != nil {
return fmt.Errorf("序列化数据失败: %w", err)
if g.IsEmpty(cfg.ConsumerName) {
return fmt.Errorf("必须提供消费者名称")
}
// 发布消息到 JetStream
subject := fmt.Sprintf("%s.>", cfg.Stream)
_, err = js.Publish(ctx, subject, payload)
if err != nil {
g.Log().Errorf(ctx, "❌ NATS 发布消息失败: topic=%s, err=%v", cfg.Stream, err)
return err
}
g.Log().Infof(ctx, "✅ NATS 发布消息成功: topic=%s", cfg.Stream)
return nil
}
// PublishDelayed 发布延迟消息(支持单个或批量)
func (q *natsMessageClient) publishDelayed(ctx context.Context, config interface{}, data interface{}, delay int) error {
cfg, ok := config.(*NATSConfig)
if !ok {
return fmt.Errorf("无效的 NATS 配置类型")
}
payload, err := json.Marshal(data)
if err != nil {
return fmt.Errorf("序列化数据失败: %w", err)
}
// 使用 goroutine 实现简单的延迟发布
go func() {
time.Sleep(time.Duration(delay))
subject := fmt.Sprintf("%s.>", cfg.Stream)
if err := q.publishInternal(ctx, subject, payload); err != nil {
g.Log().Errorf(ctx, "❌ NATS 延迟消息发布失败: topic=%s, delay=%v, err=%v", cfg.Stream, delay, err)
}
}()
g.Log().Infof(ctx, "✅ NATS 延迟消息已提交: topic=%s, delay=%v", cfg.Stream, delay)
return nil
}
// publishInternal 内部发布消息
func (q *natsMessageClient) publishInternal(ctx context.Context, subject string, payload []byte) error {
_, err := js.Publish(ctx, subject, payload)
return err
}
// Subscribe 订阅消息(支持单个或批量)
func (q *natsMessageClient) subscribe(ctx context.Context, configs ...interface{}) error {
if len(configs) == 0 {
return fmt.Errorf("配置不能为空")
}
for _, config := range configs {
cfg, ok := config.(*NATSConfig)
if !ok {
return fmt.Errorf("无效的 NATS 配置类型")
}
handler := cfg.HandleFunc
if handler == nil {
if g.IsEmpty(cfg.HandleFunc) {
return fmt.Errorf("必须提供处理函数")
}
if err := q.createSubscribe(ctx, cfg, handler); err != nil {
return err
if g.IsEmpty(cfg.PrefetchCount) {
cfg.PrefetchCount = 1
}
}
return nil
return c.createSubscribeInternal(ctx, cfg.QueueName, cfg.ConsumerName, cfg.PrefetchCount, cfg.AutoAck, cfg.Durable, cfg.DelayTime, cfg.HandleFunc)
}
// createSubscribe 内部订阅消息
func (c *natsMsg) createSubscribeInternal(ctx context.Context, subject, consumerName string, prefetchCount int, autoAck, durable bool, delayTime int, handler func(ctx context.Context, message map[string]interface{}) error) error {
g.Log().Infof(ctx, "🔔 NATS 开始订阅: QueueName=%s, ConsumerName=%s", subject, consumerName)
delayMsg := delayTime > 0
streamName, _ := getStreamInfo(durable, delayMsg)
// 确保 Stream 存在,如果不存在则创建
if err := c.createStreamGroupInternal(ctx, subject, durable, delayMsg); err != nil {
g.Log().Errorf(ctx, "创建 Stream 失败: %v", err)
return fmt.Errorf("创建 Stream 失败: %w", err)
}
// subscribe 内部单个订阅消息
func (q *natsMessageClient) createSubscribe(ctx context.Context, cfg *NATSConfig, handler func(ctx context.Context, message map[string]interface{}) error) error {
g.Log().Infof(ctx, "🔔 NATS 开始订阅: stream=%s, consumer=%s", cfg.Stream, cfg.Consumer)
// Stream 不存在,创建新的
ackPolicy := jetstream.AckExplicitPolicy
if cfg.AutoAck {
if autoAck {
ackPolicy = jetstream.AckNonePolicy
}
jsConfig := jetstream.ConsumerConfig{
Name: cfg.Consumer,
Durable: cfg.Consumer,
Name: consumerName,
Durable: consumerName,
FilterSubject: subject,
AckPolicy: ackPolicy,
MaxDeliver: 3,
MaxAckPending: cfg.PrefetchCount,
MaxAckPending: prefetchCount,
}
// 创建新消费者
consumer, err := js.CreateOrUpdateConsumer(ctx, cfg.Stream, jsConfig)
consumer, err := js.CreateOrUpdateConsumer(ctx, streamName, jsConfig)
if err != nil {
return fmt.Errorf("创建消费者失败: %w", err)
g.Log().Errorf(ctx, "创建消费者失败: %v", err)
return err
}
// 获取消费者信息验证
if cInfo, err := consumer.Info(ctx); err == nil {
g.Log().Infof(ctx, "🔔 消费者创建成功: %s, AckPolicy=%v, MaxAckPending=%d",
cInfo.Name, cInfo.Config.AckPolicy, cInfo.Config.MaxAckPending)
}
// 创建消息处理函数
msgHandler := func(msg jetstream.Msg) {
// 记录消息接收时间
now := time.Now()
meta, err := msg.Metadata()
if err == nil {
g.Log().Infof(ctx, "📨 收到消息: StreamSeq=%d, Published=%v, Received=%v, 距离发布=%.2f秒",
meta.Sequence.Stream,
meta.Timestamp.Format("2006-01-02 15:04:05"),
now.Format("2006-01-02 15:04:05"),
now.Sub(meta.Timestamp).Seconds())
}
// 解析消息
var data map[string]any
if err := json.Unmarshal(msg.Data(), &data); err != nil {
g.Log().Errorf(ctx, "解析消息失败: %v", err)
msg.Nak()
if err := msg.Nak(); err != nil {
g.Log().Errorf(ctx, "Nak 失败: %v", err)
}
return
}
// 处理业务逻辑
if err := handler(ctx, data); err != nil {
g.Log().Errorf(ctx, "处理消息失败: %v", err)
msg.Nak()
if err := msg.Nak(); err != nil {
g.Log().Errorf(ctx, "Nak 失败: %v", err)
}
return
}
g.Log().Infof(ctx, "处理消息成功")
if !cfg.AutoAck {
msg.Ack()
if !autoAck {
if err := msg.Ack(); err != nil {
g.Log().Errorf(ctx, "Ack 失败: %v", err)
}
}
}
@@ -202,7 +281,25 @@ func (q *natsMessageClient) createSubscribe(ctx context.Context, cfg *NATSConfig
return fmt.Errorf("开始消费失败: %w", err)
}
g.Log().Infof(ctx, "✅ 开始消费消息: %s/%s", cfg.Stream, cfg.Consumer)
g.Log().Infof(ctx, "✅ NATS 订阅成功")
return nil
}
func getStreamInfo(durable, delayMsg bool) (string, jetstream.StorageType) {
// Stream 不存在,创建新的
streamName := "ordinary_msg_memory"
storage := jetstream.MemoryStorage
// 延迟消息必须使用 FileStorageNATS 官方要求)
if delayMsg {
streamName = "delay_msg_file"
storage = jetstream.FileStorage
} else {
if durable {
streamName = "ordinary_msg_file"
storage = jetstream.FileStorage
}
}
return streamName, storage
}

View File

@@ -38,11 +38,11 @@ var (
// 返回值可以是任意类型,会被自动序列化为 JSON
type rpcHandler func(ctx context.Context, req []byte) (any, error)
// RegisterRPCService 注册 RPC 服务(单实例)
// registerRPCService 注册 RPC 服务(单实例)
// serviceName: 服务名称,调用方通过此名称调用服务
// handler: 服务处理函数,接收请求并返回响应
func registerRPCService(serviceName string, handler rpcHandler) (err error) {
if !checkConnected() {
if !natsPing() {
return fmt.Errorf("NATS 未连接")
}
@@ -74,18 +74,17 @@ func registerRPCService(serviceName string, handler rpcHandler) (err error) {
}
rpcSubs[serviceName] = sub
metrics.SubscribeCount.Add(1)
g.Log().Infof(context.Background(), "✅ RPC 服务已注册: %s", serviceName)
return nil
}
// RegisterQueueRPCService 注册 RPC 服务(集群模式)
// registerQueueRPCService 注册 RPC 服务(集群模式)
// 多个服务实例注册同一服务时,请求会自动负载均衡
// serviceName: 服务名称
// queueName: 队列组名,同一队列组的实例共享请求
// handler: 服务处理函数
func registerQueueRPCService(serviceName, queueName string, handler rpcHandler) (err error) {
if !checkConnected() {
if !natsPing() {
return fmt.Errorf("NATS 未连接")
}
@@ -126,7 +125,6 @@ func registerQueueRPCService(serviceName, queueName string, handler rpcHandler)
queueRPCSubs[queueName][serviceName] = sub
queueRPCMu.Unlock()
metrics.SubscribeCount.Add(1)
g.Log().Infof(context.Background(), "✅ 队列 RPC 服务已注册: %s (队列组: %s)", serviceName, queueName)
return nil
}
@@ -138,16 +136,16 @@ func executeHandler(handler rpcHandler, msg *nats.Msg) {
// 从消息头重建上下文
ctx := headersToContext(context.Background(), msg.Header)
// 提取 TraceID创建可取消的 context
ctx = createCancelContext(ctx, msg.Header.Get(TraceIDKey))
ctx = createCancelContext(ctx, msg.Header.Get(traceIDKey))
// 检查 context 是否已取消(在调用 handler 之前)
select {
case <-ctx.Done():
// context 已取消,返回取消错误
g.Log().Infof(ctx, "RPC 请求已取消traceID: %s", msg.Header.Get(TraceIDKey))
g.Log().Infof(ctx, "RPC 请求已取消traceID: %s", msg.Header.Get(traceIDKey))
// 仍然需要发送响应以避免客户端超时
respData = []byte(`{"_err":"请求已取消"}`)
// 清理取消映射表
cleanupTraceCancel(msg.Header.Get(TraceIDKey))
cleanupTraceCancel(msg.Header.Get(traceIDKey))
return
default:
}
@@ -176,7 +174,7 @@ func executeHandler(handler rpcHandler, msg *nats.Msg) {
g.Log().Errorf(ctx, "RPC 响应失败: %v", err)
}
// 请求结束,清理取消映射表
cleanupTraceCancel(msg.Header.Get(TraceIDKey))
cleanupTraceCancel(msg.Header.Get(traceIDKey))
}
// createCancelContext 创建可取消的 context 并注册到取消映射表
@@ -211,7 +209,7 @@ func createCancelContext(ctx context.Context, traceID string) context.Context {
//
// sub, err := nats.SetupCancelListener(ctx)
func setupCancelListener(ctx context.Context) (*nats.Subscription, error) {
if !checkConnected() {
if !natsPing() {
return nil, fmt.Errorf("NATS 未连接")
}
@@ -253,7 +251,6 @@ func setupCancelListener(ctx context.Context) (*nats.Subscription, error) {
return nil, fmt.Errorf("设置取消监听器失败: %w", err)
}
metrics.SubscribeCount.Add(1)
g.Log().Infof(ctx, "✅ 取消监听器已设置: %s", cancelSubject)
return sub, nil
}
@@ -264,7 +261,7 @@ func setupCancelListener(ctx context.Context) (*nats.Subscription, error) {
//
// err := nats.publishCancel(ctx, traceID)
func publishCancel(ctx context.Context, traceID string) error {
if !checkConnected() {
if !natsPing() {
return fmt.Errorf("NATS 未连接")
}
@@ -306,12 +303,10 @@ func cleanupTraceCancel(traceID string) {
// req: 请求数据
// 返回: 响应数据(任意类型)和错误
func CallRPC(ctx context.Context, serviceName string, req any, resp any) (err error) {
if !checkConnected() {
if !natsPing() {
return fmt.Errorf("NATS 未连接")
}
metrics.RequestCount.Add(1)
// 验证 resp 必须是指针类型
respValue := reflect.ValueOf(resp)
if respValue.Kind() != reflect.Ptr {
@@ -346,7 +341,6 @@ func CallRPC(ctx context.Context, serviceName string, req any, resp any) (err er
// 执行本地调用
var response interface{}
if response, err = localHandler(cancelCtx, reqBody); err != nil {
metrics.RequestError.Add(1)
return fmt.Errorf("本地调用 RPC 服务失败 [%s]: %w", serviceName, err)
}
@@ -357,7 +351,6 @@ func CallRPC(ctx context.Context, serviceName string, req any, resp any) (err er
var respMap map[string]any
if json.Unmarshal(response.([]byte), &respMap) == nil {
if errMsg, ok := respMap["_err"]; ok {
metrics.RequestError.Add(1)
return fmt.Errorf("%v", errMsg)
}
}
@@ -392,17 +385,17 @@ func CallRPC(ctx context.Context, serviceName string, req any, resp any) (err er
})
}
if msg.Header.Get(TraceIDKey) != "" {
if msg.Header.Get(traceIDKey) != "" {
go func() {
defer closeDone()
select {
case <-ctx.Done():
// context 被取消时,发送取消信号给服务端
if errors.Is(ctx.Err(), context.Canceled) {
if err := publishCancel(context.Background(), msg.Header.Get(TraceIDKey)); err != nil {
if err := publishCancel(context.Background(), msg.Header.Get(traceIDKey)); err != nil {
g.Log().Errorf(ctx, "发送 RPC 取消信号失败: %v", err)
} else {
g.Log().Infof(ctx, "RPC 调用已取消traceID: %s", msg.Header.Get(TraceIDKey))
g.Log().Infof(ctx, "RPC 调用已取消traceID: %s", msg.Header.Get(traceIDKey))
}
}
case <-done:
@@ -419,12 +412,10 @@ func CallRPC(ctx context.Context, serviceName string, req any, resp any) (err er
closeDone()
if err != nil {
metrics.RequestError.Add(1)
return fmt.Errorf("调用 RPC 服务失败 [%s]: %w", serviceName, err)
}
if responseMsg == nil {
metrics.RequestError.Add(1)
return fmt.Errorf("RPC 响应为空 [%s]", serviceName)
}
@@ -434,7 +425,6 @@ func CallRPC(ctx context.Context, serviceName string, req any, resp any) (err er
var respMap map[string]any
if json.Unmarshal(responseMsg.Data, &respMap) == nil {
if errMsg, ok := respMap["_err"]; ok {
metrics.RequestError.Add(1)
return fmt.Errorf("%v", errMsg)
}
}
@@ -449,7 +439,7 @@ func CallRPC(ctx context.Context, serviceName string, req any, resp any) (err er
}
// RegisterServiceOption 注册选项类型
type RegisterServiceOption func(*registerServiceConfig)
type registerServiceOption func(*registerServiceConfig)
type registerServiceConfig struct {
queueName string // 队列组名(用于集群模式)
@@ -457,14 +447,14 @@ type registerServiceConfig struct {
}
// WithQueueGroup 设置队列组名(集群模式)
func WithQueueGroup(queueName string) RegisterServiceOption {
func WithQueueGroup(queueName string) registerServiceOption {
return func(cfg *registerServiceConfig) {
cfg.queueName = queueName
}
}
// WithExcludeMethods 排除不需要注册的方法
func WithExcludeMethods(methods ...string) RegisterServiceOption {
func WithExcludeMethods(methods ...string) registerServiceOption {
return func(cfg *registerServiceConfig) {
cfg.excludeMethods = append(cfg.excludeMethods, methods...)
}
@@ -483,9 +473,9 @@ func WithExcludeMethods(methods ...string) RegisterServiceOption {
// AutoRegisterServices(map[string]interface{}{
// "order": orderService,
// }, WithQueueGroup("order-group"))
func AutoRegisterServices(ctx context.Context, serviceInstances map[string]interface{}, options ...RegisterServiceOption) error {
func AutoRegisterServices(ctx context.Context, serviceInstances map[string]interface{}, options ...registerServiceOption) error {
// 先注册 RPC 服务(如果 NATS 不可用则记录警告但不阻塞启动)
if !checkConnected() {
if !natsPing() {
return fmt.Errorf("NATS 未连接RPC 服务未注册")
}
@@ -521,8 +511,8 @@ func AutoRegisterServices(ctx context.Context, serviceInstances map[string]inter
}
// registerService 注册单个服务的所有公开方法(内部函数)
func registerService(service interface{}, serviceNamePrefix string, options ...RegisterServiceOption) (err error) {
if !checkConnected() {
func registerService(service interface{}, serviceNamePrefix string, options ...registerServiceOption) (err error) {
if !natsPing() {
return fmt.Errorf("NATS 未连接")
}
@@ -676,10 +666,10 @@ func registerService(service interface{}, serviceNamePrefix string, options ...R
// ============ 上下文元数据工具函数 ============
// 以下函数用于在 context 和 NATS 消息头之间互转元数据
// 定义常见的上下文元数据 key
// 定义常见的上下文元数据 key(私有)
const (
TraceIDKey = "trace_id"
TokenKey = "token"
traceIDKey = "trace_id"
tokenKey = "token"
)
func getTraceID(ctx context.Context) (traceID string, err error) {
@@ -687,7 +677,7 @@ func getTraceID(ctx context.Context) (traceID string, err error) {
span := trace.SpanFromContext(ctx)
if span != nil && span.SpanContext().HasTraceID() {
traceID = span.SpanContext().TraceID().String()
} else if tid := ctx.Value(TraceIDKey); tid != nil {
} else if tid := ctx.Value(traceIDKey); tid != nil {
traceID = fmt.Sprintf("%v", tid)
}
if traceID == "" {
@@ -705,12 +695,12 @@ func contextToHeaders(ctx context.Context) (nats.Header, error) {
if traceID, err := getTraceID(ctx); err != nil {
return headers, err
} else {
headers.Set(TraceIDKey, traceID)
headers.Set(traceIDKey, traceID)
}
// 提取 token优先级context value > HTTP Authorization header
token := ""
if t := ctx.Value(TokenKey); t != nil {
if t := ctx.Value(tokenKey); t != nil {
token = fmt.Sprintf("%v", t)
} else if r := g.RequestFromCtx(ctx); r != nil {
// 从 HTTP 请求的 Authorization header 中提取 token
@@ -725,7 +715,7 @@ func contextToHeaders(ctx context.Context) (nats.Header, error) {
}
}
if token != "" {
headers.Set(TokenKey, token)
headers.Set(tokenKey, token)
}
return headers, nil
@@ -739,13 +729,13 @@ func headersToContext(ctx context.Context, headers nats.Header) context.Context
}
// 恢复 trace_id
if traceID := headers.Get(TraceIDKey); traceID != "" {
ctx = context.WithValue(ctx, TraceIDKey, traceID)
if traceID := headers.Get(traceIDKey); traceID != "" {
ctx = context.WithValue(ctx, traceIDKey, traceID)
}
// 恢复 token
if token := headers.Get(TokenKey); token != "" {
ctx = context.WithValue(ctx, TokenKey, token)
if token := headers.Get(tokenKey); token != "" {
ctx = context.WithValue(ctx, tokenKey, token)
}
return ctx

View File

@@ -1,351 +0,0 @@
package message
import (
"context"
"sync"
"time"
"github.com/gogf/gf/v2/encoding/gjson"
"github.com/gogf/gf/v2/errors/gerror"
"github.com/gogf/gf/v2/frame/g"
"github.com/gogf/gf/v2/util/gconv"
amqp "github.com/rabbitmq/amqp091-go"
)
var (
rabbitConn *amqp.Connection
rabbitChannel *amqp.Channel
rabbitOnce sync.Once
rabbitMu sync.RWMutex
rabbitCloseWatcher chan struct{}
rabbitWatcherStarted bool
)
// Config RabbitMQ 配置
type RabbitMQConfig1 struct {
Host string
Port int
Username string
Password string
VHost string
}
// rabbitMQConfig 默认配置
func getRabbitMQConfig() *RabbitMQConfig1 {
return &RabbitMQConfig1{
Host: g.Cfg().MustGet(context.Background(), "rabbitmq.host").String(),
Port: g.Cfg().MustGet(context.Background(), "rabbitmq.port").Int(),
Username: g.Cfg().MustGet(context.Background(), "rabbitmq.username").String(),
Password: g.Cfg().MustGet(context.Background(), "rabbitmq.password").String(),
VHost: g.Cfg().MustGet(context.Background(), "rabbitmq.vhost", "/").String(),
}
}
// initRabbitMQ 初始化 RabbitMQ 连接
func initRabbitMQ(ctx context.Context) error {
var err error
rabbitOnce.Do(func() {
cfg := getRabbitMQConfig()
url := "amqp://" + cfg.Username + ":" + cfg.Password + "@" + cfg.Host + ":" + gconv.String(cfg.Port) + "/" + cfg.VHost
rabbitConn, err = amqp.Dial(url)
if err != nil {
g.Log().Errorf(ctx, "RabbitMQ 连接失败: %v", err)
return
}
rabbitChannel, err = rabbitConn.Channel()
if err != nil {
g.Log().Errorf(ctx, "创建 RabbitMQ Channel 失败: %v", err)
return
}
rabbitCloseWatcher = make(chan struct{})
if !rabbitWatcherStarted {
go handleRabbitMQConnectionClose(ctx)
rabbitWatcherStarted = true
}
g.Log().Info(ctx, "RabbitMQ 连接成功")
})
return err
}
// getRabbitMQChannel 获取 RabbitMQ Channel
func getRabbitMQChannel() (*amqp.Channel, error) {
rabbitMu.RLock()
defer rabbitMu.RUnlock()
if rabbitChannel == nil || rabbitChannel.IsClosed() {
return nil, gerror.New("RabbitMQ Channel 未初始化或已关闭")
}
return rabbitChannel, nil
}
// getRabbitMQConnection 获取 RabbitMQ 连接
func getRabbitMQConnection() (*amqp.Connection, error) {
rabbitMu.RLock()
defer rabbitMu.RUnlock()
if rabbitConn == nil || rabbitConn.IsClosed() {
return nil, gerror.New("RabbitMQ 连接未初始化或已关闭")
}
return rabbitConn, nil
}
// handleRabbitMQConnectionClose 监听连接关闭并重连
func handleRabbitMQConnectionClose(ctx context.Context) {
for {
select {
case <-rabbitCloseWatcher:
g.Log().Info(ctx, "停止监听 RabbitMQ 连接状态")
return
default:
}
rabbitMu.RLock()
currentConn := rabbitConn
rabbitMu.RUnlock()
if currentConn == nil {
return
}
closeErr := make(chan *amqp.Error, 1)
currentConn.NotifyClose(closeErr)
select {
case err := <-closeErr:
if err != nil {
g.Log().Errorf(ctx, "RabbitMQ 连接关闭: %v尝试重连...", err)
reconnectRabbitMQ(ctx)
}
case <-rabbitCloseWatcher:
g.Log().Info(ctx, "停止监听 RabbitMQ 连接状态")
return
}
}
}
// reconnectRabbitMQ 重新连接
func reconnectRabbitMQ(ctx context.Context) {
rabbitMu.Lock()
defer rabbitMu.Unlock()
for i := 0; i < 10; i++ {
time.Sleep(time.Duration(i+1) * time.Second)
cfg := getRabbitMQConfig()
url := "amqp://" + cfg.Username + ":" + cfg.Password + "@" + cfg.Host + ":" + gconv.String(cfg.Port) + "/" + cfg.VHost
var err error
rabbitConn, err = amqp.Dial(url)
if err != nil {
g.Log().Errorf(ctx, "重连失败 (尝试 %d/10): %v", i+1, err)
continue
}
rabbitChannel, err = rabbitConn.Channel()
if err != nil {
g.Log().Errorf(ctx, "创建 Channel 失败 (尝试 %d/10): %v", i+1, err)
continue
}
g.Log().Info(ctx, "RabbitMQ 重连成功")
return
}
g.Log().Fatal(ctx, "RabbitMQ 重连失败,已达到最大重试次数")
}
// startRabbitMQConsumer 启动 RabbitMQ 消费者
func startRabbitMQConsumer(ctx context.Context, msg QueueMessage) error {
// 初始化连接
if err := initRabbitMQ(ctx); err != nil {
return gerror.Wrap(err, "初始化 RabbitMQ 连接失败")
}
// 创建独立 Channel避免并发冲突
conn, err := getRabbitMQConnection()
if err != nil {
return gerror.Wrap(err, "获取RabbitMQ连接失败")
}
ch, err := conn.Channel()
if err != nil {
return gerror.Wrap(err, "创建独立Channel失败")
}
// 声明队列
_, err = ch.QueueDeclare(
msg.Queue, // name
true, // durable
false, // autoDelete
false, // exclusive
false, // noWait
nil, // arguments
)
if err != nil {
return gerror.Newf("声明队列失败: %v", err)
}
// 设置 QoS并发控制
prefetchCount := msg.PrefetchCount
if prefetchCount == 0 {
prefetchCount = 1
}
err = ch.Qos(
prefetchCount, // prefetchCount
0, // prefetchSize
false, // global
)
if err != nil {
return gerror.Newf("设置 QoS 失败: %v", err)
}
// 开始消费
msgs, err := ch.Consume(
msg.Queue, // queue
msg.ConsumerTag, // consumer tag
msg.AutoAck, // auto-ack
false, // exclusive
false, // no-local
false, // no-wait
nil, // args
)
if err != nil {
return gerror.Newf("开始消费失败: %v", err)
}
workerCount := msg.WorkerCount
if workerCount == 0 {
workerCount = 1
}
g.Log().Infof(ctx, "RabbitMQ 消费者已启动: queue=%s, prefetch=%d, workers=%d",
msg.Queue, prefetchCount, workerCount)
// 启动多个 worker
for i := 0; i < workerCount; i++ {
go rabbitMQWorker(ctx, i, msgs, msg)
}
return nil
}
// rabbitMQWorker RabbitMQ 工作协程
func rabbitMQWorker(ctx context.Context, workerID int, msgs <-chan amqp.Delivery, msg QueueMessage) {
g.Log().Debugf(ctx, "RabbitMQ Worker %d 已启动", workerID)
for {
select {
case <-ctx.Done():
g.Log().Infof(ctx, "RabbitMQ Worker %d 收到停止信号,正在退出", workerID)
return
case delivery, ok := <-msgs:
if !ok {
g.Log().Infof(ctx, "RabbitMQ Worker %d 消息通道已关闭,退出", workerID)
return
}
// 反序列化消息
var message map[string]interface{}
if err := gjson.DecodeTo(delivery.Body, &message); err != nil {
g.Log().Errorf(ctx, "RabbitMQ Worker %d 反序列化消息失败: %v", workerID, err)
if !msg.AutoAck {
delivery.Nack(false, false)
}
continue
}
// 处理消息
err := msg.HandleFunc(ctx, message)
if err != nil {
g.Log().Errorf(ctx, "RabbitMQ Worker %d 处理消息失败: %v", workerID, err)
if !msg.AutoAck {
delivery.Nack(false, false)
}
} else {
if !msg.AutoAck {
delivery.Ack(false)
}
g.Log().Debugf(ctx, "RabbitMQ Worker %d 处理消息成功", workerID)
}
}
}
}
// publishToRabbitMQ 发布消息到 RabbitMQ
func publishToRabbitMQ(ctx context.Context, exchange, routingKey string, message interface{}) (messageID string, err error) {
ch, err := getRabbitMQChannel()
if err != nil {
return
}
body, err := gjson.Encode(message)
if err != nil {
return "", gerror.Newf("消息序列化失败: %v", err)
}
err = ch.PublishWithContext(
ctx,
exchange, // exchange
routingKey, // routing key
false, // mandatory
false, // immediate
amqp.Publishing{
DeliveryMode: amqp.Persistent,
ContentType: "application/json",
Body: body,
},
)
if err != nil {
g.Log().Errorf(ctx, "发布消息失败: exchange=%s, routingKey=%s, err=%v", exchange, routingKey, err)
return
}
g.Log().Debugf(ctx, "消息发布成功: exchange=%s, routingKey=%s", exchange, routingKey)
return messageID, nil
}
// publishDelayedToRabbitMQ 发布延时消息到 RabbitMQ
func publishDelayedToRabbitMQ(ctx context.Context, exchange, routingKey string, message interface{}, delaySeconds int) (messageID string, err error) {
ch, err := getRabbitMQChannel()
if err != nil {
return
}
body, err := gjson.Encode(message)
if err != nil {
return "", gerror.Newf("消息序列化失败: %v", err)
}
err = ch.PublishWithContext(
ctx,
exchange, // exchange必须是 x-delayed-message 类型)
routingKey, // routing key
false, // mandatory
false, // immediate
amqp.Publishing{
DeliveryMode: amqp.Persistent,
ContentType: "application/json",
Body: body,
Headers: amqp.Table{
"x-delay": delaySeconds * 1000, // 延时(毫秒)
},
},
)
if err != nil {
g.Log().Errorf(ctx, "发布延时消息失败: exchange=%s, routingKey=%s, delay=%ds, err=%v", exchange, routingKey, delaySeconds, err)
return
}
g.Log().Debugf(ctx, "延时消息发布成功: exchange=%s, routingKey=%s, delay=%ds", exchange, routingKey, delaySeconds)
return messageID, nil
}

View File

@@ -1,210 +0,0 @@
package message
import (
"context"
"sync"
"time"
"github.com/gogf/gf/v2/errors/gerror"
"github.com/gogf/gf/v2/frame/g"
"github.com/gogf/gf/v2/util/gconv"
amqp "github.com/rabbitmq/amqp091-go"
)
var (
conn *amqp.Connection
channel *amqp.Channel
rabbitmqOnce sync.Once
rabbitmqMu sync.RWMutex
closeWatcher chan struct{} // 用于停止监听 goroutine
watcherStarted bool // 防止重复启动监听
)
// Config RabbitMQ 配置
type Config struct {
Host string
Port int
Username string
Password string
VHost string
}
// Init 初始化 RabbitMQ 连接
func Init(ctx context.Context, cfg *Config) error {
var err error
rabbitmqOnce.Do(func() {
// 构建连接字符串
url := "amqp://" + cfg.Username + ":" + cfg.Password + "@" + cfg.Host + ":" + gconv.String(cfg.Port) + "/" + cfg.VHost
// 创建连接
conn, err = amqp.Dial(url)
if err != nil {
g.Log().Errorf(ctx, "RabbitMQ 连接失败: %v", err)
return
}
// 创建 Channel
channel, err = conn.Channel()
if err != nil {
g.Log().Errorf(ctx, "创建 RabbitMQ Channel 失败: %v", err)
return
}
// 初始化关闭监听器
closeWatcher = make(chan struct{})
// 监听连接关闭(只启动一次)
if !watcherStarted {
go handleConnectionClose(ctx)
watcherStarted = true
}
g.Log().Info(ctx, "RabbitMQ 连接成功")
})
return err
}
// InitFromConfig 从配置文件初始化
func InitFromConfig(ctx context.Context) error {
cfg := &Config{
Host: g.Cfg().MustGet(ctx, "rabbitmq.host").String(),
Port: g.Cfg().MustGet(ctx, "rabbitmq.port").Int(),
Username: g.Cfg().MustGet(ctx, "rabbitmq.username").String(),
Password: g.Cfg().MustGet(ctx, "rabbitmq.password").String(),
VHost: g.Cfg().MustGet(ctx, "rabbitmq.vhost", "/").String(),
}
return Init(ctx, cfg)
}
// GetChannel 获取 Channel
func GetChannel() (*amqp.Channel, error) {
rabbitmqMu.RLock()
defer rabbitmqMu.RUnlock()
if channel == nil || channel.IsClosed() {
return nil, gerror.New("RabbitMQ Channel 未初始化或已关闭")
}
return channel, nil
}
// GetConnection 获取连接
func GetConnection() (*amqp.Connection, error) {
rabbitmqMu.RLock()
defer rabbitmqMu.RUnlock()
if conn == nil || conn.IsClosed() {
return nil, gerror.New("RabbitMQ 连接未初始化或已关闭")
}
return conn, nil
}
// handleConnectionClose 监听连接关闭并重连
func handleConnectionClose(ctx context.Context) {
for {
// 检查是否需要停止监听
select {
case <-closeWatcher:
g.Log().Info(ctx, "停止监听 RabbitMQ 连接状态")
return
default:
}
rabbitmqMu.RLock()
currentConn := conn
rabbitmqMu.RUnlock()
if currentConn == nil {
return
}
// 创建关闭通知 channel
closeErr := make(chan *amqp.Error, 1)
currentConn.NotifyClose(closeErr)
// 等待连接关闭或停止信号
select {
case err := <-closeErr:
if err != nil {
g.Log().Errorf(ctx, "RabbitMQ 连接关闭: %v尝试重连...", err)
reconnect(ctx)
}
case <-closeWatcher:
g.Log().Info(ctx, "停止监听 RabbitMQ 连接状态")
return
}
}
}
// reconnect 重新连接
func reconnect(ctx context.Context) {
rabbitmqMu.Lock()
defer rabbitmqMu.Unlock()
for i := 0; i < 10; i++ {
time.Sleep(time.Duration(i+1) * time.Second)
cfg := &Config{
Host: g.Cfg().MustGet(ctx, "rabbitmq.host").String(),
Port: g.Cfg().MustGet(ctx, "rabbitmq.port").Int(),
Username: g.Cfg().MustGet(ctx, "rabbitmq.username").String(),
Password: g.Cfg().MustGet(ctx, "rabbitmq.password").String(),
VHost: g.Cfg().MustGet(ctx, "rabbitmq.vhost", "/").String(),
}
url := "amqp://" + cfg.Username + ":" + cfg.Password + "@" + cfg.Host + ":" + gconv.String(cfg.Port) + "/" + cfg.VHost
var err error
conn, err = amqp.Dial(url)
if err != nil {
g.Log().Errorf(ctx, "重连失败 (尝试 %d/10): %v", i+1, err)
continue
}
channel, err = conn.Channel()
if err != nil {
g.Log().Errorf(ctx, "创建 Channel 失败 (尝试 %d/10): %v", i+1, err)
continue
}
g.Log().Info(ctx, "RabbitMQ 重连成功")
// 不再重复启动监听 goroutine
return
}
g.Log().Fatal(ctx, "RabbitMQ 重连失败,已达到最大重试次数")
}
// Close 关闭连接
func Close(ctx context.Context) (err error) {
rabbitmqMu.Lock()
defer rabbitmqMu.Unlock()
// 停止监听 goroutine
if closeWatcher != nil {
close(closeWatcher)
closeWatcher = nil
}
if channel != nil {
if err = channel.Close(); err != nil {
g.Log().Errorf(ctx, "关闭 RabbitMQ Channel 失败: %v", err)
}
channel = nil
}
if conn != nil {
if err = conn.Close(); err != nil {
g.Log().Errorf(ctx, "关闭 RabbitMQ 连接失败: %v", err)
return
}
conn = nil
}
watcherStarted = false
g.Log().Info(ctx, "RabbitMQ 连接已关闭")
return
}

View File

@@ -10,61 +10,129 @@ import (
amqp "github.com/rabbitmq/amqp091-go"
)
// rabbitMQMessageClient RabbitMQ 实现
type rabbitMQMessageClient struct {
clientType messageClientType
type RabbitMQPublishMsgConfig struct {
QueueName string
Durable bool
DelayTime int
Data any
}
// StreamGroup 创建消费组(支持单个或批量)
func (q *rabbitMQMessageClient) streamGroup(ctx context.Context, configs ...interface{}) error {
if len(configs) == 0 {
return fmt.Errorf("配置不能为空")
type RabbitMQSubscribeMsgConfig struct {
QueueName string
Durable bool
DelayTime int
ConsumerName string
AutoAck bool
PrefetchCount int
HandleFunc func(ctx context.Context, message map[string]interface{}) error
}
for _, config := range configs {
cfg, ok := config.(*RabbitMQConfig)
func (*RabbitMQPublishMsgConfig) GetPublishMsgType() {
}
func (*RabbitMQSubscribeMsgConfig) GetSubscribeMsgType() {
}
func init() {
// 注册 RabbitMQ 插件,必须使用 RegisterPlugin 确保连接检测
//registerPlugin(MessageRabbitMQ, func() messageUtil {
// return &rabbitMQ{}
//})
}
type rabbitMQ struct{}
// Ping 检测 RabbitMQ 连接状态
func (c *rabbitMQ) ping(ctx context.Context) bool {
return rabbitmqPing()
}
// Reconnect 重连 RabbitMQ
func (c *rabbitMQ) reconnect(ctx context.Context) error {
return rabbitmqReconnect(ctx)
}
// Close 关闭 RabbitMQ 连接
func (c *rabbitMQ) close(ctx context.Context) error {
return rabbitmqClose(ctx)
}
// Publish 发布消息
func (c *rabbitMQ) Publish(ctx context.Context, msgConfig messagePublishConfig) error {
cfg, ok := msgConfig.(*RabbitMQPublishMsgConfig)
if !ok {
return fmt.Errorf("无效的 RabbitMQ 配置类型")
}
if err := q.setupQueue(ctx, channel, cfg, cfg.DelayMessage); err != nil {
return err
if g.IsEmpty(cfg.QueueName) {
return fmt.Errorf("队列名称不能为空")
}
if cfg.Data == nil {
return fmt.Errorf("数据不能为空")
}
return nil
return c.publishMessageInternal(ctx, cfg.QueueName, cfg.Durable, cfg.DelayTime, cfg.Data)
}
// Publish 发布消息(支持单个或批量)
func (q *rabbitMQMessageClient) publish(ctx context.Context, config interface{}, data interface{}) error {
cfg, ok := config.(*RabbitMQConfig)
if !ok {
return fmt.Errorf("无效的 RabbitMQ 配置类型")
}
if err := q.publishMessage(ctx, cfg, "work", data, 0); err != nil {
g.Log().Errorf(ctx, "❌ RabbitMQ 发布消息失败: err=%v", err)
return err
}
return nil
// publishMessage 发布消息内部实现
func (c *rabbitMQ) publishMessageInternal(ctx context.Context, queueName string, durable bool, delayTime int, data interface{}) error {
delayMsg := delayTime > 0
// 1. 决定 Exchange 类型
exchangeType := "fanout"
exchangeName := queueName
routingKey := queueName
args := amqp.Table{}
if delayMsg {
exchangeType = "x-delayed-message"
exchangeName = queueName + ".delayed"
args["x-delayed-type"] = "fanout" // 底层用 topic
}
// PublishDelayed 发布延迟消息
func (q *rabbitMQMessageClient) publishDelayed(ctx context.Context, config interface{}, data interface{}, delaySeconds int) error {
cfg, ok := config.(*RabbitMQConfig)
if !ok {
return fmt.Errorf("无效的 RabbitMQ 配置类型")
}
if err := q.publishMessage(ctx, cfg, "delayed", data, delaySeconds); err != nil {
g.Log().Errorf(ctx, "❌ RabbitMQ 发布延迟消息失败: err=%v", err)
return err
}
return nil
// 2. 声明 Exchange只声明一次
if err := channel.ExchangeDeclare(
queueName, // exchange 交换机名称
exchangeType,
durable,
false, // autoDelete
false, // internal
false, // noWait
args,
); err != nil {
return fmt.Errorf("声明 Exchange 失败: %w", err)
}
func (q *rabbitMQMessageClient) publishMessage(ctx context.Context, cfg *RabbitMQConfig, mode string, data interface{}, delaySeconds int) error {
// 3. 声明队列
if _, err := channel.QueueDeclare(
queueName,
durable,
false, // autoDelete
false, // exclusive
false, // noWait
nil, // args
); err != nil {
return fmt.Errorf("声明队列失败: %w", err)
}
// 4. 绑定队列
if err := channel.QueueBind(
queueName,
routingKey, // routingKey 路由键
exchangeName, // exchange 交换机名称
false, // noWait
nil, // args
); err != nil {
return fmt.Errorf("绑定队列失败: %w", err)
}
// 5. 序列化数据
body, err := json.Marshal(data)
if err != nil {
return fmt.Errorf("序列化数据失败: %w", err)
}
// 6. 发布消息
deliveryMode := amqp.Transient
if cfg.Durable {
if durable {
deliveryMode = amqp.Persistent
}
publishing := amqp.Publishing{
@@ -73,15 +141,15 @@ func (q *rabbitMQMessageClient) publishMessage(ctx context.Context, cfg *RabbitM
DeliveryMode: deliveryMode,
Timestamp: time.Now(),
}
if delaySeconds > 0 {
if delayMsg {
duration := time.Duration(delayTime) * time.Minute
publishing.Headers = amqp.Table{
"x-delay": delaySeconds * 1000, // 延时间(毫秒)
"x-delay": duration, // 延时间(毫秒)
}
}
exchange, routingKey := q.parseExchangeAndRoutingKey(ctx, mode, cfg)
err = channel.PublishWithContext(
ctx,
exchange,
exchangeName,
routingKey,
false, false,
publishing,
@@ -89,102 +157,40 @@ func (q *rabbitMQMessageClient) publishMessage(ctx context.Context, cfg *RabbitM
return err
}
func (q *rabbitMQMessageClient) parseExchangeAndRoutingKey(_ context.Context, mode string, cfg *RabbitMQConfig) (exchange, routingKey string) {
switch mode {
case "work", "":
exchange = "" // 默认交换机
routingKey = cfg.Name // 队列名
case "event", "topic":
exchange = cfg.Exchange
routingKey = cfg.Topic
case "broadcast":
exchange = cfg.Exchange
routingKey = "" // fanout忽略路由键
case "delayed":
exchange = cfg.Exchange + ".delayed"
routingKey = cfg.Topic
default:
exchange = ""
routingKey = cfg.Name
}
return exchange, routingKey
}
// setupQueue 统一的队列设置方法(声明 Exchange、队列、绑定、延迟 Exchange
func (q *rabbitMQMessageClient) setupQueue(ctx context.Context, ch *amqp.Channel, cfg *RabbitMQConfig, delayMessage bool) error {
exchange, routingKey := q.parseExchangeAndRoutingKey(ctx, cfg.Mode, cfg)
// 声明 Exchange
if err := ch.ExchangeDeclare(exchange, "topic", cfg.Durable, false, false, false, nil); err != nil {
return fmt.Errorf("声明 Exchange 失败: %w", err)
}
// 声明队列
if _, err := ch.QueueDeclare(cfg.Queue, cfg.Durable, false, false, false, nil); err != nil {
return fmt.Errorf("声明队列失败: %w", err)
}
// 绑定队列
if err := ch.QueueBind(cfg.Queue, routingKey, exchange, false, nil); err != nil {
return fmt.Errorf("绑定队列失败: %w", err)
}
// 声明延迟 Exchange如果需要
if delayMessage {
if err := ch.ExchangeDeclare(exchange, "x-delayed-message", true, false, false, false, amqp.Table{"x-delayed-type": "direct"}); err != nil {
return fmt.Errorf("声明延迟 Exchange 失败: %w", err)
}
if err := ch.QueueBind(cfg.Name, routingKey, exchange, false, nil); err != nil {
return fmt.Errorf("绑定延迟队列失败: %w", err)
}
}
return nil
}
// Subscribe 订阅消息(支持单个或批量)
func (q *rabbitMQMessageClient) subscribe(ctx context.Context, configs ...interface{}) error {
if len(configs) == 0 {
return fmt.Errorf("配置不能为空")
}
for _, config := range configs {
cfg, ok := config.(*RabbitMQConfig)
// Subscribe 订阅消息
func (c *rabbitMQ) Subscribe(ctx context.Context, msgConfig messageSubscribeConfig) error {
cfg, ok := msgConfig.(*RabbitMQSubscribeMsgConfig)
if !ok {
return fmt.Errorf("无效的 RabbitMQ 配置类型")
}
handler := cfg.HandleFunc
if handler == nil {
if g.IsEmpty(cfg.QueueName) {
return fmt.Errorf("队列名称不能为空")
}
if g.IsEmpty(cfg.ConsumerName) {
return fmt.Errorf("消费者名称不能为空")
}
if g.IsEmpty(cfg.PrefetchCount) {
cfg.PrefetchCount = 1
}
if g.IsEmpty(cfg.HandleFunc) {
return fmt.Errorf("必须提供处理函数")
}
if err := q.createSubscribe(ctx, cfg, handler); err != nil {
return err
}
}
return nil
return c.createSubscribeInternal(ctx, cfg.QueueName, cfg.ConsumerName, cfg.PrefetchCount, cfg.AutoAck, cfg.HandleFunc)
}
// subscribe 内部单个订阅消息
func (q *rabbitMQMessageClient) createSubscribe(ctx context.Context, cfg *RabbitMQConfig, handler func(ctx context.Context, message map[string]interface{}) error) error {
g.Log().Infof(ctx, "🔔 RabbitMQ 开始订阅: exchange=%s, queue=%s", cfg.Exchange, cfg.Queue)
// createSubscribe 内部订阅消息
func (c *rabbitMQ) createSubscribeInternal(ctx context.Context, queueName, consumerName string, prefetchCount int, autoAck bool, handler func(ctx context.Context, message map[string]interface{}) error) error {
g.Log().Infof(ctx, "🔔 RabbitMQ 开始订阅: queueName=%s, consumerName=%s", queueName, consumerName)
// 设置 Qos (预取数量),控制每次推送的消息数量
// prefetchCount: 未 ACK 消息的最大数量
// prefetchSize: 未 ACK 消息的总大小0 表示不限制)
// global: false 表示仅应用于当前消费者
prefetchCount := cfg.PrefetchCount
if prefetchCount <= 0 {
prefetchCount = 10 // 默认值为 10
}
if err := channel.Qos(prefetchCount, 0, false); err != nil {
return fmt.Errorf("设置 Qos 失败: %w", err)
}
g.Log().Infof(ctx, "📊 设置 Prefetch Count: %d", prefetchCount)
msg, err := channel.Consume(
cfg.Queue, // queue
cfg.Queue, // consumer
cfg.AutoAck, // auto-ack (根据配置决定)
queueName, // queue
consumerName, // consumer
autoAck, // auto-ack (根据配置决定)
false, // exclusive
false, // no-local
false, // no-wait
@@ -207,7 +213,7 @@ func (q *rabbitMQMessageClient) createSubscribe(ctx context.Context, cfg *Rabbit
for {
select {
case <-ctx.Done():
g.Log().Infof(ctx, "🔕 RabbitMQ 消费者停止: queue=%s", cfg.Queue)
g.Log().Infof(ctx, "🔕 RabbitMQ 消费者停止: queueName=%s, consumerName=%s", queueName, consumerName)
return
case msg, ok := <-msg:
if !ok {
@@ -226,11 +232,11 @@ func (q *rabbitMQMessageClient) createSubscribe(ctx context.Context, cfg *Rabbit
}
}()
if err := q.handleMessageWithRetry(ctx, m, handler, cfg.MaxRetry); err != nil {
if err := c.handleMessageWithRetryInternal(ctx, m, handler, autoAck); err != nil {
g.Log().Errorf(ctx, "❌ 消息处理失败(重试次数耗尽): %v", err)
// 仅在手动 ACK 模式下拒绝消息
if !cfg.AutoAck {
if !autoAck {
// 拒绝消息不再重新入队(避免死循环)
m.Nack(false, false)
}
@@ -238,7 +244,7 @@ func (q *rabbitMQMessageClient) createSubscribe(ctx context.Context, cfg *Rabbit
}
// 仅在手动 ACK 模式下确认消息
if cfg.AutoAck {
if autoAck {
if err := m.Ack(false); err != nil {
g.Log().Errorf(ctx, "❌ ACK 消息失败: %v", err)
}
@@ -252,7 +258,7 @@ func (q *rabbitMQMessageClient) createSubscribe(ctx context.Context, cfg *Rabbit
}
// handleMessageWithRetry 处理消息(支持重试)
func (q *rabbitMQMessageClient) handleMessageWithRetry(ctx context.Context, msg amqp.Delivery, handler func(ctx context.Context, message map[string]interface{}) error, maxRetry int) error {
func (c *rabbitMQ) handleMessageWithRetryInternal(ctx context.Context, msg amqp.Delivery, handler func(ctx context.Context, message map[string]interface{}) error, autoAck bool) error {
var data map[string]interface{}
if err := json.Unmarshal(msg.Body, &data); err != nil {
@@ -263,6 +269,7 @@ func (q *rabbitMQMessageClient) handleMessageWithRetry(ctx context.Context, msg
}
// 重试逻辑
const maxRetry = 3
for attempt := 0; attempt <= maxRetry; attempt++ {
if attempt > 0 {
g.Log().Infof(ctx, "🔄 消息处理重试 (第%d次)", attempt)

View File

@@ -1,275 +0,0 @@
package message
import (
"context"
"errors"
"fmt"
"strings"
"time"
"github.com/gogf/gf/v2/database/gredis"
"github.com/gogf/gf/v2/frame/g"
"github.com/gogf/gf/v2/os/glog"
"github.com/gogf/gf/v2/util/gconv"
)
// StreamMessage Redis Stream 消息结构
type StreamMessage struct {
ID string // 消息ID自动生成
Values map[string]interface{} // 消息内容
}
// getClient 获取 Redis 客户端
func getRedisClientTest(name string) *gredis.Redis {
return g.Redis(name)
}
// getRedisClientByDB 根据DB获取Redis客户端如果db<=0则返回默认客户端
func getRedisClientByDB(db int) *gredis.Redis {
if db <= 0 {
return g.Redis()
}
// 创建连接到指定DB的Redis客户端
client, err := gredis.New(&gredis.Config{
Address: g.Cfg().MustGet(context.Background(), "redis.default.address").String(),
Db: db,
})
if err != nil {
glog.Errorf(context.Background(), "创建Redis客户端失败: %v", err)
return g.Redis()
}
return client
}
// lock 分布式锁
func lock(ctx context.Context, key string, expireSeconds int64, fn func(ctx context.Context) error) (success bool, err error) {
ds, err := GetManager().GetDefaultDataSource()
if err != nil {
return false, fmt.Errorf("获取默认数据源失败: %w", err)
}
maxRetries := 3
for i := 0; i < maxRetries; i++ {
if val, err := ds.Redis().Set(ctx, key, true, gredis.SetOption{
TTLOption: gredis.TTLOption{
EX: &expireSeconds,
},
NX: true,
}); err != nil {
return false, err
} else {
if val.Bool() {
defer func(redisClient *gredis.Redis, ctx context.Context, key string) {
if _, err = redisClient.Del(ctx, key); err != nil {
glog.Errorf(ctx, "RedisClient.Del error: %v", err)
}
}(ds.Redis(), ctx, key)
if err = fn(ctx); err != nil {
return false, err
}
return true, nil
} else {
// 检查上下文是否已取消
if ctx.Err() != nil {
return false, ctx.Err()
}
// 非最后一次重试时才等待
if i < maxRetries-1 {
time.Sleep(time.Second)
}
}
}
}
return false, errors.New("锁重试次数耗尽")
}
// publishToRedis 将消息添加到 Redis Stream
func publishToRedis(ctx context.Context, streamKey string, msg interface{}) (messageID string, err error) {
ds, err := GetManager().GetDefaultDataSource()
if err != nil {
return "", fmt.Errorf("获取默认数据源失败: %w", err)
}
if !ds.IsConnected() {
if err := ds.Reconnect(ctx); err != nil {
return "", fmt.Errorf("redis重连失败: %w", err)
}
}
values := gconv.Map(msg)
args := make([]interface{}, 0, len(values)*2+2)
args = append(args, streamKey, "*")
for key, val := range values {
args = append(args, key, val)
}
result, err := ds.Redis().Do(ctx, "XADD", args...)
if err != nil {
return
}
messageID = result.String()
return
}
// initStreamGroup 初始化 Stream 和消费者组
func initStreamGroup(ctx context.Context, streamKey, groupName string) error {
ds, err := GetManager().GetDefaultDataSource()
if err != nil {
return fmt.Errorf("获取默认数据源失败: %w", err)
}
if !ds.IsConnected() {
if err := ds.Reconnect(ctx); err != nil {
return fmt.Errorf("redis重连失败: %w", err)
}
}
_, err = ds.Redis().Do(ctx, "XGROUP", "CREATE", streamKey, groupName, "0", "MKSTREAM")
if err != nil {
// 如果组已存在,忽略错误
errStr := err.Error()
// 检查错误是否是 "BUSYGROUP Consumer Group name already exists"
if strings.Contains(errStr, "BUSYGROUP") || strings.Contains(errStr, "already exists") {
// 这是一个预期的情况,说明消费者组已经存在,无需处理
return nil
}
// 这是一个真正的错误,需要记录或处理
return err
}
return nil
}
// readFromStream 从 Stream 读取消息
func readFromStream(ctx context.Context, msg QueueMessage) error {
ds, err := GetManager().GetDefaultDataSource()
if err != nil {
return fmt.Errorf("获取默认数据源失败: %w", err)
}
// 初始化 Stream 和消费者组
if err := initStreamGroup(ctx, msg.StreamKey, msg.GroupName); err != nil {
return err
}
go func() {
RECONNECT:
for {
result, err := ds.Redis().Do(ctx, "XREADGROUP", "GROUP", msg.GroupName, msg.ConsumerName, "COUNT", msg.BatchSize, "BLOCK", 0, "STREAMS", msg.StreamKey, ">")
if err != nil {
//select {
//case <-ctx.Done():
// return
//}
time.Sleep(time.Second)
goto RECONNECT
}
// 检查返回结果是否为空
if result == nil || result.IsEmpty() {
continue
}
messages := make([]StreamMessage, 0, int(msg.BatchSize))
// 尝试 map 格式GoFrame gredis 返回)
if streamsMap, ok := result.Val().(map[interface{}]interface{}); ok {
for _, streamMsgs := range streamsMap {
msgsArray, ok := streamMsgs.([]interface{})
if !ok {
continue
}
for _, msgData := range msgsArray {
msgArray, ok := msgData.([]interface{})
if !ok || len(msgArray) < 2 {
continue
}
msgID := gconv.String(msgArray[0])
fieldsArray, ok := msgArray[1].([]interface{})
if !ok {
continue
}
values := make(map[string]interface{}, len(fieldsArray)/2)
for i := 0; i < len(fieldsArray); i += 2 {
if i+1 < len(fieldsArray) {
key := gconv.String(fieldsArray[i])
values[key] = fieldsArray[i+1]
}
}
messages = append(messages, StreamMessage{
ID: msgID,
Values: values,
})
}
}
}
// 尝试数组格式(标准 Redis 返回)
if streamsArray, ok := result.Val().([]interface{}); ok && len(streamsArray) > 0 {
for _, streamData := range streamsArray {
streamArray, ok := streamData.([]interface{})
if !ok || len(streamArray) < 2 {
continue
}
messagesArray, ok := streamArray[1].([]interface{})
if !ok {
continue
}
for _, msgData := range messagesArray {
msgArray, ok := msgData.([]interface{})
if !ok || len(msgArray) < 2 {
continue
}
msgID := gconv.String(msgArray[0])
fieldsArray, ok := msgArray[1].([]interface{})
if !ok {
continue
}
values := make(map[string]interface{}, len(fieldsArray)/2)
for i := 0; i < len(fieldsArray); i += 2 {
if i+1 < len(fieldsArray) {
key := gconv.String(fieldsArray[i])
values[key] = fieldsArray[i+1]
}
}
messages = append(messages, StreamMessage{
ID: msgID,
Values: values,
})
}
}
}
// 处理消息
for _, streamMsg := range messages {
// 业务处理
if err := msg.HandleFunc(ctx, streamMsg.Values); err != nil {
glog.Infof(ctx, "业务处理失败-> err:%v\n", err)
continue
}
// 确认消息
if msg.AutoAck {
err := ackMessage(ctx, msg.StreamKey, msg.GroupName, streamMsg.ID)
if err != nil {
glog.Infof(ctx, "消费者 '%s' 确认消息 ID %s 失败: %v\n", msg.ConsumerName, streamMsg.ID, err)
}
}
}
}
}()
return nil
}
// ackMessage 确认消息已处理
func ackMessage(ctx context.Context, streamKey, groupName string, messageIDs ...string) error {
ds, err := GetManager().GetDefaultDataSource()
if err != nil {
return fmt.Errorf("获取默认数据源失败: %w", err)
}
if !ds.IsConnected() {
if err := ds.Reconnect(ctx); err != nil {
return fmt.Errorf("redis重连失败: %w", err)
}
}
args := make([]interface{}, 0, len(messageIDs)+2)
args = append(args, streamKey, groupName)
for _, id := range messageIDs {
args = append(args, id)
}
_, err = ds.Redis().Do(ctx, "XACK", args...)
return err
}

View File

@@ -1,468 +0,0 @@
// =============================================================================
// Redis 数据源连接管理
// 使用 GoFrame 框架自带的 Redis 客户端,负责数据源的连接、重连、健康检查和优雅关闭
// =============================================================================
package message
import (
"context"
"fmt"
"os"
"os/signal"
"sync"
"sync/atomic"
"syscall"
"time"
"github.com/gogf/gf/v2/database/gredis"
"github.com/gogf/gf/v2/frame/g"
"github.com/gogf/gf/v2/os/glog"
"github.com/gogf/gf/v2/util/gconv"
)
// =============================================================================
// 数据源配置结构
// =============================================================================
type RedisDataSourceConfig struct {
Name string `json:"name"` // 数据源名称
Address string `json:"address"` // Redis 地址,如: 127.0.0.1:6379
Db int `json:"db"` // 数据库编号
Pass string `json:"pass"` // 密码
Timeout time.Duration `json:"timeout"` // 连接超时
MaxIdle int `json:"maxIdle"` // 最大空闲连接数
MaxOpen int `json:"maxOpen"` // 最大活跃连接数
}
// =============================================================================
// 单个数据源接口
// =============================================================================
type DataSource interface {
Name() string
Redis() *gredis.Redis
IsConnected() bool
Connect(ctx context.Context) error
Reconnect(ctx context.Context) error
Close(ctx context.Context) error
}
// =============================================================================
// 数据源实现
// =============================================================================
type BaseDataSource struct {
config *RedisDataSourceConfig
redis *gredis.Redis
isConnected bool
mu sync.RWMutex
lastError error
lastErrorTime time.Time
metrics RedisMetrics
}
func NewBaseDataSource(config *RedisDataSourceConfig) *BaseDataSource {
return &BaseDataSource{
config: config,
isConnected: false,
}
}
func (d *BaseDataSource) Name() string {
return d.config.Name
}
func (d *BaseDataSource) Redis() *gredis.Redis {
d.mu.RLock()
defer d.mu.RUnlock()
return d.redis
}
func (d *BaseDataSource) IsConnected() bool {
d.mu.RLock()
defer d.mu.RUnlock()
return d.isConnected && d.redis != nil
}
func (d *BaseDataSource) Connect(ctx context.Context) error {
d.mu.Lock()
defer d.mu.Unlock()
// 设置默认值
config := d.config
if config.Timeout == 0 {
config.Timeout = 10 * time.Second
}
if config.MaxIdle == 0 {
config.MaxIdle = 10
}
if config.MaxOpen == 0 {
config.MaxOpen = 100
}
// 构建 GoFrame Redis 配置
redisConfig := &gredis.Config{
Address: config.Address,
Db: config.Db,
Pass: config.Pass,
}
// 使用 GoFrame 的 Redis 连接
redisObj, err := gredis.New(redisConfig)
if err != nil {
d.isConnected = false
d.lastError = err
d.lastErrorTime = time.Now()
d.metrics.PingError.Add(1)
return fmt.Errorf("datasource [%s] connection failed: %w", d.config.Name, err)
}
d.redis = redisObj
// 测试连接
if err := d.Ping(ctx); err != nil {
d.isConnected = false
d.lastError = err
d.lastErrorTime = time.Now()
return fmt.Errorf("datasource [%s] ping failed: %w", d.config.Name, err)
}
d.isConnected = true
d.lastError = nil
glog.Infof(ctx, "✅ datasource [%s] connected successfully", d.config.Name)
return nil
}
func (d *BaseDataSource) Ping(ctx context.Context) error {
defer func() {
if r := recover(); r != nil {
d.metrics.PingError.Add(1)
glog.Errorf(ctx, "❌ datasource [%s] ping panic: %v", d.config.Name, r)
}
}()
if d.redis == nil {
d.metrics.PingError.Add(1)
return fmt.Errorf("redis client is nil")
}
_, err := d.redis.Do(ctx, "PING")
if err != nil {
d.metrics.PingError.Add(1)
return err
}
d.metrics.PingCount.Add(1)
return nil
}
func (d *BaseDataSource) Reconnect(ctx context.Context) error {
glog.Infof(ctx, "🔄 reconnecting datasource [%s]", d.config.Name)
return d.Connect(ctx)
}
func (d *BaseDataSource) Close(ctx context.Context) error {
d.mu.Lock()
defer d.mu.Unlock()
if d.redis != nil {
if err := d.redis.Close(ctx); err != nil {
return fmt.Errorf("datasource [%s] close failed: %w", d.config.Name, err)
}
}
d.isConnected = false
d.redis = nil
glog.Infof(ctx, "datasource [%s] closed", d.config.Name)
return nil
}
func (d *BaseDataSource) GetMetrics() RedisMetrics {
return d.metrics
}
// =============================================================================
// 监控指标
// =============================================================================
type RedisMetrics struct {
PingCount atomic.Int64
PingError atomic.Int64
CommandCount atomic.Int64
CommandError atomic.Int64
}
// GetPingMetrics 获取 Ping 相关指标
func (m *RedisMetrics) GetPingMetrics() (int64, int64) {
return m.PingCount.Load(), m.PingError.Load()
}
// GetCommandMetrics 获取命令相关指标
func (m *RedisMetrics) GetCommandMetrics() (int64, int64) {
return m.CommandCount.Load(), m.CommandError.Load()
}
// =============================================================================
// 多数据源管理器
// =============================================================================
type DataSourceManager struct {
sources map[string]DataSource
mu sync.RWMutex
ctx context.Context
cancel context.CancelFunc
started bool
maxRetries int
metrics RedisMetrics
}
var (
manager *DataSourceManager
once sync.Once
)
// GetManager 获取全局管理器
func GetManager() *DataSourceManager {
once.Do(func() {
ctx, cancel := context.WithCancel(context.Background())
manager = &DataSourceManager{
sources: make(map[string]DataSource),
ctx: ctx,
cancel: cancel,
started: false,
maxRetries: 3,
}
})
return manager
}
// RegisterDataSource 注册数据源
func (m *DataSourceManager) RegisterDataSource(config *RedisDataSourceConfig) error {
m.mu.Lock()
defer m.mu.Unlock()
if _, exists := m.sources[config.Name]; exists {
return fmt.Errorf("datasource [%s] already exists", config.Name)
}
source := NewBaseDataSource(config)
m.sources[config.Name] = source
return nil
}
// GetDataSource 获取数据源
func (m *DataSourceManager) GetDataSource(name string) (DataSource, error) {
m.mu.RLock()
defer m.mu.RUnlock()
source, exists := m.sources[name]
if !exists {
return nil, fmt.Errorf("datasource [%s] not found", name)
}
return source, nil
}
// GetAllDataSourceNames 获取所有数据源名称
func (m *DataSourceManager) GetAllDataSourceNames() []string {
m.mu.RLock()
defer m.mu.RUnlock()
names := make([]string, 0, len(m.sources))
for name := range m.sources {
names = append(names, name)
}
return names
}
// GetDefaultDataSource 获取默认数据源(第一个注册的数据源)
func (m *DataSourceManager) GetDefaultDataSource() (DataSource, error) {
m.mu.RLock()
defer m.mu.RUnlock()
for _, source := range m.sources {
return source, nil
}
return nil, fmt.Errorf("no datasource available")
}
// GetMetrics 获取全局监控指标
func (m *DataSourceManager) GetMetrics() RedisMetrics {
return m.metrics
}
// init 初始化多数据源
func init() {
ctx := context.Background()
// 从配置初始化多数据源
if err := GetManager().InitializeFromConfig(ctx); err != nil {
glog.Errorf(ctx, "❌ Failed to initialize Redis datasources: %v", err)
} else {
glog.Infof(ctx, "✅ Redis datasources initialized: %v", GetManager().GetAllDataSourceNames())
}
// 启动健康检查
GetManager().StartHealthCheck()
// 设置优雅关闭
setupGracefulShutdown()
}
// InitializeFromConfig 从配置初始化数据源
// 动态读取 config.yml 中 redis 下的所有配置项
func (m *DataSourceManager) InitializeFromConfig(ctx context.Context) error {
var firstErr error
// 获取 redis 配置下的所有子键
redisConfig := g.Cfg().MustGet(ctx, "redis")
if redisConfig.IsNil() {
glog.Warningf(ctx, "no redis configuration found in config.yml")
return nil
}
// 将配置转换为 map
configMap := redisConfig.Map()
if configMap == nil {
glog.Warningf(ctx, "redis configuration is not a map")
return nil
}
// 遍历所有 redis 子配置
for name, subConfig := range configMap {
// 跳过非对象类型的配置
subMap, ok := subConfig.(map[string]interface{})
if !ok {
continue
}
// 检查是否有 address 配置
address, hasAddress := subMap["address"]
if !hasAddress || gconv.String(address) == "" {
continue
}
// 构建数据源配置
config := &RedisDataSourceConfig{
Name: name,
Address: gconv.String(address),
Db: gconv.Int(subMap["db"]),
Pass: gconv.String(subMap["pass"]),
}
// 设置默认值
if config.Db == 0 {
config.Db = 0
}
if config.Timeout == 0 {
config.Timeout = 10 * time.Second
}
if config.MaxIdle == 0 {
config.MaxIdle = 10
}
if config.MaxOpen == 0 {
config.MaxOpen = 100
}
// 注册数据源
if err := m.RegisterDataSource(config); err != nil {
glog.Errorf(ctx, "failed to register datasource [%s]: %v", name, err)
if firstErr == nil {
firstErr = err
}
continue
}
// 连接数据源
source, _ := m.GetDataSource(name)
if err := source.Connect(ctx); err != nil {
glog.Errorf(ctx, "failed to initialize datasource [%s]: %v", name, err)
if firstErr == nil {
firstErr = err
}
}
}
return firstErr
}
// StartHealthCheck 启动健康检查
func (m *DataSourceManager) StartHealthCheck() {
if m.started {
return
}
m.started = true
go m.healthCheckLoop()
}
// healthCheckLoop 健康检查循环
func (m *DataSourceManager) healthCheckLoop() {
ticker := time.NewTicker(30 * time.Second)
defer ticker.Stop()
for {
select {
case <-m.ctx.Done():
return
case <-ticker.C:
m.checkAndReconnect()
}
}
}
// checkAndReconnect 检查并重新连接
func (m *DataSourceManager) checkAndReconnect() {
m.mu.RLock()
defer m.mu.RUnlock()
for name, source := range m.sources {
if !source.IsConnected() {
glog.Warningf(context.Background(), "datasource [%s] disconnected, attempting reconnect", name)
reconnectCtx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
defer cancel()
if err := source.Reconnect(reconnectCtx); err != nil {
glog.Errorf(reconnectCtx, "datasource [%s] reconnect failed: %v", name, err)
} else {
glog.Infof(reconnectCtx, "✅ datasource [%s] reconnected successfully", name)
}
}
}
}
// CloseAll 关闭所有数据源
func (m *DataSourceManager) CloseAll(ctx context.Context) error {
m.cancel()
m.mu.RLock()
defer m.mu.RUnlock()
var lastErr error
for name, source := range m.sources {
if err := source.Close(ctx); err != nil {
glog.Errorf(ctx, "failed to close datasource [%s]: %v", name, err)
lastErr = err
}
}
return lastErr
}
// setupGracefulShutdown 设置优雅关闭
func setupGracefulShutdown() {
go func() {
sigCh := make(chan os.Signal, 1)
signal.Notify(sigCh, syscall.SIGINT, syscall.SIGTERM)
<-sigCh
ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
defer cancel()
glog.Info(ctx, "🔄 Shutting down Redis connections...")
if err := GetManager().CloseAll(ctx); err != nil {
glog.Errorf(ctx, "❌ Failed to close Redis connections: %v", err)
} else {
glog.Info(ctx, "✅ Redis connections closed successfully")
}
}()
}

View File

@@ -2,7 +2,6 @@ package message
import (
"context"
"encoding/json"
"fmt"
"strings"
"time"
@@ -12,151 +11,137 @@ import (
"github.com/gogf/gf/v2/util/gconv"
)
// redisMessageClient Redis 实现
type redisMessageClient struct {
clientType messageClientType
type RedisPublishMsgConfig struct {
QueueName string
Data any
}
type RedisSubscribeMsgConfig struct {
QueueName string
ConsumerName string
AutoAck bool
PrefetchCount int
HandleFunc func(ctx context.Context, message map[string]interface{}) error
}
func (*RedisPublishMsgConfig) GetPublishMsgType() {
}
func (*RedisSubscribeMsgConfig) GetSubscribeMsgType() {
}
func init() {
// 注册 Redis 插件(连接由 RegisterPlugin 异步处理)
registerPlugin(MessageRedis, func() messageUtil {
return &redis{}
})
}
type redis struct{}
// RedisStreamMessage Redis Stream 消息结构
type RedisStreamMessage struct {
type redisStreamMessage struct {
ID string
Values map[string]interface{}
}
// StreamGroup 创建消费组(支持单个或批量)
func (q *redisMessageClient) streamGroup(ctx context.Context, configs ...interface{}) error {
if len(configs) == 0 {
return fmt.Errorf("配置不能为空")
// Ping 检测 Redis 连接状态
func (c *redis) ping(ctx context.Context) bool {
conn, err := getDefaultDataSource()
if err != nil {
return false
}
for _, config := range configs {
cfg, ok := config.(*RedisConfig)
return conn.redisPing(ctx)
}
// Reconnect 重连 Redis
func (c *redis) reconnect(ctx context.Context) error {
conn, err := getDefaultDataSource()
if err != nil {
return fmt.Errorf("获取默认连接失败: %w", err)
}
if err := conn.redisReconnect(ctx); err != nil {
return fmt.Errorf("redis重连失败: %w", err)
}
return nil
}
// Close 关闭 Redis 连接
func (c *redis) close(ctx context.Context) error {
conn, err := getDefaultDataSource()
if err != nil {
return fmt.Errorf("获取默认连接失败: %w", err)
}
if err := conn.redisClose(ctx); err != nil {
return fmt.Errorf("关闭redis连接失败: %w", err)
}
return nil
}
// Publish 发布消息
func (c *redis) Publish(ctx context.Context, msgConfig messagePublishConfig) error {
cfg, ok := msgConfig.(*RedisPublishMsgConfig)
if !ok {
return fmt.Errorf("无效的 Redis 配置类型")
}
if err := q.createStreamGroup(ctx, cfg); err != nil {
return err
if g.IsEmpty(cfg.QueueName) {
return fmt.Errorf("队列名称不能为空")
}
if g.IsEmpty(cfg.Data) {
return fmt.Errorf("数据不能为空")
}
return nil
}
// streamGroup 内部单个创建消费组
func (q *redisMessageClient) createStreamGroup(ctx context.Context, cfg *RedisConfig) error {
// 获取默认数据源
ds, err := GetManager().GetDefaultDataSource()
conn, err := getDefaultDataSource()
if err != nil {
return fmt.Errorf("获取默认数据源失败: %w", err)
return fmt.Errorf("获取默认连接失败: %w", err)
}
// 检查连接状态,未连接则自动重连
if !ds.IsConnected() {
if err := ds.Reconnect(ctx); err != nil {
if !conn.getIsConnected() {
if err := conn.redisReconnect(ctx); err != nil {
return fmt.Errorf("redis重连失败: %w", err)
}
}
_, err = ds.Redis().Do(ctx, "XGROUP", "CREATE", cfg.Stream, cfg.Group, "0", "MKSTREAM")
if err != nil {
errStr := err.Error()
if strings.Contains(errStr, "BUSYGROUP") && strings.Contains(errStr, "already exists") {
glog.Infof(ctx, "✅ Redis 消费者组已存在: %s", cfg.Group)
return nil
}
return fmt.Errorf("初始化消费者组失败: %w", err)
}
glog.Infof(ctx, "✅ Redis 消费者组创建成功: %s", cfg.Group)
return nil
}
// Publish 内部单个发布消息
func (q *redisMessageClient) publish(ctx context.Context, config interface{}, data interface{}) error {
ds, err := GetManager().GetDefaultDataSource()
if err != nil {
return fmt.Errorf("获取默认数据源失败: %w", err)
}
if !ds.IsConnected() {
if err := ds.Reconnect(ctx); err != nil {
return fmt.Errorf("redis重连失败: %w", err)
}
}
cfg, ok := config.(*RedisConfig)
if !ok {
return fmt.Errorf("无效的redis配置类型")
}
values := gconv.Map(data)
values := gconv.Map(cfg.Data)
args := make([]interface{}, 0, len(values)*2+2)
args = append(args, cfg.Stream, "*")
args = append(args, cfg.QueueName, "*")
for key, val := range values {
args = append(args, key, val)
}
result, err := ds.Redis().Do(ctx, "XADD", args...)
result, err := conn.getClient().Do(ctx, "XADD", args...)
if err != nil {
g.Log().Errorf(ctx, "❌ Redis 发布消息失败: topic=%s, err=%v", cfg.Stream, err)
g.Log().Errorf(ctx, "❌ Redis 发布消息失败: key=%s, err=%v", cfg.QueueName, err)
return err
}
g.Log().Infof(ctx, "✅ Redis 发布消息成功: topic=%s, messageID=%s", cfg.Stream, gconv.String(result))
g.Log().Infof(ctx, "✅ Redis 发布消息成功: key=%s, messageID=%s", cfg.QueueName, gconv.String(result))
return nil
}
// PublishDelayed 发布延迟消息(使用 ZSET
func (q *redisMessageClient) publishDelayed(ctx context.Context, config interface{}, data interface{}, delay int) error {
ds, err := GetManager().GetDefaultDataSource()
if err != nil {
return fmt.Errorf("获取默认数据源失败: %w", err)
}
if !ds.IsConnected() {
if err := ds.Reconnect(ctx); err != nil {
return fmt.Errorf("redis重连失败: %w", err)
}
}
cfg, ok := config.(*RedisConfig)
if !ok {
return fmt.Errorf("无效的redis配置类型")
}
payload, err := json.Marshal(data)
if err != nil {
return fmt.Errorf("序列化数据失败: %w", err)
}
score := float64(time.Now().Add(time.Duration(delay)).UnixMilli())
delayedKey := fmt.Sprintf("delayed:%s", cfg.Stream)
// ZADD delayedKey score payload
_, err = ds.Redis().Do(ctx, "ZADD", delayedKey, score, string(payload))
if err != nil {
return err
}
g.Log().Infof(ctx, "✅ Redis 延迟消息已发布: topic=%s, delay=%v", cfg.Stream, delay)
return nil
}
// Subscribe 订阅消息(支持单个或批量)
func (q *redisMessageClient) subscribe(ctx context.Context, configs ...interface{}) error {
if len(configs) == 0 {
return fmt.Errorf("配置不能为空")
}
for _, config := range configs {
cfg, ok := config.(*RedisConfig)
// Subscribe 订阅消息
func (c *redis) Subscribe(ctx context.Context, msgConfig messageSubscribeConfig) error {
cfg, ok := msgConfig.(*RedisSubscribeMsgConfig)
if !ok {
return fmt.Errorf("无效的 Redis 配置类型")
}
handler := cfg.HandleFunc
if handler == nil {
return fmt.Errorf("必须提供处理函数")
if g.IsEmpty(cfg.QueueName) {
return fmt.Errorf("队列名称不能为空")
}
if err := q.createSubscribe(ctx, cfg, handler); err != nil {
return err
if g.IsEmpty(cfg.ConsumerName) {
return fmt.Errorf("消费者名称不能为空")
}
if g.IsEmpty(cfg.HandleFunc) {
return fmt.Errorf("处理函数不能为空")
}
return nil
return c.createSubscribe(ctx, cfg.QueueName, cfg.ConsumerName, cfg.PrefetchCount, cfg.AutoAck, cfg.HandleFunc)
}
// subscribe 内部单个订阅消息
func (q *redisMessageClient) createSubscribe(ctx context.Context, cfg *RedisConfig, handler func(ctx context.Context, message map[string]interface{}) error) error {
// createSubscribe 内部订阅消息
func (c *redis) createSubscribe(ctx context.Context, key, consumerName string, prefetchCount int, autoAck bool, handler func(ctx context.Context, message map[string]interface{}) error) error {
go func() {
defer func() {
if r := recover(); r != nil {
@@ -174,10 +159,10 @@ func (q *redisMessageClient) createSubscribe(ctx context.Context, cfg *RedisConf
for {
select {
case <-ctx.Done():
g.Log().Infof(ctx, "🔕 Redis 消费者停止: topic=%s", cfg.Stream)
g.Log().Infof(ctx, "🔕 Redis 消费者停止: topic=%s", key)
return
case <-retryTicker.C:
err := q.consumeMessages(ctx, cfg, handler)
err := c.consumeMessages(ctx, key, consumerName, prefetchCount, autoAck, handler)
if err != nil {
// 对于超时错误,返回nil继续循环,而不是返回错误
if strings.Contains(err.Error(), "i/o timeout") || strings.Contains(err.Error(), "timeout") ||
@@ -216,25 +201,25 @@ func (q *redisMessageClient) createSubscribe(ctx context.Context, cfg *RedisConf
}
// consumeMessages 消费消息
func (q *redisMessageClient) consumeMessages(ctx context.Context, cfg *RedisConfig, handler func(ctx context.Context, message map[string]interface{}) error) error {
ds, err := GetManager().GetDefaultDataSource()
func (c *redis) consumeMessages(ctx context.Context, key, consumerName string, prefetchCount int, autoAck bool, handler func(ctx context.Context, message map[string]interface{}) error) error {
conn, err := getDefaultDataSource()
if err != nil {
return fmt.Errorf("获取默认数据源失败: %w", err)
return fmt.Errorf("获取默认连接失败: %w", err)
}
if !ds.IsConnected() {
if err := ds.Reconnect(ctx); err != nil {
if !conn.getIsConnected() {
if err := conn.redisReconnect(ctx); err != nil {
return fmt.Errorf("redis重连失败: %w", err)
}
}
// 检查消费者组是否存在
if err := q.createStreamGroup(ctx, cfg); err != nil {
if err := c.createStreamGroup(ctx, key); err != nil {
return fmt.Errorf("create stream group failed: %w", err)
}
// 使用带重试的命令执行
result, err := ds.Redis().Do(ctx, "XREADGROUP", "GROUP", cfg.Group, cfg.Consumer, "COUNT", cfg.Count, "BLOCK", 0, "STREAMS", cfg.Stream, ">")
result, err := conn.getClient().Do(ctx, "XREADGROUP", "GROUP", "default", consumerName, "COUNT", prefetchCount, "BLOCK", 0, "STREAMS", key, ">")
if err != nil {
if strings.Contains(err.Error(), "i/o timeout") || strings.Contains(err.Error(), "timeout") ||
strings.Contains(err.Error(), "context deadline exceeded") || strings.Contains(err.Error(), "context canceled") {
@@ -242,7 +227,7 @@ func (q *redisMessageClient) consumeMessages(ctx context.Context, cfg *RedisConf
}
return err
}
messages, err := q.parseStreamResult(result)
messages, err := c.parseStreamResult(result)
if err != nil {
return err
}
@@ -254,8 +239,8 @@ func (q *redisMessageClient) consumeMessages(ctx context.Context, cfg *RedisConf
}
// ACK 消息
if cfg.AutoAck {
if err := q.ackMessage(ctx, cfg.Stream, cfg.Group, msg.ID); err != nil {
if autoAck {
if err := c.ackMessage(ctx, key, "default", msg.ID); err != nil {
g.Log().Errorf(ctx, "❌ ACK 消息失败: messageID=%s, err=%v", msg.ID, err)
}
}
@@ -264,15 +249,42 @@ func (q *redisMessageClient) consumeMessages(ctx context.Context, cfg *RedisConf
return nil
}
// ackMessage ACK 消息
func (q *redisMessageClient) ackMessage(ctx context.Context, streamKey, groupName string, messageIDs ...string) error {
ds, err := GetManager().GetDefaultDataSource()
// createStreamGroup 内部单个创建消费组
func (c *redis) createStreamGroup(ctx context.Context, key string) error {
conn, err := getDefaultDataSource()
if err != nil {
return fmt.Errorf("获取默认数据源失败: %w", err)
return fmt.Errorf("获取默认连接失败: %w", err)
}
if !ds.IsConnected() {
if err := ds.Reconnect(ctx); err != nil {
if !conn.getIsConnected() {
if err := conn.redisReconnect(ctx); err != nil {
return fmt.Errorf("redis重连失败: %w", err)
}
}
groupName := "default"
_, err = conn.getClient().Do(ctx, "XGROUP", "CREATE", key, groupName, "0", "MKSTREAM")
if err != nil {
errStr := err.Error()
if strings.Contains(errStr, "BUSYGROUP") && strings.Contains(errStr, "already exists") {
glog.Infof(ctx, "✅ Redis 消费者组已存在: %s", groupName)
return nil
}
return fmt.Errorf("初始化消费者组失败: %w", err)
}
glog.Infof(ctx, "✅ Redis 消费者组创建成功: %s", groupName)
return nil
}
// ackMessage ACK 消息
func (c *redis) ackMessage(ctx context.Context, streamKey, groupName string, messageIDs ...string) error {
conn, err := getDefaultDataSource()
if err != nil {
return fmt.Errorf("获取默认连接失败: %w", err)
}
if !conn.getIsConnected() {
if err := conn.redisReconnect(ctx); err != nil {
return fmt.Errorf("redis重连失败: %w", err)
}
}
@@ -282,14 +294,14 @@ func (q *redisMessageClient) ackMessage(ctx context.Context, streamKey, groupNam
for _, id := range messageIDs {
args = append(args, id)
}
_, err = ds.Redis().Do(ctx, "XACK", args...)
_, err = conn.getClient().Do(ctx, "XACK", args...)
return err
}
// parseStreamResult 解析 Stream 结果
func (q *redisMessageClient) parseStreamResult(result interface{}) ([]RedisStreamMessage, error) {
func (c *redis) parseStreamResult(result interface{}) ([]redisStreamMessage, error) {
if result == nil {
return []RedisStreamMessage{}, nil
return []redisStreamMessage{}, nil
}
var resultVal interface{}
@@ -303,15 +315,15 @@ func (q *redisMessageClient) parseStreamResult(result interface{}) ([]RedisStrea
// 检查是否为空
if resultVal == nil {
return []RedisStreamMessage{}, nil
return []redisStreamMessage{}, nil
}
// 预分配切片容量,避免多次扩容
messages := make([]RedisStreamMessage, 0)
messages := make([]redisStreamMessage, 0)
if streamsMap, ok := resultVal.(map[interface{}]interface{}); ok {
for _, streamMsg := range streamsMap {
msgArray, ok := streamMsg.([]interface{})
for _, streamData := range streamsMap {
msgArray, ok := streamData.([]interface{})
if !ok {
continue
}
@@ -332,7 +344,7 @@ func (q *redisMessageClient) parseStreamResult(result interface{}) ([]RedisStrea
values[key] = fieldsArray[i+1]
}
}
messages = append(messages, RedisStreamMessage{
messages = append(messages, redisStreamMessage{
ID: msgID,
Values: values,
})

View File

@@ -6,7 +6,7 @@ import (
"fmt"
"gitee.com/red-future---jilin-g/common/beans"
"gitee.com/red-future---jilin-g/common/message"
"gitee.com/red-future---jilin-g/common/nats"
"gitee.com/red-future---jilin-g/common/redis"
"gitee.com/red-future---jilin-g/common/utils"
"github.com/gogf/gf/v2/database/gredis"
"github.com/gogf/gf/v2/frame/g"
@@ -20,7 +20,7 @@ import (
func ModuleTenantCheck(r *ghttp.Request) {
// 检查是否是超级管理员
isSuperAdmin := false
if err := nats.CallRPC(r.Context(), "userService.IsSuperAdmin", nil, &isSuperAdmin); err != nil {
if err := message.CallRPC(r.Context(), "userService.IsSuperAdmin", nil, &isSuperAdmin); err != nil {
SetResponseInfo(r.Context(), r, http.StatusPaymentRequired, err)
}
// 如果是超级管理员,则不进行模块租户检查
@@ -33,7 +33,7 @@ func ModuleTenantCheck(r *ghttp.Request) {
SetResponseInfo(r.Context(), r, http.StatusPaymentRequired, err)
}
exit := gconv.Int64(time.Minute * 1)
getEX, err := message.GetRedisClientTest("test").GetEX(r.Context(), fmt.Sprintf("module_tenant:tenantId-%v", getUserInfo.TenantId), gredis.GetEXOption{
getEX, err := redis.GetRedisClientTest("test").GetEX(r.Context(), fmt.Sprintf("module_tenant:tenantId-%v", getUserInfo.TenantId), gredis.GetEXOption{
TTLOption: gredis.TTLOption{
EX: &exit,
},
@@ -68,7 +68,7 @@ func ModuleTenantCheck(r *ghttp.Request) {
ModuleKey: moduleKey,
TenantId: gconv.Uint64(getUserInfo.TenantId),
}
err = nats.CallRPC(r.Context(), "moduleService.Check", &checkReq, checkRes)
err = message.CallRPC(r.Context(), "moduleService.Check", &checkReq, checkRes)
if err != nil {
SetResponseInfo(r.Context(), r, http.StatusPaymentRequired, err)
}

View File

@@ -28,9 +28,9 @@ func getClient() *gredis.Redis {
return redisClient
}
// GetRedisClient 获取 Redis 客户端(供外部使用)
func GetRedisClient() *gredis.Redis {
return getClient()
// getClient 获取 Redis 客户端 临时方法
func GetRedisClientTest(name string) *gredis.Redis {
return g.Redis(name)
}
// RedisClient 获取 Redis 客户端(函数式,确保单例正确初始化)