Skip to content

Commit cbcadcc

Browse files
This fix is to handle , When JMS header key comes as empty from MQ properties, the kafka connect pipeline is failing with NullpointerException while sinking the data to kafka . This scenario is handled in try block in the JmsToKafkaHeaderConverter class
1 parent 16b8500 commit cbcadcc

File tree

2 files changed

+10
-3
lines changed

2 files changed

+10
-3
lines changed

src/main/java/com/ibm/eventstreams/connect/mqsource/processor/JmsToKafkaHeaderConverter.java

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -48,7 +48,13 @@ public ConnectHeaders convertJmsPropertiesToKafkaHeaders(final Message message)
4848

4949
jmsPropertyKeys.forEach(key -> {
5050
try {
51-
connectHeaders.addString(key.toString(), message.getObjectProperty(key.toString()).toString());
51+
String value = "";
52+
if (message.getObjectProperty(key.toString()) != null)
53+
{
54+
value = message.getObjectProperty(key.toString()).toString();
55+
}
56+
connectHeaders.addString(key.toString(), value);
57+
//connectHeaders.addString(key.toString(), message.getObjectProperty(key.toString()).toString());
5258
} catch (final JMSException e) {
5359
// Not failing the message processing if JMS properties cannot be read for some
5460
// reason.

src/test/java/com/ibm/eventstreams/connect/mqsource/JmsToKafkaHeaderConverterTest.java

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -44,22 +44,23 @@ public class JmsToKafkaHeaderConverterTest {
4444
@Test
4545
public void convertJmsPropertiesToKafkaHeaders() throws JMSException {
4646

47-
final List<String> keys = Arrays.asList("facilityCountryCode", "facilityNum");
47+
final List<String> keys = Arrays.asList("facilityCountryCode", "facilityNum","nullProperty");
4848

4949
final Enumeration<String> keyEnumeration = Collections.enumeration(keys);
5050

5151
// Arrange
5252
when(message.getPropertyNames()).thenReturn(keyEnumeration);
5353
when(message.getObjectProperty("facilityCountryCode")).thenReturn("US");
5454
when(message.getObjectProperty("facilityNum")).thenReturn("12345");
55+
when(message.getObjectProperty("nullProperty")).thenReturn(null);
5556

5657
// Act
5758
final ConnectHeaders actualConnectHeaders = jmsToKafkaHeaderConverter
5859
.convertJmsPropertiesToKafkaHeaders(message);
5960

6061

6162
//Verify
62-
assertEquals("Both custom JMS properties were copied to kafka successfully.", 2, actualConnectHeaders.size());
63+
assertEquals("Both custom JMS properties were copied to kafka successfully.", 3, actualConnectHeaders.size());
6364

6465

6566
}

0 commit comments

Comments
 (0)