diff --git a/redis/message.go b/redis/message.go index e7e6cb0..a20f019 100644 --- a/redis/message.go +++ b/redis/message.go @@ -8,7 +8,7 @@ type QueueMessage struct { ConsumerName string // 消费者名称 Timeout int64 // 阻塞超时时间(毫秒) BatchSize int64 // 最大并发数(信号量容量) - BlockMs int64 - Block bool + BlockMs int64 // 阻塞时间 + AutoAck bool //ACK确认,true自动确认,false手动确认 HandleFunc func(ctx context.Context, message map[string]interface{}) error } diff --git a/redis/redis.go b/redis/redis.go index c338e75..f1af3ac 100644 --- a/redis/redis.go +++ b/redis/redis.go @@ -3,7 +3,6 @@ package redis import ( "context" "errors" - "fmt" "strings" "sync" "time" @@ -73,16 +72,17 @@ LOOP: func GetReadStream(ctx context.Context, msg ...QueueMessage) error { for _, t := range msg { - err := GetReadFromStream(ctx, t.StreamKey, t.GroupName, t.ConsumerName, t.BatchSize, t.BlockMs, t.Block, t.HandleFunc) + err := GetReadFromStream(ctx, t.StreamKey, t.GroupName, t.ConsumerName, t.BatchSize, t.BlockMs, t.AutoAck, t.HandleFunc) if err != nil { - return err + glog.Infof(ctx, "读取ReadFromStream数据失败-> 键名: %s, 消费者组: %s, 消费者名称%v\n, 失败err:%v\n", t.StreamKey, t.GroupName, t.ConsumerName, err) + continue } } return nil } // GetReadFromStream 读取ReadFromStream数据 -func GetReadFromStream(ctx context.Context, streamKey, groupName, consumerName string, count, blockMs int64, Block bool, fn func(ctx context.Context, message map[string]interface{}) error) (err error) { +func GetReadFromStream(ctx context.Context, streamKey, groupName, consumerName string, count, blockMs int64, autoAck bool, fn func(ctx context.Context, message map[string]interface{}) error) (err error) { glog.Infof(ctx, "初始化 Stream: %s, 消费者组: %s", streamKey, groupName) err = InitStreamGroup(ctx, streamKey, groupName) if err != nil { @@ -97,13 +97,14 @@ func GetReadFromStream(ctx context.Context, streamKey, groupName, consumerName s } // 处理消息 for _, msg := range messages { - fmt.Printf("消费者 '%s' -> 接收到消息 ID: %s, 内容: %v\n", consumerName, msg.ID, msg.Values) + glog.Infof(ctx, "消费者 '%s' -> 接收到消息 ID: %s, 内容: %v\n", consumerName, msg.ID, msg.Values) // 业务处理 if err = fn(ctx, msg.Values); err != nil { - return err + glog.Infof(ctx, "业务处理失败-> err:%v\n", err) + continue } // 确认消息 (ACK) - if Block { + if autoAck { // 处理成功后,必须调用 XAck,否则消息会一直留在 PEL 中 err = AckMessage(ctx, streamKey, groupName, msg.ID) if err != nil { diff --git a/utils/utils.go b/utils/utils.go index b8ca4ca..c18874d 100644 --- a/utils/utils.go +++ b/utils/utils.go @@ -94,6 +94,8 @@ func GetUserInfo(ctx context.Context) (user do.User, err error) { user.TenantId = dataMap["tenantId"] } else { user.TenantId = ctx.Value("tenantId") + user.UserName = ctx.Value("userName") + fmt.Println("user.UserName==================", user.UserName) } if user.TenantId == nil { return user, gerror.New("租户信息为空")