Skip to content

Commit 131cb76

Browse files
committed
Added direct reply-to clients support
1 parent 119c60d commit 131cb76

File tree

6 files changed

+42
-5
lines changed

6 files changed

+42
-5
lines changed

DependencyInjection/Configuration.php

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -218,6 +218,7 @@ protected function addRpcClients(ArrayNodeDefinition $node)
218218
->scalarNode('connection')->defaultValue('default')->end()
219219
->booleanNode('expect_serialized_response')->defaultTrue()->end()
220220
->scalarNode('unserializer')->defaultValue('unserialize')->end()
221+
->booleanNode('direct_reply_to')->defaultFalse()->end()
221222
->end()
222223
->end()
223224
->end()

DependencyInjection/OldSoundRabbitMqExtension.php

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -373,6 +373,9 @@ protected function loadRpcClients()
373373
if (array_key_exists('unserializer', $client)) {
374374
$definition->addMethodCall('setUnserializer', array($client['unserializer']));
375375
}
376+
if (array_key_exists('direct_reply_to', $client)) {
377+
$definition->addMethodCall('setDirectReplyTo', array($client['direct_reply_to']));
378+
}
376379

377380
$this->container->setDefinition(sprintf('old_sound_rabbit_mq.%s_rpc', $key), $definition);
378381
}

README.md

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -376,6 +376,7 @@ rpc_clients:
376376
integer_store:
377377
connection: default
378378
unserializer: json_decode
379+
direct_reply_to: false
379380
rpc_servers:
380381
random_int:
381382
connection: default
@@ -476,6 +477,12 @@ public function indexAction($name)
476477
477478
Is very similar to the previous example, we just have an extra `addRequest` call. Also we provide meaningful request identifiers so later will be easier for us to find the reply we want in the __$replies__ array.
478479
480+
### Direct Reply-To clients ###
481+
482+
To enable [direct reply-to clients](https://www.rabbitmq.com/direct-reply-to.html) you just have to enable option __direct_reply_to__ on the __rpc_clients__ configuration for the client.
483+
484+
This option will use pseudo-queue __amq.rabbitmq.reply-to__ when doing RPC calls. On the RPC server there is no modification needed.
485+
479486
### Multiple Consumers ###
480487
481488
It's a good practice to have a lot of queues for logic separation. With a simple consumer you will have to create one worker (consumer) per queue and it can be hard to manage when dealing

RabbitMq/RpcClient.php

Lines changed: 26 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,8 @@ class RpcClient extends BaseAmqp
1414

1515
private $queueName;
1616
private $unserializer = 'unserialize';
17+
private $directReplyTo;
18+
private $directConsumerTag;
1719

1820
public function initClient($expectSerializedResponse = true)
1921
{
@@ -26,8 +28,20 @@ public function addRequest($msgBody, $server, $requestId = null, $routingKey = '
2628
throw new \InvalidArgumentException('You must provide a $requestId');
2729
}
2830

31+
if (0 == $this->requests) {
32+
// On first addRequest() call, clear all replies
33+
$this->replies = array();
34+
35+
if ($this->directReplyTo) {
36+
// On direct reply-to mode, make initial consume call
37+
$this->directConsumerTag = $this->getChannel()->basic_consume('amq.rabbitmq.reply-to', '', false, true, false, false, array($this, 'processMessage'));
38+
}
39+
}
40+
2941
$msg = new AMQPMessage($msgBody, array('content_type' => 'text/plain',
30-
'reply_to' => $this->getQueueName(),
42+
'reply_to' => $this->directReplyTo
43+
? 'amq.rabbitmq.reply-to' // On direct reply-to mode, use predefined queue name
44+
: $this->getQueueName(),
3145
'delivery_mode' => 1, // non durable
3246
'expiration' => $expiration*1000,
3347
'correlation_id' => $requestId));
@@ -43,14 +57,18 @@ public function addRequest($msgBody, $server, $requestId = null, $routingKey = '
4357

4458
public function getReplies()
4559
{
46-
$this->replies = array();
47-
$consumer_tag = $this->getChannel()->basic_consume($this->getQueueName(), '', false, true, false, false, array($this, 'processMessage'));
60+
if ($this->directReplyTo) {
61+
$consumer_tag = $this->directConsumerTag;
62+
} else {
63+
$consumer_tag = $this->getChannel()->basic_consume($this->getQueueName(), '', false, true, false, false, array($this, 'processMessage'));
64+
}
4865

4966
while (count($this->replies) < $this->requests) {
5067
$this->getChannel()->wait(null, false, $this->timeout);
5168
}
5269

5370
$this->getChannel()->basic_cancel($consumer_tag);
71+
$this->directConsumerTag = null;
5472
$this->requests = 0;
5573
$this->timeout = 0;
5674

@@ -80,4 +98,9 @@ public function setUnserializer($unserializer)
8098
{
8199
$this->unserializer = $unserializer;
82100
}
101+
102+
public function setDirectReplyTo($directReplyTo)
103+
{
104+
$this->directReplyTo = $directReplyTo;
105+
}
83106
}

Tests/DependencyInjection/Fixtures/test.yml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -164,6 +164,7 @@ old_sound_rabbit_mq:
164164
foo_client:
165165
connection: foo_connection
166166
unserializer: json_decode
167+
direct_reply_to: true
167168

168169
default_client:
169170

Tests/DependencyInjection/OldSoundRabbitMqExtensionTest.php

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -518,7 +518,8 @@ public function testFooRpcClientDefinition()
518518
$this->assertEquals(
519519
array(
520520
array('initClient', array(true)),
521-
array('setUnserializer', array('json_decode'))
521+
array('setUnserializer', array('json_decode')),
522+
array('setDirectReplyTo', array(true)),
522523
),
523524
$definition->getMethodCalls()
524525
);
@@ -536,7 +537,8 @@ public function testDefaultRpcClientDefinition()
536537
$this->assertEquals(
537538
array(
538539
array('initClient', array(true)),
539-
array('setUnserializer', array('unserialize'))
540+
array('setUnserializer', array('unserialize')),
541+
array('setDirectReplyTo', array(false)),
540542
),
541543
$definition->getMethodCalls()
542544
);

0 commit comments

Comments
 (0)