Skip to content

Commit 8f09cb6

Browse files
authored
Merge pull request #191 from BillFarber/task/hardenCode
Gracefully handle problems with older versions of Kafka.
2 parents 33c2600 + 0615190 commit 8f09cb6

File tree

1 file changed

+31
-12
lines changed

1 file changed

+31
-12
lines changed

src/main/java/com/marklogic/kafka/connect/sink/WriteBatcherSinkTask.java

Lines changed: 31 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -35,6 +35,8 @@
3535
import org.apache.kafka.connect.runtime.InternalSinkRecord;
3636
import org.apache.kafka.connect.sink.ErrantRecordReporter;
3737
import org.apache.kafka.connect.sink.SinkRecord;
38+
import org.slf4j.Logger;
39+
import org.slf4j.LoggerFactory;
3840
import org.springframework.util.StringUtils;
3941

4042
import java.io.IOException;
@@ -53,6 +55,8 @@
5355
*/
5456
public class WriteBatcherSinkTask extends AbstractSinkTask {
5557

58+
protected static final Logger classLogger = LoggerFactory.getLogger(WriteBatcherSinkTask.class);
59+
5660
private DatabaseClient databaseClient;
5761
private DataMovementManager dataMovementManager;
5862
private WriteBatcher writeBatcher;
@@ -101,20 +105,35 @@ protected void writeSinkRecord(SinkRecord sinkRecord) {
101105

102106
static void addFailureHeaders(SinkRecord sinkRecord, Throwable e, String failureHeaderValue, WriteEvent writeEvent) {
103107
if (sinkRecord instanceof InternalSinkRecord) {
104-
ConsumerRecord<byte[], byte[]> originalRecord = ((InternalSinkRecord) sinkRecord).context().original();
105-
originalRecord.headers().add(MARKLOGIC_MESSAGE_FAILURE_HEADER, getBytesHandleNull(failureHeaderValue));
106-
originalRecord.headers().add(MARKLOGIC_MESSAGE_EXCEPTION_MESSAGE, getBytesHandleNull(e.getMessage()));
107-
originalRecord.headers().add(MARKLOGIC_ORIGINAL_TOPIC, getBytesHandleNull(sinkRecord.topic()));
108-
if (writeEvent != null) {
109-
originalRecord.headers().add(MARKLOGIC_TARGET_URI, writeEvent.getTargetUri().getBytes(StandardCharsets.UTF_8));
108+
try {
109+
ConsumerRecord<byte[], byte[]> originalRecord = ((InternalSinkRecord) sinkRecord).context().original();
110+
addFailureHeadersToOriginalSinkRecord(originalRecord, e, failureHeaderValue, writeEvent);
111+
} catch (NoSuchMethodError methodException) {
112+
classLogger.warn("This version of the MarkLogic Kafka Connector requires Kafka version 3.8.0 or" +
113+
" higher in order to store failure information on the original sink record. Instead, the failure" +
114+
" information will be on the wrapper sink record.");
115+
addFailureHeadersToNonInternalSinkRecord(sinkRecord, e, failureHeaderValue, writeEvent);
110116
}
111117
} else {
112-
sinkRecord.headers().addString(MARKLOGIC_MESSAGE_FAILURE_HEADER, failureHeaderValue);
113-
sinkRecord.headers().addString(MARKLOGIC_MESSAGE_EXCEPTION_MESSAGE, e.getMessage());
114-
sinkRecord.headers().addString(MARKLOGIC_ORIGINAL_TOPIC, sinkRecord.topic());
115-
if (writeEvent != null) {
116-
sinkRecord.headers().addString(MARKLOGIC_TARGET_URI, writeEvent.getTargetUri());
117-
}
118+
addFailureHeadersToNonInternalSinkRecord(sinkRecord, e, failureHeaderValue, writeEvent);
119+
}
120+
}
121+
122+
static void addFailureHeadersToNonInternalSinkRecord(SinkRecord sinkRecord, Throwable e, String failureHeaderValue, WriteEvent writeEvent) {
123+
sinkRecord.headers().addString(MARKLOGIC_MESSAGE_FAILURE_HEADER, failureHeaderValue);
124+
sinkRecord.headers().addString(MARKLOGIC_MESSAGE_EXCEPTION_MESSAGE, e.getMessage());
125+
sinkRecord.headers().addString(MARKLOGIC_ORIGINAL_TOPIC, sinkRecord.topic());
126+
if (writeEvent != null) {
127+
sinkRecord.headers().addString(MARKLOGIC_TARGET_URI, writeEvent.getTargetUri());
128+
}
129+
}
130+
131+
static void addFailureHeadersToOriginalSinkRecord(ConsumerRecord<byte[], byte[]> originalRecord, Throwable e, String failureHeaderValue, WriteEvent writeEvent) {
132+
originalRecord.headers().add(MARKLOGIC_MESSAGE_FAILURE_HEADER, getBytesHandleNull(failureHeaderValue));
133+
originalRecord.headers().add(MARKLOGIC_MESSAGE_EXCEPTION_MESSAGE, getBytesHandleNull(e.getMessage()));
134+
originalRecord.headers().add(MARKLOGIC_ORIGINAL_TOPIC, getBytesHandleNull(originalRecord.topic()));
135+
if (writeEvent != null) {
136+
originalRecord.headers().add(MARKLOGIC_TARGET_URI, writeEvent.getTargetUri().getBytes(StandardCharsets.UTF_8));
118137
}
119138
}
120139

0 commit comments

Comments
 (0)