From f87fa1c860573c777586cbcd11dc1d6ef482b9ed Mon Sep 17 00:00:00 2001 From: balajisekarraman Date: Wed, 12 Feb 2025 17:33:14 +0530 Subject: [PATCH 1/2] This fix is to handle , When JMS header key comes as empty from MQ properties, the kafka connect pipeline is failing with NullpointerException while sinking the data to kafka . This scenario is handled in try block in the JmsToKafkaHeaderConverter class --- .../mqsource/processor/JmsToKafkaHeaderConverter.java | 8 +++++++- .../connect/mqsource/JmsToKafkaHeaderConverterTest.java | 5 +++-- 2 files changed, 10 insertions(+), 3 deletions(-) diff --git a/src/main/java/com/ibm/eventstreams/connect/mqsource/processor/JmsToKafkaHeaderConverter.java b/src/main/java/com/ibm/eventstreams/connect/mqsource/processor/JmsToKafkaHeaderConverter.java index 977fe4e..54741ae 100644 --- a/src/main/java/com/ibm/eventstreams/connect/mqsource/processor/JmsToKafkaHeaderConverter.java +++ b/src/main/java/com/ibm/eventstreams/connect/mqsource/processor/JmsToKafkaHeaderConverter.java @@ -48,7 +48,13 @@ public ConnectHeaders convertJmsPropertiesToKafkaHeaders(final Message message) jmsPropertyKeys.forEach(key -> { try { - connectHeaders.addString(key.toString(), message.getObjectProperty(key.toString()).toString()); + String value = ""; + if (message.getObjectProperty(key.toString()) != null) + { + value = message.getObjectProperty(key.toString()).toString(); + } + connectHeaders.addString(key.toString(), value); + //connectHeaders.addString(key.toString(), message.getObjectProperty(key.toString()).toString()); } catch (final JMSException e) { // Not failing the message processing if JMS properties cannot be read for some // reason. diff --git a/src/test/java/com/ibm/eventstreams/connect/mqsource/JmsToKafkaHeaderConverterTest.java b/src/test/java/com/ibm/eventstreams/connect/mqsource/JmsToKafkaHeaderConverterTest.java index 057b29e..4cce656 100644 --- a/src/test/java/com/ibm/eventstreams/connect/mqsource/JmsToKafkaHeaderConverterTest.java +++ b/src/test/java/com/ibm/eventstreams/connect/mqsource/JmsToKafkaHeaderConverterTest.java @@ -44,7 +44,7 @@ public class JmsToKafkaHeaderConverterTest { @Test public void convertJmsPropertiesToKafkaHeaders() throws JMSException { - final List keys = Arrays.asList("facilityCountryCode", "facilityNum"); + final List keys = Arrays.asList("facilityCountryCode", "facilityNum","nullProperty"); final Enumeration keyEnumeration = Collections.enumeration(keys); @@ -52,6 +52,7 @@ public void convertJmsPropertiesToKafkaHeaders() throws JMSException { when(message.getPropertyNames()).thenReturn(keyEnumeration); when(message.getObjectProperty("facilityCountryCode")).thenReturn("US"); when(message.getObjectProperty("facilityNum")).thenReturn("12345"); + when(message.getObjectProperty("nullProperty")).thenReturn(null); // Act final ConnectHeaders actualConnectHeaders = jmsToKafkaHeaderConverter @@ -59,7 +60,7 @@ public void convertJmsPropertiesToKafkaHeaders() throws JMSException { //Verify - assertEquals("Both custom JMS properties were copied to kafka successfully.", 2, actualConnectHeaders.size()); + assertEquals("Both custom JMS properties were copied to kafka successfully.", 3, actualConnectHeaders.size()); } From 6a2066cb70dd097ee081e183fbeee6a7420d4dd2 Mon Sep 17 00:00:00 2001 From: Joel Hanson Date: Tue, 24 Jun 2025 12:54:46 +0530 Subject: [PATCH 2/2] fix: review comments from #144 as well as add more tests Signed-off-by: Joel Hanson --- .../connect/mqsource/MQSourceTaskIT.java | 49 +++++++++++++++++++ .../processor/JmsToKafkaHeaderConverter.java | 12 ++--- .../JmsToKafkaHeaderConverterTest.java | 6 +-- 3 files changed, 56 insertions(+), 11 deletions(-) diff --git a/src/integration/java/com/ibm/eventstreams/connect/mqsource/MQSourceTaskIT.java b/src/integration/java/com/ibm/eventstreams/connect/mqsource/MQSourceTaskIT.java index 4f4fe32..63efbf0 100644 --- a/src/integration/java/com/ibm/eventstreams/connect/mqsource/MQSourceTaskIT.java +++ b/src/integration/java/com/ibm/eventstreams/connect/mqsource/MQSourceTaskIT.java @@ -1299,4 +1299,53 @@ public void verifyLoggingErrorsWithMessageHavingDefaultRecordBuilder() throws Ex assertThat(processedRecords.get(0).topic()).isEqualTo("__dlq.mq.source"); assertThat(processedRecords.get(1).topic()).isEqualTo("mytopic"); } + + @Test + public void verifyJmsMessageWithNullHeaders() throws Exception { + connectTask = getSourceTaskWithEmptyKafkaOffset(); + + final Map connectorConfigProps = createDefaultConnectorProperties(); + connectorConfigProps.put("mq.message.body.jms", "true"); + connectorConfigProps.put("mq.record.builder", "com.ibm.eventstreams.connect.mqsource.builders.DefaultRecordBuilder"); + connectorConfigProps.put("mq.jms.properties.copy.to.kafka.headers", "true"); + + connectTask.start(connectorConfigProps); + + final TextMessage message = getJmsContext().createTextMessage("hello"); + message.setStringProperty("teststring", "myvalue"); + message.setObjectProperty("testObject", null); + + putAllMessagesToQueue(DEFAULT_SOURCE_QUEUE, Arrays.asList(message)); + + final List kafkaMessages = connectTask.poll(); + assertEquals(1, kafkaMessages.size()); + + final SourceRecord kafkaMessage = kafkaMessages.get(0); + + assertThat(kafkaMessage.value()).isEqualTo("hello"); + assertThat(kafkaMessage.headers().lastWithName("teststring").value()).isEqualTo("myvalue"); + assertThat(kafkaMessage.headers().lastWithName("testObject").value()).isNull(); + } + + @Test + public void verifyJmsMessageNoHeaderCopied_WhenCopyDisabledHavingNullHeader() throws Exception { + connectTask = getSourceTaskWithEmptyKafkaOffset(); + + final Map connectorConfigProps = createDefaultConnectorProperties(); + connectorConfigProps.put("mq.message.body.jms", "true"); + connectorConfigProps.put("mq.record.builder", "com.ibm.eventstreams.connect.mqsource.builders.DefaultRecordBuilder"); + connectorConfigProps.put("mq.jms.properties.copy.to.kafka.headers", "false"); + + connectTask.start(connectorConfigProps); + + final TextMessage message = getJmsContext().createTextMessage("hello"); + message.setStringProperty("teststring", "myvalue"); + message.setObjectProperty("testObject", null); + putAllMessagesToQueue(DEFAULT_SOURCE_QUEUE, Arrays.asList(message)); + + final SourceRecord kafkaMessage = connectTask.poll().get(0); + + assertThat(kafkaMessage.value()).isEqualTo("hello"); + assertThat(kafkaMessage.headers()).isEmpty(); + } } diff --git a/src/main/java/com/ibm/eventstreams/connect/mqsource/processor/JmsToKafkaHeaderConverter.java b/src/main/java/com/ibm/eventstreams/connect/mqsource/processor/JmsToKafkaHeaderConverter.java index 54741ae..81d62a5 100644 --- a/src/main/java/com/ibm/eventstreams/connect/mqsource/processor/JmsToKafkaHeaderConverter.java +++ b/src/main/java/com/ibm/eventstreams/connect/mqsource/processor/JmsToKafkaHeaderConverter.java @@ -24,6 +24,7 @@ import java.util.Collections; import java.util.Enumeration; import java.util.List; +import java.util.Objects; /** * Single responsibility class to copy JMS properties to Kafka headers. @@ -48,13 +49,10 @@ public ConnectHeaders convertJmsPropertiesToKafkaHeaders(final Message message) jmsPropertyKeys.forEach(key -> { try { - String value = ""; - if (message.getObjectProperty(key.toString()) != null) - { - value = message.getObjectProperty(key.toString()).toString(); - } - connectHeaders.addString(key.toString(), value); - //connectHeaders.addString(key.toString(), message.getObjectProperty(key.toString()).toString()); + final Object prop = message.getObjectProperty(key.toString()); + // this will yield `null` if prop is null, otherwise its toString() + final String headerValue = Objects.toString(prop, null); + connectHeaders.addString(key.toString(), headerValue); } catch (final JMSException e) { // Not failing the message processing if JMS properties cannot be read for some // reason. diff --git a/src/test/java/com/ibm/eventstreams/connect/mqsource/JmsToKafkaHeaderConverterTest.java b/src/test/java/com/ibm/eventstreams/connect/mqsource/JmsToKafkaHeaderConverterTest.java index 4cce656..82eebe6 100644 --- a/src/test/java/com/ibm/eventstreams/connect/mqsource/JmsToKafkaHeaderConverterTest.java +++ b/src/test/java/com/ibm/eventstreams/connect/mqsource/JmsToKafkaHeaderConverterTest.java @@ -44,7 +44,7 @@ public class JmsToKafkaHeaderConverterTest { @Test public void convertJmsPropertiesToKafkaHeaders() throws JMSException { - final List keys = Arrays.asList("facilityCountryCode", "facilityNum","nullProperty"); + final List keys = Arrays.asList("facilityCountryCode", "facilityNum", "nullProperty"); final Enumeration keyEnumeration = Collections.enumeration(keys); @@ -60,8 +60,6 @@ public void convertJmsPropertiesToKafkaHeaders() throws JMSException { //Verify - assertEquals("Both custom JMS properties were copied to kafka successfully.", 3, actualConnectHeaders.size()); - - + assertEquals("All three custom JMS properties were copied to kafka successfully.", 3, actualConnectHeaders.size()); } } \ No newline at end of file