From 7381d57b77e7a7c63a4fa89fdc5036dc0331d0e4 Mon Sep 17 00:00:00 2001 From: Cold <16419454+cold502@user.noreply.gitee.com> Date: Fri, 19 Dec 2025 15:02:05 +0800 Subject: [PATCH] =?UTF-8?q?=E5=AE=8C=E6=88=90websocket=E5=AF=B9=E8=AF=9D?= =?UTF-8?q?=E6=BC=94=E7=A4=BA=E5=92=8Cmain=E6=96=B9=E6=B3=95=E7=9A=84?= =?UTF-8?q?=E5=8A=9F=E8=83=BD=E6=8A=BD=E7=A6=BB?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- config/welcome.go | 4 ++-- rabbitmq/consumer.go | 28 ++++++++++++++++++++++++++++ rabbitmq/instance.go | 41 +++++++++++++++++++++++++++++++++++++++++ redis/redis.go | 5 +++++ redis/types.go | 17 +++++++++-------- 5 files changed, 85 insertions(+), 10 deletions(-) create mode 100644 rabbitmq/instance.go diff --git a/config/welcome.go b/config/welcome.go index 6dfe133..4e4bb76 100644 --- a/config/welcome.go +++ b/config/welcome.go @@ -17,8 +17,8 @@ var ( // initWelcomeMessages 初始化欢迎话术配置 func initWelcomeMessages(ctx context.Context) { welcomeOnce.Do(func() { - cfg := g.Cfg() - welcomeMap := cfg.MustGet(ctx, "welcomes").MapStrStr() + // 从默认配置文件(config.yml)读取 welcomes 配置 + welcomeMap := g.Cfg().MustGet(ctx, "welcomes").MapStrStr() welcomeMu.Lock() welcomeCache = welcomeMap diff --git a/rabbitmq/consumer.go b/rabbitmq/consumer.go index 4fb5a2f..0ddca60 100644 --- a/rabbitmq/consumer.go +++ b/rabbitmq/consumer.go @@ -83,6 +83,34 @@ func (c *Consumer) Start(ctx context.Context) (err error) { return err } + // 声明队列(如果不存在则创建) + _, err = ch.QueueDeclare( + c.queue, // name + true, // durable(持久化) + false, // autoDelete(不自动删除) + false, // exclusive(非独占) + false, // noWait + nil, // arguments + ) + if err != nil { + return gerror.Newf("声明队列失败: %v", err) + } + + // TODO: 队列绑定逻辑暂时注释,避免重复binding导致消息重复投递 + // 绑定队列到Exchange(使用队列名作为routingKey,支持多租户) + // Exchange类型应该是topic,绑定模式为 #(接收所有消息) + // err = ch.QueueBind( + // c.queue, // queue name + // "#", // routing key(通配符,接收所有消息) + // "ragflow.response", // exchange name + // false, // noWait + // nil, // arguments + // ) + // if err != nil { + // g.Log().Warningf(ctx, "绑定队列到Exchange失败(可能Exchange不存在或类型不匹配): %v", err) + // // 不返回错误,继续启动消费者(可能是direct exchange或队列已绑定) + // } + // 设置 QoS(并发控制) err = ch.Qos( c.prefetchCount, // prefetchCount: 每个 consumer 最多同时处理的消息数 diff --git a/rabbitmq/instance.go b/rabbitmq/instance.go new file mode 100644 index 0000000..330c6d5 --- /dev/null +++ b/rabbitmq/instance.go @@ -0,0 +1,41 @@ +package rabbitmq + +import ( + "fmt" + "os" + "sync" + + "github.com/gogf/gf/v2/util/guid" +) + +var ( + instanceId string + instanceOnce sync.Once +) + +// getInstanceId 获取当前实例的唯一标识(单例) +// 格式:{hostname}.{uuid8} +func getInstanceId() string { + instanceOnce.Do(func() { + // 获取主机名 + hostname, err := os.Hostname() + if err != nil || hostname == "" { + hostname = "unknown" + } + + // 生成8位UUID + uuid := guid.S()[:8] + + instanceId = fmt.Sprintf("%s.%s", hostname, uuid) + }) + return instanceId +} + +// GetInstanceQueueName 获取当前实例的响应队列名 +// 格式:{baseQueue}.{hostname}.{uuid8} +func GetInstanceQueueName(baseQueue string) string { + if baseQueue == "" { + baseQueue = "ragflow.response" + } + return fmt.Sprintf("%s.%s", baseQueue, getInstanceId()) +} diff --git a/redis/redis.go b/redis/redis.go index 69ff1c5..072fb1b 100644 --- a/redis/redis.go +++ b/redis/redis.go @@ -502,6 +502,11 @@ func GetUserState(ctx context.Context, userId, platform string) (state *UserStat state = &UserState{Stage: 5} // 默认状态5(未选择方向) if result.IsEmpty() { + // Redis为空,初始化默认状态 + if initErr := SetUserStage(ctx, userId, platform, 5); initErr != nil { + err = initErr + return + } return } diff --git a/redis/types.go b/redis/types.go index 59bf280..17c571f 100644 --- a/redis/types.go +++ b/redis/types.go @@ -10,14 +10,15 @@ type HistoryMessage struct { // SendStreamMessage 发送到 Redis Stream 的消息结构 type SendStreamMessage struct { - UserId string `json:"user_id"` // 用户ID - Content string `json:"content"` // 消息内容 - Timestamp int64 `json:"timestamp"` // 时间戳(秒) - MessageId string `json:"message_id"` // 消息唯一ID - Platform string `json:"platform,omitempty"` // 平台标识 - AccountId string `json:"account_id,omitempty"` // 账号ID - TenantId string `json:"tenant_id,omitempty"` // 租户ID(数据隔离) - History []HistoryMessage `json:"history,omitempty"` // 历史对话(归档后恢复时携带) + UserId string `json:"user_id"` // 用户ID + Content string `json:"content"` // 消息内容 + Timestamp int64 `json:"timestamp"` // 时间戳(秒) + MessageId string `json:"message_id"` // 消息唯一ID + Platform string `json:"platform,omitempty"` // 平台标识 + AccountId string `json:"account_id,omitempty"` // 账号ID + TenantId string `json:"tenant_id,omitempty"` // 租户ID(数据隔离) + ReplyQueue string `json:"reply_queue,omitempty"` // 响应队列名称(支持多实例独立队列) + History []HistoryMessage `json:"history,omitempty"` // 历史对话(归档后恢复时携带) } // BatchStreamMessage 批量消息结构