v2版本正式上线测试版
This commit is contained in:
74
V1/jobs/cron.go
Normal file
74
V1/jobs/cron.go
Normal file
@@ -0,0 +1,74 @@
|
||||
/*
|
||||
* @Author: haodaquan
|
||||
* @Date: 2017-06-21 12:54:47
|
||||
* @Last Modified by: haodaquan
|
||||
* @Last Modified time: 2017-06-23 11:04:25
|
||||
*/
|
||||
|
||||
package jobs
|
||||
|
||||
import (
|
||||
"github.com/astaxie/beego"
|
||||
"github.com/george518/PPGo_Job/crons"
|
||||
"sync"
|
||||
)
|
||||
|
||||
var (
|
||||
mainCron *cron.Cron
|
||||
workPool chan bool
|
||||
lock sync.Mutex
|
||||
)
|
||||
|
||||
func init() {
|
||||
if size, _ := beego.AppConfig.Int("jobs.pool"); size > 0 {
|
||||
workPool = make(chan bool, size)
|
||||
}
|
||||
mainCron = cron.New()
|
||||
mainCron.Start()
|
||||
}
|
||||
|
||||
func AddJob(spec string, job *Job) bool {
|
||||
lock.Lock()
|
||||
defer lock.Unlock()
|
||||
|
||||
if GetEntryById(job.id) != nil {
|
||||
return false
|
||||
}
|
||||
err := mainCron.AddJob(spec, job)
|
||||
if err != nil {
|
||||
beego.Error("AddJob: ", err.Error())
|
||||
return false
|
||||
}
|
||||
return true
|
||||
}
|
||||
|
||||
func RemoveJob(id int) {
|
||||
mainCron.RemoveJob(func(e *cron.Entry) bool {
|
||||
if v, ok := e.Job.(*Job); ok {
|
||||
if v.id == id {
|
||||
return true
|
||||
}
|
||||
}
|
||||
return false
|
||||
})
|
||||
}
|
||||
|
||||
func GetEntryById(id int) *cron.Entry {
|
||||
entries := mainCron.Entries()
|
||||
for _, e := range entries {
|
||||
if v, ok := e.Job.(*Job); ok {
|
||||
if v.id == id {
|
||||
return e
|
||||
}
|
||||
}
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
func GetEntries(size int) []*cron.Entry {
|
||||
ret := mainCron.Entries()
|
||||
if len(ret) > size {
|
||||
return ret[:size]
|
||||
}
|
||||
return ret
|
||||
}
|
||||
51
V1/jobs/init.go
Normal file
51
V1/jobs/init.go
Normal file
@@ -0,0 +1,51 @@
|
||||
/*
|
||||
* @Author: haodaquan
|
||||
* @Date: 2017-06-21 12:55:19
|
||||
* @Last Modified by: haodaquan
|
||||
* @Last Modified time: 2017-06-21 13:03:06
|
||||
*/
|
||||
|
||||
package jobs
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"github.com/astaxie/beego"
|
||||
"github.com/george518/PPGo_Job/models"
|
||||
"os/exec"
|
||||
"time"
|
||||
)
|
||||
|
||||
func InitJobs() {
|
||||
list, _ := models.TaskGetList(1, 1000000, "status", 1)
|
||||
for _, task := range list {
|
||||
job, err := NewJobFromTask(task)
|
||||
if err != nil {
|
||||
beego.Error("InitJobs:", err.Error())
|
||||
continue
|
||||
}
|
||||
AddJob(task.CronSpec, job)
|
||||
}
|
||||
}
|
||||
|
||||
func runCmdWithTimeout(cmd *exec.Cmd, timeout time.Duration) (error, bool) {
|
||||
done := make(chan error)
|
||||
go func() {
|
||||
done <- cmd.Wait()
|
||||
}()
|
||||
|
||||
var err error
|
||||
select {
|
||||
case <-time.After(timeout):
|
||||
beego.Warn(fmt.Sprintf("任务执行时间超过%d秒,进程将被强制杀掉: %d", int(timeout/time.Second), cmd.Process.Pid))
|
||||
go func() {
|
||||
<-done // 读出上面的goroutine数据,避免阻塞导致无法退出
|
||||
}()
|
||||
if err = cmd.Process.Kill(); err != nil {
|
||||
beego.Error(fmt.Sprintf("进程无法杀掉: %d, 错误信息: %s", cmd.Process.Pid, err))
|
||||
}
|
||||
return err, true
|
||||
case err = <-done:
|
||||
return err, false
|
||||
}
|
||||
}
|
||||
|
||||
272
V1/jobs/job.go
Normal file
272
V1/jobs/job.go
Normal file
@@ -0,0 +1,272 @@
|
||||
/*
|
||||
* @Author: haodaquan
|
||||
* @Date: 2017-06-21 12:56:08
|
||||
* @Last Modified by: haodaquan
|
||||
* @Last Modified time: 2017-06-21 13:05:57
|
||||
*/
|
||||
|
||||
package jobs
|
||||
|
||||
import (
|
||||
"bytes"
|
||||
"fmt"
|
||||
"io/ioutil"
|
||||
"net"
|
||||
"os/exec"
|
||||
"runtime/debug"
|
||||
"time"
|
||||
|
||||
"github.com/astaxie/beego"
|
||||
"github.com/george518/PPGo_Job/models"
|
||||
"golang.org/x/crypto/ssh"
|
||||
)
|
||||
|
||||
type Job struct {
|
||||
id int // 任务ID
|
||||
logId int64 // 日志记录ID
|
||||
name string // 任务名称
|
||||
task *models.Task // 任务对象
|
||||
runFunc func(time.Duration) (string, string, error, bool) // 执行函数
|
||||
status int // 任务状态,大于0表示正在执行中
|
||||
Concurrent bool // 同一个任务是否允许并行执行
|
||||
}
|
||||
|
||||
func NewJobFromTask(task *models.Task) (*Job, error) {
|
||||
if task.Id < 1 {
|
||||
return nil, fmt.Errorf("ToJob: 缺少id")
|
||||
}
|
||||
//本地程序执行
|
||||
if task.ServerId == 0 {
|
||||
job := NewCommandJob(task.Id, task.TaskName, task.Command)
|
||||
job.task = task
|
||||
job.Concurrent = task.Concurrent == 1
|
||||
return job, nil
|
||||
}
|
||||
|
||||
server, _ := models.TaskServerGetById(task.ServerId)
|
||||
if server.Type == 0 {
|
||||
//密码验证登录服务器
|
||||
job := RemoteCommandJobByPassword(task.Id, task.TaskName, task.Command, server)
|
||||
job.task = task
|
||||
job.Concurrent = task.Concurrent == 1
|
||||
return job, nil
|
||||
}
|
||||
|
||||
job := RemoteCommandJob(task.Id, task.TaskName, task.Command, server)
|
||||
job.task = task
|
||||
job.Concurrent = task.Concurrent == 1
|
||||
return job, nil
|
||||
|
||||
}
|
||||
|
||||
func NewCommandJob(id int, name string, command string) *Job {
|
||||
job := &Job{
|
||||
id: id,
|
||||
name: name,
|
||||
}
|
||||
job.runFunc = func(timeout time.Duration) (string, string, error, bool) {
|
||||
bufOut := new(bytes.Buffer)
|
||||
bufErr := new(bytes.Buffer)
|
||||
//cmd := exec.Command("/bin/bash", "-c", command)
|
||||
cmd := exec.Command("sh", "-c", command)
|
||||
cmd.Stdout = bufOut
|
||||
cmd.Stderr = bufErr
|
||||
cmd.Start()
|
||||
err, isTimeout := runCmdWithTimeout(cmd, timeout)
|
||||
|
||||
return bufOut.String(), bufErr.String(), err, isTimeout
|
||||
}
|
||||
return job
|
||||
}
|
||||
|
||||
//远程执行任务 密钥验证
|
||||
func RemoteCommandJob(id int, name string, command string, servers *models.TaskServer) *Job {
|
||||
job := &Job{
|
||||
id: id,
|
||||
name: name,
|
||||
}
|
||||
job.runFunc = func(timeout time.Duration) (string, string, error, bool) {
|
||||
|
||||
key, err := ioutil.ReadFile(servers.PrivateKeySrc)
|
||||
if err != nil {
|
||||
return "", "", err, false
|
||||
}
|
||||
// Create the Signer for this private key.
|
||||
signer, err := ssh.ParsePrivateKey(key)
|
||||
if err != nil {
|
||||
return "", "", err, false
|
||||
}
|
||||
addr := fmt.Sprintf("%s:%d", servers.ServerIp, servers.Port)
|
||||
config := &ssh.ClientConfig{
|
||||
User: servers.ServerAccount,
|
||||
Auth: []ssh.AuthMethod{
|
||||
// Use the PublicKeys method for remote authentication.
|
||||
ssh.PublicKeys(signer),
|
||||
},
|
||||
//HostKeyCallback: ssh.FixedHostKey(hostKey),
|
||||
HostKeyCallback: func(hostname string, remote net.Addr, key ssh.PublicKey) error {
|
||||
return nil
|
||||
},
|
||||
}
|
||||
// Connect to the remote server and perform the SSH handshake.47.93.220.5
|
||||
client, err := ssh.Dial("tcp", addr, config)
|
||||
if err != nil {
|
||||
return "", "", err, false
|
||||
}
|
||||
|
||||
defer client.Close()
|
||||
|
||||
session, err := client.NewSession()
|
||||
if err != nil {
|
||||
return "", "", err, false
|
||||
}
|
||||
defer session.Close()
|
||||
|
||||
// Once a Session is created, you can execute a single command on
|
||||
// the remote side using the Run method.
|
||||
|
||||
var b bytes.Buffer
|
||||
var c bytes.Buffer
|
||||
session.Stdout = &b
|
||||
session.Stderr = &c
|
||||
|
||||
//session.Output(command)
|
||||
if err := session.Run(command); err != nil {
|
||||
return "", "", err, false
|
||||
}
|
||||
isTimeout := false
|
||||
return b.String(), c.String(), err, isTimeout
|
||||
}
|
||||
return job
|
||||
}
|
||||
|
||||
func RemoteCommandJobByPassword(id int, name string, command string, servers *models.TaskServer) *Job {
|
||||
var (
|
||||
auth []ssh.AuthMethod
|
||||
addr string
|
||||
clientConfig *ssh.ClientConfig
|
||||
client *ssh.Client
|
||||
session *ssh.Session
|
||||
err error
|
||||
)
|
||||
|
||||
job := &Job{
|
||||
id: id,
|
||||
name: name,
|
||||
}
|
||||
job.runFunc = func(timeout time.Duration) (string, string, error, bool) {
|
||||
// get auth method
|
||||
auth = make([]ssh.AuthMethod, 0)
|
||||
auth = append(auth, ssh.Password(servers.Password))
|
||||
|
||||
clientConfig = &ssh.ClientConfig{
|
||||
User: servers.ServerAccount,
|
||||
Auth: auth,
|
||||
HostKeyCallback: func(hostname string, remote net.Addr, key ssh.PublicKey) error {
|
||||
return nil
|
||||
},
|
||||
//Timeout: 1000 * time.Second,
|
||||
}
|
||||
|
||||
// connet to ssh
|
||||
addr = fmt.Sprintf("%s:%d", servers.ServerIp, servers.Port)
|
||||
|
||||
if client, err = ssh.Dial("tcp", addr, clientConfig); err != nil {
|
||||
return "", "", err, false
|
||||
}
|
||||
|
||||
defer client.Close()
|
||||
|
||||
// create session
|
||||
if session, err = client.NewSession(); err != nil {
|
||||
return "", "", err, false
|
||||
}
|
||||
|
||||
var b bytes.Buffer
|
||||
var c bytes.Buffer
|
||||
session.Stdout = &b
|
||||
session.Stderr = &c
|
||||
|
||||
//session.Output(command)
|
||||
if err := session.Run(command); err != nil {
|
||||
return "", "", err, false
|
||||
}
|
||||
isTimeout := false
|
||||
return b.String(), c.String(), err, isTimeout
|
||||
}
|
||||
|
||||
return job
|
||||
}
|
||||
|
||||
func (j *Job) Status() int {
|
||||
return j.status
|
||||
}
|
||||
|
||||
func (j *Job) GetName() string {
|
||||
return j.name
|
||||
}
|
||||
|
||||
func (j *Job) GetId() int {
|
||||
return j.id
|
||||
}
|
||||
|
||||
func (j *Job) GetLogId() int64 {
|
||||
return j.logId
|
||||
}
|
||||
|
||||
func (j *Job) Run() {
|
||||
if !j.Concurrent && j.status > 0 {
|
||||
beego.Warn(fmt.Sprintf("任务[%d]上一次执行尚未结束,本次被忽略。", j.id))
|
||||
return
|
||||
}
|
||||
|
||||
defer func() {
|
||||
if err := recover(); err != nil {
|
||||
beego.Error(err, "\n", string(debug.Stack()))
|
||||
}
|
||||
}()
|
||||
|
||||
if workPool != nil {
|
||||
workPool <- true
|
||||
defer func() {
|
||||
<-workPool
|
||||
}()
|
||||
}
|
||||
|
||||
beego.Debug(fmt.Sprintf("开始执行任务: %d", j.id))
|
||||
|
||||
j.status++
|
||||
defer func() {
|
||||
j.status--
|
||||
}()
|
||||
|
||||
t := time.Now()
|
||||
timeout := time.Duration(time.Hour * 24)
|
||||
if j.task.Timeout > 0 {
|
||||
timeout = time.Second * time.Duration(j.task.Timeout)
|
||||
}
|
||||
cmdOut, cmdErr, err, isTimeout := j.runFunc(timeout)
|
||||
ut := time.Now().Sub(t) / time.Millisecond
|
||||
|
||||
// 插入日志
|
||||
log := new(models.TaskLog)
|
||||
log.TaskId = j.id
|
||||
log.Output = cmdOut
|
||||
log.Error = cmdErr
|
||||
log.ProcessTime = int(ut)
|
||||
log.CreateTime = t.Unix()
|
||||
|
||||
if isTimeout {
|
||||
log.Status = models.TASK_TIMEOUT
|
||||
log.Error = fmt.Sprintf("任务执行超过 %d 秒\n----------------------\n%s\n", int(timeout/time.Second), cmdErr)
|
||||
} else if err != nil {
|
||||
log.Status = models.TASK_ERROR
|
||||
log.Error = err.Error() + ":" + cmdErr
|
||||
}
|
||||
j.logId, _ = models.TaskLogAdd(log)
|
||||
|
||||
// 更新上次执行时间
|
||||
j.task.PrevTime = t.Unix()
|
||||
j.task.ExecuteTimes++
|
||||
j.task.Update("PrevTime", "ExecuteTimes")
|
||||
}
|
||||
Reference in New Issue
Block a user