Skip to content

Commit 6616083

Browse files
committed
feat: Handle jms values in error headers
Signed-off-by: Joel Hanson <joelhanson025@gmail.com>
1 parent 7336ce4 commit 6616083

File tree

3 files changed

+81
-22
lines changed

3 files changed

+81
-22
lines changed

src/integration/java/com/ibm/eventstreams/connect/mqsource/MQSourceTaskIT.java

Lines changed: 18 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -760,7 +760,7 @@ public void shouldRoutePoisonMessagesToDeadLetterQueueWhenErrorToleranceIsAll()
760760
final Map<String, String> connectorConfigProps = createDefaultConnectorProperties();
761761
connectorConfigProps.put(MQSourceConnector.CONFIG_NAME_TOPIC, "mytopic");
762762
connectorConfigProps.put(ConnectorConfig.ERRORS_TOLERANCE_CONFIG, "all");
763-
connectorConfigProps.put(MQSourceConnector.ERRORS_DEAD_LETTER_QUEUE_TOPIC_NAME_CONFIG, "__dlq.mq.source");
763+
connectorConfigProps.put(MQSourceConnector.DLQ_TOPIC_NAME_CONFIG, "__dlq.mq.source");
764764
connectorConfigProps.put(MQSourceConnector.CONFIG_NAME_MQ_MESSAGE_BODY_JMS, "true");
765765
connectorConfigProps.put(MQSourceConnector.CONFIG_NAME_MQ_RECORD_BUILDER,
766766
"com.ibm.eventstreams.connect.mqsource.builders.JsonRecordBuilder");
@@ -805,7 +805,7 @@ public void shouldFailWhenErrorToleranceIsNone() throws Exception {
805805
final Map<String, String> connectorConfigProps = createDefaultConnectorProperties();
806806
connectorConfigProps.put(MQSourceConnector.CONFIG_NAME_TOPIC, "mytopic");
807807
connectorConfigProps.put(ConnectorConfig.ERRORS_TOLERANCE_CONFIG, "none");
808-
connectorConfigProps.put(MQSourceConnector.ERRORS_DEAD_LETTER_QUEUE_TOPIC_NAME_CONFIG, "__dlq.mq.source");
808+
connectorConfigProps.put(MQSourceConnector.DLQ_TOPIC_NAME_CONFIG, "__dlq.mq.source");
809809
connectorConfigProps.put(MQSourceConnector.CONFIG_NAME_MQ_MESSAGE_BODY_JMS, "true");
810810
connectorConfigProps.put(MQSourceConnector.CONFIG_NAME_MQ_RECORD_BUILDER,
811811
"com.ibm.eventstreams.connect.mqsource.builders.JsonRecordBuilder");
@@ -832,16 +832,18 @@ public void shouldPreserveDlqHeadersWithErrorInformation() throws Exception {
832832
final Map<String, String> connectorConfigProps = createDefaultConnectorProperties();
833833
connectorConfigProps.put(MQSourceConnector.CONFIG_NAME_TOPIC, "mytopic");
834834
connectorConfigProps.put(ConnectorConfig.ERRORS_TOLERANCE_CONFIG, "all");
835-
connectorConfigProps.put(MQSourceConnector.ERRORS_DEAD_LETTER_QUEUE_TOPIC_NAME_CONFIG, "__dlq.mq.source");
835+
connectorConfigProps.put(MQSourceConnector.DLQ_TOPIC_NAME_CONFIG, "__dlq.mq.source");
836836
connectorConfigProps.put(MQSourceConnector.CONFIG_NAME_MQ_MESSAGE_BODY_JMS, "true");
837837
connectorConfigProps.put(MQSourceConnector.CONFIG_NAME_MQ_RECORD_BUILDER,
838838
"com.ibm.eventstreams.connect.mqsource.builders.JsonRecordBuilder");
839839

840840
connectTask.start(connectorConfigProps);
841841

842842
// An invalid message is received
843+
final TextMessage message = getJmsContext().createTextMessage("Invalid JSON message");
844+
message.setJMSMessageID("message_id");
843845
putAllMessagesToQueue(DEFAULT_SOURCE_QUEUE,
844-
Collections.singletonList(getJmsContext().createTextMessage("Invalid JSON message")));
846+
Collections.singletonList(message));
845847

846848
// The message should be routed to DLQ with error headers
847849
final List<SourceRecord> processedRecords = connectTask.poll();
@@ -867,7 +869,9 @@ public void shouldPreserveDlqHeadersWithErrorInformation() throws Exception {
867869
.isEqualTo("org.apache.kafka.common.errors.SerializationException");
868870
assertThat(headers.lastWithName("__connect.errors.exception.stacktrace").value()
869871
.toString().contains("com.ibm.eventstreams.connect.mqsource.JMSWorker.toSourceRecord")).isTrue();
870-
872+
assertEquals(headers.lastWithName("__connect.errors.jms.message.id").value(), message.getJMSMessageID());
873+
assertEquals(headers.lastWithName("__connect.errors.jms.timestamp").value(), message.getJMSTimestamp());
874+
assertEquals(headers.lastWithName("__connect.errors.mq.queue").value(), DEFAULT_SOURCE_QUEUE);
871875
connectTask.commitRecord(dlqRecord);
872876
}
873877

@@ -878,7 +882,7 @@ public void shouldHandleDifferentMessageTypesToDlq() throws Exception {
878882
final Map<String, String> connectorConfigProps = createDefaultConnectorProperties();
879883
connectorConfigProps.put(MQSourceConnector.CONFIG_NAME_TOPIC, "mytopic");
880884
connectorConfigProps.put(ConnectorConfig.ERRORS_TOLERANCE_CONFIG, "all");
881-
connectorConfigProps.put(MQSourceConnector.ERRORS_DEAD_LETTER_QUEUE_TOPIC_NAME_CONFIG, "__dlq.mq.source");
885+
connectorConfigProps.put(MQSourceConnector.DLQ_TOPIC_NAME_CONFIG, "__dlq.mq.source");
882886
connectorConfigProps.put(MQSourceConnector.CONFIG_NAME_MQ_MESSAGE_BODY_JMS, "true");
883887
connectorConfigProps.put(MQSourceConnector.CONFIG_NAME_MQ_RECORD_BUILDER,
884888
"com.ibm.eventstreams.connect.mqsource.builders.JsonRecordBuilder");
@@ -913,7 +917,7 @@ public void shouldPreserveJmsPropertiesInDlqMessages() throws Exception {
913917
final Map<String, String> connectorConfigProps = createDefaultConnectorProperties();
914918
connectorConfigProps.put(MQSourceConnector.CONFIG_NAME_TOPIC, "mytopic");
915919
connectorConfigProps.put(ConnectorConfig.ERRORS_TOLERANCE_CONFIG, "all");
916-
connectorConfigProps.put(MQSourceConnector.ERRORS_DEAD_LETTER_QUEUE_TOPIC_NAME_CONFIG, "__dlq.mq.source");
920+
connectorConfigProps.put(MQSourceConnector.DLQ_TOPIC_NAME_CONFIG, "__dlq.mq.source");
917921
connectorConfigProps.put(MQSourceConnector.CONFIG_NAME_MQ_MESSAGE_BODY_JMS, "true");
918922
connectorConfigProps.put(MQSourceConnector.CONFIG_NAME_MQ_JMS_PROPERTY_COPY_TO_KAFKA_HEADER, "true");
919923
connectorConfigProps.put(MQSourceConnector.CONFIG_NAME_MQ_RECORD_BUILDER,
@@ -944,7 +948,7 @@ public void shouldHandleMixOfValidAndInvalidMessagesWithDifferentFormats() throw
944948
final Map<String, String> connectorConfigProps = createDefaultConnectorProperties();
945949
connectorConfigProps.put(MQSourceConnector.CONFIG_NAME_TOPIC, "mytopic");
946950
connectorConfigProps.put(ConnectorConfig.ERRORS_TOLERANCE_CONFIG, "all");
947-
connectorConfigProps.put(MQSourceConnector.ERRORS_DEAD_LETTER_QUEUE_TOPIC_NAME_CONFIG, "__dlq.mq.source");
951+
connectorConfigProps.put(MQSourceConnector.DLQ_TOPIC_NAME_CONFIG, "__dlq.mq.source");
948952
connectorConfigProps.put(MQSourceConnector.CONFIG_NAME_MQ_MESSAGE_BODY_JMS, "true");
949953
connectorConfigProps.put(MQSourceConnector.CONFIG_NAME_MQ_RECORD_BUILDER,
950954
"com.ibm.eventstreams.connect.mqsource.builders.JsonRecordBuilder");
@@ -989,7 +993,7 @@ public void shouldContinueProcessingAfterUnhandleableDlqError() throws Exception
989993
final Map<String, String> connectorConfigProps = createDefaultConnectorProperties();
990994
connectorConfigProps.put(MQSourceConnector.CONFIG_NAME_TOPIC, "mytopic");
991995
connectorConfigProps.put(ConnectorConfig.ERRORS_TOLERANCE_CONFIG, "all");
992-
connectorConfigProps.put(MQSourceConnector.ERRORS_DEAD_LETTER_QUEUE_TOPIC_NAME_CONFIG, "__dlq.mq.source");
996+
connectorConfigProps.put(MQSourceConnector.DLQ_TOPIC_NAME_CONFIG, "__dlq.mq.source");
993997
connectorConfigProps.put(MQSourceConnector.CONFIG_NAME_MQ_MESSAGE_BODY_JMS, "true");
994998
connectorConfigProps.put(MQSourceConnector.CONFIG_NAME_MQ_RECORD_BUILDER,
995999
"com.ibm.eventstreams.connect.mqsource.builders.JsonRecordBuilder");
@@ -1023,7 +1027,7 @@ public void verifyHeadersWithErrorTolerance() throws Exception {
10231027
final Map<String, String> connectorConfigProps = createDefaultConnectorProperties();
10241028
connectorConfigProps.put(MQSourceConnector.CONFIG_NAME_TOPIC, "mytopic");
10251029
connectorConfigProps.put(ConnectorConfig.ERRORS_TOLERANCE_CONFIG, "all");
1026-
connectorConfigProps.put(MQSourceConnector.ERRORS_DEAD_LETTER_QUEUE_TOPIC_NAME_CONFIG, "__dlq.mq.source");
1030+
connectorConfigProps.put(MQSourceConnector.DLQ_TOPIC_NAME_CONFIG, "__dlq.mq.source");
10271031
connectorConfigProps.put(MQSourceConnector.CONFIG_NAME_MQ_MESSAGE_BODY_JMS, "true");
10281032
connectorConfigProps.put(MQSourceConnector.CONFIG_NAME_MQ_RECORD_BUILDER,
10291033
"com.ibm.eventstreams.connect.mqsource.builders.JsonRecordBuilder");
@@ -1112,7 +1116,7 @@ public void verifyLoggingWarningWithErrorTolerance() throws Exception {
11121116
connectorConfigProps.put(ConnectorConfig.ERRORS_LOG_ENABLE_CONFIG, "false"); // default; Do not log errors
11131117
// default; Do not log errors with message
11141118
connectorConfigProps.put(ConnectorConfig.ERRORS_LOG_INCLUDE_MESSAGES_CONFIG, "false");
1115-
connectorConfigProps.put(MQSourceConnector.ERRORS_DEAD_LETTER_QUEUE_TOPIC_NAME_CONFIG, "__dlq.mq.source");
1119+
connectorConfigProps.put(MQSourceConnector.DLQ_TOPIC_NAME_CONFIG, "__dlq.mq.source");
11161120
connectorConfigProps.put(MQSourceConnector.CONFIG_NAME_MQ_MESSAGE_BODY_JMS, "true");
11171121
connectorConfigProps.put(MQSourceConnector.CONFIG_NAME_MQ_RECORD_BUILDER,
11181122
"com.ibm.eventstreams.connect.mqsource.builders.JsonRecordBuilder");
@@ -1147,7 +1151,7 @@ public void verifyLoggingErrorsWithErrorTolerance() throws Exception {
11471151
connectorConfigProps.put(ConnectorConfig.ERRORS_TOLERANCE_CONFIG, "all");
11481152
connectorConfigProps.put(ConnectorConfig.ERRORS_LOG_ENABLE_CONFIG, "true"); // Log errors enabled
11491153
connectorConfigProps.put(ConnectorConfig.ERRORS_LOG_INCLUDE_MESSAGES_CONFIG, "false");
1150-
connectorConfigProps.put(MQSourceConnector.ERRORS_DEAD_LETTER_QUEUE_TOPIC_NAME_CONFIG, "__dlq.mq.source");
1154+
connectorConfigProps.put(MQSourceConnector.DLQ_TOPIC_NAME_CONFIG, "__dlq.mq.source");
11511155
connectorConfigProps.put(MQSourceConnector.CONFIG_NAME_MQ_MESSAGE_BODY_JMS, "true");
11521156
connectorConfigProps.put(MQSourceConnector.CONFIG_NAME_MQ_RECORD_BUILDER,
11531157
"com.ibm.eventstreams.connect.mqsource.builders.JsonRecordBuilder");
@@ -1182,7 +1186,7 @@ public void verifyLoggingErrorsWithMessage() throws Exception {
11821186
connectorConfigProps.put(ConnectorConfig.ERRORS_TOLERANCE_CONFIG, "all");
11831187
connectorConfigProps.put(ConnectorConfig.ERRORS_LOG_ENABLE_CONFIG, "true"); // Log errors
11841188
connectorConfigProps.put(ConnectorConfig.ERRORS_LOG_INCLUDE_MESSAGES_CONFIG, "true"); // Log errors with message
1185-
connectorConfigProps.put(MQSourceConnector.ERRORS_DEAD_LETTER_QUEUE_TOPIC_NAME_CONFIG, "__dlq.mq.source");
1189+
connectorConfigProps.put(MQSourceConnector.DLQ_TOPIC_NAME_CONFIG, "__dlq.mq.source");
11861190
connectorConfigProps.put(MQSourceConnector.CONFIG_NAME_MQ_MESSAGE_BODY_JMS, "true");
11871191
connectorConfigProps.put(MQSourceConnector.CONFIG_NAME_MQ_RECORD_BUILDER,
11881192
"com.ibm.eventstreams.connect.mqsource.builders.JsonRecordBuilder");
@@ -1217,7 +1221,7 @@ public void verifyLoggingErrorsWithMessageHavingDefaultRecordBuilder() throws Ex
12171221
connectorConfigProps.put(ConnectorConfig.ERRORS_TOLERANCE_CONFIG, "all");
12181222
connectorConfigProps.put(ConnectorConfig.ERRORS_LOG_ENABLE_CONFIG, "true"); // Log errors
12191223
connectorConfigProps.put(ConnectorConfig.ERRORS_LOG_INCLUDE_MESSAGES_CONFIG, "true"); // Log errors with message
1220-
connectorConfigProps.put(MQSourceConnector.ERRORS_DEAD_LETTER_QUEUE_TOPIC_NAME_CONFIG, "__dlq.mq.source");
1224+
connectorConfigProps.put(MQSourceConnector.DLQ_TOPIC_NAME_CONFIG, "__dlq.mq.source");
12211225
connectorConfigProps.put(MQSourceConnector.CONFIG_NAME_MQ_MESSAGE_BODY_JMS, "true");
12221226
connectorConfigProps.put(MQSourceConnector.CONFIG_NAME_MQ_RECORD_BUILDER,
12231227
"com.ibm.eventstreams.connect.mqsource.builders.DefaultRecordBuilder");

src/main/java/com/ibm/eventstreams/connect/mqsource/MQSourceConnector.java

Lines changed: 44 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -181,13 +181,26 @@ public class MQSourceConnector extends SourceConnector {
181181
public static final long CONFIG_RECONNECT_DELAY_MAX_DEFAULT = 8192L;
182182
public static final long CONFIG_RECONNECT_DELAY_MAX_MINIMUM = 10L;
183183

184-
public static final String ERRORS_DEAD_LETTER_QUEUE_TOPIC_NAME_CONFIG = "errors.deadletterqueue.topic.name";
185-
public static final String ERRORS_DEAD_LETTER_QUEUE_TOPIC_NAME_DISPLAY = "Output errors to the dead letter queue";
186-
public static final String ERRORS_DEAD_LETTER_QUEUE_TOPIC_NAME_DEFAULT = "";
187-
public static final String ERRORS_DEAD_LETTER_QUEUE_TOPIC_NAME_DOC = "Whether to output conversion errors to the dead letter queue. "
188-
+ "Stops poison messages when using schemas, any message will be outputted as extended json on the specified topic. "
189-
+ "By default messages are not outputted to the dead letter queue. "
190-
+ "Also requires `errors.tolerance=all`.";
184+
public static final String DLQ_PREFIX = "errors.deadletterqueue.";
185+
186+
public static final String DLQ_TOPIC_NAME_CONFIG = DLQ_PREFIX + "topic.name";
187+
public static final String DLQ_TOPIC_NAME_DOC = "The name of the topic to be used as the dead letter queue (DLQ) for messages that " +
188+
"result in an error when processed by this source connector, or its transformations or converters. The topic name is blank by default, " +
189+
"which means that no messages are to be recorded in the DLQ.";
190+
public static final String DLQ_TOPIC_DEFAULT = "";
191+
private static final String DLQ_TOPIC_DISPLAY = "Dead Letter Queue Topic Name";
192+
193+
public static final String DLQ_TOPIC_REPLICATION_FACTOR_CONFIG = DLQ_PREFIX + "topic.replication.factor";
194+
private static final String DLQ_TOPIC_REPLICATION_FACTOR_CONFIG_DOC = "Replication factor used to create the dead letter queue topic when it doesn't already exist.";
195+
public static final short DLQ_TOPIC_REPLICATION_FACTOR_CONFIG_DEFAULT = 3;
196+
private static final String DLQ_TOPIC_REPLICATION_FACTOR_CONFIG_DISPLAY = "Dead Letter Queue Topic Replication Factor";
197+
198+
public static final String DLQ_CONTEXT_HEADERS_ENABLE_CONFIG = DLQ_PREFIX + "context.headers.enable";
199+
public static final boolean DLQ_CONTEXT_HEADERS_ENABLE_DEFAULT = false;
200+
public static final String DLQ_CONTEXT_HEADERS_ENABLE_DOC = "If true, add headers containing error context to the messages " +
201+
"written to the dead letter queue. To avoid clashing with headers from the original record, all error context header " +
202+
"keys, all error context header keys will start with <code>__connect.errors.</code>";
203+
private static final String DLQ_CONTEXT_HEADERS_ENABLE_DISPLAY = "Enable Error Context Headers";
191204

192205
// Define valid reconnect options
193206
public static final String[] CONFIG_VALUE_MQ_VALID_RECONNECT_OPTIONS = {
@@ -607,6 +620,30 @@ null, new ReadableFile(),
607620
CONFIG_GROUP_MQ, 28,
608621
Width.MEDIUM,
609622
CONFIG_DISPLAY_RECONNECT_DELAY_MAX);
623+
CONFIGDEF.define(DLQ_TOPIC_NAME_CONFIG,
624+
Type.STRING,
625+
DLQ_TOPIC_DEFAULT,
626+
Importance.MEDIUM,
627+
DLQ_TOPIC_NAME_DOC,
628+
CONFIG_GROUP_MQ, 29,
629+
Width.MEDIUM,
630+
DLQ_TOPIC_DISPLAY);
631+
CONFIGDEF.define(DLQ_TOPIC_REPLICATION_FACTOR_CONFIG,
632+
Type.SHORT,
633+
DLQ_TOPIC_REPLICATION_FACTOR_CONFIG_DEFAULT,
634+
Importance.MEDIUM,
635+
DLQ_TOPIC_REPLICATION_FACTOR_CONFIG_DOC,
636+
CONFIG_GROUP_MQ, 30,
637+
Width.MEDIUM,
638+
DLQ_TOPIC_REPLICATION_FACTOR_CONFIG_DISPLAY);
639+
CONFIGDEF.define(DLQ_CONTEXT_HEADERS_ENABLE_CONFIG,
640+
Type.BOOLEAN,
641+
DLQ_CONTEXT_HEADERS_ENABLE_DEFAULT,
642+
Importance.MEDIUM,
643+
DLQ_CONTEXT_HEADERS_ENABLE_DOC,
644+
CONFIG_GROUP_MQ, 31,
645+
Width.MEDIUM,
646+
DLQ_CONTEXT_HEADERS_ENABLE_DISPLAY);
610647

611648
CONFIGDEF.define(CONFIG_NAME_TOPIC,
612649
Type.STRING,

src/main/java/com/ibm/eventstreams/connect/mqsource/builders/BaseRecordBuilder.java

Lines changed: 19 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -61,10 +61,14 @@ public enum KeyHeader {
6161
private boolean logErrors;
6262
private boolean logIncludeMessages;
6363
private String dlqTopic = "";
64+
private String queueName = "";
6465

6566
public static final String ERROR_HEADER_EXCEPTION_TIMESTAMP = DeadLetterQueueReporter.HEADER_PREFIX + "timestamp";
6667
public static final String ERROR_HEADER_EXCEPTION_CAUSE_CLASS = DeadLetterQueueReporter.HEADER_PREFIX + "cause.class";
6768
public static final String ERROR_HEADER_EXCEPTION_CAUSE_MESSAGE = DeadLetterQueueReporter.HEADER_PREFIX + "cause.message";
69+
public static final String ERROR_HEADER_JMS_MESSAGE_ID = DeadLetterQueueReporter.HEADER_PREFIX + "jms.message.id";
70+
public static final String ERROR_HEADER_JMS_TIMESTAMP = DeadLetterQueueReporter.HEADER_PREFIX + "jms.timestamp";
71+
public static final String ERROR_HEADER_QUEUE = DeadLetterQueueReporter.HEADER_PREFIX + "mq.queue";
6872

6973
/**
7074
* Configure this class.
@@ -131,10 +135,15 @@ private void initializeErrorTolerance(final Map<String, String> props) {
131135
String.valueOf(ConnectorConfig.ERRORS_LOG_INCLUDE_MESSAGES_DEFAULT));
132136
logIncludeMessages = Boolean.parseBoolean(logIncludeMessagesValue);
133137

134-
dlqTopic = props.get(MQSourceConnector.ERRORS_DEAD_LETTER_QUEUE_TOPIC_NAME_CONFIG);
138+
dlqTopic = props.get(MQSourceConnector.DLQ_TOPIC_NAME_CONFIG);
135139
if (dlqTopic != null && !dlqTopic.isEmpty()) {
136140
dlqTopic = dlqTopic.trim();
137141
}
142+
143+
queueName = props.get(MQSourceConnector.CONFIG_NAME_MQ_QUEUE);
144+
if (queueName != null && !queueName.isEmpty()) {
145+
queueName = queueName.trim();
146+
}
138147
} else {
139148
logErrors = false;
140149
logIncludeMessages = false;
@@ -439,6 +448,15 @@ private Headers createErrorHeaders(final Message message, final String originalT
439448
headers.addString(DeadLetterQueueReporter.ERROR_HEADER_ORIG_TOPIC, originalTopic);
440449
headers.addString(DeadLetterQueueReporter.ERROR_HEADER_EXECUTING_CLASS, exception.getClass().getName());
441450
headers.addString(DeadLetterQueueReporter.ERROR_HEADER_EXCEPTION_MESSAGE, exception.getMessage());
451+
452+
try {
453+
headers.addString(ERROR_HEADER_JMS_MESSAGE_ID, message.getJMSMessageID());
454+
headers.addLong(ERROR_HEADER_JMS_TIMESTAMP, message.getJMSTimestamp());
455+
} catch (final JMSException jmsException) {
456+
jmsException.printStackTrace();
457+
}
458+
459+
headers.addString(ERROR_HEADER_QUEUE, queueName);
442460
headers.addLong(ERROR_HEADER_EXCEPTION_TIMESTAMP, System.currentTimeMillis());
443461

444462
// Add cause if available

0 commit comments

Comments
 (0)