Skip to content

Commit f1ad9a3

Browse files
Merge branch '5.4' into 6.0
* 5.4: [Messenger] cs fix [Messenger] Fix time-limit check exception [Console] Fix console `ProgressBar::override()` after manual `ProgressBar::cleanup()` [FrameworkBundle] typo default_lifetime example [HttpClient] Handle Amp HTTP client v5 incompatibility gracefully [HttpKernel] Don’t try to wire Response argument with controller.service_arguments [PhpUnitBridge] Fix language deprecations incorrectly marked as direct Tell about messenger:consume invalid limit options [Messenger] Do not throw 'no handlers' exception when skipping due to duplicate handling
2 parents 785fafd + ead27c3 commit f1ad9a3

File tree

5 files changed

+111
-12
lines changed

5 files changed

+111
-12
lines changed

Command/ConsumeMessagesCommand.php

Lines changed: 17 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@
1717
use Symfony\Component\Console\Command\Command;
1818
use Symfony\Component\Console\Completion\CompletionInput;
1919
use Symfony\Component\Console\Completion\CompletionSuggestions;
20+
use Symfony\Component\Console\Exception\InvalidOptionException;
2021
use Symfony\Component\Console\Exception\RuntimeException;
2122
use Symfony\Component\Console\Input\InputArgument;
2223
use Symfony\Component\Console\Input\InputInterface;
@@ -131,7 +132,7 @@ protected function interact(InputInterface $input, OutputInterface $output)
131132
{
132133
$io = new SymfonyStyle($input, $output instanceof ConsoleOutputInterface ? $output->getErrorOutput() : $output);
133134

134-
if ($this->receiverNames && 0 === \count($input->getArgument('receivers'))) {
135+
if ($this->receiverNames && !$input->getArgument('receivers')) {
135136
$io->block('Which transports/receivers do you want to consume?', null, 'fg=white;bg=blue', ' ', true);
136137

137138
$io->writeln('Choose which receivers you want to consume messages from in order of priority.');
@@ -145,7 +146,7 @@ protected function interact(InputInterface $input, OutputInterface $output)
145146
$input->setArgument('receivers', $io->askQuestion($question));
146147
}
147148

148-
if (0 === \count($input->getArgument('receivers'))) {
149+
if (!$input->getArgument('receivers')) {
149150
throw new RuntimeException('Please pass at least one receiver.');
150151
}
151152
}
@@ -174,7 +175,11 @@ protected function execute(InputInterface $input, OutputInterface $output): int
174175
}
175176

176177
$stopsWhen = [];
177-
if ($limit = $input->getOption('limit')) {
178+
if (null !== $limit = $input->getOption('limit')) {
179+
if (!is_numeric($limit) || 0 >= $limit) {
180+
throw new InvalidOptionException(sprintf('Option "limit" must be a positive integer, "%s" passed.', $limit));
181+
}
182+
178183
$stopsWhen[] = "processed {$limit} messages";
179184
$this->eventDispatcher->addSubscriber(new StopWorkerOnMessageLimitListener($limit, $this->logger));
180185
}
@@ -189,15 +194,19 @@ protected function execute(InputInterface $input, OutputInterface $output): int
189194
$this->eventDispatcher->addSubscriber(new StopWorkerOnMemoryLimitListener($this->convertToBytes($memoryLimit), $this->logger));
190195
}
191196

192-
if (null !== ($timeLimit = $input->getOption('time-limit'))) {
197+
if (null !== $timeLimit = $input->getOption('time-limit')) {
198+
if (!is_numeric($timeLimit) || 0 >= $timeLimit) {
199+
throw new InvalidOptionException(sprintf('Option "time-limit" must be a positive integer, "%s" passed.', $timeLimit));
200+
}
201+
193202
$stopsWhen[] = "been running for {$timeLimit}s";
194203
$this->eventDispatcher->addSubscriber(new StopWorkerOnTimeLimitListener($timeLimit, $this->logger));
195204
}
196205

197206
$stopsWhen[] = 'received a stop signal via the messenger:stop-workers command';
198207

199208
$io = new SymfonyStyle($input, $output instanceof ConsoleOutputInterface ? $output->getErrorOutput() : $output);
200-
$io->success(sprintf('Consuming messages from transport%s "%s".', \count($receivers) > 0 ? 's' : '', implode(', ', $receiverNames)));
209+
$io->success(sprintf('Consuming messages from transport%s "%s".', \count($receivers) > 1 ? 's' : '', implode(', ', $receiverNames)));
201210

202211
if ($stopsWhen) {
203212
$last = array_pop($stopsWhen);
@@ -252,11 +261,11 @@ private function convertToBytes(string $memoryLimit): int
252261

253262
switch (substr(rtrim($memoryLimit, 'b'), -1)) {
254263
case 't': $max *= 1024;
255-
// no break
264+
// no break
256265
case 'g': $max *= 1024;
257-
// no break
266+
// no break
258267
case 'm': $max *= 1024;
259-
// no break
268+
// no break
260269
case 'k': $max *= 1024;
261270
}
262271

EventListener/StopWorkerOnTimeLimitListener.php

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@
1515
use Symfony\Component\EventDispatcher\EventSubscriberInterface;
1616
use Symfony\Component\Messenger\Event\WorkerRunningEvent;
1717
use Symfony\Component\Messenger\Event\WorkerStartedEvent;
18+
use Symfony\Component\Messenger\Exception\InvalidArgumentException;
1819

1920
/**
2021
* @author Simon Delicata <simon.delicata@free.fr>
@@ -30,6 +31,10 @@ public function __construct(int $timeLimitInSeconds, LoggerInterface $logger = n
3031
{
3132
$this->timeLimitInSeconds = $timeLimitInSeconds;
3233
$this->logger = $logger;
34+
35+
if ($timeLimitInSeconds <= 0) {
36+
throw new InvalidArgumentException('Time limit must be greater than zero.');
37+
}
3338
}
3439

3540
public function onWorkerStarted(): void

Middleware/HandleMessageMiddleware.php

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -57,8 +57,10 @@ public function handle(Envelope $envelope, StackInterface $stack): Envelope
5757
];
5858

5959
$exceptions = [];
60+
$alreadyHandled = false;
6061
foreach ($this->handlersLocator->getHandlers($envelope) as $handlerDescriptor) {
6162
if ($this->messageHasAlreadyBeenHandled($envelope, $handlerDescriptor)) {
63+
$alreadyHandled = true;
6264
continue;
6365
}
6466

@@ -116,7 +118,7 @@ public function handle(Envelope $envelope, StackInterface $stack): Envelope
116118
}
117119
}
118120

119-
if (null === $handler) {
121+
if (null === $handler && !$alreadyHandled) {
120122
if (!$this->allowNoHandlers) {
121123
throw new NoHandlerForMessageException(sprintf('No handler for message "%s".', $context['class']));
122124
}

Tests/Command/ConsumeMessagesCommandTest.php

Lines changed: 68 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,7 @@
1313

1414
use PHPUnit\Framework\TestCase;
1515
use Symfony\Component\Console\Application;
16+
use Symfony\Component\Console\Exception\InvalidOptionException;
1617
use Symfony\Component\Console\Tester\CommandCompletionTester;
1718
use Symfony\Component\Console\Tester\CommandTester;
1819
use Symfony\Component\DependencyInjection\ContainerInterface;
@@ -68,7 +69,7 @@ public function testBasicRun()
6869
]);
6970

7071
$tester->assertCommandIsSuccessful();
71-
$this->assertStringContainsString('[OK] Consuming messages from transports "dummy-receiver"', $tester->getDisplay());
72+
$this->assertStringContainsString('[OK] Consuming messages from transport "dummy-receiver"', $tester->getDisplay());
7273
}
7374

7475
public function testRunWithBusOption()
@@ -101,7 +102,7 @@ public function testRunWithBusOption()
101102
]);
102103

103104
$tester->assertCommandIsSuccessful();
104-
$this->assertStringContainsString('[OK] Consuming messages from transports "dummy-receiver"', $tester->getDisplay());
105+
$this->assertStringContainsString('[OK] Consuming messages from transport "dummy-receiver"', $tester->getDisplay());
105106
}
106107

107108
public function provideRunWithResetServicesOption(): iterable
@@ -146,7 +147,71 @@ public function testRunWithResetServicesOption(bool $shouldReset)
146147

147148
$this->assertEquals($shouldReset, $receiver->hasBeenReset(), '$receiver->reset() should have been called');
148149
$tester->assertCommandIsSuccessful();
149-
$this->assertStringContainsString('[OK] Consuming messages from transports "dummy-receiver"', $tester->getDisplay());
150+
$this->assertStringContainsString('[OK] Consuming messages from transport "dummy-receiver"', $tester->getDisplay());
151+
}
152+
153+
/**
154+
* @dataProvider getInvalidOptions
155+
*/
156+
public function testRunWithInvalidOption(string $option, string $value, string $expectedMessage)
157+
{
158+
$receiverLocator = $this->createMock(ContainerInterface::class);
159+
$receiverLocator->expects($this->once())->method('has')->with('dummy-receiver')->willReturn(true);
160+
161+
$busLocator = $this->createMock(ContainerInterface::class);
162+
163+
$command = new ConsumeMessagesCommand(new RoutableMessageBus($busLocator), $receiverLocator, new EventDispatcher());
164+
165+
$application = new Application();
166+
$application->add($command);
167+
$tester = new CommandTester($application->get('messenger:consume'));
168+
169+
$this->expectException(InvalidOptionException::class);
170+
$this->expectExceptionMessage($expectedMessage);
171+
$tester->execute([
172+
'receivers' => ['dummy-receiver'],
173+
$option => $value,
174+
]);
175+
}
176+
177+
public function getInvalidOptions()
178+
{
179+
yield 'Zero message limit' => ['--limit', '0', 'Option "limit" must be a positive integer, "0" passed.'];
180+
yield 'Non-numeric message limit' => ['--limit', 'whatever', 'Option "limit" must be a positive integer, "whatever" passed.'];
181+
182+
yield 'Zero second time limit' => ['--time-limit', '0', 'Option "time-limit" must be a positive integer, "0" passed.'];
183+
yield 'Non-numeric time limit' => ['--time-limit', 'whatever', 'Option "time-limit" must be a positive integer, "whatever" passed.'];
184+
}
185+
186+
public function testRunWithTimeLimit()
187+
{
188+
$envelope = new Envelope(new \stdClass(), [new BusNameStamp('dummy-bus')]);
189+
190+
$receiver = $this->createMock(ReceiverInterface::class);
191+
$receiver->method('get')->willReturn([$envelope]);
192+
193+
$receiverLocator = $this->createMock(ContainerInterface::class);
194+
$receiverLocator->method('has')->with('dummy-receiver')->willReturn(true);
195+
$receiverLocator->method('get')->with('dummy-receiver')->willReturn($receiver);
196+
197+
$bus = $this->createMock(MessageBusInterface::class);
198+
199+
$busLocator = $this->createMock(ContainerInterface::class);
200+
$busLocator->method('has')->with('dummy-bus')->willReturn(true);
201+
$busLocator->method('get')->with('dummy-bus')->willReturn($bus);
202+
203+
$command = new ConsumeMessagesCommand(new RoutableMessageBus($busLocator), $receiverLocator, new EventDispatcher());
204+
205+
$application = new Application();
206+
$application->add($command);
207+
$tester = new CommandTester($application->get('messenger:consume'));
208+
$tester->execute([
209+
'receivers' => ['dummy-receiver'],
210+
'--time-limit' => 1,
211+
]);
212+
213+
$this->assertSame(0, $tester->getStatusCode());
214+
$this->assertStringContainsString('[OK] Consuming messages from transport "dummy-receiver"', $tester->getDisplay());
150215
}
151216

152217
/**

Tests/Middleware/HandleMessageMiddlewareTest.php

Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -129,6 +129,24 @@ public function testThrowsNoHandlerException()
129129
$middleware->handle(new Envelope(new DummyMessage('Hey')), new StackMiddleware());
130130
}
131131

132+
public function testMessageAlreadyHandled()
133+
{
134+
$handler = $this->createPartialMock(HandleMessageMiddlewareTestCallable::class, ['__invoke']);
135+
136+
$middleware = new HandleMessageMiddleware(new HandlersLocator([
137+
DummyMessage::class => [$handler],
138+
]));
139+
140+
$envelope = new Envelope(new DummyMessage('Hey'));
141+
142+
$envelope = $middleware->handle($envelope, $this->getStackMock());
143+
$handledStamp = $envelope->all(HandledStamp::class);
144+
145+
$envelope = $middleware->handle($envelope, $this->getStackMock());
146+
147+
$this->assertSame($envelope->all(HandledStamp::class), $handledStamp);
148+
}
149+
132150
public function testAllowNoHandlers()
133151
{
134152
$middleware = new HandleMessageMiddleware(new HandlersLocator([]), true);

0 commit comments

Comments
 (0)