Skip to content

Commit d19cdcf

Browse files
author
Mike Davis
authored
Add more control over BatchEventProcessor. (#319)
* Add flush method to BatchEventProcessor. * Allow BatchEventProcessor to be restarted. * Add option to build BatchEventProcessor without starting the processing thread.
1 parent 848b44d commit d19cdcf

File tree

2 files changed

+71
-4
lines changed

2 files changed

+71
-4
lines changed

core-api/src/main/java/com/optimizely/ab/event/BatchEventProcessor.java

Lines changed: 24 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -50,6 +50,7 @@ public class BatchEventProcessor implements EventProcessor, AutoCloseable {
5050
public static final long DEFAULT_TIMEOUT_INTERVAL = TimeUnit.SECONDS.toMillis(5);
5151

5252
private static final Object SHUTDOWN_SIGNAL = new Object();
53+
private static final Object FLUSH_SIGNAL = new Object();
5354

5455
private final BlockingQueue<Object> eventQueue;
5556
private final EventHandler eventHandler;
@@ -81,8 +82,6 @@ private BatchEventProcessor(BlockingQueue<Object> eventQueue, EventHandler event
8182
} else {
8283
this.executor = executor;
8384
}
84-
85-
start();
8685
}
8786

8887
public synchronized void start() {
@@ -107,6 +106,8 @@ public void close() throws Exception {
107106
Thread.currentThread().interrupt();
108107
} catch (TimeoutException e) {
109108
logger.error("Timeout exceeded attempting to close for {} ms", timeoutMillis);
109+
} finally {
110+
isStarted = false;
110111
}
111112
}
112113

@@ -123,6 +124,10 @@ public void process(UserEvent userEvent) {
123124
}
124125
}
125126

127+
public void flush() throws InterruptedException {
128+
eventQueue.put(FLUSH_SIGNAL);
129+
}
130+
126131
public class EventConsumer implements Runnable {
127132
private LinkedList<UserEvent> currentBatch = new LinkedList<>();
128133
private long deadline = System.currentTimeMillis() + flushInterval;
@@ -148,6 +153,12 @@ public void run() {
148153
break;
149154
}
150155

156+
if (item == FLUSH_SIGNAL) {
157+
logger.debug("Received flush signal.");
158+
flush();
159+
continue;
160+
}
161+
151162
addToBatch((UserEvent) item);
152163
}
153164
} catch (InterruptedException e) {
@@ -267,8 +278,17 @@ public Builder withNotificationCenter(NotificationCenter notificationCenter) {
267278
}
268279

269280
public BatchEventProcessor build() {
270-
return new BatchEventProcessor(eventQueue, eventHandler, batchSize, flushInterval, timeoutMillis, executor, notificationCenter);
281+
return build(true);
271282
}
272-
}
273283

284+
public BatchEventProcessor build(boolean shouldStart) {
285+
BatchEventProcessor batchEventProcessor = new BatchEventProcessor(eventQueue, eventHandler, batchSize, flushInterval, timeoutMillis, executor, notificationCenter);
286+
287+
if (shouldStart) {
288+
batchEventProcessor.start();
289+
}
290+
291+
return batchEventProcessor;
292+
}
293+
}
274294
}

core-api/src/test/java/com/optimizely/ab/event/BatchEventProcessorTest.java

Lines changed: 47 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -121,6 +121,28 @@ public void testFlushMaxBatchSize() throws Exception {
121121
assertEquals(0, eventQueue.size());
122122
}
123123

124+
@Test
125+
public void testFlush() throws Exception {
126+
CountDownLatch countDownLatch = new CountDownLatch(2);
127+
setEventProcessor(logEvent -> {
128+
eventHandlerRule.dispatchEvent(logEvent);
129+
countDownLatch.countDown();
130+
});
131+
132+
UserEvent userEvent = buildConversionEvent(EVENT_NAME);
133+
eventProcessor.process(userEvent);
134+
eventProcessor.flush();
135+
eventHandlerRule.expectConversion(EVENT_NAME, USER_ID);
136+
137+
eventProcessor.process(userEvent);
138+
eventProcessor.flush();
139+
eventHandlerRule.expectConversion(EVENT_NAME, USER_ID);
140+
141+
if (!countDownLatch.await(MAX_DURATION_MS / 2, TimeUnit.MILLISECONDS)) {
142+
fail("Exceeded timeout waiting for notification.");
143+
}
144+
}
145+
124146
@Test
125147
public void testFlushOnMismatchRevision() throws Exception {
126148
CountDownLatch countDownLatch = new CountDownLatch(2);
@@ -177,6 +199,31 @@ public void testFlushOnMismatchProjectId() throws Exception {
177199
}
178200
}
179201

202+
@Test
203+
public void testStopAndStart() throws Exception {
204+
CountDownLatch countDownLatch = new CountDownLatch(2);
205+
setEventProcessor(logEvent -> {
206+
eventHandlerRule.dispatchEvent(logEvent);
207+
countDownLatch.countDown();
208+
});
209+
210+
UserEvent userEvent = buildConversionEvent(EVENT_NAME);
211+
eventProcessor.process(userEvent);
212+
eventHandlerRule.expectConversion(EVENT_NAME, USER_ID);
213+
214+
eventProcessor.close();
215+
216+
eventProcessor.process(userEvent);
217+
eventHandlerRule.expectConversion(EVENT_NAME, USER_ID);
218+
219+
eventProcessor.start();
220+
221+
eventProcessor.close();
222+
if (!countDownLatch.await(MAX_DURATION_MS * 3, TimeUnit.MILLISECONDS)) {
223+
fail("Exceeded timeout waiting for notification.");
224+
}
225+
}
226+
180227
@Test
181228
public void testNotificationCenter() throws Exception {
182229
CountDownLatch countDownLatch = new CountDownLatch(1);

0 commit comments

Comments
 (0)