Skip to content

Commit e9efde1

Browse files
committed
test: additional builder tests
Signed-off-by: Dale Lane <dale.lane@uk.ibm.com>
1 parent 0b6bd18 commit e9efde1

File tree

3 files changed

+227
-17
lines changed

3 files changed

+227
-17
lines changed

src/integration/java/com/ibm/eventstreams/connect/mqsink/DefaultMessageBuilderIT.java renamed to src/integration/java/com/ibm/eventstreams/connect/mqsink/builders/DefaultMessageBuilderIT.java

Lines changed: 17 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
package com.ibm.eventstreams.connect.mqsink;
1+
package com.ibm.eventstreams.connect.mqsink.builders;
22

33
import static org.junit.Assert.assertEquals;
44

@@ -13,40 +13,40 @@
1313
import org.junit.Before;
1414
import org.junit.Test;
1515

16-
import com.ibm.eventstreams.connect.mqsink.builders.DefaultMessageBuilder;
17-
import com.ibm.eventstreams.connect.mqsink.builders.MessageBuilder;
16+
import com.ibm.eventstreams.connect.mqsink.AbstractJMSContextIT;
1817

1918
public class DefaultMessageBuilderIT extends AbstractJMSContextIT {
2019

2120
private MessageBuilder builder;
22-
21+
2322
@Before
2423
public void prepareMessageBuilder() {
2524
builder = new DefaultMessageBuilder();
2625
}
27-
26+
2827
private SinkRecord generateSinkRecord(Schema valueSchema, Object value) {
2928
final String TOPIC = "TOPIC.NAME";
3029
final int PARTITION = 0;
3130
final long OFFSET = 0;
3231
final Schema KEY_SCHEMA = Schema.STRING_SCHEMA;
3332
final String KEY = "mykey";
34-
35-
return new SinkRecord(TOPIC, PARTITION,
33+
34+
return new SinkRecord(TOPIC, PARTITION,
3635
KEY_SCHEMA, KEY,
37-
valueSchema, value,
36+
valueSchema, value,
3837
OFFSET);
3938
}
40-
41-
39+
40+
4241
@Test
4342
public void buildEmptyMessageWithoutSchema() throws Exception {
4443
createAndVerifyEmptyMessage(null);
4544
}
4645
@Test
4746
public void buildEmptyMessageWithSchema() throws Exception {
4847
createAndVerifyEmptyMessage(Schema.STRING_SCHEMA);
49-
}
48+
}
49+
5050
@Test
5151
public void buildTextMessageWithoutSchema() throws Exception {
5252
createAndVerifyStringMessage(null, "Hello World");
@@ -89,29 +89,29 @@ public void buildByteBufferMessageWithSchema() throws Exception {
8989
value.put(payload);
9090
createAndVerifyByteMessage(Schema.BYTES_SCHEMA, value, TEST_MESSAGE);
9191
}
92-
92+
9393

9494
private void createAndVerifyEmptyMessage(Schema valueSchema) throws Exception {
9595
Message message = builder.fromSinkRecord(getJmsContext(), generateSinkRecord(valueSchema, null));
9696
assertEquals(null, message.getBody(String.class));
9797
}
98-
98+
9999
private void createAndVerifyStringMessage(Schema valueSchema, String value) throws Exception {
100100
Message message = builder.fromSinkRecord(getJmsContext(), generateSinkRecord(valueSchema, value));
101101
assertEquals(value, message.getBody(String.class));
102-
102+
103103
TextMessage textmessage = (TextMessage) message;
104104
assertEquals(value, textmessage.getText());
105105
}
106-
106+
107107
private void createAndVerifyIntegerMessage(Schema valueSchema, Integer value) throws Exception {
108108
Message message = builder.fromSinkRecord(getJmsContext(), generateSinkRecord(valueSchema, value));
109109
assertEquals(value, new Integer(message.getBody(String.class)));
110110
}
111-
111+
112112
private void createAndVerifyByteMessage(Schema valueSchema, Object value, String valueAsString) throws Exception {
113113
Message message = builder.fromSinkRecord(getJmsContext(), generateSinkRecord(valueSchema, value));
114-
114+
115115
BytesMessage byteMessage = (BytesMessage) message;
116116
byteMessage.reset();
117117

Lines changed: 101 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,101 @@
1+
package com.ibm.eventstreams.connect.mqsink.builders;
2+
3+
import static org.junit.Assert.assertEquals;
4+
5+
import java.util.ArrayList;
6+
import java.util.HashMap;
7+
import java.util.List;
8+
import java.util.Map;
9+
10+
import javax.jms.Message;
11+
import javax.jms.TextMessage;
12+
13+
import org.apache.kafka.connect.data.Schema;
14+
import org.apache.kafka.connect.sink.SinkRecord;
15+
import org.json.JSONObject;
16+
import org.junit.Before;
17+
import org.junit.Test;
18+
19+
import com.ibm.eventstreams.connect.mqsink.AbstractJMSContextIT;
20+
21+
public class JsonMessageBuilderIT extends AbstractJMSContextIT {
22+
23+
private MessageBuilder builder;
24+
25+
@Before
26+
public void prepareMessageBuilder() {
27+
builder = new JsonMessageBuilder();
28+
}
29+
30+
private SinkRecord generateSinkRecord(Schema valueSchema, Object value) {
31+
final String TOPIC = "TOPIC.NAME";
32+
final int PARTITION = 0;
33+
final long OFFSET = 0;
34+
final Schema KEY_SCHEMA = Schema.STRING_SCHEMA;
35+
final String KEY = "mykey";
36+
37+
return new SinkRecord(TOPIC, PARTITION,
38+
KEY_SCHEMA, KEY,
39+
valueSchema, value,
40+
OFFSET);
41+
}
42+
43+
44+
@Test
45+
public void buildTextMessageWithoutSchema() throws Exception {
46+
createAndVerifyStringMessage(null, "Hello World");
47+
}
48+
@Test
49+
public void buildTextMessageWithSchema() throws Exception {
50+
createAndVerifyStringMessage(Schema.STRING_SCHEMA, "Hello World with a schema");
51+
}
52+
53+
@Test
54+
public void buildJsonMessageWithoutSchema() throws Exception {
55+
Object testObject = generateComplexObject();
56+
57+
Message message = builder.fromSinkRecord(getJmsContext(), generateSinkRecord(null, testObject));
58+
String contents = message.getBody(String.class);
59+
60+
JSONObject jsonContents = new JSONObject(contents);
61+
assertEquals(3, jsonContents.length());
62+
assertEquals("this is a string", jsonContents.getString("mystring"));
63+
assertEquals(true, jsonContents.getJSONObject("myobj").getBoolean("mybool"));
64+
assertEquals(12345, jsonContents.getJSONObject("myobj").getInt("myint"));
65+
assertEquals(12.4, jsonContents.getJSONObject("myobj").getDouble("myfloat"), 0.0001);
66+
assertEquals(4, jsonContents.getJSONArray("myarray").length());
67+
assertEquals("first", jsonContents.getJSONArray("myarray").getString(0));
68+
}
69+
70+
71+
private Object generateComplexObject() {
72+
Map<String, Object> obj = new HashMap<>();
73+
74+
obj.put("mystring", "this is a string");
75+
76+
Map<String, Object> innerobj = new HashMap<>();
77+
innerobj.put("mybool", true);
78+
innerobj.put("myint", 12345);
79+
innerobj.put("myfloat", 12.4f);
80+
innerobj.put("mybytes", "Hello".getBytes());
81+
obj.put("myobj", innerobj);
82+
83+
List<String> innerary = new ArrayList<>();
84+
innerary.add("first");
85+
innerary.add("second");
86+
innerary.add("third");
87+
innerary.add("fourth");
88+
obj.put("myarray", innerary);
89+
90+
return obj;
91+
}
92+
93+
94+
private void createAndVerifyStringMessage(Schema valueSchema, String value) throws Exception {
95+
Message message = builder.fromSinkRecord(getJmsContext(), generateSinkRecord(valueSchema, value));
96+
assertEquals("\"" + value + "\"", message.getBody(String.class));
97+
98+
TextMessage textmessage = (TextMessage) message;
99+
assertEquals("\"" + value + "\"", textmessage.getText());
100+
}
101+
}
Lines changed: 109 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,109 @@
1+
package com.ibm.eventstreams.connect.mqsink.builders;
2+
3+
import static org.junit.Assert.assertEquals;
4+
import static org.junit.Assert.assertThrows;
5+
6+
import java.nio.ByteBuffer;
7+
import java.util.HashMap;
8+
import java.util.Map;
9+
10+
import javax.jms.Message;
11+
12+
import org.apache.kafka.connect.data.Schema;
13+
import org.apache.kafka.connect.errors.ConnectException;
14+
import org.apache.kafka.connect.sink.SinkRecord;
15+
import org.junit.Test;
16+
17+
import com.ibm.eventstreams.connect.mqsink.AbstractJMSContextIT;
18+
19+
public class KeyHeaderIT extends AbstractJMSContextIT {
20+
21+
private SinkRecord generateSinkRecord(Schema keySchema, Object keyValue) {
22+
final String TOPIC = "TOPIC.NAME";
23+
final int PARTITION = 0;
24+
final long OFFSET = 0;
25+
final Schema VALUE_SCHEMA = Schema.STRING_SCHEMA;
26+
final String VALUE = "message payload";
27+
28+
return new SinkRecord(TOPIC, PARTITION,
29+
keySchema, keyValue,
30+
VALUE_SCHEMA, VALUE,
31+
OFFSET);
32+
}
33+
34+
35+
@Test
36+
public void verifyUnsupportedKeyHeader() throws Exception {
37+
Map<String, String> props = new HashMap<>();
38+
props.put("mq.message.builder.key.header", "unsupported");
39+
40+
DefaultMessageBuilder builder = new DefaultMessageBuilder();
41+
ConnectException exc = assertThrows(ConnectException.class, () -> {
42+
builder.configure(props);
43+
});
44+
assertEquals("Unsupported MQ message builder key header value", exc.getMessage());
45+
}
46+
47+
@Test
48+
public void buildStringKeyHeaderWithoutSchema() throws Exception {
49+
createAndVerifyStringKeyHeader(null, "my-message-key");
50+
}
51+
52+
@Test
53+
public void buildStringKeyHeaderWithSchema() throws Exception {
54+
createAndVerifyStringKeyHeader(Schema.STRING_SCHEMA, "my-message-key");
55+
}
56+
57+
@Test
58+
public void buildByteArrayKeyHeaderWithoutSchema() throws Exception {
59+
createAndVerifyBytesKeyHeader(null, "my-key-bytes".getBytes(), "my-key-bytes");
60+
}
61+
62+
@Test
63+
public void buildByteArrayKeyHeaderWithSchema() throws Exception {
64+
createAndVerifyBytesKeyHeader(Schema.BYTES_SCHEMA, "message-key-bytes".getBytes(), "message-key-bytes");
65+
}
66+
67+
@Test
68+
public void buildByteBufferKeyHeaderWithoutSchema() throws Exception {
69+
String key = "this-is-my-key";
70+
byte[] payload = key.getBytes();
71+
ByteBuffer value = ByteBuffer.allocate(payload.length);
72+
value.put(payload);
73+
74+
createAndVerifyBytesKeyHeader(null, value, key);
75+
}
76+
77+
@Test
78+
public void buildByteBufferKeyHeaderWithSchema() throws Exception {
79+
String key = "this-is-a-key";
80+
byte[] payload = key.getBytes();
81+
ByteBuffer value = ByteBuffer.allocate(payload.length);
82+
value.put(payload);
83+
84+
createAndVerifyBytesKeyHeader(Schema.BYTES_SCHEMA, value, key);
85+
}
86+
87+
88+
private void createAndVerifyStringKeyHeader(Schema schema, String key) throws Exception {
89+
Map<String, String> props = new HashMap<>();
90+
props.put("mq.message.builder.key.header", "JMSCorrelationID");
91+
92+
DefaultMessageBuilder builder = new DefaultMessageBuilder();
93+
builder.configure(props);
94+
95+
Message message = builder.fromSinkRecord(getJmsContext(), generateSinkRecord(schema, key));
96+
assertEquals(key, message.getJMSCorrelationID());
97+
}
98+
99+
private void createAndVerifyBytesKeyHeader(Schema schema, Object key, String keyAsString) throws Exception {
100+
Map<String, String> props = new HashMap<>();
101+
props.put("mq.message.builder.key.header", "JMSCorrelationID");
102+
103+
DefaultMessageBuilder builder = new DefaultMessageBuilder();
104+
builder.configure(props);
105+
106+
Message message = builder.fromSinkRecord(getJmsContext(), generateSinkRecord(schema, key));
107+
assertEquals(keyAsString, new String(message.getJMSCorrelationIDAsBytes()));
108+
}
109+
}

0 commit comments

Comments
 (0)