17
17
import org .apache .kafka .connect .header .ConnectHeaders ;
18
18
import org .apache .kafka .connect .header .Headers ;
19
19
import org .apache .kafka .connect .runtime .ConnectorConfig ;
20
- import org .apache .kafka .connect .runtime .errors .DeadLetterQueueReporter ;
21
20
import org .apache .kafka .connect .runtime .errors .ToleranceType ;
22
21
import org .apache .kafka .connect .source .SourceRecord ;
23
22
import org .slf4j .Logger ;
@@ -35,12 +34,6 @@ public class ErrorHandler {
35
34
36
35
public static final String HEADER_PREFIX = "__connect.errors." ;
37
36
public static final String ERROR_HEADER_ORIG_TOPIC = HEADER_PREFIX + "topic" ;
38
- public static final String ERROR_HEADER_ORIG_PARTITION = HEADER_PREFIX + "partition" ;
39
- public static final String ERROR_HEADER_ORIG_OFFSET = HEADER_PREFIX + "offset" ;
40
- public static final String ERROR_HEADER_CONNECTOR_NAME = HEADER_PREFIX + "connector.name" ;
41
- public static final String ERROR_HEADER_TASK_ID = HEADER_PREFIX + "task.id" ;
42
- public static final String ERROR_HEADER_STAGE = HEADER_PREFIX + "stage" ;
43
- public static final String ERROR_HEADER_EXECUTING_CLASS = HEADER_PREFIX + "class.name" ;
44
37
public static final String ERROR_HEADER_EXCEPTION = HEADER_PREFIX + "exception.class.name" ;
45
38
public static final String ERROR_HEADER_EXCEPTION_MESSAGE = HEADER_PREFIX + "exception.message" ;
46
39
public static final String ERROR_HEADER_EXCEPTION_STACK_TRACE = HEADER_PREFIX + "exception.stacktrace" ;
@@ -282,9 +275,9 @@ private Headers createErrorHeaders(final Message message, final String originalT
282
275
}
283
276
284
277
// Basic error information
285
- headers .addString (DeadLetterQueueReporter . ERROR_HEADER_ORIG_TOPIC , originalTopic );
286
- headers .addString (DeadLetterQueueReporter . ERROR_HEADER_EXECUTING_CLASS , exception .getClass ().getName ());
287
- headers .addString (DeadLetterQueueReporter . ERROR_HEADER_EXCEPTION_MESSAGE , exception .getMessage ());
278
+ headers .addString (ERROR_HEADER_ORIG_TOPIC , originalTopic );
279
+ headers .addString (ERROR_HEADER_EXCEPTION , exception .getClass ().getName ());
280
+ headers .addString (ERROR_HEADER_EXCEPTION_MESSAGE , exception .getMessage ());
288
281
289
282
try {
290
283
headers .addString (ERROR_HEADER_JMS_MESSAGE_ID , message .getJMSMessageID ());
@@ -298,14 +291,14 @@ private Headers createErrorHeaders(final Message message, final String originalT
298
291
299
292
// Add cause if available
300
293
if (exception .getCause () != null ) {
301
- headers .addString (ERROR_HEADER_EXCEPTION_CAUSE_MESSAGE , exception .getCause ().getMessage ());
302
294
headers .addString (ERROR_HEADER_EXCEPTION_CAUSE_CLASS , exception .getCause ().getClass ().getName ());
295
+ headers .addString (ERROR_HEADER_EXCEPTION_CAUSE_MESSAGE , exception .getCause ().getMessage ());
303
296
}
304
297
305
298
// Add first few lines of stack trace (full stack trace might be too large)
306
299
final String stackTrace = getStackTrace (exception );
307
300
if (stackTrace != null ) {
308
- headers .addString (DeadLetterQueueReporter . ERROR_HEADER_EXCEPTION_STACK_TRACE , stackTrace );
301
+ headers .addString (ERROR_HEADER_EXCEPTION_STACK_TRACE , stackTrace );
309
302
}
310
303
311
304
return headers ;
0 commit comments