Skip to content

Commit c98e3d7

Browse files
committed
Add an option to limit the number of received messages
1 parent 6f10698 commit c98e3d7

File tree

2 files changed

+45
-0
lines changed

2 files changed

+45
-0
lines changed

src/Symfony/Bundle/FrameworkBundle/Command/MessageConsumeCommand.php

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@
1818
use Symfony\Component\Console\Output\OutputInterface;
1919
use Symfony\Component\DependencyInjection\ContainerInterface;
2020
use Symfony\Component\Message\MessageBusInterface;
21+
use Symfony\Component\Message\Transport\Enhancers\MaximumCountReceiver;
2122
use Symfony\Component\Message\Transport\ReceiverInterface;
2223
use Symfony\Component\Message\Worker;
2324

@@ -37,6 +38,7 @@ protected function configure()
3738
->setDefinition(array(
3839
new InputArgument('receiver', InputArgument::REQUIRED, 'Name of the receiver'),
3940
new InputOption('bus', 'b', InputOption::VALUE_REQUIRED, 'Name of the bus to dispatch the messages to', 'message_bus'),
41+
new InputOption('limit', 'l', InputOption::VALUE_REQUIRED, 'Limit the number of received messages'),
4042
))
4143
->setDescription('Consumes a message')
4244
->setHelp(<<<'EOF'
@@ -72,6 +74,10 @@ protected function execute(InputInterface $input, OutputInterface $output)
7274
throw new \RuntimeException(sprintf('Bus "%s" is not a valid message bus. It must implement the "%s" interface.', $busName, MessageBusInterface::class));
7375
}
7476

77+
if ($limit = $input->getOption('limit')) {
78+
$receiver = new MaximumCountReceiver($receiver, $limit);
79+
}
80+
7581
$worker = new Worker($receiver, $messageBus);
7682
$worker->run();
7783
}
Lines changed: 39 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,39 @@
1+
<?php
2+
3+
/*
4+
* This file is part of the Symfony package.
5+
*
6+
* (c) Fabien Potencier <fabien@symfony.com>
7+
*
8+
* For the full copyright and license information, please view the LICENSE
9+
* file that was distributed with this source code.
10+
*/
11+
12+
namespace Symfony\Component\Message\Transport\Enhancers;
13+
14+
use Symfony\Component\Message\Transport\ReceiverInterface;
15+
16+
class MaximumCountReceiver implements ReceiverInterface
17+
{
18+
private $decoratedReceiver;
19+
private $maximumNumberOfMessages;
20+
21+
public function __construct(ReceiverInterface $decoratedReceiver, int $maximumNumberOfMessages)
22+
{
23+
$this->decoratedReceiver = $decoratedReceiver;
24+
$this->maximumNumberOfMessages = $maximumNumberOfMessages;
25+
}
26+
27+
public function receive(): iterable
28+
{
29+
$receivedMessages = 0;
30+
31+
foreach ($this->decoratedReceiver->receive() as $message) {
32+
yield $message;
33+
34+
if (++$receivedMessages > $this->maximumNumberOfMessages) {
35+
break;
36+
}
37+
}
38+
}
39+
}

0 commit comments

Comments
 (0)