18
18
import static org .junit .Assert .assertArrayEquals ;
19
19
import static org .junit .Assert .assertEquals ;
20
20
import static org .junit .Assert .assertNull ;
21
+ import static org .junit .Assert .assertTrue ;
21
22
22
23
import java .util .Arrays ;
23
24
import java .util .HashMap ;
33
34
import org .testcontainers .containers .GenericContainer ;
34
35
import org .testcontainers .containers .output .WaitingConsumer ;
35
36
37
+ import com .ibm .eventstreams .connect .mqsource .utils .MQQueueManagerAttrs ;
38
+ import com .ibm .eventstreams .connect .mqsource .utils .SourceTaskStopper ;
36
39
import com .ibm .mq .MQException ;
37
40
import com .ibm .mq .MQMessage ;
38
41
import com .ibm .mq .MQQueue ;
@@ -45,21 +48,19 @@ public class MQSourceTaskAuthIT {
45
48
private static final String QUEUE_NAME = "DEV.QUEUE.2" ;
46
49
private static final String CHANNEL_NAME = "DEV.APP.SVRCONN" ;
47
50
private static final String APP_PASSWORD = "MySuperSecretPassword" ;
51
+ private static final String ADMIN_PASSWORD = "MyAdminPassword" ;
48
52
49
53
50
54
@ ClassRule
51
55
public static GenericContainer <?> MQ_CONTAINER = new GenericContainer <>("icr.io/ibm-messaging/mq:latest" )
52
56
.withEnv ("LICENSE" , "accept" )
53
57
.withEnv ("MQ_QMGR_NAME" , QMGR_NAME )
54
- .withEnv ("MQ_ENABLE_EMBEDDED_WEB_SERVER" , "false" )
55
58
.withEnv ("MQ_APP_PASSWORD" , APP_PASSWORD )
56
- .withExposedPorts (1414 );
59
+ .withEnv ("MQ_ADMIN_PASSWORD" , ADMIN_PASSWORD )
60
+ .withExposedPorts (1414 , 9443 );
57
61
58
62
59
- @ Test
60
- public void testAuthenticatedQueueManager () throws Exception {
61
- waitForQueueManagerStartup ();
62
-
63
+ private Map <String , String > getConnectorProps () {
63
64
Map <String , String > connectorProps = new HashMap <>();
64
65
connectorProps .put ("mq.queue.manager" , QMGR_NAME );
65
66
connectorProps .put ("mq.connection.mode" , "client" );
@@ -71,9 +72,15 @@ public void testAuthenticatedQueueManager() throws Exception {
71
72
connectorProps .put ("mq.password" , APP_PASSWORD );
72
73
connectorProps .put ("mq.message.body.jms" , "false" );
73
74
connectorProps .put ("mq.record.builder" , "com.ibm.eventstreams.connect.mqsource.builders.DefaultRecordBuilder" );
75
+ return connectorProps ;
76
+ }
77
+
78
+ @ Test
79
+ public void testAuthenticatedQueueManager () throws Exception {
80
+ waitForQueueManagerStartup ();
74
81
75
82
MQSourceTask newConnectTask = new MQSourceTask ();
76
- newConnectTask .start (connectorProps );
83
+ newConnectTask .start (getConnectorProps () );
77
84
78
85
MQMessage message1 = new MQMessage ();
79
86
message1 .writeString ("hello" );
@@ -98,6 +105,41 @@ public void testAuthenticatedQueueManager() throws Exception {
98
105
}
99
106
100
107
108
+
109
+ @ Test
110
+ public void verifyJmsConnClosed () throws Exception {
111
+
112
+ int restApiPortNumber = MQ_CONTAINER .getMappedPort (9443 );
113
+
114
+ // count number of connections to the qmgr at the start
115
+ int numQmgrConnectionsBefore = MQQueueManagerAttrs .getNumConnections (QMGR_NAME , restApiPortNumber , ADMIN_PASSWORD );
116
+
117
+ // start the source connector so that it connects to the qmgr
118
+ MQSourceTask connectTask = new MQSourceTask ();
119
+ connectTask .start (getConnectorProps ());
120
+
121
+ // count number of connections to the qmgr now - it should have increased
122
+ int numQmgrConnectionsDuring = MQQueueManagerAttrs .getNumConnections (QMGR_NAME , restApiPortNumber , ADMIN_PASSWORD );
123
+
124
+ // stop the source connector so it disconnects from the qmgr
125
+ connectTask .stop ();
126
+
127
+ // count number of connections to the qmgr now - it should have decreased
128
+ int numQmgrConnectionsAfter = MQQueueManagerAttrs .getNumConnections (QMGR_NAME , restApiPortNumber , ADMIN_PASSWORD );
129
+
130
+ // verify number of connections changed as expected
131
+ assertTrue ("connections should have increased after starting the source task" ,
132
+ numQmgrConnectionsDuring > numQmgrConnectionsBefore );
133
+ assertTrue ("connections should have decreased after calling stop()" ,
134
+ numQmgrConnectionsAfter < numQmgrConnectionsDuring );
135
+
136
+ // cleanup
137
+ SourceTaskStopper stopper = new SourceTaskStopper (connectTask );
138
+ stopper .run ();
139
+ }
140
+
141
+
142
+
101
143
private void waitForQueueManagerStartup () throws TimeoutException {
102
144
WaitingConsumer logConsumer = new WaitingConsumer ();
103
145
MQ_CONTAINER .followOutput (logConsumer );
0 commit comments