Skip to content

Commit 7001b9d

Browse files
kristoffSCKrzysztof Chmielewski
authored andcommitted
Add Run Example to README.md
Signed-off-by: Krzysztof Chmielewski <krzysztof.chmielewski@getindata.com>
1 parent 02ffadf commit 7001b9d

File tree

2 files changed

+64
-7
lines changed

2 files changed

+64
-7
lines changed

README.md

Lines changed: 64 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -11,11 +11,10 @@ Connector supports only STRING types.
1111
* Maven 3
1212
* Flink 14+
1313

14-
## Structure and further work
15-
The main code can be found under `com.getindata.connectors.http.table` package plus additional classes directly from `com.getindata.connectors.http`
16-
17-
The `com.getindata.connectors.http.stream` package is pure PoC and currently not meant to be use. The purpose of this package was to test out the new Unified Source design for Flink Source Connector
18-
[2]. Currently, the implementation is purely PoC, and requires further development.
14+
## Implementation
15+
Implementation is based on Flink's `TableFunction` and `AsyncTableFunction` classes.
16+
To be more specific we are using a `LookupTableSource`. Unfortunately Flink's new unified source interface [2] cannot be used for this type of source.
17+
Issue was discussed on Flink's user mailing list - https://lists.apache.org/thread/tx2w1m15zt5qnvt924mmbvr7s8rlyjmw
1918

2019
## Usage
2120
Flink SQL table definition:
@@ -67,11 +66,67 @@ For example `'field.isActive.path' = '$.details.isActive'` - the value for table
6766
| root | optional | Sets the json root node for entire table. The value should be presented as Json Path [3], for example `$.details`.|
6867
| field.#.path | optional | The Json Path from response model that should be use for given `#` field. If `root` option was defined it will be added to field path. The value must be presented in Json Path format [3], for example `$.details.nestedDetails.balance` |
6968

69+
## Build and deployment
70+
Currently, we are not publishing this artifact to any repository. The CI/CD configuration is also next thing to do.
71+
To
72+
73+
To build the project locally you need to have `maven 3` and Java 11+. </br>
74+
75+
Project build command: `mvn package`. </br>
76+
Detailed test report can be found under `target/site/jacoco/index.xml`.
77+
78+
## Demo application
79+
You can test this connector using simple mock http server provided with this repository and Flink SQL-client.
80+
The mock server can be started from IDE (currently only this way) by running `HttpStubApp::main` method.
81+
It will start HTTP server listening on `http://localhost:8080/client`
82+
83+
Steps to follow:
84+
- Run Mock HTTP server from `HttpStubApp::main` method.
85+
- Start your Flink cluster, for example as described under https://nightlies.apache.org/flink/flink-docs-release-1.14/docs/try-flink/local_installation/
86+
- Start Flink SQL Client [4] by calling: `./bin/sql-client.sh -j flink-http-connector-1.0-SNAPSHOT.jar`
87+
- Execute SQL statements:
88+
Create Data Stream source Table:
89+
```roomsql
90+
CREATE TABLE Orders (id STRING, id2 STRING, proc_time AS PROCTIME()
91+
) WITH (
92+
'connector' = 'datagen',
93+
'rows-per-second' = '1',
94+
'fields.id.kind' = 'sequence',
95+
'fields.id.start' = '1',
96+
'fields.id.end' = '120',
97+
'fields.id2.kind' = 'sequence',
98+
'fields.id2.start' = '2',
99+
'fields.id2.end' = '120'
100+
);
101+
```
102+
103+
Create Http Connector Lookup Table:
104+
```roomsql
105+
CREATE TABLE Customers (id STRING, id2 STRING, msg STRING, uuid STRING, isActive STRING, balance STRING
106+
) WITH (
107+
'connector' = 'rest-lookup',
108+
'url' = 'http://localhost:8080/client',
109+
'asyncPolling' = 'true',
110+
'field.isActive.path' = '$.details.isActive',
111+
'field.balance.path' = '$.details.nestedDetails.balance'
112+
);
113+
```
114+
115+
Submit SQL Select query to join both tables:
116+
```roomsql
117+
SELECT o.id, o.id2, c.msg, c.uuid, c.isActive, c.balance FROM Orders AS o JOIN Customers FOR SYSTEM_TIME AS OF o.proc_time AS c ON o.id = c.id AND o.id2 = c.id2;
118+
```
119+
120+
As a result, you should see a table with joined records like so:
121+
![join-result](docs/JoinTable.PNG)
122+
123+
The `msg` column shows parameters used with REST call for given JOIN record.
124+
70125
## TODO
71126
- Implement caches.
72127
- Add support for other Flink types. Currently, STRING type is only fully supported.
73128
- Think about Retry Policy for Http Request
74-
- Use Flink Format [4] to parse Json response
129+
- Use Flink Format [5] to parse Json response
75130
- Add Configurable Timeout value
76131
- Check other `//TODO`'s.
77132

@@ -82,4 +137,6 @@ For example `'field.isActive.path' = '$.details.isActive'` - the value for table
82137
</br>
83138
[3] https://support.smartbear.com/alertsite/docs/monitors/api/endpoint/jsonpath.html
84139
</br>
85-
[4] https://nightlies.apache.org/flink/flink-docs-master/docs/connectors/table/formats/json/
140+
[4] https://nightlies.apache.org/flink/flink-docs-master/docs/dev/table/sqlclient/
141+
</br>
142+
[5] https://nightlies.apache.org/flink/flink-docs-master/docs/connectors/table/formats/json/

docs/JoinTable.PNG

62.8 KB
Loading

0 commit comments

Comments
 (0)