Files
common/message/redis.go

233 lines
6.4 KiB
Go
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
package message
import (
"context"
"errors"
"strings"
"time"
"github.com/gogf/gf/v2/database/gredis"
"github.com/gogf/gf/v2/frame/g"
"github.com/gogf/gf/v2/os/glog"
"github.com/gogf/gf/v2/util/gconv"
)
// StreamMessage Redis Stream 消息结构
type StreamMessage struct {
ID string // 消息ID自动生成
Values map[string]interface{} // 消息内容
}
// getClient 获取 Redis 客户端
func getRedisClient() *gredis.Redis {
return g.Redis()
}
// getClient 获取 Redis 客户端
func getRedisClientTest(name string) *gredis.Redis {
return g.Redis(name)
}
// getRedisClientByDB 根据DB获取Redis客户端如果db<=0则返回默认客户端
func getRedisClientByDB(db int) *gredis.Redis {
if db <= 0 {
return g.Redis()
}
// 创建连接到指定DB的Redis客户端
client, err := gredis.New(&gredis.Config{
Address: g.Cfg().MustGet(context.Background(), "redis.default.address").String(),
Db: db,
})
if err != nil {
glog.Errorf(context.Background(), "创建Redis客户端失败: %v", err)
return g.Redis()
}
return client
}
// lock 分布式锁
func lock(ctx context.Context, key string, expireSeconds int64, fn func(ctx context.Context) error) (success bool, err error) {
limit := 3
LOOP:
if limit < 0 {
return false, errors.New("锁重试次数耗尽")
}
limit--
if val, err := getRedisClient().Set(ctx, key, true, gredis.SetOption{
TTLOption: gredis.TTLOption{
EX: &expireSeconds,
},
NX: true,
}); err != nil {
return false, err
} else {
if val.Bool() {
defer func(RedisClient *gredis.Redis, ctx context.Context, key string) {
if _, err = RedisClient.Del(ctx, key); err != nil {
glog.Errorf(ctx, "RedisClient.Del error: %v", err)
}
}(getRedisClient(), ctx, key)
if err = fn(ctx); err != nil {
return false, err
}
return true, nil
} else {
time.Sleep(time.Second)
goto LOOP
}
}
}
// publishToRedis 将消息添加到 Redis Stream
func publishToRedis(ctx context.Context, streamKey string, msg interface{}) (messageID string, err error) {
values := gconv.Map(msg)
args := make([]interface{}, 0, len(values)*2+2)
args = append(args, streamKey, "*")
for key, val := range values {
args = append(args, key, val)
}
result, err := getRedisClient().Do(ctx, "XADD", args...)
if err != nil {
return
}
messageID = result.String()
return
}
// initStreamGroup 初始化 Stream 和消费者组
func initStreamGroup(ctx context.Context, streamKey, groupName string) error {
_, err := getRedisClient().Do(ctx, "XGROUP", "CREATE", streamKey, groupName, "0", "MKSTREAM")
if err != nil {
// 如果组已存在,忽略错误
errStr := err.Error()
// 检查错误是否是 "BUSYGROUP Consumer Group name already exists"
if strings.Contains(errStr, "BUSYGROUP") || strings.Contains(errStr, "already exists") {
// 这是一个预期的情况,说明消费者组已经存在,无需处理
return nil
}
// 这是一个真正的错误,需要记录或处理
return err
}
return nil
}
// readFromStream 从 Stream 读取消息
func readFromStream(ctx context.Context, msg QueueMessage) error {
// 初始化 Stream 和消费者组
if err := initStreamGroup(ctx, msg.StreamKey, msg.GroupName); err != nil {
return err
}
go func() {
RECONNECT:
for {
result, err := getRedisClient().Do(ctx, "XREADGROUP", "GROUP", msg.GroupName, msg.ConsumerName, "COUNT", msg.BatchSize, "BLOCK", 0, "STREAMS", msg.StreamKey, ">")
if err != nil {
//select {
//case <-ctx.Done():
// return
//}
time.Sleep(time.Second)
goto RECONNECT
}
// 检查返回结果是否为空
if result == nil || result.IsEmpty() {
continue
}
messages := make([]StreamMessage, 0, int(msg.BatchSize))
// 尝试 map 格式GoFrame gredis 返回)
if streamsMap, ok := result.Val().(map[interface{}]interface{}); ok {
for _, streamMsgs := range streamsMap {
msgsArray, ok := streamMsgs.([]interface{})
if !ok {
continue
}
for _, msgData := range msgsArray {
msgArray, ok := msgData.([]interface{})
if !ok || len(msgArray) < 2 {
continue
}
msgID := gconv.String(msgArray[0])
fieldsArray, ok := msgArray[1].([]interface{})
if !ok {
continue
}
values := make(map[string]interface{}, len(fieldsArray)/2)
for i := 0; i < len(fieldsArray); i += 2 {
if i+1 < len(fieldsArray) {
key := gconv.String(fieldsArray[i])
values[key] = fieldsArray[i+1]
}
}
messages = append(messages, StreamMessage{
ID: msgID,
Values: values,
})
}
}
}
// 尝试数组格式(标准 Redis 返回)
if streamsArray, ok := result.Val().([]interface{}); ok && len(streamsArray) > 0 {
for _, streamData := range streamsArray {
streamArray, ok := streamData.([]interface{})
if !ok || len(streamArray) < 2 {
continue
}
messagesArray, ok := streamArray[1].([]interface{})
if !ok {
continue
}
for _, msgData := range messagesArray {
msgArray, ok := msgData.([]interface{})
if !ok || len(msgArray) < 2 {
continue
}
msgID := gconv.String(msgArray[0])
fieldsArray, ok := msgArray[1].([]interface{})
if !ok {
continue
}
values := make(map[string]interface{}, len(fieldsArray)/2)
for i := 0; i < len(fieldsArray); i += 2 {
if i+1 < len(fieldsArray) {
key := gconv.String(fieldsArray[i])
values[key] = fieldsArray[i+1]
}
}
messages = append(messages, StreamMessage{
ID: msgID,
Values: values,
})
}
}
}
// 处理消息
for _, streamMsg := range messages {
// 业务处理
if err := msg.HandleFunc(ctx, streamMsg.Values); err != nil {
glog.Infof(ctx, "业务处理失败-> err:%v\n", err)
continue
}
// 确认消息
if msg.AutoAck {
err := ackMessage(ctx, msg.StreamKey, msg.GroupName, streamMsg.ID)
if err != nil {
glog.Infof(ctx, "消费者 '%s' 确认消息 ID %s 失败: %v\n", msg.ConsumerName, streamMsg.ID, err)
}
}
}
}
}()
return nil
}
// ackMessage 确认消息已处理
func ackMessage(ctx context.Context, streamKey, groupName string, messageIDs ...string) error {
args := make([]interface{}, 0, len(messageIDs)+2)
args = append(args, streamKey, groupName)
for _, id := range messageIDs {
args = append(args, id)
}
_, err := getRedisClient().Do(ctx, "XACK", args...)
return err
}