Skip to content

Commit 2360715

Browse files
author
Vali (Vasile Baluta)
committed
message bus acknowledgement
1 parent 147e479 commit 2360715

File tree

7 files changed

+117
-49
lines changed

7 files changed

+117
-49
lines changed
Lines changed: 38 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,38 @@
1+
package com.ericsson.ei.config;
2+
3+
import java.util.concurrent.Executor;
4+
5+
import org.springframework.aop.interceptor.AsyncUncaughtExceptionHandler;
6+
import org.springframework.beans.factory.annotation.Value;
7+
import org.springframework.context.annotation.Configuration;
8+
import org.springframework.scheduling.annotation.AsyncConfigurer;
9+
import org.springframework.scheduling.annotation.EnableAsync;
10+
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
11+
12+
@Configuration
13+
@EnableAsync
14+
public class SpringAsyncConfig implements AsyncConfigurer{
15+
16+
@Value("${threads.corePoolSize}") private int corePoolSize;
17+
@Value("${threads.queueCapacity}") private int queueCapacity;
18+
@Value("${threads.maxPoolSize}") private int maxPoolSize;
19+
20+
21+
@Override
22+
public Executor getAsyncExecutor() {
23+
ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
24+
executor.setCorePoolSize(corePoolSize);
25+
executor.setQueueCapacity(queueCapacity);
26+
executor.setMaxPoolSize(maxPoolSize);
27+
executor.setThreadNamePrefix("EventHandler-");
28+
executor.initialize();
29+
return executor;
30+
}
31+
32+
33+
@Override
34+
public AsyncUncaughtExceptionHandler getAsyncUncaughtExceptionHandler() {
35+
// TODO Auto-generated method stub
36+
return null;
37+
}
38+
}

src/main/java/com/ericsson/ei/handlers/EventHandler.java

Lines changed: 6 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -18,13 +18,10 @@
1818

1919
import java.util.concurrent.Executor;
2020

