diff --git a/.github/ISSUE_TEMPLATE/BUG-REPORT.yml b/.github/ISSUE_TEMPLATE/BUG-REPORT.yml
index 6fd8ea1..401ecfd 100644
--- a/.github/ISSUE_TEMPLATE/BUG-REPORT.yml
+++ b/.github/ISSUE_TEMPLATE/BUG-REPORT.yml
@@ -58,7 +58,7 @@ body:
      description: What version of our software are you running?
      options:
        - latest
-        - 2.3.0
+        - 2.4.0 (Default)
        - 1.3.5
        - older (<1.3.5)
    validations:
diff --git a/README.md b/README.md
index 0cdb50a..1e19519 100644
--- a/README.md
+++ b/README.md
@@ -49,7 +49,7 @@ mvn clean package

Once built, the output is a single JAR called `target/kafka-connect-mq-source-<version>-jar-with-dependencies.jar` which contains all of the required dependencies.

-**NOTE:** With the 2.0.0 release the base Kafka Connect library has been updated from 2.6.0 to 3.4.0 to enable the implementation of the exactly-once delivery.
+**NOTE:** With the 2.0.0 release, the base Kafka Connect library was updated from 2.6.0 to 3.4.0 to enable exactly-once delivery.

## Running the connector

@@ -75,7 +75,7 @@ The connector connects to MQ using either a client or a bindings connection. For

To run the connector in standalone mode from the directory into which you installed Apache Kafka, you use a command like this:

-``` shell
+```shell
bin/connect-standalone.sh connect-standalone.properties mq-source.properties
```

@@ -85,7 +85,7 @@ You need an instance of Kafka Connect running in distributed mode. The Kafka dis

To start the MQ connector, you can use `config/mq-source.json` in this repository after replacing all placeholders and use a command like this:

-``` shell
+```shell
curl -X POST -H "Content-Type: application/json" http://localhost:8083/connectors \
  --data "@./config/mq-source.json"
```

@@ -100,13 +100,13 @@ This repository includes an example Dockerfile to run Kafka Connect in distribut

**NOTE:** To provide custom properties files create a folder called `config` containing the `connect-distributed.properties` and `connect-log4j.properties` files and use a Docker volume to make them available when running the container like this:

-``` shell
+```shell
docker run -v $(pwd)/config:/opt/kafka/config -p 8083:8083 kafkaconnect-with-mq-source:2.0.0
```

To start the MQ connector, you can use `config/mq-source.json` in this repository after replacing all placeholders and use a command like this:

-``` shell
+```shell
curl -X POST -H "Content-Type: application/json" http://localhost:8083/connectors \
  --data "@./config/mq-source.json"
```

@@ -182,12 +182,12 @@ There are two record builders supplied with the connector, although you can writ

There are three converters built into Apache Kafka. You need to make sure that the incoming message format, the setting of the `mq.message.body.jms` configuration, the record builder and converter are all compatible. By default, everything is just treated as bytes but if you want the connector to understand the message format and apply more sophisticated processing such as single-message transforms, you'll need a more complex configuration.
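For illustration, here is a hedged `mq-source.properties`-style excerpt pairing the JSON record builder with the JSON converter; the topic and queue names are placeholders, and `value.converter.schemas.enable` is the standard Kafka Connect property for asking `JsonConverter` to emit schemaless JSON:

```properties
# Placeholder topic and queue names - substitute your own
topic=TSOURCE
mq.queue=DEV.QUEUE.1
# Build records from the JSON payload and hand them to the JSON converter
mq.record.builder=com.ibm.eventstreams.connect.mqsource.builders.JsonRecordBuilder
value.converter=org.apache.kafka.connect.json.JsonConverter
value.converter.schemas.enable=false
```

The following table shows the basic options that work.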
-| Record builder class | Incoming MQ message | mq.message.body.jms | Converter class | Outgoing Kafka message |
-| ------------------------------------------------------------------- | ---------------------- | ------------------- | ------------------------------------------------------ | ----------------------- |
-| com.ibm.eventstreams.connect.mqsource.builders.DefaultRecordBuilder | Any | false (default) | org.apache.kafka.connect.converters.ByteArrayConverter | **Binary data** |
-| com.ibm.eventstreams.connect.mqsource.builders.DefaultRecordBuilder | JMS BytesMessage | true | org.apache.kafka.connect.converters.ByteArrayConverter | **Binary data** |
-| com.ibm.eventstreams.connect.mqsource.builders.DefaultRecordBuilder | JMS TextMessage | true | org.apache.kafka.connect.storage.StringConverter | **String data** |
-| com.ibm.eventstreams.connect.mqsource.builders.JsonRecordBuilder | JSON, may have schema | Not used | org.apache.kafka.connect.json.JsonConverter | **JSON, no schema** |
+| Record builder class | Incoming MQ message | mq.message.body.jms | Converter class | Outgoing Kafka message |
+| ------------------------------------------------------------------- | --------------------- | ------------------- | ------------------------------------------------------ | ---------------------- |
+| com.ibm.eventstreams.connect.mqsource.builders.DefaultRecordBuilder | Any | false (default) | org.apache.kafka.connect.converters.ByteArrayConverter | **Binary data** |
+| com.ibm.eventstreams.connect.mqsource.builders.DefaultRecordBuilder | JMS BytesMessage | true | org.apache.kafka.connect.converters.ByteArrayConverter | **Binary data** |
+| com.ibm.eventstreams.connect.mqsource.builders.DefaultRecordBuilder | JMS TextMessage | true | org.apache.kafka.connect.storage.StringConverter | **String data** |
+| com.ibm.eventstreams.connect.mqsource.builders.JsonRecordBuilder | JSON, may have schema | Not used | org.apache.kafka.connect.json.JsonConverter | **JSON, no schema** |

There's no single configuration that will always be right, but here are some high-level suggestions.

@@ -243,7 +243,7 @@ You must then choose a converter that can handle the value schema and class.

By default, the connector does not use keys for the Kafka messages it publishes. It can be configured to use the JMS message headers to set the key of the Kafka records. You could use this, for example, to use the MQMD correlation identifier as the partitioning key when the messages are published to Kafka. There are four valid values for the `mq.record.builder.key.header` that controls this behavior.
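As a quick illustration, a hedged `mq-source.properties`-style excerpt that keys records by the correlation identifier; the property names are the ones documented in this README, and the converter pairing follows the recommendations in the table below:

```properties
# Use the JMS correlation ID as the Kafka record key (illustrative sketch)
mq.record.builder.key.header=JMSCorrelationID
key.converter=org.apache.kafka.connect.storage.StringConverter
```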
| mq.record.builder.key.header | Key schema | Key class | Recommended value for key.converter | -| ---------------------------- |---------------- | --------- | ------------------------------------------------------ | +| ---------------------------- | --------------- | --------- | ------------------------------------------------------ | | JMSMessageID | OPTIONAL_STRING | String | org.apache.kafka.connect.storage.StringConverter | | JMSCorrelationID | OPTIONAL_STRING | String | org.apache.kafka.connect.storage.StringConverter | | JMSCorrelationIDAsBytes | OPTIONAL_BYTES | byte[] | org.apache.kafka.connect.converters.ByteArrayConverter | @@ -277,43 +277,63 @@ For troubleshooting, or to better understand the handshake performed by the IBM The configuration options for the Kafka Connect source connector for IBM MQ are as follows: -| Name | Description | Type | Default | Valid values | -| --------------------------------------- | ----------------------------------------------------------------------------------------------------------------------------------------- | ------- | -------------- | ----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------- | -| topic | The name of the target Kafka topic | string | | Topic name | -| mq.queue.manager | The name of the MQ queue manager | string | | MQ queue manager name | -| mq.connection.mode | The connection mode - bindings or client | string | client | client, bindings | -| mq.connection.name.list | List of connection names for queue manager | string | | host(port)[,host(port),...] | -| mq.channel.name | The name of the server-connection channel | string | | MQ channel name | -| mq.queue | The name of the source MQ queue | string | | MQ queue name | -| mq.exactly.once.state.queue | The name of the MQ queue used to store state when running with exactly-once semantics | string | | MQ state queue name | -| mq.user.name | The user name for authenticating with the queue manager | string | | User name | -| mq.password | The password for authenticating with the queue manager | string | | Password | -| mq.user.authentication.mqcsp | Whether to use MQ connection security parameters (MQCSP) | boolean | true | | -| mq.ccdt.url | The URL for the CCDT file containing MQ connection details | string | | URL for obtaining a CCDT file | -| mq.record.builder | The class used to build the Kafka Connect record | string | | Class implementing RecordBuilder | -| mq.message.body.jms | Whether to interpret the message body as a JMS message type | boolean | false | | -| mq.record.builder.key.header | The JMS message header to use as the Kafka record key | string | | JMSMessageID, JMSCorrelationID, JMSCorrelationIDAsBytes, JMSDestination | -| mq.jms.properties.copy.to.kafka.headers | Whether to copy JMS message properties to Kafka headers | boolean | false | | -| mq.ssl.cipher.suite | The name of the cipher suite for TLS (SSL) connection | string | | Blank or valid cipher suite | -| mq.ssl.peer.name | The distinguished name pattern of the TLS (SSL) peer | string | | Blank or DN pattern | -| mq.ssl.keystore.location | The path to the JKS keystore to use for SSL (TLS) connections | string | JVM keystore | Local path to a JKS file | -| mq.ssl.keystore.password | The password of the JKS keystore to use for SSL (TLS) connections | string | | | -| mq.ssl.truststore.location | The path to the JKS truststore to use for SSL (TLS) connections | string | JVM 
truststore | Local path to a JKS file | -| mq.ssl.truststore.password | The password of the JKS truststore to use for SSL (TLS) connections | string | | | -| mq.ssl.use.ibm.cipher.mappings | Whether to set system property to control use of IBM cipher mappings | boolean | | | -| mq.batch.size | The maximum number of messages in a batch (unit of work) | integer | 250 | 1 or greater | -| mq.message.mqmd.read | Whether to enable reading of all MQMD fields | boolean | false | | -| mq.max.poll.blocked.time.ms | How long the connector will wait for the previous batch of messages to be delivered to Kafka before starting a new poll | integer | 2000 | It is important that this is less than the time defined for `task.shutdown.graceful.timeout.ms` as that is how long connect will wait for the task to perform lifecycle operations. | -| mq.client.reconnect.options | Options governing MQ reconnection. | string | ASDEF | ASDEF, ANY, QMGR, DISABLED | -| mq.message.receive.timeout | The timeout (in milliseconds) for the first request to receiving messages from the queue manager before returning to Kafka Connect. | long | 2000 | 1 or greater | -| mq.receive.subsequent.timeout.ms | The timeout (in milliseconds) for receiving messages from the queue manager on the subsequent receives before returning to Kafka Connect. | long | 0 | 1 or greater | -| mq.reconnect.delay.min.ms | The minimum delay (in milliseconds) for reconnecting to the queue manager after a connection error. | long | 64 | 1 or greater | -| mq.reconnect.delay.max.ms | The maximum delay (in milliseconds) for reconnecting to the queue manager after a connection error. | long | 8192 | 1 or greater | +| Name | Description | Type | Default | Valid values | +| ----------------------------------------------- | ----------------------------------------------------------------------------------------------------------------------------------------------------------------------- | ------- | -------------- | ----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------- | +| `topic` | The name of the target Kafka topic | string | | Topic name | +| `mq.queue.manager` | The name of the MQ queue manager | string | | MQ queue manager name | +| `mq.connection.mode` | The connection mode - bindings or client | string | client | client, bindings | +| `mq.connection.name.list` | List of connection names for queue manager | string | | host(port)[,host(port),...] 
| +| `mq.channel.name` | The name of the server-connection channel | string | | MQ channel name | +| `mq.queue` | The name of the source MQ queue | string | | MQ queue name | +| `mq.exactly.once.state.queue` | The name of the MQ queue used to store state when running with exactly-once semantics | string | | MQ state queue name | +| `mq.user.name` | The user name for authenticating with the queue manager | string | | User name | +| `mq.password` | The password for authenticating with the queue manager | string | | Password | +| `mq.user.authentication.mqcsp` | Whether to use MQ connection security parameters (MQCSP) | boolean | true | | +| `mq.ccdt.url` | The URL for the CCDT file containing MQ connection details | string | | URL for obtaining a CCDT file | +| `mq.record.builder` | The class used to build the Kafka Connect record | string | | Class implementing RecordBuilder | +| `mq.message.body.jms` | Whether to interpret the message body as a JMS message type | boolean | false | | +| `mq.record.builder.key.header` | The JMS message header to use as the Kafka record key | string | | JMSMessageID, JMSCorrelationID, JMSCorrelationIDAsBytes, JMSDestination | +| `mq.jms.properties.copy.to.kafka.headers` | Whether to copy JMS message properties to Kafka headers | boolean | false | | +| `mq.ssl.cipher.suite` | The name of the cipher suite for TLS (SSL) connection | string | | Blank or valid cipher suite | +| `mq.ssl.peer.name` | The distinguished name pattern of the TLS (SSL) peer | string | | Blank or DN pattern | +| `mq.ssl.keystore.location` | The path to the JKS keystore to use for SSL (TLS) connections | string | JVM keystore | Local path to a JKS file | +| `mq.ssl.keystore.password` | The password of the JKS keystore to use for SSL (TLS) connections | string | | | +| `mq.ssl.truststore.location` | The path to the JKS truststore to use for SSL (TLS) connections | string | JVM truststore | Local path to a JKS file | +| `mq.ssl.truststore.password` | The password of the JKS truststore to use for SSL (TLS) connections | string | | | +| `mq.ssl.use.ibm.cipher.mappings` | Whether to set system property to control use of IBM cipher mappings | boolean | | | +| `mq.batch.size` | The maximum number of messages in a batch (unit of work) | integer | 250 | 1 or greater | +| `mq.message.mqmd.read` | Whether to enable reading of all MQMD fields | boolean | false | | +| `mq.max.poll.blocked.time.ms` | How long the connector will wait for the previous batch of messages to be delivered to Kafka before starting a new poll | integer | 2000 | It is important that this is less than the time defined for `task.shutdown.graceful.timeout.ms` as that is how long connect will wait for the task to perform lifecycle operations. | +| `mq.client.reconnect.options` | Options governing MQ reconnection. | string | ASDEF | ASDEF, ANY, QMGR, DISABLED | +| `mq.message.receive.timeout` | The timeout (in milliseconds) for receiving messages from the queue manager before returning to Kafka Connect. | long | 2000 | 1 or greater | +| `mq.reconnect.delay.min.ms` | The minimum delay (in milliseconds) for reconnecting to the queue manager after a connection error. | long | 64 | 1 or greater | +| `mq.reconnect.delay.max.ms` | The maximum delay (in milliseconds) for reconnecting to the queue manager after a connection error. 
| long | 8192 | 1 or greater | +| `errors.deadletterqueue.topic.name` | The name of the Kafka topic to use as the dead letter queue (DLQ) for poison messages that fail during processing within the record builder component of the connector. | string | | If left blank (default), failed messages will not be written to a DLQ. | +| `errors.deadletterqueue.context.headers.enable` | Whether to add error context headers to messages written to the DLQ. | boolean | false | When enabled, additional headers describing the error will be included with each DLQ record. | ### Using a CCDT file Some of the connection details for MQ can be provided in a [CCDT file](https://www.ibm.com/support/knowledgecenter/en/SSFKSJ_9.1.0/com.ibm.mq.con.doc/q016730_.htm) by setting `mq.ccdt.url` in the MQ source connector configuration file. If using a CCDT file the `mq.connection.name.list` and `mq.channel.name` configuration options are not required. +### DLQ Context Headers + +If `errors.deadletterqueue.context.headers.enable` is set to `true`, the following Kafka record headers are added to each DLQ record: + +| Header | Description | +| --------------------------------------- | --------------------------------------------------------------------------------------- | +| `__connect.errors.topic` | The name of the topic to which the JMS message was originally written by the connector. | +| `__connect.errors.exception.class.name` | The class name of the exception that caused the failure. | +| `__connect.errors.exception.message` | The message of the exception. | +| `__connect.errors.exception.stacktrace` | The stack trace of the exception (first 500 characters only, to minimize record size). | +| `__connect.errors.timestamp` | The timestamp when the DLQ record was created. | +| `__connect.errors.cause.class` | The class name of the root cause of the failure (if applicable). | +| `__connect.errors.cause.message` | The message of the root cause (if applicable). | +| `__connect.errors.jms.message.id` | The ID of the JMS message that failed. | +| `__connect.errors.jms.timestamp` | The timestamp of the original JMS message. | +| `__connect.errors.mq.queue` | The name of the MQ queue from which the JMS message was consumed. | + +Additionally, the connector uses the header specified by `mq.record.builder.key.header` (if configured) as the Kafka record key for DLQ messages. + ### Externalizing secrets [KIP 297](https://cwiki.apache.org/confluence/display/KAFKA/KIP-297%3A+Externalizing+Secrets+for+Connect+Configurations) introduced a mechanism to externalize secrets to be used as configuration for Kafka connectors. @@ -353,7 +373,7 @@ Version 2.0.0 of the MQ source connector introduced exactly-once message deliver ### Exactly-once delivery Kafka Connect worker configuration -To enable exactly-once delivery, the MQ source connector must be run on Kafka Connect version 3.3.0 or later with the `exactly.once.source.support` property set to `enabled` in the Kafka connect worker configuration. See the [Kafka documentation](https://kafka.apache.org/documentation/#connect_exactlyoncesource) for more details about this setting, and the ACL requirements for the worker nodes. +To enable exactly-once delivery, the MQ source connector must be run on Kafka Connect version 3.3.0 or later with the `exactly.once.source.support` property set to `enabled` in the Kafka connect worker configuration. 
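As a minimal sketch, assuming a distributed worker configured through `connect-distributed.properties`, the worker-level setting looks like this:

```properties
# Kafka Connect worker property (Kafka 3.3+); valid values are
# disabled, preparing, and enabled
exactly.once.source.support=enabled
```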
See the [Kafka documentation](https://kafka.apache.org/documentation/#connect_exactlyoncesource) for more details about this setting, and the ACL requirements for the worker nodes.

**Note**: Exactly-once support for source connectors is only available in [distributed mode](#running-in-distributed-mode); standalone Connect workers cannot provide exactly-once delivery semantics. Kafka Connect is in distributed mode when [running the connector with Docker](#running-with-docker) and when [deploying the connector to Kubernetes](#deploying-to-kubernetes).

@@ -392,7 +412,7 @@ In some of these failure scenarios, it will be necessary for an MQ administrator

### Connector in a `FAILED` state

-If the connector experiences a non retriable error then a ConnectException will cause the connector to go in to a `FAILED` state. This will require a manual restart using the Kafka Connect REST API to restart the connector.
+If the connector experiences a non-retriable error, a ConnectException will cause the connector to go into a `FAILED` state, and a manual restart using the Kafka Connect REST API is required.

### Unable to connect to Kafka

@@ -424,4 +444,4 @@ Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
implied. See the License for the specific language governing permissions and
-limitations under the License.The project is licensed under the Apache 2 license.
\ No newline at end of file
+limitations under the License. The project is licensed under the Apache 2 license.
diff --git a/pom.xml b/pom.xml
index f143f50..2cba6bf 100644
--- a/pom.xml
+++ b/pom.xml
@@ -54,6 +54,12 @@
      <version>3.7.1</version>
      <scope>provided</scope>
    </dependency>
+    <dependency>
+      <groupId>org.apache.kafka</groupId>
+      <artifactId>connect-runtime</artifactId>
+      <version>3.7.1</version>
+      <scope>provided</scope>
+    </dependency>
    <dependency>
      <groupId>javax.jms</groupId>
diff --git a/src/integration/java/com/ibm/eventstreams/connect/mqsource/AbstractJMSContextIT.java b/src/integration/java/com/ibm/eventstreams/connect/mqsource/AbstractJMSContextIT.java
index eaf97cf..316a47c 100644
--- a/src/integration/java/com/ibm/eventstreams/connect/mqsource/AbstractJMSContextIT.java
+++ b/src/integration/java/com/ibm/eventstreams/connect/mqsource/AbstractJMSContextIT.java
@@ -94,7 +94,7 @@ protected JMSContext getJmsContext() throws Exception {
        return jmsContext;
    }

-    protected Map<String, String> getDefaultConnectorProperties() {
+    public static Map<String, String> getDefaultConnectorProperties() {
        final Map<String, String> props = new HashMap<>();
        props.put("mq.queue.manager", QMGR_NAME);
        props.put("mq.connection.mode", "client");
@@ -106,7 +106,7 @@ protected Map<String, String> getDefaultConnectorProperties() {
        return props;
    }

-    public AbstractConfig getPropertiesConfig(Map<String, String> props) {
+    public AbstractConfig getPropertiesConfig(final Map<String, String> props) {
        final AbstractConfig connectorConfig = new AbstractConfig(MQSourceConnector.CONFIGDEF, props);
        return connectorConfig;
    }
diff --git a/src/integration/java/com/ibm/eventstreams/connect/mqsource/MQSourceTaskIT.java b/src/integration/java/com/ibm/eventstreams/connect/mqsource/MQSourceTaskIT.java
index f391e6b..4f4fe32 100644
--- a/src/integration/java/com/ibm/eventstreams/connect/mqsource/MQSourceTaskIT.java
+++ b/src/integration/java/com/ibm/eventstreams/connect/mqsource/MQSourceTaskIT.java
@@ -22,6 +22,7 @@ import static com.ibm.eventstreams.connect.mqsource.utils.MessagesObjectMother.createAListOfMessages;
import static org.assertj.core.api.Assertions.assertThat;
import static org.assertj.core.api.Assertions.assertThatNoException;
+import static
org.assertj.core.api.Assertions.assertThatThrownBy; import static org.junit.Assert.assertArrayEquals; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertNotEquals; @@ -37,15 +38,21 @@ import java.nio.charset.StandardCharsets; import java.util.ArrayList; import java.util.Arrays; +import java.util.Collections; import java.util.HashMap; import java.util.List; import java.util.Map; +import java.util.stream.Collectors; +import javax.jms.BytesMessage; import javax.jms.JMSException; import javax.jms.Message; import javax.jms.TextMessage; import org.apache.kafka.connect.data.Schema; +import org.apache.kafka.connect.errors.ConnectException; +import org.apache.kafka.connect.header.Headers; +import org.apache.kafka.connect.runtime.ConnectorConfig; import org.apache.kafka.connect.source.SourceRecord; import org.junit.After; import org.junit.Before; @@ -59,8 +66,6 @@ import com.ibm.eventstreams.connect.mqsource.utils.MQTestUtil; import com.ibm.eventstreams.connect.mqsource.utils.SourceTaskStopper; - - public class MQSourceTaskIT extends AbstractJMSContextIT { private MQSourceTask connectTask = null; @@ -106,7 +111,8 @@ public void verifyJmsTextMessages() throws Exception { final Map connectorConfigProps = createDefaultConnectorProperties(); connectorConfigProps.put("mq.message.body.jms", "true"); - connectorConfigProps.put("mq.record.builder", "com.ibm.eventstreams.connect.mqsource.builders.DefaultRecordBuilder"); + connectorConfigProps.put("mq.record.builder", + "com.ibm.eventstreams.connect.mqsource.builders.DefaultRecordBuilder"); connectTask.start(connectorConfigProps); assertThat(connectTask.startUpAction).isEqualTo(MQSourceTaskStartUpAction.NORMAL_OPERATION); @@ -133,7 +139,8 @@ public void verifyJmsJsonMessages() throws Exception { final Map connectorConfigProps = createDefaultConnectorProperties(); connectorConfigProps.put("mq.message.body.jms", "true"); - connectorConfigProps.put("mq.record.builder", "com.ibm.eventstreams.connect.mqsource.builders.JsonRecordBuilder"); + connectorConfigProps.put("mq.record.builder", + "com.ibm.eventstreams.connect.mqsource.builders.JsonRecordBuilder"); connectTask.start(connectorConfigProps); @@ -165,20 +172,23 @@ public void verifyMQMessage() throws Exception { connectTask = getSourceTaskWithEmptyKafkaOffset(); final Map connectorConfigProps = createDefaultConnectorProperties(); - connectorConfigProps.put("mq.message.body.jms", "false"); //this could also be absent but if set to true the test should fail - connectorConfigProps.put("mq.record.builder", "com.ibm.eventstreams.connect.mqsource.builders.DefaultRecordBuilder"); + connectorConfigProps.put("mq.message.body.jms", "false"); // this could also be absent but if set to true the + // test should fail + connectorConfigProps.put("mq.record.builder", + "com.ibm.eventstreams.connect.mqsource.builders.DefaultRecordBuilder"); connectTask.start(connectorConfigProps); final String sent = "Hello World"; - final String url = "https://localhost:" + REST_API_HOST_PORT + "/ibmmq/rest/v1/messaging/qmgr/" + QMGR_NAME + "/queue/DEV.QUEUE.1/message"; + final String url = "https://localhost:" + REST_API_HOST_PORT + "/ibmmq/rest/v1/messaging/qmgr/" + QMGR_NAME + + "/queue/DEV.QUEUE.1/message"; JsonRestApi.postString(url, "app", ADMIN_PASSWORD, sent); final List kafkaMessages = connectTask.poll(); // get all the SRs (1) - SourceRecord firstMsg = kafkaMessages.get(0); - Object received = firstMsg.value(); + final SourceRecord firstMsg = kafkaMessages.get(0); + final Object received = 
firstMsg.value(); - assertNotEquals(received.getClass(), String.class); //jms messages are retrieved as Strings + assertNotEquals(received.getClass(), String.class); // jms messages are retrieved as Strings assertEquals(received.getClass(), byte[].class); assertEquals(new String((byte[]) received, StandardCharsets.UTF_8), sent); @@ -192,7 +202,8 @@ public void verifyJmsMessageHeaders() throws Exception { final Map connectorConfigProps = createDefaultConnectorProperties(); connectorConfigProps.put("mq.message.body.jms", "true"); - connectorConfigProps.put("mq.record.builder", "com.ibm.eventstreams.connect.mqsource.builders.DefaultRecordBuilder"); + connectorConfigProps.put("mq.record.builder", + "com.ibm.eventstreams.connect.mqsource.builders.DefaultRecordBuilder"); connectorConfigProps.put("mq.jms.properties.copy.to.kafka.headers", "true"); connectTask.start(connectorConfigProps); @@ -226,7 +237,8 @@ public void verifyMessageBatchIndividualCommits() throws Exception { final Map connectorConfigProps = createDefaultConnectorProperties(); connectorConfigProps.put("mq.message.body.jms", "true"); - connectorConfigProps.put("mq.record.builder", "com.ibm.eventstreams.connect.mqsource.builders.DefaultRecordBuilder"); + connectorConfigProps.put("mq.record.builder", + "com.ibm.eventstreams.connect.mqsource.builders.DefaultRecordBuilder"); connectorConfigProps.put("mq.batch.size", "10"); connectTask.start(connectorConfigProps); @@ -273,7 +285,8 @@ public void verifyMessageBatchGroupCommits() throws Exception { final Map connectorConfigProps = createDefaultConnectorProperties(); connectorConfigProps.put("mq.message.body.jms", "true"); - connectorConfigProps.put("mq.record.builder", "com.ibm.eventstreams.connect.mqsource.builders.DefaultRecordBuilder"); + connectorConfigProps.put("mq.record.builder", + "com.ibm.eventstreams.connect.mqsource.builders.DefaultRecordBuilder"); connectorConfigProps.put("mq.batch.size", "10"); connectTask.start(connectorConfigProps); @@ -427,28 +440,30 @@ public void verifyDestinationAsKey() throws Exception { @Test public void testSequenceStateMsgReadUnderMQTx() throws Exception { - JMSWorker spyJMSWorker = Mockito.spy(new JMSWorker()); + final JMSWorker spyJMSWorker = Mockito.spy(new JMSWorker()); connectTask = getSourceTaskWithEmptyKafkaOffset(); final Map connectorConfigProps = createExactlyOnceConnectorProperties(); connectorConfigProps.put("mq.message.body.jms", "true"); - connectorConfigProps.put("mq.record.builder", "com.ibm.eventstreams.connect.mqsource.builders.DefaultRecordBuilder"); + connectorConfigProps.put("mq.record.builder", + "com.ibm.eventstreams.connect.mqsource.builders.DefaultRecordBuilder"); spyJMSWorker.configure(getPropertiesConfig(connectorConfigProps)); - JMSWorker dedicated = new JMSWorker(); + final JMSWorker dedicated = new JMSWorker(); dedicated.configure(getPropertiesConfig(connectorConfigProps)); - SequenceStateClient sequenceStateClient = Mockito.spy(new SequenceStateClient(DEFAULT_STATE_QUEUE, spyJMSWorker, dedicated)); + final SequenceStateClient sequenceStateClient = Mockito + .spy(new SequenceStateClient(DEFAULT_STATE_QUEUE, spyJMSWorker, dedicated)); connectTask.start(connectorConfigProps, spyJMSWorker, dedicated, sequenceStateClient); final List messages = createAListOfMessages(getJmsContext(), 2, "message "); putAllMessagesToQueue(DEFAULT_SOURCE_QUEUE, messages); - List kafkaMessages; + final List kafkaMessages; kafkaMessages = connectTask.poll(); - List stateMsgs1 = browseAllMessagesFromQueue(DEFAULT_STATE_QUEUE); + final List 
stateMsgs1 = browseAllMessagesFromQueue(DEFAULT_STATE_QUEUE); assertThat(stateMsgs1.size()).isEqualTo(1); for (final SourceRecord m : kafkaMessages) { @@ -460,23 +475,21 @@ public void testSequenceStateMsgReadUnderMQTx() throws Exception { spyJMSWorker.getContext().rollback(); throw new Exception("such an exception"); - }).when(spyJMSWorker).commit(); + }).when(spyJMSWorker).commit(); try { connectTask.poll(); - } catch (Exception e) { + } catch (final Exception e) { System.out.println("exception caught"); } - /// expect statequeue to not be empty - List stateMsgs2 = getAllMessagesFromQueue(DEFAULT_STATE_QUEUE); + final List stateMsgs2 = getAllMessagesFromQueue(DEFAULT_STATE_QUEUE); assertThat(stateMsgs2.size()).isEqualTo(1); - List sourceMsgs = getAllMessagesFromQueue(DEFAULT_SOURCE_QUEUE); + final List sourceMsgs = getAllMessagesFromQueue(DEFAULT_SOURCE_QUEUE); assertThat(sourceMsgs.size()).isEqualTo(2); - } @Test @@ -486,13 +499,14 @@ public void testSequenceStateMsgWrittenIndependentFromGetSource() throws Excepti final Map connectorConfigProps = createExactlyOnceConnectorProperties(); connectorConfigProps.put("mq.message.body.jms", "true"); - connectorConfigProps.put("mq.record.builder", "com.ibm.eventstreams.connect.mqsource.builders.DefaultRecordBuilder"); + connectorConfigProps.put("mq.record.builder", + "com.ibm.eventstreams.connect.mqsource.builders.DefaultRecordBuilder"); - JMSWorker shared = new JMSWorker(); + final JMSWorker shared = new JMSWorker(); shared.configure(getPropertiesConfig(connectorConfigProps)); - JMSWorker dedicated = new JMSWorker(); + final JMSWorker dedicated = new JMSWorker(); dedicated.configure(getPropertiesConfig(connectorConfigProps)); - SequenceStateClient sequenceStateClient = new SequenceStateClient(DEFAULT_STATE_QUEUE, shared, dedicated); + final SequenceStateClient sequenceStateClient = new SequenceStateClient(DEFAULT_STATE_QUEUE, shared, dedicated); connectTask.start(connectorConfigProps, shared, dedicated, sequenceStateClient); @@ -501,10 +515,11 @@ public void testSequenceStateMsgWrittenIndependentFromGetSource() throws Excepti connectTask.poll(); - List stateMsgs1 = browseAllMessagesFromQueue(DEFAULT_STATE_QUEUE); + final List stateMsgs1 = browseAllMessagesFromQueue(DEFAULT_STATE_QUEUE); assertThat(stateMsgs1.size()).isEqualTo(1); shared.attemptRollback(); - assertThat(stateMsgs1.size()).isEqualTo(1); //state message is still there even though source message were rolled back + assertThat(stateMsgs1.size()).isEqualTo(1); // state message is still there even though source message were + // rolled back } @@ -513,17 +528,18 @@ public void testRemoveDeliveredMessagesFromSourceQueueThrowsException() throws E final Map connectorConfigProps = createDefaultConnectorProperties(); connectorConfigProps.put("mq.message.body.jms", "true"); - connectorConfigProps.put("mq.record.builder", "com.ibm.eventstreams.connect.mqsource.builders.DefaultRecordBuilder"); + connectorConfigProps.put("mq.record.builder", + "com.ibm.eventstreams.connect.mqsource.builders.DefaultRecordBuilder"); - JMSWorker spyJMSWorker = Mockito.spy(new JMSWorker()); - JMSWorker spyDedicated = Mockito.spy(new JMSWorker()); - JMSWorker spyShared = Mockito.spy(new JMSWorker()); + final JMSWorker spyJMSWorker = Mockito.spy(new JMSWorker()); + final JMSWorker spyDedicated = Mockito.spy(new JMSWorker()); + final JMSWorker spyShared = Mockito.spy(new JMSWorker()); spyJMSWorker.configure(getPropertiesConfig(connectorConfigProps)); spyDedicated.configure(getPropertiesConfig(connectorConfigProps)); 
spyShared.configure(getPropertiesConfig(connectorConfigProps)); - Message messageSpy = Mockito.spy(getJmsContext().createTextMessage("Spy Injected Message")); + final Message messageSpy = Mockito.spy(getJmsContext().createTextMessage("Spy Injected Message")); doReturn("6") .when(messageSpy) @@ -536,32 +552,32 @@ public void testRemoveDeliveredMessagesFromSourceQueueThrowsException() throws E anyBoolean()); connectTask = getSourceTaskWithEmptyKafkaOffset(); - connectTask.start(connectorConfigProps, spyJMSWorker, spyDedicated, new SequenceStateClient(DEFAULT_STATE_QUEUE, spyShared, spyJMSWorker)); + connectTask.start(connectorConfigProps, spyJMSWorker, spyDedicated, + new SequenceStateClient(DEFAULT_STATE_QUEUE, spyShared, spyJMSWorker)); - String[] msgIds = new String[] {"1", "2"}; + final String[] msgIds = new String[] { "1", "2" }; assertThrows(SequenceStateException.class, - () -> connectTask.removeDeliveredMessagesFromSourceQueue(Arrays.asList(msgIds)) - ); + () -> connectTask.removeDeliveredMessagesFromSourceQueue(Arrays.asList(msgIds))); } - @Test public void testRemoveDeliveredMessagesFromSourceQueueDoesNotThrowException() throws Exception { final Map connectorConfigProps = createDefaultConnectorProperties(); connectorConfigProps.put("mq.message.body.jms", "true"); - connectorConfigProps.put("mq.record.builder", "com.ibm.eventstreams.connect.mqsource.builders.DefaultRecordBuilder"); + connectorConfigProps.put("mq.record.builder", + "com.ibm.eventstreams.connect.mqsource.builders.DefaultRecordBuilder"); - JMSWorker spyJMSWorker = Mockito.spy(new JMSWorker()); - JMSWorker spyDedicated = Mockito.spy(new JMSWorker()); - JMSWorker spyShared = Mockito.spy(new JMSWorker()); + final JMSWorker spyJMSWorker = Mockito.spy(new JMSWorker()); + final JMSWorker spyDedicated = Mockito.spy(new JMSWorker()); + final JMSWorker spyShared = Mockito.spy(new JMSWorker()); spyJMSWorker.configure(getPropertiesConfig(connectorConfigProps)); spyDedicated.configure(getPropertiesConfig(connectorConfigProps)); spyShared.configure(getPropertiesConfig(connectorConfigProps)); - Message messageSpy = Mockito.spy(getJmsContext().createTextMessage("Spy Injected Message")); + final Message messageSpy = Mockito.spy(getJmsContext().createTextMessage("Spy Injected Message")); doReturn("1") .when(messageSpy) @@ -574,15 +590,15 @@ public void testRemoveDeliveredMessagesFromSourceQueueDoesNotThrowException() th anyBoolean()); connectTask = getSourceTaskWithEmptyKafkaOffset(); - connectTask.start(connectorConfigProps, spyJMSWorker, spyDedicated, new SequenceStateClient(DEFAULT_STATE_QUEUE, spyShared, spyJMSWorker)); + connectTask.start(connectorConfigProps, spyJMSWorker, spyDedicated, + new SequenceStateClient(DEFAULT_STATE_QUEUE, spyShared, spyJMSWorker)); - String[] msgIds = new String[] {"1", "2"}; + final String[] msgIds = new String[] { "1", "2" }; assertThatNoException() .isThrownBy(() -> connectTask.removeDeliveredMessagesFromSourceQueue(Arrays.asList(msgIds))); } - @Test public void testConfigureClientReconnectOptions() throws Exception { // setup test condition: put messages on source queue, poll once to read them @@ -590,14 +606,15 @@ public void testConfigureClientReconnectOptions() throws Exception { final Map connectorConfigProps = createExactlyOnceConnectorProperties(); connectorConfigProps.put("mq.message.body.jms", "true"); - connectorConfigProps.put("mq.record.builder", "com.ibm.eventstreams.connect.mqsource.builders.DefaultRecordBuilder"); + connectorConfigProps.put("mq.record.builder", + 
"com.ibm.eventstreams.connect.mqsource.builders.DefaultRecordBuilder"); connectorConfigProps.put("mq.client.reconnect.options", "QMGR"); - JMSWorker shared = new JMSWorker(); + final JMSWorker shared = new JMSWorker(); shared.configure(getPropertiesConfig(connectorConfigProps)); - JMSWorker dedicated = new JMSWorker(); + final JMSWorker dedicated = new JMSWorker(); dedicated.configure(getPropertiesConfig(connectorConfigProps)); - SequenceStateClient sequenceStateClient = new SequenceStateClient(DEFAULT_STATE_QUEUE, shared, dedicated); + final SequenceStateClient sequenceStateClient = new SequenceStateClient(DEFAULT_STATE_QUEUE, shared, dedicated); connectTask.start(connectorConfigProps, shared, dedicated, sequenceStateClient); @@ -606,10 +623,11 @@ public void testConfigureClientReconnectOptions() throws Exception { connectTask.poll(); - List stateMsgs1 = browseAllMessagesFromQueue(DEFAULT_STATE_QUEUE); + final List stateMsgs1 = browseAllMessagesFromQueue(DEFAULT_STATE_QUEUE); assertThat(stateMsgs1.size()).isEqualTo(1); shared.attemptRollback(); - assertThat(stateMsgs1.size()).isEqualTo(1); //state message is still there even though source message were rolled back + assertThat(stateMsgs1.size()).isEqualTo(1); // state message is still there even though source message were + // rolled back } @@ -624,7 +642,7 @@ public void verifyEmptyMessage() throws Exception { connectTask.start(connectorConfigProps); - Message emptyMessage = getJmsContext().createMessage(); + final Message emptyMessage = getJmsContext().createMessage(); putAllMessagesToQueue(DEFAULT_SOURCE_QUEUE, Arrays.asList(emptyMessage)); final List kafkaMessages = connectTask.poll(); @@ -647,7 +665,7 @@ public void verifyEmptyTextMessage() throws Exception { connectTask.start(connectorConfigProps); - TextMessage emptyMessage = getJmsContext().createTextMessage(); + final TextMessage emptyMessage = getJmsContext().createTextMessage(); putAllMessagesToQueue(DEFAULT_SOURCE_QUEUE, Arrays.asList(emptyMessage)); final List kafkaMessages = connectTask.poll(); @@ -695,4 +713,590 @@ public void testJmsWorkerWithCustomReciveForConsumerAndCustomReconnectValues() t assertEquals(100L, shared.getReconnectDelayMillisMin()); assertEquals(10000L, shared.getReconnectDelayMillisMax()); } + + @Test + public void verifyErrorToleranceMessages() throws Exception { + connectTask = getSourceTaskWithEmptyKafkaOffset(); + + final Map connectorConfigProps = createDefaultConnectorProperties(); + connectorConfigProps.put(MQSourceConnector.CONFIG_NAME_TOPIC, "mytopic"); + connectorConfigProps.put(ConnectorConfig.ERRORS_TOLERANCE_CONFIG, "all"); + connectorConfigProps.put(MQSourceConnector.CONFIG_NAME_MQ_MESSAGE_BODY_JMS, "true"); + connectorConfigProps.put(MQSourceConnector.CONFIG_NAME_MQ_RECORD_BUILDER, + "com.ibm.eventstreams.connect.mqsource.builders.JsonRecordBuilder"); + + connectTask.start(connectorConfigProps); + + // Both invalid and valid messages are received + final List testMessages = Arrays.asList( + getJmsContext().createTextMessage("Invalid JSON message"), // Poison message + getJmsContext().createTextMessage("{ \"i\": 0 }"), // Valid message + getJmsContext().createTextMessage("{ \"i\": 1 }"), // Valid message + getJmsContext().createTextMessage("{ \"i\": 2 }") // Valid message + ); + putAllMessagesToQueue(DEFAULT_SOURCE_QUEUE, testMessages); + + // All messages are processed, with poison message routed to DLQ + final List processedRecords = connectTask.poll(); + assertThat(processedRecords).hasSize(4); + 
assertThat(processedRecords.stream().filter(record -> record != null)).hasSize(3);
+
+        final List<SourceRecord> nonNullProcessedRecords = processedRecords.stream().filter(record -> record != null)
+                .collect(Collectors.toList());
+
+        for (int i = 0; i < 3; i++) {
+            final SourceRecord validRecord = nonNullProcessedRecords.get(i);
+            assertThat(validRecord.topic()).isEqualTo("mytopic");
+            assertThat(validRecord.valueSchema()).isNull();
+
+            final Map<?, ?> value = (Map<?, ?>) validRecord.value();
+            assertThat(value.get("i")).isEqualTo(Long.valueOf(i));
+
+            connectTask.commitRecord(validRecord);
+        }
+    }
+
+    @Test
+    public void shouldRoutePoisonMessagesToDeadLetterQueueWhenErrorToleranceIsAll() throws Exception {
+        connectTask = getSourceTaskWithEmptyKafkaOffset();
+
+        final Map<String, String> connectorConfigProps = createDefaultConnectorProperties();
+        connectorConfigProps.put(MQSourceConnector.CONFIG_NAME_TOPIC, "mytopic");
+        connectorConfigProps.put(ConnectorConfig.ERRORS_TOLERANCE_CONFIG, "all");
+        connectorConfigProps.put(MQSourceConnector.DLQ_TOPIC_NAME_CONFIG, "__dlq.mq.source");
+        connectorConfigProps.put(MQSourceConnector.CONFIG_NAME_MQ_MESSAGE_BODY_JMS, "true");
+        connectorConfigProps.put(MQSourceConnector.CONFIG_NAME_MQ_RECORD_BUILDER,
+                "com.ibm.eventstreams.connect.mqsource.builders.JsonRecordBuilder");
+
+        connectTask.start(connectorConfigProps);
+
+        // When: Both invalid and valid messages are received
+        final List<Message> testMessages = Arrays.asList(
+                getJmsContext().createTextMessage("Invalid JSON message"), // Poison message
+                getJmsContext().createTextMessage("{ \"i\": 0 }"), // Valid message
+                getJmsContext().createTextMessage("{ \"i\": 1 }"), // Valid message
+                getJmsContext().createTextMessage("{ \"i\": 2 }") // Valid message
+        );
+        putAllMessagesToQueue(DEFAULT_SOURCE_QUEUE, testMessages);
+
+        final List<SourceRecord> processedRecords = connectTask.poll();
+        assertThat(processedRecords).hasSize(4);
+
+        // Verify poison message goes to DLQ
+        final SourceRecord poisonRecord = processedRecords.get(0);
+        assertThat(poisonRecord.topic()).isEqualTo("__dlq.mq.source");
+        assertThat(poisonRecord.valueSchema().type()).isEqualTo(Schema.Type.BYTES);
+        assertThat(poisonRecord.value()).isEqualTo("Invalid JSON message".getBytes(StandardCharsets.UTF_8));
+
+        // Verify valid messages are processed correctly
+        for (int i = 1; i < 4; i++) {
+            final SourceRecord validRecord = processedRecords.get(i);
+            assertThat(validRecord.topic()).isEqualTo("mytopic");
+            assertThat(validRecord.valueSchema()).isNull();
+
+            final Map<?, ?> value = (Map<?, ?>) validRecord.value();
+            assertThat(value.get("i")).isEqualTo(Long.valueOf(i - 1));
+
+            connectTask.commitRecord(validRecord);
+        }
+    }
+
+    @Test
+    public void shouldFailWhenErrorToleranceIsNone() throws Exception {
+        connectTask = getSourceTaskWithEmptyKafkaOffset();
+
+        final Map<String, String> connectorConfigProps = createDefaultConnectorProperties();
+        connectorConfigProps.put(MQSourceConnector.CONFIG_NAME_TOPIC, "mytopic");
+        connectorConfigProps.put(ConnectorConfig.ERRORS_TOLERANCE_CONFIG, "none");
+        connectorConfigProps.put(MQSourceConnector.DLQ_TOPIC_NAME_CONFIG, "__dlq.mq.source");
+        connectorConfigProps.put(MQSourceConnector.CONFIG_NAME_MQ_MESSAGE_BODY_JMS, "true");
+        connectorConfigProps.put(MQSourceConnector.CONFIG_NAME_MQ_RECORD_BUILDER,
+                "com.ibm.eventstreams.connect.mqsource.builders.JsonRecordBuilder");
+
+        connectTask.start(connectorConfigProps);
+
+        // Both invalid and valid messages are received
+        final List<Message> testMessages = Arrays.asList(
+                getJmsContext().createTextMessage("Invalid JSON message"), // Poison message
+
getJmsContext().createTextMessage("{ \"i\": 0 }") // Valid message + ); + putAllMessagesToQueue(DEFAULT_SOURCE_QUEUE, testMessages); + + // Processing should fail on the poison message + assertThatThrownBy(() -> connectTask.poll()) + .isInstanceOfAny(ConnectException.class, RuntimeException.class) + .hasMessageContaining("Converting byte[] to Kafka Connect data failed due to serialization error:"); + } + + @Test + public void shouldPreserveDlqHeadersWithErrorInformation() throws Exception { + connectTask = getSourceTaskWithEmptyKafkaOffset(); + + final Map connectorConfigProps = createDefaultConnectorProperties(); + connectorConfigProps.put(MQSourceConnector.CONFIG_NAME_TOPIC, "mytopic"); + connectorConfigProps.put(ConnectorConfig.ERRORS_TOLERANCE_CONFIG, "all"); + connectorConfigProps.put(MQSourceConnector.DLQ_TOPIC_NAME_CONFIG, "__dlq.mq.source"); + connectorConfigProps.put(MQSourceConnector.DLQ_CONTEXT_HEADERS_ENABLE_CONFIG, "true"); + connectorConfigProps.put(MQSourceConnector.CONFIG_NAME_MQ_MESSAGE_BODY_JMS, "true"); + connectorConfigProps.put(MQSourceConnector.CONFIG_NAME_MQ_RECORD_BUILDER, + "com.ibm.eventstreams.connect.mqsource.builders.JsonRecordBuilder"); + + connectTask.start(connectorConfigProps); + + // An invalid message is received + final TextMessage message = getJmsContext().createTextMessage("Invalid JSON message"); + message.setJMSMessageID("message_id"); + putAllMessagesToQueue(DEFAULT_SOURCE_QUEUE, + Collections.singletonList(message)); + + // The message should be routed to DLQ with error headers + final List processedRecords = connectTask.poll(); + assertThat(processedRecords).hasSize(1); + + final SourceRecord dlqRecord = processedRecords.get(0); + assertThat(dlqRecord.topic()).isEqualTo("__dlq.mq.source"); + + // Verify error headers are present + final Headers headers = dlqRecord.headers(); + assertThat(headers.lastWithName("__connect.errors.topic").value()) + .isEqualTo("mytopic"); + assertThat(headers.lastWithName("__connect.errors.exception.class.name").value()) + .isEqualTo("org.apache.kafka.connect.errors.DataException"); + assertThat(headers.lastWithName("__connect.errors.exception.message").value()) + .isEqualTo("Converting byte[] to Kafka Connect data failed due to serialization error: "); + assertThat(headers.lastWithName("__connect.errors.timestamp").value().toString() + .isEmpty()).isFalse(); + assertThat(headers.lastWithName("__connect.errors.cause.message").value().toString()) + .contains( + "com.fasterxml.jackson.core.JsonParseException: Unrecognized token 'Invalid': was expecting (JSON String, Number, Array, Object or token 'null', 'true' or 'false')"); + assertThat(headers.lastWithName("__connect.errors.cause.class").value()) + .isEqualTo("org.apache.kafka.common.errors.SerializationException"); + assertThat(headers.lastWithName("__connect.errors.exception.stacktrace").value() + .toString().contains("com.ibm.eventstreams.connect.mqsource.JMSWorker.toSourceRecord")).isTrue(); + assertEquals(headers.lastWithName("__connect.errors.jms.message.id").value(), message.getJMSMessageID()); + assertEquals(headers.lastWithName("__connect.errors.jms.timestamp").value(), message.getJMSTimestamp()); + assertEquals(headers.lastWithName("__connect.errors.mq.queue").value(), DEFAULT_SOURCE_QUEUE); + connectTask.commitRecord(dlqRecord); + } + + @Test + public void shouldHandleDifferentMessageTypesToDlq() throws Exception { + connectTask = getSourceTaskWithEmptyKafkaOffset(); + + final Map connectorConfigProps = createDefaultConnectorProperties(); + 
connectorConfigProps.put(MQSourceConnector.CONFIG_NAME_TOPIC, "mytopic"); + connectorConfigProps.put(ConnectorConfig.ERRORS_TOLERANCE_CONFIG, "all"); + connectorConfigProps.put(MQSourceConnector.DLQ_TOPIC_NAME_CONFIG, "__dlq.mq.source"); + connectorConfigProps.put(MQSourceConnector.CONFIG_NAME_MQ_MESSAGE_BODY_JMS, "true"); + connectorConfigProps.put(MQSourceConnector.CONFIG_NAME_MQ_RECORD_BUILDER, + "com.ibm.eventstreams.connect.mqsource.builders.JsonRecordBuilder"); + + connectTask.start(connectorConfigProps); + + // When different types of invalid messages are received + final List testMessages = new ArrayList<>(); + // Text message with invalid JSON + testMessages.add(getJmsContext().createTextMessage("Invalid JSON message")); + // BytesMessage with invalid content + final BytesMessage bytesMsg = getJmsContext().createBytesMessage(); + bytesMsg.writeBytes("Invalid binary data".getBytes(StandardCharsets.UTF_8)); + testMessages.add(bytesMsg); + + putAllMessagesToQueue(DEFAULT_SOURCE_QUEUE, testMessages); + + final List processedRecords = connectTask.poll(); + assertThat(processedRecords).hasSize(2); + + for (final SourceRecord dlqRecord : processedRecords) { + assertThat(dlqRecord.topic()).isEqualTo("__dlq.mq.source"); + assertThat(dlqRecord.valueSchema().type()).isEqualTo(Schema.Type.BYTES); + connectTask.commitRecord(dlqRecord); + } + } + + @Test + public void shouldPreserveJmsPropertiesInDlqMessages() throws Exception { + connectTask = getSourceTaskWithEmptyKafkaOffset(); + + final Map connectorConfigProps = createDefaultConnectorProperties(); + connectorConfigProps.put(MQSourceConnector.CONFIG_NAME_TOPIC, "mytopic"); + connectorConfigProps.put(ConnectorConfig.ERRORS_TOLERANCE_CONFIG, "all"); + connectorConfigProps.put(MQSourceConnector.DLQ_TOPIC_NAME_CONFIG, "__dlq.mq.source"); + connectorConfigProps.put(MQSourceConnector.DLQ_CONTEXT_HEADERS_ENABLE_CONFIG, "true"); + connectorConfigProps.put(MQSourceConnector.CONFIG_NAME_MQ_MESSAGE_BODY_JMS, "true"); + connectorConfigProps.put(MQSourceConnector.CONFIG_NAME_MQ_JMS_PROPERTY_COPY_TO_KAFKA_HEADER, "true"); + connectorConfigProps.put(MQSourceConnector.CONFIG_NAME_MQ_RECORD_BUILDER, + "com.ibm.eventstreams.connect.mqsource.builders.JsonRecordBuilder"); + + connectTask.start(connectorConfigProps); + + final TextMessage invalidMessage = getJmsContext().createTextMessage("Invalid JSON message"); + putAllMessagesToQueue(DEFAULT_SOURCE_QUEUE, Collections.singletonList(invalidMessage)); + + final List processedRecords = connectTask.poll(); + assertThat(processedRecords).hasSize(1); + + final SourceRecord dlqRecord = processedRecords.get(0); + assertThat(dlqRecord.topic()).isEqualTo("__dlq.mq.source"); + + final Headers headers = dlqRecord.headers(); + assertThat(headers.lastWithName("__connect.errors.exception.message").value()) + .isEqualTo("Converting byte[] to Kafka Connect data failed due to serialization error: "); + + connectTask.commitRecord(dlqRecord); + } + + @Test + public void shouldHandleMixOfValidAndInvalidMessagesWithDifferentFormats() throws Exception { + connectTask = getSourceTaskWithEmptyKafkaOffset(); + + final Map connectorConfigProps = createDefaultConnectorProperties(); + connectorConfigProps.put(MQSourceConnector.CONFIG_NAME_TOPIC, "mytopic"); + connectorConfigProps.put(ConnectorConfig.ERRORS_TOLERANCE_CONFIG, "all"); + connectorConfigProps.put(MQSourceConnector.DLQ_TOPIC_NAME_CONFIG, "__dlq.mq.source"); + connectorConfigProps.put(MQSourceConnector.CONFIG_NAME_MQ_MESSAGE_BODY_JMS, "true"); + 
connectorConfigProps.put(MQSourceConnector.CONFIG_NAME_MQ_RECORD_BUILDER, + "com.ibm.eventstreams.connect.mqsource.builders.JsonRecordBuilder"); + + connectTask.start(connectorConfigProps); + + // When: Mix of valid and invalid messages received + final List testMessages = Arrays.asList( + getJmsContext().createTextMessage("Invalid JSON message"), // Invalid JSON + getJmsContext().createTextMessage("{ \"i\": 0 }"), // Valid JSON + getJmsContext().createTextMessage("{ malformed json"), // Malformed JSON + getJmsContext().createTextMessage("{ \"i\": 1, \"text\": \"valid\" }"), // Valid JSON + getJmsContext().createTextMessage("{}") // Valid but empty JSON + ); + putAllMessagesToQueue(DEFAULT_SOURCE_QUEUE, testMessages); + + final List processedRecords = connectTask.poll(); + assertThat(processedRecords).hasSize(5); + + int validCount = 0; + int dlqCount = 0; + + for (final SourceRecord record : processedRecords) { + if (record.topic().equals("__dlq.mq.source")) { + dlqCount++; + assertThat(record.valueSchema().type()).isEqualTo(Schema.Type.BYTES); + } else { + validCount++; + assertThat(record.topic()).isEqualTo("mytopic"); + } + connectTask.commitRecord(record); + } + + assertThat(validCount).isEqualTo(3); + assertThat(dlqCount).isEqualTo(2); + } + + @Test + public void shouldContinueProcessingAfterUnhandleableDlqError() throws Exception { + connectTask = getSourceTaskWithEmptyKafkaOffset(); + + final Map connectorConfigProps = createDefaultConnectorProperties(); + connectorConfigProps.put(MQSourceConnector.CONFIG_NAME_TOPIC, "mytopic"); + connectorConfigProps.put(ConnectorConfig.ERRORS_TOLERANCE_CONFIG, "all"); + connectorConfigProps.put(MQSourceConnector.DLQ_TOPIC_NAME_CONFIG, "__dlq.mq.source"); + connectorConfigProps.put(MQSourceConnector.CONFIG_NAME_MQ_MESSAGE_BODY_JMS, "true"); + connectorConfigProps.put(MQSourceConnector.CONFIG_NAME_MQ_RECORD_BUILDER, + "com.ibm.eventstreams.connect.mqsource.builders.JsonRecordBuilder"); + + connectTask.start(connectorConfigProps); + + // When: Multiple messages are received + final List testMessages = Arrays.asList( + getJmsContext().createTextMessage("First invalid message"), // Invalid message + getJmsContext().createTextMessage("{ \"i\": 0 }"), // Valid message + getJmsContext().createTextMessage("Second invalid message"), // Invalid message + getJmsContext().createTextMessage("{ \"i\": 1 }") // Valid message + ); + putAllMessagesToQueue(DEFAULT_SOURCE_QUEUE, testMessages); + + // Then: Processing should continue despite DLQ failure + final List processedRecords = connectTask.poll(); + + assertThat(processedRecords).hasSize(4); + + assertThat(processedRecords.get(0).topic()).isEqualTo("__dlq.mq.source"); + assertThat(processedRecords.get(1).topic()).isEqualTo("mytopic"); + assertThat(processedRecords.get(2).topic()).isEqualTo("__dlq.mq.source"); + assertThat(processedRecords.get(3).topic()).isEqualTo("mytopic"); + } + + @Test + public void verifyHeadersWithErrorTolerance() throws Exception { + connectTask = getSourceTaskWithEmptyKafkaOffset(); + + final Map connectorConfigProps = createDefaultConnectorProperties(); + connectorConfigProps.put(MQSourceConnector.CONFIG_NAME_TOPIC, "mytopic"); + connectorConfigProps.put(ConnectorConfig.ERRORS_TOLERANCE_CONFIG, "all"); + connectorConfigProps.put(MQSourceConnector.DLQ_TOPIC_NAME_CONFIG, "__dlq.mq.source"); + connectorConfigProps.put(MQSourceConnector.DLQ_CONTEXT_HEADERS_ENABLE_CONFIG, "true"); + connectorConfigProps.put(MQSourceConnector.CONFIG_NAME_MQ_MESSAGE_BODY_JMS, "true"); + 
connectorConfigProps.put(MQSourceConnector.CONFIG_NAME_MQ_RECORD_BUILDER, + "com.ibm.eventstreams.connect.mqsource.builders.JsonRecordBuilder"); + connectorConfigProps.put(MQSourceConnector.CONFIG_NAME_MQ_JMS_PROPERTY_COPY_TO_KAFKA_HEADER, "true"); + + connectTask.start(connectorConfigProps); + + final TextMessage message = getJmsContext().createTextMessage("Invalid JSON message"); + message.setStringProperty("teststring", "myvalue"); + message.setIntProperty("volume", 11); + message.setDoubleProperty("decimalmeaning", 42.0); + + // Both invalid and valid messages are received + final List testMessages = Arrays.asList( + message, // Poison message + getJmsContext().createTextMessage("{ \"i\": 0 }") // Valid message + ); + putAllMessagesToQueue(DEFAULT_SOURCE_QUEUE, testMessages); + + final List processedRecords = connectTask.poll(); + + assertThat(processedRecords).hasSize(2); + + assertThat(processedRecords.get(0).topic()).isEqualTo("__dlq.mq.source"); + + final Headers headers = processedRecords.get(0).headers(); + + // Actual headers + assertThat(headers.lastWithName("teststring").value()).isEqualTo("myvalue"); + assertThat(headers.lastWithName("volume").value()).isEqualTo("11"); + assertThat(headers.lastWithName("decimalmeaning").value()).isEqualTo("42.0"); + + // Expected DLQ Headers + // ConnectHeaders(headers=[ConnectHeader(key=__connect.errors.topic, + // value=mytopic, schema=Schema{STRING}), + // ConnectHeader(key=__connect.errors.class.name, + // value=org.apache.kafka.connect.errors.DataException, schema=Schema{STRING}), + // ConnectHeader(key=__connect.errors.exception.message, value=Converting byte[] + // to Kafka Connect data failed due to serialization error: , + // schema=Schema{STRING}), ConnectHeader(key=__connect.errors.timestamp, + // value=1749036171558, schema=Schema{STRING}), + // ConnectHeader(key=__connect.errors.cause.message, + // value=com.fasterxml.jackson.core.JsonParseException: Unrecognized token + // 'Invalid': was expecting (JSON String, Number, Array, Object or token 'null', + // 'true' or 'false') + // at [Source: REDACTED (`StreamReadFeature.INCLUDE_SOURCE_IN_LOCATION` + // disabled); line: 1, column: 9], schema=Schema{STRING}), + // ConnectHeader(key=__connect.errors.cause.class, + // value=org.apache.kafka.common.errors.SerializationException, + // schema=Schema{STRING}), + // ConnectHeader(key=__connect.errors.exception.stacktrace, + // value=org.apache.kafka.connect.errors.DataException: Converting byte[] to + // Kafka Connect data failed due to serialization error: + // at + // org.apache.kafka.connect.json.JsonConverter.toConnectData(JsonConverter.java:333) + // at + // com.ibm.eventstreams.connect.mqsource.builders.JsonRecordBuilder.getValue(JsonRecordBuilder.java:81) + // at + // com.ibm.eventstreams.connect.mqsource.builders.BaseRecordBuilder.toSourceRecord(BaseRecordBuilder.java:238) + // at com.ibm.eventstreams.connect.mqsource.JMSWorker.toSourceRecord(JMSWork... 
+ // [truncated], schema=Schema{STRING})]) + assertThat(headers.lastWithName("__connect.errors.topic").value()) + .isEqualTo("mytopic"); + assertThat(headers.lastWithName("__connect.errors.exception.class.name").value()) + .isEqualTo("org.apache.kafka.connect.errors.DataException"); + assertThat(headers.lastWithName("__connect.errors.exception.message").value()) + .isEqualTo("Converting byte[] to Kafka Connect data failed due to serialization error: "); + assertThat(headers.lastWithName("__connect.errors.timestamp").value().toString() + .isEmpty()).isFalse(); + assertThat(headers.lastWithName("__connect.errors.cause.message").value().toString()) + .contains( + "com.fasterxml.jackson.core.JsonParseException: Unrecognized token 'Invalid': was expecting (JSON String, Number, Array, Object or token 'null', 'true' or 'false')"); + assertThat(headers.lastWithName("__connect.errors.cause.class").value()) + .isEqualTo("org.apache.kafka.common.errors.SerializationException"); + assertThat(headers.lastWithName("__connect.errors.exception.stacktrace").value() + .toString().contains("com.ibm.eventstreams.connect.mqsource.JMSWorker.toSourceRecord")).isTrue(); + } + + @Test + public void verifyHeadersWithErrorTolerance_WithDLQHeaderContextDisabled() throws Exception { + connectTask = getSourceTaskWithEmptyKafkaOffset(); + + final Map connectorConfigProps = createDefaultConnectorProperties(); + connectorConfigProps.put(MQSourceConnector.CONFIG_NAME_TOPIC, "mytopic"); + connectorConfigProps.put(ConnectorConfig.ERRORS_TOLERANCE_CONFIG, "all"); + connectorConfigProps.put(MQSourceConnector.DLQ_TOPIC_NAME_CONFIG, "__dlq.mq.source"); + connectorConfigProps.put(MQSourceConnector.CONFIG_NAME_MQ_MESSAGE_BODY_JMS, "true"); + connectorConfigProps.put(MQSourceConnector.CONFIG_NAME_MQ_RECORD_BUILDER, + "com.ibm.eventstreams.connect.mqsource.builders.JsonRecordBuilder"); + connectorConfigProps.put(MQSourceConnector.CONFIG_NAME_MQ_JMS_PROPERTY_COPY_TO_KAFKA_HEADER, "true"); + + connectTask.start(connectorConfigProps); + + final TextMessage message = getJmsContext().createTextMessage("Invalid JSON message"); + message.setStringProperty("teststring", "myvalue"); + message.setIntProperty("volume", 11); + message.setDoubleProperty("decimalmeaning", 42.0); + + // Both invalid and valid messages are received + final List testMessages = Arrays.asList( + message, // Poison message + getJmsContext().createTextMessage("{ \"i\": 0 }") // Valid message + ); + putAllMessagesToQueue(DEFAULT_SOURCE_QUEUE, testMessages); + + final List processedRecords = connectTask.poll(); + + assertThat(processedRecords).hasSize(2); + + final SourceRecord dlqRecord = processedRecords.get(0); + assertThat(dlqRecord.topic()).isEqualTo("__dlq.mq.source"); + + final Headers headers = dlqRecord.headers(); + + // Actual headers + assertThat(headers.lastWithName("teststring").value()).isEqualTo("myvalue"); + assertThat(headers.lastWithName("volume").value()).isEqualTo("11"); + assertThat(headers.lastWithName("decimalmeaning").value()).isEqualTo("42.0"); + + assertThat(headers.lastWithName("__connect.errors.topic")).isNull(); + assertThat(headers.lastWithName("__connect.errors.class.name")).isNull(); + assertThat(headers.lastWithName("__connect.errors.exception.message")).isNull(); + assertThat(headers.lastWithName("__connect.errors.timestamp")).isNull(); + assertThat(headers.lastWithName("__connect.errors.cause.message")).isNull(); + assertThat(headers.lastWithName("__connect.errors.cause.class")).isNull(); + 
assertThat(headers.lastWithName("__connect.errors.exception.stacktrace")).isNull(); + } + + @Test + public void verifyLoggingWarningWithErrorTolerance() throws Exception { + connectTask = getSourceTaskWithEmptyKafkaOffset(); + + final Map connectorConfigProps = createDefaultConnectorProperties(); + connectorConfigProps.put(MQSourceConnector.CONFIG_NAME_TOPIC, "mytopic"); + connectorConfigProps.put(ConnectorConfig.ERRORS_TOLERANCE_CONFIG, "all"); + connectorConfigProps.put(ConnectorConfig.ERRORS_LOG_ENABLE_CONFIG, "false"); // default; Do not log errors + // default; Do not log errors with message + connectorConfigProps.put(ConnectorConfig.ERRORS_LOG_INCLUDE_MESSAGES_CONFIG, "false"); + connectorConfigProps.put(MQSourceConnector.DLQ_TOPIC_NAME_CONFIG, "__dlq.mq.source"); + connectorConfigProps.put(MQSourceConnector.CONFIG_NAME_MQ_MESSAGE_BODY_JMS, "true"); + connectorConfigProps.put(MQSourceConnector.CONFIG_NAME_MQ_RECORD_BUILDER, + "com.ibm.eventstreams.connect.mqsource.builders.JsonRecordBuilder"); + + connectTask.start(connectorConfigProps); + + // Both invalid and valid messages are received + final List testMessages = Arrays.asList( + getJmsContext().createTextMessage("Invalid JSON message"), // Poison message + getJmsContext().createTextMessage("{ \"i\": 0 }"), // Valid message + getJmsContext().createTextMessage("{ \"i\": 1 }"), // Valid message + getJmsContext().createTextMessage("{ \"i\": 2 }") // Valid message + ); + putAllMessagesToQueue(DEFAULT_SOURCE_QUEUE, testMessages); + + final List processedRecords = connectTask.poll(); + + assertThat(processedRecords).hasSize(4); + + assertThat(processedRecords.get(0).topic()).isEqualTo("__dlq.mq.source"); + assertThat(processedRecords.get(1).topic()).isEqualTo("mytopic"); + assertThat(processedRecords.get(2).topic()).isEqualTo("mytopic"); + assertThat(processedRecords.get(3).topic()).isEqualTo("mytopic"); + } + + @Test + public void verifyLoggingErrorsWithErrorTolerance() throws Exception { + connectTask = getSourceTaskWithEmptyKafkaOffset(); + + final Map connectorConfigProps = createDefaultConnectorProperties(); + connectorConfigProps.put(MQSourceConnector.CONFIG_NAME_TOPIC, "mytopic"); + connectorConfigProps.put(ConnectorConfig.ERRORS_TOLERANCE_CONFIG, "all"); + connectorConfigProps.put(ConnectorConfig.ERRORS_LOG_ENABLE_CONFIG, "true"); // Log errors enabled + connectorConfigProps.put(ConnectorConfig.ERRORS_LOG_INCLUDE_MESSAGES_CONFIG, "false"); + connectorConfigProps.put(MQSourceConnector.DLQ_TOPIC_NAME_CONFIG, "__dlq.mq.source"); + connectorConfigProps.put(MQSourceConnector.CONFIG_NAME_MQ_MESSAGE_BODY_JMS, "true"); + connectorConfigProps.put(MQSourceConnector.CONFIG_NAME_MQ_RECORD_BUILDER, + "com.ibm.eventstreams.connect.mqsource.builders.JsonRecordBuilder"); + + connectTask.start(connectorConfigProps); + + // Both invalid and valid messages are received + final List testMessages = Arrays.asList( + getJmsContext().createTextMessage("Invalid JSON message"), // Poison message + getJmsContext().createTextMessage("{ \"i\": 0 }"), // Valid message + getJmsContext().createTextMessage("{ \"i\": 1 }"), // Valid message + getJmsContext().createTextMessage("{ \"i\": 2 }") // Valid message + ); + putAllMessagesToQueue(DEFAULT_SOURCE_QUEUE, testMessages); + + final List processedRecords = connectTask.poll(); + + assertThat(processedRecords).hasSize(4); + + assertThat(processedRecords.get(0).topic()).isEqualTo("__dlq.mq.source"); + assertThat(processedRecords.get(1).topic()).isEqualTo("mytopic"); + 
+    @Test
+    public void verifyLoggingErrorsWithErrorTolerance() throws Exception {
+        connectTask = getSourceTaskWithEmptyKafkaOffset();
+
+        final Map<String, String> connectorConfigProps = createDefaultConnectorProperties();
+        connectorConfigProps.put(MQSourceConnector.CONFIG_NAME_TOPIC, "mytopic");
+        connectorConfigProps.put(ConnectorConfig.ERRORS_TOLERANCE_CONFIG, "all");
+        connectorConfigProps.put(ConnectorConfig.ERRORS_LOG_ENABLE_CONFIG, "true"); // Log errors enabled
+        connectorConfigProps.put(ConnectorConfig.ERRORS_LOG_INCLUDE_MESSAGES_CONFIG, "false");
+        connectorConfigProps.put(MQSourceConnector.DLQ_TOPIC_NAME_CONFIG, "__dlq.mq.source");
+        connectorConfigProps.put(MQSourceConnector.CONFIG_NAME_MQ_MESSAGE_BODY_JMS, "true");
+        connectorConfigProps.put(MQSourceConnector.CONFIG_NAME_MQ_RECORD_BUILDER,
+                "com.ibm.eventstreams.connect.mqsource.builders.JsonRecordBuilder");
+
+        connectTask.start(connectorConfigProps);
+
+        // Both invalid and valid messages are received
+        final List<Message> testMessages = Arrays.asList(
+                getJmsContext().createTextMessage("Invalid JSON message"), // Poison message
+                getJmsContext().createTextMessage("{ \"i\": 0 }"), // Valid message
+                getJmsContext().createTextMessage("{ \"i\": 1 }"), // Valid message
+                getJmsContext().createTextMessage("{ \"i\": 2 }") // Valid message
+        );
+        putAllMessagesToQueue(DEFAULT_SOURCE_QUEUE, testMessages);
+
+        final List<SourceRecord> processedRecords = connectTask.poll();
+
+        assertThat(processedRecords).hasSize(4);
+
+        assertThat(processedRecords.get(0).topic()).isEqualTo("__dlq.mq.source");
+        assertThat(processedRecords.get(1).topic()).isEqualTo("mytopic");
+        assertThat(processedRecords.get(2).topic()).isEqualTo("mytopic");
+        assertThat(processedRecords.get(3).topic()).isEqualTo("mytopic");
+    }
+
+    @Test
+    public void verifyLoggingErrorsWithMessage() throws Exception {
+        connectTask = getSourceTaskWithEmptyKafkaOffset();
+
+        final Map<String, String> connectorConfigProps = createDefaultConnectorProperties();
+        connectorConfigProps.put(MQSourceConnector.CONFIG_NAME_TOPIC, "mytopic");
+        connectorConfigProps.put(ConnectorConfig.ERRORS_TOLERANCE_CONFIG, "all");
+        connectorConfigProps.put(ConnectorConfig.ERRORS_LOG_ENABLE_CONFIG, "true"); // Log errors
+        connectorConfigProps.put(ConnectorConfig.ERRORS_LOG_INCLUDE_MESSAGES_CONFIG, "true"); // Log errors with message
+        connectorConfigProps.put(MQSourceConnector.DLQ_TOPIC_NAME_CONFIG, "__dlq.mq.source");
+        connectorConfigProps.put(MQSourceConnector.CONFIG_NAME_MQ_MESSAGE_BODY_JMS, "true");
+        connectorConfigProps.put(MQSourceConnector.CONFIG_NAME_MQ_RECORD_BUILDER,
+                "com.ibm.eventstreams.connect.mqsource.builders.JsonRecordBuilder");
+
+        connectTask.start(connectorConfigProps);
+
+        // Both invalid and valid messages are received
+        final List<Message> testMessages = Arrays.asList(
+                getJmsContext().createTextMessage("Invalid JSON message"), // Poison message
+                getJmsContext().createTextMessage("{ \"i\": 0 }"), // Valid message
+                getJmsContext().createTextMessage("{ \"i\": 1 }"), // Valid message
+                getJmsContext().createTextMessage("{ \"i\": 2 }") // Valid message
+        );
+        putAllMessagesToQueue(DEFAULT_SOURCE_QUEUE, testMessages);
+
+        final List<SourceRecord> processedRecords = connectTask.poll();
+
+        assertThat(processedRecords).hasSize(4);
+
+        assertThat(processedRecords.get(0).topic()).isEqualTo("__dlq.mq.source");
+        assertThat(processedRecords.get(1).topic()).isEqualTo("mytopic");
+        assertThat(processedRecords.get(2).topic()).isEqualTo("mytopic");
+        assertThat(processedRecords.get(3).topic()).isEqualTo("mytopic");
+    }
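+
+    // DefaultRecordBuilder handles JMS BytesMessage and TextMessage; the
+    // ObjectMessage below cannot be converted, so it should land on the DLQ.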
+    @Test
+    public void verifyLoggingErrorsWithMessageHavingDefaultRecordBuilder() throws Exception {
+        connectTask = getSourceTaskWithEmptyKafkaOffset();
+
+        final Map<String, String> connectorConfigProps = createDefaultConnectorProperties();
+        connectorConfigProps.put(MQSourceConnector.CONFIG_NAME_TOPIC, "mytopic");
+        connectorConfigProps.put(ConnectorConfig.ERRORS_TOLERANCE_CONFIG, "all");
+        connectorConfigProps.put(ConnectorConfig.ERRORS_LOG_ENABLE_CONFIG, "true"); // Log errors
+        connectorConfigProps.put(ConnectorConfig.ERRORS_LOG_INCLUDE_MESSAGES_CONFIG, "true"); // Log errors with message
+        connectorConfigProps.put(MQSourceConnector.DLQ_TOPIC_NAME_CONFIG, "__dlq.mq.source");
+        connectorConfigProps.put(MQSourceConnector.CONFIG_NAME_MQ_MESSAGE_BODY_JMS, "true");
+        connectorConfigProps.put(MQSourceConnector.CONFIG_NAME_MQ_RECORD_BUILDER,
+                "com.ibm.eventstreams.connect.mqsource.builders.DefaultRecordBuilder");
+        connectorConfigProps.put("value.converter", "org.apache.kafka.connect.converters.ByteArrayConverter");
+        connectTask.start(connectorConfigProps);
+
+        // Both invalid and valid messages are received
+        final List<Message> testMessages = Arrays.asList(
+                getJmsContext().createObjectMessage("Invalid message"), // Poison message
+                getJmsContext().createTextMessage("Text") // Valid
+        );
+        putAllMessagesToQueue(DEFAULT_SOURCE_QUEUE, testMessages);
+
+        final List<SourceRecord> processedRecords = connectTask.poll();
+
+        assertThat(processedRecords).hasSize(2);
+
+        assertThat(processedRecords.get(0).topic()).isEqualTo("__dlq.mq.source");
+        assertThat(processedRecords.get(1).topic()).isEqualTo("mytopic");
+    }
 }
diff --git a/src/integration/java/com/ibm/eventstreams/connect/mqsource/builders/JsonRecordBuilderIT.java b/src/integration/java/com/ibm/eventstreams/connect/mqsource/builders/JsonRecordBuilderIT.java
index fd8f055..ded71b4 100644
--- a/src/integration/java/com/ibm/eventstreams/connect/mqsource/builders/JsonRecordBuilderIT.java
+++ b/src/integration/java/com/ibm/eventstreams/connect/mqsource/builders/JsonRecordBuilderIT.java
@@ -20,6 +20,7 @@
 import static org.junit.Assert.assertNull;
 import static org.junit.Assert.assertThrows;
 
+import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 
@@ -27,6 +28,7 @@
 import javax.jms.MapMessage;
 import javax.jms.TextMessage;
 
+import org.apache.kafka.connect.errors.DataException;
 import org.apache.kafka.connect.source.SourceRecord;
 
 import org.junit.Test;
@@ -102,4 +104,50 @@ public void buildFromJmsMapMessage() throws Exception {
         // verify the exception
         assertEquals("Unsupported JMS message type", exc.getMessage());
     }
+
+    @Test
+    public void buildFromJmsTestJsonError() throws Exception {
+        // create MQ message
+        final TextMessage message = getJmsContext().createTextMessage("Not a valid json string");
+
+        // use the builder to convert it to a Kafka record
+        final JsonRecordBuilder builder = new JsonRecordBuilder();
+        final DataException exec = assertThrows(DataException.class, () -> builder.toSourceRecord(getJmsContext(), topic, isJMS, message));
+        assertEquals("Converting byte[] to Kafka Connect data failed due to serialization error: ", exec.getMessage());
+    }
+
+    @Test
+    public void buildFromJmsTestErrorTolerance() throws Exception {
+        // create MQ message
+        final TextMessage message = getJmsContext().createTextMessage("Not a valid json string");
+
+        // use the builder to convert it to a Kafka record
+        final JsonRecordBuilder builder = new JsonRecordBuilder();
+        final Map<String, String> config = AbstractJMSContextIT.getDefaultConnectorProperties();
+        config.put("errors.tolerance", "all");
+        config.put("mq.message.body.jms", "true");
+        config.put("mq.record.builder", "com.ibm.eventstreams.connect.mqsource.builders.JsonRecordBuilder");
+
+        builder.configure(config);
+        final SourceRecord record = builder.toSourceRecord(getJmsContext(), topic, isJMS, message);
+        assertNull(record);
+    }
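+
+    // errors.tolerance defaults to "none" in Kafka Connect, so without the
+    // override above the same poison message should make the builder rethrow: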
+    @Test
+    public void buildFromJmsTestErrorToleranceNone() throws Exception {
+        // create MQ message
+        final TextMessage message = getJmsContext().createTextMessage("Not a valid json string");
+
+        // use the builder to convert it to a Kafka record
+        final JsonRecordBuilder builder = new JsonRecordBuilder();
+        final HashMap<String, String> config = new HashMap<>();
+        config.put("errors.tolerance", "none");
+        config.put("mq.message.body.jms", "true");
+        config.put("mq.record.builder", "com.ibm.eventstreams.connect.mqsource.builders.JsonRecordBuilder");
+
+        builder.configure(config);
+        assertThrows(DataException.class, () -> {
+            builder.toSourceRecord(getJmsContext(), topic, isJMS, message);
+        });
+    }
 }
diff --git a/src/main/java/com/ibm/eventstreams/connect/mqsource/MQSourceConnector.java b/src/main/java/com/ibm/eventstreams/connect/mqsource/MQSourceConnector.java
index 292863f..4d5b994 100644
--- a/src/main/java/com/ibm/eventstreams/connect/mqsource/MQSourceConnector.java
+++ b/src/main/java/com/ibm/eventstreams/connect/mqsource/MQSourceConnector.java
@@ -187,6 +187,22 @@ public class MQSourceConnector extends SourceConnector {
     public static final long CONFIG_RECONNECT_DELAY_MAX_DEFAULT = 8192L;
     public static final long CONFIG_RECONNECT_DELAY_MAX_MINIMUM = 10L;
 
+    public static final String DLQ_PREFIX = "errors.deadletterqueue.";
+
+    public static final String DLQ_TOPIC_NAME_CONFIG = DLQ_PREFIX + "topic.name";
+    public static final String DLQ_TOPIC_NAME_DOC = "The name of the topic to be used as the dead letter queue (DLQ) for messages that "
+            + "result in an error when processed by this source connector, or its transformations or converters. The topic name is blank by default, "
+            + "which means that no messages are to be recorded in the DLQ.";
+    public static final String DLQ_TOPIC_DEFAULT = "";
+    private static final String DLQ_TOPIC_DISPLAY = "Dead Letter Queue Topic Name";
+
+    public static final String DLQ_CONTEXT_HEADERS_ENABLE_CONFIG = DLQ_PREFIX + "context.headers.enable";
+    public static final boolean DLQ_CONTEXT_HEADERS_ENABLE_DEFAULT = false;
+    public static final String DLQ_CONTEXT_HEADERS_ENABLE_DOC = "If true, add headers containing error context to the messages "
+            + "written to the dead letter queue. To avoid clashing with headers from the original record, all error context header "
+            + "keys will start with __connect.errors.";
+    private static final String DLQ_CONTEXT_HEADERS_ENABLE_DISPLAY = "Enable Error Context Headers";
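+
+    // Example connector settings (topic name illustrative) that route poison
+    // messages to a DLQ with these context headers enabled:
+    //   errors.tolerance=all
+    //   errors.deadletterqueue.topic.name=__dlq.mq.source
+    //   errors.deadletterqueue.context.headers.enable=true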
+
     // Define valid reconnect options
     public static final String[] CONFIG_VALUE_MQ_VALID_RECONNECT_OPTIONS = {
         CONFIG_VALUE_MQ_CLIENT_RECONNECT_OPTION_ASDEF,
@@ -616,6 +632,22 @@
                 null, new ReadableFile(),
                 CONFIG_GROUP_MQ, 29,
                 Width.MEDIUM,
                 CONFIG_DISPLAY_RECONNECT_DELAY_MAX);
+        CONFIGDEF.define(DLQ_TOPIC_NAME_CONFIG,
+                Type.STRING,
+                DLQ_TOPIC_DEFAULT,
+                Importance.MEDIUM,
+                DLQ_TOPIC_NAME_DOC,
+                CONFIG_GROUP_MQ, 30,
+                Width.MEDIUM,
+                DLQ_TOPIC_DISPLAY);
+        CONFIGDEF.define(DLQ_CONTEXT_HEADERS_ENABLE_CONFIG,
+                Type.BOOLEAN,
+                DLQ_CONTEXT_HEADERS_ENABLE_DEFAULT,
+                Importance.MEDIUM,
+                DLQ_CONTEXT_HEADERS_ENABLE_DOC,
+                CONFIG_GROUP_MQ, 31,
+                Width.MEDIUM,
+                DLQ_CONTEXT_HEADERS_ENABLE_DISPLAY);
         CONFIGDEF.define(CONFIG_NAME_TOPIC,
                 Type.STRING,
diff --git a/src/main/java/com/ibm/eventstreams/connect/mqsource/MQSourceTask.java b/src/main/java/com/ibm/eventstreams/connect/mqsource/MQSourceTask.java
index 8126e48..9a876d5 100755
--- a/src/main/java/com/ibm/eventstreams/connect/mqsource/MQSourceTask.java
+++ b/src/main/java/com/ibm/eventstreams/connect/mqsource/MQSourceTask.java
@@ -146,6 +146,7 @@ public void start(final Map<String, String> props) {
         final JMSWorker reader = new JMSWorker();
         JMSWorker dedicated = null;
         SequenceStateClient client = null;
+
         if (MQSourceConnector.configSupportsExactlyOnce(props)) {
             dedicated = new JMSWorker();
             client = new SequenceStateClient(props.get(CONFIG_NAME_MQ_EXACTLY_ONCE_STATE_QUEUE), reader, dedicated);
diff --git a/src/main/java/com/ibm/eventstreams/connect/mqsource/builders/BaseRecordBuilder.java b/src/main/java/com/ibm/eventstreams/connect/mqsource/builders/BaseRecordBuilder.java
index 1dfd0d8..4d18a6f 100755
--- a/src/main/java/com/ibm/eventstreams/connect/mqsource/builders/BaseRecordBuilder.java
+++ b/src/main/java/com/ibm/eventstreams/connect/mqsource/builders/BaseRecordBuilder.java
@@ -15,20 +15,22 @@
  */
 package com.ibm.eventstreams.connect.mqsource.builders;
 
-import com.ibm.eventstreams.connect.mqsource.MQSourceConnector;
-import com.ibm.eventstreams.connect.mqsource.processor.JmsToKafkaHeaderConverter;
+import java.util.Map;
+import java.util.Optional;
+
+import javax.jms.JMSContext;
+import javax.jms.JMSException;
+import javax.jms.Message;
+
 import org.apache.kafka.connect.data.Schema;
 import org.apache.kafka.connect.data.SchemaAndValue;
 import org.apache.kafka.connect.source.SourceRecord;
-
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import javax.jms.JMSContext;
-import javax.jms.JMSException;
-import javax.jms.Message;
-import java.util.Map;
-import java.util.Optional;
+import com.ibm.eventstreams.connect.mqsource.MQSourceConnector;
+import com.ibm.eventstreams.connect.mqsource.processor.JmsToKafkaHeaderConverter;
+import com.ibm.eventstreams.connect.mqsource.util.ErrorHandler;
 
 /**
  * Builds Kafka Connect SourceRecords from messages.
@@ -36,24 +38,39 @@
 public abstract class BaseRecordBuilder implements RecordBuilder {
     private static final Logger log = LoggerFactory.getLogger(BaseRecordBuilder.class);
 
-    public enum KeyHeader { NONE, MESSAGE_ID, CORRELATION_ID, CORRELATION_ID_AS_BYTES, DESTINATION };
-    protected KeyHeader keyheader = KeyHeader.NONE;
+    public enum KeyHeader {
+        NONE, MESSAGE_ID, CORRELATION_ID, CORRELATION_ID_AS_BYTES, DESTINATION
+    };
 
+    protected KeyHeader keyheader = KeyHeader.NONE;
     private boolean copyJmsPropertiesFlag = Boolean.FALSE;
     private JmsToKafkaHeaderConverter jmsToKafkaHeaderConverter;
+    private ErrorHandler errorHandler = new ErrorHandler();
 
     /**
      * Configure this class.
      *
      * @param props initial configuration
      *
-     * @throws RecordBuilderException Operation failed and connector should stop.
+     * @throws RecordBuilderException Operation failed and connector should stop.
      */
-    @Override public void configure(final Map<String, String> props) {
+    @Override
+    public void configure(final Map<String, String> props) {
         log.trace("[{}] Entry {}.configure, props={}", Thread.currentThread().getId(), this.getClass().getName(), props);
 
+        configureKeyHeader(props);
+        configureJmsProperties(props);
+        configureErrorHandler(props);
+
+        log.trace("[{}] Exit {}.configure", Thread.currentThread().getId(), this.getClass().getName());
+    }
+
+    /**
+     * Configure key header settings.
+     */
+    private void configureKeyHeader(final Map<String, String> props) {
         final String kh = props.get(MQSourceConnector.CONFIG_NAME_MQ_RECORD_BUILDER_KEY_HEADER);
         if (kh != null) {
             if (kh.equals(MQSourceConnector.CONFIG_VALUE_MQ_RECORD_BUILDER_KEY_HEADER_JMSMESSAGEID)) {
@@ -73,12 +90,22 @@ public enum KeyHeader { NONE, MESSAGE_ID, CORRELATION_ID, CORRELATION_ID_AS_BYTE
                 throw new RecordBuilderException("Unsupported MQ record builder key header value");
             }
         }
+    }
 
+    /**
+     * Configure JMS properties settings.
+     */
+    private void configureJmsProperties(final Map<String, String> props) {
         final String str = props.get(MQSourceConnector.CONFIG_NAME_MQ_JMS_PROPERTY_COPY_TO_KAFKA_HEADER);
         copyJmsPropertiesFlag = Boolean.parseBoolean(Optional.ofNullable(str).orElse("false"));
         jmsToKafkaHeaderConverter = new JmsToKafkaHeaderConverter();
+    }
 
-        log.trace("[{}] Exit {}.configure", Thread.currentThread().getId(), this.getClass().getName());
+    /**
+     * Configure error handler.
+     */
+    public void configureErrorHandler(final Map<String, String> props) {
+        errorHandler.configure(props, copyJmsPropertiesFlag, jmsToKafkaHeaderConverter);
     }
 
     /**
@@ -160,38 +187,58 @@ public abstract SchemaAndValue getValue(JMSContext context, String topic, boolea
      * @throws JMSException Message could not be converted
      */
     @Override
-    public SourceRecord toSourceRecord(final JMSContext context, final String topic, final boolean messageBodyJms, final Message message) throws JMSException {
+    public SourceRecord toSourceRecord(final JMSContext context, final String topic, final boolean messageBodyJms,
+            final Message message) throws JMSException {
         return toSourceRecord(context, topic, messageBodyJms, message, null, null);
     }
 
     @Override
-    public SourceRecord toSourceRecord(final JMSContext context, final String topic, final boolean messageBodyJms, final Message message, final Map<String, ?> sourceOffset, final Map<String, ?> sourceQueuePartition) throws JMSException {
-        final SchemaAndValue key = this.getKey(context, topic, message);
-        final SchemaAndValue value = this.getValue(context, topic, messageBodyJms, message);
-
-        if (copyJmsPropertiesFlag && messageBodyJms) {
-            return new SourceRecord(
-                    sourceQueuePartition,
-                    sourceOffset,
-                    topic,
-                    null,
-                    key.schema(),
-                    key.value(),
-                    value.schema(),
-                    value.value(),
-                    message.getJMSTimestamp(),
-                    jmsToKafkaHeaderConverter.convertJmsPropertiesToKafkaHeaders(message)
-            );
-        } else {
-            return new SourceRecord(
-                    sourceQueuePartition,
-                    sourceOffset,
-                    topic,
-                    key.schema(),
-                    key.value(),
-                    value.schema(),
-                    value.value()
-            );
+    public SourceRecord toSourceRecord(final JMSContext context, final String topic, final boolean messageBodyJms,
+            final Message message, final Map<String, ?> sourceOffset, final Map<String, ?> sourceQueuePartition)
+            throws JMSException {
+
+        SchemaAndValue key = new SchemaAndValue(null, null);
+
+        try {
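+            // The value is converted before the key, and the key starts as an
+            // empty SchemaAndValue, so handleBuildException always receives a
+            // non-null key even when conversion fails before the key is read.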
+            // Extract key and value
+            final SchemaAndValue value = this.getValue(context, topic, messageBodyJms, message);
+            key = this.getKey(context, topic, message);
+
+            // Create and return appropriate record based on configuration
+            if (copyJmsPropertiesFlag && messageBodyJms) {
+                return new SourceRecord(
+                        sourceQueuePartition,
+                        sourceOffset,
+                        topic,
+                        null,
+                        key.schema(),
+                        key.value(),
+                        value.schema(),
+                        value.value(),
+                        message.getJMSTimestamp(),
+                        jmsToKafkaHeaderConverter.convertJmsPropertiesToKafkaHeaders(message));
+            } else {
+                return new SourceRecord(
+                        sourceQueuePartition,
+                        sourceOffset,
+                        topic,
+                        null,
+                        key.schema(),
+                        key.value(),
+                        value.schema(),
+                        value.value());
+            }
+        } catch (final Exception e) {
+            // Log the error using error handler
+            errorHandler.logError(e, topic, message);
+
+            // If errors are not tolerated, rethrow
+            if (!errorHandler.shouldTolerateErrors()) {
+                throw e;
+            }
+
+            // Handle the error based on configured error tolerance
+            return errorHandler.handleBuildException(message, sourceQueuePartition, sourceOffset, topic, key, e);
+        }
     }
 }
\ No newline at end of file
diff --git a/src/main/java/com/ibm/eventstreams/connect/mqsource/builders/JsonRecordBuilder.java b/src/main/java/com/ibm/eventstreams/connect/mqsource/builders/JsonRecordBuilder.java
index 5ff72ac..cf25d03 100755
--- a/src/main/java/com/ibm/eventstreams/connect/mqsource/builders/JsonRecordBuilder.java
+++ b/src/main/java/com/ibm/eventstreams/connect/mqsource/builders/JsonRecordBuilder.java
@@ -26,7 +26,6 @@
 import org.apache.kafka.connect.data.SchemaAndValue;
 import org.apache.kafka.connect.json.JsonConverter;
-
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
diff --git a/src/main/java/com/ibm/eventstreams/connect/mqsource/util/ErrorHandler.java b/src/main/java/com/ibm/eventstreams/connect/mqsource/util/ErrorHandler.java
new file mode 100644
index 0000000..49a096f
--- /dev/null
+++ b/src/main/java/com/ibm/eventstreams/connect/mqsource/util/ErrorHandler.java
@@ -0,0 +1,362 @@
+/**
+ * Copyright 2025 IBM Corporation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.ibm.eventstreams.connect.mqsource.util;
+
+import static java.nio.charset.StandardCharsets.UTF_8;
+
+import java.io.PrintWriter;
+import java.io.StringWriter;
+import java.util.Locale;
+import java.util.Map;
+import java.util.Optional;
+
+import javax.jms.BytesMessage;
+import javax.jms.JMSException;
+import javax.jms.Message;
+import javax.jms.TextMessage;
+
+import org.apache.kafka.connect.data.Schema;
+import org.apache.kafka.connect.data.SchemaAndValue;
+import org.apache.kafka.connect.header.ConnectHeaders;
+import org.apache.kafka.connect.header.Headers;
+import org.apache.kafka.connect.runtime.ConnectorConfig;
+import org.apache.kafka.connect.runtime.errors.ToleranceType;
+import org.apache.kafka.connect.source.SourceRecord;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.ibm.eventstreams.connect.mqsource.MQSourceConnector;
+import com.ibm.eventstreams.connect.mqsource.processor.JmsToKafkaHeaderConverter;
+
+/**
+ * Handles error processing and Dead Letter Queue (DLQ) functionality for the
+ * MQ Source Connector.
+ */
+public class ErrorHandler {
+    private static final Logger log = LoggerFactory.getLogger(ErrorHandler.class);
+
+    public static final String HEADER_PREFIX = "__connect.errors.";
+    public static final String ERROR_HEADER_ORIG_TOPIC = HEADER_PREFIX + "topic";
+    public static final String ERROR_HEADER_EXCEPTION = HEADER_PREFIX + "exception.class.name";
+    public static final String ERROR_HEADER_EXCEPTION_MESSAGE = HEADER_PREFIX + "exception.message";
+    public static final String ERROR_HEADER_EXCEPTION_STACK_TRACE = HEADER_PREFIX + "exception.stacktrace";
+    public static final String ERROR_HEADER_EXCEPTION_TIMESTAMP = HEADER_PREFIX + "timestamp";
+    public static final String ERROR_HEADER_EXCEPTION_CAUSE_CLASS = HEADER_PREFIX + "cause.class";
+    public static final String ERROR_HEADER_EXCEPTION_CAUSE_MESSAGE = HEADER_PREFIX + "cause.message";
+    public static final String ERROR_HEADER_JMS_MESSAGE_ID = HEADER_PREFIX + "jms.message.id";
+    public static final String ERROR_HEADER_JMS_TIMESTAMP = HEADER_PREFIX + "jms.timestamp";
+    public static final String ERROR_HEADER_QUEUE = HEADER_PREFIX + "mq.queue";
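+
+    // These keys follow the same __connect.errors. prefix convention used by
+    // Kafka Connect's built-in DLQ reporter for sink connectors; the jms.* and
+    // mq.queue entries are MQ-specific additions from this connector.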
+
+    private boolean tolerateErrors;
+    private boolean logErrors;
+    private boolean logIncludeMessages;
+    private String dlqTopic;
+    private String queueName;
+    private boolean copyJmsPropertiesFlag;
+    private JmsToKafkaHeaderConverter jmsToKafkaHeaderConverter;
+    private boolean enableDLQContextHeader;
+
+    /**
+     * Configure the error handler with the provided properties.
+     *
+     * @param props                     Configuration properties
+     * @param copyJmsPropertiesFlag     Whether to copy JMS properties to Kafka
+     *                                  headers
+     * @param jmsToKafkaHeaderConverter Converter for JMS properties to Kafka
+     *                                  headers
+     */
+    public void configure(final Map<String, String> props, final boolean copyJmsPropertiesFlag,
+            final JmsToKafkaHeaderConverter jmsToKafkaHeaderConverter) {
+        log.trace("[{}] Entry {}.configure", Thread.currentThread().getId(), this.getClass().getName());
+
+        this.copyJmsPropertiesFlag = copyJmsPropertiesFlag;
+        this.jmsToKafkaHeaderConverter = jmsToKafkaHeaderConverter;
+
+        initializeErrorTolerance(props);
+
+        log.trace("[{}] Exit {}.configure", Thread.currentThread().getId(), this.getClass().getName());
+    }
+
+    /**
+     * Initializes error tolerance configuration by reading directly from the
+     * properties map.
+     */
+    private void initializeErrorTolerance(final Map<String, String> props) {
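+        // errors.tolerance accepts two values: "none" (the Kafka Connect default)
+        // fails the task on the first conversion error, while "all" keeps the
+        // task running and skips or DLQ-routes messages that cannot be converted.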
+        // Read tolerateErrors directly from props
+        final String errorToleranceValue = props.getOrDefault(
+                ConnectorConfig.ERRORS_TOLERANCE_CONFIG,
+                ToleranceType.NONE.toString()).toUpperCase(Locale.ROOT);
+
+        tolerateErrors = ToleranceType.valueOf(errorToleranceValue).equals(ToleranceType.ALL);
+
+        // Read logErrors directly from props
+        if (tolerateErrors) {
+            final String logErrorsValue = props.getOrDefault(
+                    ConnectorConfig.ERRORS_LOG_ENABLE_CONFIG,
+                    String.valueOf(ConnectorConfig.ERRORS_LOG_ENABLE_DEFAULT));
+            logErrors = Boolean.parseBoolean(logErrorsValue);
+
+            final String logIncludeMessagesValue = props.getOrDefault(
+                    ConnectorConfig.ERRORS_LOG_INCLUDE_MESSAGES_CONFIG,
+                    String.valueOf(ConnectorConfig.ERRORS_LOG_INCLUDE_MESSAGES_DEFAULT));
+            logIncludeMessages = Boolean.parseBoolean(logIncludeMessagesValue);
+
+            final String enableDLQContextHeaderValue = props.getOrDefault(
+                    MQSourceConnector.DLQ_CONTEXT_HEADERS_ENABLE_CONFIG,
+                    String.valueOf(MQSourceConnector.DLQ_CONTEXT_HEADERS_ENABLE_DEFAULT));
+            enableDLQContextHeader = Boolean.parseBoolean(enableDLQContextHeaderValue);
+
+            dlqTopic = props.get(MQSourceConnector.DLQ_TOPIC_NAME_CONFIG);
+            if (dlqTopic != null && !dlqTopic.isEmpty()) {
+                dlqTopic = dlqTopic.trim();
+            }
+
+            queueName = props.get(MQSourceConnector.CONFIG_NAME_MQ_QUEUE);
+            if (queueName != null && !queueName.isEmpty()) {
+                queueName = queueName.trim();
+            }
+        } else {
+            logErrors = false;
+            logIncludeMessages = false;
+        }
+    }
+
+    /**
+     * Checks if errors should be tolerated based on configuration.
+     *
+     * @return true if errors should be tolerated, false otherwise
+     */
+    public boolean shouldTolerateErrors() {
+        return tolerateErrors;
+    }
+
+    /**
+     * Logs error based on configuration settings.
+     *
+     * @param exception The exception that needs to be logged
+     * @param topic     The Kafka topic associated with the message
+     * @param message   The JMS message that caused the error
+     */
+    public void logError(final Exception exception, final String topic, final Message message) {
+        if (logErrors) {
+            if (logIncludeMessages) {
+                log.error("Failed to process message on topic '{}'. Message content: {}. \nException: {}",
+                        topic, message, exception.toString(), exception);
+            } else {
+                log.error("Failed to process message on topic '{}'. \nException: {}",
+                        topic, exception.toString(), exception);
+            }
+        } else {
+            log.warn("Error during message processing on topic '{}', but logging is suppressed. \nReason: {}",
+                    topic, extractReason(exception));
+        }
+    }
+
+    /**
+     * Handles conversion errors based on configuration.
+     *
+     * @param message              The actual MQ message
+     * @param sourceQueuePartition The Source Record queue partition
+     * @param sourceOffset         The Source Record offset
+     * @param topic                The original topic name
+     * @param key                  The SchemaAndValue to include in the source
+     *                             record key
+     * @param exception            The exception that needs to be stored in the
+     *                             header
+     * @return SourceRecord for DLQ or null if message should be skipped
+     */
+    public SourceRecord handleBuildException(final Message message, final Map<String, ?> sourceQueuePartition,
+            final Map<String, ?> sourceOffset, final String topic,
+            final SchemaAndValue key, final Exception exception) {
+
+        // If errors are tolerated but no DLQ is configured, skip the message
+        if (dlqTopic == null || dlqTopic.isEmpty()) {
+            log.debug(
+                    "Skipping message due to conversion error: error tolerance is enabled but DLQ is not configured. Message will not be processed further.");
+            return null;
+        }
+
+        // Create DLQ record
+        return createDlqRecord(message, sourceQueuePartition, sourceOffset, topic, key, exception);
+    }
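+
+    // A null return here propagates out of BaseRecordBuilder.toSourceRecord, so
+    // the failed message is dropped rather than published (this is what
+    // buildFromJmsTestErrorTolerance in JsonRecordBuilderIT asserts).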
+
+    /**
+     * Creates a DLQ record with error information.
+     *
+     * @param message              The actual MQ message
+     * @param sourceQueuePartition The Source Record queue partition
+     * @param sourceOffset         The Source Record offset
+     * @param originalTopic        The original topic name
+     * @param key                  The SchemaAndValue to include in the source
+     *                             record key
+     * @param exception            The exception that needs to be stored in the
+     *                             header
+     * @return SourceRecord for DLQ or null if creation fails
+     */
+    private SourceRecord createDlqRecord(final Message message, final Map<String, ?> sourceQueuePartition,
+            final Map<String, ?> sourceOffset, final String originalTopic,
+            final SchemaAndValue key, final Exception exception) {
+
+        try {
+            // Extract payload or return null if extraction fails
+            final Optional<byte[]> maybePayload = extractPayload(message);
+            if (!maybePayload.isPresent()) {
+                log.error("Skipping message due to payload extraction failure");
+                return null;
+            }
+
+            final byte[] payload = maybePayload.get();
+
+            // Create headers with error information, if DLQ context header config is enabled
+            final Headers headers = createErrorHeaders(message, originalTopic, exception);
+
+            return new SourceRecord(
+                    sourceQueuePartition,
+                    sourceOffset,
+                    dlqTopic,
+                    null,
+                    key.schema(),
+                    key.value(),
+                    Schema.OPTIONAL_BYTES_SCHEMA,
+                    payload,
+                    message.getJMSTimestamp(),
+                    headers);
+        } catch (final Exception dlqException) {
+            // If DLQ processing itself fails, log and skip
+            log.error("Failed to create DLQ record: {}", dlqException.getMessage(), dlqException);
+            return null;
+        }
+    }
+
+    /**
+     * Extracts payload from a JMS message with improved error handling.
+     *
+     * @param message The actual message coming from MQ
+     * @return Optional containing the payload or empty if extraction fails
+     */
+    private Optional<byte[]> extractPayload(final Message message) {
+        try {
+            if (message instanceof BytesMessage) {
+                log.debug("Extracting payload from BytesMessage for DLQ");
+                return Optional.ofNullable(message.getBody(byte[].class));
+            } else if (message instanceof TextMessage) {
+                log.debug("Extracting payload from TextMessage for DLQ");
+                final String text = message.getBody(String.class);
+                return Optional.ofNullable(text != null ? text.getBytes(UTF_8) : null);
+            } else {
+                log.warn(
+                        "Unsupported JMS message type '{}' encountered while extracting payload for DLQ. Falling back to message.toString().",
+                        message.getClass().getName());
+                return Optional.ofNullable(message.toString().getBytes(UTF_8));
+            }
+        } catch (final JMSException e) {
+            log.error(
+                    "JMSException while extracting payload from message type '{}': {} for DLQ. Falling back to message.toString().",
+                    message.getClass().getName(), e.getMessage(), e);
+            return Optional.ofNullable(message.toString().getBytes(UTF_8));
+        }
+    }
+
+    /**
+     * Creates enhanced headers with error information for DLQ records.
+     *
+     * @param message       The original message
+     * @param originalTopic The original topic name
+     * @param exception     The exception that needs to be included in the header
+     * @return Headers containing error information
+     */
+    private Headers createErrorHeaders(final Message message, final String originalTopic, final Exception exception) {
+        Headers headers = new ConnectHeaders();
+        if (copyJmsPropertiesFlag && jmsToKafkaHeaderConverter != null) {
+            headers = jmsToKafkaHeaderConverter.convertJmsPropertiesToKafkaHeaders(message);
+        }
+
+        // If the DLQ context header is not enabled, continue with only the JMS property headers
+        if (!enableDLQContextHeader) {
+            return headers;
+        }
+
+        // Basic error information
+        headers.addString(ERROR_HEADER_ORIG_TOPIC, originalTopic);
+        headers.addString(ERROR_HEADER_EXCEPTION, exception.getClass().getName());
+        headers.addString(ERROR_HEADER_EXCEPTION_MESSAGE, exception.getMessage());
+
+        try {
+            headers.addString(ERROR_HEADER_JMS_MESSAGE_ID, message.getJMSMessageID());
+            headers.addLong(ERROR_HEADER_JMS_TIMESTAMP, message.getJMSTimestamp());
+        } catch (final JMSException jmsException) {
+            log.warn("Failed to extract JMS message ID or timestamp for DLQ headers", jmsException);
+        }
+
+        headers.addString(ERROR_HEADER_QUEUE, queueName != null ? queueName : "");
+        headers.addLong(ERROR_HEADER_EXCEPTION_TIMESTAMP, System.currentTimeMillis());
+
+        // Add cause if available
+        if (exception.getCause() != null) {
+            headers.addString(ERROR_HEADER_EXCEPTION_CAUSE_CLASS, exception.getCause().getClass().getName());
+            headers.addString(ERROR_HEADER_EXCEPTION_CAUSE_MESSAGE, exception.getCause().getMessage());
+        }
+
+        // Add first few lines of stack trace (full stack trace might be too large)
+        final String stackTrace = getStackTrace(exception);
+        if (stackTrace != null) {
+            headers.addString(ERROR_HEADER_EXCEPTION_STACK_TRACE, stackTrace);
+        }
+
+        return headers;
+    }
+
+    /**
+     * Extracts and truncates stack trace from exception.
+     *
+     * @param exception The exception to extract stack trace from
+     * @return Truncated stack trace string or null if extraction fails
+     */
+    private String getStackTrace(final Exception exception) {
+        try {
+            final StringWriter sw = new StringWriter();
+            final PrintWriter pw = new PrintWriter(sw);
+            exception.printStackTrace(pw);
+            final String stackTrace = sw.toString();
+
+            // First 500 characters or less to avoid overly large headers
+            return stackTrace.length() <= 500 ? stackTrace
+                    : stackTrace.substring(0, 500) + "... [truncated]";
+        } catch (final Exception e) {
+            log.warn("Could not extract stack trace for DLQ headers", e);
+            return null;
+        }
+    }
+
+    /**
+     * Extracts a clean reason message from an exception.
+     *
+     * @param exception The exception to extract reason from
+     * @return Clean reason string
+     */
+    private String extractReason(final Exception exception) {
+        if (exception == null) {
+            return "Unknown error";
+        }
+
+        final String message = exception.getMessage();
+        if (message == null || message.trim().isEmpty()) {
+            return "Unknown error";
+        }
+
+        // Clean up trailing punctuation or whitespace (e.g., "error:" → "error")
+        return message.replaceAll("[:\\s]+$", "");
+    }
+}
\ No newline at end of file