Skip to content

Commit e2039f0

Browse files
alexbumbaceaskafandri
authored andcommitted
#353 - added logger rabbitmq
1 parent c98c62d commit e2039f0

File tree

9 files changed

+134
-6
lines changed

9 files changed

+134
-6
lines changed

DependencyInjection/Configuration.php

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -88,6 +88,7 @@ protected function addProducers(ArrayNodeDefinition $node)
8888
->scalarNode('connection')->defaultValue('default')->end()
8989
->scalarNode('auto_setup_fabric')->defaultTrue()->end()
9090
->scalarNode('class')->defaultValue('%old_sound_rabbit_mq.producer.class%')->end()
91+
->scalarNode('audit')->defaultFalse()->end()
9192
->end()
9293
->end()
9394
->end()
@@ -143,6 +144,7 @@ protected function addConsumers(ArrayNodeDefinition $node)
143144
->booleanNode('global')->defaultFalse()->end()
144145
->end()
145146
->end()
147+
->scalarNode('audit')->defaultFalse()->end()
146148
->end()
147149
->end()
148150
->end()
@@ -174,6 +176,7 @@ protected function addMultipleConsumers(ArrayNodeDefinition $node)
174176
->end()
175177
->end()
176178
->scalarNode('queues_provider')->defaultNull()->end()
179+
->scalarNode('audit')->defaultFalse()->end()
177180
->end()
178181
->end()
179182
->end()
@@ -204,6 +207,7 @@ protected function addDynamicConsumers(ArrayNodeDefinition $node)
204207
->end()
205208
->end()
206209
->scalarNode('queue_options_provider')->isRequired()->end()
210+
->scalarNode('audit')->defaultFalse()->end()
207211
->end()
208212
->end()
209213
->end()
@@ -275,6 +279,7 @@ protected function addRpcServers(ArrayNodeDefinition $node)
275279
->end()
276280
->end()
277281
->scalarNode('serializer')->defaultValue('serialize')->end()
282+
->scalarNode('audit')->defaultFalse()->end()
278283
->end()
279284
->end()
280285
->end()

DependencyInjection/OldSoundRabbitMqExtension.php

Lines changed: 23 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -150,6 +150,10 @@ protected function loadProducers()
150150
$definition->addMethodCall('disableAutoSetupFabric');
151151
}
152152

153+
if ($producer['audit']) {
154+
$this->injectLogger($definition);
155+
}
156+
153157
$this->container->setDefinition(sprintf('old_sound_rabbit_mq.%s_producer', $key), $definition);
154158
}
155159
} else {
@@ -191,6 +195,10 @@ protected function loadConsumers()
191195
$this->injectLoggedChannel($definition, $key, $consumer['connection']);
192196
}
193197

198+
if ($consumer['audit']) {
199+
$this->injectLogger($definition);
200+
}
201+
194202
$name = sprintf('old_sound_rabbit_mq.%s_consumer', $key);
195203
$this->container->setDefinition($name, $definition);
196204
$this->addDequeuerAwareCall($consumer['callback'], $name);
@@ -250,6 +258,10 @@ protected function loadMultipleConsumers()
250258
$this->injectLoggedChannel($definition, $key, $consumer['connection']);
251259
}
252260

261+
if ($consumer['audit']) {
262+
$this->injectLogger($definition);
263+
}
264+
253265
$name = sprintf('old_sound_rabbit_mq.%s_multiple', $key);
254266
$this->container->setDefinition($name, $definition);
255267
if ($consumer['queues_provider']) {
@@ -305,6 +317,10 @@ protected function loadDynamicConsumers()
305317
$this->injectLoggedChannel($definition, $key, $consumer['connection']);
306318
}
307319

320+
if ($consumer['audit']) {
321+
$this->injectLogger($definition);
322+
}
323+
308324
$name = sprintf('old_sound_rabbit_mq.%s_dynamic', $key);
309325
$this->container->setDefinition($name, $definition);
310326
$this->addDequeuerAwareCall($consumer['callback'], $name);
@@ -481,4 +497,11 @@ protected function addDequeuerAwareCall($callback, $name)
481497
$callbackDefinition->addMethodCall('setDequeuer', array(new Reference($name)));
482498
}
483499
}
500+
501+
private function injectLogger(Definition $definition)
502+
{
503+
$definition->addTag('monolog.logger', [
504+
'channel' => 'phpamqplib'
505+
]);
506+
}
484507
}

README.md

Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -361,6 +361,23 @@ This seems to be quite a lot of work for just sending messages, let's recap to h
361361
362362
And that's it!
363363
364+
### Audit / Logging ###
365+
366+
This was a requirement to have a traceability of messages received/published.
367+
In order to enable this you'll need to add "audit" config to consumers or publishers.
368+
369+
```yaml
370+
consumers:
371+
upload_picture:
372+
connection: default
373+
exchange_options: {name: 'upload-picture', type: direct}
374+
queue_options: {name: 'upload-picture'}
375+
callback: upload_picture_service
376+
audit: true
377+
```
378+
379+
If you would like you can also treat logging from queues with different handlers in monolog, by referencing channel "phpamqplib"
380+
364381
### RPC or Reply/Response ###
365382
366383
So far we just have sent messages to consumers, but what if we want to get a reply from them? To achieve this we have to implement RPC calls into our application. This bundle makes it pretty easy to achieve such things with Symfony2.

RabbitMq/BaseAmqp.php

Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,8 @@
44
use PhpAmqpLib\Channel\AMQPChannel;
55
use PhpAmqpLib\Connection\AbstractConnection;
66
use PhpAmqpLib\Connection\AMQPLazyConnection;
7+
use Psr\Log\LoggerInterface;
8+
use Psr\Log\NullLogger;
79

