Skip to content

Commit 7336ce4

Browse files
committed
feat: update headers for dlq record and minor log error changes
Signed-off-by: Joel Hanson <joelhanson025@gmail.com>
1 parent d20ac82 commit 7336ce4

File tree

4 files changed

+351
-55
lines changed

4 files changed

+351
-55
lines changed

src/integration/java/com/ibm/eventstreams/connect/mqsource/MQSourceTaskIT.java

Lines changed: 240 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -43,7 +43,6 @@
4343
import java.util.List;
4444
import java.util.Map;
4545
import java.util.stream.Collectors;
46-
import java.util.stream.Stream;
4746

4847
import javax.jms.BytesMessage;
4948
import javax.jms.JMSException;
@@ -853,11 +852,21 @@ public void shouldPreserveDlqHeadersWithErrorInformation() throws Exception {
853852

854853
// Verify error headers are present
855854
final Headers headers = dlqRecord.headers();
856-
assertThat(headers.lastWithName("original_topic").value()).isEqualTo("mytopic");
857-
assertThat(headers.lastWithName("error_message")).isNotNull();
858-
assertThat(headers.lastWithName("error_class")).isNotNull();
859-
assertThat(headers.lastWithName("error_timestamp")).isNotNull();
860-
assertThat(headers.lastWithName("error_stack_trace")).isNotNull();
855+
assertThat(headers.lastWithName("__connect.errors.topic").value())
856+
.isEqualTo("mytopic");
857+
assertThat(headers.lastWithName("__connect.errors.class.name").value())
858+
.isEqualTo("org.apache.kafka.connect.errors.DataException");
859+
assertThat(headers.lastWithName("__connect.errors.exception.message").value())
860+
.isEqualTo("Converting byte[] to Kafka Connect data failed due to serialization error: ");
861+
assertThat(headers.lastWithName("__connect.errors.timestamp").value().toString()
862+
.isEmpty()).isFalse();
863+
assertThat(headers.lastWithName("__connect.errors.cause.message").value().toString())
864+
.contains(
865+
"com.fasterxml.jackson.core.JsonParseException: Unrecognized token 'Invalid': was expecting (JSON String, Number, Array, Object or token 'null', 'true' or 'false')");
866+
assertThat(headers.lastWithName("__connect.errors.cause.class").value())
867+
.isEqualTo("org.apache.kafka.common.errors.SerializationException");
868+
assertThat(headers.lastWithName("__connect.errors.exception.stacktrace").value()
869+
.toString().contains("com.ibm.eventstreams.connect.mqsource.JMSWorker.toSourceRecord")).isTrue();
861870

862871
connectTask.commitRecord(dlqRecord);
863872
}
@@ -922,7 +931,7 @@ public void shouldPreserveJmsPropertiesInDlqMessages() throws Exception {
922931
assertThat(dlqRecord.topic()).isEqualTo("__dlq.mq.source");
923932

924933
final Headers headers = dlqRecord.headers();
925-
assertThat(headers.lastWithName("error_message").value())
934+
assertThat(headers.lastWithName("__connect.errors.exception.message").value())
926935
.isEqualTo("Converting byte[] to Kafka Connect data failed due to serialization error: ");
927936

928937
connectTask.commitRecord(dlqRecord);
@@ -1006,4 +1015,228 @@ public void shouldContinueProcessingAfterUnhandleableDlqError() throws Exception
10061015
assertThat(processedRecords.get(2).topic()).isEqualTo("__dlq.mq.source");
10071016
assertThat(processedRecords.get(3).topic()).isEqualTo("mytopic");
10081017
}
1018+
1019+
@Test
1020+
public void verifyHeadersWithErrorTolerance() throws Exception {
1021+
connectTask = getSourceTaskWithEmptyKafkaOffset();
1022+
1023+
final Map<String, String> connectorConfigProps = createDefaultConnectorProperties();
1024+
connectorConfigProps.put(MQSourceConnector.CONFIG_NAME_TOPIC, "mytopic");
1025+
connectorConfigProps.put(ConnectorConfig.ERRORS_TOLERANCE_CONFIG, "all");
1026+
connectorConfigProps.put(MQSourceConnector.ERRORS_DEAD_LETTER_QUEUE_TOPIC_NAME_CONFIG, "__dlq.mq.source");
1027+
connectorConfigProps.put(MQSourceConnector.CONFIG_NAME_MQ_MESSAGE_BODY_JMS, "true");
1028+
connectorConfigProps.put(MQSourceConnector.CONFIG_NAME_MQ_RECORD_BUILDER,
1029+
"com.ibm.eventstreams.connect.mqsource.builders.JsonRecordBuilder");
1030+
connectorConfigProps.put(MQSourceConnector.CONFIG_NAME_MQ_JMS_PROPERTY_COPY_TO_KAFKA_HEADER, "true");
1031+
1032+
connectTask.start(connectorConfigProps);
1033+
1034+
final TextMessage message = getJmsContext().createTextMessage("Invalid JSON message");
1035+
message.setStringProperty("teststring", "myvalue");
1036+
message.setIntProperty("volume", 11);
1037+
message.setDoubleProperty("decimalmeaning", 42.0);
1038+
1039+
// Both invalid and valid messages are received
1040+
final List<Message> testMessages = Arrays.asList(
1041+
message, // Poison message
1042+
getJmsContext().createTextMessage("{ \"i\": 0 }") // Valid message
1043+
);
1044+
putAllMessagesToQueue(DEFAULT_SOURCE_QUEUE, testMessages);
1045+
1046+
final List<SourceRecord> processedRecords = connectTask.poll();
1047+
1048+
assertThat(processedRecords).hasSize(2);
1049+
1050+
assertThat(processedRecords.get(0).topic()).isEqualTo("__dlq.mq.source");
1051+
1052+
final Headers headers = processedRecords.get(0).headers();
1053+
1054+
// Actual headers
1055+
assertThat(headers.lastWithName("teststring").value()).isEqualTo("myvalue");
1056+
assertThat(headers.lastWithName("volume").value()).isEqualTo("11");
1057+
assertThat(headers.lastWithName("decimalmeaning").value()).isEqualTo("42.0");
1058+
1059+
// Expected DLQ Headers
1060+
// ConnectHeaders(headers=[ConnectHeader(key=__connect.errors.topic,
1061+
// value=mytopic, schema=Schema{STRING}),
1062+
// ConnectHeader(key=__connect.errors.class.name,
1063+
// value=org.apache.kafka.connect.errors.DataException, schema=Schema{STRING}),
1064+
// ConnectHeader(key=__connect.errors.exception.message, value=Converting byte[]
1065+
// to Kafka Connect data failed due to serialization error: ,
1066+
// schema=Schema{STRING}), ConnectHeader(key=__connect.errors.timestamp,
1067+
// value=1749036171558, schema=Schema{STRING}),
1068+
// ConnectHeader(key=__connect.errors.cause.message,
1069+
// value=com.fasterxml.jackson.core.JsonParseException: Unrecognized token
1070+
// 'Invalid': was expecting (JSON String, Number, Array, Object or token 'null',
1071+
// 'true' or 'false')
1072+
// at [Source: REDACTED (`StreamReadFeature.INCLUDE_SOURCE_IN_LOCATION`
1073+
// disabled); line: 1, column: 9], schema=Schema{STRING}),
1074+
// ConnectHeader(key=__connect.errors.cause.class,
1075+
// value=org.apache.kafka.common.errors.SerializationException,
1076+
// schema=Schema{STRING}),
1077+
// ConnectHeader(key=__connect.errors.exception.stacktrace,
1078+
// value=org.apache.kafka.connect.errors.DataException: Converting byte[] to
1079+
// Kafka Connect data failed due to serialization error:
1080+
// at
1081+
// org.apache.kafka.connect.json.JsonConverter.toConnectData(JsonConverter.java:333)
1082+
// at
1083+
// com.ibm.eventstreams.connect.mqsource.builders.JsonRecordBuilder.getValue(JsonRecordBuilder.java:81)
1084+
// at
1085+
// com.ibm.eventstreams.connect.mqsource.builders.BaseRecordBuilder.toSourceRecord(BaseRecordBuilder.java:238)
1086+
// at com.ibm.eventstreams.connect.mqsource.JMSWorker.toSourceRecord(JMSWork...
1087+
// [truncated], schema=Schema{STRING})])
1088+
assertThat(headers.lastWithName("__connect.errors.topic").value())
1089+
.isEqualTo("mytopic");
1090+
assertThat(headers.lastWithName("__connect.errors.class.name").value())
1091+
.isEqualTo("org.apache.kafka.connect.errors.DataException");
1092+
assertThat(headers.lastWithName("__connect.errors.exception.message").value())
1093+
.isEqualTo("Converting byte[] to Kafka Connect data failed due to serialization error: ");
1094+
assertThat(headers.lastWithName("__connect.errors.timestamp").value().toString()
1095+
.isEmpty()).isFalse();
1096+
assertThat(headers.lastWithName("__connect.errors.cause.message").value().toString())
1097+
.contains(
1098+
"com.fasterxml.jackson.core.JsonParseException: Unrecognized token 'Invalid': was expecting (JSON String, Number, Array, Object or token 'null', 'true' or 'false')");
1099+
assertThat(headers.lastWithName("__connect.errors.cause.class").value())
1100+
.isEqualTo("org.apache.kafka.common.errors.SerializationException");
1101+
assertThat(headers.lastWithName("__connect.errors.exception.stacktrace").value()
1102+
.toString().contains("com.ibm.eventstreams.connect.mqsource.JMSWorker.toSourceRecord")).isTrue();
1103+
}
1104+
1105+
@Test
1106+
public void verifyLoggingWarningWithErrorTolerance() throws Exception {
1107+
connectTask = getSourceTaskWithEmptyKafkaOffset();
1108+
1109+
final Map<String, String> connectorConfigProps = createDefaultConnectorProperties();
1110+
connectorConfigProps.put(MQSourceConnector.CONFIG_NAME_TOPIC, "mytopic");
1111+
connectorConfigProps.put(ConnectorConfig.ERRORS_TOLERANCE_CONFIG, "all");
1112+
connectorConfigProps.put(ConnectorConfig.ERRORS_LOG_ENABLE_CONFIG, "false"); // default; Do not log errors
1113+
// default; Do not log errors with message
1114+
connectorConfigProps.put(ConnectorConfig.ERRORS_LOG_INCLUDE_MESSAGES_CONFIG, "false");
1115+
connectorConfigProps.put(MQSourceConnector.ERRORS_DEAD_LETTER_QUEUE_TOPIC_NAME_CONFIG, "__dlq.mq.source");
1116+
connectorConfigProps.put(MQSourceConnector.CONFIG_NAME_MQ_MESSAGE_BODY_JMS, "true");
1117+
connectorConfigProps.put(MQSourceConnector.CONFIG_NAME_MQ_RECORD_BUILDER,
1118+
"com.ibm.eventstreams.connect.mqsource.builders.JsonRecordBuilder");
1119+
1120+
connectTask.start(connectorConfigProps);
1121+
1122+
// Both invalid and valid messages are received
1123+
final List<Message> testMessages = Arrays.asList(
1124+
getJmsContext().createTextMessage("Invalid JSON message"), // Poison message
1125+
getJmsContext().createTextMessage("{ \"i\": 0 }"), // Valid message
1126+
getJmsContext().createTextMessage("{ \"i\": 1 }"), // Valid message
1127+
getJmsContext().createTextMessage("{ \"i\": 2 }") // Valid message
1128+
);
1129+
putAllMessagesToQueue(DEFAULT_SOURCE_QUEUE, testMessages);
1130+
1131+
final List<SourceRecord> processedRecords = connectTask.poll();
1132+
1133+
assertThat(processedRecords).hasSize(4);
1134+
1135+
assertThat(processedRecords.get(0).topic()).isEqualTo("__dlq.mq.source");
1136+
assertThat(processedRecords.get(1).topic()).isEqualTo("mytopic");
1137+
assertThat(processedRecords.get(2).topic()).isEqualTo("mytopic");
1138+
assertThat(processedRecords.get(3).topic()).isEqualTo("mytopic");
1139+
}
1140+
1141+
@Test
1142+
public void verifyLoggingErrorsWithErrorTolerance() throws Exception {
1143+
connectTask = getSourceTaskWithEmptyKafkaOffset();
1144+
1145+
final Map<String, String> connectorConfigProps = createDefaultConnectorProperties();
1146+
connectorConfigProps.put(MQSourceConnector.CONFIG_NAME_TOPIC, "mytopic");
1147+
connectorConfigProps.put(ConnectorConfig.ERRORS_TOLERANCE_CONFIG, "all");
1148+
connectorConfigProps.put(ConnectorConfig.ERRORS_LOG_ENABLE_CONFIG, "true"); // Log errors enabled
1149+
connectorConfigProps.put(ConnectorConfig.ERRORS_LOG_INCLUDE_MESSAGES_CONFIG, "false");
1150+
connectorConfigProps.put(MQSourceConnector.ERRORS_DEAD_LETTER_QUEUE_TOPIC_NAME_CONFIG, "__dlq.mq.source");
1151+
connectorConfigProps.put(MQSourceConnector.CONFIG_NAME_MQ_MESSAGE_BODY_JMS, "true");
1152+
connectorConfigProps.put(MQSourceConnector.CONFIG_NAME_MQ_RECORD_BUILDER,
1153+
"com.ibm.eventstreams.connect.mqsource.builders.JsonRecordBuilder");
1154+
1155+
connectTask.start(connectorConfigProps);
1156+
1157+
// Both invalid and valid messages are received
1158+
final List<Message> testMessages = Arrays.asList(
1159+
getJmsContext().createTextMessage("Invalid JSON message"), // Poison message
1160+
getJmsContext().createTextMessage("{ \"i\": 0 }"), // Valid message
1161+
getJmsContext().createTextMessage("{ \"i\": 1 }"), // Valid message
1162+
getJmsContext().createTextMessage("{ \"i\": 2 }") // Valid message
1163+
);
1164+
putAllMessagesToQueue(DEFAULT_SOURCE_QUEUE, testMessages);
1165+
1166+
final List<SourceRecord> processedRecords = connectTask.poll();
1167+
1168+
assertThat(processedRecords).hasSize(4);
1169+
1170+
assertThat(processedRecords.get(0).topic()).isEqualTo("__dlq.mq.source");
1171+
assertThat(processedRecords.get(1).topic()).isEqualTo("mytopic");
1172+
assertThat(processedRecords.get(2).topic()).isEqualTo("mytopic");
1173+
assertThat(processedRecords.get(3).topic()).isEqualTo("mytopic");
1174+
}
1175+
1176+
@Test
1177+
public void verifyLoggingErrorsWithMessage() throws Exception {
1178+
connectTask = getSourceTaskWithEmptyKafkaOffset();
1179+
1180+
final Map<String, String> connectorConfigProps = createDefaultConnectorProperties();
1181+
connectorConfigProps.put(MQSourceConnector.CONFIG_NAME_TOPIC, "mytopic");
1182+
connectorConfigProps.put(ConnectorConfig.ERRORS_TOLERANCE_CONFIG, "all");
1183+
connectorConfigProps.put(ConnectorConfig.ERRORS_LOG_ENABLE_CONFIG, "true"); // Log errors
1184+
connectorConfigProps.put(ConnectorConfig.ERRORS_LOG_INCLUDE_MESSAGES_CONFIG, "true"); // Log errors with message
1185+
connectorConfigProps.put(MQSourceConnector.ERRORS_DEAD_LETTER_QUEUE_TOPIC_NAME_CONFIG, "__dlq.mq.source");
1186+
connectorConfigProps.put(MQSourceConnector.CONFIG_NAME_MQ_MESSAGE_BODY_JMS, "true");
1187+
connectorConfigProps.put(MQSourceConnector.CONFIG_NAME_MQ_RECORD_BUILDER,
1188+
"com.ibm.eventstreams.connect.mqsource.builders.JsonRecordBuilder");
1189+
1190+
connectTask.start(connectorConfigProps);
1191+
1192+
// Both invalid and valid messages are received
1193+
final List<Message> testMessages = Arrays.asList(
1194+
getJmsContext().createTextMessage("Invalid JSON message"), // Poison message
1195+
getJmsContext().createTextMessage("{ \"i\": 0 }"), // Valid message
1196+
getJmsContext().createTextMessage("{ \"i\": 1 }"), // Valid message
1197+
getJmsContext().createTextMessage("{ \"i\": 2 }") // Valid message
1198+
);
1199+
putAllMessagesToQueue(DEFAULT_SOURCE_QUEUE, testMessages);
1200+
1201+
final List<SourceRecord> processedRecords = connectTask.poll();
1202+
1203+
assertThat(processedRecords).hasSize(4);
1204+
1205+
assertThat(processedRecords.get(0).topic()).isEqualTo("__dlq.mq.source");
1206+
assertThat(processedRecords.get(1).topic()).isEqualTo("mytopic");
1207+
assertThat(processedRecords.get(2).topic()).isEqualTo("mytopic");
1208+
assertThat(processedRecords.get(3).topic()).isEqualTo("mytopic");
1209+
}
1210+
1211+
@Test
1212+
public void verifyLoggingErrorsWithMessageHavingDefaultRecordBuilder() throws Exception {
1213+
connectTask = getSourceTaskWithEmptyKafkaOffset();
1214+
1215+
final Map<String, String> connectorConfigProps = createDefaultConnectorProperties();
1216+
connectorConfigProps.put(MQSourceConnector.CONFIG_NAME_TOPIC, "mytopic");
1217+
connectorConfigProps.put(ConnectorConfig.ERRORS_TOLERANCE_CONFIG, "all");
1218+
connectorConfigProps.put(ConnectorConfig.ERRORS_LOG_ENABLE_CONFIG, "true"); // Log errors
1219+
connectorConfigProps.put(ConnectorConfig.ERRORS_LOG_INCLUDE_MESSAGES_CONFIG, "true"); // Log errors with message
1220+
connectorConfigProps.put(MQSourceConnector.ERRORS_DEAD_LETTER_QUEUE_TOPIC_NAME_CONFIG, "__dlq.mq.source");
1221+
connectorConfigProps.put(MQSourceConnector.CONFIG_NAME_MQ_MESSAGE_BODY_JMS, "true");
1222+
connectorConfigProps.put(MQSourceConnector.CONFIG_NAME_MQ_RECORD_BUILDER,
1223+
"com.ibm.eventstreams.connect.mqsource.builders.DefaultRecordBuilder");
1224+
connectorConfigProps.put(MQSourceConnector.CONFIG_NAME_MQ_MESSAGE_BODY_JMS, "true");
1225+
connectorConfigProps.put("value.converter", "org.apache.kafka.connect.converters.ByteArrayConverter");
1226+
connectTask.start(connectorConfigProps);
1227+
1228+
// Both invalid and valid messages are received
1229+
final List<Message> testMessages = Arrays.asList(
1230+
getJmsContext().createObjectMessage("Invalid message"), // Poison message
1231+
getJmsContext().createTextMessage("Text") // Valid
1232+
);
1233+
putAllMessagesToQueue(DEFAULT_SOURCE_QUEUE, testMessages);
1234+
1235+
final List<SourceRecord> processedRecords = connectTask.poll();
1236+
1237+
assertThat(processedRecords).hasSize(2);
1238+
1239+
assertThat(processedRecords.get(0).topic()).isEqualTo("__dlq.mq.source");
1240+
assertThat(processedRecords.get(1).topic()).isEqualTo("mytopic");
1241+
}
10091242
}

