Skip to content

Commit 556d139

Browse files
committed
Added direct reply-to clients support #329 - merge master
2 parents b6f370b + 131cb76 commit 556d139

File tree

6 files changed

+49
-5
lines changed

6 files changed

+49
-5
lines changed

DependencyInjection/Configuration.php

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -249,6 +249,7 @@ protected function addRpcClients(ArrayNodeDefinition $node)
249249
->booleanNode('expect_serialized_response')->defaultTrue()->end()
250250
->scalarNode('unserializer')->defaultValue('unserialize')->end()
251251
->booleanNode('lazy')->defaultFalse()->end()
252+
->booleanNode('direct_reply_to')->defaultFalse()->end()
252253
->end()
253254
->end()
254255
->end()

DependencyInjection/OldSoundRabbitMqExtension.php

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -416,6 +416,9 @@ protected function loadRpcClients()
416416
if (array_key_exists('unserializer', $client)) {
417417
$definition->addMethodCall('setUnserializer', array($client['unserializer']));
418418
}
419+
if (array_key_exists('direct_reply_to', $client)) {
420+
$definition->addMethodCall('setDirectReplyTo', array($client['direct_reply_to']));
421+
}
419422

420423
$this->container->setDefinition(sprintf('old_sound_rabbit_mq.%s_rpc', $key), $definition);
421424
}

README.md

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -390,6 +390,7 @@ rpc_clients:
390390
connection: default #default: default
391391
unserializer: json_decode #default: unserialize
392392
lazy: true #default: false
393+
direct_reply_to: false
393394
rpc_servers:
394395
random_int:
395396
connection: default
@@ -490,6 +491,12 @@ public function indexAction($name)
490491
491492
Is very similar to the previous example, we just have an extra `addRequest` call. Also we provide meaningful request identifiers so later will be easier for us to find the reply we want in the __$replies__ array.
492493
494+
### Direct Reply-To clients ###
495+
496+
To enable [direct reply-to clients](https://www.rabbitmq.com/direct-reply-to.html) you just have to enable option __direct_reply_to__ on the __rpc_clients__ configuration for the client.
497+
498+
This option will use pseudo-queue __amq.rabbitmq.reply-to__ when doing RPC calls. On the RPC server there is no modification needed.
499+
493500
### Multiple Consumers ###
494501
495502
It's a good practice to have a lot of queues for logic separation. With a simple consumer you will have to create one worker (consumer) per queue and it can be hard to manage when dealing

RabbitMq/RpcClient.php

Lines changed: 26 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,8 @@ class RpcClient extends BaseAmqp
1414

1515
private $queueName;
1616
private $unserializer = 'unserialize';
17+
private $directReplyTo;
18+
private $directConsumerTag;
1719

1820
public function initClient($expectSerializedResponse = true)
1921
{
@@ -26,8 +28,20 @@ public function addRequest($msgBody, $server, $requestId = null, $routingKey = '
2628
throw new \InvalidArgumentException('You must provide a $requestId');
2729
}
2830

31+
if (0 == $this->requests) {
32+
// On first addRequest() call, clear all replies
33+
$this->replies = array();
34+
35+
if ($this->directReplyTo) {
36+
// On direct reply-to mode, make initial consume call
37+
$this->directConsumerTag = $this->getChannel()->basic_consume('amq.rabbitmq.reply-to', '', false, true, false, false, array($this, 'processMessage'));
38+
}
39+
}
40+
2941
$msg = new AMQPMessage($msgBody, array('content_type' => 'text/plain',
30-
'reply_to' => $this->getQueueName(),
42+
'reply_to' => $this->directReplyTo
43+
? 'amq.rabbitmq.reply-to' // On direct reply-to mode, use predefined queue name
44+
: $this->getQueueName(),
3145
'delivery_mode' => 1, // non durable
3246
'expiration' => $expiration*1000,
3347
'correlation_id' => $requestId));
@@ -43,14 +57,18 @@ public function addRequest($msgBody, $server, $requestId = null, $routingKey = '
4357

4458
public function getReplies()
4559
{
46-
$this->replies = array();
47-
$consumer_tag = $this->getChannel()->basic_consume($this->getQueueName(), '', false, true, false, false, array($this, 'processMessage'));
60+
if ($this->directReplyTo) {
61+
$consumer_tag = $this->directConsumerTag;
62+
} else {
63+
$consumer_tag = $this->getChannel()->basic_consume($this->getQueueName(), '', false, true, false, false, array($this, 'processMessage'));
64+
}
4865

4966
while (count($this->replies) < $this->requests) {
5067
$this->getChannel()->wait(null, false, $this->timeout);
5168
}
5269

5370
$this->getChannel()->basic_cancel($consumer_tag);
71+
$this->directConsumerTag = null;
5472
$this->requests = 0;
5573
$this->timeout = 0;
5674

@@ -80,4 +98,9 @@ public function setUnserializer($unserializer)
8098
{
8199
$this->unserializer = $unserializer;
82100
}
101+
102+
public function setDirectReplyTo($directReplyTo)
103+
{
104+
$this->directReplyTo = $directReplyTo;
105+
}
83106
}

Tests/DependencyInjection/Fixtures/test.yml

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -162,6 +162,14 @@ old_sound_rabbit_mq:
162162
type: direct
163163
callback: default_anon.callback
164164

165+
rpc_clients:
166+
foo_client:
167+
connection: foo_connection
168+
unserializer: json_decode
169+
direct_reply_to: true
170+
171+
default_client:
172+
165173
rpc_servers:
166174
foo_server:
167175
connection: foo_connection

Tests/DependencyInjection/OldSoundRabbitMqExtensionTest.php

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -606,7 +606,8 @@ public function testFooRpcClientDefinition()
606606
$this->assertEquals(
607607
array(
608608
array('initClient', array(true)),
609-
array('setUnserializer', array('json_decode'))
609+
array('setUnserializer', array('json_decode')),
610+
array('setDirectReplyTo', array(true)),
610611
),
611612
$definition->getMethodCalls()
612613
);
@@ -624,7 +625,8 @@ public function testDefaultRpcClientDefinition()
624625
$this->assertEquals(
625626
array(
626627
array('initClient', array(true)),
627-
array('setUnserializer', array('unserialize'))
628+
array('setUnserializer', array('unserialize')),
629+
array('setDirectReplyTo', array(false)),
628630
),
629631
$definition->getMethodCalls()
630632
);

0 commit comments

Comments
 (0)