// Package rabbitmq 提供 RabbitMQ 队列初始化的封装方法 // // 本文件包含常用队列的声明和绑定逻辑,简化业务层的队列配置代码 package rabbitmq import ( "context" "github.com/gogf/gf/v2/frame/g" "github.com/gogf/gf/v2/os/glog" ) // SetupResponseQueue 初始化 RAGFlow 响应队列 // // 功能: // 1. 声明持久化队列(从配置文件读取队列名,默认 ragflow.response.queue) // 2. 绑定到 ragflow.response Exchange(Topic 类型) // 3. 使用通配符 # 匹配所有 routing key(userId) // // 参数: // // ctx: 上下文 // // 返回: // // err: 错误信息,成功返回 nil // // 配置示例(config.yml): // // rabbitmq: // responseQueue: "ragflow.response.queue" # 可选,默认值 func SetupResponseQueue(ctx context.Context) (err error) { // 从配置文件读取队列名(支持每个开发者配置独立队列名) responseQueue := g.Cfg().MustGet(ctx, "rabbitmq.responseQueue", "ragflow.response.queue").String() // 声明持久化队列(服务器重启后队列仍存在) if err = DeclareQueue(ctx, &QueueConfig{ Name: responseQueue, Durable: true, // 持久化,防止数据丢失 }); err != nil { glog.Errorf(ctx, "声明响应队列失败: %v", err) return } // 绑定队列到 Exchange // Exchange 类型为 topic,routing key 使用通配符 # 匹配所有 userId if err = BindQueue(ctx, &BindingConfig{ Queue: responseQueue, Exchange: "ragflow.response", // RAGFlow 响应 Exchange RoutingKey: "#", // 通配符,匹配所有消息 }); err != nil { glog.Errorf(ctx, "绑定响应队列失败: %v", err) return } glog.Infof(ctx, "响应队列已绑定: %s -> ragflow.response (routingKey=#)", responseQueue) return } // SetupDelayedFlushQueue 初始化延时落库队列 // // 功能: // 1. 声明延时 Exchange(x-delayed-message 插件) // 2. 声明持久化队列 conversation.flush.queue // 3. 绑定队列到延时 Exchange // // 用途: // // 对话缓存延时落库机制的兜底策略 // 当对话少于5句时,10分钟后触发延时消息将缓存写入MongoDB // // 参数: // // ctx: 上下文 // // 返回: // // err: 错误信息,成功返回 nil // // 相关: // - service/conversation_service.go: handleResponse() // - service/conversation_service.go: handleDelayedFlush() func SetupDelayedFlushQueue(ctx context.Context) (err error) { // 声明延时 Exchange(需要 RabbitMQ 安装 x-delayed-message 插件) if err = SetupDelayExchange(ctx, "conversation.flush.delayed"); err != nil { glog.Warningf(ctx, "声明延时落库 Exchange 失败: %v", err) return } // 声明持久化队列 if err = DeclareQueue(ctx, &QueueConfig{ Name: "conversation.flush.queue", Durable: true, // 持久化,防止延时消息丢失 }); err != nil { glog.Warningf(ctx, "声明延时落库 Queue 失败: %v", err) return } // 绑定队列到延时 Exchange if err = BindQueue(ctx, &BindingConfig{ Queue: "conversation.flush.queue", Exchange: "conversation.flush.delayed", RoutingKey: "flush", // 延时落库消息的 routing key }); err != nil { glog.Warningf(ctx, "绑定延时落库 Queue 失败: %v", err) return } glog.Info(ctx, "延时落库队列已配置") return }