V2.7 增加agent执行器

This commit is contained in:
georgehao
2019-07-03 22:31:27 +08:00
parent c3a89e9243
commit 37fb659c4e
48 changed files with 2832 additions and 513 deletions

39
agent/server/config.go Normal file
View File

@@ -0,0 +1,39 @@
/************************************************************
** @Description: server
** @Author: george hao
** @Date: 2018-11-29 11:13
** @Last Modified by: george hao
** @Last Modified time: 2018-11-29 11:13
*************************************************************/
package server
import (
"github.com/george518/PPGo_Job/agent/common"
"github.com/go-ini/ini"
)
var C = new(common.Conf)
var ConfPath string
func InitConfig(path string) error {
Cfg, err := ini.Load(path)
if err != nil {
return err
}
ConfPath = path
err = Cfg.MapTo(C)
return err
}
func SaveConfig(key string, value string) error {
Cfg, err := ini.Load(ConfPath)
if err != nil {
return err
}
Cfg.Section("").Key(key).SetValue(value)
Cfg.SaveTo(ConfPath)
InitConfig(ConfPath)
return nil
}

139
agent/server/job.go Normal file
View File

@@ -0,0 +1,139 @@
/************************************************************
** @Description: job
** @Author: george hao
** @Date: 2019-06-24 15:14
** @Last Modified by: george hao
** @Last Modified time: 2019-06-24 15:14
*************************************************************/
package server
import (
"bytes"
"fmt"
"github.com/astaxie/beego/logs"
. "github.com/george518/PPGo_Job/jobs"
"github.com/george518/PPGo_Job/libs"
"github.com/george518/PPGo_Job/models"
"os/exec"
"runtime"
"runtime/debug"
"sync"
"time"
)
//执行句柄map
var CmdMap sync.Map
func SetCmdMap(key string, cmd *exec.Cmd) {
if _, ok := CmdMap.Load(key); ok {
Counter.Store(key, cmd)
}
}
func GetCmdMap(key string) *exec.Cmd {
if v, ok := CmdMap.Load(key); ok {
return v.(*exec.Cmd)
}
return nil
}
func RestJobFromTask(task *models.Task, serverId int) (*Job, error) {
if task.Id < 1 {
return nil, fmt.Errorf("ToJob: 缺少id")
}
if task.ServerIds == "" {
return nil, fmt.Errorf("任务执行失败,找不到执行的服务器")
}
job := ResetCommandJob(task.Id, serverId, task.TaskName, task.Command)
job.Task = task
job.Concurrent = task.Concurrent == 1
job.ServerId = serverId
job.ServerName = "执行器"
return job, nil
}
func ResetCommandJob(id int, serverId int, name string, command string) *Job {
job := &Job{
Id: id,
Name: name,
}
job.JobKey = libs.JobKey(id, serverId)
job.RunFunc = func(timeout time.Duration) (jobResult *JobResult) {
bufOut := new(bytes.Buffer)
bufErr := new(bytes.Buffer)
//cmd := exec.Command("/bin/bash", "-c", command)
var cmd *exec.Cmd
if runtime.GOOS == "windows" {
cmd = exec.Command("CMD", "/C", command)
} else {
cmd = exec.Command("sh", "-c", command)
}
cmd.Stdout = bufOut
cmd.Stderr = bufErr
cmd.Start()
err, isTimeout := runCmdWithTimeout(cmd, timeout)
jobResult = new(JobResult)
jobResult.ErrMsg = libs.GbkAsUtf8(bufErr.String())
jobResult.OutMsg = libs.GbkAsUtf8(bufOut.String())
jobResult.IsOk = true
if err != nil {
jobResult.IsOk = false
}
jobResult.IsTimeout = isTimeout
return
}
return job
}
func runCmdWithTimeout(cmd *exec.Cmd, timeout time.Duration) (error, bool) {
done := make(chan error)
go func() {
done <- cmd.Wait()
}()
var err error
select {
case <-time.After(timeout):
logs.Warn(fmt.Sprintf("任务执行时间超过%d秒进程将被强制杀掉: %d", int(timeout/time.Second), cmd.Process.Pid))
go func() {
<-done // 读出上面的goroutine数据避免阻塞导致无法退出
}()
if err = cmd.Process.Kill(); err != nil {
logs.Error(fmt.Sprintf("进程无法杀掉: %d, 错误信息: %s", cmd.Process.Pid, err))
}
return err, true
case err = <-done:
return err, false
}
}
func Run(j *Job) *JobResult {
defer func() {
if err := recover(); err != nil {
logs.Error(err, "\n", string(debug.Stack()))
}
}()
logs.Debug(fmt.Sprintf("开始执行任务: %d", j.JobKey))
j.Status++
defer func() {
j.Status--
}()
timeout := time.Duration(time.Hour * 24)
if j.Task.Timeout > 0 {
timeout = time.Second * time.Duration(j.Task.Timeout)
}
return j.RunFunc(timeout)
}