21-
@Component
22-
public class EventHandler implements ChannelAwareMessageListener {
23-
24-
@Value("${threads.corePoolSize}") private int corePoolSize;
25-
@Value("${threads.queueCapacity}") private int queueCapacity;
26-
@Value("${threads.maxPoolSize}") private int maxPoolSize;
21+
import javax.annotation.PostConstruct;
2722

23+
@Component
24+
public class EventHandler {
2825

2926
private static Logger log = LoggerFactory.getLogger(EventHandler.class);
3027

@@ -39,18 +36,6 @@ public void eventReceived(String event) {
3936
idRulesHandler.runIdRules(eventRules, event);
4037
}
4138

42-
@Bean
43-
public Executor asyncExecutor() {
44-
ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
45-
executor.setCorePoolSize(corePoolSize);
46-
executor.setQueueCapacity(queueCapacity);
47-
executor.setMaxPoolSize(maxPoolSize);
48-
executor.setThreadNamePrefix("EventHandler-");
49-
executor.initialize();
50-
return executor;
51-
}
52-
53-
@Async
5439
public void eventReceived(byte[] message) {
5540
log.info("Thread id " + Thread.currentThread().getId() + " spawned");
5641
String actualMessage = new String(message);
@@ -64,13 +49,14 @@ public void eventReceived(byte[] message) {
6449
}
6550
}
6651

67-
@Override
52+
@Async
6853
public void onMessage(Message message, Channel channel) throws Exception {
6954
byte[] messageBody = message.getBody();
55+
// String messageStr = new String(messageBody);
7056
eventReceived(messageBody);
7157
long deliveryTag = message.getMessageProperties().getDeliveryTag();
58+
// String queue = message.getMessageProperties().getConsumerQueue();
7259
channel.basicAck(deliveryTag, false);
7360
int breakHere = 1;
74-
7561
}
7662
}
Lines changed: 27 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,27 @@
1+
package com.ericsson.ei.rmqhandler;
2+
3+
import org.springframework.amqp.core.Message;
4+
import org.springframework.amqp.rabbit.listener.adapter.MessageListenerAdapter;
5+
6+
import com.ericsson.ei.handlers.EventHandler;
7+
import com.rabbitmq.client.Channel;
8+
9+
public class EIMessageListenerAdapter extends MessageListenerAdapter {
10+
11+
public EIMessageListenerAdapter(Object delegate) {
12+
super(delegate);
13+
}
14+
15+
@Override
16+
public void onMessage(Message message, Channel channel) throws Exception {
17+
Object delegate = getDelegate();
18+
if (delegate != this) {
19+
if (delegate instanceof EventHandler) {
20+
if (channel != null) {
21+
((EventHandler) delegate).onMessage(message, channel);
22+
return;
23+
}
24+
}
25+
}
26+
}
27+
}

src/main/java/com/ericsson/ei/rmqhandler/RmqHandler.java

Lines changed: 33 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -46,6 +46,8 @@ public class RmqHandler {
4646
private String domainId;
4747
@Value("${rabbitmq.componentName}")
4848
private String componentName;
49+
@Value("${rabbitmq.waitlist.queue.suffix}")
50+
private String waitlistSufix;
4951
@Value("${rabbitmq.routing.key}")
5052
private String routingKey;
5153
@Value("${rabbitmq.consumerName}")
@@ -54,6 +56,7 @@ public class RmqHandler {
5456
private RabbitTemplate rabbitTemplate;
5557
private CachingConnectionFactory factory;
5658
private SimpleMessageListenerContainer container;
59+
private SimpleMessageListenerContainer waitlistContainer;
5760
static Logger log = (Logger) LoggerFactory.getLogger(RmqHandler.class);
5861

5962
public Boolean getQueueDurable() {
@@ -169,26 +172,42 @@ Binding binding(Queue queue, TopicExchange exchange) {
169172
return BindingBuilder.bind(queue).to(exchange).with(routingKey);
170173
}
171174

175+
// @Bean
176+
// SimpleMessageListenerContainer bindToQueueForRecentEvents(ConnectionFactory factory, EventHandler eventHandler) {
177+
// String queueName = getQueueName();
178+
//// MessageListenerAdapter listenerAdapter = new MessageListenerAdapter(eventHandler, "eventReceived");
179+
// MessageListenerAdapter listenerAdapter = new EIMessageListenerAdapter(eventHandler);
180+
// container = new SimpleMessageListenerContainer();
181+
// container.setConnectionFactory(factory);
182+
// container.setQueueNames(queueName);
183+
// container.setMessageListener(listenerAdapter);
184+
//// container.setAcknowledgeMode(AcknowledgeMode.MANUAL);
185+
// return container;
186+
// }
187+
172188
@Bean
173-
SimpleMessageListenerContainer bindToQueueForRecentEvents(ConnectionFactory factory, EventHandler eventHandler) {
174-
String queueName = getQueueName();
175-
// MessageListenerAdapter listenerAdapter = new MessageListenerAdapter(eventHandler, "eventReceived");
176-
MessageListenerAdapter listenerAdapter = new MessageListenerAdapter(eventHandler);
177-
container = new SimpleMessageListenerContainer();
178-
container.setConnectionFactory(factory);
179-
container.setQueueNames(queueName);
180-
container.setMessageListener(listenerAdapter);
181-
container.setAcknowledgeMode(AcknowledgeMode.MANUAL);
182-
return container;
189+
SimpleMessageListenerContainer bindToWaitlistQueueForRecentEvents(ConnectionFactory factory, EventHandler eventHandler) {
190+
MessageListenerAdapter listenerAdapter = new EIMessageListenerAdapter(eventHandler);
191+
waitlistContainer = new SimpleMessageListenerContainer();
192+
waitlistContainer.setConnectionFactory(factory);
193+
waitlistContainer.setQueueNames(getWaitlistQueueName());
194+
waitlistContainer.setMessageListener(listenerAdapter);
195+
waitlistContainer.setAcknowledgeMode(AcknowledgeMode.MANUAL);
196+
return waitlistContainer;
183197
}
184198

185199
public String getQueueName() {
186200
String durableName = queueDurable ? "durable" : "transient";
187201
return domainId + "." + componentName + "." + consumerName + "." + durableName;
188202
}
189203

204+
public String getWaitlistQueueName() {
205+
String durableName = queueDurable ? "durable" : "transient";
206+
return domainId + "." + componentName + "." + consumerName + "." + durableName + "." + waitlistSufix;
207+
}
208+
190209
@Bean
191-
public RabbitTemplate rabbitMqTemplate() {
210+
public RabbitTemplate waitListRabbitMqTemplate() {
192211
if (rabbitTemplate == null) {
193212
if (factory != null) {
194213
rabbitTemplate = new RabbitTemplate(factory);
@@ -209,22 +228,14 @@ public void confirm(CorrelationData correlationData, boolean ack, String cause)
209228
return rabbitTemplate;
210229
}
211230

212-
public void publishObjectToMessageBus(String message) {
231+
public void publishObjectToWaitlistQueue(String message) {
213232
log.info("publishing message to message bus...");
214-
rabbitMqTemplate().convertAndSend(message);
215-
// Connection conn = factory.createConnection();
216-
// Channel channel = conn.createChannel(true);
217-
// String queueName = getQueueName();
218-
// String exchange = exchangeName;
219-
// try {
220-
// channel.basicPublish(exchange, queueName, null, message.getBytes());
221-
// } catch (Exception e) {
222-
// log.info(e.getMessage(),e);
223-
// }
233+
waitListRabbitMqTemplate().convertAndSend(message);
224234
}
225235

226236
public void close() {
227237
try {
238+
waitlistContainer.destroy();
228239
container.destroy();
229240
factory.destroy();
230241
} catch (Exception e) {

src/main/java/com/ericsson/ei/waitlist/WaitListWorker.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -53,7 +53,7 @@ public void run() {
5353
for (final JsonNode idJsonObj : ids) {
5454
ArrayList<String> objects = matchIdRulesHandler.fetchObjectsById(rulesObject, idJsonObj.textValue());
5555
if (objects.size() > 0) {
56-
rmqHandler.publishObjectToMessageBus(event);
56+
rmqHandler.publishObjectToWaitlistQueue(event);
5757
waitListStorageHandler.dropDocumentFromWaitList(document);
5858
}
5959
}

src/main/resources/application.properties

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,8 @@ rabbitmq.consumerName: messageConsumer
1616
rabbitmq.queue.durable:true
1717
rabbitmq.routing.key:#
1818

19+
rabbitmq.waitlist.queue.suffix:waitlist
20+
1921
mergeidmarker: %IdentifyRules%
2022

2123
mongodb.host: localhost

src/test/java/com/ericsson/ei/flowtests/FlowTest.java

Lines changed: 10 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -150,16 +150,15 @@ public void flowTest() {
150150
try {
151151
String queueName = rmqHandler.getQueueName();
152152
Channel channel = conn.createChannel();
153-
String exchange = "ei-poc-4";
154-
createExchange(exchange, queueName);
155-
153+
String exchangeName = "ei-poc-4";
154+
createExchange();
156155

157156
ArrayList<String> eventNames = getEventNamesToSend();
158157
int eventsCount = eventNames.size();
159158
for(String eventName : eventNames) {
160159
JsonNode eventJson = parsedJason.get(eventName);
161160
String event = eventJson.toString();
162-
channel.basicPublish(exchange, queueName, null, event.getBytes());
161+
channel.basicPublish(exchangeName, queueName, null, event.getBytes());
163162
}
164163

165164
// wait for all events to be processed
@@ -197,11 +196,16 @@ private ArrayList<String> getEventNamesToSend() {
197196
return eventNames;
198197
}
199198

200-
private void createExchange(final String exchangeName, final String queueName) {
199+
private void createExchange() {
200+
String queueName = rmqHandler.getQueueName();
201+
String waitlistQueueName = rmqHandler.getWaitlistQueueName();
202+
String exchangeName = "ei-poc-4";
201203
final CachingConnectionFactory ccf = new CachingConnectionFactory(cf);
202204
admin = new RabbitAdmin(ccf);
203-
queue = new Queue(queueName, false);
205+
queue = new Queue(queueName, true);
206+
Queue waitlistQueue = new Queue(waitlistQueueName, true);
204207
admin.declareQueue(queue);
208+
admin.declareQueue(waitlistQueue);
205209
final TopicExchange exchange = new TopicExchange(exchangeName);
206210
admin.declareExchange(exchange);
207211
admin.declareBinding(BindingBuilder.bind(queue).to(exchange).with("#"));

0 commit comments

Comments
 (0)