Skip to content

Commit 2022b49

Browse files
[FSSDK-9054] fix(odp): flush odp events instantly upon calling close (#511)
* corrected flush all test and added shutdown event instead of shutdown boolean. * Added check --------- Co-authored-by: NomanShoaib <m.nomanshoaib09@gmail.com>
1 parent 9e02c7e commit 2022b49

File tree

2 files changed

+14
-11
lines changed

2 files changed

+14
-11
lines changed

core-api/src/main/java/com/optimizely/ab/odp/ODPEventManager.java

Lines changed: 10 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -35,6 +35,7 @@ public class ODPEventManager {
3535
private static final int DEFAULT_FLUSH_INTERVAL = 1000;
3636
private static final int MAX_RETRIES = 3;
3737
private static final String EVENT_URL_PATH = "/v3/events";
38+
private static final Object SHUTDOWN_SIGNAL = new Object();
3839

3940
private final int queueSize;
4041
private final int batchSize;
@@ -188,8 +189,6 @@ public void stop() {
188189

189190
private class EventDispatcherThread extends Thread {
190191

191-
private volatile boolean shouldStop = false;
192-
193192
private final List<ODPEvent> currentBatch = new ArrayList<>();
194193

195194
private long nextFlushTime = new Date().getTime();
@@ -198,7 +197,7 @@ private class EventDispatcherThread extends Thread {
198197
public void run() {
199198
while (true) {
200199
try {
201-
Object nextEvent;
200+
Object nextEvent = null;
202201

203202
// If batch has events, set the timeout to remaining time for flush interval,
204203
// otherwise wait for the new event indefinitely
@@ -213,9 +212,6 @@ public void run() {
213212
if (!currentBatch.isEmpty()) {
214213
flush();
215214
}
216-
if (shouldStop) {
217-
break;
218-
}
219215
continue;
220216
}
221217

@@ -228,7 +224,11 @@ public void run() {
228224
// Batch starting, create a new flush time
229225
nextFlushTime = new Date().getTime() + flushInterval;
230226
}
231-
227+
if (nextEvent == SHUTDOWN_SIGNAL) {
228+
flush();
229+
logger.info("Received shutdown signal.");
230+
break;
231+
}
232232
currentBatch.add((ODPEvent) nextEvent);
233233

234234
if (currentBatch.size() >= batchSize) {
@@ -268,7 +268,9 @@ private void flush() {
268268
}
269269

270270
public void signalStop() {
271-
shouldStop = true;
271+
if (!eventQueue.offer(SHUTDOWN_SIGNAL)) {
272+
logger.error("Failed to Process Shutdown odp Event. Event Queue is not accepting any more events");
273+
}
272274
}
273275
}
274276

core-api/src/test/java/com/optimizely/ab/odp/ODPEventManagerTest.java

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -182,18 +182,19 @@ public void retryFailedEvents() throws InterruptedException {
182182

183183
@Test
184184
public void shouldFlushAllScheduledEventsBeforeStopping() throws InterruptedException {
185+
int flushInterval = 20000;
185186
Mockito.reset(mockApiManager);
186187
Mockito.when(mockApiManager.sendEvents(any(), any(), any())).thenReturn(202);
187188
ODPConfig odpConfig = new ODPConfig("key", "http://www.odp-host.com", null);
188-
ODPEventManager eventManager = new ODPEventManager(mockApiManager);
189+
ODPEventManager eventManager = new ODPEventManager(mockApiManager, null, flushInterval);
189190
eventManager.updateSettings(odpConfig);
190191
eventManager.start();
191-
for (int i = 0; i < 25; i++) {
192+
for (int i = 0; i < 8; i++) {
192193
eventManager.sendEvent(getEvent(i));
193194
}
194195
eventManager.stop();
195196
Thread.sleep(1500);
196-
Mockito.verify(mockApiManager, times(3)).sendEvents(eq("key"), eq("http://www.odp-host.com/v3/events"), any());
197+
Mockito.verify(mockApiManager, times(1)).sendEvents(eq("key"), eq("http://www.odp-host.com/v3/events"), any());
197198
logbackVerifier.expectMessage(Level.DEBUG, "Exiting ODP Event Dispatcher Thread.");
198199
}
199200

0 commit comments

Comments
 (0)