第五次对话发卡片, 用redis记录卡片次数

This commit is contained in:
Cold
2025-12-16 11:52:46 +08:00
committed by 张斌
parent c4d232d6ff
commit 03aef62184
3 changed files with 313 additions and 104 deletions

View File

@@ -0,0 +1,267 @@
# Consul KV 配置中心方案
**文档版本**v1.0
**创建日期**2025-12-16
**负责项目**customerservice + message + gateway
---
## 一、核心目标
| 目标 | 说明 |
|------|------|
| **安全性** | 敏感配置(连接地址/密码)不写在代码里,从 Consul KV 读取 |
| **回退机制** | Consul 不可用时,使用本地默认配置保证服务可启动 |
| **热更新** | Consul KV 变更后,服务无需重启立即生效 |
| **性能最优** | 请求路径只读内存(原子快照),不访问 Consul单协程阻塞监听不空转 |
---
## 二、配置分类与热更新策略
### 2.1 配置分类
| 分类 | 配置项 | 热更新方式 |
|------|--------|------------|
| **连接池配置** | http 连接池数量、redis 连接池数量、mongo 连接池数量 | 平滑切换(创建新池→原子替换→延迟关闭旧池) |
| **协程池配置** | grpool worker 数量 | 动态调整(`grpool.SetSize()` |
| **业务参数** | 追问延时、归档延时、批量大小、超时时间、卡片触发轮数 | 直接原子更新内存变量 |
| **开关/限流** | 功能开关、限流阈值、日志级别 | 直接原子更新内存变量 |
| **中间件地址** | Redis/MQ/Mongo/ES/Jaeger 地址 | 平滑切换连接池 |
### 2.2 热更新策略矩阵
```
┌─────────────────────────────────────────────────────────────────┐
│ 热更新策略矩阵 │
├─────────────────────────────────────────────────────────────────┤
│ 配置变更类型 │ 更新方式 │ 对请求影响 │
├─────────────────────────────────────────────────────────────────┤
│ 业务参数/开关 │ atomic.Value │ 无影响,立即生效 │
│ 协程池数量 │ grpool.SetSize │ 无影响,渐进生效 │
│ 连接池数量/地址 │ 平滑切换单例 │ 无影响,新请求用新池 │
└─────────────────────────────────────────────────────────────────┘
```
---
## 三、Consul KV 键名规范
```
config/{service}/pool
config/{service}/business
config/{service}/middleware
```
### 3.1 示例 KV 结构
**Key**: `config/customerservice/pool`
```yaml
http:
maxConns: 100 # HTTP 连接池最大连接数
maxIdleConns: 20 # HTTP 连接池最大空闲连接
redis:
maxActive: 50 # Redis 连接池最大活跃连接
maxIdle: 10 # Redis 连接池最大空闲连接
idleTimeout: 300 # 空闲超时(秒)
grpool:
workerSize: 200 # 协程池 worker 数量
```
**Key**: `config/customerservice/business`
```yaml
followUp:
delay1: 30 # 第一次追问延时(秒)
delay2: 60 # 第二次追问延时(秒)
delay3: 180 # 第三次追问延时(秒)
archive:
delay: 3600 # 归档延时(秒)
stream:
batchSize: 200 # 批量读取消息数量
blockTimeout: 2000 # 阻塞超时(毫秒)
card:
triggerCount: 5 # 触发发送卡片的对话轮数
```
**Key**: `config/customerservice/middleware`
```yaml
redis:
address: "192.168.3.200:6379"
password: ""
db: 0
rabbitmq:
host: "192.168.3.200"
port: 5672
username: "guest"
password: "guest"
vhost: "/"
mongo:
uri: "mongodb://192.168.3.200:27017"
database: "customer_service"
```
---
## 四、架构设计
### 4.1 模块结构
```
common/configcenter/
├── configcenter.go # 核心:启动/停止/获取配置
├── watcher.go # Consul KV 阻塞监听(单协程)
├── snapshot.go # 配置快照atomic.Value
├── hook.go # 变更回调分发
└── types.go # 配置结构体定义
```
### 4.2 核心接口
```go
// 启动配置中心(每个服务 main.go 调用一次)
func Start(ctx context.Context, opts Options) error
// 获取当前配置快照请求路径调用O(1) 无锁)
func Get() *Snapshot
// 注册变更回调(组件注册自己的重初始化逻辑)
func Subscribe(name string, handler func(old, new *Snapshot))
// 优雅停止
func Stop()
```
### 4.3 数据流
```
┌──────────────┐ 阻塞查询 ┌──────────────┐
│ Consul KV │ ─────────────→ │ Watcher │ (单协程,不空转)
└──────────────┘ (长轮询) └──────┬───────┘
│ 配置变更
┌──────────────┐
│ Snapshot │ atomic.Value 原子替换)
└──────┬───────┘
┌───────────────────────┼───────────────────────┐
▼ ▼ ▼
┌────────────────┐ ┌────────────────┐ ┌────────────────┐
│ 业务代码读取 │ │ Hook 回调 │ │ 日志记录 │
│ Get().Pool │ │ 重建连接池 │ │ 版本号变更 │
└────────────────┘ └────────────────┘ └────────────────┘
```
---
## 五、服务启动流程
```go
func main() {
ctx := context.Background()
// 1. 启动配置中心(优先从 Consul 拉取,失败则用本地默认)
if err := configcenter.Start(ctx, configcenter.Options{
ConsulAddr: "192.168.3.200:8500",
ServiceName: "customerservice",
DefaultPath: "config.yml", // 回退配置
}); err != nil {
glog.Warningf(ctx, "配置中心启动失败,使用本地配置: %v", err)
}
// 2. 注册组件热更新回调
configcenter.Subscribe("redis", redis.OnConfigChange)
configcenter.Subscribe("grpool", ragflow.OnPoolSizeChange)
configcenter.Subscribe("rabbitmq", rabbitmq.OnConfigChange)
// 3. 初始化组件(使用 configcenter.Get() 获取配置)
initComponents(ctx)
// 4. 启动服务...
// 5. 优雅退出
defer configcenter.Stop()
}
```
---
## 六、性能保证
| 环节 | 性能策略 |
|------|----------|
| **请求路径** | 只读 `atomic.Value`O(1),无锁,无网络 |
| **监听 Consul** | 阻塞查询Blocking Query无变更时不消耗 CPU |
| **配置解析** | 只在变更时解析一次 YAML → 结构体 |
| **连接池切换** | 原子替换指针,旧池延迟 30s 关闭(等待请求完成) |
| **协程池调整** | `grpool` 内置支持动态调整,无需重建 |
---
## 七、回退与容错
| 场景 | 处理方式 |
|------|----------|
| **启动时 Consul 不可用** | 使用本地 `config.yml` 默认配置,服务正常启动 |
| **运行时 Consul 断连** | 保持最后一次成功配置,自动重试连接 |
| **配置格式错误** | 拒绝更新,保持旧配置,记录错误日志 |
| **连接池切换失败** | 保持旧连接池,记录错误,不影响服务 |
---
## 八、Consul KV 操作示例
### 8.1 通过 Consul UI 操作
1. 访问 `http://192.168.3.200:8500/ui`
2. 点击 **Key/Value** 菜单
3. 创建/编辑 Key`config/customerservice/business`
4. 在 Value 中粘贴 YAML 配置内容
5. 点击 **Save**
### 8.2 通过 CLI 操作
```bash
# 写入配置
consul kv put config/customerservice/business @business.yml
# 读取配置
consul kv get config/customerservice/business
# 删除配置
consul kv delete config/customerservice/business
```
### 8.3 通过 HTTP API 操作
```bash
# 写入配置
curl --request PUT \
--url http://192.168.3.200:8500/v1/kv/config/customerservice/business \
--data-binary @business.yml
# 读取配置Base64 编码)
curl http://192.168.3.200:8500/v1/kv/config/customerservice/business
# 阻塞查询(长轮询,等待变更)
curl "http://192.168.3.200:8500/v1/kv/config/customerservice/business?index=123&wait=5m"
```
---
## 九、实现进度
| 阶段 | 状态 | 说明 |
|------|------|------|
| 方案设计 | ✅ 完成 | 本文档 |
| common/configcenter 模块 | ⏳ 待实现 | 核心配置中心模块 |
| 各组件热更新回调 | ⏳ 待实现 | Redis/MQ/Mongo/ES 等 |
| 联调验证 | ⏳ 待实现 | KV 修改后无需重启生效 |
---
## 十、注意事项
1. **安全性**:生产环境的 Consul 必须启用 ACL限制 KV 访问权限
2. **版本控制**:重要配置变更前,建议先备份旧配置
3. **灰度发布**:可通过 `config/{env}/{service}` 实现多环境配置隔离
4. **监控告警**:建议对配置变更事件添加日志和告警

View File

@@ -475,3 +475,49 @@ func Unlock(ctx context.Context, key string) {
glog.Errorf(ctx, "释放分布式锁失败: %v", err) glog.Errorf(ctx, "释放分布式锁失败: %v", err)
} }
} }
// ============== 对话计数相关(用于卡片触发)==============
const (
// ConversationCountKeyPrefix 对话计数 Key 前缀
ConversationCountKeyPrefix = "ragflow:conversation:count:"
)
// IncrConversationCount 增加用户对话计数,返回当前轮数
// 用于判断是否触发发送卡片如对话5轮后发送
// expireSeconds: 过期时间建议与会话超时一致如7200秒=2小时
func IncrConversationCount(ctx context.Context, userId, platform string, expireSeconds int64) (count int64, err error) {
key := ConversationCountKeyPrefix + userId + "_" + platform
result, err := redisClient.Do(ctx, "INCR", key)
if err != nil {
return
}
count = result.Int64()
// 首次设置过期时间
if count == 1 {
redisClient.Do(ctx, "EXPIRE", key, expireSeconds)
}
return
}
// GetConversationCount 获取用户当前对话轮数
func GetConversationCount(ctx context.Context, userId, platform string) (count int64, err error) {
key := ConversationCountKeyPrefix + userId + "_" + platform
result, err := redisClient.Get(ctx, key)
if err != nil {
return
}
if result.IsEmpty() {
return 0, nil
}
count = result.Int64()
return
}
// ResetConversationCount 重置用户对话计数(归档/卡片发送后调用)
func ResetConversationCount(ctx context.Context, userId, platform string) error {
key := ConversationCountKeyPrefix + userId + "_" + platform
_, err := redisClient.Del(ctx, key)
return err
}

