Skip to content

Commit 93700b9

Browse files
committed
test: add test for an authenticated queue manager connection
Signed-off-by: Dale Lane <dale.lane@uk.ibm.com>
1 parent ec2368f commit 93700b9

File tree

1 file changed

+141
-0
lines changed

1 file changed

+141
-0
lines changed
Lines changed: 141 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,141 @@
1+
/**
2+
* Copyright 2022 IBM Corporation
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
package com.ibm.eventstreams.connect.mqsink;
17+
18+
import static org.junit.Assert.assertEquals;
19+
20+
import java.util.ArrayList;
21+
import java.util.HashMap;
22+
import java.util.List;
23+
import java.util.Map;
24+
import java.util.concurrent.TimeoutException;
25+
26+
import javax.jms.Connection;
27+
import javax.jms.Destination;
28+
import javax.jms.JMSException;
29+
import javax.jms.Message;
30+
import javax.jms.MessageConsumer;
31+
import javax.jms.Session;
32+
33+
import org.apache.kafka.connect.sink.SinkRecord;
34+
import org.junit.ClassRule;
35+
import org.junit.Test;
36+
import org.testcontainers.containers.GenericContainer;
37+
import org.testcontainers.containers.output.WaitingConsumer;
38+
39+
import com.ibm.msg.client.jms.JmsConnectionFactory;
40+
import com.ibm.msg.client.jms.JmsFactoryFactory;
41+
import com.ibm.msg.client.wmq.WMQConstants;
42+
43+
public class MQSinkTaskAuthIT {
44+
45+
private static final String QMGR_NAME = "MYAUTHQMGR";
46+
private static final String QUEUE_NAME = "DEV.QUEUE.2";
47+
private static final String CHANNEL_NAME = "DEV.APP.SVRCONN";
48+
private static final String APP_PASSWORD = "MySuperSecretPassword";
49+
50+
51+
@ClassRule
52+
public static GenericContainer<?> MQ_CONTAINER = new GenericContainer<>("icr.io/ibm-messaging/mq:latest")
53+
.withEnv("LICENSE", "accept")
54+
.withEnv("MQ_QMGR_NAME", QMGR_NAME)
55+
.withEnv("MQ_ENABLE_EMBEDDED_WEB_SERVER", "false")
56+
.withEnv("MQ_APP_PASSWORD", APP_PASSWORD)
57+
.withExposedPorts(1414);
58+
59+
60+
@Test
61+
public void testAuthenticatedQueueManager() throws Exception {
62+
waitForQueueManagerStartup();
63+
64+
Map<String, String> connectorProps = new HashMap<>();
65+
connectorProps.put("mq.queue.manager", QMGR_NAME);
66+
connectorProps.put("mq.connection.mode", "client");
67+
connectorProps.put("mq.connection.name.list", "localhost(" + MQ_CONTAINER.getMappedPort(1414).toString() + ")");
68+
connectorProps.put("mq.channel.name", CHANNEL_NAME);
69+
connectorProps.put("mq.queue", QUEUE_NAME);
70+
connectorProps.put("mq.user.authentication.mqcsp", "true");
71+
connectorProps.put("mq.user.name", "app");
72+
connectorProps.put("mq.password", APP_PASSWORD);
73+
connectorProps.put("mq.message.builder", "com.ibm.eventstreams.connect.mqsink.builders.DefaultMessageBuilder");
74+
75+
MQSinkTask newConnectTask = new MQSinkTask();
76+
newConnectTask.start(connectorProps);
77+
78+
List<SinkRecord> records = new ArrayList<>();
79+
SinkRecord record = new SinkRecord("KAFKA.TOPIC", 0,
80+
null, null,
81+
null, "message payload",
82+
0);
83+
records.add(record);
84+
85+
newConnectTask.put(records);
86+
87+
newConnectTask.stop();
88+
89+
List<Message> messages = getAllMessagesFromQueue();
90+
assertEquals(1, messages.size());
91+
assertEquals("message payload", messages.get(0).getBody(String.class));
92+
}
93+
94+
95+
private void waitForQueueManagerStartup() throws TimeoutException {
96+
WaitingConsumer logConsumer = new WaitingConsumer();
97+
MQ_CONTAINER.followOutput(logConsumer);
98+
logConsumer.waitUntil(logline -> logline.getUtf8String().contains("AMQ5975I"));
99+
}
100+
101+
private List<Message> getAllMessagesFromQueue() throws JMSException {
102+
Connection connection = null;
103+
Session session = null;
104+
Destination destination = null;
105+
MessageConsumer consumer = null;
106+
107+
JmsFactoryFactory ff = JmsFactoryFactory.getInstance(WMQConstants.WMQ_PROVIDER);
108+
109+
JmsConnectionFactory cf = ff.createConnectionFactory();
110+
cf.setStringProperty(WMQConstants.WMQ_HOST_NAME, "localhost");
111+
cf.setIntProperty(WMQConstants.WMQ_PORT, MQ_CONTAINER.getMappedPort(1414));
112+
cf.setStringProperty(WMQConstants.WMQ_CHANNEL, CHANNEL_NAME);
113+
cf.setIntProperty(WMQConstants.WMQ_CONNECTION_MODE, WMQConstants.WMQ_CM_CLIENT);
114+
cf.setStringProperty(WMQConstants.WMQ_QUEUE_MANAGER, QMGR_NAME);
115+
cf.setBooleanProperty(WMQConstants.USER_AUTHENTICATION_MQCSP, true);
116+
cf.setStringProperty(WMQConstants.USERID, "app");
117+
cf.setStringProperty(WMQConstants.PASSWORD, APP_PASSWORD);
118+
119+
connection = cf.createConnection();
120+
session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
121+
122+
destination = session.createQueue(QUEUE_NAME);
123+
consumer = session.createConsumer(destination);
124+
125+
connection.start();
126+
127+
List<Message> messages = new ArrayList<>();
128+
Message message;
129+
do {
130+
message = consumer.receiveNoWait();
131+
if (message != null) {
132+
messages.add(message);
133+
}
134+
}
135+
while (message != null);
136+
137+
connection.close();
138+
139+
return messages;
140+
}
141+
}

0 commit comments

Comments
 (0)