From 0738f6f9570613037c38ca2c441e6e6480837064 Mon Sep 17 00:00:00 2001 From: Cold <16419454+cold502@user.noreply.gitee.com> Date: Wed, 3 Dec 2025 15:33:32 +0800 Subject: [PATCH] goroutine --- mongo/mongo.go | 14 +++++++++ rabbitmq/client.go | 68 +++++++++++++++++++++++++++++++++++--------- rabbitmq/consumer.go | 66 ++++++++++++++++++++++++++++-------------- 3 files changed, 114 insertions(+), 34 deletions(-) diff --git a/mongo/mongo.go b/mongo/mongo.go index 92f0c2a..b7928ff 100644 --- a/mongo/mongo.go +++ b/mongo/mongo.go @@ -56,6 +56,20 @@ func Find(ctx context.Context, filter bson.M, result interface{}, collection str return } +// FindWithoutTenant 查询多条记录(不过滤租户,用于导出等场景) +func FindWithoutTenant(ctx context.Context, filter bson.M, result interface{}, collection string, opts ...options.Lister[options.FindOptions]) (err error) { + if err = utils.ValidStructPtr(result); err != nil { + return + } + // 不添加 tenantId 过滤条件 + cur, err := db.Collection(collection).Find(ctx, filter, opts...) + if err != nil { + return + } + err = cur.All(ctx, result) + return +} + // FindOne 查询1条记录 func FindOne(ctx context.Context, filter bson.M, result interface{}, collection string, opts ...options.Lister[options.FindOneOptions]) (err error) { if len(filter) == 0 { diff --git a/rabbitmq/client.go b/rabbitmq/client.go index 7738082..4c0a05f 100644 --- a/rabbitmq/client.go +++ b/rabbitmq/client.go @@ -11,10 +11,12 @@ import ( ) var ( - conn *amqp.Connection - channel *amqp.Channel - once sync.Once - mu sync.RWMutex + conn *amqp.Connection + channel *amqp.Channel + once sync.Once + mu sync.RWMutex + closeWatcher chan struct{} // 用于停止监听 goroutine + watcherStarted bool // 防止重复启动监听 ) // Config RabbitMQ 配置 @@ -53,8 +55,14 @@ func Init(ctx context.Context, cfg *Config) error { return } - // 监听连接关闭 - go handleConnectionClose(ctx) + // 初始化关闭监听器 + closeWatcher = make(chan struct{}) + + // 监听连接关闭(只启动一次) + if !watcherStarted { + go handleConnectionClose(ctx) + watcherStarted = true + } g.Log().Info(ctx, "RabbitMQ 连接成功") }) @@ -101,13 +109,38 @@ func GetConnection() (*amqp.Connection, error) { // handleConnectionClose 监听连接关闭并重连 func handleConnectionClose(ctx context.Context) { - closeErr := make(chan *amqp.Error) - conn.NotifyClose(closeErr) + for { + // 检查是否需要停止监听 + select { + case <-closeWatcher: + g.Log().Info(ctx, "停止监听 RabbitMQ 连接状态") + return + default: + } - err := <-closeErr - if err != nil { - g.Log().Errorf(ctx, "RabbitMQ 连接关闭: %v,尝试重连...", err) - reconnect(ctx) + mu.RLock() + currentConn := conn + mu.RUnlock() + + if currentConn == nil { + return + } + + // 创建关闭通知 channel + closeErr := make(chan *amqp.Error, 1) + currentConn.NotifyClose(closeErr) + + // 等待连接关闭或停止信号 + select { + case err := <-closeErr: + if err != nil { + g.Log().Errorf(ctx, "RabbitMQ 连接关闭: %v,尝试重连...", err) + reconnect(ctx) + } + case <-closeWatcher: + g.Log().Info(ctx, "停止监听 RabbitMQ 连接状态") + return + } } } @@ -149,7 +182,7 @@ func reconnect(ctx context.Context) { } g.Log().Info(ctx, "RabbitMQ 重连成功") - go handleConnectionClose(ctx) + // 不再重复启动监听 goroutine return } @@ -161,10 +194,17 @@ func Close(ctx context.Context) error { mu.Lock() defer mu.Unlock() + // 停止监听 goroutine + if closeWatcher != nil { + close(closeWatcher) + closeWatcher = nil + } + if channel != nil { if err := channel.Close(); err != nil { g.Log().Errorf(ctx, "关闭 RabbitMQ Channel 失败: %v", err) } + channel = nil } if conn != nil { @@ -172,8 +212,10 @@ func Close(ctx context.Context) error { g.Log().Errorf(ctx, "关闭 RabbitMQ 连接失败: %v", err) return err } + conn = nil } + watcherStarted = false g.Log().Info(ctx, "RabbitMQ 连接已关闭") return nil } diff --git a/rabbitmq/consumer.go b/rabbitmq/consumer.go index f9fe3ae..1b0746d 100644 --- a/rabbitmq/consumer.go +++ b/rabbitmq/consumer.go @@ -19,7 +19,8 @@ type Consumer struct { prefetchCount int // QoS: 预取数量(并发控制) autoAck bool // 是否自动确认 handler MessageHandler - workerCount int // worker 数量 + workerCount int // worker 数量 + cancel context.CancelFunc // 用于停止 worker } // ConsumerOption 消费者配置选项 @@ -74,6 +75,9 @@ func NewConsumer(queue string, handler MessageHandler, opts ...ConsumerOption) * // Start 启动消费者 func (c *Consumer) Start(ctx context.Context) error { + // 创建可取消的 context + workerCtx, cancel := context.WithCancel(ctx) + c.cancel = cancel ch, err := GetChannel() if err != nil { return err @@ -108,7 +112,7 @@ func (c *Consumer) Start(ctx context.Context) error { // 启动多个 worker for i := 0; i < c.workerCount; i++ { - go c.worker(ctx, i, msgs) + go c.worker(workerCtx, i, msgs) } return nil @@ -118,29 +122,40 @@ func (c *Consumer) Start(ctx context.Context) error { func (c *Consumer) worker(ctx context.Context, workerID int, msgs <-chan amqp.Delivery) { g.Log().Debugf(ctx, "Worker %d 已启动", workerID) - for msg := range msgs { - // 处理消息 - err := c.handler(ctx, msg.Body) - - if err != nil { - g.Log().Errorf(ctx, "Worker %d 处理消息失败: %v", workerID, err) - - // 如果不是自动确认,需要手动 Nack - if !c.autoAck { - // requeue=false: 不重新入队,进入死信队列 - msg.Nack(false, false) - } - } else { - // 处理成功,手动确认 - if !c.autoAck { - msg.Ack(false) + for { + select { + case <-ctx.Done(): + // Context 取消,退出 + g.Log().Infof(ctx, "Worker %d 收到停止信号,正在退出", workerID) + return + case msg, ok := <-msgs: + if !ok { + // Channel 关闭,退出 + g.Log().Infof(ctx, "Worker %d 消息通道已关闭,退出", workerID) + return } - g.Log().Debugf(ctx, "Worker %d 处理消息成功", workerID) + // 处理消息 + err := c.handler(ctx, msg.Body) + + if err != nil { + g.Log().Errorf(ctx, "Worker %d 处理消息失败: %v", workerID, err) + + // 如果不是自动确认,需要手动 Nack + if !c.autoAck { + // requeue=false: 不重新入队,进入死信队列 + msg.Nack(false, false) + } + } else { + // 处理成功,手动确认 + if !c.autoAck { + msg.Ack(false) + } + + g.Log().Debugf(ctx, "Worker %d 处理消息成功", workerID) + } } } - - g.Log().Debugf(ctx, "Worker %d 已停止", workerID) } // StartTypedConsumer 启动类型化消费者(自动反序列化) @@ -163,3 +178,12 @@ func StartTypedConsumer[T any]( consumer := NewConsumer(queue, wrappedHandler, opts...) return consumer.Start(ctx) } + +// Stop 停止消费者 +func (c *Consumer) Stop(ctx context.Context) { + if c.cancel != nil { + g.Log().Infof(ctx, "正在停止消费者: queue=%s", c.queue) + c.cancel() + c.cancel = nil + } +}