-
Notifications
You must be signed in to change notification settings - Fork 84
feat: support of errors.tolerance and dead letter queue #143
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Conversation
6064cc2
to
27c8df7
Compare
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Pull Request Overview
This PR adds error tolerance support to the Kafka MQ source connector along with improved error handling and additional testing for JSON processing failures.
- Added connect-runtime dependency to pom.xml.
- Enhanced error handling in JsonRecordBuilder and BaseRecordBuilder with DLQ support.
- Updated MQSourceTask to filter out null SourceRecords and included new integration tests.
Reviewed Changes
Copilot reviewed 8 out of 8 changed files in this pull request and generated 2 comments.
Show a summary per file
File | Description |
---|---|
src/main/java/com/ibm/eventstreams/connect/mqsource/builders/JsonRecordBuilder.java | Added an override for configure() method (currently a pass-through call). |
src/main/java/com/ibm/eventstreams/connect/mqsource/builders/BaseRecordBuilder.java | Introduced error tolerance configuration and DLQ handling with improved error logging. |
src/main/java/com/ibm/eventstreams/connect/mqsource/MQSourceTask.java | Added error tolerance check and filtering of null SourceRecords in poll. |
src/main/java/com/ibm/eventstreams/connect/mqsource/MQSourceConnector.java | Added DLQ topic configuration constants. |
src/integration/java/com/ibm/eventstreams/connect/mqsource/builders/JsonRecordBuilderIT.java | Added tests for JSON parsing errors and error tolerance behavior. |
src/integration/java/com/ibm/eventstreams/connect/mqsource/AbstractJMSContextIT.java | Updated method access modifiers for connector properties. |
pom.xml | Added connect-runtime dependency for Kafka Connect runtime features. |
src/main/java/com/ibm/eventstreams/connect/mqsource/builders/BaseRecordBuilder.java
Outdated
Show resolved
Hide resolved
src/main/java/com/ibm/eventstreams/connect/mqsource/builders/JsonRecordBuilder.java
Outdated
Show resolved
Hide resolved
76baba8
to
b644944
Compare
@dalelane I've made a few changes that might help us handle the dead letter queue. Initially, we implemented the error tolerance logic in JsonRecordBuilder, with the intention of centralizing error tolerance handling across all record builders, I have moved it to the baseRecordBuilder. I'm not sure if this is the best approach, so I'd appreciate your thoughts on it. To handle the dead letter queue, I've added logic to create a new source record for the DLQ topic whenever there's an exception during source record creation in any builder class. While reviewing other connector available in github, I noticed that DLQ records often include headers with details about the exception, task id, etc., so I followed a similar approach—though I'm not certain it's the right one. I’d really appreciate your feedback on this PR. |
b644944
to
d20ac82
Compare
// Extract key and value | ||
key = Optional.ofNullable(this.getKey(context, topic, message)); | ||
final SchemaAndValue value = this.getValue(context, topic, messageBodyJms, message); | ||
final SchemaAndValue actualKey = key.orElse(new SchemaAndValue(null, null)); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Retrieving the key used to be (line 169):
SchemaAndValue key = this.getKey(context, topic, message);
And this pull request proposes changing it to (line 222, line 226, line 228)
Optional<SchemaAndValue> key = Optional.empty();
key = Optional.ofNullable(this.getKey(context, topic, message));
final SchemaAndValue actualKey = key.orElse(new SchemaAndValue(null, null));
What is the motivation for this change? Given that we know the implementation of getKey
, we know that it will never return null
, so I'm not sure what Optional
is protecting against here. And the fact that you're very quickly unwrapping it does suggest that you perhaps didn't find it very useful?
Where there is no key information, my reading of the getKey
implementation is that it will always default to returning new SchemaAndValue(null, null)
anyway, so I couldn't see why the additional round-trip added here would be necessary.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
(less critical comment - the fact that getting the key is split across a few interleaved lines does make it harder to follow. If we do determine that we need additional steps, please try and keep them together. But perhaps let's agree on whether we need the steps first before we worry about that!)
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yes, you are right. My understanding of getKey was wrong when I was doing this. After going through my implementation I was able to understand its not required and I am changing this accordingly
log.warn("Exception caught during conversion of JMS message to SourceRecord: {}", e.getMessage()); | ||
if (logErrors) { | ||
log.error("Detailed error information:", e); | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think it would be best to avoid logging the error twice. Perhaps put the log.warn
in an else
(after your if
) so we either log a warning or an error
private void logError(final Exception e) { | ||
log.warn("Exception caught during conversion of JMS message to SourceRecord: {}", e.getMessage()); | ||
if (logErrors) { | ||
log.error("Detailed error information:", e); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
As we're reusing the standard Connect config, we should aim to adopt the standard behaviour.
Specifically, I think we should try to log the message data as well as the exception
(cf. https://github.com/apache/kafka/blob/6e380fbbcc8fde22d1f2bb3310e1270d5b3f4837/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/ConnectorConfig.java#L198-L199 )
} | ||
|
||
// Read dlqTopic directly from props | ||
dlqTopic = props.get(MQSourceConnector.ERRORS_DEAD_LETTER_QUEUE_TOPIC_NAME_CONFIG); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I suggest checking if the string is empty here, as a one-off upfront check, to avoid needing to do the check on every failed message
(also worth trimming the string at this point as well)
final Exception exception) { | ||
|
||
// If errors are tolerated but no DLQ is configured, skip the message | ||
if (dlqTopic == null || dlqTopic.isEmpty()) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
as above - perhaps better to make this a null check only, with any other checks (such as if it is an empty string) at init time
} | ||
|
||
// Read dlqTopic directly from props | ||
dlqTopic = props.get(MQSourceConnector.ERRORS_DEAD_LETTER_QUEUE_TOPIC_NAME_CONFIG); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We could move this into the if (tolerateErrors)
block to make it clearer that DLQ is only an option when errors.tolerance is set
// First 500 characters or less to avoid overly large headers | ||
final String truncatedStackTrace = stackTrace.length() <= 500 ? stackTrace |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
What prompted this? Did you observe problems with putting the whole stack trace in the header?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I did try this out and doesn't seem to have any problems
headers.addString("original_topic", originalTopic); | ||
headers.addString("error_message", exception.getMessage()); | ||
headers.addString("error_class", exception.getClass().getName()); | ||
headers.addString("error_timestamp", String.valueOf(System.currentTimeMillis())); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Headers don't need to be strings - you could leave this as a number and just add a numeric header
// Add first few lines of stack trace (full stack trace might be too large) | ||
try { | ||
final StringWriter sw = new StringWriter(); | ||
final PrintWriter pw = new PrintWriter(sw); | ||
exception.printStackTrace(pw); | ||
final String stackTrace = sw.toString(); | ||
|
||
// First 500 characters or less to avoid overly large headers | ||
final String truncatedStackTrace = stackTrace.length() <= 500 ? stackTrace | ||
: stackTrace.substring(0, 500) + "... [truncated]"; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I suggest moving this into a separate method for getting the stack trace from an exception
@@ -18,6 +18,7 @@ | |||
import static java.nio.charset.StandardCharsets.UTF_8; | |||
|
|||
import java.util.HashMap; | |||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
it'd clean up the PR if you revert the no-op changes to this file
9a4b191
to
7336ce4
Compare
f3e139c
to
445850c
Compare
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Pull Request Overview
This pull request adds support for error tolerance in the Kafka MQ source connector by enhancing error handling (including DLQ functionality), improving test coverage, and updating dependency management.
- Implements and configures error tolerance and dead letter queue (DLQ) support in ErrorHandler and record builders.
- Adds new unit and integration tests for verifying error tolerance behavior.
- Updates dependency definitions and connector configuration with new DLQ-related properties.
Reviewed Changes
Copilot reviewed 9 out of 9 changed files in this pull request and generated 1 comment.
Show a summary per file
File | Description |
---|---|
src/main/java/com/ibm/eventstreams/connect/mqsource/util/ErrorHandler.java | Introduces error logging, DLQ record creation and payload extraction for tolerant error handling. |
src/main/java/com/ibm/eventstreams/connect/mqsource/builders/JsonRecordBuilder.java | Minor import cleanup. |
src/main/java/com/ibm/eventstreams/connect/mqsource/builders/BaseRecordBuilder.java | Integrates error handler configuration and improves builder setup. |
src/main/java/com/ibm/eventstreams/connect/mqsource/MQSourceTask.java | Adjusts JMSWorker logic for exactly-once processing. |
src/main/java/com/ibm/eventstreams/connect/mqsource/MQSourceConnector.java | Adds new configuration definitions for DLQ. |
src/integration/java/com/ibm/eventstreams/connect/mqsource/builders/JsonRecordBuilderIT.java | Adds tests for JSON conversion errors and error tolerance. |
src/integration/java/com/ibm/eventstreams/connect/mqsource/AbstractJMSContextIT.java | Adjusts method accessibility and static declaration for default properties. |
pom.xml | Adds new connect-runtime dependency. |
Comments suppressed due to low confidence (1)
src/main/java/com/ibm/eventstreams/connect/mqsource/builders/BaseRecordBuilder.java:45
- [nitpick] The variable 'keyheader' could be renamed to 'keyHeader' to follow Java naming conventions and improve code readability.
protected KeyHeader keyheader = KeyHeader.NONE;
dlqTopic = props.get(MQSourceConnector.DLQ_TOPIC_NAME_CONFIG); | ||
if (dlqTopic != null && !dlqTopic.isEmpty()) { | ||
dlqTopic = dlqTopic.trim(); | ||
// TODO: Check if DLQ topic exists |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Consider implementing the check to verify whether the configured DLQ topic exists, or document clearly under which conditions this TODO is acceptable.
Copilot uses AI. Check for mistakes.
2079c03
to
fcfb653
Compare
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
one tiny chore (we need to add a copyright header) but otherwise this is looking really good, thanks!
src/main/java/com/ibm/eventstreams/connect/mqsource/util/ErrorHandler.java
Show resolved
Hide resolved
fcfb653
to
149b105
Compare
- Added a IT for error tolerance - Added conditions with in JSONRecordBuilder for error tolerance Signed-off-by: Joel Hanson <joelhanson025@gmail.com>
- Handle poison messages from base builder class - Poison message is handle when `errors.tolerance` is set to all - Option to configure deadletter queue for poison messages - New UTs for testing errors tolerance behaviour Signed-off-by: Joel Hanson <joelhanson025@gmail.com>
Signed-off-by: Joel Hanson <joelhanson025@gmail.com>
Signed-off-by: Joel Hanson <joelhanson025@gmail.com>
Signed-off-by: Joel Hanson <joelhanson025@gmail.com>
0b6b951
to
a26a583
Compare
Signed-off-by: Joel Hanson <joelhanson025@gmail.com>
a26a583
to
1f43afc
Compare
This pull request introduces several changes to the Kafka MQ source connector, focusing on error handling and testing improvements. The most important changes include adding a new dependency, implementing error tolerance in the
JsonRecordBuilder
, and adding new test cases to ensure proper functionality.[UPDATE]
errors.tolerance
is set to allDependency Management:
connect-runtime
dependency topom.xml
to support Kafka connect runtime features.Error Handling:
JsonRecordBuilder
by adding thetoleranceType
field and configuring it based on the connector properties. Updated thegetValue
method to handle exceptions based on the tolerance type. [1] [2] [3] [4]MQSourceTask
to filter out nullSourceRecord
objects, allowing for error tolerance handling.Testing:
JsonRecordBuilderIT
to verify JSON parsing errors and error tolerance behavior.JsonRecordBuilderIT
.