Commit f39b8b3

kristoffSC and swtwsk authored
[HTTP-34] Add support for lookup on Row type (#35)
* [HTTP-34] - Add support for Lookup on Row type and nested types.

This change adds support for SQL lookup join scenarios where a ROW type column is used in the JOIN condition. In that case, the Lookup connector converts the used columns (including nested types such as Arrays, Maps, Lists and ROWs) into a proper HTTP query:
- for GET requests, it converts all fields (including ROW columns) to key=value pairs and uses them as HTTP query parameters;
- for body-based requests such as POST/PUT, it converts the entire lookup row into a JSON object and uses it as the request body. By default, Flink's Json-Format is used for the RowData -> Json conversion.

Signed-off-by: Krzysztof Chmielewski <krzysiek.chmielewski@gmail.com>

* Address review comments: fix typos

Signed-off-by: Krzysztof Chmielewski <krzysiek.chmielewski@gmail.com>

Co-authored-by: Andrzej Swatowski <andrzejswatowski@outlook.com>
1 parent 6a2846f commit f39b8b3

43 files changed: +1836 -503 lines changed

CHANGELOG.md

Lines changed: 5 additions & 0 deletions
@@ -2,6 +2,11 @@

 ## [Unreleased]

+- Add to Lookup Source support for performing lookup on columns with complex types such as ROW, Map etc.
+- Add support for custom Json Serialization format for SQL Lookup Source when using [GenericJsonQueryCreator](src/main/java/com/getindata/connectors/http/internal/table/lookup/querycreators/GenericJsonQueryCreator.java)
+The custom format can be defined using Flink's Factory mechanism. The format name can be defined using
+`lookup-request.format` option. The default format is `json` which means that connector will use Flink's [json-format](https://nightlies.apache.org/flink/flink-docs-master/docs/connectors/table/formats/json/)
+
 ## [0.6.0] - 2022-10-05

 ### Added

README.md

Lines changed: 19 additions & 1 deletion
@@ -133,8 +133,26 @@ the Http Lookup Source DDL property field `gid.connector.http.source.lookup.quer

 A default implementation that builds an "ordinary" GET query, i.e. adds `?joinColumn1=value1&joinColumn2=value2&...`
 to the URI of the endpoint,
+
+For body based queries such as POST/PUT requests, the
 ([GenericGetQueryCreator](src/main/java/com/getindata/connectors/http/internal/table/lookup/querycreators/GenericGetQueryCreator.java))
-is provided and set as a default.
+is provided as a default query creator. This implementation uses Flink's [json-format](https://nightlies.apache.org/flink/flink-docs-master/docs/connectors/table/formats/json/) to convert RowData object into Json String.
+
+The important thing worth knowing is that `GenericJsonQueryCreator` allows for using custom formats that will perform serialization to Json. Thanks to this, users can create their own logic for converting RowData to Json Strings that will match their requirements
+and use it in HTTP Lookup connector and SQL queries.
+To create a custom format user has to implement Flink's `SerializationSchema` and `SerializationFormatFactory` interfaces and register custom format factory along other factories in
+`resources/META-INF/services/org.apache.flink.table.factories.Factory` file. This is common Flink mechanism for providing custom implementations for various factories.
+In order to use custom format, user has to specify option `'lookup-request.format' = 'customFormatName'`, where `customFormatName` is the identifier of our custom format factory.
+
+Additionally, it is possible to pass query format options from table's DDL.
+This can be done by using option like so: `'lookup-request.format.customFormatName.customFormatProperty' = 'propertyValue'`, for example
+`'lookup-request.format.customFormatName.fail-on-missing-field' = 'true'`. It is important that `customFormatName` part must match `SerializationFormatFactory` identifier used for custom format implementation.
+In this case, the `fail-on-missing-field` will be passed to `SerializationFormatFactory::createEncodingFormat(
+DynamicTableFactory.Context context, ReadableConfig formatOptions)` method in `ReadableConfig` object.
+
+In configuration when default, Flink-Json format is used for `GenericJsonQueryCreator`, all options defined in [json-format](https://nightlies.apache.org/flink/flink-docs-master/docs/connectors/table/formats/json/)
+can be passed through table DDL. For example `'lookup-request.format.json.fail-on-missing-field' = 'true'`. In this case, format identifier is `json`.
+

 ### HTTP Sink
 The following example shows the minimum Table API example to create a [HttpDynamicSink](src/main/java/com/getindata/connectors/http/internal/table/HttpDynamicSink.java) that writes JSON values to an HTTP endpoint using POST method, assuming Flink has JAR of [JSON serializer](https://nightlies.apache.org/flink/flink-docs-release-1.15/docs/connectors/table/formats/json/) installed:

src/main/java/com/getindata/connectors/http/LookupArg.java

Lines changed: 3 additions & 3 deletions
@@ -4,19 +4,19 @@
 import lombok.RequiredArgsConstructor;

 /**
- * Transfer object that contains Http request argument and value.
+ * Transfer object that contains single lookup argument (column name) and its value.
  */
 @Data
 @RequiredArgsConstructor
 public class LookupArg {

     /**
-     * HTTP request argument's name.
+     * Lookup argument name.
      */
     private final String argName;

     /**
-     * HTTP request argument's value.
+     * Lookup argument value.
      */
     private final String argValue;
 }

src/main/java/com/getindata/connectors/http/LookupQueryCreator.java

Lines changed: 4 additions & 3 deletions
@@ -1,7 +1,8 @@
 package com.getindata.connectors.http;

 import java.io.Serializable;
-import java.util.List;
+
+import org.apache.flink.table.data.RowData;

 /**
  * An interface for a creator of a lookup query in the Http Lookup Source (e.g., the query that
@@ -16,8 +17,8 @@ public interface LookupQueryCreator extends Serializable {
      * Create a lookup query (like the query appended to path in GET request)
      * out of the provided arguments.
      *
-     * @param params the list of {@link LookupArg} containing request parameters.
+     * @param lookupDataRow a {@link RowData} containing request parameters.
      * @return a lookup query.
      */
-    String createLookupQuery(List<LookupArg> params);
+    String createLookupQuery(RowData lookupDataRow);
 }
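
The commit message says that, for body-based requests, the whole lookup row (including nested ROW columns) is turned into a JSON object and returned as the query string. The following is only a rough sketch of that idea, using plain `Map`/`List` values as a stand-in for Flink's `RowData`. The class `JsonBodySketch` and its methods are hypothetical and are not the connector's code, which by default delegates to Flink's json format.

```java
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

// Hypothetical stand-in: a nested ROW is modelled as a Map, an array as a List.
class JsonBodySketch {

    /** Recursively renders maps, lists, numbers, booleans and strings as JSON text. */
    static String toJson(Object value) {
        if (value == null) {
            return "null";
        }
        if (value instanceof Map) {
            return ((Map<?, ?>) value).entrySet().stream()
                .map(e -> quote(String.valueOf(e.getKey())) + ":" + toJson(e.getValue()))
                .collect(Collectors.joining(",", "{", "}"));
        }
        if (value instanceof List) {
            return ((List<?>) value).stream()
                .map(JsonBodySketch::toJson)
                .collect(Collectors.joining(",", "[", "]"));
        }
        if (value instanceof Number || value instanceof Boolean) {
            return value.toString();
        }
        return quote(value.toString());
    }

    /** Minimal escaping (backslash and double quote only); control characters are not handled. */
    private static String quote(String s) {
        return "\"" + s.replace("\\", "\\\\").replace("\"", "\\\"") + "\"";
    }

    public static void main(String[] args) {
        Map<String, Object> address = new LinkedHashMap<>();
        address.put("city", "Warsaw");
        address.put("zip", "00-001");

        Map<String, Object> row = new LinkedHashMap<>();
        row.put("id", 7);
        row.put("address", address);       // nested ROW column
        row.put("tags", List.of("a", "b")); // array column

        System.out.println(toJson(row));
    }
}
```

Passing the whole row, instead of a flat list of name/value pairs, is what lets a creator see nested structure at all. That is presumably the reason for the signature change shown in this file.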

src/main/java/com/getindata/connectors/http/LookupQueryCreatorFactory.java

Lines changed: 8 additions & 2 deletions
@@ -1,8 +1,12 @@
 package com.getindata.connectors.http;

+import java.io.Serializable;
+
+import org.apache.flink.configuration.ReadableConfig;
 import org.apache.flink.table.factories.Factory;

 import com.getindata.connectors.http.internal.table.lookup.HttpLookupTableSource;
+import com.getindata.connectors.http.internal.table.lookup.LookupRow;

 /**
  * The {@link Factory} that dynamically creates and injects {@link LookupQueryCreator} to
@@ -29,10 +33,12 @@
  * )
  * }</pre>
  */
-public interface LookupQueryCreatorFactory extends Factory {
+public interface LookupQueryCreatorFactory extends Factory, Serializable {

     /**
      * @return {@link LookupQueryCreator} custom lookup query creator instance
      */
-    LookupQueryCreator createLookupQueryCreator();
+    LookupQueryCreator createLookupQueryCreator(
+        ReadableConfig readableConfig,
+        LookupRow lookupRow);
 }

Lines changed: 3 additions & 4 deletions
@@ -1,9 +1,8 @@
 package com.getindata.connectors.http.internal;

-import java.util.List;
 import java.util.Optional;

-import com.getindata.connectors.http.LookupArg;
+import org.apache.flink.table.data.RowData;

 /**
  * A client that is used to get enrichment data from external component.
@@ -12,8 +11,8 @@ public interface PollingClient<T> {

     /**
      * Gets enrichment data from external component using provided lookup arguments.
-     * @param lookupArgs The list of {@link LookupArg} containing request parameters.
+     * @param lookupRow A {@link RowData} containing request parameters.
      * @return an optional result of data lookup.
      */
-    Optional<T> pull(List<LookupArg> lookupArgs);
+    Optional<T> pull(RowData lookupRow);
 }

src/main/java/com/getindata/connectors/http/internal/table/lookup/AsyncHttpTableLookupFunction.java

Lines changed: 2 additions & 3 deletions
@@ -14,7 +14,6 @@
 import org.apache.flink.table.functions.FunctionContext;
 import org.apache.flink.util.concurrent.ExecutorThreadFactory;

-import com.getindata.connectors.http.internal.table.lookup.HttpTableLookupFunction.ColumnData;
 import com.getindata.connectors.http.internal.utils.ThreadUtils;

 @Slf4j
@@ -81,8 +80,8 @@ public void eval(CompletableFuture<Collection<RowData>> resultFuture, Object...
             publishingThreadPool);
     }

-    public ColumnData getColumnData() {
-        return decorate.getColumnData();
+    public LookupRow getLookupRow() {
+        return decorate.getLookupRow();
     }

     public HttpLookupConfig getOptions() {

src/main/java/com/getindata/connectors/http/internal/table/lookup/BodyBasedRequestFactory.java

Lines changed: 9 additions & 0 deletions
@@ -7,6 +7,9 @@
 import java.net.http.HttpRequest.Builder;
 import java.time.Duration;

+import lombok.extern.slf4j.Slf4j;
+import org.slf4j.Logger;
+
 import com.getindata.connectors.http.LookupQueryCreator;
 import com.getindata.connectors.http.internal.HeaderPreprocessor;
 import com.getindata.connectors.http.internal.utils.uri.URIBuilder;
@@ -15,6 +18,7 @@
  * Implementation of {@link HttpRequestFactory} for REST calls that sends their parameters using
  * request body.
  */
+@Slf4j
 public class BodyBasedRequestFactory extends RequestFactoryBase {

     private final String methodName;
@@ -44,6 +48,11 @@ protected Builder setUpRequestMethod(String lookupQuery) {
             .timeout(Duration.ofMinutes(2));
     }

+    @Override
+    protected Logger getLogger() {
+        return log;
+    }
+
     private URI constructGetUri() {
         try {
             return new URIBuilder(baseUrl).build();
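
The `@Override` on `getLogger()` suggests that the base class (not shown in this excerpt) asks each concrete factory for its logger, so that shared logic logs under the subclass's name. The following is a minimal sketch of that hook pattern, assuming this reading is right. `java.lang.System.Logger` stands in for slf4j, and all class names (`RequestFactorySketch`, `GetFactorySketch`, `BodyFactorySketch`) are hypothetical.

```java
// Sketch only: java.lang.System.Logger replaces slf4j, and every class name is hypothetical.
class LoggerHookDemo {
    public static void main(String[] args) {
        System.out.println(new GetFactorySketch().buildRequest("id=1"));
        System.out.println(new BodyFactorySketch().buildRequest("{\"id\":1}"));
    }
}

abstract class RequestFactorySketch {

    /** Each subclass hands over its own logger. */
    protected abstract System.Logger getLogger();

    /** Subclass-specific part of request construction. */
    protected abstract String describe(String lookupQuery);

    /** Shared logic: logs through the subclass's logger, then delegates. */
    String buildRequest(String lookupQuery) {
        getLogger().log(System.Logger.Level.DEBUG, "Created lookup query: " + lookupQuery);
        return describe(lookupQuery);
    }
}

class GetFactorySketch extends RequestFactorySketch {
    private static final System.Logger LOG =
        System.getLogger(GetFactorySketch.class.getName());

    @Override
    protected System.Logger getLogger() {
        return LOG;
    }

    @Override
    protected String describe(String lookupQuery) {
        return "GET ?" + lookupQuery;
    }
}

class BodyFactorySketch extends RequestFactorySketch {
    private static final System.Logger LOG =
        System.getLogger(BodyFactorySketch.class.getName());

    @Override
    protected System.Logger getLogger() {
        return LOG;
    }

    @Override
    protected String describe(String lookupQuery) {
        return "POST body=" + lookupQuery;
    }
}
```

With this arrangement a debug line emitted from the shared method is attributed to the concrete factory, which helps when GET and body-based lookups run side by side.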

src/main/java/com/getindata/connectors/http/internal/table/lookup/GetRequestFactory.java

Lines changed: 9 additions & 0 deletions
@@ -6,13 +6,17 @@
 import java.net.http.HttpRequest.Builder;
 import java.time.Duration;

+import lombok.extern.slf4j.Slf4j;
+import org.slf4j.Logger;
+
 import com.getindata.connectors.http.LookupQueryCreator;
 import com.getindata.connectors.http.internal.HeaderPreprocessor;
 import com.getindata.connectors.http.internal.utils.uri.URIBuilder;

 /**
  * Implementation of {@link HttpRequestFactory} for GET REST calls.
  */
+@Slf4j
 public class GetRequestFactory extends RequestFactoryBase {

     public GetRequestFactory(
@@ -23,6 +27,11 @@ public GetRequestFactory(
         super(lookupQueryCreator, headerPreprocessor, options);
     }

+    @Override
+    protected Logger getLogger() {
+        return log;
+    }
+
     /**
      * Method for preparing {@link HttpRequest.Builder} for REST GET request, where lookupQuery
      * is used as query parameters for example:
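
The hunk above is cut off by the diff view. Separately from it, the GET flow described in the README (`?joinColumn1=value1&joinColumn2=value2&...`) can be illustrated with a small sketch. It is not the connector's `GenericGetQueryCreator`: the class `GetQuerySketch`, its method names, and the URL-encoding choice are assumptions made for illustration.

```java
import java.net.URLEncoder;
import java.nio.charset.StandardCharsets;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.stream.Collectors;

// Hypothetical helper, not part of the connector.
class GetQuerySketch {

    /** Joins column/value pairs into "name1=value1&name2=value2", encoding both sides. */
    static String buildQuery(Map<String, String> lookupArgs) {
        return lookupArgs.entrySet().stream()
            .map(e -> encode(e.getKey()) + "=" + encode(e.getValue()))
            .collect(Collectors.joining("&"));
    }

    /** Form-style encoding: a space becomes '+', reserved characters are percent-encoded. */
    private static String encode(String raw) {
        return URLEncoder.encode(raw, StandardCharsets.UTF_8);
    }

    public static void main(String[] args) {
        // LinkedHashMap keeps the join columns in insertion order.
        Map<String, String> lookupArgs = new LinkedHashMap<>();
        lookupArgs.put("id", "42");
        lookupArgs.put("customer name", "A&B");

        System.out.println("?" + buildQuery(lookupArgs)); // ?id=42&customer+name=A%26B
    }
}
```
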

Lines changed: 7 additions & 5 deletions
@@ -1,27 +1,29 @@
 package com.getindata.connectors.http.internal.table.lookup;

 import java.io.Serializable;
-import java.util.Collections;
-import java.util.List;
 import java.util.Properties;

 import lombok.Builder;
 import lombok.Data;
 import lombok.RequiredArgsConstructor;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.ReadableConfig;

 @Builder
 @Data
 @RequiredArgsConstructor
 public class HttpLookupConfig implements Serializable {

-    private final String url;
+    private final String lookupMethod;

-    @Builder.Default
-    private final List<String> arguments = Collections.emptyList();
+    private final String url;

     @Builder.Default
     private final boolean useAsync = false;

     @Builder.Default
     private final Properties properties = new Properties();
+
+    @Builder.Default
+    private final ReadableConfig readableConfig = new Configuration();
 }

src/main/java/com/getindata/connectors/http/internal/table/lookup/HttpLookupConnectorOptions.java

Lines changed: 5 additions & 0 deletions
@@ -35,4 +35,9 @@ public class HttpLookupConnectorOptions {
         ConfigOptions.key(SOURCE_LOOKUP_QUERY_CREATOR_IDENTIFIER)
             .stringType()
             .noDefaultValue();
+
+    public static final ConfigOption<String> LOOKUP_REQUEST_FORMAT =
+        ConfigOptions.key("lookup-request.format")
+            .stringType()
+            .defaultValue("json");
 }
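
The README part of this commit describes two things about `lookup-request.format`: it falls back to `json` when unset, and DDL options of the form `lookup-request.format.<formatName>.<property>` are handed to the format factory. The following sketch illustrates that naming scheme with plain maps. It does not use Flink's option-resolution code; `FormatOptionsSketch`, its methods, and the sample URL are hypothetical.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical illustration of the option naming scheme; not Flink's actual resolution logic.
class FormatOptionsSketch {

    static final String FORMAT_KEY = "lookup-request.format";
    static final String DEFAULT_FORMAT = "json";

    /** Returns the configured format identifier, or "json" when the option is absent. */
    static String formatName(Map<String, String> tableOptions) {
        return tableOptions.getOrDefault(FORMAT_KEY, DEFAULT_FORMAT);
    }

    /** Collects options under "lookup-request.format.<formatName>." with the prefix removed. */
    static Map<String, String> formatOptions(Map<String, String> tableOptions) {
        String prefix = FORMAT_KEY + "." + formatName(tableOptions) + ".";
        Map<String, String> result = new HashMap<>();
        tableOptions.forEach((key, value) -> {
            if (key.startsWith(prefix)) {
                result.put(key.substring(prefix.length()), value);
            }
        });
        return result;
    }

    public static void main(String[] args) {
        Map<String, String> ddlOptions = new HashMap<>();
        ddlOptions.put("url", "http://localhost:8080/lookup"); // made-up endpoint
        ddlOptions.put("lookup-request.format.json.fail-on-missing-field", "true");

        System.out.println(formatName(ddlOptions));    // json
        System.out.println(formatOptions(ddlOptions)); // {fail-on-missing-field=true}
    }
}
```

Keying the prefix on the format identifier is why, as the README notes, the `customFormatName` part of the option must match the identifier of the `SerializationFormatFactory`.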
