Skip to content

Commit 4d01f2b

Browse files
committed
feat: update headers for dlq record and minor log error changes
Signed-off-by: Joel Hanson <joelhanson025@gmail.com>
1 parent 113d2a8 commit 4d01f2b

File tree

4 files changed

+351
-55
lines changed

4 files changed

+351
-55
lines changed

src/integration/java/com/ibm/eventstreams/connect/mqsource/MQSourceTaskIT.java

Lines changed: 240 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -43,7 +43,6 @@
4343
import java.util.List;
4444
import java.util.Map;
4545
import java.util.stream.Collectors;
46-
import java.util.stream.Stream;
4746

4847
import javax.jms.BytesMessage;
4948
import javax.jms.JMSException;
@@ -856,11 +855,21 @@ public void shouldPreserveDlqHeadersWithErrorInformation() throws Exception {
856855

857856
// Verify error headers are present
858857
final Headers headers = dlqRecord.headers();
859-
assertThat(headers.lastWithName("original_topic").value()).isEqualTo("mytopic");
860-
assertThat(headers.lastWithName("error_message")).isNotNull();
861-
assertThat(headers.lastWithName("error_class")).isNotNull();
862-
assertThat(headers.lastWithName("error_timestamp")).isNotNull();
863-
assertThat(headers.lastWithName("error_stack_trace")).isNotNull();
858+
assertThat(headers.lastWithName("__connect.errors.topic").value())
859+
.isEqualTo("mytopic");
860+
assertThat(headers.lastWithName("__connect.errors.class.name").value())
861+
.isEqualTo("org.apache.kafka.connect.errors.DataException");
862+
assertThat(headers.lastWithName("__connect.errors.exception.message").value())
863+
.isEqualTo("Converting byte[] to Kafka Connect data failed due to serialization error: ");
864+
assertThat(headers.lastWithName("__connect.errors.timestamp").value().toString()
865+
.isEmpty()).isFalse();
866+
assertThat(headers.lastWithName("__connect.errors.cause.message").value().toString())
867+
.contains(
868+
"com.fasterxml.jackson.core.JsonParseException: Unrecognized token 'Invalid': was expecting (JSON String, Number, Array, Object or token 'null', 'true' or 'false')");
869+
assertThat(headers.lastWithName("__connect.errors.cause.class").value())
870+
.isEqualTo("org.apache.kafka.common.errors.SerializationException");
871+
assertThat(headers.lastWithName("__connect.errors.exception.stacktrace").value()
872+
.toString().contains("com.ibm.eventstreams.connect.mqsource.JMSWorker.toSourceRecord")).isTrue();
864873

865874
connectTask.commitRecord(dlqRecord);
866875
}
@@ -925,7 +934,7 @@ public void shouldPreserveJmsPropertiesInDlqMessages() throws Exception {
925934
assertThat(dlqRecord.topic()).isEqualTo("__dlq.mq.source");
926935

927936
final Headers headers = dlqRecord.headers();
928-
assertThat(headers.lastWithName("error_message").value())
937+
assertThat(headers.lastWithName("__connect.errors.exception.message").value())
929938
.isEqualTo("Converting byte[] to Kafka Connect data failed due to serialization error: ");
930939

931940
connectTask.commitRecord(dlqRecord);
@@ -1009,4 +1018,228 @@ public void shouldContinueProcessingAfterUnhandleableDlqError() throws Exception
10091018
assertThat(processedRecords.get(2).topic()).isEqualTo("__dlq.mq.source");
10101019
assertThat(processedRecords.get(3).topic()).isEqualTo("mytopic");
10111020
}
1021+
1022+
@Test
1023+
public void verifyHeadersWithErrorTolerance() throws Exception {
1024+
connectTask = getSourceTaskWithEmptyKafkaOffset();
1025+
1026+
final Map<String, String> connectorConfigProps = createDefaultConnectorProperties();
1027+
connectorConfigProps.put(MQSourceConnector.CONFIG_NAME_TOPIC, "mytopic");
1028+
connectorConfigProps.put(ConnectorConfig.ERRORS_TOLERANCE_CONFIG, "all");
1029+
connectorConfigProps.put(MQSourceConnector.ERRORS_DEAD_LETTER_QUEUE_TOPIC_NAME_CONFIG, "__dlq.mq.source");
1030+
connectorConfigProps.put(MQSourceConnector.CONFIG_NAME_MQ_MESSAGE_BODY_JMS, "true");
1031+
connectorConfigProps.put(MQSourceConnector.CONFIG_NAME_MQ_RECORD_BUILDER,
1032+
"com.ibm.eventstreams.connect.mqsource.builders.JsonRecordBuilder");
1033+
connectorConfigProps.put(MQSourceConnector.CONFIG_NAME_MQ_JMS_PROPERTY_COPY_TO_KAFKA_HEADER, "true");
1034+
1035+
connectTask.start(connectorConfigProps);
1036+
1037+
final TextMessage message = getJmsContext().createTextMessage("Invalid JSON message");
1038+
message.setStringProperty("teststring", "myvalue");
1039+
message.setIntProperty("volume", 11);
1040+
message.setDoubleProperty("decimalmeaning", 42.0);
1041+
1042+
// Both invalid and valid messages are received
1043+
final List<Message> testMessages = Arrays.asList(
1044+
message, // Poison message
1045+
getJmsContext().createTextMessage("{ \"i\": 0 }") // Valid message
1046+
);
1047+
putAllMessagesToQueue(DEFAULT_SOURCE_QUEUE, testMessages);
1048+
1049+
final List<SourceRecord> processedRecords = connectTask.poll();
1050+
1051+
assertThat(processedRecords).hasSize(2);
1052+
1053+
assertThat(processedRecords.get(0).topic()).isEqualTo("__dlq.mq.source");
1054+
1055+
final Headers headers = processedRecords.get(0).headers();
1056+
1057+
// Actual headers
1058+
assertThat(headers.lastWithName("teststring").value()).isEqualTo("myvalue");
1059+
assertThat(headers.lastWithName("volume").value()).isEqualTo("11");
1060+
assertThat(headers.lastWithName("decimalmeaning").value()).isEqualTo("42.0");
1061+
1062+
// Expected DLQ Headers
1063+
// ConnectHeaders(headers=[ConnectHeader(key=__connect.errors.topic,
1064+
// value=mytopic, schema=Schema{STRING}),
1065+
// ConnectHeader(key=__connect.errors.class.name,
1066+
// value=org.apache.kafka.connect.errors.DataException, schema=Schema{STRING}),
1067+
// ConnectHeader(key=__connect.errors.exception.message, value=Converting byte[]
1068+
// to Kafka Connect data failed due to serialization error: ,
1069+
// schema=Schema{STRING}), ConnectHeader(key=__connect.errors.timestamp,
1070+
// value=1749036171558, schema=Schema{STRING}),
1071+
// ConnectHeader(key=__connect.errors.cause.message,
1072+
// value=com.fasterxml.jackson.core.JsonParseException: Unrecognized token
1073+
// 'Invalid': was expecting (JSON String, Number, Array, Object or token 'null',
1074+
// 'true' or 'false')
1075+
// at [Source: REDACTED (`StreamReadFeature.INCLUDE_SOURCE_IN_LOCATION`
1076+
// disabled); line: 1, column: 9], schema=Schema{STRING}),
1077+
// ConnectHeader(key=__connect.errors.cause.class,
1078+
// value=org.apache.kafka.common.errors.SerializationException,
1079+
// schema=Schema{STRING}),
1080+
// ConnectHeader(key=__connect.errors.exception.stacktrace,
1081+
// value=org.apache.kafka.connect.errors.DataException: Converting byte[] to
1082+
// Kafka Connect data failed due to serialization error:
1083+
// at
1084+
// org.apache.kafka.connect.json.JsonConverter.toConnectData(JsonConverter.java:333)
1085+
// at
1086+
// com.ibm.eventstreams.connect.mqsource.builders.JsonRecordBuilder.getValue(JsonRecordBuilder.java:81)
1087+
// at
1088+
// com.ibm.eventstreams.connect.mqsource.builders.BaseRecordBuilder.toSourceRecord(BaseRecordBuilder.java:238)
1089+
// at com.ibm.eventstreams.connect.mqsource.JMSWorker.toSourceRecord(JMSWork...
1090+
// [truncated], schema=Schema{STRING})])
1091+
assertThat(headers.lastWithName("__connect.errors.topic").value())
1092+
.isEqualTo("mytopic");
1093+
assertThat(headers.lastWithName("__connect.errors.class.name").value())
1094+
.isEqualTo("org.apache.kafka.connect.errors.DataException");
1095+
assertThat(headers.lastWithName("__connect.errors.exception.message").value())
1096+
.isEqualTo("Converting byte[] to Kafka Connect data failed due to serialization error: ");
1097+
assertThat(headers.lastWithName("__connect.errors.timestamp").value().toString()
1098+
.isEmpty()).isFalse();
1099+
assertThat(headers.lastWithName("__connect.errors.cause.message").value().toString())
1100+
.contains(
1101+
"com.fasterxml.jackson.core.JsonParseException: Unrecognized token 'Invalid': was expecting (JSON String, Number, Array, Object or token 'null', 'true' or 'false')");
1102+
assertThat(headers.lastWithName("__connect.errors.cause.class").value())
1103+
.isEqualTo("org.apache.kafka.common.errors.SerializationException");
1104+
assertThat(headers.lastWithName("__connect.errors.exception.stacktrace").value()
1105+
.toString().contains("com.ibm.eventstreams.connect.mqsource.JMSWorker.toSourceRecord")).isTrue();
1106+
}
1107+
1108+
@Test
1109+
public void verifyLoggingWarningWithErrorTolerance() throws Exception {
1110+
connectTask = getSourceTaskWithEmptyKafkaOffset();
1111+
1112+
final Map<String, String> connectorConfigProps = createDefaultConnectorProperties();
1113+
connectorConfigProps.put(MQSourceConnector.CONFIG_NAME_TOPIC, "mytopic");
1114+
connectorConfigProps.put(ConnectorConfig.ERRORS_TOLERANCE_CONFIG, "all");
1115+
connectorConfigProps.put(ConnectorConfig.ERRORS_LOG_ENABLE_CONFIG, "false"); // default; Do not log errors
1116+
// default; Do not log errors with message
1117+
connectorConfigProps.put(ConnectorConfig.ERRORS_LOG_INCLUDE_MESSAGES_CONFIG, "false");
1118+
connectorConfigProps.put(MQSourceConnector.ERRORS_DEAD_LETTER_QUEUE_TOPIC_NAME_CONFIG, "__dlq.mq.source");
1119+
connectorConfigProps.put(MQSourceConnector.CONFIG_NAME_MQ_MESSAGE_BODY_JMS, "true");
1120+
connectorConfigProps.put(MQSourceConnector.CONFIG_NAME_MQ_RECORD_BUILDER,
1121+
"com.ibm.eventstreams.connect.mqsource.builders.JsonRecordBuilder");
1122+
1123+
connectTask.start(connectorConfigProps);
1124+
1125+
// Both invalid and valid messages are received
1126+
final List<Message> testMessages = Arrays.asList(
1127+
getJmsContext().createTextMessage("Invalid JSON message"), // Poison message
1128+
getJmsContext().createTextMessage("{ \"i\": 0 }"), // Valid message
1129+
getJmsContext().createTextMessage("{ \"i\": 1 }"), // Valid message
1130+
getJmsContext().createTextMessage("{ \"i\": 2 }") // Valid message
1131+
);
1132+
putAllMessagesToQueue(DEFAULT_SOURCE_QUEUE, testMessages);
1133+
1134+
final List<SourceRecord> processedRecords = connectTask.poll();
1135+
1136+
assertThat(processedRecords).hasSize(4);
1137+
1138+
assertThat(processedRecords.get(0).topic()).isEqualTo("__dlq.mq.source");
1139+
assertThat(processedRecords.get(1).topic()).isEqualTo("mytopic");
1140+
assertThat(processedRecords.get(2).topic()).isEqualTo("mytopic");
1141+
assertThat(processedRecords.get(3).topic()).isEqualTo("mytopic");
1142+
}
1143+
1144+
@Test
1145+
public void verifyLoggingErrorsWithErrorTolerance() throws Exception {
1146+
connectTask = getSourceTaskWithEmptyKafkaOffset();
1147+
1148+
final Map<String, String> connectorConfigProps = createDefaultConnectorProperties();
1149+
connectorConfigProps.put(MQSourceConnector.CONFIG_NAME_TOPIC, "mytopic");
1150+
connectorConfigProps.put(ConnectorConfig.ERRORS_TOLERANCE_CONFIG, "all");
1151+
connectorConfigProps.put(ConnectorConfig.ERRORS_LOG_ENABLE_CONFIG, "true"); // Log errors enabled
1152+
connectorConfigProps.put(ConnectorConfig.ERRORS_LOG_INCLUDE_MESSAGES_CONFIG, "false");
1153+
connectorConfigProps.put(MQSourceConnector.ERRORS_DEAD_LETTER_QUEUE_TOPIC_NAME_CONFIG, "__dlq.mq.source");
1154+
connectorConfigProps.put(MQSourceConnector.CONFIG_NAME_MQ_MESSAGE_BODY_JMS, "true");
1155+
connectorConfigProps.put(MQSourceConnector.CONFIG_NAME_MQ_RECORD_BUILDER,
1156+
"com.ibm.eventstreams.connect.mqsource.builders.JsonRecordBuilder");
1157+
1158+
connectTask.start(connectorConfigProps);
1159+
1160+
// Both invalid and valid messages are received
1161+
final List<Message> testMessages = Arrays.asList(
1162+
getJmsContext().createTextMessage("Invalid JSON message"), // Poison message
1163+
getJmsContext().createTextMessage("{ \"i\": 0 }"), // Valid message
1164+
getJmsContext().createTextMessage("{ \"i\": 1 }"), // Valid message
1165+
getJmsContext().createTextMessage("{ \"i\": 2 }") // Valid message
1166+
);
1167+
putAllMessagesToQueue(DEFAULT_SOURCE_QUEUE, testMessages);
1168+
1169+
final List<SourceRecord> processedRecords = connectTask.poll();
1170+
1171+
assertThat(processedRecords).hasSize(4);
1172+
1173+
assertThat(processedRecords.get(0).topic()).isEqualTo("__dlq.mq.source");
1174+
assertThat(processedRecords.get(1).topic()).isEqualTo("mytopic");
1175+
assertThat(processedRecords.get(2).topic()).isEqualTo("mytopic");
1176+
assertThat(processedRecords.get(3).topic()).isEqualTo("mytopic");
1177+
}
1178+
1179+
@Test
1180+
public void verifyLoggingErrorsWithMessage() throws Exception {
1181+
connectTask = getSourceTaskWithEmptyKafkaOffset();
1182+
1183+
final Map<String, String> connectorConfigProps = createDefaultConnectorProperties();
1184+
connectorConfigProps.put(MQSourceConnector.CONFIG_NAME_TOPIC, "mytopic");
1185+
connectorConfigProps.put(ConnectorConfig.ERRORS_TOLERANCE_CONFIG, "all");
1186+
connectorConfigProps.put(ConnectorConfig.ERRORS_LOG_ENABLE_CONFIG, "true"); // Log errors
1187+
connectorConfigProps.put(ConnectorConfig.ERRORS_LOG_INCLUDE_MESSAGES_CONFIG, "true"); // Log errors with message
1188+
connectorConfigProps.put(MQSourceConnector.ERRORS_DEAD_LETTER_QUEUE_TOPIC_NAME_CONFIG, "__dlq.mq.source");
1189+
connectorConfigProps.put(MQSourceConnector.CONFIG_NAME_MQ_MESSAGE_BODY_JMS, "true");
1190+
connectorConfigProps.put(MQSourceConnector.CONFIG_NAME_MQ_RECORD_BUILDER,
1191+
"com.ibm.eventstreams.connect.mqsource.builders.JsonRecordBuilder");
1192+
1193+
connectTask.start(connectorConfigProps);
1194+
1195+
// Both invalid and valid messages are received
1196+
final List<Message> testMessages = Arrays.asList(
1197+
getJmsContext().createTextMessage("Invalid JSON message"), // Poison message
1198+
getJmsContext().createTextMessage("{ \"i\": 0 }"), // Valid message
1199+
getJmsContext().createTextMessage("{ \"i\": 1 }"), // Valid message
1200+
getJmsContext().createTextMessage("{ \"i\": 2 }") // Valid message
1201+
);
1202+
putAllMessagesToQueue(DEFAULT_SOURCE_QUEUE, testMessages);
1203+
1204+
final List<SourceRecord> processedRecords = connectTask.poll();
1205+
1206+
assertThat(processedRecords).hasSize(4);
1207+
1208+
assertThat(processedRecords.get(0).topic()).isEqualTo("__dlq.mq.source");
1209+
assertThat(processedRecords.get(1).topic()).isEqualTo("mytopic");
1210+
assertThat(processedRecords.get(2).topic()).isEqualTo("mytopic");
1211+
assertThat(processedRecords.get(3).topic()).isEqualTo("mytopic");
1212+
}
1213+
1214+
@Test
1215+
public void verifyLoggingErrorsWithMessageHavingDefaultRecordBuilder() throws Exception {
1216+
connectTask = getSourceTaskWithEmptyKafkaOffset();
1217+
1218+
final Map<String, String> connectorConfigProps = createDefaultConnectorProperties();
1219+
connectorConfigProps.put(MQSourceConnector.CONFIG_NAME_TOPIC, "mytopic");
1220+
connectorConfigProps.put(ConnectorConfig.ERRORS_TOLERANCE_CONFIG, "all");
1221+
connectorConfigProps.put(ConnectorConfig.ERRORS_LOG_ENABLE_CONFIG, "true"); // Log errors
1222+
connectorConfigProps.put(ConnectorConfig.ERRORS_LOG_INCLUDE_MESSAGES_CONFIG, "true"); // Log errors with message
1223+
connectorConfigProps.put(MQSourceConnector.ERRORS_DEAD_LETTER_QUEUE_TOPIC_NAME_CONFIG, "__dlq.mq.source");
1224+
connectorConfigProps.put(MQSourceConnector.CONFIG_NAME_MQ_MESSAGE_BODY_JMS, "true");
1225+
connectorConfigProps.put(MQSourceConnector.CONFIG_NAME_MQ_RECORD_BUILDER,
1226+
"com.ibm.eventstreams.connect.mqsource.builders.DefaultRecordBuilder");
1227+
connectorConfigProps.put(MQSourceConnector.CONFIG_NAME_MQ_MESSAGE_BODY_JMS, "true");
1228+
connectorConfigProps.put("value.converter", "org.apache.kafka.connect.converters.ByteArrayConverter");
1229+
connectTask.start(connectorConfigProps);
1230+
1231+
// Both invalid and valid messages are received
1232+
final List<Message> testMessages = Arrays.asList(
1233+
getJmsContext().createObjectMessage("Invalid message"), // Poison message
1234+
getJmsContext().createTextMessage("Text") // Valid
1235+
);
1236+
putAllMessagesToQueue(DEFAULT_SOURCE_QUEUE, testMessages);
1237+
1238+
final List<SourceRecord> processedRecords = connectTask.poll();
1239+
1240+
assertThat(processedRecords).hasSize(2);
1241+
1242+
assertThat(processedRecords.get(0).topic()).isEqualTo("__dlq.mq.source");
1243+
assertThat(processedRecords.get(1).topic()).isEqualTo("mytopic");
1244+
}
10121245
}

