Skip to content

Commit b5f863e

Browse files
authored
Merge pull request #29 from getindata/ESP-263_LookupSource_newRestMethods
ESP-263 - Adding suport for POST REST method to http Lookup Source
2 parents 2f39a45 + 279b460 commit b5f863e

27 files changed

+586
-124
lines changed

CHANGELOG.md

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,11 @@
22

33
## [Unreleased]
44

5+
### Added
6+
- Add support for other REST methods like PUT and POST to lookup source connector. The request method can be set using
7+
new optional lookup-source property `lookup-method`. If property is not specified in table DDL, GET method will be used for
8+
lookup queries.
9+
510
## [0.5.0] - 2022-09-22
611

712
### Added

README.md

Lines changed: 12 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -83,11 +83,21 @@ SELECT o.id, o.id2, c.msg, c.uuid, c.isActive, c.balance FROM Orders AS o
8383
JOIN Customers FOR SYSTEM_TIME AS OF o.proc_time AS c ON o.id = c.id AND o.id2 = c.id2
8484
```
8585

86-
The columns and their values used for JOIN `ON` condition will be used as HTTP get parameters where the column name will be used as a request parameter name.
86+
The columns and their values used for JOIN `ON` condition will be used as HTTP GET parameters where the column name will be used as a request parameter name.
87+
8788
For Example:
8889
``
8990
http://localhost:8080/client/service?id=1&uuid=2
9091
``
92+
93+
Or for REST POST method they will be converted to Json and used as request body. In this case, json request body will look like this:
94+
```json
95+
{
96+
"id": "1",
97+
"uuid": "2"
98+
}
99+
```
100+
91101
#### Http headers
92102
It is possible to set HTTP headers that will be added to HTTP request send by lookup source connector.
93103
Headers are defined via property key `gid.connector.http.source.lookup.header.HEADER_NAME = header value` for example:
@@ -256,6 +266,7 @@ If the used value starts from prefix `Basic `, it will be used as header value a
256266
| format | required | Flink's format name that should be used to decode REST response, Use `json` for a typical REST endpoint. |
257267
| url | required | The base URL that should be use for GET requests. For example _http://localhost:8080/client_ |
258268
| asyncPolling | optional | true/false - determines whether Async Pooling should be used. Mechanism is based on Flink's Async I/O. |
269+
| lookup-method | optional | GET/POST/PUT (and any other) - determines what REST method should be used for lookup REST query. If not specified, `GET` method will be used. |
259270
| gid.connector.http.lookup.error.code | optional | List of HTTP status codes that should be treated as errors by HTTP Source, separated with comma. |
260271
| gid.connector.http.lookup.error.code.exclude | optional | List of HTTP status codes that should be excluded from the `gid.connector.http.lookup.error.code` list, separated with comma. |
261272
| gid.connector.http.security.cert.server | optional | Path to trusted HTTP server certificate that should be add to connectors key store. More than one path can be specified using `,` as path delimiter. |

src/main/java/com/getindata/connectors/http/LookupQueryCreator.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,7 @@
1111
* {@link LookupQueryCreatorFactory}.
1212
*/
1313
public interface LookupQueryCreator extends Serializable {
14+
1415
/**
1516
* Create a lookup query (like the query appended to path in GET request)
1617
* out of the provided arguments.

src/main/java/com/getindata/connectors/http/LookupQueryCreatorFactory.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,7 @@
3030
* }</pre>
3131
*/
3232
public interface LookupQueryCreatorFactory extends Factory {
33+
3334
/**
3435
* @return {@link LookupQueryCreator} custom lookup query creator instance
3536
*/

src/main/java/com/getindata/connectors/http/internal/PollingClientFactory.java

Lines changed: 1 addition & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -4,14 +4,12 @@
44

55
import org.apache.flink.api.common.serialization.DeserializationSchema;
66

7-
import com.getindata.connectors.http.LookupQueryCreator;
87
import com.getindata.connectors.http.internal.table.lookup.HttpLookupConfig;
98

109
public interface PollingClientFactory<OUT> extends Serializable {
1110

1211
PollingClient<OUT> createPollClient(
1312
HttpLookupConfig options,
14-
DeserializationSchema<OUT> schemaDecoder,
15-
LookupQueryCreator lookupQueryCreator
13+
DeserializationSchema<OUT> schemaDecoder
1614
);
1715
}
Lines changed: 54 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,54 @@
1+
package com.getindata.connectors.http.internal.table.lookup;
2+
3+
import java.net.URI;
4+
import java.net.URISyntaxException;
5+
import java.net.http.HttpRequest;
6+
import java.net.http.HttpRequest.BodyPublishers;
7+
import java.net.http.HttpRequest.Builder;
8+
import java.time.Duration;
9+
10+
import com.getindata.connectors.http.LookupQueryCreator;
11+
import com.getindata.connectors.http.internal.HeaderPreprocessor;
12+
import com.getindata.connectors.http.internal.utils.uri.URIBuilder;
13+
14+
/**
15+
* Implementation of {@link HttpRequestFactory} for REST calls that sends their parameters using
16+
* request body.
17+
*/
18+
public class BodyBasedRequestFactory extends RequestFactoryBase {
19+
20+
private final String methodName;
21+
22+
public BodyBasedRequestFactory(
23+
String methodName,
24+
LookupQueryCreator lookupQueryCreator,
25+
HeaderPreprocessor headerPreprocessor,
26+
HttpLookupConfig options) {
27+
28+
super(lookupQueryCreator, headerPreprocessor, options);
29+
this.methodName = methodName.toUpperCase();
30+
}
31+
32+
/**
33+
* Method for preparing {@link HttpRequest.Builder} for REST request that sends their parameters
34+
* in request body, for example PUT or POST methods
35+
*
36+
* @param lookupQuery lookup query used for request body.
37+
* @return {@link HttpRequest.Builder} for given lookupQuery.
38+
*/
39+
@Override
40+
protected Builder setUpRequestMethod(String lookupQuery) {
41+
return HttpRequest.newBuilder()
42+
.uri(constructGetUri())
43+
.method(methodName, BodyPublishers.ofString(lookupQuery))
44+
.timeout(Duration.ofMinutes(2));
45+
}
46+
47+
private URI constructGetUri() {
48+
try {
49+
return new URIBuilder(baseUrl).build();
50+
} catch (URISyntaxException e) {
51+
throw new RuntimeException(e);
52+
}
53+
}
54+
}
Lines changed: 50 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,50 @@
1+
package com.getindata.connectors.http.internal.table.lookup;
2+
3+
import java.net.URI;
4+
import java.net.URISyntaxException;
5+
import java.net.http.HttpRequest;
6+
import java.net.http.HttpRequest.Builder;
7+
import java.time.Duration;
8+
9+
import com.getindata.connectors.http.LookupQueryCreator;
10+
import com.getindata.connectors.http.internal.HeaderPreprocessor;
11+
import com.getindata.connectors.http.internal.utils.uri.URIBuilder;
12+
13+
/**
14+
* Implementation of {@link HttpRequestFactory} for GET REST calls.
15+
*/
16+
public class GetRequestFactory extends RequestFactoryBase {
17+
18+
public GetRequestFactory(
19+
LookupQueryCreator lookupQueryCreator,
20+
HeaderPreprocessor headerPreprocessor,
21+
HttpLookupConfig options) {
22+
23+
super(lookupQueryCreator, headerPreprocessor, options);
24+
}
25+
26+
/**
27+
* Method for preparing {@link HttpRequest.Builder} for REST GET request, where lookupQuery
28+
* is used as query parameters for example:
29+
* <pre>
30+
* http:localhost:8080/service?id=1
31+
* </pre>
32+
* @param lookupQuery lookup query used for request query parameters.
33+
* @return {@link HttpRequest.Builder} for given GET lookupQuery
34+
*/
35+
@Override
36+
protected Builder setUpRequestMethod(String lookupQuery) {
37+
return HttpRequest.newBuilder()
38+
.uri(constructGetUri(lookupQuery))
39+
.GET()
40+
.timeout(Duration.ofMinutes(2));
41+
}
42+
43+
private URI constructGetUri(String lookupQuery) {
44+
try {
45+
return new URIBuilder(baseUrl + "?" + lookupQuery).build();
46+
} catch (URISyntaxException e) {
47+
throw new RuntimeException(e);
48+
}
49+
}
50+
}

src/main/java/com/getindata/connectors/http/internal/table/lookup/HttpLookupConnectorOptions.java

Lines changed: 7 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,6 @@
33
import org.apache.flink.configuration.ConfigOption;
44
import org.apache.flink.configuration.ConfigOptions;
55

6-
import com.getindata.connectors.http.internal.table.lookup.querycreators.GenericGetQueryCreatorFactory;
76
import static com.getindata.connectors.http.internal.config.HttpConnectorConfigConstants.SOURCE_LOOKUP_QUERY_CREATOR_IDENTIFIER;
87

98
public class HttpLookupConnectorOptions {
@@ -26,8 +25,14 @@ public class HttpLookupConnectorOptions {
2625
.defaultValue(false)
2726
.withDescription("Whether to use Sync and Async polling mechanism");
2827

28+
public static final ConfigOption<String> LOOKUP_METHOD =
29+
ConfigOptions.key("lookup-method")
30+
.stringType()
31+
.defaultValue("GET")
32+
.withDescription("Method used for REST executed by lookup connector.");
33+
2934
public static final ConfigOption<String> LOOKUP_QUERY_CREATOR_IDENTIFIER =
3035
ConfigOptions.key(SOURCE_LOOKUP_QUERY_CREATOR_IDENTIFIER)
3136
.stringType()
32-
.defaultValue(GenericGetQueryCreatorFactory.IDENTIFIER);
37+
.noDefaultValue();
3338
}

src/main/java/com/getindata/connectors/http/internal/table/lookup/HttpLookupTableSource.java

Lines changed: 1 addition & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -15,7 +15,6 @@
1515
import org.apache.flink.table.data.RowData;
1616
import org.apache.flink.table.types.DataType;
1717

18-
import com.getindata.connectors.http.LookupQueryCreator;
1918
import com.getindata.connectors.http.internal.PollingClientFactory;
2019
import com.getindata.connectors.http.internal.table.lookup.HttpTableLookupFunction.ColumnData;
2120

@@ -32,8 +31,6 @@ public class HttpLookupTableSource
3231

3332
private final DecodingFormat<DeserializationSchema<RowData>> decodingFormat;
3433

35-
private final LookupQueryCreator lookupQueryCreator;
36-
3734
@Override
3835
public LookupRuntimeProvider getLookupRuntimeProvider(LookupContext context) {
3936
String[] keyNames = new String[context.getKeys().length];
@@ -53,8 +50,7 @@ public DynamicTableSource copy() {
5350
physicalRowDataType,
5451
pollingClientFactory,
5552
lookupConfig,
56-
decodingFormat,
57-
lookupQueryCreator
53+
decodingFormat
5854
);
5955
}
6056

@@ -84,7 +80,6 @@ private LookupRuntimeProvider buildLookupFunction(String[] keyNames, LookupConte
8480
.pollingClientFactory(pollingClientFactory)
8581
.schemaDecoder(schemaDecoder)
8682
.columnData(columnData)
87-
.lookupQueryCreator(lookupQueryCreator)
8883
.options(lookupConfig)
8984
.build();
9085

src/main/java/com/getindata/connectors/http/internal/table/lookup/HttpLookupTableSourceFactory.java

Lines changed: 41 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -26,9 +26,13 @@
2626
import static org.apache.flink.table.types.utils.DataTypeUtils.removeTimeAttribute;
2727

2828
import com.getindata.connectors.http.LookupQueryCreatorFactory;
29+
import com.getindata.connectors.http.internal.HeaderPreprocessor;
2930
import com.getindata.connectors.http.internal.PollingClientFactory;
3031
import com.getindata.connectors.http.internal.config.HttpConnectorConfigConstants;
32+
import com.getindata.connectors.http.internal.table.lookup.querycreators.GenericGetQueryCreatorFactory;
33+
import com.getindata.connectors.http.internal.table.lookup.querycreators.GenericJsonQueryCreatorFactory;
3134
import com.getindata.connectors.http.internal.utils.ConfigUtils;
35+
import com.getindata.connectors.http.internal.utils.HttpHeaderUtils;
3236
import static com.getindata.connectors.http.internal.table.lookup.HttpLookupConnectorOptions.*;
3337

3438
public class HttpLookupTableSourceFactory implements DynamicTableSourceFactory {
@@ -60,16 +64,26 @@ public DynamicTableSource createDynamicTableSource(Context context) {
6064
FactoryUtil.FORMAT
6165
);
6266

63-
final LookupQueryCreatorFactory lookupQueryCreatorFactory =
67+
String lookupMethod = readableConfig.get(LOOKUP_METHOD);
68+
69+
LookupQueryCreatorFactory lookupQueryCreatorFactory =
6470
FactoryUtil.discoverFactory(
6571
context.getClassLoader(),
6672
LookupQueryCreatorFactory.class,
67-
readableConfig.get(LOOKUP_QUERY_CREATOR_IDENTIFIER)
73+
readableConfig.getOptional(LOOKUP_QUERY_CREATOR_IDENTIFIER).orElse(
74+
(lookupMethod.equalsIgnoreCase("GET") ?
75+
GenericGetQueryCreatorFactory.IDENTIFIER :
76+
GenericJsonQueryCreatorFactory.IDENTIFIER)
77+
)
6878
);
6979

70-
PollingClientFactory<RowData> pollingClientFactory = new JavaNetHttpPollingClientFactory();
7180
HttpLookupConfig lookupConfig = getHttpLookupOptions(context, readableConfig);
7281

82+
// TODO Consider this to be injected as method argument or factory field
83+
// so user could set this using API.
84+
PollingClientFactory<RowData> pollingClientFactory =
85+
createPollingClientFactory(lookupMethod, lookupQueryCreatorFactory, lookupConfig);
86+
7387
ResolvedSchema resolvedSchema = context.getCatalogTable().getResolvedSchema();
7488

7589
DataType physicalRowDataType =
@@ -79,8 +93,7 @@ public DynamicTableSource createDynamicTableSource(Context context) {
7993
physicalRowDataType,
8094
pollingClientFactory,
8195
lookupConfig,
82-
decodingFormat,
83-
lookupQueryCreatorFactory.createLookupQueryCreator()
96+
decodingFormat
8497
);
8598
}
8699

@@ -96,7 +109,29 @@ public Set<ConfigOption<?>> requiredOptions() {
96109

97110
@Override
98111
public Set<ConfigOption<?>> optionalOptions() {
99-
return Set.of(URL_ARGS, ASYNC_POLLING);
112+
return Set.of(URL_ARGS, ASYNC_POLLING, LOOKUP_METHOD);
113+
}
114+
115+
private PollingClientFactory<RowData> createPollingClientFactory(
116+
String lookupMethod,
117+
LookupQueryCreatorFactory lookupQueryCreatorFactory,
118+
HttpLookupConfig lookupConfig) {
119+
120+
HeaderPreprocessor headerPreprocessor = HttpHeaderUtils.createDefaultHeaderPreprocessor();
121+
122+
HttpRequestFactory requestFactory = (lookupMethod.equalsIgnoreCase("GET")) ?
123+
new GetRequestFactory(
124+
lookupQueryCreatorFactory.createLookupQueryCreator(),
125+
headerPreprocessor,
126+
lookupConfig) :
127+
new BodyBasedRequestFactory(
128+
lookupMethod,
129+
lookupQueryCreatorFactory.createLookupQueryCreator(),
130+
headerPreprocessor,
131+
lookupConfig
132+
);
133+
134+
return new JavaNetHttpPollingClientFactory(requestFactory);
100135
}
101136

102137
private HttpLookupConfig getHttpLookupOptions(Context context, ReadableConfig config) {
Lines changed: 21 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,21 @@
1+
package com.getindata.connectors.http.internal.table.lookup;
2+
3+
import java.io.Serializable;
4+
import java.net.http.HttpRequest;
5+
import java.util.List;
6+
7+
import com.getindata.connectors.http.LookupArg;
8+
9+
/**
10+
* Factory for creating {@link HttpRequest} objects for Rest clients.
11+
*/
12+
public interface HttpRequestFactory extends Serializable {
13+
14+
/**
15+
* Creates {@link HttpRequest} from given List of {@link LookupArg} objects.
16+
*
17+
* @param params {@link LookupArg} objects used for building http request.
18+
* @return {@link HttpRequest} created from {@link LookupArg}
19+
*/
20+
HttpRequest buildLookupRequest(List<LookupArg> params);
21+
}

src/main/java/com/getindata/connectors/http/internal/table/lookup/HttpTableLookupFunction.java

Lines changed: 2 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,6 @@
2020
import org.apache.flink.table.functions.TableFunction;
2121

2222
import com.getindata.connectors.http.LookupArg;
23-
import com.getindata.connectors.http.LookupQueryCreator;
2423
import com.getindata.connectors.http.internal.PollingClient;
2524
import com.getindata.connectors.http.internal.PollingClientFactory;
2625

@@ -31,8 +30,6 @@ public class HttpTableLookupFunction extends TableFunction<RowData> {
3130

3231
private final DeserializationSchema<RowData> schemaDecoder;
3332

34-
private final LookupQueryCreator lookupQueryCreator;
35-
3633
@Getter
3734
private final ColumnData columnData;
3835

@@ -48,22 +45,20 @@ private HttpTableLookupFunction(
4845
PollingClientFactory<RowData> pollingClientFactory,
4946
DeserializationSchema<RowData> schemaDecoder,
5047
ColumnData columnData,
51-
HttpLookupConfig options,
52-
LookupQueryCreator lookupQueryCreator) {
48+
HttpLookupConfig options) {
5349

5450
this.pollingClientFactory = pollingClientFactory;
5551
this.schemaDecoder = schemaDecoder;
5652
this.columnData = columnData;
5753
this.options = options;
58-
this.lookupQueryCreator = lookupQueryCreator;
5954
}
6055

6156
@Override
6257
public void open(FunctionContext context) throws Exception {
6358
super.open(context);
6459
this.localHttpCallCounter = new AtomicInteger(0);
6560
this.client = pollingClientFactory
66-
.createPollClient(options, schemaDecoder, lookupQueryCreator);
61+
.createPollClient(options, schemaDecoder);
6762

6863
context
6964
.getMetricGroup()

0 commit comments

Comments
 (0)