From 03aef62184f971ba669fe5281cf162810350537b Mon Sep 17 00:00:00 2001 From: Cold <16419454+cold502@user.noreply.gitee.com> Date: Tue, 16 Dec 2025 11:52:46 +0800 Subject: [PATCH] =?UTF-8?q?=E7=AC=AC=E4=BA=94=E6=AC=A1=E5=AF=B9=E8=AF=9D?= =?UTF-8?q?=E5=8F=91=E5=8D=A1=E7=89=87,=20=E7=94=A8redis=E8=AE=B0=E5=BD=95?= =?UTF-8?q?=E5=8D=A1=E7=89=87=E6=AC=A1=E6=95=B0?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- consul/配置中心方案.md | 267 +++++++++++++++++++++++++++++++++++++++++ redis/redis.go | 46 +++++++ startup/startup.go | 104 ---------------- 3 files changed, 313 insertions(+), 104 deletions(-) create mode 100644 consul/配置中心方案.md delete mode 100644 startup/startup.go diff --git a/consul/配置中心方案.md b/consul/配置中心方案.md new file mode 100644 index 0000000..3c87785 --- /dev/null +++ b/consul/配置中心方案.md @@ -0,0 +1,267 @@ +# Consul KV 配置中心方案 + +**文档版本**:v1.0 +**创建日期**:2025-12-16 +**负责项目**:customerservice + message + gateway + +--- + +## 一、核心目标 + +| 目标 | 说明 | +|------|------| +| **安全性** | 敏感配置(连接地址/密码)不写在代码里,从 Consul KV 读取 | +| **回退机制** | Consul 不可用时,使用本地默认配置保证服务可启动 | +| **热更新** | Consul KV 变更后,服务无需重启立即生效 | +| **性能最优** | 请求路径只读内存(原子快照),不访问 Consul;单协程阻塞监听,不空转 | + +--- + +## 二、配置分类与热更新策略 + +### 2.1 配置分类 + +| 分类 | 配置项 | 热更新方式 | +|------|--------|------------| +| **连接池配置** | http 连接池数量、redis 连接池数量、mongo 连接池数量 | 平滑切换(创建新池→原子替换→延迟关闭旧池) | +| **协程池配置** | grpool worker 数量 | 动态调整(`grpool.SetSize()`) | +| **业务参数** | 追问延时、归档延时、批量大小、超时时间、卡片触发轮数 | 直接原子更新内存变量 | +| **开关/限流** | 功能开关、限流阈值、日志级别 | 直接原子更新内存变量 | +| **中间件地址** | Redis/MQ/Mongo/ES/Jaeger 地址 | 平滑切换连接池 | + +### 2.2 热更新策略矩阵 + +``` +┌─────────────────────────────────────────────────────────────────┐ +│ 热更新策略矩阵 │ +├─────────────────────────────────────────────────────────────────┤ +│ 配置变更类型 │ 更新方式 │ 对请求影响 │ +├─────────────────────────────────────────────────────────────────┤ +│ 业务参数/开关 │ atomic.Value │ 无影响,立即生效 │ +│ 协程池数量 │ grpool.SetSize │ 无影响,渐进生效 │ +│ 连接池数量/地址 │ 平滑切换单例 │ 无影响,新请求用新池 │ +└─────────────────────────────────────────────────────────────────┘ +``` + +--- + +## 三、Consul KV 键名规范 + +``` +config/{service}/pool +config/{service}/business +config/{service}/middleware +``` + +### 3.1 示例 KV 结构 + +**Key**: `config/customerservice/pool` +```yaml +http: + maxConns: 100 # HTTP 连接池最大连接数 + maxIdleConns: 20 # HTTP 连接池最大空闲连接 +redis: + maxActive: 50 # Redis 连接池最大活跃连接 + maxIdle: 10 # Redis 连接池最大空闲连接 + idleTimeout: 300 # 空闲超时(秒) +grpool: + workerSize: 200 # 协程池 worker 数量 +``` + +**Key**: `config/customerservice/business` +```yaml +followUp: + delay1: 30 # 第一次追问延时(秒) + delay2: 60 # 第二次追问延时(秒) + delay3: 180 # 第三次追问延时(秒) +archive: + delay: 3600 # 归档延时(秒) +stream: + batchSize: 200 # 批量读取消息数量 + blockTimeout: 2000 # 阻塞超时(毫秒) +card: + triggerCount: 5 # 触发发送卡片的对话轮数 +``` + +**Key**: `config/customerservice/middleware` +```yaml +redis: + address: "192.168.3.200:6379" + password: "" + db: 0 +rabbitmq: + host: "192.168.3.200" + port: 5672 + username: "guest" + password: "guest" + vhost: "/" +mongo: + uri: "mongodb://192.168.3.200:27017" + database: "customer_service" +``` + +--- + +## 四、架构设计 + +### 4.1 模块结构 + +``` +common/configcenter/ +├── configcenter.go # 核心:启动/停止/获取配置 +├── watcher.go # Consul KV 阻塞监听(单协程) +├── snapshot.go # 配置快照(atomic.Value) +├── hook.go # 变更回调分发 +└── types.go # 配置结构体定义 +``` + +### 4.2 核心接口 + +```go +// 启动配置中心(每个服务 main.go 调用一次) +func Start(ctx context.Context, opts Options) error + +// 获取当前配置快照(请求路径调用,O(1) 无锁) +func Get() *Snapshot + +// 注册变更回调(组件注册自己的重初始化逻辑) +func Subscribe(name string, handler func(old, new *Snapshot)) + +// 优雅停止 +func Stop() +``` + +### 4.3 数据流 + +``` +┌──────────────┐ 阻塞查询 ┌──────────────┐ +│ Consul KV │ ─────────────→ │ Watcher │ (单协程,不空转) +└──────────────┘ (长轮询) └──────┬───────┘ + │ 配置变更 + ▼ + ┌──────────────┐ + │ Snapshot │ (atomic.Value 原子替换) + └──────┬───────┘ + │ + ┌───────────────────────┼───────────────────────┐ + ▼ ▼ ▼ + ┌────────────────┐ ┌────────────────┐ ┌────────────────┐ + │ 业务代码读取 │ │ Hook 回调 │ │ 日志记录 │ + │ Get().Pool │ │ 重建连接池 │ │ 版本号变更 │ + └────────────────┘ └────────────────┘ └────────────────┘ +``` + +--- + +## 五、服务启动流程 + +```go +func main() { + ctx := context.Background() + + // 1. 启动配置中心(优先从 Consul 拉取,失败则用本地默认) + if err := configcenter.Start(ctx, configcenter.Options{ + ConsulAddr: "192.168.3.200:8500", + ServiceName: "customerservice", + DefaultPath: "config.yml", // 回退配置 + }); err != nil { + glog.Warningf(ctx, "配置中心启动失败,使用本地配置: %v", err) + } + + // 2. 注册组件热更新回调 + configcenter.Subscribe("redis", redis.OnConfigChange) + configcenter.Subscribe("grpool", ragflow.OnPoolSizeChange) + configcenter.Subscribe("rabbitmq", rabbitmq.OnConfigChange) + + // 3. 初始化组件(使用 configcenter.Get() 获取配置) + initComponents(ctx) + + // 4. 启动服务... + + // 5. 优雅退出 + defer configcenter.Stop() +} +``` + +--- + +## 六、性能保证 + +| 环节 | 性能策略 | +|------|----------| +| **请求路径** | 只读 `atomic.Value`,O(1),无锁,无网络 | +| **监听 Consul** | 阻塞查询(Blocking Query),无变更时不消耗 CPU | +| **配置解析** | 只在变更时解析一次 YAML → 结构体 | +| **连接池切换** | 原子替换指针,旧池延迟 30s 关闭(等待请求完成) | +| **协程池调整** | `grpool` 内置支持动态调整,无需重建 | + +--- + +## 七、回退与容错 + +| 场景 | 处理方式 | +|------|----------| +| **启动时 Consul 不可用** | 使用本地 `config.yml` 默认配置,服务正常启动 | +| **运行时 Consul 断连** | 保持最后一次成功配置,自动重试连接 | +| **配置格式错误** | 拒绝更新,保持旧配置,记录错误日志 | +| **连接池切换失败** | 保持旧连接池,记录错误,不影响服务 | + +--- + +## 八、Consul KV 操作示例 + +### 8.1 通过 Consul UI 操作 + +1. 访问 `http://192.168.3.200:8500/ui` +2. 点击 **Key/Value** 菜单 +3. 创建/编辑 Key,如 `config/customerservice/business` +4. 在 Value 中粘贴 YAML 配置内容 +5. 点击 **Save** + +### 8.2 通过 CLI 操作 + +```bash +# 写入配置 +consul kv put config/customerservice/business @business.yml + +# 读取配置 +consul kv get config/customerservice/business + +# 删除配置 +consul kv delete config/customerservice/business +``` + +### 8.3 通过 HTTP API 操作 + +```bash +# 写入配置 +curl --request PUT \ + --url http://192.168.3.200:8500/v1/kv/config/customerservice/business \ + --data-binary @business.yml + +# 读取配置(Base64 编码) +curl http://192.168.3.200:8500/v1/kv/config/customerservice/business + +# 阻塞查询(长轮询,等待变更) +curl "http://192.168.3.200:8500/v1/kv/config/customerservice/business?index=123&wait=5m" +``` + +--- + +## 九、实现进度 + +| 阶段 | 状态 | 说明 | +|------|------|------| +| 方案设计 | ✅ 完成 | 本文档 | +| common/configcenter 模块 | ⏳ 待实现 | 核心配置中心模块 | +| 各组件热更新回调 | ⏳ 待实现 | Redis/MQ/Mongo/ES 等 | +| 联调验证 | ⏳ 待实现 | KV 修改后无需重启生效 | + +--- + +## 十、注意事项 + +1. **安全性**:生产环境的 Consul 必须启用 ACL,限制 KV 访问权限 +2. **版本控制**:重要配置变更前,建议先备份旧配置 +3. **灰度发布**:可通过 `config/{env}/{service}` 实现多环境配置隔离 +4. **监控告警**:建议对配置变更事件添加日志和告警 diff --git a/redis/redis.go b/redis/redis.go index 7156952..1585f51 100644 --- a/redis/redis.go +++ b/redis/redis.go @@ -475,3 +475,49 @@ func Unlock(ctx context.Context, key string) { glog.Errorf(ctx, "释放分布式锁失败: %v", err) } } + +// ============== 对话计数相关(用于卡片触发)============== + +const ( + // ConversationCountKeyPrefix 对话计数 Key 前缀 + ConversationCountKeyPrefix = "ragflow:conversation:count:" +) + +// IncrConversationCount 增加用户对话计数,返回当前轮数 +// 用于判断是否触发发送卡片(如对话5轮后发送) +// expireSeconds: 过期时间(秒),建议与会话超时一致(如7200秒=2小时) +func IncrConversationCount(ctx context.Context, userId, platform string, expireSeconds int64) (count int64, err error) { + key := ConversationCountKeyPrefix + userId + "_" + platform + result, err := redisClient.Do(ctx, "INCR", key) + if err != nil { + return + } + count = result.Int64() + + // 首次设置过期时间 + if count == 1 { + redisClient.Do(ctx, "EXPIRE", key, expireSeconds) + } + return +} + +// GetConversationCount 获取用户当前对话轮数 +func GetConversationCount(ctx context.Context, userId, platform string) (count int64, err error) { + key := ConversationCountKeyPrefix + userId + "_" + platform + result, err := redisClient.Get(ctx, key) + if err != nil { + return + } + if result.IsEmpty() { + return 0, nil + } + count = result.Int64() + return +} + +// ResetConversationCount 重置用户对话计数(归档/卡片发送后调用) +func ResetConversationCount(ctx context.Context, userId, platform string) error { + key := ConversationCountKeyPrefix + userId + "_" + platform + _, err := redisClient.Del(ctx, key) + return err +} diff --git a/startup/startup.go b/startup/startup.go deleted file mode 100644 index d927e43..0000000 --- a/startup/startup.go +++ /dev/null @@ -1,104 +0,0 @@ -// Package startup 提供服务启动时的组件初始化控制 -// 各服务可以按需初始化所需组件,避免不必要的资源占用 -package startup - -import ( - "context" - "sync" - - "github.com/gogf/gf/v2/frame/g" - "github.com/gogf/gf/v2/os/glog" -) - -// Components 组件配置 -type Components struct { - Consul bool // Consul 服务注册发现(所有服务都需要) - Jaeger bool // Jaeger 链路追踪(所有服务都需要) - Redis bool // Redis 缓存 - RabbitMQ bool // RabbitMQ 消息队列 - MongoDB bool // MongoDB 数据库 - RAGFlow bool // RAGFlow AI 客户端 - ES bool // Elasticsearch -} - -var ( - initialized bool - initOnce sync.Once - components *Components -) - -// Init 初始化指定的组件 -// 示例: -// -// bootstrap.Init(ctx, &bootstrap.Components{ -// Consul: true, -// Jaeger: true, -// Redis: true, -// RabbitMQ: true, -// }) -func Init(ctx context.Context, c *Components) { - initOnce.Do(func() { - components = c - initialized = true - glog.Infof(ctx, "Bootstrap 初始化完成: %+v", c) - }) -} - -// IsInitialized 检查是否已初始化 -func IsInitialized() bool { - return initialized -} - -// GetComponents 获取组件配置 -func GetComponents() *Components { - if components == nil { - // 默认配置:从配置文件读取 - return loadFromConfig() - } - return components -} - -// NeedRedis 是否需要 Redis -func NeedRedis() bool { - c := GetComponents() - return c != nil && c.Redis -} - -// NeedRabbitMQ 是否需要 RabbitMQ -func NeedRabbitMQ() bool { - c := GetComponents() - return c != nil && c.RabbitMQ -} - -// NeedMongoDB 是否需要 MongoDB -func NeedMongoDB() bool { - c := GetComponents() - return c != nil && c.MongoDB -} - -// NeedRAGFlow 是否需要 RAGFlow -func NeedRAGFlow() bool { - c := GetComponents() - return c != nil && c.RAGFlow -} - -// NeedES 是否需要 Elasticsearch -func NeedES() bool { - c := GetComponents() - return c != nil && c.ES -} - -// loadFromConfig 从配置文件加载组件配置 -// 如果配置文件中没有 startup 配置,则默认全部启动 -func loadFromConfig() *Components { - ctx := context.Background() - return &Components{ - Consul: !g.Cfg().MustGet(ctx, "consul").IsEmpty(), - Jaeger: !g.Cfg().MustGet(ctx, "jaeger").IsEmpty(), - Redis: !g.Cfg().MustGet(ctx, "redis").IsEmpty(), - RabbitMQ: !g.Cfg().MustGet(ctx, "rabbitmq").IsEmpty(), - MongoDB: !g.Cfg().MustGet(ctx, "mongo").IsEmpty(), - RAGFlow: !g.Cfg().MustGet(ctx, "ragflow").IsEmpty(), - ES: !g.Cfg().MustGet(ctx, "elasticsearch").IsEmpty(), - } -}