810
abstract class BaseAmqp
911
{
@@ -16,6 +18,11 @@ abstract class BaseAmqp
1618
protected $autoSetupFabric = true;
1719
protected $basicProperties = array('content_type' => 'text/plain', 'delivery_mode' => 2);
1820

21+
/**
22+
* @var LoggerInterface
23+
*/
24+
protected $logger;
25+
1926
protected $exchangeOptions = array(
2027
'passive' => false,
2128
'durable' => true,
@@ -53,6 +60,8 @@ public function __construct(AbstractConnection $conn, AMQPChannel $ch = null, $c
5360
}
5461

5562
$this->consumerTag = empty($consumerTag) ? sprintf("PHPPROCESS_%s_%s", gethostname(), getmypid()) : $consumerTag;
63+
64+
$this->logger = new NullLogger();
5665
}
5766

5867
public function __destruct()
@@ -187,4 +196,12 @@ public function setupFabric()
187196
public function disableAutoSetupFabric() {
188197
$this->autoSetupFabric = false;
189198
}
199+
200+
/**
201+
* @param LoggerInterface $logger
202+
*/
203+
public function setLogger($logger)
204+
{
205+
$this->logger = $logger;
206+
}
190207
}

RabbitMq/Consumer.php

Lines changed: 29 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -67,9 +67,36 @@ public function delete()
6767

6868
public function processMessage(AMQPMessage $msg)
6969
{
70-
$processFlag = call_user_func($this->callback, $msg);
70+
try {
71+
$processFlag = call_user_func($this->callback, $msg);
72+
$this->handleProcessMessage($msg, $processFlag);
73+
$this->logger->debug('Queue message processed', array(
74+
'amqp' => array(
75+
'queue' => $this->queueOptions['name'],
76+
'message' => $msg,
77+
'return_code' => $processFlag
78+
)
79+
));
80+
} catch (\Exception $e) {
81+
$this->logger->error($e->getMessage(), array(
82+
'amqp' => array(
83+
'queue' => $this->queueOptions['name'],
84+
'message' => $msg,
85+
'stacktrace' => $e->getTraceAsString()
86+
)
87+
));
88+
throw $e;
89+
} catch (\Error $e) {
90+
$this->logger->error($e->getMessage(), array(
91+
'amqp' => array(
92+
'queue' => $this->queueOptions['name'],
93+
'message' => $msg,
94+
'stacktrace' => $e->getTraceAsString()
95+
)
96+
));
97+
throw $e;
98+
}
7199

72-
$this->handleProcessMessage($msg, $processFlag);
73100
}
74101

75102
protected function handleProcessMessage(AMQPMessage $msg, $processFlag)

RabbitMq/Producer.php

Lines changed: 10 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,6 @@
22

33
namespace OldSound\RabbitMqBundle\RabbitMq;
44

5-
use OldSound\RabbitMqBundle\RabbitMq\BaseAmqp;
65
use PhpAmqpLib\Message\AMQPMessage;
76
use PhpAmqpLib\Wire\AMQPTable;
87

@@ -49,11 +48,19 @@ public function publish($msgBody, $routingKey = '', $additionalProperties = arra
4948

5049
$msg = new AMQPMessage((string) $msgBody, array_merge($this->getBasicProperties(), $additionalProperties));
5150

52-
if(!empty($headers)){
51+
if (!empty($headers)) {
5352
$headersTable = new AMQPTable($headers);
5453
$msg->set('application_headers', $headersTable);
5554
}
5655

57-
$this->getChannel()->basic_publish($msg, $this->exchangeOptions['name'], (string) $routingKey);
56+
$this->getChannel()->basic_publish($msg, $this->exchangeOptions['name'], (string)$routingKey);
57+
$this->logger->debug('AMQP message published', array(
58+
'amqp' => array(
59+
'body' => $msgBody,
60+
'routingkeys' => $routingKey,
61+
'properties' => $additionalProperties,
62+
'headers' => $headers
63+
)
64+
));
5865
}
5966
}
Lines changed: 22 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,22 @@
1+
old_sound_rabbit_mq:
2+
3+
enable_collector: true
4+
5+
connections:
6+
default:
7+
8+
producers:
9+
default_producer:
10+
exchange_options:
11+
name: default_exchange
12+
type: direct
13+
14+
consumers:
15+
default_consumer:
16+
exchange_options:
17+
name: default_exchange
18+
type: direct
19+
queue_options:
20+
name: default_queue
21+
callback: default.callback
22+
audit: true

Tests/DependencyInjection/OldSoundRabbitMqExtensionTest.php

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -805,6 +805,15 @@ public function testProducerWithoutExplicitExchangeOptionsConnectsToAMQPDefault(
805805
$this->assertEquals(true, $options[0]['passive']);
806806
}
807807

808+
public function testProducersWithLogger()
809+
{
810+
$container = $this->getContainer('config_with_audit.yml');
811+
$this->assertTrue(
812+
$container->getDefinition('old_sound_rabbit_mq.default_consumer_consumer')->hasTag('monolog.logger'),
813+
'service should be marked for logger'
814+
);
815+
}
816+
808817
private function getContainer($file, $debug = false)
809818
{
810819
$container = new ContainerBuilder(new ParameterBag(array('kernel.debug' => $debug)));

composer.json

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -15,7 +15,8 @@
1515
"symfony/config": "~2.3 || ~3.0",
1616
"symfony/yaml": "~2.3 || ~3.0",
1717
"symfony/console": "~2.3 || ~3.0",
18-
"php-amqplib/php-amqplib": "~2.6"
18+
"php-amqplib/php-amqplib": "~2.6",
19+
"psr/log": "~1.0"
1920
},
2021
"require-dev": {
2122
"symfony/serializer": "~2.3 || ~3.0",

0 commit comments

Comments
 (0)