Skip to content

Commit b8c5dda

Browse files
committed
add scheduled cron
1 parent 9f0b424 commit b8c5dda

File tree

4 files changed

+404
-0
lines changed

4 files changed

+404
-0
lines changed

pkg/gocron/README.md

Lines changed: 57 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,57 @@
1+
## gocron
2+
3+
Scheduled task library encapsulated on [cron v3](github.com/robfig/cron).
4+
5+
<br>
6+
7+
### Example of use
8+
9+
```go
10+
package main
11+
12+
import (
13+
"fmt"
14+
"time"
15+
16+
"github.com/zhufuyi/sponge/pkg/gocron"
17+
)
18+
19+
var task1 = func() {
20+
fmt.Println("this is task1")
21+
fmt.Println("running task list:", gocron.GetRunningTasks())
22+
}
23+
24+
var taskOnce = func() {
25+
taskName := "taskOnce"
26+
fmt.Println("this is taskOnce")
27+
gocron.DeleteTask(taskName)
28+
}
29+
30+
func main() {
31+
err := gocron.Init()
32+
if err != nil {
33+
panic(err)
34+
}
35+
36+
gocron.Run([]*gocron.Task{
37+
{
38+
Name: "task1",
39+
TimeSpec: "@every 2s",
40+
Fn: task1,
41+
},
42+
{
43+
Name: "taskOnce",
44+
TimeSpec: "@every 5s",
45+
Fn: taskOnce,
46+
},
47+
}...)
48+
49+
time.Sleep(time.Minute)
50+
51+
// delete task1
52+
gocron.DeleteTask("task1")
53+
54+
// view running tasks
55+
fmt.Println("running task list:", gocron.GetRunningTasks())
56+
}
57+
```

pkg/gocron/cron.go

Lines changed: 134 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,134 @@
1+
package gocron
2+
3+
import (
4+
"errors"
5+
"fmt"
6+
"strings"
7+
"sync"
8+
9+
"github.com/robfig/cron/v3"
10+
)
11+
12+
var (
13+
c *cron.Cron
14+
// task name and id mapping, used to add, delete, modify and query tasks
15+
nameID = sync.Map{}
16+
// id and task name mapping, used in log printing
17+
idName = sync.Map{}
18+
)
19+
20+
// Task scheduled task
21+
type Task struct {
22+
// seconds (0-59) minutes (0- 59) hours (0-23) days (1-31) months (1-12) weeks (0-6)
23+
// "*/5 * * * * *" means every five seconds.
24+
// "0 15,45 9-12 * * * " indicates execution at the 15th and 45th minutes from 9 a.m. to 12 a.m. each day
25+
TimeSpec string
26+
27+
Name string // task name
28+
Fn func() // task
29+
}
30+
31+
// Init initialize and start timed tasks
32+
func Init(opts ...Option) error {
33+
o := defaultOptions()
34+
o.apply(opts...)
35+
36+
log := &zapLog{zapLog: o.zapLog}
37+
cronOpts := []cron.Option{
38+
cron.WithSeconds(), // second-level granularity, default is minute-level granularity
39+
cron.WithLogger(log),
40+
cron.WithChain(
41+
cron.Recover(log),
42+
),
43+
}
44+
45+
c = cron.New(cronOpts...)
46+
c.Start()
47+
48+
return nil
49+
}
50+
51+
// Run the tasks
52+
func Run(tasks ...*Task) error {
53+
if c == nil {
54+
return errors.New("cron is not initialized")
55+
}
56+
57+
var errs []string
58+
for _, task := range tasks {
59+
if IsRunningTask(task.Name) {
60+
errs = append(errs, fmt.Sprintf("task '%s' is already exists", task.Name))
61+
continue
62+
}
63+
64+
id, err := c.AddFunc(task.TimeSpec, task.Fn)
65+
if err != nil {
66+
errs = append(errs, fmt.Sprintf("run task '%s' error: %v", task.Name, err))
67+
continue
68+
}
69+
idName.Store(id, task.Name)
70+
nameID.Store(task.Name, id)
71+
}
72+
73+
if len(errs) > 0 {
74+
return errors.New(strings.Join(errs, " || "))
75+
}
76+
77+
return nil
78+
}
79+
80+
// IsRunningTask determine if the task is running
81+
func IsRunningTask(name string) bool {
82+
_, ok := nameID.Load(name)
83+
return ok
84+
}
85+
86+
// GetRunningTasks gets a list of running task names
87+
func GetRunningTasks() []string {
88+
var names []string
89+
nameID.Range(func(key, value interface{}) bool {
90+
names = append(names, key.(string))
91+
return true
92+
})
93+
return names
94+
}
95+
96+
// DeleteTask delete task
97+
func DeleteTask(name string) {
98+
if id, ok := nameID.Load(name); ok {
99+
entryID, isOk := id.(cron.EntryID)
100+
if !isOk {
101+
return
102+
}
103+
c.Remove(entryID)
104+
nameID.Delete(name)
105+
idName.Delete(entryID)
106+
}
107+
}
108+
109+
// Stop scheduled task
110+
func Stop() {
111+
if c != nil {
112+
c.Stop()
113+
}
114+
}
115+
116+
// EverySecond executed every size second (1~59)
117+
func EverySecond(size int) string {
118+
return fmt.Sprintf("@every %ds", size)
119+
}
120+
121+
// EveryMinute every size minute (1~59)
122+
func EveryMinute(size int) string {
123+
return fmt.Sprintf("@every %dm", size)
124+
}
125+
126+
// EveryHour every size hour (1~23)
127+
func EveryHour(size int) string {
128+
return fmt.Sprintf("@every %dh", size)
129+
}
130+
131+
// EveryDay every size day (1~31)
132+
func EveryDay(size int) string {
133+
return fmt.Sprintf("@every %dd", size)
134+
}

