重构消息队列连接管理,支持多数据源配置

主要变更:
1. 重构NATS、RabbitMQ和Redis连接管理模块,支持多数据源配置
2. 统一连接管理接口,增加数据源名称参数
3. 优化连接状态检查和错误处理
4. 增加连接池管理和资源清理机制
5. 改进日志输出格式和内容
This commit is contained in:
2026-02-04 13:49:17 +08:00
committed by 张斌
parent 69d2ace17f
commit 55a6ec0374
12 changed files with 1339 additions and 1114 deletions

View File

@@ -8,137 +8,160 @@ import (
"github.com/gogf/gf/v2/frame/g"
"github.com/nats-io/nats.go"
"github.com/nats-io/nats.go/jetstream"
)
var (
nc *nats.Conn
js jetstream.JetStream
natsMu sync.RWMutex
muNats sync.RWMutex
natsConns map[string]*nats.Conn // key: 数据源名称, value: NATS 连接
natsJS map[string]nats.JetStreamContext // key: 数据源名称, value: JetStream 上下文
)
// natsConnect 建立 NATS 连接
func natsConnect(ctx context.Context) error {
natsMu.Lock()
defer natsMu.Unlock()
func init() {
natsConns = make(map[string]*nats.Conn)
natsJS = make(map[string]nats.JetStreamContext)
}
// 安全地关闭旧连接
if oldConn := nc; oldConn != nil && !oldConn.IsClosed() {
// natsConnect 建立 NATS 连接
func natsConnect(ctx context.Context, name string) error {
if g.Cfg().MustGet(ctx, "nats").IsEmpty() {
g.Log().Errorf(ctx, "❌ NATS 配置不存在")
return fmt.Errorf("NATS Configuration does not exist")
}
// 确定数据源名称
dsName := "default"
if !g.IsEmpty(name) {
dsName = name
}
g.Log().Infof(ctx, "🔔 NATS [%s] 开始创建连接", dsName)
muNats.Lock()
defer muNats.Unlock()
// 安全地关闭旧连接(仅针对该数据源)
if oldConn, exists := natsConns[dsName]; exists && oldConn != nil && !oldConn.IsClosed() {
oldConn.Close()
delete(natsConns, dsName)
delete(natsJS, dsName)
}
// 从配置文件读取 NATS 地址
natsURL := g.Cfg().MustGet(ctx, "nats.url").String()
natsURL := g.Cfg().MustGet(ctx, fmt.Sprintf("nats.%s.url", dsName)).String()
if natsURL == "" {
// 默认使用本地地址
natsURL = nats.DefaultURL
}
// 使用独立的日志上下文,避免使用外部可能被取消的上下文
logCtx := context.Background()
// 连接选项配置
opts := []nats.Option{
nats.Name("goframe-nats-client"),
nats.ReconnectWait(2 * time.Second),
nats.MaxReconnects(-1), // 无限重连
nats.Name(fmt.Sprintf("goframe-nats-client-%s", dsName)),
nats.NoReconnect(),
nats.PingInterval(10 * time.Second),
nats.MaxPingsOutstanding(5),
nats.ReconnectHandler(func(nc *nats.Conn) {
g.Log().Infof(logCtx, "✅ NATS 重连成功: %s", nc.ConnectedUrl())
natsMu.Lock()
defer natsMu.Unlock()
// 重新创建 JetStream 实例
if newJS, err := jetstream.New(nc); err == nil {
js = newJS
}
}),
nats.DisconnectErrHandler(func(nc *nats.Conn, err error) {
g.Log().Warningf(logCtx, "⚠️ NATS 连接断开: %v, 准备重连...", err)
}),
nats.ClosedHandler(func(nc *nats.Conn) {
g.Log().Infof(logCtx, "NATS 连接已关闭: %s", nc.ConnectedUrl())
g.Log().Infof(ctx, "NATS [%s] 连接已关闭: %s", dsName, nc.ConnectedUrl())
}),
nats.ErrorHandler(func(nc *nats.Conn, sub *nats.Subscription, err error) {
g.Log().Errorf(logCtx, "NATS 错误: %v", err)
g.Log().Errorf(ctx, "NATS [%s] 错误: %v", dsName, err)
}),
}
var err error
nc, err = nats.Connect(natsURL, opts...)
newConn, err := nats.Connect(natsURL, opts...)
if err != nil {
return fmt.Errorf("NATS 连接失败: %w", err)
g.Log().Errorf(ctx, "❌ NATS [%s] 连接失败: %v", dsName, err)
return err
}
// 等待连接就绪
if nc.Status() != nats.CONNECTED {
if newConn.Status() != nats.CONNECTED {
select {
case <-time.After(5 * time.Second):
// 连接超时,清理资源
if nc != nil {
nc.Close()
}
newConn.Close()
g.Log().Errorf(ctx, "❌ NATS [%s] 连接超时", dsName)
return fmt.Errorf("NATS 连接超时")
case <-nc.StatusChanged(nats.CONNECTED):
case <-newConn.StatusChanged(nats.CONNECTED):
// 连接成功
g.Log().Infof(ctx, "✅ NATS [%s] 连接成功: %s", dsName, newConn.ConnectedUrl())
case <-ctx.Done():
// 外部上下文被取消,清理资源
if nc != nil {
nc.Close()
}
newConn.Close()
g.Log().Errorf(ctx, "NATS [%s] 连接被取消: %v", dsName, ctx.Err())
return fmt.Errorf("NATS 连接被取消: %w", ctx.Err())
}
}
// 创建 JetStream 实例
js, err = jetstream.New(nc)
newJS, err := newConn.JetStream(nats.MaxWait(10 * time.Second))
if err != nil {
// 创建 JetStream 失败,清理连接
if nc != nil {
nc.Close()
}
return fmt.Errorf("创建 JetStream 失败: %w", err)
newConn.Close()
g.Log().Errorf(ctx, "❌ NATS [%s] 创建 JetStream 失败: %v", dsName, err)
return err
}
g.Log().Infof(ctx, "✅ NATS 连接成功: %s", nc.ConnectedUrl())
// 保存连接和 JetStream 上下文
natsConns[dsName] = newConn
natsJS[dsName] = newJS
return nil
}
// natsPing 检测 NATS 连接状态
func natsPing() bool {
natsMu.RLock()
defer natsMu.RUnlock()
if nc == nil || nc.IsClosed() {
return false
func natsPing(ctx context.Context, name string) bool {
// 确定数据源名称
dsName := "default"
if !g.IsEmpty(name) {
dsName = name
}
// 使用 NATS 的状态检查
if nc.Status() != nats.CONNECTED {
muNats.RLock()
defer muNats.RUnlock()
nc, exists := natsConns[dsName]
if !exists || nc == nil || nc.IsClosed() || nc.Status() != nats.CONNECTED {
g.Log().Errorf(ctx, "❌ NATS [%s] 连接已关闭或不可用", dsName)
return false
}
g.Log().Infof(ctx, "📊 NATS [%s] 连接正常: %s", dsName, nc.ConnectedUrl())
return true
}
// natsReconnect 重连 NATS
func natsReconnect(ctx context.Context) error {
if err := natsConnect(ctx); err != nil {
return fmt.Errorf("nats重连失败: %w", err)
}
return nil
}
// natsClose 关闭 NATS 连接
func natsClose(ctx context.Context) error {
natsMu.Lock()
defer natsMu.Unlock()
if nc == nil || nc.IsClosed() {
return nil // 连接已经关闭或不存在
func natsClose(ctx context.Context, name string) error {
// 确定数据源名称
dsName := "default"
if !g.IsEmpty(name) {
dsName = name
}
nc.Close()
g.Log().Infof(ctx, "✅ NATS 连接已关闭")
muNats.Lock()
defer muNats.Unlock()
if nc, exists := natsConns[dsName]; exists && nc != nil && !nc.IsClosed() {
nc.Close()
}
delete(natsConns, dsName)
delete(natsJS, dsName)
g.Log().Infof(ctx, "✅ NATS [%s] 连接已关闭", dsName)
return nil
}
// getNatsConn 获取 NATS 连接(内部使用)
func getNatsConn(name string) *nats.Conn {
dsName := "default"
if !g.IsEmpty(name) {
dsName = name
}
return natsConns[dsName]
}
// getNatsJS 获取 JetStream 上下文(内部使用)
func getNatsJS(name string) nats.JetStreamContext {
dsName := "default"
if !g.IsEmpty(name) {
dsName = name
}
return natsJS[dsName]
}

View File

@@ -7,103 +7,158 @@ import (
"github.com/gogf/gf/v2/util/gconv"
amqp "github.com/rabbitmq/amqp091-go"
"sync"
"time"
)
var (
conn *amqp.Connection
channel *amqp.Channel
rabbitmqMu sync.RWMutex
muRabbitMQ sync.RWMutex
rabbitmqConns map[string]*amqp.Connection
rabbitmqChannels map[string]*amqp.Channel
)
// config RabbitMQ 配置
type config struct {
Host string
Port int
Username string
Password string
VHost string
func init() {
rabbitmqConns = make(map[string]*amqp.Connection)
rabbitmqChannels = make(map[string]*amqp.Channel)
}
func rabbitmqConnect(ctx context.Context) error {
rabbitmqMu.Lock()
defer rabbitmqMu.Unlock()
LOOP:
cfg := &config{
Host: g.Cfg().MustGet(ctx, "rabbitmq.host").String(),
Port: g.Cfg().MustGet(ctx, "rabbitmq.port").Int(),
Username: g.Cfg().MustGet(ctx, "rabbitmq.username").String(),
Password: g.Cfg().MustGet(ctx, "rabbitmq.password").String(),
VHost: g.Cfg().MustGet(ctx, "rabbitmq.vhost", "/").String(),
// rabbitmqConnect 建立 RabbitMQ 连接
func rabbitmqConnect(ctx context.Context, name string) error {
if g.Cfg().MustGet(ctx, "rabbitmq").IsEmpty() {
g.Log().Errorf(ctx, "❌ RabbitMQ 配置不存在")
return fmt.Errorf("RabbitMQ Configuration does not exist")
}
// 确定数据源名称
dsName := "default"
if !g.IsEmpty(name) {
dsName = name
}
url := "amqp://" + cfg.Username + ":" + cfg.Password + "@" + cfg.Host + ":" + gconv.String(cfg.Port) + "/" + cfg.VHost
g.Log().Infof(ctx, "🔔 RabbitMQ [%s] 开始创建连接", dsName)
muRabbitMQ.Lock()
defer muRabbitMQ.Unlock()
var err error
conn, err = amqp.Dial(url)
// 安全地关闭旧连接(仅针对该数据源)
if oldConn, exists := rabbitmqConns[dsName]; exists && oldConn != nil && !oldConn.IsClosed() {
oldConn.Close()
}
if oldChannel, exists := rabbitmqChannels[dsName]; exists && oldChannel != nil && !oldChannel.IsClosed() {
oldChannel.Close()
}
delete(rabbitmqConns, dsName)
delete(rabbitmqChannels, dsName)
// 从配置文件读取 RabbitMQ 配置
host := g.Cfg().MustGet(ctx, fmt.Sprintf("rabbitmq.%s.host", dsName)).String()
port := g.Cfg().MustGet(ctx, fmt.Sprintf("rabbitmq.%s.port", dsName)).Int()
username := g.Cfg().MustGet(ctx, fmt.Sprintf("rabbitmq.%s.username", dsName)).String()
password := g.Cfg().MustGet(ctx, fmt.Sprintf("rabbitmq.%s.password", dsName)).String()
vHost := g.Cfg().MustGet(ctx, fmt.Sprintf("rabbitmq.%s.vhost", dsName), "/").String()
if g.IsEmpty(host) {
return fmt.Errorf("❌ RabbitMQ 配置错误: host 不能为空 (数据源: %s)", dsName)
}
if g.IsEmpty(port) {
return fmt.Errorf("❌ RabbitMQ 配置错误: port 不能为空 (数据源: %s)", dsName)
}
if g.IsEmpty(username) {
return fmt.Errorf("❌ RabbitMQ 配置错误: username 不能为空 (数据源: %s)", dsName)
}
if g.IsEmpty(password) {
return fmt.Errorf("❌ RabbitMQ 配置错误: password 不能为空 (数据源: %s)", dsName)
}
// 构建连接 URL
url := "amqp://" + username + ":" + password + "@" + host + ":" + gconv.String(port) + "/" + vHost
// 创建连接
newConn, err := amqp.Dial(url)
if err != nil {
g.Log().Errorf(ctx, "重连失败: %v", err)
time.Sleep(2 * time.Second)
goto LOOP
g.Log().Errorf(ctx, "❌ RabbitMQ [%s] 连接失败: %v", dsName, err)
return err
}
channel, err = conn.Channel()
// 创建 Channel
newChannel, err := newConn.Channel()
if err != nil {
g.Log().Errorf(ctx, "创建 Channel 失败: %v", err)
time.Sleep(2 * time.Second)
goto LOOP
g.Log().Errorf(ctx, "❌ RabbitMQ [%s] 创建 Channel 失败: %v", dsName, err)
newConn.Close()
return err
}
g.Log().Info(ctx, "RabbitMQ 重连成功")
return nil
}
// 保存连接和 Channel
rabbitmqConns[dsName] = newConn
rabbitmqChannels[dsName] = newChannel
// rabbitmqReconnect 重新连接
func rabbitmqReconnect(ctx context.Context) error {
if err := rabbitmqConnect(ctx); err != nil {
return fmt.Errorf("nats重连失败: %w", err)
}
g.Log().Infof(ctx, "✅ RabbitMQ [%s] 连接成功", dsName)
return nil
}
// rabbitmqPing 检测 RabbitMQ 连接状态
func rabbitmqPing() bool {
rabbitmqMu.RLock()
defer rabbitmqMu.RUnlock()
func rabbitmqPing(ctx context.Context, name string) bool {
// 确定数据源名称
dsName := "default"
if !g.IsEmpty(name) {
dsName = name
}
if conn == nil || conn.IsClosed() {
muRabbitMQ.RLock()
defer muRabbitMQ.RUnlock()
conn, exists := rabbitmqConns[dsName]
channel, channelExists := rabbitmqChannels[dsName]
if !exists || conn == nil || conn.IsClosed() || !channelExists || channel == nil || channel.IsClosed() {
g.Log().Errorf(ctx, "❌ RabbitMQ [%s] 连接已关闭或不可用", dsName)
return false
}
g.Log().Infof(ctx, "📊 RabbitMQ [%s] 连接正常", dsName)
return true
}
// rabbitmqClose 关闭连接
func rabbitmqClose(ctx context.Context) error {
rabbitmqMu.Lock()
defer rabbitmqMu.Unlock()
// rabbitmqClose 关闭 RabbitMQ 连接
func rabbitmqClose(ctx context.Context, name string) error {
// 确定数据源名称
dsName := "default"
if !g.IsEmpty(name) {
dsName = name
}
muRabbitMQ.Lock()
defer muRabbitMQ.Unlock()
var lastErr error
if channel != nil {
if channel, exists := rabbitmqChannels[dsName]; exists && channel != nil && !channel.IsClosed() {
if err := channel.Close(); err != nil {
g.Log().Errorf(ctx, "关闭 RabbitMQ Channel 失败: %v", err)
g.Log().Errorf(ctx, " RabbitMQ [%s] 关闭 Channel 失败: %v", dsName, err)
lastErr = err
}
channel = nil
}
delete(rabbitmqChannels, dsName)
if conn != nil {
if conn, exists := rabbitmqConns[dsName]; exists && conn != nil && !conn.IsClosed() {
if err := conn.Close(); err != nil {
g.Log().Errorf(ctx, "关闭 RabbitMQ 连接失败: %v", err)
g.Log().Errorf(ctx, " RabbitMQ [%s] 关闭连接失败: %v", dsName, err)
lastErr = err
}
conn = nil
}
delete(rabbitmqConns, dsName)
g.Log().Info(ctx, "RabbitMQ 连接已关闭")
g.Log().Infof(ctx, "RabbitMQ [%s] 连接已关闭", dsName)
return lastErr
}
// getRabbitMQConn 获取 RabbitMQ 连接(内部使用)
func getRabbitMQConn(name string) *amqp.Connection {
dsName := "default"
if !g.IsEmpty(name) {
dsName = name
}
return rabbitmqConns[dsName]
}
// getRabbitMQChannel 获取 RabbitMQ Channel内部使用
func getRabbitMQChannel(name string) *amqp.Channel {
dsName := "default"
if !g.IsEmpty(name) {
dsName = name
}
return rabbitmqChannels[dsName]
}

View File

@@ -1,7 +1,6 @@
// =============================================================================
// Redis 数据源连接管理
// 负责 Redis 数据源的连接、重连、健康检查和优雅关闭
// 支持多数据源和无限重连
// Redis 连接管理
// 负责 Redis 的连接、重连、健康检查和优雅关闭
// =============================================================================
package message
@@ -9,477 +8,191 @@ package message
import (
"context"
"fmt"
"os"
"os/signal"
"sync"
"syscall"
"time"
"github.com/gogf/gf/v2/database/gredis"
"github.com/gogf/gf/v2/frame/g"
"github.com/gogf/gf/v2/os/glog"
"github.com/gogf/gf/v2/util/gconv"
)
// =============================================================================
// Redis 数据源配置结构
// =============================================================================
var (
muRedis sync.RWMutex
redisConns map[string]*gredis.Redis
redisConfigs map[string]*gredis.Config
)
type redisDataSourceConfig struct {
name string // 数据源名称
address string // Redis 地址,如: 127.0.0.1:6379
db int // 数据库编号
pass string // 密码
maxRetries int // 最大重试次数,-1 表示无限重试
retryInterval time.Duration // 重试间隔
func init() {
redisConns = make(map[string]*gredis.Redis)
redisConfigs = make(map[string]*gredis.Config)
}
// =============================================================================
// Redis 数据源接口
// =============================================================================
type redisDataSource interface {
name() string
getClient() *gredis.Redis
getIsConnected() bool
redisConnect(ctx context.Context) error
redisReconnect(ctx context.Context) error
redisClose(ctx context.Context) error
redisPing(ctx context.Context) bool
}
// =============================================================================
// Redis 数据源实现
// =============================================================================
type baseRedisDataSource struct {
config *redisDataSourceConfig
client *gredis.Redis
isConnected bool
mu sync.RWMutex
lastError error
lastErrorTime time.Time
reconnectMu sync.Mutex
}
func newBaseRedisDataSource(config *redisDataSourceConfig) *baseRedisDataSource {
return &baseRedisDataSource{
config: config,
isConnected: false,
// redisConnect 建立 Redis 连接
// name: 数据源名称,如果为空则使用默认数据源
func redisConnect(ctx context.Context, name string) error {
if g.Cfg().MustGet(ctx, "redis").IsEmpty() {
g.Log().Errorf(ctx, "❌ Redis 配置不存在")
return fmt.Errorf("redis Configuration does not exist")
}
}
func (d *baseRedisDataSource) name() string {
return d.config.name
}
func (d *baseRedisDataSource) getClient() *gredis.Redis {
d.mu.RLock()
defer d.mu.RUnlock()
return d.client
}
func (d *baseRedisDataSource) getIsConnected() bool {
d.mu.RLock()
defer d.mu.RUnlock()
return d.isConnected && d.client != nil
}
func (d *baseRedisDataSource) redisConnect(ctx context.Context) error {
// 使用互斥锁防止并发重连
d.reconnectMu.Lock()
defer d.reconnectMu.Unlock()
d.mu.Lock()
if d.client != nil {
d.client.Close(ctx)
// 确定数据源名称
dsName := "default"
if !g.IsEmpty(name) {
dsName = name
}
d.mu.Unlock()
g.Log().Infof(ctx, "🔔 Redis [%s] 开始创建连接", dsName)
muRedis.Lock()
defer muRedis.Unlock()
// 安全地关闭旧连接(仅针对该数据源)
if oldRedis, exists := redisConns[dsName]; exists && oldRedis != nil {
oldRedis.Close(ctx)
delete(redisConns, dsName)
}
// 从配置文件读取 Redis 配置
redisAddr := g.Cfg().MustGet(ctx, fmt.Sprintf("redis.%s.address", dsName)).String()
if g.IsEmpty(redisAddr) {
g.Log().Errorf(ctx, "❌ Redis 配置错误: address 不能为空 (数据源: %s)", dsName)
return fmt.Errorf("❌ Redis 配置错误: address 不能为空 (数据源: %s)", dsName)
}
redisDB := g.Cfg().MustGet(ctx, fmt.Sprintf("redis.%s.db", dsName)).Int()
if redisDB < 0 || redisDB > 15 {
g.Log().Errorf(ctx, "❌ Redis 配置错误: db 必须在 0-15 之间 (当前值: %d)", redisDB)
return fmt.Errorf("❌ Redis 配置错误: db 必须在 0-15 之间 (当前值: %d)", redisDB)
}
idleTimeout := g.Cfg().MustGet(ctx, fmt.Sprintf("redis.%s.idleTimeout", dsName)).String()
redisIdleTimeout, err := time.ParseDuration(idleTimeout)
if err != nil {
g.Log().Errorf(ctx, "❌ Redis idleTimeout 格式错误: %v", err)
return err
}
maxConnLifetime := g.Cfg().MustGet(ctx, fmt.Sprintf("redis.%s.maxConnLifetime", dsName)).String()
redisMaxConnLifetime, err := time.ParseDuration(maxConnLifetime)
if err != nil {
g.Log().Errorf(ctx, "❌ Redis maxConnLifetime 格式错误: %v", err)
return err
}
waitTimeout := g.Cfg().MustGet(ctx, fmt.Sprintf("redis.%s.waitTimeout", dsName)).String()
redisWaitTimeout, err := time.ParseDuration(waitTimeout)
if err != nil {
g.Log().Errorf(ctx, "❌ Redis waitTimeout 格式错误: %v", err)
return err
}
dialTimeout := g.Cfg().MustGet(ctx, fmt.Sprintf("redis.%s.dialTimeout", dsName)).String()
redisDialTimeout, err := time.ParseDuration(dialTimeout)
if err != nil {
g.Log().Errorf(ctx, "❌ Redis dialTimeout 格式错误: %v", err)
return err
}
readTimeout := g.Cfg().MustGet(ctx, fmt.Sprintf("redis.%s.readTimeout", dsName)).String()
redisReadTimeout, err := time.ParseDuration(readTimeout)
if err != nil {
g.Log().Errorf(ctx, "❌ Redis readTimeout 格式错误: %v", err)
return err
}
writeTimeout := g.Cfg().MustGet(ctx, fmt.Sprintf("redis.%s.writeTimeout", dsName)).String()
redisWriteTimeout, err := time.ParseDuration(writeTimeout)
if err != nil {
g.Log().Errorf(ctx, "❌ Redis writeTimeout 格式错误: %v", err)
return err
}
maxActive := g.Cfg().MustGet(ctx, fmt.Sprintf("redis.%s.maxActive", dsName)).Int()
if g.IsEmpty(maxActive) {
g.Log().Errorf(ctx, "❌ Redis maxActive 配置错误: %v", maxActive)
return fmt.Errorf("❌ Redis maxActive 配置错误")
}
// 构建 GoFrame Redis 配置
redisConfig := &gredis.Config{
Address: d.config.address,
Db: d.config.db,
Pass: d.config.pass,
Address: redisAddr,
Db: redisDB,
IdleTimeout: redisIdleTimeout,
MaxConnLifetime: redisMaxConnLifetime,
WaitTimeout: redisWaitTimeout,
DialTimeout: redisDialTimeout,
ReadTimeout: redisReadTimeout,
WriteTimeout: redisWriteTimeout,
MaxActive: maxActive,
}
redisConfigs[dsName] = redisConfig
// 使用 GoFrame 的 Redis 连接
redisObj, err := gredis.New(redisConfig)
newRedis, err := gredis.New(redisConfig)
if err != nil {
d.mu.Lock()
d.isConnected = false
d.lastError = err
d.lastErrorTime = time.Now()
d.mu.Unlock()
return fmt.Errorf("datasource [%s] connection failed: %w", d.config.name, err)
g.Log().Errorf(ctx, "❌ Redis [%s] 连接失败: %v", dsName, err)
return err
}
// 测试连接(直接调用避免死锁)
_, err = newRedis.Do(ctx, "PING")
if err != nil {
g.Log().Errorf(ctx, "❌ Redis [%s] 连接失败: ping 失败 - %v", dsName, err)
_ = newRedis.Close(ctx)
return err
}
d.mu.Lock()
d.client = redisObj
d.mu.Unlock()
// 测试连接
if !d.redisPing(ctx) {
d.mu.Lock()
d.isConnected = false
d.lastError = err
d.lastErrorTime = time.Now()
d.mu.Unlock()
return fmt.Errorf("datasource [%s] ping failed: %w", d.config.name, err)
}
d.mu.Lock()
d.isConnected = true
d.lastError = nil
d.mu.Unlock()
glog.Infof(ctx, "✅ datasource [%s] connected successfully", d.config.name)
redisConns[dsName] = newRedis
g.Log().Infof(ctx, "✅ Redis [%s] 连接成功: %s (DB: %d)", dsName, redisAddr, redisDB)
return nil
}
func (d *baseRedisDataSource) redisReconnect(ctx context.Context) error {
glog.Infof(ctx, "🔄 reconnecting datasource [%s]", d.config.name)
return d.redisConnect(ctx)
}
func (d *baseRedisDataSource) redisClose(ctx context.Context) error {
d.mu.Lock()
defer d.mu.Unlock()
if d.client != nil {
if err := d.client.Close(ctx); err != nil {
return fmt.Errorf("datasource [%s] close failed: %w", d.config.name, err)
}
// redisPing 检测 Redis 连接状态(带超时保护)
func redisPing(ctx context.Context, name string) bool {
// 确定数据源名称
dsName := "default"
if !g.IsEmpty(name) {
dsName = name
}
d.isConnected = false
glog.Infof(ctx, "datasource [%s] closed", d.config.name)
return nil
}
muRedis.RLock()
defer muRedis.RUnlock()
func (d *baseRedisDataSource) redisPing(ctx context.Context) bool {
d.mu.RLock()
client := d.client
d.mu.RUnlock()
if client == nil {
rc, exists := redisConns[dsName]
if !exists || rc == nil {
g.Log().Errorf(ctx, "❌ Redis [%s] 连接未建立", dsName)
return false
}
_, err := client.Do(ctx, "PING")
// 创建带超时的子上下文,避免死锁
timeoutCtx, cancel := context.WithTimeout(ctx, 3*time.Second)
defer cancel()
_, err := rc.Do(timeoutCtx, "PING")
if err != nil {
g.Log().Errorf(ctx, "❌ Redis [%s] ping 失败: %v", dsName, err)
return false
}
g.Log().Infof(ctx, "📊 Redis [%s] 连接正常", dsName)
return true
}
// =============================================================================
// Redis 多数据源管理器
// =============================================================================
type redisDataSourceManager struct {
sources map[string]redisDataSource
mu sync.RWMutex
ctx context.Context
cancel context.CancelFunc
started bool
maxRetries int
reconnectCh chan string
}
var (
globalRedisManager *redisDataSourceManager
redisManagerOnce sync.Once
)
// getRedisManager 获取全局 Redis 管理器
func getRedisManager() *redisDataSourceManager {
redisManagerOnce.Do(func() {
ctx, cancel := context.WithCancel(context.Background())
globalRedisManager = &redisDataSourceManager{
sources: make(map[string]redisDataSource),
ctx: ctx,
cancel: cancel,
started: false,
maxRetries: -1, // 默认无限重试
reconnectCh: make(chan string, 100),
}
})
return globalRedisManager
}
// registerDataSource 注册 Redis 数据源
func (m *redisDataSourceManager) registerDataSource(config *redisDataSourceConfig) error {
m.mu.Lock()
defer m.mu.Unlock()
if _, exists := m.sources[config.name]; exists {
return fmt.Errorf("datasource [%s] already exists", config.name)
// redisClose 关闭 Redis 连接
func redisClose(ctx context.Context, name string) error {
// 确定数据源名称
dsName := "default"
if !g.IsEmpty(name) {
dsName = name
}
source := newBaseRedisDataSource(config)
m.sources[config.name] = source
muRedis.Lock()
defer muRedis.Unlock()
if rc, exists := redisConns[dsName]; exists && rc != nil {
if err := rc.Close(ctx); err != nil {
g.Log().Errorf(ctx, "❌ Redis [%s] 关闭失败: %v", dsName, err)
return err
}
delete(redisConns, dsName)
}
g.Log().Infof(ctx, "✅ Redis [%s] 连接已关闭", dsName)
return nil
}
// getDataSource 获取 Redis 数据源
func (m *redisDataSourceManager) getDataSource(name string) (redisDataSource, error) {
m.mu.RLock()
defer m.mu.RUnlock()
source, exists := m.sources[name]
if !exists {
return nil, fmt.Errorf("datasource [%s] not found", name)
// getRedisConn 获取 Redis 连接(内部使用)
func getRedisConn(name string) *gredis.Redis {
dsName := "default"
if !g.IsEmpty(name) {
dsName = name
}
return source, nil
}
// getAllDataSourceNames 获取所有 Redis 数据源名称
func (m *redisDataSourceManager) getAllDataSourceNames() []string {
m.mu.RLock()
defer m.mu.RUnlock()
names := make([]string, 0, len(m.sources))
for name := range m.sources {
names = append(names, name)
}
return names
}
// initializeFromConfig 从配置初始化 Redis 数据源
func (m *redisDataSourceManager) initializeFromConfig(ctx context.Context) error {
var firstErr error
// 获取 redis 配置下的所有子键
redisConfig := g.Cfg().MustGet(ctx, "redis")
if redisConfig.IsNil() {
glog.Warningf(ctx, "no redis configuration found in config.yml")
return nil
}
// 将配置转换为 map
configMap := redisConfig.Map()
if configMap == nil {
glog.Warningf(ctx, "redis configuration is not a map")
return nil
}
// 遍历所有 redis 子配置
for name, subConfig := range configMap {
// 跳过非对象类型的配置
subMap, ok := subConfig.(map[string]interface{})
if !ok {
continue
}
// 检查是否有 address 配置
address, hasAddress := subMap["address"]
if !hasAddress || gconv.String(address) == "" {
continue
}
// 构建数据源配置
config := &redisDataSourceConfig{
name: name,
address: gconv.String(address),
db: gconv.Int(subMap["db"]),
pass: gconv.String(subMap["pass"]),
maxRetries: gconv.Int(subMap["maxRetries"]),
retryInterval: gconv.Duration(subMap["retryInterval"]),
}
// 设置默认值
if config.maxRetries == 0 {
config.maxRetries = -1 // 默认无限重试
}
if config.retryInterval == 0 {
config.retryInterval = 5 * time.Second
}
// 注册数据源
if err := m.registerDataSource(config); err != nil {
glog.Errorf(ctx, "failed to register datasource [%s]: %v", name, err)
if firstErr == nil {
firstErr = err
}
continue
}
// 连接数据源
source, _ := m.getDataSource(name)
if err := source.redisConnect(ctx); err != nil {
glog.Errorf(ctx, "failed to initialize datasource [%s]: %v", name, err)
if firstErr == nil {
firstErr = err
}
}
}
return firstErr
}
// startHealthCheck 启动健康检查
func (m *redisDataSourceManager) startHealthCheck() {
if m.started {
return
}
m.started = true
// 启动健康检查循环
go m.healthCheckLoop()
// 启动重连处理循环
go m.reconnectLoop()
}
// healthCheckLoop 健康检查循环
func (m *redisDataSourceManager) healthCheckLoop() {
ticker := time.NewTicker(10 * time.Second)
defer ticker.Stop()
for {
select {
case <-m.ctx.Done():
return
case <-ticker.C:
m.checkConnections()
}
}
}
// reconnectLoop 重连处理循环
func (m *redisDataSourceManager) reconnectLoop() {
reconnectCounts := make(map[string]int)
for {
select {
case <-m.ctx.Done():
return
case name := <-m.reconnectCh:
go func(dsName string) {
ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
defer cancel()
source, err := m.getDataSource(dsName)
if err != nil {
glog.Errorf(ctx, "datasource [%s] not found for reconnect", dsName)
return
}
if err := source.redisReconnect(ctx); err != nil {
glog.Errorf(ctx, "datasource [%s] reconnect failed: %v", dsName, err)
// 记录重连次数
reconnectCounts[dsName]++
// 检查重连次数限制
if m.maxRetries > 0 && reconnectCounts[dsName] > m.maxRetries {
glog.Errorf(ctx, "datasource [%s] reconnect count %d exceeds limit %d, stopping auto-reconnect",
dsName, reconnectCounts[dsName], m.maxRetries)
return
}
// 延迟后重新放入重连队列
time.Sleep(5 * time.Second)
select {
case m.reconnectCh <- dsName:
default:
// 通道已满,丢弃通知
}
} else {
// 重连成功,重置计数器
reconnectCounts[dsName] = 0
}
}(name)
}
}
}
// checkConnections 检查连接状态
func (m *redisDataSourceManager) checkConnections() {
m.mu.RLock()
defer m.mu.RUnlock()
for name, source := range m.sources {
if !source.getIsConnected() {
glog.Warningf(context.Background(), "datasource [%s] disconnected, queued for reconnect", name)
// 发送到重连队列
select {
case m.reconnectCh <- name:
default:
// 通道已满,丢弃通知
}
}
}
}
// closeAll 关闭所有 Redis 数据源
func (m *redisDataSourceManager) closeAll(ctx context.Context) error {
m.cancel()
m.mu.RLock()
defer m.mu.RUnlock()
var lastErr error
for name, source := range m.sources {
if err := source.redisClose(ctx); err != nil {
glog.Errorf(ctx, "failed to close datasource [%s]: %v", name, err)
lastErr = err
}
}
return lastErr
}
// =============================================================================
// 全局初始化
// =============================================================================
var (
redisManager = getRedisManager()
)
// init 初始化 Redis 数据源
func init() {
ctx := context.Background()
// 从配置初始化多数据源
if err := redisManager.initializeFromConfig(ctx); err != nil {
glog.Errorf(ctx, "❌ Failed to initialize Redis datasources: %v", err)
} else {
glog.Infof(ctx, "✅ Redis datasources initialized: %v", redisManager.getAllDataSourceNames())
}
// 启动健康检查
redisManager.startHealthCheck()
// 设置优雅关闭
setupGracefulShutdown()
}
// setupGracefulShutdown 设置优雅关闭
func setupGracefulShutdown() {
go func() {
sigCh := make(chan os.Signal, 1)
signal.Notify(sigCh, syscall.SIGINT, syscall.SIGTERM)
<-sigCh
ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
defer cancel()
glog.Info(ctx, "🔄 Shutting down Redis connections...")
if err := redisManager.closeAll(ctx); err != nil {
glog.Errorf(ctx, "❌ Failed to close Redis connections: %v", err)
} else {
glog.Info(ctx, "✅ Redis connections closed successfully")
}
}()
}
// =============================================================================
// 私有辅助函数
// =============================================================================
// getDefaultDataSource 获取默认数据源
func getDefaultDataSource() (redisDataSource, error) {
return redisManager.getDataSource("default")
return redisConns[dsName]
}

View File

@@ -6,6 +6,10 @@ type messagePublishConfig interface {
GetPublishMsgType()
}
type messagePublishDelayConfig interface {
GetPublishDelayMsgType()
}
type messageSubscribeConfig interface {
GetSubscribeMsgType()
}
@@ -15,12 +19,14 @@ type messageSubscribeConfig interface {
type messageUtil interface {
// Publish 发布消息
Publish(ctx context.Context, msg messagePublishConfig) error
// PublishDelay 发布延迟消息
PublishDelay(ctx context.Context, msg messagePublishDelayConfig) error
// Subscribe 订阅消息
Subscribe(ctx context.Context, msg messageSubscribeConfig) error
// Ping 检测连接状态
ping(ctx context.Context) bool
// Reconnect
reconnect(ctx context.Context) error
Ping(ctx context.Context) bool
// Connect 连
Connect(ctx context.Context) error
// Close 关闭连接
close(ctx context.Context) error
Close(ctx context.Context) error
}

View File

@@ -3,10 +3,10 @@ package message
import (
"context"
"fmt"
"sync"
"time"
"github.com/gogf/gf/v2/frame/g"
"sync"
)
// MessageType 消息队列类型
@@ -32,7 +32,6 @@ type pluginManager struct {
var (
defaultPluginManager = newPluginManager()
// 不再支持默认插件类型,必须显式指定类型
)
// newPluginManager 创建插件管理器
@@ -42,63 +41,6 @@ func newPluginManager() *pluginManager {
}
}
// RegisterPlugin 注册消息队列插件
// 所有插件必须通过此方法注册,自动进行连接检测
// 只有连接成功的插件才会被注册,连接失败的插件不会被注册
// 异步无限重连,只有连接成功了才注册
func registerPlugin(msgType messageType, factory configFactory) error {
if factory == nil {
return fmt.Errorf("factory cannot be nil")
}
// 创建实例
instance := factory()
ctx := context.Background()
// 开启异步连接,无限重连直到成功
go func() {
retryInterval := 2 * time.Second
maxInterval := 30 * time.Second
for {
select {
case <-ctx.Done():
g.Log().Errorf(ctx, "❌ [%s] 注册被取消", msgType)
return
default:
// 尝试连接使用Reconnect方法
if err := instance.reconnect(ctx); err == nil {
// 连接成功,注册插件
if err := defaultPluginManager.register(msgType, instance); err != nil {
g.Log().Errorf(ctx, "❌ [%s] 注册插件失败: %v", msgType, err)
instance.close(ctx)
} else {
g.Log().Infof(ctx, "✅ [%s] 插件注册成功", msgType)
}
return
}
// 连接失败,记录日志并等待重试
g.Log().Warningf(ctx, "⚠️ [%s] 连接失败,%v 后重试...", msgType, retryInterval)
select {
case <-time.After(retryInterval):
// 增加重试间隔,但不超过最大值
retryInterval *= 2
if retryInterval > maxInterval {
retryInterval = maxInterval
}
case <-ctx.Done():
g.Log().Errorf(ctx, "❌ [%s] 注册被取消", msgType)
return
}
}
}
}()
return nil
}
// register 注册插件(内部方法)
func (m *pluginManager) register(msgType messageType, instance messageUtil) error {
m.mu.Lock()
@@ -107,27 +49,66 @@ func (m *pluginManager) register(msgType messageType, instance messageUtil) erro
return nil
}
// GetMsgPlugin 获取消息队列插件
func GetMsgPlugin(msgType messageType) (messageUtil, error) {
defaultPluginManager.mu.RLock()
instance, ok := defaultPluginManager.instances[msgType]
defaultPluginManager.mu.RUnlock()
if !ok {
return nil, fmt.Errorf("unsupported message type: %s", msgType)
// RegisterPlugin 注册消息队列插件
// 所有插件必须通过此方法注册,自动进行连接检测
// 只有连接成功的插件才会被注册,连接失败的插件不会被注册
// 异步无限重连,只有连接成功了才注册
// name: 数据源名称,用于标识不同的连接实例
func RegisterPlugin(ctx context.Context, name string, msgType messageType, factory configFactory) error {
if factory == nil {
g.Log().Errorf(ctx, "❌ factory cannot be nil")
return fmt.Errorf("factory cannot be nil")
}
return instance, nil
// 开启异步连接,无限重试直到成功
go func() {
// 创建实例
instance := factory()
// 创建通知 channel
pluginKey := fmt.Sprintf("%s-%s", msgType, name)
if !instance.Ping(ctx) {
// 使用统一的重连函数
if err := commonConnect(ctx, msgType, name, func(ctx context.Context) error {
return instance.Connect(ctx)
}, func(ctx context.Context) error {
return instance.Close(ctx)
}); err != nil {
g.Log().Errorf(ctx, "❌ [%s][%s] 连接失败: %v", msgType, name, err)
return
}
}
// 连接成功,注册插件
defaultPluginManager.mu.Lock()
defaultPluginManager.instances[messageType(pluginKey)] = instance
defaultPluginManager.mu.Unlock()
g.Log().Infof(ctx, "✅ [%s][%s] 插件注册成功", msgType, name)
}()
return nil
}
// GetSupportedTypes 获取所有已注册的插件类型
func GetSupportedTypes() []messageType {
defaultPluginManager.mu.RLock()
defer defaultPluginManager.mu.RUnlock()
types := make([]messageType, 0, len(defaultPluginManager.instances))
for t := range defaultPluginManager.instances {
types = append(types, t)
}
return types
// GetMsgPlugin 获取消息队列插件(默认数据源),如果未注册则等待
func GetMsgPlugin(ctx context.Context, msgType messageType) (messageUtil, error) {
return GetMsgPluginWithName(ctx, msgType, "default")
}
// GetMsgPluginWithName 获取指定数据源的消息队列插件,如果未注册则等待直到超时
func GetMsgPluginWithName(ctx context.Context, msgType messageType, name string) (messageUtil, error) {
pluginKey := fmt.Sprintf("%s-%s", msgType, name)
for {
defaultPluginManager.mu.RLock()
instance, ok := defaultPluginManager.instances[messageType(pluginKey)]
defaultPluginManager.mu.RUnlock()
if ok {
return instance, nil
}
// 未注册,等待一段时间后重试
select {
case <-ctx.Done():
return nil, fmt.Errorf("wait for plugin ready canceled: %s with datasource: %s", msgType, name)
default:
time.Sleep(3 * time.Second)
}
}
}

View File

@@ -6,11 +6,16 @@ import (
"fmt"
"github.com/gogf/gf/v2/frame/g"
"github.com/nats-io/nats.go"
"github.com/nats-io/nats.go/jetstream"
"time"
)
type NatsPublishMsgConfig struct {
QueueName string
Durable bool
Data any
}
type NatsPublishDelayMsgConfig struct {
QueueName string
Durable bool
DelayTime int
@@ -19,9 +24,9 @@ type NatsPublishMsgConfig struct {
type NatsSubscribeMsgConfig struct {
QueueName string
ConsumerName string
Durable bool
DelayTime int
ConsumerName string
AutoAck bool
PrefetchCount int
HandleFunc func(ctx context.Context, message map[string]interface{}) error
@@ -31,32 +36,38 @@ func (*NatsPublishMsgConfig) GetPublishMsgType() {
}
func (*NatsPublishDelayMsgConfig) GetPublishDelayMsgType() {
}
func (*NatsSubscribeMsgConfig) GetSubscribeMsgType() {
}
type natsMsg struct {
name string // 数据源名称
}
func init() {
// 注册 Nats 插件,必须使用 RegisterPlugin 确保连接检测
registerPlugin(MessageNATS, func() messageUtil {
return &natsMsg{}
// 注册 Nats 插件(默认数据源)
RegisterPlugin(context.Background(), "default", MessageNATS, func() messageUtil {
return &natsMsg{name: "default"}
})
}
type natsMsg struct{}
// Ping 检测 NATS 连接状态
func (c *natsMsg) ping(_ context.Context) bool {
return natsPing()
// Connect 连接 NATS
func (c *natsMsg) Connect(ctx context.Context) error {
return natsConnect(ctx, c.name)
}
// Reconnect 重连 NATS
func (c *natsMsg) reconnect(ctx context.Context) error {
return natsReconnect(ctx)
// Ping 检测 NATS 连接状态
func (c *natsMsg) Ping(ctx context.Context) bool {
return natsPing(ctx, c.name)
}
// Close 关闭 NATS 连接
func (c *natsMsg) close(ctx context.Context) error {
return natsClose(ctx)
func (c *natsMsg) Close(ctx context.Context) error {
return natsClose(ctx, c.name)
}
// Publish 发布消息
@@ -71,13 +82,31 @@ func (c *natsMsg) Publish(ctx context.Context, msgConfig messagePublishConfig) e
if g.IsEmpty(cfg.Data) {
return fmt.Errorf("必须提供数据")
}
return c.createPublish(ctx, cfg.QueueName, cfg.Durable, 0, cfg.Data)
}
// PublishDelay 发布延迟消息
func (c *natsMsg) PublishDelay(ctx context.Context, msgConfig messagePublishDelayConfig) error {
cfg, ok := msgConfig.(*NatsPublishDelayMsgConfig)
if !ok {
return fmt.Errorf("无效的 NATS 配置类型")
}
if g.IsEmpty(cfg.QueueName) {
return fmt.Errorf("必须提供队列名称")
}
if g.IsEmpty(cfg.DelayTime) {
return fmt.Errorf("延迟时间必须大于 0")
}
if g.IsEmpty(cfg.Data) {
return fmt.Errorf("必须提供数据")
}
return c.createPublish(ctx, cfg.QueueName, cfg.Durable, cfg.DelayTime, cfg.Data)
}
// Publish 发布消息
func (c *natsMsg) createPublish(ctx context.Context, subject string, durable bool, delayTime int, data any) error {
delayMsg := delayTime > 0
if err := c.createStreamGroupInternal(ctx, subject, durable, delayMsg); err != nil {
if err := c.createStream(ctx, subject, durable, delayMsg); err != nil {
return err
}
payload, err := json.Marshal(data)
@@ -85,96 +114,30 @@ func (c *natsMsg) createPublish(ctx context.Context, subject string, durable boo
return fmt.Errorf("序列化数据失败: %w", err)
}
msg := &nats.Msg{
Subject: subject,
Data: payload,
}
m := nats.NewMsg(subject)
m.Data = payload // 所有消息都需要设置数据
if delayMsg {
// 计算目标投递时间
targetTime := time.Now().Add(time.Duration(delayTime) * time.Second)
delayNs := time.Until(targetTime).Nanoseconds()
if delayNs < 0 {
delayNs = 0
}
g.Log().Infof(ctx, "📅 NATS 延迟消息配置: DelayTime=%d秒, TargetTime=%v, DelayNs=%d纳秒(%.2f秒)",
delayTime, targetTime.Format("2006-01-02 15:04:05"), delayNs, float64(delayNs)/float64(time.Second.Nanoseconds()))
// NATS JetStream 延迟消息使用 Nats-Msg-Delay Header纳秒数
msg.Header = nats.Header{
"Nats-Msg-Delay": []string{fmt.Sprintf("%d", delayNs)},
}
g.Log().Infof(ctx, "📅 NATS 延迟消息 Header: %v", msg.Header)
// 获取 Stream 配置验证
streamName, _ := getStreamInfo(durable, delayMsg)
stream, err := js.Stream(ctx, streamName)
if err == nil {
info, _ := stream.Info(ctx)
g.Log().Infof(ctx, "📅 Stream 配置: AllowMsgSchedules=%v, Storage=%v",
info.Config.AllowMsgSchedules, info.Config.Storage)
if !info.Config.AllowMsgSchedules {
g.Log().Errorf(ctx, "❌ Stream 不支持延迟消息AllowMsgSchedules=false")
}
}
// 使用 @at 指定具体延迟时间,而不是 @every 重复执行
futureTime := time.Now().Add(time.Duration(delayTime) * time.Second).Format(time.RFC3339Nano)
m.Header.Set("Nats-Schedule", fmt.Sprintf("@at %s", futureTime))
m.Subject = subject + ".schedule"
m.Header.Set("Nats-Schedule-Target", subject)
g.Log().Infof(ctx, "📅 NATS 延迟消息配置: DelayTime=%ds, Schedule=@at %s, Header=%s", delayTime, futureTime, m.Header)
}
// 发布消息到 JetStream
ack, err := js.PublishMsg(ctx, msg)
js := getNatsJS(c.name)
if js == nil {
g.Log().Errorf(ctx, "❌ NATS [%s] JetStream 不存在", c.name)
return fmt.Errorf("NATS JetStream 不存在")
}
ack, err := js.PublishMsg(m)
if err != nil {
g.Log().Errorf(ctx, "❌ NATS 发布消息失败: err=%v", err)
g.Log().Errorf(ctx, "❌ NATS 发布消息失败: err=%v, Subject=%s", err, m.Subject)
return err
}
g.Log().Infof(ctx, "✅ NATS 发布消息成功: StreamSeq=%d, Domain=%s", ack.Sequence, ack.Domain)
return nil
}
// createStreamGroup 内部创建消费组
func (c *natsMsg) createStreamGroupInternal(ctx context.Context, subject string, durable, delayMsg bool) error {
streamName, storage := getStreamInfo(durable, delayMsg)
// 先检查 Stream 是否存在
stream, err := js.Stream(ctx, streamName)
if err == nil {
// Stream 已存在,检查配置是否匹配
info, _ := stream.Info(ctx)
if info.Config.AllowMsgSchedules != delayMsg || info.Config.Storage != storage {
g.Log().Infof(ctx, "🔄 Stream 配置不匹配,正在重新创建: stream=%s, 当前AllowMsgSchedules=%v, 需要%v",
streamName, info.Config.AllowMsgSchedules, delayMsg)
// 删除旧 Stream
if err := js.DeleteStream(ctx, streamName); err != nil {
g.Log().Warningf(ctx, "删除旧 Stream 失败: %v", err)
}
} else {
g.Log().Infof(ctx, "✅ Stream 已存在且配置正确: stream=%s", streamName)
return nil
}
}
// 构建流配置
jsConfig := jetstream.StreamConfig{
Name: streamName,
Subjects: []string{subject},
AllowMsgSchedules: delayMsg, // 延迟消息核心开关
Storage: storage,
Discard: jetstream.DiscardOld, // 达到上限删除旧消息
}
stream, err = js.CreateStream(ctx, jsConfig)
if err != nil {
return fmt.Errorf("创建任务流失败: %w", err)
}
// 获取 Stream 信息验证配置
info, err := stream.Info(ctx)
if err == nil {
g.Log().Infof(ctx, "✅ NATS 队列初始化成功: stream=%s, AllowMsgSchedules=%v, Storage=%v",
streamName, info.Config.AllowMsgSchedules, info.Config.Storage)
}
g.Log().Infof(ctx, "✅ NATS 队列初始化成功: stream=%s", streamName)
g.Log().Infof(ctx, "✅ NATS 发布消息成功: Stream=%v, StreamSeq=%d", ack.Stream, ack.Sequence)
return nil
}
@@ -196,110 +159,215 @@ func (c *natsMsg) Subscribe(ctx context.Context, msgConfig messageSubscribeConfi
if g.IsEmpty(cfg.PrefetchCount) {
cfg.PrefetchCount = 1
}
return c.createSubscribeInternal(ctx, cfg.QueueName, cfg.ConsumerName, cfg.PrefetchCount, cfg.AutoAck, cfg.Durable, cfg.DelayTime, cfg.HandleFunc)
return c.createSubscribe(ctx, cfg.QueueName, cfg.ConsumerName, cfg.PrefetchCount, cfg.DelayTime, cfg.AutoAck, cfg.Durable, cfg.HandleFunc)
}
// createSubscribe 内部订阅消息
func (c *natsMsg) createSubscribeInternal(ctx context.Context, subject, consumerName string, prefetchCount int, autoAck, durable bool, delayTime int, handler func(ctx context.Context, message map[string]interface{}) error) error {
func (c *natsMsg) createSubscribe(ctx context.Context, subject, consumerName string, prefetchCount, delayTime int, autoAck, durable bool, handler func(ctx context.Context, message map[string]any) error) error {
g.Log().Infof(ctx, "🔔 NATS 开始订阅: QueueName=%s, ConsumerName=%s", subject, consumerName)
delayMsg := delayTime > 0
streamName, _ := getStreamInfo(durable, delayMsg)
// 确保 Stream 存在,如果不存在则创建
if err := c.createStreamGroupInternal(ctx, subject, durable, delayMsg); err != nil {
g.Log().Errorf(ctx, "创建 Stream 失败: %v", err)
return fmt.Errorf("创建 Stream 失败: %w", err)
}
// Stream 不存在,创建新的
ackPolicy := jetstream.AckExplicitPolicy
if autoAck {
ackPolicy = jetstream.AckNonePolicy
}
jsConfig := jetstream.ConsumerConfig{
Name: consumerName,
Durable: consumerName,
FilterSubject: subject,
AckPolicy: ackPolicy,
MaxDeliver: 3,
MaxAckPending: prefetchCount,
}
// 创建新消费者
consumer, err := js.CreateOrUpdateConsumer(ctx, streamName, jsConfig)
if err != nil {
g.Log().Errorf(ctx, "创建消费者失败: %v", err)
return err
}
// 获取消费者信息验证
if cInfo, err := consumer.Info(ctx); err == nil {
g.Log().Infof(ctx, "🔔 消费者创建成功: %s, AckPolicy=%v, MaxAckPending=%d",
cInfo.Name, cInfo.Config.AckPolicy, cInfo.Config.MaxAckPending)
}
// 创建消息处理函数
msgHandler := func(msg jetstream.Msg) {
// 记录消息接收时间
now := time.Now()
meta, err := msg.Metadata()
if err == nil {
g.Log().Infof(ctx, "📨 收到消息: StreamSeq=%d, Published=%v, Received=%v, 距离发布=%.2f秒",
meta.Sequence.Stream,
meta.Timestamp.Format("2006-01-02 15:04:05"),
now.Format("2006-01-02 15:04:05"),
now.Sub(meta.Timestamp).Seconds())
}
// 解析消息
// 创建推送订阅的回调函数
msgHandler := func(msg *nats.Msg) {
var data map[string]any
if err := json.Unmarshal(msg.Data(), &data); err != nil {
g.Log().Errorf(ctx, "解析消息失败: %v", err)
if err := msg.Nak(); err != nil {
g.Log().Errorf(ctx, "Nak 失败: %v", err)
}
if err := json.Unmarshal(msg.Data, &data); err != nil {
g.Log().Errorf(ctx, "解析消息失败: %v", err)
return
}
g.Log().Infof(ctx, "📨 收到消息: Subject=%s, Data=%v", msg.Subject, data)
// 处理业务逻辑
if err := handler(ctx, data); err != nil {
g.Log().Errorf(ctx, "处理消息失败: %v", err)
if err := msg.Nak(); err != nil {
g.Log().Errorf(ctx, "Nak 失败: %v", err)
g.Log().Errorf(ctx, "处理消息失败: %v", err)
if !autoAck {
if err := msg.Nak(); err != nil {
g.Log().Errorf(ctx, "❌ Nak 失败: %v", err)
return
}
return
}
} else {
g.Log().Infof(ctx, "✅ 处理消息成功")
}
if err := msg.Ack(); err != nil {
g.Log().Errorf(ctx, "❌ Ack 失败: %v", err)
}
}
delayMsg := delayTime > 0
// 创建流
if err := c.createStream(ctx, subject, durable, delayMsg); err != nil {
return err
}
// 获取 JetStream 上下文
js := getNatsJS(c.name)
if js == nil {
g.Log().Errorf(ctx, "❌ NATS [%s] JetStream 不存在", c.name)
return fmt.Errorf("NATS JetStream 不存在")
}
// 创建推送订阅
var sub *nats.Subscription
var err error
// 配置订阅选项 - 使用 DeliverSubject 创建 Push Consumer
subOpts := []nats.SubOpt{
nats.Durable(consumerName),
nats.MaxAckPending(prefetchCount),
nats.DeliverSubject(consumerName),
}
if !autoAck {
subOpts = append(subOpts, nats.ManualAck())
}
// 使用 Subscribe 创建推送订阅
sub, err = js.Subscribe(subject, msgHandler, subOpts...)
if err != nil {
g.Log().Errorf(ctx, "创建推送订阅失败: %v", err)
return err
}
g.Log().Infof(ctx, "✅ NATS 推送订阅成功: Consumer=%s", consumerName)
// 启动后台 goroutine 监听上下文取消,用于清理订阅
go func() {
<-ctx.Done()
g.Log().Infof(ctx, "订阅上下文取消,取消订阅")
if err := sub.Unsubscribe(); err != nil {
return
}
g.Log().Infof(ctx, "处理消息成功")
if !autoAck {
if err := msg.Ack(); err != nil {
g.Log().Errorf(ctx, "Ack 失败: %v", err)
}
}
}
// 开始消费
_, err = consumer.Consume(msgHandler)
if err != nil {
return fmt.Errorf("开始消费失败: %w", err)
}
g.Log().Infof(ctx, "✅ NATS 订阅成功")
}()
return nil
}
func getStreamInfo(durable, delayMsg bool) (string, jetstream.StorageType) {
// createStream 内部创建消费组
func (c *natsMsg) createStream(ctx context.Context, subject string, durable, delayMsg bool) error {
streamName, storage := getStreamInfo(durable, delayMsg)
// 构建流配置
// 如果是延迟消息,需要包含两个 subjects:
// 1. subject.schedule - 用于发送调度消息
// 2. subject - 用于实际投递目标
subjects := []string{subject}
if delayMsg {
subjects = []string{subject, subject + ".schedule"}
}
jsConfig := &StreamConfig{
Name: streamName,
Subjects: subjects,
AllowMsgSchedules: delayMsg, // 延迟消息核心开关
Storage: storage,
Discard: DiscardNew, // 达到上限删除旧消息
}
nc := getNatsConn(c.name)
if !c.Ping(ctx) {
// 使用统一的重连函数
if err := commonConnect(ctx, MessageNATS, c.name, func(ctx context.Context) error {
return c.Connect(ctx)
}, func(ctx context.Context) error {
return c.Close(ctx)
}); err != nil {
g.Log().Errorf(ctx, "❌ [%s][%s] 连接失败: %v", MessageNATS, c.name, err)
return err
}
}
if nc == nil {
g.Log().Errorf(ctx, "❌ NATS [%s] 连接不存在", c.name)
return fmt.Errorf("NATS 连接不存在")
}
err := jsStreamCreate(nc, jsConfig)
if err != nil {
g.Log().Errorf(ctx, "❌ 创建 Stream 失败: err=%v", err)
return err
}
g.Log().Infof(ctx, "✅ 创建 Stream 成功: stream=%s, subjects=%v, allowSchedules=%v", streamName, subjects, delayMsg)
return nil
}
func getStreamInfo(durable, delayMsg bool) (string, StorageType) {
// Stream 不存在,创建新的
streamName := "ordinary_msg_memory"
storage := jetstream.MemoryStorage
storage := MemoryStorage
// 延迟消息必须使用 FileStorageNATS 官方要求)
if delayMsg {
streamName = "delay_msg_file"
storage = jetstream.FileStorage
if durable {
streamName = "delay_msg_file"
storage = FileStorage
} else {
streamName = "delay_msg_memory"
storage = MemoryStorage
}
} else {
if durable {
streamName = "ordinary_msg_file"
storage = jetstream.FileStorage
storage = FileStorage
}
}
return streamName, storage
}
const (
// JSApiStreamCreateT is the endpoint to create new streams.
// Will return JSON response.
JSApiStreamCreateT = "$JS.API.STREAM.CREATE.%s"
// JSApiStreamUpdateT is the endpoint to update existing streams.
// Will return JSON response.
JSApiStreamUpdateT = "$JS.API.STREAM.UPDATE.%s"
)
// jsStreamCreate is for sending a stream create for fields that nats.go does not know about yet.
func jsStreamCreate(nc *nats.Conn, cfg *StreamConfig) error {
j, err := json.Marshal(cfg)
if err != nil {
return err
}
msg, err := nc.Request(fmt.Sprintf(JSApiStreamCreateT, cfg.Name), j, time.Second*3)
if err != nil {
return err
}
// 检查 API 响应中的错误
var resp struct {
Error *struct {
Code int `json:"code"`
ErrCode int `json:"err_code"`
Description string `json:"description"`
} `json:"error,omitempty"`
}
if err := json.Unmarshal(msg.Data, &resp); err != nil {
return err
}
if resp.Error != nil {
// 如果 Stream 已存在,尝试更新
if resp.Error.ErrCode == 10058 { // JSStreamNameExistErr
return jsStreamUpdate(nc, cfg)
}
return fmt.Errorf("JS API error: %s", resp.Error.Description)
}
return nil
}
// jsStreamUpdate is for sending a stream create for fields that nats.go does not know about yet.
func jsStreamUpdate(nc *nats.Conn, cfg *StreamConfig) error {
j, err := json.Marshal(cfg)
if err != nil {
return err
}
msg, err := nc.Request(fmt.Sprintf(JSApiStreamUpdateT, cfg.Name), j, time.Second*3)
if err != nil {
return err
}
// 检查 API 响应中的错误
var resp struct {
Error *struct {
Code int `json:"code"`
ErrCode int `json:"err_code"`
Description string `json:"description"`
} `json:"error,omitempty"`
}
if err := json.Unmarshal(msg.Data, &resp); err != nil {
return err
}
if resp.Error != nil {
return fmt.Errorf("JS API error: %s", resp.Error.Description)
}
return nil
}

View File

@@ -31,6 +31,9 @@ var (
traceCancelMu sync.RWMutex
// 取消主题前缀
cancelSubjectPrefix = "ctx.cancel.otel."
// RPC 使用的默认数据源名称
rpcDefaultDatasource = "default"
)
// rpcHandler RPC 处理函数类型
@@ -42,7 +45,7 @@ type rpcHandler func(ctx context.Context, req []byte) (any, error)
// serviceName: 服务名称,调用方通过此名称调用服务
// handler: 服务处理函数,接收请求并返回响应
func registerRPCService(serviceName string, handler rpcHandler) (err error) {
if !natsPing() {
if !natsPing(context.Background(), rpcDefaultDatasource) {
return fmt.Errorf("NATS 未连接")
}
@@ -63,6 +66,11 @@ func registerRPCService(serviceName string, handler rpcHandler) (err error) {
rpcServicesMu.Unlock()
// 订阅服务主题
nc := getNatsConn(rpcDefaultDatasource)
if nc == nil {
return fmt.Errorf("NATS 连接不存在")
}
subject := fmt.Sprintf("rpc.%s", serviceName)
sub, err := nc.Subscribe(subject, func(msg *nats.Msg) {
// 执行处理函数
@@ -84,7 +92,7 @@ func registerRPCService(serviceName string, handler rpcHandler) (err error) {
// queueName: 队列组名,同一队列组的实例共享请求
// handler: 服务处理函数
func registerQueueRPCService(serviceName, queueName string, handler rpcHandler) (err error) {
if !natsPing() {
if !natsPing(context.Background(), rpcDefaultDatasource) {
return fmt.Errorf("NATS 未连接")
}
@@ -111,6 +119,11 @@ func registerQueueRPCService(serviceName, queueName string, handler rpcHandler)
queueRPCMu.Unlock()
// 订阅服务主题(队列模式)
nc := getNatsConn(rpcDefaultDatasource)
if nc == nil {
return fmt.Errorf("NATS 连接不存在")
}
subject := fmt.Sprintf("rpc.%s", serviceName)
sub, err := nc.QueueSubscribe(subject, queueName, func(msg *nats.Msg) {
// 执行处理函数
@@ -209,7 +222,7 @@ func createCancelContext(ctx context.Context, traceID string) context.Context {
//
// sub, err := nats.SetupCancelListener(ctx)
func setupCancelListener(ctx context.Context) (*nats.Subscription, error) {
if !natsPing() {
if !natsPing(ctx, rpcDefaultDatasource) {
return nil, fmt.Errorf("NATS 未连接")
}
@@ -219,6 +232,11 @@ func setupCancelListener(ctx context.Context) (*nats.Subscription, error) {
// 修复问题3订阅取消主题格式: ctx.cancel.otel.*
// 使用 * 通配符而不是 >,因为 TraceID 是最后一部分
nc := getNatsConn(rpcDefaultDatasource)
if nc == nil {
return nil, fmt.Errorf("NATS 连接不存在")
}
cancelSubject := cancelSubjectPrefix + "*"
sub, err := nc.Subscribe(cancelSubject, func(msg *nats.Msg) {
// 从主题中解析 TraceID (去除前缀)
@@ -261,7 +279,7 @@ func setupCancelListener(ctx context.Context) (*nats.Subscription, error) {
//
// err := nats.publishCancel(ctx, traceID)
func publishCancel(ctx context.Context, traceID string) error {
if !natsPing() {
if !natsPing(ctx, rpcDefaultDatasource) {
return fmt.Errorf("NATS 未连接")
}
@@ -269,6 +287,11 @@ func publishCancel(ctx context.Context, traceID string) error {
return fmt.Errorf("TraceID 不能为空")
}
nc := getNatsConn(rpcDefaultDatasource)
if nc == nil {
return fmt.Errorf("NATS 连接不存在")
}
cancelSubject := cancelSubjectPrefix + traceID
err := nc.Publish(cancelSubject, nil)
if err != nil {
@@ -303,7 +326,7 @@ func cleanupTraceCancel(traceID string) {
// req: 请求数据
// 返回: 响应数据(任意类型)和错误
func CallRPC(ctx context.Context, serviceName string, req any, resp any) (err error) {
if !natsPing() {
if !natsPing(ctx, rpcDefaultDatasource) {
return fmt.Errorf("NATS 未连接")
}
@@ -406,6 +429,11 @@ func CallRPC(ctx context.Context, serviceName string, req any, resp any) (err er
}
// 发送请求
nc := getNatsConn(rpcDefaultDatasource)
if nc == nil {
return fmt.Errorf("NATS 连接不存在")
}
responseMsg, err := nc.RequestMsgWithContext(ctx, msg)
// 关闭 done channel通知 goroutine 退出
@@ -475,7 +503,7 @@ func WithExcludeMethods(methods ...string) registerServiceOption {
// }, WithQueueGroup("order-group"))
func AutoRegisterServices(ctx context.Context, serviceInstances map[string]interface{}, options ...registerServiceOption) error {
// 先注册 RPC 服务(如果 NATS 不可用则记录警告但不阻塞启动)
if !natsPing() {
if !natsPing(ctx, rpcDefaultDatasource) {
return fmt.Errorf("NATS 未连接RPC 服务未注册")
}
@@ -512,7 +540,7 @@ func AutoRegisterServices(ctx context.Context, serviceInstances map[string]inter
// registerService 注册单个服务的所有公开方法(内部函数)
func registerService(service interface{}, serviceNamePrefix string, options ...registerServiceOption) (err error) {
if !natsPing() {
if !natsPing(context.Background(), rpcDefaultDatasource) {
return fmt.Errorf("NATS 未连接")
}

View File

@@ -11,6 +11,12 @@ import (
)
type RabbitMQPublishMsgConfig struct {
QueueName string
Durable bool
Data any
}
type RabbitMQPublishDelayMsgConfig struct {
QueueName string
Durable bool
DelayTime int
@@ -19,8 +25,6 @@ type RabbitMQPublishMsgConfig struct {
type RabbitMQSubscribeMsgConfig struct {
QueueName string
Durable bool
DelayTime int
ConsumerName string
AutoAck bool
PrefetchCount int
@@ -31,32 +35,36 @@ func (*RabbitMQPublishMsgConfig) GetPublishMsgType() {
}
func (*RabbitMQPublishDelayMsgConfig) GetPublishDelayMsgType() {}
func (*RabbitMQSubscribeMsgConfig) GetSubscribeMsgType() {
}
func init() {
// 注册 RabbitMQ 插件,必须使用 RegisterPlugin 确保连接检测
//registerPlugin(MessageRabbitMQ, func() messageUtil {
// return &rabbitMQ{}
//})
type rabbitMQ struct {
name string // 数据源名称
}
type rabbitMQ struct{}
func init() {
// 注册 RabbitMQ 插件(默认数据源)
RegisterPlugin(context.Background(), "default", MessageRabbitMQ, func() messageUtil {
return &rabbitMQ{name: "default"}
})
}
// Connect 连接 RabbitMQ
func (c *rabbitMQ) Connect(ctx context.Context) error {
return rabbitmqConnect(ctx, c.name)
}
// Ping 检测 RabbitMQ 连接状态
func (c *rabbitMQ) ping(ctx context.Context) bool {
return rabbitmqPing()
}
// Reconnect 重连 RabbitMQ
func (c *rabbitMQ) reconnect(ctx context.Context) error {
return rabbitmqReconnect(ctx)
func (c *rabbitMQ) Ping(ctx context.Context) bool {
return rabbitmqPing(ctx, c.name)
}
// Close 关闭 RabbitMQ 连接
func (c *rabbitMQ) close(ctx context.Context) error {
return rabbitmqClose(ctx)
func (c *rabbitMQ) Close(ctx context.Context) error {
return rabbitmqClose(ctx, c.name)
}
// Publish 发布消息
@@ -71,11 +79,43 @@ func (c *rabbitMQ) Publish(ctx context.Context, msgConfig messagePublishConfig)
if cfg.Data == nil {
return fmt.Errorf("数据不能为空")
}
return c.publishMessageInternal(ctx, cfg.QueueName, cfg.Durable, 0, cfg.Data)
}
// PublishDelay 发布延迟消息
func (c *rabbitMQ) PublishDelay(ctx context.Context, msgConfig messagePublishDelayConfig) error {
cfg, ok := msgConfig.(*RabbitMQPublishDelayMsgConfig)
if !ok {
return fmt.Errorf("无效的 RabbitMQ 配置类型")
}
if g.IsEmpty(cfg.QueueName) {
return fmt.Errorf("队列名称不能为空")
}
if cfg.Data == nil {
return fmt.Errorf("数据不能为空")
}
return c.publishMessageInternal(ctx, cfg.QueueName, cfg.Durable, cfg.DelayTime, cfg.Data)
}
// publishMessage 发布消息内部实现
func (c *rabbitMQ) publishMessageInternal(ctx context.Context, queueName string, durable bool, delayTime int, data interface{}) error {
if !c.Ping(ctx) {
if err := commonConnect(ctx, MessageRabbitMQ, c.name, func(ctx context.Context) error {
return c.Connect(ctx)
}, func(ctx context.Context) error {
return c.Close(ctx)
}); err != nil {
g.Log().Errorf(ctx, "❌ [%s][%s] 连接失败: %v", MessageRabbitMQ, c.name, err)
return err
}
}
channel := getRabbitMQChannel(c.name)
if channel == nil || channel.IsClosed() {
g.Log().Errorf(ctx, "❌ RabbitMQ [%s] Channel 不存在或已关闭", c.name)
return fmt.Errorf("RabbitMQ Channel 不存在或已关闭")
}
delayMsg := delayTime > 0
// 1. 决定 Exchange 类型
@@ -86,12 +126,12 @@ func (c *rabbitMQ) publishMessageInternal(ctx context.Context, queueName string,
if delayMsg {
exchangeType = "x-delayed-message"
exchangeName = queueName + ".delayed"
args["x-delayed-type"] = "fanout" // 底层用 topic
args["x-delayed-type"] = "fanout"
}
// 2. 声明 Exchange只声明一次
// 2. 声明 Exchange使用 exchangeName 而不是 queueName
if err := channel.ExchangeDeclare(
queueName, // exchange 交换机名称
exchangeName, // 修复:使用正确的交换机名称
exchangeType,
durable,
false, // autoDelete
@@ -99,7 +139,8 @@ func (c *rabbitMQ) publishMessageInternal(ctx context.Context, queueName string,
false, // noWait
args,
); err != nil {
return fmt.Errorf("声明 Exchange 失败: %w", err)
g.Log().Errorf(ctx, "❌ 声明 Exchange 失败: %v", err)
return err
}
// 3. 声明队列
@@ -111,7 +152,8 @@ func (c *rabbitMQ) publishMessageInternal(ctx context.Context, queueName string,
false, // noWait
nil, // args
); err != nil {
return fmt.Errorf("声明队列失败: %w", err)
g.Log().Errorf(ctx, "❌ 声明队列失败: %v", err)
return err
}
// 4. 绑定队列
@@ -122,13 +164,15 @@ func (c *rabbitMQ) publishMessageInternal(ctx context.Context, queueName string,
false, // noWait
nil, // args
); err != nil {
return fmt.Errorf("绑定队列失败: %w", err)
g.Log().Errorf(ctx, "❌ 绑定队列失败: %v", err)
return err
}
// 5. 序列化数据
body, err := json.Marshal(data)
if err != nil {
return fmt.Errorf("序列化数据失败: %w", err)
g.Log().Errorf(ctx, "❌ 序列化数据失败: %v", err)
return err
}
// 6. 发布消息
deliveryMode := amqp.Transient
@@ -142,9 +186,9 @@ func (c *rabbitMQ) publishMessageInternal(ctx context.Context, queueName string,
Timestamp: time.Now(),
}
if delayMsg {
duration := time.Duration(delayTime) * time.Minute
duration := delayTime * 1000 // 延迟时间(毫秒)= 秒 * 1000
publishing.Headers = amqp.Table{
"x-delay": duration, // 延迟时间(毫秒)
"x-delay": duration,
}
}
err = channel.PublishWithContext(
@@ -154,6 +198,11 @@ func (c *rabbitMQ) publishMessageInternal(ctx context.Context, queueName string,
false, false,
publishing,
)
if err != nil {
g.Log().Errorf(ctx, "❌ 发布消息失败: %v", err)
return err
}
g.Log().Infof(ctx, "📨 发布消息成功: queueName=%s, data=%v", queueName, data)
return err
}
@@ -180,10 +229,28 @@ func (c *rabbitMQ) Subscribe(ctx context.Context, msgConfig messageSubscribeConf
// createSubscribe 内部订阅消息
func (c *rabbitMQ) createSubscribeInternal(ctx context.Context, queueName, consumerName string, prefetchCount int, autoAck bool, handler func(ctx context.Context, message map[string]interface{}) error) error {
g.Log().Infof(ctx, "🔔 RabbitMQ 开始订阅: queueName=%s, consumerName=%s", queueName, consumerName)
g.Log().Infof(ctx, "🔔 RabbitMQ [%s] 开始订阅: queueName=%s, consumerName=%s", c.name, queueName, consumerName)
if !c.Ping(ctx) {
if err := commonConnect(ctx, MessageRabbitMQ, c.name, func(ctx context.Context) error {
return c.Connect(ctx)
}, func(ctx context.Context) error {
return c.Close(ctx)
}); err != nil {
g.Log().Errorf(ctx, "❌ [%s][%s] 连接失败: %v", MessageRabbitMQ, c.name, err)
return err
}
}
channel := getRabbitMQChannel(c.name)
if channel == nil || channel.IsClosed() {
g.Log().Errorf(ctx, "❌ RabbitMQ [%s] Channel 不存在或已关闭", c.name)
return fmt.Errorf("RabbitMQ Channel 不存在或已关闭")
}
if err := channel.Qos(prefetchCount, 0, false); err != nil {
return fmt.Errorf("设置 Qos 失败: %w", err)
g.Log().Errorf(ctx, "❌ 设置 Qos 失败: %v", err)
return err
}
g.Log().Infof(ctx, "📊 设置 Prefetch Count: %d", prefetchCount)
@@ -197,97 +264,48 @@ func (c *rabbitMQ) createSubscribeInternal(ctx context.Context, queueName, consu
nil, // args
)
if err != nil {
return fmt.Errorf("注册消费者失败: %w", err)
g.Log().Errorf(ctx, "❌ 消费消息失败: %v", err)
return err
}
go func() {
defer func() {
if r := recover(); r != nil {
g.Log().Errorf(ctx, "❌ RabbitMQ 消费者 panic: %v", r)
g.Log().Infof(ctx, "👀 开始监听消息")
for {
select {
case <-ctx.Done():
// Context 取消,退出
g.Log().Infof(ctx, "context cancel 监听消息退出")
return nil
case m, ok := <-msg:
if !ok {
// Channel 关闭,退出
g.Log().Infof(ctx, "channel close 监听消息退出")
return nil
}
}()
g.Log().Infof(ctx, "📨 收到消息: %s", string(m.Body))
// 并发控制信号量
semaphore := make(chan struct{}, 10) // 限制最大并发数为 10
for {
select {
case <-ctx.Done():
g.Log().Infof(ctx, "🔕 RabbitMQ 消费者停止: queueName=%s, consumerName=%s", queueName, consumerName)
return
case msg, ok := <-msg:
if !ok {
g.Log().Warningf(ctx, "⚠️ RabbitMQ 消息通道关闭")
return
var data map[string]interface{}
if err := json.Unmarshal(m.Body, &data); err != nil {
// 如果不是 JSON直接使用原始内容
data = map[string]interface{}{
"data": string(m.Body),
}
// 获取并发控制槽位
semaphore <- struct{}{}
go func(m amqp.Delivery) {
defer func() {
<-semaphore // 释放槽位
if r := recover(); r != nil {
g.Log().Errorf(ctx, "❌ 消息处理 panic: %v", r)
}
}()
if err := c.handleMessageWithRetryInternal(ctx, m, handler, autoAck); err != nil {
g.Log().Errorf(ctx, "❌ 消息处理失败(重试次数耗尽): %v", err)
// 仅在手动 ACK 模式下拒绝消息
if !autoAck {
// 拒绝消息不再重新入队(避免死循环)
m.Nack(false, false)
}
return
}
// 仅在手动 ACK 模式下确认消息
if autoAck {
if err := m.Ack(false); err != nil {
g.Log().Errorf(ctx, "❌ ACK 消息失败: %v", err)
}
}
}(msg)
}
err := handler(ctx, data)
if err != nil {
g.Log().Errorf(ctx, "❌ 消息处理失败: %v", err)
// 仅在手动 ACK 模式下拒绝消息
if !autoAck {
// 拒绝消息不再重新入队(避免死循环)
m.Nack(false, false)
continue
}
}
g.Log().Infof(ctx, "✅ 消息处理成功: %v", err)
// 仅在手动 ACK 模式下确认消息
if err := m.Ack(false); err != nil {
g.Log().Errorf(ctx, "❌ AUTO ACK 消息失败: %v", err)
} else {
g.Log().Infof(ctx, "✅ AUTO ACK 消息成功")
}
}
}()
return nil
}
// handleMessageWithRetry 处理消息(支持重试)
func (c *rabbitMQ) handleMessageWithRetryInternal(ctx context.Context, msg amqp.Delivery, handler func(ctx context.Context, message map[string]interface{}) error, autoAck bool) error {
var data map[string]interface{}
if err := json.Unmarshal(msg.Body, &data); err != nil {
// 如果不是 JSON直接使用原始内容
data = map[string]interface{}{
"data": string(msg.Body),
}
}
// 重试逻辑
const maxRetry = 3
for attempt := 0; attempt <= maxRetry; attempt++ {
if attempt > 0 {
g.Log().Infof(ctx, "🔄 消息处理重试 (第%d次)", attempt)
// 指数退避
time.Sleep(time.Duration(attempt) * time.Second)
}
err := handler(ctx, data)
if err == nil {
return nil // 成功
}
g.Log().Warningf(ctx, "⚠️ 消息处理失败 (第%d次): %v", attempt+1, err)
if attempt == maxRetry {
return fmt.Errorf("达到最大重试次数 %d: %w", maxRetry, err)
}
}
return nil
}

73
message/reconnect.go Normal file
View File

@@ -0,0 +1,73 @@
package message
import (
"context"
"fmt"
"strings"
"time"
"github.com/gogf/gf/v2/frame/g"
)
// connectFunc 连接函数类型
type connectFunc func(ctx context.Context) error
// closeFunc 关闭函数类型
type closeFunc func(ctx context.Context) error
// reconnectOption 重连选项
type reconnectOption struct {
maxRetries int // 最大重试次数0 表示无限重试
interval time.Duration // 重试间隔
componentType messageType // 组件类型nats/redis/rabbitmq
componentName string // 组件名称(数据源名称)
}
// defaultReconnectOption 默认重连选项
func defaultReconnectOption(componentType messageType, componentName string) *reconnectOption {
return &reconnectOption{
maxRetries: 0, // 无限重试
interval: 3 * time.Second,
componentType: componentType,
componentName: componentName,
}
}
// commonReconnect 重连函数NATS、Redis、RabbitMQ 共用)
func commonReconnect(ctx context.Context, connectFn connectFunc, closeFn closeFunc, opt *reconnectOption) error {
if opt == nil {
opt = defaultReconnectOption("unknown", "default")
}
for attempt := 0; opt.maxRetries == 0 || attempt < opt.maxRetries; attempt++ {
err := connectFn(ctx)
if err == nil {
g.Log().Infof(ctx, "✅ 连接成功: type=%s, name=%s, attempt=%d",
opt.componentType, opt.componentName, attempt+1)
return nil
}
// 记录失败日志
g.Log().Warningf(ctx, "⚠️ 连接失败: type=%s, name=%s, attempt=%d, err=%v, 重试中...",
opt.componentType, opt.componentName, attempt+1, err)
// 如果错误信息中包含 "does not exist",则认为是连接失败,不再重试
if strings.Contains(err.Error(), "does not exist") {
return err
}
// 等待一段时间再重试
select {
case <-time.After(opt.interval):
case <-ctx.Done():
if err = closeFn(ctx); err != nil {
return err
}
return ctx.Err()
}
}
return fmt.Errorf("连接失败,已达最大重试次数")
}
// connect 连接函数,直接调用 commonReconnect
func commonConnect(ctx context.Context, componentType messageType, name string, connectFn func(ctx context.Context) error, closeFn closeFunc) error {
opt := defaultReconnectOption(componentType, name)
return commonReconnect(ctx, connectFn, closeFn, opt)
}

View File

@@ -3,11 +3,11 @@ package message
import (
"context"
"fmt"
"github.com/gogf/gf/v2/os/glog"
"strings"
"time"
"github.com/gogf/gf/v2/frame/g"
"github.com/gogf/gf/v2/os/glog"
"github.com/gogf/gf/v2/util/gconv"
)
@@ -16,6 +16,9 @@ type RedisPublishMsgConfig struct {
Data any
}
type RedisPublishDelayMsgConfig struct {
}
type RedisSubscribeMsgConfig struct {
QueueName string
ConsumerName string
@@ -28,18 +31,22 @@ func (*RedisPublishMsgConfig) GetPublishMsgType() {
}
func (*RedisPublishDelayMsgConfig) GetPublishDelayMsgType() {}
func (*RedisSubscribeMsgConfig) GetSubscribeMsgType() {
}
func init() {
// 注册 Redis 插件(连接由 RegisterPlugin 异步处理)
registerPlugin(MessageRedis, func() messageUtil {
return &redis{}
})
type redis struct {
name string // 数据源名称
}
type redis struct{}
func init() {
// 注册 Redis 插件(默认数据源)
RegisterPlugin(context.Background(), "default", MessageRedis, func() messageUtil {
return &redis{name: "default"}
})
}
// RedisStreamMessage Redis Stream 消息结构
type redisStreamMessage struct {
@@ -47,41 +54,19 @@ type redisStreamMessage struct {
Values map[string]interface{}
}
// Ping 检测 Redis 连接状态
func (c *redis) ping(ctx context.Context) bool {
conn, err := getDefaultDataSource()
if err != nil {
return false
}
return conn.redisPing(ctx)
// Connect 连接 Redis
func (c *redis) Connect(ctx context.Context) error {
return redisConnect(ctx, c.name)
}
// Reconnect 重连 Redis
func (c *redis) reconnect(ctx context.Context) error {
conn, err := getDefaultDataSource()
if err != nil {
return fmt.Errorf("获取默认连接失败: %w", err)
}
if err := conn.redisReconnect(ctx); err != nil {
return fmt.Errorf("redis重连失败: %w", err)
}
return nil
// Ping 检测 Redis 连接状态
func (c *redis) Ping(ctx context.Context) bool {
return redisPing(ctx, c.name)
}
// Close 关闭 Redis 连接
func (c *redis) close(ctx context.Context) error {
conn, err := getDefaultDataSource()
if err != nil {
return fmt.Errorf("获取默认连接失败: %w", err)
}
if err := conn.redisClose(ctx); err != nil {
return fmt.Errorf("关闭redis连接失败: %w", err)
}
return nil
func (c *redis) Close(ctx context.Context) error {
return redisClose(ctx, c.name)
}
// Publish 发布消息
@@ -96,14 +81,16 @@ func (c *redis) Publish(ctx context.Context, msgConfig messagePublishConfig) err
if g.IsEmpty(cfg.Data) {
return fmt.Errorf("数据不能为空")
}
conn, err := getDefaultDataSource()
if err != nil {
return fmt.Errorf("获取默认连接失败: %w", err)
}
if !conn.getIsConnected() {
if err := conn.redisReconnect(ctx); err != nil {
return fmt.Errorf("redis重连失败: %w", err)
rc := getRedisConn(c.name)
if !c.Ping(ctx) {
if err := commonConnect(ctx, MessageRedis, c.name, func(ctx context.Context) error {
return c.Connect(ctx)
}, func(ctx context.Context) error {
return c.Close(ctx)
}); err != nil {
g.Log().Errorf(ctx, "❌ [%s][%s] 连接失败: %v", MessageRedis, c.name, err)
return err
}
}
@@ -113,7 +100,7 @@ func (c *redis) Publish(ctx context.Context, msgConfig messagePublishConfig) err
for key, val := range values {
args = append(args, key, val)
}
result, err := conn.getClient().Do(ctx, "XADD", args...)
result, err := rc.Do(ctx, "XADD", args...)
if err != nil {
g.Log().Errorf(ctx, "❌ Redis 发布消息失败: key=%s, err=%v", cfg.QueueName, err)
return err
@@ -122,6 +109,12 @@ func (c *redis) Publish(ctx context.Context, msgConfig messagePublishConfig) err
return nil
}
// PublishDelay 发布延迟消息
func (c *redis) PublishDelay(ctx context.Context, _ messagePublishDelayConfig) error {
g.Log().Errorf(ctx, "❌ Redis 不支持延迟消息")
return fmt.Errorf("❌ Redis 不支持延迟消息")
}
// Subscribe 订阅消息
func (c *redis) Subscribe(ctx context.Context, msgConfig messageSubscribeConfig) error {
cfg, ok := msgConfig.(*RedisSubscribeMsgConfig)
@@ -142,162 +135,92 @@ func (c *redis) Subscribe(ctx context.Context, msgConfig messageSubscribeConfig)
// createSubscribe 内部订阅消息
func (c *redis) createSubscribe(ctx context.Context, key, consumerName string, prefetchCount int, autoAck bool, handler func(ctx context.Context, message map[string]interface{}) error) error {
go func() {
defer func() {
if r := recover(); r != nil {
g.Log().Errorf(ctx, "❌ Redis 消费者 panic: %v", r)
}
}()
retryTicker := time.NewTicker(time.Second)
defer retryTicker.Stop()
LOOP:
err := c.consumeMessages(ctx, key, consumerName, prefetchCount, autoAck, handler)
if err != nil {
// 对于超时错误,返回nil继续循环,而不是返回错误
if strings.Contains(err.Error(), "i/o timeout") || strings.Contains(err.Error(), "timeout") ||
strings.Contains(err.Error(), "context deadline exceeded") || strings.Contains(err.Error(), "context canceled") {
// 重试计数器
var consecutiveErrors int
const maxConsecutiveErrors = 3
for {
select {
case <-ctx.Done():
g.Log().Infof(ctx, "🔕 Redis 消费者停止: topic=%s", key)
return
case <-retryTicker.C:
err := c.consumeMessages(ctx, key, consumerName, prefetchCount, autoAck, handler)
if err != nil {
// 对于超时错误,返回nil继续循环,而不是返回错误
if strings.Contains(err.Error(), "i/o timeout") || strings.Contains(err.Error(), "timeout") ||
strings.Contains(err.Error(), "context deadline exceeded") || strings.Contains(err.Error(), "context canceled") {
consecutiveErrors++
if consecutiveErrors > maxConsecutiveErrors {
g.Log().Errorf(ctx, "Max retries exceeded, giving up")
return
}
backoffTime := 5 * time.Second
g.Log().Warningf(ctx, "⚠️ 等待 %v 后重试...", backoffTime)
time.Sleep(backoffTime)
} else {
// 非超时错误(严重错误)
consecutiveErrors = 0 // 重置计数
g.Log().Errorf(ctx, "严重错误,立即重试: %v", err)
// 短暂等待后重试
select {
case <-ctx.Done():
return
case <-time.After(time.Second):
// 继续循环
}
}
} else {
// 成功时重置错误计数器
consecutiveErrors = 0
}
}
time.Sleep(time.Second)
goto LOOP
} else {
g.Log().Errorf(ctx, "❌ 严重错误: %v", err)
}
}()
return nil
}
time.Sleep(time.Second)
goto LOOP
}
// consumeMessages 消费消息
func (c *redis) consumeMessages(ctx context.Context, key, consumerName string, prefetchCount int, autoAck bool, handler func(ctx context.Context, message map[string]interface{}) error) error {
conn, err := getDefaultDataSource()
if err != nil {
return fmt.Errorf("获取默认连接失败: %w", err)
if !c.Ping(ctx) {
if err := commonConnect(ctx, MessageRedis, c.name, func(ctx context.Context) error {
return c.Connect(ctx)
}, func(ctx context.Context) error {
return c.Close(ctx)
}); err != nil {
g.Log().Errorf(ctx, "❌ [%s][%s] 连接失败: %v", MessageRedis, c.name, err)
return err
}
}
if !conn.getIsConnected() {
if err := conn.redisReconnect(ctx); err != nil {
return fmt.Errorf("redis重连失败: %w", err)
}
rc := getRedisConn(c.name)
if rc == nil {
g.Log().Errorf(ctx, "❌ Redis [%s] 连接不存在", c.name)
return fmt.Errorf("Redis 连接不存在")
}
// 检查消费者组是否存在
if err := c.createStreamGroup(ctx, key); err != nil {
return fmt.Errorf("create stream group failed: %w", err)
groupName := "default"
_, err := rc.Do(ctx, "XGROUP", "CREATE", key, groupName, "0", "MKSTREAM")
if err != nil {
errStr := err.Error()
if strings.Contains(errStr, "BUSYGROUP") && strings.Contains(errStr, "already exists") {
glog.Infof(ctx, "✅ Redis [%s] 消费者组已存在: %s", c.name, key)
return nil
}
g.Log().Errorf(ctx, "❌ 创建消费组失败: key=%s, err=%v", key, err)
return err
}
glog.Infof(ctx, "✅ Redis [%s] 消费者组创建成功: %s", c.name, key)
// 使用带重试的命令执行
result, err := conn.getClient().Do(ctx, "XREADGROUP", "GROUP", "default", consumerName, "COUNT", prefetchCount, "BLOCK", 0, "STREAMS", key, ">")
result, err := rc.Do(ctx, "XREADGROUP", "GROUP", groupName, consumerName, "COUNT", prefetchCount, "BLOCK", 0, "STREAMS", key, ">")
if err != nil {
if strings.Contains(err.Error(), "i/o timeout") || strings.Contains(err.Error(), "timeout") ||
strings.Contains(err.Error(), "context deadline exceeded") || strings.Contains(err.Error(), "context canceled") {
}
return err
}
messages, err := c.parseStreamResult(result)
if err != nil {
g.Log().Errorf(ctx, "❌ 解析消息失败: %v", err)
return err
}
for _, msg := range messages {
// 处理消息
if err := handler(ctx, msg.Values); err != nil {
g.Log().Errorf(ctx, "❌ 消息处理失败: messageID=%s, err=%v", msg.ID, err)
continue
}
// ACK 消息
if autoAck {
if err := c.ackMessage(ctx, key, "default", msg.ID); err != nil {
g.Log().Errorf(ctx, "❌ ACK 消息失败: messageID=%s, err=%v", msg.ID, err)
// 如果不是自动ACK,则跳过当前消息
if !autoAck {
continue
}
} else {
g.Log().Infof(ctx, "✅ 消息处理成功: messageID=%s", msg.ID)
}
// ACK 消息
args := make([]interface{}, 0, len(msg.ID)+2)
args = append(args, key, groupName, msg.ID)
_, err = rc.Do(ctx, "XACK", args...)
if err != nil {
g.Log().Errorf(ctx, "❌ ACK 消息失败: messageID=%s, err=%v", msg.ID, err)
} else {
g.Log().Infof(ctx, "✅ ACK 消息成功: messageID=%s", msg.ID)
}
}
return nil
}
// createStreamGroup 内部单个创建消费组
func (c *redis) createStreamGroup(ctx context.Context, key string) error {
conn, err := getDefaultDataSource()
if err != nil {
return fmt.Errorf("获取默认连接失败: %w", err)
}
if !conn.getIsConnected() {
if err := conn.redisReconnect(ctx); err != nil {
return fmt.Errorf("redis重连失败: %w", err)
}
}
groupName := "default"
_, err = conn.getClient().Do(ctx, "XGROUP", "CREATE", key, groupName, "0", "MKSTREAM")
if err != nil {
errStr := err.Error()
if strings.Contains(errStr, "BUSYGROUP") && strings.Contains(errStr, "already exists") {
glog.Infof(ctx, "✅ Redis 消费者组已存在: %s", groupName)
return nil
}
return fmt.Errorf("初始化消费者组失败: %w", err)
}
glog.Infof(ctx, "✅ Redis 消费者组创建成功: %s", groupName)
return nil
}
// ackMessage ACK 消息
func (c *redis) ackMessage(ctx context.Context, streamKey, groupName string, messageIDs ...string) error {
conn, err := getDefaultDataSource()
if err != nil {
return fmt.Errorf("获取默认连接失败: %w", err)
}
if !conn.getIsConnected() {
if err := conn.redisReconnect(ctx); err != nil {
return fmt.Errorf("redis重连失败: %w", err)
}
}
args := make([]interface{}, 0, len(messageIDs)+2)
args = append(args, streamKey, groupName)
for _, id := range messageIDs {
args = append(args, id)
}
_, err = conn.getClient().Do(ctx, "XACK", args...)
return err
}
// parseStreamResult 解析 Stream 结果
func (c *redis) parseStreamResult(result interface{}) ([]redisStreamMessage, error) {
if result == nil {

125
message/store.go Normal file
View File

@@ -0,0 +1,125 @@
// Copyright 2019-2025 The NATS Authors
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package message
import "fmt"
type RetentionPolicy int
const (
// LimitsPolicy (default) means that messages are retained until any given limit is reached.
// This could be one of MaxMsgs, MaxBytes, or MaxAge.
LimitsPolicy RetentionPolicy = iota
// InterestPolicy specifies that when all known consumers have acknowledged a message it can be removed.
InterestPolicy
// WorkQueuePolicy specifies that when the first worker or subscriber acknowledges the message it can be removed.
WorkQueuePolicy
)
// MarshalJSON 将 RetentionPolicy 序列化为字符串
func (rp RetentionPolicy) MarshalJSON() ([]byte, error) {
switch rp {
case LimitsPolicy:
return []byte(`"limits"`), nil
case InterestPolicy:
return []byte(`"interest"`), nil
case WorkQueuePolicy:
return []byte(`"workqueue"`), nil
default:
return nil, fmt.Errorf("can not marshal %v", rp)
}
}
// UnmarshalJSON 将字符串反序列化为 RetentionPolicy
func (rp *RetentionPolicy) UnmarshalJSON(data []byte) error {
switch string(data) {
case `"limits"`:
*rp = LimitsPolicy
case `"interest"`:
*rp = InterestPolicy
case `"workqueue"`:
*rp = WorkQueuePolicy
default:
return fmt.Errorf("unknown retention policy: %s", string(data))
}
return nil
}
type DiscardPolicy int
const (
// DiscardOld will remove older messages to return to the limits.
DiscardOld = iota
// DiscardNew will error on a StoreMsg call
DiscardNew
)
// MarshalJSON 将 DiscardPolicy 序列化为字符串
func (dp DiscardPolicy) MarshalJSON() ([]byte, error) {
switch dp {
case DiscardOld:
return []byte(`"old"`), nil
case DiscardNew:
return []byte(`"new"`), nil
default:
return nil, fmt.Errorf("can not marshal %v", dp)
}
}
// UnmarshalJSON 将字符串反序列化为 DiscardPolicy
func (dp *DiscardPolicy) UnmarshalJSON(data []byte) error {
switch string(data) {
case `"old"`:
*dp = DiscardOld
case `"new"`:
*dp = DiscardNew
default:
return fmt.Errorf("unknown discard policy: %s", string(data))
}
return nil
}
type StorageType int
const (
// FileStorage specifies on disk, designated by the JetStream config StoreDir.
FileStorage = StorageType(22)
// MemoryStorage specifies in memory only.
MemoryStorage = StorageType(33)
)
// MarshalJSON 将 StorageType 序列化为字符串
func (st StorageType) MarshalJSON() ([]byte, error) {
switch st {
case MemoryStorage:
return []byte(`"memory"`), nil
case FileStorage:
return []byte(`"file"`), nil
default:
return nil, fmt.Errorf("can not marshal %v", st)
}
}
// UnmarshalJSON 将字符串反序列化为 StorageType
func (st *StorageType) UnmarshalJSON(data []byte) error {
switch string(data) {
case `"memory"`:
*st = MemoryStorage
case `"file"`:
*st = FileStorage
default:
return fmt.Errorf("unknown storage type: %s", string(data))
}
return nil
}

212
message/stream.go Normal file
View File

@@ -0,0 +1,212 @@
// Copyright 2019-2026 The NATS Authors
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package message
import (
"fmt"
"time"
)
// StreamConfig will determine the name, subjects and retention policy
// for a given stream. If subjects is empty the name will be used.
type StreamConfig struct {
Name string `json:"name"`
Description string `json:"description,omitempty"`
Subjects []string `json:"subjects,omitempty"`
Retention RetentionPolicy `json:"retention"`
MaxConsumers int `json:"max_consumers"`
MaxMsgs int64 `json:"max_msgs"`
MaxBytes int64 `json:"max_bytes"`
MaxAge time.Duration `json:"max_age"`
MaxMsgsPer int64 `json:"max_msgs_per_subject"`
MaxMsgSize int32 `json:"max_msg_size,omitempty"`
Discard DiscardPolicy `json:"discard"`
Storage StorageType `json:"storage"`
Replicas int `json:"num_replicas"`
NoAck bool `json:"no_ack,omitempty"`
Duplicates time.Duration `json:"duplicate_window,omitempty"`
Placement *Placement `json:"placement,omitempty"`
Mirror *StreamSource `json:"mirror,omitempty"`
Sources []*StreamSource `json:"sources,omitempty"`
Compression StoreCompression `json:"compression"`
FirstSeq uint64 `json:"first_seq,omitempty"`
// Allow applying a subject transform to incoming messages before doing anything else
SubjectTransform *SubjectTransformConfig `json:"subject_transform,omitempty"`
// Allow republish of the message after being sequenced and stored.
RePublish *RePublish `json:"republish,omitempty"`
// Allow higher performance, direct access to get individual messages. E.g. KeyValue
AllowDirect bool `json:"allow_direct"`
// Allow higher performance and unified direct access for mirrors as well.
MirrorDirect bool `json:"mirror_direct"`
// Allow KV like semantics to also discard new on a per subject basis
DiscardNewPer bool `json:"discard_new_per_subject,omitempty"`
// Optional qualifiers. These can not be modified after set to true.
// Sealed will seal a stream so no messages can get out or in.
Sealed bool `json:"sealed"`
// DenyDelete will restrict the ability to delete messages.
DenyDelete bool `json:"deny_delete"`
// DenyPurge will restrict the ability to purge messages.
DenyPurge bool `json:"deny_purge"`
// AllowRollup allows messages to be placed into the system and purge
// all older messages using a special msg header.
AllowRollup bool `json:"allow_rollup_hdrs"`
// The following defaults will apply to consumers when created against
// this stream, unless overridden manually.
// TODO(nat): Can/should we name these better?
ConsumerLimits StreamConsumerLimits `json:"consumer_limits"`
// AllowMsgTTL allows header initiated per-message TTLs. If disabled,
// then the `NATS-TTL` header will be ignored.
AllowMsgTTL bool `json:"allow_msg_ttl"`
// SubjectDeleteMarkerTTL sets the TTL of delete marker messages left behind by
// subject delete markers.
SubjectDeleteMarkerTTL time.Duration `json:"subject_delete_marker_ttl,omitempty"`
// AllowMsgCounter allows a stream to use (only) counter CRDTs.
AllowMsgCounter bool `json:"allow_msg_counter,omitempty"`
// AllowAtomicPublish allows atomic batch publishing into the stream.
AllowAtomicPublish bool `json:"allow_atomic,omitempty"`
// AllowMsgSchedules allows the scheduling of messages.
AllowMsgSchedules bool `json:"allow_msg_schedules,omitempty"`
// PersistMode allows to opt-in to different persistence mode settings.
PersistMode PersistModeType `json:"persist_mode,omitempty"`
// Metadata is additional metadata for the Stream.
Metadata map[string]string `json:"metadata,omitempty"`
}
// Used to guide placement of streams and meta controllers in clustered JetStream.
type Placement struct {
Cluster string `json:"cluster,omitempty"`
Tags []string `json:"tags,omitempty"`
Preferred string `json:"preferred,omitempty"`
}
// StreamSource dictates how streams can source from other streams.
type StreamSource struct {
Name string `json:"name"`
OptStartSeq uint64 `json:"opt_start_seq,omitempty"`
OptStartTime *time.Time `json:"opt_start_time,omitempty"`
FilterSubject string `json:"filter_subject,omitempty"`
SubjectTransforms []SubjectTransformConfig `json:"subject_transforms,omitempty"`
External *ExternalStream `json:"external,omitempty"`
// Internal
iname string // For indexing when stream names are the same for multiple sources.
}
// SubjectTransformConfig is for applying a subject transform (to matching messages) before doing anything else when a new message is received
type SubjectTransformConfig struct {
Source string `json:"src"`
Destination string `json:"dest"`
}
// ExternalStream allows you to qualify access to a stream source in another account or domain.
type ExternalStream struct {
ApiPrefix string `json:"api"`
DeliverPrefix string `json:"deliver"`
}
// RePublish is for republishing messages once committed to a stream.
type RePublish struct {
Source string `json:"src,omitempty"`
Destination string `json:"dest"`
HeadersOnly bool `json:"headers_only,omitempty"`
}
type StreamConsumerLimits struct {
InactiveThreshold time.Duration `json:"inactive_threshold,omitempty"`
MaxAckPending int `json:"max_ack_pending,omitempty"`
}
// PersistModeType determines what persistence mode the stream uses.
type PersistModeType int
const (
// DefaultPersistMode specifies the default persist mode. Writes to the stream will immediately be flushed.
// The publish acknowledgement will be sent after the persisting completes.
DefaultPersistMode = PersistModeType(iota)
// AsyncPersistMode specifies writes to the stream will be flushed asynchronously.
// The publish acknowledgement may be sent before the persisting completes.
// This means writes could be lost if they weren't flushed prior to a hard kill of the server.
AsyncPersistMode
)
// MarshalJSON 将 PersistModeType 序列化为字符串
func (pm PersistModeType) MarshalJSON() ([]byte, error) {
switch pm {
case DefaultPersistMode:
return []byte(`"default"`), nil
case AsyncPersistMode:
return []byte(`"async"`), nil
default:
return nil, fmt.Errorf("can not marshal %v", pm)
}
}
// UnmarshalJSON 将字符串反序列化为 PersistModeType
func (pm *PersistModeType) UnmarshalJSON(data []byte) error {
switch string(data) {
case `"default"`:
*pm = DefaultPersistMode
case `"async"`:
*pm = AsyncPersistMode
default:
return fmt.Errorf("unknown persist mode: %s", string(data))
}
return nil
}
type StoreCompression uint8
const (
NoCompression StoreCompression = iota
S2Compression
)
// MarshalJSON 将 StoreCompression 序列化为字符串
func (sc StoreCompression) MarshalJSON() ([]byte, error) {
switch sc {
case NoCompression:
return []byte(`"none"`), nil
case S2Compression:
return []byte(`"s2"`), nil
default:
return nil, fmt.Errorf("can not marshal %v", sc)
}
}
// UnmarshalJSON 将字符串反序列化为 StoreCompression
func (sc *StoreCompression) UnmarshalJSON(data []byte) error {
switch string(data) {
case `"none"`:
*sc = NoCompression
case `"s2"`:
*sc = S2Compression
default:
return fmt.Errorf("unknown store compression: %s", string(data))
}
return nil
}