Skip to content

Commit af3af22

Browse files
(fix) Mdodsworth/graceful shutdown (#236)
* adding graceful shutdown to the async event dispatcher * adding tests for AsyncEventHandler * run through travis and compat tests
1 parent e969215 commit af3af22

File tree

3 files changed

+215
-56
lines changed

3 files changed

+215
-56
lines changed

core-httpclient-impl/build.gradle

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
11
dependencies {
22
compile project(':core-api')
3+
testCompile project(':core-api').sourceSets.test.output
34

45
compileOnly group: 'com.google.code.gson', name: 'gson', version: gsonVersion
56

core-httpclient-impl/src/main/java/com/optimizely/ab/event/AsyncEventHandler.java

Lines changed: 76 additions & 56 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@
1818

1919
import com.optimizely.ab.HttpClientUtils;
2020
import com.optimizely.ab.NamedThreadFactory;
21+
import com.optimizely.ab.annotations.VisibleForTesting;
2122

2223
import org.apache.http.HttpResponse;
2324
import org.apache.http.client.ClientProtocolException;
@@ -33,23 +34,23 @@
3334
import org.slf4j.Logger;
3435
import org.slf4j.LoggerFactory;
3536

36-
import java.io.Closeable;
3737
import java.io.IOException;
3838
import java.io.UnsupportedEncodingException;
3939
import java.net.URISyntaxException;
4040
import java.util.Map;
4141
import java.util.concurrent.ArrayBlockingQueue;
42-
import java.util.concurrent.BlockingQueue;
4342
import java.util.concurrent.ExecutorService;
44-
import java.util.concurrent.Executors;
43+
import java.util.concurrent.RejectedExecutionException;
44+
import java.util.concurrent.ThreadPoolExecutor;
45+
import java.util.concurrent.TimeUnit;
4546

4647
import javax.annotation.CheckForNull;
4748

4849
/**
4950
* {@link EventHandler} implementation that queues events and has a separate pool of threads responsible
5051
* for the dispatch.
5152
*/
52-
public class AsyncEventHandler implements EventHandler, Closeable {
53+
public class AsyncEventHandler implements EventHandler {
5354

5455
// The following static values are public so that they can be tweaked if necessary.
5556
// These are the recommended settings for http protocol. https://hc.apache.org/httpcomponents-client-ga/tutorial/html/connmgmt.html
@@ -65,7 +66,6 @@ public class AsyncEventHandler implements EventHandler, Closeable {
6566

6667
private final CloseableHttpClient httpClient;
6768
private final ExecutorService workerExecutor;
68-
private final BlockingQueue<LogEvent> logEventQueue;
6969

7070
public AsyncEventHandler(int queueCapacity, int numWorkers) {
7171
this(queueCapacity, numWorkers, 200, 20, 5000);
@@ -80,21 +80,22 @@ public AsyncEventHandler(int queueCapacity, int numWorkers, int maxConnections,
8080
this.maxPerRoute = connectionsPerRoute;
8181
this.validateAfterInactivity = validateAfter;
8282

83-
this.logEventQueue = new ArrayBlockingQueue<LogEvent>(queueCapacity);
84-
this.httpClient = HttpClients.custom()
83+
this.httpClient = HttpClients.custom()
8584
.setDefaultRequestConfig(HttpClientUtils.DEFAULT_REQUEST_CONFIG)
8685
.setConnectionManager(poolingHttpClientConnectionManager())
8786
.disableCookieManagement()
8887
.build();
8988

90-
this.workerExecutor = Executors.newFixedThreadPool(
91-
numWorkers, new NamedThreadFactory("optimizely-event-dispatcher-thread-%s", true));
89+
this.workerExecutor = new ThreadPoolExecutor(numWorkers, numWorkers,
90+
0L, TimeUnit.MILLISECONDS,
91+
new ArrayBlockingQueue<Runnable>(queueCapacity),
92+
new NamedThreadFactory("optimizely-event-dispatcher-thread-%s", true));
93+
}
9294

93-
// create dispatch workers
94-
for (int i = 0; i < numWorkers; i++) {
95-
EventDispatchWorker worker = new EventDispatchWorker();
96-
workerExecutor.submit(worker);
97-
}
95+
@VisibleForTesting
96+
public AsyncEventHandler(CloseableHttpClient httpClient, ExecutorService workerExecutor) {
97+
this.httpClient = httpClient;
98+
this.workerExecutor = workerExecutor;
9899
}
99100

100101
private PoolingHttpClientConnectionManager poolingHttpClientConnectionManager()
@@ -108,61 +109,87 @@ private PoolingHttpClientConnectionManager poolingHttpClientConnectionManager()
108109

109110
@Override
110111
public void dispatchEvent(LogEvent logEvent) {
111-
// attempt to enqueue the log event for processing
112-
boolean submitted = logEventQueue.offer(logEvent);
113-
if (!submitted) {
114-
logger.error("unable to enqueue event because queue is full");
112+
try {
113+
// attempt to enqueue the log event for processing
114+
workerExecutor.execute(new EventDispatcher(logEvent));
115+
} catch (RejectedExecutionException e) {
116+
logger.error("event dispatch rejected");
115117
}
116118
}
117119

118-
@Override
119-
public void close() throws IOException {
120-
logger.info("closing event dispatcher");
120+
/**
121+
* Attempts to gracefully terminate all event dispatch workers and close all resources.
122+
* This method blocks, awaiting the completion of any queued or ongoing event dispatches.
123+
*
124+
* Note: termination of ongoing event dispatching is best-effort.
125+
*
126+
* @param timeout maximum time to wait for event dispatches to complete
127+
* @param unit the time unit of the timeout argument
128+
*/
129+
public void shutdownAndAwaitTermination(long timeout, TimeUnit unit) {
130+
131+
// Disable new tasks from being submitted
132+
logger.info("event handler shutting down. Attempting to dispatch previously submitted events");
133+
workerExecutor.shutdown();
121134

122-
// "close" all workers and the http client
123135
try {
124-
httpClient.close();
125-
} catch (IOException e) {
126-
logger.error("unable to close the event handler httpclient cleanly", e);
127-
} finally {
136+
// Wait a while for existing tasks to terminate
137+
if (!workerExecutor.awaitTermination(timeout, unit)) {
138+
int unprocessedCount = workerExecutor.shutdownNow().size();
139+
logger.warn("timed out waiting for previously submitted events to be dispatched. "
140+
+ "{} events were dropped. "
141+
+ "Interrupting dispatch worker(s)", unprocessedCount);
142+
// Cancel currently executing tasks
143+
// Wait a while for tasks to respond to being cancelled
144+
if (!workerExecutor.awaitTermination(timeout, unit)) {
145+
logger.error("unable to gracefully shutdown event handler");
146+
}
147+
}
148+
} catch (InterruptedException ie) {
149+
// (Re-)Cancel if current thread also interrupted
128150
workerExecutor.shutdownNow();
151+
// Preserve interrupt status
152+
Thread.currentThread().interrupt();
153+
} finally {
154+
try {
155+
httpClient.close();
156+
} catch (IOException e) {
157+
logger.error("unable to close event dispatcher http client", e);
158+
}
129159
}
160+
161+
logger.info("event handler shutdown complete");
130162
}
131163

132164
//======== Helper classes ========//
133165

134-
private class EventDispatchWorker implements Runnable {
166+
/**
167+
* Wrapper runnable for the actual event dispatch.
168+
*/
169+
private class EventDispatcher implements Runnable {
170+
171+
private final LogEvent logEvent;
172+
173+
EventDispatcher(LogEvent logEvent) {
174+
this.logEvent = logEvent;
175+
}
135176

136177
@Override
137178
public void run() {
138-
boolean terminate = false;
139-
140-
logger.info("starting event dispatch worker");
141-
// event loop that'll block waiting for events to appear in the queue
142-
//noinspection InfiniteLoopStatement
143-
while (!terminate) {
144-
try {
145-
LogEvent event = logEventQueue.take();
146-
HttpRequestBase request;
147-
if (event.getRequestMethod() == LogEvent.RequestMethod.GET) {
148-
request = generateGetRequest(event);
149-
} else {
150-
request = generatePostRequest(event);
151-
}
152-
httpClient.execute(request, EVENT_RESPONSE_HANDLER);
153-
} catch (InterruptedException e) {
154-
logger.info("terminating event dispatcher event loop");
155-
terminate = true;
156-
} catch (Throwable t) {
157-
logger.error("event dispatcher threw exception but will continue", t);
158-
}
179+
try {
180+
HttpGet request = generateRequest(logEvent);
181+
httpClient.execute(request, EVENT_RESPONSE_HANDLER);
182+
} catch (IOException e) {
183+
logger.error("event dispatch failed", e);
184+
} catch (URISyntaxException e) {
185+
logger.error("unable to parse generated URI", e);
159186
}
160187
}
161188

162189
/**
163190
* Helper method that generates the event request for the given {@link LogEvent}.
164191
*/
165-
private HttpGet generateGetRequest(LogEvent event) throws URISyntaxException {
192+
private HttpGet generateRequest(LogEvent event) throws URISyntaxException {
166193

167194
URIBuilder builder = new URIBuilder(event.getEndpointUrl());
168195
for (Map.Entry<String, String> param : event.getRequestParams().entrySet()) {
@@ -171,13 +198,6 @@ private HttpGet generateGetRequest(LogEvent event) throws URISyntaxException {
171198

172199
return new HttpGet(builder.build());
173200
}
174-
175-
private HttpPost generatePostRequest(LogEvent event) throws UnsupportedEncodingException {
176-
HttpPost post = new HttpPost(event.getEndpointUrl());
177-
post.setEntity(new StringEntity(event.getBody()));
178-
post.addHeader("Content-Type", "application/json");
179-
return post;
180-
}
181201
}
182202

183203
/**
Lines changed: 138 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,138 @@
1+
/**
2+
*
3+
* Copyright 2016, Optimizely
4+
*
5+
* Licensed under the Apache License, Version 2.0 (the "License");
6+
* you may not use this file except in compliance with the License.
7+
* You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
package com.optimizely.ab.event;
18+
19+
import com.google.common.util.concurrent.MoreExecutors;
20+
21+
import com.optimizely.ab.event.internal.payload.EventBatch;
22+
import org.apache.http.client.ResponseHandler;
23+
import org.apache.http.client.methods.HttpGet;
24+
import org.apache.http.impl.client.CloseableHttpClient;
25+
import org.junit.Rule;
26+
import org.junit.Test;
27+
import org.junit.rules.ExpectedException;
28+
import org.mockito.Mock;
29+
import org.mockito.junit.MockitoJUnit;
30+
import org.mockito.junit.MockitoRule;
31+
32+
import java.io.IOException;
33+
import java.util.HashMap;
34+
import java.util.Map;
35+
import java.util.concurrent.ExecutorService;
36+
import java.util.concurrent.RejectedExecutionException;
37+
import java.util.concurrent.TimeUnit;
38+
39+
import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
40+
41+
import static org.mockito.Matchers.any;
42+
import static org.mockito.Matchers.anyLong;
43+
import static org.mockito.Mockito.doThrow;
44+
import static org.mockito.Mockito.never;
45+
import static org.mockito.Mockito.verify;
46+
import static org.mockito.Mockito.when;
47+
48+
/**
49+
* Tests for {@link AsyncEventHandler}.
50+
*/
51+
public class AsyncEventHandlerTest {
52+
53+
@Rule
54+
@SuppressFBWarnings("URF_UNREAD_PUBLIC_OR_PROTECTED_FIELD")
55+
public MockitoRule rule = MockitoJUnit.rule();
56+
57+
@Rule
58+
public ExpectedException thrown = ExpectedException.none();
59+
60+
@Mock CloseableHttpClient mockHttpClient;
61+
@Mock ExecutorService mockExecutorService;
62+
63+
@Test
64+
public void testQueueCapacityPreconditionCheck() throws Exception {
65+
thrown.expect(IllegalArgumentException.class);
66+
new AsyncEventHandler(-1, 1);
67+
}
68+
69+
@Test
70+
public void testDispatch() throws Exception {
71+
AsyncEventHandler eventHandler = new AsyncEventHandler(mockHttpClient, MoreExecutors.newDirectExecutorService());
72+
eventHandler.dispatchEvent(createLogEvent());
73+
verify(mockHttpClient).execute(any(HttpGet.class), any(ResponseHandler.class));
74+
}
75+
76+
/**
77+
* Verify that {@link RejectedExecutionException}s are caught, rather than being propagated.
78+
*/
79+
@Test
80+
public void testRejectedExecutionsAreHandled() throws Exception {
81+
AsyncEventHandler eventHandler = new AsyncEventHandler(mockHttpClient, mockExecutorService);
82+
doThrow(RejectedExecutionException.class).when(mockExecutorService).execute(any(Runnable.class));
83+
eventHandler.dispatchEvent(createLogEvent());
84+
}
85+
86+
/**
87+
* Verify that {@link IOException}s are caught, rather than being propagated (which would cause a worker
88+
* thread to die).
89+
*/
90+
@SuppressWarnings("unchecked")
91+
@Test
92+
public void testIOExceptionsCaughtInDispatch() throws Exception {
93+
AsyncEventHandler eventHandler = new AsyncEventHandler(mockHttpClient, MoreExecutors.newDirectExecutorService());
94+
95+
// have the http client throw an IOException on execute
96+
when(mockHttpClient.execute(any(HttpGet.class), any(ResponseHandler.class))).thenThrow(IOException.class);
97+
eventHandler.dispatchEvent(createLogEvent());
98+
verify(mockHttpClient).execute(any(HttpGet.class), any(ResponseHandler.class));
99+
}
100+
101+
/**
102+
* Verifies the case where all queued events could be processed before the timeout is exceeded.
103+
*/
104+
@Test
105+
public void testShutdownAndAwaitTermination() throws Exception {
106+
AsyncEventHandler eventHandler = new AsyncEventHandler(mockHttpClient, mockExecutorService);
107+
when(mockExecutorService.awaitTermination(anyLong(), any(TimeUnit.class))).thenReturn(true);
108+
109+
eventHandler.shutdownAndAwaitTermination(1, TimeUnit.SECONDS);
110+
verify(mockExecutorService).shutdown();
111+
verify(mockExecutorService, never()).shutdownNow();
112+
113+
verify(mockHttpClient).close();
114+
}
115+
116+
/**
117+
* Verify the case where all queued events count NOT be processed before the timeout was exceeded.
118+
* {@link ExecutorService#shutdownNow()} should be called to drop the queued events and attempt to interrupt
119+
* ongoing tasks.
120+
*/
121+
@Test
122+
public void testShutdownAndForcedTermination() throws Exception {
123+
AsyncEventHandler eventHandler = new AsyncEventHandler(mockHttpClient, mockExecutorService);
124+
when(mockExecutorService.awaitTermination(anyLong(), any(TimeUnit.class))).thenReturn(false);
125+
126+
eventHandler.shutdownAndAwaitTermination(1, TimeUnit.SECONDS);
127+
verify(mockExecutorService).shutdown();
128+
verify(mockExecutorService).shutdownNow();
129+
verify(mockHttpClient).close();
130+
}
131+
132+
//======== Helper methods ========//
133+
134+
private LogEvent createLogEvent() {Map<String, String> testParams = new HashMap<String, String>();
135+
testParams.put("test", "params");
136+
return new LogEvent(LogEvent.RequestMethod.GET, "test_url", testParams, new EventBatch());
137+
}
138+
}

0 commit comments

Comments
 (0)