47
47
* {@link EventHandler} implementation that queues events and has a separate pool of threads responsible
48
48
* for the dispatch.
49
49
*/
50
- public class AsyncEventHandler implements EventHandler {
50
+ public class AsyncEventHandler implements EventHandler , AutoCloseable {
51
51
52
52
private static final Logger logger = LoggerFactory .getLogger (AsyncEventHandler .class );
53
53
private static final ProjectConfigResponseHandler EVENT_RESPONSE_HANDLER = new ProjectConfigResponseHandler ();
54
54
55
55
private final OptimizelyHttpClient httpClient ;
56
56
private final ExecutorService workerExecutor ;
57
57
58
- public AsyncEventHandler (int queueCapacity , int numWorkers ) {
59
- this (queueCapacity , numWorkers , 200 , 20 , 5000 );
60
- }
58
+ private final long closeTimeout ;
59
+ private final TimeUnit closeTimeoutUnit ;
61
60
62
- public AsyncEventHandler (int queueCapacity , int numWorkers , int maxConnections , int connectionsPerRoute , int validateAfter ) {
61
+ public AsyncEventHandler (int queueCapacity ,
62
+ int numWorkers ,
63
+ int maxConnections ,
64
+ int connectionsPerRoute ,
65
+ int validateAfter ,
66
+ long closeTimeout ,
67
+ TimeUnit closeTimeoutUnit ) {
63
68
if (queueCapacity <= 0 ) {
64
69
throw new IllegalArgumentException ("queue capacity must be > 0" );
65
70
}
@@ -74,12 +79,17 @@ public AsyncEventHandler(int queueCapacity, int numWorkers, int maxConnections,
74
79
0L , TimeUnit .MILLISECONDS ,
75
80
new ArrayBlockingQueue <Runnable >(queueCapacity ),
76
81
new NamedThreadFactory ("optimizely-event-dispatcher-thread-%s" , true ));
82
+
83
+ this .closeTimeout = closeTimeout ;
84
+ this .closeTimeoutUnit = closeTimeoutUnit ;
77
85
}
78
86
79
87
@ VisibleForTesting
80
88
public AsyncEventHandler (OptimizelyHttpClient httpClient , ExecutorService workerExecutor ) {
81
89
this .httpClient = httpClient ;
82
90
this .workerExecutor = workerExecutor ;
91
+ this .closeTimeout = Long .MAX_VALUE ;
92
+ this .closeTimeoutUnit = TimeUnit .MILLISECONDS ;
83
93
}
84
94
85
95
@ Override
@@ -136,6 +146,11 @@ public void shutdownAndAwaitTermination(long timeout, TimeUnit unit) {
136
146
logger .info ("event handler shutdown complete" );
137
147
}
138
148
149
+ @ Override
150
+ public void close () {
151
+ shutdownAndAwaitTermination (closeTimeout , closeTimeoutUnit );
152
+ }
153
+
139
154
//======== Helper classes ========//
140
155
141
156
/**
@@ -205,4 +220,62 @@ public Void handleResponse(HttpResponse response) throws IOException {
205
220
}
206
221
}
207
222
}
223
+
224
+ //======== Builder ========//
225
+
226
+ public static Builder builder () { return new Builder (); }
227
+
228
+ public static class Builder {
229
+
230
+ private int queueCapacity ;
231
+ private int numWorkers ;
232
+ private int maxTotalConnections = 200 ;
233
+ private int maxPerRoute = 20 ;
234
+ private int validateAfterInactivity = 5000 ;
235
+ private long closeTimeout = Long .MAX_VALUE ;
236
+ private TimeUnit closeTimeoutUnit = TimeUnit .MILLISECONDS ;
237
+
238
+ public Builder withQueueCapacity (int queueCapacity ) {
239
+ this .queueCapacity = queueCapacity ;
240
+ return this ;
241
+ }
242
+
243
+ public Builder withNumWorkers (int numWorkers ) {
244
+ this .numWorkers = numWorkers ;
245
+ return this ;
246
+ }
247
+
248
+ public Builder withMaxTotalConnections (int maxTotalConnections ) {
249
+ this .maxTotalConnections = maxTotalConnections ;
250
+ return this ;
251
+ }
252
+
253
+ public Builder withMaxPerRoute (int maxPerRoute ) {
254
+ this .maxPerRoute = maxPerRoute ;
255
+ return this ;
256
+ }
257
+
258
+ public Builder withValidateAfterInactivity (int validateAfterInactivity ) {
259
+ this .validateAfterInactivity = validateAfterInactivity ;
260
+ return this ;
261
+ }
262
+
263
+ public Builder withCloseTimeout (long closeTimeout , TimeUnit unit ) {
264
+ this .closeTimeout = closeTimeout ;
265
+ this .closeTimeoutUnit = unit ;
266
+ return this ;
267
+ }
268
+
269
+ public AsyncEventHandler build () {
270
+ return new AsyncEventHandler (
271
+ queueCapacity ,
272
+ numWorkers ,
273
+ maxTotalConnections ,
274
+ maxPerRoute ,
275
+ validateAfterInactivity ,
276
+ closeTimeout ,
277
+ closeTimeoutUnit
278
+ );
279
+ }
280
+ }
208
281
}
0 commit comments