Skip to content

feat: support of errors.tolerance and dead letter queue #143

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 6 commits into from
Jun 23, 2025

Conversation

Joel-hanson
Copy link
Contributor

@Joel-hanson Joel-hanson commented Feb 6, 2025

This pull request introduces several changes to the Kafka MQ source connector, focusing on error handling and testing improvements. The most important changes include adding a new dependency, implementing error tolerance in the JsonRecordBuilder, and adding new test cases to ensure proper functionality.

[UPDATE]

  • Handle poison messages from base builder class
  • Poison message is handle when errors.tolerance is set to all
  • Option to configure deadletter queue for poison messages
  • New UTs for testing errors tolerance behaviour

Dependency Management:

  • Added connect-runtime dependency to pom.xml to support Kafka connect runtime features.

Error Handling:

  • Implemented error tolerance in JsonRecordBuilder by adding the toleranceType field and configuring it based on the connector properties. Updated the getValue method to handle exceptions based on the tolerance type. [1] [2] [3] [4]
  • Modified MQSourceTask to filter out null SourceRecord objects, allowing for error tolerance handling.

Testing:

  • Added new test cases in JsonRecordBuilderIT to verify JSON parsing errors and error tolerance behavior.
  • Imported necessary classes for the new test cases in JsonRecordBuilderIT.

@Joel-hanson Joel-hanson requested a review from Copilot May 10, 2025 12:21
Copy link

@Copilot Copilot AI left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Pull Request Overview

This PR adds error tolerance support to the Kafka MQ source connector along with improved error handling and additional testing for JSON processing failures.

  • Added connect-runtime dependency to pom.xml.
  • Enhanced error handling in JsonRecordBuilder and BaseRecordBuilder with DLQ support.
  • Updated MQSourceTask to filter out null SourceRecords and included new integration tests.

Reviewed Changes

Copilot reviewed 8 out of 8 changed files in this pull request and generated 2 comments.

Show a summary per file
File Description
src/main/java/com/ibm/eventstreams/connect/mqsource/builders/JsonRecordBuilder.java Added an override for configure() method (currently a pass-through call).
src/main/java/com/ibm/eventstreams/connect/mqsource/builders/BaseRecordBuilder.java Introduced error tolerance configuration and DLQ handling with improved error logging.
src/main/java/com/ibm/eventstreams/connect/mqsource/MQSourceTask.java Added error tolerance check and filtering of null SourceRecords in poll.
src/main/java/com/ibm/eventstreams/connect/mqsource/MQSourceConnector.java Added DLQ topic configuration constants.
src/integration/java/com/ibm/eventstreams/connect/mqsource/builders/JsonRecordBuilderIT.java Added tests for JSON parsing errors and error tolerance behavior.
src/integration/java/com/ibm/eventstreams/connect/mqsource/AbstractJMSContextIT.java Updated method access modifiers for connector properties.
pom.xml Added connect-runtime dependency for Kafka Connect runtime features.

@Joel-hanson Joel-hanson force-pushed the errors-tolerance branch 4 times, most recently from 76baba8 to b644944 Compare May 10, 2025 12:37
@Joel-hanson Joel-hanson marked this pull request as ready for review May 10, 2025 12:39
@Joel-hanson
Copy link
Contributor Author

@dalelane I've made a few changes that might help us handle the dead letter queue. Initially, we implemented the error tolerance logic in JsonRecordBuilder, with the intention of centralizing error tolerance handling across all record builders, I have moved it to the baseRecordBuilder. I'm not sure if this is the best approach, so I'd appreciate your thoughts on it.

To handle the dead letter queue, I've added logic to create a new source record for the DLQ topic whenever there's an exception during source record creation in any builder class. While reviewing other connector available in github, I noticed that DLQ records often include headers with details about the exception, task id, etc., so I followed a similar approach—though I'm not certain it's the right one. I’d really appreciate your feedback on this PR.

@Joel-hanson Joel-hanson requested a review from dalelane May 10, 2025 12:39
@Joel-hanson Joel-hanson requested a review from Copilot May 12, 2025 09:20
Copilot

This comment was marked as outdated.

// Extract key and value
key = Optional.ofNullable(this.getKey(context, topic, message));
final SchemaAndValue value = this.getValue(context, topic, messageBodyJms, message);
final SchemaAndValue actualKey = key.orElse(new SchemaAndValue(null, null));
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Retrieving the key used to be (line 169):

