Skip to content

Commit dd64af5

Browse files
committed
feat: support of errors.tolerance
- Added a IT for error tolerance - Added conditions with in JSONRecordBuilder for error tolerance Signed-off-by: Joel Hanson <joelhanson025@gmail.com>
1 parent 16b8500 commit dd64af5

File tree

5 files changed

+79
-3
lines changed

5 files changed

+79
-3
lines changed

pom.xml

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -54,6 +54,12 @@
5454
<version>3.7.1</version>
5555
<scope>provided</scope>
5656
</dependency>
57+
<dependency>
58+
<groupId>org.apache.kafka</groupId>
59+
<artifactId>connect-runtime</artifactId>
60+
<version>3.7.1</version>
61+
<scope>provided</scope>
62+
</dependency>
5763

5864
<dependency>
5965
<groupId>javax.jms</groupId>

src/integration/java/com/ibm/eventstreams/connect/mqsource/builders/JsonRecordBuilderIT.java

Lines changed: 30 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -20,13 +20,15 @@
2020
import static org.junit.Assert.assertNull;
2121
import static org.junit.Assert.assertThrows;
2222

23+
import java.util.HashMap;
2324
import java.util.List;
2425
import java.util.Map;
2526

2627
import javax.jms.BytesMessage;
2728
import javax.jms.MapMessage;
2829
import javax.jms.TextMessage;
2930

31+
import org.apache.kafka.connect.errors.DataException;
3032
import org.apache.kafka.connect.source.SourceRecord;
3133
import org.junit.Test;
3234

@@ -102,4 +104,32 @@ public void buildFromJmsMapMessage() throws Exception {
102104
// verify the exception
103105
assertEquals("Unsupported JMS message type", exc.getMessage());
104106
}
107+
108+
@Test
109+
public void buildFromJmsTestJsonError() throws Exception {
110+
// create MQ message
111+
final TextMessage message = getJmsContext().createTextMessage("Not a valid json string");
112+
113+
// use the builder to convert it to a Kafka record
114+
final JsonRecordBuilder builder = new JsonRecordBuilder();
115+
final DataException exec = assertThrows(DataException.class, () -> builder.toSourceRecord(getJmsContext(), topic, isJMS, message));
116+
System.out.println(exec);
117+
System.out.println(exec.getMessage());
118+
}
119+
120+
@Test
121+
public void buildFromJmsTestErrorTolerance() throws Exception {
122+
// create MQ message
123+
final TextMessage message = getJmsContext().createTextMessage("Not a valid json string");
124+
125+
// use the builder to convert it to a Kafka record
126+
final JsonRecordBuilder builder = new JsonRecordBuilder();
127+
final HashMap<String, String> config = new HashMap<String, String>();
128+
config.put("errors.tolerance", "all");
129+
builder.configure(config);
130+
final SourceRecord record = builder.toSourceRecord(getJmsContext(), topic, isJMS, message);
131+
assertNull(record.key());
132+
assertNull(record.valueSchema());
133+
verifyJsonMap((Map<?, ?>) record.value());
134+
}
105135
}

src/main/java/com/ibm/eventstreams/connect/mqsource/MQSourceTask.java

