Skip to content

Commit 445850c

Browse files
committed
feat: move out the error handling part to a seperate class
Signed-off-by: Joel Hanson <joelhanson025@gmail.com>
1 parent 6616083 commit 445850c

File tree

2 files changed

+368
-270
lines changed

2 files changed

+368
-270
lines changed

src/main/java/com/ibm/eventstreams/connect/mqsource/builders/BaseRecordBuilder.java

Lines changed: 24 additions & 270 deletions
Original file line numberDiff line numberDiff line change
@@ -15,33 +15,22 @@
1515
*/
1616
package com.ibm.eventstreams.connect.mqsource.builders;
1717

18-
import static java.nio.charset.StandardCharsets.UTF_8;
19-
20-
import java.io.PrintWriter;
21-
import java.io.StringWriter;
22-
import java.util.Locale;
2318
import java.util.Map;
2419
import java.util.Optional;
2520

26-
import javax.jms.BytesMessage;
2721
import javax.jms.JMSContext;
2822
import javax.jms.JMSException;
2923
import javax.jms.Message;
30-
import javax.jms.TextMessage;
3124

3225
import org.apache.kafka.connect.data.Schema;
3326
import org.apache.kafka.connect.data.SchemaAndValue;
34-
import org.apache.kafka.connect.header.ConnectHeaders;
35-
import org.apache.kafka.connect.header.Headers;
36-
import org.apache.kafka.connect.runtime.ConnectorConfig;
37-
import org.apache.kafka.connect.runtime.errors.DeadLetterQueueReporter;
38-
import org.apache.kafka.connect.runtime.errors.ToleranceType;
3927
import org.apache.kafka.connect.source.SourceRecord;
4028
import org.slf4j.Logger;
4129
import org.slf4j.LoggerFactory;
4230

4331
import com.ibm.eventstreams.connect.mqsource.MQSourceConnector;
4432
import com.ibm.eventstreams.connect.mqsource.processor.JmsToKafkaHeaderConverter;
33+
import com.ibm.eventstreams.connect.mqsource.util.ErrorHandler;
4534

