/*
Copyright © 2023 Jonathan Gotti <jgotti at jgotti dot org>
SPDX-FileType: SOURCE
SPDX-License-Identifier: MIT
SPDX-FileCopyrightText: 2023 Jonathan Gotti <jgotti@jgotti.org>
*/

package jobExecutor

import (
	"runtime"
	"sync"
	"time"
)
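
// limiterChan is used as a counting semaphore: before a job is launched the
// executor sends an empty struct into the buffered channel (blocking while the
// buffer is full) and receives from it once the job finishes, freeing a slot
// for the next one.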
// Never close this channel: its only purpose is to limit concurrency, and
// closing it would make the next call to execute panic.
var limiterChan chan struct{}

// SetMaxConcurrentJobs sets the maximum number of jobs allowed to run
// concurrently; any value lower than 1 falls back to runtime.GOMAXPROCS(0).
func SetMaxConcurrentJobs(n int) {
	if n < 1 {
		n = runtime.GOMAXPROCS(0)
	}
	limiterChan = make(chan struct{}, n)
}
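
// For example, a caller that wants to run at most 4 jobs at a time would call:
//
//	SetMaxConcurrentJobs(4)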

func init() {
	SetMaxConcurrentJobs(runtime.GOMAXPROCS(0))
}

type executeOptions struct {
	onJobsStart func(jobs JobList)
	onJobStart  func(jobs JobList, jobIndex int)
	onJobDone   func(jobs JobList, jobIndex int)
	onJobsDone  func(jobs JobList)
}
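
// These callbacks are invoked in a fixed order: onJobsStart once before any job
// is launched, then onJobStart / onJobDone for each individual job as it starts
// and completes, and finally onJobsDone once after every job has finished.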

// execute effectively launches the jobs, invoking the configured callbacks as
// each job starts and completes. Jobs must be prepared beforehand by calling
// either PrepareCmds or PrepareFns.
// @todo add cancellation support
func execute(jobs JobList, opts executeOptions) {
	if opts.onJobsStart != nil {
		opts.onJobsStart(jobs)
	}
	var wg sync.WaitGroup
	wg.Add(len(jobs))
	for i, child := range jobs {
		limiterChan <- struct{}{} // wait for a free slot if we are over the concurrency limit
		jobIndex := i
		job := child
		job.mutex.Lock()
		job.StartTime = time.Now()
		job.status = JobStateRunning
		job.mutex.Unlock()
		if opts.onJobStart != nil {
			opts.onJobStart(jobs, jobIndex)
		}
		go job.run(func() {
			defer func() { <-limiterChan }()
			defer wg.Done()
			if opts.onJobDone != nil {
				opts.onJobDone(jobs, jobIndex)
			}
		})
	}
	wg.Wait()
	// close(limiterChan) <-- we don't close the channel, it will be reused by further calls
	if opts.onJobsDone != nil {
		opts.onJobsDone(jobs)
	}
}

// dagExecute runs jobs in dependency order, launching a job only once all of
// the jobs it depends on have completed.
// A cyclic-dependency check MUST be done before calling this function;
// otherwise it may wait forever (an illustrative check is sketched at the end
// of this file).
func dagExecute(jobs JobList, opts executeOptions) error {
	if opts.onJobsStart != nil {
		opts.onJobsStart(jobs)
	}
	length := len(jobs)
	// build the adjacency list of edges (dependency -> dependent jobs)
	adjacencyList := make(map[int][]int, length)
	// count the number of dependencies of each job
	dependentCount := make(map[int]int, length)
	for _, job := range jobs {
		for _, to := range job.DependsOn {
			adjacencyList[to.id] = append(adjacencyList[to.id], job.id)
			dependentCount[job.id]++
		}
	}
	// init a queue with the starter jobs (those with no dependencies)
	var jobQueue []int
	for id := range jobs {
		if dependentCount[id] == 0 {
			jobQueue = append(jobQueue, id)
		}
	}
	var wg sync.WaitGroup
	wg.Add(length)
	doneChan := make(chan int)
	defer func() { close(doneChan) }()
	doneJob := 0
	for doneJob < len(jobs) { // until all jobs are done
		for len(jobQueue) > 0 { // while the queue is not empty
			job := jobs[jobQueue[0]] // dequeue job
			jobQueue = jobQueue[1:]
			limiterChan <- struct{}{} // wait if we are over the concurrency limit
			// run job
			job.mutex.Lock()
			job.StartTime = time.Now()
			job.status = JobStateRunning
			job.mutex.Unlock()
			if opts.onJobStart != nil {
				opts.onJobStart(jobs, job.id)
			}
			go job.run(func() {
				defer func() {
					<-limiterChan
					doneChan <- job.id
				}()
				defer wg.Done()
				if opts.onJobDone != nil {
					opts.onJobDone(jobs, job.id)
				}
			})
		}
		// wait for exactly one job to finish, then queue the jobs that were only
		// waiting on it
		for doneId := range doneChan {
			doneJob++
			for _, to := range adjacencyList[doneId] {
				dependentCount[to]--
				if dependentCount[to] == 0 {
					jobQueue = append(jobQueue, to)
				}
			}
			break
		}
	}
	wg.Wait()
	// close(limiterChan) <-- we don't close the channel, it will be reused by further calls
	if opts.onJobsDone != nil {
		opts.onJobsDone(jobs)
	}
	return nil
}
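
// The cyclic-dependency check required before calling dagExecute is not part of
// this file. What follows is a minimal, illustrative sketch of such a check (a
// hypothetical helper, not part of the package API) based on Kahn's algorithm:
// it relies only on the job.id / job.DependsOn fields already used by dagExecute,
// and assumes, as dagExecute does, that each job's id matches its index in the
// JobList. If the topological pass cannot reach every job, the remaining jobs
// must be part of a dependency cycle.
func hasCycle(jobs JobList) bool {
	// build the same adjacency list / dependency count as dagExecute
	adjacencyList := make(map[int][]int, len(jobs))
	dependencyCount := make(map[int]int, len(jobs))
	for _, job := range jobs {
		for _, to := range job.DependsOn {
			adjacencyList[to.id] = append(adjacencyList[to.id], job.id)
			dependencyCount[job.id]++
		}
	}
	// start from the jobs that have no dependencies
	var queue []int
	for id := range jobs {
		if dependencyCount[id] == 0 {
			queue = append(queue, id)
		}
	}
	// process jobs in topological order, counting how many can be reached
	visited := 0
	for len(queue) > 0 {
		id := queue[0]
		queue = queue[1:]
		visited++
		for _, to := range adjacencyList[id] {
			dependencyCount[to]--
			if dependencyCount[to] == 0 {
				queue = append(queue, to)
			}
		}
	}
	// any unreachable job sits on a cycle
	return visited != len(jobs)
}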