package redis import ( "context" "strings" "sync" "time" "github.com/gogf/gf/v2/database/gredis" "github.com/gogf/gf/v2/frame/g" "github.com/gogf/gf/v2/os/glog" "github.com/gogf/gf/v2/os/gtime" "github.com/gogf/gf/v2/util/gconv" ) var ( // redisClient 内部使用的 Redis 客户端(单例模式) redisClient *gredis.Redis redisOnce sync.Once ) // getClient 获取 Redis 客户端(延迟初始化) func getClient() *gredis.Redis { redisOnce.Do(func() { redisClient = g.Redis() }) return redisClient } // getClient 获取 Redis 客户端 临时方法 func GetRedisClientTest(name string) *gredis.Redis { return g.Redis(name) } // RedisClient 获取 Redis 客户端(函数式,确保单例正确初始化) func RedisClient() *gredis.Redis { return getClient() } func GetReadStream(ctx context.Context, msg ...QueueMessage) error { for _, t := range msg { err := GetReadFromStream(ctx, t.StreamKey, t.GroupName, t.ConsumerName, t.BatchSize, t.BlockMs, t.AutoAck, t.HandleFunc) if err != nil { glog.Infof(ctx, "读取ReadFromStream数据失败-> 键名: %s, 消费者组: %s, 消费者名称%v\n, 失败err:%v\n", t.StreamKey, t.GroupName, t.ConsumerName, err) continue } } return nil } // GetReadFromStream 读取ReadFromStream数据 func GetReadFromStream(ctx context.Context, streamKey, groupName, consumerName string, count, blockMs int64, autoAck bool, fn func(ctx context.Context, message map[string]interface{}) error) (err error) { glog.Infof(ctx, "初始化 Stream: %s, 消费者组: %s", streamKey, groupName) err = InitStreamGroup(ctx, streamKey, groupName) if err != nil { return err } for { // 从 Redis Stream 读取一批消息 messages, err := ReadFromStream(ctx, streamKey, groupName, consumerName, count, blockMs) if err != nil { glog.Errorf(ctx, "[DEBUG Redis] XREADGROUP 错误: %v", err) return err } // 处理消息 for _, msg := range messages { glog.Infof(ctx, "消费者 '%s' -> 接收到消息 ID: %s, 内容: %v\n", consumerName, msg.ID, msg.Values) // 业务处理 if err = fn(ctx, msg.Values); err != nil { glog.Infof(ctx, "业务处理失败-> err:%v\n", err) continue } // 确认消息 (ACK) if autoAck { // 处理成功后,必须调用 XAck,否则消息会一直留在 PEL 中 err = AckMessage(ctx, streamKey, groupName, msg.ID) if err != nil { glog.Infof(ctx, "消费者 '%s' 确认消息 ID %s 失败: %v\n", consumerName, msg.ID, err) } else { glog.Infof(ctx, "消费者 '%s' -> 已确认消息 ID: %s\n", consumerName, msg.ID) } } } } return } // Stream 和消费者组常量 const ( // RAGFlow 请求 Stream Key RAGFlowRequestStreamKey = "ragflow:request:stream" // RAGFlow 响应 Stream Key RAGFlowResponseStreamKey = "ragflow:response:stream" // RAGFlow 请求消费者组名称 RAGFlowRequestConsumerGroup = "ragflow:request:consumer:group" // RAGFlow 响应消费者组名称 RAGFlowResponseConsumerGroup = "ragflow:response:consumer:group" // RAGFlow 消费者组名称(兼容旧代码) RAGFlowConsumerGroup = "ragflow:consumer:group" // 会话最后活跃时间 Key 前缀 SessionLastActiveKeyPrefix = "ragflow:session:" ) // StreamMessage Redis Stream 消息结构 type StreamMessage struct { ID string // 消息ID(自动生成) Values map[string]interface{} // 消息内容 } // InitStreamGroup 初始化 Stream 和消费者组 // 使用 gredis Do() 方法执行 XGROUP CREATE 命令 func InitStreamGroup(ctx context.Context, streamKey, groupName string) error { // XGROUP CREATE streamKey groupName 0 MKSTREAM _, err := getClient().Do(ctx, "XGROUP", "CREATE", streamKey, groupName, "0", "MKSTREAM") if err != nil { // 如果组已存在,忽略错误 errStr := err.Error() if strings.Contains(errStr, "BUSYGROUP") || strings.Contains(errStr, "already exists") { return nil } return err } return nil } // AddToStream 将消息添加到 Stream // 使用 gredis Do() 方法执行 XADD 命令 // msg 可以是结构体或 map,内部自动转换 func AddToStream(ctx context.Context, streamKey string, msg interface{}) (messageID string, err error) { // 将结构体转换为 map values := gconv.Map(msg) // XADD streamKey * field1 value1 field2 value2 ... args := make([]interface{}, 0, len(values)*2+2) args = append(args, streamKey, "*") // "*" 自动生成ID for key, val := range values { args = append(args, key, val) } result, err := getClient().Do(ctx, "XADD", args...) if err != nil { return } messageID = result.String() return } // CreateConsumerGroup 创建消费者组(如果不存在) // XGROUP CREATE streamKey groupName 0 MKSTREAM // 使用0作为起始ID,从Stream开头读取所有未消费消息 func CreateConsumerGroup(ctx context.Context, streamKey, groupName string) error { _, err := getClient().Do(ctx, "XGROUP", "CREATE", streamKey, groupName, "0", "MKSTREAM") return err } // ReadFromStream 从 Stream 读取消息(消费者组模式) // 使用 gredis Do() 方法执行 XREADGROUP 命令 func ReadFromStream(ctx context.Context, streamKey, groupName, consumerName string, count int64, blockMs int64) ([]StreamMessage, error) { // 检查是否需要记录trace(避免轮询产生大量trace) execCtx := ctx if !g.Cfg().MustGet(ctx, "jaeger.traceStream", true).Bool() { // 不记录trace:使用background context(不继承span) execCtx = context.Background() } RECONNECT: // 先尝试读取pending消息(ID=0),处理积压 result, err := getClient().Do(execCtx, "XREADGROUP", "GROUP", groupName, consumerName, "COUNT", count, "BLOCK", 0, // 不阻塞,立即返回 "STREAMS", streamKey, "0", // ID=0 读取pending消息 ) if err != nil { g.Log().Errorf(ctx, "❌ XREADGROUP读取pending失败: stream=%s, error=%v", streamKey, err) time.Sleep(time.Second) goto RECONNECT } // 检查pending结果是否为空(需要检查消息数组是否为空) hasPending := false if result != nil && !result.IsEmpty() { // 尝试解析map格式 if resultVal := result.Val(); resultVal != nil { if streamsMap, ok := resultVal.(map[interface{}]interface{}); ok { for _, streamMsgs := range streamsMap { if msgsArray, ok := streamMsgs.([]interface{}); ok && len(msgsArray) > 0 { hasPending = true break } } } } } // 如果没有pending消息,读取新消息 if !hasPending { result, err = getClient().Do(execCtx, "XREADGROUP", "GROUP", groupName, consumerName, "COUNT", count, "BLOCK", blockMs, "STREAMS", streamKey, ">", ) if err != nil { g.Log().Errorf(ctx, "❌ XREADGROUP读取新消息失败: stream=%s, error=%v", streamKey, err) time.Sleep(time.Second) goto RECONNECT } } // 预分配容量,避免动态扩容 messages := make([]StreamMessage, 0, int(count)) if result == nil || result.IsEmpty() { // 超时或没有数据 return messages, nil } // GoFrame gredis 返回格式: map[streamKey:[[msgID [field1 value1 field2 value2 ...]] ...]] resultVal := result.Val() // 尝试 map 格式(GoFrame gredis 返回) if streamsMap, ok := resultVal.(map[interface{}]interface{}); ok { for streamKey, streamMsgs := range streamsMap { msgsArray, ok := streamMsgs.([]interface{}) if !ok { g.Log().Errorf(ctx, "❌ streamMsgs类型转换失败: streamKey=%v, 实际类型=%T", streamKey, streamMsgs) continue } for i, msgData := range msgsArray { msgArray, ok := msgData.([]interface{}) if !ok { g.Log().Errorf(ctx, "❌ msgData类型转换失败: index=%d, 实际类型=%T", i, msgData) continue } if len(msgArray) < 2 { g.Log().Errorf(ctx, "❌ msgArray长度不足: index=%d, len=%d", i, len(msgArray)) continue } msgID := gconv.String(msgArray[0]) fieldsArray, ok := msgArray[1].([]interface{}) if !ok { g.Log().Errorf(ctx, "❌ fieldsArray类型转换失败: msgID=%s, msgArray[1]类型=%T", msgID, msgArray[1]) continue } values := make(map[string]interface{}, len(fieldsArray)/2) for i := 0; i < len(fieldsArray); i += 2 { if i+1 < len(fieldsArray) { key := gconv.String(fieldsArray[i]) values[key] = fieldsArray[i+1] } } messages = append(messages, StreamMessage{ ID: msgID, Values: values, }) } } if len(messages) == 0 { g.Log().Errorf(ctx, "❌ [ReadFromStream] map格式解析失败: streamsMap长度=%d, 但未提取到消息", len(streamsMap)) } return messages, nil } // 尝试数组格式(标准 Redis 返回) if streamsArray, ok := resultVal.([]interface{}); ok && len(streamsArray) > 0 { for _, streamData := range streamsArray { streamArray, ok := streamData.([]interface{}) if !ok || len(streamArray) < 2 { continue } messagesArray, ok := streamArray[1].([]interface{}) if !ok { continue } for _, msgData := range messagesArray { msgArray, ok := msgData.([]interface{}) if !ok || len(msgArray) < 2 { continue } msgID := gconv.String(msgArray[0]) fieldsArray, ok := msgArray[1].([]interface{}) if !ok { continue } values := make(map[string]interface{}, len(fieldsArray)/2) for i := 0; i < len(fieldsArray); i += 2 { if i+1 < len(fieldsArray) { key := gconv.String(fieldsArray[i]) values[key] = fieldsArray[i+1] } } messages = append(messages, StreamMessage{ ID: msgID, Values: values, }) } } if len(messages) == 0 { g.Log().Errorf(ctx, "❌ [ReadFromStream] 数组格式解析失败: streamsArray长度=%d, 但未提取到消息", len(streamsArray)) } return messages, nil } g.Log().Errorf(ctx, "❌ [ReadFromStream] 无法识别的result格式, resultVal类型: %T, 值: %+v", resultVal, resultVal) return messages, nil } // AckMessage 确认消息已处理 // 使用 gredis Do() 方法执行 XACK 命令 func AckMessage(ctx context.Context, streamKey, groupName string, messageIDs ...string) error { // XACK streamKey groupName messageID1 messageID2 ... // 预分配容量,避免动态扩容 args := make([]interface{}, 0, len(messageIDs)+2) args = append(args, streamKey, groupName) for _, id := range messageIDs { args = append(args, id) } _, err := getClient().Do(ctx, "XACK", args...) return err } // GetStreamLength 获取 Stream 当前长度 // 使用 gredis Do() 方法执行 XLEN 命令 func GetStreamLength(ctx context.Context, streamKey string) (int64, error) { // XLEN streamKey result, err := getClient().Do(ctx, "XLEN", streamKey) if err != nil { return 0, err } length := gconv.Int64(result) return length, nil } // PendingMessage Pending 消息结构 type PendingMessage struct { ID string // 消息ID Consumer string // 消费者名称 Idle int64 // 空闲时间(毫秒) RetryCount int64 // 重试次数 } // GetPendingMessages 获取待处理消息 // 使用 gredis Do() 方法执行 XPENDING 命令 func GetPendingMessages(ctx context.Context, streamKey, groupName string, start, end string, count int64) ([]PendingMessage, error) { // XPENDING streamKey groupName start end count result, err := getClient().Do(ctx, "XPENDING", streamKey, groupName, start, end, count) if err != nil { return nil, err } if result == nil { return nil, nil } // 解析返回值:[[ID, consumer, idle, retryCount], ...] pendingArray, ok := result.Val().([]interface{}) if !ok { return nil, nil } messages := make([]PendingMessage, 0, len(pendingArray)) for _, item := range pendingArray { itemArray, ok := item.([]interface{}) if !ok || len(itemArray) < 4 { continue } messages = append(messages, PendingMessage{ ID: gconv.String(itemArray[0]), Consumer: gconv.String(itemArray[1]), Idle: gconv.Int64(itemArray[2]), RetryCount: gconv.Int64(itemArray[3]), }) } return messages, nil } // ClaimPendingMessage 认领超时的 Pending 消息 // 使用 gredis Do() 方法执行 XCLAIM 命令 func ClaimPendingMessage(ctx context.Context, streamKey, groupName, consumerName string, minIdleTime int64, messageIDs ...string) ([]StreamMessage, error) { // XCLAIM streamKey groupName consumerName minIdleTime messageID1 messageID2 ... args := []interface{}{streamKey, groupName, consumerName, minIdleTime} for _, id := range messageIDs { args = append(args, id) } result, err := getClient().Do(ctx, "XCLAIM", args...) if err != nil { return nil, err } if result == nil { return nil, nil } // 解析返回值:类似 XREADGROUP messagesArray, ok := result.Val().([]interface{}) if !ok { return nil, nil } // 预分配容量,避免动态扩容 messages := make([]StreamMessage, 0, len(messagesArray)) for _, msgData := range messagesArray { msgArray, ok := msgData.([]interface{}) if !ok || len(msgArray) < 2 { continue } msgID := gconv.String(msgArray[0]) fieldsArray, ok := msgArray[1].([]interface{}) if !ok { continue } // 预分配 map 容量 ,避免动态扩容 values := make(map[string]interface{}, len(fieldsArray)/2) for i := 0; i < len(fieldsArray); i += 2 { if i+1 < len(fieldsArray) { key := gconv.String(fieldsArray[i]) values[key] = fieldsArray[i+1] } } messages = append(messages, StreamMessage{ ID: msgID, Values: values, }) } return messages, nil } // SetSessionLastActive 设置用户最后活跃时间 // 使用 gredis SetEX 方法 func SetSessionLastActive(ctx context.Context, userId string) error { key := SessionLastActiveKeyPrefix + userId + ":last_active" timestamp := gtime.Now().Timestamp() // SETEX key 7200 value (7200秒 = 2小时) _, err := getClient().Do(ctx, "SETEX", key, 7200, timestamp) return err } // GetSessionLastActive 获取用户最后活跃时间 // 使用 gredis Get 方法 func GetSessionLastActive(ctx context.Context, userId string) (int64, error) { key := SessionLastActiveKeyPrefix + userId + ":last_active" result, err := getClient().Get(ctx, key) if err != nil { return 0, err } if result.IsEmpty() { return 0, nil } timestamp := gconv.Int64(result.Val()) return timestamp, nil } // IsUserActive 检查用户是否在指定时间范围内活跃过 // 用于追问逻辑:如果用户最近活跃过,则不发送追问消息 // 参数: // - userId: 用户ID // - seconds: 时间范围(秒),例如传入300表示检查5分钟内是否活跃 // // 返回: // - bool: true表示用户在指定时间内活跃过 // - error: 操作失败时返回错误 func IsUserActive(ctx context.Context, userId string, seconds int64) (bool, error) { lastActive, err := GetSessionLastActive(ctx, userId) if err != nil { return false, err } if lastActive == 0 { return false, nil // 未找到记录,视为不活跃 } // 检查时间差 now := gtime.Now().Timestamp() return (now - lastActive) < seconds, nil } // ============== 限流相关 ============== // IncrRateLimit 增加限流计数器,返回当前计数 // key: 限流key(需要包含完整路径,如 "ip:192.168.1.1") // windowSeconds: 时间窗口(秒) func IncrRateLimit(ctx context.Context, key string, windowSeconds int64) (count int64, err error) { fullKey := RateLimitKeyPrefix + key result, err := getClient().Do(ctx, "INCR", fullKey) if err != nil { return } count = result.Int64() // 首次设置过期时间 if count == 1 { getClient().Do(ctx, "EXPIRE", fullKey, windowSeconds) } return } // GetRateLimit 获取当前限流计数 func GetRateLimit(ctx context.Context, key string) (count int64, err error) { fullKey := RateLimitKeyPrefix + key result, err := getClient().Get(ctx, fullKey) if err != nil { return } if result.IsEmpty() { return 0, nil } count = result.Int64() return } // SetSessionCache 缓存 RAGFlow Session ID(租户+用户隔离) func SetSessionCache(ctx context.Context, tenantId, userId, sessionId string) error { key := SessionLastActiveKeyPrefix + tenantId + ":" + userId + ":session_id" // SETEX key 7200 value (7200秒 = 2小时,与last_active保持一致) _, err := getClient().Do(ctx, "SETEX", key, 7200, sessionId) return err } // GetSessionCache 获取缓存的 RAGFlow Session ID(租户+用户隔离) func GetSessionCache(ctx context.Context, tenantId, userId string) (string, error) { key := SessionLastActiveKeyPrefix + tenantId + ":" + userId + ":session_id" result, err := getClient().Get(ctx, key) if err != nil { return "", err } if result.IsEmpty() { return "", nil } return result.String(), nil } // DelSessionCache 删除缓存的 RAGFlow Session ID(归档时调用,租户+用户隔离) func DelSessionCache(ctx context.Context, tenantId, userId string) error { key := SessionLastActiveKeyPrefix + tenantId + ":" + userId + ":session_id" _, err := getClient().Del(ctx, key) return err } // TryLock 尝试获取分布式锁(非阻塞) // key: 锁的键名 // expireSeconds: 锁的过期时间(秒),防止死锁 // 返回 true 表示获取成功,false 表示锁已被其他节点持有 func TryLock(ctx context.Context, key string, expireSeconds int) bool { // SET key value NX EX expireSeconds result, err := getClient().Do(ctx, "SET", key, gtime.Now().String(), "NX", "EX", expireSeconds) if err != nil { glog.Errorf(ctx, "获取分布式锁失败: %v", err) return false } return result.String() == "OK" } // Unlock 释放分布式锁 func Unlock(ctx context.Context, key string) { if _, err := getClient().Del(ctx, key); err != nil { glog.Errorf(ctx, "释放分布式锁失败: %v", err) } } // ============== 对话计数相关(用于卡片触发)============== const ( // UserStateKeyPrefix 用户会话状态 Key 前缀(融合阶段+计数) UserStateKeyPrefix = "ragflow:user:state:" // UserStateExpireSeconds 用户状态过期时间(5分钟) UserStateExpireSeconds = 300 ) // UserState 用户会话状态(阶段+对话计数+咨询方向,统一5分钟过期) type UserState struct { Stage int `json:"stage"` // 当前阶段 Direction string `json:"direction"` // 咨询方向 Count int64 `json:"count"` // 对话计数(v5.2卡片触发) AccountName string `json:"accountName"` // 用户选择的方向对应的客服账号名称 } // GetUserState 获取用户状态(阶段+计数) func GetUserState(ctx context.Context, userId, platform string) (state *UserState, err error) { key := UserStateKeyPrefix + userId + "_" + platform result, err := getClient().Do(ctx, "HGETALL", key) if err != nil { return } state = &UserState{Stage: 5} // 默认状态5(未选择方向) if result.IsEmpty() { // Redis为空,初始化默认状态 if initErr := SetUserStage(ctx, userId, platform, 5); initErr != nil { err = initErr return } return } m := result.Map() state.Stage = gconv.Int(m["stage"]) state.Count = gconv.Int64(m["count"]) state.Direction = gconv.String(m["direction"]) return } // SetUserStage 设置用户阶段,并刷新过期时间 func SetUserStage(ctx context.Context, userId, platform string, stage int) error { key := UserStateKeyPrefix + userId + "_" + platform _, err := getClient().Do(ctx, "HSET", key, "stage", stage) if err != nil { return err } _, err = getClient().Do(ctx, "EXPIRE", key, UserStateExpireSeconds) return err } // SetUserAccountName 设置用户对应的客服账号名称,并刷新过期时间 func SetUserAccountName(ctx context.Context, userId, platform, accountName string) error { key := UserStateKeyPrefix + userId + "_" + platform _, err := getClient().Do(ctx, "HSET", key, "accountName", accountName) if err != nil { return err } _, err = getClient().Do(ctx, "EXPIRE", key, UserStateExpireSeconds) return err } // SetUserDirection 设置用户选择的咨询方向,并刷新过期时间 func SetUserDirection(ctx context.Context, userId, platform, direction string) error { key := UserStateKeyPrefix + userId + "_" + platform _, err := getClient().Do(ctx, "HSET", key, "direction", direction) if err != nil { return err } _, err = getClient().Do(ctx, "EXPIRE", key, UserStateExpireSeconds) return err } // IncrUserCount 增加用户对话计数,返回当前轮数,并刷新过期时间 func IncrUserCount(ctx context.Context, userId, platform string) (count int64, err error) { key := UserStateKeyPrefix + userId + "_" + platform result, err := getClient().Do(ctx, "HINCRBY", key, "count", 1) if err != nil { return } count = result.Int64() _, err = getClient().Do(ctx, "EXPIRE", key, UserStateExpireSeconds) return } // ResetUserState 重置用户状态(归档时调用) func ResetUserState(ctx context.Context, userId, platform string) error { key := UserStateKeyPrefix + userId + "_" + platform _, err := getClient().Del(ctx, key) return err } // ============== 对话缓存相关(5句落库)============== const ( // ConversationCacheKeyPrefix 对话缓存 Key 前缀 ConversationCacheKeyPrefix = "ragflow:conversation:cache:" // ConversationCacheExpireSeconds 对话缓存过期时间(10分钟) ConversationCacheExpireSeconds = 600 ) // CacheConversation 缓存单条对话到Redis List(按sessionId存储) func CacheConversation(ctx context.Context, sessionId string, data []byte) error { key := ConversationCacheKeyPrefix + sessionId _, err := getClient().Do(ctx, "RPUSH", key, string(data)) if err != nil { return err } _, err = getClient().Do(ctx, "EXPIRE", key, ConversationCacheExpireSeconds) return err } // GetCachedConversations 获取缓存的对话列表并清空(按sessionId查询) func GetCachedConversations(ctx context.Context, sessionId string) (list []string, err error) { key := ConversationCacheKeyPrefix + sessionId result, err := getClient().Do(ctx, "LRANGE", key, 0, -1) if err != nil { return } if result.IsEmpty() { return } list = result.Strings() // 清空缓存 getClient().Del(ctx, key) return } // GetCachedConversationCount 获取缓存的对话数量(按sessionId查询) func GetCachedConversationCount(ctx context.Context, sessionId string) (count int64, err error) { key := ConversationCacheKeyPrefix + sessionId result, err := getClient().Do(ctx, "LLEN", key) if err != nil { return } return result.Int64(), nil } // ClearCachedConversations 清空对话缓存(归档时调用,按sessionId) func ClearCachedConversations(ctx context.Context, sessionId string) error { key := ConversationCacheKeyPrefix + sessionId _, err := getClient().Del(ctx, key) return err } // ========== 以下为兼容旧接口(内部调用新实现)========== // IncrConversationCount 增加用户对话计数(兼容旧接口) func IncrConversationCount(ctx context.Context, userId, platform string, _ int64) (count int64, err error) { return IncrUserCount(ctx, userId, platform) } // GetConversationCount 获取用户当前对话轮数(兼容旧接口) func GetConversationCount(ctx context.Context, userId, platform string) (count int64, err error) { state, err := GetUserState(ctx, userId, platform) if err != nil { return } return state.Count, nil } // ResetConversationCount 重置用户对话计数(兼容旧接口) func ResetConversationCount(ctx context.Context, userId, platform string) error { return ResetUserState(ctx, userId, platform) }