4635
/**
4736
* Builds Kafka Connect SourceRecords from messages.
@@ -57,18 +46,7 @@ public enum KeyHeader {
5746

5847
private boolean copyJmsPropertiesFlag = Boolean.FALSE;
5948
private JmsToKafkaHeaderConverter jmsToKafkaHeaderConverter;
60-
private boolean tolerateErrors;
61-
private boolean logErrors;
62-
private boolean logIncludeMessages;
63-
private String dlqTopic = "";
64-
private String queueName = "";
65-
66-
public static final String ERROR_HEADER_EXCEPTION_TIMESTAMP = DeadLetterQueueReporter.HEADER_PREFIX + "timestamp";
67-
public static final String ERROR_HEADER_EXCEPTION_CAUSE_CLASS = DeadLetterQueueReporter.HEADER_PREFIX + "cause.class";
68-
public static final String ERROR_HEADER_EXCEPTION_CAUSE_MESSAGE = DeadLetterQueueReporter.HEADER_PREFIX + "cause.message";
69-
public static final String ERROR_HEADER_JMS_MESSAGE_ID = DeadLetterQueueReporter.HEADER_PREFIX + "jms.message.id";
70-
public static final String ERROR_HEADER_JMS_TIMESTAMP = DeadLetterQueueReporter.HEADER_PREFIX + "jms.timestamp";
71-
public static final String ERROR_HEADER_QUEUE = DeadLetterQueueReporter.HEADER_PREFIX + "mq.queue";
49+
private ErrorHandler errorHandler = new ErrorHandler();
7250

7351
/**
7452
* Configure this class.
@@ -82,8 +60,17 @@ public void configure(final Map<String, String> props) {
8260
log.trace("[{}] Entry {}.configure, props={}", Thread.currentThread().getId(), this.getClass().getName(),
8361
props);
8462

85-
initializeErrorTolerance(props);
63+
configureKeyHeader(props);
64+
configureJmsProperties(props);
65+
configureErrorHandler(props);
66+
67+
log.trace("[{}] Exit {}.configure", Thread.currentThread().getId(), this.getClass().getName());
68+
}
8669

70+
/**
71+
* Configure key header settings.
72+
*/
73+
private void configureKeyHeader(final Map<String, String> props) {
8774
final String kh = props.get(MQSourceConnector.CONFIG_NAME_MQ_RECORD_BUILDER_KEY_HEADER);
8875
if (kh != null) {
8976
if (kh.equals(MQSourceConnector.CONFIG_VALUE_MQ_RECORD_BUILDER_KEY_HEADER_JMSMESSAGEID)) {
@@ -103,51 +90,22 @@ public void configure(final Map<String, String> props) {
10390
throw new RecordBuilderException("Unsupported MQ record builder key header value");
10491
}
10592
}
93+
}
10694

95+
/**
96+
* Configure JMS properties settings.
97+
*/
98+
private void configureJmsProperties(final Map<String, String> props) {
10799
final String str = props.get(MQSourceConnector.CONFIG_NAME_MQ_JMS_PROPERTY_COPY_TO_KAFKA_HEADER);
108100
copyJmsPropertiesFlag = Boolean.parseBoolean(Optional.ofNullable(str).orElse("false"));
109101
jmsToKafkaHeaderConverter = new JmsToKafkaHeaderConverter();
110-
111-
log.trace("[{}] Exit {}.configure", Thread.currentThread().getId(), this.getClass().getName());
112102
}
113103

114104
/**
115-
* Initializes error tolerance configuration by reading directly from properties
116-
* map
117-
* instead of using AbstractConfig
105+
* Configure error handler.
118106
*/
119-
private void initializeErrorTolerance(final Map<String, String> props) {
120-
// Read tolerateErrors directly from props
121-
final String errorToleranceValue = props.getOrDefault(
122-
ConnectorConfig.ERRORS_TOLERANCE_CONFIG,
123-
ToleranceType.NONE.toString()).toUpperCase(Locale.ROOT);
124-
125-
tolerateErrors = ToleranceType.valueOf(errorToleranceValue).equals(ToleranceType.ALL);
126-
127-
// Read logErrors directly from props
128-
if (tolerateErrors) {
129-
final String logErrorsValue = props.getOrDefault(
130-
ConnectorConfig.ERRORS_LOG_ENABLE_CONFIG,
131-
String.valueOf(ConnectorConfig.ERRORS_LOG_ENABLE_DEFAULT));
132-
logErrors = Boolean.parseBoolean(logErrorsValue);
133-
final String logIncludeMessagesValue = props.getOrDefault(
134-
ConnectorConfig.ERRORS_LOG_INCLUDE_MESSAGES_CONFIG,
135-
String.valueOf(ConnectorConfig.ERRORS_LOG_INCLUDE_MESSAGES_DEFAULT));
136-
logIncludeMessages = Boolean.parseBoolean(logIncludeMessagesValue);
137-
138-
dlqTopic = props.get(MQSourceConnector.DLQ_TOPIC_NAME_CONFIG);
139-
if (dlqTopic != null && !dlqTopic.isEmpty()) {
140-
dlqTopic = dlqTopic.trim();
141-
}
142-
143-
queueName = props.get(MQSourceConnector.CONFIG_NAME_MQ_QUEUE);
144-
if (queueName != null && !queueName.isEmpty()) {
145-
queueName = queueName.trim();
146-
}
147-
} else {
148-
logErrors = false;
149-
logIncludeMessages = false;
150-
}
107+
public void configureErrorHandler(final Map<String, String> props) {
108+
errorHandler.configure(props, copyJmsPropertiesFlag, jmsToKafkaHeaderConverter);
151109
}
152110

153111
/**
@@ -271,220 +229,16 @@ public SourceRecord toSourceRecord(final JMSContext context, final String topic,
271229
value.value());
272230
}
273231
} catch (final Exception e) {
274-
// Log the error
275-
logError(e, topic, message);
232+
// Log the error using error handler
233+
errorHandler.logError(e, topic, message);
276234

277235
// If errors are not tolerated, rethrow
278-
if (!tolerateErrors) {
236+
if (!errorHandler.shouldTolerateErrors()) {
279237
throw e;
280238
}
281239

282240
// Handle the error based on configured error tolerance
283-
return handleBuildException(message, sourceQueuePartition, sourceOffset, topic, key, e);
284-
}
285-
}
286-
287-
/**
288-
* Logs error based on `errors.log.enable` and `errors.log.include.messages` configurations.
289-
*
290-
* @param exception The exception that needs to be logged.
291-
* @param topic The Kafka topic associated with the message.
292-
* @param message The JMS message that caused the error.
293-
*/
294-
private void logError(final Exception exception, final String topic, final Message message) {
295-
if (logErrors) {
296-
if (logIncludeMessages) {
297-
log.error("Failed to process message on topic '{}'. Message content: {}. \nException: {}",
298-
topic, message, exception.toString(), exception);
299-
} else {
300-
log.error("Failed to process message on topic '{}'. \nException: {}", topic, exception.toString(), exception);
301-
}
302-
} else {
303-
log.warn("Error during message processing on topic '{}', but logging is suppressed. \nReason: {}",
304-
topic, extractReason(exception));
305-
}
306-
}
307-
308-
private String extractReason(final Exception exception) {
309-
if (exception == null) {
310-
return "Unknown error";
311-
}
312-
313-
final String message = exception.getMessage();
314-
if (message == null || message.trim().isEmpty()) {
315-
return "Unknown error";
316-
}
317-
318-
// Clean up trailing punctuation or whitespace (e.g., "error:" → "error")
319-
return message.replaceAll("[:\\s]+$", "");
320-
}
321-
322-
323-
/**
324-
*
325-
* Handles conversion errors based on configuration
326-
*
327-
* @param message The actual MQ message
328-
* @param sourceQueuePartition The Source Record queue partition
329-
* @param sourceOffset The Source Record offset
330-
* @param originalTopic The original topic name
331-
* @param key The SchemaAndValue to include in the source
332-
* record key
333-
* @param exception The exception that needs to be stored in the
334-
* header
335-
* @return SourceRecord
336-
*/
337-
private SourceRecord handleBuildException(final Message message, final Map<String, String> sourceQueuePartition,
338-
final Map<String, Long> sourceOffset, final String topic, final SchemaAndValue key,
339-
final Exception exception) {
340-
341-
// If errors are tolerated but no DLQ is configured, skip the message
342-
if (dlqTopic == null) {
343-
log.debug(
344-
"Skipping message due to conversion error: error tolerance is enabled but DLQ is not configured. Message will not be processed further.");
345-
return null;
346-
}
347-
348-
// Create DLQ record
349-
return createDlqRecord(message, sourceQueuePartition, sourceOffset, topic, key, exception);
350-
}
351-
352-
/**
353-
*
354-
* Creates a DLQ record with error information
355-
*
356-
* @param message The actual MQ message
357-
* @param sourceQueuePartition The Source Record queue partition
358-
* @param sourceOffset The Source Record offset
359-
* @param originalTopic The original topic name
360-
* @param key The SchemaAndValue to include in the source
361-
* record key
362-
* @param exception The exception that needs to be stored in the
363-
* header
364-
* @return SourceRecord
365-
*/
366-
private SourceRecord createDlqRecord(final Message message, final Map<String, String> sourceQueuePartition,
367-
final Map<String, Long> sourceOffset, final String originalTopic,
368-
final SchemaAndValue key, final Exception exception) {
369-
370-
try {
371-
// Extract payload or return null if extraction fails
372-
final Optional<byte[]> maybePayload = extractPayload(message);
373-
if (!maybePayload.isPresent()) {
374-
log.error("Skipping message due to payload extraction failure");
375-
return null;
376-
}
377-
378-
final byte[] payload = maybePayload.get();
379-
380-
// Create headers with error information
381-
final Headers headers = createErrorHeaders(message, originalTopic, exception);
382-
383-
return new SourceRecord(
384-
sourceQueuePartition,
385-
sourceOffset,
386-
dlqTopic,
387-
null,
388-
key.schema(),
389-
key.value(),
390-
Schema.OPTIONAL_BYTES_SCHEMA,
391-
payload,
392-
message.getJMSTimestamp(),
393-
headers);
394-
} catch (final Exception dlqException) {
395-
// If DLQ processing itself fails, log and skip
396-
log.error("Failed to create DLQ record: {}", dlqException.getMessage(), dlqException);
397-
return null;
398-
}
399-
}
400-
401-
/**
402-
*
403-
* Extracts payload from a JMS message with improved error handling
404-
*
405-
* @param message The actual message coming from mq
406-
*
407-
* @return Optional<byte[]>
408-
*/
409-
private Optional<byte[]> extractPayload(final Message message) {
410-
try {
411-
if (message instanceof BytesMessage) {
412-
log.debug("Extracting payload from BytesMessage for DLQ");
413-
return Optional.ofNullable(message.getBody(byte[].class));
414-
} else if (message instanceof TextMessage) {
415-
log.debug("Extracting payload from TextMessage for DLQ");
416-
final String text = message.getBody(String.class);
417-
return Optional.ofNullable(text != null ? text.getBytes(UTF_8) : null);
418-
} else {
419-
log.warn("Unsupported JMS message type '{}' encountered while extracting payload for DLQ. Falling back to message.toString().",
420-
message.getClass().getName());
421-
return Optional.ofNullable(message.toString().getBytes(UTF_8));
422-
}
423-
} catch (final JMSException e) {
424-
log.error("JMSException while extracting payload from message type '{}': {} for DLQ. Falling back to message.toString().",
425-
message.getClass().getName(), e.getMessage(), e);
426-
return Optional.ofNullable(message.toString().getBytes(UTF_8));
427-
}
428-
}
429-
430-
431-
/**
432-
*
433-
* Creates enhanced headers with error information for DLQ records
434-
* @param message The orginal message
435-
*
436-
* @param originalTopic The original topic name
437-
* @param exception The execption that needs to be included in the header
438-
*
439-
* @return Headers
440-
*/
441-
private Headers createErrorHeaders(final Message message, final String originalTopic, final Exception exception) {
442-
Headers headers = new ConnectHeaders();
443-
if (copyJmsPropertiesFlag) {
444-
headers = jmsToKafkaHeaderConverter.convertJmsPropertiesToKafkaHeaders(message);
445-
}
446-
447-
// Basic error information
448-
headers.addString(DeadLetterQueueReporter.ERROR_HEADER_ORIG_TOPIC, originalTopic);
449-
headers.addString(DeadLetterQueueReporter.ERROR_HEADER_EXECUTING_CLASS, exception.getClass().getName());
450-
headers.addString(DeadLetterQueueReporter.ERROR_HEADER_EXCEPTION_MESSAGE, exception.getMessage());
451-
452-
try {
453-
headers.addString(ERROR_HEADER_JMS_MESSAGE_ID, message.getJMSMessageID());
454-
headers.addLong(ERROR_HEADER_JMS_TIMESTAMP, message.getJMSTimestamp());
455-
} catch (final JMSException jmsException) {
456-
jmsException.printStackTrace();
457-
}
458-
459-
headers.addString(ERROR_HEADER_QUEUE, queueName);
460-
headers.addLong(ERROR_HEADER_EXCEPTION_TIMESTAMP, System.currentTimeMillis());
461-
462-
// Add cause if available
463-
if (exception.getCause() != null) {
464-
headers.addString(ERROR_HEADER_EXCEPTION_CAUSE_MESSAGE, exception.getCause().getMessage());
465-
headers.addString(ERROR_HEADER_EXCEPTION_CAUSE_CLASS, exception.getCause().getClass().getName());
466-
}
467-
468-
// Add first few lines of stack trace (full stack trace might be too large)
469-
headers.addString(DeadLetterQueueReporter.ERROR_HEADER_EXCEPTION_STACK_TRACE, stacktrace(exception));
470-
471-
return headers;
472-
}
473-
474-
private String stacktrace(final Exception exception) {
475-
try {
476-
final StringWriter sw = new StringWriter();
477-
final PrintWriter pw = new PrintWriter(sw);
478-
exception.printStackTrace(pw);
479-
final String stackTrace = sw.toString();
480-
481-
// First 500 characters or less to avoid overly large headers
482-
final String truncatedStackTrace = stackTrace.length() <= 500 ? stackTrace
483-
: stackTrace.substring(0, 500) + "... [truncated]";
484-
return truncatedStackTrace;
485-
} catch (final Exception e) {
486-
log.warn("Could not add stack trace to DLQ headers", e);
241+
return errorHandler.handleBuildException(message, sourceQueuePartition, sourceOffset, topic, key, e);
487242
}
488-
return null;
489243
}
490244
}

0 commit comments

Comments
 (0)