Dockerfile

This commit is contained in:
2026-03-23 14:08:11 +08:00
parent c7a2f5bd0c
commit 827d55dbee
100 changed files with 3139 additions and 5992 deletions

View File

@@ -0,0 +1,191 @@
package data
import (
consts "cid/consts/data"
dao "cid/dao/data"
dto "cid/model/dto/data"
entity "cid/model/entity/data"
"context"
"errors"
)
type apiInterfaceService struct{}
// ApiInterface 接口服务
var ApiInterface = new(apiInterfaceService)
// Create 创建接口
func (s *apiInterfaceService) Create(ctx context.Context, req *dto.CreateApiInterfaceReq) (res *dto.CreateApiInterfaceRes, err error) {
// 检查平台是否存在
_, err = dao.Platform.GetOne(ctx, &dto.GetPlatformReq{Id: req.PlatformId})
if err != nil {
return nil, errors.New("平台不存在")
}
// 检查接口编码在同一平台下是否重复
interfaces, _, err := dao.ApiInterface.List(ctx, &dto.ListApiInterfaceReq{
PlatformId: req.PlatformId,
Code: req.Code,
})
if err != nil {
return
}
if len(interfaces) > 0 {
return nil, errors.New("接口编码在该平台下已存在")
}
// 插入数据库
id, err := dao.ApiInterface.Insert(ctx, req)
if err != nil {
return
}
res = &dto.CreateApiInterfaceRes{
Id: id,
}
return
}
// List 获取接口列表
func (s *apiInterfaceService) List(ctx context.Context, req *dto.ListApiInterfaceReq) (res *dto.ListApiInterfaceRes, err error) {
apiList, total, err := dao.ApiInterface.List(ctx, req)
if err != nil {
return
}
// 获取平台ID列表用于批量查询
platformIds := make([]int64, 0)
for _, item := range apiList {
if item.PlatformId > 0 {
platformIds = append(platformIds, item.PlatformId)
}
}
// 批量获取平台信息
platformMap := make(map[int64]string)
if len(platformIds) > 0 {
platforms, _, err := dao.Platform.List(ctx, &dto.ListPlatformReq{})
if err == nil {
for _, p := range platforms {
platformMap[p.Id] = p.Name
}
}
}
// 组装响应数据
list := make([]dto.ApiInterfaceItem, 0, len(apiList))
for _, item := range apiList {
platformName := ""
if name, ok := platformMap[item.PlatformId]; ok {
platformName = name
}
list = append(list, dto.ApiInterfaceItem{
Id: item.Id,
PlatformId: item.PlatformId,
PlatformName: platformName,
Name: item.Name,
Code: item.Code,
Url: item.Url,
Method: item.Method,
Status: item.Status,
StatusName: s.getStatusName(item.Status),
CreatedAt: item.CreatedAt.Unix(),
UpdatedAt: item.UpdatedAt.Unix(),
})
}
res = &dto.ListApiInterfaceRes{
List: list,
Total: total,
}
return
}
// GetOne 获取单个接口
func (s *apiInterfaceService) GetOne(ctx context.Context, req *dto.GetApiInterfaceReq) (res *dto.GetApiInterfaceRes, err error) {
apiInterface, err := dao.ApiInterface.GetOne(ctx, req)
if err != nil {
return
}
// 获取平台名称
var platformName string
if apiInterface.PlatformId > 0 {
platform, _ := dao.Platform.GetOne(ctx, &dto.GetPlatformReq{Id: apiInterface.PlatformId})
if platform != nil {
platformName = platform.Name
}
}
return &dto.GetApiInterfaceRes{
ApiInterface: apiInterface,
PlatformName: platformName,
}, nil
}
// Update 更新接口
func (s *apiInterfaceService) Update(ctx context.Context, req *dto.UpdateApiInterfaceReq) (err error) {
// 检查接口是否存在
exist, err := dao.ApiInterface.GetOne(ctx, &dto.GetApiInterfaceReq{Id: req.Id})
if err != nil || exist == nil {
return errors.New("接口不存在")
}
// 如果修改了平台,检查平台是否存在
if req.PlatformId > 0 && req.PlatformId != exist.PlatformId {
_, err := dao.Platform.GetOne(ctx, &dto.GetPlatformReq{Id: req.PlatformId})
if err != nil {
return errors.New("平台不存在")
}
}
// 如果修改了编码,检查编码是否重复
if req.Code != "" && req.Code != exist.Code {
platformId := req.PlatformId
if platformId == 0 {
platformId = exist.PlatformId
}
interfaces, _, err := dao.ApiInterface.List(ctx, &dto.ListApiInterfaceReq{
PlatformId: platformId,
Code: req.Code,
})
if err != nil {
return err
}
if len(interfaces) > 0 {
return errors.New("接口编码在该平台下已存在")
}
}
_, err = dao.ApiInterface.Update(ctx, req)
return
}
// UpdateStatus 更新接口状态
func (s *apiInterfaceService) UpdateStatus(ctx context.Context, req *dto.UpdateApiInterfaceStatusReq) (err error) {
_, err = dao.ApiInterface.UpdateStatus(ctx, req.Id, req.Status.String())
return
}
// Delete 删除接口
func (s *apiInterfaceService) Delete(ctx context.Context, req *dto.DeleteApiInterfaceReq) (err error) {
_, err = dao.ApiInterface.Delete(ctx, req)
return
}
// GetByIds 根据ID列表获取接口
func (s *apiInterfaceService) GetByIds(ctx context.Context, ids []int64) (res []entity.ApiInterface, err error) {
return dao.ApiInterface.GetByIds(ctx, ids)
}
// getStatusName 获取状态名称
func (s *apiInterfaceService) getStatusName(status consts.PlatformStatus) string {
statusNames := map[consts.PlatformStatus]string{
consts.PlatformStatusActive: "启用",
consts.PlatformStatusInactive: "停用",
}
if name, ok := statusNames[status]; ok {
return name
}
return string(status)
}

