Skip to content

Commit 36f9461

Browse files
sudharshan-bandaruDurga Vasaadi
andauthored
events are not processing from queue (#480)
* events are not processing from queue * made changes to send back the events to queue with un-acknowldement when MonogDB connection is Re-establised * Provided the fix for IllegalThreadStateException and done required changes * Added testcases * Added testcases * Added testcases * Added MongoDBConnectionException * Fixed the review comments * Format the code changes * Removed the unused code in current changes * Done the format of the code Co-authored-by: Durga Vasaadi <durga.vasaadi@ericsson.com>
1 parent 8cbaa10 commit 36f9461

File tree

16 files changed

+279
-41
lines changed

16 files changed

+279
-41
lines changed

src/functionaltests/java/com/ericsson/ei/notifications/ttl/TestTTLSteps.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,7 @@
2626
import org.springframework.util.SocketUtils;
2727

2828
import com.ericsson.ei.exception.AuthenticationException;
29+
import com.ericsson.ei.exception.MongoDBConnectionException;
2930
import com.ericsson.ei.handlers.MongoDBHandler;
3031
import com.ericsson.ei.notifications.InformSubscriber;
3132
import com.ericsson.ei.utils.FunctionalTestBase;
@@ -120,7 +121,7 @@ public void create_subscription_object() throws IOException, JSONException {
120121
}
121122

122123
@When("^I want to inform subscriber$")
123-
public void inform_subscriber() throws IOException, AuthenticationException {
124+
public void inform_subscriber() throws IOException, AuthenticationException, MongoDBConnectionException {
124125
JsonNode aggregatedObject = eventManager.getJSONFromFile(AGGREGATED_OBJECT_FILE_PATH);
125126
informSubscriber.informSubscriber(aggregatedObject.toString(), subscriptionObject);
126127
}

src/main/java/com/ericsson/ei/controller/RuleControllerImpl.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -33,6 +33,7 @@
3333

3434
import com.ericsson.ei.controller.model.RuleCheckBody;
3535
import com.ericsson.ei.controller.model.RulesCheckBody;
36+
import com.ericsson.ei.exception.MongoDBConnectionException;
3637
import com.ericsson.ei.jmespath.JmesPathInterface;
3738
import com.ericsson.ei.rules.RulesHandler;
3839
import com.ericsson.ei.services.IRuleCheckService;
@@ -139,7 +140,7 @@ public ResponseEntity<?> createRuleCheckAggregation(
139140
String errorJsonAsString = ResponseMessage.createJsonMessage(errorMessage);
140141
return new ResponseEntity<>(errorJsonAsString, HttpStatus.BAD_REQUEST);
141142
}
142-
} catch (JSONException | IOException e) {
143+
} catch (JSONException | IOException | MongoDBConnectionException e) {
143144
String errorMessage = "Internal Server Error: Failed to generate aggregated object.";
144145
LOGGER.error(errorMessage, e);
145146
String errorJsonAsString = ResponseMessage.createJsonMessage(errorMessage);
Lines changed: 41 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,41 @@
1+
/*
2+
Copyright 2020 Ericsson AB.
3+
For a full list of individual contributors, please see the commit history.
4+
5+
6+
7+
Licensed under the Apache License, Version 2.0 (the "License");
8+
you may not use this file except in compliance with the License.
9+
You may obtain a copy of the License at
10+
11+
12+
13+
http://www.apache.org/licenses/LICENSE-2.0
14+
15+
16+
17+
Unless required by applicable law or agreed to in writing, software
18+
distributed under the License is distributed on an "AS IS" BASIS,
19+
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
20+
See the License for the specific language governing permissions and
21+
limitations under the License.
22+
*/
23+
package com.ericsson.ei.exception;
24+
25+
public class MongoDBConnectionException extends Exception {
26+
27+
private static final long serialVersionUID = 2L;
28+
29+
public MongoDBConnectionException() {
30+
super();
31+
}
32+
33+
public MongoDBConnectionException(String message) {
34+
super(message);
35+
}
36+
37+
public MongoDBConnectionException(String message, Throwable e) {
38+
super(message, e);
39+
}
40+
41+
}

src/main/java/com/ericsson/ei/handlers/DownstreamIdRulesHandler.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@
2323
import org.springframework.beans.factory.annotation.Autowired;
2424
import org.springframework.stereotype.Component;
2525

26+
import com.ericsson.ei.exception.MongoDBConnectionException;
2627
import com.ericsson.ei.jmespath.JmesPathInterface;
2728
import com.ericsson.ei.rules.RulesObject;
2829
import com.ericsson.ei.waitlist.WaitListStorageHandler;
@@ -49,7 +50,7 @@ public void setJmesPathInterface(JmesPathInterface jmesPathInterface) {
4950
this.jmesPathInterface = jmesPathInterface;
5051
}
5152

52-
public void runIdRules(RulesObject rulesObject, String event) {
53+
public void runIdRules(RulesObject rulesObject, String event) throws MongoDBConnectionException {
5354
if (rulesObject != null && event != null) {
5455
JsonNode idsJsonObj = getIds(rulesObject, event);
5556
List<String> objects;

src/main/java/com/ericsson/ei/handlers/EventHandler.java

Lines changed: 47 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -24,8 +24,11 @@
2424
import org.springframework.scheduling.annotation.Async;
2525
import org.springframework.stereotype.Component;
2626

27+
import com.ericsson.ei.exception.MongoDBConnectionException;
2728
import com.ericsson.ei.rules.RulesHandler;
2829
import com.ericsson.ei.rules.RulesObject;
30+
import com.ericsson.ei.utils.MongoDBMonitorThread;
31+
import com.ericsson.ei.utils.SpringContext;
2932
import com.fasterxml.jackson.databind.JsonNode;
3033
import com.fasterxml.jackson.databind.ObjectMapper;
3134
import com.rabbitmq.client.Channel;
@@ -47,11 +50,14 @@ public class EventHandler {
4750
@Autowired
4851
Environment environment;
4952

53+
@Autowired
54+
MongoDBMonitorThread mongoDBMonitorThread;
55+
5056
public RulesHandler getRulesHandler() {
5157
return rulesHandler;
5258
}
5359

54-
public void eventReceived(final String event) {
60+
public void eventReceived(final String event) throws MongoDBConnectionException{
5561
final RulesObject eventRules = rulesHandler.getRulesForEvent(event);
5662
idRulesHandler.runIdRules(eventRules, event);
5763
}
@@ -62,13 +68,45 @@ public void onMessage(final Message message, final Channel channel) throws Excep
6268
final ObjectMapper objectMapper = new ObjectMapper();
6369
final JsonNode node = objectMapper.readTree(messageBody);
6470
final String id = node.get("meta").get("id").toString();
65-
LOGGER.debug("Thread id {} spawned for EventHandler", Thread.currentThread().getId());
66-
LOGGER.debug("Event {} received", id);
67-
68-
eventReceived(messageBody);
6971
final long deliveryTag = message.getMessageProperties().getDeliveryTag();
70-
channel.basicAck(deliveryTag, false);
71-
72-
LOGGER.debug("Event {} processed", id);
73-
}
72+
LOGGER.debug("Thread id {} spawned for EventHandler", Thread.currentThread().getId());
73+
try {
74+
eventReceived(messageBody);
75+
channel.basicAck(deliveryTag, false);
76+
LOGGER.info("Event {} processed", id);
77+
} catch (MongoDBConnectionException mdce) {
78+
if (mdce.getMessage().equalsIgnoreCase("MongoDB Connection down")) {
79+
if (mongoDBMonitorThread.getState() == Thread.State.NEW
80+
|| mongoDBMonitorThread.getState() == Thread.State.TERMINATED) {
81+
// if the previous Thread state is TERMINATED then get a new
82+
// mongoDBMonitorThread instance
83+
synchronized (this) {
84+
if (mongoDBMonitorThread.getState() == Thread.State.TERMINATED) {
85+
mongoDBMonitorThread = SpringContext.getBean(MongoDBMonitorThread.class);
86+
}
87+
// New thread will start to monitor the mongoDB connection status
88+
if (mongoDBMonitorThread.getState() == Thread.State.NEW) {
89+
mongoDBMonitorThread.setMongoDBConnected(false);
90+
mongoDBMonitorThread.start();
91+
}
92+
}
93+
}
94+
// Continue the loop till the mongoDB connection is Re-established
95+
while (!mongoDBMonitorThread.isMongoDBConnected()) {
96+
try {
97+
Thread.sleep(30000);
98+
LOGGER.debug("Waiting for MongoDB connection...");
99+
} catch (InterruptedException ie) {
100+
LOGGER.error("MongoDBMonitorThread got Interrupted");
101+
}
102+
}
103+
}
104+
// once the mongoDB Connection is up event will be sent back to queue with
105+
// un-acknowledgement
106+
channel.basicNack(deliveryTag, false, true);
107+
LOGGER.debug("Sent back the event to queue with un-acknowledgement: " + message.getBody());
108+
} catch (Exception e) {
109+
LOGGER.error("Event is not Re-queued due to exception " + e);
110+
}
111+
}
74112
}

src/main/java/com/ericsson/ei/handlers/ExtractionHandler.java

Lines changed: 10 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@
2222
import org.springframework.beans.factory.annotation.Autowired;
2323
import org.springframework.stereotype.Component;
2424

25+
import com.ericsson.ei.exception.MongoDBConnectionException;
2526
import com.ericsson.ei.jmespath.JmesPathInterface;
2627
import com.ericsson.ei.jsonmerge.MergeHandler;
2728
import com.ericsson.ei.rules.RulesObject;
@@ -60,17 +61,20 @@ public void setObjectHandler(ObjectHandler objectHandler) {
6061
this.objectHandler = objectHandler;
6162
}
6263

63-
public void runExtraction(RulesObject rulesObject, String id, String event, String aggregatedDbObject) {
64+
public void runExtraction(RulesObject rulesObject, String id, String event, String aggregatedDbObject) throws MongoDBConnectionException {
6465
try {
6566
ObjectMapper mapper = new ObjectMapper();
6667
JsonNode aggregatedJsonObject = mapper.readTree(aggregatedDbObject);
6768
runExtraction(rulesObject, id, event, aggregatedJsonObject);
6869
} catch (Exception e) {
69-
LOGGER.info("Failed with extraction.", e);
70+
LOGGER.error("Failed with extraction.", e);
71+
if (e.getMessage().equalsIgnoreCase("MongoDB Connection down")) {
72+
throw new MongoDBConnectionException("MongoDB Connection down");
73+
}
7074
}
7175
}
7276

73-
public void runExtraction(RulesObject rulesObject, String mergeId, String event, JsonNode aggregatedDbObject) {
77+
public void runExtraction(RulesObject rulesObject, String mergeId, String event, JsonNode aggregatedDbObject) throws MongoDBConnectionException {
7478
try {
7579
JsonNode extractedContent = extractContent(rulesObject, event);
7680

@@ -90,6 +94,9 @@ public void runExtraction(RulesObject rulesObject, String mergeId, String event,
9094
}
9195
} catch (Exception e) {
9296
LOGGER.error("Failed to run extraction for event {} , stacktrace {}", event, ExceptionUtils.getStackTrace(e));
97+
if (e.getMessage().equalsIgnoreCase("MongoDB Connection down")) {
98+
throw new MongoDBConnectionException("MongoDB Connection down");
99+
}
93100
}
94101
}
95102

src/main/java/com/ericsson/ei/handlers/IdRulesHandler.java

Lines changed: 5 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@
1616
*/
1717
package com.ericsson.ei.handlers;
1818

19+
import com.ericsson.ei.exception.MongoDBConnectionException;
1920
import com.ericsson.ei.jmespath.JmesPathInterface;
2021
import com.ericsson.ei.rules.RulesObject;
2122
import com.ericsson.ei.waitlist.WaitListStorageHandler;
@@ -49,15 +50,16 @@ public void setJmesPathInterface(JmesPathInterface jmesPathInterface) {
4950
this.jmesPathInterface = jmesPathInterface;
5051
}
5152

52-
public void runIdRules(RulesObject rulesObject, String event) {
53+
public void runIdRules(RulesObject rulesObject, String event) throws MongoDBConnectionException {
5354
if (rulesObject != null && event != null) {
5455
JsonNode idsJsonObj = getIds(rulesObject, event);
5556
if (idsJsonObj != null && idsJsonObj.isArray()) {
5657
for (final JsonNode idJsonObj : idsJsonObj) {
5758
final String id = idJsonObj.textValue();
5859
final List<String> aggregatedObjects = matchIdRulesHandler.fetchObjectsById(rulesObject, id);
59-
aggregatedObjects.forEach(
60-
aggregatedObject -> extractionHandler.runExtraction(rulesObject, id, event, aggregatedObject));
60+
for(String aggregatedObject : aggregatedObjects) {
61+
extractionHandler.runExtraction(rulesObject, id, event, aggregatedObject);
62+
}
6163
if (aggregatedObjects.size() == 0) {
6264
if (rulesObject.isStartEventRules()) {
6365
extractionHandler.runExtraction(rulesObject, id, event, (JsonNode) null);

src/main/java/com/ericsson/ei/handlers/MongoDBHandler.java

Lines changed: 63 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,7 @@
2929
import org.springframework.boot.autoconfigure.mongo.MongoProperties;
3030
import org.springframework.stereotype.Component;
3131

32+
import com.ericsson.ei.exception.MongoDBConnectionException;
3233
import com.fasterxml.jackson.annotation.JsonIgnore;
3334
import com.mongodb.BasicDBObject;
3435
import com.mongodb.Block;
@@ -98,11 +99,17 @@ private void createConnection() {
9899
* @return
99100
*/
100101
public void insertDocument(String dataBaseName, String collectionName, String input) throws MongoWriteException {
101-
MongoCollection<Document> collection = getMongoCollection(dataBaseName, collectionName);
102-
if (collection != null) {
103-
final Document dbObjectInput = Document.parse(input);
104-
collection.insertOne(dbObjectInput);
105-
LOGGER.debug("Object: {}\n was inserted successfully in collection: {} and database {}.", input, collectionName, dataBaseName);
102+
103+
try {
104+
MongoCollection<Document> collection = getMongoCollection(dataBaseName, collectionName);
105+
if (collection != null) {
106+
final Document dbObjectInput = Document.parse(input);
107+
collection.insertOne(dbObjectInput);
108+
LOGGER.debug("Object: {}\n was inserted successfully in collection: {} and database {}.", input, collectionName, dataBaseName);
109+
}
110+
} catch (Exception e) {
111+
LOGGER.error("Failed to insert Object: {} \n in collection: {} and database {}. \n {}", input,
112+
collectionName, dataBaseName, e.getMessage());
106113
}
107114
}
108115

@@ -259,12 +266,18 @@ public boolean dropDocument(String dataBaseName, String collectionName, String c
259266
* @param collectionName
260267
* @param fieldName for index creation field
261268
* @param ttlValue seconds
269+
* @throws MongoDBConnectionException
262270
*/
263-
public void createTTLIndex(String dataBaseName, String collectionName, String fieldName, int ttlValue) {
264-
MongoCollection<Document> collection = getMongoCollection(dataBaseName, collectionName);
265-
IndexOptions indexOptions = new IndexOptions().expireAfter((long) ttlValue, TimeUnit.SECONDS);
266-
collection.createIndex(Indexes.ascending(fieldName), indexOptions);
267-
}
271+
public void createTTLIndex(String dataBaseName, String collectionName, String fieldName, int ttlValue)
272+
throws MongoDBConnectionException {
273+
try {
274+
MongoCollection<Document> collection = getMongoCollection(dataBaseName, collectionName);
275+
IndexOptions indexOptions = new IndexOptions().expireAfter((long) ttlValue, TimeUnit.SECONDS);
276+
collection.createIndex(Indexes.ascending(fieldName), indexOptions);
277+
} catch (Exception e) {
278+
throw new MongoDBConnectionException("MongoDB Connection down");
279+
}
280+
}
268281

269282
private MongoCollection<Document> getMongoCollection(String dataBaseName, String collectionName) {
270283
if (mongoClient == null)
@@ -352,10 +365,19 @@ public void dropDatabase(String databaseName) {
352365
* @return
353366
*/
354367
public boolean checkDocumentExists(String databaseName, String collectionName, String condition) {
355-
MongoDatabase db = mongoClient.getDatabase(databaseName);
356-
MongoCollection<Document> mongoCollection = db.getCollection(collectionName);
357-
Document doc = mongoCollection.find(BasicDBObject.parse(condition)).first();
358-
if (doc == null || doc.isEmpty()) {
368+
369+
try {
370+
MongoDatabase db = mongoClient.getDatabase(databaseName);
371+
MongoCollection<Document> mongoCollection = db.getCollection(collectionName);
372+
Document doc = null;
373+
if (mongoCollection != null) {
374+
doc = mongoCollection.find(BasicDBObject.parse(condition)).first();
375+
}
376+
if (doc == null || doc.isEmpty()) {
377+
return false;
378+
}
379+
} catch (Exception e) {
380+
LOGGER.error("something wrong with MongoDB " + e);
359381
return false;
360382
}
361383
return true;
@@ -386,4 +408,31 @@ public boolean updateDocumentAddToSet(String dataBaseName, String collectionName
386408

387409
return false;
388410
}
411+
412+
413+
/**
414+
*
415+
* This method checks the mongoDB connection status.
416+
*
417+
* @param dataBaseName
418+
* @return true if the connection is up otherwise return false
419+
*/
420+
public boolean checkMongoDbStatus(String dataBaseName) {
421+
MongoDatabase db;
422+
List<String> collectionList;
423+
try {
424+
if (mongoClient == null) {
425+
createConnection();
426+
}
427+
db = mongoClient.getDatabase(dataBaseName);
428+
collectionList = db.listCollectionNames().into(new ArrayList<String>());
429+
} catch (Exception e) {
430+
LOGGER.error("MongoCommandException, Something went wrong with MongoDb connection. Error: " + e);
431+
return false;
432+
}
433+
if (collectionList == null || collectionList.isEmpty()) {
434+
return false;
435+
}
436+
return true;
437+
}
389438
}

src/main/java/com/ericsson/ei/handlers/ObjectHandler.java

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,7 @@
2727
import org.springframework.beans.factory.annotation.Value;
2828
import org.springframework.stereotype.Component;
2929

30+
import com.ericsson.ei.exception.MongoDBConnectionException;
3031
import com.ericsson.ei.jmespath.JmesPathInterface;
3132
import com.ericsson.ei.rules.RulesObject;
3233
import com.ericsson.ei.subscription.SubscriptionHandler;
@@ -87,8 +88,9 @@ public class ObjectHandler {
8788
* String representation of event, used to fetch id if not specified
8889
* @param id
8990
* String id is stored together with aggregated object in database
91+
* @throws SubscriptionValidationException
9092
* */
91-
public void insertObject(String aggregatedObject, RulesObject rulesObject, String event, String id) {
93+
public void insertObject(String aggregatedObject, RulesObject rulesObject, String event, String id) throws MongoDBConnectionException {
9294
if (id == null) {
9395
String idRules = rulesObject.getIdRule();
9496
JsonNode idNode = jmespathInterface.runRuleOnEvent(idRules, event);
@@ -105,7 +107,7 @@ public void insertObject(String aggregatedObject, RulesObject rulesObject, Strin
105107
postInsertActions(aggregatedObject, rulesObject, event, id);
106108
}
107109

108-
public void insertObject(JsonNode aggregatedObject, RulesObject rulesObject, String event, String id) {
110+
public void insertObject(JsonNode aggregatedObject, RulesObject rulesObject, String event, String id) throws MongoDBConnectionException {
109111
insertObject(aggregatedObject.toString(), rulesObject, event, id);
110112
}
111113

0 commit comments

Comments
 (0)