package gateway import ( "bytes" "context" "encoding/json" "fmt" "mime/multipart" "model-gateway/common/util" "model-gateway/model/entity" "time" commonHttp "gitea.com/red-future/common/http" "github.com/gogf/gf/v2/frame/g" "github.com/gogf/gf/v2/util/guid" ) type uploadFileResponse struct { FileURL string `json:"fileURL"` // 文件 URL FileSize int `json:"fileSize"` // 文件大小(字节) FileName string `json:"fileName"` // 文件名 FileFormat string `json:"fileFormat"` // 文件格式 FileAddressPrefix string `json:"fileAddressPrefix"` // 文件地址前缀 } func UploadByTask(ctx context.Context, _ *entity.AsynchTask, data []byte, fileExt string, _ string) (ossURL string, err error) { // multipart body := &bytes.Buffer{} writer := multipart.NewWriter(body) ext := fileExt if ext == "" { ext = ".bin" } if ext[0] != '.' { ext = "." + ext } filename := fmt.Sprintf("asynch_%d_%s%s", time.Now().Unix(), guid.S(), ext) part, err := writer.CreateFormFile("file", filename) if err != nil { return "", err } if _, err := part.Write(data); err != nil { return "", err } contentType := writer.FormDataContentType() if err := writer.Close(); err != nil { return "", err } headers := util.ForwardHeaders(ctx) headers["Content-Type"] = contentType fullURL := "oss/file/uploadFile" g.Log().Infof(ctx, "[OSS] upload start url=%s filename=%s size=%d", fullURL, filename, len(data)) var resp uploadFileResponse if err = commonHttp.Post(ctx, fullURL, headers, &resp, body.Bytes()); err != nil { return "", err } g.Log().Infof(ctx, "[OSS] upload success url=%s size=%d format=%s", resp.FileURL, resp.FileSize, resp.FileFormat) return resp.FileURL, nil } // TriggerCallback 任务成功后的回调: // - JSON body 参数:task_id/state/oss_file/file_type/text(可选) func TriggerCallback(ctx context.Context, t *entity.AsynchTask) { headers := util.ForwardHeaders(ctx) var req struct{} payload := map[string]interface{}{ "task_id": t.TaskID, "state": t.State, "oss_file": t.OssFile, "file_type": t.FileType, "text": t.TextResult, "error_msg": t.ErrorMsg, } jsonData, err := json.Marshal(payload) if err != nil { g.Log().Warningf(ctx, "[回调] JSON序列化失败 taskId=%s 错误=%v", t.TaskID, err) return } g.Log().Infof(ctx, "[回调] 开始发送 taskId=%s 回调地址=%s 请求头数量=%d 消息体大小=%d字节", t.TaskID, t.CallbackURL, len(headers), len(jsonData)) err = commonHttp.Post(ctx, t.CallbackURL, headers, &req, jsonData) if err != nil { g.Log().Warningf(ctx, "[回调] 发送失败 taskId=%s 回调地址=%s 错误=%v", t.TaskID, t.CallbackURL, err) return } g.Log().Infof(ctx, "[回调] 发送成功 taskId=%s 回调地址=%s 消息体大小=%d字节", t.TaskID, t.CallbackURL, len(jsonData)) } // TriggerPromptsCallback 任务成功后的提示词回调 // - JSON body 参数:epicycleId(轮次id)/textResult(模型回答消息) func TriggerPromptsCallback(ctx context.Context, t *entity.AsynchTask, epicycleId int64) { callbackURL := "prompts-core/session/sessionCallback" headers := util.ForwardHeaders(ctx) var req struct{} payload := map[string]interface{}{ "epicycleId": epicycleId, "text": t.TextResult, } jsonData, err := json.Marshal(payload) if err != nil { g.Log().Warningf(ctx, "[提示词回调] JSON序列化失败 epicycleId=%d 错误=%v", epicycleId, err) return } g.Log().Infof(ctx, "[提示词回调] 开始发送 epicycleId=%d 回调地址=%s 请求头数量=%d 消息体大小=%d字节", t.EpicycleId, callbackURL, len(headers), len(jsonData)) err = commonHttp.Post(ctx, callbackURL, headers, &req, jsonData) if err != nil { g.Log().Warningf(ctx, "[提示词回调] 发送失败 epicycleId=%d 回调地址=%s 错误=%v", t.EpicycleId, callbackURL, err) return } g.Log().Infof(ctx, "[提示词回调] 发送成功 epicycleId=%d 回调地址=%s 消息体大小=%d字节", t.EpicycleId, callbackURL, len(jsonData)) } // IsSuperAdmin 调用admin-go服务检查是否是超级管理员 func IsSuperAdmin(ctx context.Context) (res bool, err error) { headers := util.ForwardHeaders(ctx) var r = make(map[string]bool) if err = commonHttp.Get(ctx, "admin-go/api/v1/system/user/checkIsSuperAdmin", headers, &r); err != nil { return false, err } return r["isSuperAdmin"], err } //// callback 向回调地址 POST 任务结果(与查询接口 GetTaskRes 出参一致) //func (s *audioTaskService) callback(ctx context.Context, taskID, status, errMsg, callbackURL string) { // if callbackURL == "" { // return // } // // task, _ := dao.TranscribeTask.GetByTaskID(ctx, taskID) // if task == nil { // g.Log().Errorf(ctx, "[回调 %s] 任务不存在", taskID) // return // } // // detailList, _ := dao.TranscribeTaskDetail.ListByTaskID(ctx, taskID) // detailItems := make([]dto.TranscribeTaskDetailItem, 0, len(detailList)) // for i := range detailList { // detailItems = append(detailItems, dao.DetailEntityToItem(&detailList[i])) // } // // // 构建与查询接口一致的 taskInfo // taskInfo := dao.EntityToItem(task) // // // 兼容历史数据: 从 result 中补全 scenes 等字段 // detailItems = enrichDetailsFromResult(task.Result, detailItems) // // payload := dto.CallbackPayload{ // TaskInfo: taskInfo, // DetailList: detailItems, // } // // body, _ := json.Marshal(payload) // // // 透传调用方的用户信息 // userJSON, _ := json.Marshal(beans.User{UserName: "admin", TenantId: 1}) // // req, _ := http.NewRequest("POST", callbackURL, bytes.NewReader(body)) // req.Header.Set("Content-Type", "application/json") // req.Header.Set("X-User-Info", string(userJSON)) // // resp, reqErr := http.DefaultClient.Do(req) // if reqErr != nil { // g.Log().Errorf(ctx, "[回调 %s] 请求失败: %v", taskID, reqErr) // return // } // defer resp.Body.Close() // // respBody, _ := io.ReadAll(resp.Body) // g.Log().Infof(ctx, "[回调 %s] 响应 status=%d, body=%s", taskID, resp.StatusCode, string(respBody)) //}