Skip to content

Commit 8066cb6

Browse files
committed
Uses a service locator to discover receivers
1 parent 4a835bc commit 8066cb6

File tree

6 files changed

+47
-17
lines changed

6 files changed

+47
-17
lines changed

src/Symfony/Bundle/FrameworkBundle/Command/MessageConsumeCommand.php

Lines changed: 15 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -11,12 +11,12 @@
1111

1212
namespace Symfony\Bundle\FrameworkBundle\Command;
1313

14+
use Psr\Container\ContainerInterface;
1415
use Symfony\Component\Console\Command\Command;
1516
use Symfony\Component\Console\Input\InputArgument;
1617
use Symfony\Component\Console\Input\InputInterface;
1718
use Symfony\Component\Console\Input\InputOption;
1819
use Symfony\Component\Console\Output\OutputInterface;
19-
use Symfony\Component\DependencyInjection\ContainerInterface;
2020
use Symfony\Component\Message\MessageBusInterface;
2121
use Symfony\Component\Message\Transport\Enhancers\MaximumCountReceiver;
2222
use Symfony\Component\Message\Transport\ReceiverInterface;
@@ -29,6 +29,17 @@ class MessageConsumeCommand extends Command
2929
{
3030
protected static $defaultName = 'message:consume';
3131

32+
private $bus;
33+
private $receiverLocator;
34+
35+
public function __construct(MessageBusInterface $bus, ContainerInterface $receiverLocator)
36+
{
37+
parent::__construct();
38+
39+
$this->bus = $bus;
40+
$this->receiverLocator = $receiverLocator;
41+
}
42+
3243
/**
3344
* {@inheritdoc}
3445
*/
@@ -37,7 +48,6 @@ protected function configure()
3748
$this
3849
->setDefinition(array(
3950
new InputArgument('receiver', InputArgument::REQUIRED, 'Name of the receiver'),
40-
new InputOption('bus', 'b', InputOption::VALUE_REQUIRED, 'Name of the bus to dispatch the messages to', 'message_bus'),
4151
new InputOption('limit', 'l', InputOption::VALUE_REQUIRED, 'Limit the number of received messages'),
4252
))
4353
->setDescription('Consumes a message')
@@ -55,30 +65,19 @@ protected function configure()
5565
*/
5666
protected function execute(InputInterface $input, OutputInterface $output)
5767
{
58-
/** @var ContainerInterface $container */
59-
$container = $this->getApplication()->getKernel()->getContainer();
60-
61-
if (!$container->has($receiverName = $input->getArgument('receiver'))) {
68+
if (!$this->receiverLocator->has($receiverName = $input->getArgument('receiver'))) {
6269
throw new \RuntimeException(sprintf('Receiver "%s" does not exist.', $receiverName));
6370
}
6471

65-
if (!($receiver = $container->get($receiverName)) instanceof ReceiverInterface) {
72+
if (!($receiver = $this->receiverLocator->get($receiverName)) instanceof ReceiverInterface) {
6673
throw new \RuntimeException(sprintf('Receiver "%s" is not a valid message consumer. It must implement the "%s" interface.', $receiverName, ReceiverInterface::class));
6774
}
6875

69-
if (!$container->has($busName = $input->getOption('bus'))) {
70-
throw new \RuntimeException(sprintf('Bus "%s" does not exist.', $busName));
71-
}
72-
73-
if (!($messageBus = $container->get($busName)) instanceof MessageBusInterface) {
74-
throw new \RuntimeException(sprintf('Bus "%s" is not a valid message bus. It must implement the "%s" interface.', $busName, MessageBusInterface::class));
75-
}
76-
7776
if ($limit = $input->getOption('limit')) {
7877
$receiver = new MaximumCountReceiver($receiver, $limit);
7978
}
8079

81-
$worker = new Worker($receiver, $messageBus);
80+
$worker = new Worker($receiver, $this->bus);
8281
$worker->run();
8382
}
8483
}

src/Symfony/Bundle/FrameworkBundle/DependencyInjection/FrameworkExtension.php

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -42,6 +42,7 @@
4242
use Symfony\Component\DependencyInjection\ParameterBag\ContainerBagInterface;
4343
use Symfony\Component\DependencyInjection\ParameterBag\ParameterBagInterface;
4444
use Symfony\Component\DependencyInjection\Reference;
45+
use Symfony\Component\DependencyInjection\ServiceLocator;
4546
use Symfony\Component\DependencyInjection\ServiceSubscriberInterface;
4647
use Symfony\Component\EventDispatcher\EventSubscriberInterface;
4748
use Symfony\Component\ExpressionLanguage\ExpressionLanguage;
@@ -58,6 +59,8 @@
5859
use Symfony\Component\Lock\LockInterface;
5960
use Symfony\Component\Lock\Store\StoreFactory;
6061
use Symfony\Component\Lock\StoreInterface;
62+
use Symfony\Component\Message\Transport\ReceiverInterface;
63+
use Symfony\Component\Message\Transport\SenderInterface;
6164
use Symfony\Component\PropertyAccess\PropertyAccessor;
6265
use Symfony\Component\PropertyInfo\PropertyAccessExtractorInterface;
6366
use Symfony\Component\PropertyInfo\PropertyDescriptionExtractorInterface;
@@ -267,6 +270,8 @@ public function load(array $configs, ContainerBuilder $container)
267270

268271
if ($this->isConfigEnabled($container, $config['message'])) {
269272
$this->registerMessageConfiguration($config['message'], $container, $loader);
273+
} else {
274+
$container->removeDefinition('console.command.message_consume');
270275
}
271276

272277
if ($this->isConfigEnabled($container, $config['web_link'])) {
@@ -334,6 +339,10 @@ public function load(array $configs, ContainerBuilder $container)
334339
->addTag('validator.constraint_validator');
335340
$container->registerForAutoconfiguration(ObjectInitializerInterface::class)
336341
->addTag('validator.initializer');
342+
$container->registerForAutoconfiguration(ReceiverInterface::class)
343+
->addTag('message.receiver');
344+
$container->registerForAutoconfiguration(SenderInterface::class)
345+
->addTag('message.sender');
337346

338347
if (!$container->getParameter('kernel.debug')) {
339348
// remove tagged iterator argument for resource checkers

src/Symfony/Bundle/FrameworkBundle/Resources/config/console.xml

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -65,6 +65,9 @@
6565
</service>
6666

6767
<service id="console.command.message_consume" class="Symfony\Bundle\FrameworkBundle\Command\MessageConsumeCommand">
68+
<argument type="service" id="message_bus" />
69+
<argument type="service" id="message.receiver_locator" />
70+
6871
<tag name="console.command" command="message:consume" />
6972
</service>
7073

src/Symfony/Bundle/FrameworkBundle/Resources/config/message.xml

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -55,5 +55,11 @@
5555
<tag name="data_collector" template="@WebProfiler/Collector/messages.html.twig" id="messages" priority="100" />
5656
<tag name="message_middleware" />
5757
</service>
58+
59+
<!-- Discovery -->
60+
<service id="message.receiver_locator">
61+
<tag name="container.service_locator" />
62+
<argument type="collection" />
63+
</service>
5864
</services>
5965
</container>

src/Symfony/Component/Message/DependencyInjection/MessagePass.php

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -47,6 +47,7 @@ public function process(ContainerBuilder $container)
4747
return;
4848
}
4949

50+
$this->registerReceivers($container);
5051
$this->registerHandlers($container);
5152
}
5253

@@ -113,4 +114,16 @@ private function guessHandledClass(ContainerBuilder $container, string $serviceI
113114

114115
return $parameter->getClass()->getName();
115116
}
117+
118+
private function registerReceivers(ContainerBuilder $container)
119+
{
120+
$receiverMapping = [];
121+
foreach ($container->findTaggedServiceIds('message.receiver') as $id => $tags) {
122+
foreach ($tags as $tag) {
123+
$receiverMapping[$tag['id'] ?? $id] = new Reference($id);
124+
}
125+
}
126+
127+
$container->getDefinition('message.receiver_locator')->replaceArgument(0, $receiverMapping);
128+
}
116129
}

src/Symfony/Component/Message/Handler/ChainHandler.php

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -37,7 +37,7 @@ public function __construct(array $handlers)
3737

3838
public function __invoke($message)
3939
{
40-
$results = [];
40+
$results = array();
4141

4242
foreach ($this->handlers as $handler) {
4343
$results[] = $handler($message);

0 commit comments

Comments
 (0)