更新对gateway的匹配

This commit is contained in:
Cold
2026-01-28 15:27:32 +08:00
committed by 张斌
parent 1fcaeae3b0
commit 65b231b88f

View File

@@ -147,7 +147,7 @@ type StreamMessage struct {
// 使用 gredis Do() 方法执行 XGROUP CREATE 命令 // 使用 gredis Do() 方法执行 XGROUP CREATE 命令
func InitStreamGroup(ctx context.Context, streamKey, groupName string) error { func InitStreamGroup(ctx context.Context, streamKey, groupName string) error {
// XGROUP CREATE streamKey groupName 0 MKSTREAM // XGROUP CREATE streamKey groupName 0 MKSTREAM
_, err := redisClient.Do(ctx, "XGROUP", "CREATE", streamKey, groupName, "0", "MKSTREAM") _, err := getClient().Do(ctx, "XGROUP", "CREATE", streamKey, groupName, "0", "MKSTREAM")
if err != nil { if err != nil {
// 如果组已存在,忽略错误 // 如果组已存在,忽略错误
errStr := err.Error() errStr := err.Error()
@@ -173,7 +173,7 @@ func AddToStream(ctx context.Context, streamKey string, msg interface{}) (messag
args = append(args, key, val) args = append(args, key, val)
} }
result, err := redisClient.Do(ctx, "XADD", args...) result, err := getClient().Do(ctx, "XADD", args...)
if err != nil { if err != nil {
return return
} }
@@ -186,7 +186,7 @@ func AddToStream(ctx context.Context, streamKey string, msg interface{}) (messag
// XGROUP CREATE streamKey groupName 0 MKSTREAM // XGROUP CREATE streamKey groupName 0 MKSTREAM
// 使用0作为起始ID从Stream开头读取所有未消费消息 // 使用0作为起始ID从Stream开头读取所有未消费消息
func CreateConsumerGroup(ctx context.Context, streamKey, groupName string) error { func CreateConsumerGroup(ctx context.Context, streamKey, groupName string) error {
_, err := redisClient.Do(ctx, "XGROUP", "CREATE", streamKey, groupName, "0", "MKSTREAM") _, err := getClient().Do(ctx, "XGROUP", "CREATE", streamKey, groupName, "0", "MKSTREAM")
return err return err
} }
@@ -202,7 +202,7 @@ func ReadFromStream(ctx context.Context, streamKey, groupName, consumerName stri
RECONNECT: RECONNECT:
// 先尝试读取pending消息ID=0处理积压 // 先尝试读取pending消息ID=0处理积压
result, err := redisClient.Do(execCtx, result, err := getClient().Do(execCtx,
"XREADGROUP", "GROUP", groupName, consumerName, "XREADGROUP", "GROUP", groupName, consumerName,
"COUNT", count, "COUNT", count,
"BLOCK", 0, // 不阻塞,立即返回 "BLOCK", 0, // 不阻塞,立即返回
@@ -232,7 +232,7 @@ RECONNECT:
// 如果没有pending消息读取新消息 // 如果没有pending消息读取新消息
if !hasPending { if !hasPending {
result, err = redisClient.Do(execCtx, result, err = getClient().Do(execCtx,
"XREADGROUP", "GROUP", groupName, consumerName, "XREADGROUP", "GROUP", groupName, consumerName,
"COUNT", count, "COUNT", count,
"BLOCK", blockMs, "BLOCK", blockMs,
@@ -354,7 +354,7 @@ func AckMessage(ctx context.Context, streamKey, groupName string, messageIDs ...
args = append(args, id) args = append(args, id)
} }
_, err := redisClient.Do(ctx, "XACK", args...) _, err := getClient().Do(ctx, "XACK", args...)
return err return err
} }
@@ -362,7 +362,7 @@ func AckMessage(ctx context.Context, streamKey, groupName string, messageIDs ...
// 使用 gredis Do() 方法执行 XLEN 命令 // 使用 gredis Do() 方法执行 XLEN 命令
func GetStreamLength(ctx context.Context, streamKey string) (int64, error) { func GetStreamLength(ctx context.Context, streamKey string) (int64, error) {
// XLEN streamKey // XLEN streamKey
result, err := redisClient.Do(ctx, "XLEN", streamKey) result, err := getClient().Do(ctx, "XLEN", streamKey)
if err != nil { if err != nil {
return 0, err return 0, err
} }
@@ -383,7 +383,7 @@ type PendingMessage struct {
// 使用 gredis Do() 方法执行 XPENDING 命令 // 使用 gredis Do() 方法执行 XPENDING 命令
func GetPendingMessages(ctx context.Context, streamKey, groupName string, start, end string, count int64) ([]PendingMessage, error) { func GetPendingMessages(ctx context.Context, streamKey, groupName string, start, end string, count int64) ([]PendingMessage, error) {
// XPENDING streamKey groupName start end count // XPENDING streamKey groupName start end count
result, err := redisClient.Do(ctx, "XPENDING", streamKey, groupName, start, end, count) result, err := getClient().Do(ctx, "XPENDING", streamKey, groupName, start, end, count)
if err != nil { if err != nil {
return nil, err return nil, err
} }
@@ -425,7 +425,7 @@ func ClaimPendingMessage(ctx context.Context, streamKey, groupName, consumerName
args = append(args, id) args = append(args, id)
} }
result, err := redisClient.Do(ctx, "XCLAIM", args...) result, err := getClient().Do(ctx, "XCLAIM", args...)
if err != nil { if err != nil {
return nil, err return nil, err
} }
@@ -479,7 +479,7 @@ func SetSessionLastActive(ctx context.Context, userId string) error {
timestamp := gtime.Now().Timestamp() timestamp := gtime.Now().Timestamp()
// SETEX key 7200 value (7200秒 = 2小时) // SETEX key 7200 value (7200秒 = 2小时)
_, err := redisClient.Do(ctx, "SETEX", key, 7200, timestamp) _, err := getClient().Do(ctx, "SETEX", key, 7200, timestamp)
return err return err
} }
@@ -487,7 +487,7 @@ func SetSessionLastActive(ctx context.Context, userId string) error {
// 使用 gredis Get 方法 // 使用 gredis Get 方法
func GetSessionLastActive(ctx context.Context, userId string) (int64, error) { func GetSessionLastActive(ctx context.Context, userId string) (int64, error) {
key := SessionLastActiveKeyPrefix + userId + ":last_active" key := SessionLastActiveKeyPrefix + userId + ":last_active"
result, err := redisClient.Get(ctx, key) result, err := getClient().Get(ctx, key)
if err != nil { if err != nil {
return 0, err return 0, err
} }
@@ -531,7 +531,7 @@ func IsUserActive(ctx context.Context, userId string, seconds int64) (bool, erro
// windowSeconds: 时间窗口(秒) // windowSeconds: 时间窗口(秒)
func IncrRateLimit(ctx context.Context, key string, windowSeconds int64) (count int64, err error) { func IncrRateLimit(ctx context.Context, key string, windowSeconds int64) (count int64, err error) {
fullKey := RateLimitKeyPrefix + key fullKey := RateLimitKeyPrefix + key
result, err := redisClient.Do(ctx, "INCR", fullKey) result, err := getClient().Do(ctx, "INCR", fullKey)
if err != nil { if err != nil {
return return
} }
@@ -539,7 +539,7 @@ func IncrRateLimit(ctx context.Context, key string, windowSeconds int64) (count
// 首次设置过期时间 // 首次设置过期时间
if count == 1 { if count == 1 {
redisClient.Do(ctx, "EXPIRE", fullKey, windowSeconds) getClient().Do(ctx, "EXPIRE", fullKey, windowSeconds)
} }
return return
} }
@@ -547,7 +547,7 @@ func IncrRateLimit(ctx context.Context, key string, windowSeconds int64) (count
// GetRateLimit 获取当前限流计数 // GetRateLimit 获取当前限流计数
func GetRateLimit(ctx context.Context, key string) (count int64, err error) { func GetRateLimit(ctx context.Context, key string) (count int64, err error) {
fullKey := RateLimitKeyPrefix + key fullKey := RateLimitKeyPrefix + key
result, err := redisClient.Get(ctx, fullKey) result, err := getClient().Get(ctx, fullKey)
if err != nil { if err != nil {
return return
} }
@@ -562,14 +562,14 @@ func GetRateLimit(ctx context.Context, key string) (count int64, err error) {
func SetSessionCache(ctx context.Context, tenantId, userId, sessionId string) error { func SetSessionCache(ctx context.Context, tenantId, userId, sessionId string) error {
key := SessionLastActiveKeyPrefix + tenantId + ":" + userId + ":session_id" key := SessionLastActiveKeyPrefix + tenantId + ":" + userId + ":session_id"
// SETEX key 7200 value (7200秒 = 2小时与last_active保持一致) // SETEX key 7200 value (7200秒 = 2小时与last_active保持一致)
_, err := redisClient.Do(ctx, "SETEX", key, 7200, sessionId) _, err := getClient().Do(ctx, "SETEX", key, 7200, sessionId)
return err return err
} }
// GetSessionCache 获取缓存的 RAGFlow Session ID租户+用户隔离) // GetSessionCache 获取缓存的 RAGFlow Session ID租户+用户隔离)
func GetSessionCache(ctx context.Context, tenantId, userId string) (string, error) { func GetSessionCache(ctx context.Context, tenantId, userId string) (string, error) {
key := SessionLastActiveKeyPrefix + tenantId + ":" + userId + ":session_id" key := SessionLastActiveKeyPrefix + tenantId + ":" + userId + ":session_id"
result, err := redisClient.Get(ctx, key) result, err := getClient().Get(ctx, key)
if err != nil { if err != nil {
return "", err return "", err
} }
@@ -584,7 +584,7 @@ func GetSessionCache(ctx context.Context, tenantId, userId string) (string, erro
// DelSessionCache 删除缓存的 RAGFlow Session ID归档时调用租户+用户隔离) // DelSessionCache 删除缓存的 RAGFlow Session ID归档时调用租户+用户隔离)
func DelSessionCache(ctx context.Context, tenantId, userId string) error { func DelSessionCache(ctx context.Context, tenantId, userId string) error {
key := SessionLastActiveKeyPrefix + tenantId + ":" + userId + ":session_id" key := SessionLastActiveKeyPrefix + tenantId + ":" + userId + ":session_id"
_, err := redisClient.Del(ctx, key) _, err := getClient().Del(ctx, key)
return err return err
} }
@@ -594,7 +594,7 @@ func DelSessionCache(ctx context.Context, tenantId, userId string) error {
// 返回 true 表示获取成功false 表示锁已被其他节点持有 // 返回 true 表示获取成功false 表示锁已被其他节点持有
func TryLock(ctx context.Context, key string, expireSeconds int) bool { func TryLock(ctx context.Context, key string, expireSeconds int) bool {
// SET key value NX EX expireSeconds // SET key value NX EX expireSeconds
result, err := redisClient.Do(ctx, "SET", key, gtime.Now().String(), "NX", "EX", expireSeconds) result, err := getClient().Do(ctx, "SET", key, gtime.Now().String(), "NX", "EX", expireSeconds)
if err != nil { if err != nil {
glog.Errorf(ctx, "获取分布式锁失败: %v", err) glog.Errorf(ctx, "获取分布式锁失败: %v", err)
return false return false
@@ -604,7 +604,7 @@ func TryLock(ctx context.Context, key string, expireSeconds int) bool {
// Unlock 释放分布式锁 // Unlock 释放分布式锁
func Unlock(ctx context.Context, key string) { func Unlock(ctx context.Context, key string) {
if _, err := redisClient.Del(ctx, key); err != nil { if _, err := getClient().Del(ctx, key); err != nil {
glog.Errorf(ctx, "释放分布式锁失败: %v", err) glog.Errorf(ctx, "释放分布式锁失败: %v", err)
} }
} }
@@ -629,7 +629,7 @@ type UserState struct {
// GetUserState 获取用户状态(阶段+计数) // GetUserState 获取用户状态(阶段+计数)
func GetUserState(ctx context.Context, userId, platform string) (state *UserState, err error) { func GetUserState(ctx context.Context, userId, platform string) (state *UserState, err error) {
key := UserStateKeyPrefix + userId + "_" + platform key := UserStateKeyPrefix + userId + "_" + platform
result, err := redisClient.Do(ctx, "HGETALL", key) result, err := getClient().Do(ctx, "HGETALL", key)
if err != nil { if err != nil {
return return
} }
@@ -654,52 +654,52 @@ func GetUserState(ctx context.Context, userId, platform string) (state *UserStat
// SetUserStage 设置用户阶段,并刷新过期时间 // SetUserStage 设置用户阶段,并刷新过期时间
func SetUserStage(ctx context.Context, userId, platform string, stage int) error { func SetUserStage(ctx context.Context, userId, platform string, stage int) error {
key := UserStateKeyPrefix + userId + "_" + platform key := UserStateKeyPrefix + userId + "_" + platform
_, err := redisClient.Do(ctx, "HSET", key, "stage", stage) _, err := getClient().Do(ctx, "HSET", key, "stage", stage)
if err != nil { if err != nil {
return err return err
} }
_, err = redisClient.Do(ctx, "EXPIRE", key, UserStateExpireSeconds) _, err = getClient().Do(ctx, "EXPIRE", key, UserStateExpireSeconds)
return err return err
} }
// SetUserAccountName 设置用户对应的客服账号名称,并刷新过期时间 // SetUserAccountName 设置用户对应的客服账号名称,并刷新过期时间
func SetUserAccountName(ctx context.Context, userId, platform, accountName string) error { func SetUserAccountName(ctx context.Context, userId, platform, accountName string) error {
key := UserStateKeyPrefix + userId + "_" + platform key := UserStateKeyPrefix + userId + "_" + platform
_, err := redisClient.Do(ctx, "HSET", key, "accountName", accountName) _, err := getClient().Do(ctx, "HSET", key, "accountName", accountName)
if err != nil { if err != nil {
return err return err
} }
_, err = redisClient.Do(ctx, "EXPIRE", key, UserStateExpireSeconds) _, err = getClient().Do(ctx, "EXPIRE", key, UserStateExpireSeconds)
return err return err
} }
// SetUserDirection 设置用户选择的咨询方向,并刷新过期时间 // SetUserDirection 设置用户选择的咨询方向,并刷新过期时间
func SetUserDirection(ctx context.Context, userId, platform, direction string) error { func SetUserDirection(ctx context.Context, userId, platform, direction string) error {
key := UserStateKeyPrefix + userId + "_" + platform key := UserStateKeyPrefix + userId + "_" + platform
_, err := redisClient.Do(ctx, "HSET", key, "direction", direction) _, err := getClient().Do(ctx, "HSET", key, "direction", direction)
if err != nil { if err != nil {
return err return err
} }
_, err = redisClient.Do(ctx, "EXPIRE", key, UserStateExpireSeconds) _, err = getClient().Do(ctx, "EXPIRE", key, UserStateExpireSeconds)
return err return err
} }
// IncrUserCount 增加用户对话计数,返回当前轮数,并刷新过期时间 // IncrUserCount 增加用户对话计数,返回当前轮数,并刷新过期时间
func IncrUserCount(ctx context.Context, userId, platform string) (count int64, err error) { func IncrUserCount(ctx context.Context, userId, platform string) (count int64, err error) {
key := UserStateKeyPrefix + userId + "_" + platform key := UserStateKeyPrefix + userId + "_" + platform
result, err := redisClient.Do(ctx, "HINCRBY", key, "count", 1) result, err := getClient().Do(ctx, "HINCRBY", key, "count", 1)
if err != nil { if err != nil {
return return
} }
count = result.Int64() count = result.Int64()
_, err = redisClient.Do(ctx, "EXPIRE", key, UserStateExpireSeconds) _, err = getClient().Do(ctx, "EXPIRE", key, UserStateExpireSeconds)
return return
} }
// ResetUserState 重置用户状态(归档时调用) // ResetUserState 重置用户状态(归档时调用)
func ResetUserState(ctx context.Context, userId, platform string) error { func ResetUserState(ctx context.Context, userId, platform string) error {
key := UserStateKeyPrefix + userId + "_" + platform key := UserStateKeyPrefix + userId + "_" + platform
_, err := redisClient.Del(ctx, key) _, err := getClient().Del(ctx, key)
return err return err
} }
@@ -715,18 +715,18 @@ const (
// CacheConversation 缓存单条对话到Redis List按sessionId存储 // CacheConversation 缓存单条对话到Redis List按sessionId存储
func CacheConversation(ctx context.Context, sessionId string, data []byte) error { func CacheConversation(ctx context.Context, sessionId string, data []byte) error {
key := ConversationCacheKeyPrefix + sessionId key := ConversationCacheKeyPrefix + sessionId
_, err := redisClient.Do(ctx, "RPUSH", key, string(data)) _, err := getClient().Do(ctx, "RPUSH", key, string(data))
if err != nil { if err != nil {
return err return err
} }
_, err = redisClient.Do(ctx, "EXPIRE", key, ConversationCacheExpireSeconds) _, err = getClient().Do(ctx, "EXPIRE", key, ConversationCacheExpireSeconds)
return err return err
} }
// GetCachedConversations 获取缓存的对话列表并清空按sessionId查询 // GetCachedConversations 获取缓存的对话列表并清空按sessionId查询
func GetCachedConversations(ctx context.Context, sessionId string) (list []string, err error) { func GetCachedConversations(ctx context.Context, sessionId string) (list []string, err error) {
key := ConversationCacheKeyPrefix + sessionId key := ConversationCacheKeyPrefix + sessionId
result, err := redisClient.Do(ctx, "LRANGE", key, 0, -1) result, err := getClient().Do(ctx, "LRANGE", key, 0, -1)
if err != nil { if err != nil {
return return
} }
@@ -735,14 +735,14 @@ func GetCachedConversations(ctx context.Context, sessionId string) (list []strin
} }
list = result.Strings() list = result.Strings()
// 清空缓存 // 清空缓存
redisClient.Del(ctx, key) getClient().Del(ctx, key)
return return
} }
// GetCachedConversationCount 获取缓存的对话数量按sessionId查询 // GetCachedConversationCount 获取缓存的对话数量按sessionId查询
func GetCachedConversationCount(ctx context.Context, sessionId string) (count int64, err error) { func GetCachedConversationCount(ctx context.Context, sessionId string) (count int64, err error) {
key := ConversationCacheKeyPrefix + sessionId key := ConversationCacheKeyPrefix + sessionId
result, err := redisClient.Do(ctx, "LLEN", key) result, err := getClient().Do(ctx, "LLEN", key)
if err != nil { if err != nil {
return return
} }
@@ -752,7 +752,7 @@ func GetCachedConversationCount(ctx context.Context, sessionId string) (count in
// ClearCachedConversations 清空对话缓存归档时调用按sessionId // ClearCachedConversations 清空对话缓存归档时调用按sessionId
func ClearCachedConversations(ctx context.Context, sessionId string) error { func ClearCachedConversations(ctx context.Context, sessionId string) error {
key := ConversationCacheKeyPrefix + sessionId key := ConversationCacheKeyPrefix + sessionId
_, err := redisClient.Del(ctx, key) _, err := getClient().Del(ctx, key)
return err return err
} }