Skip to content

Commit aa87caa

Browse files
Waitlist queue is now created and add support for multiple binding keys (#452)
* Add separate queue and routing key for waitlist * Add support for multiple binding keys * Update rmq property description in documentation
1 parent 40b404d commit aa87caa

File tree

11 files changed

+208
-51
lines changed

11 files changed

+208
-51
lines changed
Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,13 @@
1+
package com.ericsson.ei.rabbitmq.configuration;
2+
3+
import org.junit.runner.RunWith;
4+
5+
import cucumber.api.CucumberOptions;
6+
import cucumber.api.junit.Cucumber;
7+
8+
@RunWith(Cucumber.class)
9+
@CucumberOptions(features = "src/functionaltests/resources/features/rabbitMQConfiguration.feature", glue = {
10+
"com.ericsson.ei.rabbitmq.configuration" }, plugin = { "html:target/cucumber-reports/RabbitMQTestConfigurationRunner" })
11+
public class RabbitMQConfigurationTestRunner {
12+
13+
}
Lines changed: 139 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,139 @@
1+
package com.ericsson.ei.rabbitmq.configuration;
2+
3+
import static org.junit.Assert.assertEquals;
4+
5+
import java.io.File;
6+
import java.util.ArrayList;
7+
import java.util.List;
8+
9+
import org.junit.Ignore;
10+
import org.slf4j.Logger;
11+
import org.slf4j.LoggerFactory;
12+
import org.springframework.amqp.core.BindingBuilder;
13+
import org.springframework.amqp.core.Queue;
14+
import org.springframework.amqp.core.TopicExchange;
15+
import org.springframework.amqp.rabbit.connection.CachingConnectionFactory;
16+
import org.springframework.amqp.rabbit.core.RabbitAdmin;
17+
import org.springframework.amqp.rabbit.core.RabbitTemplate;
18+
import org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer;
19+
import org.springframework.beans.factory.annotation.Autowired;
20+
import org.springframework.beans.factory.annotation.Qualifier;
21+
import org.springframework.beans.factory.annotation.Value;
22+
import org.springframework.test.context.TestPropertySource;
23+
import org.springframework.util.SocketUtils;
24+
25+
import com.ericsson.ei.handlers.EventHandler;
26+
import com.ericsson.ei.handlers.RMQHandler;
27+
import com.ericsson.ei.utils.AMQPBrokerManager;
28+
import com.ericsson.ei.utils.FunctionalTestBase;
29+
30+
import cucumber.api.java.en.Given;
31+
import cucumber.api.java.en.Then;
32+
import cucumber.api.java.en.When;
33+
34+
@Ignore
35+
@TestPropertySource(properties = {
36+
"spring.data.mongodb.database: RabbitMQConfigurationTestSteps",
37+
"missedNotificationDataBaseName: RabbitMQConfigurationTestSteps-missedNotifications",
38+
"rabbitmq.exchange.name: RabbitMQConfigurationTestSteps-exchange",
39+
"rabbitmq.consumerName: RabbitMQConfigurationTestStepsConsumer" })
40+
public class RabbitMQConfigurationTestSteps extends FunctionalTestBase {
41+
42+
@Value("${rabbitmq.port}")
43+
private String rabbitMQPort;
44+
45+
private static final Logger LOGGER = LoggerFactory.getLogger(RabbitMQConfigurationTestSteps.class);
46+
private static final String EIFFEL_EVENTS = "src/functionaltests/resources/eiffel_events_for_test.json";
47+
48+
private static final String ROUTING_KEY_1 = "routing-key-1";
49+
private static final String ROUTING_KEY_2 = "routing-key-2";
50+
51+
private AMQPBrokerManager amqpBroker;
52+
53+
@Autowired
54+
@Qualifier("bindToQueueForRecentEvents")
55+
SimpleMessageListenerContainer container;
56+
57+
@Autowired
58+
EventHandler eventHandler;
59+
60+
@Given("^We are connected to message bus$")
61+
public void connect_to_message_bus() throws Exception {
62+
int port = SocketUtils.findAvailableTcpPort();
63+
String config = "src/functionaltests/resources/configs/qpidConfig.json";
64+
File qpidConfig = new File(config);
65+
amqpBroker = new AMQPBrokerManager(qpidConfig.getAbsolutePath(), port);
66+
amqpBroker.startBroker();
67+
68+
RMQHandler rmqHandler = eventManager.getRmqHandler();
69+
rmqHandler.getRmqProperties().setPort(port);
70+
rmqHandler.connectionFactory();
71+
rmqHandler.getCachingConnectionFactory().createConnection();
72+
73+
RabbitAdmin rabbitAdmin = createExchange(rmqHandler);
74+
RabbitTemplate rabbitTemplate = rabbitAdmin.getRabbitTemplate();
75+
76+
rmqHandler.setRabbitTemplate(rabbitTemplate);
77+
rmqHandler.getContainer().setRabbitAdmin(rabbitAdmin);
78+
rmqHandler.getContainer().setConnectionFactory(rmqHandler.getCachingConnectionFactory());
79+
rmqHandler.getContainer().setQueueNames(rmqHandler.getRmqProperties().getQueueName());
80+
assertEquals("Expected message bus to be up", true, amqpBroker.isRunning);
81+
}
82+
83+
@When("^events are published using different routing keys$")
84+
public void events_are_published_using_different_routing_keys() throws Exception {
85+
LOGGER.debug("Sending eiffel events");
86+
List<String> eventNames = getEventNamesToSend();
87+
eventNames.remove(1);
88+
eventManager.getRmqHandler().rabbitMqTemplate().setRoutingKey(ROUTING_KEY_1);
89+
eventManager.sendEiffelEvents(EIFFEL_EVENTS, eventNames);
90+
eventNames.clear();
91+
eventNames = getEventNamesToSend();
92+
eventNames.remove(0);
93+
eventManager.getRmqHandler().rabbitMqTemplate().setRoutingKey(ROUTING_KEY_2);
94+
eventManager.sendEiffelEvents(EIFFEL_EVENTS, eventNames);
95+
}
96+
97+
@Then("^an aggregated object should be created$")
98+
public void an_aggregated_object_should_be_created() throws Exception {
99+
List<String> arguments = new ArrayList<>();
100+
arguments.add("_id=6acc3c87-75e0-4b6d-88f5-b1a5d4e62b43");
101+
arguments.add("uri=https://myrepository.com/mySubSystemArtifact");
102+
List<String> missingArguments = dbManager.verifyAggregatedObjectInDB(arguments);
103+
assertEquals("The following arguments are missing in the Aggregated Object in mongoDB: "
104+
+ missingArguments.toString(), 0, missingArguments.size());
105+
}
106+
107+
/**
108+
* This method collects all the event names of events we will send to the
109+
* message bus.
110+
*/
111+
protected List<String> getEventNamesToSend() {
112+
List<String> eventNames = new ArrayList<>();
113+
eventNames.add("event_EiffelArtifactCreatedEvent_3");
114+
eventNames.add("event_EiffelArtifactPublishedEvent_3");
115+
return eventNames;
116+
}
117+
118+
private RabbitAdmin createExchange(final RMQHandler rmqHandler) {
119+
final String exchangeName = rmqHandler.getRmqProperties().getExchangeName();
120+
final String queueName = rmqHandler.getRmqProperties().getQueueName();
121+
final CachingConnectionFactory ccf = rmqHandler.getCachingConnectionFactory();
122+
LOGGER.info("Creating exchange: {} and queue: {}", exchangeName, queueName);
123+
RabbitAdmin admin = new RabbitAdmin(ccf);
124+
Queue queue = new Queue(queueName, true);
125+
admin.declareQueue(queue);
126+
final TopicExchange exchange = new TopicExchange(exchangeName, true, false);
127+
admin.declareExchange(exchange);
128+
admin.declareBinding(BindingBuilder.bind(queue).to(exchange).with(ROUTING_KEY_1));
129+
admin.declareBinding(BindingBuilder.bind(queue).to(exchange).with(ROUTING_KEY_2));
130+
admin.initialize();
131+
admin.getQueueProperties(queueName);
132+
RabbitTemplate rabbitTemplate = admin.getRabbitTemplate();
133+
rabbitTemplate.setExchange(exchangeName);
134+
rabbitTemplate.setQueue(queueName);
135+
rabbitTemplate.setRoutingKey(ROUTING_KEY_1);
136+
return admin;
137+
}
138+
139+
}
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
package com.ericsson.ei.rabbitmq;
1+
package com.ericsson.ei.rabbitmq.connection;
22

33
import org.junit.runner.RunWith;
44

@@ -7,7 +7,7 @@
77

88
@RunWith(Cucumber.class)
99
@CucumberOptions(features = "src/functionaltests/resources/features/rabbitMQTestConnection.feature", glue = {
10-
"com.ericsson.ei.rabbitmq" }, plugin = { "html:target/cucumber-reports/RabbitMQTestConnectionRunner" })
10+
"com.ericsson.ei.rabbitmq.connection" }, plugin = { "html:target/cucumber-reports/RabbitMQTestConnectionRunner" })
1111
public class RabbitMQTestConnectionRunner {
1212

1313
}

