Skip to content

Commit cabdb63

Browse files
authored
ESP-148_UseFlink_JsonFormat_InSource - Use Flink Json Format instead custom implemented Json Format for Source Lookup. (#19)
Signed-off-by: Krzysztof Chmielewski <krzysztof.chmielewski@getindata.com> Signed-off-by: Krzysztof Chmielewski <krzysztof.chmielewski@getindata.com>
1 parent a7ddf6b commit cabdb63

32 files changed

+616
-527
lines changed

CHANGELOG.md

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -6,14 +6,17 @@
66
- Fix JavaDoc errors.
77

88
### Added
9-
- Add new properties `gid.connector.http.sink.error.code` and `gid.connector.http.sink.error.code.exclude`
9+
- Add to Http Sink a new properties `gid.connector.http.sink.error.code` and `gid.connector.http.sink.error.code.exclude`
1010
to set HTTP status code that should be interpreted as errors.
11+
- Use Flink's format support to Http Lookup Source.
1112

1213
### Changed
1314
- Change dependency scope for `org.apache.flink.flink-connector-base` from `compile` to `provided`.
15+
- Changed DDL of `rest-lookup` connector. Dropped `json-path` properties, and add mandatory `format` property.
1416

1517
### Removed
1618
- Remove dependency on `org.apache.httpcomponents.httpclient`from production code. Dependency is only for test scope.
19+
- Removed dependency on `com.jayway.jsonpath.json-path`
1720

1821
## [0.3.0] - 2022-07-21
1922

README.md

Lines changed: 49 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -10,7 +10,7 @@ Please use [releases](https://github.com/getindata/flink-http-connector/releases
1010

1111
The goal for HTTP TableLookup connector was to use it in Flink SQL statement as a standard table that can be later joined with other stream using pure SQL Flink.
1212

13-
Currently, HTTP TableLookup connector supports only Lookup Joins [1] and expects JSON as a response body. It also supports only the STRING types.
13+
Currently, HTTP source connector supports only Lookup Joins (TableLookup) [1] in Table/SQL API.
1414
`HttpSink` supports both Streaming API (when using [HttpSink](src/main/java/com/getindata/connectors/http/internal/sink/HttpSink.java) built using [HttpSinkBuilder](src/main/java/com/getindata/connectors/http/internal/sink/HttpSinkBuilder.java)) and the Table API (using connector created in [HttpDynamicTableSinkFactory](src/main/java/com/getindata/connectors/http/internal/table/HttpDynamicTableSinkFactory.java)).
1515

1616
## Prerequisites
@@ -23,7 +23,6 @@ This connector has few Flink's runtime dependencies, that are expected to be pro
2323
* `org.apache.flink.flink-java`
2424
* `org.apache.flink.flink-clients`
2525
* `org.apache.flink.flink-connector-base`
26-
* `org.apache.flink.flink-java`
2726

2827
## Installation
2928

@@ -38,23 +37,41 @@ You can read the official JavaDoc documentation of the latest release at [https:
3837
### HTTP TableLookup Source
3938
Flink SQL table definition:
4039

40+
Enrichment Lookup Table
4141
```roomsql
4242
CREATE TABLE Customers (
43-
id STRING,
44-
id2 STRING,
45-
msg STRING,
46-
uuid STRING,
47-
isActive STRING,
48-
balance STRING
43+
id STRING,
44+
id2 STRING,
45+
msg STRING,
46+
uuid STRING,
47+
details ROW<
48+
isActive BOOLEAN,
49+
nestedDetails ROW<
50+
balance STRING
51+
>
52+
>
4953
) WITH (
5054
'connector' = 'rest-lookup',
55+
'format' = 'json',
5156
'url' = 'http://localhost:8080/client',
52-
'asyncPolling' = 'true',
53-
'field.isActive.path' = '$.details.isActive',
54-
'field.balance.path' = '$.details.nestedDetails.balance'
57+
'asyncPolling' = 'true'
5558
)
5659
```
57-
Using _Customers_ table in Flink SQL Lookup Join:
60+
Data Source Table
61+
```roomsql
62+
CREATE TABLE Orders (id STRING, id2 STRING, proc_time AS PROCTIME()
63+
) WITH (
64+
'connector' = 'datagen',
65+
'rows-per-second' = '1',
66+
'fields.id.kind' = 'sequence',
67+
'fields.id.start' = '1',
68+
'fields.id.end' = '120',
69+
'fields.id2.kind' = 'sequence',
70+
'fields.id2.start' = '2',
71+
'fields.id2.end' = '120'
72+
);
73+
74+
Using _Customers_ table in Flink SQL Lookup Join with Orders table:
5875
5976
```roomsql
6077
SELECT o.id, o.id2, c.msg, c.uuid, c.isActive, c.balance FROM Orders AS o
@@ -153,13 +170,12 @@ An example of such a mask would be `3XX, 4XX, 5XX`. In this case, all 300s, 400s
153170

154171
## Table API Connector Options
155172
### HTTP TableLookup Source
156-
| Option | Required | Description/Value |
157-
|--------------|----------|-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------|
158-
| connector | required | The Value should be set to _rest-lookup_ |
159-
| url | required | The base URL that should be use for GET requests. For example _http://localhost:8080/client_ |
160-
| asyncPolling | optional | true/false - determines whether Async Pooling should be used. Mechanism is based on Flink's Async I/O. |
161-
| root | optional | Sets the json root node for entire table. The value should be presented as Json Path [5], for example `$.details`. |
162-
| field.#.path | optional | The Json Path from response model that should be use for given `#` field. If `root` option was defined it will be added to field path. The value must be presented in Json Path format [5], for example `$.details.nestedDetails.balance` |
173+
| Option | Required | Description/Value |
174+
|--------------|----------|----------------------------------------------------------------------------------------------------------|
175+
| connector | required | The Value should be set to _rest-lookup_ |
176+
| format | required | Flink's format name that should be used to decode REST response, Use `json` for a typical REST endpoint. |
177+
| url | required | The base URL that should be use for GET requests. For example _http://localhost:8080/client_ |
178+
| asyncPolling | optional | true/false - determines whether Async Pooling should be used. Mechanism is based on Flink's Async I/O. |
163179

164180
### HTTP Sink
165181
| Option | Required | Description/Value |
@@ -207,13 +223,22 @@ CREATE TABLE Orders (id STRING, id2 STRING, proc_time AS PROCTIME()
207223

208224
Create Http Connector Lookup Table:
209225
```roomsql
210-
CREATE TABLE Customers (id STRING, id2 STRING, msg STRING, uuid STRING, isActive STRING, balance STRING
226+
CREATE TABLE Customers (
227+
id STRING,
228+
id2 STRING,
229+
msg STRING,
230+
uuid STRING,
231+
details ROW<
232+
isActive BOOLEAN,
233+
nestedDetails ROW<
234+
balance STRING
235+
>
236+
>
211237
) WITH (
212-
'connector' = 'rest-lookup',
238+
'connector' = 'rest-lookup',
239+
'format' = 'json'
213240
'url' = 'http://localhost:8080/client',
214-
'asyncPolling' = 'true',
215-
'field.isActive.path' = '$.details.isActive',
216-
'field.balance.path' = '$.details.nestedDetails.balance'
241+
'asyncPolling' = 'true'
217242
);
218243
```
219244

pom.xml

Lines changed: 0 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -77,7 +77,6 @@ under the License.
7777
<log4j.version>2.17.2</log4j.version>
7878
<lombok.version>1.18.22</lombok.version>
7979
<junit.jupiter.version>5.8.1</junit.jupiter.version>
80-
<json.path.version>2.6.0</json.path.version>
8180
<assertj.core.version>3.21.0</assertj.core.version>
8281
<mockito.version>4.0.0</mockito.version>
8382
<wiremock.version>2.27.2</wiremock.version>
@@ -151,13 +150,6 @@ under the License.
151150
<scope>provided</scope>
152151
</dependency>
153152

154-
<!--===== Needed Dep ====-->
155-
<dependency>
156-
<groupId>com.jayway.jsonpath</groupId>
157-
<artifactId>json-path</artifactId>
158-
<version>${json.path.version}</version>
159-
</dependency>
160-
161153
<dependency>
162154
<groupId>org.projectlombok</groupId>
163155
<artifactId>lombok</artifactId>

src/main/java/com/getindata/connectors/http/internal/HttpResultConverter.java

Lines changed: 0 additions & 10 deletions
This file was deleted.

src/main/java/com/getindata/connectors/http/internal/JsonResultTableConverter.java

Lines changed: 0 additions & 92 deletions
This file was deleted.
Lines changed: 10 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,10 +1,19 @@
11
package com.getindata.connectors.http.internal;
22

33
import java.util.List;
4+
import java.util.Optional;
45

56
import com.getindata.connectors.http.internal.table.lookup.LookupArg;
67

8+
/**
9+
* A client that is used to get enrichment data from external component.
10+
*/
711
public interface PollingClient<T> {
812

9-
T pull(List<LookupArg> params);
13+
/**
14+
* Gets enrichment data from external component using provided lookup arguments.
15+
* @param lookupArgs The list of {@link LookupArg} containing request parameters.
16+
* @return an optional result of data lookup.
17+
*/
18+
Optional<T> pull(List<LookupArg> lookupArgs);
1019
}
Lines changed: 8 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -1,14 +1,15 @@
11
package com.getindata.connectors.http.internal;
22

3-
import org.apache.flink.api.connector.source.SourceReaderContext;
4-
import org.apache.flink.table.functions.FunctionContext;
3+
import java.io.Serializable;
54

6-
import com.getindata.connectors.http.internal.table.lookup.HttpLookupConfig;
5+
import org.apache.flink.api.common.serialization.DeserializationSchema;
76

8-
public interface PollingClientFactory<OUT> {
7+
import com.getindata.connectors.http.internal.table.lookup.HttpLookupConfig;
98

10-
PollingClient<OUT> createPollClient(FunctionContext context,
11-
HttpLookupConfig options);
9+
public interface PollingClientFactory<OUT> extends Serializable {
1210

13-
PollingClient<OUT> createPollClient(SourceReaderContext readerContext);
11+
PollingClient<OUT> createPollClient(
12+
HttpLookupConfig options,
13+
DeserializationSchema<OUT> schemaDecoder
14+
);
1415
}

src/main/java/com/getindata/connectors/http/internal/table/lookup/AbstractTablePollingClientFactory.java

Lines changed: 0 additions & 31 deletions
This file was deleted.

src/main/java/com/getindata/connectors/http/internal/table/lookup/AsyncHttpTableLookupFunction.java

Lines changed: 15 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@
22

33
import java.util.Collection;
44
import java.util.Collections;
5+
import java.util.Optional;
56
import java.util.concurrent.CompletableFuture;
67
import java.util.concurrent.ExecutorService;
78
import java.util.concurrent.Executors;
@@ -24,10 +25,20 @@ public class AsyncHttpTableLookupFunction extends AsyncTableFunction<RowData> {
2425

2526
private static final int PUBLISHING_THREAD_POOL_SIZE = 4;
2627

28+
/**
29+
* The {@link org.apache.flink.table.functions.TableFunction} we want to decorate with
30+
* async framework.
31+
*/
2732
private final HttpTableLookupFunction decorate;
2833

34+
/**
35+
* Thread pool for polling data from Http endpoint.
36+
*/
2937
private transient ExecutorService pollingThreadPool;
3038

39+
/**
40+
* Thread pool for publishing data to Flink.
41+
*/
3142
private transient ExecutorService publishingThreadPool;
3243

3344
@Override
@@ -50,20 +61,21 @@ public void open(FunctionContext context) throws Exception {
5061

5162
public void eval(CompletableFuture<Collection<RowData>> resultFuture, Object... keys) {
5263

53-
CompletableFuture<RowData> future = new CompletableFuture<>();
64+
CompletableFuture<Optional<RowData>> future = new CompletableFuture<>();
5465
future.completeAsync(() -> decorate.lookupByKeys(keys), pollingThreadPool);
5566

5667
// We don't want to use ForkJoinPool at all. We are using a different thread pool
5768
// for publishing here intentionally to avoid thread starvation.
5869
future.whenCompleteAsync(
59-
(result, throwable) -> {
70+
(optionalResult, throwable) -> {
6071
if (throwable != null) {
6172
log.error("Exception while processing Http Async request", throwable);
6273
resultFuture.completeExceptionally(
6374
new RuntimeException("Exception while processing Http Async request",
6475
throwable));
6576
} else {
66-
resultFuture.complete(Collections.singleton(result));
77+
optionalResult
78+
.ifPresent(result -> resultFuture.complete(Collections.singleton(result)));
6779
}
6880
},
6981
publishingThreadPool);

0 commit comments

Comments
 (0)