Skip to content

Commit a0d0ccb

Browse files
author
Mike Davis
authored
Implement Event Batching (#312)
* Introduces an EventProcessor interface. * Introduces a BatchEventProcessor (maybe should name it BatchingEventProcessor) * Introduces an OptimizelyRule to handle lifecycle management during unit testing. Buffering events within a queue before dispatching is an optimization that should prevent SDK implementations from exhausting resources while increasing throughput. This implementation relies on a `BlockingQueue` to buffer events received from one-to-many producers. A single consumer thread continuously polls from this queue to build a batch before emitting the batched `LogEvent`. Note that the `Optimizely` class only knows about the `EventProcessor` and `EventHandler` so that it can gracefully close those implementations, so not strictly necessary. The Optimizely class builds UserEvents via the `UserEventFactory` and passes those events to the `EventProcessor`. The `EventProcessor` emits `LogEvents` to a `NotificationManager<LogEvent>` which is subscribed to by the `EventHandler` for eventual dispatching.
1 parent c9549cd commit a0d0ccb

18 files changed

+826
-103
lines changed

core-api/src/main/java/com/optimizely/ab/Optimizely.java

Lines changed: 34 additions & 26 deletions
Original file line numberDiff line numberDiff line change
@@ -24,9 +24,7 @@
2424
import com.optimizely.ab.config.parser.ConfigParseException;
2525
import com.optimizely.ab.error.ErrorHandler;
2626
import com.optimizely.ab.error.NoOpErrorHandler;
27-
import com.optimizely.ab.event.EventHandler;
28-
import com.optimizely.ab.event.LogEvent;
29-
import com.optimizely.ab.event.NoopEventHandler;
27+
import com.optimizely.ab.event.*;
3028
import com.optimizely.ab.event.internal.*;
3129
import com.optimizely.ab.event.internal.payload.EventBatch;
3230
import com.optimizely.ab.notification.*;
@@ -78,6 +76,8 @@ public class Optimizely implements AutoCloseable {
7876
@VisibleForTesting
7977
final EventHandler eventHandler;
8078
@VisibleForTesting
79+
final EventProcessor eventProcessor;
80+
@VisibleForTesting
8181
final ErrorHandler errorHandler;
8282

8383
private final ProjectConfigManager projectConfigManager;
@@ -89,13 +89,15 @@ public class Optimizely implements AutoCloseable {
8989
private final UserProfileService userProfileService;
9090

9191
private Optimizely(@Nonnull EventHandler eventHandler,
92+
@Nonnull EventProcessor eventProcessor,
9293
@Nonnull ErrorHandler errorHandler,
9394
@Nonnull DecisionService decisionService,
9495
@Nullable UserProfileService userProfileService,
9596
@Nonnull ProjectConfigManager projectConfigManager,
9697
@Nonnull NotificationCenter notificationCenter
9798
) {
9899
this.eventHandler = eventHandler;
100+
this.eventProcessor = eventProcessor;
99101
this.errorHandler = errorHandler;
100102
this.decisionService = decisionService;
101103
this.userProfileService = userProfileService;
@@ -137,6 +139,7 @@ private void tryClose(Object obj) {
137139
*/
138140
@Override
139141
public void close() {
142+
tryClose(eventProcessor);
140143
tryClose(eventHandler);
141144
tryClose(projectConfigManager);
142145
}
@@ -230,28 +233,25 @@ private void sendImpression(@Nonnull ProjectConfig projectConfig,
230233
return;
231234
}
232235

233-
logger.info("Activating user \"{}\" in experiment \"{}\".", userId, experiment.getKey());
234236
UserEvent userEvent = UserEventFactory.createImpressionEvent(
235237
projectConfig,
236238
experiment,
237239
variation,
238240
userId,
239241
filteredAttributes);
240242

241-
LogEvent impressionEvent = EventFactory.createLogEvent(userEvent);
242-
243-
try {
244-
eventHandler.dispatchEvent(impressionEvent);
245-
} catch (Exception e) {
246-
logger.error("Unexpected exception in event dispatcher", e);
247-
}
243+
eventProcessor.process(userEvent);
244+
logger.info("Activating user \"{}\" in experiment \"{}\".", userId, experiment.getKey());
248245

249246
// Kept For backwards compatibility.
250247
// This notification is deprecated and the new DecisionNotifications
251248
// are sent via their respective method calls.
252-
ActivateNotification activateNotification = new ActivateNotification(
253-
experiment, userId, filteredAttributes, variation, impressionEvent);
254-
notificationCenter.send(activateNotification);
249+
if (notificationCenter.getNotificationManager(ActivateNotification.class).size() > 0) {
250+
LogEvent impressionEvent = EventFactory.createLogEvent(userEvent);
251+
ActivateNotification activateNotification = new ActivateNotification(
252+
experiment, userId, filteredAttributes, variation, impressionEvent);
253+
notificationCenter.send(activateNotification);
254+
}
255255
}
256256

257257
//======== track calls ========//
@@ -309,20 +309,17 @@ public void track(@Nonnull String eventName,
309309
copiedAttributes,
310310
eventTags);
311311

312-
// create the conversion event request parameters, then dispatch
313-
LogEvent conversionEvent = EventFactory.createLogEvent(userEvent);
312+
eventProcessor.process(userEvent);
314313
logger.info("Tracking event \"{}\" for user \"{}\".", eventName, userId);
315314

316-
try {
317-
eventHandler.dispatchEvent(conversionEvent);
318-
} catch (Exception e) {
319-
logger.error("Unexpected exception in event dispatcher", e);
320-
}
321-
322-
TrackNotification notification = new TrackNotification(eventName, userId,
323-
copiedAttributes, eventTags, conversionEvent);
315+
if (notificationCenter.getNotificationManager(TrackNotification.class).size() > 0) {
316+
// create the conversion event request parameters, then dispatch
317+
LogEvent conversionEvent = EventFactory.createLogEvent(userEvent);
318+
TrackNotification notification = new TrackNotification(eventName, userId,
319+
copiedAttributes, eventTags, conversionEvent);
324320

325-
notificationCenter.send(notification);
321+
notificationCenter.send(notification);
322+
}
326323
}
327324

328325
//======== FeatureFlag APIs ========//
@@ -1000,6 +997,7 @@ public static class Builder {
1000997
private DecisionService decisionService;
1001998
private ErrorHandler errorHandler;
1002999
private EventHandler eventHandler;
1000+
private EventProcessor eventProcessor;
10031001
private ProjectConfig projectConfig;
10041002
private ProjectConfigManager projectConfigManager;
10051003
private UserProfileService userProfileService;
@@ -1027,6 +1025,11 @@ public Builder withEventHandler(EventHandler eventHandler) {
10271025
return this;
10281026
}
10291027

1028+
public Builder withEventProcessor(EventProcessor eventProcessor) {
1029+
this.eventProcessor = eventProcessor;
1030+
return this;
1031+
}
1032+
10301033
public Builder withUserProfileService(UserProfileService userProfileService) {
10311034
this.userProfileService = userProfileService;
10321035
return this;
@@ -1094,6 +1097,11 @@ public Optimizely build() {
10941097
decisionService = new DecisionService(bucketer, errorHandler, userProfileService);
10951098
}
10961099

1100+
// For backwards compatibility
1101+
if (eventProcessor == null) {
1102+
eventProcessor = new ForwardingEventProcessor(eventHandler);
1103+
}
1104+
10971105
if (projectConfig == null && datafile != null && !datafile.isEmpty()) {
10981106
try {
10991107
projectConfig = new DatafileProjectConfig.Builder().withDatafile(datafile).build();
@@ -1117,7 +1125,7 @@ public Optimizely build() {
11171125
notificationCenter = new NotificationCenter();
11181126
}
11191127

1120-
return new Optimizely(eventHandler, errorHandler, decisionService, userProfileService, projectConfigManager, notificationCenter);
1128+
return new Optimizely(eventHandler, eventProcessor, errorHandler, decisionService, userProfileService, projectConfigManager, notificationCenter);
11211129
}
11221130
}
11231131
}

0 commit comments

Comments
 (0)