diff --git a/rabbitmq/consumer.go b/rabbitmq/consumer.go index 305b3f7..160d4bd 100644 --- a/rabbitmq/consumer.go +++ b/rabbitmq/consumer.go @@ -21,6 +21,7 @@ type Consumer struct { handler MessageHandler workerCount int // worker 数量 cancel context.CancelFunc // 用于停止 worker + channel *amqp.Channel // 独立Channel(避免并发冲突) } // ConsumerOption 消费者配置选项 @@ -78,10 +79,17 @@ func (c *Consumer) Start(ctx context.Context) (err error) { // 创建可取消的 context workerCtx, cancel := context.WithCancel(ctx) c.cancel = cancel - ch, err := GetChannel() + + // 为每个消费者创建独立Channel(避免并发冲突) + conn, err := GetConnection() if err != nil { - return err + return gerror.Wrap(err, "获取RabbitMQ连接失败") } + c.channel, err = conn.Channel() + if err != nil { + return gerror.Wrap(err, "创建独立Channel失败") + } + ch := c.channel // 声明队列(如果不存在则创建) // 注意:Queue到Exchange的绑定应由message服务在发送响应时动态创建,或通过运维工具提前配置 @@ -196,8 +204,13 @@ func StartTypedConsumer[T any]( // Stop 停止消费者 func (c *Consumer) Stop(ctx context.Context) { if c.cancel != nil { - g.Log().Infof(ctx, "正在停止消费者: queue=%s", c.queue) c.cancel() - c.cancel = nil } + // 关闭独立Channel + if c.channel != nil && !c.channel.IsClosed() { + c.channel.Close() + g.Log().Debugf(ctx, "消费者Channel已关闭: queue=%s", c.queue) + } + g.Log().Infof(ctx, "正在停止消费者: queue=%s", c.queue) + c.cancel = nil }