Skip to content

Commit ff9c956

Browse files
vishnu-alapatiVishnu Alapati
andauthored
AMQP connection recovery creates extra connection/channel. (#272)
--------- Co-authored-by: Vishnu Alapati <zalavis@seliiuts03320.seli.gic.ericsson.se>
1 parent 1ac5a9f commit ff9c956

File tree

2 files changed

+56
-22
lines changed

2 files changed

+56
-22
lines changed

CHANGELOG.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
## 2.1.1
22
- Implemented the changes to log the eventId and HTTPStatus while the level is INFO.
33
- Implemented the changes to print the user information while the log level is INFO.
4+
- Made changes to resolve extra RabbitMQ connection issue.
45

56
## 2.1.0
67
- Implemented new routing key template for Sepia.

publish-common/src/main/java/com/ericsson/eiffel/remrem/publish/helper/RabbitMqProperties.java

Lines changed: 55 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -30,9 +30,11 @@
3030
import org.slf4j.LoggerFactory;
3131

3232
import com.ericsson.eiffel.remrem.publish.config.PropertiesConfig;
33-
import com.ericsson.eiffel.remrem.publish.exception.RemRemPublishException;
3433
import com.ericsson.eiffel.remrem.publish.exception.NackException;
34+
import com.ericsson.eiffel.remrem.publish.exception.RemRemPublishException;
3535
import com.rabbitmq.client.AMQP.BasicProperties;
36+
import com.rabbitmq.client.AlreadyClosedException;
37+
import com.rabbitmq.client.BlockedListener;
3638
import com.rabbitmq.client.Channel;
3739
import com.rabbitmq.client.Connection;
3840
import com.rabbitmq.client.MessageProperties;
@@ -296,15 +298,36 @@ public void createRabbitMqConnection() throws RemRemPublishException {
296298
}
297299
factory.setConnectionTimeout(tcpTimeOut);
298300
rabbitConnection = factory.newConnection();
301+
rabbitConnection.addShutdownListener(new ShutdownListener() {
302+
@Override
303+
public void shutdownCompleted(ShutdownSignalException cause) {
304+
log.debug("Connection Shutdown completed " + cause.getMessage());
305+
try {
306+
rabbitConnection.close();
307+
} catch (AlreadyClosedException | IOException e) {
308+
// This is intentionally added, if we do not call the close function, connection is not closed properly
309+
// and the connections count is getting increased..
310+
}
311+
}
312+
});
313+
314+
rabbitConnection.addBlockedListener(new BlockedListener() {
315+
public void handleBlocked(String reason) throws IOException {
316+
// Connection is now blocked
317+
log.debug("Connection is blocked " + reason);
318+
}
319+
320+
public void handleUnblocked() throws IOException {
321+
// Connection is now unblocked
322+
}
323+
});
299324
log.info("Connected to RabbitMQ.");
300325
rabbitChannels = new ArrayList<>();
301326
if(channelsCount == null || channelsCount == 0 ) {
302327
channelsCount = DEFAULT_CHANNEL_COUNT;
303328
}
304329
for (int i = 0; i < channelsCount; i++) {
305-
Channel channel = rabbitConnection.createChannel();
306-
channel.confirmSelect();
307-
rabbitChannels.add(channel);
330+
createNewChannel();
308331
}
309332
} catch (IOException | TimeoutException e) {
310333
log.error(e.getMessage(), e);
@@ -313,6 +336,32 @@ public void createRabbitMqConnection() throws RemRemPublishException {
313336
}
314337
}
315338

339+
/**
340+
* This method is used to create Rabbitmq channels
341+
* @throws IOException
342+
*/
343+
private Channel createNewChannel() throws IOException {
344+
Channel channel = rabbitConnection.createChannel();
345+
channel.addShutdownListener(new ShutdownListener() {
346+
public void shutdownCompleted(ShutdownSignalException cause) {
347+
// Beware that proper synchronization is needed here
348+
if (cause.isInitiatedByApplication()) {
349+
log.debug("Shutdown is initiated by application. Ignoring it.");
350+
} else {
351+
log.error("Shutdown is NOT initiated by application.");
352+
log.error(cause.getMessage());
353+
boolean cliMode = Boolean.getBoolean(PropertiesConfig.CLI_MODE);
354+
if (cliMode) {
355+
System.exit(-3);
356+
}
357+
}
358+
}
359+
});
360+
channel.confirmSelect();
361+
rabbitChannels.add(channel);
362+
return channel;
363+
}
364+
316365
private void initCli() {
317366
setValues();
318367
}
@@ -517,21 +566,7 @@ public void send(String routingKey, String msg, String eventId)
517566
throws IOException, NackException, TimeoutException, RemRemPublishException, IllegalArgumentException {
518567
Channel channel = giveMeRandomChannel();
519568
checkAndCreateExchangeIfNeeded();
520-
channel.addShutdownListener(new ShutdownListener() {
521-
public void shutdownCompleted(ShutdownSignalException cause) {
522-
// Beware that proper synchronization is needed here
523-
if (cause.isInitiatedByApplication()) {
524-
log.debug("Shutdown is initiated by application. Ignoring it.");
525-
} else {
526-
log.error("Shutdown is NOT initiated by application.");
527-
log.error(cause.getMessage());
528-
boolean cliMode = Boolean.getBoolean(PropertiesConfig.CLI_MODE);
529-
if (cliMode) {
530-
System.exit(-3);
531-
}
532-
}
533-
}
534-
});
569+
535570
BasicProperties msgProps = usePersitance ? PERSISTENT_BASIC_APPLICATION_JSON
536571
: MessageProperties.BASIC;
537572

@@ -577,9 +612,7 @@ private Channel giveMeRandomChannel() throws RemRemPublishException {
577612
}
578613
}
579614
try {
580-
Channel channel = rabbitConnection.createChannel();
581-
channel.confirmSelect();
582-
rabbitChannels.add(channel);
615+
Channel channel = createNewChannel();
583616
return channel;
584617
} catch (IOException e) {
585618
log.error(e.getMessage(), e);

0 commit comments

Comments
 (0)