Skip to content

Commit 3dd1b63

Browse files
committed
Merge pull request #238 from igaponov/custom-serializer
Add serialize/unserialize function for rpc
2 parents a15a4eb + a1c85f0 commit 3dd1b63

File tree

11 files changed

+101
-5
lines changed

11 files changed

+101
-5
lines changed

CHANGELOG

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,6 @@
1+
- 2015-02-07
2+
* Added possibility to set serialize/unserialize function for rpc servers/clients
3+
14
- 2014-11-27
25
* Added interface `OldSound\RabbitMqBundle\Provider\QueuesProviderInterface`
36
* Added `queues_provider` configuration for multiple consumer

DependencyInjection/Configuration.php

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -185,6 +185,7 @@ protected function addRpcClients(ArrayNodeDefinition $node)
185185
->children()
186186
->scalarNode('connection')->defaultValue('default')->end()
187187
->booleanNode('expect_serialized_response')->defaultTrue()->end()
188+
->scalarNode('unserializer')->defaultValue('unserialize')->end()
188189
->end()
189190
->end()
190191
->end()
@@ -213,6 +214,7 @@ protected function addRpcServers(ArrayNodeDefinition $node)
213214
->booleanNode('global')->defaultFalse()->end()
214215
->end()
215216
->end()
217+
->scalarNode('serializer')->defaultValue('serialize')->end()
216218
->end()
217219
->end()
218220
->end()

DependencyInjection/OldSoundRabbitMqExtension.php

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -312,6 +312,9 @@ protected function loadRpcClients()
312312
if ($this->collectorEnabled) {
313313
$this->injectLoggedChannel($definition, $key, $client['connection']);
314314
}
315+
if (array_key_exists('unserializer', $client)) {
316+
$definition->addMethodCall('setUnserializer', array($client['unserializer']));
317+
}
315318

316319
$this->container->setDefinition(sprintf('old_sound_rabbit_mq.%s_rpc', $key), $definition);
317320
}
@@ -340,6 +343,9 @@ protected function loadRpcServers()
340343
if (array_key_exists('queue_options', $server)) {
341344
$definition->addMethodCall('setQueueOptions', array($server['queue_options']));
342345
}
346+
if (array_key_exists('serializer', $server)) {
347+
$definition->addMethodCall('setSerializer', array($server['serializer']));
348+
}
343349
$this->container->setDefinition(sprintf('old_sound_rabbit_mq.%s_server', $key), $definition);
344350
}
345351
}

