Skip to content

Commit 6064cc2

Browse files
committed
feat: support of errors.tolerance
- Added a IT for error tolerance - Added conditions with in JSONRecordBuilder for error tolerance
1 parent 16b8500 commit 6064cc2

File tree

4 files changed

+58
-2
lines changed

4 files changed

+58
-2
lines changed

pom.xml

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -54,6 +54,12 @@
5454
<version>3.7.1</version>
5555
<scope>provided</scope>
5656
</dependency>
57+
<dependency>
58+
<groupId>org.apache.kafka</groupId>
59+
<artifactId>connect-runtime</artifactId>
60+
<version>3.7.1</version>
61+
<scope>provided</scope>
62+
</dependency>
5763

5864
<dependency>
5965
<groupId>javax.jms</groupId>

src/integration/java/com/ibm/eventstreams/connect/mqsource/builders/JsonRecordBuilderIT.java

Lines changed: 30 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -20,13 +20,15 @@
2020
import static org.junit.Assert.assertNull;
2121
import static org.junit.Assert.assertThrows;
2222

23+
import java.util.HashMap;
2324
import java.util.List;
2425
import java.util.Map;
2526

2627
import javax.jms.BytesMessage;
2728
import javax.jms.MapMessage;
2829
import javax.jms.TextMessage;
2930

31+
import org.apache.kafka.connect.errors.DataException;
3032
import org.apache.kafka.connect.source.SourceRecord;
3133
import org.junit.Test;
3234

@@ -102,4 +104,32 @@ public void buildFromJmsMapMessage() throws Exception {
102104
// verify the exception
103105
assertEquals("Unsupported JMS message type", exc.getMessage());
104106
}
107+
108+
@Test
109+
public void buildFromJmsTestJsonError() throws Exception {
110+
// create MQ message
111+
final TextMessage message = getJmsContext().createTextMessage("Not a valid json string");
112+
113+
// use the builder to convert it to a Kafka record
114+
final JsonRecordBuilder builder = new JsonRecordBuilder();
115+
final DataException exec = assertThrows(DataException.class, () -> builder.toSourceRecord(getJmsContext(), topic, isJMS, message));
116+
System.out.println(exec);
117+
System.out.println(exec.getMessage());
118+
}
119+
120+
@Test
121+
public void buildFromJmsTestErrorTolerance() throws Exception {
122+
// create MQ message
123+
final TextMessage message = getJmsContext().createTextMessage("Not a valid json string");
124+
125+
// use the builder to convert it to a Kafka record
126+
final JsonRecordBuilder builder = new JsonRecordBuilder();
127+
final HashMap<String, String> config = new HashMap<String, String>();
128+
config.put("errors.tolerance", "all");
129+
builder.configure(config);
130+
final SourceRecord record = builder.toSourceRecord(getJmsContext(), topic, isJMS, message);
131+
assertNull(record.key());
132+
assertNull(record.valueSchema());
133+
verifyJsonMap((Map<?, ?>) record.value());
134+
}
105135
}

src/main/java/com/ibm/eventstreams/connect/mqsource/MQSourceTask.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -363,6 +363,8 @@ private List<SourceRecord> internalPoll() throws InterruptedException, JMSRuntim
363363
final List<SourceRecord> sourceRecordList = messageList.stream()
364364
.peek(saveMessageID(msgIds))
365365
.map(message -> reader.toSourceRecord(message, sourceQueueConfig.isMqMessageBodyJms(), sourceOffset, sourceQueuePartition))
366+
// TODO: comment this for errors.tolerance
367+
.filter(sourceRecord -> sourceRecord != null)
366368
.collect(Collectors.toList());
367369

368370
// In RE-DELIVER we already have a state on the queue

src/main/java/com/ibm/eventstreams/connect/mqsource/builders/JsonRecordBuilder.java

Lines changed: 20 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,9 @@
1818
import static java.nio.charset.StandardCharsets.UTF_8;
1919

2020
import java.util.HashMap;
21+
import java.util.Locale;
22+
import java.util.Map;
23+
2124
import javax.jms.BytesMessage;
2225
import javax.jms.JMSContext;
2326
import javax.jms.JMSException;
@@ -26,7 +29,8 @@
2629

2730
import org.apache.kafka.connect.data.SchemaAndValue;
2831
import org.apache.kafka.connect.json.JsonConverter;
29-
32+
import org.apache.kafka.connect.runtime.ConnectorConfig;
33+
import org.apache.kafka.connect.runtime.errors.ToleranceType;
3034
import org.slf4j.Logger;
3135
import org.slf4j.LoggerFactory;
3236

@@ -38,6 +42,7 @@ public class JsonRecordBuilder extends BaseRecordBuilder {
3842
private static final Logger log = LoggerFactory.getLogger(JsonRecordBuilder.class);
3943

4044
private JsonConverter converter;
45+
private ToleranceType toleranceType;
4146

4247
public JsonRecordBuilder() {
4348
log.info("Building records using com.ibm.eventstreams.connect.mqsource.builders.JsonRecordBuilder");
@@ -51,6 +56,13 @@ public JsonRecordBuilder() {
5156
converter.configure(m, false);
5257
}
5358

59+
@Override
60+
public void configure(final Map<String, String> props) {
61+
super.configure(props);
62+
final String errorTolerance = props.getOrDefault(ConnectorConfig.ERRORS_TOLERANCE_CONFIG, ConnectorConfig.ERRORS_TOLERANCE_DEFAULT.toString());
63+
toleranceType = ToleranceType.valueOf(errorTolerance.toUpperCase(Locale.ROOT));
64+
}
65+
5466
/**
5567
* Gets the value schema to use for the Kafka Connect SourceRecord.
5668
*
@@ -78,6 +90,12 @@ public SchemaAndValue getValue(final JMSContext context, final String topic, fin
7890
throw new RecordBuilderException("Unsupported JMS message type");
7991
}
8092

81-
return converter.toConnectData(topic, payload);
93+
try {
94+
return converter.toConnectData(topic, payload);
95+
} catch (final Exception e) {
96+
if (toleranceType == ToleranceType.NONE)
97+
throw e;
98+
}
99+
return null;
82100
}
83101
}

0 commit comments

Comments
 (0)