定时任务管理
This commit is contained in:
郝大全
2017-06-23 12:24:42 +08:00
commit 34bfe3d354
77 changed files with 16609 additions and 0 deletions

74
jobs/cron.go Normal file
View File

@@ -0,0 +1,74 @@
/*
* @Author: haodaquan
* @Date: 2017-06-21 12:54:47
* @Last Modified by: haodaquan
* @Last Modified time: 2017-06-23 11:04:25
*/
package jobs
import (
"PPGo_Job/crons"
"github.com/astaxie/beego"
"sync"
)
var (
mainCron *cron.Cron
workPool chan bool
lock sync.Mutex
)
func init() {
if size, _ := beego.AppConfig.Int("jobs.pool"); size > 0 {
workPool = make(chan bool, size)
}
mainCron = cron.New()
mainCron.Start()
}
func AddJob(spec string, job *Job) bool {
lock.Lock()
defer lock.Unlock()
if GetEntryById(job.id) != nil {
return false
}
err := mainCron.AddJob(spec, job)
if err != nil {
beego.Error("AddJob: ", err.Error())
return false
}
return true
}
func RemoveJob(id int) {
mainCron.RemoveJob(func(e *cron.Entry) bool {
if v, ok := e.Job.(*Job); ok {
if v.id == id {
return true
}
}
return false
})
}
func GetEntryById(id int) *cron.Entry {
entries := mainCron.Entries()
for _, e := range entries {
if v, ok := e.Job.(*Job); ok {
if v.id == id {
return e
}
}
}
return nil
}
func GetEntries(size int) []*cron.Entry {
ret := mainCron.Entries()
if len(ret) > size {
return ret[:size]
}
return ret
}

50
jobs/init.go Normal file
View File

@@ -0,0 +1,50 @@
/*
* @Author: haodaquan
* @Date: 2017-06-21 12:55:19
* @Last Modified by: haodaquan
* @Last Modified time: 2017-06-21 13:03:06
*/
package jobs
import (
"PPGo_Job/models"
"fmt"
"github.com/astaxie/beego"
"os/exec"
"time"
)
func InitJobs() {
list, _ := models.TaskGetList(1, 1000000, "status", 1)
for _, task := range list {
job, err := NewJobFromTask(task)
if err != nil {
beego.Error("InitJobs:", err.Error())
continue
}
AddJob(task.CronSpec, job)
}
}
func runCmdWithTimeout(cmd *exec.Cmd, timeout time.Duration) (error, bool) {
done := make(chan error)
go func() {
done <- cmd.Wait()
}()
var err error
select {
case <-time.After(timeout):
beego.Warn(fmt.Sprintf("任务执行时间超过%d秒进程将被强制杀掉: %d", int(timeout/time.Second), cmd.Process.Pid))
go func() {
<-done // 读出上面的goroutine数据避免阻塞导致无法退出
}()
if err = cmd.Process.Kill(); err != nil {
beego.Error(fmt.Sprintf("进程无法杀掉: %d, 错误信息: %s", cmd.Process.Pid, err))
}
return err, true
case err = <-done:
return err, false
}
}

200
jobs/job.go Normal file
View File

