From f7cb007491fd6f3238ac6dcbb8010ffab80a46d7 Mon Sep 17 00:00:00 2001 From: Cold <16419454+cold502@user.noreply.gitee.com> Date: Sat, 6 Dec 2025 12:02:34 +0800 Subject: [PATCH] =?UTF-8?q?=E5=8D=8F=E7=A8=8B=E6=B1=A0=E5=8D=95=E4=BE=8B?= =?UTF-8?q?=E6=A8=A1=E5=BC=8F?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- ragflow/worker_pool.go | 41 +++++++++++++++++++++++++++++++---------- redis/redis.go | 35 +++++++++++++++++++---------------- 2 files changed, 50 insertions(+), 26 deletions(-) diff --git a/ragflow/worker_pool.go b/ragflow/worker_pool.go index 8c46c47..47767bf 100644 --- a/ragflow/worker_pool.go +++ b/ragflow/worker_pool.go @@ -2,6 +2,7 @@ package ragflow import ( "context" + "sync" "gitee.com/red-future---jilin-g/common/redis" "github.com/gogf/gf/v2/errors/gerror" @@ -15,24 +16,44 @@ type WorkerPool struct { size int } -// NewWorkerPool 创建协程池 +// 单例模式相关变量 +var ( + workerPoolInstance *WorkerPool + workerPoolOnce sync.Once +) + +// GetWorkerPoolWithSize 获取指定大小的协程池单例 +// 使用 sync.Once 确保只创建一次,size 仅首次调用生效 +func GetWorkerPoolWithSize(size int) *WorkerPool { + workerPoolOnce.Do(func() { + if size <= 0 { + size = 200 // 默认大小 + } + workerPoolInstance = &WorkerPool{ + pool: grpool.New(size), + size: size, + } + }) + return workerPoolInstance +} + +// GetWorkerPool 获取协程池单例(使用默认大小 200) +func GetWorkerPool() *WorkerPool { + return GetWorkerPoolWithSize(200) +} + +// NewWorkerPool 创建协程池(兼容旧代码,内部使用单例) // 参数: -// - size: 协程池大小,建议设置为 CPU 核心数的 2-4 倍 +// - size: 协程池大小,仅首次调用生效 // // 返回: -// - *WorkerPool: 协程池实例 +// - *WorkerPool: 协程池单例实例 // - error: 创建失败时返回错误 func NewWorkerPool(size int) (*WorkerPool, error) { if size <= 0 { return nil, gerror.New("协程池大小必须大于0") } - - pool := grpool.New(size) - - return &WorkerPool{ - pool: pool, - size: size, - }, nil + return GetWorkerPoolWithSize(size), nil } // Submit 提交任务到协程池 diff --git a/redis/redis.go b/redis/redis.go index 7856a4b..1e5ca10 100644 --- a/redis/redis.go +++ b/redis/redis.go @@ -10,16 +10,19 @@ import ( "github.com/gogf/gf/v2/util/gconv" ) -// GRedisClient GoFrame gredis 客户端,统一使用 +// GRedisClient GoFrame gredis 客户端,统一使用(懒加载) var GRedisClient *gredis.Redis // RedisClient GRedisClient 的别名,保持向后兼容 var RedisClient *gredis.Redis -func init() { - // 初始化 GoFrame gredis 客户端 - GRedisClient = g.Redis() - RedisClient = GRedisClient // 别名指向同一个客户端 +// GetRedisClient 获取 Redis 客户端(懒加载) +func GetRedisClient() *gredis.Redis { + if GRedisClient == nil { + GRedisClient = g.Redis() + RedisClient = GRedisClient + } + return GRedisClient } // Stream 和消费者组常量 @@ -42,7 +45,7 @@ type StreamMessage struct { // 使用 gredis Do() 方法执行 XGROUP CREATE 命令 func InitStreamGroup(ctx context.Context, streamKey, groupName string) error { // XGROUP CREATE streamKey groupName 0 MKSTREAM - _, err := GRedisClient.Do(ctx, "XGROUP", "CREATE", streamKey, groupName, "0", "MKSTREAM") + _, err := GetRedisClient().Do(ctx, "XGROUP", "CREATE", streamKey, groupName, "0", "MKSTREAM") if err != nil { // 如果组已存在,忽略错误 errStr := err.Error() @@ -63,7 +66,7 @@ func AddToStream(ctx context.Context, streamKey string, values map[string]interf args = append(args, key, val) } - result, err := GRedisClient.Do(ctx, "XADD", args...) + result, err := GetRedisClient().Do(ctx, "XADD", args...) if err != nil { return "", err } @@ -77,7 +80,7 @@ func AddToStream(ctx context.Context, streamKey string, values map[string]interf // 使用 gredis Do() 方法执行 XREADGROUP 命令 func ReadFromStream(ctx context.Context, streamKey, groupName, consumerName string, count int64, blockMs int64) ([]StreamMessage, error) { // XREADGROUP GROUP groupName consumerName COUNT count BLOCK blockMs STREAMS streamKey > - result, err := GRedisClient.Do(ctx, + result, err := GetRedisClient().Do(ctx, "XREADGROUP", "GROUP", groupName, consumerName, "COUNT", count, "BLOCK", blockMs, @@ -162,7 +165,7 @@ func AckMessage(ctx context.Context, streamKey, groupName string, messageIDs ... args = append(args, id) } - _, err := GRedisClient.Do(ctx, "XACK", args...) + _, err := GetRedisClient().Do(ctx, "XACK", args...) return err } @@ -170,7 +173,7 @@ func AckMessage(ctx context.Context, streamKey, groupName string, messageIDs ... // 使用 gredis Do() 方法执行 XLEN 命令 func GetStreamLength(ctx context.Context, streamKey string) (int64, error) { // XLEN streamKey - result, err := GRedisClient.Do(ctx, "XLEN", streamKey) + result, err := GetRedisClient().Do(ctx, "XLEN", streamKey) if err != nil { return 0, err } @@ -191,7 +194,7 @@ type PendingMessage struct { // 使用 gredis Do() 方法执行 XPENDING 命令 func GetPendingMessages(ctx context.Context, streamKey, groupName string, start, end string, count int64) ([]PendingMessage, error) { // XPENDING streamKey groupName start end count - result, err := GRedisClient.Do(ctx, "XPENDING", streamKey, groupName, start, end, count) + result, err := GetRedisClient().Do(ctx, "XPENDING", streamKey, groupName, start, end, count) if err != nil { return nil, err } @@ -233,7 +236,7 @@ func ClaimPendingMessage(ctx context.Context, streamKey, groupName, consumerName args = append(args, id) } - result, err := GRedisClient.Do(ctx, "XCLAIM", args...) + result, err := GetRedisClient().Do(ctx, "XCLAIM", args...) if err != nil { return nil, err } @@ -287,7 +290,7 @@ func SetSessionLastActive(ctx context.Context, userId string) error { timestamp := gtime.Now().Timestamp() // SETEX key 7200 value (7200秒 = 2小时) - _, err := GRedisClient.Do(ctx, "SETEX", key, 7200, timestamp) + _, err := GetRedisClient().Do(ctx, "SETEX", key, 7200, timestamp) return err } @@ -295,7 +298,7 @@ func SetSessionLastActive(ctx context.Context, userId string) error { // 使用 gredis Get 方法 func GetSessionLastActive(ctx context.Context, userId string) (int64, error) { key := SessionLastActiveKeyPrefix + userId + ":last_active" - result, err := GRedisClient.Get(ctx, key) + result, err := GetRedisClient().Get(ctx, key) if err != nil { return 0, err } @@ -337,7 +340,7 @@ func SetSessionCache(ctx context.Context, userId, sessionId string) error { key := SessionLastActiveKeyPrefix + userId + ":session_id" // SETEX key 604800 value (604800秒 = 7天) - _, err := GRedisClient.Do(ctx, "SETEX", key, 604800, sessionId) + _, err := GetRedisClient().Do(ctx, "SETEX", key, 604800, sessionId) return err } @@ -345,7 +348,7 @@ func SetSessionCache(ctx context.Context, userId, sessionId string) error { // 使用 gredis Get 方法 func GetSessionCache(ctx context.Context, userId string) (string, error) { key := SessionLastActiveKeyPrefix + userId + ":session_id" - result, err := GRedisClient.Get(ctx, key) + result, err := GetRedisClient().Get(ctx, key) if err != nil { return "", err }