Skip to content

Commit a4d6758

Browse files
committed
Merge branch 'alex-pex-detached-queues-v2'
2 parents f2625a1 + 00eef52 commit a4d6758

File tree

5 files changed

+84
-22
lines changed

5 files changed

+84
-22
lines changed

DependencyInjection/Configuration.php

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -343,6 +343,7 @@ protected function addQueueNodeConfiguration(ArrayNodeDefinition $node)
343343
->booleanNode('exclusive')->defaultFalse()->end()
344344
->booleanNode('auto_delete')->defaultFalse()->end()
345345
->booleanNode('nowait')->defaultFalse()->end()
346+
->booleanNode('declare')->defaultTrue()->end()
346347
->variableNode('arguments')->defaultNull()->end()
347348
->scalarNode('ticket')->defaultNull()->end()
348349
->arrayNode('routing_keys')

DependencyInjection/OldSoundRabbitMqExtension.php

Lines changed: 44 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -131,15 +131,12 @@ protected function loadProducers()
131131
$definition->addTag('old_sound_rabbit_mq.producer');
132132
//this producer doesn't define an exchange -> using AMQP Default
133133
if (!isset($producer['exchange_options'])) {
134-
$producer['exchange_options']['name'] = '';
135-
$producer['exchange_options']['type'] = 'direct';
136-
$producer['exchange_options']['passive'] = true;
137-
$producer['exchange_options']['declare'] = false;
134+
$producer['exchange_options'] = $this->getDefaultExchangeOptions();
138135
}
139136
$definition->addMethodCall('setExchangeOptions', array($this->normalizeArgumentKeys($producer['exchange_options'])));
140-
//this producer doesn't define a queue
137+
//this producer doesn't define a queue -> using AMQP Default
141138
if (!isset($producer['queue_options'])) {
142-
$producer['queue_options']['name'] = null;
139+
$producer['queue_options'] = $this->getDefaultQueueOptions();
143140
}
144141
$definition->addMethodCall('setQueueOptions', array($producer['queue_options']));
145142
$this->injectConnection($definition, $producer['connection']);
@@ -168,12 +165,19 @@ protected function loadConsumers()
168165
{
169166
foreach ($this->config['consumers'] as $key => $consumer) {
170167
$definition = new Definition('%old_sound_rabbit_mq.consumer.class%');
171-
$definition
172-
->addTag('old_sound_rabbit_mq.base_amqp')
173-
->addTag('old_sound_rabbit_mq.consumer')
174-
->addMethodCall('setExchangeOptions', array($this->normalizeArgumentKeys($consumer['exchange_options'])))
175-
->addMethodCall('setQueueOptions', array($this->normalizeArgumentKeys($consumer['queue_options'])))
176-
->addMethodCall('setCallback', array(array(new Reference($consumer['callback']), 'execute')));
168+
$definition->addTag('old_sound_rabbit_mq.base_amqp');
169+
$definition->addTag('old_sound_rabbit_mq.consumer');
170+
//this consumer doesn't define an exchange -> using AMQP Default
171+
if (!isset($consumer['exchange_options'])) {
172+
$consumer['exchange_options'] = $this->getDefaultExchangeOptions();
173+
}
174+
$definition->addMethodCall('setExchangeOptions', array($this->normalizeArgumentKeys($consumer['exchange_options'])));
175+
//this consumer doesn't define a queue -> using AMQP Default
176+
if (!isset($consumer['queue_options'])) {
177+
$consumer['queue_options'] = $this->getDefaultQueueOptions();
178+
}
179+
$definition->addMethodCall('setQueueOptions', array($this->normalizeArgumentKeys($consumer['queue_options'])));
180+
$definition->addMethodCall('setCallback', array(array(new Reference($consumer['callback']), 'execute')));
177181

178182
if (array_key_exists('qos_options', $consumer)) {
179183
$definition->addMethodCall('setQosOptions', array(
@@ -510,4 +514,32 @@ private function injectLogger(Definition $definition)
510514
$definition->addMethodCall('setLogger', array(new Reference('logger')));
511515
}
512516
}
517+
518+
/**
519+
* Get default AMQP exchange options
520+
*
521+
* @return array
522+
*/
523+
protected function getDefaultExchangeOptions()
524+
{
525+
return array(
526+
'name' => '',
527+
'type' => 'direct',
528+
'passive' => true,
529+
'declare' => false
530+
);
531+
}
532+
533+
/**
534+
* Get default AMQP queue options
535+
*
536+
* @return array
537+
*/
538+
protected function getDefaultQueueOptions()
539+
{
540+
return array(
541+
'name' => '',
542+
'declare' => false
543+
);
544+
}
513545
}

RabbitMq/BaseAmqp.php

Lines changed: 26 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -42,7 +42,8 @@ abstract class BaseAmqp
4242
'auto_delete' => false,
4343
'nowait' => false,
4444
'arguments' => null,
45-
'ticket' => null
45+
'ticket' => null,
46+
'declare' => true,
4647
);
4748

4849
/**
@@ -141,6 +142,9 @@ public function setRoutingKey($routingKey)
141142
$this->routingKey = $routingKey;
142143
}
143144

145+
/**
146+
* Declares exchange
147+
*/
144148
protected function exchangeDeclare()
145149
{
146150
if ($this->exchangeOptions['declare']) {
@@ -159,26 +163,44 @@ protected function exchangeDeclare()
159163
}
160164
}
161165

166+
/**
167+
* Declares queue, creates if needed
168+
*/
162169
protected function queueDeclare()
163170
{
164-
if (null !== $this->queueOptions['name']) {
171+
if ($this->queueOptions['declare']) {
165172
list($queueName, ,) = $this->getChannel()->queue_declare($this->queueOptions['name'], $this->queueOptions['passive'],
166173
$this->queueOptions['durable'], $this->queueOptions['exclusive'],
167174
$this->queueOptions['auto_delete'], $this->queueOptions['nowait'],
168175
$this->queueOptions['arguments'], $this->queueOptions['ticket']);
169176

170177
if (isset($this->queueOptions['routing_keys']) && count($this->queueOptions['routing_keys']) > 0) {
171178
foreach ($this->queueOptions['routing_keys'] as $routingKey) {
172-
$this->getChannel()->queue_bind($queueName, $this->exchangeOptions['name'], $routingKey);
179+
$this->queueBind($queueName, $this->exchangeOptions['name'], $routingKey);
173180
}
174181
} else {
175-
$this->getChannel()->queue_bind($queueName, $this->exchangeOptions['name'], $this->routingKey);
182+
$this->queueBind($queueName, $this->exchangeOptions['name'], $this->routingKey);
176183
}
177184

178185
$this->queueDeclared = true;
179186
}
180187
}
181188

189+
/**
190+
* Binds queue to an exchange
191+
*
192+
* @param string $queue
193+
* @param string $exchange
194+
* @param string $routing_key
195+
*/
196+
protected function queueBind($queue, $exchange, $routing_key)
197+
{
198+
// queue binding is not permitted on the default exchange
199+
if ('' !== $exchange) {
200+
$this->getChannel()->queue_bind($queue, $exchange, $routing_key);
201+
}
202+
}
203+
182204
public function setupFabric()
183205
{
184206
if (!$this->exchangeDeclared) {

RabbitMq/MultipleConsumer.php

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -68,10 +68,10 @@ protected function queueDeclare()
6868

6969
if (isset($options['routing_keys']) && count($options['routing_keys']) > 0) {
7070
foreach ($options['routing_keys'] as $routingKey) {
71-
$this->getChannel()->queue_bind($queueName, $this->exchangeOptions['name'], $routingKey);
71+
$this->queueBind($queueName, $this->exchangeOptions['name'], $routingKey);
7272
}
7373
} else {
74-
$this->getChannel()->queue_bind($queueName, $this->exchangeOptions['name'], $this->routingKey);
74+
$this->queueBind($queueName, $this->exchangeOptions['name'], $this->routingKey);
7575
}
7676
}
7777

Tests/DependencyInjection/OldSoundRabbitMqExtensionTest.php

Lines changed: 11 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -231,7 +231,8 @@ public function testFooProducerDefinition()
231231
'setQueueOptions',
232232
array(
233233
array(
234-
'name' => null,
234+
'name' => '',
235+
'declare' => false,
235236
)
236237
)
237238
)
@@ -271,7 +272,8 @@ public function testDefaultProducerDefinition()
271272
'setQueueOptions',
272273
array(
273274
array(
274-
'name' => null,
275+
'name' => '',
276+
'declare' => false,
275277
)
276278
)
277279
)
@@ -320,6 +322,7 @@ public function testFooConsumerDefinition()
320322
'arguments' => null,
321323
'ticket' => null,
322324
'routing_keys' => array('android.#.upload', 'iphone.upload'),
325+
'declare' => true,
323326
)
324327
)
325328
),
@@ -372,6 +375,7 @@ public function testDefaultConsumerDefinition()
372375
'arguments' => null,
373376
'ticket' => null,
374377
'routing_keys' => array(),
378+
'declare' => true,
375379
)
376380
)
377381
),
@@ -449,7 +453,8 @@ public function testMultipleConsumerDefinition()
449453
'arguments' => null,
450454
'ticket' => null,
451455
'routing_keys' => array(),
452-
'callback' => array(new Reference('foo.multiple_test1.callback'), 'execute')
456+
'callback' => array(new Reference('foo.multiple_test1.callback'), 'execute'),
457+
'declare' => true,
453458
),
454459
'foo_bar_2' => array(
455460
'name' => 'foo_bar_2',
@@ -464,7 +469,8 @@ public function testMultipleConsumerDefinition()
464469
'android.upload',
465470
'iphone.upload'
466471
),
467-
'callback' => array(new Reference('foo.multiple_test2.callback'), 'execute')
472+
'callback' => array(new Reference('foo.multiple_test2.callback'), 'execute'),
473+
'declare' => true,
468474
)
469475
)
470476
)
@@ -711,6 +717,7 @@ public function testRpcServerWithQueueOptionsDefinition()
711717
'arguments' => null,
712718
'ticket' => null,
713719
'routing_keys' => array(),
720+
'declare' => true,
714721
))),
715722
array('setSerializer', array('serialize')),
716723
),

0 commit comments

Comments
 (0)