Skip to content

Commit 91acfa8

Browse files
committed
bug symfony#58763 [Messenger][RateLimiter] fix additional message handled when using a rate limiter (Jean-Beru)
This PR was merged into the 6.4 branch. Discussion ---------- [Messenger][RateLimiter] fix additional message handled when using a rate limiter | Q | A | ------------- | --- | Branch? | 6.4 | Bug fix? | yes | New feature? | no | Deprecations? | no | Issues | Fix symfony#57230 | License | MIT Fix additional message handled by Messenger when using a rate limiter. A token was reserved but not consumed. See symfony#57230 With the following configuration: ```yaml framework: rate_limiter: test: policy: 'fixed_window' limit: 1 interval: '10 seconds' messenger: transports: test: dsn: '%env(MESSENGER_TRANSPORT_DSN)%' rate_limiter: 'test' routing: 'App\Messenger\DoSomething': 'test' ``` Log generated by the MessageHandler: ```bash $ bin/console messenger:consume test [OK] Consuming messages from transport "test". // The worker will automatically exit once it has received a stop signal via the messenger:stop-workers command. // Quit the worker with CONTROL-C. // Re-run the command with a -vv option to see logs about consumed messages. 09:13:48 WARNING [app] Message handled 09:13:58 WARNING [app] Message handled 09:13:58 WARNING [app] Message handled # Duplicated 09:14:08 WARNING [app] Message handled 09:14:08 WARNING [app] Message handled # Duplicated 09:14:18 WARNING [app] Message handled 09:14:18 WARNING [app] Message handled # Duplicated ``` After fix: ```bash bin/console messenger:consume test [OK] Consuming messages from transport "test". // The worker will automatically exit once it has received a stop signal via the messenger:stop-workers command. // Quit the worker with CONTROL-C. // Re-run the command with a -vv option to see logs about consumed messages. 09:18:54 WARNING [app] Message handled 09:19:04 WARNING [app] Message handled 09:19:14 WARNING [app] Message handled 09:19:24 WARNING [app] Message handled ``` Commits ------- ec1b999 [Messenger][RateLimiter] fix additional message handled when using a rate limiter
2 parents 1021ae2 + ec1b999 commit 91acfa8

File tree

2 files changed

+17
-12
lines changed

2 files changed

+17
-12
lines changed

src/Symfony/Component/Messenger/Tests/WorkerTest.php

Lines changed: 15 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,7 @@
1414
use PHPUnit\Framework\TestCase;
1515
use Psr\EventDispatcher\EventDispatcherInterface;
1616
use Psr\Log\LoggerInterface;
17+
use Symfony\Bridge\PhpUnit\ClockMock;
1718
use Symfony\Component\Clock\MockClock;
1819
use Symfony\Component\EventDispatcher\EventDispatcher;
1920
use Symfony\Component\HttpKernel\DependencyInjection\ServicesResetter;
@@ -47,8 +48,8 @@
4748
use Symfony\Component\Messenger\Transport\Receiver\ReceiverInterface;
4849
use Symfony\Component\Messenger\Worker;
4950
use Symfony\Component\RateLimiter\RateLimiterFactory;
51+
use Symfony\Component\RateLimiter\Reservation;
5052
use Symfony\Component\RateLimiter\Storage\InMemoryStorage;
51-
use Symfony\Contracts\Service\ResetInterface;
5253

