From 70b8b7b1d0754a1892a010896a3604d823b350ec Mon Sep 17 00:00:00 2001 From: Cold <16419454+cold502@user.noreply.gitee.com> Date: Fri, 5 Dec 2025 12:18:04 +0800 Subject: [PATCH] =?UTF-8?q?=E6=9B=B4=E6=94=B9=20redis=E6=96=B9=E6=B3=95?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- redis/redis.go | 404 +++++++++++++++++++++++++------------------------ 1 file changed, 206 insertions(+), 198 deletions(-) diff --git a/redis/redis.go b/redis/redis.go index 4e339dd..2cd7840 100644 --- a/redis/redis.go +++ b/redis/redis.go @@ -2,41 +2,24 @@ package redis import ( "context" - "strconv" + "strings" "time" + "github.com/gogf/gf/v2/database/gredis" "github.com/gogf/gf/v2/frame/g" - "github.com/redis/go-redis/v9" + "github.com/gogf/gf/v2/util/gconv" ) -var RedisClient *redis.Client +// GRedisClient GoFrame gredis 客户端,统一使用 +var GRedisClient *gredis.Redis + +// RedisClient GRedisClient 的别名,保持向后兼容 +var RedisClient *gredis.Redis func init() { - // 从 GoFrame 配置读取 Redis 配置 - ctx := context.Background() - - // 读取 Redis 配置 - addr := g.Cfg().MustGet(ctx, "redis.default.address").String() - - password := g.Cfg().MustGet(ctx, "redis.default.pass", "").String() - db := g.Cfg().MustGet(ctx, "redis.default.db", 0).Int() - - // 读取超时配置 - dialTimeout := g.Cfg().MustGet(ctx, "redis.default.dialTimeout", "30s").Duration() - readTimeout := g.Cfg().MustGet(ctx, "redis.default.readTimeout", "30s").Duration() - writeTimeout := g.Cfg().MustGet(ctx, "redis.default.writeTimeout", "30s").Duration() - - // 创建 Redis 客户端 - RedisClient = redis.NewClient(&redis.Options{ - Addr: addr, - Password: password, - DB: db, - DialTimeout: dialTimeout, - ReadTimeout: readTimeout, - WriteTimeout: writeTimeout, - // 不设置 Protocol(让 go-redis 自动协商) - // Protocol: 2, - }) + // 初始化 GoFrame gredis 客户端 + GRedisClient = g.Redis() + RedisClient = GRedisClient // 别名指向同一个客户端 } // Stream 和消费者组常量 @@ -56,21 +39,14 @@ type StreamMessage struct { } // InitStreamGroup 初始化 Stream 和消费者组 -// 在应用启动时调用一次,创建 Stream 和消费者组 -// 使用 GoFrame Do() 方法执行 XGROUP CREATE 命令 -// 参数: -// - streamKey: Stream 键名 -// - groupName: 消费者组名称 -// -// 返回:error 初始化失败时返回错误 +// 使用 gredis Do() 方法执行 XGROUP CREATE 命令 func InitStreamGroup(ctx context.Context, streamKey, groupName string) error { - // 使用 XGroupCreateMkStream 创建消费者组 - // 如果 Stream 不存在会自动创建 (MKSTREAM) - // "0": 从 Stream 开头开始消费 - err := RedisClient.XGroupCreateMkStream(ctx, streamKey, groupName, "0").Err() + // XGROUP CREATE streamKey groupName 0 MKSTREAM + _, err := GRedisClient.Do(ctx, "XGROUP", "CREATE", streamKey, groupName, "0", "MKSTREAM") if err != nil { - // 如果组已存在,忽略 BUSYGROUP 错误 - if err.Error() == "BUSYGROUP Consumer Group name already exists" { + // 如果组已存在,忽略错误 + errStr := err.Error() + if strings.Contains(errStr, "BUSYGROUP") || strings.Contains(errStr, "already exists") { return nil } return err @@ -79,66 +55,94 @@ func InitStreamGroup(ctx context.Context, streamKey, groupName string) error { } // AddToStream 将消息添加到 Stream -// 用于 Controller 层将 RAGFlow 请求推入 Stream -// 参数: -// - streamKey: Stream 键名 -// - values: 消息内容(键值对) -// -// 返回: -// - string: 消息ID -// - error: 添加失败时返回错误 +// 使用 gredis Do() 方法执行 XADD 命令 func AddToStream(ctx context.Context, streamKey string, values map[string]interface{}) (string, error) { - // 使用 XAdd 添加消息到 Stream - messageID, err := RedisClient.XAdd(ctx, &redis.XAddArgs{ - Stream: streamKey, - Values: values, - }).Result() + // XADD streamKey * field1 value1 field2 value2 ... + args := []interface{}{streamKey, "*"} // "*" 自动生成ID + for key, val := range values { + args = append(args, key, val) + } + + result, err := GRedisClient.Do(ctx, "XADD", args...) if err != nil { return "", err } + + // 返回消息ID + messageID := result.String() return messageID, nil } // ReadFromStream 从 Stream 读取消息(消费者组模式) -// 后台 Goroutine 使用此方法从 Stream 中取出请求进行处理 -// 参数: -// - streamKey: Stream 键名 -// - groupName: 消费者组名称 -// - consumerName: 消费者名称(唯一标识) -// - count: 每次读取的消息数量 -// - blockMs: 阻塞时间(毫秒),0表示不阻塞 -// -// 返回: -// - []StreamMessage: 消息列表 -// - error: 读取失败时返回错误 +// 使用 gredis Do() 方法执行 XREADGROUP 命令 func ReadFromStream(ctx context.Context, streamKey, groupName, consumerName string, count int64, blockMs int64) ([]StreamMessage, error) { - // 使用 XReadGroup 从消费者组读取消息 - // ">" 表示读取未被消费的新消息(只获取新消息) - // 如果使用 "0" 或其他 ID,则返回 Pending 消息(未确认的消息) - streams, err := RedisClient.XReadGroup(ctx, &redis.XReadGroupArgs{ - Group: groupName, - Consumer: consumerName, - Streams: []string{streamKey, ">"}, // Stream名称 + 起始ID - Count: count, - Block: time.Duration(blockMs) * time.Millisecond, - }).Result() + // XREADGROUP GROUP groupName consumerName COUNT count BLOCK blockMs STREAMS streamKey > + result, err := GRedisClient.Do(ctx, + "XREADGROUP", "GROUP", groupName, consumerName, + "COUNT", count, + "BLOCK", blockMs, + "STREAMS", streamKey, ">", + ) - // 处理错误:超时或没有数据时返回 redis.Nil if err != nil { - if err == redis.Nil { - // 超时或没有数据,返回空数组 - return []StreamMessage{}, nil - } return nil, err } - // 解析返回的消息 - var messages []StreamMessage - for _, stream := range streams { - for _, msg := range stream.Messages { + // 解析返回值 + // 格式: [[streamKey, [[msgID, [field1, value1, field2, value2, ...]], ...]]] + messages := []StreamMessage{} + + if result == nil { + // 超时或没有数据 + return messages, nil + } + + // 类型断言:result.Val() 返回 interface{} + streamsArray, ok := result.Val().([]interface{}) + if !ok || len(streamsArray) == 0 { + return messages, nil + } + + // 遍历每个 stream + for _, streamData := range streamsArray { + streamArray, ok := streamData.([]interface{}) + if !ok || len(streamArray) < 2 { + continue + } + + // streamArray[0] 是 streamKey, streamArray[1] 是消息数组 + messagesArray, ok := streamArray[1].([]interface{}) + if !ok { + continue + } + + // 解析每条消息 + for _, msgData := range messagesArray { + msgArray, ok := msgData.([]interface{}) + if !ok || len(msgArray) < 2 { + continue + } + + // msgArray[0] 是 ID, msgArray[1] 是字段数组 + msgID := gconv.String(msgArray[0]) + fieldsArray, ok := msgArray[1].([]interface{}) + if !ok { + continue + } + + // 解析字段为 map + values := make(map[string]interface{}) + for i := 0; i < len(fieldsArray); i += 2 { + if i+1 < len(fieldsArray) { + key := gconv.String(fieldsArray[i]) + val := fieldsArray[i+1] + values[key] = val + } + } + messages = append(messages, StreamMessage{ - ID: msg.ID, - Values: msg.Values, + ID: msgID, + Values: values, }) } } @@ -147,101 +151,125 @@ func ReadFromStream(ctx context.Context, streamKey, groupName, consumerName stri } // AckMessage 确认消息已处理 -// 处理完消息后必须调用此方法确认,否则消息会保留在 Pending List (PEL) -// 确认后消息会从 PEL 中移除 -// 参数: -// - streamKey: Stream 键名 -// - groupName: 消费者组名称 -// - messageIDs: 要确认的消息ID列表 -// -// 返回:error 确认失败时返回错误 +// 使用 gredis Do() 方法执行 XACK 命令 func AckMessage(ctx context.Context, streamKey, groupName string, messageIDs ...string) error { - // 使用 XAck 确认消息 - // 返回值是成功确认的消息数量 - count, err := RedisClient.XAck(ctx, streamKey, groupName, messageIDs...).Result() - if err != nil { - return err + // XACK streamKey groupName messageID1 messageID2 ... + args := []interface{}{streamKey, groupName} + for _, id := range messageIDs { + args = append(args, id) } - // 可以检查 count 是否等于 len(messageIDs) - _ = count - return nil + + _, err := GRedisClient.Do(ctx, "XACK", args...) + return err } // GetStreamLength 获取 Stream 当前长度 -// 用于监控 Stream 消息积压情况 -// 参数: -// - streamKey: Stream 键名 -// -// 返回: -// - int64: Stream 中消息数量 -// - error: 操作失败时返回错误 +// 使用 gredis Do() 方法执行 XLEN 命令 func GetStreamLength(ctx context.Context, streamKey string) (int64, error) { - // 使用 XLen 获取 Stream 长度 - length, err := RedisClient.XLen(ctx, streamKey).Result() + // XLEN streamKey + result, err := GRedisClient.Do(ctx, "XLEN", streamKey) if err != nil { return 0, err } + + length := gconv.Int64(result) return length, nil } -// GetPendingMessages 获取待处理消息(未确认的消息) -// 用于监控和重试失败的消息 -// 参数: -// - streamKey: Stream 键名 -// - groupName: 消费者组名称 -// - start: 起始ID,"-" 表示最小ID -// - end: 结束ID,"+" 表示最大ID -// - count: 返回数量 -// -// 返回: -// - []redis.XPendingExt: Pending 消息列表 -// - error: 操作失败时返回错误 -func GetPendingMessages(ctx context.Context, streamKey, groupName string, start, end string, count int64) ([]redis.XPendingExt, error) { - // 使用 XPendingExt 获取详细的 Pending 消息 - pending, err := RedisClient.XPendingExt(ctx, &redis.XPendingExtArgs{ - Stream: streamKey, - Group: groupName, - Start: start, - End: end, - Count: count, - }).Result() +// PendingMessage Pending 消息结构 +type PendingMessage struct { + ID string // 消息ID + Consumer string // 消费者名称 + Idle int64 // 空闲时间(毫秒) + RetryCount int64 // 重试次数 +} + +// GetPendingMessages 获取待处理消息 +// 使用 gredis Do() 方法执行 XPENDING 命令 +func GetPendingMessages(ctx context.Context, streamKey, groupName string, start, end string, count int64) ([]PendingMessage, error) { + // XPENDING streamKey groupName start end count + result, err := GRedisClient.Do(ctx, "XPENDING", streamKey, groupName, start, end, count) if err != nil { return nil, err } - return pending, nil + + if result == nil { + return []PendingMessage{}, nil + } + + // 解析返回值:[[ID, consumer, idle, retryCount], ...] + pendingArray, ok := result.Val().([]interface{}) + if !ok { + return []PendingMessage{}, nil + } + + var messages []PendingMessage + for _, item := range pendingArray { + itemArray, ok := item.([]interface{}) + if !ok || len(itemArray) < 4 { + continue + } + + messages = append(messages, PendingMessage{ + ID: gconv.String(itemArray[0]), + Consumer: gconv.String(itemArray[1]), + Idle: gconv.Int64(itemArray[2]), + RetryCount: gconv.Int64(itemArray[3]), + }) + } + + return messages, nil } // ClaimPendingMessage 认领超时的 Pending 消息 -// 当某个消费者故障后,其他消费者可以认领其未完成的消息 -// 参数: -// - streamKey: Stream 键名 -// - groupName: 消费者组名称 -// - consumerName: 新消费者名称 -// - minIdleTime: 消息空闲时间(毫秒),超过此时间才能被认领 -// - messageIDs: 要认领的消息ID列表 -// -// 返回: -// - []StreamMessage: 认领的消息列表 -// - error: 操作失败时返回错误 +// 使用 gredis Do() 方法执行 XCLAIM 命令 func ClaimPendingMessage(ctx context.Context, streamKey, groupName, consumerName string, minIdleTime int64, messageIDs ...string) ([]StreamMessage, error) { - // 使用 XClaim 认领消息 - msgs, err := RedisClient.XClaim(ctx, &redis.XClaimArgs{ - Stream: streamKey, - Group: groupName, - Consumer: consumerName, - MinIdle: time.Duration(minIdleTime) * time.Millisecond, - Messages: messageIDs, - }).Result() + // XCLAIM streamKey groupName consumerName minIdleTime messageID1 messageID2 ... + args := []interface{}{streamKey, groupName, consumerName, minIdleTime} + for _, id := range messageIDs { + args = append(args, id) + } + + result, err := GRedisClient.Do(ctx, "XCLAIM", args...) if err != nil { return nil, err } - // 转换为 StreamMessage + if result == nil { + return []StreamMessage{}, nil + } + + // 解析返回值:类似 XREADGROUP + messagesArray, ok := result.Val().([]interface{}) + if !ok { + return []StreamMessage{}, nil + } + var messages []StreamMessage - for _, msg := range msgs { + for _, msgData := range messagesArray { + msgArray, ok := msgData.([]interface{}) + if !ok || len(msgArray) < 2 { + continue + } + + msgID := gconv.String(msgArray[0]) + fieldsArray, ok := msgArray[1].([]interface{}) + if !ok { + continue + } + + values := make(map[string]interface{}) + for i := 0; i < len(fieldsArray); i += 2 { + if i+1 < len(fieldsArray) { + key := gconv.String(fieldsArray[i]) + val := fieldsArray[i+1] + values[key] = val + } + } + messages = append(messages, StreamMessage{ - ID: msg.ID, - Values: msg.Values, + ID: msgID, + Values: values, }) } @@ -249,42 +277,30 @@ func ClaimPendingMessage(ctx context.Context, streamKey, groupName, consumerName } // SetSessionLastActive 设置用户最后活跃时间 -// 用于控制是否发送追问:用户回复后更新活跃时间,避免重复追问 -// 过期时间:2小时,超过2小时未活跃的记录会自动删除 -// 参数: -// - userId: 用户ID -// -// 返回:error 设置失败时返回错误 +// 使用 gredis SetEX 方法 func SetSessionLastActive(ctx context.Context, userId string) error { key := SessionLastActiveKeyPrefix + userId + ":last_active" timestamp := time.Now().Unix() - // 设置过期时间为 2 小时 - return RedisClient.Set(ctx, key, timestamp, 2*time.Hour).Err() + // SETEX key 7200 value (7200秒 = 2小时) + _, err := GRedisClient.Do(ctx, "SETEX", key, 7200, timestamp) + return err } // GetSessionLastActive 获取用户最后活跃时间 -// 参数: -// - userId: 用户ID -// -// 返回: -// - int64: Unix时间戳,未找到返回0 -// - error: 操作失败时返回错误 +// 使用 gredis Get 方法 func GetSessionLastActive(ctx context.Context, userId string) (int64, error) { key := SessionLastActiveKeyPrefix + userId + ":last_active" - result, err := RedisClient.Get(ctx, key).Result() - if err == redis.Nil { - return 0, nil // 未找到返回 0 - } + result, err := GRedisClient.Get(ctx, key) if err != nil { return 0, err } - // 将字符串转换为 int64 - timestamp, err := strconv.ParseInt(result, 10, 64) - if err != nil { - return 0, err + if result.IsEmpty() { + return 0, nil } + + timestamp := gconv.Int64(result.Val()) return timestamp, nil } @@ -312,35 +328,27 @@ func IsUserActive(ctx context.Context, userId string, seconds int64) (bool, erro } // SetSessionCache 缓存用户的 RAGFlow Session ID -// 避免每次请求都创建新 Session,提高性能 -// 过期时间:7天,超过7天未使用的Session会自动清理 -// 参数: -// - userId: 用户ID -// - sessionId: RAGFlow返回的Session ID -// -// 返回:error 设置失败时返回错误 +// 使用 gredis SetEX 方法 func SetSessionCache(ctx context.Context, userId, sessionId string) error { key := SessionLastActiveKeyPrefix + userId + ":session_id" - return RedisClient.Set(ctx, key, sessionId, 7*24*time.Hour).Err() + + // SETEX key 604800 value (604800秒 = 7天) + _, err := GRedisClient.Do(ctx, "SETEX", key, 604800, sessionId) + return err } // GetSessionCache 获取缓存的 RAGFlow Session ID -// 如果缓存中存在则直接使用,不存在则需要创建新Session -// 参数: -// - userId: 用户ID -// -// 返回: -// - string: Session ID,未找到返回空字符串 -// - error: 操作失败时返回错误 +// 使用 gredis Get 方法 func GetSessionCache(ctx context.Context, userId string) (string, error) { key := SessionLastActiveKeyPrefix + userId + ":session_id" - result, err := RedisClient.Get(ctx, key).Result() - if err == redis.Nil { - return "", nil // 未找到返回空字符串 - } + result, err := GRedisClient.Get(ctx, key) if err != nil { return "", err } - return result, nil + if result.IsEmpty() { + return "", nil + } + + return result.String(), nil }