README.md

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -380,12 +380,14 @@ Let's add a RPC client and server into the configuration:
380380
rpc_clients:
381381
integer_store:
382382
connection: default
383+
unserializer: json_decode
383384
rpc_servers:
384385
random_int:
385386
connection: default
386387
callback: random_int_server
387388
qos_options: {prefetch_size: 0, prefetch_count: 1, global: false}
388389
queue_options: {name: random_int_queue, durable: false, auto_delete: true}
390+
serializer: json_encode
389391
```
390392
391393
*For a full configuration reference please use the `php app/console config:dump-reference old_sound_rabbit_mq` command.*

RabbitMq/RpcClient.php

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,7 @@ class RpcClient extends BaseAmqp
1313
protected $timeout = 0;
1414

1515
private $queueName;
16+
private $unserializer = 'unserialize';
1617

1718
public function initClient($expectSerializedResponse = true)
1819
{
@@ -60,7 +61,7 @@ public function processMessage(AMQPMessage $msg)
6061
{
6162
$messageBody = $msg->body;
6263
if ($this->expectSerializedResponse) {
63-
$messageBody = unserialize($messageBody);
64+
$messageBody = call_user_func($this->unserializer, $messageBody);
6465
}
6566

6667
$this->replies[$msg->get('correlation_id')] = $messageBody;
@@ -74,4 +75,9 @@ protected function getQueueName()
7475

7576
return $this->queueName;
7677
}
78+
79+
public function setUnserializer($unserializer)
80+
{
81+
$this->unserializer = $unserializer;
82+
}
7783
}

RabbitMq/RpcServer.php

Lines changed: 9 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,8 @@
66

77
class RpcServer extends BaseConsumer
88
{
9+
private $serializer = 'serialize';
10+
911
public function initServer($name)
1012
{
1113
$this->setExchangeOptions(array('name' => $name, 'type' => 'direct'));
@@ -17,7 +19,8 @@ public function processMessage(AMQPMessage $msg)
1719
try {
1820
$msg->delivery_info['channel']->basic_ack($msg->delivery_info['delivery_tag']);
1921
$result = call_user_func($this->callback, $msg);
20-
$this->sendReply(serialize($result), $msg->get('reply_to'), $msg->get('correlation_id'));
22+
$result = call_user_func($this->serializer, $result);
23+
$this->sendReply($result, $msg->get('reply_to'), $msg->get('correlation_id'));
2124
$this->consumed++;
2225
$this->maybeStopConsumer();
2326
} catch (\Exception $e) {
@@ -30,4 +33,9 @@ protected function sendReply($result, $client, $correlationId)
3033
$reply = new AMQPMessage($result, array('content_type' => 'text/plain', 'correlation_id' => $correlationId));
3134
$this->getChannel()->basic_publish($reply, '', $client);
3235
}
36+
37+
public function setSerializer($serializer)
38+
{
39+
$this->serializer = $serializer;
40+
}
3341
}

Tests/DependencyInjection/Fixtures/test.yml

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -147,13 +147,15 @@ old_sound_rabbit_mq:
147147
rpc_clients:
148148
foo_client:
149149
connection: foo_connection
150+
unserializer: json_decode
150151

151152
default_client:
152153

153154
rpc_servers:
154155
foo_server:
155156
connection: foo_connection
156157
callback: foo_server.callback
158+
serializer: json_encode
157159

158160
default_server:
159161
callback: default_server.callback

Tests/DependencyInjection/OldSoundRabbitMqExtensionTest.php

Lines changed: 11 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -473,7 +473,10 @@ public function testFooRpcClientDefinition()
473473
$this->assertEquals((string) $definition->getArgument(0), 'old_sound_rabbit_mq.connection.foo_connection');
474474
$this->assertEquals((string) $definition->getArgument(1), 'old_sound_rabbit_mq.channel.foo_client');
475475
$this->assertEquals(
476-
array(array('initClient', array(true))),
476+
array(
477+
array('initClient', array(true)),
478+
array('setUnserializer', array('json_decode'))
479+
),
477480
$definition->getMethodCalls()
478481
);
479482
$this->assertEquals('%old_sound_rabbit_mq.rpc_client.class%', $definition->getClass());
@@ -488,7 +491,10 @@ public function testDefaultRpcClientDefinition()
488491
$this->assertEquals((string) $definition->getArgument(0), 'old_sound_rabbit_mq.connection.default');
489492
$this->assertEquals((string) $definition->getArgument(1), 'old_sound_rabbit_mq.channel.default_client');
490493
$this->assertEquals(
491-
array(array('initClient', array(true))),
494+
array(
495+
array('initClient', array(true)),
496+
array('setUnserializer', array('unserialize'))
497+
),
492498
$definition->getMethodCalls()
493499
);
494500
$this->assertEquals('%old_sound_rabbit_mq.rpc_client.class%', $definition->getClass());
@@ -505,6 +511,7 @@ public function testFooRpcServerDefinition()
505511
$this->assertEquals(array(
506512
array('initServer', array('foo_server')),
507513
array('setCallback', array(array(new Reference('foo_server.callback'), 'execute'))),
514+
array('setSerializer', array('json_encode')),
508515
),
509516
$definition->getMethodCalls()
510517
);
@@ -522,6 +529,7 @@ public function testDefaultRpcServerDefinition()
522529
$this->assertEquals(array(
523530
array('initServer', array('default_server')),
524531
array('setCallback', array(array(new Reference('default_server.callback'), 'execute'))),
532+
array('setSerializer', array('serialize')),
525533
),
526534
$definition->getMethodCalls()
527535
);
@@ -550,6 +558,7 @@ public function testRpcServerWithQueueOptionsDefinition()
550558
'ticket' => null,
551559
'routing_keys' => array(),
552560
))),
561+
array('setSerializer', array('serialize')),
553562
),
554563
$definition->getMethodCalls()
555564
);

Tests/RabbitMq/RpcClientTest.php

Lines changed: 25 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,25 @@
1+
<?php
2+
3+
namespace OldSound\RabbitMqBundle\Tests\RabbitMq;
4+
5+
use OldSound\RabbitMqBundle\RabbitMq\RpcClient;
6+
7+
class RpcClientTest extends \PHPUnit_Framework_TestCase
8+
{
9+
public function testProcessMessageWithCustomUnserializer()
10+
{
11+
/** @var RpcClient $client */
12+
$client = $this->getMockBuilder('\OldSound\RabbitMqBundle\RabbitMq\RpcClient')
13+
->setMethods(array('sendReply', 'maybeStopConsumer'))
14+
->disableOriginalConstructor()
15+
->getMock();
16+
$message = $this->getMock('\PhpAmqpLib\Message\AMQPMessage', array('get'), array('message'));
17+
$serializer = $this->getMock('\Symfony\Component\Serializer\SerializerInterface', array('serialize', 'deserialize'));
18+
$serializer->expects($this->once())->method('deserialize')->with('message', 'json', null);
19+
$client->initClient(true);
20+
$client->setUnserializer(function($data) use ($serializer) {
21+
$serializer->deserialize($data, 'json', null);
22+
});
23+
$client->processMessage($message);
24+
}
25+
}

Tests/RabbitMq/RpcServerTest.php

Lines changed: 32 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,32 @@
1+
<?php
2+
3+
namespace OldSound\RabbitMqBundle\Tests\RabbitMq;
4+
5+
use OldSound\RabbitMqBundle\RabbitMq\RpcServer;
6+
use PhpAmqpLib\Message\AMQPMessage;
7+
8+
class RpcServerTest extends \PHPUnit_Framework_TestCase
9+
{
10+
public function testProcessMessageWithCustomSerializer()
11+
{
12+
/** @var RpcServer $server */
13+
$server = $this->getMockBuilder('\OldSound\RabbitMqBundle\RabbitMq\RpcServer')
14+
->setMethods(array('sendReply', 'maybeStopConsumer'))
15+
->disableOriginalConstructor()
16+
->getMock();
17+
$message = $this->getMock('\PhpAmqpLib\Message\AMQPMessage', array('get'));
18+
$message->delivery_info = array(
19+
'channel' => $this->getMock('\PhpAmqpLib\Channel\AMQPChannel', array(), array(), '', false),
20+
'delivery_tag' => null
21+
);
22+
$server->setCallback(function() {
23+
return 'message';
24+
});
25+
$serializer = $this->getMock('\Symfony\Component\Serializer\SerializerInterface', array('serialize', 'deserialize'));
26+
$serializer->expects($this->once())->method('serialize')->with('message', 'json');
27+
$server->setSerializer(function($data) use ($serializer) {
28+
$serializer->serialize($data, 'json');
29+
});
30+
$server->processMessage($message);
31+
}
32+
}

0 commit comments

Comments
 (0)