7
7
import java .net .http .HttpRequest .BodyPublishers ;
8
8
import java .net .http .HttpRequest .Builder ;
9
9
import java .net .http .HttpResponse ;
10
- import java .util .*;
10
+ import java .time .Duration ;
11
+ import java .util .ArrayList ;
12
+ import java .util .Arrays ;
13
+ import java .util .List ;
14
+ import java .util .Map ;
15
+ import java .util .Properties ;
11
16
import java .util .concurrent .CompletableFuture ;
17
+ import java .util .concurrent .ExecutorService ;
18
+ import java .util .concurrent .Executors ;
12
19
import java .util .stream .Collectors ;
13
20
14
21
import lombok .extern .slf4j .Slf4j ;
15
22
import org .apache .flink .annotation .VisibleForTesting ;
23
+ import org .apache .flink .util .concurrent .ExecutorThreadFactory ;
16
24
17
25
import com .getindata .connectors .http .HttpPostRequestCallback ;
18
26
import com .getindata .connectors .http .internal .HeaderPreprocessor ;
26
34
import com .getindata .connectors .http .internal .table .sink .Slf4jHttpPostRequestCallback ;
27
35
import com .getindata .connectors .http .internal .utils .HttpHeaderUtils ;
28
36
import com .getindata .connectors .http .internal .utils .JavaNetHttpClientFactory ;
37
+ import com .getindata .connectors .http .internal .utils .ThreadUtils ;
29
38
30
39
/**
31
- * An implementation of {@link SinkHttpClient} that uses Java 11's {@link HttpClient}.
32
- * This implementation supports HTTP traffic only.
40
+ * An implementation of {@link SinkHttpClient} that uses Java 11's {@link HttpClient}. This
41
+ * implementation supports HTTP traffic only.
33
42
*/
34
43
@ Slf4j
35
44
public class JavaNetSinkHttpClient implements SinkHttpClient {
36
45
46
+ private static final int HTTP_CLIENT_THREAD_POOL_SIZE = 16 ;
47
+
48
+ public static final String DEFAULT_REQUEST_TIMEOUT_SECONDS = "30" ;
49
+
37
50
private final HttpClient httpClient ;
38
51
39
52
private final String [] headersAndValues ;
@@ -44,16 +57,29 @@ public class JavaNetSinkHttpClient implements SinkHttpClient {
44
57
45
58
private final HttpPostRequestCallback <HttpSinkRequestEntry > httpPostRequestCallback ;
46
59
60
+ /**
61
+ * Thread pool to handle HTTP response from HTTP client.
62
+ */
63
+ private final ExecutorService publishingThreadPool ;
64
+
65
+ private final int httpRequestTimeOutSeconds ;
66
+
47
67
public JavaNetSinkHttpClient (Properties properties , HeaderPreprocessor headerPreprocessor ) {
48
68
this (properties , new Slf4jHttpPostRequestCallback (), headerPreprocessor );
49
69
}
50
70
51
71
public JavaNetSinkHttpClient (
52
- Properties properties ,
53
- HttpPostRequestCallback <HttpSinkRequestEntry > httpPostRequestCallback ,
54
- HeaderPreprocessor headerPreprocessor ) {
72
+ Properties properties ,
73
+ HttpPostRequestCallback <HttpSinkRequestEntry > httpPostRequestCallback ,
74
+ HeaderPreprocessor headerPreprocessor ) {
55
75
56
- this .httpClient = JavaNetHttpClientFactory .createClient (properties );
76
+ ExecutorService httpClientExecutor =
77
+ Executors .newFixedThreadPool (
78
+ HTTP_CLIENT_THREAD_POOL_SIZE ,
79
+ new ExecutorThreadFactory (
80
+ "http-sink-client-request-worker" , ThreadUtils .LOGGING_EXCEPTION_HANDLER ));
81
+
82
+ this .httpClient = JavaNetHttpClientFactory .createClient (properties , httpClientExecutor );
57
83
this .httpPostRequestCallback = httpPostRequestCallback ;
58
84
this .headerMap = HttpHeaderUtils .prepareHeaderMap (
59
85
HttpConnectorConfigConstants .SINK_HEADER_PREFIX ,
@@ -72,12 +98,23 @@ public JavaNetSinkHttpClient(
72
98
.build ();
73
99
74
100
this .statusCodeChecker = new ComposeHttpStatusCodeChecker (checkerConfig );
101
+
102
+ this .publishingThreadPool =
103
+ Executors .newFixedThreadPool (
104
+ HTTP_CLIENT_THREAD_POOL_SIZE ,
105
+ new ExecutorThreadFactory (
106
+ "http-sink-client-response-worker" , ThreadUtils .LOGGING_EXCEPTION_HANDLER ));
107
+
108
+ this .httpRequestTimeOutSeconds = Integer .parseInt (
109
+ properties .getProperty (HttpConnectorConfigConstants .SINK_HTTP_TIMEOUT_SECONDS ,
110
+ DEFAULT_REQUEST_TIMEOUT_SECONDS )
111
+ );
75
112
}
76
113
77
114
@ Override
78
115
public CompletableFuture <SinkHttpClientResponse > putRequests (
79
- List <HttpSinkRequestEntry > requestEntries ,
80
- String endpointUrl ) {
116
+ List <HttpSinkRequestEntry > requestEntries ,
117
+ String endpointUrl ) {
81
118
return submitRequests (requestEntries , endpointUrl )
82
119
.thenApply (responses -> prepareSinkHttpClientResponse (responses , endpointUrl ));
83
120
}
@@ -87,6 +124,7 @@ private HttpRequest buildHttpRequest(HttpSinkRequestEntry requestEntry, URI endp
87
124
.newBuilder ()
88
125
.uri (endpointUri )
89
126
.version (Version .HTTP_1_1 )
127
+ .timeout (Duration .ofSeconds (httpRequestTimeOutSeconds ))
90
128
.method (requestEntry .method ,
91
129
BodyPublishers .ofByteArray (requestEntry .element ));
92
130
@@ -98,20 +136,25 @@ private HttpRequest buildHttpRequest(HttpSinkRequestEntry requestEntry, URI endp
98
136
}
99
137
100
138
private CompletableFuture <List <JavaNetHttpResponseWrapper >> submitRequests (
101
- List <HttpSinkRequestEntry > requestEntries ,
102
- String endpointUrl ) {
139
+ List <HttpSinkRequestEntry > requestEntries ,
140
+ String endpointUrl ) {
103
141
var endpointUri = URI .create (endpointUrl );
104
142
var responseFutures = new ArrayList <CompletableFuture <JavaNetHttpResponseWrapper >>();
105
143
106
144
for (var entry : requestEntries ) {
107
145
var response = httpClient
108
- .sendAsync (buildHttpRequest (entry , endpointUri ),
146
+ .sendAsync (
147
+ buildHttpRequest (entry , endpointUri ),
109
148
HttpResponse .BodyHandlers .ofString ())
110
149
.exceptionally (ex -> {
150
+ // TODO This will be executed on a ForJoinPool Thread... refactor this someday.
111
151
log .error ("Request fatally failed because of an exception" , ex );
112
152
return null ;
113
153
})
114
- .thenApply (res -> new JavaNetHttpResponseWrapper (entry , res ));
154
+ .thenApplyAsync (
155
+ res -> new JavaNetHttpResponseWrapper (entry , res ),
156
+ publishingThreadPool
157
+ );
115
158
responseFutures .add (response );
116
159
}
117
160
@@ -121,8 +164,8 @@ private CompletableFuture<List<JavaNetHttpResponseWrapper>> submitRequests(
121
164
}
122
165
123
166
private SinkHttpClientResponse prepareSinkHttpClientResponse (
124
- List <JavaNetHttpResponseWrapper > responses ,
125
- String endpointUrl ) {
167
+ List <JavaNetHttpResponseWrapper > responses ,
168
+ String endpointUrl ) {
126
169
var successfulResponses = new ArrayList <HttpSinkRequestEntry >();
127
170
var failedResponses = new ArrayList <HttpSinkRequestEntry >();
128
171
0 commit comments