map预分配容量避免动态扩容,优化了返回值复用情况
This commit is contained in:
@@ -50,8 +50,7 @@ func loadConfig(ctx context.Context) (baseURL, apiKey string) {
|
|||||||
// 使用 GoFrame 全局配置(从项目的 config.yml 读取)
|
// 使用 GoFrame 全局配置(从项目的 config.yml 读取)
|
||||||
baseURL = g.Cfg().MustGet(ctx, "ragflow.base_url", "").String()
|
baseURL = g.Cfg().MustGet(ctx, "ragflow.base_url", "").String()
|
||||||
apiKey = g.Cfg().MustGet(ctx, "ragflow.api_key", "").String()
|
apiKey = g.Cfg().MustGet(ctx, "ragflow.api_key", "").String()
|
||||||
|
return
|
||||||
return baseURL, apiKey
|
|
||||||
}
|
}
|
||||||
|
|
||||||
// GetGlobalClient 获取全局客户端
|
// GetGlobalClient 获取全局客户端
|
||||||
|
|||||||
@@ -90,7 +90,8 @@ func ReadFromStream(ctx context.Context, streamKey, groupName, consumerName stri
|
|||||||
|
|
||||||
// 解析返回值
|
// 解析返回值
|
||||||
// 格式: [[streamKey, [[msgID, [field1, value1, field2, value2, ...]], ...]]]
|
// 格式: [[streamKey, [[msgID, [field1, value1, field2, value2, ...]], ...]]]
|
||||||
messages := []StreamMessage{}
|
// 预分配容量,避免动态扩容
|
||||||
|
messages := make([]StreamMessage, 0, int(count))
|
||||||
|
|
||||||
if result == nil {
|
if result == nil {
|
||||||
// 超时或没有数据
|
// 超时或没有数据
|
||||||
@@ -130,8 +131,8 @@ func ReadFromStream(ctx context.Context, streamKey, groupName, consumerName stri
|
|||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
|
|
||||||
// 解析字段为 map
|
// 解析字段为 map,预分配容量,避免动态扩容
|
||||||
values := make(map[string]interface{})
|
values := make(map[string]interface{}, len(fieldsArray)/2)
|
||||||
for i := 0; i < len(fieldsArray); i += 2 {
|
for i := 0; i < len(fieldsArray); i += 2 {
|
||||||
if i+1 < len(fieldsArray) {
|
if i+1 < len(fieldsArray) {
|
||||||
key := gconv.String(fieldsArray[i])
|
key := gconv.String(fieldsArray[i])
|
||||||
@@ -154,7 +155,9 @@ func ReadFromStream(ctx context.Context, streamKey, groupName, consumerName stri
|
|||||||
// 使用 gredis Do() 方法执行 XACK 命令
|
// 使用 gredis Do() 方法执行 XACK 命令
|
||||||
func AckMessage(ctx context.Context, streamKey, groupName string, messageIDs ...string) error {
|
func AckMessage(ctx context.Context, streamKey, groupName string, messageIDs ...string) error {
|
||||||
// XACK streamKey groupName messageID1 messageID2 ...
|
// XACK streamKey groupName messageID1 messageID2 ...
|
||||||
args := []interface{}{streamKey, groupName}
|
// 预分配容量,避免动态扩容
|
||||||
|
args := make([]interface{}, 0, len(messageIDs)+2)
|
||||||
|
args = append(args, streamKey, groupName)
|
||||||
for _, id := range messageIDs {
|
for _, id := range messageIDs {
|
||||||
args = append(args, id)
|
args = append(args, id)
|
||||||
}
|
}
|
||||||
@@ -245,7 +248,8 @@ func ClaimPendingMessage(ctx context.Context, streamKey, groupName, consumerName
|
|||||||
return []StreamMessage{}, nil
|
return []StreamMessage{}, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
var messages []StreamMessage
|
// 预分配容量,避免动态扩容
|
||||||
|
messages := make([]StreamMessage, 0, len(messagesArray))
|
||||||
for _, msgData := range messagesArray {
|
for _, msgData := range messagesArray {
|
||||||
msgArray, ok := msgData.([]interface{})
|
msgArray, ok := msgData.([]interface{})
|
||||||
if !ok || len(msgArray) < 2 {
|
if !ok || len(msgArray) < 2 {
|
||||||
@@ -258,12 +262,12 @@ func ClaimPendingMessage(ctx context.Context, streamKey, groupName, consumerName
|
|||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
|
|
||||||
values := make(map[string]interface{})
|
// 预分配 map 容量 ,避免动态扩容
|
||||||
|
values := make(map[string]interface{}, len(fieldsArray)/2)
|
||||||
for i := 0; i < len(fieldsArray); i += 2 {
|
for i := 0; i < len(fieldsArray); i += 2 {
|
||||||
if i+1 < len(fieldsArray) {
|
if i+1 < len(fieldsArray) {
|
||||||
key := gconv.String(fieldsArray[i])
|
key := gconv.String(fieldsArray[i])
|
||||||
val := fieldsArray[i+1]
|
values[key] = fieldsArray[i+1]
|
||||||
values[key] = val
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|||||||
39
redis/types.go
Normal file
39
redis/types.go
Normal file
@@ -0,0 +1,39 @@
|
|||||||
|
package redis
|
||||||
|
|
||||||
|
// SendStreamMessage 发送到 Redis Stream 的消息结构
|
||||||
|
type SendStreamMessage struct {
|
||||||
|
UserId string `json:"user_id"` // 用户ID
|
||||||
|
Content string `json:"content"` // 消息内容
|
||||||
|
Timestamp int64 `json:"timestamp"` // 时间戳(秒)
|
||||||
|
MessageId string `json:"message_id"` // 消息唯一ID
|
||||||
|
}
|
||||||
|
|
||||||
|
// ToMap 转换为 map[string]interface{} 用于 Stream 存储
|
||||||
|
func (m *SendStreamMessage) ToMap() map[string]interface{} {
|
||||||
|
return map[string]interface{}{
|
||||||
|
"user_id": m.UserId,
|
||||||
|
"content": m.Content,
|
||||||
|
"timestamp": m.Timestamp,
|
||||||
|
"message_id": m.MessageId,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// BatchStreamMessage 批量消息结构
|
||||||
|
type BatchStreamMessage struct {
|
||||||
|
UserId string `json:"user_id"` // 用户ID
|
||||||
|
Content string `json:"content"` // 消息内容
|
||||||
|
Timestamp int64 `json:"timestamp"` // 时间戳(秒)
|
||||||
|
BatchId string `json:"batch_id"` // 批次ID
|
||||||
|
Index int `json:"index"` // 批次内序号
|
||||||
|
}
|
||||||
|
|
||||||
|
// ToMap 转换为 map[string]interface{} 用于 Stream 存储
|
||||||
|
func (m *BatchStreamMessage) ToMap() map[string]interface{} {
|
||||||
|
return map[string]interface{}{
|
||||||
|
"user_id": m.UserId,
|
||||||
|
"content": m.Content,
|
||||||
|
"timestamp": m.Timestamp,
|
||||||
|
"batch_id": m.BatchId,
|
||||||
|
"index": m.Index,
|
||||||
|
}
|
||||||
|
}
|
||||||
Reference in New Issue
Block a user