Skip to content

Commit 25c102f

Browse files
authored
feat: support of errors.tolerance and dead letter queue (#143)
feat: add support for error tolerance and poison message handling - Introduced support for `errors.tolerance` in the MQ source connector. - Poison messages are now safely handled when `errors.tolerance=all`. - Added base logic in JSONRecordBuilder and moved error handling to a separate utility class. - DLQ (dead letter queue) publishing is supported for poison messages with configurable topic and headers. - Extended error headers to include JMS-specific values for improved observability. - Updated logs to better reflect DLQ publishing and message handling decisions. - Added new unit and integration tests covering: - Poison message scenarios - Error header propagation - DLQ publishing behavior - Updated README with new configuration options for error handling. Signed-off-by: Joel Hanson <joelhanson025@gmail.com>
1 parent 4ba2e41 commit 25c102f

File tree

11 files changed

+1268
-149
lines changed

11 files changed

+1268
-149
lines changed

.github/ISSUE_TEMPLATE/BUG-REPORT.yml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -58,7 +58,7 @@ body:
5858
description: What version of our software are you running?
5959
options:
6060
- latest
61-
- 2.3.0
61+
- 2.4.0 (Default)
6262
- 1.3.5
6363
- older (<1.3.5)
6464
validations:

README.md

Lines changed: 67 additions & 47 deletions
Large diffs are not rendered by default.

pom.xml

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -54,6 +54,12 @@
5454
<version>3.7.1</version>
5555
<scope>provided</scope>
5656
</dependency>
57+
<dependency>
58+
<groupId>org.apache.kafka</groupId>
59+
<artifactId>connect-runtime</artifactId>
60+
<version>3.7.1</version>
61+
<scope>provided</scope>
62+
</dependency>
5763

5864
<dependency>
5965
<groupId>javax.jms</groupId>

src/integration/java/com/ibm/eventstreams/connect/mqsource/AbstractJMSContextIT.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -94,7 +94,7 @@ protected JMSContext getJmsContext() throws Exception {
9494
return jmsContext;
9595
}
9696

97-
protected Map<String, String> getDefaultConnectorProperties() {
97+
public static Map<String, String> getDefaultConnectorProperties() {
9898
final Map<String, String> props = new HashMap<>();
9999
props.put("mq.queue.manager", QMGR_NAME);
100100
props.put("mq.connection.mode", "client");
@@ -106,7 +106,7 @@ protected Map<String, String> getDefaultConnectorProperties() {
106106
return props;
107107
}
108108

109-
public AbstractConfig getPropertiesConfig(Map<String, String> props) {
109+
public AbstractConfig getPropertiesConfig(final Map<String, String> props) {
110110
final AbstractConfig connectorConfig = new AbstractConfig(MQSourceConnector.CONFIGDEF, props);
111111
return connectorConfig;
112112
}

src/integration/java/com/ibm/eventstreams/connect/mqsource/MQSourceTaskIT.java

Lines changed: 661 additions & 57 deletions
Large diffs are not rendered by default.

src/integration/java/com/ibm/eventstreams/connect/mqsource/builders/JsonRecordBuilderIT.java

Lines changed: 48 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -20,13 +20,15 @@
2020
import static org.junit.Assert.assertNull;
2121
import static org.junit.Assert.assertThrows;
2222

23+
import java.util.HashMap;
2324
import java.util.List;
2425
import java.util.Map;
2526

2627
import javax.jms.BytesMessage;
2728
import javax.jms.MapMessage;
2829
import javax.jms.TextMessage;
2930

31+
import org.apache.kafka.connect.errors.DataException;
3032
import org.apache.kafka.connect.source.SourceRecord;
3133
import org.junit.Test;
3234

@@ -102,4 +104,50 @@ public void buildFromJmsMapMessage() throws Exception {
102104
// verify the exception
103105
assertEquals("Unsupported JMS message type", exc.getMessage());
104106
}
107+
108+
@Test
109+
public void buildFromJmsTestJsonError() throws Exception {
110+
// create MQ message
111+
final TextMessage message = getJmsContext().createTextMessage("Not a valid json string");
112+
113+
// use the builder to convert it to a Kafka record
114+
final JsonRecordBuilder builder = new JsonRecordBuilder();
115+
final DataException exec = assertThrows(DataException.class, () -> builder.toSourceRecord(getJmsContext(), topic, isJMS, message));
116+
assertEquals("Converting byte[] to Kafka Connect data failed due to serialization error: ", exec.getMessage());
117+
}
118+
119+
@Test
120+
public void buildFromJmsTestErrorTolerance() throws Exception {
121+
// create MQ message
122+
final TextMessage message = getJmsContext().createTextMessage("Not a valid json string");
123+
124+
// use the builder to convert it to a Kafka record
125+
final JsonRecordBuilder builder = new JsonRecordBuilder();
126+
final Map<String, String> config = AbstractJMSContextIT.getDefaultConnectorProperties();
127+
config.put("errors.tolerance", "all");
128+
config.put("mq.message.body.jms", "true");
129+
config.put("mq.record.builder", "com.ibm.eventstreams.connect.mqsource.builders.JsonRecordBuilder");
130+
131+
builder.configure(config);
132+
final SourceRecord record = builder.toSourceRecord(getJmsContext(), topic, isJMS, message);
133+
assertNull(record);
134+
}
135+
136+
@Test
137+
public void buildFromJmsTestErrorToleranceNone() throws Exception {
138+
// create MQ message
139+
final TextMessage message = getJmsContext().createTextMessage("Not a valid json string");
140+
141+
// use the builder to convert it to a Kafka record
142+
final JsonRecordBuilder builder = new JsonRecordBuilder();
143+
final HashMap<String, String> config = new HashMap<String, String>();
144+
config.put("errors.tolerance", "none");
145+
config.put("mq.message.body.jms", "true");
146+
config.put("mq.record.builder", "com.ibm.eventstreams.connect.mqsource.builders.JsonRecordBuilder");
147+
148+
builder.configure(config);
149+
assertThrows(DataException.class, () -> {
150+
builder.toSourceRecord(getJmsContext(), topic, isJMS, message);
151+
});
152+
}
105153
}

src/main/java/com/ibm/eventstreams/connect/mqsource/MQSourceConnector.java

Lines changed: 32 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -187,6 +187,22 @@ public class MQSourceConnector extends SourceConnector {
187187
public static final long CONFIG_RECONNECT_DELAY_MAX_DEFAULT = 8192L;
188188
public static final long CONFIG_RECONNECT_DELAY_MAX_MINIMUM = 10L;
189189

190+
public static final String DLQ_PREFIX = "errors.deadletterqueue.";
191+
192+
public static final String DLQ_TOPIC_NAME_CONFIG = DLQ_PREFIX + "topic.name";
193+
public static final String DLQ_TOPIC_NAME_DOC = "The name of the topic to be used as the dead letter queue (DLQ) for messages that " +
194+
"result in an error when processed by this source connector, or its transformations or converters. The topic name is blank by default, " +
195+
"which means that no messages are to be recorded in the DLQ.";
196+
public static final String DLQ_TOPIC_DEFAULT = "";
197+
private static final String DLQ_TOPIC_DISPLAY = "Dead Letter Queue Topic Name";
198+
199+
public static final String DLQ_CONTEXT_HEADERS_ENABLE_CONFIG = DLQ_PREFIX + "context.headers.enable";
200+
public static final boolean DLQ_CONTEXT_HEADERS_ENABLE_DEFAULT = false;
201+
public static final String DLQ_CONTEXT_HEADERS_ENABLE_DOC = "If true, add headers containing error context to the messages " +
202+
"written to the dead letter queue. To avoid clashing with headers from the original record, all error context header " +
203+
"keys, all error context header keys will start with <code>__connect.errors.</code>";
204+
private static final String DLQ_CONTEXT_HEADERS_ENABLE_DISPLAY = "Enable Error Context Headers";
205+
190206
// Define valid reconnect options
191207
public static final String[] CONFIG_VALUE_MQ_VALID_RECONNECT_OPTIONS = {
192208
CONFIG_VALUE_MQ_CLIENT_RECONNECT_OPTION_ASDEF,
@@ -616,6 +632,22 @@ null, new ReadableFile(),
616632
CONFIG_GROUP_MQ, 29,
617633
Width.MEDIUM,
618634
CONFIG_DISPLAY_RECONNECT_DELAY_MAX);
635+
CONFIGDEF.define(DLQ_TOPIC_NAME_CONFIG,
636+
Type.STRING,
637+
DLQ_TOPIC_DEFAULT,
638+
Importance.MEDIUM,
639+
DLQ_TOPIC_NAME_DOC,
640+
CONFIG_GROUP_MQ, 30,
641+
Width.MEDIUM,
642+
DLQ_TOPIC_DISPLAY);
643+
CONFIGDEF.define(DLQ_CONTEXT_HEADERS_ENABLE_CONFIG,
644+
Type.BOOLEAN,
645+
DLQ_CONTEXT_HEADERS_ENABLE_DEFAULT,
646+
Importance.MEDIUM,
647+
DLQ_CONTEXT_HEADERS_ENABLE_DOC,
648+
CONFIG_GROUP_MQ, 31,
649+
Width.MEDIUM,
650+
DLQ_CONTEXT_HEADERS_ENABLE_DISPLAY);
619651

620652
CONFIGDEF.define(CONFIG_NAME_TOPIC,
621653
Type.STRING,

src/main/java/com/ibm/eventstreams/connect/mqsource/MQSourceTask.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -146,6 +146,7 @@ public void start(final Map<String, String> props) {
146146
final JMSWorker reader = new JMSWorker();
147147
JMSWorker dedicated = null;
148148
SequenceStateClient client = null;
149+
149150
if (MQSourceConnector.configSupportsExactlyOnce(props)) {
150151
dedicated = new JMSWorker();
151152
client = new SequenceStateClient(props.get(CONFIG_NAME_MQ_EXACTLY_ONCE_STATE_QUEUE), reader, dedicated);

src/main/java/com/ibm/eventstreams/connect/mqsource/builders/BaseRecordBuilder.java

Lines changed: 88 additions & 41 deletions
Original file line numberDiff line numberDiff line change
@@ -15,45 +15,62 @@
1515
*/
1616
package com.ibm.eventstreams.connect.mqsource.builders;
1717

18-
import com.ibm.eventstreams.connect.mqsource.MQSourceConnector;
19-
import com.ibm.eventstreams.connect.mqsource.processor.JmsToKafkaHeaderConverter;
18+
import java.util.Map;
19+
import java.util.Optional;
20+
21+
import javax.jms.JMSContext;
22+
import javax.jms.JMSException;
23+
import javax.jms.Message;
24+
2025
import org.apache.kafka.connect.data.Schema;
2126
import org.apache.kafka.connect.data.SchemaAndValue;
2227
import org.apache.kafka.connect.source.SourceRecord;
23-
2428
import org.slf4j.Logger;
2529
import org.slf4j.LoggerFactory;
2630

27-
import javax.jms.JMSContext;
28-
import javax.jms.JMSException;
29-
import javax.jms.Message;
30-
import java.util.Map;
31-
import java.util.Optional;
31+
import com.ibm.eventstreams.connect.mqsource.MQSourceConnector;
32+
import com.ibm.eventstreams.connect.mqsource.processor.JmsToKafkaHeaderConverter;
33+
import com.ibm.eventstreams.connect.mqsource.util.ErrorHandler;
3234

3335
/**
3436
* Builds Kafka Connect SourceRecords from messages.
3537
*/
3638
public abstract class BaseRecordBuilder implements RecordBuilder {
3739
private static final Logger log = LoggerFactory.getLogger(BaseRecordBuilder.class);
3840

39-
public enum KeyHeader { NONE, MESSAGE_ID, CORRELATION_ID, CORRELATION_ID_AS_BYTES, DESTINATION };
40-
protected KeyHeader keyheader = KeyHeader.NONE;
41+
public enum KeyHeader {
42+
NONE, MESSAGE_ID, CORRELATION_ID, CORRELATION_ID_AS_BYTES, DESTINATION
43+
};
4144

45+
protected KeyHeader keyheader = KeyHeader.NONE;
4246

4347
private boolean copyJmsPropertiesFlag = Boolean.FALSE;
4448
private JmsToKafkaHeaderConverter jmsToKafkaHeaderConverter;
49+
private ErrorHandler errorHandler = new ErrorHandler();
4550

4651
/**
4752
* Configure this class.
4853
*
4954
* @param props initial configuration
5055
*
51-
* @throws RecordBuilderException Operation failed and connector should stop.
56+
* @throws RecordBuilderException Operation failed and connector should stop.
5257
*/
53-
@Override public void configure(final Map<String, String> props) {
58+
@Override
59+
public void configure(final Map<String, String> props) {
5460
log.trace("[{}] Entry {}.configure, props={}", Thread.currentThread().getId(), this.getClass().getName(),
5561
props);
5662

63+
configureKeyHeader(props);
64+
configureJmsProperties(props);
65+
configureErrorHandler(props);
66+
67+
log.trace("[{}] Exit {}.configure", Thread.currentThread().getId(), this.getClass().getName());
68+
}
69+
70+
/**
71+
* Configure key header settings.
72+
*/
73+
private void configureKeyHeader(final Map<String, String> props) {
5774
final String kh = props.get(MQSourceConnector.CONFIG_NAME_MQ_RECORD_BUILDER_KEY_HEADER);
5875
if (kh != null) {
5976
if (kh.equals(MQSourceConnector.CONFIG_VALUE_MQ_RECORD_BUILDER_KEY_HEADER_JMSMESSAGEID)) {
@@ -73,12 +90,22 @@ public enum KeyHeader { NONE, MESSAGE_ID, CORRELATION_ID, CORRELATION_ID_AS_BYTE
7390
throw new RecordBuilderException("Unsupported MQ record builder key header value");
7491
}
7592
}
93+
}
7694

95+
/**
96+
* Configure JMS properties settings.
97+
*/
98+
private void configureJmsProperties(final Map<String, String> props) {
7799
final String str = props.get(MQSourceConnector.CONFIG_NAME_MQ_JMS_PROPERTY_COPY_TO_KAFKA_HEADER);
78100
copyJmsPropertiesFlag = Boolean.parseBoolean(Optional.ofNullable(str).orElse("false"));
79101
jmsToKafkaHeaderConverter = new JmsToKafkaHeaderConverter();
102+
}
80103

81-
log.trace("[{}] Exit {}.configure", Thread.currentThread().getId(), this.getClass().getName());
104+
/**
105+
* Configure error handler.
106+
*/
107+
public void configureErrorHandler(final Map<String, String> props) {
108+
errorHandler.configure(props, copyJmsPropertiesFlag, jmsToKafkaHeaderConverter);
82109
}
83110

84111
/**
@@ -160,38 +187,58 @@ public abstract SchemaAndValue getValue(JMSContext context, String topic, boolea
160187
* @throws JMSException Message could not be converted
161188
*/
162189
@Override
163-
public SourceRecord toSourceRecord(final JMSContext context, final String topic, final boolean messageBodyJms, final Message message) throws JMSException {
190+
public SourceRecord toSourceRecord(final JMSContext context, final String topic, final boolean messageBodyJms,
191+
final Message message) throws JMSException {
164192
return toSourceRecord(context, topic, messageBodyJms, message, null, null);
165193
}
166194

167195
@Override
168-
public SourceRecord toSourceRecord(final JMSContext context, final String topic, final boolean messageBodyJms, final Message message, final Map<String, Long> sourceOffset, final Map<String, String> sourceQueuePartition) throws JMSException {
169-
final SchemaAndValue key = this.getKey(context, topic, message);
170-
final SchemaAndValue value = this.getValue(context, topic, messageBodyJms, message);
171-
172-
if (copyJmsPropertiesFlag && messageBodyJms) {
173-
return new SourceRecord(
174-
sourceQueuePartition,
175-
sourceOffset,
176-
topic,
177-
null,
178-
key.schema(),
179-
key.value(),
180-
value.schema(),
181-
value.value(),
182-
message.getJMSTimestamp(),
183-
jmsToKafkaHeaderConverter.convertJmsPropertiesToKafkaHeaders(message)
184-
);
185-
} else {
186-
return new SourceRecord(
187-
sourceQueuePartition,
188-
sourceOffset,
189-
topic,
190-
key.schema(),
191-
key.value(),
192-
value.schema(),
193-
value.value()
194-
);
196+
public SourceRecord toSourceRecord(final JMSContext context, final String topic, final boolean messageBodyJms,
197+
final Message message, final Map<String, Long> sourceOffset, final Map<String, String> sourceQueuePartition)
198+
throws JMSException {
199+
200+
SchemaAndValue key = new SchemaAndValue(null, null);
201+
202+
try {
203+
// Extract key and value
204+
final SchemaAndValue value = this.getValue(context, topic, messageBodyJms, message);
205+
key = this.getKey(context, topic, message);
206+
207+
// Create and return appropriate record based on configuration
208+
if (copyJmsPropertiesFlag && messageBodyJms) {
209+
return new SourceRecord(
210+
sourceQueuePartition,
211+
sourceOffset,
212+
topic,
213+
null,
214+
key.schema(),
215+
key.value(),
216+
value.schema(),
217+
value.value(),
218+
message.getJMSTimestamp(),
219+
jmsToKafkaHeaderConverter.convertJmsPropertiesToKafkaHeaders(message));
220+
} else {
221+
return new SourceRecord(
222+
sourceQueuePartition,
223+
sourceOffset,
224+
topic,
225+
null,
226+
key.schema(),
227+
key.value(),
228+
value.schema(),
229+
value.value());
230+
}
231+
} catch (final Exception e) {
232+
// Log the error using error handler
233+
errorHandler.logError(e, topic, message);
234+
235+
// If errors are not tolerated, rethrow
236+
if (!errorHandler.shouldTolerateErrors()) {
237+
throw e;
238+
}
239+
240+
// Handle the error based on configured error tolerance
241+
return errorHandler.handleBuildException(message, sourceQueuePartition, sourceOffset, topic, key, e);
195242
}
196243
}
197244
}

src/main/java/com/ibm/eventstreams/connect/mqsource/builders/JsonRecordBuilder.java

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -26,7 +26,6 @@
2626

2727
import org.apache.kafka.connect.data.SchemaAndValue;
2828
import org.apache.kafka.connect.json.JsonConverter;
29-
3029
import org.slf4j.Logger;
3130
import org.slf4j.LoggerFactory;
3231

0 commit comments

Comments (0)