Skip to content

Commit e39a51d

Browse files
authored
Introduce custom HTTP Sink Post Request Callback and its Factory (#23)
* Introduce custom HTTP Sink Post Request Callback * Address review comments: remove 'Sink' from Callback class name * Address review comments: make Request type generic * Address review comments: accept null responses in callback * Address review comments: set default callback in HttpSink
1 parent 938323b commit e39a51d

22 files changed

+458
-55
lines changed

CHANGELOG.md

Lines changed: 10 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -2,15 +2,17 @@
22

33
## [Unreleased]
44

5-
### Fixed
6-
- Fix JavaDoc errors.
7-
85
### Added
96
- Add new properties `gid.connector.http.sink.error.code`,`gid.connector.http.sink.error.code.exclude`,
107
`gid.connector.http.source.lookup.error.code` and `gid.connector.http.source.lookup.error.code.exclude`
118
to set HTTP status codes that should be interpreted as errors both for HTTP Sink and HTTP Lookup Source.
12-
- Use Flink's format support to Http Lookup Source.
9+
- Use Flink's format support to Http Lookup Source.
1310
- Add HTTP Lookup source client header configuration via properties.
11+
- Add [HttpPostRequestCallback](src/main/java/com/getindata/connectors/http/HttpPostRequestCallback.java) and
12+
[HttpPostRequestCallbackFactory](src/main/java/com/getindata/connectors/http/HttpPostRequestCallbackFactory.java)
13+
interfaces (along with a "default"
14+
[Slf4jHttpPostRequestCallback](src/main/java/com/getindata/connectors/http/internal/table/sink/Slf4jHttpPostRequestCallback.java)
15+
implementation) for customizable processing of HTTP Sink requests and responses in Table API.
1416

1517
### Changed
1618
- Change dependency scope for `org.apache.flink.flink-connector-base` from `compile` to `provided`.
@@ -20,6 +22,10 @@
2022
- Remove dependency on `org.apache.httpcomponents.httpclient`from production code. Dependency is only for test scope.
2123
- Removed dependency on `com.jayway.jsonpath.json-path`
2224

25+
### Fixed
26+
27+
- Fix JavaDoc errors.
28+
2329
## [0.3.0] - 2022-07-21
2430

2531
- Package refactoring. Hide internal classes that does not have to be used by API users under "internal" package.

README.md

Lines changed: 35 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -184,6 +184,19 @@ CREATE TABLE http (
184184
)
185185
```
186186

187+
#### Custom request/response callback
188+
Http Sink processes responses that it gets from the HTTP endpoint along their respective requests. One can customize the
189+
behaviour of the additional stage of processing done by Table API Sink by implementing
190+
[HttpPostRequestCallback](src/main/java/com/getindata/connectors/http/HttpPostRequestCallback.java) and
191+
[HttpPostRequestCallbackFactory](src/main/java/com/getindata/connectors/http/HttpPostRequestCallbackFactory.java)
192+
interfaces. Custom implementations of `HttpSinkRequestCallbackFactory` can be registered along other factories in
193+
`resources/META-INF.services/org.apache.flink.table.factories.Factory` file and then referenced by their identifiers in
194+
the HttpSink DDL property field `gid.connector.http.sink.request-callback`.
195+
196+
A default implementation that logs those pairs as *INFO* level logs using Slf4j
197+
([Slf4jHttpPostRequestCallback](src/main/java/com/getindata/connectors/http/internal/table/sink/Slf4jHttpPostRequestCallback.java))
198+
is provided.
199+
187200
## HTTP status code handler
188201
Http Sink and Lookup Source connectors allow defining list of HTTP status codes that should be treated as errors.
189202
By default all 400s and 500s response codes will be interpreted as error code.
@@ -201,25 +214,30 @@ An example of such a mask would be `3XX, 4XX, 5XX`. In this case, all 300s, 400s
201214

202215
## Table API Connector Options
203216
### HTTP TableLookup Source
204-
| Option | Required | Description/Value |
205-
|--------------|----------|----------------------------------------------------------------------------------------------------------|
206-
| connector | required | The Value should be set to _rest-lookup_ |
207-
| format | required | Flink's format name that should be used to decode REST response, Use `json` for a typical REST endpoint. |
208-
| url | required | The base URL that should be use for GET requests. For example _http://localhost:8080/client_ |
209-
| asyncPolling | optional | true/false - determines whether Async Pooling should be used. Mechanism is based on Flink's Async I/O. |
217+
| Option | Required | Description/Value |
218+
|----------------------------------------------|----------|-------------------------------------------------------------------------------------------------------------------------------|
219+
| connector | required | The Value should be set to _rest-lookup_ |
220+
| format | required | Flink's format name that should be used to decode REST response, Use `json` for a typical REST endpoint. |
221+
| url | required | The base URL that should be use for GET requests. For example _http://localhost:8080/client_ |
222+
| asyncPolling | optional | true/false - determines whether Async Pooling should be used. Mechanism is based on Flink's Async I/O. |
223+
| gid.connector.http.lookup.error.code | optional | List of HTTP status codes that should be treated as errors by HTTP Source, separated with comma. |
224+
| gid.connector.http.lookup.error.code.exclude | optional | List of HTTP status codes that should be excluded from the `gid.connector.http.lookup.error.code` list, separated with comma. |
210225

211226
### HTTP Sink
212-
| Option | Required | Description/Value |
213-
|----------------------------|----------|----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------|
214-
| connector | required | Specify what connector to use. For HTTP Sink it should be set to _'http-sink'_. |
215-
| url | required | The base URL that should be use for HTTP requests. For example _http://localhost:8080/client_. |
216-
| format | required | Specify what format to use. |
217-
| insert-method | optional | Specify which HTTP method to use in the request. The value should be set either to `POST` or `PUT`. |
218-
| sink.batch.max-size | optional | Maximum number of elements that may be passed in a batch to be written downstream. |
219-
| sink.requests.max-inflight | optional | The maximum number of in flight requests that may exist, if any more in flight requests need to be initiated once the maximum has been reached, then it will be blocked until some have completed. |
220-
| sink.requests.max-buffered | optional | Maximum number of buffered records before applying backpressure. |
221-
| sink.flush-buffer.size | optional | The maximum size of a batch of entries that may be sent to the HTTP endpoint measured in bytes. |
222-
| sink.flush-buffer.timeout | optional | Threshold time in milliseconds for an element to be in a buffer before being flushed. |
227+
| Option | Required | Description/Value |
228+
|--------------------------------------------|----------|----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------|
229+
| connector | required | Specify what connector to use. For HTTP Sink it should be set to _'http-sink'_. |
230+
| url | required | The base URL that should be use for HTTP requests. For example _http://localhost:8080/client_. |
231+
| format | required | Specify what format to use. |
232+
| insert-method | optional | Specify which HTTP method to use in the request. The value should be set either to `POST` or `PUT`. |
233+
| sink.batch.max-size | optional | Maximum number of elements that may be passed in a batch to be written downstream. |
234+
| sink.requests.max-inflight | optional | The maximum number of in flight requests that may exist, if any more in flight requests need to be initiated once the maximum has been reached, then it will be blocked until some have completed. |
235+
| sink.requests.max-buffered | optional | Maximum number of buffered records before applying backpressure. |
236+
| sink.flush-buffer.size | optional | The maximum size of a batch of entries that may be sent to the HTTP endpoint measured in bytes. |
237+
| sink.flush-buffer.timeout | optional | Threshold time in milliseconds for an element to be in a buffer before being flushed. |
238+
| gid.connector.http.sink.request-callback | optional | Specify which `HttpPostRequestCallback` implementation to use. By default, it is set to `slf4j-logger` corresponding to `Slf4jHttpPostRequestCallback`. |
239+
| gid.connector.http.sink.error.code | optional | List of HTTP status codes that should be treated as errors by HTTP Sink, separated with comma. |
240+
| gid.connector.http.sink.error.code.exclude | optional | List of HTTP status codes that should be excluded from the `gid.connector.http.sink.error.code` list, separated with comma. |
223241

224242
## Build and deployment
225243
To build the project locally you need to have `maven 3` and Java 11+. </br>

pom.xml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -325,6 +325,7 @@ under the License.
325325
<exclude>**/HttpLookupConnectorOptionsUtil.class</exclude>
326326
<exclude>**/HttpTableSourceFactoryHelper.class</exclude>
327327
<exclude>**/HttpPollTableSource.class</exclude>
328+
<exclude>**/Slf4jHttpPostRequestCallback.class</exclude>
328329
</excludes>
329330
</configuration>
330331
<executions>
Lines changed: 29 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,29 @@
1+
package com.getindata.connectors.http;
2+
3+
import java.io.Serializable;
4+
import java.net.http.HttpResponse;
5+
import java.util.Map;
6+
7+
/**
8+
* An interface for post request callback action, processing a response and its respective request.
9+
*
10+
* <p>One can customize the behaviour of such a callback by implementing both
11+
* {@link HttpPostRequestCallback} and {@link HttpPostRequestCallbackFactory}.
12+
*
13+
* @param <RequestT> type of the HTTP request wrapper
14+
*/
15+
public interface HttpPostRequestCallback<RequestT> extends Serializable {
16+
/**
17+
* Process HTTP request and the matching response.
18+
* @param response HTTP response
19+
* @param requestEntry request's payload
20+
* @param endpointUrl the URL of the endpoint
21+
* @param headerMap mapping of header names to header values
22+
*/
23+
void call(
24+
HttpResponse<String> response,
25+
RequestT requestEntry,
26+
String endpointUrl,
27+
Map<String, String> headerMap
28+
);
29+
}
Lines changed: 40 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,40 @@
1+
package com.getindata.connectors.http;
2+
3+
import org.apache.flink.table.factories.Factory;
4+
5+
import com.getindata.connectors.http.internal.table.sink.HttpDynamicSink;
6+
7+
/**
8+
* The {@link Factory} that dynamically creates and injects {@link HttpPostRequestCallback} to
9+
* {@link HttpDynamicSink}.
10+
*
11+
* <p>Custom implementations of {@link HttpPostRequestCallbackFactory} can be registered along
12+
* other factories in
13+
* <pre>resources/META-INF.services/org.apache.flink.table.factories.Factory</pre>
14+
* file and then referenced by their identifiers in the HttpSink DDL property field
15+
* <i>gid.connector.http.sink.request-callback</i>.
16+
*
17+
* <p>The following example shows the minimum Table API example to create a {@link HttpDynamicSink}
18+
* that uses a custom callback created by a factory that returns <i>my-callback</i> as its
19+
* identifier.
20+
*
21+
* <pre>{@code
22+
* CREATE TABLE http (
23+
* id bigint,
24+
* some_field string
25+
* ) with (
26+
* 'connector' = 'http-sink'
27+
* 'url' = 'http://example.com/myendpoint'
28+
* 'format' = 'json',
29+
* 'gid.connector.http.sink.request-callback' = 'my-callback'
30+
* )
31+
* }</pre>
32+
*
33+
* @param <RequestT> type of the HTTP request wrapper
34+
*/
35+
public interface HttpPostRequestCallbackFactory<RequestT> extends Factory {
36+
/**
37+
* @return {@link HttpPostRequestCallback} custom request callback instance
38+
*/
39+
HttpPostRequestCallback<RequestT> createHttpPostRequestCallback();
40+
}

src/main/java/com/getindata/connectors/http/HttpSink.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -40,6 +40,7 @@ public class HttpSink<InputT> extends HttpSinkInternal<InputT> {
4040
long maxTimeInBufferMS,
4141
long maxRecordSizeInBytes,
4242
String endpointUrl,
43+
HttpPostRequestCallback<HttpSinkRequestEntry> httpPostRequestCallback,
4344
SinkHttpClientBuilder sinkHttpClientBuilder,
4445
Properties properties) {
4546
super(elementConverter,
@@ -50,6 +51,7 @@ public class HttpSink<InputT> extends HttpSinkInternal<InputT> {
5051
maxTimeInBufferMS,
5152
maxRecordSizeInBytes,
5253
endpointUrl,
54+
httpPostRequestCallback,
5355
sinkHttpClientBuilder,
5456
properties
5557
);

src/main/java/com/getindata/connectors/http/HttpSinkBuilder.java

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,7 @@
1111
import com.getindata.connectors.http.internal.SinkHttpClientBuilder;
1212
import com.getindata.connectors.http.internal.sink.HttpSinkRequestEntry;
1313
import com.getindata.connectors.http.internal.sink.httpclient.JavaNetSinkHttpClient;
14+
import com.getindata.connectors.http.internal.table.sink.Slf4jHttpPostRequestCallback;
1415

1516
/**
1617
* Builder to construct {@link HttpSink}.
@@ -59,6 +60,9 @@ public class HttpSinkBuilder<InputT> extends
5960

6061
private static final SinkHttpClientBuilder DEFAULT_CLIENT_BUILDER = JavaNetSinkHttpClient::new;
6162

63+
private static final HttpPostRequestCallback<HttpSinkRequestEntry>
64+
DEFAULT_POST_REQUEST_CALLBACK = new Slf4jHttpPostRequestCallback();
65+
6266
private final Properties properties = new Properties();
6367

6468
// Mandatory field
@@ -70,8 +74,12 @@ public class HttpSinkBuilder<InputT> extends
7074
// If not defined, should be set to DEFAULT_CLIENT_BUILDER
7175
private SinkHttpClientBuilder sinkHttpClientBuilder;
7276

77+
// If not defined, should be set to DEFAULT_POST_REQUEST_CALLBACK
78+
private HttpPostRequestCallback<HttpSinkRequestEntry> httpPostRequestCallback;
79+
7380
HttpSinkBuilder() {
7481
this.sinkHttpClientBuilder = DEFAULT_CLIENT_BUILDER;
82+
this.httpPostRequestCallback = DEFAULT_POST_REQUEST_CALLBACK;
7583
}
7684

7785
/**
@@ -105,6 +113,12 @@ public HttpSinkBuilder<InputT> setElementConverter(
105113
return this;
106114
}
107115

116+
public HttpSinkBuilder<InputT> setHttpPostRequestCallback(
117+
HttpPostRequestCallback<HttpSinkRequestEntry> httpPostRequestCallback) {
118+
this.httpPostRequestCallback = httpPostRequestCallback;
119+
return this;
120+
}
121+
108122
/**
109123
* Set property for Http Sink.
110124
* @param propertyName property name
@@ -137,6 +151,7 @@ public HttpSink<InputT> build() {
137151
Optional.ofNullable(getMaxTimeInBufferMS()).orElse(DEFAULT_MAX_TIME_IN_BUFFER_MS),
138152
Optional.ofNullable(getMaxRecordSizeInBytes()).orElse(DEFAULT_MAX_RECORD_SIZE_IN_B),
139153
endpointUrl,
154+
httpPostRequestCallback,
140155
sinkHttpClientBuilder,
141156
properties
142157
);

src/main/java/com/getindata/connectors/http/internal/SinkHttpClientBuilder.java

Lines changed: 7 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -5,11 +5,16 @@
55

66
import org.apache.flink.annotation.PublicEvolving;
77

8+
import com.getindata.connectors.http.HttpPostRequestCallback;
9+
import com.getindata.connectors.http.internal.sink.HttpSinkRequestEntry;
10+
811
/**
912
* Builder building {@link SinkHttpClient}.
1013
*/
1114
@PublicEvolving
1215
public interface SinkHttpClientBuilder extends Serializable {
13-
14-
SinkHttpClient build(Properties properties);
16+
SinkHttpClient build(
17+
Properties properties,
18+
HttpPostRequestCallback<HttpSinkRequestEntry> httpPostRequestCallback
19+
);
1520
}

src/main/java/com/getindata/connectors/http/internal/config/HttpConnectorConfigConstants.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -37,7 +37,8 @@ public final class HttpConnectorConfigConstants {
3737

3838
public static final String HTTP_ERROR_SOURCE_LOOKUP_CODES_LIST =
3939
GID_CONNECTOR_HTTP + "source.lookup.error.code";
40-
4140
// -----------------------------------------------------
4241

42+
public static final String SINK_REQUEST_CALLBACK_IDENTIFIER =
43+
GID_CONNECTOR_HTTP + "sink.request-callback";
4344
}

0 commit comments

Comments
 (0)