Skip to content

Commit e59a75f

Browse files
Improve internal composition handling
Improve aggregation of data from events prior a start event in aggregation.
1 parent d295f63 commit e59a75f

15 files changed

+808
-578
lines changed

src/main/java/com/ericsson/ei/controller/SubscriptionControllerImpl.java

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -58,7 +58,7 @@ public class SubscriptionControllerImpl implements SubscriptionController {
5858
public ResponseEntity<SubscriptionResponse> createSubscription(@RequestBody List<Subscription> subscriptions) {
5959
ResponseEntity<SubscriptionResponse> subResponse = null;
6060
SubscriptionResponse subscriptionResponse = new SubscriptionResponse();
61-
for (Subscription subscription : subscriptions){
61+
for (Subscription subscription : subscriptions) {
6262
subResponse = null;
6363
try {
6464
subscription.setCreated(Instant.now().toEpochMilli());
@@ -69,7 +69,8 @@ public ResponseEntity<SubscriptionResponse> createSubscription(@RequestBody List
6969
LOG.error(msg);
7070
subscriptionResponse.setMsg(msg);
7171
subscriptionResponse.setStatusCode(HttpStatus.PRECONDITION_FAILED.value());
72-
subResponse = new ResponseEntity<SubscriptionResponse>(subscriptionResponse, HttpStatus.PRECONDITION_FAILED);
72+
subResponse = new ResponseEntity<SubscriptionResponse>(subscriptionResponse,
73+
HttpStatus.PRECONDITION_FAILED);
7374
}
7475

7576
if (!subscriptionService.doSubscriptionExist(subscription.getSubscriptionName())) {

src/main/java/com/ericsson/ei/handlers/HistoryExtractionHandler.java

Lines changed: 18 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -14,17 +14,16 @@
1414

1515
package com.ericsson.ei.handlers;
1616

17-
import org.slf4j.Logger;
18-
import org.slf4j.LoggerFactory;
19-
import org.springframework.beans.factory.annotation.Autowired;
20-
2117
import com.ericsson.ei.jmespath.JmesPathInterface;
2218
import com.ericsson.ei.jsonmerge.MergeHandler;
2319
import com.ericsson.ei.jsonmerge.MergePrepare;
2420
import com.ericsson.ei.rules.RulesObject;
2521
import com.fasterxml.jackson.databind.JsonNode;
2622
import com.github.wnameless.json.flattener.JsonFlattener;
2723

24+
import org.slf4j.Logger;
25+
import org.slf4j.LoggerFactory;
26+
import org.springframework.beans.factory.annotation.Autowired;
2827
import org.springframework.stereotype.Component;
2928

3029
// TODO: Auto-generated Javadoc
@@ -67,11 +66,23 @@ public String runHistoryExtraction(String aggregatedObjectId, RulesObject rules,
6766
String ruleString = ruleJson.toString();
6867

6968
// if we need to add append to an array then array_path will not be
70-
// empty so we use it instead passed pathInAggregatedObject
69+
// empty so we use it instead of passed pathInAggregatedObject
7170
String aggregatedObject = mergeHandler.getAggregatedObject(aggregatedObjectId, false);
72-
String array_path = getPathFromExtractedContent(aggregatedObject, ruleString);
71+
String objAtPathStr = "";
72+
String pathTrimmed = mergePrepare.trimLastInPath(pathInAggregatedObject, ".");
73+
try {
74+
pathTrimmed = mergePrepare.makeJmespathArrayIndexes(pathTrimmed);
75+
JsonNode objAtPath = jmesPathInterface.runRuleOnEvent(pathTrimmed, aggregatedObject);
76+
objAtPathStr = objAtPath.toString();
77+
} catch (Exception e) {
78+
log.error(e.getMessage(), e);
79+
}
80+
String array_path = getPathFromExtractedContent(objAtPathStr, ruleString);
81+
7382
if (!array_path.isEmpty()) {
74-
pathInAggregatedObject = array_path;
83+
// pathInAggregatedObject = array_path;
84+
pathInAggregatedObject = pathTrimmed + "." + array_path;
85+
pathInAggregatedObject = MergePrepare.destringify(pathInAggregatedObject);
7586
} else {
7687
String ruleKey = getRulePath(ruleString);
7788
if (pathInAggregatedObject.isEmpty()) {

src/main/java/com/ericsson/ei/handlers/ObjectHandler.java

Lines changed: 45 additions & 36 deletions
Original file line numberDiff line numberDiff line change
@@ -43,11 +43,13 @@ public class ObjectHandler {
4343

4444
static Logger log = (Logger) LoggerFactory.getLogger(ObjectHandler.class);
4545

46-
@Getter @Setter
46+
@Getter
47+
@Setter
4748
@Value("${aggregated.collection.name}")
4849
private String collectionName;
4950

50-
@Getter @Setter
51+
@Getter
52+
@Setter
5153
@Value("${database.name}")
5254
private String databaseName;
5355

@@ -66,7 +68,7 @@ public class ObjectHandler {
6668
@Setter
6769
@Autowired
6870
private SubscriptionHandler subscriptionHandler;
69-
71+
7072
@Getter
7173
@Value("${aggregated.collection.ttlValue}")
7274
private int ttlValue;
@@ -81,12 +83,11 @@ public boolean insertObject(String aggregatedObject, RulesObject rulesObject, St
8183
log.debug("ObjectHandler: Aggregated Object document to be inserted: " + document.toString());
8284

8385
mongoDbHandler.createTTLIndex(databaseName, collectionName, "Time", ttlValue);
84-
8586

8687
boolean result = mongoDbHandler.insertDocument(databaseName, collectionName, document.toString());
8788
if (result)
8889
eventToObjectMap.updateEventToObjectMapInMemoryDB(rulesObject, event, id);
89-
subscriptionHandler.checkSubscriptionForObject(aggregatedObject);
90+
subscriptionHandler.checkSubscriptionForObject(aggregatedObject);
9091
return result;
9192
}
9293

@@ -95,12 +96,18 @@ public boolean insertObject(JsonNode aggregatedObject, RulesObject rulesObject,
9596
}
9697

9798
/**
98-
* This method uses previously locked in database aggregatedObject (lock was set in lockDocument method)
99-
* and modifies this document with the new values and removes the lock in one query
100-
* @param aggregatedObject String to insert in database
101-
* @param rulesObject used for fetching id
102-
* @param event String to fetch id if it was not specified
103-
* @param id String
99+
* This method uses previously locked in database aggregatedObject (lock was
100+
* set in lockDocument method) and modifies this document with the new
101+
* values and removes the lock in one query
102+
*
103+
* @param aggregatedObject
104+
* String to insert in database
105+
* @param rulesObject
106+
* used for fetching id
107+
* @param event
108+
* String to fetch id if it was not specified
109+
* @param id
110+
* String
104111
* @return true if operation succeed
105112
*/
106113
public boolean updateObject(String aggregatedObject, RulesObject rulesObject, String event, String id) {
@@ -109,15 +116,15 @@ public boolean updateObject(String aggregatedObject, RulesObject rulesObject, St
109116
JsonNode idNode = jmespathInterface.runRuleOnEvent(idRules, event);
110117
id = idNode.textValue();
111118
}
112-
log.debug("ObjectHandler: Updating Aggregated Object:\n" + aggregatedObject +
113-
"\nEvent:\n" + event);
119+
log.debug("ObjectHandler: Updating Aggregated Object:\n" + aggregatedObject + "\nEvent:\n" + event);
114120
JsonNode document = prepareDocumentForInsertion(id, aggregatedObject);
115121
String condition = "{\"_id\" : \"" + id + "\"}";
116122
String documentStr = document.toString();
117123
boolean result = mongoDbHandler.updateDocument(databaseName, collectionName, condition, documentStr);
118-
if (result)
124+
if (result) {
119125
eventToObjectMap.updateEventToObjectMapInMemoryDB(rulesObject, event, id);
120126
subscriptionHandler.checkSubscriptionForObject(aggregatedObject);
127+
}
121128
return result;
122129
}
123130

@@ -132,9 +139,6 @@ public List<String> findObjectsByCondition(String condition) {
132139
public String findObjectById(String id) {
133140
String condition = "{\"_id\" : \"" + id + "\"}";
134141
String document = findObjectsByCondition(condition).get(0);
135-
// JsonNode result = getAggregatedObject(document);
136-
// if (result != null)
137-
// return result.asText();
138142
return document;
139143
}
140144

@@ -158,54 +162,59 @@ public JsonNode prepareDocumentForInsertion(String id, String object) {
158162

159163
return jsonNode;
160164
} catch (Exception e) {
161-
log.info(e.getMessage(),e);
165+
log.error(e.getMessage(), e);
162166
}
163167
return null;
164168
}
165169

166170
public JsonNode getAggregatedObject(String dbDocument) {
167-
ObjectMapper mapper = new ObjectMapper();
168-
try {
169-
JsonNode documentJson = mapper.readValue(dbDocument, JsonNode.class);
170-
JsonNode objectDoc = documentJson.get("aggregatedObject");
171-
return objectDoc;
172-
} catch (Exception e) {
173-
log.info(e.getMessage(),e);
174-
}
175-
return null;
171+
ObjectMapper mapper = new ObjectMapper();
172+
try {
173+
JsonNode documentJson = mapper.readValue(dbDocument, JsonNode.class);
174+
JsonNode objectDoc = documentJson.get("aggregatedObject");
175+
return objectDoc;
176+
} catch (Exception e) {
177+
log.error(e.getMessage(), e);
178+
}
179+
return null;
176180
}
177181

178182
public String extractObjectId(JsonNode aggregatedDbObject) {
179183
return aggregatedDbObject.get("_id").textValue();
180184
}
181185

182186
/**
183-
* Locks the document in database to achieve pessimistic locking. Method findAndModify is used to optimize
184-
* the quantity of requests towards database.
185-
* @param id String to search
187+
* Locks the document in database to achieve pessimistic locking. Method
188+
* findAndModify is used to optimize the quantity of requests towards
189+
* database.
190+
*
191+
* @param id
192+
* String to search
186193
* @return String aggregated document
187194
*/
188-
public String lockDocument(String id){
195+
public String lockDocument(String id) {
189196
boolean documentLocked = true;
190197
String conditionId = "{\"_id\" : \"" + id + "\"}";
191198
String conditionLock = "[ { \"lock\" : null } , { \"lock\" : \"0\"}]";
192199
String setLock = "{ \"$set\" : { \"lock\" : \"1\"}}";
193200
ObjectMapper mapper = new ObjectMapper();
194-
while (documentLocked==true){
201+
while (documentLocked == true) {
195202
try {
196203
JsonNode documentJson = mapper.readValue(setLock, JsonNode.class);
197204
JsonNode queryCondition = mapper.readValue(conditionId, JsonNode.class);
198205
((ObjectNode) queryCondition).set("$or", mapper.readValue(conditionLock, JsonNode.class));
199-
Document result = mongoDbHandler.findAndModify(databaseName, collectionName, queryCondition.toString(), documentJson.toString());
206+
Document result = mongoDbHandler.findAndModify(databaseName, collectionName, queryCondition.toString(),
207+
documentJson.toString());
200208
if (result != null) {
201-
log.info("DB locked by " + Thread.currentThread().getId() + " thread");
209+
log.debug("DB locked by " + Thread.currentThread().getId() + " thread");
202210
documentLocked = false;
203211
return JSON.serialize(result);
204212
}
205213
// To Remove
206-
log.info("Waiting by " + Thread.currentThread().getId() + " thread");
214+
log.debug("Waiting by " + Thread.currentThread().getId() + " thread");
207215
} catch (Exception e) {
208-
log.info(e.getMessage(),e); }
216+
log.error(e.getMessage(), e);
217+
}
209218
}
210219
return null;
211220
}

src/main/java/com/ericsson/ei/handlers/UpStreamEventsHandler.java

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -109,13 +109,14 @@ private void traverseTree(final JsonNode jsonArray, final String aggregatedObjec
109109
final JsonNode parent = jsonArray.get(0);
110110
RulesObject rules = rulesHandler.getRulesForEvent(parent.toString());
111111

112-
final String np = historyExtractionHandler.runHistoryExtraction(aggregatedObjectId, rules, parent.toString(),
112+
String np = historyExtractionHandler.runHistoryExtraction(aggregatedObjectId, rules, parent.toString(),
113113
pathInAggregatedObject);
114114
String prevNp = null;
115115
for (int i = 1; i < jsonArray.size(); i++) {
116116
if (jsonArray.get(i).isObject()) {
117-
rules = rulesHandler.getRulesForEvent(jsonArray.get(i).toString());
118-
historyExtractionHandler.runHistoryExtraction(aggregatedObjectId, rules, jsonArray.get(i).toString(),
117+
String event = jsonArray.get(i).toString();
118+
rules = rulesHandler.getRulesForEvent(event);
119+
prevNp = historyExtractionHandler.runHistoryExtraction(aggregatedObjectId, rules, event,
119120
pathInAggregatedObject);
120121
} else {
121122
// if we have prevNp then we should use that because it is the

0 commit comments

Comments
 (0)