1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
package jobs
import (
"fmt"
log "github.com/go-admin-team/go-admin-core/logger"
"github.com/go-admin-team/go-admin-core/sdk"
models2 "go-admin/app/jobs/models"
"gorm.io/gorm"
"sync"
"time"
"github.com/robfig/cron/v3"
"github.com/go-admin-team/go-admin-core/sdk/pkg"
"github.com/go-admin-team/go-admin-core/sdk/pkg/cronjob"
)
var timeFormat = "2006-01-02 15:04:05"
var retryCount = 3
var jobList map[string]JobsExec
var lock sync.Mutex
type JobCore struct {
InvokeTarget string
Name string
JobId int
EntryId int
CronExpression string
Args string
}
// 任务类型 http
type HttpJob struct {
JobCore
}
type ExecJob struct {
JobCore
}
func (e *ExecJob) Run() {
startTime := time.Now()
var obj = jobList[e.InvokeTarget]
if obj == nil {
log.Warn("[Job] ExecJob Run job nil")
return
}
err := CallExec(obj.(JobsExec), e.Args)
if err != nil {
// 如果失败暂停一段时间重试
fmt.Println(time.Now().Format(timeFormat), " [ERROR] mission failed! ", err)
}
// 结束时间
endTime := time.Now()
// 执行时间
latencyTime := endTime.Sub(startTime)
//TODO: 待完善部分
//str := time.Now().Format(timeFormat) + " [INFO] JobCore " + string(e.EntryId) + "exec success , spend :" + latencyTime.String()
//ws.SendAll(str)
log.Info("[Job] JobCore %s exec success , spend :%v", e.Name, latencyTime)
return
}
//http 任务接口
func (h *HttpJob) Run() {
startTime := time.Now()
var count = 0
var err error
var str string
/* 循环 */
LOOP:
if count < retryCount {
/* 跳过迭代 */
str, err = pkg.Get(h.InvokeTarget)
if err != nil {
// 如果失败暂停一段时间重试
fmt.Println(time.Now().Format(timeFormat), " [ERROR] mission failed! ", err)
fmt.Printf(time.Now().Format(timeFormat)+" [INFO] Retry after the task fails %d seconds! %s \n", (count+1)*5, str)
time.Sleep(time.Duration(count+1) * 5 * time.Second)
count = count + 1
goto LOOP
}
}
// 结束时间
endTime := time.Now()
// 执行时间
latencyTime := endTime.Sub(startTime)
//TODO: 待完善部分
log.Infof("[Job] JobCore %s exec success , spend :%v", h.Name, latencyTime)
return
}
// 初始化
func Setup(dbs map[string]*gorm.DB) {
fmt.Println(time.Now().Format(timeFormat), " [INFO] JobCore Starting...")
for k, db := range dbs {
sdk.Runtime.SetCrontab(k, cronjob.NewWithSeconds())
setup(k, db)
}
}
func setup(key string, db *gorm.DB) {
crontab := sdk.Runtime.GetCrontabKey(key)
sysJob := models2.SysJob{}
jobList := make([]models2.SysJob, 0)
err := sysJob.GetList(db, &jobList)
if err != nil {
fmt.Println(time.Now().Format(timeFormat), " [ERROR] JobCore init error", err)
}
if len(jobList) == 0 {
fmt.Println(time.Now().Format(timeFormat), " [INFO] JobCore total:0")
}
_, err = sysJob.RemoveAllEntryID(db)
if err != nil {
fmt.Println(time.Now().Format(timeFormat), " [ERROR] JobCore remove entry_id error", err)
}
for i := 0; i < len(jobList); i++ {
if jobList[i].JobType == 1 {
j := &HttpJob{}
j.InvokeTarget = jobList[i].InvokeTarget
j.CronExpression = jobList[i].CronExpression
j.JobId = jobList[i].JobId
j.Name = jobList[i].JobName
sysJob.EntryId, err = AddJob(crontab, j)
} else if jobList[i].JobType == 2 {
j := &ExecJob{}
j.InvokeTarget = jobList[i].InvokeTarget
j.CronExpression = jobList[i].CronExpression
j.JobId = jobList[i].JobId
j.Name = jobList[i].JobName
j.Args = jobList[i].Args
sysJob.EntryId, err = AddJob(crontab, j)
}
err = sysJob.Update(db, jobList[i].JobId)
}
// 其中任务
crontab.Start()
fmt.Println(time.Now().Format(timeFormat), " [INFO] JobCore start success.")
// 关闭任务
defer crontab.Stop()
select {}
}
// 添加任务 AddJob(invokeTarget string, jobId int, jobName string, cronExpression string)
func AddJob(c *cron.Cron, job Job) (int, error) {
if job == nil {
fmt.Println("unknown")
return 0, nil
}
return job.addJob(c)
}
func (h *HttpJob) addJob(c *cron.Cron) (int, error) {
id, err := c.AddJob(h.CronExpression, h)
if err != nil {
fmt.Println(time.Now().Format(timeFormat), " [ERROR] JobCore AddJob error", err)
return 0, err
}
EntryId := int(id)
return EntryId, nil
}
func (h *ExecJob) addJob(c *cron.Cron) (int, error) {
id, err := c.AddJob(h.CronExpression, h)
if err != nil {
fmt.Println(time.Now().Format(timeFormat), " [ERROR] JobCore AddJob error", err)
return 0, err
}
EntryId := int(id)
return EntryId, nil
}
// 移除任务
func Remove(c *cron.Cron, entryID int) chan bool {
ch := make(chan bool)
go func() {
c.Remove(cron.EntryID(entryID))
fmt.Println(time.Now().Format(timeFormat), " [INFO] JobCore Remove success ,info entryID :", entryID)
ch <- true
}()
return ch
}
// 任务停止
//func Stop() chan bool {
// ch := make(chan bool)
// go func() {
// global.GADMCron.Stop()
// ch <- true
// }()
// return ch
//}