Lines changed: 22 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,9 @@
2424
import com.ibm.eventstreams.connect.mqsource.util.QueueConfig;
2525
import org.apache.kafka.common.config.AbstractConfig;
2626
import org.apache.kafka.connect.errors.ConnectException;
27+
import org.apache.kafka.connect.errors.DataException;
28+
import org.apache.kafka.connect.runtime.ConnectorConfig;
29+
import org.apache.kafka.connect.runtime.errors.ToleranceType;
2730
import org.apache.kafka.connect.source.SourceRecord;
2831
import org.apache.kafka.connect.source.SourceTask;
2932
import org.slf4j.Logger;
@@ -37,6 +40,7 @@
3740
import java.util.Collections;
3841
import java.util.HashMap;
3942
import java.util.List;
43+
import java.util.Locale;
4044
import java.util.Map;
4145
import java.util.Optional;
4246
import java.util.concurrent.CountDownLatch;
@@ -108,6 +112,7 @@ public class MQSourceTask extends SourceTask {
108112

109113
private int startActionPollLimit = 300; // This is a 5 minute time out on the initial start procedure
110114
private AtomicInteger startActionPollCount = new AtomicInteger(0);
115+
private boolean tolerateErrors = false;
111116

112117
private final static String OFFSET_IDENTIFIER = "sequence-id";
113118
private final static String SOURCE_PARTITION_IDENTIFIER = "source";
@@ -146,6 +151,7 @@ public void start(final Map<String, String> props) {
146151
final JMSWorker reader = new JMSWorker();
147152
JMSWorker dedicated = null;
148153
SequenceStateClient client = null;
154+
tolerateErrors = tolerateErrors(props);
149155
if (MQSourceConnector.configSupportsExactlyOnce(props)) {
150156
dedicated = new JMSWorker();
151157
client = new SequenceStateClient(props.get(CONFIG_NAME_MQ_EXACTLY_ONCE_STATE_QUEUE), reader, dedicated);
@@ -362,7 +368,17 @@ private List<SourceRecord> internalPoll() throws InterruptedException, JMSRuntim
362368
final ArrayList<String> msgIds = new ArrayList<>();
363369
final List<SourceRecord> sourceRecordList = messageList.stream()
364370
.peek(saveMessageID(msgIds))
365-
.map(message -> reader.toSourceRecord(message, sourceQueueConfig.isMqMessageBodyJms(), sourceOffset, sourceQueuePartition))
371+
.map(message -> {
372+
try {
373+
return reader.toSourceRecord(message, sourceQueueConfig.isMqMessageBodyJms(), sourceOffset, sourceQueuePartition);
374+
} catch (final DataException dataException) {
375+
if (!tolerateErrors) {
376+
throw dataException;
377+
}
378+
}
379+
return null;
380+
})
381+
.filter(sourceRecord -> sourceRecord != null)
366382
.collect(Collectors.toList());
367383

368384
// In RE-DELIVER we already have a state on the queue
@@ -585,4 +601,9 @@ private void closeAllWorkers() {
585601
dedicated.close();
586602
}
587603
}
604+
605+
public static boolean tolerateErrors(final Map<String, String> sfConnectorConfig) {
606+
final String errorsTolerance = sfConnectorConfig.getOrDefault(ConnectorConfig.ERRORS_TOLERANCE_CONFIG, ToleranceType.NONE.toString()).toUpperCase(Locale.ROOT);
607+
return ToleranceType.valueOf(errorsTolerance).equals(ToleranceType.ALL);
608+
}
588609
}

src/main/java/com/ibm/eventstreams/connect/mqsource/builders/BaseRecordBuilder.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,7 @@
2727
import javax.jms.JMSContext;
2828
import javax.jms.JMSException;
2929
import javax.jms.Message;
30+
3031
import java.util.Map;
3132
import java.util.Optional;
3233

src/main/java/com/ibm/eventstreams/connect/mqsource/builders/JsonRecordBuilder.java

Lines changed: 20 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,9 @@
1818
import static java.nio.charset.StandardCharsets.UTF_8;
1919

2020
import java.util.HashMap;
21+
import java.util.Locale;
22+
import java.util.Map;
23+
2124
import javax.jms.BytesMessage;
2225
import javax.jms.JMSContext;
2326
import javax.jms.JMSException;
@@ -26,7 +29,8 @@
2629

2730
import org.apache.kafka.connect.data.SchemaAndValue;
2831
import org.apache.kafka.connect.json.JsonConverter;
29-
32+
import org.apache.kafka.connect.runtime.ConnectorConfig;
33+
import org.apache.kafka.connect.runtime.errors.ToleranceType;
3034
import org.slf4j.Logger;
3135
import org.slf4j.LoggerFactory;
3236

@@ -38,6 +42,7 @@ public class JsonRecordBuilder extends BaseRecordBuilder {
3842
private static final Logger log = LoggerFactory.getLogger(JsonRecordBuilder.class);
3943

4044
private JsonConverter converter;
45+
private ToleranceType toleranceType;
4146

4247
public JsonRecordBuilder() {
4348
log.info("Building records using com.ibm.eventstreams.connect.mqsource.builders.JsonRecordBuilder");
@@ -51,6 +56,13 @@ public JsonRecordBuilder() {
5156
converter.configure(m, false);
5257
}
5358

59+
@Override
60+
public void configure(final Map<String, String> props) {
61+
super.configure(props);
62+
final String errorTolerance = props.getOrDefault(ConnectorConfig.ERRORS_TOLERANCE_CONFIG, ConnectorConfig.ERRORS_TOLERANCE_DEFAULT.toString());
63+
toleranceType = ToleranceType.valueOf(errorTolerance.toUpperCase(Locale.ROOT));
64+
}
65+
5466
/**
5567
* Gets the value schema to use for the Kafka Connect SourceRecord.
5668
*
@@ -78,6 +90,12 @@ public SchemaAndValue getValue(final JMSContext context, final String topic, fin
7890
throw new RecordBuilderException("Unsupported JMS message type");
7991
}
8092

81-
return converter.toConnectData(topic, payload);
93+
try {
94+
return converter.toConnectData(topic, payload);
95+
} catch (final Exception e) {
96+
if (toleranceType == ToleranceType.NONE)
97+
throw e;
98+
}
99+
return null;
82100
}
83101
}

0 commit comments

Comments
 (0)