package message

import (
	"context"
	"fmt"
	"strings"
	"time"

	"github.com/gogf/gf/v2/frame/g"
)

// connectFunc is the signature of a function that establishes a connection.
type connectFunc func(ctx context.Context) error

// closeFunc is the signature of a function that releases a connection.
type closeFunc func(ctx context.Context) error

// reconnectOption configures the retry loop run by commonReconnect.
type reconnectOption struct {
	maxRetries    int           // maximum number of connect attempts; 0 means retry without limit
	interval      time.Duration // delay between two consecutive attempts
	componentType messageType   // component kind (nats/redis/rabbitmq), used in log output
	componentName string        // component name (data source name), used in log output
}

// defaultReconnectOption returns the default retry settings for the given
// component: unlimited attempts with a 3 second pause between them.
func defaultReconnectOption(componentType messageType, componentName string) *reconnectOption {
	return &reconnectOption{
		maxRetries:    0, // retry without limit
		interval:      3 * time.Second,
		componentType: componentType,
		componentName: componentName,
	}
}

// commonReconnect is the retry loop shared by the NATS, Redis and RabbitMQ
// components. It calls connectFn until it succeeds, the attempt budget in opt
// is exhausted, a non-retryable error is seen, or ctx is cancelled.
//
// Parameters:
//   - ctx: cancels the wait between attempts; it is also passed to connectFn
//     and closeFn.
//   - connectFn: performs one connection attempt; must not be nil.
//   - closeFn: invoked once when ctx is cancelled while waiting; may be nil.
//   - opt: retry settings; nil selects defaultReconnectOption("unknown", "default").
//
// Returns nil on success. Otherwise it returns:
//   - the connect error itself when its message contains "does not exist";
//   - the closeFn error, or ctx.Err() if closeFn succeeded or is nil, when ctx
//     is cancelled during the wait;
//   - an error wrapping the last connect error once maxRetries attempts failed.
func commonReconnect(ctx context.Context, connectFn connectFunc, closeFn closeFunc, opt *reconnectOption) error {
	if connectFn == nil {
		return fmt.Errorf("connectFn must not be nil")
	}
	if opt == nil {
		opt = defaultReconnectOption("unknown", "default")
	}

	var lastErr error
	for attempt := 0; opt.maxRetries == 0 || attempt < opt.maxRetries; attempt++ {
		err := connectFn(ctx)
		if err == nil {
			g.Log().Infof(ctx, "✅ 连接成功: type=%s, name=%s, attempt=%d", opt.componentType, opt.componentName, attempt+1)
			return nil
		}
		lastErr = err

		// Record the failed attempt.
		g.Log().Warningf(ctx, "⚠️ 连接失败: type=%s, name=%s, attempt=%d, err=%v, 重试中...", opt.componentType, opt.componentName, attempt+1, err)

		// An error mentioning "does not exist" is treated as permanent:
		// retrying cannot help, so hand it straight back to the caller.
		if strings.Contains(err.Error(), "does not exist") {
			return err
		}

		// The attempt budget is spent; sleeping before reporting the failure
		// would only delay the caller.
		if opt.maxRetries > 0 && attempt+1 >= opt.maxRetries {
			break
		}

		// Wait before the next attempt, but wake up early on cancellation.
		timer := time.NewTimer(opt.interval)
		select {
		case <-timer.C:
		case <-ctx.Done():
			timer.Stop()
			if closeFn != nil {
				// NOTE(review): closeFn receives the already-cancelled ctx —
				// confirm implementations tolerate that.
				if closeErr := closeFn(ctx); closeErr != nil {
					return closeErr
				}
			}
			return ctx.Err()
		}
	}

	if lastErr != nil {
		return fmt.Errorf("连接失败,已达最大重试次数: %w", lastErr)
	}
	// Only reachable when maxRetries is negative, i.e. no attempt was made.
	return fmt.Errorf("连接失败,已达最大重试次数")
}

// commonConnect connects the named component by running commonReconnect with
// the default retry settings for componentType and name.
func commonConnect(ctx context.Context, componentType messageType, name string, connectFn func(ctx context.Context) error, closeFn closeFunc) error {
	opt := defaultReconnectOption(componentType, name)
	return commonReconnect(ctx, connectFn, closeFn, opt)
}