Skip to content

Commit 103db8d

Browse files
committed
fix: review comments from ibm-messaging#144 as well as add more tests
Signed-off-by: Joel Hanson <joelhanson025@gmail.com>
1 parent cbcadcc commit 103db8d

File tree

3 files changed

+56
-11
lines changed

3 files changed

+56
-11
lines changed

src/integration/java/com/ibm/eventstreams/connect/mqsource/MQSourceTaskIT.java

Lines changed: 49 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -692,4 +692,53 @@ public void testJmsWorkerWithCustomReciveForConsumerAndCustomReconnectValues() t
692692
assertEquals(100L, shared.getReconnectDelayMillisMin());
693693
assertEquals(10000L, shared.getReconnectDelayMillisMax());
694694
}
695+
696+
@Test
697+
public void verifyJmsMessageWithNullHeaders() throws Exception {
698+
connectTask = getSourceTaskWithEmptyKafkaOffset();
699+
700+
final Map<String, String> connectorConfigProps = createDefaultConnectorProperties();
701+
connectorConfigProps.put("mq.message.body.jms", "true");
702+
connectorConfigProps.put("mq.record.builder", "com.ibm.eventstreams.connect.mqsource.builders.DefaultRecordBuilder");
703+
connectorConfigProps.put("mq.jms.properties.copy.to.kafka.headers", "true");
704+
705+
connectTask.start(connectorConfigProps);
706+
707+
final TextMessage message = getJmsContext().createTextMessage("hello");
708+
message.setStringProperty("teststring", "myvalue");
709+
message.setObjectProperty("testObject", null);
710+
711+
putAllMessagesToQueue(DEFAULT_SOURCE_QUEUE, Arrays.asList(message));
712+
713+
final List<SourceRecord> kafkaMessages = connectTask.poll();
714+
assertEquals(1, kafkaMessages.size());
715+
716+
final SourceRecord kafkaMessage = kafkaMessages.get(0);
717+
718+
assertThat(kafkaMessage.value()).isEqualTo("hello");
719+
assertThat(kafkaMessage.headers().lastWithName("teststring").value()).isEqualTo("myvalue");
720+
assertThat(kafkaMessage.headers().lastWithName("testObject").value()).isNull();
721+
}
722+
723+
@Test
724+
public void verifyJmsMessageNoHeaderCopied_WhenCopyDisabledHavingNullHeader() throws Exception {
725+
connectTask = getSourceTaskWithEmptyKafkaOffset();
726+
727+
final Map<String, String> connectorConfigProps = createDefaultConnectorProperties();
728+
connectorConfigProps.put("mq.message.body.jms", "true");
729+
connectorConfigProps.put("mq.record.builder", "com.ibm.eventstreams.connect.mqsource.builders.DefaultRecordBuilder");
730+
connectorConfigProps.put("mq.jms.properties.copy.to.kafka.headers", "false");
731+
732+
connectTask.start(connectorConfigProps);
733+
734+
final TextMessage message = getJmsContext().createTextMessage("hello");
735+
message.setStringProperty("teststring", "myvalue");
736+
message.setObjectProperty("testObject", null);
737+
putAllMessagesToQueue(DEFAULT_SOURCE_QUEUE, Arrays.asList(message));
738+
739+
final SourceRecord kafkaMessage = connectTask.poll().get(0);
740+
741+
assertThat(kafkaMessage.value()).isEqualTo("hello");
742+
assertThat(kafkaMessage.headers()).isEmpty();
743+
}
695744
}

src/main/java/com/ibm/eventstreams/connect/mqsource/processor/JmsToKafkaHeaderConverter.java

Lines changed: 5 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@
2424
import java.util.Collections;
2525
import java.util.Enumeration;
2626
import java.util.List;
27+
import java.util.Objects;
2728

2829
/**
2930
* Single responsibility class to copy JMS properties to Kafka headers.
@@ -48,13 +49,10 @@ public ConnectHeaders convertJmsPropertiesToKafkaHeaders(final Message message)
4849

4950
jmsPropertyKeys.forEach(key -> {
5051
try {
51-
String value = "";
52-
if (message.getObjectProperty(key.toString()) != null)
53-
{
54-
value = message.getObjectProperty(key.toString()).toString();
55-
}
56-
connectHeaders.addString(key.toString(), value);
57-
//connectHeaders.addString(key.toString(), message.getObjectProperty(key.toString()).toString());
52+
final Object prop = message.getObjectProperty(key.toString());
53+
// this will yield `null` if prop is null, otherwise its toString()
54+
final String headerValue = Objects.toString(prop, null);
55+
connectHeaders.addString(key.toString(), headerValue);
5856
} catch (final JMSException e) {
5957
// Not failing the message processing if JMS properties cannot be read for some
6058
// reason.

src/test/java/com/ibm/eventstreams/connect/mqsource/JmsToKafkaHeaderConverterTest.java

Lines changed: 2 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -44,7 +44,7 @@ public class JmsToKafkaHeaderConverterTest {
4444
@Test
4545
public void convertJmsPropertiesToKafkaHeaders() throws JMSException {
4646

47-
final List<String> keys = Arrays.asList("facilityCountryCode", "facilityNum","nullProperty");
47+
final List<String> keys = Arrays.asList("facilityCountryCode", "facilityNum", "nullProperty");
4848

4949
final Enumeration<String> keyEnumeration = Collections.enumeration(keys);
5050

@@ -60,8 +60,6 @@ public void convertJmsPropertiesToKafkaHeaders() throws JMSException {
6060

6161

6262
//Verify
63-
assertEquals("Both custom JMS properties were copied to kafka successfully.", 3, actualConnectHeaders.size());
64-
65-
63+
assertEquals("All three custom JMS properties were copied to kafka successfully.", 3, actualConnectHeaders.size());
6664
}
6765
}

0 commit comments

Comments
 (0)