@@ -0,0 +1,200 @@
/*
* @Author: haodaquan
* @Date: 2017-06-21 12:56:08
* @Last Modified by: haodaquan
* @Last Modified time: 2017-06-21 13:05:57
*/
package jobs
import (
"PPGo_Job/mail"
"PPGo_Job/models"
"bytes"
"fmt"
"github.com/astaxie/beego"
"html/template"
"os/exec"
"runtime/debug"
"strings"
"time"
)
var mailTpl *template.Template
func init() {
mailTpl, _ = template.New("mail_tpl").Parse(`
你好 {{.username}}<br/>
<p>以下是任务执行结果:</p>
<p>
任务ID{{.task_id}}<br/>
任务名称:{{.task_name}}<br/>
执行时间:{{.start_time}}<br />
执行耗时:{{.process_time}}秒<br />
执行状态:{{.status}}
</p>
<p>-------------以下是任务执行输出-------------</p>
<p>{{.output}}</p>
<p>
--------------------------------------------<br />
本邮件由系统自动发出,请勿回复<br />
如果要取消邮件通知,请登录到系统进行设置<br />
</p>
`)
}
type Job struct {
id int // 任务ID
logId int64 // 日志记录ID
name string // 任务名称
task *models.Task // 任务对象
runFunc func(time.Duration) (string, string, error, bool) // 执行函数
status int // 任务状态大于0表示正在执行中
Concurrent bool // 同一个任务是否允许并行执行
}
func NewJobFromTask(task *models.Task) (*Job, error) {
if task.Id < 1 {
return nil, fmt.Errorf("ToJob: 缺少id")
}
job := NewCommandJob(task.Id, task.TaskName, task.Command)
job.task = task
job.Concurrent = task.Concurrent == 1
return job, nil
}
func NewCommandJob(id int, name string, command string) *Job {
job := &Job{
id: id,
name: name,
}
job.runFunc = func(timeout time.Duration) (string, string, error, bool) {
bufOut := new(bytes.Buffer)
bufErr := new(bytes.Buffer)
cmd := exec.Command("/bin/bash", "-c", command)
cmd.Stdout = bufOut
cmd.Stderr = bufErr
cmd.Start()
err, isTimeout := runCmdWithTimeout(cmd, timeout)
return bufOut.String(), bufErr.String(), err, isTimeout
}
return job
}
func (j *Job) Status() int {
return j.status
}
func (j *Job) GetName() string {
return j.name
}
func (j *Job) GetId() int {
return j.id
}
func (j *Job) GetLogId() int64 {
return j.logId
}
func (j *Job) Run() {
if !j.Concurrent && j.status > 0 {
beego.Warn(fmt.Sprintf("任务[%d]上一次执行尚未结束,本次被忽略。", j.id))
return
}
defer func() {
if err := recover(); err != nil {
beego.Error(err, "\n", string(debug.Stack()))
}
}()
if workPool != nil {
workPool <- true
defer func() {
<-workPool
}()
}
beego.Debug(fmt.Sprintf("开始执行任务: %d", j.id))
j.status++
defer func() {
j.status--
}()
t := time.Now()
timeout := time.Duration(time.Hour * 24)
if j.task.Timeout > 0 {
timeout = time.Second * time.Duration(j.task.Timeout)
}
cmdOut, cmdErr, err, isTimeout := j.runFunc(timeout)
ut := time.Now().Sub(t) / time.Millisecond
// 插入日志
log := new(models.TaskLog)
log.TaskId = j.id
log.Output = cmdOut
log.Error = cmdErr
log.ProcessTime = int(ut)
log.CreateTime = t.Unix()
if isTimeout {
log.Status = models.TASK_TIMEOUT
log.Error = fmt.Sprintf("任务执行超过 %d 秒\n----------------------\n%s\n", int(timeout/time.Second), cmdErr)
} else if err != nil {
log.Status = models.TASK_ERROR
log.Error = err.Error() + ":" + cmdErr
}
j.logId, _ = models.TaskLogAdd(log)
// 更新上次执行时间
j.task.PrevTime = t.Unix()
j.task.ExecuteTimes++
j.task.Update("PrevTime", "ExecuteTimes")
// 发送邮件通知
if (j.task.Notify == 1 && err != nil) || j.task.Notify == 2 {
user, uerr := models.UserGetById(j.task.UserId)
if uerr != nil {
return
}
var title string
data := make(map[string]interface{})
data["task_id"] = j.task.Id
data["username"] = user.UserName
data["task_name"] = j.task.TaskName
data["start_time"] = beego.Date(t, "Y-m-d H:i:s")
data["process_time"] = float64(ut) / 1000
data["output"] = cmdOut
if isTimeout {
title = fmt.Sprintf("任务执行结果通知 #%d: %s", j.task.Id, "超时")
data["status"] = fmt.Sprintf("超时(%d秒", int(timeout/time.Second))
} else if err != nil {
title = fmt.Sprintf("任务执行结果通知 #%d: %s", j.task.Id, "失败")
data["status"] = "失败(" + err.Error() + ""
} else {
title = fmt.Sprintf("任务执行结果通知 #%d: %s", j.task.Id, "成功")
data["status"] = "成功"
}
content := new(bytes.Buffer)
mailTpl.Execute(content, data)
ccList := make([]string, 0)
if j.task.NotifyEmail != "" {
ccList = strings.Split(j.task.NotifyEmail, "\n")
}
if !mail.SendMail(user.Email, user.UserName, title, content.String(), ccList) {
beego.Error("发送邮件超时:", user.Email)
}
}
}