diff --git a/src/integration/java/com/ibm/eventstreams/connect/mqsource/MQSourceTaskIT.java b/src/integration/java/com/ibm/eventstreams/connect/mqsource/MQSourceTaskIT.java index 4f4fe32..63efbf0 100644 --- a/src/integration/java/com/ibm/eventstreams/connect/mqsource/MQSourceTaskIT.java +++ b/src/integration/java/com/ibm/eventstreams/connect/mqsource/MQSourceTaskIT.java @@ -1299,4 +1299,53 @@ public void verifyLoggingErrorsWithMessageHavingDefaultRecordBuilder() throws Ex assertThat(processedRecords.get(0).topic()).isEqualTo("__dlq.mq.source"); assertThat(processedRecords.get(1).topic()).isEqualTo("mytopic"); } + + @Test + public void verifyJmsMessageWithNullHeaders() throws Exception { + connectTask = getSourceTaskWithEmptyKafkaOffset(); + + final Map connectorConfigProps = createDefaultConnectorProperties(); + connectorConfigProps.put("mq.message.body.jms", "true"); + connectorConfigProps.put("mq.record.builder", "com.ibm.eventstreams.connect.mqsource.builders.DefaultRecordBuilder"); + connectorConfigProps.put("mq.jms.properties.copy.to.kafka.headers", "true"); + + connectTask.start(connectorConfigProps); + + final TextMessage message = getJmsContext().createTextMessage("hello"); + message.setStringProperty("teststring", "myvalue"); + message.setObjectProperty("testObject", null); + + putAllMessagesToQueue(DEFAULT_SOURCE_QUEUE, Arrays.asList(message)); + + final List kafkaMessages = connectTask.poll(); + assertEquals(1, kafkaMessages.size()); + + final SourceRecord kafkaMessage = kafkaMessages.get(0); + + assertThat(kafkaMessage.value()).isEqualTo("hello"); + assertThat(kafkaMessage.headers().lastWithName("teststring").value()).isEqualTo("myvalue"); + assertThat(kafkaMessage.headers().lastWithName("testObject").value()).isNull(); + } + + @Test + public void verifyJmsMessageNoHeaderCopied_WhenCopyDisabledHavingNullHeader() throws Exception { + connectTask = getSourceTaskWithEmptyKafkaOffset(); + + final Map connectorConfigProps = createDefaultConnectorProperties(); + connectorConfigProps.put("mq.message.body.jms", "true"); + connectorConfigProps.put("mq.record.builder", "com.ibm.eventstreams.connect.mqsource.builders.DefaultRecordBuilder"); + connectorConfigProps.put("mq.jms.properties.copy.to.kafka.headers", "false"); + + connectTask.start(connectorConfigProps); + + final TextMessage message = getJmsContext().createTextMessage("hello"); + message.setStringProperty("teststring", "myvalue"); + message.setObjectProperty("testObject", null); + putAllMessagesToQueue(DEFAULT_SOURCE_QUEUE, Arrays.asList(message)); + + final SourceRecord kafkaMessage = connectTask.poll().get(0); + + assertThat(kafkaMessage.value()).isEqualTo("hello"); + assertThat(kafkaMessage.headers()).isEmpty(); + } } diff --git a/src/main/java/com/ibm/eventstreams/connect/mqsource/processor/JmsToKafkaHeaderConverter.java b/src/main/java/com/ibm/eventstreams/connect/mqsource/processor/JmsToKafkaHeaderConverter.java index 977fe4e..81d62a5 100644 --- a/src/main/java/com/ibm/eventstreams/connect/mqsource/processor/JmsToKafkaHeaderConverter.java +++ b/src/main/java/com/ibm/eventstreams/connect/mqsource/processor/JmsToKafkaHeaderConverter.java @@ -24,6 +24,7 @@ import java.util.Collections; import java.util.Enumeration; import java.util.List; +import java.util.Objects; /** * Single responsibility class to copy JMS properties to Kafka headers. @@ -48,7 +49,10 @@ public ConnectHeaders convertJmsPropertiesToKafkaHeaders(final Message message) jmsPropertyKeys.forEach(key -> { try { - connectHeaders.addString(key.toString(), message.getObjectProperty(key.toString()).toString()); + final Object prop = message.getObjectProperty(key.toString()); + // this will yield `null` if prop is null, otherwise its toString() + final String headerValue = Objects.toString(prop, null); + connectHeaders.addString(key.toString(), headerValue); } catch (final JMSException e) { // Not failing the message processing if JMS properties cannot be read for some // reason. diff --git a/src/test/java/com/ibm/eventstreams/connect/mqsource/JmsToKafkaHeaderConverterTest.java b/src/test/java/com/ibm/eventstreams/connect/mqsource/JmsToKafkaHeaderConverterTest.java index 057b29e..82eebe6 100644 --- a/src/test/java/com/ibm/eventstreams/connect/mqsource/JmsToKafkaHeaderConverterTest.java +++ b/src/test/java/com/ibm/eventstreams/connect/mqsource/JmsToKafkaHeaderConverterTest.java @@ -44,7 +44,7 @@ public class JmsToKafkaHeaderConverterTest { @Test public void convertJmsPropertiesToKafkaHeaders() throws JMSException { - final List keys = Arrays.asList("facilityCountryCode", "facilityNum"); + final List keys = Arrays.asList("facilityCountryCode", "facilityNum", "nullProperty"); final Enumeration keyEnumeration = Collections.enumeration(keys); @@ -52,6 +52,7 @@ public void convertJmsPropertiesToKafkaHeaders() throws JMSException { when(message.getPropertyNames()).thenReturn(keyEnumeration); when(message.getObjectProperty("facilityCountryCode")).thenReturn("US"); when(message.getObjectProperty("facilityNum")).thenReturn("12345"); + when(message.getObjectProperty("nullProperty")).thenReturn(null); // Act final ConnectHeaders actualConnectHeaders = jmsToKafkaHeaderConverter @@ -59,8 +60,6 @@ public void convertJmsPropertiesToKafkaHeaders() throws JMSException { //Verify - assertEquals("Both custom JMS properties were copied to kafka successfully.", 2, actualConnectHeaders.size()); - - + assertEquals("All three custom JMS properties were copied to kafka successfully.", 3, actualConnectHeaders.size()); } } \ No newline at end of file