goroutine

This commit is contained in:
Cold
2025-12-03 15:33:32 +08:00
committed by 张斌
parent 90780f6362
commit 0738f6f957
3 changed files with 114 additions and 34 deletions

View File

@@ -56,6 +56,20 @@ func Find(ctx context.Context, filter bson.M, result interface{}, collection str
return return
} }
// FindWithoutTenant 查询多条记录(不过滤租户,用于导出等场景)
func FindWithoutTenant(ctx context.Context, filter bson.M, result interface{}, collection string, opts ...options.Lister[options.FindOptions]) (err error) {
if err = utils.ValidStructPtr(result); err != nil {
return
}
// 不添加 tenantId 过滤条件
cur, err := db.Collection(collection).Find(ctx, filter, opts...)
if err != nil {
return
}
err = cur.All(ctx, result)
return
}
// FindOne 查询1条记录 // FindOne 查询1条记录
func FindOne(ctx context.Context, filter bson.M, result interface{}, collection string, opts ...options.Lister[options.FindOneOptions]) (err error) { func FindOne(ctx context.Context, filter bson.M, result interface{}, collection string, opts ...options.Lister[options.FindOneOptions]) (err error) {
if len(filter) == 0 { if len(filter) == 0 {

View File

@@ -11,10 +11,12 @@ import (
) )
var ( var (
conn *amqp.Connection conn *amqp.Connection
channel *amqp.Channel channel *amqp.Channel
once sync.Once once sync.Once
mu sync.RWMutex mu sync.RWMutex
closeWatcher chan struct{} // 用于停止监听 goroutine
watcherStarted bool // 防止重复启动监听
) )
// Config RabbitMQ 配置 // Config RabbitMQ 配置
@@ -53,8 +55,14 @@ func Init(ctx context.Context, cfg *Config) error {
return return
} }
// 监听连接关闭 // 初始化关闭监听器
go handleConnectionClose(ctx) closeWatcher = make(chan struct{})
// 监听连接关闭(只启动一次)
if !watcherStarted {
go handleConnectionClose(ctx)
watcherStarted = true
}
g.Log().Info(ctx, "RabbitMQ 连接成功") g.Log().Info(ctx, "RabbitMQ 连接成功")
}) })
@@ -101,13 +109,38 @@ func GetConnection() (*amqp.Connection, error) {
// handleConnectionClose 监听连接关闭并重连 // handleConnectionClose 监听连接关闭并重连
func handleConnectionClose(ctx context.Context) { func handleConnectionClose(ctx context.Context) {
closeErr := make(chan *amqp.Error) for {
conn.NotifyClose(closeErr) // 检查是否需要停止监听
select {
case <-closeWatcher:
g.Log().Info(ctx, "停止监听 RabbitMQ 连接状态")
return
default:
}
err := <-closeErr mu.RLock()
if err != nil { currentConn := conn
g.Log().Errorf(ctx, "RabbitMQ 连接关闭: %v尝试重连...", err) mu.RUnlock()
reconnect(ctx)
if currentConn == nil {
return
}
// 创建关闭通知 channel
closeErr := make(chan *amqp.Error, 1)
currentConn.NotifyClose(closeErr)
// 等待连接关闭或停止信号
select {
case err := <-closeErr:
if err != nil {
g.Log().Errorf(ctx, "RabbitMQ 连接关闭: %v尝试重连...", err)
reconnect(ctx)
}
case <-closeWatcher:
g.Log().Info(ctx, "停止监听 RabbitMQ 连接状态")
return
}
} }
} }
@@ -149,7 +182,7 @@ func reconnect(ctx context.Context) {
} }
g.Log().Info(ctx, "RabbitMQ 重连成功") g.Log().Info(ctx, "RabbitMQ 重连成功")
go handleConnectionClose(ctx) // 不再重复启动监听 goroutine
return return
} }
@@ -161,10 +194,17 @@ func Close(ctx context.Context) error {
mu.Lock() mu.Lock()
defer mu.Unlock() defer mu.Unlock()
// 停止监听 goroutine
if closeWatcher != nil {
close(closeWatcher)
closeWatcher = nil
}
if channel != nil { if channel != nil {
if err := channel.Close(); err != nil { if err := channel.Close(); err != nil {
g.Log().Errorf(ctx, "关闭 RabbitMQ Channel 失败: %v", err) g.Log().Errorf(ctx, "关闭 RabbitMQ Channel 失败: %v", err)
} }
channel = nil
} }
if conn != nil { if conn != nil {
@@ -172,8 +212,10 @@ func Close(ctx context.Context) error {
g.Log().Errorf(ctx, "关闭 RabbitMQ 连接失败: %v", err) g.Log().Errorf(ctx, "关闭 RabbitMQ 连接失败: %v", err)
return err return err
} }
conn = nil
} }
watcherStarted = false
g.Log().Info(ctx, "RabbitMQ 连接已关闭") g.Log().Info(ctx, "RabbitMQ 连接已关闭")
return nil return nil
} }

