Skip to content

Commit 9a4b191

Browse files
committed
feat: update headers for dlq record and minor log error changes
Signed-off-by: Joel Hanson <joelhanson025@gmail.com>
1 parent d20ac82 commit 9a4b191

File tree

4 files changed

+332
-49
lines changed

4 files changed

+332
-49
lines changed

src/integration/java/com/ibm/eventstreams/connect/mqsource/MQSourceTaskIT.java

Lines changed: 221 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -43,7 +43,6 @@
4343
import java.util.List;
4444
import java.util.Map;
4545
import java.util.stream.Collectors;
46-
import java.util.stream.Stream;
4746

4847
import javax.jms.BytesMessage;
4948
import javax.jms.JMSException;
@@ -1006,4 +1005,225 @@ public void shouldContinueProcessingAfterUnhandleableDlqError() throws Exception
10061005
assertThat(processedRecords.get(2).topic()).isEqualTo("__dlq.mq.source");
10071006
assertThat(processedRecords.get(3).topic()).isEqualTo("mytopic");
10081007
}
1008+
1009+
@Test
1010+
public void verifyHeadersWithErrorTolerance() throws Exception {
1011+
connectTask = getSourceTaskWithEmptyKafkaOffset();
1012+
1013+
final Map<String, String> connectorConfigProps = createDefaultConnectorProperties();
1014+
connectorConfigProps.put(MQSourceConnector.CONFIG_NAME_TOPIC, "mytopic");
1015+
connectorConfigProps.put(ConnectorConfig.ERRORS_TOLERANCE_CONFIG, "all");
1016+
connectorConfigProps.put(MQSourceConnector.ERRORS_DEAD_LETTER_QUEUE_TOPIC_NAME_CONFIG, "__dlq.mq.source");
1017+
connectorConfigProps.put(MQSourceConnector.CONFIG_NAME_MQ_MESSAGE_BODY_JMS, "true");
1018+
connectorConfigProps.put(MQSourceConnector.CONFIG_NAME_MQ_RECORD_BUILDER,
1019+
"com.ibm.eventstreams.connect.mqsource.builders.JsonRecordBuilder");
1020+
connectorConfigProps.put(MQSourceConnector.CONFIG_NAME_MQ_JMS_PROPERTY_COPY_TO_KAFKA_HEADER, "true");
1021+
1022+
connectTask.start(connectorConfigProps);
1023+
1024+
final TextMessage message = getJmsContext().createTextMessage("Invalid JSON message");
1025+
message.setStringProperty("teststring", "myvalue");
1026+
message.setIntProperty("volume", 11);
1027+
message.setDoubleProperty("decimalmeaning", 42.0);
1028+
1029+
// Both invalid and valid messages are received
1030+
final List<Message> testMessages = Arrays.asList(
1031+
message, // Poison message
1032+
getJmsContext().createTextMessage("{ \"i\": 0 }") // Valid message
1033+
);
1034+
putAllMessagesToQueue(DEFAULT_SOURCE_QUEUE, testMessages);
1035+
1036+
final List<SourceRecord> processedRecords = connectTask.poll();
1037+
1038+
assertThat(processedRecords).hasSize(2);
1039+
1040+
assertThat(processedRecords.get(0).topic()).isEqualTo("__dlq.mq.source");
1041+
1042+
// Actual headers
1043+
assertThat(processedRecords.get(0).headers().lastWithName("teststring").value()).isEqualTo("myvalue");
1044+
assertThat(processedRecords.get(0).headers().lastWithName("volume").value()).isEqualTo("11");
1045+
assertThat(processedRecords.get(0).headers().lastWithName("decimalmeaning").value()).isEqualTo("42.0");
1046+
1047+
// Expected DLQ Headers
1048+
// ConnectHeaders(headers=[ConnectHeader(key=__connect.errors.topic,
1049+
// value=mytopic, schema=Schema{STRING}),
1050+
// ConnectHeader(key=__connect.errors.class.name,
1051+
// value=org.apache.kafka.connect.errors.DataException, schema=Schema{STRING}),
1052+
// ConnectHeader(key=__connect.errors.exception.message, value=Converting byte[]
1053+
// to Kafka Connect data failed due to serialization error: ,
1054+
// schema=Schema{STRING}), ConnectHeader(key=__connect.errors.timestamp,
1055+
// value=1749036171558, schema=Schema{STRING}),
1056+
// ConnectHeader(key=__connect.errors.cause.message,
1057+
// value=com.fasterxml.jackson.core.JsonParseException: Unrecognized token
1058+
// 'Invalid': was expecting (JSON String, Number, Array, Object or token 'null',
1059+
// 'true' or 'false')
1060+
// at [Source: REDACTED (`StreamReadFeature.INCLUDE_SOURCE_IN_LOCATION`
1061+
// disabled); line: 1, column: 9], schema=Schema{STRING}),
1062+
// ConnectHeader(key=__connect.errors.cause.class,
1063+
// value=org.apache.kafka.common.errors.SerializationException,
1064+
// schema=Schema{STRING}),
1065+
// ConnectHeader(key=__connect.errors.exception.stacktrace,
1066+
// value=org.apache.kafka.connect.errors.DataException: Converting byte[] to
1067+
// Kafka Connect data failed due to serialization error:
1068+
// at
1069+
// org.apache.kafka.connect.json.JsonConverter.toConnectData(JsonConverter.java:333)
1070+
// at
1071+
// com.ibm.eventstreams.connect.mqsource.builders.JsonRecordBuilder.getValue(JsonRecordBuilder.java:81)
1072+
// at
1073+
// com.ibm.eventstreams.connect.mqsource.builders.BaseRecordBuilder.toSourceRecord(BaseRecordBuilder.java:238)
1074+
// at com.ibm.eventstreams.connect.mqsource.JMSWorker.toSourceRecord(JMSWork...
1075+
// [truncated], schema=Schema{STRING})])
1076+
assertThat(processedRecords.get(0).headers().lastWithName("__connect.errors.topic").value())
1077+
.isEqualTo("mytopic");
1078+
assertThat(processedRecords.get(0).headers().lastWithName("__connect.errors.class.name").value())
1079+
.isEqualTo("org.apache.kafka.connect.errors.DataException");
1080+
assertThat(processedRecords.get(0).headers().lastWithName("__connect.errors.exception.message").value())
1081+
.isEqualTo("Converting byte[] to Kafka Connect data failed due to serialization error: ");
1082+
assertThat(processedRecords.get(0).headers().lastWithName("__connect.errors.timestamp").value().toString()
1083+
.isEmpty()).isFalse();
1084+
assertThat(processedRecords.get(0).headers().lastWithName("__connect.errors.cause.message").value().toString())
1085+
.contains(
1086+
"com.fasterxml.jackson.core.JsonParseException: Unrecognized token 'Invalid': was expecting (JSON String, Number, Array, Object or token 'null', 'true' or 'false')");
1087+
assertThat(processedRecords.get(0).headers().lastWithName("__connect.errors.cause.class").value())
1088+
.isEqualTo("org.apache.kafka.common.errors.SerializationException");
1089+
assertThat(processedRecords.get(0).headers().lastWithName("__connect.errors.exception.stacktrace").value()
1090+
.toString().contains("com.ibm.eventstreams.connect.mqsource.JMSWorker.toSourceRecord")).isTrue();
1091+
}
1092+
1093+
@Test
1094+
public void verifyLoggingWarningWithErrorTolerance() throws Exception {
1095+
connectTask = getSourceTaskWithEmptyKafkaOffset();
1096+
1097+
final Map<String, String> connectorConfigProps = createDefaultConnectorProperties();
1098+
connectorConfigProps.put(MQSourceConnector.CONFIG_NAME_TOPIC, "mytopic");
1099+
connectorConfigProps.put(ConnectorConfig.ERRORS_TOLERANCE_CONFIG, "all");
1100+
connectorConfigProps.put(ConnectorConfig.ERRORS_LOG_ENABLE_CONFIG, "false"); // default; Do not log errors
1101+
connectorConfigProps.put(ConnectorConfig.ERRORS_LOG_INCLUDE_MESSAGES_CONFIG, "false"); // default; Do not log errors with message
1102+
connectorConfigProps.put(MQSourceConnector.ERRORS_DEAD_LETTER_QUEUE_TOPIC_NAME_CONFIG, "__dlq.mq.source");
1103+
connectorConfigProps.put(MQSourceConnector.CONFIG_NAME_MQ_MESSAGE_BODY_JMS, "true");
1104+
connectorConfigProps.put(MQSourceConnector.CONFIG_NAME_MQ_RECORD_BUILDER,
1105+
"com.ibm.eventstreams.connect.mqsource.builders.JsonRecordBuilder");
1106+
1107+
connectTask.start(connectorConfigProps);
1108+
1109+
// Both invalid and valid messages are received
1110+
final List<Message> testMessages = Arrays.asList(
1111+
getJmsContext().createTextMessage("Invalid JSON message"), // Poison message
1112+
getJmsContext().createTextMessage("{ \"i\": 0 }"), // Valid message
1113+
getJmsContext().createTextMessage("{ \"i\": 1 }"), // Valid message
1114+
getJmsContext().createTextMessage("{ \"i\": 2 }") // Valid message
1115+
);
1116+
putAllMessagesToQueue(DEFAULT_SOURCE_QUEUE, testMessages);
1117+
1118+
final List<SourceRecord> processedRecords = connectTask.poll();
1119+
1120+
assertThat(processedRecords).hasSize(4);
1121+
1122+
assertThat(processedRecords.get(0).topic()).isEqualTo("__dlq.mq.source");
1123+
assertThat(processedRecords.get(1).topic()).isEqualTo("mytopic");
1124+
assertThat(processedRecords.get(2).topic()).isEqualTo("mytopic");
1125+
assertThat(processedRecords.get(3).topic()).isEqualTo("mytopic");
1126+
}
1127+
1128+
@Test
1129+
public void verifyLoggingErrorsWithErrorTolerance() throws Exception {
1130+
connectTask = getSourceTaskWithEmptyKafkaOffset();
1131+
1132+
final Map<String, String> connectorConfigProps = createDefaultConnectorProperties();
1133+
connectorConfigProps.put(MQSourceConnector.CONFIG_NAME_TOPIC, "mytopic");
1134+
connectorConfigProps.put(ConnectorConfig.ERRORS_TOLERANCE_CONFIG, "all");
1135+
connectorConfigProps.put(ConnectorConfig.ERRORS_LOG_ENABLE_CONFIG, "true"); // Log errors enabled
1136+
connectorConfigProps.put(ConnectorConfig.ERRORS_LOG_INCLUDE_MESSAGES_CONFIG, "false");
1137+
connectorConfigProps.put(MQSourceConnector.ERRORS_DEAD_LETTER_QUEUE_TOPIC_NAME_CONFIG, "__dlq.mq.source");
1138+
connectorConfigProps.put(MQSourceConnector.CONFIG_NAME_MQ_MESSAGE_BODY_JMS, "true");
1139+
connectorConfigProps.put(MQSourceConnector.CONFIG_NAME_MQ_RECORD_BUILDER,
1140+
"com.ibm.eventstreams.connect.mqsource.builders.JsonRecordBuilder");
1141+
1142+
connectTask.start(connectorConfigProps);
1143+
1144+
// Both invalid and valid messages are received
1145+
final List<Message> testMessages = Arrays.asList(
1146+
getJmsContext().createTextMessage("Invalid JSON message"), // Poison message
1147+
getJmsContext().createTextMessage("{ \"i\": 0 }"), // Valid message
1148+
getJmsContext().createTextMessage("{ \"i\": 1 }"), // Valid message
1149+
getJmsContext().createTextMessage("{ \"i\": 2 }") // Valid message
1150+
);
1151+
putAllMessagesToQueue(DEFAULT_SOURCE_QUEUE, testMessages);
1152+
1153+
final List<SourceRecord> processedRecords = connectTask.poll();
1154+
1155+
assertThat(processedRecords).hasSize(4);
1156+
1157+
assertThat(processedRecords.get(0).topic()).isEqualTo("__dlq.mq.source");
1158+
assertThat(processedRecords.get(1).topic()).isEqualTo("mytopic");
1159+
assertThat(processedRecords.get(2).topic()).isEqualTo("mytopic");
1160+
assertThat(processedRecords.get(3).topic()).isEqualTo("mytopic");
1161+
}
1162+
1163+
@Test
1164+
public void verifyLoggingErrorsWithMessage() throws Exception {
1165+
connectTask = getSourceTaskWithEmptyKafkaOffset();
1166+
1167+
final Map<String, String> connectorConfigProps = createDefaultConnectorProperties();
1168+
connectorConfigProps.put(MQSourceConnector.CONFIG_NAME_TOPIC, "mytopic");
1169+
connectorConfigProps.put(ConnectorConfig.ERRORS_TOLERANCE_CONFIG, "all");
1170+
connectorConfigProps.put(ConnectorConfig.ERRORS_LOG_ENABLE_CONFIG, "true"); // Log errors
1171+
connectorConfigProps.put(ConnectorConfig.ERRORS_LOG_INCLUDE_MESSAGES_CONFIG, "true"); // Log errors with message
1172+
connectorConfigProps.put(MQSourceConnector.ERRORS_DEAD_LETTER_QUEUE_TOPIC_NAME_CONFIG, "__dlq.mq.source");
1173+
connectorConfigProps.put(MQSourceConnector.CONFIG_NAME_MQ_MESSAGE_BODY_JMS, "true");
1174+
connectorConfigProps.put(MQSourceConnector.CONFIG_NAME_MQ_RECORD_BUILDER,
1175+
"com.ibm.eventstreams.connect.mqsource.builders.JsonRecordBuilder");
1176+
1177+
connectTask.start(connectorConfigProps);
1178+
1179+
// Both invalid and valid messages are received
1180+
final List<Message> testMessages = Arrays.asList(
1181+
getJmsContext().createTextMessage("Invalid JSON message"), // Poison message
1182+
getJmsContext().createTextMessage("{ \"i\": 0 }"), // Valid message
1183+
getJmsContext().createTextMessage("{ \"i\": 1 }"), // Valid message
1184+
getJmsContext().createTextMessage("{ \"i\": 2 }") // Valid message
1185+
);
1186+
putAllMessagesToQueue(DEFAULT_SOURCE_QUEUE, testMessages);
1187+
1188+
final List<SourceRecord> processedRecords = connectTask.poll();
1189+
1190+
assertThat(processedRecords).hasSize(4);
1191+
1192+
assertThat(processedRecords.get(0).topic()).isEqualTo("__dlq.mq.source");
1193+
assertThat(processedRecords.get(1).topic()).isEqualTo("mytopic");
1194+
assertThat(processedRecords.get(2).topic()).isEqualTo("mytopic");
1195+
assertThat(processedRecords.get(3).topic()).isEqualTo("mytopic");
1196+
}
1197+
1198+
@Test
1199+
public void verifyLoggingErrorsWithMessageHavingDefaultRecordBuilder() throws Exception {
1200+
connectTask = getSourceTaskWithEmptyKafkaOffset();
1201+
1202+
final Map<String, String> connectorConfigProps = createDefaultConnectorProperties();
1203+
connectorConfigProps.put(MQSourceConnector.CONFIG_NAME_TOPIC, "mytopic");
1204+
connectorConfigProps.put(ConnectorConfig.ERRORS_TOLERANCE_CONFIG, "all");
1205+
connectorConfigProps.put(ConnectorConfig.ERRORS_LOG_ENABLE_CONFIG, "true"); // Log errors
1206+
connectorConfigProps.put(ConnectorConfig.ERRORS_LOG_INCLUDE_MESSAGES_CONFIG, "true"); // Log errors with message
1207+
connectorConfigProps.put(MQSourceConnector.ERRORS_DEAD_LETTER_QUEUE_TOPIC_NAME_CONFIG, "__dlq.mq.source");
1208+
connectorConfigProps.put(MQSourceConnector.CONFIG_NAME_MQ_MESSAGE_BODY_JMS, "true");
1209+
connectorConfigProps.put(MQSourceConnector.CONFIG_NAME_MQ_RECORD_BUILDER,
1210+
"com.ibm.eventstreams.connect.mqsource.builders.DefaultRecordBuilder");
1211+
connectorConfigProps.put(MQSourceConnector.CONFIG_NAME_MQ_MESSAGE_BODY_JMS, "true");
1212+
connectorConfigProps.put("value.converter", "org.apache.kafka.connect.converters.ByteArrayConverter");
1213+
connectTask.start(connectorConfigProps);
1214+
1215+
// Both invalid and valid messages are received
1216+
final List<Message> testMessages = Arrays.asList(
1217+
getJmsContext().createObjectMessage("Invalid message"), // Poison message
1218+
getJmsContext().createTextMessage("Text") // Valid
1219+
);
1220+
putAllMessagesToQueue(DEFAULT_SOURCE_QUEUE, testMessages);
1221+
1222+
final List<SourceRecord> processedRecords = connectTask.poll();
1223+
1224+
assertThat(processedRecords).hasSize(2);
1225+
1226+
assertThat(processedRecords.get(0).topic()).isEqualTo("__dlq.mq.source");
1227+
assertThat(processedRecords.get(1).topic()).isEqualTo("mytopic");
1228+
}
10091229
}

