Skip to content

Commit 0a94cee

Browse files
authored
Merge pull request #39 from getindata/HTTP-38_HttpTimeout
[HTTP-38] - Fix handling of HTTP timeouts, where the event task was not removed from the internal queue of Flink's AsyncIO operator.
2 parents 97f614b + 3e074a6 commit 0a94cee

25 files changed

+554
-107
lines changed

CHANGELOG.md

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,13 @@
22

33
## [Unreleased]
44

5+
### Added
6+
- Add new parameters for HTTP timeout configuration and thread pool size for Sink and Lookup source HTTP requests.
7+
8+
### Fixed
9+
- Fix Flink's internal task queue for AsyncIO requests not being cleaned up after an HTTP timeout in
10+
Lookup source - https://github.com/getindata/flink-http-connector/issues/38
11+
512
## [0.7.0] - 2022-10-27
613

714
- Add to Lookup Source support for performing lookup on columns with complex types such as ROW, Map etc.

README.md

Lines changed: 48 additions & 32 deletions
Large diffs are not rendered by default.

src/main/java/com/getindata/connectors/http/internal/config/HttpConnectorConfigConstants.java

Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -65,4 +65,23 @@ public final class HttpConnectorConfigConstants {
6565
+ "security.keystore.type";
6666

6767
// -----------------------------------------------------
68+
69+
// ------- HTTP timeouts and thread pool settings ------
70+
71+
public static final String LOOKUP_HTTP_TIMEOUT_SECONDS =
72+
GID_CONNECTOR_HTTP + "source.lookup.request.timeout";
73+
74+
public static final String SINK_HTTP_TIMEOUT_SECONDS =
75+
GID_CONNECTOR_HTTP + "sink.request.timeout";
76+
77+
public static final String LOOKUP_HTTP_PULING_THREAD_POOL_SIZE =
78+
GID_CONNECTOR_HTTP + "source.lookup.request.thread-pool.size";
79+
80+
public static final String LOOKUP_HTTP_RESPONSE_THREAD_POOL_SIZE =
81+
GID_CONNECTOR_HTTP + "source.lookup.response.thread-pool.size";
82+
83+
public static final String SINK_HTTP_WRITER_THREAD_POOL_SIZE =
84+
GID_CONNECTOR_HTTP + "sink.writer.thread-pool.size";
85+
86+
// -----------------------------------------------------
6887
}

src/main/java/com/getindata/connectors/http/internal/security/SecurityContext.java

Lines changed: 3 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,8 @@
2424

2525
import lombok.extern.slf4j.Slf4j;
2626

27+
import com.getindata.connectors.http.internal.utils.ConfigUtils;
28+
2729
/**
2830
* This class represents a security context for given Http connector instance. The Security context
2931
* is backed by in memory instance of Java's {@link KeyStore}. All keys and certificates managed by
@@ -42,17 +44,6 @@ public class SecurityContext {
4244

4345
private static final String PRIVATE_KEY_FOOTER = "-----END PRIVATE KEY-----";
4446

45-
/**
46-
* A pattern matcher linebreak regexp that represents any Unicode linebreak sequence making it
47-
* effectively equivalent to:
48-
* <pre>
49-
* {@code
50-
* &#92;u000D&#92;u000A|[&#92;u000A&#92;u000B&#92;u000C&#92;u000D&#92;u0085&#92;u2028&#92;u2029]
51-
* }
52-
* </pre>
53-
*/
54-
public static final String UNIVERSAL_NEW_LINE_REGEXP = "\\R";
55-
5647
private final char[] storePassword;
5748

