Files
common/redis/redis.go
2026-03-12 08:51:05 +08:00

450 lines
12 KiB
Go
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
package redis
import (
"context"
"strings"
"sync"
"github.com/gogf/gf/v2/database/gredis"
"github.com/gogf/gf/v2/frame/g"
"github.com/gogf/gf/v2/os/glog"
"github.com/gogf/gf/v2/os/gtime"
"github.com/gogf/gf/v2/util/gconv"
)
var (
// redisClient 内部使用的 Redis 客户端(单例模式)
redisClient *gredis.Redis
redisOnce sync.Once
)
// getClient 获取 Redis 客户端(延迟初始化)
func getClient() *gredis.Redis {
redisOnce.Do(func() {
redisClient = g.Redis()
})
return redisClient
}
// GetRedisClient 获取 Redis 客户端(供外部使用)
func GetRedisClient() *gredis.Redis {
return getClient()
}
// RedisClient 导出的 Redis 客户端(供 mongo.go 使用,兼容旧代码)
var RedisClient = getClient()
// Stream 和消费者组常量
const (
// RAGFlow 请求 Stream Key
RAGFlowRequestStreamKey = "ragflow:request:stream"
// RAGFlow 响应 Stream Key
RAGFlowResponseStreamKey = "ragflow:response:stream"
// RAGFlow 请求消费者组名称
RAGFlowRequestConsumerGroup = "ragflow:request:consumer:group"
// RAGFlow 响应消费者组名称
RAGFlowResponseConsumerGroup = "ragflow:response:consumer:group"
// RAGFlow 消费者组名称(兼容旧代码)
RAGFlowConsumerGroup = "ragflow:consumer:group"
// 会话最后活跃时间 Key 前缀
SessionLastActiveKeyPrefix = "ragflow:session:"
)
// StreamMessage Redis Stream 消息结构
type StreamMessage struct {
ID string // 消息ID自动生成
Values map[string]interface{} // 消息内容
}
// InitStreamGroup 初始化 Stream 和消费者组
// 使用 gredis Do() 方法执行 XGROUP CREATE 命令
func InitStreamGroup(ctx context.Context, streamKey, groupName string) error {
// XGROUP CREATE streamKey groupName 0 MKSTREAM
_, err := redisClient.Do(ctx, "XGROUP", "CREATE", streamKey, groupName, "0", "MKSTREAM")
if err != nil {
// 如果组已存在,忽略错误
errStr := err.Error()
if strings.Contains(errStr, "BUSYGROUP") || strings.Contains(errStr, "already exists") {
return nil
}
return err
}
return nil
}
// AddToStream 将消息添加到 Stream
// 使用 gredis Do() 方法执行 XADD 命令
// msg 可以是结构体或 map内部自动转换
func AddToStream(ctx context.Context, streamKey string, msg interface{}) (messageID string, err error) {
// 将结构体转换为 map
values := gconv.Map(msg)
// XADD streamKey * field1 value1 field2 value2 ...
args := make([]interface{}, 0, len(values)*2+2)
args = append(args, streamKey, "*") // "*" 自动生成ID
for key, val := range values {
args = append(args, key, val)
}
result, err := redisClient.Do(ctx, "XADD", args...)
if err != nil {
return
}
messageID = result.String()
return
}
// ReadFromStream 从 Stream 读取消息(消费者组模式)
// 使用 gredis Do() 方法执行 XREADGROUP 命令
func ReadFromStream(ctx context.Context, streamKey, groupName, consumerName string, count int64, blockMs int64) ([]StreamMessage, error) {
glog.Debugf(ctx, "[DEBUG Redis] XREADGROUP GROUP %s %s COUNT %d BLOCK %d STREAMS %s >",
groupName, consumerName, count, blockMs, streamKey)
// XREADGROUP GROUP groupName consumerName COUNT count BLOCK blockMs STREAMS streamKey >
result, err := redisClient.Do(ctx,
"XREADGROUP", "GROUP", groupName, consumerName,
"COUNT", count,
"BLOCK", blockMs,
"STREAMS", streamKey, ">",
)
if err != nil {
glog.Errorf(ctx, "[DEBUG Redis] XREADGROUP 错误: %v", err)
return nil, err
}
glog.Debugf(ctx, "[DEBUG Redis] XREADGROUP 返回: %+v", result)
// 预分配容量,避免动态扩容
messages := make([]StreamMessage, 0, int(count))
if result == nil || result.IsEmpty() {
// 超时或没有数据
return messages, nil
}
// GoFrame gredis 返回格式: map[streamKey:[[msgID [field1 value1 field2 value2 ...]] ...]]
resultVal := result.Val()
// 尝试 map 格式GoFrame gredis 返回)
if streamsMap, ok := resultVal.(map[interface{}]interface{}); ok {
for _, streamMsgs := range streamsMap {
msgsArray, ok := streamMsgs.([]interface{})
if !ok {
continue
}
for _, msgData := range msgsArray {
msgArray, ok := msgData.([]interface{})
if !ok || len(msgArray) < 2 {
continue
}
msgID := gconv.String(msgArray[0])
fieldsArray, ok := msgArray[1].([]interface{})
if !ok {
continue
}
values := make(map[string]interface{}, len(fieldsArray)/2)
for i := 0; i < len(fieldsArray); i += 2 {
if i+1 < len(fieldsArray) {
key := gconv.String(fieldsArray[i])
values[key] = fieldsArray[i+1]
}
}
messages = append(messages, StreamMessage{
ID: msgID,
Values: values,
})
}
}
return messages, nil
}
// 尝试数组格式(标准 Redis 返回)
if streamsArray, ok := resultVal.([]interface{}); ok && len(streamsArray) > 0 {
for _, streamData := range streamsArray {
streamArray, ok := streamData.([]interface{})
if !ok || len(streamArray) < 2 {
continue
}
messagesArray, ok := streamArray[1].([]interface{})
if !ok {
continue
}
for _, msgData := range messagesArray {
msgArray, ok := msgData.([]interface{})
if !ok || len(msgArray) < 2 {
continue
}
msgID := gconv.String(msgArray[0])
fieldsArray, ok := msgArray[1].([]interface{})
if !ok {
continue
}
values := make(map[string]interface{}, len(fieldsArray)/2)
for i := 0; i < len(fieldsArray); i += 2 {
if i+1 < len(fieldsArray) {
key := gconv.String(fieldsArray[i])
values[key] = fieldsArray[i+1]
}
}
messages = append(messages, StreamMessage{
ID: msgID,
Values: values,
})
}
}
}
return messages, nil
}
// AckMessage 确认消息已处理
// 使用 gredis Do() 方法执行 XACK 命令
func AckMessage(ctx context.Context, streamKey, groupName string, messageIDs ...string) error {
// XACK streamKey groupName messageID1 messageID2 ...
// 预分配容量,避免动态扩容
args := make([]interface{}, 0, len(messageIDs)+2)
args = append(args, streamKey, groupName)
for _, id := range messageIDs {
args = append(args, id)
}
_, err := redisClient.Do(ctx, "XACK", args...)
return err
}
// GetStreamLength 获取 Stream 当前长度
// 使用 gredis Do() 方法执行 XLEN 命令
func GetStreamLength(ctx context.Context, streamKey string) (int64, error) {
// XLEN streamKey
result, err := redisClient.Do(ctx, "XLEN", streamKey)
if err != nil {
return 0, err
}
length := gconv.Int64(result)
return length, nil
}
// PendingMessage Pending 消息结构
type PendingMessage struct {
ID string // 消息ID
Consumer string // 消费者名称
Idle int64 // 空闲时间(毫秒)
RetryCount int64 // 重试次数
}
// GetPendingMessages 获取待处理消息
// 使用 gredis Do() 方法执行 XPENDING 命令
func GetPendingMessages(ctx context.Context, streamKey, groupName string, start, end string, count int64) ([]PendingMessage, error) {
// XPENDING streamKey groupName start end count
result, err := redisClient.Do(ctx, "XPENDING", streamKey, groupName, start, end, count)
if err != nil {
return nil, err
}
if result == nil {
return nil, nil
}
// 解析返回值:[[ID, consumer, idle, retryCount], ...]
pendingArray, ok := result.Val().([]interface{})
if !ok {
return nil, nil
}
messages := make([]PendingMessage, 0, len(pendingArray))
for _, item := range pendingArray {
itemArray, ok := item.([]interface{})
if !ok || len(itemArray) < 4 {
continue
}
messages = append(messages, PendingMessage{
ID: gconv.String(itemArray[0]),
Consumer: gconv.String(itemArray[1]),
Idle: gconv.Int64(itemArray[2]),
RetryCount: gconv.Int64(itemArray[3]),
})
}
return messages, nil
}
// ClaimPendingMessage 认领超时的 Pending 消息
// 使用 gredis Do() 方法执行 XCLAIM 命令
func ClaimPendingMessage(ctx context.Context, streamKey, groupName, consumerName string, minIdleTime int64, messageIDs ...string) ([]StreamMessage, error) {
// XCLAIM streamKey groupName consumerName minIdleTime messageID1 messageID2 ...
args := []interface{}{streamKey, groupName, consumerName, minIdleTime}
for _, id := range messageIDs {
args = append(args, id)
}
result, err := redisClient.Do(ctx, "XCLAIM", args...)
if err != nil {
return nil, err
}
if result == nil {
return nil, nil
}
// 解析返回值:类似 XREADGROUP
messagesArray, ok := result.Val().([]interface{})
if !ok {
return nil, nil
}
// 预分配容量,避免动态扩容
messages := make([]StreamMessage, 0, len(messagesArray))
for _, msgData := range messagesArray {
msgArray, ok := msgData.([]interface{})
if !ok || len(msgArray) < 2 {
continue
}
msgID := gconv.String(msgArray[0])
fieldsArray, ok := msgArray[1].([]interface{})
if !ok {
continue
}
// 预分配 map 容量 ,避免动态扩容
values := make(map[string]interface{}, len(fieldsArray)/2)
for i := 0; i < len(fieldsArray); i += 2 {
if i+1 < len(fieldsArray) {
key := gconv.String(fieldsArray[i])
values[key] = fieldsArray[i+1]
}
}
messages = append(messages, StreamMessage{
ID: msgID,
Values: values,
})
}
return messages, nil
}
// SetSessionLastActive 设置用户最后活跃时间
// 使用 gredis SetEX 方法
func SetSessionLastActive(ctx context.Context, userId string) error {
key := SessionLastActiveKeyPrefix + userId + ":last_active"
timestamp := gtime.Now().Timestamp()
// SETEX key 7200 value (7200秒 = 2小时)
_, err := redisClient.Do(ctx, "SETEX", key, 7200, timestamp)
return err
}
// GetSessionLastActive 获取用户最后活跃时间
// 使用 gredis Get 方法
func GetSessionLastActive(ctx context.Context, userId string) (int64, error) {
key := SessionLastActiveKeyPrefix + userId + ":last_active"
result, err := redisClient.Get(ctx, key)
if err != nil {
return 0, err
}
if result.IsEmpty() {
return 0, nil
}
timestamp := gconv.Int64(result.Val())
return timestamp, nil
}
// IsUserActive 检查用户是否在指定时间范围内活跃过
// 用于追问逻辑:如果用户最近活跃过,则不发送追问消息
// 参数:
// - userId: 用户ID
// - seconds: 时间范围例如传入300表示检查5分钟内是否活跃
//
// 返回:
// - bool: true表示用户在指定时间内活跃过
// - error: 操作失败时返回错误
func IsUserActive(ctx context.Context, userId string, seconds int64) (bool, error) {
lastActive, err := GetSessionLastActive(ctx, userId)
if err != nil {
return false, err
}
if lastActive == 0 {
return false, nil // 未找到记录,视为不活跃
}
now := gtime.Now().Timestamp()
return (now - lastActive) < seconds, nil
}
// SetSessionCache 缓存用户的 RAGFlow Session ID
// 使用 gredis SetEX 方法
func SetSessionCache(ctx context.Context, userId, sessionId string) error {
key := SessionLastActiveKeyPrefix + userId + ":session_id"
// SETEX key 604800 value (604800秒 = 7天)
_, err := redisClient.Do(ctx, "SETEX", key, 604800, sessionId)
return err
}
// 限流相关常量
const (
// RateLimitKeyPrefix 限流计数器 Key 前缀
RateLimitKeyPrefix = "ragflow:ratelimit:"
)
// IncrRateLimit 增加限流计数器,返回当前计数
// windowSeconds: 时间窗口(秒)
func IncrRateLimit(ctx context.Context, key string, windowSeconds int64) (count int64, err error) {
fullKey := RateLimitKeyPrefix + key
result, err := redisClient.Do(ctx, "INCR", fullKey)
if err != nil {
return
}
count = result.Int64()
// 首次设置过期时间
if count == 1 {
redisClient.Do(ctx, "EXPIRE", fullKey, windowSeconds)
}
return
}
// GetRateLimit 获取当前限流计数
func GetRateLimit(ctx context.Context, key string) (count int64, err error) {
fullKey := RateLimitKeyPrefix + key
result, err := redisClient.Get(ctx, fullKey)
if err != nil {
return
}
if result.IsEmpty() {
return 0, nil
}
count = result.Int64()
return
}
// GetSessionCache 获取缓存的 RAGFlow Session ID
func GetSessionCache(ctx context.Context, userId string) (string, error) {
key := SessionLastActiveKeyPrefix + userId + ":session_id"
result, err := redisClient.Get(ctx, key)
if err != nil {
return "", err
}
if result.IsEmpty() {
return "", nil
}
return result.String(), nil
}
// DelSessionCache 删除缓存的 RAGFlow Session ID归档时调用
func DelSessionCache(ctx context.Context, userId string) error {
key := SessionLastActiveKeyPrefix + userId + ":session_id"
_, err := redisClient.Del(ctx, key)
return err
}