View File

@@ -0,0 +1,316 @@
package data
import (
consts "cid/consts/data"
dao "cid/dao/data"
dto "cid/model/dto/data"
entity "cid/model/entity/data"
"context"
"fmt"
"time"
"github.com/gogf/gf/v2/frame/g"
"github.com/gogf/gf/v2/os/grpool"
"github.com/gogf/gf/v2/util/guid"
)
type dataFetchService struct{}
// DataFetch 数据获取服务
var DataFetch = new(dataFetchService)
// FetchPool 数据获取协程池限制并发数避免goroutine爆炸
var FetchPool = grpool.New(10)
// Execute 执行数据获取
func (s *dataFetchService) Execute(ctx context.Context, req *dto.ExecuteDataFetchReq) (res *dto.ExecuteDataFetchRes, err error) {
// 检查接口是否存在
apiInterface, err := dao.ApiInterface.GetOne(ctx, &dto.GetApiInterfaceReq{Id: req.InterfaceId})
if err != nil || apiInterface == nil {
return nil, fmt.Errorf("接口不存在")
}
// 检查接口状态
if apiInterface.Status != consts.PlatformStatusActive {
return nil, fmt.Errorf("接口未启用")
}
// 检查平台是否存在
platform, err := dao.Platform.GetOne(ctx, &dto.GetPlatformReq{Id: apiInterface.PlatformId})
if err != nil || platform == nil {
return nil, fmt.Errorf("平台不存在")
}
// 检查平台状态
if platform.Status != consts.PlatformStatusActive {
return nil, fmt.Errorf("平台未启用")
}
// 生成请求ID
requestId := guid.S()
startTime := time.Now().UnixMilli()
// 创建数据获取日志
fetchLog := &entity.DataFetchLog{
PlatformId: req.PlatformId,
InterfaceId: req.InterfaceId,
RequestId: requestId,
Status: consts.FetchStatusPending,
StartTime: startTime,
RequestConfig: req.RequestParams,
RetryCount: 0,
}
logId, err := dao.DataFetchLog.Insert(ctx, fetchLog)
if err != nil {
return nil, fmt.Errorf("创建日志失败: %v", err)
}
// 使用协程池异步执行数据获取
asyncCtx := context.WithoutCancel(ctx)
FetchPool.Add(asyncCtx, func(ctx context.Context) {
s.executeFetch(ctx, logId, requestId, platform, apiInterface, req.RequestParams, startTime)
})
return &dto.ExecuteDataFetchRes{
RequestId: requestId,
Status: string(consts.FetchStatusPending),
Message: "任务已提交",
}, nil
}
// BatchExecute 批量执行数据获取
func (s *dataFetchService) BatchExecute(ctx context.Context, req *dto.BatchExecuteDataFetchReq) (res *dto.BatchExecuteDataFetchRes, err error) {
var requestIds []string
successCount := 0
failedCount := 0
for _, interfaceId := range req.InterfaceIds {
executeReq := &dto.ExecuteDataFetchReq{
PlatformId: 0, // 接口服务会自动获取
InterfaceId: interfaceId,
RequestParams: req.RequestParams,
}
executeRes, err := s.Execute(ctx, executeReq)
if err != nil {
failedCount++
g.Log().Error(ctx, "批量执行失败", g.Map{
"interfaceId": interfaceId,
"error": err.Error(),
})
} else {
successCount++
requestIds = append(requestIds, executeRes.RequestId)
}
}
return &dto.BatchExecuteDataFetchRes{
SuccessCount: successCount,
FailedCount: failedCount,
RequestIds: requestIds,
}, nil
}
// List 获取数据获取日志列表
func (s *dataFetchService) List(ctx context.Context, req *dto.ListDataFetchLogReq) (res *dto.ListDataFetchLogRes, err error) {
logList, total, err := dao.DataFetchLog.List(ctx, req)
if err != nil {
return
}
// 获取平台和接口ID列表
platformIds := make([]int64, 0)
interfaceIds := make([]int64, 0)
for _, item := range logList {
if item.PlatformId > 0 {
platformIds = append(platformIds, item.PlatformId)
}
if item.InterfaceId > 0 {
interfaceIds = append(interfaceIds, item.InterfaceId)
}
}
// 批量获取平台和接口信息
platformMap := make(map[int64]string)
interfaceMap := make(map[int64]string)
if len(platformIds) > 0 {
platforms, _, err := dao.Platform.List(ctx, &dto.ListPlatformReq{})
if err == nil {
for _, p := range platforms {
platformMap[p.Id] = p.Name
}
}
}
if len(interfaceIds) > 0 {
interfaces, _, err := dao.ApiInterface.List(ctx, &dto.ListApiInterfaceReq{})
if err == nil {
for _, i := range interfaces {
interfaceMap[i.Id] = i.Name
}
}
}
// 组装响应数据
list := make([]dto.DataFetchLogItem, 0, len(logList))
for _, item := range logList {
platformName := ""
if name, ok := platformMap[item.PlatformId]; ok {
platformName = name
}
interfaceName := ""
if name, ok := interfaceMap[item.InterfaceId]; ok {
interfaceName = name
}
list = append(list, dto.DataFetchLogItem{
Id: item.Id,
PlatformId: item.PlatformId,
PlatformName: platformName,
InterfaceId: item.InterfaceId,
InterfaceName: interfaceName,
RequestId: item.RequestId,
Status: item.Status,
StatusName: s.getStatusName(item.Status),
StartTime: item.StartTime,
EndTime: item.EndTime,
Duration: item.Duration,
ErrorMessage: item.ErrorMessage,
RetryCount: item.RetryCount,
CreatedAt: item.CreatedAt.Unix(),
})
}
res = &dto.ListDataFetchLogRes{
List: list,
Total: total,
}
return
}
// GetOne 获取单个数据获取日志
func (s *dataFetchService) GetOne(ctx context.Context, req *dto.GetDataFetchLogReq) (res *dto.GetDataFetchLogRes, err error) {
fetchLog, err := dao.DataFetchLog.GetOne(ctx, req)
if err != nil {
return
}
// 获取平台和接口名称
var platformName, interfaceName string
if fetchLog.PlatformId > 0 {
platform, _ := dao.Platform.GetOne(ctx, &dto.GetPlatformReq{Id: fetchLog.PlatformId})
if platform != nil {
platformName = platform.Name
}
}
if fetchLog.InterfaceId > 0 {
apiInterface, _ := dao.ApiInterface.GetOne(ctx, &dto.GetApiInterfaceReq{Id: fetchLog.InterfaceId})
if apiInterface != nil {
interfaceName = apiInterface.Name
}
}
return &dto.GetDataFetchLogRes{
DataFetchLog: fetchLog,
PlatformName: platformName,
InterfaceName: interfaceName,
}, nil
}
// ReExecute 重新执行数据获取
func (s *dataFetchService) ReExecute(ctx context.Context, req *dto.ReExecuteDataFetchReq) (res *dto.ReExecuteDataFetchRes, err error) {
// 获取原日志
oldLog, err := dao.DataFetchLog.GetOne(ctx, &dto.GetDataFetchLogReq{Id: req.LogId})
if err != nil || oldLog == nil {
return nil, fmt.Errorf("日志不存在")
}
// 检查接口是否存在
apiInterface, err := dao.ApiInterface.GetOne(ctx, &dto.GetApiInterfaceReq{Id: oldLog.InterfaceId})
if err != nil || apiInterface == nil {
return nil, fmt.Errorf("接口不存在")
}
// 检查平台是否存在
platform, err := dao.Platform.GetOne(ctx, &dto.GetPlatformReq{Id: apiInterface.PlatformId})
if err != nil || platform == nil {
return nil, fmt.Errorf("平台不存在")
}
// 生成新的请求ID
requestId := guid.S()
startTime := time.Now().UnixMilli()
// 创建新的数据获取日志
fetchLog := &entity.DataFetchLog{
PlatformId: oldLog.PlatformId,
InterfaceId: oldLog.InterfaceId,
RequestId: requestId,
Status: consts.FetchStatusPending,
StartTime: startTime,
RequestConfig: oldLog.RequestConfig,
RetryCount: oldLog.RetryCount + 1,
}
logId, err := dao.DataFetchLog.Insert(ctx, fetchLog)
if err != nil {
return nil, fmt.Errorf("创建日志失败: %v", err)
}
// 使用协程池异步执行数据获取
asyncCtx := context.WithoutCancel(ctx)
FetchPool.Add(asyncCtx, func(ctx context.Context) {
s.executeFetch(ctx, logId, requestId, platform, apiInterface, oldLog.RequestConfig, startTime)
})
return &dto.ReExecuteDataFetchRes{
RequestId: requestId,
Status: string(consts.FetchStatusPending),
Message: "任务已重新提交",
}, nil
}
// executeFetch 执行实际的数据获取(在协程池中运行)
func (s *dataFetchService) executeFetch(ctx context.Context, logId int64, requestId string, platform *entity.Platform, apiInterface *entity.ApiInterface, requestParams map[string]interface{}, startTime int64) {
// 更新状态为执行中
dao.DataFetchLog.UpdateStatus(ctx, logId, string(consts.FetchStatusRunning), 0, 0, "", "")
// TODO: 根据平台和接口配置执行HTTP请求
// 这里需要根据平台的限流配置进行限流控制
// 根据接口的请求配置组装请求参数
// 调用接口获取数据
// 模拟接口调用
time.Sleep(time.Second * 2)
// 模拟成功响应
responseData := `{"code": 0, "message": "success", "data": {"items": []}}`
endTime := time.Now().UnixMilli()
duration := int(endTime - startTime)
// 更新状态为成功
dao.DataFetchLog.UpdateStatus(ctx, logId, string(consts.FetchStatusSuccess), endTime, duration, responseData, "")
g.Log().Info(ctx, "数据获取成功", g.Map{
"logId": logId,
"requestId": requestId,
"platform": platform.Name,
"interface": apiInterface.Name,
"duration": duration,
})
}
// getStatusName 获取状态名称
func (s *dataFetchService) getStatusName(status consts.FetchStatus) string {
statusNames := map[consts.FetchStatus]string{
consts.FetchStatusPending: "待执行",
consts.FetchStatusRunning: "执行中",
consts.FetchStatusSuccess: "成功",
consts.FetchStatusFailed: "失败",
consts.FetchStatusRateLimit: "触发限流",
}
if name, ok := statusNames[status]; ok {
return name
}
return string(status)
}

