Skip to content

Commit 5461175

Browse files
Multiple routing key bindings in RabbitMQ (#457)
* Add support for multiple binding keys and send waitlist events with reserved key
1 parent face285 commit 5461175

File tree

10 files changed

+254
-63
lines changed

10 files changed

+254
-63
lines changed
Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,13 @@
1+
package com.ericsson.ei.rabbitmq.configuration;
2+
3+
import org.junit.runner.RunWith;
4+
5+
import cucumber.api.CucumberOptions;
6+
import cucumber.api.junit.Cucumber;
7+
8+
@RunWith(Cucumber.class)
9+
@CucumberOptions(features = "src/functionaltests/resources/features/rabbitMQConfiguration.feature", glue = {
10+
"com.ericsson.ei.rabbitmq.configuration" }, plugin = { "html:target/cucumber-reports/RabbitMQConfigurationTestRunner" })
11+
public class RabbitMQConfigurationTestRunner {
12+
13+
}
Lines changed: 139 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,139 @@
1+
package com.ericsson.ei.rabbitmq.configuration;
2+
3+
import static org.junit.Assert.assertEquals;
4+
5+
import java.io.File;
6+
import java.util.ArrayList;
7+
import java.util.List;
8+
9+
import org.junit.Ignore;
10+
import org.slf4j.Logger;
11+
import org.slf4j.LoggerFactory;
12+
import org.springframework.amqp.core.BindingBuilder;
13+
import org.springframework.amqp.core.Queue;
14+
import org.springframework.amqp.core.TopicExchange;
15+
import org.springframework.amqp.rabbit.connection.CachingConnectionFactory;
16+
import org.springframework.amqp.rabbit.core.RabbitAdmin;
17+
import org.springframework.amqp.rabbit.core.RabbitTemplate;
18+
import org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer;
19+
import org.springframework.beans.factory.annotation.Autowired;
20+
import org.springframework.beans.factory.annotation.Qualifier;
21+
import org.springframework.beans.factory.annotation.Value;
22+
import org.springframework.test.context.TestPropertySource;
23+
import org.springframework.util.SocketUtils;
24+
25+
import com.ericsson.ei.handlers.EventHandler;
26+
import com.ericsson.ei.handlers.RmqHandler;
27+
import com.ericsson.ei.utils.AMQPBrokerManager;
28+
import com.ericsson.ei.utils.FunctionalTestBase;
29+
30+
import cucumber.api.java.en.Given;
31+
import cucumber.api.java.en.Then;
32+
import cucumber.api.java.en.When;
33+
34+
@Ignore
35+
@TestPropertySource(properties = {
36+
"spring.data.mongodb.database: RabbitMQConfigurationTestSteps",
37+
"missedNotificationDataBaseName: RabbitMQConfigurationTestSteps-missedNotifications",
38+
"rabbitmq.exchange.name: RabbitMQConfigurationTestSteps-exchange",
39+
"rabbitmq.consumerName: RabbitMQConfigurationTestStepsConsumer" })
40+
public class RabbitMQConfigurationTestSteps extends FunctionalTestBase {
41+
42+
@Value("${rabbitmq.port}")
43+
private String rabbitMQPort;
44+
45+
private static final Logger LOGGER = LoggerFactory.getLogger(RabbitMQConfigurationTestSteps.class);
46+
private static final String EIFFEL_EVENTS = "src/functionaltests/resources/eiffel_events_for_test.json";
47+
48+
private static final String ROUTING_KEY_1 = "routing-key-1";
49+
private static final String ROUTING_KEY_2 = "routing-key-2";
50+
51+
private AMQPBrokerManager amqpBroker;
52+
53+
@Autowired
54+
@Qualifier("bindToQueueForRecentEvents")
55+
SimpleMessageListenerContainer container;
56+
57+
@Autowired
58+
EventHandler eventHandler;
59+
60+
@Given("^We are connected to message bus$")
61+
public void connect_to_message_bus() throws Exception {
62+
int port = SocketUtils.findAvailableTcpPort();
63+
String config = "src/functionaltests/resources/configs/qpidConfig.json";
64+
File qpidConfig = new File(config);
65+
amqpBroker = new AMQPBrokerManager(qpidConfig.getAbsolutePath(), port);
66+
amqpBroker.startBroker();
67+
68+
RmqHandler rmqHandler = eventManager.getRmqHandler();
69+
rmqHandler.setPort(port);
70+
rmqHandler.connectionFactory();
71+
rmqHandler.getCachingConnectionFactory().createConnection();
72+
73+
RabbitAdmin rabbitAdmin = createExchange(rmqHandler);
74+
RabbitTemplate rabbitTemplate = rabbitAdmin.getRabbitTemplate();
75+
76+
rmqHandler.setRabbitTemplate(rabbitTemplate);
77+
rmqHandler.getContainer().setRabbitAdmin(rabbitAdmin);
78+
rmqHandler.getContainer().setConnectionFactory(rmqHandler.getCachingConnectionFactory());
79+
rmqHandler.getContainer().setQueueNames(rmqHandler.getQueueName());
80+
assertEquals("Expected message bus to be up", true, amqpBroker.isRunning);
81+
}
82+
83+
@When("^events are published using different routing keys$")
84+
public void events_are_published_using_different_routing_keys() throws Exception {
85+
LOGGER.debug("Sending eiffel events");
86+
List<String> eventNames = getEventNamesToSend();
87+
eventNames.remove(1);
88+
eventManager.getRmqHandler().rabbitMqTemplate().setRoutingKey(ROUTING_KEY_1);
89+
eventManager.sendEiffelEvents(EIFFEL_EVENTS, eventNames);
90+
eventNames.clear();
91+
eventNames = getEventNamesToSend();
92+
eventNames.remove(0);
93+
eventManager.getRmqHandler().rabbitMqTemplate().setRoutingKey(ROUTING_KEY_2);
94+
eventManager.sendEiffelEvents(EIFFEL_EVENTS, eventNames);
95+
}
96+
97+
@Then("^an aggregated object should be created$")
98+
public void an_aggregated_object_should_be_created() throws Exception {
99+
List<String> arguments = new ArrayList<>();
100+
arguments.add("_id=6acc3c87-75e0-4b6d-88f5-b1a5d4e62b43");
101+
arguments.add("uri=https://myrepository.com/mySubSystemArtifact");
102+
List<String> missingArguments = dbManager.verifyAggregatedObjectInDB(arguments);
103+
assertEquals("The following arguments are missing in the Aggregated Object in mongoDB: "
104+
+ missingArguments.toString(), 0, missingArguments.size());
105+
}
106+
107+
/**
108+
* This method collects all the event names of events we will send to the
109+
* message bus.
110+
*/
111+
protected List<String> getEventNamesToSend() {
112+
List<String> eventNames = new ArrayList<>();
113+
eventNames.add("event_EiffelArtifactCreatedEvent_3");
114+
eventNames.add("event_EiffelArtifactPublishedEvent_3");
115+
return eventNames;
116+
}
117+
118+
private RabbitAdmin createExchange(final RmqHandler rmqHandler) {
119+
final String exchangeName = rmqHandler.getExchangeName();
120+
final String queueName = rmqHandler.getQueueName();
121+
final CachingConnectionFactory ccf = rmqHandler.getCachingConnectionFactory();
122+
LOGGER.info("Creating exchange: {} and queue: {}", exchangeName, queueName);
123+
RabbitAdmin admin = new RabbitAdmin(ccf);
124+
Queue queue = new Queue(queueName, true);
125+
admin.declareQueue(queue);
126+
final TopicExchange exchange = new TopicExchange(exchangeName, true, false);
127+
admin.declareExchange(exchange);
128+
admin.declareBinding(BindingBuilder.bind(queue).to(exchange).with(ROUTING_KEY_1));
129+
admin.declareBinding(BindingBuilder.bind(queue).to(exchange).with(ROUTING_KEY_2));
130+
admin.initialize();
131+
admin.getQueueProperties(queueName);
132+
RabbitTemplate rabbitTemplate = admin.getRabbitTemplate();
133+
rabbitTemplate.setExchange(exchangeName);
134+
rabbitTemplate.setQueue(queueName);
135+
rabbitTemplate.setRoutingKey(ROUTING_KEY_1);
136+
return admin;
137+
}
138+
139+
}
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
package com.ericsson.ei.rabbitmq;
1+
package com.ericsson.ei.rabbitmq.connection;
22

33
import org.junit.runner.RunWith;
44

@@ -7,7 +7,7 @@
77

88
@RunWith(Cucumber.class)
99
@CucumberOptions(features = "src/functionaltests/resources/features/rabbitMQTestConnection.feature", glue = {
10-
"com.ericsson.ei.rabbitmq" }, plugin = { "html:target/cucumber-reports/RabbitMQTestConnectionRunner" })
10+
"com.ericsson.ei.rabbitmq.connection" }, plugin = { "html:target/cucumber-reports/RabbitMQTestConnectionRunner" })
1111
public class RabbitMQTestConnectionRunner {
1212

1313
}

