Skip to content

Commit 7135b8f

Browse files
author
Vali (Vasile Baluta)
committed
prepare for external aggregation
1 parent c0c6b9f commit 7135b8f

21 files changed

+778
-112
lines changed

pom.xml

Lines changed: 9 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -106,7 +106,7 @@
106106
<artifactId>commons-io</artifactId>
107107
<version>1.3.2</version>
108108
</dependency>
109-
<!-- <dependency> <groupId>org.slf4j</groupId> <artifactId>slf4j-log4j12</artifactId>
109+
<!-- <dependency> <groupId>org.slf4j</groupId> <artifactId>slf4j-log4j12</artifactId>
110110
<version>1.7.25</version> </dependency> -->
111111

112112
<!-- https://mvnrepository.com/artifact/org.json/json -->
@@ -116,6 +116,13 @@
116116
<version>20160810</version>
117117
</dependency>
118118

119+
<!-- https://mvnrepository.com/artifact/com.github.wnameless/json-flattener -->
120+
<dependency>
121+
<groupId>com.github.wnameless</groupId>
122+
<artifactId>json-flattener</artifactId>
123+
<version>0.4.1</version>
124+
</dependency>
125+
119126
<dependency>
120127
<groupId>org.mongodb</groupId>
121128
<artifactId>mongo-java-driver</artifactId>
@@ -168,7 +175,7 @@
168175
<artifactId>spring-boot-maven-plugin</artifactId>
169176
</plugin>
170177

171-
<!-- PhoenixNAP RAML Code Generator plugin used to generate sources
178+
<!-- PhoenixNAP RAML Code Generator plugin used to generate sources
172179
from raml -->
173180
<plugin>
174181
<groupId>com.phoenixnap.oss</groupId>
Lines changed: 47 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,47 @@
1+
package com.ericsson.ei.handlers;
2+
3+
import org.slf4j.Logger;
4+
import org.slf4j.LoggerFactory;
5+
import org.springframework.beans.factory.annotation.Autowired;
6+
import org.springframework.stereotype.Component;
7+
8+
import com.ericsson.ei.jmespath.JmesPathInterface;
9+
import com.ericsson.ei.jsonmerge.DownstreamMergeHandler;
10+
import com.ericsson.ei.rules.RulesObject;
11+
import com.fasterxml.jackson.databind.JsonNode;
12+
import com.fasterxml.jackson.databind.ObjectMapper;
13+
14+
@Component
15+
public class DownstreamExtractionHandler {
16+
17+
private static Logger log = LoggerFactory.getLogger(DownstreamExtractionHandler.class);
18+
19+
@Autowired private JmesPathInterface jmesPathInterface;
20+
@Autowired private DownstreamMergeHandler mergeHandler;
21+
@Autowired private ObjectHandler objectHandler;
22+
23+
public void runExtraction(RulesObject rulesObject, String mergeId, String event, String aggregatedDbObject) {
24+
try {
25+
ObjectMapper mapper = new ObjectMapper();
26+
JsonNode aggregatedJsonObject = mapper.readValue(aggregatedDbObject, JsonNode.class);
27+
runExtraction(rulesObject, mergeId, event, aggregatedJsonObject);
28+
} catch (Exception e) {
29+
log.info(e.getMessage(),e);
30+
}
31+
}
32+
33+
public void runExtraction(RulesObject rulesObject, String mergeId, String event, JsonNode aggregatedDbObject) {
34+
JsonNode extractedContent;
35+
extractedContent = extractContent(rulesObject, event);
36+
37+
if(aggregatedDbObject != null) {
38+
String objectId = objectHandler.extractObjectId(aggregatedDbObject);
39+
String mergedContent = mergeHandler.mergeObject(objectId, mergeId, rulesObject, event, extractedContent);
40+
}
41+
}
42+
43+
private JsonNode extractContent(RulesObject rulesObject, String event) {
44+
String extractionRules = rulesObject.getDownstreamExtractionRules();
45+
return jmesPathInterface.runRuleOnEvent(extractionRules, event);
46+
}
47+
}
Lines changed: 71 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,71 @@
1+
package com.ericsson.ei.handlers;
2+
3+
import java.util.ArrayList;
4+
5+
import org.slf4j.Logger;
6+
import org.slf4j.LoggerFactory;
7+
import org.springframework.beans.factory.annotation.Autowired;
8+
import org.springframework.stereotype.Component;
9+
10+
import com.ericsson.ei.jmespath.JmesPathInterface;
11+
import com.ericsson.ei.rules.RulesObject;
12+
import com.ericsson.ei.waitlist.WaitListStorageHandler;
13+
import com.fasterxml.jackson.databind.JsonNode;
14+
15+
@Component
16+
public class DownstreamIdRulesHandler {
17+
18+
static Logger log = (Logger) LoggerFactory.getLogger(DownstreamIdRulesHandler.class);
19+
20+
@Autowired
21+
private JmesPathInterface jmesPathInterface;
22+
23+
@Autowired
24+
private MatchIdRulesHandler matchIdRulesHandler;
25+
26+
@Autowired
27+
private DownstreamExtractionHandler downstreamExtractionHandler;
28+
29+
@Autowired
30+
private WaitListStorageHandler waitListStorageHandler;
31+
32+
public void setJmesPathInterface(JmesPathInterface jmesPathInterface) {
33+
this.jmesPathInterface = jmesPathInterface;
34+
}
35+
36+
public void runIdRules(RulesObject rulesObject, String event) {
37+
JsonNode idsJsonObj = getIds(rulesObject, event);
38+
ArrayList<String> objects = null;
39+
String id;
40+
if (idsJsonObj != null && idsJsonObj.isArray()) {
41+
for (final JsonNode idJsonObj : idsJsonObj) {
42+
id = idJsonObj.textValue();
43+
objects = matchIdRulesHandler.fetchObjectsById(rulesObject, id);
44+
for (String object:objects) {
45+
downstreamExtractionHandler.runExtraction(rulesObject, id, event, object);
46+
}
47+
if (objects.size() == 0) {
48+
try {
49+
waitListStorageHandler.addEventToWaitList(event, rulesObject);
50+
} catch (Exception e) {
51+
log.info(e.getMessage(),e);
52+
}
53+
}
54+
}
55+
}
56+
}
57+
58+
public JsonNode getIds(RulesObject rulesObject, String event) {
59+
String idRule = rulesObject.getDownstreamIdentifyRules();
60+
JsonNode ids = null;
61+
if (idRule != null && !idRule.isEmpty()) {
62+
try {
63+
ids = jmesPathInterface.runRuleOnEvent(idRule, event);
64+
} catch (Exception e) {
65+
log.info(e.getMessage(),e);
66+
}
67+
}
68+
69+
return ids;
70+
}
71+
}

