Skip to content

Commit bb9b929

Browse files
authored
Merge pull request #27 from getindata/adapt-source-to-elastic
Adapt Http Lookup Source to Elastic
2 parents ba68043 + 15ff63a commit bb9b929

30 files changed

+389
-35
lines changed

CHANGELOG.md

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,15 @@ New properties are:
1313
- `gid.connector.http.security.cert.client` - path to connector's certificate.
1414
- `gid.connector.http.security.key.client` - path to connector's private key.
1515
- `gid.connector.http.security.cert.server.allowSelfSigned` - allowing for self-signed certificates without adding them to KeyStore (not recommended for a production).
16+
- Add [LookupQueryCreator](src/main/java/com/getindata/connectors/http/LookupQueryCreator.java) and
17+
[LookupQueryCreatorFactory](src/main/java/com/getindata/connectors/http/LookupQueryCreatorFactory.java) interfaces
18+
(along with a "default"
19+
[GenericGetQueryCreator](src/main/java/com/getindata/connectors/http/internal/table/lookup/querycreators/GenericGetQueryCreator.java)
20+
implementation) for customization of queries prepared by Lookup Source for its HTTP requests.
21+
- Add [ElasticSearchLiteQueryCreator](src/main/java/com/getindata/connectors/http/internal/table/lookup/querycreators/ElasticSearchLiteQueryCreator.java)
22+
that prepares [`q` parameter query](https://www.elastic.co/guide/en/elasticsearch/reference/current/search-search.html#search-api-query-params-q)
23+
using Lucene query string syntax (in first versions of ElasticSearch called
24+
[Search _Lite_](https://www.elastic.co/guide/en/elasticsearch/guide/current/search-lite.html)).
1625

1726
## [0.4.0] - 2022-08-31
1827

README.md

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -113,6 +113,18 @@ CREATE TABLE http-lookup (
113113
)
114114
```
115115

116+
#### Custom REST query
117+
Http Lookup Source builds queries out of `JOIN` clauses. One can customize how those queries are built by implementing
118+
[LookupQueryCreator](src/main/java/com/getindata/connectors/http/LookupQueryCreator.java) and
119+
[LookupQueryCreatorFactory](src/main/java/com/getindata/connectors/http/LookupQueryCreatorFactory.java) interfaces.
120+
Custom implementations of `LookupQueryCreatorFactory` can be registered along other factories in
121+
`resources/META-INF.services/org.apache.flink.table.factories.Factory` file and then referenced by their identifiers in
122+
the Http Lookup Source DDL property field `gid.connector.http.source.lookup.query-creator`.
123+
124+
A default implementation that builds an "ordinary" GET query, i.e. adds `?joinColumn1=value1&joinColumn2=value2&...`
125+
to the URI of the endpoint,
126+
([GenericGetQueryCreator](src/main/java/com/getindata/connectors/http/internal/table/lookup/querycreators/GenericGetQueryCreator.java))
127+
is provided and set as a default.
116128

117129
### HTTP Sink
118130
The following example shows the minimum Table API example to create a [HttpDynamicSink](src/main/java/com/getindata/connectors/http/internal/table/HttpDynamicSink.java) that writes JSON values to an HTTP endpoint using POST method, assuming Flink has JAR of [JSON serializer](https://nightlies.apache.org/flink/flink-docs-release-1.15/docs/connectors/table/formats/json/) installed:
Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
package com.getindata.connectors.http.internal.table.lookup;
1+
package com.getindata.connectors.http;
22

33
import lombok.Data;
44
import lombok.RequiredArgsConstructor;
Lines changed: 22 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,22 @@
1+
package com.getindata.connectors.http;
2+
3+
import java.io.Serializable;
4+
import java.util.List;
5+
6+
/**
7+
* An interface for a creator of a lookup query in the Http Lookup Source (e.g., the query that
8+
* gets appended to the URI in GET request).
9+
*
10+
* <p>One can customize how those queries are built by implementing {@link LookupQueryCreator} and
11+
* {@link LookupQueryCreatorFactory}.
12+
*/
13+
public interface LookupQueryCreator extends Serializable {
14+
/**
15+
* Create a lookup query (like the query appended to path in GET request)
16+
* out of the provided arguments.
17+
*
18+
* @param params the list of {@link LookupArg} containing request parameters.
19+
* @return a lookup query.
20+
*/
21+
String createLookupQuery(List<LookupArg> params);
22+
}
Lines changed: 37 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,37 @@
1+
package com.getindata.connectors.http;
2+
3+
import org.apache.flink.table.factories.Factory;
4+
5+
import com.getindata.connectors.http.internal.table.lookup.HttpLookupTableSource;
6+
7+
/**
8+
* The {@link Factory} that dynamically creates and injects {@link LookupQueryCreator} to
9+
* {@link HttpLookupTableSource}.
10+
*
11+
* <p>Custom implementations of {@link LookupQueryCreatorFactory} can be registered along other
12+
* factories in <pre>resources/META-INF.services/org.apache.flink.table.factories.Factory</pre>
13+
* file and then referenced by their identifiers in the HttpLookupSource DDL property field
14+
* <i>gid.connector.http.source.lookup.query-creator</i>.
15+
*
16+
* <p>The following example shows the minimum Table API example to create a
17+
* {@link HttpLookupTableSource} that uses a custom query creator created by a factory that returns
18+
* <i>my-query-creator</i> as its identifier.
19+
*
20+
* <pre>{@code
21+
* CREATE TABLE http (
22+
* id bigint,
23+
* some_field string
24+
* ) WITH (
25+
* 'connector' = 'rest-lookup',
26+
* 'format' = 'json',
27+
* 'url' = 'http://example.com/myendpoint',
28+
* 'gid.connector.http.source.lookup.query-creator' = 'my-query-creator'
29+
* )
30+
* }</pre>
31+
*/
32+
public interface LookupQueryCreatorFactory extends Factory {
33+
/**
34+
* @return {@link LookupQueryCreator} custom lookup query creator instance
35+
*/
36+
LookupQueryCreator createLookupQueryCreator();
37+
}

src/main/java/com/getindata/connectors/http/internal/PollingClient.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,7 @@
33
import java.util.List;
44
import java.util.Optional;
55

6-
import com.getindata.connectors.http.internal.table.lookup.LookupArg;
6+
import com.getindata.connectors.http.LookupArg;
77

88
/**
99
* A client that is used to get enrichment data from external component.

src/main/java/com/getindata/connectors/http/internal/PollingClientFactory.java

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -4,12 +4,14 @@
44

55
import org.apache.flink.api.common.serialization.DeserializationSchema;
66

7+
import com.getindata.connectors.http.LookupQueryCreator;
78
import com.getindata.connectors.http.internal.table.lookup.HttpLookupConfig;
89

910
public interface PollingClientFactory<OUT> extends Serializable {
1011

1112
PollingClient<OUT> createPollClient(
1213
HttpLookupConfig options,
13-
DeserializationSchema<OUT> schemaDecoder
14+
DeserializationSchema<OUT> schemaDecoder,
15+
LookupQueryCreator lookupQueryCreator
1416
);
1517
}

src/main/java/com/getindata/connectors/http/internal/config/HttpConnectorConfigConstants.java

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -42,6 +42,9 @@ public final class HttpConnectorConfigConstants {
4242
public static final String SINK_REQUEST_CALLBACK_IDENTIFIER =
4343
GID_CONNECTOR_HTTP + "sink.request-callback";
4444

45+
public static final String SOURCE_LOOKUP_QUERY_CREATOR_IDENTIFIER =
46+
GID_CONNECTOR_HTTP + "source.lookup.query-creator";
47+
4548
// -------------- HTTPS security settings --------------
4649
public static final String ALLOW_SELF_SIGNED =
4750
GID_CONNECTOR_HTTP + "security.cert.server.allowSelfSigned";

src/main/java/com/getindata/connectors/http/internal/table/lookup/HttpLookupConnectorOptions.java

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,9 @@
33
import org.apache.flink.configuration.ConfigOption;
44
import org.apache.flink.configuration.ConfigOptions;
55

6+
import com.getindata.connectors.http.internal.table.lookup.querycreators.GenericGetQueryCreatorFactory;
7+
import static com.getindata.connectors.http.internal.config.HttpConnectorConfigConstants.SOURCE_LOOKUP_QUERY_CREATOR_IDENTIFIER;
8+
69
public class HttpLookupConnectorOptions {
710

811
public static final ConfigOption<String> URL =
@@ -22,4 +25,9 @@ public class HttpLookupConnectorOptions {
2225
.booleanType()
2326
.defaultValue(false)
2427
.withDescription("Whether to use Sync and Async polling mechanism");
28+
29+
public static final ConfigOption<String> LOOKUP_QUERY_CREATOR_IDENTIFIER =
30+
ConfigOptions.key(SOURCE_LOOKUP_QUERY_CREATOR_IDENTIFIER)
31+
.stringType()
32+
.defaultValue(GenericGetQueryCreatorFactory.IDENTIFIER);
2533
}

src/main/java/com/getindata/connectors/http/internal/table/lookup/HttpLookupTableSource.java

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@
1515
import org.apache.flink.table.data.RowData;
1616
import org.apache.flink.table.types.DataType;
1717

18+
import com.getindata.connectors.http.LookupQueryCreator;
1819
import com.getindata.connectors.http.internal.PollingClientFactory;
1920
import com.getindata.connectors.http.internal.table.lookup.HttpTableLookupFunction.ColumnData;
2021

@@ -31,6 +32,8 @@ public class HttpLookupTableSource
3132

3233
private final DecodingFormat<DeserializationSchema<RowData>> decodingFormat;
3334

35+
private final LookupQueryCreator lookupQueryCreator;
36+
3437
@Override
3538
public LookupRuntimeProvider getLookupRuntimeProvider(LookupContext context) {
3639
String[] keyNames = new String[context.getKeys().length];
@@ -50,7 +53,8 @@ public DynamicTableSource copy() {
5053
physicalRowDataType,
5154
pollingClientFactory,
5255
lookupConfig,
53-
decodingFormat
56+
decodingFormat,
57+
lookupQueryCreator
5458
);
5559
}
5660

@@ -80,6 +84,7 @@ private LookupRuntimeProvider buildLookupFunction(String[] keyNames, LookupConte
8084
.pollingClientFactory(pollingClientFactory)
8185
.schemaDecoder(schemaDecoder)
8286
.columnData(columnData)
87+
.lookupQueryCreator(lookupQueryCreator)
8388
.options(lookupConfig)
8489
.build();
8590

src/main/java/com/getindata/connectors/http/internal/table/lookup/HttpLookupTableSourceFactory.java

Lines changed: 11 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -25,12 +25,11 @@
2525
import static org.apache.flink.table.api.DataTypes.FIELD;
2626
import static org.apache.flink.table.types.utils.DataTypeUtils.removeTimeAttribute;
2727

28+
import com.getindata.connectors.http.LookupQueryCreatorFactory;
2829
import com.getindata.connectors.http.internal.PollingClientFactory;
2930
import com.getindata.connectors.http.internal.config.HttpConnectorConfigConstants;
3031
import com.getindata.connectors.http.internal.utils.ConfigUtils;
31-
import static com.getindata.connectors.http.internal.table.lookup.HttpLookupConnectorOptions.ASYNC_POLLING;
32-
import static com.getindata.connectors.http.internal.table.lookup.HttpLookupConnectorOptions.URL;
33-
import static com.getindata.connectors.http.internal.table.lookup.HttpLookupConnectorOptions.URL_ARGS;
32+
import static com.getindata.connectors.http.internal.table.lookup.HttpLookupConnectorOptions.*;
3433

3534
public class HttpLookupTableSourceFactory implements DynamicTableSourceFactory {
3635

@@ -61,6 +60,13 @@ public DynamicTableSource createDynamicTableSource(Context context) {
6160
FactoryUtil.FORMAT
6261
);
6362

63+
final LookupQueryCreatorFactory lookupQueryCreatorFactory =
64+
FactoryUtil.discoverFactory(
65+
context.getClassLoader(),
66+
LookupQueryCreatorFactory.class,
67+
readableConfig.get(LOOKUP_QUERY_CREATOR_IDENTIFIER)
68+
);
69+
6470
PollingClientFactory<RowData> pollingClientFactory = new JavaNetHttpPollingClientFactory();
6571
HttpLookupConfig lookupConfig = getHttpLookupOptions(context, readableConfig);
6672

@@ -73,7 +79,8 @@ public DynamicTableSource createDynamicTableSource(Context context) {
7379
physicalRowDataType,
7480
pollingClientFactory,
7581
lookupConfig,
76-
decodingFormat
82+
decodingFormat,
83+
lookupQueryCreatorFactory.createLookupQueryCreator()
7784
);
7885
}
7986

src/main/java/com/getindata/connectors/http/internal/table/lookup/HttpTableLookupFunction.java

Lines changed: 9 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,8 @@
1919
import org.apache.flink.table.functions.FunctionContext;
2020
import org.apache.flink.table.functions.TableFunction;
2121

22+
import com.getindata.connectors.http.LookupArg;
23+
import com.getindata.connectors.http.LookupQueryCreator;
2224
import com.getindata.connectors.http.internal.PollingClient;
2325
import com.getindata.connectors.http.internal.PollingClientFactory;
2426

@@ -29,6 +31,8 @@ public class HttpTableLookupFunction extends TableFunction<RowData> {
2931

3032
private final DeserializationSchema<RowData> schemaDecoder;
3133

34+
private final LookupQueryCreator lookupQueryCreator;
35+
3236
@Getter
3337
private final ColumnData columnData;
3438

@@ -44,19 +48,22 @@ private HttpTableLookupFunction(
4448
PollingClientFactory<RowData> pollingClientFactory,
4549
DeserializationSchema<RowData> schemaDecoder,
4650
ColumnData columnData,
47-
HttpLookupConfig options) {
51+
HttpLookupConfig options,
52+
LookupQueryCreator lookupQueryCreator) {
4853

4954
this.pollingClientFactory = pollingClientFactory;
5055
this.schemaDecoder = schemaDecoder;
5156
this.columnData = columnData;
5257
this.options = options;
58+
this.lookupQueryCreator = lookupQueryCreator;
5359
}
5460

5561
@Override
5662
public void open(FunctionContext context) throws Exception {
5763
super.open(context);
5864
this.localHttpCallCounter = new AtomicInteger(0);
59-
this.client = pollingClientFactory.createPollClient(options, schemaDecoder);
65+
this.client = pollingClientFactory
66+
.createPollClient(options, schemaDecoder, lookupQueryCreator);
6067

6168
context
6269
.getMetricGroup()

src/main/java/com/getindata/connectors/http/internal/table/lookup/JavaNetHttpPollingClient.java

Lines changed: 9 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,8 @@
1919
import org.apache.flink.table.data.RowData;
2020
import org.apache.flink.util.StringUtils;
2121

22+
import com.getindata.connectors.http.LookupArg;
23+
import com.getindata.connectors.http.LookupQueryCreator;
2224
import com.getindata.connectors.http.internal.HeaderPreprocessor;
2325
import com.getindata.connectors.http.internal.PollingClient;
2426
import com.getindata.connectors.http.internal.config.HttpConnectorConfigConstants;
@@ -43,17 +45,21 @@ public class JavaNetHttpPollingClient implements PollingClient<RowData> {
4345

4446
private final HttpLookupConfig options;
4547

48+
private final LookupQueryCreator lookupQueryCreator;
49+
4650
private final String[] headersAndValues;
4751

4852
public JavaNetHttpPollingClient(
4953
HttpClient httpClient,
5054
DeserializationSchema<RowData> runtimeDecoder,
5155
HttpLookupConfig options,
56+
LookupQueryCreator lookupQueryCreator,
5257
HeaderPreprocessor headerPreprocessor) {
5358

5459
this.httpClient = httpClient;
5560
this.runtimeDecoder = runtimeDecoder;
5661
this.options = options;
62+
this.lookupQueryCreator = lookupQueryCreator;
5763

5864
var headerMap = HttpHeaderUtils
5965
.prepareHeaderMap(
@@ -96,7 +102,9 @@ private Optional<RowData> queryAndProcess(List<LookupArg> params) throws Excepti
96102
}
97103

98104
private HttpRequest buildHttpRequest(List<LookupArg> params) throws URISyntaxException {
99-
URI uri = buildUri(params);
105+
var lookupQuery = lookupQueryCreator.createLookupQuery(params);
106+
URI uri = new URIBuilder(options.getUrl() + "?" + lookupQuery).build();
107+
100108
Builder requestBuilder = HttpRequest.newBuilder()
101109
.uri(uri).GET()
102110
.timeout(Duration.ofMinutes(2));
@@ -108,16 +116,6 @@ private HttpRequest buildHttpRequest(List<LookupArg> params) throws URISyntaxExc
108116
return requestBuilder.build();
109117
}
110118

111-
private URI buildUri(List<LookupArg> params) throws URISyntaxException {
112-
113-
URIBuilder uriBuilder = new URIBuilder(options.getUrl());
114-
for (LookupArg arg : params) {
115-
uriBuilder.addParameter(arg.getArgName(), arg.getArgValue());
116-
}
117-
118-
return uriBuilder.build();
119-
}
120-
121119
private Optional<RowData> processHttpResponse(
122120
HttpResponse<String> response,
123121
HttpRequest request) throws IOException {

src/main/java/com/getindata/connectors/http/internal/table/lookup/JavaNetHttpPollingClientFactory.java

Lines changed: 11 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@
55
import org.apache.flink.api.common.serialization.DeserializationSchema;
66
import org.apache.flink.table.data.RowData;
77

8+
import com.getindata.connectors.http.LookupQueryCreator;
89
import com.getindata.connectors.http.internal.HeaderPreprocessor;
910
import com.getindata.connectors.http.internal.PollingClientFactory;
1011
import com.getindata.connectors.http.internal.utils.HttpHeaderUtils;
@@ -14,16 +15,22 @@ public class JavaNetHttpPollingClientFactory implements PollingClientFactory<Row
1415

1516
@Override
1617
public JavaNetHttpPollingClient createPollClient(
17-
HttpLookupConfig options,
18-
DeserializationSchema<RowData> schemaDecoder) {
18+
HttpLookupConfig options,
19+
DeserializationSchema<RowData> schemaDecoder,
20+
LookupQueryCreator lookupQueryCreator) {
1921

2022
HttpClient httpClient = JavaNetHttpClientFactory.createClient(options.getProperties());
2123

2224
// TODO Consider this to be injected as method argument or factory field
2325
// so user could set this using API.
2426
HeaderPreprocessor headerPreprocessor = HttpHeaderUtils.createDefaultHeaderPreprocessor();
2527

26-
27-
return new JavaNetHttpPollingClient(httpClient, schemaDecoder, options, headerPreprocessor);
28+
return new JavaNetHttpPollingClient(
29+
httpClient,
30+
schemaDecoder,
31+
options,
32+
lookupQueryCreator,
33+
headerPreprocessor
34+
);
2835
}
2936
}

0 commit comments

Comments
 (0)