package main import ( "context" "model-gateway/model/dto" "model-gateway/service/job" "model-gateway/service/task" "os" "os/signal" "syscall" "time" "model-gateway/controller" "gitea.com/red-future/common/http" "gitea.com/red-future/common/jaeger" _ "gitea.com/red-future/common/swagger" _ "github.com/gogf/gf/contrib/drivers/pgsql/v2" _ "github.com/gogf/gf/contrib/nosql/redis/v2" "github.com/gogf/gf/v2/frame/g" ) func main() { ctx, cancel := context.WithCancel(context.Background()) defer cancel() defer jaeger.ShutDown(ctx) // 注册路由 http.RouteRegister([]interface{}{ controller.Model, controller.Task, controller.Stat, }) // 本地调试:可选自动触发 worker/cleaner(由配置文件控制) startAutoRunner(ctx) // 监听退出信号,确保 Ctrl+C 能完整退出(停止 worker/cleaner 并关闭 gateway server) quit := make(chan os.Signal, 1) signal.Notify(quit, os.Interrupt, syscall.SIGTERM) <-quit g.Log().Infof(ctx, "[main] 收到退出信号,开始优雅退出...") cancel() // 关闭 gateway server(RouteRegister 内部是 go Httpserver.Run() 启动的) _ = http.Httpserver.Shutdown() } func startAutoRunner(ctx context.Context) { // worker if g.Cfg().MustGet(ctx, "asynch.worker.enabled").Bool() { interval := g.Cfg().MustGet(ctx, "asynch.worker.intervalSeconds").Int() if interval <= 0 { interval = 5 } batchSize := g.Cfg().MustGet(ctx, "asynch.worker.batchSize").Int() goroutines := g.Cfg().MustGet(ctx, "asynch.worker.goroutines").Int() ticker := time.NewTicker(time.Duration(interval) * time.Second) go func() { defer ticker.Stop() for { select { case <-ctx.Done(): return case <-ticker.C: if _, err := task.AsyncWorker.RunOnce(ctx, &dto.RunWorkReq{ BatchSize: batchSize, Goroutines: goroutines, }); err != nil { g.Log().Warningf(ctx, "[auto-worker] run once failed: %v", err) } } } }() } // cleaner if g.Cfg().MustGet(ctx, "asynch.cleaner.enabled").Bool() { interval := g.Cfg().MustGet(ctx, "asynch.cleaner.intervalSeconds").Int() if interval <= 0 { interval = 30 } ticker := time.NewTicker(time.Duration(interval) * time.Second) go func() { defer ticker.Stop() for { select { case <-ctx.Done(): return case <-ticker.C: _, _ = job.Cleaner.RunOnce(ctx) } } }() } // queryPending if g.Cfg().MustGet(ctx, "asynch.queryPending.enabled").Bool() { interval := g.Cfg().MustGet(ctx, "asynch.queryPending.intervalSeconds", 10).Int() limit := g.Cfg().MustGet(ctx, "asynch.queryPending.limit", 10).Int() ticker := time.NewTicker(time.Duration(interval) * time.Second) go func() { defer ticker.Stop() for { select { case <-ctx.Done(): return case <-ticker.C: if _, err := task.Task.QueryPendingTasks(ctx, &dto.QueryPendingTasksReq{Limit: limit}); err != nil { g.Log().Warningf(ctx, "[auto-queryPending] run once failed: %v", err) } } } }() } }