Skip to content

Commit 0c0ee51

Browse files
committed
feat: Handle jms values in error headers
Signed-off-by: Joel Hanson <joelhanson025@gmail.com>
1 parent 4d01f2b commit 0c0ee51

File tree

3 files changed

+81
-22
lines changed

3 files changed

+81
-22
lines changed

src/integration/java/com/ibm/eventstreams/connect/mqsource/MQSourceTaskIT.java

Lines changed: 18 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -763,7 +763,7 @@ public void shouldRoutePoisonMessagesToDeadLetterQueueWhenErrorToleranceIsAll()
763763
final Map<String, String> connectorConfigProps = createDefaultConnectorProperties();
764764
connectorConfigProps.put(MQSourceConnector.CONFIG_NAME_TOPIC, "mytopic");
765765
connectorConfigProps.put(ConnectorConfig.ERRORS_TOLERANCE_CONFIG, "all");
766-
connectorConfigProps.put(MQSourceConnector.ERRORS_DEAD_LETTER_QUEUE_TOPIC_NAME_CONFIG, "__dlq.mq.source");
766+
connectorConfigProps.put(MQSourceConnector.DLQ_TOPIC_NAME_CONFIG, "__dlq.mq.source");
767767
connectorConfigProps.put(MQSourceConnector.CONFIG_NAME_MQ_MESSAGE_BODY_JMS, "true");
768768
connectorConfigProps.put(MQSourceConnector.CONFIG_NAME_MQ_RECORD_BUILDER,
769769
"com.ibm.eventstreams.connect.mqsource.builders.JsonRecordBuilder");
@@ -808,7 +808,7 @@ public void shouldFailWhenErrorToleranceIsNone() throws Exception {
808808
final Map<String, String> connectorConfigProps = createDefaultConnectorProperties();
809809
connectorConfigProps.put(MQSourceConnector.CONFIG_NAME_TOPIC, "mytopic");
810810
connectorConfigProps.put(ConnectorConfig.ERRORS_TOLERANCE_CONFIG, "none");
811-
connectorConfigProps.put(MQSourceConnector.ERRORS_DEAD_LETTER_QUEUE_TOPIC_NAME_CONFIG, "__dlq.mq.source");
811+
connectorConfigProps.put(MQSourceConnector.DLQ_TOPIC_NAME_CONFIG, "__dlq.mq.source");
812812
connectorConfigProps.put(MQSourceConnector.CONFIG_NAME_MQ_MESSAGE_BODY_JMS, "true");
813813
connectorConfigProps.put(MQSourceConnector.CONFIG_NAME_MQ_RECORD_BUILDER,
814814
"com.ibm.eventstreams.connect.mqsource.builders.JsonRecordBuilder");
@@ -835,16 +835,18 @@ public void shouldPreserveDlqHeadersWithErrorInformation() throws Exception {
835835
final Map<String, String> connectorConfigProps = createDefaultConnectorProperties();
836836
connectorConfigProps.put(MQSourceConnector.CONFIG_NAME_TOPIC, "mytopic");
837837
connectorConfigProps.put(ConnectorConfig.ERRORS_TOLERANCE_CONFIG, "all");
838-
connectorConfigProps.put(MQSourceConnector.ERRORS_DEAD_LETTER_QUEUE_TOPIC_NAME_CONFIG, "__dlq.mq.source");
838+
connectorConfigProps.put(MQSourceConnector.DLQ_TOPIC_NAME_CONFIG, "__dlq.mq.source");
839839
connectorConfigProps.put(MQSourceConnector.CONFIG_NAME_MQ_MESSAGE_BODY_JMS, "true");
840840
connectorConfigProps.put(MQSourceConnector.CONFIG_NAME_MQ_RECORD_BUILDER,
841841
"com.ibm.eventstreams.connect.mqsource.builders.JsonRecordBuilder");
842842

843843
connectTask.start(connectorConfigProps);
844844

845845
// An invalid message is received
846+
final TextMessage message = getJmsContext().createTextMessage("Invalid JSON message");
847+
message.setJMSMessageID("message_id");
846848
putAllMessagesToQueue(DEFAULT_SOURCE_QUEUE,
847-
Collections.singletonList(getJmsContext().createTextMessage("Invalid JSON message")));
849+
Collections.singletonList(message));
848850

849851
// The message should be routed to DLQ with error headers
850852
final List<SourceRecord> processedRecords = connectTask.poll();
@@ -870,7 +872,9 @@ public void shouldPreserveDlqHeadersWithErrorInformation() throws Exception {
870872
.isEqualTo("org.apache.kafka.common.errors.SerializationException");
871873
assertThat(headers.lastWithName("__connect.errors.exception.stacktrace").value()
872874
.toString().contains("com.ibm.eventstreams.connect.mqsource.JMSWorker.toSourceRecord")).isTrue();
873-
875+
assertEquals(headers.lastWithName("__connect.errors.jms.message.id").value(), message.getJMSMessageID());
876+
assertEquals(headers.lastWithName("__connect.errors.jms.timestamp").value(), message.getJMSTimestamp());
877+
assertEquals(headers.lastWithName("__connect.errors.mq.queue").value(), DEFAULT_SOURCE_QUEUE);
874878
connectTask.commitRecord(dlqRecord);
875879
}
876880

@@ -881,7 +885,7 @@ public void shouldHandleDifferentMessageTypesToDlq() throws Exception {
881885
final Map<String, String> connectorConfigProps = createDefaultConnectorProperties();
882886
connectorConfigProps.put(MQSourceConnector.CONFIG_NAME_TOPIC, "mytopic");
883887
connectorConfigProps.put(ConnectorConfig.ERRORS_TOLERANCE_CONFIG, "all");
884-
connectorConfigProps.put(MQSourceConnector.ERRORS_DEAD_LETTER_QUEUE_TOPIC_NAME_CONFIG, "__dlq.mq.source");
888+
connectorConfigProps.put(MQSourceConnector.DLQ_TOPIC_NAME_CONFIG, "__dlq.mq.source");
885889
connectorConfigProps.put(MQSourceConnector.CONFIG_NAME_MQ_MESSAGE_BODY_JMS, "true");
886890
connectorConfigProps.put(MQSourceConnector.CONFIG_NAME_MQ_RECORD_BUILDER,
887891
"com.ibm.eventstreams.connect.mqsource.builders.JsonRecordBuilder");
@@ -916,7 +920,7 @@ public void shouldPreserveJmsPropertiesInDlqMessages() throws Exception {
916920
final Map<String, String> connectorConfigProps = createDefaultConnectorProperties();
917921
connectorConfigProps.put(MQSourceConnector.CONFIG_NAME_TOPIC, "mytopic");
918922
connectorConfigProps.put(ConnectorConfig.ERRORS_TOLERANCE_CONFIG, "all");
919-
connectorConfigProps.put(MQSourceConnector.ERRORS_DEAD_LETTER_QUEUE_TOPIC_NAME_CONFIG, "__dlq.mq.source");
923+
connectorConfigProps.put(MQSourceConnector.DLQ_TOPIC_NAME_CONFIG, "__dlq.mq.source");
920924
connectorConfigProps.put(MQSourceConnector.CONFIG_NAME_MQ_MESSAGE_BODY_JMS, "true");
921925
connectorConfigProps.put(MQSourceConnector.CONFIG_NAME_MQ_JMS_PROPERTY_COPY_TO_KAFKA_HEADER, "true");
922926
connectorConfigProps.put(MQSourceConnector.CONFIG_NAME_MQ_RECORD_BUILDER,
@@ -947,7 +951,7 @@ public void shouldHandleMixOfValidAndInvalidMessagesWithDifferentFormats() throw
947951
final Map<String, String> connectorConfigProps = createDefaultConnectorProperties();
948952
connectorConfigProps.put(MQSourceConnector.CONFIG_NAME_TOPIC, "mytopic");
949953
connectorConfigProps.put(ConnectorConfig.ERRORS_TOLERANCE_CONFIG, "all");
950-
connectorConfigProps.put(MQSourceConnector.ERRORS_DEAD_LETTER_QUEUE_TOPIC_NAME_CONFIG, "__dlq.mq.source");
954+
connectorConfigProps.put(MQSourceConnector.DLQ_TOPIC_NAME_CONFIG, "__dlq.mq.source");
951955
connectorConfigProps.put(MQSourceConnector.CONFIG_NAME_MQ_MESSAGE_BODY_JMS, "true");
952956
connectorConfigProps.put(MQSourceConnector.CONFIG_NAME_MQ_RECORD_BUILDER,
953957
"com.ibm.eventstreams.connect.mqsource.builders.JsonRecordBuilder");
@@ -992,7 +996,7 @@ public void shouldContinueProcessingAfterUnhandleableDlqError() throws Exception
992996
final Map<String, String> connectorConfigProps = createDefaultConnectorProperties();
993997
connectorConfigProps.put(MQSourceConnector.CONFIG_NAME_TOPIC, "mytopic");
994998
connectorConfigProps.put(ConnectorConfig.ERRORS_TOLERANCE_CONFIG, "all");
995-
connectorConfigProps.put(MQSourceConnector.ERRORS_DEAD_LETTER_QUEUE_TOPIC_NAME_CONFIG, "__dlq.mq.source");
999+
connectorConfigProps.put(MQSourceConnector.DLQ_TOPIC_NAME_CONFIG, "__dlq.mq.source");
9961000
connectorConfigProps.put(MQSourceConnector.CONFIG_NAME_MQ_MESSAGE_BODY_JMS, "true");
9971001
connectorConfigProps.put(MQSourceConnector.CONFIG_NAME_MQ_RECORD_BUILDER,
9981002
"com.ibm.eventstreams.connect.mqsource.builders.JsonRecordBuilder");
@@ -1026,7 +1030,7 @@ public void verifyHeadersWithErrorTolerance() throws Exception {
10261030
final Map<String, String> connectorConfigProps = createDefaultConnectorProperties();
10271031
connectorConfigProps.put(MQSourceConnector.CONFIG_NAME_TOPIC, "mytopic");
10281032
connectorConfigProps.put(ConnectorConfig.ERRORS_TOLERANCE_CONFIG, "all");
1029-
connectorConfigProps.put(MQSourceConnector.ERRORS_DEAD_LETTER_QUEUE_TOPIC_NAME_CONFIG, "__dlq.mq.source");
1033+
connectorConfigProps.put(MQSourceConnector.DLQ_TOPIC_NAME_CONFIG, "__dlq.mq.source");
10301034
connectorConfigProps.put(MQSourceConnector.CONFIG_NAME_MQ_MESSAGE_BODY_JMS, "true");
10311035
connectorConfigProps.put(MQSourceConnector.CONFIG_NAME_MQ_RECORD_BUILDER,
10321036
"com.ibm.eventstreams.connect.mqsource.builders.JsonRecordBuilder");
@@ -1115,7 +1119,7 @@ public void verifyLoggingWarningWithErrorTolerance() throws Exception {
11151119
connectorConfigProps.put(ConnectorConfig.ERRORS_LOG_ENABLE_CONFIG, "false"); // default; Do not log errors
11161120
// default; Do not log errors with message
11171121
connectorConfigProps.put(ConnectorConfig.ERRORS_LOG_INCLUDE_MESSAGES_CONFIG, "false");
1118-
connectorConfigProps.put(MQSourceConnector.ERRORS_DEAD_LETTER_QUEUE_TOPIC_NAME_CONFIG, "__dlq.mq.source");
1122+
connectorConfigProps.put(MQSourceConnector.DLQ_TOPIC_NAME_CONFIG, "__dlq.mq.source");
11191123
connectorConfigProps.put(MQSourceConnector.CONFIG_NAME_MQ_MESSAGE_BODY_JMS, "true");
11201124
connectorConfigProps.put(MQSourceConnector.CONFIG_NAME_MQ_RECORD_BUILDER,
11211125
"com.ibm.eventstreams.connect.mqsource.builders.JsonRecordBuilder");
@@ -1150,7 +1154,7 @@ public void verifyLoggingErrorsWithErrorTolerance() throws Exception {
11501154
connectorConfigProps.put(ConnectorConfig.ERRORS_TOLERANCE_CONFIG, "all");
11511155
connectorConfigProps.put(ConnectorConfig.ERRORS_LOG_ENABLE_CONFIG, "true"); // Log errors enabled
11521156
connectorConfigProps.put(ConnectorConfig.ERRORS_LOG_INCLUDE_MESSAGES_CONFIG, "false");
1153-
connectorConfigProps.put(MQSourceConnector.ERRORS_DEAD_LETTER_QUEUE_TOPIC_NAME_CONFIG, "__dlq.mq.source");
1157+
connectorConfigProps.put(MQSourceConnector.DLQ_TOPIC_NAME_CONFIG, "__dlq.mq.source");
11541158
connectorConfigProps.put(MQSourceConnector.CONFIG_NAME_MQ_MESSAGE_BODY_JMS, "true");
11551159
connectorConfigProps.put(MQSourceConnector.CONFIG_NAME_MQ_RECORD_BUILDER,
11561160
"com.ibm.eventstreams.connect.mqsource.builders.JsonRecordBuilder");
@@ -1185,7 +1189,7 @@ public void verifyLoggingErrorsWithMessage() throws Exception {
11851189
connectorConfigProps.put(ConnectorConfig.ERRORS_TOLERANCE_CONFIG, "all");
11861190
connectorConfigProps.put(ConnectorConfig.ERRORS_LOG_ENABLE_CONFIG, "true"); // Log errors
11871191
connectorConfigProps.put(ConnectorConfig.ERRORS_LOG_INCLUDE_MESSAGES_CONFIG, "true"); // Log errors with message
1188-
connectorConfigProps.put(MQSourceConnector.ERRORS_DEAD_LETTER_QUEUE_TOPIC_NAME_CONFIG, "__dlq.mq.source");
1192+
connectorConfigProps.put(MQSourceConnector.DLQ_TOPIC_NAME_CONFIG, "__dlq.mq.source");
11891193
connectorConfigProps.put(MQSourceConnector.CONFIG_NAME_MQ_MESSAGE_BODY_JMS, "true");
11901194
connectorConfigProps.put(MQSourceConnector.CONFIG_NAME_MQ_RECORD_BUILDER,
11911195
"com.ibm.eventstreams.connect.mqsource.builders.JsonRecordBuilder");
@@ -1220,7 +1224,7 @@ public void verifyLoggingErrorsWithMessageHavingDefaultRecordBuilder() throws Ex
12201224
connectorConfigProps.put(ConnectorConfig.ERRORS_TOLERANCE_CONFIG, "all");
12211225
connectorConfigProps.put(ConnectorConfig.ERRORS_LOG_ENABLE_CONFIG, "true"); // Log errors
12221226
connectorConfigProps.put(ConnectorConfig.ERRORS_LOG_INCLUDE_MESSAGES_CONFIG, "true"); // Log errors with message
1223-
connectorConfigProps.put(MQSourceConnector.ERRORS_DEAD_LETTER_QUEUE_TOPIC_NAME_CONFIG, "__dlq.mq.source");
1227+
connectorConfigProps.put(MQSourceConnector.DLQ_TOPIC_NAME_CONFIG, "__dlq.mq.source");
12241228
connectorConfigProps.put(MQSourceConnector.CONFIG_NAME_MQ_MESSAGE_BODY_JMS, "true");
12251229
connectorConfigProps.put(MQSourceConnector.CONFIG_NAME_MQ_RECORD_BUILDER,
12261230
"com.ibm.eventstreams.connect.mqsource.builders.DefaultRecordBuilder");

src/main/java/com/ibm/eventstreams/connect/mqsource/MQSourceConnector.java

Lines changed: 44 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -187,13 +187,26 @@ public class MQSourceConnector extends SourceConnector {
187187
public static final long CONFIG_RECONNECT_DELAY_MAX_DEFAULT = 8192L;
188188
public static final long CONFIG_RECONNECT_DELAY_MAX_MINIMUM = 10L;
189189

190-
public static final String ERRORS_DEAD_LETTER_QUEUE_TOPIC_NAME_CONFIG = "errors.deadletterqueue.topic.name";
191-
public static final String ERRORS_DEAD_LETTER_QUEUE_TOPIC_NAME_DISPLAY = "Output errors to the dead letter queue";
192-
public static final String ERRORS_DEAD_LETTER_QUEUE_TOPIC_NAME_DEFAULT = "";
193-
public static final String ERRORS_DEAD_LETTER_QUEUE_TOPIC_NAME_DOC = "Whether to output conversion errors to the dead letter queue. "
194-
+ "Stops poison messages when using schemas, any message will be outputted as extended json on the specified topic. "
195-
+ "By default messages are not outputted to the dead letter queue. "
196-
+ "Also requires `errors.tolerance=all`.";
190+
public static final String DLQ_PREFIX = "errors.deadletterqueue.";
191+
192+
public static final String DLQ_TOPIC_NAME_CONFIG = DLQ_PREFIX + "topic.name";
193+
public static final String DLQ_TOPIC_NAME_DOC = "The name of the topic to be used as the dead letter queue (DLQ) for messages that " +
194+
"result in an error when processed by this source connector, or its transformations or converters. The topic name is blank by default, " +
195+
"which means that no messages are to be recorded in the DLQ.";
196+
public static final String DLQ_TOPIC_DEFAULT = "";
197+
private static final String DLQ_TOPIC_DISPLAY = "Dead Letter Queue Topic Name";
198+
199+
public static final String DLQ_TOPIC_REPLICATION_FACTOR_CONFIG = DLQ_PREFIX + "topic.replication.factor";
200+
private static final String DLQ_TOPIC_REPLICATION_FACTOR_CONFIG_DOC = "Replication factor used to create the dead letter queue topic when it doesn't already exist.";
201+
public static final short DLQ_TOPIC_REPLICATION_FACTOR_CONFIG_DEFAULT = 3;
202+
private static final String DLQ_TOPIC_REPLICATION_FACTOR_CONFIG_DISPLAY = "Dead Letter Queue Topic Replication Factor";
203+
204+
public static final String DLQ_CONTEXT_HEADERS_ENABLE_CONFIG = DLQ_PREFIX + "context.headers.enable";
205+
public static final boolean DLQ_CONTEXT_HEADERS_ENABLE_DEFAULT = false;
206+
public static final String DLQ_CONTEXT_HEADERS_ENABLE_DOC = "If true, add headers containing error context to the messages " +
207+
"written to the dead letter queue. To avoid clashing with headers from the original record, all error context header " +
208+
"keys, all error context header keys will start with <code>__connect.errors.</code>";
209+
private static final String DLQ_CONTEXT_HEADERS_ENABLE_DISPLAY = "Enable Error Context Headers";
197210

198211
// Define valid reconnect options
199212
public static final String[] CONFIG_VALUE_MQ_VALID_RECONNECT_OPTIONS = {
@@ -624,6 +637,30 @@ null, new ReadableFile(),
624637
CONFIG_GROUP_MQ, 29,
625638
Width.MEDIUM,
626639
CONFIG_DISPLAY_RECONNECT_DELAY_MAX);
640+
CONFIGDEF.define(DLQ_TOPIC_NAME_CONFIG,
641+
Type.STRING,
642+
DLQ_TOPIC_DEFAULT,
643+
Importance.MEDIUM,
644+
DLQ_TOPIC_NAME_DOC,
645+
CONFIG_GROUP_MQ, 29,
646+
Width.MEDIUM,
647+
DLQ_TOPIC_DISPLAY);
648+
CONFIGDEF.define(DLQ_TOPIC_REPLICATION_FACTOR_CONFIG,
649+
Type.SHORT,
650+
DLQ_TOPIC_REPLICATION_FACTOR_CONFIG_DEFAULT,
651+
Importance.MEDIUM,
652+
DLQ_TOPIC_REPLICATION_FACTOR_CONFIG_DOC,
653+
CONFIG_GROUP_MQ, 30,
654+
Width.MEDIUM,
655+
DLQ_TOPIC_REPLICATION_FACTOR_CONFIG_DISPLAY);
656+
CONFIGDEF.define(DLQ_CONTEXT_HEADERS_ENABLE_CONFIG,
657+
Type.BOOLEAN,
658+
DLQ_CONTEXT_HEADERS_ENABLE_DEFAULT,
659+
Importance.MEDIUM,
660+
DLQ_CONTEXT_HEADERS_ENABLE_DOC,
661+
CONFIG_GROUP_MQ, 31,
662+
Width.MEDIUM,
663+
DLQ_CONTEXT_HEADERS_ENABLE_DISPLAY);
627664

628665
CONFIGDEF.define(CONFIG_NAME_TOPIC,
629666
Type.STRING,

src/main/java/com/ibm/eventstreams/connect/mqsource/builders/BaseRecordBuilder.java

Lines changed: 19 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -61,10 +61,14 @@ public enum KeyHeader {
6161
private boolean logErrors;
6262
private boolean logIncludeMessages;
6363
private String dlqTopic = "";
64+
private String queueName = "";
6465

6566
public static final String ERROR_HEADER_EXCEPTION_TIMESTAMP = DeadLetterQueueReporter.HEADER_PREFIX + "timestamp";
6667
public static final String ERROR_HEADER_EXCEPTION_CAUSE_CLASS = DeadLetterQueueReporter.HEADER_PREFIX + "cause.class";
6768
public static final String ERROR_HEADER_EXCEPTION_CAUSE_MESSAGE = DeadLetterQueueReporter.HEADER_PREFIX + "cause.message";
69+
public static final String ERROR_HEADER_JMS_MESSAGE_ID = DeadLetterQueueReporter.HEADER_PREFIX + "jms.message.id";
70+
public static final String ERROR_HEADER_JMS_TIMESTAMP = DeadLetterQueueReporter.HEADER_PREFIX + "jms.timestamp";
71+
public static final String ERROR_HEADER_QUEUE = DeadLetterQueueReporter.HEADER_PREFIX + "mq.queue";
6872

6973
/**
7074
* Configure this class.
@@ -131,10 +135,15 @@ private void initializeErrorTolerance(final Map<String, String> props) {
131135
String.valueOf(ConnectorConfig.ERRORS_LOG_INCLUDE_MESSAGES_DEFAULT));
132136
logIncludeMessages = Boolean.parseBoolean(logIncludeMessagesValue);
133137

134-
dlqTopic = props.get(MQSourceConnector.ERRORS_DEAD_LETTER_QUEUE_TOPIC_NAME_CONFIG);
138+
dlqTopic = props.get(MQSourceConnector.DLQ_TOPIC_NAME_CONFIG);
135139
if (dlqTopic != null && !dlqTopic.isEmpty()) {
136140
dlqTopic = dlqTopic.trim();
137141
}
142+
143+
queueName = props.get(MQSourceConnector.CONFIG_NAME_MQ_QUEUE);
144+
if (queueName != null && !queueName.isEmpty()) {
145+
queueName = queueName.trim();
146+
}
138147
} else {
139148
logErrors = false;
140149
logIncludeMessages = false;
@@ -439,6 +448,15 @@ private Headers createErrorHeaders(final Message message, final String originalT
439448
headers.addString(DeadLetterQueueReporter.ERROR_HEADER_ORIG_TOPIC, originalTopic);
440449
headers.addString(DeadLetterQueueReporter.ERROR_HEADER_EXECUTING_CLASS, exception.getClass().getName());
441450
headers.addString(DeadLetterQueueReporter.ERROR_HEADER_EXCEPTION_MESSAGE, exception.getMessage());
451+
452+
try {
453+
headers.addString(ERROR_HEADER_JMS_MESSAGE_ID, message.getJMSMessageID());
454+
headers.addLong(ERROR_HEADER_JMS_TIMESTAMP, message.getJMSTimestamp());
455+
} catch (final JMSException jmsException) {
456+
jmsException.printStackTrace();
457+
}
458+
459+
headers.addString(ERROR_HEADER_QUEUE, queueName);
442460
headers.addLong(ERROR_HEADER_EXCEPTION_TIMESTAMP, System.currentTimeMillis());
443461

444462
// Add cause if available

0 commit comments

Comments
 (0)