Skip to content

Commit da2ef1f

Browse files
liuml07reswqa
authored andcommitted
[FLINK-35504] Improve Elasticsearch 8 connector observability
1 parent 50327f8 commit da2ef1f

File tree

1 file changed

+20
-3
lines changed

1 file changed

+20
-3
lines changed

flink-connector-elasticsearch8/src/main/java/org/apache/flink/connector/elasticsearch/sink/Elasticsearch8AsyncWriter.java

Lines changed: 20 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -62,6 +62,13 @@ public class Elasticsearch8AsyncWriter<InputT> extends AsyncSinkWriter<InputT, O
6262
private boolean close = false;
6363

6464
private final Counter numRecordsOutErrorsCounter;
65+
/**
66+
* A counter to track number of records that are returned by Elasticsearch as failed and then
67+
* retried by this writer.
68+
*/
69+
private final Counter numRecordsSendPartialFailureCounter;
70+
/** A counter to track the number of bulk requests that are sent to Elasticsearch. */
71+
private final Counter numRequestSubmittedCounter;
6572

6673
private static final FatalExceptionClassifier ELASTICSEARCH_FATAL_EXCEPTION_CLASSIFIER =
6774
FatalExceptionClassifier.createChain(
@@ -103,11 +110,15 @@ public Elasticsearch8AsyncWriter(
103110
checkNotNull(metricGroup);
104111

105112
this.numRecordsOutErrorsCounter = metricGroup.getNumRecordsOutErrorsCounter();
113+
this.numRecordsSendPartialFailureCounter =
114+
metricGroup.counter("numRecordsSendPartialFailure");
115+
this.numRequestSubmittedCounter = metricGroup.counter("numRequestSubmitted");
106116
}
107117

108118
@Override
109119
protected void submitRequestEntries(
110120
List<Operation> requestEntries, Consumer<List<Operation>> requestResult) {
121+
numRequestSubmittedCounter.inc();
111122
LOG.debug("submitRequestEntries with {} items", requestEntries.size());
112123

113124
BulkRequest.Builder br = new BulkRequest.Builder();
@@ -133,7 +144,11 @@ private void handleFailedRequest(
133144
List<Operation> requestEntries,
134145
Consumer<List<Operation>> requestResult,
135146
Throwable error) {
136-
LOG.debug("The BulkRequest of {} operation(s) has failed.", requestEntries.size());
147+
LOG.warn(
148+
"The BulkRequest of {} operation(s) has failed due to: {}",
149+
requestEntries.size(),
150+
error.getMessage());
151+
LOG.debug("The BulkRequest has failed", error);
137152
numRecordsOutErrorsCounter.inc(requestEntries.size());
138153

139154
if (isRetryable(error.getCause())) {
@@ -145,15 +160,17 @@ private void handlePartiallyFailedRequest(
145160
List<Operation> requestEntries,
146161
Consumer<List<Operation>> requestResult,
147162
BulkResponse response) {
163+
LOG.debug("The BulkRequest has failed partially. Response: {}", response);
148164
ArrayList<Operation> failedItems = new ArrayList<>();
149165
for (int i = 0; i < response.items().size(); i++) {
150166
if (response.items().get(i).error() != null) {
151-
numRecordsOutErrorsCounter.inc();
152167
failedItems.add(requestEntries.get(i));
153168
}
154169
}
155170

156-
LOG.debug(
171+
numRecordsOutErrorsCounter.inc(failedItems.size());
172+
numRecordsSendPartialFailureCounter.inc(failedItems.size());
173+
LOG.info(
157174
"The BulkRequest with {} operation(s) has {} failure(s). It took {}ms",
158175
requestEntries.size(),
159176
failedItems.size(),

0 commit comments

Comments
 (0)