Skip to content

Commit 1b5deaa

Browse files
authored
fix: change EventDispatcher to EventProcessor + EventHandler (#272)
* change eventDispatcher to eventProcessor + eventHandler (wip) * change DefaultEventProcessor to BatchEventProcessor * add UserEvent and UserContext and pass them to process events * change names to OPTEventsProcessor + OPTEventsDispatcher * add sdkKey to ProjectConfig * change LogEvent notification to notificationCenter injection * remove sdkKey from EP/ED injection for sharing
1 parent 335e8b9 commit 1b5deaa

37 files changed

+2825
-650
lines changed

DemoSwiftApp/AppDelegate.swift

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -99,9 +99,12 @@ class AppDelegate: UIResponder, UIApplicationDelegate {
9999
// 30 sec interval may be too frequent. This is for demo purpose.
100100
// This should be should be much larger (default = 10 mins).
101101
let customDownloadIntervalInSecs = 30
102+
103+
let eventProcessor = BatchEventProcessor(batchSize: 10, timerInterval: 30, maxQueueSize: 1000)
102104

103105
optimizely = OptimizelyClient(sdkKey: sdkKey,
104106
logger: customLogger,
107+
eventProcessor: eventProcessor,
105108
periodicDownloadInterval: customDownloadIntervalInSecs,
106109
defaultLogLevel: logLevel)
107110

OptimizelySwiftSDK.xcodeproj/project.pbxproj

Lines changed: 246 additions & 32 deletions
Large diffs are not rendered by default.
Lines changed: 302 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,302 @@
1+
/****************************************************************************
2+
* Copyright 2019, Optimizely, Inc. and contributors *
3+
* *
4+
* Licensed under the Apache License, Version 2.0 (the "License"); *
5+
* you may not use this file except in compliance with the License. *
6+
* You may obtain a copy of the License at *
7+
* *
8+
* http://www.apache.org/licenses/LICENSE-2.0 *
9+
* *
10+
* Unless required by applicable law or agreed to in writing, software *
11+
* distributed under the License is distributed on an "AS IS" BASIS, *
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. *
13+
* See the License for the specific language governing permissions and *
14+
* limitations under the License. *
15+
***************************************************************************/
16+
17+
import Foundation
18+
19+
public enum DataStoreType {
20+
case file, memory, userDefaults
21+
}
22+
23+
open class BatchEventProcessor: BackgroundingCallbacks, OPTEventsProcessor {
24+
25+
static let sharedInstance = BatchEventProcessor()
26+
27+
// timer-interval for batching (0 = no batching, negative = use default)
28+
var timerInterval: TimeInterval
29+
// batch size (1 = no batching, 0 or negative = use default)
30+
// attempt to send events in batches with batchSize number of events combined
31+
var batchSize: Int
32+
var maxQueueSize: Int
33+
34+
public struct DefaultValues {
35+
static public let batchSize = 10
36+
static public let timeInterval: TimeInterval = 60 // secs
37+
static public let maxQueueSize = 10000
38+
static let maxFailureCount = 3
39+
}
40+
41+
// the max failure count. there is no backoff timer.
42+
43+
let eventDispatcher: OPTEventsDispatcher
44+
lazy var logger = OPTLoggerFactory.getLogger()
45+
var backingStore: DataStoreType
46+
var backingStoreName: String
47+
48+
// for dispatching events
49+
let processQueue = DispatchQueue(label: "BatchEventProcessQueue")
50+
let flushQueue = DispatchQueue(label: "BatchEventFlushQueue")
51+
// using a datastore queue with a backing file
52+
let dataStore: DataStoreQueueStackImpl<EventForDispatch>
53+
// timer as a atomic property.
54+
var timer: AtomicProperty<Timer> = AtomicProperty<Timer>()
55+
56+
// synchronous dispatching
57+
let notify = DispatchGroup()
58+
59+
var observerProjectId: NSObjectProtocol?
60+
var observerRevision: NSObjectProtocol?
61+
62+
func getNotificationCenter(sdkKey: String) -> OPTNotificationCenter? {
63+
return HandlerRegistryService.shared.injectNotificationCenter(sdkKey: sdkKey)
64+
}
65+
66+
public init(eventDispatcher: OPTEventsDispatcher = HTTPEventDispatcher(),
67+
batchSize: Int = DefaultValues.batchSize,
68+
timerInterval: TimeInterval = DefaultValues.timeInterval,
69+
maxQueueSize: Int = DefaultValues.maxQueueSize,
70+
backingStore: DataStoreType = .file,
71+
dataStoreName: String = "OPTEventQueue") {
72+
self.eventDispatcher = eventDispatcher
73+
self.batchSize = batchSize > 0 ? batchSize : DefaultValues.batchSize
74+
self.timerInterval = timerInterval >= 0 ? timerInterval : DefaultValues.timeInterval
75+
76+
var queueSize = maxQueueSize >= 100 ? maxQueueSize : DefaultValues.maxQueueSize
77+
if queueSize < batchSize {
78+
print("MaxQueueSize should be bigger than batchSize. Adjusting automatically.")
79+
queueSize = batchSize
80+
}
81+
self.maxQueueSize = queueSize
82+
83+
self.backingStore = backingStore
84+
self.backingStoreName = dataStoreName
85+
86+
switch backingStore {
87+
case .file:
88+
self.dataStore = DataStoreQueueStackImpl<EventForDispatch>(queueStackName: "OPTEventQueue",
89+
dataStore: DataStoreFile<[Data]>(storeName: backingStoreName))
90+
case .memory:
91+
self.dataStore = DataStoreQueueStackImpl<EventForDispatch>(queueStackName: "OPTEventQueue",
92+
dataStore: DataStoreMemory<[Data]>(storeName: backingStoreName))
93+
case .userDefaults:
94+
self.dataStore = DataStoreQueueStackImpl<EventForDispatch>(queueStackName: "OPTEventQueue",
95+
dataStore: DataStoreUserDefaults())
96+
}
97+
98+
if self.maxQueueSize < self.batchSize {
99+
self.logger.e(.eventDispatcherConfigError("batchSize cannot be bigger than maxQueueSize"))
100+
self.maxQueueSize = self.batchSize
101+
}
102+
103+
addProjectChangeNotificationObservers()
104+
105+
subscribe()
106+
}
107+
108+
deinit {
109+
stopTimer()
110+
111+
removeProjectChangeNotificationObservers()
112+
113+
unsubscribe()
114+
}
115+
116+
open func process(event: UserEvent, completionHandler: ProcessCompletionHandler? = nil) {
117+
// EP can be shared by multiple clients
118+
processQueue.async {
119+
guard self.dataStore.count < self.maxQueueSize else {
120+
let error = OptimizelyError.eventDispatchFailed("EventQueue is full")
121+
self.logger.e(error)
122+
123+
self.flush()
124+
completionHandler?(.failure(error))
125+
return
126+
}
127+
128+
guard let body = try? JSONEncoder().encode(event.batchEvent) else {
129+
let error = OptimizelyError.eventDispatchFailed("Event serialization failed")
130+
self.logger.e(error)
131+
132+
completionHandler?(.failure(error))
133+
return
134+
}
135+
136+
self.dataStore.save(item: EventForDispatch(sdkKey: event.userContext.config.sdkKey, body: body))
137+
138+
if self.dataStore.count >= self.batchSize {
139+
self.flush()
140+
} else {
141+
self.startTimer()
142+
}
143+
144+
completionHandler?(.success(body))
145+
}
146+
}
147+
148+
open func flush() {
149+
flushQueue.async {
150+
// we don't remove anthing off of the queue unless it is successfully sent.
151+
var failureCount = 0
152+
153+
func removeStoredEvents(num: Int) {
154+
if let removedItem = self.dataStore.removeFirstItems(count: num), removedItem.count > 0 {
155+
// avoid event-log-message preparation overheads with closure-logging
156+
self.logger.d({ "Removed stored \(num) events starting with \(removedItem.first!)" })
157+
} else {
158+
self.logger.e("Failed to removed \(num) events")
159+
}
160+
}
161+
162+
while let eventsToSend: [EventForDispatch] = self.dataStore.getFirstItems(count: self.batchSize) {
163+
let (numEvents, batched) = eventsToSend.batch()
164+
165+
guard numEvents > 0 else { break }
166+
167+
guard let batchEvent = batched else {
168+
// discard an invalid event that causes batching failure
169+
// - if an invalid event is found while batching, it batches all the valid ones before the invalid one and sends it out.
170+
// - when trying to batch next, it finds the invalid one at the header. It discards that specific invalid one and continue batching next ones.
171+
172+
removeStoredEvents(num: 1)
173+
continue
174+
}
175+
176+
// we've exhuasted our failure count. Give up and try the next time a event
177+
// is queued or someone calls flush.
178+
if failureCount > DefaultValues.maxFailureCount {
179+
self.logger.e(.eventSendRetyFailed(failureCount))
180+
break
181+
}
182+
183+
// send notification BEFORE sending event to the server
184+
if let event = try? JSONSerialization.jsonObject(with: batchEvent.body, options: []) as? [String: Any] {
185+
let url = batchEvent.url.absoluteString
186+
let sdkKey = batchEvent.sdkKey
187+
188+
if let notifCenter = self.getNotificationCenter(sdkKey: sdkKey) {
189+
let args: [Any] = [url, event]
190+
notifCenter.sendNotifications(type: NotificationType.logEvent.rawValue, args: args)
191+
}
192+
} else {
193+
self.logger.e("LogEvent notification discarded due to invalid event")
194+
}
195+
196+
// make the send event synchronous. enter our notify
197+
self.notify.enter()
198+
self.eventDispatcher.dispatch(event: batchEvent) { (result) -> Void in
199+
switch result {
200+
case .failure(let error):
201+
self.logger.e(error.reason)
202+
failureCount += 1
203+
case .success:
204+
// we succeeded. remove the batch size sent.
205+
removeStoredEvents(num: numEvents)
206+
207+
// reset failureCount
208+
failureCount = 0
209+
}
210+
// our send is done.
211+
self.notify.leave()
212+
213+
}
214+
// wait for send
215+
self.notify.wait()
216+
}
217+
}
218+
}
219+
220+
func applicationDidEnterBackground() {
221+
stopTimer()
222+
flush()
223+
}
224+
225+
func applicationDidBecomeActive() {
226+
if dataStore.count > 0 {
227+
flush()
228+
}
229+
}
230+
231+
func startTimer() {
232+
// timer is activated only for iOS10+ and non-zero interval value
233+
guard #available(iOS 10.0, tvOS 10.0, *), timerInterval > 0 else {
234+
flush()
235+
return
236+
}
237+
238+
guard self.timer.property == nil else { return }
239+
240+
DispatchQueue.main.async {
241+
// should check here again
242+
guard self.timer.property == nil else { return }
243+
244+
self.timer.property = Timer.scheduledTimer(withTimeInterval: self.timerInterval, repeats: true) { _ in
245+
self.flushQueue.async {
246+
if self.dataStore.count > 0 {
247+
self.flush()
248+
} else {
249+
self.stopTimer()
250+
}
251+
}
252+
}
253+
}
254+
}
255+
256+
func stopTimer() {
257+
timer.performAtomic { (timer) in
258+
timer.invalidate()
259+
}
260+
timer.property = nil
261+
}
262+
}
263+
264+
// MARK: - Notification Observers
265+
266+
extension BatchEventProcessor {
267+
268+
func addProjectChangeNotificationObservers() {
269+
observerProjectId = NotificationCenter.default.addObserver(forName: .didReceiveOptimizelyProjectIdChange, object: nil, queue: nil) { [weak self] (_) in
270+
self?.logger.d("Event flush triggered by datafile projectId change")
271+
self?.flush()
272+
}
273+
274+
observerRevision = NotificationCenter.default.addObserver(forName: .didReceiveOptimizelyRevisionChange, object: nil, queue: nil) { [weak self] (_) in
275+
self?.logger.d("Event flush triggered by datafile revision change")
276+
self?.flush()
277+
}
278+
}
279+
280+
func removeProjectChangeNotificationObservers() {
281+
if let observer = observerProjectId {
282+
NotificationCenter.default.removeObserver(observer, name: .didReceiveOptimizelyProjectIdChange, object: nil)
283+
}
284+
if let observer = observerRevision {
285+
NotificationCenter.default.removeObserver(observer, name: .didReceiveOptimizelyRevisionChange, object: nil)
286+
}
287+
}
288+
289+
// MARK: - Tests
290+
291+
open func clear() {
292+
processQueue.sync{}
293+
flush()
294+
flushQueue.sync{}
295+
}
296+
297+
open func sync() {
298+
processQueue.sync{}
299+
flushQueue.sync{}
300+
}
301+
302+
}

0 commit comments

Comments
 (0)