src/functionaltests/java/com/ericsson/ei/rabbitmq/RabbitMQTestConnectionSteps.java renamed to src/functionaltests/java/com/ericsson/ei/rabbitmq/connection/RabbitMQTestConnectionSteps.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
package com.ericsson.ei.rabbitmq;
1+
package com.ericsson.ei.rabbitmq.connection;
22

33
import static org.junit.Assert.assertEquals;
44

@@ -145,7 +145,7 @@ private RabbitAdmin createExchange(final RMQHandler rmqHandler) {
145145
admin.getQueueProperties(queueName);
146146
RabbitTemplate rabbitTemplate = admin.getRabbitTemplate();
147147
rabbitTemplate.setExchange(exchangeName);
148-
rabbitTemplate.setRoutingKey(rmqProperties.getBindingKey());
148+
rabbitTemplate.setRoutingKey(RMQProperties.WAITLIST_BINDING_KEY);
149149
rabbitTemplate.setQueue(queueName);
150150
return admin;
151151
}

src/functionaltests/java/com/ericsson/ei/threadingAndWaitlistRepeat/ThreadingAndWaitlistRepeatSteps.java

Lines changed: 0 additions & 33 deletions
Original file line numberDiff line numberDiff line change
@@ -5,15 +5,11 @@
55

