Skip to content

Commit f6ca557

Browse files
authored
Merge pull request #102 from dalelane/fixed_issue_73_jms_open_conn-tests
fix: close JMS reader when connect task is stopped
2 parents 8505dfe + 5b03c3c commit f6ca557

File tree

6 files changed

+342
-80
lines changed

6 files changed

+342
-80
lines changed

src/integration/java/com/ibm/eventstreams/connect/mqsource/MQSourceTaskAuthIT.java

Lines changed: 53 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@
1818
import static org.junit.Assert.assertArrayEquals;
1919
import static org.junit.Assert.assertEquals;
2020
import static org.junit.Assert.assertNull;
21+
import static org.junit.Assert.assertTrue;
2122

2223
import java.util.Arrays;
2324
import java.util.HashMap;
@@ -33,6 +34,8 @@
3334
import org.testcontainers.containers.GenericContainer;
3435
import org.testcontainers.containers.output.WaitingConsumer;
3536

37+
import com.ibm.eventstreams.connect.mqsource.utils.MQQueueManagerAttrs;
38+
import com.ibm.eventstreams.connect.mqsource.utils.SourceTaskStopper;
3639
import com.ibm.mq.MQException;
3740
import com.ibm.mq.MQMessage;
3841
import com.ibm.mq.MQQueue;
@@ -45,21 +48,19 @@ public class MQSourceTaskAuthIT {
4548
private static final String QUEUE_NAME = "DEV.QUEUE.2";
4649
private static final String CHANNEL_NAME = "DEV.APP.SVRCONN";
4750
private static final String APP_PASSWORD = "MySuperSecretPassword";
51+
private static final String ADMIN_PASSWORD = "MyAdminPassword";
4852

4953

5054
@ClassRule
5155
public static GenericContainer<?> MQ_CONTAINER = new GenericContainer<>("icr.io/ibm-messaging/mq:latest")
5256
.withEnv("LICENSE", "accept")
5357
.withEnv("MQ_QMGR_NAME", QMGR_NAME)
54-
.withEnv("MQ_ENABLE_EMBEDDED_WEB_SERVER", "false")
5558
.withEnv("MQ_APP_PASSWORD", APP_PASSWORD)
56-
.withExposedPorts(1414);
59+
.withEnv("MQ_ADMIN_PASSWORD", ADMIN_PASSWORD)
60+
.withExposedPorts(1414, 9443);
5761

5862

59-
@Test
60-
public void testAuthenticatedQueueManager() throws Exception {
61-
waitForQueueManagerStartup();
62-
63+
private Map<String, String> getConnectorProps() {
6364
Map<String, String> connectorProps = new HashMap<>();
6465
connectorProps.put("mq.queue.manager", QMGR_NAME);
6566
connectorProps.put("mq.connection.mode", "client");
@@ -71,9 +72,15 @@ public void testAuthenticatedQueueManager() throws Exception {
7172
connectorProps.put("mq.password", APP_PASSWORD);
7273
connectorProps.put("mq.message.body.jms", "false");
7374
connectorProps.put("mq.record.builder", "com.ibm.eventstreams.connect.mqsource.builders.DefaultRecordBuilder");
75+
return connectorProps;
76+
}
77+
78+
@Test
79+
public void testAuthenticatedQueueManager() throws Exception {
80+
waitForQueueManagerStartup();
7481

7582
MQSourceTask newConnectTask = new MQSourceTask();
76-
newConnectTask.start(connectorProps);
83+
newConnectTask.start(getConnectorProps());
7784

7885
MQMessage message1 = new MQMessage();
7986
message1.writeString("hello");
@@ -86,15 +93,53 @@ public void testAuthenticatedQueueManager() throws Exception {
8693
for (SourceRecord kafkaMessage : kafkaMessages) {
8794
assertNull(kafkaMessage.key());
8895
assertEquals(Schema.OPTIONAL_BYTES_SCHEMA, kafkaMessage.valueSchema());
96+
97+
newConnectTask.commitRecord(kafkaMessage);
8998
}
9099

91100
assertArrayEquals("hello".getBytes(), (byte[]) kafkaMessages.get(0).value());
92101
assertArrayEquals("world".getBytes(), (byte[]) kafkaMessages.get(1).value());
93102

94-
newConnectTask.stop();
103+
SourceTaskStopper stopper = new SourceTaskStopper(newConnectTask);
104+
stopper.run();
105+
}
106+
107+
108+
109+
@Test
110+
public void verifyJmsConnClosed() throws Exception {
111+
112+
int restApiPortNumber = MQ_CONTAINER.getMappedPort(9443);
113+
114+
// count number of connections to the qmgr at the start
115+
int numQmgrConnectionsBefore = MQQueueManagerAttrs.getNumConnections(QMGR_NAME, restApiPortNumber, ADMIN_PASSWORD);
116+
117+
// start the source connector so that it connects to the qmgr
118+
MQSourceTask connectTask = new MQSourceTask();
119+
connectTask.start(getConnectorProps());
120+
121+
// count number of connections to the qmgr now - it should have increased
122+
int numQmgrConnectionsDuring = MQQueueManagerAttrs.getNumConnections(QMGR_NAME, restApiPortNumber, ADMIN_PASSWORD);
123+
124+
// stop the source connector so it disconnects from the qmgr
125+
connectTask.stop();
126+
127+
// count number of connections to the qmgr now - it should have decreased
128+
int numQmgrConnectionsAfter = MQQueueManagerAttrs.getNumConnections(QMGR_NAME, restApiPortNumber, ADMIN_PASSWORD);
129+
130+
// verify number of connections changed as expected
131+
assertTrue("connections should have increased after starting the source task",
132+
numQmgrConnectionsDuring > numQmgrConnectionsBefore);
133+
assertTrue("connections should have decreased after calling stop()",
134+
numQmgrConnectionsAfter < numQmgrConnectionsDuring);
135+
136+
// cleanup
137+
SourceTaskStopper stopper = new SourceTaskStopper(connectTask);
138+
stopper.run();
95139
}
96140

97141

142+
98143
private void waitForQueueManagerStartup() throws TimeoutException {
99144
WaitingConsumer logConsumer = new WaitingConsumer();
100145
MQ_CONTAINER.followOutput(logConsumer);

0 commit comments

Comments
 (0)