Skip to content

Commit 40ad473

Browse files
author
alex-pex
committed
Added default exchange and queue options
1 parent a837a59 commit 40ad473

File tree

5 files changed

+85
-23
lines changed

5 files changed

+85
-23
lines changed

DependencyInjection/Configuration.php

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -277,6 +277,7 @@ protected function addQueueNodeConfiguration(ArrayNodeDefinition $node)
277277
->booleanNode('exclusive')->defaultFalse()->end()
278278
->booleanNode('auto_delete')->defaultFalse()->end()
279279
->booleanNode('nowait')->defaultFalse()->end()
280+
->booleanNode('declare')->defaultTrue()->end()
280281
->variableNode('arguments')->defaultNull()->end()
281282
->scalarNode('ticket')->defaultNull()->end()
282283
->arrayNode('routing_keys')

DependencyInjection/OldSoundRabbitMqExtension.php

Lines changed: 45 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -98,15 +98,12 @@ protected function loadProducers()
9898
$definition->addTag('old_sound_rabbit_mq.producer');
9999
//this producer doesn't define an exchange -> using AMQP Default
100100
if (!isset($producer['exchange_options'])) {
101-
$producer['exchange_options']['name'] = '';
102-
$producer['exchange_options']['type'] = 'direct';
103-
$producer['exchange_options']['passive'] = true;
104-
$producer['exchange_options']['declare'] = false;
101+
$producer['exchange_options'] = $this->getDefaultExchangeOptions();
105102
}
106103
$definition->addMethodCall('setExchangeOptions', array($this->normalizeArgumentKeys($producer['exchange_options'])));
107-
//this producer doesn't define a queue
104+
//this producer doesn't define a queue -> using AMQP Default
108105
if (!isset($producer['queue_options'])) {
109-
$producer['queue_options']['name'] = null;
106+
$producer['queue_options'] = $this->getDefaultQueueOptions();
110107
}
111108
$definition->addMethodCall('setQueueOptions', array($producer['queue_options']));
112109
$this->injectConnection($definition, $producer['connection']);
@@ -131,12 +128,19 @@ protected function loadConsumers()
131128
{
132129
foreach ($this->config['consumers'] as $key => $consumer) {
133130
$definition = new Definition('%old_sound_rabbit_mq.consumer.class%');
134-
$definition
135-
->addTag('old_sound_rabbit_mq.base_amqp')
136-
->addTag('old_sound_rabbit_mq.consumer')
137-
->addMethodCall('setExchangeOptions', array($this->normalizeArgumentKeys($consumer['exchange_options'])))
138-
->addMethodCall('setQueueOptions', array($this->normalizeArgumentKeys($consumer['queue_options'])))
139-
->addMethodCall('setCallback', array(array(new Reference($consumer['callback']), 'execute')));
131+
$definition->addTag('old_sound_rabbit_mq.base_amqp');
132+
$definition->addTag('old_sound_rabbit_mq.consumer');
133+
//this consumer doesn't define an exchange -> using AMQP Default
134+
if (!isset($consumer['exchange_options'])) {
135+
$consumer['exchange_options'] = $this->getDefaultExchangeOptions();
136+
}
137+
$definition->addMethodCall('setExchangeOptions', array($this->normalizeArgumentKeys($consumer['exchange_options'])));
138+
//this consumer doesn't define a queue -> using AMQP Default
139+
if (!isset($consumer['queue_options'])) {
140+
$consumer['queue_options'] = $this->getDefaultQueueOptions();
141+
}
142+
$definition->addMethodCall('setQueueOptions', array($consumer['queue_options']));
143+
$definition->addMethodCall('setCallback', array(array(new Reference($consumer['callback']), 'execute')));
140144

141145
if (array_key_exists('qos_options', $consumer)) {
142146
$definition->addMethodCall('setQosOptions', array(
@@ -178,7 +182,7 @@ protected function loadMultipleConsumers()
178182
}
179183

180184
foreach ($consumer['queues'] as $queueName => $queueOptions) {
181-
$queues[$queueOptions['name']] = $queueOptions;
185+
$queues[$queueOptions['name']] = $queueOptions;
182186
$queues[$queueOptions['name']]['callback'] = array(new Reference($queueOptions['callback']), 'execute');
183187
$callbacks[] = new Reference($queueOptions['callback']);
184188
}
@@ -393,4 +397,32 @@ protected function addDequeuerAwareCall($callback, $name)
393397
$callbackDefinition->addMethodCall('setDequeuer', array(new Reference($name)));
394398
}
395399
}
400+
401+
/**
402+
* Get default AMQP exchange options
403+
*
404+
* @return array
405+
*/
406+
protected function getDefaultExchangeOptions()
407+
{
408+
return array(
409+
'name' => '',
410+
'type' => 'direct',
411+
'passive' => true,
412+
'declare' => false
413+
);
414+
}
415+
416+
/**
417+
* Get default AMQP queue options
418+
*
419+
* @return array
420+
*/
421+
protected function getDefaultQueueOptions()
422+
{
423+
return array(
424+
'name' => '',
425+
'declare' => false
426+
);
427+
}
396428
}

RabbitMq/BaseAmqp.php

Lines changed: 26 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -35,7 +35,8 @@ abstract class BaseAmqp
3535
'auto_delete' => false,
3636
'nowait' => false,
3737
'arguments' => null,
38-
'ticket' => null
38+
'ticket' => null,
39+
'declare' => true,
3940
);
4041