66
import java.util.ArrayList;
77
import java.util.List;
8-
import java.util.Set;
98
import java.util.concurrent.TimeUnit;
10-
import java.util.regex.Matcher;
11-
import java.util.regex.Pattern;
129

1310
import org.junit.Ignore;
1411
import org.springframework.beans.factory.annotation.Autowired;
1512
import org.springframework.beans.factory.annotation.Value;
16-
import org.springframework.core.env.Environment;
1713
import org.springframework.test.context.TestPropertySource;
1814

1915
import com.ericsson.ei.handlers.EventToObjectMapHandler;
@@ -44,15 +40,6 @@ public class ThreadingAndWaitlistRepeatSteps extends FunctionalTestBase {
4440
private static final String EIFFEL_EVENTS_JSON_PATH = "src/functionaltests/resources/eiffel_events_for_thread_testing.json";
4541
private static final String ID_RULE = "{" + "\"IdRule\": \"meta.id\"" + "}";
4642

47-
@Autowired
48-
private Environment environment;
49-
50-
@Value("${threads.core.pool.size}")
51-
private int corePoolSize;
52-
@Value("${threads.queue.capacity}")
53-
private int queueCapacity;
54-
@Value("${threads.max.pool.size}")
55-
private int maxPoolSize;
5643
@Value("${waitlist.collection.ttl}")
5744
private int waitlistTtl;
5845

@@ -107,26 +94,6 @@ public void when_waitlist_has_resent_events_they_should_have_been_deleted() thro
10794
dbManager.waitListSize());
10895
}
10996

