Files
prompts-core/service/prompt/prompt_compose_service.go

355 lines
10 KiB
Go
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
package prompt
import (
"context"
"encoding/json"
"errors"
"fmt"
"prompts-core/common/util"
"prompts-core/consts/public"
"prompts-core/dao"
"prompts-core/model/dto"
"prompts-core/model/entity"
"prompts-core/service/gateway"
"gitea.com/red-future/common/beans"
"gitea.com/red-future/common/utils"
"github.com/gogf/gf/v2/frame/g"
"github.com/gogf/gf/v2/util/gconv"
)
// ComposeMessages 核心拼接提示词主流程
func ComposeMessages(ctx context.Context, req *dto.ComposeMessagesReq) (*dto.ComposeMessagesRes, error) {
// 1) 获取模型信息
chatModel, aiModel, err := GetModelMessage(ctx, req)
if err != nil {
return nil, err
}
// 2) 校验用户表单
if err = validateUserForm(req, aiModel); err != nil {
return nil, err
}
return handleBuild(ctx, req, chatModel, aiModel)
}
// GetModelMessage 获取模型信息
func GetModelMessage(ctx context.Context, req *dto.ComposeMessagesReq) (*gateway.AsynchModel, *gateway.AsynchModel, error) {
userInfo, err := utils.GetUserInfo(ctx)
if err != nil {
return nil, nil, fmt.Errorf("获取用户信息失败: %w", err)
}
chatModel, err := gateway.GetModelConfig(ctx, &gateway.AsynchModel{
SQLBaseDO: beans.SQLBaseDO{Creator: userInfo.UserName},
IsChatModel: 1,
})
if err != nil || chatModel == nil {
return nil, nil, errors.New("当前没有对话模型,请添加")
}
aiModel, err := gateway.GetModelConfig(ctx, &gateway.AsynchModel{
SQLBaseDO: beans.SQLBaseDO{TenantId: userInfo.TenantId, Creator: userInfo.UserName},
ModelName: req.ModelName,
})
if err != nil || aiModel == nil {
return nil, nil, errors.New("需要构建的模型不存在")
}
return chatModel, aiModel, nil
}
// validateUserForm 校验用户表单
func validateUserForm(req *dto.ComposeMessagesReq, model *gateway.AsynchModel) error {
if len(req.UserForm) == 0 {
return nil
}
isValid, exceedTokens, err := util.CheckUserFormWithinWindow(req.UserForm, model.TokenConfig)
if err != nil {
return fmt.Errorf("校验用户表单失败: %w", err)
}
if !isValid {
availableWindow := util.GetAvailableWindow(model.TokenConfig)
return fmt.Errorf("UserForm 内容超出窗口大小: 超出 %d tokens可用窗口 %d tokens请精简后重试",
exceedTokens, availableWindow)
}
return nil
}
// handleBuild 通用构建处理
func handleBuild(ctx context.Context, req *dto.ComposeMessagesReq, chatModel, aiModel *gateway.AsynchModel) (*dto.ComposeMessagesRes, error) {
// 1) 处理表单分批
processedReq, totalBatches, err := ProcessUserFormBatches(ctx, req, aiModel)
if err != nil {
return nil, fmt.Errorf("处理用户表单分批失败: %w", err)
}
// 2) 构建推理请求
ir := NewPromptIR()
var taskReq map[string]any
switch req.BuildType {
case public.BuildTypePrompt:
taskReq, err = buildPromptTypeRequest(ctx, processedReq, aiModel, chatModel, ir, totalBatches)
case public.BuildTypeNode:
taskReq, err = buildNodeTypeRequest(ctx, req, chatModel, ir)
case public.BuildTypeStruct:
taskReq, err = buildStructTypeRequest(ctx, req, chatModel, ir)
default:
return nil, errors.New("不支持的构建类型")
}
if err != nil {
return nil, fmt.Errorf("构建推理请求失败: %w", err)
}
// 3) 调用网关创建任务
taskID, err := gateway.CreateGatewayTask(ctx, taskReq)
if err != nil {
return nil, fmt.Errorf("创建网关任务失败: %w", err)
}
if taskID == "" {
return nil, errors.New("网关未返回taskId")
}
// 4) 保存任务记录
if _, err = dao.ComposeTask.Insert(ctx, &entity.ComposeTask{
TaskId: taskID,
ModelName: req.ModelName,
SkillName: req.SkillName,
BuildType: req.BuildType,
CallbackUrl: req.CallbackUrl,
RequestPayload: util.MustMarshalToMap(req),
Status: public.ComposeStatusPending,
}); err != nil {
return nil, err
}
return &dto.ComposeMessagesRes{TaskId: taskID}, nil
}
// Callback 回调处理
func Callback(ctx context.Context, req *dto.CallbackReq) error {
g.Log().Infof(ctx, "[开始回调处理] taskId=%s state=%d", req.TaskId, req.State)
// 1) 查询任务
composeTask, err := dao.ComposeTask.Get(ctx, &entity.ComposeTask{TaskId: req.TaskId})
if err != nil {
return fmt.Errorf("查询任务失败: %w", err)
}
// 2) 处理失败
if req.State == 3 {
return handleCallbackFailed(ctx, req, composeTask)
}
// 3) 处理成功
if req.State == 2 {
return handleCallbackSuccess(ctx, req, composeTask)
}
return nil
}
// handleCallbackFailed 处理回调失败
func handleCallbackFailed(ctx context.Context, req *dto.CallbackReq, composeTask *entity.ComposeTask) error {
_, err := dao.ComposeTask.Update(ctx, &entity.ComposeTask{
TaskId: req.TaskId,
Status: public.ComposeStatusFailed,
ErrorMessage: req.ErrorMsg,
GatewayState: req.State,
OssFile: req.OssFile,
FileType: req.FileType,
ResultText: req.Messages,
})
if composeTask.CallbackUrl != "" {
composeTask.Status = public.ComposeStatusFailed
composeTask.ErrorMessage = req.ErrorMsg
_ = gateway.SendCallback(ctx, composeTask, 0)
}
return err
}
// handleCallbackSuccess 处理回调成功
func handleCallbackSuccess(ctx context.Context, req *dto.CallbackReq, composeTask *entity.ComposeTask) error {
// 1) 获取模型配置
model, err := gateway.GetModelConfig(ctx, &gateway.AsynchModel{
SQLBaseDO: beans.SQLBaseDO{Creator: composeTask.Creator},
ModelName: composeTask.ModelName,
})
if err != nil {
return fmt.Errorf("查询模型失败: %w", err)
}
// 2) 根据运营商获取协议配置
//protocol, err := dao.ProviderProtocol.Get(ctx, &entity.ProviderProtocol{
// ProviderName: model.OperatorName,
//})
// 2) 解析结果
var messages map[string]any
switch composeTask.BuildType {
case public.BuildTypePrompt, public.BuildTypeNode:
messages = ParseResult(req.Messages, model.ResponseBody)
case public.BuildTypeStruct:
messages = ParseStructResult(req.Messages, model.ResponseBody)
default:
messages = req.Messages
}
// 3) 合并附加结构
messages = util.MergeConsult(composeTask.RequestPayload, messages, model.ExtendMapping)
// 4) 更新数据库
_, err = dao.ComposeTask.Update(ctx, &entity.ComposeTask{
TaskId: req.TaskId,
Status: public.ComposeStatusSuccess,
Messages: messages,
GatewayState: req.State,
OssFile: req.OssFile,
FileType: req.FileType,
ResultText: req.Messages,
})
if err != nil {
return err
}
// 5) 存储提示词结果作为历史请求
var epicycleId int64
payload := composeTask.RequestPayload
sessionId := gconv.String(payload["sessionId"])
nodeId := gconv.String(payload["nodeId"])
buildType := gconv.Int(payload["buildType"])
if buildType == public.BuildTypePrompt && sessionId != "" && nodeId != "" {
epicycleId, err = dao.ComposeSession.Insert(ctx, &entity.ComposeSession{
NodeId: nodeId,
SessionId: sessionId,
RequestContent: messages,
})
}
// 6) 拼接历史内容
// 7) 回调业务方
if composeTask.CallbackUrl != "" {
composeTask.Status = public.ComposeStatusSuccess
composeTask.Messages = messages
_ = gateway.SendCallback(ctx, composeTask, epicycleId)
}
return nil
}
// ParseResult 解析结果
func ParseResult(raw map[string]any, responseBody string) map[string]any {
if responseBody == "" {
return raw
}
contentVal := raw[responseBody]
if contentVal == nil {
return raw
}
// 已经是数组
if arr, ok := contentVal.([]any); ok {
rounds := gconv.Maps(arr)
if len(rounds) > 0 {
return map[string]any{"total_rounds": len(rounds), "rounds": rounds}
}
return raw
}
// 是字符串
contentStr := gconv.String(contentVal)
if contentStr == "" {
return raw
}
// 尝试解析为数组
var arr []map[string]any
if err := json.Unmarshal([]byte(contentStr), &arr); err == nil && len(arr) > 0 {
return map[string]any{"total_rounds": len(arr), "rounds": arr}
}
// 尝试解析为单对象
var obj map[string]any
if err := json.Unmarshal([]byte(contentStr), &obj); err == nil && len(obj) > 0 {
return map[string]any{"total_rounds": 1, "rounds": []map[string]any{obj}}
}
return map[string]any{"content": contentStr}
}
func ParseStructResult(raw map[string]any, responseBody string) map[string]any {
// 如果外层已有 rounds直接返回
if _, ok := raw["rounds"]; ok {
return raw
}
contentVal := raw[responseBody]
if contentVal == nil {
return map[string]any{
"total_rounds": 1,
"rounds": []map[string]any{raw},
}
}
contentStr := gconv.String(contentVal)
if contentStr == "" || contentStr == "0" {
return map[string]any{
"total_rounds": 1,
"rounds": []map[string]any{raw},
}
}
// 尝试解析为数组
var arr []map[string]any
if err := json.Unmarshal([]byte(contentStr), &arr); err == nil && len(arr) > 0 {
// 数组的每个元素包一层 content
var rounds []map[string]any
for _, item := range arr {
rounds = append(rounds, map[string]any{"content": item})
}
return map[string]any{
"total_rounds": len(rounds),
"rounds": rounds,
}
}
// 尝试解析为单个对象
var parsed map[string]any
if err := json.Unmarshal([]byte(contentStr), &parsed); err == nil {
raw[responseBody] = parsed
}
// 兜底:包标准结构
return map[string]any{
"total_rounds": 1,
"rounds": []map[string]any{raw},
}
}
// GetComposeTask 查询任务结果
func GetComposeTask(ctx context.Context, taskID string) (*dto.GetComposeTaskRes, error) {
record, err := dao.ComposeTask.Get(ctx, &entity.ComposeTask{
TaskId: taskID,
})
if err != nil {
return nil, fmt.Errorf("查询任务失败: %w", err)
}
if record == nil {
return nil, fmt.Errorf("未找到任务(taskId=%s)", taskID)
}
messages := parseMessagesForResponse(record.Messages)
return &dto.GetComposeTaskRes{
TaskId: record.TaskId,
Status: record.Status,
ErrorMessage: record.ErrorMessage,
Messages: messages,
}, nil
}
// parseMessagesForResponse 解析用于响应的消息
func parseMessagesForResponse(messages any) any {
str, ok := messages.(string)
if !ok || str == "" {
return messages
}
var parsed any
if err := json.Unmarshal([]byte(str), &parsed); err == nil {
return parsed
}
return messages
}