Skip to content

Commit 6a2066c

Browse files
committed
fix: review comments from #144 as well as add more tests
Signed-off-by: Joel Hanson <joelhanson025@gmail.com>
1 parent f87fa1c commit 6a2066c

File tree

3 files changed

+56
-11
lines changed

3 files changed

+56
-11
lines changed

src/integration/java/com/ibm/eventstreams/connect/mqsource/MQSourceTaskIT.java

Lines changed: 49 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1299,4 +1299,53 @@ public void verifyLoggingErrorsWithMessageHavingDefaultRecordBuilder() throws Ex
12991299
assertThat(processedRecords.get(0).topic()).isEqualTo("__dlq.mq.source");
13001300
assertThat(processedRecords.get(1).topic()).isEqualTo("mytopic");
13011301
}
1302+
1303+
@Test
1304+
public void verifyJmsMessageWithNullHeaders() throws Exception {
1305+
connectTask = getSourceTaskWithEmptyKafkaOffset();
1306+
1307+
final Map<String, String> connectorConfigProps = createDefaultConnectorProperties();
1308+
connectorConfigProps.put("mq.message.body.jms", "true");
1309+
connectorConfigProps.put("mq.record.builder", "com.ibm.eventstreams.connect.mqsource.builders.DefaultRecordBuilder");
1310+
connectorConfigProps.put("mq.jms.properties.copy.to.kafka.headers", "true");
1311+
1312+
connectTask.start(connectorConfigProps);
1313+
1314+
final TextMessage message = getJmsContext().createTextMessage("hello");
1315+
message.setStringProperty("teststring", "myvalue");
1316+
message.setObjectProperty("testObject", null);
1317+
1318+
putAllMessagesToQueue(DEFAULT_SOURCE_QUEUE, Arrays.asList(message));
1319+
1320+
final List<SourceRecord> kafkaMessages = connectTask.poll();
1321+
assertEquals(1, kafkaMessages.size());
1322+
1323+
final SourceRecord kafkaMessage = kafkaMessages.get(0);
1324+
1325+
assertThat(kafkaMessage.value()).isEqualTo("hello");
1326+
assertThat(kafkaMessage.headers().lastWithName("teststring").value()).isEqualTo("myvalue");
1327+
assertThat(kafkaMessage.headers().lastWithName("testObject").value()).isNull();
1328+
}
1329+
1330+
@Test
1331+
public void verifyJmsMessageNoHeaderCopied_WhenCopyDisabledHavingNullHeader() throws Exception {
1332+
connectTask = getSourceTaskWithEmptyKafkaOffset();
1333+
1334+
final Map<String, String> connectorConfigProps = createDefaultConnectorProperties();
1335+
connectorConfigProps.put("mq.message.body.jms", "true");
1336+
connectorConfigProps.put("mq.record.builder", "com.ibm.eventstreams.connect.mqsource.builders.DefaultRecordBuilder");
1337+
connectorConfigProps.put("mq.jms.properties.copy.to.kafka.headers", "false");
1338+
1339+
connectTask.start(connectorConfigProps);
1340+
1341+
final TextMessage message = getJmsContext().createTextMessage("hello");
1342+
message.setStringProperty("teststring", "myvalue");
1343+
message.setObjectProperty("testObject", null);
1344+
putAllMessagesToQueue(DEFAULT_SOURCE_QUEUE, Arrays.asList(message));
1345+
1346+
final SourceRecord kafkaMessage = connectTask.poll().get(0);
1347+
1348+
assertThat(kafkaMessage.value()).isEqualTo("hello");
1349+
assertThat(kafkaMessage.headers()).isEmpty();
1350+
}
13021351
}

src/main/java/com/ibm/eventstreams/connect/mqsource/processor/JmsToKafkaHeaderConverter.java

Lines changed: 5 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@
2424
import java.util.Collections;
2525
import java.util.Enumeration;
2626
import java.util.List;
27+
import java.util.Objects;
2728

2829
/**
2930
* Single responsibility class to copy JMS properties to Kafka headers.
@@ -48,13 +49,10 @@ public ConnectHeaders convertJmsPropertiesToKafkaHeaders(final Message message)
4849

4950
jmsPropertyKeys.forEach(key -> {
5051
try {
51-
String value = "";
52-
if (message.getObjectProperty(key.toString()) != null)
53-
{
54-
value = message.getObjectProperty(key.toString()).toString();
55-
}
56-
connectHeaders.addString(key.toString(), value);
57-
//connectHeaders.addString(key.toString(), message.getObjectProperty(key.toString()).toString());
52+
final Object prop = message.getObjectProperty(key.toString());
53+
// this will yield `null` if prop is null, otherwise its toString()
54+
final String headerValue = Objects.toString(prop, null);
55+
connectHeaders.addString(key.toString(), headerValue);
5856
} catch (final JMSException e) {
5957
// Not failing the message processing if JMS properties cannot be read for some
6058
// reason.

src/test/java/com/ibm/eventstreams/connect/mqsource/JmsToKafkaHeaderConverterTest.java

Lines changed: 2 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -44,7 +44,7 @@ public class JmsToKafkaHeaderConverterTest {
4444
@Test
4545
public void convertJmsPropertiesToKafkaHeaders() throws JMSException {
4646

47-
final List<String> keys = Arrays.asList("facilityCountryCode", "facilityNum","nullProperty");
47+
final List<String> keys = Arrays.asList("facilityCountryCode", "facilityNum", "nullProperty");
4848

4949
final Enumeration<String> keyEnumeration = Collections.enumeration(keys);
5050

@@ -60,8 +60,6 @@ public void convertJmsPropertiesToKafkaHeaders() throws JMSException {
6060

6161

6262
//Verify
63-
assertEquals("Both custom JMS properties were copied to kafka successfully.", 3, actualConnectHeaders.size());
64-
65-
63+
assertEquals("All three custom JMS properties were copied to kafka successfully.", 3, actualConnectHeaders.size());
6664
}
6765
}

0 commit comments

Comments
 (0)