41
agent/server/logs.go Normal file
View File

@@ -0,0 +1,41 @@
/************************************************************
** @Description: log
** @Author: haodaquan
** @Date: 2018-08-22 23:00
** @Last Modified by: haodaquan
** @Last Modified time: 2018-08-22 23:00
*************************************************************/
package server
import (
"fmt"
"log"
"net/http"
"strings"
"time"
)
var Env string
func init() {
log.SetFlags(log.LstdFlags | log.Lshortfile)
}
//http相关
func WriteLog(r *http.Request, t time.Time, match string, pattern string) {
if C.AppMode != "prod" {
d := time.Now().Sub(t)
l := fmt.Sprintf("[ACCESS] | % -10s | % -40s | % -16s | % -10s | % -40s |",
r.Method, r.URL.Path, d.String(), match, pattern)
log.Println(l)
}
}
//系统运行相关
func NLog(level string, value ...interface{}) {
if strings.Contains(C.LogLevel, level) || C.LogLevel == "ALL" {
log.Println("["+level+"]", value)
return
}
}

104
agent/server/notify.go Normal file
View File

@@ -0,0 +1,104 @@
/************************************************************
** @Description: notify
** @Author: george hao
** @Date: 2019-06-26 15:17
** @Last Modified by: george hao
** @Last Modified time: 2019-06-26 15:17
*************************************************************/
package server
import (
"encoding/json"
"fmt"
"github.com/george518/PPGo_Job/libs"
"strconv"
)
//启动时注册
func Register() error {
//获取本机ip以及端口 todo ip合法性判断
if C.TcpIp == "auto" {
tcpIp := libs.GetHostIp(C.IpType)
if tcpIp == "" {
return fmt.Errorf("无法获取本机IP请手工在配置文件里设置")
}
SaveConfig("TcpIp", tcpIp)
}
param := make(map[string]string, 0)
if C.ServerName == "auto" {
serverName := "agent-" + C.TcpIp + "-" + strconv.Itoa(C.TcpPort)
SaveConfig("ServerName", serverName)
}
param["server_ip"] = C.TcpIp
param["port"] = strconv.Itoa(C.TcpPort)
param["server_name"] = C.ServerName
param["detail"] = "自动注册执行器"
param["connection_type"] = "2"
param["group_id"] = C.GroupId
if C.RegisterUrl == "" {
return fmt.Errorf("自动注册地址配置错误")
}
body, err := libs.HttpGet(C.RegisterUrl, param)
if err != nil {
return err
}
m := make(map[string]interface{})
err = json.Unmarshal([]byte(body), &m)
if err != nil {
return err
}
if _, ok := m["status"]; ok {
if m["status"] == float64(0) {
//回写serverId
serverId := int(m["message"].(float64))
SaveConfig("ServerId", strconv.Itoa(serverId))
return nil
} else {
return fmt.Errorf("自动注册失败:%v", m["message"])
}
}
return fmt.Errorf("自动注册失败")
}
//程序异常退出的通知
func Close() error {
param := make(map[string]string, 0)
param["server_ip"] = C.TcpIp
param["port"] = strconv.Itoa(C.TcpPort)
param["status"] = "1"
if C.UpdateStatusUrl == "" {
return fmt.Errorf("执行器退出通知异常,请到系统中修改状态")
}
body, err := libs.HttpGet(C.UpdateStatusUrl, param)
if err != nil {
return err
}
m := make(map[string]interface{})
err = json.Unmarshal([]byte(body), &m)
if err != nil {
return err
}
if _, ok := m["status"]; ok {
if m["status"] == float64(0) {
return nil
} else {
return fmt.Errorf("执行器退出通知异常:%v", m["message"])
}
}
return fmt.Errorf("执行器退出通知异常:未知原因")
}
//心跳机制
func Heartbeat() error {
return nil
}

