package message import ( "context" "errors" "strings" "time" "github.com/gogf/gf/v2/database/gredis" "github.com/gogf/gf/v2/frame/g" "github.com/gogf/gf/v2/os/glog" "github.com/gogf/gf/v2/util/gconv" ) // StreamMessage Redis Stream 消息结构 type StreamMessage struct { ID string // 消息ID(自动生成) Values map[string]interface{} // 消息内容 } // getClient 获取 Redis 客户端 func getRedisClient() *gredis.Redis { return g.Redis() } // getClient 获取 Redis 客户端 func getRedisClientTest(name string) *gredis.Redis { return g.Redis(name) } // getRedisClientByDB 根据DB获取Redis客户端,如果db<=0则返回默认客户端 func getRedisClientByDB(db int) *gredis.Redis { if db <= 0 { return g.Redis() } // 创建连接到指定DB的Redis客户端 client, err := gredis.New(&gredis.Config{ Address: g.Cfg().MustGet(context.Background(), "redis.default.address").String(), Db: db, }) if err != nil { glog.Errorf(context.Background(), "创建Redis客户端失败: %v", err) return g.Redis() } return client } // lock 分布式锁 func lock(ctx context.Context, key string, expireSeconds int64, fn func(ctx context.Context) error) (success bool, err error) { limit := 3 LOOP: if limit < 0 { return false, errors.New("锁重试次数耗尽") } limit-- if val, err := getRedisClient().Set(ctx, key, true, gredis.SetOption{ TTLOption: gredis.TTLOption{ EX: &expireSeconds, }, NX: true, }); err != nil { return false, err } else { if val.Bool() { defer func(RedisClient *gredis.Redis, ctx context.Context, key string) { if _, err = RedisClient.Del(ctx, key); err != nil { glog.Errorf(ctx, "RedisClient.Del error: %v", err) } }(getRedisClient(), ctx, key) if err = fn(ctx); err != nil { return false, err } return true, nil } else { time.Sleep(time.Second) goto LOOP } } } // publishToRedis 将消息添加到 Redis Stream func publishToRedis(ctx context.Context, streamKey string, msg interface{}) (messageID string, err error) { values := gconv.Map(msg) args := make([]interface{}, 0, len(values)*2+2) args = append(args, streamKey, "*") for key, val := range values { args = append(args, key, val) } result, err := getRedisClient().Do(ctx, "XADD", args...) if err != nil { return } messageID = result.String() return } // initStreamGroup 初始化 Stream 和消费者组 func initStreamGroup(ctx context.Context, streamKey, groupName string) error { _, err := getRedisClient().Do(ctx, "XGROUP", "CREATE", streamKey, groupName, "0", "MKSTREAM") if err != nil { // 如果组已存在,忽略错误 errStr := err.Error() // 检查错误是否是 "BUSYGROUP Consumer Group name already exists" if strings.Contains(errStr, "BUSYGROUP") || strings.Contains(errStr, "already exists") { // 这是一个预期的情况,说明消费者组已经存在,无需处理 return nil } // 这是一个真正的错误,需要记录或处理 return err } return nil } // readFromStream 从 Stream 读取消息 func readFromStream(ctx context.Context, msg QueueMessage) error { // 初始化 Stream 和消费者组 if err := initStreamGroup(ctx, msg.StreamKey, msg.GroupName); err != nil { return err } go func() { RECONNECT: for { result, err := getRedisClient().Do(ctx, "XREADGROUP", "GROUP", msg.GroupName, msg.ConsumerName, "COUNT", msg.BatchSize, "BLOCK", 0, "STREAMS", msg.StreamKey, ">") if err != nil { //select { //case <-ctx.Done(): // return //} time.Sleep(time.Second) goto RECONNECT } // 检查返回结果是否为空 if result == nil || result.IsEmpty() { continue } messages := make([]StreamMessage, 0, int(msg.BatchSize)) // 尝试 map 格式(GoFrame gredis 返回) if streamsMap, ok := result.Val().(map[interface{}]interface{}); ok { for _, streamMsgs := range streamsMap { msgsArray, ok := streamMsgs.([]interface{}) if !ok { continue } for _, msgData := range msgsArray { msgArray, ok := msgData.([]interface{}) if !ok || len(msgArray) < 2 { continue } msgID := gconv.String(msgArray[0]) fieldsArray, ok := msgArray[1].([]interface{}) if !ok { continue } values := make(map[string]interface{}, len(fieldsArray)/2) for i := 0; i < len(fieldsArray); i += 2 { if i+1 < len(fieldsArray) { key := gconv.String(fieldsArray[i]) values[key] = fieldsArray[i+1] } } messages = append(messages, StreamMessage{ ID: msgID, Values: values, }) } } } // 尝试数组格式(标准 Redis 返回) if streamsArray, ok := result.Val().([]interface{}); ok && len(streamsArray) > 0 { for _, streamData := range streamsArray { streamArray, ok := streamData.([]interface{}) if !ok || len(streamArray) < 2 { continue } messagesArray, ok := streamArray[1].([]interface{}) if !ok { continue } for _, msgData := range messagesArray { msgArray, ok := msgData.([]interface{}) if !ok || len(msgArray) < 2 { continue } msgID := gconv.String(msgArray[0]) fieldsArray, ok := msgArray[1].([]interface{}) if !ok { continue } values := make(map[string]interface{}, len(fieldsArray)/2) for i := 0; i < len(fieldsArray); i += 2 { if i+1 < len(fieldsArray) { key := gconv.String(fieldsArray[i]) values[key] = fieldsArray[i+1] } } messages = append(messages, StreamMessage{ ID: msgID, Values: values, }) } } } // 处理消息 for _, streamMsg := range messages { // 业务处理 if err := msg.HandleFunc(ctx, streamMsg.Values); err != nil { glog.Infof(ctx, "业务处理失败-> err:%v\n", err) continue } // 确认消息 if msg.AutoAck { err := ackMessage(ctx, msg.StreamKey, msg.GroupName, streamMsg.ID) if err != nil { glog.Infof(ctx, "消费者 '%s' 确认消息 ID %s 失败: %v\n", msg.ConsumerName, streamMsg.ID, err) } } } } }() return nil } // ackMessage 确认消息已处理 func ackMessage(ctx context.Context, streamKey, groupName string, messageIDs ...string) error { args := make([]interface{}, 0, len(messageIDs)+2) args = append(args, streamKey, groupName) for _, id := range messageIDs { args = append(args, id) } _, err := getRedisClient().Do(ctx, "XACK", args...) return err }