Commit c2716f0

1ESP-119_Http_headers_SourceSupport - Add Http header support for Http Lookup Source. (#21)
Signed-off-by: Krzysztof Chmielewski <krzysztof.chmielewski@getindata.com>
1 parent 2f082fc commit c2716f0

14 files changed

+237
-66
lines changed

CHANGELOG.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@
99
- Add to Http Sink a new properties `gid.connector.http.sink.error.code` and `gid.connector.http.sink.error.code.exclude`
1010
to set HTTP status code that should be interpreted as errors.
1111
- Use Flink's format support to Http Lookup Source.
12+
- Add HTTP Lookup source client header configuration via properties.
1213

1314
### Changed
1415
- Change dependency scope for `org.apache.flink.flink-connector-base` from `compile` to `provided`.

README.md

Lines changed: 43 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -40,26 +40,30 @@ Flink SQL table definition:
4040
Enrichment Lookup Table
4141
```roomsql
4242
CREATE TABLE Customers (
43-
id STRING,
44-
id2 STRING,
45-
msg STRING,
46-
uuid STRING,
47-
details ROW<
48-
isActive BOOLEAN,
49-
nestedDetails ROW<
50-
balance STRING
51-
>
52-
>
43+
id STRING,
44+
id2 STRING,
45+
msg STRING,
46+
uuid STRING,
47+
details ROW<
48+
isActive BOOLEAN,
49+
nestedDetails ROW<
50+
balance STRING
51+
>
52+
>
5353
) WITH (
5454
'connector' = 'rest-lookup',
5555
'format' = 'json',
5656
'url' = 'http://localhost:8080/client',
5757
'asyncPolling' = 'true'
5858
)
5959
```
60+
6061
Data Source Table
6162
```roomsql
62-
CREATE TABLE Orders (id STRING, id2 STRING, proc_time AS PROCTIME()
63+
CREATE TABLE Orders (
64+
id STRING,
65+
id2 STRING,
66+
proc_time AS PROCTIME()
6367
) WITH (
6468
'connector' = 'datagen',
6569
'rows-per-second' = '1',
@@ -70,8 +74,9 @@ CREATE TABLE Orders (id STRING, id2 STRING, proc_time AS PROCTIME()
7074
'fields.id2.start' = '2',
7175
'fields.id2.end' = '120'
7276
);
77+
```
7378

74-
Using _Customers_ table in Flink SQL Lookup Join with Orders table:
79+
Using _Customers_ table in Flink SQL Lookup Join with _Orders_ table:
7580

7681
```roomsql
7782
SELECT o.id, o.id2, c.msg, c.uuid, c.isActive, c.balance FROM Orders AS o
@@ -83,6 +88,31 @@ For Example:
8388
``
8489
http://localhost:8080/client/service?id=1&uuid=2
8590
``
91+
#### Http headers
92+
It is possible to set HTTP headers that will be added to HTTP requests sent by the lookup source connector.
93+
Headers are defined via property key `gid.connector.http.source.lookup.header.HEADER_NAME = header value` for example:
94+
`gid.connector.http.source.lookup.header.X-Content-Type-Options = nosniff`.
95+
96+
Headers can be set in the HTTP lookup source table DDL. In the example below, HTTP requests made for the `http-lookup` table will contain three headers:
97+
- `Origin`
98+
- `X-Content-Type-Options`
99+
- `Content-Type`
100+
101+
```roomsql
102+
CREATE TABLE `http-lookup` (
103+
id bigint,
104+
some_field string
105+
) WITH (
106+
'connector' = 'rest-lookup',
107+
'format' = 'json',
108+
'url' = 'http://localhost:8080/client',
109+
'asyncPolling' = 'true',
110+
'gid.connector.http.source.lookup.header.Origin' = '*',
111+
'gid.connector.http.source.lookup.header.X-Content-Type-Options' = 'nosniff',
112+
'gid.connector.http.source.lookup.header.Content-Type' = 'application/json'
113+
)
114+
```
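
To make the property-to-header mapping above concrete, here is a minimal, self-contained sketch of how prefixed properties could be turned into the flat name/value array that `java.net.http.HttpRequest.Builder#headers` expects. The class `LookupHeaderSketch` and its helpers `toHeadersAndValues` and `buildRequest` are hypothetical names used for illustration only; the connector's own `ConfigUtils` helpers are not shown in this commit and may behave differently. The empty-array guard reflects the assumption that `headers(String...)` rejects an empty argument list.

```java
import java.net.URI;
import java.net.http.HttpRequest;
import java.util.ArrayList;
import java.util.List;
import java.util.Properties;
import java.util.TreeSet;

public class LookupHeaderSketch {

    // Prefix as documented in the README section above.
    static final String HEADER_PREFIX = "gid.connector.http.source.lookup.header.";

    // Hypothetical helper (not the connector's ConfigUtils): collects every
    // property whose key starts with the prefix into a flat array of the form
    // [name1, value1, name2, value2, ...], ordered by property key.
    static String[] toHeadersAndValues(Properties properties) {
        List<String> flat = new ArrayList<>();
        for (String key : new TreeSet<>(properties.stringPropertyNames())) {
            if (key.startsWith(HEADER_PREFIX) && key.length() > HEADER_PREFIX.length()) {
                flat.add(key.substring(HEADER_PREFIX.length())); // header name
                flat.add(properties.getProperty(key));           // header value
            }
        }
        return flat.toArray(new String[0]);
    }

    // Builds a GET request and attaches headers only when some are configured,
    // so that headers(...) is never called with an empty array.
    static HttpRequest buildRequest(String url, String[] headersAndValues) {
        HttpRequest.Builder builder = HttpRequest.newBuilder()
            .uri(URI.create(url))
            .GET();
        if (headersAndValues.length != 0) {
            builder.headers(headersAndValues);
        }
        return builder.build();
    }

    public static void main(String[] args) {
        Properties props = new Properties();
        props.setProperty(HEADER_PREFIX + "X-Content-Type-Options", "nosniff");
        props.setProperty(HEADER_PREFIX + "Content-Type", "application/json");
        props.setProperty("url", "http://localhost:8080/client"); // ignored: no header prefix

        HttpRequest request =
            buildRequest("http://localhost:8080/client", toHeadersAndValues(props));
        System.out.println(request.headers().map());
    }
}
```

Computing the array once and reusing it for every request avoids re-scanning the properties on each lookup, which is the same idea as the `headersAndValues` field introduced in `JavaNetHttpPollingClient` by this commit.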
115+
86116

87117
### HTTP Sink
88118
The following example shows the minimum Table API example to create a [HttpDynamicSink](src/main/java/com/getindata/connectors/http/internal/table/HttpDynamicSink.java) that writes JSON values to an HTTP endpoint using POST method, assuming Flink has JAR of [JSON serializer](https://nightlies.apache.org/flink/flink-docs-release-1.15/docs/connectors/table/formats/json/) installed:
@@ -108,7 +138,7 @@ Due to the fact that `HttpSink` sends bytes inside HTTP request's body, one can
108138

109139
Other examples of usage of the Table API can be found in [some tests](src/test/java/com/getindata/connectors/http/table/HttpDynamicSinkInsertTest.java).
110140

111-
#### Http headers (currently supported only for HTTP Sink)
141+
#### Http headers
112142
It is possible to set HTTP headers that will be added to HTTP requests sent by the sink connector.
113143
Headers are defined via property key `gid.connector.http.sink.header.HEADER_NAME = header value` for example:
114144
`gid.connector.http.sink.header.X-Content-Type-Options = nosniff`.

src/main/java/com/getindata/connectors/http/internal/SinkHttpClient.java

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,8 @@ public interface SinkHttpClient {
2121
* @return the new {@link CompletableFuture} wrapping {@link SinkHttpClientResponse} that
2222
* completes when all requests have been sent and returned their statuses
2323
*/
24-
CompletableFuture<SinkHttpClientResponse> putRequests(List<HttpSinkRequestEntry> requestEntries,
25-
String endpointUrl);
24+
CompletableFuture<SinkHttpClientResponse> putRequests(
25+
List<HttpSinkRequestEntry> requestEntries,
26+
String endpointUrl
27+
);
2628
}

src/main/java/com/getindata/connectors/http/internal/config/HttpConnectorConfigConstants.java

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,9 @@ public final class HttpConnectorConfigConstants {
2323
*/
2424
public static final String SINK_HEADER_PREFIX = GID_CONNECTOR_HTTP + "sink.header.";
2525

26+
public static final String LOOKUP_SOURCE_HEADER_PREFIX = GID_CONNECTOR_HTTP
27+
+ "source.lookup.header.";
28+
2629
// Error code handling configuration.
2730
public static final String HTTP_ERROR_CODE_WHITE_LIST =
2831
GID_CONNECTOR_HTTP + "sink.error.code.exclude";

src/main/java/com/getindata/connectors/http/internal/table/lookup/HttpLookupConfig.java

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@
33
import java.io.Serializable;
44
import java.util.Collections;
55
import java.util.List;
6+
import java.util.Properties;
67

78
import lombok.Builder;
89
import lombok.Data;
@@ -20,4 +21,7 @@ public class HttpLookupConfig implements Serializable {
2021

2122
@Builder.Default
2223
private final boolean useAsync = false;
24+
25+
@Builder.Default
26+
private final Properties properties = new Properties();
2327
}

src/main/java/com/getindata/connectors/http/internal/table/lookup/HttpLookupTableSourceFactory.java

Lines changed: 13 additions & 27 deletions
Original file line numberDiff line numberDiff line change
@@ -2,16 +2,14 @@
22

33
import java.util.Arrays;
44
import java.util.Collections;
5-
import java.util.HashSet;
65
import java.util.List;
6+
import java.util.Properties;
77
import java.util.Set;
88
import java.util.function.Predicate;
99
import java.util.stream.Collectors;
1010

1111
import org.apache.flink.api.common.serialization.DeserializationSchema;
1212
import org.apache.flink.configuration.ConfigOption;
13-
import org.apache.flink.configuration.ConfigOptions;
14-
import org.apache.flink.configuration.Configuration;
1513
import org.apache.flink.configuration.ReadableConfig;
1614
import org.apache.flink.table.api.DataTypes;
1715
import org.apache.flink.table.api.DataTypes.Field;
@@ -28,6 +26,8 @@
2826
import static org.apache.flink.table.types.utils.DataTypeUtils.removeTimeAttribute;
2927

3028
import com.getindata.connectors.http.internal.PollingClientFactory;
29+
import com.getindata.connectors.http.internal.config.HttpConnectorConfigConstants;
30+
import com.getindata.connectors.http.internal.utils.ConfigUtils;
3131
import static com.getindata.connectors.http.internal.table.lookup.HttpLookupConnectorOptions.ASYNC_POLLING;
3232
import static com.getindata.connectors.http.internal.table.lookup.HttpLookupConnectorOptions.URL;
3333
import static com.getindata.connectors.http.internal.table.lookup.HttpLookupConnectorOptions.URL_ARGS;
@@ -49,21 +49,20 @@ public static DataType row(List<Field> fields) {
4949

5050
@Override
5151
public DynamicTableSource createDynamicTableSource(Context context) {
52-
final FactoryUtil.TableFactoryHelper helper =
52+
FactoryUtil.TableFactoryHelper helper =
5353
FactoryUtil.createTableFactoryHelper(this, context);
5454

5555
ReadableConfig readableConfig = helper.getOptions();
56-
57-
validateOptions(context, readableConfig);
56+
helper.validateExcept(HttpConnectorConfigConstants.GID_CONNECTOR_HTTP);
5857

5958
DecodingFormat<DeserializationSchema<RowData>> decodingFormat =
6059
helper.discoverDecodingFormat(
6160
DeserializationFormatFactory.class,
6261
FactoryUtil.FORMAT
6362
);
6463

65-
PollingClientFactory<RowData> pollingClientFactory = new RestTablePollingClientFactory();
66-
HttpLookupConfig lookupConfig = getHttpLookupOptions(readableConfig);
64+
PollingClientFactory<RowData> pollingClientFactory = new JavaNetHttpPollingClientFactory();
65+
HttpLookupConfig lookupConfig = getHttpLookupOptions(context, readableConfig);
6766

6867
ResolvedSchema resolvedSchema = context.getCatalogTable().getResolvedSchema();
6968

@@ -93,32 +92,19 @@ public Set<ConfigOption<?>> optionalOptions() {
9392
return Set.of(URL_ARGS, ASYNC_POLLING);
9493
}
9594

96-
private HttpLookupConfig getHttpLookupOptions(ReadableConfig config) {
95+
private HttpLookupConfig getHttpLookupOptions(Context context, ReadableConfig config) {
96+
97+
Properties httpConnectorProperties =
98+
ConfigUtils.getHttpConnectorProperties(context.getCatalogTable().getOptions());
99+
97100
return HttpLookupConfig.builder()
98101
.url(config.get(URL))
99102
.arguments(convertArguments(config.get((URL_ARGS))))
100103
.useAsync(config.get(ASYNC_POLLING))
104+
.properties(httpConnectorProperties)
101105
.build();
102106
}
103107

104-
// TODO FIX There is something ugly here. Verify with DataGenTableSourceFactory and refactor.
105-
private void validateOptions(Context context, ReadableConfig readableConfig) {
106-
107-
Set<ConfigOption<?>> allOptions = new HashSet<>();
108-
allOptions.addAll(optionalOptions());
109-
allOptions.addAll(requiredOptions());
110-
allOptions.add(ConfigOptions.key("connector").stringType().noDefaultValue());
111-
112-
FactoryUtil.validateFactoryOptions(requiredOptions(), allOptions, readableConfig);
113-
114-
Configuration options = new Configuration();
115-
context.getCatalogTable().getOptions().forEach(options::setString);
116-
Set<String> consumedOptionKeys = new HashSet<>();
117-
allOptions.stream().map(ConfigOption::key).forEach(consumedOptionKeys::add);
118-
FactoryUtil.validateUnconsumedKeys(factoryIdentifier(), options.keySet(),
119-
consumedOptionKeys);
120-
}
121-
122108
private List<String> convertArguments(String arguments) {
123109

124110
if (arguments == null || arguments.isEmpty() || arguments.isBlank()) {

src/main/java/com/getindata/connectors/http/internal/table/lookup/RestTablePollingClient.java renamed to src/main/java/com/getindata/connectors/http/internal/table/lookup/JavaNetHttpPollingClient.java

Lines changed: 47 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -5,34 +5,58 @@
55
import java.net.URISyntaxException;
66
import java.net.http.HttpClient;
77
import java.net.http.HttpRequest;
8+
import java.net.http.HttpRequest.Builder;
89
import java.net.http.HttpResponse;
910
import java.net.http.HttpResponse.BodyHandlers;
1011
import java.time.Duration;
12+
import java.util.Arrays;
1113
import java.util.List;
14+
import java.util.Map;
1215
import java.util.Optional;
1316

14-
import lombok.RequiredArgsConstructor;
1517
import lombok.extern.slf4j.Slf4j;
18+
import org.apache.flink.annotation.VisibleForTesting;
1619
import org.apache.flink.api.common.serialization.DeserializationSchema;
1720
import org.apache.flink.table.data.RowData;
1821

1922
import com.getindata.connectors.http.internal.PollingClient;
23+
import com.getindata.connectors.http.internal.utils.ConfigUtils;
2024
import com.getindata.connectors.http.internal.utils.uri.URIBuilder;
25+
import static com.getindata.connectors.http.internal.config.HttpConnectorConfigConstants.LOOKUP_SOURCE_HEADER_PREFIX;
2126

2227
/**
2328
* An implementation of {@link PollingClient} that uses Java 11's {@link HttpClient}.
2429
* This implementation supports HTTP traffic only.
2530
*/
2631
@Slf4j
27-
@RequiredArgsConstructor
28-
public class RestTablePollingClient implements PollingClient<RowData> {
32+
public class JavaNetHttpPollingClient implements PollingClient<RowData> {
2933

3034
private final HttpClient httpClient;
3135

36+
public JavaNetHttpPollingClient(
37+
HttpClient httpClient,
38+
DeserializationSchema<RowData> runtimeDecoder,
39+
HttpLookupConfig options) {
40+
41+
this.httpClient = httpClient;
42+
this.runtimeDecoder = runtimeDecoder;
43+
this.options = options;
44+
45+
Map<String, String> headerMap = ConfigUtils.propertiesToMap(
46+
options.getProperties(),
47+
LOOKUP_SOURCE_HEADER_PREFIX,
48+
String.class
49+
);
50+
51+
this.headersAndValues = ConfigUtils.toHeaderAndValueArray(headerMap);
52+
}
53+
3254
private final DeserializationSchema<RowData> runtimeDecoder;
3355

3456
private final HttpLookupConfig options;
3557

58+
private final String[] headersAndValues;
59+
3660
@Override
3761
public Optional<RowData> pull(List<LookupArg> lookupArgs) {
3862
try {
@@ -45,14 +69,25 @@ public Optional<RowData> pull(List<LookupArg> lookupArgs) {
4569

4670
// TODO Add Retry Policy And configure TimeOut from properties
4771
private Optional<RowData> queryAndProcess(List<LookupArg> params) throws Exception {
48-
URI uri = buildUri(params);
49-
HttpRequest request =
50-
HttpRequest.newBuilder().uri(uri).GET().timeout(Duration.ofMinutes(2)).build();
51-
HttpResponse<String> response = httpClient.send(request, BodyHandlers.ofString());
5272

73+
HttpRequest request = buildHttpRequest(params);
74+
HttpResponse<String> response = httpClient.send(request, BodyHandlers.ofString());
5375
return processHttpResponse(response);
5476
}
5577

78+
private HttpRequest buildHttpRequest(List<LookupArg> params) throws URISyntaxException {
79+
URI uri = buildUri(params);
80+
Builder requestBuilder = HttpRequest.newBuilder()
81+
.uri(uri).GET()
82+
.timeout(Duration.ofMinutes(2));
83+
84+
if (headersAndValues.length != 0) {
85+
requestBuilder.headers(headersAndValues);
86+
}
87+
88+
return requestBuilder.build();
89+
}
90+
5691
private URI buildUri(List<LookupArg> params) throws URISyntaxException {
5792

5893
URIBuilder uriBuilder = new URIBuilder(options.getUrl());
@@ -78,4 +113,9 @@ private Optional<RowData> processHttpResponse(HttpResponse<String> response)
78113
return Optional.empty();
79114
}
80115
}
116+
117+
@VisibleForTesting
118+
String[] getHeadersAndValues() {
119+
return Arrays.copyOf(headersAndValues, headersAndValues.length);
120+
}
81121
}
Original file line numberDiff line numberDiff line change
@@ -1,20 +1,25 @@
11
package com.getindata.connectors.http.internal.table.lookup;
22

33
import java.net.http.HttpClient;
4+
import java.net.http.HttpClient.Redirect;
45

56
import org.apache.flink.api.common.serialization.DeserializationSchema;
67
import org.apache.flink.table.data.RowData;
78

89
import com.getindata.connectors.http.internal.PollingClient;
910
import com.getindata.connectors.http.internal.PollingClientFactory;
1011

11-
public class RestTablePollingClientFactory implements PollingClientFactory<RowData> {
12+
public class JavaNetHttpPollingClientFactory implements PollingClientFactory<RowData> {
1213

1314
@Override
1415
public PollingClient<RowData> createPollClient(
1516
HttpLookupConfig options,
1617
DeserializationSchema<RowData> schemaDecoder) {
1718

18-
return new RestTablePollingClient(HttpClient.newHttpClient(), schemaDecoder, options);
19+
HttpClient httpClient = HttpClient.newBuilder()
20+
.followRedirects(Redirect.NORMAL)
21+
.build();
22+
23+
return new JavaNetHttpPollingClient(httpClient, schemaDecoder, options);
1924
}
2025
}

src/main/java/com/getindata/connectors/http/internal/table/sink/HttpDynamicTableSinkFactory.java

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -25,12 +25,15 @@ public class HttpDynamicTableSinkFactory extends AsyncDynamicTableSinkFactory {
2525
@Override
2626
public DynamicTableSink createDynamicTableSink(Context context) {
2727
final AsyncDynamicSinkContext factoryContext = new AsyncDynamicSinkContext(this, context);
28+
29+
// This is effectively the same as calling helper.getOptions().
2830
ReadableConfig tableOptions = factoryContext.getTableOptions();
2931

3032
// Validate configuration
3133
FactoryUtil.createTableFactoryHelper(this, context)
3234
.validateExcept(HttpConnectorConfigConstants.GID_CONNECTOR_HTTP);
3335
validateHttpSinkOptions(tableOptions);
36+
3437
Properties asyncSinkProperties =
3538
new AsyncSinkConfigurationValidator(tableOptions).getValidatedConfigurations();
3639

src/test/java/com/getindata/connectors/http/internal/sink/httpclient/JavaNetSinkHttpClientTest.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -46,7 +46,7 @@ public void setUp() {
4646
}
4747

4848
@Test
49-
public void shouldBuildClientWithNoHeader() {
49+
public void shouldBuildClientWithoutHeaders() {
5050

5151
JavaNetSinkHttpClient client = new JavaNetSinkHttpClient(new Properties());
5252
assertThat(client.getHeadersAndValues()).isEmpty();
