Skip to content

Commit 5868466

Browse files
authored
Functionality for handling repeat flag when processing subscriptions o… (#58)
* Functinality for handling repeat flag when processing subscriptions on aggregated objects. * Connected matched Subscripton Requirement Index Id to the Hashmap AggrObjId. * Code cleanup. * Stepped version to 0.0.5 in pom file. * Replaced RepeatFlag In-Memory hashmap with presistence Db storage. * Fixed some deprecated functionality in MongoDbHandler and symbol error in RunSubscription class. * Changed some log info to debug. * Connected matched AggrObjIds to which Requirement in Subscription it relates to. Tests added for this implementation. * Updated code according to review comments. * Changed mongoDbHandler updatedDocument function to use findAndModify function instead when updating matched subscription with new matched AggrObjId. * Solved findAndUpdate existing matched requirement list to push new array value with correct format. * Improved logging. * Restored SubscriptionJson object code handling which had been removed on main repository. Code is needed for ReapeatFlag handling. * Improved logging. * Change LOGGER to private static LOGGER. * Changed some Integer variables to int variables in SubscriptionRepeatDbHandlerTest class. * Change one debug log text contents. * Changed on log message to debug message.
1 parent bf49dd9 commit 5868466

File tree

8 files changed

+609
-33
lines changed

8 files changed

+609
-33
lines changed

src/main/java/com/ericsson/ei/mongodbhandler/MongoDBHandler.java

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -290,5 +290,11 @@ private MongoCollection<Document> getMongoCollection(String dataBaseName, String
290290
MongoCollection<Document> collection = db.getCollection(collectionName);
291291
return collection;
292292
}
293+
294+
public void dropCollection(String dataBaseName, String collectionName) {
295+
MongoDatabase db = mongoClient.getDatabase(dataBaseName);
296+
MongoCollection<Document> mongoCollection = db.getCollection(collectionName);
297+
mongoCollection.drop();
298+
}
293299

294300
}

src/main/java/com/ericsson/ei/subscriptionhandler/RunSubscription.java

Lines changed: 69 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -16,15 +16,19 @@
1616
*/
1717
package com.ericsson.ei.subscriptionhandler;
1818

19-
import com.ericsson.ei.jmespath.JmesPathInterface;
20-
import com.fasterxml.jackson.databind.JsonNode;
21-
import com.fasterxml.jackson.databind.node.ArrayNode;
19+
import java.util.HashMap;
20+
import java.util.Iterator;
21+
import java.util.concurrent.ConcurrentHashMap;
22+
2223
import org.slf4j.Logger;
2324
import org.slf4j.LoggerFactory;
2425
import org.springframework.beans.factory.annotation.Autowired;
2526
import org.springframework.stereotype.Component;
2627

27-
import java.util.Iterator;
28+
import com.ericsson.ei.jmespath.JmesPathInterface;
29+
import com.fasterxml.jackson.databind.JsonNode;
30+
import com.fasterxml.jackson.databind.ObjectMapper;
31+
import com.fasterxml.jackson.databind.node.ArrayNode;
2832

2933

