Skip to content

Commit cdb5190

Browse files
Implementing "publisher Confirms " in REMReM to get confirmations about the messages sent to MB. (#242)
* Implemented "publisher confirms " in REMReM so that this can be used to get confirmation about the messages sent to MB.
1 parent ed6c5e6 commit cdb5190

File tree

18 files changed

+293
-129
lines changed

18 files changed

+293
-129
lines changed

CHANGELOG.md

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,6 @@
1+
## 2.0.25
2+
- Implemented "publisher confirms " in REMReM so that this can be used to get confirmation about the messages sent to MB.
3+
14
## 2.0.24
25
- Updated all the curl commands in documentation
36
- Uplifted eiffel-remrem-parent version from 2.0.6 to 2.0.7.

pom.xml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -9,7 +9,7 @@
99
<version>2.0.8</version>
1010
</parent>
1111
<properties>
12-
<eiffel-remrem-publish.version>2.0.24</eiffel-remrem-publish.version>
12+
<eiffel-remrem-publish.version>2.0.25</eiffel-remrem-publish.version>
1313
<eiffel-remrem-semantics.version>2.2.3</eiffel-remrem-semantics.version>
1414
</properties>
1515
<artifactId>eiffel-remrem-publish</artifactId>

publish-cli/src/main/java/com/ericsson/eiffel/remrem/publish/cli/CliOptions.java

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -79,6 +79,7 @@ public static void createCLIOptions() {
7979
options.addOption("mp", "messaging_protocol", true, "name of messaging protocol to be used, e.g. eiffel3, eiffelsemantics, default is eiffelsemantics");
8080
options.addOption("domain", "domainId", true, "identifies the domain that produces the event");
8181
options.addOption("cc", "channelsCount", true, "Number of channels connected to message bus, default is 1");
82+
options.addOption("wcto", "wait_for_confirms_timeOut", true, "the timeout for wait for confirms, default is 5000 ms/milliseconds");
8283
options.addOption("ud", "user_domain_suffix", true, "user domain suffix");
8384
options.addOption("v", "lists the versions of publish and all loaded protocols");
8485
options.addOption("tag", "tag", true, "tag to be used in routing key");
@@ -225,6 +226,12 @@ public static void handleMessageBusOptions() throws HandleMessageBusException {
225226
System.setProperty(key, channelsCount);
226227
}
227228

229+
if (commandLine.hasOption("wcto")) {
230+
String timeOut = commandLine.getOptionValue("wcto");
231+
String key = PropertiesConfig.WAIT_FOR_CONFIRMS_TIME_OUT;
232+
System.setProperty(key,timeOut);
233+
}
234+
228235
if (commandLine.hasOption("tls")) {
229236
String tls_ver = commandLine.getOptionValue("tls");
230237
if (tls_ver == null) {
@@ -272,6 +279,7 @@ public static void clearSystemProperties() {
272279
System.clearProperty(PropertiesConfig.TLS);
273280
System.clearProperty(PropertiesConfig.DOMAIN_ID);
274281
System.clearProperty(PropertiesConfig.CHANNELS_COUNT);
282+
System.clearProperty(PropertiesConfig.WAIT_FOR_CONFIRMS_TIME_OUT);
275283
}
276284

277285
/**

publish-cli/src/test/java/com/ericsson/eiffel/remrem/publish/cli/CliOptionsUnitTests.java

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -125,6 +125,20 @@ public void testEnOptionFails() throws Exception {
125125
assertTrue(CliOptions.getErrorCodes().contains(code));
126126
}
127127

128+
public void testWctoOption() throws Exception {
129+
String[] args = {"-f", "/a/b/c/test.file", "test", "-wcto", "5000"};
130+
CliOptions.parse(args);
131+
assertTrue(CliOptions.getErrorCodes().isEmpty());
132+
}
133+
134+
@Test
135+
public void testWctoOptionFails() throws Exception {
136+
String[] args = {"-f", "/a/b/c/test.file", "test", "-wctof", "5000"};
137+
CliOptions.parse(args);
138+
int code = CLIExitCodes.CLI_MISSING_OPTION_EXCEPTION;
139+
assertTrue(CliOptions.getErrorCodes().contains(code));
140+
}
141+
128142
public void testNpOption() throws Exception {
129143
String[] args = {"-f", "/a/b/c/test.file", "test", "-np", "non_persistent"};
130144
CliOptions.parse(args);

publish-common/src/main/java/com/ericsson/eiffel/remrem/publish/config/PropertiesConfig.java

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@ public class PropertiesConfig {
1919
public static final String MESSAGE_BUS_PORT = "com.ericsson.eiffel.remrem.publish.messagebus.port";
2020
public static final String VIRTUAL_HOST = "com.ericsson.eiffel.remrem.publish.messagebus.virtualhost";
2121
public static final String CHANNELS_COUNT = "com.ericsson.eiffel.remrem.publish.messagebus.channels";
22+
public static final String WAIT_FOR_CONFIRMS_TIME_OUT = "com.ericsson.eiffel.remrem.publish.messagebus.waitforconfirmstimeout";
2223
public static final String TLS = "com.ericsson.eiffel.remrem.publish.messagebus.tls";
2324
public static final String EXCHANGE_NAME = "com.ericsson.eiffel.remrem.publish.exchange.name";
2425
public static final String USE_PERSISTENCE = "com.ericsson.eiffel.remrem.publish.use.persistence";
@@ -45,7 +46,10 @@ public class PropertiesConfig {
4546
public static final String INVALID_EXCHANGE_MESSAGE_SERVICE = " ExchangeName is not present, To create the exchange specify createExchangeIfNotExisting in application configuration";
4647

4748
public static final String SERVER_DOWN = "Internal Server Error";
49+
public static final String GATEWAY_TIMEOUT = "Gateway Timeout";
4850
public static final String SERVER_DOWN_MESSAGE = "RabbitMQ is down. Please try later";
51+
public static final String MESSAGE_NACK = "Message is nacked";
52+
public static final String TIMEOUT_WAITING_FOR_ACK = "Time out waiting for ACK";
4953
public static final String ROUTING_KEY_GENERATION_FAILED_CONTENT = "Could not prepare Routing key to publish message";
5054
public static final String UNSUCCESSFUL_EVENT_CONTENT = "Please check previous event and try again later";
5155
public static final String RABBITMQ_PROPERTIES_NOT_FOUND = "RabbitMQ properties not found";

publish-common/src/main/java/com/ericsson/eiffel/remrem/publish/config/RabbitMqPropertiesConfig.java

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -115,7 +115,14 @@ private void readSpringProperties() {
115115
rabbitMqProperties.setCreateExchangeIfNotExisting(rabbitmqInstanceObject.get("createExchangeIfNotExisting").asBoolean());
116116
rabbitMqProperties.setDomainId(rabbitmqInstanceObject.get("domainId").asText());
117117
if((rabbitmqInstanceObject.get("channelsCount") != null) ) {
118-
rabbitMqProperties.setChannelsCount(Integer.parseInt(rabbitmqInstanceObject.get("channelsCount").asText()));
118+
rabbitMqProperties.setChannelsCount(
119+
Integer.getInteger(rabbitmqInstanceObject.get("channelsCount").asText(),
120+
RabbitMqProperties.DEFAULT_CHANNEL_COUNT));
121+
}
122+
if((rabbitmqInstanceObject.get("waitForConfirmsTimeOut") != null) ) {
123+
rabbitMqProperties.setWaitForConfirmsTimeOut(Long.getLong(
124+
rabbitmqInstanceObject.get("waitForConfirmsTimeOut").asText(),
125+
RabbitMqProperties.DEFAULT_WAIT_FOR_CONFIRMS_TIMEOUT));
119126
}
120127
rabbitMqPropertiesMap.put(protocol, rabbitMqProperties);
121128
}
Lines changed: 29 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,29 @@
1+
/*
2+
Copyright 2022 Ericsson AB.
3+
For a full list of individual contributors, please see the commit history.
4+
Licensed under the Apache License, Version 2.0 (the "License");
5+
you may not use this file except in compliance with the License.
6+
You may obtain a copy of the License at
7+
8+
http://www.apache.org/licenses/LICENSE-2.0
9+
Unless required by applicable law or agreed to in writing, software
10+
distributed under the License is distributed on an "AS IS" BASIS,
11+
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
See the License for the specific language governing permissions and
13+
limitations under the License.
14+
*/
15+
package com.ericsson.eiffel.remrem.publish.exception;
16+
17+
import java.io.IOException;
18+
19+
public class NackException extends IOException {
20+
21+
private static final long serialVersionUID = 1872149119179038290L;
22+
23+
public NackException(String message) {
24+
super(message);
25+
}
26+
public NackException(String message , Throwable cause) {
27+
super(message, cause);
28+
}
29+
}

publish-common/src/main/java/com/ericsson/eiffel/remrem/publish/helper/RMQHelper.java

Lines changed: 16 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@
1818
import java.io.IOException;
1919
import java.util.HashMap;
2020
import java.util.Map;
21+
import java.util.concurrent.TimeoutException;
2122

2223
import javax.annotation.PostConstruct;
2324
import javax.annotation.PreDestroy;
@@ -29,6 +30,7 @@
2930
import com.ericsson.eiffel.remrem.protocol.MsgService;
3031
import com.ericsson.eiffel.remrem.publish.config.PropertiesConfig;
3132
import com.ericsson.eiffel.remrem.publish.config.RabbitMqPropertiesConfig;
33+
import com.ericsson.eiffel.remrem.publish.exception.NackException;
3234
import com.ericsson.eiffel.remrem.publish.exception.RemRemPublishException;
3335

3436
import ch.qos.logback.classic.Level;
@@ -81,28 +83,30 @@ public void rabbitMqPropertiesInit(String protocol) throws RemRemPublishExceptio
8183
* @param protocol name
8284
* @throws RemRemPublishException
8385
*/
84-
private void protocolInit(String protocol) throws RemRemPublishException{
85-
rabbitMqPropertiesMap.get(protocol).setProtocol(protocol);
86-
rabbitMqPropertiesMap.get(protocol).init();
86+
private void protocolInit(String protocol) throws RemRemPublishException {
87+
RabbitMqProperties rabbitmqProtocolProperties = rabbitMqPropertiesMap.get(protocol);
88+
rabbitmqProtocolProperties.setProtocol(protocol);
89+
rabbitmqProtocolProperties.init();
8790
}
8891

89-
public void send(String routingKey, String msg, MsgService msgService) throws IOException {
92+
public void send(String routingKey, String msg, MsgService msgService) throws IOException, NackException, TimeoutException, RemRemPublishException {
9093
String protocol = msgService.getServiceName();
91-
if(rabbitMqPropertiesMap.get(protocol) != null) {
92-
rabbitMqPropertiesMap.get(protocol).send(routingKey,msg);
94+
RabbitMqProperties rabbitmqProtocolProperties = rabbitMqPropertiesMap.get(protocol);
95+
if (rabbitmqProtocolProperties != null) {
96+
rabbitmqProtocolProperties.send(routingKey, msg);
9397
} else {
94-
log.error("RabbitMq properties not configured for the protocol "+protocol);
98+
log.error("RabbitMq properties not configured for the protocol " + protocol);
9599
}
96100
}
97101

98102
@PreDestroy
99103
public void cleanUp() throws IOException {
100104
log.info("RMQHelper cleanUp ...");
101105
for(String protocol : rabbitMqPropertiesMap.keySet()) {
102-
rabbitMqPropertiesMap.get(protocol).getRabbitConnection();
103-
if (rabbitMqPropertiesMap.get(protocol).getRabbitConnection() != null){
104-
rabbitMqPropertiesMap.get(protocol).getRabbitConnection().close();
105-
rabbitMqPropertiesMap.get(protocol).setRabbitConnection(null);
106+
RabbitMqProperties rabbitmqProtocolProperties = rabbitMqPropertiesMap.get(protocol);
107+
if (rabbitmqProtocolProperties.getRabbitConnection() != null) {
108+
rabbitmqProtocolProperties.getRabbitConnection().close();
109+
rabbitmqProtocolProperties.setRabbitConnection(null);
106110
} else {
107111
log.warn("rabbitConnection is null when cleanUp");
108112
}
@@ -118,4 +122,5 @@ private void handleLogging() {
118122
log.setLevel(Level.OFF);
119123
}
120124
}
125+
121126
}

publish-common/src/main/java/com/ericsson/eiffel/remrem/publish/helper/RabbitMqProperties.java

Lines changed: 68 additions & 27 deletions
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,7 @@
2626

2727
import com.ericsson.eiffel.remrem.publish.config.PropertiesConfig;
2828
import com.ericsson.eiffel.remrem.publish.exception.RemRemPublishException;
29+
import com.ericsson.eiffel.remrem.publish.exception.NackException;
2930
import com.rabbitmq.client.AMQP.BasicProperties;
3031
import com.rabbitmq.client.Channel;
3132
import com.rabbitmq.client.Connection;
@@ -51,6 +52,9 @@ public class RabbitMqProperties {
5152
private String domainId;
5253
private Integer channelsCount;
5354
private boolean createExchangeIfNotExisting;
55+
private Long waitForConfirmsTimeOut;
56+
public static final Long DEFAULT_WAIT_FOR_CONFIRMS_TIMEOUT = 5000L;
57+
public static final Integer DEFAULT_CHANNEL_COUNT = 1;
5458

5559
private Connection rabbitConnection;
5660
private String protocol;
@@ -59,6 +63,15 @@ public class RabbitMqProperties {
5963

6064
Logger log = (Logger) LoggerFactory.getLogger(RMQHelper.class);
6165

66+
67+
public Long getWaitForConfirmsTimeOut() {
68+
return waitForConfirmsTimeOut;
69+
}
70+
71+
public void setWaitForConfirmsTimeOut(Long waitForConfirmsTimeOut) {
72+
this.waitForConfirmsTimeOut = waitForConfirmsTimeOut;
73+
}
74+
6275
public String getHost() {
6376
return host;
6477
}
@@ -219,20 +232,24 @@ public void init() throws RemRemPublishException {
219232

220233
/**
221234
* This method is used to create Rabbitmq connection and channels
235+
* @throws RemRemPublishException
222236
*/
223-
public void createRabbitMqConnection() {
237+
public void createRabbitMqConnection() throws RemRemPublishException {
224238
try {
225239
rabbitConnection = factory.newConnection();
226240
log.info("Connected to RabbitMQ.");
227241
rabbitChannels = new ArrayList<>();
228242
if(channelsCount == null || channelsCount == 0 ) {
229-
channelsCount = 1;
243+
channelsCount = DEFAULT_CHANNEL_COUNT;
230244
}
231245
for (int i = 0; i < channelsCount; i++) {
232-
rabbitChannels.add(rabbitConnection.createChannel());
246+
Channel channel = rabbitConnection.createChannel();
247+
channel.confirmSelect();
248+
rabbitChannels.add(channel);
233249
}
234250
} catch (IOException | TimeoutException e) {
235251
log.error(e.getMessage(), e);
252+
throw new RemRemPublishException("Failed to create connection for Rabbitmq ::" + factory.getHost() + ":" + factory.getPort());
236253
}
237254
}
238255

@@ -276,6 +293,9 @@ private void initService() {
276293
if (channelsCount == null ) {
277294
channelsCount = Integer.getInteger(getValuesFromSystemProperties(protocol + ".rabbitmq.channelsCount"));
278295
}
296+
if (waitForConfirmsTimeOut == null ) {
297+
waitForConfirmsTimeOut = Long.getLong(getValuesFromSystemProperties(protocol + ".rabbitmq.waitForConfirmsTimeOut"));
298+
}
279299
}
280300

281301

@@ -285,6 +305,7 @@ private void setValues() {
285305
virtualHost = getValuesFromSystemProperties(PropertiesConfig.VIRTUAL_HOST);
286306
domainId = getValuesFromSystemProperties(PropertiesConfig.DOMAIN_ID);
287307
channelsCount = Integer.getInteger(PropertiesConfig.CHANNELS_COUNT);
308+
waitForConfirmsTimeOut = Long.getLong(PropertiesConfig.WAIT_FOR_CONFIRMS_TIME_OUT);
288309
tlsVer = getValuesFromSystemProperties(PropertiesConfig.TLS);
289310
exchangeName = getValuesFromSystemProperties(PropertiesConfig.EXCHANGE_NAME);
290311
usePersitance = Boolean.getBoolean(PropertiesConfig.USE_PERSISTENCE);
@@ -400,40 +421,60 @@ private boolean hasExchange() throws RemRemPublishException {
400421
* @param routingKey
401422
* @param msg is Eiffel Event
402423
* @throws IOException
424+
* @throws NackException
425+
* @throws TimeoutException
426+
* @throws RemRemPublishException
403427
*/
404-
public void send(String routingKey, String msg) throws IOException {
405-
406-
Channel channel = giveMeRandomChannel();
407-
channel.addShutdownListener(new ShutdownListener() {
408-
public void shutdownCompleted(ShutdownSignalException cause) {
409-
// Beware that proper synchronization is needed here
410-
if (cause.isInitiatedByApplication()) {
411-
log.debug("Shutdown is initiated by application. Ignoring it.");
412-
} else {
413-
log.error("Shutdown is NOT initiated by application.");
414-
log.error(cause.getMessage());
415-
boolean cliMode = Boolean.getBoolean(PropertiesConfig.CLI_MODE);
416-
if (cliMode) {
417-
System.exit(-3);
428+
public void send(String routingKey, String msg)
429+
throws IOException, NackException, TimeoutException, RemRemPublishException {
430+
Channel channel = giveMeRandomChannel();
431+
channel.addShutdownListener(new ShutdownListener() {
432+
public void shutdownCompleted(ShutdownSignalException cause) {
433+
// Beware that proper synchronization is needed here
434+
if (cause.isInitiatedByApplication()) {
435+
log.debug("Shutdown is initiated by application. Ignoring it.");
436+
} else {
437+
log.error("Shutdown is NOT initiated by application.");
438+
log.error(cause.getMessage());
439+
boolean cliMode = Boolean.getBoolean(PropertiesConfig.CLI_MODE);
440+
if (cliMode) {
441+
System.exit(-3);
442+
}
418443
}
419444
}
445+
});
446+
BasicProperties msgProps = MessageProperties.BASIC;
447+
if (usePersitance)
448+
msgProps = MessageProperties.PERSISTENT_BASIC;
449+
try {
450+
channel.basicPublish(exchangeName, routingKey, msgProps, msg.getBytes());
451+
log.info("Published message with size {} bytes on exchange '{}' with routing key '{}'",
452+
msg.getBytes().length, exchangeName, routingKey);
453+
if (waitForConfirmsTimeOut == null || waitForConfirmsTimeOut == 0) {
454+
waitForConfirmsTimeOut = DEFAULT_WAIT_FOR_CONFIRMS_TIMEOUT;
420455
}
421-
});
422-
423-
BasicProperties msgProps = MessageProperties.BASIC;
424-
if (usePersitance)
425-
msgProps = MessageProperties.PERSISTENT_BASIC;
426-
427-
channel.basicPublish(exchangeName, routingKey, msgProps, msg.getBytes());
428-
log.info("Published message with size {} bytes on exchange '{}' with routing key '{}'", msg.getBytes().length,
429-
exchangeName, routingKey);
456+
channel.waitForConfirmsOrDie(waitForConfirmsTimeOut);
457+
} catch (InterruptedException | IOException e) {
458+
log.error("Failed to publish message due to " + e.getMessage());
459+
throw new NackException("The message is nacked due to " + e.getMessage(), e);
460+
} catch (TimeoutException e) {
461+
log.error("Failed to publish message due to " + e.getMessage());
462+
throw new TimeoutException("Timeout waiting for ACK " + e.getMessage());
463+
} catch (Exception e) {
464+
log.error(e.getMessage(), e);
465+
if(!channel.isOpen()&& rabbitConnection.isOpen()){
466+
throw new RemRemPublishException("Channel was closed for Rabbitmq connection ::" + factory.getHost() + factory.getPort());
467+
}
468+
throw new IOException("Failed to publish message due to " + e.getMessage());
469+
}
430470
}
431471

432472
/**
433473
* This method is used to give random channel
434474
* @return channel
475+
* @throws RemRemPublishException
435476
*/
436-
private Channel giveMeRandomChannel() {
477+
private Channel giveMeRandomChannel() throws RemRemPublishException {
437478
if ((rabbitConnection == null || !rabbitConnection.isOpen())) {
438479
createRabbitMqConnection();
439480
}

0 commit comments

Comments
 (0)