5849
private final KeyStore keystore;
@@ -228,7 +219,7 @@ private byte[] decodePrivateData(String privateKeyPath, byte[] privateData) {
228219
if (privateKeyPath.endsWith(".pem")) {
229220
String privateString = new String(privateData, Charset.defaultCharset())
230221
.replace(PRIVATE_KEY_HEADER, "")
231-
.replaceAll(UNIVERSAL_NEW_LINE_REGEXP, "")
222+
.replaceAll(ConfigUtils.UNIVERSAL_NEW_LINE_REGEXP, "")
232223
.replace(PRIVATE_KEY_FOOTER, "");
233224

234225
return Base64.getDecoder().decode(privateString);

src/main/java/com/getindata/connectors/http/internal/sink/HttpSinkInternal.java

Lines changed: 10 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -108,8 +108,8 @@ protected HttpSinkInternal(
108108

109109
@Override
110110
public StatefulSinkWriter<InputT, BufferedRequestState<HttpSinkRequestEntry>> createWriter(
111-
InitContext context
112-
) throws IOException {
111+
InitContext context) throws IOException {
112+
113113
return new HttpSinkWriter<>(
114114
getElementConverter(),
115115
context,
@@ -121,14 +121,17 @@ public StatefulSinkWriter<InputT, BufferedRequestState<HttpSinkRequestEntry>> cr
121121
getMaxRecordSizeInBytes(),
122122
endpointUrl,
123123
sinkHttpClientBuilder.build(properties, httpPostRequestCallback, headerPreprocessor),
124-
Collections.emptyList()
124+
Collections.emptyList(),
125+
properties
125126
);
126127
}
127128

128129
@Override
129130
public StatefulSinkWriter<InputT, BufferedRequestState<HttpSinkRequestEntry>> restoreWriter(
130-
InitContext context, Collection<BufferedRequestState<HttpSinkRequestEntry>> recoveredState
131-
) throws IOException {
131+
InitContext context,
132+
Collection<BufferedRequestState<HttpSinkRequestEntry>> recoveredState)
133+
throws IOException {
134+
132135
return new HttpSinkWriter<>(
133136
getElementConverter(),
134137
context,
@@ -140,7 +143,8 @@ public StatefulSinkWriter<InputT, BufferedRequestState<HttpSinkRequestEntry>> re
140143
getMaxRecordSizeInBytes(),
141144
endpointUrl,
142145
sinkHttpClientBuilder.build(properties, httpPostRequestCallback, headerPreprocessor),
143-
recoveredState
146+
recoveredState,
147+
properties
144148
);
145149
}
146150

src/main/java/com/getindata/connectors/http/internal/sink/HttpSinkWriter.java

Lines changed: 35 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,9 @@
33
import java.util.Collection;
44
import java.util.Collections;
55
import java.util.List;
6+
import java.util.Properties;
7+
import java.util.concurrent.ExecutorService;
8+
import java.util.concurrent.Executors;
69
import java.util.function.Consumer;
710

811
import lombok.extern.slf4j.Slf4j;
@@ -11,8 +14,11 @@
1114
import org.apache.flink.connector.base.sink.writer.BufferedRequestState;
1215
import org.apache.flink.connector.base.sink.writer.ElementConverter;
1316
import org.apache.flink.metrics.Counter;
17+
import org.apache.flink.util.concurrent.ExecutorThreadFactory;
1418

1519
import com.getindata.connectors.http.internal.SinkHttpClient;
20+
import com.getindata.connectors.http.internal.config.HttpConnectorConfigConstants;
21+
import com.getindata.connectors.http.internal.utils.ThreadUtils;
1622

1723
/**
1824
* Sink writer created by {@link com.getindata.connectors.http.HttpSink} to write to an HTTP
@@ -26,6 +32,13 @@
2632
@Slf4j
2733
public class HttpSinkWriter<InputT> extends AsyncSinkWriter<InputT, HttpSinkRequestEntry> {
2834

35+
private static final String HTTP_SINK_WRITER_THREAD_POOL_SIZE = "4";
36+
37+
/**
38+
* Thread pool to handle HTTP response from HTTP client.
39+
*/
40+
private final ExecutorService sinkWriterThreadPool;
41+
2942
private final String endpointUrl;
3043

3144
private final SinkHttpClient sinkHttpClient;
@@ -43,14 +56,27 @@ public HttpSinkWriter(
4356
long maxRecordSizeInBytes,
4457
String endpointUrl,
4558
SinkHttpClient sinkHttpClient,
46-
Collection<BufferedRequestState<HttpSinkRequestEntry>> bufferedRequestStates) {
59+
Collection<BufferedRequestState<HttpSinkRequestEntry>> bufferedRequestStates,
60+
Properties properties) {
61+
4762
super(elementConverter, context, maxBatchSize, maxInFlightRequests, maxBufferedRequests,
4863
maxBatchSizeInBytes, maxTimeInBufferMS, maxRecordSizeInBytes, bufferedRequestStates);
4964
this.endpointUrl = endpointUrl;
5065
this.sinkHttpClient = sinkHttpClient;
5166

5267
var metrics = context.metricGroup();
5368
this.numRecordsSendErrorsCounter = metrics.getNumRecordsSendErrorsCounter();
69+
70+
int sinkWriterThreadPollSize = Integer.parseInt(properties.getProperty(
71+
HttpConnectorConfigConstants.SINK_HTTP_WRITER_THREAD_POOL_SIZE,
72+
HTTP_SINK_WRITER_THREAD_POOL_SIZE
73+
));
74+
75+
this.sinkWriterThreadPool =
76+
Executors.newFixedThreadPool(
77+
sinkWriterThreadPollSize,
78+
new ExecutorThreadFactory(
79+
"http-sink-writer-worker", ThreadUtils.LOGGING_EXCEPTION_HANDLER));
5480
}
5581

5682
// TODO: Reintroduce retries by adding backoff policy
@@ -59,7 +85,7 @@ protected void submitRequestEntries(
5985
List<HttpSinkRequestEntry> requestEntries,
6086
Consumer<List<HttpSinkRequestEntry>> requestResult) {
6187
var future = sinkHttpClient.putRequests(requestEntries, endpointUrl);
62-
future.whenComplete((response, err) -> {
88+
future.whenCompleteAsync((response, err) -> {
6389
if (err != null) {
6490
int failedRequestsNumber = requestEntries.size();
6591
log.error(
@@ -89,11 +115,17 @@ protected void submitRequestEntries(
89115
//}
90116
}
91117
requestResult.accept(Collections.emptyList());
92-
});
118+
}, sinkWriterThreadPool);
93119
}
94120

95121
@Override
96122
protected long getSizeInBytes(HttpSinkRequestEntry s) {
97123
return s.getSizeInBytes();
98124
}
125+
126+
@Override
127+
public void close() {
128+
sinkWriterThreadPool.shutdownNow();
129+
super.close();
130+
}
99131
}

src/main/java/com/getindata/connectors/http/internal/sink/httpclient/JavaNetSinkHttpClient.java

Lines changed: 58 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -7,12 +7,20 @@
77
import java.net.http.HttpRequest.BodyPublishers;
88
import java.net.http.HttpRequest.Builder;
99
import java.net.http.HttpResponse;
10-
import java.util.*;
10+
import java.time.Duration;
11+
import java.util.ArrayList;
12+
import java.util.Arrays;
13+
import java.util.List;
14+
import java.util.Map;
15+
import java.util.Properties;
1116
import java.util.concurrent.CompletableFuture;
17+
import java.util.concurrent.ExecutorService;
18+
import java.util.concurrent.Executors;
1219
import java.util.stream.Collectors;
1320

1421
import lombok.extern.slf4j.Slf4j;
1522
import org.apache.flink.annotation.VisibleForTesting;
23+
import org.apache.flink.util.concurrent.ExecutorThreadFactory;
1624

1725
import com.getindata.connectors.http.HttpPostRequestCallback;
1826
import com.getindata.connectors.http.internal.HeaderPreprocessor;
@@ -26,14 +34,19 @@
2634
import com.getindata.connectors.http.internal.table.sink.Slf4jHttpPostRequestCallback;
2735
import com.getindata.connectors.http.internal.utils.HttpHeaderUtils;
2836
import com.getindata.connectors.http.internal.utils.JavaNetHttpClientFactory;
37+
import com.getindata.connectors.http.internal.utils.ThreadUtils;
2938

3039
/**
31-
* An implementation of {@link SinkHttpClient} that uses Java 11's {@link HttpClient}.
32-
* This implementation supports HTTP traffic only.
40+
* An implementation of {@link SinkHttpClient} that uses Java 11's {@link HttpClient}. This
41+
* implementation supports HTTP traffic only.
3342
*/
3443
@Slf4j
3544
public class JavaNetSinkHttpClient implements SinkHttpClient {
3645

46+
private static final int HTTP_CLIENT_THREAD_POOL_SIZE = 16;
47+
48+
public static final String DEFAULT_REQUEST_TIMEOUT_SECONDS = "30";
49+
3750
private final HttpClient httpClient;
3851

3952
private final String[] headersAndValues;
@@ -44,16 +57,29 @@ public class JavaNetSinkHttpClient implements SinkHttpClient {
4457

4558
private final HttpPostRequestCallback<HttpSinkRequestEntry> httpPostRequestCallback;
4659

60+
/**
61+
* Thread pool to handle HTTP response from HTTP client.
62+
*/
63+
private final ExecutorService publishingThreadPool;
64+
65+
private final int httpRequestTimeOutSeconds;
66+
4767
public JavaNetSinkHttpClient(Properties properties, HeaderPreprocessor headerPreprocessor) {
4868
this(properties, new Slf4jHttpPostRequestCallback(), headerPreprocessor);
4969
}
5070

5171
public JavaNetSinkHttpClient(
52-
Properties properties,
53-
HttpPostRequestCallback<HttpSinkRequestEntry> httpPostRequestCallback,
54-
HeaderPreprocessor headerPreprocessor) {
72+
Properties properties,
73+
HttpPostRequestCallback<HttpSinkRequestEntry> httpPostRequestCallback,
74+
HeaderPreprocessor headerPreprocessor) {
5575

56-
this.httpClient = JavaNetHttpClientFactory.createClient(properties);
76+
ExecutorService httpClientExecutor =
77+
Executors.newFixedThreadPool(
78+
HTTP_CLIENT_THREAD_POOL_SIZE,
79+
new ExecutorThreadFactory(
80+
"http-sink-client-request-worker", ThreadUtils.LOGGING_EXCEPTION_HANDLER));
81+
82+
this.httpClient = JavaNetHttpClientFactory.createClient(properties, httpClientExecutor);
5783
this.httpPostRequestCallback = httpPostRequestCallback;
5884
this.headerMap = HttpHeaderUtils.prepareHeaderMap(
5985
HttpConnectorConfigConstants.SINK_HEADER_PREFIX,
@@ -72,12 +98,23 @@ public JavaNetSinkHttpClient(
7298
.build();
7399

74100
this.statusCodeChecker = new ComposeHttpStatusCodeChecker(checkerConfig);
101+
102+
this.publishingThreadPool =
103+
Executors.newFixedThreadPool(
104+
HTTP_CLIENT_THREAD_POOL_SIZE,
105+
new ExecutorThreadFactory(
106+
"http-sink-client-response-worker", ThreadUtils.LOGGING_EXCEPTION_HANDLER));
107+
108+
this.httpRequestTimeOutSeconds = Integer.parseInt(
109+
properties.getProperty(HttpConnectorConfigConstants.SINK_HTTP_TIMEOUT_SECONDS,
110+
DEFAULT_REQUEST_TIMEOUT_SECONDS)
111+
);
75112
}
76113

77114
@Override
78115
public CompletableFuture<SinkHttpClientResponse> putRequests(
79-
List<HttpSinkRequestEntry> requestEntries,
80-
String endpointUrl) {
116+
List<HttpSinkRequestEntry> requestEntries,
117+
String endpointUrl) {
81118
return submitRequests(requestEntries, endpointUrl)
82119
.thenApply(responses -> prepareSinkHttpClientResponse(responses, endpointUrl));
83120
}
@@ -87,6 +124,7 @@ private HttpRequest buildHttpRequest(HttpSinkRequestEntry requestEntry, URI endp
87124
.newBuilder()
88125
.uri(endpointUri)
89126
.version(Version.HTTP_1_1)
127+
.timeout(Duration.ofSeconds(httpRequestTimeOutSeconds))
90128
.method(requestEntry.method,
91129
BodyPublishers.ofByteArray(requestEntry.element));
92130

@@ -98,20 +136,25 @@ private HttpRequest buildHttpRequest(HttpSinkRequestEntry requestEntry, URI endp
98136
}
99137

100138
private CompletableFuture<List<JavaNetHttpResponseWrapper>> submitRequests(
101-
List<HttpSinkRequestEntry> requestEntries,
102-
String endpointUrl) {
139+
List<HttpSinkRequestEntry> requestEntries,
140+
String endpointUrl) {
103141
var endpointUri = URI.create(endpointUrl);
104142
var responseFutures = new ArrayList<CompletableFuture<JavaNetHttpResponseWrapper>>();
105143

106144
for (var entry : requestEntries) {
107145
var response = httpClient
108-
.sendAsync(buildHttpRequest(entry, endpointUri),
146+
.sendAsync(
147+
buildHttpRequest(entry, endpointUri),
109148
HttpResponse.BodyHandlers.ofString())
110149
.exceptionally(ex -> {
150+
// TODO This will be executed on a ForkJoinPool thread... refactor this someday.
111151
log.error("Request fatally failed because of an exception", ex);
112152
return null;
113153
})
114-
.thenApply(res -> new JavaNetHttpResponseWrapper(entry, res));
154+
.thenApplyAsync(
155+
res -> new JavaNetHttpResponseWrapper(entry, res),
156+
publishingThreadPool
157+
);
115158
responseFutures.add(response);
116159
}
117160

@@ -121,8 +164,8 @@ private CompletableFuture<List<JavaNetHttpResponseWrapper>> submitRequests(
121164
}
122165

123166
private SinkHttpClientResponse prepareSinkHttpClientResponse(
124-
List<JavaNetHttpResponseWrapper> responses,
125-
String endpointUrl) {
167+
List<JavaNetHttpResponseWrapper> responses,
168+
String endpointUrl) {
126169
var successfulResponses = new ArrayList<HttpSinkRequestEntry>();
127170
var failedResponses = new ArrayList<HttpSinkRequestEntry>();
128171

0 commit comments

Comments
 (0)