@@ -53,6 +53,7 @@ public class RabbitMqProperties {
53
53
private Integer channelsCount ;
54
54
private boolean createExchangeIfNotExisting ;
55
55
private Integer tcpTimeOut ;
56
+ private boolean hasExchange = false ;
56
57
// built in tcp connection timeout value for MB in milliseconds.
57
58
public static final Integer DEFAULT_TCP_TIMEOUT = 60000 ;
58
59
private Long waitForConfirmsTimeOut ;
@@ -88,7 +89,10 @@ public String getExchangeName() {
88
89
}
89
90
90
91
public void setExchangeName (String exchangeName ) {
91
- this .exchangeName = exchangeName ;
92
+ if (!exchangeName .equals (this .exchangeName )) {
93
+ this .exchangeName = exchangeName ;
94
+ this .hasExchange = false ;
95
+ }
92
96
}
93
97
94
98
public Integer getPort () {
@@ -183,7 +187,7 @@ public void setRabbitConnection(Connection rabbitConnection) {
183
187
this .rabbitConnection = rabbitConnection ;
184
188
}
185
189
186
- public void init () throws RemRemPublishException {
190
+ public void init () {
187
191
log .info ("RabbitMqProperties init ..." );
188
192
if (Boolean .getBoolean (PropertiesConfig .CLI_MODE )) {
189
193
initCli ();
@@ -238,7 +242,13 @@ public void init() throws RemRemPublishException {
238
242
} catch (NoSuchAlgorithmException e ) {
239
243
log .error (e .getMessage (), e );
240
244
}
241
- checkAndCreateExchangeIfNeeded ();
245
+ try {
246
+ //The exception can be safely handled here as there is a check for existence of exchange is done before each publish.
247
+ checkAndCreateExchangeIfNeeded ();
248
+ } catch (RemRemPublishException e ) {
249
+ log .error ("Error occured while setting up the RabbitMq Connection. " +e .getMessage ());
250
+ e .printStackTrace ();
251
+ }
242
252
}
243
253
244
254
/**
@@ -264,7 +274,8 @@ public void createRabbitMqConnection() throws RemRemPublishException {
264
274
}
265
275
} catch (IOException | TimeoutException e ) {
266
276
log .error (e .getMessage (), e );
267
- throw new RemRemPublishException ("Failed to create connection for Rabbitmq ::" + factory .getHost () + ":" + factory .getPort ());
277
+ throw new RemRemPublishException ("Failed to create connection for Rabbitmq :: " , factory ,
278
+ e );
268
279
}
269
280
}
270
281
@@ -364,19 +375,26 @@ public void checkAndCreateExchangeIfNeeded() throws RemRemPublishException {
364
375
try {
365
376
connection = factory .newConnection ();
366
377
} catch (final IOException | TimeoutException e ) {
367
- throw new RemRemPublishException ("Exception occurred while creating Rabbitmq connection ::" + factory .getHost () + ":" + factory .getPort () + e .getMessage ());
378
+ throw new RemRemPublishException (
379
+ "Exception occurred while creating Rabbitmq connection :: " , factory , e );
368
380
}
369
381
Channel channel = null ;
370
382
try {
371
383
channel = connection .createChannel ();
372
384
} catch (final IOException e ) {
373
- throw new RemRemPublishException ("Exception occurred while creating Channel with Rabbitmq connection ::" + factory .getHost () + ":" + factory .getPort () + e .getMessage ());
385
+ throw new RemRemPublishException (
386
+ "Exception occurred while creating Channel with Rabbitmq connection ::" ,
387
+ factory , e );
374
388
}
375
389
try {
376
390
channel .exchangeDeclare (exchangeName , "topic" , true );
391
+ log .info ("Exchange {} is created" ,exchangeName );
392
+ hasExchange = true ;
377
393
} catch (final IOException e ) {
378
394
log .info (exchangeName + "failed to create an exchange" );
379
- throw new RemRemPublishException ("Unable to create Exchange with Rabbitmq connection " + exchangeName + factory .getHost () + ":" + factory .getPort () + e .getMessage ());
395
+ throw new RemRemPublishException (
396
+ "Unable to create Exchange with Rabbitmq connection " + exchangeName ,
397
+ factory , e );
380
398
} finally {
381
399
if (channel == null || channel .isOpen ()) {
382
400
try {
@@ -405,25 +423,32 @@ public void checkAndCreateExchangeIfNeeded() throws RemRemPublishException {
405
423
* @throws IOException
406
424
*/
407
425
private boolean hasExchange () throws RemRemPublishException {
408
- log .info ("Exchange is: " + exchangeName );
426
+ if (hasExchange ) {
427
+ log .info ("Exchange is: {}" , exchangeName );
428
+ return true ;
429
+ }
430
+
409
431
Connection connection ;
410
432
try {
411
433
connection = factory .newConnection ();
412
434
} catch (final IOException | TimeoutException e ) {
413
- throw new RemRemPublishException ("Exception occurred while creating Rabbitmq connection ::" + factory .getHost () + factory .getPort () + e .getMessage ());
435
+ throw new RemRemPublishException (
436
+ "Exception occurred while creating Rabbitmq connection :: " , factory , e );
414
437
}
415
438
Channel channel = null ;
416
439
try {
417
440
channel = connection .createChannel ();
418
441
} catch (final IOException e ) {
419
- log .info ("Exchange " + exchangeName + " does not Exist" );
420
- throw new RemRemPublishException ("Exception occurred while creating Channel with Rabbitmq connection ::" + factory .getHost () + factory .getPort () + e .getMessage ());
442
+ throw new RemRemPublishException (
443
+ "Exception occurred while creating Channel with Rabbitmq connection :: " ,
444
+ factory , e );
421
445
}
422
446
try {
423
447
channel .exchangeDeclarePassive (exchangeName );
424
- return true ;
448
+ hasExchange = true ;
449
+ return hasExchange ;
425
450
} catch (final IOException e ) {
426
- log .info ("Exchange " + exchangeName + " does not Exist " );
451
+ log .info ("Exchange " + exchangeName + " was not created " );
427
452
return false ;
428
453
} finally {
429
454
if (channel != null && channel .isOpen ()) {
@@ -437,7 +462,8 @@ private boolean hasExchange() throws RemRemPublishException {
437
462
}
438
463
}
439
464
440
- /**
465
+
466
+ /**
441
467
* This method is used to publish the message to RabbitMQ
442
468
* @param routingKey
443
469
* @param msg is Eiffel Event
@@ -449,6 +475,7 @@ private boolean hasExchange() throws RemRemPublishException {
449
475
public void send (String routingKey , String msg )
450
476
throws IOException , NackException , TimeoutException , RemRemPublishException , IllegalArgumentException {
451
477
Channel channel = giveMeRandomChannel ();
478
+ checkAndCreateExchangeIfNeeded ();
452
479
channel .addShutdownListener (new ShutdownListener () {
453
480
public void shutdownCompleted (ShutdownSignalException cause ) {
454
481
// Beware that proper synchronization is needed here
@@ -483,13 +510,14 @@ public void shutdownCompleted(ShutdownSignalException cause) {
483
510
throw new TimeoutException ("Timeout waiting for ACK " + e .getMessage ());
484
511
} catch (IllegalArgumentException e ) {
485
512
log .error ("Failed to publish message due to " + e .getMessage ());
486
- throw new IllegalArgumentException ("DomainId limit exceeded " + e .getMessage ());
513
+ throw new IllegalArgumentException ("DomainId limit exceeded " + e .getMessage (), e );
487
514
} catch (Exception e ) {
488
515
log .error (e .getMessage (), e );
489
516
if (!channel .isOpen ()&& rabbitConnection .isOpen ()){
490
- throw new RemRemPublishException ("Channel was closed for Rabbitmq connection ::" + factory .getHost () + factory .getPort ());
517
+ throw new RemRemPublishException ("Channel was closed for Rabbitmq connection :: " ,
518
+ factory , e );
491
519
}
492
- throw new IOException ("Failed to publish message due to " + e .getMessage ());
520
+ throw new IOException ("Failed to publish message due to " + e .getMessage (), e );
493
521
}
494
522
}
495
523
0 commit comments