Skip to content

Commit 0f73349

Browse files
[Messenger] make senders and handlers subscribing to parent interfaces receive *all* matching messages, wildcard included
1 parent 7a258b0 commit 0f73349

29 files changed

+369
-903
lines changed

CHANGELOG.md

Lines changed: 7 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -6,38 +6,34 @@ CHANGELOG
66

77
* The component is not experimental anymore
88
* All the changes below are BC BREAKS
9+
* Senders and handlers subscribing to parent interfaces now receive *all* matching messages, wildcard included
910
* `MessageBusInterface::dispatch()`, `MiddlewareInterface::handle()` and `SenderInterface::send()` return `Envelope`
1011
* `MiddlewareInterface::handle()` now require an `Envelope` as first argument and a `StackInterface` as second
1112
* `EnvelopeAwareInterface` has been removed
1213
* The signature of `Amqp*` classes changed to take a `Connection` as a first argument and an optional
1314
`Serializer` as a second argument.
14-
* `SenderLocator` has been renamed to `ContainerSenderLocator`
15-
Be careful as there is still a `SenderLocator` class, but it does not rely on a `ContainerInterface` to find senders.
16-
Instead, it accepts the sender instance itself instead of its identifier in the container.
1715
* `MessageSubscriberInterface::getHandledMessages()` return value has changed. The value of an array item
1816
needs to be an associative array or the method name.
1917
* `StampInterface` replaces `EnvelopeItemInterface` and doesn't extend `Serializable` anymore
2018
* The `ConsumeMessagesCommand` class now takes an instance of `Psr\Container\ContainerInterface`
2119
as first constructor argument
2220
* The `EncoderInterface` and `DecoderInterface` have been replaced by a unified `Symfony\Component\Messenger\Transport\Serialization\SerializerInterface`.
23-
* The locator passed to `ContainerHandlerLocator` should not prefix its keys by "handler." anymore
24-
* The `AbstractHandlerLocator::getHandler()` method uses `?callable` as return type
2521
* Renamed `EnvelopeItemInterface` to `StampInterface`
2622
* `Envelope`'s constructor and `with()` method now accept `StampInterface` objects as variadic parameters
2723
* Renamed and moved `ReceivedMessage`, `ValidationConfiguration` and `SerializerConfiguration` in the `Stamp` namespace
28-
* Removed the `WrapIntoReceivedMessage`
29-
* `SenderLocatorInterface::getSenderForMessage()` has been replaced by `getSender(Envelope $envelope)`
24+
* Removed the `WrapIntoReceivedMessage` class
3025
* `MessengerDataCollector::getMessages()` returns an iterable, not just an array anymore
31-
* `AbstractHandlerLocator` is now internal
32-
* `HandlerLocatorInterface::resolve()` has been replaced by `getHandler(Envelope $envelope): ?callable` and shouldn't throw when no handlers are found
33-
* `SenderLocatorInterface::getSenderForMessage()` has been replaced by `getSender(Envelope $envelope)`
26+
* `HandlerLocatorInterface::resolve()` has been removed, use `HandlersLocator::getHandlers()` instead
27+
* `SenderLocatorInterface::getSenderForMessage()` has been removed, use `SendersLocator::getSenders()` instead
3428
* Classes in the `Middleware\Enhancers` sub-namespace have been moved to the `Middleware` one
3529
* Classes in the `Asynchronous\Routing` sub-namespace have been moved to the `Transport\Sender\Locator` sub-namespace
3630
* The `Asynchronous/Middleware/SendMessageMiddleware` class has been moved to the `Middleware` namespace
37-
* `SenderInterface` and `ChainSender` classes have been moved to the `Transport\Sender` sub-namespace
31+
* `SenderInterface` has been moved to the `Transport\Sender` sub-namespace
32+
* The `ChainHandler` and `ChainSender` classes have been removed
3833
* `ReceiverInterface` and its implementations have been moved to the `Transport\Receiver` sub-namespace
3934
* `ActivationMiddlewareDecorator` has been renamed `ActivationMiddleware`
4035
* `AllowNoHandlerMiddleware` has been removed in favor of a new constructor argument on `HandleMessageMiddleware`
36+
* The `ContainerHandlerLocator`, `AbstractHandlerLocator`, `SenderLocator` and `AbstractSenderLocator` classes have been removed
4137

4238
4.1.0
4339
-----

DependencyInjection/MessengerPass.php

Lines changed: 40 additions & 82 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,7 @@
1212
namespace Symfony\Component\Messenger\DependencyInjection;
1313

