Files
prompts-core/service/prompt/prompt_compose_service.go

565 lines
15 KiB
Go
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
package prompt
import (
"context"
"encoding/json"
"errors"
"fmt"
"strings"
"time"
"gitea.com/red-future/common/beans"
"gitea.com/red-future/common/utils"
"github.com/gogf/gf/v2/container/gvar"
"github.com/gogf/gf/v2/frame/g"
"prompts-core/common/util"
"prompts-core/consts/public"
"prompts-core/dao"
"prompts-core/model/dto"
"prompts-core/model/entity"
"prompts-core/service/gateway"
)
// ComposeMessages is the entry point of the prompt-composition flow.
//
// It resolves the chat model and the target AI model through GetModelMessage,
// validates req.UserForm against the AI model's token configuration, and then
// dispatches on req.BuildType: prompt builds go to handlePromptBuild, node
// builds go to handleNodeBuild, and every other value is handed to
// handleDefaultCase. The first error encountered is returned unchanged.
func ComposeMessages(ctx context.Context, req *dto.ComposeMessagesReq) (*dto.ComposeMessagesRes, error) {
	chatModel, aiModel, err := GetModelMessage(ctx, req)
	if err != nil {
		return nil, err
	}
	if formErr := validateUserForm(ctx, req, aiModel); formErr != nil {
		return nil, formErr
	}

	if req.BuildType == public.BuildTypePrompt {
		// Prompt build.
		return handlePromptBuild(ctx, req, chatModel, aiModel)
	}
	if req.BuildType == public.BuildTypeNode {
		// Node build.
		return handleNodeBuild(ctx, req, chatModel, aiModel)
	}
	return handleDefaultCase(ctx, req)
}
// validateUserForm checks that the user form fits the model's token window.
//
// An empty req.UserForm is accepted without further checks. Otherwise the
// form is passed to util.CheckUserFormWithinWindow together with
// model.TokenConfig; a failure of that check is wrapped and returned, and a
// form that does not fit yields an error reporting the number of exceeding
// tokens and the window size from util.GetAvailableWindow.
func validateUserForm(ctx context.Context, req *dto.ComposeMessagesReq, model *entity.AsynchModel) error {
	if len(req.UserForm) == 0 {
		return nil
	}

	withinWindow, overBy, err := util.CheckUserFormWithinWindow(req.UserForm, model.TokenConfig)
	switch {
	case err != nil:
		return fmt.Errorf("校验用户表单失败: %w", err)
	case withinWindow:
		return nil
	}

	window := util.GetAvailableWindow(model.TokenConfig)
	return fmt.Errorf("UserForm 内容超出窗口大小: 超出 %d tokens可用窗口 %d tokens请精简后重试",
		overBy, window)
}
// handlePromptBuild handles a prompt build (BuildType=1).
//
// It loads the session history (a failure there is logged and the build
// proceeds without history), then runs up to 1+promptsRetry.maxRetryTimes
// attempts of: create the inference task, persist the task record, wait for
// the result, and parse it with parsePromptBuild. The first attempt that
// yields a non-nil parsed result wins. The result is then stored as a
// ComposeSession row; a failure of that insert is only logged, and the
// response is still returned with whatever id the insert produced.
//
// Returns ctx.Err() when the context ends before a result is obtained, and a
// generic error when every attempt fails.
func handlePromptBuild(ctx context.Context, req *dto.ComposeMessagesReq, chatModel, aiModel *entity.AsynchModel) (*dto.ComposeMessagesRes, error) {
	maxRetryTimes := g.Cfg().MustGet(ctx, "promptsRetry.maxRetryTimes", 3).Int()
	if maxRetryTimes < 0 {
		// A negative setting would skip the loop entirely and fail every
		// request; always allow the initial attempt.
		maxRetryTimes = 0
	}

	history, err := GetHistoryMessages(ctx, req.SessionId)
	if err != nil {
		g.Log().Errorf(ctx, "获取历史会话失败: %v将不使用历史会话", err)
		history = nil
	}

	var message *dto.MultiRoundResult
	for attempt := 0; attempt <= maxRetryTimes; attempt++ {
		// Do not create further gateway tasks once the caller has gone away.
		if ctxErr := ctx.Err(); ctxErr != nil {
			return nil, ctxErr
		}
		if attempt > 0 {
			g.Log().Warningf(ctx, "[重试]第 %d/%d 次调用推理模型", attempt, maxRetryTimes)
		}

		taskID, err := callInferenceModel(ctx, req, chatModel, aiModel, history)
		if err != nil {
			g.Log().Errorf(ctx, "调用推理模型失败(第%d次): %v", attempt+1, err)
			continue
		}
		if err = saveComposeTask(ctx, taskID, req); err != nil {
			g.Log().Errorf(ctx, "保存任务记录失败(第%d次): %v", attempt+1, err)
			continue
		}
		taskRecord, err := waitForResult(ctx, taskID)
		if err != nil {
			g.Log().Errorf(ctx, "等待结果失败(第%d次): %v", attempt+1, err)
			continue
		}

		// parsePromptBuild returns nil for a nil record, which is what
		// waitForResult hands back for a task that finished as failed.
		if message = parsePromptBuild(taskRecord, chatModel); message != nil {
			break
		}
		g.Log().Warningf(ctx, "[重试] 推理结果不合法(第%d次),准备重新请求", attempt+1)
	}

	if message == nil {
		if ctxErr := ctx.Err(); ctxErr != nil {
			return nil, ctxErr
		}
		return nil, errors.New("推理模型调用失败,请稍后再试")
	}

	epicycleId, err := dao.ComposeSession.Insert(ctx, &entity.ComposeSession{
		SessionId:      req.SessionId,
		RequestContent: message,
	})
	if err != nil {
		// Best effort: the composed messages are still returned to the caller.
		g.Log().Errorf(ctx, "创建会话记录失败: %v", err)
	}
	return &dto.ComposeMessagesRes{
		Messages:   message,
		EpicycleId: epicycleId,
	}, nil
}
// handleNodeBuild handles a node build (BuildType=2).
//
// It performs a single attempt without session history: create the inference
// task, persist the task record, wait for the result and wrap the parsed
// messages in a response whose EpicycleId is 0. Each failing step is returned
// as a wrapped error.
//
// waitForResult yields a nil record with a nil error when the task finished
// as failed; that case is reported as an error here instead of a successful
// response carrying nil Messages.
func handleNodeBuild(ctx context.Context, req *dto.ComposeMessagesReq, chatModel, aiModel *entity.AsynchModel) (*dto.ComposeMessagesRes, error) {
	taskID, err := callInferenceModel(ctx, req, chatModel, aiModel, nil)
	if err != nil {
		return nil, fmt.Errorf("调用推理模型失败: %w", err)
	}
	if err := saveComposeTask(ctx, taskID, req); err != nil {
		return nil, fmt.Errorf("保存任务记录失败: %w", err)
	}
	taskRecord, err := waitForResult(ctx, taskID)
	if err != nil {
		return nil, fmt.Errorf("等待结果失败: %w", err)
	}

	message := parseNodeBuild(taskRecord)
	if message == nil {
		// parseNodeBuild only returns nil for a nil task record.
		return nil, fmt.Errorf("推理任务执行失败(taskId=%s)", taskID)
	}
	return &dto.ComposeMessagesRes{
		Messages:   message,
		EpicycleId: 0,
	}, nil
}
// handleDefaultCase handles every BuildType that has no dedicated handler.
//
// It inserts a ComposeSession row carrying the session id and req.Cause as
// the remark, and returns a response holding only the id produced by the
// insert. An insert failure is wrapped and returned.
func handleDefaultCase(ctx context.Context, req *dto.ComposeMessagesReq) (*dto.ComposeMessagesRes, error) {
	session := &entity.ComposeSession{
		SessionId: req.SessionId,
		Remark:    req.Cause,
	}
	id, insertErr := dao.ComposeSession.Insert(ctx, session)
	if insertErr != nil {
		return nil, fmt.Errorf("创建会话记录失败: %w", insertErr)
	}
	return &dto.ComposeMessagesRes{EpicycleId: id}, nil
}
// saveComposeTask persists a new compose task in the pending state.
//
// The record stores the gateway task id, the model and skill names from the
// request, and the request itself serialised by util.MustMarshal. The insert
// error, if any, is returned as is.
func saveComposeTask(ctx context.Context, taskID string, req *dto.ComposeMessagesReq) error {
	pending := &entity.ComposeTask{
		TaskId:         taskID,
		ModelName:      req.ModelName,
		SkillName:      req.SkillName,
		RequestPayload: util.MustMarshal(req),
		Status:         public.ComposeStatusPending,
	}
	_, insertErr := dao.ComposeTask.Insert(ctx, pending)
	return insertErr
}
// GetModelMessage resolves the two models a compose request needs.
//
// It reads the current user from ctx via utils.GetUserInfo, then looks up
// that user's chat model and the model named by req.ModelName. The return
// values are (chatModel, aiModel, error); on any failure both models are nil.
func GetModelMessage(ctx context.Context, req *dto.ComposeMessagesReq) (*entity.AsynchModel, *entity.AsynchModel, error) {
	userInfo, err := utils.GetUserInfo(ctx)
	if err != nil {
		return nil, nil, fmt.Errorf("获取用户信息失败: %w", err)
	}
	owner := userInfo.UserName

	chatModel, chatErr := getChatModel(ctx, owner)
	if chatErr != nil {
		return nil, nil, chatErr
	}

	aiModel, aiErr := getAIModel(ctx, owner, req.ModelName)
	if aiErr != nil {
		return nil, nil, aiErr
	}

	return chatModel, aiModel, nil
}
// getChatModel looks up the chat model created by userName.
//
// The query filters on the creator and on IsChatModel being set (the filter
// value is built with new(1)). A query failure is wrapped; a missing model is
// reported as an error asking the user to add one.
func getChatModel(ctx context.Context, userName string) (*entity.AsynchModel, error) {
	filter := &entity.AsynchModel{
		SQLBaseDO:   beans.SQLBaseDO{Creator: userName},
		IsChatModel: new(1),
	}
	found, err := dao.Model.Get(ctx, filter)
	switch {
	case err != nil:
		return nil, fmt.Errorf("查询聊天模型失败: %w", err)
	case found == nil:
		return nil, errors.New("当前没有对话模型,请添加")
	}
	return found, nil
}
// getAIModel looks up the model named modelName that was created by userName.
//
// A query failure is wrapped; when no such model exists an error naming the
// requested model is returned.
func getAIModel(ctx context.Context, userName, modelName string) (*entity.AsynchModel, error) {
	filter := &entity.AsynchModel{
		SQLBaseDO: beans.SQLBaseDO{Creator: userName},
		ModelName: modelName,
	}
	found, err := dao.Model.Get(ctx, filter)
	switch {
	case err != nil:
		return nil, fmt.Errorf("查询AI模型失败: %w", err)
	case found == nil:
		return nil, fmt.Errorf("需要构建的模型 %s 不存在", modelName)
	}
	return found, nil
}
// callInferenceModel submits an inference task to the gateway.
//
// It builds the request with buildInferenceRequest and hands it to
// gateway.CreateGatewayTask. The returned string is the gateway task id; an
// empty id is treated as an error, as are failures of either step.
func callInferenceModel(ctx context.Context, req *dto.ComposeMessagesReq, chatModel *entity.AsynchModel, model *entity.AsynchModel, history []map[string]any) (string, error) {
	taskReq, buildErr := buildInferenceRequest(ctx, req, chatModel, model, history)
	if buildErr != nil {
		return "", fmt.Errorf("构建推理请求失败: %w", buildErr)
	}

	taskID, createErr := gateway.CreateGatewayTask(ctx, taskReq)
	switch {
	case createErr != nil:
		return "", fmt.Errorf("创建网关任务失败: %w", createErr)
	case taskID == "":
		return "", errors.New("网关未返回taskId")
	}
	return taskID, nil
}
// waitForResult polls the stored compose task until it reaches a final state.
//
// Each round reads the task record, returns if checkTaskCompletion reports it
// as finished, otherwise asks the gateway for the task state (a failure there
// is only logged) and waits one poll interval. The overall wait is bounded by
// task.waitTimeoutSeconds (default 300) and the interval is
// task.pollIntervalMillis (default 500).
//
// Returns:
//   - the task record when the task succeeded;
//   - (nil, nil) when the task finished as failed, because
//     checkTaskCompletion yields no record for that state;
//   - an error when the query fails, the deadline passes, or ctx ends.
func waitForResult(ctx context.Context, taskID string) (*entity.ComposeTask, error) {
	timeout := time.Duration(g.Cfg().MustGet(ctx, "task.waitTimeoutSeconds", 300).Int()) * time.Second
	pollInterval := time.Duration(g.Cfg().MustGet(ctx, "task.pollIntervalMillis", 500).Int()) * time.Millisecond
	if pollInterval <= 0 {
		// time.NewTicker panics on a non-positive interval; fall back to the
		// default instead of crashing on a bad configuration value.
		pollInterval = 500 * time.Millisecond
	}
	deadline := time.Now().Add(timeout)
	ticker := time.NewTicker(pollInterval)
	defer ticker.Stop()
	for {
		record, err := dao.ComposeTask.Get(ctx, &entity.ComposeTask{
			TaskId: taskID,
		})
		if err != nil {
			// Context errors are passed through unwrapped.
			if errors.Is(err, context.Canceled) || errors.Is(err, context.DeadlineExceeded) {
				return nil, err
			}
			return nil, fmt.Errorf("查询任务失败: %w", err)
		}
		if record != nil {
			if completed, result := checkTaskCompletion(record); completed {
				return result, nil
			}
		}
		// Best effort: the stored status is picked up on the next read.
		if err = syncGatewayTaskState(ctx, taskID, record); err != nil {
			g.Log().Warningf(ctx, "[waitForResult] 同步网关状态失败 taskId=%s err=%v", taskID, err)
		}
		if time.Now().After(deadline) {
			return nil, fmt.Errorf("等待任务回调超时(taskId=%s)", taskID)
		}
		select {
		case <-ctx.Done():
			return nil, ctx.Err()
		case <-ticker.C:
		}
	}
}
// checkTaskCompletion reports whether a task record has reached a final state.
//
// The first result is true for the success and failed statuses and false for
// a nil record or any other status. The second result is the record itself
// only for the success status; a failed task yields (true, nil), so callers
// must treat a nil record as "finished without a usable result".
func checkTaskCompletion(record *entity.ComposeTask) (bool, *entity.ComposeTask) {
	if record == nil {
		return false, nil
	}
	switch record.Status {
	case public.ComposeStatusSuccess:
		return true, record
	case public.ComposeStatusFailed:
		// The stored error message does not influence the outcome.
		return true, nil
	default:
		return false, nil
	}
}
// syncGatewayTaskState copies the gateway's view of a task into the stored
// compose task.
//
// It queries the gateway for the task state. State 2 marks the stored task as
// succeeded; state 3 marks it as failed and always returns an error, which
// also mentions a failure to persist that status. Any other state leaves the
// stored task untouched. The record parameter is currently unused.
func syncGatewayTaskState(ctx context.Context, taskID string, record *entity.ComposeTask) error {
	// Gateway state values handled here, named after the status they map to.
	const (
		gatewayStateSucceeded = 2
		gatewayStateFailed    = 3
	)

	state, err := gateway.QueryGatewayTaskState(ctx, taskID)
	if err != nil {
		return fmt.Errorf("查询网关状态失败: %w", err)
	}
	switch state {
	case gatewayStateSucceeded:
		return updateTaskStatus(ctx, taskID, public.ComposeStatusSuccess, "")
	case gatewayStateFailed:
		if updErr := updateTaskStatus(ctx, taskID, public.ComposeStatusFailed, "model-gateway 任务执行失败"); updErr != nil {
			// Surface the persistence failure too; otherwise the stored task
			// silently stays in its previous state.
			return fmt.Errorf("model-gateway 任务执行失败(taskId=%s): 更新任务状态失败: %w", taskID, updErr)
		}
		return fmt.Errorf("model-gateway 任务执行失败(taskId=%s)", taskID)
	}
	return nil
}
// updateTaskStatus 更新任务状态
func updateTaskStatus(ctx context.Context, taskID string, status string, errorMsg string) error {
task := &entity.ComposeTask{
TaskId: taskID,
Status: status,
}
if errorMsg != "" {
task.ErrorMessage = errorMsg
}
_, err := dao.ComposeTask.Update(ctx, task)
return err
}
// parsePromptBuild turns a finished task record into the prompt-build result
// (BuildType == 1).
//
// A nil record yields nil. Otherwise the task messages are decoded into a map
// and the field named by getContentField(model) is read from it:
//   - if the messages cannot be decoded, a single empty round is returned;
//   - if the field is missing, not a string, or empty, the decoded map itself
//     becomes the single round;
//   - if the field holds a JSON array, each element is one round;
//   - if it holds any other JSON value that decodes to non-nil, that value is
//     the single round;
//   - otherwise the raw text is wrapped as {"content": text}.
func parsePromptBuild(taskRecord *entity.ComposeTask, model *entity.AsynchModel) *dto.MultiRoundResult {
	if taskRecord == nil {
		return nil
	}

	payload := parseTaskMessages(taskRecord.Messages)
	if payload == nil {
		return createDefaultResult(nil)
	}

	raw, isString := payload[getContentField(model)].(string)
	if !isString || raw == "" {
		return createDefaultResult(payload)
	}

	// The array form must be tried first: tryParseAsObject accepts arrays too.
	rounds := tryParseAsArray(raw)
	if rounds != nil {
		return &dto.MultiRoundResult{TotalRounds: len(rounds), Rounds: rounds}
	}

	round := tryParseAsObject(raw)
	if round != nil {
		return &dto.MultiRoundResult{TotalRounds: 1, Rounds: []any{round}}
	}

	return createDefaultResult(map[string]any{"content": raw})
}
// parseTaskMessages decodes stored task messages into a generic map.
//
// Accepted inputs are a map[string]any (returned as is), a *gvar.Var or a
// string holding JSON text, and any other value, which is first marshalled to
// JSON. Decoding is best effort: whenever the input is nil, cannot be
// marshalled, or is not a JSON object, the result is nil rather than an error.
func parseTaskMessages(messages any) map[string]any {
	var raw []byte
	switch v := messages.(type) {
	case map[string]any:
		return v
	case *gvar.Var:
		if v == nil {
			return nil
		}
		raw = []byte(v.String())
	case string:
		raw = []byte(v)
	default:
		encoded, err := json.Marshal(v)
		if err != nil {
			// Not representable as JSON: treated as "no messages".
			return nil
		}
		raw = encoded
	}

	var mapped map[string]any
	if err := json.Unmarshal(raw, &mapped); err != nil {
		// Invalid JSON or a non-object top-level value: treated as "no messages".
		return nil
	}
	return mapped
}
// tryParseAsArray decodes contentStr as a JSON array.
//
// It returns the decoded elements, or nil when contentStr is not valid JSON
// or its top-level value is not an array (including JSON null). An empty JSON
// array yields an empty, non-nil slice.
func tryParseAsArray(contentStr string) []any {
	var decoded any
	if err := json.Unmarshal([]byte(contentStr), &decoded); err != nil {
		return nil
	}
	items, isArray := decoded.([]any)
	if !isArray {
		return nil
	}
	return items
}
// tryParseAsObject decodes contentStr as a single JSON value.
//
// Despite the name, any valid JSON value is accepted (object, array, string,
// number, boolean); the decoded value is returned as is. The result is nil
// when contentStr is not valid JSON or is the JSON null literal.
func tryParseAsObject(contentStr string) any {
	var decoded any
	if json.Unmarshal([]byte(contentStr), &decoded) == nil {
		return decoded
	}
	return nil
}
// createDefaultResult wraps data as the only round of a one-round result.
//
// A nil data value is replaced by an empty map so the round is never nil.
func createDefaultResult(data any) *dto.MultiRoundResult {
	round := data
	if round == nil {
		round = map[string]any{}
	}
	return &dto.MultiRoundResult{TotalRounds: 1, Rounds: []any{round}}
}
// getContentField returns the name of the field that carries the content,
// taken from the model's ResponseMapping.
//
// It picks a mapping key whose value contains the substring "content". When
// several keys qualify, the lexicographically smallest one is returned so the
// choice does not depend on Go's randomised map iteration order. A nil model
// or a mapping without such a value yields "content".
func getContentField(model *entity.AsynchModel) string {
	const fallback = "content"
	if model == nil {
		return fallback
	}

	field, found := "", false
	for key, target := range parseResponseMapping(model.ResponseMapping) {
		if !strings.Contains(target, "content") {
			continue
		}
		if !found || key < field {
			field, found = key, true
		}
	}
	if !found {
		return fallback
	}
	return field
}
// parseResponseMapping converts a stored response mapping into a
// string-to-string map.
//
// A *gvar.Var or a string is decoded as JSON text; a map[string]any
// contributes only its string-valued entries; every other input yields an
// empty map. JSON decoding is best effort and its error is discarded on
// purpose, so whatever was decoded before the error is kept.
func parseResponseMapping(mapping any) map[string]string {
	out := make(map[string]string)

	var encoded string
	switch v := mapping.(type) {
	case map[string]any:
		for key, val := range v {
			if text, isString := val.(string); isString {
				out[key] = text
			}
		}
		return out
	case *gvar.Var:
		if v == nil {
			return out
		}
		encoded = v.String()
	case string:
		encoded = v
	default:
		return out
	}

	_ = json.Unmarshal([]byte(encoded), &out)
	return out
}
// parseNodeBuild turns a finished task record into the node-build result
// (BuildType == 2).
//
// A nil record yields nil. Otherwise the decoded task messages become the
// single round of the result; messages that cannot be decoded are replaced by
// an empty map.
func parseNodeBuild(taskRecord *entity.ComposeTask) *dto.MultiRoundResult {
	if taskRecord == nil {
		return nil
	}

	round := parseTaskMessages(taskRecord.Messages)
	if round == nil {
		round = map[string]any{}
	}
	return &dto.MultiRoundResult{TotalRounds: 1, Rounds: []any{round}}
}
// Callback processes a task callback.
//
// It logs the incoming request, verifies that the referenced task exists, and
// then records the outcome: a request with State 3 is stored as a failure via
// handleCallbackFailure, every other state is processed by
// handleCallbackSuccess. A lookup failure or an unknown task id is returned as
// an error.
func Callback(ctx context.Context, req *dto.CallbackReq) error {
	g.Log().Infof(ctx, "[Callback][RECV] taskId=%s state=%d ossFile=%s fileType=%s textLen=%d",
		req.TaskId, req.State, req.OssFile, req.FileType, len(req.Text))

	existing, err := dao.ComposeTask.Get(ctx, &entity.ComposeTask{TaskId: req.TaskId})
	switch {
	case err != nil:
		return fmt.Errorf("查询任务失败: %w", err)
	case existing == nil:
		return fmt.Errorf("任务不存在: %s", req.TaskId)
	case req.State == 3:
		return handleCallbackFailure(ctx, req.TaskId, req.ErrorMsg)
	}
	return handleCallbackSuccess(ctx, req)
}
// handleCallbackFailure 处理回调失败
func handleCallbackFailure(ctx context.Context, taskID, errorMsg string) error {
_, err := dao.ComposeTask.Update(ctx, &entity.ComposeTask{
TaskId: taskID,
Status: public.ComposeStatusFailed,
ErrorMessage: errorMsg,
})
return err
}
// handleCallbackSuccess stores the model output delivered by a callback.
//
// req.Text is parsed with util.ParseOutput. On success the task is marked as
// succeeded and the parsed output is stored as its messages; a failure of
// that update is logged and returned.
//
// When parsing fails the task is marked as failed. The stored error message
// is req.ErrorMsg when present and the parse error otherwise, so the failure
// reason is not lost; a failure to persist that status is logged. The wrapped
// parse error is returned in either case.
func handleCallbackSuccess(ctx context.Context, req *dto.CallbackReq) error {
	result, err := util.ParseOutput(req.Text)
	if err != nil {
		parseErr := fmt.Errorf("解析模型输出失败: %w", err)
		failMsg := req.ErrorMsg
		if failMsg == "" {
			failMsg = parseErr.Error()
		}
		if updErr := handleCallbackFailure(ctx, req.TaskId, failMsg); updErr != nil {
			g.Log().Errorf(ctx, "[Callback] 更新任务失败 taskId=%s err=%v", req.TaskId, updErr)
		}
		return parseErr
	}

	// Only assign a non-nil result so that Messages stays a nil interface
	// instead of wrapping a nil value.
	var messages any
	if result != nil {
		messages = result
	}
	_, err = dao.ComposeTask.Update(ctx, &entity.ComposeTask{
		TaskId:   req.TaskId,
		Status:   public.ComposeStatusSuccess,
		Messages: messages,
	})
	if err != nil {
		g.Log().Errorf(ctx, "[Callback] 更新任务失败 taskId=%s err=%v", req.TaskId, err)
	}
	return err
}
// GetComposeTask returns the stored state of the task identified by taskID.
//
// The response carries the task id, status, error message and the messages as
// prepared by parseMessagesForResponse. A lookup failure is wrapped, and a
// missing task is reported as an error.
func GetComposeTask(ctx context.Context, taskID string) (*dto.GetComposeTaskRes, error) {
	record, err := dao.ComposeTask.Get(ctx, &entity.ComposeTask{TaskId: taskID})
	switch {
	case err != nil:
		return nil, fmt.Errorf("查询任务失败: %w", err)
	case record == nil:
		return nil, fmt.Errorf("未找到任务(taskId=%s)", taskID)
	}

	res := &dto.GetComposeTaskRes{
		TaskId:       record.TaskId,
		Status:       record.Status,
		ErrorMessage: record.ErrorMessage,
		Messages:     parseMessagesForResponse(record.Messages),
	}
	return res, nil
}
// parseMessagesForResponse prepares stored messages for an API response.
//
// A non-empty string holding valid JSON is replaced by its decoded value.
// Everything else (non-string values, the empty string, text that is not
// valid JSON) is returned unchanged.
func parseMessagesForResponse(messages any) any {
	text, isString := messages.(string)
	if isString && text != "" {
		var decoded any
		if json.Unmarshal([]byte(text), &decoded) == nil {
			return decoded
		}
	}
	return messages
}