Skip to content

Commit 313f3ed

Browse files
Add functional tests for reconnecting with message bus (#144)
* Add functional tests for reconnecting with message bus Refactored RmqHandler
1 parent 7497de0 commit 313f3ed

File tree

8 files changed

+183
-77
lines changed

8 files changed

+183
-77
lines changed
Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,15 @@
1+
package com.ericsson.ei.rabbitmq;
2+
3+
import com.ericsson.ei.utils.BaseRunner;
4+
import org.junit.runner.RunWith;
5+
6+
import cucumber.api.CucumberOptions;
7+
import cucumber.api.junit.Cucumber;
8+
9+
@RunWith(Cucumber.class)
10+
@CucumberOptions(features = "src/functionaltests/resources/features/rabbitMQTestConnection.feature",
11+
glue = {"com.ericsson.ei.rabbitmq"}, plugin = {
12+
"html:target/cucumber-reports/RabbitMQTestConnectionRunner"})
13+
public class RabbitMQTestConnectionRunner extends BaseRunner {
14+
15+
}
Lines changed: 79 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,79 @@
1+
package com.ericsson.ei.rabbitmq;
2+
3+
import com.ericsson.ei.utils.AMQPBrokerManager;
4+
import com.ericsson.ei.utils.FunctionalTestBase;
5+
import com.ericsson.ei.utils.TestContextInitializer;
6+
7+
import cucumber.api.java.en.Given;
8+
import cucumber.api.java.en.Then;
9+
import cucumber.api.java.en.When;
10+
import org.junit.Ignore;
11+
import org.slf4j.Logger;
12+
import org.slf4j.LoggerFactory;
13+
import org.springframework.beans.factory.annotation.Value;
14+
15+
import java.util.ArrayList;
16+
import java.util.List;
17+
import java.util.concurrent.TimeUnit;
18+
19+
import static org.junit.Assert.assertEquals;
20+
21+
@Ignore
22+
public class RabbitMQTestConnectionSteps extends FunctionalTestBase {
23+
24+
@Value("${rabbitmq.port}")
25+
private String rabbitMQPort;
26+
27+
private static final Logger LOGGER = LoggerFactory.getLogger(RabbitMQTestConnectionSteps.class);
28+
private static final String EIFFEL_EVENTS = "src/functionaltests/resources/eiffel_events_for_thread_testing.json";
29+
30+
private AMQPBrokerManager amqpBroker;
31+
32+
@Given("^We are connected to message bus$")
33+
public void connect_to_message_bus() {
34+
amqpBroker = TestContextInitializer.getBroker(Integer.parseInt(rabbitMQPort));
35+
assertEquals("Expected message bus to be up",
36+
true, amqpBroker.isRunning);
37+
}
38+
39+
@When("^Message bus goes down$")
40+
public void message_bus_goes_down() {
41+
LOGGER.debug("Shutting down message bus");
42+
amqpBroker.stopBroker();
43+
assertEquals("Expected message bus to be down",
44+
false, amqpBroker.isRunning);
45+
}
46+
47+
@When("^Message bus is restarted$")
48+
public void message_bus_is_restarted() throws Exception {
49+
amqpBroker.startBroker();
50+
}
51+
52+
@Then("^I can send events which are put in the waitlist$")
53+
public void can_send_events_which_are_put_in_the_waitlist() throws Exception {
54+
LOGGER.debug("Sending eiffel events");
55+
int waitListSize = 0;
56+
List<String> eventNames = getEventNamesToSend();
57+
long maxTime = System.currentTimeMillis() + TimeUnit.SECONDS.toMillis(30);
58+
59+
while (waitListSize != 4 && maxTime > System.currentTimeMillis()) {
60+
eventManager.sendEiffelEvents(EIFFEL_EVENTS, eventNames);
61+
TimeUnit.SECONDS.sleep(2);
62+
waitListSize = dbManager.waitListSize();
63+
}
64+
assertEquals(4, waitListSize);
65+
}
66+
67+
/**
68+
* This method collects all the event names of events we will send to the message bus.
69+
*/
70+
protected List<String> getEventNamesToSend() {
71+
List<String> eventNames = new ArrayList<>();
72+
eventNames.add("event_EiffelConfidenceLevelModifiedEvent_3_2");
73+
eventNames.add("event_EiffelArtifactPublishedEvent_3");
74+
eventNames.add("event_EiffelTestCaseTriggeredEvent_3");
75+
eventNames.add("event_EiffelTestCaseStartedEvent_3");
76+
return eventNames;
77+
}
78+
79+
}

src/functionaltests/java/com/ericsson/ei/utils/AMQPBrokerManager.java

Lines changed: 6 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -6,28 +6,31 @@
66
import org.apache.qpid.server.BrokerOptions;
77

88
public class AMQPBrokerManager {
9+
public boolean isRunning = false;
910

1011
private final Broker broker = new Broker();
1112
private String path;
12-
private int port;
13+
private String port;
1314

14-
public AMQPBrokerManager(String path, int port) {
15+
public AMQPBrokerManager(String path, String port) {
1516
super();
1617
this.path = path;
1718
this.port = port;
1819
}
1920

2021
public void startBroker() throws Exception {
2122
final BrokerOptions brokerOptions = new BrokerOptions();
22-
brokerOptions.setConfigProperty("qpid.amqp_port", "" + port);
23+
brokerOptions.setConfigProperty("qpid.amqp_port", port);
2324
brokerOptions.setConfigProperty("qpid.pass_file", "src/functionaltests/resources/configs/password.properties");
2425
brokerOptions.setConfigProperty("qpid.work_dir", Files.createTempDir().getAbsolutePath());
2526
brokerOptions.setInitialConfigurationLocation(path);
2627

2728
broker.startup(brokerOptions);
29+
isRunning = true;
2830
}
2931

3032
public void stopBroker() {
3133
broker.shutdown();
34+
isRunning = false;
3235
}
3336
}
Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,12 @@
1+
package com.ericsson.ei.utils;
2+
3+
import org.junit.AfterClass;
4+
5+
public class BaseRunner {
6+
7+
@AfterClass()
8+
public static void shutdownAmqpBroker() {
9+
String rabbitMQPort = System.getProperty("rabbitmq.port");
10+
TestContextInitializer.removeBroker(rabbitMQPort);
11+
}
12+
}
Lines changed: 28 additions & 36 deletions
Original file line numberDiff line numberDiff line change
@@ -1,62 +1,55 @@
11
package com.ericsson.ei.utils;
22

33
import com.mongodb.MongoClient;
4-
import com.rabbitmq.client.Connection;
5-
import com.rabbitmq.client.ConnectionFactory;
64

75
import java.io.File;
86
import java.io.IOException;
7+
import java.util.HashMap;
8+
import java.util.Map;
99

1010
import org.apache.tomcat.util.codec.binary.Base64;
1111
import org.apache.tomcat.util.codec.binary.StringUtils;
1212
import org.slf4j.Logger;
1313
import org.slf4j.LoggerFactory;
14-
import org.springframework.amqp.core.BindingBuilder;
15-
import org.springframework.amqp.core.Queue;
16-
import org.springframework.amqp.core.TopicExchange;
17-
import org.springframework.amqp.rabbit.connection.CachingConnectionFactory;
18-
import org.springframework.amqp.rabbit.core.RabbitAdmin;
1914
import org.springframework.util.SocketUtils;
2015

2116
import de.flapdoodle.embed.mongo.distribution.Version;
2217
import de.flapdoodle.embed.mongo.tests.MongodForTestsFactory;
2318

2419
public class TestConfigs {
2520

26-
private static AMQPBrokerManager amqpBroker;
27-
private static ConnectionFactory cf;
28-
private Connection conn;
29-
private MongodForTestsFactory testsFactory;
21+
private final static Logger LOGGER = LoggerFactory.getLogger(TestConfigs.class);
22+
3023
private MongoClient mongoClient = null;
3124

32-
final static Logger LOGGER = (Logger) LoggerFactory.getLogger(TestConfigs.class);
25+
protected static Map<Integer, AMQPBrokerManager> amqpBrokerMap = new HashMap<>();
3326

34-
public void amqpBroker() throws Exception {
27+
AMQPBrokerManager createAmqpBroker() throws Exception {
28+
// Generates a random port for amqpBroker and starts up a new broker
3529
int port = SocketUtils.findAvailableTcpPort();
36-
System.setProperty("rabbitmq.port", "" + port);
30+
31+
System.setProperty("rabbitmq.port", Integer.toString(port));
3732
System.setProperty("rabbitmq.user", "guest");
3833
System.setProperty("rabbitmq.password", "guest");
3934
System.setProperty("waitlist.initialDelayResend", "500");
4035
System.setProperty("waitlist.fixedRateResend", "3000");
4136

4237
String config = "src/functionaltests/resources/configs/qpidConfig.json";
4338
File qpidConfig = new File(config);
44-
amqpBroker = new AMQPBrokerManager(qpidConfig.getAbsolutePath(), port);
45-
amqpBroker.startBroker();
46-
cf = new ConnectionFactory();
47-
cf.setUsername("guest");
48-
cf.setPassword("guest");
49-
50-
cf.setPort(port);
51-
cf.setHandshakeTimeout(600000);
52-
cf.setConnectionTimeout(600000);
53-
conn = cf.newConnection();
39+
AMQPBrokerManager amqpBroker = new AMQPBrokerManager(qpidConfig.getAbsolutePath(), Integer.toString(port));
40+
5441
LOGGER.debug("Started embedded message bus for tests on port: " + port);
42+
amqpBroker.startBroker();
43+
44+
// add new amqp broker to pool
45+
amqpBrokerMap.put(port, amqpBroker);
46+
47+
return amqpBroker;
5548
}
5649

57-
public void mongoClient() throws IOException {
50+
void startUpMongoClient() throws IOException {
5851
try {
59-
testsFactory = MongodForTestsFactory.with(Version.V3_4_1);
52+
MongodForTestsFactory testsFactory = MongodForTestsFactory.with(Version.V3_4_1);
6053
mongoClient = testsFactory.newMongo();
6154
String port = "" + mongoClient.getAddress().getPort();
6255
System.setProperty("spring.data.mongodb.port", port);
@@ -66,7 +59,7 @@ public void mongoClient() throws IOException {
6659
}
6760
}
6861

69-
public void setAuthorization() {
62+
void setAuthorization() {
7063
String password = StringUtils.newStringUtf8(Base64.encodeBase64("password".getBytes()));
7164
System.setProperty("ldap.enabled", "true");
7265
System.setProperty("ldap.url", "ldap://ldap.forumsys.com:389/dc=example,dc=com");
@@ -76,14 +69,13 @@ public void setAuthorization() {
7669
System.setProperty("ldap.user.filter", "uid={0}");
7770
}
7871

79-
public void createExchange(final String exchangeName, final String queueName) {
80-
final CachingConnectionFactory ccf = new CachingConnectionFactory(cf);
81-
RabbitAdmin admin = new RabbitAdmin(ccf);
82-
Queue queue = new Queue(queueName, false);
83-
admin.declareQueue(queue);
84-
final TopicExchange exchange = new TopicExchange(exchangeName);
85-
admin.declareExchange(exchange);
86-
admin.declareBinding(BindingBuilder.bind(queue).to(exchange).with("#"));
87-
ccf.destroy();
72+
public static AMQPBrokerManager getBroker(int port) {
73+
return amqpBrokerMap.get(port);
74+
}
75+
76+
public static void removeBroker(String port) {
77+
AMQPBrokerManager broker = amqpBrokerMap.get(Integer.parseInt(port));
78+
broker.stopBroker();
79+
amqpBrokerMap.remove(port);
8880
}
8981
}
Lines changed: 6 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,23 +1,25 @@
11
package com.ericsson.ei.utils;
22

3+
34
import org.slf4j.Logger;
45
import org.slf4j.LoggerFactory;
56
import org.springframework.context.ApplicationContextInitializer;
67
import org.springframework.context.ConfigurableApplicationContext;
78

9+
810
public class TestContextInitializer extends TestConfigs
911
implements ApplicationContextInitializer<ConfigurableApplicationContext> {
1012

1113
private static final Logger LOGGER = LoggerFactory.getLogger(TestContextInitializer.class);
1214

1315
@Override
1416
public void initialize(ConfigurableApplicationContext ac) {
15-
try {
16-
amqpBroker();
17-
mongoClient();
1817

18+
try {
19+
createAmqpBroker();
20+
startUpMongoClient();
1921
} catch (Exception e) {
20-
LOGGER.error(e.getMessage(), e);
22+
LOGGER.error("Failed to startup Mongo client or AMQP broker for test");
2123
}
2224
}
2325
}
Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,8 @@
1+
2+
Feature: RabbitMQConnection
3+
4+
Scenario: Testing to automatically reconnect to Message Bus
5+
Given We are connected to message bus
6+
When Message bus goes down
7+
And Message bus is restarted
8+
Then I can send events which are put in the waitlist

0 commit comments

Comments
 (0)