View File

@@ -1,104 +0,0 @@
// Package startup 提供服务启动时的组件初始化控制
// 各服务可以按需初始化所需组件,避免不必要的资源占用
package startup
import (
"context"
"sync"
"github.com/gogf/gf/v2/frame/g"
"github.com/gogf/gf/v2/os/glog"
)
// Components 组件配置
type Components struct {
Consul bool // Consul 服务注册发现(所有服务都需要)
Jaeger bool // Jaeger 链路追踪(所有服务都需要)
Redis bool // Redis 缓存
RabbitMQ bool // RabbitMQ 消息队列
MongoDB bool // MongoDB 数据库
RAGFlow bool // RAGFlow AI 客户端
ES bool // Elasticsearch
}
var (
initialized bool
initOnce sync.Once
components *Components
)
// Init 初始化指定的组件
// 示例:
//
// bootstrap.Init(ctx, &bootstrap.Components{
// Consul: true,
// Jaeger: true,
// Redis: true,
// RabbitMQ: true,
// })
func Init(ctx context.Context, c *Components) {
initOnce.Do(func() {
components = c
initialized = true
glog.Infof(ctx, "Bootstrap 初始化完成: %+v", c)
})
}
// IsInitialized 检查是否已初始化
func IsInitialized() bool {
return initialized
}
// GetComponents 获取组件配置
func GetComponents() *Components {
if components == nil {
// 默认配置:从配置文件读取
return loadFromConfig()
}
return components
}
// NeedRedis 是否需要 Redis
func NeedRedis() bool {
c := GetComponents()
return c != nil && c.Redis
}
// NeedRabbitMQ 是否需要 RabbitMQ
func NeedRabbitMQ() bool {
c := GetComponents()
return c != nil && c.RabbitMQ
}
// NeedMongoDB 是否需要 MongoDB
func NeedMongoDB() bool {
c := GetComponents()
return c != nil && c.MongoDB
}
// NeedRAGFlow 是否需要 RAGFlow
func NeedRAGFlow() bool {
c := GetComponents()
return c != nil && c.RAGFlow
}
// NeedES 是否需要 Elasticsearch
func NeedES() bool {
c := GetComponents()
return c != nil && c.ES
}
// loadFromConfig 从配置文件加载组件配置
// 如果配置文件中没有 startup 配置,则默认全部启动
func loadFromConfig() *Components {
ctx := context.Background()
return &Components{
Consul: !g.Cfg().MustGet(ctx, "consul").IsEmpty(),
Jaeger: !g.Cfg().MustGet(ctx, "jaeger").IsEmpty(),
Redis: !g.Cfg().MustGet(ctx, "redis").IsEmpty(),
RabbitMQ: !g.Cfg().MustGet(ctx, "rabbitmq").IsEmpty(),
MongoDB: !g.Cfg().MustGet(ctx, "mongo").IsEmpty(),
RAGFlow: !g.Cfg().MustGet(ctx, "ragflow").IsEmpty(),
ES: !g.Cfg().MustGet(ctx, "elasticsearch").IsEmpty(),
}
}