Skip to content

Commit 629f0e4

Browse files
committed
tests: initial integration test for DefaultMessageBuilder
Signed-off-by: Dale Lane <dale.lane@uk.ibm.com>
1 parent 119857b commit 629f0e4

File tree

3 files changed

+196
-14
lines changed

3 files changed

+196
-14
lines changed
Lines changed: 72 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,72 @@
1+
package com.ibm.eventstreams.connect.mqsink;
2+
3+
import java.util.concurrent.TimeoutException;
4+
5+
import javax.jms.JMSContext;
6+
7+
import org.junit.ClassRule;
8+
import org.testcontainers.containers.GenericContainer;
9+
import org.testcontainers.containers.output.WaitingConsumer;
10+
11+
import com.ibm.mq.jms.MQConnectionFactory;
12+
import com.ibm.msg.client.wmq.WMQConstants;
13+
14+
15+
/**
16+
* Helper class for integration tests that have a dependency on JMSContext.
17+
*
18+
* It starts a queue manager in a test container, and uses it to create
19+
* a JMSContext instance, that can be used in tests.
20+
*/
21+
public abstract class AbstractJMSContextIT {
22+
23+
private static final String QMGR_NAME = "MYQMGR";
24+
25+
@ClassRule
26+
public static GenericContainer<?> MQ_CONTAINER = new GenericContainer<>("icr.io/ibm-messaging/mq:latest")
27+
.withEnv("LICENSE", "accept")
28+
.withEnv("MQ_QMGR_NAME", QMGR_NAME)
29+
.withEnv("MQ_ENABLE_EMBEDDED_WEB_SERVER", "false")
30+
.withExposedPorts(1414);
31+
32+
private JMSContext jmsContext;
33+
34+
35+
/**
36+
* Returns a JMS context pointing at a developer queue manager running in a
37+
* test container.
38+
*/
39+
public JMSContext getJmsContext() throws Exception {
40+
if (jmsContext == null) {
41+
waitForQueueManagerStartup();
42+
43+
MQConnectionFactory mqcf = new MQConnectionFactory();
44+
mqcf.setTransportType(WMQConstants.WMQ_CM_CLIENT);
45+
mqcf.setChannel("DEV.APP.SVRCONN");
46+
mqcf.setQueueManager(QMGR_NAME);
47+
mqcf.setConnectionNameList("localhost(" + getMQPort().toString() + ")");
48+
49+
jmsContext = mqcf.createContext();
50+
}
51+
52+
return jmsContext;
53+
}
54+
55+
56+
/**
57+
* Gets the host port that has been mapped to the default MQ 1414 port in the test container.
58+
*/
59+
private Integer getMQPort() {
60+
return MQ_CONTAINER.getMappedPort(1414);
61+
}
62+
63+
/**
64+
* Waits until we see a log line in the queue manager test container that indicates
65+
* the queue manager is ready.
66+
*/
67+
private void waitForQueueManagerStartup() throws TimeoutException {
68+
WaitingConsumer logConsumer = new WaitingConsumer();
69+
MQ_CONTAINER.followOutput(logConsumer);
70+
logConsumer.waitUntil(logline -> logline.getUtf8String().contains("AMQ5975I"));
71+
}
72+
}
Lines changed: 124 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,124 @@
1+
package com.ibm.eventstreams.connect.mqsink;
2+
3+
import static org.junit.Assert.assertEquals;
4+
5+
import java.nio.ByteBuffer;
6+
7+
import javax.jms.BytesMessage;
8+
import javax.jms.Message;
9+
import javax.jms.TextMessage;
10+
11+
import org.apache.kafka.connect.data.Schema;
12+
import org.apache.kafka.connect.sink.SinkRecord;
13+
import org.junit.Before;
14+
import org.junit.Test;
15+
16+
import com.ibm.eventstreams.connect.mqsink.builders.DefaultMessageBuilder;
17+
import com.ibm.eventstreams.connect.mqsink.builders.MessageBuilder;
18+
19+
public class DefaultMessageBuilderIT extends AbstractJMSContextIT {
20+
21+
private MessageBuilder builder;
22+
23+
@Before
24+
public void prepareMessageBuilder() {
25+
builder = new DefaultMessageBuilder();
26+
}
27+
28+
private SinkRecord generateSinkRecord(Schema valueSchema, Object value) {
29+
final String TOPIC = "TOPIC.NAME";
30+
final int PARTITION = 0;
31+
final long OFFSET = 0;
32+
final Schema KEY_SCHEMA = Schema.STRING_SCHEMA;
33+
final String KEY = "mykey";
34+
35+
return new SinkRecord(TOPIC, PARTITION,
36+
KEY_SCHEMA, KEY,
37+
valueSchema, value,
38+
OFFSET);
39+
}
40+
41+
42+
@Test
43+
public void buildEmptyMessageWithoutSchema() throws Exception {
44+
createAndVerifyEmptyMessage(null);
45+
}
46+
@Test
47+
public void buildEmptyMessageWithSchema() throws Exception {
48+
createAndVerifyEmptyMessage(Schema.STRING_SCHEMA);
49+
}
50+
@Test
51+
public void buildTextMessageWithoutSchema() throws Exception {
52+
createAndVerifyStringMessage(null, "Hello World");
53+
}
54+
@Test
55+
public void buildTextMessageWithSchema() throws Exception {
56+
createAndVerifyStringMessage(Schema.STRING_SCHEMA, "Hello World with a schema");
57+
}
58+
@Test
59+
public void buildIntMessageWithoutSchema() throws Exception {
60+
createAndVerifyIntegerMessage(null, 1234);
61+
}
62+
@Test
63+
public void buildIntMessageWithSchema() throws Exception {
64+
createAndVerifyIntegerMessage(Schema.INT32_SCHEMA, 1234);
65+
}
66+
@Test
67+
public void buildByteArrayMessageWithoutSchema() throws Exception {
68+
String TEST_MESSAGE = "This is a test";
69+
createAndVerifyByteMessage(null, TEST_MESSAGE.getBytes(), TEST_MESSAGE);
70+
}
71+
@Test
72+
public void buildByteArrayMessageWithSchema() throws Exception {
73+
String TEST_MESSAGE = "This is another test";
74+
createAndVerifyByteMessage(Schema.BYTES_SCHEMA, TEST_MESSAGE.getBytes(), TEST_MESSAGE);
75+
}
76+
@Test
77+
public void buildByteBufferMessageWithoutSchema() throws Exception {
78+
String TEST_MESSAGE = "This is also a test!";
79+
byte[] payload = TEST_MESSAGE.getBytes();
80+
ByteBuffer value = ByteBuffer.allocate(payload.length);
81+
value.put(payload);
82+
createAndVerifyByteMessage(null, value, TEST_MESSAGE);
83+
}
84+
@Test
85+
public void buildByteBufferMessageWithSchema() throws Exception {
86+
String TEST_MESSAGE = "This is a bytebuffer test";
87+
byte[] payload = TEST_MESSAGE.getBytes();
88+
ByteBuffer value = ByteBuffer.allocate(payload.length);
89+
value.put(payload);
90+
createAndVerifyByteMessage(Schema.BYTES_SCHEMA, value, TEST_MESSAGE);
91+
}
92+
93+
94+
private void createAndVerifyEmptyMessage(Schema valueSchema) throws Exception {
95+
Message message = builder.fromSinkRecord(getJmsContext(), generateSinkRecord(valueSchema, null));
96+
assertEquals(null, message.getBody(String.class));
97+
}
98+
99+
private void createAndVerifyStringMessage(Schema valueSchema, String value) throws Exception {
100+
Message message = builder.fromSinkRecord(getJmsContext(), generateSinkRecord(valueSchema, value));
101+
assertEquals(value, message.getBody(String.class));
102+
103+
TextMessage textmessage = (TextMessage) message;
104+
assertEquals(value, textmessage.getText());
105+
}
106+
107+
private void createAndVerifyIntegerMessage(Schema valueSchema, Integer value) throws Exception {
108+
Message message = builder.fromSinkRecord(getJmsContext(), generateSinkRecord(valueSchema, value));
109+
assertEquals(value, new Integer(message.getBody(String.class)));
110+
}
111+
112+
private void createAndVerifyByteMessage(Schema valueSchema, Object value, String valueAsString) throws Exception {
113+
Message message = builder.fromSinkRecord(getJmsContext(), generateSinkRecord(valueSchema, value));
114+
115+
BytesMessage byteMessage = (BytesMessage) message;
116+
byteMessage.reset();
117+
118+
byte[] byteData = null;
119+
byteData = new byte[(int) byteMessage.getBodyLength()];
120+
byteMessage.readBytes(byteData);
121+
String stringMessage = new String(byteData);
122+
assertEquals(valueAsString, stringMessage);
123+
}
124+
}

src/integration/java/com/ibm/eventstreams/connect/mqsink/PlaceholderIntegrationIT.java

Lines changed: 0 additions & 14 deletions
This file was deleted.

0 commit comments

Comments
 (0)