1414
use Symfony\Component\DependencyInjection\Argument\IteratorArgument;
15+
use Symfony\Component\DependencyInjection\Argument\RewindableGenerator;
1516
use Symfony\Component\DependencyInjection\ChildDefinition;
1617
use Symfony\Component\DependencyInjection\Compiler\CompilerPassInterface;
1718
use Symfony\Component\DependencyInjection\Compiler\PriorityTaggedServiceTrait;
@@ -20,12 +21,10 @@
2021
use Symfony\Component\DependencyInjection\Definition;
2122
use Symfony\Component\DependencyInjection\Exception\RuntimeException;
2223
use Symfony\Component\DependencyInjection\Reference;
23-
use Symfony\Component\Messenger\Handler\ChainHandler;
24-
use Symfony\Component\Messenger\Handler\Locator\ContainerHandlerLocator;
24+
use Symfony\Component\Messenger\Handler\HandlersLocator;
2525
use Symfony\Component\Messenger\Handler\MessageSubscriberInterface;
2626
use Symfony\Component\Messenger\TraceableMessageBus;
2727
use Symfony\Component\Messenger\Transport\Receiver\ReceiverInterface;
28-
use Symfony\Component\Messenger\Transport\Sender\SenderInterface;
2928

3029
/**
3130
* @author Samuel Roze <samuel.roze@gmail.com>
@@ -38,14 +37,12 @@ class MessengerPass implements CompilerPassInterface
3837

3938
private $handlerTag;
4039
private $busTag;
41-
private $senderTag;
4240
private $receiverTag;
4341

44-
public function __construct(string $handlerTag = 'messenger.message_handler', string $busTag = 'messenger.bus', string $senderTag = 'messenger.sender', string $receiverTag = 'messenger.receiver')
42+
public function __construct(string $handlerTag = 'messenger.message_handler', string $busTag = 'messenger.bus', string $receiverTag = 'messenger.receiver')
4543
{
4644
$this->handlerTag = $handlerTag;
4745
$this->busTag = $busTag;
48-
$this->senderTag = $senderTag;
4946
$this->receiverTag = $receiverTag;
5047
}
5148

@@ -54,10 +51,6 @@ public function __construct(string $handlerTag = 'messenger.message_handler', st
5451
*/
5552
public function process(ContainerBuilder $container)
5653
{
57-
if (!$container->has('message_bus')) {
58-
return;
59-
}
60-
6154
$busIds = array();
6255
foreach ($container->findTaggedServiceIds($this->busTag) as $busId => $tags) {
6356
$busIds[] = $busId;
@@ -72,8 +65,9 @@ public function process(ContainerBuilder $container)
7265
}
7366
}
7467

75-
$this->registerReceivers($container, $busIds);
76-
$this->registerSenders($container);
68+
if ($container->hasDefinition('messenger.receiver_locator')) {
69+
$this->registerReceivers($container, $busIds);
70+
}
7771
$this->registerHandlers($container, $busIds);
7872
}
7973

@@ -96,45 +90,41 @@ private function registerHandlers(ContainerBuilder $container, array $busIds)
9690
$handles = $this->guessHandledClasses($r, $serviceId);
9791
}
9892

99-
$priority = $tag['priority'] ?? 0;
93+
$message = null;
10094
$handlerBuses = (array) ($tag['bus'] ?? $busIds);
10195

