Skip to content

Commit 2d832c2

Browse files
Fix: Enhance JMS Header Handling for Null Values with proper Kafka Header Mapping (#154)
* This fix is to handle , When JMS header key comes as empty from MQ properties, the kafka connect pipeline is failing with NullpointerException while sinking the data to kafka . This scenario is handled in try block in the JmsToKafkaHeaderConverter class * fix: review comments from #144 as well as add more tests Signed-off-by: Joel Hanson <joelhanson025@gmail.com> Co-authored-by: balajisekarraman <balajisekarraman@gmail.com>
1 parent b423935 commit 2d832c2

File tree

3 files changed

+57
-5
lines changed

3 files changed

+57
-5
lines changed

src/integration/java/com/ibm/eventstreams/connect/mqsource/MQSourceTaskIT.java

Lines changed: 49 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1299,4 +1299,53 @@ public void verifyLoggingErrorsWithMessageHavingDefaultRecordBuilder() throws Ex
12991299
assertThat(processedRecords.get(0).topic()).isEqualTo("__dlq.mq.source");
13001300
assertThat(processedRecords.get(1).topic()).isEqualTo("mytopic");
13011301
}
1302+
1303+
@Test
1304+
public void verifyJmsMessageWithNullHeaders() throws Exception {
1305+
connectTask = getSourceTaskWithEmptyKafkaOffset();
1306+
1307+
final Map<String, String> connectorConfigProps = createDefaultConnectorProperties();
1308+
connectorConfigProps.put("mq.message.body.jms", "true");
1309+
connectorConfigProps.put("mq.record.builder", "com.ibm.eventstreams.connect.mqsource.builders.DefaultRecordBuilder");
1310+
connectorConfigProps.put("mq.jms.properties.copy.to.kafka.headers", "true");
1311+
1312+
connectTask.start(connectorConfigProps);
1313+
1314+
final TextMessage message = getJmsContext().createTextMessage("hello");
1315+
message.setStringProperty("teststring", "myvalue");
1316+
message.setObjectProperty("testObject", null);
1317+
1318+
putAllMessagesToQueue(DEFAULT_SOURCE_QUEUE, Arrays.asList(message));
1319+
1320+
final List<SourceRecord> kafkaMessages = connectTask.poll();
1321+
assertEquals(1, kafkaMessages.size());
1322+
1323+
final SourceRecord kafkaMessage = kafkaMessages.get(0);
1324+
1325+
assertThat(kafkaMessage.value()).isEqualTo("hello");
1326+
assertThat(kafkaMessage.headers().lastWithName("teststring").value()).isEqualTo("myvalue");
1327+
assertThat(kafkaMessage.headers().lastWithName("testObject").value()).isNull();
1328+
}
1329+
1330+
@Test
1331+
public void verifyJmsMessageNoHeaderCopied_WhenCopyDisabledHavingNullHeader() throws Exception {
1332+
connectTask = getSourceTaskWithEmptyKafkaOffset();
1333+
1334+
final Map<String, String> connectorConfigProps = createDefaultConnectorProperties();
1335+
connectorConfigProps.put("mq.message.body.jms", "true");
1336+
connectorConfigProps.put("mq.record.builder", "com.ibm.eventstreams.connect.mqsource.builders.DefaultRecordBuilder");
1337+
connectorConfigProps.put("mq.jms.properties.copy.to.kafka.headers", "false");
1338+
1339+
connectTask.start(connectorConfigProps);
1340+
1341+
final TextMessage message = getJmsContext().createTextMessage("hello");
1342+
message.setStringProperty("teststring", "myvalue");
1343+
message.setObjectProperty("testObject", null);
1344+
putAllMessagesToQueue(DEFAULT_SOURCE_QUEUE, Arrays.asList(message));
1345+
1346+
final SourceRecord kafkaMessage = connectTask.poll().get(0);
1347+
1348+
assertThat(kafkaMessage.value()).isEqualTo("hello");
1349+
assertThat(kafkaMessage.headers()).isEmpty();
1350+
}
13021351
}

src/main/java/com/ibm/eventstreams/connect/mqsource/processor/JmsToKafkaHeaderConverter.java

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@
2424
import java.util.Collections;
2525
import java.util.Enumeration;
2626
import java.util.List;
27+
import java.util.Objects;
2728

2829
/**
2930
* Single responsibility class to copy JMS properties to Kafka headers.
@@ -48,7 +49,10 @@ public ConnectHeaders convertJmsPropertiesToKafkaHeaders(final Message message)
4849

4950
jmsPropertyKeys.forEach(key -> {
5051
try {
51-
connectHeaders.addString(key.toString(), message.getObjectProperty(key.toString()).toString());
52+
final Object prop = message.getObjectProperty(key.toString());
53+
// this will yield `null` if prop is null, otherwise its toString()
54+
final String headerValue = Objects.toString(prop, null);
55+
connectHeaders.addString(key.toString(), headerValue);
5256
} catch (final JMSException e) {
5357
// Not failing the message processing if JMS properties cannot be read for some
5458
// reason.

src/test/java/com/ibm/eventstreams/connect/mqsource/JmsToKafkaHeaderConverterTest.java

Lines changed: 3 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -44,23 +44,22 @@ public class JmsToKafkaHeaderConverterTest {
4444
@Test
4545
public void convertJmsPropertiesToKafkaHeaders() throws JMSException {
4646

47-
final List<String> keys = Arrays.asList("facilityCountryCode", "facilityNum");
47+
final List<String> keys = Arrays.asList("facilityCountryCode", "facilityNum", "nullProperty");
4848

4949
final Enumeration<String> keyEnumeration = Collections.enumeration(keys);
5050

5151
// Arrange
5252
when(message.getPropertyNames()).thenReturn(keyEnumeration);
5353
when(message.getObjectProperty("facilityCountryCode")).thenReturn("US");
5454
when(message.getObjectProperty("facilityNum")).thenReturn("12345");
55+
when(message.getObjectProperty("nullProperty")).thenReturn(null);
5556

5657
// Act
5758
final ConnectHeaders actualConnectHeaders = jmsToKafkaHeaderConverter
5859
.convertJmsPropertiesToKafkaHeaders(message);
5960

6061

6162
//Verify
62-
assertEquals("Both custom JMS properties were copied to kafka successfully.", 2, actualConnectHeaders.size());
63-
64-
63+
assertEquals("All three custom JMS properties were copied to kafka successfully.", 3, actualConnectHeaders.size());
6564
}
6665
}

0 commit comments

Comments
 (0)