4142
/**
@@ -132,6 +133,9 @@ public function setRoutingKey($routingKey)
132133
$this->routingKey = $routingKey;
133134
}
134135

136+
/**
137+
* Declares exchange
138+
*/
135139
protected function exchangeDeclare()
136140
{
137141
if ($this->exchangeOptions['declare']) {
@@ -150,26 +154,44 @@ protected function exchangeDeclare()
150154
}
151155
}
152156

157+
/**
158+
* Declares queue, creates if needed
159+
*/
153160
protected function queueDeclare()
154161
{
155-
if (null !== $this->queueOptions['name']) {
162+
if ($this->queueOptions['declare']) {
156163
list($queueName, ,) = $this->getChannel()->queue_declare($this->queueOptions['name'], $this->queueOptions['passive'],
157164
$this->queueOptions['durable'], $this->queueOptions['exclusive'],
158165
$this->queueOptions['auto_delete'], $this->queueOptions['nowait'],
159166
$this->queueOptions['arguments'], $this->queueOptions['ticket']);
160167

161168
if (isset($this->queueOptions['routing_keys']) && count($this->queueOptions['routing_keys']) > 0) {
162169
foreach ($this->queueOptions['routing_keys'] as $routingKey) {
163-
$this->getChannel()->queue_bind($queueName, $this->exchangeOptions['name'], $routingKey);
170+
$this->queueBind($queueName, $this->exchangeOptions['name'], $routingKey);
164171
}
165172
} else {
166-
$this->getChannel()->queue_bind($queueName, $this->exchangeOptions['name'], $this->routingKey);
173+
$this->queueBind($queueName, $this->exchangeOptions['name'], $this->routingKey);
167174
}
168175

169176
$this->queueDeclared = true;
170177
}
171178
}
172179

180+
/**
181+
* Binds queue to an exchange
182+
*
183+
* @param string $queue
184+
* @param string $exchange
185+
* @param string $routing_key
186+
*/
187+
protected function queueBind($queue, $exchange, $routing_key)
188+
{
189+
// queue binding is not permitted on the default exchange
190+
if ('' !== $exchange) {
191+
$this->getChannel()->queue_bind($queue, $exchange, $routing_key);
192+
}
193+
}
194+
173195
public function setupFabric()
174196
{
175197
if (!$this->exchangeDeclared) {

RabbitMq/MultipleConsumer.php

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -68,10 +68,10 @@ protected function queueDeclare()
6868

6969
if (isset($options['routing_keys']) && count($options['routing_keys']) > 0) {
7070
foreach ($options['routing_keys'] as $routingKey) {
71-
$this->getChannel()->queue_bind($queueName, $this->exchangeOptions['name'], $routingKey);
71+
$this->queueBind($queueName, $this->exchangeOptions['name'], $routingKey);
7272
}
7373
} else {
74-
$this->getChannel()->queue_bind($queueName, $this->exchangeOptions['name'], $this->routingKey);
74+
$this->queueBind($queueName, $this->exchangeOptions['name'], $this->routingKey);
7575
}
7676
}
7777

Tests/DependencyInjection/OldSoundRabbitMqExtensionTest.php

Lines changed: 11 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -143,7 +143,8 @@ public function testFooProducerDefinition()
143143
'setQueueOptions',
144144
array(
145145
array(
146-
'name' => null,
146+
'name' => '',
147+
'declare' => false,
147148
)
148149
)
149150
)
@@ -183,7 +184,8 @@ public function testDefaultProducerDefinition()
183184
'setQueueOptions',
184185
array(
185186
array(
186-
'name' => null,
187+
'name' => '',
188+
'declare' => false,
187189
)
188190
)
189191
)
@@ -232,6 +234,7 @@ public function testFooConsumerDefinition()
232234
'arguments' => null,
233235
'ticket' => null,
234236
'routing_keys' => array('android.#.upload', 'iphone.upload'),
237+
'declare' => true,
235238
)
236239
)
237240
),
@@ -284,6 +287,7 @@ public function testDefaultConsumerDefinition()
284287
'arguments' => null,
285288
'ticket' => null,
286289
'routing_keys' => array(),
290+
'declare' => true,
287291
)
288292
)
289293
),
@@ -361,7 +365,8 @@ public function testMultipleConsumerDefinition()
361365
'arguments' => null,
362366
'ticket' => null,
363367
'routing_keys' => array(),
364-
'callback' => array(new Reference('foo.multiple_test1.callback'), 'execute')
368+
'callback' => array(new Reference('foo.multiple_test1.callback'), 'execute'),
369+
'declare' => true,
365370
),
366371
'foo_bar_2' => array(
367372
'name' => 'foo_bar_2',
@@ -376,7 +381,8 @@ public function testMultipleConsumerDefinition()
376381
'android.upload',
377382
'iphone.upload'
378383
),
379-
'callback' => array(new Reference('foo.multiple_test2.callback'), 'execute')
384+
'callback' => array(new Reference('foo.multiple_test2.callback'), 'execute'),
385+
'declare' => true,
380386
)
381387
)
382388
)
@@ -557,6 +563,7 @@ public function testRpcServerWithQueueOptionsDefinition()
557563
'arguments' => null,
558564
'ticket' => null,
559565
'routing_keys' => array(),
566+
'declare' => true,
560567
))),
561568
array('setSerializer', array('serialize')),
562569
),

0 commit comments

Comments
 (0)