Reviewed-on: #2
model-asynch(模型异步中间件)[2026.5.12前,暂时弃置]
一个独立的异步中间件服务:按模型配置路由调用不同模型服务,统一生成 task_id,后台异步执行,结果上传 OSS,并提供查询/批量领取/自动重试/自动清理能力,便于业务方“拿走结果并转移”。
分支约定:
dev为开发分支;main(或 master)为线上主分支。
1. 核心功能
1.1 模型配置(asynch_models)
- 增删改查模型服务配置(
model_name唯一标识) - 支持配置:
- 请求地址:
base_url + route - 请求方式:
http_method(GET/POST) - 请求头:
head_msg(以请求头注入,支持多个 header) - 超时:
timeout_seconds - 并发:
max_concurrency(按租户+模型的 Redis 分布式信号量限流) - 重试:
retry_times(失败后最多再重试 N 次) - 保留:
auto_clean_seconds(任务被业务领取到state=4后的保留秒数,到期清理)
- 请求地址:
1.2 异步任务(asynch_task)
- 创建任务:生成
task_id,入库排队 - 后台 Worker:
- PostgreSQL
FOR UPDATE SKIP LOCKED抢占任务,支持多实例不重复消费 - 调用模型服务(GET/POST)
- 结果上传 OSS(调用你们的 OSS 文件服务
oss/file/uploadFile,透传Authorization/X-User-Info)
- PostgreSQL
- 批量领取结果:批量查询
task_id列表,返回task_id/state/oss_file,并把成功的任务从state=2更新为state=4 - 自动重试:失败
state=3会由清理器按retry_times重新入队到队尾 - 自动清理:
state=4且expire_at到期 → 硬删除任务- 失败重试耗尽仍失败 → 硬删除任务
state=0/1超时 → 标记失败(防止卡死)
1.3 统计(asynch_model_stat)
- 按天统计:
day + tenant_id + creator + model_name -> request_count - 统计口径:仅在 Worker 真正调用模型服务时计数(OSS 重试不计数)
- 用途:给其他服务提供全局限流/监控依据
2. 使用流程(业务方如何接入)
第一步:创建模型配置
业务方(或运维)先在中间件里创建/更新模型配置(model_name 为唯一键),例如:
POST /model/createModel(或/model/updateModel)
请求示例(JSON):
{
"modelName": "model-service",
"modelsType": "1,2,3",
"baseUrl": "http://127.0.0.1:8000",
"route": "/api/v1/chat",
"httpMethod": "POST",
"headMsg": "API_KEY:model-key,API_STATE:true,API_NUM:123",
"enabled": 1,
"maxConcurrency": 5,
"queueLimit": 20,
"timeoutSeconds": 1800,
"expectedSeconds": 600,
"retryTimes": 3,
"retryQueueMaxSeconds": 600,
"autoCleanSeconds": 3600,
"remark": "Model-Service 模型服务"
}
参数说明:
modelName:模型名称(唯一标识/路由键)modelsType:模型类型ID列表(逗号分隔),示例:1,2,3(关联asynch_models_type.type_id)
模型类型同步
POST /model/type/createModelType创建成功后,会同步POST到prompts-core的/prompt/createPrompt- 同步字段映射:
typeId->modelTypeIdtype->modelTypepromptInfo->promptInforesponseJsonSchema->responseJsonSchemaversion->version
- 若
prompts-core同步失败,model-gateway会回滚本地新建的模型类型,避免两边数据不一致 form:动态表单配置(JSON数组),用于前端按模型渲染参数表单(字段示例:field/label/type/required)baseUrl:模型服务地址(Base URL)route:模型服务路由(拼接到 baseUrl 后)httpMethod:请求方式(GET/POST)headMsg:请求头绑定(支持多个 header,逗号分隔,格式Key:Value;布尔/数字也会以字符串形式注入 header)enabled:是否启用(0禁用/1启用)maxConcurrency:单模型最大并发(按租户+模型维度限流)queueLimit:排队上限(严格控制)。创建任务时通过 Redis Lua 原子闸门校验并占位,保证分布式并发创建不会超限;任务进入成功/失败态后释放占位,失败重试重新入队时会再次占位。timeoutSeconds:调用模型服务超时(秒)expectedSeconds:模型预计执行时间(秒,用于超时判定/排队策略等)retryTimes:失败后最多再重试 N 次(不含首次)retryQueueMaxSeconds:失败重试最大排队时间(秒);0 表示重试插队到队首;>0 表示排队超过该时间后插队,否则仍到队尾autoCleanSeconds:任务被领取到state=4后的保留时间(秒),到期清理remark:备注说明
第二步:创建任务拿到 task_id
业务方发起推理请求时调用:
POST /task/createTask(传modelName + requestPayload + bizName + callbackUrl(可选) + modelKey(可选))- 中间件返回
task_id - 业务方将
task_id落到自己的业务表,并把业务状态置为「生成中」
modelKey用于“动态覆盖/补充”模型配置中的head_msg(例如每次请求携带不同的X-API-Key:xxx)。
callbackUrl用于任务成功后的回调通知:当任务state=2成功时,中间件会发起一次 GET 请求:
- 实际回调地址:
callbackUrl/{bizName}- query 参数:
task_id/state/oss_file/file_type/text(可选,最多2000字符)
第三步:同步任务进度(推荐批量)
业务方通过轮询/定时任务同步进度:
- 推荐:
POST /task/getTaskBatch(批量传taskIds,返回每个任务的state + oss_file) - 或单条:
GET /task/getTaskResult?taskId=...
业务侧拿到 oss_file 后自行做资源处理(直接保存或转存),并把业务状态更新为「成功/失败」。
说明:批量接口对
state=2(成功)的任务会自动标记为state=4(已下载)并写入expire_at,用于后续清理。
后台执行(由上层定时任务控制)
本项目不再在服务进程内常驻轮询 worker/cleaner,而是提供两个接口供上层定时任务触发:
POST /task/runWork:执行一次 Worker(抢占并处理一批排队任务;适合处理 createTask 立即执行时未处理到的任务和积压队列)POST /task/cleanWork:执行一次 Cleaner(清理过期任务、失败重试、超时任务失败等)
创建任务执行策略:
POST /task/createTask成功入库后,会立即异步尝试执行当前任务。- 若当前模型并发已满,或当前任务未成功抢占,则会按
asynch.worker.intervalSeconds对当前任务做轻量级定向轮询;只要任务仍为state=0就继续尝试,一旦进入state=1/2/3/4就立即停止,不会一直轮询。 - 若任务执行成功且配置了
callbackUrl + bizName,会在成功落库后异步触发回调钩子。
本地调试(可选):
可在 config.yml 中开启自动执行,避免手工频繁调用接口:
asynch:
worker:
enabled: true
intervalSeconds: 5
batchSize: 10
goroutines: 1
cleaner:
enabled: true
intervalSeconds: 30
动态并发/队列调参(接口请求控制)
为支持根据最近一段时间的耗时与吞吐对 max_concurrency/queue_limit 做动态调整,本项目提供接口供上层定时任务触发(建议每小时一次):
POST /model/autoTune
请求参数(JSON,可选):
{
"windowSeconds": 3600
}
windowSeconds不传/<=0 默认 3600(1小时)。
动态调参口径(默认近 1 小时窗口,按 model_name 维度):
- 执行耗时:
finished_at - started_at(取 P90) - 吞吐:近 1 小时完成数 / 3600
调参结果不会覆盖 asynch_models 中配置的最大上限(cap),而是写入 Redis 运行时参数(带 TTL,默认 2 小时):
asynch:runtime:max_concurrency:{model_name}asynch:runtime:queue_limit:{model_name}
生效位置:
- CreateTask 入队时,严格 queue_limit 闸门会优先使用运行时
queue_limit(若无运行时值则回退 cap)。 - Worker 获取并发令牌时,优先使用运行时
max_concurrency(若无运行时值则回退 cap)。
3. 状态机说明(asynch_task.state)
| state | 含义 | 产生方 |
|---|---|---|
| 0 | 排队中 | 创建任务/重试入队 |
| 1 | 执行中 | Worker 抢占后 |
| 2 | 成功(已上传 OSS) | Worker |
| 3 | 失败 | Worker / 超时处理 |
| 4 | 已下载(已领取) | 批量领取接口(2→4) |
字段补充:
retry_count:已重试次数(不含首次)enqueue_at:入队时间(用于排队顺序,重试会更新为 NOW() 放到队尾)expire_at:仅对state=4生效,表示保留到期时间
4. 配置说明(config.yml)
关键配置:
database.default: PostgreSQL 连接redis.default: Redis 连接(并发令牌、可扩展用途)
5. 数据库初始化
项目根目录提供 update.sql:首次部署执行建表 SQL。
6. 开发与发布建议(Git)
dev:日常开发与联调main:线上稳定分支- 推荐流程:
- 从
main拉出dev - 功能完成后提 MR/PR 合并回
main - 打 tag / 发布镜像
- 从