src/functionaltests/java/com/ericsson/ei/rabbitmq/RabbitMQTestConnectionSteps.java renamed to src/functionaltests/java/com/ericsson/ei/rabbitmq/connection/RabbitMQTestConnectionSteps.java

Lines changed: 7 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
package com.ericsson.ei.rabbitmq;
1+
package com.ericsson.ei.rabbitmq.connection;
22

33
import static org.junit.Assert.assertEquals;
44

@@ -16,9 +16,7 @@
1616
import org.springframework.amqp.rabbit.connection.CachingConnectionFactory;
1717
import org.springframework.amqp.rabbit.core.RabbitAdmin;
1818
import org.springframework.amqp.rabbit.core.RabbitTemplate;
19-
import org.springframework.amqp.rabbit.core.RabbitTemplate.ConfirmCallback;
2019
import org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer;
21-
import org.springframework.amqp.rabbit.support.CorrelationData;
2220
import org.springframework.beans.factory.annotation.Autowired;
2321
import org.springframework.beans.factory.annotation.Qualifier;
2422
import org.springframework.beans.factory.annotation.Value;
@@ -46,7 +44,9 @@ public class RabbitMQTestConnectionSteps extends FunctionalTestBase {
4644
private String rabbitMQPort;
4745

4846
private static final Logger LOGGER = LoggerFactory.getLogger(RabbitMQTestConnectionSteps.class);
49-
private static final String EIFFEL_EVENTS = "src/functionaltests/resources/eiffel_events_for_thread_testing.json";
47+
private static final String EIFFEL_EVENTS = "src/functionaltests/resources/eiffel_events_for_test.json";
48+
49+
private static final String DEFAULT_ROUTING_KEY = "#";
5050

5151
private AMQPBrokerManager amqpBroker;
5252

@@ -72,12 +72,6 @@ public void connect_to_message_bus() throws Exception {
7272

7373
RabbitAdmin rabbitAdmin = createExchange(rmqHandler);
7474
RabbitTemplate rabbitTemplate = rabbitAdmin.getRabbitTemplate();
75-
rabbitTemplate.setConfirmCallback(new ConfirmCallback() {
76-
@Override
77-
public void confirm(CorrelationData correlationData, boolean ack, String cause) {
78-
LOGGER.info("Received confirm with result : {}", ack);
79-
}
80-
});
8175

8276
rmqHandler.setRabbitTemplate(rabbitTemplate);
8377
rmqHandler.getContainer().setRabbitAdmin(rabbitAdmin);
@@ -111,7 +105,7 @@ public void can_send_events_which_are_put_in_the_waitlist() throws Exception {
111105
TimeUnit.SECONDS.sleep(2);
112106
waitListSize = dbManager.waitListSize();
113107
}
114-
assertEquals(4, waitListSize);
108+
assertEquals(1, waitListSize);
115109
}
116110

117111
/**
@@ -120,10 +114,7 @@ public void can_send_events_which_are_put_in_the_waitlist() throws Exception {
120114
*/
121115
protected List<String> getEventNamesToSend() {
122116
List<String> eventNames = new ArrayList<>();
123-
eventNames.add("event_EiffelConfidenceLevelModifiedEvent_3_2");
124117
eventNames.add("event_EiffelArtifactPublishedEvent_3");
125-
eventNames.add("event_EiffelTestCaseTriggeredEvent_3");
126-
eventNames.add("event_EiffelTestCaseStartedEvent_3");
127118
return eventNames;
128119
}
129120

@@ -137,13 +128,13 @@ private RabbitAdmin createExchange(final RmqHandler rmqHandler) {
137128
admin.declareQueue(queue);
138129
final TopicExchange exchange = new TopicExchange(exchangeName, true, false);
139130
admin.declareExchange(exchange);
140-
admin.declareBinding(BindingBuilder.bind(queue).to(exchange).with("#"));
131+
admin.declareBinding(BindingBuilder.bind(queue).to(exchange).with(DEFAULT_ROUTING_KEY));
141132
admin.initialize();
142133
admin.getQueueProperties(queueName);
143134
RabbitTemplate rabbitTemplate = admin.getRabbitTemplate();
144135
rabbitTemplate.setExchange(exchangeName);
145-
rabbitTemplate.setRoutingKey(rmqHandler.getBindingKey());
146136
rabbitTemplate.setQueue(queueName);
137+
rabbitTemplate.setRoutingKey(DEFAULT_ROUTING_KEY);
147138
return admin;
148139
}
149140

src/functionaltests/java/com/ericsson/ei/threadingAndWaitlistRepeat/ThreadingAndWaitlistRepeatSteps.java

Lines changed: 0 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -106,26 +106,6 @@ public void when_waitlist_has_resent_events_they_should_have_been_deleted() thro
106106
dbManager.waitListSize());
107107
}
108108

