1
+ /**
2
+ * Copyright 2025 IBM Corporation
3
+ *
4
+ * Licensed under the Apache License, Version 2.0 (the "License");
5
+ * you may not use this file except in compliance with the License.
6
+ * You may obtain a copy of the License at
7
+ *
8
+ * http://www.apache.org/licenses/LICENSE-2.0
9
+ *
10
+ * Unless required by applicable law or agreed to in writing, software
11
+ * distributed under the License is distributed on an "AS IS" BASIS,
12
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13
+ * See the License for the specific language governing permissions and
14
+ * limitations under the License.
15
+ */
1
16
package com .ibm .eventstreams .connect .mqsource .util ;
2
17
3
18
import static java .nio .charset .StandardCharsets .UTF_8 ;
17
32
import org .apache .kafka .connect .header .ConnectHeaders ;
18
33
import org .apache .kafka .connect .header .Headers ;
19
34
import org .apache .kafka .connect .runtime .ConnectorConfig ;
20
- import org .apache .kafka .connect .runtime .errors .DeadLetterQueueReporter ;
21
35
import org .apache .kafka .connect .runtime .errors .ToleranceType ;
22
36
import org .apache .kafka .connect .source .SourceRecord ;
23
37
import org .slf4j .Logger ;
@@ -35,12 +49,6 @@ public class ErrorHandler {
35
49
36
50
public static final String HEADER_PREFIX = "__connect.errors." ;
37
51
public static final String ERROR_HEADER_ORIG_TOPIC = HEADER_PREFIX + "topic" ;
38
- public static final String ERROR_HEADER_ORIG_PARTITION = HEADER_PREFIX + "partition" ;
39
- public static final String ERROR_HEADER_ORIG_OFFSET = HEADER_PREFIX + "offset" ;
40
- public static final String ERROR_HEADER_CONNECTOR_NAME = HEADER_PREFIX + "connector.name" ;
41
- public static final String ERROR_HEADER_TASK_ID = HEADER_PREFIX + "task.id" ;
42
- public static final String ERROR_HEADER_STAGE = HEADER_PREFIX + "stage" ;
43
- public static final String ERROR_HEADER_EXECUTING_CLASS = HEADER_PREFIX + "class.name" ;
44
52
public static final String ERROR_HEADER_EXCEPTION = HEADER_PREFIX + "exception.class.name" ;
45
53
public static final String ERROR_HEADER_EXCEPTION_MESSAGE = HEADER_PREFIX + "exception.message" ;
46
54
public static final String ERROR_HEADER_EXCEPTION_STACK_TRACE = HEADER_PREFIX + "exception.stacktrace" ;
@@ -113,7 +121,6 @@ private void initializeErrorTolerance(final Map<String, String> props) {
113
121
dlqTopic = props .get (MQSourceConnector .DLQ_TOPIC_NAME_CONFIG );
114
122
if (dlqTopic != null && !dlqTopic .isEmpty ()) {
115
123
dlqTopic = dlqTopic .trim ();
116
- // TODO: Check if DLQ topic exists
117
124
}
118
125
119
126
queueName = props .get (MQSourceConnector .CONFIG_NAME_MQ_QUEUE );
@@ -282,9 +289,9 @@ private Headers createErrorHeaders(final Message message, final String originalT
282
289
}
283
290
284
291
// Basic error information
285
- headers .addString (DeadLetterQueueReporter . ERROR_HEADER_ORIG_TOPIC , originalTopic );
286
- headers .addString (DeadLetterQueueReporter . ERROR_HEADER_EXECUTING_CLASS , exception .getClass ().getName ());
287
- headers .addString (DeadLetterQueueReporter . ERROR_HEADER_EXCEPTION_MESSAGE , exception .getMessage ());
292
+ headers .addString (ERROR_HEADER_ORIG_TOPIC , originalTopic );
293
+ headers .addString (ERROR_HEADER_EXCEPTION , exception .getClass ().getName ());
294
+ headers .addString (ERROR_HEADER_EXCEPTION_MESSAGE , exception .getMessage ());
288
295
289
296
try {
290
297
headers .addString (ERROR_HEADER_JMS_MESSAGE_ID , message .getJMSMessageID ());
@@ -298,14 +305,14 @@ private Headers createErrorHeaders(final Message message, final String originalT
298
305
299
306
// Add cause if available
300
307
if (exception .getCause () != null ) {
301
- headers .addString (ERROR_HEADER_EXCEPTION_CAUSE_MESSAGE , exception .getCause ().getMessage ());
302
308
headers .addString (ERROR_HEADER_EXCEPTION_CAUSE_CLASS , exception .getCause ().getClass ().getName ());
309
+ headers .addString (ERROR_HEADER_EXCEPTION_CAUSE_MESSAGE , exception .getCause ().getMessage ());
303
310
}
304
311
305
312
// Add first few lines of stack trace (full stack trace might be too large)
306
313
final String stackTrace = getStackTrace (exception );
307
314
if (stackTrace != null ) {
308
- headers .addString (DeadLetterQueueReporter . ERROR_HEADER_EXCEPTION_STACK_TRACE , stackTrace );
315
+ headers .addString (ERROR_HEADER_EXCEPTION_STACK_TRACE , stackTrace );
309
316
}
310
317
311
318
return headers ;
0 commit comments