Skip to content

Commit 4a96cfc

Browse files
authored
[FLINK-37517][[Connectors/Elasticsearch] Bump flink to 2.0.0
Bump flink to 2.0
1 parent c80d39b commit 4a96cfc

File tree

84 files changed

+670
-8927
lines changed

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

84 files changed

+670
-8927
lines changed

.github/workflows/push_pr.yml

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -28,8 +28,8 @@ jobs:
2828
compile_and_test:
2929
strategy:
3030
matrix:
31-
flink: [ 1.19-SNAPSHOT, 1.20-SNAPSHOT ]
32-
jdk: [ '8, 11, 17, 21' ]
31+
flink: [ 2.1-SNAPSHOT ]
32+
jdk: [ '11', '17, 21' ]
3333
uses: apache/flink-connector-shared-utils/.github/workflows/ci.yml@ci_utils
3434
with:
3535
flink_version: ${{ matrix.flink }}

.github/workflows/weekly.yml

Lines changed: 3 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -30,11 +30,9 @@ jobs:
3030
strategy:
3131
matrix:
3232
flink_branches: [{
33-
flink: 1.19-SNAPSHOT,
34-
branch: main
35-
}, {
36-
flink: 1.20-SNAPSHOT,
37-
branch: main
33+
flink: 2.1-SNAPSHOT,
34+
branch: main,
35+
jdk: '11, 17, 21'
3836
}, {
3937
flink: 1.19.1,
4038
branch: v3.0
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,68 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing, software
13+
* distributed under the License is distributed on an "AS IS" BASIS,
14+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15+
* See the License for the specific language governing permissions and
16+
* limitations under the License.
17+
*/
18+
19+
package org.apache.flink.connector.elasticsearch;
20+
21+
import org.apache.flink.annotation.Internal;
22+
import org.apache.flink.api.java.tuple.Tuple2;
23+
24+
import org.apache.http.HttpHost;
25+
import org.elasticsearch.action.search.SearchRequest;
26+
27+
import java.io.IOException;
28+
import java.io.Serializable;
29+
import java.util.List;
30+
31+
/**
32+
* An {@link ElasticsearchApiCallBridge} is used to bridge incompatible Elasticsearch Java API calls
33+
* across different versions. This includes calls to create Elasticsearch clients, handle failed
34+
* item responses, etc. Any incompatible Elasticsearch Java APIs should be bridged using this
35+
* interface.
36+
*
37+
* <p>Implementations are allowed to be stateful. For example, for Elasticsearch 1.x, since
38+
* connecting via an embedded node is allowed, the call bridge will hold reference to the created
39+
* embedded node. Each instance of the sink will hold exactly one instance of the call bridge, and
40+
* state cleanup is performed when the sink is closed.
41+
*
42+
* @param <C> The Elasticsearch client, that implements {@link AutoCloseable}.
43+
*/
44+
@Internal
45+
public interface ElasticsearchApiCallBridge<C extends AutoCloseable> extends Serializable {
46+
47+
/**
48+
* Creates an Elasticsearch client implementing {@link AutoCloseable}.
49+
*
50+
* @return The created client.
51+
*/
52+
C createClient(NetworkClientConfig networkClientConfig, List<HttpHost> hosts);
53+
54+
/**
55+
* Executes a search using the Search API.
56+
*
57+
* @param client the Elasticsearch client.
58+
* @param searchRequest A request to execute search against one or more indices (or all).
59+
*/
60+
Tuple2<String, String[]> search(C client, SearchRequest searchRequest) throws IOException;
61+
62+
/**
63+
* Closes this client and releases any system resources associated with it.
64+
*
65+
* @param client the Elasticsearch client.
66+
*/
67+
void close(C client) throws IOException;
68+
}
Lines changed: 69 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,7 @@
1616
* limitations under the License.
1717
*/
1818

19-
package org.apache.flink.connector.elasticsearch.sink;
19+
package org.apache.flink.connector.elasticsearch;
2020

2121
import org.apache.flink.util.function.SerializableSupplier;
2222

@@ -26,7 +26,8 @@
2626

2727
import java.io.Serializable;
2828

29-
class NetworkClientConfig implements Serializable {
29+
/** Network config for es client. */
30+
public class NetworkClientConfig implements Serializable {
3031

3132
@Nullable private final String username;
3233
@Nullable private final String password;
@@ -37,7 +38,7 @@ class NetworkClientConfig implements Serializable {
3738
@Nullable private final SerializableSupplier<SSLContext> sslContextSupplier;
3839
@Nullable private final SerializableSupplier<HostnameVerifier> sslHostnameVerifier;
3940

40-
NetworkClientConfig(
41+
public NetworkClientConfig(
4142
@Nullable String username,
4243
@Nullable String password,
4344
@Nullable String connectionPathPrefix,
@@ -95,4 +96,69 @@ public SerializableSupplier<SSLContext> getSSLContextSupplier() {
9596
public SerializableSupplier<HostnameVerifier> getSslHostnameVerifier() {
9697
return sslHostnameVerifier;
9798
}
99+
100+
/** Builder for {@link NetworkClientConfig}. */
101+
public static class Builder {
102+
private String username;
103+
private String password;
104+
private String connectionPathPrefix;
105+
private Integer connectionRequestTimeout;
106+
private Integer connectionTimeout;
107+
private Integer socketTimeout;
108+
private SerializableSupplier<SSLContext> sslContextSupplier;
109+
private SerializableSupplier<HostnameVerifier> sslHostnameVerifier;
110+
111+
public Builder setUsername(String username) {
112+
this.username = username;
113+
return this;
114+
}
115+
116+
public Builder setPassword(String password) {
117+
this.password = password;
118+
return this;
119+
}
120+
121+
public Builder setConnectionPathPrefix(String connectionPathPrefix) {
122+
this.connectionPathPrefix = connectionPathPrefix;
123+
return this;
124+
}
125+
126+
public Builder setConnectionRequestTimeout(Integer connectionRequestTimeout) {
127+
this.connectionRequestTimeout = connectionRequestTimeout;
128+
return this;
129+
}
130+
131+
public Builder setConnectionTimeout(Integer connectionTimeout) {
132+
this.connectionTimeout = connectionTimeout;
133+
return this;
134+
}
135+
136+
public Builder setSocketTimeout(Integer socketTimeout) {
137+
this.socketTimeout = socketTimeout;
138+
return this;
139+
}
140+
141+
public Builder setSslContextSupplier(SerializableSupplier<SSLContext> sslContextSupplier) {
142+
this.sslContextSupplier = sslContextSupplier;
143+
return this;
144+
}
145+
146+
public Builder setSslHostnameVerifier(
147+
SerializableSupplier<HostnameVerifier> sslHostnameVerifier) {
148+
this.sslHostnameVerifier = sslHostnameVerifier;
149+
return this;
150+
}
151+
152+
public NetworkClientConfig build() {
153+
return new NetworkClientConfig(
154+
username,
155+
password,
156+
connectionPathPrefix,
157+
connectionRequestTimeout,
158+
connectionTimeout,
159+
socketTimeout,
160+
sslContextSupplier,
161+
sslHostnameVerifier);
162+
}
163+
}
98164
}
Lines changed: 14 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -16,12 +16,13 @@
1616
* limitations under the License.
1717
*/
1818

19-
package org.apache.flink.streaming.connectors.elasticsearch.table;
19+
package org.apache.flink.connector.elasticsearch.lookup;
2020

2121
import org.apache.flink.annotation.Internal;
2222
import org.apache.flink.api.common.serialization.DeserializationSchema;
2323
import org.apache.flink.api.java.tuple.Tuple2;
24-
import org.apache.flink.streaming.connectors.elasticsearch.ElasticsearchApiCallBridge;
24+
import org.apache.flink.connector.elasticsearch.ElasticsearchApiCallBridge;
25+
import org.apache.flink.connector.elasticsearch.NetworkClientConfig;
2526
import org.apache.flink.table.connector.source.LookupTableSource;
2627
import org.apache.flink.table.data.RowData;
2728
import org.apache.flink.table.data.util.DataFormatConverters;
@@ -31,6 +32,7 @@
3132
import org.apache.flink.util.FlinkRuntimeException;
3233
import org.apache.flink.util.Preconditions;
3334

35+
import org.apache.http.HttpHost;
3436
import org.elasticsearch.action.search.SearchRequest;
3537
import org.elasticsearch.common.Strings;
3638
import org.elasticsearch.index.query.BoolQueryBuilder;
@@ -44,6 +46,7 @@
4446
import java.util.Arrays;
4547
import java.util.Collection;
4648
import java.util.Collections;
49+
import java.util.List;
4750
import java.util.Map;
4851
import java.util.stream.Collectors;
4952
import java.util.stream.IntStream;
@@ -72,6 +75,8 @@ public class ElasticsearchRowDataLookupFunction<C extends AutoCloseable> extends
7275
private SearchSourceBuilder searchSourceBuilder;
7376

7477
private final ElasticsearchApiCallBridge<C> callBridge;
78+
private final NetworkClientConfig networkClientConfig;
79+
private final List<HttpHost> hosts;
7580

7681
private transient C client;
7782

@@ -83,13 +88,17 @@ public ElasticsearchRowDataLookupFunction(
8388
String[] producedNames,
8489
DataType[] producedTypes,
8590
String[] lookupKeys,
91+
List<HttpHost> hosts,
92+
NetworkClientConfig networkClientConfig,
8693
ElasticsearchApiCallBridge<C> callBridge) {
8794

8895
checkNotNull(deserializationSchema, "No DeserializationSchema supplied.");
8996
checkNotNull(maxRetryTimes, "No maxRetryTimes supplied.");
9097
checkNotNull(producedNames, "No fieldNames supplied.");
9198
checkNotNull(producedTypes, "No fieldTypes supplied.");
9299
checkNotNull(lookupKeys, "No keyNames supplied.");
100+
checkNotNull(hosts, "No hosts supplied.");
101+
checkNotNull(networkClientConfig, "No networkClientConfig supplied.");
93102
checkNotNull(callBridge, "No ElasticsearchApiCallBridge supplied.");
94103

95104
this.deserializationSchema = deserializationSchema;
@@ -110,12 +119,14 @@ public ElasticsearchRowDataLookupFunction(
110119
converters[i] = DataFormatConverters.getConverterForDataType(producedTypes[position]);
111120
}
112121

122+
this.networkClientConfig = networkClientConfig;
123+
this.hosts = hosts;
113124
this.callBridge = callBridge;
114125
}
115126

116127
@Override
117128
public void open(FunctionContext context) throws Exception {
118-
this.client = callBridge.createClient();
129+
this.client = callBridge.createClient(networkClientConfig, hosts);
119130

120131
// Set searchRequest in open method in case of amount of calling in eval method when every
121132
// record comes.

flink-connector-elasticsearch-base/src/main/java/org/apache/flink/connector/elasticsearch/sink/ElasticsearchSink.java

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -22,7 +22,9 @@
2222
import org.apache.flink.annotation.VisibleForTesting;
2323
import org.apache.flink.api.connector.sink2.Sink;
2424
import org.apache.flink.api.connector.sink2.SinkWriter;
25+
import org.apache.flink.api.connector.sink2.WriterInitContext;
2526
import org.apache.flink.connector.base.DeliveryGuarantee;
27+
import org.apache.flink.connector.elasticsearch.NetworkClientConfig;
2628
import org.apache.flink.connector.elasticsearch.sink.BulkResponseInspector.BulkResponseInspectorFactory;
2729

2830
import org.apache.http.HttpHost;
@@ -82,7 +84,7 @@ public class ElasticsearchSink<IN> implements Sink<IN> {
8284
}
8385

8486
@Override
85-
public SinkWriter<IN> createWriter(InitContext context) throws IOException {
87+
public SinkWriter<IN> createWriter(WriterInitContext context) throws IOException {
8688
return new ElasticsearchWriter<>(
8789
hosts,
8890
emitter,

flink-connector-elasticsearch-base/src/main/java/org/apache/flink/connector/elasticsearch/sink/ElasticsearchSinkBuilderBase.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@
2222
import org.apache.flink.api.common.ExecutionConfig;
2323
import org.apache.flink.api.java.ClosureCleaner;
2424
import org.apache.flink.connector.base.DeliveryGuarantee;
25+
import org.apache.flink.connector.elasticsearch.NetworkClientConfig;
2526
import org.apache.flink.connector.elasticsearch.sink.BulkResponseInspector.BulkResponseInspectorFactory;
2627
import org.apache.flink.connector.elasticsearch.sink.ElasticsearchWriter.DefaultBulkResponseInspector;
2728
import org.apache.flink.connector.elasticsearch.sink.ElasticsearchWriter.DefaultFailureHandler;

0 commit comments

Comments
 (0)