src/main/java/com/ericsson/ei/handlers/EventHandler.java

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -31,9 +31,13 @@ public class EventHandler {
3131
@Autowired
3232
IdRulesHandler idRulesHandler;
3333

34+
@Autowired
35+
DownstreamIdRulesHandler downstreamIdRulesHandler;
36+
3437
public void eventReceived(String event) {
3538
RulesObject eventRules = rulesHandler.getRulesForEvent(event);
3639
idRulesHandler.runIdRules(eventRules, event);
40+
downstreamIdRulesHandler.runIdRules(eventRules, event);
3741
}
3842

3943
public void eventReceived(byte[] message) {
@@ -57,5 +61,4 @@ public void onMessage(Message message, Channel channel) throws Exception {
5761
channel.basicAck(deliveryTag, false);
5862
int breakHere = 1;
5963
}
60-
6164
}

src/main/java/com/ericsson/ei/handlers/ExtractionHandler.java

Lines changed: 10 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -1,9 +1,5 @@
11
package com.ericsson.ei.handlers;
22

3-
import java.io.IOException;
4-
5-
import org.json.JSONException;
6-
import org.json.JSONObject;
73
import org.slf4j.Logger;
84
import org.slf4j.LoggerFactory;
95
import org.springframework.beans.factory.annotation.Autowired;
@@ -12,8 +8,6 @@
128
import com.ericsson.ei.jmespath.JmesPathInterface;
139
import com.ericsson.ei.jsonmerge.MergeHandler;
1410
import com.ericsson.ei.rules.RulesObject;
15-
import com.fasterxml.jackson.core.JsonParseException;
16-
import com.fasterxml.jackson.databind.JsonMappingException;
1711
import com.fasterxml.jackson.databind.JsonNode;
1812
import com.fasterxml.jackson.databind.ObjectMapper;
1913

@@ -36,13 +30,9 @@ public void setMergeHandler(MergeHandler mergeHandler) {
3630
this.mergeHandler = mergeHandler;
3731
}
3832

39-
// public void setProcessRulesHandler(ProcessRulesHandler processRulesHandler) {
40-
// this.processRulesHandler = processRulesHandler;
41-
// }
42-
//
43-
// public void setHistoryIdRulesHandler(HistoryIdRulesHandler historyIdRulesHandler) {
44-
// this.historyIdRulesHandler = historyIdRulesHandler;
45-
// }
33+
public void setProcessRulesHandler(ProcessRulesHandler processRulesHandler) {
34+
this.processRulesHandler = processRulesHandler;
35+
}
4636

4737
public void setObjectHandler(ObjectHandler objectHandler) {
4838
this.objectHandler = objectHandler;
@@ -58,23 +48,24 @@ public void runExtraction(RulesObject rulesObject, String id, String event, Stri
5848
}
5949
}
6050

61-
public void runExtraction(RulesObject rulesObject, String id, String event, JsonNode aggregatedDbObject) {
51+
public void runExtraction(RulesObject rulesObject, String mergeId, String event, JsonNode aggregatedDbObject) {
6252
JsonNode extractedContent;
6353
extractedContent = extractContent(rulesObject, event);
6454

6555
if(aggregatedDbObject != null) {
6656
String objectId = objectHandler.extractObjectId(aggregatedDbObject);
67-
String mergedContent = mergeHandler.mergeObject(objectId, rulesObject, event, extractedContent);
68-
mergedContent = processRulesHandler.runProcessRules(event, rulesObject, mergedContent, objectId);
57+
String mergedContent = mergeHandler.mergeObject(objectId, mergeId, rulesObject, event, extractedContent);
58+
mergedContent = processRulesHandler.runProcessRules(event, rulesObject, mergedContent, objectId, mergeId);
6959
//historyIdRulesHandler.runHistoryIdRules(aggregationObject, rulesObject, event);
7060
} else {
7161
mergeHandler.addNewObject(event, extractedContent, rulesObject);
7262
}
7363
}
7464

7565
private JsonNode extractContent(RulesObject rulesObject, String event) {
76-
String extractonRules;
77-
extractonRules = rulesObject.getExtractionRules();
78-
return jmesPathInterface.runRuleOnEvent(extractonRules, event);
66+
String extractionRules;
67+
extractionRules = rulesObject.getExtractionRules();
68+
return jmesPathInterface.runRuleOnEvent(extractionRules, event);
7969
}
70+
8071
}

src/main/java/com/ericsson/ei/handlers/IdRulesHandler.java

Lines changed: 7 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -30,9 +30,6 @@ public class IdRulesHandler {
3030
@Autowired
3131
private WaitListStorageHandler waitListStorageHandler;
3232

33-
@Autowired
34-
private ObjectHandler objectHandler;
35-
3633
public void setJmesPathInterface(JmesPathInterface jmesPathInterface) {
3734
this.jmesPathInterface = jmesPathInterface;
3835
}
@@ -41,7 +38,7 @@ public void runIdRules(RulesObject rulesObject, String event) {
4138
JsonNode idsJsonObj = getIds(rulesObject, event);
4239
ArrayList<String> objects = null;
4340
String id;
44-
if (idsJsonObj.isArray()) {
41+
if (idsJsonObj != null && idsJsonObj.isArray()) {
4542
for (final JsonNode idJsonObj : idsJsonObj) {
4643
id = idJsonObj.textValue();
4744
objects = matchIdRulesHandler.fetchObjectsById(rulesObject, id);
@@ -66,10 +63,12 @@ public void runIdRules(RulesObject rulesObject, String event) {
6663
public JsonNode getIds(RulesObject rulesObject, String event) {
6764
String idRule = rulesObject.getIdentifyRules();
6865
JsonNode ids = null;
69-
try {
70-
ids = jmesPathInterface.runRuleOnEvent(idRule, event);
71-
} catch (Exception e) {
72-
int stop = 0;
66+
if (idRule != null && !idRule.isEmpty()) {
67+
try {
68+
ids = jmesPathInterface.runRuleOnEvent(idRule, event);
69+
} catch (Exception e) {
70+
log.info(e.getMessage(),e);
71+
}
7372
}
7473

7574
return ids;

src/main/java/com/ericsson/ei/handlers/ProcessRulesHandler.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -29,14 +29,14 @@ public void setMergeHandler(MergeHandler mergeHandler) {
2929
this.mergeHandler = mergeHandler;
3030
}
3131

32-
public String runProcessRules(String event, RulesObject rulesObject, String aggregationObject, String objectId) {
32+
public String runProcessRules(String event, RulesObject rulesObject, String aggregationObject, String objectId, String mergeId) {
3333
String processRules = rulesObject.fetchProcessRules();
3434
if (processRules != null) {
3535
log.info("processRules: " + processRules);
3636
log.info("aggregationObject: " + aggregationObject);
3737
log.info("event: " + event);
3838
JsonNode ruleResult = jmespath.runRuleOnEvent(processRules, aggregationObject);
39-
String aggregatedObject = mergeHandler.mergeObject(objectId, rulesObject, event, ruleResult);
39+
String aggregatedObject = mergeHandler.mergeObject(objectId, mergeId, rulesObject, event, ruleResult);
4040
return aggregatedObject;
4141
}
4242

src/main/java/com/ericsson/ei/jmespath/JmesPathInterface.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,8 @@ public JmesPathInterface() {
2828

2929
public JsonNode runRuleOnEvent(String rule, String input) {
3030
JsonNode event = null;
31+
if (input == null)
32+
input = "";
3133
Expression<JsonNode> expression = jmespath.compile(rule);
3234
ObjectMapper objectMapper = new ObjectMapper();
3335
try {
Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,13 @@
1+
package com.ericsson.ei.jsonmerge;
2+
3+
import org.springframework.stereotype.Component;
4+
5+
import com.ericsson.ei.rules.RulesObject;
6+
7+
@Component
8+
public class DownstreamMergeHandler extends MergeHandler {
9+
10+
protected String getMergeRules(RulesObject rules) {
11+
return rules.getDownstreamMergeRules();
12+
}
13+
}

src/main/java/com/ericsson/ei/jsonmerge/MergeHandler.java

Lines changed: 16 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@
66
import com.ericsson.ei.rules.RulesObject;
77

88
import com.ericsson.ei.waitlist.WaitListStorageHandler;
9+
import com.fasterxml.jackson.core.JsonProcessingException;
910
import com.fasterxml.jackson.databind.JsonNode;
1011
import com.fasterxml.jackson.databind.ObjectMapper;
1112

@@ -16,6 +17,7 @@
1617
import org.springframework.beans.factory.annotation.Value;
1718
import org.springframework.stereotype.Component;
1819

20+
import java.io.IOException;
1921
import java.util.*;
2022

2123
@Component
@@ -46,15 +48,15 @@ public void setMergeIdMarker(String marker) {
4648
mergeIdMarker = marker;
4749
}
4850

49-
public String mergeObject(String id, RulesObject rules, String event, JsonNode objectToMerge){
51+
public String mergeObject(String id, String mergeId, RulesObject rules, String event, JsonNode objectToMerge){
5052
String mergedObject = null;
5153
String preparedToMergeObject;
5254
try{
5355
// lock and get the AggregatedObject
5456
String aggregatedObject = (String) getAggregatedObject(id);
55-
String mergeRule = (String) rules.getMergeRules();
57+
String mergeRule = getMergeRules(rules);
5658
if (mergeRule != null && !mergeRule.isEmpty()){
57-
String updatedRule = (String) replaceIdMarkerInRules(mergeRule, id);
59+
String updatedRule = (String) replaceIdMarkerInRules(mergeRule, mergeId);
5860
String ruleForMerge = jmesPathInterface.runRuleOnEvent(updatedRule, event).toString();
5961
String mergePath = (String) prepareMergePrepareObject.getMergePath(aggregatedObject, ruleForMerge);
6062
preparedToMergeObject = (String) prepareMergePrepareObject.addMissingLevels(aggregatedObject,
@@ -71,10 +73,18 @@ public String mergeObject(String id, RulesObject rules, String event, JsonNode o
7173
return mergedObject;
7274
}
7375

76+
protected String getMergeRules(RulesObject rules) {
77+
return rules.getMergeRules();
78+
}
79+
7480
public String replaceIdMarkerInRules(String rule, String id){
75-
String literal = "`\"" + id + "\"`";
76-
String updatedRule = rule.replaceAll(mergeIdMarker, literal);
77-
return updatedRule;
81+
String literal = "\"" + id + "\"";
82+
if (rule.contains(mergeIdMarker)) {
83+
String updatedRule = rule.replaceAll(mergeIdMarker, literal);
84+
updatedRule = "`" + updatedRule + "`";
85+
return updatedRule;
86+
}
87+
return rule;
7888
}
7989

8090
public String mergeContentToObject(String aggregatedObject, String preparedObject){

0 commit comments

Comments
 (0)