src/integration/java/com/ibm/eventstreams/connect/mqsource/builders/JsonRecordBuilderIT.java

Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@
2020
import static org.junit.Assert.assertNull;
2121
import static org.junit.Assert.assertThrows;
2222

23+
import java.util.HashMap;
2324
import java.util.List;
2425
import java.util.Map;
2526

@@ -131,4 +132,22 @@ public void buildFromJmsTestErrorTolerance() throws Exception {
131132
final SourceRecord record = builder.toSourceRecord(getJmsContext(), topic, isJMS, message);
132133
assertNull(record);
133134
}
135+
136+
@Test
137+
public void buildFromJmsTestErrorToleranceNone() throws Exception {
138+
// create MQ message
139+
final TextMessage message = getJmsContext().createTextMessage("Not a valid json string");
140+
141+
// use the builder to convert it to a Kafka record
142+
final JsonRecordBuilder builder = new JsonRecordBuilder();
143+
final HashMap<String, String> config = new HashMap<String, String>();
144+
config.put("errors.tolerance", "none");
145+
config.put("mq.message.body.jms", "true");
146+
config.put("mq.record.builder", "com.ibm.eventstreams.connect.mqsource.builders.JsonRecordBuilder");
147+
148+
builder.configure(config);
149+
assertThrows(DataException.class, () -> {
150+
builder.toSourceRecord(getJmsContext(), topic, isJMS, message);
151+
});
152+
}
134153
}

0 commit comments

Comments
 (0)