Skip to content

Commit 21097ce

Browse files
Merge branch 'message_bus_acknowledgement' into master
2 parents 2473a17 + 2c081d0 commit 21097ce

File tree

12 files changed

+392
-344
lines changed

12 files changed

+392
-344
lines changed
Lines changed: 38 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,38 @@
1+
package com.ericsson.ei.config;
2+
3+
import java.util.concurrent.Executor;
4+
5+
import org.springframework.aop.interceptor.AsyncUncaughtExceptionHandler;
6+
import org.springframework.beans.factory.annotation.Value;
7+
import org.springframework.context.annotation.Configuration;
8+
import org.springframework.scheduling.annotation.AsyncConfigurer;
9+
import org.springframework.scheduling.annotation.EnableAsync;
10+
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
11+
12+
@Configuration
13+
@EnableAsync
14+
public class SpringAsyncConfig implements AsyncConfigurer{
15+
16+
@Value("${threads.corePoolSize}") private int corePoolSize;
17+
@Value("${threads.queueCapacity}") private int queueCapacity;
18+
@Value("${threads.maxPoolSize}") private int maxPoolSize;
19+
20+
21+
@Override
22+
public Executor getAsyncExecutor() {
23+
ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
24+
executor.setCorePoolSize(corePoolSize);
25+
executor.setQueueCapacity(queueCapacity);
26+
executor.setMaxPoolSize(maxPoolSize);
27+
executor.setThreadNamePrefix("EventHandler-");
28+
executor.initialize();
29+
return executor;
30+
}
31+
32+
33+
@Override
34+
public AsyncUncaughtExceptionHandler getAsyncUncaughtExceptionHandler() {
35+
// TODO Auto-generated method stub
36+
return null;
37+
}
38+
}

src/main/java/com/ericsson/ei/handlers/EventHandler.java

Lines changed: 13 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -18,11 +18,10 @@
1818

1919
import java.util.concurrent.Executor;
2020

21+
import javax.annotation.PostConstruct;
22+
2123
@Component
2224
public class EventHandler {
23-
@Value("${threads.corePoolSize}") private int corePoolSize;
24-
@Value("${threads.queueCapacity}") private int queueCapacity;
25-
@Value("${threads.maxPoolSize}") private int maxPoolSize;
2625

2726
private static Logger log = LoggerFactory.getLogger(EventHandler.class);
2827

@@ -69,4 +68,15 @@ public void onMessage(Message message, Channel channel) throws Exception {
6968
channel.basicAck(deliveryTag, false);
7069
int breakHere = 1;
7170
}
71+
72+
@Async
73+
public void onMessage(Message message, Channel channel) throws Exception {
74+
byte[] messageBody = message.getBody();
75+
// String messageStr = new String(messageBody);
76+
eventReceived(messageBody);
77+
long deliveryTag = message.getMessageProperties().getDeliveryTag();
78+
// String queue = message.getMessageProperties().getConsumerQueue();
79+
channel.basicAck(deliveryTag, false);
80+
int breakHere = 1;
81+
}
7282
}

src/main/java/com/ericsson/ei/rmqhandler/RmqHandler.java

