Skip to content

Commit 99fc0ff

Browse files
authored
fix: handle empty MQ messages (#135)
With this commit, the Connector will create tombstone (null value) Kafka records to represent empty MQ messages. Contributes to: #134 Signed-off-by: Dale Lane <dale.lane@uk.ibm.com>
1 parent 6d2b939 commit 99fc0ff

File tree

5 files changed

+55
-4
lines changed

5 files changed

+55
-4
lines changed

.github/ISSUE_TEMPLATE/BUG-REPORT.yml

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -57,8 +57,8 @@ body:
5757
label: Version
5858
description: What version of our software are you running?
5959
options:
60-
- 1.3.4 (Default)
61-
- older (<1.3.4)
60+
- 1.3.5 (Default)
61+
- older (<1.3.5)
6262
validations:
6363
required: true
6464
- type: textarea

pom.xml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,7 @@
2121
<groupId>com.ibm.eventstreams.connect</groupId>
2222
<artifactId>kafka-connect-mq-source</artifactId>
2323
<packaging>jar</packaging>
24-
<version>1.3.4</version>
24+
<version>1.3.5</version>
2525
<name>kafka-connect-mq-source</name>
2626
<organization>
2727
<name>IBM Corporation</name>

src/integration/java/com/ibm/eventstreams/connect/mqsource/MQSourceTaskIT.java

Lines changed: 48 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -426,4 +426,52 @@ public void verifyDestinationAsKey() throws Exception {
426426

427427
connectTask.commitRecord(kafkaMessage);
428428
}
429+
430+
431+
@Test
432+
public void verifyEmptyMessage() throws Exception {
433+
connectTask = new MQSourceTask();
434+
435+
final Map<String, String> connectorConfigProps = createDefaultConnectorProperties();
436+
connectorConfigProps.put("mq.message.body.jms", "true");
437+
connectorConfigProps.put("mq.record.builder",
438+
"com.ibm.eventstreams.connect.mqsource.builders.DefaultRecordBuilder");
439+
440+
connectTask.start(connectorConfigProps);
441+
442+
Message emptyMessage = getJmsContext().createMessage();
443+
putAllMessagesToQueue(MQ_QUEUE, Arrays.asList(emptyMessage));
444+
445+
final List<SourceRecord> kafkaMessages = connectTask.poll();
446+
assertEquals(1, kafkaMessages.size());
447+
448+
final SourceRecord kafkaMessage = kafkaMessages.get(0);
449+
assertNull(kafkaMessage.value());
450+
451+
connectTask.commitRecord(kafkaMessage);
452+
}
453+
454+
455+
@Test
456+
public void verifyEmptyTextMessage() throws Exception {
457+
connectTask = new MQSourceTask();
458+
459+
final Map<String, String> connectorConfigProps = createDefaultConnectorProperties();
460+
connectorConfigProps.put("mq.message.body.jms", "true");
461+
connectorConfigProps.put("mq.record.builder",
462+
"com.ibm.eventstreams.connect.mqsource.builders.DefaultRecordBuilder");
463+
464+
connectTask.start(connectorConfigProps);
465+
466+
TextMessage emptyMessage = getJmsContext().createTextMessage();
467+
putAllMessagesToQueue(MQ_QUEUE, Arrays.asList(emptyMessage));
468+
469+
final List<SourceRecord> kafkaMessages = connectTask.poll();
470+
assertEquals(1, kafkaMessages.size());
471+
472+
final SourceRecord kafkaMessage = kafkaMessages.get(0);
473+
assertNull(kafkaMessage.value());
474+
475+
connectTask.commitRecord(kafkaMessage);
476+
}
429477
}

src/main/java/com/ibm/eventstreams/connect/mqsource/MQSourceConnector.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -141,7 +141,7 @@ public class MQSourceConnector extends SourceConnector {
141141
public static final String CONFIG_DOCUMENTATION_TOPIC = "The name of the target Kafka topic.";
142142
public static final String CONFIG_DISPLAY_TOPIC = "Target Kafka topic";
143143

144-
public static String version = "1.3.4";
144+
public static String version = "1.3.5";
145145

146146
private Map<String, String> configProps;
147147

src/main/java/com/ibm/eventstreams/connect/mqsource/builders/DefaultRecordBuilder.java

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -70,6 +70,9 @@ public DefaultRecordBuilder() {
7070
} else if (message instanceof TextMessage) {
7171
log.debug("Text message with no schema");
7272
value = message.getBody(String.class);
73+
} else if (message.getBody(Object.class) == null) {
74+
log.debug("Empty message");
75+
value = null;
7376
} else {
7477
log.error("Unsupported JMS message type {}", message.getClass());
7578
throw new ConnectException("Unsupported JMS message type");

0 commit comments

Comments
 (0)