Skip to content

Commit e9bbc91

Browse files
committed
feat: move out the error handling part to a seperate class
Signed-off-by: Joel Hanson <joelhanson025@gmail.com>
1 parent 6616083 commit e9bbc91

File tree

4 files changed

+433
-284
lines changed

4 files changed

+433
-284
lines changed

src/integration/java/com/ibm/eventstreams/connect/mqsource/MQSourceTaskIT.java

Lines changed: 53 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -833,6 +833,7 @@ public void shouldPreserveDlqHeadersWithErrorInformation() throws Exception {
833833
connectorConfigProps.put(MQSourceConnector.CONFIG_NAME_TOPIC, "mytopic");
834834
connectorConfigProps.put(ConnectorConfig.ERRORS_TOLERANCE_CONFIG, "all");
835835
connectorConfigProps.put(MQSourceConnector.DLQ_TOPIC_NAME_CONFIG, "__dlq.mq.source");
836+
connectorConfigProps.put(MQSourceConnector.DLQ_CONTEXT_HEADERS_ENABLE_CONFIG, "true");
836837
connectorConfigProps.put(MQSourceConnector.CONFIG_NAME_MQ_MESSAGE_BODY_JMS, "true");
837838
connectorConfigProps.put(MQSourceConnector.CONFIG_NAME_MQ_RECORD_BUILDER,
838839
"com.ibm.eventstreams.connect.mqsource.builders.JsonRecordBuilder");
@@ -918,6 +919,7 @@ public void shouldPreserveJmsPropertiesInDlqMessages() throws Exception {
918919
connectorConfigProps.put(MQSourceConnector.CONFIG_NAME_TOPIC, "mytopic");
919920
connectorConfigProps.put(ConnectorConfig.ERRORS_TOLERANCE_CONFIG, "all");
920921
connectorConfigProps.put(MQSourceConnector.DLQ_TOPIC_NAME_CONFIG, "__dlq.mq.source");
922+
connectorConfigProps.put(MQSourceConnector.DLQ_CONTEXT_HEADERS_ENABLE_CONFIG, "true");
921923
connectorConfigProps.put(MQSourceConnector.CONFIG_NAME_MQ_MESSAGE_BODY_JMS, "true");
922924
connectorConfigProps.put(MQSourceConnector.CONFIG_NAME_MQ_JMS_PROPERTY_COPY_TO_KAFKA_HEADER, "true");
923925
connectorConfigProps.put(MQSourceConnector.CONFIG_NAME_MQ_RECORD_BUILDER,
@@ -1028,6 +1030,7 @@ public void verifyHeadersWithErrorTolerance() throws Exception {
10281030
connectorConfigProps.put(MQSourceConnector.CONFIG_NAME_TOPIC, "mytopic");
10291031
connectorConfigProps.put(ConnectorConfig.ERRORS_TOLERANCE_CONFIG, "all");
10301032
connectorConfigProps.put(MQSourceConnector.DLQ_TOPIC_NAME_CONFIG, "__dlq.mq.source");
1033+
connectorConfigProps.put(MQSourceConnector.DLQ_CONTEXT_HEADERS_ENABLE_CONFIG, "true");
10311034
connectorConfigProps.put(MQSourceConnector.CONFIG_NAME_MQ_MESSAGE_BODY_JMS, "true");
10321035
connectorConfigProps.put(MQSourceConnector.CONFIG_NAME_MQ_RECORD_BUILDER,
10331036
"com.ibm.eventstreams.connect.mqsource.builders.JsonRecordBuilder");
@@ -1106,6 +1109,56 @@ public void verifyHeadersWithErrorTolerance() throws Exception {
11061109
.toString().contains("com.ibm.eventstreams.connect.mqsource.JMSWorker.toSourceRecord")).isTrue();
11071110
}
11081111

1112+
@Test
1113+
public void verifyHeadersWithErrorTolerance_WithDLQHeaderContextDisabled() throws Exception {
1114+
connectTask = getSourceTaskWithEmptyKafkaOffset();
1115+
1116+
final Map<String, String> connectorConfigProps = createDefaultConnectorProperties();
1117+
connectorConfigProps.put(MQSourceConnector.CONFIG_NAME_TOPIC, "mytopic");
1118+
connectorConfigProps.put(ConnectorConfig.ERRORS_TOLERANCE_CONFIG, "all");
1119+
connectorConfigProps.put(MQSourceConnector.DLQ_TOPIC_NAME_CONFIG, "__dlq.mq.source");
1120+
connectorConfigProps.put(MQSourceConnector.CONFIG_NAME_MQ_MESSAGE_BODY_JMS, "true");
1121+
connectorConfigProps.put(MQSourceConnector.CONFIG_NAME_MQ_RECORD_BUILDER,
1122+
"com.ibm.eventstreams.connect.mqsource.builders.JsonRecordBuilder");
1123+
connectorConfigProps.put(MQSourceConnector.CONFIG_NAME_MQ_JMS_PROPERTY_COPY_TO_KAFKA_HEADER, "true");
1124+
1125+
connectTask.start(connectorConfigProps);
1126+
1127+
final TextMessage message = getJmsContext().createTextMessage("Invalid JSON message");
1128+
message.setStringProperty("teststring", "myvalue");
1129+
message.setIntProperty("volume", 11);
1130+
message.setDoubleProperty("decimalmeaning", 42.0);
1131+
1132+
// Both invalid and valid messages are received
1133+
final List<Message> testMessages = Arrays.asList(
1134+
message, // Poison message
1135+
getJmsContext().createTextMessage("{ \"i\": 0 }") // Valid message
1136+
);
1137+
putAllMessagesToQueue(DEFAULT_SOURCE_QUEUE, testMessages);
1138+
1139+
final List<SourceRecord> processedRecords = connectTask.poll();
1140+
1141+
assertThat(processedRecords).hasSize(2);
1142+
1143+
final SourceRecord dlqRecord = processedRecords.get(0);
1144+
assertThat(dlqRecord.topic()).isEqualTo("__dlq.mq.source");
1145+
1146+
final Headers headers = dlqRecord.headers();
1147+
1148+
// Actual headers
1149+
assertThat(headers.lastWithName("teststring").value()).isEqualTo("myvalue");
1150+
assertThat(headers.lastWithName("volume").value()).isEqualTo("11");
1151+
assertThat(headers.lastWithName("decimalmeaning").value()).isEqualTo("42.0");
1152+
1153+
assertThat(headers.lastWithName("__connect.errors.topic")).isNull();
1154+
assertThat(headers.lastWithName("__connect.errors.class.name")).isNull();
1155+
assertThat(headers.lastWithName("__connect.errors.exception.message")).isNull();
1156+
assertThat(headers.lastWithName("__connect.errors.timestamp")).isNull();
1157+
assertThat(headers.lastWithName("__connect.errors.cause.message")).isNull();
1158+
assertThat(headers.lastWithName("__connect.errors.cause.class")).isNull();
1159+
assertThat(headers.lastWithName("__connect.errors.exception.stacktrace")).isNull();
1160+
}
1161+
11091162
@Test
11101163
public void verifyLoggingWarningWithErrorTolerance() throws Exception {
11111164
connectTask = getSourceTaskWithEmptyKafkaOffset();

src/main/java/com/ibm/eventstreams/connect/mqsource/MQSourceConnector.java

Lines changed: 1 addition & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -190,11 +190,6 @@ public class MQSourceConnector extends SourceConnector {
190190
public static final String DLQ_TOPIC_DEFAULT = "";
191191
private static final String DLQ_TOPIC_DISPLAY = "Dead Letter Queue Topic Name";
192192

193-
public static final String DLQ_TOPIC_REPLICATION_FACTOR_CONFIG = DLQ_PREFIX + "topic.replication.factor";
194-
private static final String DLQ_TOPIC_REPLICATION_FACTOR_CONFIG_DOC = "Replication factor used to create the dead letter queue topic when it doesn't already exist.";
195-
public static final short DLQ_TOPIC_REPLICATION_FACTOR_CONFIG_DEFAULT = 3;
196-
private static final String DLQ_TOPIC_REPLICATION_FACTOR_CONFIG_DISPLAY = "Dead Letter Queue Topic Replication Factor";
197-
198193
public static final String DLQ_CONTEXT_HEADERS_ENABLE_CONFIG = DLQ_PREFIX + "context.headers.enable";
199194
public static final boolean DLQ_CONTEXT_HEADERS_ENABLE_DEFAULT = false;
200195
public static final String DLQ_CONTEXT_HEADERS_ENABLE_DOC = "If true, add headers containing error context to the messages " +
@@ -628,20 +623,12 @@ null, new ReadableFile(),
628623
CONFIG_GROUP_MQ, 29,
629624
Width.MEDIUM,
630625
DLQ_TOPIC_DISPLAY);
631-
CONFIGDEF.define(DLQ_TOPIC_REPLICATION_FACTOR_CONFIG,
632-
Type.SHORT,
633-
DLQ_TOPIC_REPLICATION_FACTOR_CONFIG_DEFAULT,
634-
Importance.MEDIUM,
635-
DLQ_TOPIC_REPLICATION_FACTOR_CONFIG_DOC,
636-
CONFIG_GROUP_MQ, 30,
637-
Width.MEDIUM,
638-
DLQ_TOPIC_REPLICATION_FACTOR_CONFIG_DISPLAY);
639626
CONFIGDEF.define(DLQ_CONTEXT_HEADERS_ENABLE_CONFIG,
640627
Type.BOOLEAN,
641628
DLQ_CONTEXT_HEADERS_ENABLE_DEFAULT,
642629
Importance.MEDIUM,
643630
DLQ_CONTEXT_HEADERS_ENABLE_DOC,
644-
CONFIG_GROUP_MQ, 31,
631+
CONFIG_GROUP_MQ, 30,
645632
Width.MEDIUM,
646633
DLQ_CONTEXT_HEADERS_ENABLE_DISPLAY);
647634

0 commit comments

Comments
 (0)