diff --git a/flink-connector-elasticsearch8/src/main/java/org/apache/flink/connector/elasticsearch/sink/Elasticsearch8AsyncSinkBuilder.java b/flink-connector-elasticsearch8/src/main/java/org/apache/flink/connector/elasticsearch/sink/Elasticsearch8AsyncSinkBuilder.java index 31e81200..9244bf3f 100644 --- a/flink-connector-elasticsearch8/src/main/java/org/apache/flink/connector/elasticsearch/sink/Elasticsearch8AsyncSinkBuilder.java +++ b/flink-connector-elasticsearch8/src/main/java/org/apache/flink/connector/elasticsearch/sink/Elasticsearch8AsyncSinkBuilder.java @@ -78,7 +78,7 @@ public class Elasticsearch8AsyncSinkBuilder * The element converter that will be called on every stream element to be processed and * buffered. */ - private ElementConverter elementConverter; + private ElementConverter> elementConverter; private SerializableSupplier sslContextSupplier; @@ -202,7 +202,7 @@ public Elasticsearch8AsyncSinkBuilder setPassword(String password) { * @return {@code Elasticsearch8AsyncSinkBuilder} */ public Elasticsearch8AsyncSinkBuilder setElementConverter( - ElementConverter elementConverter) { + ElementConverter> elementConverter) { checkNotNull(elementConverter); this.elementConverter = elementConverter; return this; @@ -232,7 +232,7 @@ public Elasticsearch8AsyncSink build() { } private OperationConverter buildOperationConverter( - ElementConverter converter) { + ElementConverter> converter) { return converter != null ? new OperationConverter<>(converter) : null; } @@ -244,9 +244,9 @@ private NetworkConfig buildNetworkConfig() { /** A wrapper that evolves the Operation, since a BulkOperationVariant is not Serializable. */ public static class OperationConverter implements ElementConverter { - private final ElementConverter converter; + private final ElementConverter> converter; - public OperationConverter(ElementConverter converter) { + public OperationConverter(ElementConverter> converter) { this.converter = converter; } diff --git a/flink-connector-elasticsearch8/src/main/java/org/apache/flink/connector/elasticsearch/sink/Elasticsearch8AsyncWriter.java b/flink-connector-elasticsearch8/src/main/java/org/apache/flink/connector/elasticsearch/sink/Elasticsearch8AsyncWriter.java index 1163bf26..b5502689 100644 --- a/flink-connector-elasticsearch8/src/main/java/org/apache/flink/connector/elasticsearch/sink/Elasticsearch8AsyncWriter.java +++ b/flink-connector-elasticsearch8/src/main/java/org/apache/flink/connector/elasticsearch/sink/Elasticsearch8AsyncWriter.java @@ -123,7 +123,12 @@ protected void submitRequestEntries( BulkRequest.Builder br = new BulkRequest.Builder(); for (Operation operation : requestEntries) { - br.operations(new BulkOperation(operation.getBulkOperationVariant())); + operation + .getBulkOperationVariant() + .forEach( + bulkOperationVariant -> { + br.operations(new BulkOperation(bulkOperationVariant)); + }); } esClient.bulk(br.build()) diff --git a/flink-connector-elasticsearch8/src/main/java/org/apache/flink/connector/elasticsearch/sink/Operation.java b/flink-connector-elasticsearch8/src/main/java/org/apache/flink/connector/elasticsearch/sink/Operation.java index 2cbdaa69..06243a04 100644 --- a/flink-connector-elasticsearch8/src/main/java/org/apache/flink/connector/elasticsearch/sink/Operation.java +++ b/flink-connector-elasticsearch8/src/main/java/org/apache/flink/connector/elasticsearch/sink/Operation.java @@ -24,29 +24,30 @@ import co.elastic.clients.elasticsearch.core.bulk.BulkOperationVariant; import java.io.Serializable; +import java.util.List; import java.util.Objects; /** A single stream element which contains a BulkOperationVariant. */ public class Operation implements Serializable { private static final long serialVersionUID = 1L; - private final BulkOperationVariant bulkOperationVariant; + private final List bulkOperationVariants; - public Operation(BulkOperationVariant bulkOperation) { - this.bulkOperationVariant = bulkOperation; + public Operation(List bulkOperations) { + this.bulkOperationVariants = bulkOperations; } - public BulkOperationVariant getBulkOperationVariant() { - return bulkOperationVariant; + public List getBulkOperationVariant() { + return bulkOperationVariants; } @Override public int hashCode() { - return Objects.hash(bulkOperationVariant); + return Objects.hash(bulkOperationVariants); } @Override public String toString() { - return "Operation{" + "bulkOperationVariant=" + bulkOperationVariant + '}'; + return "Operation{" + "bulkOperationVariant=" + bulkOperationVariants + '}'; } } diff --git a/flink-connector-elasticsearch8/src/test/java/org/apache/flink/connector/elasticsearch/sink/Elasticsearch8AsyncSinkBuilderTest.java b/flink-connector-elasticsearch8/src/test/java/org/apache/flink/connector/elasticsearch/sink/Elasticsearch8AsyncSinkBuilderTest.java index 33b453a2..bb9fd87a 100644 --- a/flink-connector-elasticsearch8/src/test/java/org/apache/flink/connector/elasticsearch/sink/Elasticsearch8AsyncSinkBuilderTest.java +++ b/flink-connector-elasticsearch8/src/test/java/org/apache/flink/connector/elasticsearch/sink/Elasticsearch8AsyncSinkBuilderTest.java @@ -25,6 +25,8 @@ import org.apache.http.HttpHost; import org.junit.jupiter.api.Test; +import java.util.Arrays; + import static org.assertj.core.api.AssertionsForClassTypes.assertThatThrownBy; /** Tests for {@link Elasticsearch8AsyncSinkBuilder}. */ @@ -46,10 +48,11 @@ void testThrowExceptionIfHostsAreNotProvided() { Elasticsearch8AsyncSinkBuilder.builder() .setElementConverter( (element, ctx) -> - new DeleteOperation.Builder() - .id("test") - .index("test") - .build()) + Arrays.asList( + new DeleteOperation.Builder() + .id("test") + .index("test") + .build())) .build()) .isInstanceOf(NullPointerException.class); } @@ -62,10 +65,11 @@ void testThrowExceptionIfHostsIsNull() { .setHosts(null) .setElementConverter( (element, ctx) -> - new DeleteOperation.Builder() - .id("test") - .index("test") - .build()) + Arrays.asList( + new DeleteOperation.Builder() + .id("test") + .index("test") + .build())) .build()) .isInstanceOf(NullPointerException.class); } diff --git a/flink-connector-elasticsearch8/src/test/java/org/apache/flink/connector/elasticsearch/sink/Elasticsearch8AsyncSinkITCase.java b/flink-connector-elasticsearch8/src/test/java/org/apache/flink/connector/elasticsearch/sink/Elasticsearch8AsyncSinkITCase.java index 22defb14..ccbff114 100644 --- a/flink-connector-elasticsearch8/src/test/java/org/apache/flink/connector/elasticsearch/sink/Elasticsearch8AsyncSinkITCase.java +++ b/flink-connector-elasticsearch8/src/test/java/org/apache/flink/connector/elasticsearch/sink/Elasticsearch8AsyncSinkITCase.java @@ -35,6 +35,7 @@ import org.testcontainers.junit.jupiter.Testcontainers; import java.io.IOException; +import java.util.Arrays; import static org.assertj.core.api.AssertionsForClassTypes.assertThat; @@ -74,11 +75,12 @@ public void testWriteToElasticsearch() throws Exception { ES_CONTAINER.getFirstMappedPort())) .setElementConverter( (element, ctx) -> - new IndexOperation.Builder<>() - .index(index) - .id(element.getId()) - .document(element) - .build()) + Arrays.asList( + new IndexOperation.Builder<>() + .index(index) + .id(element.getId()) + .document(element) + .build())) .build(); env.fromElements("first", "second", "third", "fourth", "fifth") @@ -110,11 +112,12 @@ public void testRecovery() throws Exception { ES_CONTAINER.getFirstMappedPort())) .setElementConverter( (element, ctx) -> - new IndexOperation.Builder<>() - .index(index) - .id(element.getId()) - .document(element) - .build()) + Arrays.asList( + new IndexOperation.Builder<>() + .index(index) + .id(element.getId()) + .document(element) + .build())) .build(); env.fromElements("first", "second", "third", "fourth", "fifth") diff --git a/flink-connector-elasticsearch8/src/test/java/org/apache/flink/connector/elasticsearch/sink/Elasticsearch8AsyncSinkSecureITCase.java b/flink-connector-elasticsearch8/src/test/java/org/apache/flink/connector/elasticsearch/sink/Elasticsearch8AsyncSinkSecureITCase.java index ba546166..6bfee454 100644 --- a/flink-connector-elasticsearch8/src/test/java/org/apache/flink/connector/elasticsearch/sink/Elasticsearch8AsyncSinkSecureITCase.java +++ b/flink-connector-elasticsearch8/src/test/java/org/apache/flink/connector/elasticsearch/sink/Elasticsearch8AsyncSinkSecureITCase.java @@ -44,6 +44,8 @@ import org.testcontainers.junit.jupiter.Testcontainers; import java.io.IOException; +import java.util.Arrays; +import java.util.Collections; import static org.apache.flink.connector.elasticsearch.sink.ElasticsearchSinkBaseITCase.DummyData; import static org.apache.flink.connector.elasticsearch.sink.ElasticsearchSinkBaseITCase.ELASTICSEARCH_IMAGE; @@ -93,11 +95,12 @@ void testWriteToSecureElasticsearch8() throws Exception { "https")) .setElementConverter( (element, ctx) -> - new IndexOperation.Builder<>() - .index(index) - .id(element.getId()) - .document(element) - .build()) + Collections.singletonList( + new IndexOperation.Builder<>() + .index(index) + .id(element.getId()) + .document(element) + .build())) .setUsername(ES_CLUSTER_USERNAME) .setPassword(ES_CLUSTER_PASSWORD) .setSslContextSupplier(() -> ES_CONTAINER.createSslContextFromCa()) diff --git a/flink-connector-elasticsearch8/src/test/java/org/apache/flink/connector/elasticsearch/sink/Elasticsearch8AsyncWriterITCase.java b/flink-connector-elasticsearch8/src/test/java/org/apache/flink/connector/elasticsearch/sink/Elasticsearch8AsyncWriterITCase.java index ba41ad88..8680b183 100644 --- a/flink-connector-elasticsearch8/src/test/java/org/apache/flink/connector/elasticsearch/sink/Elasticsearch8AsyncWriterITCase.java +++ b/flink-connector-elasticsearch8/src/test/java/org/apache/flink/connector/elasticsearch/sink/Elasticsearch8AsyncWriterITCase.java @@ -34,6 +34,7 @@ import org.testcontainers.junit.jupiter.Testcontainers; import java.io.IOException; +import java.util.Arrays; import java.util.Collections; import java.util.List; import java.util.Optional; @@ -186,16 +187,18 @@ public void testHandlePartiallyFailedBulk() throws Exception { Elasticsearch8AsyncSinkBuilder.OperationConverter elementConverter = new Elasticsearch8AsyncSinkBuilder.OperationConverter<>( (element, ctx) -> - new UpdateOperation.Builder<>() - .id(element.getId()) - .index(index) - .action( - ac -> - ac.doc(element) - .docAsUpsert( - element.getId() - .equals("test-2"))) - .build()); + Arrays.asList( + new UpdateOperation.Builder<>() + .id(element.getId()) + .index(index) + .action( + ac -> + ac.doc(element) + .docAsUpsert( + element.getId() + .equals( + "test-2"))) + .build())); try (final Elasticsearch8AsyncWriter writer = createWriter(index, maxBatchSize, elementConverter)) { @@ -214,11 +217,12 @@ public void testHandlePartiallyFailedBulk() throws Exception { getDefaultTestElementConverter(String index) { return new Elasticsearch8AsyncSinkBuilder.OperationConverter<>( (element, ctx) -> - new IndexOperation.Builder() - .id(element.getId()) - .document(element) - .index(index) - .build()); + Collections.singletonList( + new IndexOperation.Builder() + .id(element.getId()) + .document(element) + .index(index) + .build())); } private Elasticsearch8AsyncWriter createWriter(String index, int maxBatchSize) diff --git a/flink-connector-elasticsearch8/src/test/java/org/apache/flink/connector/elasticsearch/sink/OperationSerializerTest.java b/flink-connector-elasticsearch8/src/test/java/org/apache/flink/connector/elasticsearch/sink/OperationSerializerTest.java index 5c45d7cf..11b7daf4 100644 --- a/flink-connector-elasticsearch8/src/test/java/org/apache/flink/connector/elasticsearch/sink/OperationSerializerTest.java +++ b/flink-connector-elasticsearch8/src/test/java/org/apache/flink/connector/elasticsearch/sink/OperationSerializerTest.java @@ -35,6 +35,7 @@ import java.io.DataInputStream; import java.io.DataOutputStream; import java.util.Collections; +import java.util.List; import java.util.stream.Stream; import static org.assertj.core.api.Assertions.assertThat; @@ -43,8 +44,8 @@ public class OperationSerializerTest { @ParameterizedTest @MethodSource("operations") - public void testSerializeAndDeserialize(BulkOperationVariant operationVariant) { - Operation expectedState = new Operation(operationVariant); + public void testSerializeAndDeserialize(List operationVariants) { + Operation expectedState = new Operation(operationVariants); final ByteArrayOutputStream bytes = new ByteArrayOutputStream(); final DataOutputStream out = new DataOutputStream(bytes);