View File

@@ -0,0 +1,189 @@
package data
import (
consts "cid/consts/data"
dao "cid/dao/data"
dto "cid/model/dto/data"
entity "cid/model/entity/data"
"context"
"errors"
"github.com/gogf/gf/v2/util/gconv"
)
type platformService struct{}
// Platform 平台服务
var Platform = new(platformService)
// Create 创建平台
func (s *platformService) Create(ctx context.Context, req *dto.CreatePlatformReq) (res *dto.CreatePlatformRes, err error) {
// 检查平台名称是否重复
count, err := dao.Platform.Count(ctx, &dto.ListPlatformReq{Name: req.Name})
if err != nil {
return
}
if count > 0 {
return nil, errors.New("平台名称已存在")
}
// 检查平台类型是否重复
count, err = dao.Platform.Count(ctx, &dto.ListPlatformReq{Type: req.Type})
if err != nil {
return
}
if count > 0 {
return nil, errors.New("该类型平台已存在")
}
// 插入数据库
id, err := dao.Platform.Insert(ctx, req)
if err != nil {
return
}
res = &dto.CreatePlatformRes{
Id: id,
}
return
}
// List 获取平台列表
func (s *platformService) List(ctx context.Context, req *dto.ListPlatformReq) (res *dto.ListPlatformRes, err error) {
platformList, total, err := dao.Platform.List(ctx, req)
if err != nil {
return
}
// 组装响应数据
list := make([]dto.PlatformItem, 0, len(platformList))
for _, item := range platformList {
list = append(list, dto.PlatformItem{
Id: item.Id,
Name: item.Name,
Type: item.Type,
TypeName: s.getTypeName(item.Type),
Status: item.Status,
StatusName: s.getStatusName(item.Status),
Description: item.Description,
CreatedAt: item.CreatedAt.Unix(),
UpdatedAt: item.UpdatedAt.Unix(),
})
}
res = &dto.ListPlatformRes{
List: list,
Total: total,
}
return
}
// GetOne 获取单个平台
func (s *platformService) GetOne(ctx context.Context, req *dto.GetPlatformReq) (res *dto.GetPlatformRes, err error) {
platform, err := dao.Platform.GetOne(ctx, req)
if err != nil {
return
}
var platformEntity *entity.Platform
if err = gconv.Struct(platform, &platformEntity); err != nil {
return
}
return &dto.GetPlatformRes{
Platform: platformEntity,
}, nil
}
// Update 更新平台
func (s *platformService) Update(ctx context.Context, req *dto.UpdatePlatformReq) (err error) {
// 检查平台是否存在
exist, err := dao.Platform.GetOne(ctx, &dto.GetPlatformReq{Id: req.Id})
if err != nil || exist == nil {
return errors.New("平台不存在")
}
// 如果修改了名称,检查新名称是否重复
if req.Name != "" && req.Name != exist.Name {
count, err := dao.Platform.Count(ctx, &dto.ListPlatformReq{Name: req.Name})
if err != nil {
return err
}
if count > 0 {
return errors.New("平台名称已存在")
}
}
// 如果修改了类型,检查新类型是否重复
if req.Type != "" && req.Type != exist.Type {
count, err := dao.Platform.Count(ctx, &dto.ListPlatformReq{Type: req.Type})
if err != nil {
return err
}
if count > 0 {
return errors.New("该类型平台已存在")
}
}
_, err = dao.Platform.Update(ctx, req)
return
}
// UpdateStatus 更新平台状态
func (s *platformService) UpdateStatus(ctx context.Context, req *dto.UpdatePlatformStatusReq) (err error) {
_, err = dao.Platform.UpdateStatus(ctx, req.Id, req.Status.String())
return
}
// Delete 删除平台
func (s *platformService) Delete(ctx context.Context, req *dto.DeletePlatformReq) (err error) {
// 检查是否存在关联的接口
interfaces, _, err := dao.ApiInterface.List(ctx, &dto.ListApiInterfaceReq{
PlatformId: req.Id,
})
if err != nil {
return err
}
if len(interfaces) > 0 {
return errors.New("该平台下存在接口,无法删除")
}
_, err = dao.Platform.Delete(ctx, req)
return
}
// GetByType 根据类型获取平台
func (s *platformService) GetByType(ctx context.Context, platformType consts.SyncPlatform) (res *entity.Platform, err error) {
return dao.Platform.GetByType(ctx, platformType.String())
}
// getTypeName 获取类型名称
func (s *platformService) getTypeName(platformType consts.SyncPlatform) string {
typeNames := map[consts.SyncPlatform]string{
consts.PlatformTaobao: "淘宝",
consts.PlatformJD: "京东",
consts.PlatformKuaishou: "快手",
consts.PlatformDouyin: "抖音",
consts.PlatformXhs: "小红书",
consts.PlatformPdd: "拼多多",
consts.PlatformXianyu: "闲鱼",
consts.PlatformTmall: "天猫",
consts.PlatformWechat: "微信",
consts.PlatformCustom: "自定义",
}
if name, ok := typeNames[platformType]; ok {
return name
}
return string(platformType)
}
// getStatusName 获取状态名称
func (s *platformService) getStatusName(status consts.PlatformStatus) string {
statusNames := map[consts.PlatformStatus]string{
consts.PlatformStatusActive: "启用",
consts.PlatformStatusInactive: "停用",
}
if name, ok := statusNames[status]; ok {
return name
}
return string(status)
}