Skip to content

Commit 044e10e

Browse files
committed
feat: handle poison messages from base builder class
- Handle poison messages from base builder class - Poison message is handle when `errors.tolerance` is set to all - Option to configure deadletter queue for poison messages - New UTs for testing errors tolerance behaviour Signed-off-by: Joel Hanson <joelhanson025@gmail.com>
1 parent dd64af5 commit 044e10e

File tree

7 files changed

+647
-133
lines changed

7 files changed

+647
-133
lines changed

src/integration/java/com/ibm/eventstreams/connect/mqsource/AbstractJMSContextIT.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -94,7 +94,7 @@ protected JMSContext getJmsContext() throws Exception {
9494
return jmsContext;
9595
}
9696

97-
protected Map<String, String> getDefaultConnectorProperties() {
97+
public static Map<String, String> getDefaultConnectorProperties() {
9898
final Map<String, String> props = new HashMap<>();
9999
props.put("mq.queue.manager", QMGR_NAME);
100100
props.put("mq.connection.mode", "client");
@@ -106,7 +106,7 @@ protected Map<String, String> getDefaultConnectorProperties() {
106106
return props;
107107
}
108108

109-
public AbstractConfig getPropertiesConfig(Map<String, String> props) {
109+
public AbstractConfig getPropertiesConfig(final Map<String, String> props) {
110110
final AbstractConfig connectorConfig = new AbstractConfig(MQSourceConnector.CONFIGDEF, props);
111111
return connectorConfig;
112112
}

src/integration/java/com/ibm/eventstreams/connect/mqsource/MQSourceTaskIT.java

Lines changed: 365 additions & 57 deletions
Large diffs are not rendered by default.

src/integration/java/com/ibm/eventstreams/connect/mqsource/builders/JsonRecordBuilderIT.java

Lines changed: 6 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,6 @@
2020
import static org.junit.Assert.assertNull;
2121
import static org.junit.Assert.assertThrows;
2222

23-
import java.util.HashMap;
2423
import java.util.List;
2524
import java.util.Map;
2625

@@ -113,8 +112,7 @@ public void buildFromJmsTestJsonError() throws Exception {
113112
// use the builder to convert it to a Kafka record
114113
final JsonRecordBuilder builder = new JsonRecordBuilder();
115114
final DataException exec = assertThrows(DataException.class, () -> builder.toSourceRecord(getJmsContext(), topic, isJMS, message));
116-
System.out.println(exec);
117-
System.out.println(exec.getMessage());
115+
assertEquals("Converting byte[] to Kafka Connect data failed due to serialization error: ", exec.getMessage());
118116
}
119117

120118
@Test
@@ -124,12 +122,13 @@ public void buildFromJmsTestErrorTolerance() throws Exception {
124122

125123
// use the builder to convert it to a Kafka record
126124
final JsonRecordBuilder builder = new JsonRecordBuilder();
127-
final HashMap<String, String> config = new HashMap<String, String>();
125+
final Map<String, String> config = AbstractJMSContextIT.getDefaultConnectorProperties();
128126
config.put("errors.tolerance", "all");
127+
config.put("mq.message.body.jms", "true");
128+
config.put("mq.record.builder", "com.ibm.eventstreams.connect.mqsource.builders.JsonRecordBuilder");
129+
129130
builder.configure(config);
130131
final SourceRecord record = builder.toSourceRecord(getJmsContext(), topic, isJMS, message);
131-
assertNull(record.key());
132-
assertNull(record.valueSchema());
133-
verifyJsonMap((Map<?, ?>) record.value());
132+
assertNull(record);
134133
}
135134
}

src/main/java/com/ibm/eventstreams/connect/mqsource/MQSourceConnector.java

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -181,6 +181,14 @@ public class MQSourceConnector extends SourceConnector {
181181
public static final long CONFIG_RECONNECT_DELAY_MAX_DEFAULT = 8192L;
182182
public static final long CONFIG_RECONNECT_DELAY_MAX_MINIMUM = 10L;
183183

184+
public static final String ERRORS_DEAD_LETTER_QUEUE_TOPIC_NAME_CONFIG = "errors.deadletterqueue.topic.name";
185+
public static final String ERRORS_DEAD_LETTER_QUEUE_TOPIC_NAME_DISPLAY = "Output errors to the dead letter queue";
186+
public static final String ERRORS_DEAD_LETTER_QUEUE_TOPIC_NAME_DEFAULT = "";
187+
public static final String ERRORS_DEAD_LETTER_QUEUE_TOPIC_NAME_DOC = "Whether to output conversion errors to the dead letter queue. "
188+
+ "Stops poison messages when using schemas, any message will be outputted as extended json on the specified topic. "
189+
+ "By default messages are not outputted to the dead letter queue. "
190+
+ "Also requires `errors.tolerance=all`.";
191+
184192
// Define valid reconnect options
185193
public static final String[] CONFIG_VALUE_MQ_VALID_RECONNECT_OPTIONS = {
186194
CONFIG_VALUE_MQ_CLIENT_RECONNECT_OPTION_ASDEF,

src/main/java/com/ibm/eventstreams/connect/mqsource/MQSourceTask.java

Lines changed: 2 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -24,7 +24,6 @@
2424
import com.ibm.eventstreams.connect.mqsource.util.QueueConfig;
2525
import org.apache.kafka.common.config.AbstractConfig;
2626
import org.apache.kafka.connect.errors.ConnectException;
27-
import org.apache.kafka.connect.errors.DataException;
2827
import org.apache.kafka.connect.runtime.ConnectorConfig;
2928
import org.apache.kafka.connect.runtime.errors.ToleranceType;
3029
import org.apache.kafka.connect.source.SourceRecord;
@@ -112,7 +111,6 @@ public class MQSourceTask extends SourceTask {
112111

113112
private int startActionPollLimit = 300; // This is a 5 minute time out on the initial start procedure
114113
private AtomicInteger startActionPollCount = new AtomicInteger(0);
115-
private boolean tolerateErrors = false;
116114

117115
private final static String OFFSET_IDENTIFIER = "sequence-id";
118116
private final static String SOURCE_PARTITION_IDENTIFIER = "source";
@@ -151,7 +149,7 @@ public void start(final Map<String, String> props) {
151149
final JMSWorker reader = new JMSWorker();
152150
JMSWorker dedicated = null;
153151
SequenceStateClient client = null;
154-
tolerateErrors = tolerateErrors(props);
152+
155153
if (MQSourceConnector.configSupportsExactlyOnce(props)) {
156154
dedicated = new JMSWorker();
157155
client = new SequenceStateClient(props.get(CONFIG_NAME_MQ_EXACTLY_ONCE_STATE_QUEUE), reader, dedicated);
@@ -368,17 +366,7 @@ private List<SourceRecord> internalPoll() throws InterruptedException, JMSRuntim
368366
final ArrayList<String> msgIds = new ArrayList<>();
369367
final List<SourceRecord> sourceRecordList = messageList.stream()
370368
.peek(saveMessageID(msgIds))
371-
.map(message -> {
372-
try {
373-
return reader.toSourceRecord(message, sourceQueueConfig.isMqMessageBodyJms(), sourceOffset, sourceQueuePartition);
374-
} catch (final DataException dataException) {
375-
if (!tolerateErrors) {
376-
throw dataException;
377-
}
378-
}
379-
return null;
380-
})
381-
.filter(sourceRecord -> sourceRecord != null)
369+
.map(message -> reader.toSourceRecord(message, sourceQueueConfig.isMqMessageBodyJms(), sourceOffset, sourceQueuePartition))
382370
.collect(Collectors.toList());
383371

384372
// In RE-DELIVER we already have a state on the queue
@@ -601,9 +589,4 @@ private void closeAllWorkers() {
601589
dedicated.close();
602590
}
603591
}
604-
605-
public static boolean tolerateErrors(final Map<String, String> sfConnectorConfig) {
606-
final String errorsTolerance = sfConnectorConfig.getOrDefault(ConnectorConfig.ERRORS_TOLERANCE_CONFIG, ToleranceType.NONE.toString()).toUpperCase(Locale.ROOT);
607-
return ToleranceType.valueOf(errorsTolerance).equals(ToleranceType.ALL);
608-
}
609592
}

0 commit comments

Comments
 (0)