3034
/**
@@ -42,8 +46,11 @@ public class RunSubscription {
4246

4347
@Autowired
4448
private JmesPathInterface jmespath;
45-
46-
/**
49+
50+
@Autowired
51+
private SubscriptionRepeatDbHandler subscriptionRepeatDbHandler;
52+
53+
/**
4754
* This method matches every condition specified in the subscription Object
4855
* and if all conditions are matched then only the aggregatedObject is
4956
* eligible for notification via e-mail or REST POST.
@@ -54,34 +61,75 @@ public class RunSubscription {
5461
* @param requirementIterator
5562
* @return boolean
5663
*/
57-
public boolean runSubscriptionOnObject(String aggregatedObject, Iterator<JsonNode> requirementIterator) {
64+
public boolean runSubscriptionOnObject(String aggregatedObject, Iterator<JsonNode> requirementIterator, JsonNode subscriptionJson) {
5865
boolean conditionFulfilled = false;
59-
int countConditionFulfillment;
60-
int countConditions;
66+
int count_condition_fulfillment = 0;
67+
int count_conditions = 0;
68+
69+
int requirementIndex = 0;
70+
6171
while (requirementIterator.hasNext()) {
72+
73+
JsonNode aggrObjJsonNode = null;
74+
ObjectMapper objectMapper = new ObjectMapper();
75+
try {
76+
aggrObjJsonNode = objectMapper.readValue(aggregatedObject, JsonNode.class);
77+
} catch (Exception e) {
78+
LOGGER.error(e.getMessage(), e);
79+
}
80+
81+
82+
String aggrObjId = aggrObjJsonNode.get("id").asText();
83+
String subscriptionName = subscriptionJson.get("subscriptionName").asText();
84+
String subscriptionRepeatFlag = subscriptionJson.get("repeat").asText();
85+
86+
if (subscriptionRepeatFlag == "false" && subscriptionRepeatDbHandler.checkIfAggrObjIdExistInSubscriptionAggrIdsMatchedList(subscriptionName, requirementIndex, aggrObjId)){
87+
LOGGER.info("Subscription has already matched with AggregatedObject Id: " + aggrObjId +
88+
"\nSubscriptionName: " + subscriptionName +
89+
"\nand has Subsctrion Repeat flag set to: " + subscriptionRepeatFlag);
90+
break;
91+
}
92+
6293
JsonNode requirement = requirementIterator.next();
63-
LOGGER.debug("The fulfilled requirement which will condition checked is : " + requirement.toString());
94+
95+
LOGGER.info("The fulfilled requirement which will condition checked is : " + requirement.toString());
6496
ArrayNode conditions = (ArrayNode) requirement.get("conditions");
65-
countConditionFulfillment = 0;
66-
countConditions = conditions.size();
67-
LOGGER.debug("Conditions of the subscription : " + conditions.toString());
97+
98+
count_condition_fulfillment = 0;
99+
count_conditions = conditions.size();
100+
101+
LOGGER.info("Conditions of the subscription : " + conditions.toString());
68102
Iterator<JsonNode> conditionIterator = conditions.elements();
69103
while (conditionIterator.hasNext()) {
70104
String rule = conditionIterator.next().get("jmespath").toString().replaceAll("^\"|\"$", "");
71-
String newRule = rule.replace("'", "\"");
72-
LOGGER.debug("Rule : " + rule);
73-
LOGGER.debug("New Rule after replacing single quote : " + newRule);
105+
String new_Rule = rule.replace("'", "\"");
106+
LOGGER.info("Rule : " + rule);
107+
LOGGER.info("New Rule after replacing single quote : " + new_Rule);
74108
JsonNode result = jmespath.runRuleOnEvent(rule, aggregatedObject);
75-
LOGGER.debug("Result : " + result.toString());
76-
if (result.toString() != null && !result.toString().equals("false") && !result.toString().equals("[]")) {
77-
countConditionFulfillment++;
109+
LOGGER.info("Result : " + result.toString());
110+
if (result.toString() != null && result.toString() != "false" && !result.toString().equals("[]")){
111+
count_condition_fulfillment++;
78112
}
79113
}
80-
if (countConditions != 0 && countConditionFulfillment == countConditions) {
114+
115+
if(count_conditions != 0 && count_condition_fulfillment == count_conditions){
81116
conditionFulfilled = true;
117+
if (subscriptionJson.get("repeat").toString() == "false") {
118+
LOGGER.info("Adding matched AggrObj id to SubscriptionRepeatFlagHandlerDb.");
119+
try {
120+
subscriptionRepeatDbHandler.addMatchedAggrObjToSubscriptionId(subscriptionName, requirementIndex, aggrObjId);
121+
} catch (Exception e) {
122+
LOGGER.error(e.getMessage());
123+
e.printStackTrace();
124+
}
125+
}
82126
}
127+
128+
requirementIndex++;
83129
}
84-
LOGGER.debug("The final value of conditionFulfilled is : " + conditionFulfilled);
130+
131+
LOGGER.info("The final value of conditionFulfilled is : " + conditionFulfilled);
132+
85133
return conditionFulfilled;
86134
}
87135
}

src/main/java/com/ericsson/ei/subscriptionhandler/SubscriptionHandler.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -102,9 +102,9 @@ private void extractConditions(String aggregatedObject, String subscriptionData)
102102
LOGGER.error(e.getMessage(), e);
103103
}
104104
ArrayNode requirementNode = (ArrayNode) subscriptionJson.get("requirements");
105-
LOGGER.debug("RequirementNode : " + requirementNode.toString());
105+
LOGGER.debug("Requirements : " + requirementNode.toString());
106106
Iterator<JsonNode> requirementIterator = requirementNode.elements();
107-
if (runSubscription.runSubscriptionOnObject(aggregatedObject, requirementIterator)) {
107+
if (runSubscription.runSubscriptionOnObject(aggregatedObject, requirementIterator, subscriptionJson)) {
108108
LOGGER.debug("The subscription conditions match for the aggregatedObject");
109109
informSubscription.informSubscriber(aggregatedObject, subscriptionJson);
110110
}
Lines changed: 193 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,193 @@
1+
/*
2+
Copyright 2018 Ericsson AB.
3+
For a full list of individual contributors, please see the commit history.
4+
5+
Licensed under the Apache License, Version 2.0 (the "License");
6+
you may not use this file except in compliance with the License.
7+
You may obtain a copy of the License at
8+
9+
http://www.apache.org/licenses/LICENSE-2.0
10+
11+
Unless required by applicable law or agreed to in writing, software
12+
distributed under the License is distributed on an "AS IS" BASIS,
13+
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
See the License for the specific language governing permissions and
15+
limitations under the License.
16+
*/
17+
package com.ericsson.ei.subscriptionhandler;
18+
19+
import java.util.ArrayList;
20+
import java.util.List;
21+
import org.slf4j.Logger;
22+
import org.slf4j.LoggerFactory;
23+
import org.springframework.beans.factory.annotation.Autowired;
24+
import org.springframework.beans.factory.annotation.Value;
25+
import org.springframework.stereotype.Component;
26+
27+
28+
import com.ericsson.ei.mongodbhandler.MongoDBHandler;
29+
import com.fasterxml.jackson.core.type.TypeReference;
30+
import com.fasterxml.jackson.databind.JsonNode;
31+
import com.fasterxml.jackson.databind.ObjectMapper;
32+
import com.fasterxml.jackson.databind.ObjectReader;
33+
import com.mongodb.BasicDBObject;
34+
35+
import lombok.Getter;
36+
import lombok.Setter;
37+
38+
39+
@Component
40+
public class SubscriptionRepeatDbHandler {
41+
42+
private static Logger LOGGER = (Logger) LoggerFactory.getLogger(SubscriptionRepeatDbHandler.class);
43+
44+
45+
@Autowired
46+
public MongoDBHandler mongoDbHandler;
47+
48+
private ObjectMapper mapper = new ObjectMapper();
49+
50+
@Getter
51+
@Setter
52+
@Value("${database.name}")
53+
public String dataBaseName;
54+
@Getter
55+
@Setter
56+
@Value("${subscription.collection.repeatFlagHandlerName}")
57+
public String collectionName;
58+
59+
60+
/*
61+
* RepeatFlagHandling structure in MongoDb:
62+
* {
63+
"_id" : ObjectId("5ac62b4ea4f87e29e8cc5915"),
64+
"subscriptionId" : "subsA",
65+
// RequirementId corresponds to a Requirement List of matched Aggregated Objects Ids.
66+
<RequirementId> <AggrObjIds>
67+
"requirements" : {"0" : [
68+
"11112",
69+
"72324",
70+
"72364",
71+
"72233",
72+
"71233"
73+
],
74+
"1" : [
75+
"11112",
76+
"72324",
77+
"72364",
78+
"72233",
79+
"71233"
80+
]
81+
}
82+
}
83+
*
84+
*/
85+
86+
/*
87+
* Function that stores the matched aggregatedObjectId to the database.
88+
*
89+
*/
90+
public void addMatchedAggrObjToSubscriptionId(String subscriptionId, int requirementId, String aggrObjId) throws Exception {
91+
92+
LOGGER.debug("Adding/Updating matched AggrObjId: " + aggrObjId + " to SubscriptionsId: " + subscriptionId + " aggrId matched list" );
93+
94+
if (checkIfAggrObjIdExistInSubscriptionAggrIdsMatchedList(subscriptionId, requirementId, aggrObjId)) {
95+
LOGGER.info("Subscription: " + subscriptionId + " and AggrObjId, " +
96+
aggrObjId + " has already been matched." +
97+
"No need to register the subscription match.");
98+
return;
99+
}
100+
101+
102+
if (!updateExistingMatchedSubscriptionWithAggrObjId(subscriptionId, requirementId, aggrObjId)) {
103+
LOGGER.error("Couldn't update SubscriptionMathced id.");
104+
}
105+
else {
106+
LOGGER.debug("New Subscription AggrId has not matched, inserting new SubscriptionId and AggrObjId to matched list.");
107+
BasicDBObject document = new BasicDBObject();
108+
document.put("subscriptionId", subscriptionId);
109+
110+
ArrayList<String> aggrObjIdsList = new ArrayList<String>();
111+
aggrObjIdsList.add(aggrObjId);
112+
113+
BasicDBObject reqDocument = new BasicDBObject();
114+
reqDocument.put(String.valueOf(requirementId), aggrObjIdsList);
115+
116+
document.put("requirements", reqDocument);
117+
118+
LOGGER.debug("New Matched AggrIdObject update on Subscription to be inserted to Db: " + document);
119+
boolean result = mongoDbHandler.insertDocument(dataBaseName, collectionName, document.toString());
120+
if (result == false) {
121+
throw new Exception("Failed to insert the document into database");
122+
}
123+
}
124+
}
125+
126+
private boolean updateExistingMatchedSubscriptionWithAggrObjId(String subscriptionId, int requirementId,
127+
String aggrObjId) throws Exception {
128+
String subscriptionQuery = "{\"subscriptionId\" : \"" + subscriptionId + "\"}";
129+
130+
JsonNode updateDocJsonNode = mapper
131+
.readValue("{\"$push\" : { \"requirements." + requirementId + "\" : \"" + aggrObjId + "\"}}", JsonNode.class);
132+
133+
LOGGER.debug("SubscriptionId \"", subscriptionId, "\" document will be updated with following requirement update object: " + updateDocJsonNode.toString());
134+
135+
JsonNode queryJsonNode = mapper.readValue(subscriptionQuery, JsonNode.class);
136+
try {
137+
mongoDbHandler.findAndModify(dataBaseName, collectionName, queryJsonNode.toString(),
138+
updateDocJsonNode.toString());
139+
} catch (Exception e) {
140+
LOGGER.debug("Failed to update existing matched SubscriptionId with new AggrId." + "SubscriptionId: "
141+
+ subscriptionId + "New matched AggrObjId: " + aggrObjId + "RequirementId that have matched: "
142+
+ requirementId);
143+
return false;
144+
}
145+
return true;
146+
}
147+
148+
149+
public boolean checkIfAggrObjIdExistInSubscriptionAggrIdsMatchedList(String subscriptionId, int requirementId, String aggrObjId) {
150+
151+
LOGGER.debug("Checking if AggrObjId: " + aggrObjId + " exist in SubscriptionId: " + subscriptionId + " AggrId matched list.");
152+
String subscriptionQuery = "{\"subscriptionId\" : \"" + subscriptionId + "\"}";
153+
List<String> objArray = mongoDbHandler.find(dataBaseName, collectionName, subscriptionQuery);
154+
if (objArray != null && !objArray.isEmpty()) {
155+
JsonNode jNode = null;
156+
LOGGER.debug("Making AggrObjId checks on SubscriptionId document: " + objArray.get(0));
157+
try {
158+
jNode = mapper.readTree(objArray.get(0));
159+
} catch (Exception e) {
160+
LOGGER.error(e.getMessage());
161+
e.printStackTrace();
162+
}
163+
if (jNode.get("subscriptionId").asText().trim().equals(subscriptionId)) {
164+
LOGGER.debug("SubscriptionId \"" , subscriptionId , "\" , exist in document. Checking if AggrObjId has matched earlier.");
165+
List<String> listAggrObjIds = null;
166+
LOGGER.debug("Subscription requirementId: " + requirementId + " and Requirements content:\n" + jNode.get("requirements").get(new Integer(requirementId).toString()));
167+
try {
168+
ObjectReader reader = mapper.readerFor(new TypeReference<List<String>>() {
169+
});
170+
JsonNode arrayNode = mapper.createArrayNode().add(jNode.get("requirements").get(new Integer(requirementId)));
171+
listAggrObjIds = reader.readValue(arrayNode);
172+
173+
if (requirementId > (listAggrObjIds.size() - 1)) {
174+
LOGGER.debug("RequirementId: " + requirementId + " and SubscriptionId: " + subscriptionId +
175+
"\nhas not matched any AggregatedObject yet. No need to do anymore check.");
176+
return false;
177+
}
178+
179+
if (listAggrObjIds.get(requirementId) == aggrObjId) {
180+
LOGGER.info("Subscription has matched aggrObjId already: " + aggrObjId);
181+
return true;
182+
}
183+
} catch (Exception e) {
184+
LOGGER.error(e.getMessage());
185+
e.printStackTrace();
186+
}
187+
}
188+
}
189+
LOGGER.info("AggrObjId not found for SubscriptionId in SubscriptionRepeatFlagHandlerDb -> Returning FALSE.");
190+
return false;
191+
}
192+
193+
}

src/main/resources/application.properties

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -39,6 +39,7 @@ waitlist.initialDelayResend: 2000
3939
waitlist.fixedRateResend: 15000
4040
database.name: eiffel_intelligence
4141
subscription.collection.name: subscription
42+
subscription.collection.repeatFlagHandlerName: subscription_repeat_handler
4243

4344
threads.corePoolSize: 100
4445
threads.queueCapacity: 5000

0 commit comments

Comments
 (0)