Skip to content

Commit c0c6b9f

Browse files
merge from master repo (#7)
1 parent 5e1d1f2 commit c0c6b9f

File tree

8 files changed

+46
-133
lines changed

8 files changed

+46
-133
lines changed

.travis.yml

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -13,7 +13,8 @@ before_install:
1313
- chmod +x pom.xml
1414

1515
script:
16-
- mvn -DsomeModule.test.excludes="**/FlowTest.java, **/FlowTest2.java, **/TrafficGeneratedTest.java" test
16+
- mvn -DsomeModule.test.excludes="**/FlowTest.java, **/FlowTest2.java, **/TrafficGeneratedTest.java, **/FlowTestExternalComposition.java" test
1717
- mvn -DsomeModule.test.includes="**/FlowTest.java" test
1818
- mvn -DsomeModule.test.includes="**/FlowTest2.java" test
19-
- mvn -DsomeModule.test.includes="**/TrafficGeneratedTest.java" test
19+
- mvn -DsomeModule.test.includes="**/TrafficGeneratedTest.java" test
20+
- mvn -DsomeModule.test.includes="**/FlowTestExternalComposition.java" test

src/main/java/com/ericsson/ei/handlers/EventHandler.java

Lines changed: 0 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -36,17 +36,6 @@ public void eventReceived(String event) {
3636
idRulesHandler.runIdRules(eventRules, event);
3737
}
3838

39-
@Bean
40-
public Executor asyncExecutor() {
41-
ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
42-
executor.setCorePoolSize(corePoolSize);
43-
executor.setQueueCapacity(queueCapacity);
44-
executor.setMaxPoolSize(maxPoolSize);
45-
executor.setThreadNamePrefix("EventHandler-");
46-
executor.initialize();
47-
return executor;
48-
}
49-
5039
public void eventReceived(byte[] message) {
5140
log.info("Thread id " + Thread.currentThread().getId() + " spawned");
5241
String actualMessage = new String(message);
@@ -69,14 +58,4 @@ public void onMessage(Message message, Channel channel) throws Exception {
6958
int breakHere = 1;
7059
}
7160

72-
@Async
73-
public void onMessage(Message message, Channel channel) throws Exception {
74-
byte[] messageBody = message.getBody();
75-
// String messageStr = new String(messageBody);
76-
eventReceived(messageBody);
77-
long deliveryTag = message.getMessageProperties().getDeliveryTag();
78-
// String queue = message.getMessageProperties().getConsumerQueue();
79-
channel.basicAck(deliveryTag, false);
80-
int breakHere = 1;
81-
}
8261
}

src/main/java/com/ericsson/ei/rmqhandler/RmqHandler.java

Lines changed: 10 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -171,28 +171,16 @@ Binding binding(Queue queue, TopicExchange exchange) {
171171
return BindingBuilder.bind(queue).to(exchange).with(routingKey);
172172
}
173173

174-
// @Bean
175-
// SimpleMessageListenerContainer bindToQueueForRecentEvents(ConnectionFactory factory, EventHandler eventHandler) {
176-
// String queueName = getQueueName();
177-
//// MessageListenerAdapter listenerAdapter = new MessageListenerAdapter(eventHandler, "eventReceived");
178-
// MessageListenerAdapter listenerAdapter = new EIMessageListenerAdapter(eventHandler);
179-
// container = new SimpleMessageListenerContainer();
180-
// container.setConnectionFactory(factory);
181-
// container.setQueueNames(queueName);
182-
// container.setMessageListener(listenerAdapter);
183-
//// container.setAcknowledgeMode(AcknowledgeMode.MANUAL);
184-
// return container;
185-
// }
186-
187174
@Bean
188-
SimpleMessageListenerContainer bindToWaitlistQueueForRecentEvents(ConnectionFactory factory, EventHandler eventHandler) {
175+
SimpleMessageListenerContainer bindToQueueForRecentEvents(ConnectionFactory factory, EventHandler eventHandler) {
176+
String queueName = getQueueName();
189177
MessageListenerAdapter listenerAdapter = new EIMessageListenerAdapter(eventHandler);
190-
waitlistContainer = new SimpleMessageListenerContainer();
191-
waitlistContainer.setConnectionFactory(factory);
192-
waitlistContainer.setQueueNames(getWaitlistQueueName());
193-
waitlistContainer.setMessageListener(listenerAdapter);
194-
waitlistContainer.setAcknowledgeMode(AcknowledgeMode.MANUAL);
195-
return waitlistContainer;
178+
container = new SimpleMessageListenerContainer();
179+
container.setConnectionFactory(factory);
180+
container.setQueueNames(queueName);
181+
container.setMessageListener(listenerAdapter);
182+
container.setAcknowledgeMode(AcknowledgeMode.MANUAL);
183+
return container;
196184
}
197185

198186
public String getQueueName() {
@@ -206,7 +194,7 @@ public String getWaitlistQueueName() {
206194
}
207195

208196
@Bean
209-
public RabbitTemplate waitListRabbitMqTemplate() {
197+
public RabbitTemplate rabbitMqTemplate() {
210198
if (rabbitTemplate == null) {
211199
if (factory != null) {
212200
rabbitTemplate = new RabbitTemplate(factory);
@@ -229,8 +217,7 @@ public void confirm(CorrelationData correlationData, boolean ack, String cause)
229217

230218
public void publishObjectToWaitlistQueue(String message) {
231219
log.info("publishing message to message bus...");
232-
//rabbitMqTemplate().convertAndSend(message);
233-
waitListRabbitMqTemplate().convertAndSend(message);
220+
rabbitMqTemplate().convertAndSend(message);
234221
}
235222

236223
public void close() {

src/main/java/com/ericsson/ei/waitlist/WaitListWorker.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -39,7 +39,7 @@ public class WaitListWorker {
3939

4040
static Logger log = (Logger) LoggerFactory.getLogger(WaitListWorker.class);
4141

42-
@Scheduled(initialDelay = 10, fixedRate = 10)
42+
@Scheduled(initialDelayString = "${waitlist.initialDelayResend}", fixedRateString = "${waitlist.fixedRateResend}")
4343
public void run() {
4444
RulesObject rulesObject = null;
4545
ArrayList<String> documents = waitListStorageHandler.getWaitList();

src/main/resources/application.properties

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -17,8 +17,6 @@ rabbitmq.queue.durable:true
1717
rabbitmq.routing.key:#
1818
rabbitmq.waitlist.queue.suffix:waitlist
1919

20-
rabbitmq.waitlist.queue.suffix:waitlist
21-
2220
mergeidmarker: %IdentifyRules%
2321

2422
mongodb.host: localhost
@@ -28,6 +26,8 @@ aggregated.collection.name: aggregated_objects
2826
event_object_map.collection.name: event_object_map
2927
waitlist.collection.name: wait_list
3028
waitlist.collection.ttlValue: 600
29+
waitlist.initialDelayResend: 2000
30+
waitlist.fixedRateResend: 15000
3131
database.name: eiffel_intelligence_eiffelxxx
3232
subscription.collection.name: subscription
3333

@@ -48,4 +48,4 @@ email.subject: Email Subscription Notification
4848
notification.failAttempt: 3
4949
notification.ttl.value: 600
5050

51-
er.url: http://localhost:8080/search/
51+
er.url: http://localhost:8080/search/

src/test/java/com/ericsson/ei/flowtests/FlowTest.java

Lines changed: 1 addition & 31 deletions
Original file line numberDiff line numberDiff line change
@@ -1,47 +1,17 @@
11
package com.ericsson.ei.flowtests;
22

3-
import com.ericsson.ei.handlers.ObjectHandler;
4-
import com.ericsson.ei.mongodbhandler.MongoDBHandler;
5-
import com.ericsson.ei.rmqhandler.RmqHandler;
6-
import com.fasterxml.jackson.databind.JsonNode;
7-
import com.fasterxml.jackson.databind.ObjectMapper;
8-
import com.mongodb.MongoClient;
9-
import com.mongodb.client.MongoCollection;
10-
import com.mongodb.client.MongoDatabase;
11-
import com.rabbitmq.client.Channel;
12-
import com.rabbitmq.client.Connection;
13-
import com.rabbitmq.client.ConnectionFactory;
14-
import de.flapdoodle.embed.mongo.distribution.Version;
15-
import de.flapdoodle.embed.mongo.tests.MongodForTestsFactory;
16-
import org.apache.commons.io.FileUtils;
17-
import org.apache.qpid.server.Broker;
18-
import org.apache.qpid.server.BrokerOptions;
19-
import org.junit.AfterClass;
20-
import org.junit.BeforeClass;
21-
import org.junit.Test;
223
import org.junit.runner.RunWith;
234
import org.slf4j.Logger;
245
import org.slf4j.LoggerFactory;
25-
import org.springframework.amqp.core.BindingBuilder;
26-
import org.springframework.amqp.core.Queue;
27-
import org.springframework.amqp.core.TopicExchange;
28-
import org.springframework.amqp.rabbit.connection.CachingConnectionFactory;
29-
import org.springframework.amqp.rabbit.core.RabbitAdmin;
30-
import org.springframework.beans.factory.annotation.Autowired;
31-
import org.springframework.beans.factory.annotation.Value;
6+
327
import org.springframework.boot.test.context.SpringBootTest;
338
import org.springframework.test.context.TestPropertySource;
349
import org.springframework.test.context.junit4.SpringJUnit4ClassRunner;
3510

36-
import javax.annotation.PostConstruct;
37-
import java.io.File;
3811
import java.util.ArrayList;
39-
import java.util.concurrent.TimeUnit;
4012

41-
import static org.junit.Assert.assertEquals;
4213

4314
@RunWith(SpringJUnit4ClassRunner.class)
44-
@TestPropertySource(properties = "myConf.myProp=valueInTest")
4515
@SpringBootTest
4616
public class FlowTest extends FlowTestBase {
4717

src/test/java/com/ericsson/ei/flowtests/FlowTest2.java

Lines changed: 1 addition & 40 deletions
Original file line numberDiff line numberDiff line change
@@ -1,42 +1,17 @@
11
package com.ericsson.ei.flowtests;
22

3-
import com.ericsson.ei.handlers.ObjectHandler;
4-
import com.ericsson.ei.mongodbhandler.MongoDBHandler;
5-
import com.ericsson.ei.rmqhandler.RmqHandler;
63
import com.fasterxml.jackson.databind.JsonNode;
74
import com.fasterxml.jackson.databind.ObjectMapper;
8-
import com.mongodb.MongoClient;
9-
import com.mongodb.client.MongoCollection;
10-
import com.mongodb.client.MongoDatabase;
11-
import com.rabbitmq.client.Channel;
12-
import com.rabbitmq.client.Connection;
13-
import com.rabbitmq.client.ConnectionFactory;
14-
import de.flapdoodle.embed.mongo.distribution.Version;
15-
import de.flapdoodle.embed.mongo.tests.MongodForTestsFactory;
165
import org.apache.commons.io.FileUtils;
17-
import org.apache.qpid.server.Broker;
18-
import org.apache.qpid.server.BrokerOptions;
19-
import org.junit.AfterClass;
20-
import org.junit.BeforeClass;
21-
import org.junit.Test;
226
import org.junit.runner.RunWith;
237
import org.slf4j.Logger;
248
import org.slf4j.LoggerFactory;
25-
import org.springframework.amqp.core.BindingBuilder;
26-
import org.springframework.amqp.core.Queue;
27-
import org.springframework.amqp.core.TopicExchange;
28-
import org.springframework.amqp.rabbit.connection.CachingConnectionFactory;
29-
import org.springframework.amqp.rabbit.core.RabbitAdmin;
30-
import org.springframework.beans.factory.annotation.Autowired;
31-
import org.springframework.beans.factory.annotation.Value;
9+
3210
import org.springframework.boot.test.context.SpringBootTest;
3311
import org.springframework.test.context.junit4.SpringJUnit4ClassRunner;
3412

35-
import javax.annotation.PostConstruct;
3613
import java.io.File;
3714
import java.util.ArrayList;
38-
import java.util.concurrent.TimeUnit;
39-
4015
import static org.junit.Assert.assertEquals;
4116

4217
@RunWith(SpringJUnit4ClassRunner.class)
@@ -64,20 +39,6 @@ protected ArrayList<String> getEventNamesToSend() {
6439
return eventNames;
6540
}
6641

67-
protected void waitForEventsToBeProcessed(int eventsCount) {
68-
// wait for all events to be processed
69-
int processedEvents = 0;
70-
while (processedEvents < eventsCount) {
71-
String countStr = System.getProperty("eiffel.intelligence.processedEventsCount");
72-
String waitingCountStr = System.getProperty("eiffel.intelligence.waitListEventsCount");
73-
if (waitingCountStr == null)
74-
waitingCountStr = "0";
75-
Properties props = admin.getQueueProperties(queue.getName());
76-
int messageCount = Integer.parseInt(props.get("QUEUE_MESSAGE_COUNT").toString());
77-
processedEvents = Integer.parseInt(countStr) - Integer.parseInt(waitingCountStr) - messageCount;
78-
}
79-
}
80-
8142
protected void checkResult() {
8243
try {
8344
String document = objectHandler.findObjectById("6acc3c87-75e0-4b6d-88f5-b1a5d4e62b43");

src/test/java/com/ericsson/ei/flowtests/FlowTestBase.java

Lines changed: 27 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,7 @@
55
import java.io.File;
66
import java.io.IOException;
77
import java.util.ArrayList;
8-
import java.util.Properties;
8+
import java.util.concurrent.TimeUnit;
99

1010
import javax.annotation.PostConstruct;
1111

@@ -23,13 +23,16 @@
2323
import org.springframework.amqp.rabbit.connection.CachingConnectionFactory;
2424
import org.springframework.amqp.rabbit.core.RabbitAdmin;
2525
import org.springframework.beans.factory.annotation.Autowired;
26+
import org.springframework.beans.factory.annotation.Value;
2627

2728
import com.ericsson.ei.handlers.ObjectHandler;
2829
import com.ericsson.ei.mongodbhandler.MongoDBHandler;
2930
import com.ericsson.ei.rmqhandler.RmqHandler;
3031
import com.fasterxml.jackson.databind.JsonNode;
3132
import com.fasterxml.jackson.databind.ObjectMapper;
3233
import com.mongodb.MongoClient;
34+
import com.mongodb.client.MongoCollection;
35+
import com.mongodb.client.MongoDatabase;
3336
import com.rabbitmq.client.Channel;
3437
import com.rabbitmq.client.Connection;
3538
import com.rabbitmq.client.ConnectionFactory;
@@ -50,6 +53,9 @@ public class FlowTestBase {
5053
@Autowired
5154
private MongoDBHandler mongoDBHandler;
5255

56+
@Value("${database.name}") private String database;
57+
@Value("${event_object_map.collection.name}") private String event_map;
58+
5359
public static File qpidConfig = null;
5460
static AMQPBrokerManager amqpBrocker;
5561
protected static MongodForTestsFactory testsFactory;
@@ -106,7 +112,8 @@ public static void setUpMessageBus() throws Exception {
106112
System.setProperty("rabbitmq.port", "8672");
107113
System.setProperty("rabbitmq.user", "guest");
108114
System.setProperty("rabbitmq.password", "guest");
109-
115+
System.setProperty("waitlist.initialDelayResend", "10");
116+
System.setProperty("waitlist.fixedRateResend", "10");
110117

111118
String config = "src/test/resources/configs/qpidConfig.json";
112119
jsonFileContent = FileUtils.readFileToString(new File(jsonFilePath));
@@ -182,17 +189,25 @@ protected ArrayList<String> getEventNamesToSend() {
182189
return eventNames;
183190
}
184191

192+
// count documents that were processed
193+
private long countProcessedEvents(String database, String collection){
194+
MongoDatabase db = mongoClient.getDatabase(database);
195+
MongoCollection table = db.getCollection(collection);
196+
long countedDocuments = table.count();
197+
return countedDocuments;
198+
}
199+
185200
protected void waitForEventsToBeProcessed(int eventsCount) {
186-
int processedEvents = 0;
187-
while (processedEvents < eventsCount) {
188-
String countStr = System.getProperty("eiffel.intelligence.processedEventsCount");
189-
String waitingCountStr = System.getProperty("eiffel.intelligence.waitListEventsCount");
190-
if (waitingCountStr == null)
191-
waitingCountStr = "0";
192-
Properties props = admin.getQueueProperties(queue.getName());
193-
int messageCount = Integer.parseInt(props.get("QUEUE_MESSAGE_COUNT").toString());
194-
processedEvents = Integer.parseInt(countStr) - Integer.parseInt(waitingCountStr) - messageCount;
195-
}
201+
// wait for all events to be processed
202+
long processedEvents = 0;
203+
while (processedEvents < eventsCount) {
204+
processedEvents = countProcessedEvents(database, event_map);
205+
}
206+
try {
207+
TimeUnit.MILLISECONDS.sleep(100);
208+
} catch (Exception e) {
209+
log.info(e.getMessage(),e);
210+
}
196211
}
197212

198213
protected void checkResult() {

0 commit comments

Comments
 (0)