package data import ( consts "cid/consts/data" dao "cid/dao/data" dto "cid/model/dto/data" entity "cid/model/entity/data" "context" "fmt" "time" "github.com/gogf/gf/v2/frame/g" "github.com/gogf/gf/v2/os/grpool" "github.com/gogf/gf/v2/util/guid" ) type dataFetchService struct{} // DataFetch 数据获取服务 var DataFetch = new(dataFetchService) // FetchPool 数据获取协程池,限制并发数避免goroutine爆炸 var FetchPool = grpool.New(10) // Execute 执行数据获取 func (s *dataFetchService) Execute(ctx context.Context, req *dto.ExecuteDataFetchReq) (res *dto.ExecuteDataFetchRes, err error) { // 检查接口是否存在 apiInterface, err := dao.ApiInterface.GetOne(ctx, &dto.GetApiInterfaceReq{Id: req.InterfaceId}) if err != nil || apiInterface == nil { return nil, fmt.Errorf("接口不存在") } // 检查接口状态 if apiInterface.Status != consts.PlatformStatusActive { return nil, fmt.Errorf("接口未启用") } // 检查平台是否存在 platform, err := dao.Platform.GetOne(ctx, &dto.GetPlatformReq{Id: apiInterface.PlatformId}) if err != nil || platform == nil { return nil, fmt.Errorf("平台不存在") } // 检查平台状态 if platform.Status != consts.PlatformStatusActive { return nil, fmt.Errorf("平台未启用") } // 生成请求ID requestId := guid.S() startTime := time.Now().UnixMilli() // 创建数据获取日志 fetchLog := &entity.DataFetchLog{ PlatformId: req.PlatformId, InterfaceId: req.InterfaceId, RequestId: requestId, Status: consts.FetchStatusPending, StartTime: startTime, RequestConfig: req.RequestParams, RetryCount: 0, } logId, err := dao.DataFetchLog.Insert(ctx, fetchLog) if err != nil { return nil, fmt.Errorf("创建日志失败: %v", err) } // 使用协程池异步执行数据获取 asyncCtx := context.WithoutCancel(ctx) FetchPool.Add(asyncCtx, func(ctx context.Context) { s.executeFetch(ctx, logId, requestId, platform, apiInterface, req.RequestParams, startTime) }) return &dto.ExecuteDataFetchRes{ RequestId: requestId, Status: string(consts.FetchStatusPending), Message: "任务已提交", }, nil } // BatchExecute 批量执行数据获取 func (s *dataFetchService) BatchExecute(ctx context.Context, req *dto.BatchExecuteDataFetchReq) (res *dto.BatchExecuteDataFetchRes, err error) { var requestIds []string successCount := 0 failedCount := 0 for _, interfaceId := range req.InterfaceIds { executeReq := &dto.ExecuteDataFetchReq{ PlatformId: 0, // 接口服务会自动获取 InterfaceId: interfaceId, RequestParams: req.RequestParams, } executeRes, err := s.Execute(ctx, executeReq) if err != nil { failedCount++ g.Log().Error(ctx, "批量执行失败", g.Map{ "interfaceId": interfaceId, "error": err.Error(), }) } else { successCount++ requestIds = append(requestIds, executeRes.RequestId) } } return &dto.BatchExecuteDataFetchRes{ SuccessCount: successCount, FailedCount: failedCount, RequestIds: requestIds, }, nil } // List 获取数据获取日志列表 func (s *dataFetchService) List(ctx context.Context, req *dto.ListDataFetchLogReq) (res *dto.ListDataFetchLogRes, err error) { logList, total, err := dao.DataFetchLog.List(ctx, req) if err != nil { return } // 获取平台和接口ID列表 platformIds := make([]int64, 0) interfaceIds := make([]int64, 0) for _, item := range logList { if item.PlatformId > 0 { platformIds = append(platformIds, item.PlatformId) } if item.InterfaceId > 0 { interfaceIds = append(interfaceIds, item.InterfaceId) } } // 批量获取平台和接口信息 platformMap := make(map[int64]string) interfaceMap := make(map[int64]string) if len(platformIds) > 0 { platforms, _, err := dao.Platform.List(ctx, &dto.ListPlatformReq{}) if err == nil { for _, p := range platforms { platformMap[p.Id] = p.Name } } } if len(interfaceIds) > 0 { interfaces, _, err := dao.ApiInterface.List(ctx, &dto.ListApiInterfaceReq{}) if err == nil { for _, i := range interfaces { interfaceMap[i.Id] = i.Name } } } // 组装响应数据 list := make([]dto.DataFetchLogItem, 0, len(logList)) for _, item := range logList { platformName := "" if name, ok := platformMap[item.PlatformId]; ok { platformName = name } interfaceName := "" if name, ok := interfaceMap[item.InterfaceId]; ok { interfaceName = name } list = append(list, dto.DataFetchLogItem{ Id: item.Id, PlatformId: item.PlatformId, PlatformName: platformName, InterfaceId: item.InterfaceId, InterfaceName: interfaceName, RequestId: item.RequestId, Status: item.Status, StatusName: s.getStatusName(item.Status), StartTime: item.StartTime, EndTime: item.EndTime, Duration: item.Duration, ErrorMessage: item.ErrorMessage, RetryCount: item.RetryCount, CreatedAt: item.CreatedAt.Unix(), }) } res = &dto.ListDataFetchLogRes{ List: list, Total: total, } return } // GetOne 获取单个数据获取日志 func (s *dataFetchService) GetOne(ctx context.Context, req *dto.GetDataFetchLogReq) (res *dto.GetDataFetchLogRes, err error) { fetchLog, err := dao.DataFetchLog.GetOne(ctx, req) if err != nil { return } // 获取平台和接口名称 var platformName, interfaceName string if fetchLog.PlatformId > 0 { platform, _ := dao.Platform.GetOne(ctx, &dto.GetPlatformReq{Id: fetchLog.PlatformId}) if platform != nil { platformName = platform.Name } } if fetchLog.InterfaceId > 0 { apiInterface, _ := dao.ApiInterface.GetOne(ctx, &dto.GetApiInterfaceReq{Id: fetchLog.InterfaceId}) if apiInterface != nil { interfaceName = apiInterface.Name } } return &dto.GetDataFetchLogRes{ DataFetchLog: fetchLog, PlatformName: platformName, InterfaceName: interfaceName, }, nil } // ReExecute 重新执行数据获取 func (s *dataFetchService) ReExecute(ctx context.Context, req *dto.ReExecuteDataFetchReq) (res *dto.ReExecuteDataFetchRes, err error) { // 获取原日志 oldLog, err := dao.DataFetchLog.GetOne(ctx, &dto.GetDataFetchLogReq{Id: req.LogId}) if err != nil || oldLog == nil { return nil, fmt.Errorf("日志不存在") } // 检查接口是否存在 apiInterface, err := dao.ApiInterface.GetOne(ctx, &dto.GetApiInterfaceReq{Id: oldLog.InterfaceId}) if err != nil || apiInterface == nil { return nil, fmt.Errorf("接口不存在") } // 检查平台是否存在 platform, err := dao.Platform.GetOne(ctx, &dto.GetPlatformReq{Id: apiInterface.PlatformId}) if err != nil || platform == nil { return nil, fmt.Errorf("平台不存在") } // 生成新的请求ID requestId := guid.S() startTime := time.Now().UnixMilli() // 创建新的数据获取日志 fetchLog := &entity.DataFetchLog{ PlatformId: oldLog.PlatformId, InterfaceId: oldLog.InterfaceId, RequestId: requestId, Status: consts.FetchStatusPending, StartTime: startTime, RequestConfig: oldLog.RequestConfig, RetryCount: oldLog.RetryCount + 1, } logId, err := dao.DataFetchLog.Insert(ctx, fetchLog) if err != nil { return nil, fmt.Errorf("创建日志失败: %v", err) } // 使用协程池异步执行数据获取 asyncCtx := context.WithoutCancel(ctx) FetchPool.Add(asyncCtx, func(ctx context.Context) { s.executeFetch(ctx, logId, requestId, platform, apiInterface, oldLog.RequestConfig, startTime) }) return &dto.ReExecuteDataFetchRes{ RequestId: requestId, Status: string(consts.FetchStatusPending), Message: "任务已重新提交", }, nil } // executeFetch 执行实际的数据获取(在协程池中运行) func (s *dataFetchService) executeFetch(ctx context.Context, logId int64, requestId string, platform *entity.Platform, apiInterface *entity.ApiInterface, requestParams map[string]interface{}, startTime int64) { // 更新状态为执行中 dao.DataFetchLog.UpdateStatus(ctx, logId, string(consts.FetchStatusRunning), 0, 0, "", "") // TODO: 根据平台和接口配置执行HTTP请求 // 这里需要根据平台的限流配置进行限流控制 // 根据接口的请求配置组装请求参数 // 调用接口获取数据 // 模拟接口调用 time.Sleep(time.Second * 2) // 模拟成功响应 responseData := `{"code": 0, "message": "success", "data": {"items": []}}` endTime := time.Now().UnixMilli() duration := int(endTime - startTime) // 更新状态为成功 dao.DataFetchLog.UpdateStatus(ctx, logId, string(consts.FetchStatusSuccess), endTime, duration, responseData, "") g.Log().Info(ctx, "数据获取成功", g.Map{ "logId": logId, "requestId": requestId, "platform": platform.Name, "interface": apiInterface.Name, "duration": duration, }) } // getStatusName 获取状态名称 func (s *dataFetchService) getStatusName(status consts.FetchStatus) string { statusNames := map[consts.FetchStatus]string{ consts.FetchStatusPending: "待执行", consts.FetchStatusRunning: "执行中", consts.FetchStatusSuccess: "成功", consts.FetchStatusFailed: "失败", consts.FetchStatusRateLimit: "触发限流", } if name, ok := statusNames[status]; ok { return name } return string(status) }