From cbcadccd831f05bcac07fea822c1f69dab4d8389 Mon Sep 17 00:00:00 2001 From: balajisekarraman Date: Wed, 12 Feb 2025 17:33:14 +0530 Subject: [PATCH] This fix is to handle , When JMS header key comes as empty from MQ properties, the kafka connect pipeline is failing with NullpointerException while sinking the data to kafka . This scenario is handled in try block in the JmsToKafkaHeaderConverter class --- .../mqsource/processor/JmsToKafkaHeaderConverter.java | 8 +++++++- .../connect/mqsource/JmsToKafkaHeaderConverterTest.java | 5 +++-- 2 files changed, 10 insertions(+), 3 deletions(-) diff --git a/src/main/java/com/ibm/eventstreams/connect/mqsource/processor/JmsToKafkaHeaderConverter.java b/src/main/java/com/ibm/eventstreams/connect/mqsource/processor/JmsToKafkaHeaderConverter.java index 977fe4e..54741ae 100644 --- a/src/main/java/com/ibm/eventstreams/connect/mqsource/processor/JmsToKafkaHeaderConverter.java +++ b/src/main/java/com/ibm/eventstreams/connect/mqsource/processor/JmsToKafkaHeaderConverter.java @@ -48,7 +48,13 @@ public ConnectHeaders convertJmsPropertiesToKafkaHeaders(final Message message) jmsPropertyKeys.forEach(key -> { try { - connectHeaders.addString(key.toString(), message.getObjectProperty(key.toString()).toString()); + String value = ""; + if (message.getObjectProperty(key.toString()) != null) + { + value = message.getObjectProperty(key.toString()).toString(); + } + connectHeaders.addString(key.toString(), value); + //connectHeaders.addString(key.toString(), message.getObjectProperty(key.toString()).toString()); } catch (final JMSException e) { // Not failing the message processing if JMS properties cannot be read for some // reason. diff --git a/src/test/java/com/ibm/eventstreams/connect/mqsource/JmsToKafkaHeaderConverterTest.java b/src/test/java/com/ibm/eventstreams/connect/mqsource/JmsToKafkaHeaderConverterTest.java index 057b29e..4cce656 100644 --- a/src/test/java/com/ibm/eventstreams/connect/mqsource/JmsToKafkaHeaderConverterTest.java +++ b/src/test/java/com/ibm/eventstreams/connect/mqsource/JmsToKafkaHeaderConverterTest.java @@ -44,7 +44,7 @@ public class JmsToKafkaHeaderConverterTest { @Test public void convertJmsPropertiesToKafkaHeaders() throws JMSException { - final List keys = Arrays.asList("facilityCountryCode", "facilityNum"); + final List keys = Arrays.asList("facilityCountryCode", "facilityNum","nullProperty"); final Enumeration keyEnumeration = Collections.enumeration(keys); @@ -52,6 +52,7 @@ public void convertJmsPropertiesToKafkaHeaders() throws JMSException { when(message.getPropertyNames()).thenReturn(keyEnumeration); when(message.getObjectProperty("facilityCountryCode")).thenReturn("US"); when(message.getObjectProperty("facilityNum")).thenReturn("12345"); + when(message.getObjectProperty("nullProperty")).thenReturn(null); // Act final ConnectHeaders actualConnectHeaders = jmsToKafkaHeaderConverter @@ -59,7 +60,7 @@ public void convertJmsPropertiesToKafkaHeaders() throws JMSException { //Verify - assertEquals("Both custom JMS properties were copied to kafka successfully.", 2, actualConnectHeaders.size()); + assertEquals("Both custom JMS properties were copied to kafka successfully.", 3, actualConnectHeaders.size()); }