src/integration/java/com/ibm/eventstreams/connect/mqsource/builders/JsonRecordBuilderIT.java

Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@
2020
import static org.junit.Assert.assertNull;
2121
import static org.junit.Assert.assertThrows;
2222

23+
import java.util.HashMap;
2324
import java.util.List;
2425
import java.util.Map;
2526

@@ -131,4 +132,22 @@ public void buildFromJmsTestErrorTolerance() throws Exception {
131132
final SourceRecord record = builder.toSourceRecord(getJmsContext(), topic, isJMS, message);
132133
assertNull(record);
133134
}
135+
136+
@Test
137+
public void buildFromJmsTestErrorToleranceNone() throws Exception {
138+
// create MQ message
139+
final TextMessage message = getJmsContext().createTextMessage("Not a valid json string");
140+
141+
// use the builder to convert it to a Kafka record
142+
final JsonRecordBuilder builder = new JsonRecordBuilder();
143+
final HashMap<String, String> config = new HashMap<String, String>();
144+
config.put("errors.tolerance", "none");
145+
config.put("mq.message.body.jms", "true");
146+
config.put("mq.record.builder", "com.ibm.eventstreams.connect.mqsource.builders.JsonRecordBuilder");
147+
148+
builder.configure(config);
149+
assertThrows(DataException.class, () -> {
150+
builder.toSourceRecord(getJmsContext(), topic, isJMS, message);
151+
});
152+
}
134153
}

0 commit comments

Comments
 (0)