From 40943f27609ec3060a9baedadd041c2ec456e65a Mon Sep 17 00:00:00 2001 From: Thomas Bay Date: Mon, 19 May 2025 15:39:18 +0200 Subject: [PATCH] Retry failed bulk operations --- .../sink/Elasticsearch8AsyncWriter.java | 14 +++++++------- 1 file changed, 7 insertions(+), 7 deletions(-) diff --git a/flink-connector-elasticsearch8/src/main/java/org/apache/flink/connector/elasticsearch/sink/Elasticsearch8AsyncWriter.java b/flink-connector-elasticsearch8/src/main/java/org/apache/flink/connector/elasticsearch/sink/Elasticsearch8AsyncWriter.java index 1163bf26..e44cd719 100644 --- a/flink-connector-elasticsearch8/src/main/java/org/apache/flink/connector/elasticsearch/sink/Elasticsearch8AsyncWriter.java +++ b/flink-connector-elasticsearch8/src/main/java/org/apache/flink/connector/elasticsearch/sink/Elasticsearch8AsyncWriter.java @@ -144,15 +144,14 @@ private void handleFailedRequest( List requestEntries, Consumer> requestResult, Throwable error) { - LOG.warn( - "The BulkRequest of {} operation(s) has failed due to: {}", - requestEntries.size(), - error.getMessage()); - LOG.debug("The BulkRequest has failed", error); + LOG.debug("The BulkRequest has failed. Error: {}", error); numRecordsOutErrorsCounter.inc(requestEntries.size()); - if (isRetryable(error.getCause())) { + if (isRetryable(error.getCause() == null ? error : error.getCause())) { + LOG.info("The BulkRequest of {} operation(s) has failed due to: {}:{} - retried (cause {})", requestEntries.size(), error.getClass(), error.getMessage(), error.getCause()); requestResult.accept(requestEntries); + } else { + LOG.warn("The BulkRequest of {} operation(s) has failed due to: {}:{} - not retried (cause {})", requestEntries.size(), error.getClass(), error.getMessage(), error.getCause()); } } @@ -188,7 +187,8 @@ private void handleSuccessfulRequest( } private boolean isRetryable(Throwable error) { - return !ELASTICSEARCH_FATAL_EXCEPTION_CLASSIFIER.isFatal(error, getFatalExceptionCons()); + boolean trueIfExceptionNotThrownElseFalse = ELASTICSEARCH_FATAL_EXCEPTION_CLASSIFIER.isFatal(error, getFatalExceptionCons()); + return trueIfExceptionNotThrownElseFalse ; } @Override