src/integration/java/com/ibm/eventstreams/connect/mqsource/builders/JsonRecordBuilderIT.java

Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@
2020
import static org.junit.Assert.assertNull;
2121
import static org.junit.Assert.assertThrows;
2222

23+
import java.util.HashMap;
2324
import java.util.List;
2425
import java.util.Map;
2526

@@ -131,4 +132,22 @@ public void buildFromJmsTestErrorTolerance() throws Exception {
131132
final SourceRecord record = builder.toSourceRecord(getJmsContext(), topic, isJMS, message);
132133
assertNull(record);
133134
}
135+
136+
@Test
137+
public void buildFromJmsTestErrorToleranceNone() throws Exception {
138+
// create MQ message
139+
final TextMessage message = getJmsContext().createTextMessage("Not a valid json string");
140+
141+
// use the builder to convert it to a Kafka record
142+
final JsonRecordBuilder builder = new JsonRecordBuilder();
143+
final HashMap<String, String> config = new HashMap<String, String>();
144+
config.put("errors.tolerance", "none");
145+
config.put("mq.message.body.jms", "true");
146+
config.put("mq.record.builder", "com.ibm.eventstreams.connect.mqsource.builders.JsonRecordBuilder");
147+
148+
builder.configure(config);
149+
assertThrows(DataException.class, () -> {
150+
builder.toSourceRecord(getJmsContext(), topic, isJMS, message);
151+
});
152+
}
134153
}

0 commit comments

Comments
 (0)