|
| 1 | +# flink-http-connector |
| 2 | +The HTTP TableLookup connector that allows for pulling data from external system via HTTP GET method. |
| 3 | +The goal for this connector was to use it in Flink SQL statement as a standard table that can be later joined with other stream using pure SQL Flink. |
| 4 | + |
| 5 | +Currently, connector supports only Lookup Joins [1] and expects JSON as a response body. |
| 6 | + |
| 7 | +Connector supports only STRING types. |
| 8 | + |
| 9 | +## Prerequisites |
| 10 | +* Java 11 |
| 11 | +* Maven 3 |
| 12 | +* Flink 14+ |
| 13 | + |
| 14 | +## Structure and further work |
| 15 | +The main code can be found under `com.getindata.connectors.http.table` package plus additional classes directly from `com.getindata.connectors.http` |
| 16 | + |
| 17 | +The `com.getindata.connectors.http.stream` package is pure PoC and currently not meant to be use. The purpose of this package was to test out the new Unified Source design for Flink Source Connector |
| 18 | +[2]. Currently, the implementation is purely PoC, and requires further development. |
| 19 | + |
| 20 | +## Usage |
| 21 | +Flink SQL table definition: |
| 22 | + |
| 23 | +```roomsql |
| 24 | +CREATE TABLE Customers ( |
| 25 | +id STRING, |
| 26 | +id2 STRING, |
| 27 | +msg STRING, |
| 28 | +uuid STRING, |
| 29 | +isActive STRING, |
| 30 | +balance STRING |
| 31 | +) WITH ( |
| 32 | +'connector' = 'rest-lookup', |
| 33 | +'url' = 'http://localhost:8080/client', |
| 34 | +'asyncPolling' = 'true', |
| 35 | +'field.isActive.path' = '$.details.isActive', |
| 36 | +'field.balance.path' = '$.details.nestedDetails.balance' |
| 37 | +) |
| 38 | +``` |
| 39 | +Using _Customers_ table in Flink SQL Lookup Join: |
| 40 | + |
| 41 | +```roomsql |
| 42 | +SELECT o.id, o.id2, c.msg, c.uuid, c.isActive, c.balance FROM Orders AS o |
| 43 | +JOIN Customers FOR SYSTEM_TIME AS OF o.proc_time AS c ON o.id = c.id AND o.id2 = c.id2 |
| 44 | +``` |
| 45 | + |
| 46 | +The columns and their values used for JOIN `ON` condition will be used as HTTP get parameters where the column name will be used as a request parameter name. |
| 47 | +For Example: |
| 48 | +`` |
| 49 | +http://localhost:8080/client/service?id=1&uuid=2 |
| 50 | +`` |
| 51 | + |
| 52 | +## Http Response to Table schema mapping |
| 53 | +The mapping from Http Json Response to SQL table schema is done via Json Paths [3]. |
| 54 | +This is achieved thanks to `com.jayway.jsonpath:json-path` library. |
| 55 | + |
| 56 | +If no `root` or `field.#.path` option is defined, the connector will use the column name as json path and will try to look for Json Node with that name in received Json. If no node with a given name is found, the connector will return `null` as value for this field. |
| 57 | + |
| 58 | +If the `field.#.path` option is defined, connector will use given Json path from option's value in order to find Json data that should be used for this column. |
| 59 | +For example `'field.isActive.path' = '$.details.isActive'` - the value for table column `isActive` will be taken from `$.details.isActive` node from received Json. |
| 60 | + |
| 61 | +## Connector Options |
| 62 | +| Option | Required | Description/Value| |
| 63 | +| -------------- | ----------- | -------------- | |
| 64 | +| connector | required | The Value should be set to _rest-lookup_| |
| 65 | +| url | required | The base URL that should be use for GET requests. For example _http://localhost:8080/client_| |
| 66 | +| asyncPolling | optional | true/false - determines whether Async Pooling should be used. Mechanism is based on Flink's Async I/O.| |
| 67 | +| root | optional | Sets the json root node for entire table. The value should be presented as Json Path [3], for example `$.details`.| |
| 68 | +| field.#.path | optional | The Json Path from response model that should be use for given `#` field. If `root` option was defined it will be added to field path. The value must be presented in Json Path format [3], for example `$.details.nestedDetails.balance` | |
| 69 | + |
| 70 | +## TODO |
| 71 | +- Implement caches. |
| 72 | +- Add support for other Flink types. Currently, STRING type is only fully supported. |
| 73 | +- Think about Retry Policy for Http Request |
| 74 | +- Use Flink Format [4] to parse Json response |
| 75 | +- Add Configurable Timeout value |
| 76 | +- Check other `//TODO`'s. |
| 77 | + |
| 78 | +### |
| 79 | +[1] https://nightlies.apache.org/flink/flink-docs-release-1.14/docs/dev/table/sql/queries/joins/#lookup-join |
| 80 | +</br> |
| 81 | +[2] https://nightlies.apache.org/flink/flink-docs-release-1.14/docs/dev/datastream/sources/ |
| 82 | +</br> |
| 83 | +[3] https://support.smartbear.com/alertsite/docs/monitors/api/endpoint/jsonpath.html |
| 84 | +</br> |
| 85 | +[4] https://nightlies.apache.org/flink/flink-docs-master/docs/connectors/table/formats/json/ |
0 commit comments