Skip to content

Commit 9b94c92

Browse files
authored
HTTP-42 Add support for Batch request submission in HTTP Sink. (#58)
* HTTP-42-BatchRequest - add support for batch request processing in HTTP sink. Signed-off-by: Krzysztof Chmielewski <krzysiek.chmielewski@gmail.com> * HTTP-42-BatchRequest - add support for batch request processing in HTTP sink #2 Signed-off-by: Krzysztof Chmielewski <krzysiek.chmielewski@gmail.com> * HTTP-42-BatchRequest - add support for batch request processing in HTTP sink #3 Signed-off-by: Krzysztof Chmielewski <krzysiek.chmielewski@gmail.com> * HTTP-42-BatchRequest - add support for batch request processing in HTTP sink #4 Signed-off-by: Krzysztof Chmielewski <krzysiek.chmielewski@gmail.com> * HTTP-42-BatchRequest - add support for batch request processing in HTTP sink #5 tests Signed-off-by: Krzysztof Chmielewski <krzysiek.chmielewski@gmail.com> * HTTP-42-BatchRequest - add support for batch request processing in HTTP sink #6 tests Signed-off-by: Krzysztof Chmielewski <krzysiek.chmielewski@gmail.com> * HTTP-42-BatchRequest - add support for batch request processing in HTTP sink #7 tests Signed-off-by: Krzysztof Chmielewski <krzysiek.chmielewski@gmail.com> * HTTP-42-BatchRequest - add support for batch request processing in HTTP sink #8 tests Signed-off-by: Krzysztof Chmielewski <krzysiek.chmielewski@gmail.com> * HTTP-42-BatchRequest - add support for batch request processing in HTTP sink #9 tests Signed-off-by: Krzysztof Chmielewski <krzysiek.chmielewski@gmail.com> * HTTP-42-BatchRequest - add support for batch request processing in HTTP - doc. 
Signed-off-by: Krzysztof Chmielewski <krzysiek.chmielewski@gmail.com> * HTTP-42-BatchRequest - add support for batch request processing in HTTP sink #9 Java doc Signed-off-by: Krzysztof Chmielewski <krzysiek.chmielewski@gmail.com> * HTTP-42-BatchRequest - add support for batch request processing in HTTP sink #10 - changes after Code review - fix typos Signed-off-by: Krzysztof Chmielewski <krzysiek.chmielewski@gmail.com> * HTTP-42-BatchRequest - add support for batch request processing in HTTP sink #10 - changes after Code review - fix batch split Signed-off-by: Krzysztof Chmielewski <krzysiek.chmielewski@gmail.com> --------- Signed-off-by: Krzysztof Chmielewski <krzysiek.chmielewski@gmail.com>
1 parent fd01857 commit 9b94c92

39 files changed

+1645
-208
lines changed

CHANGELOG.md

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,19 @@
22

33
## [Unreleased]
44

5+
### Added
6+
- Add support for batch request submission in HTTP sink. The mode can be changed by setting
7+
`gid.connector.http.sink.writer.request.mode` with value `single` or `batch`. The default value
8+
is `batch` mode, which is a breaking change compared to previous versions. Additionally,
9+
`gid.connector.http.sink.request.batch.size` option can be used to set batch size. By default,
10+
batch size is 500, which is the same as the default value of the HttpSink `maxBatchSize` parameter.
11+
12+
### Changed
13+
- Changed API for public HttpSink builder. The `setHttpPostRequestCallback` method now expects an `HttpPostRequestCallback`
14+
of generic type [HttpRequest](src/main/java/com/getindata/connectors/http/internal/sink/httpclient/HttpRequest.java)
15+
instead of `HttpSinkRequestEntry`.
16+
- Changed HTTP sink request and response processing thread pool sizes from 16 to 1.
17+
518
## [0.9.0] - 2023-02-10
619

720
- Add support for Flink 1.16.

README.md

Lines changed: 107 additions & 20 deletions
Large diffs are not rendered by default.

src/main/java/com/getindata/connectors/http/HttpSink.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@
99
import com.getindata.connectors.http.internal.SinkHttpClientBuilder;
1010
import com.getindata.connectors.http.internal.sink.HttpSinkInternal;
1111
import com.getindata.connectors.http.internal.sink.HttpSinkRequestEntry;
12+
import com.getindata.connectors.http.internal.sink.httpclient.HttpRequest;
1213

1314
/**
1415
* A public implementation for {@code HttpSink} that performs async requests against a specified
@@ -41,7 +42,7 @@ public class HttpSink<InputT> extends HttpSinkInternal<InputT> {
4142
long maxTimeInBufferMS,
4243
long maxRecordSizeInBytes,
4344
String endpointUrl,
44-
HttpPostRequestCallback<HttpSinkRequestEntry> httpPostRequestCallback,
45+
HttpPostRequestCallback<HttpRequest> httpPostRequestCallback,
4546
HeaderPreprocessor headerPreprocessor,
4647
SinkHttpClientBuilder sinkHttpClientBuilder,
4748
Properties properties) {

src/main/java/com/getindata/connectors/http/HttpSinkBuilder.java

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,7 @@
1111
import com.getindata.connectors.http.internal.SinkHttpClient;
1212
import com.getindata.connectors.http.internal.SinkHttpClientBuilder;
1313
import com.getindata.connectors.http.internal.sink.HttpSinkRequestEntry;
14+
import com.getindata.connectors.http.internal.sink.httpclient.HttpRequest;
1415
import com.getindata.connectors.http.internal.sink.httpclient.JavaNetSinkHttpClient;
1516
import com.getindata.connectors.http.internal.table.sink.Slf4jHttpPostRequestCallback;
1617
import com.getindata.connectors.http.internal.utils.HttpHeaderUtils;
@@ -62,7 +63,7 @@ public class HttpSinkBuilder<InputT> extends
6263

6364
private static final SinkHttpClientBuilder DEFAULT_CLIENT_BUILDER = JavaNetSinkHttpClient::new;
6465

65-
private static final HttpPostRequestCallback<HttpSinkRequestEntry>
66+
private static final HttpPostRequestCallback<HttpRequest>
6667
DEFAULT_POST_REQUEST_CALLBACK = new Slf4jHttpPostRequestCallback();
6768

6869
private static final HeaderPreprocessor DEFAULT_HEADER_PREPROCESSOR =
@@ -80,7 +81,7 @@ public class HttpSinkBuilder<InputT> extends
8081
private SinkHttpClientBuilder sinkHttpClientBuilder;
8182

8283
// If not defined, should be set to DEFAULT_POST_REQUEST_CALLBACK
83-
private HttpPostRequestCallback<HttpSinkRequestEntry> httpPostRequestCallback;
84+
private HttpPostRequestCallback<HttpRequest> httpPostRequestCallback;
8485

8586
// If not defined, should be set to DEFAULT_HEADER_PREPROCESSOR
8687
private HeaderPreprocessor headerPreprocessor;
@@ -138,7 +139,7 @@ public HttpSinkBuilder<InputT> setElementConverter(
138139
}
139140

140141
public HttpSinkBuilder<InputT> setHttpPostRequestCallback(
141-
HttpPostRequestCallback<HttpSinkRequestEntry> httpPostRequestCallback) {
142+
HttpPostRequestCallback<HttpRequest> httpPostRequestCallback) {
142143
this.httpPostRequestCallback = httpPostRequestCallback;
143144
return this;
144145
}

src/main/java/com/getindata/connectors/http/internal/SinkHttpClientBuilder.java

Lines changed: 7 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -6,19 +6,22 @@
66
import org.apache.flink.annotation.PublicEvolving;
77

88
import com.getindata.connectors.http.HttpPostRequestCallback;
9-
import com.getindata.connectors.http.internal.sink.HttpSinkRequestEntry;
9+
import com.getindata.connectors.http.internal.sink.httpclient.HttpRequest;
10+
import com.getindata.connectors.http.internal.sink.httpclient.RequestSubmitterFactory;
1011

1112
/**
1213
* Builder building {@link SinkHttpClient}.
1314
*/
1415
@PublicEvolving
1516
public interface SinkHttpClientBuilder extends Serializable {
1617

17-
// TODO Consider moving HttpPostRequestCallback and HeaderPreprocessor to be a
18+
// TODO Consider moving HttpPostRequestCallback and HeaderPreprocessor, RequestSubmitter to be a
1819
// SinkHttpClientBuilder fields. This method is getting more and more arguments.
1920
SinkHttpClient build(
2021
Properties properties,
21-
HttpPostRequestCallback<HttpSinkRequestEntry> httpPostRequestCallback,
22-
HeaderPreprocessor headerPreprocessor
22+
HttpPostRequestCallback<HttpRequest> httpPostRequestCallback,
23+
HeaderPreprocessor headerPreprocessor,
24+
RequestSubmitterFactory requestSubmitterFactory
25+
2326
);
2427
}

src/main/java/com/getindata/connectors/http/internal/SinkHttpClientResponse.java

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@
77
import lombok.ToString;
88

99
import com.getindata.connectors.http.internal.sink.HttpSinkRequestEntry;
10+
import com.getindata.connectors.http.internal.sink.httpclient.HttpRequest;
1011

1112
/**
1213
* Data class holding {@link HttpSinkRequestEntry} instances that {@link SinkHttpClient} attempted
@@ -20,11 +21,11 @@ public class SinkHttpClientResponse {
2021
* A list of successfully written requests.
2122
*/
2223
@NonNull
23-
private final List<HttpSinkRequestEntry> successfulRequests;
24+
private final List<HttpRequest> successfulRequests;
2425

2526
/**
2627
* A list of requests that {@link SinkHttpClient} failed to write.
2728
*/
2829
@NonNull
29-
private final List<HttpSinkRequestEntry> failedRequests;
30+
private final List<HttpRequest> failedRequests;
3031
}