5354
/**
5455
* @group time-sensitive
@@ -73,7 +74,7 @@ public function testWorkerDispatchTheReceivedMessage()
7374
return $envelopes[] = $envelope;
7475
});
7576

76-
$dispatcher = new class() implements EventDispatcherInterface {
77+
$dispatcher = new class implements EventDispatcherInterface {
7778
private StopWorkerOnMessageLimitListener $listener;
7879

7980
public function __construct()
@@ -403,7 +404,7 @@ public function testWorkerLimitQueuesUnsupported()
403404

404405
$worker = new Worker(['transport1' => $receiver1, 'transport2' => $receiver2], $bus, clock: new MockClock());
405406
$this->expectException(RuntimeException::class);
406-
$this->expectExceptionMessage(sprintf('Receiver for "transport2" does not implement "%s".', QueueReceiverInterface::class));
407+
$this->expectExceptionMessage(\sprintf('Receiver for "transport2" does not implement "%s".', QueueReceiverInterface::class));
407408
$worker->run(['queues' => ['foo']]);
408409
}
409410

@@ -418,7 +419,7 @@ public function testWorkerMessageReceivedEventMutability()
418419
$eventDispatcher = new EventDispatcher();
419420
$eventDispatcher->addSubscriber(new StopWorkerOnMessageLimitListener(1));
420421

421-
$stamp = new class() implements StampInterface {
422+
$stamp = new class implements StampInterface {
422423
};
423424
$listener = function (WorkerMessageReceivedEvent $event) use ($stamp) {
424425
$event->addStamps($stamp);
@@ -438,21 +439,21 @@ public function testWorkerRateLimitMessages()
438439
$envelope = [
439440
new Envelope(new DummyMessage('message1')),
440441
new Envelope(new DummyMessage('message2')),
442+
new Envelope(new DummyMessage('message3')),
443+
new Envelope(new DummyMessage('message4')),
441444
];
442445
$receiver = new DummyReceiver([$envelope]);
443446

444447
$bus = $this->createMock(MessageBusInterface::class);
445448
$bus->method('dispatch')->willReturnArgument(0);
446449

447450
$eventDispatcher = new EventDispatcher();
448-
$eventDispatcher->addSubscriber(new StopWorkerOnMessageLimitListener(2));
451+
$eventDispatcher->addSubscriber(new StopWorkerOnMessageLimitListener(4));
449452

450453
$rateLimitCount = 0;
451-
$listener = function (WorkerRateLimitedEvent $event) use (&$rateLimitCount) {
454+
$eventDispatcher->addListener(WorkerRateLimitedEvent::class, static function () use (&$rateLimitCount) {
452455
++$rateLimitCount;
453-
$event->getLimiter()->reset(); // Reset limiter to continue test
454-
};
455-
$eventDispatcher->addListener(WorkerRateLimitedEvent::class, $listener);
456+
});
456457

457458
$rateLimitFactory = new RateLimiterFactory([
458459
'id' => 'bus',
@@ -461,11 +462,14 @@ public function testWorkerRateLimitMessages()
461462
'interval' => '1 minute',
462463
], new InMemoryStorage());
463464

465+
ClockMock::register(Reservation::class);
466+
ClockMock::register(InMemoryStorage::class);
467+
464468
$worker = new Worker(['bus' => $receiver], $bus, $eventDispatcher, null, ['bus' => $rateLimitFactory], new MockClock());
465469
$worker->run();
466470

467-
$this->assertCount(2, $receiver->getAcknowledgedEnvelopes());
468-
$this->assertEquals(1, $rateLimitCount);
471+
$this->assertSame(4, $receiver->getAcknowledgeCount());
472+
$this->assertSame(3, $rateLimitCount);
469473
}
470474

471475
public function testWorkerShouldLogOnStop()

src/Symfony/Component/Messenger/Worker.php

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -86,7 +86,7 @@ public function run(array $options = []): void
8686
// if queue names are specified, all receivers must implement the QueueReceiverInterface
8787
foreach ($this->receivers as $transportName => $receiver) {
8888
if (!$receiver instanceof QueueReceiverInterface) {
89-
throw new RuntimeException(sprintf('Receiver for "%s" does not implement "%s".', $transportName, QueueReceiverInterface::class));
89+
throw new RuntimeException(\sprintf('Receiver for "%s" does not implement "%s".', $transportName, QueueReceiverInterface::class));
9090
}
9191
}
9292
}
@@ -242,6 +242,7 @@ private function rateLimit(string $transportName): void
242242

243243
$this->eventDispatcher?->dispatch(new WorkerRateLimitedEvent($rateLimiter, $transportName));
244244
$rateLimiter->reserve()->wait();
245+
$rateLimiter->consume();
245246
}
246247

247248
private function flush(bool $force): bool

0 commit comments

Comments
 (0)