修改轮询为阻塞消息

This commit is contained in:
Cold
2025-12-26 15:13:07 +08:00
committed by 张斌
parent 7695752e1c
commit 7e3b0362d9
2 changed files with 26 additions and 19 deletions

View File

@@ -108,8 +108,15 @@ func ReadFromStream(ctx context.Context, streamKey, groupName, consumerName stri
glog.Debugf(ctx, "[DEBUG Redis] XREADGROUP GROUP %s %s COUNT %d BLOCK %d STREAMS %s >", glog.Debugf(ctx, "[DEBUG Redis] XREADGROUP GROUP %s %s COUNT %d BLOCK %d STREAMS %s >",
groupName, consumerName, count, blockMs, streamKey) groupName, consumerName, count, blockMs, streamKey)
// 检查是否需要记录trace避免轮询产生大量trace
execCtx := ctx
if !g.Cfg().MustGet(ctx, "jaeger.traceStream", true).Bool() {
// 不记录trace使用background context不继承span
execCtx = context.Background()
}
// XREADGROUP GROUP groupName consumerName COUNT count BLOCK blockMs STREAMS streamKey > // XREADGROUP GROUP groupName consumerName COUNT count BLOCK blockMs STREAMS streamKey >
result, err := redisClient.Do(ctx, result, err := redisClient.Do(execCtx,
"XREADGROUP", "GROUP", groupName, consumerName, "XREADGROUP", "GROUP", groupName, consumerName,
"COUNT", count, "COUNT", count,
"BLOCK", blockMs, "BLOCK", blockMs,
@@ -487,10 +494,10 @@ const (
// UserState 用户会话状态(阶段+对话计数+咨询方向,统一5分钟过期 // UserState 用户会话状态(阶段+对话计数+咨询方向,统一5分钟过期
type UserState struct { type UserState struct {
Stage int `json:"stage"` // 当前阶段 Stage int `json:"stage"` // 当前阶段
Direction string `json:"direction"` // 咨询方向 Direction string `json:"direction"` // 咨询方向
Count int64 `json:"count"` // 对话计数v5.2卡片触发) Count int64 `json:"count"` // 对话计数v5.2卡片触发)
CustomerServiceId string `json:"customerServiceId"` // 用户选择的方向对应的客服账号ID AccountName string `json:"accountName"` // 用户选择的方向对应的客服账号名称
} }
// GetUserState 获取用户状态(阶段+计数) // GetUserState 获取用户状态(阶段+计数)
@@ -529,10 +536,10 @@ func SetUserStage(ctx context.Context, userId, platform string, stage int) error
return err return err
} }
// SetUserCustomerServiceId 设置用户对应的客服账号ID,并刷新过期时间 // SetUserAccountName 设置用户对应的客服账号名称,并刷新过期时间
func SetUserCustomerServiceId(ctx context.Context, userId, platform, customerServiceId string) error { func SetUserAccountName(ctx context.Context, userId, platform, accountName string) error {
key := UserStateKeyPrefix + userId + "_" + platform key := UserStateKeyPrefix + userId + "_" + platform
_, err := redisClient.Do(ctx, "HSET", key, "customerServiceId", customerServiceId) _, err := redisClient.Do(ctx, "HSET", key, "accountName", accountName)
if err != nil { if err != nil {
return err return err
} }

View File

@@ -10,17 +10,17 @@ type HistoryMessage struct {
// SendStreamMessage 发送到 Redis Stream 的消息结构 // SendStreamMessage 发送到 Redis Stream 的消息结构
type SendStreamMessage struct { type SendStreamMessage struct {
UserId string `json:"user_id"` // 用户ID UserId string `json:"user_id"` // 用户ID
Content string `json:"content"` // 消息内容 Content string `json:"content"` // 消息内容
Timestamp int64 `json:"timestamp"` // 时间戳(秒) Timestamp int64 `json:"timestamp"` // 时间戳(秒)
MessageId string `json:"message_id"` // 消息唯一ID MessageId string `json:"message_id"` // 消息唯一ID
Platform string `json:"platform,omitempty"` // 平台标识 Platform string `json:"platform,omitempty"` // 平台标识
AccountId string `json:"account_id,omitempty"` // 账号ID AccountId string `json:"account_id,omitempty"` // 账号ID
TenantId string `json:"tenant_id,omitempty"` // 租户ID数据隔离 TenantId string `json:"tenant_id,omitempty"` // 租户ID数据隔离
CustomerServiceId string `json:"customer_service_id,omitempty"` // 客服账号ID AccountName string `json:"account_name,omitempty"` // 客服账号名称
ChatId string `json:"chat_id,omitempty"` // RAGFlow Chat ID从ragflow_config查询 ChatId string `json:"chat_id,omitempty"` // RAGFlow Chat ID从ragflow_config查询
ReplyQueue string `json:"reply_queue,omitempty"` // 响应队列名称(支持多实例独立队列) ReplyQueue string `json:"reply_queue,omitempty"` // 响应队列名称(支持多实例独立队列)
History []HistoryMessage `json:"history,omitempty"` // 历史对话(归档后恢复时携带) History []HistoryMessage `json:"history,omitempty"` // 历史对话(归档后恢复时携带)
} }
// BatchStreamMessage 批量消息结构 // BatchStreamMessage 批量消息结构