Files
common/message/redis.go

233 lines
6.4 KiB
Go
Raw Normal View History

2026-01-09 10:19:31 +08:00
package message
import (
"context"
"errors"
"strings"
"time"
"github.com/gogf/gf/v2/database/gredis"
"github.com/gogf/gf/v2/frame/g"
"github.com/gogf/gf/v2/os/glog"
"github.com/gogf/gf/v2/util/gconv"
)
// StreamMessage is a single entry of a Redis Stream.
type StreamMessage struct {
	// ID is the entry ID; it is generated automatically.
	ID string
	// Values holds the entry's field/value payload.
	Values map[string]interface{}
}
// getRedisClient returns the Redis client that g.Redis() yields when called
// without a group name, i.e. the default client.
func getRedisClient() *gredis.Redis {
	client := g.Redis()
	return client
}
// getRedisClientTest returns the Redis client that g.Redis yields for the
// given name (presumably a configuration group name — confirm against the
// GoFrame configuration in use).
func getRedisClientTest(name string) *gredis.Redis {
	client := g.Redis(name)
	return client
}
// getRedisClientByDB returns a Redis client bound to logical database db.
//
// For db <= 0 the shared default client (g.Redis()) is returned. Otherwise a
// new client is built from the complete "redis.default" configuration node
// with only the Db field overridden, so that credentials, TLS and timeout
// settings of the default connection carry over instead of just the address.
//
// If the configuration cannot be parsed or the client cannot be created, the
// error is logged and the default client is returned as a fallback.
//
// NOTE(review): each call with db > 0 constructs a fresh client; callers that
// invoke this frequently may want to cache the result per db.
func getRedisClientByDB(db int) *gredis.Redis {
	if db <= 0 {
		return g.Redis()
	}

	ctx := context.Background()

	// Start from the full default node rather than the address alone;
	// otherwise password/user/TLS settings are silently dropped.
	config, err := gredis.ConfigFromMap(g.Cfg().MustGet(ctx, "redis.default").Map())
	if err != nil {
		glog.Errorf(ctx, "创建Redis客户端失败: %v", err)
		return g.Redis()
	}
	config.Db = db

	client, err := gredis.New(config)
	if err != nil {
		glog.Errorf(ctx, "创建Redis客户端失败: %v", err)
		return g.Redis()
	}
	return client
}
// lock runs fn while holding a Redis-based lock on key.
//
// The lock is acquired with SET key true EX expireSeconds NX. When the key is
// already held, acquisition is retried once per second, up to maxRetries
// retries after the first attempt. Waiting between attempts is aborted as soon
// as ctx is cancelled.
//
// Returns:
//   - (true, nil) when the lock was acquired and fn returned nil;
//   - (false, err) when the SET command failed, ctx was cancelled while
//     waiting, all attempts were exhausted, or fn returned an error.
//
// The key is deleted after fn returns, whether or not fn succeeded; a failure
// to delete is logged only.
//
// NOTE(review): the release is an unconditional DEL. If fn runs longer than
// expireSeconds the key may already belong to another holder by the time it
// is deleted. A per-holder token with compare-and-delete would close that gap.
func lock(ctx context.Context, key string, expireSeconds int64, fn func(ctx context.Context) error) (success bool, err error) {
	const maxRetries = 3 // retries after the initial attempt

	client := getRedisClient()

	for attempt := 0; ; attempt++ {
		val, setErr := client.Set(ctx, key, true, gredis.SetOption{
			TTLOption: gredis.TTLOption{
				EX: &expireSeconds,
			},
			NX: true,
		})
		if setErr != nil {
			return false, setErr
		}
		if val.Bool() {
			break
		}
		if attempt >= maxRetries {
			return false, errors.New("锁重试次数耗尽")
		}

		// Wait before the next attempt, but give up immediately on cancellation.
		timer := time.NewTimer(time.Second)
		select {
		case <-ctx.Done():
			timer.Stop()
			return false, ctx.Err()
		case <-timer.C:
		}
	}

	// Lock held from here on: always release it when fn is done.
	defer func() {
		if _, delErr := client.Del(ctx, key); delErr != nil {
			glog.Errorf(ctx, "RedisClient.Del error: %v", delErr)
		}
	}()

	if err = fn(ctx); err != nil {
		return false, err
	}
	return true, nil
}
// publishToRedis appends msg to the Redis Stream streamKey using XADD with an
// auto-generated entry ID ("*").
//
// msg is converted to a map with gconv.Map and each key/value pair becomes one
// field of the stream entry. The ID reported by Redis is returned as
// messageID; on failure messageID is empty and err carries the command error.
func publishToRedis(ctx context.Context, streamKey string, msg interface{}) (messageID string, err error) {
	fields := gconv.Map(msg)

	// Layout: <stream> "*" field1 value1 field2 value2 ...
	cmdArgs := make([]interface{}, 0, 2+2*len(fields))
	cmdArgs = append(cmdArgs, streamKey, "*")
	for name, value := range fields {
		cmdArgs = append(cmdArgs, name, value)
	}

	reply, err := getRedisClient().Do(ctx, "XADD", cmdArgs...)
	if err != nil {
		return "", err
	}
	return reply.String(), nil
}
// initStreamGroup creates the consumer group groupName on streamKey, creating
// the stream itself if necessary (XGROUP CREATE ... 0 MKSTREAM).
//
// An error whose text contains "BUSYGROUP" or "already exists" means the group
// is already present and is treated as success; any other error is returned.
func initStreamGroup(ctx context.Context, streamKey, groupName string) error {
	_, createErr := getRedisClient().Do(ctx, "XGROUP", "CREATE", streamKey, groupName, "0", "MKSTREAM")
	if createErr == nil {
		return nil
	}

	// An existing group is the expected outcome on every start but the first.
	reason := createErr.Error()
	for _, marker := range []string{"BUSYGROUP", "already exists"} {
		if strings.Contains(reason, marker) {
			return nil
		}
	}
	return createErr
}
// readFromStream 从 Stream 读取消息
func readFromStream(ctx context.Context, msg QueueMessage) error {
// 初始化 Stream 和消费者组
if err := initStreamGroup(ctx, msg.StreamKey, msg.GroupName); err != nil {
return err
}
go func() {
RECONNECT:
for {
result, err := getRedisClient().Do(ctx, "XREADGROUP", "GROUP", msg.GroupName, msg.ConsumerName, "COUNT", msg.BatchSize, "BLOCK", 0, "STREAMS", msg.StreamKey, ">")
if err != nil {
//select {
//case <-ctx.Done():
// return
//}
2026-01-09 10:19:31 +08:00
time.Sleep(time.Second)
goto RECONNECT
}
// 检查返回结果是否为空
if result == nil || result.IsEmpty() {
continue
}
messages := make([]StreamMessage, 0, int(msg.BatchSize))
// 尝试 map 格式GoFrame gredis 返回)
if streamsMap, ok := result.Val().(map[interface{}]interface{}); ok {
for _, streamMsgs := range streamsMap {
msgsArray, ok := streamMsgs.([]interface{})
if !ok {
continue
}
for _, msgData := range msgsArray {
msgArray, ok := msgData.([]interface{})
if !ok || len(msgArray) < 2 {
continue
}
msgID := gconv.String(msgArray[0])
fieldsArray, ok := msgArray[1].([]interface{})
if !ok {
continue
}
values := make(map[string]interface{}, len(fieldsArray)/2)
for i := 0; i < len(fieldsArray); i += 2 {
if i+1 < len(fieldsArray) {
key := gconv.String(fieldsArray[i])
values[key] = fieldsArray[i+1]
}
}
messages = append(messages, StreamMessage{
ID: msgID,
Values: values,
})
}
}
}
// 尝试数组格式(标准 Redis 返回)
if streamsArray, ok := result.Val().([]interface{}); ok && len(streamsArray) > 0 {
for _, streamData := range streamsArray {
streamArray, ok := streamData.([]interface{})
if !ok || len(streamArray) < 2 {
continue
}
messagesArray, ok := streamArray[1].([]interface{})
if !ok {
continue
}
for _, msgData := range messagesArray {
msgArray, ok := msgData.([]interface{})
if !ok || len(msgArray) < 2 {
continue
}
msgID := gconv.String(msgArray[0])
fieldsArray, ok := msgArray[1].([]interface{})
if !ok {
continue
}
values := make(map[string]interface{}, len(fieldsArray)/2)
for i := 0; i < len(fieldsArray); i += 2 {
if i+1 < len(fieldsArray) {
key := gconv.String(fieldsArray[i])
values[key] = fieldsArray[i+1]
}
}
messages = append(messages, StreamMessage{
ID: msgID,
Values: values,
})
}
}
}
// 处理消息
for _, streamMsg := range messages {
// 业务处理
if err := msg.HandleFunc(ctx, streamMsg.Values); err != nil {
glog.Infof(ctx, "业务处理失败-> err:%v\n", err)
continue
}
// 确认消息
if msg.AutoAck {
err := ackMessage(ctx, msg.StreamKey, msg.GroupName, streamMsg.ID)
if err != nil {
glog.Infof(ctx, "消费者 '%s' 确认消息 ID %s 失败: %v\n", msg.ConsumerName, streamMsg.ID, err)
}
}
}
}
}()
return nil
}
// ackMessage acknowledges the given entry IDs for consumer group groupName on
// stream streamKey with a single XACK command and returns the command error,
// if any.
func ackMessage(ctx context.Context, streamKey, groupName string, messageIDs ...string) error {
	// Layout: <stream> <group> id1 id2 ...
	cmdArgs := make([]interface{}, len(messageIDs)+2)
	cmdArgs[0], cmdArgs[1] = streamKey, groupName
	for i := range messageIDs {
		cmdArgs[i+2] = messageIDs[i]
	}

	_, err := getRedisClient().Do(ctx, "XACK", cmdArgs...)
	return err
}