SchemaAndValue key = this.getKey(context, topic, message);

And this pull request proposes changing it to (line 222, line 226, line 228)

Optional<SchemaAndValue> key = Optional.empty();
key = Optional.ofNullable(this.getKey(context, topic, message));
final SchemaAndValue actualKey = key.orElse(new SchemaAndValue(null, null));

What is the motivation for this change? Given that we know the implementation of getKey, we know that it will never return null, so I'm not sure what Optional is protecting against here. And the fact that you're very quickly unwrapping it does suggest that you perhaps didn't find it very useful?

Where there is no key information, my reading of the getKey implementation is that it will always default to returning new SchemaAndValue(null, null) anyway, so I couldn't see why the additional round-trip added here would be necessary.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

(less critical comment - the fact that getting the key is split across a few interleaved lines does make it harder to follow. If we do determine that we need additional steps, please try and keep them together. But perhaps let's agree on whether we need the steps first before we worry about that!)

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, you are right. My understanding of getKey was wrong when I was doing this. After going through my implementation I was able to understand its not required and I am changing this accordingly

Comment on lines 275 to 278
log.warn("Exception caught during conversion of JMS message to SourceRecord: {}", e.getMessage());
if (logErrors) {
log.error("Detailed error information:", e);
}
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think it would be best to avoid logging the error twice. Perhaps put the log.warn in an else (after your if) so we either log a warning or an error

private void logError(final Exception e) {
log.warn("Exception caught during conversion of JMS message to SourceRecord: {}", e.getMessage());
if (logErrors) {
log.error("Detailed error information:", e);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

As we're reusing the standard Connect config, we should aim to adopt the standard behaviour.

Specifically, I think we should try to log the message data as well as the exception
(cf. https://github.com/apache/kafka/blob/6e380fbbcc8fde22d1f2bb3310e1270d5b3f4837/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/ConnectorConfig.java#L198-L199 )

}

// Read dlqTopic directly from props
dlqTopic = props.get(MQSourceConnector.ERRORS_DEAD_LETTER_QUEUE_TOPIC_NAME_CONFIG);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I suggest checking if the string is empty here, as a one-off upfront check, to avoid needing to do the check on every failed message

(also worth trimming the string at this point as well)

final Exception exception) {

// If errors are tolerated but no DLQ is configured, skip the message
if (dlqTopic == null || dlqTopic.isEmpty()) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

as above - perhaps better to make this a null check only, with any other checks (such as if it is an empty string) at init time

}

// Read dlqTopic directly from props
dlqTopic = props.get(MQSourceConnector.ERRORS_DEAD_LETTER_QUEUE_TOPIC_NAME_CONFIG);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We could move this into the if (tolerateErrors) block to make it clearer that DLQ is only an option when errors.tolerance is set

Comment on lines 417 to 418
// First 500 characters or less to avoid overly large headers
final String truncatedStackTrace = stackTrace.length() <= 500 ? stackTrace
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What prompted this? Did you observe problems with putting the whole stack trace in the header?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I did try this out and doesn't seem to have any problems

headers.addString("original_topic", originalTopic);
headers.addString("error_message", exception.getMessage());
headers.addString("error_class", exception.getClass().getName());
headers.addString("error_timestamp", String.valueOf(System.currentTimeMillis()));
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Comment on lines 410 to 419
// Add first few lines of stack trace (full stack trace might be too large)
try {
final StringWriter sw = new StringWriter();
final PrintWriter pw = new PrintWriter(sw);
exception.printStackTrace(pw);
final String stackTrace = sw.toString();

// First 500 characters or less to avoid overly large headers
final String truncatedStackTrace = stackTrace.length() <= 500 ? stackTrace
: stackTrace.substring(0, 500) + "... [truncated]";
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I suggest moving this into a separate method for getting the stack trace from an exception

@@ -18,6 +18,7 @@
import static java.nio.charset.StandardCharsets.UTF_8;

import java.util.HashMap;

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

it'd clean up the PR if you revert the no-op changes to this file

@Joel-hanson Joel-hanson force-pushed the errors-tolerance branch 2 times, most recently from 9a4b191 to 7336ce4 Compare June 6, 2025 15:20
@Joel-hanson Joel-hanson requested a review from Copilot June 6, 2025 15:24
Copilot

This comment was marked as outdated.

Copy link

@Copilot Copilot AI left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Pull Request Overview

This pull request adds support for error tolerance in the Kafka MQ source connector by enhancing error handling (including DLQ functionality), improving test coverage, and updating dependency management.

  • Implements and configures error tolerance and dead letter queue (DLQ) support in ErrorHandler and record builders.
  • Adds new unit and integration tests for verifying error tolerance behavior.
  • Updates dependency definitions and connector configuration with new DLQ-related properties.

Reviewed Changes

Copilot reviewed 9 out of 9 changed files in this pull request and generated 1 comment.

Show a summary per file
File Description
src/main/java/com/ibm/eventstreams/connect/mqsource/util/ErrorHandler.java Introduces error logging, DLQ record creation and payload extraction for tolerant error handling.
src/main/java/com/ibm/eventstreams/connect/mqsource/builders/JsonRecordBuilder.java Minor import cleanup.
src/main/java/com/ibm/eventstreams/connect/mqsource/builders/BaseRecordBuilder.java Integrates error handler configuration and improves builder setup.
src/main/java/com/ibm/eventstreams/connect/mqsource/MQSourceTask.java Adjusts JMSWorker logic for exactly-once processing.
src/main/java/com/ibm/eventstreams/connect/mqsource/MQSourceConnector.java Adds new configuration definitions for DLQ.
src/integration/java/com/ibm/eventstreams/connect/mqsource/builders/JsonRecordBuilderIT.java Adds tests for JSON conversion errors and error tolerance.
src/integration/java/com/ibm/eventstreams/connect/mqsource/AbstractJMSContextIT.java Adjusts method accessibility and static declaration for default properties.
pom.xml Adds new connect-runtime dependency.
Comments suppressed due to low confidence (1)

src/main/java/com/ibm/eventstreams/connect/mqsource/builders/BaseRecordBuilder.java:45

  • [nitpick] The variable 'keyheader' could be renamed to 'keyHeader' to follow Java naming conventions and improve code readability.
    protected KeyHeader keyheader = KeyHeader.NONE;

dlqTopic = props.get(MQSourceConnector.DLQ_TOPIC_NAME_CONFIG);
if (dlqTopic != null && !dlqTopic.isEmpty()) {
dlqTopic = dlqTopic.trim();
// TODO: Check if DLQ topic exists
Copy link
Preview

Copilot AI Jun 16, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Consider implementing the check to verify whether the configured DLQ topic exists, or document clearly under which conditions this TODO is acceptable.

Copilot uses AI. Check for mistakes.

@Joel-hanson Joel-hanson changed the title feat: support of errors.tolerance feat: support of errors.tolerance and dead letter queue Jun 17, 2025
@Joel-hanson Joel-hanson force-pushed the errors-tolerance branch 2 times, most recently from 2079c03 to fcfb653 Compare June 20, 2025 11:09
dalelane
dalelane previously approved these changes Jun 22, 2025
Copy link
Contributor

@dalelane dalelane left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

one tiny chore (we need to add a copyright header) but otherwise this is looking really good, thanks!

- Added a IT for error tolerance
- Added conditions with in JSONRecordBuilder for error tolerance

Signed-off-by: Joel Hanson <joelhanson025@gmail.com>
- Handle poison messages from base builder class
- Poison message is handle when `errors.tolerance` is set to all
- Option to configure deadletter queue for poison messages
- New UTs for testing errors tolerance behaviour

Signed-off-by: Joel Hanson <joelhanson025@gmail.com>
Signed-off-by: Joel Hanson <joelhanson025@gmail.com>
Signed-off-by: Joel Hanson <joelhanson025@gmail.com>
Signed-off-by: Joel Hanson <joelhanson025@gmail.com>
@Joel-hanson Joel-hanson force-pushed the errors-tolerance branch 2 times, most recently from 0b6b951 to a26a583 Compare June 23, 2025 06:58
Signed-off-by: Joel Hanson <joelhanson025@gmail.com>
@Joel-hanson Joel-hanson merged commit 25c102f into ibm-messaging:main Jun 23, 2025
2 checks passed
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

3 participants