102-
foreach ($handles as $messageClass => $method) {
96+
foreach ($handles as $message => $method) {
10397
$buses = $handlerBuses;
104-
if (\is_int($messageClass)) {
105-
$messageClass = $method;
98+
if (\is_int($message)) {
99+
$message = $method;
106100
$method = '__invoke';
107101
}
108102

109-
if (\is_array($messageClass)) {
110-
$messagePriority = $messageClass[1];
111-
$messageClass = $messageClass[0];
103+
if (\is_array($message)) {
104+
list($message, $priority) = $message;
112105
} else {
113-
$messagePriority = $priority;
106+
$priority = $tag['priority'] ?? 0;
114107
}
115108

116109
if (\is_array($method)) {
117110
if (isset($method['bus'])) {
118111
if (!\in_array($method['bus'], $busIds)) {
119-
$messageClassLocation = isset($tag['handles']) ? 'declared in your tag attribute "handles"' : $r->implementsInterface(MessageSubscriberInterface::class) ? sprintf('returned by method "%s::getHandledMessages()"', $r->getName()) : sprintf('used as argument type in method "%s::%s()"', $r->getName(), $method);
112+
$messageLocation = isset($tag['handles']) ? 'declared in your tag attribute "handles"' : $r->implementsInterface(MessageSubscriberInterface::class) ? sprintf('returned by method "%s::getHandledMessages()"', $r->getName()) : sprintf('used as argument type in method "%s::%s()"', $r->getName(), $method);
120113

121-
throw new RuntimeException(sprintf('Invalid configuration %s for message "%s": bus "%s" does not exist.', $messageClassLocation, $messageClass, $method['bus']));
114+
throw new RuntimeException(sprintf('Invalid configuration %s for message "%s": bus "%s" does not exist.', $messageLocation, $message, $method['bus']));
122115
}
123116

124117
$buses = array($method['bus']);
125118
}
126119

127-
if (isset($method['priority'])) {
128-
$messagePriority = $method['priority'];
129-
}
130-
120+
$priority = $method['priority'] ?? $priority;
131121
$method = $method['method'] ?? '__invoke';
132122
}
133123

134-
if (!\class_exists($messageClass) && !\interface_exists($messageClass, false)) {
135-
$messageClassLocation = isset($tag['handles']) ? 'declared in your tag attribute "handles"' : $r->implementsInterface(MessageSubscriberInterface::class) ? sprintf('returned by method "%s::getHandledMessages()"', $r->getName()) : sprintf('used as argument type in method "%s::%s()"', $r->getName(), $method);
124+
if ('*' !== $message && !class_exists($message) && !interface_exists($message, false)) {
125+
$messageLocation = isset($tag['handles']) ? 'declared in your tag attribute "handles"' : $r->implementsInterface(MessageSubscriberInterface::class) ? sprintf('returned by method "%s::getHandledMessages()"', $r->getName()) : sprintf('used as argument type in method "%s::%s()"', $r->getName(), $method);
136126

137-
throw new RuntimeException(sprintf('Invalid handler service "%s": message class "%s" %s does not exist.', $serviceId, $messageClass, $messageClassLocation));
127+
throw new RuntimeException(sprintf('Invalid handler service "%s": class or interface "%s" %s not found.', $serviceId, $message, $messageLocation));
138128
}
139129

140130
if (!$r->hasMethod($method)) {
@@ -144,15 +134,19 @@ private function registerHandlers(ContainerBuilder $container, array $busIds)
144134
if ('__invoke' !== $method) {
145135
$wrapperDefinition = (new Definition('callable'))->addArgument(array(new Reference($serviceId), $method))->setFactory('Closure::fromCallable');
146136

147-
$definitions[$definitionId = '.messenger.method_on_object_wrapper.'.ContainerBuilder::hash($messageClass.':'.$messagePriority.':'.$serviceId.':'.$method)] = $wrapperDefinition;
137+
$definitions[$definitionId = '.messenger.method_on_object_wrapper.'.ContainerBuilder::hash($message.':'.$priority.':'.$serviceId.':'.$method)] = $wrapperDefinition;
148138
} else {
149139
$definitionId = $serviceId;
150140
}
151141

152142
foreach ($buses as $handlerBus) {
153-
$handlersByBusAndMessage[$handlerBus][$messageClass][$messagePriority][] = $definitionId;
143+
$handlersByBusAndMessage[$handlerBus][$message][$priority][] = $definitionId;
154144
}
155145
}
146+
147+
if (null === $message) {
148+
throw new RuntimeException(sprintf('Invalid handler service "%s": method "%s::getHandledMessages()" must return one or more messages.', $serviceId, $r->getName()));
149+
}
156150
}
157151
}
158152

@@ -165,29 +159,24 @@ private function registerHandlers(ContainerBuilder $container, array $busIds)
165159

166160
$handlersLocatorMappingByBus = array();
167161
foreach ($handlersByBusAndMessage as $bus => $handlersByMessage) {
168-
foreach ($handlersByMessage as $message => $handlersIds) {
169-
if (1 === \count($handlersIds)) {
170-
$handlersLocatorMappingByBus[$bus][$message] = new Reference(current($handlersIds));
171-
} else {
172-
$chainHandler = new Definition(ChainHandler::class, array(array_map(function (string $handlerId): Reference {
173-
return new Reference($handlerId);
174-
}, $handlersIds)));
175-
$chainHandler->setPrivate(true);
176-
$serviceId = '.messenger.chain_handler.'.ContainerBuilder::hash($bus.$message);
177-
$definitions[$serviceId] = $chainHandler;
178-
$handlersLocatorMappingByBus[$bus][$message] = new Reference($serviceId);
179-
}
162+
foreach ($handlersByMessage as $message => $handlerIds) {
163+
$handlers = array_map(function (string $handlerId) { return new Reference($handlerId); }, $handlerIds);
164+
$handlersId = "messenger.handlers.$bus.$message";
165+
$definitions[$handlersId] = (new Definition(RewindableGenerator::class))
166+
->setFactory('current')
167+
->addArgument(array($handlers));
168+
$handlersLocatorMappingByBus[$bus][$message] = new Reference($handlersId);
180169
}
181170
}
182171
$container->addDefinitions($definitions);
183172

184173
foreach ($busIds as $bus) {
185-
$container->register($resolverName = "$bus.messenger.handler_resolver", ContainerHandlerLocator::class)
186-
->setArgument(0, ServiceLocatorTagPass::register($container, $handlersLocatorMappingByBus[$bus] ?? array()))
174+
$container->register($locatorId = $bus.'.messenger.handlers_locator', HandlersLocator::class)
175+
->setArgument(0, $handlersLocatorMappingByBus[$bus] ?? array())
187176
;
188-
if ($container->has($callMessageHandlerId = "$bus.middleware.call_message_handler")) {
189-
$container->getDefinition($callMessageHandlerId)
190-
->replaceArgument(0, new Reference($resolverName))
177+
if ($container->has($handleMessageId = $bus.'.middleware.handle_message')) {
178+
$container->getDefinition($handleMessageId)
179+
->replaceArgument(0, new Reference($locatorId))
191180
;
192181
}
193182
}
@@ -206,11 +195,7 @@ private function registerHandlers(ContainerBuilder $container, array $busIds)
206195
private function guessHandledClasses(\ReflectionClass $handlerClass, string $serviceId): iterable
207196
{
208197
if ($handlerClass->implementsInterface(MessageSubscriberInterface::class)) {
209-
if (!$handledMessages = $handlerClass->getName()::getHandledMessages()) {
210-
throw new RuntimeException(sprintf('Invalid handler service "%s": method "%s::getHandledMessages()" must return one or more messages.', $serviceId, $handlerClass->getName()));
211-
}
212-
213-
return $handledMessages;
198+
return $handlerClass->getName()::getHandledMessages();
214199
}
215200

216201
try {
@@ -273,27 +258,6 @@ private function registerReceivers(ContainerBuilder $container, array $busIds)
273258
$container->getDefinition('messenger.receiver_locator')->replaceArgument(0, $receiverMapping);
274259
}
275260

276-
private function registerSenders(ContainerBuilder $container)
277-
{
278-
$senderLocatorMapping = array();
279-
foreach ($container->findTaggedServiceIds($this->senderTag) as $id => $tags) {
280-
$senderClass = $container->findDefinition($id)->getClass();
281-
if (!is_subclass_of($senderClass, SenderInterface::class)) {
282-
throw new RuntimeException(sprintf('Invalid sender "%s": class "%s" must implement interface "%s".', $id, $senderClass, SenderInterface::class));
283-
}
284-
285-
$senderLocatorMapping[$id] = new Reference($id);
286-
287-
foreach ($tags as $tag) {
288-
if (isset($tag['alias'])) {
289-
$senderLocatorMapping[$tag['alias']] = $senderLocatorMapping[$id];
290-
}
291-
}
292-
}
293-
294-
$container->getDefinition('messenger.sender_locator')->replaceArgument(0, $senderLocatorMapping);
295-
}
296-
297261
private function registerBusToCollector(ContainerBuilder $container, string $busId)
298262
{
299263
$container->setDefinition(
@@ -315,18 +279,12 @@ private function registerBusMiddleware(ContainerBuilder $container, string $busI
315279
}
316280

317281
if (!$container->has($messengerMiddlewareId)) {
318-
throw new RuntimeException(sprintf('Invalid middleware "%s": define such service to be able to use it.', $id));
282+
throw new RuntimeException(sprintf('Invalid middleware: service "%s" not found.', $id));
319283
}
320284

321285
if (($definition = $container->findDefinition($messengerMiddlewareId))->isAbstract()) {
322286
$childDefinition = new ChildDefinition($messengerMiddlewareId);
323-
$count = \count($definition->getArguments());
324-
foreach (array_values($arguments ?? array()) as $key => $argument) {
325-
// Parent definition can provide default arguments.
326-
// Replace each explicitly or add if not set:
327-
$key < $count ? $childDefinition->replaceArgument($key, $argument) : $childDefinition->addArgument($argument);
328-
}
329-
287+
$childDefinition->setArguments($arguments);
330288
$container->setDefinition($messengerMiddlewareId = $busId.'.middleware.'.$id, $childDefinition);
331289
} elseif ($arguments) {
332290
throw new RuntimeException(sprintf('Invalid middleware factory "%s": a middleware factory must be an abstract definition.', $id));

Envelope.php

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -74,4 +74,9 @@ public function getMessage()
7474
{
7575
return $this->message;
7676
}
77+
78+
public function getMessageName(): string
79+
{
80+
return \get_class($this->message);
81+
}
7782
}

Handler/ChainHandler.php

Lines changed: 0 additions & 48 deletions
This file was deleted.

0 commit comments

Comments
 (0)