14
14
use PHPUnit \Framework \TestCase ;
15
15
use Psr \EventDispatcher \EventDispatcherInterface ;
16
16
use Psr \Log \LoggerInterface ;
17
+ use Symfony \Bridge \PhpUnit \ClockMock ;
17
18
use Symfony \Component \Clock \MockClock ;
18
19
use Symfony \Component \EventDispatcher \EventDispatcher ;
19
20
use Symfony \Component \HttpKernel \DependencyInjection \ServicesResetter ;
47
48
use Symfony \Component \Messenger \Transport \Receiver \ReceiverInterface ;
48
49
use Symfony \Component \Messenger \Worker ;
49
50
use Symfony \Component \RateLimiter \RateLimiterFactory ;
51
+ use Symfony \Component \RateLimiter \Reservation ;
50
52
use Symfony \Component \RateLimiter \Storage \InMemoryStorage ;
51
- use Symfony \Contracts \Service \ResetInterface ;
52
53
53
54
/**
54
55
* @group time-sensitive
@@ -73,7 +74,7 @@ public function testWorkerDispatchTheReceivedMessage()
73
74
return $ envelopes [] = $ envelope ;
74
75
});
75
76
76
- $ dispatcher = new class () implements EventDispatcherInterface {
77
+ $ dispatcher = new class implements EventDispatcherInterface {
77
78
private StopWorkerOnMessageLimitListener $ listener ;
78
79
79
80
public function __construct ()
@@ -403,7 +404,7 @@ public function testWorkerLimitQueuesUnsupported()
403
404
404
405
$ worker = new Worker (['transport1 ' => $ receiver1 , 'transport2 ' => $ receiver2 ], $ bus , clock: new MockClock ());
405
406
$ this ->expectException (RuntimeException::class);
406
- $ this ->expectExceptionMessage (sprintf ('Receiver for "transport2" does not implement "%s". ' , QueueReceiverInterface::class));
407
+ $ this ->expectExceptionMessage (\ sprintf ('Receiver for "transport2" does not implement "%s". ' , QueueReceiverInterface::class));
407
408
$ worker ->run (['queues ' => ['foo ' ]]);
408
409
}
409
410
@@ -418,7 +419,7 @@ public function testWorkerMessageReceivedEventMutability()
418
419
$ eventDispatcher = new EventDispatcher ();
419
420
$ eventDispatcher ->addSubscriber (new StopWorkerOnMessageLimitListener (1 ));
420
421
421
- $ stamp = new class () implements StampInterface {
422
+ $ stamp = new class implements StampInterface {
422
423
};
423
424
$ listener = function (WorkerMessageReceivedEvent $ event ) use ($ stamp ) {
424
425
$ event ->addStamps ($ stamp );
@@ -438,21 +439,21 @@ public function testWorkerRateLimitMessages()
438
439
$ envelope = [
439
440
new Envelope (new DummyMessage ('message1 ' )),
440
441
new Envelope (new DummyMessage ('message2 ' )),
442
+ new Envelope (new DummyMessage ('message3 ' )),
443
+ new Envelope (new DummyMessage ('message4 ' )),
441
444
];
442
445
$ receiver = new DummyReceiver ([$ envelope ]);
443
446
444
447
$ bus = $ this ->createMock (MessageBusInterface::class);
445
448
$ bus ->method ('dispatch ' )->willReturnArgument (0 );
446
449
447
450
$ eventDispatcher = new EventDispatcher ();
448
- $ eventDispatcher ->addSubscriber (new StopWorkerOnMessageLimitListener (2 ));
451
+ $ eventDispatcher ->addSubscriber (new StopWorkerOnMessageLimitListener (4 ));
449
452
450
453
$ rateLimitCount = 0 ;
451
- $ listener = function (WorkerRateLimitedEvent $ event ) use (&$ rateLimitCount ) {
454
+ $ eventDispatcher -> addListener (WorkerRateLimitedEvent::class, static function () use (&$ rateLimitCount ) {
452
455
++$ rateLimitCount ;
453
- $ event ->getLimiter ()->reset (); // Reset limiter to continue test
454
- };
455
- $ eventDispatcher ->addListener (WorkerRateLimitedEvent::class, $ listener );
456
+ });
456
457
457
458
$ rateLimitFactory = new RateLimiterFactory ([
458
459
'id ' => 'bus ' ,
@@ -461,11 +462,14 @@ public function testWorkerRateLimitMessages()
461
462
'interval ' => '1 minute ' ,
462
463
], new InMemoryStorage ());
463
464
465
+ ClockMock::register (Reservation::class);
466
+ ClockMock::register (InMemoryStorage::class);
467
+
464
468
$ worker = new Worker (['bus ' => $ receiver ], $ bus , $ eventDispatcher , null , ['bus ' => $ rateLimitFactory ], new MockClock ());
465
469
$ worker ->run ();
466
470
467
- $ this ->assertCount ( 2 , $ receiver ->getAcknowledgedEnvelopes ());
468
- $ this ->assertEquals ( 1 , $ rateLimitCount );
471
+ $ this ->assertSame ( 4 , $ receiver ->getAcknowledgeCount ());
472
+ $ this ->assertSame ( 3 , $ rateLimitCount );
469
473
}
470
474
471
475
public function testWorkerShouldLogOnStop ()
0 commit comments