Skip to content

Commit 0f76918

Browse files
committed
The receivers are returning a \Generator. This allows us to manage the exceptions without specific cases.
1 parent bbd5f5a commit 0f76918

File tree

4 files changed

+27
-9
lines changed

4 files changed

+27
-9
lines changed

src/Symfony/Component/Message/Asynchronous/Transport/WrapIntoReceivedMessage.php

Lines changed: 9 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -25,10 +25,16 @@ public function __construct(ReceiverInterface $decoratedConsumer)
2525
$this->decoratedReceiver = $decoratedConsumer;
2626
}
2727

28-
public function receive(): iterable
28+
public function receive(): \Generator
2929
{
30-
foreach ($this->decoratedReceiver->receive() as $message) {
31-
yield new ReceivedMessage($message);
30+
$generator = $this->decoratedReceiver->receive();
31+
32+
foreach ($generator as $message) {
33+
try {
34+
yield new ReceivedMessage($message);
35+
} catch (\Throwable $e) {
36+
$generator->throw($e);
37+
}
3238
}
3339
}
3440
}

src/Symfony/Component/Message/Transport/Enhancers/MaximumCountReceiver.php

Lines changed: 8 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -24,12 +24,17 @@ public function __construct(ReceiverInterface $decoratedReceiver, int $maximumNu
2424
$this->maximumNumberOfMessages = $maximumNumberOfMessages;
2525
}
2626

27-
public function receive(): iterable
27+
public function receive(): \Generator
2828
{
29+
$generator = $this->decoratedReceiver->receive();
2930
$receivedMessages = 0;
3031

31-
foreach ($this->decoratedReceiver->receive() as $message) {
32-
yield $message;
32+
foreach ($generator as $message) {
33+
try {
34+
yield $message;
35+
} catch (\Throwable $e) {
36+
$generator->throw($e);
37+
}
3338

3439
if (++$receivedMessages > $this->maximumNumberOfMessages) {
3540
break;

src/Symfony/Component/Message/Transport/ReceiverInterface.php

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -16,5 +16,5 @@
1616
*/
1717
interface ReceiverInterface
1818
{
19-
public function receive(): iterable;
19+
public function receive(): \Generator;
2020
}

src/Symfony/Component/Message/Worker.php

Lines changed: 9 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -33,12 +33,19 @@ public function __construct(ReceiverInterface $receiver, MessageBusInterface $bu
3333
*/
3434
public function run()
3535
{
36-
foreach ($this->receiver->receive() as $message) {
36+
$messageGenerator = $this->receiver->receive();
37+
38+
foreach ($messageGenerator as $message) {
3739
if (!$message instanceof ReceivedMessage) {
3840
$message = new ReceivedMessage($message);
3941
}
4042

41-
$this->bus->dispatch($message);
43+
try {
44+
$this->bus->dispatch($message);
45+
} catch (\Throwable $e) {
46+
// Return the exception to the message generator so it can properly handle it if needed.
47+
$messageGenerator->throw($e);
48+
}
4249
}
4350
}
4451
}

0 commit comments

Comments
 (0)