pkg/gocron/cron_test.go

Lines changed: 124 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,124 @@
1+
package gocron
2+
3+
import (
4+
"fmt"
5+
"testing"
6+
"time"
7+
)
8+
9+
func TestInitAndRun(t *testing.T) {
10+
count := 0
11+
task1 := func() {
12+
fmt.Println("running task list:", GetRunningTasks())
13+
}
14+
task2 := func() {
15+
time.Sleep(time.Second)
16+
}
17+
task3 := func() {
18+
count++
19+
if count%3 == 0 {
20+
panic("trigger panic")
21+
}
22+
}
23+
24+
tasks := []*Task{
25+
{
26+
Name: "task1",
27+
TimeSpec: "@every 1s",
28+
Fn: task1,
29+
},
30+
{
31+
Name: "task2",
32+
TimeSpec: "@every 2s",
33+
Fn: task2,
34+
},
35+
{
36+
Name: "task3",
37+
TimeSpec: "@every 3s",
38+
Fn: task3,
39+
},
40+
}
41+
42+
err := Init()
43+
if err != nil {
44+
t.Fatal(err)
45+
}
46+
err = Run(tasks...)
47+
if err != nil {
48+
t.Fatal(err)
49+
}
50+
51+
time.Sleep(time.Second * 7)
52+
}
53+
54+
func TestRunOnce(t *testing.T) {
55+
myTask := func() {
56+
taskName := "myTask"
57+
fmt.Println("running task list:", GetRunningTasks())
58+
fmt.Printf("the task '%s' is executed only once\n", taskName)
59+
DeleteTask(taskName)
60+
fmt.Println("running task list:", GetRunningTasks())
61+
}
62+
63+
tasks := []*Task{
64+
{
65+
Name: "myTask",
66+
Fn: myTask,
67+
TimeSpec: "@every 2s",
68+
},
69+
}
70+
71+
err := Init()
72+
if err != nil {
73+
t.Fatal(err)
74+
}
75+
err = Run(tasks...)
76+
if err != nil {
77+
t.Fatal(err)
78+
}
79+
80+
time.Sleep(time.Second * 5)
81+
Stop()
82+
}
83+
84+
func TestEvery(t *testing.T) {
85+
task_1 := func() {
86+
fmt.Println("this is task_1")
87+
fmt.Println("running task list:", GetRunningTasks())
88+
}
89+
task_2 := func() {
90+
fmt.Println("this is task_2")
91+
}
92+
task_3 := func() {
93+
fmt.Println("this is task_3")
94+
}
95+
96+
tasks := []*Task{
97+
{
98+
TimeSpec: EverySecond(5),
99+
Name: "task_1",
100+
Fn: task_1,
101+
},
102+
{
103+
TimeSpec: EveryMinute(1),
104+
Name: "task_2",
105+
Fn: task_2,
106+
},
107+
{
108+
TimeSpec: EveryHour(1),
109+
Name: "task_3",
110+
Fn: task_3,
111+
},
112+
}
113+
114+
err := Init()
115+
if err != nil {
116+
t.Fatal(err)
117+
}
118+
err = Run(tasks...)
119+
if err != nil {
120+
t.Fatal(err)
121+
}
122+
123+
time.Sleep(time.Second * 7)
124+
}

0 commit comments

Comments
 (0)