38
agent/server/service.go Normal file
View File

@@ -0,0 +1,38 @@
/************************************************************
** @Description: service
** @Author: george hao
** @Date: 2019-06-26 15:27
** @Last Modified by: george hao
** @Last Modified time: 2019-06-26 15:27
*************************************************************/
package server
import (
"net"
"net/rpc"
"net/rpc/jsonrpc"
"strconv"
)
//初始化路由
func init() {
rpc.RegisterName("RpcTask", new(RpcTask))
rpc.RegisterName("HeartBeat", new(RpcTask))
}
func RpcRun() error {
listener, err := net.Listen("tcp", ":"+strconv.Itoa(C.TcpPort))
if err != nil {
return err
}
for {
conn, err := listener.Accept()
if err != nil {
return err
}
//注意ServerCodec是个方法不是接口
go rpc.ServeCodec(jsonrpc.NewServerCodec(conn))
//go rpc.ServeConn(conn)
}
}

41
agent/server/signal.go Normal file
View File

@@ -0,0 +1,41 @@
/************************************************************
** @Description: server
** @Author: george hao
** @Date: 2018-11-29 11:24
** @Last Modified by: george hao
** @Last Modified time: 2018-11-29 11:24
*************************************************************/
package server
import (
"os"
"os/signal"
"syscall"
)
//监听关闭状态
func ListenSignal() {
//创建监听退出chan
c := make(chan os.Signal)
//监听指定信号 ctrl+c kill
signal.Notify(c, syscall.SIGHUP, syscall.SIGINT, syscall.SIGTERM, syscall.SIGQUIT, syscall.SIGUSR1, syscall.SIGUSR2)
go func() {
for s := range c {
switch s {
case syscall.SIGHUP, syscall.SIGINT, syscall.SIGTERM, syscall.SIGQUIT, syscall.SIGUSR1, syscall.SIGUSR2:
NLog("NOTICE", " Ready to quit close type ", s)
//TODO 异常警报,汇报状态
if err := Close(); err != nil {
NLog("ERROR", err.Error())
} else {
NLog("NOTICE", " 执行器安全关闭...")
}
os.Exit(0)
default:
NLog("NOTICE", " close type ", s)
os.Exit(1)
}
}
}()
}

47
agent/server/task.go Normal file
View File

@@ -0,0 +1,47 @@
/************************************************************
** @Description: task
** @Author: george hao
** @Date: 2019-06-24 13:22
** @Last Modified by: george hao
** @Last Modified time: 2019-06-24 13:22
*************************************************************/
package server
import (
"github.com/astaxie/beego/logs"
"github.com/george518/PPGo_Job/jobs"
"github.com/george518/PPGo_Job/models"
)
type RpcTask struct {
}
type RpcResult struct {
Status int
Message string
}
//Execute once
func (r *RpcTask) RunTask(task *models.Task, Result *jobs.JobResult) error {
server_id := C.ServerId
job, err := RestJobFromTask(task, server_id)
if err != nil {
return nil
}
*Result = *(Run(job))
return nil
}
//Kill execution
func (r *RpcTask) KillCommand(task models.Task, reply *RpcResult) error {
reply.Status = 200
reply.Message = "Ok kill " + task.TaskName
return nil
}
func (r *RpcTask) HeartBeat(ping string, reply *RpcResult) error {
reply.Status = 200
reply.Message = ping + " pong"
logs.Info(ping)
return nil
}