去掉无用代码
This commit is contained in:
@@ -1,178 +0,0 @@
|
||||
package app
|
||||
|
||||
import (
|
||||
consts "cid/consts/app"
|
||||
dao "cid/dao/app"
|
||||
dto "cid/model/dto/app"
|
||||
entity "cid/model/entity/app"
|
||||
"context"
|
||||
"errors"
|
||||
|
||||
"github.com/gogf/gf/v2/util/gconv"
|
||||
)
|
||||
|
||||
type applicationService struct{}
|
||||
|
||||
// Application 应用服务
|
||||
var Application = new(applicationService)
|
||||
|
||||
// Create 创建应用
|
||||
func (s *applicationService) Create(ctx context.Context, req *dto.CreateApplicationReq) (res *dto.CreateApplicationRes, err error) {
|
||||
// 检查应用名称是否重复
|
||||
count, err := dao.Application.Count(ctx, &dto.ListApplicationReq{Name: req.Name})
|
||||
if err != nil {
|
||||
return
|
||||
}
|
||||
if count > 0 {
|
||||
return nil, errors.New("应用名称已存在")
|
||||
}
|
||||
|
||||
// 检查应用编码是否重复
|
||||
count, err = dao.Application.Count(ctx, &dto.ListApplicationReq{AppCode: req.AppCode})
|
||||
if err != nil {
|
||||
return
|
||||
}
|
||||
if count > 0 {
|
||||
return nil, errors.New("应用编码已存在")
|
||||
}
|
||||
|
||||
// 插入数据库
|
||||
id, err := dao.Application.Insert(ctx, req)
|
||||
if err != nil {
|
||||
return
|
||||
}
|
||||
res = &dto.CreateApplicationRes{
|
||||
Id: id,
|
||||
}
|
||||
return
|
||||
}
|
||||
|
||||
// List 获取应用列表
|
||||
func (s *applicationService) List(ctx context.Context, req *dto.ListApplicationReq) (res *dto.ListApplicationRes, err error) {
|
||||
applicationList, total, err := dao.Application.List(ctx, req)
|
||||
if err != nil {
|
||||
return
|
||||
}
|
||||
|
||||
// 组装响应数据
|
||||
list := make([]dto.ApplicationItem, 0, len(applicationList))
|
||||
for _, item := range applicationList {
|
||||
list = append(list, dto.ApplicationItem{
|
||||
Id: item.Id,
|
||||
Name: item.Name,
|
||||
AppCode: item.AppCode,
|
||||
Type: item.Type,
|
||||
TypeName: s.getTypeName(item.Type),
|
||||
Status: item.Status,
|
||||
StatusName: s.getStatusName(item.Status),
|
||||
Description: item.Description,
|
||||
CreatedAt: item.CreatedAt.Unix(),
|
||||
UpdatedAt: item.UpdatedAt.Unix(),
|
||||
})
|
||||
}
|
||||
|
||||
res = &dto.ListApplicationRes{
|
||||
List: list,
|
||||
Total: total,
|
||||
}
|
||||
return
|
||||
}
|
||||
|
||||
// GetOne 获取单个应用
|
||||
func (s *applicationService) GetOne(ctx context.Context, req *dto.GetApplicationReq) (res *dto.GetApplicationRes, err error) {
|
||||
application, err := dao.Application.GetOne(ctx, req)
|
||||
if err != nil {
|
||||
return
|
||||
}
|
||||
|
||||
var applicationEntity *entity.Application
|
||||
if err = gconv.Struct(application, &applicationEntity); err != nil {
|
||||
return
|
||||
}
|
||||
|
||||
return &dto.GetApplicationRes{
|
||||
Application: applicationEntity,
|
||||
}, nil
|
||||
}
|
||||
|
||||
// Update 更新应用
|
||||
func (s *applicationService) Update(ctx context.Context, req *dto.UpdateApplicationReq) (err error) {
|
||||
// 检查应用是否存在
|
||||
exist, err := dao.Application.GetOne(ctx, &dto.GetApplicationReq{Id: req.Id})
|
||||
if err != nil || exist == nil {
|
||||
return errors.New("应用不存在")
|
||||
}
|
||||
|
||||
// 如果修改了名称,检查新名称是否重复
|
||||
if req.Name != "" && req.Name != exist.Name {
|
||||
count, err := dao.Application.Count(ctx, &dto.ListApplicationReq{Name: req.Name})
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
if count > 0 {
|
||||
return errors.New("应用名称已存在")
|
||||
}
|
||||
}
|
||||
|
||||
// 如果修改了应用编码,检查新编码是否重复
|
||||
if req.AppCode != "" && req.AppCode != exist.AppCode {
|
||||
count, err := dao.Application.Count(ctx, &dto.ListApplicationReq{AppCode: req.AppCode})
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
if count > 0 {
|
||||
return errors.New("应用编码已存在")
|
||||
}
|
||||
}
|
||||
|
||||
_, err = dao.Application.Update(ctx, req)
|
||||
return
|
||||
}
|
||||
|
||||
// UpdateStatus 更新应用状态
|
||||
func (s *applicationService) UpdateStatus(ctx context.Context, req *dto.UpdateApplicationStatusReq) (err error) {
|
||||
_, err = dao.Application.UpdateStatus(ctx, req.Id, req.Status.String())
|
||||
return
|
||||
}
|
||||
|
||||
// Delete 删除应用
|
||||
func (s *applicationService) Delete(ctx context.Context, req *dto.DeleteApplicationReq) (err error) {
|
||||
// TODO: 检查是否存在关联的数据,防止误删
|
||||
// 例如: 检查该应用是否有关联的广告活动等
|
||||
|
||||
_, err = dao.Application.Delete(ctx, req)
|
||||
return
|
||||
}
|
||||
|
||||
// GetByAppCode 根据应用编码获取应用
|
||||
func (s *applicationService) GetByAppCode(ctx context.Context, appCode string) (res *entity.Application, err error) {
|
||||
return dao.Application.GetByAppCode(ctx, appCode)
|
||||
}
|
||||
|
||||
// getTypeName 获取类型名称
|
||||
func (s *applicationService) getTypeName(appType consts.AppType) string {
|
||||
typeNames := map[consts.AppType]string{
|
||||
consts.AppTypeWeb: "Web应用",
|
||||
consts.AppTypeMobile: "移动应用",
|
||||
consts.AppTypeMiniApp: "小程序",
|
||||
consts.AppTypeH5: "H5应用",
|
||||
consts.AppTypeDesktop: "桌面应用",
|
||||
consts.AppTypeThirdParty: "第三方应用",
|
||||
}
|
||||
if name, ok := typeNames[appType]; ok {
|
||||
return name
|
||||
}
|
||||
return string(appType)
|
||||
}
|
||||
|
||||
// getStatusName 获取状态名称
|
||||
func (s *applicationService) getStatusName(status consts.AppStatus) string {
|
||||
statusNames := map[consts.AppStatus]string{
|
||||
consts.AppStatusActive: "启用",
|
||||
consts.AppStatusInactive: "停用",
|
||||
}
|
||||
if name, ok := statusNames[status]; ok {
|
||||
return name
|
||||
}
|
||||
return string(status)
|
||||
}
|
||||
@@ -1,191 +0,0 @@
|
||||
package data
|
||||
|
||||
import (
|
||||
consts "cid/consts/data"
|
||||
dao "cid/dao/data"
|
||||
dto "cid/model/dto/data"
|
||||
entity "cid/model/entity/data"
|
||||
"context"
|
||||
"errors"
|
||||
)
|
||||
|
||||
type apiInterfaceService struct{}
|
||||
|
||||
// ApiInterface 接口服务
|
||||
var ApiInterface = new(apiInterfaceService)
|
||||
|
||||
// Create 创建接口
|
||||
func (s *apiInterfaceService) Create(ctx context.Context, req *dto.CreateApiInterfaceReq) (res *dto.CreateApiInterfaceRes, err error) {
|
||||
// 检查平台是否存在
|
||||
_, err = dao.Platform.GetOne(ctx, &dto.GetPlatformReq{Id: req.PlatformId})
|
||||
if err != nil {
|
||||
return nil, errors.New("平台不存在")
|
||||
}
|
||||
|
||||
// 检查接口编码在同一平台下是否重复
|
||||
interfaces, _, err := dao.ApiInterface.List(ctx, &dto.ListApiInterfaceReq{
|
||||
PlatformId: req.PlatformId,
|
||||
Code: req.Code,
|
||||
})
|
||||
if err != nil {
|
||||
return
|
||||
}
|
||||
if len(interfaces) > 0 {
|
||||
return nil, errors.New("接口编码在该平台下已存在")
|
||||
}
|
||||
|
||||
// 插入数据库
|
||||
id, err := dao.ApiInterface.Insert(ctx, req)
|
||||
if err != nil {
|
||||
return
|
||||
}
|
||||
res = &dto.CreateApiInterfaceRes{
|
||||
Id: id,
|
||||
}
|
||||
return
|
||||
}
|
||||
|
||||
// List 获取接口列表
|
||||
func (s *apiInterfaceService) List(ctx context.Context, req *dto.ListApiInterfaceReq) (res *dto.ListApiInterfaceRes, err error) {
|
||||
apiList, total, err := dao.ApiInterface.List(ctx, req)
|
||||
if err != nil {
|
||||
return
|
||||
}
|
||||
|
||||
// 获取平台ID列表用于批量查询
|
||||
platformIds := make([]int64, 0)
|
||||
for _, item := range apiList {
|
||||
if item.PlatformId > 0 {
|
||||
platformIds = append(platformIds, item.PlatformId)
|
||||
}
|
||||
}
|
||||
|
||||
// 批量获取平台信息
|
||||
platformMap := make(map[int64]string)
|
||||
if len(platformIds) > 0 {
|
||||
platforms, _, err := dao.Platform.List(ctx, &dto.ListPlatformReq{})
|
||||
if err == nil {
|
||||
for _, p := range platforms {
|
||||
platformMap[p.Id] = p.Name
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// 组装响应数据
|
||||
list := make([]dto.ApiInterfaceItem, 0, len(apiList))
|
||||
for _, item := range apiList {
|
||||
platformName := ""
|
||||
if name, ok := platformMap[item.PlatformId]; ok {
|
||||
platformName = name
|
||||
}
|
||||
|
||||
list = append(list, dto.ApiInterfaceItem{
|
||||
Id: item.Id,
|
||||
PlatformId: item.PlatformId,
|
||||
PlatformName: platformName,
|
||||
Name: item.Name,
|
||||
Code: item.Code,
|
||||
Url: item.Url,
|
||||
Method: item.Method,
|
||||
Status: item.Status,
|
||||
StatusName: s.getStatusName(item.Status),
|
||||
CreatedAt: item.CreatedAt.Unix(),
|
||||
UpdatedAt: item.UpdatedAt.Unix(),
|
||||
})
|
||||
}
|
||||
|
||||
res = &dto.ListApiInterfaceRes{
|
||||
List: list,
|
||||
Total: total,
|
||||
}
|
||||
return
|
||||
}
|
||||
|
||||
// GetOne 获取单个接口
|
||||
func (s *apiInterfaceService) GetOne(ctx context.Context, req *dto.GetApiInterfaceReq) (res *dto.GetApiInterfaceRes, err error) {
|
||||
apiInterface, err := dao.ApiInterface.GetOne(ctx, req)
|
||||
if err != nil {
|
||||
return
|
||||
}
|
||||
|
||||
// 获取平台名称
|
||||
var platformName string
|
||||
if apiInterface.PlatformId > 0 {
|
||||
platform, _ := dao.Platform.GetOne(ctx, &dto.GetPlatformReq{Id: apiInterface.PlatformId})
|
||||
if platform != nil {
|
||||
platformName = platform.Name
|
||||
}
|
||||
}
|
||||
|
||||
return &dto.GetApiInterfaceRes{
|
||||
ApiInterface: apiInterface,
|
||||
PlatformName: platformName,
|
||||
}, nil
|
||||
}
|
||||
|
||||
// Update 更新接口
|
||||
func (s *apiInterfaceService) Update(ctx context.Context, req *dto.UpdateApiInterfaceReq) (err error) {
|
||||
// 检查接口是否存在
|
||||
exist, err := dao.ApiInterface.GetOne(ctx, &dto.GetApiInterfaceReq{Id: req.Id})
|
||||
if err != nil || exist == nil {
|
||||
return errors.New("接口不存在")
|
||||
}
|
||||
|
||||
// 如果修改了平台,检查平台是否存在
|
||||
if req.PlatformId > 0 && req.PlatformId != exist.PlatformId {
|
||||
_, err := dao.Platform.GetOne(ctx, &dto.GetPlatformReq{Id: req.PlatformId})
|
||||
if err != nil {
|
||||
return errors.New("平台不存在")
|
||||
}
|
||||
}
|
||||
|
||||
// 如果修改了编码,检查编码是否重复
|
||||
if req.Code != "" && req.Code != exist.Code {
|
||||
platformId := req.PlatformId
|
||||
if platformId == 0 {
|
||||
platformId = exist.PlatformId
|
||||
}
|
||||
interfaces, _, err := dao.ApiInterface.List(ctx, &dto.ListApiInterfaceReq{
|
||||
PlatformId: platformId,
|
||||
Code: req.Code,
|
||||
})
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
if len(interfaces) > 0 {
|
||||
return errors.New("接口编码在该平台下已存在")
|
||||
}
|
||||
}
|
||||
|
||||
_, err = dao.ApiInterface.Update(ctx, req)
|
||||
return
|
||||
}
|
||||
|
||||
// UpdateStatus 更新接口状态
|
||||
func (s *apiInterfaceService) UpdateStatus(ctx context.Context, req *dto.UpdateApiInterfaceStatusReq) (err error) {
|
||||
_, err = dao.ApiInterface.UpdateStatus(ctx, req.Id, req.Status.String())
|
||||
return
|
||||
}
|
||||
|
||||
// Delete 删除接口
|
||||
func (s *apiInterfaceService) Delete(ctx context.Context, req *dto.DeleteApiInterfaceReq) (err error) {
|
||||
_, err = dao.ApiInterface.Delete(ctx, req)
|
||||
return
|
||||
}
|
||||
|
||||
// GetByIds 根据ID列表获取接口
|
||||
func (s *apiInterfaceService) GetByIds(ctx context.Context, ids []int64) (res []entity.ApiInterface, err error) {
|
||||
return dao.ApiInterface.GetByIds(ctx, ids)
|
||||
}
|
||||
|
||||
// getStatusName 获取状态名称
|
||||
func (s *apiInterfaceService) getStatusName(status consts.PlatformStatus) string {
|
||||
statusNames := map[consts.PlatformStatus]string{
|
||||
consts.PlatformStatusActive: "启用",
|
||||
consts.PlatformStatusInactive: "停用",
|
||||
}
|
||||
if name, ok := statusNames[status]; ok {
|
||||
return name
|
||||
}
|
||||
return string(status)
|
||||
}
|
||||
@@ -1,316 +0,0 @@
|
||||
package data
|
||||
|
||||
import (
|
||||
consts "cid/consts/data"
|
||||
dao "cid/dao/data"
|
||||
dto "cid/model/dto/data"
|
||||
entity "cid/model/entity/data"
|
||||
"context"
|
||||
"fmt"
|
||||
"time"
|
||||
|
||||
"github.com/gogf/gf/v2/frame/g"
|
||||
"github.com/gogf/gf/v2/os/grpool"
|
||||
"github.com/gogf/gf/v2/util/guid"
|
||||
)
|
||||
|
||||
type dataFetchService struct{}
|
||||
|
||||
// DataFetch 数据获取服务
|
||||
var DataFetch = new(dataFetchService)
|
||||
|
||||
// FetchPool 数据获取协程池,限制并发数避免goroutine爆炸
|
||||
var FetchPool = grpool.New(10)
|
||||
|
||||
// Execute 执行数据获取
|
||||
func (s *dataFetchService) Execute(ctx context.Context, req *dto.ExecuteDataFetchReq) (res *dto.ExecuteDataFetchRes, err error) {
|
||||
// 检查接口是否存在
|
||||
apiInterface, err := dao.ApiInterface.GetOne(ctx, &dto.GetApiInterfaceReq{Id: req.InterfaceId})
|
||||
if err != nil || apiInterface == nil {
|
||||
return nil, fmt.Errorf("接口不存在")
|
||||
}
|
||||
|
||||
// 检查接口状态
|
||||
if apiInterface.Status != consts.PlatformStatusActive {
|
||||
return nil, fmt.Errorf("接口未启用")
|
||||
}
|
||||
|
||||
// 检查平台是否存在
|
||||
platform, err := dao.Platform.GetOne(ctx, &dto.GetPlatformReq{Id: apiInterface.PlatformId})
|
||||
if err != nil || platform == nil {
|
||||
return nil, fmt.Errorf("平台不存在")
|
||||
}
|
||||
|
||||
// 检查平台状态
|
||||
if platform.Status != consts.PlatformStatusActive {
|
||||
return nil, fmt.Errorf("平台未启用")
|
||||
}
|
||||
|
||||
// 生成请求ID
|
||||
requestId := guid.S()
|
||||
startTime := time.Now().UnixMilli()
|
||||
|
||||
// 创建数据获取日志
|
||||
fetchLog := &entity.DataFetchLog{
|
||||
PlatformId: req.PlatformId,
|
||||
InterfaceId: req.InterfaceId,
|
||||
RequestId: requestId,
|
||||
Status: consts.FetchStatusPending,
|
||||
StartTime: startTime,
|
||||
RequestConfig: req.RequestParams,
|
||||
RetryCount: 0,
|
||||
}
|
||||
|
||||
logId, err := dao.DataFetchLog.Insert(ctx, fetchLog)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("创建日志失败: %v", err)
|
||||
}
|
||||
|
||||
// 使用协程池异步执行数据获取
|
||||
asyncCtx := context.WithoutCancel(ctx)
|
||||
FetchPool.Add(asyncCtx, func(ctx context.Context) {
|
||||
s.executeFetch(ctx, logId, requestId, platform, apiInterface, req.RequestParams, startTime)
|
||||
})
|
||||
|
||||
return &dto.ExecuteDataFetchRes{
|
||||
RequestId: requestId,
|
||||
Status: string(consts.FetchStatusPending),
|
||||
Message: "任务已提交",
|
||||
}, nil
|
||||
}
|
||||
|
||||
// BatchExecute 批量执行数据获取
|
||||
func (s *dataFetchService) BatchExecute(ctx context.Context, req *dto.BatchExecuteDataFetchReq) (res *dto.BatchExecuteDataFetchRes, err error) {
|
||||
var requestIds []string
|
||||
successCount := 0
|
||||
failedCount := 0
|
||||
|
||||
for _, interfaceId := range req.InterfaceIds {
|
||||
executeReq := &dto.ExecuteDataFetchReq{
|
||||
PlatformId: 0, // 接口服务会自动获取
|
||||
InterfaceId: interfaceId,
|
||||
RequestParams: req.RequestParams,
|
||||
}
|
||||
|
||||
executeRes, err := s.Execute(ctx, executeReq)
|
||||
if err != nil {
|
||||
failedCount++
|
||||
g.Log().Error(ctx, "批量执行失败", g.Map{
|
||||
"interfaceId": interfaceId,
|
||||
"error": err.Error(),
|
||||
})
|
||||
} else {
|
||||
successCount++
|
||||
requestIds = append(requestIds, executeRes.RequestId)
|
||||
}
|
||||
}
|
||||
|
||||
return &dto.BatchExecuteDataFetchRes{
|
||||
SuccessCount: successCount,
|
||||
FailedCount: failedCount,
|
||||
RequestIds: requestIds,
|
||||
}, nil
|
||||
}
|
||||
|
||||
// List 获取数据获取日志列表
|
||||
func (s *dataFetchService) List(ctx context.Context, req *dto.ListDataFetchLogReq) (res *dto.ListDataFetchLogRes, err error) {
|
||||
logList, total, err := dao.DataFetchLog.List(ctx, req)
|
||||
if err != nil {
|
||||
return
|
||||
}
|
||||
|
||||
// 获取平台和接口ID列表
|
||||
platformIds := make([]int64, 0)
|
||||
interfaceIds := make([]int64, 0)
|
||||
for _, item := range logList {
|
||||
if item.PlatformId > 0 {
|
||||
platformIds = append(platformIds, item.PlatformId)
|
||||
}
|
||||
if item.InterfaceId > 0 {
|
||||
interfaceIds = append(interfaceIds, item.InterfaceId)
|
||||
}
|
||||
}
|
||||
|
||||
// 批量获取平台和接口信息
|
||||
platformMap := make(map[int64]string)
|
||||
interfaceMap := make(map[int64]string)
|
||||
if len(platformIds) > 0 {
|
||||
platforms, _, err := dao.Platform.List(ctx, &dto.ListPlatformReq{})
|
||||
if err == nil {
|
||||
for _, p := range platforms {
|
||||
platformMap[p.Id] = p.Name
|
||||
}
|
||||
}
|
||||
}
|
||||
if len(interfaceIds) > 0 {
|
||||
interfaces, _, err := dao.ApiInterface.List(ctx, &dto.ListApiInterfaceReq{})
|
||||
if err == nil {
|
||||
for _, i := range interfaces {
|
||||
interfaceMap[i.Id] = i.Name
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// 组装响应数据
|
||||
list := make([]dto.DataFetchLogItem, 0, len(logList))
|
||||
for _, item := range logList {
|
||||
platformName := ""
|
||||
if name, ok := platformMap[item.PlatformId]; ok {
|
||||
platformName = name
|
||||
}
|
||||
interfaceName := ""
|
||||
if name, ok := interfaceMap[item.InterfaceId]; ok {
|
||||
interfaceName = name
|
||||
}
|
||||
|
||||
list = append(list, dto.DataFetchLogItem{
|
||||
Id: item.Id,
|
||||
PlatformId: item.PlatformId,
|
||||
PlatformName: platformName,
|
||||
InterfaceId: item.InterfaceId,
|
||||
InterfaceName: interfaceName,
|
||||
RequestId: item.RequestId,
|
||||
Status: item.Status,
|
||||
StatusName: s.getStatusName(item.Status),
|
||||
StartTime: item.StartTime,
|
||||
EndTime: item.EndTime,
|
||||
Duration: item.Duration,
|
||||
ErrorMessage: item.ErrorMessage,
|
||||
RetryCount: item.RetryCount,
|
||||
CreatedAt: item.CreatedAt.Unix(),
|
||||
})
|
||||
}
|
||||
|
||||
res = &dto.ListDataFetchLogRes{
|
||||
List: list,
|
||||
Total: total,
|
||||
}
|
||||
return
|
||||
}
|
||||
|
||||
// GetOne 获取单个数据获取日志
|
||||
func (s *dataFetchService) GetOne(ctx context.Context, req *dto.GetDataFetchLogReq) (res *dto.GetDataFetchLogRes, err error) {
|
||||
fetchLog, err := dao.DataFetchLog.GetOne(ctx, req)
|
||||
if err != nil {
|
||||
return
|
||||
}
|
||||
|
||||
// 获取平台和接口名称
|
||||
var platformName, interfaceName string
|
||||
if fetchLog.PlatformId > 0 {
|
||||
platform, _ := dao.Platform.GetOne(ctx, &dto.GetPlatformReq{Id: fetchLog.PlatformId})
|
||||
if platform != nil {
|
||||
platformName = platform.Name
|
||||
}
|
||||
}
|
||||
if fetchLog.InterfaceId > 0 {
|
||||
apiInterface, _ := dao.ApiInterface.GetOne(ctx, &dto.GetApiInterfaceReq{Id: fetchLog.InterfaceId})
|
||||
if apiInterface != nil {
|
||||
interfaceName = apiInterface.Name
|
||||
}
|
||||
}
|
||||
|
||||
return &dto.GetDataFetchLogRes{
|
||||
DataFetchLog: fetchLog,
|
||||
PlatformName: platformName,
|
||||
InterfaceName: interfaceName,
|
||||
}, nil
|
||||
}
|
||||
|
||||
// ReExecute 重新执行数据获取
|
||||
func (s *dataFetchService) ReExecute(ctx context.Context, req *dto.ReExecuteDataFetchReq) (res *dto.ReExecuteDataFetchRes, err error) {
|
||||
// 获取原日志
|
||||
oldLog, err := dao.DataFetchLog.GetOne(ctx, &dto.GetDataFetchLogReq{Id: req.LogId})
|
||||
if err != nil || oldLog == nil {
|
||||
return nil, fmt.Errorf("日志不存在")
|
||||
}
|
||||
|
||||
// 检查接口是否存在
|
||||
apiInterface, err := dao.ApiInterface.GetOne(ctx, &dto.GetApiInterfaceReq{Id: oldLog.InterfaceId})
|
||||
if err != nil || apiInterface == nil {
|
||||
return nil, fmt.Errorf("接口不存在")
|
||||
}
|
||||
|
||||
// 检查平台是否存在
|
||||
platform, err := dao.Platform.GetOne(ctx, &dto.GetPlatformReq{Id: apiInterface.PlatformId})
|
||||
if err != nil || platform == nil {
|
||||
return nil, fmt.Errorf("平台不存在")
|
||||
}
|
||||
|
||||
// 生成新的请求ID
|
||||
requestId := guid.S()
|
||||
startTime := time.Now().UnixMilli()
|
||||
|
||||
// 创建新的数据获取日志
|
||||
fetchLog := &entity.DataFetchLog{
|
||||
PlatformId: oldLog.PlatformId,
|
||||
InterfaceId: oldLog.InterfaceId,
|
||||
RequestId: requestId,
|
||||
Status: consts.FetchStatusPending,
|
||||
StartTime: startTime,
|
||||
RequestConfig: oldLog.RequestConfig,
|
||||
RetryCount: oldLog.RetryCount + 1,
|
||||
}
|
||||
|
||||
logId, err := dao.DataFetchLog.Insert(ctx, fetchLog)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("创建日志失败: %v", err)
|
||||
}
|
||||
|
||||
// 使用协程池异步执行数据获取
|
||||
asyncCtx := context.WithoutCancel(ctx)
|
||||
FetchPool.Add(asyncCtx, func(ctx context.Context) {
|
||||
s.executeFetch(ctx, logId, requestId, platform, apiInterface, oldLog.RequestConfig, startTime)
|
||||
})
|
||||
|
||||
return &dto.ReExecuteDataFetchRes{
|
||||
RequestId: requestId,
|
||||
Status: string(consts.FetchStatusPending),
|
||||
Message: "任务已重新提交",
|
||||
}, nil
|
||||
}
|
||||
|
||||
// executeFetch 执行实际的数据获取(在协程池中运行)
|
||||
func (s *dataFetchService) executeFetch(ctx context.Context, logId int64, requestId string, platform *entity.Platform, apiInterface *entity.ApiInterface, requestParams map[string]interface{}, startTime int64) {
|
||||
// 更新状态为执行中
|
||||
dao.DataFetchLog.UpdateStatus(ctx, logId, string(consts.FetchStatusRunning), 0, 0, "", "")
|
||||
|
||||
// TODO: 根据平台和接口配置执行HTTP请求
|
||||
// 这里需要根据平台的限流配置进行限流控制
|
||||
// 根据接口的请求配置组装请求参数
|
||||
// 调用接口获取数据
|
||||
|
||||
// 模拟接口调用
|
||||
time.Sleep(time.Second * 2)
|
||||
|
||||
// 模拟成功响应
|
||||
responseData := `{"code": 0, "message": "success", "data": {"items": []}}`
|
||||
endTime := time.Now().UnixMilli()
|
||||
duration := int(endTime - startTime)
|
||||
|
||||
// 更新状态为成功
|
||||
dao.DataFetchLog.UpdateStatus(ctx, logId, string(consts.FetchStatusSuccess), endTime, duration, responseData, "")
|
||||
|
||||
g.Log().Info(ctx, "数据获取成功", g.Map{
|
||||
"logId": logId,
|
||||
"requestId": requestId,
|
||||
"platform": platform.Name,
|
||||
"interface": apiInterface.Name,
|
||||
"duration": duration,
|
||||
})
|
||||
}
|
||||
|
||||
// getStatusName 获取状态名称
|
||||
func (s *dataFetchService) getStatusName(status consts.FetchStatus) string {
|
||||
statusNames := map[consts.FetchStatus]string{
|
||||
consts.FetchStatusPending: "待执行",
|
||||
consts.FetchStatusRunning: "执行中",
|
||||
consts.FetchStatusSuccess: "成功",
|
||||
consts.FetchStatusFailed: "失败",
|
||||
consts.FetchStatusRateLimit: "触发限流",
|
||||
}
|
||||
if name, ok := statusNames[status]; ok {
|
||||
return name
|
||||
}
|
||||
return string(status)
|
||||
}
|
||||
@@ -1,189 +0,0 @@
|
||||
package data
|
||||
|
||||
import (
|
||||
consts "cid/consts/data"
|
||||
dao "cid/dao/data"
|
||||
dto "cid/model/dto/data"
|
||||
entity "cid/model/entity/data"
|
||||
"context"
|
||||
"errors"
|
||||
|
||||
"github.com/gogf/gf/v2/util/gconv"
|
||||
)
|
||||
|
||||
type platformService struct{}
|
||||
|
||||
// Platform 平台服务
|
||||
var Platform = new(platformService)
|
||||
|
||||
// Create 创建平台
|
||||
func (s *platformService) Create(ctx context.Context, req *dto.CreatePlatformReq) (res *dto.CreatePlatformRes, err error) {
|
||||
// 检查平台名称是否重复
|
||||
count, err := dao.Platform.Count(ctx, &dto.ListPlatformReq{Name: req.Name})
|
||||
if err != nil {
|
||||
return
|
||||
}
|
||||
if count > 0 {
|
||||
return nil, errors.New("平台名称已存在")
|
||||
}
|
||||
|
||||
// 检查平台类型是否重复
|
||||
count, err = dao.Platform.Count(ctx, &dto.ListPlatformReq{Type: req.Type})
|
||||
if err != nil {
|
||||
return
|
||||
}
|
||||
if count > 0 {
|
||||
return nil, errors.New("该类型平台已存在")
|
||||
}
|
||||
|
||||
// 插入数据库
|
||||
id, err := dao.Platform.Insert(ctx, req)
|
||||
if err != nil {
|
||||
return
|
||||
}
|
||||
res = &dto.CreatePlatformRes{
|
||||
Id: id,
|
||||
}
|
||||
return
|
||||
}
|
||||
|
||||
// List 获取平台列表
|
||||
func (s *platformService) List(ctx context.Context, req *dto.ListPlatformReq) (res *dto.ListPlatformRes, err error) {
|
||||
platformList, total, err := dao.Platform.List(ctx, req)
|
||||
if err != nil {
|
||||
return
|
||||
}
|
||||
|
||||
// 组装响应数据
|
||||
list := make([]dto.PlatformItem, 0, len(platformList))
|
||||
for _, item := range platformList {
|
||||
list = append(list, dto.PlatformItem{
|
||||
Id: item.Id,
|
||||
Name: item.Name,
|
||||
Type: item.Type,
|
||||
TypeName: s.getTypeName(item.Type),
|
||||
Status: item.Status,
|
||||
StatusName: s.getStatusName(item.Status),
|
||||
Description: item.Description,
|
||||
CreatedAt: item.CreatedAt.Unix(),
|
||||
UpdatedAt: item.UpdatedAt.Unix(),
|
||||
})
|
||||
}
|
||||
|
||||
res = &dto.ListPlatformRes{
|
||||
List: list,
|
||||
Total: total,
|
||||
}
|
||||
return
|
||||
}
|
||||
|
||||
// GetOne 获取单个平台
|
||||
func (s *platformService) GetOne(ctx context.Context, req *dto.GetPlatformReq) (res *dto.GetPlatformRes, err error) {
|
||||
platform, err := dao.Platform.GetOne(ctx, req)
|
||||
if err != nil {
|
||||
return
|
||||
}
|
||||
|
||||
var platformEntity *entity.Platform
|
||||
if err = gconv.Struct(platform, &platformEntity); err != nil {
|
||||
return
|
||||
}
|
||||
|
||||
return &dto.GetPlatformRes{
|
||||
Platform: platformEntity,
|
||||
}, nil
|
||||
}
|
||||
|
||||
// Update 更新平台
|
||||
func (s *platformService) Update(ctx context.Context, req *dto.UpdatePlatformReq) (err error) {
|
||||
// 检查平台是否存在
|
||||
exist, err := dao.Platform.GetOne(ctx, &dto.GetPlatformReq{Id: req.Id})
|
||||
if err != nil || exist == nil {
|
||||
return errors.New("平台不存在")
|
||||
}
|
||||
|
||||
// 如果修改了名称,检查新名称是否重复
|
||||
if req.Name != "" && req.Name != exist.Name {
|
||||
count, err := dao.Platform.Count(ctx, &dto.ListPlatformReq{Name: req.Name})
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
if count > 0 {
|
||||
return errors.New("平台名称已存在")
|
||||
}
|
||||
}
|
||||
|
||||
// 如果修改了类型,检查新类型是否重复
|
||||
if req.Type != "" && req.Type != exist.Type {
|
||||
count, err := dao.Platform.Count(ctx, &dto.ListPlatformReq{Type: req.Type})
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
if count > 0 {
|
||||
return errors.New("该类型平台已存在")
|
||||
}
|
||||
}
|
||||
|
||||
_, err = dao.Platform.Update(ctx, req)
|
||||
return
|
||||
}
|
||||
|
||||
// UpdateStatus 更新平台状态
|
||||
func (s *platformService) UpdateStatus(ctx context.Context, req *dto.UpdatePlatformStatusReq) (err error) {
|
||||
_, err = dao.Platform.UpdateStatus(ctx, req.Id, req.Status.String())
|
||||
return
|
||||
}
|
||||
|
||||
// Delete 删除平台
|
||||
func (s *platformService) Delete(ctx context.Context, req *dto.DeletePlatformReq) (err error) {
|
||||
// 检查是否存在关联的接口
|
||||
interfaces, _, err := dao.ApiInterface.List(ctx, &dto.ListApiInterfaceReq{
|
||||
PlatformId: req.Id,
|
||||
})
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
if len(interfaces) > 0 {
|
||||
return errors.New("该平台下存在接口,无法删除")
|
||||
}
|
||||
|
||||
_, err = dao.Platform.Delete(ctx, req)
|
||||
return
|
||||
}
|
||||
|
||||
// GetByType 根据类型获取平台
|
||||
func (s *platformService) GetByType(ctx context.Context, platformType consts.SyncPlatform) (res *entity.Platform, err error) {
|
||||
return dao.Platform.GetByType(ctx, platformType.String())
|
||||
}
|
||||
|
||||
// getTypeName 获取类型名称
|
||||
func (s *platformService) getTypeName(platformType consts.SyncPlatform) string {
|
||||
typeNames := map[consts.SyncPlatform]string{
|
||||
consts.PlatformTaobao: "淘宝",
|
||||
consts.PlatformJD: "京东",
|
||||
consts.PlatformKuaishou: "快手",
|
||||
consts.PlatformDouyin: "抖音",
|
||||
consts.PlatformXhs: "小红书",
|
||||
consts.PlatformPdd: "拼多多",
|
||||
consts.PlatformXianyu: "闲鱼",
|
||||
consts.PlatformTmall: "天猫",
|
||||
consts.PlatformWechat: "微信",
|
||||
consts.PlatformCustom: "自定义",
|
||||
}
|
||||
if name, ok := typeNames[platformType]; ok {
|
||||
return name
|
||||
}
|
||||
return string(platformType)
|
||||
}
|
||||
|
||||
// getStatusName 获取状态名称
|
||||
func (s *platformService) getStatusName(status consts.PlatformStatus) string {
|
||||
statusNames := map[consts.PlatformStatus]string{
|
||||
consts.PlatformStatusActive: "启用",
|
||||
consts.PlatformStatusInactive: "停用",
|
||||
}
|
||||
if name, ok := statusNames[status]; ok {
|
||||
return name
|
||||
}
|
||||
return string(status)
|
||||
}
|
||||
@@ -1,306 +0,0 @@
|
||||
package mapping
|
||||
|
||||
import (
|
||||
consts "cid/consts/mapping"
|
||||
dao "cid/dao/mapping"
|
||||
dto "cid/model/dto/mapping"
|
||||
entity "cid/model/entity/mapping"
|
||||
"context"
|
||||
"errors"
|
||||
|
||||
"github.com/gogf/gf/v2/frame/g"
|
||||
"github.com/gogf/gf/v2/util/gconv"
|
||||
)
|
||||
|
||||
type dataMappingService struct{}
|
||||
|
||||
// DataMapping 数据映射服务
|
||||
var DataMapping = new(dataMappingService)
|
||||
|
||||
// Create 创建数据映射
|
||||
func (s *dataMappingService) Create(ctx context.Context, req *dto.CreateDataMappingReq) (res *dto.CreateDataMappingRes, err error) {
|
||||
// 检查接口是否存在
|
||||
// TODO: 这里需要调用data层的ApiInterface服务,暂时跳过
|
||||
|
||||
// 检查同一接口下目标字段是否重复
|
||||
existing, err := dao.DataMapping.GetByInterfaceIdAndTargetField(ctx, req.InterfaceId, req.TargetField)
|
||||
if err == nil && existing != nil {
|
||||
return nil, errors.New("该接口下目标字段已存在")
|
||||
}
|
||||
|
||||
// 插入数据库
|
||||
id, err := dao.DataMapping.Insert(ctx, req)
|
||||
if err != nil {
|
||||
return
|
||||
}
|
||||
res = &dto.CreateDataMappingRes{
|
||||
Id: id,
|
||||
}
|
||||
return
|
||||
}
|
||||
|
||||
// BatchCreate 批量创建数据映射
|
||||
func (s *dataMappingService) BatchCreate(ctx context.Context, req *dto.BatchCreateDataMappingReq) (res *dto.BatchCreateDataMappingRes, err error) {
|
||||
var ids []int64
|
||||
successCount := 0
|
||||
failedCount := 0
|
||||
|
||||
for _, mappingReq := range req.Mappings {
|
||||
// 设置平台和接口ID
|
||||
mappingReq.PlatformId = req.PlatformId
|
||||
mappingReq.InterfaceId = req.InterfaceId
|
||||
|
||||
createRes, err := s.Create(ctx, &mappingReq)
|
||||
if err != nil {
|
||||
failedCount++
|
||||
g.Log().Error(ctx, "批量创建映射失败", g.Map{
|
||||
"sourceField": mappingReq.SourceField,
|
||||
"targetField": mappingReq.TargetField,
|
||||
"error": err.Error(),
|
||||
})
|
||||
} else {
|
||||
successCount++
|
||||
ids = append(ids, createRes.Id)
|
||||
}
|
||||
}
|
||||
|
||||
return &dto.BatchCreateDataMappingRes{
|
||||
SuccessCount: successCount,
|
||||
FailedCount: failedCount,
|
||||
Ids: ids,
|
||||
}, nil
|
||||
}
|
||||
|
||||
// List 获取数据映射列表
|
||||
func (s *dataMappingService) List(ctx context.Context, req *dto.ListDataMappingReq) (res *dto.ListDataMappingRes, err error) {
|
||||
mappingList, total, err := dao.DataMapping.List(ctx, req)
|
||||
if err != nil {
|
||||
return
|
||||
}
|
||||
|
||||
// 获取平台和接口ID列表
|
||||
platformIds := make([]int64, 0)
|
||||
interfaceIds := make([]int64, 0)
|
||||
for _, item := range mappingList {
|
||||
if item.PlatformId > 0 {
|
||||
platformIds = append(platformIds, item.PlatformId)
|
||||
}
|
||||
if item.InterfaceId > 0 {
|
||||
interfaceIds = append(interfaceIds, item.InterfaceId)
|
||||
}
|
||||
}
|
||||
|
||||
// TODO: 批量获取平台和接口信息(需要调用data层服务)
|
||||
platformMap := make(map[int64]string)
|
||||
interfaceMap := make(map[int64]string)
|
||||
|
||||
// 组装响应数据
|
||||
list := make([]dto.DataMappingItem, 0, len(mappingList))
|
||||
for _, item := range mappingList {
|
||||
platformName := ""
|
||||
if name, ok := platformMap[item.PlatformId]; ok {
|
||||
platformName = name
|
||||
}
|
||||
interfaceName := ""
|
||||
if name, ok := interfaceMap[item.InterfaceId]; ok {
|
||||
interfaceName = name
|
||||
}
|
||||
|
||||
list = append(list, dto.DataMappingItem{
|
||||
Id: item.Id,
|
||||
PlatformId: item.PlatformId,
|
||||
PlatformName: platformName,
|
||||
InterfaceId: item.InterfaceId,
|
||||
InterfaceName: interfaceName,
|
||||
SourceField: item.SourceField,
|
||||
TargetField: item.TargetField,
|
||||
FieldType: item.FieldType,
|
||||
DefaultValue: item.DefaultValue,
|
||||
TransformRule: item.TransformRule,
|
||||
Priority: item.Priority,
|
||||
Status: item.Status,
|
||||
StatusName: s.getStatusName(item.Status),
|
||||
CreatedAt: item.CreatedAt.Unix(),
|
||||
UpdatedAt: item.UpdatedAt.Unix(),
|
||||
})
|
||||
}
|
||||
|
||||
res = &dto.ListDataMappingRes{
|
||||
List: list,
|
||||
Total: total,
|
||||
}
|
||||
return
|
||||
}
|
||||
|
||||
// GetOne 获取单个数据映射
|
||||
func (s *dataMappingService) GetOne(ctx context.Context, req *dto.GetDataMappingReq) (res *dto.GetDataMappingRes, err error) {
|
||||
mapping, err := dao.DataMapping.GetOne(ctx, req)
|
||||
if err != nil {
|
||||
return
|
||||
}
|
||||
|
||||
// TODO: 获取平台和接口名称
|
||||
platformName := ""
|
||||
interfaceName := ""
|
||||
|
||||
return &dto.GetDataMappingRes{
|
||||
DataMapping: mapping,
|
||||
PlatformName: platformName,
|
||||
InterfaceName: interfaceName,
|
||||
}, nil
|
||||
}
|
||||
|
||||
// Update 更新数据映射
|
||||
func (s *dataMappingService) Update(ctx context.Context, req *dto.UpdateDataMappingReq) (err error) {
|
||||
// 检查映射是否存在
|
||||
exist, err := dao.DataMapping.GetOne(ctx, &dto.GetDataMappingReq{Id: req.Id})
|
||||
if err != nil || exist == nil {
|
||||
return errors.New("映射不存在")
|
||||
}
|
||||
|
||||
// 如果修改了目标字段,检查是否重复
|
||||
if req.TargetField != "" && req.TargetField != exist.TargetField {
|
||||
interfaceId := req.InterfaceId
|
||||
if interfaceId == 0 {
|
||||
interfaceId = exist.InterfaceId
|
||||
}
|
||||
existing, err := dao.DataMapping.GetByInterfaceIdAndTargetField(ctx, interfaceId, req.TargetField)
|
||||
if err == nil && existing != nil && existing.Id != req.Id {
|
||||
return errors.New("该接口下目标字段已存在")
|
||||
}
|
||||
}
|
||||
|
||||
_, err = dao.DataMapping.Update(ctx, req)
|
||||
return
|
||||
}
|
||||
|
||||
// Delete 删除数据映射
|
||||
func (s *dataMappingService) Delete(ctx context.Context, req *dto.DeleteDataMappingReq) (err error) {
|
||||
_, err = dao.DataMapping.Delete(ctx, req)
|
||||
return
|
||||
}
|
||||
|
||||
// Execute 执行数据映射
|
||||
func (s *dataMappingService) Execute(ctx context.Context, req *dto.ExecuteDataMappingReq) (res *dto.ExecuteDataMappingRes, err error) {
|
||||
// 获取接口的所有映射规则
|
||||
mappings, err := dao.DataMapping.GetByInterfaceId(ctx, req.InterfaceId)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
if len(mappings) == 0 {
|
||||
return &dto.ExecuteDataMappingRes{
|
||||
TargetData: map[string]interface{}{},
|
||||
AppliedRules: []string{},
|
||||
}, nil
|
||||
}
|
||||
|
||||
// 初始化目标数据
|
||||
targetData := make(map[string]interface{})
|
||||
appliedRules := make([]string, 0)
|
||||
|
||||
// 遍历映射规则进行转换
|
||||
for _, mapping := range mappings {
|
||||
if mapping.Status != consts.MappingStatusActive {
|
||||
continue
|
||||
}
|
||||
|
||||
// 获取源数据值
|
||||
sourceValue, exists := req.SourceData[mapping.SourceField]
|
||||
|
||||
// 应用转换规则
|
||||
targetValue := s.applyTransformRule(ctx, sourceValue, exists, mapping)
|
||||
|
||||
// 设置目标数据
|
||||
targetData[mapping.TargetField] = targetValue
|
||||
|
||||
appliedRules = append(appliedRules, gconv.String(mapping.SourceField)+" -> "+gconv.String(mapping.TargetField))
|
||||
}
|
||||
|
||||
return &dto.ExecuteDataMappingRes{
|
||||
TargetData: targetData,
|
||||
AppliedRules: appliedRules,
|
||||
}, nil
|
||||
}
|
||||
|
||||
// applyTransformRule 应用转换规则
|
||||
func (s *dataMappingService) applyTransformRule(ctx context.Context, sourceValue interface{}, exists bool, mapping entity.DataMapping) interface{} {
|
||||
// 如果源字段不存在,使用默认值
|
||||
if !exists {
|
||||
if mapping.DefaultValue != "" {
|
||||
return mapping.DefaultValue
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
// 如果没有转换规则,直接返回源值
|
||||
if mapping.TransformRule == nil || len(mapping.TransformRule) == 0 {
|
||||
return sourceValue
|
||||
}
|
||||
|
||||
// 获取转换类型
|
||||
transformType := ""
|
||||
if t, ok := mapping.TransformRule["type"].(string); ok {
|
||||
transformType = t
|
||||
}
|
||||
|
||||
// 根据转换类型应用不同的转换逻辑
|
||||
switch consts.TransformType(transformType) {
|
||||
case consts.TransformTypeFixed:
|
||||
// 固定值
|
||||
if v, ok := mapping.TransformRule["rule"].(string); ok {
|
||||
return v
|
||||
}
|
||||
case consts.TransformTypeMapping:
|
||||
// 值映射
|
||||
if mappingMap, ok := mapping.TransformRule["mappingMap"].(map[string]interface{}); ok {
|
||||
if sourceKey := gconv.String(sourceValue); sourceKey != "" {
|
||||
if v, ok := mappingMap[sourceKey]; ok {
|
||||
return v
|
||||
}
|
||||
}
|
||||
}
|
||||
case consts.TransformTypeRegex:
|
||||
// 正则转换
|
||||
if regex, ok := mapping.TransformRule["regex"].(string); ok {
|
||||
// TODO: 实现正则替换逻辑
|
||||
g.Log().Warning(ctx, "正则转换暂未实现", g.Map{
|
||||
"regex": regex,
|
||||
"sourceValue": sourceValue,
|
||||
})
|
||||
}
|
||||
case consts.TransformTypeFunction:
|
||||
// 函数转换
|
||||
if functionName, ok := mapping.TransformRule["functionName"].(string); ok {
|
||||
// TODO: 实现函数调用逻辑
|
||||
g.Log().Warning(ctx, "函数转换暂未实现", g.Map{
|
||||
"function": functionName,
|
||||
"sourceValue": sourceValue,
|
||||
})
|
||||
}
|
||||
case consts.TransformTypeScript:
|
||||
// 脚本转换
|
||||
if script, ok := mapping.TransformRule["script"].(string); ok {
|
||||
// TODO: 实现脚本执行逻辑
|
||||
g.Log().Warning(ctx, "脚本转换暂未实现", g.Map{
|
||||
"script": script,
|
||||
"sourceValue": sourceValue,
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
// 默认返回源值
|
||||
return sourceValue
|
||||
}
|
||||
|
||||
// getStatusName 获取状态名称
|
||||
func (s *dataMappingService) getStatusName(status consts.MappingStatus) string {
|
||||
statusNames := map[consts.MappingStatus]string{
|
||||
consts.MappingStatusActive: "启用",
|
||||
consts.MappingStatusInactive: "停用",
|
||||
}
|
||||
if name, ok := statusNames[status]; ok {
|
||||
return name
|
||||
}
|
||||
return string(status)
|
||||
}
|
||||
Reference in New Issue
Block a user