Skip to content

Commit f6fca06

Browse files
debugging multiple subscription notification when repeat flag is off (#309)
* adding synchronized block to avoid race condition
1 parent 97d68fa commit f6fca06

File tree

9 files changed

+159
-151
lines changed

9 files changed

+159
-151
lines changed

src/functionaltests/java/com/ericsson/ei/notifications/trigger/SubscriptionNotificationSteps.java

Lines changed: 72 additions & 100 deletions
Large diffs are not rendered by default.

src/main/java/com/ericsson/ei/subscription/RunSubscription.java

Lines changed: 29 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -21,12 +21,15 @@
2121
import org.slf4j.Logger;
2222
import org.slf4j.LoggerFactory;
2323
import org.springframework.beans.factory.annotation.Autowired;
24+
import org.springframework.beans.factory.annotation.Value;
2425
import org.springframework.stereotype.Component;
2526

2627
import com.ericsson.ei.jmespath.JmesPathInterface;
2728
import com.fasterxml.jackson.databind.JsonNode;
2829
import com.fasterxml.jackson.databind.node.ArrayNode;
2930

31+
import lombok.Getter;
32+
3033
/**
3134
* This class represents the mechanism to fetch the rule conditions from the
3235
* Subscription Object and match it with the aggregatedObject to check if it is
@@ -45,6 +48,10 @@ public class RunSubscription {
4548

4649
@Autowired
4750
private SubscriptionRepeatDbHandler subscriptionRepeatDbHandler;
51+
52+
@Getter
53+
@Value("${spring.data.mongodb.database}")
54+
public String dataBaseName;
4855

4956
/**
5057
* This method matches every condition specified in the subscription Object and
@@ -71,15 +78,15 @@ public boolean runSubscriptionOnObject(String aggregatedObject, Iterator<JsonNod
7178
String subscriptionRepeatFlag = subscriptionJson.get("repeat").asText();
7279

7380
if (id == null) {
74-
LOGGER.error(
75-
"ID has not been passed for given aggregated object. The subscription will be triggered again.");
81+
LOGGER.debug(
82+
"ID has not been passed for given aggregated object. The subscription will be triggered again.");
7683
}
7784

78-
if (subscriptionRepeatFlag == "false" && id != null
79-
&& subscriptionRepeatDbHandler.checkIfAggrObjIdExistInSubscriptionAggrIdsMatchedList(
80-
subscriptionName, requirementIndex, id)) {
81-
LOGGER.info("Subscription has already matched with AggregatedObject Id: {}\n"
82-
+ "SubscriptionName: {}\nand has Subscription Repeat flag set to: {}",
85+
if (subscriptionRepeatFlag.equals("false") && id != null && subscriptionRepeatDbHandler
86+
.checkIfAggrObjIdExistInSubscriptionAggrIdsMatchedList(subscriptionName, requirementIndex, id)) {
87+
LOGGER.debug(
88+
"Subscription has already matched with AggregatedObject Id: {}\n"
89+
+ "SubscriptionName: {}\nand has Subscription Repeat flag set to: {}",
8390
id, subscriptionName, subscriptionRepeatFlag);
8491
break;
8592
}
@@ -103,19 +110,28 @@ public boolean runSubscriptionOnObject(String aggregatedObject, Iterator<JsonNod
103110
boolean resultNotEmpty = !resultString.equals("");
104111
boolean isFulfilled = resultNotEqualsToNull && resultNotEqualsToFalse && resultNotEmpty;
105112
String fulfilledStatement = String.format("Condition was %sfulfilled.", isFulfilled ? "" : "not ");
106-
LOGGER.debug("Condition: {}\nJMESPath evaluation result: {}\n{}",
107-
condition, result.toString(), fulfilledStatement);
113+
LOGGER.debug("Condition: {}\nJMESPath evaluation result: {}\n{}", condition, result.toString(),
114+
fulfilledStatement);
108115
if (resultNotEqualsToNull && resultNotEqualsToFalse && resultNotEmpty) {
109116
count_condition_fulfillment++;
110117
}
111118
}
112119

113120
if (count_conditions != 0 && count_condition_fulfillment == count_conditions) {
114121
conditionFulfilled = true;
115-
if (subscriptionJson.get("repeat").toString() == "false" && id != null) {
116-
LOGGER.debug("Adding matched AggrObj id to SubscriptionRepeatFlagHandlerDb.");
117-
subscriptionRepeatDbHandler.addMatchedAggrObjToSubscriptionId(subscriptionName,
118-
requirementIndex, id);
122+
if (subscriptionRepeatFlag.equals("false") && id != null) {
123+
// the keyword 'synchronized' ensures that this part of the code run
124+
// synchronously. Thus avoids race condition.
125+
synchronized (this) {
126+
if (!subscriptionRepeatDbHandler.checkIfAggrObjIdExistInSubscriptionAggrIdsMatchedList(
127+
subscriptionName, requirementIndex, id)) {
128+
LOGGER.debug("Adding matched aggregated object to database:" + dataBaseName);
129+
subscriptionRepeatDbHandler.addMatchedAggrObjToSubscriptionId(subscriptionName,
130+
requirementIndex, id);
131+
} else {
132+
conditionFulfilled = false;
133+
}
134+
}
119135
}
120136
}
121137

src/test/java/com/ericsson/ei/flowtests/FlowSourceChangeObject.java

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -32,7 +32,11 @@
3232
import org.mockito.Mock;
3333
import org.mockito.MockitoAnnotations;
3434
import org.springframework.beans.factory.annotation.Autowired;
35+
import org.springframework.boot.test.context.SpringBootContextLoader;
3536
import org.springframework.boot.test.context.SpringBootTest;
37+
import org.springframework.http.HttpStatus;
38+
import org.springframework.http.ResponseEntity;
39+
import org.springframework.test.context.ContextConfiguration;
3640
import org.springframework.test.context.TestExecutionListeners;
3741
import org.springframework.test.context.TestPropertySource;
3842
import org.springframework.test.context.junit4.SpringJUnit4ClassRunner;
@@ -42,7 +46,7 @@
4246
import com.ericsson.ei.erqueryservice.ERQueryService;
4347
import com.ericsson.ei.erqueryservice.SearchOption;
4448
import com.ericsson.ei.handlers.UpStreamEventsHandler;
45-
import com.ericsson.eiffelcommons.utils.ResponseEntity;
49+
import com.ericsson.ei.utils.TestContextInitializer;
4650
import com.fasterxml.jackson.databind.JsonNode;
4751
import com.fasterxml.jackson.databind.ObjectMapper;
4852
import com.fasterxml.jackson.databind.node.ObjectNode;
@@ -56,6 +60,7 @@
5660
"missedNotificationDataBaseName: FlowSourceChangeObject-missedNotifications",
5761
"rabbitmq.exchange.name: FlowSourceChangeObject-exchange",
5862
"rabbitmq.consumerName: FlowSourceChangeObjectConsumer" })
63+
@ContextConfiguration(classes = App.class, loader = SpringBootContextLoader.class, initializers = TestContextInitializer.class)
5964
public class FlowSourceChangeObject extends FlowTestBase {
6065
private static final String EVENTS_FILE_PATH = "src/test/resources/TestSourceChangeObjectEvents.json";
6166
private static final String AGGREGATED_OBJECT_FILE_PATH = "src/test/resources/aggregatedSourceChangeObject.json";

src/test/java/com/ericsson/ei/flowtests/FlowTest.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -65,6 +65,7 @@
6565
"missedNotificationDataBaseName: FlowTest-missedNotifications",
6666
"rabbitmq.exchange.name: FlowTest-exchange",
6767
"rabbitmq.consumerName: FlowTestConsumer" })
68+
6869
public class FlowTest extends FlowTestBase {
6970

7071
private static final String UPSTREAM_RESULT_FILE = "upStreamResultFile.json";

src/test/java/com/ericsson/ei/flowtests/FlowTest2.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -45,6 +45,7 @@
4545
"missedNotificationDataBaseName: FlowTest2-missedNotifications",
4646
"rabbitmq.exchange.name: FlowTest2-exchange",
4747
"rabbitmq.consumerName: FlowTest2Consumer" })
48+
4849
public class FlowTest2 extends FlowTestBase {
4950
private static final String EVENTS_FILE_PATH = "src/test/resources/test_events.json";
5051
private static final String AGGREGATED_OBJECT_FILE_PATH_1 = "src/test/resources/AggregatedDocument.json";

src/test/java/com/ericsson/ei/flowtests/FlowTestBase.java

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -131,13 +131,12 @@ protected int extraEventsCount() {
131131
public void flowTest() throws Exception {
132132
try {
133133
String queueName = rmqHandler.getQueueName();
134-
Channel channel = TestConfigs.getConn().createChannel();
134+
Channel channel = TestConfigs.getConnection().createChannel();
135135
if (channel == null) {
136136
channel = connectionFactory.createConnection().createChannel(true);
137137
}
138138

139139
String exchangeName = rmqHandler.getExchangeName();
140-
141140
List<String> eventNames = getEventNamesToSend();
142141
JsonNode parsedJSON = getJSONFromFile(getEventsFilePath());
143142
int eventsCount = eventNames.size() + extraEventsCount();

src/test/java/com/ericsson/ei/flowtests/TrafficGeneratedTest.java

Lines changed: 7 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -42,7 +42,11 @@
4242
import org.slf4j.LoggerFactory;
4343
import org.springframework.beans.factory.annotation.Autowired;
4444
import org.springframework.beans.factory.annotation.Value;
45+
import org.springframework.boot.test.context.SpringBootContextLoader;
4546
import org.springframework.boot.test.context.SpringBootTest;
47+
import org.springframework.http.HttpStatus;
48+
import org.springframework.http.ResponseEntity;
49+
import org.springframework.test.context.ContextConfiguration;
4650
import org.springframework.test.context.TestExecutionListeners;
4751
import org.springframework.test.context.TestPropertySource;
4852
import org.springframework.test.context.junit4.SpringJUnit4ClassRunner;
@@ -55,7 +59,7 @@
5559
import com.ericsson.ei.handlers.RmqHandler;
5660
import com.ericsson.ei.handlers.UpStreamEventsHandler;
5761
import com.ericsson.ei.test.utils.TestConfigs;
58-
import com.ericsson.eiffelcommons.utils.ResponseEntity;
62+
import com.ericsson.ei.utils.TestContextInitializer;
5963
import com.fasterxml.jackson.databind.JsonNode;
6064
import com.fasterxml.jackson.databind.ObjectMapper;
6165
import com.fasterxml.jackson.databind.node.ObjectNode;
@@ -71,6 +75,7 @@
7175
"missedNotificationDataBaseName: TrafficGeneratedTest-missedNotifications",
7276
"rabbitmq.exchange.name: TrafficGeneratedTest-exchange",
7377
"rabbitmq.consumerName: TrafficGeneratedTestConsumer" })
78+
@ContextConfiguration(classes = App.class, loader = SpringBootContextLoader.class, initializers = TestContextInitializer.class)
7479
public class TrafficGeneratedTest extends FlowTestBase {
7580

7681
private static final Logger LOGGER = LoggerFactory.getLogger(TrafficGeneratedTest.class);
@@ -122,7 +127,7 @@ public void flowTest() {
122127
String queueName = rmqHandler.getQueueName();
123128
String exchange = "ei-poc-4";
124129
TestConfigs.createExchange(exchange, queueName);
125-
Channel channel = TestConfigs.getConn().createChannel();
130+
Channel channel = TestConfigs.getConnection().createChannel();
126131

127132
long timeBefore = System.currentTimeMillis();
128133

src/test/java/com/ericsson/ei/test/utils/TestConfigs.java

Lines changed: 40 additions & 31 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@
22

33
import java.io.File;
44
import java.io.IOException;
5+
import java.util.concurrent.TimeoutException;
56

67
import org.apache.tomcat.util.codec.binary.Base64;
78
import org.apache.tomcat.util.codec.binary.StringUtils;
@@ -30,12 +31,12 @@ public class TestConfigs {
3031
private static MongodForTestsFactory testsFactory;
3132

3233
@Getter
33-
private static ConnectionFactory cf;
34+
private static ConnectionFactory connectionFactory;
3435

3536
final static Logger LOGGER = LoggerFactory.getLogger(TestConfigs.class);
3637

3738
@Getter
38-
private static Connection conn;
39+
private static Connection connection;
3940

4041
@Getter
4142
private static MongoClient mongoClient = null;
@@ -46,42 +47,24 @@ public static synchronized void init() throws Exception {
4647
}
4748

4849
private static synchronized void setUpMessageBus() throws Exception {
49-
LOGGER.debug("Debug:setting up message buss");
50-
51-
LOGGER.debug("before setting up message buss: amqpBroker: " + amqpBroker + ", conn: " + conn + ",cf:" + cf);
52-
if (amqpBroker != null || conn != null || cf != null) {
50+
LOGGER.debug("Before setting up message bus, amqp broker: " + amqpBroker + ", connection: " + connection + ",connection factory:"
51+
+ connectionFactory);
52+
if (amqpBroker != null || connection != null || connectionFactory != null) {
5353
return;
5454
}
5555

5656
int port = SocketUtils.findAvailableTcpPort();
57-
System.setProperty("rabbitmq.port", "" + port);
58-
System.setProperty("rabbitmq.user", "guest");
59-
System.setProperty("rabbitmq.password", "guest");
60-
System.setProperty("waitlist.initialDelayResend", "500");
61-
System.setProperty("waitlist.fixedRateResend", "100");
57+
setSystemProperties(port);
58+
setupBroker(port);
6259

63-
LOGGER.debug("done setting up message buss properties");
64-
LOGGER.info("setting up message buss");
65-
String config = "src/test/resources/configs/qpidConfig.json";
66-
File qpidConfig = new File(config);
67-
amqpBroker = new AMQPBrokerManager(qpidConfig.getAbsolutePath(), port);
68-
amqpBroker.startBroker();
69-
cf = new ConnectionFactory();
70-
cf.setUsername("guest");
71-
cf.setPassword("guest");
72-
73-
cf.setPort(port);
74-
cf.setHandshakeTimeout(600000);
75-
cf.setConnectionTimeout(600000);
76-
conn = cf.newConnection();
77-
LOGGER.debug("after setting up message buss");
60+
setupConnectionFactory(port);
61+
LOGGER.debug("Setting up message bus done!");
7862
}
7963

8064
public static MongoClient mongoClientInstance() throws Exception {
8165
if (mongoClient == null) {
8266
setUpEmbeddedMongo();
8367
}
84-
8568
return mongoClient;
8669
}
8770

@@ -102,12 +85,13 @@ private static synchronized void setUpEmbeddedMongo() throws IOException {
10285
}
10386

10487
public static void createExchange(final String exchangeName, final String queueName) {
105-
final CachingConnectionFactory ccf = new CachingConnectionFactory(cf);
88+
final CachingConnectionFactory ccf = new CachingConnectionFactory(connectionFactory);
10689
LOGGER.info("Creating exchange: {} and queue: {}", exchangeName, queueName);
107-
RabbitAdmin admin = new RabbitAdmin(ccf);
108-
Queue queue = new Queue(queueName, false);
109-
admin.declareQueue(queue);
90+
final RabbitAdmin admin = new RabbitAdmin(ccf);
91+
final Queue queue = new Queue(queueName, false);
11092
final TopicExchange exchange = new TopicExchange(exchangeName);
93+
94+
admin.declareQueue(queue);
11195
admin.declareExchange(exchange);
11296
admin.declareBinding(BindingBuilder.bind(queue).to(exchange).with("#"));
11397
ccf.destroy();
@@ -122,4 +106,29 @@ protected void setRules() {
122106
System.setProperty("rules", " /rules/ArtifactRules-Eiffel-Agen-Version.json");
123107
}
124108

109+
protected static void setSystemProperties(int port) {
110+
System.setProperty("rabbitmq.port", "" + port);
111+
System.setProperty("rabbitmq.user", "guest");
112+
System.setProperty("rabbitmq.password", "guest");
113+
System.setProperty("waitlist.initialDelayResend", "500");
114+
System.setProperty("waitlist.fixedRateResend", "100");
115+
LOGGER.info("Message bus port:{}", port);
116+
}
117+
118+
protected static void setupBroker(int port) throws Exception {
119+
String config = "src/test/resources/configs/qpidConfig.json";
120+
File qpidConfig = new File(config);
121+
amqpBroker = new AMQPBrokerManager(qpidConfig.getAbsolutePath(), port);
122+
amqpBroker.startBroker();
123+
}
124+
125+
protected static void setupConnectionFactory(int port) throws IOException, TimeoutException {
126+
connectionFactory = new ConnectionFactory();
127+
connectionFactory.setUsername("guest");
128+
connectionFactory.setPassword("guest");
129+
connectionFactory.setPort(port);
130+
connectionFactory.setHandshakeTimeout(600000);
131+
connectionFactory.setConnectionTimeout(600000);
132+
connection = connectionFactory.newConnection();
133+
}
125134
}

src/test/java/com/ericsson/ei/waitlist/TestWaitListWorker.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -145,7 +145,7 @@ public void testRunIfEventExistsInEventObjectMap() {
145145
@Test
146146
public void testPublishAndReceiveEvent() {
147147
try {
148-
Channel channel = TestConfigs.getConn().createChannel();
148+
Channel channel = TestConfigs.getConnection().createChannel();
149149
String queueName = "er001-eiffelxxx.eiffelintelligence.messageConsumer.durable";
150150
String exchange = "ei-poc-4";
151151
createExchange(exchange, queueName);
@@ -168,7 +168,7 @@ public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProp
168168
}
169169

170170
private void createExchange(final String exchangeName, final String queueName) {
171-
final CachingConnectionFactory ccf = new CachingConnectionFactory(TestConfigs.getCf());
171+
final CachingConnectionFactory ccf = new CachingConnectionFactory(TestConfigs.getConnectionFactory());
172172
RabbitAdmin admin = new RabbitAdmin(ccf);
173173
Queue queue = new Queue(queueName, false);
174174
admin.declareQueue(queue);

0 commit comments

Comments
 (0)