Skip to content

Commit 7acac2e

Browse files
committed
feat: move out the error handling part to a seperate class
Signed-off-by: Joel Hanson <joelhanson025@gmail.com>
1 parent 0c0ee51 commit 7acac2e

File tree

4 files changed

+433
-284
lines changed

4 files changed

+433
-284
lines changed

src/integration/java/com/ibm/eventstreams/connect/mqsource/MQSourceTaskIT.java

Lines changed: 53 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -836,6 +836,7 @@ public void shouldPreserveDlqHeadersWithErrorInformation() throws Exception {
836836
connectorConfigProps.put(MQSourceConnector.CONFIG_NAME_TOPIC, "mytopic");
837837
connectorConfigProps.put(ConnectorConfig.ERRORS_TOLERANCE_CONFIG, "all");
838838
connectorConfigProps.put(MQSourceConnector.DLQ_TOPIC_NAME_CONFIG, "__dlq.mq.source");
839+
connectorConfigProps.put(MQSourceConnector.DLQ_CONTEXT_HEADERS_ENABLE_CONFIG, "true");
839840
connectorConfigProps.put(MQSourceConnector.CONFIG_NAME_MQ_MESSAGE_BODY_JMS, "true");
840841
connectorConfigProps.put(MQSourceConnector.CONFIG_NAME_MQ_RECORD_BUILDER,
841842
"com.ibm.eventstreams.connect.mqsource.builders.JsonRecordBuilder");
@@ -921,6 +922,7 @@ public void shouldPreserveJmsPropertiesInDlqMessages() throws Exception {
921922
connectorConfigProps.put(MQSourceConnector.CONFIG_NAME_TOPIC, "mytopic");
922923
connectorConfigProps.put(ConnectorConfig.ERRORS_TOLERANCE_CONFIG, "all");
923924
connectorConfigProps.put(MQSourceConnector.DLQ_TOPIC_NAME_CONFIG, "__dlq.mq.source");
925+
connectorConfigProps.put(MQSourceConnector.DLQ_CONTEXT_HEADERS_ENABLE_CONFIG, "true");
924926
connectorConfigProps.put(MQSourceConnector.CONFIG_NAME_MQ_MESSAGE_BODY_JMS, "true");
925927
connectorConfigProps.put(MQSourceConnector.CONFIG_NAME_MQ_JMS_PROPERTY_COPY_TO_KAFKA_HEADER, "true");
926928
connectorConfigProps.put(MQSourceConnector.CONFIG_NAME_MQ_RECORD_BUILDER,
@@ -1031,6 +1033,7 @@ public void verifyHeadersWithErrorTolerance() throws Exception {
10311033
connectorConfigProps.put(MQSourceConnector.CONFIG_NAME_TOPIC, "mytopic");
10321034
connectorConfigProps.put(ConnectorConfig.ERRORS_TOLERANCE_CONFIG, "all");
10331035
connectorConfigProps.put(MQSourceConnector.DLQ_TOPIC_NAME_CONFIG, "__dlq.mq.source");
1036+
connectorConfigProps.put(MQSourceConnector.DLQ_CONTEXT_HEADERS_ENABLE_CONFIG, "true");
10341037
connectorConfigProps.put(MQSourceConnector.CONFIG_NAME_MQ_MESSAGE_BODY_JMS, "true");
10351038
connectorConfigProps.put(MQSourceConnector.CONFIG_NAME_MQ_RECORD_BUILDER,
10361039
"com.ibm.eventstreams.connect.mqsource.builders.JsonRecordBuilder");
@@ -1109,6 +1112,56 @@ public void verifyHeadersWithErrorTolerance() throws Exception {
11091112
.toString().contains("com.ibm.eventstreams.connect.mqsource.JMSWorker.toSourceRecord")).isTrue();
11101113
}
11111114

1115+
@Test
1116+
public void verifyHeadersWithErrorTolerance_WithDLQHeaderContextDisabled() throws Exception {
1117+
connectTask = getSourceTaskWithEmptyKafkaOffset();
1118+
1119+
final Map<String, String> connectorConfigProps = createDefaultConnectorProperties();
1120+
connectorConfigProps.put(MQSourceConnector.CONFIG_NAME_TOPIC, "mytopic");
1121+
connectorConfigProps.put(ConnectorConfig.ERRORS_TOLERANCE_CONFIG, "all");
1122+
connectorConfigProps.put(MQSourceConnector.DLQ_TOPIC_NAME_CONFIG, "__dlq.mq.source");
1123+
connectorConfigProps.put(MQSourceConnector.CONFIG_NAME_MQ_MESSAGE_BODY_JMS, "true");
1124+
connectorConfigProps.put(MQSourceConnector.CONFIG_NAME_MQ_RECORD_BUILDER,
1125+
"com.ibm.eventstreams.connect.mqsource.builders.JsonRecordBuilder");
1126+
connectorConfigProps.put(MQSourceConnector.CONFIG_NAME_MQ_JMS_PROPERTY_COPY_TO_KAFKA_HEADER, "true");
1127+
1128+
connectTask.start(connectorConfigProps);
1129+
1130+
final TextMessage message = getJmsContext().createTextMessage("Invalid JSON message");
1131+
message.setStringProperty("teststring", "myvalue");
1132+
message.setIntProperty("volume", 11);
1133+
message.setDoubleProperty("decimalmeaning", 42.0);
1134+
1135+
// Both invalid and valid messages are received
1136+
final List<Message> testMessages = Arrays.asList(
1137+
message, // Poison message
1138+
getJmsContext().createTextMessage("{ \"i\": 0 }") // Valid message
1139+
);
1140+
putAllMessagesToQueue(DEFAULT_SOURCE_QUEUE, testMessages);
1141+
1142+
final List<SourceRecord> processedRecords = connectTask.poll();
1143+
1144+
assertThat(processedRecords).hasSize(2);
1145+
1146+
final SourceRecord dlqRecord = processedRecords.get(0);
1147+
assertThat(dlqRecord.topic()).isEqualTo("__dlq.mq.source");
1148+
1149+
final Headers headers = dlqRecord.headers();
1150+
1151+
// Actual headers
1152+
assertThat(headers.lastWithName("teststring").value()).isEqualTo("myvalue");
1153+
assertThat(headers.lastWithName("volume").value()).isEqualTo("11");
1154+
assertThat(headers.lastWithName("decimalmeaning").value()).isEqualTo("42.0");
1155+
1156+
assertThat(headers.lastWithName("__connect.errors.topic")).isNull();
1157+
assertThat(headers.lastWithName("__connect.errors.class.name")).isNull();
1158+
assertThat(headers.lastWithName("__connect.errors.exception.message")).isNull();
1159+
assertThat(headers.lastWithName("__connect.errors.timestamp")).isNull();
1160+
assertThat(headers.lastWithName("__connect.errors.cause.message")).isNull();
1161+
assertThat(headers.lastWithName("__connect.errors.cause.class")).isNull();
1162+
assertThat(headers.lastWithName("__connect.errors.exception.stacktrace")).isNull();
1163+
}
1164+
11121165
@Test
11131166
public void verifyLoggingWarningWithErrorTolerance() throws Exception {
11141167
connectTask = getSourceTaskWithEmptyKafkaOffset();

src/main/java/com/ibm/eventstreams/connect/mqsource/MQSourceConnector.java

Lines changed: 1 addition & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -196,11 +196,6 @@ public class MQSourceConnector extends SourceConnector {
196196
public static final String DLQ_TOPIC_DEFAULT = "";
197197
private static final String DLQ_TOPIC_DISPLAY = "Dead Letter Queue Topic Name";
198198

199-
public static final String DLQ_TOPIC_REPLICATION_FACTOR_CONFIG = DLQ_PREFIX + "topic.replication.factor";
200-
private static final String DLQ_TOPIC_REPLICATION_FACTOR_CONFIG_DOC = "Replication factor used to create the dead letter queue topic when it doesn't already exist.";
201-
public static final short DLQ_TOPIC_REPLICATION_FACTOR_CONFIG_DEFAULT = 3;
202-
private static final String DLQ_TOPIC_REPLICATION_FACTOR_CONFIG_DISPLAY = "Dead Letter Queue Topic Replication Factor";
203-
204199
public static final String DLQ_CONTEXT_HEADERS_ENABLE_CONFIG = DLQ_PREFIX + "context.headers.enable";
205200
public static final boolean DLQ_CONTEXT_HEADERS_ENABLE_DEFAULT = false;
206201
public static final String DLQ_CONTEXT_HEADERS_ENABLE_DOC = "If true, add headers containing error context to the messages " +
@@ -645,20 +640,12 @@ null, new ReadableFile(),
645640
CONFIG_GROUP_MQ, 29,
646641
Width.MEDIUM,
647642
DLQ_TOPIC_DISPLAY);
648-
CONFIGDEF.define(DLQ_TOPIC_REPLICATION_FACTOR_CONFIG,
649-
Type.SHORT,
650-
DLQ_TOPIC_REPLICATION_FACTOR_CONFIG_DEFAULT,
651-
Importance.MEDIUM,
652-
DLQ_TOPIC_REPLICATION_FACTOR_CONFIG_DOC,
653-
CONFIG_GROUP_MQ, 30,
654-
Width.MEDIUM,
655-
DLQ_TOPIC_REPLICATION_FACTOR_CONFIG_DISPLAY);
656643
CONFIGDEF.define(DLQ_CONTEXT_HEADERS_ENABLE_CONFIG,
657644
Type.BOOLEAN,
658645
DLQ_CONTEXT_HEADERS_ENABLE_DEFAULT,
659646
Importance.MEDIUM,
660647
DLQ_CONTEXT_HEADERS_ENABLE_DOC,
661-
CONFIG_GROUP_MQ, 31,
648+
CONFIG_GROUP_MQ, 30,
662649
Width.MEDIUM,
663650
DLQ_CONTEXT_HEADERS_ENABLE_DISPLAY);
664651

0 commit comments

Comments
 (0)