Skip to content

Commit cf0eae1

Browse files
author
Vali (Vasile Baluta)
committed
add support for manual akcnowledgement when event has been processed
1 parent 263d2e6 commit cf0eae1

File tree

2 files changed

+19
-2
lines changed

2 files changed

+19
-2
lines changed

src/main/java/com/ericsson/ei/handlers/EventHandler.java

Lines changed: 14 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2,14 +2,17 @@
22

33
import org.slf4j.Logger;
44
import org.slf4j.LoggerFactory;
5+
import org.springframework.amqp.core.Message;
6+
import org.springframework.amqp.rabbit.core.ChannelAwareMessageListener;
57
import org.springframework.beans.factory.annotation.Autowired;
68
import org.springframework.stereotype.Component;
79

810
import com.ericsson.ei.rules.RulesHandler;
911
import com.ericsson.ei.rules.RulesObject;
12+
import com.rabbitmq.client.Channel;
1013

1114
@Component
12-
public class EventHandler {
15+
public class EventHandler implements ChannelAwareMessageListener {
1316

1417
private static Logger log = LoggerFactory.getLogger(EventHandler.class);
1518

@@ -35,4 +38,14 @@ public void eventReceived(byte[] message) {
3538
System.setProperty("eiffel.intelligence.processedEventsCount", "" + count);
3639
}
3740
}
41+
42+
@Override
43+
public void onMessage(Message message, Channel channel) throws Exception {
44+
byte[] messageBody = message.getBody();
45+
eventReceived(messageBody);
46+
long deliveryTag = message.getMessageProperties().getDeliveryTag();
47+
channel.basicAck(deliveryTag, false);
48+
int breakHere = 1;
49+
50+
}
3851
}

src/main/java/com/ericsson/ei/rmqhandler/RmqHandler.java

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -4,13 +4,15 @@
44

55
import org.slf4j.Logger;
66
import org.slf4j.LoggerFactory;
7+
import org.springframework.amqp.core.AcknowledgeMode;
78
import org.springframework.amqp.core.Binding;
89
import org.springframework.amqp.core.BindingBuilder;
910
import org.springframework.amqp.core.Queue;
1011
import org.springframework.amqp.core.TopicExchange;
1112
import org.springframework.amqp.rabbit.connection.CachingConnectionFactory;
1213
import org.springframework.amqp.rabbit.connection.Connection;
1314
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
15+
import org.springframework.amqp.rabbit.core.ChannelAwareMessageListener;
1416
import org.springframework.amqp.rabbit.core.RabbitTemplate;
1517
import org.springframework.amqp.rabbit.core.RabbitTemplate.ConfirmCallback;
1618
import org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer;
@@ -170,11 +172,13 @@ Binding binding(Queue queue, TopicExchange exchange) {
170172
@Bean
171173
SimpleMessageListenerContainer bindToQueueForRecentEvents(ConnectionFactory factory, EventHandler eventHandler) {
172174
String queueName = getQueueName();
173-
MessageListenerAdapter listenerAdapter = new MessageListenerAdapter(eventHandler, "eventReceived");
175+
// MessageListenerAdapter listenerAdapter = new MessageListenerAdapter(eventHandler, "eventReceived");
176+
MessageListenerAdapter listenerAdapter = new MessageListenerAdapter(eventHandler);
174177
container = new SimpleMessageListenerContainer();
175178
container.setConnectionFactory(factory);
176179
container.setQueueNames(queueName);
177180
container.setMessageListener(listenerAdapter);
181+
container.setAcknowledgeMode(AcknowledgeMode.MANUAL);
178182
return container;
179183
}
180184

0 commit comments

Comments
 (0)