Skip to content

Support multiple updates on a single document in ES8 client #131

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 1 commit into
base: v3.1
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -78,7 +78,7 @@ public class Elasticsearch8AsyncSinkBuilder<InputT>
* The element converter that will be called on every stream element to be processed and
* buffered.
*/
private ElementConverter<InputT, BulkOperationVariant> elementConverter;
private ElementConverter<InputT, List<BulkOperationVariant>> elementConverter;

private SerializableSupplier<SSLContext> sslContextSupplier;

Expand Down Expand Up @@ -202,7 +202,7 @@ public Elasticsearch8AsyncSinkBuilder<InputT> setPassword(String password) {
* @return {@code Elasticsearch8AsyncSinkBuilder}
*/
public Elasticsearch8AsyncSinkBuilder<InputT> setElementConverter(
ElementConverter<InputT, BulkOperationVariant> elementConverter) {
ElementConverter<InputT, List<BulkOperationVariant>> elementConverter) {
checkNotNull(elementConverter);
this.elementConverter = elementConverter;
return this;
Expand Down Expand Up @@ -232,7 +232,7 @@ public Elasticsearch8AsyncSink<InputT> build() {
}

private OperationConverter<InputT> buildOperationConverter(
ElementConverter<InputT, BulkOperationVariant> converter) {
ElementConverter<InputT, List<BulkOperationVariant>> converter) {
return converter != null ? new OperationConverter<>(converter) : null;
}

Expand All @@ -244,9 +244,9 @@ private NetworkConfig buildNetworkConfig() {

/** A wrapper that evolves the Operation, since a BulkOperationVariant is not Serializable. */
public static class OperationConverter<T> implements ElementConverter<T, Operation> {
private final ElementConverter<T, BulkOperationVariant> converter;
private final ElementConverter<T, List<BulkOperationVariant>> converter;

public OperationConverter(ElementConverter<T, BulkOperationVariant> converter) {
public OperationConverter(ElementConverter<T, List<BulkOperationVariant>> converter) {
this.converter = converter;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -123,7 +123,12 @@ protected void submitRequestEntries(

BulkRequest.Builder br = new BulkRequest.Builder();
for (Operation operation : requestEntries) {
br.operations(new BulkOperation(operation.getBulkOperationVariant()));
operation
.getBulkOperationVariant()
.forEach(
bulkOperationVariant -> {
br.operations(new BulkOperation(bulkOperationVariant));
});
}

esClient.bulk(br.build())
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,29 +24,30 @@
import co.elastic.clients.elasticsearch.core.bulk.BulkOperationVariant;

import java.io.Serializable;
import java.util.List;
import java.util.Objects;

/** A single stream element which contains a BulkOperationVariant. */
public class Operation implements Serializable {
private static final long serialVersionUID = 1L;

private final BulkOperationVariant bulkOperationVariant;
private final List<BulkOperationVariant> bulkOperationVariants;

public Operation(BulkOperationVariant bulkOperation) {
this.bulkOperationVariant = bulkOperation;
public Operation(List<BulkOperationVariant> bulkOperations) {
this.bulkOperationVariants = bulkOperations;
}

public BulkOperationVariant getBulkOperationVariant() {
return bulkOperationVariant;
public List<BulkOperationVariant> getBulkOperationVariant() {
return bulkOperationVariants;
}

@Override
public int hashCode() {
return Objects.hash(bulkOperationVariant);
return Objects.hash(bulkOperationVariants);
}

@Override
public String toString() {
return "Operation{" + "bulkOperationVariant=" + bulkOperationVariant + '}';
return "Operation{" + "bulkOperationVariant=" + bulkOperationVariants + '}';
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,8 @@
import org.apache.http.HttpHost;
import org.junit.jupiter.api.Test;

import java.util.Arrays;

import static org.assertj.core.api.AssertionsForClassTypes.assertThatThrownBy;

/** Tests for {@link Elasticsearch8AsyncSinkBuilder}. */
Expand All @@ -46,10 +48,11 @@ void testThrowExceptionIfHostsAreNotProvided() {
Elasticsearch8AsyncSinkBuilder.<String>builder()
.setElementConverter(
(element, ctx) ->
new DeleteOperation.Builder()
.id("test")
.index("test")
.build())
Arrays.asList(
new DeleteOperation.Builder()
.id("test")
.index("test")
.build()))
.build())
.isInstanceOf(NullPointerException.class);
}
Expand All @@ -62,10 +65,11 @@ void testThrowExceptionIfHostsIsNull() {
.setHosts(null)
.setElementConverter(
(element, ctx) ->
new DeleteOperation.Builder()
.id("test")
.index("test")
.build())
Arrays.asList(
new DeleteOperation.Builder()
.id("test")
.index("test")
.build()))
.build())
.isInstanceOf(NullPointerException.class);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@
import org.testcontainers.junit.jupiter.Testcontainers;

import java.io.IOException;
import java.util.Arrays;

import static org.assertj.core.api.AssertionsForClassTypes.assertThat;

Expand Down Expand Up @@ -74,11 +75,12 @@ public void testWriteToElasticsearch() throws Exception {
ES_CONTAINER.getFirstMappedPort()))
.setElementConverter(
(element, ctx) ->
new IndexOperation.Builder<>()
.index(index)
.id(element.getId())
.document(element)
.build())
Arrays.asList(
new IndexOperation.Builder<>()
.index(index)
.id(element.getId())
.document(element)
.build()))
.build();

env.fromElements("first", "second", "third", "fourth", "fifth")
Expand Down Expand Up @@ -110,11 +112,12 @@ public void testRecovery() throws Exception {
ES_CONTAINER.getFirstMappedPort()))
.setElementConverter(
(element, ctx) ->
new IndexOperation.Builder<>()
.index(index)
.id(element.getId())
.document(element)
.build())
Arrays.asList(
new IndexOperation.Builder<>()
.index(index)
.id(element.getId())
.document(element)
.build()))
.build();

env.fromElements("first", "second", "third", "fourth", "fifth")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,8 @@
import org.testcontainers.junit.jupiter.Testcontainers;

import java.io.IOException;
import java.util.Arrays;
import java.util.Collections;

import static org.apache.flink.connector.elasticsearch.sink.ElasticsearchSinkBaseITCase.DummyData;
import static org.apache.flink.connector.elasticsearch.sink.ElasticsearchSinkBaseITCase.ELASTICSEARCH_IMAGE;
Expand Down Expand Up @@ -93,11 +95,12 @@ void testWriteToSecureElasticsearch8() throws Exception {
"https"))
.setElementConverter(
(element, ctx) ->
new IndexOperation.Builder<>()
.index(index)
.id(element.getId())
.document(element)
.build())
Collections.singletonList(
new IndexOperation.Builder<>()
.index(index)
.id(element.getId())
.document(element)
.build()))
.setUsername(ES_CLUSTER_USERNAME)
.setPassword(ES_CLUSTER_PASSWORD)
.setSslContextSupplier(() -> ES_CONTAINER.createSslContextFromCa())
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@
import org.testcontainers.junit.jupiter.Testcontainers;

import java.io.IOException;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Optional;
Expand Down Expand Up @@ -186,16 +187,18 @@ public void testHandlePartiallyFailedBulk() throws Exception {
Elasticsearch8AsyncSinkBuilder.OperationConverter<DummyData> elementConverter =
new Elasticsearch8AsyncSinkBuilder.OperationConverter<>(
(element, ctx) ->
new UpdateOperation.Builder<>()
.id(element.getId())
.index(index)
.action(
ac ->
ac.doc(element)
.docAsUpsert(
element.getId()
.equals("test-2")))
.build());
Arrays.asList(
new UpdateOperation.Builder<>()
.id(element.getId())
.index(index)
.action(
ac ->
ac.doc(element)
.docAsUpsert(
element.getId()
.equals(
"test-2")))
.build()));

try (final Elasticsearch8AsyncWriter<DummyData> writer =
createWriter(index, maxBatchSize, elementConverter)) {
Expand All @@ -214,11 +217,12 @@ public void testHandlePartiallyFailedBulk() throws Exception {
getDefaultTestElementConverter(String index) {
return new Elasticsearch8AsyncSinkBuilder.OperationConverter<>(
(element, ctx) ->
new IndexOperation.Builder<DummyData>()
.id(element.getId())
.document(element)
.index(index)
.build());
Collections.singletonList(
new IndexOperation.Builder<DummyData>()
.id(element.getId())
.document(element)
.index(index)
.build()));
}

private Elasticsearch8AsyncWriter<DummyData> createWriter(String index, int maxBatchSize)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.util.Collections;
import java.util.List;
import java.util.stream.Stream;

import static org.assertj.core.api.Assertions.assertThat;
Expand All @@ -43,8 +44,8 @@
public class OperationSerializerTest {
@ParameterizedTest
@MethodSource("operations")
public void testSerializeAndDeserialize(BulkOperationVariant operationVariant) {
Operation expectedState = new Operation(operationVariant);
public void testSerializeAndDeserialize(List<BulkOperationVariant> operationVariants) {
Operation expectedState = new Operation(operationVariants);

final ByteArrayOutputStream bytes = new ByteArrayOutputStream();
final DataOutputStream out = new DataOutputStream(bytes);
Expand Down