Commit 938323b

kristoffSC and swtwsk authored
ESP-171_HttpLookupSource_ErrorCodeSupport - HTTP ErrorCode Status config for Lookup Source (#22)
* ESP-171_HttpLookupSource_ErrorCodeSupport - Adding to Lookup HTTP Source HTTP Error Code configuration from properties.

  Signed-off-by: Krzysztof Chmielewski <krzysztof.chmielewski@getindata.com>

* Update CHANGELOG.md

  Co-authored-by: Andrzej Swatowski <33041023+swtwsk@users.noreply.github.com>
  Signed-off-by: Krzysztof Chmielewski <krzysztof.chmielewski@getindata.com>

Co-authored-by: Andrzej Swatowski <33041023+swtwsk@users.noreply.github.com>
1 parent c2716f0 commit 938323b

File tree

14 files changed, +210 -72 lines changed

CHANGELOG.md

Lines changed: 3 additions & 2 deletions
@@ -6,8 +6,9 @@
 - Fix JavaDoc errors.

 ### Added
-- Add to Http Sink a new properties `gid.connector.http.sink.error.code` and `gid.connector.http.sink.error.code.exclude`
-to set HTTP status code that should be interpreted as errors.
+- Add new properties `gid.connector.http.sink.error.code`,`gid.connector.http.sink.error.code.exclude`,
+`gid.connector.http.source.lookup.error.code` and `gid.connector.http.source.lookup.error.code.exclude`
+to set HTTP status codes that should be interpreted as errors both for HTTP Sink and HTTP Lookup Source.
 - Use Flink's format support to Http Lookup Source.
 - Add HTTP Lookup source client header configuration via properties.

README.md

Lines changed: 8 additions & 17 deletions
@@ -184,17 +184,18 @@ CREATE TABLE http (
 )
 ```

-#### HTTP status code handler (currently supported only for HTTP Sink)
-Http Sink connector allows defining list of HTTP status codes that should be treated as errors.
+## HTTP status code handler
+Http Sink and Lookup Source connectors allow defining list of HTTP status codes that should be treated as errors.
 By default all 400s and 500s response codes will be interpreted as error code.

-This behavior can be changed by using below properties in table definition (DDL) or passing it via
+This behavior can be changed by using below properties in table definition (DDL) for Sink and Lookup Source or passing it via
 `setProperty' method from Sink's builder. The property names are:
-- `gid.connector.http.sink.error.code` used to defined HTTP status code value that should be treated as error for example 404.
+- `gid.connector.http.sink.error.code` and `gid.connector.http.source.lookup.error.code` used to defined HTTP status code value that should be treated as error for example 404.
 Many status codes can be defined in one value, where each code should be separated with comma, for example:
 `401, 402, 403`. User can use this property also to define a type code mask. In that case, all codes from given HTTP response type will be treated as errors.
 An example of such a mask would be `3XX, 4XX, 5XX`. In this case, all 300s, 400s and 500s status codes will be treated as errors.
-- `gid.connector.http.sink.error.code.exclude` used to exclude a HTTP code from error list. Many status codes can be defined in one value, where each code should be separated with comma, for example:
+- `gid.connector.http.sink.error.code.exclude` and `gid.connector.http.source.lookup.error.code.exclude` used to exclude a HTTP code from error list.
+Many status codes can be defined in one value, where each code should be separated with comma, for example:
 `401, 402, 403`. In this example, codes 401, 402 and 403 would not be interpreted as error codes.


@@ -291,21 +292,13 @@ Issue was discussed on Flink's user mailing list - https://lists.apache.org/thre
 Implementation of an HTTP Sink is based on Flink's `AsyncSinkBase` introduced in Flink 1.15 [3, 4].

 #### Http Response to Table schema mapping
-The mapping from Http Json Response to SQL table schema is done via Json Paths [5].
-This is achieved thanks to `com.jayway.jsonpath:json-path` library.
-
-If no `root` or `field.#.path` option is defined, the connector will use the column name as json path and will try to look for Json Node with that name in received Json. If no node with a given name is found, the connector will return `null` as value for this field.
-
-If the `field.#.path` option is defined, connector will use given Json path from option's value in order to find Json data that should be used for this column.
-For example `'field.isActive.path' = '$.details.isActive'` - the value for table column `isActive` will be taken from `$.details.isActive` node from received Json.
+The mapping from Http Json Response to SQL table schema is done via Flink's Json Format [5].

 ## TODO

 ### HTTP TableLookup Source
 - Implement caches.
-- Add support for other Flink types. Currently, STRING type is only fully supported.
 - Think about Retry Policy for Http Request
-- Use Flink Format [7] to parse Json response
 - Add Configurable Timeout value
 - Check other `//TODO`'s.

@@ -321,9 +314,7 @@ For example `'field.isActive.path' = '$.details.isActive'` - the value for table
 </br>
 [4] https://nightlies.apache.org/flink/flink-docs-release-1.15/api/java/org/apache/flink/connector/base/sink/AsyncSinkBase.html
 </br>
-[5] https://support.smartbear.com/alertsite/docs/monitors/api/endpoint/jsonpath.html
+[5] https://nightlies.apache.org/flink/flink-docs-master/docs/connectors/table/formats/json/
 </br>
 [6] https://nightlies.apache.org/flink/flink-docs-master/docs/dev/table/sqlclient/
 </br>
-[7] https://nightlies.apache.org/flink/flink-docs-master/docs/connectors/table/formats/json/
-</br>
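
The README hunk above describes how the error-code list, type masks such as `4XX`, and the exclude list combine. A minimal Java sketch of those rules follows. `StatusCodeRulesSketch` is a hypothetical class written only for illustration; it is not the connector's `ComposeHttpStatusCodeChecker`. It assumes that an excluded code overrides any matching error entry and that masks use an upper-case `X`.

```java
import java.util.Arrays;
import java.util.Set;
import java.util.stream.Collectors;

// Hypothetical illustration of the rules described in the README;
// this is not the connector's implementation.
final class StatusCodeRulesSketch {

    // Default from the README: all 400s and 500s are treated as errors.
    private static final Set<String> DEFAULT_ERROR_CODES = Set.of("4XX", "5XX");

    private final Set<String> errorCodes;
    private final Set<String> excludedCodes;

    StatusCodeRulesSketch(String errorCodeList, String excludeList) {
        Set<String> parsed = parse(errorCodeList);
        // An empty or missing error-code value falls back to the defaults.
        this.errorCodes = parsed.isEmpty() ? DEFAULT_ERROR_CODES : parsed;
        this.excludedCodes = parse(excludeList);
    }

    boolean isErrorCode(int statusCode) {
        String code = String.valueOf(statusCode);
        // Assumption: an excluded code is never reported as an error.
        if (excludedCodes.contains(code)) {
            return false;
        }
        // A mask such as "4XX" covers every code with the same first digit.
        String mask = code.charAt(0) + "XX";
        return errorCodes.contains(code) || errorCodes.contains(mask);
    }

    // Splits a comma-separated value such as "401, 402, 403" into trimmed entries.
    private static Set<String> parse(String commaSeparated) {
        return Arrays.stream(commaSeparated.split(","))
            .map(String::trim)
            .filter(entry -> !entry.isEmpty())
            .collect(Collectors.toSet());
    }
}
```

Under these assumptions, `new StatusCodeRulesSketch("", "404")` keeps the default `4XX, 5XX` error set but lets a 404 response through, while `new StatusCodeRulesSketch("3XX, 404", "")` replaces the defaults, so a 500 is no longer treated as an error.
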

src/main/java/com/getindata/connectors/http/internal/config/HttpConnectorConfigConstants.java

Lines changed: 11 additions & 3 deletions
@@ -26,10 +26,18 @@ public final class HttpConnectorConfigConstants {
     public static final String LOOKUP_SOURCE_HEADER_PREFIX = GID_CONNECTOR_HTTP
         + "source.lookup.header.";

-    // Error code handling configuration.
-    public static final String HTTP_ERROR_CODE_WHITE_LIST =
+    // --------- Error code handling configuration ---------
+    public static final String HTTP_ERROR_SINK_CODE_WHITE_LIST =
         GID_CONNECTOR_HTTP + "sink.error.code.exclude";

-    public static final String HTTP_ERROR_CODES_LIST = GID_CONNECTOR_HTTP + "sink.error.code";
+    public static final String HTTP_ERROR_SINK_CODES_LIST = GID_CONNECTOR_HTTP + "sink.error.code";
+
+    public static final String HTTP_ERROR_SOURCE_LOOKUP_CODE_WHITE_LIST =
+        GID_CONNECTOR_HTTP + "source.lookup.error.code.exclude";
+
+    public static final String HTTP_ERROR_SOURCE_LOOKUP_CODES_LIST =
+        GID_CONNECTOR_HTTP + "source.lookup.error.code";
+
+    // -----------------------------------------------------

 }

src/main/java/com/getindata/connectors/http/internal/sink/httpclient/JavaNetSinkHttpClient.java

Lines changed: 12 additions & 3 deletions
@@ -20,9 +20,11 @@

 import com.getindata.connectors.http.internal.SinkHttpClient;
 import com.getindata.connectors.http.internal.SinkHttpClientResponse;
+import com.getindata.connectors.http.internal.config.HttpConnectorConfigConstants;
 import com.getindata.connectors.http.internal.sink.HttpSinkRequestEntry;
-import com.getindata.connectors.http.internal.sink.httpclient.status.ComposeHttpStatusCodeChecker;
-import com.getindata.connectors.http.internal.sink.httpclient.status.HttpStatusCodeChecker;
+import com.getindata.connectors.http.internal.status.ComposeHttpStatusCodeChecker;
+import com.getindata.connectors.http.internal.status.ComposeHttpStatusCodeChecker.ComposeHttpStatusCodeCheckerConfig;
+import com.getindata.connectors.http.internal.status.HttpStatusCodeChecker;
 import com.getindata.connectors.http.internal.utils.ConfigUtils;
 import static com.getindata.connectors.http.internal.config.HttpConnectorConfigConstants.SINK_HEADER_PREFIX;

@@ -51,7 +53,14 @@ public JavaNetSinkHttpClient(Properties properties) {

         // TODO Inject this via constructor when implementing a response processor.
         // Processor will be injected and it will wrap statusChecker implementation.
-        this.statusCodeChecker = new ComposeHttpStatusCodeChecker(properties);
+        ComposeHttpStatusCodeCheckerConfig checkerConfig =
+            ComposeHttpStatusCodeCheckerConfig.builder()
+                .properties(properties)
+                .whiteListPrefix(HttpConnectorConfigConstants.HTTP_ERROR_SINK_CODE_WHITE_LIST)
+                .errorCodePrefix(HttpConnectorConfigConstants.HTTP_ERROR_SINK_CODES_LIST)
+                .build();
+
+        this.statusCodeChecker = new ComposeHttpStatusCodeChecker(checkerConfig);
     }

     @Override
Lines changed: 37 additions & 11 deletions
@@ -1,11 +1,15 @@
-package com.getindata.connectors.http.internal.sink.httpclient.status;
+package com.getindata.connectors.http.internal.status;

 import java.util.Arrays;
 import java.util.HashSet;
 import java.util.Properties;
 import java.util.Set;
 import java.util.stream.Collectors;

+import lombok.AccessLevel;
+import lombok.Builder;
+import lombok.Data;
+import lombok.RequiredArgsConstructor;
 import org.apache.flink.util.Preconditions;
 import org.apache.flink.util.StringUtils;

@@ -36,9 +40,9 @@ public class ComposeHttpStatusCodeChecker implements HttpStatusCodeChecker {
      */
     private final Set<HttpStatusCodeChecker> errorCodes;

-    public ComposeHttpStatusCodeChecker(Properties properties) {
-        excludedCodes = prepareWhiteList(properties);
-        errorCodes = prepareErrorCodes(properties);
+    public ComposeHttpStatusCodeChecker(ComposeHttpStatusCodeCheckerConfig config) {
+        excludedCodes = prepareWhiteList(config);
+        errorCodes = prepareErrorCodes(config);
     }

     /**
@@ -68,14 +72,19 @@ public boolean isErrorCode(int statusCode) {
             .anyMatch(httpStatusCodeChecker -> httpStatusCodeChecker.isErrorCode(statusCode));
     }

-    private Set<HttpStatusCodeChecker> prepareErrorCodes(Properties properties) {
-        String sErrorCodes =
-            properties.getProperty(HttpConnectorConfigConstants.HTTP_ERROR_CODES_LIST, "");
+    private Set<HttpStatusCodeChecker> prepareErrorCodes(
+            ComposeHttpStatusCodeCheckerConfig config) {

-        if (StringUtils.isNullOrWhitespaceOnly(sErrorCodes)) {
+        Properties properties = config.getProperties();
+        String errorCodePrefix = config.getErrorCodePrefix();
+
+        String errorCodes =
+            properties.getProperty(errorCodePrefix, "");
+
+        if (StringUtils.isNullOrWhitespaceOnly(errorCodes)) {
             return DEFAULT_ERROR_CODES;
         } else {
-            String[] splitCodes = sErrorCodes.split(HttpConnectorConfigConstants.ERROR_CODE_DELIM);
+            String[] splitCodes = errorCodes.split(HttpConnectorConfigConstants.ERROR_CODE_DELIM);
             return prepareErrorCodes(splitCodes);
         }
     }
@@ -110,9 +119,14 @@ private Set<HttpStatusCodeChecker> prepareErrorCodes(String[] statusCodes) {
         return (errorCodes.isEmpty()) ? DEFAULT_ERROR_CODES : errorCodes;
     }

-    private Set<WhiteListHttpStatusCodeChecker> prepareWhiteList(Properties properties) {
+    private Set<WhiteListHttpStatusCodeChecker> prepareWhiteList(
+            ComposeHttpStatusCodeCheckerConfig config) {
+
+        Properties properties = config.getProperties();
+        String whiteListPrefix = config.getWhiteListPrefix();
+
         return Arrays.stream(
-            properties.getProperty(HttpConnectorConfigConstants.HTTP_ERROR_CODE_WHITE_LIST, "")
+            properties.getProperty(whiteListPrefix, "")
                 .split(HttpConnectorConfigConstants.ERROR_CODE_DELIM))
             .filter(sCode -> !StringUtils.isNullOrWhitespaceOnly(sCode))
             .map(String::trim)
@@ -132,4 +146,16 @@ private Set<WhiteListHttpStatusCodeChecker> prepareWhiteList(Properties properti
     private boolean isTypeCode(final String code) {
         return code.charAt(1) == 'X' && code.charAt(2) == 'X';
     }
+
+    @Data
+    @Builder
+    @RequiredArgsConstructor(access = AccessLevel.PRIVATE)
+    public static class ComposeHttpStatusCodeCheckerConfig {
+
+        private final String whiteListPrefix;
+
+        private final String errorCodePrefix;
+
+        private final Properties properties;
+    }
 }
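
The change above replaces hard-coded sink property names with a config object that tells the checker which keys to read, so the sink and the lookup source can share one checker class. A rough plain-Java sketch of that idea follows, without Lombok. `CheckerConfigSketch` and `configuredErrorCodes` are hypothetical names used only here; the property names are the ones documented in this commit's README.

```java
import java.util.Properties;

// Hypothetical plain-Java stand-in for the Lombok-generated config class in this diff.
final class CheckerConfigSketch {

    // Property names as documented in the README of this commit.
    static final String SINK_ERROR_CODES = "gid.connector.http.sink.error.code";
    static final String LOOKUP_ERROR_CODES = "gid.connector.http.source.lookup.error.code";

    private final String errorCodeKey;
    private final Properties properties;

    CheckerConfigSketch(String errorCodeKey, Properties properties) {
        this.errorCodeKey = errorCodeKey;
        this.properties = properties;
    }

    // Mirrors properties.getProperty(errorCodePrefix, "") from the diff:
    // a missing key yields an empty string, which the checker maps to its defaults.
    String configuredErrorCodes() {
        return properties.getProperty(errorCodeKey, "");
    }
}
```

With one `Properties` object holding both keys, a config built with `SINK_ERROR_CODES` reads only the sink value and a config built with `LOOKUP_ERROR_CODES` reads only the lookup value, which is the separation the new `whiteListPrefix` and `errorCodePrefix` fields appear intended to provide.
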
Lines changed: 1 addition & 1 deletion
@@ -1,4 +1,4 @@
-package com.getindata.connectors.http.internal.sink.httpclient.status;
+package com.getindata.connectors.http.internal.status;

 import java.util.HashMap;
 import java.util.Map;

Lines changed: 1 addition & 1 deletion
@@ -1,4 +1,4 @@
-package com.getindata.connectors.http.internal.sink.httpclient.status;
+package com.getindata.connectors.http.internal.status;

 /**
  * Base interface for all classes that would validate HTTP status

Lines changed: 1 addition & 1 deletion
@@ -1,4 +1,4 @@
-package com.getindata.connectors.http.internal.sink.httpclient.status;
+package com.getindata.connectors.http.internal.status;

 import lombok.EqualsAndHashCode;
 import lombok.RequiredArgsConstructor;

Lines changed: 1 addition & 1 deletion
@@ -1,4 +1,4 @@
-package com.getindata.connectors.http.internal.sink.httpclient.status;
+package com.getindata.connectors.http.internal.status;

 import lombok.EqualsAndHashCode;


Lines changed: 1 addition & 1 deletion
@@ -1,4 +1,4 @@
-package com.getindata.connectors.http.internal.sink.httpclient.status;
+package com.getindata.connectors.http.internal.status;

 import lombok.EqualsAndHashCode;
 import lombok.RequiredArgsConstructor;

src/main/java/com/getindata/connectors/http/internal/table/lookup/JavaNetHttpPollingClient.java

Lines changed: 42 additions & 5 deletions
@@ -18,8 +18,13 @@
 import org.apache.flink.annotation.VisibleForTesting;
 import org.apache.flink.api.common.serialization.DeserializationSchema;
 import org.apache.flink.table.data.RowData;
+import org.apache.flink.util.StringUtils;

 import com.getindata.connectors.http.internal.PollingClient;
+import com.getindata.connectors.http.internal.config.HttpConnectorConfigConstants;
+import com.getindata.connectors.http.internal.status.ComposeHttpStatusCodeChecker;
+import com.getindata.connectors.http.internal.status.ComposeHttpStatusCodeChecker.ComposeHttpStatusCodeCheckerConfig;
+import com.getindata.connectors.http.internal.status.HttpStatusCodeChecker;
 import com.getindata.connectors.http.internal.utils.ConfigUtils;
 import com.getindata.connectors.http.internal.utils.uri.URIBuilder;
 import static com.getindata.connectors.http.internal.config.HttpConnectorConfigConstants.LOOKUP_SOURCE_HEADER_PREFIX;
@@ -33,6 +38,8 @@ public class JavaNetHttpPollingClient implements PollingClient<RowData> {

     private final HttpClient httpClient;

+    private final HttpStatusCodeChecker statusCodeChecker;
+
     public JavaNetHttpPollingClient(
         HttpClient httpClient,
         DeserializationSchema<RowData> runtimeDecoder,
@@ -49,6 +56,19 @@ public JavaNetHttpPollingClient(
         );

         this.headersAndValues = ConfigUtils.toHeaderAndValueArray(headerMap);
+
+        // TODO Inject this via constructor when implementing a response processor.
+        // Processor will be injected and it will wrap statusChecker implementation.
+        ComposeHttpStatusCodeCheckerConfig checkerConfig =
+            ComposeHttpStatusCodeCheckerConfig.builder()
+                .properties(options.getProperties())
+                .whiteListPrefix(
+                    HttpConnectorConfigConstants.HTTP_ERROR_SOURCE_LOOKUP_CODE_WHITE_LIST
+                )
+                .errorCodePrefix(HttpConnectorConfigConstants.HTTP_ERROR_SOURCE_LOOKUP_CODES_LIST)
+                .build();
+
+        this.statusCodeChecker = new ComposeHttpStatusCodeChecker(checkerConfig);
     }

     private final DeserializationSchema<RowData> runtimeDecoder;
@@ -72,7 +92,7 @@ private Optional<RowData> queryAndProcess(List<LookupArg> params) throws Excepti

         HttpRequest request = buildHttpRequest(params);
         HttpResponse<String> response = httpClient.send(request, BodyHandlers.ofString());
-        return processHttpResponse(response);
+        return processHttpResponse(response, request);
     }

     private HttpRequest buildHttpRequest(List<LookupArg> params) throws URISyntaxException {
@@ -99,21 +119,38 @@ private URI buildUri(List<LookupArg> params) throws URISyntaxException {
     }

     // TODO Think about handling 2xx responses other than 200
-    private Optional<RowData> processHttpResponse(HttpResponse<String> response)
-        throws IOException {
+    private Optional<RowData> processHttpResponse(
+            HttpResponse<String> response,
+            HttpRequest request) throws IOException {
+
+        if (response == null) {
+            log.warn("Null Http response for request " + request.uri().toString());
+            return Optional.empty();
+        }
+
         String body = response.body();
         int statusCode = response.statusCode();

         log.debug("Received {} status code for RestTableSource Request", statusCode);
-        if (statusCode == 200) {
+        if (notErrorCodeAndNotEmptyBody(body, statusCode)) {
             log.trace("Server response body" + body);
             return Optional.ofNullable(runtimeDecoder.deserialize(body.getBytes()));
         } else {
-            log.warn("Http Error Body - {}", body);
+            log.warn(
+                String.format("Returned Http status code was invalid or returned body was empty. "
+                    + "Status Code [%s], "
+                    + "response body [%s]", statusCode, body)
+            );
+
             return Optional.empty();
         }
     }

+    private boolean notErrorCodeAndNotEmptyBody(String body, int statusCode) {
+        return !(StringUtils.isNullOrWhitespaceOnly(body) || statusCodeChecker.isErrorCode(
+            statusCode));
+    }
+
     @VisibleForTesting
     String[] getHeadersAndValues() {
         return Arrays.copyOf(headersAndValues, headersAndValues.length);
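
The last hunk changes when a lookup response is deserialized: instead of requiring status 200, the client now accepts any response whose status is not a configured error code and whose body is not blank. The sketch below isolates that decision. `LookupResponseSketch` and `acceptBody` are hypothetical names, the status checker is modelled as a plain `IntPredicate`, and `trim().isEmpty()` stands in for Flink's `StringUtils.isNullOrWhitespaceOnly`, so whitespace handling may differ slightly from the real client.

```java
import java.util.Optional;
import java.util.function.IntPredicate;

// Hypothetical illustration of the accept/reject decision in processHttpResponse.
final class LookupResponseSketch {

    // Returns the body when it should be passed to the decoder, otherwise empty.
    static Optional<String> acceptBody(String body, int statusCode, IntPredicate isErrorCode) {
        // Approximation of StringUtils.isNullOrWhitespaceOnly(body).
        boolean emptyBody = body == null || body.trim().isEmpty();
        if (emptyBody || isErrorCode.test(statusCode)) {
            return Optional.empty();
        }
        return Optional.of(body);
    }
}
```

With a predicate matching the default 400s and 500s, a 201 response with a non-blank body is now accepted, whereas the previous `statusCode == 200` check would have rejected it.
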

src/test/java/com/getindata/connectors/http/internal/sink/HttpSinkConnectionTest.java

Lines changed: 9 additions & 4 deletions
@@ -32,12 +32,17 @@
 public class HttpSinkConnectionTest {

     private static final int SERVER_PORT = 9090;
-    private static final Set<Integer> messageIds =
-        IntStream.range(0, 50).boxed().collect(Collectors.toSet());
-    private static final List<String> messages =
-        messageIds.stream().map(i -> "{\"http-sink-id\":" + i + "}").collect(Collectors.toList());
+
+    private static final Set<Integer> messageIds = IntStream.range(0, 50)
+        .boxed()
+        .collect(Collectors.toSet());
+
+    private static final List<String> messages = messageIds.stream()
+        .map(i -> "{\"http-sink-id\":" + i + "}")
+        .collect(Collectors.toList());

     private StreamExecutionEnvironment env;
+
     private WireMockServer wireMockServer;

     @BeforeEach
