This commit is contained in:
@@ -109,7 +109,7 @@ func (m *MongoDB) Find(ctx context.Context, filter bson.M, result interface{}, c
|
||||
redisKey := fmt.Sprintf(redis.List, user.TenantId, collection, filterKey, optionsKey)
|
||||
if m.Cache {
|
||||
var resultStr *gvar.Var
|
||||
resultStr, err = redis.RedisClient().Get(ctx, redisKey)
|
||||
resultStr, err = redis.RedisClient.Get(ctx, redisKey)
|
||||
if err != nil {
|
||||
return
|
||||
}
|
||||
@@ -168,7 +168,7 @@ func (m *MongoDB) Find(ctx context.Context, filter bson.M, result interface{}, c
|
||||
return
|
||||
}
|
||||
if m.Cache {
|
||||
err = redis.RedisClient().SetEX(ctx, redisKey, result, int64(time.Hour))
|
||||
err = redis.RedisClient.SetEX(ctx, redisKey, result, int64(time.Hour))
|
||||
if err != nil {
|
||||
return
|
||||
}
|
||||
@@ -200,7 +200,7 @@ func (m *MongoDB) FindOne(ctx context.Context, filter bson.M, result interface{}
|
||||
redisKey := fmt.Sprintf(redis.One, user.TenantId, collection, filterKey)
|
||||
if m.Cache {
|
||||
var resultStr *gvar.Var
|
||||
resultStr, err = redis.RedisClient().Get(ctx, redisKey)
|
||||
resultStr, err = redis.RedisClient.Get(ctx, redisKey)
|
||||
if err != nil {
|
||||
return
|
||||
}
|
||||
@@ -221,7 +221,7 @@ func (m *MongoDB) FindOne(ctx context.Context, filter bson.M, result interface{}
|
||||
err = nil
|
||||
}
|
||||
if m.Cache {
|
||||
err = redis.RedisClient().SetEX(ctx, redisKey, result, int64(time.Hour))
|
||||
err = redis.RedisClient.SetEX(ctx, redisKey, result, int64(time.Hour))
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
@@ -231,23 +231,23 @@ func (m *MongoDB) FindOne(ctx context.Context, filter bson.M, result interface{}
|
||||
|
||||
func (m *MongoDB) CleanRedis(ctx context.Context, filter bson.M, tenantId interface{}, collection string) (err error) {
|
||||
listKeys := fmt.Sprintf(redis.CleanList, tenantId, collection)
|
||||
keys, err := redis.RedisClient().Keys(ctx, listKeys)
|
||||
keys, err := redis.RedisClient.Keys(ctx, listKeys)
|
||||
if err != nil {
|
||||
return
|
||||
}
|
||||
for _, key := range keys {
|
||||
_, err = redis.RedisClient().Del(ctx, key)
|
||||
_, err = redis.RedisClient.Del(ctx, key)
|
||||
if err != nil {
|
||||
return
|
||||
}
|
||||
}
|
||||
countKeys := fmt.Sprintf(redis.CleanCount, tenantId, collection)
|
||||
keys, err = redis.RedisClient().Keys(ctx, countKeys)
|
||||
keys, err = redis.RedisClient.Keys(ctx, countKeys)
|
||||
if err != nil {
|
||||
return
|
||||
}
|
||||
for _, key := range keys {
|
||||
_, err = redis.RedisClient().Del(ctx, key)
|
||||
_, err = redis.RedisClient.Del(ctx, key)
|
||||
if err != nil {
|
||||
return
|
||||
}
|
||||
@@ -256,7 +256,7 @@ func (m *MongoDB) CleanRedis(ctx context.Context, filter bson.M, tenantId interf
|
||||
delete(filter, "tenantId")
|
||||
filterKey := fmt.Sprintf("%+v", filter)
|
||||
oneKey := fmt.Sprintf(redis.One, tenantId, collection, filterKey)
|
||||
_, err = redis.RedisClient().Del(ctx, oneKey)
|
||||
_, err = redis.RedisClient.Del(ctx, oneKey)
|
||||
if err != nil {
|
||||
return
|
||||
}
|
||||
@@ -525,7 +525,7 @@ func (m *MongoDB) Count(ctx context.Context, filter bson.M, collection string) (
|
||||
redisKey := fmt.Sprintf(redis.Count, user.TenantId, collection, filterKey)
|
||||
if m.Cache {
|
||||
var resultStr *gvar.Var
|
||||
resultStr, err = redis.RedisClient().Get(ctx, redisKey)
|
||||
resultStr, err = redis.RedisClient.Get(ctx, redisKey)
|
||||
if err != nil {
|
||||
return
|
||||
}
|
||||
@@ -536,7 +536,7 @@ func (m *MongoDB) Count(ctx context.Context, filter bson.M, collection string) (
|
||||
}
|
||||
count, err = db.Collection(collection).CountDocuments(ctx, filter)
|
||||
if m.Cache {
|
||||
err = redis.RedisClient().SetEX(ctx, redisKey, count, int64(time.Hour))
|
||||
err = redis.RedisClient.SetEX(ctx, redisKey, count, int64(time.Hour))
|
||||
if err != nil {
|
||||
return
|
||||
}
|
||||
|
||||
@@ -2,6 +2,7 @@ package redis
|
||||
|
||||
import (
|
||||
"context"
|
||||
"errors"
|
||||
"strings"
|
||||
"sync"
|
||||
"time"
|
||||
@@ -14,27 +15,60 @@ import (
|
||||
)
|
||||
|
||||
var (
|
||||
// redisClient 内部使用的 Redis 客户端(单例模式)
|
||||
redisClient *gredis.Redis
|
||||
redisOnce sync.Once
|
||||
)
|
||||
|
||||
// RedisClient 获取Redis客户端(支持重试3次,每次间隔2秒)
|
||||
func RedisClient() *gredis.Redis {
|
||||
// getClient 获取 Redis 客户端(延迟初始化)
|
||||
func getClient() *gredis.Redis {
|
||||
redisOnce.Do(func() {
|
||||
for i := 0; i < 3; i++ {
|
||||
redisClient = g.Redis()
|
||||
if redisClient != nil {
|
||||
ctx := context.Background()
|
||||
if _, err := redisClient.Do(ctx, "PING"); err == nil {
|
||||
return
|
||||
}
|
||||
}
|
||||
time.Sleep(2 * time.Second)
|
||||
}
|
||||
redisClient = g.Redis()
|
||||
})
|
||||
return redisClient
|
||||
}
|
||||
|
||||
// GetRedisClient 获取 Redis 客户端(供外部使用)
|
||||
func GetRedisClient() *gredis.Redis {
|
||||
return getClient()
|
||||
}
|
||||
|
||||
// RedisClient 导出的 Redis 客户端(供 mongo.go 使用,兼容旧代码)
|
||||
var RedisClient = getClient()
|
||||
|
||||
// Lock 分布式锁
|
||||
func Lock(ctx context.Context, key string, expireSeconds int64, fn func(ctx context.Context) error) (success bool, err error) {
|
||||
limit := 3
|
||||
LOOP:
|
||||
if limit < 0 {
|
||||
return false, errors.New("锁重试次数耗尽")
|
||||
}
|
||||
limit--
|
||||
if val, err := RedisClient.Set(ctx, key, true, gredis.SetOption{
|
||||
TTLOption: gredis.TTLOption{
|
||||
EX: &expireSeconds,
|
||||
},
|
||||
NX: true,
|
||||
}); err != nil {
|
||||
return false, err
|
||||
} else {
|
||||
if val.Bool() {
|
||||
defer func(RedisClient *gredis.Redis, ctx context.Context, key string) {
|
||||
if _, err = RedisClient.Del(ctx, key); err != nil {
|
||||
glog.Errorf(ctx, "RedisClient.Del error: %v", err)
|
||||
}
|
||||
}(RedisClient, ctx, key)
|
||||
if err = fn(ctx); err != nil {
|
||||
return false, err
|
||||
}
|
||||
return true, nil
|
||||
} else {
|
||||
time.Sleep(time.Second)
|
||||
goto LOOP
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func GetReadStream(ctx context.Context, msg ...QueueMessage) error {
|
||||
for _, t := range msg {
|
||||
err := GetReadFromStream(ctx, t.StreamKey, t.GroupName, t.ConsumerName, t.BatchSize, t.BlockMs, t.AutoAck, t.HandleFunc)
|
||||
@@ -164,9 +198,6 @@ func ReadFromStream(ctx context.Context, streamKey, groupName, consumerName stri
|
||||
|
||||
RECONNECT:
|
||||
// 先尝试读取pending消息(ID=0),处理积压
|
||||
glog.Debugf(ctx, "[DEBUG Redis] XREADGROUP GROUP %s %s COUNT %d BLOCK 0 STREAMS %s 0",
|
||||
groupName, consumerName, count, streamKey)
|
||||
|
||||
result, err := redisClient.Do(execCtx,
|
||||
"XREADGROUP", "GROUP", groupName, consumerName,
|
||||
"COUNT", count,
|
||||
@@ -179,9 +210,6 @@ RECONNECT:
|
||||
|
||||
// 如果没有pending消息,读取新消息
|
||||
if result == nil || result.IsEmpty() {
|
||||
glog.Debugf(ctx, "[DEBUG Redis] 无pending消息,读取新消息 XREADGROUP GROUP %s %s COUNT %d BLOCK %d STREAMS %s >",
|
||||
groupName, consumerName, count, blockMs, streamKey)
|
||||
|
||||
result, err = redisClient.Do(execCtx,
|
||||
"XREADGROUP", "GROUP", groupName, consumerName,
|
||||
"COUNT", count,
|
||||
@@ -193,8 +221,6 @@ RECONNECT:
|
||||
}
|
||||
}
|
||||
|
||||
glog.Debugf(ctx, "[DEBUG Redis] XREADGROUP 返回: %+v", result)
|
||||
|
||||
// 预分配容量,避免动态扩容
|
||||
messages := make([]StreamMessage, 0, int(count))
|
||||
|
||||
|
||||
@@ -116,3 +116,13 @@ func GetHistoryContextLimit() int64 {
|
||||
ctx := context.Background()
|
||||
return g.Cfg().MustGet(ctx, "history.contextLimit", 5).Int64() // 默认5轮对话
|
||||
}
|
||||
|
||||
// DocSyncMessage 文档同步消息结构(RAGFlow与MongoDB同步)
|
||||
type DocSyncMessage struct {
|
||||
DocId string `json:"docId"` // MongoDB文档ID
|
||||
RagflowDocId string `json:"ragflowDocId"` // RAGFlow文档ID
|
||||
TenantId string `json:"tenantId"` // 租户ID
|
||||
DocType string `json:"docType"` // 文档类型:speechcraft/product
|
||||
Action string `json:"action"` // 操作类型:sync_ragflow_id
|
||||
Timestamp int64 `json:"timestamp"` // 时间戳
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user