src/main/java/com/getindata/connectors/http/internal/config/HttpConnectorConfigConstants.java

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@
99
*/
1010
@UtilityClass
1111
@NoArgsConstructor(access = AccessLevel.NONE)
12+
// TODO Change this name to HttpConnectorConfigProperties
1213
public final class HttpConnectorConfigConstants {
1314

1415
public static final String PROP_DELIM = ",";
@@ -84,4 +85,14 @@ public final class HttpConnectorConfigConstants {
8485
GID_CONNECTOR_HTTP + "sink.writer.thread-pool.size";
8586

8687
// -----------------------------------------------------
88+
89+
90+
// ------ Sink request submitter settings ------
91+
public static final String SINK_HTTP_REQUEST_MODE =
92+
GID_CONNECTOR_HTTP + "sink.writer.request.mode";
93+
94+
public static final String SINK_HTTP_BATCH_REQUEST_SIZE =
95+
GID_CONNECTOR_HTTP + "sink.request.batch.size";
96+
97+
// ---------------------------------------------
8798
}
Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,17 @@
1+
package com.getindata.connectors.http.internal.config;
2+
3+
/**
 * Request submission modes of the HTTP sink.
 *
 * <p>Every constant carries the lower-case text form of the mode, available through
 * {@link #getMode()}.
 */
public enum SinkRequestSubmitMode {