Lines changed: 26 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -55,6 +55,7 @@ public class RmqHandler {
5555
private RabbitTemplate rabbitTemplate;
5656
private CachingConnectionFactory factory;
5757
private SimpleMessageListenerContainer container;
58+
private SimpleMessageListenerContainer waitlistContainer;
5859
static Logger log = (Logger) LoggerFactory.getLogger(RmqHandler.class);
5960

6061
public Boolean getQueueDurable() {
@@ -170,16 +171,28 @@ Binding binding(Queue queue, TopicExchange exchange) {
170171
return BindingBuilder.bind(queue).to(exchange).with(routingKey);
171172
}
172173

174+
// @Bean
175+
// SimpleMessageListenerContainer bindToQueueForRecentEvents(ConnectionFactory factory, EventHandler eventHandler) {
176+
// String queueName = getQueueName();
177+
//// MessageListenerAdapter listenerAdapter = new MessageListenerAdapter(eventHandler, "eventReceived");
178+
// MessageListenerAdapter listenerAdapter = new EIMessageListenerAdapter(eventHandler);
179+
// container = new SimpleMessageListenerContainer();
180+
// container.setConnectionFactory(factory);
181+
// container.setQueueNames(queueName);
182+
// container.setMessageListener(listenerAdapter);
183+
//// container.setAcknowledgeMode(AcknowledgeMode.MANUAL);
184+
// return container;
185+
// }
186+
173187
@Bean
174-
SimpleMessageListenerContainer bindToQueueForRecentEvents(ConnectionFactory factory, EventHandler eventHandler) {
175-
String queueName = getQueueName();
188+
SimpleMessageListenerContainer bindToWaitlistQueueForRecentEvents(ConnectionFactory factory, EventHandler eventHandler) {
176189
MessageListenerAdapter listenerAdapter = new EIMessageListenerAdapter(eventHandler);
177-
container = new SimpleMessageListenerContainer();
178-
container.setConnectionFactory(factory);
179-
container.setQueueNames(queueName);
180-
container.setMessageListener(listenerAdapter);
181-
container.setAcknowledgeMode(AcknowledgeMode.MANUAL);
182-
return container;
190+
waitlistContainer = new SimpleMessageListenerContainer();
191+
waitlistContainer.setConnectionFactory(factory);
192+
waitlistContainer.setQueueNames(getWaitlistQueueName());
193+
waitlistContainer.setMessageListener(listenerAdapter);
194+
waitlistContainer.setAcknowledgeMode(AcknowledgeMode.MANUAL);
195+
return waitlistContainer;
183196
}
184197

185198
public String getQueueName() {
@@ -193,7 +206,7 @@ public String getWaitlistQueueName() {
193206
}
194207

195208
@Bean
196-
public RabbitTemplate rabbitMqTemplate() {
209+
public RabbitTemplate waitListRabbitMqTemplate() {
197210
if (rabbitTemplate == null) {
198211
if (factory != null) {
199212
rabbitTemplate = new RabbitTemplate(factory);
@@ -214,13 +227,15 @@ public void confirm(CorrelationData correlationData, boolean ack, String cause)
214227
return rabbitTemplate;
215228
}
216229

217-
public void publishObjectToMessageBus(String message) {
230+
public void publishObjectToWaitlistQueue(String message) {
218231
log.info("publishing message to message bus...");
219-
rabbitMqTemplate().convertAndSend(message);
232+
//rabbitMqTemplate().convertAndSend(message);
233+
waitListRabbitMqTemplate().convertAndSend(message);
220234
}
221235

222236
public void close() {
223237
try {
238+
waitlistContainer.destroy();
224239
container.destroy();
225240
factory.destroy();
226241
} catch (Exception e) {

src/main/java/com/ericsson/ei/rules/RulesObject.java

Lines changed: 45 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -15,27 +15,66 @@ public JsonNode getJsonRulesObject() {
1515
}
1616

1717
public String getMatchIdRules() {
18-
return rulesObject.get("MatchIdRules").toString();
18+
JsonNode jsonNode = rulesObject.get("MatchIdRules");
19+
if (jsonNode != null)
20+
return jsonNode.toString();
21+
return "";
1922
}
2023

2124
public String getIdRule() {
22-
return rulesObject.get("IdRule").textValue();
25+
JsonNode jsonNode = rulesObject.get("IdRule");
26+
if (jsonNode != null)
27+
return jsonNode.textValue();
28+
return "";
2329
}
2430

2531
public String getIdentifyRules() {
26-
return rulesObject.get("IdentifyRules").textValue();
32+
JsonNode jsonNode = rulesObject.get("IdentifyRules");
33+
if (jsonNode != null)
34+
return jsonNode.textValue();
35+
return "";
2736
}
2837

2938
public String getExtractionRules() {
30-
return rulesObject.get("ExtractionRules").textValue();
39+
JsonNode jsonNode = rulesObject.get("ExtractionRules");
40+
if (jsonNode != null)
41+
return jsonNode.textValue();
42+
return "";
3143
}
3244

3345
public String getMergeRules() {
34-
return rulesObject.get("MergeResolverRules").textValue();
46+
JsonNode jsonNode = rulesObject.get("MergeResolverRules");
47+
if (jsonNode != null)
48+
return jsonNode.textValue();
49+
return "";
50+
}
51+
52+
public String getDownstreamIdentifyRules() {
53+
JsonNode jsonNode = rulesObject.get("DownstreamIdentifyRules");
54+
if (jsonNode != null)
55+
return jsonNode.textValue();
56+
return "";
57+
}
58+
59+
public String getDownstreamExtractionRules() {
60+
JsonNode jsonNode = rulesObject.get("DownstreamExtractionRules");
61+
if (jsonNode != null)
62+
return jsonNode.textValue();
63+
return "";
64+
}
65+
66+
public String getDownstreamMergeRules() {
67+
JsonNode jsonNode = rulesObject.get("DownstreamMergeRules");
68+
if (jsonNode != null)
69+
return jsonNode.textValue();
70+
return "";
3571
}
3672

3773
public String fetchProcessRules() {
38-
return rulesObject.get("ProcessRules").textValue();
74+
JsonNode jsonNode = rulesObject.get("ProcessRules");
75+
if (jsonNode != null)
76+
return jsonNode.textValue();
77+
return "";
3978
}
4079

4180
public boolean equals(Object other) {

src/main/java/com/ericsson/ei/waitlist/WaitListWorker.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -53,7 +53,7 @@ public void run() {
5353
for (final JsonNode idJsonObj : ids) {
5454
ArrayList<String> objects = matchIdRulesHandler.fetchObjectsById(rulesObject, idJsonObj.textValue());
5555
if (objects.size() > 0) {
56-
rmqHandler.publishObjectToMessageBus(event);
56+
rmqHandler.publishObjectToWaitlistQueue(event);
5757
waitListStorageHandler.dropDocumentFromWaitList(document);
5858
}
5959
}

src/main/resources/application.properties

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,8 @@ rabbitmq.queue.durable:true
1717
rabbitmq.routing.key:#
1818
rabbitmq.waitlist.queue.suffix:waitlist
1919

20+
rabbitmq.waitlist.queue.suffix:waitlist
21+
2022
mergeidmarker: %IdentifyRules%
2123

2224
mongodb.host: localhost

0 commit comments

Comments
 (0)