Skip to content

Commit 40e59d9

Browse files
committed
Uses a Worker within the FrameworkBundle command
1 parent 4504058 commit 40e59d9

File tree

2 files changed

+47
-8
lines changed

2 files changed

+47
-8
lines changed

src/Symfony/Bundle/FrameworkBundle/Command/MessageConsumeCommand.php

Lines changed: 3 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -17,9 +17,9 @@
1717
use Symfony\Component\Console\Input\InputOption;
1818
use Symfony\Component\Console\Output\OutputInterface;
1919
use Symfony\Component\DependencyInjection\ContainerInterface;
20-
use Symfony\Component\Message\Asynchronous\Transport\ReceivedMessage;
2120
use Symfony\Component\Message\MessageBusInterface;
2221
use Symfony\Component\Message\Transport\ReceiverInterface;
22+
use Symfony\Component\Message\Worker;
2323

2424
/**
2525
* @author Samuel Roze <samuel.roze@gmail.com>
@@ -69,12 +69,7 @@ protected function execute(InputInterface $input, OutputInterface $output)
6969
throw new \RuntimeException(sprintf('Bus "%s" is not a valid message bus. It must implement the interface "%s"', $busName, MessageBusInterface::class));
7070
}
7171

72-
foreach ($receiver->receive() as $message) {
73-
if (!$message instanceof ReceivedMessage) {
74-
$message = new ReceivedMessage($message);
75-
}
76-
77-
$messageBus->dispatch($message);
78-
}
72+
$worker = new Worker($receiver, $messageBus);
73+
$worker->run();
7974
}
8075
}
Lines changed: 44 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,44 @@
1+
<?php
2+
3+
/*
4+
* This file is part of the Symfony package.
5+
*
6+
* (c) Fabien Potencier <fabien@symfony.com>
7+
*
8+
* For the full copyright and license information, please view the LICENSE
9+
* file that was distributed with this source code.
10+
*/
11+
12+
namespace Symfony\Component\Message;
13+
14+
use Symfony\Component\Message\Asynchronous\Transport\ReceivedMessage;
15+
use Symfony\Component\Message\Transport\ReceiverInterface;
16+
17+
/**
18+
* @author Samuel Roze <samuel.roze@gmail.com>
19+
*/
20+
class Worker
21+
{
22+
private $receiver;
23+
private $bus;
24+
25+
public function __construct(ReceiverInterface $receiver, MessageBusInterface $bus)
26+
{
27+
$this->receiver = $receiver;
28+
$this->bus = $bus;
29+
}
30+
31+
/**
32+
* Receive the messages and dispatch them to the bus.
33+
*/
34+
public function run()
35+
{
36+
foreach ($this->receiver->receive() as $message) {
37+
if (!$message instanceof ReceivedMessage) {
38+
$message = new ReceivedMessage($message);
39+
}
40+
41+
$this->bus->dispatch($message);
42+
}
43+
}
44+
}

0 commit comments

Comments
 (0)