Files
common/redis/redis.go

688 lines
20 KiB
Go
Raw Normal View History

2025-12-03 13:58:37 +08:00
package redis
import (
2025-12-04 17:39:31 +08:00
"context"
"errors"
2025-12-05 12:18:04 +08:00
"strings"
2025-12-09 17:55:08 +08:00
"sync"
2025-12-26 17:12:10 +08:00
"time"
2025-12-04 17:39:31 +08:00
2025-12-09 17:55:08 +08:00
"github.com/gogf/gf/v2/database/gredis"
2025-12-03 13:58:37 +08:00
"github.com/gogf/gf/v2/frame/g"
"github.com/gogf/gf/v2/os/glog"
2025-12-06 09:36:10 +08:00
"github.com/gogf/gf/v2/os/gtime"
2025-12-05 12:18:04 +08:00
"github.com/gogf/gf/v2/util/gconv"
2025-12-03 13:58:37 +08:00
)
2025-12-09 17:55:08 +08:00
var (
// redisClient 内部使用的 Redis 客户端(单例模式)
redisClient *gredis.Redis
redisOnce sync.Once
)
// getClient 获取 Redis 客户端(延迟初始化)
func getClient() *gredis.Redis {
redisOnce.Do(func() {
redisClient = g.Redis()
})
return redisClient
}
// GetRedisClient 获取 Redis 客户端(供外部使用)
func GetRedisClient() *gredis.Redis {
return getClient()
}
2025-12-26 17:12:10 +08:00
// Lock 分布式锁
2025-12-26 17:35:52 +08:00
func Lock(ctx context.Context, key string, expireSeconds int64, fn func(ctx context.Context) error) (success bool, err error) {
2025-12-26 17:12:10 +08:00
limit := 3
LOOP:
if limit < 0 {
return false, errors.New("锁重试次数耗尽")
2025-12-26 17:12:10 +08:00
}
limit--
if val, err := RedisClient.Set(ctx, key, true, gredis.SetOption{
TTLOption: gredis.TTLOption{
EX: &expireSeconds,
},
NX: true,
}); err != nil {
return false, err
2025-12-26 17:12:10 +08:00
} else {
if val.Bool() {
defer func(RedisClient *gredis.Redis, ctx context.Context, key string) {
if _, err = RedisClient.Del(ctx, key); err != nil {
glog.Errorf(ctx, "RedisClient.Del error: %v", err)
}
}(RedisClient, ctx, key)
2025-12-26 17:35:52 +08:00
if err = fn(ctx); err != nil {
return false, err
}
return true, nil
2025-12-26 17:12:10 +08:00
} else {
time.Sleep(time.Second)
goto LOOP
}
}
}
2025-12-09 17:55:08 +08:00
// RedisClient 导出的 Redis 客户端(供 mongo.go 使用,兼容旧代码)
var RedisClient = getClient()
2025-12-04 17:39:31 +08:00
2025-12-05 11:44:07 +08:00
// Stream 和消费者组常量
2025-12-04 17:39:31 +08:00
const (
2025-12-05 11:44:07 +08:00
// RAGFlow 请求 Stream Key
RAGFlowRequestStreamKey = "ragflow:request:stream"
// RAGFlow 响应 Stream Key
RAGFlowResponseStreamKey = "ragflow:response:stream"
// RAGFlow 请求消费者组名称
RAGFlowRequestConsumerGroup = "ragflow:request:consumer:group"
// RAGFlow 响应消费者组名称
RAGFlowResponseConsumerGroup = "ragflow:response:consumer:group"
// RAGFlow 消费者组名称(兼容旧代码)
2025-12-05 11:44:07 +08:00
RAGFlowConsumerGroup = "ragflow:consumer:group"
2025-12-04 17:39:31 +08:00
// 会话最后活跃时间 Key 前缀
SessionLastActiveKeyPrefix = "ragflow:session:"
)
2025-12-05 11:44:07 +08:00
// StreamMessage Redis Stream 消息结构
type StreamMessage struct {
ID string // 消息ID自动生成
Values map[string]interface{} // 消息内容
}
// InitStreamGroup 初始化 Stream 和消费者组
2025-12-05 12:18:04 +08:00
// 使用 gredis Do() 方法执行 XGROUP CREATE 命令
2025-12-05 11:44:07 +08:00
func InitStreamGroup(ctx context.Context, streamKey, groupName string) error {
2025-12-05 12:18:04 +08:00
// XGROUP CREATE streamKey groupName 0 MKSTREAM
2025-12-09 09:20:44 +08:00
_, err := redisClient.Do(ctx, "XGROUP", "CREATE", streamKey, groupName, "0", "MKSTREAM")
2025-12-05 11:44:07 +08:00
if err != nil {
2025-12-05 12:18:04 +08:00
// 如果组已存在,忽略错误
errStr := err.Error()
if strings.Contains(errStr, "BUSYGROUP") || strings.Contains(errStr, "already exists") {
2025-12-05 11:44:07 +08:00
return nil
}
return err
}
return nil
2025-12-04 17:39:31 +08:00
}
2025-12-05 11:44:07 +08:00
// AddToStream 将消息添加到 Stream
2025-12-05 12:18:04 +08:00
// 使用 gredis Do() 方法执行 XADD 命令
2025-12-09 09:20:44 +08:00
// msg 可以是结构体或 map内部自动转换
func AddToStream(ctx context.Context, streamKey string, msg interface{}) (messageID string, err error) {
// 将结构体转换为 map
values := gconv.Map(msg)
2025-12-05 12:18:04 +08:00
// XADD streamKey * field1 value1 field2 value2 ...
2025-12-09 09:20:44 +08:00
args := make([]interface{}, 0, len(values)*2+2)
args = append(args, streamKey, "*") // "*" 自动生成ID
2025-12-05 12:18:04 +08:00
for key, val := range values {
args = append(args, key, val)
}
2025-12-09 09:20:44 +08:00
result, err := redisClient.Do(ctx, "XADD", args...)
2025-12-04 17:39:31 +08:00
if err != nil {
2025-12-09 09:20:44 +08:00
return
2025-12-04 17:39:31 +08:00
}
2025-12-05 12:18:04 +08:00
2025-12-09 09:20:44 +08:00
messageID = result.String()
return
2025-12-05 11:44:07 +08:00
}
2025-12-04 17:39:31 +08:00
// CreateConsumerGroup 创建消费者组(如果不存在)
// XGROUP CREATE streamKey groupName $ MKSTREAM
func CreateConsumerGroup(ctx context.Context, streamKey, groupName string) error {
_, err := redisClient.Do(ctx, "XGROUP", "CREATE", streamKey, groupName, "$", "MKSTREAM")
return err
}
2025-12-05 11:44:07 +08:00
// ReadFromStream 从 Stream 读取消息(消费者组模式)
2025-12-05 12:18:04 +08:00
// 使用 gredis Do() 方法执行 XREADGROUP 命令
2025-12-05 11:44:07 +08:00
func ReadFromStream(ctx context.Context, streamKey, groupName, consumerName string, count int64, blockMs int64) ([]StreamMessage, error) {
glog.Debugf(ctx, "[DEBUG Redis] XREADGROUP GROUP %s %s COUNT %d BLOCK %d STREAMS %s >",
groupName, consumerName, count, blockMs, streamKey)
2025-12-26 15:13:07 +08:00
// 检查是否需要记录trace避免轮询产生大量trace
execCtx := ctx
if !g.Cfg().MustGet(ctx, "jaeger.traceStream", true).Bool() {
// 不记录trace使用background context不继承span
execCtx = context.Background()
}
2025-12-05 12:18:04 +08:00
// XREADGROUP GROUP groupName consumerName COUNT count BLOCK blockMs STREAMS streamKey >
2025-12-26 15:13:07 +08:00
result, err := redisClient.Do(execCtx,
2025-12-05 12:18:04 +08:00
"XREADGROUP", "GROUP", groupName, consumerName,
"COUNT", count,
"BLOCK", blockMs,
"STREAMS", streamKey, ">",
)
2025-12-05 11:44:07 +08:00
if err != nil {
glog.Errorf(ctx, "[DEBUG Redis] XREADGROUP 错误: %v", err)
2025-12-05 11:44:07 +08:00
return nil, err
2025-12-04 17:39:31 +08:00
}
glog.Debugf(ctx, "[DEBUG Redis] XREADGROUP 返回: %+v", result)
// 预分配容量,避免动态扩容
messages := make([]StreamMessage, 0, int(count))
2025-12-05 12:18:04 +08:00
if result == nil || result.IsEmpty() {
2025-12-05 12:18:04 +08:00
// 超时或没有数据
return messages, nil
}
// GoFrame gredis 返回格式: map[streamKey:[[msgID [field1 value1 field2 value2 ...]] ...]]
resultVal := result.Val()
2025-12-05 12:18:04 +08:00
// 尝试 map 格式GoFrame gredis 返回)
if streamsMap, ok := resultVal.(map[interface{}]interface{}); ok {
for _, streamMsgs := range streamsMap {
msgsArray, ok := streamMsgs.([]interface{})
if !ok {
continue
}
for _, msgData := range msgsArray {
msgArray, ok := msgData.([]interface{})
if !ok || len(msgArray) < 2 {
continue
}
msgID := gconv.String(msgArray[0])
fieldsArray, ok := msgArray[1].([]interface{})
if !ok {
continue
}
values := make(map[string]interface{}, len(fieldsArray)/2)
for i := 0; i < len(fieldsArray); i += 2 {
if i+1 < len(fieldsArray) {
key := gconv.String(fieldsArray[i])
values[key] = fieldsArray[i+1]
}
}
messages = append(messages, StreamMessage{
ID: msgID,
Values: values,
})
}
2025-12-05 12:18:04 +08:00
}
return messages, nil
}
2025-12-05 12:18:04 +08:00
// 尝试数组格式(标准 Redis 返回)
if streamsArray, ok := resultVal.([]interface{}); ok && len(streamsArray) > 0 {
for _, streamData := range streamsArray {
streamArray, ok := streamData.([]interface{})
if !ok || len(streamArray) < 2 {
2025-12-05 12:18:04 +08:00
continue
}
messagesArray, ok := streamArray[1].([]interface{})
2025-12-05 12:18:04 +08:00
if !ok {
continue
}
for _, msgData := range messagesArray {
msgArray, ok := msgData.([]interface{})
if !ok || len(msgArray) < 2 {
continue
2025-12-05 12:18:04 +08:00
}
msgID := gconv.String(msgArray[0])
fieldsArray, ok := msgArray[1].([]interface{})
if !ok {
continue
}
values := make(map[string]interface{}, len(fieldsArray)/2)
for i := 0; i < len(fieldsArray); i += 2 {
if i+1 < len(fieldsArray) {
key := gconv.String(fieldsArray[i])
values[key] = fieldsArray[i+1]
}
}
messages = append(messages, StreamMessage{
ID: msgID,
Values: values,
})
2025-12-05 12:18:04 +08:00
}
2025-12-05 11:44:07 +08:00
}
2025-12-04 17:39:31 +08:00
}
2025-12-05 11:44:07 +08:00
return messages, nil
}
// AckMessage 确认消息已处理
2025-12-05 12:18:04 +08:00
// 使用 gredis Do() 方法执行 XACK 命令
2025-12-05 11:44:07 +08:00
func AckMessage(ctx context.Context, streamKey, groupName string, messageIDs ...string) error {
2025-12-05 12:18:04 +08:00
// XACK streamKey groupName messageID1 messageID2 ...
// 预分配容量,避免动态扩容
args := make([]interface{}, 0, len(messageIDs)+2)
args = append(args, streamKey, groupName)
2025-12-05 12:18:04 +08:00
for _, id := range messageIDs {
args = append(args, id)
2025-12-05 11:44:07 +08:00
}
2025-12-05 12:18:04 +08:00
2025-12-09 09:20:44 +08:00
_, err := redisClient.Do(ctx, "XACK", args...)
2025-12-05 12:18:04 +08:00
return err
2025-12-04 17:39:31 +08:00
}
2025-12-05 11:44:07 +08:00
// GetStreamLength 获取 Stream 当前长度
2025-12-05 12:18:04 +08:00
// 使用 gredis Do() 方法执行 XLEN 命令
2025-12-05 11:44:07 +08:00
func GetStreamLength(ctx context.Context, streamKey string) (int64, error) {
2025-12-05 12:18:04 +08:00
// XLEN streamKey
2025-12-09 09:20:44 +08:00
result, err := redisClient.Do(ctx, "XLEN", streamKey)
2025-12-04 17:39:31 +08:00
if err != nil {
return 0, err
}
2025-12-05 12:18:04 +08:00
length := gconv.Int64(result)
2025-12-05 11:44:07 +08:00
return length, nil
}
2025-12-05 12:18:04 +08:00
// PendingMessage Pending 消息结构
type PendingMessage struct {
ID string // 消息ID
Consumer string // 消费者名称
Idle int64 // 空闲时间(毫秒)
RetryCount int64 // 重试次数
}
// GetPendingMessages 获取待处理消息
// 使用 gredis Do() 方法执行 XPENDING 命令
func GetPendingMessages(ctx context.Context, streamKey, groupName string, start, end string, count int64) ([]PendingMessage, error) {
// XPENDING streamKey groupName start end count
2025-12-09 09:20:44 +08:00
result, err := redisClient.Do(ctx, "XPENDING", streamKey, groupName, start, end, count)
2025-12-05 11:44:07 +08:00
if err != nil {
return nil, err
}
2025-12-05 12:18:04 +08:00
if result == nil {
return nil, nil
2025-12-05 12:18:04 +08:00
}
// 解析返回值:[[ID, consumer, idle, retryCount], ...]
pendingArray, ok := result.Val().([]interface{})
if !ok {
return nil, nil
2025-12-05 12:18:04 +08:00
}
messages := make([]PendingMessage, 0, len(pendingArray))
2025-12-05 12:18:04 +08:00
for _, item := range pendingArray {
itemArray, ok := item.([]interface{})
if !ok || len(itemArray) < 4 {
continue
}
messages = append(messages, PendingMessage{
ID: gconv.String(itemArray[0]),
Consumer: gconv.String(itemArray[1]),
Idle: gconv.Int64(itemArray[2]),
RetryCount: gconv.Int64(itemArray[3]),
})
}
return messages, nil
2025-12-05 11:44:07 +08:00
}
// ClaimPendingMessage 认领超时的 Pending 消息
2025-12-05 12:18:04 +08:00
// 使用 gredis Do() 方法执行 XCLAIM 命令
2025-12-05 11:44:07 +08:00
func ClaimPendingMessage(ctx context.Context, streamKey, groupName, consumerName string, minIdleTime int64, messageIDs ...string) ([]StreamMessage, error) {
2025-12-05 12:18:04 +08:00
// XCLAIM streamKey groupName consumerName minIdleTime messageID1 messageID2 ...
args := []interface{}{streamKey, groupName, consumerName, minIdleTime}
for _, id := range messageIDs {
args = append(args, id)
}
2025-12-09 09:20:44 +08:00
result, err := redisClient.Do(ctx, "XCLAIM", args...)
2025-12-05 11:44:07 +08:00
if err != nil {
return nil, err
}
2025-12-05 12:18:04 +08:00
if result == nil {
return nil, nil
2025-12-05 12:18:04 +08:00
}
// 解析返回值:类似 XREADGROUP
messagesArray, ok := result.Val().([]interface{})
if !ok {
return nil, nil
2025-12-05 12:18:04 +08:00
}
// 预分配容量,避免动态扩容
messages := make([]StreamMessage, 0, len(messagesArray))
2025-12-05 12:18:04 +08:00
for _, msgData := range messagesArray {
msgArray, ok := msgData.([]interface{})
if !ok || len(msgArray) < 2 {
continue
}
msgID := gconv.String(msgArray[0])
fieldsArray, ok := msgArray[1].([]interface{})
if !ok {
continue
}
// 预分配 map 容量 ,避免动态扩容
values := make(map[string]interface{}, len(fieldsArray)/2)
2025-12-05 12:18:04 +08:00
for i := 0; i < len(fieldsArray); i += 2 {
if i+1 < len(fieldsArray) {
key := gconv.String(fieldsArray[i])
values[key] = fieldsArray[i+1]
2025-12-05 12:18:04 +08:00
}
}
2025-12-05 11:44:07 +08:00
messages = append(messages, StreamMessage{
2025-12-05 12:18:04 +08:00
ID: msgID,
Values: values,
2025-12-05 11:44:07 +08:00
})
}
return messages, nil
2025-12-04 17:39:31 +08:00
}
// SetSessionLastActive 设置用户最后活跃时间
2025-12-05 12:18:04 +08:00
// 使用 gredis SetEX 方法
2025-12-04 17:39:31 +08:00
func SetSessionLastActive(ctx context.Context, userId string) error {
key := SessionLastActiveKeyPrefix + userId + ":last_active"
2025-12-06 09:36:10 +08:00
timestamp := gtime.Now().Timestamp()
2025-12-04 17:39:31 +08:00
2025-12-05 12:18:04 +08:00
// SETEX key 7200 value (7200秒 = 2小时)
2025-12-09 09:20:44 +08:00
_, err := redisClient.Do(ctx, "SETEX", key, 7200, timestamp)
2025-12-05 12:18:04 +08:00
return err
2025-12-04 17:39:31 +08:00
}
// GetSessionLastActive 获取用户最后活跃时间
2025-12-05 12:18:04 +08:00
// 使用 gredis Get 方法
2025-12-04 17:39:31 +08:00
func GetSessionLastActive(ctx context.Context, userId string) (int64, error) {
key := SessionLastActiveKeyPrefix + userId + ":last_active"
2025-12-09 09:20:44 +08:00
result, err := redisClient.Get(ctx, key)
2025-12-04 17:39:31 +08:00
if err != nil {
return 0, err
}
2025-12-05 12:18:04 +08:00
if result.IsEmpty() {
return 0, nil
2025-12-04 17:39:31 +08:00
}
2025-12-05 12:18:04 +08:00
timestamp := gconv.Int64(result.Val())
2025-12-05 11:44:07 +08:00
return timestamp, nil
2025-12-04 17:39:31 +08:00
}
// IsUserActive 检查用户是否在指定时间范围内活跃过
// 用于追问逻辑:如果用户最近活跃过,则不发送追问消息
// 参数:
// - userId: 用户ID
// - seconds: 时间范围例如传入300表示检查5分钟内是否活跃
//
// 返回:
// - bool: true表示用户在指定时间内活跃过
// - error: 操作失败时返回错误
func IsUserActive(ctx context.Context, userId string, seconds int64) (bool, error) {
lastActive, err := GetSessionLastActive(ctx, userId)
if err != nil {
return false, err
}
if lastActive == 0 {
return false, nil // 未找到记录,视为不活跃
}
2025-12-20 14:50:41 +08:00
// 检查时间差
2025-12-06 09:36:10 +08:00
now := gtime.Now().Timestamp()
2025-12-04 17:39:31 +08:00
return (now - lastActive) < seconds, nil
}
2025-12-20 14:50:41 +08:00
// ============== 限流相关 ==============
2025-12-04 17:39:31 +08:00
const (
// RateLimitKeyPrefix 限流计数器 Key 前缀
RateLimitKeyPrefix = "ragflow:ratelimit:"
)
// IncrRateLimit 增加限流计数器,返回当前计数
// windowSeconds: 时间窗口(秒)
func IncrRateLimit(ctx context.Context, key string, windowSeconds int64) (count int64, err error) {
fullKey := RateLimitKeyPrefix + key
2025-12-09 09:20:44 +08:00
result, err := redisClient.Do(ctx, "INCR", fullKey)
if err != nil {
return
}
count = result.Int64()
// 首次设置过期时间
if count == 1 {
2025-12-09 09:20:44 +08:00
redisClient.Do(ctx, "EXPIRE", fullKey, windowSeconds)
}
return
}
// GetRateLimit 获取当前限流计数
func GetRateLimit(ctx context.Context, key string) (count int64, err error) {
fullKey := RateLimitKeyPrefix + key
2025-12-09 09:20:44 +08:00
result, err := redisClient.Get(ctx, fullKey)
if err != nil {
return
}
if result.IsEmpty() {
return 0, nil
}
count = result.Int64()
return
}
// SetSessionCache 缓存 RAGFlow Session ID租户+用户隔离)
func SetSessionCache(ctx context.Context, tenantId, userId, sessionId string) error {
key := SessionLastActiveKeyPrefix + tenantId + ":" + userId + ":session_id"
2025-12-20 14:50:41 +08:00
// SETEX key 7200 value (7200秒 = 2小时与last_active保持一致)
_, err := redisClient.Do(ctx, "SETEX", key, 7200, sessionId)
return err
}
// GetSessionCache 获取缓存的 RAGFlow Session ID租户+用户隔离)
func GetSessionCache(ctx context.Context, tenantId, userId string) (string, error) {
key := SessionLastActiveKeyPrefix + tenantId + ":" + userId + ":session_id"
2025-12-09 09:20:44 +08:00
result, err := redisClient.Get(ctx, key)
2025-12-04 17:39:31 +08:00
if err != nil {
return "", err
}
2025-12-05 12:18:04 +08:00
if result.IsEmpty() {
return "", nil
}
return result.String(), nil
2025-12-04 17:39:31 +08:00
}
2025-12-09 09:20:44 +08:00
// DelSessionCache 删除缓存的 RAGFlow Session ID归档时调用租户+用户隔离)
func DelSessionCache(ctx context.Context, tenantId, userId string) error {
key := SessionLastActiveKeyPrefix + tenantId + ":" + userId + ":session_id"
2025-12-09 09:20:44 +08:00
_, err := redisClient.Del(ctx, key)
return err
}
// TryLock 尝试获取分布式锁(非阻塞)
// key: 锁的键名
// expireSeconds: 锁的过期时间(秒),防止死锁
// 返回 true 表示获取成功false 表示锁已被其他节点持有
func TryLock(ctx context.Context, key string, expireSeconds int) bool {
// SET key value NX EX expireSeconds
result, err := redisClient.Do(ctx, "SET", key, gtime.Now().String(), "NX", "EX", expireSeconds)
if err != nil {
glog.Errorf(ctx, "获取分布式锁失败: %v", err)
return false
}
return result.String() == "OK"
}
// Unlock 释放分布式锁
func Unlock(ctx context.Context, key string) {
if _, err := redisClient.Del(ctx, key); err != nil {
glog.Errorf(ctx, "释放分布式锁失败: %v", err)
}
}
// ============== 对话计数相关(用于卡片触发)==============
const (
2025-12-16 15:20:16 +08:00
// UserStateKeyPrefix 用户会话状态 Key 前缀(融合阶段+计数)
UserStateKeyPrefix = "ragflow:user:state:"
// UserStateExpireSeconds 用户状态过期时间5分钟
UserStateExpireSeconds = 300
)
2025-12-18 18:01:21 +08:00
// UserState 用户会话状态(阶段+对话计数+咨询方向,统一5分钟过期
2025-12-16 15:20:16 +08:00
type UserState struct {
2025-12-26 15:13:07 +08:00
Stage int `json:"stage"` // 当前阶段
Direction string `json:"direction"` // 咨询方向
Count int64 `json:"count"` // 对话计数v5.2卡片触发)
AccountName string `json:"accountName"` // 用户选择的方向对应的客服账号名称
2025-12-16 15:20:16 +08:00
}
// GetUserState 获取用户状态(阶段+计数)
func GetUserState(ctx context.Context, userId, platform string) (state *UserState, err error) {
key := UserStateKeyPrefix + userId + "_" + platform
result, err := redisClient.Do(ctx, "HGETALL", key)
if err != nil {
return
}
2025-12-18 18:01:21 +08:00
state = &UserState{Stage: 5} // 默认状态5未选择方向
2025-12-16 15:20:16 +08:00
if result.IsEmpty() {
// Redis为空初始化默认状态
if initErr := SetUserStage(ctx, userId, platform, 5); initErr != nil {
err = initErr
return
}
2025-12-18 18:01:21 +08:00
return
}
2025-12-16 15:20:16 +08:00
m := result.Map()
state.Stage = gconv.Int(m["stage"])
state.Count = gconv.Int64(m["count"])
2025-12-18 18:01:21 +08:00
state.Direction = gconv.String(m["direction"])
return
}
2025-12-16 15:20:16 +08:00
// SetUserStage 设置用户阶段,并刷新过期时间
func SetUserStage(ctx context.Context, userId, platform string, stage int) error {
key := UserStateKeyPrefix + userId + "_" + platform
_, err := redisClient.Do(ctx, "HSET", key, "stage", stage)
if err != nil {
2025-12-16 15:20:16 +08:00
return err
}
2025-12-16 15:20:16 +08:00
_, err = redisClient.Do(ctx, "EXPIRE", key, UserStateExpireSeconds)
return err
}
2025-12-26 15:13:07 +08:00
// SetUserAccountName 设置用户对应的客服账号名称,并刷新过期时间
func SetUserAccountName(ctx context.Context, userId, platform, accountName string) error {
key := UserStateKeyPrefix + userId + "_" + platform
2025-12-26 15:13:07 +08:00
_, err := redisClient.Do(ctx, "HSET", key, "accountName", accountName)
if err != nil {
return err
}
_, err = redisClient.Do(ctx, "EXPIRE", key, UserStateExpireSeconds)
return err
}
2025-12-18 18:01:21 +08:00
// SetUserDirection 设置用户选择的咨询方向,并刷新过期时间
func SetUserDirection(ctx context.Context, userId, platform, direction string) error {
key := UserStateKeyPrefix + userId + "_" + platform
_, err := redisClient.Do(ctx, "HSET", key, "direction", direction)
if err != nil {
return err
}
_, err = redisClient.Do(ctx, "EXPIRE", key, UserStateExpireSeconds)
return err
}
2025-12-16 15:20:16 +08:00
// IncrUserCount 增加用户对话计数,返回当前轮数,并刷新过期时间
func IncrUserCount(ctx context.Context, userId, platform string) (count int64, err error) {
key := UserStateKeyPrefix + userId + "_" + platform
result, err := redisClient.Do(ctx, "HINCRBY", key, "count", 1)
if err != nil {
return
}
count = result.Int64()
2025-12-16 15:20:16 +08:00
_, err = redisClient.Do(ctx, "EXPIRE", key, UserStateExpireSeconds)
return
}
2025-12-16 15:20:16 +08:00
// ResetUserState 重置用户状态(归档时调用)
func ResetUserState(ctx context.Context, userId, platform string) error {
key := UserStateKeyPrefix + userId + "_" + platform
_, err := redisClient.Del(ctx, key)
return err
}
2025-12-16 15:20:16 +08:00
2025-12-18 18:01:21 +08:00
// ============== 对话缓存相关5句落库==============
const (
// ConversationCacheKeyPrefix 对话缓存 Key 前缀
ConversationCacheKeyPrefix = "ragflow:conversation:cache:"
// ConversationCacheExpireSeconds 对话缓存过期时间10分钟
ConversationCacheExpireSeconds = 600
)
// CacheConversation 缓存单条对话到Redis List按sessionId存储
func CacheConversation(ctx context.Context, sessionId string, data []byte) error {
key := ConversationCacheKeyPrefix + sessionId
2025-12-18 18:01:21 +08:00
_, err := redisClient.Do(ctx, "RPUSH", key, string(data))
if err != nil {
return err
}
_, err = redisClient.Do(ctx, "EXPIRE", key, ConversationCacheExpireSeconds)
return err
}
// GetCachedConversations 获取缓存的对话列表并清空按sessionId查询
func GetCachedConversations(ctx context.Context, sessionId string) (list []string, err error) {
key := ConversationCacheKeyPrefix + sessionId
2025-12-18 18:01:21 +08:00
result, err := redisClient.Do(ctx, "LRANGE", key, 0, -1)
if err != nil {
return
}
if result.IsEmpty() {
return
}
list = result.Strings()
// 清空缓存
redisClient.Del(ctx, key)
return
}
// GetCachedConversationCount 获取缓存的对话数量按sessionId查询
func GetCachedConversationCount(ctx context.Context, sessionId string) (count int64, err error) {
key := ConversationCacheKeyPrefix + sessionId
2025-12-18 18:01:21 +08:00
result, err := redisClient.Do(ctx, "LLEN", key)
if err != nil {
return
}
return result.Int64(), nil
}
// ClearCachedConversations 清空对话缓存归档时调用按sessionId
func ClearCachedConversations(ctx context.Context, sessionId string) error {
key := ConversationCacheKeyPrefix + sessionId
2025-12-18 18:01:21 +08:00
_, err := redisClient.Del(ctx, key)
return err
}
2025-12-16 15:20:16 +08:00
// ========== 以下为兼容旧接口(内部调用新实现)==========
// IncrConversationCount 增加用户对话计数(兼容旧接口)
func IncrConversationCount(ctx context.Context, userId, platform string, _ int64) (count int64, err error) {
return IncrUserCount(ctx, userId, platform)
}
// GetConversationCount 获取用户当前对话轮数(兼容旧接口)
func GetConversationCount(ctx context.Context, userId, platform string) (count int64, err error) {
state, err := GetUserState(ctx, userId, platform)
if err != nil {
return
}
return state.Count, nil
}
// ResetConversationCount 重置用户对话计数(兼容旧接口)
func ResetConversationCount(ctx context.Context, userId, platform string) error {
return ResetUserState(ctx, userId, platform)
}