Skip to content

Commit ee9b86e

Browse files
authored
Http Sink Table API (#5)
* Add support for Table API in HTTP Sink * Update CHANGELOG.md * Update README.md
1 parent a987c60 commit ee9b86e

File tree

10 files changed

+731
-29
lines changed

10 files changed

+731
-29
lines changed

CHANGELOG.md

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -3,10 +3,11 @@
33
## [Unreleased]
44

55
- Implement [HttpSink](src/main/java/com/getindata/connectors/http/sink/HttpSink.java) deriving from [AsyncSinkBase](https://cwiki.apache.org/confluence/display/FLINK/FLIP-171%3A+Async+Sink) introduced in Flink 1.15.
6+
- Add support for Table API in HttpSink in the form of [HttpDynamicSink](src/main/java/com/getindata/connectors/http/table/HttpDynamicSink.java).
67

78
## [0.1.0] - 2022-05-26
89

9-
- Implement baisc support for Http connector for Flink SQL
10+
- Implement basic support for Http connector for Flink SQL
1011

1112
[Unreleased]: https://github.com/getindata/flink-http-connector/compare/0.1.0...HEAD
1213

README.md

Lines changed: 76 additions & 26 deletions
Original file line numberDiff line numberDiff line change
@@ -1,22 +1,26 @@
11
# flink-http-connector
2-
The HTTP TableLookup connector that allows for pulling data from external system via HTTP GET method.
3-
The goal for this connector was to use it in Flink SQL statement as a standard table that can be later joined with other stream using pure SQL Flink.
2+
The HTTP TableLookup connector that allows for pulling data from external system via HTTP GET method and HTTP Sink that allows for sending data to external system via HTTP requests.
3+
4+
#### HTTP TableLookup Source
5+
The goal for HTTP TableLookup connector was to use it in Flink SQL statement as a standard table that can be later joined with other stream using pure SQL Flink.
46

5-
Currently, connector supports only Lookup Joins [1] and expects JSON as a response body.
7+
Currently, HTTP TableLookup connector supports only Lookup Joins [1] and expects JSON as a response body. It also supports only the STRING types.
68

7-
Connector supports only STRING types.
9+
#### HTTP Sink
10+
`HttpSink` supports both Streaming API (when using [HttpSink](src/main/java/com/getindata/connectors/http/sink/HttpSink.java) built using [HttpSinkBuilder](src/main/java/com/getindata/connectors/http/sink/HttpSinkBuilder.java)) and the Table API (using connector created in [HttpDynamicTableSinkFactory](src/main/java/com/getindata/connectors/http/table/HttpDynamicTableSinkFactory.java)).
811

912
## Prerequisites
1013
* Java 11
1114
* Maven 3
1215
* Flink 1.15+
1316

14-
## Implementation
15-
Implementation is based on Flink's `TableFunction` and `AsyncTableFunction` classes.
16-
To be more specific we are using a `LookupTableSource`. Unfortunately Flink's new unified source interface [2] cannot be used for this type of source.
17-
Issue was discussed on Flink's user mailing list - https://lists.apache.org/thread/tx2w1m15zt5qnvt924mmbvr7s8rlyjmw
17+
## Installation
18+
19+
In order to use the `flink-http-connector` the following dependencies are required for both projects using a build automation tool (such as Maven or SBT) and SQL Client with SQL JAR bundles. For build automation tool reference, look into Maven Central: [https://mvnrepository.com/artifact/com.getindata/flink-http-connector](https://mvnrepository.com/artifact/com.getindata/flink-http-connector).
1820

1921
## Usage
22+
23+
### HTTP TableLookup Source
2024
Flink SQL table definition:
2125

2226
```roomsql
@@ -48,28 +52,70 @@ For Example:
4852
http://localhost:8080/client/service?id=1&uuid=2
4953
``
5054

55+
### HTTP Sink
56+
The following example shows the minimum Table API example to create a [HttpDynamicSink](src/main/java/com/getindata/connectors/http/table/HttpDynamicSink.java) that writes JSON values to an HTTP endpoint using POST method, assuming Flink has JAR of [JSON serializer](https://nightlies.apache.org/flink/flink-docs-release-1.15/docs/connectors/table/formats/json/) installed:
57+
58+
```roomsql
59+
CREATE TABLE http (
60+
id bigint,
61+
some_field string
62+
) WITH (
63+
'connector' = 'http-sink'
64+
'url' = 'http://example.com/myendpoint'
65+
'format' = 'json'
66+
)
67+
```
68+
69+
Then use `INSERT` SQL statement to send data to your HTTP endpoint:
70+
71+
```roomsql
72+
INSERT INTO http VALUES (1, 'Ninette'), (2, 'Hedy')
73+
```
74+
75+
Due to the fact that `HttpSink` sends bytes inside HTTP request's body, one can easily swap `'format' = 'json'` for some other [format](https://nightlies.apache.org/flink/flink-docs-release-1.15/docs/connectors/table/formats/overview/).
76+
77+
Other examples of usage of the Table API can be found in [some tests](src/test/java/com/getindata/connectors/http/table/HttpDynamicSinkInsertTest.java).
78+
79+
## Implementation
80+
Implementation of an HTTP source connector is based on Flink's `TableFunction` and `AsyncTableFunction` classes.
81+
To be more specific we are using a `LookupTableSource`. Unfortunately Flink's new unified source interface [2] cannot be used for this type of source.
82+
Issue was discussed on Flink's user mailing list - https://lists.apache.org/thread/tx2w1m15zt5qnvt924mmbvr7s8rlyjmw
83+
84+
Implementation of an HTTP Sink is based on Flink's `AsyncSinkBase` introduced in Flink 1.15 [3, 4].
85+
5186
## Http Response to Table schema mapping
52-
The mapping from Http Json Response to SQL table schema is done via Json Paths [3].
87+
The mapping from Http Json Response to SQL table schema is done via Json Paths [5].
5388
This is achieved thanks to `com.jayway.jsonpath:json-path` library.
5489

5590
If no `root` or `field.#.path` option is defined, the connector will use the column name as json path and will try to look for Json Node with that name in received Json. If no node with a given name is found, the connector will return `null` as value for this field.
5691

5792
If the `field.#.path` option is defined, connector will use given Json path from option's value in order to find Json data that should be used for this column.
5893
For example `'field.isActive.path' = '$.details.isActive'` - the value for table column `isActive` will be taken from `$.details.isActive` node from received Json.
5994

60-
## Connector Options
61-
| Option | Required | Description/Value|
62-
| -------------- | ----------- | -------------- |
63-
| connector | required | The Value should be set to _rest-lookup_|
64-
| url | required | The base URL that should be use for GET requests. For example _http://localhost:8080/client_|
65-
| asyncPolling | optional | true/false - determines whether Async Pooling should be used. Mechanism is based on Flink's Async I/O.|
66-
| root | optional | Sets the json root node for entire table. The value should be presented as Json Path [3], for example `$.details`.|
67-
| field.#.path | optional | The Json Path from response model that should be use for given `#` field. If `root` option was defined it will be added to field path. The value must be presented in Json Path format [3], for example `$.details.nestedDetails.balance` |
95+
## Table API Connector Options
96+
### HTTP TableLookup Source
97+
| Option | Required | Description/Value |
98+
|--------------|----------|-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------|
99+
| connector | required | The Value should be set to _rest-lookup_ |
100+
| url | required | The base URL that should be use for GET requests. For example _http://localhost:8080/client_ |
101+
| asyncPolling | optional | true/false - determines whether Async Pooling should be used. Mechanism is based on Flink's Async I/O. |
102+
| root | optional | Sets the json root node for entire table. The value should be presented as Json Path [5], for example `$.details`. |
103+
| field.#.path | optional | The Json Path from response model that should be use for given `#` field. If `root` option was defined it will be added to field path. The value must be presented in Json Path format [5], for example `$.details.nestedDetails.balance` |
104+
105+
### HTTP Sink
106+
| Option | Required | Description/Value |
107+
|----------------------------|----------|----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------|
108+
| connector | required | Specify what connector to use. For HTTP Sink it should be set to _'http-sink'_. |
109+
| url | required | The base URL that should be use for HTTP requests. For example _http://localhost:8080/client_. |
110+
| format | required | Specify what format to use. |
111+
| insert-method | optional | Specify which HTTP method to use in the request. The value should be set either to `POST` or `PUT`. |
112+
| sink.batch.max-size | optional | Maximum number of elements that may be passed in a batch to be written downstream. |
113+
| sink.requests.max-inflight | optional | The maximum number of in flight requests that may exist, if any more in flight requests need to be initiated once the maximum has been reached, then it will be blocked until some have completed. |
114+
| sink.requests.max-buffered | optional | Maximum number of buffered records before applying backpressure. |
115+
| sink.flush-buffer.size | optional | The maximum size of a batch of entries that may be sent to the HTTP endpoint measured in bytes. |
116+
| sink.flush-buffer.timeout | optional | Threshold time in milliseconds for an element to be in a buffer before being flushed. |
68117

69118
## Build and deployment
70-
Currently, we are not publishing this artifact to any repository. The CI/CD configuration is also next thing to do.
71-
To
72-
73119
To build the project locally you need to have `maven 3` and Java 11+. </br>
74120

75121
Project build command: `mvn package`. </br>
@@ -83,7 +129,7 @@ It will start HTTP server listening on `http://localhost:8080/client`
83129
Steps to follow:
84130
- Run Mock HTTP server from `HttpStubApp::main` method.
85131
- Start your Flink cluster, for example as described under https://nightlies.apache.org/flink/flink-docs-release-1.15/docs/try-flink/local_installation/
86-
- Start Flink SQL Client [4] by calling: `./bin/sql-client.sh -j flink-http-connector-1.0-SNAPSHOT.jar`
132+
- Start Flink SQL Client [6] by calling: `./bin/sql-client.sh -j flink-http-connector-1.0-SNAPSHOT.jar`
87133
- Execute SQL statements:
88134
Create Data Stream source Table:
89135
```roomsql
@@ -123,11 +169,10 @@ As a result, you should see a table with joined records like so:
123169
The `msg` column shows parameters used with REST call for given JOIN record.
124170

125171
## TODO
126-
- Setup CI/CD and release first version.
127172
- Implement caches.
128173
- Add support for other Flink types. Currently, STRING type is only fully supported.
129174
- Think about Retry Policy for Http Request
130-
- Use Flink Format [5] to parse Json response
175+
- Use Flink Format [7] to parse Json response
131176
- Add Configurable Timeout value
132177
- Check other `//TODO`'s.
133178

@@ -136,8 +181,13 @@ The `msg` column shows parameters used with REST call for given JOIN record.
136181
</br>
137182
[2] https://nightlies.apache.org/flink/flink-docs-release-1.15/docs/dev/datastream/sources/
138183
</br>
139-
[3] https://support.smartbear.com/alertsite/docs/monitors/api/endpoint/jsonpath.html
184+
[3] https://cwiki.apache.org/confluence/display/FLINK/FLIP-171%3A+Async+Sink
185+
</br>
186+
[4] https://nightlies.apache.org/flink/flink-docs-release-1.15/api/java/org/apache/flink/connector/base/sink/AsyncSinkBase.html
187+
</br>
188+
[5] https://support.smartbear.com/alertsite/docs/monitors/api/endpoint/jsonpath.html
189+
</br>
190+
[6] https://nightlies.apache.org/flink/flink-docs-master/docs/dev/table/sqlclient/
140191
</br>
141-
[4] https://nightlies.apache.org/flink/flink-docs-master/docs/dev/table/sqlclient/
192+
[7] https://nightlies.apache.org/flink/flink-docs-master/docs/connectors/table/formats/json/
142193
</br>
143-
[5] https://nightlies.apache.org/flink/flink-docs-master/docs/connectors/table/formats/json/

pom.xml

Lines changed: 10 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -27,7 +27,7 @@ under the License.
2727
<packaging>jar</packaging>
2828

2929
<name>flink-http-connector</name>
30-
<description>The HTTP TableLookup connector that allows for pulling data from external system via HTTP GET method. The goal for this connector was to use it in Flink SQL statement as a standard table that can be later joined with other stream using pure SQL Flink.</description>
30+
<description>The HTTP TableLookup connector that allows for pulling data from external system via HTTP GET method and HTTP Sink that allows for sending data to external system via HTTP requests. The goal for HTTP TableLookup connector was to use it in Flink SQL statement as a standard table that can be later joined with other stream using pure SQL Flink.</description>
3131
<url>https://github.com/getindata/flink-http-connector</url>
3232

3333
<licenses>
@@ -214,6 +214,15 @@ under the License.
214214
<scope>test</scope>
215215
</dependency>
216216

217+
<!-- Serializers for tests -->
218+
<dependency>
219+
<groupId>org.apache.flink</groupId>
220+
<artifactId>flink-json</artifactId>
221+
<version>${flink.version}</version>
222+
<scope>test</scope>
223+
</dependency>
224+
<!-- ***** -->
225+
217226
<dependency>
218227
<groupId>org.apache.flink</groupId>
219228
<artifactId>flink-runtime-web</artifactId>

0 commit comments

Comments
 (0)