
Commit 8df9637

[HTTP-37] - Add support for Flink 1.16. Modify CI/CD to run builds for Flink 1.15 and 1.16.
Signed-off-by: Krzysztof Chmielewski <krzysiek.chmielewski@gmail.com>
1 parent 2666971 commit 8df9637

14 files changed (+223 / -14 lines)


.github/workflows/build.yml

Lines changed: 9 additions & 5 deletions
```diff
@@ -16,6 +16,9 @@ env:
 jobs:
   build:
     runs-on: ubuntu-latest
+    strategy:
+      matrix:
+        flink: [ "1.15.0", "1.15.3", "1.16.1" ]
     steps:
       - uses: actions/checkout@v3

@@ -26,23 +29,24 @@ jobs:
           distribution: 'adopt'
           cache: maven

-      - name: Build
-        run: mvn $MAVEN_CLI_OPTS $JAVA_ADDITIONAL_OPTS compile
+      - name: Build for Flink ${{ matrix.flink }}
+        run: mvn $MAVEN_CLI_OPTS $JAVA_ADDITIONAL_OPTS -Dflink.version=${{ matrix.flink }} compile

-      - name: Tests
+      - name: Tests for Flink ${{ matrix.flink }}
         run: |
-          mvn $MAVEN_CLI_OPTS $JAVA_ADDITIONAL_OPTS test integration-test
+          mvn $MAVEN_CLI_OPTS $JAVA_ADDITIONAL_OPTS -Dflink.version=${{ matrix.flink }} test integration-test
           cat target/site/jacoco/index.html | grep -o 'Total[^%]*%'

       - name: Test JavaDoc
         run: mvn $MAVEN_CLI_OPTS $JAVA_ADDITIONAL_OPTS javadoc:javadoc
+        if: startsWith(matrix.flink, '1.15.0')

       - name: Add coverage to PR
         id: jacoco
         uses: madrapps/jacoco-report@v1.2
-        if: github.event_name == 'pull_request'
         with:
           paths: ${{ github.workspace }}/target/site/jacoco/jacoco.xml
           token: ${{ secrets.GITHUB_TOKEN }}
           min-coverage-overall: 40
           min-coverage-changed-files: 60
+        if: startsWith(matrix.flink, '1.15.0')
```

CHANGELOG.md

Lines changed: 3 additions & 0 deletions
```diff
@@ -1,6 +1,9 @@
 # Changelog

 ## [Unreleased]
+- Add support for Flink 1.16.
+- Add [SchemaLifecycleAwareElementConverter](src/main/java/com/getindata/connectors/http/SchemaLifecycleAwareElementConverter.java) that can be used for creating
+  schema lifecycle aware element converters for the HTTP Sink.

 ## [0.8.1] - 2022-12-22
```

pom.xml

Lines changed: 1 addition & 1 deletion
```diff
@@ -68,7 +68,7 @@ under the License.

    <!-- IMPORTANT: If you update Flink, remember to update link to its docs in maven-javadoc-plugin <links>
        section, omitting the patch part (so for 1.15.0 use 1.15). -->
-   <flink.version>1.15.0</flink.version>
+   <flink.version>[1.15.0,)</flink.version>

    <target.java.version>11</target.java.version>
    <scala.binary.version>2.12</scala.binary.version>
```

src/main/java/com/getindata/connectors/http/HttpSinkBuilder.java

Lines changed: 15 additions & 0 deletions
```diff
@@ -114,14 +114,29 @@ public HttpSinkBuilder<InputT> setSinkHttpClientBuilder(
     /**
      * @param elementConverter the {@link ElementConverter} to be used for the sink
      * @return {@link HttpSinkBuilder} itself
+     * @deprecated Converters set by this method might not work properly for Flink 1.16+. Use {@link
+     * #setElementConverter(SchemaLifecycleAwareElementConverter)} instead.
      */
+    @Deprecated
     @PublicEvolving
     public HttpSinkBuilder<InputT> setElementConverter(
         ElementConverter<InputT, HttpSinkRequestEntry> elementConverter) {
         this.elementConverter = elementConverter;
         return this;
     }

+    /**
+     * @param elementConverter the {@link SchemaLifecycleAwareElementConverter} to be used for the
+     * sink
+     * @return {@link HttpSinkBuilder} itself
+     */
+    @PublicEvolving
+    public HttpSinkBuilder<InputT> setElementConverter(
+        SchemaLifecycleAwareElementConverter<InputT, HttpSinkRequestEntry> elementConverter) {
+        this.elementConverter = elementConverter;
+        return this;
+    }
+
     public HttpSinkBuilder<InputT> setHttpPostRequestCallback(
         HttpPostRequestCallback<HttpSinkRequestEntry> httpPostRequestCallback) {
         this.httpPostRequestCallback = httpPostRequestCallback;
```
src/main/java/com/getindata/connectors/http/SchemaLifecycleAwareElementConverter.java

Lines changed: 34 additions & 0 deletions (new file)

```java
package com.getindata.connectors.http;

import org.apache.flink.api.common.serialization.SerializationSchema.InitializationContext;
import org.apache.flink.api.connector.sink2.Sink.InitContext;
import org.apache.flink.connector.base.sink.writer.ElementConverter;

/**
 * An enhancement of Flink's {@link ElementConverter} that exposes an {@link #open(InitContext)}
 * method, called by the HTTP connector code to ensure that the element converter is initialized
 * properly. This is required for cases where Flink's SerializationSchema and DeserializationSchema
 * objects, such as JsonRowDataSerializationSchema, are used.
 * <p>
 * This interface specifies the mapping from elements of a stream to request entries that can be
 * sent to the destination. The mapping is provided by the end user of a sink, not the sink
 * creator.
 *
 * <p>The request entries contain all relevant information required to create and send the actual
 * request. E.g., for Kinesis Data Streams, the request entry includes the payload and the partition
 * key.
 */
public interface SchemaLifecycleAwareElementConverter<InputT, RequestEntryT>
    extends ElementConverter<InputT, RequestEntryT> {

    /**
     * Initializes the element converter's underlying schema.
     *
     * <p>The provided {@link InitializationContext} can be used to access additional features such
     * as registering user metrics.
     *
     * @param context Contextual information that can be used during initialization.
     */
    void open(InitContext context);

}
```
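
Below is a hedged usage sketch, not part of this commit: it shows how a job author might implement the new interface and register it through the non-deprecated HttpSinkBuilder#setElementConverter overload added above. The String input type, the endpoint URL, and the HttpSink.builder()/setEndpointUrl entry points are assumptions based on the connector's existing public API rather than taken from this diff.

```java
// Illustrative only, not part of commit 8df9637.
import java.nio.charset.StandardCharsets;

import org.apache.flink.api.connector.sink2.Sink.InitContext;
import org.apache.flink.api.connector.sink2.SinkWriter.Context;

import com.getindata.connectors.http.HttpSink;
import com.getindata.connectors.http.SchemaLifecycleAwareElementConverter;
import com.getindata.connectors.http.internal.sink.HttpSinkRequestEntry;

public class LifecycleAwareConverterExample {

    public static HttpSink<String> buildSink() {
        return HttpSink.<String>builder()
            .setEndpointUrl("http://example.com/sink")   // hypothetical endpoint
            // Resolves to the new, non-deprecated overload because the argument is a
            // SchemaLifecycleAwareElementConverter, not a plain ElementConverter.
            .setElementConverter(
                new SchemaLifecycleAwareElementConverter<String, HttpSinkRequestEntry>() {

                    @Override
                    public void open(InitContext context) {
                        // Open any wrapped SerializationSchema here; the connector calls
                        // this once per writer, before the first apply().
                    }

                    @Override
                    public HttpSinkRequestEntry apply(String element, Context context) {
                        return new HttpSinkRequestEntry(
                            "POST", element.getBytes(StandardCharsets.UTF_8));
                    }
                })
            .build();
    }
}
```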

src/main/java/com/getindata/connectors/http/internal/sink/HttpSinkInternal.java

Lines changed: 10 additions & 3 deletions
```diff
@@ -13,6 +13,7 @@
 import org.apache.flink.util.StringUtils;

 import com.getindata.connectors.http.HttpPostRequestCallback;
+import com.getindata.connectors.http.SchemaLifecycleAwareElementConverter;
 import com.getindata.connectors.http.internal.HeaderPreprocessor;
 import com.getindata.connectors.http.internal.SinkHttpClientBuilder;

@@ -110,8 +111,14 @@ protected HttpSinkInternal(
     public StatefulSinkWriter<InputT, BufferedRequestState<HttpSinkRequestEntry>> createWriter(
         InitContext context) throws IOException {

+        ElementConverter<InputT, HttpSinkRequestEntry> elementConverter = getElementConverter();
+        if (elementConverter instanceof SchemaLifecycleAwareElementConverter) {
+            // This cast is needed for Flink 1.15.3 build
+            ((SchemaLifecycleAwareElementConverter<?, ?>) elementConverter).open(context);
+        }
+
         return new HttpSinkWriter<>(
-            getElementConverter(),
+            elementConverter,
             context,
             getMaxBatchSize(),
             getMaxInFlightRequests(),

@@ -128,8 +135,8 @@ public StatefulSinkWriter<InputT, BufferedRequestState<HttpSinkRequestEntry>> cr

     @Override
     public StatefulSinkWriter<InputT, BufferedRequestState<HttpSinkRequestEntry>> restoreWriter(
-        InitContext context,
-        Collection<BufferedRequestState<HttpSinkRequestEntry>> recoveredState)
+            InitContext context,
+            Collection<BufferedRequestState<HttpSinkRequestEntry>> recoveredState)
         throws IOException {

         return new HttpSinkWriter<>(
```
src/main/java/com/getindata/connectors/http/internal/table/SerializationSchemaElementConverter.java

Lines changed: 47 additions & 0 deletions (new file)

```java
package com.getindata.connectors.http.internal.table;

import org.apache.flink.api.common.serialization.SerializationSchema;
import org.apache.flink.api.connector.sink2.Sink.InitContext;
import org.apache.flink.api.connector.sink2.SinkWriter.Context;
import org.apache.flink.table.data.RowData;
import org.apache.flink.util.FlinkRuntimeException;

import com.getindata.connectors.http.SchemaLifecycleAwareElementConverter;
import com.getindata.connectors.http.internal.sink.HttpSinkRequestEntry;

public class SerializationSchemaElementConverter
    implements SchemaLifecycleAwareElementConverter<RowData, HttpSinkRequestEntry> {

    private final String insertMethod;

    private final SerializationSchema<RowData> serializationSchema;

    private boolean schemaOpened = false;

    public SerializationSchemaElementConverter(
            String insertMethod,
            SerializationSchema<RowData> serializationSchema) {

        this.insertMethod = insertMethod;
        this.serializationSchema = serializationSchema;
    }

    @Override
    public void open(InitContext context) {
        if (!schemaOpened) {
            try {
                serializationSchema.open(context.asSerializationSchemaInitializationContext());
                schemaOpened = true;
            } catch (Exception e) {
                throw new FlinkRuntimeException("Failed to initialize serialization schema.", e);
            }
        }
    }

    @Override
    public HttpSinkRequestEntry apply(RowData rowData, Context context) {
        return new HttpSinkRequestEntry(
            insertMethod,
            serializationSchema.serialize(rowData));
    }
}
```
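
A minimal wiring sketch, not part of the commit: the converter defers serializationSchema.open(...) until the sink writer is created, so any SerializationSchema<RowData> can be plugged in. The toy schema and the PUT method below are hypothetical; in the table sink the schema is the configured format's own RowData serializer.

```java
// Illustrative only, not part of commit 8df9637.
import java.nio.charset.StandardCharsets;

import org.apache.flink.api.common.serialization.SerializationSchema;
import org.apache.flink.table.data.RowData;

import com.getindata.connectors.http.internal.table.SerializationSchemaElementConverter;

public class ConverterWiringExample {

    public static SerializationSchemaElementConverter createConverter() {
        // A trivial schema that renders the row's first int field as text.
        SerializationSchema<RowData> schema =
            row -> String.valueOf(row.getInt(0)).getBytes(StandardCharsets.UTF_8);

        // open() is intentionally NOT called here; HttpSinkInternal#createWriter invokes it
        // through the SchemaLifecycleAwareElementConverter contract before the first element.
        return new SerializationSchemaElementConverter("PUT", schema);
    }
}
```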

src/main/java/com/getindata/connectors/http/internal/table/lookup/HttpTableLookupFunction.java

Lines changed: 6 additions & 0 deletions
```diff
@@ -15,6 +15,7 @@

 import com.getindata.connectors.http.internal.PollingClient;
 import com.getindata.connectors.http.internal.PollingClientFactory;
+import com.getindata.connectors.http.internal.utils.SerializationSchemaUtils;

 @Slf4j
 public class HttpTableLookupFunction extends TableFunction<RowData> {

@@ -50,6 +51,11 @@ public HttpTableLookupFunction(
     @Override
     public void open(FunctionContext context) throws Exception {
         super.open(context);
+
+        this.responseSchemaDecoder.open(
+            SerializationSchemaUtils
+                .createDeserializationInitContext(HttpTableLookupFunction.class));
+
         this.localHttpCallCounter = new AtomicInteger(0);
         this.client = pollingClientFactory
             .createPollClient(options, responseSchemaDecoder);
```

src/main/java/com/getindata/connectors/http/internal/table/lookup/querycreators/GenericJsonQueryCreator.java

Lines changed: 20 additions & 0 deletions
```diff
@@ -4,8 +4,10 @@

 import org.apache.flink.api.common.serialization.SerializationSchema;
 import org.apache.flink.table.data.RowData;
+import org.apache.flink.util.FlinkRuntimeException;

 import com.getindata.connectors.http.LookupQueryCreator;
+import com.getindata.connectors.http.internal.utils.SerializationSchemaUtils;

 /**
  * A {@link LookupQueryCreator} that builds Json based body for REST requests, i.e. adds
@@ -17,6 +19,8 @@ public class GenericJsonQueryCreator implements LookupQueryCreator {
      */
     private final SerializationSchema<RowData> jsonSerialization;

+    private boolean schemaOpened = false;
+
     public GenericJsonQueryCreator(SerializationSchema<RowData> jsonSerialization) {

         this.jsonSerialization = jsonSerialization;
@@ -30,6 +34,22 @@ public GenericJsonQueryCreator(SerializationSchema<RowData> jsonSerialization) {
      */
     @Override
     public String createLookupQuery(RowData lookupDataRow) {
+        checkOpened();
         return new String(jsonSerialization.serialize(lookupDataRow), StandardCharsets.UTF_8);
     }
+
+    private void checkOpened() {
+        if (!schemaOpened) {
+            try {
+                jsonSerialization.open(
+                    SerializationSchemaUtils
+                        .createSerializationInitContext(GenericJsonQueryCreator.class));
+            } catch (Exception e) {
+                throw new FlinkRuntimeException(
+                    "Failed to initialize serialization schema for GenericJsonQueryCreatorFactory.",
+                    e);
+            }
+            schemaOpened = true;
+        }
+    }
 }
```
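
For illustration only, a sketch of the lazy initialization this change introduces: the first createLookupQuery call runs checkOpened(), which opens the underlying schema exactly once; later calls go straight to serialization. The inline JSON schema is hypothetical; in the real lookup source the schema comes from the configured request format.

```java
// Illustrative only, not part of commit 8df9637.
import java.nio.charset.StandardCharsets;

import org.apache.flink.api.common.serialization.SerializationSchema;
import org.apache.flink.table.data.GenericRowData;
import org.apache.flink.table.data.RowData;

import com.getindata.connectors.http.internal.table.lookup.querycreators.GenericJsonQueryCreator;

public class LookupQuerySketch {

    public static void main(String[] args) {
        SerializationSchema<RowData> jsonSchema =
            row -> ("{\"id\":" + row.getInt(0) + "}").getBytes(StandardCharsets.UTF_8);

        GenericJsonQueryCreator creator = new GenericJsonQueryCreator(jsonSchema);

        // First call opens the schema via checkOpened(), then serializes the row.
        String body = creator.createLookupQuery(GenericRowData.of(42));
        System.out.println(body); // {"id":42}
    }
}
```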

src/main/java/com/getindata/connectors/http/internal/table/sink/HttpDynamicSink.java

Lines changed: 3 additions & 4 deletions
```diff
@@ -22,6 +22,7 @@
 import com.getindata.connectors.http.HttpSinkBuilder;
 import com.getindata.connectors.http.internal.sink.HttpSinkRequestEntry;
 import com.getindata.connectors.http.internal.sink.httpclient.JavaNetSinkHttpClient;
+import com.getindata.connectors.http.internal.table.SerializationSchemaElementConverter;
 import com.getindata.connectors.http.internal.utils.HttpHeaderUtils;
 import static com.getindata.connectors.http.internal.table.sink.HttpDynamicSinkConnectorOptions.INSERT_METHOD;
 import static com.getindata.connectors.http.internal.table.sink.HttpDynamicSinkConnectorOptions.URL;

@@ -128,10 +129,8 @@ public SinkRuntimeProvider getSinkRuntimeProvider(Context context) {
             .setHttpPostRequestCallback(httpPostRequestCallback)
             // In future header preprocessor could be set via custom factory
             .setHttpHeaderPreprocessor(HttpHeaderUtils.createDefaultHeaderPreprocessor())
-            .setElementConverter((rowData, _context) -> new HttpSinkRequestEntry(
-                insertMethod,
-                serializationSchema.serialize(rowData)
-            ))
+            .setElementConverter(
+                new SerializationSchemaElementConverter(insertMethod, serializationSchema))
             .setProperties(properties);
         addAsyncOptionsToSinkBuilder(builder);
```

src/main/java/com/getindata/connectors/http/internal/utils/SerializationSchemaUtils.java

Lines changed: 50 additions & 0 deletions (new file)

```java
package com.getindata.connectors.http.internal.utils;

import org.apache.flink.metrics.MetricGroup;
import org.apache.flink.metrics.groups.UnregisteredMetricsGroup;
import org.apache.flink.util.SimpleUserCodeClassLoader;
import org.apache.flink.util.UserCodeClassLoader;

public final class SerializationSchemaUtils {

    private SerializationSchemaUtils() {

    }

    public static <T> org.apache.flink.api.common.serialization.SerializationSchema
        .InitializationContext createSerializationInitContext(Class<T> classForClassLoader) {

        return new org.apache.flink.api.common.serialization.SerializationSchema
            .InitializationContext() {

            @Override
            public MetricGroup getMetricGroup() {
                return new UnregisteredMetricsGroup();
            }

            @Override
            public UserCodeClassLoader getUserCodeClassLoader() {
                return SimpleUserCodeClassLoader.create(classForClassLoader.getClassLoader());
            }
        };
    }

    public static <T> org.apache.flink.api.common.serialization.DeserializationSchema
        .InitializationContext createDeserializationInitContext(Class<T> classForClassLoader) {

        return new org.apache.flink.api.common.serialization.DeserializationSchema
            .InitializationContext() {

            @Override
            public MetricGroup getMetricGroup() {
                return new UnregisteredMetricsGroup();
            }

            @Override
            public UserCodeClassLoader getUserCodeClassLoader() {
                return SimpleUserCodeClassLoader.create(classForClassLoader.getClassLoader());
            }
        };
    }

}
```
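
A hedged stand-alone sketch of the pattern this utility enables, mirroring HttpTableLookupFunction#open and the test below: when no runtime-provided context is available, hand the schema an InitializationContext backed by an unregistered metric group and the caller's classloader. SimpleStringSchema is used here purely as a convenient stand-in for the connector's real decoders.

```java
// Illustrative only, not part of commit 8df9637.
import org.apache.flink.api.common.serialization.SimpleStringSchema;

import com.getindata.connectors.http.internal.utils.SerializationSchemaUtils;

public class SchemaUtilsSketch {

    public static void main(String[] args) throws Exception {
        SimpleStringSchema schema = new SimpleStringSchema();

        // DeserializationSchema#open may throw Exception, hence the throws clause above.
        schema.open(
            SerializationSchemaUtils.createDeserializationInitContext(SchemaUtilsSketch.class));

        System.out.println(schema.deserialize("opened and ready".getBytes()));
    }
}
```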

src/test/java/com/getindata/connectors/http/internal/sink/HttpSinkWriterTest.java

Lines changed: 7 additions & 1 deletion
```diff
@@ -13,6 +13,7 @@
 import org.apache.flink.connector.base.sink.writer.BufferedRequestState;
 import org.apache.flink.connector.base.sink.writer.ElementConverter;
 import org.apache.flink.metrics.Counter;
+import org.apache.flink.metrics.groups.OperatorIOMetricGroup;
 import org.apache.flink.metrics.groups.SinkWriterMetricGroup;
 import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Test;

@@ -42,15 +43,20 @@ class HttpSinkWriterTest {
     @Mock
     private SinkHttpClient httpClient;

-    @Mock
+    // To work with Flink 1.15 and Flink 1.16
+    @Mock(lenient = true)
     private SinkWriterMetricGroup metricGroup;

+    @Mock
+    private OperatorIOMetricGroup operatorIOMetricGroup;
+
     @Mock
     private Counter errorCounter;

     @BeforeEach
     public void setUp() {
         when(metricGroup.getNumRecordsSendErrorsCounter()).thenReturn(errorCounter);
+        when(metricGroup.getIOMetricGroup()).thenReturn(operatorIOMetricGroup);
         when(context.metricGroup()).thenReturn(metricGroup);

         Collection<BufferedRequestState<HttpSinkRequestEntry>> stateBuffer = new ArrayList<>();
```

src/test/java/com/getindata/connectors/http/internal/table/lookup/JavaNetHttpPollingClientConnectionTest.java

Lines changed: 9 additions & 0 deletions
```diff
@@ -49,6 +49,7 @@
 import com.getindata.connectors.http.internal.table.lookup.querycreators.GenericGetQueryCreator;
 import com.getindata.connectors.http.internal.table.lookup.querycreators.GenericJsonQueryCreator;
 import com.getindata.connectors.http.internal.utils.HttpHeaderUtils;
+import com.getindata.connectors.http.internal.utils.SerializationSchemaUtils;
 import static com.getindata.connectors.http.TestHelper.readTestFile;
 import static com.getindata.connectors.http.internal.table.lookup.HttpLookupTableSourceFactory.row;

@@ -368,6 +369,14 @@ private JavaNetHttpPollingClient setUpPollingClient(
             .createDecodingFormat(dynamicTableFactoryContext, new Configuration())
             .createRuntimeDecoder(dynamicTableSourceContext, physicalDataType);

+        try {
+            schemaDecoder.open(
+                SerializationSchemaUtils.createDeserializationInitContext(
+                    JavaNetHttpPollingClientConnectionTest.class));
+        } catch (Exception e) {
+            throw new RuntimeException("Unable to open schema decoder: " + e.getMessage(), e);
+        }
+
         JavaNetHttpPollingClientFactory pollingClientFactory =
             new JavaNetHttpPollingClientFactory(requestFactory);
```
