From ecf3c297592de58461a8689a175c705af10296ba Mon Sep 17 00:00:00 2001 From: Cold <16419454+cold502@user.noreply.gitee.com> Date: Wed, 24 Dec 2025 18:33:11 +0800 Subject: [PATCH] =?UTF-8?q?=E6=8F=90=E4=BA=A45=E4=B8=AA=E6=96=87=E4=BB=B6,?= =?UTF-8?q?=20=E4=BF=AE=E6=94=B9=E4=BA=86mongo=E6=96=B9=E6=B3=95,=20?= =?UTF-8?q?=E5=A6=82=E6=9E=9Ctoken=E8=8E=B7=E5=8F=96=E9=94=99=E8=AF=AF,=20?= =?UTF-8?q?=E7=84=B6=E5=90=8E=E8=8E=B7=E5=8F=96=E5=AE=8C=E7=A7=9F=E6=88=B7?= =?UTF-8?q?id=E5=90=8E,=20=E6=8A=8A=E9=94=99=E8=AF=AF=E6=B8=85=E9=99=A4?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- mongo/mongo.go | 49 ++++++++++++++++++++++++++++-------- rabbitmq/consumer_manager.go | 10 +++++--- ragflow/document.go | 32 ++++++++++++----------- redis/redis.go | 18 ++++++++++--- redis/types.go | 20 ++++++++------- 5 files changed, 88 insertions(+), 41 deletions(-) diff --git a/mongo/mongo.go b/mongo/mongo.go index baabbe5..d02c9ad 100644 --- a/mongo/mongo.go +++ b/mongo/mongo.go @@ -261,31 +261,58 @@ func GetTenantInfo(ctx context.Context) (user do.User, err error) { return } - // 2. token 获取失败,尝试从请求参数获取 customerServiceId + // 2. token 获取失败,尝试从请求参数或context获取 accountName + var accountName string + + // 2.1 尝试从request获取(HTTP请求场景) req := g.RequestFromCtx(ctx) - if req == nil { - return user, gerror.New("无法获取租户信息:无 token 且无 request") + if req != nil { + accountName = req.Get("accountName").String() + if accountName == "" { + accountName = req.Get("account_name").String() + } + // 兼容旧参数名 + if accountName == "" { + accountName = req.Get("customerServiceId").String() + } + if accountName == "" { + accountName = req.Get("customer_service_id").String() + } } - customerServiceId := req.Get("customerServiceId").String() - if customerServiceId == "" { - customerServiceId = req.Get("customer_service_id").String() + // 2.2 request不存在或未获取到,尝试从context.Value获取(WebSocket场景) + if accountName == "" { + if val := ctx.Value("accountName"); val != nil { + if str, ok := val.(string); ok { + accountName = str + } + } + // 兼容旧参数名 + if accountName == "" { + if val := ctx.Value("customerServiceId"); val != nil { + if str, ok := val.(string); ok { + accountName = str + } + } + } } - if customerServiceId == "" { - return user, gerror.New("无法获取租户信息:无 token 且无 customerServiceId 参数") + + if accountName == "" { + return user, gerror.New("无法获取租户信息:无 token 且无 accountName 参数") } // 3. 直接查询 customer_service_account 表获取 tenantId - filter := bson.M{"customerServiceId": customerServiceId, "isDeleted": false} + filter := bson.M{"accountName": accountName, "isDeleted": false} var account struct { TenantId interface{} `bson:"tenantId"` } if findErr := db.Collection("customer_service_account").FindOne(ctx, filter).Decode(&account); findErr != nil { - return user, gerror.Newf("通过 customerServiceId 查询租户失败: %v", findErr) + return user, gerror.Newf("通过 accountName 查询租户失败: %v", findErr) } user.TenantId = account.TenantId - user.UserName = customerServiceId + user.UserName = accountName + err = nil // 清空之前从token获取时的错误 return } diff --git a/rabbitmq/consumer_manager.go b/rabbitmq/consumer_manager.go index 28695ca..54648ce 100644 --- a/rabbitmq/consumer_manager.go +++ b/rabbitmq/consumer_manager.go @@ -104,9 +104,13 @@ func (cm *ConsumerManager) Init() (err error) { } glog.Info(cm.ctx, "RabbitMQ 连接已初始化") - // 设置响应队列(RAGFlow 响应消息) - if err = SetupResponseQueue(cm.ctx); err != nil { - glog.Fatalf(cm.ctx, "设置响应队列失败: %v", err) + // 声明响应Exchange(队列由各消费者自己声明和绑定) + if err = DeclareExchange(cm.ctx, &ExchangeConfig{ + Name: "ragflow.response", + Type: "topic", + Durable: true, + }); err != nil { + glog.Fatalf(cm.ctx, "声明响应Exchange失败: %v", err) return } diff --git a/ragflow/document.go b/ragflow/document.go index 5913572..0d819a7 100644 --- a/ragflow/document.go +++ b/ragflow/document.go @@ -5,10 +5,10 @@ package ragflow import ( "bytes" "context" + "encoding/json" "mime/multipart" "strings" - "github.com/gogf/gf/v2/encoding/gjson" "github.com/gogf/gf/v2/errors/gerror" ) @@ -44,6 +44,11 @@ type UploadDocumentReq struct { FilePaths []string // 本地文件路径列表 } +// UploadDocumentRes 上传文档响应 +type UploadDocumentRes struct { + Id string `json:"id"` // 文档ID +} + // ListDocumentsReq 列出文档请求 type ListDocumentsReq struct { Page int `json:"page,omitempty"` // 页码,默认 1 @@ -190,28 +195,25 @@ func (c *Client) UploadDocumentFromText(ctx context.Context, datasetId, content, defer resp.Close() // 解析响应 - var result struct { - Code int `json:"code"` - Message string `json:"message"` - Data struct { - Id string `json:"id"` - } `json:"data"` + var response struct { + Code int `json:"code"` + Message string `json:"message"` + Data []UploadDocumentRes `json:"data"` // RAGFlow返回数组 } - bodyBytes := resp.ReadAll() - if err = gjson.DecodeTo(bodyBytes, &result); err != nil { - return "", err + if err := json.Unmarshal(resp.ReadAll(), &response); err != nil { + return "", gerror.Newf("json Decode failed: %v", err) } - if result.Code != 0 { - return "", gerror.Newf("上传文档失败 (code=%d): %s", result.Code, result.Message) + if len(response.Data) == 0 { + return "", gerror.New("上传文档返回data为空") } - if result.Data.Id == "" { - return "", gerror.New("上传成功但未返回文档ID") + if response.Code != 0 { + return "", gerror.Newf("上传文档失败 (code=%d): %s", response.Code, response.Message) } - return result.Data.Id, nil + return response.Data[0].Id, nil } // UploadDocument 上传文档(保留兼容) diff --git a/redis/redis.go b/redis/redis.go index 1ebfe1c..278f912 100644 --- a/redis/redis.go +++ b/redis/redis.go @@ -487,9 +487,10 @@ const ( // UserState 用户会话状态(阶段+对话计数+咨询方向,统一5分钟过期) type UserState struct { - Stage int `json:"stage"` // 用户阶段:5=未选择方向 0=AI模型 1=打招呼 2=业务 3=发卡片 - Count int64 `json:"count"` // 对话计数 - Direction string `json:"direction"` // 用户选择的咨询方向(如:产品咨询、售后服务) + Stage int `json:"stage"` // 当前阶段 + Direction string `json:"direction"` // 咨询方向 + Count int64 `json:"count"` // 对话计数(v5.2卡片触发) + CustomerServiceId string `json:"customerServiceId"` // 用户选择的方向对应的客服账号ID } // GetUserState 获取用户状态(阶段+计数) @@ -528,6 +529,17 @@ func SetUserStage(ctx context.Context, userId, platform string, stage int) error return err } +// SetUserCustomerServiceId 设置用户对应的客服账号ID,并刷新过期时间 +func SetUserCustomerServiceId(ctx context.Context, userId, platform, customerServiceId string) error { + key := UserStateKeyPrefix + userId + "_" + platform + _, err := redisClient.Do(ctx, "HSET", key, "customerServiceId", customerServiceId) + if err != nil { + return err + } + _, err = redisClient.Do(ctx, "EXPIRE", key, UserStateExpireSeconds) + return err +} + // SetUserDirection 设置用户选择的咨询方向,并刷新过期时间 func SetUserDirection(ctx context.Context, userId, platform, direction string) error { key := UserStateKeyPrefix + userId + "_" + platform diff --git a/redis/types.go b/redis/types.go index a6a3e36..1ceab65 100644 --- a/redis/types.go +++ b/redis/types.go @@ -10,15 +10,17 @@ type HistoryMessage struct { // SendStreamMessage 发送到 Redis Stream 的消息结构 type SendStreamMessage struct { - UserId string `json:"user_id"` // 用户ID - Content string `json:"content"` // 消息内容 - Timestamp int64 `json:"timestamp"` // 时间戳(秒) - MessageId string `json:"message_id"` // 消息唯一ID - Platform string `json:"platform,omitempty"` // 平台标识 - AccountId string `json:"account_id,omitempty"` // 账号ID - TenantId string `json:"tenant_id,omitempty"` // 租户ID(数据隔离) - ReplyQueue string `json:"reply_queue,omitempty"` // 响应队列名称(支持多实例独立队列) - History []HistoryMessage `json:"history,omitempty"` // 历史对话(归档后恢复时携带) + UserId string `json:"user_id"` // 用户ID + Content string `json:"content"` // 消息内容 + Timestamp int64 `json:"timestamp"` // 时间戳(秒) + MessageId string `json:"message_id"` // 消息唯一ID + Platform string `json:"platform,omitempty"` // 平台标识 + AccountId string `json:"account_id,omitempty"` // 账号ID + TenantId string `json:"tenant_id,omitempty"` // 租户ID(数据隔离) + CustomerServiceId string `json:"customer_service_id,omitempty"` // 客服账号ID + ChatId string `json:"chat_id,omitempty"` // RAGFlow Chat ID(从ragflow_config查询) + ReplyQueue string `json:"reply_queue,omitempty"` // 响应队列名称(支持多实例独立队列) + History []HistoryMessage `json:"history,omitempty"` // 历史对话(归档后恢复时携带) } // BatchStreamMessage 批量消息结构