109-
@Then("^correct amount of threads should be spawned$")
110-
public void correct_amount_of_threads_should_be_spawned() throws Throwable {
111-
List<String> threadsSpawned = new ArrayList<>();
112-
String port = environment.getProperty("local.server.port");
113-
String eventHandlerThreadNamePattern = String.format("EventHandler-(\\d+)-%s", port);
114-
Pattern pattern = Pattern.compile(eventHandlerThreadNamePattern);
115-
116-
Set<Thread> threadSet = Thread.getAllStackTraces().keySet();
117-
Thread[] threadArray = threadSet.toArray(new Thread[threadSet.size()]);
118-
for (Thread thread : threadArray) {
119-
Matcher matcher = pattern.matcher(thread.getName());
120-
if (matcher.find() && !matcher.group(1).equals("")) {
121-
if (!threadsSpawned.contains(matcher.group(1))) {
122-
threadsSpawned.add(matcher.group(1));
123-
}
124-
}
125-
}
126-
assertEquals(getEventNamesToSend().size() - queueCapacity, threadsSpawned.size());
127-
}
128-
129109
@Then("^after the time to live has ended, the waitlist should be empty$")
130110
public void after_the_time_to_live_has_ended_the_waitlist_should_be_empty() throws Throwable {
131111
long stopTime = System.currentTimeMillis() + TimeUnit.SECONDS.toMillis(waitlistTtl + 60);
Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,8 @@
1+
@RabbitMQConfiguration
2+
Feature: Test Rabbit MQ Configuration
3+
4+
@RabbitMQConfigurationMultipleRoutingKeysScenario
5+
Scenario: Test that aggregations are done when events are sent with different routing keys
6+
Given We are connected to message bus
7+
When events are published using different routing keys
8+
Then an aggregated object should be created

src/functionaltests/resources/features/threadingAndWaitlistRepeat.feature

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -8,5 +8,4 @@ Feature: Test Threading and Waitlist Repeat
88
And no event is aggregated
99
And event-to-object-map is manipulated to include the sent events
1010
And when waitlist has resent events they should have been deleted
11-
And correct amount of threads should be spawned
1211
And after the time to live has ended, the waitlist should be empty

0 commit comments

Comments
 (0)