Files
common/redis/redis.go

355 lines
9.4 KiB
Go
Raw Normal View History

2025-12-03 13:58:37 +08:00
package redis
import (
2025-12-04 17:39:31 +08:00
"context"
2025-12-05 12:18:04 +08:00
"strings"
2025-12-04 17:39:31 +08:00
2025-12-05 12:18:04 +08:00
"github.com/gogf/gf/v2/database/gredis"
2025-12-03 13:58:37 +08:00
"github.com/gogf/gf/v2/frame/g"
2025-12-06 09:36:10 +08:00
"github.com/gogf/gf/v2/os/gtime"
2025-12-05 12:18:04 +08:00
"github.com/gogf/gf/v2/util/gconv"
2025-12-03 13:58:37 +08:00
)
2025-12-05 12:18:04 +08:00
// GRedisClient GoFrame gredis 客户端,统一使用
var GRedisClient *gredis.Redis
// RedisClient GRedisClient 的别名,保持向后兼容
var RedisClient *gredis.Redis
2025-12-03 13:58:37 +08:00
func init() {
2025-12-05 12:18:04 +08:00
// 初始化 GoFrame gredis 客户端
GRedisClient = g.Redis()
RedisClient = GRedisClient // 别名指向同一个客户端
2025-12-03 13:58:37 +08:00
}
2025-12-04 17:39:31 +08:00
2025-12-05 11:44:07 +08:00
// Stream 和消费者组常量
2025-12-04 17:39:31 +08:00
const (
2025-12-05 11:44:07 +08:00
// RAGFlow 请求 Stream Key
RAGFlowRequestStreamKey = "ragflow:request:stream"
// RAGFlow 消费者组名称
RAGFlowConsumerGroup = "ragflow:consumer:group"
2025-12-04 17:39:31 +08:00
// 会话最后活跃时间 Key 前缀
SessionLastActiveKeyPrefix = "ragflow:session:"
)
2025-12-05 11:44:07 +08:00
// StreamMessage Redis Stream 消息结构
type StreamMessage struct {
ID string // 消息ID自动生成
Values map[string]interface{} // 消息内容
}
// InitStreamGroup 初始化 Stream 和消费者组
2025-12-05 12:18:04 +08:00
// 使用 gredis Do() 方法执行 XGROUP CREATE 命令
2025-12-05 11:44:07 +08:00
func InitStreamGroup(ctx context.Context, streamKey, groupName string) error {
2025-12-05 12:18:04 +08:00
// XGROUP CREATE streamKey groupName 0 MKSTREAM
_, err := GRedisClient.Do(ctx, "XGROUP", "CREATE", streamKey, groupName, "0", "MKSTREAM")
2025-12-05 11:44:07 +08:00
if err != nil {
2025-12-05 12:18:04 +08:00
// 如果组已存在,忽略错误
errStr := err.Error()
if strings.Contains(errStr, "BUSYGROUP") || strings.Contains(errStr, "already exists") {
2025-12-05 11:44:07 +08:00
return nil
}
return err
}
return nil
2025-12-04 17:39:31 +08:00
}
2025-12-05 11:44:07 +08:00
// AddToStream 将消息添加到 Stream
2025-12-05 12:18:04 +08:00
// 使用 gredis Do() 方法执行 XADD 命令
2025-12-05 11:44:07 +08:00
func AddToStream(ctx context.Context, streamKey string, values map[string]interface{}) (string, error) {
2025-12-05 12:18:04 +08:00
// XADD streamKey * field1 value1 field2 value2 ...
args := []interface{}{streamKey, "*"} // "*" 自动生成ID
for key, val := range values {
args = append(args, key, val)
}
result, err := GRedisClient.Do(ctx, "XADD", args...)
2025-12-04 17:39:31 +08:00
if err != nil {
return "", err
}
2025-12-05 12:18:04 +08:00
// 返回消息ID
messageID := result.String()
2025-12-05 11:44:07 +08:00
return messageID, nil
}
2025-12-04 17:39:31 +08:00
2025-12-05 11:44:07 +08:00
// ReadFromStream 从 Stream 读取消息(消费者组模式)
2025-12-05 12:18:04 +08:00
// 使用 gredis Do() 方法执行 XREADGROUP 命令
2025-12-05 11:44:07 +08:00
func ReadFromStream(ctx context.Context, streamKey, groupName, consumerName string, count int64, blockMs int64) ([]StreamMessage, error) {
2025-12-05 12:18:04 +08:00
// XREADGROUP GROUP groupName consumerName COUNT count BLOCK blockMs STREAMS streamKey >
result, err := GRedisClient.Do(ctx,
"XREADGROUP", "GROUP", groupName, consumerName,
"COUNT", count,
"BLOCK", blockMs,
"STREAMS", streamKey, ">",
)
2025-12-05 11:44:07 +08:00
if err != nil {
return nil, err
2025-12-04 17:39:31 +08:00
}
2025-12-05 12:18:04 +08:00
// 解析返回值
// 格式: [[streamKey, [[msgID, [field1, value1, field2, value2, ...]], ...]]]
messages := []StreamMessage{}
if result == nil {
// 超时或没有数据
return messages, nil
}
// 类型断言result.Val() 返回 interface{}
streamsArray, ok := result.Val().([]interface{})
if !ok || len(streamsArray) == 0 {
return messages, nil
}
// 遍历每个 stream
for _, streamData := range streamsArray {
streamArray, ok := streamData.([]interface{})
if !ok || len(streamArray) < 2 {
continue
}
// streamArray[0] 是 streamKey, streamArray[1] 是消息数组
messagesArray, ok := streamArray[1].([]interface{})
if !ok {
continue
}
// 解析每条消息
for _, msgData := range messagesArray {
msgArray, ok := msgData.([]interface{})
if !ok || len(msgArray) < 2 {
continue
}
// msgArray[0] 是 ID, msgArray[1] 是字段数组
msgID := gconv.String(msgArray[0])
fieldsArray, ok := msgArray[1].([]interface{})
if !ok {
continue
}
// 解析字段为 map
values := make(map[string]interface{})
for i := 0; i < len(fieldsArray); i += 2 {
if i+1 < len(fieldsArray) {
key := gconv.String(fieldsArray[i])
val := fieldsArray[i+1]
values[key] = val
}
}
2025-12-05 11:44:07 +08:00
messages = append(messages, StreamMessage{
2025-12-05 12:18:04 +08:00
ID: msgID,
Values: values,
2025-12-05 11:44:07 +08:00
})
}
2025-12-04 17:39:31 +08:00
}
2025-12-05 11:44:07 +08:00
return messages, nil
}
// AckMessage 确认消息已处理
2025-12-05 12:18:04 +08:00
// 使用 gredis Do() 方法执行 XACK 命令
2025-12-05 11:44:07 +08:00
func AckMessage(ctx context.Context, streamKey, groupName string, messageIDs ...string) error {
2025-12-05 12:18:04 +08:00
// XACK streamKey groupName messageID1 messageID2 ...
args := []interface{}{streamKey, groupName}
for _, id := range messageIDs {
args = append(args, id)
2025-12-05 11:44:07 +08:00
}
2025-12-05 12:18:04 +08:00
_, err := GRedisClient.Do(ctx, "XACK", args...)
return err
2025-12-04 17:39:31 +08:00
}
2025-12-05 11:44:07 +08:00
// GetStreamLength 获取 Stream 当前长度
2025-12-05 12:18:04 +08:00
// 使用 gredis Do() 方法执行 XLEN 命令
2025-12-05 11:44:07 +08:00
func GetStreamLength(ctx context.Context, streamKey string) (int64, error) {
2025-12-05 12:18:04 +08:00
// XLEN streamKey
result, err := GRedisClient.Do(ctx, "XLEN", streamKey)
2025-12-04 17:39:31 +08:00
if err != nil {
return 0, err
}
2025-12-05 12:18:04 +08:00
length := gconv.Int64(result)
2025-12-05 11:44:07 +08:00
return length, nil
}
2025-12-05 12:18:04 +08:00
// PendingMessage Pending 消息结构
type PendingMessage struct {
ID string // 消息ID
Consumer string // 消费者名称
Idle int64 // 空闲时间(毫秒)
RetryCount int64 // 重试次数
}
// GetPendingMessages 获取待处理消息
// 使用 gredis Do() 方法执行 XPENDING 命令
func GetPendingMessages(ctx context.Context, streamKey, groupName string, start, end string, count int64) ([]PendingMessage, error) {
// XPENDING streamKey groupName start end count
result, err := GRedisClient.Do(ctx, "XPENDING", streamKey, groupName, start, end, count)
2025-12-05 11:44:07 +08:00
if err != nil {
return nil, err
}
2025-12-05 12:18:04 +08:00
if result == nil {
return []PendingMessage{}, nil
}
// 解析返回值:[[ID, consumer, idle, retryCount], ...]
pendingArray, ok := result.Val().([]interface{})
if !ok {
return []PendingMessage{}, nil
}
var messages []PendingMessage
for _, item := range pendingArray {
itemArray, ok := item.([]interface{})
if !ok || len(itemArray) < 4 {
continue
}
messages = append(messages, PendingMessage{
ID: gconv.String(itemArray[0]),
Consumer: gconv.String(itemArray[1]),
Idle: gconv.Int64(itemArray[2]),
RetryCount: gconv.Int64(itemArray[3]),
})
}
return messages, nil
2025-12-05 11:44:07 +08:00
}
// ClaimPendingMessage 认领超时的 Pending 消息
2025-12-05 12:18:04 +08:00
// 使用 gredis Do() 方法执行 XCLAIM 命令
2025-12-05 11:44:07 +08:00
func ClaimPendingMessage(ctx context.Context, streamKey, groupName, consumerName string, minIdleTime int64, messageIDs ...string) ([]StreamMessage, error) {
2025-12-05 12:18:04 +08:00
// XCLAIM streamKey groupName consumerName minIdleTime messageID1 messageID2 ...
args := []interface{}{streamKey, groupName, consumerName, minIdleTime}
for _, id := range messageIDs {
args = append(args, id)
}
result, err := GRedisClient.Do(ctx, "XCLAIM", args...)
2025-12-05 11:44:07 +08:00
if err != nil {
return nil, err
}
2025-12-05 12:18:04 +08:00
if result == nil {
return []StreamMessage{}, nil
}
// 解析返回值:类似 XREADGROUP
messagesArray, ok := result.Val().([]interface{})
if !ok {
return []StreamMessage{}, nil
}
2025-12-05 11:44:07 +08:00
var messages []StreamMessage
2025-12-05 12:18:04 +08:00
for _, msgData := range messagesArray {
msgArray, ok := msgData.([]interface{})
if !ok || len(msgArray) < 2 {
continue
}
msgID := gconv.String(msgArray[0])
fieldsArray, ok := msgArray[1].([]interface{})
if !ok {
continue
}
values := make(map[string]interface{})
for i := 0; i < len(fieldsArray); i += 2 {
if i+1 < len(fieldsArray) {
key := gconv.String(fieldsArray[i])
val := fieldsArray[i+1]
values[key] = val
}
}
2025-12-05 11:44:07 +08:00
messages = append(messages, StreamMessage{
2025-12-05 12:18:04 +08:00
ID: msgID,
Values: values,
2025-12-05 11:44:07 +08:00
})
}
return messages, nil
2025-12-04 17:39:31 +08:00
}
// SetSessionLastActive 设置用户最后活跃时间
2025-12-05 12:18:04 +08:00
// 使用 gredis SetEX 方法
2025-12-04 17:39:31 +08:00
func SetSessionLastActive(ctx context.Context, userId string) error {
key := SessionLastActiveKeyPrefix + userId + ":last_active"
2025-12-06 09:36:10 +08:00
timestamp := gtime.Now().Timestamp()
2025-12-04 17:39:31 +08:00
2025-12-05 12:18:04 +08:00
// SETEX key 7200 value (7200秒 = 2小时)
_, err := GRedisClient.Do(ctx, "SETEX", key, 7200, timestamp)
return err
2025-12-04 17:39:31 +08:00
}
// GetSessionLastActive 获取用户最后活跃时间
2025-12-05 12:18:04 +08:00
// 使用 gredis Get 方法
2025-12-04 17:39:31 +08:00
func GetSessionLastActive(ctx context.Context, userId string) (int64, error) {
key := SessionLastActiveKeyPrefix + userId + ":last_active"
2025-12-05 12:18:04 +08:00
result, err := GRedisClient.Get(ctx, key)
2025-12-04 17:39:31 +08:00
if err != nil {
return 0, err
}
2025-12-05 12:18:04 +08:00
if result.IsEmpty() {
return 0, nil
2025-12-04 17:39:31 +08:00
}
2025-12-05 12:18:04 +08:00
timestamp := gconv.Int64(result.Val())
2025-12-05 11:44:07 +08:00
return timestamp, nil
2025-12-04 17:39:31 +08:00
}
// IsUserActive 检查用户是否在指定时间范围内活跃过
// 用于追问逻辑:如果用户最近活跃过,则不发送追问消息
// 参数:
// - userId: 用户ID
// - seconds: 时间范围例如传入300表示检查5分钟内是否活跃
//
// 返回:
// - bool: true表示用户在指定时间内活跃过
// - error: 操作失败时返回错误
func IsUserActive(ctx context.Context, userId string, seconds int64) (bool, error) {
lastActive, err := GetSessionLastActive(ctx, userId)
if err != nil {
return false, err
}
if lastActive == 0 {
return false, nil // 未找到记录,视为不活跃
}
2025-12-06 09:36:10 +08:00
now := gtime.Now().Timestamp()
2025-12-04 17:39:31 +08:00
return (now - lastActive) < seconds, nil
}
// SetSessionCache 缓存用户的 RAGFlow Session ID
2025-12-05 12:18:04 +08:00
// 使用 gredis SetEX 方法
2025-12-04 17:39:31 +08:00
func SetSessionCache(ctx context.Context, userId, sessionId string) error {
key := SessionLastActiveKeyPrefix + userId + ":session_id"
2025-12-05 12:18:04 +08:00
// SETEX key 604800 value (604800秒 = 7天)
_, err := GRedisClient.Do(ctx, "SETEX", key, 604800, sessionId)
return err
2025-12-04 17:39:31 +08:00
}
// GetSessionCache 获取缓存的 RAGFlow Session ID
2025-12-05 12:18:04 +08:00
// 使用 gredis Get 方法
2025-12-04 17:39:31 +08:00
func GetSessionCache(ctx context.Context, userId string) (string, error) {
key := SessionLastActiveKeyPrefix + userId + ":session_id"
2025-12-05 12:18:04 +08:00
result, err := GRedisClient.Get(ctx, key)
2025-12-04 17:39:31 +08:00
if err != nil {
return "", err
}
2025-12-05 12:18:04 +08:00
if result.IsEmpty() {
return "", nil
}
return result.String(), nil
2025-12-04 17:39:31 +08:00
}