package message import ( "context" "fmt" "time" "github.com/gogf/gf/v2/frame/g" "sync" ) // MessageType 消息队列类型 type messageType string const ( // MessageRedis Redis 消息队列 MessageRedis messageType = "redis" // MessageRabbitMQ RabbitMQ 消息队列 MessageRabbitMQ messageType = "rabbitmq" // MessageNATS NATS 消息队列 MessageNATS messageType = "nats" ) // configFactory 消息队列配置工厂函数类型 type configFactory func() messageUtil // PluginManager 消息队列插件管理器 type pluginManager struct { mu sync.RWMutex instances map[messageType]messageUtil // 已连接的插件实例 } var ( defaultPluginManager = newPluginManager() ) // newPluginManager 创建插件管理器 func newPluginManager() *pluginManager { return &pluginManager{ instances: make(map[messageType]messageUtil), } } // register 注册插件(内部方法) func (m *pluginManager) register(msgType messageType, instance messageUtil) error { m.mu.Lock() defer m.mu.Unlock() m.instances[msgType] = instance return nil } // RegisterPlugin 注册消息队列插件 // 所有插件必须通过此方法注册,自动进行连接检测 // 只有连接成功的插件才会被注册,连接失败的插件不会被注册 // 异步无限重连,只有连接成功了才注册 // name: 数据源名称,用于标识不同的连接实例 func RegisterPlugin(ctx context.Context, name string, msgType messageType, factory configFactory) error { if factory == nil { g.Log().Errorf(ctx, "❌ factory cannot be nil") return fmt.Errorf("factory cannot be nil") } // 开启异步连接,无限重试直到成功 go func() { // 创建实例 instance := factory() // 创建通知 channel pluginKey := fmt.Sprintf("%s-%s", msgType, name) if !instance.Ping(ctx) { // 使用统一的重连函数 if err := commonConnect(ctx, msgType, name, func(ctx context.Context) error { return instance.Connect(ctx) }, func(ctx context.Context) error { return instance.Close(ctx) }); err != nil { g.Log().Errorf(ctx, "❌ [%s][%s] 连接失败: %v", msgType, name, err) return } } // 连接成功,注册插件 defaultPluginManager.mu.Lock() defaultPluginManager.instances[messageType(pluginKey)] = instance defaultPluginManager.mu.Unlock() g.Log().Infof(ctx, "✅ [%s][%s] 插件注册成功", msgType, name) }() return nil } // GetMsgPlugin 获取消息队列插件(默认数据源),如果未注册则等待 func GetMsgPlugin(ctx context.Context, msgType messageType) (messageUtil, error) { return GetMsgPluginWithName(ctx, msgType, "default") } // GetMsgPluginWithName 获取指定数据源的消息队列插件,如果未注册则等待直到超时 func GetMsgPluginWithName(ctx context.Context, msgType messageType, name string) (messageUtil, error) { pluginKey := fmt.Sprintf("%s-%s", msgType, name) for { defaultPluginManager.mu.RLock() instance, ok := defaultPluginManager.instances[messageType(pluginKey)] defaultPluginManager.mu.RUnlock() if ok { return instance, nil } // 未注册,等待一段时间后重试 select { case <-ctx.Done(): return nil, fmt.Errorf("wait for plugin ready canceled: %s with datasource: %s", msgType, name) default: time.Sleep(3 * time.Second) } } }