View File

@@ -19,7 +19,8 @@ type Consumer struct {
prefetchCount int // QoS: 预取数量(并发控制) prefetchCount int // QoS: 预取数量(并发控制)
autoAck bool // 是否自动确认 autoAck bool // 是否自动确认
handler MessageHandler handler MessageHandler
workerCount int // worker 数量 workerCount int // worker 数量
cancel context.CancelFunc // 用于停止 worker
} }
// ConsumerOption 消费者配置选项 // ConsumerOption 消费者配置选项
@@ -74,6 +75,9 @@ func NewConsumer(queue string, handler MessageHandler, opts ...ConsumerOption) *
// Start 启动消费者 // Start 启动消费者
func (c *Consumer) Start(ctx context.Context) error { func (c *Consumer) Start(ctx context.Context) error {
// 创建可取消的 context
workerCtx, cancel := context.WithCancel(ctx)
c.cancel = cancel
ch, err := GetChannel() ch, err := GetChannel()
if err != nil { if err != nil {
return err return err
@@ -108,7 +112,7 @@ func (c *Consumer) Start(ctx context.Context) error {
// 启动多个 worker // 启动多个 worker
for i := 0; i < c.workerCount; i++ { for i := 0; i < c.workerCount; i++ {
go c.worker(ctx, i, msgs) go c.worker(workerCtx, i, msgs)
} }
return nil return nil
@@ -118,29 +122,40 @@ func (c *Consumer) Start(ctx context.Context) error {
func (c *Consumer) worker(ctx context.Context, workerID int, msgs <-chan amqp.Delivery) { func (c *Consumer) worker(ctx context.Context, workerID int, msgs <-chan amqp.Delivery) {
g.Log().Debugf(ctx, "Worker %d 已启动", workerID) g.Log().Debugf(ctx, "Worker %d 已启动", workerID)
for msg := range msgs { for {
// 处理消息 select {
err := c.handler(ctx, msg.Body) case <-ctx.Done():
// Context 取消,退出
if err != nil { g.Log().Infof(ctx, "Worker %d 收到停止信号,正在退出", workerID)
g.Log().Errorf(ctx, "Worker %d 处理消息失败: %v", workerID, err) return
case msg, ok := <-msgs:
// 如果不是自动确认,需要手动 Nack if !ok {
if !c.autoAck { // Channel 关闭,退出
// requeue=false: 不重新入队,进入死信队列 g.Log().Infof(ctx, "Worker %d 消息通道已关闭,退出", workerID)
msg.Nack(false, false) return
}
} else {
// 处理成功,手动确认
if !c.autoAck {
msg.Ack(false)
} }
g.Log().Debugf(ctx, "Worker %d 处理消息成功", workerID) // 处理消息
err := c.handler(ctx, msg.Body)
if err != nil {
g.Log().Errorf(ctx, "Worker %d 处理消息失败: %v", workerID, err)
// 如果不是自动确认,需要手动 Nack
if !c.autoAck {
// requeue=false: 不重新入队,进入死信队列
msg.Nack(false, false)
}
} else {
// 处理成功,手动确认
if !c.autoAck {
msg.Ack(false)
}
g.Log().Debugf(ctx, "Worker %d 处理消息成功", workerID)
}
} }
} }
g.Log().Debugf(ctx, "Worker %d 已停止", workerID)
} }
// StartTypedConsumer 启动类型化消费者(自动反序列化) // StartTypedConsumer 启动类型化消费者(自动反序列化)
@@ -163,3 +178,12 @@ func StartTypedConsumer[T any](
consumer := NewConsumer(queue, wrappedHandler, opts...) consumer := NewConsumer(queue, wrappedHandler, opts...)
return consumer.Start(ctx) return consumer.Start(ctx)
} }
// Stop 停止消费者
func (c *Consumer) Stop(ctx context.Context) {
if c.cancel != nil {
g.Log().Infof(ctx, "正在停止消费者: queue=%s", c.queue)
c.cancel()
c.cancel = nil
}
}