|
43 | 43 | import java.util.List;
|
44 | 44 | import java.util.Map;
|
45 | 45 | import java.util.stream.Collectors;
|
46 |
| -import java.util.stream.Stream; |
47 | 46 |
|
48 | 47 | import javax.jms.BytesMessage;
|
49 | 48 | import javax.jms.JMSException;
|
@@ -1006,4 +1005,225 @@ public void shouldContinueProcessingAfterUnhandleableDlqError() throws Exception
|
1006 | 1005 | assertThat(processedRecords.get(2).topic()).isEqualTo("__dlq.mq.source");
|
1007 | 1006 | assertThat(processedRecords.get(3).topic()).isEqualTo("mytopic");
|
1008 | 1007 | }
|
| 1008 | + |
| 1009 | + @Test |
| 1010 | + public void verifyHeadersWithErrorTolerance() throws Exception { |
| 1011 | + connectTask = getSourceTaskWithEmptyKafkaOffset(); |
| 1012 | + |
| 1013 | + final Map<String, String> connectorConfigProps = createDefaultConnectorProperties(); |
| 1014 | + connectorConfigProps.put(MQSourceConnector.CONFIG_NAME_TOPIC, "mytopic"); |
| 1015 | + connectorConfigProps.put(ConnectorConfig.ERRORS_TOLERANCE_CONFIG, "all"); |
| 1016 | + connectorConfigProps.put(MQSourceConnector.ERRORS_DEAD_LETTER_QUEUE_TOPIC_NAME_CONFIG, "__dlq.mq.source"); |
| 1017 | + connectorConfigProps.put(MQSourceConnector.CONFIG_NAME_MQ_MESSAGE_BODY_JMS, "true"); |
| 1018 | + connectorConfigProps.put(MQSourceConnector.CONFIG_NAME_MQ_RECORD_BUILDER, |
| 1019 | + "com.ibm.eventstreams.connect.mqsource.builders.JsonRecordBuilder"); |
| 1020 | + connectorConfigProps.put(MQSourceConnector.CONFIG_NAME_MQ_JMS_PROPERTY_COPY_TO_KAFKA_HEADER, "true"); |
| 1021 | + |
| 1022 | + connectTask.start(connectorConfigProps); |
| 1023 | + |
| 1024 | + final TextMessage message = getJmsContext().createTextMessage("Invalid JSON message"); |
| 1025 | + message.setStringProperty("teststring", "myvalue"); |
| 1026 | + message.setIntProperty("volume", 11); |
| 1027 | + message.setDoubleProperty("decimalmeaning", 42.0); |
| 1028 | + |
| 1029 | + // Both invalid and valid messages are received |
| 1030 | + final List<Message> testMessages = Arrays.asList( |
| 1031 | + message, // Poison message |
| 1032 | + getJmsContext().createTextMessage("{ \"i\": 0 }") // Valid message |
| 1033 | + ); |
| 1034 | + putAllMessagesToQueue(DEFAULT_SOURCE_QUEUE, testMessages); |
| 1035 | + |
| 1036 | + final List<SourceRecord> processedRecords = connectTask.poll(); |
| 1037 | + |
| 1038 | + assertThat(processedRecords).hasSize(2); |
| 1039 | + |
| 1040 | + assertThat(processedRecords.get(0).topic()).isEqualTo("__dlq.mq.source"); |
| 1041 | + |
| 1042 | + // Actual headers |
| 1043 | + assertThat(processedRecords.get(0).headers().lastWithName("teststring").value()).isEqualTo("myvalue"); |
| 1044 | + assertThat(processedRecords.get(0).headers().lastWithName("volume").value()).isEqualTo("11"); |
| 1045 | + assertThat(processedRecords.get(0).headers().lastWithName("decimalmeaning").value()).isEqualTo("42.0"); |
| 1046 | + |
| 1047 | + // Expected DLQ Headers |
| 1048 | + // ConnectHeaders(headers=[ConnectHeader(key=__connect.errors.topic, |
| 1049 | + // value=mytopic, schema=Schema{STRING}), |
| 1050 | + // ConnectHeader(key=__connect.errors.class.name, |
| 1051 | + // value=org.apache.kafka.connect.errors.DataException, schema=Schema{STRING}), |
| 1052 | + // ConnectHeader(key=__connect.errors.exception.message, value=Converting byte[] |
| 1053 | + // to Kafka Connect data failed due to serialization error: , |
| 1054 | + // schema=Schema{STRING}), ConnectHeader(key=__connect.errors.timestamp, |
| 1055 | + // value=1749036171558, schema=Schema{STRING}), |
| 1056 | + // ConnectHeader(key=__connect.errors.cause.message, |
| 1057 | + // value=com.fasterxml.jackson.core.JsonParseException: Unrecognized token |
| 1058 | + // 'Invalid': was expecting (JSON String, Number, Array, Object or token 'null', |
| 1059 | + // 'true' or 'false') |
| 1060 | + // at [Source: REDACTED (`StreamReadFeature.INCLUDE_SOURCE_IN_LOCATION` |
| 1061 | + // disabled); line: 1, column: 9], schema=Schema{STRING}), |
| 1062 | + // ConnectHeader(key=__connect.errors.cause.class, |
| 1063 | + // value=org.apache.kafka.common.errors.SerializationException, |
| 1064 | + // schema=Schema{STRING}), |
| 1065 | + // ConnectHeader(key=__connect.errors.exception.stacktrace, |
| 1066 | + // value=org.apache.kafka.connect.errors.DataException: Converting byte[] to |
| 1067 | + // Kafka Connect data failed due to serialization error: |
| 1068 | + // at |
| 1069 | + // org.apache.kafka.connect.json.JsonConverter.toConnectData(JsonConverter.java:333) |
| 1070 | + // at |
| 1071 | + // com.ibm.eventstreams.connect.mqsource.builders.JsonRecordBuilder.getValue(JsonRecordBuilder.java:81) |
| 1072 | + // at |
| 1073 | + // com.ibm.eventstreams.connect.mqsource.builders.BaseRecordBuilder.toSourceRecord(BaseRecordBuilder.java:238) |
| 1074 | + // at com.ibm.eventstreams.connect.mqsource.JMSWorker.toSourceRecord(JMSWork... |
| 1075 | + // [truncated], schema=Schema{STRING})]) |
| 1076 | + assertThat(processedRecords.get(0).headers().lastWithName("__connect.errors.topic").value()) |
| 1077 | + .isEqualTo("mytopic"); |
| 1078 | + assertThat(processedRecords.get(0).headers().lastWithName("__connect.errors.class.name").value()) |
| 1079 | + .isEqualTo("org.apache.kafka.connect.errors.DataException"); |
| 1080 | + assertThat(processedRecords.get(0).headers().lastWithName("__connect.errors.exception.message").value()) |
| 1081 | + .isEqualTo("Converting byte[] to Kafka Connect data failed due to serialization error: "); |
| 1082 | + assertThat(processedRecords.get(0).headers().lastWithName("__connect.errors.timestamp").value().toString() |
| 1083 | + .isEmpty()).isFalse(); |
| 1084 | + assertThat(processedRecords.get(0).headers().lastWithName("__connect.errors.cause.message").value().toString()) |
| 1085 | + .contains( |
| 1086 | + "com.fasterxml.jackson.core.JsonParseException: Unrecognized token 'Invalid': was expecting (JSON String, Number, Array, Object or token 'null', 'true' or 'false')"); |
| 1087 | + assertThat(processedRecords.get(0).headers().lastWithName("__connect.errors.cause.class").value()) |
| 1088 | + .isEqualTo("org.apache.kafka.common.errors.SerializationException"); |
| 1089 | + assertThat(processedRecords.get(0).headers().lastWithName("__connect.errors.exception.stacktrace").value() |
| 1090 | + .toString().contains("com.ibm.eventstreams.connect.mqsource.JMSWorker.toSourceRecord")).isTrue(); |
| 1091 | + } |
| 1092 | + |
| 1093 | + @Test |
| 1094 | + public void verifyLoggingWarningWithErrorTolerance() throws Exception { |
| 1095 | + connectTask = getSourceTaskWithEmptyKafkaOffset(); |
| 1096 | + |
| 1097 | + final Map<String, String> connectorConfigProps = createDefaultConnectorProperties(); |
| 1098 | + connectorConfigProps.put(MQSourceConnector.CONFIG_NAME_TOPIC, "mytopic"); |
| 1099 | + connectorConfigProps.put(ConnectorConfig.ERRORS_TOLERANCE_CONFIG, "all"); |
| 1100 | + connectorConfigProps.put(ConnectorConfig.ERRORS_LOG_ENABLE_CONFIG, "false"); // default; Do not log errors |
| 1101 | + connectorConfigProps.put(ConnectorConfig.ERRORS_LOG_INCLUDE_MESSAGES_CONFIG, "false"); // default; Do not log errors with message |
| 1102 | + connectorConfigProps.put(MQSourceConnector.ERRORS_DEAD_LETTER_QUEUE_TOPIC_NAME_CONFIG, "__dlq.mq.source"); |
| 1103 | + connectorConfigProps.put(MQSourceConnector.CONFIG_NAME_MQ_MESSAGE_BODY_JMS, "true"); |
| 1104 | + connectorConfigProps.put(MQSourceConnector.CONFIG_NAME_MQ_RECORD_BUILDER, |
| 1105 | + "com.ibm.eventstreams.connect.mqsource.builders.JsonRecordBuilder"); |
| 1106 | + |
| 1107 | + connectTask.start(connectorConfigProps); |
| 1108 | + |
| 1109 | + // Both invalid and valid messages are received |
| 1110 | + final List<Message> testMessages = Arrays.asList( |
| 1111 | + getJmsContext().createTextMessage("Invalid JSON message"), // Poison message |
| 1112 | + getJmsContext().createTextMessage("{ \"i\": 0 }"), // Valid message |
| 1113 | + getJmsContext().createTextMessage("{ \"i\": 1 }"), // Valid message |
| 1114 | + getJmsContext().createTextMessage("{ \"i\": 2 }") // Valid message |
| 1115 | + ); |
| 1116 | + putAllMessagesToQueue(DEFAULT_SOURCE_QUEUE, testMessages); |
| 1117 | + |
| 1118 | + final List<SourceRecord> processedRecords = connectTask.poll(); |
| 1119 | + |
| 1120 | + assertThat(processedRecords).hasSize(4); |
| 1121 | + |
| 1122 | + assertThat(processedRecords.get(0).topic()).isEqualTo("__dlq.mq.source"); |
| 1123 | + assertThat(processedRecords.get(1).topic()).isEqualTo("mytopic"); |
| 1124 | + assertThat(processedRecords.get(2).topic()).isEqualTo("mytopic"); |
| 1125 | + assertThat(processedRecords.get(3).topic()).isEqualTo("mytopic"); |
| 1126 | + } |
| 1127 | + |
| 1128 | + @Test |
| 1129 | + public void verifyLoggingErrorsWithErrorTolerance() throws Exception { |
| 1130 | + connectTask = getSourceTaskWithEmptyKafkaOffset(); |
| 1131 | + |
| 1132 | + final Map<String, String> connectorConfigProps = createDefaultConnectorProperties(); |
| 1133 | + connectorConfigProps.put(MQSourceConnector.CONFIG_NAME_TOPIC, "mytopic"); |
| 1134 | + connectorConfigProps.put(ConnectorConfig.ERRORS_TOLERANCE_CONFIG, "all"); |
| 1135 | + connectorConfigProps.put(ConnectorConfig.ERRORS_LOG_ENABLE_CONFIG, "true"); // Log errors enabled |
| 1136 | + connectorConfigProps.put(ConnectorConfig.ERRORS_LOG_INCLUDE_MESSAGES_CONFIG, "false"); |
| 1137 | + connectorConfigProps.put(MQSourceConnector.ERRORS_DEAD_LETTER_QUEUE_TOPIC_NAME_CONFIG, "__dlq.mq.source"); |
| 1138 | + connectorConfigProps.put(MQSourceConnector.CONFIG_NAME_MQ_MESSAGE_BODY_JMS, "true"); |
| 1139 | + connectorConfigProps.put(MQSourceConnector.CONFIG_NAME_MQ_RECORD_BUILDER, |
| 1140 | + "com.ibm.eventstreams.connect.mqsource.builders.JsonRecordBuilder"); |
| 1141 | + |
| 1142 | + connectTask.start(connectorConfigProps); |
| 1143 | + |
| 1144 | + // Both invalid and valid messages are received |
| 1145 | + final List<Message> testMessages = Arrays.asList( |
| 1146 | + getJmsContext().createTextMessage("Invalid JSON message"), // Poison message |
| 1147 | + getJmsContext().createTextMessage("{ \"i\": 0 }"), // Valid message |
| 1148 | + getJmsContext().createTextMessage("{ \"i\": 1 }"), // Valid message |
| 1149 | + getJmsContext().createTextMessage("{ \"i\": 2 }") // Valid message |
| 1150 | + ); |
| 1151 | + putAllMessagesToQueue(DEFAULT_SOURCE_QUEUE, testMessages); |
| 1152 | + |
| 1153 | + final List<SourceRecord> processedRecords = connectTask.poll(); |
| 1154 | + |
| 1155 | + assertThat(processedRecords).hasSize(4); |
| 1156 | + |
| 1157 | + assertThat(processedRecords.get(0).topic()).isEqualTo("__dlq.mq.source"); |
| 1158 | + assertThat(processedRecords.get(1).topic()).isEqualTo("mytopic"); |
| 1159 | + assertThat(processedRecords.get(2).topic()).isEqualTo("mytopic"); |
| 1160 | + assertThat(processedRecords.get(3).topic()).isEqualTo("mytopic"); |
| 1161 | + } |
| 1162 | + |
| 1163 | + @Test |
| 1164 | + public void verifyLoggingErrorsWithMessage() throws Exception { |
| 1165 | + connectTask = getSourceTaskWithEmptyKafkaOffset(); |
| 1166 | + |
| 1167 | + final Map<String, String> connectorConfigProps = createDefaultConnectorProperties(); |
| 1168 | + connectorConfigProps.put(MQSourceConnector.CONFIG_NAME_TOPIC, "mytopic"); |
| 1169 | + connectorConfigProps.put(ConnectorConfig.ERRORS_TOLERANCE_CONFIG, "all"); |
| 1170 | + connectorConfigProps.put(ConnectorConfig.ERRORS_LOG_ENABLE_CONFIG, "true"); // Log errors |
| 1171 | + connectorConfigProps.put(ConnectorConfig.ERRORS_LOG_INCLUDE_MESSAGES_CONFIG, "true"); // Log errors with message |
| 1172 | + connectorConfigProps.put(MQSourceConnector.ERRORS_DEAD_LETTER_QUEUE_TOPIC_NAME_CONFIG, "__dlq.mq.source"); |
| 1173 | + connectorConfigProps.put(MQSourceConnector.CONFIG_NAME_MQ_MESSAGE_BODY_JMS, "true"); |
| 1174 | + connectorConfigProps.put(MQSourceConnector.CONFIG_NAME_MQ_RECORD_BUILDER, |
| 1175 | + "com.ibm.eventstreams.connect.mqsource.builders.JsonRecordBuilder"); |
| 1176 | + |
| 1177 | + connectTask.start(connectorConfigProps); |
| 1178 | + |
| 1179 | + // Both invalid and valid messages are received |
| 1180 | + final List<Message> testMessages = Arrays.asList( |
| 1181 | + getJmsContext().createTextMessage("Invalid JSON message"), // Poison message |
| 1182 | + getJmsContext().createTextMessage("{ \"i\": 0 }"), // Valid message |
| 1183 | + getJmsContext().createTextMessage("{ \"i\": 1 }"), // Valid message |
| 1184 | + getJmsContext().createTextMessage("{ \"i\": 2 }") // Valid message |
| 1185 | + ); |
| 1186 | + putAllMessagesToQueue(DEFAULT_SOURCE_QUEUE, testMessages); |
| 1187 | + |
| 1188 | + final List<SourceRecord> processedRecords = connectTask.poll(); |
| 1189 | + |
| 1190 | + assertThat(processedRecords).hasSize(4); |
| 1191 | + |
| 1192 | + assertThat(processedRecords.get(0).topic()).isEqualTo("__dlq.mq.source"); |
| 1193 | + assertThat(processedRecords.get(1).topic()).isEqualTo("mytopic"); |
| 1194 | + assertThat(processedRecords.get(2).topic()).isEqualTo("mytopic"); |
| 1195 | + assertThat(processedRecords.get(3).topic()).isEqualTo("mytopic"); |
| 1196 | + } |
| 1197 | + |
| 1198 | + @Test |
| 1199 | + public void verifyLoggingErrorsWithMessageHavingDefaultRecordBuilder() throws Exception { |
| 1200 | + connectTask = getSourceTaskWithEmptyKafkaOffset(); |
| 1201 | + |
| 1202 | + final Map<String, String> connectorConfigProps = createDefaultConnectorProperties(); |
| 1203 | + connectorConfigProps.put(MQSourceConnector.CONFIG_NAME_TOPIC, "mytopic"); |
| 1204 | + connectorConfigProps.put(ConnectorConfig.ERRORS_TOLERANCE_CONFIG, "all"); |
| 1205 | + connectorConfigProps.put(ConnectorConfig.ERRORS_LOG_ENABLE_CONFIG, "true"); // Log errors |
| 1206 | + connectorConfigProps.put(ConnectorConfig.ERRORS_LOG_INCLUDE_MESSAGES_CONFIG, "true"); // Log errors with message |
| 1207 | + connectorConfigProps.put(MQSourceConnector.ERRORS_DEAD_LETTER_QUEUE_TOPIC_NAME_CONFIG, "__dlq.mq.source"); |
| 1208 | + connectorConfigProps.put(MQSourceConnector.CONFIG_NAME_MQ_MESSAGE_BODY_JMS, "true"); |
| 1209 | + connectorConfigProps.put(MQSourceConnector.CONFIG_NAME_MQ_RECORD_BUILDER, |
| 1210 | + "com.ibm.eventstreams.connect.mqsource.builders.DefaultRecordBuilder"); |
| 1211 | + connectorConfigProps.put(MQSourceConnector.CONFIG_NAME_MQ_MESSAGE_BODY_JMS, "true"); |
| 1212 | + connectorConfigProps.put("value.converter", "org.apache.kafka.connect.converters.ByteArrayConverter"); |
| 1213 | + connectTask.start(connectorConfigProps); |
| 1214 | + |
| 1215 | + // Both invalid and valid messages are received |
| 1216 | + final List<Message> testMessages = Arrays.asList( |
| 1217 | + getJmsContext().createObjectMessage("Invalid message"), // Poison message |
| 1218 | + getJmsContext().createTextMessage("Text") // Valid |
| 1219 | + ); |
| 1220 | + putAllMessagesToQueue(DEFAULT_SOURCE_QUEUE, testMessages); |
| 1221 | + |
| 1222 | + final List<SourceRecord> processedRecords = connectTask.poll(); |
| 1223 | + |
| 1224 | + assertThat(processedRecords).hasSize(2); |
| 1225 | + |
| 1226 | + assertThat(processedRecords.get(0).topic()).isEqualTo("__dlq.mq.source"); |
| 1227 | + assertThat(processedRecords.get(1).topic()).isEqualTo("mytopic"); |
| 1228 | + } |
1009 | 1229 | }
|
0 commit comments