Skip to content

Commit b644944

Browse files
committed
feat: handle poison messages from base builder class
- Handle poison messages from base builder class - Poison message is handle when `errors.tolerance` is set to all - Option to configure deadletter queue for poison messages - New UTs for testing errors tolerance behaviour Signed-off-by: Joel Hanson <joelhanson025@gmail.com>
1 parent dd64af5 commit b644944

File tree

7 files changed

+648
-142
lines changed

7 files changed

+648
-142
lines changed

src/integration/java/com/ibm/eventstreams/connect/mqsource/AbstractJMSContextIT.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -94,7 +94,7 @@ protected JMSContext getJmsContext() throws Exception {
9494
return jmsContext;
9595
}
9696

97-
protected Map<String, String> getDefaultConnectorProperties() {
97+
public static Map<String, String> getDefaultConnectorProperties() {
9898
final Map<String, String> props = new HashMap<>();
9999
props.put("mq.queue.manager", QMGR_NAME);
100100
props.put("mq.connection.mode", "client");
@@ -106,7 +106,7 @@ protected Map<String, String> getDefaultConnectorProperties() {
106106
return props;
107107
}
108108

109-
public AbstractConfig getPropertiesConfig(Map<String, String> props) {
109+
public AbstractConfig getPropertiesConfig(final Map<String, String> props) {
110110
final AbstractConfig connectorConfig = new AbstractConfig(MQSourceConnector.CONFIGDEF, props);
111111
return connectorConfig;
112112
}

src/integration/java/com/ibm/eventstreams/connect/mqsource/MQSourceTaskIT.java

Lines changed: 365 additions & 57 deletions
Large diffs are not rendered by default.

src/integration/java/com/ibm/eventstreams/connect/mqsource/builders/JsonRecordBuilderIT.java

Lines changed: 6 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,6 @@
2020
import static org.junit.Assert.assertNull;
2121
import static org.junit.Assert.assertThrows;
2222

23-
import java.util.HashMap;
2423
import java.util.List;
2524
import java.util.Map;
2625

@@ -113,8 +112,7 @@ public void buildFromJmsTestJsonError() throws Exception {
113112
// use the builder to convert it to a Kafka record
114113
final JsonRecordBuilder builder = new JsonRecordBuilder();
115114
final DataException exec = assertThrows(DataException.class, () -> builder.toSourceRecord(getJmsContext(), topic, isJMS, message));
116-
System.out.println(exec);
117-
System.out.println(exec.getMessage());
115+
assertEquals("Converting byte[] to Kafka Connect data failed due to serialization error: ", exec.getMessage());
118116
}
119117

120118
@Test
@@ -124,12 +122,13 @@ public void buildFromJmsTestErrorTolerance() throws Exception {
124122

125123
// use the builder to convert it to a Kafka record
126124
final JsonRecordBuilder builder = new JsonRecordBuilder();
127-
final HashMap<String, String> config = new HashMap<String, String>();
125+
final Map<String, String> config = AbstractJMSContextIT.getDefaultConnectorProperties();
128126
config.put("errors.tolerance", "all");
127+
config.put("mq.message.body.jms", "true");
128+
config.put("mq.record.builder", "com.ibm.eventstreams.connect.mqsource.builders.JsonRecordBuilder");
129+
129130
builder.configure(config);
130131
final SourceRecord record = builder.toSourceRecord(getJmsContext(), topic, isJMS, message);
131-
assertNull(record.key());
132-
assertNull(record.valueSchema());
133-
verifyJsonMap((Map<?, ?>) record.value());
132+
assertNull(record);
134133
}
135134
}

src/main/java/com/ibm/eventstreams/connect/mqsource/MQSourceConnector.java

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -181,6 +181,14 @@ public class MQSourceConnector extends SourceConnector {
181181
public static final long CONFIG_RECONNECT_DELAY_MAX_DEFAULT = 8192L;
182182
public static final long CONFIG_RECONNECT_DELAY_MAX_MINIMUM = 10L;
183183

184+
public static final String ERRORS_DEAD_LETTER_QUEUE_TOPIC_NAME_CONFIG = "errors.deadletterqueue.topic.name";
185+
public static final String ERRORS_DEAD_LETTER_QUEUE_TOPIC_NAME_DISPLAY = "Output errors to the dead letter queue";
186+
public static final String ERRORS_DEAD_LETTER_QUEUE_TOPIC_NAME_DEFAULT = "";
187+
public static final String ERRORS_DEAD_LETTER_QUEUE_TOPIC_NAME_DOC = "Whether to output conversion errors to the dead letter queue. "
188+
+ "Stops poison messages when using schemas, any message will be outputted as extended json on the specified topic. "
189+
+ "By default messages are not outputted to the dead letter queue. "
190+
+ "Also requires `errors.tolerance=all`.";
191+
184192
// Define valid reconnect options
185193
public static final String[] CONFIG_VALUE_MQ_VALID_RECONNECT_OPTIONS = {
186194
CONFIG_VALUE_MQ_CLIENT_RECONNECT_OPTION_ASDEF,

src/main/java/com/ibm/eventstreams/connect/mqsource/MQSourceTask.java

Lines changed: 2 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -24,9 +24,6 @@
2424
import com.ibm.eventstreams.connect.mqsource.util.QueueConfig;
2525
import org.apache.kafka.common.config.AbstractConfig;
2626
import org.apache.kafka.connect.errors.ConnectException;
27-
import org.apache.kafka.connect.errors.DataException;
28-
import org.apache.kafka.connect.runtime.ConnectorConfig;
29-
import org.apache.kafka.connect.runtime.errors.ToleranceType;
3027
import org.apache.kafka.connect.source.SourceRecord;
3128
import org.apache.kafka.connect.source.SourceTask;
3229
import org.slf4j.Logger;
@@ -40,7 +37,6 @@
4037
import java.util.Collections;
4138
import java.util.HashMap;
4239
import java.util.List;
43-
import java.util.Locale;
4440
import java.util.Map;
4541
import java.util.Optional;
4642
import java.util.concurrent.CountDownLatch;
@@ -112,7 +108,6 @@ public class MQSourceTask extends SourceTask {
112108

113109
private int startActionPollLimit = 300; // This is a 5 minute time out on the initial start procedure
114110
private AtomicInteger startActionPollCount = new AtomicInteger(0);
115-
private boolean tolerateErrors = false;
116111

117112
private final static String OFFSET_IDENTIFIER = "sequence-id";
118113
private final static String SOURCE_PARTITION_IDENTIFIER = "source";
@@ -151,7 +146,7 @@ public void start(final Map<String, String> props) {
151146
final JMSWorker reader = new JMSWorker();
152147
JMSWorker dedicated = null;
153148
SequenceStateClient client = null;
154-
tolerateErrors = tolerateErrors(props);
149+
155150
if (MQSourceConnector.configSupportsExactlyOnce(props)) {
156151
dedicated = new JMSWorker();
157152
client = new SequenceStateClient(props.get(CONFIG_NAME_MQ_EXACTLY_ONCE_STATE_QUEUE), reader, dedicated);
@@ -368,17 +363,7 @@ private List<SourceRecord> internalPoll() throws InterruptedException, JMSRuntim
368363
final ArrayList<String> msgIds = new ArrayList<>();
369364
final List<SourceRecord> sourceRecordList = messageList.stream()
370365
.peek(saveMessageID(msgIds))
371-
.map(message -> {
372-
try {
373-
return reader.toSourceRecord(message, sourceQueueConfig.isMqMessageBodyJms(), sourceOffset, sourceQueuePartition);
374-
} catch (final DataException dataException) {
375-
if (!tolerateErrors) {
376-
throw dataException;
377-
}
378-
}
379-
return null;
380-
})
381-
.filter(sourceRecord -> sourceRecord != null)
366+
.map(message -> reader.toSourceRecord(message, sourceQueueConfig.isMqMessageBodyJms(), sourceOffset, sourceQueuePartition))
382367
.collect(Collectors.toList());
383368

384369
// In RE-DELIVER we already have a state on the queue
@@ -601,9 +586,4 @@ private void closeAllWorkers() {
601586
dedicated.close();
602587
}
603588
}
604-
605-
public static boolean tolerateErrors(final Map<String, String> sfConnectorConfig) {
606-
final String errorsTolerance = sfConnectorConfig.getOrDefault(ConnectorConfig.ERRORS_TOLERANCE_CONFIG, ToleranceType.NONE.toString()).toUpperCase(Locale.ROOT);
607-
return ToleranceType.valueOf(errorsTolerance).equals(ToleranceType.ALL);
608-
}
609589
}

0 commit comments

Comments
 (0)