@@ -7,6 +7,15 @@ import (
7
7
"time"
8
8
)
9
9
10
+ // MinWait is the absolute minimum wait time for the ticker. This is used to
11
+ // prevent the ticker from firing too often and causing too small of a wait
12
+ // time.
13
+ const MinWait = time .Millisecond
14
+
15
+ // MinLife is the minimum life time for the scaler. This is used to prevent
16
+ // the scaler from exiting too quickly, and causing too small of a lifetime.
17
+ const MinLife = time .Millisecond
18
+
10
19
// Scaler implements generic auto-scaling logic which starts with a net-zero
11
20
// set of processing routines (with the exception of the channel listener) and
12
21
// then scales up and down based on the CPU contention of a system and the speed
@@ -42,6 +51,10 @@ type Scaler[T, U any] struct {
42
51
// that are CPU bound and need to scale up more/less quickly.
43
52
WaitModifier DurationScaler
44
53
54
+ // Max is the maximum number of layer2 routines that will be spawned.
55
+ // If Max is set to 0, then there is no limit.
56
+ Max uint
57
+
45
58
wScale * DurationScaler
46
59
}
47
60
@@ -51,7 +64,7 @@ var ErrFnRequired = fmt.Errorf("nil InterceptFunc, Fn is required")
51
64
// returns the output channel where the resulting data from the Fn function
52
65
// will be sent.
53
66
//
54
- //nolint:funlen // This really can't be broken up any further
67
+ //nolint:funlen,gocognit // This really can't be broken up any further
55
68
func (s Scaler [T , U ]) Exec (ctx context.Context , in <- chan T ) (<- chan U , error ) {
56
69
ctx = _ctx (ctx )
57
70
@@ -72,13 +85,13 @@ func (s Scaler[T, U]) Exec(ctx context.Context, in <-chan T) (<-chan U, error) {
72
85
// because the caller did not specify a wait time. This means Scaler will
73
86
// likely always scale up rather than waiting for an existing layer2 routine
74
87
// to pick up data.
75
- if s .Wait <= 0 {
76
- s .Wait = time . Nanosecond
88
+ if s .Wait <= MinWait {
89
+ s .Wait = MinWait
77
90
}
78
91
79
92
// Minimum life of a spawned layer2 should be 1ms
80
- if s .Life < time . Microsecond {
81
- s .Life = time . Microsecond
93
+ if s .Life < MinLife {
94
+ s .Life = MinLife
82
95
}
83
96
84
97
go func () {
@@ -99,7 +112,16 @@ func (s Scaler[T, U]) Exec(ctx context.Context, in <-chan T) (<-chan U, error) {
99
112
ticker := time .NewTicker (s .Wait )
100
113
defer ticker .Stop ()
101
114
step := 0
102
- var stepMu sync.RWMutex
115
+ stepMu := sync.RWMutex {}
116
+
117
+ var max chan struct {}
118
+
119
+ if s .Max > 0 {
120
+ max = make (chan struct {}, s .Max )
121
+ for i := uint (0 ); i < s .Max ; i ++ {
122
+ max <- struct {}{}
123
+ }
124
+ }
103
125
104
126
scaleLoop:
105
127
for {
@@ -117,6 +139,17 @@ func (s Scaler[T, U]) Exec(ctx context.Context, in <-chan T) (<-chan U, error) {
117
139
case <- ctx .Done ():
118
140
return
119
141
case <- ticker .C :
142
+ if max != nil {
143
+ select {
144
+ case <- ctx .Done ():
145
+ return
146
+ case <- max : // start a new layer2 routine
147
+ default :
148
+ // wait for a layer2 routine to finish
149
+ continue l2loop
150
+ }
151
+ }
152
+
120
153
wgMu .Lock ()
121
154
wg .Add (1 )
122
155
wgMu .Unlock ()
@@ -129,6 +162,16 @@ func (s Scaler[T, U]) Exec(ctx context.Context, in <-chan T) (<-chan U, error) {
129
162
130
163
go func () {
131
164
defer wg .Done ()
165
+
166
+ if s .Max > 0 {
167
+ defer func () {
168
+ select {
169
+ case <- ctx .Done ():
170
+ case max <- struct {}{}:
171
+ }
172
+ }()
173
+ }
174
+
132
175
if ! s .WaitModifier .inactive () {
133
176
defer func () {
134
177
stepMu .Lock ()
@@ -144,11 +187,16 @@ func (s Scaler[T, U]) Exec(ctx context.Context, in <-chan T) (<-chan U, error) {
144
187
}
145
188
}
146
189
147
- stepMu .RLock ()
190
+ stepN := 0
191
+ if ! s .WaitModifier .inactive () {
192
+ stepMu .RLock ()
193
+ stepN = step
194
+ stepMu .RUnlock ()
195
+ }
196
+
148
197
// Reset the ticker so that it does not immediately trip the
149
198
// case statement on loop.
150
- ticker .Reset (s .wScale .scaledDuration (s .Wait , step ))
151
- stepMu .RUnlock ()
199
+ ticker .Reset (s .wScale .scaledDuration (s .Wait , stepN ))
152
200
}
153
201
}
154
202
}()
@@ -261,6 +309,10 @@ func (t *DurationScaler) scaledDuration(
261
309
dur time.Duration ,
262
310
currentInterval int ,
263
311
) time.Duration {
312
+ if dur < MinWait {
313
+ dur = MinWait
314
+ }
315
+
264
316
if t .inactive () {
265
317
return dur
266
318
}
@@ -272,7 +324,12 @@ func (t *DurationScaler) scaledDuration(
272
324
273
325
if currentInterval % t .Interval == 0 {
274
326
t .lastInterval = currentInterval
275
- return dur + time .Duration (float64 (t .originalDuration )* mod )
327
+ out := dur + time .Duration (float64 (t .originalDuration )* mod )
328
+ if out < MinWait {
329
+ return MinWait
330
+ }
331
+
332
+ return out
276
333
}
277
334
278
335
return dur
0 commit comments