Skip to content

Commit 07370f8

Browse files
Merge branch 'array_aggregation' into master
2 parents 0f0dda7 + 4366545 commit 07370f8

17 files changed

+454
-37
lines changed

.travis.yml

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,6 @@ script:
2121
#
2222
# It is also importan that the first test to be run is a Spring Boot Test since spring boot also downloads
2323
# another version of embedded mongo db so all Spring Boot tests will end up in a race condition also
24-
- mvn -DsomeModule.test.includes="**/QueryServiceTest.java" test
25-
- mvn -DsomeModule.test.excludes="**/QueryServiceTest.java" test
24+
- mvn -DsomeModule.test.includes="**/QueryServiceTest.java" test jacoco:report coveralls:report
25+
- mvn -DsomeModule.test.excludes="**/QueryServiceTest.java" test jacoco:report coveralls:report
2626

README.md

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -15,13 +15,19 @@
1515
limitations under the License.
1616
--->
1717

18+
[![Build Status](https://travis-ci.org/Ericsson/eiffel-intelligence.svg?branch=master)](https://travis-ci.org/Ericsson/eiffel-intelligence)
19+
[![Coverage Status](https://coveralls.io/repos/github/Ericsson/eiffel-intelligence/badge.svg?branch=master)](https://coveralls.io/github/Ericsson/eiffel-intelligence?branch=master)
20+
[![](https://jitpack.io/v/Ericsson/eiffel-intelligence.svg)](https://jitpack.io/#Ericsson/eiffel-intelligence)
21+
1822
# Eiffel Intelligence
1923
Eiffel Intelligence is a real time data aggregation and analysis solution for Eiffel events. While Eiffel represents an event based architecture, Eiffel Intelligence addresses the need of stakeholders to view the current state of the system by bridging the divide from immutable events to mutable state representation.
2024

2125
This repository contains a software implementation based on the Eiffel protocol. For more information on Eiffel, its vocabulary, descriptions, guides and schemas, please see [the Eiffel repository](https://github.com/Ericsson/eiffel). For news, discussions and questions, please visit the [Eiffel Community Google group](https://groups.google.com/forum/#!forum/eiffel-community).
2226

2327
Eiffel Intelligence is licensed under the [Apache License 2.0](./LICENSE).
2428

29+
## Detailed documentation is provided at the following link: http://ericsson.github.io/eiffel-intelligence
30+
2531
## How to Propose Changes
2632

2733
Anyone is welcome to propose changes to this repository by creating a new [Issue](https://github.com/Ericsson/eiffel-intelligence/issues) ticket in GitHub. These requests may concern anything contained in the repo: changes to documentation, changes to interfaces, changes to implementations, additional tests et cetera.

docs/_config.yml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1 @@
1+
theme: jekyll-theme-slate

docs/index.md

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,12 @@
1+
2+
# _Introduction_
3+
4+
Eiffel Intelligence is a service that aggregates informations from different events of a flow with the purpose of notifying subscribers when content of interest has been collected from desired events. The aggregation is stored in a JSON bject. By flow we mean a chain of Eiffel events that are linked together directly or indirectly.
5+
6+
Eiffel Intelligence uses a set of rules to define what information will be extracted from an Eiffel event in the flow and at what location to store this information in the aggregated object. Today only one rule set can be run in each instance and the reason is that in an Eiffel domain with million of events flowing we will have multiple extractions and checks for each rule set and multiple rule sets will require large machines and scalability problems.
7+
8+
Eiffel intelligence uses subscriptions to notify interested parties when several evens have occured with desired content in each event aggregated in an aggregated object. Every time an aggregated object is updated we check whether any subscription is fulfilled by the curent state of the aggregated object.
9+
10+
[Rules](rules.md)
11+
12+
[Subscriptions](subscription.md)

docs/rules.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1 @@
1+
# Rules in Eiffel Intelligence

docs/subscriptions.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1 @@
1+
# Subscriptions in Eiffel Intelligence

pom.xml

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -98,7 +98,7 @@
9898
<artifactId>spring-security-test</artifactId>
9999
<scope>test</scope>
100100
</dependency>
101-
101+
102102
<dependency>
103103
<groupId>org.springframework.session</groupId>
104104
<artifactId>spring-session-data-mongodb</artifactId>
@@ -244,7 +244,7 @@
244244

245245
</dependencies>
246246
<build>
247-
<plugins>
247+
<plugins>
248248
<plugin>
249249
<groupId>org.springframework.boot</groupId>
250250
<artifactId>spring-boot-maven-plugin</artifactId>
@@ -343,4 +343,4 @@
343343

344344
</plugins>
345345
</build>
346-
</project>
346+
</project>

src/main/java/com/ericsson/ei/handlers/UpStreamEventsHandler.java

Lines changed: 12 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@
1919
import com.ericsson.ei.rules.RulesHandler;
2020
import com.ericsson.ei.rules.RulesObject;
2121
import com.fasterxml.jackson.databind.JsonNode;
22+
2223
import org.slf4j.Logger;
2324
import org.slf4j.LoggerFactory;
2425
import org.springframework.beans.factory.annotation.Autowired;
@@ -107,15 +108,21 @@ private void traverseTree(final JsonNode jsonArray, final String aggregatedObjec
107108
final String pathInAggregatedObject) {
108109

109110
final JsonNode parent = jsonArray.get(0);
110-
RulesObject rules = rulesHandler.getRulesForEvent(parent.toString());
111-
112-
String np = historyExtractionHandler.runHistoryExtraction(aggregatedObjectId, rules, parent.toString(),
113-
pathInAggregatedObject);
111+
JsonNode parentId = parent.at("/meta/id");
112+
String np = pathInAggregatedObject;
113+
if (!aggregatedObjectId.equals(parentId.textValue())) {
114+
// parent event is not the same as the starting event so we can
115+
// start collecting history
116+
RulesObject rules = rulesHandler.getRulesForEvent(parent.toString());
117+
118+
np = historyExtractionHandler.runHistoryExtraction(aggregatedObjectId, rules, parent.toString(),
119+
pathInAggregatedObject);
120+
}
114121
String prevNp = null;
115122
for (int i = 1; i < jsonArray.size(); i++) {
116123
if (jsonArray.get(i).isObject()) {
117124
String event = jsonArray.get(i).toString();
118-
rules = rulesHandler.getRulesForEvent(event);
125+
RulesObject rules = rulesHandler.getRulesForEvent(event);
119126
prevNp = historyExtractionHandler.runHistoryExtraction(aggregatedObjectId, rules, event,
120127
pathInAggregatedObject);
121128
} else {

src/main/java/com/ericsson/ei/jmespath/JmesPathInterface.java

Lines changed: 7 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -16,13 +16,13 @@
1616
*/
1717
package com.ericsson.ei.jmespath;
1818

19+
import com.fasterxml.jackson.databind.JsonNode;
20+
import com.fasterxml.jackson.databind.ObjectMapper;
21+
1922
import org.slf4j.Logger;
2023
import org.slf4j.LoggerFactory;
2124
import org.springframework.stereotype.Component;
2225

23-
import com.fasterxml.jackson.databind.JsonNode;
24-
import com.fasterxml.jackson.databind.ObjectMapper;
25-
2626
import io.burt.jmespath.Expression;
2727
import io.burt.jmespath.JmesPath;
2828
import io.burt.jmespath.function.FunctionRegistry;
@@ -50,14 +50,16 @@ public JsonNode runRuleOnEvent(String rule, String input) {
5050
if (input == null) {
5151
input = "";
5252
}
53-
Expression<JsonNode> expression = jmespath.compile(rule);
53+
54+
JsonNode result = null;
5455
ObjectMapper objectMapper = new ObjectMapper();
5556
try {
57+
Expression<JsonNode> expression = jmespath.compile(rule);
5658
event = objectMapper.readValue(input, JsonNode.class);
59+
result = expression.search(event);
5760
} catch (Exception e) {
5861
log.info(e.getMessage(), e);
5962
}
60-
JsonNode result = expression.search(event);
6163

6264
return result;
6365
}

src/main/java/com/ericsson/ei/jsonmerge/MergePrepare.java

Lines changed: 21 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -16,22 +16,27 @@
1616
*/
1717
package com.ericsson.ei.jsonmerge;
1818

19-
import org.slf4j.Logger;
20-
import org.slf4j.LoggerFactory;
21-
import org.apache.commons.lang3.StringUtils;
22-
import org.json.*;
23-
import org.springframework.beans.factory.annotation.Autowired;
24-
import org.springframework.stereotype.Component;
25-
2619
import com.ericsson.ei.jmespath.JmesPathInterface;
2720
import com.fasterxml.jackson.databind.JsonNode;
2821
import com.fasterxml.jackson.databind.ObjectMapper;
2922
import com.fasterxml.jackson.databind.node.ArrayNode;
23+
import com.fasterxml.jackson.databind.node.NullNode;
3024
import com.github.wnameless.json.flattener.JsonFlattener;
3125

32-
import lombok.Setter;
26+
import java.util.ArrayList;
27+
import java.util.Arrays;
28+
import java.util.Map;
29+
30+
import org.apache.commons.lang3.StringUtils;
31+
import org.json.JSONArray;
32+
import org.json.JSONException;
33+
import org.json.JSONObject;
34+
import org.slf4j.Logger;
35+
import org.slf4j.LoggerFactory;
36+
import org.springframework.beans.factory.annotation.Autowired;
37+
import org.springframework.stereotype.Component;
3338

34-
import java.util.*;
39+
import lombok.Setter;
3540

3641
@Component
3742
public class MergePrepare {
@@ -241,10 +246,10 @@ public String getMergePath(String originObject, String mergeRule, boolean skipPa
241246
if (pos > 0)
242247
ruleKey = ruleKey.substring(0, pos);
243248
try {
244-
Object object = objectJSONObject.get(ruleKey);
245-
if (object != null)
249+
JsonNode jsonResult = jmesPathInterface.runRuleOnEvent(ruleKey, originObject);
250+
if (!(jsonResult instanceof NullNode))
246251
mergePath = ruleKey;
247-
} catch (JSONException e) {
252+
} catch (Exception e) {
248253
log.error(e.getMessage(), e);
249254
}
250255
} else {
@@ -320,13 +325,14 @@ public String getMergePath(String originObject, String mergeRule, boolean skipPa
320325
Object value = parsedJson.at(mergePath);
321326
if (value instanceof ArrayNode) {
322327
int arraySize = ((ArrayNode) value).size();
323-
mergePath += "." + arraySize++ + "." + ruleKeyLast;
324-
} else {
328+
mergePath += "." + arraySize++;
329+
}
330+
if (!StringUtils.isAllBlank(ruleKeyLast)) {
325331
mergePath += "." + ruleKeyLast;
326332
}
333+
327334
mergePath = mergePath.replaceFirst("\\/", "");
328335
mergePath = mergePath.replaceAll("\\/", "\\.");
329-
330336
} catch (Exception e) {
331337
log.error(e.getMessage(), e);
332338
}

0 commit comments

Comments
 (0)