|
1 | 1 | package syncs |
2 | 2 |
|
3 | 3 | import ( |
| 4 | + "context" |
4 | 5 | "errors" |
5 | 6 | "fmt" |
6 | 7 | "log" |
@@ -123,34 +124,35 @@ func TestErrorSizedGroup_Term(t *testing.T) { |
123 | 124 | assert.True(t, c < uint32(1000), fmt.Sprintf("%d, some of routines has to be terminated early", c)) |
124 | 125 | } |
125 | 126 |
|
126 | | -func TestErrorSizedGroup_WaitWithoutGo(t *testing.T) { |
127 | | - ewg := NewErrSizedGroup(10) |
128 | | - assert.NoError(t, ewg.Wait()) |
129 | | -} |
130 | | - |
131 | | -// illustrates the use of a SizedGroup for concurrent, limited execution of goroutines. |
132 | | -func ExampleErrSizedGroup_go() { |
| 127 | +func TestErrorSizedGroup_TermOnErr(t *testing.T) { |
| 128 | + ewg := NewErrSizedGroup(10, TermOnErr) |
| 129 | + var c uint32 |
133 | 130 |
|
134 | | - // create sized waiting group allowing maximum 10 goroutines |
135 | | - grp := NewErrSizedGroup(10) |
| 131 | + const N = 1000 |
| 132 | + const errIndex = 100 // index of a function that will return an error |
136 | 133 |
|
137 | | - var c uint32 |
138 | | - for i := 0; i < 1000; i++ { |
139 | | - // Go call is non-blocking, like regular go statement |
140 | | - grp.Go(func() error { |
141 | | - // do some work in 10 goroutines in parallel |
142 | | - atomic.AddUint32(&c, 1) |
143 | | - time.Sleep(10 * time.Millisecond) |
| 134 | + for i := 0; i < N; i++ { |
| 135 | + i := i |
| 136 | + ewg.Go(func() error { |
| 137 | + val := atomic.AddUint32(&c, 1) |
| 138 | + if i == errIndex || val > uint32(errIndex+1) { |
| 139 | + return fmt.Errorf("err from function %d", i) |
| 140 | + } |
144 | 141 | return nil |
145 | 142 | }) |
146 | 143 | } |
147 | | - // Note: grp.Go acts like go command - never blocks. This code will be executed right away |
148 | | - log.Print("all 1000 jobs submitted") |
149 | 144 |
|
150 | | - // wait for completion |
151 | | - if err := grp.Wait(); err != nil { |
152 | | - panic(err) |
153 | | - } |
| 145 | + err := ewg.Wait() |
| 146 | + |
| 147 | + require.NotNil(t, err) |
| 148 | + require.Contains(t, err.Error(), "err from function 100") |
| 149 | + // we don't know how many routines will be executed before the error, but it should be less than 10 |
| 150 | + require.LessOrEqual(t, c, uint32(errIndex+100), fmt.Sprintf("%d, routines have to be terminated early", c)) |
| 151 | +} |
| 152 | + |
| 153 | +func TestErrorSizedGroup_WaitWithoutGo(t *testing.T) { |
| 154 | + ewg := NewErrSizedGroup(10) |
| 155 | + assert.NoError(t, ewg.Wait()) |
154 | 156 | } |
155 | 157 |
|
156 | 158 | func TestErrorSizedGroup_TermAndPreemptive(t *testing.T) { |
@@ -181,7 +183,127 @@ func TestErrorSizedGroup_TermAndPreemptive(t *testing.T) { |
181 | 183 |
|
182 | 184 | select { |
183 | 185 | case <-time.After(5 * time.Second): |
184 | | - t.Fatal("timeout deadlock may happy") |
| 186 | + t.Fatal("timeout deadlock may happen") |
185 | 187 | case <-done: |
186 | 188 | } |
187 | 189 | } |
| 190 | + |
| 191 | +func TestErrorSizedGroup_ConcurrencyLimit(t *testing.T) { |
| 192 | + concurrentGoroutines := int32(0) |
| 193 | + maxConcurrentGoroutines := int32(0) |
| 194 | + ewg := NewErrSizedGroup(5) // Limit of concurrent goroutines set to 5 |
| 195 | + |
| 196 | + for i := 0; i < 100; i++ { |
| 197 | + ewg.Go(func() error { |
| 198 | + atomic.AddInt32(&concurrentGoroutines, 1) |
| 199 | + defer atomic.AddInt32(&concurrentGoroutines, -1) |
| 200 | + |
| 201 | + if v := atomic.LoadInt32(&concurrentGoroutines); v > atomic.LoadInt32(&maxConcurrentGoroutines) { |
| 202 | + atomic.StoreInt32(&maxConcurrentGoroutines, v) |
| 203 | + } |
| 204 | + |
| 205 | + time.Sleep(time.Millisecond * 50) |
| 206 | + return nil |
| 207 | + }) |
| 208 | + } |
| 209 | + |
| 210 | + err := ewg.Wait() |
| 211 | + assert.Nil(t, err) |
| 212 | + assert.Equal(t, int32(5), maxConcurrentGoroutines) |
| 213 | +} |
| 214 | + |
| 215 | +func TestErrorSizedGroup_MultiError(t *testing.T) { |
| 216 | + ewg := NewErrSizedGroup(10) |
| 217 | + |
| 218 | + for i := 0; i < 10; i++ { |
| 219 | + i := i |
| 220 | + ewg.Go(func() error { |
| 221 | + return fmt.Errorf("error from goroutine %d", i) |
| 222 | + }) |
| 223 | + } |
| 224 | + |
| 225 | + err := ewg.Wait() |
| 226 | + assert.NotNil(t, err) |
| 227 | + |
| 228 | + for i := 0; i < 10; i++ { |
| 229 | + assert.Contains(t, err.Error(), fmt.Sprintf("error from goroutine %d", i)) |
| 230 | + } |
| 231 | +} |
| 232 | + |
| 233 | +func TestErrorSizedGroup_Cancel(t *testing.T) { |
| 234 | + ctx, cancel := context.WithCancel(context.Background()) |
| 235 | + ewg := NewErrSizedGroup(10, Context(ctx)) |
| 236 | + |
| 237 | + var c uint32 |
| 238 | + const N = 1000 |
| 239 | + |
| 240 | + for i := 0; i < N; i++ { |
| 241 | + i := i |
| 242 | + time.Sleep(1 * time.Millisecond) // prevent all the goroutines to be started at once |
| 243 | + ewg.Go(func() error { |
| 244 | + atomic.AddUint32(&c, 1) |
| 245 | + if i == 100 { |
| 246 | + cancel() |
| 247 | + } |
| 248 | + time.Sleep(1 * time.Millisecond) // simulate some work |
| 249 | + return nil |
| 250 | + }) |
| 251 | + } |
| 252 | + |
| 253 | + err := ewg.Wait() |
| 254 | + require.EqualError(t, err, "1 error(s) occurred: [0] {context canceled}") |
| 255 | + assert.ErrorIs(t, ctx.Err(), context.Canceled, ctx.Err()) |
| 256 | + t.Logf("completed: %d", c) |
| 257 | + require.LessOrEqual(t, c, uint32(110), "some of goroutines has to be terminated early") |
| 258 | +} |
| 259 | + |
| 260 | +func TestErrorSizedGroup_CancelWithPreemptive(t *testing.T) { |
| 261 | + ctx, cancel := context.WithCancel(context.Background()) |
| 262 | + ewg := NewErrSizedGroup(10, Context(ctx), Preemptive) |
| 263 | + |
| 264 | + var c uint32 |
| 265 | + const N = 1000 |
| 266 | + |
| 267 | + for i := 0; i < N; i++ { |
| 268 | + i := i |
| 269 | + ewg.Go(func() error { |
| 270 | + atomic.AddUint32(&c, 1) |
| 271 | + if i == 100 { |
| 272 | + cancel() |
| 273 | + } |
| 274 | + time.Sleep(1 * time.Millisecond) // simulate some work |
| 275 | + return nil |
| 276 | + }) |
| 277 | + } |
| 278 | + |
| 279 | + err := ewg.Wait() |
| 280 | + require.EqualError(t, err, "1 error(s) occurred: [0] {context canceled}") |
| 281 | + assert.ErrorIs(t, ctx.Err(), context.Canceled, ctx.Err()) |
| 282 | + t.Logf("completed: %d", c) |
| 283 | + require.LessOrEqual(t, c, uint32(110), "some of goroutines has to be terminated early") |
| 284 | +} |
| 285 | + |
| 286 | +// illustrates the use of a SizedGroup for concurrent, limited execution of goroutines. |
| 287 | +func ExampleErrSizedGroup_go() { |
| 288 | + |
| 289 | + // create sized waiting group allowing maximum 10 goroutines |
| 290 | + grp := NewErrSizedGroup(10) |
| 291 | + |
| 292 | + var c uint32 |
| 293 | + for i := 0; i < 1000; i++ { |
| 294 | + // Go call is non-blocking, like regular go statement |
| 295 | + grp.Go(func() error { |
| 296 | + // do some work in 10 goroutines in parallel |
| 297 | + atomic.AddUint32(&c, 1) |
| 298 | + time.Sleep(10 * time.Millisecond) |
| 299 | + return nil |
| 300 | + }) |
| 301 | + } |
| 302 | + // Note: grp.Go acts like go command - never blocks. This code will be executed right away |
| 303 | + log.Print("all 1000 jobs submitted") |
| 304 | + |
| 305 | + // wait for completion |
| 306 | + if err := grp.Wait(); err != nil { |
| 307 | + panic(err) |
| 308 | + } |
| 309 | +} |
0 commit comments