Files
common/message/msg_plugin_manager.go

115 lines
3.2 KiB
Go
Raw Normal View History

package message
import (
"context"
"fmt"
"time"
"github.com/gogf/gf/v2/frame/g"
"sync"
)
// MessageType 消息队列类型
type messageType string
const (
// MessageRedis Redis 消息队列
MessageRedis messageType = "redis"
// MessageRabbitMQ RabbitMQ 消息队列
MessageRabbitMQ messageType = "rabbitmq"
// MessageNATS NATS 消息队列
MessageNATS messageType = "nats"
)
// configFactory 消息队列配置工厂函数类型
type configFactory func() messageUtil
// PluginManager 消息队列插件管理器
type pluginManager struct {
mu sync.RWMutex
instances map[messageType]messageUtil // 已连接的插件实例
}
var (
defaultPluginManager = newPluginManager()
)
// newPluginManager 创建插件管理器
func newPluginManager() *pluginManager {
return &pluginManager{
instances: make(map[messageType]messageUtil),
}
}
// register 注册插件(内部方法)
func (m *pluginManager) register(msgType messageType, instance messageUtil) error {
m.mu.Lock()
defer m.mu.Unlock()
m.instances[msgType] = instance
return nil
}
// RegisterPlugin 注册消息队列插件
// 所有插件必须通过此方法注册,自动进行连接检测
// 只有连接成功的插件才会被注册,连接失败的插件不会被注册
// 异步无限重连,只有连接成功了才注册
// name: 数据源名称,用于标识不同的连接实例
func RegisterPlugin(ctx context.Context, name string, msgType messageType, factory configFactory) error {
if factory == nil {
g.Log().Errorf(ctx, "❌ factory cannot be nil")
return fmt.Errorf("factory cannot be nil")
}
// 开启异步连接,无限重试直到成功
go func() {
// 创建实例
instance := factory()
// 创建通知 channel
pluginKey := fmt.Sprintf("%s-%s", msgType, name)
if !instance.Ping(ctx) {
// 使用统一的重连函数
if err := commonConnect(ctx, msgType, name, func(ctx context.Context) error {
return instance.Connect(ctx)
}, func(ctx context.Context) error {
return instance.Close(ctx)
}); err != nil {
g.Log().Errorf(ctx, "❌ [%s][%s] 连接失败: %v", msgType, name, err)
return
}
}
// 连接成功,注册插件
defaultPluginManager.mu.Lock()
defaultPluginManager.instances[messageType(pluginKey)] = instance
defaultPluginManager.mu.Unlock()
g.Log().Infof(ctx, "✅ [%s][%s] 插件注册成功", msgType, name)
}()
return nil
}
// GetMsgPlugin 获取消息队列插件(默认数据源),如果未注册则等待
func GetMsgPlugin(ctx context.Context, msgType messageType) (messageUtil, error) {
return GetMsgPluginWithName(ctx, msgType, "default")
}
// GetMsgPluginWithName 获取指定数据源的消息队列插件,如果未注册则等待直到超时
func GetMsgPluginWithName(ctx context.Context, msgType messageType, name string) (messageUtil, error) {
pluginKey := fmt.Sprintf("%s-%s", msgType, name)
for {
defaultPluginManager.mu.RLock()
instance, ok := defaultPluginManager.instances[messageType(pluginKey)]
defaultPluginManager.mu.RUnlock()
if ok {
return instance, nil
}
// 未注册,等待一段时间后重试
select {
case <-ctx.Done():
return nil, fmt.Errorf("wait for plugin ready canceled: %s with datasource: %s", msgType, name)
default:
time.Sleep(3 * time.Second)
}
}
}