@@ -3,6 +3,7 @@ package stream
3
3
import (
4
4
"context"
5
5
"fmt"
6
+ "sync"
6
7
"testing"
7
8
"time"
8
9
@@ -406,7 +407,6 @@ func FuzzTick(f *testing.F) {
406
407
}
407
408
408
409
func FuzzScaler (f * testing.F ) {
409
- // Define InterceptFunc
410
410
interceptFunc := func (ctx context.Context , t int ) (string , bool ) {
411
411
return fmt .Sprintf ("%d" , t ), true
412
412
}
@@ -464,3 +464,129 @@ func FuzzScaler(f *testing.F) {
464
464
}
465
465
})
466
466
}
467
+
468
+ func Test_Scaler_Max (t * testing.T ) {
469
+ tests := map [string ]struct {
470
+ max uint
471
+ send int
472
+ expected int
473
+ }{
474
+ "max 0" : {
475
+ max : 0 ,
476
+ send : 1000 ,
477
+ expected : 1000 ,
478
+ },
479
+ "max 1" : {
480
+ max : 1 ,
481
+ send : 10 ,
482
+ expected : 10 ,
483
+ },
484
+ "max 2" : {
485
+ max : 2 ,
486
+ send : 10 ,
487
+ expected : 10 ,
488
+ },
489
+ "max 3" : {
490
+ max : 3 ,
491
+ send : 10 ,
492
+ expected : 10 ,
493
+ },
494
+ "max 4" : {
495
+ max : 4 ,
496
+ send : 100 ,
497
+ expected : 100 ,
498
+ },
499
+ "max 1000" : {
500
+ max : 1000 ,
501
+ send : 10000 ,
502
+ expected : 10000 ,
503
+ },
504
+ }
505
+
506
+ for name , test := range tests {
507
+ t .Run (name , func (t * testing.T ) {
508
+ ctx , cancel := context .WithCancel (context .Background ())
509
+ defer cancel ()
510
+
511
+ inited := 0
512
+ initedMu := sync.Mutex {}
513
+ release := make (chan struct {})
514
+
515
+ interceptFunc := func (ctx context.Context , t int ) (int , bool ) {
516
+ defer func () {
517
+ initedMu .Lock ()
518
+ defer initedMu .Unlock ()
519
+ inited --
520
+ }()
521
+
522
+ initedMu .Lock ()
523
+ inited ++
524
+ initedMu .Unlock ()
525
+
526
+ <- release
527
+
528
+ return t , true
529
+ }
530
+
531
+ // Initialize Scaler
532
+ scaler := Scaler [int , int ]{
533
+ Wait : time .Millisecond ,
534
+ Life : time .Millisecond ,
535
+ Fn : interceptFunc ,
536
+ Max : test .max ,
537
+ }
538
+
539
+ // Create a simple input channel
540
+ input := make (chan int , test .send )
541
+ defer close (input )
542
+
543
+ for i := 0 ; i < test .send ; i ++ {
544
+ input <- i
545
+ }
546
+
547
+ // Execute the Scaler
548
+ out , err := scaler .Exec (ctx , input )
549
+ if err != nil {
550
+ t .Errorf ("Scaler Exec failed: %v" , err )
551
+ t .Fail ()
552
+ }
553
+
554
+ recv := 1
555
+
556
+ tloop:
557
+ for {
558
+ select {
559
+ case <- ctx .Done ():
560
+ t .Errorf ("Scaler Exec timed out" )
561
+ case _ , ok := <- out :
562
+ if ! ok {
563
+ break tloop
564
+ }
565
+
566
+ recv ++
567
+ t .Logf ("received %d" , recv )
568
+ if recv >= test .expected {
569
+ break tloop
570
+ }
571
+ default :
572
+ time .Sleep (time .Millisecond )
573
+
574
+ initedMu .Lock ()
575
+ if test .max > 0 && inited > int (test .max ) {
576
+ t .Errorf ("Scaler Exec failed: expected %d, got %d" , test .max , inited )
577
+ t .Fail ()
578
+ }
579
+ initedMu .Unlock ()
580
+
581
+ // Release one goroutine
582
+ release <- struct {}{}
583
+ }
584
+ }
585
+
586
+ if recv != test .expected {
587
+ t .Errorf ("Scaler Exec failed: expected %d, got %d" , test .expected , recv )
588
+ t .Fail ()
589
+ }
590
+ })
591
+ }
592
+ }
0 commit comments