Skip to content

Commit d5fd9f5

Browse files
vrpanchalVishalkumar Panchal
andauthored
Fixing missing events issues (#508)
* Rebasing with eiffel-community/master Fixing missing events issues * Revert "Fixing missing events issues" This reverts commit 2d3571f. * Fixed Missing events issues * Refactoring as per review comments * Refactoring as per review comments * Refectoring as per review comments * Printing environment vars in travis * Printing environment vars in travis with echo * Refactoring code * Removing echo statements * Removing printenv * Printing printenv * Removing printenv * Resolving conflicts * Refectoring as per review comments * Changing project version to 3.2.0 * Refectoring as per review comments Co-authored-by: Vishalkumar Panchal <vishalkumar.panchal@ericsson.com>
1 parent a1bd6ad commit d5fd9f5

20 files changed

+123
-72
lines changed

pom.xml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,7 @@
66
<modelVersion>4.0.0</modelVersion>
77
<groupId>com.github.ericsson</groupId>
88
<artifactId>eiffel-intelligence</artifactId>
9-
<version>3.1.3</version>
9+
<version>3.2.0</version>
1010
<packaging>war</packaging>
1111

1212
<parent>

src/main/java/com/ericsson/ei/controller/RuleTestControllerImpl.java

Lines changed: 11 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,6 @@
44

55
import javax.servlet.http.HttpServletRequest;
66

7-
import com.ericsson.ei.exception.InvalidRulesException;
87
import org.json.JSONArray;
98
import org.json.JSONException;
109
import org.json.JSONObject;
@@ -20,10 +19,12 @@
2019

2120
import com.ericsson.ei.controller.model.RuleCheckBody;
2221
import com.ericsson.ei.controller.model.RulesCheckBody;
22+
import com.ericsson.ei.exception.InvalidRulesException;
2323
import com.ericsson.ei.jmespath.JmesPathInterface;
2424
import com.ericsson.ei.rules.IRuleTestService;
2525
import com.ericsson.ei.utils.ResponseMessage;
2626

27+
import io.netty.util.internal.StringUtil;
2728
import io.swagger.annotations.ApiOperation;
2829
import io.swagger.annotations.ApiParam;
2930
import lombok.Setter;
@@ -85,8 +86,14 @@ public ResponseEntity<?> createRuleTestRunFullAggregation(
8586
final HttpServletRequest httpRequest) {
8687
if (testEnabled) {
8788
try {
88-
String aggregatedObject = ruleTestService.prepareAggregatedObject(
89-
new JSONArray(body.getListRulesJson()), new JSONArray(body.getListEventsJson()));
89+
String aggregatedObject = StringUtil.EMPTY_STRING;
90+
try {
91+
aggregatedObject = ruleTestService.prepareAggregatedObject(
92+
new JSONArray(body.getListRulesJson()), new JSONArray(body.getListEventsJson()));
93+
} catch (Exception e) {
94+
String errorMessage = "Failed to generate aggregated object.";
95+
LOGGER.error(errorMessage, e);
96+
}
9097
if (aggregatedObject != null && !aggregatedObject.equals("[]")) {
9198
return new ResponseEntity<>(aggregatedObject, HttpStatus.OK);
9299
} else {
@@ -96,11 +103,7 @@ public ResponseEntity<?> createRuleTestRunFullAggregation(
96103
return new ResponseEntity<>(errorJsonAsString, HttpStatus.BAD_REQUEST);
97104
}
98105
}
99-
catch (InvalidRulesException e) {
100-
String errorJsonAsString = ResponseMessage.createJsonMessage(e.getMessage());
101-
return new ResponseEntity<>(errorJsonAsString, HttpStatus.BAD_REQUEST);
102-
}
103-
catch (JSONException | IOException e) {
106+
catch (JSONException e) {
104107
String errorMessage = "Failed to generate aggregated object.";
105108
LOGGER.error(errorMessage, e);
106109
String errorJsonAsString = ResponseMessage.createJsonMessage(errorMessage);

src/main/java/com/ericsson/ei/erqueryservice/ERQueryService.java

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -93,7 +93,6 @@ public ResponseEntity sendRequestToER(String eventId, SearchOption searchOption,
9393
return request.performRequest();
9494
}
9595

96-
9796
private HttpRequest prepareRequest(String eventId, SearchOption searchOption, int limit,
9897
int levels, boolean tree, HttpRequest request) throws IOException, URISyntaxException {
9998
Boolean shallowParameter;

src/main/java/com/ericsson/ei/handlers/DownstreamExtractionHandler.java

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -21,11 +21,13 @@
2121
import org.springframework.beans.factory.annotation.Autowired;
2222
import org.springframework.stereotype.Component;
2323

24+
import com.ericsson.ei.exception.MongoDBConnectionException;
2425
import com.ericsson.ei.jmespath.JmesPathInterface;
2526
import com.ericsson.ei.jsonmerge.DownstreamMergeHandler;
2627
import com.ericsson.ei.rules.RulesObject;
2728
import com.fasterxml.jackson.databind.JsonNode;
2829
import com.fasterxml.jackson.databind.ObjectMapper;
30+
import com.mongodb.MongoExecutionTimeoutException;
2931

3032
@Component
3133
public class DownstreamExtractionHandler {
@@ -35,6 +37,7 @@ public class DownstreamExtractionHandler {
3537
@Autowired private JmesPathInterface jmesPathInterface;
3638
@Autowired private DownstreamMergeHandler mergeHandler;
3739
@Autowired private ObjectHandler objectHandler;
40+
3841

3942
public void runExtraction(RulesObject rulesObject, String mergeId, String event, String aggregatedDbObject) {
4043
try {
@@ -47,7 +50,8 @@ public void runExtraction(RulesObject rulesObject, String mergeId, String event,
4750
}
4851
}
4952

50-
public void runExtraction(RulesObject rulesObject, String mergeId, String event, JsonNode aggregatedDbObject) {
53+
public void runExtraction(RulesObject rulesObject, String mergeId, String event, JsonNode aggregatedDbObject)
54+
throws MongoExecutionTimeoutException, MongoDBConnectionException {
5155
JsonNode extractedContent;
5256
extractedContent = extractContent(rulesObject, event);
5357
LOGGER.debug("Start extraction of Aggregated Object:\n{} \nwith Event:\n{}", aggregatedDbObject.toString(), event);

src/main/java/com/ericsson/ei/handlers/EventHandler.java

Lines changed: 19 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -16,8 +16,7 @@
1616
*/
1717
package com.ericsson.ei.handlers;
1818

19-
import com.ericsson.ei.exception.MongoDBConnectionException;
20-
import com.ericsson.ei.rules.IdRulesHandler;
19+
import org.apache.http.conn.HttpHostConnectException;
2120
import org.slf4j.Logger;
2221
import org.slf4j.LoggerFactory;
2322
import org.springframework.amqp.core.Message;
@@ -26,13 +25,16 @@
2625
import org.springframework.scheduling.annotation.Async;
2726
import org.springframework.stereotype.Component;
2827

28+
import com.ericsson.ei.exception.MongoDBConnectionException;
29+
import com.ericsson.ei.rules.IdRulesHandler;
2930
import com.ericsson.ei.rules.RulesHandler;
3031
import com.ericsson.ei.rules.RulesObject;
32+
import com.ericsson.ei.utils.MongoDBMonitorThread;
33+
import com.ericsson.ei.utils.SpringContext;
3134
import com.fasterxml.jackson.databind.JsonNode;
3235
import com.fasterxml.jackson.databind.ObjectMapper;
36+
import com.mongodb.MongoExecutionTimeoutException;
3337
import com.rabbitmq.client.Channel;
34-
import com.ericsson.ei.utils.MongoDBMonitorThread;
35-
import com.ericsson.ei.utils.SpringContext;
3638

3739
@Component
3840
public class EventHandler {
@@ -58,9 +60,10 @@ public RulesHandler getRulesHandler() {
5860
return rulesHandler;
5961
}
6062

61-
public void eventReceived(String event) throws MongoDBConnectionException {
63+
public void eventReceived(String event, final boolean isRelivered)
64+
throws MongoDBConnectionException, Exception {
6265
RulesObject eventRules = rulesHandler.getRulesForEvent(event);
63-
idRulesHandler.runIdRules(eventRules, event);
66+
idRulesHandler.runIdRules(eventRules, event, isRelivered);
6467
}
6568

6669
@Async
@@ -69,12 +72,13 @@ public void onMessage(Message message, Channel channel) throws Exception {
6972
ObjectMapper objectMapper = new ObjectMapper();
7073
JsonNode node = objectMapper.readTree(messageBody);
7174
String id = node.get("meta").get("id").toString();
72-
75+
final boolean isRedelivered = message.getMessageProperties().isRedelivered();
76+
final int waitBeforeSendBack = 2000;
7377
long deliveryTag = message.getMessageProperties().getDeliveryTag();
7478
LOGGER.debug("Thread id {} spawned for EventHandler", Thread.currentThread().getId());
7579
try {
7680
LOGGER.info("Event {} Received", id);
77-
eventReceived(messageBody);
81+
eventReceived(messageBody, isRedelivered);
7882
channel.basicAck(deliveryTag, false);
7983
LOGGER.info("Event {} processed", id);
8084
} catch (MongoDBConnectionException mdce) {
@@ -108,10 +112,14 @@ public void onMessage(Message message, Channel channel) throws Exception {
108112
// once the mongoDB Connection is up event will be sent back to queue with
109113
// un-acknowledgement
110114
channel.basicNack(deliveryTag, false, true);
111-
LOGGER.info(
112-
"Sent back the event {} to queue with un-acknowledgement due to {}", id, mdce);
115+
LOGGER.info("Sent back the event {} to queue with un-acknowledgement due to {}", id, mdce);
116+
} catch (HttpHostConnectException | MongoExecutionTimeoutException e) {
117+
LOGGER.info("Waiting for {} mili-seconds before sending the event back to queue", waitBeforeSendBack);
118+
Thread.sleep(waitBeforeSendBack);
119+
channel.basicNack(deliveryTag, false, true);
120+
LOGGER.info("Sent back the event {} to queue with un-acknowledgement: ", id);
113121
} catch (Exception e) {
114-
LOGGER.error("Event is not Re-queued due to exception for id: {} Exception: {} ", id, e);
122+
LOGGER.error("Event is not Re-queued due to exception for id: {} Exception: {} ", id, e);
115123
}
116124
}
117125

src/main/java/com/ericsson/ei/handlers/EventToObjectMapHandler.java

Lines changed: 2 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -78,8 +78,7 @@ public void init() throws AbortExecutionException {
7878
LOGGER.error("Failed to create an index for {} due to: {}", collectionName, e);
7979
}
8080
}
81-
82-
81+
8382
public void setCollectionName(String collectionName) {
8483
this.collectionName = collectionName;
8584
}
@@ -112,8 +111,7 @@ public ArrayList<String> getObjectsForEventId(String eventId) {
112111
* @param event
113112
* @param objectId aggregated event object Id
114113
*/
115-
public void updateEventToObjectMapInMemoryDB(RulesObject rulesObject, String event,
116-
String objectId, int ttlValue) {
114+
public void updateEventToObjectMapInMemoryDB(RulesObject rulesObject, String event, String objectId, int ttlValue) {
117115
String eventId = getEventId(rulesObject, event);
118116

119117
final MongoCondition condition = MongoCondition.idCondition(objectId);

src/main/java/com/ericsson/ei/handlers/ExtractionHandler.java

Lines changed: 26 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,9 @@
1616
*/
1717
package com.ericsson.ei.handlers;
1818

19-
import com.ericsson.ei.rules.ProcessRulesHandler;
19+
import java.io.IOException;
20+
21+
import org.apache.http.conn.HttpHostConnectException;
2022
import org.slf4j.Logger;
2123
import org.slf4j.LoggerFactory;
2224
import org.springframework.beans.factory.annotation.Autowired;
@@ -26,10 +28,12 @@
2628
import com.ericsson.ei.exception.PropertyNotFoundException;
2729
import com.ericsson.ei.jmespath.JmesPathInterface;
2830
import com.ericsson.ei.jsonmerge.MergeHandler;
31+
import com.ericsson.ei.rules.ProcessRulesHandler;
2932
import com.ericsson.ei.rules.RulesObject;
3033
import com.fasterxml.jackson.databind.JsonNode;
3134
import com.fasterxml.jackson.databind.ObjectMapper;
3235
import com.fasterxml.jackson.databind.node.ObjectNode;
36+
import com.mongodb.MongoExecutionTimeoutException;
3337

3438
@Component
3539
public class ExtractionHandler {
@@ -62,23 +66,21 @@ public void setObjectHandler(ObjectHandler objectHandler) {
6266
this.objectHandler = objectHandler;
6367
}
6468

65-
public void runExtraction(RulesObject rulesObject, String id, String event, String aggregatedDbObject) throws MongoDBConnectionException {
69+
public void runExtraction(RulesObject rulesObject, String id, String event, String aggregatedDbObject, boolean isRedelivered)
70+
throws HttpHostConnectException, MongoExecutionTimeoutException, MongoDBConnectionException {
6671
try {
6772
ObjectMapper mapper = new ObjectMapper();
6873
JsonNode aggregatedJsonObject = mapper.readTree(aggregatedDbObject);
69-
runExtraction(rulesObject, id, event, aggregatedJsonObject);
70-
} catch (Exception e) {
71-
LOGGER.error("Failed with extraction.", e);
72-
if (e.getMessage() != null && e.getMessage().equalsIgnoreCase("MongoDB Connection down")) {
73-
throw new MongoDBConnectionException("MongoDB Connection down");
74-
}
74+
runExtraction(rulesObject, id, event, aggregatedJsonObject, isRedelivered);
75+
} catch (IOException e) {
76+
LOGGER.warn("Failed to read the aggregated object due to {} ", e);
7577
}
7678
}
7779

78-
public void runExtraction(RulesObject rulesObject, String mergeId, String event, JsonNode aggregatedDbObject) throws MongoDBConnectionException {
80+
public void runExtraction(RulesObject rulesObject, String mergeId, String event, JsonNode aggregatedDbObject, boolean isRedelivered)
81+
throws HttpHostConnectException, MongoExecutionTimeoutException, MongoDBConnectionException {
7982
try {
8083
JsonNode extractedContent = extractContent(rulesObject, event);
81-
8284
String mergedContent = null;
8385
String aggregatedObjectId = null;
8486

@@ -89,7 +91,17 @@ public void runExtraction(RulesObject rulesObject, String mergeId, String event,
8991
aggregatedDbObject.toString(), extractedContent.toString(), event);
9092
aggregatedObjectId = objectHandler.extractObjectId(aggregatedDbObject);
9193
mergedContent = mergeHandler.mergeObject(aggregatedObjectId, mergeId, rulesObject, event, extractedContent);
92-
mergedContent = processRulesHandler.runProcessRules(event, rulesObject, mergedContent, aggregatedObjectId, mergeId);
94+
if (mergedContent == null) {
95+
return;
96+
}
97+
98+
// Need to extract the history rules for the re-delivered start event type.
99+
if (rulesObject.isStartEventRules() && isRedelivered) {
100+
upStreamEventsHandler.runHistoryExtractionRulesOnAllUpstreamEvents(mergeId);
101+
} else {
102+
mergedContent = processRulesHandler.runProcessRules(event, rulesObject, mergedContent,
103+
aggregatedObjectId, mergeId);
104+
}
93105
} else {
94106
LOGGER.trace("***** Extraction starts for the aggregation Id: " + mergeId);
95107
ObjectNode objectNode = (ObjectNode) extractedContent;
@@ -101,8 +113,9 @@ public void runExtraction(RulesObject rulesObject, String mergeId, String event,
101113
LOGGER.trace("**** Extraction ends for the aggregation Id: " + mergeId);
102114
}
103115
objectHandler.checkAggregations(mergedContent, aggregatedObjectId);
104-
} catch (PropertyNotFoundException e) {
105-
LOGGER.debug("Did not run history extraction on upstream events.", e);
116+
} catch (HttpHostConnectException | MongoExecutionTimeoutException e) {
117+
LOGGER.warn("Extraction failed for {}, due to {}. Sending back to queue.", event, e.getMessage());
118+
throw e;
106119
} catch (Exception e) {
107120
LOGGER.error("Failed to run extraction for event {}", event, e);
108121
if (e.getMessage() != null && e.getMessage().equalsIgnoreCase("MongoDB Connection down")) {

src/main/java/com/ericsson/ei/handlers/ObjectHandler.java

Lines changed: 10 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -22,8 +22,6 @@
2222

2323
import javax.annotation.PostConstruct;
2424

25-
import com.ericsson.ei.mongo.*;
26-
import com.fasterxml.jackson.annotation.JsonIgnore;
2725
import org.apache.commons.lang3.StringUtils;
2826
import org.bson.Document;
2927
import org.slf4j.Logger;
@@ -35,8 +33,14 @@
3533
import com.ericsson.ei.exception.AbortExecutionException;
3634
import com.ericsson.ei.exception.MongoDBConnectionException;
3735
import com.ericsson.ei.jmespath.JmesPathInterface;
36+
import com.ericsson.ei.mongo.MongoCondition;
37+
import com.ericsson.ei.mongo.MongoConstants;
38+
import com.ericsson.ei.mongo.MongoDBHandler;
39+
import com.ericsson.ei.mongo.MongoQuery;
40+
import com.ericsson.ei.mongo.MongoQueryBuilder;
3841
import com.ericsson.ei.rules.RulesObject;
3942
import com.ericsson.ei.subscription.SubscriptionHandler;
43+
import com.fasterxml.jackson.annotation.JsonIgnore;
4044
import com.fasterxml.jackson.databind.JsonNode;
4145
import com.fasterxml.jackson.databind.ObjectMapper;
4246
import com.mongodb.BasicDBObject;
@@ -82,7 +86,7 @@ public class ObjectHandler {
8286
@Setter
8387
@Autowired
8488
private SubscriptionHandler subscriptionHandler;
85-
89+
8690
@PostConstruct
8791
public void init() throws AbortExecutionException {
8892
try {
@@ -93,6 +97,7 @@ public void init() throws AbortExecutionException {
9397
LOGGER.error("Failed to create an index for {} due to: {}", aggregationsCollectionName, e);
9498
}
9599
}
100+
96101
/**
97102
* This method is responsible for inserting an aggregated object in to the database.
98103
*
@@ -113,14 +118,15 @@ public String insertObject(String aggregatedObject, RulesObject rulesObject, Str
113118
BasicDBObject document = prepareDocumentForInsertion(id, aggregatedObject);
114119
LOGGER.debug("ObjectHandler: Aggregated Object document to be inserted: {}",
115120
document.toString());
121+
116122
mongoDbHandler.insertDocument(databaseName, aggregationsCollectionName, document.toString());
117123
postInsertActions(aggregatedObject, rulesObject, event, id);
118124
return aggregatedObject;
119125
}
120126

121127
public String insertObject(JsonNode aggregatedObject, RulesObject rulesObject, String event,
122128
String id) throws MongoDBConnectionException {
123-
return insertObject(aggregatedObject.toString(), rulesObject, event, id);
129+
return insertObject(aggregatedObject.toString(), rulesObject, event, id);
124130
}
125131

126132
/**

src/main/java/com/ericsson/ei/handlers/UpStreamEventsHandler.java

Lines changed: 9 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -66,18 +66,24 @@ public void setHistoryExtractionHandler(final HistoryExtractionHandler historyEx
6666
* @throws Exception
6767
* @throws PropertyNotFoundException
6868
*/
69-
public void runHistoryExtractionRulesOnAllUpstreamEvents(String aggregatedObjectId) throws PropertyNotFoundException, Exception {
69+
public void runHistoryExtractionRulesOnAllUpstreamEvents(String aggregatedObjectId) throws Exception {
7070

7171
// Use aggregatedObjectId as eventId since they are the same for start
7272
// events.
7373
long start = System.currentTimeMillis();
7474
final ResponseEntity responseEntity = eventRepositoryQueryService
7575
.getEventStreamDataById(aggregatedObjectId, SearchOption.UP_STREAM, -1, -1, true);
76-
7776
long stop = System.currentTimeMillis();
7877
LOGGER.debug("%%%% Response time for upstream query for id: {}: {} ", aggregatedObjectId, stop-start);
79-
78+
LOGGER.debug("ResponseEntity: " + responseEntity);
79+
80+
if (responseEntity == null) {
81+
LOGGER.info("Asked for upstream from {} but got null response entity back!", aggregatedObjectId);
82+
return;
83+
}
84+
8085
final String searchResultString = responseEntity.getBody();
86+
LOGGER.debug("Search result string is: " + searchResultString);
8187
ObjectMapper mapper = new ObjectMapper();
8288
final JsonNode searchResult = mapper.readTree(searchResultString);
8389

0 commit comments

Comments
 (0)