110-
@Then("^correct amount of threads should be spawned$")
111-
public void correct_amount_of_threads_should_be_spawned() throws Throwable {
112-
List<String> threadsSpawned = new ArrayList<>();
113-
String port = environment.getProperty("local.server.port");
114-
String eventHandlerThreadNamePattern = String.format("EventHandler-(\\d+)-%s", port);
115-
Pattern pattern = Pattern.compile(eventHandlerThreadNamePattern);
116-
117-
Set<Thread> threadSet = Thread.getAllStackTraces().keySet();
118-
Thread[] threadArray = threadSet.toArray(new Thread[threadSet.size()]);
119-
for (Thread thread : threadArray) {
120-
Matcher matcher = pattern.matcher(thread.getName());
121-
if (matcher.find() && !matcher.group(1).equals("")) {
122-
if (!threadsSpawned.contains(matcher.group(1))) {
123-
threadsSpawned.add(matcher.group(1));
124-
}
125-
}
126-
}
127-
assertEquals(getEventNamesToSend().size() - queueCapacity, threadsSpawned.size());
128-
}
129-
13097
@Then("^after the time to live has ended, the waitlist should be empty$")
13198
public void after_the_time_to_live_has_ended_the_waitlist_should_be_empty() throws Throwable {
13299
long stopTime = System.currentTimeMillis() + TimeUnit.SECONDS.toMillis(waitlistTtl + 60);
Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,8 @@
1+
@RabbitMQConfiguration
2+
Feature: Test Rabbit MQ Configuration
3+
4+
@RabbitMQConfigurationMultipleRoutingKeysScenario
5+
Scenario: Test that aggregations are done when events are sent with different routing keys
6+
Given We are connected to message bus
7+
When events are published using different routing keys
8+
Then an aggregated object should be created

src/functionaltests/resources/features/threadingAndWaitlistRepeat.feature

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,5 @@ Feature: Test Threading and Waitlist Repeat
77
Then waitlist should not be empty
88
And no event is aggregated
99
And event-to-object-map is manipulated to include the sent events
10-
And when waitlist has resent events they should have been deleted
11-
And correct amount of threads should be spawned
10+
And when waitlist has resent events they should have been deleted
1211
And after the time to live has ended, the waitlist should be empty

src/main/java/com/ericsson/ei/handlers/RMQHandler.java

Lines changed: 29 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,9 @@
1616
*/
1717
package com.ericsson.ei.handlers;
1818

19+
import java.util.ArrayList;
20+
import java.util.List;
21+
1922
import org.apache.commons.lang3.StringUtils;
2023
import org.slf4j.Logger;
2124
import org.slf4j.LoggerFactory;
@@ -98,11 +101,10 @@ public ConnectionFactory connectionFactory() {
98101
public SimpleMessageListenerContainer bindToQueueForRecentEvents(
99102
ConnectionFactory springConnectionFactory,
100103
EventHandler eventHandler) {
101-
String queueName = rmqProperties.getQueueName();
102104
MessageListenerAdapter listenerAdapter = new EIMessageListenerAdapter(eventHandler);
103105
container = new SimpleMessageListenerContainer();
104106
container.setConnectionFactory(springConnectionFactory);
105-
container.setQueueNames(queueName);
107+
container.setQueueNames(rmqProperties.getQueueName(), rmqProperties.getWaitlistQueueName());
106108
container.setMessageListener(listenerAdapter);
107109
container.setAcknowledgeMode(AcknowledgeMode.MANUAL);
108110
container.setPrefetchCount(maxThreads);
@@ -118,9 +120,9 @@ public RabbitTemplate rabbitMqTemplate() {
118120
rabbitTemplate = new RabbitTemplate(connectionFactory());
119121
}
120122

123+
rabbitTemplate.setQueue(rmqProperties.getWaitlistQueueName());
121124
rabbitTemplate.setExchange(rmqProperties.getExchangeName());
122-
rabbitTemplate.setRoutingKey(rmqProperties.getBindingKey());
123-
rabbitTemplate.setQueue(rmqProperties.getQueueName());
125+
rabbitTemplate.setRoutingKey(RMQProperties.WAITLIST_BINDING_KEY);
124126
rabbitTemplate.setConfirmCallback(new ConfirmCallback() {
125127
@Override
126128
public void confirm(CorrelationData correlationData, boolean ack, String cause) {
@@ -146,21 +148,41 @@ public void close() {
146148
}
147149

148150
@Bean
149-
protected Queue queue() {
151+
protected Queue externalQueue() {
150152
return new Queue(rmqProperties.getQueueName(), true);
151153
}
152154

155+
@Bean
156+
protected Queue internalQueue() {
157+
return new Queue(rmqProperties.getWaitlistQueueName(), true);
158+
}
159+
153160
@Bean
154161
protected TopicExchange exchange() {
155162
return new TopicExchange(rmqProperties.getExchangeName());
156163
}
157164

158165
@Bean
159-
protected Binding binding(Queue queue, TopicExchange exchange) {
160-
return BindingBuilder.bind(queue).to(exchange).with(rmqProperties.getBindingKey());
166+
protected Binding binding() {
167+
return BindingBuilder.bind(internalQueue()).to(exchange()).with(RMQProperties.WAITLIST_BINDING_KEY);
168+
}
169+
170+
@Bean
171+
public List<Binding> bindings() {
172+
String[] bingingKeysArray = splitBindingKeys(rmqProperties.getBindingKeys());
173+
List<Binding> bindingList = new ArrayList<Binding>();
174+
for (String bindingKey : bingingKeysArray) {
175+
bindingList.add(BindingBuilder.bind(externalQueue()).to(exchange()).with(bindingKey));
176+
}
177+
return bindingList;
161178
}
162179

163180
private boolean isRMQCredentialsSet() {
164181
return !StringUtils.isEmpty(rmqProperties.getUser()) && !StringUtils.isEmpty(rmqProperties.getPassword());
165182
}
183+
184+
private String[] splitBindingKeys(String bindingKeys) {
185+
String bindingKeysWithoutWhitespace = bindingKeys.replaceAll("\\s+", "");
186+
return bindingKeysWithoutWhitespace.split(",");
187+
}
166188
}

src/main/java/com/ericsson/ei/handlers/RMQProperties.java

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -81,13 +81,15 @@ public class RMQProperties {
8181
@Getter
8282
@Setter
8383
@Value("${rabbitmq.binding.key}")
84-
private String bindingKey;
84+
private String bindingKeys;
8585

8686
@Getter
8787
@Setter
8888
@Value("${rabbitmq.queue.suffix}")
8989
private String queueSuffix;
9090

91+
public static final String WAITLIST_BINDING_KEY = "eiffel-intelligence.waitlist";
92+
9193
public String getQueueName() {
9294
final String durableName = this.queueDurable ? "durable" : "transient";
9395
return this.domainId + "." + this.componentName + "." + this.queueSuffix + "." + durableName;

src/test/java/com/ericsson/ei/handlers/test/RmqPropertiesTest.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -42,7 +42,7 @@ public class RmqPropertiesTest {
4242
private Integer port = 5672;
4343
private String domainId = "EN1";
4444
private String componentName = "eiffelintelligence";
45-
private String bindingKey = "#";
45+
private String bindingKeys = "#";
4646
private String queueSuffix = "RmqHandlerTest";
4747
private String queueName = "EN1.eiffelintelligence.RmqHandlerTest.durable";
4848
private String waitlistQueueName = "EN1.eiffelintelligence.RmqHandlerTest.durable.waitlist";
@@ -82,7 +82,7 @@ public void getComponentNameTest() {
8282

8383
@Test
8484
public void getRoutingKeyTest() {
85-
assertThat(rmqProperties.getBindingKey(), is(equalTo(bindingKey)));
85+
assertThat(rmqProperties.getBindingKeys(), is(equalTo(bindingKeys)));
8686
}
8787

8888
@Test

0 commit comments

Comments
 (0)