Skip to content

Commit 3ecb03f

Browse files
Surya-Kollixdurvak
authored andcommitted
DURACI 6689 : Implemented WaitList Storage Handler for Eiffel Intelligence
DURACI 6700 : Implemented Waitlist worker for eiffel intelligence Change-Id: Ib78c63a7988b45247846cd76392f6a1d6dff7070
1 parent af47957 commit 3ecb03f

File tree

14 files changed

+641
-66
lines changed

14 files changed

+641
-66
lines changed

src/main/java/com/ericsson/ei/App.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2,8 +2,10 @@
22

33
import org.springframework.boot.SpringApplication;
44
import org.springframework.boot.autoconfigure.SpringBootApplication;
5+
import org.springframework.scheduling.annotation.EnableScheduling;
56

67
@SpringBootApplication
8+
@EnableScheduling
79
public class App {
810

911
public static void main(String[] args) {

src/main/java/com/ericsson/ei/handlers/IdRulesHandler.java

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -52,7 +52,11 @@ public void runIdRules(RulesObject rulesObject, String event) {
5252
if (rulesObject.isStartEventRules()) {
5353
extractionHandler.runExtraction(rulesObject, id, event, (JsonNode)null);
5454
} else {
55-
waitListStorageHandler.addEventToWaitList(event, rulesObject);
55+
try {
56+
waitListStorageHandler.addEventToWaitList(event, rulesObject);
57+
} catch (Exception e) {
58+
log.info(e.getMessage(),e);
59+
}
5660
}
5761
}
5862
}

src/main/java/com/ericsson/ei/mongodbhandler/MongoDBHandler.java

Lines changed: 9 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@
99
import org.springframework.beans.factory.annotation.Value;
1010
import org.springframework.stereotype.Component;
1111

12+
import com.mongodb.BasicDBObject;
1213
import com.mongodb.DB;
1314
import com.mongodb.DBCollection;
1415
import com.mongodb.DBCursor;
@@ -53,7 +54,7 @@ public boolean insertDocument(String dataBaseName, String collectionName, Strin
5354
return result.wasAcknowledged();
5455
}
5556
} catch (Exception e) {
56-
System.out.println(e.getMessage());
57+
log.info(e.getMessage(),e);
5758
}
5859
return false;
5960
}
@@ -145,4 +146,11 @@ public boolean dropDocument(String dataBaseName, String collectionName,String c
145146
return false;
146147
}
147148

149+
public void createTTLIndex(String dataBaseName, String collectionName,String fieldName,int ttlValue){
150+
DB db = mongoClient.getDB(dataBaseName);
151+
BasicDBObject ttlField=new BasicDBObject(fieldName,1);
152+
BasicDBObject ttlTime=new BasicDBObject("expireAfterSeconds",ttlValue);
153+
db.getCollection(collectionName).createIndex(ttlField,ttlTime);
154+
}
155+
148156
}

src/main/java/com/ericsson/ei/rmq/consumer/RmqConsumer.java renamed to src/main/java/com/ericsson/ei/rmqhandler/RmqHandler.java

Lines changed: 91 additions & 29 deletions
Original file line numberDiff line numberDiff line change
@@ -1,36 +1,54 @@
1-
package com.ericsson.ei.rmq.consumer;
1+
package com.ericsson.ei.rmqhandler;
22

3+
import org.slf4j.Logger;
4+
import org.slf4j.LoggerFactory;
5+
import org.springframework.amqp.core.Binding;
36
import org.springframework.amqp.core.BindingBuilder;
47
import org.springframework.amqp.core.Queue;
58
import org.springframework.amqp.core.TopicExchange;
69
import org.springframework.amqp.rabbit.connection.CachingConnectionFactory;
710
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
8-
import org.springframework.amqp.rabbit.core.RabbitAdmin;
11+
import org.springframework.amqp.rabbit.core.RabbitTemplate;
12+
import org.springframework.amqp.rabbit.core.RabbitTemplate.ConfirmCallback;
913
import org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer;
1014
import org.springframework.amqp.rabbit.listener.adapter.MessageListenerAdapter;
15+
import org.springframework.amqp.rabbit.support.CorrelationData;
1116
import org.springframework.beans.factory.annotation.Value;
1217
import org.springframework.context.annotation.Bean;
1318
import org.springframework.stereotype.Component;
1419