    /** Mode whose text form is {@code "single"}. */
    SINGLE("single"),

    /** Mode whose text form is {@code "batch"}. */
    BATCH("batch");

    /** Text form of this mode. */
    private final String mode;

    /**
     * Binds a constant to its text form.
     *
     * @param modeName text form of the mode.
     */
    SinkRequestSubmitMode(String modeName) {
        this.mode = modeName;
    }

    /**
     * Returns the text form of this mode.
     *
     * @return the value passed to the constant's constructor.
     */
    public String getMode() {
        return this.mode;
    }
}

src/main/java/com/getindata/connectors/http/internal/sink/HttpSinkInternal.java

Lines changed: 30 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,12 @@
1616
import com.getindata.connectors.http.SchemaLifecycleAwareElementConverter;
1717
import com.getindata.connectors.http.internal.HeaderPreprocessor;
1818
import com.getindata.connectors.http.internal.SinkHttpClientBuilder;
19+
import com.getindata.connectors.http.internal.config.HttpConnectorConfigConstants;
20+
import com.getindata.connectors.http.internal.config.SinkRequestSubmitMode;
21+
import com.getindata.connectors.http.internal.sink.httpclient.BatchRequestSubmitterFactory;
22+
import com.getindata.connectors.http.internal.sink.httpclient.HttpRequest;
23+
import com.getindata.connectors.http.internal.sink.httpclient.PerRequestRequestSubmitterFactory;
24+
import com.getindata.connectors.http.internal.sink.httpclient.RequestSubmitterFactory;
1925

2026
/**
2127
* An internal implementation of HTTP Sink that performs async requests against a specified HTTP
@@ -59,7 +65,7 @@ public class HttpSinkInternal<InputT> extends AsyncSinkBase<InputT, HttpSinkRequ
5965
// makes it possible to serialize `HttpSink`
6066
private final SinkHttpClientBuilder sinkHttpClientBuilder;
6167

62-
private final HttpPostRequestCallback<HttpSinkRequestEntry> httpPostRequestCallback;
68+
private final HttpPostRequestCallback<HttpRequest> httpPostRequestCallback;
6369

6470
private final HeaderPreprocessor headerPreprocessor;
6571

@@ -74,7 +80,7 @@ protected HttpSinkInternal(
7480
long maxTimeInBufferMS,
7581
long maxRecordSizeInBytes,
7682
String endpointUrl,
77-
HttpPostRequestCallback<HttpSinkRequestEntry> httpPostRequestCallback,
83+
HttpPostRequestCallback<HttpRequest> httpPostRequestCallback,
7884
HeaderPreprocessor headerPreprocessor,
7985
SinkHttpClientBuilder sinkHttpClientBuilder,
8086
Properties properties) {
@@ -127,7 +133,12 @@ public StatefulSinkWriter<InputT, BufferedRequestState<HttpSinkRequestEntry>> cr
127133
getMaxTimeInBufferMS(),
128134
getMaxRecordSizeInBytes(),
129135
endpointUrl,
130-
sinkHttpClientBuilder.build(properties, httpPostRequestCallback, headerPreprocessor),
136+
sinkHttpClientBuilder.build(
137+
properties,
138+
httpPostRequestCallback,
139+
headerPreprocessor,
140+
getRequestSubmitterFactory()
141+
),
131142
Collections.emptyList(),
132143
properties
133144
);
@@ -149,15 +160,29 @@ public StatefulSinkWriter<InputT, BufferedRequestState<HttpSinkRequestEntry>> re
149160
getMaxTimeInBufferMS(),
150161
getMaxRecordSizeInBytes(),
151162
endpointUrl,
152-
sinkHttpClientBuilder.build(properties, httpPostRequestCallback, headerPreprocessor),
163+
sinkHttpClientBuilder.build(
164+
properties,
165+
httpPostRequestCallback,
166+
headerPreprocessor,
167+
getRequestSubmitterFactory()
168+
),
153169
recoveredState,
154170
properties
155171
);
156172
}
157173

158174
@Override
159175
public SimpleVersionedSerializer<BufferedRequestState<HttpSinkRequestEntry>>
160-
getWriterStateSerializer() {
176+
getWriterStateSerializer() {
161177
return new HttpSinkWriterStateSerializer();
162178
}
179+
180+
private RequestSubmitterFactory getRequestSubmitterFactory() {
181+
182+
if (SinkRequestSubmitMode.SINGLE.getMode().equalsIgnoreCase(
183+
properties.getProperty(HttpConnectorConfigConstants.SINK_HTTP_REQUEST_MODE))) {
184+
return new PerRequestRequestSubmitterFactory();
185+
}
186+
return new BatchRequestSubmitterFactory(getMaxBatchSize());
187+
}
163188
}
Lines changed: 49 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,49 @@
1+
package com.getindata.connectors.http.internal.sink.httpclient;
2+
3+
import java.net.http.HttpClient;
4+
import java.util.Properties;
5+
import java.util.concurrent.ExecutorService;
6+
import java.util.concurrent.Executors;
7+
8+
import org.apache.flink.util.concurrent.ExecutorThreadFactory;
9+
10+
import com.getindata.connectors.http.internal.config.HttpConnectorConfigConstants;
11+
import com.getindata.connectors.http.internal.utils.ThreadUtils;
12+
13+
public abstract class AbstractRequestSubmitter implements RequestSubmitter {
14+
15+
protected static final int HTTP_CLIENT_PUBLISHING_THREAD_POOL_SIZE = 1;
16+
17+
protected static final String DEFAULT_REQUEST_TIMEOUT_SECONDS = "30";
18+
19+
/**
20+
* Thread pool to handle HTTP response from HTTP client.
21+
*/
22+
protected final ExecutorService publishingThreadPool;
23+
24+
protected final int httpRequestTimeOutSeconds;
25+
26+
protected final String[] headersAndValues;
27+
28+
protected final HttpClient httpClient;
29+
30+
public AbstractRequestSubmitter(
31+
Properties properties,
32+
String[] headersAndValues,
33+
HttpClient httpClient) {
34+
35+
this.headersAndValues = headersAndValues;
36+
this.publishingThreadPool =
37+
Executors.newFixedThreadPool(
38+
HTTP_CLIENT_PUBLISHING_THREAD_POOL_SIZE,
39+
new ExecutorThreadFactory(
40+
"http-sink-client-response-worker", ThreadUtils.LOGGING_EXCEPTION_HANDLER));
41+
42+
this.httpRequestTimeOutSeconds = Integer.parseInt(
43+
properties.getProperty(HttpConnectorConfigConstants.SINK_HTTP_TIMEOUT_SECONDS,
44+
DEFAULT_REQUEST_TIMEOUT_SECONDS)
45+
);
46+
47+
this.httpClient = httpClient;
48+
}
49+
}

0 commit comments

Comments
 (0)