2026-05-12 13:45:08 +08:00
|
|
|
|
# model-asynch(模型异步中间件)[2026.5.12前,暂时弃置]
|
2026-04-29 15:54:14 +08:00
|
|
|
|
|
|
|
|
|
|
一个独立的异步中间件服务:按模型配置路由调用不同模型服务,统一生成 `task_id`,后台异步执行,结果上传 OSS,并提供查询/批量领取/自动重试/自动清理能力,便于业务方“拿走结果并转移”。
|
|
|
|
|
|
|
|
|
|
|
|
> 分支约定:`dev` 为开发分支;`main`(或 master)为线上主分支。
|
|
|
|
|
|
|
|
|
|
|
|
---
|
|
|
|
|
|
|
|
|
|
|
|
## 1. 核心功能
|
|
|
|
|
|
|
|
|
|
|
|
### 1.1 模型配置(asynch_models)
|
|
|
|
|
|
- 增删改查模型服务配置(`model_name` 唯一标识)
|
|
|
|
|
|
- 支持配置:
|
|
|
|
|
|
- 请求地址:`base_url + route`
|
|
|
|
|
|
- 请求方式:`http_method`(GET/POST)
|
|
|
|
|
|
- 请求头:`head_msg`(以请求头注入,支持多个 header)
|
|
|
|
|
|
- 超时:`timeout_seconds`
|
|
|
|
|
|
- 并发:`max_concurrency`(按租户+模型的 Redis 分布式信号量限流)
|
|
|
|
|
|
- 重试:`retry_times`(失败后最多再重试 N 次)
|
|
|
|
|
|
- 保留:`auto_clean_seconds`(任务被业务领取到 `state=4` 后的保留秒数,到期清理)
|
|
|
|
|
|
|
|
|
|
|
|
### 1.2 异步任务(asynch_task)
|
|
|
|
|
|
- 创建任务:生成 `task_id`,入库排队
|
|
|
|
|
|
- 后台 Worker:
|
|
|
|
|
|
- PostgreSQL `FOR UPDATE SKIP LOCKED` 抢占任务,支持多实例不重复消费
|
|
|
|
|
|
- 调用模型服务(GET/POST)
|
|
|
|
|
|
- 结果上传 OSS(调用你们的 OSS 文件服务 `oss/file/uploadFile`,透传 `Authorization/X-User-Info`)
|
|
|
|
|
|
- 批量领取结果:批量查询 `task_id` 列表,返回 `task_id/state/oss_file`,并把成功的任务从 `state=2` 更新为 `state=4`
|
|
|
|
|
|
- 自动重试:失败 `state=3` 会由清理器按 `retry_times` 重新入队到队尾
|
|
|
|
|
|
- 自动清理:
|
|
|
|
|
|
- `state=4` 且 `expire_at` 到期 → 硬删除任务
|
|
|
|
|
|
- 失败重试耗尽仍失败 → 硬删除任务
|
|
|
|
|
|
- `state=0/1` 超时 → 标记失败(防止卡死)
|
|
|
|
|
|
|
|
|
|
|
|
### 1.3 统计(asynch_model_stat)
|
|
|
|
|
|
- 按天统计:`day + tenant_id + creator + model_name -> request_count`
|
|
|
|
|
|
- 统计口径:仅在 Worker 真正调用模型服务时计数(OSS 重试不计数)
|
|
|
|
|
|
- 用途:给其他服务提供全局限流/监控依据
|
|
|
|
|
|
|
|
|
|
|
|
---
|
|
|
|
|
|
|
|
|
|
|
|
## 2. 使用流程(业务方如何接入)
|
|
|
|
|
|
|
|
|
|
|
|
### 第一步:创建模型配置
|
|
|
|
|
|
业务方(或运维)先在中间件里创建/更新模型配置(`model_name` 为唯一键),例如:
|
|
|
|
|
|
- `POST /model/createModel`(或 `/model/updateModel`)
|
|
|
|
|
|
|
|
|
|
|
|
请求示例(JSON):
|
|
|
|
|
|
```json
|
|
|
|
|
|
{
|
|
|
|
|
|
"modelName": "model-service",
|
|
|
|
|
|
"modelsType": "1,2,3",
|
|
|
|
|
|
"baseUrl": "http://127.0.0.1:8000",
|
|
|
|
|
|
"route": "/api/v1/chat",
|
|
|
|
|
|
"httpMethod": "POST",
|
|
|
|
|
|
"headMsg": "API_KEY:model-key,API_STATE:true,API_NUM:123",
|
|
|
|
|
|
"enabled": 1,
|
|
|
|
|
|
"maxConcurrency": 5,
|
|
|
|
|
|
"queueLimit": 20,
|
|
|
|
|
|
"timeoutSeconds": 1800,
|
|
|
|
|
|
"expectedSeconds": 600,
|
|
|
|
|
|
"retryTimes": 3,
|
|
|
|
|
|
"retryQueueMaxSeconds": 600,
|
|
|
|
|
|
"autoCleanSeconds": 3600,
|
|
|
|
|
|
"remark": "Model-Service 模型服务"
|
|
|
|
|
|
}
|
|
|
|
|
|
```
|
|
|
|
|
|
|
|
|
|
|
|
参数说明:
|
|
|
|
|
|
- `modelName`:模型名称(唯一标识/路由键)
|
|
|
|
|
|
- `modelsType`:模型类型ID列表(逗号分隔),示例:`1,2,3`(关联 `asynch_models_type.type_id`)
|
2026-05-12 13:45:08 +08:00
|
|
|
|
|
|
|
|
|
|
### 模型类型同步
|
|
|
|
|
|
- `POST /model/type/createModelType` 创建成功后,会同步 `POST` 到 `prompts-core` 的 `/prompt/createPrompt`
|
|
|
|
|
|
- 同步字段映射:
|
|
|
|
|
|
- `typeId` -> `modelTypeId`
|
|
|
|
|
|
- `type` -> `modelType`
|
|
|
|
|
|
- `promptInfo` -> `promptInfo`
|
|
|
|
|
|
- `responseJsonSchema` -> `responseJsonSchema`
|
|
|
|
|
|
- `version` -> `version`
|
|
|
|
|
|
- 若 `prompts-core` 同步失败,`model-gateway` 会回滚本地新建的模型类型,避免两边数据不一致
|
2026-04-29 15:54:14 +08:00
|
|
|
|
- `form`:动态表单配置(JSON数组),用于前端按模型渲染参数表单(字段示例:field/label/type/required)
|
|
|
|
|
|
- `baseUrl`:模型服务地址(Base URL)
|
|
|
|
|
|
- `route`:模型服务路由(拼接到 baseUrl 后)
|
|
|
|
|
|
- `httpMethod`:请求方式(GET/POST)
|
|
|
|
|
|
- `headMsg`:请求头绑定(支持多个 header,逗号分隔,格式 `Key:Value`;布尔/数字也会以字符串形式注入 header)
|
|
|
|
|
|
- `enabled`:是否启用(0禁用/1启用)
|
|
|
|
|
|
- `maxConcurrency`:单模型最大并发(按租户+模型维度限流)
|
|
|
|
|
|
- `queueLimit`:排队上限(严格控制)。创建任务时通过 Redis Lua 原子闸门校验并占位,保证分布式并发创建不会超限;任务进入成功/失败态后释放占位,失败重试重新入队时会再次占位。
|
|
|
|
|
|
- `timeoutSeconds`:调用模型服务超时(秒)
|
|
|
|
|
|
- `expectedSeconds`:模型预计执行时间(秒,用于超时判定/排队策略等)
|
|
|
|
|
|
- `retryTimes`:失败后最多再重试 N 次(不含首次)
|
|
|
|
|
|
- `retryQueueMaxSeconds`:失败重试最大排队时间(秒);0 表示重试插队到队首;>0 表示排队超过该时间后插队,否则仍到队尾
|
|
|
|
|
|
- `autoCleanSeconds`:任务被领取到 `state=4` 后的保留时间(秒),到期清理
|
|
|
|
|
|
- `remark`:备注说明
|
|
|
|
|
|
|
|
|
|
|
|
### 第二步:创建任务拿到 task_id
|
|
|
|
|
|
业务方发起推理请求时调用:
|
|
|
|
|
|
- `POST /task/createTask`(传 `modelName + requestPayload + bizName + callbackUrl(可选) + modelKey(可选)`)
|
|
|
|
|
|
- 中间件返回 `task_id`
|
|
|
|
|
|
- 业务方将 `task_id` 落到自己的业务表,并把业务状态置为「生成中」
|
|
|
|
|
|
|
|
|
|
|
|
> `modelKey` 用于“动态覆盖/补充”模型配置中的 `head_msg`(例如每次请求携带不同的 `X-API-Key:xxx`)。
|
|
|
|
|
|
>
|
|
|
|
|
|
> `callbackUrl` 用于任务成功后的回调通知:当任务 `state=2` 成功时,中间件会发起一次 GET 请求:
|
|
|
|
|
|
> - 实际回调地址:`callbackUrl/{bizName}`
|
2026-05-12 13:45:08 +08:00
|
|
|
|
> - query 参数:`task_id/state/oss_file/file_type/text(可选,最多2000字符)`
|
2026-04-29 15:54:14 +08:00
|
|
|
|
|
|
|
|
|
|
### 第三步:同步任务进度(推荐批量)
|
|
|
|
|
|
业务方通过轮询/定时任务同步进度:
|
|
|
|
|
|
- 推荐:`POST /task/getTaskBatch`(批量传 `taskIds`,返回每个任务的 `state + oss_file`)
|
|
|
|
|
|
- 或单条:`GET /task/getTaskResult?taskId=...`
|
|
|
|
|
|
|
|
|
|
|
|
业务侧拿到 `oss_file` 后自行做资源处理(直接保存或转存),并把业务状态更新为「成功/失败」。
|
|
|
|
|
|
|
|
|
|
|
|
> 说明:批量接口对 `state=2(成功)` 的任务会自动标记为 `state=4(已下载)` 并写入 `expire_at`,用于后续清理。
|
|
|
|
|
|
|
|
|
|
|
|
### 后台执行(由上层定时任务控制)
|
|
|
|
|
|
本项目不再在服务进程内常驻轮询 worker/cleaner,而是提供两个接口供上层定时任务触发:
|
2026-05-12 13:45:08 +08:00
|
|
|
|
- `POST /task/runWork`:执行一次 Worker(抢占并处理一批排队任务;适合处理 createTask 立即执行时未处理到的任务和积压队列)
|
2026-04-29 15:54:14 +08:00
|
|
|
|
- `POST /task/cleanWork`:执行一次 Cleaner(清理过期任务、失败重试、超时任务失败等)
|
|
|
|
|
|
|
2026-05-12 13:45:08 +08:00
|
|
|
|
创建任务执行策略:
|
|
|
|
|
|
- `POST /task/createTask` 成功入库后,会立即异步尝试执行当前任务。
|
|
|
|
|
|
- 若当前模型并发已满,或当前任务未成功抢占,则会按 `asynch.worker.intervalSeconds` 对当前任务做轻量级定向轮询;只要任务仍为 `state=0` 就继续尝试,一旦进入 `state=1/2/3/4` 就立即停止,不会一直轮询。
|
|
|
|
|
|
- 若任务执行成功且配置了 `callbackUrl + bizName`,会在成功落库后异步触发回调钩子。
|
|
|
|
|
|
|
|
|
|
|
|
本地调试(可选):
|
|
|
|
|
|
可在 `config.yml` 中开启自动执行,避免手工频繁调用接口:
|
|
|
|
|
|
```yml
|
|
|
|
|
|
asynch:
|
|
|
|
|
|
worker:
|
|
|
|
|
|
enabled: true
|
|
|
|
|
|
intervalSeconds: 5
|
|
|
|
|
|
batchSize: 10
|
|
|
|
|
|
goroutines: 1
|
|
|
|
|
|
cleaner:
|
|
|
|
|
|
enabled: true
|
|
|
|
|
|
intervalSeconds: 30
|
|
|
|
|
|
```
|
|
|
|
|
|
|
2026-04-29 15:54:14 +08:00
|
|
|
|
### 动态并发/队列调参(接口请求控制)
|
|
|
|
|
|
为支持根据最近一段时间的耗时与吞吐对 `max_concurrency/queue_limit` 做动态调整,本项目提供接口供上层定时任务触发(建议每小时一次):
|
|
|
|
|
|
- `POST /model/autoTune`
|
|
|
|
|
|
|
|
|
|
|
|
请求参数(JSON,可选):
|
|
|
|
|
|
```json
|
|
|
|
|
|
{
|
|
|
|
|
|
"windowSeconds": 3600
|
|
|
|
|
|
}
|
|
|
|
|
|
```
|
|
|
|
|
|
> `windowSeconds` 不传/<=0 默认 3600(1小时)。
|
|
|
|
|
|
|
|
|
|
|
|
动态调参口径(默认近 1 小时窗口,按 `model_name` 维度):
|
|
|
|
|
|
- 执行耗时:`finished_at - started_at`(取 P90)
|
|
|
|
|
|
- 吞吐:近 1 小时完成数 / 3600
|
|
|
|
|
|
|
|
|
|
|
|
调参结果不会覆盖 `asynch_models` 中配置的最大上限(cap),而是写入 Redis 运行时参数(带 TTL,默认 2 小时):
|
|
|
|
|
|
- `asynch:runtime:max_concurrency:{model_name}`
|
|
|
|
|
|
- `asynch:runtime:queue_limit:{model_name}`
|
|
|
|
|
|
|
|
|
|
|
|
生效位置:
|
|
|
|
|
|
- CreateTask 入队时,严格 queue_limit 闸门会优先使用运行时 `queue_limit`(若无运行时值则回退 cap)。
|
|
|
|
|
|
- Worker 获取并发令牌时,优先使用运行时 `max_concurrency`(若无运行时值则回退 cap)。
|
|
|
|
|
|
|
|
|
|
|
|
---
|
|
|
|
|
|
|
|
|
|
|
|
## 3. 状态机说明(asynch_task.state)
|
|
|
|
|
|
|
|
|
|
|
|
| state | 含义 | 产生方 |
|
|
|
|
|
|
|---:|---|---|
|
|
|
|
|
|
| 0 | 排队中 | 创建任务/重试入队 |
|
|
|
|
|
|
| 1 | 执行中 | Worker 抢占后 |
|
|
|
|
|
|
| 2 | 成功(已上传 OSS) | Worker |
|
|
|
|
|
|
| 3 | 失败 | Worker / 超时处理 |
|
|
|
|
|
|
| 4 | 已下载(已领取) | 批量领取接口(2→4) |
|
|
|
|
|
|
|
|
|
|
|
|
字段补充:
|
|
|
|
|
|
- `retry_count`:已重试次数(不含首次)
|
|
|
|
|
|
- `enqueue_at`:入队时间(用于排队顺序,重试会更新为 NOW() 放到队尾)
|
|
|
|
|
|
- `expire_at`:仅对 `state=4` 生效,表示保留到期时间
|
|
|
|
|
|
|
|
|
|
|
|
---
|
|
|
|
|
|
|
|
|
|
|
|
## 4. 配置说明(config.yml)
|
|
|
|
|
|
|
|
|
|
|
|
关键配置:
|
|
|
|
|
|
- `database.default`: PostgreSQL 连接
|
|
|
|
|
|
- `redis.default`: Redis 连接(并发令牌、可扩展用途)
|
|
|
|
|
|
|
|
|
|
|
|
---
|
|
|
|
|
|
|
|
|
|
|
|
## 5. 数据库初始化
|
|
|
|
|
|
|
|
|
|
|
|
项目根目录提供 `update.sql`:首次部署执行建表 SQL。
|
|
|
|
|
|
|
|
|
|
|
|
---
|
|
|
|
|
|
|
|
|
|
|
|
## 6. 开发与发布建议(Git)
|
|
|
|
|
|
|
|
|
|
|
|
- `dev`:日常开发与联调
|
|
|
|
|
|
- `main`:线上稳定分支
|
|
|
|
|
|
- 推荐流程:
|
|
|
|
|
|
1) 从 `main` 拉出 `dev`
|
|
|
|
|
|
2) 功能完成后提 MR/PR 合并回 `main`
|
|
|
|
|
|
3) 打 tag / 发布镜像
|