1520
import com.ericsson.ei.handlers.EventHandler;
1621

17-
1822
@Component
19-
public class RmqConsumer {
20-
21-
@Value("${rabbitmq.queue.durable}") private Boolean queueDurable;
22-
@Value("${rabbitmq.host}") private String host;
23-
@Value("${rabbitmq.exchange.name}") private String exchangeName;
24-
@Value("${rabbitmq.port}") private Integer port;
25-
@Value("${rabbitmq.tls}") private String tlsVer;
26-
@Value("${rabbitmq.user}") private String user;
27-
@Value("${rabbitmq.password}") private String password;
28-
@Value("${rabbitmq.domainId}") private String domainId;
29-
@Value("${rabbitmq.componentName}") private String componentName;
30-
@Value("${rabbitmq.routing.key}") private String routingKey;
31-
@Value("${rabbitmq.consumerName}") private String consumerName;
32-
33-
// SimpleMessageListenerContainer container;
23+
public class RmqHandler {
24+
25+
@Value("${rabbitmq.queue.durable}")
26+
private Boolean queueDurable;
27+
@Value("${rabbitmq.host}")
28+
private String host;
29+
@Value("${rabbitmq.exchange.name}")
30+
private String exchangeName;
31+
@Value("${rabbitmq.port}")
32+
private Integer port;
33+
@Value("${rabbitmq.tls}")
34+
private String tlsVer;
35+
@Value("${rabbitmq.user}")
36+
private String user;
37+
@Value("${rabbitmq.password}")
38+
private String password;
39+
@Value("${rabbitmq.domainId}")
40+
private String domainId;
41+
@Value("${rabbitmq.componentName}")
42+
private String componentName;
43+
@Value("${rabbitmq.routing.key}")
44+
private String routingKey;
45+
@Value("${rabbitmq.consumerName}")
46+
private String consumerName;
47+
// SimpleMessageListenerContainer container;
48+
private RabbitTemplate rabbitTemplate;
49+
private CachingConnectionFactory factory;
50+
private SimpleMessageListenerContainer container;
51+
static Logger log = (Logger) LoggerFactory.getLogger(RmqHandler.class);
3452

3553
public Boolean getQueueDurable() {
3654
return queueDurable;
@@ -122,32 +140,76 @@ public void setConsumerName(String consumerName) {
122140

123141
@Bean
124142
ConnectionFactory connectionFactory() {
125-
CachingConnectionFactory factory = new CachingConnectionFactory(host,port);
126-
// factory.setUsername("guest");
127-
// factory.setPassword("guest");
143+
factory = new CachingConnectionFactory(host, port);
144+
factory.setPublisherConfirms(true);
145+
factory.setPublisherReturns(true);
146+
// factory.setUsername("guest");
147+
// factory.setPassword("guest");
128148
return factory;
129149
}
130150

151+
@Bean
152+
Queue queue() {
153+
return new Queue(getQueueName(), true);
154+
}
155+
156+
@Bean
157+
TopicExchange exchange() {
158+
return new TopicExchange(exchangeName);
159+
}
160+
161+
@Bean
162+
Binding binding(Queue queue, TopicExchange exchange) {
163+
return BindingBuilder.bind(queue).to(exchange).with(routingKey);
164+
}
165+
131166
@Bean
132167
SimpleMessageListenerContainer bindToQueueForRecentEvents(ConnectionFactory factory, EventHandler eventHandler) {
133168
String queueName = getQueueName();
134-
Queue queue = new Queue(queueName, queueDurable);
135-
TopicExchange topicExchange = new TopicExchange(exchangeName);
136-
RabbitAdmin rabbitAdmin = new RabbitAdmin(factory);
137-
rabbitAdmin.declareExchange(topicExchange);
138-
rabbitAdmin.declareQueue(queue);
139-
BindingBuilder.bind(queue).to(topicExchange).with(routingKey);
140169
MessageListenerAdapter listenerAdapter = new MessageListenerAdapter(eventHandler, "eventReceived");
141-
SimpleMessageListenerContainer container = new SimpleMessageListenerContainer();
170+
container = new SimpleMessageListenerContainer();
142171
container.setConnectionFactory(factory);
143172
container.setQueueNames(queueName);
144173
container.setMessageListener(listenerAdapter);
145174
return container;
146175
}
147176

148-
149177
public String getQueueName() {
150178
String durableName = queueDurable ? "durable" : "transient";
151179
return domainId + "." + componentName + "." + consumerName + "." + durableName;
152180
}
181+
182+
@Bean
183+
public RabbitTemplate rabbitMqTemplate() {
184+
if (factory != null) {
185+
rabbitTemplate = new RabbitTemplate(factory);
186+
} else {
187+
rabbitTemplate = new RabbitTemplate(connectionFactory());
188+
}
189+
rabbitTemplate.setExchange(exchangeName);
190+
rabbitTemplate.setRoutingKey(routingKey);
191+
rabbitTemplate.setQueue(getQueueName());
192+
rabbitTemplate.setConfirmCallback(new ConfirmCallback() {
193+
@Override
194+
public void confirm(CorrelationData correlationData, boolean ack, String cause) {
195+
log.info("Received confirm with result : {}", ack);
196+
}
197+
});
198+
return rabbitTemplate;
199+
}
200+
201+
public void publishObjectToMessageBus(String message) {
202+
log.info("publishing message to message bus...");
203+
rabbitTemplate.convertAndSend(message);
204+
}
205+
206+
public void close() {
207+
try {
208+
container.destroy();
209+
factory.destroy();
210+
} catch (Exception e) {
211+
log.info("exception occured while closing connections");
212+
log.info(e.getMessage(),e);
213+
}
214+
}
153215
}
Lines changed: 68 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,19 +1,84 @@
11
package com.ericsson.ei.waitlist;
22

3+
import java.text.DateFormat;
4+
import java.text.ParseException;
5+
import java.text.SimpleDateFormat;
36
import java.util.ArrayList;
7+
import java.util.Date;
48

9+
import org.slf4j.Logger;
10+
import org.slf4j.LoggerFactory;
11+
import org.springframework.beans.factory.annotation.Autowired;
12+
import org.springframework.beans.factory.annotation.Value;
513
import org.springframework.stereotype.Component;
614

15+
import com.ericsson.ei.jmespath.JmesPathInterface;
16+
import com.ericsson.ei.mongodbhandler.MongoDBHandler;
717
import com.ericsson.ei.rules.RulesObject;
18+
import com.fasterxml.jackson.databind.JsonNode;
19+
import com.mongodb.BasicDBObject;
20+
import com.mongodb.util.JSON;
821

922
@Component
1023
public class WaitListStorageHandler {
1124

12-
public void addEventToWaitList(String event, RulesObject rulesObject) {
25+
@Value("${waitlist.collection.name}") private String collectionName;
26+
@Value("${database.name}") private String databaseName;
27+
@Value("${waitlist.collection.ttlValue}") private int ttlValue;
28+
static Logger log = (Logger) LoggerFactory.getLogger(WaitListStorageHandler.class);
29+
30+
@Autowired
31+
private MongoDBHandler mongoDbHandler;
1332

33+
public void setMongoDbHandler(MongoDBHandler mongoDbHandler) {
34+
this.mongoDbHandler = mongoDbHandler;
35+
}
36+
37+
@Autowired
38+
private JmesPathInterface jmesPathInterface;
39+
40+
public void setJmesPathInterface(JmesPathInterface jmesPathInterface) {
41+
this.jmesPathInterface = jmesPathInterface;
42+
}
43+
44+
public void addEventToWaitList(String event, RulesObject rulesObject) throws Exception {
45+
String input = addProprtiesToEvent(event, rulesObject);
46+
boolean result=mongoDbHandler.insertDocument(databaseName,collectionName, input);
47+
if (result == false) {
48+
throw new Exception("failed to insert the document into database");
49+
}
50+
}
51+
52+
private String addProprtiesToEvent(String event, RulesObject rulesObject) {
53+
String time = null;
54+
Date date=null;
55+
String idRule = rulesObject.getIdRule();
56+
JsonNode id = jmesPathInterface.runRuleOnEvent(idRule, event);
57+
String condition = "{Event:" + JSON.parse(event).toString()+"}";
58+
ArrayList<String> documents = mongoDbHandler.find(databaseName, collectionName, condition);
59+
if (documents.size() == 0){
60+
DateFormat dateFormat = new SimpleDateFormat("yyyy/MM/dd HH:mm:ss");
61+
date = new Date();
62+
time = dateFormat.format(date);
63+
try {
64+
date=dateFormat.parse(time);
65+
} catch (ParseException e) {
66+
log.info(e.getMessage(),e);
67+
}
68+
69+
}
70+
BasicDBObject document = new BasicDBObject();
71+
document.put("_id", id.textValue());
72+
document.put("Time", date);
73+
document.put("Event", JSON.parse(event));
74+
mongoDbHandler.createTTLIndex(databaseName, collectionName, "Time",ttlValue);
75+
String input = document.toString();
76+
return input;
1477
}
1578

1679
public ArrayList<String> getWaitList() {
17-
return null;
80+
ArrayList<String> documents = mongoDbHandler.getAllDocuments(databaseName,collectionName);
81+
return documents;
1882
}
19-
}
83+
84+
}
Lines changed: 71 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,71 @@
1+
package com.ericsson.ei.waitlist;
2+
3+
import java.util.ArrayList;
4+
5+
import org.slf4j.Logger;
6+
import org.slf4j.LoggerFactory;
7+
import org.springframework.beans.factory.annotation.Autowired;
8+
import org.springframework.beans.factory.annotation.Value;
9+
import org.springframework.scheduling.annotation.Scheduled;
10+
import org.springframework.stereotype.Component;
11+
12+
import com.ericsson.ei.handlers.MatchIdRulesHandler;
13+
import com.ericsson.ei.jmespath.JmesPathInterface;
14+
import com.ericsson.ei.mongodbhandler.MongoDBHandler;
15+
import com.ericsson.ei.rmqhandler.RmqHandler;
16+
import com.ericsson.ei.rules.RulesHandler;
17+
import com.ericsson.ei.rules.RulesObject;
18+
import com.fasterxml.jackson.databind.JsonNode;
19+
import com.mongodb.DBObject;
20+
import com.mongodb.util.JSON;
21+
22+
@Component
23+
public class WaitListWorker {
24+
25+
@Value("${waitlist.collection.name}")
26+
private String collectionName;
27+
@Value("${database.name}")
28+
private String databaseName;
29+
30+
@Autowired
31+
private WaitListStorageHandler waitListStorageHandler;
32+
33+
@Autowired
34+
private MongoDBHandler mongoDbHandler;
35+
36+
@Autowired
37+
private RmqHandler rmqHandler;
38+
39+
@Autowired
40+
private RulesHandler rulesHandler;
41+
42+
@Autowired
43+
private JmesPathInterface jmesPathInterface;
44+
45+
@Autowired
46+
private MatchIdRulesHandler matchIdRulesHandler;
47+
48+
static Logger log = (Logger) LoggerFactory.getLogger(WaitListWorker.class);
49+
50+
@Scheduled(initialDelay = 1000, fixedRate = 10000)
51+
public void run() {
52+
RulesObject rulesObject = null;
53+
ArrayList<String> documents = waitListStorageHandler.getWaitList();
54+
for (String document : documents) {
55+
DBObject dbObject = (DBObject) JSON.parse(document);
56+
String event = dbObject.get("Event").toString();
57+
rulesObject = rulesHandler.getRulesForEvent(event);
58+
String idRule = rulesObject.getIdRule();
59+
JsonNode id = jmesPathInterface.runRuleOnEvent(idRule, event);
60+
ArrayList<String> objects = matchIdRulesHandler.fetchObjectsById(rulesObject, id.textValue());
61+
if (objects.size() > 0) {
62+
rmqHandler.publishObjectToMessageBus(event);
63+
dropDocumentFromWaitList(document);
64+
}
65+
}
66+
}
67+
68+
public boolean dropDocumentFromWaitList(String document) {
69+
return mongoDbHandler.dropDocument(databaseName, collectionName, document);
70+
}
71+
}

src/main/resources/application.properties

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,8 @@ mongodb.port: 27017
2323

2424
aggregated.collection.name: aggregated_objects
2525
event_object_map.collection.name: event_object_map
26+
waitlist.collection.name: wait_list
27+
waitlist.collection.ttlValue: 600
2628
database.name: eiffel_intelligence_eiffelxxx
2729
subscription.collection.name: subscription
2830

0 commit comments

Comments
 (0)