From 38ae9edd54f229c2d12b4c5e675fbdb151d3ff47 Mon Sep 17 00:00:00 2001 From: qhd <1766646056@qq.com> Date: Wed, 31 Dec 2025 10:46:39 +0800 Subject: [PATCH] =?UTF-8?q?redis=E6=B6=88=E8=B4=B9=E9=98=9F=E5=88=97?= =?UTF-8?q?=E6=8E=A5=E5=8F=A3=E4=BC=98=E5=8C=96?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- redis/message.go | 4 ++-- redis/redis.go | 15 ++++++++------- utils/utils.go | 2 ++ 3 files changed, 12 insertions(+), 9 deletions(-) diff --git a/redis/message.go b/redis/message.go index e7e6cb0..a20f019 100644 --- a/redis/message.go +++ b/redis/message.go @@ -8,7 +8,7 @@ type QueueMessage struct { ConsumerName string // 消费者名称 Timeout int64 // 阻塞超时时间(毫秒) BatchSize int64 // 最大并发数(信号量容量) - BlockMs int64 - Block bool + BlockMs int64 // 阻塞时间 + AutoAck bool //ACK确认,true自动确认,false手动确认 HandleFunc func(ctx context.Context, message map[string]interface{}) error } diff --git a/redis/redis.go b/redis/redis.go index c338e75..f1af3ac 100644 --- a/redis/redis.go +++ b/redis/redis.go @@ -3,7 +3,6 @@ package redis import ( "context" "errors" - "fmt" "strings" "sync" "time" @@ -73,16 +72,17 @@ LOOP: func GetReadStream(ctx context.Context, msg ...QueueMessage) error { for _, t := range msg { - err := GetReadFromStream(ctx, t.StreamKey, t.GroupName, t.ConsumerName, t.BatchSize, t.BlockMs, t.Block, t.HandleFunc) + err := GetReadFromStream(ctx, t.StreamKey, t.GroupName, t.ConsumerName, t.BatchSize, t.BlockMs, t.AutoAck, t.HandleFunc) if err != nil { - return err + glog.Infof(ctx, "读取ReadFromStream数据失败-> 键名: %s, 消费者组: %s, 消费者名称%v\n, 失败err:%v\n", t.StreamKey, t.GroupName, t.ConsumerName, err) + continue } } return nil } // GetReadFromStream 读取ReadFromStream数据 -func GetReadFromStream(ctx context.Context, streamKey, groupName, consumerName string, count, blockMs int64, Block bool, fn func(ctx context.Context, message map[string]interface{}) error) (err error) { +func GetReadFromStream(ctx context.Context, streamKey, groupName, consumerName string, count, blockMs int64, autoAck bool, fn func(ctx context.Context, message map[string]interface{}) error) (err error) { glog.Infof(ctx, "初始化 Stream: %s, 消费者组: %s", streamKey, groupName) err = InitStreamGroup(ctx, streamKey, groupName) if err != nil { @@ -97,13 +97,14 @@ func GetReadFromStream(ctx context.Context, streamKey, groupName, consumerName s } // 处理消息 for _, msg := range messages { - fmt.Printf("消费者 '%s' -> 接收到消息 ID: %s, 内容: %v\n", consumerName, msg.ID, msg.Values) + glog.Infof(ctx, "消费者 '%s' -> 接收到消息 ID: %s, 内容: %v\n", consumerName, msg.ID, msg.Values) // 业务处理 if err = fn(ctx, msg.Values); err != nil { - return err + glog.Infof(ctx, "业务处理失败-> err:%v\n", err) + continue } // 确认消息 (ACK) - if Block { + if autoAck { // 处理成功后,必须调用 XAck,否则消息会一直留在 PEL 中 err = AckMessage(ctx, streamKey, groupName, msg.ID) if err != nil { diff --git a/utils/utils.go b/utils/utils.go index b8ca4ca..c18874d 100644 --- a/utils/utils.go +++ b/utils/utils.go @@ -94,6 +94,8 @@ func GetUserInfo(ctx context.Context) (user do.User, err error) { user.TenantId = dataMap["tenantId"] } else { user.TenantId = ctx.Value("tenantId") + user.UserName = ctx.Value("userName") + fmt.Println("user.UserName==================", user.UserName) } if user.TenantId == nil { return user, gerror.New("租户信息为空")