Commit b2c6371

ESP-99_ErrorCodes - Error Code configuration from properties. (#16)
* ESP-99_ErrorCodes - Error Code configuration from properties.
  Signed-off-by: Krzysztof Chmielewski <krzysztof.chmielewski@getindata.com>
* ESP-99_ErrorCodes - Error Code configuration from properties.
  Signed-off-by: Krzysztof Chmielewski <krzysztof.chmielewski@getindata.com>
* ESP-99_ErrorCodes - Changes after code review.
  Signed-off-by: Krzysztof Chmielewski <krzysztof.chmielewski@getindata.com>
* ESP-99_ErrorCodes - Changes after code review.
  Signed-off-by: Krzysztof Chmielewski <krzysztof.chmielewski@getindata.com>
1 parent b3cce3d commit b2c6371

File tree

13 files changed

+521
-20
lines changed

CHANGELOG.md

Lines changed: 4 additions & 0 deletions
@@ -5,6 +5,10 @@
55
### Fixed
66
- Fix JavaDoc errors.
77

8+
### Added
9+
- Add new properties `gid.connector.http.sink.error.code` and `gid.connector.http.sink.error.code.exclude`
10+
to set HTTP status codes that should be interpreted as errors.
11+
812
## [0.3.0] - 2022-07-21
913

1014
- Package refactoring. Hide internal classes that do not have to be used by API users under the "internal" package.

README.md

Lines changed: 28 additions & 16 deletions
@@ -8,12 +8,9 @@ The HTTP TableLookup connector that allows for pulling data from external system
88
**Note**: The `main` branch may be in an *unstable or even broken state* during development.
99
Please use [releases](https://github.com/getindata/flink-http-connector/releases) instead of the `main` branch in order to get a stable set of binaries.
1010

11-
#### HTTP TableLookup Source
1211
The goal of the HTTP TableLookup connector is to be usable in a Flink SQL statement as a standard table that can later be joined with other streams using pure Flink SQL.
1312

1413
Currently, HTTP TableLookup connector supports only Lookup Joins [1] and expects JSON as a response body. It also supports only the STRING types.
15-
16-
#### HTTP Sink
1714
`HttpSink` supports both Streaming API (when using [HttpSink](src/main/java/com/getindata/connectors/http/internal/sink/HttpSink.java) built using [HttpSinkBuilder](src/main/java/com/getindata/connectors/http/internal/sink/HttpSinkBuilder.java)) and the Table API (using connector created in [HttpDynamicTableSinkFactory](src/main/java/com/getindata/connectors/http/internal/table/HttpDynamicTableSinkFactory.java)).
1815

1916
## Prerequisites
@@ -133,21 +130,19 @@ CREATE TABLE http (
133130
)
134131
```
135132

136-
## Implementation
137-
Implementation of an HTTP source connector is based on Flink's `TableFunction` and `AsyncTableFunction` classes.
138-
To be more specific we are using a `LookupTableSource`. Unfortunately Flink's new unified source interface [2] cannot be used for this type of source.
139-
Issue was discussed on Flink's user mailing list - https://lists.apache.org/thread/tx2w1m15zt5qnvt924mmbvr7s8rlyjmw
133+
#### HTTP status code handler (currently supported only for HTTP Sink)
134+
The HTTP Sink connector allows defining a list of HTTP status codes that should be treated as errors.
135+
By default, all 4XX and 5XX response codes are interpreted as errors.
140136

141-
Implementation of an HTTP Sink is based on Flink's `AsyncSinkBase` introduced in Flink 1.15 [3, 4].
137+
This behavior can be changed by using the properties below in the table definition (DDL) or by passing them via the
138+
`setProperty` method of the Sink's builder. The property names are:
139+
- `gid.connector.http.sink.error.code` defines the HTTP status code values that should be treated as errors, for example 404.
140+
Multiple status codes can be defined in one value, separated by commas, for example:
141+
`401, 402, 403`. This property can also define a type code mask, in which case all codes of the given HTTP response type are treated as errors.
142+
An example of such a mask is `3XX, 4XX, 5XX`. In this case, all 3XX, 4XX and 5XX status codes are treated as errors.
143+
- `gid.connector.http.sink.error.code.exclude` excludes HTTP status codes from the error list. Multiple status codes can be defined in one value, separated by commas, for example:
144+
`401, 402, 403`. In this example, codes 401, 402 and 403 would not be interpreted as error codes.
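
The semantics of the two properties can be sketched in plain Java. This is an illustrative re-implementation, not the connector's code: the class `ErrorCodeRules` and its method names are hypothetical, and the sketch assumes every mask has the form digit + `XX`.

```java
import java.util.HashSet;
import java.util.Locale;
import java.util.Set;

// Hypothetical helper, not part of the connector. It mirrors the documented behaviour of
// gid.connector.http.sink.error.code and gid.connector.http.sink.error.code.exclude.
final class ErrorCodeRules {

    private final Set<Integer> exactCodes = new HashSet<>(); // e.g. 404
    private final Set<Integer> typeDigits = new HashSet<>(); // e.g. 4 for "4XX"
    private final Set<Integer> excluded = new HashSet<>();   // never treated as errors

    ErrorCodeRules(String errorCodes, String excludedCodes) {
        for (String raw : errorCodes.split(",")) {
            String code = raw.trim().toUpperCase(Locale.ROOT);
            if (code.isEmpty()) {
                continue;
            }
            if (code.endsWith("XX")) {
                // Type mask: keep only the leading "hundreds" digit.
                typeDigits.add(Integer.parseInt(code.substring(0, 1)));
            } else {
                exactCodes.add(Integer.parseInt(code));
            }
        }
        for (String raw : excludedCodes.split(",")) {
            if (!raw.trim().isEmpty()) {
                excluded.add(Integer.parseInt(raw.trim()));
            }
        }
        // Documented default: all 4XX and 5XX codes are errors when nothing is configured.
        if (exactCodes.isEmpty() && typeDigits.isEmpty()) {
            typeDigits.add(4);
            typeDigits.add(5);
        }
    }

    boolean isError(int statusCode) {
        if (excluded.contains(statusCode)) {
            return false; // the exclude list wins over any matching code or mask
        }
        return exactCodes.contains(statusCode) || typeDigits.contains(statusCode / 100);
    }
}
```

With `new ErrorCodeRules("3XX, 4XX, 5XX", "401, 402, 403")`, codes 302 and 404 are classified as errors while 401 and 200 are not.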
142145

143-
## Http Response to Table schema mapping
144-
The mapping from Http Json Response to SQL table schema is done via Json Paths [5].
145-
This is achieved thanks to `com.jayway.jsonpath:json-path` library.
146-
147-
If no `root` or `field.#.path` option is defined, the connector will use the column name as json path and will try to look for Json Node with that name in received Json. If no node with a given name is found, the connector will return `null` as value for this field.
148-
149-
If the `field.#.path` option is defined, connector will use given Json path from option's value in order to find Json data that should be used for this column.
150-
For example `'field.isActive.path' = '$.details.isActive'` - the value for table column `isActive` will be taken from `$.details.isActive` node from received Json.
151146

152147
## Table API Connector Options
153148
### HTTP TableLookup Source
@@ -225,6 +220,23 @@ As a result, you should see a table with joined records like so:
225220

226221
The `msg` column shows parameters used with REST call for given JOIN record.
227222

223+
## Implementation
224+
### HTTP Source
225+
Implementation of an HTTP source connector is based on Flink's `TableFunction` and `AsyncTableFunction` classes.
226+
To be more specific we are using a `LookupTableSource`. Unfortunately Flink's new unified source interface [2] cannot be used for this type of source.
227+
Issue was discussed on Flink's user mailing list - https://lists.apache.org/thread/tx2w1m15zt5qnvt924mmbvr7s8rlyjmw
228+
229+
Implementation of an HTTP Sink is based on Flink's `AsyncSinkBase` introduced in Flink 1.15 [3, 4].
230+
231+
#### Http Response to Table schema mapping
232+
The mapping from Http Json Response to SQL table schema is done via Json Paths [5].
233+
This is achieved thanks to `com.jayway.jsonpath:json-path` library.
234+
235+
If no `root` or `field.#.path` option is defined, the connector will use the column name as json path and will try to look for Json Node with that name in received Json. If no node with a given name is found, the connector will return `null` as value for this field.
236+
237+
If the `field.#.path` option is defined, connector will use given Json path from option's value in order to find Json data that should be used for this column.
238+
For example `'field.isActive.path' = '$.details.isActive'` - the value for table column `isActive` will be taken from `$.details.isActive` node from received Json.
239+
228240
## TODO
229241

230242
### HTTP TableLookup Source

src/main/java/com/getindata/connectors/http/internal/config/HttpConnectorConfigConstants.java

Lines changed: 8 additions & 0 deletions
@@ -11,6 +11,8 @@
1111
@NoArgsConstructor(access = AccessLevel.NONE)
1212
public final class HttpConnectorConfigConstants {
1313

14+
public static final String ERROR_CODE_DELIM = ",";
15+
1416
/**
1517
* A property prefix for http connector.
1618
*/
@@ -21,4 +23,10 @@ public final class HttpConnectorConfigConstants {
2123
*/
2224
public static final String SINK_HEADER_PREFIX = GID_CONNECTOR_HTTP + "sink.header.";
2325

26+
// Error code handling configuration.
27+
public static final String HTTP_ERROR_CODE_WHITE_LIST =
28+
GID_CONNECTOR_HTTP + "sink.error.code.exclude";
29+
30+
public static final String HTTP_ERROR_CODES_LIST = GID_CONNECTOR_HTTP + "sink.error.code";
31+
2432
}

src/main/java/com/getindata/connectors/http/internal/sink/httpclient/JavaNetSinkHttpClient.java

Lines changed: 10 additions & 1 deletion
@@ -21,6 +21,8 @@
2121
import com.getindata.connectors.http.internal.SinkHttpClient;
2222
import com.getindata.connectors.http.internal.SinkHttpClientResponse;
2323
import com.getindata.connectors.http.internal.sink.HttpSinkRequestEntry;
24+
import com.getindata.connectors.http.internal.sink.httpclient.status.ComposeHttpStatusCodeChecker;
25+
import com.getindata.connectors.http.internal.sink.httpclient.status.HttpStatusCodeChecker;
2426
import com.getindata.connectors.http.internal.utils.ConfigUtils;
2527
import static com.getindata.connectors.http.internal.config.HttpConnectorConfigConstants.SINK_HEADER_PREFIX;
2628

@@ -35,6 +37,8 @@ public class JavaNetSinkHttpClient implements SinkHttpClient {
3537

3638
private final String[] headersAndValues;
3739

40+
private final HttpStatusCodeChecker statusCodeChecker;
41+
3842
public JavaNetSinkHttpClient(Properties properties) {
3943
this.httpClient = HttpClient.newBuilder()
4044
.followRedirects(HttpClient.Redirect.NORMAL)
@@ -44,6 +48,10 @@ public JavaNetSinkHttpClient(Properties properties) {
4448
ConfigUtils.propertiesToMap(properties, SINK_HEADER_PREFIX, String.class);
4549

4650
this.headersAndValues = ConfigUtils.toHeaderAndValueArray(headerMap);
51+
52+
// TODO Inject this via constructor when implementing a response processor.
53+
// Processor will be injected and it will wrap statusChecker implementation.
54+
this.statusCodeChecker = new ComposeHttpStatusCodeChecker(properties);
4755
}
4856

4957
@Override
@@ -99,8 +107,9 @@ private SinkHttpClientResponse prepareSinkHttpClientResponse(
99107

100108
for (var response : responses) {
101109
var sinkRequestEntry = response.getSinkRequestEntry();
110+
// TODO Add response processor here and orchestrate it with statusCodeChecker.
102111
if (response.getResponse().isEmpty()
103-
|| response.getResponse().get().statusCode() >= 400) {
112+
|| statusCodeChecker.isErrorCode(response.getResponse().get().statusCode())) {
104113
failedResponses.add(sinkRequestEntry);
105114
} else {
106115
successfulResponses.add(sinkRequestEntry);
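
The branch in this hunk treats a missing response as a failure and otherwise defers to the status checker. A minimal, self-contained sketch of that partitioning follows. `ResponsePartitioner` is a hypothetical name, request entries are simplified to strings, and the checker is modelled as an `IntPredicate` rather than the connector's `HttpStatusCodeChecker`.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Optional;
import java.util.function.IntPredicate;

// Hypothetical stand-in for the sink client's response handling; all names are illustrative.
final class ResponsePartitioner {

    final List<String> failed = new ArrayList<>();
    final List<String> successful = new ArrayList<>();

    // statusCodes.get(i) is empty when request i produced no response at all.
    ResponsePartitioner(
            List<String> requests,
            List<Optional<Integer>> statusCodes,
            IntPredicate isErrorCode) {
        for (int i = 0; i < requests.size(); i++) {
            Optional<Integer> status = statusCodes.get(i);
            // A missing response always counts as a failure; otherwise the checker decides.
            if (status.isEmpty() || isErrorCode.test(status.get())) {
                failed.add(requests.get(i));
            } else {
                successful.add(requests.get(i));
            }
        }
    }
}
```

Passing `code -> code >= 400` as the predicate reproduces the behaviour this commit replaces. A configurable checker slots into the same position.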
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,135 @@
1+
package com.getindata.connectors.http.internal.sink.httpclient.status;
2+
3+
import java.util.Arrays;
4+
import java.util.HashSet;
5+
import java.util.Properties;
6+
import java.util.Set;
7+
import java.util.stream.Collectors;
8+
9+
import org.apache.flink.util.Preconditions;
10+
import org.apache.flink.util.StringUtils;
11+
12+
import com.getindata.connectors.http.internal.config.HttpConnectorConfigConstants;
13+
14+
/**
15+
* An implementation of {@link HttpStatusCodeChecker} that checks Http Status code against
16+
* white list, concrete value or {@link HttpResponseCodeType}
17+
*/
18+
public class ComposeHttpStatusCodeChecker implements HttpStatusCodeChecker {
19+
20+
private static final Set<HttpStatusCodeChecker> DEFAULT_ERROR_CODES =
21+
Set.of(
22+
new TypeStatusCodeChecker(HttpResponseCodeType.CLIENT_ERROR),
23+
new TypeStatusCodeChecker(HttpResponseCodeType.SERVER_ERROR)
24+
);
25+
26+
private static final int MIN_HTTP_STATUS_CODE = 100;
27+
28+
/**
29+
* Set of {@link HttpStatusCodeChecker} for white listed status codes.
30+
*/
31+
private final Set<WhiteListHttpStatusCodeChecker> excludedCodes;
32+
33+
/**
34+
* Set of {@link HttpStatusCodeChecker} that check status code against value match or {@link
35+
* HttpResponseCodeType} match.
36+
*/
37+
private final Set<HttpStatusCodeChecker> errorCodes;
38+
39+
public ComposeHttpStatusCodeChecker(Properties properties) {
40+
excludedCodes = prepareWhiteList(properties);
41+
errorCodes = prepareErrorCodes(properties);
42+
}
43+
44+
/**
45+
* Checks whether given status code is considered as an error code.
46+
* This implementation checks if status code matches any single value mask like "404"
47+
* or http type mask such as "4XX". Code that matches one of those masks and is not on a
48+
* white list will be considered as error code.
49+
* @param statusCode http status code to assess.
50+
* @return true if status code is considered as error or false if not.
51+
*/
52+
public boolean isErrorCode(int statusCode) {
53+
54+
Preconditions.checkArgument(
55+
statusCode >= MIN_HTTP_STATUS_CODE,
56+
String.format(
57+
"Provided invalid Http status code %s,"
58+
+ " status code should be equal or bigger than %d.",
59+
statusCode,
60+
MIN_HTTP_STATUS_CODE)
61+
);
62+
63+
boolean isWhiteListed = excludedCodes.stream()
64+
.anyMatch(check -> check.isWhiteListed(statusCode));
65+
66+
return !isWhiteListed
67+
&& errorCodes.stream()
68+
.anyMatch(httpStatusCodeChecker -> httpStatusCodeChecker.isErrorCode(statusCode));
69+
}
70+
71+
private Set<HttpStatusCodeChecker> prepareErrorCodes(Properties properties) {
72+
String sErrorCodes =
73+
properties.getProperty(HttpConnectorConfigConstants.HTTP_ERROR_CODES_LIST, "");
74+
75+
if (StringUtils.isNullOrWhitespaceOnly(sErrorCodes)) {
76+
return DEFAULT_ERROR_CODES;
77+
} else {
78+
String[] splitCodes = sErrorCodes.split(HttpConnectorConfigConstants.ERROR_CODE_DELIM);
79+
return prepareErrorCodes(splitCodes);
80+
}
81+
}
82+
83+
/**
84+
* Process given array of status codes and assign them to
85+
* {@link SingleValueHttpStatusCodeChecker} for full codes such as 100, 404 etc. or to
86+
* {@link TypeStatusCodeChecker} for codes that were constructed with "XX" mask
87+
*/
88+
private Set<HttpStatusCodeChecker> prepareErrorCodes(String[] statusCodes) {
89+
90+
Set<HttpStatusCodeChecker> errorCodes = new HashSet<>();
91+
for (String sCode : statusCodes) {
92+
if (!StringUtils.isNullOrWhitespaceOnly(sCode)) {
93+
String trimCode = sCode.toUpperCase().trim();
94+
Preconditions.checkArgument(
95+
trimCode.length() == 3,
96+
"Status code should contain three characters. Provided [%s]",
97+
trimCode);
98+
99+
// At this point we have a trimmed, upper-case, three-character status code.
100+
if (isTypeCode(trimCode)) {
101+
int code = Integer.parseInt(trimCode.replace("X", ""));
102+
errorCodes.add(new TypeStatusCodeChecker(HttpResponseCodeType.getByCode(code)));
103+
} else {
104+
errorCodes.add(
105+
new SingleValueHttpStatusCodeChecker(Integer.parseInt(trimCode))
106+
);
107+
}
108+
}
109+
}
110+
return (errorCodes.isEmpty()) ? DEFAULT_ERROR_CODES : errorCodes;
111+
}
112+
113+
private Set<WhiteListHttpStatusCodeChecker> prepareWhiteList(Properties properties) {
114+
return Arrays.stream(
115+
properties.getProperty(HttpConnectorConfigConstants.HTTP_ERROR_CODE_WHITE_LIST, "")
116+
.split(HttpConnectorConfigConstants.ERROR_CODE_DELIM))
117+
.filter(sCode -> !StringUtils.isNullOrWhitespaceOnly(sCode))
118+
.map(String::trim)
119+
.mapToInt(Integer::parseInt)
120+
.mapToObj(WhiteListHttpStatusCodeChecker::new)
121+
.collect(Collectors.toSet());
122+
}
123+
124+
/**
125+
* This method checks if "code" param matches "digit + XX" mask. This method expects that
126+
* provided string will be 3 characters long, trimmed and upper case.
127+
*
128+
* @param code to check if it contains XX on second and third position. Parameter is expected to
129+
* be 3 characters long, trim and uppercase.
130+
* @return true if string matches "anything + XX" and false if not.
131+
*/
132+
private boolean isTypeCode(final String code) {
133+
return code.charAt(1) == 'X' && code.charAt(2) == 'X';
134+
}
135+
}
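
Two inputs pass the length check in `prepareErrorCodes` but are not valid masks. For `9XX`, `HttpResponseCodeType.getByCode(9)` returns `null`, because the enum only maps digits 1 to 5. For `XXX`, `Integer.parseInt("")` throws a `NumberFormatException`. How `TypeStatusCodeChecker` reacts to a `null` type is not visible in this diff. The sketch below shows one possible stricter validation. `TypeMaskParser` and `parseTypeDigit` are hypothetical names, not connector API.

```java
import java.util.Locale;
import java.util.OptionalInt;

// Hypothetical helper illustrating stricter mask validation; not part of the connector.
final class TypeMaskParser {

    // Returns the "hundreds" digit for masks such as "4XX", or empty when the value
    // is not a type mask or its leading digit is outside the 1 to 5 range.
    static OptionalInt parseTypeDigit(String rawCode) {
        String code = rawCode.trim().toUpperCase(Locale.ROOT);
        if (code.length() != 3 || code.charAt(1) != 'X' || code.charAt(2) != 'X') {
            return OptionalInt.empty();
        }
        char first = code.charAt(0);
        if (first < '1' || first > '5') {
            return OptionalInt.empty(); // rejects "9XX", "0XX" and "XXX"
        }
        return OptionalInt.of(first - '0');
    }
}
```

Returning an empty `OptionalInt` leaves it to the caller to either fail fast with a configuration error or ignore the value.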
Lines changed: 49 additions & 0 deletions
@@ -0,0 +1,49 @@
1+
package com.getindata.connectors.http.internal.sink.httpclient.status;
2+
3+
import java.util.HashMap;
4+
import java.util.Map;
5+
6+
/**
7+
* This enum represents HTTP response code types, grouped by "hundreds" digit.
8+
*/
9+
public enum HttpResponseCodeType {
10+
11+
INFO(1),
12+
SUCCESS(2),
13+
REDIRECTION(3),
14+
CLIENT_ERROR(4),
15+
SERVER_ERROR(5);
16+
17+
private static final Map<Integer, HttpResponseCodeType> map;
18+
19+
static {
20+
map = new HashMap<>();
21+
for (HttpResponseCodeType httpResponseCodeType : HttpResponseCodeType.values()) {
22+
map.put(httpResponseCodeType.httpTypeCode, httpResponseCodeType);
23+
}
24+
}
25+
26+
private final int httpTypeCode;
27+
28+
HttpResponseCodeType(int httpTypeCode) {
29+
this.httpTypeCode = httpTypeCode;
30+
}
31+
32+
/**
33+
* @param statusCode the "hundreds" digit of an Http status code (1 to 5) to get the {@link HttpResponseCodeType} instance for.
34+
* @return a {@link HttpResponseCodeType} instance based on http type code, for example {@code
35+
* HttpResponseCodeType.getByCode(1)} will return {@link HttpResponseCodeType#INFO} type.
36+
*/
37+
public static HttpResponseCodeType getByCode(int statusCode) {
38+
return map.get(statusCode);
39+
}
40+
41+
/**
42+
* @return a "hundreds" digit that represents given {@link HttpResponseCodeType} instance.
43+
* For example {@code HttpResponseCodeType.INFO.getHttpTypeCode()} will return 1 since HTTP
44+
* informational responses have status codes in range 100 - 199.
45+
*/
46+
public int getHttpTypeCode() {
47+
return this.httpTypeCode;
48+
}
49+
}
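
`getByCode` is keyed by the "hundreds" digit, so a caller holding a full status code such as 404 has to reduce it first. The sketch below does this with integer division. `ResponseType` is a hypothetical mirror of the enum above, and `ofStatusCode` is not a method of the connector.

```java
// Hypothetical mirror of HttpResponseCodeType, reduced to what the sketch needs.
enum ResponseType {
    INFO(1),
    SUCCESS(2),
    REDIRECTION(3),
    CLIENT_ERROR(4),
    SERVER_ERROR(5);

    private final int typeCode;

    ResponseType(int typeCode) {
        this.typeCode = typeCode;
    }

    // Resolves a full status code (for example 404) to its type,
    // or returns null when the code falls outside 100 to 599.
    static ResponseType ofStatusCode(int statusCode) {
        int hundreds = statusCode / 100; // integer division drops the last two digits
        for (ResponseType type : values()) {
            if (type.typeCode == hundreds) {
                return type;
            }
        }
        return null;
    }
}
```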
Lines changed: 16 additions & 0 deletions
@@ -0,0 +1,16 @@
1+
package com.getindata.connectors.http.internal.sink.httpclient.status;
2+
3+
/**
4+
* Base interface for all classes that would validate HTTP status
5+
* code whether it is an error or not.
6+
*/
7+
public interface HttpStatusCodeChecker {
8+
9+
/**
10+
* Validates whether an http status code is considered an error code. The logic for
11+
* what status codes are considered as "errors" depends on the concrete implementation.
12+
* @param statusCode http status code to assess.
13+
* @return true if statusCode is considered as Error and false if not.
14+
*/
15+
boolean isErrorCode(int statusCode);
16+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,28 @@
1+
package com.getindata.connectors.http.internal.sink.httpclient.status;
2+
3+
import lombok.EqualsAndHashCode;
4+
import lombok.RequiredArgsConstructor;
5+
6+
/**
7+
* An implementation of {@link HttpStatusCodeChecker} that validates status code against
8+
* constant value.
9+
*/
10+
@RequiredArgsConstructor
11+
@EqualsAndHashCode
12+
public class SingleValueHttpStatusCodeChecker implements HttpStatusCodeChecker {
13+
14+
/**
15+
* A reference http status code to compare with.
16+
*/
17+
private final int errorCode;
18+
19+
/**
20+
* Validates given statusCode against constant value.
21+
* @param statusCode http status code to assess.
22+
* @return true if status code is considered as error or false if not.
23+
*/
24+
@Override
25+
public boolean isErrorCode(int statusCode) {
26+
return errorCode == statusCode;
27+
}
28+
}
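
Because the class carries `@EqualsAndHashCode`, two checkers built for the same code compare equal. A value such as `404, 404, 500` therefore yields only two entries in the `HashSet` built by `prepareErrorCodes`. The sketch below reproduces that effect without Lombok. `SingleCodeChecker` and `fromCodes` are hypothetical names, and the hand-written `equals`/`hashCode` only approximate what Lombok generates.

```java
import java.util.HashSet;
import java.util.Set;

// Hypothetical Lombok-free variant of a single-value checker; names are illustrative.
final class SingleCodeChecker {

    private final int errorCode;

    SingleCodeChecker(int errorCode) {
        this.errorCode = errorCode;
    }

    boolean isErrorCode(int statusCode) {
        return errorCode == statusCode;
    }

    @Override
    public boolean equals(Object other) {
        return other instanceof SingleCodeChecker
            && ((SingleCodeChecker) other).errorCode == errorCode;
    }

    @Override
    public int hashCode() {
        return Integer.hashCode(errorCode);
    }

    // Builds a checker set from already-parsed codes; duplicates collapse into one entry.
    static Set<SingleCodeChecker> fromCodes(int... codes) {
        Set<SingleCodeChecker> checkers = new HashSet<>();
        for (int code : codes) {
            checkers.add(new SingleCodeChecker(code));
        }
        return checkers;
    }
}
```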
